Building a Robust Retrieval-Augmented Generation System with LangChain and OpenAI

Table of Contents

- Introduction
- Prerequisites
- Setting Up the Environment
- Understanding the Code
- Enhancements for a More Robust System
- Conclusion

Introduction

In the realm of artificial intelligence, Retrieval-Augmented Generation (RAG) has emerged as a powerful technique to enhance the capabilities of language models. By combining retrieval mechanisms with generative models, RAG systems can access external knowledge bases, leading to more accurate and contextually relevant responses.

This blog post will guide you through implementing a RAG system using the following technologies:

- LangChain for orchestrating the retrieval and generation pipeline
- The OpenAI API for embeddings and the language model
- ChromaDB as the local vector store (with Pinecone as a cloud-based alternative for scaling)
- python-dotenv, PyPDF2, and related utilities for configuration and document loading

We’ll walk through a Python script that processes documents from a folder, creates embeddings, stores them in a vector database, and sets up an interactive question-answering system.


Prerequisites

Before we begin, ensure you have the following:

- Python 3.8 or later installed
- An OpenAI API key (from https://platform.openai.com/)
- A folder of .txt and/or .pdf documents you want to query
- Basic familiarity with Python and the command line

Setting Up the Environment

First, let’s set up a virtual environment and install the required libraries.

# Create and activate a virtual environment
python3 -m venv rag-env
source rag-env/bin/activate  # For Windows, use 'rag-env\Scripts\activate'

# Upgrade pip
pip install --upgrade pip

# Install required packages
pip install langchain openai chromadb pinecone-client tiktoken
pip install sentence-transformers python-dotenv PyPDF2
pip install langchain-community langchain-openai langchain-chroma

Understanding the Code

Below is the Python script we’ll be discussing:

import os
import sys
import glob
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Updated imports
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_chroma.vectorstores import Chroma
from langchain_openai.llms import OpenAI
from langchain.chains import RetrievalQA

# Updated document loaders
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

def main():
   # Load OpenAI API key
   openai_api_key = os.getenv("OPENAI_API_KEY")
   if not openai_api_key:
       print("Please set your OPENAI_API_KEY in the .env file.")
       sys.exit(1)
  
   # Define the folder path (change 'data' to your folder name)
   folder_path = './data'
   if not os.path.exists(folder_path):
       print(f"Folder '{folder_path}' does not exist.")
       sys.exit(1)
  
   # Read all files in the folder
   documents = []
   for filepath in glob.glob(os.path.join(folder_path, '**/*.*'), recursive=True):
       if os.path.isfile(filepath):
           ext = os.path.splitext(filepath)[1].lower()
           try:
               if ext == '.txt':
                   loader = TextLoader(filepath, encoding='utf-8')
                   documents.extend(loader.load_and_split())
               elif ext == '.pdf':
                   loader = PyPDFLoader(filepath)
                   documents.extend(loader.load_and_split())
               else:
                   print(f"Unsupported file format: {filepath}")
           except Exception as e:
               print(f"Error reading '{filepath}': {e}")
  
   if not documents:
       print("No documents found in the folder.")
       sys.exit(1)
  
   # Split documents into chunks
   text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
   texts = text_splitter.split_documents(documents)
  
   # Initialize embeddings and vector store
   embeddings = OpenAIEmbeddings()
   vector_store = Chroma(embedding_function=embeddings, persist_directory="./chroma_store")
  
   # Add texts to vector store in batches
   batch_size = 500  # Adjust this number as needed
   for i in range(0, len(texts), batch_size):
       batch_texts = texts[i:i+batch_size]
       vector_store.add_documents(batch_texts)
  
   # Set up retriever
   retriever = vector_store.as_retriever(search_kwargs={"k": 3})
  
   # Set up the language model
   llm = OpenAI(temperature=0.7)
  
   # Create the RetrievalQA chain
   qa_chain = RetrievalQA.from_chain_type(
       llm=llm,
       chain_type="stuff",  # Options: 'stuff', 'map_reduce', 'refine', 'map_rerank'
       retriever=retriever
   )
  
   # Interactive prompt for user queries
   print("The system is ready. You can now ask questions about the content.")
   while True:
       query = input("Enter your question (or type 'exit' to quit): ")
       if query.lower() in ('exit', 'quit'):
           break
       try:
           response = qa_chain.run(query)
           print(f"\nAnswer: {response}\n")
       except Exception as e:
           print(f"An error occurred: {e}\n")
          
if __name__ == "__main__":
   main()

Let’s break down each part of the code.

1. Loading Environment Variables

We use python-dotenv to load environment variables from a .env file. This is where we’ll store our OpenAI API key securely.

import os
import sys
from dotenv import load_dotenv

load_dotenv()

openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    print("Please set your OPENAI_API_KEY in the .env file.")
    sys.exit(1)

Instructions:

- Create a file named .env in the project root.
- Add your OpenAI API key to it as OPENAI_API_KEY (see the example below).
- Keep .env out of version control by adding it to .gitignore.
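
For reference, a minimal .env file looks like this (the value is a placeholder, not a real key):

OPENAI_API_KEY=your-api-key-here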

2. Importing Necessary Libraries

We import updated modules from langchain and its companion packages (langchain-openai, langchain-chroma, and langchain-community).

# Embeddings and vector store
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_chroma.vectorstores import Chroma
from langchain_openai.llms import OpenAI
from langchain.chains import RetrievalQA

# Document loaders and text splitter
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

Note: Ensure all packages are up-to-date to avoid deprecation warnings.

3. Loading and Splitting Documents

The script reads all .txt and .pdf files from the specified folder and splits them into manageable chunks.

import glob

folder_path = './data'
if not os.path.exists(folder_path):
    print(f"Folder '{folder_path}' does not exist.")
    sys.exit(1)

documents = []
for filepath in glob.glob(os.path.join(folder_path, '**/*.*'), recursive=True):
    if os.path.isfile(filepath):
        ext = os.path.splitext(filepath)[1].lower()
        try:
            if ext == '.txt':
                loader = TextLoader(filepath, encoding='utf-8')
                documents.extend(loader.load_and_split())
            elif ext == '.pdf':
                loader = PyPDFLoader(filepath)
                documents.extend(loader.load_and_split())
            else:
                print(f"Unsupported file format: {filepath}")
        except Exception as e:
            print(f"Error reading '{filepath}': {e}")

if not documents:
    print("No documents found in the folder.")
    sys.exit(1)

# Split documents into chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
texts = text_splitter.split_documents(documents)

Instructions:

- Put the .txt and .pdf files you want to query into the ./data folder (or change folder_path to point elsewhere).
- Subfolders are scanned as well, since the glob pattern is recursive.
- Files with unsupported extensions are skipped with a warning instead of stopping the run.
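
To sanity-check loading and splitting, you can print the chunk count and peek at one chunk's text and metadata; a minimal sketch:

# Peek at the result of splitting
print(f"Loaded {len(documents)} document sections, split into {len(texts)} chunks.")
print(texts[0].page_content[:200])
print(texts[0].metadata)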

4. Creating Embeddings and Vector Store

We initialize embeddings using OpenAI’s models and store them in ChromaDB.

embeddings = OpenAIEmbeddings()
vector_store = Chroma(embedding_function=embeddings, persist_directory="./chroma_store")

batch_size = 500  # Adjust this number as needed
for i in range(0, len(texts), batch_size):
    batch_texts = texts[i:i+batch_size]
    vector_store.add_documents(batch_texts)

Explanation:

- OpenAIEmbeddings converts each text chunk into a numerical vector using OpenAI's embedding API.
- Chroma stores those vectors on disk in ./chroma_store, so the index persists between runs.
- Adding documents in batches of 500 keeps individual embedding requests and database writes to a manageable size; adjust batch_size to suit your dataset.
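
Once the chunks have been added, you can query the store directly to confirm that retrieval returns sensible results; a quick sketch (the query text is just an example):

# Fetch the two chunks most similar to a test query
results = vector_store.similarity_search("What are these documents about?", k=2)
for doc in results:
    print(doc.metadata.get("source", "unknown"), "->", doc.page_content[:100])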

5. Setting Up Retrieval and LLM Chain

We set up the retriever and connect it to the OpenAI language model using LangChain’s RetrievalQA chain.

retriever = vector_store.as_retriever(search_kwargs={"k": 3})

llm = OpenAI(temperature=0.7)

qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",  # Options: 'stuff', 'map_reduce', 'refine', 'map_rerank'
    retriever=retriever
)

Explanation:

- as_retriever(search_kwargs={"k": 3}) makes the vector store return the three most similar chunks for each query.
- OpenAI(temperature=0.7) instantiates the language model; lower temperatures give more deterministic answers, higher ones more varied answers.
- chain_type="stuff" inserts all retrieved chunks into a single prompt. If the retrieved text exceeds the model's context window, consider 'map_reduce' or 'refine' instead.
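
If you also want to see which chunks each answer is based on, RetrievalQA can return its source documents; a sketch along these lines:

# Return the retrieved chunks alongside the generated answer
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=retriever,
    return_source_documents=True,
)

result = qa_chain.invoke({"query": "your question here"})
print(result["result"])
for doc in result["source_documents"]:
    print("Source:", doc.metadata.get("source", "unknown"))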

6. Interactive Querying

We create an interactive loop where users can input queries and receive answers.

print("The system is ready. You can now ask questions about the content.")
while True:
    query = input("Enter your question (or type 'exit' to quit): ")
    if query.lower() in ('exit', 'quit'):
        break
    try:
        response = qa_chain.run(query)
        print(f"\nAnswer: {response}\n")
    except Exception as e:
        print(f"An error occurred: {e}\n")

Enhancements for a More Robust System

To enhance the robustness and scalability of the system, consider the following improvements.

1. Enhanced Error Handling and Logging

Implement more comprehensive error handling and logging mechanisms to make debugging easier.

Example:

import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Replace print statements with logger
logger.info("The system is ready. You can now ask questions about the content.")

2. Supporting Additional File Types

Extend support to more file formats like .docx, .html, or .csv by using appropriate loaders.

Example:

from langchain_community.document_loaders import UnstructuredWordDocumentLoader, UnstructuredHTMLLoader

# Add support in the file processing loop
elif ext == '.docx':
    loader = UnstructuredWordDocumentLoader(filepath)
    documents.extend(loader.load_and_split())
elif ext == '.html':
    loader = UnstructuredHTMLLoader(filepath)
    documents.extend(loader.load_and_split())
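
Since .csv files were mentioned as well, langchain_community's CSVLoader slots into the same loop; a sketch of the extra branch:

from langchain_community.document_loaders import CSVLoader

# Another branch for the file-processing loop
elif ext == '.csv':
    loader = CSVLoader(file_path=filepath)
    documents.extend(loader.load_and_split())

Note that the Unstructured loaders shown above additionally require the unstructured package (pip install unstructured, plus format-specific extras where needed).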

3. Optimizing Text Splitting Strategy

Fine-tune the chunk_size and chunk_overlap based on the nature of your documents to balance context and performance.

Example:

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=300)
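
If your documents have clear paragraph and sentence structure, you can also control where splits happen via the separators argument; a sketch (the values below are illustrative and are tried in order):

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1500,
    chunk_overlap=300,
    separators=["\n\n", "\n", ". ", " ", ""],
)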

4. Advanced Retrieval Techniques

Enhance the retriever by using metadata filtering or experimenting with different similarity metrics.

Example:

retriever = vector_store.as_retriever(
    search_kwargs={
        "k": 5,
        "filter": {"category": "finance"}  # Chroma metadata filter
    }
)
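
Another option is maximal marginal relevance (MMR) search, which LangChain retrievers support out of the box; it balances relevance against diversity among the returned chunks. A sketch:

# Fetch 20 candidates, then keep the 5 that are both relevant and mutually diverse
retriever = vector_store.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20},
)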

5. Implementing Caching Mechanisms

Use caching to reduce API calls to OpenAI and improve response times.

Example:

from langchain_community.cache import InMemoryCache
from langchain.globals import set_llm_cache

# Enable a global in-memory LLM cache; repeated identical prompts reuse cached responses
set_llm_cache(InMemoryCache())
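
If you want the cache to survive restarts, LangChain also ships an SQLite-backed cache; a sketch (the database path is arbitrary):

from langchain_community.cache import SQLiteCache
from langchain.globals import set_llm_cache

# Persist cached LLM responses in a local SQLite database
set_llm_cache(SQLiteCache(database_path=".langchain.db"))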

6. Scaling with Cloud-Based Vector Stores

For larger datasets, consider using a cloud-based vector store like Pinecone.

Example with Pinecone (requires the langchain-pinecone package and a recent Pinecone SDK; the index name, cloud, and region below are placeholders):

from pinecone import Pinecone as PineconeClient, ServerlessSpec
from langchain_pinecone import PineconeVectorStore

pc = PineconeClient(api_key="your_pinecone_api_key")

# Create the index if it does not already exist
# (1536 is the dimension of OpenAI's default text-embedding model)
index_name = "your_index_name"
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )

index = pc.Index(index_name)
vector_store = PineconeVectorStore(index=index, embedding=embeddings)

7. Security Best Practices

Ensure the security of your system:

- Keep API keys in environment variables or a .env file excluded from version control; never hardcode them.
- Rotate keys periodically and restrict their permissions where possible.
- Validate user input before passing it to downstream systems, and avoid logging sensitive document content.
- Be deliberate about which documents you index: anything stored in the vector database can surface in answers.

Conclusion

Building a Retrieval-Augmented Generation system using LangChain and OpenAI empowers you to create intelligent applications capable of understanding and utilizing vast amounts of textual data. By implementing the enhancements discussed, you can develop a more robust, scalable, and efficient system tailored to your specific needs.

Next Steps:

- Experiment with chunk sizes, overlap, and the number of retrieved chunks (k) on your own documents.
- Add support for more file types and richer document metadata.
- Move to a cloud-based vector store such as Pinecone as your document collection grows.
