LangGraph Execution Model

Version: 1.0.0
Last Updated: 2026-02-23


Overview

This document details how LangGraph executes graphs, based on direct source code analysis of pregel/_loop.py and related files.


Pregel Superstep Model

The Core Loop

LangGraph's execution loop is modeled on Google's Pregel, a system for large-scale graph processing:

Pregel = "Think like a vertex"
- Each node computes independently
- Nodes communicate via messages
- Synchronous supersteps with barrier
- Fault tolerance via checkpointing

Execution Flow

High-Level

graph.invoke(input, config)
         │
         ▼
    [Compile graph if needed]
         │
         ▼
    [Load checkpoint if resuming]
         │
         ▼
    ┌─────────────────────────────────┐
    │     FOR each superstep:          │
    │  1. prepare_next_tasks()         │
    │  2. execute_tasks()              │
    │  3. apply_writes()               │
    │  4. checkpoint() (if enabled)    │
    └─────────────────────────────────┘
         │
         ▼
    [Return final state]
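
The four steps in the loop above can be sketched in plain Python. This is a toy model rather than LangGraph's code: `run_supersteps`, the dict-based state, and the `triggers` mapping are all names assumed here for illustration.

```python
import copy

def run_supersteps(nodes, triggers, state, max_steps=25):
    """Toy superstep loop (illustrative only).

    nodes:    name -> function(state) -> dict of writes
    triggers: name -> set of keys whose change schedules that node
    """
    checkpoints = []
    changed = set(state)  # treat the initial input as "just written"
    for _ in range(max_steps):
        # 1. Prepare: pick nodes whose trigger keys changed in the last step.
        tasks = [name for name in nodes if triggers[name] & changed]
        if not tasks:
            break
        # 2. Execute: every task reads the same frozen snapshot.
        snapshot = copy.deepcopy(state)
        writes = [nodes[name](snapshot) for name in tasks]
        # 3. Apply: merge all writes only after every task has finished.
        changed = set()
        for update in writes:
            state.update(update)
            changed |= set(update)
        # 4. Checkpoint: keep a copy of the state after the step.
        checkpoints.append(copy.deepcopy(state))
    return state, checkpoints

nodes = {
    "double": lambda s: {"doubled": s["x"] * 2},
    "report": lambda s: {"text": f"result={s['doubled']}"},
}
triggers = {"double": {"x"}, "report": {"doubled"}}
final_state, history = run_supersteps(nodes, triggers, {"x": 21})
```

In this example the loop ends after two supersteps because the last write (`text`) triggers no further node.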

Detailed Superstep

Step 1: Prepare Next Tasks

From _algo.py:

def prepare_next_tasks(
    checkpoint: Checkpoint,
    nodes: dict[str, PregelNode],
    channels: dict[str, BaseChannel],
    pending_writes: list[tuple],
    # ... remaining parameters omitted
) -> list[PregelExecutableTask]:
    """Determine which nodes to run in this superstep."""
    
    # For each node:
    # 1. Check if triggered (input channels have values)
    # 2. Check if should run (not already running)
    # 3. Create executable task
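
One plausible way to implement the trigger check is to compare per-channel version counters with the versions each node saw when it last ran. The sketch below assumes that scheme; `select_triggered`, `node_triggers`, and `versions_seen` are illustrative names, not the signature of `prepare_next_tasks`.

```python
def select_triggered(node_triggers, channel_versions, versions_seen):
    """Return the nodes that have at least one trigger channel newer than
    the version the node saw when it last ran."""
    ready = []
    for name, trigger_channels in node_triggers.items():
        seen = versions_seen.get(name, {})
        if any(channel_versions.get(ch, 0) > seen.get(ch, 0) for ch in trigger_channels):
            ready.append(name)
    return ready

node_triggers = {"agent": ["messages"], "tools": ["tool_calls"]}
channel_versions = {"messages": 3, "tool_calls": 1}
# "agent" already consumed messages@3; "tools" has not yet seen tool_calls@1.
versions_seen = {"agent": {"messages": 3}, "tools": {"tool_calls": 0}}
ready = select_triggered(node_triggers, channel_versions, versions_seen)
```

Only `tools` is selected here, because `agent` has already seen the current version of its trigger channel.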

Step 2: Execute Tasks

From _runner.py:

async def execute_tasks(tasks: list[PregelExecutableTask]):
    """Execute tasks in parallel."""
    
    # Submit all tasks to executor
    # Each task:
    # 1. Read input from channels
    # 2. Execute node function
    # 3. Return writes (channel updates)
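
The parallel execution described above can be approximated with `asyncio.gather`. This is a minimal sketch under the assumption that each node is an async function taking a state snapshot and returning a dict of writes; `run_task` and `execute_all` are hypothetical helpers, not functions from `_runner.py`.

```python
import asyncio

async def run_task(name, func, snapshot):
    """Run one node against a shared snapshot and return its writes."""
    writes = await func(snapshot)
    return name, writes

async def execute_all(tasks, snapshot):
    """Run all tasks of one superstep concurrently and collect their writes."""
    results = await asyncio.gather(
        *(run_task(name, func, snapshot) for name, func in tasks)
    )
    return dict(results)

async def fetch_a(state):
    await asyncio.sleep(0.01)  # simulate I/O
    return {"a": state["x"] + 1}

async def fetch_b(state):
    await asyncio.sleep(0.01)  # simulate I/O
    return {"b": state["x"] * 10}

all_writes = asyncio.run(
    execute_all([("fetch_a", fetch_a), ("fetch_b", fetch_b)], {"x": 2})
)
```

Neither node mutates the snapshot. Both only return writes, which is what lets the loop defer merging until every task has finished.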

Step 3: Apply Writes

From _algo.py:

def apply_writes(
    checkpoint: Checkpoint,
    pending_writes: list[tuple],
    channels: dict[str, BaseChannel]
):
    """Apply writes to channels using reducers."""
    
    # For each write:
    # 1. Identify target channel
    # 2. Apply reducer to merge with existing value
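
As a rough illustration of reducer-based merging, the sketch below applies a list of `(channel, value)` writes to a plain dict. It assumes that channels without a reducer are simply overwritten; `apply_pending_writes` and the `reducers` mapping are hypothetical and stand in for the channel objects used by the real function.

```python
import operator

def apply_pending_writes(values, reducers, pending_writes):
    """Merge (channel, value) writes into a copy of `values`.

    Channels with a reducer combine the old and new value; channels
    without one are overwritten (an assumption of this sketch).
    """
    merged = dict(values)
    for channel, value in pending_writes:
        reducer = reducers.get(channel)
        if reducer is not None and channel in merged:
            merged[channel] = reducer(merged[channel], value)
        else:
            merged[channel] = value
    return merged

values = {"messages": ["hi"], "count": 1}
reducers = {"messages": operator.add}  # list concatenation
pending = [("messages", ["hello"]), ("messages", ["bye"]), ("count", 5)]
result = apply_pending_writes(values, reducers, pending)
```

With `operator.add` as the reducer, both writes to `messages` are appended in order, while `count` is replaced.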

Step 4: Checkpoint

From _checkpoint.py:

def create_checkpoint(
    channels: dict[str, BaseChannel],
    versions: dict[str, int],
    metadata: CheckpointMetadata
) -> Checkpoint:
    """Snapshot all channel values."""
    
    return {
        "channel_values": {
            k: v.checkpoint() for k, v in channels.items()
        },
        "channel_versions": versions,
        "metadata": metadata,
    }
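
The point of snapshotting is that later mutations must not leak into a saved checkpoint. The following sketch shows that property with plain dicts; `snapshot` is a hypothetical helper that copies values directly instead of calling `checkpoint()` on channel objects.

```python
import copy

def snapshot(values, versions, step):
    """Build a checkpoint dict with the same three keys as shown above."""
    return {
        "channel_values": copy.deepcopy(values),
        "channel_versions": dict(versions),
        "metadata": {"step": step},
    }

values = {"messages": ["hi"]}
versions = {"messages": 1}
saved = snapshot(values, versions, step=0)

# Mutations made after the snapshot do not affect the saved checkpoint.
values["messages"].append("hello")
versions["messages"] = 2

# Restoring yields the state as it was when the checkpoint was taken.
restored = copy.deepcopy(saved["channel_values"])
```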

PregelTask Structure

From types.py:

class PregelExecutableTask(NamedTuple):
    """A single executable task in the graph."""
    
    name: str                    # Node name
    path: str                   # Task path
    input: Any                  # Input to node
    proc: Callable              # Node function
    writes: list[Send]          # Dynamic sends
    triggers: list[str]        # Channels that trigger this
    interrupt_after: bool       # Interrupt after execution
    interrupt_before: bool     # Interrupt before execution

Send (Dynamic Edges)

What is Send?

Send enables dynamic node spawning: a single node can spawn multiple tasks for a target node, each with its own input:

from langgraph.types import Send

def splitter(state):
    messages = state["messages"]
    return [
        Send("process_email", {"email": email})
        for email in messages
    ]

Send Implementation

class Send(NamedTuple):
    """Dynamic edge - spawn a task for another node."""
    
    node: str           # Target node name
    arg: Any           # Input to target node
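
To make the fan-out concrete, the sketch below expands a list of Send objects into one call per item. It uses a local stand-in for `Send` with the two fields shown above, and `expand_sends` is a hypothetical helper; how LangGraph schedules these tasks internally is not modeled.

```python
from typing import Any, NamedTuple

class Send(NamedTuple):
    """Local stand-in mirroring the two fields shown above."""
    node: str
    arg: Any

def expand_sends(sends, nodes):
    """Call the target node once per Send, passing the Send's own argument
    rather than the shared graph state."""
    return [(s.node, nodes[s.node](s.arg)) for s in sends]

def process_email(payload):
    return {"subject": payload["email"].upper()}

sends = [Send("process_email", {"email": e}) for e in ["a@x.io", "b@x.io"]]
results = expand_sends(sends, {"process_email": process_email})
```

Each spawned task receives only its own `arg`, which is why the number of tasks can depend on runtime data.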

Interrupt (Human-in-the-Loop)

How Interrupts Work

From types.py:

class Interrupt(NamedTuple):
    """Pause execution for human input."""
    
    value: Any         # Data to show human
    when: str          # "during" or "after"

Interrupt Flow

1. Node calls interrupt(data)
        │
        ▼
2. PregelLoop pauses
        │
        ▼
3. Returns to caller with interrupt value
        │
        ▼
4. Caller (human) provides input
        │
        ▼
5. Resume with Command(resume=data)
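
The flow above can be imitated with an exception that carries the interrupt value and a second run that supplies the resume value. Everything in this sketch is hypothetical (`Paused`, `make_interrupt`, `run`); it models the control flow only, under the assumption that the interrupted node runs again from its beginning on resume.

```python
class Paused(Exception):
    """Hypothetical stand-in for the signal that pauses the loop."""
    def __init__(self, value):
        super().__init__(value)
        self.value = value

def make_interrupt(resume_values):
    """Return an interrupt() that consumes a resume value when one is
    available and otherwise raises Paused."""
    def interrupt(value):
        if resume_values:
            return resume_values.pop(0)
        raise Paused(value)
    return interrupt

def review_node(state, interrupt):
    answer = interrupt({"draft": state["draft"]})  # pauses on the first run
    return {"approved": answer == "yes"}

def run(state, resume=None):
    interrupt = make_interrupt([] if resume is None else [resume])
    try:
        return {"status": "done", **review_node(state, interrupt)}
    except Paused as pause:
        # Steps 2-3: stop and hand the interrupt value back to the caller.
        return {"status": "interrupted", "value": pause.value}

first = run({"draft": "v1"})                  # steps 1-3
second = run({"draft": "v1"}, resume="yes")   # steps 4-5
```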

Resume with Command

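from langgraph.types import Command

# Resume the interrupted run. The resume value travels in a Command,
# not in the config; the config only identifies the checkpointed thread.
# "abc123" is a placeholder thread ID.
graph.invoke(
    Command(resume={"feedback": "looks good"}),
    config={"configurable": {"thread_id": "abc123"}},
)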

Stream Mode

From types.py:

StreamMode = Literal[
    "values",      # Full state after each step
    "updates",     # Node-specific updates
    "checkpoints", # Checkpoint snapshots
    "tasks",       # Task start/complete
    "debug",       # Debug info
    "messages",    # Message streams
    "custom",      # Custom streams
]
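
The difference between "values" and "updates" is easiest to see side by side. The generator below is a toy model of those two modes only; `stream` here is a hypothetical function over a precomputed list of steps, not LangGraph's `stream` method.

```python
def stream(steps, initial, mode="values"):
    """Yield one item per step: the whole state for "values", or only the
    writes of the node that ran for "updates"."""
    state = dict(initial)
    for node_name, writes in steps:
        state.update(writes)
        if mode == "values":
            yield dict(state)
        elif mode == "updates":
            yield {node_name: writes}
        else:
            raise ValueError(f"unsupported mode in this sketch: {mode}")

steps = [("plan", {"plan": "search"}), ("act", {"result": 42})]
as_values = list(stream(steps, {"q": "?"}, mode="values"))
as_updates = list(stream(steps, {"q": "?"}, mode="updates"))
```

"values" repeats unchanged keys in every item, while "updates" carries only the delta keyed by node name.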

Retry Policy

From types.py:

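from typing import NamedTuple

class RetryPolicy(NamedTuple):
    """Configuration for node retry behavior."""

    initial_interval: float = 0.5   # Seconds to wait before the first retry
    backoff_factor: float = 2.0     # Multiplier applied after each retry
    max_interval: float = 128.0     # Upper bound on the wait between retries
    max_attempts: int = 3           # Total attempts, including the first
    jitter: bool = True             # Add random jitter to each interval
    # retry_on (which exceptions trigger a retry) omitted here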
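
These fields combine into an exponential backoff schedule. The helper below is a sketch of one common interpretation (interval grows by `backoff_factor` per retry, capped at `max_interval`, no jitter); `backoff_intervals` is a hypothetical function and the argument values are illustrative, not the library defaults.

```python
def backoff_intervals(max_attempts, initial_interval, backoff_factor, max_interval):
    """Return the sleep intervals between attempts: exponential growth,
    capped at max_interval. There is no sleep after the final attempt."""
    intervals = []
    for retry in range(max_attempts - 1):
        interval = initial_interval * backoff_factor ** retry
        intervals.append(min(max_interval, interval))
    return intervals

schedule = backoff_intervals(
    max_attempts=5, initial_interval=1.0, backoff_factor=2.0, max_interval=5.0
)
```

With these illustrative values the fourth wait would be 8 seconds uncapped, so it is clamped to 5.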

Error Handling

From errors.py:

class GraphBubbleUp(Exception):
    """Base for exceptions that bubble up through the graph."""
    pass

class GraphInterrupt(GraphBubbleUp):
    """Raised when graph is interrupted."""
    pass

class InvalidUpdateError(Exception):
    """Raised when channel update is invalid."""
    pass

class EmptyInputError(Exception):
    """Raised when graph input is empty."""
    pass

Key Insight: Synchronous Barrier

Unlike purely event-driven systems (like OpenClaw), LangGraph uses synchronous supersteps:

  1. All triggered nodes in a superstep run in parallel
  2. All writes are applied together after all nodes complete
  3. Next superstep starts only after current completes

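This makes state easy to reason about, because every node in a superstep reads the same snapshot. The cost is flexibility compared to event-driven models: a fast node's output stays invisible to downstream nodes until the slowest node in the same superstep has finished.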
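
The barrier can be demonstrated with two nodes that run in the same superstep. In this toy sketch (`superstep`, `increment`, and `observe` are hypothetical), `observe` reads the snapshot taken before the step, so it sees `increment`'s write only one superstep later.

```python
def superstep(state, node_funcs):
    """Run every node against the same snapshot, then merge all writes."""
    snapshot = dict(state)
    all_writes = [func(snapshot) for func in node_funcs]
    new_state = dict(state)
    for writes in all_writes:
        new_state.update(writes)
    return new_state

def increment(state):
    return {"x": state["x"] + 1}

def observe(state):
    # Reads x from the snapshot, so it cannot see a write made in this step.
    return {"observed": state["x"]}

after_one = superstep({"x": 0}, [increment, observe])
after_two = superstep(after_one, [observe])
```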


Generated from source code analysis