Cada sistema RAG que he construido para clientes este año comenzó de la misma manera: "Tenemos un prototipo, pero no funciona bien en producción". El pipeline es siempre el mismo — consulta del usuario, embedding, búsqueda vectorial, top-K, LLM. Funciona en demos con 50 documentos. Luego cargas 10,000 y todo se rompe.
El problema no es que RAG no funcione. El problema es que la arquitectura ingenua oculta tres problemas fundamentales que solo aparecen a escala.
El pipeline de RAG ingenuo y por qué falla
El flujo estándar de RAG parece engañosamente simple: tomar la consulta del usuario, generar un embedding, buscar los K vectores más similares, concatenar esos fragmentos y pasarlos al LLM. En un notebook con un conjunto de datos curado, esto obtiene resultados impresionantes. Pero la producción es una historia diferente.
El primer problema es que la similitud de coseno entre embeddings no equivale a la relevancia real. El segundo problema es que el top-K se vuelve cada vez más ruidoso a medida que crece el corpus. El tercer problema es que los fragmentos de tamaño fijo rompen el contexto en los límites de las oraciones y secciones.
Explicaré la arquitectura de tres capas que usamos en Cloudstudio para resolver cada uno de estos problemas, con el código real que ejecutamos en producción.
Capa 1: Fragmentación inteligente
En lugar de cortar cada N tokens, utilizamos una fragmentación recursiva que respeta la estructura del documento: los encabezados, párrafos, bloques de código y tablas se tratan como unidades atómicas. Cada fragmento lleva metadatos del elemento padre: fuente, fecha, jerarquía de secciones y su posición en el documento.
Añadimos una ventana de solapamiento del 10-15% entre fragmentos consecutivos para que el contexto en los límites nunca se pierda.
Aquí está el fragmentador recursivo que utilizamos. Intenta dividir primero en el límite más significativo (encabezados), luego recurre a párrafos, oraciones y, finalmente, a límites estrictos de tokens:
import re
from dataclasses import dataclass, field
@dataclass
class Chunk:
text: str
metadata: dict = field(default_factory=dict)
token_count: int = 0
chunk_id: str = ""
parent_id: str | None = None
class RecursiveChunker:
"""Split documents respecting structural boundaries."""
def __init__(
self,
max_tokens: int = 512,
overlap_tokens: int = 64,
tokenizer=None,
):
self.max_tokens = max_tokens
self.overlap_tokens = overlap_tokens
self.tokenizer = tokenizer or self._simple_tokenizer
# Separators ordered by priority — try the most meaningful split first
self.separators = [
r'\n#{1,3}\s', # Markdown headings
r'\n\n', # Double newline (paragraph break)
r'\n', # Single newline
r'(?<=[.!?])\s+', # Sentence boundary
r'\s+', # Word boundary (last resort)
]
def chunk_document(self, text: str, source_metadata: dict) -> list[Chunk]:
"""Chunk a document with overlap and metadata propagation."""
raw_sections = self._recursive_split(text, separator_idx=0)
chunks = []
doc_id = source_metadata.get("document_id", "unknown")
for i, section_text in enumerate(raw_sections):
chunk = Chunk(
text=section_text.strip(),
token_count=len(self.tokenizer(section_text)),
chunk_id=f"{doc_id}_chunk_{i:04d}",
metadata={
**source_metadata,
"chunk_index": i,
"total_chunks": len(raw_sections),
"heading_hierarchy": self._extract_headings(section_text),
},
)
chunks.append(chunk)
# Add overlap between consecutive chunks
chunks = self._add_overlap(chunks)
return chunks
def _recursive_split(self, text: str, separator_idx: int) -> list[str]:
"""Try to split with the current separator; if chunks are too big, recurse with the next one."""
if separator_idx >= len(self.separators):
# Last resort: hard cut at token limit
return self._hard_split(text)
pattern = self.separators[separator_idx]
parts = re.split(pattern, text)
result = []
current = ""
for part in parts:
if len(self.tokenizer(current + part)) <= self.max_tokens:
current += part
else:
if current:
result.append(current)
# If this single part is too large, split it with the next separator
if len(self.tokenizer(part)) > self.max_tokens:
result.extend(self._recursive_split(part, separator_idx + 1))
else:
current = part
if current:
result.append(current)
return result
def _add_overlap(self, chunks: list[Chunk]) -> list[Chunk]:
"""Add overlap text from the previous chunk to maintain context at boundaries."""
for i in range(1, len(chunks)):
prev_tokens = self.tokenizer(chunks[i - 1].text)
overlap_text = self._detokenize(prev_tokens[-self.overlap_tokens:])
chunks[i].text = overlap_text + "\n" + chunks[i].text
chunks[i].token_count = len(self.tokenizer(chunks[i].text))
chunks[i].metadata["has_overlap"] = True
return chunks
def _extract_headings(self, text: str) -> list[str]:
return re.findall(r'^#{1,3}\s+(.+)$', text, re.MULTILINE)
def _hard_split(self, text: str) -> list[str]:
tokens = self.tokenizer(text)
return [
self._detokenize(tokens[i:i + self.max_tokens])
for i in range(0, len(tokens), self.max_tokens - self.overlap_tokens)
]
@staticmethod
def _simple_tokenizer(text: str) -> list[str]:
return text.split()
@staticmethod
def _detokenize(tokens: list[str]) -> str:
return " ".join(tokens)
La clave reside en la prioridad de los separadores. Un fragmento de 512 tokens que termina en el límite de un encabezado es drásticamente más útil que uno que corta a mitad de una oración. Hemos medido esto: la fragmentación en los límites estructurales mejora la precisión de la recuperación entre un 15% y un 25% en comparación con la división de tamaño fijo, sin coste adicional.
Generación de embeddings
Una vez que tengas fragmentos limpios, necesitas generar sus embeddings. Utilizamos un pipeline por lotes que gestiona los límites de tasa, los reintentos y la propagación de metadatos:
import numpy as np
from openai import OpenAI
import time
class EmbeddingPipeline:
"""Generate embeddings with batching and rate-limit handling."""
def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 100):
self.client = OpenAI()
self.model = model
self.batch_size = batch_size
self.dimensions = 1536 # For text-embedding-3-small
def embed_chunks(self, chunks: list[Chunk]) -> list[dict]:
"""Embed all chunks, returning vectors with metadata."""
results = []
for i in range(0, len(chunks), self.batch_size):
batch = chunks[i:i + self.batch_size]
texts = [chunk.text for chunk in batch]
embeddings = self._embed_with_retry(texts)
for chunk, embedding in zip(batch, embeddings):
results.append({
"id": chunk.chunk_id,
"values": embedding,
"metadata": {
**chunk.metadata,
"text": chunk.text,
"token_count": chunk.token_count,
}
})
return results
def _embed_with_retry(self, texts: list[str], max_retries: int = 3) -> list[list[float]]:
for attempt in range(max_retries):
try:
response = self.client.embeddings.create(
model=self.model,
input=texts,
dimensions=self.dimensions,
)
return [item.embedding for item in response.data]
except Exception as e:
if "rate_limit" in str(e).lower() and attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
raise
def embed_query(self, query: str) -> list[float]:
"""Embed a single query for search."""
response = self.client.embeddings.create(
model=self.model,
input=query,
dimensions=self.dimensions,
)
return response.data[0].embedding
Dos elecciones importantes aquí: usamos text-embedding-3-small en lugar de la variante grande porque la diferencia de calidad es marginal para la mayoría de las tareas de recuperación, pero el costo es 5 veces menor. Y establecemos dimensiones explícitas; esto nos permite reducir el tamaño del vector más adelante si el almacenamiento se convierte en una limitación sin tener que volver a generar los embeddings de todo el corpus.
Capa 2: Recuperación híbrida
La búsqueda vectorial pura captura la similitud semántica, pero pasa por alto términos exactos que importan: nombres de productos, códigos de error, IDs internos. La búsqueda por palabras clave BM25 captura esas coincidencias exactas que los vectores omiten. Usar ambas en paralelo ofrece lo mejor de los dos mundos.
Pero la verdadera magia reside en el re-ranking mediante cross-encoders. Se toman los candidatos combinados de ambas búsquedas y se califica cada uno con un modelo que evalúa los pares consulta-documento para determinar su relevancia real, no solo la proximidad vectorial.
Aquí está nuestra implementación de recuperación híbrida. Utilizamos Qdrant para los vectores y una implementación sencilla de BM25, para luego fusionar los resultados:
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, Distance, VectorParams
import math
from collections import Counter
class BM25Index:
"""Implementación ligera de BM25 para búsqueda por palabras clave."""
def __init__(self, k1: float = 1.5, b: float = 0.75):
self.k1 = k1
self.b = b
self.doc_freqs: dict[str, int] = {}
self.doc_lengths: dict[str, int] = {}
self.avg_doc_length: float = 0
self.corpus_size: int = 0
self.index: dict[str, dict[str, int]] = {} # term -> {doc_id: freq}
self.documents: dict[str, str] = {}
def add_documents(self, docs: list[dict]):
for doc in docs:
doc_id = doc["id"]
text = doc["text"].lower()
tokens = text.split()
self.documents[doc_id] = doc["text"]
self.doc_lengths[doc_id] = len(tokens)
term_freqs = Counter(tokens)
for term, freq in term_freqs.items():
if term not in self.index:
self.index[term] = {}
self.index[term][doc_id] = freq
self.doc_freqs[term] = len(self.index.get(term, {}))
self.corpus_size = len(self.documents)
self.avg_doc_length = sum(self.doc_lengths.values()) / max(self.corpus_size, 1)
def search(self, query: str, top_k: int = 20) -> list[tuple[str, float]]:
query_terms = query.lower().split()
scores: dict[str, float] = {}
for term in query_terms:
if term not in self.index:
continue
df = self.doc_freqs[term]
idf = math.log((self.corpus_size - df + 0.5) / (df + 0.5) + 1)
for doc_id, tf in self.index[term].items():
dl = self.doc_lengths[doc_id]
numerator = tf * (self.k1 + 1)
denominator = tf + self.k1 * (1 - self.b + self.b * dl / self.avg_doc_length)
scores[doc_id] = scores.get(doc_id, 0) + idf * numerator / denominator
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return ranked[:top_k]
class HybridRetriever:
"""Combina la búsqueda vectorial y BM25 con fusión de rango recíproco (RRF)."""
def __init__(self, qdrant_url: str, collection_name: str):
self.vector_client = QdrantClient(url=qdrant_url)
self.collection = collection_name
self.bm25 = BM25Index()
self.embedding_pipeline = EmbeddingPipeline()
def search(self, query: str, top_k: int = 10, vector_weight: float = 0.6) -> list[dict]:
"""Ejecuta la búsqueda híbrida con fusión de rango recíproco."""
# Ejecuta ambas búsquedas en paralelo (simplificado aquí como secuencial)
vector_results = self._vector_search(query, top_k=top_k * 2)
bm25_results = self.bm25.search(query, top_k=top_k * 2)
# Fusión de rango recíproco (RRF)
fused_scores: dict[str, float] = {}
k = 60 # Constante RRF
for rank, (doc_id, _score) in enumerate(vector_results):
fused_scores[doc_id] = fused_scores.get(doc_id, 0) + vector_weight / (k + rank + 1)
for rank, (doc_id, _score) in enumerate(bm25_results):
fused_scores[doc_id] = fused_scores.get(doc_id, 0) + (1 - vector_weight) / (k + rank + 1)
# Ordena por puntuación fusionada y devuelve top_k
ranked = sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
return [{"id": doc_id, "score": score} for doc_id, score in ranked]
def _vector_search(self, query: str, top_k: int) -> list[tuple[str, float]]:
query_embedding = self.embedding_pipeline.embed_query(query)
results = self.vector_client.search(
collection_name=self.collection,
query_vector=query_embedding,
limit=top_k,
)
return [(hit.id, hit.score) for hit in results]
Utilizamos la fusión de rango recíproco (RRF) en lugar de la normalización de puntuaciones porque RRF es más robusta: las puntuaciones de similitud vectorial y las de BM25 operan en escalas completamente diferentes, y normalizarlas introduce artefactos. RRF solo tiene en cuenta la posición en el ranking, lo que hace que la fusión sea estable en diferentes consultas y tamaños de corpus.
El parámetro vector_weight tiene un valor predeterminado de 0.6, lo que significa que nos inclinamos ligeramente hacia la búsqueda semántica. Para documentación técnica con muchos términos específicos (códigos de error, endpoints de API), aumentamos el peso de BM25 a 0.5 o incluso 0.6. Para contenido conversacional (artículos de soporte, preguntas frecuentes), mantenemos el predominio de la búsqueda semántica.
Re-ranking para mayor precisión
Los resultados combinados de la búsqueda híbrida son buenos, pero el re-ranking con un modelo de cross-encoder eleva la precisión significativamente. Un cross-encoder califica cada par consulta-documento directamente en lugar de comparar embeddings precalculados:
from sentence_transformers import CrossEncoder
class Reranker:
"""Re-rank retrieved chunks using a cross-encoder for maximum relevance."""
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
self.model = CrossEncoder(model_name)
def rerank(self, query: str, documents: list[dict], top_k: int = 5) -> list[dict]:
"""Score each document against the query and return the top_k most relevant."""
if not documents:
return []
# Build query-document pairs for the cross-encoder
pairs = [(query, doc["text"]) for doc in documents]
scores = self.model.predict(pairs)
# Attach scores and sort
for doc, score in zip(documents, scores):
doc["rerank_score"] = float(score)
ranked = sorted(documents, key=lambda d: d["rerank_score"], reverse=True)
return ranked[:top_k]
El re-ranking suele tardar entre 50 y 200 ms para 20 candidatos, lo cual es aceptable para la mayoría de las aplicaciones. La mejora en la precisión es sustancial: medimos consistentemente una mejora del 20-30% en la relevancia de los 5 mejores resultados después del re-ranking. El modelo de cross-encoder es pequeño (66 millones de parámetros) y puede ejecutarse en CPU, por lo que añade un coste de infraestructura mínimo.
Para aplicaciones sensibles a la latencia, puede omitir el re-ranking y confiar en las puntuaciones de fusión híbrida. Pero para cualquier caso donde la calidad de la respuesta importe más que el tiempo de respuesta (bases de conocimientos internas, preguntas y respuestas sobre documentos, búsqueda de cumplimiento), el re-ranking vale los 100 ms adicionales.
Capa 3: Ensamblaje de contexto
Tener buenos fragmentos (chunks) no es suficiente. La forma en que los ensamblas en el prompt final determina si el LLM ofrece una respuesta coherente o un resumen inconexo.
La expansión padre-hijo es clave: cuando recuperas un fragmento, extraes su sección superior para obtener el contexto completo. La deduplicación de fuentes evita el envío de contenido superpuesto. La gestión del presupuesto de tokens garantiza que encajes el máximo contexto relevante dentro de la ventana del modelo. Y el seguimiento de citas vincula cada afirmación con su fragmento de origen.
Aquí está el ensamblador de contexto que utilizamos:
import tiktoken
class ContextAssembler:
"""Ensambla los fragmentos recuperados en un prompt con citas y presupuestos de tokens."""
def __init__(self, max_context_tokens: int = 12000):
self.max_tokens = max_context_tokens
self.encoder = tiktoken.encoding_for_model("gpt-4") # Conteo de tokens
def assemble(self, query: str, ranked_chunks: list[dict], chunk_store: dict) -> dict:
"""Construye el bloque de contexto con seguimiento de citas."""
context_parts = []
citations = []
used_tokens = 0
seen_texts = set() # Para deduplicación
for rank, chunk in enumerate(ranked_chunks):
# Expansión padre: si el fragmento tiene un padre, incluirlo para contexto
expanded_text = self._expand_with_parent(chunk, chunk_store)
# Deduplicación: omitir si ya vimos contenido sustancialmente similar
text_fingerprint = self._fingerprint(expanded_text)
if text_fingerprint in seen_texts:
continue
seen_texts.add(text_fingerprint)
# Verificación de presupuesto de tokens
chunk_tokens = len(self.encoder.encode(expanded_text))
if used_tokens + chunk_tokens > self.max_tokens:
# Intentar incluir una versión truncada
remaining = self.max_tokens - used_tokens
if remaining > 100: # Solo incluir si es significativo
expanded_text = self._truncate_to_tokens(expanded_text, remaining)
chunk_tokens = remaining
else:
break
# Agregar con marcador de cita
citation_id = f"[{len(citations) + 1}]"
source = chunk.get("metadata", {}).get("source", "Desconocido")
section = chunk.get("metadata", {}).get("heading_hierarchy", [])
section_str = " > ".join(section) if section else "N/A"
context_parts.append(
f"--- Fuente {citation_id}: {source} | Sección: {section_str} ---\n"
f"{expanded_text}\n"
)
citations.append({
"id": citation_id,
"source": source,
"section": section_str,
"chunk_id": chunk.get("id"),
"relevance_score": chunk.get("rerank_score", chunk.get("score", 0)),
})
used_tokens += chunk_tokens
context_block = "\n".join(context_parts)
return {
"context": context_block,
"citations": citations,
"tokens_used": used_tokens,
"chunks_included": len(citations),
"prompt": self._build_prompt(query, context_block, citations),
}
def _build_prompt(self, query: str, context: str, citations: list) -> str:
citation_legend = "\n".join(
f" {c['id']} = {c['source']} ({c['section']})" for c in citations
)
return (
f"Responda la siguiente pregunta usando SOLO las fuentes proporcionadas. "
f"Cite sus fuentes usando la notación de corchetes (ej., [1], [2]).\n"
f"Si las fuentes no contienen información suficiente, dígalo explícitamente.\n\n"
f"Fuentes:\n{context}\n\n"
f"Leyenda de citas:\n{citation_legend}\n\n"
f"Pregunta: {query}\n\n"
f"Respuesta:"
)
def _expand_with_parent(self, chunk: dict, chunk_store: dict) -> str:
"""Si el fragmento tiene padre, anteponer el encabezado del padre para contexto."""
parent_id = chunk.get("metadata", {}).get("parent_id")
if parent_id and parent_id in chunk_store:
parent = chunk_store[parent_id]
headings = parent.get("metadata", {}).get("heading_hierarchy", [])
if headings:
return f"## {headings[-1]}\n\n{chunk['text']}"
return chunk["text"]
def _fingerprint(self, text: str) -> str:
"""Crear una huella digital aproximada para deduplicación."""
words = text.lower().split()[:50]
return " ".join(sorted(set(words)))
def _truncate_to_tokens(self, text: str, max_tokens: int) -> str:
tokens = self.encoder.encode(text)[:max_tokens]
return self.encoder.decode(tokens)
El seguimiento de citas es esencial para RAG en producción. Los usuarios necesitan verificar las respuestas, y su sistema necesita una pista de auditoría. Cada afirmación en la respuesta del LLM se puede rastrear hasta un fragmento específico, que a su vez se puede rastrear hasta un documento y sección específicos. Sin citas, RAG es una caja negra en la que nadie confía.
Evaluación: medición de la calidad de recuperación
No se puede mejorar lo que no se mide. Mantenemos un conjunto de evaluación de pares consulta-documento_esperado y ejecutamos métricas automatizadas después de cada cambio en el pipeline:
from dataclasses import dataclass
@dataclass
class RetrievalEvalResult:
query: str
expected_doc_ids: list[str]
retrieved_doc_ids: list[str]
precision_at_5: float
recall_at_5: float
mrr: float # Mean reciprocal rank
hit: bool # Was any expected doc in top 5?
class RAGEvaluator:
"""Evaluate retrieval pipeline quality against a labeled dataset."""
def __init__(self, retriever: HybridRetriever, reranker: Reranker):
self.retriever = retriever
self.reranker = reranker
def evaluate(self, eval_set: list[dict], top_k: int = 5) -> dict:
"""Run evaluation on a set of {query, expected_doc_ids} pairs."""
results = []
for item in eval_set:
query = item["query"]
expected = set(item["expected_doc_ids"])
# Run the full retrieval pipeline
candidates = self.retriever.search(query, top_k=20)
reranked = self.reranker.rerank(query, candidates, top_k=top_k)
retrieved = [doc["id"] for doc in reranked]
# Calculate metrics
hits_at_k = [1 if doc_id in expected else 0 for doc_id in retrieved[:top_k]]
precision = sum(hits_at_k) / top_k if top_k > 0 else 0
recall = sum(hits_at_k) / len(expected) if expected else 0
# Mean reciprocal rank
mrr = 0.0
for rank, doc_id in enumerate(retrieved, 1):
if doc_id in expected:
mrr = 1.0 / rank
break
results.append(RetrievalEvalResult(
query=query,
expected_doc_ids=list(expected),
retrieved_doc_ids=retrieved,
precision_at_5=precision,
recall_at_5=recall,
mrr=mrr,
hit=any(hits_at_k),
))
# Aggregate metrics
n = len(results)
return {
"num_queries": n,
"avg_precision_at_5": sum(r.precision_at_5 for r in results) / n,
"avg_recall_at_5": sum(r.recall_at_5 for r in results) / n,
"avg_mrr": sum(r.mrr for r in results) / n,
"hit_rate": sum(1 for r in results if r.hit) / n,
"results": results, # For detailed analysis
}
Ejecutamos esta evaluación como un paso de CI. Cada cambio en la estrategia de fragmentación (chunking), el modelo de embeddings, los pesos de recuperación o el modelo de re-ranking activa una ejecución de evaluación completa. La métrica clave que optimizamos es la tasa de aciertos o hit rate (porcentaje de consultas donde al menos un documento relevante está en el top 5); es la que más correlaciona con la satisfacción del usuario final.
Un sistema RAG de producción saludable debería alcanzar: una tasa de aciertos (hit rate) superior al 90%, un MRR superior a 0.6 y una precisión@5 superior a 0.4. Si sus cifras están por debajo de estos umbrales, lo primero que debe corregirse suele ser la fragmentación (chunking): los fragmentos deficientes envenenan todo el proceso posterior.
Resultados
Con estas tres capas, el salto de la precisión de prototipo a la de producción es enorme. Las consultas que devolvían ruido irrelevante ahora muestran el documento exacto que el usuario necesita. Las respuestas incluyen citas que apuntan a párrafos específicos.
No necesitas las tres capas desde el primer día. Empieza con la fragmentación inteligente: es el cambio de mayor impacto. Añade la recuperación híbrida cuando tu corpus crezca más allá de unos pocos cientos de documentos. Añade el ensamblaje de contexto cuando necesites citas y precisión.
El pipeline completo añade aproximadamente entre 200 y 400 ms de latencia en comparación con un RAG básico (principalmente por el re-ranking y el índice BM25). A cambio, obtienes una calidad de recuperación que realmente funciona en producción con usuarios reales y colecciones de documentos desordenadas y diversas. Ese intercambio vale la pena siempre.