Breakpoints và Human-in-the-loop trong LangGraph
Breakpoint là gì?
Một breakpoint trong LangGraph cho phép bạn tạm dừng workflow tại một node (bước), từ đó bạn có thể:
-
Kiểm tra input/output của node đó
-
Thay đổi trạng thái trước khi tiếp tục
-
Dùng trong quá trình debug, testing, hoặc human-in-the-loop
Cách hoạt động
Khi một node có breakpoint, LangGraph sẽ trả về trạng thái tạm thời (trạng thái dừng), và bạn có thể tiếp tục thực thi bằng cách gọi .invoke()
hoặc .stream()
lại với trạng thái đã cập nhật.
Ví dụ đơn giản: Breakpoint ở một bước LLM
1. Khởi tạo LangGraph
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda
# Bước 1: Xử lý văn bản
def process_text(state):
text = state["input"]
return {"processed": text.upper()}
# Bước 2: Tổng kết
def summarize(state):
return {"summary": f"Summary: {state['processed']}"}
graph = StateGraph()
# Thêm node và breakpoints
graph.add_node("process", RunnableLambda(process_text))
graph.add_node("summarize", RunnableLambda(summarize))
graph.set_entry_point("process")
graph.add_edge("process", "summarize")
graph.add_edge("summarize", END)
# Gắn breakpoint vào bước "summarize"
graph.add_conditional_edges("summarize", lambda x: "__break__")
app = graph.compile()
2. Gọi invoke()
và dừng tại breakpoint
state = app.invoke({"input": "Hello LangGraph!"})
print(state)
Output sẽ không kết thúc workflow, mà dừng tại node "summarize"
vì có breakpoint.
3. Kiểm tra hoặc chỉnh sửa rồi tiếp tục
# Bạn có thể chỉnh sửa state nếu muốn
state["processed"] = "HELLO LANGGRAPH! (edited)"
# Tiếp tục từ breakpoint
final = app.invoke(state)
print(final)
Khi nào nên dùng breakpoints?
Tình huống | Có nên dùng không? |
---|---|
Debug từng bước của graph | Có |
Thêm bước xác nhận từ người dùng | Có |
Gửi yêu cầu API cần kiểm tra trước | Có |
Tự động hóa hoàn toàn không kiểm tra | ❌ Không cần |
Tác giả: Đỗ Ngọc Tú
Công Ty Phần Mềm VHTSoft
Không có bình luận