Moving from prototype to production requires solving the hardest problem in agent engineering: what happens when things go wrong. Real-world agentic workflows must handle network failures, LLM hallucinations, token budget exhaustion, and unexpected tool errors gracefully. This section covers the engineering patterns that make multi-agent systems reliable: state machine workflows with conditional branching, retry and compensation logic, checkpointing for long-running tasks, and human-in-the-loop gates that catch problems before they propagate. The lab builds a complete multi-agent research pipeline that demonstrates all of these patterns working together.
## 1. Workflow Engines for Agent Orchestration
Agent workflows need more than simple sequential execution. They require conditional branching (take path A if the research is sufficient, path B if more data is needed), loops (keep revising until the reviewer approves), parallel execution (run multiple researchers simultaneously), and error handling (retry on failure, compensate on unrecoverable errors). Two dominant approaches have emerged: LangGraph's built-in state machine model and external workflow engines like Temporal.
| Feature | LangGraph | Temporal |
|---|---|---|
| Execution model | In-process graph traversal | Distributed workflow engine |
| State management | TypedDict with checkpointing | Event-sourced workflow history |
| Retry logic | Custom in conditional edges | Built-in retry policies |
| Fault tolerance | Checkpoint-based recovery | Automatic workflow replay |
| Best for | LLM-centric workflows | Long-running, distributed workflows |
| Complexity | Library (pip install) | Infrastructure (server + workers) |
## 2. Conditional Branching and Loops
The most common workflow pattern in agent systems is the conditional loop: execute a step, evaluate the result, and either proceed to the next stage or loop back for refinement. In LangGraph, this is implemented through conditional edges that inspect the current state and return the name of the next node to execute.
```python
from typing import TypedDict, Annotated, Literal

from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages


class WorkflowState(TypedDict):
    messages: Annotated[list, add_messages]
    draft: str
    review_feedback: str
    revision_count: int
    quality_score: float
    approved: bool


def writer_node(state: WorkflowState) -> dict:
    """Generate or revise the draft based on feedback."""
    if state["revision_count"] == 0:
        prompt = "Write an initial draft based on the research."
    else:
        prompt = (
            f"Revise the draft based on this feedback: "
            f"{state['review_feedback']}"
        )
    draft = writer_llm.invoke(prompt)
    return {
        "draft": draft.content,
        "revision_count": state["revision_count"] + 1,
    }


def reviewer_node(state: WorkflowState) -> dict:
    """Review the draft and assign a quality score."""
    review = reviewer_llm.invoke(
        f"Review this draft critically. Score 0 to 1.\n\n{state['draft']}"
    )
    score = extract_score(review.content)
    return {
        "review_feedback": review.content,
        "quality_score": score,
        "approved": score >= 0.8,
    }


def should_revise(state: WorkflowState) -> Literal["revise", "publish"]:
    """Loop back for revision or proceed to publish."""
    if state["approved"]:
        return "publish"
    if state["revision_count"] >= 3:  # Safety limit
        return "publish"              # Publish best effort
    return "revise"


graph = StateGraph(WorkflowState)
graph.add_node("write", writer_node)
graph.add_node("review", reviewer_node)
graph.add_node("publish", publish_node)

graph.set_entry_point("write")
graph.add_edge("write", "review")
graph.add_conditional_edges("review", should_revise, {
    "revise": "write",     # Loop back
    "publish": "publish",  # Proceed
})
graph.add_edge("publish", END)
```
Always set a maximum iteration limit on loops. Without a safety bound, an overly critical reviewer combined with a writer that cannot satisfy its demands will loop forever, burning tokens and never producing output. The `revision_count >= 3` guard in the example above is not just good practice; it is essential for production reliability. When the limit is hit, publish the best available version and log a warning rather than failing silently.
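The effect of the safety bound can be seen in a stripped-down simulation. This is an illustrative sketch, not LangGraph code: the stub reviewer below never approves, standing in for the worst case of an unsatisfiable critic, and the function name is invented for this example.

```python
# Minimal simulation of a bounded revision loop. The stub reviewer never
# approves, modeling the worst case; the loop still terminates at the limit.
def run_revision_loop(max_revisions: int = 3) -> tuple[str, int]:
    revision_count = 0
    while True:
        revision_count += 1  # "write" step: produce a new draft
        approved = False     # "review" step: stub reviewer, never satisfied
        if approved:
            return ("approved", revision_count)
        if revision_count >= max_revisions:
            # Limit hit: publish best effort instead of looping forever
            return ("best-effort", revision_count)

print(run_revision_loop())  # terminates after exactly 3 revisions
```

Remove the `revision_count >= max_revisions` check and the same loop spins forever, which is exactly what happens in a real pipeline when the guard is omitted, except that every iteration also burns tokens.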
## 3. Error Handling, Retries, and Compensation
Production workflows must handle three categories of failures: transient errors (API rate limits, network timeouts) that succeed on retry, permanent errors (invalid input, unsupported operation) that require alternative approaches, and partial failures where some steps succeed before a later step fails. Compensation logic undoes the effects of previously completed steps when a workflow cannot proceed.
```python
import asyncio

from tenacity import retry, stop_after_attempt, wait_exponential


class ResilientAgentNode:
    """Wraps an agent node with retry and fallback logic."""

    def __init__(self, primary_llm, fallback_llm, max_retries=3):
        self.primary = primary_llm
        self.fallback = fallback_llm
        self.max_retries = max_retries

    @retry(
        stop=stop_after_attempt(3),  # Attempt count fixed at decoration time
        wait=wait_exponential(multiplier=1, min=2, max=30),
    )
    async def _call_with_retry(self, llm, messages):
        return await llm.ainvoke(messages)

    async def __call__(self, state: dict) -> dict:
        messages = state["messages"]
        try:
            # Try the primary model with retries
            result = await self._call_with_retry(self.primary, messages)
        except Exception:
            # Fall back to the secondary model
            try:
                result = await self._call_with_retry(self.fallback, messages)
            except Exception as fallback_err:
                # Both models failed; return an error state
                return {
                    "error": f"All models failed: {fallback_err}",
                    "needs_human": True,
                }
        return {"messages": [result]}
```
Compensation logic is the most commonly overlooked failure mode. If your workflow sends an email in step 3 and then fails in step 4, you cannot unsend the email. Design your workflows so that irreversible actions (sending messages, making payments, publishing content) happen as late as possible in the pipeline, ideally as the final step. If irreversible actions must happen mid-workflow, implement compensating actions (send a correction email, issue a refund, unpublish the content) that execute when downstream steps fail.
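One way to structure compensating actions is a small saga-style runner that pairs each step with the action that undoes it, and unwinds completed steps in reverse order when a later step fails. The sketch below is framework-agnostic; the `Saga` class and the step names in the usage note are illustrative assumptions, not a LangGraph or Temporal API.

```python
# Saga-style compensation: each step is paired with an action that undoes it.
# When a step fails, compensations for already-completed steps run in
# reverse order, then the error is recorded in the context.
class Saga:
    def __init__(self):
        self.steps = []  # list of (action, compensation) pairs

    def add_step(self, action, compensation=None):
        """compensation=None marks a step with nothing to undo."""
        self.steps.append((action, compensation))

    def run(self, context: dict) -> dict:
        completed = []
        for action, compensation in self.steps:
            try:
                action(context)
                completed.append(compensation)
            except Exception as err:
                # Unwind: undo completed steps, most recent first
                for comp in reversed(completed):
                    if comp is not None:
                        comp(context)
                context["error"] = str(err)
                break
        return context
```

In an agent pipeline, a step might be "email the draft to stakeholders" with the compensation "send a correction email"; purely internal steps such as updating draft state can pass `compensation=None` since there is nothing external to undo.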
## 4. Checkpointing and Streaming
Long-running agent workflows can take minutes or even hours. Checkpointing saves the workflow state after each step so that execution can resume from the last successful point after a crash or restart. Streaming provides real-time visibility into the workflow's progress, letting users see intermediate results and intervene if needed.
```python
from langgraph.checkpoint.postgres import PostgresSaver

# Production-grade checkpointing with PostgreSQL
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost:5432/agents"
)
app = graph.compile(checkpointer=checkpointer)

# Stream execution to see real-time progress
config = {"configurable": {"thread_id": "research-task-101"}}

async for event in app.astream_events(
    {"messages": [("user", "Research and write about fusion energy")]},
    config=config,
    version="v2",
):
    if event["event"] == "on_chain_start":
        print(f"Starting: {event['name']}")
    elif event["event"] == "on_chain_end":
        print(f"Completed: {event['name']}")
    elif event["event"] == "on_chat_model_stream":
        # Stream individual tokens for real-time display
        print(event["data"]["chunk"].content, end="")
```
## 5. Human-in-the-Loop Gates
For high-stakes workflows, human review gates pause execution at critical decision points
and wait for human approval before proceeding. LangGraph supports this natively through
its interrupt_before parameter, which halts execution before specified nodes
and waits for explicit resumption.
```python
# Compile with interrupt points for human review
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["publish"],  # Pause before publishing
)

# Run until the interrupt point
config = {"configurable": {"thread_id": "article-draft-42"}}
result = app.invoke(initial_state, config)

# At this point, execution is paused before "publish".
# A human reviews the draft (could be hours later).
current_state = app.get_state(config)
draft = current_state.values["draft"]
print("Draft for review:", draft[:500])

# Human approves: resume execution
app.update_state(config, {"human_approved": True})
final_result = app.invoke(None, config)  # Resume from checkpoint

# Human rejects: update with feedback and re-run from writer
# app.update_state(config, {"review_feedback": "Needs more data"})
# final_result = app.invoke(None, config)
```
Human-in-the-loop gates work because of checkpointing. When execution pauses at an interrupt point, the full workflow state is serialized to the checkpoint store. The human can review the state hours or days later, and resumption picks up exactly where it left off. This is why a durable checkpoint backend (PostgreSQL, not in-memory) is essential for production human-in-the-loop workflows.
## 6. Lab: Multi-Agent Research System
This lab builds a complete multi-agent research pipeline with four stages: a Planner that decomposes the research topic into subtasks, a Researcher that gathers information for each subtask, a Writer that synthesizes the research into a coherent document, and a Reviewer that provides quality feedback. The system includes a revision loop between the Writer and Reviewer, a human approval gate before final output, and checkpoint-based recovery.
```python
import asyncio
from typing import TypedDict, Annotated

from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.postgres import PostgresSaver


class ResearchPipelineState(TypedDict):
    messages: Annotated[list, add_messages]
    topic: str
    subtasks: list[str]
    research_results: list[dict]
    draft: str
    review_feedback: str
    quality_score: float
    revision_count: int
    human_approved: bool


def planner(state: ResearchPipelineState) -> dict:
    """Decompose the topic into 3-5 research subtasks."""
    plan = planner_llm.invoke(
        f"Break this topic into 3-5 specific research subtasks: "
        f"{state['topic']}"
    )
    subtasks = parse_subtasks(plan.content)
    return {"subtasks": subtasks}


async def parallel_research(state: ResearchPipelineState) -> dict:
    """Research all subtasks in parallel."""
    tasks = [
        research_agent.ainvoke(subtask)
        for subtask in state["subtasks"]
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Filter out failed tasks, keep successful results
    valid = [r for r in results if not isinstance(r, Exception)]
    return {"research_results": valid}


# Assemble the full pipeline
graph = StateGraph(ResearchPipelineState)
graph.add_node("plan", planner)
graph.add_node("research", parallel_research)
graph.add_node("write", writer_node)
graph.add_node("review", reviewer_node)
graph.add_node("publish", publish_node)

graph.set_entry_point("plan")
graph.add_edge("plan", "research")
graph.add_edge("research", "write")
graph.add_edge("write", "review")
graph.add_conditional_edges("review", should_revise, {
    "revise": "write",
    "publish": "publish",
})
graph.add_edge("publish", END)

# Compile with checkpointing and human-in-the-loop
checkpointer = PostgresSaver.from_conn_string(DB_URL)
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["publish"],
)
```
The `return_exceptions=True` parameter in `asyncio.gather` is crucial for resilient parallel execution. Without it, a single failed research subtask would cancel all parallel tasks and crash the entire pipeline. With it, failed tasks return their exception as a result, letting you filter them out and continue with the successful results. This graceful degradation pattern applies to any parallel agent execution scenario.
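The behavior is easy to verify without any LLMs. In this standalone sketch (function and subtask names invented for illustration), one of three fake research tasks fails deterministically, and the gather-and-filter pattern keeps the other two results:

```python
import asyncio

async def research(subtask: str) -> str:
    # Fake research task: one subtask deterministically fails
    if subtask == "tokamak costs":
        raise ValueError(f"no sources found for {subtask!r}")
    return f"notes on {subtask}"

async def gather_research(subtasks: list[str]) -> list[str]:
    tasks = [research(s) for s in subtasks]
    # return_exceptions=True: failures become result values, not cancellations
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if not isinstance(r, Exception)]

print(asyncio.run(gather_research(
    ["plasma confinement", "tokamak costs", "ITER timeline"]
)))
# -> ['notes on plasma confinement', 'notes on ITER timeline']
```

Dropping `return_exceptions=True` here makes `asyncio.run` raise the `ValueError`, which is exactly the all-or-nothing failure mode the pipeline is designed to avoid.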
## Knowledge Check

1. Why is a maximum iteration limit essential for revision loops in agent workflows?

   **Answer:** Without a bound, an overly critical reviewer paired with a writer that cannot satisfy its demands will loop forever, burning tokens and never producing output. A guard such as `revision_count >= 3` guarantees termination; when the limit is hit, the workflow publishes the best available version and logs a warning rather than failing silently.

2. What is "compensation logic" in the context of workflow error handling?

   **Answer:** Compensation logic undoes the effects of previously completed steps when a later step fails and the workflow cannot proceed. For irreversible actions (sending messages, making payments, publishing content), the compensating action is a corrective counterpart: a correction email, a refund, unpublishing the content.

3. How does LangGraph's `interrupt_before` work with checkpointing to enable human-in-the-loop review?

   **Answer:** When execution reaches a node listed in `interrupt_before`, LangGraph serializes the full workflow state to the checkpoint store and pauses. The human can review the state at any later time (minutes, hours, or days). To resume, the human (or application code) calls `app.invoke(None, config)` with the same thread config, and execution picks up from the checkpointed state. The human can also modify the state before resuming (for example, adding feedback) using `app.update_state()`.

4. Why does the parallel research node use `return_exceptions=True` in `asyncio.gather`?

   **Answer:** Without `return_exceptions=True`, if any single research subtask raises an exception, `asyncio.gather` cancels all other pending tasks and propagates the exception, so one failed subtask kills the entire research phase. With it, failed tasks return their exception as a result value while successful tasks complete normally. The code then filters out exceptions and continues with whatever results were successfully gathered.

5. What are the advantages of using PostgreSQL over SQLite for checkpointing in production?

   **Answer:** PostgreSQL is a client-server database that supports concurrent access from multiple workers and application instances over the network, while SQLite is an embedded single-file database best suited to local development. A durable, shared backend is what allows a paused workflow to be resumed hours or days later, possibly by a different process than the one that started it.
## Key Takeaways
- Production workflows need conditional branching and bounded loops: always set maximum iteration limits on revision cycles and implement clear exit conditions to prevent runaway execution.
- Error handling requires three strategies: retries with exponential backoff for transient failures, fallback models for provider outages, and compensation logic for undoing the effects of completed steps when later steps fail.
- Checkpointing enables both crash recovery and human-in-the-loop: use durable backends (PostgreSQL) for production systems where workflows may need to pause for hours or days awaiting human review.
- Streaming provides real-time visibility: use `astream_events` to show users intermediate progress and enable early intervention when workflows go off track.
- Design for graceful degradation: parallel tasks should use `return_exceptions=True`, failed research should not block writing, and irreversible actions should happen as late as possible in the pipeline.