LangGraph Execution Model
Version: 1.0.0
Last Updated: 2026-02-23
Overview
This document details how LangGraph executes graphs, based on direct source code analysis of pregel/_loop.py and related files.
Pregel Superstep Model
The Core Loop
LangGraph is inspired by Google's Pregel — a system for large-scale graph processing:
Pregel = "Think like a vertex"
- Each node computes independently
- Nodes communicate via messages
- Synchronous supersteps with barrier
- Fault tolerance via checkpointing
Execution Flow
High-Level
graph.invoke(input, config)
│
▼
[Compile graph if needed]
│
▼
[Load checkpoint if resuming]
│
▼
┌─────────────────────────────────┐
│ FOR each superstep: │
│ 1. prepare_next_tasks() │
│ 2. execute_tasks() │
│ 3. apply_writes() │
│ 4. checkpoint() (if enabled) │
└─────────────────────────────────┘
│
▼
[Return final state]
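The four-stage loop can be mimicked in a few lines of plain Python. This is a hypothetical toy model, not LangGraph's code. `run_supersteps` and the `edges` mapping are made up for illustration. The last-write-wins merge via `dict.update` is a simplification, since LangGraph merges through channel reducers (see Step 3). `max_steps` stands in for LangGraph's recursion limit.

```python
from typing import Callable

# Hypothetical toy model of the superstep loop; not LangGraph's API.
# A node reads a state snapshot and returns a dict of writes.
Node = Callable[[dict], dict]


def run_supersteps(
    nodes: dict[str, Node],
    edges: dict[str, list[str]],  # node -> nodes to trigger after it runs
    start: str,
    state: dict,
    max_steps: int = 25,  # stand-in for a recursion limit
) -> tuple[dict, list[dict]]:
    checkpoints = [dict(state)]  # snapshot of the input state
    frontier = [start]
    step = 0
    while frontier:
        if step >= max_steps:
            raise RuntimeError(f"no result after {max_steps} supersteps")
        # 1. prepare: the frontier holds the triggered nodes
        snapshot = dict(state)
        # 2. execute: every node reads the same snapshot; writes are buffered
        writes = [(name, nodes[name](snapshot)) for name in frontier]
        # 3. apply: merge buffered writes only after every node has finished
        for _, update in writes:
            state.update(update)
        # 4. checkpoint: record the state at the barrier
        checkpoints.append(dict(state))
        # successors of the nodes that just ran, deduplicated in order
        frontier = list(dict.fromkeys(
            succ for name, _ in writes for succ in edges.get(name, [])
        ))
        step += 1
    return state, checkpoints


nodes = {
    "a": lambda s: {"x": s["x"] + 1},
    "b": lambda s: {"y": s["x"] * 10},
}
final, checkpoints = run_supersteps(nodes, {"a": ["b"]}, "a", {"x": 1, "y": 0})
print(final)             # {'x': 2, 'y': 20}
print(len(checkpoints))  # 3
```

Node `b` runs one superstep after `a`. It therefore reads `x == 2`, the value `a` wrote before the barrier.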
Detailed Superstep
Step 1: Prepare Next Tasks
From _algo.py:
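def prepare_next_tasks(
    checkpoint: Checkpoint,
    nodes: dict[str, PregelNode],
    channels: dict[str, BaseChannel],
    pending_writes: list[tuple],
    # etc.
) -> list[PregelExecutableTask]:
    """Determine which nodes to run in this superstep."""
    # For each node:
    # 1. Check if triggered (a trigger channel's version in
    #    checkpoint["channel_versions"] is newer than the one the node
    #    recorded in checkpoint["versions_seen"] when it last ran)
    # 2. Check if should run (not already running)
    # 3. Create executable task
    # Separately, create one task per pending Send, each with its own input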
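The trigger check compares versions. A node is due when one of its trigger channels carries a newer version than the one that node recorded when it last ran. The sketch below models this with plain dicts and integer versions. `triggered_nodes` and the channel names are illustrative assumptions, and the sketch covers only edge-triggered nodes, not tasks created by Send.

```python
def triggered_nodes(
    triggers: dict[str, list[str]],            # node -> channels that trigger it
    channel_versions: dict[str, int],          # channel -> current version
    versions_seen: dict[str, dict[str, int]],  # node -> {channel: version at its last run}
) -> list[str]:
    """Return the nodes with at least one trigger channel updated since they last ran."""
    ready = []
    for node, channels in triggers.items():
        seen = versions_seen.get(node, {})
        if any(channel_versions.get(ch, 0) > seen.get(ch, 0) for ch in channels):
            ready.append(node)
    return ready


triggers = {"agent": ["messages"], "tools": ["to_tools"]}
channel_versions = {"messages": 3, "to_tools": 1}
versions_seen = {"agent": {"messages": 3}}  # "tools" has never run
print(triggered_nodes(triggers, channel_versions, versions_seen))  # ['tools']
```

After a node runs, its `versions_seen` entry is brought up to the current versions. This keeps the same write from triggering it again.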
Step 2: Execute Tasks
From _runner.py:
async def execute_tasks(tasks: list[PregelExecutableTask]):
    """Execute tasks in parallel."""
    # Submit all tasks to executor
    # Each task:
    # 1. Read input from channels
    # 2. Execute node function
    # 3. Return writes (channel updates)
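A thread pool is enough to show the contract of this step. Tasks run concurrently, read a shared read-only snapshot, and return writes instead of mutating state. `run_tasks_in_parallel` is a hypothetical helper built on `concurrent.futures`. LangGraph's own runner also supports asyncio and applies retry policies, which this sketch leaves out.

```python
from concurrent.futures import ThreadPoolExecutor
from types import MappingProxyType
from typing import Any, Callable, Mapping

Write = tuple[str, Any]  # (channel name, value)
Task = Callable[[Mapping[str, Any]], list[Write]]


def run_tasks_in_parallel(tasks: dict[str, Task], snapshot: dict[str, Any]) -> dict[str, list[Write]]:
    """Run every task concurrently on one read-only snapshot; collect each task's writes."""
    frozen = MappingProxyType(snapshot)  # tasks cannot mutate shared state mid-step
    with ThreadPoolExecutor() as pool:
        futures = {name: pool.submit(fn, frozen) for name, fn in tasks.items()}
        # .result() waits for the task and re-raises any exception it raised
        return {name: fut.result() for name, fut in futures.items()}


writes = run_tasks_in_parallel(
    {
        "double": lambda s: [("total", s["n"] * 2)],
        "square": lambda s: [("total", s["n"] ** 2)],
    },
    {"n": 3},
)
print(writes)  # {'double': [('total', 6)], 'square': [('total', 9)]}
```

Both tasks write to `total`. Whether that is allowed is decided when writes are applied, by the channel's reducer, not during execution.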
Step 3: Apply Writes
From _algo.py:
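def apply_writes(
    checkpoint: Checkpoint,
    pending_writes: list[tuple],
    channels: dict[str, BaseChannel],
):
    """Apply writes to channels using reducers."""
    # 1. Group pending writes by target channel
    # 2. Apply each channel's reducer to merge the step's values with the
    #    existing value (all values for a channel go to channel.update()
    #    together; a plain LastValue channel raises InvalidUpdateError if it
    #    receives more than one)
    # 3. Bump the version of each updated channel, which is what the next
    #    prepare_next_tasks() compares to decide which nodes are triggered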
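This step can be modeled in three moves. First, group the step's writes by channel. Second, fold them through the channel's reducer. Third, bump the version of each channel that changed. In this toy, `operator.add` plays the role of an additive reducer (like an `Annotated[list, operator.add]` state key). `None` marks a last-value channel that rejects a second write in the same step. The integer versions, the function signature, and the local `InvalidUpdateError` are simplifications for illustration.

```python
import operator
from collections import defaultdict
from typing import Any, Callable, Optional

Reducer = Optional[Callable[[Any, Any], Any]]  # None means "last value only"


class InvalidUpdateError(Exception):
    """Toy stand-in for LangGraph's error of the same name."""


def apply_writes(
    values: dict[str, Any],  # assumes every reducer channel already has an initial value
    versions: dict[str, int],
    reducers: dict[str, Reducer],
    writes: list[tuple[str, Any]],
) -> set[str]:
    """Merge one superstep's writes into `values`; return the channels that changed."""
    grouped: dict[str, list[Any]] = defaultdict(list)
    for channel, value in writes:  # group all of the step's writes by channel
        grouped[channel].append(value)
    for channel, new_values in grouped.items():  # validate before mutating anything
        if reducers.get(channel) is None and len(new_values) > 1:
            raise InvalidUpdateError(f"{channel!r} received {len(new_values)} values in one step")
    for channel, new_values in grouped.items():
        reducer = reducers.get(channel)
        if reducer is None:
            values[channel] = new_values[0]
        else:
            for value in new_values:  # fold each write into the current value
                values[channel] = reducer(values[channel], value)
        versions[channel] = versions.get(channel, 0) + 1  # mark the channel as updated
    return set(grouped)


values = {"log": [], "status": "idle"}
versions = {"log": 1, "status": 1}
reducers = {"log": operator.add, "status": None}
changed = apply_writes(values, versions, reducers,
                       [("log", ["a"]), ("log", ["b"]), ("status", "done")])
print(values)    # {'log': ['a', 'b'], 'status': 'done'}
print(versions)  # {'log': 2, 'status': 2}
```

Two writes to `status` in the same step would raise `InvalidUpdateError` and leave every channel untouched.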
Step 4: Checkpoint
From _checkpoint.py:
def create_checkpoint(
    channels: dict[str, BaseChannel],
    versions: dict[str, int],
    metadata: CheckpointMetadata,
) -> Checkpoint:
    """Snapshot all channel values."""
    return {
        "channel_values": {
            k: v.checkpoint() for k, v in channels.items()
        },
        "channel_versions": versions,
        "metadata": metadata,
    }
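What matters about a checkpoint is isolation. It must be an independent copy, so that later mutations of live state cannot alter it. Restoring from it must not hand out references into the stored snapshot. Below is a minimal sketch using `copy.deepcopy`. The function names and the `step` field are assumptions. Real checkpointers also serialize the snapshot rather than just copying it.

```python
import copy
from typing import Any


def snapshot_channels(values: dict[str, Any], versions: dict[str, int], step: int) -> dict[str, Any]:
    """Take an independent copy of channel state at a superstep boundary."""
    return {
        "step": step,
        "channel_values": copy.deepcopy(values),  # nested lists/dicts copied too
        "channel_versions": dict(versions),
    }


def restore(checkpoint: dict[str, Any]) -> tuple[dict[str, Any], dict[str, int]]:
    """Rebuild live state from a checkpoint, copying again so the checkpoint stays intact."""
    return copy.deepcopy(checkpoint["channel_values"]), dict(checkpoint["channel_versions"])


values, versions = {"log": ["start"]}, {"log": 1}
saved = snapshot_channels(values, versions, step=0)
values["log"].append("oops")  # mutate live state after the snapshot
values, versions = restore(saved)
print(values)  # {'log': ['start']}
```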
PregelTask Structure
From types.py:
class PregelExecutableTask(NamedTuple):
    """A single executable task in the graph."""
    name: str                      # Node name
    path: str                      # Task path
    input: Any                     # Input to node
    proc: Callable                 # Node function
    writes: list[tuple[str, Any]]  # (channel, value) pairs the task produces
    triggers: list[str]            # Channels that trigger this
    interrupt_after: bool          # Interrupt after execution
    interrupt_before: bool         # Interrupt before execution
Send (Dynamic Edges)
What is Send?
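Send enables dynamic fan-out. A routing function (typically one passed to add_conditional_edges) returns a list of Send objects, and each Send becomes its own task for the target node, with its own input:
from langgraph.types import Send

def splitter(state):
    emails = state["emails"]
    # One Send per email: each spawns a separate "process_email" task
    return [
        Send("process_email", {"email": email})
        for email in emails
    ]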
Send Implementation
class Send(NamedTuple):
    """Dynamic edge - spawn a task for another node."""
    node: str  # Target node name
    arg: Any   # Input to target node
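Each Send carries its own `arg`. The sequential toy below shows why: it turns a list of sends into one call per target, each with a private input, and concatenates the results the way an additive reducer would. The local `Send` mirrors only the two fields above. `run_sends` and the node table are hypothetical. LangGraph itself schedules these tasks concurrently in the next superstep.

```python
from typing import Any, Callable, NamedTuple


class Send(NamedTuple):
    """Toy mirror of Send: target node name plus that task's private input."""
    node: str
    arg: Any


def run_sends(sends: list[Send], nodes: dict[str, Callable[[Any], list[Any]]]) -> list[Any]:
    """Run one task per Send and concatenate their outputs, like an add reducer."""
    results: list[Any] = []
    for send in sends:
        results += nodes[send.node](send.arg)  # each task sees only its own arg
    return results


def splitter(state: dict) -> list[Send]:
    return [Send("process_email", {"email": email}) for email in state["emails"]]


nodes = {"process_email": lambda arg: [arg["email"].upper()]}
print(run_sends(splitter({"emails": ["hi", "bye"]}), nodes))  # ['HI', 'BYE']
```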
Interrupt (Human-in-the-Loop)
How Interrupts Work
From types.py:
class Interrupt(NamedTuple):
    """Pause execution for human input."""
    value: Any  # Data to show human
    when: str   # "during" or "after"
Interrupt Flow
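1. Node calls interrupt(data)
│
▼
2. interrupt() raises GraphInterrupt; PregelLoop saves a checkpoint and pauses
│
▼
3. Returns to caller with interrupt value
│
▼
4. Caller (human) provides input
│
▼
5. Resume with Command(resume=answer): the node re-runs from its start, and interrupt() now returns answer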
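The key consequence of this flow is that resuming re-executes the interrupted node from its first line. On the second pass, `interrupt()` returns the resume value instead of pausing. The toy below reproduces that behaviour without LangGraph. The local `GraphInterrupt`, `run_node`, and the injected `interrupt` callback are assumptions made for illustration. The toy also omits the checkpoint that real resumption depends on.

```python
from typing import Any, Callable


class GraphInterrupt(Exception):
    """Toy pause signal carrying the value to show the human."""

    def __init__(self, value: Any) -> None:
        super().__init__(value)
        self.value = value


_NO_RESUME = object()  # sentinel: no resume value supplied yet


def run_node(node: Callable[[Callable[[Any], Any]], Any], resume: Any = _NO_RESUME) -> dict:
    """Run `node` from its first line; interrupt() pauses unless a resume value exists."""

    def interrupt(value: Any) -> Any:
        if resume is _NO_RESUME:
            raise GraphInterrupt(value)  # first pass: stop and surface `value`
        return resume  # resumed pass: hand the human's answer back to the node

    try:
        return {"status": "done", "result": node(interrupt)}
    except GraphInterrupt as exc:
        return {"status": "interrupted", "value": exc.value}


executions = []


def review(interrupt: Callable[[Any], Any]) -> str:
    executions.append("start")  # code before interrupt() runs again on resume
    answer = interrupt({"draft": "v1"})
    return f"approved: {answer}"


first = run_node(review)
second = run_node(review, resume="looks good")
print(first)       # {'status': 'interrupted', 'value': {'draft': 'v1'}}
print(second)      # {'status': 'done', 'result': 'approved: looks good'}
print(executions)  # ['start', 'start']
```

Because the node body runs twice, side effects placed before `interrupt()` should be safe to repeat.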
Resume with Command
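from langgraph.types import Command

# Resume the paused thread by passing a Command instead of new input.
# thread_id must match the interrupted run; the resume value becomes
# the return value of interrupt() inside the node.
graph.invoke(
    Command(resume={"feedback": "looks good"}),
    config={"configurable": {"thread_id": "thread-1"}},
)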
Stream Mode
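From types.py:
from typing import Literal

StreamMode = Literal[
    "values",       # Full state after each step
    "updates",      # Only the updates each node returned
    "checkpoints",  # Checkpoint snapshots
    "tasks",        # Task start/finish events
    "debug",        # Checkpoint and task events, for debugging
    "messages",     # LLM message chunks (token streaming) with metadata
    "custom",       # Arbitrary data emitted from inside nodes
]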
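The two most common modes differ only in the shape of each chunk. "values" emits the whole state after a step, while "updates" emits just what each node returned, keyed by node name. In LangGraph the mode is chosen with `graph.stream(..., stream_mode=...)`. The generator below is a self-contained toy (`toy_stream` and its `steps` input are assumptions) that reproduces only those two shapes.

```python
from typing import Any, Iterator, Literal


def toy_stream(
    steps: list[dict[str, dict[str, Any]]],  # per superstep: {node name: update}
    mode: Literal["values", "updates"],
) -> Iterator[dict[str, Any]]:
    """Replay precomputed superstep updates in the chosen stream shape."""
    state: dict[str, Any] = {}
    for step in steps:
        for update in step.values():
            state.update(update)
        if mode == "values":
            yield dict(state)  # full state after the step
        else:
            for node, update in step.items():
                yield {node: dict(update)}  # one chunk per node that ran


steps = [{"a": {"x": 1}}, {"b": {"y": 2}, "c": {"z": 3}}]
print(list(toy_stream(steps, "values")))   # [{'x': 1}, {'x': 1, 'y': 2, 'z': 3}]
print(list(toy_stream(steps, "updates")))  # [{'a': {'x': 1}}, {'b': {'y': 2}}, {'c': {'z': 3}}]
```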
Retry Policy
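From types.py:
from typing import NamedTuple

class RetryPolicy(NamedTuple):
    """Configuration for node retry behavior."""
    initial_interval: float = 0.5  # Seconds before the first retry
    backoff_factor: float = 2.0    # Interval multiplier after each retry
    max_interval: float = 128.0    # Upper bound on the interval, in seconds
    max_attempts: int = 3          # Total attempts, including the first
    jitter: bool = True            # Add random jitter to each interval
    # (retry_on, which selects the exceptions to retry, is omitted here)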
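These fields combine into an exponential backoff schedule. The sketch below makes several assumptions. The wait before retry k is initial_interval * backoff_factor**(k-1), capped at max_interval. When jitter is enabled, up to one second of random jitter is added. max_attempts counts the first try. This follows the field names but is a reconstruction, not LangGraph's retry loop, and `retry_delays` is a hypothetical helper.

```python
import random
from typing import Optional


def retry_delays(
    max_attempts: int,
    initial_interval: float,
    backoff_factor: float,
    max_interval: float,
    jitter: bool = False,
    rng: Optional[random.Random] = None,
) -> list[float]:
    """Seconds to wait before each retry; max_attempts counts the first try."""
    rng = rng or random.Random()
    delays = []
    interval = initial_interval
    for _ in range(max_attempts - 1):  # no wait before the first attempt
        delay = min(interval, max_interval)
        if jitter:
            delay += rng.uniform(0, 1)  # assumed jitter range
        delays.append(delay)
        interval *= backoff_factor
    return delays


print(retry_delays(5, initial_interval=1.0, backoff_factor=2.0, max_interval=5.0))
# [1.0, 2.0, 4.0, 5.0]
```

The cap matters once the geometric growth overtakes max_interval. Here the fourth wait would be 8.0 seconds without it.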
Error Handling
From errors.py:
class GraphBubbleUp(Exception):
    pass

class GraphInterrupt(GraphBubbleUp):
    """Raised when graph is interrupted."""

class InvalidUpdateError(Exception):
    """Raised when channel update is invalid (e.g. two writes to a LastValue channel in one step)."""

class EmptyInputError(Exception):
    """Raised when graph input is empty."""
Key Insight: Synchronous Barrier
Unlike purely event-driven systems (like OpenClaw), LangGraph uses synchronous supersteps:
- All triggered nodes in a superstep run in parallel
- All writes are applied together after all nodes complete
- Next superstep starts only after current completes
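This makes state easy to reason about, because no node ever reads a partially applied step. The cost is flexibility compared to event-driven models: a node whose inputs are ready cannot start until every node in the current superstep has finished.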
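The barrier is easy to observe in a toy. Two nodes in the same superstep both read the pre-step snapshot, so a reader never sees a writer's update until the following step. The order in which the nodes are listed does not change the result. `superstep`, `writer`, and `reader` are hypothetical names for this sketch.

```python
from typing import Callable

Node = Callable[[dict], dict]


def superstep(nodes: list[Node], state: dict) -> dict:
    """Run nodes against one snapshot, then apply all of their writes at the barrier."""
    snapshot = dict(state)
    writes = [node(snapshot) for node in nodes]  # no node sees another node's write
    new_state = dict(state)
    for update in writes:  # barrier: apply everything at once
        new_state.update(update)
    return new_state


def writer(s: dict) -> dict:
    return {"count": s["count"] + 1}


def reader(s: dict) -> dict:
    return {"seen": s["count"]}


state = {"count": 0, "seen": None}
step1 = superstep([writer, reader], state)
step2 = superstep([reader], step1)
print(step1)  # {'count': 1, 'seen': 0}
print(step2)  # {'count': 1, 'seen': 1}
```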
Generated from source code analysis