mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-06 01:32:36 +00:00
feat: add LangGraph agent and tool adapter for CrewAI integration
- Introduced LangGraphAgentAdapter to facilitate interaction with LangGraph agents. - Implemented methods for task execution, context handling, and tool configuration. - Created LangGraphToolAdapter to convert CrewAI tools into LangGraph-compatible format. - Enhanced error handling and logging for task execution and streaming processes.
This commit is contained in:
263
src/crewai/agents/agent_adapters/langgraph/langgraph_adapter.py
Normal file
263
src/crewai/agents/agent_adapters/langgraph/langgraph_adapter.py
Normal file
@@ -0,0 +1,263 @@
|
||||
from typing import Any, AsyncIterable, Dict, List, Optional
|
||||
|
||||
from langchain_core.messages import ToolMessage
|
||||
from langgraph.checkpoint.memory import MemorySaver
|
||||
from langgraph.prebuilt import create_react_agent
|
||||
from pydantic import Field, PrivateAttr
|
||||
|
||||
from crewai.agents.agent_adapters.base_agent_adapter import BaseAgentAdapter
|
||||
from crewai.agents.agent_adapters.langgraph.langgraph_tool_adapter import (
|
||||
LangGraphToolAdapter,
|
||||
)
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.tools.agent_tools.agent_tools import AgentTools
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.utilities import Logger
|
||||
from crewai.utilities.converter import Converter
|
||||
from crewai.utilities.events import crewai_event_bus
|
||||
from crewai.utilities.events.agent_events import (
|
||||
AgentExecutionErrorEvent,
|
||||
AgentExecutionStartedEvent,
|
||||
)
|
||||
|
||||
|
||||
class LangGraphAgentAdapter(BaseAgentAdapter):
|
||||
"""Adapter for LangGraph agents to work with CrewAI."""
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
_logger: Logger = PrivateAttr(default_factory=lambda: Logger())
|
||||
_tool_adapter: LangGraphToolAdapter = PrivateAttr()
|
||||
_graph: Any = PrivateAttr(default=None)
|
||||
_memory: Any = PrivateAttr(default=None)
|
||||
_max_iterations: int = PrivateAttr(default=10)
|
||||
function_calling_llm: Any = Field(default=None)
|
||||
step_callback: Any = Field(default=None)
|
||||
|
||||
# Config parameters for LangGraph
|
||||
model: str = Field(default="gpt-4o")
|
||||
verbose: bool = Field(default=False)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
role: str,
|
||||
goal: str,
|
||||
backstory: str,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
llm: Any = None,
|
||||
max_iterations: int = 10,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize the LangGraph agent adapter."""
|
||||
super().__init__(
|
||||
role=role,
|
||||
goal=goal,
|
||||
backstory=backstory,
|
||||
tools=tools,
|
||||
llm=llm or self.model,
|
||||
**kwargs,
|
||||
)
|
||||
self._tool_adapter = LangGraphToolAdapter(tools=tools)
|
||||
self._max_iterations = max_iterations
|
||||
self._setup_graph()
|
||||
|
||||
def _setup_graph(self) -> None:
|
||||
"""Set up the LangGraph workflow graph."""
|
||||
try:
|
||||
# Initialize memory for the agent
|
||||
self._memory = MemorySaver()
|
||||
|
||||
# Convert CrewAI tools to LangGraph/LangChain compatible tools
|
||||
converted_tools = self._tool_adapter.converted_tools
|
||||
|
||||
# Create the agent graph with ReAct pattern
|
||||
self._graph = create_react_agent(
|
||||
model=self.llm, # Pass as model parameter
|
||||
tools=converted_tools,
|
||||
checkpointer=self._memory,
|
||||
)
|
||||
|
||||
except ImportError as e:
|
||||
self._logger.log(
|
||||
"error", f"Failed to import LangGraph dependencies: {str(e)}"
|
||||
)
|
||||
raise
|
||||
except Exception as e:
|
||||
self._logger.log("error", f"Error setting up LangGraph agent: {str(e)}")
|
||||
raise
|
||||
|
||||
def _build_system_prompt(self) -> str:
|
||||
"""Build a system prompt for the LangGraph agent."""
|
||||
return f"""You are {self.role}.
|
||||
|
||||
Your goal is: {self.goal}
|
||||
|
||||
Your backstory: {self.backstory}
|
||||
|
||||
When working on tasks, think step-by-step and use the available tools when necessary.
|
||||
"""
|
||||
|
||||
def execute_task(
|
||||
self,
|
||||
task: Any,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
) -> str:
|
||||
"""Execute a task using the LangGraph workflow."""
|
||||
self.create_agent_executor(tools)
|
||||
|
||||
try:
|
||||
task_prompt = task.prompt() if hasattr(task, "prompt") else str(task)
|
||||
|
||||
if context:
|
||||
task_prompt = self.i18n.slice("task_with_context").format(
|
||||
task=task_prompt, context=context
|
||||
)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=AgentExecutionStartedEvent(
|
||||
agent=self,
|
||||
tools=self.tools,
|
||||
task_prompt=task_prompt,
|
||||
task=task,
|
||||
),
|
||||
)
|
||||
|
||||
# Set up a session ID for this task
|
||||
session_id = f"task_{id(task)}"
|
||||
|
||||
# Configure the invocation
|
||||
config = {"configurable": {"thread_id": session_id}}
|
||||
|
||||
# Invoke the agent graph with the task prompt
|
||||
result = self._graph.invoke({"messages": [("user", task_prompt)]}, config)
|
||||
print("result", result)
|
||||
|
||||
# Get the final response
|
||||
messages = result.get("messages", [])
|
||||
last_message = messages[-1] if messages else None
|
||||
|
||||
final_answer = ""
|
||||
print("final_answer", final_answer)
|
||||
if isinstance(last_message, dict):
|
||||
final_answer = last_message.get("content", "")
|
||||
elif hasattr(last_message, "content"):
|
||||
final_answer = getattr(last_message, "content", "")
|
||||
|
||||
return (
|
||||
final_answer
|
||||
or "Task execution completed but no clear answer was provided."
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
self._logger.log("error", f"Error executing LangGraph task: {str(e)}")
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=AgentExecutionErrorEvent(
|
||||
agent=self,
|
||||
task=task,
|
||||
error=str(e),
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
async def stream_task(
|
||||
self,
|
||||
task: Any,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
) -> AsyncIterable[Dict[str, Any]]:
|
||||
"""Stream the execution of a task."""
|
||||
self.create_agent_executor(tools)
|
||||
|
||||
try:
|
||||
task_prompt = task.prompt() if hasattr(task, "prompt") else str(task)
|
||||
|
||||
if context:
|
||||
task_prompt = self.i18n.slice("task_with_context").format(
|
||||
task=task_prompt, context=context
|
||||
)
|
||||
|
||||
# Set up a session ID for this task
|
||||
session_id = f"task_{id(task)}"
|
||||
|
||||
# Configure the invocation
|
||||
config = {"configurable": {"thread_id": session_id}}
|
||||
|
||||
# Stream the execution
|
||||
inputs = {"messages": [("user", task_prompt)]}
|
||||
|
||||
for item in self._graph.stream(inputs, config, stream_mode="values"):
|
||||
message = item.get("messages", [])[-1] if "messages" in item else None
|
||||
|
||||
if (
|
||||
message is not None
|
||||
and hasattr(message, "tool_calls")
|
||||
and getattr(message, "tool_calls", None)
|
||||
):
|
||||
tool_calls = getattr(message, "tool_calls", [])
|
||||
if tool_calls and len(tool_calls) > 0:
|
||||
yield {
|
||||
"is_task_complete": False,
|
||||
"require_user_input": False,
|
||||
"content": f"Using tool: {tool_calls[0].name}",
|
||||
}
|
||||
elif isinstance(message, ToolMessage):
|
||||
content = getattr(message, "content", "Tool execution complete")
|
||||
yield {
|
||||
"is_task_complete": False,
|
||||
"require_user_input": False,
|
||||
"content": f"Tool result: {content[:50]}...",
|
||||
}
|
||||
elif message is not None:
|
||||
# Final response or intermediary thinking
|
||||
content = getattr(message, "content", str(message))
|
||||
yield {
|
||||
"is_task_complete": True,
|
||||
"require_user_input": False,
|
||||
"content": content,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
self._logger.log("error", f"Error streaming LangGraph task: {str(e)}")
|
||||
yield {
|
||||
"is_task_complete": True,
|
||||
"require_user_input": False,
|
||||
"content": f"Error: {str(e)}",
|
||||
}
|
||||
|
||||
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
|
||||
"""Configure the LangGraph agent for execution."""
|
||||
if tools:
|
||||
self.configure_tools(tools)
|
||||
|
||||
# No need for a separate executor in LangGraph
|
||||
|
||||
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
|
||||
"""Configure tools for the LangGraph agent."""
|
||||
if tools:
|
||||
all_tools = list(self.tools or []) + list(tools or [])
|
||||
self._tool_adapter.configure_tools(all_tools)
|
||||
# We need to recreate the graph with the new tools
|
||||
self._setup_graph()
|
||||
|
||||
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
|
||||
"""Implement delegation tools support for LangGraph."""
|
||||
agent_tools = AgentTools(agents=agents)
|
||||
return agent_tools.tools()
|
||||
|
||||
def get_output_converter(
|
||||
self, llm: Any, text: str, model: Any, instructions: str
|
||||
) -> Any:
|
||||
"""Convert output format if needed."""
|
||||
return Converter(llm=llm, text=text, model=model, instructions=instructions)
|
||||
|
||||
def _parse_tools(self, tools: List[BaseTool]) -> List[BaseTool]:
|
||||
"""Parse and validate tools."""
|
||||
return tools
|
||||
|
||||
def configure_structured_output(self, task) -> None:
|
||||
"""Configure the structured output for LangGraph."""
|
||||
# This will be implemented in a separate improvement
|
||||
pass
|
||||
@@ -0,0 +1,40 @@
|
||||
class LangGraphToolAdapter:
|
||||
"""Adapts CrewAI tools to LangGraph-compatible format"""
|
||||
|
||||
def __init__(self, tools: Optional[List[BaseTool]] = None):
|
||||
self.tools = tools or []
|
||||
self.converted_tools = []
|
||||
|
||||
def configure_tools(self, tools: List[BaseTool]) -> None:
|
||||
"""Convert CrewAI tools to LangGraph tools"""
|
||||
self.tools = tools
|
||||
self.converted_tools = self._convert_tools(tools)
|
||||
|
||||
def _convert_tools(self, tools: List[BaseTool]) -> List[Any]:
|
||||
"""
|
||||
Convert CrewAI tools to LangGraph-compatible tools
|
||||
LangGraph expects tools in langchain_core.tools format
|
||||
"""
|
||||
from langchain_core.tools import Tool
|
||||
|
||||
converted_tools = []
|
||||
|
||||
for tool in tools:
|
||||
# Create a wrapper function that matches LangGraph's expected format
|
||||
def tool_wrapper(*args, tool=tool, **kwargs):
|
||||
# Extract inputs based on the tool's schema
|
||||
if len(args) > 0 and isinstance(args[0], str):
|
||||
return tool.run(args[0])
|
||||
elif "input" in kwargs:
|
||||
return tool.run(kwargs["input"])
|
||||
else:
|
||||
return tool.run(**kwargs)
|
||||
|
||||
# Create a LangChain Tool
|
||||
converted_tool = Tool(
|
||||
name=tool.name, description=tool.description, func=tool_wrapper
|
||||
)
|
||||
|
||||
converted_tools.append(converted_tool)
|
||||
|
||||
return converted_tools
|
||||
Reference in New Issue
Block a user