Every RAG system I've built for clients this year started the same way: "We have a prototype, but it doesn't work well in production." The pipeline is always the same — user query, embed, vector search, top-K, LLM. It works in demos with 50 documents. Then you load 10,000 and everything breaks.
The problem isn't that RAG doesn't work. The problem is that the naive architecture hides three fundamental issues that only appear at scale.
The naive RAG pipeline and why it breaks
The standard RAG flow looks deceptively simple: take the user query, generate an embedding, search for the K most similar vectors, concatenate those chunks, and pass them to the LLM. In a notebook with a curated dataset, this gets impressive results. But production is a different story.
The first problem is that cosine similarity between embeddings does not equal actual relevance. The second problem is that top-K becomes increasingly noisy as your corpus grows. The third problem is that fixed-size chunks break context at sentence and section boundaries.
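To make the failure mode concrete, the entire naive architecture fits in a dozen lines. Here is a toy sketch — bag-of-words counts stand in for a real embedding model, and `naive_rag_retrieve` is a hypothetical name, not a library function:

```python
import math
from collections import Counter

def embed(text: str) -> Counter:
    # Toy embedding: bag-of-words counts (a real system would call a model)
    return Counter(text.lower().split())

def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    na = math.sqrt(sum(v * v for v in a.values()))
    nb = math.sqrt(sum(v * v for v in b.values()))
    return dot / (na * nb) if na and nb else 0.0

def naive_rag_retrieve(query: str, corpus: list[str], k: int = 2) -> list[str]:
    # The whole "architecture": embed, rank by similarity, take top-K
    q = embed(query)
    ranked = sorted(corpus, key=lambda d: cosine(q, embed(d)), reverse=True)
    return ranked[:k]

docs = [
    "Reset your password from the account settings page.",
    "Error E1234 means the API key has expired.",
    "Our office is closed on public holidays.",
]
```

This works fine on three documents. Every issue below comes from scaling exactly this loop to thousands.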
I will walk through the three-layer architecture we use at Cloudstudio to solve each of these problems, with the actual code we run in production.
Layer 1: Smart chunking
Instead of cutting every N tokens, we use recursive chunking that respects the document structure — headings, paragraphs, code blocks, and tables are treated as atomic units. Each chunk carries metadata from the parent: source, date, section hierarchy, and its position in the document.
We add a 10-15% overlap window between consecutive chunks so that context at boundaries is never lost.
Here is the recursive chunker we use. It tries to split at the most meaningful boundary first (headings), then falls back to paragraphs, sentences, and finally hard token limits:
```python
import re
from dataclasses import dataclass, field


@dataclass
class Chunk:
    text: str
    metadata: dict = field(default_factory=dict)
    token_count: int = 0
    chunk_id: str = ""
    parent_id: str | None = None


class RecursiveChunker:
    """Split documents respecting structural boundaries."""

    def __init__(
        self,
        max_tokens: int = 512,
        overlap_tokens: int = 64,
        tokenizer=None,
    ):
        self.max_tokens = max_tokens
        self.overlap_tokens = overlap_tokens
        self.tokenizer = tokenizer or self._simple_tokenizer
        # Separators ordered by priority — try the most meaningful split first
        self.separators = [
            r'(?=\n#{1,3}\s)',   # Markdown headings (lookahead keeps the heading text in the chunk)
            r'\n\n',             # Double newline (paragraph break)
            r'\n',               # Single newline
            r'(?<=[.!?])\s+',    # Sentence boundary
            r'\s+',              # Word boundary (last resort)
        ]

    def chunk_document(self, text: str, source_metadata: dict) -> list[Chunk]:
        """Chunk a document with overlap and metadata propagation."""
        raw_sections = self._recursive_split(text, separator_idx=0)
        chunks = []
        doc_id = source_metadata.get("document_id", "unknown")
        for i, section_text in enumerate(raw_sections):
            chunk = Chunk(
                text=section_text.strip(),
                token_count=len(self.tokenizer(section_text)),
                chunk_id=f"{doc_id}_chunk_{i:04d}",
                metadata={
                    **source_metadata,
                    "chunk_index": i,
                    "total_chunks": len(raw_sections),
                    "heading_hierarchy": self._extract_headings(section_text),
                },
            )
            chunks.append(chunk)
        # Add overlap between consecutive chunks
        chunks = self._add_overlap(chunks)
        return chunks

    def _recursive_split(self, text: str, separator_idx: int) -> list[str]:
        """Try to split with the current separator; if chunks are too big, recurse with the next one."""
        if separator_idx >= len(self.separators):
            # Last resort: hard cut at token limit
            return self._hard_split(text)
        pattern = self.separators[separator_idx]
        parts = re.split(pattern, text)
        result = []
        current = ""
        for part in parts:
            if len(self.tokenizer(current + part)) <= self.max_tokens:
                current += part
            else:
                if current:
                    result.append(current)
                    current = ""  # Reset so the flushed text is not appended twice
                # If this single part is too large, split it with the next separator
                if len(self.tokenizer(part)) > self.max_tokens:
                    result.extend(self._recursive_split(part, separator_idx + 1))
                else:
                    current = part
        if current:
            result.append(current)
        return result

    def _add_overlap(self, chunks: list[Chunk]) -> list[Chunk]:
        """Add overlap text from the previous chunk to maintain context at boundaries."""
        for i in range(1, len(chunks)):
            prev_tokens = self.tokenizer(chunks[i - 1].text)
            overlap_text = self._detokenize(prev_tokens[-self.overlap_tokens:])
            chunks[i].text = overlap_text + "\n" + chunks[i].text
            chunks[i].token_count = len(self.tokenizer(chunks[i].text))
            chunks[i].metadata["has_overlap"] = True
        return chunks

    def _extract_headings(self, text: str) -> list[str]:
        return re.findall(r'^#{1,3}\s+(.+)$', text, re.MULTILINE)

    def _hard_split(self, text: str) -> list[str]:
        tokens = self.tokenizer(text)
        step = self.max_tokens - self.overlap_tokens
        return [
            self._detokenize(tokens[i:i + self.max_tokens])
            for i in range(0, len(tokens), step)
        ]

    @staticmethod
    def _simple_tokenizer(text: str) -> list[str]:
        return text.split()

    @staticmethod
    def _detokenize(tokens: list[str]) -> str:
        return " ".join(tokens)
```
The key insight is the separator priority. A 512-token chunk that ends at a heading boundary is dramatically more useful than one that cuts mid-sentence. We have measured this: chunking at structural boundaries improves retrieval precision by 15-25% compared to fixed-size splitting, with zero additional cost.
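A stripped-down illustration of the priority idea — this is a hypothetical `split_by_priority` helper, not the production class: try the most meaningful separator first, and only fall through to finer-grained ones when a part is still too large.

```python
import re

def split_by_priority(text: str, max_words: int = 20) -> list[str]:
    # Headings first, then paragraphs, then sentences; return the first
    # granularity at which every part fits the budget
    for pattern in (r'(?=\n## )', r'\n\n', r'(?<=[.!?])\s+'):
        parts = [p.strip() for p in re.split(pattern, text) if p.strip()]
        if all(len(p.split()) <= max_words for p in parts):
            return parts
    return text.split()  # last resort: individual words

doc = (
    "## Install\n\nRun pip install. Then verify the version.\n"
    "\n## Configure\n\nSet the API key in your environment."
)
chunks = split_by_priority(doc)
```

Because the heading-level split already fits the budget here, every chunk is a complete section rather than an arbitrary window of text.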
Embedding generation
Once you have clean chunks, you need to embed them. We use a batch pipeline that handles rate limits, retries, and metadata propagation:
```python
import time

from openai import OpenAI


class EmbeddingPipeline:
    """Generate embeddings with batching and rate-limit handling."""

    def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 100):
        self.client = OpenAI()
        self.model = model
        self.batch_size = batch_size
        self.dimensions = 1536  # For text-embedding-3-small

    def embed_chunks(self, chunks: list[Chunk]) -> list[dict]:
        """Embed all chunks, returning vectors with metadata."""
        results = []
        for i in range(0, len(chunks), self.batch_size):
            batch = chunks[i:i + self.batch_size]
            texts = [chunk.text for chunk in batch]
            embeddings = self._embed_with_retry(texts)
            for chunk, embedding in zip(batch, embeddings):
                results.append({
                    "id": chunk.chunk_id,
                    "values": embedding,
                    "metadata": {
                        **chunk.metadata,
                        "text": chunk.text,
                        "token_count": chunk.token_count,
                    },
                })
        return results

    def _embed_with_retry(self, texts: list[str], max_retries: int = 3) -> list[list[float]]:
        for attempt in range(max_retries):
            try:
                response = self.client.embeddings.create(
                    model=self.model,
                    input=texts,
                    dimensions=self.dimensions,
                )
                return [item.embedding for item in response.data]
            except Exception as e:
                if "rate_limit" in str(e).lower() and attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
                raise

    def embed_query(self, query: str) -> list[float]:
        """Embed a single query for search."""
        response = self.client.embeddings.create(
            model=self.model,
            input=query,
            dimensions=self.dimensions,
        )
        return response.data[0].embedding
```
Two important choices here: we use text-embedding-3-small instead of the large variant because the quality difference is marginal for most retrieval tasks while the cost is about 5x lower. And we pass dimensions explicitly: text-embedding-3 vectors can be truncated to a shorter prefix (and re-normalized) later if storage becomes a constraint, without re-embedding the entire corpus.
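The shrink-later trick is just prefix truncation plus re-normalization. A sketch with a hypothetical `shorten_embedding` helper — OpenAI documents that text-embedding-3 vectors stay usable when shortened, but you should verify retrieval quality at the reduced size before committing:

```python
import math

def shorten_embedding(vec: list[float], target_dim: int) -> list[float]:
    # Keep the vector prefix, then re-normalize so cosine similarity still behaves
    prefix = vec[:target_dim]
    norm = math.sqrt(sum(x * x for x in prefix))
    return [x / norm for x in prefix] if norm > 0 else prefix

vec = [0.5] * 1536  # Stand-in for a real 1536-dim embedding
short = shorten_embedding(vec, 512)
```

The re-normalization step matters: without it, truncated vectors have smaller norms and similarity scores drift.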
Layer 2: Hybrid retrieval
Pure vector search captures semantic similarity, but misses exact terms that matter — product names, error codes, internal IDs. BM25 keyword search catches those exact matches that vectors miss. Using both in parallel gives you the best of both worlds.
But the real magic is in cross-encoder reranking. You take the combined candidates from both searches and score each one with a model that evaluates query-document pairs for actual relevance, not just vector proximity.
Here is our hybrid retrieval implementation. We use Qdrant for vectors and a simple BM25 implementation, then fuse the results:
```python
import math
from collections import Counter

from qdrant_client import QdrantClient


class BM25Index:
    """Lightweight BM25 implementation for keyword search."""

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1 = k1
        self.b = b
        self.doc_freqs: dict[str, int] = {}
        self.doc_lengths: dict[str, int] = {}
        self.avg_doc_length: float = 0
        self.corpus_size: int = 0
        self.index: dict[str, dict[str, int]] = {}  # term -> {doc_id: freq}
        self.documents: dict[str, str] = {}

    def add_documents(self, docs: list[dict]):
        for doc in docs:
            doc_id = doc["id"]
            tokens = doc["text"].lower().split()
            self.documents[doc_id] = doc["text"]
            self.doc_lengths[doc_id] = len(tokens)
            for term, freq in Counter(tokens).items():
                self.index.setdefault(term, {})[doc_id] = freq
                self.doc_freqs[term] = len(self.index[term])
        self.corpus_size = len(self.documents)
        self.avg_doc_length = sum(self.doc_lengths.values()) / max(self.corpus_size, 1)

    def search(self, query: str, top_k: int = 20) -> list[tuple[str, float]]:
        query_terms = query.lower().split()
        scores: dict[str, float] = {}
        for term in query_terms:
            if term not in self.index:
                continue
            df = self.doc_freqs[term]
            idf = math.log((self.corpus_size - df + 0.5) / (df + 0.5) + 1)
            for doc_id, tf in self.index[term].items():
                dl = self.doc_lengths[doc_id]
                numerator = tf * (self.k1 + 1)
                denominator = tf + self.k1 * (1 - self.b + self.b * dl / self.avg_doc_length)
                scores[doc_id] = scores.get(doc_id, 0) + idf * numerator / denominator
        ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        return ranked[:top_k]


class HybridRetriever:
    """Combine vector search and BM25 with reciprocal rank fusion."""

    def __init__(self, qdrant_url: str, collection_name: str):
        self.vector_client = QdrantClient(url=qdrant_url)
        self.collection = collection_name
        self.bm25 = BM25Index()
        self.embedding_pipeline = EmbeddingPipeline()

    def search(self, query: str, top_k: int = 10, vector_weight: float = 0.6) -> list[dict]:
        """Run hybrid search with reciprocal rank fusion."""
        # Run both searches in parallel (simplified here as sequential)
        vector_results = self._vector_search(query, top_k=top_k * 2)
        bm25_results = self.bm25.search(query, top_k=top_k * 2)
        # Reciprocal rank fusion
        fused_scores: dict[str, float] = {}
        k = 60  # RRF constant
        for rank, (doc_id, _score) in enumerate(vector_results):
            fused_scores[doc_id] = fused_scores.get(doc_id, 0) + vector_weight / (k + rank + 1)
        for rank, (doc_id, _score) in enumerate(bm25_results):
            fused_scores[doc_id] = fused_scores.get(doc_id, 0) + (1 - vector_weight) / (k + rank + 1)
        # Sort by fused score; attach the chunk text (from the BM25 doc store)
        # so the re-ranker downstream can score query-document pairs
        ranked = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
        return [
            {"id": doc_id, "score": score, "text": self.bm25.documents.get(doc_id, "")}
            for doc_id, score in ranked
        ]

    def _vector_search(self, query: str, top_k: int) -> list[tuple[str, float]]:
        query_embedding = self.embedding_pipeline.embed_query(query)
        results = self.vector_client.search(
            collection_name=self.collection,
            query_vector=query_embedding,
            limit=top_k,
        )
        return [(hit.id, hit.score) for hit in results]
```
We use reciprocal rank fusion (RRF) instead of score normalization because RRF is more robust — vector similarity scores and BM25 scores live on completely different scales, and normalizing them introduces artifacts. RRF only cares about rank position, which makes the fusion stable across different queries and corpus sizes.
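A worked example makes the rank-only behavior visible. This standalone `rrf` helper (hypothetical name) is the same formula the retriever uses, extracted so you can inspect the scores:

```python
def rrf(rankings: list[list[str]], k: int = 60, weights=None) -> dict[str, float]:
    # Reciprocal rank fusion: each result list contributes weight / (k + rank),
    # so only positions matter, never the raw similarity scores
    weights = weights or [1.0] * len(rankings)
    scores: dict[str, float] = {}
    for ranking, w in zip(rankings, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + w / (k + rank)
    return scores

vector_hits = ["doc_a", "doc_b", "doc_c"]
bm25_hits = ["doc_c", "doc_a", "doc_d"]
fused = rrf([vector_hits, bm25_hits], weights=[0.6, 0.4])
best = max(fused, key=fused.get)
```

Note how doc_a, ranked highly by both searches, beats doc_c even though doc_c tops the BM25 list: agreement between retrievers is rewarded without ever comparing their incompatible score scales.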
The vector_weight parameter defaults to 0.6, meaning we lean slightly toward semantic search. For technical documentation with lots of specific terms (error codes, API endpoints), we drop it to 0.5 or even 0.4, giving BM25 half or more of the weight. For conversational content (support articles, FAQs), we keep semantic search dominant.
Re-ranking for precision
The combined results from hybrid search are good, but re-ranking with a cross-encoder model pushes precision significantly higher. A cross-encoder scores each query-document pair directly instead of comparing pre-computed embeddings:
```python
from sentence_transformers import CrossEncoder


class Reranker:
    """Re-rank retrieved chunks using a cross-encoder for maximum relevance."""

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, documents: list[dict], top_k: int = 5) -> list[dict]:
        """Score each document against the query and return the top_k most relevant."""
        if not documents:
            return []
        # Build query-document pairs for the cross-encoder
        pairs = [(query, doc["text"]) for doc in documents]
        scores = self.model.predict(pairs)
        # Attach scores and sort
        for doc, score in zip(documents, scores):
            doc["rerank_score"] = float(score)
        ranked = sorted(documents, key=lambda d: d["rerank_score"], reverse=True)
        return ranked[:top_k]
```
Re-ranking typically takes 50-200ms for 20 candidates, which is acceptable for most applications. The precision improvement is substantial — we consistently measure a 20-30% improvement in top-5 relevance after re-ranking. The cross-encoder is small (a MiniLM variant with tens of millions of parameters) and can run on CPU, so it adds minimal infrastructure cost.
For latency-sensitive applications, you can skip re-ranking and rely on the hybrid fusion scores. But for anything where answer quality matters more than response time (internal knowledge bases, document Q&A, compliance search), re-ranking is worth the extra 100ms.
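The resulting two-mode pipeline can be sketched as a small function with injected stages. The stubs below are placeholders for illustration; the real stages are HybridRetriever.search, Reranker.rerank, the context assembler from the next section, and an LLM call:

```python
def answer_pipeline(query, retrieve, rerank, assemble, generate,
                    candidates=20, keep=5, use_reranker=True):
    # Retrieve a wide candidate set, then either re-rank down to `keep`
    # (quality mode) or just truncate (latency mode)
    docs = retrieve(query, candidates)
    docs = rerank(query, docs, keep) if use_reranker else docs[:keep]
    return generate(assemble(query, docs))

# Stub stages for illustration only
retrieve = lambda q, n: [{"id": i, "text": f"doc {i}"} for i in range(n)]
rerank = lambda q, docs, k: list(reversed(docs))[:k]
assemble = lambda q, docs: f"{q} | {len(docs)} chunks"
generate = lambda prompt: f"answer({prompt})"

fast = answer_pipeline("q", retrieve, rerank, assemble, generate, use_reranker=False)
```

Keeping the re-ranker behind a flag like this makes it easy to run both modes against the same evaluation set and decide per use case whether the extra 100ms buys enough precision.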
Layer 3: Context assembly
Having good chunks is not enough. The way you assemble them into the final prompt determines whether the LLM gives a coherent answer or a disjointed summary.
Parent-child expansion is key: when you retrieve a chunk, pull its parent section for full context. Source deduplication prevents sending overlapping content. Token budget management ensures you fit maximum relevant context within the model's window. And citation tracking maps every claim back to its source chunk.
Here is the context assembler we use:
```python
import tiktoken


class ContextAssembler:
    """Assemble retrieved chunks into a prompt with citations and token budgets."""

    def __init__(self, max_context_tokens: int = 12000):
        self.max_tokens = max_context_tokens
        self.encoder = tiktoken.encoding_for_model("gpt-4")  # Token counting

    def assemble(self, query: str, ranked_chunks: list[dict], chunk_store: dict) -> dict:
        """Build the context block with citation tracking."""
        context_parts = []
        citations = []
        used_tokens = 0
        seen_texts = set()  # For deduplication
        for chunk in ranked_chunks:
            # Parent expansion: if the chunk has a parent, include it for context
            expanded_text = self._expand_with_parent(chunk, chunk_store)
            # Deduplication: skip if we have seen substantially similar content
            text_fingerprint = self._fingerprint(expanded_text)
            if text_fingerprint in seen_texts:
                continue
            seen_texts.add(text_fingerprint)
            # Token budget check
            chunk_tokens = len(self.encoder.encode(expanded_text))
            if used_tokens + chunk_tokens > self.max_tokens:
                # Try to fit a truncated version
                remaining = self.max_tokens - used_tokens
                if remaining > 100:  # Only include if meaningful
                    expanded_text = self._truncate_to_tokens(expanded_text, remaining)
                    chunk_tokens = remaining
                else:
                    break
            # Add with citation marker
            citation_id = f"[{len(citations) + 1}]"
            source = chunk.get("metadata", {}).get("source", "Unknown")
            section = chunk.get("metadata", {}).get("heading_hierarchy", [])
            section_str = " > ".join(section) if section else "N/A"
            context_parts.append(
                f"--- Source {citation_id}: {source} | Section: {section_str} ---\n"
                f"{expanded_text}\n"
            )
            citations.append({
                "id": citation_id,
                "source": source,
                "section": section_str,
                "chunk_id": chunk.get("id"),
                "relevance_score": chunk.get("rerank_score", chunk.get("score", 0)),
            })
            used_tokens += chunk_tokens
        context_block = "\n".join(context_parts)
        return {
            "context": context_block,
            "citations": citations,
            "tokens_used": used_tokens,
            "chunks_included": len(citations),
            "prompt": self._build_prompt(query, context_block, citations),
        }

    def _build_prompt(self, query: str, context: str, citations: list) -> str:
        citation_legend = "\n".join(
            f"  {c['id']} = {c['source']} ({c['section']})" for c in citations
        )
        return (
            f"Answer the following question using ONLY the provided sources. "
            f"Cite your sources using the bracket notation (e.g., [1], [2]).\n"
            f"If the sources do not contain enough information, say so explicitly.\n\n"
            f"Sources:\n{context}\n\n"
            f"Citation legend:\n{citation_legend}\n\n"
            f"Question: {query}\n\n"
            f"Answer:"
        )

    def _expand_with_parent(self, chunk: dict, chunk_store: dict) -> str:
        """If the chunk has a parent, prepend the parent's heading for context."""
        parent_id = chunk.get("metadata", {}).get("parent_id")
        if parent_id and parent_id in chunk_store:
            parent = chunk_store[parent_id]
            headings = parent.get("metadata", {}).get("heading_hierarchy", [])
            if headings:
                return f"## {headings[-1]}\n\n{chunk['text']}"
        return chunk["text"]

    def _fingerprint(self, text: str) -> str:
        """Create a rough fingerprint for deduplication."""
        words = text.lower().split()[:50]
        return " ".join(sorted(set(words)))

    def _truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        tokens = self.encoder.encode(text)[:max_tokens]
        return self.encoder.decode(tokens)
```
The citation tracking is essential for production RAG. Users need to verify answers, and your system needs an audit trail. Every claim in the LLM response can be traced back to a specific chunk, which can be traced back to a specific document and section. Without citations, RAG is a black box that nobody trusts.
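Closing the audit loop means parsing the bracket markers out of the model's answer and resolving them against the assembler's citation list. A minimal sketch with a hypothetical `trace_citations` helper:

```python
import re

def trace_citations(answer: str, citations: list[dict]) -> list[dict]:
    # Map bracket markers like [1] in the LLM answer back to the citation
    # records produced by the assembler
    by_id = {c["id"]: c for c in citations}
    cited_ids = set(re.findall(r'\[\d+\]', answer))
    ordered = sorted(cited_ids, key=lambda s: int(s[1:-1]))
    return [by_id[cid] for cid in ordered if cid in by_id]

citations = [
    {"id": "[1]", "source": "runbook.md", "section": "Rollbacks"},
    {"id": "[2]", "source": "faq.md", "section": "Billing"},
]
used = trace_citations("Roll back with the blue-green switch [1].", citations)
```

The same pass can flag answers that cite nothing at all, which in our experience is a strong hallucination signal worth logging.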
Evaluation: measuring retrieval quality
You cannot improve what you do not measure. We maintain an evaluation set of query-expected_document pairs and run automated metrics after every change to the pipeline:
```python
from dataclasses import dataclass


@dataclass
class RetrievalEvalResult:
    query: str
    expected_doc_ids: list[str]
    retrieved_doc_ids: list[str]
    precision_at_5: float
    recall_at_5: float
    mrr: float  # Mean reciprocal rank
    hit: bool   # Was any expected doc in top 5?


class RAGEvaluator:
    """Evaluate retrieval pipeline quality against a labeled dataset."""

    def __init__(self, retriever: HybridRetriever, reranker: Reranker):
        self.retriever = retriever
        self.reranker = reranker

    def evaluate(self, eval_set: list[dict], top_k: int = 5) -> dict:
        """Run evaluation on a set of {query, expected_doc_ids} pairs."""
        results = []
        for item in eval_set:
            query = item["query"]
            expected = set(item["expected_doc_ids"])
            # Run the full retrieval pipeline
            candidates = self.retriever.search(query, top_k=20)
            reranked = self.reranker.rerank(query, candidates, top_k=top_k)
            retrieved = [doc["id"] for doc in reranked]
            # Calculate metrics
            hits_at_k = [1 if doc_id in expected else 0 for doc_id in retrieved[:top_k]]
            precision = sum(hits_at_k) / top_k if top_k > 0 else 0
            recall = sum(hits_at_k) / len(expected) if expected else 0
            # Mean reciprocal rank
            mrr = 0.0
            for rank, doc_id in enumerate(retrieved, 1):
                if doc_id in expected:
                    mrr = 1.0 / rank
                    break
            results.append(RetrievalEvalResult(
                query=query,
                expected_doc_ids=list(expected),
                retrieved_doc_ids=retrieved,
                precision_at_5=precision,
                recall_at_5=recall,
                mrr=mrr,
                hit=any(hits_at_k),
            ))
        # Aggregate metrics
        n = len(results)
        return {
            "num_queries": n,
            "avg_precision_at_5": sum(r.precision_at_5 for r in results) / n,
            "avg_recall_at_5": sum(r.recall_at_5 for r in results) / n,
            "avg_mrr": sum(r.mrr for r in results) / n,
            "hit_rate": sum(1 for r in results if r.hit) / n,
            "results": results,  # For detailed analysis
        }
```
We run this evaluation as a CI step. Every change to chunking strategy, embedding model, retrieval weights, or re-ranking model triggers a full evaluation run. The key metric we optimize for is hit rate (percentage of queries where at least one relevant document is in the top 5) — it is the most correlated with end-user satisfaction.
A healthy production RAG system should achieve: hit rate above 90%, MRR above 0.6, and precision@5 above 0.4. If your numbers are below these thresholds, the first thing to fix is usually chunking — bad chunks poison everything downstream.
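One way to enforce these thresholds in CI is a small gate over the evaluator's aggregate dict (a sketch; `check_retrieval_gate` is a hypothetical name, but the metric keys match the evaluator's output above):

```python
def check_retrieval_gate(metrics: dict) -> list[str]:
    # Returns a list of human-readable failures; an empty list means the
    # pipeline change passes the quality gate
    thresholds = {"hit_rate": 0.90, "avg_mrr": 0.60, "avg_precision_at_5": 0.40}
    return [
        f"{name} = {metrics[name]:.2f} < {floor:.2f}"
        for name, floor in thresholds.items()
        if metrics.get(name, 0.0) < floor
    ]

failures = check_retrieval_gate(
    {"hit_rate": 0.93, "avg_mrr": 0.55, "avg_precision_at_5": 0.47}
)
```

The CI job simply fails the build when the returned list is non-empty, printing each violated threshold so the offending change is obvious from the log.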
Results
With these three layers, the jump from prototype to production accuracy is massive. Queries that returned irrelevant noise now surface the exact document the user needs. Answers include citations that point to specific paragraphs.
You don't need all three layers from day one. Start with smart chunking — it's the highest-impact change. Add hybrid retrieval when your corpus grows past a few hundred documents. Add context assembly when you need citations and precision.
The full pipeline adds roughly 200-400ms of latency compared to naive RAG (mostly from re-ranking and the BM25 index). In exchange, you get retrieval quality that actually works in production with real users and messy, diverse document collections. That tradeoff is worth it every time.