mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 04:18:35 +00:00
Compare commits
3 Commits
1.2.1
...
devin/1754
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3c1bcbe072 | ||
|
|
e4ba3f4c4c | ||
|
|
58fb717ab2 |
49
examples/execution_trace_example.py
Normal file
49
examples/execution_trace_example.py
Normal file
@@ -0,0 +1,49 @@
|
||||
from crewai import Agent, Crew, Task, Process, LLM
|
||||
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Research and analyze information",
|
||||
backstory="You are an expert researcher with years of experience.",
|
||||
llm=LLM(model="gpt-4o-mini")
|
||||
)
|
||||
|
||||
writer = Agent(
|
||||
role="Writer",
|
||||
goal="Write compelling content",
|
||||
backstory="You are a skilled writer who creates engaging content.",
|
||||
llm=LLM(model="gpt-4o-mini")
|
||||
)
|
||||
|
||||
research_task = Task(
|
||||
description="Research the latest trends in AI",
|
||||
expected_output="A comprehensive report on AI trends",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
writing_task = Task(
|
||||
description="Write an article based on the research",
|
||||
expected_output="A well-written article about AI trends",
|
||||
agent=writer
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[researcher, writer],
|
||||
tasks=[research_task, writing_task],
|
||||
process=Process.sequential,
|
||||
trace_execution=True
|
||||
)
|
||||
|
||||
result = crew.kickoff(inputs={"topic": "artificial intelligence"})
|
||||
|
||||
if result.execution_trace:
|
||||
print(f"Total execution steps: {result.execution_trace.total_steps}")
|
||||
print(f"Execution duration: {result.execution_trace.end_time - result.execution_trace.start_time}")
|
||||
|
||||
thoughts = result.execution_trace.get_steps_by_type("agent_thought")
|
||||
print(f"Agent thoughts captured: {len(thoughts)}")
|
||||
|
||||
tool_calls = result.execution_trace.get_steps_by_type("tool_call_started")
|
||||
print(f"Tool calls made: {len(tool_calls)}")
|
||||
|
||||
for step in result.execution_trace.steps:
|
||||
print(f"{step.timestamp}: {step.step_type} - {step.agent_role or 'System'}")
|
||||
@@ -5,6 +5,7 @@ import urllib.request
|
||||
from crewai.agent import Agent
|
||||
from crewai.crew import Crew
|
||||
from crewai.crews.crew_output import CrewOutput
|
||||
from crewai.crews.execution_trace import ExecutionTrace, ExecutionStep
|
||||
from crewai.flow.flow import Flow
|
||||
from crewai.knowledge.knowledge import Knowledge
|
||||
from crewai.llm import LLM
|
||||
@@ -59,6 +60,8 @@ __all__ = [
|
||||
"Agent",
|
||||
"Crew",
|
||||
"CrewOutput",
|
||||
"ExecutionTrace",
|
||||
"ExecutionStep",
|
||||
"Process",
|
||||
"Task",
|
||||
"LLM",
|
||||
|
||||
@@ -81,6 +81,7 @@ from crewai.utilities.llm_utils import create_llm
|
||||
from crewai.utilities.planning_handler import CrewPlanner
|
||||
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
|
||||
from crewai.utilities.training_handler import CrewTrainingHandler
|
||||
from crewai.utilities.execution_trace_collector import ExecutionTraceCollector
|
||||
|
||||
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
|
||||
|
||||
@@ -205,6 +206,9 @@ class Crew(FlowTrackable, BaseModel):
|
||||
default_factory=list,
|
||||
description="List of callbacks to be executed after crew kickoff. It may be used to adjust the output of the crew.",
|
||||
)
|
||||
trace_execution: bool = Field(
|
||||
default=False, description="Whether to trace the execution steps of the crew"
|
||||
)
|
||||
max_rpm: Optional[int] = Field(
|
||||
default=None,
|
||||
description="Maximum number of requests per minute for the crew execution to be respected.",
|
||||
@@ -621,6 +625,11 @@ class Crew(FlowTrackable, BaseModel):
|
||||
self,
|
||||
inputs: Optional[Dict[str, Any]] = None,
|
||||
) -> CrewOutput:
|
||||
trace_collector = None
|
||||
if self.trace_execution:
|
||||
trace_collector = ExecutionTraceCollector()
|
||||
trace_collector.start_collecting()
|
||||
|
||||
ctx = baggage.set_baggage(
|
||||
"crew_context", CrewContext(id=str(self.id), key=self.key)
|
||||
)
|
||||
@@ -678,6 +687,10 @@ class Crew(FlowTrackable, BaseModel):
|
||||
result = after_callback(result)
|
||||
|
||||
self.usage_metrics = self.calculate_usage_metrics()
|
||||
|
||||
if trace_collector:
|
||||
execution_trace = trace_collector.stop_collecting()
|
||||
result.execution_trace = execution_trace
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
@@ -1086,6 +1099,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
json_dict=final_task_output.json_dict,
|
||||
tasks_output=task_outputs,
|
||||
token_usage=token_usage,
|
||||
execution_trace=None,
|
||||
)
|
||||
|
||||
def _process_async_tasks(
|
||||
|
||||
@@ -6,6 +6,7 @@ from pydantic import BaseModel, Field
|
||||
from crewai.tasks.output_format import OutputFormat
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.types.usage_metrics import UsageMetrics
|
||||
from crewai.crews.execution_trace import ExecutionTrace
|
||||
|
||||
|
||||
class CrewOutput(BaseModel):
|
||||
@@ -22,6 +23,9 @@ class CrewOutput(BaseModel):
|
||||
description="Output of each task", default=[]
|
||||
)
|
||||
token_usage: UsageMetrics = Field(description="Processed token summary", default={})
|
||||
execution_trace: Optional[ExecutionTrace] = Field(
|
||||
description="Detailed execution trace of crew steps", default=None
|
||||
)
|
||||
|
||||
@property
|
||||
def json(self) -> Optional[str]:
|
||||
|
||||
34
src/crewai/crews/execution_trace.py
Normal file
34
src/crewai/crews/execution_trace.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
class ExecutionStep(BaseModel):
|
||||
"""Represents a single step in the crew execution trace."""
|
||||
|
||||
timestamp: datetime = Field(description="When this step occurred")
|
||||
step_type: str = Field(description="Type of step: agent_thought, tool_call, tool_result, task_start, task_complete, etc.")
|
||||
agent_role: Optional[str] = Field(description="Role of the agent performing this step", default=None)
|
||||
task_description: Optional[str] = Field(description="Description of the task being executed", default=None)
|
||||
content: Dict[str, Any] = Field(description="Step-specific content (thought, tool args, result, etc.)", default_factory=dict)
|
||||
metadata: Dict[str, Any] = Field(description="Additional metadata for this step", default_factory=dict)
|
||||
|
||||
class ExecutionTrace(BaseModel):
|
||||
"""Complete execution trace for a crew run."""
|
||||
|
||||
steps: List[ExecutionStep] = Field(description="Ordered list of execution steps", default_factory=list)
|
||||
total_steps: int = Field(description="Total number of steps in the trace", default=0)
|
||||
start_time: Optional[datetime] = Field(description="When execution started", default=None)
|
||||
end_time: Optional[datetime] = Field(description="When execution completed", default=None)
|
||||
|
||||
def add_step(self, step: ExecutionStep) -> None:
|
||||
"""Add a step to the trace."""
|
||||
self.steps.append(step)
|
||||
self.total_steps = len(self.steps)
|
||||
|
||||
def get_steps_by_type(self, step_type: str) -> List[ExecutionStep]:
|
||||
"""Get all steps of a specific type."""
|
||||
return [step for step in self.steps if step.step_type == step_type]
|
||||
|
||||
def get_steps_by_agent(self, agent_role: str) -> List[ExecutionStep]:
|
||||
"""Get all steps performed by a specific agent."""
|
||||
return [step for step in self.steps if step.agent_role == agent_role]
|
||||
152
src/crewai/utilities/execution_trace_collector.py
Normal file
152
src/crewai/utilities/execution_trace_collector.py
Normal file
@@ -0,0 +1,152 @@
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
from crewai.crews.execution_trace import ExecutionStep, ExecutionTrace
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
from crewai.utilities.events.agent_events import (
|
||||
AgentExecutionStartedEvent,
|
||||
AgentExecutionCompletedEvent,
|
||||
AgentLogsExecutionEvent,
|
||||
)
|
||||
from crewai.utilities.events.tool_usage_events import (
|
||||
ToolUsageStartedEvent,
|
||||
ToolUsageFinishedEvent,
|
||||
)
|
||||
from crewai.utilities.events.task_events import (
|
||||
TaskStartedEvent,
|
||||
TaskCompletedEvent,
|
||||
)
|
||||
|
||||
class ExecutionTraceCollector:
|
||||
"""Collects execution events and builds an execution trace."""
|
||||
|
||||
def __init__(self):
|
||||
self.trace = ExecutionTrace()
|
||||
self.is_collecting = False
|
||||
|
||||
def start_collecting(self) -> None:
|
||||
"""Start collecting execution events."""
|
||||
self.is_collecting = True
|
||||
self.trace = ExecutionTrace(start_time=datetime.now())
|
||||
|
||||
crewai_event_bus.register_handler(TaskStartedEvent, self._handle_task_started)
|
||||
crewai_event_bus.register_handler(TaskCompletedEvent, self._handle_task_completed)
|
||||
crewai_event_bus.register_handler(AgentExecutionStartedEvent, self._handle_agent_started)
|
||||
crewai_event_bus.register_handler(AgentExecutionCompletedEvent, self._handle_agent_completed)
|
||||
crewai_event_bus.register_handler(AgentLogsExecutionEvent, self._handle_agent_logs)
|
||||
crewai_event_bus.register_handler(ToolUsageStartedEvent, self._handle_tool_started)
|
||||
crewai_event_bus.register_handler(ToolUsageFinishedEvent, self._handle_tool_finished)
|
||||
|
||||
def stop_collecting(self) -> ExecutionTrace:
|
||||
"""Stop collecting and return the execution trace."""
|
||||
self.is_collecting = False
|
||||
self.trace.end_time = datetime.now()
|
||||
|
||||
return self.trace
|
||||
|
||||
|
||||
def _handle_agent_started(self, source: Any, event: AgentExecutionStartedEvent) -> None:
|
||||
if not self.is_collecting:
|
||||
return
|
||||
|
||||
step = ExecutionStep(
|
||||
timestamp=datetime.now(),
|
||||
step_type="agent_execution_started",
|
||||
agent_role=event.agent.role if hasattr(event.agent, 'role') else None,
|
||||
task_description=getattr(event.task, 'description', None) if event.task else None,
|
||||
content={
|
||||
"task_prompt": event.task_prompt,
|
||||
"tools": [tool.name for tool in event.tools] if event.tools else [],
|
||||
}
|
||||
)
|
||||
self.trace.add_step(step)
|
||||
|
||||
def _handle_agent_completed(self, source: Any, event: AgentExecutionCompletedEvent) -> None:
|
||||
if not self.is_collecting:
|
||||
return
|
||||
|
||||
step = ExecutionStep(
|
||||
timestamp=datetime.now(),
|
||||
step_type="agent_execution_completed",
|
||||
agent_role=event.agent.role if hasattr(event.agent, 'role') else None,
|
||||
content={
|
||||
"output": event.output,
|
||||
}
|
||||
)
|
||||
self.trace.add_step(step)
|
||||
|
||||
def _handle_agent_logs(self, source: Any, event: AgentLogsExecutionEvent) -> None:
|
||||
if not self.is_collecting:
|
||||
return
|
||||
|
||||
step = ExecutionStep(
|
||||
timestamp=datetime.now(),
|
||||
step_type="agent_thought",
|
||||
agent_role=event.agent_role,
|
||||
content={
|
||||
"formatted_answer": str(event.formatted_answer),
|
||||
}
|
||||
)
|
||||
self.trace.add_step(step)
|
||||
|
||||
def _handle_tool_started(self, source: Any, event: ToolUsageStartedEvent) -> None:
|
||||
if not self.is_collecting:
|
||||
return
|
||||
|
||||
step = ExecutionStep(
|
||||
timestamp=datetime.now(),
|
||||
step_type="tool_call_started",
|
||||
agent_role=event.agent_role,
|
||||
content={
|
||||
"tool_name": event.tool_name,
|
||||
"tool_args": event.tool_args,
|
||||
"tool_class": event.tool_class,
|
||||
}
|
||||
)
|
||||
self.trace.add_step(step)
|
||||
|
||||
def _handle_tool_finished(self, source: Any, event: ToolUsageFinishedEvent) -> None:
|
||||
if not self.is_collecting:
|
||||
return
|
||||
|
||||
step = ExecutionStep(
|
||||
timestamp=datetime.now(),
|
||||
step_type="tool_call_completed",
|
||||
agent_role=event.agent_role,
|
||||
content={
|
||||
"tool_name": event.tool_name,
|
||||
"output": event.output,
|
||||
"from_cache": event.from_cache,
|
||||
"duration": (event.finished_at - event.started_at).total_seconds() if hasattr(event, 'started_at') and hasattr(event, 'finished_at') else None,
|
||||
}
|
||||
)
|
||||
self.trace.add_step(step)
|
||||
|
||||
def _handle_task_started(self, source: Any, event: TaskStartedEvent) -> None:
|
||||
if not self.is_collecting:
|
||||
return
|
||||
|
||||
step = ExecutionStep(
|
||||
timestamp=datetime.now(),
|
||||
step_type="task_started",
|
||||
task_description=getattr(event.task, 'description', None) if hasattr(event, 'task') and event.task else None,
|
||||
content={
|
||||
"task_id": getattr(event.task, 'id', None) if hasattr(event, 'task') and event.task else None,
|
||||
"context": getattr(event, 'context', None),
|
||||
}
|
||||
)
|
||||
self.trace.add_step(step)
|
||||
|
||||
def _handle_task_completed(self, source: Any, event: TaskCompletedEvent) -> None:
|
||||
if not self.is_collecting:
|
||||
return
|
||||
|
||||
step = ExecutionStep(
|
||||
timestamp=datetime.now(),
|
||||
step_type="task_completed",
|
||||
task_description=getattr(event.task, 'description', None) if hasattr(event, 'task') and event.task else None,
|
||||
content={
|
||||
"task_id": getattr(event.task, 'id', None) if hasattr(event, 'task') and event.task else None,
|
||||
"output": event.output.raw if hasattr(event, 'output') and event.output else None,
|
||||
}
|
||||
)
|
||||
self.trace.add_step(step)
|
||||
310
tests/test_execution_trace.py
Normal file
310
tests/test_execution_trace.py
Normal file
@@ -0,0 +1,310 @@
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
from datetime import datetime
|
||||
|
||||
from crewai import Agent, Crew, Task, Process
|
||||
from crewai.crews.execution_trace import ExecutionStep, ExecutionTrace
|
||||
from crewai.utilities.execution_trace_collector import ExecutionTraceCollector
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
|
||||
|
||||
class MockTool(BaseTool):
|
||||
name: str = "mock_tool"
|
||||
description: str = "A mock tool for testing"
|
||||
|
||||
def _run(self, query: str) -> str:
|
||||
return f"Mock result for: {query}"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_llm():
|
||||
llm = Mock()
|
||||
llm.call.return_value = "Test response"
|
||||
llm.supports_stop_words.return_value = True
|
||||
llm.stop = []
|
||||
return llm
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_agent(mock_llm):
|
||||
return Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
tools=[MockTool()]
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_task(test_agent):
|
||||
return Task(
|
||||
description="Test task description",
|
||||
expected_output="Test expected output",
|
||||
agent=test_agent
|
||||
)
|
||||
|
||||
|
||||
def test_execution_step_creation():
|
||||
"""Test creating an ExecutionStep."""
|
||||
step = ExecutionStep(
|
||||
timestamp=datetime.now(),
|
||||
step_type="agent_thought",
|
||||
agent_role="Test Agent",
|
||||
task_description="Test task",
|
||||
content={"thought": "I need to think about this"},
|
||||
metadata={"iteration": 1}
|
||||
)
|
||||
|
||||
assert step.step_type == "agent_thought"
|
||||
assert step.agent_role == "Test Agent"
|
||||
assert step.content["thought"] == "I need to think about this"
|
||||
assert step.metadata["iteration"] == 1
|
||||
|
||||
|
||||
def test_execution_trace_creation():
|
||||
"""Test creating an ExecutionTrace."""
|
||||
trace = ExecutionTrace()
|
||||
|
||||
step1 = ExecutionStep(
|
||||
timestamp=datetime.now(),
|
||||
step_type="task_started",
|
||||
content={"task_id": "1"}
|
||||
)
|
||||
|
||||
step2 = ExecutionStep(
|
||||
timestamp=datetime.now(),
|
||||
step_type="agent_thought",
|
||||
agent_role="Test Agent",
|
||||
content={"thought": "Starting work"}
|
||||
)
|
||||
|
||||
trace.add_step(step1)
|
||||
trace.add_step(step2)
|
||||
|
||||
assert trace.total_steps == 2
|
||||
assert len(trace.steps) == 2
|
||||
assert trace.get_steps_by_type("task_started") == [step1]
|
||||
assert trace.get_steps_by_agent("Test Agent") == [step2]
|
||||
|
||||
|
||||
def test_execution_trace_collector():
|
||||
"""Test the ExecutionTraceCollector."""
|
||||
collector = ExecutionTraceCollector()
|
||||
|
||||
collector.start_collecting()
|
||||
assert collector.is_collecting is True
|
||||
assert collector.trace.start_time is not None
|
||||
|
||||
trace = collector.stop_collecting()
|
||||
assert collector.is_collecting is False
|
||||
assert trace.end_time is not None
|
||||
assert isinstance(trace, ExecutionTrace)
|
||||
|
||||
|
||||
@patch('crewai.crew.crewai_event_bus')
|
||||
def test_crew_with_execution_trace_enabled(mock_event_bus, test_agent, test_task, mock_llm):
|
||||
"""Test crew execution with trace_execution=True."""
|
||||
crew = Crew(
|
||||
agents=[test_agent],
|
||||
tasks=[test_task],
|
||||
process=Process.sequential,
|
||||
trace_execution=True
|
||||
)
|
||||
|
||||
with patch.object(test_task, 'execute_sync') as mock_execute:
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
mock_output = TaskOutput(
|
||||
description="Test task description",
|
||||
raw="Test output",
|
||||
agent="Test Agent"
|
||||
)
|
||||
mock_execute.return_value = mock_output
|
||||
|
||||
result = crew.kickoff()
|
||||
|
||||
assert result.execution_trace is not None
|
||||
assert isinstance(result.execution_trace, ExecutionTrace)
|
||||
assert result.execution_trace.start_time is not None
|
||||
assert result.execution_trace.end_time is not None
|
||||
|
||||
|
||||
@patch('crewai.crew.crewai_event_bus')
|
||||
def test_crew_without_execution_trace(mock_event_bus, test_agent, test_task, mock_llm):
|
||||
"""Test crew execution with trace_execution=False (default)."""
|
||||
crew = Crew(
|
||||
agents=[test_agent],
|
||||
tasks=[test_task],
|
||||
process=Process.sequential,
|
||||
trace_execution=False
|
||||
)
|
||||
|
||||
with patch.object(test_task, 'execute_sync') as mock_execute:
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
mock_output = TaskOutput(
|
||||
description="Test task description",
|
||||
raw="Test output",
|
||||
agent="Test Agent"
|
||||
)
|
||||
mock_execute.return_value = mock_output
|
||||
|
||||
result = crew.kickoff()
|
||||
|
||||
assert result.execution_trace is None
|
||||
|
||||
|
||||
def test_execution_trace_with_multiple_agents_and_tasks(mock_llm):
|
||||
"""Test execution trace with multiple agents and tasks."""
|
||||
agent1 = Agent(
|
||||
role="Agent 1",
|
||||
goal="Goal 1",
|
||||
backstory="Backstory 1",
|
||||
llm=mock_llm
|
||||
)
|
||||
|
||||
agent2 = Agent(
|
||||
role="Agent 2",
|
||||
goal="Goal 2",
|
||||
backstory="Backstory 2",
|
||||
llm=mock_llm
|
||||
)
|
||||
|
||||
task1 = Task(
|
||||
description="Task 1",
|
||||
expected_output="Output 1",
|
||||
agent=agent1
|
||||
)
|
||||
|
||||
task2 = Task(
|
||||
description="Task 2",
|
||||
expected_output="Output 2",
|
||||
agent=agent2
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent1, agent2],
|
||||
tasks=[task1, task2],
|
||||
process=Process.sequential,
|
||||
trace_execution=True
|
||||
)
|
||||
|
||||
with patch.object(task1, 'execute_sync') as mock_execute1, \
|
||||
patch.object(task2, 'execute_sync') as mock_execute2:
|
||||
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
|
||||
mock_output1 = TaskOutput(
|
||||
description="Task 1",
|
||||
raw="Output 1",
|
||||
agent="Agent 1"
|
||||
)
|
||||
|
||||
mock_output2 = TaskOutput(
|
||||
description="Task 2",
|
||||
raw="Output 2",
|
||||
agent="Agent 2"
|
||||
)
|
||||
|
||||
mock_execute1.return_value = mock_output1
|
||||
mock_execute2.return_value = mock_output2
|
||||
|
||||
result = crew.kickoff()
|
||||
|
||||
assert result.execution_trace is not None
|
||||
agent1_steps = result.execution_trace.get_steps_by_agent("Agent 1")
|
||||
agent2_steps = result.execution_trace.get_steps_by_agent("Agent 2")
|
||||
|
||||
assert len(agent1_steps) >= 0
|
||||
assert len(agent2_steps) >= 0
|
||||
|
||||
|
||||
def test_execution_trace_step_types():
|
||||
"""Test that different step types are properly categorized."""
|
||||
trace = ExecutionTrace()
|
||||
|
||||
steps_data = [
|
||||
("task_started", "Task 1", {}),
|
||||
("agent_thought", "Agent 1", {"thought": "I need to analyze this"}),
|
||||
("tool_call_started", "Agent 1", {"tool_name": "search", "args": {"query": "test"}}),
|
||||
("tool_call_completed", "Agent 1", {"tool_name": "search", "output": "results"}),
|
||||
("agent_execution_completed", "Agent 1", {"output": "Final answer"}),
|
||||
("task_completed", "Task 1", {"output": "Task complete"}),
|
||||
]
|
||||
|
||||
for step_type, agent_role, content in steps_data:
|
||||
step = ExecutionStep(
|
||||
timestamp=datetime.now(),
|
||||
step_type=step_type,
|
||||
agent_role=agent_role if "agent" in step_type or "tool" in step_type else None,
|
||||
task_description="Task 1" if "task" in step_type else None,
|
||||
content=content
|
||||
)
|
||||
trace.add_step(step)
|
||||
|
||||
assert len(trace.get_steps_by_type("task_started")) == 1
|
||||
assert len(trace.get_steps_by_type("agent_thought")) == 1
|
||||
assert len(trace.get_steps_by_type("tool_call_started")) == 1
|
||||
assert len(trace.get_steps_by_type("tool_call_completed")) == 1
|
||||
assert len(trace.get_steps_by_type("agent_execution_completed")) == 1
|
||||
assert len(trace.get_steps_by_type("task_completed")) == 1
|
||||
|
||||
agent_steps = trace.get_steps_by_agent("Agent 1")
|
||||
assert len(agent_steps) == 4
|
||||
|
||||
|
||||
def test_execution_trace_with_async_tasks(mock_llm):
|
||||
"""Test execution trace with async tasks."""
|
||||
agent = Agent(
|
||||
role="Async Agent",
|
||||
goal="Async goal",
|
||||
backstory="Async backstory",
|
||||
llm=mock_llm
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Async task",
|
||||
expected_output="Async output",
|
||||
agent=agent,
|
||||
async_execution=True
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
process=Process.sequential,
|
||||
trace_execution=True
|
||||
)
|
||||
|
||||
with patch.object(task, 'execute_async') as mock_execute_async:
|
||||
from concurrent.futures import Future
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
|
||||
future = Future()
|
||||
mock_output = TaskOutput(
|
||||
description="Async task",
|
||||
raw="Async output",
|
||||
agent="Async Agent"
|
||||
)
|
||||
future.set_result(mock_output)
|
||||
mock_execute_async.return_value = future
|
||||
|
||||
result = crew.kickoff()
|
||||
|
||||
assert result.execution_trace is not None
|
||||
assert isinstance(result.execution_trace, ExecutionTrace)
|
||||
|
||||
|
||||
def test_execution_trace_error_handling():
|
||||
"""Test execution trace handles errors gracefully."""
|
||||
collector = ExecutionTraceCollector()
|
||||
|
||||
collector.start_collecting()
|
||||
|
||||
mock_event = Mock()
|
||||
mock_event.agent = Mock()
|
||||
mock_event.agent.role = "Test Agent"
|
||||
|
||||
collector._handle_agent_started(mock_event)
|
||||
|
||||
trace = collector.stop_collecting()
|
||||
assert isinstance(trace, ExecutionTrace)
|
||||
Reference in New Issue
Block a user