mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-06 09:42:39 +00:00
feat: integrate structured output handling in LangGraph and OpenAI agents
- Added LangGraphConverterAdapter for managing structured output in LangGraph agents. - Enhanced LangGraphAgentAdapter to utilize the new converter for system prompt and task execution. - Updated LangGraphToolAdapter to use StructuredTool for better compatibility. - Introduced OpenAIConverterAdapter for structured output management in OpenAI agents. - Improved task execution flow in OpenAIAgentAdapter to incorporate structured output configuration and post-processing.
This commit is contained in:
@@ -9,6 +9,9 @@ from crewai.agents.agent_adapters.base_agent_adapter import BaseAgentAdapter
|
||||
from crewai.agents.agent_adapters.langgraph.langgraph_tool_adapter import (
|
||||
LangGraphToolAdapter,
|
||||
)
|
||||
from crewai.agents.agent_adapters.langgraph.structured_output_converter import (
|
||||
LangGraphConverterAdapter,
|
||||
)
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.tools.agent_tools.agent_tools import AgentTools
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
@@ -58,6 +61,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
|
||||
**kwargs,
|
||||
)
|
||||
self._tool_adapter = LangGraphToolAdapter(tools=tools)
|
||||
self._converter_adapter = LangGraphConverterAdapter(self)
|
||||
self._max_iterations = max_iterations
|
||||
self._setup_graph()
|
||||
|
||||
@@ -69,13 +73,15 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
|
||||
|
||||
# Convert CrewAI tools to LangGraph/LangChain compatible tools
|
||||
converted_tools = self._tool_adapter.converted_tools
|
||||
print("langgraph converted_tools", converted_tools)
|
||||
|
||||
# Create the agent graph with ReAct pattern
|
||||
self._graph = create_react_agent(
|
||||
model=self.llm, # Pass as model parameter
|
||||
model=self.llm,
|
||||
tools=converted_tools,
|
||||
checkpointer=self._memory,
|
||||
)
|
||||
print("langgraph graph", self._graph)
|
||||
|
||||
except ImportError as e:
|
||||
self._logger.log(
|
||||
@@ -88,7 +94,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
|
||||
|
||||
def _build_system_prompt(self) -> str:
|
||||
"""Build a system prompt for the LangGraph agent."""
|
||||
return f"""You are {self.role}.
|
||||
base_prompt = f"""You are {self.role}.
|
||||
|
||||
Your goal is: {self.goal}
|
||||
|
||||
@@ -96,6 +102,8 @@ Your backstory: {self.backstory}
|
||||
|
||||
When working on tasks, think step-by-step and use the available tools when necessary.
|
||||
"""
|
||||
# Enhance with structured output instructions if configured
|
||||
return self._converter_adapter.enhance_system_prompt(base_prompt)
|
||||
|
||||
def execute_task(
|
||||
self,
|
||||
@@ -106,6 +114,9 @@ When working on tasks, think step-by-step and use the available tools when neces
|
||||
"""Execute a task using the LangGraph workflow."""
|
||||
self.create_agent_executor(tools)
|
||||
|
||||
# Configure structured output if needed
|
||||
self.configure_structured_output(task)
|
||||
|
||||
try:
|
||||
task_prompt = task.prompt() if hasattr(task, "prompt") else str(task)
|
||||
|
||||
@@ -131,25 +142,34 @@ When working on tasks, think step-by-step and use the available tools when neces
|
||||
config = {"configurable": {"thread_id": session_id}}
|
||||
|
||||
# Invoke the agent graph with the task prompt
|
||||
result = self._graph.invoke({"messages": [("user", task_prompt)]}, config)
|
||||
print("result", result)
|
||||
result = self._graph.invoke(
|
||||
{
|
||||
"messages": [
|
||||
("system", self._build_system_prompt()),
|
||||
("user", task_prompt),
|
||||
]
|
||||
},
|
||||
config,
|
||||
)
|
||||
|
||||
# Get the final response
|
||||
messages = result.get("messages", [])
|
||||
last_message = messages[-1] if messages else None
|
||||
|
||||
final_answer = ""
|
||||
print("final_answer", final_answer)
|
||||
if isinstance(last_message, dict):
|
||||
final_answer = last_message.get("content", "")
|
||||
elif hasattr(last_message, "content"):
|
||||
final_answer = getattr(last_message, "content", "")
|
||||
|
||||
return (
|
||||
final_answer
|
||||
# Post-process to ensure correct structured output format if needed
|
||||
final_answer = (
|
||||
self._converter_adapter.post_process_result(final_answer)
|
||||
or "Task execution completed but no clear answer was provided."
|
||||
)
|
||||
|
||||
return final_answer
|
||||
|
||||
except Exception as e:
|
||||
self._logger.log("error", f"Error executing LangGraph task: {str(e)}")
|
||||
crewai_event_bus.emit(
|
||||
@@ -259,5 +279,4 @@ When working on tasks, think step-by-step and use the available tools when neces
|
||||
|
||||
def configure_structured_output(self, task) -> None:
|
||||
"""Configure the structured output for LangGraph."""
|
||||
# This will be implemented in a separate improvement
|
||||
pass
|
||||
self._converter_adapter.configure_structured_output(task)
|
||||
|
||||
@@ -20,7 +20,7 @@ class LangGraphToolAdapter:
|
||||
Convert CrewAI tools to LangGraph-compatible tools
|
||||
LangGraph expects tools in langchain_core.tools format
|
||||
"""
|
||||
from langchain_core.tools import Tool
|
||||
from langchain_core.tools import StructuredTool
|
||||
|
||||
converted_tools = []
|
||||
|
||||
@@ -35,9 +35,11 @@ class LangGraphToolAdapter:
|
||||
else:
|
||||
return tool.run(**kwargs)
|
||||
|
||||
# Create a LangChain Tool
|
||||
converted_tool = Tool(
|
||||
name=tool.name, description=tool.description, func=tool_wrapper
|
||||
converted_tool = StructuredTool(
|
||||
name=tool.name,
|
||||
description=tool.description,
|
||||
func=tool_wrapper,
|
||||
args_schema=tool.args_schema,
|
||||
)
|
||||
|
||||
converted_tools.append(converted_tool)
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
import json
|
||||
|
||||
from crewai.utilities.converter import generate_model_description
|
||||
|
||||
|
||||
class LangGraphConverterAdapter:
|
||||
"""Adapter for handling structured output conversion in LangGraph agents"""
|
||||
|
||||
def __init__(self, agent_adapter):
|
||||
"""Initialize the converter adapter with a reference to the agent adapter"""
|
||||
self.agent_adapter = agent_adapter
|
||||
self._output_format = None
|
||||
self._schema = None
|
||||
self._system_prompt_appendix = None
|
||||
|
||||
def configure_structured_output(self, task) -> None:
|
||||
"""Configure the structured output for LangGraph."""
|
||||
if not (task.output_json or task.output_pydantic):
|
||||
self._output_format = None
|
||||
self._schema = None
|
||||
self._system_prompt_appendix = None
|
||||
return
|
||||
|
||||
if task.output_json:
|
||||
self._output_format = "json"
|
||||
self._schema = generate_model_description(task.output_json)
|
||||
elif task.output_pydantic:
|
||||
self._output_format = "pydantic"
|
||||
self._schema = generate_model_description(task.output_pydantic)
|
||||
|
||||
self._system_prompt_appendix = self._generate_system_prompt_appendix()
|
||||
|
||||
def _generate_system_prompt_appendix(self) -> str:
|
||||
"""Generate an appendix for the system prompt to enforce structured output"""
|
||||
if not self._output_format or not self._schema:
|
||||
return ""
|
||||
|
||||
return f"""
|
||||
Important: Your final answer MUST be provided in the following structured format:
|
||||
|
||||
{self._schema}
|
||||
|
||||
DO NOT include any markdown code blocks, backticks, or other formatting around your response.
|
||||
The output should be raw JSON that exactly matches the specified schema.
|
||||
"""
|
||||
|
||||
def enhance_system_prompt(self, original_prompt: str) -> str:
|
||||
"""Add structured output instructions to the system prompt if needed"""
|
||||
if not self._system_prompt_appendix:
|
||||
return original_prompt
|
||||
|
||||
return f"{original_prompt}\n{self._system_prompt_appendix}"
|
||||
|
||||
def post_process_result(self, result: str) -> str:
|
||||
"""Post-process the result to ensure it matches the expected format"""
|
||||
if not self._output_format:
|
||||
return result
|
||||
|
||||
# Try to extract valid JSON if it's wrapped in code blocks or other text
|
||||
if self._output_format in ["json", "pydantic"]:
|
||||
try:
|
||||
# First, try to parse as is
|
||||
json.loads(result)
|
||||
return result
|
||||
except json.JSONDecodeError:
|
||||
# Try to extract JSON from the text
|
||||
import re
|
||||
|
||||
json_match = re.search(r"(\{.*\})", result, re.DOTALL)
|
||||
if json_match:
|
||||
try:
|
||||
extracted = json_match.group(1)
|
||||
# Validate it's proper JSON
|
||||
json.loads(extracted)
|
||||
return extracted
|
||||
except:
|
||||
pass
|
||||
|
||||
return result
|
||||
@@ -5,6 +5,9 @@ from agents import Runner, Tool, enable_verbose_stdout_logging
|
||||
from pydantic import Field, PrivateAttr
|
||||
|
||||
from crewai.agents.agent_adapters.base_agent_adapter import BaseAgentAdapter
|
||||
from crewai.agents.agent_adapters.openai_agents.structured_output_adapter import (
|
||||
OpenAIConverterAdapter,
|
||||
)
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.tools import BaseTool
|
||||
from crewai.tools.agent_tools.agent_tools import AgentTools
|
||||
@@ -30,6 +33,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
|
||||
step_callback: Any = Field(default=None)
|
||||
converted_tools: Optional[List[Tool]] = Field(default=None)
|
||||
_tool_adapter: OpenAIAgentToolAdapter = PrivateAttr()
|
||||
_converter_adapter: OpenAIConverterAdapter = PrivateAttr()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -49,6 +53,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
|
||||
self.tools = tools
|
||||
self._tool_adapter = OpenAIAgentToolAdapter(tools=tools)
|
||||
self.llm = model
|
||||
self._converter_adapter = OpenAIConverterAdapter(self)
|
||||
|
||||
def execute_task(
|
||||
self,
|
||||
@@ -58,6 +63,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
|
||||
) -> str:
|
||||
"""Execute a task using the OpenAI Assistant"""
|
||||
self.create_agent_executor(tools)
|
||||
self._converter_adapter.configure_structured_output(task)
|
||||
|
||||
if self.verbose:
|
||||
enable_verbose_stdout_logging()
|
||||
@@ -114,8 +120,8 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
|
||||
self._openai_agent.tools = self._tool_adapter.converted_tools
|
||||
|
||||
def handle_execution_result(self, result: Any) -> str:
|
||||
"""Process OpenAI Assistant execution result"""
|
||||
return result.final_output
|
||||
"""Process OpenAI Assistant execution result converting any structured output to a string"""
|
||||
return self._converter_adapter.post_process_result(result.final_output)
|
||||
|
||||
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
|
||||
"""Implement delegation tools support"""
|
||||
@@ -156,11 +162,4 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
|
||||
Args:
|
||||
structured_output: The structured output to be configured
|
||||
"""
|
||||
if task.output_json or task.output_pydantic:
|
||||
# Generate the schema based on the output format
|
||||
if task.output_json:
|
||||
# schema = json.dumps(task.output_json, indent=2)
|
||||
self._openai_agent.output_type = task.output_json
|
||||
|
||||
elif task.output_pydantic:
|
||||
self._openai_agent.output_type = task.output_pydantic
|
||||
self._converter_adapter.configure_structured_output(task)
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
import json
|
||||
import re
|
||||
|
||||
from crewai.utilities.converter import generate_model_description
|
||||
|
||||
|
||||
class OpenAIConverterAdapter:
|
||||
"""
|
||||
Adapter for handling structured output conversion in OpenAI agents.
|
||||
|
||||
This adapter enhances the OpenAI agent to handle structured output formats
|
||||
and post-processes the results when needed.
|
||||
|
||||
Attributes:
|
||||
agent_adapter: Reference to the parent OpenAIAgentAdapter
|
||||
_output_format: The expected output format (json, pydantic, or None)
|
||||
_schema: The schema description for the expected output
|
||||
_output_model: The Pydantic model for the output
|
||||
"""
|
||||
|
||||
def __init__(self, agent_adapter):
|
||||
"""Initialize the converter adapter with a reference to the agent adapter"""
|
||||
self.agent_adapter = agent_adapter
|
||||
self._output_format = None
|
||||
self._schema = None
|
||||
self._output_model = None
|
||||
|
||||
def configure_structured_output(self, task) -> None:
|
||||
"""
|
||||
Configure the structured output for OpenAI agent based on task requirements.
|
||||
|
||||
Args:
|
||||
task: The task containing output format requirements
|
||||
"""
|
||||
# Reset configuration
|
||||
self._output_format = None
|
||||
self._schema = None
|
||||
self._output_model = None
|
||||
|
||||
# If no structured output is required, return early
|
||||
if not (task.output_json or task.output_pydantic):
|
||||
return
|
||||
|
||||
# Configure based on task output format
|
||||
if task.output_json:
|
||||
self._output_format = "json"
|
||||
self._schema = generate_model_description(task.output_json)
|
||||
self._output_model = task.output_json
|
||||
self.agent_adapter._openai_agent.output_type = task.output_json
|
||||
elif task.output_pydantic:
|
||||
self._output_format = "pydantic"
|
||||
self._schema = generate_model_description(task.output_pydantic)
|
||||
self._output_model = task.output_pydantic
|
||||
self.agent_adapter._openai_agent.output_type = task.output_pydantic
|
||||
|
||||
def post_process_result(self, result: str) -> str:
|
||||
"""
|
||||
Post-process the result to ensure it matches the expected format.
|
||||
|
||||
This method attempts to extract valid JSON from the result if necessary.
|
||||
|
||||
Args:
|
||||
result: The raw result from the agent
|
||||
|
||||
Returns:
|
||||
Processed result conforming to the expected output format
|
||||
"""
|
||||
print("result", result)
|
||||
if not self._output_format:
|
||||
return result
|
||||
print("self._output_format", self._output_format)
|
||||
# Try to extract valid JSON if it's wrapped in code blocks or other text
|
||||
if isinstance(result, str) and self._output_format in ["json", "pydantic"]:
|
||||
# First, try to parse as is
|
||||
try:
|
||||
json.loads(result)
|
||||
return result
|
||||
except json.JSONDecodeError:
|
||||
# Try to extract JSON from markdown code blocks
|
||||
code_block_pattern = r"```(?:json)?\s*([\s\S]*?)```"
|
||||
code_blocks = re.findall(code_block_pattern, result)
|
||||
|
||||
for block in code_blocks:
|
||||
try:
|
||||
json.loads(block.strip())
|
||||
return block.strip()
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
# Try to extract any JSON-like structure
|
||||
json_pattern = r"(\{[\s\S]*\})"
|
||||
json_matches = re.findall(json_pattern, result, re.DOTALL)
|
||||
|
||||
for match in json_matches:
|
||||
try:
|
||||
json.loads(match)
|
||||
return match
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
# If all extraction attempts fail, return the original
|
||||
return str(result)
|
||||
@@ -216,7 +216,7 @@ def convert_with_instructions(
|
||||
|
||||
def get_conversion_instructions(model: Type[BaseModel], llm: Any) -> str:
|
||||
instructions = "Please convert the following text into valid JSON."
|
||||
if llm and llm.supports_function_calling():
|
||||
if llm and not isinstance(llm, str) and llm.supports_function_calling():
|
||||
model_schema = PydanticSchemaParser(model=model).get_schema()
|
||||
instructions += (
|
||||
f"\n\nOutput ONLY the valid JSON and nothing else.\n\n"
|
||||
|
||||
Reference in New Issue
Block a user