Compare commits


2 Commits

Author | SHA1 | Message | Date

Brandon Hancock (bhancock_ai) | e8653c81e1 | Merge branch 'main' into fix/unnecessary-imports | 2025-03-26 11:13:56 -04:00

Lorenze Jay | a244370f48 | Refactor type hints and clean up imports in crew.py | 2025-03-26 08:11:57 -07:00
- Removed unused import of BaseTool from langchain_core.tools.
- Updated type hints in crew.py to streamline code and improve readability.
- Cleaned up whitespace for better code formatting.
35 changed files with 247 additions and 1799 deletions

View File

@@ -13,7 +13,7 @@ CrewAI provides a powerful event system that allows you to listen for and react
CrewAI uses an event bus architecture to emit events throughout the execution lifecycle. The event system is built on the following components:
1. **CrewAIEventsBus**: A singleton event bus that manages event registration and emission
2. **BaseEvent**: Base class for all events in the system
2. **CrewEvent**: Base class for all events in the system
3. **BaseEventListener**: Abstract base class for creating custom event listeners
When specific actions occur in CrewAI (like a Crew starting execution, an Agent completing a task, or a tool being used), the system emits corresponding events. You can register handlers for these events to execute custom code when they occur.
@@ -234,7 +234,7 @@ Each event handler receives two parameters:
1. **source**: The object that emitted the event
2. **event**: The event instance, containing event-specific data
The structure of the event object depends on the event type, but all events inherit from `BaseEvent` and include:
The structure of the event object depends on the event type, but all events inherit from `CrewEvent` and include:
- **timestamp**: The time when the event was emitted
- **type**: A string identifier for the event type
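For illustration, here is a minimal listener sketch consistent with the event-bus API changed in this diff (the module path `crewai.utilities.events.crew_events` is an assumption; `crew_name`, `timestamp`, and `type` come from the event definitions further down):
```python
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent

@crewai_event_bus.on(CrewKickoffStartedEvent)
def handle_crew_kickoff(source, event):
    # source: the object that emitted the event (here, the Crew)
    # event: the event instance, carrying the shared timestamp/type fields
    print(f"[{event.timestamp}] {event.type}: crew {event.crew_name!r} started")
```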

View File

@@ -164,10 +164,7 @@ crew = Crew(
[Mem0](https://mem0.ai/) is a self-improving memory layer for LLM applications, enabling personalized AI experiences.
### Using Mem0 API platform
To include user-specific memory, you can get your API key [here](https://app.mem0.ai/dashboard/api-keys) and refer to the [docs](https://docs.mem0.ai/platform/quickstart#4-1-create-memories) for adding user preferences. In this case `user_memory` is set to `MemoryClient` from mem0.
To include user-specific memory, you can get your API key [here](https://app.mem0.ai/dashboard/api-keys) and refer to the [docs](https://docs.mem0.ai/platform/quickstart#4-1-create-memories) for adding user preferences.
```python Code
@@ -178,7 +175,18 @@ from mem0 import MemoryClient
# Set environment variables for Mem0
os.environ["MEM0_API_KEY"] = "m0-xx"
# Step 1: Create a Crew with User Memory
# Step 1: Record preferences based on past conversation or user input
client = MemoryClient()
messages = [
{"role": "user", "content": "Hi there! I'm planning a vacation and could use some advice."},
{"role": "assistant", "content": "Hello! I'd be happy to help with your vacation planning. What kind of destination do you prefer?"},
{"role": "user", "content": "I am more of a beach person than a mountain person."},
{"role": "assistant", "content": "That's interesting. Do you like hotels or Airbnb?"},
{"role": "user", "content": "I like Airbnb more."},
]
client.add(messages, user_id="john")
# Step 2: Create a Crew with User Memory
crew = Crew(
agents=[...],
@@ -189,12 +197,11 @@ crew = Crew(
memory_config={
"provider": "mem0",
"config": {"user_id": "john"},
"user_memory" : {} #Set user_memory explicitly to a dictionary, we are working on this issue.
},
)
```
#### Additional Memory Configuration Options
## Memory Configuration Options
If you want to access a specific organization and project, you can set the `org_id` and `project_id` parameters in the memory configuration.
```python Code
@@ -208,74 +215,10 @@ crew = Crew(
memory_config={
"provider": "mem0",
"config": {"user_id": "john", "org_id": "my_org_id", "project_id": "my_project_id"},
"user_memory" : {} #Set user_memory explicitly to a dictionary, we are working on this issue.
},
)
```
### Using Local Mem0 memory
To use local Mem0 memory with a custom configuration, set the `local_mem0_config` parameter inside the config itself.
If both the `MEM0_API_KEY` environment variable and `local_mem0_config` are set, the API platform takes priority over the local configuration.
See the [Mem0 local configuration docs](https://docs.mem0.ai/open-source/python-quickstart#run-mem0-locally) for more detail.
In this case `user_memory` is set to `Memory` from mem0.
```python Code
from crewai import Crew
#local mem0 config
config = {
"vector_store": {
"provider": "qdrant",
"config": {
"host": "localhost",
"port": 6333
}
},
"llm": {
"provider": "openai",
"config": {
"api_key": "your-api-key",
"model": "gpt-4"
}
},
"embedder": {
"provider": "openai",
"config": {
"api_key": "your-api-key",
"model": "text-embedding-3-small"
}
},
"graph_store": {
"provider": "neo4j",
"config": {
"url": "neo4j+s://your-instance",
"username": "neo4j",
"password": "password"
}
},
"history_db_path": "/path/to/history.db",
"version": "v1.1",
"custom_fact_extraction_prompt": "Optional custom prompt for fact extraction for memory",
"custom_update_memory_prompt": "Optional custom prompt for update memory"
}
crew = Crew(
agents=[...],
tasks=[...],
verbose=True,
memory=True,
memory_config={
"provider": "mem0",
"config": {"user_id": "john", 'local_mem0_config': config},
"user_memory" : {} #Set user_memory explicitly to a dictionary, we are working on this issue.
},
)
```
## Additional Embedding Providers
### Using OpenAI embeddings (already default)
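The body of this subsection is cut off by the diff view; for context, a minimal sketch of the default OpenAI embedder configuration on `Crew` (the model name is an illustrative assumption):
```python Code
from crewai import Crew

crew = Crew(
    agents=[...],
    tasks=[...],
    memory=True,
    embedder={
        "provider": "openai",
        "config": {"model": "text-embedding-3-small"},  # illustrative model choice
    },
)
```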

View File

@@ -1,6 +1,6 @@
{
"$schema": "https://mintlify.com/docs.json",
"theme": "mint",
"theme": "palm",
"name": "CrewAI",
"colors": {
"primary": "#EB6658",

View File

@@ -8,10 +8,6 @@ icon: meteor
With [Comet Opik](https://www.comet.com/docs/opik/), debug, evaluate, and monitor your LLM applications, RAG systems, and agentic workflows with comprehensive tracing, automated evaluations, and production-ready dashboards.
<Frame caption="Opik Agent Dashboard">
<img src="/images/opik-crewai-dashboard.png" alt="Opik agent monitoring example with CrewAI" />
</Frame>
Opik provides comprehensive support for every stage of your CrewAI application development:
- **Log Traces and Spans**: Automatically track LLM calls and application logic to debug and analyze development and production systems. Manually or programmatically annotate, view, and compare responses across projects.
@@ -31,7 +27,7 @@ For this guide we will use CrewAI's quickstart example.
<Steps>
<Step title="Install required packages">
```shell
pip install crewai crewai-tools opik --upgrade
%pip install crewai crewai-tools opik --upgrade
```
</Step>
<Step title="Configure Opik">
@@ -39,10 +35,8 @@ For this guide we will use CrewAI's quickstart example.
import opik
opik.configure(use_local=False)
```
</Step>
<Step title="Prepare environment">
First, we set up our API keys for our LLM-provider as environment variables:
```python
import os
import getpass
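# The hunk is truncated here; a plausible continuation (an assumption, not
# part of this diff) prompts for the key only when it is not already set:
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")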
@@ -53,56 +47,53 @@ For this guide we will use CrewAI's quickstart example.
</Step>
<Step title="Using CrewAI">
The first step is to create our project. We will use an example from CrewAI's documentation:
```python
from crewai import Agent, Crew, Task, Process
class YourCrewName:
    def agent_one(self) -> Agent:
        return Agent(
            role="Data Analyst",
            goal="Analyze data trends in the market",
            backstory="An experienced data analyst with a background in economics",
            verbose=True,
        )

    def agent_two(self) -> Agent:
        return Agent(
            role="Market Researcher",
            goal="Gather information on market dynamics",
            backstory="A diligent researcher with a keen eye for detail",
            verbose=True,
        )

    def task_one(self) -> Task:
        return Task(
            name="Collect Data Task",
            description="Collect recent market data and identify trends.",
            expected_output="A report summarizing key trends in the market.",
            agent=self.agent_one(),
        )

    def task_two(self) -> Task:
        return Task(
            name="Market Research Task",
            description="Research factors affecting market dynamics.",
            expected_output="An analysis of factors influencing the market.",
            agent=self.agent_two(),
        )

    def crew(self) -> Crew:
        return Crew(
            agents=[self.agent_one(), self.agent_two()],
            tasks=[self.task_one(), self.task_two()],
            process=Process.sequential,
            verbose=True,
        )
```
Now we can import Opik's tracker and run our crew:
```python
from opik.integrations.crewai import track_crewai
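# The hunk is truncated here; a plausible continuation (names are assumptions
# based on Opik's documented integration) enables tracing and runs the crew:
track_crewai(project_name="crewai-integration-demo")
result = YourCrewName().crew().kickoff()
print(result)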
@@ -118,6 +109,10 @@ For this guide we will use CrewAI's quickstart example.
- Agent interactions and task execution flow
- Performance metrics like latency and token usage
- Evaluation metrics (built-in or custom)
<Frame caption="Opik Agent Dashboard">
<img src="/images/opik-crewai-dashboard.png" alt="Opik agent monitoring example with CrewAI" />
</Frame>
</Step>
</Steps>

View File

@@ -45,7 +45,7 @@ Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools~=0.38.0"]
tools = ["crewai-tools>=0.37.0"]
embeddings = [
"tiktoken~=0.7.0"
]

View File

@@ -1,9 +1,7 @@
import concurrent.futures
import re
import shutil
import subprocess
import threading
from typing import Any, Dict, List, Literal, Optional, Sequence, TypeVar, Union
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -59,7 +57,6 @@ class Agent(BaseAgent):
"""
_times_executed: int = PrivateAttr(default=0)
_times_executed_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
max_execution_time: Optional[int] = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
@@ -157,13 +154,13 @@ class Agent(BaseAgent):
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
def _execute_task_without_timeout(
def execute_task(
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
"""Execute a task with the agent without timeout.
"""Execute a task with the agent.
Args:
task: Task to execute.
@@ -267,19 +264,18 @@ class Agent(BaseAgent):
),
)
raise e
with self._times_executed_lock:
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
result = self._execute_task_without_timeout(task, context, tools)
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
result = self.execute_task(task, context, tools)
if self.max_rpm and self._rpm_controller:
self._rpm_controller.stop_rpm_counter()
@@ -295,122 +291,6 @@ class Agent(BaseAgent):
event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
)
return result
def _execute_with_timeout(
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> Union[str, Any]:
"""Execute a task with a timeout.
Args:
task: Task to execute.
context: Context to execute the task in.
tools: Tools to use for the task.
Returns:
Output of the agent
Raises:
TimeoutError: If the execution exceeds max_execution_time
"""
if not isinstance(self.max_execution_time, int) or self.max_execution_time <= 0:
raise ValueError("max_execution_time must be a positive integer")
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(
self._execute_task_without_timeout, task, context, tools
)
try:
return future.result(timeout=self.max_execution_time)
except concurrent.futures.TimeoutError:
future.cancel()
self._cleanup_timeout_resources()
self._logger.log(
"warning",
f"Task execution timed out after {self.max_execution_time} seconds"
)
if hasattr(self, 'agent_executor') and self.agent_executor:
try:
self._logger.log(
"info",
"Requesting final answer due to timeout"
)
force_final_answer_message = {
"role": "assistant",
"content": self.i18n.errors("force_final_answer")
}
if hasattr(self.agent_executor, 'messages'):
self.agent_executor.messages.append(force_final_answer_message)
final_answer = self.agent_executor.llm.call(
self.agent_executor.messages,
callbacks=self.agent_executor.callbacks,
)
if final_answer:
formatted_answer = self.agent_executor._format_answer(final_answer)
if hasattr(formatted_answer, 'output'):
return formatted_answer.output
except Exception as e:
self._logger.log(
"error",
f"Failed to get partial result after timeout: {str(e)}"
)
error_msg = (
f"Task '{task.description}' execution timed out after "
f"{self.max_execution_time} seconds. Consider:\n"
f"1. Increasing max_execution_time\n"
f"2. Optimizing the task\n"
f"3. Breaking the task into smaller subtasks"
)
raise TimeoutError(error_msg)
def execute_task(
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
"""Execute a task with the agent.
Args:
task: Task to execute.
context: Context to execute the task in.
tools: Tools to use for the task.
Returns:
Output of the agent
Raises:
TimeoutError: If the execution exceeds max_execution_time (if set)
Exception: For other execution errors
"""
with self._times_executed_lock:
self._times_executed = 0
if self.max_execution_time is None:
return self._execute_task_without_timeout(task, context, tools)
try:
return self._execute_with_timeout(task, context, tools)
except TimeoutError as e:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise
def create_agent_executor(
self, tools: Optional[List[BaseTool]] = None, task=None
@@ -522,23 +402,6 @@ class Agent(BaseAgent):
return task_prompt
def _cleanup_timeout_resources(self) -> None:
"""Clean up resources after a timeout occurs.
This method is called when a task execution times out to ensure
that no resources are left in an inconsistent state.
"""
with self._times_executed_lock:
self._times_executed = 0
if self.max_rpm and self._rpm_controller:
self._rpm_controller.stop_rpm_counter()
self._logger.log(
"info",
"Cleaned up resources after timeout"
)
def _use_trained_data(self, task_prompt: str) -> str:
"""Use trained data for the agent task prompt to improve output."""
if data := CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).load():

View File

@@ -20,6 +20,7 @@ from crewai.utilities import I18N, Printer
from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.events import (
ToolUsageErrorEvent,
ToolUsageStartedEvent,
crewai_event_bus,
)
from crewai.utilities.events.tool_usage_events import ToolUsageStartedEvent
@@ -152,21 +153,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
formatted_answer = self._process_llm_response(answer)
if isinstance(formatted_answer, AgentAction):
# Extract agent fingerprint if available
fingerprint_context = {}
if (
self.agent
and hasattr(self.agent, "security_config")
and hasattr(self.agent.security_config, "fingerprint")
):
fingerprint_context = {
"agent_fingerprint": str(
self.agent.security_config.fingerprint
)
}
tool_result = self._execute_tool_and_check_finality(
formatted_answer, fingerprint_context=fingerprint_context
formatted_answer
)
formatted_answer = self._handle_agent_action(
formatted_answer, tool_result
@@ -372,35 +360,19 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m\n\n"
)
def _execute_tool_and_check_finality(
self,
agent_action: AgentAction,
fingerprint_context: Optional[Dict[str, str]] = None,
) -> ToolResult:
def _execute_tool_and_check_finality(self, agent_action: AgentAction) -> ToolResult:
try:
fingerprint_context = fingerprint_context or {}
if self.agent:
# Create tool usage event with fingerprint information
event_data = {
"agent_key": self.agent.key,
"agent_role": self.agent.role,
"tool_name": agent_action.tool,
"tool_args": agent_action.tool_input,
"tool_class": agent_action.tool,
"agent": self.agent, # Pass the agent object for fingerprint extraction
}
# Include fingerprint context
if fingerprint_context:
event_data.update(fingerprint_context)
# Emit the tool usage started event with agent information
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(**event_data),
event=ToolUsageStartedEvent(
agent_key=self.agent.key,
agent_role=self.agent.role,
tool_name=agent_action.tool,
tool_args=agent_action.tool_input,
tool_class=agent_action.tool,
),
)
tool_usage = ToolUsage(
tools_handler=self.tools_handler,
tools=self.tools,
@@ -411,7 +383,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
task=self.task, # type: ignore[arg-type]
agent=self.agent,
action=agent_action,
fingerprint_context=fingerprint_context, # Pass fingerprint context
)
tool_calling = tool_usage.parse_tool_calling(agent_action.text)
@@ -440,23 +411,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
except Exception as e:
# TODO: drop
if self.agent:
error_event_data = {
"agent_key": self.agent.key,
"agent_role": self.agent.role,
"tool_name": agent_action.tool,
"tool_args": agent_action.tool_input,
"tool_class": agent_action.tool,
"error": str(e),
"agent": self.agent, # Pass the agent object for fingerprint extraction
}
# Include fingerprint context
if fingerprint_context:
error_event_data.update(fingerprint_context)
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(**error_event_data),
event=ToolUsageErrorEvent( # validation error
agent_key=self.agent.key,
agent_role=self.agent.role,
tool_name=agent_action.tool,
tool_args=agent_action.tool_input,
tool_class=agent_action.tool,
error=str(e),
),
)
raise e

View File

@@ -290,17 +290,23 @@ class Crew(BaseModel):
else EntityMemory(crew=self, embedder_config=self.embedder)
)
if (
self.memory_config
and "user_memory" in self.memory_config
and self.memory_config.get("provider") == "mem0"
self.memory_config and "user_memory" in self.memory_config
): # Check for user_memory in config
user_memory_config = self.memory_config["user_memory"]
if isinstance(
user_memory_config, UserMemory
): # Check if it is already an instance
self._user_memory = user_memory_config
elif isinstance(
user_memory_config, dict
): # Check if it's a configuration dict
self._user_memory = UserMemory(crew=self)
self._user_memory = UserMemory(
crew=self, **user_memory_config
) # Initialize with config
else:
raise TypeError("user_memory must be a configuration dictionary")
raise TypeError(
"user_memory must be a UserMemory instance or a configuration dictionary"
)
else:
self._user_memory = None # No user memory if not in config
return self
@@ -1150,12 +1156,7 @@ class Crew(BaseModel):
return required_inputs
def copy(self):
"""
Creates a deep copy of the Crew instance.
Returns:
Crew: A new instance with copied components
"""
"""Create a deep copy of the Crew."""
exclude = {
"id",
@@ -1167,18 +1168,13 @@ class Crew(BaseModel):
"_short_term_memory",
"_long_term_memory",
"_entity_memory",
"_telemetry",
"agents",
"tasks",
"knowledge_sources",
"knowledge",
"manager_agent",
"manager_llm",
}
cloned_agents = [agent.copy() for agent in self.agents]
manager_agent = self.manager_agent.copy() if self.manager_agent else None
manager_llm = shallow_copy(self.manager_llm) if self.manager_llm else None
task_mapping = {}
@@ -1211,8 +1207,6 @@ class Crew(BaseModel):
tasks=cloned_tasks,
knowledge_sources=existing_knowledge_sources,
knowledge=existing_knowledge,
manager_agent=manager_agent,
manager_llm=manager_llm,
)
return copied_crew

View File

@@ -5,17 +5,35 @@ from typing import Any, Dict, List, Union
from pydantic import BaseModel
from crewai.flow import Flow
SerializablePrimitive = Union[str, int, float, bool, None]
Serializable = Union[
SerializablePrimitive, List["Serializable"], Dict[str, "Serializable"]
]
def export_state(flow: Flow) -> dict[str, Serializable]:
"""Exports the Flow's internal state as JSON-compatible data structures.
Performs a one-way transformation of a Flow's state into basic Python types
that can be safely serialized to JSON. To prevent infinite recursion with
circular references, the conversion is limited to a depth of 5 levels.
Args:
flow: The Flow object whose state needs to be exported
Returns:
dict[str, Any]: The transformed state using JSON-compatible Python
types.
"""
result = to_serializable(flow._state)
assert isinstance(result, dict)
return result
def to_serializable(
obj: Any,
exclude: set[str] | None = None,
max_depth: int = 5,
_current_depth: int = 0,
obj: Any, exclude: set[str] | None = None, max_depth: int = 5, _current_depth: int = 0
) -> Serializable:
"""Converts a Python object into a JSON-compatible representation.

View File

@@ -94,10 +94,6 @@ class ContextualMemory:
Returns:
str: Formatted user memories as bullet points, or an empty string if none found.
"""
if self.um is None:
return ""
user_memories = self.um.search(query)
if not user_memories:
return ""

View File

@@ -31,7 +31,6 @@ class Mem0Storage(Storage):
mem0_api_key = config.get("api_key") or os.getenv("MEM0_API_KEY")
mem0_org_id = config.get("org_id")
mem0_project_id = config.get("project_id")
mem0_local_config = config.get("local_mem0_config")
# Initialize MemoryClient or Memory based on the presence of the mem0_api_key
if mem0_api_key:
@@ -42,10 +41,7 @@ class Mem0Storage(Storage):
else:
self.memory = MemoryClient(api_key=mem0_api_key)
else:
if mem0_local_config and len(mem0_local_config):
self.memory = Memory.from_config(config)
else:
self.memory = Memory()
self.memory = Memory() # Fallback to Memory if no Mem0 API key is provided
def _sanitize_role(self, role: str) -> str:
"""
@@ -118,7 +114,3 @@ class Mem0Storage(Storage):
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = "_".join(agents)
return agents
def reset(self):
if self.memory:
self.memory.reset()

View File

@@ -43,11 +43,3 @@ class UserMemory(Memory):
score_threshold=score_threshold,
)
return results
def reset(self) -> None:
try:
self.storage.reset()
except Exception as e:
raise Exception(
f"An error occurred while resetting the user memory: {e}"
)

View File

@@ -388,7 +388,7 @@ class Task(BaseModel):
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self))
crewai_event_bus.emit(self, TaskStartedEvent(context=context))
result = agent.execute_task(
task=self,
context=context,
@@ -464,11 +464,11 @@ class Task(BaseModel):
)
)
self._save_file(content)
crewai_event_bus.emit(self, TaskCompletedEvent(output=task_output, task=self))
crewai_event_bus.emit(self, TaskCompletedEvent(output=task_output))
return task_output
except Exception as e:
self.end_time = datetime.datetime.now()
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self))
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e)))
raise e # Re-raise the exception after emitting the event
def prompt(self) -> str:

View File

@@ -112,23 +112,6 @@ class Telemetry:
self._add_attribute(span, "crew_memory", crew.memory)
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
self._add_attribute(span, "crew_number_of_agents", len(crew.agents))
# Add fingerprint data
if hasattr(crew, "fingerprint") and crew.fingerprint:
self._add_attribute(span, "crew_fingerprint", crew.fingerprint.uuid_str)
self._add_attribute(
span,
"crew_fingerprint_created_at",
crew.fingerprint.created_at.isoformat(),
)
# Add fingerprint metadata if it exists
if hasattr(crew.fingerprint, "metadata") and crew.fingerprint.metadata:
self._add_attribute(
span,
"crew_fingerprint_metadata",
json.dumps(crew.fingerprint.metadata),
)
if crew.share_crew:
self._add_attribute(
span,
@@ -146,43 +129,17 @@ class Telemetry:
"max_rpm": agent.max_rpm,
"i18n": agent.i18n.prompt_file,
"function_calling_llm": (
getattr(
getattr(agent, "function_calling_llm", None),
"model",
"",
)
if getattr(agent, "function_calling_llm", None)
agent.function_calling_llm.model
if agent.function_calling_llm
else ""
),
"llm": agent.llm.model,
"delegation_enabled?": agent.allow_delegation,
"allow_code_execution?": getattr(
agent, "allow_code_execution", False
),
"max_retry_limit": getattr(agent, "max_retry_limit", 3),
"allow_code_execution?": agent.allow_code_execution,
"max_retry_limit": agent.max_retry_limit,
"tools_names": [
tool.name.casefold() for tool in agent.tools or []
],
# Add agent fingerprint data if sharing crew details
"fingerprint": (
getattr(
getattr(agent, "fingerprint", None),
"uuid_str",
None,
)
),
"fingerprint_created_at": (
created_at.isoformat()
if (
created_at := getattr(
getattr(agent, "fingerprint", None),
"created_at",
None,
)
)
is not None
else None
),
}
for agent in crew.agents
]
@@ -212,17 +169,6 @@ class Telemetry:
"tools_names": [
tool.name.casefold() for tool in task.tools or []
],
# Add task fingerprint data if sharing crew details
"fingerprint": (
task.fingerprint.uuid_str
if hasattr(task, "fingerprint") and task.fingerprint
else None
),
"fingerprint_created_at": (
task.fingerprint.created_at.isoformat()
if hasattr(task, "fingerprint") and task.fingerprint
else None
),
}
for task in crew.tasks
]
@@ -250,20 +196,14 @@ class Telemetry:
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
"function_calling_llm": (
getattr(
getattr(agent, "function_calling_llm", None),
"model",
"",
)
if getattr(agent, "function_calling_llm", None)
agent.function_calling_llm.model
if agent.function_calling_llm
else ""
),
"llm": agent.llm.model,
"delegation_enabled?": agent.allow_delegation,
"allow_code_execution?": getattr(
agent, "allow_code_execution", False
),
"max_retry_limit": getattr(agent, "max_retry_limit", 3),
"allow_code_execution?": agent.allow_code_execution,
"max_retry_limit": agent.max_retry_limit,
"tools_names": [
tool.name.casefold() for tool in agent.tools or []
],
@@ -312,39 +252,6 @@ class Telemetry:
self._add_attribute(created_span, "task_key", task.key)
self._add_attribute(created_span, "task_id", str(task.id))
# Add fingerprint data
if hasattr(crew, "fingerprint") and crew.fingerprint:
self._add_attribute(
created_span, "crew_fingerprint", crew.fingerprint.uuid_str
)
if hasattr(task, "fingerprint") and task.fingerprint:
self._add_attribute(
created_span, "task_fingerprint", task.fingerprint.uuid_str
)
self._add_attribute(
created_span,
"task_fingerprint_created_at",
task.fingerprint.created_at.isoformat(),
)
# Add fingerprint metadata if it exists
if hasattr(task.fingerprint, "metadata") and task.fingerprint.metadata:
self._add_attribute(
created_span,
"task_fingerprint_metadata",
json.dumps(task.fingerprint.metadata),
)
# Add agent fingerprint if task has an assigned agent
if hasattr(task, "agent") and task.agent:
agent_fingerprint = getattr(
getattr(task.agent, "fingerprint", None), "uuid_str", None
)
if agent_fingerprint:
self._add_attribute(
created_span, "agent_fingerprint", agent_fingerprint
)
if crew.share_crew:
self._add_attribute(
created_span, "formatted_description", task.description
@@ -363,21 +270,6 @@ class Telemetry:
self._add_attribute(span, "task_key", task.key)
self._add_attribute(span, "task_id", str(task.id))
# Add fingerprint data to execution span
if hasattr(crew, "fingerprint") and crew.fingerprint:
self._add_attribute(span, "crew_fingerprint", crew.fingerprint.uuid_str)
if hasattr(task, "fingerprint") and task.fingerprint:
self._add_attribute(span, "task_fingerprint", task.fingerprint.uuid_str)
# Add agent fingerprint if task has an assigned agent
if hasattr(task, "agent") and task.agent:
agent_fingerprint = getattr(
getattr(task.agent, "fingerprint", None), "uuid_str", None
)
if agent_fingerprint:
self._add_attribute(span, "agent_fingerprint", agent_fingerprint)
if crew.share_crew:
self._add_attribute(span, "formatted_description", task.description)
self._add_attribute(
@@ -399,12 +291,7 @@ class Telemetry:
Note:
If share_crew is enabled, this will also record the task output
"""
def operation():
# Ensure fingerprint data is present on completion span
if hasattr(task, "fingerprint") and task.fingerprint:
self._add_attribute(span, "task_fingerprint", task.fingerprint.uuid_str)
if crew.share_crew:
self._add_attribute(
span,
@@ -425,7 +312,6 @@ class Telemetry:
tool_name (str): Name of the tool being repeatedly used
attempts (int): Number of attempts made with this tool
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Repeated Usage")
@@ -443,16 +329,14 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def tool_usage(self, llm: Any, tool_name: str, attempts: int, agent: Any = None):
def tool_usage(self, llm: Any, tool_name: str, attempts: int):
"""Records the usage of a tool by an agent.
Args:
llm (Any): The language model being used
tool_name (str): Name of the tool being used
attempts (int): Number of attempts made with this tool
agent (Any, optional): The agent using the tool
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage")
@@ -465,31 +349,17 @@ class Telemetry:
self._add_attribute(span, "attempts", attempts)
if llm:
self._add_attribute(span, "llm", llm.model)
# Add agent fingerprint data if available
if agent and hasattr(agent, "fingerprint") and agent.fingerprint:
self._add_attribute(
span, "agent_fingerprint", agent.fingerprint.uuid_str
)
if hasattr(agent, "role"):
self._add_attribute(span, "agent_role", agent.role)
span.set_status(Status(StatusCode.OK))
span.end()
self._safe_telemetry_operation(operation)
def tool_usage_error(
self, llm: Any, agent: Any = None, tool_name: Optional[str] = None
):
def tool_usage_error(self, llm: Any):
"""Records when a tool usage results in an error.
Args:
llm (Any): The language model being used when the error occurred
agent (Any, optional): The agent using the tool
tool_name (str, optional): Name of the tool that caused the error
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage Error")
@@ -500,18 +370,6 @@ class Telemetry:
)
if llm:
self._add_attribute(span, "llm", llm.model)
if tool_name:
self._add_attribute(span, "tool_name", tool_name)
# Add agent fingerprint data if available
if agent and hasattr(agent, "fingerprint") and agent.fingerprint:
self._add_attribute(
span, "agent_fingerprint", agent.fingerprint.uuid_str
)
if hasattr(agent, "role"):
self._add_attribute(span, "agent_role", agent.role)
span.set_status(Status(StatusCode.OK))
span.end()
@@ -528,7 +386,6 @@ class Telemetry:
exec_time (int): Execution time in seconds
model_name (str): Name of the model used
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Individual Test Result")
@@ -563,7 +420,6 @@ class Telemetry:
inputs (dict[str, Any] | None): Input parameters for the test
model_name (str): Name of the model used in testing
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Test Execution")
@@ -590,7 +446,6 @@ class Telemetry:
def deploy_signup_error_span(self):
"""Records when an error occurs during the deployment signup process."""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Deploy Signup Error")
@@ -605,7 +460,6 @@ class Telemetry:
Args:
uuid (Optional[str]): Unique identifier for the deployment
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Start Deployment")
@@ -618,7 +472,6 @@ class Telemetry:
def create_crew_deployment_span(self):
"""Records the creation of a new crew deployment."""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Create Crew Deployment")
@@ -634,7 +487,6 @@ class Telemetry:
uuid (Optional[str]): Unique identifier for the crew
log_type (str, optional): Type of logs being retrieved. Defaults to "deployment".
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Get Crew Logs")
@@ -652,7 +504,6 @@ class Telemetry:
Args:
uuid (Optional[str]): Unique identifier for the crew being removed
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Remove Crew")
@@ -783,7 +634,6 @@ class Telemetry:
Args:
flow_name (str): Name of the flow being created
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Creation")
@@ -800,7 +650,6 @@ class Telemetry:
flow_name (str): Name of the flow being plotted
node_names (list[str]): List of node names in the flow
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Plotting")
@@ -818,7 +667,6 @@ class Telemetry:
flow_name (str): Name of the flow being executed
node_names (list[str]): List of nodes being executed in the flow
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Execution")

View File

@@ -22,7 +22,6 @@ from crewai.utilities.events.tool_usage_events import (
ToolSelectionErrorEvent,
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
ToolValidateInputErrorEvent,
)
@@ -70,7 +69,6 @@ class ToolUsage:
function_calling_llm: Any,
agent: Any,
action: Any,
fingerprint_context: Optional[Dict[str, str]] = None,
) -> None:
self._i18n: I18N = agent.i18n
self._printer: Printer = Printer()
@@ -87,7 +85,6 @@ class ToolUsage:
self.task = task
self.action = action
self.function_calling_llm = function_calling_llm
self.fingerprint_context = fingerprint_context or {}
# Set the maximum parsing attempts for bigger models
if (
@@ -120,10 +117,7 @@ class ToolUsage:
self._printer.print(content=f"\n\n{error}\n", color="red")
return error
if (
isinstance(tool, CrewStructuredTool)
and tool.name == self._i18n.tools("add_image")["name"] # type: ignore
):
if isinstance(tool, CrewStructuredTool) and tool.name == self._i18n.tools("add_image")["name"]: # type: ignore
try:
result = self._use(tool_string=tool_string, tool=tool, calling=calling)
return result
@@ -187,26 +181,18 @@ class ToolUsage:
if calling.arguments:
try:
acceptable_args = tool.args_schema.model_json_schema()[
"properties"
].keys() # type: ignore
acceptable_args = tool.args_schema.model_json_schema()["properties"].keys() # type: ignore
arguments = {
k: v
for k, v in calling.arguments.items()
if k in acceptable_args
}
# Add fingerprint metadata if available
arguments = self._add_fingerprint_metadata(arguments)
result = tool.invoke(input=arguments)
except Exception:
arguments = calling.arguments
# Add fingerprint metadata if available
arguments = self._add_fingerprint_metadata(arguments)
result = tool.invoke(input=arguments)
else:
# Add fingerprint metadata even to empty arguments
arguments = self._add_fingerprint_metadata({})
result = tool.invoke(input=arguments)
result = tool.invoke(input={})
except Exception as e:
self.on_tool_error(tool=tool, tool_calling=calling, e=e)
self._run_attempts += 1
@@ -216,7 +202,7 @@ class ToolUsage:
error=e, tool=tool.name, tool_inputs=tool.description
)
error = ToolUsageErrorException(
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
f'\n{error_message}.\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
).message
self.task.increment_tools_errors()
if self.agent.verbose:
@@ -258,7 +244,6 @@ class ToolUsage:
tool_calling=calling,
from_cache=from_cache,
started_at=started_at,
result=result,
)
if (
@@ -395,7 +380,7 @@ class ToolUsage:
raise
else:
return ToolUsageErrorException(
f"{self._i18n.errors('tool_arguments_error')}"
f'{self._i18n.errors("tool_arguments_error")}'
)
if not isinstance(arguments, dict):
@@ -403,7 +388,7 @@ class ToolUsage:
raise
else:
return ToolUsageErrorException(
f"{self._i18n.errors('tool_arguments_error')}"
f'{self._i18n.errors("tool_arguments_error")}'
)
return ToolCalling(
@@ -431,7 +416,7 @@ class ToolUsage:
if self.agent.verbose:
self._printer.print(content=f"\n\n{e}\n", color="red")
return ToolUsageErrorException( # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling")
f"{self._i18n.errors('tool_usage_error').format(error=e)}\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
f'{self._i18n.errors("tool_usage_error").format(error=e)}\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
)
return self._tool_calling(tool_string)
@@ -495,13 +480,8 @@ class ToolUsage:
"tool_name": self.action.tool,
"tool_args": str(self.action.tool_input),
"tool_class": self.__class__.__name__,
"agent": self.agent, # Adding agent for fingerprint extraction
}
# Include fingerprint context if available
if self.fingerprint_context:
tool_selection_data.update(self.fingerprint_context)
crewai_event_bus.emit(
self,
ToolValidateInputErrorEvent(**tool_selection_data, error=final_error),
@@ -512,12 +492,7 @@ class ToolUsage:
crewai_event_bus.emit(self, ToolUsageErrorEvent(**{**event_data, "error": e}))
def on_tool_use_finished(
self,
tool: Any,
tool_calling: ToolCalling,
from_cache: bool,
started_at: float,
result: Any,
self, tool: Any, tool_calling: ToolCalling, from_cache: bool, started_at: float
) -> None:
finished_at = time.time()
event_data = self._prepare_event_data(tool, tool_calling)
@@ -526,13 +501,12 @@ class ToolUsage:
"started_at": datetime.datetime.fromtimestamp(started_at),
"finished_at": datetime.datetime.fromtimestamp(finished_at),
"from_cache": from_cache,
"output": result,
}
)
crewai_event_bus.emit(self, ToolUsageFinishedEvent(**event_data))
def _prepare_event_data(self, tool: Any, tool_calling: ToolCalling) -> dict:
event_data = {
return {
"agent_key": self.agent.key,
"agent_role": (self.agent._original_role or self.agent.role),
"run_attempts": self._run_attempts,
@@ -540,43 +514,4 @@ class ToolUsage:
"tool_name": tool.name,
"tool_args": tool_calling.arguments,
"tool_class": tool.__class__.__name__,
"agent": self.agent, # Adding agent for fingerprint extraction
}
# Include fingerprint context if available
if self.fingerprint_context:
event_data.update(self.fingerprint_context)
return event_data
def _add_fingerprint_metadata(self, arguments: dict) -> dict:
"""Add fingerprint metadata to tool arguments if available.
Args:
arguments: The original tool arguments
Returns:
Updated arguments dictionary with fingerprint metadata
"""
# Create a shallow copy to avoid modifying the original
arguments = arguments.copy()
# Add security metadata under a designated key
if not "security_context" in arguments:
arguments["security_context"] = {}
security_context = arguments["security_context"]
# Add agent fingerprint if available
if hasattr(self, "agent") and hasattr(self.agent, "security_config"):
security_context["agent_fingerprint"] = self.agent.security_config.fingerprint.to_dict()
# Add task fingerprint if available
if hasattr(self, "task") and hasattr(self.task, "security_config"):
security_context["task_fingerprint"] = self.task.security_config.fingerprint.to_dict()
# Add crew fingerprint if available
if hasattr(self, "crew") and hasattr(self.crew, "security_config"):
security_context["crew_fingerprint"] = self.crew.security_config.fingerprint.to_dict()
return arguments

View File

@@ -45,7 +45,7 @@ class TaskEvaluator:
def evaluate(self, task, output) -> TaskEvaluation:
crewai_event_bus.emit(
self, TaskEvaluationEvent(evaluation_type="task_evaluation", task=task)
self, TaskEvaluationEvent(evaluation_type="task_evaluation")
)
evaluation_query = (
f"Assess the quality of the task completed based on the description, expected output, and actual results.\n\n"

View File

@@ -4,13 +4,13 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from .base_events import BaseEvent
from .base_events import CrewEvent
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
class AgentExecutionStartedEvent(BaseEvent):
class AgentExecutionStartedEvent(CrewEvent):
"""Event emitted when an agent starts executing a task"""
agent: BaseAgent
@@ -21,20 +21,8 @@ class AgentExecutionStartedEvent(BaseEvent):
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
class AgentExecutionCompletedEvent(BaseEvent):
class AgentExecutionCompletedEvent(CrewEvent):
"""Event emitted when an agent completes executing a task"""
agent: BaseAgent
@@ -42,35 +30,11 @@ class AgentExecutionCompletedEvent(BaseEvent):
output: str
type: str = "agent_execution_completed"
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
class AgentExecutionErrorEvent(BaseEvent):
class AgentExecutionErrorEvent(CrewEvent):
"""Event emitted when an agent encounters an error during execution"""
agent: BaseAgent
task: Any
error: str
type: str = "agent_execution_error"
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata

View File

@@ -1,28 +1,10 @@
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from crewai.utilities.serialization import to_serializable
class BaseEvent(BaseModel):
"""Base class for all events"""
class CrewEvent(BaseModel):
"""Base class for all crew events"""
timestamp: datetime = Field(default_factory=datetime.now)
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew"
fingerprint_metadata: Optional[Dict[str, Any]] = None # Any relevant metadata
def to_json(self, exclude: set[str] | None = None):
"""
Converts the event to a JSON-serializable dictionary.
Args:
exclude (set[str], optional): Set of keys to exclude from the result. Defaults to None.
Returns:
dict: A JSON-serializable dictionary.
"""
return to_serializable(self, exclude=exclude)

View File

@@ -1,102 +1,81 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from typing import Any, Dict, Optional, Union
from crewai.utilities.events.base_events import BaseEvent
from pydantic import InstanceOf
if TYPE_CHECKING:
from crewai.crew import Crew
else:
Crew = Any
from crewai.utilities.events.base_events import CrewEvent
class CrewBaseEvent(BaseEvent):
"""Base class for crew events with fingerprint handling"""
crew_name: Optional[str]
crew: Optional[Crew] = None
def __init__(self, **data):
super().__init__(**data)
self.set_crew_fingerprint()
def set_crew_fingerprint(self) -> None:
if self.crew and hasattr(self.crew, "fingerprint") and self.crew.fingerprint:
self.source_fingerprint = self.crew.fingerprint.uuid_str
self.source_type = "crew"
if (
hasattr(self.crew.fingerprint, "metadata")
and self.crew.fingerprint.metadata
):
self.fingerprint_metadata = self.crew.fingerprint.metadata
def to_json(self, exclude: set[str] | None = None):
if exclude is None:
exclude = set()
exclude.add("crew")
return super().to_json(exclude=exclude)
class CrewKickoffStartedEvent(CrewBaseEvent):
class CrewKickoffStartedEvent(CrewEvent):
"""Event emitted when a crew starts execution"""
crew_name: Optional[str]
inputs: Optional[Dict[str, Any]]
type: str = "crew_kickoff_started"
class CrewKickoffCompletedEvent(CrewBaseEvent):
class CrewKickoffCompletedEvent(CrewEvent):
"""Event emitted when a crew completes execution"""
crew_name: Optional[str]
output: Any
type: str = "crew_kickoff_completed"
class CrewKickoffFailedEvent(CrewBaseEvent):
class CrewKickoffFailedEvent(CrewEvent):
"""Event emitted when a crew fails to complete execution"""
error: str
crew_name: Optional[str]
type: str = "crew_kickoff_failed"
class CrewTrainStartedEvent(CrewBaseEvent):
class CrewTrainStartedEvent(CrewEvent):
"""Event emitted when a crew starts training"""
crew_name: Optional[str]
n_iterations: int
filename: str
inputs: Optional[Dict[str, Any]]
type: str = "crew_train_started"
class CrewTrainCompletedEvent(CrewBaseEvent):
class CrewTrainCompletedEvent(CrewEvent):
"""Event emitted when a crew completes training"""
crew_name: Optional[str]
n_iterations: int
filename: str
type: str = "crew_train_completed"
class CrewTrainFailedEvent(CrewBaseEvent):
class CrewTrainFailedEvent(CrewEvent):
"""Event emitted when a crew fails to complete training"""
error: str
crew_name: Optional[str]
type: str = "crew_train_failed"
class CrewTestStartedEvent(CrewBaseEvent):
class CrewTestStartedEvent(CrewEvent):
"""Event emitted when a crew starts testing"""
crew_name: Optional[str]
n_iterations: int
eval_llm: Optional[Union[str, Any]]
inputs: Optional[Dict[str, Any]]
type: str = "crew_test_started"
class CrewTestCompletedEvent(CrewBaseEvent):
class CrewTestCompletedEvent(CrewEvent):
"""Event emitted when a crew completes testing"""
crew_name: Optional[str]
type: str = "crew_test_completed"
class CrewTestFailedEvent(CrewBaseEvent):
class CrewTestFailedEvent(CrewEvent):
"""Event emitted when a crew fails to complete testing"""
error: str
crew_name: Optional[str]
type: str = "crew_test_failed"

View File

@@ -4,10 +4,10 @@ from typing import Any, Callable, Dict, List, Type, TypeVar, cast
from blinker import Signal
from crewai.utilities.events.base_events import BaseEvent
from crewai.utilities.events.base_events import CrewEvent
from crewai.utilities.events.event_types import EventTypes
EventT = TypeVar("EventT", bound=BaseEvent)
EventT = TypeVar("EventT", bound=CrewEvent)
class CrewAIEventsBus:
@@ -30,7 +30,7 @@ class CrewAIEventsBus:
def _initialize(self) -> None:
"""Initialize the event bus internal state"""
self._signal = Signal("crewai_event_bus")
self._handlers: Dict[Type[BaseEvent], List[Callable]] = {}
self._handlers: Dict[Type[CrewEvent], List[Callable]] = {}
def on(
self, event_type: Type[EventT]
@@ -59,7 +59,7 @@ class CrewAIEventsBus:
return decorator
def emit(self, source: Any, event: BaseEvent) -> None:
def emit(self, source: Any, event: CrewEvent) -> None:
"""
Emit an event to all registered handlers

View File

@@ -2,10 +2,10 @@ from typing import Any, Dict, Optional, Union
from pydantic import BaseModel, ConfigDict
from .base_events import BaseEvent
from .base_events import CrewEvent
class FlowEvent(BaseEvent):
class FlowEvent(CrewEvent):
"""Base class for all flow events"""
type: str

View File

@@ -1,7 +1,7 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from crewai.utilities.events.base_events import BaseEvent
from crewai.utilities.events.base_events import CrewEvent
class LLMCallType(Enum):
@@ -11,22 +11,17 @@ class LLMCallType(Enum):
LLM_CALL = "llm_call"
class LLMCallStartedEvent(BaseEvent):
"""Event emitted when a LLM call starts
Attributes:
messages: Content can be either a string or a list of dictionaries that support
multimodal content (text, images, etc.)
"""
class LLMCallStartedEvent(CrewEvent):
"""Event emitted when a LLM call starts"""
type: str = "llm_call_started"
messages: Union[str, List[Dict[str, Any]]]
messages: Union[str, List[Dict[str, str]]]
tools: Optional[List[dict]] = None
callbacks: Optional[List[Any]] = None
available_functions: Optional[Dict[str, Any]] = None
class LLMCallCompletedEvent(BaseEvent):
class LLMCallCompletedEvent(CrewEvent):
"""Event emitted when a LLM call completes"""
type: str = "llm_call_completed"
@@ -34,14 +29,14 @@ class LLMCallCompletedEvent(BaseEvent):
call_type: LLMCallType
class LLMCallFailedEvent(BaseEvent):
class LLMCallFailedEvent(CrewEvent):
"""Event emitted when a LLM call fails"""
error: str
type: str = "llm_call_failed"
class LLMStreamChunkEvent(BaseEvent):
class LLMStreamChunkEvent(CrewEvent):
"""Event emitted when a streaming chunk is received"""
type: str = "llm_stream_chunk"

View File

@@ -1,84 +1,32 @@
from typing import Any, Optional
from typing import Optional
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.events.base_events import BaseEvent
from crewai.utilities.events.base_events import CrewEvent
class TaskStartedEvent(BaseEvent):
class TaskStartedEvent(CrewEvent):
"""Event emitted when a task starts"""
type: str = "task_started"
context: Optional[str]
task: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
class TaskCompletedEvent(BaseEvent):
class TaskCompletedEvent(CrewEvent):
"""Event emitted when a task completes"""
output: TaskOutput
type: str = "task_completed"
task: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
class TaskFailedEvent(BaseEvent):
class TaskFailedEvent(CrewEvent):
"""Event emitted when a task fails"""
error: str
type: str = "task_failed"
task: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
class TaskEvaluationEvent(BaseEvent):
class TaskEvaluationEvent(CrewEvent):
"""Event emitted when a task evaluation is completed"""
type: str = "task_evaluation"
evaluation_type: str
task: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata

View File

@@ -1,10 +1,10 @@
from datetime import datetime
from typing import Any, Callable, Dict, Optional
from typing import Any, Callable, Dict
from .base_events import BaseEvent
from .base_events import CrewEvent
class ToolUsageEvent(BaseEvent):
class ToolUsageEvent(CrewEvent):
"""Base event for tool usage tracking"""
agent_key: str
@@ -14,22 +14,9 @@ class ToolUsageEvent(BaseEvent):
tool_class: str
run_attempts: int | None = None
delegations: int | None = None
agent: Optional[Any] = None
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
class ToolUsageStartedEvent(ToolUsageEvent):
"""Event emitted when a tool execution is started"""
@@ -43,7 +30,6 @@ class ToolUsageFinishedEvent(ToolUsageEvent):
started_at: datetime
finished_at: datetime
from_cache: bool = False
output: Any
type: str = "tool_usage_finished"
@@ -68,7 +54,7 @@ class ToolSelectionErrorEvent(ToolUsageEvent):
type: str = "tool_selection_error"
class ToolExecutionErrorEvent(BaseEvent):
class ToolExecutionErrorEvent(CrewEvent):
"""Event emitted when a tool execution encounters an error"""
error: Any
@@ -76,16 +62,3 @@ class ToolExecutionErrorEvent(BaseEvent):
tool_name: str
tool_args: Dict[str, Any]
tool_class: Callable
agent: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata

View File

@@ -507,10 +507,9 @@ class ConsoleFormatter:
# Remove the thinking status node when complete
if "Thinking" in str(tool_branch.label):
if tool_branch in agent_branch.children:
agent_branch.children.remove(tool_branch)
self.print(crew_tree)
self.print()
agent_branch.children.remove(tool_branch)
self.print(crew_tree)
self.print()
def handle_llm_call_failed(
self, tool_branch: Optional[Tree], error: str, crew_tree: Optional[Tree]
@@ -588,7 +587,6 @@ class ConsoleFormatter:
for child in flow_tree.children:
if "Running tests" in str(child.label):
child.label = Text("✅ Tests completed successfully", style="green")
break
self.print(flow_tree)
self.print()

View File

@@ -1,9 +1,8 @@
"""Test Agent creation and execution basic functionality."""
import concurrent.futures
import os
from unittest import mock
from unittest.mock import MagicMock, patch
from unittest.mock import patch
import pytest
@@ -547,98 +546,6 @@ def test_agent_moved_on_after_max_iterations():
assert output == "42"
def test_agent_timeout_with_max_execution_time():
"""Test that an agent with max_execution_time raises a TimeoutError when execution takes too long."""
@tool
def slow_tool() -> str:
"""A tool that takes a long time to execute."""
import time
time.sleep(2) # Sleep for 2 seconds
return "This is a slow response"
with patch.object(Agent, "_execute_task_without_timeout") as mock_execute:
mock_execute.side_effect = concurrent.futures.TimeoutError()
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
max_execution_time=1, # Set timeout to 1 second
allow_delegation=False,
)
task = Task(
description="Use the slow_tool and wait for its response.",
expected_output="The response from the slow tool.",
)
with pytest.raises(TimeoutError):
agent.execute_task(
task=task,
tools=[slow_tool],
)
def test_agent_partial_result_with_timeout():
"""Test that an agent with max_execution_time can return a partial result before timeout."""
@tool
def slow_tool() -> str:
"""A tool that takes a long time to execute."""
import time
time.sleep(0.1) # Just a small delay
return "This is a slow response"
with patch("concurrent.futures.ThreadPoolExecutor.submit") as mock_submit:
mock_future = MagicMock()
mock_future.result.side_effect = concurrent.futures.TimeoutError()
mock_submit.return_value = mock_future
with patch.object(LLM, "call") as mock_llm_call:
mock_llm_call.return_value = "Partial result due to timeout"
with patch.object(CrewAgentExecutor, "_format_answer") as mock_format_answer:
mock_format_answer.return_value = AgentFinish(
thought="",
output="Partial result due to timeout",
text="Partial result due to timeout",
)
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
max_execution_time=1, # Set timeout to 1 second
allow_delegation=False,
)
agent.agent_executor = MagicMock()
agent.agent_executor.messages = []
agent.agent_executor.llm = MagicMock()
agent.agent_executor.llm.call.return_value = "Partial result due to timeout"
agent.agent_executor._format_answer.return_value = AgentFinish(
thought="",
output="Partial result due to timeout",
text="Partial result due to timeout",
)
agent.agent_executor.callbacks = []
task = Task(
description="Use the slow_tool and wait for its response.",
expected_output="The response from the slow tool.",
)
try:
result = agent.execute_task(
task=task,
tools=[slow_tool],
)
assert "Partial result" in result
except Exception as e:
assert isinstance(e, TimeoutError)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_agent_respect_the_max_rpm_set(capsys):
@tool
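The two tests removed above covered the `max_execution_time` timeout path. As a condensed, illustrative sketch of the contract they asserted (not a replacement test), using only the public API that appears in the removed code:

```python
import pytest
from crewai import Agent, Task

agent = Agent(
    role="test role",
    goal="test goal",
    backstory="test backstory",
    max_execution_time=1,  # seconds; past this, execute_task raises TimeoutError
    allow_delegation=False,
)
task = Task(
    description="Use the slow_tool and wait for its response.",
    expected_output="The response from the slow tool.",
)

# A task that outlives the time budget surfaces as TimeoutError.
with pytest.raises(TimeoutError):
    agent.execute_task(task=task)
```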

View File

@@ -1,378 +0,0 @@
interactions:
- request:
body: !!binary |
CpIKCiQKIgoMc2VydmljZS5uYW1lEhIKEGNyZXdBSS10ZWxlbWV0cnkS6QkKEgoQY3Jld2FpLnRl
bGVtZXRyeRLBBwoQ08SlQ6w2FsCauTgZCqberRIITfOsgNi1qJkqDENyZXcgQ3JlYXRlZDABOdjG
6D/PcDAYQahPEkDPcDAYShsKDmNyZXdhaV92ZXJzaW9uEgkKBzAuMTA4LjBKGgoOcHl0aG9uX3Zl
cnNpb24SCAoGMy4xMi45Si4KCGNyZXdfa2V5EiIKIDkwNzMxMTU4MzVlMWNhZjJhNmUxNTIyZDA1
YTBiNTFkSjEKB2NyZXdfaWQSJgokMzdjOGM4NzgtN2NmZC00YjEyLWE4YzctYzIyZDZlOTIxODBk
ShwKDGNyZXdfcHJvY2VzcxIMCgpzZXF1ZW50aWFsShEKC2NyZXdfbWVtb3J5EgIQAEoaChRjcmV3
X251bWJlcl9vZl90YXNrcxICGAFKGwoVY3Jld19udW1iZXJfb2ZfYWdlbnRzEgIYAUrgAgoLY3Jl
d19hZ2VudHMS0AIKzQJbeyJrZXkiOiAiNzYyM2ZjNGY3ZDk0Y2YzZmRiZmNjMjlmYjBiMDIyYmIi
LCAiaWQiOiAiYmVjMjljMTAtOTljYi00MzQwLWIwYTItMWU1NTVkNGRmZGM0IiwgInJvbGUiOiAi
VmlzdWFsIFF1YWxpdHkgSW5zcGVjdG9yIiwgInZlcmJvc2U/IjogdHJ1ZSwgIm1heF9pdGVyIjog
MjUsICJtYXhfcnBtIjogbnVsbCwgImZ1bmN0aW9uX2NhbGxpbmdfbGxtIjogIiIsICJsbG0iOiAi
b3BlbmFpL2dwdC00byIsICJkZWxlZ2F0aW9uX2VuYWJsZWQ/IjogZmFsc2UsICJhbGxvd19jb2Rl
X2V4ZWN1dGlvbj8iOiBmYWxzZSwgIm1heF9yZXRyeV9saW1pdCI6IDIsICJ0b29sc19uYW1lcyI6
IFtdfV1KjQIKCmNyZXdfdGFza3MS/gEK+wFbeyJrZXkiOiAiMDExM2E5ZTg0N2M2NjI2ZDY0ZDZk
Yzk4M2IwNDA5MTgiLCAiaWQiOiAiZWQzYmY1YWUtZTBjMS00MjIxLWFhYTgtMThlNjVkYTMyZjc1
IiwgImFzeW5jX2V4ZWN1dGlvbj8iOiBmYWxzZSwgImh1bWFuX2lucHV0PyI6IGZhbHNlLCAiYWdl
bnRfcm9sZSI6ICJWaXN1YWwgUXVhbGl0eSBJbnNwZWN0b3IiLCAiYWdlbnRfa2V5IjogIjc2MjNm
YzRmN2Q5NGNmM2ZkYmZjYzI5ZmIwYjAyMmJiIiwgInRvb2xzX25hbWVzIjogW119XXoCGAGFAQAB
AAASjgIKECo77ESam8oLrZMmgLLaoksSCLE6x14/Kb1vKgxUYXNrIENyZWF0ZWQwATlI/chAz3Aw
GEEAgMpAz3AwGEouCghjcmV3X2tleRIiCiA5MDczMTE1ODM1ZTFjYWYyYTZlMTUyMmQwNWEwYjUx
ZEoxCgdjcmV3X2lkEiYKJDM3YzhjODc4LTdjZmQtNGIxMi1hOGM3LWMyMmQ2ZTkyMTgwZEouCgh0
YXNrX2tleRIiCiAwMTEzYTllODQ3YzY2MjZkNjRkNmRjOTgzYjA0MDkxOEoxCgd0YXNrX2lkEiYK
JGVkM2JmNWFlLWUwYzEtNDIyMS1hYWE4LTE4ZTY1ZGEzMmY3NXoCGAGFAQABAAA=
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate, zstd
Connection:
- keep-alive
Content-Length:
- '1301'
Content-Type:
- application/x-protobuf
User-Agent:
- OTel-OTLP-Exporter-Python/1.31.1
method: POST
uri: https://telemetry.crewai.com:4319/v1/traces
response:
body:
string: "\n\0"
headers:
Content-Length:
- '2'
Content-Type:
- application/x-protobuf
Date:
- Wed, 26 Mar 2025 19:24:52 GMT
status:
code: 200
message: OK
- request:
body: '{"messages": [{"role": "system", "content": "You are Visual Quality Inspector.
Senior quality control expert with expertise in visual inspection\nYour personal
goal is: Perform detailed quality analysis of product images\nYou ONLY have
access to the following tools, and should NEVER make up tools that are not listed
here:\n\nTool Name: Add image to content\nTool Arguments: {''image_url'': {''description'':
''The URL or path of the image to add'', ''type'': ''str''}, ''action'': {''description'':
''Optional context or question about the image'', ''type'': ''Union[str, NoneType]''}}\nTool
Description: See image to understand its content, you can optionally ask a question
about the image\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought:
you should always think about what to do\nAction: the action to take, only one
name of [Add image to content], just the name, exactly as it''s written.\nAction
Input: the input to the action, just a simple JSON object, enclosed in curly
braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce
all necessary information is gathered, return the following format:\n\n```\nThought:
I now know the final answer\nFinal Answer: the final answer to the original
input question\n```"}, {"role": "user", "content": "\nCurrent Task: \n Analyze
the product image at https://www.us.maguireshoes.com/cdn/shop/files/FW24-Edito-Lucena-Distressed-01_1920x.jpg?v=1736371244
with focus on:\n 1. Quality of materials\n 2. Manufacturing defects\n 3.
Compliance with standards\n Provide a detailed report highlighting any
issues found.\n \n\nThis is the expected criteria for your final answer:
A detailed report highlighting any issues found\nyou MUST return the actual
complete content as the final answer, not a summary.\n\nBegin! This is VERY
important to you, use the tools available and give your best Final Answer, your
job depends on it!\n\nThought:"}], "model": "gpt-4o", "stop": ["\nObservation:"],
"temperature": 0.7}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '2033'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.68.2
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.68.2
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-BFQepLwSYYzdKLylSFsgcJeg6GTqS\",\n \"object\":
\"chat.completion\",\n \"created\": 1743017091,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Thought: I need to examine the product
image to assess the quality of materials, look for any manufacturing defects,
and check compliance with standards.\\n\\nAction: Add image to content\\nAction
Input: {\\\"image_url\\\": \\\"https://www.us.maguireshoes.com/cdn/shop/files/FW24-Edito-Lucena-Distressed-01_1920x.jpg?v=1736371244\\\",
\\\"action\\\": \\\"Analyze the quality of materials, manufacturing defects,
and compliance with standards.\\\"}\",\n \"refusal\": null,\n \"annotations\":
[]\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 413,\n \"completion_tokens\":
101,\n \"total_tokens\": 514,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n
\ \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_7e8d90e604\"\n}\n"
headers:
CF-RAY:
- 926907d79dcff1e7-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Wed, 26 Mar 2025 19:24:53 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=WK433.4kW8cr9rwvOlk4EZ2SfRYK9lAPwXCBYEvLcmU-1743017093-1.0.1.1-kVZyUew5rUbMk.2koGJF_rmX.fTseqN241n2M40n8KvBGoKgy6KM6xBmvFbIVWxUs2Y5ZAz8mWy9CrGjaNKSfCzxmv4.pq78z_DGHr37PgI;
path=/; expires=Wed, 26-Mar-25 19:54:53 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=T77PMcuNYeyzK0tQyDOe7EScjVBVzW_7DpD3YQBqmUc-1743017093675-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '1729'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '50000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '49999'
x-ratelimit-remaining-tokens:
- '149999534'
x-ratelimit-reset-requests:
- 1ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_2399c3355adf16734907c73611a7d330
http_version: HTTP/1.1
status_code: 200
- request:
body: !!binary |
CtgBCiQKIgoMc2VydmljZS5uYW1lEhIKEGNyZXdBSS10ZWxlbWV0cnkSrwEKEgoQY3Jld2FpLnRl
bGVtZXRyeRKYAQoQp2ACB2xRGve4HGtU2RdWCBIIlQcsbhK22ykqClRvb2wgVXNhZ2UwATlACEXG
z3AwGEHAjGPGz3AwGEobCg5jcmV3YWlfdmVyc2lvbhIJCgcwLjEwOC4wSiMKCXRvb2xfbmFtZRIW
ChRBZGQgaW1hZ2UgdG8gY29udGVudEoOCghhdHRlbXB0cxICGAF6AhgBhQEAAQAA
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate, zstd
Connection:
- keep-alive
Content-Length:
- '219'
Content-Type:
- application/x-protobuf
User-Agent:
- OTel-OTLP-Exporter-Python/1.31.1
method: POST
uri: https://telemetry.crewai.com:4319/v1/traces
response:
body:
string: "\n\0"
headers:
Content-Length:
- '2'
Content-Type:
- application/x-protobuf
Date:
- Wed, 26 Mar 2025 19:24:57 GMT
status:
code: 200
message: OK
- request:
body: '{"messages": [{"role": "system", "content": "You are Visual Quality Inspector.
Senior quality control expert with expertise in visual inspection\nYour personal
goal is: Perform detailed quality analysis of product images\nYou ONLY have
access to the following tools, and should NEVER make up tools that are not listed
here:\n\nTool Name: Add image to content\nTool Arguments: {''image_url'': {''description'':
''The URL or path of the image to add'', ''type'': ''str''}, ''action'': {''description'':
''Optional context or question about the image'', ''type'': ''Union[str, NoneType]''}}\nTool
Description: See image to understand its content, you can optionally ask a question
about the image\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought:
you should always think about what to do\nAction: the action to take, only one
name of [Add image to content], just the name, exactly as it''s written.\nAction
Input: the input to the action, just a simple JSON object, enclosed in curly
braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce
all necessary information is gathered, return the following format:\n\n```\nThought:
I now know the final answer\nFinal Answer: the final answer to the original
input question\n```"}, {"role": "user", "content": "\nCurrent Task: \n Analyze
the product image at https://www.us.maguireshoes.com/cdn/shop/files/FW24-Edito-Lucena-Distressed-01_1920x.jpg?v=1736371244
with focus on:\n 1. Quality of materials\n 2. Manufacturing defects\n 3.
Compliance with standards\n Provide a detailed report highlighting any
issues found.\n \n\nThis is the expected criteria for your final answer:
A detailed report highlighting any issues found\nyou MUST return the actual
complete content as the final answer, not a summary.\n\nBegin! This is VERY
important to you, use the tools available and give your best Final Answer, your
job depends on it!\n\nThought:"}, {"role": "user", "content": [{"type": "text",
"text": "Analyze the quality of materials, manufacturing defects, and compliance
with standards."}, {"type": "image_url", "image_url": {"url": "https://www.us.maguireshoes.com/cdn/shop/files/FW24-Edito-Lucena-Distressed-01_1920x.jpg?v=1736371244"}}]},
{"role": "assistant", "content": "Thought: I need to examine the product image
to assess the quality of materials, look for any manufacturing defects, and
check compliance with standards.\n\nAction: Add image to content\nAction Input:
{\"image_url\": \"https://www.us.maguireshoes.com/cdn/shop/files/FW24-Edito-Lucena-Distressed-01_1920x.jpg?v=1736371244\",
\"action\": \"Analyze the quality of materials, manufacturing defects, and compliance
with standards.\"}"}], "model": "gpt-4o", "stop": ["\nObservation:"], "temperature":
0.7}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '2797'
content-type:
- application/json
cookie:
- __cf_bm=WK433.4kW8cr9rwvOlk4EZ2SfRYK9lAPwXCBYEvLcmU-1743017093-1.0.1.1-kVZyUew5rUbMk.2koGJF_rmX.fTseqN241n2M40n8KvBGoKgy6KM6xBmvFbIVWxUs2Y5ZAz8mWy9CrGjaNKSfCzxmv4.pq78z_DGHr37PgI;
_cfuvid=T77PMcuNYeyzK0tQyDOe7EScjVBVzW_7DpD3YQBqmUc-1743017093675-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.68.2
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.68.2
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-BFQetNNvmPgPxhzaKiHYsPqm8aN0i\",\n \"object\":
\"chat.completion\",\n \"created\": 1743017095,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Observation: The image displays a black
leather boot with a pointed toe and a low heel. \\n\\nQuality of Materials:\\n1.
The leather appears to be of good quality, displaying a consistent texture and
finish, which suggests durability.\\n2. The material has a slight sheen, indicating
a possible finishing treatment that enhances the appearance and may offer some
protection.\\n\\nManufacturing Defects:\\n1. There are no visible stitching
errors; the seams appear straight and clean.\\n2. No apparent glue marks or
uneven edges, which indicates good craftsmanship.\\n3. There is a slight distressed
effect, but it appears intentional as part of the design rather than a defect.\\n\\nCompliance
with Standards:\\n1. The shoe design seems to comply with typical fashion standards,
showing a balance of aesthetics and functionality.\\n2. The heel height and
shape appear to provide stability, aligning with safety standards for footwear.\\n\\nFinal
Answer: The analysis of the product image reveals that the black leather boot
is made of high-quality materials with no visible manufacturing defects. The
craftsmanship is precise, with clean seams and a well-executed design. The distressed
effect appears intentional and part of the aesthetic. The boot seems to comply
with fashion and safety standards, offering both style and functionality. No
significant issues were found.\",\n \"refusal\": null,\n \"annotations\":
[]\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 1300,\n \"completion_tokens\":
250,\n \"total_tokens\": 1550,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n
\ \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_3a5b33c01a\"\n}\n"
headers:
CF-RAY:
- 926907e45f33f1e7-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Wed, 26 Mar 2025 19:25:01 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '7242'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-input-images:
- '250000'
x-ratelimit-limit-requests:
- '50000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-input-images:
- '249999'
x-ratelimit-remaining-requests:
- '49999'
x-ratelimit-remaining-tokens:
- '149998641'
x-ratelimit-reset-input-images:
- 0s
x-ratelimit-reset-requests:
- 1ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_c5dd144c8ac1bb3bd96ffbba40707b2d
http_version: HTTP/1.1
status_code: 200
version: 1
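The file deleted above is a vcrpy cassette (an `interactions:` list closed by `version: 1`). Tests decorated as below replay the recorded OpenAI and telemetry traffic instead of hitting the network; this sketch assumes the pytest-vcr setup already used by the test suite:

```python
import pytest

@pytest.mark.vcr(filter_headers=["authorization"])
def test_replays_recorded_http_traffic():
    # HTTP calls made here are matched against the cassette's recorded
    # request/response pairs rather than sent over the wire.
    ...
```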

View File

@@ -11,9 +11,7 @@ import pydantic_core
import pytest
from crewai.agent import Agent
from crewai.agents import CacheHandler
from crewai.agents.cache import CacheHandler
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
@@ -3733,44 +3731,6 @@ def test_multimodal_agent_image_tool_handling():
assert result["content"][1]["type"] == "image_url"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multimodal_agent_describing_image_successfully():
"""
Test that a multimodal agent can process images without validation errors.
This test reproduces the scenario from issue #2475.
"""
llm = LLM(model="openai/gpt-4o", temperature=0.7) # model with vision capabilities
expert_analyst = Agent(
role="Visual Quality Inspector",
goal="Perform detailed quality analysis of product images",
backstory="Senior quality control expert with expertise in visual inspection",
llm=llm,
verbose=True,
allow_delegation=False,
multimodal=True,
)
inspection_task = Task(
description="""
Analyze the product image at https://www.us.maguireshoes.com/cdn/shop/files/FW24-Edito-Lucena-Distressed-01_1920x.jpg?v=1736371244 with focus on:
1. Quality of materials
2. Manufacturing defects
3. Compliance with standards
Provide a detailed report highlighting any issues found.
""",
expected_output="A detailed report highlighting any issues found",
agent=expert_analyst,
)
crew = Crew(agents=[expert_analyst], tasks=[inspection_task])
result = crew.kickoff()
task_output = result.tasks_output[0]
assert isinstance(task_output, TaskOutput)
assert task_output.raw == result.raw
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multimodal_agent_live_image_analysis():
"""
@@ -4065,52 +4025,3 @@ def test_crew_with_knowledge_sources_works_with_copy():
assert len(crew_copy.tasks) == len(crew.tasks)
assert len(crew_copy.tasks) == len(crew.tasks)
def test_crew_kickoff_for_each_works_with_manager_agent_copy():
researcher = Agent(
role="Researcher",
goal="Conduct thorough research and analysis on AI and AI agents",
backstory="You're an expert researcher, specialized in technology, software engineering, AI, and startups. You work as a freelancer and are currently researching for a new client.",
allow_delegation=False
)
writer = Agent(
role="Senior Writer",
goal="Create compelling content about AI and AI agents",
backstory="You're a senior writer, specialized in technology, software engineering, AI, and startups. You work as a freelancer and are currently writing content for a new client.",
allow_delegation=False
)
# Define task
task = Task(
description="Generate a list of 5 interesting ideas for an article, then write one captivating paragraph for each idea that showcases the potential of a full article on this topic. Return the list of ideas with their paragraphs and your notes.",
expected_output="5 bullet points, each with a paragraph and accompanying notes.",
)
# Define manager agent
manager = Agent(
role="Project Manager",
goal="Efficiently manage the crew and ensure high-quality task completion",
backstory="You're an experienced project manager, skilled in overseeing complex projects and guiding teams to success. Your role is to coordinate the efforts of the crew members, ensuring that each task is completed on time and to the highest standard.",
allow_delegation=True
)
# Instantiate crew with a custom manager
crew = Crew(
agents=[researcher, writer],
tasks=[task],
manager_agent=manager,
process=Process.hierarchical,
verbose=True
)
crew_copy = crew.copy()
assert crew_copy.manager_agent is not None
assert crew_copy.manager_agent.id != crew.manager_agent.id
assert crew_copy.manager_agent.role == crew.manager_agent.role
assert crew_copy.manager_agent.goal == crew.manager_agent.goal
assert crew_copy.manager_agent.backstory == crew.manager_agent.backstory
assert isinstance(crew_copy.manager_agent.agent_executor, CrewAgentExecutor)
assert isinstance(crew_copy.manager_agent.cache_handler, CacheHandler)

View File

@@ -5,7 +5,8 @@ from unittest.mock import Mock
import pytest
from pydantic import BaseModel
from crewai.utilities.serialization import to_serializable, to_string
from crewai.flow import Flow
from crewai.flow.state_utils import export_state, to_serializable, to_string
class Address(BaseModel):
@@ -22,6 +23,16 @@ class Person(BaseModel):
skills: List[str]
@pytest.fixture
def mock_flow():
def create_flow(state):
flow = Mock(spec=Flow)
flow._state = state
return flow
return create_flow
@pytest.mark.parametrize(
"test_input,expected",
[
@@ -36,8 +47,9 @@ class Person(BaseModel):
({"nested": [1, [2, 3], {4, 5}]}, {"nested": [1, [2, 3], [4, 5]]}),
],
)
def test_basic_serialization(test_input, expected):
result = to_serializable(test_input)
def test_basic_serialization(mock_flow, test_input, expected):
flow = mock_flow(test_input)
result = export_state(flow)
assert result == expected
@@ -48,8 +60,9 @@ def test_basic_serialization(test_input, expected):
(datetime(2024, 1, 1, 12, 30), "2024-01-01T12:30:00"),
],
)
def test_temporal_serialization(input_date, expected):
result = to_serializable({"date": input_date})
def test_temporal_serialization(mock_flow, input_date, expected):
flow = mock_flow({"date": input_date})
result = export_state(flow)
assert result["date"] == expected
@@ -62,8 +75,9 @@ def test_temporal_serialization(input_date, expected):
("normal", "value", str),
],
)
def test_dictionary_key_serialization(key, value, expected_key_type):
result = to_serializable({key: value})
def test_dictionary_key_serialization(mock_flow, key, value, expected_key_type):
flow = mock_flow({key: value})
result = export_state(flow)
assert len(result) == 1
result_key = next(iter(result.keys()))
assert isinstance(result_key, expected_key_type)
@@ -77,13 +91,14 @@ def test_dictionary_key_serialization(key, value, expected_key_type):
(str.upper, "upper"),
],
)
def test_callable_serialization(callable_obj, expected_in_result):
result = to_serializable({"func": callable_obj})
def test_callable_serialization(mock_flow, callable_obj, expected_in_result):
flow = mock_flow({"func": callable_obj})
result = export_state(flow)
assert isinstance(result["func"], str)
assert expected_in_result in result["func"].lower()
def test_pydantic_model_serialization():
def test_pydantic_model_serialization(mock_flow):
address = Address(street="123 Main St", city="Tech City", country="Pythonia")
person = Person(
@@ -94,21 +109,23 @@ def test_pydantic_model_serialization():
skills=["Python", "Testing"],
)
data = {
"single_model": address,
"nested_model": person,
"model_list": [address, address],
"model_dict": {"home": address},
}
flow = mock_flow(
{
"single_model": address,
"nested_model": person,
"model_list": [address, address],
"model_dict": {"home": address},
}
)
result = to_serializable(data)
result = export_state(flow)
assert (
to_string(result)
== '{"single_model": {"street": "123 Main St", "city": "Tech City", "country": "Pythonia"}, "nested_model": {"name": "John Doe", "age": 30, "address": {"street": "123 Main St", "city": "Tech City", "country": "Pythonia"}, "birthday": "1994-01-01", "skills": ["Python", "Testing"]}, "model_list": [{"street": "123 Main St", "city": "Tech City", "country": "Pythonia"}, {"street": "123 Main St", "city": "Tech City", "country": "Pythonia"}], "model_dict": {"home": {"street": "123 Main St", "city": "Tech City", "country": "Pythonia"}}}'
)
def test_depth_limit():
def test_depth_limit(mock_flow):
"""Test max depth handling with a deeply nested structure"""
def create_nested(depth):
@@ -117,7 +134,8 @@ def test_depth_limit():
return {"next": create_nested(depth - 1)}
deep_structure = create_nested(10)
result = to_serializable(deep_structure)
flow = mock_flow(deep_structure)
result = export_state(flow)
assert result == {
"next": {

View File

@@ -1,68 +0,0 @@
from unittest.mock import MagicMock, patch
import pytest
from mem0.memory.main import Memory
from crewai.memory.user.user_memory import UserMemory
from crewai.memory.user.user_memory_item import UserMemoryItem
class MockCrew:
def __init__(self, memory_config):
self.memory_config = memory_config
@pytest.fixture
def user_memory():
"""Fixture to create a UserMemory instance"""
crew = MockCrew(
memory_config={
"provider": "mem0",
"config": {"user_id": "john"},
"user_memory" : {}
}
)
user_memory = MagicMock(spec=UserMemory)
with patch.object(Memory,'__new__',return_value=user_memory):
user_memory_instance = UserMemory(crew=crew)
return user_memory_instance
def test_save_and_search(user_memory):
memory = UserMemoryItem(
data="""test value test value test value test value test value test value
test value test value test value test value test value test value
test value test value test value test value test value test value""",
user="test_user",
metadata={"task": "test_task"},
)
with patch.object(UserMemory, "save") as mock_save:
user_memory.save(
value=memory.data,
metadata=memory.metadata,
user=memory.user
)
mock_save.assert_called_once_with(
value=memory.data,
metadata=memory.metadata,
user=memory.user
)
expected_result = [
{
"context": memory.data,
"metadata": {"agent": "test_agent"},
"score": 0.95,
}
]
expected_result = ["mocked_result"]
# Use patch.object to mock UserMemory's search method
with patch.object(UserMemory, 'search', return_value=expected_result) as mock_search:
find = UserMemory.search("test value", score_threshold=0.01)[0]
mock_search.assert_called_once_with("test value", score_threshold=0.01)
assert find == expected_result[0]

View File

@@ -1,114 +0,0 @@
import os
from unittest.mock import MagicMock, patch
import pytest
from mem0.client.main import MemoryClient
from mem0.memory.main import Memory
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.storage.mem0_storage import Mem0Storage
from crewai.task import Task
# Define the class (if not already defined)
class MockCrew:
def __init__(self, memory_config):
self.memory_config = memory_config
@pytest.fixture
def mock_mem0_memory():
"""Fixture to create a mock Memory instance"""
mock_memory = MagicMock(spec=Memory)
return mock_memory
@pytest.fixture
def mem0_storage_with_mocked_config(mock_mem0_memory):
"""Fixture to create a Mem0Storage instance with mocked dependencies"""
# Patch the Memory class to return our mock
with patch('mem0.memory.main.Memory.from_config', return_value=mock_mem0_memory):
config = {
"vector_store": {
"provider": "mock_vector_store",
"config": {
"host": "localhost",
"port": 6333
}
},
"llm": {
"provider": "mock_llm",
"config": {
"api_key": "mock-api-key",
"model": "mock-model"
}
},
"embedder": {
"provider": "mock_embedder",
"config": {
"api_key": "mock-api-key",
"model": "mock-model"
}
},
"graph_store": {
"provider": "mock_graph_store",
"config": {
"url": "mock-url",
"username": "mock-user",
"password": "mock-password"
}
},
"history_db_path": "/mock/path",
"version": "test-version",
"custom_fact_extraction_prompt": "mock prompt 1",
"custom_update_memory_prompt": "mock prompt 2"
}
# Instantiate the class with memory_config
crew = MockCrew(
memory_config={
"provider": "mem0",
"config": {"user_id": "test_user", "local_mem0_config": config},
}
)
mem0_storage = Mem0Storage(type="short_term", crew=crew)
return mem0_storage
def test_mem0_storage_initialization(mem0_storage_with_mocked_config, mock_mem0_memory):
"""Test that Mem0Storage initializes correctly with the mocked config"""
assert mem0_storage_with_mocked_config.memory_type == "short_term"
assert mem0_storage_with_mocked_config.memory is mock_mem0_memory
@pytest.fixture
def mock_mem0_memory_client():
"""Fixture to create a mock MemoryClient instance"""
mock_memory = MagicMock(spec=MemoryClient)
return mock_memory
@pytest.fixture
def mem0_storage_with_memory_client(mock_mem0_memory_client):
"""Fixture to create a Mem0Storage instance with mocked dependencies"""
# We need to patch the MemoryClient before it's instantiated
with patch.object(MemoryClient, '__new__', return_value=mock_mem0_memory_client):
crew = MockCrew(
memory_config={
"provider": "mem0",
"config": {"user_id": "test_user", "api_key": "ABCDEFGH", "org_id": "my_org_id", "project_id": "my_project_id"},
}
)
mem0_storage = Mem0Storage(type="short_term", crew=crew)
return mem0_storage
def test_mem0_storage_with_memory_client_initialization(mem0_storage_with_memory_client, mock_mem0_memory_client):
"""Test Mem0Storage initialization with MemoryClient"""
assert mem0_storage_with_memory_client.memory_type == "short_term"
assert mem0_storage_with_memory_client.memory is mock_mem0_memory_client

View File

@@ -1,46 +0,0 @@
import os
import pytest
from crewai import LLM, Agent, Crew, Task
@pytest.mark.skip(reason="Only run manually with valid API keys")
def test_multimodal_agent_with_image_url():
"""
Test that a multimodal agent can process images without validation errors.
This test reproduces the scenario from issue #2475.
"""
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
pytest.skip("OPENAI_API_KEY environment variable not set")
llm = LLM(
model="openai/gpt-4o", # model with vision capabilities
api_key=OPENAI_API_KEY,
temperature=0.7
)
expert_analyst = Agent(
role="Visual Quality Inspector",
goal="Perform detailed quality analysis of product images",
backstory="Senior quality control expert with expertise in visual inspection",
llm=llm,
verbose=True,
allow_delegation=False,
multimodal=True
)
inspection_task = Task(
description="""
Analyze the product image at https://www.us.maguireshoes.com/collections/spring-25/products/lucena-black-boot with focus on:
1. Quality of materials
2. Manufacturing defects
3. Compliance with standards
Provide a detailed report highlighting any issues found.
""",
expected_output="A detailed report highlighting any issues found",
agent=expert_analyst
)
crew = Crew(agents=[expert_analyst], tasks=[inspection_task])

View File

@@ -1,7 +1,5 @@
import datetime
import json
import random
import time
from unittest.mock import MagicMock, patch
import pytest
@@ -13,7 +11,6 @@ from crewai.tools.tool_usage import ToolUsage
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.tool_usage_events import (
ToolSelectionErrorEvent,
ToolUsageFinishedEvent,
ToolValidateInputErrorEvent,
)
@@ -627,161 +624,3 @@ def test_tool_validate_input_error_event():
assert event.agent_role == "test_role"
assert event.tool_name == "test_tool"
assert "must be a valid dictionary" in event.error
def test_tool_usage_finished_event_with_result():
"""Test that ToolUsageFinishedEvent is emitted with correct result attributes."""
# Create mock agent with proper string values
mock_agent = MagicMock()
mock_agent.key = "test_agent_key"
mock_agent.role = "test_agent_role"
mock_agent._original_role = "test_agent_role"
mock_agent.i18n = MagicMock()
mock_agent.verbose = False
# Create mock task
mock_task = MagicMock()
mock_task.delegations = 0
# Create mock tool
class TestTool(BaseTool):
name: str = "Test Tool"
description: str = "A test tool"
def _run(self, input: dict) -> str:
return "test result"
test_tool = TestTool()
# Create mock tool calling
mock_tool_calling = MagicMock()
mock_tool_calling.arguments = {"arg1": "value1"}
# Create ToolUsage instance
tool_usage = ToolUsage(
tools_handler=MagicMock(),
tools=[test_tool],
original_tools=[test_tool],
tools_description="Test Tool Description",
tools_names="Test Tool",
task=mock_task,
function_calling_llm=None,
agent=mock_agent,
action=MagicMock(),
)
# Track received events
received_events = []
@crewai_event_bus.on(ToolUsageFinishedEvent)
def event_handler(source, event):
received_events.append(event)
# Call on_tool_use_finished with test data
started_at = time.time()
result = "test output result"
tool_usage.on_tool_use_finished(
tool=test_tool,
tool_calling=mock_tool_calling,
from_cache=False,
started_at=started_at,
result=result,
)
# Verify event was emitted
assert len(received_events) == 1, "Expected one event to be emitted"
event = received_events[0]
assert isinstance(event, ToolUsageFinishedEvent)
# Verify event attributes
assert event.agent_key == "test_agent_key"
assert event.agent_role == "test_agent_role"
assert event.tool_name == "Test Tool"
assert event.tool_args == {"arg1": "value1"}
assert event.tool_class == "TestTool"
assert event.run_attempts == 1 # Default value from ToolUsage
assert event.delegations == 0
assert event.from_cache is False
assert event.output == "test output result"
assert isinstance(event.started_at, datetime.datetime)
assert isinstance(event.finished_at, datetime.datetime)
assert event.type == "tool_usage_finished"
def test_tool_usage_finished_event_with_cached_result():
"""Test that ToolUsageFinishedEvent is emitted with correct result attributes when using cached result."""
# Create mock agent with proper string values
mock_agent = MagicMock()
mock_agent.key = "test_agent_key"
mock_agent.role = "test_agent_role"
mock_agent._original_role = "test_agent_role"
mock_agent.i18n = MagicMock()
mock_agent.verbose = False
# Create mock task
mock_task = MagicMock()
mock_task.delegations = 0
# Create mock tool
class TestTool(BaseTool):
name: str = "Test Tool"
description: str = "A test tool"
def _run(self, input: dict) -> str:
return "test result"
test_tool = TestTool()
# Create mock tool calling
mock_tool_calling = MagicMock()
mock_tool_calling.arguments = {"arg1": "value1"}
# Create ToolUsage instance
tool_usage = ToolUsage(
tools_handler=MagicMock(),
tools=[test_tool],
original_tools=[test_tool],
tools_description="Test Tool Description",
tools_names="Test Tool",
task=mock_task,
function_calling_llm=None,
agent=mock_agent,
action=MagicMock(),
)
# Track received events
received_events = []
@crewai_event_bus.on(ToolUsageFinishedEvent)
def event_handler(source, event):
received_events.append(event)
# Call on_tool_use_finished with test data and from_cache=True
started_at = time.time()
result = "cached test output result"
tool_usage.on_tool_use_finished(
tool=test_tool,
tool_calling=mock_tool_calling,
from_cache=True,
started_at=started_at,
result=result,
)
# Verify event was emitted
assert len(received_events) == 1, "Expected one event to be emitted"
event = received_events[0]
assert isinstance(event, ToolUsageFinishedEvent)
# Verify event attributes
assert event.agent_key == "test_agent_key"
assert event.agent_role == "test_agent_role"
assert event.tool_name == "Test Tool"
assert event.tool_args == {"arg1": "value1"}
assert event.tool_class == "TestTool"
assert event.run_attempts == 1 # Default value from ToolUsage
assert event.delegations == 0
assert event.from_cache is True
assert event.output == "cached test output result"
assert isinstance(event.started_at, datetime.datetime)
assert isinstance(event.finished_at, datetime.datetime)
assert event.type == "tool_usage_finished"
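The two tests removed above pinned down the `ToolUsageFinishedEvent` payload. A compact sketch of subscribing to it, using the import path the removed tests relied on:

```python
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.tool_usage_events import ToolUsageFinishedEvent

received = []

@crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_finished(source, event):
    # Per the removed assertions, the event exposes tool_name, tool_args,
    # tool_class, from_cache, output, run_attempts, delegations, and
    # started_at/finished_at datetimes, with type == "tool_usage_finished".
    received.append(event)
```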

View File

@@ -1,10 +1,10 @@
from unittest.mock import Mock
from crewai.utilities.events.base_events import BaseEvent
from crewai.utilities.events.base_events import CrewEvent
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
class TestEvent(BaseEvent):
class TestEvent(CrewEvent):
pass
@@ -24,7 +24,7 @@ def test_specific_event_handler():
def test_wildcard_event_handler():
mock_handler = Mock()
@crewai_event_bus.on(BaseEvent)
@crewai_event_bus.on(CrewEvent)
def handler(source, event):
mock_handler(source, event)

uv.lock generated
View File

@@ -1,5 +1,4 @@
version = 1
revision = 1
requires-python = ">=3.10, <3.13"
resolution-markers = [
"python_full_version < '3.11' and sys_platform == 'darwin'",
@@ -695,7 +694,7 @@ requires-dist = [
{ name = "blinker", specifier = ">=1.9.0" },
{ name = "chromadb", specifier = ">=0.5.23" },
{ name = "click", specifier = ">=8.1.7" },
{ name = "crewai-tools", marker = "extra == 'tools'", specifier = "~=0.38.0" },
{ name = "crewai-tools", marker = "extra == 'tools'", specifier = ">=0.37.0" },
{ name = "docling", marker = "extra == 'docling'", specifier = ">=2.12.0" },
{ name = "fastembed", marker = "extra == 'fastembed'", specifier = ">=0.4.1" },
{ name = "instructor", specifier = ">=1.3.3" },
@@ -722,7 +721,6 @@ requires-dist = [
{ name = "tomli-w", specifier = ">=1.1.0" },
{ name = "uv", specifier = ">=0.4.25" },
]
provides-extras = ["tools", "embeddings", "agentops", "fastembed", "pdfplumber", "pandas", "openpyxl", "mem0", "docling", "aisuite"]
[package.metadata.requires-dev]
dev = [
@@ -2975,6 +2973,7 @@ name = "nvidia-nccl-cu12"
version = "2.20.5"
source = { registry = "https://pypi.org/simple" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c1/bb/d09dda47c881f9ff504afd6f9ca4f502ded6d8fc2f572cacc5e39da91c28/nvidia_nccl_cu12-2.20.5-py3-none-manylinux2014_aarch64.whl", hash = "sha256:1fc150d5c3250b170b29410ba682384b14581db722b2531b0d8d33c595f33d01", size = 176238458 },
{ url = "https://files.pythonhosted.org/packages/4b/2a/0a131f572aa09f741c30ccd45a8e56316e8be8dfc7bc19bf0ab7cfef7b19/nvidia_nccl_cu12-2.20.5-py3-none-manylinux2014_x86_64.whl", hash = "sha256:057f6bf9685f75215d0c53bf3ac4a10b3e6578351de307abad9e18a99182af56", size = 176249402 },
]
@@ -2984,6 +2983,7 @@ version = "12.6.85"
source = { registry = "https://pypi.org/simple" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9d/d7/c5383e47c7e9bf1c99d5bd2a8c935af2b6d705ad831a7ec5c97db4d82f4f/nvidia_nvjitlink_cu12-12.6.85-py3-none-manylinux2010_x86_64.manylinux_2_12_x86_64.whl", hash = "sha256:eedc36df9e88b682efe4309aa16b5b4e78c2407eac59e8c10a6a47535164369a", size = 19744971 },
{ url = "https://files.pythonhosted.org/packages/31/db/dc71113d441f208cdfe7ae10d4983884e13f464a6252450693365e166dcf/nvidia_nvjitlink_cu12-12.6.85-py3-none-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:cf4eaa7d4b6b543ffd69d6abfb11efdeb2db48270d94dfd3a452c24150829e41", size = 19270338 },
]
[[package]]