Files
2026-02-23 13:11:51 -06:00

5.7 KiB

LangGraph Channels

Version: 1.0.0
Last Updated: 2026-02-23


Overview

Channels are LangGraph's mechanism for inter-node communication and state storage. Each channel is a typed container with specific semantics for how values are written and read.


Channel Types

From channels/__init__.py

__all__ = [
    "BaseChannel",
    "LastValue",
    "AnyValue", 
    "Topic",
    "NamedBarrier",
    "BinOp",
    "EphemeralValue",
    "UntrackedValue",
]

Channel Implementation Details

1. LastValue Channel

File: channels/last_value.py

Behavior: Most recent write wins. Reading returns the last value written.

class LastValue(Generic[Value]):
    """Channel that keeps the last value written."""
    
    def __init__(self, typ: type[Value]):
        self.typ = typ
        self.value = None
        
    def get(self) -> Value:
        if self.value is None:
            raise EmptyChannelError()
        return self.value
    
    def update(self, values: Sequence[Value]) -> bool:
        if values:
            self.value = values[-1]  # Last wins
            return True
        return False

Use Case: Single-value state fields, like counter, status, current_step.


2. AnyValue Channel

File: channels/any_value.py

Behavior: First non-empty value wins. Reading returns the first value that was written and is still available.

class AnyValue(Generic[Value]):
    """Channel that returns the first available value."""
    
    def get(self) -> Value:
        if self.value is None:
            raise EmptyChannelError()
        return self.value
    
    def update(self, values: Sequence[Value]) -> bool:
        if values and self.value is None:
            self.value = values[0]  # First wins
            return True
        return False

Use Case: Optional fields, fallback values.


3. Topic Channel

File: channels/topic.py

Behavior: Pub/sub. Nodes can publish to topics, subscribers receive all messages.

class Topic(Generic[Value]):
    """Pub/sub channel for broadcasting."""
    
    def __init__(self, typ: type[Value], selector: Callable = None):
        self.typ = typ
        self.selector = selector or (lambda x: x)
        self.subscriptions: dict[str, set] = defaultdict(set)
        
    def get(self) -> list[Value]:
        # Return all values for subscribed topic
        ...
    
    def update(self, values: Sequence[Value]) -> bool:
        # Add values to topic
        ...

Use Case: Broadcasting to multiple nodes, event systems.


4. NamedBarrier Channel

File: channels/named_barrier_value.py

Behavior: Blocks until all named tasks complete. Used for synchronization.

class NamedBarrier:
    """Synchronization point - blocks until all expected tasks arrive."""
    
    def get(self) -> None:
        # Block until all tasks arrive
        ...
    
    def update(self, values: Sequence[str]) -> bool:
        # Register task completion
        ...

Use Case: Wait for parallel branches to complete.


5. BinOp Channel

File: channels/binop.py

Behavior: Applies binary operation to combine values.

class BinOp(Generic[Value]):
    """Binary operation channel."""
    
    def __init__(self, typ: type[Value], op: Callable[[Value, Value], Value]):
        self.op = op
        self.value = None
        
    def get(self) -> Value:
        return self.value
    
    def update(self, values: Sequence[Value]) -> bool:
        for v in values:
            if self.value is None:
                self.value = v
            else:
                self.value = self.op(self.value, v)
        return True

Use Case: Aggregations (sum, max, min, union).


6. EphemeralValue Channel

File: channels/ephemeral_value.py

Behavior: One-time use. Value is consumed after reading.

class EphemeralValue:
    """One-time use value - consumed after read."""
    
    def get(self) -> Value:
        value = self.value
        self.value = None  # Consume
        return value

Use Case: One-time signals, commands.


7. UntrackedValue Channel

File: channels/untracked_value.py

Behavior: Value is not checkpointed. Used for transient data.

class UntrackedValue:
    """Value that doesn't participate in checkpointing."""
    pass

Use Case: Temporary data, debugging info.


Channel Configuration

Declaring Channels

from langgraph.graph import StateGraph
from typing import TypedDict

class GraphState(TypedDict):
    messages: list
    counter: int

graph = StateGraph(GraphState)

# Default channels:
# - list fields -> LastValue[list] 
# - other fields -> LastValue[type]

Custom Channels

from langgraph.channels import BaseChannel

class Accumulate(BaseChannel):
    def __init__(self, typ: type):
        self.typ = typ
        self.values = []
    
    def get(self) -> list:
        return self.values
    
    def update(self, values) -> bool:
        self.values.extend(values)
        return True

Channel vs State

Concept Description
State TypedDict defining all fields
Channel Storage mechanism per field
Reducer How updates are merged

Checkpointing Channels

What Gets Persisted

  • All channel values are checkpointed
  • Except UntrackedValue channels
  • Checkpoint includes channel_values and channel_versions

Checkpoint Format

checkpoint = {
    "channel_values": {
        "messages": [...],
        "counter": 5,
    },
    "channel_versions": {
        "messages": 3,
        "counter": 5,
    },
    "metadata": {...}
}

Generated from source code analysis