Feat/byoa (#2523)

* feat: add OpenAI agent adapter implementation

- Introduced OpenAIAgentAdapter class to facilitate interaction with OpenAI Assistants.
- Implemented methods for task execution, tool configuration, and response processing.
- Added support for converting CrewAI tools to OpenAI format and handling delegation tools.

* Created an adapter for the delegate and ask_question tools

* The delegate and ask_question tools now work and delegate to CrewAI agents

* refactor: introduce OpenAIAgentToolAdapter for tool management

- Created OpenAIAgentToolAdapter class to encapsulate tool configuration and conversion for OpenAI Assistant.
- Removed tool configuration logic from OpenAIAgentAdapter and integrated it into the new adapter.
- Enhanced the tool conversion process to ensure compatibility with OpenAI's requirements.

* feat: implement BaseAgentAdapter for agent integration

- Introduced BaseAgentAdapter as an abstract base class for agent adapters in CrewAI.
- Defined common interface and methods for configuring tools and structured output.
- Updated OpenAIAgentAdapter to inherit from BaseAgentAdapter, enhancing its structure and functionality.

* feat: add LangGraph agent and tool adapter for CrewAI integration

- Introduced LangGraphAgentAdapter to facilitate interaction with LangGraph agents.
- Implemented methods for task execution, context handling, and tool configuration.
- Created LangGraphToolAdapter to convert CrewAI tools into LangGraph-compatible format.
- Enhanced error handling and logging for task execution and streaming processes.

* feat: enhance LangGraphToolAdapter and improve conversion instructions

- Added type hints for better clarity and type checking in LangGraphToolAdapter.
- Updated conversion instructions to ensure compatibility with optional LLM checks.

* feat: integrate structured output handling in LangGraph and OpenAI agents

- Added LangGraphConverterAdapter for managing structured output in LangGraph agents.
- Enhanced LangGraphAgentAdapter to utilize the new converter for system prompt and task execution.
- Updated LangGraphToolAdapter to use StructuredTool for better compatibility.
- Introduced OpenAIConverterAdapter for structured output management in OpenAI agents.
- Improved task execution flow in OpenAIAgentAdapter to incorporate structured output configuration and post-processing.

* feat: implement BaseToolAdapter for tool integration

- Introduced BaseToolAdapter as an abstract base class for tool adapters in CrewAI.
- Updated LangGraphToolAdapter and OpenAIAgentToolAdapter to inherit from BaseToolAdapter, enhancing their structure and functionality.
- Improved tool configuration methods to support better integration with various frameworks.
- Added type hints and documentation for clarity and maintainability.

* feat: enhance OpenAIAgentAdapter with configurable agent properties

- Refactored OpenAIAgentAdapter to accept agent configuration as an argument.
- Introduced a method to build a system prompt for the OpenAI agent, improving task execution context.
- Updated initialization to utilize role, goal, and backstory from kwargs, enhancing flexibility in agent setup.
- Improved tool handling and integration within the adapter.

* feat: enhance agent adapters with structured output support

- Introduced BaseConverterAdapter as an abstract class for structured output handling.
- Implemented LangGraphConverterAdapter and OpenAIConverterAdapter to manage structured output in their respective agents.
- Updated BaseAgentAdapter to accept an agent configuration dictionary during initialization.
- Enhanced LangGraphAgentAdapter to utilize the new converter and improved tool handling.
- Added methods for configuring structured output and enhancing system prompts in converter adapters.

* refactor: remove _parse_tools method from OpenAIAgentAdapter and BaseAgent

- Eliminated the _parse_tools method from OpenAIAgentAdapter and its abstract declaration in BaseAgent.
- Cleaned up related test code in MockAgent to reflect the removal of the method.

* Also removed _parse_tools here, as it was not used

* feat: add dynamic import handling for LangGraph dependencies

- Implemented conditional imports for LangGraph components to handle ImportError gracefully.
- Updated LangGraphAgentAdapter initialization to check for LangGraph availability and raise an informative error if dependencies are missing.
- Enhanced the agent adapter's robustness by ensuring it only initializes components when the required libraries are present.

* fix: improve error handling for agent adapters

- Updated LangGraphAgentAdapter to raise an ImportError with a clear message if LangGraph dependencies are not installed.
- Refactored OpenAIAgentAdapter to include a similar check for OpenAI dependencies, ensuring robust initialization and user guidance for missing libraries.
- Enhanced overall error handling in agent adapters to prevent runtime issues when dependencies are unavailable.

* refactor: enhance tool handling in agent adapters

- Updated BaseToolAdapter to initialize original and converted tools in the constructor.
- Renamed method `all_tools` to `tools` for clarity in BaseToolAdapter.
- Added `sanitize_tool_name` method to ensure tool names are API compatible.
- Modified LangGraphAgentAdapter to utilize the updated tool handling and ensure proper tool configuration.
- Refactored LangGraphToolAdapter to streamline tool conversion and ensure consistent naming conventions.

* feat: emit AgentExecutionCompletedEvent in agent adapters

- Added emission of AgentExecutionCompletedEvent in both LangGraphAgentAdapter and OpenAIAgentAdapter to signal task completion.
- Enhanced event handling to include agent, task, and output details for better tracking of execution results.

* docs: Enhance BaseConverterAdapter documentation

- Added a detailed docstring to the BaseConverterAdapter class, outlining its purpose and the expected functionality for all converter adapters.
- Updated the post_process_result method's docstring to specify the expected format of the result as a string.

* docs: Add comprehensive guide for bringing custom agents into CrewAI

- Introduced a new documentation file detailing the process of integrating custom agents using the BaseAgentAdapter, BaseToolAdapter, and BaseConverter.
- Included step-by-step instructions for creating custom adapters, configuring tools, and handling structured output.
- Provided examples for implementing adapters for various frameworks, enhancing the usability of CrewAI for developers.

* feat: Introduce adapted_agent flag in BaseAgent and update BaseAgentAdapter initialization

- Added an `adapted_agent` boolean field to the BaseAgent class to indicate if the agent is adapted.
- Updated the BaseAgentAdapter's constructor to pass `adapted_agent=True` to the superclass, ensuring proper initialization of the new field.

* feat: Enhance LangGraphAgentAdapter to support optional agent configuration

- Updated LangGraphAgentAdapter to conditionally apply agent configuration when creating the agent graph, allowing for more flexible initialization.
- Modified LangGraphToolAdapter to ensure only instances of BaseTool are converted, improving tool compatibility and handling.

* feat: Introduce OpenAIConverterAdapter for structured output handling

- Added OpenAIConverterAdapter to manage structured output conversion for OpenAI agents, enhancing their ability to process and format results.
- Updated OpenAIAgentAdapter to utilize the new converter for configuring structured output and post-processing results.
- Removed the deprecated get_output_converter method from OpenAIAgentAdapter.
- Added unit tests for BaseAgentAdapter and BaseToolAdapter to ensure proper functionality and integration of new features.

* feat: Enhance tool adapters to support asynchronous execution

- Updated LangGraphToolAdapter and OpenAIAgentToolAdapter to handle asynchronous tool execution by checking if the output is awaitable.
- Introduced `inspect` import to facilitate the awaitability check.
- Refactored tool wrapper functions to ensure proper handling of both synchronous and asynchronous tool results.

* fix: Correct method definition syntax and enhance tool adapter implementation

- Added the missing `def` keyword to the `configure_structured_output` method definition.
- Added an asynchronous tool wrapper to ensure tools can operate in both synchronous and asynchronous contexts.
- Modified the constructor of the custom converter adapter to directly assign the agent adapter, improving clarity and functionality.

* linted

* refactor: Improve tool processing logic in BaseAgent

- Added a check to return an empty list if no tools are provided.
- Simplified the tool attribute validation by using a list of required attributes.
- Removed commented-out abstract method definition for clarity.

* refactor: Simplify tool handling in agent adapters

- Changed default value of `tools` parameter in LangGraphAgentAdapter to None for better handling of empty tool lists.
- Updated tool initialization in both LangGraphAgentAdapter and OpenAIAgentAdapter to directly pass the `tools` parameter, removing unnecessary list handling.
- Cleaned up commented-out code in OpenAIConverterAdapter to improve readability.

* refactor: Remove unused stream_task method from LangGraphAgentAdapter

- Deleted the `stream_task` method from LangGraphAgentAdapter to streamline the code and eliminate unnecessary complexity.
- This change enhances maintainability by focusing on essential functionalities within the agent adapter.
Lorenze Jay authored on 2025-04-17 09:22:48 -07:00 · committed by GitHub
parent ced3c8f0e0 · commit 870dffbb89
16 changed files with 1525 additions and 18 deletions


@@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from pydantic import PrivateAttr
from crewai.agent import BaseAgent
from crewai.tools import BaseTool
class BaseAgentAdapter(BaseAgent, ABC):
"""Base class for all agent adapters in CrewAI.
This abstract class defines the common interface and functionality that all
agent adapters must implement. It extends BaseAgent to maintain compatibility
with the CrewAI framework while adding adapter-specific requirements.
"""
adapted_structured_output: bool = False
_agent_config: Optional[Dict[str, Any]] = PrivateAttr(default=None)
model_config = {"arbitrary_types_allowed": True}
def __init__(self, agent_config: Optional[Dict[str, Any]] = None, **kwargs: Any):
super().__init__(adapted_agent=True, **kwargs)
self._agent_config = agent_config
@abstractmethod
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
"""Configure and adapt tools for the specific agent implementation.
Args:
tools: Optional list of BaseTool instances to be configured
"""
pass
def configure_structured_output(self, structured_output: Any) -> None:
"""Configure the structured output for the specific agent implementation.
Args:
structured_output: The structured output to be configured
"""
pass
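For orientation, here is a minimal sketch of what a downstream subclass of this base class might look like. The MyFrameworkAgentAdapter name and its dict-based tool format are illustrative only and are not part of this PR; a working adapter would also have to implement the remaining BaseAgent abstract methods.

from typing import Any, Dict, List, Optional

from pydantic import PrivateAttr

from crewai.agents.agent_adapters.base_agent_adapter import BaseAgentAdapter
from crewai.tools import BaseTool


class MyFrameworkAgentAdapter(BaseAgentAdapter):
    """Hypothetical adapter for a third-party agent framework."""

    _external_tools: List[Any] = PrivateAttr(default_factory=list)

    def __init__(self, agent_config: Optional[Dict[str, Any]] = None, **kwargs: Any):
        super().__init__(agent_config=agent_config, **kwargs)

    def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
        # Convert CrewAI tools into whatever shape the external framework expects.
        self._external_tools = [
            {"name": t.name, "description": t.description, "run": t.run}
            for t in (tools or [])
        ]

    # execute_task, create_agent_executor, and get_delegation_tools (inherited as
    # abstract from BaseAgent) are omitted here for brevity.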


@@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
class BaseConverterAdapter(ABC):
"""Base class for all converter adapters in CrewAI.
This abstract class defines the common interface and functionality that all
converter adapters must implement for converting structured output.
"""
def __init__(self, agent_adapter):
self.agent_adapter = agent_adapter
@abstractmethod
def configure_structured_output(self, task) -> None:
"""Configure agents to return structured output.
Must support json and pydantic output.
"""
pass
@abstractmethod
def enhance_system_prompt(self, base_prompt: str) -> str:
"""Enhance the system prompt with structured output instructions."""
pass
@abstractmethod
def post_process_result(self, result: str) -> str:
"""Post-process the result to ensure it matches the expected format: string."""
pass
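As a reference point, a stripped-down concrete converter might look like the sketch below. It is illustrative only; the LangGraph and OpenAI converters added later in this PR do considerably more JSON extraction and repair.

import json

from crewai.agents.agent_adapters.base_converter_adapter import BaseConverterAdapter
from crewai.utilities.converter import generate_model_description


class SimpleConverterAdapter(BaseConverterAdapter):
    """Hypothetical minimal converter adapter."""

    def __init__(self, agent_adapter):
        super().__init__(agent_adapter)
        self._schema = None

    def configure_structured_output(self, task) -> None:
        model = task.output_json or task.output_pydantic
        self._schema = generate_model_description(model) if model else None

    def enhance_system_prompt(self, base_prompt: str) -> str:
        if not self._schema:
            return base_prompt
        return f"{base_prompt}\n\nReturn the final answer as raw JSON matching:\n{self._schema}"

    def post_process_result(self, result: str) -> str:
        # Contract: always hand back a string, whether or not it parsed as JSON.
        if self._schema:
            try:
                json.loads(result)
            except json.JSONDecodeError:
                pass  # a fuller adapter would try to extract or repair JSON here
        return result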


@@ -0,0 +1,37 @@
from abc import ABC, abstractmethod
from typing import Any, List, Optional
from crewai.tools.base_tool import BaseTool
class BaseToolAdapter(ABC):
"""Base class for all tool adapters in CrewAI.
This abstract class defines the common interface that all tool adapters
must implement. It provides the structure for adapting CrewAI tools to
different frameworks and platforms.
"""
original_tools: List[BaseTool]
converted_tools: List[Any]
def __init__(self, tools: Optional[List[BaseTool]] = None):
self.original_tools = tools or []
self.converted_tools = []
@abstractmethod
def configure_tools(self, tools: List[BaseTool]) -> None:
"""Configure and convert tools for the specific implementation.
Args:
tools: List of BaseTool instances to be configured and converted
"""
pass
def tools(self) -> List[Any]:
"""Return all converted tools."""
return self.converted_tools
def sanitize_tool_name(self, tool_name: str) -> str:
"""Sanitize tool name for API compatibility."""
return tool_name.replace(" ", "_")
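A minimal concrete subclass, for illustration only; the dict target format here is made up, whereas the real adapters in this PR target LangGraph and OpenAI tool types.

from typing import List

from crewai.agents.agent_adapters.base_tool_adapter import BaseToolAdapter
from crewai.tools.base_tool import BaseTool


class DictToolAdapter(BaseToolAdapter):
    """Hypothetical adapter exposing CrewAI tools as plain dictionaries."""

    def configure_tools(self, tools: List[BaseTool]) -> None:
        all_tools = tools + self.original_tools if self.original_tools else tools
        self.converted_tools = [
            {
                "name": self.sanitize_tool_name(tool.name),
                "description": tool.description,
                "run": tool.run,
            }
            for tool in all_tools
        ]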


@@ -0,0 +1,226 @@
from typing import Any, AsyncIterable, Dict, List, Optional
from pydantic import Field, PrivateAttr
from crewai.agents.agent_adapters.base_agent_adapter import BaseAgentAdapter
from crewai.agents.agent_adapters.langgraph.langgraph_tool_adapter import (
LangGraphToolAdapter,
)
from crewai.agents.agent_adapters.langgraph.structured_output_converter import (
LangGraphConverterAdapter,
)
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool
from crewai.utilities import Logger
from crewai.utilities.converter import Converter
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
try:
from langchain_core.messages import ToolMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
LANGGRAPH_AVAILABLE = True
except ImportError:
LANGGRAPH_AVAILABLE = False
class LangGraphAgentAdapter(BaseAgentAdapter):
"""Adapter for LangGraph agents to work with CrewAI."""
model_config = {"arbitrary_types_allowed": True}
_logger: Logger = PrivateAttr(default_factory=lambda: Logger())
_tool_adapter: LangGraphToolAdapter = PrivateAttr()
_graph: Any = PrivateAttr(default=None)
_memory: Any = PrivateAttr(default=None)
_max_iterations: int = PrivateAttr(default=10)
function_calling_llm: Any = Field(default=None)
step_callback: Any = Field(default=None)
model: str = Field(default="gpt-4o")
verbose: bool = Field(default=False)
def __init__(
self,
role: str,
goal: str,
backstory: str,
tools: Optional[List[BaseTool]] = None,
llm: Any = None,
max_iterations: int = 10,
agent_config: Optional[Dict[str, Any]] = None,
**kwargs,
):
"""Initialize the LangGraph agent adapter."""
if not LANGGRAPH_AVAILABLE:
raise ImportError(
"LangGraph Agent Dependencies are not installed. Please install it using `uv add langchain-core langgraph`"
)
super().__init__(
role=role,
goal=goal,
backstory=backstory,
tools=tools,
llm=llm or self.model,
agent_config=agent_config,
**kwargs,
)
self._tool_adapter = LangGraphToolAdapter(tools=tools)
self._converter_adapter = LangGraphConverterAdapter(self)
self._max_iterations = max_iterations
self._setup_graph()
def _setup_graph(self) -> None:
"""Set up the LangGraph workflow graph."""
try:
self._memory = MemorySaver()
converted_tools: List[Any] = self._tool_adapter.tools()
if self._agent_config:
self._graph = create_react_agent(
model=self.llm,
tools=converted_tools,
checkpointer=self._memory,
debug=self.verbose,
**self._agent_config,
)
else:
self._graph = create_react_agent(
model=self.llm,
tools=converted_tools or [],
checkpointer=self._memory,
debug=self.verbose,
)
except ImportError as e:
self._logger.log(
"error", f"Failed to import LangGraph dependencies: {str(e)}"
)
raise
except Exception as e:
self._logger.log("error", f"Error setting up LangGraph agent: {str(e)}")
raise
def _build_system_prompt(self) -> str:
"""Build a system prompt for the LangGraph agent."""
base_prompt = f"""
You are {self.role}.
Your goal is: {self.goal}
Your backstory: {self.backstory}
When working on tasks, think step-by-step and use the available tools when necessary.
"""
return self._converter_adapter.enhance_system_prompt(base_prompt)
def execute_task(
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
"""Execute a task using the LangGraph workflow."""
self.create_agent_executor(tools)
self.configure_structured_output(task)
try:
task_prompt = task.prompt() if hasattr(task, "prompt") else str(task)
if context:
task_prompt = self.i18n.slice("task_with_context").format(
task=task_prompt, context=context
)
crewai_event_bus.emit(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
)
session_id = f"task_{id(task)}"
config = {"configurable": {"thread_id": session_id}}
result = self._graph.invoke(
{
"messages": [
("system", self._build_system_prompt()),
("user", task_prompt),
]
},
config,
)
messages = result.get("messages", [])
last_message = messages[-1] if messages else None
final_answer = ""
if isinstance(last_message, dict):
final_answer = last_message.get("content", "")
elif hasattr(last_message, "content"):
final_answer = getattr(last_message, "content", "")
final_answer = (
self._converter_adapter.post_process_result(final_answer)
or "Task execution completed but no clear answer was provided."
)
crewai_event_bus.emit(
self,
event=AgentExecutionCompletedEvent(
agent=self, task=task, output=final_answer
),
)
return final_answer
except Exception as e:
self._logger.log("error", f"Error executing LangGraph task: {str(e)}")
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
"""Configure the LangGraph agent for execution."""
self.configure_tools(tools)
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
"""Configure tools for the LangGraph agent."""
if tools:
all_tools = list(self.tools or []) + list(tools or [])
self._tool_adapter.configure_tools(all_tools)
available_tools = self._tool_adapter.tools()
self._graph.tools = available_tools
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
"""Implement delegation tools support for LangGraph."""
agent_tools = AgentTools(agents=agents)
return agent_tools.tools()
def get_output_converter(
self, llm: Any, text: str, model: Any, instructions: str
) -> Any:
"""Convert output format if needed."""
return Converter(llm=llm, text=text, model=model, instructions=instructions)
def configure_structured_output(self, task) -> None:
"""Configure the structured output for LangGraph."""
self._converter_adapter.configure_structured_output(task)
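A rough usage sketch, assuming the optional LangGraph dependencies are installed. The import path of LangGraphAgentAdapter is assumed (the diff above does not show the module file name), and the Crew/Task wiring follows the usual CrewAI pattern rather than anything introduced in this PR.

from crewai import Crew, Task
from crewai.agents.agent_adapters.langgraph.langgraph_adapter import (  # assumed path
    LangGraphAgentAdapter,
)

researcher = LangGraphAgentAdapter(
    role="Research Analyst",
    goal="Summarize recent developments in open-source agent tooling",
    backstory="A meticulous analyst who always cites sources.",
    llm="gpt-4o",          # forwarded to create_react_agent as the model
    max_iterations=10,
)

task = Task(
    description="Write a short summary of the latest LangGraph release notes.",
    expected_output="A concise bullet-point summary.",
    agent=researcher,
)

crew = Crew(agents=[researcher], tasks=[task])
print(crew.kickoff())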


@@ -0,0 +1,61 @@
import inspect
from typing import Any, List, Optional
from crewai.agents.agent_adapters.base_tool_adapter import BaseToolAdapter
from crewai.tools.base_tool import BaseTool
class LangGraphToolAdapter(BaseToolAdapter):
"""Adapts CrewAI tools to LangGraph agent tool compatible format"""
def __init__(self, tools: Optional[List[BaseTool]] = None):
self.original_tools = tools or []
self.converted_tools = []
def configure_tools(self, tools: List[BaseTool]) -> None:
"""
Configure and convert CrewAI tools to LangGraph-compatible format.
LangGraph expects tools in langchain_core.tools format.
"""
from langchain_core.tools import BaseTool, StructuredTool
converted_tools = []
if self.original_tools:
all_tools = tools + self.original_tools
else:
all_tools = tools
for tool in all_tools:
if isinstance(tool, BaseTool):
converted_tools.append(tool)
continue
sanitized_name = self.sanitize_tool_name(tool.name)
async def tool_wrapper(*args, tool=tool, **kwargs):
output = None
if len(args) > 0 and isinstance(args[0], str):
output = tool.run(args[0])
elif "input" in kwargs:
output = tool.run(kwargs["input"])
else:
output = tool.run(**kwargs)
if inspect.isawaitable(output):
result = await output
else:
result = output
return result
converted_tool = StructuredTool(
name=sanitized_name,
description=tool.description,
func=tool_wrapper,
args_schema=tool.args_schema,
)
converted_tools.append(converted_tool)
self.converted_tools = converted_tools
def tools(self) -> List[Any]:
return self.converted_tools or []
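For illustration, converting a simple CrewAI tool through this adapter might look like the sketch below; the @tool decorator is CrewAI's standard helper, and the Word Counter tool is made up for the example.

from crewai.agents.agent_adapters.langgraph.langgraph_tool_adapter import (
    LangGraphToolAdapter,
)
from crewai.tools import tool


@tool("Word Counter")
def word_counter(text: str) -> str:
    """Count the words in a piece of text."""
    return str(len(text.split()))


adapter = LangGraphToolAdapter(tools=[word_counter])
adapter.configure_tools([])        # wraps original_tools into StructuredTool objects
langgraph_tools = adapter.tools()  # ready to pass to create_react_agent(tools=...)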


@@ -0,0 +1,80 @@
import json
from crewai.agents.agent_adapters.base_converter_adapter import BaseConverterAdapter
from crewai.utilities.converter import generate_model_description
class LangGraphConverterAdapter(BaseConverterAdapter):
"""Adapter for handling structured output conversion in LangGraph agents"""
def __init__(self, agent_adapter):
"""Initialize the converter adapter with a reference to the agent adapter"""
self.agent_adapter = agent_adapter
self._output_format = None
self._schema = None
self._system_prompt_appendix = None
def configure_structured_output(self, task) -> None:
"""Configure the structured output for LangGraph."""
if not (task.output_json or task.output_pydantic):
self._output_format = None
self._schema = None
self._system_prompt_appendix = None
return
if task.output_json:
self._output_format = "json"
self._schema = generate_model_description(task.output_json)
elif task.output_pydantic:
self._output_format = "pydantic"
self._schema = generate_model_description(task.output_pydantic)
self._system_prompt_appendix = self._generate_system_prompt_appendix()
def _generate_system_prompt_appendix(self) -> str:
"""Generate an appendix for the system prompt to enforce structured output"""
if not self._output_format or not self._schema:
return ""
return f"""
Important: Your final answer MUST be provided in the following structured format:
{self._schema}
DO NOT include any markdown code blocks, backticks, or other formatting around your response.
The output should be raw JSON that exactly matches the specified schema.
"""
def enhance_system_prompt(self, original_prompt: str) -> str:
"""Add structured output instructions to the system prompt if needed"""
if not self._system_prompt_appendix:
return original_prompt
return f"{original_prompt}\n{self._system_prompt_appendix}"
def post_process_result(self, result: str) -> str:
"""Post-process the result to ensure it matches the expected format"""
if not self._output_format:
return result
# Try to extract valid JSON if it's wrapped in code blocks or other text
if self._output_format in ["json", "pydantic"]:
try:
# First, try to parse as is
json.loads(result)
return result
except json.JSONDecodeError:
# Try to extract JSON from the text
import re
json_match = re.search(r"(\{.*\})", result, re.DOTALL)
if json_match:
try:
extracted = json_match.group(1)
# Validate it's proper JSON
json.loads(extracted)
return extracted
except:
pass
return result


@@ -0,0 +1,178 @@
from typing import Any, List, Optional
from pydantic import Field, PrivateAttr
from crewai.agents.agent_adapters.base_agent_adapter import BaseAgentAdapter
from crewai.agents.agent_adapters.openai_agents.structured_output_converter import (
OpenAIConverterAdapter,
)
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Logger
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
try:
from agents import Agent as OpenAIAgent # type: ignore
from agents import Runner, enable_verbose_stdout_logging # type: ignore
from .openai_agent_tool_adapter import OpenAIAgentToolAdapter
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
class OpenAIAgentAdapter(BaseAgentAdapter):
"""Adapter for OpenAI Assistants"""
model_config = {"arbitrary_types_allowed": True}
_openai_agent: "OpenAIAgent" = PrivateAttr()
_logger: Logger = PrivateAttr(default_factory=lambda: Logger())
_active_thread: Optional[str] = PrivateAttr(default=None)
function_calling_llm: Any = Field(default=None)
step_callback: Any = Field(default=None)
_tool_adapter: "OpenAIAgentToolAdapter" = PrivateAttr()
_converter_adapter: OpenAIConverterAdapter = PrivateAttr()
def __init__(
self,
model: str = "gpt-4o-mini",
tools: Optional[List[BaseTool]] = None,
agent_config: Optional[dict] = None,
**kwargs,
):
if not OPENAI_AVAILABLE:
raise ImportError(
"OpenAI Agent Dependencies are not installed. Please install it using `uv add openai-agents`"
)
else:
role = kwargs.pop("role", None)
goal = kwargs.pop("goal", None)
backstory = kwargs.pop("backstory", None)
super().__init__(
role=role,
goal=goal,
backstory=backstory,
tools=tools,
agent_config=agent_config,
**kwargs,
)
self._tool_adapter = OpenAIAgentToolAdapter(tools=tools)
self.llm = model
self._converter_adapter = OpenAIConverterAdapter(self)
def _build_system_prompt(self) -> str:
"""Build a system prompt for the OpenAI agent."""
base_prompt = f"""
You are {self.role}.
Your goal is: {self.goal}
Your backstory: {self.backstory}
When working on tasks, think step-by-step and use the available tools when necessary.
"""
return self._converter_adapter.enhance_system_prompt(base_prompt)
def execute_task(
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
"""Execute a task using the OpenAI Assistant"""
self._converter_adapter.configure_structured_output(task)
self.create_agent_executor(tools)
if self.verbose:
enable_verbose_stdout_logging()
try:
task_prompt = task.prompt()
if context:
task_prompt = self.i18n.slice("task_with_context").format(
task=task_prompt, context=context
)
crewai_event_bus.emit(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
)
result = self.agent_executor.run_sync(self._openai_agent, task_prompt)
final_answer = self.handle_execution_result(result)
crewai_event_bus.emit(
self,
event=AgentExecutionCompletedEvent(
agent=self, task=task, output=final_answer
),
)
return final_answer
except Exception as e:
self._logger.log("error", f"Error executing OpenAI task: {str(e)}")
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
"""
Configure the OpenAI agent for execution.
While OpenAI handles execution differently through Runner,
we can use this method to set up tools and configurations.
"""
all_tools = list(self.tools or []) + list(tools or [])
instructions = self._build_system_prompt()
self._openai_agent = OpenAIAgent(
name=self.role,
instructions=instructions,
model=self.llm,
**self._agent_config or {},
)
if all_tools:
self.configure_tools(all_tools)
self.agent_executor = Runner
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
"""Configure tools for the OpenAI Assistant"""
if tools:
self._tool_adapter.configure_tools(tools)
if self._tool_adapter.converted_tools:
self._openai_agent.tools = self._tool_adapter.converted_tools
def handle_execution_result(self, result: Any) -> str:
"""Process OpenAI Assistant execution result converting any structured output to a string"""
return self._converter_adapter.post_process_result(result.final_output)
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
"""Implement delegation tools support"""
agent_tools = AgentTools(agents=agents)
tools = agent_tools.tools()
return tools
def configure_structured_output(self, task) -> None:
"""Configure the structured output for the specific agent implementation.
Args:
    task: The task whose output_json/output_pydantic settings determine the structured output
"""
self._converter_adapter.configure_structured_output(task)
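A similar hedged usage sketch for the OpenAI adapter, assuming the openai-agents package is installed and OPENAI_API_KEY is set; the import path of OpenAIAgentAdapter is assumed, since the diff does not show the module file name.

from crewai import Crew, Task
from crewai.agents.agent_adapters.openai_agents.openai_adapter import (  # assumed path
    OpenAIAgentAdapter,
)

writer = OpenAIAgentAdapter(
    role="Technical Writer",
    goal="Turn terse changelogs into readable release notes",
    backstory="A writer who favors short, concrete sentences.",
    model="gpt-4o-mini",
)

task = Task(
    description="Draft release notes for the new agent adapter layer.",
    expected_output="Three short paragraphs.",
    agent=writer,
)

crew = Crew(agents=[writer], tasks=[task])
print(crew.kickoff())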


@@ -0,0 +1,91 @@
import inspect
from typing import Any, List, Optional
from agents import FunctionTool, Tool
from crewai.agents.agent_adapters.base_tool_adapter import BaseToolAdapter
from crewai.tools import BaseTool
class OpenAIAgentToolAdapter(BaseToolAdapter):
"""Adapter for OpenAI Assistant tools"""
def __init__(self, tools: Optional[List[BaseTool]] = None):
self.original_tools = tools or []
def configure_tools(self, tools: List[BaseTool]) -> None:
"""Configure tools for the OpenAI Assistant"""
if self.original_tools:
all_tools = tools + self.original_tools
else:
all_tools = tools
if all_tools:
self.converted_tools = self._convert_tools_to_openai_format(all_tools)
def _convert_tools_to_openai_format(
self, tools: Optional[List[BaseTool]]
) -> List[Tool]:
"""Convert CrewAI tools to OpenAI Assistant tool format"""
if not tools:
return []
def sanitize_tool_name(name: str) -> str:
"""Convert tool name to match OpenAI's required pattern"""
import re
sanitized = re.sub(r"[^a-zA-Z0-9_-]", "_", name).lower()
return sanitized
def create_tool_wrapper(tool: BaseTool):
"""Create a wrapper function that handles the OpenAI function tool interface"""
async def wrapper(context_wrapper: Any, arguments: Any) -> Any:
# Get the parameter name from the schema
param_name = list(
tool.args_schema.model_json_schema()["properties"].keys()
)[0]
# Handle different argument types
if isinstance(arguments, dict):
args_dict = arguments
elif isinstance(arguments, str):
try:
import json
args_dict = json.loads(arguments)
except json.JSONDecodeError:
args_dict = {param_name: arguments}
else:
args_dict = {param_name: str(arguments)}
# Run the tool with the processed arguments
output = tool._run(**args_dict)
# Await if the tool returned a coroutine
if inspect.isawaitable(output):
result = await output
else:
result = output
# Ensure the result is JSON serializable
if isinstance(result, (dict, list, str, int, float, bool, type(None))):
return result
return str(result)
return wrapper
openai_tools = []
for tool in tools:
schema = tool.args_schema.model_json_schema()
schema.update({"additionalProperties": False, "type": "object"})
openai_tool = FunctionTool(
name=sanitize_tool_name(tool.name),
description=tool.description,
params_json_schema=schema,
on_invoke_tool=create_tool_wrapper(tool),
)
openai_tools.append(openai_tool)
return openai_tools


@@ -0,0 +1,122 @@
import json
import re
from crewai.agents.agent_adapters.base_converter_adapter import BaseConverterAdapter
from crewai.utilities.converter import generate_model_description
from crewai.utilities.i18n import I18N
class OpenAIConverterAdapter(BaseConverterAdapter):
"""
Adapter for handling structured output conversion in OpenAI agents.
This adapter enhances the OpenAI agent to handle structured output formats
and post-processes the results when needed.
Attributes:
_output_format: The expected output format (json, pydantic, or None)
_schema: The schema description for the expected output
_output_model: The Pydantic model for the output
"""
def __init__(self, agent_adapter):
"""Initialize the converter adapter with a reference to the agent adapter"""
self.agent_adapter = agent_adapter
self._output_format = None
self._schema = None
self._output_model = None
def configure_structured_output(self, task) -> None:
"""
Configure the structured output for OpenAI agent based on task requirements.
Args:
task: The task containing output format requirements
"""
# Reset configuration
self._output_format = None
self._schema = None
self._output_model = None
# If no structured output is required, return early
if not (task.output_json or task.output_pydantic):
return
# Configure based on task output format
if task.output_json:
self._output_format = "json"
self._schema = generate_model_description(task.output_json)
self.agent_adapter._openai_agent.output_type = task.output_json
self._output_model = task.output_json
elif task.output_pydantic:
self._output_format = "pydantic"
self._schema = generate_model_description(task.output_pydantic)
self.agent_adapter._openai_agent.output_type = task.output_pydantic
self._output_model = task.output_pydantic
def enhance_system_prompt(self, base_prompt: str) -> str:
"""
Enhance the base system prompt with structured output requirements if needed.
Args:
base_prompt: The original system prompt
Returns:
Enhanced system prompt with output format instructions if needed
"""
if not self._output_format:
return base_prompt
output_schema = (
I18N()
.slice("formatted_task_instructions")
.format(output_format=self._schema)
)
return f"{base_prompt}\n\n{output_schema}"
def post_process_result(self, result: str) -> str:
"""
Post-process the result to ensure it matches the expected format.
This method attempts to extract valid JSON from the result if necessary.
Args:
result: The raw result from the agent
Returns:
Processed result conforming to the expected output format
"""
if not self._output_format:
return result
# Try to extract valid JSON if it's wrapped in code blocks or other text
if isinstance(result, str) and self._output_format in ["json", "pydantic"]:
# First, try to parse as is
try:
json.loads(result)
return result
except json.JSONDecodeError:
# Try to extract JSON from markdown code blocks
code_block_pattern = r"```(?:json)?\s*([\s\S]*?)```"
code_blocks = re.findall(code_block_pattern, result)
for block in code_blocks:
try:
json.loads(block.strip())
return block.strip()
except json.JSONDecodeError:
continue
# Try to extract any JSON-like structure
json_pattern = r"(\{[\s\S]*\})"
json_matches = re.findall(json_pattern, result, re.DOTALL)
for match in json_matches:
try:
json.loads(match)
return match
except json.JSONDecodeError:
continue
# If all extraction attempts fail, return the original
return str(result)
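To make the post-processing concrete, a small illustrative example of the cleanup it performs; the private attribute is set directly here only for demonstration, since configure_structured_output(task) normally does that.

from crewai.agents.agent_adapters.openai_agents.structured_output_converter import (
    OpenAIConverterAdapter,
)

converter = OpenAIConverterAdapter(agent_adapter=None)  # agent_adapter unused for post-processing
converter._output_format = "json"  # normally set via configure_structured_output(task)

raw = 'Sure! Here is the result: {"title": "Release notes", "items": 3} Let me know if you need more.'
print(converter.post_process_result(raw))
# -> {"title": "Release notes", "items": 3}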


@@ -62,8 +62,6 @@ class BaseAgent(ABC, BaseModel):
Abstract method to execute a task.
create_agent_executor(tools=None) -> None:
Abstract method to create an agent executor.
_parse_tools(tools: List[BaseTool]) -> List[Any]:
Abstract method to parse tools.
get_delegation_tools(agents: List["BaseAgent"]):
Abstract method to set the agents task tools for handling delegation and question asking to other agents in crew.
get_output_converter(llm, model, instructions):
@@ -154,6 +152,9 @@ class BaseAgent(ABC, BaseModel):
callbacks: List[Callable] = Field(
default=[], description="Callbacks to be used for the agent"
)
adapted_agent: bool = Field(
default=False, description="Whether the agent is adapted"
)
@model_validator(mode="before")
@classmethod
@@ -170,15 +171,15 @@ class BaseAgent(ABC, BaseModel):
tool meets these criteria, it is processed and added to the list of
tools. Otherwise, a ValueError is raised.
"""
if not tools:
return []
processed_tools = []
required_attrs = ["name", "func", "description"]
for tool in tools:
if isinstance(tool, BaseTool):
processed_tools.append(tool)
elif (
hasattr(tool, "name")
and hasattr(tool, "func")
and hasattr(tool, "description")
):
elif all(hasattr(tool, attr) for attr in required_attrs):
# Tool has the required attributes, create a Tool instance
processed_tools.append(Tool.from_langchain(tool))
else:
@@ -260,13 +261,6 @@ class BaseAgent(ABC, BaseModel):
"""Set the task tools that init BaseAgenTools class."""
pass
@abstractmethod
def get_output_converter(
self, llm: Any, text: str, model: type[BaseModel] | None, instructions: str
) -> Converter:
"""Get the converter class for the agent to create json/pydantic outputs."""
pass
def copy(self: T) -> T: # type: ignore # Signature of "copy" incompatible with supertype "BaseModel"
"""Create a deep copy of the Agent."""
exclude = {


@@ -216,7 +216,7 @@ def convert_with_instructions(
def get_conversion_instructions(model: Type[BaseModel], llm: Any) -> str:
instructions = "Please convert the following text into valid JSON."
if llm.supports_function_calling():
if llm and not isinstance(llm, str) and llm.supports_function_calling():
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions += (
f"\n\nOutput ONLY the valid JSON and nothing else.\n\n"