Merge branch 'main' into devin/1740154466-add-o3-mini-context-window

This commit is contained in:
Brandon Hancock (bhancock_ai)
2025-02-25 15:30:01 -05:00
committed by GitHub
28 changed files with 2345 additions and 1624 deletions

View File

@@ -257,11 +257,11 @@ def get_crew(crew_path: str = "crew.py", require: bool = False) -> Crew | None:
import os
for root, _, files in os.walk("."):
if "crew.py" in files:
crew_path = os.path.join(root, "crew.py")
if crew_path in files:
crew_os_path = os.path.join(root, crew_path)
try:
spec = importlib.util.spec_from_file_location(
"crew_module", crew_path
"crew_module", crew_os_path
)
if not spec or not spec.loader:
continue
@@ -273,9 +273,11 @@ def get_crew(crew_path: str = "crew.py", require: bool = False) -> Crew | None:
for attr_name in dir(module):
attr = getattr(module, attr_name)
try:
if callable(attr) and hasattr(attr, "crew"):
crew_instance = attr().crew()
return crew_instance
if isinstance(attr, Crew) and hasattr(attr, "kickoff"):
print(
f"Found valid crew object in attribute '{attr_name}' at {crew_os_path}."
)
return attr
except Exception as e:
print(f"Error processing attribute {attr_name}: {e}")

View File

@@ -35,10 +35,8 @@ from crewai.process import Process
from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import Tool
from crewai.traces.unified_trace_controller import init_crew_main_trace
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -258,8 +256,6 @@ class Crew(BaseModel):
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = create_llm(self.function_calling_llm)
self._telemetry = Telemetry()
self._telemetry.set_tracer()
return self
@model_validator(mode="after")
@@ -574,7 +570,6 @@ class Crew(BaseModel):
CrewTrainingHandler(filename).clear()
raise
@init_crew_main_trace
def kickoff(
self,
inputs: Optional[Dict[str, Any]] = None,
@@ -1115,7 +1110,6 @@ class Crew(BaseModel):
"_short_term_memory",
"_long_term_memory",
"_entity_memory",
"_telemetry",
"agents",
"tasks",
"knowledge_sources",
@@ -1278,11 +1272,11 @@ class Crew(BaseModel):
def _reset_all_memories(self) -> None:
"""Reset all available memory systems."""
memory_systems = [
("short term", self._short_term_memory),
("entity", self._entity_memory),
("long term", self._long_term_memory),
("task output", self._task_output_handler),
("knowledge", self.knowledge),
("short term", getattr(self, "_short_term_memory", None)),
("entity", getattr(self, "_entity_memory", None)),
("long term", getattr(self, "_long_term_memory", None)),
("task output", getattr(self, "_task_output_handler", None)),
("knowledge", getattr(self, "knowledge", None)),
]
for name, system in memory_systems:

View File

@@ -22,10 +22,6 @@ from pydantic import BaseModel, Field, ValidationError
from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.utils import get_possible_return_constants
from crewai.traces.unified_trace_controller import (
init_flow_main_trace,
trace_flow_step,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.flow_events import (
FlowCreatedEvent,
@@ -713,16 +709,34 @@ class Flow(Generic[T], metaclass=FlowMeta):
raise TypeError(f"State must be dict or BaseModel, got {type(self._state)}")
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""Start the flow execution.
"""
Start the flow execution in a synchronous context.
This method wraps kickoff_async so that all state initialization and event
emission is handled in the asynchronous method.
"""
async def run_flow():
return await self.kickoff_async(inputs)
return asyncio.run(run_flow())
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""
Start the flow execution asynchronously.
This method performs state restoration (if an 'id' is provided and persistence is available)
and updates the flow state with any additional inputs. It then emits the FlowStartedEvent,
logs the flow startup, and executes all start methods. Once completed, it emits the
FlowFinishedEvent and returns the final output.
Args:
inputs: Optional dictionary containing input values and potentially a state ID to restore
"""
# Handle state restoration if ID is provided in inputs
if inputs and "id" in inputs and self._persistence is not None:
restore_uuid = inputs["id"]
stored_state = self._persistence.load_state(restore_uuid)
inputs: Optional dictionary containing input values and/or a state ID for restoration.
Returns:
The final output from the flow, which is the result of the last executed method.
"""
if inputs:
# Override the id in the state if it exists in inputs
if "id" in inputs:
if isinstance(self._state, dict):
@@ -730,24 +744,27 @@ class Flow(Generic[T], metaclass=FlowMeta):
elif isinstance(self._state, BaseModel):
setattr(self._state, "id", inputs["id"])
if stored_state:
self._log_flow_event(
f"Loading flow state from memory for UUID: {restore_uuid}",
color="yellow",
)
# Restore the state
self._restore_state(stored_state)
else:
self._log_flow_event(
f"No flow state found for UUID: {restore_uuid}", color="red"
)
# If persistence is enabled, attempt to restore the stored state using the provided id.
if "id" in inputs and self._persistence is not None:
restore_uuid = inputs["id"]
stored_state = self._persistence.load_state(restore_uuid)
if stored_state:
self._log_flow_event(
f"Loading flow state from memory for UUID: {restore_uuid}",
color="yellow",
)
self._restore_state(stored_state)
else:
self._log_flow_event(
f"No flow state found for UUID: {restore_uuid}", color="red"
)
# Apply any additional inputs after restoration
# Update state with any additional inputs (ignoring the 'id' key)
filtered_inputs = {k: v for k, v in inputs.items() if k != "id"}
if filtered_inputs:
self._initialize_state(filtered_inputs)
# Start flow execution
# Emit FlowStartedEvent and log the start of the flow.
crewai_event_bus.emit(
self,
FlowStartedEvent(
@@ -763,16 +780,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
async def run_flow():
return await self.kickoff_async()
return asyncio.run(run_flow())
@init_flow_main_trace
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
if not self._start_methods:
raise ValueError("No start method defined")
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
@@ -789,6 +796,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
result=final_output,
),
)
return final_output
async def _execute_start_method(self, start_method_name: str) -> None:
@@ -814,7 +822,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
await self._execute_listeners(start_method_name, result)
@trace_flow_step
async def _execute_method(
self, method_name: str, method: Callable, *args: Any, **kwargs: Any
) -> Any:

View File

@@ -16,7 +16,8 @@ Example
import ast
import inspect
import textwrap
from typing import Any, Dict, List, Optional, Set, Union
from collections import defaultdict, deque
from typing import Any, Deque, Dict, List, Optional, Set, Union
def get_possible_return_constants(function: Any) -> Optional[List[str]]:
@@ -118,7 +119,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
- Processes router paths separately
"""
levels: Dict[str, int] = {}
queue: List[str] = []
queue: Deque[str] = deque()
visited: Set[str] = set()
pending_and_listeners: Dict[str, Set[str]] = {}
@@ -128,28 +129,35 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
levels[method_name] = 0
queue.append(method_name)
# Precompute listener dependencies
or_listeners = defaultdict(list)
and_listeners = defaultdict(set)
for listener_name, (condition_type, trigger_methods) in flow._listeners.items():
if condition_type == "OR":
for method in trigger_methods:
or_listeners[method].append(listener_name)
elif condition_type == "AND":
and_listeners[listener_name] = set(trigger_methods)
# Breadth-first traversal to assign levels
while queue:
current = queue.pop(0)
current = queue.popleft()
current_level = levels[current]
visited.add(current)
for listener_name, (condition_type, trigger_methods) in flow._listeners.items():
if condition_type == "OR":
if current in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
elif condition_type == "AND":
for listener_name in or_listeners[current]:
if listener_name not in levels or levels[listener_name] > current_level + 1:
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
for listener_name, required_methods in and_listeners.items():
if current in required_methods:
if listener_name not in pending_and_listeners:
pending_and_listeners[listener_name] = set()
if current in trigger_methods:
pending_and_listeners[listener_name].add(current)
if set(trigger_methods) == pending_and_listeners[listener_name]:
pending_and_listeners[listener_name].add(current)
if required_methods == pending_and_listeners[listener_name]:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
@@ -159,22 +167,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
queue.append(listener_name)
# Handle router connections
if current in flow._routers:
router_method_name = current
paths = flow._router_paths.get(router_method_name, [])
for path in paths:
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if path in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
process_router_paths(flow, current, current_level, levels, queue)
return levels
@@ -227,10 +220,7 @@ def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
def dfs_ancestors(
node: str,
ancestors: Dict[str, Set[str]],
visited: Set[str],
flow: Any
node: str, ancestors: Dict[str, Set[str]], visited: Set[str], flow: Any
) -> None:
"""
Perform depth-first search to build ancestor relationships.
@@ -274,7 +264,9 @@ def dfs_ancestors(
dfs_ancestors(listener_name, ancestors, visited, flow)
def is_ancestor(node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]) -> bool:
def is_ancestor(
node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]
) -> bool:
"""
Check if one node is an ancestor of another.
@@ -339,7 +331,9 @@ def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
return parent_children
def get_child_index(parent: str, child: str, parent_children: Dict[str, List[str]]) -> int:
def get_child_index(
parent: str, child: str, parent_children: Dict[str, List[str]]
) -> int:
"""
Get the index of a child node in its parent's sorted children list.
@@ -360,3 +354,23 @@ def get_child_index(parent: str, child: str, parent_children: Dict[str, List[str
children = parent_children.get(parent, [])
children.sort()
return children.index(child)
def process_router_paths(flow, current, current_level, levels, queue):
"""
Handle the router connections for the current node.
"""
if current in flow._routers:
paths = flow._router_paths.get(current, [])
for path in paths:
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if path in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
queue.append(listener_name)

View File

@@ -1,4 +1,3 @@
import inspect
import json
import logging
import os
@@ -6,21 +5,17 @@ import sys
import threading
import warnings
from contextlib import contextmanager
from typing import (
Any,
Dict,
List,
Literal,
Optional,
Tuple,
Type,
Union,
cast,
)
from typing import Any, Dict, List, Literal, Optional, Type, Union, cast
from dotenv import load_dotenv
from pydantic import BaseModel
from crewai.utilities.events.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMCallType,
)
from crewai.utilities.events.tool_usage_events import ToolExecutionErrorEvent
with warnings.catch_warnings():
@@ -31,12 +26,10 @@ with warnings.catch_warnings():
from litellm.utils import get_supported_openai_params, supports_response_schema
from crewai.traces.unified_trace_controller import trace_llm_call
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from crewai.utilities.protocols import AgentExecutorProtocol
load_dotenv()
@@ -181,7 +174,6 @@ class LLM:
self.context_window_size = 0
self.reasoning_effort = reasoning_effort
self.additional_params = kwargs
self._message_history: List[Dict[str, str]] = []
self.is_anthropic = self._is_anthropic_model(model)
litellm.drop_params = True
@@ -197,12 +189,6 @@ class LLM:
self.set_callbacks(callbacks)
self.set_env_callbacks()
@trace_llm_call
def _call_llm(self, params: Dict[str, Any]) -> Any:
with suppress_warnings():
response = litellm.completion(**params)
return response
def _is_anthropic_model(self, model: str) -> bool:
"""Determine if the model is from Anthropic provider.
@@ -260,6 +246,15 @@ class LLM:
>>> print(response)
"The capital of France is Paris."
"""
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
),
)
# Validate parameters before proceeding with the call.
self._validate_call_params()
@@ -312,7 +307,7 @@ class LLM:
params = {k: v for k, v in params.items() if v is not None}
# --- 2) Make the completion call
response = self._call_llm(params)
response = litellm.completion(**params)
response_message = cast(Choices, cast(ModelResponse, response).choices)[
0
].message
@@ -334,12 +329,13 @@ class LLM:
# --- 4) If no tool calls, return the text response
if not tool_calls or not available_functions:
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL)
return text_response
# --- 5) Handle the tool call
tool_call = tool_calls[0]
function_name = tool_call.function.name
print("function_name", function_name)
if function_name in available_functions:
try:
function_args = json.loads(tool_call.function.arguments)
@@ -351,6 +347,7 @@ class LLM:
try:
# Call the actual tool function
result = fn(**function_args)
self._handle_emit_call_events(result, LLMCallType.TOOL_CALL)
return result
except Exception as e:
@@ -366,6 +363,12 @@ class LLM:
error=str(e),
),
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=f"Tool execution error: {str(e)}"
),
)
return text_response
else:
@@ -375,12 +378,28 @@ class LLM:
return text_response
except Exception as e:
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=str(e)),
)
if not LLMContextLengthExceededException(
str(e)
)._is_context_limit_error(str(e)):
logging.error(f"LiteLLM call failed: {str(e)}")
raise
def _handle_emit_call_events(self, response: Any, call_type: LLMCallType):
"""Handle the events for the LLM call.
Args:
response (str): The response from the LLM call.
call_type (str): The type of call, either "tool_call" or "llm_call".
"""
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(response=response, call_type=call_type),
)
def _format_messages_for_provider(
self, messages: List[Dict[str, str]]
) -> List[Dict[str, str]]:
@@ -545,95 +564,3 @@ class LLM:
litellm.success_callback = success_callbacks
litellm.failure_callback = failure_callbacks
def _get_execution_context(self) -> Tuple[Optional[Any], Optional[Any]]:
"""Get the agent and task from the execution context.
Returns:
tuple: (agent, task) from any AgentExecutor context, or (None, None) if not found
"""
frame = inspect.currentframe()
caller_frame = frame.f_back if frame else None
agent = None
task = None
# Add a maximum depth to prevent infinite loops
max_depth = 100 # Reasonable limit for call stack depth
current_depth = 0
while caller_frame and current_depth < max_depth:
if "self" in caller_frame.f_locals:
caller_self = caller_frame.f_locals["self"]
if isinstance(caller_self, AgentExecutorProtocol):
agent = caller_self.agent
task = caller_self.task
break
caller_frame = caller_frame.f_back
current_depth += 1
return agent, task
def _get_new_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Get only the new messages that haven't been processed before."""
if not hasattr(self, "_message_history"):
self._message_history = []
new_messages = []
for message in messages:
message_key = (message["role"], message["content"])
if message_key not in [
(m["role"], m["content"]) for m in self._message_history
]:
new_messages.append(message)
self._message_history.append(message)
return new_messages
def _get_new_tool_results(self, agent) -> List[Dict]:
"""Get only the new tool results that haven't been processed before."""
if not agent or not agent.tools_results:
return []
if not hasattr(self, "_tool_results_history"):
self._tool_results_history: List[Dict] = []
new_tool_results = []
for result in agent.tools_results:
# Process tool arguments to extract actual values
processed_args = {}
if isinstance(result["tool_args"], dict):
for key, value in result["tool_args"].items():
if isinstance(value, dict) and "type" in value:
# Skip metadata and just store the actual value
continue
processed_args[key] = value
# Create a clean result with processed arguments
clean_result = {
"tool_name": result["tool_name"],
"tool_args": processed_args,
"result": result["result"],
"content": result.get("content", ""),
"start_time": result.get("start_time", ""),
}
# Check if this exact tool execution exists in history
is_duplicate = False
for history_result in self._tool_results_history:
if (
clean_result["tool_name"] == history_result["tool_name"]
and str(clean_result["tool_args"])
== str(history_result["tool_args"])
and str(clean_result["result"]) == str(history_result["result"])
and clean_result["content"] == history_result.get("content", "")
and clean_result["start_time"]
== history_result.get("start_time", "")
):
is_duplicate = True
break
if not is_duplicate:
new_tool_results.append(clean_result)
self._tool_results_history.append(clean_result)
return new_tool_results

View File

@@ -2,7 +2,6 @@ import ast
import datetime
import json
import time
from datetime import UTC
from difflib import SequenceMatcher
from json import JSONDecodeError
from textwrap import dedent
@@ -118,10 +117,7 @@ class ToolUsage:
self._printer.print(content=f"\n\n{error}\n", color="red")
return error
if (
isinstance(tool, CrewStructuredTool)
and tool.name == self._i18n.tools("add_image")["name"] # type: ignore
):
if isinstance(tool, CrewStructuredTool) and tool.name == self._i18n.tools("add_image")["name"]: # type: ignore
try:
result = self._use(tool_string=tool_string, tool=tool, calling=calling)
return result
@@ -158,7 +154,6 @@ class ToolUsage:
self.task.increment_tools_errors()
started_at = time.time()
started_at_trace = datetime.datetime.now(UTC)
from_cache = False
result = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
@@ -186,9 +181,7 @@ class ToolUsage:
if calling.arguments:
try:
acceptable_args = tool.args_schema.model_json_schema()[
"properties"
].keys() # type: ignore
acceptable_args = tool.args_schema.model_json_schema()["properties"].keys() # type: ignore
arguments = {
k: v
for k, v in calling.arguments.items()
@@ -209,7 +202,7 @@ class ToolUsage:
error=e, tool=tool.name, tool_inputs=tool.description
)
error = ToolUsageErrorException(
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
f'\n{error_message}.\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
).message
self.task.increment_tools_errors()
if self.agent.verbose:
@@ -244,7 +237,6 @@ class ToolUsage:
"result": result,
"tool_name": tool.name,
"tool_args": calling.arguments,
"start_time": started_at_trace,
}
self.on_tool_use_finished(
@@ -388,7 +380,7 @@ class ToolUsage:
raise
else:
return ToolUsageErrorException(
f"{self._i18n.errors('tool_arguments_error')}"
f'{self._i18n.errors("tool_arguments_error")}'
)
if not isinstance(arguments, dict):
@@ -396,7 +388,7 @@ class ToolUsage:
raise
else:
return ToolUsageErrorException(
f"{self._i18n.errors('tool_arguments_error')}"
f'{self._i18n.errors("tool_arguments_error")}'
)
return ToolCalling(
@@ -424,7 +416,7 @@ class ToolUsage:
if self.agent.verbose:
self._printer.print(content=f"\n\n{e}\n", color="red")
return ToolUsageErrorException( # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling")
f"{self._i18n.errors('tool_usage_error').format(error=e)}\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
f'{self._i18n.errors("tool_usage_error").format(error=e)}\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
)
return self._tool_calling(tool_string)

View File

@@ -1,39 +0,0 @@
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Generator
class TraceContext:
"""Maintains the current trace context throughout the execution stack.
This class provides a context manager for tracking trace execution across
async and sync code paths using ContextVars.
"""
_context: ContextVar = ContextVar("trace_context", default=None)
@classmethod
def get_current(cls):
"""Get the current trace context.
Returns:
Optional[UnifiedTraceController]: The current trace controller or None if not set.
"""
return cls._context.get()
@classmethod
@contextmanager
def set_current(cls, trace):
"""Set the current trace context within a context manager.
Args:
trace: The trace controller to set as current.
Yields:
UnifiedTraceController: The current trace controller.
"""
token = cls._context.set(trace)
try:
yield trace
finally:
cls._context.reset(token)

View File

@@ -1,19 +0,0 @@
from enum import Enum
class TraceType(Enum):
LLM_CALL = "llm_call"
TOOL_CALL = "tool_call"
FLOW_STEP = "flow_step"
START_CALL = "start_call"
class RunType(Enum):
KICKOFF = "kickoff"
TRAIN = "train"
TEST = "test"
class CrewType(Enum):
CREW = "crew"
FLOW = "flow"

View File

@@ -1,89 +0,0 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class ToolCall(BaseModel):
"""Model representing a tool call during execution"""
name: str
arguments: Dict[str, Any]
output: str
start_time: datetime
end_time: Optional[datetime] = None
latency_ms: Optional[int] = None
error: Optional[str] = None
class LLMRequest(BaseModel):
"""Model representing the LLM request details"""
model: str
messages: List[Dict[str, str]]
temperature: Optional[float] = None
max_tokens: Optional[int] = None
stop_sequences: Optional[List[str]] = None
additional_params: Dict[str, Any] = Field(default_factory=dict)
class LLMResponse(BaseModel):
"""Model representing the LLM response details"""
content: str
finish_reason: Optional[str] = None
class FlowStepIO(BaseModel):
"""Model representing flow step input/output details"""
function_name: str
inputs: Dict[str, Any] = Field(default_factory=dict)
outputs: Any
metadata: Dict[str, Any] = Field(default_factory=dict)
class CrewTrace(BaseModel):
"""Model for tracking detailed information about LLM interactions and Flow steps"""
deployment_instance_id: Optional[str] = Field(
description="ID of the deployment instance"
)
trace_id: str = Field(description="Unique identifier for this trace")
run_id: str = Field(description="Identifier for the execution run")
agent_role: Optional[str] = Field(description="Role of the agent")
task_id: Optional[str] = Field(description="ID of the current task being executed")
task_name: Optional[str] = Field(description="Name of the current task")
task_description: Optional[str] = Field(
description="Description of the current task"
)
trace_type: str = Field(description="Type of the trace")
crew_type: str = Field(description="Type of the crew")
run_type: str = Field(description="Type of the run")
# Timing information
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
latency_ms: Optional[int] = None
# Request/Response for LLM calls
request: Optional[LLMRequest] = None
response: Optional[LLMResponse] = None
# Input/Output for Flow steps
flow_step: Optional[FlowStepIO] = None
# Tool usage
tool_calls: List[ToolCall] = Field(default_factory=list)
# Metrics
tokens_used: Optional[int] = None
prompt_tokens: Optional[int] = None
completion_tokens: Optional[int] = None
cost: Optional[float] = None
# Additional metadata
status: str = "running" # running, completed, error
error: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
tags: List[str] = Field(default_factory=list)

View File

@@ -1,543 +0,0 @@
import inspect
import os
from datetime import UTC, datetime
from functools import wraps
from typing import Any, Awaitable, Callable, Dict, List, Optional
from uuid import uuid4
from crewai.traces.context import TraceContext
from crewai.traces.enums import CrewType, RunType, TraceType
from crewai.traces.models import (
CrewTrace,
FlowStepIO,
LLMRequest,
LLMResponse,
ToolCall,
)
class UnifiedTraceController:
"""Controls and manages trace execution and recording.
This class handles the lifecycle of traces including creation, execution tracking,
and recording of results for various types of operations (LLM calls, tool calls, flow steps).
"""
_task_traces: Dict[str, List["UnifiedTraceController"]] = {}
def __init__(
self,
trace_type: TraceType,
run_type: RunType,
crew_type: CrewType,
run_id: str,
deployment_instance_id: str = os.environ.get(
"CREWAI_DEPLOYMENT_INSTANCE_ID", ""
),
parent_trace_id: Optional[str] = None,
agent_role: Optional[str] = "unknown",
task_name: Optional[str] = None,
task_description: Optional[str] = None,
task_id: Optional[str] = None,
flow_step: Dict[str, Any] = {},
tool_calls: List[ToolCall] = [],
**context: Any,
) -> None:
"""Initialize a new trace controller.
Args:
trace_type: Type of trace being recorded.
run_type: Type of run being executed.
crew_type: Type of crew executing the trace.
run_id: Unique identifier for the run.
deployment_instance_id: Optional deployment instance identifier.
parent_trace_id: Optional parent trace identifier for nested traces.
agent_role: Role of the agent executing the trace.
task_name: Optional name of the task being executed.
task_description: Optional description of the task.
task_id: Optional unique identifier for the task.
flow_step: Optional flow step information.
tool_calls: Optional list of tool calls made during execution.
**context: Additional context parameters.
"""
self.trace_id = str(uuid4())
self.run_id = run_id
self.parent_trace_id = parent_trace_id
self.trace_type = trace_type
self.run_type = run_type
self.crew_type = crew_type
self.context = context
self.agent_role = agent_role
self.task_name = task_name
self.task_description = task_description
self.task_id = task_id
self.deployment_instance_id = deployment_instance_id
self.children: List[Dict[str, Any]] = []
self.start_time: Optional[datetime] = None
self.end_time: Optional[datetime] = None
self.error: Optional[str] = None
self.tool_calls = tool_calls
self.flow_step = flow_step
self.status: str = "running"
# Add trace to task's trace collection if task_id is present
if task_id:
self._add_to_task_traces()
def _add_to_task_traces(self) -> None:
"""Add this trace to the task's trace collection."""
if not hasattr(UnifiedTraceController, "_task_traces"):
UnifiedTraceController._task_traces = {}
if self.task_id is None:
return
if self.task_id not in UnifiedTraceController._task_traces:
UnifiedTraceController._task_traces[self.task_id] = []
UnifiedTraceController._task_traces[self.task_id].append(self)
@classmethod
def get_task_traces(cls, task_id: str) -> List["UnifiedTraceController"]:
"""Get all traces for a specific task.
Args:
task_id: The ID of the task to get traces for
Returns:
List of traces associated with the task
"""
return cls._task_traces.get(task_id, [])
@classmethod
def clear_task_traces(cls, task_id: str) -> None:
"""Clear traces for a specific task.
Args:
task_id: The ID of the task to clear traces for
"""
if hasattr(cls, "_task_traces") and task_id in cls._task_traces:
del cls._task_traces[task_id]
def _get_current_trace(self) -> "UnifiedTraceController":
return TraceContext.get_current()
def start_trace(self) -> "UnifiedTraceController":
"""Start the trace execution.
Returns:
UnifiedTraceController: Self for method chaining.
"""
self.start_time = datetime.now(UTC)
return self
def end_trace(self, result: Any = None, error: Optional[str] = None) -> None:
"""End the trace execution and record results.
Args:
result: Optional result from the trace execution.
error: Optional error message if the trace failed.
"""
self.end_time = datetime.now(UTC)
self.status = "error" if error else "completed"
self.error = error
self._record_trace(result)
def add_child_trace(self, child_trace: Dict[str, Any]) -> None:
"""Add a child trace to this trace's execution history.
Args:
child_trace: The child trace information to add.
"""
self.children.append(child_trace)
def to_crew_trace(self) -> CrewTrace:
"""Convert to CrewTrace format for storage.
Returns:
CrewTrace: The trace data in CrewTrace format.
"""
latency_ms = None
if self.tool_calls and hasattr(self.tool_calls[0], "start_time"):
self.start_time = self.tool_calls[0].start_time
if self.start_time and self.end_time:
latency_ms = int((self.end_time - self.start_time).total_seconds() * 1000)
request = None
response = None
flow_step_obj = None
if self.trace_type in [TraceType.LLM_CALL, TraceType.TOOL_CALL]:
request = LLMRequest(
model=self.context.get("model", "unknown"),
messages=self.context.get("messages", []),
temperature=self.context.get("temperature"),
max_tokens=self.context.get("max_tokens"),
stop_sequences=self.context.get("stop_sequences"),
)
if "response" in self.context:
response = LLMResponse(
content=self.context["response"].get("content", ""),
finish_reason=self.context["response"].get("finish_reason"),
)
elif self.trace_type == TraceType.FLOW_STEP:
flow_step_obj = FlowStepIO(
function_name=self.flow_step.get("function_name", "unknown"),
inputs=self.flow_step.get("inputs", {}),
outputs={"result": self.context.get("response")},
metadata=self.flow_step.get("metadata", {}),
)
return CrewTrace(
deployment_instance_id=self.deployment_instance_id,
trace_id=self.trace_id,
task_id=self.task_id,
run_id=self.run_id,
agent_role=self.agent_role,
task_name=self.task_name,
task_description=self.task_description,
trace_type=self.trace_type.value,
crew_type=self.crew_type.value,
run_type=self.run_type.value,
start_time=self.start_time,
end_time=self.end_time,
latency_ms=latency_ms,
request=request,
response=response,
flow_step=flow_step_obj,
tool_calls=self.tool_calls,
tokens_used=self.context.get("tokens_used"),
prompt_tokens=self.context.get("prompt_tokens"),
completion_tokens=self.context.get("completion_tokens"),
status=self.status,
error=self.error,
)
def _record_trace(self, result: Any = None) -> None:
"""Record the trace.
This method is called when a trace is completed. It ensures the trace
is properly recorded and associated with its task if applicable.
Args:
result: Optional result to include in the trace
"""
if result:
self.context["response"] = result
# Add to task traces if this trace belongs to a task
if self.task_id:
self._add_to_task_traces()
def should_trace() -> bool:
"""Check if tracing is enabled via environment variable."""
return os.getenv("CREWAI_ENABLE_TRACING", "false").lower() == "true"
# Crew main trace
def init_crew_main_trace(func: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator to initialize and track the main crew execution trace.
This decorator sets up the trace context for the main crew execution,
handling both synchronous and asynchronous crew operations.
Args:
func: The crew function to be traced.
Returns:
Wrapped function that creates and manages the main crew trace context.
"""
@wraps(func)
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
if not should_trace():
return func(self, *args, **kwargs)
trace = build_crew_main_trace(self)
with TraceContext.set_current(trace):
try:
return func(self, *args, **kwargs)
except Exception as e:
trace.end_trace(error=str(e))
raise
return wrapper
def build_crew_main_trace(self: Any) -> "UnifiedTraceController":
"""Build the main trace controller for a crew execution.
This function creates a trace controller configured for the main crew execution,
handling different run types (kickoff, test, train) and maintaining context.
Args:
self: The crew instance.
Returns:
UnifiedTraceController: The configured trace controller for the crew.
"""
run_type = RunType.KICKOFF
if hasattr(self, "_test") and self._test:
run_type = RunType.TEST
elif hasattr(self, "_train") and self._train:
run_type = RunType.TRAIN
current_trace = TraceContext.get_current()
trace = UnifiedTraceController(
trace_type=TraceType.LLM_CALL,
run_type=run_type,
crew_type=current_trace.crew_type if current_trace else CrewType.CREW,
run_id=current_trace.run_id if current_trace else str(self.id),
parent_trace_id=current_trace.trace_id if current_trace else None,
)
return trace
# Flow main trace
def init_flow_main_trace(
func: Callable[..., Awaitable[Any]],
) -> Callable[..., Awaitable[Any]]:
"""Decorator to initialize and track the main flow execution trace.
Args:
func: The async flow function to be traced.
Returns:
Wrapped async function that creates and manages the main flow trace context.
"""
@wraps(func)
async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
if not should_trace():
return await func(self, *args, **kwargs)
trace = build_flow_main_trace(self, *args, **kwargs)
with TraceContext.set_current(trace):
try:
return await func(self, *args, **kwargs)
except Exception:
raise
return wrapper
def build_flow_main_trace(
self: Any, *args: Any, **kwargs: Any
) -> "UnifiedTraceController":
"""Build the main trace controller for a flow execution.
Args:
self: The flow instance.
*args: Variable positional arguments.
**kwargs: Variable keyword arguments.
Returns:
UnifiedTraceController: The configured trace controller for the flow.
"""
current_trace = TraceContext.get_current()
trace = UnifiedTraceController(
trace_type=TraceType.FLOW_STEP,
run_id=current_trace.run_id if current_trace else str(self.flow_id),
parent_trace_id=current_trace.trace_id if current_trace else None,
crew_type=CrewType.FLOW,
run_type=RunType.KICKOFF,
context={
"crew_name": self.__class__.__name__,
"inputs": kwargs.get("inputs", {}),
"agents": [],
"tasks": [],
},
)
return trace
# Flow step trace
def trace_flow_step(
func: Callable[..., Awaitable[Any]],
) -> Callable[..., Awaitable[Any]]:
"""Decorator to trace individual flow step executions.
Args:
func: The async flow step function to be traced.
Returns:
Wrapped async function that creates and manages the flow step trace context.
"""
@wraps(func)
async def wrapper(
self: Any,
method_name: str,
method: Callable[..., Any],
*args: Any,
**kwargs: Any,
) -> Any:
if not should_trace():
return await func(self, method_name, method, *args, **kwargs)
trace = build_flow_step_trace(self, method_name, method, *args, **kwargs)
with TraceContext.set_current(trace):
trace.start_trace()
try:
result = await func(self, method_name, method, *args, **kwargs)
trace.end_trace(result=result)
return result
except Exception as e:
trace.end_trace(error=str(e))
raise
return wrapper
def build_flow_step_trace(
self: Any, method_name: str, method: Callable[..., Any], *args: Any, **kwargs: Any
) -> "UnifiedTraceController":
"""Build a trace controller for an individual flow step.
Args:
self: The flow instance.
method_name: Name of the method being executed.
method: The actual method being executed.
*args: Variable positional arguments.
**kwargs: Variable keyword arguments.
Returns:
UnifiedTraceController: The configured trace controller for the flow step.
"""
current_trace = TraceContext.get_current()
# Get method signature
sig = inspect.signature(method)
params = list(sig.parameters.values())
# Create inputs dictionary mapping parameter names to values
method_params = [p for p in params if p.name != "self"]
inputs: Dict[str, Any] = {}
# Map positional args to their parameter names
for i, param in enumerate(method_params):
if i < len(args):
inputs[param.name] = args[i]
# Add keyword arguments
inputs.update(kwargs)
trace = UnifiedTraceController(
trace_type=TraceType.FLOW_STEP,
run_type=current_trace.run_type if current_trace else RunType.KICKOFF,
crew_type=current_trace.crew_type if current_trace else CrewType.FLOW,
run_id=current_trace.run_id if current_trace else str(self.flow_id),
parent_trace_id=current_trace.trace_id if current_trace else None,
flow_step={
"function_name": method_name,
"inputs": inputs,
"metadata": {
"crew_name": self.__class__.__name__,
},
},
)
return trace
# LLM trace
def trace_llm_call(func: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator to trace LLM calls.
Args:
func: The function to trace.
Returns:
Wrapped function that creates and manages the LLM call trace context.
"""
@wraps(func)
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
if not should_trace():
return func(self, *args, **kwargs)
trace = build_llm_trace(self, *args, **kwargs)
with TraceContext.set_current(trace):
trace.start_trace()
try:
response = func(self, *args, **kwargs)
# Extract relevant data from response
trace_response = {
"content": response["choices"][0]["message"]["content"],
"finish_reason": response["choices"][0].get("finish_reason"),
}
# Add usage metrics to context
if "usage" in response:
trace.context["tokens_used"] = response["usage"].get(
"total_tokens", 0
)
trace.context["prompt_tokens"] = response["usage"].get(
"prompt_tokens", 0
)
trace.context["completion_tokens"] = response["usage"].get(
"completion_tokens", 0
)
trace.end_trace(trace_response)
return response
except Exception as e:
trace.end_trace(error=str(e))
raise
return wrapper
def build_llm_trace(
self: Any, params: Dict[str, Any], *args: Any, **kwargs: Any
) -> Any:
"""Build a trace controller for an LLM call.
Args:
self: The LLM instance.
params: The parameters for the LLM call.
*args: Variable positional arguments.
**kwargs: Variable keyword arguments.
Returns:
UnifiedTraceController: The configured trace controller for the LLM call.
"""
current_trace = TraceContext.get_current()
agent, task = self._get_execution_context()
# Get new messages and tool results
new_messages = self._get_new_messages(params.get("messages", []))
new_tool_results = self._get_new_tool_results(agent)
# Create trace context
trace = UnifiedTraceController(
trace_type=TraceType.TOOL_CALL if new_tool_results else TraceType.LLM_CALL,
crew_type=current_trace.crew_type if current_trace else CrewType.CREW,
run_type=current_trace.run_type if current_trace else RunType.KICKOFF,
run_id=current_trace.run_id if current_trace else str(uuid4()),
parent_trace_id=current_trace.trace_id if current_trace else None,
agent_role=agent.role if agent else "unknown",
task_id=str(task.id) if task else None,
task_name=task.name if task else None,
task_description=task.description if task else None,
model=self.model,
messages=new_messages,
temperature=self.temperature,
max_tokens=self.max_tokens,
stop_sequences=self.stop,
tool_calls=[
ToolCall(
name=result["tool_name"],
arguments=result["tool_args"],
output=str(result["result"]),
start_time=result.get("start_time", ""),
end_time=datetime.now(UTC),
)
for result in new_tool_results
],
)
return trace

View File

@@ -39,8 +39,8 @@
"validation_error": "### Previous attempt failed validation: {guardrail_result_error}\n\n\n### Previous result:\n{task_output}\n\n\nTry again, making sure to address the validation error."
},
"tools": {
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolute everything you know, don't reference things but instead explain them.",
"ask_question": "Ask a specific question to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the question you have for them, and ALL necessary context to ask the question properly, they know nothing about the question, so share absolute everything you know, don't reference things but instead explain them.",
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolutely everything you know, don't reference things but instead explain them.",
"ask_question": "Ask a specific question to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the question you have for them, and ALL necessary context to ask the question properly, they know nothing about the question, so share absolutely everything you know, don't reference things but instead explain them.",
"add_image": {
"name": "Add image to content",
"description": "See image to understand its content, you can optionally ask a question about the image",

View File

@@ -34,6 +34,7 @@ from .tool_usage_events import (
ToolUsageEvent,
ToolValidateInputErrorEvent,
)
from .llm_events import LLMCallCompletedEvent, LLMCallFailedEvent, LLMCallStartedEvent
# events
from .event_listener import EventListener

View File

@@ -1,9 +1,17 @@
from pydantic import PrivateAttr
from typing import Any, Dict
from pydantic import Field, PrivateAttr
from crewai.task import Task
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import Logger
from crewai.utilities.constants import EMITTER_COLOR
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
)
from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent
from .crew_events import (
@@ -37,6 +45,7 @@ class EventListener(BaseEventListener):
_instance = None
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
logger = Logger(verbose=True, default_color=EMITTER_COLOR)
execution_spans: Dict[Task, Any] = Field(default_factory=dict)
def __new__(cls):
if cls._instance is None:
@@ -49,6 +58,7 @@ class EventListener(BaseEventListener):
super().__init__()
self._telemetry = Telemetry()
self._telemetry.set_tracer()
self.execution_spans = {}
self._initialized = True
# ----------- CREW EVENTS -----------
@@ -57,7 +67,7 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent):
self.logger.log(
f"🚀 Crew '{event.crew_name}' started",
f"🚀 Crew '{event.crew_name}' started, {source.id}",
event.timestamp,
)
self._telemetry.crew_execution_span(source, event.inputs)
@@ -67,28 +77,28 @@ class EventListener(BaseEventListener):
final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output)
self.logger.log(
f"✅ Crew '{event.crew_name}' completed",
f"✅ Crew '{event.crew_name}' completed, {source.id}",
event.timestamp,
)
@crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent):
self.logger.log(
f"❌ Crew '{event.crew_name}' failed",
f"❌ Crew '{event.crew_name}' failed, {source.id}",
event.timestamp,
)
@crewai_event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent):
cloned_crew = source.copy()
cloned_crew._telemetry.test_execution_span(
self._telemetry.test_execution_span(
cloned_crew,
event.n_iterations,
event.inputs,
event.eval_llm,
event.eval_llm or "",
)
self.logger.log(
f"🚀 Crew '{event.crew_name}' started test",
f"🚀 Crew '{event.crew_name}' started test, {source.id}",
event.timestamp,
)
@@ -131,9 +141,9 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStartedEvent):
source._execution_span = self._telemetry.task_started(
crew=source.agent.crew, task=source
)
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
self.execution_spans[source] = span
self.logger.log(
f"📋 Task started: {source.description}",
event.timestamp,
@@ -141,24 +151,22 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent):
if source._execution_span:
self._telemetry.task_ended(
source._execution_span, source, source.agent.crew
)
span = self.execution_spans.get(source)
if span:
self._telemetry.task_ended(span, source, source.agent.crew)
self.logger.log(
f"✅ Task completed: {source.description}",
event.timestamp,
)
source._execution_span = None
self.execution_spans[source] = None
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent):
if source._execution_span:
span = self.execution_spans.get(source)
if span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(
source._execution_span, source, source.agent.crew
)
source._execution_span = None
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
self.logger.log(
f"❌ Task failed: {source.description}",
event.timestamp,
@@ -184,7 +192,7 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent):
self._telemetry.flow_creation_span(self.__class__.__name__)
self._telemetry.flow_creation_span(event.flow_name)
self.logger.log(
f"🌊 Flow Created: '{event.flow_name}'",
event.timestamp,
@@ -193,17 +201,17 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent):
self._telemetry.flow_execution_span(
source.__class__.__name__, list(source._methods.keys())
event.flow_name, list(source._methods.keys())
)
self.logger.log(
f"🤖 Flow Started: '{event.flow_name}'",
f"🤖 Flow Started: '{event.flow_name}', {source.flow_id}",
event.timestamp,
)
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent):
self.logger.log(
f"👍 Flow Finished: '{event.flow_name}'",
f"👍 Flow Finished: '{event.flow_name}', {source.flow_id}",
event.timestamp,
)
@@ -253,5 +261,28 @@ class EventListener(BaseEventListener):
#
)
# ----------- LLM EVENTS -----------
@crewai_event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
self.logger.log(
f"🤖 LLM Call Started",
event.timestamp,
)
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
self.logger.log(
f"✅ LLM Call Completed",
event.timestamp,
)
@crewai_event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event: LLMCallFailedEvent):
self.logger.log(
f"❌ LLM Call Failed: '{event.error}'",
event.timestamp,
)
event_listener = EventListener()

View File

@@ -0,0 +1,36 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from crewai.utilities.events.base_events import CrewEvent
class LLMCallType(Enum):
"""Type of LLM call being made"""
TOOL_CALL = "tool_call"
LLM_CALL = "llm_call"
class LLMCallStartedEvent(CrewEvent):
"""Event emitted when a LLM call starts"""
type: str = "llm_call_started"
messages: Union[str, List[Dict[str, str]]]
tools: Optional[List[dict]] = None
callbacks: Optional[List[Any]] = None
available_functions: Optional[Dict[str, Any]] = None
class LLMCallCompletedEvent(CrewEvent):
"""Event emitted when a LLM call completes"""
type: str = "llm_call_completed"
response: Any
call_type: LLMCallType
class LLMCallFailedEvent(CrewEvent):
"""Event emitted when a LLM call fails"""
error: str
type: str = "llm_call_failed"

View File

@@ -1,4 +1,4 @@
from typing import Any, Optional
from typing import Optional
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.events.base_events import CrewEvent

View File

@@ -44,6 +44,7 @@ def create_llm(
# Extract attributes with explicit types
model = (
getattr(llm_value, "model_name", None)
or getattr(llm_value, "model", None)
or getattr(llm_value, "deployment_name", None)
or str(llm_value)
)

View File

@@ -1,12 +0,0 @@
from typing import Any, Protocol, runtime_checkable
@runtime_checkable
class AgentExecutorProtocol(Protocol):
"""Protocol defining the expected interface for an agent executor."""
@property
def agent(self) -> Any: ...
@property
def task(self) -> Any: ...