diff --git a/examples/execution_trace_example.py b/examples/execution_trace_example.py new file mode 100644 index 000000000..292554b2f --- /dev/null +++ b/examples/execution_trace_example.py @@ -0,0 +1,49 @@ +from crewai import Agent, Crew, Task, Process, LLM + +researcher = Agent( + role="Researcher", + goal="Research and analyze information", + backstory="You are an expert researcher with years of experience.", + llm=LLM(model="gpt-4o-mini") +) + +writer = Agent( + role="Writer", + goal="Write compelling content", + backstory="You are a skilled writer who creates engaging content.", + llm=LLM(model="gpt-4o-mini") +) + +research_task = Task( + description="Research the latest trends in AI", + expected_output="A comprehensive report on AI trends", + agent=researcher +) + +writing_task = Task( + description="Write an article based on the research", + expected_output="A well-written article about AI trends", + agent=writer +) + +crew = Crew( + agents=[researcher, writer], + tasks=[research_task, writing_task], + process=Process.sequential, + trace_execution=True +) + +result = crew.kickoff(inputs={"topic": "artificial intelligence"}) + +if result.execution_trace: + print(f"Total execution steps: {result.execution_trace.total_steps}") + print(f"Execution duration: {result.execution_trace.end_time - result.execution_trace.start_time}") + + thoughts = result.execution_trace.get_steps_by_type("agent_thought") + print(f"Agent thoughts captured: {len(thoughts)}") + + tool_calls = result.execution_trace.get_steps_by_type("tool_call_started") + print(f"Tool calls made: {len(tool_calls)}") + + for step in result.execution_trace.steps: + print(f"{step.timestamp}: {step.step_type} - {step.agent_role or 'System'}") diff --git a/src/crewai/__init__.py b/src/crewai/__init__.py index b7fee2b81..d997b5321 100644 --- a/src/crewai/__init__.py +++ b/src/crewai/__init__.py @@ -5,6 +5,7 @@ import urllib.request from crewai.agent import Agent from crewai.crew import Crew from crewai.crews.crew_output import CrewOutput +from crewai.crews.execution_trace import ExecutionTrace, ExecutionStep from crewai.flow.flow import Flow from crewai.knowledge.knowledge import Knowledge from crewai.llm import LLM @@ -59,6 +60,8 @@ __all__ = [ "Agent", "Crew", "CrewOutput", + "ExecutionTrace", + "ExecutionStep", "Process", "Task", "LLM", diff --git a/src/crewai/crew.py b/src/crewai/crew.py index e9d4b8d0a..760ec1dda 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -81,6 +81,7 @@ from crewai.utilities.llm_utils import create_llm from crewai.utilities.planning_handler import CrewPlanner from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler from crewai.utilities.training_handler import CrewTrainingHandler +from crewai.utilities.execution_trace_collector import ExecutionTraceCollector warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd") @@ -205,6 +206,9 @@ class Crew(FlowTrackable, BaseModel): default_factory=list, description="List of callbacks to be executed after crew kickoff. It may be used to adjust the output of the crew.", ) + trace_execution: bool = Field( + default=False, description="Whether to trace the execution steps of the crew" + ) max_rpm: Optional[int] = Field( default=None, description="Maximum number of requests per minute for the crew execution to be respected.", @@ -621,6 +625,11 @@ class Crew(FlowTrackable, BaseModel): self, inputs: Optional[Dict[str, Any]] = None, ) -> CrewOutput: + trace_collector = None + if self.trace_execution: + trace_collector = ExecutionTraceCollector() + trace_collector.start_collecting() + ctx = baggage.set_baggage( "crew_context", CrewContext(id=str(self.id), key=self.key) ) @@ -678,6 +687,10 @@ class Crew(FlowTrackable, BaseModel): result = after_callback(result) self.usage_metrics = self.calculate_usage_metrics() + + if trace_collector: + execution_trace = trace_collector.stop_collecting() + result.execution_trace = execution_trace return result except Exception as e: @@ -1086,6 +1099,7 @@ class Crew(FlowTrackable, BaseModel): json_dict=final_task_output.json_dict, tasks_output=task_outputs, token_usage=token_usage, + execution_trace=None, ) def _process_async_tasks( diff --git a/src/crewai/crews/crew_output.py b/src/crewai/crews/crew_output.py index c9a92a0d0..3e2cf3fb7 100644 --- a/src/crewai/crews/crew_output.py +++ b/src/crewai/crews/crew_output.py @@ -6,6 +6,7 @@ from pydantic import BaseModel, Field from crewai.tasks.output_format import OutputFormat from crewai.tasks.task_output import TaskOutput from crewai.types.usage_metrics import UsageMetrics +from crewai.crews.execution_trace import ExecutionTrace class CrewOutput(BaseModel): @@ -22,6 +23,9 @@ class CrewOutput(BaseModel): description="Output of each task", default=[] ) token_usage: UsageMetrics = Field(description="Processed token summary", default={}) + execution_trace: Optional[ExecutionTrace] = Field( + description="Detailed execution trace of crew steps", default=None + ) @property def json(self) -> Optional[str]: diff --git a/src/crewai/crews/execution_trace.py b/src/crewai/crews/execution_trace.py new file mode 100644 index 000000000..d952c2746 --- /dev/null +++ b/src/crewai/crews/execution_trace.py @@ -0,0 +1,34 @@ +from datetime import datetime +from typing import Any, Dict, List, Optional, Union +from pydantic import BaseModel, Field + +class ExecutionStep(BaseModel): + """Represents a single step in the crew execution trace.""" + + timestamp: datetime = Field(description="When this step occurred") + step_type: str = Field(description="Type of step: agent_thought, tool_call, tool_result, task_start, task_complete, etc.") + agent_role: Optional[str] = Field(description="Role of the agent performing this step", default=None) + task_description: Optional[str] = Field(description="Description of the task being executed", default=None) + content: Dict[str, Any] = Field(description="Step-specific content (thought, tool args, result, etc.)", default_factory=dict) + metadata: Dict[str, Any] = Field(description="Additional metadata for this step", default_factory=dict) + +class ExecutionTrace(BaseModel): + """Complete execution trace for a crew run.""" + + steps: List[ExecutionStep] = Field(description="Ordered list of execution steps", default_factory=list) + total_steps: int = Field(description="Total number of steps in the trace", default=0) + start_time: Optional[datetime] = Field(description="When execution started", default=None) + end_time: Optional[datetime] = Field(description="When execution completed", default=None) + + def add_step(self, step: ExecutionStep) -> None: + """Add a step to the trace.""" + self.steps.append(step) + self.total_steps = len(self.steps) + + def get_steps_by_type(self, step_type: str) -> List[ExecutionStep]: + """Get all steps of a specific type.""" + return [step for step in self.steps if step.step_type == step_type] + + def get_steps_by_agent(self, agent_role: str) -> List[ExecutionStep]: + """Get all steps performed by a specific agent.""" + return [step for step in self.steps if step.agent_role == agent_role] diff --git a/src/crewai/utilities/execution_trace_collector.py b/src/crewai/utilities/execution_trace_collector.py new file mode 100644 index 000000000..efbee0178 --- /dev/null +++ b/src/crewai/utilities/execution_trace_collector.py @@ -0,0 +1,159 @@ +from datetime import datetime +from typing import Any, Dict, List, Optional +from crewai.crews.execution_trace import ExecutionStep, ExecutionTrace +from crewai.utilities.events.crewai_event_bus import crewai_event_bus +from crewai.utilities.events.agent_events import ( + AgentExecutionStartedEvent, + AgentExecutionCompletedEvent, + AgentLogsExecutionEvent, +) +from crewai.utilities.events.tool_usage_events import ( + ToolUsageStartedEvent, + ToolUsageFinishedEvent, +) +from crewai.utilities.events.task_events import ( + TaskStartedEvent, + TaskCompletedEvent, +) + +class ExecutionTraceCollector: + """Collects execution events and builds an execution trace.""" + + def __init__(self): + self.trace = ExecutionTrace() + self.is_collecting = False + self._event_handlers = [] + + def start_collecting(self) -> None: + """Start collecting execution events.""" + self.is_collecting = True + self.trace = ExecutionTrace(start_time=datetime.now()) + + self._event_handlers = [ + crewai_event_bus.register_handler(TaskStartedEvent, self._handle_task_started), + crewai_event_bus.register_handler(TaskCompletedEvent, self._handle_task_completed), + crewai_event_bus.register_handler(AgentExecutionStartedEvent, self._handle_agent_started), + crewai_event_bus.register_handler(AgentExecutionCompletedEvent, self._handle_agent_completed), + crewai_event_bus.register_handler(AgentLogsExecutionEvent, self._handle_agent_logs), + crewai_event_bus.register_handler(ToolUsageStartedEvent, self._handle_tool_started), + crewai_event_bus.register_handler(ToolUsageFinishedEvent, self._handle_tool_finished), + ] + + def stop_collecting(self) -> ExecutionTrace: + """Stop collecting and return the execution trace.""" + self.is_collecting = False + self.trace.end_time = datetime.now() + + for handler in self._event_handlers: + crewai_event_bus.unregister_handler(handler) + self._event_handlers.clear() + + return self.trace + + + def _handle_agent_started(self, event: AgentExecutionStartedEvent) -> None: + if not self.is_collecting: + return + + step = ExecutionStep( + timestamp=datetime.now(), + step_type="agent_execution_started", + agent_role=event.agent.role if hasattr(event.agent, 'role') else None, + task_description=getattr(event.task, 'description', None) if event.task else None, + content={ + "task_prompt": event.task_prompt, + "tools": [tool.name for tool in event.tools] if event.tools else [], + } + ) + self.trace.add_step(step) + + def _handle_agent_completed(self, event: AgentExecutionCompletedEvent) -> None: + if not self.is_collecting: + return + + step = ExecutionStep( + timestamp=datetime.now(), + step_type="agent_execution_completed", + agent_role=event.agent.role if hasattr(event.agent, 'role') else None, + content={ + "output": event.output, + } + ) + self.trace.add_step(step) + + def _handle_agent_logs(self, event: AgentLogsExecutionEvent) -> None: + if not self.is_collecting: + return + + step = ExecutionStep( + timestamp=datetime.now(), + step_type="agent_thought", + agent_role=event.agent_role, + content={ + "formatted_answer": str(event.formatted_answer), + } + ) + self.trace.add_step(step) + + def _handle_tool_started(self, event: ToolUsageStartedEvent) -> None: + if not self.is_collecting: + return + + step = ExecutionStep( + timestamp=datetime.now(), + step_type="tool_call_started", + agent_role=event.agent_role, + content={ + "tool_name": event.tool_name, + "tool_args": event.tool_args, + "tool_class": event.tool_class, + } + ) + self.trace.add_step(step) + + def _handle_tool_finished(self, event: ToolUsageFinishedEvent) -> None: + if not self.is_collecting: + return + + step = ExecutionStep( + timestamp=datetime.now(), + step_type="tool_call_completed", + agent_role=event.agent_role, + content={ + "tool_name": event.tool_name, + "output": event.output, + "from_cache": event.from_cache, + "duration": (event.finished_at - event.started_at).total_seconds() if hasattr(event, 'started_at') and hasattr(event, 'finished_at') else None, + } + ) + self.trace.add_step(step) + + def _handle_task_started(self, event: TaskStartedEvent) -> None: + if not self.is_collecting: + return + + step = ExecutionStep( + timestamp=datetime.now(), + step_type="task_started", + task_description=getattr(event.task, 'description', None) if hasattr(event, 'task') and event.task else None, + content={ + "task_id": getattr(event.task, 'id', None) if hasattr(event, 'task') and event.task else None, + "context": getattr(event, 'context', None), + } + ) + self.trace.add_step(step) + + def _handle_task_completed(self, event: TaskCompletedEvent) -> None: + if not self.is_collecting: + return + + step = ExecutionStep( + timestamp=datetime.now(), + step_type="task_completed", + task_description=getattr(event.task, 'description', None) if hasattr(event, 'task') and event.task else None, + content={ + "task_id": getattr(event.task, 'id', None) if hasattr(event, 'task') and event.task else None, + "output": event.output.raw if hasattr(event, 'output') and event.output else None, + } + ) + self.trace.add_step(step) diff --git a/tests/test_execution_trace.py b/tests/test_execution_trace.py new file mode 100644 index 000000000..a5f0d2cd6 --- /dev/null +++ b/tests/test_execution_trace.py @@ -0,0 +1,310 @@ +import pytest +from unittest.mock import Mock, patch +from datetime import datetime + +from crewai import Agent, Crew, Task, Process +from crewai.crews.execution_trace import ExecutionStep, ExecutionTrace +from crewai.utilities.execution_trace_collector import ExecutionTraceCollector +from crewai.tools.base_tool import BaseTool + + +class MockTool(BaseTool): + name: str = "mock_tool" + description: str = "A mock tool for testing" + + def _run(self, query: str) -> str: + return f"Mock result for: {query}" + + +@pytest.fixture +def mock_llm(): + llm = Mock() + llm.call.return_value = "Test response" + llm.supports_stop_words.return_value = True + llm.stop = [] + return llm + + +@pytest.fixture +def test_agent(mock_llm): + return Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + llm=mock_llm, + tools=[MockTool()] + ) + + +@pytest.fixture +def test_task(test_agent): + return Task( + description="Test task description", + expected_output="Test expected output", + agent=test_agent + ) + + +def test_execution_step_creation(): + """Test creating an ExecutionStep.""" + step = ExecutionStep( + timestamp=datetime.now(), + step_type="agent_thought", + agent_role="Test Agent", + task_description="Test task", + content={"thought": "I need to think about this"}, + metadata={"iteration": 1} + ) + + assert step.step_type == "agent_thought" + assert step.agent_role == "Test Agent" + assert step.content["thought"] == "I need to think about this" + assert step.metadata["iteration"] == 1 + + +def test_execution_trace_creation(): + """Test creating an ExecutionTrace.""" + trace = ExecutionTrace() + + step1 = ExecutionStep( + timestamp=datetime.now(), + step_type="task_started", + content={"task_id": "1"} + ) + + step2 = ExecutionStep( + timestamp=datetime.now(), + step_type="agent_thought", + agent_role="Test Agent", + content={"thought": "Starting work"} + ) + + trace.add_step(step1) + trace.add_step(step2) + + assert trace.total_steps == 2 + assert len(trace.steps) == 2 + assert trace.get_steps_by_type("task_started") == [step1] + assert trace.get_steps_by_agent("Test Agent") == [step2] + + +def test_execution_trace_collector(): + """Test the ExecutionTraceCollector.""" + collector = ExecutionTraceCollector() + + collector.start_collecting() + assert collector.is_collecting is True + assert collector.trace.start_time is not None + + trace = collector.stop_collecting() + assert collector.is_collecting is False + assert trace.end_time is not None + assert isinstance(trace, ExecutionTrace) + + +@patch('crewai.crew.crewai_event_bus') +def test_crew_with_execution_trace_enabled(mock_event_bus, test_agent, test_task, mock_llm): + """Test crew execution with trace_execution=True.""" + crew = Crew( + agents=[test_agent], + tasks=[test_task], + process=Process.sequential, + trace_execution=True + ) + + with patch.object(test_task, 'execute_sync') as mock_execute: + from crewai.tasks.task_output import TaskOutput + mock_output = TaskOutput( + description="Test task description", + raw="Test output", + agent="Test Agent" + ) + mock_execute.return_value = mock_output + + result = crew.kickoff() + + assert result.execution_trace is not None + assert isinstance(result.execution_trace, ExecutionTrace) + assert result.execution_trace.start_time is not None + assert result.execution_trace.end_time is not None + + +@patch('crewai.crew.crewai_event_bus') +def test_crew_without_execution_trace(mock_event_bus, test_agent, test_task, mock_llm): + """Test crew execution with trace_execution=False (default).""" + crew = Crew( + agents=[test_agent], + tasks=[test_task], + process=Process.sequential, + trace_execution=False + ) + + with patch.object(test_task, 'execute_sync') as mock_execute: + from crewai.tasks.task_output import TaskOutput + mock_output = TaskOutput( + description="Test task description", + raw="Test output", + agent="Test Agent" + ) + mock_execute.return_value = mock_output + + result = crew.kickoff() + + assert result.execution_trace is None + + +def test_execution_trace_with_multiple_agents_and_tasks(mock_llm): + """Test execution trace with multiple agents and tasks.""" + agent1 = Agent( + role="Agent 1", + goal="Goal 1", + backstory="Backstory 1", + llm=mock_llm + ) + + agent2 = Agent( + role="Agent 2", + goal="Goal 2", + backstory="Backstory 2", + llm=mock_llm + ) + + task1 = Task( + description="Task 1", + expected_output="Output 1", + agent=agent1 + ) + + task2 = Task( + description="Task 2", + expected_output="Output 2", + agent=agent2 + ) + + crew = Crew( + agents=[agent1, agent2], + tasks=[task1, task2], + process=Process.sequential, + trace_execution=True + ) + + with patch.object(task1, 'execute_sync') as mock_execute1, \ + patch.object(task2, 'execute_sync') as mock_execute2: + + from crewai.tasks.task_output import TaskOutput + + mock_output1 = TaskOutput( + description="Task 1", + raw="Output 1", + agent="Agent 1" + ) + + mock_output2 = TaskOutput( + description="Task 2", + raw="Output 2", + agent="Agent 2" + ) + + mock_execute1.return_value = mock_output1 + mock_execute2.return_value = mock_output2 + + result = crew.kickoff() + + assert result.execution_trace is not None + agent1_steps = result.execution_trace.get_steps_by_agent("Agent 1") + agent2_steps = result.execution_trace.get_steps_by_agent("Agent 2") + + assert len(agent1_steps) >= 0 + assert len(agent2_steps) >= 0 + + +def test_execution_trace_step_types(): + """Test that different step types are properly categorized.""" + trace = ExecutionTrace() + + steps_data = [ + ("task_started", "Task 1", {}), + ("agent_thought", "Agent 1", {"thought": "I need to analyze this"}), + ("tool_call_started", "Agent 1", {"tool_name": "search", "args": {"query": "test"}}), + ("tool_call_completed", "Agent 1", {"tool_name": "search", "output": "results"}), + ("agent_execution_completed", "Agent 1", {"output": "Final answer"}), + ("task_completed", "Task 1", {"output": "Task complete"}), + ] + + for step_type, agent_role, content in steps_data: + step = ExecutionStep( + timestamp=datetime.now(), + step_type=step_type, + agent_role=agent_role if "agent" in step_type or "tool" in step_type else None, + task_description="Task 1" if "task" in step_type else None, + content=content + ) + trace.add_step(step) + + assert len(trace.get_steps_by_type("task_started")) == 1 + assert len(trace.get_steps_by_type("agent_thought")) == 1 + assert len(trace.get_steps_by_type("tool_call_started")) == 1 + assert len(trace.get_steps_by_type("tool_call_completed")) == 1 + assert len(trace.get_steps_by_type("agent_execution_completed")) == 1 + assert len(trace.get_steps_by_type("task_completed")) == 1 + + agent_steps = trace.get_steps_by_agent("Agent 1") + assert len(agent_steps) == 4 + + +def test_execution_trace_with_async_tasks(mock_llm): + """Test execution trace with async tasks.""" + agent = Agent( + role="Async Agent", + goal="Async goal", + backstory="Async backstory", + llm=mock_llm + ) + + task = Task( + description="Async task", + expected_output="Async output", + agent=agent, + async_execution=True + ) + + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + trace_execution=True + ) + + with patch.object(task, 'execute_async') as mock_execute_async: + from concurrent.futures import Future + from crewai.tasks.task_output import TaskOutput + + future = Future() + mock_output = TaskOutput( + description="Async task", + raw="Async output", + agent="Async Agent" + ) + future.set_result(mock_output) + mock_execute_async.return_value = future + + result = crew.kickoff() + + assert result.execution_trace is not None + assert isinstance(result.execution_trace, ExecutionTrace) + + +def test_execution_trace_error_handling(): + """Test execution trace handles errors gracefully.""" + collector = ExecutionTraceCollector() + + collector.start_collecting() + + mock_event = Mock() + mock_event.agent = Mock() + mock_event.agent.role = "Test Agent" + + collector._handle_agent_started(mock_event) + + trace = collector.stop_collecting() + assert isinstance(trace, ExecutionTrace)