From 79d65e55a1886a5353123342d30e97a91216a395 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Thu, 11 Sep 2025 13:06:44 -0400 Subject: [PATCH] chore: add type annotations and docstrings to langgraph adapters (#3503) --- .../langgraph/langgraph_adapter.py | 246 ++++++++++++------ .../langgraph/langgraph_tool_adapter.py | 77 ++++-- .../agent_adapters/langgraph/protocols.py | 55 ++++ .../langgraph/structured_output_converter.py | 84 ++++-- 4 files changed, 347 insertions(+), 115 deletions(-) create mode 100644 src/crewai/agents/agent_adapters/langgraph/protocols.py diff --git a/src/crewai/agents/agent_adapters/langgraph/langgraph_adapter.py b/src/crewai/agents/agent_adapters/langgraph/langgraph_adapter.py index 4397ac927..504e1ad07 100644 --- a/src/crewai/agents/agent_adapters/langgraph/langgraph_adapter.py +++ b/src/crewai/agents/agent_adapters/langgraph/langgraph_adapter.py @@ -1,47 +1,56 @@ -from typing import Any, Dict, List, Optional +"""LangGraph agent adapter for CrewAI integration. -from pydantic import Field, PrivateAttr +This module contains the LangGraphAgentAdapter class that integrates LangGraph ReAct agents +with CrewAI's agent system. Provides memory persistence, tool integration, and structured +output functionality. +""" + +from collections.abc import Callable +from typing import Any, cast + +from pydantic import ConfigDict, Field, PrivateAttr from crewai.agents.agent_adapters.base_agent_adapter import BaseAgentAdapter from crewai.agents.agent_adapters.langgraph.langgraph_tool_adapter import ( LangGraphToolAdapter, ) +from crewai.agents.agent_adapters.langgraph.protocols import ( + LangGraphCheckPointMemoryModule, + LangGraphPrebuiltModule, +) from crewai.agents.agent_adapters.langgraph.structured_output_converter import ( LangGraphConverterAdapter, ) from crewai.agents.agent_builder.base_agent import BaseAgent -from crewai.tools.agent_tools.agent_tools import AgentTools -from crewai.tools.base_tool import BaseTool -from crewai.utilities import Logger -from crewai.utilities.converter import Converter from crewai.events.event_bus import crewai_event_bus from crewai.events.types.agent_events import ( AgentExecutionCompletedEvent, AgentExecutionErrorEvent, AgentExecutionStartedEvent, ) - -try: - from langgraph.checkpoint.memory import MemorySaver - from langgraph.prebuilt import create_react_agent - - LANGGRAPH_AVAILABLE = True -except ImportError: - LANGGRAPH_AVAILABLE = False +from crewai.tools.agent_tools.agent_tools import AgentTools +from crewai.tools.base_tool import BaseTool +from crewai.utilities import Logger +from crewai.utilities.converter import Converter +from crewai.utilities.import_utils import require class LangGraphAgentAdapter(BaseAgentAdapter): - """Adapter for LangGraph agents to work with CrewAI.""" + """Adapter for LangGraph agents to work with CrewAI. - model_config = {"arbitrary_types_allowed": True} + This adapter integrates LangGraph's ReAct agents with CrewAI's agent system, + providing memory persistence, tool integration, and structured output support. + """ - _logger: Logger = PrivateAttr(default_factory=lambda: Logger()) + model_config = ConfigDict(arbitrary_types_allowed=True) + + _logger: Logger = PrivateAttr(default_factory=Logger) _tool_adapter: LangGraphToolAdapter = PrivateAttr() _graph: Any = PrivateAttr(default=None) _memory: Any = PrivateAttr(default=None) _max_iterations: int = PrivateAttr(default=10) function_calling_llm: Any = Field(default=None) - step_callback: Any = Field(default=None) + step_callback: Callable[..., Any] | None = Field(default=None) model: str = Field(default="gpt-4o") verbose: bool = Field(default=False) @@ -51,17 +60,24 @@ class LangGraphAgentAdapter(BaseAgentAdapter): role: str, goal: str, backstory: str, - tools: Optional[List[BaseTool]] = None, + tools: list[BaseTool] | None = None, llm: Any = None, max_iterations: int = 10, - agent_config: Optional[Dict[str, Any]] = None, + agent_config: dict[str, Any] | None = None, **kwargs, - ): - """Initialize the LangGraph agent adapter.""" - if not LANGGRAPH_AVAILABLE: - raise ImportError( - "LangGraph Agent Dependencies are not installed. Please install it using `uv add langchain-core langgraph`" - ) + ) -> None: + """Initialize the LangGraph agent adapter. + + Args: + role: The role description for the agent. + goal: The primary goal the agent should achieve. + backstory: Background information about the agent. + tools: Optional list of tools available to the agent. + llm: Language model to use, defaults to gpt-4o. + max_iterations: Maximum number of iterations for task execution. + agent_config: Additional configuration for the LangGraph agent. + **kwargs: Additional arguments passed to the base adapter. + """ super().__init__( role=role, goal=goal, @@ -72,46 +88,65 @@ class LangGraphAgentAdapter(BaseAgentAdapter): **kwargs, ) self._tool_adapter = LangGraphToolAdapter(tools=tools) - self._converter_adapter = LangGraphConverterAdapter(self) + self._converter_adapter: LangGraphConverterAdapter = LangGraphConverterAdapter( + self + ) self._max_iterations = max_iterations self._setup_graph() def _setup_graph(self) -> None: - """Set up the LangGraph workflow graph.""" - try: - self._memory = MemorySaver() + """Set up the LangGraph workflow graph. - converted_tools: List[Any] = self._tool_adapter.tools() - if self._agent_config: - self._graph = create_react_agent( - model=self.llm, - tools=converted_tools, - checkpointer=self._memory, - debug=self.verbose, - **self._agent_config, - ) - else: - self._graph = create_react_agent( - model=self.llm, - tools=converted_tools or [], - checkpointer=self._memory, - debug=self.verbose, - ) + Initializes the memory saver and creates a ReAct agent with the configured + tools, memory checkpointer, and debug settings. + """ - except ImportError as e: - self._logger.log( - "error", f"Failed to import LangGraph dependencies: {str(e)}" + memory_saver: type[Any] = cast( + LangGraphCheckPointMemoryModule, + require( + "langgraph.checkpoint.memory", + purpose="LangGraph core functionality", + ), + ).MemorySaver + create_react_agent: Callable[..., Any] = cast( + LangGraphPrebuiltModule, + require( + "langgraph.prebuilt", + purpose="LangGraph core functionality", + ), + ).create_react_agent + + self._memory = memory_saver() + + converted_tools: list[Any] = self._tool_adapter.tools() + if self._agent_config: + self._graph = create_react_agent( + model=self.llm, + tools=converted_tools, + checkpointer=self._memory, + debug=self.verbose, + **self._agent_config, + ) + else: + self._graph = create_react_agent( + model=self.llm, + tools=converted_tools or [], + checkpointer=self._memory, + debug=self.verbose, ) - raise - except Exception as e: - self._logger.log("error", f"Error setting up LangGraph agent: {str(e)}") - raise def _build_system_prompt(self) -> str: - """Build a system prompt for the LangGraph agent.""" + """Build a system prompt for the LangGraph agent. + + Creates a prompt that includes the agent's role, goal, and backstory, + then enhances it through the converter adapter for structured output. + + Returns: + The complete system prompt string. + """ base_prompt = f""" You are {self.role}. - + Your goal is: {self.goal} Your backstory: {self.backstory} @@ -123,10 +158,25 @@ class LangGraphAgentAdapter(BaseAgentAdapter): def execute_task( self, task: Any, - context: Optional[str] = None, - tools: Optional[List[BaseTool]] = None, + context: str | None = None, + tools: list[BaseTool] | None = None, ) -> str: - """Execute a task using the LangGraph workflow.""" + """Execute a task using the LangGraph workflow. + + Configures the agent, processes the task through the LangGraph workflow, + and handles event emission for execution tracking. + + Args: + task: The task object to execute. + context: Optional context information for the task. + tools: Optional additional tools for this specific execution. + + Returns: + The final answer from the task execution. + + Raises: + Exception: If task execution fails. + """ self.create_agent_executor(tools) self.configure_structured_output(task) @@ -151,9 +201,11 @@ class LangGraphAgentAdapter(BaseAgentAdapter): session_id = f"task_{id(task)}" - config = {"configurable": {"thread_id": session_id}} + config: dict[str, dict[str, str]] = { + "configurable": {"thread_id": session_id} + } - result = self._graph.invoke( + result: dict[str, Any] = self._graph.invoke( { "messages": [ ("system", self._build_system_prompt()), @@ -163,10 +215,10 @@ class LangGraphAgentAdapter(BaseAgentAdapter): config, ) - messages = result.get("messages", []) - last_message = messages[-1] if messages else None + messages: list[Any] = result.get("messages", []) + last_message: Any = messages[-1] if messages else None - final_answer = "" + final_answer: str = "" if isinstance(last_message, dict): final_answer = last_message.get("content", "") elif hasattr(last_message, "content"): @@ -186,7 +238,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter): return final_answer except Exception as e: - self._logger.log("error", f"Error executing LangGraph task: {str(e)}") + self._logger.log("error", f"Error executing LangGraph task: {e!s}") crewai_event_bus.emit( self, event=AgentExecutionErrorEvent( @@ -197,29 +249,67 @@ class LangGraphAgentAdapter(BaseAgentAdapter): ) raise - def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None: - """Configure the LangGraph agent for execution.""" + def create_agent_executor(self, tools: list[BaseTool] | None = None) -> None: + """Configure the LangGraph agent for execution. + + Args: + tools: Optional tools to configure for the agent. + """ self.configure_tools(tools) - def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None: - """Configure tools for the LangGraph agent.""" + def configure_tools(self, tools: list[BaseTool] | None = None) -> None: + """Configure tools for the LangGraph agent. + + Merges additional tools with existing ones and updates the graph's + available tools through the tool adapter. + + Args: + tools: Optional additional tools to configure. + """ if tools: - all_tools = list(self.tools or []) + list(tools or []) + all_tools: list[BaseTool] = list(self.tools or []) + list(tools or []) self._tool_adapter.configure_tools(all_tools) - available_tools = self._tool_adapter.tools() + available_tools: list[Any] = self._tool_adapter.tools() self._graph.tools = available_tools - def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]: - """Implement delegation tools support for LangGraph.""" - agent_tools = AgentTools(agents=agents) + def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]: + """Implement delegation tools support for LangGraph. + + Creates delegation tools that allow this agent to delegate tasks to other agents. + + Args: + agents: List of agents available for delegation. + + Returns: + List of delegation tools. + """ + agent_tools: AgentTools = AgentTools(agents=agents) return agent_tools.tools() + @staticmethod def get_output_converter( - self, llm: Any, text: str, model: Any, instructions: str - ) -> Any: - """Convert output format if needed.""" + llm: Any, text: str, model: Any, instructions: str + ) -> Converter: + """Convert output format if needed. + + Args: + llm: Language model instance. + text: Text to convert. + model: Model configuration. + instructions: Conversion instructions. + + Returns: + Converter instance for output transformation. + """ return Converter(llm=llm, text=text, model=model, instructions=instructions) - def configure_structured_output(self, task) -> None: - """Configure the structured output for LangGraph.""" + def configure_structured_output(self, task: Any) -> None: + """Configure the structured output for LangGraph. + + Uses the converter adapter to set up structured output formatting + based on the task requirements. + + Args: + task: Task object containing output requirements. + """ self._converter_adapter.configure_structured_output(task) diff --git a/src/crewai/agents/agent_adapters/langgraph/langgraph_tool_adapter.py b/src/crewai/agents/agent_adapters/langgraph/langgraph_tool_adapter.py index 0bc31d201..9dc055e37 100644 --- a/src/crewai/agents/agent_adapters/langgraph/langgraph_tool_adapter.py +++ b/src/crewai/agents/agent_adapters/langgraph/langgraph_tool_adapter.py @@ -1,38 +1,72 @@ +"""LangGraph tool adapter for CrewAI tool integration. + +This module contains the LangGraphToolAdapter class that converts CrewAI tools +to LangGraph-compatible format using langchain_core.tools. +""" + import inspect -from typing import Any, List, Optional +from collections.abc import Awaitable +from typing import Any from crewai.agents.agent_adapters.base_tool_adapter import BaseToolAdapter from crewai.tools.base_tool import BaseTool class LangGraphToolAdapter(BaseToolAdapter): - """Adapts CrewAI tools to LangGraph agent tool compatible format""" + """Adapts CrewAI tools to LangGraph agent tool compatible format. - def __init__(self, tools: Optional[List[BaseTool]] = None): - self.original_tools = tools or [] - self.converted_tools = [] + Converts CrewAI BaseTool instances to langchain_core.tools format + that can be used by LangGraph agents. + """ - def configure_tools(self, tools: List[BaseTool]) -> None: + def __init__(self, tools: list[BaseTool] | None = None) -> None: + """Initialize the tool adapter. + + Args: + tools: Optional list of CrewAI tools to adapt. """ - Configure and convert CrewAI tools to LangGraph-compatible format. - LangGraph expects tools in langchain_core.tools format. - """ - from langchain_core.tools import BaseTool, StructuredTool + super().__init__() + self.original_tools: list[BaseTool] = tools or [] + self.converted_tools: list[Any] = [] - converted_tools = [] + def configure_tools(self, tools: list[BaseTool]) -> None: + """Configure and convert CrewAI tools to LangGraph-compatible format. + + LangGraph expects tools in langchain_core.tools format. This method + converts CrewAI BaseTool instances to StructuredTool instances. + + Args: + tools: List of CrewAI tools to convert. + """ + from langchain_core.tools import BaseTool as LangChainBaseTool + from langchain_core.tools import StructuredTool + + converted_tools: list[Any] = [] if self.original_tools: - all_tools = tools + self.original_tools + all_tools: list[BaseTool] = tools + self.original_tools else: all_tools = tools for tool in all_tools: - if isinstance(tool, BaseTool): + if isinstance(tool, LangChainBaseTool): converted_tools.append(tool) continue - sanitized_name = self.sanitize_tool_name(tool.name) + sanitized_name: str = self.sanitize_tool_name(tool.name) - async def tool_wrapper(*args, tool=tool, **kwargs): - output = None + async def tool_wrapper( + *args: Any, tool: BaseTool = tool, **kwargs: Any + ) -> Any: + """Wrapper function to adapt CrewAI tool calls to LangGraph format. + + Args: + *args: Positional arguments for the tool. + tool: The CrewAI tool to wrap. + **kwargs: Keyword arguments for the tool. + + Returns: + The result from the tool execution. + """ + output: Any | Awaitable[Any] if len(args) > 0 and isinstance(args[0], str): output = tool.run(args[0]) elif "input" in kwargs: @@ -41,12 +75,12 @@ class LangGraphToolAdapter(BaseToolAdapter): output = tool.run(**kwargs) if inspect.isawaitable(output): - result = await output + result: Any = await output else: result = output return result - converted_tool = StructuredTool( + converted_tool: StructuredTool = StructuredTool( name=sanitized_name, description=tool.description, func=tool_wrapper, @@ -57,5 +91,10 @@ class LangGraphToolAdapter(BaseToolAdapter): self.converted_tools = converted_tools - def tools(self) -> List[Any]: + def tools(self) -> list[Any]: + """Get the list of converted tools. + + Returns: + List of LangGraph-compatible tools. + """ return self.converted_tools or [] diff --git a/src/crewai/agents/agent_adapters/langgraph/protocols.py b/src/crewai/agents/agent_adapters/langgraph/protocols.py new file mode 100644 index 000000000..858b35bb5 --- /dev/null +++ b/src/crewai/agents/agent_adapters/langgraph/protocols.py @@ -0,0 +1,55 @@ +"""Type protocols for LangGraph modules.""" + +from typing import Any, Protocol, runtime_checkable + + +@runtime_checkable +class LangGraphMemorySaver(Protocol): + """Protocol for LangGraph MemorySaver. + + Defines the interface for LangGraph's memory persistence mechanism. + """ + + def __init__(self) -> None: + """Initialize the memory saver.""" + ... + + +@runtime_checkable +class LangGraphCheckPointMemoryModule(Protocol): + """Protocol for LangGraph checkpoint memory module. + + Defines the interface for modules containing memory checkpoint functionality. + """ + + MemorySaver: type[LangGraphMemorySaver] + + +@runtime_checkable +class LangGraphPrebuiltModule(Protocol): + """Protocol for LangGraph prebuilt module. + + Defines the interface for modules containing prebuilt agent factories. + """ + + def create_react_agent( + self, + model: Any, + tools: list[Any], + checkpointer: Any, + debug: bool = False, + **kwargs: Any, + ) -> Any: + """Create a ReAct agent with the given configuration. + + Args: + model: The language model to use for the agent. + tools: List of tools available to the agent. + checkpointer: Memory checkpointer for state persistence. + debug: Whether to enable debug mode. + **kwargs: Additional configuration options. + + Returns: + The configured ReAct agent instance. + """ + ... diff --git a/src/crewai/agents/agent_adapters/langgraph/structured_output_converter.py b/src/crewai/agents/agent_adapters/langgraph/structured_output_converter.py index 79f6dcb15..ce615f4ff 100644 --- a/src/crewai/agents/agent_adapters/langgraph/structured_output_converter.py +++ b/src/crewai/agents/agent_adapters/langgraph/structured_output_converter.py @@ -1,21 +1,45 @@ +"""LangGraph structured output converter for CrewAI task integration. + +This module contains the LangGraphConverterAdapter class that handles structured +output conversion for LangGraph agents, supporting JSON and Pydantic model formats. +""" + import json +import re +from typing import Any, Literal from crewai.agents.agent_adapters.base_converter_adapter import BaseConverterAdapter from crewai.utilities.converter import generate_model_description class LangGraphConverterAdapter(BaseConverterAdapter): - """Adapter for handling structured output conversion in LangGraph agents""" + """Adapter for handling structured output conversion in LangGraph agents. - def __init__(self, agent_adapter): - """Initialize the converter adapter with a reference to the agent adapter""" - self.agent_adapter = agent_adapter - self._output_format = None - self._schema = None - self._system_prompt_appendix = None + Converts task output requirements into system prompt modifications and + post-processing logic to ensure agents return properly structured outputs. + """ - def configure_structured_output(self, task) -> None: - """Configure the structured output for LangGraph.""" + def __init__(self, agent_adapter: Any) -> None: + """Initialize the converter adapter with a reference to the agent adapter. + + Args: + agent_adapter: The LangGraph agent adapter instance. + """ + super().__init__(agent_adapter=agent_adapter) + self.agent_adapter: Any = agent_adapter + self._output_format: Literal["json", "pydantic"] | None = None + self._schema: str | None = None + self._system_prompt_appendix: str | None = None + + def configure_structured_output(self, task: Any) -> None: + """Configure the structured output for LangGraph. + + Analyzes the task's output requirements and sets up the necessary + formatting and validation logic. + + Args: + task: The task object containing output format specifications. + """ if not (task.output_json or task.output_pydantic): self._output_format = None self._schema = None @@ -32,7 +56,14 @@ class LangGraphConverterAdapter(BaseConverterAdapter): self._system_prompt_appendix = self._generate_system_prompt_appendix() def _generate_system_prompt_appendix(self) -> str: - """Generate an appendix for the system prompt to enforce structured output""" + """Generate an appendix for the system prompt to enforce structured output. + + Creates instructions that are appended to the system prompt to guide + the agent in producing properly formatted output. + + Returns: + System prompt appendix string, or empty string if no structured output. + """ if not self._output_format or not self._schema: return "" @@ -41,19 +72,36 @@ Important: Your final answer MUST be provided in the following structured format {self._schema} -DO NOT include any markdown code blocks, backticks, or other formatting around your response. +DO NOT include any markdown code blocks, backticks, or other formatting around your response. The output should be raw JSON that exactly matches the specified schema. """ def enhance_system_prompt(self, original_prompt: str) -> str: - """Add structured output instructions to the system prompt if needed""" + """Add structured output instructions to the system prompt if needed. + + Args: + original_prompt: The base system prompt. + + Returns: + Enhanced system prompt with structured output instructions. + """ if not self._system_prompt_appendix: return original_prompt return f"{original_prompt}\n{self._system_prompt_appendix}" def post_process_result(self, result: str) -> str: - """Post-process the result to ensure it matches the expected format""" + """Post-process the result to ensure it matches the expected format. + + Attempts to extract and validate JSON content from agent responses, + handling cases where JSON may be wrapped in markdown or other formatting. + + Args: + result: The raw result string from the agent. + + Returns: + Processed result string, ideally in valid JSON format. + """ if not self._output_format: return result @@ -65,16 +113,16 @@ The output should be raw JSON that exactly matches the specified schema. return result except json.JSONDecodeError: # Try to extract JSON from the text - import re - - json_match = re.search(r"(\{.*\})", result, re.DOTALL) + json_match: re.Match[str] | None = re.search( + r"(\{.*})", result, re.DOTALL + ) if json_match: try: - extracted = json_match.group(1) + extracted: str = json_match.group(1) # Validate it's proper JSON json.loads(extracted) return extracted - except: + except json.JSONDecodeError: pass return result