Cách Tùy Chỉnh Cập Nhật Trạng Thái bằng Reducers
Trong LangGraph, Reducers là những hàm đặc biệt được sử dụng để tổng hợp dữ liệu hoặc kết hợp trạng thái khi có nhiều luồng song song (parallel branches) trả về kết quả khác nhau.
Nói cách khác:
Khi bạn chạy nhiều node cùng lúc (parallel), và muốn gom kết quả từ chúng lại để đưa vào bước tiếp theo – thì bạn cần một Reducer.
Bạn cần dùng Reducer khi:
-
Bạn có các nhánh song song trong graph.
-
Mỗi nhánh thực hiện công việc riêng và trả về một phần kết quả.
-
Bạn cần gom các kết quả đó về một trạng thái chung trước khi đi tiếp.
Mục đích | Chi tiết |
---|---|
Kết hợp dữ liệu | Khi có nhiều nhánh chạy song song |
Nhận đầu vào | Một danh sách các trạng thái (dict) |
Trả đầu ra | Một trạng thái duy nhất đã được tổng hợp |
Ứng dụng | Tổng hợp kết quả LLM, kết hợp thông tin từ nhiều nguồn |
Giả sử bạn có 3 nhánh song song:
-
Node A xử lý phần tên.
-
Node B xử lý địa chỉ.
-
Node C xử lý số điện thoại.
Sau khi 3 node này chạy xong, bạn cần gộp kết quả lại thành:
{
"name": "VHTSoft",
"address": "TP. Hồ Chí Minh",
"phone": "0123456789"
}
Reducer sẽ gom và hợp nhất những giá trị này thành một trạng thái chung.
Định nghĩa Reducer
Reducer là một hàm nhận vào nhiều trạng thái (dưới dạng danh sách) và trả về một trạng thái duy nhất.
def my_reducer(states: list[dict]) -> dict:
result = {}
for state in states:
result.update(state) # Gộp tất cả dict lại
return result
Cách sử dụng trong LangGraph
Khi bạn xây dựng graph với StateGraph
, bạn có thể chỉ định reducer cho một node nhất định như sau:
from langgraph.graph import StateGraph
builder = StateGraph(dict)
# Các node song song được thêm ở đây...
# Thêm reducer cho node tổng hợp
builder.add_reducer("merge_node", my_reducer)
Ví dụ thực tế
1. Khai báo các node
def node_1(state): return {"name": "VHTSoft"}
def node_2(state): return {"address": "TP.Hồ Chí Minh"}
def node_3(state): return {"phone": "0123456789"}
2. Tạo graph
builder = StateGraph(dict)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
builder.add_node("merge_node", lambda x: x) # Dummy node
# Thêm reducer
def my_reducer(states): # states là list các dict
result = {}
for s in states:
result.update(s)
return result
builder.add_reducer("merge_node", my_reducer)
# Chạy 3 node song song → gộp về merge_node
builder.add_edge("node_1", "merge_node")
builder.add_edge("node_2", "merge_node")
builder.add_edge("node_3", "merge_node")
builder.set_entry_point("node_1") # node_1 là điểm vào chính
graph = builder.compile()
Các hàm hữu ích về Reducers trong LangGraph
1. Custom Logic trong Reducer
Bạn không bị giới hạn bởi việc chỉ dùng dict.update()
. Bạn có thể áp dụng bất kỳ logic tùy chỉnh nào để gộp dữ liệu:
Ví dụ: Lọc trạng thái hợp lệ
def filtered_reducer(states: list[dict]) -> dict:
final_state = {}
for state in states:
# Bỏ qua những trạng thái thiếu dữ liệu cần thiết
if "score" in state and state["score"] >= 0.8:
final_state.update(state)
return final_state
Dùng khi:
-
Bạn chỉ muốn lấy kết quả “đủ tốt” từ các nhánh.
-
Bạn có các nhánh xử lý có thể bị lỗi, hoặc chất lượng không đồng đều.
Trong ví dụ này, ta sẽ có 3 nhánh song song trả về thông tin khác nhau cùng với một chỉ số độ tin cậy (score
). Sau đó, ta sẽ dùng một reducer
để chỉ giữ lại những kết quả có score >= 0.8
.
Giả sử bạn có một graph với 3 agent xử lý một nhiệm vụ phân tích cảm xúc từ một đoạn văn. Mỗi agent trả về:
-
sentiment
: happy/sad/neutral -
score
: độ tin cậy (confidence score)
Bạn chỉ muốn giữ lại kết quả của các agent có độ tin cậy cao (score ≥ 0.8).
Bước 1: Tạo các node trả về kết quả
from langgraph.graph import StateGraph, END
from typing import TypedDict
class State(TypedDict):
sentiment: str
score: float
# Mỗi node trả về một kết quả khác nhau
def analyzer_1(state):
return {"sentiment": "happy", "score": 0.9}
def analyzer_2(state):
return {"sentiment": "sad", "score": 0.7} # Không đạt
def analyzer_3(state):
return {"sentiment": "neutral", "score": 0.85}
Bước 2: Tạo filtered_reducer
def filtered_reducer(states: list[dict]) -> dict:
for state in states:
if state.get("score", 0) >= 0.8:
return state # Trả về state đầu tiên đạt yêu cầu
return {"sentiment": "unknown", "score": 0.0}
Bước 3: Dựng graph và chạy thử
graph = StateGraph(State)
graph.add_node("analyzer_1", analyzer_1)
graph.add_node("analyzer_2", analyzer_2)
graph.add_node("analyzer_3", analyzer_3)
graph.add_edge("analyzer_1", END)
graph.add_edge("analyzer_2", END)
graph.add_edge("analyzer_3", END)
graph.set_entry_point(["analyzer_1", "analyzer_2", "analyzer_3"])
graph.set_finish_point(END, filtered_reducer)
app = graph.compile()
result = app.invoke({})
print(result)
Kết quả mong đợi:
{'sentiment': 'happy', 'score': 0.9}
Vì analyzer_1 là node đầu tiên đạt yêu cầu score >= 0.8
, nên nó được giữ lại. Các kết quả khác bị bỏ qua.
Biến thể nâng cao: Trả về danh sách tất cả các kết quả tốt
def filtered_reducer(states: list[dict]) -> dict:
good_results = [s for s in states if s.get("score", 0) >= 0.8]
return {"results": good_results}
Kết quả:
{
'results': [
{'sentiment': 'happy', 'score': 0.9},
{'sentiment': 'neutral', 'score': 0.85}
]
}
2. Merging trạng thái phức tạp (nested)
Giả sử mỗi nhánh trả về trạng thái dạng lồng nhau (nested):
{
"agent": {
"name": "Alice",
"tasks": ["translate"]
}
}
Bạn có thể merge có điều kiện, thậm chí hợp nhất danh sách:
def deep_merge_reducer(states: list[dict]) -> dict:
merged = {"agent": {"name": "", "tasks": []}}
for state in states:
agent = state.get("agent", {})
if "name" in agent:
merged["agent"]["name"] = agent["name"]
if "tasks" in agent:
merged["agent"]["tasks"] += agent["tasks"]
return merged
Giả sử bạn có một hệ thống thu thập thông tin sản phẩm từ nhiều nguồn. Mỗi node (agent) sẽ trả về thông tin chi tiết khác nhau về sản phẩm, như:
-
name
-
price
-
reviews
Mỗi thông tin nằm trong một cấu trúc nested dictionary:
{
"product": {
"name": ...,
"price": ...,
"reviews": [...],
}
}
Chúng ta sẽ merge lại tất cả thành một state
hoàn chỉnh.
1. Định nghĩa state phức tạp
from typing import TypedDict, Optional
from langgraph.graph import StateGraph, END
class ProductInfo(TypedDict, total=False):
name: Optional[str]
price: Optional[float]
reviews: list[str]
class State(TypedDict, total=False):
product: ProductInfo
2. Các node thu thập thông tin khác nhau
def fetch_name(state):
return {
"product": {
"name": "Smartphone X"
}
}
def fetch_price(state):
return {
"product": {
"price": 799.99
}
}
def fetch_reviews(state):
return {
"product": {
"reviews": ["Good value", "Excellent battery", "Fast delivery"]
}
}
3. Merge state phức tạp bằng custom reducer
LangGraph sẽ merge các dicts theo mặc định, nhưng với dict lồng nhau, bạn cần viết custom reducer
.
def nested_merge_reducer(states: list[dict]) -> dict:
merged = {"product": {}}
for state in states:
product = state.get("product", {})
for key, value in product.items():
if key == "reviews":
merged["product"].setdefault("reviews", []).extend(value)
else:
merged["product"][key] = value
return merged
4. Tạo Graph
graph = StateGraph(State)
graph.add_node("name", fetch_name)
graph.add_node("price", fetch_price)
graph.add_node("reviews", fetch_reviews)
graph.add_edge("name", END)
graph.add_edge("price", END)
graph.add_edge("reviews", END)
graph.set_entry_point(["name", "price", "reviews"])
graph.set_finish_point(END, nested_merge_reducer)
app = graph.compile()
result = app.invoke({})
print(result)
Kết quả mong đợi:
{
"product": {
"name": "Smartphone X",
"price": 799.99,
"reviews": [
"Good value",
"Excellent battery",
"Fast delivery"
]
}
}
Ghi chú:
-
Nếu bạn không viết
reducer
, các dict lồng nhau sẽ bị ghi đè. -
nested_merge_reducer
giúp kết hợp thông minh các phần tử trongproduct
.
END
graph = StateGraph(State) graph.add_node("agent_1", agent_1) graph.add_node("agent_2", agent_2) graph.add_node("agent_3", agent_3) graph.add_edge("agent_1", END) graph.add_edge("agent_2", END) graph.add_edge("agent_3", END) graph.set_entry_point("agent_1") graph.set_finish_point(END, reducer=weighted_merge) app = graph.compile() result = app.invoke({}) print(result)Kết quả:
{'final_answer': 'Trả lời từ agent 3'}