From 6ae0c6f6643a15530855d0e4d27bbaf942fd0d4f Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Thu, 10 Apr 2025 14:01:37 -0700 Subject: [PATCH] feat: integrate structured output handling in LangGraph and OpenAI agents - Added LangGraphConverterAdapter for managing structured output in LangGraph agents. - Enhanced LangGraphAgentAdapter to utilize the new converter for system prompt and task execution. - Updated LangGraphToolAdapter to use StructuredTool for better compatibility. - Introduced OpenAIConverterAdapter for structured output management in OpenAI agents. - Improved task execution flow in OpenAIAgentAdapter to incorporate structured output configuration and post-processing. --- .../langgraph/langgraph_adapter.py | 37 +++++-- .../langgraph/langgraph_tool_adapter.py | 10 +- .../langgraph/structured_output_converter.py | 79 ++++++++++++++ .../openai_agents/openai_adapter.py | 19 ++-- .../structured_output_adapter.py | 102 ++++++++++++++++++ src/crewai/utilities/converter.py | 2 +- 6 files changed, 225 insertions(+), 24 deletions(-) create mode 100644 src/crewai/agents/agent_adapters/langgraph/structured_output_converter.py create mode 100644 src/crewai/agents/agent_adapters/openai_agents/structured_output_adapter.py diff --git a/src/crewai/agents/agent_adapters/langgraph/langgraph_adapter.py b/src/crewai/agents/agent_adapters/langgraph/langgraph_adapter.py index 6b298370f..49ac2b224 100644 --- a/src/crewai/agents/agent_adapters/langgraph/langgraph_adapter.py +++ b/src/crewai/agents/agent_adapters/langgraph/langgraph_adapter.py @@ -9,6 +9,9 @@ from crewai.agents.agent_adapters.base_agent_adapter import BaseAgentAdapter from crewai.agents.agent_adapters.langgraph.langgraph_tool_adapter import ( LangGraphToolAdapter, ) +from crewai.agents.agent_adapters.langgraph.structured_output_converter import ( + LangGraphConverterAdapter, +) from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.tools.agent_tools.agent_tools import AgentTools from crewai.tools.base_tool import BaseTool @@ -58,6 +61,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter): **kwargs, ) self._tool_adapter = LangGraphToolAdapter(tools=tools) + self._converter_adapter = LangGraphConverterAdapter(self) self._max_iterations = max_iterations self._setup_graph() @@ -69,13 +73,15 @@ class LangGraphAgentAdapter(BaseAgentAdapter): # Convert CrewAI tools to LangGraph/LangChain compatible tools converted_tools = self._tool_adapter.converted_tools + print("langgraph converted_tools", converted_tools) # Create the agent graph with ReAct pattern self._graph = create_react_agent( - model=self.llm, # Pass as model parameter + model=self.llm, tools=converted_tools, checkpointer=self._memory, ) + print("langgraph graph", self._graph) except ImportError as e: self._logger.log( @@ -88,7 +94,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter): def _build_system_prompt(self) -> str: """Build a system prompt for the LangGraph agent.""" - return f"""You are {self.role}. + base_prompt = f"""You are {self.role}. Your goal is: {self.goal} @@ -96,6 +102,8 @@ Your backstory: {self.backstory} When working on tasks, think step-by-step and use the available tools when necessary. """ + # Enhance with structured output instructions if configured + return self._converter_adapter.enhance_system_prompt(base_prompt) def execute_task( self, @@ -106,6 +114,9 @@ When working on tasks, think step-by-step and use the available tools when neces """Execute a task using the LangGraph workflow.""" self.create_agent_executor(tools) + # Configure structured output if needed + self.configure_structured_output(task) + try: task_prompt = task.prompt() if hasattr(task, "prompt") else str(task) @@ -131,25 +142,34 @@ When working on tasks, think step-by-step and use the available tools when neces config = {"configurable": {"thread_id": session_id}} # Invoke the agent graph with the task prompt - result = self._graph.invoke({"messages": [("user", task_prompt)]}, config) - print("result", result) + result = self._graph.invoke( + { + "messages": [ + ("system", self._build_system_prompt()), + ("user", task_prompt), + ] + }, + config, + ) # Get the final response messages = result.get("messages", []) last_message = messages[-1] if messages else None final_answer = "" - print("final_answer", final_answer) if isinstance(last_message, dict): final_answer = last_message.get("content", "") elif hasattr(last_message, "content"): final_answer = getattr(last_message, "content", "") - return ( - final_answer + # Post-process to ensure correct structured output format if needed + final_answer = ( + self._converter_adapter.post_process_result(final_answer) or "Task execution completed but no clear answer was provided." ) + return final_answer + except Exception as e: self._logger.log("error", f"Error executing LangGraph task: {str(e)}") crewai_event_bus.emit( @@ -259,5 +279,4 @@ When working on tasks, think step-by-step and use the available tools when neces def configure_structured_output(self, task) -> None: """Configure the structured output for LangGraph.""" - # This will be implemented in a separate improvement - pass + self._converter_adapter.configure_structured_output(task) diff --git a/src/crewai/agents/agent_adapters/langgraph/langgraph_tool_adapter.py b/src/crewai/agents/agent_adapters/langgraph/langgraph_tool_adapter.py index f82749283..cff6804fd 100644 --- a/src/crewai/agents/agent_adapters/langgraph/langgraph_tool_adapter.py +++ b/src/crewai/agents/agent_adapters/langgraph/langgraph_tool_adapter.py @@ -20,7 +20,7 @@ class LangGraphToolAdapter: Convert CrewAI tools to LangGraph-compatible tools LangGraph expects tools in langchain_core.tools format """ - from langchain_core.tools import Tool + from langchain_core.tools import StructuredTool converted_tools = [] @@ -35,9 +35,11 @@ class LangGraphToolAdapter: else: return tool.run(**kwargs) - # Create a LangChain Tool - converted_tool = Tool( - name=tool.name, description=tool.description, func=tool_wrapper + converted_tool = StructuredTool( + name=tool.name, + description=tool.description, + func=tool_wrapper, + args_schema=tool.args_schema, ) converted_tools.append(converted_tool) diff --git a/src/crewai/agents/agent_adapters/langgraph/structured_output_converter.py b/src/crewai/agents/agent_adapters/langgraph/structured_output_converter.py new file mode 100644 index 000000000..9beeae74a --- /dev/null +++ b/src/crewai/agents/agent_adapters/langgraph/structured_output_converter.py @@ -0,0 +1,79 @@ +import json + +from crewai.utilities.converter import generate_model_description + + +class LangGraphConverterAdapter: + """Adapter for handling structured output conversion in LangGraph agents""" + + def __init__(self, agent_adapter): + """Initialize the converter adapter with a reference to the agent adapter""" + self.agent_adapter = agent_adapter + self._output_format = None + self._schema = None + self._system_prompt_appendix = None + + def configure_structured_output(self, task) -> None: + """Configure the structured output for LangGraph.""" + if not (task.output_json or task.output_pydantic): + self._output_format = None + self._schema = None + self._system_prompt_appendix = None + return + + if task.output_json: + self._output_format = "json" + self._schema = generate_model_description(task.output_json) + elif task.output_pydantic: + self._output_format = "pydantic" + self._schema = generate_model_description(task.output_pydantic) + + self._system_prompt_appendix = self._generate_system_prompt_appendix() + + def _generate_system_prompt_appendix(self) -> str: + """Generate an appendix for the system prompt to enforce structured output""" + if not self._output_format or not self._schema: + return "" + + return f""" +Important: Your final answer MUST be provided in the following structured format: + +{self._schema} + +DO NOT include any markdown code blocks, backticks, or other formatting around your response. +The output should be raw JSON that exactly matches the specified schema. +""" + + def enhance_system_prompt(self, original_prompt: str) -> str: + """Add structured output instructions to the system prompt if needed""" + if not self._system_prompt_appendix: + return original_prompt + + return f"{original_prompt}\n{self._system_prompt_appendix}" + + def post_process_result(self, result: str) -> str: + """Post-process the result to ensure it matches the expected format""" + if not self._output_format: + return result + + # Try to extract valid JSON if it's wrapped in code blocks or other text + if self._output_format in ["json", "pydantic"]: + try: + # First, try to parse as is + json.loads(result) + return result + except json.JSONDecodeError: + # Try to extract JSON from the text + import re + + json_match = re.search(r"(\{.*\})", result, re.DOTALL) + if json_match: + try: + extracted = json_match.group(1) + # Validate it's proper JSON + json.loads(extracted) + return extracted + except: + pass + + return result diff --git a/src/crewai/agents/agent_adapters/openai_agents/openai_adapter.py b/src/crewai/agents/agent_adapters/openai_agents/openai_adapter.py index 71e7cd7c6..1f941d02a 100644 --- a/src/crewai/agents/agent_adapters/openai_agents/openai_adapter.py +++ b/src/crewai/agents/agent_adapters/openai_agents/openai_adapter.py @@ -5,6 +5,9 @@ from agents import Runner, Tool, enable_verbose_stdout_logging from pydantic import Field, PrivateAttr from crewai.agents.agent_adapters.base_agent_adapter import BaseAgentAdapter +from crewai.agents.agent_adapters.openai_agents.structured_output_adapter import ( + OpenAIConverterAdapter, +) from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.tools import BaseTool from crewai.tools.agent_tools.agent_tools import AgentTools @@ -30,6 +33,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter): step_callback: Any = Field(default=None) converted_tools: Optional[List[Tool]] = Field(default=None) _tool_adapter: OpenAIAgentToolAdapter = PrivateAttr() + _converter_adapter: OpenAIConverterAdapter = PrivateAttr() def __init__( self, @@ -49,6 +53,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter): self.tools = tools self._tool_adapter = OpenAIAgentToolAdapter(tools=tools) self.llm = model + self._converter_adapter = OpenAIConverterAdapter(self) def execute_task( self, @@ -58,6 +63,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter): ) -> str: """Execute a task using the OpenAI Assistant""" self.create_agent_executor(tools) + self._converter_adapter.configure_structured_output(task) if self.verbose: enable_verbose_stdout_logging() @@ -114,8 +120,8 @@ class OpenAIAgentAdapter(BaseAgentAdapter): self._openai_agent.tools = self._tool_adapter.converted_tools def handle_execution_result(self, result: Any) -> str: - """Process OpenAI Assistant execution result""" - return result.final_output + """Process OpenAI Assistant execution result converting any structured output to a string""" + return self._converter_adapter.post_process_result(result.final_output) def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]: """Implement delegation tools support""" @@ -156,11 +162,4 @@ class OpenAIAgentAdapter(BaseAgentAdapter): Args: structured_output: The structured output to be configured """ - if task.output_json or task.output_pydantic: - # Generate the schema based on the output format - if task.output_json: - # schema = json.dumps(task.output_json, indent=2) - self._openai_agent.output_type = task.output_json - - elif task.output_pydantic: - self._openai_agent.output_type = task.output_pydantic + self._converter_adapter.configure_structured_output(task) diff --git a/src/crewai/agents/agent_adapters/openai_agents/structured_output_adapter.py b/src/crewai/agents/agent_adapters/openai_agents/structured_output_adapter.py new file mode 100644 index 000000000..8aba1cca9 --- /dev/null +++ b/src/crewai/agents/agent_adapters/openai_agents/structured_output_adapter.py @@ -0,0 +1,102 @@ +import json +import re + +from crewai.utilities.converter import generate_model_description + + +class OpenAIConverterAdapter: + """ + Adapter for handling structured output conversion in OpenAI agents. + + This adapter enhances the OpenAI agent to handle structured output formats + and post-processes the results when needed. + + Attributes: + agent_adapter: Reference to the parent OpenAIAgentAdapter + _output_format: The expected output format (json, pydantic, or None) + _schema: The schema description for the expected output + _output_model: The Pydantic model for the output + """ + + def __init__(self, agent_adapter): + """Initialize the converter adapter with a reference to the agent adapter""" + self.agent_adapter = agent_adapter + self._output_format = None + self._schema = None + self._output_model = None + + def configure_structured_output(self, task) -> None: + """ + Configure the structured output for OpenAI agent based on task requirements. + + Args: + task: The task containing output format requirements + """ + # Reset configuration + self._output_format = None + self._schema = None + self._output_model = None + + # If no structured output is required, return early + if not (task.output_json or task.output_pydantic): + return + + # Configure based on task output format + if task.output_json: + self._output_format = "json" + self._schema = generate_model_description(task.output_json) + self._output_model = task.output_json + self.agent_adapter._openai_agent.output_type = task.output_json + elif task.output_pydantic: + self._output_format = "pydantic" + self._schema = generate_model_description(task.output_pydantic) + self._output_model = task.output_pydantic + self.agent_adapter._openai_agent.output_type = task.output_pydantic + + def post_process_result(self, result: str) -> str: + """ + Post-process the result to ensure it matches the expected format. + + This method attempts to extract valid JSON from the result if necessary. + + Args: + result: The raw result from the agent + + Returns: + Processed result conforming to the expected output format + """ + print("result", result) + if not self._output_format: + return result + print("self._output_format", self._output_format) + # Try to extract valid JSON if it's wrapped in code blocks or other text + if isinstance(result, str) and self._output_format in ["json", "pydantic"]: + # First, try to parse as is + try: + json.loads(result) + return result + except json.JSONDecodeError: + # Try to extract JSON from markdown code blocks + code_block_pattern = r"```(?:json)?\s*([\s\S]*?)```" + code_blocks = re.findall(code_block_pattern, result) + + for block in code_blocks: + try: + json.loads(block.strip()) + return block.strip() + except json.JSONDecodeError: + continue + + # Try to extract any JSON-like structure + json_pattern = r"(\{[\s\S]*\})" + json_matches = re.findall(json_pattern, result, re.DOTALL) + + for match in json_matches: + try: + json.loads(match) + return match + except json.JSONDecodeError: + continue + + # If all extraction attempts fail, return the original + return str(result) diff --git a/src/crewai/utilities/converter.py b/src/crewai/utilities/converter.py index 120b6a028..a6144868e 100644 --- a/src/crewai/utilities/converter.py +++ b/src/crewai/utilities/converter.py @@ -216,7 +216,7 @@ def convert_with_instructions( def get_conversion_instructions(model: Type[BaseModel], llm: Any) -> str: instructions = "Please convert the following text into valid JSON." - if llm and llm.supports_function_calling(): + if llm and not isinstance(llm, str) and llm.supports_function_calling(): model_schema = PydanticSchemaParser(model=model).get_schema() instructions += ( f"\n\nOutput ONLY the valid JSON and nothing else.\n\n"