mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
* WIP crew events emitter * Refactor event handling and introduce new event types - Migrate from global `emit` function to `event_bus.emit` - Add new event types for task failures, tool usage, and agent execution - Update event listeners and event bus to support more granular event tracking - Remove deprecated event emission methods - Improve event type consistency and add more detailed event information * Add event emission for agent execution lifecycle - Emit AgentExecutionStarted and AgentExecutionError events - Update CrewAgentExecutor to use event_bus for tracking agent execution - Refactor error handling to include event emission - Minor code formatting improvements in task.py and crew_agent_executor.py - Fix a typo in test file * Refactor event system and add third-party event listeners - Move event_bus import to correct module paths - Introduce BaseEventListener abstract base class - Add AgentOpsListener for third-party event tracking - Update event listener initialization and setup - Clean up event-related imports and exports * Enhance event system type safety and error handling - Improve type annotations for event bus and event types - Add null checks for agent and task in event emissions - Update import paths for base tool and base agent - Refactor event listener type hints - Remove unnecessary print statements - Update test configurations to match new event handling * Refactor event classes to improve type safety and naming consistency - Rename event classes to have explicit 'Event' suffix (e.g., TaskStartedEvent) - Update import statements and references across multiple files - Remove deprecated events.py module - Enhance event type hints and configurations - Clean up unnecessary event-related code * Add default model for CrewEvaluator and fix event import order - Set default model to "gpt-4o-mini" in CrewEvaluator when no model is specified - Reorder event-related imports in task.py to follow standard import conventions - Update event bus initialization method return type hint - Export event_bus in events/__init__.py * Fix tool usage and event import handling - Update tool usage to use `.get()` method when checking tool name - Remove unnecessary `__all__` export list in events/__init__.py * Refactor Flow and Agent event handling to use event_bus - Remove `event_emitter` from Flow class and replace with `event_bus.emit()` - Update Flow and Agent tests to use event_bus event listeners - Remove redundant event emissions in Flow methods - Add debug print statements in Flow execution - Simplify event tracking in test cases * Enhance event handling for Crew, Task, and Event classes - Add crew name to failed event types (CrewKickoffFailedEvent, CrewTrainFailedEvent, CrewTestFailedEvent) - Update Task events to remove redundant task and context attributes - Refactor EventListener to use Logger for consistent event logging - Add new event types for Crew train and test events - Improve event bus event tracking in test cases * Remove telemetry and tracing dependencies from Task and Flow classes - Remove telemetry-related imports and private attributes from Task class - Remove `_telemetry` attribute from Flow class - Update event handling to emit events without direct telemetry tracking - Simplify task and flow execution by removing explicit telemetry spans - Move telemetry-related event handling to EventListener * Clean up unused imports and event-related code - Remove unused imports from various event and flow-related files - Reorder event imports to follow standard conventions - Remove unnecessary event type references - Simplify import statements in event and flow modules * Update crew test to validate verbose output and kickoff_for_each method - Enhance test_crew_verbose_output to check specific listener log messages - Modify test_kickoff_for_each_invalid_input to use Pydantic validation error - Improve test coverage for crew logging and input validation * Update crew test verbose output with improved emoji icons - Replace task and agent completion icons from 👍 to ✅ - Enhance readability of test output logging - Maintain consistent test coverage for crew verbose output * Add MethodExecutionFailedEvent to handle flow method execution failures - Introduce new MethodExecutionFailedEvent in flow_events module - Update Flow class to catch and emit method execution failures - Add event listener for method execution failure events - Update event-related imports to include new event type - Enhance test coverage for method execution failure handling * Propagate method execution failures in Flow class - Modify Flow class to re-raise exceptions after emitting MethodExecutionFailedEvent - Reorder MethodExecutionFailedEvent import to maintain consistent import style * Enable test coverage for Flow method execution failure event - Uncomment pytest.raises() in test_events to verify exception handling - Ensure test validates MethodExecutionFailedEvent emission during flow kickoff * Add event handling for tool usage events - Introduce event listeners for ToolUsageFinishedEvent and ToolUsageErrorEvent - Log tool usage events with descriptive emoji icons (✅ and ❌) - Update event_listener to track and log tool usage lifecycle * Reorder and clean up event imports in event_listener - Reorganize imports for tool usage events and other event types - Maintain consistent import ordering and remove unused imports - Ensure clean and organized import structure in event_listener module * moving to dedicated eventlistener * dont forget crew level * Refactor AgentOps event listener for crew-level tracking - Modify AgentOpsListener to handle crew-level events - Initialize and end AgentOps session at crew kickoff and completion - Create agents for each crew member during session initialization - Improve session management and event recording - Clean up and simplify event handling logic * Update test_events to validate tool usage error event handling - Modify test to assert single error event with correct attributes - Use pytest.raises() to verify error event generation - Simplify error event validation in test case * Improve AgentOps listener type hints and formatting - Add string type hints for AgentOps classes to resolve potential import issues - Clean up unnecessary whitespace and improve code indentation - Simplify initialization and event handling logic * Update test_events to validate multiple tool usage events - Modify test to assert 75 events instead of a single error event - Remove pytest.raises() check, allowing crew kickoff to complete - Adjust event validation to support broader event tracking * Rename event_bus to crewai_event_bus for improved clarity and specificity - Replace all references to `event_bus` with `crewai_event_bus` - Update import statements across multiple files - Remove the old `event_bus.py` file - Maintain existing event handling functionality * Enhance EventListener with singleton pattern and color configuration - Implement singleton pattern for EventListener to ensure single instance - Add default color configuration using EMITTER_COLOR from constants - Modify log method calls to use default color and remove redundant color parameters - Improve initialization logic to prevent multiple initializations * Add FlowPlotEvent and update event bus to support flow plotting - Introduce FlowPlotEvent to track flow plotting events - Replace Telemetry method with event bus emission in Flow.plot() - Update event bus to support new FlowPlotEvent type - Add test case to validate flow plotting event emission * Remove RunType enum and clean up crew events module - Delete unused RunType enum from crew_events.py - Simplify crew_events.py by removing unnecessary enum definition - Improve code clarity by removing unneeded imports * Enhance event handling for tool usage and agent execution - Add new events for tool usage: ToolSelectionErrorEvent, ToolValidateInputErrorEvent - Improve error tracking and event emission in ToolUsage and LLM classes - Update AgentExecutionStartedEvent to use task_prompt instead of inputs - Add comprehensive test coverage for new event types and error scenarios * Refactor event system and improve crew testing - Extract base CrewEvent class to a new base_events.py module - Update event imports across multiple event-related files - Modify CrewTestStartedEvent to use eval_llm instead of openai_model_name - Add LLM creation validation in crew testing method - Improve type handling and event consistency * Refactor task events to use base CrewEvent - Move CrewEvent import from crew_events to base_events - Remove unnecessary blank lines in task_events.py - Simplify event class structure for task-related events * Update AgentExecutionStartedEvent to use task_prompt - Modify test_events.py to use task_prompt instead of inputs - Simplify event input validation in test case - Align with recent event system refactoring * Improve type hinting for TaskCompletedEvent handler - Add explicit type annotation for TaskCompletedEvent in event_listener.py - Enhance type safety for event handling in EventListener * Improve test_validate_tool_input_invalid_input with mock objects - Add explicit mock objects for agent and action in test case - Ensure proper string values for mock agent and action attributes - Simplify test setup for ToolUsage validation method * Remove ToolUsageStartedEvent emission in tool usage process - Remove unnecessary event emission for tool usage start - Simplify tool usage event handling - Eliminate redundant event data preparation step * refactor: clean up and organize imports in llm and flow modules * test: Improve flow persistence test cases and logging
354 lines
14 KiB
Python
354 lines
14 KiB
Python
import uuid
|
|
from abc import ABC, abstractmethod
|
|
from copy import copy as shallow_copy
|
|
from hashlib import md5
|
|
from typing import Any, Dict, List, Optional, TypeVar
|
|
|
|
from pydantic import (
|
|
UUID4,
|
|
BaseModel,
|
|
Field,
|
|
InstanceOf,
|
|
PrivateAttr,
|
|
field_validator,
|
|
model_validator,
|
|
)
|
|
from pydantic_core import PydanticCustomError
|
|
|
|
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
|
|
from crewai.agents.cache.cache_handler import CacheHandler
|
|
from crewai.agents.tools_handler import ToolsHandler
|
|
from crewai.knowledge.knowledge import Knowledge
|
|
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
|
from crewai.tools.base_tool import BaseTool, Tool
|
|
from crewai.utilities import I18N, Logger, RPMController
|
|
from crewai.utilities.config import process_config
|
|
from crewai.utilities.converter import Converter
|
|
|
|
T = TypeVar("T", bound="BaseAgent")
|
|
|
|
|
|
class BaseAgent(ABC, BaseModel):
|
|
"""Abstract Base Class for all third party agents compatible with CrewAI.
|
|
|
|
Attributes:
|
|
id (UUID4): Unique identifier for the agent.
|
|
role (str): Role of the agent.
|
|
goal (str): Objective of the agent.
|
|
backstory (str): Backstory of the agent.
|
|
cache (bool): Whether the agent should use a cache for tool usage.
|
|
config (Optional[Dict[str, Any]]): Configuration for the agent.
|
|
verbose (bool): Verbose mode for the Agent Execution.
|
|
max_rpm (Optional[int]): Maximum number of requests per minute for the agent execution.
|
|
allow_delegation (bool): Allow delegation of tasks to agents.
|
|
tools (Optional[List[Any]]): Tools at the agent's disposal.
|
|
max_iter (int): Maximum iterations for an agent to execute a task.
|
|
agent_executor (InstanceOf): An instance of the CrewAgentExecutor class.
|
|
llm (Any): Language model that will run the agent.
|
|
crew (Any): Crew to which the agent belongs.
|
|
i18n (I18N): Internationalization settings.
|
|
cache_handler (InstanceOf[CacheHandler]): An instance of the CacheHandler class.
|
|
tools_handler (InstanceOf[ToolsHandler]): An instance of the ToolsHandler class.
|
|
max_tokens: Maximum number of tokens for the agent to generate in a response.
|
|
knowledge_sources: Knowledge sources for the agent.
|
|
knowledge_storage: Custom knowledge storage for the agent.
|
|
|
|
|
|
Methods:
|
|
execute_task(task: Any, context: Optional[str] = None, tools: Optional[List[BaseTool]] = None) -> str:
|
|
Abstract method to execute a task.
|
|
create_agent_executor(tools=None) -> None:
|
|
Abstract method to create an agent executor.
|
|
_parse_tools(tools: List[BaseTool]) -> List[Any]:
|
|
Abstract method to parse tools.
|
|
get_delegation_tools(agents: List["BaseAgent"]):
|
|
Abstract method to set the agents task tools for handling delegation and question asking to other agents in crew.
|
|
get_output_converter(llm, model, instructions):
|
|
Abstract method to get the converter class for the agent to create json/pydantic outputs.
|
|
interpolate_inputs(inputs: Dict[str, Any]) -> None:
|
|
Interpolate inputs into the agent description and backstory.
|
|
set_cache_handler(cache_handler: CacheHandler) -> None:
|
|
Set the cache handler for the agent.
|
|
increment_formatting_errors() -> None:
|
|
Increment formatting errors.
|
|
copy() -> "BaseAgent":
|
|
Create a copy of the agent.
|
|
set_rpm_controller(rpm_controller: RPMController) -> None:
|
|
Set the rpm controller for the agent.
|
|
set_private_attrs() -> "BaseAgent":
|
|
Set private attributes.
|
|
"""
|
|
|
|
__hash__ = object.__hash__ # type: ignore
|
|
_logger: Logger = PrivateAttr(default_factory=lambda: Logger(verbose=False))
|
|
_rpm_controller: Optional[RPMController] = PrivateAttr(default=None)
|
|
_request_within_rpm_limit: Any = PrivateAttr(default=None)
|
|
_original_role: Optional[str] = PrivateAttr(default=None)
|
|
_original_goal: Optional[str] = PrivateAttr(default=None)
|
|
_original_backstory: Optional[str] = PrivateAttr(default=None)
|
|
_token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess)
|
|
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
|
|
formatting_errors: int = Field(
|
|
default=0, description="Number of formatting errors."
|
|
)
|
|
role: str = Field(description="Role of the agent")
|
|
goal: str = Field(description="Objective of the agent")
|
|
backstory: str = Field(description="Backstory of the agent")
|
|
config: Optional[Dict[str, Any]] = Field(
|
|
description="Configuration for the agent", default=None, exclude=True
|
|
)
|
|
cache: bool = Field(
|
|
default=True, description="Whether the agent should use a cache for tool usage."
|
|
)
|
|
verbose: bool = Field(
|
|
default=False, description="Verbose mode for the Agent Execution"
|
|
)
|
|
max_rpm: Optional[int] = Field(
|
|
default=None,
|
|
description="Maximum number of requests per minute for the agent execution to be respected.",
|
|
)
|
|
allow_delegation: bool = Field(
|
|
default=False,
|
|
description="Enable agent to delegate and ask questions among each other.",
|
|
)
|
|
tools: Optional[List[BaseTool]] = Field(
|
|
default_factory=list, description="Tools at agents' disposal"
|
|
)
|
|
max_iter: int = Field(
|
|
default=25, description="Maximum iterations for an agent to execute a task"
|
|
)
|
|
agent_executor: InstanceOf = Field(
|
|
default=None, description="An instance of the CrewAgentExecutor class."
|
|
)
|
|
llm: Any = Field(
|
|
default=None, description="Language model that will run the agent."
|
|
)
|
|
crew: Any = Field(default=None, description="Crew to which the agent belongs.")
|
|
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
|
|
cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
|
|
default=None, description="An instance of the CacheHandler class."
|
|
)
|
|
tools_handler: InstanceOf[ToolsHandler] = Field(
|
|
default_factory=ToolsHandler,
|
|
description="An instance of the ToolsHandler class.",
|
|
)
|
|
max_tokens: Optional[int] = Field(
|
|
default=None, description="Maximum number of tokens for the agent's execution."
|
|
)
|
|
knowledge: Optional[Knowledge] = Field(
|
|
default=None, description="Knowledge for the agent."
|
|
)
|
|
knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
|
|
default=None,
|
|
description="Knowledge sources for the agent.",
|
|
)
|
|
knowledge_storage: Optional[Any] = Field(
|
|
default=None,
|
|
description="Custom knowledge storage for the agent.",
|
|
)
|
|
|
|
@model_validator(mode="before")
|
|
@classmethod
|
|
def process_model_config(cls, values):
|
|
return process_config(values, cls)
|
|
|
|
@field_validator("tools")
|
|
@classmethod
|
|
def validate_tools(cls, tools: List[Any]) -> List[BaseTool]:
|
|
"""Validate and process the tools provided to the agent.
|
|
|
|
This method ensures that each tool is either an instance of BaseTool
|
|
or an object with 'name', 'func', and 'description' attributes. If the
|
|
tool meets these criteria, it is processed and added to the list of
|
|
tools. Otherwise, a ValueError is raised.
|
|
"""
|
|
processed_tools = []
|
|
for tool in tools:
|
|
if isinstance(tool, BaseTool):
|
|
processed_tools.append(tool)
|
|
elif (
|
|
hasattr(tool, "name")
|
|
and hasattr(tool, "func")
|
|
and hasattr(tool, "description")
|
|
):
|
|
# Tool has the required attributes, create a Tool instance
|
|
processed_tools.append(Tool.from_langchain(tool))
|
|
else:
|
|
raise ValueError(
|
|
f"Invalid tool type: {type(tool)}. "
|
|
"Tool must be an instance of BaseTool or "
|
|
"an object with 'name', 'func', and 'description' attributes."
|
|
)
|
|
return processed_tools
|
|
|
|
@model_validator(mode="after")
|
|
def validate_and_set_attributes(self):
|
|
# Validate required fields
|
|
for field in ["role", "goal", "backstory"]:
|
|
if getattr(self, field) is None:
|
|
raise ValueError(
|
|
f"{field} must be provided either directly or through config"
|
|
)
|
|
|
|
# Set private attributes
|
|
self._logger = Logger(verbose=self.verbose)
|
|
if self.max_rpm and not self._rpm_controller:
|
|
self._rpm_controller = RPMController(
|
|
max_rpm=self.max_rpm, logger=self._logger
|
|
)
|
|
if not self._token_process:
|
|
self._token_process = TokenProcess()
|
|
|
|
return self
|
|
|
|
@field_validator("id", mode="before")
|
|
@classmethod
|
|
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
|
|
if v:
|
|
raise PydanticCustomError(
|
|
"may_not_set_field", "This field is not to be set by the user.", {}
|
|
)
|
|
|
|
@model_validator(mode="after")
|
|
def set_private_attrs(self):
|
|
"""Set private attributes."""
|
|
self._logger = Logger(verbose=self.verbose)
|
|
if self.max_rpm and not self._rpm_controller:
|
|
self._rpm_controller = RPMController(
|
|
max_rpm=self.max_rpm, logger=self._logger
|
|
)
|
|
if not self._token_process:
|
|
self._token_process = TokenProcess()
|
|
return self
|
|
|
|
@property
|
|
def key(self):
|
|
source = [
|
|
self._original_role or self.role,
|
|
self._original_goal or self.goal,
|
|
self._original_backstory or self.backstory,
|
|
]
|
|
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
|
|
|
|
@abstractmethod
|
|
def execute_task(
|
|
self,
|
|
task: Any,
|
|
context: Optional[str] = None,
|
|
tools: Optional[List[BaseTool]] = None,
|
|
) -> str:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def create_agent_executor(self, tools=None) -> None:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def _parse_tools(self, tools: List[BaseTool]) -> List[BaseTool]:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[BaseTool]:
|
|
"""Set the task tools that init BaseAgenTools class."""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get_output_converter(
|
|
self, llm: Any, text: str, model: type[BaseModel] | None, instructions: str
|
|
) -> Converter:
|
|
"""Get the converter class for the agent to create json/pydantic outputs."""
|
|
pass
|
|
|
|
def copy(self: T) -> T: # type: ignore # Signature of "copy" incompatible with supertype "BaseModel"
|
|
"""Create a deep copy of the Agent."""
|
|
exclude = {
|
|
"id",
|
|
"_logger",
|
|
"_rpm_controller",
|
|
"_request_within_rpm_limit",
|
|
"_token_process",
|
|
"agent_executor",
|
|
"tools",
|
|
"tools_handler",
|
|
"cache_handler",
|
|
"llm",
|
|
"knowledge_sources",
|
|
"knowledge_storage",
|
|
"knowledge",
|
|
}
|
|
|
|
# Copy llm
|
|
existing_llm = shallow_copy(self.llm)
|
|
copied_knowledge = shallow_copy(self.knowledge)
|
|
copied_knowledge_storage = shallow_copy(self.knowledge_storage)
|
|
# Properly copy knowledge sources if they exist
|
|
existing_knowledge_sources = None
|
|
if self.knowledge_sources:
|
|
# Create a shared storage instance for all knowledge sources
|
|
shared_storage = (
|
|
self.knowledge_sources[0].storage if self.knowledge_sources else None
|
|
)
|
|
|
|
existing_knowledge_sources = []
|
|
for source in self.knowledge_sources:
|
|
copied_source = (
|
|
source.model_copy()
|
|
if hasattr(source, "model_copy")
|
|
else shallow_copy(source)
|
|
)
|
|
# Ensure all copied sources use the same storage instance
|
|
copied_source.storage = shared_storage
|
|
existing_knowledge_sources.append(copied_source)
|
|
|
|
copied_data = self.model_dump(exclude=exclude)
|
|
copied_data = {k: v for k, v in copied_data.items() if v is not None}
|
|
copied_agent = type(self)(
|
|
**copied_data,
|
|
llm=existing_llm,
|
|
tools=self.tools,
|
|
knowledge_sources=existing_knowledge_sources,
|
|
knowledge=copied_knowledge,
|
|
knowledge_storage=copied_knowledge_storage,
|
|
)
|
|
|
|
return copied_agent
|
|
|
|
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
|
|
"""Interpolate inputs into the agent description and backstory."""
|
|
if self._original_role is None:
|
|
self._original_role = self.role
|
|
if self._original_goal is None:
|
|
self._original_goal = self.goal
|
|
if self._original_backstory is None:
|
|
self._original_backstory = self.backstory
|
|
|
|
if inputs:
|
|
self.role = self._original_role.format(**inputs)
|
|
self.goal = self._original_goal.format(**inputs)
|
|
self.backstory = self._original_backstory.format(**inputs)
|
|
|
|
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
|
|
"""Set the cache handler for the agent.
|
|
|
|
Args:
|
|
cache_handler: An instance of the CacheHandler class.
|
|
"""
|
|
self.tools_handler = ToolsHandler()
|
|
if self.cache:
|
|
self.cache_handler = cache_handler
|
|
self.tools_handler.cache = cache_handler
|
|
self.create_agent_executor()
|
|
|
|
def increment_formatting_errors(self) -> None:
|
|
self.formatting_errors += 1
|
|
|
|
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
|
|
"""Set the rpm controller for the agent.
|
|
|
|
Args:
|
|
rpm_controller: An instance of the RPMController class.
|
|
"""
|
|
if not self._rpm_controller:
|
|
self._rpm_controller = rpm_controller
|
|
self.create_agent_executor()
|