Last active
October 15, 2025 11:22
-
-
Save snopoke/4d32289a3750468614a1fb54b53a0f46 to your computer and use it in GitHub Desktop.
POC to illustrate remote tool calling in Langgraph using interrupts.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """POC to illustrate remote tool calling in Langgraph using interrupts. | |
| Based on the example at https://langchain-ai.github.io/langgraph/how-tos/human_in_the_loop/add-human-in-the-loop/#add-interrupts-to-any-tool | |
| export ANTHROPIC_API_KEY="..." | |
| export PG_CONNECTON_URL="postgres://postgres:postgres@localhost:5432/langgraph" | |
| python remote_tool.py "Book me a room at the Sheraton Hotel" | |
| python remote_tool.py --thread-id xxx --tool-response 'Hotel booking confirmed' | |
| """ | |
| import argparse | |
| import os | |
| import uuid | |
| from langchain_core.messages import AnyMessage | |
| from langchain_core.runnables import RunnableConfig | |
| from langchain_core.tools import BaseTool | |
| from langchain_core.tools import tool as create_tool | |
| from langgraph.checkpoint.postgres import PostgresSaver | |
| from langgraph.constants import START | |
| from langgraph.graph import MessagesState, StateGraph | |
| from langgraph.prebuilt import create_react_agent | |
| from langgraph.prebuilt.chat_agent_executor import AgentState | |
| from langgraph.prebuilt.interrupt import HumanInterrupt, HumanInterruptConfig | |
| from langgraph.types import Command, interrupt | |
| os.environ["ANTHROPIC_API_KEY"] = "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" | |
| def create_external_tool(tool_schema) -> BaseTool: | |
| tool_name = tool_schema["title"] | |
| @create_tool(tool_name, description=tool_schema["description"], args_schema=tool_schema) | |
| def call_tool_with_interrupt(config: RunnableConfig, **tool_input): | |
| request: HumanInterrupt = { | |
| "action_request": {"action": tool_name, "args": tool_input}, | |
| "config": HumanInterruptConfig(allow_respond=True), | |
| "description": "Please provide the tool response", | |
| } | |
| response = interrupt([request])[0] | |
| if response["type"] == "response": | |
| user_feedback = response["args"] | |
| tool_response = user_feedback | |
| else: | |
| raise ValueError(f"Unsupported interrupt response type: {response['type']}") | |
| return tool_response | |
| return call_tool_with_interrupt | |
| def run_agent(inputs, thread_id, verbose=False): | |
| connection_string = os.environ.get("PG_CONNECTON_URL", "postgres://postgres:postgres@localhost:5432/ocs_dimagi") | |
| pg_saver = PostgresSaver.from_conn_string(connection_string) | |
| tool_schema = { | |
| "description": "Book a hotel", | |
| "properties": {"hotel_name": {"title": "Hotel Name", "type": "string"}}, | |
| "required": ["hotel_name"], | |
| "title": "book_hotel", | |
| "type": "object", | |
| } | |
| with pg_saver as checkpointer: | |
| checkpointer.setup() | |
| def prompt(state: AgentState, config: RunnableConfig) -> list[AnyMessage]: | |
| print("building prompt for agent") | |
| user_name = config["configurable"].get("user_name") | |
| system_msg = f"You are a helpful assistant. Address the user as {user_name}." | |
| return [{"role": "system", "content": system_msg}] + state["messages"] | |
| agent = create_react_agent( | |
| model="anthropic:claude-3-5-sonnet-latest", | |
| tools=[ | |
| create_external_tool(tool_schema), | |
| ], | |
| prompt=prompt, | |
| ) | |
| def parent_node(state, config: RunnableConfig): | |
| print("Entered `parent_node`") | |
| subgraph_state = agent.invoke({"messages": state["messages"]}, config=config) | |
| return {"messages": [subgraph_state["messages"][-1]]} | |
| builder = StateGraph(MessagesState) | |
| builder.add_node("parent_node", parent_node) | |
| builder.add_edge(START, "parent_node") | |
| graph = builder.compile(checkpointer=checkpointer) | |
| config = {"configurable": {"thread_id": thread_id}} | |
| # Run the agent | |
| for chunk in graph.stream(inputs, config, subgraphs=True): | |
| if verbose: | |
| print(chunk) | |
| continue | |
| parent, chunk = chunk | |
| if item := chunk.get("agent"): | |
| content = item["messages"][0].content | |
| if isinstance(content, list): | |
| for piece in content: | |
| if piece["type"] == "text": | |
| print(piece["text"]) | |
| elif piece["type"] == "tool_use": | |
| print(f"tool call requested: {piece['name']}({piece['input']})") | |
| else: | |
| print(content) | |
| elif item := chunk.get("tools"): | |
| print("Tool Response: ", item["messages"][0].content) | |
| def main(): | |
| """Main CLI entry point | |
| Example usage: | |
| python remote_tool.py "Book my a room at the Sheraton Hotel" | |
| python remote_tool.py --thread-id 719b82c8-d430-4e7b-b7ce-a8c5a6f67d8f --tool-response 'Hotel booking confirmed' | |
| """ | |
| parser = argparse.ArgumentParser(description="LangGraph External Tool POC") | |
| parser.add_argument("query", nargs="?", help="Single query to process") | |
| parser.add_argument("--thread-id", "-t", help="Thread ID") | |
| parser.add_argument("--tool-response", "-r", help="Tool response") | |
| parser.add_argument("--verbose", "-v", action="store_true") | |
| args = parser.parse_args() | |
| thread_id = str(uuid.uuid4()) | |
| if args.thread_id: | |
| thread_id = args.thread_id | |
| if args.tool_response: | |
| run_agent(Command(resume=[{"type": "response", "args": args.tool_response}]), thread_id, args.verbose) | |
| else: | |
| if not args.query: | |
| parser.print_help() | |
| else: | |
| run_agent({"messages": [{"role": "user", "content": args.query}]}, thread_id, args.verbose) | |
| print("\nExecution has been interrupted.\nResume as follows:") | |
| print(f"\npython remote_tool.py --thread-id {thread_id} --tool-response 'hotel booking confirmed'") | |
| if __name__ == "__main__": | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import uuid | |
| from typing import TypedDict | |
| from langchain_core.messages import AnyMessage | |
| from langchain_core.runnables import RunnableConfig | |
| from langgraph.checkpoint.memory import InMemorySaver | |
| from langgraph.constants import START | |
| from langgraph.graph import StateGraph | |
| from langgraph.prebuilt import create_react_agent | |
| from langgraph.prebuilt.chat_agent_executor import AgentState | |
| from langgraph.types import Command, interrupt | |
| class State(TypedDict): | |
| """The graph state.""" | |
| state_counter: int | |
| counter_node_in_subgraph = 0 | |
| def node_in_subgraph(state: State): | |
| """A node in the sub-graph.""" | |
| global counter_node_in_subgraph | |
| counter_node_in_subgraph += 1 # This code will **NOT** run again! | |
| print(f"Entered `node_in_subgraph` a total of {counter_node_in_subgraph} times") | |
| counter_human_node = 0 | |
| def human_node(state: State): | |
| global counter_human_node | |
| counter_human_node += 1 # This code will run again! | |
| print(f"Entered human_node in sub-graph a total of {counter_human_node} times") | |
| answer = interrupt("what is your name?") | |
| print(f"Got an answer of {answer}") | |
| checkpointer = InMemorySaver() | |
| def prompt(state: AgentState, config: RunnableConfig) -> list[AnyMessage]: | |
| print("building prompt for agent") | |
| user_name = config["configurable"].get("user_name") | |
| system_msg = f"You are a helpful assistant. Address the user as {user_name}." | |
| return [{"role": "system", "content": system_msg}] + state["messages"] | |
| counter_parent_node = 0 | |
| def parent_node(state: State): | |
| """This parent node will invoke the subgraph.""" | |
| global counter_parent_node | |
| counter_parent_node += 1 # This code will run again on resuming! | |
| print(f"Entered `parent_node` a total of {counter_parent_node} times") | |
| subgraph_state = subgraph.invoke({"messages": [{"role": "user", "content": "hi"}]}) | |
| return subgraph_state | |
| builder = StateGraph(State) | |
| builder.add_node("parent_node", parent_node) | |
| builder.add_edge(START, "parent_node") | |
| # A checkpointer must be enabled for interrupts to work! | |
| checkpointer = InMemorySaver() | |
| os.environ["ANTHROPIC_API_KEY"] = "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" | |
| subgraph = create_react_agent( | |
| model="anthropic:claude-3-5-sonnet-latest", | |
| prompt=prompt, | |
| tools=[], | |
| checkpointer=checkpointer | |
| ) | |
| graph = builder.compile(checkpointer=checkpointer) | |
| config = { | |
| "configurable": { | |
| "thread_id": uuid.uuid4(), | |
| } | |
| } | |
| for chunk in graph.stream({"state_counter": 1}, config): | |
| print(chunk) | |
| print("--- Resuming ---") | |
| for chunk in graph.stream(Command(resume="35"), config): | |
| print(chunk) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment