Add event emission for agent execution lifecycle

- Emit AgentExecutionStarted and AgentExecutionError events
- Update CrewAgentExecutor to use event_bus for tracking agent execution
- Refactor error handling to include event emission
- Minor code formatting improvements in task.py and crew_agent_executor.py
- Fix a typo in test file
This commit is contained in:
Lorenze Jay
2025-02-11 14:35:55 -08:00
parent 9eb5b361dd
commit 3a89b9feab
3 changed files with 40 additions and 21 deletions

View File

@@ -18,6 +18,11 @@ from crewai.tools.base_tool import BaseTool
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N, Printer from crewai.utilities import I18N, Printer
from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.events import event_bus
from crewai.utilities.events.agent_events import (
AgentExecutionError,
AgentExecutionStarted,
)
from crewai.utilities.exceptions.context_window_exceeding_exception import ( from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException, LLMContextLengthExceededException,
) )
@@ -85,6 +90,15 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.llm.stop = list(set(self.llm.stop + self.stop)) self.llm.stop = list(set(self.llm.stop + self.stop))
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]: def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
event_bus.emit(
self,
event=AgentExecutionStarted(
agent=self.agent,
task=self.task,
tools=self.tools,
inputs=inputs,
),
)
if "system" in self.prompt: if "system" in self.prompt:
system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs) system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs)
user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs) user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs)
@@ -107,11 +121,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
) )
raise raise
except Exception as e: except Exception as e:
self._handle_unknown_error(e)
if e.__class__.__module__.startswith("litellm"): if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors # Do not retry on litellm errors
raise e raise e
else: else:
self._handle_unknown_error(e)
raise e raise e
if self.ask_for_human_input: if self.ask_for_human_input:
@@ -177,6 +191,12 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _handle_unknown_error(self, exception: Exception) -> None: def _handle_unknown_error(self, exception: Exception) -> None:
"""Handle unknown errors by informing the user.""" """Handle unknown errors by informing the user."""
event_bus.emit(
self,
event=AgentExecutionError(
agent=self.agent, task=self.task, error=str(exception)
),
)
self._printer.print( self._printer.print(
content="An unknown error occurred. Please check the details below.", content="An unknown error occurred. Please check the details below.",
color="red", color="red",

View File

@@ -430,23 +430,23 @@ class Task(BaseModel):
if self.callback: if self.callback:
self.callback(self.output) self.callback(self.output)
crew = self.agent.crew # type: ignore[union-attr] crew = self.agent.crew # type: ignore[union-attr]
if crew and crew.task_callback and crew.task_callback != self.callback: if crew and crew.task_callback and crew.task_callback != self.callback:
crew.task_callback(self.output) crew.task_callback(self.output)
if self._execution_span: if self._execution_span:
self._telemetry.task_ended(self._execution_span, self, agent.crew) self._telemetry.task_ended(self._execution_span, self, agent.crew)
self._execution_span = None self._execution_span = None
if self.output_file: if self.output_file:
content = ( content = (
json_output json_output
if json_output if json_output
else pydantic_output.model_dump_json() else pydantic_output.model_dump_json()
if pydantic_output if pydantic_output
else result else result
) )
self._save_file(content) self._save_file(content)
event_bus.emit(self, TaskCompleted(task=self, output=task_output)) event_bus.emit(self, TaskCompleted(task=self, output=task_output))
return task_output return task_output
except Exception as e: except Exception as e:
@@ -732,10 +732,9 @@ class Task(BaseModel):
file.write(str(result)) file.write(str(result))
except (OSError, IOError) as e: except (OSError, IOError) as e:
raise RuntimeError( raise RuntimeError(
"\n".join([ "\n".join(
f"Failed to save output file: {e}", [f"Failed to save output file: {e}", FILEWRITER_RECOMMENDATION]
FILEWRITER_RECOMMENDATION )
])
) )
return None return None

View File

@@ -190,7 +190,7 @@ def test_agent_emits_execution_started_and_completed_events():
"ask_for_human_input": False, "ask_for_human_input": False,
"input": "Just say hi\n" "input": "Just say hi\n"
"\n" "\n"
"This is the expect criteria for your final answer: hi\n" "This is the expected criteria for your final answer: hi\n"
"you MUST return the actual complete content as the final answer, not a " "you MUST return the actual complete content as the final answer, not a "
"summary.", "summary.",
"tool_names": "", "tool_names": "",