From prototype to production: Calling an LLM API in a notebook is straightforward. Running those same calls reliably at scale, across multiple providers, with cost controls, error recovery, and observability, is an engineering discipline in itself. This section covers the patterns and tools that separate production LLM systems from proof-of-concept demos. Every concept here addresses a real failure mode that teams encounter when they move from development to deployment.
1. Provider Routing with LiteLLM
LiteLLM is an open-source library that provides a unified interface for calling over 100 LLM providers using the OpenAI SDK format. Instead of writing provider-specific code for OpenAI, Anthropic, Google, and others, you call litellm.completion() with a model string that includes a provider prefix. LiteLLM handles the translation between the OpenAI format and each provider's native API.
```python
import litellm

# Same function call, different providers; LiteLLM translates automatically.

# OpenAI
response_openai = litellm.completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain caching in one sentence."}],
    max_tokens=50
)

# Anthropic (note the provider prefix)
response_anthropic = litellm.completion(
    model="anthropic/claude-sonnet-4-20250514",
    messages=[{"role": "user", "content": "Explain caching in one sentence."}],
    max_tokens=50
)

# Google Gemini
response_gemini = litellm.completion(
    model="gemini/gemini-2.5-flash",
    messages=[{"role": "user", "content": "Explain caching in one sentence."}],
    max_tokens=50
)

for name, resp in [("OpenAI", response_openai),
                   ("Anthropic", response_anthropic),
                   ("Gemini", response_gemini)]:
    print(f"{name}: {resp.choices[0].message.content[:80]}...")
    print(f"  Cost: ${litellm.completion_cost(resp):.6f}")
```
1.1 Fallback Routing
One of LiteLLM's most valuable features is automatic fallback routing. You define a primary model and one or more fallback models. If the primary model fails (due to rate limits, downtime, or errors), LiteLLM automatically retries with the next model in the list. This is critical for production systems where a single provider outage should not bring down your application.
```python
from litellm import Router

# Configure a router with three deployments of the same logical model
router = Router(
    model_list=[
        {
            "model_name": "main-model",  # Your application references this name
            "litellm_params": {
                "model": "gpt-4o",
                "api_key": "sk-...",
            }
        },
        {
            "model_name": "main-model",  # Same name = fallback for the same logical model
            "litellm_params": {
                "model": "anthropic/claude-sonnet-4-20250514",
                "api_key": "sk-ant-...",
            }
        },
        {
            "model_name": "main-model",
            "litellm_params": {
                "model": "gemini/gemini-2.5-flash",
                "api_key": "...",
            }
        }
    ],
    fallbacks=[{"main-model": ["main-model"]}],  # Try all deployments
    routing_strategy="least-busy",  # Route to the deployment with the lowest queue
    num_retries=2
)

# Your application code just references "main-model"
response = router.completion(
    model="main-model",
    messages=[{"role": "user", "content": "Summarize the benefits of caching."}]
)
print(response.choices[0].message.content[:100])
```
The abstraction layer principle: Your application code should reference logical model names (like "main-model" or "fast-classifier"), never specific provider model IDs. The routing layer maps these logical names to physical deployments. This means you can change providers, add fallbacks, or adjust routing strategies without modifying any application code.
2. Retry Strategies and Error Handling
LLM API calls fail in predictable ways. Understanding the error taxonomy lets you build appropriate recovery strategies for each failure type.
2.1 Error Taxonomy
| Error | HTTP Code | Strategy |
|---|---|---|
| Rate limit exceeded | 429 | Exponential backoff with jitter; respect Retry-After header |
| Context length exceeded | 400 | Truncate input and retry; summarize long contexts |
| Content filter triggered | 400 | Rephrase prompt; use a different model; skip the request |
| Malformed tool call JSON | N/A (parsing) | Retry with stricter schema; include error message in reprompt |
| Server error | 500, 502, 503 | Retry with backoff; failover to another provider |
| Timeout | N/A (network) | Separate TTFT timeout from total generation timeout |
| Authentication failure | 401, 403 | Do not retry; alert on key expiration; rotate keys |
2.2 Exponential Backoff with Jitter
The standard retry pattern for rate limits and transient errors is exponential backoff with jitter. Without jitter, all clients that hit a rate limit at the same time would retry at the same time, causing a "thundering herd" that overwhelms the API again. Adding random jitter spreads the retries across time.
```python
import time
import random
from openai import OpenAI, RateLimitError, APIStatusError

client = OpenAI()

def call_with_retry(messages, max_retries=5, base_delay=1.0):
    """Call the API with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
                max_tokens=200
            )
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff: 1s, 2s, 4s, 8s, 16s (plus jitter)
            delay = base_delay * (2 ** attempt)
            jitter = random.uniform(0, delay * 0.5)  # Add up to 50% jitter
            wait_time = delay + jitter
            print(f"Rate limited (attempt {attempt + 1}/{max_retries}). "
                  f"Waiting {wait_time:.1f}s...")
            time.sleep(wait_time)
        except APIStatusError as e:
            if e.status_code in (500, 502, 503) and attempt < max_retries - 1:
                # Server errors: retry with backoff
                delay = base_delay * (2 ** attempt)
                time.sleep(delay + random.uniform(0, 1))
                continue
            raise  # Don't retry client errors (400, 401, etc.) or exhausted retries

response = call_with_retry([{"role": "user", "content": "Hello!"}])
print(f"Success: {response.choices[0].message.content}")
```
3. The Circuit Breaker Pattern
Exponential backoff handles transient failures, but what about sustained outages? If a provider is down for minutes or hours, you do not want every request to wait through the full retry sequence before failing over. The circuit breaker pattern solves this by tracking failure rates and "tripping" when failures exceed a threshold, immediately routing requests to a fallback without attempting the failed provider.
```python
import time
from dataclasses import dataclass, field
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Provider is down, use fallback
    HALF_OPEN = "half_open"  # Testing if provider recovered

@dataclass
class CircuitBreaker:
    failure_threshold: int = 5      # Failures before opening
    recovery_timeout: float = 60.0  # Seconds before testing recovery
    failure_count: int = field(default=0, init=False)
    state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    last_failure_time: float = field(default=0.0, init=False)

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            # Check if the recovery timeout has elapsed
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True  # Allow one test request
            return False
        if self.state == CircuitState.HALF_OPEN:
            return True
        return False

    def record_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit OPENED after {self.failure_count} failures. "
                  f"Routing to fallback for {self.recovery_timeout}s.")

# Usage
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)

def call_with_circuit_breaker(messages, primary_fn, fallback_fn):
    if breaker.can_execute():
        try:
            result = primary_fn(messages)
            breaker.record_success()
            return result
        except Exception:
            breaker.record_failure()
            if breaker.state == CircuitState.OPEN:
                return fallback_fn(messages)
            raise
    else:
        return fallback_fn(messages)

print(f"Circuit state: {breaker.state.value}")
print(f"Ready: {breaker.can_execute()}")
```
4. Caching Strategies
LLM calls are expensive and slow compared to traditional API calls. Caching is one of the most effective optimization strategies, and there are two complementary approaches: exact caching and semantic caching.
4.1 Exact Caching
The simplest caching strategy stores responses keyed by the exact input (model + messages + parameters). If the same request comes in again, the cached response is returned instantly without making an API call. This is ideal for deterministic outputs (temperature=0) or when slight variations in output are acceptable.
```python
import hashlib
import json
import time

class LLMCache:
    """Simple in-memory cache for LLM responses."""

    def __init__(self, max_size=1000, ttl_seconds=3600):
        self.cache = {}
        self.max_size = max_size
        self.ttl = ttl_seconds
        self.hits = 0
        self.misses = 0

    def _make_key(self, model, messages, **kwargs):
        """Create a deterministic cache key from request parameters."""
        key_data = json.dumps({
            "model": model,
            "messages": messages,
            "params": {k: v for k, v in sorted(kwargs.items())}
        }, sort_keys=True)
        return hashlib.sha256(key_data.encode()).hexdigest()

    def get(self, model, messages, **kwargs):
        key = self._make_key(model, messages, **kwargs)
        if key in self.cache:
            entry = self.cache[key]
            if time.time() - entry["timestamp"] < self.ttl:
                self.hits += 1
                return entry["response"]
            else:
                del self.cache[key]  # Expired
        self.misses += 1
        return None

    def put(self, model, messages, response, **kwargs):
        if len(self.cache) >= self.max_size:
            # Evict the oldest entry
            oldest = min(self.cache, key=lambda k: self.cache[k]["timestamp"])
            del self.cache[oldest]
        key = self._make_key(model, messages, **kwargs)
        self.cache[key] = {"response": response, "timestamp": time.time()}

    @property
    def hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

cache = LLMCache(ttl_seconds=3600)
print(f"Cache initialized. Hit rate: {cache.hit_rate:.1%}")
print(f"Max size: {cache.max_size}, TTL: {cache.ttl}s")
```
4.2 Semantic Caching
Exact caching misses when semantically identical queries use different wording. "What is the capital of France?" and "Tell me France's capital city" are the same question but produce different cache keys. Semantic caching solves this by embedding incoming queries into a vector space and searching for similar cached queries using cosine similarity. If a cached query is sufficiently similar (typically above a 0.95 cosine threshold), the cached response is returned.
```python
import time
import numpy as np
from dataclasses import dataclass

@dataclass
class CacheEntry:
    query_text: str
    query_embedding: np.ndarray
    response: str
    timestamp: float

class SemanticCache:
    """Semantic cache using embedding similarity."""

    def __init__(self, similarity_threshold=0.95, max_entries=500):
        self.entries: list[CacheEntry] = []
        self.threshold = similarity_threshold
        self.max_entries = max_entries

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def search(self, query_embedding: np.ndarray) -> str | None:
        """Find a cached response if a similar query exists."""
        best_score = 0.0
        best_response = None
        for entry in self.entries:
            score = self._cosine_similarity(query_embedding, entry.query_embedding)
            if score > best_score:
                best_score = score
                best_response = entry.response
        if best_score >= self.threshold:
            return best_response
        return None

    def store(self, query_text, query_embedding, response):
        if len(self.entries) >= self.max_entries:
            self.entries.pop(0)  # Remove the oldest entry
        self.entries.append(CacheEntry(
            query_text=query_text,
            query_embedding=query_embedding,
            response=response,
            timestamp=time.time()
        ))

# Example: two semantically identical queries
cache = SemanticCache(similarity_threshold=0.95)

# Simulate embeddings (in production, use an embedding model)
emb1 = np.random.randn(1536)
emb1 = emb1 / np.linalg.norm(emb1)

# A very similar embedding (simulating a semantically close query)
noise = np.random.randn(1536) * 0.02
emb2 = emb1 + noise
emb2 = emb2 / np.linalg.norm(emb2)

cache.store("What is the capital of France?", emb1, "The capital of France is Paris.")
result = cache.search(emb2)

similarity = float(np.dot(emb1, emb2) / (np.linalg.norm(emb1) * np.linalg.norm(emb2)))
print(f"Similarity: {similarity:.4f}")
print(f"Cache hit: {result is not None}")
print(f"Result: {result}")
```
The 0.95 cosine similarity threshold is a reasonable starting point, but it must be calibrated for your specific use case. A false cache hit (returning a wrong cached answer) is far worse than a cache miss. Build a validation set of 100+ query pairs, labeled as "same intent" or "different intent," and measure precision and recall at different thresholds. For safety-critical applications, use 0.97+. For FAQ-style workloads, 0.90 to 0.92 may be appropriate. See Section 11.4 for a detailed threshold analysis.
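A threshold sweep over such a labeled validation set might look like the following sketch. The similarity scores and labels here are toy values for illustration; in practice each similarity would come from embedding a query pair, and each label from human annotation.

```python
import numpy as np

def sweep_thresholds(similarities, labels, thresholds):
    """For each threshold, treat pairs with similarity >= t as cache hits
    and compute precision/recall against the 'same intent' labels."""
    similarities = np.asarray(similarities)
    labels = np.asarray(labels, dtype=bool)
    results = []
    for t in thresholds:
        predicted_hit = similarities >= t
        tp = np.sum(predicted_hit & labels)       # Correct cache hits
        fp = np.sum(predicted_hit & ~labels)      # False hits (wrong answer served)
        fn = np.sum(~predicted_hit & labels)      # Missed hits (unnecessary API call)
        precision = tp / (tp + fp) if (tp + fp) > 0 else 1.0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        results.append({"threshold": t, "precision": precision, "recall": recall})
    return results

# Toy validation set: cosine similarity of each query pair, and whether the
# pair truly has the same intent.
sims   = [0.98, 0.96, 0.93, 0.91, 0.88, 0.97, 0.90, 0.85]
labels = [True, True, True, False, False, True, False, False]

for r in sweep_thresholds(sims, labels, [0.90, 0.95, 0.97]):
    print(f"t={r['threshold']:.2f}  precision={r['precision']:.2f}  recall={r['recall']:.2f}")
```

Even on this toy data the tradeoff is visible: raising the threshold improves precision (fewer false hits) at the cost of recall (more cache misses).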
Production tools for semantic caching: For production use, consider GPTCache (an open-source library that integrates with multiple embedding models and vector stores) or Redis with its vector search capability. These handle the embedding, similarity search, TTL management, and eviction policies for you, so you do not need to implement them from scratch.
5. AI Gateways: Portkey and Helicone
AI gateways sit between your application and LLM providers, acting as a proxy that adds routing, observability, caching, guardrails, and cost tracking. They require minimal code changes (often just changing the base URL) and provide immediate production benefits.
5.1 Portkey
Portkey is an AI gateway that supports routing, fallbacks, spend tracking, caching, and guardrails across 1600+ LLMs. It works as a proxy: you point your OpenAI client at Portkey's gateway URL and add a configuration header. Portkey handles the rest.
```python
from openai import OpenAI
from portkey_ai import PORTKEY_GATEWAY_URL, createHeaders

# Configure Portkey with routing and caching
client = OpenAI(
    base_url=PORTKEY_GATEWAY_URL,
    default_headers=createHeaders(
        api_key="your-portkey-key",
        config={
            "strategy": {
                "mode": "fallback",  # Try targets in order
            },
            "targets": [
                {
                    "provider": "openai",
                    "api_key": "sk-...",
                    "override_params": {"model": "gpt-4o"}
                },
                {
                    "provider": "anthropic",
                    "api_key": "sk-ant-...",
                    "override_params": {"model": "claude-sonnet-4-20250514"}
                }
            ],
            "cache": {"mode": "semantic", "max_age": 3600}
        }
    )
)

# Your code is unchanged; Portkey handles routing and caching
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain caching briefly."}]
)
print(response.choices[0].message.content[:100])
```
5.2 Helicone
Helicone is an open-source observability proxy focused on request logging, cost tracking, and analytics. Like Portkey, it works by changing the base URL. Helicone logs every request with latency, token counts, cost, and custom metadata, giving you a dashboard for monitoring your LLM usage.
```python
from openai import OpenAI

# Route through Helicone for observability
client = OpenAI(
    base_url="https://oai.helicone.ai/v1",
    default_headers={
        "Helicone-Auth": "Bearer your-helicone-key",
        "Helicone-Cache-Enabled": "true",  # Enable response caching
        "Helicone-Property-Environment": "production",
        "Helicone-Property-Feature": "customer-support",  # Tag for cost attribution
    }
)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "How does Helicone work?"}]
)

# The Helicone dashboard now shows:
# - Request latency, token count, estimated cost
# - Cache hit/miss status
# - Custom properties for filtering and grouping
print(f"Response received. Tokens: {response.usage.total_tokens}")
print("Check the Helicone dashboard for detailed analytics.")
```
Start with a gateway early: Adding an AI gateway is one of the highest-leverage changes you can make to a production LLM system. The cost tracking alone pays for itself by identifying which features, users, or prompts consume the most tokens. Adding it later requires changing every API call site; adding it from the start requires changing only the base URL.
6. Token Budget Enforcement
Without explicit controls, LLM costs can spike unpredictably. Token budget enforcement tracks and limits spending at multiple granularities: per user, per organization, per feature, and per time period. The goal is to prevent runaway costs while maintaining service availability for within-budget users.
```python
import time
from collections import defaultdict
from dataclasses import dataclass, field

@dataclass
class TokenBudget:
    """Track and enforce token spending limits."""
    limits: dict = field(default_factory=dict)  # entity -> max tokens per period
    usage: dict = field(default_factory=lambda: defaultdict(int))
    period_start: dict = field(default_factory=dict)
    period_seconds: float = 86400  # Default: daily budget

    def set_limit(self, entity: str, max_tokens: int):
        self.limits[entity] = max_tokens
        self.period_start[entity] = time.time()

    def check_budget(self, entity: str, estimated_tokens: int) -> bool:
        """Return True if the request is within budget."""
        if entity not in self.limits:
            return True  # No limit set
        # Reset the period if it has expired
        if time.time() - self.period_start.get(entity, 0) > self.period_seconds:
            self.usage[entity] = 0
            self.period_start[entity] = time.time()
        return self.usage[entity] + estimated_tokens <= self.limits[entity]

    def record_usage(self, entity: str, tokens_used: int):
        self.usage[entity] += tokens_used

    def remaining(self, entity: str) -> float:
        """Tokens left in the current period; inf when no limit is set."""
        if entity not in self.limits:
            return float('inf')
        return max(0, self.limits[entity] - self.usage[entity])

# Example: per-user budget enforcement
budget = TokenBudget(period_seconds=86400)  # Daily budgets
budget.set_limit("user:alice", max_tokens=100_000)
budget.set_limit("user:bob", max_tokens=50_000)
budget.set_limit("feature:chat", max_tokens=1_000_000)

# Check before making the API call
user = "user:alice"
estimated = 500  # Estimated tokens for this request

if budget.check_budget(user, estimated):
    # Make the API call
    budget.record_usage(user, 480)  # Actual tokens used
    print(f"Request allowed. {user} remaining: {budget.remaining(user):,} tokens")
else:
    print(f"Budget exceeded for {user}. Remaining: {budget.remaining(user):,} tokens")

# Simulate heavy usage
budget.record_usage("user:bob", 48_000)
print(f"user:bob remaining: {budget.remaining('user:bob'):,} tokens")
print(f"user:bob can make a 500-token request: {budget.check_budget('user:bob', 500)}")
```
Soft limits and hard limits: Implement both. A soft limit (at 80% of budget) triggers an alert so you can investigate usage patterns. A hard limit (at 100%) blocks further requests. Without a hard limit, a single runaway process (such as an infinite retry loop) can generate thousands of dollars in charges before anyone notices.
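That two-tier policy can be sketched as a single classification helper layered on top of whatever usage tracker you use (the tier names and the 80% default here are illustrative, not from any library):

```python
def budget_status(used: int, limit: int, soft_fraction: float = 0.8) -> str:
    """Classify current usage: 'ok' below the soft limit, 'warn' between
    soft and hard limits (allow but alert), 'block' at or past the hard limit."""
    if used >= limit:
        return "block"  # Hard limit: reject further requests
    if used >= soft_fraction * limit:
        return "warn"   # Soft limit: allow the request, but raise an alert
    return "ok"

for used in (10_000, 85_000, 100_000):
    print(f"{used:>7,} / 100,000 tokens -> {budget_status(used, 100_000)}")
```

A `"warn"` result should trigger a notification (pager, Slack, email) while the request still goes through; only `"block"` rejects traffic.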
7. Graceful Degradation
When all providers are unavailable or a user has exhausted their budget, your application should not simply crash. Graceful degradation provides a reduced but functional experience. The degradation ladder, from best to worst user experience, typically follows this pattern:
- Full LLM response: Normal operation with the primary model
- Cached response: Return a previously cached answer for a similar query
- Simpler model: Fall back to a cheaper, smaller model (e.g., GPT-4o-mini instead of GPT-4o)
- Static FAQ: Match the user's query against a set of pre-written answers using keyword or embedding similarity
- Error message: Inform the user that the service is temporarily degraded and suggest trying again later
8. Production Error Handling Patterns
Bringing together retries, circuit breakers, caching, and degradation into a cohesive error handling strategy requires careful orchestration. The following pattern combines these techniques into a single resilient call function.
```python
import time
from dataclasses import dataclass
from enum import Enum

class FallbackLevel(Enum):
    PRIMARY = "primary"
    CACHE = "cache"
    SIMPLE_MODEL = "simple_model"
    STATIC = "static"
    ERROR = "error"

@dataclass
class ResilientResponse:
    content: str
    fallback_level: FallbackLevel
    latency_ms: float
    model_used: str | None = None

def resilient_llm_call(messages, cache, circuit_breaker, budget,
                       user_id="default") -> ResilientResponse:
    """Production-grade LLM call with the full resilience stack."""
    start = time.time()

    # Step 1: Check budget
    if not budget.check_budget(user_id, estimated_tokens=500):
        return ResilientResponse(
            content="You have reached your daily usage limit. Please try again tomorrow.",
            fallback_level=FallbackLevel.ERROR,
            latency_ms=(time.time() - start) * 1000
        )

    # Step 2: Check cache
    cached = cache.get("gpt-4o", messages, temperature=0)
    if cached:
        return ResilientResponse(
            content=cached,
            fallback_level=FallbackLevel.CACHE,
            latency_ms=(time.time() - start) * 1000,
            model_used="cache"
        )

    # Step 3: Try the primary model (with circuit breaker)
    if circuit_breaker.can_execute():
        try:
            response = call_primary_model(messages)
            circuit_breaker.record_success()
            # Store with the same parameters used for the lookup so keys match
            cache.put("gpt-4o", messages, response, temperature=0)
            budget.record_usage(user_id, 480)
            return ResilientResponse(
                content=response,
                fallback_level=FallbackLevel.PRIMARY,
                latency_ms=(time.time() - start) * 1000,
                model_used="gpt-4o"
            )
        except Exception:
            circuit_breaker.record_failure()

    # Step 4: Try a simpler model
    try:
        response = call_simple_model(messages)
        return ResilientResponse(
            content=response,
            fallback_level=FallbackLevel.SIMPLE_MODEL,
            latency_ms=(time.time() - start) * 1000,
            model_used="gpt-4o-mini"
        )
    except Exception:
        pass

    # Step 5: Static fallback
    return ResilientResponse(
        content="I'm currently experiencing high demand. Please try again shortly.",
        fallback_level=FallbackLevel.STATIC,
        latency_ms=(time.time() - start) * 1000
    )

# These would be real API calls in production
def call_primary_model(messages):
    return "Response from GPT-4o"

def call_simple_model(messages):
    return "Response from GPT-4o-mini"

print("Resilient LLM call pattern configured.")
print("Order of checks: Budget -> Cache -> Primary -> Simple Model -> Static")
```
Separate TTFT and total timeouts: When configuring timeouts for LLM calls, use two separate timers. The time-to-first-token (TTFT) timeout detects when a request is stuck in a queue and will never start generating. The total generation timeout caps the overall response time. A typical configuration is 10 seconds for TTFT and 60 seconds for total generation. If TTFT expires, fail over immediately; if the total timeout expires, return whatever partial response has been received.
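One way to sketch the two-timer pattern over a synchronous token stream (here simulated by a generator; a real stream would come from the provider SDK, and strictly enforcing TTFT on a connection that never responds also requires a read timeout on the underlying HTTP client):

```python
import time

def stream_with_timeouts(token_stream, ttft_timeout=10.0, total_timeout=60.0):
    """Consume a streaming response with two timers. If no first token has
    arrived within ttft_timeout, raise so the caller can fail over; if the
    total_timeout expires mid-stream, return the partial text collected so far."""
    start = time.monotonic()
    chunks = []
    for token in token_stream:  # In production: iterate the SDK's stream object
        now = time.monotonic()
        if not chunks and now - start > ttft_timeout:
            # First token arrived, but too late: treat as a stuck request
            raise TimeoutError("No first token within TTFT timeout; fail over.")
        if now - start > total_timeout:
            break  # Total budget exhausted: return the partial response
        chunks.append(token)
    return "".join(chunks)

# Simulated stream (a real one would come from the provider SDK)
def fake_stream():
    for tok in ["Caching ", "stores ", "results ", "for ", "reuse."]:
        yield tok

print(stream_with_timeouts(fake_stream(), ttft_timeout=10.0, total_timeout=60.0))
```

The key design point is that the two failures are handled differently: a TTFT expiry raises (so the router fails over to another provider), while a total-timeout expiry returns whatever partial text was already streamed.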
Key Takeaways
- Use an abstraction layer: LiteLLM (or a similar library) lets you call 100+ providers through a single interface. Reference logical model names in your code, and configure physical deployments in the routing layer.
- Implement exponential backoff with jitter: This is the standard retry pattern for rate limits (429) and server errors (5xx). Jitter prevents the thundering herd problem.
- Circuit breakers prevent cascading failures: When a provider is down for an extended period, the circuit breaker immediately routes to a fallback rather than making every request wait through the full retry sequence.
- Cache at two levels: Exact caching handles repeated identical requests; semantic caching catches semantically similar queries with different wording. Together, they can reduce API costs by 30% to 60% for typical workloads.
- Enforce token budgets with soft and hard limits: Track spending per user, organization, and feature. Alert at 80% (soft limit) and block at 100% (hard limit) to prevent runaway costs.
- Implement graceful degradation: Build a fallback ladder (primary model, cache, simpler model, static FAQ, error message) so your application remains functional even during partial outages.
- Adopt an AI gateway early: Portkey and Helicone add routing, caching, cost tracking, and observability with minimal code changes. The investment pays for itself through cost visibility alone.
Module 09 has given you fluency with LLM APIs, structured output, and production engineering patterns. The next frontier is agentic tool ecosystems. The Model Context Protocol (MCP) is emerging as a standard for connecting LLMs to external tools, databases, and services in a provider-agnostic way. Beyond MCP, autonomous API orchestration (where agents compose tool calls without human intervention) is reshaping how production systems are built. Module 10 covers the prompt engineering techniques that control model behavior in these pipelines, and Module 11 shows how to combine LLMs with classical ML for cost-effective production systems.