Nhảy đến nội dung chính

Breakpoints và Human-in-the-loop trong LangGraph

Breakpoint là gì?

Một breakpoint trong LangGraph cho phép bạn tạm dừng workflow tại một node (bước), từ đó bạn có thể:

  • Kiểm tra input/output của node đó

  • Thay đổi trạng thái trước khi tiếp tục

  • Dùng trong quá trình debug, testing, hoặc human-in-the-loop

Cách hoạt động

Khi một node có breakpoint, LangGraph sẽ trả về trạng thái tạm thời (trạng thái dừng), và bạn có thể tiếp tục thực thi bằng cách gọi .invoke() hoặc .stream() lại với trạng thái đã cập nhật.

Ví dụ đơn giản: Breakpoint ở một bước LLM

1. Khởi tạo LangGraph
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda

# Bước 1: Xử lý văn bản
def process_text(state):
    text = state["input"]
    return {"processed": text.upper()}

# Bước 2: Tổng kết
def summarize(state):
    return {"summary": f"Summary: {state['processed']}"}

graph = StateGraph()

# Thêm node và breakpoints
graph.add_node("process", RunnableLambda(process_text))
graph.add_node("summarize", RunnableLambda(summarize))

graph.set_entry_point("process")
graph.add_edge("process", "summarize")
graph.add_edge("summarize", END)

# Gắn breakpoint vào bước "summarize"
graph.add_conditional_edges("summarize", lambda x: "__break__")

app = graph.compile()
2. Gọi invoke() và dừng tại breakpoint
state = app.invoke({"input": "Hello LangGraph!"})
print(state)

Output sẽ không kết thúc workflow, mà dừng tại node "summarize" vì có breakpoint.

3. Kiểm tra hoặc chỉnh sửa rồi tiếp tục
# Bạn có thể chỉnh sửa state nếu muốn
state["processed"] = "HELLO LANGGRAPH! (edited)"

# Tiếp tục từ breakpoint
final = app.invoke(state)
print(final)

Khi nào nên dùng breakpoints?

Tình huống Có nên dùng không?
Debug từng bước của graph
Thêm bước xác nhận từ người dùng
Gửi yêu cầu API cần kiểm tra trước
Tự động hóa hoàn toàn không kiểm tra ❌ Không cần

Tác giả: Đỗ Ngọc Tú
Công Ty Phần Mềm VHTSoft