Mirror of https://github.com/crewAIInc/crewAI.git
Synced 2025-12-27 09:48:30 +00:00

Compare commits: bugfix/asy...revert-90f (11 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 0e14b82124 | |
| | 38d92d1aaf | |
| | 70b5e753a5 | |
| | 5235442a5b | |
| | 1f8b90e391 | |
| | c62fb615b1 | |
| | 78797c64b0 | |
| | 8ac6f6a536 | |
| | a1cb222f3a | |
| | 6d846c0024 | |
| | 8a7584798b | |
@@ -35,10 +35,8 @@ from crewai.process import Process
 from crewai.task import Task
 from crewai.tasks.conditional_task import ConditionalTask
 from crewai.tasks.task_output import TaskOutput
 from crewai.telemetry import Telemetry
 from crewai.tools.agent_tools.agent_tools import AgentTools
 from crewai.tools.base_tool import Tool
-from crewai.traces.unified_trace_controller import init_crew_main_trace
 from crewai.types.usage_metrics import UsageMetrics
 from crewai.utilities import I18N, FileHandler, Logger, RPMController
 from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -258,8 +256,6 @@ class Crew(BaseModel):
        if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
            self.function_calling_llm = create_llm(self.function_calling_llm)

        self._telemetry = Telemetry()
        self._telemetry.set_tracer()
        return self

    @model_validator(mode="after")
@@ -574,7 +570,6 @@ class Crew(BaseModel):
            CrewTrainingHandler(filename).clear()
            raise

-    @init_crew_main_trace
    def kickoff(
        self,
        inputs: Optional[Dict[str, Any]] = None,
@@ -1115,7 +1110,6 @@ class Crew(BaseModel):
        "_short_term_memory",
        "_long_term_memory",
        "_entity_memory",
        "_telemetry",
        "agents",
        "tasks",
        "knowledge_sources",
@@ -1278,11 +1272,11 @@ class Crew(BaseModel):
    def _reset_all_memories(self) -> None:
        """Reset all available memory systems."""
        memory_systems = [
-            ("short term", self._short_term_memory),
-            ("entity", self._entity_memory),
-            ("long term", self._long_term_memory),
-            ("task output", self._task_output_handler),
-            ("knowledge", self.knowledge),
+            ("short term", getattr(self, "_short_term_memory", None)),
+            ("entity", getattr(self, "_entity_memory", None)),
+            ("long term", getattr(self, "_long_term_memory", None)),
+            ("task output", getattr(self, "_task_output_handler", None)),
+            ("knowledge", getattr(self, "knowledge", None)),
        ]

        for name, system in memory_systems:
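Note: the `getattr(..., None)` form in this hunk guards against memory systems that were never initialized on the instance, so `_reset_all_memories` can skip them instead of raising `AttributeError`. A minimal standalone sketch of the pattern (the `Demo` class here is hypothetical, not crewAI code):

```python
class Demo:
    """Hypothetical stand-in for Crew: some memory attributes may never be set."""

    def __init__(self) -> None:
        self._short_term_memory = "stm"  # only one system initialized


demo = Demo()
memory_systems = [
    ("short term", getattr(demo, "_short_term_memory", None)),
    ("long term", getattr(demo, "_long_term_memory", None)),  # missing -> None
]
for name, system in memory_systems:
    if system is None:
        print(f"skipping {name}: not initialized")
    else:
        print(f"resetting {name}: {system}")
```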
@@ -22,10 +22,6 @@ from pydantic import BaseModel, Field, ValidationError
 from crewai.flow.flow_visualizer import plot_flow
 from crewai.flow.persistence.base import FlowPersistence
 from crewai.flow.utils import get_possible_return_constants
-from crewai.traces.unified_trace_controller import (
-    init_flow_main_trace,
-    trace_flow_step,
-)
 from crewai.utilities.events.crewai_event_bus import crewai_event_bus
 from crewai.utilities.events.flow_events import (
     FlowCreatedEvent,
@@ -725,7 +721,6 @@ class Flow(Generic[T], metaclass=FlowMeta):

        return asyncio.run(run_flow())

-    @init_flow_main_trace
    async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
        """
        Start the flow execution asynchronously.
@@ -782,18 +777,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
            f"Flow started with ID: {self.flow_id}", color="bold_magenta"
        )

        if not self._start_methods:
            raise ValueError("No start method defined")
        if inputs is not None and "id" not in inputs:
            self._initialize_state(inputs)

        # Execute all start methods concurrently.
        tasks = [
            self._execute_start_method(start_method)
            for start_method in self._start_methods
        ]
        await asyncio.gather(*tasks)

        final_output = self._method_outputs[-1] if self._method_outputs else None

        # Emit FlowFinishedEvent after all processing is complete.
        crewai_event_bus.emit(
            self,
            FlowFinishedEvent(
@@ -802,6 +796,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
                result=final_output,
            ),
        )

        return final_output

    async def _execute_start_method(self, start_method_name: str) -> None:
@@ -827,7 +822,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
        )
        await self._execute_listeners(start_method_name, result)

-    @trace_flow_step
    async def _execute_method(
        self, method_name: str, method: Callable, *args: Any, **kwargs: Any
    ) -> Any:
@@ -1,4 +1,3 @@
-import inspect
 import json
 import logging
 import os
@@ -6,21 +5,17 @@ import sys
 import threading
 import warnings
 from contextlib import contextmanager
-from typing import (
-    Any,
-    Dict,
-    List,
-    Literal,
-    Optional,
-    Tuple,
-    Type,
-    Union,
-    cast,
-)
+from typing import Any, Dict, List, Literal, Optional, Type, Union, cast

 from dotenv import load_dotenv
 from pydantic import BaseModel

+from crewai.utilities.events.llm_events import (
+    LLMCallCompletedEvent,
+    LLMCallFailedEvent,
+    LLMCallStartedEvent,
+    LLMCallType,
+)
+from crewai.utilities.events.tool_usage_events import ToolExecutionErrorEvent

 with warnings.catch_warnings():
@@ -31,12 +26,10 @@ with warnings.catch_warnings():
     from litellm.utils import get_supported_openai_params, supports_response_schema


-from crewai.traces.unified_trace_controller import trace_llm_call
+from crewai.utilities.events import crewai_event_bus
 from crewai.utilities.exceptions.context_window_exceeding_exception import (
     LLMContextLengthExceededException,
 )
-from crewai.utilities.protocols import AgentExecutorProtocol

 load_dotenv()

@@ -180,7 +173,6 @@ class LLM:
        self.context_window_size = 0
        self.reasoning_effort = reasoning_effort
        self.additional_params = kwargs
-        self._message_history: List[Dict[str, str]] = []
        self.is_anthropic = self._is_anthropic_model(model)

        litellm.drop_params = True
@@ -196,12 +188,6 @@ class LLM:
        self.set_callbacks(callbacks)
        self.set_env_callbacks()

-    @trace_llm_call
-    def _call_llm(self, params: Dict[str, Any]) -> Any:
-        with suppress_warnings():
-            response = litellm.completion(**params)
-            return response
-
    def _is_anthropic_model(self, model: str) -> bool:
        """Determine if the model is from Anthropic provider.

@@ -259,6 +245,15 @@ class LLM:
            >>> print(response)
            "The capital of France is Paris."
        """
+        crewai_event_bus.emit(
+            self,
+            event=LLMCallStartedEvent(
+                messages=messages,
+                tools=tools,
+                callbacks=callbacks,
+                available_functions=available_functions,
+            ),
+        )
        # Validate parameters before proceeding with the call.
        self._validate_call_params()

@@ -311,7 +306,7 @@ class LLM:
            params = {k: v for k, v in params.items() if v is not None}

            # --- 2) Make the completion call
-            response = self._call_llm(params)
+            response = litellm.completion(**params)
            response_message = cast(Choices, cast(ModelResponse, response).choices)[
                0
            ].message
@@ -333,12 +328,13 @@ class LLM:

            # --- 4) If no tool calls, return the text response
            if not tool_calls or not available_functions:
+                self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL)
                return text_response

            # --- 5) Handle the tool call
            tool_call = tool_calls[0]
            function_name = tool_call.function.name
            print("function_name", function_name)

            if function_name in available_functions:
                try:
                    function_args = json.loads(tool_call.function.arguments)
@@ -350,6 +346,7 @@ class LLM:
                    try:
                        # Call the actual tool function
                        result = fn(**function_args)
+                        self._handle_emit_call_events(result, LLMCallType.TOOL_CALL)
                        return result

                    except Exception as e:
@@ -365,6 +362,12 @@ class LLM:
                                error=str(e),
                            ),
                        )
+                        crewai_event_bus.emit(
+                            self,
+                            event=LLMCallFailedEvent(
+                                error=f"Tool execution error: {str(e)}"
+                            ),
+                        )
                        return text_response

                else:
@@ -374,12 +377,28 @@ class LLM:
                    return text_response

        except Exception as e:
+            crewai_event_bus.emit(
+                self,
+                event=LLMCallFailedEvent(error=str(e)),
+            )
            if not LLMContextLengthExceededException(
                str(e)
            )._is_context_limit_error(str(e)):
                logging.error(f"LiteLLM call failed: {str(e)}")
            raise

+    def _handle_emit_call_events(self, response: Any, call_type: LLMCallType):
+        """Handle the events for the LLM call.
+
+        Args:
+            response (str): The response from the LLM call.
+            call_type (str): The type of call, either "tool_call" or "llm_call".
+        """
+        crewai_event_bus.emit(
+            self,
+            event=LLMCallCompletedEvent(response=response, call_type=call_type),
+        )
+
    def _format_messages_for_provider(
        self, messages: List[Dict[str, str]]
    ) -> List[Dict[str, str]]:
@@ -531,95 +550,3 @@ class LLM:

        litellm.success_callback = success_callbacks
        litellm.failure_callback = failure_callbacks
-
-    def _get_execution_context(self) -> Tuple[Optional[Any], Optional[Any]]:
-        """Get the agent and task from the execution context.
-
-        Returns:
-            tuple: (agent, task) from any AgentExecutor context, or (None, None) if not found
-        """
-        frame = inspect.currentframe()
-        caller_frame = frame.f_back if frame else None
-        agent = None
-        task = None
-
-        # Add a maximum depth to prevent infinite loops
-        max_depth = 100  # Reasonable limit for call stack depth
-        current_depth = 0
-
-        while caller_frame and current_depth < max_depth:
-            if "self" in caller_frame.f_locals:
-                caller_self = caller_frame.f_locals["self"]
-                if isinstance(caller_self, AgentExecutorProtocol):
-                    agent = caller_self.agent
-                    task = caller_self.task
-                    break
-            caller_frame = caller_frame.f_back
-            current_depth += 1
-
-        return agent, task
-
-    def _get_new_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
-        """Get only the new messages that haven't been processed before."""
-        if not hasattr(self, "_message_history"):
-            self._message_history = []
-
-        new_messages = []
-        for message in messages:
-            message_key = (message["role"], message["content"])
-            if message_key not in [
-                (m["role"], m["content"]) for m in self._message_history
-            ]:
-                new_messages.append(message)
-                self._message_history.append(message)
-        return new_messages
-
-    def _get_new_tool_results(self, agent) -> List[Dict]:
-        """Get only the new tool results that haven't been processed before."""
-        if not agent or not agent.tools_results:
-            return []
-
-        if not hasattr(self, "_tool_results_history"):
-            self._tool_results_history: List[Dict] = []
-
-        new_tool_results = []
-
-        for result in agent.tools_results:
-            # Process tool arguments to extract actual values
-            processed_args = {}
-            if isinstance(result["tool_args"], dict):
-                for key, value in result["tool_args"].items():
-                    if isinstance(value, dict) and "type" in value:
-                        # Skip metadata and just store the actual value
-                        continue
-                    processed_args[key] = value
-
-            # Create a clean result with processed arguments
-            clean_result = {
-                "tool_name": result["tool_name"],
-                "tool_args": processed_args,
-                "result": result["result"],
-                "content": result.get("content", ""),
-                "start_time": result.get("start_time", ""),
-            }
-
-            # Check if this exact tool execution exists in history
-            is_duplicate = False
-            for history_result in self._tool_results_history:
-                if (
-                    clean_result["tool_name"] == history_result["tool_name"]
-                    and str(clean_result["tool_args"])
-                    == str(history_result["tool_args"])
-                    and str(clean_result["result"]) == str(history_result["result"])
-                    and clean_result["content"] == history_result.get("content", "")
-                    and clean_result["start_time"]
-                    == history_result.get("start_time", "")
-                ):
-                    is_duplicate = True
-                    break
-
-            if not is_duplicate:
-                new_tool_results.append(clean_result)
-                self._tool_results_history.append(clean_result)
-
-        return new_tool_results
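Note: the removed `_get_new_messages` helper deduplicates messages by their `(role, content)` pair, using an O(n) list scan per message. The same idea as a standalone, set-based sketch (illustrative function, not crewAI code):

```python
from typing import Dict, List


def get_new_messages(
    history: List[Dict[str, str]], messages: List[Dict[str, str]]
) -> List[Dict[str, str]]:
    # Track (role, content) pairs already seen; a set makes the membership
    # test O(1) instead of the O(n) list scan used in the removed helper.
    seen = {(m["role"], m["content"]) for m in history}
    new_messages = []
    for message in messages:
        key = (message["role"], message["content"])
        if key not in seen:
            seen.add(key)
            new_messages.append(message)
            history.append(message)
    return new_messages


history: List[Dict[str, str]] = []
batch = [{"role": "user", "content": "hi"}, {"role": "user", "content": "hi"}]
print(get_new_messages(history, batch))  # only one copy survives
```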
@@ -2,7 +2,6 @@ import ast
 import datetime
 import json
 import time
-from datetime import UTC
 from difflib import SequenceMatcher
 from json import JSONDecodeError
 from textwrap import dedent
@@ -118,10 +117,7 @@ class ToolUsage:
            self._printer.print(content=f"\n\n{error}\n", color="red")
            return error

-        if (
-            isinstance(tool, CrewStructuredTool)
-            and tool.name == self._i18n.tools("add_image")["name"]  # type: ignore
-        ):
+        if isinstance(tool, CrewStructuredTool) and tool.name == self._i18n.tools("add_image")["name"]:  # type: ignore
            try:
                result = self._use(tool_string=tool_string, tool=tool, calling=calling)
                return result
@@ -158,7 +154,6 @@ class ToolUsage:
            self.task.increment_tools_errors()

        started_at = time.time()
-        started_at_trace = datetime.datetime.now(UTC)
        from_cache = False

        result = None  # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
@@ -186,9 +181,7 @@ class ToolUsage:

        if calling.arguments:
            try:
-                acceptable_args = tool.args_schema.model_json_schema()[
-                    "properties"
-                ].keys()  # type: ignore
+                acceptable_args = tool.args_schema.model_json_schema()["properties"].keys()  # type: ignore
                arguments = {
                    k: v
                    for k, v in calling.arguments.items()
@@ -209,7 +202,7 @@ class ToolUsage:
                    error=e, tool=tool.name, tool_inputs=tool.description
                )
                error = ToolUsageErrorException(
-                    f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
+                    f'\n{error_message}.\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
                ).message
                self.task.increment_tools_errors()
                if self.agent.verbose:
@@ -244,7 +237,6 @@ class ToolUsage:
                "result": result,
                "tool_name": tool.name,
                "tool_args": calling.arguments,
-                "start_time": started_at_trace,
            }

            self.on_tool_use_finished(
@@ -388,7 +380,7 @@ class ToolUsage:
                raise
            else:
                return ToolUsageErrorException(
-                    f"{self._i18n.errors('tool_arguments_error')}"
+                    f'{self._i18n.errors("tool_arguments_error")}'
                )

        if not isinstance(arguments, dict):
@@ -396,7 +388,7 @@ class ToolUsage:
                raise
            else:
                return ToolUsageErrorException(
-                    f"{self._i18n.errors('tool_arguments_error')}"
+                    f'{self._i18n.errors("tool_arguments_error")}'
                )

        return ToolCalling(
@@ -424,7 +416,7 @@ class ToolUsage:
            if self.agent.verbose:
                self._printer.print(content=f"\n\n{e}\n", color="red")
            return ToolUsageErrorException(  # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling")
-                f"{self._i18n.errors('tool_usage_error').format(error=e)}\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
+                f'{self._i18n.errors("tool_usage_error").format(error=e)}\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
            )
        return self._tool_calling(tool_string)
@@ -1,39 +0,0 @@
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Generator


class TraceContext:
    """Maintains the current trace context throughout the execution stack.

    This class provides a context manager for tracking trace execution across
    async and sync code paths using ContextVars.
    """

    _context: ContextVar = ContextVar("trace_context", default=None)

    @classmethod
    def get_current(cls):
        """Get the current trace context.

        Returns:
            Optional[UnifiedTraceController]: The current trace controller or None if not set.
        """
        return cls._context.get()

    @classmethod
    @contextmanager
    def set_current(cls, trace):
        """Set the current trace context within a context manager.

        Args:
            trace: The trace controller to set as current.

        Yields:
            UnifiedTraceController: The current trace controller.
        """
        token = cls._context.set(trace)
        try:
            yield trace
        finally:
            cls._context.reset(token)
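Note: this deleted helper is a thin wrapper over `contextvars`. The token returned by `ContextVar.set` is what makes nesting work: each `reset(token)` restores whatever value was current before that `set`, even when the body raises. A minimal standalone sketch of the same token pattern (illustrative names, not crewAI code):

```python
from contextlib import contextmanager
from contextvars import ContextVar

_ctx: ContextVar = ContextVar("trace_context", default=None)


@contextmanager
def set_current(trace):
    token = _ctx.set(trace)  # remember the previous value via the token
    try:
        yield trace
    finally:
        _ctx.reset(token)  # restore it even if the body raises


with set_current("outer"):
    with set_current("inner"):
        print(_ctx.get())  # -> inner
    print(_ctx.get())      # -> outer (reset restored it)
print(_ctx.get())          # -> None
```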
@@ -1,19 +0,0 @@
from enum import Enum


class TraceType(Enum):
    LLM_CALL = "llm_call"
    TOOL_CALL = "tool_call"
    FLOW_STEP = "flow_step"
    START_CALL = "start_call"


class RunType(Enum):
    KICKOFF = "kickoff"
    TRAIN = "train"
    TEST = "test"


class CrewType(Enum):
    CREW = "crew"
    FLOW = "flow"
@@ -1,89 +0,0 @@
from datetime import datetime
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


class ToolCall(BaseModel):
    """Model representing a tool call during execution"""

    name: str
    arguments: Dict[str, Any]
    output: str
    start_time: datetime
    end_time: Optional[datetime] = None
    latency_ms: Optional[int] = None
    error: Optional[str] = None


class LLMRequest(BaseModel):
    """Model representing the LLM request details"""

    model: str
    messages: List[Dict[str, str]]
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
    stop_sequences: Optional[List[str]] = None
    additional_params: Dict[str, Any] = Field(default_factory=dict)


class LLMResponse(BaseModel):
    """Model representing the LLM response details"""

    content: str
    finish_reason: Optional[str] = None


class FlowStepIO(BaseModel):
    """Model representing flow step input/output details"""

    function_name: str
    inputs: Dict[str, Any] = Field(default_factory=dict)
    outputs: Any
    metadata: Dict[str, Any] = Field(default_factory=dict)


class CrewTrace(BaseModel):
    """Model for tracking detailed information about LLM interactions and Flow steps"""

    deployment_instance_id: Optional[str] = Field(
        description="ID of the deployment instance"
    )
    trace_id: str = Field(description="Unique identifier for this trace")
    run_id: str = Field(description="Identifier for the execution run")
    agent_role: Optional[str] = Field(description="Role of the agent")
    task_id: Optional[str] = Field(description="ID of the current task being executed")
    task_name: Optional[str] = Field(description="Name of the current task")
    task_description: Optional[str] = Field(
        description="Description of the current task"
    )
    trace_type: str = Field(description="Type of the trace")
    crew_type: str = Field(description="Type of the crew")
    run_type: str = Field(description="Type of the run")

    # Timing information
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    latency_ms: Optional[int] = None

    # Request/Response for LLM calls
    request: Optional[LLMRequest] = None
    response: Optional[LLMResponse] = None

    # Input/Output for Flow steps
    flow_step: Optional[FlowStepIO] = None

    # Tool usage
    tool_calls: List[ToolCall] = Field(default_factory=list)

    # Metrics
    tokens_used: Optional[int] = None
    prompt_tokens: Optional[int] = None
    completion_tokens: Optional[int] = None
    cost: Optional[float] = None

    # Additional metadata
    status: str = "running"  # running, completed, error
    error: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)
    tags: List[str] = Field(default_factory=list)
@@ -1,543 +0,0 @@
|
||||
import inspect
|
||||
import os
|
||||
from datetime import UTC, datetime
|
||||
from functools import wraps
|
||||
from typing import Any, Awaitable, Callable, Dict, List, Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from crewai.traces.context import TraceContext
|
||||
from crewai.traces.enums import CrewType, RunType, TraceType
|
||||
from crewai.traces.models import (
|
||||
CrewTrace,
|
||||
FlowStepIO,
|
||||
LLMRequest,
|
||||
LLMResponse,
|
||||
ToolCall,
|
||||
)
|
||||
|
||||
|
||||
class UnifiedTraceController:
|
||||
"""Controls and manages trace execution and recording.
|
||||
|
||||
This class handles the lifecycle of traces including creation, execution tracking,
|
||||
and recording of results for various types of operations (LLM calls, tool calls, flow steps).
|
||||
"""
|
||||
|
||||
_task_traces: Dict[str, List["UnifiedTraceController"]] = {}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
trace_type: TraceType,
|
||||
run_type: RunType,
|
||||
crew_type: CrewType,
|
||||
run_id: str,
|
||||
deployment_instance_id: str = os.environ.get(
|
||||
"CREWAI_DEPLOYMENT_INSTANCE_ID", ""
|
||||
),
|
||||
parent_trace_id: Optional[str] = None,
|
||||
agent_role: Optional[str] = "unknown",
|
||||
task_name: Optional[str] = None,
|
||||
task_description: Optional[str] = None,
|
||||
task_id: Optional[str] = None,
|
||||
flow_step: Dict[str, Any] = {},
|
||||
tool_calls: List[ToolCall] = [],
|
||||
**context: Any,
|
||||
) -> None:
|
||||
"""Initialize a new trace controller.
|
||||
|
||||
Args:
|
||||
trace_type: Type of trace being recorded.
|
||||
run_type: Type of run being executed.
|
||||
crew_type: Type of crew executing the trace.
|
||||
run_id: Unique identifier for the run.
|
||||
deployment_instance_id: Optional deployment instance identifier.
|
||||
parent_trace_id: Optional parent trace identifier for nested traces.
|
||||
agent_role: Role of the agent executing the trace.
|
||||
task_name: Optional name of the task being executed.
|
||||
task_description: Optional description of the task.
|
||||
task_id: Optional unique identifier for the task.
|
||||
flow_step: Optional flow step information.
|
||||
tool_calls: Optional list of tool calls made during execution.
|
||||
**context: Additional context parameters.
|
||||
"""
|
||||
self.trace_id = str(uuid4())
|
||||
self.run_id = run_id
|
||||
self.parent_trace_id = parent_trace_id
|
||||
self.trace_type = trace_type
|
||||
self.run_type = run_type
|
||||
self.crew_type = crew_type
|
||||
self.context = context
|
||||
self.agent_role = agent_role
|
||||
self.task_name = task_name
|
||||
self.task_description = task_description
|
||||
self.task_id = task_id
|
||||
self.deployment_instance_id = deployment_instance_id
|
||||
self.children: List[Dict[str, Any]] = []
|
||||
self.start_time: Optional[datetime] = None
|
||||
self.end_time: Optional[datetime] = None
|
||||
self.error: Optional[str] = None
|
||||
self.tool_calls = tool_calls
|
||||
self.flow_step = flow_step
|
||||
self.status: str = "running"
|
||||
|
||||
# Add trace to task's trace collection if task_id is present
|
||||
if task_id:
|
||||
self._add_to_task_traces()
|
||||
|
||||
def _add_to_task_traces(self) -> None:
|
||||
"""Add this trace to the task's trace collection."""
|
||||
if not hasattr(UnifiedTraceController, "_task_traces"):
|
||||
UnifiedTraceController._task_traces = {}
|
||||
|
||||
if self.task_id is None:
|
||||
return
|
||||
|
||||
if self.task_id not in UnifiedTraceController._task_traces:
|
||||
UnifiedTraceController._task_traces[self.task_id] = []
|
||||
|
||||
UnifiedTraceController._task_traces[self.task_id].append(self)
|
||||
|
||||
@classmethod
|
||||
def get_task_traces(cls, task_id: str) -> List["UnifiedTraceController"]:
|
||||
"""Get all traces for a specific task.
|
||||
|
||||
Args:
|
||||
task_id: The ID of the task to get traces for
|
||||
|
||||
Returns:
|
||||
List of traces associated with the task
|
||||
"""
|
||||
return cls._task_traces.get(task_id, [])
|
||||
|
||||
@classmethod
|
||||
def clear_task_traces(cls, task_id: str) -> None:
|
||||
"""Clear traces for a specific task.
|
||||
|
||||
Args:
|
||||
task_id: The ID of the task to clear traces for
|
||||
"""
|
||||
if hasattr(cls, "_task_traces") and task_id in cls._task_traces:
|
||||
del cls._task_traces[task_id]
|
||||
|
||||
def _get_current_trace(self) -> "UnifiedTraceController":
|
||||
return TraceContext.get_current()
|
||||
|
||||
def start_trace(self) -> "UnifiedTraceController":
|
||||
"""Start the trace execution.
|
||||
|
||||
Returns:
|
||||
UnifiedTraceController: Self for method chaining.
|
||||
"""
|
||||
self.start_time = datetime.now(UTC)
|
||||
return self
|
||||
|
||||
def end_trace(self, result: Any = None, error: Optional[str] = None) -> None:
|
||||
"""End the trace execution and record results.
|
||||
|
||||
Args:
|
||||
result: Optional result from the trace execution.
|
||||
error: Optional error message if the trace failed.
|
||||
"""
|
||||
self.end_time = datetime.now(UTC)
|
||||
self.status = "error" if error else "completed"
|
||||
self.error = error
|
||||
self._record_trace(result)
|
||||
|
||||
def add_child_trace(self, child_trace: Dict[str, Any]) -> None:
|
||||
"""Add a child trace to this trace's execution history.
|
||||
|
||||
Args:
|
||||
child_trace: The child trace information to add.
|
||||
"""
|
||||
self.children.append(child_trace)
|
||||
|
||||
def to_crew_trace(self) -> CrewTrace:
|
||||
"""Convert to CrewTrace format for storage.
|
||||
|
||||
Returns:
|
||||
CrewTrace: The trace data in CrewTrace format.
|
||||
"""
|
||||
latency_ms = None
|
||||
|
||||
if self.tool_calls and hasattr(self.tool_calls[0], "start_time"):
|
||||
self.start_time = self.tool_calls[0].start_time
|
||||
|
||||
if self.start_time and self.end_time:
|
||||
latency_ms = int((self.end_time - self.start_time).total_seconds() * 1000)
|
||||
|
||||
request = None
|
||||
response = None
|
||||
flow_step_obj = None
|
||||
|
||||
if self.trace_type in [TraceType.LLM_CALL, TraceType.TOOL_CALL]:
|
||||
request = LLMRequest(
|
||||
model=self.context.get("model", "unknown"),
|
||||
messages=self.context.get("messages", []),
|
||||
temperature=self.context.get("temperature"),
|
||||
max_tokens=self.context.get("max_tokens"),
|
||||
stop_sequences=self.context.get("stop_sequences"),
|
||||
)
|
||||
if "response" in self.context:
|
||||
response = LLMResponse(
|
||||
content=self.context["response"].get("content", ""),
|
||||
finish_reason=self.context["response"].get("finish_reason"),
|
||||
)
|
||||
|
||||
elif self.trace_type == TraceType.FLOW_STEP:
|
||||
flow_step_obj = FlowStepIO(
|
||||
function_name=self.flow_step.get("function_name", "unknown"),
|
||||
inputs=self.flow_step.get("inputs", {}),
|
||||
outputs={"result": self.context.get("response")},
|
||||
metadata=self.flow_step.get("metadata", {}),
|
||||
)
|
||||
|
||||
return CrewTrace(
|
||||
deployment_instance_id=self.deployment_instance_id,
|
||||
trace_id=self.trace_id,
|
||||
task_id=self.task_id,
|
||||
run_id=self.run_id,
|
||||
agent_role=self.agent_role,
|
||||
task_name=self.task_name,
|
||||
task_description=self.task_description,
|
||||
trace_type=self.trace_type.value,
|
||||
crew_type=self.crew_type.value,
|
||||
run_type=self.run_type.value,
|
||||
start_time=self.start_time,
|
||||
end_time=self.end_time,
|
||||
latency_ms=latency_ms,
|
||||
request=request,
|
||||
response=response,
|
||||
flow_step=flow_step_obj,
|
||||
tool_calls=self.tool_calls,
|
||||
tokens_used=self.context.get("tokens_used"),
|
||||
prompt_tokens=self.context.get("prompt_tokens"),
|
||||
completion_tokens=self.context.get("completion_tokens"),
|
||||
status=self.status,
|
||||
error=self.error,
|
||||
)
|
||||
|
||||
def _record_trace(self, result: Any = None) -> None:
|
||||
"""Record the trace.
|
||||
|
||||
This method is called when a trace is completed. It ensures the trace
|
||||
is properly recorded and associated with its task if applicable.
|
||||
|
||||
Args:
|
||||
result: Optional result to include in the trace
|
||||
"""
|
||||
if result:
|
||||
self.context["response"] = result
|
||||
|
||||
# Add to task traces if this trace belongs to a task
|
||||
if self.task_id:
|
||||
self._add_to_task_traces()
|
||||
|
||||
|
||||
def should_trace() -> bool:
|
||||
"""Check if tracing is enabled via environment variable."""
|
||||
return os.getenv("CREWAI_ENABLE_TRACING", "false").lower() == "true"
|
||||
|
||||
|
||||
# Crew main trace
|
||||
def init_crew_main_trace(func: Callable[..., Any]) -> Callable[..., Any]:
|
||||
"""Decorator to initialize and track the main crew execution trace.
|
||||
|
||||
This decorator sets up the trace context for the main crew execution,
|
||||
handling both synchronous and asynchronous crew operations.
|
||||
|
||||
Args:
|
||||
func: The crew function to be traced.
|
||||
|
||||
Returns:
|
||||
Wrapped function that creates and manages the main crew trace context.
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
|
||||
if not should_trace():
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
trace = build_crew_main_trace(self)
|
||||
with TraceContext.set_current(trace):
|
||||
try:
|
||||
return func(self, *args, **kwargs)
|
||||
except Exception as e:
|
||||
trace.end_trace(error=str(e))
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def build_crew_main_trace(self: Any) -> "UnifiedTraceController":
|
||||
"""Build the main trace controller for a crew execution.
|
||||
|
||||
This function creates a trace controller configured for the main crew execution,
|
||||
handling different run types (kickoff, test, train) and maintaining context.
|
||||
|
||||
Args:
|
||||
self: The crew instance.
|
||||
|
||||
Returns:
|
||||
UnifiedTraceController: The configured trace controller for the crew.
|
||||
"""
|
||||
run_type = RunType.KICKOFF
|
||||
if hasattr(self, "_test") and self._test:
|
||||
run_type = RunType.TEST
|
||||
elif hasattr(self, "_train") and self._train:
|
||||
run_type = RunType.TRAIN
|
||||
|
||||
current_trace = TraceContext.get_current()
|
||||
|
||||
trace = UnifiedTraceController(
|
||||
trace_type=TraceType.LLM_CALL,
|
||||
run_type=run_type,
|
||||
crew_type=current_trace.crew_type if current_trace else CrewType.CREW,
|
||||
run_id=current_trace.run_id if current_trace else str(self.id),
|
||||
parent_trace_id=current_trace.trace_id if current_trace else None,
|
||||
)
|
||||
return trace
|
||||
|
||||
|
||||
# Flow main trace
|
||||
def init_flow_main_trace(
|
||||
func: Callable[..., Awaitable[Any]],
|
||||
) -> Callable[..., Awaitable[Any]]:
|
||||
"""Decorator to initialize and track the main flow execution trace.
|
||||
|
||||
Args:
|
||||
func: The async flow function to be traced.
|
||||
|
||||
Returns:
|
||||
Wrapped async function that creates and manages the main flow trace context.
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
|
||||
if not should_trace():
|
||||
return await func(self, *args, **kwargs)
|
||||
|
||||
trace = build_flow_main_trace(self, *args, **kwargs)
|
||||
with TraceContext.set_current(trace):
|
||||
try:
|
||||
return await func(self, *args, **kwargs)
|
||||
except Exception:
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def build_flow_main_trace(
|
||||
self: Any, *args: Any, **kwargs: Any
|
||||
) -> "UnifiedTraceController":
|
||||
"""Build the main trace controller for a flow execution.
|
||||
|
||||
Args:
|
||||
self: The flow instance.
|
||||
*args: Variable positional arguments.
|
||||
**kwargs: Variable keyword arguments.
|
||||
|
||||
Returns:
|
||||
UnifiedTraceController: The configured trace controller for the flow.
|
||||
"""
|
||||
current_trace = TraceContext.get_current()
|
||||
trace = UnifiedTraceController(
|
||||
trace_type=TraceType.FLOW_STEP,
|
||||
run_id=current_trace.run_id if current_trace else str(self.flow_id),
|
||||
parent_trace_id=current_trace.trace_id if current_trace else None,
|
||||
crew_type=CrewType.FLOW,
|
||||
run_type=RunType.KICKOFF,
|
||||
context={
|
||||
"crew_name": self.__class__.__name__,
|
||||
"inputs": kwargs.get("inputs", {}),
|
||||
"agents": [],
|
||||
"tasks": [],
|
||||
},
|
||||
)
|
||||
return trace
|
||||
|
||||
|
||||
# Flow step trace
|
||||
def trace_flow_step(
|
||||
func: Callable[..., Awaitable[Any]],
|
||||
) -> Callable[..., Awaitable[Any]]:
|
||||
"""Decorator to trace individual flow step executions.
|
||||
|
||||
Args:
|
||||
func: The async flow step function to be traced.
|
||||
|
||||
Returns:
|
||||
Wrapped async function that creates and manages the flow step trace context.
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
async def wrapper(
|
||||
self: Any,
|
||||
method_name: str,
|
||||
method: Callable[..., Any],
|
||||
*args: Any,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
if not should_trace():
|
||||
return await func(self, method_name, method, *args, **kwargs)
|
||||
|
||||
trace = build_flow_step_trace(self, method_name, method, *args, **kwargs)
|
||||
with TraceContext.set_current(trace):
|
||||
trace.start_trace()
|
||||
try:
|
||||
result = await func(self, method_name, method, *args, **kwargs)
|
||||
trace.end_trace(result=result)
|
||||
return result
|
||||
except Exception as e:
|
||||
trace.end_trace(error=str(e))
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def build_flow_step_trace(
|
||||
self: Any, method_name: str, method: Callable[..., Any], *args: Any, **kwargs: Any
|
||||
) -> "UnifiedTraceController":
|
||||
"""Build a trace controller for an individual flow step.
|
||||
|
||||
Args:
|
||||
self: The flow instance.
|
||||
method_name: Name of the method being executed.
|
||||
method: The actual method being executed.
|
||||
*args: Variable positional arguments.
|
||||
**kwargs: Variable keyword arguments.
|
||||
|
||||
Returns:
|
||||
UnifiedTraceController: The configured trace controller for the flow step.
|
||||
"""
|
||||
current_trace = TraceContext.get_current()
|
||||
|
||||
# Get method signature
|
||||
sig = inspect.signature(method)
|
||||
params = list(sig.parameters.values())
|
||||
|
||||
# Create inputs dictionary mapping parameter names to values
|
||||
method_params = [p for p in params if p.name != "self"]
|
||||
inputs: Dict[str, Any] = {}
|
||||
|
||||
# Map positional args to their parameter names
|
||||
for i, param in enumerate(method_params):
|
||||
if i < len(args):
|
||||
inputs[param.name] = args[i]
|
||||
|
||||
# Add keyword arguments
|
||||
inputs.update(kwargs)
|
||||
|
||||
trace = UnifiedTraceController(
|
||||
trace_type=TraceType.FLOW_STEP,
|
||||
run_type=current_trace.run_type if current_trace else RunType.KICKOFF,
|
||||
crew_type=current_trace.crew_type if current_trace else CrewType.FLOW,
|
||||
run_id=current_trace.run_id if current_trace else str(self.flow_id),
|
||||
parent_trace_id=current_trace.trace_id if current_trace else None,
|
||||
flow_step={
|
||||
"function_name": method_name,
|
||||
"inputs": inputs,
|
||||
"metadata": {
|
||||
"crew_name": self.__class__.__name__,
|
||||
},
|
||||
},
|
||||
)
|
||||
return trace
|
||||
|
||||
|
||||
# LLM trace
|
||||
def trace_llm_call(func: Callable[..., Any]) -> Callable[..., Any]:
|
||||
"""Decorator to trace LLM calls.
|
||||
|
||||
Args:
|
||||
func: The function to trace.
|
||||
|
||||
Returns:
|
||||
Wrapped function that creates and manages the LLM call trace context.
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
|
||||
if not should_trace():
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
trace = build_llm_trace(self, *args, **kwargs)
|
||||
with TraceContext.set_current(trace):
|
||||
trace.start_trace()
|
||||
try:
|
||||
response = func(self, *args, **kwargs)
|
||||
# Extract relevant data from response
|
||||
trace_response = {
|
||||
"content": response["choices"][0]["message"]["content"],
|
||||
"finish_reason": response["choices"][0].get("finish_reason"),
|
||||
}
|
||||
|
||||
# Add usage metrics to context
|
||||
if "usage" in response:
|
||||
trace.context["tokens_used"] = response["usage"].get(
|
||||
"total_tokens", 0
|
||||
)
|
||||
trace.context["prompt_tokens"] = response["usage"].get(
|
||||
"prompt_tokens", 0
|
||||
)
|
||||
trace.context["completion_tokens"] = response["usage"].get(
|
||||
"completion_tokens", 0
|
||||
)
|
||||
|
||||
trace.end_trace(trace_response)
|
||||
return response
|
||||
except Exception as e:
|
||||
trace.end_trace(error=str(e))
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def build_llm_trace(
|
||||
self: Any, params: Dict[str, Any], *args: Any, **kwargs: Any
|
||||
) -> Any:
|
||||
"""Build a trace controller for an LLM call.
|
||||
|
||||
Args:
|
||||
self: The LLM instance.
|
||||
params: The parameters for the LLM call.
|
||||
*args: Variable positional arguments.
|
||||
**kwargs: Variable keyword arguments.
|
||||
|
||||
Returns:
|
||||
UnifiedTraceController: The configured trace controller for the LLM call.
|
||||
"""
|
||||
current_trace = TraceContext.get_current()
|
||||
agent, task = self._get_execution_context()
|
||||
|
||||
# Get new messages and tool results
|
||||
new_messages = self._get_new_messages(params.get("messages", []))
|
||||
new_tool_results = self._get_new_tool_results(agent)
|
||||
|
||||
# Create trace context
|
||||
trace = UnifiedTraceController(
|
||||
trace_type=TraceType.TOOL_CALL if new_tool_results else TraceType.LLM_CALL,
|
||||
crew_type=current_trace.crew_type if current_trace else CrewType.CREW,
|
||||
run_type=current_trace.run_type if current_trace else RunType.KICKOFF,
|
||||
run_id=current_trace.run_id if current_trace else str(uuid4()),
|
||||
parent_trace_id=current_trace.trace_id if current_trace else None,
|
||||
agent_role=agent.role if agent else "unknown",
|
||||
task_id=str(task.id) if task else None,
|
||||
task_name=task.name if task else None,
|
||||
task_description=task.description if task else None,
|
||||
model=self.model,
|
||||
messages=new_messages,
|
||||
temperature=self.temperature,
|
||||
max_tokens=self.max_tokens,
|
||||
stop_sequences=self.stop,
|
||||
tool_calls=[
|
||||
ToolCall(
|
||||
name=result["tool_name"],
|
||||
arguments=result["tool_args"],
|
||||
output=str(result["result"]),
|
||||
start_time=result.get("start_time", ""),
|
||||
end_time=datetime.now(UTC),
|
||||
)
|
||||
for result in new_tool_results
|
||||
],
|
||||
)
|
||||
return trace
|
||||
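Note: every decorator in this deleted module is gated by `should_trace()`, which reads the `CREWAI_ENABLE_TRACING` environment variable, so tracing was opt-in with near-zero overhead when disabled. A minimal standalone sketch of that gating pattern (illustrative names, not crewAI code):

```python
import os
from functools import wraps


def should_trace() -> bool:
    # Tracing is opt-in: anything other than "true" disables it.
    return os.getenv("CREWAI_ENABLE_TRACING", "false").lower() == "true"


def traced(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if not should_trace():
            return func(*args, **kwargs)  # fast path when tracing is off
        print(f"trace start: {func.__name__}")
        try:
            return func(*args, **kwargs)
        finally:
            print(f"trace end: {func.__name__}")
    return wrapper


@traced
def add(a: int, b: int) -> int:
    return a + b


os.environ["CREWAI_ENABLE_TRACING"] = "true"
print(add(2, 3))  # prints the trace markers around the call
```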
@@ -34,6 +34,7 @@ from .tool_usage_events import (
     ToolUsageEvent,
     ToolValidateInputErrorEvent,
 )
+from .llm_events import LLMCallCompletedEvent, LLMCallFailedEvent, LLMCallStartedEvent

 # events
 from .event_listener import EventListener
@@ -1,9 +1,17 @@
-from pydantic import PrivateAttr
+from typing import Any, Dict
+
+from pydantic import Field, PrivateAttr

+from crewai.task import Task
 from crewai.telemetry.telemetry import Telemetry
 from crewai.utilities import Logger
 from crewai.utilities.constants import EMITTER_COLOR
 from crewai.utilities.events.base_event_listener import BaseEventListener
+from crewai.utilities.events.llm_events import (
+    LLMCallCompletedEvent,
+    LLMCallFailedEvent,
+    LLMCallStartedEvent,
+)

 from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent
 from .crew_events import (
@@ -37,6 +45,7 @@ class EventListener(BaseEventListener):
    _instance = None
    _telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
    logger = Logger(verbose=True, default_color=EMITTER_COLOR)
+    execution_spans: Dict[Task, Any] = Field(default_factory=dict)

    def __new__(cls):
        if cls._instance is None:
@@ -49,6 +58,7 @@ class EventListener(BaseEventListener):
            super().__init__()
            self._telemetry = Telemetry()
            self._telemetry.set_tracer()
+            self.execution_spans = {}
            self._initialized = True

    # ----------- CREW EVENTS -----------
@@ -57,7 +67,7 @@ class EventListener(BaseEventListener):
        @crewai_event_bus.on(CrewKickoffStartedEvent)
        def on_crew_started(source, event: CrewKickoffStartedEvent):
            self.logger.log(
-                f"🚀 Crew '{event.crew_name}' started",
+                f"🚀 Crew '{event.crew_name}' started, {source.id}",
                event.timestamp,
            )
            self._telemetry.crew_execution_span(source, event.inputs)
@@ -67,28 +77,28 @@ class EventListener(BaseEventListener):
            final_string_output = event.output.raw
            self._telemetry.end_crew(source, final_string_output)
            self.logger.log(
-                f"✅ Crew '{event.crew_name}' completed",
+                f"✅ Crew '{event.crew_name}' completed, {source.id}",
                event.timestamp,
            )

        @crewai_event_bus.on(CrewKickoffFailedEvent)
        def on_crew_failed(source, event: CrewKickoffFailedEvent):
            self.logger.log(
-                f"❌ Crew '{event.crew_name}' failed",
+                f"❌ Crew '{event.crew_name}' failed, {source.id}",
                event.timestamp,
            )

        @crewai_event_bus.on(CrewTestStartedEvent)
        def on_crew_test_started(source, event: CrewTestStartedEvent):
            cloned_crew = source.copy()
-            cloned_crew._telemetry.test_execution_span(
+            self._telemetry.test_execution_span(
                cloned_crew,
                event.n_iterations,
                event.inputs,
-                event.eval_llm,
+                event.eval_llm or "",
            )
            self.logger.log(
-                f"🚀 Crew '{event.crew_name}' started test",
+                f"🚀 Crew '{event.crew_name}' started test, {source.id}",
                event.timestamp,
            )
@@ -131,9 +141,9 @@ class EventListener(BaseEventListener):

        @crewai_event_bus.on(TaskStartedEvent)
        def on_task_started(source, event: TaskStartedEvent):
-            source._execution_span = self._telemetry.task_started(
-                crew=source.agent.crew, task=source
-            )
+            span = self._telemetry.task_started(crew=source.agent.crew, task=source)
+            self.execution_spans[source] = span

            self.logger.log(
                f"📋 Task started: {source.description}",
                event.timestamp,
@@ -141,24 +151,22 @@ class EventListener(BaseEventListener):

        @crewai_event_bus.on(TaskCompletedEvent)
        def on_task_completed(source, event: TaskCompletedEvent):
-            if source._execution_span:
-                self._telemetry.task_ended(
-                    source._execution_span, source, source.agent.crew
-                )
+            span = self.execution_spans.get(source)
+            if span:
+                self._telemetry.task_ended(span, source, source.agent.crew)
                self.logger.log(
                    f"✅ Task completed: {source.description}",
                    event.timestamp,
                )
-                source._execution_span = None
+                self.execution_spans[source] = None

        @crewai_event_bus.on(TaskFailedEvent)
        def on_task_failed(source, event: TaskFailedEvent):
-            if source._execution_span:
+            span = self.execution_spans.get(source)
+            if span:
                if source.agent and source.agent.crew:
-                    self._telemetry.task_ended(
-                        source._execution_span, source, source.agent.crew
-                    )
-                source._execution_span = None
+                    self._telemetry.task_ended(span, source, source.agent.crew)
+                self.execution_spans[source] = None
            self.logger.log(
                f"❌ Task failed: {source.description}",
                event.timestamp,
@@ -184,7 +192,7 @@ class EventListener(BaseEventListener):

        @crewai_event_bus.on(FlowCreatedEvent)
        def on_flow_created(source, event: FlowCreatedEvent):
-            self._telemetry.flow_creation_span(self.__class__.__name__)
+            self._telemetry.flow_creation_span(event.flow_name)
            self.logger.log(
                f"🌊 Flow Created: '{event.flow_name}'",
                event.timestamp,
@@ -193,17 +201,17 @@ class EventListener(BaseEventListener):
        @crewai_event_bus.on(FlowStartedEvent)
        def on_flow_started(source, event: FlowStartedEvent):
            self._telemetry.flow_execution_span(
-                source.__class__.__name__, list(source._methods.keys())
+                event.flow_name, list(source._methods.keys())
            )
            self.logger.log(
-                f"🤖 Flow Started: '{event.flow_name}'",
+                f"🤖 Flow Started: '{event.flow_name}', {source.flow_id}",
                event.timestamp,
            )

        @crewai_event_bus.on(FlowFinishedEvent)
        def on_flow_finished(source, event: FlowFinishedEvent):
            self.logger.log(
-                f"👍 Flow Finished: '{event.flow_name}'",
+                f"👍 Flow Finished: '{event.flow_name}', {source.flow_id}",
                event.timestamp,
            )
@@ -253,5 +261,28 @@ class EventListener(BaseEventListener):
                #
            )

+        # ----------- LLM EVENTS -----------
+
+        @crewai_event_bus.on(LLMCallStartedEvent)
+        def on_llm_call_started(source, event: LLMCallStartedEvent):
+            self.logger.log(
+                f"🤖 LLM Call Started",
+                event.timestamp,
+            )
+
+        @crewai_event_bus.on(LLMCallCompletedEvent)
+        def on_llm_call_completed(source, event: LLMCallCompletedEvent):
+            self.logger.log(
+                f"✅ LLM Call Completed",
+                event.timestamp,
+            )
+
+        @crewai_event_bus.on(LLMCallFailedEvent)
+        def on_llm_call_failed(source, event: LLMCallFailedEvent):
+            self.logger.log(
+                f"❌ LLM Call Failed: '{event.error}'",
+                event.timestamp,
+            )


event_listener = EventListener()
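Note: the task hunks above move span bookkeeping off the task object (`source._execution_span`) into an `execution_spans` dict keyed by Task, so the listener owns the state instead of mutating sources. A minimal standalone sketch of the lookup pattern (the `Task` class and span strings here are illustrative):

```python
from typing import Any, Dict


class Task:
    """Hypothetical stand-in for crewai.Task; hashable by identity."""

    def __init__(self, name: str) -> None:
        self.name = name


execution_spans: Dict[Task, Any] = {}


def on_task_started(task: Task) -> None:
    execution_spans[task] = f"span-for-{task.name}"  # stand-in for a telemetry span


def on_task_completed(task: Task) -> None:
    span = execution_spans.get(task)  # .get avoids KeyError for untracked tasks
    if span:
        print(f"closing {span}")
        execution_spans[task] = None


t = Task("research")
on_task_started(t)
on_task_completed(t)
```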
src/crewai/utilities/events/llm_events.py (new file, 36 lines)
@@ -0,0 +1,36 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from crewai.utilities.events.base_events import CrewEvent


class LLMCallType(Enum):
    """Type of LLM call being made"""

    TOOL_CALL = "tool_call"
    LLM_CALL = "llm_call"


class LLMCallStartedEvent(CrewEvent):
    """Event emitted when a LLM call starts"""

    type: str = "llm_call_started"
    messages: Union[str, List[Dict[str, str]]]
    tools: Optional[List[dict]] = None
    callbacks: Optional[List[Any]] = None
    available_functions: Optional[Dict[str, Any]] = None


class LLMCallCompletedEvent(CrewEvent):
    """Event emitted when a LLM call completes"""

    type: str = "llm_call_completed"
    response: Any
    call_type: LLMCallType


class LLMCallFailedEvent(CrewEvent):
    """Event emitted when a LLM call fails"""

    error: str
    type: str = "llm_call_failed"
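Note: handlers for these events register on the bus with the same decorator the event_listener diff above uses. A minimal custom subscriber, assuming the `crewai_event_bus.on(EventType)` decorator and `(source, event)` handler signature behave as shown in that diff:

```python
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.llm_events import (
    LLMCallCompletedEvent,
    LLMCallFailedEvent,
)


@crewai_event_bus.on(LLMCallCompletedEvent)
def log_completion(source, event: LLMCallCompletedEvent):
    # call_type distinguishes plain completions from tool-call rounds
    print(f"LLM call finished ({event.call_type}): {event.response!r}")


@crewai_event_bus.on(LLMCallFailedEvent)
def log_failure(source, event: LLMCallFailedEvent):
    print(f"LLM call failed: {event.error}")
```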
@@ -1,4 +1,4 @@
-from typing import Any, Optional
+from typing import Optional

 from crewai.tasks.task_output import TaskOutput
 from crewai.utilities.events.base_events import CrewEvent
@@ -1,12 +0,0 @@
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class AgentExecutorProtocol(Protocol):
    """Protocol defining the expected interface for an agent executor."""

    @property
    def agent(self) -> Any: ...

    @property
    def task(self) -> Any: ...
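Note: `@runtime_checkable` is what let the deleted `_get_execution_context` helper test arbitrary frame locals with `isinstance`; the check is structural (by member names), with no inheritance required. A standalone sketch (the `FakeExecutor` class is illustrative):

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class AgentExecutorProtocol(Protocol):
    @property
    def agent(self) -> Any: ...

    @property
    def task(self) -> Any: ...


class FakeExecutor:
    # No inheritance needed: isinstance matches on the attribute names.
    agent = "agent"
    task = "task"


print(isinstance(FakeExecutor(), AgentExecutorProtocol))  # True
print(isinstance(object(), AgentExecutorProtocol))        # False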
@@ -915,8 +915,6 @@ def test_tool_result_as_answer_is_the_final_answer_for_the_agent():

 @pytest.mark.vcr(filter_headers=["authorization"])
 def test_tool_usage_information_is_appended_to_agent():
-    from datetime import UTC, datetime
-
     from crewai.tools import BaseTool

     class MyCustomTool(BaseTool):
@@ -926,36 +924,30 @@ def test_tool_usage_information_is_appended_to_agent():
        def _run(self) -> str:
            return "Howdy!"

-    fixed_datetime = datetime(2025, 2, 10, 12, 0, 0, tzinfo=UTC)
-    with patch("datetime.datetime") as mock_datetime:
-        mock_datetime.now.return_value = fixed_datetime
-        mock_datetime.side_effect = lambda *args, **kw: datetime(*args, **kw)
-        agent1 = Agent(
-            role="Friendly Neighbor",
-            goal="Make everyone feel welcome",
-            backstory="You are the friendly neighbor",
-            tools=[MyCustomTool(result_as_answer=True)],
-        )
-
+    agent1 = Agent(
+        role="Friendly Neighbor",
+        goal="Make everyone feel welcome",
+        backstory="You are the friendly neighbor",
+        tools=[MyCustomTool(result_as_answer=True)],
+    )
-        greeting = Task(
-            description="Say an appropriate greeting.",
-            expected_output="The greeting.",
-            agent=agent1,
-        )
-        tasks = [greeting]
-        crew = Crew(agents=[agent1], tasks=tasks)
-
+    greeting = Task(
+        description="Say an appropriate greeting.",
+        expected_output="The greeting.",
+        agent=agent1,
+    )
+    tasks = [greeting]
+    crew = Crew(agents=[agent1], tasks=tasks)

-        crew.kickoff()
-        assert agent1.tools_results == [
-            {
-                "result": "Howdy!",
-                "tool_name": "Decide Greetings",
-                "tool_args": {},
-                "result_as_answer": True,
-                "start_time": fixed_datetime,
-            }
-        ]
+    crew.kickoff()
+    assert agent1.tools_results == [
+        {
+            "result": "Howdy!",
+            "tool_name": "Decide Greetings",
+            "tool_args": {},
+            "result_as_answer": True,
+        }
+    ]


def test_agent_definition_based_on_dict():
@@ -833,6 +833,12 @@ def test_crew_verbose_output(capsys):

    crew.kickoff()
    captured = capsys.readouterr()

+    # Filter out event listener logs (lines starting with '[')
+    filtered_output = "\n".join(
+        line for line in captured.out.split("\n") if not line.startswith("[")
+    )
+
    expected_strings = [
        "\x1b[1m\x1b[95m# Agent:\x1b[00m \x1b[1m\x1b[92mResearcher",
        "\x1b[00m\n\x1b[95m## Task:\x1b[00m \x1b[92mResearch AI advancements.",
@@ -845,27 +851,19 @@ def test_crew_verbose_output(capsys):
    ]

    for expected_string in expected_strings:
-        assert expected_string in captured.out
+        assert expected_string in filtered_output

    # Now test with verbose set to False
    crew.verbose = False
    crew._logger = Logger(verbose=False)
    crew.kickoff()
-    expected_listener_logs = [
-        "[🚀 CREW 'CREW' STARTED]",
-        "[📋 TASK STARTED: RESEARCH AI ADVANCEMENTS.]",
-        "[🤖 AGENT 'RESEARCHER' STARTED TASK]",
-        "[✅ AGENT 'RESEARCHER' COMPLETED TASK]",
-        "[✅ TASK COMPLETED: RESEARCH AI ADVANCEMENTS.]",
-        "[📋 TASK STARTED: WRITE ABOUT AI IN HEALTHCARE.]",
-        "[🤖 AGENT 'SENIOR WRITER' STARTED TASK]",
-        "[✅ AGENT 'SENIOR WRITER' COMPLETED TASK]",
-        "[✅ TASK COMPLETED: WRITE ABOUT AI IN HEALTHCARE.]",
-        "[✅ CREW 'CREW' COMPLETED]",
-    ]
    captured = capsys.readouterr()
-    for log in expected_listener_logs:
-        assert log in captured.out
+    filtered_output = "\n".join(
+        line
+        for line in captured.out.split("\n")
+        if not line.startswith("[") and line.strip() and not line.startswith("\x1b")
+    )
+    assert filtered_output == ""


@pytest.mark.vcr(filter_headers=["authorization"])
@@ -1,360 +0,0 @@
|
||||
import os
|
||||
from datetime import UTC, datetime
|
||||
from unittest.mock import MagicMock, patch
|
||||
from uuid import UUID
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.traces.context import TraceContext
|
||||
from crewai.traces.enums import CrewType, RunType, TraceType
|
||||
from crewai.traces.models import (
|
||||
CrewTrace,
|
||||
FlowStepIO,
|
||||
LLMRequest,
|
||||
LLMResponse,
|
||||
)
|
||||
from crewai.traces.unified_trace_controller import (
|
||||
UnifiedTraceController,
|
||||
init_crew_main_trace,
|
||||
init_flow_main_trace,
|
||||
should_trace,
|
||||
trace_flow_step,
|
||||
trace_llm_call,
|
||||
)
|
||||
|
||||
|
||||
class TestUnifiedTraceController:
|
||||
@pytest.fixture
|
||||
def basic_trace_controller(self):
|
||||
return UnifiedTraceController(
|
||||
trace_type=TraceType.LLM_CALL,
|
||||
run_type=RunType.KICKOFF,
|
||||
crew_type=CrewType.CREW,
|
||||
run_id="test-run-id",
|
||||
agent_role="test-agent",
|
||||
task_name="test-task",
|
||||
task_description="test description",
|
||||
task_id="test-task-id",
|
||||
)
|
||||
|
||||
def test_initialization(self, basic_trace_controller):
|
||||
"""Test basic initialization of UnifiedTraceController"""
|
||||
assert basic_trace_controller.trace_type == TraceType.LLM_CALL
|
||||
assert basic_trace_controller.run_type == RunType.KICKOFF
|
||||
assert basic_trace_controller.crew_type == CrewType.CREW
|
||||
assert basic_trace_controller.run_id == "test-run-id"
|
||||
assert basic_trace_controller.agent_role == "test-agent"
|
||||
assert basic_trace_controller.task_name == "test-task"
|
||||
assert basic_trace_controller.task_description == "test description"
|
||||
assert basic_trace_controller.task_id == "test-task-id"
|
||||
assert basic_trace_controller.status == "running"
|
||||
assert isinstance(UUID(basic_trace_controller.trace_id), UUID)
|
||||
|
||||
def test_start_trace(self, basic_trace_controller):
|
||||
"""Test starting a trace"""
|
||||
result = basic_trace_controller.start_trace()
|
||||
assert result == basic_trace_controller
|
||||
assert basic_trace_controller.start_time is not None
|
||||
assert isinstance(basic_trace_controller.start_time, datetime)
|
||||
|
||||
def test_end_trace_success(self, basic_trace_controller):
|
||||
"""Test ending a trace successfully"""
|
||||
basic_trace_controller.start_trace()
|
||||
basic_trace_controller.end_trace(result={"test": "result"})
|
||||
|
||||
assert basic_trace_controller.end_time is not None
|
||||
assert basic_trace_controller.status == "completed"
|
||||
assert basic_trace_controller.error is None
|
||||
assert basic_trace_controller.context.get("response") == {"test": "result"}
|
||||
|
||||
def test_end_trace_with_error(self, basic_trace_controller):
|
||||
"""Test ending a trace with an error"""
|
||||
basic_trace_controller.start_trace()
|
||||
basic_trace_controller.end_trace(error="Test error occurred")
|
||||
|
||||
assert basic_trace_controller.end_time is not None
|
||||
assert basic_trace_controller.status == "error"
|
||||
assert basic_trace_controller.error == "Test error occurred"
|
||||
|
||||
def test_add_child_trace(self, basic_trace_controller):
|
||||
"""Test adding a child trace"""
|
||||
child_trace = {"id": "child-1", "type": "test"}
|
||||
basic_trace_controller.add_child_trace(child_trace)
|
||||
assert len(basic_trace_controller.children) == 1
|
||||
assert basic_trace_controller.children[0] == child_trace
|
||||
|
||||
def test_to_crew_trace_llm_call(self):
|
||||
"""Test converting to CrewTrace for LLM call"""
|
||||
test_messages = [{"role": "user", "content": "test"}]
|
||||
test_response = {
|
||||
"content": "test response",
|
||||
"finish_reason": "stop",
|
||||
}
|
||||
|
||||
controller = UnifiedTraceController(
|
||||
trace_type=TraceType.LLM_CALL,
|
||||
run_type=RunType.KICKOFF,
|
||||
crew_type=CrewType.CREW,
|
||||
run_id="test-run-id",
|
||||
context={
|
||||
"messages": test_messages,
|
||||
"temperature": 0.7,
|
||||
"max_tokens": 100,
|
||||
},
|
||||
)
|
||||
|
||||
# Set model and messages in the context
|
||||
controller.context["model"] = "gpt-4"
|
||||
controller.context["messages"] = test_messages
|
||||
|
||||
controller.start_trace()
|
||||
controller.end_trace(result=test_response)
|
||||
|
||||
crew_trace = controller.to_crew_trace()
|
||||
assert isinstance(crew_trace, CrewTrace)
|
||||
assert isinstance(crew_trace.request, LLMRequest)
|
||||
assert isinstance(crew_trace.response, LLMResponse)
|
||||
assert crew_trace.request.model == "gpt-4"
|
||||
assert crew_trace.request.messages == test_messages
|
||||
assert crew_trace.response.content == test_response["content"]
|
||||
assert crew_trace.response.finish_reason == test_response["finish_reason"]
|
||||
|
||||
def test_to_crew_trace_flow_step(self):
|
||||
"""Test converting to CrewTrace for flow step"""
|
||||
flow_step_data = {
|
||||
"function_name": "test_function",
|
||||
"inputs": {"param1": "value1"},
|
||||
"metadata": {"meta": "data"},
|
||||
}
|
||||
|
||||
controller = UnifiedTraceController(
|
||||
trace_type=TraceType.FLOW_STEP,
|
||||
run_type=RunType.KICKOFF,
|
||||
crew_type=CrewType.FLOW,
|
||||
run_id="test-run-id",
|
||||
flow_step=flow_step_data,
|
||||
)
|
||||
|
||||
controller.start_trace()
|
||||
controller.end_trace(result="test result")
|
||||
|
||||
crew_trace = controller.to_crew_trace()
|
||||
assert isinstance(crew_trace, CrewTrace)
|
||||
assert isinstance(crew_trace.flow_step, FlowStepIO)
|
||||
assert crew_trace.flow_step.function_name == "test_function"
|
||||
assert crew_trace.flow_step.inputs == {"param1": "value1"}
|
||||
assert crew_trace.flow_step.outputs == {"result": "test result"}
|
||||
|
||||
def test_should_trace(self):
|
||||
"""Test should_trace function"""
|
||||
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
|
||||
assert should_trace() is True
|
||||
|
||||
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "false"}):
|
||||
assert should_trace() is False
|
||||
|
||||
with patch.dict(os.environ, clear=True):
|
||||
assert should_trace() is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_trace_flow_step_decorator(self):
|
||||
"""Test trace_flow_step decorator"""
|
||||
|
||||
class TestFlow:
|
||||
flow_id = "test-flow-id"
|
||||
|
||||
@trace_flow_step
|
||||
async def test_method(self, method_name, method, *args, **kwargs):
|
||||
return "test result"
|
||||
|
||||
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
|
||||
flow = TestFlow()
|
||||
result = await flow.test_method("test_method", lambda x: x, arg1="value1")
|
||||
assert result == "test result"
|
||||
|
||||
def test_trace_llm_call_decorator(self):
|
||||
"""Test trace_llm_call decorator"""
|
||||
|
||||
class TestLLM:
|
||||
model = "gpt-4"
|
||||
temperature = 0.7
|
||||
max_tokens = 100
|
||||
stop = None
|
||||
|
||||
def _get_execution_context(self):
|
||||
return MagicMock(), MagicMock()
|
||||
|
||||
def _get_new_messages(self, messages):
|
||||
return messages
|
||||
|
||||
def _get_new_tool_results(self, agent):
|
||||
return []
|
||||
|
||||
@trace_llm_call
|
||||
def test_method(self, params):
|
||||
return {
|
||||
"choices": [
|
||||
{
|
||||
"message": {"content": "test response"},
|
||||
"finish_reason": "stop",
|
||||
}
|
||||
],
|
||||
"usage": {
|
||||
"total_tokens": 50,
|
||||
"prompt_tokens": 20,
|
||||
"completion_tokens": 30,
|
||||
},
|
||||
}
|
||||
|
||||
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
|
||||
llm = TestLLM()
|
||||
result = llm.test_method({"messages": []})
|
||||
assert result["choices"][0]["message"]["content"] == "test response"
|
||||
|
||||

    def test_init_crew_main_trace_kickoff(self):
        """Test init_crew_main_trace in kickoff mode"""
        trace_context = None

        class TestCrew:
            id = "test-crew-id"
            _test = False
            _train = False

            @init_crew_main_trace
            def test_method(self):
                nonlocal trace_context
                trace_context = TraceContext.get_current()
                return "test result"

        with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
            crew = TestCrew()
            result = crew.test_method()
            assert result == "test result"
            assert trace_context is not None
            assert trace_context.trace_type == TraceType.LLM_CALL
            assert trace_context.run_type == RunType.KICKOFF
            assert trace_context.crew_type == CrewType.CREW
            assert trace_context.run_id == str(crew.id)

    def test_init_crew_main_trace_test_mode(self):
        """Test init_crew_main_trace in test mode"""
        trace_context = None

        class TestCrew:
            id = "test-crew-id"
            _test = True
            _train = False

            @init_crew_main_trace
            def test_method(self):
                nonlocal trace_context
                trace_context = TraceContext.get_current()
                return "test result"

        with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
            crew = TestCrew()
            result = crew.test_method()
            assert result == "test result"
            assert trace_context is not None
            assert trace_context.run_type == RunType.TEST

    def test_init_crew_main_trace_train_mode(self):
        """Test init_crew_main_trace in train mode"""
        trace_context = None

        class TestCrew:
            id = "test-crew-id"
            _test = False
            _train = True

            @init_crew_main_trace
            def test_method(self):
                nonlocal trace_context
                trace_context = TraceContext.get_current()
                return "test result"

        with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
            crew = TestCrew()
            result = crew.test_method()
            assert result == "test result"
            assert trace_context is not None
            assert trace_context.run_type == RunType.TRAIN
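Taken together, the three tests above pin down how init_crew_main_trace is expected to pick a run type from the crew's private flags. The following is a hypothetical sketch of the implied selection logic, not the decorator's actual source; the attribute names come from the test doubles, and the precedence between _test and _train is an assumption since the tests never set both:

def _select_run_type(crew) -> RunType:
    # Hypothetical helper mirroring the three tests above. The tests only
    # exercise the flags one at a time, so _test-before-_train ordering
    # here is an assumption.
    if getattr(crew, "_test", False):
        return RunType.TEST
    if getattr(crew, "_train", False):
        return RunType.TRAIN
    return RunType.KICKOFF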

    @pytest.mark.asyncio
    async def test_init_flow_main_trace(self):
        """Test init_flow_main_trace decorator"""
        trace_context = None
        test_inputs = {"test": "input"}

        class TestFlow:
            flow_id = "test-flow-id"

            @init_flow_main_trace
            async def test_method(self, **kwargs):
                nonlocal trace_context
                trace_context = TraceContext.get_current()
                # Verify the context is set during execution
                assert trace_context.context["context"]["inputs"] == test_inputs
                return "test result"

        with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
            flow = TestFlow()
            result = await flow.test_method(inputs=test_inputs)
            assert result == "test result"
            assert trace_context is not None
            assert trace_context.trace_type == TraceType.FLOW_STEP
            assert trace_context.crew_type == CrewType.FLOW
            assert trace_context.run_type == RunType.KICKOFF
            assert trace_context.run_id == str(flow.flow_id)
            assert trace_context.context["context"]["inputs"] == test_inputs

    def test_trace_context_management(self):
        """Test TraceContext management"""
        trace1 = UnifiedTraceController(
            trace_type=TraceType.LLM_CALL,
            run_type=RunType.KICKOFF,
            crew_type=CrewType.CREW,
            run_id="test-run-1",
        )

        trace2 = UnifiedTraceController(
            trace_type=TraceType.FLOW_STEP,
            run_type=RunType.TEST,
            crew_type=CrewType.FLOW,
            run_id="test-run-2",
        )

        # Test that context is initially empty
        assert TraceContext.get_current() is None

        # Test setting and getting context
        with TraceContext.set_current(trace1):
            assert TraceContext.get_current() == trace1

            # Test nested context
            with TraceContext.set_current(trace2):
                assert TraceContext.get_current() == trace2

            # Test context restoration after nested block
            assert TraceContext.get_current() == trace1

        # Test context cleanup after with block
        assert TraceContext.get_current() is None

    def test_trace_context_error_handling(self):
        """Test TraceContext error handling"""
        trace = UnifiedTraceController(
            trace_type=TraceType.LLM_CALL,
            run_type=RunType.KICKOFF,
            crew_type=CrewType.CREW,
            run_id="test-run",
        )

        # Test that context is properly cleaned up even if an error occurs
        try:
            with TraceContext.set_current(trace):
                raise ValueError("Test error")
        except ValueError:
            pass

        assert TraceContext.get_current() is None
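The gate exercised by test_should_trace above is, by its three assertions, a plain environment-variable check: only an explicit CREWAI_ENABLE_TRACING=true turns tracing on, while "false" or an unset variable leaves it off. A minimal sketch consistent with those cases follows; the real implementation lives in crewai.traces.unified_trace_controller and may read the variable differently:

import os

def should_trace() -> bool:
    # Sketch only: tracing is opt-in, defaulting to disabled when the
    # variable is absent, matching the clear-environment case above.
    return os.getenv("CREWAI_ENABLE_TRACING", "false").lower() == "true"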
File diff suppressed because one or more lines are too long
@@ -0,0 +1,236 @@
interactions:
- request:
    body: '{"messages": [{"role": "system", "content": "You are base_agent. You are
      a helpful assistant that just says hi\nYour personal goal is: Just say hi\nTo
      give my best complete final answer to the task respond using the exact following
      format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
      answer must be the great and the most complete as possible, it must be outcome
      described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
      "content": "\nCurrent Task: Just say hi\n\nThis is the expected criteria for
      your final answer: hi\nyou MUST return the actual complete content as the final
      answer, not a summary.\n\nBegin! This is VERY important to you, use the tools
      available and give your best Final Answer, your job depends on it!\n\nThought:"}],
      "model": "gpt-4o-mini", "stop": ["\nObservation:"]}'
    headers:
      accept:
      - application/json
      accept-encoding:
      - gzip, deflate
      connection:
      - keep-alive
      content-length:
      - '838'
      content-type:
      - application/json
      host:
      - api.openai.com
      user-agent:
      - OpenAI/Python 1.61.0
      x-stainless-arch:
      - arm64
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - MacOS
      x-stainless-package-version:
      - 1.61.0
      x-stainless-raw-response:
      - 'true'
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.12.8
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    content: "{\n \"id\": \"chatcmpl-B4VsaBZ4ec4b0ab4pkqWgyxTFVVfc\",\n \"object\":
      \"chat.completion\",\n \"created\": 1740415556,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
      \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
      \"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
      Answer: hi\",\n \"refusal\": null\n },\n \"logprobs\": null,\n
      \ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
      161,\n \"completion_tokens\": 12,\n \"total_tokens\": 173,\n \"prompt_tokens_details\":
      {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
      {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
      0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
      \"default\",\n \"system_fingerprint\": \"fp_7fcd609668\"\n}\n"
    headers:
      CF-RAY:
      - 9170edc5da6f230e-SJC
      Connection:
      - keep-alive
      Content-Encoding:
      - gzip
      Content-Type:
      - application/json
      Date:
      - Mon, 24 Feb 2025 16:45:57 GMT
      Server:
      - cloudflare
      Set-Cookie:
      - __cf_bm=lvRw4Nyef7N35to64fj2_kHDfbZp0KSFbwgF5chYMRI-1740415557-1.0.1.1-o5BaN1FpBwv5Wq6zIlv0rCB28lk5hVI9wZQWU3pig1jgyAKDkYzTwZ0MlSR6v6TPIX9RfepjrO3.Gk3FEmcVRw;
        path=/; expires=Mon, 24-Feb-25 17:15:57 GMT; domain=.api.openai.com; HttpOnly;
        Secure; SameSite=None
      - _cfuvid=ySaVoTQvAcQyH5QoJQJDj75e5j8HwGFPOlFMAWEvXJk-1740415557302-0.0.1.1-604800000;
        path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
      Transfer-Encoding:
      - chunked
      X-Content-Type-Options:
      - nosniff
      access-control-expose-headers:
      - X-Request-ID
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      openai-organization:
      - crewai-iuxna1
      openai-processing-ms:
      - '721'
      openai-version:
      - '2020-10-01'
      strict-transport-security:
      - max-age=31536000; includeSubDomains; preload
      x-ratelimit-limit-requests:
      - '30000'
      x-ratelimit-limit-tokens:
      - '150000000'
      x-ratelimit-remaining-requests:
      - '29999'
      x-ratelimit-remaining-tokens:
      - '149999808'
      x-ratelimit-reset-requests:
      - 2ms
      x-ratelimit-reset-tokens:
      - 0s
      x-request-id:
      - req_fc3b3bcd4382cddaa3c04ce7003e4857
    http_version: HTTP/1.1
    status_code: 200
- request:
    body: '{"messages": [{"role": "system", "content": "You are Task Execution Evaluator.
      Evaluator agent for crew evaluation with precise capabilities to evaluate the
      performance of the agents in the crew based on the tasks they have performed\nYour
      personal goal is: Your goal is to evaluate the performance of the agents in
      the crew based on the tasks they have performed using score from 1 to 10 evaluating
      on completion, quality, and overall performance.\nTo give my best complete final
      answer to the task respond using the exact following format:\n\nThought: I now
      can give a great answer\nFinal Answer: Your final answer must be the great and
      the most complete as possible, it must be outcome described.\n\nI MUST use these
      formats, my job depends on it!"}, {"role": "user", "content": "\nCurrent Task:
      Based on the task description and the expected output, compare and evaluate
      the performance of the agents in the crew based on the Task Output they have
      performed using score from 1 to 10 evaluating on completion, quality, and overall
      performance.task_description: Just say hi task_expected_output: hi agent: base_agent
      agent_goal: Just say hi Task Output: hi\n\nThis is the expected criteria for
      your final answer: Evaluation Score from 1 to 10 based on the performance of
      the agents on the tasks\nyou MUST return the actual complete content as the
      final answer, not a summary.\nEnsure your final answer contains only the content
      in the following format: {\n \"quality\": float\n}\n\nEnsure the final output
      does not include any code block markers like ```json or ```python.\n\nBegin!
      This is VERY important to you, use the tools available and give your best Final
      Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop":
      ["\nObservation:"]}'
    headers:
      accept:
      - application/json
      accept-encoding:
      - gzip, deflate
      connection:
      - keep-alive
      content-length:
      - '1765'
      content-type:
      - application/json
      cookie:
      - __cf_bm=lvRw4Nyef7N35to64fj2_kHDfbZp0KSFbwgF5chYMRI-1740415557-1.0.1.1-o5BaN1FpBwv5Wq6zIlv0rCB28lk5hVI9wZQWU3pig1jgyAKDkYzTwZ0MlSR6v6TPIX9RfepjrO3.Gk3FEmcVRw;
        _cfuvid=ySaVoTQvAcQyH5QoJQJDj75e5j8HwGFPOlFMAWEvXJk-1740415557302-0.0.1.1-604800000
      host:
      - api.openai.com
      user-agent:
      - OpenAI/Python 1.61.0
      x-stainless-arch:
      - arm64
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - MacOS
      x-stainless-package-version:
      - 1.61.0
      x-stainless-raw-response:
      - 'true'
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.12.8
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    content: "{\n \"id\": \"chatcmpl-B4Vsbd9AsRaJ2exDtWnHAwC8rIjfi\",\n \"object\":
      \"chat.completion\",\n \"created\": 1740415557,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
      \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
      \"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
      Answer: { \\n \\\"quality\\\": 10 \\n} \",\n \"refusal\": null\n
      \ },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n
      \ ],\n \"usage\": {\n \"prompt_tokens\": 338,\n \"completion_tokens\":
      22,\n \"total_tokens\": 360,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
      0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n
      \ \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
      0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
      \"default\",\n \"system_fingerprint\": \"fp_7fcd609668\"\n}\n"
    headers:
      CF-RAY:
      - 9170edd15bb5230e-SJC
      Connection:
      - keep-alive
      Content-Encoding:
      - gzip
      Content-Type:
      - application/json
      Date:
      - Mon, 24 Feb 2025 16:45:58 GMT
      Server:
      - cloudflare
      Transfer-Encoding:
      - chunked
      X-Content-Type-Options:
      - nosniff
      access-control-expose-headers:
      - X-Request-ID
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      openai-organization:
      - crewai-iuxna1
      openai-processing-ms:
      - '860'
      openai-version:
      - '2020-10-01'
      strict-transport-security:
      - max-age=31536000; includeSubDomains; preload
      x-ratelimit-limit-requests:
      - '30000'
      x-ratelimit-limit-tokens:
      - '150000000'
      x-ratelimit-remaining-requests:
      - '29999'
      x-ratelimit-remaining-tokens:
      - '149999578'
      x-ratelimit-reset-requests:
      - 2ms
      x-ratelimit-reset-tokens:
      - 0s
      x-request-id:
      - req_fad452c2d10b5fc95809130912b08837
    http_version: HTTP/1.1
    status_code: 200
version: 1
tests/utilities/cassettes/test_llm_emits_call_failed_event.yaml (new file, 103 lines)
@@ -0,0 +1,103 @@
interactions:
- request:
    body: '{"messages": [{"role": "user", "content": "Hello, how are you?"}], "model":
      "gpt-4o-mini", "stop": []}'
    headers:
      accept:
      - application/json
      accept-encoding:
      - gzip, deflate
      connection:
      - keep-alive
      content-length:
      - '102'
      content-type:
      - application/json
      cookie:
      - _cfuvid=IY8ppO70AMHr2skDSUsGh71zqHHdCQCZ3OvkPi26NBc-1740424913267-0.0.1.1-604800000;
        __cf_bm=fU6K5KZoDmgcEuF8_yWAYKUO5fKHh6q5.wDPnna393g-1740424913-1.0.1.1-2iOaq3JVGWs439V0HxJee0IC9HdJm7dPkeJorD.AGw0YwkngRPM8rrTzn_7ht1BkbOauEezj.wPKcBz18gIYUg
      host:
      - api.openai.com
      user-agent:
      - OpenAI/Python 1.61.0
      x-stainless-arch:
      - arm64
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - MacOS
      x-stainless-package-version:
      - 1.61.0
      x-stainless-raw-response:
      - 'true'
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.12.8
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    content: "{\n \"id\": \"chatcmpl-B4YLA2SrC2rwdVQ3U87G5a0P5lsLw\",\n \"object\":
      \"chat.completion\",\n \"created\": 1740425016,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
      \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
      \"assistant\",\n \"content\": \"Hello! I'm just a computer program, so
      I don't have feelings, but I'm here and ready to help you. How can I assist
      you today?\",\n \"refusal\": null\n },\n \"logprobs\": null,\n
      \ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
      13,\n \"completion_tokens\": 30,\n \"total_tokens\": 43,\n \"prompt_tokens_details\":
      {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
      {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
      0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
      \"default\",\n \"system_fingerprint\": \"fp_709714d124\"\n}\n"
    headers:
      CF-RAY:
      - 9171d4c0ed44236e-SJC
      Connection:
      - keep-alive
      Content-Encoding:
      - gzip
      Content-Type:
      - application/json
      Date:
      - Mon, 24 Feb 2025 19:23:38 GMT
      Server:
      - cloudflare
      Transfer-Encoding:
      - chunked
      X-Content-Type-Options:
      - nosniff
      access-control-expose-headers:
      - X-Request-ID
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      openai-organization:
      - crewai-iuxna1
      openai-processing-ms:
      - '1954'
      openai-version:
      - '2020-10-01'
      strict-transport-security:
      - max-age=31536000; includeSubDomains; preload
      x-ratelimit-limit-requests:
      - '30000'
      x-ratelimit-limit-tokens:
      - '150000000'
      x-ratelimit-remaining-requests:
      - '29999'
      x-ratelimit-remaining-tokens:
      - '149999978'
      x-ratelimit-reset-requests:
      - 2ms
      x-ratelimit-reset-tokens:
      - 0s
      x-request-id:
      - req_ea2703502b8827e4297cd2a7bae9d9c8
    http_version: HTTP/1.1
    status_code: 200
version: 1
tests/utilities/cassettes/test_llm_emits_call_started_event.yaml (new file, 108 lines)
@@ -0,0 +1,108 @@
interactions:
- request:
    body: '{"messages": [{"role": "user", "content": "Hello, how are you?"}], "model":
      "gpt-4o-mini", "stop": []}'
    headers:
      accept:
      - application/json
      accept-encoding:
      - gzip, deflate
      connection:
      - keep-alive
      content-length:
      - '102'
      content-type:
      - application/json
      cookie:
      - _cfuvid=GefCcEtb_Gem93E4a9Hvt3Xyof1YQZVJAXBb9I6pEUs-1739398417375-0.0.1.1-604800000
      host:
      - api.openai.com
      user-agent:
      - OpenAI/Python 1.61.0
      x-stainless-arch:
      - arm64
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - MacOS
      x-stainless-package-version:
      - 1.61.0
      x-stainless-raw-response:
      - 'true'
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.12.8
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    content: "{\n \"id\": \"chatcmpl-B4YJU8IWKGyBQtAyPDRd3SFI2flYR\",\n \"object\":
      \"chat.completion\",\n \"created\": 1740424912,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
      \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
      \"assistant\",\n \"content\": \"Hello! I'm just a computer program, so
      I don't have feelings, but I'm here and ready to help you. How can I assist
      you today?\",\n \"refusal\": null\n },\n \"logprobs\": null,\n
      \ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
      13,\n \"completion_tokens\": 30,\n \"total_tokens\": 43,\n \"prompt_tokens_details\":
      {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
      {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
      0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
      \"default\",\n \"system_fingerprint\": \"fp_7fcd609668\"\n}\n"
    headers:
      CF-RAY:
      - 9171d230d8ed7ae0-SJC
      Connection:
      - keep-alive
      Content-Encoding:
      - gzip
      Content-Type:
      - application/json
      Date:
      - Mon, 24 Feb 2025 19:21:53 GMT
      Server:
      - cloudflare
      Set-Cookie:
      - __cf_bm=fU6K5KZoDmgcEuF8_yWAYKUO5fKHh6q5.wDPnna393g-1740424913-1.0.1.1-2iOaq3JVGWs439V0HxJee0IC9HdJm7dPkeJorD.AGw0YwkngRPM8rrTzn_7ht1BkbOauEezj.wPKcBz18gIYUg;
        path=/; expires=Mon, 24-Feb-25 19:51:53 GMT; domain=.api.openai.com; HttpOnly;
        Secure; SameSite=None
      - _cfuvid=IY8ppO70AMHr2skDSUsGh71zqHHdCQCZ3OvkPi26NBc-1740424913267-0.0.1.1-604800000;
        path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
      Transfer-Encoding:
      - chunked
      X-Content-Type-Options:
      - nosniff
      access-control-expose-headers:
      - X-Request-ID
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      openai-organization:
      - crewai-iuxna1
      openai-processing-ms:
      - '993'
      openai-version:
      - '2020-10-01'
      strict-transport-security:
      - max-age=31536000; includeSubDomains; preload
      x-ratelimit-limit-requests:
      - '30000'
      x-ratelimit-limit-tokens:
      - '150000000'
      x-ratelimit-remaining-requests:
      - '29999'
      x-ratelimit-remaining-tokens:
      - '149999978'
      x-ratelimit-reset-requests:
      - 2ms
      x-ratelimit-reset-tokens:
      - 0s
      x-request-id:
      - req_d9c4d49185e97b1797061efc1e55d811
    http_version: HTTP/1.1
    status_code: 200
version: 1
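The two cassettes above back the new LLM event tests in the diff that follows. Under pytest's VCR integration, the first run records the HTTP exchange with api.openai.com into the YAML file and later runs replay it, so the tests stay deterministic and need no live API key. A sketch of the pattern; the marker and header filter are copied from the tests below, while the test name and assertion are illustrative only:

import pytest
from crewai.llm import LLM

@pytest.mark.vcr(filter_headers=["authorization"])  # keeps the API key out of the cassette
def test_llm_call_is_replayed_from_cassette():
    # On replay, the POST to /v1/chat/completions is answered from the
    # matching cassette YAML instead of the network.
    llm = LLM(model="gpt-4o-mini")
    assert llm.call("Hello, how are you?")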
@@ -1,6 +1,5 @@
import json
from datetime import datetime
from unittest.mock import MagicMock, patch
from unittest.mock import Mock, patch

import pytest
from pydantic import Field
@@ -9,9 +8,9 @@ from crewai.agent import Agent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.crew import Crew
from crewai.flow.flow import Flow, listen, start
from crewai.llm import LLM
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.tools.tool_usage import ToolUsage
from crewai.utilities.events.agent_events import (
    AgentExecutionCompletedEvent,
    AgentExecutionErrorEvent,
@@ -21,8 +20,11 @@ from crewai.utilities.events.crew_events import (
    CrewKickoffCompletedEvent,
    CrewKickoffFailedEvent,
    CrewKickoffStartedEvent,
    CrewTestCompletedEvent,
    CrewTestStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.event_listener import EventListener
from crewai.utilities.events.event_types import ToolUsageFinishedEvent
from crewai.utilities.events.flow_events import (
    FlowCreatedEvent,
@@ -31,6 +33,12 @@ from crewai.utilities.events.flow_events import (
    MethodExecutionFailedEvent,
    MethodExecutionStartedEvent,
)
from crewai.utilities.events.llm_events import (
    LLMCallCompletedEvent,
    LLMCallFailedEvent,
    LLMCallStartedEvent,
    LLMCallType,
)
from crewai.utilities.events.task_events import (
    TaskCompletedEvent,
    TaskFailedEvent,
@@ -52,26 +60,35 @@ base_task = Task(
    expected_output="hi",
    agent=base_agent,
)
event_listener = EventListener()


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_start_kickoff_event():
    received_events = []
    mock_span = Mock()

    with crewai_event_bus.scoped_handlers():

        @crewai_event_bus.on(CrewKickoffStartedEvent)
        def handle_crew_start(source, event):
            received_events.append(event)

        crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
    @crewai_event_bus.on(CrewKickoffStartedEvent)
    def handle_crew_start(source, event):
        received_events.append(event)

    crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
    with (
        patch.object(
            event_listener._telemetry, "crew_execution_span", return_value=mock_span
        ) as mock_crew_execution_span,
        patch.object(
            event_listener._telemetry, "end_crew", return_value=mock_span
        ) as mock_crew_ended,
    ):
        crew.kickoff()
        mock_crew_execution_span.assert_called_once_with(crew, None)
        mock_crew_ended.assert_called_once_with(crew, "hi")

        assert len(received_events) == 1
        assert received_events[0].crew_name == "TestCrew"
        assert isinstance(received_events[0].timestamp, datetime)
        assert received_events[0].type == "crew_kickoff_started"
    assert len(received_events) == 1
    assert received_events[0].crew_name == "TestCrew"
    assert isinstance(received_events[0].timestamp, datetime)
    assert received_events[0].type == "crew_kickoff_started"


@pytest.mark.vcr(filter_headers=["authorization"])
@@ -92,6 +109,45 @@ def test_crew_emits_end_kickoff_event():
    assert received_events[0].type == "crew_kickoff_completed"


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_test_kickoff_type_event():
    received_events = []
    mock_span = Mock()

    @crewai_event_bus.on(CrewTestStartedEvent)
    def handle_crew_end(source, event):
        received_events.append(event)

    @crewai_event_bus.on(CrewTestCompletedEvent)
    def handle_crew_test_end(source, event):
        received_events.append(event)

    eval_llm = LLM(model="gpt-4o-mini")
    with (
        patch.object(
            event_listener._telemetry, "test_execution_span", return_value=mock_span
        ) as mock_crew_execution_span,
    ):
        crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
        crew.test(n_iterations=1, eval_llm=eval_llm)

        # Verify the call was made with correct argument types and values
        assert mock_crew_execution_span.call_count == 1
        args = mock_crew_execution_span.call_args[0]
        assert isinstance(args[0], Crew)
        assert args[1] == 1
        assert args[2] is None
        assert args[3] == eval_llm

    assert len(received_events) == 2
    assert received_events[0].crew_name == "TestCrew"
    assert isinstance(received_events[0].timestamp, datetime)
    assert received_events[0].type == "crew_test_started"
    assert received_events[1].crew_name == "TestCrew"
    assert isinstance(received_events[1].timestamp, datetime)
    assert received_events[1].type == "crew_test_completed"


@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_kickoff_failed_event():
    received_events = []
@@ -142,9 +198,20 @@ def test_crew_emits_end_task_event():
    def handle_task_end(source, event):
        received_events.append(event)

    mock_span = Mock()
    crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
    with (
        patch.object(
            event_listener._telemetry, "task_started", return_value=mock_span
        ) as mock_task_started,
        patch.object(
            event_listener._telemetry, "task_ended", return_value=mock_span
        ) as mock_task_ended,
    ):
        crew.kickoff()

    crew.kickoff()
        mock_task_started.assert_called_once_with(crew=crew, task=base_task)
        mock_task_ended.assert_called_once_with(mock_span, base_task, crew)

    assert len(received_events) == 1
    assert isinstance(received_events[0].timestamp, datetime)
@@ -334,24 +401,29 @@ def test_tools_emits_error_events():

def test_flow_emits_start_event():
    received_events = []
    mock_span = Mock()

    with crewai_event_bus.scoped_handlers():
        @crewai_event_bus.on(FlowStartedEvent)
        def handle_flow_start(source, event):
            received_events.append(event)

    @crewai_event_bus.on(FlowStartedEvent)
    def handle_flow_start(source, event):
        received_events.append(event)

        class TestFlow(Flow[dict]):
            @start()
            def begin(self):
                return "started"
    class TestFlow(Flow[dict]):
        @start()
        def begin(self):
            return "started"

    with (
        patch.object(
            event_listener._telemetry, "flow_execution_span", return_value=mock_span
        ) as mock_flow_execution_span,
    ):
        flow = TestFlow()
        flow.kickoff()

        assert len(received_events) == 1
        assert received_events[0].flow_name == "TestFlow"
        assert received_events[0].type == "flow_started"
        mock_flow_execution_span.assert_called_once_with("TestFlow", ["begin"])
    assert len(received_events) == 1
    assert received_events[0].flow_name == "TestFlow"
    assert received_events[0].type == "flow_started"


def test_flow_emits_finish_event():
@@ -455,6 +527,7 @@ def test_multiple_handlers_for_same_event():

def test_flow_emits_created_event():
    received_events = []
    mock_span = Mock()

    @crewai_event_bus.on(FlowCreatedEvent)
    def handle_flow_created(source, event):
@@ -465,8 +538,15 @@ def test_flow_emits_created_event():
        def begin(self):
            return "started"

    flow = TestFlow()
    flow.kickoff()
    with (
        patch.object(
            event_listener._telemetry, "flow_creation_span", return_value=mock_span
        ) as mock_flow_creation_span,
    ):
        flow = TestFlow()
        flow.kickoff()

    mock_flow_creation_span.assert_called_once_with("TestFlow")

    assert len(received_events) == 1
    assert received_events[0].flow_name == "TestFlow"
@@ -495,3 +575,43 @@ def test_flow_emits_method_execution_failed_event():
    assert received_events[0].flow_name == "TestFlow"
    assert received_events[0].type == "method_execution_failed"
    assert received_events[0].error == error


@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_emits_call_started_event():
    received_events = []

    @crewai_event_bus.on(LLMCallStartedEvent)
    def handle_llm_call_started(source, event):
        received_events.append(event)

    @crewai_event_bus.on(LLMCallCompletedEvent)
    def handle_llm_call_completed(source, event):
        received_events.append(event)

    llm = LLM(model="gpt-4o-mini")
    llm.call("Hello, how are you?")

    assert len(received_events) == 2
    assert received_events[0].type == "llm_call_started"
    assert received_events[1].type == "llm_call_completed"


@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_emits_call_failed_event():
    received_events = []

    @crewai_event_bus.on(LLMCallFailedEvent)
    def handle_llm_call_failed(source, event):
        received_events.append(event)

    error_message = "Simulated LLM call failure"
    with patch("crewai.llm.litellm.completion", side_effect=Exception(error_message)):
        llm = LLM(model="gpt-4o-mini")
        with pytest.raises(Exception) as exc_info:
            llm.call("Hello, how are you?")

        assert str(exc_info.value) == error_message
        assert len(received_events) == 1
        assert received_events[0].type == "llm_call_failed"
        assert received_events[0].error == error_message
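One pattern worth noting in the hunks above: the tests that register listeners inside crewai_event_bus.scoped_handlers(), so handlers are torn down when the block exits and cannot bleed into other tests. A condensed sketch of that pattern, with the event class and handler shape taken from the tests above and everything else illustrative:

from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent

received_events = []

with crewai_event_bus.scoped_handlers():
    # Handlers registered here are dropped when the block exits,
    # so each test observes only its own events.
    @crewai_event_bus.on(CrewKickoffStartedEvent)
    def handle_crew_start(source, event):
        received_events.append(event)

    # ... kick off a crew here; its events land in received_events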