fix: Resolve remaining lint issues - RET504, N806, B007, PERF102, F841

- Fix unnecessary assignments before return statements in crew.py
- Change VALID_TYPES to lowercase valid_types variable naming
- Optimize loop to use .values() instead of .items() when keys not needed
- Remove unused variables in test_integration.py
- All lint checks now pass locally

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2025-09-10 12:02:21 +00:00
parent 2c59748437
commit ac93c81076
2 changed files with 216 additions and 229 deletions

View File

@@ -3,26 +3,17 @@ import json
import re import re
import uuid import uuid
import warnings import warnings
from collections.abc import Callable
from concurrent.futures import Future from concurrent.futures import Future
from copy import copy as shallow_copy from copy import copy as shallow_copy
from hashlib import md5 from hashlib import md5
from typing import ( from typing import (
Any, Any,
Callable,
Dict,
List,
Optional,
Set,
Tuple,
Union,
cast, cast,
) )
from opentelemetry import baggage from opentelemetry import baggage
from opentelemetry.context import attach, detach from opentelemetry.context import attach, detach
from crewai.utilities.crew.models import CrewContext
from pydantic import ( from pydantic import (
UUID4, UUID4,
BaseModel, BaseModel,
@@ -39,6 +30,25 @@ from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache import CacheHandler from crewai.agents.cache import CacheHandler
from crewai.crews.crew_output import CrewOutput from crewai.crews.crew_output import CrewOutput
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
CrewTestStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from crewai.flow.flow_trackable import FlowTrackable from crewai.flow.flow_trackable import FlowTrackable
from crewai.knowledge.knowledge import Knowledge from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
@@ -57,29 +67,9 @@ from crewai.tools.base_tool import BaseTool, Tool
from crewai.types.usage_metrics import UsageMetrics from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE
from crewai.utilities.crew.models import CrewContext
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
CrewTestStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled,
)
from crewai.utilities.formatter import ( from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs, aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks, aggregate_raw_outputs_from_tasks,
@@ -124,13 +114,13 @@ class Crew(FlowTrackable, BaseModel):
_logger: Logger = PrivateAttr() _logger: Logger = PrivateAttr()
_file_handler: FileHandler = PrivateAttr() _file_handler: FileHandler = PrivateAttr()
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler()) _cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr() _short_term_memory: InstanceOf[ShortTermMemory] | None = PrivateAttr()
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr() _long_term_memory: InstanceOf[LongTermMemory] | None = PrivateAttr()
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr() _entity_memory: InstanceOf[EntityMemory] | None = PrivateAttr()
_external_memory: Optional[InstanceOf[ExternalMemory]] = PrivateAttr() _external_memory: InstanceOf[ExternalMemory] | None = PrivateAttr()
_train: Optional[bool] = PrivateAttr(default=False) _train: bool | None = PrivateAttr(default=False)
_train_iteration: Optional[int] = PrivateAttr() _train_iteration: int | None = PrivateAttr()
_inputs: Optional[Dict[str, Any]] = PrivateAttr(default=None) _inputs: dict[str, Any] | None = PrivateAttr(default=None)
_logging_color: str = PrivateAttr( _logging_color: str = PrivateAttr(
default="bold_purple", default="bold_purple",
) )
@@ -138,107 +128,107 @@ class Crew(FlowTrackable, BaseModel):
default_factory=TaskOutputStorageHandler default_factory=TaskOutputStorageHandler
) )
name: Optional[str] = Field(default="crew") name: str | None = Field(default="crew")
cache: bool = Field(default=True) cache: bool = Field(default=True)
tasks: List[Task] = Field(default_factory=list) tasks: list[Task] = Field(default_factory=list)
agents: List[BaseAgent] = Field(default_factory=list) agents: list[BaseAgent] = Field(default_factory=list)
process: Process = Field(default=Process.sequential) process: Process = Field(default=Process.sequential)
verbose: bool = Field(default=False) verbose: bool = Field(default=False)
memory: bool = Field( memory: bool = Field(
default=False, default=False,
description="Whether the crew should use memory to store memories of it's execution", description="Whether the crew should use memory to store memories of it's execution",
) )
short_term_memory: Optional[InstanceOf[ShortTermMemory]] = Field( short_term_memory: InstanceOf[ShortTermMemory] | None = Field(
default=None, default=None,
description="An Instance of the ShortTermMemory to be used by the Crew", description="An Instance of the ShortTermMemory to be used by the Crew",
) )
long_term_memory: Optional[InstanceOf[LongTermMemory]] = Field( long_term_memory: InstanceOf[LongTermMemory] | None = Field(
default=None, default=None,
description="An Instance of the LongTermMemory to be used by the Crew", description="An Instance of the LongTermMemory to be used by the Crew",
) )
entity_memory: Optional[InstanceOf[EntityMemory]] = Field( entity_memory: InstanceOf[EntityMemory] | None = Field(
default=None, default=None,
description="An Instance of the EntityMemory to be used by the Crew", description="An Instance of the EntityMemory to be used by the Crew",
) )
external_memory: Optional[InstanceOf[ExternalMemory]] = Field( external_memory: InstanceOf[ExternalMemory] | None = Field(
default=None, default=None,
description="An Instance of the ExternalMemory to be used by the Crew", description="An Instance of the ExternalMemory to be used by the Crew",
) )
embedder: Optional[dict] = Field( embedder: dict | None = Field(
default=None, default=None,
description="Configuration for the embedder to be used for the crew.", description="Configuration for the embedder to be used for the crew.",
) )
usage_metrics: Optional[UsageMetrics] = Field( usage_metrics: UsageMetrics | None = Field(
default=None, default=None,
description="Metrics for the LLM usage during all tasks execution.", description="Metrics for the LLM usage during all tasks execution.",
) )
manager_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field( manager_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
description="Language model that will run the agent.", default=None description="Language model that will run the agent.", default=None
) )
manager_agent: Optional[BaseAgent] = Field( manager_agent: BaseAgent | None = Field(
description="Custom agent that will be used as manager.", default=None description="Custom agent that will be used as manager.", default=None
) )
function_calling_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field( function_calling_llm: str | InstanceOf[LLM] | Any | None = Field(
description="Language model that will run the agent.", default=None description="Language model that will run the agent.", default=None
) )
config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None) config: Json | dict[str, Any] | None = Field(default=None)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True) id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
share_crew: Optional[bool] = Field(default=False) share_crew: bool | None = Field(default=False)
step_callback: Optional[Any] = Field( step_callback: Any | None = Field(
default=None, default=None,
description="Callback to be executed after each step for all agents execution.", description="Callback to be executed after each step for all agents execution.",
) )
task_callback: Optional[Any] = Field( task_callback: Any | None = Field(
default=None, default=None,
description="Callback to be executed after each task for all agents execution.", description="Callback to be executed after each task for all agents execution.",
) )
before_kickoff_callbacks: List[ before_kickoff_callbacks: list[
Callable[[Optional[Dict[str, Any]]], Optional[Dict[str, Any]]] Callable[[dict[str, Any] | None], dict[str, Any] | None]
] = Field( ] = Field(
default_factory=list, default_factory=list,
description="List of callbacks to be executed before crew kickoff. It may be used to adjust inputs before the crew is executed.", description="List of callbacks to be executed before crew kickoff. It may be used to adjust inputs before the crew is executed.",
) )
after_kickoff_callbacks: List[Callable[[CrewOutput], CrewOutput]] = Field( after_kickoff_callbacks: list[Callable[[CrewOutput], CrewOutput]] = Field(
default_factory=list, default_factory=list,
description="List of callbacks to be executed after crew kickoff. It may be used to adjust the output of the crew.", description="List of callbacks to be executed after crew kickoff. It may be used to adjust the output of the crew.",
) )
max_rpm: Optional[int] = Field( max_rpm: int | None = Field(
default=None, default=None,
description="Maximum number of requests per minute for the crew execution to be respected.", description="Maximum number of requests per minute for the crew execution to be respected.",
) )
prompt_file: Optional[str] = Field( prompt_file: str | None = Field(
default=None, default=None,
description="Path to the prompt json file to be used for the crew.", description="Path to the prompt json file to be used for the crew.",
) )
output_log_file: Optional[Union[bool, str]] = Field( output_log_file: bool | str | None = Field(
default=None, default=None,
description="Path to the log file to be saved", description="Path to the log file to be saved",
) )
planning: Optional[bool] = Field( planning: bool | None = Field(
default=False, default=False,
description="Plan the crew execution and add the plan to the crew.", description="Plan the crew execution and add the plan to the crew.",
) )
planning_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field( planning_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
default=None, default=None,
description="Language model that will run the AgentPlanner if planning is True.", description="Language model that will run the AgentPlanner if planning is True.",
) )
task_execution_output_json_files: Optional[List[str]] = Field( task_execution_output_json_files: list[str] | None = Field(
default=None, default=None,
description="List of file paths for task execution JSON files.", description="List of file paths for task execution JSON files.",
) )
execution_logs: List[Dict[str, Any]] = Field( execution_logs: list[dict[str, Any]] = Field(
default=[], default=[],
description="List of execution logs for tasks", description="List of execution logs for tasks",
) )
knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field( knowledge_sources: list[BaseKnowledgeSource] | None = Field(
default=None, default=None,
description="Knowledge sources for the crew. Add knowledge sources to the knowledge object.", description="Knowledge sources for the crew. Add knowledge sources to the knowledge object.",
) )
chat_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field( chat_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
default=None, default=None,
description="LLM used to handle chatting with the crew.", description="LLM used to handle chatting with the crew.",
) )
knowledge: Optional[Knowledge] = Field( knowledge: Knowledge | None = Field(
default=None, default=None,
description="Knowledge for the crew.", description="Knowledge for the crew.",
) )
@@ -246,18 +236,18 @@ class Crew(FlowTrackable, BaseModel):
default_factory=SecurityConfig, default_factory=SecurityConfig,
description="Security configuration for the crew, including fingerprinting.", description="Security configuration for the crew, including fingerprinting.",
) )
token_usage: Optional[UsageMetrics] = Field( token_usage: UsageMetrics | None = Field(
default=None, default=None,
description="Metrics for the LLM usage during all tasks execution.", description="Metrics for the LLM usage during all tasks execution.",
) )
tracing: Optional[bool] = Field( tracing: bool | None = Field(
default=False, default=False,
description="Whether to enable tracing for the crew.", description="Whether to enable tracing for the crew.",
) )
@field_validator("id", mode="before") @field_validator("id", mode="before")
@classmethod @classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None: def _deny_user_set_id(cls, v: UUID4 | None) -> None:
"""Prevent manual setting of the 'id' field by users.""" """Prevent manual setting of the 'id' field by users."""
if v: if v:
raise PydanticCustomError( raise PydanticCustomError(
@@ -267,8 +257,8 @@ class Crew(FlowTrackable, BaseModel):
@field_validator("config", mode="before") @field_validator("config", mode="before")
@classmethod @classmethod
def check_config_type( def check_config_type(
cls, v: Union[Json, Dict[str, Any]] cls, v: Json | dict[str, Any]
) -> Union[Json, Dict[str, Any]]: ) -> Json | dict[str, Any]:
"""Validates that the config is a valid type. """Validates that the config is a valid type.
Args: Args:
v: The config to be validated. v: The config to be validated.
@@ -323,7 +313,7 @@ class Crew(FlowTrackable, BaseModel):
def create_crew_memory(self) -> "Crew": def create_crew_memory(self) -> "Crew":
"""Initialize private memory attributes.""" """Initialize private memory attributes."""
self._external_memory = ( self._external_memory = (
# External memory doesnt support a default value since it was designed to be managed entirely externally # External memory doesn't support a default value since it was designed to be managed entirely externally
self.external_memory.set_crew(self) if self.external_memory else None self.external_memory.set_crew(self) if self.external_memory else None
) )
@@ -513,7 +503,7 @@ class Crew(FlowTrackable, BaseModel):
@property @property
def key(self) -> str: def key(self) -> str:
source: List[str] = [agent.key for agent in self.agents] + [ source: list[str] = [agent.key for agent in self.agents] + [
task.key for task in self.tasks task.key for task in self.tasks
] ]
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest() return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
@@ -541,7 +531,7 @@ class Crew(FlowTrackable, BaseModel):
self.agents = [Agent(**agent) for agent in self.config["agents"]] self.agents = [Agent(**agent) for agent in self.config["agents"]]
self.tasks = [self._create_task(task) for task in self.config["tasks"]] self.tasks = [self._create_task(task) for task in self.config["tasks"]]
def _create_task(self, task_config: Dict[str, Any]) -> Task: def _create_task(self, task_config: dict[str, Any]) -> Task:
"""Creates a task instance from its configuration. """Creates a task instance from its configuration.
Args: Args:
@@ -570,7 +560,7 @@ class Crew(FlowTrackable, BaseModel):
CrewTrainingHandler(filename).initialize_file() CrewTrainingHandler(filename).initialize_file()
def train( def train(
self, n_iterations: int, filename: str, inputs: Optional[Dict[str, Any]] = None self, n_iterations: int, filename: str, inputs: dict[str, Any] | None = None
) -> None: ) -> None:
"""Trains the crew for a given number of iterations.""" """Trains the crew for a given number of iterations."""
inputs = inputs or {} inputs = inputs or {}
@@ -622,7 +612,7 @@ class Crew(FlowTrackable, BaseModel):
def kickoff( def kickoff(
self, self,
inputs: Optional[Dict[str, Any]] = None, inputs: dict[str, Any] | None = None,
) -> CrewOutput: ) -> CrewOutput:
ctx = baggage.set_baggage( ctx = baggage.set_baggage(
"crew_context", CrewContext(id=str(self.id), key=self.key) "crew_context", CrewContext(id=str(self.id), key=self.key)
@@ -693,9 +683,9 @@ class Crew(FlowTrackable, BaseModel):
finally: finally:
detach(token) detach(token)
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]: def kickoff_for_each(self, inputs: list[dict[str, Any]]) -> list[CrewOutput]:
"""Executes the Crew's workflow for each input in the list and aggregates results.""" """Executes the Crew's workflow for each input in the list and aggregates results."""
results: List[CrewOutput] = [] results: list[CrewOutput] = []
# Initialize the parent crew's usage metrics # Initialize the parent crew's usage metrics
total_usage_metrics = UsageMetrics() total_usage_metrics = UsageMetrics()
@@ -715,13 +705,13 @@ class Crew(FlowTrackable, BaseModel):
return results return results
async def kickoff_async( async def kickoff_async(
self, inputs: Optional[Dict[str, Any]] = None self, inputs: dict[str, Any] | None = None
) -> CrewOutput: ) -> CrewOutput:
"""Asynchronous kickoff method to start the crew execution.""" """Asynchronous kickoff method to start the crew execution."""
inputs = inputs or {} inputs = inputs or {}
return await asyncio.to_thread(self.kickoff, inputs) return await asyncio.to_thread(self.kickoff, inputs)
async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[CrewOutput]: async def kickoff_for_each_async(self, inputs: list[dict]) -> list[CrewOutput]:
crew_copies = [self.copy() for _ in inputs] crew_copies = [self.copy() for _ in inputs]
async def run_crew(crew, input_data): async def run_crew(crew, input_data):
@@ -750,7 +740,7 @@ class Crew(FlowTrackable, BaseModel):
tasks=self.tasks, planning_agent_llm=self.planning_llm tasks=self.tasks, planning_agent_llm=self.planning_llm
)._handle_crew_planning() )._handle_crew_planning()
for task, step_plan in zip(self.tasks, result.list_of_plans_per_task): for task, step_plan in zip(self.tasks, result.list_of_plans_per_task, strict=False):
task.description += step_plan.plan task.description += step_plan.plan
def _store_execution_log( def _store_execution_log(
@@ -789,10 +779,10 @@ class Crew(FlowTrackable, BaseModel):
def _run_hierarchical_process(self) -> CrewOutput: def _run_hierarchical_process(self) -> CrewOutput:
"""Creates and assigns a manager agent to make sure the crew completes the tasks.""" """Creates and assigns a manager agent to make sure the crew completes the tasks."""
self._create_manager_agent() self._create_manager_agent()
if self.manager_agent and self.responsibility_system: if self.manager_agent and self.responsibility_system:
self.manager_agent.set_responsibility_system(self.responsibility_system) self.manager_agent.set_responsibility_system(self.responsibility_system)
return self._execute_tasks(self.tasks) return self._execute_tasks(self.tasks)
def _create_manager_agent(self): def _create_manager_agent(self):
@@ -822,8 +812,8 @@ class Crew(FlowTrackable, BaseModel):
def _execute_tasks( def _execute_tasks(
self, self,
tasks: List[Task], tasks: list[Task],
start_index: Optional[int] = 0, start_index: int | None = 0,
was_replayed: bool = False, was_replayed: bool = False,
) -> CrewOutput: ) -> CrewOutput:
"""Executes tasks sequentially and returns the final output. """Executes tasks sequentially and returns the final output.
@@ -836,9 +826,9 @@ class Crew(FlowTrackable, BaseModel):
CrewOutput: Final output of the crew CrewOutput: Final output of the crew
""" """
task_outputs: List[TaskOutput] = [] task_outputs: list[TaskOutput] = []
futures: List[Tuple[Task, Future[TaskOutput], int]] = [] futures: list[tuple[Task, Future[TaskOutput], int]] = []
last_sync_output: Optional[TaskOutput] = None last_sync_output: TaskOutput | None = None
for task_index, task in enumerate(tasks): for task_index, task in enumerate(tasks):
if start_index is not None and task_index < start_index: if start_index is not None and task_index < start_index:
@@ -862,7 +852,7 @@ class Crew(FlowTrackable, BaseModel):
tools_for_task = self._prepare_tools( tools_for_task = self._prepare_tools(
agent_to_use, agent_to_use,
task, task,
cast(Union[List[Tool], List[BaseTool]], tools_for_task), cast(list[Tool] | list[BaseTool], tools_for_task),
) )
self._log_task_start(task, agent_to_use.role) self._log_task_start(task, agent_to_use.role)
@@ -882,7 +872,7 @@ class Crew(FlowTrackable, BaseModel):
future = task.execute_async( future = task.execute_async(
agent=agent_to_use, agent=agent_to_use,
context=context, context=context,
tools=cast(List[BaseTool], tools_for_task), tools=cast(list[BaseTool], tools_for_task),
) )
futures.append((task, future, task_index)) futures.append((task, future, task_index))
else: else:
@@ -894,7 +884,7 @@ class Crew(FlowTrackable, BaseModel):
task_output = task.execute_sync( task_output = task.execute_sync(
agent=agent_to_use, agent=agent_to_use,
context=context, context=context,
tools=cast(List[BaseTool], tools_for_task), tools=cast(list[BaseTool], tools_for_task),
) )
task_outputs.append(task_output) task_outputs.append(task_output)
self._process_task_result(task, task_output) self._process_task_result(task, task_output)
@@ -908,11 +898,11 @@ class Crew(FlowTrackable, BaseModel):
def _handle_conditional_task( def _handle_conditional_task(
self, self,
task: ConditionalTask, task: ConditionalTask,
task_outputs: List[TaskOutput], task_outputs: list[TaskOutput],
futures: List[Tuple[Task, Future[TaskOutput], int]], futures: list[tuple[Task, Future[TaskOutput], int]],
task_index: int, task_index: int,
was_replayed: bool, was_replayed: bool,
) -> Optional[TaskOutput]: ) -> TaskOutput | None:
if futures: if futures:
task_outputs = self._process_async_tasks(futures, was_replayed) task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear() futures.clear()
@@ -932,8 +922,8 @@ class Crew(FlowTrackable, BaseModel):
return None return None
def _prepare_tools( def _prepare_tools(
self, agent: BaseAgent, task: Task, tools: Union[List[Tool], List[BaseTool]] self, agent: BaseAgent, task: Task, tools: list[Tool] | list[BaseTool]
) -> List[BaseTool]: ) -> list[BaseTool]:
# Add delegation tools if agent allows delegation # Add delegation tools if agent allows delegation
if hasattr(agent, "allow_delegation") and getattr( if hasattr(agent, "allow_delegation") and getattr(
agent, "allow_delegation", False agent, "allow_delegation", False
@@ -963,21 +953,21 @@ class Crew(FlowTrackable, BaseModel):
tools = self._add_multimodal_tools(agent, tools) tools = self._add_multimodal_tools(agent, tools)
# Return a List[BaseTool] which is compatible with both Task.execute_sync and Task.execute_async # Return a List[BaseTool] which is compatible with both Task.execute_sync and Task.execute_async
return cast(List[BaseTool], tools) return cast(list[BaseTool], tools)
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]: def _get_agent_to_use(self, task: Task) -> BaseAgent | None:
if self.process == Process.hierarchical: if self.process == Process.hierarchical:
return self.manager_agent return self.manager_agent
return task.agent return task.agent
def _merge_tools( def _merge_tools(
self, self,
existing_tools: Union[List[Tool], List[BaseTool]], existing_tools: list[Tool] | list[BaseTool],
new_tools: Union[List[Tool], List[BaseTool]], new_tools: list[Tool] | list[BaseTool],
) -> List[BaseTool]: ) -> list[BaseTool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name.""" """Merge new tools into existing tools list, avoiding duplicates by tool name."""
if not new_tools: if not new_tools:
return cast(List[BaseTool], existing_tools) return cast(list[BaseTool], existing_tools)
# Create mapping of tool names to new tools # Create mapping of tool names to new tools
new_tool_map = {tool.name: tool for tool in new_tools} new_tool_map = {tool.name: tool for tool in new_tools}
@@ -988,41 +978,41 @@ class Crew(FlowTrackable, BaseModel):
# Add all new tools # Add all new tools
tools.extend(new_tools) tools.extend(new_tools)
return cast(List[BaseTool], tools) return cast(list[BaseTool], tools)
def _inject_delegation_tools( def _inject_delegation_tools(
self, self,
tools: Union[List[Tool], List[BaseTool]], tools: list[Tool] | list[BaseTool],
task_agent: BaseAgent, task_agent: BaseAgent,
agents: List[BaseAgent], agents: list[BaseAgent],
) -> List[BaseTool]: ) -> list[BaseTool]:
if hasattr(task_agent, "get_delegation_tools"): if hasattr(task_agent, "get_delegation_tools"):
delegation_tools = task_agent.get_delegation_tools(agents) delegation_tools = task_agent.get_delegation_tools(agents)
# Cast delegation_tools to the expected type for _merge_tools # Cast delegation_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], delegation_tools)) return self._merge_tools(tools, cast(list[BaseTool], delegation_tools))
return cast(List[BaseTool], tools) return cast(list[BaseTool], tools)
def _add_multimodal_tools( def _add_multimodal_tools(
self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]] self, agent: BaseAgent, tools: list[Tool] | list[BaseTool]
) -> List[BaseTool]: ) -> list[BaseTool]:
if hasattr(agent, "get_multimodal_tools"): if hasattr(agent, "get_multimodal_tools"):
multimodal_tools = agent.get_multimodal_tools() multimodal_tools = agent.get_multimodal_tools()
# Cast multimodal_tools to the expected type for _merge_tools # Cast multimodal_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], multimodal_tools)) return self._merge_tools(tools, cast(list[BaseTool], multimodal_tools))
return cast(List[BaseTool], tools) return cast(list[BaseTool], tools)
def _add_code_execution_tools( def _add_code_execution_tools(
self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]] self, agent: BaseAgent, tools: list[Tool] | list[BaseTool]
) -> List[BaseTool]: ) -> list[BaseTool]:
if hasattr(agent, "get_code_execution_tools"): if hasattr(agent, "get_code_execution_tools"):
code_tools = agent.get_code_execution_tools() code_tools = agent.get_code_execution_tools()
# Cast code_tools to the expected type for _merge_tools # Cast code_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], code_tools)) return self._merge_tools(tools, cast(list[BaseTool], code_tools))
return cast(List[BaseTool], tools) return cast(list[BaseTool], tools)
def _add_delegation_tools( def _add_delegation_tools(
self, task: Task, tools: Union[List[Tool], List[BaseTool]] self, task: Task, tools: list[Tool] | list[BaseTool]
) -> List[BaseTool]: ) -> list[BaseTool]:
agents_for_delegation = [agent for agent in self.agents if agent != task.agent] agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent: if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
if not tools: if not tools:
@@ -1030,7 +1020,7 @@ class Crew(FlowTrackable, BaseModel):
tools = self._inject_delegation_tools( tools = self._inject_delegation_tools(
tools, task.agent, agents_for_delegation tools, task.agent, agents_for_delegation
) )
return cast(List[BaseTool], tools) return cast(list[BaseTool], tools)
def _log_task_start(self, task: Task, role: str = "None"): def _log_task_start(self, task: Task, role: str = "None"):
if self.output_log_file: if self.output_log_file:
@@ -1039,8 +1029,8 @@ class Crew(FlowTrackable, BaseModel):
) )
def _update_manager_tools( def _update_manager_tools(
self, task: Task, tools: Union[List[Tool], List[BaseTool]] self, task: Task, tools: list[Tool] | list[BaseTool]
) -> List[BaseTool]: ) -> list[BaseTool]:
if self.manager_agent: if self.manager_agent:
if task.agent: if task.agent:
tools = self._inject_delegation_tools(tools, task.agent, [task.agent]) tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
@@ -1048,18 +1038,17 @@ class Crew(FlowTrackable, BaseModel):
tools = self._inject_delegation_tools( tools = self._inject_delegation_tools(
tools, self.manager_agent, self.agents tools, self.manager_agent, self.agents
) )
return cast(List[BaseTool], tools) return cast(list[BaseTool], tools)
def _get_context(self, task: Task, task_outputs: List[TaskOutput]) -> str: def _get_context(self, task: Task, task_outputs: list[TaskOutput]) -> str:
if not task.context: if not task.context:
return "" return ""
context = ( return (
aggregate_raw_outputs_from_task_outputs(task_outputs) aggregate_raw_outputs_from_task_outputs(task_outputs)
if task.context is NOT_SPECIFIED if task.context is NOT_SPECIFIED
else aggregate_raw_outputs_from_tasks(task.context) else aggregate_raw_outputs_from_tasks(task.context)
) )
return context
def _process_task_result(self, task: Task, output: TaskOutput) -> None: def _process_task_result(self, task: Task, output: TaskOutput) -> None:
role = task.agent.role if task.agent is not None else "None" role = task.agent.role if task.agent is not None else "None"
@@ -1072,7 +1061,7 @@ class Crew(FlowTrackable, BaseModel):
output=output.raw, output=output.raw,
) )
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput: def _create_crew_output(self, task_outputs: list[TaskOutput]) -> CrewOutput:
if not task_outputs: if not task_outputs:
raise ValueError("No task outputs available to create crew output.") raise ValueError("No task outputs available to create crew output.")
@@ -1103,10 +1092,10 @@ class Crew(FlowTrackable, BaseModel):
def _process_async_tasks( def _process_async_tasks(
self, self,
futures: List[Tuple[Task, Future[TaskOutput], int]], futures: list[tuple[Task, Future[TaskOutput], int]],
was_replayed: bool = False, was_replayed: bool = False,
) -> List[TaskOutput]: ) -> list[TaskOutput]:
task_outputs: List[TaskOutput] = [] task_outputs: list[TaskOutput] = []
for future_task, future, task_index in futures: for future_task, future, task_index in futures:
task_output = future.result() task_output = future.result()
task_outputs.append(task_output) task_outputs.append(task_output)
@@ -1117,8 +1106,8 @@ class Crew(FlowTrackable, BaseModel):
return task_outputs return task_outputs
def _find_task_index( def _find_task_index(
self, task_id: str, stored_outputs: List[Any] self, task_id: str, stored_outputs: list[Any]
) -> Optional[int]: ) -> int | None:
return next( return next(
( (
index index
@@ -1129,7 +1118,7 @@ class Crew(FlowTrackable, BaseModel):
) )
def replay( def replay(
self, task_id: str, inputs: Optional[Dict[str, Any]] = None self, task_id: str, inputs: dict[str, Any] | None = None
) -> CrewOutput: ) -> CrewOutput:
stored_outputs = self._task_output_handler.load() stored_outputs = self._task_output_handler.load()
if not stored_outputs: if not stored_outputs:
@@ -1166,19 +1155,18 @@ class Crew(FlowTrackable, BaseModel):
self.tasks[i].output = task_output self.tasks[i].output = task_output
self._logging_color = "bold_blue" self._logging_color = "bold_blue"
result = self._execute_tasks(self.tasks, start_index, True) return self._execute_tasks(self.tasks, start_index, True)
return result
def query_knowledge( def query_knowledge(
self, query: List[str], results_limit: int = 3, score_threshold: float = 0.35 self, query: list[str], results_limit: int = 3, score_threshold: float = 0.35
) -> Union[List[Dict[str, Any]], None]: ) -> list[dict[str, Any]] | None:
if self.knowledge: if self.knowledge:
return self.knowledge.query( return self.knowledge.query(
query, results_limit=results_limit, score_threshold=score_threshold query, results_limit=results_limit, score_threshold=score_threshold
) )
return None return None
def fetch_inputs(self) -> Set[str]: def fetch_inputs(self) -> set[str]:
""" """
Gathers placeholders (e.g., {something}) referenced in tasks or agents. Gathers placeholders (e.g., {something}) referenced in tasks or agents.
Scans each task's 'description' + 'expected_output', and each agent's Scans each task's 'description' + 'expected_output', and each agent's
@@ -1187,7 +1175,7 @@ class Crew(FlowTrackable, BaseModel):
Returns a set of all discovered placeholder names. Returns a set of all discovered placeholder names.
""" """
placeholder_pattern = re.compile(r"\{(.+?)\}") placeholder_pattern = re.compile(r"\{(.+?)\}")
required_inputs: Set[str] = set() required_inputs: set[str] = set()
# Scan tasks for inputs # Scan tasks for inputs
for task in self.tasks: for task in self.tasks:
@@ -1245,7 +1233,7 @@ class Crew(FlowTrackable, BaseModel):
cloned_tasks.append(cloned_task) cloned_tasks.append(cloned_task)
task_mapping[task.key] = cloned_task task_mapping[task.key] = cloned_task
for cloned_task, original_task in zip(cloned_tasks, self.tasks): for cloned_task, original_task in zip(cloned_tasks, self.tasks, strict=False):
if isinstance(original_task.context, list): if isinstance(original_task.context, list):
cloned_context = [ cloned_context = [
task_mapping[context_task.key] task_mapping[context_task.key]
@@ -1271,7 +1259,7 @@ class Crew(FlowTrackable, BaseModel):
copied_data.pop("agents", None) copied_data.pop("agents", None)
copied_data.pop("tasks", None) copied_data.pop("tasks", None)
copied_crew = Crew( return Crew(
**copied_data, **copied_data,
agents=cloned_agents, agents=cloned_agents,
tasks=cloned_tasks, tasks=cloned_tasks,
@@ -1281,15 +1269,13 @@ class Crew(FlowTrackable, BaseModel):
manager_llm=manager_llm, manager_llm=manager_llm,
) )
return copied_crew
def _set_tasks_callbacks(self) -> None: def _set_tasks_callbacks(self) -> None:
"""Sets callback for every task suing task_callback""" """Sets callback for every task suing task_callback"""
for task in self.tasks: for task in self.tasks:
if not task.callback: if not task.callback:
task.callback = self.task_callback task.callback = self.task_callback
def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None: def _interpolate_inputs(self, inputs: dict[str, Any]) -> None:
"""Interpolates the inputs in the tasks and agents.""" """Interpolates the inputs in the tasks and agents."""
[ [
task.interpolate_inputs_and_add_conversation_history( task.interpolate_inputs_and_add_conversation_history(
@@ -1322,8 +1308,8 @@ class Crew(FlowTrackable, BaseModel):
def test( def test(
self, self,
n_iterations: int, n_iterations: int,
eval_llm: Union[str, InstanceOf[BaseLLM]], eval_llm: str | InstanceOf[BaseLLM],
inputs: Optional[Dict[str, Any]] = None, inputs: dict[str, Any] | None = None,
) -> None: ) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures.""" """Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
try: try:
@@ -1379,7 +1365,7 @@ class Crew(FlowTrackable, BaseModel):
ValueError: If an invalid command type is provided. ValueError: If an invalid command type is provided.
RuntimeError: If memory reset operation fails. RuntimeError: If memory reset operation fails.
""" """
VALID_TYPES = frozenset( valid_types = frozenset(
[ [
"long", "long",
"short", "short",
@@ -1392,9 +1378,9 @@ class Crew(FlowTrackable, BaseModel):
] ]
) )
if command_type not in VALID_TYPES: if command_type not in valid_types:
raise ValueError( raise ValueError(
f"Invalid command type. Must be one of: {', '.join(sorted(VALID_TYPES))}" f"Invalid command type. Must be one of: {', '.join(sorted(valid_types))}"
) )
try: try:
@@ -1404,7 +1390,7 @@ class Crew(FlowTrackable, BaseModel):
self._reset_specific_memory(command_type) self._reset_specific_memory(command_type)
except Exception as e: except Exception as e:
error_msg = f"Failed to reset {command_type} memory: {str(e)}" error_msg = f"Failed to reset {command_type} memory: {e!s}"
self._logger.log("error", error_msg) self._logger.log("error", error_msg)
raise RuntimeError(error_msg) from e raise RuntimeError(error_msg) from e
@@ -1412,7 +1398,7 @@ class Crew(FlowTrackable, BaseModel):
"""Reset all available memory systems.""" """Reset all available memory systems."""
memory_systems = self._get_memory_systems() memory_systems = self._get_memory_systems()
for memory_type, config in memory_systems.items(): for config in memory_systems.values():
if (system := config.get("system")) is not None: if (system := config.get("system")) is not None:
name = config.get("name") name = config.get("name")
try: try:
@@ -1424,7 +1410,7 @@ class Crew(FlowTrackable, BaseModel):
) )
except Exception as e: except Exception as e:
raise RuntimeError( raise RuntimeError(
f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {str(e)}" f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {e!s}"
) from e ) from e
def _reset_specific_memory(self, memory_type: str) -> None: def _reset_specific_memory(self, memory_type: str) -> None:
@@ -1453,7 +1439,7 @@ class Crew(FlowTrackable, BaseModel):
) )
except Exception as e: except Exception as e:
raise RuntimeError( raise RuntimeError(
f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {str(e)}" f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {e!s}"
) from e ) from e
def _get_memory_systems(self): def _get_memory_systems(self):
@@ -1521,7 +1507,7 @@ class Crew(FlowTrackable, BaseModel):
}, },
} }
def reset_knowledge(self, knowledges: List[Knowledge]) -> None: def reset_knowledge(self, knowledges: list[Knowledge]) -> None:
"""Reset crew and agent knowledge storage.""" """Reset crew and agent knowledge storage."""
for ks in knowledges: for ks in knowledges:
ks.reset() ks.reset()

View File

@@ -2,33 +2,38 @@
Integration tests for the responsibility tracking system. Integration tests for the responsibility tracking system.
""" """
import pytest
from unittest.mock import Mock from unittest.mock import Mock
import pytest
from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement
from crewai.responsibility.system import ResponsibilitySystem
from crewai.responsibility.assignment import AssignmentStrategy from crewai.responsibility.assignment import AssignmentStrategy
from crewai.responsibility.models import (
AgentCapability,
CapabilityType,
TaskRequirement,
)
from crewai.responsibility.system import ResponsibilitySystem
from crewai.task import Task
class TestResponsibilitySystemIntegration: class TestResponsibilitySystemIntegration:
@pytest.fixture @pytest.fixture
def system(self): def system(self):
return ResponsibilitySystem() return ResponsibilitySystem()
@pytest.fixture @pytest.fixture
def python_agent(self): def python_agent(self):
agent = Mock(spec=BaseAgent) agent = Mock(spec=BaseAgent)
agent.role = "Python Developer" agent.role = "Python Developer"
return agent return agent
@pytest.fixture @pytest.fixture
def analysis_agent(self): def analysis_agent(self):
agent = Mock(spec=BaseAgent) agent = Mock(spec=BaseAgent)
agent.role = "Data Analyst" agent.role = "Data Analyst"
return agent return agent
@pytest.fixture @pytest.fixture
def python_capability(self): def python_capability(self):
return AgentCapability( return AgentCapability(
@@ -38,7 +43,7 @@ class TestResponsibilitySystemIntegration:
confidence_score=0.8, confidence_score=0.8,
keywords=["python", "programming", "development"] keywords=["python", "programming", "development"]
) )
@pytest.fixture @pytest.fixture
def analysis_capability(self): def analysis_capability(self):
return AgentCapability( return AgentCapability(
@@ -48,24 +53,24 @@ class TestResponsibilitySystemIntegration:
confidence_score=0.9, confidence_score=0.9,
keywords=["data", "analysis", "statistics"] keywords=["data", "analysis", "statistics"]
) )
@pytest.fixture @pytest.fixture
def mock_task(self): def mock_task(self):
task = Mock(spec=Task) task = Mock(spec=Task)
task.id = "integration_test_task" task.id = "integration_test_task"
task.description = "Complex data processing task requiring Python skills" task.description = "Complex data processing task requiring Python skills"
return task return task
def test_full_workflow(self, system, python_agent, python_capability, mock_task): def test_full_workflow(self, system, python_agent, python_capability, mock_task):
"""Test complete workflow from agent registration to task completion.""" """Test complete workflow from agent registration to task completion."""
system.register_agent(python_agent, [python_capability]) system.register_agent(python_agent, [python_capability])
status = system.get_agent_status(python_agent) status = system.get_agent_status(python_agent)
assert status["role"] == "Python Developer" assert status["role"] == "Python Developer"
assert len(status["capabilities"]) == 1 assert len(status["capabilities"]) == 1
assert status["capabilities"][0]["name"] == "Python Programming" assert status["capabilities"][0]["name"] == "Python Programming"
requirements = [ requirements = [
TaskRequirement( TaskRequirement(
capability_name="Python Programming", capability_name="Python Programming",
@@ -74,16 +79,16 @@ class TestResponsibilitySystemIntegration:
weight=1.0 weight=1.0
) )
] ]
assignment = system.assign_task_responsibility(mock_task, requirements) assignment = system.assign_task_responsibility(mock_task, requirements)
assert assignment is not None assert assignment is not None
assert assignment.task_id == "integration_test_task" assert assignment.task_id == "integration_test_task"
assert assignment.responsibility_score > 0.5 assert assignment.responsibility_score > 0.5
updated_status = system.get_agent_status(python_agent) updated_status = system.get_agent_status(python_agent)
assert updated_status["current_workload"] == 1 assert updated_status["current_workload"] == 1
system.complete_task( system.complete_task(
agent=python_agent, agent=python_agent,
task=mock_task, task=mock_task,
@@ -92,19 +97,19 @@ class TestResponsibilitySystemIntegration:
quality_score=0.9, quality_score=0.9,
outcome_description="Task completed successfully" outcome_description="Task completed successfully"
) )
final_status = system.get_agent_status(python_agent) final_status = system.get_agent_status(python_agent)
assert final_status["performance"]["total_tasks"] == 1 assert final_status["performance"]["total_tasks"] == 1
assert final_status["performance"]["success_rate"] == 1.0 assert final_status["performance"]["success_rate"] == 1.0
assert final_status["current_workload"] == 0 # Should be decremented assert final_status["current_workload"] == 0 # Should be decremented
def test_multi_agent_scenario(self, system, python_agent, analysis_agent, def test_multi_agent_scenario(self, system, python_agent, analysis_agent,
python_capability, analysis_capability, mock_task): python_capability, analysis_capability, mock_task):
"""Test scenario with multiple agents and capabilities.""" """Test scenario with multiple agents and capabilities."""
system.register_agent(python_agent, [python_capability]) system.register_agent(python_agent, [python_capability])
system.register_agent(analysis_agent, [analysis_capability]) system.register_agent(analysis_agent, [analysis_capability])
requirements = [ requirements = [
TaskRequirement( TaskRequirement(
capability_name="Python Programming", capability_name="Python Programming",
@@ -119,59 +124,55 @@ class TestResponsibilitySystemIntegration:
weight=0.8 weight=0.8
) )
] ]
greedy_assignment = system.assign_task_responsibility( greedy_assignment = system.assign_task_responsibility(
mock_task, requirements, AssignmentStrategy.GREEDY mock_task, requirements, AssignmentStrategy.GREEDY
) )
assert greedy_assignment is not None assert greedy_assignment is not None
system.calculator.update_workload(python_agent, 5) system.calculator.update_workload(python_agent, 5)
balanced_assignment = system.assign_task_responsibility( balanced_assignment = system.assign_task_responsibility(
mock_task, requirements, AssignmentStrategy.BALANCED mock_task, requirements, AssignmentStrategy.BALANCED
) )
assert balanced_assignment is not None assert balanced_assignment is not None
def test_delegation_workflow(self, system, python_agent, analysis_agent, def test_delegation_workflow(self, system, python_agent, analysis_agent,
python_capability, analysis_capability, mock_task): python_capability, analysis_capability, mock_task):
"""Test task delegation between agents.""" """Test task delegation between agents."""
system.register_agent(python_agent, [python_capability], supervisor=None) system.register_agent(python_agent, [python_capability], supervisor=None)
system.register_agent(analysis_agent, [analysis_capability], supervisor=python_agent) system.register_agent(analysis_agent, [analysis_capability], supervisor=python_agent)
system.delegate_task( system.delegate_task(
delegating_agent=python_agent, delegating_agent=python_agent,
receiving_agent=analysis_agent, receiving_agent=analysis_agent,
task=mock_task, task=mock_task,
reason="Analysis expertise required" reason="Analysis expertise required"
) )
python_status = system.get_agent_status(python_agent)
analysis_status = system.get_agent_status(analysis_agent) analysis_status = system.get_agent_status(analysis_agent)
assert analysis_status["current_workload"] > 0 assert analysis_status["current_workload"] > 0
delegation_records = system.accountability.get_agent_records( delegation_records = system.accountability.get_agent_records(
python_agent, action_type="delegation" python_agent, action_type="delegation"
) )
assert len(delegation_records) > 0 assert len(delegation_records) > 0
def test_performance_based_capability_adjustment(self, system, python_agent, def test_performance_based_capability_adjustment(self, system, python_agent,
python_capability, mock_task): python_capability, mock_task):
"""Test that capabilities are adjusted based on performance.""" """Test that capabilities are adjusted based on performance."""
system.register_agent(python_agent, [python_capability]) system.register_agent(python_agent, [python_capability])
initial_capabilities = system.hierarchy.get_agent_capabilities(python_agent)
initial_proficiency = initial_capabilities[0].proficiency_level
for i in range(5): for i in range(5):
task = Mock(spec=Task) task = Mock(spec=Task)
task.id = f"task_{i}" task.id = f"task_{i}"
task.description = f"Task {i}" task.description = f"Task {i}"
system.complete_task( system.complete_task(
agent=python_agent, agent=python_agent,
task=task, task=task,
@@ -179,37 +180,37 @@ class TestResponsibilitySystemIntegration:
completion_time=1800.0, completion_time=1800.0,
quality_score=0.9 quality_score=0.9
) )
updated_capabilities = system.hierarchy.get_agent_capabilities(python_agent) updated_capabilities = system.hierarchy.get_agent_capabilities(python_agent)
assert len(updated_capabilities) == 1 assert len(updated_capabilities) == 1
def test_system_overview_and_recommendations(self, system, python_agent, def test_system_overview_and_recommendations(self, system, python_agent,
analysis_agent, python_capability, analysis_agent, python_capability,
analysis_capability): analysis_capability):
"""Test system overview and recommendation generation.""" """Test system overview and recommendation generation."""
system.register_agent(python_agent, [python_capability]) system.register_agent(python_agent, [python_capability])
system.register_agent(analysis_agent, [analysis_capability]) system.register_agent(analysis_agent, [analysis_capability])
overview = system.get_system_overview() overview = system.get_system_overview()
assert overview["enabled"] is True assert overview["enabled"] is True
assert overview["total_agents"] == 2 assert overview["total_agents"] == 2
assert "capability_distribution" in overview assert "capability_distribution" in overview
assert "system_performance" in overview assert "system_performance" in overview
recommendations = system.generate_recommendations() recommendations = system.generate_recommendations()
assert isinstance(recommendations, list) assert isinstance(recommendations, list)
def test_system_enable_disable(self, system, python_agent, python_capability, mock_task): def test_system_enable_disable(self, system, python_agent, python_capability, mock_task):
"""Test enabling and disabling the responsibility system.""" """Test enabling and disabling the responsibility system."""
assert system.enabled is True assert system.enabled is True
system.register_agent(python_agent, [python_capability]) system.register_agent(python_agent, [python_capability])
requirements = [ requirements = [
TaskRequirement( TaskRequirement(
capability_name="Python Programming", capability_name="Python Programming",
@@ -218,36 +219,36 @@ class TestResponsibilitySystemIntegration:
weight=1.0 weight=1.0
) )
] ]
assignment = system.assign_task_responsibility(mock_task, requirements) assignment = system.assign_task_responsibility(mock_task, requirements)
assert assignment is not None assert assignment is not None
system.disable_system() system.disable_system()
assert system.enabled is False assert system.enabled is False
disabled_assignment = system.assign_task_responsibility(mock_task, requirements) disabled_assignment = system.assign_task_responsibility(mock_task, requirements)
assert disabled_assignment is None assert disabled_assignment is None
disabled_status = system.get_agent_status(python_agent) disabled_status = system.get_agent_status(python_agent)
assert disabled_status == {} assert disabled_status == {}
system.enable_system() system.enable_system()
assert system.enabled is True assert system.enabled is True
enabled_assignment = system.assign_task_responsibility(mock_task, requirements) enabled_assignment = system.assign_task_responsibility(mock_task, requirements)
assert enabled_assignment is not None assert enabled_assignment is not None
def test_accountability_tracking_integration(self, system, python_agent, def test_accountability_tracking_integration(self, system, python_agent,
python_capability, mock_task): python_capability, mock_task):
"""Test that all operations are properly logged for accountability.""" """Test that all operations are properly logged for accountability."""
system.register_agent(python_agent, [python_capability]) system.register_agent(python_agent, [python_capability])
registration_records = system.accountability.get_agent_records( registration_records = system.accountability.get_agent_records(
python_agent, action_type="registration" python_agent, action_type="registration"
) )
assert len(registration_records) == 1 assert len(registration_records) == 1
requirements = [ requirements = [
TaskRequirement( TaskRequirement(
capability_name="Python Programming", capability_name="Python Programming",
@@ -256,14 +257,14 @@ class TestResponsibilitySystemIntegration:
weight=1.0 weight=1.0
) )
] ]
assignment = system.assign_task_responsibility(mock_task, requirements) system.assign_task_responsibility(mock_task, requirements)
assignment_records = system.accountability.get_agent_records( assignment_records = system.accountability.get_agent_records(
python_agent, action_type="task_assignment" python_agent, action_type="task_assignment"
) )
assert len(assignment_records) == 1 assert len(assignment_records) == 1
system.complete_task( system.complete_task(
agent=python_agent, agent=python_agent,
task=mock_task, task=mock_task,
@@ -271,14 +272,14 @@ class TestResponsibilitySystemIntegration:
completion_time=1800.0, completion_time=1800.0,
quality_score=0.9 quality_score=0.9
) )
completion_records = system.accountability.get_agent_records( completion_records = system.accountability.get_agent_records(
python_agent, action_type="task_completion" python_agent, action_type="task_completion"
) )
assert len(completion_records) == 1 assert len(completion_records) == 1
report = system.accountability.generate_accountability_report(agent=python_agent) report = system.accountability.generate_accountability_report(agent=python_agent)
assert report["total_records"] >= 3 # At least registration, assignment, completion assert report["total_records"] >= 3 # At least registration, assignment, completion
assert "registration" in report["action_counts"] assert "registration" in report["action_counts"]
assert "task_assignment" in report["action_counts"] assert "task_assignment" in report["action_counts"]