Merge branch 'main' into bugfix/support-tool-calling

This commit is contained in:
Brandon Hancock (bhancock_ai)
2025-02-20 11:46:47 -05:00
committed by GitHub
59 changed files with 15678 additions and 1229 deletions

View File

@@ -19,25 +19,17 @@ from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
from crewai.utilities.events.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
agentops = None
try:
import agentops # type: ignore # Name "agentops" is already defined
from agentops import track_agent # type: ignore
except ImportError:
def track_agent():
def noop(f):
return f
return noop
@track_agent()
class Agent(BaseAgent):
"""Represents an agent in a system.
@@ -240,6 +232,15 @@ class Agent(BaseAgent):
task_prompt = self._use_trained_data(task_prompt=task_prompt)
try:
crewai_event_bus.emit(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
)
result = self.agent_executor.invoke(
{
"input": task_prompt,
@@ -251,9 +252,25 @@ class Agent(BaseAgent):
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
result = self.execute_task(task, context, tools)
@@ -266,7 +283,10 @@ class Agent(BaseAgent):
for tool_result in self.tools_results: # type: ignore # Item "None" of "list[Any] | None" has no attribute "__iter__" (not iterable)
if tool_result.get("result_as_answer", False):
result = tool_result["result"]
crewai_event_bus.emit(
self,
event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
)
return result
def create_agent_executor(

View File

@@ -20,8 +20,7 @@ from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.tools_handler import ToolsHandler
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.tools import BaseTool
from crewai.tools.base_tool import Tool
from crewai.tools.base_tool import BaseTool, Tool
from crewai.utilities import I18N, Logger, RPMController
from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter
@@ -112,7 +111,7 @@ class BaseAgent(ABC, BaseModel):
default=False,
description="Enable agent to delegate and ask questions among each other.",
)
tools: Optional[List[Any]] = Field(
tools: Optional[List[BaseTool]] = Field(
default_factory=list, description="Tools at agents' disposal"
)
max_iter: int = Field(

View File

@@ -18,6 +18,12 @@ from crewai.tools.base_tool import BaseTool
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N, Printer
from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.events import (
ToolUsageErrorEvent,
ToolUsageStartedEvent,
crewai_event_bus,
)
from crewai.utilities.events.tool_usage_events import ToolUsageStartedEvent
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
@@ -107,11 +113,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
raise
except Exception as e:
self._handle_unknown_error(e)
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
else:
self._handle_unknown_error(e)
raise e
if self.ask_for_human_input:
@@ -349,40 +355,68 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
def _execute_tool_and_check_finality(self, agent_action: AgentAction) -> ToolResult:
tool_usage = ToolUsage(
tools_handler=self.tools_handler,
tools=self.tools,
original_tools=self.original_tools,
tools_description=self.tools_description,
tools_names=self.tools_names,
function_calling_llm=self.function_calling_llm,
task=self.task, # type: ignore[arg-type]
agent=self.agent,
action=agent_action,
)
tool_calling = tool_usage.parse_tool_calling(agent_action.text)
if isinstance(tool_calling, ToolUsageErrorException):
tool_result = tool_calling.message
return ToolResult(result=tool_result, result_as_answer=False)
else:
if tool_calling.tool_name.casefold().strip() in [
name.casefold().strip() for name in self.tool_name_to_tool_map
] or tool_calling.tool_name.casefold().replace("_", " ") in [
name.casefold().strip() for name in self.tool_name_to_tool_map
]:
tool_result = tool_usage.use(tool_calling, agent_action.text)
tool = self.tool_name_to_tool_map.get(tool_calling.tool_name)
if tool:
return ToolResult(
result=tool_result, result_as_answer=tool.result_as_answer
)
else:
tool_result = self._i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in self.tools]),
try:
if self.agent:
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
agent_key=self.agent.key,
agent_role=self.agent.role,
tool_name=agent_action.tool,
tool_args=agent_action.tool_input,
tool_class=agent_action.tool,
),
)
return ToolResult(result=tool_result, result_as_answer=False)
tool_usage = ToolUsage(
tools_handler=self.tools_handler,
tools=self.tools,
original_tools=self.original_tools,
tools_description=self.tools_description,
tools_names=self.tools_names,
function_calling_llm=self.function_calling_llm,
task=self.task, # type: ignore[arg-type]
agent=self.agent,
action=agent_action,
)
tool_calling = tool_usage.parse_tool_calling(agent_action.text)
if isinstance(tool_calling, ToolUsageErrorException):
tool_result = tool_calling.message
return ToolResult(result=tool_result, result_as_answer=False)
else:
if tool_calling.tool_name.casefold().strip() in [
name.casefold().strip() for name in self.tool_name_to_tool_map
] or tool_calling.tool_name.casefold().replace("_", " ") in [
name.casefold().strip() for name in self.tool_name_to_tool_map
]:
tool_result = tool_usage.use(tool_calling, agent_action.text)
tool = self.tool_name_to_tool_map.get(tool_calling.tool_name)
if tool:
return ToolResult(
result=tool_result, result_as_answer=tool.result_as_answer
)
else:
tool_result = self._i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in self.tools]),
)
return ToolResult(result=tool_result, result_as_answer=False)
except Exception as e:
# TODO: drop
if self.agent:
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent( # validation error
agent_key=self.agent.key,
agent_role=self.agent.role,
tool_name=agent_action.tool,
tool_args=agent_action.tool_input,
tool_class=agent_action.tool,
error=str(e),
),
)
raise e
def _summarize_messages(self) -> None:
messages_groups = []

View File

@@ -38,11 +38,24 @@ from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import Tool
from crewai.traces.unified_trace_controller import init_crew_main_trace
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.events.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
CrewTestStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -52,12 +65,6 @@ from crewai.utilities.planning_handler import CrewPlanner
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.training_handler import CrewTrainingHandler
try:
import agentops # type: ignore
except ImportError:
agentops = None
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
@@ -521,10 +528,19 @@ class Crew(BaseModel):
self, n_iterations: int, filename: str, inputs: Optional[Dict[str, Any]] = {}
) -> None:
"""Trains the crew for a given number of iterations."""
train_crew = self.copy()
train_crew._setup_for_training(filename)
try:
crewai_event_bus.emit(
self,
CrewTrainStartedEvent(
crew_name=self.name or "crew",
n_iterations=n_iterations,
filename=filename,
inputs=inputs,
),
)
train_crew = self.copy()
train_crew._setup_for_training(filename)
for n_iteration in range(n_iterations):
train_crew._train_iteration = n_iteration
train_crew.kickoff(inputs=inputs)
@@ -539,70 +555,94 @@ class Crew(BaseModel):
CrewTrainingHandler(filename).save_trained_data(
agent_id=str(agent.role), trained_data=result.model_dump()
)
crewai_event_bus.emit(
self,
CrewTrainCompletedEvent(
crew_name=self.name or "crew",
n_iterations=n_iterations,
filename=filename,
),
)
except Exception as e:
crewai_event_bus.emit(
self,
CrewTrainFailedEvent(error=str(e), crew_name=self.name or "crew"),
)
self._logger.log("error", f"Training failed: {e}", color="red")
CrewTrainingHandler(TRAINING_DATA_FILE).clear()
CrewTrainingHandler(filename).clear()
raise
@init_crew_main_trace
def kickoff(
self,
inputs: Optional[Dict[str, Any]] = None,
) -> CrewOutput:
for before_callback in self.before_kickoff_callbacks:
if inputs is None:
inputs = {}
inputs = before_callback(inputs)
try:
for before_callback in self.before_kickoff_callbacks:
if inputs is None:
inputs = {}
inputs = before_callback(inputs)
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
self._task_output_handler.reset()
self._logging_color = "bold_purple"
if inputs is not None:
self._inputs = inputs
self._interpolate_inputs(inputs)
self._set_tasks_callbacks()
i18n = I18N(prompt_file=self.prompt_file)
for agent in self.agents:
agent.i18n = i18n
# type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
agent.crew = self # type: ignore[attr-defined]
# TODO: Create an AgentFunctionCalling protocol for future refactoring
if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
agent.function_calling_llm = self.function_calling_llm # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
if not agent.step_callback: # type: ignore # "BaseAgent" has no attribute "step_callback"
agent.step_callback = self.step_callback # type: ignore # "BaseAgent" has no attribute "step_callback"
agent.create_agent_executor()
if self.planning:
self._handle_crew_planning()
metrics: List[UsageMetrics] = []
if self.process == Process.sequential:
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
result = self._run_hierarchical_process()
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
crewai_event_bus.emit(
self,
CrewKickoffStartedEvent(crew_name=self.name or "crew", inputs=inputs),
)
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
# Starts the crew to work on its assigned tasks.
self._task_output_handler.reset()
self._logging_color = "bold_purple"
metrics += [agent._token_process.get_summary() for agent in self.agents]
if inputs is not None:
self._inputs = inputs
self._interpolate_inputs(inputs)
self._set_tasks_callbacks()
self.usage_metrics = UsageMetrics()
for metric in metrics:
self.usage_metrics.add_usage_metrics(metric)
i18n = I18N(prompt_file=self.prompt_file)
return result
for agent in self.agents:
agent.i18n = i18n
# type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
agent.crew = self # type: ignore[attr-defined]
# TODO: Create an AgentFunctionCalling protocol for future refactoring
if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
agent.function_calling_llm = self.function_calling_llm # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
if not agent.step_callback: # type: ignore # "BaseAgent" has no attribute "step_callback"
agent.step_callback = self.step_callback # type: ignore # "BaseAgent" has no attribute "step_callback"
agent.create_agent_executor()
if self.planning:
self._handle_crew_planning()
metrics: List[UsageMetrics] = []
if self.process == Process.sequential:
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
result = self._run_hierarchical_process()
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
metrics += [agent._token_process.get_summary() for agent in self.agents]
self.usage_metrics = UsageMetrics()
for metric in metrics:
self.usage_metrics.add_usage_metrics(metric)
return result
except Exception as e:
crewai_event_bus.emit(
self,
CrewKickoffFailedEvent(error=str(e), crew_name=self.name or "crew"),
)
raise
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
"""Executes the Crew's workflow for each input in the list and aggregates results."""
@@ -950,7 +990,12 @@ class Crew(BaseModel):
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
crewai_event_bus.emit(
self,
CrewKickoffCompletedEvent(
crew_name=self.name or "crew", output=final_task_output
),
)
return CrewOutput(
raw=final_task_output.raw,
pydantic=final_task_output.pydantic,
@@ -1136,13 +1181,6 @@ class Crew(BaseModel):
def _finish_execution(self, final_string_output: str) -> None:
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
if agentops:
agentops.end_session(
end_state="Success",
end_state_reason="Finished Execution",
is_auto_end=True,
)
self._telemetry.end_crew(self, final_string_output)
def calculate_usage_metrics(self) -> UsageMetrics:
"""Calculates and returns the usage metrics."""
@@ -1164,26 +1202,41 @@ class Crew(BaseModel):
inputs: Optional[Dict[str, Any]] = None,
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
test_crew = self.copy()
try:
eval_llm = create_llm(eval_llm)
if not eval_llm:
raise ValueError("Failed to create LLM instance.")
eval_llm = create_llm(eval_llm)
crewai_event_bus.emit(
self,
CrewTestStartedEvent(
crew_name=self.name or "crew",
n_iterations=n_iterations,
eval_llm=eval_llm,
inputs=inputs,
),
)
test_crew = self.copy()
evaluator = CrewEvaluator(test_crew, eval_llm) # type: ignore[arg-type]
if not eval_llm:
raise ValueError("Failed to create LLM instance.")
for i in range(1, n_iterations + 1):
evaluator.set_iteration(i)
test_crew.kickoff(inputs=inputs)
self._test_execution_span = test_crew._telemetry.test_execution_span(
test_crew,
n_iterations,
inputs,
eval_llm.model, # type: ignore[arg-type]
) # type: ignore[arg-type]
evaluator = CrewEvaluator(test_crew, eval_llm) # type: ignore[arg-type]
evaluator.print_crew_evaluation_result()
for i in range(1, n_iterations + 1):
evaluator.set_iteration(i)
test_crew.kickoff(inputs=inputs)
evaluator.print_crew_evaluation_result()
crewai_event_bus.emit(
self,
CrewTestCompletedEvent(
crew_name=self.name or "crew",
),
)
except Exception as e:
crewai_event_bus.emit(
self,
CrewTestFailedEvent(error=str(e), crew_name=self.name or "crew"),
)
raise
def __repr__(self):
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"

View File

@@ -17,19 +17,25 @@ from typing import (
)
from uuid import uuid4
from blinker import Signal
from pydantic import BaseModel, Field, ValidationError
from crewai.flow.flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.utils import get_possible_return_constants
from crewai.telemetry import Telemetry
from crewai.traces.unified_trace_controller import (
init_flow_main_trace,
trace_flow_step,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.utilities.printer import Printer
logger = logging.getLogger(__name__)
@@ -427,7 +433,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
Type parameter T must be either Dict[str, Any] or a subclass of BaseModel."""
_telemetry = Telemetry()
_printer = Printer()
_start_methods: List[str] = []
@@ -435,7 +440,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
_routers: Set[str] = set()
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
event_emitter = Signal("event_emitter")
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
class _FlowGeneric(cls): # type: ignore
@@ -469,7 +473,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
if kwargs:
self._initialize_state(kwargs)
self._telemetry.flow_creation_span(self.__class__.__name__)
crewai_event_bus.emit(
self,
FlowCreatedEvent(
type="flow_created",
flow_name=self.__class__.__name__,
),
)
# Register all flow-related methods
for method_name in dir(self):
@@ -738,9 +748,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._initialize_state(filtered_inputs)
# Start flow execution
self.event_emitter.send(
crewai_event_bus.emit(
self,
event=FlowStartedEvent(
FlowStartedEvent(
type="flow_started",
flow_name=self.__class__.__name__,
inputs=inputs,
@@ -753,16 +763,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
return asyncio.run(self.kickoff_async())
async def run_flow():
return await self.kickoff_async()
return asyncio.run(run_flow())
@init_flow_main_trace
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
if not self._start_methods:
raise ValueError("No start method defined")
self._telemetry.flow_execution_span(
self.__class__.__name__, list(self._methods.keys())
)
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
@@ -771,9 +781,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
final_output = self._method_outputs[-1] if self._method_outputs else None
self.event_emitter.send(
crewai_event_bus.emit(
self,
event=FlowFinishedEvent(
FlowFinishedEvent(
type="flow_finished",
flow_name=self.__class__.__name__,
result=final_output,
@@ -804,43 +814,59 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
await self._execute_listeners(start_method_name, result)
@trace_flow_step
async def _execute_method(
self, method_name: str, method: Callable, *args: Any, **kwargs: Any
) -> Any:
dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {})
self.event_emitter.send(
self,
event=MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.__class__.__name__,
params=dumped_params,
state=self._copy_state(),
),
)
try:
dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (
kwargs or {}
)
crewai_event_bus.emit(
self,
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.__class__.__name__,
params=dumped_params,
state=self._copy_state(),
),
)
result = (
await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs)
)
self._method_outputs.append(result)
self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1
)
result = (
await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs)
)
self.event_emitter.send(
self,
event=MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.__class__.__name__,
state=self._copy_state(),
result=result,
),
)
self._method_outputs.append(result)
self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1
)
return result
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.__class__.__name__,
state=self._copy_state(),
result=result,
),
)
return result
except Exception as e:
crewai_event_bus.emit(
self,
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.__class__.__name__,
error=e,
),
)
raise e
async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
"""
@@ -978,6 +1004,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
"""
try:
method = self._methods[listener_name]
sig = inspect.signature(method)
params = list(sig.parameters.values())
method_params = [p for p in params if p.name != "self"]
@@ -1027,7 +1054,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
logger.warning(message)
def plot(self, filename: str = "crewai_flow") -> None:
self._telemetry.flow_plotting_span(
self.__class__.__name__, list(self._methods.keys())
crewai_event_bus.emit(
self,
FlowPlotEvent(
type="flow_plot",
flow_name=self.__class__.__name__,
),
)
plot_flow(self, filename)

View File

@@ -1,39 +0,0 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional, Union
from pydantic import BaseModel
@dataclass
class Event:
type: str
flow_name: str
timestamp: datetime = field(init=False)
def __post_init__(self):
self.timestamp = datetime.now()
@dataclass
class FlowStartedEvent(Event):
inputs: Optional[Dict[str, Any]] = None
@dataclass
class MethodExecutionStartedEvent(Event):
method_name: str
state: Union[Dict[str, Any], BaseModel]
params: Optional[Dict[str, Any]] = None
@dataclass
class MethodExecutionFinishedEvent(Event):
method_name: str
state: Union[Dict[str, Any], BaseModel]
result: Any = None
@dataclass
class FlowFinishedEvent(Event):
result: Optional[Any] = None

View File

@@ -58,7 +58,7 @@ class PersistenceDecorator:
_printer = Printer() # Class-level printer instance
@classmethod
def persist_state(cls, flow_instance: Any, method_name: str, persistence_instance: FlowPersistence) -> None:
def persist_state(cls, flow_instance: Any, method_name: str, persistence_instance: FlowPersistence, verbose: bool = False) -> None:
"""Persist flow state with proper error handling and logging.
This method handles the persistence of flow state data, including proper
@@ -68,6 +68,7 @@ class PersistenceDecorator:
flow_instance: The flow instance whose state to persist
method_name: Name of the method that triggered persistence
persistence_instance: The persistence backend to use
verbose: Whether to log persistence operations
Raises:
ValueError: If flow has no state or state lacks an ID
@@ -88,9 +89,10 @@ class PersistenceDecorator:
if not flow_uuid:
raise ValueError("Flow state must have an 'id' field for persistence")
# Log state saving with consistent message
cls._printer.print(LOG_MESSAGES["save_state"].format(flow_uuid), color="cyan")
logger.info(LOG_MESSAGES["save_state"].format(flow_uuid))
# Log state saving only if verbose is True
if verbose:
cls._printer.print(LOG_MESSAGES["save_state"].format(flow_uuid), color="cyan")
logger.info(LOG_MESSAGES["save_state"].format(flow_uuid))
try:
persistence_instance.save_state(
@@ -115,7 +117,7 @@ class PersistenceDecorator:
raise ValueError(error_msg) from e
def persist(persistence: Optional[FlowPersistence] = None):
def persist(persistence: Optional[FlowPersistence] = None, verbose: bool = False):
"""Decorator to persist flow state.
This decorator can be applied at either the class level or method level.
@@ -126,6 +128,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
Args:
persistence: Optional FlowPersistence implementation to use.
If not provided, uses SQLiteFlowPersistence.
verbose: Whether to log persistence operations. Defaults to False.
Returns:
A decorator that can be applied to either a class or method
@@ -135,13 +138,12 @@ def persist(persistence: Optional[FlowPersistence] = None):
RuntimeError: If state persistence fails
Example:
@persist # Class-level persistence with default SQLite
@persist(verbose=True) # Class-level persistence with logging
class MyFlow(Flow[MyState]):
@start()
def begin(self):
pass
"""
def decorator(target: Union[Type, Callable[..., T]]) -> Union[Type, Callable[..., T]]:
"""Decorator that handles both class and method decoration."""
actual_persistence = persistence or SQLiteFlowPersistence()
@@ -179,7 +181,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
@functools.wraps(original_method)
async def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
result = await original_method(self, *args, **kwargs)
PersistenceDecorator.persist_state(self, method_name, actual_persistence)
PersistenceDecorator.persist_state(self, method_name, actual_persistence, verbose)
return result
return method_wrapper
@@ -199,7 +201,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
@functools.wraps(original_method)
def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
result = original_method(self, *args, **kwargs)
PersistenceDecorator.persist_state(self, method_name, actual_persistence)
PersistenceDecorator.persist_state(self, method_name, actual_persistence, verbose)
return result
return method_wrapper
@@ -228,7 +230,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
result = await method_coro
else:
result = method_coro
PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence)
PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence, verbose)
return result
for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]:
@@ -240,7 +242,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
@functools.wraps(method)
def method_sync_wrapper(flow_instance: Any, *args: Any, **kwargs: Any) -> T:
result = method(flow_instance, *args, **kwargs)
PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence)
PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence, verbose)
return result
for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]:

View File

@@ -1,12 +1,18 @@
import json
from datetime import date, datetime
from typing import Any
from typing import Any, Dict, List, Union
from pydantic import BaseModel
from crewai.flow import Flow
SerializablePrimitive = Union[str, int, float, bool, None]
Serializable = Union[
SerializablePrimitive, List["Serializable"], Dict[str, "Serializable"]
]
def export_state(flow: Flow) -> dict[str, Any]:
def export_state(flow: Flow) -> dict[str, Serializable]:
"""Exports the Flow's internal state as JSON-compatible data structures.
Performs a one-way transformation of a Flow's state into basic Python types
@@ -20,10 +26,27 @@ def export_state(flow: Flow) -> dict[str, Any]:
dict[str, Any]: The transformed state using JSON-compatible Python
types.
"""
return _to_serializable(flow._state)
result = to_serializable(flow._state)
assert isinstance(result, dict)
return result
def _to_serializable(obj: Any, max_depth: int = 5, _current_depth: int = 0) -> Any:
def to_serializable(
obj: Any, max_depth: int = 5, _current_depth: int = 0
) -> Serializable:
"""Converts a Python object into a JSON-compatible representation.
Supports primitives, datetime objects, collections, dictionaries, and
Pydantic models. Recursion depth is limited to prevent infinite nesting.
Non-convertible objects default to their string representations.
Args:
obj (Any): Object to transform.
max_depth (int, optional): Maximum recursion depth. Defaults to 5.
Returns:
Serializable: A JSON-compatible structure.
"""
if _current_depth >= max_depth:
return repr(obj)
@@ -32,16 +55,16 @@ def _to_serializable(obj: Any, max_depth: int = 5, _current_depth: int = 0) -> A
elif isinstance(obj, (date, datetime)):
return obj.isoformat()
elif isinstance(obj, (list, tuple, set)):
return [_to_serializable(item, max_depth, _current_depth + 1) for item in obj]
return [to_serializable(item, max_depth, _current_depth + 1) for item in obj]
elif isinstance(obj, dict):
return {
_to_serializable_key(key): _to_serializable(
_to_serializable_key(key): to_serializable(
value, max_depth, _current_depth + 1
)
for key, value in obj.items()
}
elif isinstance(obj, BaseModel):
return _to_serializable(obj.model_dump(), max_depth, _current_depth + 1)
return to_serializable(obj.model_dump(), max_depth, _current_depth + 1)
else:
return repr(obj)
@@ -50,3 +73,19 @@ def _to_serializable_key(key: Any) -> str:
if isinstance(key, (str, int)):
return str(key)
return f"key_{id(key)}_{repr(key)}"
def to_string(obj: Any) -> str | None:
"""Serializes an object into a JSON string.
Args:
obj (Any): Object to serialize.
Returns:
str | None: A JSON-formatted string or `None` if empty.
"""
serializable = to_serializable(obj)
if serializable is None:
return None
else:
return json.dumps(serializable)

View File

@@ -76,7 +76,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
"context": fetched["documents"][0][i], # type: ignore
"score": fetched["distances"][0][i], # type: ignore
}
if result["score"] >= score_threshold: # type: ignore
if result["score"] >= score_threshold:
results.append(result)
return results
else:

View File

@@ -1,3 +1,4 @@
import inspect
import json
import logging
import os
@@ -5,11 +6,23 @@ import sys
import threading
import warnings
from contextlib import contextmanager
from typing import Any, Dict, List, Literal, Optional, Type, Union, cast
from typing import (
Any,
Dict,
List,
Literal,
Optional,
Tuple,
Type,
Union,
cast,
)
from dotenv import load_dotenv
from pydantic import BaseModel
from crewai.utilities.events.tool_usage_events import ToolExecutionErrorEvent
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
import litellm
@@ -18,9 +31,12 @@ with warnings.catch_warnings():
from litellm.utils import supports_response_schema
from crewai.traces.unified_trace_controller import trace_llm_call
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from crewai.utilities.protocols import AgentExecutorProtocol
load_dotenv()
@@ -164,6 +180,7 @@ class LLM:
self.context_window_size = 0
self.reasoning_effort = reasoning_effort
self.additional_params = kwargs
self._message_history: List[Dict[str, str]] = []
self.is_anthropic = self._is_anthropic_model(model)
litellm.drop_params = True
@@ -179,6 +196,12 @@ class LLM:
self.set_callbacks(callbacks)
self.set_env_callbacks()
@trace_llm_call
def _call_llm(self, params: Dict[str, Any]) -> Any:
with suppress_warnings():
response = litellm.completion(**params)
return response
def _is_anthropic_model(self, model: str) -> bool:
"""Determine if the model is from Anthropic provider.
@@ -288,7 +311,7 @@ class LLM:
params = {k: v for k, v in params.items() if v is not None}
# --- 2) Make the completion call
response = litellm.completion(**params)
response = self._call_llm(params)
response_message = cast(Choices, cast(ModelResponse, response).choices)[
0
].message
@@ -315,7 +338,7 @@ class LLM:
# --- 5) Handle the tool call
tool_call = tool_calls[0]
function_name = tool_call.function.name
print("function_name", function_name)
if function_name in available_functions:
try:
function_args = json.loads(tool_call.function.arguments)
@@ -333,6 +356,15 @@ class LLM:
logging.error(
f"Error executing function '{function_name}': {e}"
)
crewai_event_bus.emit(
self,
event=ToolExecutionErrorEvent(
tool_name=function_name,
tool_args=function_args,
tool_class=fn,
error=str(e),
),
)
return text_response
else:
@@ -499,3 +531,95 @@ class LLM:
litellm.success_callback = success_callbacks
litellm.failure_callback = failure_callbacks
def _get_execution_context(self) -> Tuple[Optional[Any], Optional[Any]]:
"""Get the agent and task from the execution context.
Returns:
tuple: (agent, task) from any AgentExecutor context, or (None, None) if not found
"""
frame = inspect.currentframe()
caller_frame = frame.f_back if frame else None
agent = None
task = None
# Add a maximum depth to prevent infinite loops
max_depth = 100 # Reasonable limit for call stack depth
current_depth = 0
while caller_frame and current_depth < max_depth:
if "self" in caller_frame.f_locals:
caller_self = caller_frame.f_locals["self"]
if isinstance(caller_self, AgentExecutorProtocol):
agent = caller_self.agent
task = caller_self.task
break
caller_frame = caller_frame.f_back
current_depth += 1
return agent, task
def _get_new_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Get only the new messages that haven't been processed before."""
if not hasattr(self, "_message_history"):
self._message_history = []
new_messages = []
for message in messages:
message_key = (message["role"], message["content"])
if message_key not in [
(m["role"], m["content"]) for m in self._message_history
]:
new_messages.append(message)
self._message_history.append(message)
return new_messages
def _get_new_tool_results(self, agent) -> List[Dict]:
"""Get only the new tool results that haven't been processed before."""
if not agent or not agent.tools_results:
return []
if not hasattr(self, "_tool_results_history"):
self._tool_results_history: List[Dict] = []
new_tool_results = []
for result in agent.tools_results:
# Process tool arguments to extract actual values
processed_args = {}
if isinstance(result["tool_args"], dict):
for key, value in result["tool_args"].items():
if isinstance(value, dict) and "type" in value:
# Skip metadata and just store the actual value
continue
processed_args[key] = value
# Create a clean result with processed arguments
clean_result = {
"tool_name": result["tool_name"],
"tool_args": processed_args,
"result": result["result"],
"content": result.get("content", ""),
"start_time": result.get("start_time", ""),
}
# Check if this exact tool execution exists in history
is_duplicate = False
for history_result in self._tool_results_history:
if (
clean_result["tool_name"] == history_result["tool_name"]
and str(clean_result["tool_args"])
== str(history_result["tool_args"])
and str(clean_result["result"]) == str(history_result["result"])
and clean_result["content"] == history_result.get("content", "")
and clean_result["start_time"]
== history_result.get("start_time", "")
):
is_duplicate = True
break
if not is_duplicate:
new_tool_results.append(clean_result)
self._tool_results_history.append(clean_result)
return new_tool_results

View File

@@ -21,7 +21,6 @@ from typing import (
Union,
)
from opentelemetry.trace import Span
from pydantic import (
UUID4,
BaseModel,
@@ -36,10 +35,15 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tasks.guardrail_result import GuardrailResult
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.telemetry import Telemetry
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
@@ -183,8 +187,6 @@ class Task(BaseModel):
)
return v
_telemetry: Telemetry = PrivateAttr(default_factory=Telemetry)
_execution_span: Optional[Span] = PrivateAttr(default=None)
_original_description: Optional[str] = PrivateAttr(default=None)
_original_expected_output: Optional[str] = PrivateAttr(default=None)
_original_output_file: Optional[str] = PrivateAttr(default=None)
@@ -348,100 +350,102 @@ class Task(BaseModel):
tools: Optional[List[Any]],
) -> TaskOutput:
"""Run the core execution logic of the task."""
agent = agent or self.agent
self.agent = agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
try:
agent = agent or self.agent
self.agent = agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
self.start_time = datetime.datetime.now()
self.prompt_context = context
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
crewai_event_bus.emit(self, TaskStartedEvent(context=context))
result = agent.execute_task(
task=self,
context=context,
tools=tools,
)
self.start_time = datetime.datetime.now()
self._execution_span = self._telemetry.task_started(crew=agent.crew, task=self)
pydantic_output, json_output = self._export_output(result)
task_output = TaskOutput(
name=self.name,
description=self.description,
expected_output=self.expected_output,
raw=result,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
)
self.prompt_context = context
tools = tools or self.tools or []
if self.guardrail:
guardrail_result = GuardrailResult.from_tuple(
self.guardrail(task_output)
)
if not guardrail_result.success:
if self.retry_count >= self.max_retries:
raise Exception(
f"Task failed guardrail validation after {self.max_retries} retries. "
f"Last error: {guardrail_result.error}"
)
self.processed_by_agents.add(agent.role)
self.retry_count += 1
context = self.i18n.errors("validation_error").format(
guardrail_result_error=guardrail_result.error,
task_output=task_output.raw,
)
printer = Printer()
printer.print(
content=f"Guardrail blocked, retrying, due to: {guardrail_result.error}\n",
color="yellow",
)
return self._execute_core(agent, context, tools)
result = agent.execute_task(
task=self,
context=context,
tools=tools,
)
pydantic_output, json_output = self._export_output(result)
task_output = TaskOutput(
name=self.name,
description=self.description,
expected_output=self.expected_output,
raw=result,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
)
if self.guardrail:
guardrail_result = GuardrailResult.from_tuple(self.guardrail(task_output))
if not guardrail_result.success:
if self.retry_count >= self.max_retries:
if guardrail_result.result is None:
raise Exception(
f"Task failed guardrail validation after {self.max_retries} retries. "
f"Last error: {guardrail_result.error}"
"Task guardrail returned None as result. This is not allowed."
)
self.retry_count += 1
context = self.i18n.errors("validation_error").format(
guardrail_result_error=guardrail_result.error,
task_output=task_output.raw,
if isinstance(guardrail_result.result, str):
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(
guardrail_result.result
)
task_output.pydantic = pydantic_output
task_output.json_dict = json_output
elif isinstance(guardrail_result.result, TaskOutput):
task_output = guardrail_result.result
self.output = task_output
self.end_time = datetime.datetime.now()
if self.callback:
self.callback(self.output)
crew = self.agent.crew # type: ignore[union-attr]
if crew and crew.task_callback and crew.task_callback != self.callback:
crew.task_callback(self.output)
if self.output_file:
content = (
json_output
if json_output
else pydantic_output.model_dump_json()
if pydantic_output
else result
)
printer = Printer()
printer.print(
content=f"Guardrail blocked, retrying, due to: {guardrail_result.error}\n",
color="yellow",
)
return self._execute_core(agent, context, tools)
if guardrail_result.result is None:
raise Exception(
"Task guardrail returned None as result. This is not allowed."
)
if isinstance(guardrail_result.result, str):
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(
guardrail_result.result
)
task_output.pydantic = pydantic_output
task_output.json_dict = json_output
elif isinstance(guardrail_result.result, TaskOutput):
task_output = guardrail_result.result
self.output = task_output
self.end_time = datetime.datetime.now()
if self.callback:
self.callback(self.output)
crew = self.agent.crew # type: ignore[union-attr]
if crew and crew.task_callback and crew.task_callback != self.callback:
crew.task_callback(self.output)
if self._execution_span:
self._telemetry.task_ended(self._execution_span, self, agent.crew)
self._execution_span = None
if self.output_file:
content = (
json_output
if json_output
else pydantic_output.model_dump_json()
if pydantic_output
else result
)
self._save_file(content)
return task_output
self._save_file(content)
crewai_event_bus.emit(self, TaskCompletedEvent(output=task_output))
return task_output
except Exception as e:
self.end_time = datetime.datetime.now()
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e)))
raise e # Re-raise the exception after emitting the event
def prompt(self) -> str:
"""Prompt the task.
@@ -716,10 +720,9 @@ class Task(BaseModel):
file.write(str(result))
except (OSError, IOError) as e:
raise RuntimeError(
"\n".join([
f"Failed to save output file: {e}",
FILEWRITER_RECOMMENDATION
])
"\n".join(
[f"Failed to save output file: {e}", FILEWRITER_RECOMMENDATION]
)
)
return None

View File

@@ -2,6 +2,7 @@ import ast
import datetime
import json
import time
from datetime import UTC
from difflib import SequenceMatcher
from json import JSONDecodeError
from textwrap import dedent
@@ -10,20 +11,21 @@ from typing import Any, Dict, List, Optional, Union
import json5
from json_repair import repair_json
import crewai.utilities.events as events
from crewai.agents.tools_handler import ToolsHandler
from crewai.task import Task
from crewai.telemetry import Telemetry
from crewai.tools import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
from crewai.tools.tool_usage_events import ToolUsageError, ToolUsageFinished
from crewai.utilities import I18N, Converter, ConverterError, Printer
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.tool_usage_events import (
ToolSelectionErrorEvent,
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolValidateInputErrorEvent,
)
try:
import agentops # type: ignore
except ImportError:
agentops = None
OPENAI_BIGGER_MODELS = [
"gpt-4",
"gpt-4o",
@@ -116,7 +118,10 @@ class ToolUsage:
self._printer.print(content=f"\n\n{error}\n", color="red")
return error
if isinstance(tool, CrewStructuredTool) and tool.name == self._i18n.tools("add_image")["name"]: # type: ignore
if (
isinstance(tool, CrewStructuredTool)
and tool.name == self._i18n.tools("add_image")["name"] # type: ignore
):
try:
result = self._use(tool_string=tool_string, tool=tool, calling=calling)
return result
@@ -136,7 +141,6 @@ class ToolUsage:
tool: Any,
calling: Union[ToolCalling, InstructorToolCalling],
) -> str: # TODO: Fix this return type
tool_event = agentops.ToolEvent(name=calling.tool_name) if agentops else None # type: ignore
if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None)
try:
result = self._i18n.errors("task_repeated_usage").format(
@@ -154,6 +158,7 @@ class ToolUsage:
self.task.increment_tools_errors()
started_at = time.time()
started_at_trace = datetime.datetime.now(UTC)
from_cache = False
result = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
@@ -181,7 +186,9 @@ class ToolUsage:
if calling.arguments:
try:
acceptable_args = tool.args_schema.model_json_schema()["properties"].keys() # type: ignore
acceptable_args = tool.args_schema.model_json_schema()[
"properties"
].keys() # type: ignore
arguments = {
k: v
for k, v in calling.arguments.items()
@@ -202,7 +209,7 @@ class ToolUsage:
error=e, tool=tool.name, tool_inputs=tool.description
)
error = ToolUsageErrorException(
f'\n{error_message}.\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
).message
self.task.increment_tools_errors()
if self.agent.verbose:
@@ -212,10 +219,6 @@ class ToolUsage:
return error # type: ignore # No return value expected
self.task.increment_tools_errors()
if agentops:
agentops.record(
agentops.ErrorEvent(exception=e, trigger_event=tool_event)
)
return self.use(calling=calling, tool_string=tool_string) # type: ignore # No return value expected
if self.tools_handler:
@@ -231,9 +234,6 @@ class ToolUsage:
self.tools_handler.on_tool_use(
calling=calling, output=result, should_cache=should_cache
)
if agentops:
agentops.record(tool_event)
self._telemetry.tool_usage(
llm=self.function_calling_llm,
tool_name=tool.name,
@@ -244,6 +244,7 @@ class ToolUsage:
"result": result,
"tool_name": tool.name,
"tool_args": calling.arguments,
"start_time": started_at_trace,
}
self.on_tool_use_finished(
@@ -308,14 +309,33 @@ class ToolUsage:
):
return tool
self.task.increment_tools_errors()
tool_selection_data = {
"agent_key": self.agent.key,
"agent_role": self.agent.role,
"tool_name": tool_name,
"tool_args": {},
"tool_class": self.tools_description,
}
if tool_name and tool_name != "":
raise Exception(
f"Action '{tool_name}' don't exist, these are the only available Actions:\n{self.tools_description}"
error = f"Action '{tool_name}' don't exist, these are the only available Actions:\n{self.tools_description}"
crewai_event_bus.emit(
self,
ToolSelectionErrorEvent(
**tool_selection_data,
error=error,
),
)
raise Exception(error)
else:
raise Exception(
f"I forgot the Action name, these are the only available Actions: {self.tools_description}"
error = f"I forgot the Action name, these are the only available Actions: {self.tools_description}"
crewai_event_bus.emit(
self,
ToolSelectionErrorEvent(
**tool_selection_data,
error=error,
),
)
raise Exception(error)
def _render(self) -> str:
"""Render the tool name and description in plain text."""
@@ -368,7 +388,7 @@ class ToolUsage:
raise
else:
return ToolUsageErrorException(
f'{self._i18n.errors("tool_arguments_error")}'
f"{self._i18n.errors('tool_arguments_error')}"
)
if not isinstance(arguments, dict):
@@ -376,7 +396,7 @@ class ToolUsage:
raise
else:
return ToolUsageErrorException(
f'{self._i18n.errors("tool_arguments_error")}'
f"{self._i18n.errors('tool_arguments_error')}"
)
return ToolCalling(
@@ -404,7 +424,7 @@ class ToolUsage:
if self.agent.verbose:
self._printer.print(content=f"\n\n{e}\n", color="red")
return ToolUsageErrorException( # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling")
f'{self._i18n.errors("tool_usage_error").format(error=e)}\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
f"{self._i18n.errors('tool_usage_error').format(error=e)}\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
)
return self._tool_calling(tool_string)
@@ -451,18 +471,33 @@ class ToolUsage:
if isinstance(arguments, dict):
return arguments
except Exception as e:
self._printer.print(content=f"Failed to repair JSON: {e}", color="red")
error = f"Failed to repair JSON: {e}"
self._printer.print(content=error, color="red")
# If all parsing attempts fail, raise an error
raise Exception(
error_message = (
"Tool input must be a valid dictionary in JSON or Python literal format"
)
self._emit_validate_input_error(error_message)
# If all parsing attempts fail, raise an error
raise Exception(error_message)
def _emit_validate_input_error(self, final_error: str):
tool_selection_data = {
"agent_key": self.agent.key,
"agent_role": self.agent.role,
"tool_name": self.action.tool,
"tool_args": str(self.action.tool_input),
"tool_class": self.__class__.__name__,
}
crewai_event_bus.emit(
self,
ToolValidateInputErrorEvent(**tool_selection_data, error=final_error),
)
def on_tool_error(self, tool: Any, tool_calling: ToolCalling, e: Exception) -> None:
event_data = self._prepare_event_data(tool, tool_calling)
events.emit(
source=self, event=ToolUsageError(**{**event_data, "error": str(e)})
)
crewai_event_bus.emit(self, ToolUsageErrorEvent(**{**event_data, "error": e}))
def on_tool_use_finished(
self, tool: Any, tool_calling: ToolCalling, from_cache: bool, started_at: float
@@ -476,7 +511,7 @@ class ToolUsage:
"from_cache": from_cache,
}
)
events.emit(source=self, event=ToolUsageFinished(**event_data))
crewai_event_bus.emit(self, ToolUsageFinishedEvent(**event_data))
def _prepare_event_data(self, tool: Any, tool_calling: ToolCalling) -> dict:
return {

View File

@@ -1,24 +0,0 @@
from datetime import datetime
from typing import Any, Dict
from pydantic import BaseModel
class ToolUsageEvent(BaseModel):
agent_key: str
agent_role: str
tool_name: str
tool_args: Dict[str, Any]
tool_class: str
run_attempts: int | None = None
delegations: int | None = None
class ToolUsageFinished(ToolUsageEvent):
started_at: datetime
finished_at: datetime
from_cache: bool = False
class ToolUsageError(ToolUsageEvent):
error: str

View File

View File

@@ -0,0 +1,39 @@
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Generator
class TraceContext:
"""Maintains the current trace context throughout the execution stack.
This class provides a context manager for tracking trace execution across
async and sync code paths using ContextVars.
"""
_context: ContextVar = ContextVar("trace_context", default=None)
@classmethod
def get_current(cls):
"""Get the current trace context.
Returns:
Optional[UnifiedTraceController]: The current trace controller or None if not set.
"""
return cls._context.get()
@classmethod
@contextmanager
def set_current(cls, trace):
"""Set the current trace context within a context manager.
Args:
trace: The trace controller to set as current.
Yields:
UnifiedTraceController: The current trace controller.
"""
token = cls._context.set(trace)
try:
yield trace
finally:
cls._context.reset(token)

View File

@@ -0,0 +1,19 @@
from enum import Enum
class TraceType(Enum):
LLM_CALL = "llm_call"
TOOL_CALL = "tool_call"
FLOW_STEP = "flow_step"
START_CALL = "start_call"
class RunType(Enum):
KICKOFF = "kickoff"
TRAIN = "train"
TEST = "test"
class CrewType(Enum):
CREW = "crew"
FLOW = "flow"

View File

@@ -0,0 +1,89 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class ToolCall(BaseModel):
"""Model representing a tool call during execution"""
name: str
arguments: Dict[str, Any]
output: str
start_time: datetime
end_time: Optional[datetime] = None
latency_ms: Optional[int] = None
error: Optional[str] = None
class LLMRequest(BaseModel):
"""Model representing the LLM request details"""
model: str
messages: List[Dict[str, str]]
temperature: Optional[float] = None
max_tokens: Optional[int] = None
stop_sequences: Optional[List[str]] = None
additional_params: Dict[str, Any] = Field(default_factory=dict)
class LLMResponse(BaseModel):
"""Model representing the LLM response details"""
content: str
finish_reason: Optional[str] = None
class FlowStepIO(BaseModel):
"""Model representing flow step input/output details"""
function_name: str
inputs: Dict[str, Any] = Field(default_factory=dict)
outputs: Any
metadata: Dict[str, Any] = Field(default_factory=dict)
class CrewTrace(BaseModel):
"""Model for tracking detailed information about LLM interactions and Flow steps"""
deployment_instance_id: Optional[str] = Field(
description="ID of the deployment instance"
)
trace_id: str = Field(description="Unique identifier for this trace")
run_id: str = Field(description="Identifier for the execution run")
agent_role: Optional[str] = Field(description="Role of the agent")
task_id: Optional[str] = Field(description="ID of the current task being executed")
task_name: Optional[str] = Field(description="Name of the current task")
task_description: Optional[str] = Field(
description="Description of the current task"
)
trace_type: str = Field(description="Type of the trace")
crew_type: str = Field(description="Type of the crew")
run_type: str = Field(description="Type of the run")
# Timing information
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
latency_ms: Optional[int] = None
# Request/Response for LLM calls
request: Optional[LLMRequest] = None
response: Optional[LLMResponse] = None
# Input/Output for Flow steps
flow_step: Optional[FlowStepIO] = None
# Tool usage
tool_calls: List[ToolCall] = Field(default_factory=list)
# Metrics
tokens_used: Optional[int] = None
prompt_tokens: Optional[int] = None
completion_tokens: Optional[int] = None
cost: Optional[float] = None
# Additional metadata
status: str = "running" # running, completed, error
error: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
tags: List[str] = Field(default_factory=list)

View File

@@ -0,0 +1,543 @@
import inspect
import os
from datetime import UTC, datetime
from functools import wraps
from typing import Any, Awaitable, Callable, Dict, List, Optional
from uuid import uuid4
from crewai.traces.context import TraceContext
from crewai.traces.enums import CrewType, RunType, TraceType
from crewai.traces.models import (
CrewTrace,
FlowStepIO,
LLMRequest,
LLMResponse,
ToolCall,
)
class UnifiedTraceController:
"""Controls and manages trace execution and recording.
This class handles the lifecycle of traces including creation, execution tracking,
and recording of results for various types of operations (LLM calls, tool calls, flow steps).
"""
_task_traces: Dict[str, List["UnifiedTraceController"]] = {}
def __init__(
self,
trace_type: TraceType,
run_type: RunType,
crew_type: CrewType,
run_id: str,
deployment_instance_id: str = os.environ.get(
"CREWAI_DEPLOYMENT_INSTANCE_ID", ""
),
parent_trace_id: Optional[str] = None,
agent_role: Optional[str] = "unknown",
task_name: Optional[str] = None,
task_description: Optional[str] = None,
task_id: Optional[str] = None,
flow_step: Dict[str, Any] = {},
tool_calls: List[ToolCall] = [],
**context: Any,
) -> None:
"""Initialize a new trace controller.
Args:
trace_type: Type of trace being recorded.
run_type: Type of run being executed.
crew_type: Type of crew executing the trace.
run_id: Unique identifier for the run.
deployment_instance_id: Optional deployment instance identifier.
parent_trace_id: Optional parent trace identifier for nested traces.
agent_role: Role of the agent executing the trace.
task_name: Optional name of the task being executed.
task_description: Optional description of the task.
task_id: Optional unique identifier for the task.
flow_step: Optional flow step information.
tool_calls: Optional list of tool calls made during execution.
**context: Additional context parameters.
"""
self.trace_id = str(uuid4())
self.run_id = run_id
self.parent_trace_id = parent_trace_id
self.trace_type = trace_type
self.run_type = run_type
self.crew_type = crew_type
self.context = context
self.agent_role = agent_role
self.task_name = task_name
self.task_description = task_description
self.task_id = task_id
self.deployment_instance_id = deployment_instance_id
self.children: List[Dict[str, Any]] = []
self.start_time: Optional[datetime] = None
self.end_time: Optional[datetime] = None
self.error: Optional[str] = None
self.tool_calls = tool_calls
self.flow_step = flow_step
self.status: str = "running"
# Add trace to task's trace collection if task_id is present
if task_id:
self._add_to_task_traces()
def _add_to_task_traces(self) -> None:
"""Add this trace to the task's trace collection."""
if not hasattr(UnifiedTraceController, "_task_traces"):
UnifiedTraceController._task_traces = {}
if self.task_id is None:
return
if self.task_id not in UnifiedTraceController._task_traces:
UnifiedTraceController._task_traces[self.task_id] = []
UnifiedTraceController._task_traces[self.task_id].append(self)
@classmethod
def get_task_traces(cls, task_id: str) -> List["UnifiedTraceController"]:
"""Get all traces for a specific task.
Args:
task_id: The ID of the task to get traces for
Returns:
List of traces associated with the task
"""
return cls._task_traces.get(task_id, [])
@classmethod
def clear_task_traces(cls, task_id: str) -> None:
"""Clear traces for a specific task.
Args:
task_id: The ID of the task to clear traces for
"""
if hasattr(cls, "_task_traces") and task_id in cls._task_traces:
del cls._task_traces[task_id]
def _get_current_trace(self) -> "UnifiedTraceController":
return TraceContext.get_current()
def start_trace(self) -> "UnifiedTraceController":
"""Start the trace execution.
Returns:
UnifiedTraceController: Self for method chaining.
"""
self.start_time = datetime.now(UTC)
return self
def end_trace(self, result: Any = None, error: Optional[str] = None) -> None:
"""End the trace execution and record results.
Args:
result: Optional result from the trace execution.
error: Optional error message if the trace failed.
"""
self.end_time = datetime.now(UTC)
self.status = "error" if error else "completed"
self.error = error
self._record_trace(result)
def add_child_trace(self, child_trace: Dict[str, Any]) -> None:
"""Add a child trace to this trace's execution history.
Args:
child_trace: The child trace information to add.
"""
self.children.append(child_trace)
def to_crew_trace(self) -> CrewTrace:
"""Convert to CrewTrace format for storage.
Returns:
CrewTrace: The trace data in CrewTrace format.
"""
latency_ms = None
if self.tool_calls and hasattr(self.tool_calls[0], "start_time"):
self.start_time = self.tool_calls[0].start_time
if self.start_time and self.end_time:
latency_ms = int((self.end_time - self.start_time).total_seconds() * 1000)
request = None
response = None
flow_step_obj = None
if self.trace_type in [TraceType.LLM_CALL, TraceType.TOOL_CALL]:
request = LLMRequest(
model=self.context.get("model", "unknown"),
messages=self.context.get("messages", []),
temperature=self.context.get("temperature"),
max_tokens=self.context.get("max_tokens"),
stop_sequences=self.context.get("stop_sequences"),
)
if "response" in self.context:
response = LLMResponse(
content=self.context["response"].get("content", ""),
finish_reason=self.context["response"].get("finish_reason"),
)
elif self.trace_type == TraceType.FLOW_STEP:
flow_step_obj = FlowStepIO(
function_name=self.flow_step.get("function_name", "unknown"),
inputs=self.flow_step.get("inputs", {}),
outputs={"result": self.context.get("response")},
metadata=self.flow_step.get("metadata", {}),
)
return CrewTrace(
deployment_instance_id=self.deployment_instance_id,
trace_id=self.trace_id,
task_id=self.task_id,
run_id=self.run_id,
agent_role=self.agent_role,
task_name=self.task_name,
task_description=self.task_description,
trace_type=self.trace_type.value,
crew_type=self.crew_type.value,
run_type=self.run_type.value,
start_time=self.start_time,
end_time=self.end_time,
latency_ms=latency_ms,
request=request,
response=response,
flow_step=flow_step_obj,
tool_calls=self.tool_calls,
tokens_used=self.context.get("tokens_used"),
prompt_tokens=self.context.get("prompt_tokens"),
completion_tokens=self.context.get("completion_tokens"),
status=self.status,
error=self.error,
)
def _record_trace(self, result: Any = None) -> None:
"""Record the trace.
This method is called when a trace is completed. It ensures the trace
is properly recorded and associated with its task if applicable.
Args:
result: Optional result to include in the trace
"""
if result:
self.context["response"] = result
# Add to task traces if this trace belongs to a task
if self.task_id:
self._add_to_task_traces()
def should_trace() -> bool:
"""Check if tracing is enabled via environment variable."""
return os.getenv("CREWAI_ENABLE_TRACING", "false").lower() == "true"
# Crew main trace
def init_crew_main_trace(func: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator to initialize and track the main crew execution trace.
This decorator sets up the trace context for the main crew execution,
handling both synchronous and asynchronous crew operations.
Args:
func: The crew function to be traced.
Returns:
Wrapped function that creates and manages the main crew trace context.
"""
@wraps(func)
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
if not should_trace():
return func(self, *args, **kwargs)
trace = build_crew_main_trace(self)
with TraceContext.set_current(trace):
try:
return func(self, *args, **kwargs)
except Exception as e:
trace.end_trace(error=str(e))
raise
return wrapper
def build_crew_main_trace(self: Any) -> "UnifiedTraceController":
"""Build the main trace controller for a crew execution.
This function creates a trace controller configured for the main crew execution,
handling different run types (kickoff, test, train) and maintaining context.
Args:
self: The crew instance.
Returns:
UnifiedTraceController: The configured trace controller for the crew.
"""
run_type = RunType.KICKOFF
if hasattr(self, "_test") and self._test:
run_type = RunType.TEST
elif hasattr(self, "_train") and self._train:
run_type = RunType.TRAIN
current_trace = TraceContext.get_current()
trace = UnifiedTraceController(
trace_type=TraceType.LLM_CALL,
run_type=run_type,
crew_type=current_trace.crew_type if current_trace else CrewType.CREW,
run_id=current_trace.run_id if current_trace else str(self.id),
parent_trace_id=current_trace.trace_id if current_trace else None,
)
return trace
# Flow main trace
def init_flow_main_trace(
func: Callable[..., Awaitable[Any]],
) -> Callable[..., Awaitable[Any]]:
"""Decorator to initialize and track the main flow execution trace.
Args:
func: The async flow function to be traced.
Returns:
Wrapped async function that creates and manages the main flow trace context.
"""
@wraps(func)
async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
if not should_trace():
return await func(self, *args, **kwargs)
trace = build_flow_main_trace(self, *args, **kwargs)
with TraceContext.set_current(trace):
try:
return await func(self, *args, **kwargs)
except Exception:
raise
return wrapper
def build_flow_main_trace(
self: Any, *args: Any, **kwargs: Any
) -> "UnifiedTraceController":
"""Build the main trace controller for a flow execution.
Args:
self: The flow instance.
*args: Variable positional arguments.
**kwargs: Variable keyword arguments.
Returns:
UnifiedTraceController: The configured trace controller for the flow.
"""
current_trace = TraceContext.get_current()
trace = UnifiedTraceController(
trace_type=TraceType.FLOW_STEP,
run_id=current_trace.run_id if current_trace else str(self.flow_id),
parent_trace_id=current_trace.trace_id if current_trace else None,
crew_type=CrewType.FLOW,
run_type=RunType.KICKOFF,
context={
"crew_name": self.__class__.__name__,
"inputs": kwargs.get("inputs", {}),
"agents": [],
"tasks": [],
},
)
return trace
# Flow step trace
def trace_flow_step(
func: Callable[..., Awaitable[Any]],
) -> Callable[..., Awaitable[Any]]:
"""Decorator to trace individual flow step executions.
Args:
func: The async flow step function to be traced.
Returns:
Wrapped async function that creates and manages the flow step trace context.
"""
@wraps(func)
async def wrapper(
self: Any,
method_name: str,
method: Callable[..., Any],
*args: Any,
**kwargs: Any,
) -> Any:
if not should_trace():
return await func(self, method_name, method, *args, **kwargs)
trace = build_flow_step_trace(self, method_name, method, *args, **kwargs)
with TraceContext.set_current(trace):
trace.start_trace()
try:
result = await func(self, method_name, method, *args, **kwargs)
trace.end_trace(result=result)
return result
except Exception as e:
trace.end_trace(error=str(e))
raise
return wrapper
def build_flow_step_trace(
self: Any, method_name: str, method: Callable[..., Any], *args: Any, **kwargs: Any
) -> "UnifiedTraceController":
"""Build a trace controller for an individual flow step.
Args:
self: The flow instance.
method_name: Name of the method being executed.
method: The actual method being executed.
*args: Variable positional arguments.
**kwargs: Variable keyword arguments.
Returns:
UnifiedTraceController: The configured trace controller for the flow step.
"""
current_trace = TraceContext.get_current()
# Get method signature
sig = inspect.signature(method)
params = list(sig.parameters.values())
# Create inputs dictionary mapping parameter names to values
method_params = [p for p in params if p.name != "self"]
inputs: Dict[str, Any] = {}
# Map positional args to their parameter names
for i, param in enumerate(method_params):
if i < len(args):
inputs[param.name] = args[i]
# Add keyword arguments
inputs.update(kwargs)
trace = UnifiedTraceController(
trace_type=TraceType.FLOW_STEP,
run_type=current_trace.run_type if current_trace else RunType.KICKOFF,
crew_type=current_trace.crew_type if current_trace else CrewType.FLOW,
run_id=current_trace.run_id if current_trace else str(self.flow_id),
parent_trace_id=current_trace.trace_id if current_trace else None,
flow_step={
"function_name": method_name,
"inputs": inputs,
"metadata": {
"crew_name": self.__class__.__name__,
},
},
)
return trace
# LLM trace
def trace_llm_call(func: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator to trace LLM calls.
Args:
func: The function to trace.
Returns:
Wrapped function that creates and manages the LLM call trace context.
"""
@wraps(func)
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
if not should_trace():
return func(self, *args, **kwargs)
trace = build_llm_trace(self, *args, **kwargs)
with TraceContext.set_current(trace):
trace.start_trace()
try:
response = func(self, *args, **kwargs)
# Extract relevant data from response
trace_response = {
"content": response["choices"][0]["message"]["content"],
"finish_reason": response["choices"][0].get("finish_reason"),
}
# Add usage metrics to context
if "usage" in response:
trace.context["tokens_used"] = response["usage"].get(
"total_tokens", 0
)
trace.context["prompt_tokens"] = response["usage"].get(
"prompt_tokens", 0
)
trace.context["completion_tokens"] = response["usage"].get(
"completion_tokens", 0
)
trace.end_trace(trace_response)
return response
except Exception as e:
trace.end_trace(error=str(e))
raise
return wrapper
def build_llm_trace(
self: Any, params: Dict[str, Any], *args: Any, **kwargs: Any
) -> Any:
"""Build a trace controller for an LLM call.
Args:
self: The LLM instance.
params: The parameters for the LLM call.
*args: Variable positional arguments.
**kwargs: Variable keyword arguments.
Returns:
UnifiedTraceController: The configured trace controller for the LLM call.
"""
current_trace = TraceContext.get_current()
agent, task = self._get_execution_context()
# Get new messages and tool results
new_messages = self._get_new_messages(params.get("messages", []))
new_tool_results = self._get_new_tool_results(agent)
# Create trace context
trace = UnifiedTraceController(
trace_type=TraceType.TOOL_CALL if new_tool_results else TraceType.LLM_CALL,
crew_type=current_trace.crew_type if current_trace else CrewType.CREW,
run_type=current_trace.run_type if current_trace else RunType.KICKOFF,
run_id=current_trace.run_id if current_trace else str(uuid4()),
parent_trace_id=current_trace.trace_id if current_trace else None,
agent_role=agent.role if agent else "unknown",
task_id=str(task.id) if task else None,
task_name=task.name if task else None,
task_description=task.description if task else None,
model=self.model,
messages=new_messages,
temperature=self.temperature,
max_tokens=self.max_tokens,
stop_sequences=self.stop,
tool_calls=[
ToolCall(
name=result["tool_name"],
arguments=result["tool_args"],
output=str(result["result"]),
start_time=result.get("start_time", ""),
end_time=datetime.now(UTC),
)
for result in new_tool_results
],
)
return trace

View File

@@ -4,3 +4,4 @@ DEFAULT_SCORE_THRESHOLD = 0.35
KNOWLEDGE_DIRECTORY = "knowledge"
MAX_LLM_RETRY = 3
MAX_FILE_NAME_LENGTH = 255
EMITTER_COLOR = "bold_blue"

View File

@@ -3,19 +3,9 @@ from typing import List
from pydantic import BaseModel, Field
from crewai.utilities import Converter
from crewai.utilities.events import TaskEvaluationEvent, crewai_event_bus
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
agentops = None
try:
from agentops import track_agent # type: ignore
except ImportError:
def track_agent(name):
def noop(f):
return f
return noop
class Entity(BaseModel):
name: str = Field(description="The name of the entity.")
@@ -48,12 +38,15 @@ class TrainingTaskEvaluation(BaseModel):
)
@track_agent(name="Task Evaluator")
class TaskEvaluator:
def __init__(self, original_agent):
self.llm = original_agent.llm
self.original_agent = original_agent
def evaluate(self, task, output) -> TaskEvaluation:
crewai_event_bus.emit(
self, TaskEvaluationEvent(evaluation_type="task_evaluation")
)
evaluation_query = (
f"Assess the quality of the task completed based on the description, expected output, and actual results.\n\n"
f"Task Description:\n{task.description}\n\n"
@@ -90,6 +83,9 @@ class TaskEvaluator:
- training_data (dict): The training data to be evaluated.
- agent_id (str): The ID of the agent.
"""
crewai_event_bus.emit(
self, TaskEvaluationEvent(evaluation_type="training_data_evaluation")
)
output_training_data = training_data[agent_id]
final_aggregated_data = ""

View File

@@ -1,44 +0,0 @@
from functools import wraps
from typing import Any, Callable, Dict, Generic, List, Type, TypeVar
from pydantic import BaseModel
T = TypeVar("T")
EVT = TypeVar("EVT", bound=BaseModel)
class Emitter(Generic[T, EVT]):
_listeners: Dict[Type[EVT], List[Callable]] = {}
def on(self, event_type: Type[EVT]):
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
self._listeners.setdefault(event_type, []).append(wrapper)
return wrapper
return decorator
def emit(self, source: T, event: EVT) -> None:
event_type = type(event)
for func in self._listeners.get(event_type, []):
func(source, event)
default_emitter = Emitter[Any, BaseModel]()
def emit(source: Any, event: BaseModel, raise_on_error: bool = False) -> None:
try:
default_emitter.emit(source, event)
except Exception as e:
if raise_on_error:
raise e
else:
print(f"Error emitting event: {e}")
def on(event_type: Type[BaseModel]) -> Callable:
return default_emitter.on(event_type)

View File

@@ -0,0 +1,40 @@
from .crew_events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewTrainStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
CrewTestStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
)
from .agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
)
from .task_events import TaskStartedEvent, TaskCompletedEvent, TaskFailedEvent, TaskEvaluationEvent
from .flow_events import (
FlowCreatedEvent,
FlowStartedEvent,
FlowFinishedEvent,
FlowPlotEvent,
MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
)
from .crewai_event_bus import CrewAIEventsBus, crewai_event_bus
from .tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
ToolExecutionErrorEvent,
ToolSelectionErrorEvent,
ToolUsageEvent,
ToolValidateInputErrorEvent,
)
# events
from .event_listener import EventListener
from .third_party.agentops_listener import agentops_listener

View File

@@ -0,0 +1,40 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from .base_events import CrewEvent
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
class AgentExecutionStartedEvent(CrewEvent):
"""Event emitted when an agent starts executing a task"""
agent: BaseAgent
task: Any
tools: Optional[Sequence[Union[BaseTool, CrewStructuredTool]]]
task_prompt: str
type: str = "agent_execution_started"
model_config = {"arbitrary_types_allowed": True}
class AgentExecutionCompletedEvent(CrewEvent):
"""Event emitted when an agent completes executing a task"""
agent: BaseAgent
task: Any
output: str
type: str = "agent_execution_completed"
class AgentExecutionErrorEvent(CrewEvent):
"""Event emitted when an agent encounters an error during execution"""
agent: BaseAgent
task: Any
error: str
type: str = "agent_execution_error"

View File

@@ -0,0 +1,14 @@
from abc import ABC, abstractmethod
from logging import Logger
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus, crewai_event_bus
class BaseEventListener(ABC):
def __init__(self):
super().__init__()
self.setup_listeners(crewai_event_bus)
@abstractmethod
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus):
pass

View File

@@ -0,0 +1,10 @@
from datetime import datetime
from pydantic import BaseModel, Field
class CrewEvent(BaseModel):
"""Base class for all crew events"""
timestamp: datetime = Field(default_factory=datetime.now)
type: str

View File

@@ -0,0 +1,81 @@
from typing import Any, Dict, Optional, Union
from pydantic import InstanceOf
from crewai.utilities.events.base_events import CrewEvent
class CrewKickoffStartedEvent(CrewEvent):
"""Event emitted when a crew starts execution"""
crew_name: Optional[str]
inputs: Optional[Dict[str, Any]]
type: str = "crew_kickoff_started"
class CrewKickoffCompletedEvent(CrewEvent):
"""Event emitted when a crew completes execution"""
crew_name: Optional[str]
output: Any
type: str = "crew_kickoff_completed"
class CrewKickoffFailedEvent(CrewEvent):
"""Event emitted when a crew fails to complete execution"""
error: str
crew_name: Optional[str]
type: str = "crew_kickoff_failed"
class CrewTrainStartedEvent(CrewEvent):
"""Event emitted when a crew starts training"""
crew_name: Optional[str]
n_iterations: int
filename: str
inputs: Optional[Dict[str, Any]]
type: str = "crew_train_started"
class CrewTrainCompletedEvent(CrewEvent):
"""Event emitted when a crew completes training"""
crew_name: Optional[str]
n_iterations: int
filename: str
type: str = "crew_train_completed"
class CrewTrainFailedEvent(CrewEvent):
"""Event emitted when a crew fails to complete training"""
error: str
crew_name: Optional[str]
type: str = "crew_train_failed"
class CrewTestStartedEvent(CrewEvent):
"""Event emitted when a crew starts testing"""
crew_name: Optional[str]
n_iterations: int
eval_llm: Optional[Union[str, Any]]
inputs: Optional[Dict[str, Any]]
type: str = "crew_test_started"
class CrewTestCompletedEvent(CrewEvent):
"""Event emitted when a crew completes testing"""
crew_name: Optional[str]
type: str = "crew_test_completed"
class CrewTestFailedEvent(CrewEvent):
"""Event emitted when a crew fails to complete testing"""
error: str
crew_name: Optional[str]
type: str = "crew_test_failed"

View File

@@ -0,0 +1,113 @@
import threading
from contextlib import contextmanager
from typing import Any, Callable, Dict, List, Type, TypeVar, cast
from blinker import Signal
from crewai.utilities.events.base_events import CrewEvent
from crewai.utilities.events.event_types import EventTypes
EventT = TypeVar("EventT", bound=CrewEvent)
class CrewAIEventsBus:
"""
A singleton event bus that uses blinker signals for event handling.
Allows both internal (Flow/Crew) and external event handling.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None: # prevent race condition
cls._instance = super(CrewAIEventsBus, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self) -> None:
"""Initialize the event bus internal state"""
self._signal = Signal("crewai_event_bus")
self._handlers: Dict[Type[CrewEvent], List[Callable]] = {}
def on(
self, event_type: Type[EventT]
) -> Callable[[Callable[[Any, EventT], None]], Callable[[Any, EventT], None]]:
"""
Decorator to register an event handler for a specific event type.
Usage:
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(
source: Any, event: AgentExecutionCompletedEvent
):
print(f"👍 Agent '{event.agent}' completed task")
print(f" Output: {event.output}")
"""
def decorator(
handler: Callable[[Any, EventT], None],
) -> Callable[[Any, EventT], None]:
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(
cast(Callable[[Any, EventT], None], handler)
)
return handler
return decorator
def emit(self, source: Any, event: CrewEvent) -> None:
"""
Emit an event to all registered handlers
Args:
source: The object emitting the event
event: The event instance to emit
"""
event_type = type(event)
if event_type in self._handlers:
for handler in self._handlers[event_type]:
handler(source, event)
self._signal.send(source, event=event)
def clear_handlers(self) -> None:
"""Clear all registered event handlers - useful for testing"""
self._handlers.clear()
def register_handler(
self, event_type: Type[EventTypes], handler: Callable[[Any, EventTypes], None]
) -> None:
"""Register an event handler for a specific event type"""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(
cast(Callable[[Any, EventTypes], None], handler)
)
@contextmanager
def scoped_handlers(self):
"""
Context manager for temporary event handling scope.
Useful for testing or temporary event handling.
Usage:
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStarted)
def temp_handler(source, event):
print("Temporary handler")
# Do stuff...
# Handlers are cleared after the context
"""
previous_handlers = self._handlers.copy()
self._handlers.clear()
try:
yield
finally:
self._handlers = previous_handlers
# Global instance
crewai_event_bus = CrewAIEventsBus()

View File

@@ -0,0 +1,257 @@
from pydantic import PrivateAttr
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import Logger
from crewai.utilities.constants import EMITTER_COLOR
from crewai.utilities.events.base_event_listener import BaseEventListener
from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent
from .crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
CrewTestStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from .flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from .task_events import TaskCompletedEvent, TaskFailedEvent, TaskStartedEvent
from .tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
class EventListener(BaseEventListener):
_instance = None
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
logger = Logger(verbose=True, default_color=EMITTER_COLOR)
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if not hasattr(self, "_initialized") or not self._initialized:
super().__init__()
self._telemetry = Telemetry()
self._telemetry.set_tracer()
self._initialized = True
# ----------- CREW EVENTS -----------
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent):
self.logger.log(
f"🚀 Crew '{event.crew_name}' started",
event.timestamp,
)
self._telemetry.crew_execution_span(source, event.inputs)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event: CrewKickoffCompletedEvent):
final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output)
self.logger.log(
f"✅ Crew '{event.crew_name}' completed",
event.timestamp,
)
@crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent):
self.logger.log(
f"❌ Crew '{event.crew_name}' failed",
event.timestamp,
)
@crewai_event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent):
cloned_crew = source.copy()
cloned_crew._telemetry.test_execution_span(
cloned_crew,
event.n_iterations,
event.inputs,
event.eval_llm,
)
self.logger.log(
f"🚀 Crew '{event.crew_name}' started test",
event.timestamp,
)
@crewai_event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(source, event: CrewTestCompletedEvent):
self.logger.log(
f"✅ Crew '{event.crew_name}' completed test",
event.timestamp,
)
@crewai_event_bus.on(CrewTestFailedEvent)
def on_crew_test_failed(source, event: CrewTestFailedEvent):
self.logger.log(
f"❌ Crew '{event.crew_name}' failed test",
event.timestamp,
)
@crewai_event_bus.on(CrewTrainStartedEvent)
def on_crew_train_started(source, event: CrewTrainStartedEvent):
self.logger.log(
f"📋 Crew '{event.crew_name}' started train",
event.timestamp,
)
@crewai_event_bus.on(CrewTrainCompletedEvent)
def on_crew_train_completed(source, event: CrewTrainCompletedEvent):
self.logger.log(
f"✅ Crew '{event.crew_name}' completed train",
event.timestamp,
)
@crewai_event_bus.on(CrewTrainFailedEvent)
def on_crew_train_failed(source, event: CrewTrainFailedEvent):
self.logger.log(
f"❌ Crew '{event.crew_name}' failed train",
event.timestamp,
)
# ----------- TASK EVENTS -----------
@crewai_event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStartedEvent):
source._execution_span = self._telemetry.task_started(
crew=source.agent.crew, task=source
)
self.logger.log(
f"📋 Task started: {source.description}",
event.timestamp,
)
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent):
if source._execution_span:
self._telemetry.task_ended(
source._execution_span, source, source.agent.crew
)
self.logger.log(
f"✅ Task completed: {source.description}",
event.timestamp,
)
source._execution_span = None
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent):
if source._execution_span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(
source._execution_span, source, source.agent.crew
)
source._execution_span = None
self.logger.log(
f"❌ Task failed: {source.description}",
event.timestamp,
)
# ----------- AGENT EVENTS -----------
@crewai_event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event: AgentExecutionStartedEvent):
self.logger.log(
f"🤖 Agent '{event.agent.role}' started task",
event.timestamp,
)
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent):
self.logger.log(
f"✅ Agent '{event.agent.role}' completed task",
event.timestamp,
)
# ----------- FLOW EVENTS -----------
@crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent):
self._telemetry.flow_creation_span(self.__class__.__name__)
self.logger.log(
f"🌊 Flow Created: '{event.flow_name}'",
event.timestamp,
)
@crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent):
self._telemetry.flow_execution_span(
source.__class__.__name__, list(source._methods.keys())
)
self.logger.log(
f"🤖 Flow Started: '{event.flow_name}'",
event.timestamp,
)
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent):
self.logger.log(
f"👍 Flow Finished: '{event.flow_name}'",
event.timestamp,
)
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(source, event: MethodExecutionStartedEvent):
self.logger.log(
f"🤖 Flow Method Started: '{event.method_name}'",
event.timestamp,
)
@crewai_event_bus.on(MethodExecutionFailedEvent)
def on_method_execution_failed(source, event: MethodExecutionFailedEvent):
self.logger.log(
f"❌ Flow Method Failed: '{event.method_name}'",
event.timestamp,
)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinishedEvent):
self.logger.log(
f"👍 Flow Method Finished: '{event.method_name}'",
event.timestamp,
)
# ----------- TOOL USAGE EVENTS -----------
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.logger.log(
f"🤖 Tool Usage Started: '{event.tool_name}'",
event.timestamp,
)
@crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
self.logger.log(
f"✅ Tool Usage Finished: '{event.tool_name}'",
event.timestamp,
#
)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
self.logger.log(
f"❌ Tool Usage Error: '{event.tool_name}'",
event.timestamp,
#
)
event_listener = EventListener()

View File

@@ -0,0 +1,61 @@
from typing import Union
from .agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from .crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
CrewTestStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from .flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from .task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from .tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
EventTypes = Union[
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewTestStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
CrewTrainStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
TaskStartedEvent,
TaskCompletedEvent,
TaskFailedEvent,
FlowStartedEvent,
FlowFinishedEvent,
MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
AgentExecutionErrorEvent,
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
]

View File

@@ -0,0 +1,71 @@
from typing import Any, Dict, Optional, Union
from pydantic import BaseModel
from .base_events import CrewEvent
class FlowEvent(CrewEvent):
"""Base class for all flow events"""
type: str
flow_name: str
class FlowStartedEvent(FlowEvent):
"""Event emitted when a flow starts execution"""
flow_name: str
inputs: Optional[Dict[str, Any]] = None
type: str = "flow_started"
class FlowCreatedEvent(FlowEvent):
"""Event emitted when a flow is created"""
flow_name: str
type: str = "flow_created"
class MethodExecutionStartedEvent(FlowEvent):
"""Event emitted when a flow method starts execution"""
flow_name: str
method_name: str
state: Union[Dict[str, Any], BaseModel]
params: Optional[Dict[str, Any]] = None
type: str = "method_execution_started"
class MethodExecutionFinishedEvent(FlowEvent):
"""Event emitted when a flow method completes execution"""
flow_name: str
method_name: str
result: Any = None
state: Union[Dict[str, Any], BaseModel]
type: str = "method_execution_finished"
class MethodExecutionFailedEvent(FlowEvent):
"""Event emitted when a flow method fails execution"""
flow_name: str
method_name: str
error: Any
type: str = "method_execution_failed"
class FlowFinishedEvent(FlowEvent):
"""Event emitted when a flow completes execution"""
flow_name: str
result: Optional[Any] = None
type: str = "flow_finished"
class FlowPlotEvent(FlowEvent):
"""Event emitted when a flow plot is created"""
flow_name: str
type: str = "flow_plot"

View File

@@ -0,0 +1,32 @@
from typing import Any, Optional
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.events.base_events import CrewEvent
class TaskStartedEvent(CrewEvent):
"""Event emitted when a task starts"""
type: str = "task_started"
context: Optional[str]
class TaskCompletedEvent(CrewEvent):
"""Event emitted when a task completes"""
output: TaskOutput
type: str = "task_completed"
class TaskFailedEvent(CrewEvent):
"""Event emitted when a task fails"""
error: str
type: str = "task_failed"
class TaskEvaluationEvent(CrewEvent):
"""Event emitted when a task evaluation is completed"""
type: str = "task_evaluation"
evaluation_type: str

View File

@@ -0,0 +1 @@
from .agentops_listener import agentops_listener

View File

@@ -0,0 +1,67 @@
from typing import Optional
from crewai.utilities.events import (
CrewKickoffCompletedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try:
import agentops
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener):
tool_event: Optional["agentops.ToolEvent"] = None
session: Optional["agentops.Session"] = None
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
if not AGENTOPS_INSTALLED:
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init()
for agent in source.agents:
if self.session:
self.session.create_agent(
name=agent.role,
agent_id=str(agent.id),
)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session:
self.session.end_session(
end_state="Success",
end_state_reason="Finished Execution",
)
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session:
self.session.record(self.tool_event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
@crewai_event_bus.on(TaskEvaluationEvent)
def on_task_evaluation(source, event: TaskEvaluationEvent):
if self.session:
self.session.create_agent(
name="Task Evaluator", agent_id=str(source.original_agent.id)
)
agentops_listener = AgentOpsListener()

View File

@@ -0,0 +1,64 @@
from datetime import datetime
from typing import Any, Callable, Dict
from .base_events import CrewEvent
class ToolUsageEvent(CrewEvent):
"""Base event for tool usage tracking"""
agent_key: str
agent_role: str
tool_name: str
tool_args: Dict[str, Any] | str
tool_class: str
run_attempts: int | None = None
delegations: int | None = None
model_config = {"arbitrary_types_allowed": True}
class ToolUsageStartedEvent(ToolUsageEvent):
"""Event emitted when a tool execution is started"""
type: str = "tool_usage_started"
class ToolUsageFinishedEvent(ToolUsageEvent):
"""Event emitted when a tool execution is completed"""
started_at: datetime
finished_at: datetime
from_cache: bool = False
type: str = "tool_usage_finished"
class ToolUsageErrorEvent(ToolUsageEvent):
"""Event emitted when a tool execution encounters an error"""
error: Any
type: str = "tool_usage_error"
class ToolValidateInputErrorEvent(ToolUsageEvent):
"""Event emitted when a tool input validation encounters an error"""
error: Any
type: str = "tool_validate_input_error"
class ToolSelectionErrorEvent(ToolUsageEvent):
"""Event emitted when a tool selection encounters an error"""
error: Any
type: str = "tool_selection_error"
class ToolExecutionErrorEvent(CrewEvent):
"""Event emitted when a tool execution encounters an error"""
error: Any
type: str = "tool_execution_error"
tool_name: str
tool_args: Dict[str, Any]
tool_class: Callable

View File

@@ -8,8 +8,11 @@ from crewai.utilities.printer import Printer
class Logger(BaseModel):
verbose: bool = Field(default=False)
_printer: Printer = PrivateAttr(default_factory=Printer)
default_color: str = Field(default="bold_yellow")
def log(self, level, message, color="bold_yellow"):
def log(self, level, message, color=None):
if color is None:
color = self.default_color
if self.verbose:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self._printer.print(

View File

@@ -0,0 +1,12 @@
from typing import Any, Protocol, runtime_checkable
@runtime_checkable
class AgentExecutorProtocol(Protocol):
"""Protocol defining the expected interface for an agent executor."""
@property
def agent(self) -> Any: ...
@property
def task(self) -> Any: ...