5.7 KiB
LangGraph Channels
Version: 1.0.0
Last Updated: 2026-02-23
Overview
Channels are LangGraph's mechanism for inter-node communication and state storage. Each channel is a typed container with specific semantics for how values are written and read.
Channel Types
From channels/__init__.py
__all__ = [
"BaseChannel",
"LastValue",
"AnyValue",
"Topic",
"NamedBarrier",
"BinOp",
"EphemeralValue",
"UntrackedValue",
]
Channel Implementation Details
1. LastValue Channel
File: channels/last_value.py
Behavior: Most recent write wins. Reading returns the last value written.
class LastValue(Generic[Value]):
"""Channel that keeps the last value written."""
def __init__(self, typ: type[Value]):
self.typ = typ
self.value = None
def get(self) -> Value:
if self.value is None:
raise EmptyChannelError()
return self.value
def update(self, values: Sequence[Value]) -> bool:
if values:
self.value = values[-1] # Last wins
return True
return False
Use Case: Single-value state fields, like counter, status, current_step.
2. AnyValue Channel
File: channels/any_value.py
Behavior: First non-empty value wins. Reading returns the first value that was written and is still available.
class AnyValue(Generic[Value]):
"""Channel that returns the first available value."""
def get(self) -> Value:
if self.value is None:
raise EmptyChannelError()
return self.value
def update(self, values: Sequence[Value]) -> bool:
if values and self.value is None:
self.value = values[0] # First wins
return True
return False
Use Case: Optional fields, fallback values.
3. Topic Channel
File: channels/topic.py
Behavior: Pub/sub. Nodes can publish to topics, subscribers receive all messages.
class Topic(Generic[Value]):
"""Pub/sub channel for broadcasting."""
def __init__(self, typ: type[Value], selector: Callable = None):
self.typ = typ
self.selector = selector or (lambda x: x)
self.subscriptions: dict[str, set] = defaultdict(set)
def get(self) -> list[Value]:
# Return all values for subscribed topic
...
def update(self, values: Sequence[Value]) -> bool:
# Add values to topic
...
Use Case: Broadcasting to multiple nodes, event systems.
4. NamedBarrier Channel
File: channels/named_barrier_value.py
Behavior: Blocks until all named tasks complete. Used for synchronization.
class NamedBarrier:
"""Synchronization point - blocks until all expected tasks arrive."""
def get(self) -> None:
# Block until all tasks arrive
...
def update(self, values: Sequence[str]) -> bool:
# Register task completion
...
Use Case: Wait for parallel branches to complete.
5. BinOp Channel
File: channels/binop.py
Behavior: Applies binary operation to combine values.
class BinOp(Generic[Value]):
"""Binary operation channel."""
def __init__(self, typ: type[Value], op: Callable[[Value, Value], Value]):
self.op = op
self.value = None
def get(self) -> Value:
return self.value
def update(self, values: Sequence[Value]) -> bool:
for v in values:
if self.value is None:
self.value = v
else:
self.value = self.op(self.value, v)
return True
Use Case: Aggregations (sum, max, min, union).
6. EphemeralValue Channel
File: channels/ephemeral_value.py
Behavior: One-time use. Value is consumed after reading.
class EphemeralValue:
"""One-time use value - consumed after read."""
def get(self) -> Value:
value = self.value
self.value = None # Consume
return value
Use Case: One-time signals, commands.
7. UntrackedValue Channel
File: channels/untracked_value.py
Behavior: Value is not checkpointed. Used for transient data.
class UntrackedValue:
"""Value that doesn't participate in checkpointing."""
pass
Use Case: Temporary data, debugging info.
Channel Configuration
Declaring Channels
from langgraph.graph import StateGraph
from typing import TypedDict
class GraphState(TypedDict):
messages: list
counter: int
graph = StateGraph(GraphState)
# Default channels:
# - list fields -> LastValue[list]
# - other fields -> LastValue[type]
Custom Channels
from langgraph.channels import BaseChannel
class Accumulate(BaseChannel):
def __init__(self, typ: type):
self.typ = typ
self.values = []
def get(self) -> list:
return self.values
def update(self, values) -> bool:
self.values.extend(values)
return True
Channel vs State
| Concept | Description |
|---|---|
| State | TypedDict defining all fields |
| Channel | Storage mechanism per field |
| Reducer | How updates are merged |
Checkpointing Channels
What Gets Persisted
- All channel values are checkpointed
- Except
UntrackedValuechannels - Checkpoint includes
channel_valuesandchannel_versions
Checkpoint Format
checkpoint = {
"channel_values": {
"messages": [...],
"counter": 5,
},
"channel_versions": {
"messages": 3,
"counter": 5,
},
"metadata": {...}
}
Generated from source code analysis