Compare commits


3 Commits

Devin AI | 42e93984c3 | 2025-09-12 14:01:09 +00:00
fix: Refactor try-except loop to resolve PERF203 lint issue
Co-Authored-By: João <joao@crewai.com>

Devin AI | 41ad22a573 | 2025-09-12 13:55:27 +00:00
fix: Address lint issues in parser implementation
- Fix loop variable usage in detect_and_parse method
- Replace try-except-pass with proper exception handling
- Clean up docstring formatting to remove whitespace on blank lines
Co-Authored-By: João <joao@crewai.com>

Devin AI | c84bdac4a6 | 2025-09-12 13:48:38 +00:00
feat: Add support for multiple LLM output formats beyond ReAct
- Implement extensible parser architecture with BaseOutputParser abstract class
- Add OutputFormatRegistry for automatic format detection and parsing
- Support OpenAI Harmony format with analysis and commentary channels
- Maintain full backward compatibility with existing ReAct format
- Add comprehensive tests for both formats and automatic detection
- Zero breaking changes - existing ReAct code continues to work unchanged

Addresses issue #3508: Support for Multiple LLM Output Formats Beyond ReAct
Co-Authored-By: João <joao@crewai.com>
21 changed files with 454 additions and 710 deletions

View File

@@ -135,7 +135,6 @@ ignore = ["E501"] # ignore line too long
[tool.ruff.lint.per-file-ignores]
"tests/**/*.py" = ["S101"] # Allow assert statements in tests
"src/crewai/cli/subprocess_utils.py" = ["S602", "S603"] # Allow shell=True for Windows compatibility
[tool.mypy]
exclude = ["src/crewai/cli/templates", "tests"]

View File

@@ -1,99 +1,78 @@
"""OpenAI agents adapter for CrewAI integration.
from typing import Any, List, Optional
This module contains the OpenAIAgentAdapter class that integrates OpenAI Assistants
with CrewAI's agent system, providing tool integration and structured output support.
"""
from typing import Any, cast
from pydantic import ConfigDict, Field, PrivateAttr
from typing_extensions import Unpack
from pydantic import Field, PrivateAttr
from crewai.agents.agent_adapters.base_agent_adapter import BaseAgentAdapter
from crewai.agents.agent_adapters.openai_agents.openai_agent_tool_adapter import (
OpenAIAgentToolAdapter,
)
from crewai.agents.agent_adapters.openai_agents.protocols import (
AgentKwargs,
OpenAIAgentsModule,
)
from crewai.agents.agent_adapters.openai_agents.protocols import (
OpenAIAgent as OpenAIAgentProtocol,
)
from crewai.agents.agent_adapters.openai_agents.structured_output_converter import (
OpenAIConverterAdapter,
)
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Logger
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Logger
from crewai.utilities.import_utils import require
openai_agents_module = cast(
OpenAIAgentsModule,
require(
"agents",
purpose="OpenAI agents functionality",
),
)
OpenAIAgent = openai_agents_module.Agent
Runner = openai_agents_module.Runner
enable_verbose_stdout_logging = openai_agents_module.enable_verbose_stdout_logging
try:
from agents import Agent as OpenAIAgent # type: ignore
from agents import Runner, enable_verbose_stdout_logging # type: ignore
from .openai_agent_tool_adapter import OpenAIAgentToolAdapter
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
class OpenAIAgentAdapter(BaseAgentAdapter):
"""Adapter for OpenAI Assistants.
"""Adapter for OpenAI Assistants"""
Integrates OpenAI Assistants API with CrewAI's agent system, providing
tool configuration, structured output handling, and task execution.
"""
model_config = {"arbitrary_types_allowed": True}
model_config = ConfigDict(arbitrary_types_allowed=True)
_openai_agent: OpenAIAgentProtocol = PrivateAttr()
_logger: Logger = PrivateAttr(default_factory=Logger)
_active_thread: str | None = PrivateAttr(default=None)
_openai_agent: "OpenAIAgent" = PrivateAttr()
_logger: Logger = PrivateAttr(default_factory=lambda: Logger())
_active_thread: Optional[str] = PrivateAttr(default=None)
function_calling_llm: Any = Field(default=None)
step_callback: Any = Field(default=None)
_tool_adapter: OpenAIAgentToolAdapter = PrivateAttr()
_tool_adapter: "OpenAIAgentToolAdapter" = PrivateAttr()
_converter_adapter: OpenAIConverterAdapter = PrivateAttr()
def __init__(
self,
**kwargs: Unpack[AgentKwargs],
) -> None:
"""Initialize the OpenAI agent adapter.
Args:
**kwargs: All initialization arguments including role, goal, backstory,
model, tools, and agent_config.
Raises:
ImportError: If OpenAI agent dependencies are not installed.
"""
self.llm = kwargs.pop("model", "gpt-4o-mini")
super().__init__(**kwargs)
self._tool_adapter = OpenAIAgentToolAdapter(tools=kwargs.get("tools"))
self._converter_adapter = OpenAIConverterAdapter(agent_adapter=self)
model: str = "gpt-4o-mini",
tools: Optional[List[BaseTool]] = None,
agent_config: Optional[dict] = None,
**kwargs,
):
if not OPENAI_AVAILABLE:
raise ImportError(
"OpenAI Agent Dependencies are not installed. Please install it using `uv add openai-agents`"
)
else:
role = kwargs.pop("role", None)
goal = kwargs.pop("goal", None)
backstory = kwargs.pop("backstory", None)
super().__init__(
role=role,
goal=goal,
backstory=backstory,
tools=tools,
agent_config=agent_config,
**kwargs,
)
self._tool_adapter = OpenAIAgentToolAdapter(tools=tools)
self.llm = model
self._converter_adapter = OpenAIConverterAdapter(self)
def _build_system_prompt(self) -> str:
"""Build a system prompt for the OpenAI agent.
Creates a prompt containing the agent's role, goal, and backstory,
then enhances it with structured output instructions if needed.
Returns:
The complete system prompt string.
"""
"""Build a system prompt for the OpenAI agent."""
base_prompt = f"""
You are {self.role}.
Your goal is: {self.goal}
Your backstory: {self.backstory}
@@ -105,25 +84,10 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
def execute_task(
self,
task: Any,
context: str | None = None,
tools: list[BaseTool] | None = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
"""Execute a task using the OpenAI Assistant.
Configures the assistant, processes the task, and handles event emission
for execution tracking.
Args:
task: The task object to execute.
context: Optional context information for the task.
tools: Optional additional tools for this execution.
Returns:
The final answer from the task execution.
Raises:
Exception: If task execution fails.
"""
"""Execute a task using the OpenAI Assistant"""
self._converter_adapter.configure_structured_output(task)
self.create_agent_executor(tools)
@@ -131,7 +95,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
enable_verbose_stdout_logging()
try:
task_prompt: str = task.prompt()
task_prompt = task.prompt()
if context:
task_prompt = self.i18n.slice("task_with_context").format(
task=task_prompt, context=context
@@ -145,8 +109,8 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
task=task,
),
)
result: Any = self.agent_executor.run_sync(self._openai_agent, task_prompt)
final_answer: str = self.handle_execution_result(result)
result = self.agent_executor.run_sync(self._openai_agent, task_prompt)
final_answer = self.handle_execution_result(result)
crewai_event_bus.emit(
self,
event=AgentExecutionCompletedEvent(
@@ -156,7 +120,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
return final_answer
except Exception as e:
self._logger.log("error", f"Error executing OpenAI task: {e!s}")
self._logger.log("error", f"Error executing OpenAI task: {str(e)}")
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
@@ -167,22 +131,15 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
)
raise
def create_agent_executor(self, tools: list[BaseTool] | None = None) -> None:
"""Configure the OpenAI agent for execution.
While OpenAI handles execution differently through Runner,
this method sets up tools and agent configuration.
Args:
tools: Optional tools to configure for the agent.
Notes:
TODO: Properly type agent_executor in BaseAgent to avoid type issues
when assigning Runner class to this attribute.
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
"""
all_tools: list[BaseTool] = list(self.tools or []) + list(tools or [])
Configure the OpenAI agent for execution.
While OpenAI handles execution differently through Runner,
we can use this method to set up tools and configurations.
"""
all_tools = list(self.tools or []) + list(tools or [])
instructions: str = self._build_system_prompt()
instructions = self._build_system_prompt()
self._openai_agent = OpenAIAgent(
name=self.role,
instructions=instructions,
@@ -195,48 +152,27 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
self.agent_executor = Runner
def configure_tools(self, tools: list[BaseTool] | None = None) -> None:
"""Configure tools for the OpenAI Assistant.
Args:
tools: Optional tools to configure for the assistant.
"""
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
"""Configure tools for the OpenAI Assistant"""
if tools:
self._tool_adapter.configure_tools(tools)
if self._tool_adapter.converted_tools:
self._openai_agent.tools = self._tool_adapter.converted_tools
def handle_execution_result(self, result: Any) -> str:
"""Process OpenAI Assistant execution result.
Converts any structured output to a string through the converter adapter.
Args:
result: The execution result from the OpenAI assistant.
Returns:
Processed result as a string.
"""
"""Process OpenAI Assistant execution result converting any structured output to a string"""
return self._converter_adapter.post_process_result(result.final_output)
def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
"""Implement delegation tools support.
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
"""Implement delegation tools support"""
agent_tools = AgentTools(agents=agents)
tools = agent_tools.tools()
return tools
Creates delegation tools that allow this agent to delegate tasks to other agents.
Args:
agents: List of agents available for delegation.
Returns:
List of delegation tools.
"""
agent_tools: AgentTools = AgentTools(agents=agents)
return agent_tools.tools()
def configure_structured_output(self, task: Any) -> None:
def configure_structured_output(self, task) -> None:
"""Configure the structured output for the specific agent implementation.
Args:
task: The task object containing output format specifications.
structured_output: The structured output to be configured
"""
self._converter_adapter.configure_structured_output(task)
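For reference, a minimal usage sketch of the adapter in the form whose __init__ takes model, tools, and agent_config directly. The top-level import path is an assumption (only submodule imports appear in the hunks above), the optional openai-agents package must be installed, and my_task stands in for a CrewAI Task instance.

    # Import path assumed for illustration; not shown in this diff.
    from crewai.agents.agent_adapters.openai_agents import OpenAIAgentAdapter

    adapter = OpenAIAgentAdapter(
        role="Research assistant",
        goal="Answer questions accurately",
        backstory="Backed by the OpenAI Assistants API",
        model="gpt-4o-mini",  # the default shown in the diff above
    )
    # Builds the system prompt, runs via Runner, returns the final answer string.
    answer = adapter.execute_task(my_task)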

View File

@@ -1,125 +1,57 @@
"""OpenAI agent tool adapter for CrewAI tool integration.
This module contains the OpenAIAgentToolAdapter class that converts CrewAI tools
to OpenAI Assistant-compatible format using the agents library.
"""
import inspect
import json
import re
from collections.abc import Awaitable
from typing import Any, cast
from typing import Any, List, Optional
from agents import FunctionTool, Tool
from crewai.agents.agent_adapters.base_tool_adapter import BaseToolAdapter
from crewai.agents.agent_adapters.openai_agents.protocols import (
OpenAIFunctionTool,
OpenAITool,
)
from crewai.tools import BaseTool
from crewai.utilities.import_utils import require
agents_module = cast(
Any,
require(
"agents",
purpose="OpenAI agents functionality",
),
)
FunctionTool = agents_module.FunctionTool
Tool = agents_module.Tool
class OpenAIAgentToolAdapter(BaseToolAdapter):
"""Adapter for OpenAI Assistant tools.
"""Adapter for OpenAI Assistant tools"""
Converts CrewAI BaseTool instances to OpenAI Assistant FunctionTool format
that can be used by OpenAI agents.
"""
def __init__(self, tools: Optional[List[BaseTool]] = None):
self.original_tools = tools or []
def __init__(self, tools: list[BaseTool] | None = None) -> None:
"""Initialize the tool adapter.
Args:
tools: Optional list of CrewAI tools to adapt.
"""
super().__init__()
self.original_tools: list[BaseTool] = tools or []
self.converted_tools: list[OpenAITool] = []
def configure_tools(self, tools: list[BaseTool]) -> None:
"""Configure tools for the OpenAI Assistant.
Merges provided tools with original tools and converts them to
OpenAI Assistant format.
Args:
tools: List of CrewAI tools to configure.
"""
def configure_tools(self, tools: List[BaseTool]) -> None:
"""Configure tools for the OpenAI Assistant"""
if self.original_tools:
all_tools: list[BaseTool] = tools + self.original_tools
all_tools = tools + self.original_tools
else:
all_tools = tools
if all_tools:
self.converted_tools = self._convert_tools_to_openai_format(all_tools)
@staticmethod
def _convert_tools_to_openai_format(
tools: list[BaseTool] | None,
) -> list[OpenAITool]:
"""Convert CrewAI tools to OpenAI Assistant tool format.
Args:
tools: List of CrewAI tools to convert.
Returns:
List of OpenAI Assistant FunctionTool instances.
"""
self, tools: Optional[List[BaseTool]]
) -> List[Tool]:
"""Convert CrewAI tools to OpenAI Assistant tool format"""
if not tools:
return []
def sanitize_tool_name(name: str) -> str:
"""Convert tool name to match OpenAI's required pattern.
"""Convert tool name to match OpenAI's required pattern"""
import re
Args:
name: Original tool name.
sanitized = re.sub(r"[^a-zA-Z0-9_-]", "_", name).lower()
return sanitized
Returns:
Sanitized tool name matching OpenAI requirements.
"""
return re.sub(r"[^a-zA-Z0-9_-]", "_", name).lower()
def create_tool_wrapper(tool: BaseTool) -> Any:
"""Create a wrapper function that handles the OpenAI function tool interface.
Args:
tool: The CrewAI tool to wrap.
Returns:
Async wrapper function for OpenAI agent integration.
"""
def create_tool_wrapper(tool: BaseTool):
"""Create a wrapper function that handles the OpenAI function tool interface"""
async def wrapper(context_wrapper: Any, arguments: Any) -> Any:
"""Wrapper function to adapt CrewAI tool calls to OpenAI format.
Args:
context_wrapper: OpenAI context wrapper.
arguments: Tool arguments from OpenAI.
Returns:
Tool execution result.
"""
# Get the parameter name from the schema
param_name: str = next(
iter(tool.args_schema.model_json_schema()["properties"].keys())
)
param_name = list(
tool.args_schema.model_json_schema()["properties"].keys()
)[0]
# Handle different argument types
args_dict: dict[str, Any]
if isinstance(arguments, dict):
args_dict = arguments
elif isinstance(arguments, str):
try:
import json
args_dict = json.loads(arguments)
except json.JSONDecodeError:
args_dict = {param_name: arguments}
@@ -127,11 +59,11 @@ class OpenAIAgentToolAdapter(BaseToolAdapter):
args_dict = {param_name: str(arguments)}
# Run the tool with the processed arguments
output: Any | Awaitable[Any] = tool._run(**args_dict)
output = tool._run(**args_dict)
# Await if the tool returned a coroutine
if inspect.isawaitable(output):
result: Any = await output
result = await output
else:
result = output
@@ -142,20 +74,17 @@ class OpenAIAgentToolAdapter(BaseToolAdapter):
return wrapper
openai_tools: list[OpenAITool] = []
openai_tools = []
for tool in tools:
schema: dict[str, Any] = tool.args_schema.model_json_schema()
schema = tool.args_schema.model_json_schema()
schema.update({"additionalProperties": False, "type": "object"})
openai_tool: OpenAIFunctionTool = cast(
OpenAIFunctionTool,
FunctionTool(
name=sanitize_tool_name(tool.name),
description=tool.description,
params_json_schema=schema,
on_invoke_tool=create_tool_wrapper(tool),
),
openai_tool = FunctionTool(
name=sanitize_tool_name(tool.name),
description=tool.description,
params_json_schema=schema,
on_invoke_tool=create_tool_wrapper(tool),
)
openai_tools.append(openai_tool)
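One detail worth calling out in the conversion above: OpenAI function tool names must match a restricted character set, so sanitize_tool_name replaces anything outside [a-zA-Z0-9_-] with an underscore and lowercases the result. A standalone check of that behavior, mirroring the helper shown above:

    import re

    def sanitize_tool_name(name: str) -> str:
        # Anything outside OpenAI's allowed pattern becomes "_", then lowercase.
        return re.sub(r"[^a-zA-Z0-9_-]", "_", name).lower()

    assert sanitize_tool_name("Web Search!") == "web_search_"
    assert sanitize_tool_name("DuckDuckGo-Search") == "duckduckgo-search"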

View File

@@ -1,74 +0,0 @@
"""Type protocols for OpenAI agents modules."""
from collections.abc import Callable
from typing import Any, Protocol, TypedDict, runtime_checkable
from crewai.tools.base_tool import BaseTool
class AgentKwargs(TypedDict, total=False):
"""Typed dict for agent initialization kwargs."""
role: str
goal: str
backstory: str
model: str
tools: list[BaseTool] | None
agent_config: dict[str, Any] | None
@runtime_checkable
class OpenAIAgent(Protocol):
"""Protocol for OpenAI Agent."""
def __init__(
self,
name: str,
instructions: str,
model: str,
**kwargs: Any,
) -> None:
"""Initialize the OpenAI agent."""
...
tools: list[Any]
output_type: Any
@runtime_checkable
class OpenAIRunner(Protocol):
"""Protocol for OpenAI Runner."""
@classmethod
def run_sync(cls, agent: OpenAIAgent, message: str) -> Any:
"""Run agent synchronously with a message."""
...
@runtime_checkable
class OpenAIAgentsModule(Protocol):
"""Protocol for OpenAI agents module."""
Agent: type[OpenAIAgent]
Runner: type[OpenAIRunner]
enable_verbose_stdout_logging: Callable[[], None]
@runtime_checkable
class OpenAITool(Protocol):
"""Protocol for OpenAI Tool."""
@runtime_checkable
class OpenAIFunctionTool(Protocol):
"""Protocol for OpenAI FunctionTool."""
def __init__(
self,
name: str,
description: str,
params_json_schema: dict[str, Any],
on_invoke_tool: Any,
) -> None:
"""Initialize the function tool."""
...
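These protocols exist so the adapter can type the lazily imported agents module (see the require(...) + cast pattern in the adapter diff above) without a hard dependency. A self-contained illustration of the runtime_checkable mechanism, using generic names rather than the crewai classes:

    from typing import Any, Protocol, runtime_checkable

    @runtime_checkable
    class SupportsRunSync(Protocol):
        def run_sync(self, agent: Any, message: str) -> Any: ...

    class FakeRunner:
        def run_sync(self, agent: Any, message: str) -> Any:
            return f"ran: {message}"

    # Structural check: FakeRunner never declares the Protocol as a base class.
    assert isinstance(FakeRunner(), SupportsRunSync)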

View File

@@ -1,12 +1,5 @@
"""OpenAI structured output converter for CrewAI task integration.
This module contains the OpenAIConverterAdapter class that handles structured
output conversion for OpenAI agents, supporting JSON and Pydantic model formats.
"""
import json
import re
from typing import Any, Literal
from crewai.agents.agent_adapters.base_converter_adapter import BaseConverterAdapter
from crewai.utilities.converter import generate_model_description
@@ -14,7 +7,8 @@ from crewai.utilities.i18n import I18N
class OpenAIConverterAdapter(BaseConverterAdapter):
"""Adapter for handling structured output conversion in OpenAI agents.
"""
Adapter for handling structured output conversion in OpenAI agents.
This adapter enhances the OpenAI agent to handle structured output formats
and post-processes the results when needed.
@@ -25,23 +19,19 @@ class OpenAIConverterAdapter(BaseConverterAdapter):
_output_model: The Pydantic model for the output
"""
def __init__(self, agent_adapter: Any) -> None:
"""Initialize the converter adapter with a reference to the agent adapter.
def __init__(self, agent_adapter):
"""Initialize the converter adapter with a reference to the agent adapter"""
self.agent_adapter = agent_adapter
self._output_format = None
self._schema = None
self._output_model = None
Args:
agent_adapter: The OpenAI agent adapter instance.
def configure_structured_output(self, task) -> None:
"""
super().__init__(agent_adapter=agent_adapter)
self.agent_adapter: Any = agent_adapter
self._output_format: Literal["json", "pydantic"] | None = None
self._schema: str | None = None
self._output_model: Any = None
def configure_structured_output(self, task: Any) -> None:
"""Configure the structured output for OpenAI agent based on task requirements.
Configure the structured output for OpenAI agent based on task requirements.
Args:
task: The task containing output format requirements.
task: The task containing output format requirements
"""
# Reset configuration
self._output_format = None
@@ -65,18 +55,19 @@ class OpenAIConverterAdapter(BaseConverterAdapter):
self._output_model = task.output_pydantic
def enhance_system_prompt(self, base_prompt: str) -> str:
"""Enhance the base system prompt with structured output requirements if needed.
"""
Enhance the base system prompt with structured output requirements if needed.
Args:
base_prompt: The original system prompt.
base_prompt: The original system prompt
Returns:
Enhanced system prompt with output format instructions if needed.
Enhanced system prompt with output format instructions if needed
"""
if not self._output_format:
return base_prompt
output_schema: str = (
output_schema = (
I18N()
.slice("formatted_task_instructions")
.format(output_format=self._schema)
@@ -85,15 +76,16 @@ class OpenAIConverterAdapter(BaseConverterAdapter):
return f"{base_prompt}\n\n{output_schema}"
def post_process_result(self, result: str) -> str:
"""Post-process the result to ensure it matches the expected format.
"""
Post-process the result to ensure it matches the expected format.
This method attempts to extract valid JSON from the result if necessary.
Args:
result: The raw result from the agent.
result: The raw result from the agent
Returns:
Processed result conforming to the expected output format.
Processed result conforming to the expected output format
"""
if not self._output_format:
return result
@@ -105,30 +97,26 @@ class OpenAIConverterAdapter(BaseConverterAdapter):
return result
except json.JSONDecodeError:
# Try to extract JSON from markdown code blocks
code_block_pattern: str = r"```(?:json)?\s*([\s\S]*?)```"
code_blocks: list[str] = re.findall(code_block_pattern, result)
code_block_pattern = r"```(?:json)?\s*([\s\S]*?)```"
code_blocks = re.findall(code_block_pattern, result)
for block in code_blocks:
stripped_block = block.strip()
try:
json.loads(stripped_block)
return stripped_block
json.loads(block.strip())
return block.strip()
except json.JSONDecodeError:
pass
continue
# Try to extract any JSON-like structure
json_pattern: str = r"(\{[\s\S]*\})"
json_matches: list[str] = re.findall(json_pattern, result, re.DOTALL)
json_pattern = r"(\{[\s\S]*\})"
json_matches = re.findall(json_pattern, result, re.DOTALL)
for match in json_matches:
is_valid = True
try:
json.loads(match)
except json.JSONDecodeError:
is_valid = False
if is_valid:
return match
except json.JSONDecodeError:
continue
# If all extraction attempts fail, return the original
return str(result)
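The post-processing above degrades gracefully: first try the whole result as JSON, then any fenced code blocks, then any brace-delimited span, and finally return the raw string. A stdlib-only sketch of the same extraction order:

    import json
    import re

    def extract_json(result: str) -> str:
        try:
            json.loads(result)
            return result
        except json.JSONDecodeError:
            pass
        # Fenced code blocks, optionally tagged "json".
        for block in re.findall(r"```(?:json)?\s*([\s\S]*?)```", result):
            try:
                json.loads(block.strip())
                return block.strip()
            except json.JSONDecodeError:
                continue
        # Any brace-delimited span.
        for match in re.findall(r"(\{[\s\S]*\})", result, re.DOTALL):
            try:
                json.loads(match)
                return match
            except json.JSONDecodeError:
                continue
        return result

    print(extract_json('Here you go:\n```json\n{"answer": 42}\n```'))  # {"answer": 42}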

View File

@@ -25,3 +25,16 @@ ACTION_REGEX: Final[re.Pattern[str]] = re.compile(
ACTION_INPUT_ONLY_REGEX: Final[re.Pattern[str]] = re.compile(
r"\s*Action\s*\d*\s*Input\s*\d*\s*:\s*(.*)", re.DOTALL
)
HARMONY_START_PATTERN: Final[re.Pattern[str]] = re.compile(
r"<\|start\|>assistant<\|channel\|>(\w+)(?:\s+to=(\w+))?<\|message\|>(.*?)<\|(?:end|call)\|>",
re.DOTALL
)
HARMONY_ANALYSIS_CHANNEL: Final[str] = "analysis"
HARMONY_COMMENTARY_CHANNEL: Final[str] = "commentary"
HARMONY_FINAL_ANSWER_ERROR_MESSAGE: Final[str] = (
"I did it wrong. Invalid Harmony Format: I need to use proper channel structure."
)
HARMONY_MISSING_CONTENT_ERROR_MESSAGE: Final[str] = (
"I did it wrong. Invalid Harmony Format: Missing content in message section."
)
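HARMONY_START_PATTERN captures three groups per assistant message: the channel name, an optional to=<tool> recipient, and the message body, terminated by either <|end|> or <|call|>. A quick stdlib check of what it extracts:

    import re

    HARMONY_START_PATTERN = re.compile(
        r"<\|start\|>assistant<\|channel\|>(\w+)(?:\s+to=(\w+))?<\|message\|>(.*?)<\|(?:end|call)\|>",
        re.DOTALL,
    )

    text = '<|start|>assistant<|channel|>commentary to=search<|message|>{"query": "SF weather"}<|call|>'
    channel, tool, content = HARMONY_START_PATTERN.findall(text)[-1]
    assert (channel, tool, content) == ("commentary", "search", '{"query": "SF weather"}')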

View File

@@ -1,19 +1,26 @@
"""Agent output parsing module for ReAct-style LLM responses.
"""Agent output parsing module for multiple LLM response formats.
This module provides parsing functionality for agent outputs that follow
the ReAct (Reasoning and Acting) format, converting them into structured
AgentAction or AgentFinish objects.
different formats (ReAct, OpenAI Harmony, etc.), converting them into structured
AgentAction or AgentFinish objects with automatic format detection.
"""
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from json_repair import repair_json
from crewai.agents.constants import (
ACTION_INPUT_ONLY_REGEX,
ACTION_INPUT_REGEX,
ACTION_REGEX,
ACTION_INPUT_ONLY_REGEX,
FINAL_ANSWER_ACTION,
HARMONY_ANALYSIS_CHANNEL,
HARMONY_COMMENTARY_CHANNEL,
HARMONY_FINAL_ANSWER_ERROR_MESSAGE,
HARMONY_MISSING_CONTENT_ERROR_MESSAGE,
HARMONY_START_PATTERN,
MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE,
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
UNABLE_TO_REPAIR_JSON_RESULTS,
@@ -60,26 +67,166 @@ class OutputParserException(Exception):
super().__init__(error)
class BaseOutputParser(ABC):
"""Abstract base class for output parsers."""
@abstractmethod
def can_parse(self, text: str) -> bool:
"""Check if this parser can handle the given text format."""
@abstractmethod
def parse_text(self, text: str) -> AgentAction | AgentFinish:
"""Parse the text into AgentAction or AgentFinish."""
class OutputFormatRegistry:
"""Registry for managing different output format parsers."""
def __init__(self):
self._parsers: dict[str, BaseOutputParser] = {}
def register(self, name: str, parser: BaseOutputParser) -> None:
"""Register a parser for a specific format."""
self._parsers[name] = parser
def detect_and_parse(self, text: str) -> AgentAction | AgentFinish:
"""Automatically detect format and parse with appropriate parser."""
for parser in self._parsers.values():
if parser.can_parse(text):
return parser.parse_text(text)
return self._parsers.get('react', ReActParser()).parse_text(text)
class ReActParser(BaseOutputParser):
"""Parser for ReAct format outputs."""
def can_parse(self, text: str) -> bool:
"""Check if text follows ReAct format."""
return (
FINAL_ANSWER_ACTION in text or
ACTION_INPUT_REGEX.search(text) is not None or
ACTION_REGEX.search(text) is not None
)
def parse_text(self, text: str) -> AgentAction | AgentFinish:
"""Parse ReAct format text."""
thought = _extract_thought(text)
includes_answer = FINAL_ANSWER_ACTION in text
action_match = ACTION_INPUT_REGEX.search(text)
if includes_answer:
final_answer = text.split(FINAL_ANSWER_ACTION)[-1].strip()
if final_answer.endswith("```"):
count = final_answer.count("```")
if count % 2 != 0:
final_answer = final_answer[:-3].rstrip()
return AgentFinish(thought=thought, output=final_answer, text=text)
if action_match:
action = action_match.group(1)
clean_action = _clean_action(action)
action_input = action_match.group(2).strip()
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = _safe_repair_json(tool_input)
return AgentAction(
thought=thought, tool=clean_action, tool_input=safe_tool_input, text=text
)
if not ACTION_REGEX.search(text):
raise OutputParserException(
f"{MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE}\n{_I18N.slice('final_answer_format')}",
)
if not ACTION_INPUT_ONLY_REGEX.search(text):
raise OutputParserException(
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
)
err_format = _I18N.slice("format_without_tools")
error = f"{err_format}"
raise OutputParserException(error)
class HarmonyParser(BaseOutputParser):
"""Parser for OpenAI Harmony format outputs."""
def can_parse(self, text: str) -> bool:
"""Check if text follows OpenAI Harmony format."""
return HARMONY_START_PATTERN.search(text) is not None
def parse_text(self, text: str) -> AgentAction | AgentFinish:
"""Parse OpenAI Harmony format text."""
matches = HARMONY_START_PATTERN.findall(text)
if not matches:
raise OutputParserException(HARMONY_MISSING_CONTENT_ERROR_MESSAGE)
channel, tool_name, content = matches[-1]
content = content.strip()
if channel == HARMONY_ANALYSIS_CHANNEL:
return AgentFinish(
thought=f"Analysis: {content}",
output=content,
text=text
)
if channel == HARMONY_COMMENTARY_CHANNEL and tool_name:
thought_content = content
tool_input = content
try:
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
tool_input = json_match.group(0)
thought_content = content[:json_match.start()].strip()
if not thought_content:
thought_content = f"Using tool {tool_name}"
except Exception:
tool_input = content
safe_tool_input = _safe_repair_json(tool_input)
return AgentAction(
thought=thought_content,
tool=tool_name,
tool_input=safe_tool_input,
text=text
)
raise OutputParserException(HARMONY_FINAL_ANSWER_ERROR_MESSAGE)
_format_registry = OutputFormatRegistry()
_format_registry.register('react', ReActParser())
_format_registry.register('harmony', HarmonyParser())
def parse(text: str) -> AgentAction | AgentFinish:
"""Parse agent output text into AgentAction or AgentFinish.
Expects output to be in one of two formats.
Automatically detects the format (ReAct, OpenAI Harmony, etc.) and uses
the appropriate parser. Maintains backward compatibility with existing ReAct format.
If the output signals that an action should be taken,
should be in the below format. This will result in an AgentAction
being returned.
Supports multiple formats:
ReAct format:
Thought: agent thought here
Action: search
Action Input: what is the temperature in SF?
If the output signals that a final answer should be given,
should be in the below format. This will result in an AgentFinish
being returned.
Or for final answers:
Thought: agent thought here
Final Answer: The temperature is 100 degrees
OpenAI Harmony format:
<|start|>assistant<|channel|>analysis<|message|>The temperature is 100 degrees<|end|>
Or for tool actions:
<|start|>assistant<|channel|>commentary to=search<|message|>{"query": "temperature in SF"}<|call|>
Args:
text: The agent output text to parse.
@@ -87,50 +234,9 @@ def parse(text: str) -> AgentAction | AgentFinish:
AgentAction or AgentFinish based on the content.
Raises:
OutputParserException: If the text format is invalid.
OutputParserException: If the text format is invalid or unsupported.
"""
thought = _extract_thought(text)
includes_answer = FINAL_ANSWER_ACTION in text
action_match = ACTION_INPUT_REGEX.search(text)
if includes_answer:
final_answer = text.split(FINAL_ANSWER_ACTION)[-1].strip()
# Check whether the final answer ends with triple backticks.
if final_answer.endswith("```"):
# Count occurrences of triple backticks in the final answer.
count = final_answer.count("```")
# If count is odd then it's an unmatched trailing set; remove it.
if count % 2 != 0:
final_answer = final_answer[:-3].rstrip()
return AgentFinish(thought=thought, output=final_answer, text=text)
elif action_match:
action = action_match.group(1)
clean_action = _clean_action(action)
action_input = action_match.group(2).strip()
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = _safe_repair_json(tool_input)
return AgentAction(
thought=thought, tool=clean_action, tool_input=safe_tool_input, text=text
)
if not ACTION_REGEX.search(text):
raise OutputParserException(
f"{MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE}\n{_I18N.slice('final_answer_format')}",
)
elif not ACTION_INPUT_ONLY_REGEX.search(text):
raise OutputParserException(
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
)
else:
err_format = _I18N.slice("format_without_tools")
error = f"{err_format}"
raise OutputParserException(
error,
)
return _format_registry.detect_and_parse(text)
def _extract_thought(text: str) -> str:
@@ -149,8 +255,7 @@ def _extract_thought(text: str) -> str:
return ""
thought = text[:thought_index].strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
return thought.replace("```", "").strip()
def _clean_action(text: str) -> str:
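With the registry wired up, callers keep using parser.parse and get format detection for free; unrecognized input still falls back to the ReAct parser and its existing error messages. A short sketch, assuming a crewai checkout with this change applied (behavior matches the tests further down in this diff):

    from crewai.agents import parser

    react = "Thought: check the weather\nAction: search\nAction Input: temperature in SF"
    harmony = "<|start|>assistant<|channel|>analysis<|message|>It is 72F in SF<|end|>"

    action = parser.parse(react)    # -> AgentAction with tool == "search"
    finish = parser.parse(harmony)  # -> AgentFinish with output == "It is 72F in SF"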

View File

@@ -2,8 +2,6 @@ import subprocess
import click
from crewai.cli.subprocess_utils import run_command
def evaluate_crew(n_iterations: int, model: str) -> None:
"""
@@ -19,7 +17,7 @@ def evaluate_crew(n_iterations: int, model: str) -> None:
if n_iterations <= 0:
raise ValueError("The number of iterations must be a positive integer.")
result = run_command(command, capture_output=False, text=True, check=True)
result = subprocess.run(command, capture_output=False, text=True, check=True)
if result.stderr:
click.echo(result.stderr, err=True)

View File

@@ -1,8 +1,6 @@
import subprocess
from functools import lru_cache
from crewai.cli.subprocess_utils import run_command
class Repository:
def __init__(self, path="."):
@@ -19,7 +17,7 @@ class Repository:
def is_git_installed(self) -> bool:
"""Check if Git is installed and available in the system."""
try:
run_command(
subprocess.run(
["git", "--version"], capture_output=True, check=True, text=True
)
return True
@@ -28,29 +26,24 @@ class Repository:
def fetch(self) -> None:
"""Fetch latest updates from the remote."""
run_command(["git", "fetch"], cwd=self.path, check=True)
subprocess.run(["git", "fetch"], cwd=self.path, check=True)
def status(self) -> str:
"""Get the git status in porcelain format."""
result = run_command(
return subprocess.check_output(
["git", "status", "--branch", "--porcelain"],
cwd=self.path,
capture_output=True,
text=True,
check=True,
)
return result.stdout.strip()
encoding="utf-8",
).strip()
@lru_cache(maxsize=None)
def is_git_repo(self) -> bool:
"""Check if the current directory is a git repository."""
try:
run_command(
subprocess.check_output(
["git", "rev-parse", "--is-inside-work-tree"],
cwd=self.path,
capture_output=True,
text=True,
check=True,
encoding="utf-8",
)
return True
except subprocess.CalledProcessError:
@@ -77,7 +70,7 @@ class Repository:
def origin_url(self) -> str | None:
"""Get the Git repository's remote URL."""
try:
result = run_command(
result = subprocess.run(
["git", "remote", "get-url", "origin"],
cwd=self.path,
capture_output=True,

View File

@@ -2,8 +2,6 @@ import subprocess
import click
from crewai.cli.subprocess_utils import run_command
# Be mindful about changing this.
# on some environments we don't use this command but instead uv sync directly
@@ -14,8 +12,8 @@ def install_crew(proxy_options: list[str]) -> None:
Install the crew by running the UV command to lock and install.
"""
try:
command = ["uv", "sync", *proxy_options]
run_command(command, check=True, capture_output=False, text=True)
command = ["uv", "sync"] + proxy_options
subprocess.run(command, check=True, capture_output=False, text=True)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while running the crew: {e}", err=True)

View File

@@ -2,8 +2,6 @@ import subprocess
import click
from crewai.cli.subprocess_utils import run_command
def kickoff_flow() -> None:
"""
@@ -12,7 +10,7 @@ def kickoff_flow() -> None:
command = ["uv", "run", "kickoff"]
try:
result = run_command(command, capture_output=False, text=True, check=True)
result = subprocess.run(command, capture_output=False, text=True, check=True)
if result.stderr:
click.echo(result.stderr, err=True)

View File

@@ -2,8 +2,6 @@ import subprocess
import click
from crewai.cli.subprocess_utils import run_command
def plot_flow() -> None:
"""
@@ -12,7 +10,7 @@ def plot_flow() -> None:
command = ["uv", "run", "plot"]
try:
result = run_command(command, capture_output=False, text=True, check=True)
result = subprocess.run(command, capture_output=False, text=True, check=True)
if result.stderr:
click.echo(result.stderr, err=True)

View File

@@ -2,8 +2,6 @@ import subprocess
import click
from crewai.cli.subprocess_utils import run_command
def replay_task_command(task_id: str) -> None:
"""
@@ -15,7 +13,7 @@ def replay_task_command(task_id: str) -> None:
command = ["uv", "run", "replay", task_id]
try:
result = run_command(command, capture_output=False, text=True, check=True)
result = subprocess.run(command, capture_output=False, text=True, check=True)
if result.stderr:
click.echo(result.stderr, err=True)

View File

@@ -1,12 +1,12 @@
import subprocess
from enum import Enum
from typing import List, Optional
import click
from packaging import version
from crewai.cli.utils import read_toml
from crewai.cli.version import get_crewai_version
from crewai.cli.subprocess_utils import run_command
class CrewType(Enum):
@@ -57,7 +57,7 @@ def execute_command(crew_type: CrewType) -> None:
command = ["uv", "run", "kickoff" if crew_type == CrewType.FLOW else "run_crew"]
try:
run_command(command, capture_output=False, text=True, check=True)
subprocess.run(command, capture_output=False, text=True, check=True)
except subprocess.CalledProcessError as e:
handle_error(e, crew_type)

View File

@@ -1,60 +0,0 @@
import platform
import subprocess
from typing import Any
def run_command(
command: list[str],
capture_output: bool = False,
text: bool = True,
check: bool = True,
cwd: str | None = None,
env: dict[str, str] | None = None,
**kwargs: Any
) -> subprocess.CompletedProcess:
"""
Cross-platform subprocess execution with Windows compatibility.
On Windows, uses shell=True to avoid permission issues with restrictive
security policies. On other platforms, uses the standard approach.
Args:
command: List of command arguments
capture_output: Whether to capture stdout/stderr
text: Whether to use text mode
check: Whether to raise CalledProcessError on non-zero exit
cwd: Working directory
env: Environment variables
**kwargs: Additional subprocess.run arguments
Returns:
CompletedProcess instance
Raises:
subprocess.CalledProcessError: If check=True and command fails
"""
if platform.system() == "Windows":
if isinstance(command, list):
command_str = subprocess.list2cmdline(command)
else:
command_str = command
return subprocess.run(
command_str,
shell=True,
capture_output=capture_output,
text=text,
check=check,
cwd=cwd,
env=env,
**kwargs
)
return subprocess.run(
command,
capture_output=capture_output,
text=text,
check=check,
cwd=cwd,
env=env,
**kwargs
)
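The Windows branch above hinges on subprocess.list2cmdline, which flattens the argv list into a single properly quoted string before shell=True execution (the tests near the end of this diff assert exactly this quoting). A quick stdlib check:

    import subprocess

    # Arguments containing spaces get quoted; plain arguments pass through.
    print(subprocess.list2cmdline(["echo", "hello world", "test&special"]))
    # -> echo "hello world" test&special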

View File

@@ -1,5 +1,6 @@
import base64
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Any
@@ -10,7 +11,6 @@ from rich.console import Console
from crewai.cli import git
from crewai.cli.command import BaseCommand, PlusAPIMixin
from crewai.cli.config import Settings
from crewai.cli.subprocess_utils import run_command
from crewai.cli.utils import (
extract_available_exports,
get_project_description,
@@ -56,7 +56,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
os.chdir(project_root)
try:
self.login()
run_command(["git", "init"], check=True)
subprocess.run(["git", "init"], check=True)
console.print(
f"[green]Created custom tool [bold]{folder_name}[/bold]. Run [bold]cd {project_root}[/bold] to start working.[/green]"
)
@@ -94,7 +94,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
self._print_current_organization()
with tempfile.TemporaryDirectory() as temp_build_dir:
run_command(
subprocess.run(
["uv", "build", "--sdist", "--out-dir", temp_build_dir],
check=True,
capture_output=False,
@@ -196,7 +196,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
else:
add_package_command.extend(["--index", index, tool_handle])
add_package_result = run_command(
add_package_result = subprocess.run(
add_package_command,
capture_output=False,
env=self._build_env_with_credentials(repository_handle),

View File

@@ -2,8 +2,6 @@ import subprocess
import click
from crewai.cli.subprocess_utils import run_command
def train_crew(n_iterations: int, filename: str) -> None:
"""
@@ -21,7 +19,7 @@ def train_crew(n_iterations: int, filename: str) -> None:
if not filename.endswith(".pkl"):
raise ValueError("The filename must not end with .pkl")
result = run_command(command, capture_output=False, text=True, check=True)
result = subprocess.run(command, capture_output=False, text=True, check=True)
if result.stderr:
click.echo(result.stderr, err=True)

View File

@@ -1,12 +1,10 @@
import os
import re
from typing import Any, Dict, List
from collections import defaultdict
from typing import Any, Iterable
from mem0 import Memory, MemoryClient # type: ignore[import-untyped]
from mem0 import Memory, MemoryClient
from crewai.utilities.chromadb import sanitize_collection_name
from crewai.memory.storage.interface import Storage
from crewai.utilities.chromadb import sanitize_collection_name
MAX_AGENT_ID_LENGTH_MEM0 = 255
@@ -88,28 +86,9 @@ class Mem0Storage(Storage):
return filter
def save(self, value: Any, metadata: dict[str, Any]) -> None:
def _last_content(messages: Iterable[dict[str, Any]], role: str) -> str:
return next(
(m.get("content", "") for m in reversed(list(messages)) if m.get("role") == role),
""
)
conversations = []
messages = metadata.pop("messages", None)
if messages:
last_user = _last_content(messages, "user")
last_assistant = _last_content(messages, "assistant")
if user_msg := self._get_user_message(last_user):
conversations.append({"role": "user", "content": user_msg})
if assistant_msg := self._get_assistant_message(last_assistant):
conversations.append({"role": "assistant", "content": assistant_msg})
else:
conversations.append({"role": "assistant", "content": value})
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
user_id = self.config.get("user_id", "")
assistant_message = [{"role" : "assistant","content" : value}]
base_metadata = {
"short_term": "short_term",
@@ -140,9 +119,9 @@ class Mem0Storage(Storage):
if agent_id := self.config.get("agent_id", self._get_agent_name()):
params["agent_id"] = agent_id
self.memory.add(conversations, **params)
self.memory.add(assistant_message, **params)
def search(self,query: str,limit: int = 3,score_threshold: float = 0.35) -> list[Any]:
def search(self,query: str,limit: int = 3,score_threshold: float = 0.35) -> List[Any]:
params = {
"query": query,
"limit": limit,
@@ -181,7 +160,7 @@ class Mem0Storage(Storage):
# This makes it compatible for Contextual Memory to retrieve
for result in results["results"]:
result["context"] = result["memory"]
return [r for r in results["results"]]
def reset(self):
@@ -202,16 +181,3 @@ class Mem0Storage(Storage):
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = "_".join(agents)
return sanitize_collection_name(name=agents, max_collection_length=MAX_AGENT_ID_LENGTH_MEM0)
def _get_assistant_message(self, text: str) -> str:
marker = "Final Answer:"
if marker in text:
return text.split(marker, 1)[1].strip()
return text
def _get_user_message(self, text: str) -> str:
pattern = r"User message:\s*(.*)"
match = re.search(pattern, text)
if match:
return match.group(1).strip()
return text
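One side of this diff has save() reduce the message history to the last user turn and the last assistant turn before handing them to mem0. A standalone check of that selection helper, mirroring _last_content above:

    from typing import Any, Iterable

    def _last_content(messages: Iterable[dict[str, Any]], role: str) -> str:
        # Walk the history backwards and return the newest message for the role.
        return next(
            (m.get("content", "") for m in reversed(list(messages)) if m.get("role") == role),
            "",
        )

    history = [
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "Hello!"},
        {"role": "user", "content": "What do you know about me?"},
    ]
    assert _last_content(history, "user") == "What do you know about me?"
    assert _last_content(history, "assistant") == "Hello!"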

View File

@@ -1,11 +1,11 @@
import pytest
from crewai.agents import parser
from crewai.agents.crew_agent_executor import (
AgentAction,
AgentFinish,
OutputParserException,
)
from crewai.agents import parser
def test_valid_action_parsing_special_characters():
@@ -345,12 +345,16 @@ def test_integration_valid_and_invalid():
"""
parts = text.strip().split("\n\n")
results = []
for part in parts:
def parse_part(part_text):
try:
result = parser.parse(part.strip())
results.append(result)
return parser.parse(part_text.strip())
except OutputParserException as e:
results.append(e)
return e
for part in parts:
result = parse_part(part)
results.append(result)
assert isinstance(results[0], AgentAction)
assert isinstance(results[1], AgentFinish)
@@ -359,3 +363,96 @@ def test_integration_valid_and_invalid():
# TODO: ADD TEST TO MAKE SURE ** REMOVAL DOESN'T MESS UP ANYTHING
def test_harmony_analysis_channel_parsing():
"""Test parsing OpenAI Harmony analysis channel (final answer)."""
text = "<|start|>assistant<|channel|>analysis<|message|>The temperature in SF is 72°F<|end|>"
result = parser.parse(text)
assert isinstance(result, parser.AgentFinish)
assert result.output == "The temperature in SF is 72°F"
assert "Analysis:" in result.thought
def test_harmony_commentary_channel_parsing():
"""Test parsing OpenAI Harmony commentary channel (tool action)."""
text = '<|start|>assistant<|channel|>commentary to=search<|message|>{"query": "temperature in SF"}<|call|>'
result = parser.parse(text)
assert isinstance(result, parser.AgentAction)
assert result.tool == "search"
assert result.tool_input == '{"query": "temperature in SF"}'
def test_harmony_commentary_with_thought():
"""Test Harmony commentary with reasoning before JSON."""
text = '<|start|>assistant<|channel|>commentary to=search<|message|>I need to find the temperature {"query": "SF weather"}<|call|>'
result = parser.parse(text)
assert isinstance(result, parser.AgentAction)
assert result.tool == "search"
assert result.thought == "I need to find the temperature"
assert result.tool_input == '{"query": "SF weather"}'
def test_harmony_multiple_blocks():
"""Test parsing multiple Harmony blocks (uses last one)."""
text = '''<|start|>assistant<|channel|>analysis<|message|>Thinking about this<|end|>
<|start|>assistant<|channel|>commentary to=search<|message|>{"query": "test"}<|call|>'''
result = parser.parse(text)
assert isinstance(result, parser.AgentAction)
assert result.tool == "search"
def test_harmony_format_detection():
"""Test that Harmony format is properly detected."""
harmony_text = "<|start|>assistant<|channel|>analysis<|message|>result<|end|>"
react_text = "Thought: test\nFinal Answer: result"
harmony_result = parser.parse(harmony_text)
react_result = parser.parse(react_text)
assert isinstance(harmony_result, parser.AgentFinish)
assert isinstance(react_result, parser.AgentFinish)
assert harmony_result.output == "result"
assert react_result.output == "result"
def test_harmony_invalid_format_error():
"""Test error handling for invalid Harmony format."""
text = "<|start|>assistant<|channel|>unknown<|message|>content<|end|>"
with pytest.raises(parser.OutputParserException) as exc_info:
parser.parse(text)
assert "Invalid Harmony Format" in str(exc_info.value)
def test_automatic_format_detection():
"""Test that the parser automatically detects different formats."""
react_action = "Thought: Let's search\nAction: search\nAction Input: query"
react_finish = "Thought: Done\nFinal Answer: result"
harmony_action = '<|start|>assistant<|channel|>commentary to=tool<|message|>{"input": "test"}<|call|>'
harmony_finish = "<|start|>assistant<|channel|>analysis<|message|>final result<|end|>"
assert isinstance(parser.parse(react_action), parser.AgentAction)
assert isinstance(parser.parse(react_finish), parser.AgentFinish)
assert isinstance(parser.parse(harmony_action), parser.AgentAction)
assert isinstance(parser.parse(harmony_finish), parser.AgentFinish)
def test_format_registry():
"""Test the format registry functionality."""
from crewai.agents.parser import _format_registry
assert 'react' in _format_registry._parsers
assert 'harmony' in _format_registry._parsers
react_text = "Thought: test\nAction: search\nAction Input: query"
harmony_text = "<|start|>assistant<|channel|>analysis<|message|>result<|end|>"
assert _format_registry._parsers['react'].can_parse(react_text)
assert _format_registry._parsers['harmony'].can_parse(harmony_text)
assert not _format_registry._parsers['react'].can_parse(harmony_text)
assert not _format_registry._parsers['harmony'].can_parse(react_text)
def test_backward_compatibility():
"""Test that all existing ReAct format tests still pass."""

View File

@@ -1,140 +0,0 @@
import subprocess
from unittest import mock
import pytest
from crewai.cli.subprocess_utils import run_command
class TestRunCommand:
"""Test the cross-platform subprocess utility."""
@mock.patch("platform.system")
@mock.patch("subprocess.run")
def test_windows_uses_shell_true(self, mock_subprocess_run, mock_platform):
"""Test that Windows uses shell=True with proper command conversion."""
mock_platform.return_value = "Windows"
mock_subprocess_run.return_value = subprocess.CompletedProcess(
args="uv run test", returncode=0
)
command = ["uv", "run", "test"]
run_command(command)
mock_subprocess_run.assert_called_once()
call_args = mock_subprocess_run.call_args
assert call_args[1]["shell"] is True
assert isinstance(call_args[0][0], str)
assert "uv run test" in call_args[0][0]
@mock.patch("platform.system")
@mock.patch("subprocess.run")
def test_unix_uses_shell_false(self, mock_subprocess_run, mock_platform):
"""Test that Unix-like systems use shell=False with list commands."""
mock_platform.return_value = "Linux"
mock_subprocess_run.return_value = subprocess.CompletedProcess(
args=["uv", "run", "test"], returncode=0
)
command = ["uv", "run", "test"]
run_command(command)
mock_subprocess_run.assert_called_once()
call_args = mock_subprocess_run.call_args
assert call_args[1].get("shell", False) is False
assert call_args[0][0] == command
@mock.patch("platform.system")
@mock.patch("subprocess.run")
def test_windows_command_escaping(self, mock_subprocess_run, mock_platform):
"""Test that Windows properly escapes command arguments."""
mock_platform.return_value = "Windows"
mock_subprocess_run.return_value = subprocess.CompletedProcess(
args="test", returncode=0
)
command = ["echo", "hello world", "test&special"]
run_command(command)
mock_subprocess_run.assert_called_once()
call_args = mock_subprocess_run.call_args
command_str = call_args[0][0]
assert '"hello world"' in command_str or "'hello world'" in command_str
@mock.patch("platform.system")
@mock.patch("subprocess.run")
def test_error_handling_preserved(self, mock_subprocess_run, mock_platform):
"""Test that CalledProcessError is properly raised."""
mock_platform.return_value = "Windows"
mock_subprocess_run.side_effect = subprocess.CalledProcessError(1, "test")
with pytest.raises(subprocess.CalledProcessError):
run_command(["test"], check=True)
@mock.patch("platform.system")
@mock.patch("subprocess.run")
def test_all_parameters_passed_through(self, mock_subprocess_run, mock_platform):
"""Test that all subprocess parameters are properly passed through."""
mock_platform.return_value = "Linux"
mock_subprocess_run.return_value = subprocess.CompletedProcess(
args=["test"], returncode=0
)
run_command(
["test"],
capture_output=True,
text=False,
check=False,
cwd="/home/test",
env={"TEST": "value"},
timeout=30
)
mock_subprocess_run.assert_called_once()
call_args = mock_subprocess_run.call_args
assert call_args[1]["capture_output"] is True
assert call_args[1]["text"] is False
assert call_args[1]["check"] is False
assert call_args[1]["cwd"] == "/home/test"
assert call_args[1]["env"] == {"TEST": "value"}
assert call_args[1]["timeout"] == 30
@mock.patch("platform.system")
@mock.patch("subprocess.run")
def test_macos_uses_shell_false(self, mock_subprocess_run, mock_platform):
"""Test that macOS uses shell=False with list commands."""
mock_platform.return_value = "Darwin"
mock_subprocess_run.return_value = subprocess.CompletedProcess(
args=["uv", "run", "test"], returncode=0
)
command = ["uv", "run", "test"]
run_command(command)
mock_subprocess_run.assert_called_once()
call_args = mock_subprocess_run.call_args
assert call_args[1].get("shell", False) is False
assert call_args[0][0] == command
@mock.patch("platform.system")
@mock.patch("subprocess.run")
def test_windows_string_passthrough(self, mock_subprocess_run, mock_platform):
"""Test that Windows passes through string commands unchanged."""
mock_platform.return_value = "Windows"
mock_subprocess_run.return_value = subprocess.CompletedProcess(
args="test command", returncode=0
)
command_str = "test command with spaces"
run_command(command_str)
mock_subprocess_run.assert_called_once()
call_args = mock_subprocess_run.call_args
assert call_args[0][0] == command_str
assert call_args[1]["shell"] is True

View File

@@ -16,7 +16,8 @@ class MockCrew:
@pytest.fixture
def mock_mem0_memory():
"""Fixture to create a mock Memory instance"""
return MagicMock(spec=Memory)
mock_memory = MagicMock(spec=Memory)
return mock_memory
@pytest.fixture
@@ -72,7 +73,8 @@ def test_mem0_storage_initialization(mem0_storage_with_mocked_config, mock_mem0_
@pytest.fixture
def mock_mem0_memory_client():
"""Fixture to create a mock MemoryClient instance"""
return MagicMock(spec=MemoryClient)
mock_memory = MagicMock(spec=MemoryClient)
return mock_memory
@pytest.fixture
@@ -94,7 +96,8 @@ def mem0_storage_with_memory_client_using_config_from_crew(mock_mem0_memory_clie
"infer": True
}
return Mem0Storage(type="short_term", crew=crew, config=embedder_config)
mem0_storage = Mem0Storage(type="short_term", crew=crew, config=embedder_config)
return mem0_storage
@pytest.fixture
@@ -108,7 +111,8 @@ def mem0_storage_with_memory_client_using_explictly_config(mock_mem0_memory_clie
crew = MockCrew()
new_config = {"provider": "mem0", "config": {"api_key": "new-api-key"}}
return Mem0Storage(type="short_term", crew=crew, config=new_config)
mem0_storage = Mem0Storage(type="short_term", crew=crew, config=new_config)
return mem0_storage
def test_mem0_storage_with_memory_client_initialization(
@@ -168,14 +172,14 @@ def test_save_method_with_memory_oss(mem0_storage_with_mocked_config):
# Test short_term memory type (already set in fixture)
test_value = "This is a test memory"
test_metadata = {'description': 'Respond to user conversation. User message: What do you know about me?', 'messages': [{'role': 'system', 'content': 'You are Friendly chatbot assistant. You are a kind and knowledgeable chatbot assistant. You excel at understanding user needs, providing helpful responses, and maintaining engaging conversations. You remember previous interactions to provide a personalized experience.\nYour personal goal is: Engage in useful and interesting conversations with users while remembering context.\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!'}, {'role': 'user', 'content': '\nCurrent Task: Respond to user conversation. User message: What do you know about me?\n\nThis is the expected criteria for your final answer: Contextually appropriate, helpful, and friendly response.\nyou MUST return the actual complete content as the final answer, not a summary.\n\n# Useful context: \nExternal memories:\n- User is from India\n- User is interested in the solar system\n- User name is Vidit Ostwal\n- User is interested in French cuisine\n\nBegin! This is VERY important to you, use the tools available and give your best Final Answer, your job depends on it!\n\nThought:'}, {'role': 'assistant', 'content': "I now can give a great answer \nFinal Answer: Hi Vidit! From our previous conversations, I know you're from India and have a great interest in the solar system. It's fascinating to explore the wonders of space, isn't it? Also, I remember you have a passion for French cuisine, which has so many delightful dishes to explore. If there's anything specific you'd like to discuss or learn about—whether it's about the solar system or some great French recipes—feel free to let me know! I'm here to help."}], 'agent': 'Friendly chatbot assistant'}
test_metadata = {"key": "value"}
mem0_storage.save(test_value, test_metadata)
mem0_storage.memory.add.assert_called_once_with(
[{'role': 'user', 'content': 'What do you know about me?'}, {'role': 'assistant', 'content': "Hi Vidit! From our previous conversations, I know you're from India and have a great interest in the solar system. It's fascinating to explore the wonders of space, isn't it? Also, I remember you have a passion for French cuisine, which has so many delightful dishes to explore. If there's anything specific you'd like to discuss or learn about—whether it's about the solar system or some great French recipes—feel free to let me know! I'm here to help."}],
[{"role": "assistant" , "content": test_value}],
infer=True,
metadata={'type': 'short_term', 'description': 'Respond to user conversation. User message: What do you know about me?', 'agent': 'Friendly chatbot assistant'},
metadata={"type": "short_term", "key": "value"},
run_id="my_run_id",
user_id="test_user",
agent_id='Test_Agent'
@@ -187,14 +191,14 @@ def test_save_method_with_multiple_agents(mem0_storage_with_mocked_config):
mem0_storage.memory.add = MagicMock()
test_value = "This is a test memory"
test_metadata = {'description': 'Respond to user conversation. User message: What do you know about me?', 'messages': [{'role': 'system', 'content': 'You are Friendly chatbot assistant. You are a kind and knowledgeable chatbot assistant. You excel at understanding user needs, providing helpful responses, and maintaining engaging conversations. You remember previous interactions to provide a personalized experience.\nYour personal goal is: Engage in useful and interesting conversations with users while remembering context.\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!'}, {'role': 'user', 'content': '\nCurrent Task: Respond to user conversation. User message: What do you know about me?\n\nThis is the expected criteria for your final answer: Contextually appropriate, helpful, and friendly response.\nyou MUST return the actual complete content as the final answer, not a summary.\n\n# Useful context: \nExternal memories:\n- User is from India\n- User is interested in the solar system\n- User name is Vidit Ostwal\n- User is interested in French cuisine\n\nBegin! This is VERY important to you, use the tools available and give your best Final Answer, your job depends on it!\n\nThought:'}, {'role': 'assistant', 'content': "I now can give a great answer \nFinal Answer: Hi Vidit! From our previous conversations, I know you're from India and have a great interest in the solar system. It's fascinating to explore the wonders of space, isn't it? Also, I remember you have a passion for French cuisine, which has so many delightful dishes to explore. If there's anything specific you'd like to discuss or learn about—whether it's about the solar system or some great French recipes—feel free to let me know! I'm here to help."}], 'agent': 'Friendly chatbot assistant'}
test_metadata = {"key": "value"}
mem0_storage.save(test_value, test_metadata)
mem0_storage.memory.add.assert_called_once_with(
[{'role': 'user', 'content': 'What do you know about me?'}, {'role': 'assistant', 'content': "Hi Vidit! From our previous conversations, I know you're from India and have a great interest in the solar system. It's fascinating to explore the wonders of space, isn't it? Also, I remember you have a passion for French cuisine, which has so many delightful dishes to explore. If there's anything specific you'd like to discuss or learn about—whether it's about the solar system or some great French recipes—feel free to let me know! I'm here to help."}],
[{"role": "assistant" , "content": test_value}],
infer=True,
metadata={'type': 'short_term', 'description': 'Respond to user conversation. User message: What do you know about me?', 'agent': 'Friendly chatbot assistant'},
metadata={"type": "short_term", "key": "value"},
run_id="my_run_id",
user_id="test_user",
agent_id='Test_Agent_Test_Agent_2_Test_Agent_3'
@@ -208,14 +212,14 @@ def test_save_method_with_memory_client(mem0_storage_with_memory_client_using_co
# Test short_term memory type (already set in fixture)
test_value = "This is a test memory"
test_metadata = {'description': 'Respond to user conversation. User message: What do you know about me?', 'messages': [{'role': 'system', 'content': 'You are Friendly chatbot assistant. You are a kind and knowledgeable chatbot assistant. You excel at understanding user needs, providing helpful responses, and maintaining engaging conversations. You remember previous interactions to provide a personalized experience.\nYour personal goal is: Engage in useful and interesting conversations with users while remembering context.\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!'}, {'role': 'user', 'content': '\nCurrent Task: Respond to user conversation. User message: What do you know about me?\n\nThis is the expected criteria for your final answer: Contextually appropriate, helpful, and friendly response.\nyou MUST return the actual complete content as the final answer, not a summary.\n\n# Useful context: \nExternal memories:\n- User is from India\n- User is interested in the solar system\n- User name is Vidit Ostwal\n- User is interested in French cuisine\n\nBegin! This is VERY important to you, use the tools available and give your best Final Answer, your job depends on it!\n\nThought:'}, {'role': 'assistant', 'content': "I now can give a great answer \nFinal Answer: Hi Vidit! From our previous conversations, I know you're from India and have a great interest in the solar system. It's fascinating to explore the wonders of space, isn't it? Also, I remember you have a passion for French cuisine, which has so many delightful dishes to explore. If there's anything specific you'd like to discuss or learn about—whether it's about the solar system or some great French recipes—feel free to let me know! I'm here to help."}], 'agent': 'Friendly chatbot assistant'}
test_metadata = {"key": "value"}
mem0_storage.save(test_value, test_metadata)
mem0_storage.memory.add.assert_called_once_with(
[{'role': 'user', 'content': 'What do you know about me?'}, {'role': 'assistant', 'content': "Hi Vidit! From our previous conversations, I know you're from India and have a great interest in the solar system. It's fascinating to explore the wonders of space, isn't it? Also, I remember you have a passion for French cuisine, which has so many delightful dishes to explore. If there's anything specific you'd like to discuss or learn about—whether it's about the solar system or some great French recipes—feel free to let me know! I'm here to help."}],
[{'role': 'assistant' , 'content': test_value}],
infer=True,
metadata={'type': 'short_term', 'description': 'Respond to user conversation. User message: What do you know about me?', 'agent': 'Friendly chatbot assistant'},
metadata={"type": "short_term", "key": "value"},
version="v2",
run_id="my_run_id",
includes="include1",