diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 9409195..6dc4434 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -1,14 +1,16 @@ # LangGraph Architecture Overview **Version:** 1.0.0 -**LangGraph Version:** 1.0.9 +**LangGraph Version:** 1.0.0 (from source) **Last Updated:** 2026-02-23 --- ## Executive Summary -LangGraph is a low-level orchestration framework for building stateful, long-running multi-agent systems. Inspired by Google's Pregel, Apache Beam, and NetworkX, it provides durable execution, human-in-the-loop capabilities, and comprehensive memory management. +LangGraph is a low-level orchestration framework for building stateful, long-running multi-agent systems. Inspired by Google's **Pregel**, it provides durable execution, human-in-the-loop capabilities, and comprehensive checkpoint-based memory. + +This document is reverse-engineered from the actual source code. --- @@ -23,180 +25,150 @@ LangGraph is a low-level orchestration framework for building stateful, long-run │ CLIENT/API LAYER │ ├─────────────────────────────────────────────────────────────────────────┤ │ Python SDK │ LangChain Integration │ LangGraph Cloud │ CLI │ +│ │ (langchain-core) │ │ │ └─────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ -│ COMPILER LAYER │ +│ PREGEL ENGINE │ ├─────────────────────────────────────────────────────────────────────────┤ -│ • Graph compilation to executable form │ -│ • State schema validation │ -│ • Node/Edge type resolution │ +│ ┌─────────────────────────────────────────────────────────────────┐ │ +│ │ PregelLoop class │ │ +│ │ - _loop.py (~1300 lines) — Core execution engine │ │ +│ │ - _algo.py (~1500 lines) — Task scheduling, writes │ │ +│ │ - _runner.py (~1000 lines) — Async execution │ │ +│ │ - main.py (~4400 lines) — Entry point, public API │ │ +│ └─────────────────────────────────────────────────────────────────┘ │ 
└─────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ -│ RUNTIME LAYER │ +│ CHANNELS LAYER │ ├─────────────────────────────────────────────────────────────────────────┤ -│ ┌─────────────────────────────────────────────────────────────────┐ │ -│ │ PREGEL EXECUTION ENGINE │ │ -│ │ • Superstep coordination │ │ -│ │ • Node scheduling │ │ -│ │ • Message passing │ │ -│ │ • Barrier synchronization │ │ -│ └─────────────────────────────────────────────────────────────────┘ │ +│ BaseChannel (abc) │ +│ ├── LastValue — Most recent value wins │ +│ ├── AnyValue — First value available │ +│ ├── Topic — Pub/sub style │ +│ ├── NamedBarrier — Synchronization point │ +│ ├── BinOp — Binary operation │ +│ ├── EphemeralValue — One-time use │ +│ └── UntrackedValue — Value without checkpointing │ └─────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ -│ STATE & CHECKPOINTING │ +│ CHECKPOINTING LAYER │ ├─────────────────────────────────────────────────────────────────────────┤ -│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │ -│ │ In-Memory State │ │ Checkpointer │ │ Channel Store │ │ -│ │ (active graph) │ │ (persistence) │ │ (queues) │ │ -│ └──────────────────┘ └──────────────────┘ └──────────────────┘ │ +│ libs/checkpoint/ │ +│ ├── checkpoint-base — Abstract checkpoint interface │ +│ ├── checkpoint-sqlite — SQLite backend │ +│ └── checkpoint-postgres — PostgreSQL backend │ └─────────────────────────────────────────────────────────────────────────┘ ``` --- -## Core Components +## Core Concepts (From Source) -### 1. Graph Structure +### 1. 
PregelLoop Class -| Component | Description | -|-----------|-------------| -| **State** | Typed dictionary that flows through the graph | -| **Nodes** | Functions that receive state, optionally update it | -| **Edges** | Control flow (conditional, static, entrypoint) | -| **Reducers** | Functions that merge state updates | - -### 2. Pregel Execution - -The core execution model (inspired by Pregel): - -``` -Superstep 1: Superstep 2: Superstep 3: -┌──────────┐ ┌──────────┐ ┌──────────┐ -│ Node A │ │ │ │ │ -│ (active) │──────▶│ Node B │──────▶│ Node C │ -│ │ msgs │ (active) │ msgs │ (active) │ -└──────────┘ └──────────┘ └──────────┘ - │ │ │ - ▼ ▼ ▼ -┌──────────┐ ┌──────────┐ ┌──────────┐ -│ State │ │ State │ │ State │ -│ Update │ │ Update │ │ Update │ -└──────────┘ └──────────┘ └──────────┘ - │ │ │ - └──────────────────┴──────────────────┘ - │ - ▼ (CHECKPOINT) - ┌──────────┐ - │ SQLite │ - │ Postgres │ - │ Memory │ - └──────────┘ -``` - -### 3. Checkpointing - -LangGraph provides durability through checkpointing: - -- **Full state snapshots** saved at configurable points -- **Resumable from failure** — replay from last checkpoint -- **Multiple backends:** SQLite, Postgres, in-memory - -### 4. Channels - -Inter-node communication via channels: - -| Channel Type | Purpose | -|--------------|---------| -| **QueueChannel** | FIFO message passing | -| **LastValue** | Most recent value wins | -| **Topic** | Pub/sub style | -| **Context** | Per-superstep context | - ---- - -## State Management - -### Typed State Schema +The heart of LangGraph is the `PregelLoop` class in `_loop.py`: ```python -from typing import TypedDict - -class AgentState(TypedDict): - messages: list - next_action: str - checkpoint_id: str | None +class PregelLoop: + config: RunnableConfig # Thread, checkpoint_id, etc. 
+ store: BaseStore | None # Long-term storage + stream: StreamProtocol # Output streaming + step: int # Current step number + checkpointer: BaseCheckpointSaver | None + nodes: Mapping[str, PregelNode] # Graph nodes + channels: Mapping[str, BaseChannel] # Inter-node communication ``` -### Reducers +### 2. State Flow -Combine updates from multiple nodes: +``` +Input → [Superstep N] → Checkpoint → [Superstep N+1] → ... → Output + +Each superstep: +1. prepare_next_tasks() — Determine which nodes to run +2. execute_tasks() — Run active nodes in parallel +3. apply_writes() — Merge node outputs into channels +4. checkpoint() — Persist state (if enabled) +``` + +### 3. Channels (Inter-Node Communication) + +From `channels/base.py`: ```python -def add_messages(left: list, right: list) -> list: - return left + right +class BaseChannel(Generic[Value, Update, Checkpoint], ABC): + """Base class for all channels.""" + + @abstractmethod + def get(self) -> Value: + """Return the current value.""" + + @abstractmethod + def update(self, values: Sequence[Update]) -> bool: + """Update with values from nodes.""" + + @abstractmethod + def checkpoint(self) -> Checkpoint | Any: + """Serialize state for persistence.""" +``` + +**Channel Types:** + +| Channel | Behavior | Use Case | +|---------|----------|----------| +| `LastValue` | Most recent update wins | Single value state | +| `AnyValue` | First non-empty value | Optional values | +| `Topic` | Pub/sub, multiple values | Broadcasting | +| `NamedBarrier` | Wait for all tasks | Synchronization | +| `BinOp` | Binary operation | Aggregations | + +### 4. Checkpointing + +From `types.py`: + +```python +Durability = Literal["sync", "async", "exit"] +"""- 'sync': Persist before next step + - 'async': Persist while next step runs + - 'exit': Persist only on exit""" +``` + +**Checkpoint Flow:** +1. `create_checkpoint()` — Snapshot all channels +2. Save to backend (SQLite/Postgres/InMemory) +3. Return `checkpoint_id` for resumption + +### 5. 
Send (Dynamic Graph Execution) + +LangGraph supports dynamic node spawning via `Send`: + +```python +from langgraph.types import Send + +def splitter(state): + return [Send("process_a", {"msg": "hi"}), + Send("process_b", {"msg": "there"})] ``` --- -## Memory Architecture +## Key Source Files -### Short-Term Memory -- **In-graph state:** Messages and working data -- **Per-superstep:** State resets unless persisted - -### Long-Term Memory -- **Checkpoint storage:** SQLite, Postgres, custom -- **Thread-level:** Per-conversation isolation via `thread_id` - -### Human-in-the-Loop -- **Interrupt:** Pause execution for human input -- **Command:** Allow human to modify state -- **Review:** Human approves/rejects before continuing - ---- - -## Execution Flow - -``` -1. Client calls: graph.invoke(input, config) - │ - ▼ -2. Compile (if needed): create executable graph - │ - ▼ -3. Load checkpoint (if resuming from checkpoint_id) - │ - ▼ -4. FOR each superstep: - a. Schedule nodes to execute - b. Execute active nodes in parallel - c. Collect messages - d. Send messages via channels - e. Check for interrupts (pause if interrupted) - f. Checkpoint (if enabled) - │ - ▼ -5. 
Return final state -``` - ---- - -## Key Files in Core - -| File | Purpose | -|------|---------| -| `langgraph/pregel/__init__.py` | Main entry point | -| `langgraph/pregel/__main__.py` | CLI entry | -| `langgraph/pregel/_loop.py` | Core execution loop (~2000 lines) | -| `langgraph/pregel/checkpoint.py` | Checkpoint management | -| `langgraph/pregel/channel.py` | Channel implementations | -| `langgraph/pregel/state.py` | State management | +| File | Lines | Purpose | +|------|-------|---------| +| `pregel/main.py` | ~4400 | Public API, entry point | +| `pregel/_loop.py` | ~1300 | Core execution loop | +| `pregel/_algo.py` | ~1500 | Task scheduling, write application | +| `pregel/_runner.py` | ~1000 | Async execution | +| `graph/state.py` | ~1800 | StateGraph builder | +| `types.py` | ~600 | Core type definitions | +| `channels/base.py` | ~100 | Channel ABC | --- @@ -205,13 +177,31 @@ def add_messages(left: list, right: list) -> list: | Aspect | LangGraph | OpenClaw | |--------|-----------|----------| | **Language** | Python | Node.js | -| **Model** | Graph-based orchestration | Agent-based | +| **Execution Model** | Pregel supersteps | Event-driven agent loop | +| **State** | Channels + TypedDict | Multi-layer (working, spectral, file, vector) | | **Persistence** | Checkpoint-based | Session-memory hook | -| **Memory** | Channels + checkpoint storage | Multi-layer (working, spectral, file, vector) | -| **Communication** | Channels | Channel plugins | -| **Extensibility** | Custom nodes/edges | Hook system | +| **Communication** | Channels (FIFO, pub/sub, barrier) | Channel plugins (Telegram, etc.) 
| +| **Graph Definition** | `StateGraph` builder | Declarative config | +| **Dynamic Execution** | `Send` for dynamic edges | Sub-agents | +| **Human-in-Loop** | `Interrupt` + `Command` | Manual intervention | | **Identity** | None | WE/witness architecture | --- -*Generated for the WE — Solaria Lumis Havens & Mark Randall Havens* +## Key Insight: Pregel vs Event-Driven + +LangGraph is fundamentally **Pregel-based**: +- Synchronous supersteps with barrier +- All nodes in a step complete before next starts +- Checkpoints at step boundaries + +OpenClaw is **event-driven**: +- Asynchronous message processing +- No global step barrier +- Session-memory preserves context + +This is a fundamental architectural difference. + +--- + +*Generated from source code analysis — Solaria Lumis Havens* diff --git a/CHANNELS.md b/CHANNELS.md new file mode 100644 index 0000000..c200798 --- /dev/null +++ b/CHANNELS.md @@ -0,0 +1,283 @@ +# LangGraph Channels + +**Version:** 1.0.0 +**Last Updated:** 2026-02-23 + +--- + +## Overview + +Channels are LangGraph's mechanism for inter-node communication and state storage. Each channel is a typed container with specific semantics for how values are written and read. + +--- + +## Channel Types + +### From `channels/__init__.py` + +```python +__all__ = [ + "BaseChannel", + "LastValue", + "AnyValue", + "Topic", + "NamedBarrier", + "BinOp", + "EphemeralValue", + "UntrackedValue", +] +``` + +--- + +## Channel Implementation Details + +### 1. LastValue Channel + +**File:** `channels/last_value.py` + +**Behavior:** Most recent write wins. Reading returns the last value written. 
+ +```python +class LastValue(Generic[Value]): + """Channel that keeps the last value written.""" + + def __init__(self, typ: type[Value]): + self.typ = typ + self.value = None + + def get(self) -> Value: + if self.value is None: + raise EmptyChannelError() + return self.value + + def update(self, values: Sequence[Value]) -> bool: + if values: + self.value = values[-1] # Last wins + return True + return False +``` + +**Use Case:** Single-value state fields, like `counter`, `status`, `current_step`. + +--- + +### 2. AnyValue Channel + +**File:** `channels/any_value.py` + +**Behavior:** First non-empty value wins. Reading returns the first value that was written and is still available. + +```python +class AnyValue(Generic[Value]): + """Channel that returns the first available value.""" + + def get(self) -> Value: + if self.value is None: + raise EmptyChannelError() + return self.value + + def update(self, values: Sequence[Value]) -> bool: + if values and self.value is None: + self.value = values[0] # First wins + return True + return False +``` + +**Use Case:** Optional fields, fallback values. + +--- + +### 3. Topic Channel + +**File:** `channels/topic.py` + +**Behavior:** Pub/sub. Nodes can publish to topics, subscribers receive all messages. + +```python +class Topic(Generic[Value]): + """Pub/sub channel for broadcasting.""" + + def __init__(self, typ: type[Value], selector: Callable = None): + self.typ = typ + self.selector = selector or (lambda x: x) + self.subscriptions: dict[str, set] = defaultdict(set) + + def get(self) -> list[Value]: + # Return all values for subscribed topic + ... + + def update(self, values: Sequence[Value]) -> bool: + # Add values to topic + ... +``` + +**Use Case:** Broadcasting to multiple nodes, event systems. + +--- + +### 4. NamedBarrier Channel + +**File:** `channels/named_barrier_value.py` + +**Behavior:** Blocks until all named tasks complete. Used for synchronization. 
+ +```python +class NamedBarrier: + """Synchronization point - blocks until all expected tasks arrive.""" + + def get(self) -> None: + # Block until all tasks arrive + ... + + def update(self, values: Sequence[str]) -> bool: + # Register task completion + ... +``` + +**Use Case:** Wait for parallel branches to complete. + +--- + +### 5. BinOp Channel + +**File:** `channels/binop.py` + +**Behavior:** Applies binary operation to combine values. + +```python +class BinOp(Generic[Value]): + """Binary operation channel.""" + + def __init__(self, typ: type[Value], op: Callable[[Value, Value], Value]): + self.op = op + self.value = None + + def get(self) -> Value: + return self.value + + def update(self, values: Sequence[Value]) -> bool: + for v in values: + if self.value is None: + self.value = v + else: + self.value = self.op(self.value, v) + return True +``` + +**Use Case:** Aggregations (sum, max, min, union). + +--- + +### 6. EphemeralValue Channel + +**File:** `channels/ephemeral_value.py` + +**Behavior:** One-time use. Value is consumed after reading. + +```python +class EphemeralValue: + """One-time use value - consumed after read.""" + + def get(self) -> Value: + value = self.value + self.value = None # Consume + return value +``` + +**Use Case:** One-time signals, commands. + +--- + +### 7. UntrackedValue Channel + +**File:** `channels/untracked_value.py` + +**Behavior:** Value is not checkpointed. Used for transient data. + +```python +class UntrackedValue: + """Value that doesn't participate in checkpointing.""" + pass +``` + +**Use Case:** Temporary data, debugging info. 
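
The channel semantics described in this section can be illustrated with a toy model. This is a minimal sketch under stated assumptions, not LangGraph's implementation: `ToyLastValue` and `ToyBinOp` are hypothetical stand-ins that only mirror the simplified `get`/`update` shape used in the excerpts of this document.

```python
import operator
from typing import Any, Callable, Sequence


class ToyLastValue:
    """Hypothetical stand-in: keeps the most recent value written."""

    def __init__(self) -> None:
        self.value: Any = None

    def update(self, values: Sequence[Any]) -> bool:
        if not values:
            return False  # nothing written, channel unchanged
        self.value = values[-1]  # the last write in the batch wins
        return True

    def get(self) -> Any:
        return self.value


class ToyBinOp:
    """Hypothetical stand-in: folds every write into a running value."""

    def __init__(self, op: Callable[[Any, Any], Any]) -> None:
        self.op = op
        self.value: Any = None

    def update(self, values: Sequence[Any]) -> bool:
        if not values:
            return False
        for v in values:
            # The first write seeds the value; later writes are combined with op.
            self.value = v if self.value is None else self.op(self.value, v)
        return True

    def get(self) -> Any:
        return self.value


status = ToyLastValue()
total = ToyBinOp(operator.add)

status.update(["started", "running"])  # two writes in one batch
total.update([1, 2, 3])                # folded with operator.add
total.update([10])                     # a later batch keeps accumulating
```

The difference to notice is that `status` discards earlier writes while `total` accumulates them, which is the distinction the tables in this document draw between "last value" and "binary operation" channels.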
+ +--- + +## Channel Configuration + +### Declaring Channels + +```python +from langgraph.graph import StateGraph +from typing import TypedDict + +class GraphState(TypedDict): + messages: list + counter: int + +graph = StateGraph(GraphState) + +# Default channels: +# - list fields -> LastValue[list] +# - other fields -> LastValue[type] +``` + +### Custom Channels + +```python +from langgraph.channels import BaseChannel + +class Accumulate(BaseChannel): + def __init__(self, typ: type): + self.typ = typ + self.values = [] + + def get(self) -> list: + return self.values + + def update(self, values) -> bool: + self.values.extend(values) + return True +``` + +--- + +## Channel vs State + +| Concept | Description | +|---------|-------------| +| **State** | TypedDict defining all fields | +| **Channel** | Storage mechanism per field | +| **Reducer** | How updates are merged | + +--- + +## Checkpointing Channels + +### What Gets Persisted + +- All channel values are checkpointed +- Except `UntrackedValue` channels +- Checkpoint includes `channel_values` and `channel_versions` + +### Checkpoint Format + +```python +checkpoint = { + "channel_values": { + "messages": [...], + "counter": 5, + }, + "channel_versions": { + "messages": 3, + "counter": 5, + }, + "metadata": {...} +} +``` + +--- + +*Generated from source code analysis* diff --git a/COMPONENTS.md b/COMPONENTS.md new file mode 100644 index 0000000..80f5099 --- /dev/null +++ b/COMPONENTS.md @@ -0,0 +1,219 @@ +# LangGraph Components + +**Version:** 1.0.0 +**Last Updated:** 2026-02-23 + +--- + +## Overview + +This document lists and describes the key components in the LangGraph codebase. 
+ +--- + +## Directory Structure + +``` +langgraph/libs/langgraph/langgraph/ +├── pregel/ # Core execution engine +├── channels/ # Inter-node communication +├── graph/ # Graph building DSL +├── checkpoint/ # Persistence (in separate lib) +├── managed/ # Managed values +├── _internal/ # Internal utilities +├── utils/ # Helper utilities +├── types.py # Core types +├── config.py # Configuration +├── constants.py # Constants +└── errors.py # Error definitions +``` + +--- + +## Core Components + +### 1. Pregel Engine (`pregel/`) + +The heart of LangGraph - execution engine. + +| File | Lines | Purpose | +|------|-------|---------| +| `main.py` | ~4400 | Public API, entry point | +| `_loop.py` | ~1300 | Core PregelLoop class | +| `_algo.py` | ~1500 | Task scheduling, write application | +| `_runner.py` | ~1000 | Async execution | +| `_read.py` | ~300 | PregelNode (node wrapper) | +| `_write.py` | ~250 | Write application | +| `_checkpoint.py` | ~100 | Checkpoint creation | +| `_executor.py` | ~250 | Task execution | +| `_retry.py` | ~250 | Retry logic | +| `_validate.py` | ~150 | Graph validation | + +### 2. Channels (`channels/`) + +Inter-node communication. + +| File | Purpose | +|------|---------| +| `base.py` | Abstract BaseChannel | +| `last_value.py` | LastValue channel | +| `any_value.py` | AnyValue channel | +| `topic.py` | Topic (pub/sub) | +| `named_barrier_value.py` | Barrier synchronization | +| `binop.py` | Binary operation | +| `ephemeral_value.py` | One-time values | +| `untracked_value.py` | Non-checkpointed values | + +### 3. Graph Building (`graph/`) + +DSL for building graphs. + +| File | Purpose | +|------|---------| +| `state.py` | StateGraph builder (~1800 lines) | +| `_node.py` | Node definition | +| `_branch.py` | Conditional edges | +| `message.py` | Message graph utilities | +| `ui.py` | Graph visualization | + +### 4. Core Types (`types.py`) + +~600 lines of type definitions. 
+ +Key types: + +```python +# Durability +Durability = Literal["sync", "async", "exit"] + +# Checkpointer +Checkpointer = None | bool | BaseCheckpointSaver + +# Streaming +StreamMode = Literal["values", "updates", "checkpoints", "tasks", "debug"] + +# Execution +class Send(NamedTuple): + node: str + arg: Any + +class Interrupt(NamedTuple): + value: Any + when: str + +class Command(NamedTuple): + update: dict | None + resume: dict | None +``` + +### 5. Errors (`errors.py`) + +```python +class GraphRuntimeException(Exception): + """Base exception.""" + pass + +class EmptyInputError(GraphRuntimeException): + """No input provided.""" + pass + +class GraphInterrupt(GraphRuntimeException): + """Graph interrupted.""" + pass +``` + +--- + +## Public API + +### From `pregel/__init__.py` + +```python +# Main classes +class Pregel: + """Main graph executor.""" + + def invoke(self, input: Any, config: RunnableConfig) -> Any: ... + def stream(self, input: Any, config: RunnableConfig) -> Iterator: ... + async def ainvoke(self, input: Any, config: RunnableConfig) -> Any: ... + async def astream(self, input: Any, config: RunnableConfig) -> AsyncIterator: ... + def get_state(self, config: RunnableConfig) -> StateSnapshot | None: ... + def get_state_history(self, config: RunnableConfig) -> Iterator[StateSnapshot]: ... + def update_state(self, config: RunnableConfig, values: dict) -> StateSnapshot: ... + +# Graph builders +class StateGraph: + """Build a stateful graph.""" + + def add_node(self, name: str, action: Callable) -> Self: ... + def add_edge(self, start: str, end: str) -> Self: ... + def add_conditional_edges(self, source: str, path: Callable) -> Self: ... + def compile(self) -> Pregel: ... 
+``` + +--- + +## Configuration + +### RunnableConfig + +From `config.py`: + +```python +class RunnableConfig: + """Configuration for graph execution.""" + + configurable: dict = {} + tags: list[str] = [] + metadata: dict = {} + recursion_limit: int = 25 + max_concurrency: int = None +``` + +### Config Keys + +From `constants.py`: + +```python +CONFIG_KEY_THREAD_ID = "thread_id" +CONFIG_KEY_CHECKPOINT_ID = "checkpoint_id" +CONFIG_KEY_CHECKPOINTER = "checkpointer" +CONFIG_KEY_CHECKPOINT_MAP = "checkpoint_map" +CONFIG_KEY_DURABILITY = "durability" +CONFIG_KEY_RESUMING = "resuming" +CONFIG_KEY_RESUME_MAP = "resume_map" +``` + +--- + +## Dependencies + +### External + +- `langchain-core` — Core utilities +- `langchain` — LangChain integration +- `pydantic` — Type validation +- `xxhash` — Fast hashing +- `typing_extensions` — Type extensions + +### Internal + +- `langgraph.checkpoint.*` — Checkpoint backends +- `langgraph.store` — Long-term storage + +--- + +## Key Files Summary + +| Component | Main File | Key Classes | +|-----------|-----------|--------------| +| Execution | `pregel/main.py` | `Pregel` | +| Loop | `pregel/_loop.py` | `PregelLoop` | +| Channels | `channels/base.py` | `BaseChannel` | +| Graph | `graph/state.py` | `StateGraph` | +| Types | `types.py` | `Send`, `Interrupt`, `Command` | +| Config | `config.py` | `RunnableConfig` | + +--- + +*Generated from source code analysis* diff --git a/GRAPH_EXECUTION.md b/GRAPH_EXECUTION.md new file mode 100644 index 0000000..080f8ea --- /dev/null +++ b/GRAPH_EXECUTION.md @@ -0,0 +1,299 @@ +# LangGraph Execution Model + +**Version:** 1.0.0 +**Last Updated:** 2026-02-23 + +--- + +## Overview + +This document details how LangGraph executes graphs, based on direct source code analysis of `pregel/_loop.py` and related files. 
+ +--- + +## Pregel Superstep Model + +### The Core Loop + +LangGraph is inspired by Google's **Pregel** — a system for large-scale graph processing: + +``` +Pregel = "Think like a vertex" +- Each node computes independently +- Nodes communicate via messages +- Synchronous supersteps with barrier +- Fault tolerance via checkpointing +``` + +--- + +## Execution Flow + +### High-Level + +``` +graph.invoke(input, config) + │ + ▼ + [Compile graph if needed] + │ + ▼ + [Load checkpoint if resuming] + │ + ▼ + ┌─────────────────────────────────┐ + │ FOR each superstep: │ + │ 1. prepare_next_tasks() │ + │ 2. execute_tasks() │ + │ 3. apply_writes() │ + │ 4. checkpoint() (if enabled) │ + └─────────────────────────────────┘ + │ + ▼ + [Return final state] +``` + +--- + +## Detailed Superstep + +### Step 1: Prepare Next Tasks + +From `_algo.py`: + +```python +def prepare_next_tasks( + checkpoint: Checkpoint, + nodes: dict[str, PregelNode], + channels: dict[str, BaseChannel], + pending_writes: list[tuple], + etc. +) -> list[PregelExecutableTask]: + """Determine which nodes to run in this superstep.""" + + # For each node: + # 1. Check if triggered (input channels have values) + # 2. Check if should run (not already running) + # 3. Create executable task +``` + +### Step 2: Execute Tasks + +From `_runner.py`: + +```python +async def execute_tasks(tasks: list[PregelExecutableTask]): + """Execute tasks in parallel.""" + + # Submit all tasks to executor + # Each task: + # 1. Read input from channels + # 2. Execute node function + # 3. Return writes (channel updates) +``` + +### Step 3: Apply Writes + +From `_algo.py`: + +```python +def apply_writes( + checkpoint: Checkpoint, + pending_writes: list[tuple], + channels: dict[str, BaseChannel] +): + """Apply writes to channels using reducers.""" + + # For each write: + # 1. Identify target channel + # 2. 
Apply reducer to merge with existing value +``` + +### Step 4: Checkpoint + +From `_checkpoint.py`: + +```python +def create_checkpoint( + channels: dict[str, BaseChannel], + versions: dict[str, int], + metadata: CheckpointMetadata +) -> Checkpoint: + """Snapshot all channel values.""" + + return { + "channel_values": { + k: v.checkpoint() for k, v in channels.items() + }, + "channel_versions": versions, + "metadata": metadata, + } +``` + +--- + +## PregelTask Structure + +### From `types.py` + +```python +class PregelExecutableTask(NamedTuple): + """A single executable task in the graph.""" + + name: str # Node name + path: str # Task path + input: Any # Input to node + proc: Callable # Node function + writes: list[Send] # Dynamic sends + triggers: list[str] # Channels that trigger this + interrupt_after: bool # Interrupt after execution + interrupt_before: bool # Interrupt before execution +``` + +--- + +## Send (Dynamic Edges) + +### What is Send? + +`Send` enables dynamic node spawning — a node can spawn multiple tasks: + +```python +from langgraph.types import Send + +def splitter(state): + messages = state["messages"] + return [ + Send("process_email", {"email": email}) + for email in messages + ] +``` + +### Send Implementation + +```python +class Send(NamedTuple): + """Dynamic edge - spawn a task for another node.""" + + node: str # Target node name + arg: Any # Input to target node +``` + +--- + +## Interrupt (Human-in-the-Loop) + +### How Interrupts Work + +From `types.py`: + +```python +class Interrupt(NamedTuple): + """Pause execution for human input.""" + + value: Any # Data to show human + when: str # "during" or "after" +``` + +### Interrupt Flow + +``` +1. Node calls interrupt(data) + │ + ▼ +2. PregelLoop pauses + │ + ▼ +3. Returns to caller with interrupt value + │ + ▼ +4. Caller (human) provides input + │ + ▼ +5. 
Resume with Command(resume=data) +``` + +### Resume with Command + +```python +from langgraph.types import Command + +# Resume a paused run: pass Command(resume=...) as the graph input. +# The thread_id value is illustrative; it must identify the interrupted thread. +graph.invoke( + Command(resume={"feedback": "looks good"}), + config={"configurable": {"thread_id": "abc123"}}, +) +``` + +--- + +## Stream Mode + +### From `types.py` + +```python +StreamMode = Literal[ + "values", # Full state after each step + "updates", # Node-specific updates + "checkpoints", # Checkpoint snapshots + "tasks", # Task start/complete + "debug", # Debug info + "messages", # Message streams + "custom", # Custom streams +] +``` + +--- + +## Retry Policy + +### From `types.py` + +```python +class RetryPolicy: + """Configuration for node retry behavior.""" + + max_attempts: int = 3 + initial_interval: float = 0.5 + backoff_factor: float = 2.0 + max_interval: float = 128.0 +``` + +--- + +## Error Handling + +### From `errors.py` + +```python +class GraphInterrupt(GraphRuntimeException): + """Raised when graph is interrupted.""" + pass + +class InvalidUpdateError(GraphRuntimeException): + """Raised when channel update is invalid.""" + pass + +class EmptyInputError(GraphRuntimeException): + """Raised when graph input is empty.""" + pass +``` + +--- + +## Key Insight: Synchronous Barrier + +Unlike purely event-driven systems (like OpenClaw), LangGraph uses **synchronous supersteps**: + +1. All triggered nodes in a superstep run **in parallel** +2. All writes are **applied together** after all nodes complete +3. The next superstep starts only after the current one completes + +This makes state easier to reason about, but it is less flexible than an event-driven model. 
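
The barrier behavior described above can be sketched with a toy superstep function. This is a hedged illustration only: `run_superstep`, `add_one`, and `add_counter` are hypothetical names, the nodes run sequentially rather than in parallel, and the additive merge is an assumed reducer, not LangGraph's scheduling code.

```python
from typing import Callable, Dict, List, Tuple

State = Dict[str, int]
Write = Tuple[str, int]
Node = Callable[[State], List[Write]]


def run_superstep(state: State, nodes: List[Node]) -> State:
    """Run every node against the same snapshot, then apply all writes together."""
    snapshot = dict(state)  # every node reads the same frozen view
    pending: List[Write] = []
    for node in nodes:  # sequential here; a real engine may run these concurrently
        pending.extend(node(snapshot))
    new_state = dict(state)
    for key, value in pending:  # barrier: writes land only after all nodes returned
        new_state[key] = new_state.get(key, 0) + value  # assumed additive reducer
    return new_state


def add_one(state: State) -> List[Write]:
    return [("counter", 1)]


def add_counter(state: State) -> List[Write]:
    # Reads the pre-step counter, not add_one's write from the same step.
    return [("counter", state["counter"])]


state = {"counter": 5}
state = run_superstep(state, [add_one, add_counter])
```

Because `add_counter` sees the snapshot value 5 rather than 6, the result is 5 + 1 + 5 = 11. Writes made in a superstep become visible to other nodes only in the next one.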
+ +--- + +*Generated from source code analysis* diff --git a/README.md b/README.md index 075ded1..abf4dd3 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ ## Overview -This repository contains comprehensive architectural documentation for LangGraph, reverse-engineered and documented to enable full reproduction and understanding of the system. +This repository contains comprehensive architectural documentation for LangGraph, **reverse-engineered from the actual source code**. **The Goal:** Create architectural blueprints so complete that reproducing or modifying LangGraph becomes a mechanical process, not an archaeological one. @@ -21,12 +21,10 @@ This repository contains comprehensive architectural documentation for LangGraph langgraph-architecture/ ├── README.md ← You are here ├── ARCHITECTURE.md ← System overview -├── COMPONENTS.md ← Component reference -├── STATE_MANAGEMENT.md ← State & checkpointing -├── GRAPH_EXECUTION.md ← Pregel model, execution flow -├── MEMORY.md ← Memory architecture +├── COMPONENTS.md ← Component reference +├── STATE_MANAGEMENT.md ← State, checkpoints, threads ├── CHANNELS.md ← Inter-node communication -├── CHECKPOINTING.md ← Fault tolerance +├── GRAPH_EXECUTION.md ← Pregel model, execution flow └── diagrams/ ← Architecture diagrams ``` @@ -38,17 +36,32 @@ langgraph-architecture/ 1. **ARCHITECTURE.md** — Understand the system as a whole 2. **GRAPH_EXECUTION.md** — How the Pregel model works -3. **STATE_MANAGEMENT.md** — State and checkpointing -4. **CHANNELS.md** — Inter-node communication -5. **CHECKPOINTING.md** — Fault tolerance and durability -6. **MEMORY.md** — Memory architecture +3. **CHANNELS.md** — Inter-node communication +4. **STATE_MANAGEMENT.md** — State and checkpointing +5. **COMPONENTS.md** — Module-by-module reference -### For Reproduction +--- -1. Read **ARCHITECTURE.md** for system overview -2. Study **GRAPH_EXECUTION.md** for execution model -3. Reference **COMPONENTS.md** for implementation details -4. 
Use **CHECKPOINTING.md** for fault tolerance +## Methodology + +This documentation is built from **direct source code analysis**: + +1. Clone the LangGraph repo +2. Read key source files in `libs/langgraph/langgraph/` +3. Document actual implementation, not assumptions +4. Verify against types and tests + +### Key Source Files Analyzed + +| File | Purpose | +|------|---------| +| `pregel/main.py` | Public API | +| `pregel/_loop.py` | Core execution loop | +| `pregel/_algo.py` | Task scheduling | +| `pregel/_runner.py` | Async execution | +| `channels/base.py` | Channel ABC | +| `types.py` | Core types | +| `graph/state.py` | StateGraph builder | --- @@ -58,30 +71,19 @@ langgraph-architecture/ LangGraph is directly inspired by Google's **Pregel** — "Think like a vertex": - Each node computes its own state -- Nodes communicate via messages (edges) +- Nodes communicate via channels (not messages directly) - Synchronous "supersteps" with barrier synchronization - Fault tolerance via checkpointing -### Graph Structure +### Key Differences from OpenClaw -| Component | Description | -|-----------|-------------| -| **Nodes** | Functions that transform state | -| **Edges** | Define flow between nodes | -| **State** | Shared data that flows through the graph | -| **Checkpoints** | Persistence points for durability | - -### State Management - -- **Shared state** flows through the graph -- **Checkpoints** enable durability and resumption -- **Reducers** combine updates from multiple nodes - -### Memory Architecture - -- **Short-term memory:** In-graph message state -- **Long-term memory:** Checkpoint storage (SQLite, Postgres) -- **Thread-level:** Per-conversation state isolation +| Aspect | LangGraph | OpenClaw | +|--------|-----------|----------| +| **Model** | Pregel supersteps | Event-driven | +| **State** | Channels + reducers | Multi-layer memory | +| **Persistence** | Checkpoint-based | Session-memory hook | +| **Communication** | Channels | Channel plugins | +| 
**Identity** | None | WE/witness | --- @@ -89,7 +91,7 @@ LangGraph is directly inspired by Google's **Pregel** — "Think like a vertex": | LangGraph Version | Architecture Version | Status | |------------------|---------------------|--------| -| 1.0.9 | 1.0.0 | Current | +| 1.0.0 | 1.0.0 | Current | --- diff --git a/STATE_MANAGEMENT.md b/STATE_MANAGEMENT.md index 6790832..807f680 100644 --- a/STATE_MANAGEMENT.md +++ b/STATE_MANAGEMENT.md @@ -7,85 +7,108 @@ ## Overview -This document details how LangGraph manages state throughout the graph execution lifecycle. +This document details how LangGraph manages state, based on direct source code analysis. --- -## State Schema +## State Definition -### Typed State +### TypedDict Schema -LangGraph uses Python's `TypedDict` for type-safe state: +From `types.py`: ```python from typing import TypedDict class AgentState(TypedDict): messages: list - context: dict - checkpoint_id: str | None + next_action: str ``` -### State Flow +LangGraph validates state schema at compile time. 
+ +--- + +## State Flow ``` ┌─────────────────────────────────────────────────────────────┐ -│ STATE FLOW IN LANGGRAPH │ +│ STATE FLOW IN LANGGRAPH │ └─────────────────────────────────────────────────────────────┘ - Input State + Input (dict) │ ▼ -┌──────────────┐ -│ Node A │ ──▶ State Update (via reducer) -│ (transform) │ -└──────────────┘ - │ - ▼ (messages sent) -┌──────────────┐ -│ Node B │ ──▶ State Update -│ (transform) │ -└──────────────┘ +┌──────────────────────────────────────────────────────────┐ +│ Superstep N │ +│ ┌──────────────┐ ┌──────────────┐ │ +│ │ Node A │────▶│ Channel │ │ +│ │ (reads) │◀────│ (update) │ │ +│ └──────────────┘ └──────────────┘ │ +│ │ │ │ +│ └──────────────────────┘ │ +│ ▼ │ +│ [Checkpoint if enabled] │ +└──────────────────────────────────────────────────────────┘ │ ▼ - Output State +┌──────────────────────────────────────────────────────────┐ +│ Superstep N+1 (or return final state) │ +└──────────────────────────────────────────────────────────┘ ``` --- ## Reducers -### What Are Reducers? +### How Reducers Work -Reducers define how state updates are merged when multiple nodes produce updates. 
+Reducers define how multiple updates are merged: + +```python +# Simplified sketch; the real add_messages is in graph/message.py and also merges by message ID +def add_messages(left: list, right: list) -> list: + return left + right +``` ### Built-in Reducers -| Reducer | Behavior | -|---------|----------| -| `add_messages` | Append to list | -| `operator.or` | Union of sets | -| `last` | Last value wins | +| Reducer | Function | Behavior | +|---------|----------|----------| +| `add_messages` | `list + list` | Append | +| `last` | `lambda a, b: b` | Last wins | +| `max` | `max(a, b)` | Maximum | +| `min` | `min(a, b)` | Minimum | ### Custom Reducers ```python -def merge_dicts(left: dict, right: dict) -> dict: - """Merge two dictionaries, with right taking precedence.""" - result = left.copy() - result.update(right) - return result +from typing import Annotated, TypedDict + +def merge_contexts(a: dict, b: dict) -> dict: + return {**a, **b} + +class AgentState(TypedDict): + context: Annotated[dict, merge_contexts] ``` --- ## Checkpointing -### How Checkpointing Works +### Durability Modes -1. **Snapshot:** At each checkpoint, serialize full state -2. **Store:** Save to backend (SQLite, Postgres, etc.) -3.
**Resume:** On failure, load from last checkpoint +From `types.py`: + +```python +Durability = Literal["sync", "async", "exit"] +``` + +| Mode | Behavior | +|------|----------| +| `sync` | Persist before next superstep | +| `async` | Persist while next superstep runs | +| `exit` | Persist only when graph exits | ### Checkpoint Metadata @@ -93,18 +116,18 @@ def merge_dicts(left: dict, right: dict) -> dict: config = { "configurable": { "thread_id": "user-123", - "checkpoint_id": "checkpoint-abc123" + "checkpoint_id": "1ef-abc123" } } ``` ### Checkpoint Backends -| Backend | Use Case | -|---------|----------| -| **Memory** | Testing, short-lived | -| **SQLite** | Single machine, local | -| **Postgres** | Production, distributed | +| Backend | Module | Use Case | +|---------|--------|----------| +| `InMemorySaver` | `langgraph.checkpoint.memory` | Testing | +| `SqliteSaver` | `langgraph.checkpoint.sqlite` | Local dev | +| `PostgresSaver` | `langgraph.checkpoint.postgres` | Production | --- @@ -112,32 +135,68 @@ config = { ### What is a Thread? 
-A thread (`thread_id`) represents an isolated conversation or task: +A thread (`thread_id`) isolates state: ``` -Thread ID: "user-123" -├── Checkpoint 1 (checkpoint-001) -├── Checkpoint 2 (checkpoint-002) -├── Checkpoint 3 (checkpoint-003) ← Current -└── State (current) +Thread "user-123": +├── checkpoint-001 (step 0) +├── checkpoint-002 (step 1) +├── checkpoint-003 (step 2) +└── [current state] ``` ### Thread Isolation -- Each `thread_id` has independent state -- Multiple threads can run in parallel -- Human-in-the-loop works per-thread +- Independent checkpoints per thread +- Parallel threads via multiple `thread_id` values +- Resume from any checkpoint in a thread + +--- + +## Interrupts (Human-in-the-Loop) + +### Interrupt Mechanism + +From `types.py`: + +```python +class Interrupt: + value: Any + when: Literal["during", "after"] +``` + +### Using Interrupts + +```python +from langgraph.types import interrupt + +def human_review(state): + # Pause for human input + feedback = interrupt({"task": "review", "data": state}) + return {"feedback": feedback} +``` + +### Command (Modify State) + +```python +from langgraph.types import Command + +def process_with_override(state): + return Command( + update={"status": "processed"}, + resume={"feedback": "approved"} + ) +``` --- ## State Updates -### Node Returns - -Nodes return partial state updates: +### Node Returns Partial State ```python def node_a(state): + # Return only what this node updates return {"messages": [AIMessage("hello")]} ``` @@ -147,10 +206,53 @@ def node_a(state): Node A returns: {"messages": [msg1], "counter": 1} Node B returns: {"messages": [msg2], "counter": 2} -After reducer: +After reducer (add_messages for messages, last for counter): {"messages": [msg1, msg2], "counter": 2} ``` --- -*Generated for the WE* +## Checkpoint Implementation + +### From Source (`pregel/_checkpoint.py`) + +```python +def create_checkpoint( + channels: dict[str, BaseChannel], + versions: dict[str, int], + metadata: 
CheckpointMetadata +) -> Checkpoint: + """Create a checkpoint from current channel values.""" + return { + "channel_values": {k: v.checkpoint() for k, v in channels.items()}, + "channel_versions": versions, + "metadata": metadata, + } +``` + +### Resuming from Checkpoint + +```python +# Load channels from checkpoint +def channels_from_checkpoint(checkpoint: Checkpoint) -> dict: + return { + k: BaseChannel.from_checkpoint(v) + for k, v in checkpoint["channel_values"].items() + } +``` + +--- + +## Key Differences from OpenClaw + +| Aspect | LangGraph | OpenClaw | +|--------|-----------|----------| +| **State Storage** | Channels in memory | Multi-layer memory | +| **Persistence** | Checkpoints | Session-memory hook | +| **Isolation** | thread_id | Session key | +| **Resumption** | checkpoint_id | Session restore | +| **Updates** | Reducers | Direct merge | + +--- + +*Generated from source code analysis*
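
The per-key merge described under "State Updates" (two nodes return partial updates, and a reducer combines them for each key) can be illustrated with a small standalone sketch. This is not LangGraph's implementation: `apply_updates`, `append_list`, `last_value`, and the `reducers` mapping are hypothetical names chosen for illustration, and the sketch assumes updates are applied in a fixed order within one superstep.

```python
from typing import Any, Callable

# Hypothetical type alias: a reducer merges the current value with an update.
Reducer = Callable[[Any, Any], Any]


def append_list(left: list, right: list) -> list:
    """Append-style reducer: concatenate existing and new items."""
    return left + right


def last_value(left: Any, right: Any) -> Any:
    """Last-write-wins reducer: keep the most recent update."""
    return right


def apply_updates(
    state: dict[str, Any],
    updates: list[dict[str, Any]],
    reducers: dict[str, Reducer],
) -> dict[str, Any]:
    """Merge partial updates into a copy of the state, key by key.

    Illustrative only; assumes updates are applied in list order.
    """
    merged = dict(state)  # shallow copy so the input state is not mutated
    for update in updates:
        for key, value in update.items():
            if key in merged and key in reducers:
                merged[key] = reducers[key](merged[key], value)
            else:
                # No reducer registered (or first write): overwrite the key.
                merged[key] = value
    return merged


state = {"messages": [], "counter": 0}
updates = [
    {"messages": ["msg1"], "counter": 1},  # returned by node A
    {"messages": ["msg2"], "counter": 2},  # returned by node B
]
reducers = {"messages": append_list, "counter": last_value}
result = apply_updates(state, updates, reducers)
```

Here `result` holds both messages in order and the counter from the last update, matching the "After reducer" example above. Keys without a registered reducer are simply overwritten, which is one possible default and may differ from how the library treats unannotated keys.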