Module 22 · Section 22.3

Agentic Workflows & Pipelines

State machines, conditional branching, error recovery, checkpointing, and building production-grade multi-agent research systems
★ Big Picture

Moving from prototype to production requires solving the hardest problem in agent engineering: what happens when things go wrong. Real-world agentic workflows must gracefully handle network failures, LLM hallucinations, token budget exhaustion, and unexpected tool errors. This section covers the engineering patterns that make multi-agent systems reliable: state machine workflows with conditional branching, retry and compensation logic, checkpointing for long-running tasks, and human-in-the-loop gates that catch problems before they propagate. The lab builds a complete multi-agent research pipeline that demonstrates all of these patterns working together.

1. Workflow Engines for Agent Orchestration

Agent workflows need more than simple sequential execution. They require conditional branching (take path A if the research is sufficient, path B if more data is needed), loops (keep revising until the reviewer approves), parallel execution (run multiple researchers simultaneously), and error handling (retry on failure, compensate on unrecoverable errors). Two dominant approaches have emerged: LangGraph's built-in state machine model and external workflow engines like Temporal.

| Feature          | LangGraph                    | Temporal                            |
|------------------|------------------------------|-------------------------------------|
| Execution model  | In-process graph traversal   | Distributed workflow engine         |
| State management | TypedDict with checkpointing | Event-sourced workflow history      |
| Retry logic      | Custom in conditional edges  | Built-in retry policies             |
| Fault tolerance  | Checkpoint-based recovery    | Automatic workflow replay           |
| Best for         | LLM-centric workflows        | Long-running, distributed workflows |
| Complexity       | Library (pip install)        | Infrastructure (server + workers)   |

2. Conditional Branching and Loops

The most common workflow pattern in agent systems is the conditional loop: execute a step, evaluate the result, and either proceed to the next stage or loop back for refinement. In LangGraph, this is implemented through conditional edges that inspect the current state and return the name of the next node to execute.

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages

# writer_llm, reviewer_llm, extract_score, and publish_node are assumed to be
# defined elsewhere (chat model instances and small helper functions).

class WorkflowState(TypedDict):
    messages: Annotated[list, add_messages]
    draft: str
    review_feedback: str
    revision_count: int
    quality_score: float
    approved: bool

def writer_node(state: WorkflowState) -> dict:
    """Generate or revise the draft based on feedback."""
    if state["revision_count"] == 0:
        prompt = "Write an initial draft based on the research."
    else:
        prompt = (
            f"Revise the draft based on this feedback: "
            f"{state['review_feedback']}"
        )
    draft = writer_llm.invoke(prompt)
    return {
        "draft": draft.content,
        "revision_count": state["revision_count"] + 1
    }

def reviewer_node(state: WorkflowState) -> dict:
    """Review the draft and assign a quality score."""
    review = reviewer_llm.invoke(
        f"Review this draft critically. Score 0 to 1.\n\n{state['draft']}"
    )
    score = extract_score(review.content)
    return {
        "review_feedback": review.content,
        "quality_score": score,
        "approved": score >= 0.8
    }

def should_revise(state: WorkflowState) -> Literal["revise", "publish"]:
    """Loop back for revision or proceed to publish."""
    if state["approved"]:
        return "publish"
    if state["revision_count"] >= 3:  # Safety limit
        return "publish"  # Publish best effort
    return "revise"

graph = StateGraph(WorkflowState)
graph.add_node("write", writer_node)
graph.add_node("review", reviewer_node)
graph.add_node("publish", publish_node)

graph.set_entry_point("write")
graph.add_edge("write", "review")
graph.add_conditional_edges("review", should_revise, {
    "revise": "write",   # Loop back
    "publish": "publish"  # Proceed
})
graph.add_edge("publish", END)
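The reviewer node above relies on an `extract_score` helper that is not shown. A minimal sketch (the regex and the 0.0 fallback are assumptions about the reviewer's output format) might pull the first 0-to-1 number out of the review text:

```python
import re

def extract_score(review_text: str) -> float:
    """Extract the first 0-to-1 score (e.g. 'Score: 0.85') from review text."""
    match = re.search(r"\b(?:score[:\s]*)?([01](?:\.\d+)?)\b",
                      review_text, re.IGNORECASE)
    # If no score is found, return 0.0 so the workflow forces a revision
    return float(match.group(1)) if match else 0.0
```

A more robust alternative is to ask the reviewer model for structured (JSON) output and parse that instead of free text.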
◆ Key Insight

Always set a maximum iteration limit on loops. Without a safety bound, an overly critical reviewer combined with a writer that cannot satisfy its demands will loop forever, burning tokens and never producing output. The revision_count >= 3 guard in the example above is not just good practice; it is essential for production reliability. When the limit is hit, publish the best available version and log a warning rather than failing silently.

3. Error Handling, Retries, and Compensation

Production workflows must handle three categories of failures: transient errors (API rate limits, network timeouts) that succeed on retry, permanent errors (invalid input, unsupported operation) that require alternative approaches, and partial failures where some steps succeed before a later step fails. Compensation logic undoes the effects of previously completed steps when a workflow cannot proceed.

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

class ResilientAgentNode:
    """Wraps an agent node with retry and fallback logic."""

    def __init__(self, primary_llm, fallback_llm):
        self.primary = primary_llm
        self.fallback = fallback_llm

    @retry(
        stop=stop_after_attempt(3),  # three attempts with exponential backoff
        wait=wait_exponential(multiplier=1, min=2, max=30)
    )
    async def _call_with_retry(self, llm, messages):
        return await llm.ainvoke(messages)

    async def __call__(self, state: dict) -> dict:
        messages = state["messages"]
        try:
            # Try primary model with retries
            result = await self._call_with_retry(self.primary, messages)
        except Exception:
            # Fall back to secondary model
            try:
                result = await self._call_with_retry(
                    self.fallback, messages
                )
            except Exception as fallback_err:
                # Both models failed; return error state
                return {
                    "error": f"All models failed: {fallback_err}",
                    "needs_human": True
                }
        return {"messages": [result]}
⚠ Warning

Compensation logic is the most commonly overlooked failure mode. If your workflow sends an email in step 3 and then fails in step 4, you cannot unsend the email. Design your workflows so that irreversible actions (sending messages, making payments, publishing content) happen as late as possible in the pipeline, ideally as the final step. If irreversible actions must happen mid-workflow, implement compensating actions (send a correction email, issue a refund, unpublish the content) that execute when downstream steps fail.
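The compensation pattern can be sketched generically as a saga: each step registers an undo action, and on failure the completed steps are compensated in reverse order. This is an illustrative pattern, not a specific library API:

```python
def run_saga(steps):
    """Execute (action, compensator) pairs in order.

    If an action raises, run the compensators of all previously
    completed steps in reverse order, then re-raise.
    """
    completed = []  # compensators for steps that succeeded
    results = []
    for action, compensator in steps:
        try:
            results.append(action())
        except Exception:
            for comp in reversed(completed):
                comp()  # undo the most recent step first
            raise
        if compensator is not None:
            completed.append(compensator)
    return results
```

A step with an irreversible action (sending an email) registers a compensator that sends the correction; a read-only step can pass None and costs nothing to unwind.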

4. Checkpointing and Streaming

Long-running agent workflows can take minutes or even hours. Checkpointing saves the workflow state after each step so that execution can resume from the last successful point after a crash or restart. Streaming provides real-time visibility into the workflow's progress, letting users see intermediate results and intervene if needed.

[Figure: checkpointing and recovery flow. Plan (saved) → Research (saved) → Write (failed) → resume from last checkpoint → retry Write → Review → continue]
Figure 22.5: Checkpointing enables recovery from failures without re-executing completed steps
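Before wiring up LangGraph's checkpointer, the core mechanism can be sketched in plain Python: persist the state and the set of completed steps after each one, and skip completed steps on restart. (The JSON file layout and step-name scheme here are illustrative assumptions.)

```python
import json
import os

def run_with_checkpoints(steps, state, path="checkpoint.json"):
    """Run named steps in order, checkpointing state to disk after each.

    On restart, steps already recorded as done are skipped, so
    execution resumes from the last successful point.
    """
    done = set()
    if os.path.exists(path):
        with open(path) as f:
            saved = json.load(f)
        state, done = saved["state"], set(saved["done"])
    for name, fn in steps:
        if name in done:
            continue  # completed before the crash; do not re-execute
        state = fn(state)
        done.add(name)
        with open(path, "w") as f:
            json.dump({"state": state, "done": sorted(done)}, f)
    os.remove(path)  # pipeline finished; clear the checkpoint
    return state
```

A production checkpointer does the same thing with a database instead of a file, which is what the LangGraph setup below provides.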
from langgraph.checkpoint.postgres import PostgresSaver

# Production-grade checkpointing with PostgreSQL
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost:5432/agents"
)
# (run checkpointer.setup() once to create the checkpoint tables)
app = graph.compile(checkpointer=checkpointer)

# Stream execution to see real-time progress
config = {"configurable": {"thread_id": "research-task-101"}}

async for event in app.astream_events(
    {"messages": [("user", "Research and write about fusion energy")]},
    config=config,
    version="v2"
):
    if event["event"] == "on_chain_start":
        print(f"Starting: {event['name']}")
    elif event["event"] == "on_chain_end":
        print(f"Completed: {event['name']}")
    elif event["event"] == "on_chat_model_stream":
        # Stream individual tokens for real-time display
        print(event["data"]["chunk"].content, end="")

5. Human-in-the-Loop Gates

For high-stakes workflows, human review gates pause execution at critical decision points and wait for human approval before proceeding. LangGraph supports this natively through its interrupt_before parameter, which halts execution before specified nodes and waits for explicit resumption.

# Compile with interrupt points for human review
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["publish"]  # Pause before publishing
)

# Run until the interrupt point
config = {"configurable": {"thread_id": "article-draft-42"}}
result = app.invoke(initial_state, config)

# At this point, execution is paused before "publish"
# A human reviews the draft (could be hours later)
current_state = app.get_state(config)
draft = current_state.values["draft"]
print("Draft for review:", draft[:500])

# Human approves: resume execution
app.update_state(config, {"human_approved": True})
final_result = app.invoke(None, config)  # Resume from checkpoint

# Human rejects: update with feedback and re-run from writer
# app.update_state(config, {"review_feedback": "Needs more data"})
# final_result = app.invoke(None, config)
ⓘ Note

Human-in-the-loop gates work because of checkpointing. When execution pauses at an interrupt point, the full workflow state is serialized to the checkpoint store. The human can review the state hours or days later, and resumption picks up exactly where it left off. This is why a durable checkpoint backend (PostgreSQL, not in-memory) is essential for production human-in-the-loop workflows.

6. Lab: Multi-Agent Research System

This lab builds a complete multi-agent research pipeline with four stages: a Planner that decomposes the research topic into subtasks, a Researcher that gathers information for each subtask, a Writer that synthesizes the research into a coherent document, and a Reviewer that provides quality feedback. The system includes a revision loop between the Writer and Reviewer, a human approval gate before final output, and checkpoint-based recovery.

[Figure: multi-agent research pipeline. Research topic → Planner agent → Researchers 1-3 in parallel → Writer agent → Reviewer agent with revision loop (max 3 rounds) → human gate: approve / revise]
Figure 22.6: The complete research pipeline with parallel research, revision loops, and human approval
import asyncio
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.postgres import PostgresSaver

# planner_llm, parse_subtasks, research_agent, writer_node, reviewer_node,
# publish_node, should_revise, and DB_URL are assumed to be defined as in
# the earlier examples.

class ResearchPipelineState(TypedDict):
    messages: Annotated[list, add_messages]
    topic: str
    subtasks: list[str]
    research_results: list[dict]
    draft: str
    review_feedback: str
    quality_score: float
    revision_count: int
    human_approved: bool

def planner(state: ResearchPipelineState) -> dict:
    """Decompose the topic into 3-5 research subtasks."""
    plan = planner_llm.invoke(
        f"Break this topic into 3-5 specific research subtasks: "
        f"{state['topic']}"
    )
    subtasks = parse_subtasks(plan.content)
    return {"subtasks": subtasks}

async def parallel_research(state: ResearchPipelineState) -> dict:
    """Research all subtasks in parallel."""
    tasks = [
        research_agent.ainvoke(subtask)
        for subtask in state["subtasks"]
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Filter out failed tasks, keep successful results
    valid = [r for r in results if not isinstance(r, Exception)]
    return {"research_results": valid}

# Assemble the full pipeline
graph = StateGraph(ResearchPipelineState)
graph.add_node("plan", planner)
graph.add_node("research", parallel_research)
graph.add_node("write", writer_node)
graph.add_node("review", reviewer_node)
graph.add_node("publish", publish_node)

graph.set_entry_point("plan")
graph.add_edge("plan", "research")
graph.add_edge("research", "write")
graph.add_edge("write", "review")
graph.add_conditional_edges("review", should_revise, {
    "revise": "write",
    "publish": "publish"
})
graph.add_edge("publish", END)

# Compile with checkpointing and human-in-the-loop
checkpointer = PostgresSaver.from_conn_string(DB_URL)
app = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["publish"]
)
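The planner assumes a `parse_subtasks` helper. A minimal sketch that splits a numbered or bulleted plan into subtask strings (the regex encodes an assumption about the planner model's output format):

```python
import re

def parse_subtasks(plan_text: str) -> list[str]:
    """Extract subtasks from a numbered ('1.', '2)') or bulleted ('-', '*') list."""
    subtasks = []
    for line in plan_text.splitlines():
        match = re.match(r"\s*(?:\d+[.)]|[-*•])\s+(.+)", line)
        if match:
            subtasks.append(match.group(1).strip())
    return subtasks
```

As with score extraction, asking the planner for structured JSON output makes this parsing step less brittle.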
◆ Key Insight

The return_exceptions=True parameter in asyncio.gather is crucial for resilient parallel execution. Without it, a single failed research subtask would cancel all parallel tasks and crash the entire pipeline. With it, failed tasks return their exception as a result, letting you filter them out and continue with the successful results. This graceful degradation pattern applies to any parallel agent execution scenario.
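The degradation behavior is easy to verify in isolation. This toy example (the subtask names are illustrative) shows one failing task while the others survive:

```python
import asyncio

async def research(subtask: str) -> str:
    """Stand-in for a research agent call; fails on one specific subtask."""
    if subtask == "bad":
        raise ValueError(f"subtask {subtask!r} failed")
    return f"notes on {subtask}"

async def main() -> list[str]:
    results = await asyncio.gather(
        research("plasma"), research("bad"), research("funding"),
        return_exceptions=True,  # failures come back as values, not crashes
    )
    return [r for r in results if not isinstance(r, Exception)]

print(asyncio.run(main()))  # → ['notes on plasma', 'notes on funding']
```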

Knowledge Check

1. Why is a maximum iteration limit essential for revision loops in agent workflows?

Answer:
Without a maximum iteration limit, an overly critical reviewer combined with a writer that cannot meet its standards will loop indefinitely, consuming tokens and compute resources without producing output. The limit ensures the workflow always terminates. When the limit is reached, the system should publish the best available version and log a warning, rather than failing entirely. A typical limit is 3 to 5 revision rounds.

2. What is "compensation logic" in the context of workflow error handling?

Answer:
Compensation logic undoes the effects of previously completed steps when a later step fails. For example, if step 3 sent an email and step 4 fails, compensation logic for step 3 might send a correction or retraction email. It is needed because some actions are irreversible once completed. Best practice is to place irreversible actions (sending messages, publishing content, making payments) as late as possible in the workflow to minimize the need for compensation.

3. How does LangGraph's interrupt_before work with checkpointing to enable human-in-the-loop review?

Answer:
When execution reaches a node listed in interrupt_before, LangGraph serializes the full workflow state to the checkpoint store and pauses. The human can review the state at any later time (minutes, hours, or days). To resume, the human (or application code) calls app.invoke(None, config) with the same thread config, and execution picks up from the checkpointed state. The human can also modify the state before resuming (for example, adding feedback) using app.update_state().

4. Why does the parallel research node use return_exceptions=True in asyncio.gather?

Answer:
Without return_exceptions=True, if any single research subtask raises an exception, asyncio.gather cancels all other pending tasks and propagates the exception. This means one failed subtask kills the entire research phase. With return_exceptions=True, failed tasks return their exception as a result value while successful tasks complete normally. The code then filters out exceptions and continues with whatever results were successfully gathered.

5. What are the advantages of using PostgreSQL over SQLite for checkpointing in production?

Answer:
PostgreSQL offers several production advantages: concurrent access from multiple workers or processes (SQLite locks the entire database on writes), network accessibility (workers on different machines can share the same checkpoint store), proper transaction isolation for reliable state updates, better performance under concurrent load, built-in backup and replication features, and the ability to query checkpoint data for monitoring and debugging across many workflow instances.

Key Takeaways