diff --git a/lib/crewai/src/crewai/agents/agent_adapters/openai_agents/structured_output_converter.py b/lib/crewai/src/crewai/agents/agent_adapters/openai_agents/structured_output_converter.py index c476fe6ff..54ed9ddde 100644 --- a/lib/crewai/src/crewai/agents/agent_adapters/openai_agents/structured_output_converter.py +++ b/lib/crewai/src/crewai/agents/agent_adapters/openai_agents/structured_output_converter.py @@ -7,7 +7,7 @@ output conversion for OpenAI agents, supporting JSON and Pydantic model formats. from typing import Any from crewai.agents.agent_adapters.base_converter_adapter import BaseConverterAdapter -from crewai.utilities.i18n import I18N +from crewai.utilities.i18n import get_i18n class OpenAIConverterAdapter(BaseConverterAdapter): @@ -59,7 +59,7 @@ class OpenAIConverterAdapter(BaseConverterAdapter): return base_prompt output_schema: str = ( - I18N() + get_i18n() .slice("formatted_task_instructions") .format(output_format=self._schema) ) diff --git a/lib/crewai/src/crewai/agents/agent_builder/base_agent.py b/lib/crewai/src/crewai/agents/agent_builder/base_agent.py index f825bd10e..b26c24515 100644 --- a/lib/crewai/src/crewai/agents/agent_builder/base_agent.py +++ b/lib/crewai/src/crewai/agents/agent_builder/base_agent.py @@ -29,7 +29,7 @@ from crewai.rag.embeddings.types import EmbedderConfig from crewai.security.security_config import SecurityConfig from crewai.tools.base_tool import BaseTool, Tool from crewai.utilities.config import process_config -from crewai.utilities.i18n import I18N +from crewai.utilities.i18n import I18N, get_i18n from crewai.utilities.logger import Logger from crewai.utilities.rpm_controller import RPMController from crewai.utilities.string_utils import interpolate_only @@ -107,7 +107,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): Set private attributes. """ - __hash__ = object.__hash__ # type: ignore + __hash__ = object.__hash__ _logger: Logger = PrivateAttr(default_factory=lambda: Logger(verbose=False)) _rpm_controller: RPMController | None = PrivateAttr(default=None) _request_within_rpm_limit: Any = PrivateAttr(default=None) @@ -150,7 +150,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): ) crew: Any = Field(default=None, description="Crew to which the agent belongs.") i18n: I18N = Field( - default_factory=I18N, description="Internationalization settings." + default_factory=get_i18n, description="Internationalization settings." ) cache_handler: CacheHandler | None = Field( default=None, description="An instance of the CacheHandler class." @@ -180,8 +180,8 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): default_factory=SecurityConfig, description="Security configuration for the agent, including fingerprinting.", ) - callbacks: list[Callable] = Field( - default=[], description="Callbacks to be used for the agent" + callbacks: list[Callable[[Any], Any]] = Field( + default_factory=list, description="Callbacks to be used for the agent" ) adapted_agent: bool = Field( default=False, description="Whether the agent is adapted" @@ -201,7 +201,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): @model_validator(mode="before") @classmethod - def process_model_config(cls, values): + def process_model_config(cls, values: Any) -> dict[str, Any]: return process_config(values, cls) @field_validator("tools") @@ -269,7 +269,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): return list(set(validated_mcps)) @model_validator(mode="after") - def validate_and_set_attributes(self): + def validate_and_set_attributes(self) -> Self: # Validate required fields for field in ["role", "goal", "backstory"]: if getattr(self, field) is None: @@ -301,7 +301,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): ) @model_validator(mode="after") - def set_private_attrs(self): + def set_private_attrs(self) -> Self: """Set private attributes.""" self._logger = Logger(verbose=self.verbose) if self.max_rpm and not self._rpm_controller: @@ -313,7 +313,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): return self @property - def key(self): + def key(self) -> str: source = [ self._original_role or self.role, self._original_goal or self.goal, @@ -331,7 +331,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): pass @abstractmethod - def create_agent_executor(self, tools=None) -> None: + def create_agent_executor(self, tools: list[BaseTool] | None = None) -> None: pass @abstractmethod @@ -443,5 +443,5 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): self._rpm_controller = rpm_controller self.create_agent_executor() - def set_knowledge(self, crew_embedder: EmbedderConfig | None = None): + def set_knowledge(self, crew_embedder: EmbedderConfig | None = None) -> None: pass diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index c57c4ddcf..8c1eb2c0e 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -37,7 +37,7 @@ from crewai.utilities.agent_utils import ( process_llm_response, ) from crewai.utilities.constants import TRAINING_DATA_FILE -from crewai.utilities.i18n import I18N +from crewai.utilities.i18n import I18N, get_i18n from crewai.utilities.printer import Printer from crewai.utilities.tool_utils import execute_tool_and_check_finality from crewai.utilities.training_handler import CrewTrainingHandler @@ -65,7 +65,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): def __init__( self, - llm: BaseLLM | Any | None, + llm: BaseLLM, task: Task, crew: Crew, agent: Agent, @@ -106,7 +106,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): callbacks: Optional callbacks list. response_model: Optional Pydantic model for structured outputs. """ - self._i18n: I18N = I18N() + self._i18n: I18N = get_i18n() self.llm = llm self.task = task self.agent = agent diff --git a/lib/crewai/src/crewai/agents/parser.py b/lib/crewai/src/crewai/agents/parser.py index d38fe73ae..c338e8360 100644 --- a/lib/crewai/src/crewai/agents/parser.py +++ b/lib/crewai/src/crewai/agents/parser.py @@ -18,10 +18,10 @@ from crewai.agents.constants import ( MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE, UNABLE_TO_REPAIR_JSON_RESULTS, ) -from crewai.utilities.i18n import I18N +from crewai.utilities.i18n import get_i18n -_I18N = I18N() +_I18N = get_i18n() @dataclass diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index a8e88ce55..f5af4a426 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -27,6 +27,7 @@ from pydantic import ( model_validator, ) from pydantic_core import PydanticCustomError +from typing_extensions import Self from crewai.agent import Agent from crewai.agents.agent_builder.base_agent import BaseAgent @@ -70,7 +71,7 @@ from crewai.task import Task from crewai.tasks.conditional_task import ConditionalTask from crewai.tasks.task_output import TaskOutput from crewai.tools.agent_tools.agent_tools import AgentTools -from crewai.tools.base_tool import BaseTool, Tool +from crewai.tools.base_tool import BaseTool from crewai.types.usage_metrics import UsageMetrics from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE from crewai.utilities.crew.models import CrewContext @@ -81,7 +82,7 @@ from crewai.utilities.formatter import ( aggregate_raw_outputs_from_task_outputs, aggregate_raw_outputs_from_tasks, ) -from crewai.utilities.i18n import I18N +from crewai.utilities.i18n import get_i18n from crewai.utilities.llm_utils import create_llm from crewai.utilities.logger import Logger from crewai.utilities.planning_handler import CrewPlanner @@ -195,7 +196,7 @@ class Crew(FlowTrackable, BaseModel): function_calling_llm: str | InstanceOf[LLM] | Any | None = Field( description="Language model that will run the agent.", default=None ) - config: Json | dict[str, Any] | None = Field(default=None) + config: Json[dict[str, Any]] | dict[str, Any] | None = Field(default=None) id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True) share_crew: bool | None = Field(default=False) step_callback: Any | None = Field( @@ -294,7 +295,9 @@ class Crew(FlowTrackable, BaseModel): @field_validator("config", mode="before") @classmethod - def check_config_type(cls, v: Json | dict[str, Any]) -> Json | dict[str, Any]: + def check_config_type( + cls, v: Json[dict[str, Any]] | dict[str, Any] + ) -> dict[str, Any]: """Validates that the config is a valid type. Args: v: The config to be validated. @@ -310,7 +313,7 @@ class Crew(FlowTrackable, BaseModel): """set private attributes.""" self._cache_handler = CacheHandler() - event_listener = EventListener() + event_listener = EventListener() # type: ignore[no-untyped-call] if ( is_tracing_enabled() @@ -330,13 +333,13 @@ class Crew(FlowTrackable, BaseModel): return self - def _initialize_default_memories(self): - self._long_term_memory = self._long_term_memory or LongTermMemory() - self._short_term_memory = self._short_term_memory or ShortTermMemory( + def _initialize_default_memories(self) -> None: + self._long_term_memory = self._long_term_memory or LongTermMemory() # type: ignore[no-untyped-call] + self._short_term_memory = self._short_term_memory or ShortTermMemory( # type: ignore[no-untyped-call] crew=self, embedder_config=self.embedder, ) - self._entity_memory = self.entity_memory or EntityMemory( + self._entity_memory = self.entity_memory or EntityMemory( # type: ignore[no-untyped-call] crew=self, embedder_config=self.embedder ) @@ -380,7 +383,7 @@ class Crew(FlowTrackable, BaseModel): return self @model_validator(mode="after") - def check_manager_llm(self): + def check_manager_llm(self) -> Self: """Validates that the language model is set when using hierarchical process.""" if self.process == Process.hierarchical: if not self.manager_llm and not self.manager_agent: @@ -405,7 +408,7 @@ class Crew(FlowTrackable, BaseModel): return self @model_validator(mode="after") - def check_config(self): + def check_config(self) -> Self: """Validates that the crew is properly configured with agents and tasks.""" if not self.config and not self.tasks and not self.agents: raise PydanticCustomError( @@ -426,23 +429,20 @@ class Crew(FlowTrackable, BaseModel): return self @model_validator(mode="after") - def validate_tasks(self): + def validate_tasks(self) -> Self: if self.process == Process.sequential: for task in self.tasks: if task.agent is None: raise PydanticCustomError( "missing_agent_in_task", - ( - f"Sequential process error: Agent is missing in the task " - f"with the following description: {task.description}" - ), # type: ignore # Dynamic string in error message - {}, + "Sequential process error: Agent is missing in the task with the following description: {description}", + {"description": task.description}, ) return self @model_validator(mode="after") - def validate_end_with_at_most_one_async_task(self): + def validate_end_with_at_most_one_async_task(self) -> Self: """Validates that the crew ends with at most one asynchronous task.""" final_async_task_count = 0 @@ -505,7 +505,9 @@ class Crew(FlowTrackable, BaseModel): return self @model_validator(mode="after") - def validate_async_task_cannot_include_sequential_async_tasks_in_context(self): + def validate_async_task_cannot_include_sequential_async_tasks_in_context( + self, + ) -> Self: """ Validates that if a task is set to be executed asynchronously, it cannot include other asynchronous tasks in its context unless @@ -527,7 +529,7 @@ class Crew(FlowTrackable, BaseModel): return self @model_validator(mode="after") - def validate_context_no_future_tasks(self): + def validate_context_no_future_tasks(self) -> Self: """Validates that a task's context does not include future tasks.""" task_indices = {id(task): i for i, task in enumerate(self.tasks)} @@ -561,7 +563,7 @@ class Crew(FlowTrackable, BaseModel): """ return self.security_config.fingerprint - def _setup_from_config(self): + def _setup_from_config(self) -> None: """Initializes agents and tasks from the provided config.""" if self.config is None: raise ValueError("Config should not be None.") @@ -628,12 +630,12 @@ class Crew(FlowTrackable, BaseModel): for agent in train_crew.agents: if training_data.get(str(agent.id)): - result = TaskEvaluator(agent).evaluate_training_data( + result = TaskEvaluator(agent).evaluate_training_data( # type: ignore[arg-type] training_data=training_data, agent_id=str(agent.id) ) CrewTrainingHandler(filename).save_trained_data( agent_id=str(agent.role), - trained_data=result.model_dump(), # type: ignore[arg-type] + trained_data=result.model_dump(), ) crewai_event_bus.emit( @@ -684,12 +686,8 @@ class Crew(FlowTrackable, BaseModel): self._set_tasks_callbacks() self._set_allow_crewai_trigger_context_for_first_task() - i18n = I18N(prompt_file=self.prompt_file) - for agent in self.agents: - agent.i18n = i18n - # type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]" - agent.crew = self # type: ignore[attr-defined] + agent.crew = self agent.set_knowledge(crew_embedder=self.embedder) # TODO: Create an AgentFunctionCalling protocol for future refactoring if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm" @@ -753,10 +751,12 @@ class Crew(FlowTrackable, BaseModel): inputs = inputs or {} return await asyncio.to_thread(self.kickoff, inputs) - async def kickoff_for_each_async(self, inputs: list[dict]) -> list[CrewOutput]: + async def kickoff_for_each_async( + self, inputs: list[dict[str, Any]] + ) -> list[CrewOutput]: crew_copies = [self.copy() for _ in inputs] - async def run_crew(crew, input_data): + async def run_crew(crew: Self, input_data: Any) -> CrewOutput: return await crew.kickoff_async(inputs=input_data) tasks = [ @@ -775,7 +775,7 @@ class Crew(FlowTrackable, BaseModel): self._task_output_handler.reset() return results - def _handle_crew_planning(self): + def _handle_crew_planning(self) -> None: """Handles the Crew planning.""" self._logger.log("info", "Planning the crew execution") result = CrewPlanner( @@ -793,7 +793,7 @@ class Crew(FlowTrackable, BaseModel): output: TaskOutput, task_index: int, was_replayed: bool = False, - ): + ) -> None: if self._inputs: inputs = self._inputs else: @@ -825,19 +825,21 @@ class Crew(FlowTrackable, BaseModel): self._create_manager_agent() return self._execute_tasks(self.tasks) - def _create_manager_agent(self): - i18n = I18N(prompt_file=self.prompt_file) + def _create_manager_agent(self) -> None: if self.manager_agent is not None: self.manager_agent.allow_delegation = True manager = self.manager_agent if manager.tools is not None and len(manager.tools) > 0: self._logger.log( - "warning", "Manager agent should not have tools", color="orange" + "warning", + "Manager agent should not have tools", + color="bold_yellow", ) manager.tools = [] raise Exception("Manager agent should not have tools") else: self.manager_llm = create_llm(self.manager_llm) + i18n = get_i18n(prompt_file=self.prompt_file) manager = Agent( role=i18n.retrieve("hierarchical_manager_agent", "role"), goal=i18n.retrieve("hierarchical_manager_agent", "goal"), @@ -895,7 +897,7 @@ class Crew(FlowTrackable, BaseModel): tools_for_task = self._prepare_tools( agent_to_use, task, - cast(list[Tool] | list[BaseTool], tools_for_task), + tools_for_task, ) self._log_task_start(task, agent_to_use.role) @@ -915,7 +917,7 @@ class Crew(FlowTrackable, BaseModel): future = task.execute_async( agent=agent_to_use, context=context, - tools=cast(list[BaseTool], tools_for_task), + tools=tools_for_task, ) futures.append((task, future, task_index)) else: @@ -927,7 +929,7 @@ class Crew(FlowTrackable, BaseModel): task_output = task.execute_sync( agent=agent_to_use, context=context, - tools=cast(list[BaseTool], tools_for_task), + tools=tools_for_task, ) task_outputs.append(task_output) self._process_task_result(task, task_output) @@ -965,7 +967,7 @@ class Crew(FlowTrackable, BaseModel): return None def _prepare_tools( - self, agent: BaseAgent, task: Task, tools: list[Tool] | list[BaseTool] + self, agent: BaseAgent, task: Task, tools: list[BaseTool] ) -> list[BaseTool]: # Add delegation tools if agent allows delegation if hasattr(agent, "allow_delegation") and getattr( @@ -1002,21 +1004,21 @@ class Crew(FlowTrackable, BaseModel): tools = self._add_mcp_tools(task, tools) # Return a list[BaseTool] compatible with Task.execute_sync and execute_async - return cast(list[BaseTool], tools) + return tools def _get_agent_to_use(self, task: Task) -> BaseAgent | None: if self.process == Process.hierarchical: return self.manager_agent return task.agent + @staticmethod def _merge_tools( - self, - existing_tools: list[Tool] | list[BaseTool], - new_tools: list[Tool] | list[BaseTool], + existing_tools: list[BaseTool], + new_tools: list[BaseTool], ) -> list[BaseTool]: """Merge new tools into existing tools list, avoiding duplicates.""" if not new_tools: - return cast(list[BaseTool], existing_tools) + return existing_tools # Create mapping of tool names to new tools new_tool_map = {tool.name: tool for tool in new_tools} @@ -1027,63 +1029,62 @@ class Crew(FlowTrackable, BaseModel): # Add all new tools tools.extend(new_tools) - return cast(list[BaseTool], tools) + return tools def _inject_delegation_tools( self, - tools: list[Tool] | list[BaseTool], + tools: list[BaseTool], task_agent: BaseAgent, agents: list[BaseAgent], ) -> list[BaseTool]: if hasattr(task_agent, "get_delegation_tools"): delegation_tools = task_agent.get_delegation_tools(agents) # Cast delegation_tools to the expected type for _merge_tools - return self._merge_tools(tools, cast(list[BaseTool], delegation_tools)) - return cast(list[BaseTool], tools) + return self._merge_tools(tools, delegation_tools) + return tools def _inject_platform_tools( self, - tools: list[Tool] | list[BaseTool], + tools: list[BaseTool], task_agent: BaseAgent, ) -> list[BaseTool]: apps = getattr(task_agent, "apps", None) or [] if hasattr(task_agent, "get_platform_tools") and apps: platform_tools = task_agent.get_platform_tools(apps=apps) - return self._merge_tools(tools, cast(list[BaseTool], platform_tools)) - return cast(list[BaseTool], tools) + return self._merge_tools(tools, platform_tools) + return tools def _inject_mcp_tools( self, - tools: list[Tool] | list[BaseTool], + tools: list[BaseTool], task_agent: BaseAgent, ) -> list[BaseTool]: mcps = getattr(task_agent, "mcps", None) or [] if hasattr(task_agent, "get_mcp_tools") and mcps: mcp_tools = task_agent.get_mcp_tools(mcps=mcps) - return self._merge_tools(tools, cast(list[BaseTool], mcp_tools)) - return cast(list[BaseTool], tools) + return self._merge_tools(tools, mcp_tools) + return tools def _add_multimodal_tools( - self, agent: BaseAgent, tools: list[Tool] | list[BaseTool] + self, agent: BaseAgent, tools: list[BaseTool] ) -> list[BaseTool]: if hasattr(agent, "get_multimodal_tools"): multimodal_tools = agent.get_multimodal_tools() - # Cast multimodal_tools to the expected type for _merge_tools return self._merge_tools(tools, cast(list[BaseTool], multimodal_tools)) - return cast(list[BaseTool], tools) + return tools def _add_code_execution_tools( - self, agent: BaseAgent, tools: list[Tool] | list[BaseTool] + self, agent: BaseAgent, tools: list[BaseTool] ) -> list[BaseTool]: if hasattr(agent, "get_code_execution_tools"): code_tools = agent.get_code_execution_tools() # Cast code_tools to the expected type for _merge_tools return self._merge_tools(tools, cast(list[BaseTool], code_tools)) - return cast(list[BaseTool], tools) + return tools def _add_delegation_tools( - self, task: Task, tools: list[Tool] | list[BaseTool] + self, task: Task, tools: list[BaseTool] ) -> list[BaseTool]: agents_for_delegation = [agent for agent in self.agents if agent != task.agent] if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent: @@ -1092,25 +1093,21 @@ class Crew(FlowTrackable, BaseModel): tools = self._inject_delegation_tools( tools, task.agent, agents_for_delegation ) - return cast(list[BaseTool], tools) + return tools - def _add_platform_tools( - self, task: Task, tools: list[Tool] | list[BaseTool] - ) -> list[BaseTool]: + def _add_platform_tools(self, task: Task, tools: list[BaseTool]) -> list[BaseTool]: if task.agent: tools = self._inject_platform_tools(tools, task.agent) - return cast(list[BaseTool], tools or []) + return tools or [] - def _add_mcp_tools( - self, task: Task, tools: list[Tool] | list[BaseTool] - ) -> list[BaseTool]: + def _add_mcp_tools(self, task: Task, tools: list[BaseTool]) -> list[BaseTool]: if task.agent: tools = self._inject_mcp_tools(tools, task.agent) - return cast(list[BaseTool], tools or []) + return tools or [] - def _log_task_start(self, task: Task, role: str = "None"): + def _log_task_start(self, task: Task, role: str = "None") -> None: if self.output_log_file: self._file_handler.log( task_name=task.name, # type: ignore[arg-type] @@ -1120,7 +1117,7 @@ class Crew(FlowTrackable, BaseModel): ) def _update_manager_tools( - self, task: Task, tools: list[Tool] | list[BaseTool] + self, task: Task, tools: list[BaseTool] ) -> list[BaseTool]: if self.manager_agent: if task.agent: @@ -1129,7 +1126,7 @@ class Crew(FlowTrackable, BaseModel): tools = self._inject_delegation_tools( tools, self.manager_agent, self.agents ) - return cast(list[BaseTool], tools) + return tools def _get_context(self, task: Task, task_outputs: list[TaskOutput]) -> str: if not task.context: @@ -1280,7 +1277,7 @@ class Crew(FlowTrackable, BaseModel): return required_inputs - def copy(self): + def copy(self) -> Crew: # type: ignore[override] """ Creates a deep copy of the Crew instance. @@ -1311,7 +1308,7 @@ class Crew(FlowTrackable, BaseModel): manager_agent = self.manager_agent.copy() if self.manager_agent else None manager_llm = shallow_copy(self.manager_llm) if self.manager_llm else None - task_mapping = {} + task_mapping: dict[str, Any] = {} cloned_tasks = [] existing_knowledge_sources = shallow_copy(self.knowledge_sources) @@ -1373,7 +1370,6 @@ class Crew(FlowTrackable, BaseModel): ) for task in self.tasks ] - # type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None) for agent in self.agents: agent.interpolate_inputs(inputs) @@ -1463,7 +1459,7 @@ class Crew(FlowTrackable, BaseModel): ) raise - def __repr__(self): + def __repr__(self) -> str: return ( f"Crew(id={self.id}, process={self.process}, " f"number_of_agents={len(self.agents)}, " @@ -1520,7 +1516,9 @@ class Crew(FlowTrackable, BaseModel): if (system := config.get("system")) is not None: name = config.get("name") try: - reset_fn: Callable = cast(Callable, config.get("reset")) + reset_fn: Callable[[Any], Any] = cast( + Callable[[Any], Any], config.get("reset") + ) reset_fn(system) self._logger.log( "info", @@ -1551,7 +1549,9 @@ class Crew(FlowTrackable, BaseModel): raise RuntimeError(f"{name} memory system is not initialized") try: - reset_fn: Callable = cast(Callable, config.get("reset")) + reset_fn: Callable[[Any], Any] = cast( + Callable[[Any], Any], config.get("reset") + ) reset_fn(system) self._logger.log( "info", @@ -1564,7 +1564,7 @@ class Crew(FlowTrackable, BaseModel): f"Failed to reset {name} memory: {e!s}" ) from e - def _get_memory_systems(self): + def _get_memory_systems(self) -> dict[str, Any]: """Get all available memory systems with their configuration. Returns: @@ -1572,10 +1572,10 @@ class Crew(FlowTrackable, BaseModel): display names. """ - def default_reset(memory): + def default_reset(memory: Any) -> Any: return memory.reset() - def knowledge_reset(memory): + def knowledge_reset(memory: Any) -> Any: return self.reset_knowledge(memory) # Get knowledge for agents @@ -1635,7 +1635,7 @@ class Crew(FlowTrackable, BaseModel): for ks in knowledges: ks.reset() - def _set_allow_crewai_trigger_context_for_first_task(self): + def _set_allow_crewai_trigger_context_for_first_task(self) -> None: crewai_trigger_payload = self._inputs and self._inputs.get( "crewai_trigger_payload" ) diff --git a/lib/crewai/src/crewai/lite_agent.py b/lib/crewai/src/crewai/lite_agent.py index d613681e2..97971d6c3 100644 --- a/lib/crewai/src/crewai/lite_agent.py +++ b/lib/crewai/src/crewai/lite_agent.py @@ -61,7 +61,7 @@ from crewai.utilities.agent_utils import ( from crewai.utilities.converter import generate_model_description from crewai.utilities.guardrail import process_guardrail from crewai.utilities.guardrail_types import GuardrailCallable, GuardrailType -from crewai.utilities.i18n import I18N +from crewai.utilities.i18n import I18N, get_i18n from crewai.utilities.llm_utils import create_llm from crewai.utilities.printer import Printer from crewai.utilities.token_counter_callback import TokenCalcHandler @@ -90,8 +90,6 @@ class LiteAgent(FlowTrackable, BaseModel): """ model_config = {"arbitrary_types_allowed": True} - - # Core Agent Properties id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True) role: str = Field(description="Role of the agent") goal: str = Field(description="Goal of the agent") @@ -102,8 +100,6 @@ class LiteAgent(FlowTrackable, BaseModel): tools: list[BaseTool] = Field( default_factory=list, description="Tools at agent's disposal" ) - - # Execution Control Properties max_iterations: int = Field( default=15, description="Maximum number of iterations for tool usage" ) @@ -120,24 +116,17 @@ class LiteAgent(FlowTrackable, BaseModel): ) request_within_rpm_limit: Callable[[], bool] | None = Field( default=None, - description="Callback to check if the request is within the RPM limit", + description="Callback to check if the request is within the RPM8 limit", ) i18n: I18N = Field( - default_factory=I18N, description="Internationalization settings." + default_factory=get_i18n, description="Internationalization settings." ) - - # Output and Formatting Properties response_format: type[BaseModel] | None = Field( default=None, description="Pydantic model for structured output" ) verbose: bool = Field( default=False, description="Whether to print execution details" ) - callbacks: list[Callable] = Field( - default_factory=list, description="Callbacks to be used for the agent" - ) - - # Guardrail Properties guardrail: GuardrailType | None = Field( default=None, description="Function or string description of a guardrail to validate agent output", @@ -145,17 +134,12 @@ class LiteAgent(FlowTrackable, BaseModel): guardrail_max_retries: int = Field( default=3, description="Maximum number of retries when guardrail fails" ) - - # State and Results tools_results: list[dict[str, Any]] = Field( default_factory=list, description="Results of the tools used by the agent." ) - - # Reference of Agent original_agent: BaseAgent | None = Field( default=None, description="Reference to the agent that created this LiteAgent" ) - # Private Attributes _parsed_tools: list[CrewStructuredTool] = PrivateAttr(default_factory=list) _token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess) _cache_handler: CacheHandler = PrivateAttr(default_factory=CacheHandler) @@ -165,6 +149,7 @@ class LiteAgent(FlowTrackable, BaseModel): _printer: Printer = PrivateAttr(default_factory=Printer) _guardrail: GuardrailCallable | None = PrivateAttr(default=None) _guardrail_retry_count: int = PrivateAttr(default=0) + _callbacks: list[TokenCalcHandler] = PrivateAttr(default_factory=list) @model_validator(mode="after") def setup_llm(self) -> Self: @@ -174,15 +159,13 @@ class LiteAgent(FlowTrackable, BaseModel): raise ValueError( f"Expected LLM instance of type BaseLLM, got {type(self.llm).__name__}" ) - - # Initialize callbacks token_callback = TokenCalcHandler(token_cost_process=self._token_process) self._callbacks = [token_callback] return self @model_validator(mode="after") - def parse_tools(self): + def parse_tools(self) -> Self: """Parse the tools and convert them to CrewStructuredTool instances.""" self._parsed_tools = parse_tools(self.tools) @@ -201,7 +184,7 @@ class LiteAgent(FlowTrackable, BaseModel): ) self._guardrail = cast( GuardrailCallable, - LLMGuardrail(description=self.guardrail, llm=self.llm), + cast(object, LLMGuardrail(description=self.guardrail, llm=self.llm)), ) return self @@ -209,8 +192,8 @@ class LiteAgent(FlowTrackable, BaseModel): @field_validator("guardrail", mode="before") @classmethod def validate_guardrail_function( - cls, v: Callable | str | None - ) -> Callable | str | None: + cls, v: GuardrailCallable | str | None + ) -> GuardrailCallable | str | None: """Validate that the guardrail function has the correct signature. If v is a callable, validate that it has the correct signature. @@ -559,7 +542,7 @@ class LiteAgent(FlowTrackable, BaseModel): self._show_logs(formatted_answer) return formatted_answer - def _show_logs(self, formatted_answer: AgentAction | AgentFinish): + def _show_logs(self, formatted_answer: AgentAction | AgentFinish) -> None: """Show logs for the agent's execution.""" crewai_event_bus.emit( self, @@ -574,4 +557,4 @@ class LiteAgent(FlowTrackable, BaseModel): self, text: str, role: Literal["user", "assistant", "system"] = "assistant" ) -> None: """Append a message to the message list with the given role.""" - self._messages.append(cast(LLMMessage, format_message_for_llm(text, role=role))) + self._messages.append(format_message_for_llm(text, role=role)) diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index 85efe331e..36c406156 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -1,6 +1,5 @@ from __future__ import annotations -from collections.abc import Callable from concurrent.futures import Future from copy import copy as shallow_copy import datetime @@ -29,6 +28,7 @@ from pydantic import ( model_validator, ) from pydantic_core import PydanticCustomError +from typing_extensions import Self from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.events.event_bus import crewai_event_bus @@ -52,7 +52,7 @@ from crewai.utilities.guardrail_types import ( GuardrailType, GuardrailsType, ) -from crewai.utilities.i18n import I18N +from crewai.utilities.i18n import I18N, get_i18n from crewai.utilities.printer import Printer from crewai.utilities.string_utils import interpolate_only @@ -90,7 +90,7 @@ class Task(BaseModel): used_tools: int = 0 tools_errors: int = 0 delegations: int = 0 - i18n: I18N = Field(default_factory=I18N) + i18n: I18N = Field(default_factory=get_i18n) name: str | None = Field(default=None) prompt_context: str | None = None description: str = Field(description="Description of the actual task.") @@ -207,8 +207,8 @@ class Task(BaseModel): @field_validator("guardrail") @classmethod def validate_guardrail_function( - cls, v: str | Callable | None - ) -> str | Callable | None: + cls, v: str | GuardrailCallable | None + ) -> str | GuardrailCallable | None: """ If v is a callable, validate that the guardrail function has the correct signature and behavior. If v is a string, return it as is. @@ -265,11 +265,11 @@ class Task(BaseModel): @model_validator(mode="before") @classmethod - def process_model_config(cls, values): + def process_model_config(cls, values: dict[str, Any]) -> dict[str, Any]: return process_config(values, cls) @model_validator(mode="after") - def validate_required_fields(self): + def validate_required_fields(self) -> Self: required_fields = ["description", "expected_output"] for field in required_fields: if getattr(self, field) is None: @@ -418,14 +418,14 @@ class Task(BaseModel): return self @model_validator(mode="after") - def check_tools(self): + def check_tools(self) -> Self: """Check if the tools are set.""" if not self.tools and self.agent and self.agent.tools: - self.tools.extend(self.agent.tools) + self.tools = self.agent.tools return self @model_validator(mode="after") - def check_output(self): + def check_output(self) -> Self: """Check if an output type is set.""" output_types = [self.output_json, self.output_pydantic] if len([type for type in output_types if type]) > 1: @@ -437,7 +437,7 @@ class Task(BaseModel): return self @model_validator(mode="after") - def handle_max_retries_deprecation(self): + def handle_max_retries_deprecation(self) -> Self: if self.max_retries is not None: warnings.warn( "The 'max_retries' parameter is deprecated and will be removed in CrewAI v1.0.0. " @@ -518,7 +518,7 @@ class Task(BaseModel): tools = tools or self.tools or [] self.processed_by_agents.add(agent.role) - crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self)) + crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self)) # type: ignore[no-untyped-call] result = agent.execute_task( task=self, context=context, @@ -576,12 +576,13 @@ class Task(BaseModel): ) self._save_file(content) crewai_event_bus.emit( - self, TaskCompletedEvent(output=task_output, task=self) + self, + TaskCompletedEvent(output=task_output, task=self), # type: ignore[no-untyped-call] ) return task_output except Exception as e: self.end_time = datetime.datetime.now() - crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self)) + crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self)) # type: ignore[no-untyped-call] raise e # Re-raise the exception after emitting the event def prompt(self) -> str: @@ -786,7 +787,7 @@ Follow these guidelines: return OutputFormat.PYDANTIC return OutputFormat.RAW - def _save_file(self, result: dict | str | Any) -> None: + def _save_file(self, result: dict[str, Any] | str | Any) -> None: """Save task output to a file. Note: @@ -838,7 +839,7 @@ Follow these guidelines: ) from e return - def __repr__(self): + def __repr__(self) -> str: return f"Task(description={self.description}, expected_output={self.expected_output})" @property diff --git a/lib/crewai/src/crewai/tools/agent_tools/agent_tools.py b/lib/crewai/src/crewai/tools/agent_tools/agent_tools.py index a5f444c25..0a1fd32e3 100644 --- a/lib/crewai/src/crewai/tools/agent_tools/agent_tools.py +++ b/lib/crewai/src/crewai/tools/agent_tools/agent_tools.py @@ -1,8 +1,16 @@ -from crewai.agents.agent_builder.base_agent import BaseAgent +from __future__ import annotations + +from typing import TYPE_CHECKING + from crewai.tools.agent_tools.ask_question_tool import AskQuestionTool from crewai.tools.agent_tools.delegate_work_tool import DelegateWorkTool -from crewai.tools.base_tool import BaseTool -from crewai.utilities.i18n import I18N +from crewai.utilities.i18n import get_i18n + + +if TYPE_CHECKING: + from crewai.agents.agent_builder.base_agent import BaseAgent + from crewai.tools.base_tool import BaseTool + from crewai.utilities.i18n import I18N class AgentTools: @@ -10,7 +18,7 @@ class AgentTools: def __init__(self, agents: list[BaseAgent], i18n: I18N | None = None) -> None: self.agents = agents - self.i18n = i18n if i18n is not None else I18N() + self.i18n = i18n if i18n is not None else get_i18n() def tools(self) -> list[BaseTool]: """Get all available agent tools""" diff --git a/lib/crewai/src/crewai/tools/agent_tools/base_agent_tools.py b/lib/crewai/src/crewai/tools/agent_tools/base_agent_tools.py index db8e43a17..8e5b959a4 100644 --- a/lib/crewai/src/crewai/tools/agent_tools/base_agent_tools.py +++ b/lib/crewai/src/crewai/tools/agent_tools/base_agent_tools.py @@ -1,12 +1,12 @@ import logging +from typing import Any from pydantic import Field from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.task import Task from crewai.tools.base_tool import BaseTool -from crewai.utilities.i18n import I18N - +from crewai.utilities.i18n import I18N, get_i18n logger = logging.getLogger(__name__) @@ -17,7 +17,7 @@ class BaseAgentTool(BaseTool): agents: list[BaseAgent] = Field(description="List of available agents") i18n: I18N = Field( - default_factory=I18N, description="Internationalization settings" + default_factory=get_i18n, description="Internationalization settings" ) def sanitize_agent_name(self, name: str) -> str: @@ -40,7 +40,7 @@ class BaseAgentTool(BaseTool): return normalized.replace('"', "").casefold() @staticmethod - def _get_coworker(coworker: str | None, **kwargs) -> str | None: + def _get_coworker(coworker: str | None, **kwargs: Any) -> str | None: coworker = coworker or kwargs.get("co_worker") or kwargs.get("coworker") if coworker: is_list = coworker.startswith("[") and coworker.endswith("]") @@ -83,7 +83,7 @@ class BaseAgentTool(BaseTool): available_agents = [agent.role for agent in self.agents] logger.debug(f"Available agents: {available_agents}") - agent = [ # type: ignore # Incompatible types in assignment (expression has type "list[BaseAgent]", variable has type "str | None") + agent = [ available_agent for available_agent in self.agents if self.sanitize_agent_name(available_agent.role) == sanitized_name diff --git a/lib/crewai/src/crewai/tools/tool_usage.py b/lib/crewai/src/crewai/tools/tool_usage.py index e6ad9c357..6f0e92cb8 100644 --- a/lib/crewai/src/crewai/tools/tool_usage.py +++ b/lib/crewai/src/crewai/tools/tool_usage.py @@ -7,10 +7,10 @@ import json from json import JSONDecodeError from textwrap import dedent import time -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal import json5 -from json_repair import repair_json # type: ignore[import-untyped,import-error] +from json_repair import repair_json # type: ignore[import-untyped] from crewai.events.event_bus import crewai_event_bus from crewai.events.types.tool_usage_events import ( @@ -28,7 +28,7 @@ from crewai.utilities.agent_utils import ( render_text_description_and_args, ) from crewai.utilities.converter import Converter -from crewai.utilities.i18n import I18N +from crewai.utilities.i18n import I18N, get_i18n from crewai.utilities.printer import Printer @@ -39,7 +39,18 @@ if TYPE_CHECKING: from crewai.llm import LLM from crewai.task import Task -OPENAI_BIGGER_MODELS = [ + +OPENAI_BIGGER_MODELS: list[ + Literal[ + "gpt-4", + "gpt-4o", + "o1-preview", + "o1-mini", + "o1", + "o3", + "o3-mini", + ] +] = [ "gpt-4", "gpt-4o", "o1-preview", @@ -81,7 +92,7 @@ class ToolUsage: action: Any = None, fingerprint_context: dict[str, str] | None = None, ) -> None: - self._i18n: I18N = agent.i18n if agent else I18N() + self._i18n: I18N = agent.i18n if agent else get_i18n() self._printer: Printer = Printer() self._telemetry: Telemetry = Telemetry() self._run_attempts: int = 1 @@ -100,12 +111,14 @@ class ToolUsage: # Set the maximum parsing attempts for bigger models if ( self.function_calling_llm - and self.function_calling_llm in OPENAI_BIGGER_MODELS + and self.function_calling_llm.model in OPENAI_BIGGER_MODELS ): self._max_parsing_attempts = 2 self._remember_format_after_usages = 4 - def parse_tool_calling(self, tool_string: str): + def parse_tool_calling( + self, tool_string: str + ) -> ToolCalling | InstructorToolCalling | ToolUsageError: """Parse the tool string and return the tool calling.""" return self._tool_calling(tool_string) @@ -153,7 +166,7 @@ class ToolUsage: tool: CrewStructuredTool, calling: ToolCalling | InstructorToolCalling, ) -> str: - if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None) + if self._check_tool_repeated_usage(calling=calling): try: result = self._i18n.errors("task_repeated_usage").format( tool_names=self.tools_names @@ -163,7 +176,7 @@ class ToolUsage: tool_name=tool.name, attempts=self._run_attempts, ) - return self._format_result(result=result) # type: ignore # "_format_result" of "ToolUsage" does not return a value (it only ever returns None) + return self._format_result(result=result) except Exception: if self.task: @@ -241,7 +254,7 @@ class ToolUsage: try: acceptable_args = tool.args_schema.model_json_schema()[ "properties" - ].keys() # type: ignore + ].keys() arguments = { k: v for k, v in calling.arguments.items() @@ -276,19 +289,19 @@ class ToolUsage: self._printer.print( content=f"\n\n{error_message}\n", color="red" ) - return error # type: ignore # No return value expected + return error if self.task: self.task.increment_tools_errors() - return self.use(calling=calling, tool_string=tool_string) # type: ignore # No return value expected + return self.use(calling=calling, tool_string=tool_string) if self.tools_handler: should_cache = True if ( hasattr(available_tool, "cache_function") - and available_tool.cache_function # type: ignore # Item "None" of "Any | None" has no attribute "cache_function" + and available_tool.cache_function ): - should_cache = available_tool.cache_function( # type: ignore # Item "None" of "Any | None" has no attribute "cache_function" + should_cache = available_tool.cache_function( calling.arguments, result ) @@ -300,7 +313,7 @@ class ToolUsage: tool_name=tool.name, attempts=self._run_attempts, ) - result = self._format_result(result=result) # type: ignore # "_format_result" of "ToolUsage" does not return a value (it only ever returns None) + result = self._format_result(result=result) data = { "result": result, "tool_name": tool.name, @@ -508,7 +521,7 @@ class ToolUsage: self.task.increment_tools_errors() if self.agent and self.agent.verbose: self._printer.print(content=f"\n\n{e}\n", color="red") - return ToolUsageError( # type: ignore # Incompatible return value type (got "ToolUsageError", expected "ToolCalling | InstructorToolCalling") + return ToolUsageError( f"{self._i18n.errors('tool_usage_error').format(error=e)}\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}" ) return self._tool_calling(tool_string) @@ -567,7 +580,7 @@ class ToolUsage: # If all parsing attempts fail, raise an error raise Exception(error_message) - def _emit_validate_input_error(self, final_error: str): + def _emit_validate_input_error(self, final_error: str) -> None: tool_selection_data = { "agent_key": getattr(self.agent, "key", None) if self.agent else None, "agent_role": getattr(self.agent, "role", None) if self.agent else None, @@ -636,7 +649,7 @@ class ToolUsage: def _prepare_event_data( self, tool: Any, tool_calling: ToolCalling | InstructorToolCalling - ) -> dict: + ) -> dict[str, Any]: event_data = { "run_attempts": self._run_attempts, "delegations": self.task.delegations if self.task else 0, @@ -660,7 +673,7 @@ class ToolUsage: return event_data - def _add_fingerprint_metadata(self, arguments: dict) -> dict: + def _add_fingerprint_metadata(self, arguments: dict[str, Any]) -> dict[str, Any]: """Add fingerprint metadata to tool arguments if available. Args: diff --git a/lib/crewai/src/crewai/utilities/i18n.py b/lib/crewai/src/crewai/utilities/i18n.py index 5bc8c764c..104e452a7 100644 --- a/lib/crewai/src/crewai/utilities/i18n.py +++ b/lib/crewai/src/crewai/utilities/i18n.py @@ -1,5 +1,6 @@ """Internationalization support for CrewAI prompts and messages.""" +from functools import lru_cache import json import os from typing import Literal @@ -108,3 +109,19 @@ class I18N(BaseModel): return self._prompts[kind][key] except Exception as e: raise Exception(f"Prompt for '{kind}':'{key}' not found.") from e + + +@lru_cache(maxsize=None) +def get_i18n(prompt_file: str | None = None) -> I18N: + """Get a cached I18N instance. + + This function caches I18N instances to avoid redundant file I/O and JSON parsing. + Each unique prompt_file path gets its own cached instance. + + Args: + prompt_file: Optional custom prompt file path. Defaults to None (uses built-in prompts). + + Returns: + Cached I18N instance. + """ + return I18N(prompt_file=prompt_file) diff --git a/lib/crewai/src/crewai/utilities/prompts.py b/lib/crewai/src/crewai/utilities/prompts.py index 4d3e168e1..890c8a626 100644 --- a/lib/crewai/src/crewai/utilities/prompts.py +++ b/lib/crewai/src/crewai/utilities/prompts.py @@ -1,29 +1,38 @@ +"""Prompt generation and management utilities for CrewAI agents.""" + from __future__ import annotations -from typing import Any, TypedDict +from typing import Annotated, Any, Literal, TypedDict from pydantic import BaseModel, Field -from crewai.utilities.i18n import I18N +from crewai.utilities.i18n import I18N, get_i18n class StandardPromptResult(TypedDict): """Result with only prompt field for standard mode.""" - prompt: str + prompt: Annotated[str, "The generated prompt string"] class SystemPromptResult(StandardPromptResult): """Result with system, user, and prompt fields for system prompt mode.""" - system: str - user: str + system: Annotated[str, "The system prompt component"] + user: Annotated[str, "The user prompt component"] + + +COMPONENTS = Literal["role_playing", "tools", "no_tools", "task"] class Prompts(BaseModel): - """Manages and generates prompts for a generic agent.""" + """Manages and generates prompts for a generic agent. - i18n: I18N = Field(default_factory=I18N) + Notes: + - Need to refactor so that prompt is not tightly coupled to agent. + """ + + i18n: I18N = Field(default_factory=get_i18n) has_tools: bool = Field( default=False, description="Indicates if the agent has access to tools" ) @@ -36,7 +45,7 @@ class Prompts(BaseModel): response_template: str | None = Field( default=None, description="Custom response prompt template" ) - use_system_prompt: bool | None = Field( + use_system_prompt: bool = Field( default=False, description="Whether to use the system prompt when no custom templates are provided", ) @@ -48,7 +57,7 @@ class Prompts(BaseModel): Returns: A dictionary containing the constructed prompt(s). """ - slices: list[str] = ["role_playing"] + slices: list[COMPONENTS] = ["role_playing"] if self.has_tools: slices.append("tools") else: @@ -77,7 +86,7 @@ class Prompts(BaseModel): def _build_prompt( self, - components: list[str], + components: list[COMPONENTS], system_template: str | None = None, prompt_template: str | None = None, response_template: str | None = None, diff --git a/lib/crewai/src/crewai/utilities/reasoning_handler.py b/lib/crewai/src/crewai/utilities/reasoning_handler.py index fb78e3e64..660354d20 100644 --- a/lib/crewai/src/crewai/utilities/reasoning_handler.py +++ b/lib/crewai/src/crewai/utilities/reasoning_handler.py @@ -13,7 +13,6 @@ from crewai.events.types.reasoning_events import ( ) from crewai.llm import LLM from crewai.task import Task -from crewai.utilities.i18n import I18N class ReasoningPlan(BaseModel): @@ -62,7 +61,6 @@ class AgentReasoning: agent: The agent performing the reasoning. llm: The language model used for reasoning. logger: Logger for logging events and errors. - i18n: Internationalization utility for retrieving prompts. """ def __init__(self, task: Task, agent: Agent) -> None: @@ -76,7 +74,6 @@ class AgentReasoning: self.agent = agent self.llm = cast(LLM, agent.llm) self.logger = logging.getLogger(__name__) - self.i18n = I18N() def handle_agent_reasoning(self) -> AgentReasoningOutput: """Public method for the reasoning process that creates and refines a plan for the task until the agent is ready to execute it. @@ -163,8 +160,7 @@ class AgentReasoning: llm=self.llm, prompt=reasoning_prompt, task=self.task, - agent=self.agent, - i18n=self.i18n, + reasoning_agent=self.agent, backstory=self.__get_agent_backstory(), plan_type="initial_plan", ) @@ -208,8 +204,7 @@ class AgentReasoning: llm=self.llm, prompt=refine_prompt, task=self.task, - agent=self.agent, - i18n=self.i18n, + reasoning_agent=self.agent, backstory=self.__get_agent_backstory(), plan_type="refine_plan", ) @@ -238,14 +233,14 @@ class AgentReasoning: self.logger.debug(f"Using function calling for {prompt_type} reasoning") try: - system_prompt = self.i18n.retrieve("reasoning", prompt_type).format( + system_prompt = self.agent.i18n.retrieve("reasoning", prompt_type).format( role=self.agent.role, goal=self.agent.goal, backstory=self.__get_agent_backstory(), ) # Prepare a simple callable that just returns the tool arguments as JSON - def _create_reasoning_plan(plan: str, ready: bool = True): + def _create_reasoning_plan(plan: str, ready: bool = True) -> str: """Return the reasoning plan result in JSON string form.""" return json.dumps({"plan": plan, "ready": ready}) @@ -281,7 +276,9 @@ class AgentReasoning: ) try: - system_prompt = self.i18n.retrieve("reasoning", prompt_type).format( + system_prompt = self.agent.i18n.retrieve( + "reasoning", prompt_type + ).format( role=self.agent.role, goal=self.agent.goal, backstory=self.__get_agent_backstory(), @@ -326,7 +323,7 @@ class AgentReasoning: """ available_tools = self.__format_available_tools() - return self.i18n.retrieve("reasoning", "create_plan_prompt").format( + return self.agent.i18n.retrieve("reasoning", "create_plan_prompt").format( role=self.agent.role, goal=self.agent.goal, backstory=self.__get_agent_backstory(), @@ -357,7 +354,7 @@ class AgentReasoning: Returns: str: The refine prompt. """ - return self.i18n.retrieve("reasoning", "refine_plan_prompt").format( + return self.agent.i18n.retrieve("reasoning", "refine_plan_prompt").format( role=self.agent.role, goal=self.agent.goal, backstory=self.__get_agent_backstory(), @@ -405,8 +402,7 @@ def _call_llm_with_reasoning_prompt( llm: LLM, prompt: str, task: Task, - agent: Agent, - i18n: I18N, + reasoning_agent: Agent, backstory: str, plan_type: Literal["initial_plan", "refine_plan"], ) -> str: @@ -416,17 +412,16 @@ def _call_llm_with_reasoning_prompt( llm: The language model to use. prompt: The prompt to send to the LLM. task: The task for which the agent is reasoning. - agent: The agent performing the reasoning. - i18n: Internationalization utility for retrieving prompts. + reasoning_agent: The agent performing the reasoning. backstory: The agent's backstory. plan_type: The type of plan being created ("initial_plan" or "refine_plan"). Returns: The LLM response. """ - system_prompt = i18n.retrieve("reasoning", plan_type).format( - role=agent.role, - goal=agent.goal, + system_prompt = reasoning_agent.i18n.retrieve("reasoning", plan_type).format( + role=reasoning_agent.role, + goal=reasoning_agent.goal, backstory=backstory, ) @@ -436,6 +431,6 @@ def _call_llm_with_reasoning_prompt( {"role": "user", "content": prompt}, ], from_task=task, - from_agent=agent, + from_agent=reasoning_agent, ) return str(response)