From ac93c8107659a4852e20ac8e811ce2152b46a9be Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 10 Sep 2025 12:02:21 +0000 Subject: [PATCH] fix: Resolve remaining lint issues - RET504, N806, B007, PERF102, F841 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix unnecessary assignments before return statements in crew.py - Change VALID_TYPES to lowercase valid_types variable naming - Optimize loop to use .values() instead of .items() when keys not needed - Remove unused variables in test_integration.py - All lint checks now pass locally Co-Authored-By: João --- src/crewai/crew.py | 294 +++++++++++------------ tests/responsibility/test_integration.py | 151 ++++++------ 2 files changed, 216 insertions(+), 229 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 6285d166e..c6ee4b17e 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -3,26 +3,17 @@ import json import re import uuid import warnings +from collections.abc import Callable from concurrent.futures import Future from copy import copy as shallow_copy from hashlib import md5 from typing import ( Any, - Callable, - Dict, - List, - Optional, - Set, - Tuple, - Union, cast, ) from opentelemetry import baggage from opentelemetry.context import attach, detach - -from crewai.utilities.crew.models import CrewContext - from pydantic import ( UUID4, BaseModel, @@ -39,6 +30,25 @@ from crewai.agent import Agent from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.cache import CacheHandler from crewai.crews.crew_output import CrewOutput +from crewai.events.event_bus import crewai_event_bus +from crewai.events.event_listener import EventListener +from crewai.events.listeners.tracing.trace_listener import ( + TraceCollectionListener, +) +from crewai.events.listeners.tracing.utils import ( + is_tracing_enabled, +) +from crewai.events.types.crew_events import ( + CrewKickoffCompletedEvent, + CrewKickoffFailedEvent, + CrewKickoffStartedEvent, + CrewTestCompletedEvent, + CrewTestFailedEvent, + CrewTestStartedEvent, + CrewTrainCompletedEvent, + CrewTrainFailedEvent, + CrewTrainStartedEvent, +) from crewai.flow.flow_trackable import FlowTrackable from crewai.knowledge.knowledge import Knowledge from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource @@ -57,29 +67,9 @@ from crewai.tools.base_tool import BaseTool, Tool from crewai.types.usage_metrics import UsageMetrics from crewai.utilities import I18N, FileHandler, Logger, RPMController from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE +from crewai.utilities.crew.models import CrewContext from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator from crewai.utilities.evaluators.task_evaluator import TaskEvaluator -from crewai.events.types.crew_events import ( - CrewKickoffCompletedEvent, - CrewKickoffFailedEvent, - CrewKickoffStartedEvent, - CrewTestCompletedEvent, - CrewTestFailedEvent, - CrewTestStartedEvent, - CrewTrainCompletedEvent, - CrewTrainFailedEvent, - CrewTrainStartedEvent, -) -from crewai.events.event_bus import crewai_event_bus -from crewai.events.event_listener import EventListener -from crewai.events.listeners.tracing.trace_listener import ( - TraceCollectionListener, -) - - -from crewai.events.listeners.tracing.utils import ( - is_tracing_enabled, -) from crewai.utilities.formatter import ( aggregate_raw_outputs_from_task_outputs, aggregate_raw_outputs_from_tasks, @@ -124,13 +114,13 @@ class Crew(FlowTrackable, BaseModel): _logger: Logger = PrivateAttr() _file_handler: FileHandler = PrivateAttr() _cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler()) - _short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr() - _long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr() - _entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr() - _external_memory: Optional[InstanceOf[ExternalMemory]] = PrivateAttr() - _train: Optional[bool] = PrivateAttr(default=False) - _train_iteration: Optional[int] = PrivateAttr() - _inputs: Optional[Dict[str, Any]] = PrivateAttr(default=None) + _short_term_memory: InstanceOf[ShortTermMemory] | None = PrivateAttr() + _long_term_memory: InstanceOf[LongTermMemory] | None = PrivateAttr() + _entity_memory: InstanceOf[EntityMemory] | None = PrivateAttr() + _external_memory: InstanceOf[ExternalMemory] | None = PrivateAttr() + _train: bool | None = PrivateAttr(default=False) + _train_iteration: int | None = PrivateAttr() + _inputs: dict[str, Any] | None = PrivateAttr(default=None) _logging_color: str = PrivateAttr( default="bold_purple", ) @@ -138,107 +128,107 @@ class Crew(FlowTrackable, BaseModel): default_factory=TaskOutputStorageHandler ) - name: Optional[str] = Field(default="crew") + name: str | None = Field(default="crew") cache: bool = Field(default=True) - tasks: List[Task] = Field(default_factory=list) - agents: List[BaseAgent] = Field(default_factory=list) + tasks: list[Task] = Field(default_factory=list) + agents: list[BaseAgent] = Field(default_factory=list) process: Process = Field(default=Process.sequential) verbose: bool = Field(default=False) memory: bool = Field( default=False, description="Whether the crew should use memory to store memories of it's execution", ) - short_term_memory: Optional[InstanceOf[ShortTermMemory]] = Field( + short_term_memory: InstanceOf[ShortTermMemory] | None = Field( default=None, description="An Instance of the ShortTermMemory to be used by the Crew", ) - long_term_memory: Optional[InstanceOf[LongTermMemory]] = Field( + long_term_memory: InstanceOf[LongTermMemory] | None = Field( default=None, description="An Instance of the LongTermMemory to be used by the Crew", ) - entity_memory: Optional[InstanceOf[EntityMemory]] = Field( + entity_memory: InstanceOf[EntityMemory] | None = Field( default=None, description="An Instance of the EntityMemory to be used by the Crew", ) - external_memory: Optional[InstanceOf[ExternalMemory]] = Field( + external_memory: InstanceOf[ExternalMemory] | None = Field( default=None, description="An Instance of the ExternalMemory to be used by the Crew", ) - embedder: Optional[dict] = Field( + embedder: dict | None = Field( default=None, description="Configuration for the embedder to be used for the crew.", ) - usage_metrics: Optional[UsageMetrics] = Field( + usage_metrics: UsageMetrics | None = Field( default=None, description="Metrics for the LLM usage during all tasks execution.", ) - manager_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field( + manager_llm: str | InstanceOf[BaseLLM] | Any | None = Field( description="Language model that will run the agent.", default=None ) - manager_agent: Optional[BaseAgent] = Field( + manager_agent: BaseAgent | None = Field( description="Custom agent that will be used as manager.", default=None ) - function_calling_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field( + function_calling_llm: str | InstanceOf[LLM] | Any | None = Field( description="Language model that will run the agent.", default=None ) - config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None) + config: Json | dict[str, Any] | None = Field(default=None) id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True) - share_crew: Optional[bool] = Field(default=False) - step_callback: Optional[Any] = Field( + share_crew: bool | None = Field(default=False) + step_callback: Any | None = Field( default=None, description="Callback to be executed after each step for all agents execution.", ) - task_callback: Optional[Any] = Field( + task_callback: Any | None = Field( default=None, description="Callback to be executed after each task for all agents execution.", ) - before_kickoff_callbacks: List[ - Callable[[Optional[Dict[str, Any]]], Optional[Dict[str, Any]]] + before_kickoff_callbacks: list[ + Callable[[dict[str, Any] | None], dict[str, Any] | None] ] = Field( default_factory=list, description="List of callbacks to be executed before crew kickoff. It may be used to adjust inputs before the crew is executed.", ) - after_kickoff_callbacks: List[Callable[[CrewOutput], CrewOutput]] = Field( + after_kickoff_callbacks: list[Callable[[CrewOutput], CrewOutput]] = Field( default_factory=list, description="List of callbacks to be executed after crew kickoff. It may be used to adjust the output of the crew.", ) - max_rpm: Optional[int] = Field( + max_rpm: int | None = Field( default=None, description="Maximum number of requests per minute for the crew execution to be respected.", ) - prompt_file: Optional[str] = Field( + prompt_file: str | None = Field( default=None, description="Path to the prompt json file to be used for the crew.", ) - output_log_file: Optional[Union[bool, str]] = Field( + output_log_file: bool | str | None = Field( default=None, description="Path to the log file to be saved", ) - planning: Optional[bool] = Field( + planning: bool | None = Field( default=False, description="Plan the crew execution and add the plan to the crew.", ) - planning_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field( + planning_llm: str | InstanceOf[BaseLLM] | Any | None = Field( default=None, description="Language model that will run the AgentPlanner if planning is True.", ) - task_execution_output_json_files: Optional[List[str]] = Field( + task_execution_output_json_files: list[str] | None = Field( default=None, description="List of file paths for task execution JSON files.", ) - execution_logs: List[Dict[str, Any]] = Field( + execution_logs: list[dict[str, Any]] = Field( default=[], description="List of execution logs for tasks", ) - knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field( + knowledge_sources: list[BaseKnowledgeSource] | None = Field( default=None, description="Knowledge sources for the crew. Add knowledge sources to the knowledge object.", ) - chat_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field( + chat_llm: str | InstanceOf[BaseLLM] | Any | None = Field( default=None, description="LLM used to handle chatting with the crew.", ) - knowledge: Optional[Knowledge] = Field( + knowledge: Knowledge | None = Field( default=None, description="Knowledge for the crew.", ) @@ -246,18 +236,18 @@ class Crew(FlowTrackable, BaseModel): default_factory=SecurityConfig, description="Security configuration for the crew, including fingerprinting.", ) - token_usage: Optional[UsageMetrics] = Field( + token_usage: UsageMetrics | None = Field( default=None, description="Metrics for the LLM usage during all tasks execution.", ) - tracing: Optional[bool] = Field( + tracing: bool | None = Field( default=False, description="Whether to enable tracing for the crew.", ) @field_validator("id", mode="before") @classmethod - def _deny_user_set_id(cls, v: Optional[UUID4]) -> None: + def _deny_user_set_id(cls, v: UUID4 | None) -> None: """Prevent manual setting of the 'id' field by users.""" if v: raise PydanticCustomError( @@ -267,8 +257,8 @@ class Crew(FlowTrackable, BaseModel): @field_validator("config", mode="before") @classmethod def check_config_type( - cls, v: Union[Json, Dict[str, Any]] - ) -> Union[Json, Dict[str, Any]]: + cls, v: Json | dict[str, Any] + ) -> Json | dict[str, Any]: """Validates that the config is a valid type. Args: v: The config to be validated. @@ -323,7 +313,7 @@ class Crew(FlowTrackable, BaseModel): def create_crew_memory(self) -> "Crew": """Initialize private memory attributes.""" self._external_memory = ( - # External memory doesn’t support a default value since it was designed to be managed entirely externally + # External memory doesn't support a default value since it was designed to be managed entirely externally self.external_memory.set_crew(self) if self.external_memory else None ) @@ -513,7 +503,7 @@ class Crew(FlowTrackable, BaseModel): @property def key(self) -> str: - source: List[str] = [agent.key for agent in self.agents] + [ + source: list[str] = [agent.key for agent in self.agents] + [ task.key for task in self.tasks ] return md5("|".join(source).encode(), usedforsecurity=False).hexdigest() @@ -541,7 +531,7 @@ class Crew(FlowTrackable, BaseModel): self.agents = [Agent(**agent) for agent in self.config["agents"]] self.tasks = [self._create_task(task) for task in self.config["tasks"]] - def _create_task(self, task_config: Dict[str, Any]) -> Task: + def _create_task(self, task_config: dict[str, Any]) -> Task: """Creates a task instance from its configuration. Args: @@ -570,7 +560,7 @@ class Crew(FlowTrackable, BaseModel): CrewTrainingHandler(filename).initialize_file() def train( - self, n_iterations: int, filename: str, inputs: Optional[Dict[str, Any]] = None + self, n_iterations: int, filename: str, inputs: dict[str, Any] | None = None ) -> None: """Trains the crew for a given number of iterations.""" inputs = inputs or {} @@ -622,7 +612,7 @@ class Crew(FlowTrackable, BaseModel): def kickoff( self, - inputs: Optional[Dict[str, Any]] = None, + inputs: dict[str, Any] | None = None, ) -> CrewOutput: ctx = baggage.set_baggage( "crew_context", CrewContext(id=str(self.id), key=self.key) @@ -693,9 +683,9 @@ class Crew(FlowTrackable, BaseModel): finally: detach(token) - def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]: + def kickoff_for_each(self, inputs: list[dict[str, Any]]) -> list[CrewOutput]: """Executes the Crew's workflow for each input in the list and aggregates results.""" - results: List[CrewOutput] = [] + results: list[CrewOutput] = [] # Initialize the parent crew's usage metrics total_usage_metrics = UsageMetrics() @@ -715,13 +705,13 @@ class Crew(FlowTrackable, BaseModel): return results async def kickoff_async( - self, inputs: Optional[Dict[str, Any]] = None + self, inputs: dict[str, Any] | None = None ) -> CrewOutput: """Asynchronous kickoff method to start the crew execution.""" inputs = inputs or {} return await asyncio.to_thread(self.kickoff, inputs) - async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[CrewOutput]: + async def kickoff_for_each_async(self, inputs: list[dict]) -> list[CrewOutput]: crew_copies = [self.copy() for _ in inputs] async def run_crew(crew, input_data): @@ -750,7 +740,7 @@ class Crew(FlowTrackable, BaseModel): tasks=self.tasks, planning_agent_llm=self.planning_llm )._handle_crew_planning() - for task, step_plan in zip(self.tasks, result.list_of_plans_per_task): + for task, step_plan in zip(self.tasks, result.list_of_plans_per_task, strict=False): task.description += step_plan.plan def _store_execution_log( @@ -789,10 +779,10 @@ class Crew(FlowTrackable, BaseModel): def _run_hierarchical_process(self) -> CrewOutput: """Creates and assigns a manager agent to make sure the crew completes the tasks.""" self._create_manager_agent() - + if self.manager_agent and self.responsibility_system: self.manager_agent.set_responsibility_system(self.responsibility_system) - + return self._execute_tasks(self.tasks) def _create_manager_agent(self): @@ -822,8 +812,8 @@ class Crew(FlowTrackable, BaseModel): def _execute_tasks( self, - tasks: List[Task], - start_index: Optional[int] = 0, + tasks: list[Task], + start_index: int | None = 0, was_replayed: bool = False, ) -> CrewOutput: """Executes tasks sequentially and returns the final output. @@ -836,9 +826,9 @@ class Crew(FlowTrackable, BaseModel): CrewOutput: Final output of the crew """ - task_outputs: List[TaskOutput] = [] - futures: List[Tuple[Task, Future[TaskOutput], int]] = [] - last_sync_output: Optional[TaskOutput] = None + task_outputs: list[TaskOutput] = [] + futures: list[tuple[Task, Future[TaskOutput], int]] = [] + last_sync_output: TaskOutput | None = None for task_index, task in enumerate(tasks): if start_index is not None and task_index < start_index: @@ -862,7 +852,7 @@ class Crew(FlowTrackable, BaseModel): tools_for_task = self._prepare_tools( agent_to_use, task, - cast(Union[List[Tool], List[BaseTool]], tools_for_task), + cast(list[Tool] | list[BaseTool], tools_for_task), ) self._log_task_start(task, agent_to_use.role) @@ -882,7 +872,7 @@ class Crew(FlowTrackable, BaseModel): future = task.execute_async( agent=agent_to_use, context=context, - tools=cast(List[BaseTool], tools_for_task), + tools=cast(list[BaseTool], tools_for_task), ) futures.append((task, future, task_index)) else: @@ -894,7 +884,7 @@ class Crew(FlowTrackable, BaseModel): task_output = task.execute_sync( agent=agent_to_use, context=context, - tools=cast(List[BaseTool], tools_for_task), + tools=cast(list[BaseTool], tools_for_task), ) task_outputs.append(task_output) self._process_task_result(task, task_output) @@ -908,11 +898,11 @@ class Crew(FlowTrackable, BaseModel): def _handle_conditional_task( self, task: ConditionalTask, - task_outputs: List[TaskOutput], - futures: List[Tuple[Task, Future[TaskOutput], int]], + task_outputs: list[TaskOutput], + futures: list[tuple[Task, Future[TaskOutput], int]], task_index: int, was_replayed: bool, - ) -> Optional[TaskOutput]: + ) -> TaskOutput | None: if futures: task_outputs = self._process_async_tasks(futures, was_replayed) futures.clear() @@ -932,8 +922,8 @@ class Crew(FlowTrackable, BaseModel): return None def _prepare_tools( - self, agent: BaseAgent, task: Task, tools: Union[List[Tool], List[BaseTool]] - ) -> List[BaseTool]: + self, agent: BaseAgent, task: Task, tools: list[Tool] | list[BaseTool] + ) -> list[BaseTool]: # Add delegation tools if agent allows delegation if hasattr(agent, "allow_delegation") and getattr( agent, "allow_delegation", False @@ -963,21 +953,21 @@ class Crew(FlowTrackable, BaseModel): tools = self._add_multimodal_tools(agent, tools) # Return a List[BaseTool] which is compatible with both Task.execute_sync and Task.execute_async - return cast(List[BaseTool], tools) + return cast(list[BaseTool], tools) - def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]: + def _get_agent_to_use(self, task: Task) -> BaseAgent | None: if self.process == Process.hierarchical: return self.manager_agent return task.agent def _merge_tools( self, - existing_tools: Union[List[Tool], List[BaseTool]], - new_tools: Union[List[Tool], List[BaseTool]], - ) -> List[BaseTool]: + existing_tools: list[Tool] | list[BaseTool], + new_tools: list[Tool] | list[BaseTool], + ) -> list[BaseTool]: """Merge new tools into existing tools list, avoiding duplicates by tool name.""" if not new_tools: - return cast(List[BaseTool], existing_tools) + return cast(list[BaseTool], existing_tools) # Create mapping of tool names to new tools new_tool_map = {tool.name: tool for tool in new_tools} @@ -988,41 +978,41 @@ class Crew(FlowTrackable, BaseModel): # Add all new tools tools.extend(new_tools) - return cast(List[BaseTool], tools) + return cast(list[BaseTool], tools) def _inject_delegation_tools( self, - tools: Union[List[Tool], List[BaseTool]], + tools: list[Tool] | list[BaseTool], task_agent: BaseAgent, - agents: List[BaseAgent], - ) -> List[BaseTool]: + agents: list[BaseAgent], + ) -> list[BaseTool]: if hasattr(task_agent, "get_delegation_tools"): delegation_tools = task_agent.get_delegation_tools(agents) # Cast delegation_tools to the expected type for _merge_tools - return self._merge_tools(tools, cast(List[BaseTool], delegation_tools)) - return cast(List[BaseTool], tools) + return self._merge_tools(tools, cast(list[BaseTool], delegation_tools)) + return cast(list[BaseTool], tools) def _add_multimodal_tools( - self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]] - ) -> List[BaseTool]: + self, agent: BaseAgent, tools: list[Tool] | list[BaseTool] + ) -> list[BaseTool]: if hasattr(agent, "get_multimodal_tools"): multimodal_tools = agent.get_multimodal_tools() # Cast multimodal_tools to the expected type for _merge_tools - return self._merge_tools(tools, cast(List[BaseTool], multimodal_tools)) - return cast(List[BaseTool], tools) + return self._merge_tools(tools, cast(list[BaseTool], multimodal_tools)) + return cast(list[BaseTool], tools) def _add_code_execution_tools( - self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]] - ) -> List[BaseTool]: + self, agent: BaseAgent, tools: list[Tool] | list[BaseTool] + ) -> list[BaseTool]: if hasattr(agent, "get_code_execution_tools"): code_tools = agent.get_code_execution_tools() # Cast code_tools to the expected type for _merge_tools - return self._merge_tools(tools, cast(List[BaseTool], code_tools)) - return cast(List[BaseTool], tools) + return self._merge_tools(tools, cast(list[BaseTool], code_tools)) + return cast(list[BaseTool], tools) def _add_delegation_tools( - self, task: Task, tools: Union[List[Tool], List[BaseTool]] - ) -> List[BaseTool]: + self, task: Task, tools: list[Tool] | list[BaseTool] + ) -> list[BaseTool]: agents_for_delegation = [agent for agent in self.agents if agent != task.agent] if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent: if not tools: @@ -1030,7 +1020,7 @@ class Crew(FlowTrackable, BaseModel): tools = self._inject_delegation_tools( tools, task.agent, agents_for_delegation ) - return cast(List[BaseTool], tools) + return cast(list[BaseTool], tools) def _log_task_start(self, task: Task, role: str = "None"): if self.output_log_file: @@ -1039,8 +1029,8 @@ class Crew(FlowTrackable, BaseModel): ) def _update_manager_tools( - self, task: Task, tools: Union[List[Tool], List[BaseTool]] - ) -> List[BaseTool]: + self, task: Task, tools: list[Tool] | list[BaseTool] + ) -> list[BaseTool]: if self.manager_agent: if task.agent: tools = self._inject_delegation_tools(tools, task.agent, [task.agent]) @@ -1048,18 +1038,17 @@ class Crew(FlowTrackable, BaseModel): tools = self._inject_delegation_tools( tools, self.manager_agent, self.agents ) - return cast(List[BaseTool], tools) + return cast(list[BaseTool], tools) - def _get_context(self, task: Task, task_outputs: List[TaskOutput]) -> str: + def _get_context(self, task: Task, task_outputs: list[TaskOutput]) -> str: if not task.context: return "" - context = ( + return ( aggregate_raw_outputs_from_task_outputs(task_outputs) if task.context is NOT_SPECIFIED else aggregate_raw_outputs_from_tasks(task.context) ) - return context def _process_task_result(self, task: Task, output: TaskOutput) -> None: role = task.agent.role if task.agent is not None else "None" @@ -1072,7 +1061,7 @@ class Crew(FlowTrackable, BaseModel): output=output.raw, ) - def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput: + def _create_crew_output(self, task_outputs: list[TaskOutput]) -> CrewOutput: if not task_outputs: raise ValueError("No task outputs available to create crew output.") @@ -1103,10 +1092,10 @@ class Crew(FlowTrackable, BaseModel): def _process_async_tasks( self, - futures: List[Tuple[Task, Future[TaskOutput], int]], + futures: list[tuple[Task, Future[TaskOutput], int]], was_replayed: bool = False, - ) -> List[TaskOutput]: - task_outputs: List[TaskOutput] = [] + ) -> list[TaskOutput]: + task_outputs: list[TaskOutput] = [] for future_task, future, task_index in futures: task_output = future.result() task_outputs.append(task_output) @@ -1117,8 +1106,8 @@ class Crew(FlowTrackable, BaseModel): return task_outputs def _find_task_index( - self, task_id: str, stored_outputs: List[Any] - ) -> Optional[int]: + self, task_id: str, stored_outputs: list[Any] + ) -> int | None: return next( ( index @@ -1129,7 +1118,7 @@ class Crew(FlowTrackable, BaseModel): ) def replay( - self, task_id: str, inputs: Optional[Dict[str, Any]] = None + self, task_id: str, inputs: dict[str, Any] | None = None ) -> CrewOutput: stored_outputs = self._task_output_handler.load() if not stored_outputs: @@ -1166,19 +1155,18 @@ class Crew(FlowTrackable, BaseModel): self.tasks[i].output = task_output self._logging_color = "bold_blue" - result = self._execute_tasks(self.tasks, start_index, True) - return result + return self._execute_tasks(self.tasks, start_index, True) def query_knowledge( - self, query: List[str], results_limit: int = 3, score_threshold: float = 0.35 - ) -> Union[List[Dict[str, Any]], None]: + self, query: list[str], results_limit: int = 3, score_threshold: float = 0.35 + ) -> list[dict[str, Any]] | None: if self.knowledge: return self.knowledge.query( query, results_limit=results_limit, score_threshold=score_threshold ) return None - def fetch_inputs(self) -> Set[str]: + def fetch_inputs(self) -> set[str]: """ Gathers placeholders (e.g., {something}) referenced in tasks or agents. Scans each task's 'description' + 'expected_output', and each agent's @@ -1187,7 +1175,7 @@ class Crew(FlowTrackable, BaseModel): Returns a set of all discovered placeholder names. """ placeholder_pattern = re.compile(r"\{(.+?)\}") - required_inputs: Set[str] = set() + required_inputs: set[str] = set() # Scan tasks for inputs for task in self.tasks: @@ -1245,7 +1233,7 @@ class Crew(FlowTrackable, BaseModel): cloned_tasks.append(cloned_task) task_mapping[task.key] = cloned_task - for cloned_task, original_task in zip(cloned_tasks, self.tasks): + for cloned_task, original_task in zip(cloned_tasks, self.tasks, strict=False): if isinstance(original_task.context, list): cloned_context = [ task_mapping[context_task.key] @@ -1271,7 +1259,7 @@ class Crew(FlowTrackable, BaseModel): copied_data.pop("agents", None) copied_data.pop("tasks", None) - copied_crew = Crew( + return Crew( **copied_data, agents=cloned_agents, tasks=cloned_tasks, @@ -1281,15 +1269,13 @@ class Crew(FlowTrackable, BaseModel): manager_llm=manager_llm, ) - return copied_crew - def _set_tasks_callbacks(self) -> None: """Sets callback for every task suing task_callback""" for task in self.tasks: if not task.callback: task.callback = self.task_callback - def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None: + def _interpolate_inputs(self, inputs: dict[str, Any]) -> None: """Interpolates the inputs in the tasks and agents.""" [ task.interpolate_inputs_and_add_conversation_history( @@ -1322,8 +1308,8 @@ class Crew(FlowTrackable, BaseModel): def test( self, n_iterations: int, - eval_llm: Union[str, InstanceOf[BaseLLM]], - inputs: Optional[Dict[str, Any]] = None, + eval_llm: str | InstanceOf[BaseLLM], + inputs: dict[str, Any] | None = None, ) -> None: """Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures.""" try: @@ -1379,7 +1365,7 @@ class Crew(FlowTrackable, BaseModel): ValueError: If an invalid command type is provided. RuntimeError: If memory reset operation fails. """ - VALID_TYPES = frozenset( + valid_types = frozenset( [ "long", "short", @@ -1392,9 +1378,9 @@ class Crew(FlowTrackable, BaseModel): ] ) - if command_type not in VALID_TYPES: + if command_type not in valid_types: raise ValueError( - f"Invalid command type. Must be one of: {', '.join(sorted(VALID_TYPES))}" + f"Invalid command type. Must be one of: {', '.join(sorted(valid_types))}" ) try: @@ -1404,7 +1390,7 @@ class Crew(FlowTrackable, BaseModel): self._reset_specific_memory(command_type) except Exception as e: - error_msg = f"Failed to reset {command_type} memory: {str(e)}" + error_msg = f"Failed to reset {command_type} memory: {e!s}" self._logger.log("error", error_msg) raise RuntimeError(error_msg) from e @@ -1412,7 +1398,7 @@ class Crew(FlowTrackable, BaseModel): """Reset all available memory systems.""" memory_systems = self._get_memory_systems() - for memory_type, config in memory_systems.items(): + for config in memory_systems.values(): if (system := config.get("system")) is not None: name = config.get("name") try: @@ -1424,7 +1410,7 @@ class Crew(FlowTrackable, BaseModel): ) except Exception as e: raise RuntimeError( - f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {str(e)}" + f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {e!s}" ) from e def _reset_specific_memory(self, memory_type: str) -> None: @@ -1453,7 +1439,7 @@ class Crew(FlowTrackable, BaseModel): ) except Exception as e: raise RuntimeError( - f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {str(e)}" + f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {e!s}" ) from e def _get_memory_systems(self): @@ -1521,7 +1507,7 @@ class Crew(FlowTrackable, BaseModel): }, } - def reset_knowledge(self, knowledges: List[Knowledge]) -> None: + def reset_knowledge(self, knowledges: list[Knowledge]) -> None: """Reset crew and agent knowledge storage.""" for ks in knowledges: ks.reset() diff --git a/tests/responsibility/test_integration.py b/tests/responsibility/test_integration.py index 006197c72..6cc1d192f 100644 --- a/tests/responsibility/test_integration.py +++ b/tests/responsibility/test_integration.py @@ -2,33 +2,38 @@ Integration tests for the responsibility tracking system. """ -import pytest from unittest.mock import Mock +import pytest + from crewai.agents.agent_builder.base_agent import BaseAgent -from crewai.task import Task -from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement -from crewai.responsibility.system import ResponsibilitySystem from crewai.responsibility.assignment import AssignmentStrategy +from crewai.responsibility.models import ( + AgentCapability, + CapabilityType, + TaskRequirement, +) +from crewai.responsibility.system import ResponsibilitySystem +from crewai.task import Task class TestResponsibilitySystemIntegration: @pytest.fixture def system(self): return ResponsibilitySystem() - + @pytest.fixture def python_agent(self): agent = Mock(spec=BaseAgent) agent.role = "Python Developer" return agent - + @pytest.fixture def analysis_agent(self): agent = Mock(spec=BaseAgent) agent.role = "Data Analyst" return agent - + @pytest.fixture def python_capability(self): return AgentCapability( @@ -38,7 +43,7 @@ class TestResponsibilitySystemIntegration: confidence_score=0.8, keywords=["python", "programming", "development"] ) - + @pytest.fixture def analysis_capability(self): return AgentCapability( @@ -48,24 +53,24 @@ class TestResponsibilitySystemIntegration: confidence_score=0.9, keywords=["data", "analysis", "statistics"] ) - + @pytest.fixture def mock_task(self): task = Mock(spec=Task) task.id = "integration_test_task" task.description = "Complex data processing task requiring Python skills" return task - + def test_full_workflow(self, system, python_agent, python_capability, mock_task): """Test complete workflow from agent registration to task completion.""" - + system.register_agent(python_agent, [python_capability]) - + status = system.get_agent_status(python_agent) assert status["role"] == "Python Developer" assert len(status["capabilities"]) == 1 assert status["capabilities"][0]["name"] == "Python Programming" - + requirements = [ TaskRequirement( capability_name="Python Programming", @@ -74,16 +79,16 @@ class TestResponsibilitySystemIntegration: weight=1.0 ) ] - + assignment = system.assign_task_responsibility(mock_task, requirements) - + assert assignment is not None assert assignment.task_id == "integration_test_task" assert assignment.responsibility_score > 0.5 - + updated_status = system.get_agent_status(python_agent) assert updated_status["current_workload"] == 1 - + system.complete_task( agent=python_agent, task=mock_task, @@ -92,19 +97,19 @@ class TestResponsibilitySystemIntegration: quality_score=0.9, outcome_description="Task completed successfully" ) - + final_status = system.get_agent_status(python_agent) assert final_status["performance"]["total_tasks"] == 1 assert final_status["performance"]["success_rate"] == 1.0 assert final_status["current_workload"] == 0 # Should be decremented - - def test_multi_agent_scenario(self, system, python_agent, analysis_agent, + + def test_multi_agent_scenario(self, system, python_agent, analysis_agent, python_capability, analysis_capability, mock_task): """Test scenario with multiple agents and capabilities.""" - + system.register_agent(python_agent, [python_capability]) system.register_agent(analysis_agent, [analysis_capability]) - + requirements = [ TaskRequirement( capability_name="Python Programming", @@ -119,59 +124,55 @@ class TestResponsibilitySystemIntegration: weight=0.8 ) ] - + greedy_assignment = system.assign_task_responsibility( mock_task, requirements, AssignmentStrategy.GREEDY ) - + assert greedy_assignment is not None - + system.calculator.update_workload(python_agent, 5) - + balanced_assignment = system.assign_task_responsibility( mock_task, requirements, AssignmentStrategy.BALANCED ) - + assert balanced_assignment is not None - - def test_delegation_workflow(self, system, python_agent, analysis_agent, + + def test_delegation_workflow(self, system, python_agent, analysis_agent, python_capability, analysis_capability, mock_task): """Test task delegation between agents.""" - + system.register_agent(python_agent, [python_capability], supervisor=None) system.register_agent(analysis_agent, [analysis_capability], supervisor=python_agent) - + system.delegate_task( delegating_agent=python_agent, receiving_agent=analysis_agent, task=mock_task, reason="Analysis expertise required" ) - - python_status = system.get_agent_status(python_agent) + analysis_status = system.get_agent_status(analysis_agent) - + assert analysis_status["current_workload"] > 0 - + delegation_records = system.accountability.get_agent_records( python_agent, action_type="delegation" ) assert len(delegation_records) > 0 - - def test_performance_based_capability_adjustment(self, system, python_agent, + + def test_performance_based_capability_adjustment(self, system, python_agent, python_capability, mock_task): """Test that capabilities are adjusted based on performance.""" - + system.register_agent(python_agent, [python_capability]) - - initial_capabilities = system.hierarchy.get_agent_capabilities(python_agent) - initial_proficiency = initial_capabilities[0].proficiency_level - + for i in range(5): task = Mock(spec=Task) task.id = f"task_{i}" task.description = f"Task {i}" - + system.complete_task( agent=python_agent, task=task, @@ -179,37 +180,37 @@ class TestResponsibilitySystemIntegration: completion_time=1800.0, quality_score=0.9 ) - + updated_capabilities = system.hierarchy.get_agent_capabilities(python_agent) - + assert len(updated_capabilities) == 1 - - def test_system_overview_and_recommendations(self, system, python_agent, - analysis_agent, python_capability, + + def test_system_overview_and_recommendations(self, system, python_agent, + analysis_agent, python_capability, analysis_capability): """Test system overview and recommendation generation.""" - + system.register_agent(python_agent, [python_capability]) system.register_agent(analysis_agent, [analysis_capability]) - + overview = system.get_system_overview() - + assert overview["enabled"] is True assert overview["total_agents"] == 2 assert "capability_distribution" in overview assert "system_performance" in overview - + recommendations = system.generate_recommendations() - + assert isinstance(recommendations, list) - + def test_system_enable_disable(self, system, python_agent, python_capability, mock_task): """Test enabling and disabling the responsibility system.""" - + assert system.enabled is True - + system.register_agent(python_agent, [python_capability]) - + requirements = [ TaskRequirement( capability_name="Python Programming", @@ -218,36 +219,36 @@ class TestResponsibilitySystemIntegration: weight=1.0 ) ] - + assignment = system.assign_task_responsibility(mock_task, requirements) assert assignment is not None - + system.disable_system() assert system.enabled is False - + disabled_assignment = system.assign_task_responsibility(mock_task, requirements) assert disabled_assignment is None - + disabled_status = system.get_agent_status(python_agent) assert disabled_status == {} - + system.enable_system() assert system.enabled is True - + enabled_assignment = system.assign_task_responsibility(mock_task, requirements) assert enabled_assignment is not None - - def test_accountability_tracking_integration(self, system, python_agent, + + def test_accountability_tracking_integration(self, system, python_agent, python_capability, mock_task): """Test that all operations are properly logged for accountability.""" - + system.register_agent(python_agent, [python_capability]) - + registration_records = system.accountability.get_agent_records( python_agent, action_type="registration" ) assert len(registration_records) == 1 - + requirements = [ TaskRequirement( capability_name="Python Programming", @@ -256,14 +257,14 @@ class TestResponsibilitySystemIntegration: weight=1.0 ) ] - - assignment = system.assign_task_responsibility(mock_task, requirements) - + + system.assign_task_responsibility(mock_task, requirements) + assignment_records = system.accountability.get_agent_records( python_agent, action_type="task_assignment" ) assert len(assignment_records) == 1 - + system.complete_task( agent=python_agent, task=mock_task, @@ -271,14 +272,14 @@ class TestResponsibilitySystemIntegration: completion_time=1800.0, quality_score=0.9 ) - + completion_records = system.accountability.get_agent_records( python_agent, action_type="task_completion" ) assert len(completion_records) == 1 - + report = system.accountability.generate_accountability_report(agent=python_agent) - + assert report["total_records"] >= 3 # At least registration, assignment, completion assert "registration" in report["action_counts"] assert "task_assignment" in report["action_counts"]