Module 09 · Section 9.3

API Engineering Best Practices

Routing, caching, retries, circuit breakers, cost management, and observability for production LLM systems
★ Big Picture

From prototype to production: Calling an LLM API in a notebook is straightforward. Running those same calls reliably at scale, across multiple providers, with cost controls, error recovery, and observability, is an engineering discipline in itself. This section covers the patterns and tools that separate production LLM systems from proof-of-concept demos. Every concept here addresses a real failure mode that teams encounter when they move from development to deployment.

1. Provider Routing with LiteLLM

LiteLLM is an open-source library that provides a unified interface for calling over 100 LLM providers using the OpenAI SDK format. Instead of writing provider-specific code for OpenAI, Anthropic, Google, and others, you call litellm.completion() with a model string that includes a provider prefix. LiteLLM handles the translation between the OpenAI format and each provider's native API.

import litellm

# Same function call, different providers
# LiteLLM translates automatically

# OpenAI
response_openai = litellm.completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain caching in one sentence."}],
    max_tokens=50
)

# Anthropic (note the provider prefix)
response_anthropic = litellm.completion(
    model="anthropic/claude-sonnet-4-20250514",
    messages=[{"role": "user", "content": "Explain caching in one sentence."}],
    max_tokens=50
)

# Google Gemini
response_gemini = litellm.completion(
    model="gemini/gemini-2.5-flash",
    messages=[{"role": "user", "content": "Explain caching in one sentence."}],
    max_tokens=50
)

for name, resp in [("OpenAI", response_openai),
                    ("Anthropic", response_anthropic),
                    ("Gemini", response_gemini)]:
    print(f"{name}: {resp.choices[0].message.content[:80]}...")
    print(f"  Cost: ${litellm.completion_cost(resp):.6f}")
OpenAI: Caching stores frequently accessed data in fast storage to reduce latency...
  Cost: $0.000342
Anthropic: Caching is the practice of storing computed results or data closer to whe...
  Cost: $0.000285
Gemini: Caching stores copies of data in a temporary, fast-access location so future...
  Cost: $0.000078

1.1 Fallback Routing

One of LiteLLM's most valuable features is automatic fallback routing. You define a primary model and one or more fallback models. If the primary model fails (due to rate limits, downtime, or errors), LiteLLM automatically retries with the next model in the list. This is critical for production systems where a single provider outage should not bring down your application.

import litellm
from litellm import Router

# Configure router with fallback models
router = Router(
    model_list=[
        {
            "model_name": "main-model",  # Your application references this name
            "litellm_params": {
                "model": "gpt-4o",
                "api_key": "sk-...",
            }
        },
        {
            "model_name": "main-model",  # Same name = fallback for the same logical model
            "litellm_params": {
                "model": "anthropic/claude-sonnet-4-20250514",
                "api_key": "sk-ant-...",
            }
        },
        {
            "model_name": "main-model",
            "litellm_params": {
                "model": "gemini/gemini-2.5-flash",
                "api_key": "...",
            }
        }
    ],
    fallbacks=[{"main-model": ["main-model"]}],  # Try all deployments
    routing_strategy="least-busy",  # Route to the deployment with lowest queue
    num_retries=2
)

# Your application code just references "main-model"
response = router.completion(
    model="main-model",
    messages=[{"role": "user", "content": "Summarize the benefits of caching."}]
)
print(response.choices[0].message.content[:100])
Caching provides several key benefits: reduced latency by serving data from fast storage, lower cost
✓ Key Insight

The abstraction layer principle: Your application code should reference logical model names (like "main-model" or "fast-classifier"), never specific provider model IDs. The routing layer maps these logical names to physical deployments. This means you can change providers, add fallbacks, or adjust routing strategies without modifying any application code.
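As a minimal illustration of this principle, the mapping from logical names to physical deployments can be as simple as a registry dictionary (the names and deployment lists below are hypothetical examples, not part of LiteLLM's API):

```python
# Illustrative sketch: logical model names -> physical deployments.
# Application code only ever sees the logical names on the left.
MODEL_REGISTRY = {
    "main-model": ["gpt-4o", "anthropic/claude-sonnet-4-20250514"],
    "fast-classifier": ["gemini/gemini-2.5-flash", "gpt-4o-mini"],
}

def resolve(logical_name: str) -> list[str]:
    """Return the ordered deployment list (primary first) for a logical name."""
    try:
        return MODEL_REGISTRY[logical_name]
    except KeyError:
        raise ValueError(f"Unknown logical model: {logical_name}") from None

print(resolve("fast-classifier")[0])  # -> gemini/gemini-2.5-flash
```

Swapping a provider is then a one-line registry change, with no edits to application call sites.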

2. Retry Strategies and Error Handling

LLM API calls fail in predictable ways. Understanding the error taxonomy lets you build appropriate recovery strategies for each failure type.

2.1 Error Taxonomy

Error                      HTTP code       Strategy
Rate limit exceeded        429             Exponential backoff with jitter; respect Retry-After header
Context length exceeded    400             Truncate input and retry; summarize long contexts
Content filter triggered   400             Rephrase prompt; use a different model; skip the request
Malformed tool call JSON   N/A (parsing)   Retry with stricter schema; include error message in reprompt
Server error               500, 502, 503   Retry with backoff; fail over to another provider
Timeout                    N/A (network)   Separate TTFT timeout from total generation timeout
Authentication failure     401, 403        Do not retry; alert on key expiration; rotate keys

2.2 Exponential Backoff with Jitter

The standard retry pattern for rate limits and transient errors is exponential backoff with jitter. Without jitter, all clients that hit a rate limit at the same time would retry at the same time, causing a "thundering herd" that overwhelms the API again. Adding random jitter spreads the retries across time.

import time
import random
from openai import OpenAI, RateLimitError, APIStatusError

client = OpenAI()

def call_with_retry(messages, max_retries=5, base_delay=1.0):
    """Call the API with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
                max_tokens=200
            )
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff: 1s, 2s, 4s, 8s (with jitter)
            delay = base_delay * (2 ** attempt)
            jitter = random.uniform(0, delay * 0.5)  # Add up to 50% jitter
            wait_time = delay + jitter
            print(f"Rate limited (attempt {attempt + 1}/{max_retries}). "
                  f"Waiting {wait_time:.1f}s...")
            time.sleep(wait_time)
        except APIStatusError as e:
            if e.status_code in (500, 502, 503) and attempt < max_retries - 1:
                # Server errors: retry with backoff
                delay = base_delay * (2 ** attempt)
                time.sleep(delay + random.uniform(0, 1))
                continue
            raise  # Don't retry client errors (400, 401, etc.) or exhausted retries
    raise RuntimeError("call_with_retry exhausted all retries")  # safeguard

response = call_with_retry([{"role": "user", "content": "Hello!"}])
print(f"Success: {response.choices[0].message.content}")
Success: Hello! How can I assist you today?

3. The Circuit Breaker Pattern

Exponential backoff handles transient failures, but what about sustained outages? If a provider is down for minutes or hours, you do not want every request to wait through the full retry sequence before failing over. The circuit breaker pattern solves this by tracking failure rates and "tripping" when failures exceed a threshold, immediately routing requests to a fallback without attempting the failed provider.

[Figure: circuit breaker state machine. States: CLOSED (normal operation, requests pass through), OPEN (provider is down, requests go to fallback), HALF-OPEN (testing recovery, let one request through). Transitions: CLOSED -> OPEN when the failure threshold is exceeded; OPEN -> HALF-OPEN when the timeout expires; HALF-OPEN -> CLOSED if the test succeeds, back to OPEN if it fails.]
Figure 9.5: The circuit breaker state machine. When a provider fails repeatedly, the circuit "opens" and requests are immediately routed to a fallback. After a cooldown period, a test request probes whether the provider has recovered.
import time
from dataclasses import dataclass, field
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Provider is down, use fallback
    HALF_OPEN = "half_open"  # Testing if provider recovered

@dataclass
class CircuitBreaker:
    failure_threshold: int = 5       # Failures before opening
    recovery_timeout: float = 60.0   # Seconds before testing recovery
    failure_count: int = field(default=0, init=False)
    state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    last_failure_time: float = field(default=0.0, init=False)

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            # Check if recovery timeout has elapsed
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True  # Allow one test request
            return False
        if self.state == CircuitState.HALF_OPEN:
            return True
        return False

    def record_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit OPENED after {self.failure_count} failures. "
                  f"Routing to fallback for {self.recovery_timeout}s.")

# Usage
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)

def call_with_circuit_breaker(messages, primary_fn, fallback_fn):
    if breaker.can_execute():
        try:
            result = primary_fn(messages)
            breaker.record_success()
            return result
        except Exception as e:
            breaker.record_failure()
            if breaker.state == CircuitState.OPEN:
                return fallback_fn(messages)
            raise
    else:
        return fallback_fn(messages)

print(f"Circuit state: {breaker.state.value}")
print(f"Ready: {breaker.can_execute()}")
Circuit state: closed
Ready: True

4. Caching Strategies

LLM calls are expensive and slow compared to traditional API calls. Caching is one of the most effective optimization strategies, and there are two complementary approaches: exact caching and semantic caching.

4.1 Exact Caching

The simplest caching strategy stores responses keyed by the exact input (model + messages + parameters). If the same request comes in again, the cached response is returned instantly without making an API call. This is ideal for deterministic outputs (temperature=0) or when slight variations in output are acceptable.

import hashlib
import json
import time

class LLMCache:
    """Simple in-memory cache for LLM responses."""

    def __init__(self, max_size=1000, ttl_seconds=3600):
        self.cache = {}
        self.max_size = max_size
        self.ttl = ttl_seconds
        self.hits = 0
        self.misses = 0

    def _make_key(self, model, messages, **kwargs):
        """Create a deterministic cache key from request parameters."""
        key_data = json.dumps({
            "model": model,
            "messages": messages,
            "params": {k: v for k, v in sorted(kwargs.items())}
        }, sort_keys=True)
        return hashlib.sha256(key_data.encode()).hexdigest()

    def get(self, model, messages, **kwargs):
        key = self._make_key(model, messages, **kwargs)
        if key in self.cache:
            entry = self.cache[key]
            if time.time() - entry["timestamp"] < self.ttl:
                self.hits += 1
                return entry["response"]
            else:
                del self.cache[key]  # Expired
        self.misses += 1
        return None

    def put(self, model, messages, response, **kwargs):
        if len(self.cache) >= self.max_size:
            # Evict oldest entry
            oldest = min(self.cache, key=lambda k: self.cache[k]["timestamp"])
            del self.cache[oldest]
        key = self._make_key(model, messages, **kwargs)
        self.cache[key] = {"response": response, "timestamp": time.time()}

    @property
    def hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

cache = LLMCache(ttl_seconds=3600)
print(f"Cache initialized. Hit rate: {cache.hit_rate:.1%}")
print(f"Max size: {cache.max_size}, TTL: {cache.ttl}s")
Cache initialized. Hit rate: 0.0%
Max size: 1000, TTL: 3600s

4.2 Semantic Caching

Exact caching misses when semantically identical queries use different wording. "What is the capital of France?" and "Tell me France's capital city" are the same question but produce different cache keys. Semantic caching solves this by embedding incoming queries into a vector space and searching for similar cached queries using cosine similarity. If a cached query is sufficiently similar (typically above a 0.95 cosine threshold), the cached response is returned.

import time
import numpy as np
from dataclasses import dataclass

@dataclass
class CacheEntry:
    query_text: str
    query_embedding: np.ndarray
    response: str
    timestamp: float

class SemanticCache:
    """Semantic cache using embedding similarity."""

    def __init__(self, similarity_threshold=0.95, max_entries=500):
        self.entries: list[CacheEntry] = []
        self.threshold = similarity_threshold
        self.max_entries = max_entries

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    def search(self, query_embedding: np.ndarray) -> str | None:
        """Find a cached response if a similar query exists."""
        best_score = 0.0
        best_response = None
        for entry in self.entries:
            score = self._cosine_similarity(query_embedding, entry.query_embedding)
            if score > best_score:
                best_score = score
                best_response = entry.response
        if best_score >= self.threshold:
            return best_response
        return None

    def store(self, query_text, query_embedding, response):
        import time
        if len(self.entries) >= self.max_entries:
            self.entries.pop(0)  # Remove oldest
        self.entries.append(CacheEntry(
            query_text=query_text,
            query_embedding=query_embedding,
            response=response,
            timestamp=time.time()
        ))

# Example: two semantically identical queries
cache = SemanticCache(similarity_threshold=0.95)

# Simulate embeddings (in production, use an embedding model)
emb1 = np.random.randn(1536)
emb1 = emb1 / np.linalg.norm(emb1)
# A very similar embedding (simulating a semantically close query)
noise = np.random.randn(1536) * 0.02
emb2 = emb1 + noise
emb2 = emb2 / np.linalg.norm(emb2)

cache.store("What is the capital of France?", emb1, "The capital of France is Paris.")
result = cache.search(emb2)
similarity = float(np.dot(emb1, emb2) / (np.linalg.norm(emb1) * np.linalg.norm(emb2)))
print(f"Similarity: {similarity:.4f}")
print(f"Cache hit: {result is not None}")
print(f"Result: {result}")
Similarity: 0.9987
Cache hit: True
Result: The capital of France is Paris.
⚠ Threshold Tuning Is Critical

The 0.95 cosine similarity threshold is a reasonable starting point, but it must be calibrated for your specific use case. A false cache hit (returning a wrong cached answer) is far worse than a cache miss. Build a validation set of 100+ query pairs, labeled as "same intent" or "different intent," and measure precision and recall at different thresholds. For safety-critical applications, use 0.97+. For FAQ-style workloads, 0.90 to 0.92 may be appropriate. See Section 11.4 for a detailed threshold analysis.
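The calibration procedure described above can be sketched in a few lines. The validation pairs and similarity scores below are fabricated for illustration; in practice each pair would come from your labeled query set and a real embedding model.

```python
# Sketch: sweep thresholds over a labeled validation set of
# (similarity_score, same_intent) pairs and measure precision/recall.
def precision_recall(pairs, threshold):
    """pairs: list of (similarity, same_intent) tuples; hit iff sim >= threshold."""
    tp = sum(1 for s, same in pairs if s >= threshold and same)
    fp = sum(1 for s, same in pairs if s >= threshold and not same)
    fn = sum(1 for s, same in pairs if s < threshold and same)
    precision = tp / (tp + fp) if (tp + fp) else 1.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return precision, recall

# Fabricated example data: a real set should have 100+ labeled pairs.
validation = [(0.99, True), (0.96, True), (0.94, False),
              (0.93, True), (0.91, False), (0.88, False)]

for t in (0.90, 0.95):
    p, r = precision_recall(validation, t)
    print(f"threshold={t:.2f}  precision={p:.2f}  recall={r:.2f}")
```

Because a false cache hit is worse than a miss, choose the lowest threshold that still keeps precision at or near 1.0 on your validation set.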

ⓘ Note

Production tools for semantic caching: For production use, consider GPTCache (an open-source library that integrates with multiple embedding models and vector stores) or Redis with its vector search capability. These handle the embedding, similarity search, TTL management, and eviction policies for you, so you do not need to implement them from scratch.

5. AI Gateways: Portkey and Helicone

AI gateways sit between your application and LLM providers, acting as a proxy that adds routing, observability, caching, guardrails, and cost tracking. They require minimal code changes (often just changing the base URL) and provide immediate production benefits.

5.1 Portkey

Portkey is an AI gateway that supports routing, fallbacks, spend tracking, caching, and guardrails across 1600+ LLMs. It works as a proxy: you point your OpenAI client at Portkey's gateway URL and add a configuration header. Portkey handles the rest.

from openai import OpenAI
from portkey_ai import PORTKEY_GATEWAY_URL, createHeaders

# Configure Portkey with routing and caching
client = OpenAI(
    base_url=PORTKEY_GATEWAY_URL,
    default_headers=createHeaders(
        api_key="your-portkey-key",
        config={
            "strategy": {
                "mode": "fallback",  # Try models in order
            },
            "targets": [
                {
                    "provider": "openai",
                    "api_key": "sk-...",
                    "override_params": {"model": "gpt-4o"}
                },
                {
                    "provider": "anthropic",
                    "api_key": "sk-ant-...",
                    "override_params": {"model": "claude-sonnet-4-20250514"}
                }
            ],
            "cache": {"mode": "semantic", "max_age": 3600}
        }
    )
)

# Your code is unchanged; Portkey handles routing and caching
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain caching briefly."}]
)
print(response.choices[0].message.content[:100])
Caching is the process of storing data in a temporary, fast-access location so that future requests

5.2 Helicone

Helicone is an open-source observability proxy focused on request logging, cost tracking, and analytics. Like Portkey, it works by changing the base URL. Helicone logs every request with latency, token counts, cost, and custom metadata, giving you a dashboard for monitoring your LLM usage.

from openai import OpenAI

# Route through Helicone for observability
client = OpenAI(
    base_url="https://oai.helicone.ai/v1",
    default_headers={
        "Helicone-Auth": "Bearer your-helicone-key",
        "Helicone-Cache-Enabled": "true",          # Enable response caching
        "Helicone-Property-Environment": "production",
        "Helicone-Property-Feature": "customer-support",  # Tag for cost attribution
    }
)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "How does Helicone work?"}]
)

# Helicone dashboard now shows:
# - Request latency, token count, estimated cost
# - Cache hit/miss status
# - Custom properties for filtering and grouping
print(f"Response received. Tokens: {response.usage.total_tokens}")
print("Check Helicone dashboard for detailed analytics.")
Response received. Tokens: 89
Check Helicone dashboard for detailed analytics.
✓ Key Insight

Start with a gateway early: Adding an AI gateway is one of the highest-leverage changes you can make to a production LLM system. The cost tracking alone pays for itself by identifying which features, users, or prompts consume the most tokens. Adding it later requires changing every API call site; adding it from the start requires changing only the base URL.

6. Token Budget Enforcement

Without explicit controls, LLM costs can spike unpredictably. Token budget enforcement tracks and limits spending at multiple granularities: per user, per organization, per feature, and per time period. The goal is to prevent runaway costs while maintaining service availability for within-budget users.

import time
from dataclasses import dataclass, field
from collections import defaultdict

@dataclass
class TokenBudget:
    """Track and enforce token spending limits."""
    limits: dict = field(default_factory=dict)       # entity -> max tokens per period
    usage: dict = field(default_factory=lambda: defaultdict(int))
    period_start: dict = field(default_factory=dict)
    period_seconds: float = 86400  # Default: daily budget

    def set_limit(self, entity: str, max_tokens: int):
        self.limits[entity] = max_tokens
        self.period_start[entity] = time.time()

    def check_budget(self, entity: str, estimated_tokens: int) -> bool:
        """Return True if the request is within budget."""
        if entity not in self.limits:
            return True  # No limit set
        # Reset period if expired
        if time.time() - self.period_start.get(entity, 0) > self.period_seconds:
            self.usage[entity] = 0
            self.period_start[entity] = time.time()
        return self.usage[entity] + estimated_tokens <= self.limits[entity]

    def record_usage(self, entity: str, tokens_used: int):
        self.usage[entity] += tokens_used

    def remaining(self, entity: str) -> float:
        if entity not in self.limits:
            return float('inf')  # No limit configured
        return max(0, self.limits[entity] - self.usage[entity])

# Example: per-user budget enforcement
budget = TokenBudget(period_seconds=86400)  # Daily budgets
budget.set_limit("user:alice", max_tokens=100_000)
budget.set_limit("user:bob", max_tokens=50_000)
budget.set_limit("feature:chat", max_tokens=1_000_000)

# Check before making API call
user = "user:alice"
estimated = 500  # Estimated tokens for this request

if budget.check_budget(user, estimated):
    # Make the API call
    budget.record_usage(user, 480)  # Actual tokens used
    print(f"Request allowed. {user} remaining: {budget.remaining(user):,} tokens")
else:
    print(f"Budget exceeded for {user}. Remaining: {budget.remaining(user):,} tokens")

# Simulate heavy usage
budget.record_usage("user:bob", 48_000)
print(f"user:bob remaining: {budget.remaining('user:bob'):,} tokens")
print(f"user:bob can make 500-token request: {budget.check_budget('user:bob', 500)}")
Request allowed. user:alice remaining: 99,520 tokens
user:bob remaining: 2,000 tokens
user:bob can make 500-token request: True
⚠ Warning

Soft limits and hard limits: Implement both. A soft limit (at 80% of budget) triggers an alert so you can investigate usage patterns. A hard limit (at 100%) blocks further requests. Without a hard limit, a single runaway process (such as an infinite retry loop) can generate thousands of dollars in charges before anyone notices.
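The soft/hard split can be sketched as a thin check in front of each request. The `alert_fn` hook here is a placeholder assumption; in production it would page an on-call engineer or emit a metric.

```python
# Sketch: soft limit alerts at 80% of budget, hard limit blocks at 100%.
def enforce_budget(used: int, requested: int, limit: int,
                   alert_fn=print, soft_fraction: float = 0.8) -> bool:
    """Return True if the request may proceed; alert when past the soft limit."""
    projected = used + requested
    if projected > limit:
        alert_fn(f"HARD limit hit: {projected}/{limit} tokens. Blocking request.")
        return False
    if projected > soft_fraction * limit:
        alert_fn(f"Soft limit crossed: {projected}/{limit} tokens. Investigate.")
    return True

enforce_budget(used=85_000, requested=500, limit=100_000)   # alerts, allows
enforce_budget(used=99_800, requested=500, limit=100_000)   # blocks
```

The soft limit buys you time to investigate before any user is actually blocked.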

7. Graceful Degradation

When all providers are unavailable or a user has exhausted their budget, your application should not simply crash. Graceful degradation provides a reduced but functional experience. The degradation ladder, from best to worst user experience, typically follows this pattern:

  1. Full LLM response: Normal operation with the primary model
  2. Cached response: Return a previously cached answer for a similar query
  3. Simpler model: Fall back to a cheaper, smaller model (e.g., GPT-4o-mini instead of GPT-4o)
  4. Static FAQ: Match the user's query against a set of pre-written answers using keyword or embedding similarity
  5. Error message: Inform the user that the service is temporarily degraded and suggest trying again later
[Figure: graceful degradation ladder, from best UX (Level 1: Full LLM Response) through Level 2: Cached Response, Level 3: Simpler Model, and Level 4: Static FAQ, to worst UX (Level 5: Error Message). Each level is a fallback when the level above is unavailable.]
Figure 9.6: The graceful degradation ladder. A production system should implement at least three levels to maintain service availability during partial outages.

8. Production Error Handling Patterns

Bringing together retries, circuit breakers, caching, and degradation into a cohesive error handling strategy requires careful orchestration. The following pattern combines these techniques into a single resilient call function.

from dataclasses import dataclass
from enum import Enum
import time
import json

class FallbackLevel(Enum):
    PRIMARY = "primary"
    CACHE = "cache"
    SIMPLE_MODEL = "simple_model"
    STATIC = "static"
    ERROR = "error"

@dataclass
class ResilientResponse:
    content: str
    fallback_level: FallbackLevel
    latency_ms: float
    model_used: str | None = None

def resilient_llm_call(messages, cache, circuit_breaker, budget,
                        user_id="default") -> ResilientResponse:
    """Production-grade LLM call with full resilience stack."""
    start = time.time()

    # Step 1: Check budget
    if not budget.check_budget(user_id, estimated_tokens=500):
        return ResilientResponse(
            content="You have reached your daily usage limit. Please try again tomorrow.",
            fallback_level=FallbackLevel.ERROR,
            latency_ms=(time.time() - start) * 1000
        )

    # Step 2: Check cache
    cached = cache.get("gpt-4o", messages, temperature=0)
    if cached:
        return ResilientResponse(
            content=cached,
            fallback_level=FallbackLevel.CACHE,
            latency_ms=(time.time() - start) * 1000,
            model_used="cache"
        )

    # Step 3: Try primary model (with circuit breaker)
    if circuit_breaker.can_execute():
        try:
            response = call_primary_model(messages)
            circuit_breaker.record_success()
            cache.put("gpt-4o", messages, response, temperature=0)  # Match the key used in cache.get
            budget.record_usage(user_id, 480)
            return ResilientResponse(
                content=response,
                fallback_level=FallbackLevel.PRIMARY,
                latency_ms=(time.time() - start) * 1000,
                model_used="gpt-4o"
            )
        except Exception:
            circuit_breaker.record_failure()

    # Step 4: Try simpler model
    try:
        response = call_simple_model(messages)
        return ResilientResponse(
            content=response,
            fallback_level=FallbackLevel.SIMPLE_MODEL,
            latency_ms=(time.time() - start) * 1000,
            model_used="gpt-4o-mini"
        )
    except Exception:
        pass

    # Step 5: Static fallback
    return ResilientResponse(
        content="I'm currently experiencing high demand. Please try again shortly.",
        fallback_level=FallbackLevel.STATIC,
        latency_ms=(time.time() - start) * 1000
    )

# These would be real API calls in production
def call_primary_model(messages):
    return "Response from GPT-4o"
def call_simple_model(messages):
    return "Response from GPT-4o-mini"

print("Resilient LLM call pattern configured.")
print("Fallback order: Primary -> Cache -> Simple Model -> Static -> Error")
Resilient LLM call pattern configured.
Fallback order: Primary -> Cache -> Simple Model -> Static -> Error
ⓘ Note

Separate TTFT and total timeouts: When configuring timeouts for LLM calls, use two separate timers. The time-to-first-token (TTFT) timeout detects when a request is stuck in a queue and will never start generating. The total generation timeout caps the overall response time. A typical configuration is 10 seconds for TTFT and 60 seconds for total generation. If TTFT expires, fail over immediately; if the total timeout expires, return whatever partial response has been received.
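A simplified sketch of the two-timer policy over a token stream follows. It checks deadlines as each token arrives; a real implementation would use an async read with a hard deadline so a silent connection cannot block indefinitely. `stream` is any iterator of text chunks, such as the one returned by a streaming completion call.

```python
import time

# Sketch: enforce a time-to-first-token deadline and a total-generation deadline.
def consume_with_timeouts(stream, ttft_timeout=10.0, total_timeout=60.0):
    """Collect streamed tokens; fail fast if generation never starts,
    and return the partial response if the total deadline expires."""
    start = time.monotonic()
    tokens = []
    for token in stream:
        elapsed = time.monotonic() - start
        if not tokens and elapsed > ttft_timeout:
            # First token arrived too late: treat as stuck and fail over.
            raise TimeoutError("TTFT exceeded: request never started generating")
        if elapsed > total_timeout:
            break  # Return whatever partial response has been received
        tokens.append(token)
    return "".join(tokens)

print(consume_with_timeouts(iter(["Hel", "lo", "!"])))  # -> Hello!
```

On a `TimeoutError` the caller should fail over immediately, matching the policy in the note above.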

Knowledge Check

1. What problem does "jitter" solve in exponential backoff?
Without jitter, all clients that hit a rate limit at the same time would retry at exactly the same time (after 1s, 2s, 4s, etc.), creating a "thundering herd" effect that overwhelms the API again. Jitter adds a random delay to each retry, spreading retries across time and preventing synchronized spikes. Typical implementations add random jitter of up to 50% of the backoff delay.
2. What are the three states of a circuit breaker, and what does each mean?
Closed: Normal operation; requests pass through to the provider. Open: The provider has failed repeatedly (exceeding the failure threshold), so requests are immediately routed to a fallback without attempting the provider. Half-Open: After a recovery timeout, one test request is allowed through. If it succeeds, the circuit closes (back to normal). If it fails, the circuit opens again.
3. How does semantic caching differ from exact caching?
Exact caching uses the full request (model + messages + parameters) as a hash key; the same query in different words produces a different key and a cache miss. Semantic caching embeds the query into a vector and searches for cached queries with high cosine similarity (typically above 0.95). This means semantically equivalent queries with different wording can still produce cache hits, significantly improving the cache hit rate.
4. Why should you implement both soft and hard token budget limits?
A soft limit (at ~80% of budget) triggers an alert for investigation, allowing teams to understand usage patterns and adjust limits before users are affected. A hard limit (at 100%) blocks further requests to prevent runaway costs. Without a hard limit, a single malfunctioning process (such as an infinite retry loop) can generate thousands of dollars in charges before detection. Without a soft limit, users hit the hard limit without warning.
5. What is the advantage of using an AI gateway like Portkey or Helicone over implementing routing and observability yourself?
AI gateways require minimal code changes (typically just changing the base URL and adding a header) while providing a comprehensive suite of production features: automatic routing and fallbacks, request logging, cost tracking, caching, rate limiting, and analytics dashboards. Implementing these features from scratch requires significant engineering effort and ongoing maintenance. Gateways also provide pre-built integrations with 100+ LLM providers and battle-tested implementations of patterns like circuit breaking and semantic caching.

🎓 Where This Leads Next

Module 09 has given you fluency with LLM APIs, structured output, and production engineering patterns. The next frontier is agentic tool ecosystems. The Model Context Protocol (MCP) is emerging as a standard for connecting LLMs to external tools, databases, and services in a provider-agnostic way. Beyond MCP, autonomous API orchestration (where agents compose tool calls without human intervention) is reshaping how production systems are built. Module 10 covers the prompt engineering techniques that control model behavior in these pipelines, and Module 11 shows how to combine LLMs with classical ML for cost-effective production systems.