RAG Pipelines: Best Practices from Production
Retrieval-Augmented Generation (RAG) has become the cornerstone of building AI systems that need access to external knowledge. Having deployed several production RAG systems that process millions of queries, I've collected the battle-tested strategies that actually work.
Understanding RAG Architecture
RAG combines the power of large language models with external knowledge retrieval, enabling AI systems to provide accurate, up-to-date responses grounded in your data.
The RAG Pipeline
A production RAG system consists of several critical components (a minimal end-to-end sketch follows this list):
- Document Ingestion: Processing and preparing source documents
- Chunking: Breaking documents into meaningful segments
- Embedding Generation: Converting text to vector representations
- Vector Storage: Storing embeddings for efficient retrieval
- Query Processing: Understanding user queries
- Retrieval: Finding relevant documents
- Context Assembly: Preparing retrieved content for LLM
- Generation: Creating final responses
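To make the flow concrete, here is a minimal, self-contained sketch of how these stages connect, assuming sentence-transformers for embeddings and a plain in-memory matrix standing in for the vector store; names like build_index and assemble_prompt are illustrative, not from any particular library.

import numpy as np
from typing import List, Tuple
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')

def build_index(documents: List[str], chunk_size: int = 500) -> Tuple[List[str], np.ndarray]:
    # Ingestion + chunking: naive fixed-size split, purely for illustration
    chunks = [doc[i:i + chunk_size] for doc in documents for i in range(0, len(doc), chunk_size)]
    # Embedding generation + "vector storage" (an in-memory matrix)
    embeddings = model.encode(chunks, normalize_embeddings=True)
    return chunks, embeddings

def retrieve(query: str, chunks: List[str], embeddings: np.ndarray, k: int = 3) -> List[str]:
    # Query processing + retrieval: cosine similarity over normalized vectors
    query_vec = model.encode([query], normalize_embeddings=True)[0]
    top = np.argsort(embeddings @ query_vec)[::-1][:k]
    return [chunks[i] for i in top]

def assemble_prompt(query: str, context_chunks: List[str]) -> str:
    # Context assembly: this prompt goes to whichever LLM handles generation
    context = "\n\n".join(context_chunks)
    return f"Answer using only this context:\n{context}\n\nQuestion: {query}"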
Document Processing & Chunking
Chunking Strategies
The way you chunk documents significantly impacts retrieval quality:
Fixed-Size Chunking
def fixed_size_chunk(text: str, chunk_size: int = 512, overlap: int = 50):
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        chunks.append(chunk)
        start = end - overlap
    return chunks
Semantic Chunking
LangChain's RecursiveCharacterTextSplitter approximates semantic boundaries by splitting on structural separators first (paragraphs, then sentences, then words):
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ". ", " ", ""]
)
chunks = splitter.split_text(document)
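If you want chunk boundaries driven by meaning rather than separators, a simple embedding-based approach is to break wherever similarity between adjacent sentences drops. A sketch, assuming sentence-transformers; the regex sentence splitter and the 0.5 threshold are illustrative, not tuned values:

import re
from typing import List
import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')

def semantic_chunk(text: str, similarity_threshold: float = 0.5) -> List[str]:
    # Naive sentence split; swap in a proper sentence tokenizer for production
    sentences = [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]
    if len(sentences) <= 1:
        return sentences

    # Cosine similarity between adjacent sentences (vectors are normalized)
    embeddings = model.encode(sentences, normalize_embeddings=True)
    chunks, current = [], [sentences[0]]
    for i in range(1, len(sentences)):
        similarity = float(np.dot(embeddings[i - 1], embeddings[i]))
        if similarity < similarity_threshold:
            # Topic shift detected: close the current chunk
            chunks.append(" ".join(current))
            current = []
        current.append(sentences[i])
    chunks.append(" ".join(current))
    return chunks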
Best Practices for Chunking
- Size Optimization: 512-1024 tokens works well for most use cases
- Overlap: Use 10-20% overlap to preserve context across chunks
- Metadata Enrichment: Include source, title, section, and page numbers
- Preserve Structure: Respect document hierarchy (headings, paragraphs)
from datetime import datetime

def create_chunk_with_metadata(text: str, source: str, page: int):
    return {
        "content": text,
        "metadata": {
            "source": source,
            "page": page,
            "chunk_size": len(text),
            "timestamp": datetime.now().isoformat()
        }
    }
Embedding Selection & Generation
Choosing the Right Embeddings
OpenAI text-embedding-3-large
- Dimensions: 3072 (configurable)
- Best for: General-purpose applications
- Cost: $0.13 per 1M tokens
sentence-transformers
- Open-source, self-hosted
- Best for: Cost-sensitive applications
- Models: all-MiniLM-L6-v2, all-mpnet-base-v2
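For the hosted option, the call is straightforward. A sketch assuming the openai>=1.0 Python SDK and an OPENAI_API_KEY in the environment; the input text is illustrative:

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

response = client.embeddings.create(
    model="text-embedding-3-large",
    input=["Password resets are handled in account settings."],
    dimensions=1024,  # optional: shrink below 3072 to trade a little quality for smaller vectors
)
vector = response.data[0].embedding
print(len(vector))  # 1024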
Domain-Specific Embeddings
Fine-tune for specialized domains:
from sentence_transformers import SentenceTransformer, InputExample, losses
from torch.utils.data import DataLoader

# Load base model
model = SentenceTransformer('all-mpnet-base-v2')

# Prepare training data (query/document pairs with similarity labels)
train_examples = [
    InputExample(texts=['query', 'relevant_doc'], label=1.0),
    # ... more examples
]

# Fine-tune with a cosine-similarity objective
train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=16)
train_loss = losses.CosineSimilarityLoss(model)
model.fit(train_objectives=[(train_dataloader, train_loss)], epochs=1)
Vector Store Optimization
Choosing a Vector Database
Pinecone
- Fully managed, serverless
- Excellent for production
- Built-in hybrid search
OpenSearch
- Self-hosted, cost-effective
- k-NN and hybrid search
- Good for large-scale deployments
FAISS
- Local, in-memory
- Best for development
- Extremely fast
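For local development, FAISS needs only a few lines. A sketch assuming faiss-cpu is installed and that the vectors come from your embedding model (random vectors stand in here):

import numpy as np
import faiss

dim = 384
# Pretend these came from your embedding model; FAISS expects float32
doc_vectors = np.random.rand(1000, dim).astype("float32")
faiss.normalize_L2(doc_vectors)  # normalize so inner product == cosine similarity

index = faiss.IndexFlatIP(dim)   # exact inner-product search
index.add(doc_vectors)

query = np.random.rand(1, dim).astype("float32")
faiss.normalize_L2(query)
scores, ids = index.search(query, 5)  # top-5 nearest chunks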
Indexing Strategies
import pinecone

pinecone.init(api_key="your-key", environment="us-west1-gcp")

# Connect to an existing index
index = pinecone.Index("documents")

# Batch upsert for efficiency
def batch_upsert(vectors, batch_size=100):
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i+batch_size]
        index.upsert(vectors=batch)
Advanced Retrieval Techniques
Hybrid Search
Combine semantic and keyword search:
def hybrid_search(query: str, k: int = 10, alpha: float = 0.5):
    # Semantic search
    semantic_results = vector_store.similarity_search(query, k=k)

    # Keyword search (BM25)
    keyword_results = bm25_search(query, k=k)

    # Combine with weights
    combined = reciprocal_rank_fusion(
        semantic_results,
        keyword_results,
        weights=[alpha, 1 - alpha]
    )
    return combined[:k]
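reciprocal_rank_fusion above is not a library function; a minimal version could look like the sketch below. The weights argument is an extension of the standard RRF formula, which scores each document as the sum of 1/(k + rank) across result lists with k around 60:

from typing import List

def reciprocal_rank_fusion(*result_lists, weights=None, k: int = 60) -> List:
    # Score each document by its rank in every list: weight / (k + rank)
    weights = weights or [1.0] * len(result_lists)
    scores = {}
    docs_by_key = {}
    for results, weight in zip(result_lists, weights):
        for rank, doc in enumerate(results):
            key = getattr(doc, "page_content", str(doc))
            docs_by_key[key] = doc
            scores[key] = scores.get(key, 0.0) + weight / (k + rank + 1)
    ranked_keys = sorted(scores, key=scores.get, reverse=True)
    return [docs_by_key[key] for key in ranked_keys]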
Re-ranking
Improve precision with a re-ranker:
from typing import List
from sentence_transformers import CrossEncoder

reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def rerank_results(query: str, documents: List[str], top_k: int = 3):
    # Score each (query, document) pair
    pairs = [[query, doc] for doc in documents]
    scores = reranker.predict(pairs)

    # Sort by score, highest first
    ranked = sorted(
        zip(documents, scores),
        key=lambda x: x[1],
        reverse=True
    )
    return [doc for doc, score in ranked[:top_k]]
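In practice you over-retrieve and then re-rank down to the final context set, along these lines (illustrative usage, reusing the vector_store from earlier):

# Over-retrieve, then keep only the top re-ranked passages
candidates = vector_store.similarity_search("How do I reset my password?", k=20)
top_passages = rerank_results(
    "How do I reset my password?",
    [doc.page_content for doc in candidates],
    top_k=3
)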
Query Expansion
Enhance queries for better retrieval:
from typing import List
from langchain.chat_models import ChatOpenAI

def expand_query(query: str) -> List[str]:
    llm = ChatOpenAI(model="gpt-3.5-turbo")
    prompt = f"""Generate 3 alternative phrasings of this query:
Query: {query}
Return only the alternative queries, one per line."""
    response = llm.invoke(prompt)
    alternatives = [q.strip() for q in response.content.split('\n') if q.strip()]
    return [query] + alternatives

# Use expanded queries
queries = expand_query("How do I reset my password?")
all_results = []
for q in queries:
    results = vector_store.similarity_search(q, k=3)
    all_results.extend(results)

# Deduplicate and rank
final_results = deduplicate(all_results)
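deduplicate above is a placeholder; a simple version that keys on chunk content might be:

from typing import List

def deduplicate(documents: List) -> List:
    # Keep the first occurrence of each chunk, keyed by its text content
    seen = set()
    unique_docs = []
    for doc in documents:
        key = doc.page_content
        if key not in seen:
            seen.add(key)
            unique_docs.append(doc)
    return unique_docs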
Context Management
Compression
Fit more context within token limits:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=vector_store.as_retriever()
)
compressed_docs = compression_retriever.get_relevant_documents(query)
Relevance Filtering
Remove low-relevance results:
from typing import List
from langchain.schema import Document

def filter_by_relevance(results: List[Document], threshold: float = 0.7):
    return [
        doc for doc in results
        if doc.metadata.get('score', 0) >= threshold
    ]
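Note that similarity_search typically doesn't attach a score to metadata. With LangChain vector stores you can fetch scores explicitly and stash them first; a sketch, since exact score semantics vary by store:

# Fetch scores explicitly, then attach them so filter_by_relevance can use them
results_with_scores = vector_store.similarity_search_with_relevance_scores(query, k=10)
docs = []
for doc, score in results_with_scores:
    doc.metadata['score'] = score  # relevance in [0, 1] for most stores
    docs.append(doc)

relevant_docs = filter_by_relevance(docs, threshold=0.7)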
Performance Optimization
Multi-Level Caching
import json
import hashlib

import redis
from functools import lru_cache

# Redis for distributed caching
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def cached_retrieval(query: str, k: int = 5):
    # Check cache (use a stable hash so keys survive process restarts)
    cache_key = f"query:{hashlib.sha256(query.encode()).hexdigest()}:k:{k}"
    cached = redis_client.get(cache_key)
    if cached:
        # Note: cache hits return plain dicts; deserialize to Documents if needed
        return json.loads(cached)

    # Retrieve and cache
    results = vector_store.similarity_search(query, k=k)
    redis_client.setex(
        cache_key,
        3600,  # 1 hour TTL
        json.dumps([doc.dict() for doc in results])
    )
    return results
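To make the caching genuinely multi-level, put a small in-process cache in front of Redis. A sketch using the lru_cache imported above; treat the returned list as read-only since it is shared across callers:

from functools import lru_cache

@lru_cache(maxsize=1024)
def cached_retrieval_local(query: str, k: int = 5):
    # Level 1: per-process LRU in front of Redis (level 2) and the vector store
    return cached_retrieval(query, k)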
Async Processing
import asyncio
from typing import List

async def async_embed_batch(texts: List[str]):
    # Embed all texts concurrently
    embeddings = await asyncio.gather(*[
        embed_text(text) for text in texts
    ])
    return embeddings

async def async_retrieve(query: str):
    # Parallel retrieval from multiple sources
    results = await asyncio.gather(
        retrieve_from_docs(query),
        retrieve_from_kb(query),
        retrieve_from_web(query)
    )
    return combine_results(results)
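embed_text above is assumed rather than defined; with the async OpenAI client it might look like this (the model name is illustrative):

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

async def embed_text(text: str):
    # One concurrent embedding call per text; for large batches, prefer
    # passing a list as `input` to cut down on request overhead
    response = await openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding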
Monitoring & Evaluation
Key Metrics
Track these metrics in production:
- Retrieval Accuracy: Are you finding relevant documents?
- Response Quality: Are generated answers accurate?
- Latency: End-to-end response time
- Cost: Embedding and LLM API costs
- User Satisfaction: Thumbs up/down feedback
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class RAGMetrics:
    query: str
    retrieval_time: float
    generation_time: float
    total_time: float
    num_chunks_retrieved: int
    cost: float
    user_feedback: Optional[str] = None

def track_rag_request(query: str):
    start = time.time()

    # Retrieval
    retrieval_start = time.time()
    chunks = retrieve(query)
    retrieval_time = time.time() - retrieval_start

    # Generation
    gen_start = time.time()
    response = generate(query, chunks)
    gen_time = time.time() - gen_start

    metrics = RAGMetrics(
        query=query,
        retrieval_time=retrieval_time,
        generation_time=gen_time,
        total_time=time.time() - start,
        num_chunks_retrieved=len(chunks),
        cost=calculate_cost(chunks, response)
    )
    log_metrics(metrics)
    return response
A/B Testing
Test different retrieval strategies:
import hashlib

def ab_test_retrieval(query: str, user_id: str):
    # Stable bucketing: the built-in hash() is salted per process,
    # so use a deterministic hash to keep each user in the same variant
    variant = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 2
    if variant == 0:
        # Control: standard retrieval
        return standard_retrieve(query)
    else:
        # Treatment: hybrid search
        return hybrid_retrieve(query)
Common Pitfalls & Solutions
Pitfall 1: Chunks Too Large
Problem: Poor retrieval precision.
Solution: Reduce chunk size to 512-768 tokens.
Pitfall 2: No Re-ranking
Problem: Irrelevant results in top positions.
Solution: Implement cross-encoder re-ranking.
Pitfall 3: Ignoring Metadata
Problem: Missing valuable context.
Solution: Enrich chunks with source, date, and author.
Pitfall 4: No Feedback Loop
Problem: Can't improve over time.
Solution: Collect user feedback and retrain.
Production Checklist
- [ ] Implement chunking with overlap
- [ ] Add metadata to all chunks
- [ ] Use hybrid search (semantic + keyword)
- [ ] Implement re-ranking
- [ ] Add relevance filtering
- [ ] Set up multi-level caching
- [ ] Monitor key metrics
- [ ] Collect user feedback
- [ ] Regular evaluation with test queries
- [ ] Cost monitoring and optimization
Conclusion
Building production RAG systems requires careful attention to chunking, retrieval, and context management. The difference between a demo and production system lies in the details: proper chunking, hybrid search, re-ranking, caching, and continuous evaluation.
Start simple, measure everything, and iterate based on real-world performance. Your users will tell you what works.
Next Steps:
- Implement basic RAG pipeline
- Add monitoring and metrics
- Optimize based on data
- Scale to production traffic
What's been your biggest challenge with RAG? Share in the comments!