This commit is contained in:
Brandon Hancock
2025-02-12 11:10:00 -05:00
parent 796e50aba8
commit 7910dc9337
3 changed files with 158 additions and 62 deletions

View File

@@ -340,7 +340,6 @@ class BaseAgent(ABC, BaseModel):
if self.cache:
self.cache_handler = cache_handler
self.tools_handler.cache = cache_handler
print(f"Setting cache handler for agent: {self.id}")
# Only create the executor if it hasn't been created yet.
if self.agent_executor is None:
self.create_agent_executor()
@@ -348,7 +347,6 @@ class BaseAgent(ABC, BaseModel):
print(
"Agent executor already exists, skipping creation in set_cache_handler."
)
print(f"Cache handler set for agent: {self.id}")
def increment_formatting_errors(self) -> None:
self.formatting_errors += 1
@@ -361,7 +359,6 @@ class BaseAgent(ABC, BaseModel):
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
print(f"Setting RPM controller for agent: {self.id}")
# Only create the executor if it hasn't been created yet.
if self.agent_executor is None:
self.create_agent_executor()
@@ -369,4 +366,3 @@ class BaseAgent(ABC, BaseModel):
print(
"Agent executor already exists, skipping creation in set_rpm_controller."
)
print(f"RPM controller set for agent: {self.id}")

View File

@@ -1,6 +1,13 @@
from typing import Any, List, Optional, Type, cast
from typing import Any, List, Optional, Type, Union, cast
from pydantic import Field
from crewai.tools.base_tool import Tool
try:
from langchain_core.tools import Tool as LangChainTool # type: ignore
except ImportError:
LangChainTool = None
from pydantic import Field, field_validator
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
@@ -24,6 +31,10 @@ class LangChainAgentAdapter(BaseAgent):
...,
description="The wrapped LangChain runnable agent instance. It is expected to have an 'invoke' method.",
)
tools: Optional[List[Union[BaseTool, Any]]] = Field(
default_factory=list,
description="Tools at the agent's disposal. Accepts both CrewAI BaseTool instances and other tools.",
)
function_calling_llm: Optional[Any] = Field(
default=None, description="Optional function calling LLM."
)
@@ -40,11 +51,24 @@ class LangChainAgentAdapter(BaseAgent):
i18n: Any = None
crew: Any = None
knowledge: Any = None
tools: Optional[List[BaseTool]] = None
class Config:
arbitrary_types_allowed = True
@field_validator("tools", mode="before")
def convert_tools(cls, value):
"""Ensure tools are valid CrewAI BaseTool instances."""
if not value:
return value
new_tools = []
for tool in value:
# If tool is already a CrewAI BaseTool instance, keep it as is.
if isinstance(tool, BaseTool):
new_tools.append(tool)
else:
new_tools.append(Tool.from_langchain(tool))
return new_tools
def execute_task(
self,
task: Task,
@@ -115,6 +139,8 @@ class LangChainAgentAdapter(BaseAgent):
tools = tools or self.tools or []
self.create_agent_executor(tools=tools)
self._show_start_logs(task)
if self.crew and getattr(self.crew, "_train", False):
task_prompt = self._training_handler(task_prompt=task_prompt)
else:
@@ -223,20 +249,43 @@ class LangChainAgentAdapter(BaseAgent):
"""
Creates an agent executor using LangChain's AgentExecutor.
"""
from importlib import import_module
try:
from langchain.agents import AgentExecutor
except ImportError as e:
raise ImportError(
"LangChain library not found. Please run `uv add langchain` to add LangChain support."
) from e
langchain_agents = import_module("langchain.agents")
AgentExecutor = getattr(langchain_agents, "AgentExecutor")
used_tools = tools or self.tools or []
# Use the following fallback strategy:
# 1. If tools were passed in, use them.
# 2. Otherwise, if self.tools exists, use them.
# 3. Otherwise, try to extract the tools set in the underlying langchain agent.
raw_tools = tools or self.tools
if not raw_tools:
# Try getting the tools from the agent's inner 'agent' attribute.
if hasattr(self.langchain_agent, "agent") and hasattr(
self.langchain_agent.agent, "tools"
):
raw_tools = self.langchain_agent.agent.tools
else:
raw_tools = getattr(self.langchain_agent, "tools", [])
# Convert each CrewAI tool to a native LangChain tool if possible.
used_tools = []
for tool in raw_tools:
if hasattr(tool, "to_langchain"):
used_tools.append(tool.to_langchain())
else:
used_tools.append(tool)
print("Raw tools:", raw_tools)
print("Used tools:", used_tools)
print(f"Creating agent executor for langchain agent: {self.langchain_agent}")
print("Passing tools: ", used_tools)
self.agent_executor = AgentExecutor.from_agent_and_tools(
agent=self.langchain_agent,
tools=used_tools,
verbose=getattr(self, "verbose", True),
)
print("Created agent executor for langchain agent")
def _parse_tools(self, tools: List[BaseTool]) -> List[BaseTool]:
return tools
@@ -252,3 +301,21 @@ class LangChainAgentAdapter(BaseAgent):
instructions: str = "",
) -> Converter:
return Converter(llm=llm, text=text, model=model, instructions=instructions)
def _show_start_logs(self, task: Task) -> None:
if self.langchain_agent is None:
raise ValueError("Agent cannot be None")
# Check if the adapter or its crew is in verbose mode.
verbose = self.verbose or (self.crew and getattr(self.crew, "verbose", False))
if verbose:
from crewai.utilities import Printer
printer = Printer()
# Use the adapter's role (inherited from BaseAgent) for logging.
printer.print(
content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{self.role}\033[00m"
)
description = getattr(task, "description", "Not Found")
printer.print(
content=f"\033[95m## Task:\033[00m \033[92m{description}\033[00m"
)

View File

@@ -1,7 +1,7 @@
import warnings
from abc import ABC, abstractmethod
from inspect import signature
from typing import Any, Callable, Type, get_args, get_origin
from typing import Any, Callable, Optional, Type, get_args, get_origin
from pydantic import (
BaseModel,
@@ -23,7 +23,10 @@ class BaseTool(BaseModel, ABC):
class _ArgsSchemaPlaceholder(PydanticBaseModel):
pass
model_config = ConfigDict()
model_config = ConfigDict(
arbitrary_types_allowed=True,
from_attributes=True, # Allow conversion from ORM objects
)
name: str
"""The unique name of the tool that clearly communicates its purpose."""
@@ -89,6 +92,7 @@ class BaseTool(BaseModel, ABC):
@classmethod
def from_langchain(cls, tool: Any) -> "BaseTool":
print("CREATING TOOL FROM LANGCHAIN")
"""Create a Tool instance from a CrewStructuredTool.
This method takes a CrewStructuredTool object and converts it into a
@@ -177,67 +181,96 @@ class BaseTool(BaseModel, ABC):
return origin.__name__
def to_langchain(self) -> Any:
"""
Convert this CrewAI Tool instance into a LangChain-compatible tool.
Returns a concrete subclass of LangChain's BaseTool.
"""
try:
from langchain_core.tools import Tool as LC_Tool
except ImportError as e:
raise ImportError(
"LangChain library not found. Please run `uv add langchain` to add LangChain support."
) from e
# Capture the function in a local variable to avoid referencing None.
tool_func = self.func
class ConcreteLangChainTool(LC_Tool):
def _run(self, *args, **kwargs):
return tool_func(*args, **kwargs)
# Do not pass callback_manager; let LC_Tool use its default.
print("Creating concrete langchain tool")
return ConcreteLangChainTool(
name=self.name,
description=self.description,
func=tool_func,
args_schema=self.args_schema,
)
@property
def get(self) -> Callable[[str, Any], Any]:
# Returns a callable that looks up attributes on the instance.
return lambda key, default=None: getattr(self, key, default)
class Tool(BaseTool):
"""The function that will be executed when the tool is called."""
"""Tool implementation that requires a function."""
func: Callable
model_config = ConfigDict(
arbitrary_types_allowed=True,
from_attributes=True,
)
def _run(self, *args: Any, **kwargs: Any) -> Any:
return self.func(*args, **kwargs)
@classmethod
def from_langchain(cls, tool: Any) -> "Tool":
"""Create a Tool instance from a CrewStructuredTool.
This method takes a CrewStructuredTool object and converts it into a
Tool instance. It ensures that the provided tool has a callable 'func'
attribute and infers the argument schema if not explicitly provided.
Args:
tool (Any): The CrewStructuredTool object to be converted.
Returns:
Tool: A new Tool instance created from the provided CrewStructuredTool.
Raises:
ValueError: If the provided tool does not have a callable 'func' attribute.
"""
if not hasattr(tool, "func") or not callable(tool.func):
raise ValueError("The provided tool must have a callable 'func' attribute.")
"""Convert a LangChain tool to a CrewAI Tool."""
# Handle missing args_schema
args_schema = getattr(tool, "args_schema", None)
if args_schema is None:
# Infer args_schema from the function signature if not provided
func_signature = signature(tool.func)
annotations = func_signature.parameters
args_fields = {}
for name, param in annotations.items():
if name != "self":
param_annotation = (
param.annotation if param.annotation != param.empty else Any
)
field_info = Field(
default=...,
description="",
)
args_fields[name] = (param_annotation, field_info)
if args_fields:
args_schema = create_model(f"{tool.name}Input", **args_fields)
else:
# Create a default schema with no fields if no parameters are found
args_schema = create_model(
f"{tool.name}Input", __base__=PydanticBaseModel
)
# Create default args schema
args_schema = create_model(f"{tool.name}Input", __base__=PydanticBaseModel)
tool_dict = {
"name": tool.name,
"description": tool.description,
"func": tool._run, # LangChain tools use _run
"args_schema": args_schema,
}
# Create and validate a new instance directly from the dictionary
return cls.model_validate(tool_dict)
return cls(
name=getattr(tool, "name", "Unnamed Tool"),
description=getattr(tool, "description", ""),
func=tool.func,
args_schema=args_schema,
def to_langchain(self) -> Any:
"""Convert to LangChain tool with proper get method."""
try:
from langchain_core.tools import Tool as LC_Tool
except ImportError:
raise ImportError("langchain_core is not installed")
LC_Tool(
name=self.name,
description=self.description,
func=self.func,
args_schema=self.args_schema,
)
# # Create subclass with get method
# class PatchedTool(LC_Tool):
# def get(self, key: str, default: Any = None) -> Any:
# return getattr(self, key, default)
# return PatchedTool(
# name=self.name,
# description=self.description,
# func=self.func,
# args_schema=self.args_schema,
# callback_manager=None,
# )
def to_langchain(
tools: list[BaseTool | CrewStructuredTool],