RAG Pipelines: Best Practices from Production

10 min read
By Hari Prakash Pokala
RAG · Vector Databases · OpenSearch · Pinecone · AI · LLM

Retrieval-Augmented Generation (RAG) has become the cornerstone of building AI systems that need access to external knowledge. After deploying several production RAG systems that process millions of queries, I've distilled the battle-tested strategies that actually work.

Understanding RAG Architecture

RAG combines the power of large language models with external knowledge retrieval, enabling AI systems to provide accurate, up-to-date responses grounded in your data.

The RAG Pipeline

A production RAG system consists of several critical components; a minimal sketch tying them together follows the list:

  1. Document Ingestion: Processing and preparing source documents
  2. Chunking: Breaking documents into meaningful segments
  3. Embedding Generation: Converting text to vector representations
  4. Vector Storage: Storing embeddings for efficient retrieval
  5. Query Processing: Understanding user queries
  6. Retrieval: Finding relevant documents
  7. Context Assembly: Preparing retrieved content for LLM
  8. Generation: Creating final responses
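
To make this concrete, here is a minimal end-to-end sketch. Every helper name (load_documents, chunk, embed, build_index, llm) is a hypothetical placeholder for the components discussed in the rest of this post:

# Minimal RAG pipeline sketch; all helpers are hypothetical placeholders
def answer(query: str) -> str:
    docs = load_documents("docs/")                   # 1. document ingestion
    chunks = chunk(docs)                             # 2. chunking
    vectors = [embed(c) for c in chunks]             # 3. embedding generation
    store = build_index(vectors, chunks)             # 4. vector storage
    query_vector = embed(query)                      # 5. query processing
    hits = store.search(query_vector, k=5)           # 6. retrieval
    context = "\n\n".join(h.text for h in hits)      # 7. context assembly
    return llm(f"Context:\n{context}\n\nQuestion: {query}")  # 8. generation

In practice, steps 1-4 run offline at indexing time, while steps 5-8 run on every query.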

Document Processing & Chunking

Chunking Strategies

The way you chunk documents significantly impacts retrieval quality:

Fixed-Size Chunking

def fixed_size_chunk(text: str, chunk_size: int = 512, overlap: int = 50):
    """Split text into fixed-size chunks (measured in characters, not tokens)."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start = end - overlap  # step back so consecutive chunks share context
    return chunks

Semantic Chunking

In practice, LangChain's RecursiveCharacterTextSplitter is a good approximation: it splits on a hierarchy of separators (paragraphs, then sentences, then words), so related text tends to stay together:

from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ". ", " ", ""]
)

chunks = splitter.split_text(document)

Best Practices for Chunking

  1. Size Optimization: 512-1024 tokens works well for most use cases
  2. Overlap: Use 10-20% overlap to preserve context across chunks
  3. Metadata Enrichment: Include source, title, section, and page numbers
  4. Preserve Structure: Respect document hierarchy (headings, paragraphs)

from datetime import datetime

def create_chunk_with_metadata(text: str, source: str, page: int):
    return {
        "content": text,
        "metadata": {
            "source": source,
            "page": page,
            "chunk_size": len(text),
            "timestamp": datetime.now().isoformat()
        }
    }

Embedding Selection & Generation

Choosing the Right Embeddings

OpenAI text-embedding-3-large

  • Dimensions: 3072 (configurable)
  • Best for: General-purpose applications
  • Cost: $0.13 per 1M tokens

sentence-transformers

  • Open-source, self-hosted
  • Best for: Cost-sensitive applications
  • Models: all-MiniLM-L6-v2, all-mpnet-base-v2
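
A minimal usage sketch with one of the models listed above:

from sentence_transformers import SentenceTransformer

# Small, fast general-purpose model; swap in all-mpnet-base-v2 for higher quality
model = SentenceTransformer('all-MiniLM-L6-v2')
embeddings = model.encode(["How do I reset my password?"], normalize_embeddings=True)
print(embeddings.shape)  # (1, 384) for all-MiniLM-L6-v2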

Domain-Specific Embeddings

Fine-tune for specialized domains:

from sentence_transformers import SentenceTransformer, InputExample, losses
from torch.utils.data import DataLoader

# Load base model
model = SentenceTransformer('all-mpnet-base-v2')

# Prepare training data: (query, relevant_doc) pairs with similarity labels
train_examples = [
    InputExample(texts=['query', 'relevant_doc'], label=1.0),
    # ... more examples
]

# Fine-tune with a cosine-similarity objective
train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=16)
train_loss = losses.CosineSimilarityLoss(model)
model.fit(train_objectives=[(train_dataloader, train_loss)], epochs=1)

Vector Store Optimization

Choosing a Vector Database

Pinecone

  • Fully managed, serverless
  • Excellent for production
  • Built-in hybrid search

OpenSearch

  • Self-hosted, cost-effective
  • k-NN and hybrid search
  • Good for large-scale deployments

FAISS

  • Local, in-memory
  • Best for development
  • Extremely fast
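
Since FAISS is the easiest to try locally, here is a minimal sketch (the dimension and the random vectors are stand-ins for your real embeddings):

import faiss
import numpy as np

dim = 384  # must match your embedding model's output dimension
index = faiss.IndexFlatIP(dim)  # exact inner-product search over normalized vectors

vectors = np.random.rand(1000, dim).astype('float32')  # stand-in for real embeddings
faiss.normalize_L2(vectors)
index.add(vectors)

query = vectors[:1]  # stand-in for a query embedding
scores, ids = index.search(query, 5)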

Indexing Strategies

from pinecone import Pinecone

# Modern Pinecone client (the older pinecone.init(...) pattern is deprecated)
pc = Pinecone(api_key="your-key")

# Connect to an existing index that supports metadata filtering
index = pc.Index("documents")

# Batch upsert for efficiency
def batch_upsert(vectors, batch_size=100):
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i+batch_size]
        index.upsert(vectors=batch)
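
Querying with a metadata filter (query_embedding and the source field are illustrative):

# Query with a metadata filter; 'source' is an illustrative metadata field
results = index.query(
    vector=query_embedding,
    top_k=5,
    filter={"source": {"$eq": "user-manual.pdf"}},
    include_metadata=True,
)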

Advanced Retrieval Techniques

Hybrid Search

Combine semantic and keyword search:

def hybrid_search(query: str, k: int = 10, alpha: float = 0.5):
    # Semantic search
    semantic_results = vector_store.similarity_search(query, k=k)

    # Keyword search (BM25)
    keyword_results = bm25_search(query, k=k)

    # Combine with weights: alpha favors semantic, (1 - alpha) favors keyword
    combined = reciprocal_rank_fusion(
        semantic_results,
        keyword_results,
        weights=[alpha, 1-alpha]
    )

    return combined[:k]
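
The fusion helper above isn't shown; here is a minimal weighted reciprocal rank fusion sketch, assuming LangChain-style Document objects that expose page_content:

def reciprocal_rank_fusion(*result_lists, weights=None, c=60):
    # Score each document by its weighted reciprocal rank across all lists
    weights = weights or [1.0] * len(result_lists)
    scores, docs = {}, {}
    for results, weight in zip(result_lists, weights):
        for rank, doc in enumerate(results):
            key = doc.page_content  # assumes LangChain-style Documents
            docs[key] = doc
            scores[key] = scores.get(key, 0.0) + weight / (c + rank + 1)
    ranked = sorted(scores, key=scores.get, reverse=True)
    return [docs[key] for key in ranked]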

Re-ranking

Improve precision with a re-ranker:

from typing import List

from sentence_transformers import CrossEncoder

reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def rerank_results(query: str, documents: List[str], top_k: int = 3):
    # Score each document
    pairs = [[query, doc] for doc in documents]
    scores = reranker.predict(pairs)

    # Sort by score
    ranked = sorted(
        zip(documents, scores),
        key=lambda x: x[1],
        reverse=True
    )

    return [doc for doc, score in ranked[:top_k]]

Query Expansion

Enhance queries for better retrieval:

from typing import List

from langchain.chat_models import ChatOpenAI

def expand_query(query: str) -> List[str]:
    llm = ChatOpenAI(model="gpt-3.5-turbo")

    prompt = f"""Generate 3 alternative phrasings of this query:
    Query: {query}

    Return only the alternative queries, one per line."""

    response = llm.invoke(prompt)
    # Drop empty lines the LLM may emit between alternatives
    alternatives = [q.strip() for q in response.content.split('\n') if q.strip()]

    return [query] + alternatives

# Use expanded queries
queries = expand_query("How do I reset my password?")
all_results = []
for q in queries:
    results = vector_store.similarity_search(q, k=3)
    all_results.extend(results)

# Deduplicate and rank
final_results = deduplicate(all_results)
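
The deduplicate helper above isn't shown; a minimal version that drops exact-content duplicates while preserving rank order (again assuming LangChain-style Documents):

def deduplicate(documents):
    # Keep the first occurrence of each unique chunk, preserving order
    seen, unique = set(), []
    for doc in documents:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            unique.append(doc)
    return unique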

Context Management

Compression

Fit more context within token limits:

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

# llm is any LangChain chat model, e.g. ChatOpenAI(model="gpt-3.5-turbo")
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=vector_store.as_retriever()
)

compressed_docs = compression_retriever.get_relevant_documents(query)

Relevance Filtering

Remove low-relevance results:

from typing import List

from langchain.schema import Document

def filter_by_relevance(results: List[Document], threshold: float = 0.7):
    return [
        doc for doc in results
        if doc.metadata.get('score', 0) >= threshold
    ]
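
Most vector stores don't write scores into metadata by default. One way to attach them, using LangChain's similarity_search_with_score (note that some stores return distance, where lower is better, rather than similarity, so check your store's convention before thresholding):

def retrieve_with_scores(query: str, k: int = 10):
    # similarity_search_with_score returns (Document, score) pairs
    results = vector_store.similarity_search_with_score(query, k=k)
    for doc, score in results:
        doc.metadata['score'] = score
    return [doc for doc, _ in results]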

Performance Optimization

Multi-Level Caching

import hashlib
import json

import redis
from functools import lru_cache

# Redis for distributed (L2) caching
redis_client = redis.Redis(host='localhost', port=6379, db=0)

@lru_cache(maxsize=1024)  # in-process (L1) cache in front of Redis
def cached_retrieval(query: str, k: int = 5):
    # Stable key: Python's built-in hash() is randomized per process
    query_digest = hashlib.sha256(query.encode()).hexdigest()
    cache_key = f"query:{query_digest}:k:{k}"
    cached = redis_client.get(cache_key)

    if cached:
        return json.loads(cached)

    # Retrieve and cache
    results = vector_store.similarity_search(query, k=k)
    redis_client.setex(
        cache_key,
        3600,  # 1 hour TTL
        json.dumps([doc.dict() for doc in results])
    )

    return results

Async Processing

import asyncio
from typing import List

async def async_embed_batch(texts: List[str]):
    # embed_text: assumed async helper wrapping your embedding API
    embeddings = await asyncio.gather(*[
        embed_text(text) for text in texts
    ])
    return embeddings

async def async_retrieve(query: str):
    # Parallel retrieval from multiple sources (assumed async helpers)
    results = await asyncio.gather(
        retrieve_from_docs(query),
        retrieve_from_kb(query),
        retrieve_from_web(query)
    )

    return combine_results(results)

Monitoring & Evaluation

Key Metrics

Track these metrics in production:

  1. Retrieval Accuracy: Are you finding relevant documents?
  2. Response Quality: Are generated answers accurate?
  3. Latency: End-to-end response time
  4. Cost: Embedding and LLM API costs
  5. User Satisfaction: Thumbs up/down feedback

from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class RAGMetrics:
    query: str
    retrieval_time: float
    generation_time: float
    total_time: float
    num_chunks_retrieved: int
    cost: float
    user_feedback: Optional[str] = None

def track_rag_request(query: str):
    # retrieve, generate, calculate_cost, log_metrics: your pipeline's own helpers
    start = time.time()

    # Retrieval
    retrieval_start = time.time()
    chunks = retrieve(query)
    retrieval_time = time.time() - retrieval_start

    # Generation
    gen_start = time.time()
    response = generate(query, chunks)
    gen_time = time.time() - gen_start

    metrics = RAGMetrics(
        query=query,
        retrieval_time=retrieval_time,
        generation_time=gen_time,
        total_time=time.time() - start,
        num_chunks_retrieved=len(chunks),
        cost=calculate_cost(chunks, response)
    )

    log_metrics(metrics)
    return response

A/B Testing

Test different retrieval strategies:

import zlib

def ab_test_retrieval(query: str, user_id: str):
    # zlib.crc32 gives a stable assignment; built-in hash() is randomized per process
    variant = zlib.crc32(user_id.encode()) % 2

    if variant == 0:
        # Control: Standard retrieval
        return standard_retrieve(query)
    else:
        # Treatment: Hybrid search
        return hybrid_retrieve(query)

Common Pitfalls & Solutions

Pitfall 1: Chunks Too Large

Problem: Poor retrieval precision
Solution: Reduce chunk size to 512-768 tokens

Pitfall 2: No Re-ranking

Problem: Irrelevant results in top positions
Solution: Implement cross-encoder re-ranking

Pitfall 3: Ignoring Metadata

Problem: Missing valuable context
Solution: Enrich chunks with source, date, and author

Pitfall 4: No Feedback Loop

Problem: Can't improve over time
Solution: Collect user feedback and retrain

Production Checklist

  • [ ] Implement chunking with overlap
  • [ ] Add metadata to all chunks
  • [ ] Use hybrid search (semantic + keyword)
  • [ ] Implement re-ranking
  • [ ] Add relevance filtering
  • [ ] Set up multi-level caching
  • [ ] Monitor key metrics
  • [ ] Collect user feedback
  • [ ] Regular evaluation with test queries
  • [ ] Cost monitoring and optimization

Conclusion

Building production RAG systems requires careful attention to chunking, retrieval, and context management. The difference between a demo and a production system lies in the details: proper chunking, hybrid search, re-ranking, caching, and continuous evaluation.

Start simple, measure everything, and iterate based on real-world performance. Your users will tell you what works.

Next Steps:

  1. Implement basic RAG pipeline
  2. Add monitoring and metrics
  3. Optimize based on data
  4. Scale to production traffic

What's been your biggest challenge with RAG? Share in the comments!