Compare commits


7 Commits

Author SHA1 Message Date
Devin AI
42e93984c3 fix: Refactor try-except loop to resolve PERF203 lint issue
Co-Authored-By: João <joao@crewai.com>
2025-09-12 14:01:09 +00:00
Devin AI
41ad22a573 fix: Address lint issues in parser implementation
- Fix loop variable usage in detect_and_parse method
- Replace try-except-pass with proper exception handling
- Clean up docstring formatting to remove whitespace on blank lines

Co-Authored-By: João <joao@crewai.com>
2025-09-12 13:55:27 +00:00
Devin AI
c84bdac4a6 feat: Add support for multiple LLM output formats beyond ReAct
- Implement extensible parser architecture with BaseOutputParser abstract class
- Add OutputFormatRegistry for automatic format detection and parsing
- Support OpenAI Harmony format with analysis and commentary channels
- Maintain full backward compatibility with existing ReAct format
- Add comprehensive tests for both formats and automatic detection
- Zero breaking changes - existing ReAct code continues to work unchanged

Addresses issue #3508: Support for Multiple LLM Output Formats Beyond ReAct

Co-Authored-By: João <joao@crewai.com>
2025-09-12 13:48:38 +00:00
Greyson LaLonde
1f1ab14b07 fix: resolve test duration cache issues in CI workflows (#3506)
2025-09-12 08:38:47 -04:00
Lucas Gomide
1a70f1698e feat: add thread-safe platform context management (#3502)
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2025-09-11 17:32:51 -04:00
Greyson LaLonde
8883fb656b feat(tests): add duration caching for pytest-split
- Cache test durations for optimized splitting
2025-09-11 15:16:05 -04:00
Greyson LaLonde
79d65e55a1 chore: add type annotations and docstrings to langgraph adapters (#3503)
2025-09-11 13:06:44 -04:00
15 changed files with 1160 additions and 618 deletions

View File

@@ -22,6 +22,8 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0 # Fetch all history for proper diff
- name: Restore global uv cache
id: cache-restore
@@ -45,14 +47,41 @@ jobs:
- name: Install the project
run: uv sync --all-groups --all-extras
- name: Restore test durations
uses: actions/cache/restore@v4
with:
path: .test_durations_py*
key: test-durations-py${{ matrix.python-version }}
- name: Run tests (group ${{ matrix.group }} of 8)
run: |
PYTHON_VERSION_SAFE=$(echo "${{ matrix.python-version }}" | tr '.' '_')
DURATION_FILE=".test_durations_py${PYTHON_VERSION_SAFE}"
# Temporarily always skip cached durations to fix test splitting
# When durations don't match, pytest-split runs duplicate tests instead of splitting
echo "Using even test splitting (duration cache disabled until fix merged)"
DURATIONS_ARG=""
# Original logic (disabled temporarily):
# if [ ! -f "$DURATION_FILE" ]; then
# echo "No cached durations found, tests will be split evenly"
# DURATIONS_ARG=""
# elif git diff origin/${{ github.base_ref }}...HEAD --name-only 2>/dev/null | grep -q "^tests/.*\.py$"; then
# echo "Test files have changed, skipping cached durations to avoid mismatches"
# DURATIONS_ARG=""
# else
# echo "No test changes detected, using cached test durations for optimal splitting"
# DURATIONS_ARG="--durations-path=${DURATION_FILE}"
# fi
uv run pytest \
--block-network \
--timeout=30 \
-vv \
--splits 8 \
--group ${{ matrix.group }} \
$DURATIONS_ARG \
--durations=10 \
-n auto \
--maxfail=3
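
For reference, the cached durations file that pytest-split reads here is, assuming its standard --store-durations output, a flat JSON mapping of test node IDs to runtimes in seconds. A minimal inspection sketch (the file name follows the PYTHON_VERSION_SAFE convention above and is illustrative):

import json

# hypothetical file produced by `pytest --store-durations` on Python 3.12
with open(".test_durations_py3_12") as f:
    durations: dict[str, float] = json.load(f)

# e.g. {"tests/test_agent.py::test_execute_task": 1.42, ...}
slowest = sorted(durations.items(), key=lambda kv: kv[1], reverse=True)[:5]
for test_id, seconds in slowest:
    print(f"{seconds:8.2f}s  {test_id}")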

View File

@@ -0,0 +1,71 @@
name: Update Test Durations
on:
push:
branches:
- main
paths:
- 'tests/**/*.py'
workflow_dispatch:
permissions:
contents: read
jobs:
update-durations:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.10', '3.11', '3.12', '3.13']
env:
OPENAI_API_KEY: fake-api-key
PYTHONUNBUFFERED: 1
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Restore global uv cache
id: cache-restore
uses: actions/cache/restore@v4
with:
path: |
~/.cache/uv
~/.local/share/uv
.venv
key: uv-main-py${{ matrix.python-version }}-${{ hashFiles('uv.lock') }}
restore-keys: |
uv-main-py${{ matrix.python-version }}-
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
version: "0.8.4"
python-version: ${{ matrix.python-version }}
enable-cache: false
- name: Install the project
run: uv sync --all-groups --all-extras
- name: Run all tests and store durations
run: |
PYTHON_VERSION_SAFE=$(echo "${{ matrix.python-version }}" | tr '.' '_')
uv run pytest --store-durations --durations-path=.test_durations_py${PYTHON_VERSION_SAFE} -n auto
continue-on-error: true
- name: Save durations to cache
if: always()
uses: actions/cache/save@v4
with:
path: .test_durations_py*
key: test-durations-py${{ matrix.python-version }}
- name: Save uv caches
if: steps.cache-restore.outputs.cache-hit != 'true'
uses: actions/cache/save@v4
with:
path: |
~/.cache/uv
~/.local/share/uv
.venv
key: uv-main-py${{ matrix.python-version }}-${{ hashFiles('uv.lock') }}

View File

@@ -1,47 +1,56 @@
from typing import Any, Dict, List, Optional
"""LangGraph agent adapter for CrewAI integration.
from pydantic import Field, PrivateAttr
This module contains the LangGraphAgentAdapter class that integrates LangGraph ReAct agents
with CrewAI's agent system. Provides memory persistence, tool integration, and structured
output functionality.
"""
from collections.abc import Callable
from typing import Any, cast
from pydantic import ConfigDict, Field, PrivateAttr
from crewai.agents.agent_adapters.base_agent_adapter import BaseAgentAdapter
from crewai.agents.agent_adapters.langgraph.langgraph_tool_adapter import (
LangGraphToolAdapter,
)
from crewai.agents.agent_adapters.langgraph.protocols import (
LangGraphCheckPointMemoryModule,
LangGraphPrebuiltModule,
)
from crewai.agents.agent_adapters.langgraph.structured_output_converter import (
LangGraphConverterAdapter,
)
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool
from crewai.utilities import Logger
from crewai.utilities.converter import Converter
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
try:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
LANGGRAPH_AVAILABLE = True
except ImportError:
LANGGRAPH_AVAILABLE = False
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool
from crewai.utilities import Logger
from crewai.utilities.converter import Converter
from crewai.utilities.import_utils import require
class LangGraphAgentAdapter(BaseAgentAdapter):
"""Adapter for LangGraph agents to work with CrewAI."""
"""Adapter for LangGraph agents to work with CrewAI.
model_config = {"arbitrary_types_allowed": True}
This adapter integrates LangGraph's ReAct agents with CrewAI's agent system,
providing memory persistence, tool integration, and structured output support.
"""
_logger: Logger = PrivateAttr(default_factory=lambda: Logger())
model_config = ConfigDict(arbitrary_types_allowed=True)
_logger: Logger = PrivateAttr(default_factory=Logger)
_tool_adapter: LangGraphToolAdapter = PrivateAttr()
_graph: Any = PrivateAttr(default=None)
_memory: Any = PrivateAttr(default=None)
_max_iterations: int = PrivateAttr(default=10)
function_calling_llm: Any = Field(default=None)
step_callback: Any = Field(default=None)
step_callback: Callable[..., Any] | None = Field(default=None)
model: str = Field(default="gpt-4o")
verbose: bool = Field(default=False)
@@ -51,17 +60,24 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
role: str,
goal: str,
backstory: str,
tools: Optional[List[BaseTool]] = None,
tools: list[BaseTool] | None = None,
llm: Any = None,
max_iterations: int = 10,
agent_config: Optional[Dict[str, Any]] = None,
agent_config: dict[str, Any] | None = None,
**kwargs,
):
"""Initialize the LangGraph agent adapter."""
if not LANGGRAPH_AVAILABLE:
raise ImportError(
"LangGraph Agent Dependencies are not installed. Please install it using `uv add langchain-core langgraph`"
)
) -> None:
"""Initialize the LangGraph agent adapter.
Args:
role: The role description for the agent.
goal: The primary goal the agent should achieve.
backstory: Background information about the agent.
tools: Optional list of tools available to the agent.
llm: Language model to use, defaults to gpt-4o.
max_iterations: Maximum number of iterations for task execution.
agent_config: Additional configuration for the LangGraph agent.
**kwargs: Additional arguments passed to the base adapter.
"""
super().__init__(
role=role,
goal=goal,
@@ -72,46 +88,65 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
**kwargs,
)
self._tool_adapter = LangGraphToolAdapter(tools=tools)
self._converter_adapter = LangGraphConverterAdapter(self)
self._converter_adapter: LangGraphConverterAdapter = LangGraphConverterAdapter(
self
)
self._max_iterations = max_iterations
self._setup_graph()
def _setup_graph(self) -> None:
"""Set up the LangGraph workflow graph."""
try:
self._memory = MemorySaver()
"""Set up the LangGraph workflow graph.
converted_tools: List[Any] = self._tool_adapter.tools()
if self._agent_config:
self._graph = create_react_agent(
model=self.llm,
tools=converted_tools,
checkpointer=self._memory,
debug=self.verbose,
**self._agent_config,
)
else:
self._graph = create_react_agent(
model=self.llm,
tools=converted_tools or [],
checkpointer=self._memory,
debug=self.verbose,
)
Initializes the memory saver and creates a ReAct agent with the configured
tools, memory checkpointer, and debug settings.
"""
except ImportError as e:
self._logger.log(
"error", f"Failed to import LangGraph dependencies: {str(e)}"
memory_saver: type[Any] = cast(
LangGraphCheckPointMemoryModule,
require(
"langgraph.checkpoint.memory",
purpose="LangGraph core functionality",
),
).MemorySaver
create_react_agent: Callable[..., Any] = cast(
LangGraphPrebuiltModule,
require(
"langgraph.prebuilt",
purpose="LangGraph core functionality",
),
).create_react_agent
self._memory = memory_saver()
converted_tools: list[Any] = self._tool_adapter.tools()
if self._agent_config:
self._graph = create_react_agent(
model=self.llm,
tools=converted_tools,
checkpointer=self._memory,
debug=self.verbose,
**self._agent_config,
)
else:
self._graph = create_react_agent(
model=self.llm,
tools=converted_tools or [],
checkpointer=self._memory,
debug=self.verbose,
)
raise
except Exception as e:
self._logger.log("error", f"Error setting up LangGraph agent: {str(e)}")
raise
def _build_system_prompt(self) -> str:
"""Build a system prompt for the LangGraph agent."""
"""Build a system prompt for the LangGraph agent.
Creates a prompt that includes the agent's role, goal, and backstory,
then enhances it through the converter adapter for structured output.
Returns:
The complete system prompt string.
"""
base_prompt = f"""
You are {self.role}.
Your goal is: {self.goal}
Your backstory: {self.backstory}
@@ -123,10 +158,25 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
def execute_task(
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> str:
"""Execute a task using the LangGraph workflow."""
"""Execute a task using the LangGraph workflow.
Configures the agent, processes the task through the LangGraph workflow,
and handles event emission for execution tracking.
Args:
task: The task object to execute.
context: Optional context information for the task.
tools: Optional additional tools for this specific execution.
Returns:
The final answer from the task execution.
Raises:
Exception: If task execution fails.
"""
self.create_agent_executor(tools)
self.configure_structured_output(task)
@@ -151,9 +201,11 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
session_id = f"task_{id(task)}"
config = {"configurable": {"thread_id": session_id}}
config: dict[str, dict[str, str]] = {
"configurable": {"thread_id": session_id}
}
result = self._graph.invoke(
result: dict[str, Any] = self._graph.invoke(
{
"messages": [
("system", self._build_system_prompt()),
@@ -163,10 +215,10 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
config,
)
messages = result.get("messages", [])
last_message = messages[-1] if messages else None
messages: list[Any] = result.get("messages", [])
last_message: Any = messages[-1] if messages else None
final_answer = ""
final_answer: str = ""
if isinstance(last_message, dict):
final_answer = last_message.get("content", "")
elif hasattr(last_message, "content"):
@@ -186,7 +238,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
return final_answer
except Exception as e:
self._logger.log("error", f"Error executing LangGraph task: {str(e)}")
self._logger.log("error", f"Error executing LangGraph task: {e!s}")
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
@@ -197,29 +249,67 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
)
raise
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
"""Configure the LangGraph agent for execution."""
def create_agent_executor(self, tools: list[BaseTool] | None = None) -> None:
"""Configure the LangGraph agent for execution.
Args:
tools: Optional tools to configure for the agent.
"""
self.configure_tools(tools)
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
"""Configure tools for the LangGraph agent."""
def configure_tools(self, tools: list[BaseTool] | None = None) -> None:
"""Configure tools for the LangGraph agent.
Merges additional tools with existing ones and updates the graph's
available tools through the tool adapter.
Args:
tools: Optional additional tools to configure.
"""
if tools:
all_tools = list(self.tools or []) + list(tools or [])
all_tools: list[BaseTool] = list(self.tools or []) + list(tools or [])
self._tool_adapter.configure_tools(all_tools)
available_tools = self._tool_adapter.tools()
available_tools: list[Any] = self._tool_adapter.tools()
self._graph.tools = available_tools
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
"""Implement delegation tools support for LangGraph."""
agent_tools = AgentTools(agents=agents)
def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
"""Implement delegation tools support for LangGraph.
Creates delegation tools that allow this agent to delegate tasks to other agents.
Args:
agents: List of agents available for delegation.
Returns:
List of delegation tools.
"""
agent_tools: AgentTools = AgentTools(agents=agents)
return agent_tools.tools()
@staticmethod
def get_output_converter(
self, llm: Any, text: str, model: Any, instructions: str
) -> Any:
"""Convert output format if needed."""
llm: Any, text: str, model: Any, instructions: str
) -> Converter:
"""Convert output format if needed.
Args:
llm: Language model instance.
text: Text to convert.
model: Model configuration.
instructions: Conversion instructions.
Returns:
Converter instance for output transformation.
"""
return Converter(llm=llm, text=text, model=model, instructions=instructions)
def configure_structured_output(self, task) -> None:
"""Configure the structured output for LangGraph."""
def configure_structured_output(self, task: Any) -> None:
"""Configure the structured output for LangGraph.
Uses the converter adapter to set up structured output formatting
based on the task requirements.
Args:
task: Task object containing output requirements.
"""
self._converter_adapter.configure_structured_output(task)
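
A minimal construction sketch for the adapter above (the import path is inferred from the sibling modules shown earlier; langgraph and langchain-core must be installed, and the role/goal/backstory values are illustrative):

from crewai.agents.agent_adapters.langgraph.langgraph_agent_adapter import (
    LangGraphAgentAdapter,
)

adapter = LangGraphAgentAdapter(
    role="Researcher",
    goal="Summarize the latest LangGraph release notes",
    backstory="A meticulous technical analyst.",
    llm="gpt-4o",      # passed through to create_react_agent as the model
    max_iterations=5,  # bounds the ReAct loop (constructor default is 10)
)
# adapter.execute_task(task) then builds the system prompt from role/goal/backstory,
# runs the ReAct graph under a per-task thread_id, and returns the final answer.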

View File

@@ -1,38 +1,72 @@
"""LangGraph tool adapter for CrewAI tool integration.
This module contains the LangGraphToolAdapter class that converts CrewAI tools
to LangGraph-compatible format using langchain_core.tools.
"""
import inspect
from typing import Any, List, Optional
from collections.abc import Awaitable
from typing import Any
from crewai.agents.agent_adapters.base_tool_adapter import BaseToolAdapter
from crewai.tools.base_tool import BaseTool
class LangGraphToolAdapter(BaseToolAdapter):
"""Adapts CrewAI tools to LangGraph agent tool compatible format"""
"""Adapts CrewAI tools to LangGraph agent tool compatible format.
def __init__(self, tools: Optional[List[BaseTool]] = None):
self.original_tools = tools or []
self.converted_tools = []
Converts CrewAI BaseTool instances to langchain_core.tools format
that can be used by LangGraph agents.
"""
def configure_tools(self, tools: List[BaseTool]) -> None:
def __init__(self, tools: list[BaseTool] | None = None) -> None:
"""Initialize the tool adapter.
Args:
tools: Optional list of CrewAI tools to adapt.
"""
Configure and convert CrewAI tools to LangGraph-compatible format.
LangGraph expects tools in langchain_core.tools format.
"""
from langchain_core.tools import BaseTool, StructuredTool
super().__init__()
self.original_tools: list[BaseTool] = tools or []
self.converted_tools: list[Any] = []
converted_tools = []
def configure_tools(self, tools: list[BaseTool]) -> None:
"""Configure and convert CrewAI tools to LangGraph-compatible format.
LangGraph expects tools in langchain_core.tools format. This method
converts CrewAI BaseTool instances to StructuredTool instances.
Args:
tools: List of CrewAI tools to convert.
"""
from langchain_core.tools import BaseTool as LangChainBaseTool
from langchain_core.tools import StructuredTool
converted_tools: list[Any] = []
if self.original_tools:
all_tools = tools + self.original_tools
all_tools: list[BaseTool] = tools + self.original_tools
else:
all_tools = tools
for tool in all_tools:
if isinstance(tool, BaseTool):
if isinstance(tool, LangChainBaseTool):
converted_tools.append(tool)
continue
sanitized_name = self.sanitize_tool_name(tool.name)
sanitized_name: str = self.sanitize_tool_name(tool.name)
async def tool_wrapper(*args, tool=tool, **kwargs):
output = None
async def tool_wrapper(
*args: Any, tool: BaseTool = tool, **kwargs: Any
) -> Any:
"""Wrapper function to adapt CrewAI tool calls to LangGraph format.
Args:
*args: Positional arguments for the tool.
tool: The CrewAI tool to wrap.
**kwargs: Keyword arguments for the tool.
Returns:
The result from the tool execution.
"""
output: Any | Awaitable[Any]
if len(args) > 0 and isinstance(args[0], str):
output = tool.run(args[0])
elif "input" in kwargs:
@@ -41,12 +75,12 @@ class LangGraphToolAdapter(BaseToolAdapter):
output = tool.run(**kwargs)
if inspect.isawaitable(output):
result = await output
result: Any = await output
else:
result = output
return result
converted_tool = StructuredTool(
converted_tool: StructuredTool = StructuredTool(
name=sanitized_name,
description=tool.description,
func=tool_wrapper,
@@ -57,5 +91,10 @@ class LangGraphToolAdapter(BaseToolAdapter):
self.converted_tools = converted_tools
def tools(self) -> List[Any]:
def tools(self) -> list[Any]:
"""Get the list of converted tools.
Returns:
List of LangGraph-compatible tools.
"""
return self.converted_tools or []
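
A short sketch of the conversion flow above, using CrewAI's @tool decorator for a trivial tool (the Echo tool is illustrative; langchain-core must be installed for the conversion to succeed):

from crewai.tools import tool
from crewai.agents.agent_adapters.langgraph.langgraph_tool_adapter import (
    LangGraphToolAdapter,
)

@tool("Echo")
def echo(text: str) -> str:
    """Return the input text unchanged."""
    return text

adapter = LangGraphToolAdapter(tools=[echo])
adapter.configure_tools([])  # converts to langchain_core StructuredTool instances
for t in adapter.tools():
    print(t.name, "-", t.description)  # sanitized name plus the original description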

View File

@@ -0,0 +1,55 @@
"""Type protocols for LangGraph modules."""
from typing import Any, Protocol, runtime_checkable
@runtime_checkable
class LangGraphMemorySaver(Protocol):
"""Protocol for LangGraph MemorySaver.
Defines the interface for LangGraph's memory persistence mechanism.
"""
def __init__(self) -> None:
"""Initialize the memory saver."""
...
@runtime_checkable
class LangGraphCheckPointMemoryModule(Protocol):
"""Protocol for LangGraph checkpoint memory module.
Defines the interface for modules containing memory checkpoint functionality.
"""
MemorySaver: type[LangGraphMemorySaver]
@runtime_checkable
class LangGraphPrebuiltModule(Protocol):
"""Protocol for LangGraph prebuilt module.
Defines the interface for modules containing prebuilt agent factories.
"""
def create_react_agent(
self,
model: Any,
tools: list[Any],
checkpointer: Any,
debug: bool = False,
**kwargs: Any,
) -> Any:
"""Create a ReAct agent with the given configuration.
Args:
model: The language model to use for the agent.
tools: List of tools available to the agent.
checkpointer: Memory checkpointer for state persistence.
debug: Whether to enable debug mode.
**kwargs: Additional configuration options.
Returns:
The configured ReAct agent instance.
"""
...
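
Because these protocols are runtime_checkable, an isinstance check verifies a lazily imported module structurally, which is how the adapter validates the modules returned by require(). A small sketch, assuming langgraph is installed:

import importlib

from crewai.agents.agent_adapters.langgraph.protocols import LangGraphPrebuiltModule

mod = importlib.import_module("langgraph.prebuilt")
# structural check: passes as long as the module exposes create_react_agent
assert isinstance(mod, LangGraphPrebuiltModule)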

View File

@@ -1,21 +1,45 @@
"""LangGraph structured output converter for CrewAI task integration.
This module contains the LangGraphConverterAdapter class that handles structured
output conversion for LangGraph agents, supporting JSON and Pydantic model formats.
"""
import json
import re
from typing import Any, Literal
from crewai.agents.agent_adapters.base_converter_adapter import BaseConverterAdapter
from crewai.utilities.converter import generate_model_description
class LangGraphConverterAdapter(BaseConverterAdapter):
"""Adapter for handling structured output conversion in LangGraph agents"""
"""Adapter for handling structured output conversion in LangGraph agents.
def __init__(self, agent_adapter):
"""Initialize the converter adapter with a reference to the agent adapter"""
self.agent_adapter = agent_adapter
self._output_format = None
self._schema = None
self._system_prompt_appendix = None
Converts task output requirements into system prompt modifications and
post-processing logic to ensure agents return properly structured outputs.
"""
def configure_structured_output(self, task) -> None:
"""Configure the structured output for LangGraph."""
def __init__(self, agent_adapter: Any) -> None:
"""Initialize the converter adapter with a reference to the agent adapter.
Args:
agent_adapter: The LangGraph agent adapter instance.
"""
super().__init__(agent_adapter=agent_adapter)
self.agent_adapter: Any = agent_adapter
self._output_format: Literal["json", "pydantic"] | None = None
self._schema: str | None = None
self._system_prompt_appendix: str | None = None
def configure_structured_output(self, task: Any) -> None:
"""Configure the structured output for LangGraph.
Analyzes the task's output requirements and sets up the necessary
formatting and validation logic.
Args:
task: The task object containing output format specifications.
"""
if not (task.output_json or task.output_pydantic):
self._output_format = None
self._schema = None
@@ -32,7 +56,14 @@ class LangGraphConverterAdapter(BaseConverterAdapter):
self._system_prompt_appendix = self._generate_system_prompt_appendix()
def _generate_system_prompt_appendix(self) -> str:
"""Generate an appendix for the system prompt to enforce structured output"""
"""Generate an appendix for the system prompt to enforce structured output.
Creates instructions that are appended to the system prompt to guide
the agent in producing properly formatted output.
Returns:
System prompt appendix string, or empty string if no structured output.
"""
if not self._output_format or not self._schema:
return ""
@@ -41,19 +72,36 @@ Important: Your final answer MUST be provided in the following structured format
{self._schema}
DO NOT include any markdown code blocks, backticks, or other formatting around your response.
The output should be raw JSON that exactly matches the specified schema.
"""
def enhance_system_prompt(self, original_prompt: str) -> str:
"""Add structured output instructions to the system prompt if needed"""
"""Add structured output instructions to the system prompt if needed.
Args:
original_prompt: The base system prompt.
Returns:
Enhanced system prompt with structured output instructions.
"""
if not self._system_prompt_appendix:
return original_prompt
return f"{original_prompt}\n{self._system_prompt_appendix}"
def post_process_result(self, result: str) -> str:
"""Post-process the result to ensure it matches the expected format"""
"""Post-process the result to ensure it matches the expected format.
Attempts to extract and validate JSON content from agent responses,
handling cases where JSON may be wrapped in markdown or other formatting.
Args:
result: The raw result string from the agent.
Returns:
Processed result string, ideally in valid JSON format.
"""
if not self._output_format:
return result
@@ -65,16 +113,16 @@ The output should be raw JSON that exactly matches the specified schema.
return result
except json.JSONDecodeError:
# Try to extract JSON from the text
import re
json_match = re.search(r"(\{.*\})", result, re.DOTALL)
json_match: re.Match[str] | None = re.search(
r"(\{.*})", result, re.DOTALL
)
if json_match:
try:
extracted = json_match.group(1)
extracted: str = json_match.group(1)
# Validate it's proper JSON
json.loads(extracted)
return extracted
except:
except json.JSONDecodeError:
pass
return result
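
The post-processing step above boils down to a regex-extract-then-validate pattern; a standalone sketch of that behavior on a markdown-wrapped response:

import json
import re

raw = 'Sure! Here is the result:\n```json\n{"score": 0.9, "label": "ok"}\n```'
json_match = re.search(r"(\{.*})", raw, re.DOTALL)
if json_match:
    extracted = json_match.group(1)
    json.loads(extracted)  # raises json.JSONDecodeError if it is not valid JSON
    print(extracted)       # {"score": 0.9, "label": "ok"}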

View File

@@ -25,3 +25,16 @@ ACTION_REGEX: Final[re.Pattern[str]] = re.compile(
ACTION_INPUT_ONLY_REGEX: Final[re.Pattern[str]] = re.compile(
r"\s*Action\s*\d*\s*Input\s*\d*\s*:\s*(.*)", re.DOTALL
)
HARMONY_START_PATTERN: Final[re.Pattern[str]] = re.compile(
r"<\|start\|>assistant<\|channel\|>(\w+)(?:\s+to=(\w+))?<\|message\|>(.*?)<\|(?:end|call)\|>",
re.DOTALL
)
HARMONY_ANALYSIS_CHANNEL: Final[str] = "analysis"
HARMONY_COMMENTARY_CHANNEL: Final[str] = "commentary"
HARMONY_FINAL_ANSWER_ERROR_MESSAGE: Final[str] = (
"I did it wrong. Invalid Harmony Format: I need to use proper channel structure."
)
HARMONY_MISSING_CONTENT_ERROR_MESSAGE: Final[str] = (
"I did it wrong. Invalid Harmony Format: Missing content in message section."
)
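
To make the channel structure concrete, here is the pattern above run against the two Harmony samples quoted later in the parser docstring:

import re

HARMONY_START_PATTERN = re.compile(
    r"<\|start\|>assistant<\|channel\|>(\w+)(?:\s+to=(\w+))?<\|message\|>(.*?)<\|(?:end|call)\|>",
    re.DOTALL,
)

final = "<|start|>assistant<|channel|>analysis<|message|>The temperature is 100 degrees<|end|>"
action = '<|start|>assistant<|channel|>commentary to=search<|message|>{"query": "temperature in SF"}<|call|>'

print(HARMONY_START_PATTERN.findall(final))
# [('analysis', '', 'The temperature is 100 degrees')]
print(HARMONY_START_PATTERN.findall(action))
# [('commentary', 'search', '{"query": "temperature in SF"}')]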

View File

@@ -1,19 +1,26 @@
"""Agent output parsing module for ReAct-style LLM responses.
"""Agent output parsing module for multiple LLM response formats.
This module provides parsing functionality for agent outputs that follow
the ReAct (Reasoning and Acting) format, converting them into structured
AgentAction or AgentFinish objects.
different formats (ReAct, OpenAI Harmony, etc.), converting them into structured
AgentAction or AgentFinish objects with automatic format detection.
"""
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from json_repair import repair_json
from crewai.agents.constants import (
ACTION_INPUT_ONLY_REGEX,
ACTION_INPUT_REGEX,
ACTION_REGEX,
ACTION_INPUT_ONLY_REGEX,
FINAL_ANSWER_ACTION,
HARMONY_ANALYSIS_CHANNEL,
HARMONY_COMMENTARY_CHANNEL,
HARMONY_FINAL_ANSWER_ERROR_MESSAGE,
HARMONY_MISSING_CONTENT_ERROR_MESSAGE,
HARMONY_START_PATTERN,
MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE,
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
UNABLE_TO_REPAIR_JSON_RESULTS,
@@ -60,26 +67,166 @@ class OutputParserException(Exception):
super().__init__(error)
class BaseOutputParser(ABC):
"""Abstract base class for output parsers."""
@abstractmethod
def can_parse(self, text: str) -> bool:
"""Check if this parser can handle the given text format."""
@abstractmethod
def parse_text(self, text: str) -> AgentAction | AgentFinish:
"""Parse the text into AgentAction or AgentFinish."""
class OutputFormatRegistry:
"""Registry for managing different output format parsers."""
def __init__(self):
self._parsers: dict[str, BaseOutputParser] = {}
def register(self, name: str, parser: BaseOutputParser) -> None:
"""Register a parser for a specific format."""
self._parsers[name] = parser
def detect_and_parse(self, text: str) -> AgentAction | AgentFinish:
"""Automatically detect format and parse with appropriate parser."""
for parser in self._parsers.values():
if parser.can_parse(text):
return parser.parse_text(text)
return self._parsers.get('react', ReActParser()).parse_text(text)
class ReActParser(BaseOutputParser):
"""Parser for ReAct format outputs."""
def can_parse(self, text: str) -> bool:
"""Check if text follows ReAct format."""
return (
FINAL_ANSWER_ACTION in text or
ACTION_INPUT_REGEX.search(text) is not None or
ACTION_REGEX.search(text) is not None
)
def parse_text(self, text: str) -> AgentAction | AgentFinish:
"""Parse ReAct format text."""
thought = _extract_thought(text)
includes_answer = FINAL_ANSWER_ACTION in text
action_match = ACTION_INPUT_REGEX.search(text)
if includes_answer:
final_answer = text.split(FINAL_ANSWER_ACTION)[-1].strip()
if final_answer.endswith("```"):
count = final_answer.count("```")
if count % 2 != 0:
final_answer = final_answer[:-3].rstrip()
return AgentFinish(thought=thought, output=final_answer, text=text)
if action_match:
action = action_match.group(1)
clean_action = _clean_action(action)
action_input = action_match.group(2).strip()
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = _safe_repair_json(tool_input)
return AgentAction(
thought=thought, tool=clean_action, tool_input=safe_tool_input, text=text
)
if not ACTION_REGEX.search(text):
raise OutputParserException(
f"{MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE}\n{_I18N.slice('final_answer_format')}",
)
if not ACTION_INPUT_ONLY_REGEX.search(text):
raise OutputParserException(
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
)
err_format = _I18N.slice("format_without_tools")
error = f"{err_format}"
raise OutputParserException(error)
class HarmonyParser(BaseOutputParser):
"""Parser for OpenAI Harmony format outputs."""
def can_parse(self, text: str) -> bool:
"""Check if text follows OpenAI Harmony format."""
return HARMONY_START_PATTERN.search(text) is not None
def parse_text(self, text: str) -> AgentAction | AgentFinish:
"""Parse OpenAI Harmony format text."""
matches = HARMONY_START_PATTERN.findall(text)
if not matches:
raise OutputParserException(HARMONY_MISSING_CONTENT_ERROR_MESSAGE)
channel, tool_name, content = matches[-1]
content = content.strip()
if channel == HARMONY_ANALYSIS_CHANNEL:
return AgentFinish(
thought=f"Analysis: {content}",
output=content,
text=text
)
if channel == HARMONY_COMMENTARY_CHANNEL and tool_name:
thought_content = content
tool_input = content
try:
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
tool_input = json_match.group(0)
thought_content = content[:json_match.start()].strip()
if not thought_content:
thought_content = f"Using tool {tool_name}"
except Exception:
tool_input = content
safe_tool_input = _safe_repair_json(tool_input)
return AgentAction(
thought=thought_content,
tool=tool_name,
tool_input=safe_tool_input,
text=text
)
raise OutputParserException(HARMONY_FINAL_ANSWER_ERROR_MESSAGE)
_format_registry = OutputFormatRegistry()
_format_registry.register('react', ReActParser())
_format_registry.register('harmony', HarmonyParser())
def parse(text: str) -> AgentAction | AgentFinish:
"""Parse agent output text into AgentAction or AgentFinish.
Expects output to be in one of two formats.
Automatically detects the format (ReAct, OpenAI Harmony, etc.) and uses
the appropriate parser. Maintains backward compatibility with existing ReAct format.
If the output signals that an action should be taken,
should be in the below format. This will result in an AgentAction
being returned.
Supports multiple formats:
ReAct format:
Thought: agent thought here
Action: search
Action Input: what is the temperature in SF?
If the output signals that a final answer should be given,
should be in the below format. This will result in an AgentFinish
being returned.
Or for final answers:
Thought: agent thought here
Final Answer: The temperature is 100 degrees
OpenAI Harmony format:
<|start|>assistant<|channel|>analysis<|message|>The temperature is 100 degrees<|end|>
Or for tool actions:
<|start|>assistant<|channel|>commentary to=search<|message|>{"query": "temperature in SF"}<|call|>
Args:
text: The agent output text to parse.
@@ -87,50 +234,9 @@ def parse(text: str) -> AgentAction | AgentFinish:
AgentAction or AgentFinish based on the content.
Raises:
OutputParserException: If the text format is invalid.
OutputParserException: If the text format is invalid or unsupported.
"""
thought = _extract_thought(text)
includes_answer = FINAL_ANSWER_ACTION in text
action_match = ACTION_INPUT_REGEX.search(text)
if includes_answer:
final_answer = text.split(FINAL_ANSWER_ACTION)[-1].strip()
# Check whether the final answer ends with triple backticks.
if final_answer.endswith("```"):
# Count occurrences of triple backticks in the final answer.
count = final_answer.count("```")
# If count is odd then it's an unmatched trailing set; remove it.
if count % 2 != 0:
final_answer = final_answer[:-3].rstrip()
return AgentFinish(thought=thought, output=final_answer, text=text)
elif action_match:
action = action_match.group(1)
clean_action = _clean_action(action)
action_input = action_match.group(2).strip()
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = _safe_repair_json(tool_input)
return AgentAction(
thought=thought, tool=clean_action, tool_input=safe_tool_input, text=text
)
if not ACTION_REGEX.search(text):
raise OutputParserException(
f"{MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE}\n{_I18N.slice('final_answer_format')}",
)
elif not ACTION_INPUT_ONLY_REGEX.search(text):
raise OutputParserException(
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
)
else:
err_format = _I18N.slice("format_without_tools")
error = f"{err_format}"
raise OutputParserException(
error,
)
return _format_registry.detect_and_parse(text)
def _extract_thought(text: str) -> str:
@@ -149,8 +255,7 @@ def _extract_thought(text: str) -> str:
return ""
thought = text[:thought_index].strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
return thought.replace("```", "").strip()
def _clean_action(text: str) -> str:
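
The registry makes it possible to add formats beyond the two shipped here. A sketch of registering a custom parser (the <response> tag convention is invented for illustration; _format_registry is the module-level registry shown above):

from crewai.agents.parser import (
    AgentAction,
    AgentFinish,
    BaseOutputParser,
    _format_registry,
)

class XMLParser(BaseOutputParser):
    """Hypothetical parser for <response>...</response>-wrapped outputs."""

    def can_parse(self, text: str) -> bool:
        return "<response>" in text

    def parse_text(self, text: str) -> AgentAction | AgentFinish:
        content = text.split("<response>", 1)[1].split("</response>", 1)[0]
        return AgentFinish(thought="", output=content.strip(), text=text)

_format_registry.register("xml", XMLParser())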

src/crewai/context.py (new file, +25 lines)
View File

@@ -0,0 +1,25 @@
import os
import contextvars
from typing import Optional
from contextlib import contextmanager
_platform_integration_token: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
"platform_integration_token", default=None
)
def set_platform_integration_token(integration_token: str) -> None:
_platform_integration_token.set(integration_token)
def get_platform_integration_token() -> Optional[str]:
token = _platform_integration_token.get()
if token is None:
token = os.getenv("CREWAI_PLATFORM_INTEGRATION_TOKEN")
return token
@contextmanager
def platform_context(integration_token: str):
token = _platform_integration_token.set(integration_token)
try:
yield
finally:
_platform_integration_token.reset(token)
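
Usage of the new context manager is straightforward: the token set inside the with-block is visible to the current context of execution and restored afterwards, falling back to the environment variable outside it:

from crewai.context import (
    get_platform_integration_token,
    platform_context,
)

with platform_context("tok-123"):
    # inside the block the context-local token wins
    print(get_platform_integration_token())  # tok-123

# outside, lookup falls back to CREWAI_PLATFORM_INTEGRATION_TOKEN (env), else None
print(get_platform_integration_token())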

View File

@@ -1,25 +1,25 @@
from typing import Any, ClassVar
from typing import Any, Dict, Optional
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.syntax import Syntax
from rich.text import Text
from rich.tree import Tree
from rich.live import Live
from rich.syntax import Syntax
class ConsoleFormatter:
current_crew_tree: Tree | None = None
current_task_branch: Tree | None = None
current_agent_branch: Tree | None = None
current_tool_branch: Tree | None = None
current_flow_tree: Tree | None = None
current_method_branch: Tree | None = None
current_lite_agent_branch: Tree | None = None
tool_usage_counts: ClassVar[dict[str, int]] = {}
current_reasoning_branch: Tree | None = None # Track reasoning status
current_crew_tree: Optional[Tree] = None
current_task_branch: Optional[Tree] = None
current_agent_branch: Optional[Tree] = None
current_tool_branch: Optional[Tree] = None
current_flow_tree: Optional[Tree] = None
current_method_branch: Optional[Tree] = None
current_lite_agent_branch: Optional[Tree] = None
tool_usage_counts: Dict[str, int] = {}
current_reasoning_branch: Optional[Tree] = None # Track reasoning status
_live_paused: bool = False
current_llm_tool_tree: Tree | None = None
current_llm_tool_tree: Optional[Tree] = None
def __init__(self, verbose: bool = False):
self.console = Console(width=None)
@@ -29,7 +29,7 @@ class ConsoleFormatter:
# instance so the previous render is replaced instead of writing a new one.
# Once any non-Tree renderable is printed we stop the Live session so the
# final Tree persists on the terminal.
self._live: Live | None = None
self._live: Optional[Live] = None
def create_panel(self, content: Text, title: str, style: str = "blue") -> Panel:
"""Create a standardized panel with consistent styling."""
@@ -45,7 +45,7 @@ class ConsoleFormatter:
title: str,
name: str,
status_style: str = "blue",
tool_args: dict[str, Any] | str = "",
tool_args: Dict[str, Any] | str = "",
**fields,
) -> Text:
"""Create standardized status content with consistent formatting."""
@@ -70,7 +70,7 @@ class ConsoleFormatter:
prefix: str,
name: str,
style: str = "blue",
status: str | None = None,
status: Optional[str] = None,
) -> None:
"""Update tree label with consistent formatting."""
label = Text()
@@ -115,7 +115,7 @@ class ConsoleFormatter:
self._live.update(tree, refresh=True)
return # Nothing else to do
# Case 2: blank line while a live session is running - ignore so we
# Case 2: blank line while a live session is running; ignore so we
# don't break the in-place rendering behaviour
if len(args) == 0 and self._live:
return
@@ -156,7 +156,7 @@ class ConsoleFormatter:
def update_crew_tree(
self,
tree: Tree | None,
tree: Optional[Tree],
crew_name: str,
source_id: str,
status: str = "completed",
@@ -196,7 +196,7 @@ class ConsoleFormatter:
self.print_panel(content, title, style)
def create_crew_tree(self, crew_name: str, source_id: str) -> Tree | None:
def create_crew_tree(self, crew_name: str, source_id: str) -> Optional[Tree]:
"""Create and initialize a new crew tree with initial status."""
if not self.verbose:
return None
@@ -220,8 +220,8 @@ class ConsoleFormatter:
return tree
def create_task_branch(
self, crew_tree: Tree | None, task_id: str, task_name: str | None = None
) -> Tree | None:
self, crew_tree: Optional[Tree], task_id: str, task_name: Optional[str] = None
) -> Optional[Tree]:
"""Create and initialize a task branch."""
if not self.verbose:
return None
@@ -255,11 +255,11 @@ class ConsoleFormatter:
def update_task_status(
self,
crew_tree: Tree | None,
crew_tree: Optional[Tree],
task_id: str,
agent_role: str,
status: str = "completed",
task_name: str | None = None,
task_name: Optional[str] = None,
) -> None:
"""Update task status in the tree."""
if not self.verbose or crew_tree is None:
@@ -306,8 +306,8 @@ class ConsoleFormatter:
self.print_panel(content, panel_title, style)
def create_agent_branch(
self, task_branch: Tree | None, agent_role: str, crew_tree: Tree | None
) -> Tree | None:
self, task_branch: Optional[Tree], agent_role: str, crew_tree: Optional[Tree]
) -> Optional[Tree]:
"""Create and initialize an agent branch."""
if not self.verbose or not task_branch or not crew_tree:
return None
@@ -325,9 +325,9 @@ class ConsoleFormatter:
def update_agent_status(
self,
agent_branch: Tree | None,
agent_branch: Optional[Tree],
agent_role: str,
crew_tree: Tree | None,
crew_tree: Optional[Tree],
status: str = "completed",
) -> None:
"""Update agent status in the tree."""
@@ -336,7 +336,7 @@ class ConsoleFormatter:
# altering the tree. Keeping it a no-op avoids duplicate status lines.
return
def create_flow_tree(self, flow_name: str, flow_id: str) -> Tree | None:
def create_flow_tree(self, flow_name: str, flow_id: str) -> Optional[Tree]:
"""Create and initialize a flow tree."""
content = self.create_status_content(
"Starting Flow Execution", flow_name, "blue", ID=flow_id
@@ -356,7 +356,7 @@ class ConsoleFormatter:
return flow_tree
def start_flow(self, flow_name: str, flow_id: str) -> Tree | None:
def start_flow(self, flow_name: str, flow_id: str) -> Optional[Tree]:
"""Initialize a flow execution tree."""
flow_tree = Tree("")
flow_label = Text()
@@ -376,7 +376,7 @@ class ConsoleFormatter:
def update_flow_status(
self,
flow_tree: Tree | None,
flow_tree: Optional[Tree],
flow_name: str,
flow_id: str,
status: str = "completed",
@@ -423,11 +423,11 @@ class ConsoleFormatter:
def update_method_status(
self,
method_branch: Tree | None,
flow_tree: Tree | None,
method_branch: Optional[Tree],
flow_tree: Optional[Tree],
method_name: str,
status: str = "running",
) -> Tree | None:
) -> Optional[Tree]:
"""Update method status in the flow tree."""
if not flow_tree:
return None
@@ -480,7 +480,7 @@ class ConsoleFormatter:
def handle_llm_tool_usage_started(
self,
tool_name: str,
tool_args: dict[str, Any] | str,
tool_args: Dict[str, Any] | str,
):
# Create status content for the tool usage
content = self.create_status_content(
@@ -520,11 +520,11 @@ class ConsoleFormatter:
def handle_tool_usage_started(
self,
agent_branch: Tree | None,
agent_branch: Optional[Tree],
tool_name: str,
crew_tree: Tree | None,
tool_args: dict[str, Any] | str = "",
) -> Tree | None:
crew_tree: Optional[Tree],
tool_args: Dict[str, Any] | str = "",
) -> Optional[Tree]:
"""Handle tool usage started event."""
if not self.verbose:
return None
@@ -569,9 +569,9 @@ class ConsoleFormatter:
def handle_tool_usage_finished(
self,
tool_branch: Tree | None,
tool_branch: Optional[Tree],
tool_name: str,
crew_tree: Tree | None,
crew_tree: Optional[Tree],
) -> None:
"""Handle tool usage finished event."""
if not self.verbose or tool_branch is None:
@@ -600,10 +600,10 @@ class ConsoleFormatter:
def handle_tool_usage_error(
self,
tool_branch: Tree | None,
tool_branch: Optional[Tree],
tool_name: str,
error: str,
crew_tree: Tree | None,
crew_tree: Optional[Tree],
) -> None:
"""Handle tool usage error event."""
if not self.verbose:
@@ -631,9 +631,9 @@ class ConsoleFormatter:
def handle_llm_call_started(
self,
agent_branch: Tree | None,
crew_tree: Tree | None,
) -> Tree | None:
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
) -> Optional[Tree]:
"""Handle LLM call started event."""
if not self.verbose:
return None
@@ -672,9 +672,9 @@ class ConsoleFormatter:
def handle_llm_call_completed(
self,
tool_branch: Tree | None,
agent_branch: Tree | None,
crew_tree: Tree | None,
tool_branch: Optional[Tree],
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
) -> None:
"""Handle LLM call completed event."""
if not self.verbose:
@@ -736,7 +736,7 @@ class ConsoleFormatter:
self.print()
def handle_llm_call_failed(
self, tool_branch: Tree | None, error: str, crew_tree: Tree | None
self, tool_branch: Optional[Tree], error: str, crew_tree: Optional[Tree]
) -> None:
"""Handle LLM call failed event."""
if not self.verbose:
@@ -789,7 +789,7 @@ class ConsoleFormatter:
def handle_crew_test_started(
self, crew_name: str, source_id: str, n_iterations: int
) -> Tree | None:
) -> Optional[Tree]:
"""Handle crew test started event."""
if not self.verbose:
return None
@@ -823,7 +823,7 @@ class ConsoleFormatter:
return test_tree
def handle_crew_test_completed(
self, flow_tree: Tree | None, crew_name: str
self, flow_tree: Optional[Tree], crew_name: str
) -> None:
"""Handle crew test completed event."""
if not self.verbose:
@@ -913,7 +913,7 @@ class ConsoleFormatter:
self.print_panel(failure_content, "Test Failure", "red")
self.print()
def create_lite_agent_branch(self, lite_agent_role: str) -> Tree | None:
def create_lite_agent_branch(self, lite_agent_role: str) -> Optional[Tree]:
"""Create and initialize a lite agent branch."""
if not self.verbose:
return None
@@ -935,10 +935,10 @@ class ConsoleFormatter:
def update_lite_agent_status(
self,
lite_agent_branch: Tree | None,
lite_agent_branch: Optional[Tree],
lite_agent_role: str,
status: str = "completed",
**fields: dict[str, Any],
**fields: Dict[str, Any],
) -> None:
"""Update lite agent status in the tree."""
if not self.verbose or lite_agent_branch is None:
@@ -981,7 +981,7 @@ class ConsoleFormatter:
lite_agent_role: str,
status: str = "started",
error: Any = None,
**fields: dict[str, Any],
**fields: Dict[str, Any],
) -> None:
"""Handle lite agent execution events with consistent formatting."""
if not self.verbose:
@@ -1006,9 +1006,9 @@ class ConsoleFormatter:
def handle_knowledge_retrieval_started(
self,
agent_branch: Tree | None,
crew_tree: Tree | None,
) -> Tree | None:
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
) -> Optional[Tree]:
"""Handle knowledge retrieval started event."""
if not self.verbose:
return None
@@ -1034,13 +1034,13 @@ class ConsoleFormatter:
def handle_knowledge_retrieval_completed(
self,
agent_branch: Tree | None,
crew_tree: Tree | None,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
retrieved_knowledge: Any,
) -> None:
"""Handle knowledge retrieval completed event."""
if not self.verbose:
return
return None
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
@@ -1062,7 +1062,7 @@ class ConsoleFormatter:
)
self.print(knowledge_panel)
self.print()
return
return None
knowledge_branch_found = False
for child in branch_to_use.children:
@@ -1111,18 +1111,18 @@ class ConsoleFormatter:
def handle_knowledge_query_started(
self,
agent_branch: Tree | None,
agent_branch: Optional[Tree],
task_prompt: str,
crew_tree: Tree | None,
crew_tree: Optional[Tree],
) -> None:
"""Handle knowledge query generated event."""
if not self.verbose:
return
return None
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return
return None
query_branch = branch_to_use.add("")
self.update_tree_label(
@@ -1134,9 +1134,9 @@ class ConsoleFormatter:
def handle_knowledge_query_failed(
self,
agent_branch: Tree | None,
agent_branch: Optional[Tree],
error: str,
crew_tree: Tree | None,
crew_tree: Optional[Tree],
) -> None:
"""Handle knowledge query failed event."""
if not self.verbose:
@@ -1159,18 +1159,18 @@ class ConsoleFormatter:
def handle_knowledge_query_completed(
self,
agent_branch: Tree | None,
crew_tree: Tree | None,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
) -> None:
"""Handle knowledge query completed event."""
if not self.verbose:
return
return None
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return
return None
query_branch = branch_to_use.add("")
self.update_tree_label(query_branch, "", "Knowledge Query Completed", "green")
@@ -1180,9 +1180,9 @@ class ConsoleFormatter:
def handle_knowledge_search_query_failed(
self,
agent_branch: Tree | None,
agent_branch: Optional[Tree],
error: str,
crew_tree: Tree | None,
crew_tree: Optional[Tree],
) -> None:
"""Handle knowledge search query failed event."""
if not self.verbose:
@@ -1207,10 +1207,10 @@ class ConsoleFormatter:
def handle_reasoning_started(
self,
agent_branch: Tree | None,
agent_branch: Optional[Tree],
attempt: int,
crew_tree: Tree | None,
) -> Tree | None:
crew_tree: Optional[Tree],
) -> Optional[Tree]:
"""Handle agent reasoning started (or refinement) event."""
if not self.verbose:
return None
@@ -1249,7 +1249,7 @@ class ConsoleFormatter:
self,
plan: str,
ready: bool,
crew_tree: Tree | None,
crew_tree: Optional[Tree],
) -> None:
"""Handle agent reasoning completed event."""
if not self.verbose:
@@ -1292,7 +1292,7 @@ class ConsoleFormatter:
def handle_reasoning_failed(
self,
error: str,
crew_tree: Tree | None,
crew_tree: Optional[Tree],
) -> None:
"""Handle agent reasoning failure event."""
if not self.verbose:
@@ -1329,7 +1329,7 @@ class ConsoleFormatter:
def handle_agent_logs_started(
self,
agent_role: str,
task_description: str | None = None,
task_description: Optional[str] = None,
verbose: bool = False,
) -> None:
"""Handle agent logs started event."""
@@ -1367,11 +1367,10 @@ class ConsoleFormatter:
if not verbose:
return
from crewai.agents.parser import AgentAction, AgentFinish
import json
import re
from crewai.agents.parser import AgentAction, AgentFinish
agent_role = agent_role.partition("\n")[0]
if isinstance(formatted_answer, AgentAction):
@@ -1438,17 +1437,8 @@ class ConsoleFormatter:
# Create tool output content with better formatting
output_text = str(formatted_answer.result)
if len(output_text) > 5000:
if output_text.count("\n") > 10: # Multi-line structured data
lines = output_text.split("\n")
truncated_lines = lines[:10]
remaining_lines = len(lines) - 10
output_text = (
"\n".join(truncated_lines)
+ f"\n... and {remaining_lines} more rows"
)
else:
output_text = output_text[:4997] + "..."
if len(output_text) > 2000:
output_text = output_text[:1997] + "..."
output_panel = Panel(
Text(output_text, style="bright_green"),
@@ -1483,9 +1473,9 @@ class ConsoleFormatter:
def handle_memory_retrieval_started(
self,
agent_branch: Tree | None,
crew_tree: Tree | None,
) -> Tree | None:
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
) -> Optional[Tree]:
if not self.verbose:
return None
@@ -1507,13 +1497,13 @@ class ConsoleFormatter:
def handle_memory_retrieval_completed(
self,
agent_branch: Tree | None,
crew_tree: Tree | None,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
memory_content: str,
retrieval_time_ms: float,
) -> None:
if not self.verbose:
return
return None
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
@@ -1538,7 +1528,7 @@ class ConsoleFormatter:
if branch_to_use is None or tree_to_use is None:
add_panel()
return
return None
memory_branch_found = False
for child in branch_to_use.children:
@@ -1575,13 +1565,13 @@ class ConsoleFormatter:
def handle_memory_query_completed(
self,
agent_branch: Tree | None,
agent_branch: Optional[Tree],
source_type: str,
query_time_ms: float,
crew_tree: Tree | None,
crew_tree: Optional[Tree],
) -> None:
if not self.verbose:
return
return None
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
@@ -1590,15 +1580,15 @@ class ConsoleFormatter:
branch_to_use = tree_to_use
if branch_to_use is None:
return
return None
memory_type = source_type.replace("_", " ").title()
for child in branch_to_use.children:
if "Memory Retrieval" in str(child.label):
for inner_child in child.children:
sources_branch = inner_child
if "Sources Used" in str(inner_child.label):
for child in child.children:
sources_branch = child
if "Sources Used" in str(child.label):
sources_branch.add(f"{memory_type} ({query_time_ms:.2f}ms)")
break
else:
@@ -1608,13 +1598,13 @@ class ConsoleFormatter:
def handle_memory_query_failed(
self,
agent_branch: Tree | None,
crew_tree: Tree | None,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
error: str,
source_type: str,
) -> None:
if not self.verbose:
return
return None
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
@@ -1623,15 +1613,15 @@ class ConsoleFormatter:
branch_to_use = tree_to_use
if branch_to_use is None:
return
return None
memory_type = source_type.replace("_", " ").title()
for child in branch_to_use.children:
if "Memory Retrieval" in str(child.label):
for inner_child in child.children:
sources_branch = inner_child
if "Sources Used" in str(inner_child.label):
for child in child.children:
sources_branch = child
if "Sources Used" in str(child.label):
sources_branch.add(f"{memory_type} - Error: {error}")
break
else:
@@ -1640,16 +1630,16 @@ class ConsoleFormatter:
break
def handle_memory_save_started(
self, agent_branch: Tree | None, crew_tree: Tree | None
self, agent_branch: Optional[Tree], crew_tree: Optional[Tree]
) -> None:
if not self.verbose:
return
return None
branch_to_use = agent_branch or self.current_lite_agent_branch
tree_to_use = branch_to_use or crew_tree
if tree_to_use is None:
return
return None
for child in tree_to_use.children:
if "Memory Update" in str(child.label):
@@ -1665,19 +1655,19 @@ class ConsoleFormatter:
def handle_memory_save_completed(
self,
agent_branch: Tree | None,
crew_tree: Tree | None,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
save_time_ms: float,
source_type: str,
) -> None:
if not self.verbose:
return
return None
branch_to_use = agent_branch or self.current_lite_agent_branch
tree_to_use = branch_to_use or crew_tree
if tree_to_use is None:
return
return None
memory_type = source_type.replace("_", " ").title()
content = f"{memory_type} Memory Saved ({save_time_ms:.2f}ms)"
@@ -1695,19 +1685,19 @@ class ConsoleFormatter:
def handle_memory_save_failed(
self,
agent_branch: Tree | None,
agent_branch: Optional[Tree],
error: str,
source_type: str,
crew_tree: Tree | None,
crew_tree: Optional[Tree],
) -> None:
if not self.verbose:
return
return None
branch_to_use = agent_branch or self.current_lite_agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return
return None
memory_type = source_type.replace("_", " ").title()
content = f"{memory_type} Memory Save Failed"
@@ -1748,7 +1738,7 @@ class ConsoleFormatter:
def handle_guardrail_completed(
self,
success: bool,
error: str | None,
error: Optional[str],
retry_count: int,
) -> None:
"""Display guardrail evaluation result.

View File

@@ -5,20 +5,12 @@ import time
from difflib import SequenceMatcher
from json import JSONDecodeError
from textwrap import dedent
from typing import TYPE_CHECKING, Any, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
import json5
from json_repair import repair_json # type: ignore
from json_repair import repair_json
from crewai.agents.tools_handler import ToolsHandler
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolSelectionErrorEvent,
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
ToolValidateInputErrorEvent,
)
from crewai.task import Task
from crewai.telemetry import Telemetry
from crewai.tools.structured_tool import CrewStructuredTool
@@ -28,6 +20,14 @@ from crewai.utilities.agent_utils import (
get_tool_names,
render_text_description_and_args,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolSelectionErrorEvent,
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
ToolValidateInputErrorEvent,
)
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -44,7 +44,7 @@ OPENAI_BIGGER_MODELS = [
]
class ToolUsageError(Exception):
class ToolUsageErrorException(Exception):
"""Exception raised for errors in the tool usage."""
def __init__(self, message: str) -> None:
@@ -68,13 +68,13 @@ class ToolUsage:
def __init__(
self,
tools_handler: ToolsHandler | None,
tools: list[CrewStructuredTool],
task: Task | None,
tools_handler: Optional[ToolsHandler],
tools: List[CrewStructuredTool],
task: Optional[Task],
function_calling_llm: Any,
agent: Union["BaseAgent", "LiteAgent"] | None = None,
agent: Optional[Union["BaseAgent", "LiteAgent"]] = None,
action: Any = None,
fingerprint_context: dict[str, str] | None = None,
fingerprint_context: Optional[Dict[str, str]] = None,
) -> None:
self._i18n: I18N = agent.i18n if agent else I18N()
self._printer: Printer = Printer()
@@ -105,9 +105,9 @@ class ToolUsage:
return self._tool_calling(tool_string)
def use(
self, calling: ToolCalling | InstructorToolCalling, tool_string: str
self, calling: Union[ToolCalling, InstructorToolCalling], tool_string: str
) -> str:
if isinstance(calling, ToolUsageError):
if isinstance(calling, ToolUsageErrorException):
error = calling.message
if self.agent and self.agent.verbose:
self._printer.print(content=f"\n\n{error}\n", color="red")
@@ -130,7 +130,8 @@ class ToolUsage:
and tool.name == self._i18n.tools("add_image")["name"] # type: ignore
):
try:
return self._use(tool_string=tool_string, tool=tool, calling=calling)
result = self._use(tool_string=tool_string, tool=tool, calling=calling)
return result
except Exception as e:
error = getattr(e, "message", str(e))
@@ -146,7 +147,7 @@ class ToolUsage:
self,
tool_string: str,
tool: CrewStructuredTool,
calling: ToolCalling | InstructorToolCalling,
calling: Union[ToolCalling, InstructorToolCalling],
) -> str:
if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None)
try:
@@ -158,7 +159,8 @@ class ToolUsage:
tool_name=tool.name,
attempts=self._run_attempts,
)
return self._format_result(result=result) # type: ignore # "_format_result" of "ToolUsage" does not return a value (it only ever returns None)
result = self._format_result(result=result) # type: ignore # "_format_result" of "ToolUsage" does not return a value (it only ever returns None)
return result # type: ignore # Fix the return type of this function
except Exception:
if self.task:
@@ -174,7 +176,7 @@ class ToolUsage:
"agent": self.agent,
}
if hasattr(self.agent, 'fingerprint') and self.agent.fingerprint:
if self.agent.fingerprint:
event_data.update(self.agent.fingerprint)
if self.task:
event_data["task_name"] = self.task.name or self.task.description
@@ -183,14 +185,13 @@ class ToolUsage:
started_at = time.time()
from_cache = False
result = None # type: ignore
if self.tools_handler and self.tools_handler.cache:
cache_result = self.tools_handler.cache.read(
tool=calling.tool_name, input=str(calling.arguments)
)
from_cache = cache_result is not None
if cache_result is not None:
result = cache_result
result = self.tools_handler.cache.read(
tool=calling.tool_name, input=calling.arguments
) # type: ignore
from_cache = result is not None
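# NOTE: result stays None on a cache miss; the from_cache flag is forwarded
# to on_tool_use_finished once the tool has run.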
available_tool = next(
(
@@ -206,7 +207,8 @@ class ToolUsage:
try:
result = usage_limit_error
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
return self._format_result(result=result)
result = self._format_result(result=result)
return result
except Exception:
if self.task:
self.task.increment_tools_errors()
@@ -253,7 +255,7 @@ class ToolUsage:
error_message = self._i18n.errors("tool_usage_exception").format(
error=e, tool=tool.name, tool_inputs=tool.description
)
error = ToolUsageError(
error = ToolUsageErrorException(
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
).message
if self.task:
@@ -329,15 +331,6 @@ class ToolUsage:
self.task.used_tools += 1
if self._should_remember_format():
result = self._remember_format(result=result)
if isinstance(result, (list, dict)):
import json
try:
return json.dumps(result, indent=2, ensure_ascii=False)
except (TypeError, ValueError):
return str(result)
return str(result)
def _should_remember_format(self) -> bool:
@@ -353,7 +346,7 @@ class ToolUsage:
return result
def _check_tool_repeated_usage(
self, calling: ToolCalling | InstructorToolCalling
self, calling: Union[ToolCalling, InstructorToolCalling]
) -> bool:
if not self.tools_handler:
return False
@@ -400,7 +393,7 @@ class ToolUsage:
return tool
if self.task:
self.task.increment_tools_errors()
tool_selection_data: dict[str, Any] = {
tool_selection_data: Dict[str, Any] = {
"agent_key": getattr(self.agent, "key", None) if self.agent else None,
"agent_role": getattr(self.agent, "role", None) if self.agent else None,
"tool_name": tool_name,
@@ -417,24 +410,27 @@ class ToolUsage:
),
)
raise Exception(error)
error = f"I forgot the Action name, these are the only available Actions: {self.tools_description}"
crewai_event_bus.emit(
self,
ToolSelectionErrorEvent(
**tool_selection_data,
error=error,
),
)
raise Exception(error)
else:
error = f"I forgot the Action name, these are the only available Actions: {self.tools_description}"
crewai_event_bus.emit(
self,
ToolSelectionErrorEvent(
**tool_selection_data,
error=error,
),
)
raise Exception(error)
def _render(self) -> str:
"""Render the tool name and description in plain text."""
descriptions = [tool.description for tool in self.tools]
descriptions = []
for tool in self.tools:
descriptions.append(tool.description)
return "\n--\n".join(descriptions)
def _function_calling(
self, tool_string: str
) -> ToolCalling | InstructorToolCalling:
) -> Union[ToolCalling, InstructorToolCalling]:
model = (
InstructorToolCalling
if self.function_calling_llm.supports_function_calling()
@@ -457,13 +453,13 @@ class ToolUsage:
)
tool_object = converter.to_pydantic()
if not isinstance(tool_object, (ToolCalling, InstructorToolCalling)):
raise ToolUsageError("Failed to parse tool calling")
raise ToolUsageErrorException("Failed to parse tool calling")
return tool_object
def _original_tool_calling(
self, tool_string: str, raise_error: bool = False
) -> ToolCalling | InstructorToolCalling | ToolUsageError:
) -> Union[ToolCalling, InstructorToolCalling, ToolUsageErrorException]:
tool_name = self.action.tool
tool = self._select_tool(tool_name)
try:
@@ -472,16 +468,18 @@ class ToolUsage:
except Exception:
if raise_error:
raise
return ToolUsageError(
f"{self._i18n.errors('tool_arguments_error')}"
)
else:
return ToolUsageErrorException(
f"{self._i18n.errors('tool_arguments_error')}"
)
if not isinstance(arguments, dict):
if raise_error:
raise
return ToolUsageError(
f"{self._i18n.errors('tool_arguments_error')}"
)
else:
return ToolUsageErrorException(
f"{self._i18n.errors('tool_arguments_error')}"
)
return ToolCalling(
tool_name=tool.name,
@@ -490,14 +488,15 @@ class ToolUsage:
def _tool_calling(
self, tool_string: str
) -> ToolCalling | InstructorToolCalling | ToolUsageError:
) -> Union[ToolCalling, InstructorToolCalling, ToolUsageErrorException]:
try:
try:
return self._original_tool_calling(tool_string, raise_error=True)
except Exception:
if self.function_calling_llm:
return self._function_calling(tool_string)
return self._original_tool_calling(tool_string)
else:
return self._original_tool_calling(tool_string)
except Exception as e:
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
@@ -506,12 +505,12 @@ class ToolUsage:
self.task.increment_tools_errors()
if self.agent and self.agent.verbose:
self._printer.print(content=f"\n\n{e}\n", color="red")
return ToolUsageError( # type: ignore # Incompatible return value type (got "ToolUsageError", expected "ToolCalling | InstructorToolCalling")
return ToolUsageErrorException( # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling")
f"{self._i18n.errors('tool_usage_error').format(error=e)}\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
)
return self._tool_calling(tool_string)
def _validate_tool_input(self, tool_input: str | None) -> dict[str, Any]:
def _validate_tool_input(self, tool_input: Optional[str]) -> Dict[str, Any]:
if tool_input is None:
return {}
@@ -535,7 +534,7 @@ class ToolUsage:
return arguments
except (ValueError, SyntaxError):
repaired_input = repair_json(tool_input)
# Continue to the next parsing attempt
pass # Continue to the next parsing attempt
# Attempt 3: Parse as JSON5
try:
@@ -587,7 +586,7 @@ class ToolUsage:
def on_tool_error(
self,
tool: Any,
tool_calling: ToolCalling | InstructorToolCalling,
tool_calling: Union[ToolCalling, InstructorToolCalling],
e: Exception,
) -> None:
event_data = self._prepare_event_data(tool, tool_calling)
@@ -596,7 +595,7 @@ class ToolUsage:
def on_tool_use_finished(
self,
tool: Any,
tool_calling: ToolCalling | InstructorToolCalling,
tool_calling: Union[ToolCalling, InstructorToolCalling],
from_cache: bool,
started_at: float,
result: Any,
@@ -617,7 +616,7 @@ class ToolUsage:
crewai_event_bus.emit(self, ToolUsageFinishedEvent(**event_data))
def _prepare_event_data(
self, tool: Any, tool_calling: ToolCalling | InstructorToolCalling
self, tool: Any, tool_calling: Union[ToolCalling, InstructorToolCalling]
) -> dict:
event_data = {
"run_attempts": self._run_attempts,

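The _validate_tool_input hunk above is truncated, but its comments imply a
layered strategy: strict JSON first, then Python-literal parsing, then
repair_json, then JSON5. A minimal sketch of that chain, assuming the
json_repair and json5 packages are available; this illustrates the approach,
not the module's actual code:

import ast
import json
from typing import Any, Dict, Optional

import json5                          # assumed dependency, per "Attempt 3"
from json_repair import repair_json   # assumed import, per the hunk above

def validate_tool_input(tool_input: Optional[str]) -> Dict[str, Any]:
    """Illustrative fallback chain; names here are hypothetical."""
    if tool_input is None:
        return {}
    # Attempt 1: strict JSON.
    try:
        arguments = json.loads(tool_input)
        if isinstance(arguments, dict):
            return arguments
    except (ValueError, TypeError):
        pass
    # Attempt 2: Python literal syntax (single quotes, True/False, ...);
    # on failure, repair the string for the next stage.
    try:
        arguments = ast.literal_eval(tool_input)
        if isinstance(arguments, dict):
            return arguments
    except (ValueError, SyntaxError):
        tool_input = repair_json(tool_input)
    # Attempt 3: JSON5, which tolerates trailing commas and comments.
    try:
        arguments = json5.loads(tool_input)
        if isinstance(arguments, dict):
            return arguments
    except (ValueError, TypeError):
        pass
    raise ValueError(f"Could not parse tool input: {tool_input!r}")

Each stage only commits when it yields a dict, so sloppy LLM output degrades
gracefully instead of failing on the first malformed character.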
@@ -1,24 +1,24 @@
from typing import Any
from typing import Any, Dict, List, Optional
from crewai.agents.parser import AgentAction
from crewai.security import Fingerprint
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.tools.tool_usage import ToolUsage, ToolUsageError
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities.i18n import I18N
def execute_tool_and_check_finality(
agent_action: AgentAction,
tools: list[CrewStructuredTool],
tools: List[CrewStructuredTool],
i18n: I18N,
agent_key: str | None = None,
agent_role: str | None = None,
tools_handler: Any | None = None,
task: Any | None = None,
agent: Any | None = None,
function_calling_llm: Any | None = None,
fingerprint_context: dict[str, str] | None = None,
agent_key: Optional[str] = None,
agent_role: Optional[str] = None,
tools_handler: Optional[Any] = None,
task: Optional[Any] = None,
agent: Optional[Any] = None,
function_calling_llm: Optional[Any] = None,
fingerprint_context: Optional[Dict[str, str]] = None,
) -> ToolResult:
"""Execute a tool and check if the result should be treated as a final answer.
@@ -50,7 +50,7 @@ def execute_tool_and_check_finality(
fingerprint_obj = Fingerprint.from_dict(fingerprint_context)
agent.set_fingerprint(fingerprint_obj)
except Exception as e:
raise ValueError(f"Failed to set fingerprint: {e}") from e
raise ValueError(f"Failed to set fingerprint: {e}")
# Create tool usage instance
tool_usage = ToolUsage(
@@ -65,7 +65,7 @@ def execute_tool_and_check_finality(
# Parse tool calling
tool_calling = tool_usage.parse_tool_calling(agent_action.text)
if isinstance(tool_calling, ToolUsageError):
if isinstance(tool_calling, ToolUsageErrorException):
return ToolResult(tool_calling.message, False)
# Check if tool name matches
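For orientation, a hedged usage sketch of the helper above. AgentAction's
fields are inferred from the parser tests later in this diff, and
ToolResult.result_as_answer is an assumption based on the positional
ToolResult(..., False) call; my_tools, my_task, and my_agent are hypothetical
placeholders supplied by the caller:

from crewai.agents.parser import AgentAction
from crewai.utilities.i18n import I18N

action = AgentAction(
    thought="I need the temperature",
    tool="search",
    tool_input='{"query": "temperature in SF"}',
    text='Thought: I need the temperature\n'
         'Action: search\n'
         'Action Input: {"query": "temperature in SF"}',
)
tool_result = execute_tool_and_check_finality(
    agent_action=action,
    tools=my_tools,
    i18n=I18N(),
    task=my_task,
    agent=my_agent,
)
if tool_result.result_as_answer:  # attribute name assumed
    print(tool_result.result)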


@@ -1,11 +1,11 @@
import pytest
from crewai.agents import parser
from crewai.agents.crew_agent_executor import (
AgentAction,
AgentFinish,
OutputParserException,
)
from crewai.agents import parser
def test_valid_action_parsing_special_characters():
@@ -345,12 +345,16 @@ def test_integration_valid_and_invalid():
"""
parts = text.strip().split("\n\n")
results = []
for part in parts:
def parse_part(part_text):
try:
result = parser.parse(part.strip())
results.append(result)
return parser.parse(part_text.strip())
except OutputParserException as e:
results.append(e)
return e
for part in parts:
result = parse_part(part)
results.append(result)
assert isinstance(results[0], AgentAction)
assert isinstance(results[1], AgentFinish)
@@ -359,3 +363,96 @@ def test_integration_valid_and_invalid():
# TODO: ADD TEST TO MAKE SURE ** REMOVAL DOESN'T MESS UP ANYTHING
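# Token grammar the Harmony tests below rely on, inferred from the test
# cases themselves:
#   <|start|>assistant<|channel|>analysis<|message|>TEXT<|end|>            -> AgentFinish
#   <|start|>assistant<|channel|>commentary to=TOOL<|message|>JSON<|call|> -> AgentAction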
def test_harmony_analysis_channel_parsing():
"""Test parsing OpenAI Harmony analysis channel (final answer)."""
text = "<|start|>assistant<|channel|>analysis<|message|>The temperature in SF is 72°F<|end|>"
result = parser.parse(text)
assert isinstance(result, parser.AgentFinish)
assert result.output == "The temperature in SF is 72°F"
assert "Analysis:" in result.thought
def test_harmony_commentary_channel_parsing():
"""Test parsing OpenAI Harmony commentary channel (tool action)."""
text = '<|start|>assistant<|channel|>commentary to=search<|message|>{"query": "temperature in SF"}<|call|>'
result = parser.parse(text)
assert isinstance(result, parser.AgentAction)
assert result.tool == "search"
assert result.tool_input == '{"query": "temperature in SF"}'
def test_harmony_commentary_with_thought():
"""Test Harmony commentary with reasoning before JSON."""
text = '<|start|>assistant<|channel|>commentary to=search<|message|>I need to find the temperature {"query": "SF weather"}<|call|>'
result = parser.parse(text)
assert isinstance(result, parser.AgentAction)
assert result.tool == "search"
assert result.thought == "I need to find the temperature"
assert result.tool_input == '{"query": "SF weather"}'
def test_harmony_multiple_blocks():
"""Test parsing multiple Harmony blocks (uses last one)."""
text = '''<|start|>assistant<|channel|>analysis<|message|>Thinking about this<|end|>
<|start|>assistant<|channel|>commentary to=search<|message|>{"query": "test"}<|call|>'''
result = parser.parse(text)
assert isinstance(result, parser.AgentAction)
assert result.tool == "search"
def test_harmony_format_detection():
"""Test that Harmony format is properly detected."""
harmony_text = "<|start|>assistant<|channel|>analysis<|message|>result<|end|>"
react_text = "Thought: test\nFinal Answer: result"
harmony_result = parser.parse(harmony_text)
react_result = parser.parse(react_text)
assert isinstance(harmony_result, parser.AgentFinish)
assert isinstance(react_result, parser.AgentFinish)
assert harmony_result.output == "result"
assert react_result.output == "result"
def test_harmony_invalid_format_error():
"""Test error handling for invalid Harmony format."""
text = "<|start|>assistant<|channel|>unknown<|message|>content<|end|>"
with pytest.raises(parser.OutputParserException) as exc_info:
parser.parse(text)
assert "Invalid Harmony Format" in str(exc_info.value)
def test_automatic_format_detection():
"""Test that the parser automatically detects different formats."""
react_action = "Thought: Let's search\nAction: search\nAction Input: query"
react_finish = "Thought: Done\nFinal Answer: result"
harmony_action = '<|start|>assistant<|channel|>commentary to=tool<|message|>{"input": "test"}<|call|>'
harmony_finish = "<|start|>assistant<|channel|>analysis<|message|>final result<|end|>"
assert isinstance(parser.parse(react_action), parser.AgentAction)
assert isinstance(parser.parse(react_finish), parser.AgentFinish)
assert isinstance(parser.parse(harmony_action), parser.AgentAction)
assert isinstance(parser.parse(harmony_finish), parser.AgentFinish)
def test_format_registry():
"""Test the format registry functionality."""
from crewai.agents.parser import _format_registry
assert 'react' in _format_registry._parsers
assert 'harmony' in _format_registry._parsers
react_text = "Thought: test\nAction: search\nAction Input: query"
harmony_text = "<|start|>assistant<|channel|>analysis<|message|>result<|end|>"
assert _format_registry._parsers['react'].can_parse(react_text)
assert _format_registry._parsers['harmony'].can_parse(harmony_text)
assert not _format_registry._parsers['react'].can_parse(harmony_text)
assert not _format_registry._parsers['harmony'].can_parse(react_text)
def test_backward_compatibility():
"""Test that all existing ReAct format tests still pass."""

tests/test_context.py (new file, 216 lines)

@@ -0,0 +1,216 @@
# ruff: noqa: S105
import os
import pytest
from unittest.mock import patch
from crewai.context import (
set_platform_integration_token,
get_platform_integration_token,
platform_context,
_platform_integration_token,
)
class TestPlatformIntegrationToken:
def setup_method(self):
_platform_integration_token.set(None)
def teardown_method(self):
_platform_integration_token.set(None)
def test_set_platform_integration_token(self):
test_token = "test-token-123"
assert get_platform_integration_token() is None
set_platform_integration_token(test_token)
assert get_platform_integration_token() == test_token
def test_get_platform_integration_token_from_context_var(self):
test_token = "context-var-token"
_platform_integration_token.set(test_token)
assert get_platform_integration_token() == test_token
@patch.dict(os.environ, {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "env-token-456"})
def test_get_platform_integration_token_from_env_var(self):
assert _platform_integration_token.get() is None
assert get_platform_integration_token() == "env-token-456"
@patch.dict(os.environ, {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "env-token"})
def test_context_var_takes_precedence_over_env_var(self):
context_token = "context-token"
set_platform_integration_token(context_token)
assert get_platform_integration_token() == context_token
@patch.dict(os.environ, {}, clear=True)
def test_get_platform_integration_token_returns_none_when_not_set(self):
assert _platform_integration_token.get() is None
assert get_platform_integration_token() is None
def test_platform_context_manager_basic_usage(self):
test_token = "context-manager-token"
assert get_platform_integration_token() is None
with platform_context(test_token):
assert get_platform_integration_token() == test_token
assert get_platform_integration_token() is None
def test_platform_context_manager_nested_contexts(self):
"""Test nested platform_context context managers."""
outer_token = "outer-token"
inner_token = "inner-token"
assert get_platform_integration_token() is None
with platform_context(outer_token):
assert get_platform_integration_token() == outer_token
with platform_context(inner_token):
assert get_platform_integration_token() == inner_token
assert get_platform_integration_token() == outer_token
assert get_platform_integration_token() is None
def test_platform_context_manager_preserves_existing_token(self):
"""Test that platform_context preserves existing token when exiting."""
initial_token = "initial-token"
context_token = "context-token"
set_platform_integration_token(initial_token)
assert get_platform_integration_token() == initial_token
with platform_context(context_token):
assert get_platform_integration_token() == context_token
assert get_platform_integration_token() == initial_token
def test_platform_context_manager_exception_handling(self):
"""Test that platform_context properly resets token even when exception occurs."""
initial_token = "initial-token"
context_token = "context-token"
set_platform_integration_token(initial_token)
with pytest.raises(ValueError):
with platform_context(context_token):
assert get_platform_integration_token() == context_token
raise ValueError("Test exception")
assert get_platform_integration_token() == initial_token
def test_platform_context_manager_with_none_initial_state(self):
"""Test platform_context when initial state is None."""
context_token = "context-token"
assert get_platform_integration_token() is None
with pytest.raises(RuntimeError):
with platform_context(context_token):
assert get_platform_integration_token() == context_token
raise RuntimeError("Test exception")
assert get_platform_integration_token() is None
@patch.dict(os.environ, {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "env-backup"})
def test_platform_context_with_env_fallback(self):
"""Test platform_context interaction with environment variable fallback."""
context_token = "context-token"
assert get_platform_integration_token() == "env-backup"
with platform_context(context_token):
assert get_platform_integration_token() == context_token
assert get_platform_integration_token() == "env-backup"
def test_multiple_sequential_context_managers(self):
"""Test multiple sequential uses of platform_context."""
token1 = "token-1"
token2 = "token-2"
token3 = "token-3"
with platform_context(token1):
assert get_platform_integration_token() == token1
assert get_platform_integration_token() is None
with platform_context(token2):
assert get_platform_integration_token() == token2
assert get_platform_integration_token() is None
with platform_context(token3):
assert get_platform_integration_token() == token3
assert get_platform_integration_token() is None
def test_empty_string_token(self):
empty_token = ""
set_platform_integration_token(empty_token)
assert get_platform_integration_token() == ""
with platform_context(empty_token):
assert get_platform_integration_token() == ""
def test_special_characters_in_token(self):
special_token = "token-with-!@#$%^&*()_+-={}[]|\\:;\"'<>?,./"
set_platform_integration_token(special_token)
assert get_platform_integration_token() == special_token
with platform_context(special_token):
assert get_platform_integration_token() == special_token
def test_very_long_token(self):
long_token = "a" * 10000
set_platform_integration_token(long_token)
assert get_platform_integration_token() == long_token
with platform_context(long_token):
assert get_platform_integration_token() == long_token
@patch.dict(os.environ, {"CREWAI_PLATFORM_INTEGRATION_TOKEN": ""})
def test_empty_env_var(self):
assert _platform_integration_token.get() is None
assert get_platform_integration_token() == ""
@patch('crewai.context.os.getenv')
def test_env_var_access_error_handling(self, mock_getenv):
mock_getenv.side_effect = OSError("Environment access error")
with pytest.raises(OSError):
get_platform_integration_token()
def test_context_var_isolation_between_tests(self):
"""Test that context variable changes don't leak between test methods."""
test_token = "isolation-test-token"
assert get_platform_integration_token() is None
set_platform_integration_token(test_token)
assert get_platform_integration_token() == test_token
def test_context_manager_return_value(self):
"""Test that platform_context can be used in with statement with return value."""
test_token = "return-value-token"
with platform_context(test_token):
assert get_platform_integration_token() == test_token
with platform_context(test_token) as ctx:
assert ctx is None
assert get_platform_integration_token() == test_token
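
Taken together, these tests pin the semantics down: the ContextVar wins over
the CREWAI_PLATFORM_INTEGRATION_TOKEN environment variable, an empty
environment value is returned verbatim, and platform_context restores
whatever value was current on entry, exceptions included. A minimal sketch
consistent with that behavior (not necessarily the module's actual
implementation):

import os
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional

_platform_integration_token: ContextVar[Optional[str]] = ContextVar(
    "platform_integration_token", default=None
)

def set_platform_integration_token(token: str) -> None:
    _platform_integration_token.set(token)

def get_platform_integration_token() -> Optional[str]:
    # The context variable takes precedence; fall back to the environment.
    token = _platform_integration_token.get()
    if token is not None:
        return token
    return os.getenv("CREWAI_PLATFORM_INTEGRATION_TOKEN")

@contextmanager
def platform_context(token: str) -> Iterator[None]:
    # Save and restore the previous value so nested contexts and exceptions
    # behave as the tests require.
    previous = _platform_integration_token.get()
    _platform_integration_token.set(token)
    try:
        yield
    finally:
        _platform_integration_token.set(previous)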


@@ -1,235 +0,0 @@
import json
from typing import Any, ClassVar
from unittest.mock import Mock, patch
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.crew import Crew
from crewai.project import CrewBase, agent, crew, task
from crewai.task import Task
from crewai.tools import tool
@tool
def mock_bigquery_single_row():
"""Mock BigQuery tool that returns a single row"""
return {"id": 1, "name": "John", "age": 30}
@tool
def mock_bigquery_multiple_rows():
"""Mock BigQuery tool that returns multiple rows"""
return [
{"id": 1, "name": "John", "age": 30},
{"id": 2, "name": "Jane", "age": 25},
{"id": 3, "name": "Bob", "age": 35},
{"id": 4, "name": "Alice", "age": 28},
]
@tool
def mock_bigquery_large_dataset():
"""Mock BigQuery tool that returns a large dataset"""
return [{"id": i, "name": f"User{i}", "value": f"data_{i}"} for i in range(100)]
@tool
def mock_bigquery_nested_data():
"""Mock BigQuery tool that returns nested data structures"""
return [
{
"id": 1,
"user": {"name": "John", "email": "john@example.com"},
"orders": [
{"order_id": 101, "amount": 50.0},
{"order_id": 102, "amount": 75.0},
],
},
{
"id": 2,
"user": {"name": "Jane", "email": "jane@example.com"},
"orders": [{"order_id": 103, "amount": 100.0}],
},
]
@CrewBase
class MCPTestCrew:
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
mcp_server_params: ClassVar[dict[str, Any]] = {"host": "localhost", "port": 8000}
mcp_connect_timeout = 120
agents: list[BaseAgent]
tasks: list[Task]
@agent
def data_analyst(self):
return Agent(
role="Data Analyst",
goal="Analyze data from various sources",
backstory="Expert in data analysis and BigQuery",
tools=[mock_bigquery_single_row, mock_bigquery_multiple_rows],
)
@agent
def mcp_agent(self):
return Agent(
role="MCP Agent",
goal="Use MCP tools to fetch data",
backstory="Agent that uses MCP tools",
tools=self.get_mcp_tools(),
)
@task
def analyze_single_row(self):
return Task(
description="Use mock_bigquery_single_row tool to get data",
expected_output="Single row of data",
agent=self.data_analyst(),
)
@task
def analyze_multiple_rows(self):
return Task(
description="Use mock_bigquery_multiple_rows tool to get data",
expected_output="Multiple rows of data",
agent=self.data_analyst(),
)
@crew
def crew(self):
return Crew(agents=self.agents, tasks=self.tasks, verbose=True)
def test_single_row_tool_output():
"""Test that single row tool output works correctly"""
result = mock_bigquery_single_row.invoke({})
assert isinstance(result, dict)
assert result["id"] == 1
assert result["name"] == "John"
assert result["age"] == 30
def test_multiple_rows_tool_output():
"""Test that multiple rows tool output is preserved"""
result = mock_bigquery_multiple_rows.invoke({})
assert isinstance(result, list)
assert len(result) == 4
assert result[0]["id"] == 1
assert result[1]["id"] == 2
assert result[2]["id"] == 3
assert result[3]["id"] == 4
def test_large_dataset_tool_output():
"""Test that large datasets are handled correctly"""
result = mock_bigquery_large_dataset.invoke({})
assert isinstance(result, list)
assert len(result) == 100
assert result[0]["id"] == 0
assert result[99]["id"] == 99
def test_nested_data_tool_output():
"""Test that nested data structures are preserved"""
result = mock_bigquery_nested_data.invoke({})
assert isinstance(result, list)
assert len(result) == 2
assert result[0]["user"]["name"] == "John"
assert len(result[0]["orders"]) == 2
assert result[1]["user"]["name"] == "Jane"
assert len(result[1]["orders"]) == 1
def test_tool_result_formatting():
"""Test that tool results are properly formatted as strings"""
from crewai.tools.tool_usage import ToolUsage
tool_usage = ToolUsage()
single_result = mock_bigquery_single_row.invoke({})
formatted_single = tool_usage._format_result(single_result)
assert isinstance(formatted_single, str)
parsed_single = json.loads(formatted_single)
assert parsed_single["id"] == 1
multi_result = mock_bigquery_multiple_rows.invoke({})
formatted_multi = tool_usage._format_result(multi_result)
assert isinstance(formatted_multi, str)
parsed_multi = json.loads(formatted_multi)
assert len(parsed_multi) == 4
assert parsed_multi[0]["id"] == 1
assert parsed_multi[3]["id"] == 4
def test_mcp_crew_with_mock_tools():
"""Test MCP crew integration with mock tools"""
with patch("embedchain.client.Client.setup"):
from crewai_tools import MCPServerAdapter
from crewai_tools.adapters.mcp_adapter import ToolCollection
mock_adapter = Mock(spec=MCPServerAdapter)
mock_adapter.tools = ToolCollection([mock_bigquery_multiple_rows])
with patch("crewai_tools.MCPServerAdapter", return_value=mock_adapter):
crew = MCPTestCrew()
mcp_agent = crew.mcp_agent()
assert mock_bigquery_multiple_rows in mcp_agent.tools
def test_tool_output_preserves_structure():
"""Test that tool output preserves data structure through the processing pipeline"""
from crewai.tools.tool_usage import ToolUsage
tool_usage = ToolUsage()
bigquery_result = [
{"id": 1, "name": "John", "revenue": 1000.50},
{"id": 2, "name": "Jane", "revenue": 2500.75},
{"id": 3, "name": "Bob", "revenue": 1750.25},
]
formatted_result = tool_usage._format_result(bigquery_result)
assert isinstance(formatted_result, str)
parsed_result = json.loads(formatted_result)
assert len(parsed_result) == 3
assert parsed_result[0]["id"] == 1
assert parsed_result[1]["name"] == "Jane"
assert parsed_result[2]["revenue"] == 1750.25
def test_tool_output_backward_compatibility():
"""Test that simple string/number outputs still work"""
from crewai.tools.tool_usage import ToolUsage
tool_usage = ToolUsage()
string_result = "Simple string result"
formatted_string = tool_usage._format_result(string_result)
assert formatted_string == "Simple string result"
number_result = 42
formatted_number = tool_usage._format_result(number_result)
assert formatted_number == "42"
bool_result = True
formatted_bool = tool_usage._format_result(bool_result)
assert formatted_bool == "True"
def test_malformed_data_handling():
"""Test that malformed data is handled gracefully"""
from crewai.tools.tool_usage import ToolUsage
tool_usage = ToolUsage()
class NonSerializable:
def __str__(self):
return "NonSerializable object"
non_serializable = NonSerializable()
formatted_result = tool_usage._format_result(non_serializable)
assert formatted_result == "NonSerializable object"
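
The deleted file above exercised the list/dict JSON formatting that this PR
also removes from ToolUsage._format_result (the hunk near the top of this
diff). For reference, the behavior under test reduces to:

import json
from typing import Any

def format_result(result: Any) -> str:
    """Reconstructed from the removed hunk; standalone for illustration."""
    if isinstance(result, (list, dict)):
        try:
            # Pretty-print structured tool output, keeping non-ASCII intact.
            return json.dumps(result, indent=2, ensure_ascii=False)
        except (TypeError, ValueError):
            return str(result)
    return str(result)

With that block gone, structured results are no longer serialized to JSON
before being returned, which is presumably why these tests were deleted
alongside it.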