Claude no es simplemente otro modelo de lenguaje. Su arquitectura está diseñada para integraciones empresariales serias: uso de herramientas nativo que permite al modelo invocar funciones de su sistema, salidas estructuradas que garantizan respuestas en el formato exacto que necesita, y streaming que mantiene la experiencia del usuario fluida incluso con respuestas largas.
En este artículo compartimos los patrones que utilizamos en Cloudstudio para integrar a Claude en aplicaciones de producción. Esta no es una guía de "hola mundo"; es lo que funciona cuando tienes miles de solicitudes por día y el costo importa.
Uso de herramientas: el superpoder de Claude
El uso de herramientas te permite definir herramientas como esquemas JSON que Claude puede invocar durante una conversación. El modelo decide cuándo usar cada herramienta, con qué parámetros y cómo interpretar los resultados. Esto convierte a Claude de un generador de texto en un componente activo de tu sistema.
La clave es diseñar herramientas granulares y bien documentadas. Cada herramienta debe hacer una sola cosa bien, con un esquema claro y una descripción que el modelo pueda entender. Las herramientas que son demasiado amplias confunden al modelo; las herramientas que son demasiado granulares generan llamadas excesivas.
Aquí tienes un ejemplo real de cómo definimos herramientas para el sistema de gestión de pedidos de un cliente:
import anthropic
client = anthropic.Anthropic()
tools = [
{
"name": "lookup_order",
"description": "Look up an order by order ID. Returns order status, items, shipping info, and payment details. Use this when the user asks about a specific order.",
"input_schema": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "The order ID, e.g. ORD-2024-1234"
}
},
"required": ["order_id"]
}
},
{
"name": "search_orders",
"description": "Search orders by customer email, date range, or status. Returns a list of matching orders. Use this when the user wants to find orders matching certain criteria.",
"input_schema": {
"type": "object",
"properties": {
"email": {
"type": "string",
"description": "Customer email address"
},
"status": {
"type": "string",
"enum": ["pending", "shipped", "delivered", "cancelled"],
"description": "Order status filter"
},
"date_from": {
"type": "string",
"description": "Start date in YYYY-MM-DD format"
},
"date_to": {
"type": "string",
"description": "End date in YYYY-MM-DD format"
}
},
"required": []
}
}
]
La descripción de la herramienta es fundamental; es la señal principal que Claude utiliza para decidir qué herramienta llamar. Hemos descubierto que las descripciones escritas como instrucciones ("Usa esto cuando...") superan a las descripciones que simplemente indican lo que hace la herramienta.
Manejo de los resultados de las herramientas en el bucle de conversación
Cuando Claude decide usar una herramienta, la API devuelve una respuesta con . Tu aplicación ejecuta la herramienta y envía el resultado de vuelta. Este es el bucle completo:
def run_conversation(user_message: str, tools: list, system_prompt: str) -> str:
messages = [{"role": "user", "content": user_message}]
while True:
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=4096,
system=system_prompt,
tools=tools,
messages=messages,
)
# If Claude responds with text and stops, we are done
if response.stop_reason == "end_turn":
return next(
block.text for block in response.content
if block.type == "text"
)
# If Claude wants to use a tool, execute it
if response.stop_reason == "tool_use":
# Add Claude's response (with tool_use blocks) to messages
messages.append({"role": "assistant", "content": response.content})
# Process each tool call
tool_results = []
for block in response.content:
if block.type == "tool_use":
result = execute_tool(block.name, block.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": json.dumps(result),
})
# Send tool results back to Claude
messages.append({"role": "user", "content": tool_results})
def execute_tool(name: str, params: dict) -> dict:
"""Route tool calls to actual implementations."""
handlers = {
"lookup_order": order_service.lookup,
"search_orders": order_service.search,
}
handler = handlers.get(name)
if not handler:
return {"error": f"Unknown tool: {name}"}
try:
return handler(**params)
except Exception as e:
return {"error": str(e)}
Un detalle crítico: maneja siempre el caso en el que falla una llamada a una herramienta. Devuelve el error como un resultado de la herramienta; Claude normalmente reconocerá el error e intentará de nuevo con parámetros diferentes o explicará la situación al usuario. Nunca ignores los errores de las herramientas en silencio.
Salidas estructuradas: Respuestas estructuradas para sistemas reales
Cuando Claude forma parte de una canalización automatizada, se necesitan respuestas en un formato predecible. Las salidas estructuradas obligan a que la respuesta siga un esquema JSON específico. Esto elimina el frágil análisis de texto libre y hace que la integración sea robusta.
Utilizamos salidas estructuradas para la clasificación de documentos, la extracción de datos, el análisis de sentimientos y cualquier caso en el que la respuesta alimente a otro componente del sistema. La fiabilidad pasa de un ~90% con prompts de texto libre a un ~99% con esquemas estrictos.
A continuación, se muestra cómo extraemos datos estructurados de correos electrónicos de soporte para el sistema de tickets de un cliente:
import anthropic
import json
client = anthropic.Anthropic()
def classify_support_email(email_body: str) -> dict:
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"Analyze this support email and extract structured data:\n\n{email_body}"
}],
tool_choice={"type": "tool", "name": "classify_email"},
tools=[{
"name": "classify_email",
"description": "Classify and extract data from a support email.",
"input_schema": {
"type": "object",
"properties": {
"category": {
"type": "string",
"enum": ["billing", "technical", "account", "feature_request", "complaint"]
},
"priority": {
"type": "string",
"enum": ["low", "medium", "high", "urgent"]
},
"sentiment": {
"type": "string",
"enum": ["positive", "neutral", "negative", "angry"]
},
"summary": {
"type": "string",
"description": "One-sentence summary of the issue"
},
"entities": {
"type": "object",
"properties": {
"order_ids": {
"type": "array",
"items": {"type": "string"}
},
"product_names": {
"type": "array",
"items": {"type": "string"}
}
}
},
"suggested_action": {
"type": "string",
"description": "Recommended next step for the support team"
}
},
"required": ["category", "priority", "sentiment", "summary", "suggested_action"]
}
}]
)
# Extract the structured result from the tool call
tool_block = next(b for b in response.content if b.type == "tool_use")
return tool_block.input
El truco aquí es usar con — esto obliga a Claude a llamar a esa herramienta específica, garantizando una respuesta estructurada. El modelo no puede responder con texto libre. Este patrón es más fiable que solicitar JSON en el prompt porque el esquema se aplica a nivel de API.
Streaming: Respuestas en tiempo real para interfaces de usuario interactivas
Para las aplicaciones orientadas al usuario, el streaming es innegociable. Sin él, los usuarios se quedan mirando un indicador de carga durante 5-15 segundos. Con el streaming, el primer token aparece en menos de un segundo y la respuesta se construye progresivamente.
Implementamos el streaming utilizando Server-Sent Events (SSE), que funciona de forma nativa con los navegadores y frameworks modernos:
import anthropic
from flask import Response, stream_with_context
client = anthropic.Anthropic()
def stream_response(user_message: str, conversation_history: list):
"""Transmite la respuesta de Claude como Server-Sent Events."""
def generate():
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
messages=conversation_history + [
{"role": "user", "content": user_message}
],
) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
yield f"data: {json.dumps({'text': event.delta.text})}\n\n"
elif event.type == "message_stop":
# Enviar estadísticas de uso al final
usage = stream.get_final_message().usage
yield f"data: {json.dumps({'done': True, 'input_tokens': usage.input_tokens, 'output_tokens': usage.output_tokens})}\n\n"
return Response(
stream_with_context(generate()),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
}
)
En el frontend, consumir el flujo es sencillo:
async function streamChat(message) {
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n\n');
buffer = lines.pop(); // Mantener el fragmento incompleto en el búfer
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
if (data.text) {
appendToResponse(data.text);
}
if (data.done) {
showTokenUsage(data.input_tokens, data.output_tokens);
}
}
}
}
}
Un detalle importante: el streaming con el uso de herramientas requiere manejar eventos para detectar cuándo Claude comienza una llamada a una herramienta. La entrada de la herramienta llega como deltas que debes acumular y analizar como JSON una vez que el bloque esté completo. Envolvemos esto en una máquina de estados que rastrea si el bloque actual es texto o uso de herramientas.
Gestión de costes en producción
El coste de los tokens es predecible si se diseña para ello. Utilizamos el almacenamiento en caché de prompts para reducir el coste de los prompts de sistema repetidos, para limitar las respuestas y seleccionar el modelo por tarea: Haiku para una clasificación rápida, Sonnet para razonamiento general, Opus para tareas que requieren la máxima calidad.
Monitorizamos el coste por solicitud, por usuario y por funcionalidad. Establecemos alertas cuando el coste se desvía de la línea base. Y diseñamos alternativas: si el modelo principal no responde a tiempo, pasamos a un modelo más rápido en lugar de fallar.
Selección de modelo por tarea
No todas las solicitudes necesitan el mismo modelo. Enrutamos en función de la complejidad de la tarea:
from enum import Enum
from dataclasses import dataclass
class ModelTier(Enum):
FAST = "claude-haiku-4-20250514"
BALANCED = "claude-sonnet-4-20250514"
MAX = "claude-opus-4-20250514"
@dataclass
class TaskConfig:
model: ModelTier
max_tokens: int
cache_system_prompt: bool = True
# Enrutar tareas al modelo correcto
TASK_ROUTING = {
"classify_intent": TaskConfig(ModelTier.FAST, max_tokens=128),
"extract_entities": TaskConfig(ModelTier.FAST, max_tokens=512),
"summarize_document": TaskConfig(ModelTier.BALANCED, max_tokens=2048),
"generate_report": TaskConfig(ModelTier.BALANCED, max_tokens=4096),
"complex_analysis": TaskConfig(ModelTier.MAX, max_tokens=4096),
"code_review": TaskConfig(ModelTier.MAX, max_tokens=4096),
}
def get_model_for_task(task_type: str) -> TaskConfig:
return TASK_ROUTING.get(
task_type,
TaskConfig(ModelTier.BALANCED, max_tokens=2048) # por defecto
)
Almacenamiento en caché de prompts para la reducción de costes
Los prompts de sistema que se repiten en varias solicitudes son costosos. Con el almacenamiento en caché de prompts, Anthropic almacena en caché el prefijo del prompt y cobra un 90% menos por los tokens almacenados en caché. Para un prompt de sistema de 2.000 tokens repetido en 1.000 solicitudes diarias, eso supone un ahorro significativo:
def create_cached_request(user_message: str, task_type: str):
config = get_model_for_task(task_type)
system_blocks = [
{
"type": "text",
"text": SYSTEM_PROMPTS[task_type], # Prompt de sistema largo y detallado
"cache_control": {"type": "ephemeral"}
}
]
response = client.messages.create(
model=config.model.value,
max_tokens=config.max_tokens,
system=system_blocks,
messages=[{"role": "user", "content": user_message}],
)
# Registrar el rendimiento de la caché
logger.info(
"Cache stats",
extra={
"cache_read_tokens": response.usage.cache_read_input_tokens,
"cache_creation_tokens": response.usage.cache_creation_input_tokens,
"input_tokens": response.usage.input_tokens,
}
)
return response
En nuestros sistemas de producción, el almacenamiento en caché de prompts suele alcanzar una tasa de acierto de caché del 70-85% tras el periodo de calentamiento, lo que se traduce en una reducción de costes de aproximadamente el 60% en los tokens de entrada.
Manejo de errores y reintentos
En producción, las llamadas a la API fallan. Se alcanzan los límites de velocidad. Las redes agotan el tiempo de espera. Tu integración debe manejar todo esto de manera elegante sin perder el contexto del usuario ni generar acciones duplicadas.
Utilizamos el retroceso exponencial (exponential backoff) con fluctuación (jitter) para los reintentos, y distinguimos entre errores reintentables y no reintentables:
import time
import random
import anthropic
def call_claude_with_retries(
messages: list,
model: str = "claude-sonnet-4-20250514",
max_retries: int = 3,
**kwargs
) -> anthropic.types.Message:
"""Llamar a Claude con retroceso exponencial y lógica de reintento inteligente."""
client = anthropic.Anthropic()
for attempt in range(max_retries + 1):
try:
return client.messages.create(
model=model,
messages=messages,
**kwargs,
)
except anthropic.RateLimitError as e:
if attempt == max_retries:
raise
# Usar el encabezado retry-after si está disponible, de lo contrario, retroceso exponencial
retry_after = float(e.response.headers.get("retry-after", 0))
wait = max(retry_after, (2 ** attempt) + random.uniform(0, 1))
logger.warning(f"Límite de velocidad alcanzado, reintentando en {wait:.1f}s (intento {attempt + 1})")
time.sleep(wait)
except anthropic.APIStatusError as e:
if e.status_code >= 500:
# Los errores del servidor son reintentables
if attempt == max_retries:
raise
wait = (2 ** attempt) + random.uniform(0, 1)
logger.warning(f"Error del servidor {e.status_code}, reintentando en {wait:.1f}s")
time.sleep(wait)
else:
# Los errores del cliente (400, 401, 403) no son reintentables
raise
except anthropic.APIConnectionError:
if attempt == max_retries:
raise
wait = (2 ** attempt) + random.uniform(0, 1)
logger.warning(f"Error de conexión, reintentando en {wait:.1f}s")
time.sleep(wait)
Tiempos de espera y alternativas a nivel de solicitud
Para las solicitudes orientadas al usuario, establecemos tiempos de espera agresivos y recurrimos a modelos más rápidos:
import anthropic
def call_with_fallback(messages: list, timeout: float = 15.0) -> dict:
"""Intentar con el modelo principal con un tiempo de espera, recurrir a un modelo más rápido."""
client = anthropic.Anthropic(timeout=timeout)
try:
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=2048,
messages=messages,
)
return {"text": extract_text(response), "model": "sonnet", "fallback": False}
except (anthropic.APITimeoutError, anthropic.APIConnectionError):
logger.warning("El modelo principal agotó el tiempo de espera, recurriendo a Haiku")
fallback_client = anthropic.Anthropic(timeout=10.0)
response = fallback_client.messages.create(
model="claude-haiku-4-20250514",
max_tokens=1024,
messages=messages,
)
return {"text": extract_text(response), "model": "haiku", "fallback": True}
Lista de verificación para producción
Después de integrar a Claude en docenas de proyectos, esta es la lista de verificación que revisamos antes de cada despliegue:
- Cada herramienta tiene manejo de errores — las herramientas devuelven errores estructurados, nunca lanzan excepciones no controladas.
- Los presupuestos de tokens están establecidos — se configuran explícitamente para cada solicitud, basándose en el tamaño de salida esperado.
- El almacenamiento en caché de prompts está habilitado — para cualquier prompt de sistema de más de 1,024 tokens que se repita en varias solicitudes.
- El streaming está habilitado para funciones orientadas al usuario — ningún usuario debería esperar una respuesta completa para ver el progreso.
- El enrutamiento de modelos está configurado — las tareas económicas usan Haiku, las tareas complejas usan Sonnet u Opus.
- La lógica de reintento cubre todos los modos de fallo — límites de tasa, errores de servidor, tiempos de espera y fallos de conexión.
- Las cadenas de respaldo están probadas — verificamos que las respuestas degradadas sigan siendo aceptables.
- El monitoreo de costos está activo — registro de tokens por solicitud con alertas sobre anomalías.
- Las entradas de las herramientas están validadas — antes de ejecutar cualquier herramienta, validamos los parámetros que envía Claude contra nuestro propio esquema.
- El contexto de la conversación está limitado — truncamos o resumimos las conversaciones largas para evitar la explosión de tokens.
La inversión en estos patrones rinde frutos de inmediato. Un despliegue de Claude bien integrado debería lograr un tiempo de actividad >99.5%, costos predecibles y tiempos de respuesta inferiores a 2 segundos para el 95% de las solicitudes. El modelo es notablemente capaz; el desafío de ingeniería consiste en construir el sistema a su alrededor.