Compare commits


12 Commits

Author SHA1 Message Date
Greyson LaLonde 9a27fbb36b feat: use flush in atexit to not block results 2026-01-20 02:56:55 -05:00
Greyson LaLonde 74b9a7d9ab chore: update test assumption 2026-01-20 02:45:12 -05:00
Greyson LaLonde ff9108dac4 feat: add flush() to event bus for deterministic event handling 2026-01-20 02:43:25 -05:00
Greyson LaLonde c8f9547816 fix: ensure cache hit formatting; additional event types 2026-01-20 02:14:08 -05:00
Greyson LaLonde d78ee915dc fix: add additional stack checks 2026-01-20 01:39:55 -05:00
Greyson LaLonde 5914f8be7d chore: update test assumptions 2026-01-20 01:19:47 -05:00
Greyson LaLonde da07bd4d9f refactor: use try/finally for event pairing in tool usage 2026-01-20 01:15:05 -05:00
Greyson LaLonde 161f9bd063 feat: reset emission counter for test isolation 2026-01-20 01:12:52 -05:00
Greyson LaLonde ae253b4156 refactor: improve tracing listener typing and sorting 2026-01-20 01:07:56 -05:00
Greyson LaLonde f7e1bdb64e feat: integrate ordering and hierarchy into event bus 2026-01-20 01:07:13 -05:00
Greyson LaLonde 1707df8785 feat: add parent-child event hierarchy with scope management 2026-01-20 01:03:40 -05:00
Greyson LaLonde decdefe8f5 feat: add emission sequence for event ordering 2026-01-20 01:01:14 -05:00
25 changed files with 6200 additions and 5458 deletions

View File

@@ -31,6 +31,21 @@ def cleanup_event_handlers() -> Generator[None, Any, None]:
pass
@pytest.fixture(autouse=True, scope="function")
def reset_event_state() -> None:
"""Reset event system state before each test for isolation."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import (
EventContextConfig,
_event_context_config,
_event_id_stack,
)
reset_emission_counter()
_event_id_stack.set(())
_event_context_config.set(EventContextConfig())
@pytest.fixture(autouse=True, scope="function")
def setup_test_environment() -> Generator[None, Any, None]:
"""Setup test environment for crewAI workspace."""

View File

@@ -89,9 +89,6 @@ from crewai_tools.tools.jina_scrape_website_tool.jina_scrape_website_tool import
from crewai_tools.tools.json_search_tool.json_search_tool import JSONSearchTool
from crewai_tools.tools.linkup.linkup_search_tool import LinkupSearchTool
from crewai_tools.tools.llamaindex_tool.llamaindex_tool import LlamaIndexTool
from crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool import (
MCPDiscoveryTool,
)
from crewai_tools.tools.mdx_search_tool.mdx_search_tool import MDXSearchTool
from crewai_tools.tools.merge_agent_handler_tool.merge_agent_handler_tool import (
MergeAgentHandlerTool,
@@ -239,7 +236,6 @@ __all__ = [
"JinaScrapeWebsiteTool",
"LinkupSearchTool",
"LlamaIndexTool",
"MCPDiscoveryTool",
"MCPServerAdapter",
"MDXSearchTool",
"MergeAgentHandlerTool",

View File

@@ -1,16 +0,0 @@
from crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool import (
MCPDiscoveryResult,
MCPDiscoveryTool,
MCPDiscoveryToolSchema,
MCPServerMetrics,
MCPServerRecommendation,
)
__all__ = [
"MCPDiscoveryResult",
"MCPDiscoveryTool",
"MCPDiscoveryToolSchema",
"MCPServerMetrics",
"MCPServerRecommendation",
]

View File

@@ -1,414 +0,0 @@
"""MCP Discovery Tool for CrewAI agents.
This tool enables agents to dynamically discover MCP servers based on
natural language queries using the MCP Discovery API.
"""
import json
import logging
import os
from typing import Any, TypedDict
from crewai.tools import BaseTool, EnvVar
from pydantic import BaseModel, Field
import requests
logger = logging.getLogger(__name__)
class MCPServerMetrics(TypedDict, total=False):
"""Performance metrics for an MCP server."""
avg_latency_ms: float | None
uptime_pct: float | None
last_checked: str | None
class MCPServerRecommendation(TypedDict, total=False):
"""A recommended MCP server from the discovery API."""
server: str
npm_package: str
install_command: str
confidence: float
description: str
capabilities: list[str]
metrics: MCPServerMetrics
docs_url: str
github_url: str
class MCPDiscoveryResult(TypedDict):
"""Result from the MCP Discovery API."""
recommendations: list[MCPServerRecommendation]
total_found: int
query_time_ms: int
class MCPDiscoveryConstraints(BaseModel):
"""Constraints for MCP server discovery."""
max_latency_ms: int | None = Field(
default=None,
description="Maximum acceptable latency in milliseconds",
)
required_features: list[str] | None = Field(
default=None,
description="List of required features/capabilities",
)
exclude_servers: list[str] | None = Field(
default=None,
description="List of server names to exclude from results",
)
class MCPDiscoveryToolSchema(BaseModel):
"""Input schema for MCPDiscoveryTool."""
need: str = Field(
...,
description=(
"Natural language description of what you need. "
"For example: 'database with authentication', 'email automation', "
"'file storage', 'web scraping'"
),
)
constraints: MCPDiscoveryConstraints | None = Field(
default=None,
description="Optional constraints to filter results",
)
limit: int = Field(
default=5,
description="Maximum number of recommendations to return (1-10)",
ge=1,
le=10,
)
class MCPDiscoveryTool(BaseTool):
"""Tool for discovering MCP servers dynamically.
This tool uses the MCP Discovery API to find MCP servers that match
a natural language description of what the agent needs. It enables
agents to dynamically discover and select the best MCP servers for
their tasks without requiring pre-configuration.
Example:
```python
from crewai import Agent
from crewai_tools import MCPDiscoveryTool
discovery_tool = MCPDiscoveryTool()
agent = Agent(
role='Researcher',
tools=[discovery_tool],
goal='Research and analyze data'
)
# The agent can now discover MCP servers dynamically:
# discover_mcp_server(need="database with authentication")
# Returns: Supabase MCP server with installation instructions
```
Attributes:
name: The name of the tool.
description: A description of what the tool does.
args_schema: The Pydantic model for input validation.
base_url: The base URL for the MCP Discovery API.
timeout: Request timeout in seconds.
"""
name: str = "Discover MCP Server"
description: str = (
"Discover MCP (Model Context Protocol) servers that match your needs. "
"Use this tool to find the best MCP server for any task using natural "
"language. Returns server recommendations with installation instructions, "
"capabilities, and performance metrics."
)
args_schema: type[BaseModel] = MCPDiscoveryToolSchema
base_url: str = "https://mcp-discovery-production.up.railway.app"
timeout: int = 30
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
name="MCP_DISCOVERY_API_KEY",
description="API key for MCP Discovery (optional for free tier)",
required=False,
),
]
)
def _build_request_payload(
self,
need: str,
constraints: MCPDiscoveryConstraints | None,
limit: int,
) -> dict[str, Any]:
"""Build the request payload for the discovery API.
Args:
need: Natural language description of what is needed.
constraints: Optional constraints to filter results.
limit: Maximum number of recommendations.
Returns:
Dictionary containing the request payload.
"""
payload: dict[str, Any] = {
"need": need,
"limit": limit,
}
if constraints:
constraints_dict: dict[str, Any] = {}
if constraints.max_latency_ms is not None:
constraints_dict["max_latency_ms"] = constraints.max_latency_ms
if constraints.required_features:
constraints_dict["required_features"] = constraints.required_features
if constraints.exclude_servers:
constraints_dict["exclude_servers"] = constraints.exclude_servers
if constraints_dict:
payload["constraints"] = constraints_dict
return payload
def _make_api_request(self, payload: dict[str, Any]) -> dict[str, Any]:
"""Make a request to the MCP Discovery API.
Args:
payload: The request payload.
Returns:
The API response as a dictionary.
Raises:
ValueError: If the API returns an empty response.
requests.exceptions.RequestException: If the request fails.
"""
url = f"{self.base_url}/api/v1/discover"
headers = {
"Content-Type": "application/json",
}
api_key = os.environ.get("MCP_DISCOVERY_API_KEY")
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
response = None
try:
response = requests.post(
url,
headers=headers,
json=payload,
timeout=self.timeout,
)
response.raise_for_status()
results = response.json()
if not results:
logger.error("Empty response from MCP Discovery API")
raise ValueError("Empty response from MCP Discovery API")
return results
except requests.exceptions.RequestException as e:
error_msg = f"Error making request to MCP Discovery API: {e}"
if response is not None and hasattr(response, "content"):
error_msg += (
f"\nResponse content: "
f"{response.content.decode('utf-8', errors='replace')}"
)
logger.error(error_msg)
raise
except json.JSONDecodeError as e:
if response is not None and hasattr(response, "content"):
logger.error(f"Error decoding JSON response: {e}")
logger.error(
f"Response content: "
f"{response.content.decode('utf-8', errors='replace')}"
)
else:
logger.error(
f"Error decoding JSON response: {e} (No response content available)"
)
raise
def _process_single_recommendation(
self, rec: dict[str, Any]
) -> MCPServerRecommendation | None:
"""Process a single recommendation from the API.
Args:
rec: Raw recommendation dictionary from the API.
Returns:
Processed MCPServerRecommendation or None if malformed.
"""
try:
metrics_data = rec.get("metrics", {}) if isinstance(rec, dict) else {}
metrics: MCPServerMetrics = {
"avg_latency_ms": metrics_data.get("avg_latency_ms"),
"uptime_pct": metrics_data.get("uptime_pct"),
"last_checked": metrics_data.get("last_checked"),
}
recommendation: MCPServerRecommendation = {
"server": rec.get("server", ""),
"npm_package": rec.get("npm_package", ""),
"install_command": rec.get("install_command", ""),
"confidence": rec.get("confidence", 0.0),
"description": rec.get("description", ""),
"capabilities": rec.get("capabilities", []),
"metrics": metrics,
"docs_url": rec.get("docs_url", ""),
"github_url": rec.get("github_url", ""),
}
return recommendation
except (KeyError, TypeError, AttributeError) as e:
logger.warning(f"Skipping malformed recommendation: {rec}, error: {e}")
return None
def _process_recommendations(
self, recommendations: list[dict[str, Any]]
) -> list[MCPServerRecommendation]:
"""Process and validate server recommendations.
Args:
recommendations: Raw recommendations from the API.
Returns:
List of processed MCPServerRecommendation objects.
"""
processed: list[MCPServerRecommendation] = []
for rec in recommendations:
result = self._process_single_recommendation(rec)
if result is not None:
processed.append(result)
return processed
def _format_result(self, result: MCPDiscoveryResult) -> str:
"""Format the discovery result as a human-readable string.
Args:
result: The discovery result to format.
Returns:
A formatted string representation of the result.
"""
if not result["recommendations"]:
return "No MCP servers found matching your requirements."
lines = [
f"Found {result['total_found']} MCP server(s) "
f"(query took {result['query_time_ms']}ms):\n"
]
for i, rec in enumerate(result["recommendations"], 1):
confidence_pct = rec.get("confidence", 0) * 100
lines.append(f"{i}. {rec.get('server', 'Unknown')} ({confidence_pct:.0f}% confidence)")
lines.append(f" Description: {rec.get('description', 'N/A')}")
lines.append(f" Capabilities: {', '.join(rec.get('capabilities', []))}")
lines.append(f" Install: {rec.get('install_command', 'N/A')}")
lines.append(f" NPM Package: {rec.get('npm_package', 'N/A')}")
metrics = rec.get("metrics", {})
if metrics.get("avg_latency_ms") is not None:
lines.append(f" Avg Latency: {metrics['avg_latency_ms']}ms")
if metrics.get("uptime_pct") is not None:
lines.append(f" Uptime: {metrics['uptime_pct']}%")
if rec.get("docs_url"):
lines.append(f" Docs: {rec['docs_url']}")
if rec.get("github_url"):
lines.append(f" GitHub: {rec['github_url']}")
lines.append("")
return "\n".join(lines)
def _run(self, **kwargs: Any) -> str:
"""Execute the MCP discovery operation.
Args:
**kwargs: Keyword arguments matching MCPDiscoveryToolSchema.
Returns:
A formatted string with discovered MCP servers.
Raises:
ValueError: If required parameters are missing.
"""
need: str | None = kwargs.get("need")
if not need:
raise ValueError("'need' parameter is required")
constraints_data = kwargs.get("constraints")
constraints: MCPDiscoveryConstraints | None = None
if constraints_data:
if isinstance(constraints_data, dict):
constraints = MCPDiscoveryConstraints(**constraints_data)
elif isinstance(constraints_data, MCPDiscoveryConstraints):
constraints = constraints_data
limit: int = kwargs.get("limit", 5)
payload = self._build_request_payload(need, constraints, limit)
response = self._make_api_request(payload)
recommendations = self._process_recommendations(
response.get("recommendations", [])
)
result: MCPDiscoveryResult = {
"recommendations": recommendations,
"total_found": response.get("total_found", len(recommendations)),
"query_time_ms": response.get("query_time_ms", 0),
}
return self._format_result(result)
def discover(
self,
need: str,
constraints: MCPDiscoveryConstraints | None = None,
limit: int = 5,
) -> MCPDiscoveryResult:
"""Discover MCP servers matching the given requirements.
This is a convenience method that returns structured data instead
of a formatted string.
Args:
need: Natural language description of what is needed.
constraints: Optional constraints to filter results.
limit: Maximum number of recommendations (1-10).
Returns:
MCPDiscoveryResult containing server recommendations.
Example:
```python
tool = MCPDiscoveryTool()
result = tool.discover(
need="database with authentication",
constraints=MCPDiscoveryConstraints(
max_latency_ms=200,
required_features=["auth", "realtime"]
),
limit=3
)
for rec in result["recommendations"]:
print(f"{rec['server']}: {rec['description']}")
```
"""
payload = self._build_request_payload(need, constraints, limit)
response = self._make_api_request(payload)
recommendations = self._process_recommendations(
response.get("recommendations", [])
)
return {
"recommendations": recommendations,
"total_found": response.get("total_found", len(recommendations)),
"query_time_ms": response.get("query_time_ms", 0),
}

View File

@@ -1,452 +0,0 @@
"""Tests for the MCP Discovery Tool."""
import json
from unittest.mock import MagicMock, patch
import pytest
import requests
from crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool import (
MCPDiscoveryConstraints,
MCPDiscoveryResult,
MCPDiscoveryTool,
MCPDiscoveryToolSchema,
MCPServerMetrics,
MCPServerRecommendation,
)
@pytest.fixture
def mock_api_response() -> dict:
"""Create a mock API response for testing."""
return {
"recommendations": [
{
"server": "sqlite-server",
"npm_package": "@modelcontextprotocol/server-sqlite",
"install_command": "npx -y @modelcontextprotocol/server-sqlite",
"confidence": 0.38,
"description": "SQLite database server for MCP.",
"capabilities": ["sqlite", "sql", "database", "embedded"],
"metrics": {
"avg_latency_ms": 50.0,
"uptime_pct": 99.9,
"last_checked": "2026-01-17T10:30:00Z",
},
"docs_url": "https://modelcontextprotocol.io/docs/servers/sqlite",
"github_url": "https://github.com/modelcontextprotocol/servers",
},
{
"server": "postgres-server",
"npm_package": "@modelcontextprotocol/server-postgres",
"install_command": "npx -y @modelcontextprotocol/server-postgres",
"confidence": 0.33,
"description": "PostgreSQL database server for MCP.",
"capabilities": ["postgres", "sql", "database", "queries"],
"metrics": {
"avg_latency_ms": None,
"uptime_pct": None,
"last_checked": None,
},
"docs_url": "https://modelcontextprotocol.io/docs/servers/postgres",
"github_url": "https://github.com/modelcontextprotocol/servers",
},
],
"total_found": 2,
"query_time_ms": 245,
}
@pytest.fixture
def discovery_tool() -> MCPDiscoveryTool:
"""Create an MCPDiscoveryTool instance for testing."""
return MCPDiscoveryTool()
class TestMCPDiscoveryToolSchema:
"""Tests for MCPDiscoveryToolSchema."""
def test_schema_with_required_fields(self) -> None:
"""Test schema with only required fields."""
schema = MCPDiscoveryToolSchema(need="database server")
assert schema.need == "database server"
assert schema.constraints is None
assert schema.limit == 5
def test_schema_with_all_fields(self) -> None:
"""Test schema with all fields."""
constraints = MCPDiscoveryConstraints(
max_latency_ms=200,
required_features=["auth", "realtime"],
exclude_servers=["deprecated-server"],
)
schema = MCPDiscoveryToolSchema(
need="database with authentication",
constraints=constraints,
limit=3,
)
assert schema.need == "database with authentication"
assert schema.constraints is not None
assert schema.constraints.max_latency_ms == 200
assert schema.constraints.required_features == ["auth", "realtime"]
assert schema.constraints.exclude_servers == ["deprecated-server"]
assert schema.limit == 3
def test_schema_limit_validation(self) -> None:
"""Test that limit is validated to be between 1 and 10."""
with pytest.raises(ValueError):
MCPDiscoveryToolSchema(need="test", limit=0)
with pytest.raises(ValueError):
MCPDiscoveryToolSchema(need="test", limit=11)
class TestMCPDiscoveryConstraints:
"""Tests for MCPDiscoveryConstraints."""
def test_empty_constraints(self) -> None:
"""Test creating empty constraints."""
constraints = MCPDiscoveryConstraints()
assert constraints.max_latency_ms is None
assert constraints.required_features is None
assert constraints.exclude_servers is None
def test_full_constraints(self) -> None:
"""Test creating constraints with all fields."""
constraints = MCPDiscoveryConstraints(
max_latency_ms=100,
required_features=["feature1", "feature2"],
exclude_servers=["server1", "server2"],
)
assert constraints.max_latency_ms == 100
assert constraints.required_features == ["feature1", "feature2"]
assert constraints.exclude_servers == ["server1", "server2"]
class TestMCPDiscoveryTool:
"""Tests for MCPDiscoveryTool."""
def test_tool_initialization(self, discovery_tool: MCPDiscoveryTool) -> None:
"""Test tool initialization with default values."""
assert discovery_tool.name == "Discover MCP Server"
assert "MCP" in discovery_tool.description
assert discovery_tool.base_url == "https://mcp-discovery-production.up.railway.app"
assert discovery_tool.timeout == 30
def test_build_request_payload_basic(
self, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test building request payload with basic parameters."""
payload = discovery_tool._build_request_payload(
need="database server",
constraints=None,
limit=5,
)
assert payload == {"need": "database server", "limit": 5}
def test_build_request_payload_with_constraints(
self, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test building request payload with constraints."""
constraints = MCPDiscoveryConstraints(
max_latency_ms=200,
required_features=["auth"],
exclude_servers=["old-server"],
)
payload = discovery_tool._build_request_payload(
need="database",
constraints=constraints,
limit=3,
)
assert payload["need"] == "database"
assert payload["limit"] == 3
assert "constraints" in payload
assert payload["constraints"]["max_latency_ms"] == 200
assert payload["constraints"]["required_features"] == ["auth"]
assert payload["constraints"]["exclude_servers"] == ["old-server"]
def test_build_request_payload_partial_constraints(
self, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test building request payload with partial constraints."""
constraints = MCPDiscoveryConstraints(max_latency_ms=100)
payload = discovery_tool._build_request_payload(
need="test",
constraints=constraints,
limit=5,
)
assert payload["constraints"] == {"max_latency_ms": 100}
def test_process_recommendations(
self, discovery_tool: MCPDiscoveryTool, mock_api_response: dict
) -> None:
"""Test processing recommendations from API response."""
recommendations = discovery_tool._process_recommendations(
mock_api_response["recommendations"]
)
assert len(recommendations) == 2
first_rec = recommendations[0]
assert first_rec["server"] == "sqlite-server"
assert first_rec["npm_package"] == "@modelcontextprotocol/server-sqlite"
assert first_rec["confidence"] == 0.38
assert first_rec["capabilities"] == ["sqlite", "sql", "database", "embedded"]
assert first_rec["metrics"]["avg_latency_ms"] == 50.0
assert first_rec["metrics"]["uptime_pct"] == 99.9
second_rec = recommendations[1]
assert second_rec["server"] == "postgres-server"
assert second_rec["metrics"]["avg_latency_ms"] is None
def test_process_recommendations_with_malformed_data(
self, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test processing recommendations with malformed data."""
malformed_recommendations = [
{"server": "valid-server", "confidence": 0.5},
None,
{"invalid": "data"},
]
recommendations = discovery_tool._process_recommendations(
malformed_recommendations
)
assert len(recommendations) >= 1
assert recommendations[0]["server"] == "valid-server"
def test_format_result_with_recommendations(
self, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test formatting results with recommendations."""
result: MCPDiscoveryResult = {
"recommendations": [
{
"server": "test-server",
"npm_package": "@test/server",
"install_command": "npx -y @test/server",
"confidence": 0.85,
"description": "A test server",
"capabilities": ["test", "demo"],
"metrics": {
"avg_latency_ms": 100.0,
"uptime_pct": 99.5,
"last_checked": "2026-01-17T10:00:00Z",
},
"docs_url": "https://example.com/docs",
"github_url": "https://github.com/test/server",
}
],
"total_found": 1,
"query_time_ms": 150,
}
formatted = discovery_tool._format_result(result)
assert "Found 1 MCP server(s)" in formatted
assert "test-server" in formatted
assert "85% confidence" in formatted
assert "A test server" in formatted
assert "test, demo" in formatted
assert "npx -y @test/server" in formatted
assert "100.0ms" in formatted
assert "99.5%" in formatted
def test_format_result_empty(self, discovery_tool: MCPDiscoveryTool) -> None:
"""Test formatting results with no recommendations."""
result: MCPDiscoveryResult = {
"recommendations": [],
"total_found": 0,
"query_time_ms": 50,
}
formatted = discovery_tool._format_result(result)
assert "No MCP servers found" in formatted
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_make_api_request_success(
self,
mock_post: MagicMock,
discovery_tool: MCPDiscoveryTool,
mock_api_response: dict,
) -> None:
"""Test successful API request."""
mock_response = MagicMock()
mock_response.json.return_value = mock_api_response
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
result = discovery_tool._make_api_request({"need": "database", "limit": 5})
assert result == mock_api_response
mock_post.assert_called_once()
call_args = mock_post.call_args
assert call_args[1]["json"] == {"need": "database", "limit": 5}
assert call_args[1]["timeout"] == 30
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_make_api_request_with_api_key(
self,
mock_post: MagicMock,
discovery_tool: MCPDiscoveryTool,
mock_api_response: dict,
) -> None:
"""Test API request with API key."""
mock_response = MagicMock()
mock_response.json.return_value = mock_api_response
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
with patch.dict("os.environ", {"MCP_DISCOVERY_API_KEY": "test-key"}):
discovery_tool._make_api_request({"need": "test", "limit": 5})
call_args = mock_post.call_args
assert "Authorization" in call_args[1]["headers"]
assert call_args[1]["headers"]["Authorization"] == "Bearer test-key"
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_make_api_request_empty_response(
self, mock_post: MagicMock, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test API request with empty response."""
mock_response = MagicMock()
mock_response.json.return_value = {}
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
with pytest.raises(ValueError, match="Empty response"):
discovery_tool._make_api_request({"need": "test", "limit": 5})
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_make_api_request_network_error(
self, mock_post: MagicMock, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test API request with network error."""
mock_post.side_effect = requests.exceptions.ConnectionError("Network error")
with pytest.raises(requests.exceptions.ConnectionError):
discovery_tool._make_api_request({"need": "test", "limit": 5})
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_make_api_request_json_decode_error(
self, mock_post: MagicMock, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test API request with JSON decode error."""
mock_response = MagicMock()
mock_response.json.side_effect = json.JSONDecodeError("Error", "", 0)
mock_response.raise_for_status.return_value = None
mock_response.content = b"invalid json"
mock_post.return_value = mock_response
with pytest.raises(json.JSONDecodeError):
discovery_tool._make_api_request({"need": "test", "limit": 5})
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_run_success(
self,
mock_post: MagicMock,
discovery_tool: MCPDiscoveryTool,
mock_api_response: dict,
) -> None:
"""Test successful _run execution."""
mock_response = MagicMock()
mock_response.json.return_value = mock_api_response
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
result = discovery_tool._run(need="database server")
assert "sqlite-server" in result
assert "postgres-server" in result
assert "Found 2 MCP server(s)" in result
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_run_with_constraints(
self,
mock_post: MagicMock,
discovery_tool: MCPDiscoveryTool,
mock_api_response: dict,
) -> None:
"""Test _run with constraints."""
mock_response = MagicMock()
mock_response.json.return_value = mock_api_response
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
result = discovery_tool._run(
need="database",
constraints={"max_latency_ms": 100, "required_features": ["sql"]},
limit=3,
)
assert "sqlite-server" in result
call_args = mock_post.call_args
payload = call_args[1]["json"]
assert payload["constraints"]["max_latency_ms"] == 100
assert payload["constraints"]["required_features"] == ["sql"]
assert payload["limit"] == 3
def test_run_missing_need_parameter(
self, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test _run with missing need parameter."""
with pytest.raises(ValueError, match="'need' parameter is required"):
discovery_tool._run()
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_discover_method(
self,
mock_post: MagicMock,
discovery_tool: MCPDiscoveryTool,
mock_api_response: dict,
) -> None:
"""Test the discover convenience method."""
mock_response = MagicMock()
mock_response.json.return_value = mock_api_response
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
result = discovery_tool.discover(
need="database",
constraints=MCPDiscoveryConstraints(max_latency_ms=200),
limit=3,
)
assert isinstance(result, dict)
assert "recommendations" in result
assert "total_found" in result
assert "query_time_ms" in result
assert len(result["recommendations"]) == 2
assert result["total_found"] == 2
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_discover_returns_structured_data(
self,
mock_post: MagicMock,
discovery_tool: MCPDiscoveryTool,
mock_api_response: dict,
) -> None:
"""Test that discover returns properly structured data."""
mock_response = MagicMock()
mock_response.json.return_value = mock_api_response
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
result = discovery_tool.discover(need="database")
first_rec = result["recommendations"][0]
assert "server" in first_rec
assert "npm_package" in first_rec
assert "install_command" in first_rec
assert "confidence" in first_rec
assert "description" in first_rec
assert "capabilities" in first_rec
assert "metrics" in first_rec
assert "docs_url" in first_rec
assert "github_url" in first_rec
class TestMCPDiscoveryToolIntegration:
"""Integration tests for MCPDiscoveryTool (requires network)."""
@pytest.mark.skip(reason="Integration test - requires network access")
def test_real_api_call(self) -> None:
"""Test actual API call to MCP Discovery service."""
tool = MCPDiscoveryTool()
result = tool._run(need="database", limit=3)
assert "MCP server" in result or "No MCP servers found" in result

View File

@@ -189,9 +189,14 @@ def prepare_kickoff(crew: Crew, inputs: dict[str, Any] | None) -> dict[str, Any]
Returns:
The potentially modified inputs dictionary after before callbacks.
"""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_context import get_current_parent_id
from crewai.events.types.crew_events import CrewKickoffStartedEvent
if get_current_parent_id() is None:
reset_emission_counter()
for before_callback in crew.before_kickoff_callbacks:
if inputs is None:
inputs = {}
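
To make the reset guard concrete, here is a stdlib-only sketch of the same top-level-only logic; the names are illustrative stand-ins, not the crewai internals:

```python
# Hypothetical miniature of the guard above: reset global numbering
# only when no enclosing event scope exists, so a crew kicked off
# inside a flow keeps the flow's ordering.
import itertools

_counter = itertools.count(start=1)
_scope_stack: list[str] = []  # stand-in for the contextvar stack

def begin_run() -> None:
    global _counter
    if not _scope_stack:  # i.e. get_current_parent_id() is None
        _counter = itertools.count(start=1)
```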

View File

@@ -1,9 +1,35 @@
from collections.abc import Iterator
from datetime import datetime, timezone
import itertools
from typing import Any
import uuid
from pydantic import BaseModel, Field
from crewai.utilities.serialization import to_serializable
from crewai.utilities.serialization import Serializable, to_serializable
_emission_counter: Iterator[int] = itertools.count(start=1)
def get_next_emission_sequence() -> int:
"""Get the next emission sequence number.
Effectively thread-safe in CPython: next() on itertools.count is atomic under the GIL.
Returns:
The next sequence number.
"""
return next(_emission_counter)
def reset_emission_counter() -> None:
"""Reset the emission sequence counter to 1.
Useful for test isolation.
"""
global _emission_counter
_emission_counter = itertools.count(start=1)
class BaseEvent(BaseModel):
@@ -22,7 +48,11 @@ class BaseEvent(BaseModel):
agent_id: str | None = None
agent_role: str | None = None
def to_json(self, exclude: set[str] | None = None):
event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
parent_event_id: str | None = None
emission_sequence: int | None = None
def to_json(self, exclude: set[str] | None = None) -> Serializable:
"""
Converts the event to a JSON-serializable dictionary.
@@ -34,13 +64,13 @@ class BaseEvent(BaseModel):
"""
return to_serializable(self, exclude=exclude)
def _set_task_params(self, data: dict[str, Any]):
def _set_task_params(self, data: dict[str, Any]) -> None:
if "from_task" in data and (task := data["from_task"]):
self.task_id = str(task.id)
self.task_name = task.name or task.description
self.from_task = None
def _set_agent_params(self, data: dict[str, Any]):
def _set_agent_params(self, data: dict[str, Any]) -> None:
task = data.get("from_task", None)
agent = task.agent if task else data.get("from_agent", None)

View File

@@ -16,8 +16,19 @@ from typing import Any, Final, ParamSpec, TypeVar
from typing_extensions import Self
from crewai.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent, get_next_emission_sequence
from crewai.events.depends import Depends
from crewai.events.event_context import (
SCOPE_ENDING_EVENTS,
SCOPE_STARTING_EVENTS,
VALID_EVENT_PAIRS,
get_current_parent_id,
get_enclosing_parent_id,
handle_empty_pop,
handle_mismatch,
pop_event_scope,
push_event_scope,
)
from crewai.events.handler_graph import build_execution_plan
from crewai.events.types.event_bus_types import (
AsyncHandler,
@@ -69,6 +80,8 @@ class CrewAIEventsBus:
_execution_plan_cache: dict[type[BaseEvent], ExecutionPlan]
_console: ConsoleFormatter
_shutting_down: bool
_pending_futures: set[Future[Any]]
_futures_lock: threading.Lock
def __new__(cls) -> Self:
"""Create or return the singleton instance.
@@ -91,6 +104,8 @@ class CrewAIEventsBus:
"""
self._shutting_down = False
self._rwlock = RWLock()
self._pending_futures: set[Future[Any]] = set()
self._futures_lock = threading.Lock()
self._sync_handlers: dict[type[BaseEvent], SyncHandlerSet] = {}
self._async_handlers: dict[type[BaseEvent], AsyncHandlerSet] = {}
self._handler_dependencies: dict[
@@ -111,6 +126,25 @@ class CrewAIEventsBus:
)
self._loop_thread.start()
def _track_future(self, future: Future[Any]) -> Future[Any]:
"""Track a future and set up automatic cleanup when it completes.
Args:
future: The future to track
Returns:
The same future for chaining
"""
with self._futures_lock:
self._pending_futures.add(future)
def _cleanup(f: Future[Any]) -> None:
with self._futures_lock:
self._pending_futures.discard(f)
future.add_done_callback(_cleanup)
return future
def _run_loop(self) -> None:
"""Run the background async event loop."""
asyncio.set_event_loop(self._loop)
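
A stdlib-only sketch of the pattern _track_future implements: each future joins a locked set and removes itself through a done callback, so a later flush can snapshot what is still in flight. Names are stand-ins for the bus internals:

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor

pending: set[Future] = set()
pending_lock = threading.Lock()

def track(future: Future) -> Future:
    with pending_lock:
        pending.add(future)

    def _cleanup(f: Future) -> None:
        with pending_lock:
            pending.discard(f)

    # Runs immediately if the future already finished, so nothing leaks.
    future.add_done_callback(_cleanup)
    return future

with ThreadPoolExecutor() as pool:
    f = track(pool.submit(sum, range(10)))
print(f.result(), len(pending))  # 45, and the set drains itself
```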
@@ -326,6 +360,25 @@ class CrewAIEventsBus:
... await asyncio.wrap_future(future) # In async test
... # or future.result(timeout=5.0) in sync code
"""
event.emission_sequence = get_next_emission_sequence()
if event.parent_event_id is None:
event_type_name = event.type
if event_type_name in SCOPE_ENDING_EVENTS:
event.parent_event_id = get_enclosing_parent_id()
popped = pop_event_scope()
if popped is None:
handle_empty_pop(event_type_name)
else:
_, popped_type = popped
expected_start = VALID_EVENT_PAIRS.get(event_type_name)
if expected_start and popped_type and popped_type != expected_start:
handle_mismatch(event_type_name, popped_type, expected_start)
elif event_type_name in SCOPE_STARTING_EVENTS:
event.parent_event_id = get_current_parent_id()
push_event_scope(event.event_id, event_type_name)
else:
event.parent_event_id = get_current_parent_id()
event_type = type(event)
with self._rwlock.r_locked():
@@ -339,9 +392,11 @@ class CrewAIEventsBus:
async_handlers = self._async_handlers.get(event_type, frozenset())
if has_dependencies:
return asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies(source, event),
self._loop,
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies(source, event),
self._loop,
)
)
if sync_handlers:
@@ -353,16 +408,53 @@ class CrewAIEventsBus:
ctx.run, self._call_handlers, source, event, sync_handlers
)
if not async_handlers:
return sync_future
return self._track_future(sync_future)
if async_handlers:
return asyncio.run_coroutine_threadsafe(
self._acall_handlers(source, event, async_handlers),
self._loop,
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers(source, event, async_handlers),
self._loop,
)
)
return None
def flush(self, timeout: float | None = None) -> bool:
"""Block until all pending event handlers complete.
This method waits for all futures from previously emitted events to
finish executing. Useful at the end of operations (like kickoff) to
ensure all event handlers have completed before returning.
Args:
timeout: Maximum time in seconds to wait for handlers to complete.
If None, waits indefinitely.
Returns:
True if all handlers completed, False if timeout occurred.
"""
with self._futures_lock:
futures_to_wait = list(self._pending_futures)
if not futures_to_wait:
return True
from concurrent.futures import wait as wait_futures
done, not_done = wait_futures(futures_to_wait, timeout=timeout)
# Check for exceptions in completed futures
errors = [
future.exception() for future in done if future.exception() is not None
]
for error in errors:
self._console.print(
f"[CrewAIEventsBus] Handler exception during flush: {error}"
)
return len(not_done) == 0
async def aemit(self, source: Any, event: BaseEvent) -> None:
"""Asynchronously emit an event to registered async handlers.
@@ -464,6 +556,9 @@ class CrewAIEventsBus:
wait: If True, wait for all pending tasks to complete before stopping.
If False, cancel all pending tasks immediately.
"""
if wait:
self.flush()
with self._rwlock.w_locked():
self._shutting_down = True
loop = getattr(self, "_loop", None)
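
In use, flush() is meant to be called once an operation has emitted its events; shutdown(wait=True) above now does exactly that. A short usage sketch against the singleton bus from this diff:

```python
# Assumes crewai with this changeset is installed.
from crewai.events.event_bus import crewai_event_bus

# ... run a crew or flow, emitting events along the way ...

# Block until every handler fired so far completes, or 5s elapse.
if not crewai_event_bus.flush(timeout=5.0):
    print("some event handlers were still running after 5s")
```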

View File

@@ -0,0 +1,260 @@
"""Event context management for parent-child relationship tracking."""
from collections.abc import Generator
from contextlib import contextmanager
import contextvars
from dataclasses import dataclass
from enum import Enum
from crewai.events.utils.console_formatter import ConsoleFormatter
class MismatchBehavior(Enum):
"""Behavior when event pairs don't match."""
WARN = "warn"
RAISE = "raise"
SILENT = "silent"
@dataclass
class EventContextConfig:
"""Configuration for event context behavior."""
max_stack_depth: int = 100
mismatch_behavior: MismatchBehavior = MismatchBehavior.WARN
empty_pop_behavior: MismatchBehavior = MismatchBehavior.WARN
class StackDepthExceededError(Exception):
"""Raised when stack depth limit is exceeded."""
class EventPairingError(Exception):
"""Raised when event pairs don't match."""
class EmptyStackError(Exception):
"""Raised when popping from empty stack."""
_event_id_stack: contextvars.ContextVar[tuple[tuple[str, str], ...]] = (
contextvars.ContextVar("_event_id_stack", default=())
)
_event_context_config: contextvars.ContextVar[EventContextConfig | None] = (
contextvars.ContextVar("_event_context_config", default=None)
)
_default_config = EventContextConfig()
_console = ConsoleFormatter()
def get_current_parent_id() -> str | None:
"""Get the current parent event ID from the stack."""
stack = _event_id_stack.get()
return stack[-1][0] if stack else None
def get_enclosing_parent_id() -> str | None:
"""Get the parent of the current scope (stack[-2])."""
stack = _event_id_stack.get()
return stack[-2][0] if len(stack) >= 2 else None
def push_event_scope(event_id: str, event_type: str = "") -> None:
"""Push an event ID and type onto the scope stack."""
config = _event_context_config.get() or _default_config
stack = _event_id_stack.get()
if 0 < config.max_stack_depth <= len(stack):
raise StackDepthExceededError(
f"Event stack depth limit ({config.max_stack_depth}) exceeded. "
f"This usually indicates missing ending events."
)
_event_id_stack.set((*stack, (event_id, event_type)))
def pop_event_scope() -> tuple[str, str] | None:
"""Pop an event entry from the scope stack."""
stack = _event_id_stack.get()
if not stack:
return None
_event_id_stack.set(stack[:-1])
return stack[-1]
def handle_empty_pop(event_type_name: str) -> None:
"""Handle a pop attempt on an empty stack."""
config = _event_context_config.get() or _default_config
msg = (
f"Ending event '{event_type_name}' emitted with empty scope stack. "
"Missing starting event?"
)
if config.empty_pop_behavior == MismatchBehavior.RAISE:
raise EmptyStackError(msg)
if config.empty_pop_behavior == MismatchBehavior.WARN:
_console.print(f"[CrewAIEventsBus] Warning: {msg}")
def handle_mismatch(
event_type_name: str,
popped_type: str,
expected_start: str,
) -> None:
"""Handle a mismatched event pair."""
config = _event_context_config.get() or _default_config
msg = (
f"Event pairing mismatch. '{event_type_name}' closed '{popped_type}' "
f"(expected '{expected_start}')"
)
if config.mismatch_behavior == MismatchBehavior.RAISE:
raise EventPairingError(msg)
if config.mismatch_behavior == MismatchBehavior.WARN:
_console.print(f"[CrewAIEventsBus] Warning: {msg}")
@contextmanager
def event_scope(event_id: str, event_type: str = "") -> Generator[None, None, None]:
"""Context manager to establish a parent event scope."""
stack = _event_id_stack.get()
already_on_stack = any(entry[0] == event_id for entry in stack)
if not already_on_stack:
push_event_scope(event_id, event_type)
try:
yield
finally:
if not already_on_stack:
pop_event_scope()
SCOPE_STARTING_EVENTS: frozenset[str] = frozenset(
{
"flow_started",
"method_execution_started",
"crew_kickoff_started",
"crew_train_started",
"crew_test_started",
"agent_execution_started",
"agent_evaluation_started",
"lite_agent_execution_started",
"task_started",
"llm_call_started",
"llm_guardrail_started",
"tool_usage_started",
"mcp_connection_started",
"mcp_tool_execution_started",
"memory_retrieval_started",
"memory_save_started",
"memory_query_started",
"knowledge_query_started",
"knowledge_search_query_started",
"a2a_delegation_started",
"a2a_conversation_started",
"a2a_server_task_started",
"a2a_parallel_delegation_started",
"agent_reasoning_started",
}
)
SCOPE_ENDING_EVENTS: frozenset[str] = frozenset(
{
"flow_finished",
"flow_paused",
"method_execution_finished",
"method_execution_failed",
"crew_kickoff_completed",
"crew_kickoff_failed",
"crew_train_completed",
"crew_train_failed",
"crew_test_completed",
"crew_test_failed",
"agent_execution_completed",
"agent_execution_error",
"agent_evaluation_completed",
"agent_evaluation_failed",
"lite_agent_execution_completed",
"lite_agent_execution_error",
"task_completed",
"task_failed",
"llm_call_completed",
"llm_call_failed",
"llm_guardrail_completed",
"llm_guardrail_failed",
"tool_usage_finished",
"tool_usage_error",
"mcp_connection_completed",
"mcp_connection_failed",
"mcp_tool_execution_completed",
"mcp_tool_execution_failed",
"memory_retrieval_completed",
"memory_save_completed",
"memory_save_failed",
"memory_query_completed",
"memory_query_failed",
"knowledge_query_completed",
"knowledge_query_failed",
"knowledge_search_query_completed",
"knowledge_search_query_failed",
"a2a_delegation_completed",
"a2a_conversation_completed",
"a2a_server_task_completed",
"a2a_server_task_canceled",
"a2a_server_task_failed",
"a2a_parallel_delegation_completed",
"agent_reasoning_completed",
"agent_reasoning_failed",
}
)
VALID_EVENT_PAIRS: dict[str, str] = {
"flow_finished": "flow_started",
"flow_paused": "flow_started",
"method_execution_finished": "method_execution_started",
"method_execution_failed": "method_execution_started",
"crew_kickoff_completed": "crew_kickoff_started",
"crew_kickoff_failed": "crew_kickoff_started",
"crew_train_completed": "crew_train_started",
"crew_train_failed": "crew_train_started",
"crew_test_completed": "crew_test_started",
"crew_test_failed": "crew_test_started",
"agent_execution_completed": "agent_execution_started",
"agent_execution_error": "agent_execution_started",
"agent_evaluation_completed": "agent_evaluation_started",
"agent_evaluation_failed": "agent_evaluation_started",
"lite_agent_execution_completed": "lite_agent_execution_started",
"lite_agent_execution_error": "lite_agent_execution_started",
"task_completed": "task_started",
"task_failed": "task_started",
"llm_call_completed": "llm_call_started",
"llm_call_failed": "llm_call_started",
"llm_guardrail_completed": "llm_guardrail_started",
"llm_guardrail_failed": "llm_guardrail_started",
"tool_usage_finished": "tool_usage_started",
"tool_usage_error": "tool_usage_started",
"mcp_connection_completed": "mcp_connection_started",
"mcp_connection_failed": "mcp_connection_started",
"mcp_tool_execution_completed": "mcp_tool_execution_started",
"mcp_tool_execution_failed": "mcp_tool_execution_started",
"memory_retrieval_completed": "memory_retrieval_started",
"memory_save_completed": "memory_save_started",
"memory_save_failed": "memory_save_started",
"memory_query_completed": "memory_query_started",
"memory_query_failed": "memory_query_started",
"knowledge_query_completed": "knowledge_query_started",
"knowledge_query_failed": "knowledge_query_started",
"knowledge_search_query_completed": "knowledge_search_query_started",
"knowledge_search_query_failed": "knowledge_search_query_started",
"a2a_delegation_completed": "a2a_delegation_started",
"a2a_conversation_completed": "a2a_conversation_started",
"a2a_server_task_completed": "a2a_server_task_started",
"a2a_server_task_canceled": "a2a_server_task_started",
"a2a_server_task_failed": "a2a_server_task_started",
"a2a_parallel_delegation_completed": "a2a_parallel_delegation_started",
"agent_reasoning_completed": "agent_reasoning_started",
"agent_reasoning_failed": "agent_reasoning_started",
}
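
A short sketch of how the scope stack resolves parent IDs, using the event_scope and get_current_parent_id helpers this file introduces (the IDs are made up):

```python
from crewai.events.event_context import event_scope, get_current_parent_id

assert get_current_parent_id() is None  # top level: no parent

with event_scope("crew-1", "crew_kickoff_started"):
    # Events emitted here are parented to "crew-1".
    assert get_current_parent_id() == "crew-1"
    with event_scope("task-1", "task_started"):
        assert get_current_parent_id() == "task-1"
    # Inner scope popped; the crew scope is current again.
    assert get_current_parent_id() == "crew-1"
```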

View File

@@ -267,9 +267,12 @@ class TraceBatchManager:
sorted_events = sorted(
self.event_buffer,
key=lambda e: e.timestamp
if hasattr(e, "timestamp") and e.timestamp
else "",
key=lambda e: (
e.emission_sequence
if e.emission_sequence is not None
else float("inf"),
e.timestamp if hasattr(e, "timestamp") and e.timestamp else "",
),
)
self.current_batch.events = sorted_events
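
A stdlib sketch of the two-level key above: events carrying an emission_sequence order by it, while events without one map None to infinity, sink to the end, and fall back to timestamp order. Plain dictionaries stand in for event objects:

```python
events = [
    {"seq": 3, "ts": "2026-01-20T01:00:03"},
    {"seq": None, "ts": "2026-01-20T01:00:01"},
    {"seq": 1, "ts": "2026-01-20T01:00:00"},
    {"seq": None, "ts": "2026-01-20T01:00:02"},
]
events.sort(
    key=lambda e: (
        e["seq"] if e["seq"] is not None else float("inf"),
        e["ts"] or "",
    )
)
assert [e["seq"] for e in events] == [1, 3, None, None]
```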

View File

@@ -9,6 +9,7 @@ from typing_extensions import Self
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import CrewAIEventsBus
from crewai.events.listeners.tracing.first_time_trace_handler import (
FirstTimeTraceHandler,
@@ -616,7 +617,7 @@ class TraceCollectionListener(BaseEventListener):
if self.batch_manager.is_batch_initialized():
self.batch_manager.finalize_batch()
def _initialize_crew_batch(self, source: Any, event: Any) -> None:
def _initialize_crew_batch(self, source: Any, event: BaseEvent) -> None:
"""Initialize trace batch.
Args:
@@ -626,7 +627,7 @@ class TraceCollectionListener(BaseEventListener):
user_context = self._get_user_context()
execution_metadata = {
"crew_name": getattr(event, "crew_name", "Unknown Crew"),
"execution_start": event.timestamp if hasattr(event, "timestamp") else None,
"execution_start": event.timestamp,
"crewai_version": get_crewai_version(),
}
@@ -635,7 +636,7 @@ class TraceCollectionListener(BaseEventListener):
self._initialize_batch(user_context, execution_metadata)
def _initialize_flow_batch(self, source: Any, event: Any) -> None:
def _initialize_flow_batch(self, source: Any, event: BaseEvent) -> None:
"""Initialize trace batch for Flow execution.
Args:
@@ -645,7 +646,7 @@ class TraceCollectionListener(BaseEventListener):
user_context = self._get_user_context()
execution_metadata = {
"flow_name": getattr(event, "flow_name", "Unknown Flow"),
"execution_start": event.timestamp if hasattr(event, "timestamp") else None,
"execution_start": event.timestamp,
"crewai_version": get_crewai_version(),
"execution_type": "flow",
}
@@ -714,18 +715,16 @@ class TraceCollectionListener(BaseEventListener):
self.batch_manager.end_event_processing()
def _create_trace_event(
self, event_type: str, source: Any, event: Any
self, event_type: str, source: Any, event: BaseEvent
) -> TraceEvent:
"""Create a trace event"""
if hasattr(event, "timestamp") and event.timestamp:
trace_event = TraceEvent(
type=event_type,
timestamp=event.timestamp.isoformat(),
)
else:
trace_event = TraceEvent(
type=event_type,
)
"""Create a trace event with ordering information."""
trace_event = TraceEvent(
type=event_type,
timestamp=event.timestamp.isoformat() if event.timestamp else "",
event_id=event.event_id,
emission_sequence=event.emission_sequence,
parent_event_id=event.parent_event_id,
)
trace_event.event_data = self._build_event_data(event_type, event, source)
@@ -778,10 +777,8 @@ class TraceCollectionListener(BaseEventListener):
}
if event_type == "llm_call_started":
event_data = safe_serialize_to_dict(event)
event_data["task_name"] = (
event.task_name or event.task_description
if hasattr(event, "task_name") and event.task_name
else None
event_data["task_name"] = event.task_name or getattr(
event, "task_description", None
)
return event_data
if event_type == "llm_call_completed":
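
With event_id, parent_event_id, and emission_sequence now stamped onto every trace event, a consumer can rebuild the execution tree offline. A sketch with made-up IDs; only the field names match the TraceEvent dataclass in this changeset:

```python
from collections import defaultdict

trace = [
    {"event_id": "a", "parent_event_id": None, "type": "crew_kickoff_started"},
    {"event_id": "b", "parent_event_id": "a", "type": "task_started"},
    {"event_id": "c", "parent_event_id": "b", "type": "llm_call_started"},
]

children = defaultdict(list)
for ev in trace:
    children[ev["parent_event_id"]].append(ev)

def print_tree(parent_id=None, depth=0):
    for ev in children.get(parent_id, []):
        print("  " * depth + ev["type"])
        print_tree(ev["event_id"], depth + 1)

print_tree()  # nests task under crew, llm call under task
```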

View File

@@ -15,5 +15,8 @@ class TraceEvent:
type: str = ""
event_data: dict[str, Any] = field(default_factory=dict)
emission_sequence: int | None = None
parent_event_id: str | None = None
def to_dict(self) -> dict[str, Any]:
return asdict(self)

View File

@@ -30,7 +30,9 @@ from pydantic import BaseModel, Field, ValidationError
from rich.console import Console
from rich.panel import Panel
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_context import get_current_parent_id
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
@@ -73,6 +75,7 @@ from crewai.flow.utils import (
is_simple_flow_condition,
)
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
from crewai.flow.human_feedback import HumanFeedbackResult
@@ -570,7 +573,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
flow_id: str,
persistence: FlowPersistence | None = None,
**kwargs: Any,
) -> "Flow[Any]":
) -> Flow[Any]:
"""Create a Flow instance from a pending feedback state.
This classmethod is used to restore a flow that was paused waiting
@@ -631,7 +634,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return instance
@property
def pending_feedback(self) -> "PendingFeedbackContext | None":
def pending_feedback(self) -> PendingFeedbackContext | None:
"""Get the pending feedback context if this flow is waiting for feedback.
Returns:
@@ -716,9 +719,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
Raises:
ValueError: If no pending feedback context exists
"""
from crewai.flow.human_feedback import HumanFeedbackResult
from datetime import datetime
from crewai.flow.human_feedback import HumanFeedbackResult
if self._pending_feedback_context is None:
raise ValueError(
"No pending feedback context. Use from_pending() to restore a paused flow."
@@ -744,7 +748,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
collapsed_outcome = self._collapse_to_outcome(
feedback=feedback,
outcomes=emit,
llm=llm,
llm=llm, # type: ignore[arg-type]
)
# Create result
@@ -792,13 +796,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._method_outputs.append(collapsed_outcome)
# Then trigger listeners for the outcome (e.g., "approved" triggers @listen("approved"))
final_result = await self._execute_listeners(
final_result = await self._execute_listeners( # type: ignore[func-returns-value]
FlowMethodName(collapsed_outcome), # Use outcome as trigger
result, # Pass HumanFeedbackResult to listeners
)
else:
# Normal behavior - pass the HumanFeedbackResult
final_result = await self._execute_listeners(
final_result = await self._execute_listeners( # type: ignore[func-returns-value]
FlowMethodName(context.method_name),
result,
)
@@ -901,11 +905,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
model_fields = getattr(self.initial_state, "model_fields", None)
if not model_fields or "id" not in model_fields:
raise ValueError("Flow state model must have an 'id' field")
instance = self.initial_state()
instance = self.initial_state() # type: ignore[assignment]
# Ensure id is set - generate UUID if empty
if not getattr(instance, "id", None):
object.__setattr__(instance, "id", str(uuid4()))
return instance
return instance # type: ignore[return-value]
if self.initial_state is dict:
return cast(T, {"id": str(uuid4())})
@@ -1326,6 +1330,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
if filtered_inputs:
self._initialize_state(filtered_inputs)
if get_current_parent_id() is None:
reset_emission_counter()
# Emit FlowStartedEvent and log the start of the flow.
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
@@ -2053,7 +2060,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
if isinstance(llm, str):
llm_instance = LLM(model=llm)
elif isinstance(llm, BaseLLMClass):
llm_instance = llm
llm_instance = llm # type: ignore[assignment]
else:
raise ValueError(f"Invalid llm type: {type(llm)}. Expected str or BaseLLM.")
@@ -2090,7 +2097,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
try:
parsed = json.loads(response)
return parsed.get("outcome", outcomes[0])
return parsed.get("outcome", outcomes[0]) # type: ignore[no-any-return]
except json.JSONDecodeError:
# Not valid JSON, might be raw outcome string
response_clean = response.strip()
@@ -2099,9 +2106,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
return outcome
return outcomes[0]
elif isinstance(response, FeedbackOutcome):
return response.outcome
return response.outcome # type: ignore[no-any-return]
elif hasattr(response, "outcome"):
return response.outcome
return response.outcome # type: ignore[no-any-return]
else:
# Unexpected type, fall back to first outcome
logger.warning(f"Unexpected response type: {type(response)}")

View File

@@ -241,6 +241,9 @@ class ToolUsage:
if self.task:
self.task.increment_tools_errors()
started_at = time.time()
started_event_emitted = False
if self.agent:
event_data = {
"agent_key": self.agent.key,
@@ -258,151 +261,162 @@ class ToolUsage:
event_data["task_name"] = self.task.name or self.task.description
event_data["task_id"] = str(self.task.id)
crewai_event_bus.emit(self, ToolUsageStartedEvent(**event_data))
started_event_emitted = True
started_at = time.time()
from_cache = False
result = None # type: ignore
should_retry = False
available_tool = None
if self.tools_handler and self.tools_handler.cache:
input_str = ""
if calling.arguments:
if isinstance(calling.arguments, dict):
input_str = json.dumps(calling.arguments)
else:
input_str = str(calling.arguments)
try:
if self.tools_handler and self.tools_handler.cache:
input_str = ""
if calling.arguments:
if isinstance(calling.arguments, dict):
input_str = json.dumps(calling.arguments)
else:
input_str = str(calling.arguments)
result = self.tools_handler.cache.read(
tool=calling.tool_name, input=input_str
) # type: ignore
from_cache = result is not None
result = self.tools_handler.cache.read(
tool=calling.tool_name, input=input_str
) # type: ignore
from_cache = result is not None
available_tool = next(
(
available_tool
for available_tool in self.tools
if available_tool.name == tool.name
),
None,
)
available_tool = next(
(
available_tool
for available_tool in self.tools
                    if available_tool.name == tool.name
                ),
                None,
            )
            usage_limit_error = self._check_usage_limit(available_tool, tool.name)
            if usage_limit_error:
                result = usage_limit_error
                self._telemetry.tool_usage_error(llm=self.function_calling_llm)
                result = self._format_result(result=result)
                # Don't return early - fall through to finally block
            elif result is None:
                try:
                    if calling.tool_name in [
                        "Delegate work to coworker",
                        "Ask question to coworker",
                    ]:
                        coworker = (
                            calling.arguments.get("coworker")
                            if calling.arguments
                            else None
                        )
                        if self.task:
                            self.task.increment_delegations(coworker)
                    if calling.arguments:
                        try:
                            acceptable_args = tool.args_schema.model_json_schema()[
                                "properties"
                            ].keys()
                            arguments = {
                                k: v
                                for k, v in calling.arguments.items()
                                if k in acceptable_args
                            }
                            arguments = self._add_fingerprint_metadata(arguments)
                            result = await tool.ainvoke(input=arguments)
                        except Exception:
                            arguments = calling.arguments
                            arguments = self._add_fingerprint_metadata(arguments)
                            result = await tool.ainvoke(input=arguments)
                    else:
                        arguments = self._add_fingerprint_metadata({})
                        result = await tool.ainvoke(input=arguments)
                    if self.tools_handler:
                        should_cache = True
                        if (
                            hasattr(available_tool, "cache_function")
                            and available_tool.cache_function
                        ):
                            should_cache = available_tool.cache_function(
                                calling.arguments, result
                            )
                        self.tools_handler.on_tool_use(
                            calling=calling, output=result, should_cache=should_cache
                        )
                    self._telemetry.tool_usage(
                        llm=self.function_calling_llm,
                        tool_name=tool.name,
                        attempts=self._run_attempts,
                    )
                    result = self._format_result(result=result)
                    data = {
                        "result": result,
                        "tool_name": tool.name,
                        "tool_args": calling.arguments,
                    }
                    if (
                        hasattr(available_tool, "result_as_answer")
                        and available_tool.result_as_answer
                    ):
                        result_as_answer = available_tool.result_as_answer
                        data["result_as_answer"] = result_as_answer
                    if self.agent and hasattr(self.agent, "tools_results"):
                        self.agent.tools_results.append(data)
                    if available_tool and hasattr(
                        available_tool, "current_usage_count"
                    ):
                        available_tool.current_usage_count += 1
                        if (
                            hasattr(available_tool, "max_usage_count")
                            and available_tool.max_usage_count is not None
                        ):
                            self._printer.print(
                                content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
                                color="blue",
                            )
                except Exception as e:
                    self.on_tool_error(tool=tool, tool_calling=calling, e=e)
                    self._run_attempts += 1
                    if self._run_attempts > self._max_parsing_attempts:
                        self._telemetry.tool_usage_error(llm=self.function_calling_llm)
                        error_message = self._i18n.errors("tool_usage_exception").format(
                            error=e, tool=tool.name, tool_inputs=tool.description
                        )
                        result = ToolUsageError(
                            f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
                        ).message
                        if self.task:
                            self.task.increment_tools_errors()
                        if self.agent and self.agent.verbose:
                            self._printer.print(
                                content=f"\n\n{error_message}\n", color="red"
                            )
                    else:
                        if self.task:
                            self.task.increment_tools_errors()
                        should_retry = True
            else:
                result = self._format_result(result=result)
        finally:
            if started_event_emitted:
                self.on_tool_use_finished(
                    tool=tool,
                    tool_calling=calling,
                    from_cache=from_cache,
                    started_at=started_at,
                    result=result,
                )
        # Handle retry after finally block ensures finished event was emitted
        if should_retry:
            return await self.ause(calling=calling, tool_string=tool_string)
        return result
@@ -412,6 +426,7 @@ class ToolUsage:
tool: CrewStructuredTool,
calling: ToolCalling | InstructorToolCalling,
) -> str:
# Repeated usage check happens before event emission - safe to return early
if self._check_tool_repeated_usage(calling=calling):
try:
result = self._i18n.errors("task_repeated_usage").format(
@@ -428,6 +443,9 @@ class ToolUsage:
if self.task:
self.task.increment_tools_errors()
started_at = time.time()
started_event_emitted = False
if self.agent:
event_data = {
"agent_key": self.agent.key,
@@ -446,155 +464,162 @@ class ToolUsage:
event_data["task_name"] = self.task.name or self.task.description
event_data["task_id"] = str(self.task.id)
crewai_event_bus.emit(self, ToolUsageStartedEvent(**event_data))
started_event_emitted = True
from_cache = False
result = None # type: ignore
should_retry = False
available_tool = None
        try:
            if self.tools_handler and self.tools_handler.cache:
                input_str = ""
                if calling.arguments:
                    if isinstance(calling.arguments, dict):
                        input_str = json.dumps(calling.arguments)
                    else:
                        input_str = str(calling.arguments)
                result = self.tools_handler.cache.read(
                    tool=calling.tool_name, input=input_str
                )  # type: ignore
                from_cache = result is not None
            available_tool = next(
                (
                    available_tool
                    for available_tool in self.tools
                    if available_tool.name == tool.name
                ),
                None,
            )
            usage_limit_error = self._check_usage_limit(available_tool, tool.name)
            if usage_limit_error:
                result = usage_limit_error
                self._telemetry.tool_usage_error(llm=self.function_calling_llm)
                result = self._format_result(result=result)
                # Don't return early - fall through to finally block
            elif result is None:
                try:
                    if calling.tool_name in [
                        "Delegate work to coworker",
                        "Ask question to coworker",
                    ]:
                        coworker = (
                            calling.arguments.get("coworker")
                            if calling.arguments
                            else None
                        )
                        if self.task:
                            self.task.increment_delegations(coworker)
                    if calling.arguments:
                        try:
                            acceptable_args = tool.args_schema.model_json_schema()[
                                "properties"
                            ].keys()
                            arguments = {
                                k: v
                                for k, v in calling.arguments.items()
                                if k in acceptable_args
                            }
                            # Add fingerprint metadata if available
                            arguments = self._add_fingerprint_metadata(arguments)
                            result = tool.invoke(input=arguments)
                        except Exception:
                            arguments = calling.arguments
                            # Add fingerprint metadata if available
                            arguments = self._add_fingerprint_metadata(arguments)
                            result = tool.invoke(input=arguments)
                    else:
                        # Add fingerprint metadata even to empty arguments
                        arguments = self._add_fingerprint_metadata({})
                        result = tool.invoke(input=arguments)
                    if self.tools_handler:
                        should_cache = True
                        if (
                            hasattr(available_tool, "cache_function")
                            and available_tool.cache_function
                        ):
                            should_cache = available_tool.cache_function(
                                calling.arguments, result
                            )
                        self.tools_handler.on_tool_use(
                            calling=calling, output=result, should_cache=should_cache
                        )
                    self._telemetry.tool_usage(
                        llm=self.function_calling_llm,
                        tool_name=tool.name,
                        attempts=self._run_attempts,
                    )
                    result = self._format_result(result=result)
                    data = {
                        "result": result,
                        "tool_name": tool.name,
                        "tool_args": calling.arguments,
                    }
                    if (
                        hasattr(available_tool, "result_as_answer")
                        and available_tool.result_as_answer
                    ):
                        result_as_answer = available_tool.result_as_answer
                        data["result_as_answer"] = result_as_answer
                    if self.agent and hasattr(self.agent, "tools_results"):
                        self.agent.tools_results.append(data)
                    if available_tool and hasattr(
                        available_tool, "current_usage_count"
                    ):
                        available_tool.current_usage_count += 1
                        if (
                            hasattr(available_tool, "max_usage_count")
                            and available_tool.max_usage_count is not None
                        ):
                            self._printer.print(
                                content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
                                color="blue",
                            )
                except Exception as e:
                    self.on_tool_error(tool=tool, tool_calling=calling, e=e)
                    self._run_attempts += 1
                    if self._run_attempts > self._max_parsing_attempts:
                        self._telemetry.tool_usage_error(llm=self.function_calling_llm)
                        error_message = self._i18n.errors(
                            "tool_usage_exception"
                        ).format(error=e, tool=tool.name, tool_inputs=tool.description)
                        result = ToolUsageError(
                            f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
                        ).message
                        if self.task:
                            self.task.increment_tools_errors()
                        if self.agent and self.agent.verbose:
                            self._printer.print(
                                content=f"\n\n{error_message}\n", color="red"
                            )
                    else:
                        if self.task:
                            self.task.increment_tools_errors()
                        should_retry = True
            else:
                result = self._format_result(result=result)
        finally:
            if started_event_emitted:
                self.on_tool_use_finished(
                    tool=tool,
                    tool_calling=calling,
                    from_cache=from_cache,
                    started_at=started_at,
                    result=result,
                )
        # Handle retry after finally block ensures finished event was emitted
        if should_retry:
            return self.use(calling=calling, tool_string=tool_string)
        return result
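
The pairing discipline above, reduced to its skeleton: every started event is matched by exactly one finished event, and retries are deferred until the finished event has gone out. A minimal sketch of that pattern; the bus callback, event names, and runner below are stand-ins for illustration, not crewAI APIs:

# Minimal sketch of the started/finished pairing pattern used above.
# emit() and the event names are hypothetical stand-ins.
from typing import Any, Callable


def run_with_paired_events(
    emit: Callable[[str, dict[str, Any]], None],
    invoke: Callable[[], Any],
    max_attempts: int = 3,
    attempt: int = 1,
) -> Any:
    result: Any = None
    should_retry = False
    emit("tool_usage_started", {"attempt": attempt})
    started_emitted = True
    try:
        try:
            result = invoke()
        except Exception as exc:
            if attempt >= max_attempts:
                # Give up, but still fall through so the finished event fires.
                result = f"error: {exc}"
            else:
                # Retry only after the finished event has been emitted.
                should_retry = True
    finally:
        # Every started event gets exactly one finished event, even on error.
        if started_emitted:
            emit("tool_usage_finished", {"attempt": attempt, "result": result})
    if should_retry:
        return run_with_paired_events(emit, invoke, max_attempts, attempt + 1)
    return result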

View File

@@ -0,0 +1,115 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Helper. You help.\nYour
personal goal is: Help with tasks\nTo give my best complete final answer to
the task respond using the exact following format:\n\nThought: I now can give
a great answer\nFinal Answer: Your final answer must be the great and the most
complete as possible, it must be outcome described.\n\nI MUST use these formats,
my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say ''done''
and nothing else.\n\nThis is the expected criteria for your final answer: The
word done.\nyou MUST return the actual complete content as the final answer,
not a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '794'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.10
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFLBbtswDL37Kwid4yJOnTr1regwrMN2C7DDVhisTNtaZUmQ6GZFkX8f
ZKexu3XALgbMx/f0HsmXBECoWpQgZIcse6fTD+t12B1QfpN+n/Ht/stjc2icvvsaPj1/FqvIsA8/
SfIr60La3mliZc0ES0/IFFWz4mq3u94Wm3wEeluTjrTWcZpfZGmvjEo36802Xedplp/onVWSgijh
ewIA8DJ+o1FT0y9Rwnr1WukpBGxJlOcmAOGtjhWBIajAaFisZlBaw2RG7/vODm3HJdyBsQeQaKBV
TwQIbQwAaMKB/A/zURnUcDP+lVBbQ0tBT80QMKYyg9YLAI2xjHEqY5T7E3I8m9e2dd4+hD+oolFG
ha7yhMGaaDSwdWJEjwnA/Tik4U1u4bztHVdsH2l8LtteT3piXs4CzU8gW0a9qBeXq3f0qpoYlQ6L
MQuJsqN6ps47waFWdgEki9R/u3lPe0quTPs/8jMgJTmmunKeaiXfJp7bPMXb/VfbecqjYRHIPylJ
FSvycRM1NTjo6aBEeA5MfdUo05J3Xk1X1bjqsrjCQkpqMpEck98AAAD//wMAnStaOGQDAAA=
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 20 Jan 2026 07:55:25 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '601'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '628'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,115 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Helper. You help.\nYour
personal goal is: Help with tasks\nTo give my best complete final answer to
the task respond using the exact following format:\n\nThought: I now can give
a great answer\nFinal Answer: Your final answer must be the great and the most
complete as possible, it must be outcome described.\n\nI MUST use these formats,
my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say ''hi''
and nothing else.\n\nThis is the expected criteria for your final answer: The
word hi.\nyou MUST return the actual complete content as the final answer, not
a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '790'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.10
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFLBbtswDL37Kwid48JOk7rxLUAxdIdil24osBUGI9M2N1kSJLnpUOTf
Bzlp7HYdsIsB8/E9vUfyJQEQXIsShOwwyN6q9CbL/Gb7sH/YfZUFP+Pdt1u/VTdfbu8afS8WkWF2
P0mGV9aFNL1VFNjoIywdYaComhdX19ebdbFcj0BvalKR1tqQri7ytGfN6TJbrtNslearE70zLMmL
Er4nAAAv4zca1TU9ixKyxWulJ++xJVGemwCEMypWBHrPPqAOYjGB0uhAevR+35mh7UIJn0GbPUjU
0PITAUIbAwBqvyf3Q39ijQq2418JHc/lHDWDx5hJD0rNANTaBIwzGYM8npDD2boyrXVm599RRcOa
fVc5Qm90tOmDsWJEDwnA4zii4U1qYZ3pbaiC+UXjc/l6c9QT02pm6OoEBhNQzerF5eIDvaqmgKz8
bMhCouyonqjTRnCo2cyAZJb6bzcfaR+Ts27/R34CpCQbqK6so5rl28RTm6N4uf9qO095NCw8uSeW
VAUmFzdRU4ODOp6T8L99oL5qWLfkrOPjTTW2uiyusJCSmlwkh+QPAAAA//8DAASWsy5iAwAA
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 20 Jan 2026 07:55:26 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '369'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '391'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,115 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Responder. You give short
answers.\nYour personal goal is: Respond briefly\nTo give my best complete final
answer to the task respond using the exact following format:\n\nThought: I now
can give a great answer\nFinal Answer: Your final answer must be the great and
the most complete as possible, it must be outcome described.\n\nI MUST use these
formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say
''yes'' and nothing else.\n\nThis is the expected criteria for your final answer:
The word yes.\nyou MUST return the actual complete content as the final answer,
not a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '809'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.10
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFJda9wwEHz3r1j0fA72nT9SvxVC0mvfSimUNpiNvLbVypKQ5CRHuP9e
ZF/OTpNAXwze2RnN7O5TBMBEwypgvEfPByPjqyRxN1efH2+w+DGkn/bF9a7Nv/f7/PA1+8I2gaHv
fhP3z6wLrgcjyQutZphbQk9BNS2Ly8sPebnbTsCgG5KB1hkfZxdpPAgl4m2yzeMki9PsRO+14ORY
BT8jAICn6RuMqoYeWQXJ5rkykHPYEavOTQDMahkqDJ0TzqPybLOAXCtPavL+rddj1/sK9qD0A3BU
0Il7AoQuBABU7oHsL3UtFEr4OP1VcCC31rPUjg5DKDVKuQJQKe0xDGVKcntCjmfvUnfG6jv3D5W1
QgnX15bQaRV8Oq8Nm9BjBHA7zWh8EZsZqwfja6//0PRcWqSzHlt2s0KzE+i1R7mql/nmDb26IY9C
utWUGUfeU7NQl5Xg2Ai9AqJV6tdu3tKekwvV/Y/8AnBOxlNTG0uN4C8TL22Wwum+13ae8mSYObL3
glPtBdmwiYZaHOV8T8wdnKehboXqyBor5qNqTb0rCyw5pzZl0TH6CwAA//8DAPUTEd9jAwAA
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 20 Jan 2026 07:55:33 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '418'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '434'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,115 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Responder. You give short
answers.\nYour personal goal is: Respond briefly\nTo give my best complete final
answer to the task respond using the exact following format:\n\nThought: I now
can give a great answer\nFinal Answer: Your final answer must be the great and
the most complete as possible, it must be outcome described.\n\nI MUST use these
formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say
''hello'' and nothing else.\n\nThis is the expected criteria for your final
answer: The word hello.\nyou MUST return the actual complete content as the
final answer, not a summary.\n\nBegin! This is VERY important to you, use the
tools available and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '813'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.10
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAA4xSTW/bMAy9+1cQOsdFnOarvm0YtvWyU4EdtsJgZdrWJouCRKcrivz3QU4au1sH
7GLAfHxP75F8zgCUqVUJSncouvc2/7Bcxk9PX776fs+fe2zu+PYgZLvu/VZu1CIx+OEHaXlhXWnu
vSUx7E6wDoRCSbXYbff7m83uejUCPddkE631kq+virw3zuSr5WqTL9d5sT7TOzaaoirhWwYA8Dx+
k1FX0y9VwnLxUukpRmxJlZcmABXYporCGE0UdKIWE6jZCbnR+13HQ9tJCbfg+BE0OmjNgQChTQEA
XXyk8N19NA4tvBv/SujIWp4rBmqGiCmWG6ydAegcC6axjFnuz8jx4t5y6wM/xD+oqjHOxK4KhJFd
chqFvRrRYwZwP05peBVc+cC9l0r4J43PFdvipKem7czQ9RkUFrSz+m6zeEOvqknQ2Dibs9KoO6on
6rQUHGrDMyCbpf7bzVvap+TGtf8jPwFakxeqKx+oNvp14qktUDref7VdpjwaVpHCwWiqxFBIm6ip
wcGeLkrFpyjUV41xLQUfzOmsGl9d77a405qaQmXH7DcAAAD//wMAQklYDmUDAAA=
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 20 Jan 2026 07:55:32 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '581'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '619'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,115 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Responder. You give short
answers.\nYour personal goal is: Respond briefly\nTo give my best complete final
answer to the task respond using the exact following format:\n\nThought: I now
can give a great answer\nFinal Answer: Your final answer must be the great and
the most complete as possible, it must be outcome described.\n\nI MUST use these
formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say
''ok'' and nothing else.\n\nThis is the expected criteria for your final answer:
The word ok.\nyou MUST return the actual complete content as the final answer,
not a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '807'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.10
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFJda9wwEHz3r1j0fA72fTnxW6GU9iglD4FC22A28tpWI0tCWicN4f57
kX05O20KfTF4Z2c0s7vPCYBQtShByA5Z9k6n77MsfCRyX9ZDsTnsrp/Wh5v++uu37urzoRWryLB3
P0nyC+tC2t5pYmXNBEtPyBRV82J/eXm1KzabEehtTTrSWsfp9iJPe2VUus7WuzTbpvn2RO+skhRE
Cd8TAIDn8RuNmpp+iRKy1UulpxCwJVGemwCEtzpWBIagAqNhsZpBaQ2TGb3fdHZoOy7hExj7CBIN
tOqBAKGNAQBNeCT/w3xQBjW8G/9KsPdLOU/NEDBmMoPWCwCNsYxxJmOQ2xNyPFvXtnXe3oU/qKJR
RoWu8oTBmmgzsHViRI8JwO04ouFVauG87R1XbO9pfC7f55OemFezQLcnkC2jXtSL3eoNvaomRqXD
YshCouyonqnzRnColV0AySL1327e0p6SK9P+j/wMSEmOqa6cp1rJ14nnNk/xcv/Vdp7yaFgE8g9K
UsWKfNxETQ0OejonEZ4CU181yrTknVfTTTWu2hR7LKSkJhfJMfkNAAD//wMAw/X5HWIDAAA=
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 20 Jan 2026 07:55:34 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '499'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '517'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,99 @@
"""Tests for event context management."""
import pytest
from crewai.events.event_context import (
SCOPE_ENDING_EVENTS,
SCOPE_STARTING_EVENTS,
VALID_EVENT_PAIRS,
EmptyStackError,
EventPairingError,
MismatchBehavior,
StackDepthExceededError,
_event_context_config,
EventContextConfig,
get_current_parent_id,
get_enclosing_parent_id,
handle_empty_pop,
handle_mismatch,
pop_event_scope,
push_event_scope,
)
class TestStackOperations:
"""Tests for stack push/pop operations."""
def test_empty_stack_returns_none(self) -> None:
assert get_current_parent_id() is None
assert get_enclosing_parent_id() is None
def test_push_and_get_parent(self) -> None:
push_event_scope("event-1", "task_started")
assert get_current_parent_id() == "event-1"
def test_nested_push(self) -> None:
push_event_scope("event-1", "crew_kickoff_started")
push_event_scope("event-2", "task_started")
assert get_current_parent_id() == "event-2"
assert get_enclosing_parent_id() == "event-1"
def test_pop_restores_parent(self) -> None:
push_event_scope("event-1", "crew_kickoff_started")
push_event_scope("event-2", "task_started")
popped = pop_event_scope()
assert popped == ("event-2", "task_started")
assert get_current_parent_id() == "event-1"
def test_pop_empty_stack_returns_none(self) -> None:
assert pop_event_scope() is None
class TestStackDepthLimit:
"""Tests for stack depth limit."""
def test_depth_limit_exceeded_raises(self) -> None:
_event_context_config.set(EventContextConfig(max_stack_depth=3))
push_event_scope("event-1", "type-1")
push_event_scope("event-2", "type-2")
push_event_scope("event-3", "type-3")
with pytest.raises(StackDepthExceededError):
push_event_scope("event-4", "type-4")
class TestMismatchHandling:
"""Tests for mismatch behavior."""
def test_handle_mismatch_raises_when_configured(self) -> None:
_event_context_config.set(
EventContextConfig(mismatch_behavior=MismatchBehavior.RAISE)
)
with pytest.raises(EventPairingError):
handle_mismatch("task_completed", "llm_call_started", "task_started")
def test_handle_empty_pop_raises_when_configured(self) -> None:
_event_context_config.set(
EventContextConfig(empty_pop_behavior=MismatchBehavior.RAISE)
)
with pytest.raises(EmptyStackError):
handle_empty_pop("task_completed")
class TestEventTypeSets:
"""Tests for event type set completeness."""
def test_all_ending_events_have_pairs(self) -> None:
for ending_event in SCOPE_ENDING_EVENTS:
assert ending_event in VALID_EVENT_PAIRS
def test_all_pairs_reference_starting_events(self) -> None:
for ending_event, starting_event in VALID_EVENT_PAIRS.items():
assert starting_event in SCOPE_STARTING_EVENTS
def test_starting_and_ending_are_disjoint(self) -> None:
overlap = SCOPE_STARTING_EVENTS & SCOPE_ENDING_EVENTS
assert not overlap
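
Taken together, these tests pin down a simple contract for the scope stack: pushes nest, pops restore the enclosing scope, and the current top is the parent for anything emitted inside it. A short usage sketch based only on the API exercised above; call sites in the real codebase may differ:

# Illustrative use of the scope stack exercised by these tests.
from crewai.events.event_context import (
    get_current_parent_id,
    pop_event_scope,
    push_event_scope,
)

outer_parent = get_current_parent_id()  # None at the top level
push_event_scope("crew-1", "crew_kickoff_started")
try:
    # Anything emitted here sees "crew-1" as its parent_event_id.
    assert get_current_parent_id() == "crew-1"
finally:
    pop_event_scope()  # restores the enclosing scope
assert get_current_parent_id() == outer_parent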

View File

@@ -0,0 +1,508 @@
"""Tests for event ordering and parent-child relationships."""
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionStartedEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffStartedEvent,
)
from crewai.events.types.flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallStartedEvent,
)
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskStartedEvent,
)
from crewai.flow.flow import Flow, listen, start
from crewai.task import Task
class EventCollector:
"""Collects events and provides helpers to find related events."""
def __init__(self) -> None:
self.events: list[BaseEvent] = []
def add(self, event: BaseEvent) -> None:
self.events.append(event)
def first(self, event_type: type[BaseEvent]) -> BaseEvent | None:
for e in self.events:
if isinstance(e, event_type):
return e
return None
def all_of(self, event_type: type[BaseEvent]) -> list[BaseEvent]:
return [e for e in self.events if isinstance(e, event_type)]
def with_parent(self, parent_id: str) -> list[BaseEvent]:
return [e for e in self.events if e.parent_event_id == parent_id]
@pytest.fixture
def collector() -> EventCollector:
"""Fixture that collects events during test execution."""
c = EventCollector()
@crewai_event_bus.on(CrewKickoffStartedEvent)
def h1(source, event):
c.add(event)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def h2(source, event):
c.add(event)
@crewai_event_bus.on(TaskStartedEvent)
def h3(source, event):
c.add(event)
@crewai_event_bus.on(TaskCompletedEvent)
def h4(source, event):
c.add(event)
@crewai_event_bus.on(AgentExecutionStartedEvent)
def h5(source, event):
c.add(event)
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def h6(source, event):
c.add(event)
@crewai_event_bus.on(LLMCallStartedEvent)
def h7(source, event):
c.add(event)
@crewai_event_bus.on(LLMCallCompletedEvent)
def h8(source, event):
c.add(event)
@crewai_event_bus.on(FlowStartedEvent)
def h9(source, event):
c.add(event)
@crewai_event_bus.on(FlowFinishedEvent)
def h10(source, event):
c.add(event)
@crewai_event_bus.on(MethodExecutionStartedEvent)
def h11(source, event):
c.add(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def h12(source, event):
c.add(event)
return c
class TestCrewEventOrdering:
"""Tests for event ordering in crew execution."""
@pytest.mark.vcr()
def test_crew_events_have_event_ids(self, collector: EventCollector) -> None:
"""Every crew event should have a unique event_id."""
agent = Agent(
role="Responder",
goal="Respond briefly",
backstory="You give short answers.",
verbose=False,
)
task = Task(
description="Say 'hello' and nothing else.",
expected_output="The word hello.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
crew.kickoff()
crewai_event_bus.flush()
started = collector.first(CrewKickoffStartedEvent)
completed = collector.first(CrewKickoffCompletedEvent)
assert started is not None
assert started.event_id is not None
assert len(started.event_id) > 0
assert completed is not None
assert completed.event_id is not None
assert completed.event_id != started.event_id
@pytest.mark.vcr()
def test_crew_completed_after_started(self, collector: EventCollector) -> None:
"""Crew completed event should have higher sequence than started."""
agent = Agent(
role="Responder",
goal="Respond briefly",
backstory="You give short answers.",
verbose=False,
)
task = Task(
description="Say 'yes' and nothing else.",
expected_output="The word yes.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
crew.kickoff()
crewai_event_bus.flush()
started = collector.first(CrewKickoffStartedEvent)
completed = collector.first(CrewKickoffCompletedEvent)
assert started is not None
assert completed is not None
assert started.emission_sequence is not None
assert completed.emission_sequence is not None
assert completed.emission_sequence > started.emission_sequence
@pytest.mark.vcr()
def test_task_parent_is_crew(self, collector: EventCollector) -> None:
"""Task events should have crew event as parent."""
agent = Agent(
role="Responder",
goal="Respond briefly",
backstory="You give short answers.",
verbose=False,
)
task = Task(
description="Say 'ok' and nothing else.",
expected_output="The word ok.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
crew.kickoff()
crewai_event_bus.flush()
crew_started = collector.first(CrewKickoffStartedEvent)
task_started = collector.first(TaskStartedEvent)
assert crew_started is not None
assert task_started is not None
assert task_started.parent_event_id == crew_started.event_id
class TestAgentEventOrdering:
"""Tests for event ordering in agent execution."""
@pytest.mark.vcr()
def test_agent_events_have_event_ids(self, collector: EventCollector) -> None:
"""Agent execution events should have event_ids."""
agent = Agent(
role="Helper",
goal="Help with tasks",
backstory="You help.",
verbose=False,
)
task = Task(
description="Say 'done' and nothing else.",
expected_output="The word done.",
agent=agent,
)
agent.execute_task(task)
crewai_event_bus.flush()
started = collector.first(AgentExecutionStartedEvent)
completed = collector.first(AgentExecutionCompletedEvent)
if started:
assert started.event_id is not None
if completed:
assert completed.event_id is not None
@pytest.mark.vcr()
def test_llm_events_have_parent(self, collector: EventCollector) -> None:
"""LLM call events should have a parent event."""
agent = Agent(
role="Helper",
goal="Help with tasks",
backstory="You help.",
verbose=False,
)
task = Task(
description="Say 'hi' and nothing else.",
expected_output="The word hi.",
agent=agent,
)
agent.execute_task(task)
crewai_event_bus.flush()
llm_started = collector.first(LLMCallStartedEvent)
if llm_started:
assert llm_started.event_id is not None
# LLM events should have some parent in the hierarchy
assert llm_started.parent_event_id is not None
class TestFlowWithCrewEventOrdering:
"""Tests for event ordering in flows containing crews."""
@pytest.mark.vcr()
def test_flow_events_have_ids(self, collector: EventCollector) -> None:
"""Flow events should have event_ids."""
agent = Agent(
role="Worker",
goal="Do work",
backstory="You work.",
verbose=False,
)
task = Task(
description="Say 'complete' and nothing else.",
expected_output="The word complete.",
agent=agent,
)
class SimpleFlow(Flow):
@start()
def run_crew(self):
c = Crew(agents=[agent], tasks=[task], verbose=False)
return c.kickoff()
flow = SimpleFlow()
flow.kickoff()
crewai_event_bus.flush()
flow_started = collector.first(FlowStartedEvent)
flow_finished = collector.first(FlowFinishedEvent)
assert flow_started is not None
assert flow_started.event_id is not None
assert flow_finished is not None
assert flow_finished.event_id is not None
@pytest.mark.vcr()
def test_method_parent_is_flow(self, collector: EventCollector) -> None:
"""Method execution events should have flow as parent."""
agent = Agent(
role="Worker",
goal="Do work",
backstory="You work.",
verbose=False,
)
task = Task(
description="Say 'ready' and nothing else.",
expected_output="The word ready.",
agent=agent,
)
class FlowWithMethod(Flow):
@start()
def my_method(self):
c = Crew(agents=[agent], tasks=[task], verbose=False)
return c.kickoff()
flow = FlowWithMethod()
flow.kickoff()
crewai_event_bus.flush()
flow_started = collector.first(FlowStartedEvent)
method_started = collector.first(MethodExecutionStartedEvent)
assert flow_started is not None
assert method_started is not None
assert method_started.parent_event_id == flow_started.event_id
@pytest.mark.vcr()
def test_crew_parent_is_method(self, collector: EventCollector) -> None:
"""Crew inside flow method should have method as parent."""
agent = Agent(
role="Worker",
goal="Do work",
backstory="You work.",
verbose=False,
)
task = Task(
description="Say 'go' and nothing else.",
expected_output="The word go.",
agent=agent,
)
class FlowWithCrew(Flow):
@start()
def run_it(self):
c = Crew(agents=[agent], tasks=[task], verbose=False)
return c.kickoff()
flow = FlowWithCrew()
flow.kickoff()
crewai_event_bus.flush()
method_started = collector.first(MethodExecutionStartedEvent)
crew_started = collector.first(CrewKickoffStartedEvent)
assert method_started is not None
assert crew_started is not None
assert crew_started.parent_event_id == method_started.event_id
class TestFlowWithMultipleCrewsEventOrdering:
"""Tests for event ordering in flows with multiple crews."""
@pytest.mark.vcr()
def test_two_crews_have_different_ids(self, collector: EventCollector) -> None:
"""Two crews in a flow should have different event_ids."""
agent1 = Agent(
role="First",
goal="Be first",
backstory="You go first.",
verbose=False,
)
agent2 = Agent(
role="Second",
goal="Be second",
backstory="You go second.",
verbose=False,
)
task1 = Task(
description="Say '1' and nothing else.",
expected_output="The number 1.",
agent=agent1,
)
task2 = Task(
description="Say '2' and nothing else.",
expected_output="The number 2.",
agent=agent2,
)
class TwoCrewFlow(Flow):
@start()
def first(self):
c = Crew(agents=[agent1], tasks=[task1], verbose=False)
return c.kickoff()
@listen(first)
def second(self, _):
c = Crew(agents=[agent2], tasks=[task2], verbose=False)
return c.kickoff()
flow = TwoCrewFlow()
flow.kickoff()
crewai_event_bus.flush()
crew_started_events = collector.all_of(CrewKickoffStartedEvent)
assert len(crew_started_events) >= 2
assert crew_started_events[0].event_id != crew_started_events[1].event_id
@pytest.mark.vcr()
def test_second_crew_after_first(self, collector: EventCollector) -> None:
"""Second crew should have higher sequence than first."""
agent1 = Agent(
role="First",
goal="Be first",
backstory="You go first.",
verbose=False,
)
agent2 = Agent(
role="Second",
goal="Be second",
backstory="You go second.",
verbose=False,
)
task1 = Task(
description="Say 'a' and nothing else.",
expected_output="The letter a.",
agent=agent1,
)
task2 = Task(
description="Say 'b' and nothing else.",
expected_output="The letter b.",
agent=agent2,
)
class SequentialCrewFlow(Flow):
@start()
def crew_a(self):
c = Crew(agents=[agent1], tasks=[task1], verbose=False)
return c.kickoff()
@listen(crew_a)
def crew_b(self, _):
c = Crew(agents=[agent2], tasks=[task2], verbose=False)
return c.kickoff()
flow = SequentialCrewFlow()
flow.kickoff()
crewai_event_bus.flush()
crew_started_events = collector.all_of(CrewKickoffStartedEvent)
assert len(crew_started_events) >= 2
first = crew_started_events[0]
second = crew_started_events[1]
assert first.emission_sequence is not None
assert second.emission_sequence is not None
assert second.emission_sequence > first.emission_sequence
@pytest.mark.vcr()
def test_tasks_have_correct_crew_parents(self, collector: EventCollector) -> None:
"""Tasks in different crews should have their own crew as parent."""
agent1 = Agent(
role="Alpha",
goal="Do alpha work",
backstory="You are alpha.",
verbose=False,
)
agent2 = Agent(
role="Beta",
goal="Do beta work",
backstory="You are beta.",
verbose=False,
)
task1 = Task(
description="Say 'alpha' and nothing else.",
expected_output="The word alpha.",
agent=agent1,
)
task2 = Task(
description="Say 'beta' and nothing else.",
expected_output="The word beta.",
agent=agent2,
)
class ParentTestFlow(Flow):
@start()
def alpha_crew(self):
c = Crew(agents=[agent1], tasks=[task1], verbose=False)
return c.kickoff()
@listen(alpha_crew)
def beta_crew(self, _):
c = Crew(agents=[agent2], tasks=[task2], verbose=False)
return c.kickoff()
flow = ParentTestFlow()
flow.kickoff()
crewai_event_bus.flush()
crew_started_events = collector.all_of(CrewKickoffStartedEvent)
task_started_events = collector.all_of(TaskStartedEvent)
assert len(crew_started_events) >= 2
assert len(task_started_events) >= 2
crew1_id = crew_started_events[0].event_id
crew2_id = crew_started_events[1].event_id
task1_parent = task_started_events[0].parent_event_id
task2_parent = task_started_events[1].parent_event_id
assert task1_parent == crew1_id
assert task2_parent == crew2_id

View File

@@ -70,6 +70,9 @@ def test_long_term_memory_save_events(long_term_memory):
"from_agent": None,
"agent_role": "test_agent",
"agent_id": None,
"event_id": ANY,
"parent_event_id": None,
"emission_sequence": ANY,
"value": "test_task",
"metadata": {"task": "test_task", "quality": 0.5},
}
@@ -85,6 +88,9 @@ def test_long_term_memory_save_events(long_term_memory):
"from_agent": None,
"agent_role": "test_agent",
"agent_id": None,
"event_id": ANY,
"parent_event_id": None,
"emission_sequence": ANY,
"value": "test_task",
"metadata": {
"task": "test_task",
@@ -139,6 +145,9 @@ def test_long_term_memory_search_events(long_term_memory):
"from_agent": None,
"agent_role": None,
"agent_id": None,
"event_id": ANY,
"parent_event_id": None,
"emission_sequence": ANY,
"query": "test query",
"limit": 5,
"score_threshold": None,
@@ -156,6 +165,9 @@ def test_long_term_memory_search_events(long_term_memory):
"from_agent": None,
"agent_role": None,
"agent_id": None,
"event_id": ANY,
"parent_event_id": ANY,
"emission_sequence": ANY,
"query": "test query",
"results": None,
"limit": 5,

View File

@@ -81,6 +81,9 @@ def test_short_term_memory_search_events(short_term_memory):
"from_agent": None,
"agent_role": None,
"agent_id": None,
"event_id": ANY,
"parent_event_id": None,
"emission_sequence": ANY,
"query": "test value",
"limit": 3,
"score_threshold": 0.35,
@@ -98,6 +101,9 @@ def test_short_term_memory_search_events(short_term_memory):
"from_agent": None,
"agent_role": None,
"agent_id": None,
"event_id": ANY,
"parent_event_id": None,
"emission_sequence": ANY,
"query": "test value",
"results": [],
"limit": 3,
@@ -150,6 +156,9 @@ def test_short_term_memory_save_events(short_term_memory):
"from_agent": None,
"agent_role": None,
"agent_id": None,
"event_id": ANY,
"parent_event_id": None,
"emission_sequence": ANY,
"value": "test value",
"metadata": {"task": "test_task"},
}
@@ -166,6 +175,9 @@ def test_short_term_memory_save_events(short_term_memory):
"from_agent": None,
"agent_role": None,
"agent_id": None,
"event_id": ANY,
"parent_event_id": None,
"emission_sequence": ANY,
"value": "test value",
"metadata": {"task": "test_task"},
"save_time_ms": ANY,

uv.lock generated (8509 lines changed)

File diff suppressed because it is too large