Claude is not just another language model. Its API is built for serious enterprise integrations: native tool use that lets the model invoke functions in your system, structured outputs that guarantee responses in the exact format you need, and streaming that keeps the user experience fluid even for long responses.
In this article we share the patterns we use at Cloudstudio to integrate Claude into production applications. This is not a "hello world" guide — it is what works when you have thousands of requests per day and cost matters.
Tool Use: Claude's superpower
Tool use allows you to define tools as JSON schemas that Claude can invoke during a conversation. The model decides when to use each tool, with what parameters, and how to interpret the results. This turns Claude from a text generator into an active component of your system.
The key is designing granular, well-documented tools. Each tool should do one thing well, with a clear schema and a description the model can understand. Tools that are too broad confuse the model; tools that are too granular generate excessive calls.
Here is a real example of how we define tools for a client's order management system:
import anthropic

client = anthropic.Anthropic()

tools = [
    {
        "name": "lookup_order",
        "description": "Look up an order by order ID. Returns order status, items, shipping info, and payment details. Use this when the user asks about a specific order.",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {
                    "type": "string",
                    "description": "The order ID, e.g. ORD-2024-1234"
                }
            },
            "required": ["order_id"]
        }
    },
    {
        "name": "search_orders",
        "description": "Search orders by customer email, date range, or status. Returns a list of matching orders. Use this when the user wants to find orders matching certain criteria.",
        "input_schema": {
            "type": "object",
            "properties": {
                "email": {
                    "type": "string",
                    "description": "Customer email address"
                },
                "status": {
                    "type": "string",
                    "enum": ["pending", "shipped", "delivered", "cancelled"],
                    "description": "Order status filter"
                },
                "date_from": {
                    "type": "string",
                    "description": "Start date in YYYY-MM-DD format"
                },
                "date_to": {
                    "type": "string",
                    "description": "End date in YYYY-MM-DD format"
                }
            },
            "required": []
        }
    }
]
The tool description is critical — it is the primary signal Claude uses to decide which tool to call. We have found that descriptions written as instructions ("Use this when...") outperform descriptions that simply state what the tool does.
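To make the contrast concrete, here is an illustrative pair of descriptions for the lookup tool (both strings are our own examples, not from any client system):

```python
# Two ways to describe the same tool. In our experience the second,
# instruction-style description is selected far more reliably by the model.
weak_description = (
    "Returns order status, items, and shipping info for an order ID."
)
instruction_description = (
    "Look up a single order by its ID. Use this when the user asks about "
    "a specific order (e.g. 'where is order ORD-2024-1234?'). Do NOT use "
    "this for searches across multiple orders - use search_orders instead."
)
```

The second version tells the model when to call the tool and, just as importantly, when not to, which reduces misrouted calls between overlapping tools.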
Handling tool results in the conversation loop
When Claude decides to use a tool, the API returns a response with stop_reason set to "tool_use" and one or more tool_use content blocks. Your application executes the tool and sends the result back. This is the full loop:
import json

def run_conversation(user_message: str, tools: list, system_prompt: str) -> str:
    messages = [{"role": "user", "content": user_message}]

    while True:
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            system=system_prompt,
            tools=tools,
            messages=messages,
        )

        # If Claude responds with text and stops, we are done
        if response.stop_reason == "end_turn":
            return next(
                block.text for block in response.content
                if block.type == "text"
            )

        # If Claude wants to use a tool, execute it
        if response.stop_reason == "tool_use":
            # Add Claude's response (with tool_use blocks) to messages
            messages.append({"role": "assistant", "content": response.content})

            # Process each tool call
            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    result = execute_tool(block.name, block.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": json.dumps(result),
                    })

            # Send tool results back to Claude
            messages.append({"role": "user", "content": tool_results})
            continue

        # Any other stop reason (e.g. max_tokens) would loop forever otherwise
        raise RuntimeError(f"Unexpected stop_reason: {response.stop_reason}")

def execute_tool(name: str, params: dict) -> dict:
    """Route tool calls to actual implementations."""
    handlers = {
        "lookup_order": order_service.lookup,
        "search_orders": order_service.search,
    }
    handler = handlers.get(name)
    if not handler:
        return {"error": f"Unknown tool: {name}"}
    try:
        return handler(**params)
    except Exception as e:
        return {"error": str(e)}
A critical detail: always handle the case where a tool call fails. Return the error as a tool result — Claude will typically acknowledge the error and either retry with different parameters or explain the situation to the user. Never swallow tool errors silently.
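The Messages API also lets you flag a failed result explicitly via the is_error field on the tool_result block, which tells Claude to treat the content as an error message. A minimal sketch (the error-dict convention is our own):

```python
import json

def make_tool_result(tool_use_id: str, result: dict) -> dict:
    """Build a tool_result block. If the handler returned our
    {"error": ...} convention, mark the block with is_error so
    Claude treats the content as a failure, not data."""
    is_error = "error" in result
    return {
        "type": "tool_result",
        "tool_use_id": tool_use_id,
        "content": json.dumps(result),
        "is_error": is_error,
    }

# A failed lookup becomes an explicit error result
block = make_tool_result("toolu_123", {"error": "Order not found"})
```

Sending the failure back as an is_error result, rather than aborting the loop, is what lets Claude retry with different parameters or explain the problem to the user.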
Structured outputs: predictable responses for real systems
When Claude is part of an automated pipeline, you need responses in a predictable format. Structured outputs force the response to follow a specific JSON schema. This eliminates fragile free-text parsing and makes the integration robust.
We use structured outputs for document classification, data extraction, sentiment analysis, and any case where the response feeds another system component. Reliability goes from ~90% with free-text prompts to ~99% with strict schemas.
Here is how we extract structured data from support emails for a client's ticketing system:
import anthropic

client = anthropic.Anthropic()

def classify_support_email(email_body: str) -> dict:
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Analyze this support email and extract structured data:\n\n{email_body}"
        }],
        tool_choice={"type": "tool", "name": "classify_email"},
        tools=[{
            "name": "classify_email",
            "description": "Classify and extract data from a support email.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        "enum": ["billing", "technical", "account", "feature_request", "complaint"]
                    },
                    "priority": {
                        "type": "string",
                        "enum": ["low", "medium", "high", "urgent"]
                    },
                    "sentiment": {
                        "type": "string",
                        "enum": ["positive", "neutral", "negative", "angry"]
                    },
                    "summary": {
                        "type": "string",
                        "description": "One-sentence summary of the issue"
                    },
                    "entities": {
                        "type": "object",
                        "properties": {
                            "order_ids": {
                                "type": "array",
                                "items": {"type": "string"}
                            },
                            "product_names": {
                                "type": "array",
                                "items": {"type": "string"}
                            }
                        }
                    },
                    "suggested_action": {
                        "type": "string",
                        "description": "Recommended next step for the support team"
                    }
                },
                "required": ["category", "priority", "sentiment", "summary", "suggested_action"]
            }
        }]
    )

    # Extract the structured result from the tool call
    tool_block = next(b for b in response.content if b.type == "tool_use")
    return tool_block.input
The trick here is tool_choice={"type": "tool", "name": "classify_email"} — this forces Claude to call that specific tool, guaranteeing a structured response. The model cannot respond with free text. This pattern is more reliable than asking for JSON in the prompt because the schema is enforced at the API level.
Streaming: Real-time responses for interactive UIs
For user-facing applications, streaming is non-negotiable. Without it, users stare at a loading spinner for 5-15 seconds. With streaming, the first token appears in under a second, and the response builds progressively.
We implement streaming using Server-Sent Events (SSE), which works natively with modern browsers and frameworks:
import json

import anthropic
from flask import Response, stream_with_context

client = anthropic.Anthropic()

def stream_response(user_message: str, conversation_history: list):
    """Stream Claude's response as Server-Sent Events."""
    def generate():
        with client.messages.stream(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            messages=conversation_history + [
                {"role": "user", "content": user_message}
            ],
        ) as stream:
            for event in stream:
                if event.type == "content_block_delta":
                    if event.delta.type == "text_delta":
                        yield f"data: {json.dumps({'text': event.delta.text})}\n\n"
                elif event.type == "message_stop":
                    # Send usage stats at the end
                    usage = stream.get_final_message().usage
                    yield f"data: {json.dumps({'done': True, 'input_tokens': usage.input_tokens, 'output_tokens': usage.output_tokens})}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        }
    )
On the frontend, consuming the stream is straightforward:
async function streamChat(message) {
  const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n\n');
    buffer = lines.pop(); // Keep incomplete chunk in buffer

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6));
        if (data.text) {
          appendToResponse(data.text);
        }
        if (data.done) {
          showTokenUsage(data.input_tokens, data.output_tokens);
        }
      }
    }
  }
}
One important gotcha: streaming with tool use requires handling content_block_start events to detect when Claude begins a tool call. The tool input then arrives as input_json_delta deltas that you need to accumulate and parse as JSON once the block is complete. We wrap this in a state machine that tracks whether the current block is text or tool use.
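A minimal sketch of that accumulator: the event names and payload shapes follow the Messages streaming API, but we model events as plain dicts here (the SDK exposes them as objects) so the logic is easy to test in isolation. The class itself is our own.

```python
import json

class ToolInputAccumulator:
    """Tracks whether the current content block is a tool call and
    accumulates partial JSON input until the block completes."""

    def __init__(self):
        self.current_tool = None   # name of the tool being called, if any
        self.partial_json = ""
        self.completed_calls = []  # list of (tool_name, parsed_input)

    def handle(self, event: dict):
        if event["type"] == "content_block_start":
            block = event["content_block"]
            if block["type"] == "tool_use":
                self.current_tool = block["name"]
                self.partial_json = ""
        elif event["type"] == "content_block_delta":
            if event["delta"]["type"] == "input_json_delta" and self.current_tool:
                self.partial_json += event["delta"]["partial_json"]
        elif event["type"] == "content_block_stop" and self.current_tool:
            # The accumulated deltas form one complete JSON document
            parsed = json.loads(self.partial_json) if self.partial_json else {}
            self.completed_calls.append((self.current_tool, parsed))
            self.current_tool = None
```

In production, the same handler also forwards text_delta events to the SSE stream, so text and tool calls interleave cleanly.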
Cost management in production
Token cost is predictable if you design for it. We use prompt caching to reduce the cost of repeated system prompts, set max_tokens to limit response length, and select the model per task: Haiku for fast classification, Sonnet for general reasoning, Opus for tasks that require maximum quality.
We monitor cost per request, per user, and per feature. We set alerts when cost deviates from the baseline. And we design fallbacks: if the primary model does not respond in time, we degrade to a faster model instead of failing.
Model selection per task
Not every request needs the same model. We route based on task complexity:
from enum import Enum
from dataclasses import dataclass

class ModelTier(Enum):
    FAST = "claude-haiku-4-20250514"
    BALANCED = "claude-sonnet-4-20250514"
    MAX = "claude-opus-4-20250514"

@dataclass
class TaskConfig:
    model: ModelTier
    max_tokens: int
    cache_system_prompt: bool = True

# Route tasks to the right model
TASK_ROUTING = {
    "classify_intent": TaskConfig(ModelTier.FAST, max_tokens=128),
    "extract_entities": TaskConfig(ModelTier.FAST, max_tokens=512),
    "summarize_document": TaskConfig(ModelTier.BALANCED, max_tokens=2048),
    "generate_report": TaskConfig(ModelTier.BALANCED, max_tokens=4096),
    "complex_analysis": TaskConfig(ModelTier.MAX, max_tokens=4096),
    "code_review": TaskConfig(ModelTier.MAX, max_tokens=4096),
}

def get_model_for_task(task_type: str) -> TaskConfig:
    return TASK_ROUTING.get(
        task_type,
        TaskConfig(ModelTier.BALANCED, max_tokens=2048)  # default
    )
Prompt caching for cost reduction
System prompts that repeat across requests are expensive. With prompt caching, Anthropic caches the prompt prefix and charges 90% less for cached tokens. For a system prompt of 2,000 tokens repeated across 1,000 daily requests, that is a significant saving:
def create_cached_request(user_message: str, task_type: str):
    config = get_model_for_task(task_type)

    system_blocks = [
        {
            "type": "text",
            "text": SYSTEM_PROMPTS[task_type],  # Long, detailed system prompt
            "cache_control": {"type": "ephemeral"}
        }
    ]

    response = client.messages.create(
        model=config.model.value,
        max_tokens=config.max_tokens,
        system=system_blocks,
        messages=[{"role": "user", "content": user_message}],
    )

    # Log cache performance
    logger.info(
        "Cache stats",
        extra={
            "cache_read_tokens": response.usage.cache_read_input_tokens,
            "cache_creation_tokens": response.usage.cache_creation_input_tokens,
            "input_tokens": response.usage.input_tokens,
        }
    )
    return response
In our production systems, prompt caching typically achieves a 70-85% cache hit rate after warm-up, which translates to roughly 60% cost reduction on input tokens.
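A back-of-envelope check of that claim, assuming cache reads are billed at 10% of the base input price (cache writes cost slightly more than base and are ignored here; the $3/MTok price is purely illustrative):

```python
def effective_input_cost(prompt_tokens: int, requests: int,
                         hit_rate: float, base_price_per_mtok: float) -> float:
    """Input cost in dollars for a cached prompt prefix, assuming
    cache hits are billed at 10% of the base input price and
    misses at the full price."""
    hits = requests * hit_rate
    misses = requests * (1 - hit_rate)
    tokens_full = misses * prompt_tokens
    tokens_cached = hits * prompt_tokens
    return (tokens_full + 0.1 * tokens_cached) * base_price_per_mtok / 1_000_000

# 2,000-token system prompt, 1,000 requests/day, 80% hit rate,
# hypothetical $3 per million input tokens
baseline = effective_input_cost(2000, 1000, 0.0, 3.0)  # no caching
cached = effective_input_cost(2000, 1000, 0.8, 3.0)    # with caching
```

At an 80% hit rate this gives roughly a 72% saving on the cached prefix alone; real-world numbers land lower once cache writes and uncached user-message tokens are included, which is consistent with the ~60% figure above.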
Error handling and retries
In production, API calls fail. Rate limits hit. Networks time out. Your integration must handle all of this gracefully without losing user context or generating duplicate actions.
We use exponential backoff with jitter for retries, and we distinguish between retryable and non-retryable errors:
import time
import random

import anthropic

def call_claude_with_retries(
    messages: list,
    model: str = "claude-sonnet-4-20250514",
    max_retries: int = 3,
    **kwargs
) -> anthropic.types.Message:
    """Call Claude with exponential backoff and smart retry logic."""
    client = anthropic.Anthropic()

    for attempt in range(max_retries + 1):
        try:
            return client.messages.create(
                model=model,
                messages=messages,
                **kwargs,
            )
        except anthropic.RateLimitError as e:
            if attempt == max_retries:
                raise
            # Use retry-after header if available, otherwise exponential backoff
            retry_after = float(e.response.headers.get("retry-after", 0))
            wait = max(retry_after, (2 ** attempt) + random.uniform(0, 1))
            logger.warning(f"Rate limited, retrying in {wait:.1f}s (attempt {attempt + 1})")
            time.sleep(wait)
        except anthropic.APIStatusError as e:
            if e.status_code >= 500:
                # Server errors are retryable
                if attempt == max_retries:
                    raise
                wait = (2 ** attempt) + random.uniform(0, 1)
                logger.warning(f"Server error {e.status_code}, retrying in {wait:.1f}s")
                time.sleep(wait)
            else:
                # Client errors (400, 401, 403) are not retryable
                raise
        except anthropic.APIConnectionError:
            if attempt == max_retries:
                raise
            wait = (2 ** attempt) + random.uniform(0, 1)
            logger.warning(f"Connection error, retrying in {wait:.1f}s")
            time.sleep(wait)
Request-level timeouts and fallbacks
For user-facing requests, we set aggressive timeouts and fall back to faster models:
import anthropic

def call_with_fallback(messages: list, timeout: float = 15.0) -> dict:
    """Try the primary model with a timeout, fall back to a faster model."""
    client = anthropic.Anthropic(timeout=timeout)
    try:
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            messages=messages,
        )
        return {"text": extract_text(response), "model": "sonnet", "fallback": False}
    except (anthropic.APITimeoutError, anthropic.APIConnectionError):
        logger.warning("Primary model timed out, falling back to Haiku")
        fallback_client = anthropic.Anthropic(timeout=10.0)
        response = fallback_client.messages.create(
            model="claude-haiku-4-20250514",
            max_tokens=1024,
            messages=messages,
        )
        return {"text": extract_text(response), "model": "haiku", "fallback": True}
Production checklist
After integrating Claude in dozens of projects, here is the checklist we go through before every deployment:
- Every tool has error handling — tools return structured errors, never raise unhandled exceptions.
- Token budgets are set — max_tokens is explicitly set for every request, based on the expected output size.
- Prompt caching is enabled — for any system prompt over 1,024 tokens that repeats across requests.
- Streaming is enabled for user-facing features — no user should wait for a complete response to see progress.
- Model routing is configured — cheap tasks use Haiku, complex tasks use Sonnet or Opus.
- Retry logic covers all failure modes — rate limits, server errors, timeouts, and connection failures.
- Fallback chains are tested — we verify that degraded responses are still acceptable.
- Cost monitoring is active — per-request token logging with alerts on anomalies.
- Tool inputs are validated — before executing any tool, we validate the parameters Claude sends against our own schema.
- Conversation context is bounded — we truncate or summarize long conversations to prevent token explosion.
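For the last item, a simple sketch of bounding history by an approximate token budget. The helper name and the 4-characters-per-token heuristic are our own simplifications; in production we use a real tokenizer count and summarize dropped turns rather than discarding them.

```python
def bound_history(messages: list, max_tokens: int = 8000) -> list:
    """Keep the most recent messages that fit an approximate token budget."""
    def estimate_tokens(msg: dict) -> int:
        # Rough heuristic: ~4 characters per token for English text
        content = msg["content"]
        text = content if isinstance(content, str) else str(content)
        return max(1, len(text) // 4)

    kept, used = [], 0
    for msg in reversed(messages):  # walk newest-first
        cost = estimate_tokens(msg)
        if used + cost > max_tokens and kept:
            break
        kept.append(msg)
        used += cost

    result = list(reversed(kept))
    # The Messages API expects the history to start with a user turn,
    # so drop any assistant message left dangling at the front
    while result and result[0]["role"] == "assistant":
        result.pop(0)
    return result
```

Bounding the history also caps worst-case cost per request, which makes the per-user cost alerts above far less noisy.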
The investment in these patterns pays off immediately. A well-integrated Claude deployment should achieve >99.5% uptime, predictable costs, and response times under 2 seconds for 95% of requests. The model is remarkably capable — the engineering challenge is building the system around it.