LangGraph State Management

Version: 1.0.0
Last Updated: 2026-02-23


Overview

This document details how LangGraph manages state, based on direct source code analysis.


State Definition

TypedDict Schema

From types.py:

from typing import TypedDict

class AgentState(TypedDict):
    messages: list
    next_action: str

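LangGraph reads the schema when the graph is built and creates one channel per key. A TypedDict schema is not enforced on values at runtime; type mismatches are caught only by a static type checker.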
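As a small illustration of what a schema like this exposes, the sketch below uses only the standard library to read the keys and annotations from the TypedDict. It also shows that Python's TypedDict does no checking of its own when an instance is created. The variable names (schema, bad_state) are illustrative and not part of LangGraph.

```python
from typing import TypedDict, get_type_hints


class AgentState(TypedDict):
    messages: list
    next_action: str


# The annotations are available for introspection; a framework can
# read them to learn which keys the state has.
schema = get_type_hints(AgentState)
print(schema)

# TypedDict is a plain dict at runtime: wrong value types are accepted
# silently, so type errors surface only in a static type checker.
bad_state = AgentState(messages="not a list", next_action=3)
print(type(bad_state).__name__, bad_state)
```

If runtime validation of inputs matters, a schema type that validates on construction is the usual alternative to a TypedDict.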


State Flow

┌──────────────────────────────────────────────────┐
│             STATE FLOW IN LANGGRAPH              │
└──────────────────────────────────────────────────┘

  Input (dict)
       │
       ▼
┌──────────────────────────────────────────────────┐
│  Superstep N                                     │
│  ┌────────────┐     ┌────────────┐               │
│  │  Node A    │────▶│  Channel   │               │
│  │  (reads)   │◀────│  (update)  │               │
│  └────────────┘     └────────────┘               │
│         │                  │                     │
│         └──────────────────┘                     │
│                  ▼                               │
│       [Checkpoint if enabled]                    │
└──────────────────────────────────────────────────┘
       │
       ▼
┌──────────────────────────────────────────────────┐
│  Superstep N+1 (or return final state)           │
└──────────────────────────────────────────────────┘
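The flow in the diagram can be modeled with a short loop. This is a toy model under stated assumptions, not LangGraph's execution engine: run_supersteps, node_a, and node_b are hypothetical names, updates simply overwrite keys, and a checkpoint is just a copied dict.

```python
from typing import Callable

State = dict
Node = Callable[[State], dict]


def run_supersteps(state: State, steps: list[list[Node]], checkpoint: bool = True):
    """Run each group of nodes as one superstep.

    All nodes in a superstep read the same snapshot; their writes are
    applied only after every node in the group has finished.
    """
    checkpoints = []
    for group in steps:
        snapshot = dict(state)                    # what every node reads
        writes = [node(snapshot) for node in group]
        for update in writes:                     # apply after the step
            state = {**state, **update}
        if checkpoint:
            checkpoints.append(dict(state))       # one snapshot per superstep
    return state, checkpoints


def node_a(state: State) -> dict:
    return {"a_saw": state["counter"], "counter": state["counter"] + 1}


def node_b(state: State) -> dict:
    return {"b_saw": state["counter"]}


final, saved = run_supersteps({"counter": 0}, [[node_a, node_b], [node_b]])
print(final)
print(len(saved))
```

In the first superstep node_b still sees counter 0, because node_a's write becomes visible only in the next superstep.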

Reducers

How Reducers Work

Reducers define how multiple updates are merged:

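# Simplified illustration of the reducer contract: (current, update) -> merged.
# The add_messages shipped in langgraph.graph.message also matches messages by ID,
# replacing an existing message instead of appending a duplicate.
def add_messages(left: list, right: list) -> list:
    return left + right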

Built-in Reducers

| Reducer | Function | Behavior |
| --- | --- | --- |
| add_messages | list + list | Append |
| last | lambda a, b: b | Last wins |
| max | max(a, b) | Maximum |
| min | min(a, b) | Minimum |

Custom Reducers

from typing import Annotated

def merge_contexts(a: dict, b: dict) -> dict:
    return {**a, **b}

class AgentState(TypedDict):
    context: Annotated[dict, merge_contexts]
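To show how a reducer attached through Annotated can be discovered and applied, here is a minimal standard-library sketch. find_reducers and apply_update are hypothetical helpers written for this illustration, and the status key is added only to contrast a key without a reducer; LangGraph's own handling is more involved.

```python
from typing import Annotated, TypedDict, get_args, get_origin, get_type_hints


def merge_contexts(a: dict, b: dict) -> dict:
    return {**a, **b}


class AgentState(TypedDict):
    context: Annotated[dict, merge_contexts]
    status: str


def find_reducers(schema) -> dict:
    """Map each key to the callable attached via Annotated, if any."""
    reducers = {}
    for key, hint in get_type_hints(schema, include_extras=True).items():
        if get_origin(hint) is Annotated:
            extras = get_args(hint)[1:]          # metadata after the base type
            reducers[key] = next((m for m in extras if callable(m)), None)
        else:
            reducers[key] = None
    return reducers


def apply_update(schema, state: dict, update: dict) -> dict:
    reducers = find_reducers(schema)
    merged = dict(state)
    for key, value in update.items():
        reducer = reducers.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)   # combine old and new
        else:
            merged[key] = value                         # no reducer: overwrite
    return merged


state = {"context": {"user": "ada"}, "status": "new"}
state = apply_update(AgentState, state, {"context": {"lang": "en"}, "status": "seen"})
print(state)
```

The context key keeps both entries because merge_contexts combines the dicts, while status is simply replaced.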

Checkpointing

Durability Modes

From types.py:

Durability = Literal["sync", "async", "exit"]

| Mode | Behavior |
| --- | --- |
| sync | Persist before the next superstep starts |
| async | Persist while the next superstep runs |
| exit | Persist only when the graph exits |
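The difference between the three modes is when the write happens relative to the next superstep. The sketch below models only that timing with a single-worker thread pool; run is a hypothetical function, and "persisting" is reduced to appending a step number to a list.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Literal

Durability = Literal["sync", "async", "exit"]


def run(steps: int, durability: Durability) -> list[int]:
    """Return the step numbers that were persisted, in order of persistence."""
    saved: list[int] = []
    pending = None
    with ThreadPoolExecutor(max_workers=1) as pool:
        for step in range(steps):
            # ... the superstep's node work would happen here ...
            if durability == "sync":
                saved.append(step)            # write completes before the next step
            elif durability == "async":
                if pending is not None:
                    pending.result()          # wait for the previous write to land
                pending = pool.submit(saved.append, step)  # overlaps the next step
        if pending is not None:
            pending.result()                  # flush the last background write
    if durability == "exit" and steps > 0:
        saved.append(steps - 1)               # only the final state is written
    return saved


for mode in ("sync", "async", "exit"):
    print(mode, run(3, mode))
```

In this model sync and async persist every step and differ only in whether the run waits for the write, while exit trades intermediate recovery points for fewer writes.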

Checkpoint Metadata

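A run is addressed through its config: thread_id selects the thread, and an optional checkpoint_id selects a specific checkpoint within it.

config = {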
    "configurable": {
        "thread_id": "user-123",
        "checkpoint_id": "1ef-abc123"
    }
}

Checkpoint Backends

| Backend | Module | Use Case |
| --- | --- | --- |
| InMemorySaver | langgraph.checkpoint.memory | Testing |
| SqliteSaver | langgraph.checkpoint.sqlite | Local dev |
| PostgresSaver | langgraph.checkpoint.postgres | Production |

Thread Model

What is a Thread?

A thread is the sequence of checkpoints saved under one thread_id; state in one thread is not visible to any other thread:

Thread "user-123":
├── checkpoint-001 (step 0)
├── checkpoint-002 (step 1)  
├── checkpoint-003 (step 2)
└── [current state]

Thread Isolation

  • Independent checkpoints per thread
  • Parallel threads via multiple thread_id values
  • Resume from any checkpoint in a thread
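The three properties above can be seen in a toy in-memory store keyed by thread_id. ToyCheckpointStore and its put/get methods are hypothetical and do not mirror the interface of the real checkpoint backends; the checkpoint-00N ID format is borrowed from the diagram above purely for readability.

```python
from collections import defaultdict
from typing import Optional


class ToyCheckpointStore:
    """In-memory store: thread_id -> ordered list of (checkpoint_id, state)."""

    def __init__(self) -> None:
        self._threads: dict[str, list[tuple[str, dict]]] = defaultdict(list)

    def put(self, thread_id: str, state: dict) -> str:
        history = self._threads[thread_id]
        checkpoint_id = f"checkpoint-{len(history) + 1:03d}"
        history.append((checkpoint_id, dict(state)))   # copy to freeze the snapshot
        return checkpoint_id

    def get(self, thread_id: str, checkpoint_id: Optional[str] = None) -> Optional[dict]:
        history = self._threads.get(thread_id, [])
        if not history:
            return None
        if checkpoint_id is None:
            return dict(history[-1][1])                 # latest checkpoint
        for cid, state in history:
            if cid == checkpoint_id:
                return dict(state)
        return None


store = ToyCheckpointStore()
for step in range(3):
    store.put("user-123", {"step": step})
store.put("user-456", {"step": 0})

print(store.get("user-123"))                    # latest state of one thread
print(store.get("user-123", "checkpoint-002"))  # an earlier checkpoint
print(store.get("user-456"))                    # a separate thread
```

Because every lookup goes through thread_id first, writes to user-456 can never change what user-123 resumes from.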

Interrupts (Human-in-the-Loop)

Interrupt Mechanism

From types.py:

# Abridged; the exact fields differ between LangGraph releases.
class Interrupt:
    value: Any
    when: Literal["during", "after"]

Using Interrupts

from langgraph.types import interrupt

def human_review(state):
    # Pause for human input
    feedback = interrupt({"task": "review", "data": state})
    return {"feedback": feedback}
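The pause-and-resume pattern can be sketched without the library. The sketch assumes the pause is implemented by unwinding the node and re-running it from the top on resume, so any code before the interrupt call executes twice. ToyRunner, Paused, and the "__interrupt__" result key are hypothetical names used only in this model.

```python
from typing import Any, Callable

_NO_RESUME = object()


class Paused(Exception):
    """Raised to unwind the node and surface a payload to the caller."""

    def __init__(self, payload: Any) -> None:
        super().__init__(payload)
        self.payload = payload


class ToyRunner:
    def __init__(self, node: Callable[["ToyRunner", dict], dict]) -> None:
        self.node = node
        self._resume: Any = _NO_RESUME

    def interrupt(self, payload: Any) -> Any:
        if self._resume is _NO_RESUME:
            raise Paused(payload)        # first pass: stop and ask the human
        return self._resume              # second pass: hand back the answer

    def invoke(self, state: dict, resume: Any = _NO_RESUME) -> dict:
        self._resume = resume
        try:
            return {**state, **self.node(self, state)}
        except Paused as pause:
            return {**state, "__interrupt__": pause.payload}


def human_review(runner: ToyRunner, state: dict) -> dict:
    feedback = runner.interrupt({"task": "review", "data": state["draft"]})
    return {"feedback": feedback}


runner = ToyRunner(human_review)
first = runner.invoke({"draft": "v1"})                      # pauses
second = runner.invoke({"draft": "v1"}, resume="approved")  # completes
print(first)
print(second)
```

Since the node restarts on resume in this model, side effects placed before the interrupt call should be idempotent.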

Command (Modify State)

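from langgraph.types import Command

def process_with_override(state):
    # Returned from a node, Command carries a state update (and optionally goto for routing)
    return Command(update={"status": "processed"})

# resume is not set by a node; the caller passes it to continue an interrupted run:
# graph.invoke(Command(resume={"feedback": "approved"}), config)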

State Updates

Node Returns Partial State

from langchain_core.messages import AIMessage

def node_a(state):
    # Return only the keys this node updates; the rest of the state is left unchanged
    return {"messages": [AIMessage(content="hello")]}

Merge Process

Node A returns: {"messages": [msg1], "counter": 1}
Node B returns: {"messages": [msg2], "counter": 2}

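After the reducers run (add_messages for messages, last for counter):
{"messages": [msg1, msg2], "counter": 2}

If both nodes run in the same superstep, counter needs an explicit reducer such as last. A key without a reducer accepts only one write per superstep and raises InvalidUpdateError otherwise.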
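The merge above can be reproduced with a small fold over the two partial updates. merge_superstep and last are hypothetical helpers, plain strings stand in for message objects, operator.add stands in for add_messages, and ValueError stands in for the library's own error type.

```python
import operator
from typing import Callable, Optional


def last(a, b):
    return b


def merge_superstep(
    state: dict,
    updates: list[dict],
    reducers: dict[str, Optional[Callable]],
) -> dict:
    """Fold every node's partial update into the state, key by key."""
    merged = dict(state)
    written: set[str] = set()
    for update in updates:
        for key, value in update.items():
            reducer = reducers.get(key)
            if reducer is None:
                if key in written:
                    # Two writers and no rule for combining them.
                    raise ValueError(f"conflicting writes to {key!r}")
                merged[key] = value
            elif key in merged:
                merged[key] = reducer(merged[key], value)
            else:
                merged[key] = value
            written.add(key)
    return merged


node_a_out = {"messages": ["msg1"], "counter": 1}
node_b_out = {"messages": ["msg2"], "counter": 2}

result = merge_superstep(
    {"messages": [], "counter": 0},
    [node_a_out, node_b_out],
    {"messages": operator.add, "counter": last},
)
print(result)
```

Passing None as the reducer for counter makes the same call fail, which is the situation an explicit reducer is meant to prevent.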

Checkpoint Implementation

From Source (pregel/_checkpoint.py)

# Simplified for readability; consult the installed source for the exact signature.
def create_checkpoint(
    channels: dict[str, BaseChannel],
    versions: dict[str, int],
    metadata: CheckpointMetadata
) -> Checkpoint:
    """Create a checkpoint from current channel values."""
    return {
        "channel_values": {k: v.checkpoint() for k, v in channels.items()},
        "channel_versions": versions,
        "metadata": metadata,
    }

Resuming from Checkpoint

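# Rebuild channels from a checkpoint (simplified; specs holds one channel per state key)
def channels_from_checkpoint(specs: dict[str, BaseChannel], checkpoint: Checkpoint) -> dict[str, BaseChannel]:
    return {
        # from_checkpoint is an instance method, so it is called on each key's channel spec
        k: spec.from_checkpoint(checkpoint["channel_values"].get(k))
        for k, spec in specs.items()
    }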
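A save-and-restore round trip can be exercised end to end with a stand-in channel type. ToyChannel is a hypothetical class that keeps only the two methods the snippets above rely on, and the specs argument (one empty channel per key) is an assumption of this sketch; the real channel classes carry more behavior.

```python
from typing import Any


class ToyChannel:
    """Holds one value and knows how to snapshot and rebuild itself."""

    def __init__(self, value: Any = None) -> None:
        self.value = value

    def checkpoint(self) -> Any:
        return self.value                      # serializable snapshot

    def from_checkpoint(self, saved: Any) -> "ToyChannel":
        return type(self)(saved)               # fresh channel of the same kind


def create_checkpoint(channels: dict, versions: dict, metadata: dict) -> dict:
    return {
        "channel_values": {k: c.checkpoint() for k, c in channels.items()},
        "channel_versions": dict(versions),
        "metadata": dict(metadata),
    }


def channels_from_checkpoint(specs: dict, checkpoint: dict) -> dict:
    return {
        k: spec.from_checkpoint(checkpoint["channel_values"].get(k))
        for k, spec in specs.items()
    }


live = {"messages": ToyChannel(["hi"]), "counter": ToyChannel(2)}
saved = create_checkpoint(live, {"messages": 1, "counter": 3}, {"step": 2})

specs = {"messages": ToyChannel(), "counter": ToyChannel()}
restored = channels_from_checkpoint(specs, saved)
print({k: c.value for k, c in restored.items()})
```

The restored channels are new objects holding the saved values, so a resumed run does not share channel instances with the run that produced the checkpoint.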

Key Differences from OpenClaw

| Aspect | LangGraph | OpenClaw |
| --- | --- | --- |
| State Storage | Channels in memory | Multi-layer memory |
| Persistence | Checkpoints | Session-memory hook |
| Isolation | thread_id | Session key |
| Resumption | checkpoint_id | Session restore |
| Updates | Reducers | Direct merge |

Generated from source code analysis