diff --git a/src/crewai/agent.py b/src/crewai/agent.py index e137a1e41..b1f806973 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -113,10 +113,11 @@ class Agent(BaseAgent): description="Maximum number of retries for an agent to execute a task when an error occurs.", ) - def __init__(__pydantic_self__, **data): - config = data.pop("config", {}) - super().__init__(**config, **data) - __pydantic_self__.agent_ops_agent_name = __pydantic_self__.role + @model_validator(mode="after") + def set_agent_ops_agent_name(self) -> "Agent": + """Set agent ops agent name.""" + self.agent_ops_agent_name = self.role + return self @model_validator(mode="after") def set_agent_executor(self) -> "Agent": @@ -213,7 +214,7 @@ class Agent(BaseAgent): raise e result = self.execute_task(task, context, tools) - if self.max_rpm: + if self.max_rpm and self._rpm_controller: self._rpm_controller.stop_rpm_counter() # If there was any tool in self.tools_results that had result_as_answer diff --git a/src/crewai/agents/agent_builder/base_agent.py b/src/crewai/agents/agent_builder/base_agent.py index 22790827a..0c0ebcef5 100644 --- a/src/crewai/agents/agent_builder/base_agent.py +++ b/src/crewai/agents/agent_builder/base_agent.py @@ -7,7 +7,6 @@ from typing import Any, Dict, List, Optional, TypeVar from pydantic import ( UUID4, BaseModel, - ConfigDict, Field, InstanceOf, PrivateAttr, @@ -74,12 +73,17 @@ class BaseAgent(ABC, BaseModel): """ __hash__ = object.__hash__ # type: ignore - _logger: Logger = PrivateAttr() - _rpm_controller: RPMController = PrivateAttr(default=None) + _logger: Logger = PrivateAttr(default_factory=lambda: Logger(verbose=False)) + _rpm_controller: Optional[RPMController] = PrivateAttr(default=None) _request_within_rpm_limit: Any = PrivateAttr(default=None) - formatting_errors: int = 0 - model_config = ConfigDict(arbitrary_types_allowed=True) + _original_role: Optional[str] = PrivateAttr(default=None) + _original_goal: Optional[str] = PrivateAttr(default=None) + _original_backstory: Optional[str] = PrivateAttr(default=None) + _token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess) id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True) + formatting_errors: int = Field( + default=0, description="Number of formatting errors." + ) role: str = Field(description="Role of the agent") goal: str = Field(description="Objective of the agent") backstory: str = Field(description="Backstory of the agent") @@ -123,15 +127,6 @@ class BaseAgent(ABC, BaseModel): default=None, description="Maximum number of tokens for the agent's execution." ) - _original_role: str | None = None - _original_goal: str | None = None - _original_backstory: str | None = None - _token_process: TokenProcess = TokenProcess() - - def __init__(__pydantic_self__, **data): - config = data.pop("config", {}) - super().__init__(**config, **data) - @model_validator(mode="after") def set_config_attributes(self): if self.config: diff --git a/src/crewai/agents/cache/cache_handler.py b/src/crewai/agents/cache/cache_handler.py index 27dccbcb5..09dd76f26 100644 --- a/src/crewai/agents/cache/cache_handler.py +++ b/src/crewai/agents/cache/cache_handler.py @@ -1,13 +1,12 @@ -from typing import Optional +from typing import Any, Dict, Optional + +from pydantic import BaseModel, PrivateAttr -class CacheHandler: +class CacheHandler(BaseModel): """Callback handler for tool usage.""" - _cache: dict = {} - - def __init__(self): - self._cache = {} + _cache: Dict[str, Any] = PrivateAttr(default_factory=dict) def add(self, tool, input, output): self._cache[f"{tool}-{input}"] = output diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 8125f0255..87f3f29f6 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -10,7 +10,6 @@ from langchain_core.callbacks import BaseCallbackHandler from pydantic import ( UUID4, BaseModel, - ConfigDict, Field, InstanceOf, Json, @@ -105,7 +104,6 @@ class Crew(BaseModel): name: Optional[str] = Field(default=None) cache: bool = Field(default=True) - model_config = ConfigDict(arbitrary_types_allowed=True) tasks: List[Task] = Field(default_factory=list) agents: List[BaseAgent] = Field(default_factory=list) process: Process = Field(default=Process.sequential) diff --git a/src/crewai/project/crew_base.py b/src/crewai/project/crew_base.py index 90db88b7c..5e0f154ea 100644 --- a/src/crewai/project/crew_base.py +++ b/src/crewai/project/crew_base.py @@ -4,14 +4,12 @@ from typing import Any, Callable, Dict import yaml from dotenv import load_dotenv -from pydantic import ConfigDict load_dotenv() def CrewBase(cls): class WrappedClass(cls): - model_config = ConfigDict(arbitrary_types_allowed=True) is_crew_class: bool = True # type: ignore # Get the directory of the class being decorated diff --git a/src/crewai/project/pipeline_base.py b/src/crewai/project/pipeline_base.py index 3f2340301..268cb3901 100644 --- a/src/crewai/project/pipeline_base.py +++ b/src/crewai/project/pipeline_base.py @@ -1,24 +1,24 @@ -from typing import Callable, Dict - -from pydantic import ConfigDict +from typing import Any, Callable, Dict, List, Type, Union from crewai.crew import Crew from crewai.pipeline.pipeline import Pipeline from crewai.routers.router import Router +PipelineStage = Union[Crew, List[Crew], Router] + # TODO: Could potentially remove. Need to check with @joao and @gui if this is needed for CrewAI+ -def PipelineBase(cls): +def PipelineBase(cls: Type[Any]) -> Type[Any]: class WrappedClass(cls): - model_config = ConfigDict(arbitrary_types_allowed=True) is_pipeline_class: bool = True # type: ignore + stages: List[PipelineStage] - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.stages = [] self._map_pipeline_components() - def _get_all_functions(self): + def _get_all_functions(self) -> Dict[str, Callable[..., Any]]: return { name: getattr(self, name) for name in dir(self) @@ -26,15 +26,15 @@ def PipelineBase(cls): } def _filter_functions( - self, functions: Dict[str, Callable], attribute: str - ) -> Dict[str, Callable]: + self, functions: Dict[str, Callable[..., Any]], attribute: str + ) -> Dict[str, Callable[..., Any]]: return { name: func for name, func in functions.items() if hasattr(func, attribute) } - def _map_pipeline_components(self): + def _map_pipeline_components(self) -> None: all_functions = self._get_all_functions() crew_functions = self._filter_functions(all_functions, "is_crew") router_functions = self._filter_functions(all_functions, "is_router") diff --git a/src/crewai/routers/router.py b/src/crewai/routers/router.py index 76549565b..e85ce2256 100644 --- a/src/crewai/routers/router.py +++ b/src/crewai/routers/router.py @@ -1,32 +1,26 @@ from copy import deepcopy -from typing import Any, Callable, Dict, Generic, Tuple, TypeVar +from typing import Any, Callable, Dict, Tuple from pydantic import BaseModel, Field, PrivateAttr -T = TypeVar("T", bound=Dict[str, Any]) -U = TypeVar("U") + +class Route(BaseModel): + condition: Callable[[Dict[str, Any]], bool] + pipeline: Any -class Route(Generic[T, U]): - condition: Callable[[T], bool] - pipeline: U - - def __init__(self, condition: Callable[[T], bool], pipeline: U): - self.condition = condition - self.pipeline = pipeline - - -class Router(BaseModel, Generic[T, U]): - routes: Dict[str, Route[T, U]] = Field( +class Router(BaseModel): + routes: Dict[str, Route] = Field( default_factory=dict, description="Dictionary of route names to (condition, pipeline) tuples", ) - default: U = Field(..., description="Default pipeline if no conditions are met") + default: Any = Field(..., description="Default pipeline if no conditions are met") _route_types: Dict[str, type] = PrivateAttr(default_factory=dict) - model_config = {"arbitrary_types_allowed": True} + class Config: + arbitrary_types_allowed = True - def __init__(self, routes: Dict[str, Route[T, U]], default: U, **data): + def __init__(self, routes: Dict[str, Route], default: Any, **data): super().__init__(routes=routes, default=default, **data) self._check_copyable(default) for name, route in routes.items(): @@ -34,16 +28,16 @@ class Router(BaseModel, Generic[T, U]): self._route_types[name] = type(route.pipeline) @staticmethod - def _check_copyable(obj): + def _check_copyable(obj: Any) -> None: if not hasattr(obj, "copy") or not callable(getattr(obj, "copy")): raise ValueError(f"Object of type {type(obj)} must have a 'copy' method") def add_route( self, name: str, - condition: Callable[[T], bool], - pipeline: U, - ) -> "Router[T, U]": + condition: Callable[[Dict[str, Any]], bool], + pipeline: Any, + ) -> "Router": """ Add a named route with its condition and corresponding pipeline to the router. @@ -60,7 +54,7 @@ class Router(BaseModel, Generic[T, U]): self._route_types[name] = type(pipeline) return self - def route(self, input_data: T) -> Tuple[U, str]: + def route(self, input_data: Dict[str, Any]) -> Tuple[Any, str]: """ Evaluate the input against the conditions and return the appropriate pipeline. @@ -76,15 +70,15 @@ class Router(BaseModel, Generic[T, U]): return self.default, "default" - def copy(self) -> "Router[T, U]": + def copy(self) -> "Router": """Create a deep copy of the Router.""" new_routes = { name: Route( condition=deepcopy(route.condition), - pipeline=route.pipeline.copy(), # type: ignore + pipeline=route.pipeline.copy(), ) for name, route in self.routes.items() } - new_default = self.default.copy() # type: ignore + new_default = self.default.copy() return Router(routes=new_routes, default=new_default) diff --git a/src/crewai/task.py b/src/crewai/task.py index 3c8987b64..d00e2cc49 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -9,7 +9,14 @@ from hashlib import md5 from typing import Any, Dict, List, Optional, Tuple, Type, Union from opentelemetry.trace import Span -from pydantic import UUID4, BaseModel, Field, field_validator, model_validator +from pydantic import ( + UUID4, + BaseModel, + Field, + PrivateAttr, + field_validator, + model_validator, +) from pydantic_core import PydanticCustomError from crewai.agents.agent_builder.base_agent import BaseAgent @@ -39,9 +46,6 @@ class Task(BaseModel): tools: List of tools/resources limited for task execution. """ - class Config: - arbitrary_types_allowed = True - __hash__ = object.__hash__ # type: ignore used_tools: int = 0 tools_errors: int = 0 @@ -104,16 +108,12 @@ class Task(BaseModel): default=None, ) - _telemetry: Telemetry - _execution_span: Span | None = None - _original_description: str | None = None - _original_expected_output: str | None = None - _thread: threading.Thread | None = None - _execution_time: float | None = None - - def __init__(__pydantic_self__, **data): - config = data.pop("config", {}) - super().__init__(**config, **data) + _telemetry: Telemetry = PrivateAttr(default_factory=Telemetry) + _execution_span: Optional[Span] = PrivateAttr(default=None) + _original_description: Optional[str] = PrivateAttr(default=None) + _original_expected_output: Optional[str] = PrivateAttr(default=None) + _thread: Optional[threading.Thread] = PrivateAttr(default=None) + _execution_time: Optional[float] = PrivateAttr(default=None) @field_validator("id", mode="before") @classmethod @@ -137,12 +137,6 @@ class Task(BaseModel): return value[1:] return value - @model_validator(mode="after") - def set_private_attrs(self) -> "Task": - """Set private attributes.""" - self._telemetry = Telemetry() - return self - @model_validator(mode="after") def set_attributes_based_on_config(self) -> "Task": """Set attributes based on the agent configuration.""" @@ -263,9 +257,7 @@ class Task(BaseModel): content = ( json_output if json_output - else pydantic_output.model_dump_json() - if pydantic_output - else result + else pydantic_output.model_dump_json() if pydantic_output else result ) self._save_file(content) diff --git a/src/crewai/tools/cache_tools.py b/src/crewai/tools/cache_tools.py index 77ff8881b..cc42d07f0 100644 --- a/src/crewai/tools/cache_tools.py +++ b/src/crewai/tools/cache_tools.py @@ -1,5 +1,5 @@ from langchain.tools import StructuredTool -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, Field from crewai.agents.cache import CacheHandler @@ -7,11 +7,10 @@ from crewai.agents.cache import CacheHandler class CacheTools(BaseModel): """Default tools to hit the cache.""" - model_config = ConfigDict(arbitrary_types_allowed=True) name: str = "Hit Cache" cache_handler: CacheHandler = Field( description="Cache Handler for the crew", - default=CacheHandler(), + default_factory=CacheHandler, ) def tool(self): diff --git a/src/crewai/utilities/logger.py b/src/crewai/utilities/logger.py index 973c694d2..9cf06757a 100644 --- a/src/crewai/utilities/logger.py +++ b/src/crewai/utilities/logger.py @@ -1,13 +1,13 @@ from datetime import datetime +from pydantic import BaseModel, Field, PrivateAttr + from crewai.utilities.printer import Printer -class Logger: - _printer = Printer() - - def __init__(self, verbose=False): - self.verbose = verbose +class Logger(BaseModel): + verbose: bool = Field(default=False) + _printer: Printer = PrivateAttr(default_factory=Printer) def log(self, level, message, color="bold_green"): if self.verbose: diff --git a/src/crewai/utilities/rpm_controller.py b/src/crewai/utilities/rpm_controller.py index 761760bf8..5ee054c5f 100644 --- a/src/crewai/utilities/rpm_controller.py +++ b/src/crewai/utilities/rpm_controller.py @@ -1,44 +1,50 @@ import threading import time -from typing import Union +from typing import Optional -from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, model_validator +from pydantic import BaseModel, Field, PrivateAttr, model_validator from crewai.utilities.logger import Logger class RPMController(BaseModel): - model_config = ConfigDict(arbitrary_types_allowed=True) - max_rpm: Union[int, None] = Field(default=None) - logger: Logger = Field(default=None) + max_rpm: Optional[int] = Field(default=None) + logger: Logger = Field(default_factory=lambda: Logger(verbose=False)) _current_rpm: int = PrivateAttr(default=0) - _timer: threading.Timer | None = PrivateAttr(default=None) - _lock: threading.Lock = PrivateAttr(default=None) - _shutdown_flag = False + _timer: Optional[threading.Timer] = PrivateAttr(default=None) + _lock: Optional[threading.Lock] = PrivateAttr(default=None) + _shutdown_flag: bool = PrivateAttr(default=False) @model_validator(mode="after") def reset_counter(self): - if self.max_rpm: + if self.max_rpm is not None: if not self._shutdown_flag: self._lock = threading.Lock() self._reset_request_count() return self def check_or_wait(self): - if not self.max_rpm: + if self.max_rpm is None: return True - with self._lock: - if self._current_rpm < self.max_rpm: + def _check_and_increment(): + if self.max_rpm is not None and self._current_rpm < self.max_rpm: self._current_rpm += 1 return True - else: + elif self.max_rpm is not None: self.logger.log( "info", "Max RPM reached, waiting for next minute to start." ) self._wait_for_next_minute() self._current_rpm = 1 return True + return True + + if self._lock: + with self._lock: + return _check_and_increment() + else: + return _check_and_increment() def stop_rpm_counter(self): if self._timer: @@ -50,10 +56,18 @@ class RPMController(BaseModel): self._current_rpm = 0 def _reset_request_count(self): - with self._lock: + def _reset(): self._current_rpm = 0 + if not self._shutdown_flag: + self._timer = threading.Timer(60.0, self._reset_request_count) + self._timer.start() + + if self._lock: + with self._lock: + _reset() + else: + _reset() + if self._timer: self._shutdown_flag = True self._timer.cancel() - self._timer = threading.Timer(60.0, self._reset_request_count) - self._timer.start() diff --git a/tests/agent_test.py b/tests/agent_test.py index 95e975f6a..d90e35845 100644 --- a/tests/agent_test.py +++ b/tests/agent_test.py @@ -4,11 +4,6 @@ from unittest import mock from unittest.mock import patch import pytest -from langchain.tools import tool -from langchain_core.exceptions import OutputParserException -from langchain_openai import ChatOpenAI -from langchain.schema import AgentAction - from crewai import Agent, Crew, Task from crewai.agents.cache import CacheHandler from crewai.agents.executor import CrewAgentExecutor @@ -16,6 +11,10 @@ from crewai.agents.parser import CrewAgentParser from crewai.tools.tool_calling import InstructorToolCalling from crewai.tools.tool_usage import ToolUsage from crewai.utilities import RPMController +from langchain.schema import AgentAction +from langchain.tools import tool +from langchain_core.exceptions import OutputParserException +from langchain_openai import ChatOpenAI def test_agent_creation(): @@ -817,7 +816,7 @@ def test_agent_definition_based_on_dict(): "verbose": True, } - agent = Agent(config=config) + agent = Agent(**config) assert agent.role == "test role" assert agent.goal == "test goal" @@ -837,7 +836,7 @@ def test_agent_human_input(): "backstory": "test backstory", } - agent = Agent(config=config) + agent = Agent(**config) task = Task( agent=agent, diff --git a/tests/crew_test.py b/tests/crew_test.py index 0ba450fcc..8c85d58c6 100644 --- a/tests/crew_test.py +++ b/tests/crew_test.py @@ -8,7 +8,6 @@ from unittest.mock import MagicMock, patch import pydantic_core import pytest - from crewai.agent import Agent from crewai.agents.cache import CacheHandler from crewai.crew import Crew diff --git a/tests/task_test.py b/tests/task_test.py index e4ee23e9f..7bba169b3 100644 --- a/tests/task_test.py +++ b/tests/task_test.py @@ -1,8 +1,8 @@ """Test Agent creation and execution basic functionality.""" -import os import hashlib import json +import os from unittest.mock import MagicMock, patch import pytest @@ -703,7 +703,7 @@ def test_task_definition_based_on_dict(): "expected_output": "The score of the title.", } - task = Task(config=config) + task = Task(**config) assert task.description == config["description"] assert task.expected_output == config["expected_output"] @@ -716,7 +716,7 @@ def test_conditional_task_definition_based_on_dict(): "expected_output": "The score of the title.", } - task = ConditionalTask(config=config, condition=lambda x: True) + task = ConditionalTask(**config, condition=lambda x: True) assert task.description == config["description"] assert task.expected_output == config["expected_output"]