From b6c249311199cd4b1727dca28124b8fc4c75be2a Mon Sep 17 00:00:00 2001
From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Wed, 10 Sep 2025 11:36:31 +0000
Subject: [PATCH] feat: Implement formal responsibility tracking system for CrewAI
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add capability-based agent hierarchy with mathematical scoring
- Implement responsibility assignment algorithms (greedy, balanced, optimal)
- Add comprehensive accountability logging and tracking
- Implement performance-based capability adjustment with learning rates
- Integrate with existing Agent and Crew classes seamlessly
- Add 58 comprehensive tests covering all functionality
- Include example usage demonstrating all features

Addresses issue #3491 with four key features:
1. Capability-Based Agent Hierarchy
2. Mathematical Responsibility Assignment
3. Accountability Logging
4. Performance-Based Capability Adjustment

The system is fully backward compatible and optional - existing crews continue to work without modification.

Co-Authored-By: João
---
 examples/responsibility_tracking_example.py  | 297 ++++++++++++++++++
 src/crewai/agent.py                          |  30 ++
 src/crewai/crew.py                           |  15 +
 src/crewai/responsibility/__init__.py        |  33 ++
 src/crewai/responsibility/accountability.py  | 210 +++++++++++++
 src/crewai/responsibility/assignment.py      | 251 +++++++++++++++
 src/crewai/responsibility/hierarchy.py       | 257 +++++++++++++++
 src/crewai/responsibility/models.py          | 188 +++++++++++
 src/crewai/responsibility/performance.py     | 232 ++++++++++++++
 src/crewai/responsibility/system.py          | 259 +++++++++++++++
 .../tools/agent_tools/delegate_work_tool.py  |  15 +
 tests/responsibility/__init__.py             |   3 +
 tests/responsibility/test_accountability.py  | 199 ++++++++++++
 tests/responsibility/test_assignment.py      | 221 +++++++++++++
 tests/responsibility/test_hierarchy.py       | 208 ++++++++++++
 tests/responsibility/test_integration.py     | 285 +++++++++++++++++
 tests/responsibility/test_models.py          | 187 +++++++++++
 tests/responsibility/test_performance.py     | 226 +++++++++++++
 18 files changed, 3116 insertions(+)
 create mode 100644 examples/responsibility_tracking_example.py
 create mode 100644 src/crewai/responsibility/__init__.py
 create mode 100644 src/crewai/responsibility/accountability.py
 create mode 100644 src/crewai/responsibility/assignment.py
 create mode 100644 src/crewai/responsibility/hierarchy.py
 create mode 100644 src/crewai/responsibility/models.py
 create mode 100644 src/crewai/responsibility/performance.py
 create mode 100644 src/crewai/responsibility/system.py
 create mode 100644 tests/responsibility/__init__.py
 create mode 100644 tests/responsibility/test_accountability.py
 create mode 100644 tests/responsibility/test_assignment.py
 create mode 100644 tests/responsibility/test_hierarchy.py
 create mode 100644 tests/responsibility/test_integration.py
 create mode 100644 tests/responsibility/test_models.py
 create mode 100644 tests/responsibility/test_performance.py

diff --git a/examples/responsibility_tracking_example.py b/examples/responsibility_tracking_example.py
new file mode 100644
index 000000000..55623b568
--- /dev/null
+++ b/examples/responsibility_tracking_example.py
@@ -0,0 +1,297 @@
+"""
+Example demonstrating the formal responsibility tracking system in CrewAI.
+
+This example shows how to:
+1. Set up agents with capabilities
+2. Use responsibility-based task assignment
+3. Monitor accountability and performance
+4. Generate system insights and recommendations
+"""
+
+from crewai import Agent, Crew, Task
+from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement
+from crewai.responsibility.system import ResponsibilitySystem
+from crewai.responsibility.assignment import AssignmentStrategy
+
+
+def create_agents_with_capabilities():
+    """Create agents with defined capabilities."""
+
+    python_capabilities = [
+        AgentCapability(
+            name="Python Programming",
+            capability_type=CapabilityType.TECHNICAL,
+            proficiency_level=0.9,
+            confidence_score=0.8,
+            description="Expert in Python development and scripting",
+            keywords=["python", "programming", "development", "scripting"]
+        ),
+        AgentCapability(
+            name="Web Development",
+            capability_type=CapabilityType.TECHNICAL,
+            proficiency_level=0.7,
+            confidence_score=0.7,
+            description="Experience with web frameworks",
+            keywords=["web", "flask", "django", "fastapi"]
+        )
+    ]
+
+    python_agent = Agent(
+        role="Python Developer",
+        goal="Develop high-quality Python applications and scripts",
+        backstory="Experienced Python developer with expertise in various frameworks",
+        capabilities=python_capabilities
+    )
+
+    analysis_capabilities = [
+        AgentCapability(
+            name="Data Analysis",
+            capability_type=CapabilityType.ANALYTICAL,
+            proficiency_level=0.9,
+            confidence_score=0.9,
+            description="Expert in statistical analysis and data interpretation",
+            keywords=["data", "analysis", "statistics", "pandas", "numpy"]
+        ),
+        AgentCapability(
+            name="Machine Learning",
+            capability_type=CapabilityType.ANALYTICAL,
+            proficiency_level=0.8,
+            confidence_score=0.7,
+            description="Experience with ML algorithms and model building",
+            keywords=["machine learning", "ml", "scikit-learn", "tensorflow"]
+        )
+    ]
+
+    analyst_agent = Agent(
+        role="Data Analyst",
+        goal="Extract insights from data and build predictive models",
+        backstory="Data scientist with strong statistical background",
+        capabilities=analysis_capabilities
+    )
+
+    management_capabilities = [
+        AgentCapability(
+            name="Project Management",
+            capability_type=CapabilityType.LEADERSHIP,
+            proficiency_level=0.8,
+            confidence_score=0.9,
+            description="Experienced in managing technical projects",
+            keywords=["project management", "coordination", "planning"]
+        ),
+        AgentCapability(
+            name="Communication",
+            capability_type=CapabilityType.COMMUNICATION,
+            proficiency_level=0.9,
+            confidence_score=0.8,
+            description="Excellent communication and coordination skills",
+            keywords=["communication", "coordination", "stakeholder management"]
+        )
+    ]
+
+    manager_agent = Agent(
+        role="Project Manager",
+        goal="Coordinate team efforts and ensure project success",
+        backstory="Experienced project manager with technical background",
+        capabilities=management_capabilities
+    )
+
+    return [python_agent, analyst_agent, manager_agent]
+
+
+def create_tasks_with_requirements():
+    """Create tasks with specific capability requirements."""
+
+    data_processing_requirements = [
+        TaskRequirement(
+            capability_name="Python Programming",
+            capability_type=CapabilityType.TECHNICAL,
+            minimum_proficiency=0.7,
+            weight=1.0,
+            keywords=["python", "programming"]
+        ),
+        TaskRequirement(
+            capability_name="Data Analysis",
+            capability_type=CapabilityType.ANALYTICAL,
+            minimum_proficiency=0.6,
+            weight=0.8,
+            keywords=["data", "analysis"]
+        )
+    ]
+
+    data_task = Task(
+        description="Create a Python script to process and analyze customer data",
+        expected_output="A Python script that processes CSV data and generates summary statistics"
+    )
+
+    web_dashboard_requirements = [
+        TaskRequirement(
+            capability_name="Web Development",
+            capability_type=CapabilityType.TECHNICAL,
+            minimum_proficiency=0.6,
+            weight=1.0,
+            keywords=["web", "development"]
+        ),
+        TaskRequirement(
+            capability_name="Python Programming",
+            capability_type=CapabilityType.TECHNICAL,
+            minimum_proficiency=0.5,
+            weight=0.7,
+            keywords=["python", "programming"]
+        )
+    ]
+
+    web_task = Task(
+        description="Create a web dashboard to visualize data analysis results",
+        expected_output="A web application with interactive charts and data visualization"
+    )
+
+    coordination_requirements = [
+        TaskRequirement(
+            capability_name="Project Management",
+            capability_type=CapabilityType.LEADERSHIP,
+            minimum_proficiency=0.7,
+            weight=1.0,
+            keywords=["project management", "coordination"]
+        ),
+        TaskRequirement(
+            capability_name="Communication",
+            capability_type=CapabilityType.COMMUNICATION,
+            minimum_proficiency=0.8,
+            weight=0.9,
+            keywords=["communication", "coordination"]
+        )
+    ]
+
+    coordination_task = Task(
+        description="Coordinate the team efforts and ensure project milestones are met",
+        expected_output="Project status report with timeline and deliverable tracking"
+    )
+
+    return [
+        (data_task, data_processing_requirements),
+        (web_task, web_dashboard_requirements),
+        (coordination_task, coordination_requirements)
+    ]
+
+
+def demonstrate_responsibility_tracking():
+    """Demonstrate the complete responsibility tracking workflow."""
+
+    print("🚀 CrewAI Formal Responsibility Tracking System Demo")
+    print("=" * 60)
+
+    print("\n1. Creating agents with defined capabilities...")
+    agents = create_agents_with_capabilities()
+
+    for agent in agents:
+        print(f" ✓ {agent.role}: {len(agent.capabilities)} capabilities")
+
+    print("\n2. Setting up crew with responsibility tracking...")
+    crew = Crew(
+        agents=agents,
+        tasks=[],
+        verbose=True
+    )
+
+    responsibility_system = crew.responsibility_system
+    print(f" ✓ Responsibility system enabled: {responsibility_system.enabled}")
+
+    print("\n3. System overview:")
+    overview = responsibility_system.get_system_overview()
+    print(f" • Total agents: {overview['total_agents']}")
+    print(f" • Capability distribution: {overview['capability_distribution']}")
+
+    print("\n4. Creating tasks with capability requirements...")
+    tasks_with_requirements = create_tasks_with_requirements()
+
+    print("\n5. Demonstrating responsibility assignment strategies...")
+
+    for i, (task, requirements) in enumerate(tasks_with_requirements):
+        print(f"\n Task {i+1}: {task.description[:50]}...")
+
+        for strategy in [AssignmentStrategy.GREEDY, AssignmentStrategy.BALANCED, AssignmentStrategy.OPTIMAL]:
+            assignment = responsibility_system.assign_task_responsibility(
+                task, requirements, strategy
+            )
+
+            if assignment:
+                agent = responsibility_system._get_agent_by_id(assignment.agent_id)
+                print(f" • {strategy.value}: {agent.role} (score: {assignment.responsibility_score:.3f})")
+                print(f" Capabilities matched: {', '.join(assignment.capability_matches)}")
+
+                responsibility_system.complete_task(
+                    agent=agent,
+                    task=task,
+                    success=True,
+                    completion_time=1800.0,
+                    quality_score=0.85,
+                    outcome_description="Task completed successfully"
+                )
+            else:
+                print(f" • {strategy.value}: No suitable agent found")
+
+    print("\n6. Agent status and performance:")
+    for agent in agents:
+        status = responsibility_system.get_agent_status(agent)
+        print(f"\n {agent.role}:")
+        print(f" • Current workload: {status['current_workload']}")
+        if status['performance']:
+            perf = status['performance']
+            print(f" • Success rate: {perf['success_rate']:.2f}")
+            print(f" • Quality score: {perf['quality_score']:.2f}")
+            print(f" • Total tasks: {perf['total_tasks']}")
+
+    print("\n7. Accountability tracking:")
+    for agent in agents:
+        report = responsibility_system.accountability.generate_accountability_report(agent=agent)
+        if report['total_records'] > 0:
+            print(f"\n {agent.role} accountability:")
+            print(f" • Total records: {report['total_records']}")
+            print(f" • Action types: {list(report['action_counts'].keys())}")
+            print(f" • Recent actions: {len(report['recent_actions'])}")
+
+    print("\n8. System recommendations:")
+    recommendations = responsibility_system.generate_recommendations()
+    if recommendations:
+        for rec in recommendations:
+            print(f" • {rec['type']}: {rec['description']} (Priority: {rec['priority']})")
+    else:
+        print(" • No recommendations at this time")
+
+    print("\n9. Demonstrating task delegation:")
+    if len(agents) >= 2:
+        delegation_task = Task(
+            description="Complex task requiring delegation",
+            expected_output="Delegated task completion report"
+        )
+
+        responsibility_system.delegate_task(
+            delegating_agent=agents[0],
+            receiving_agent=agents[1],
+            task=delegation_task,
+            reason="Specialized expertise required"
+        )
+
+        print(f" ✓ Delegated task from {agents[0].role} to {agents[1].role}")
+
+        delegation_records = responsibility_system.accountability.get_delegation_chain(delegation_task)
+        print(f" • Delegation chain length: {len(delegation_records)}")
+
+    print("\n" + "=" * 60)
+    print("🎉 Responsibility tracking demonstration completed!")
+    print("\nKey features demonstrated:")
+    print("• Capability-based agent hierarchy")
+    print("• Mathematical responsibility assignment")
+    print("• Accountability logging")
+    print("• Performance-based capability adjustment")
+
+
+if __name__ == "__main__":
+    try:
+        demonstrate_responsibility_tracking()
+        print("\n✅ All demonstrations completed successfully!")
+
+    except Exception as e:
+        print(f"\n❌ Error during demonstration: {str(e)}")
+        import traceback
+        traceback.print_exc()
diff --git a/src/crewai/agent.py b/src/crewai/agent.py
index 122ddc82f..f46581db2 100644
--- a/src/crewai/agent.py
+++ b/src/crewai/agent.py
@@ -59,6 +59,7 @@ from crewai.events.types.knowledge_events import (
 from crewai.utilities.llm_utils import create_llm
 from crewai.utilities.token_counter_callback import TokenCalcHandler
 from crewai.utilities.training_handler import CrewTrainingHandler
+from crewai.responsibility.models import AgentCapability
 
 
 class Agent(BaseAgent):
@@ -178,6 +179,10 @@ class Agent(BaseAgent):
     guardrail_max_retries: int = Field(
         default=3, description="Maximum number of retries when guardrail fails"
     )
+    capabilities: Optional[List[AgentCapability]] = Field(
+        default_factory=list,
+        description="List of agent capabilities for responsibility tracking"
+    )
 
     @model_validator(mode="before")
     def validate_from_repository(cls, v):
@@ -188,6 +193,7 @@ class Agent(BaseAgent):
     @model_validator(mode="after")
     def post_init_setup(self):
         self.agent_ops_agent_name = self.role
+        self._responsibility_system = None
         self.llm = create_llm(self.llm)
 
         if self.function_calling_llm and not isinstance(
@@ -208,6 +214,30 @@ class Agent(BaseAgent):
 
             self.cache_handler = CacheHandler()
             self.set_cache_handler(self.cache_handler)
+
+    def set_responsibility_system(self, responsibility_system) -> None:
+        """Set the responsibility tracking system for this agent."""
+        self._responsibility_system = responsibility_system
+
+        if self.capabilities:
+            self._responsibility_system.register_agent(self, self.capabilities)
+
+    def add_capability(self, capability: AgentCapability) -> None:
+        """Add a capability to this agent."""
+        if self.capabilities is None:
+            self.capabilities = []
+        self.capabilities.append(capability)
+
+        if self._responsibility_system:
+            self._responsibility_system.hierarchy.add_agent(self, self.capabilities)
+
+    def get_capabilities(self) -> List[AgentCapability]:
+        """Get all capabilities for this agent."""
+        return self.capabilities or []
+
+    def get_responsibility_system(self):
+        """Get the responsibility tracking system for this agent."""
+        return self._responsibility_system
+
     def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
         try:
             if self.embedder is None and crew_embedder:
diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index 9185d143d..6285d166e 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -298,8 +298,17 @@ class Crew(FlowTrackable, BaseModel):
         if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
             self.function_calling_llm = create_llm(self.function_calling_llm)
 
+        # Initialize responsibility system
+        from crewai.responsibility.system import ResponsibilitySystem
+        self._responsibility_system = ResponsibilitySystem()
+
         return self
 
+    @property
+    def responsibility_system(self):
+        """Get the responsibility tracking system for this crew."""
+        return getattr(self, '_responsibility_system', None)
+
     def _initialize_default_memories(self):
         self._long_term_memory = self._long_term_memory or LongTermMemory()
         self._short_term_memory = self._short_term_memory or ShortTermMemory(
@@ -389,6 +398,8 @@ class Crew(FlowTrackable, BaseModel):
             agent.set_cache_handler(self._cache_handler)
             if self.max_rpm:
                 agent.set_rpm_controller(self._rpm_controller)
+            if self.responsibility_system:
+                agent.set_responsibility_system(self.responsibility_system)
         return self
 
     @model_validator(mode="after")
@@ -778,6 +789,10 @@ class Crew(FlowTrackable, BaseModel):
     def _run_hierarchical_process(self) -> CrewOutput:
         """Creates and assigns a manager agent to make sure the crew completes the tasks."""
         self._create_manager_agent()
+
+        if self.manager_agent and self.responsibility_system:
+            self.manager_agent.set_responsibility_system(self.responsibility_system)
+
         return self._execute_tasks(self.tasks)
 
     def _create_manager_agent(self):
diff --git a/src/crewai/responsibility/__init__.py b/src/crewai/responsibility/__init__.py
new file mode 100644
index 000000000..cb60a59ac
--- /dev/null
+++ b/src/crewai/responsibility/__init__.py
@@ -0,0 +1,33 @@
+"""
+Formal Responsibility Tracking System for CrewAI
+
+This module provides comprehensive responsibility tracking capabilities including:
+- Capability-based agent hierarchy
+- Mathematical responsibility assignment
+- Accountability logging
+- Performance-based capability adjustment
+"""
+
+from crewai.responsibility.accountability import AccountabilityLogger
+from crewai.responsibility.assignment import ResponsibilityCalculator
+from crewai.responsibility.hierarchy import CapabilityHierarchy
+from crewai.responsibility.models import (
+    AccountabilityRecord,
+    AgentCapability,
+    PerformanceMetrics,
+    ResponsibilityAssignment,
+)
+from crewai.responsibility.performance import PerformanceTracker
+from crewai.responsibility.system import ResponsibilitySystem + +__all__ = [ + "AccountabilityLogger", + "AccountabilityRecord", + "AgentCapability", + "CapabilityHierarchy", + "PerformanceMetrics", + "PerformanceTracker", + "ResponsibilityAssignment", + "ResponsibilityCalculator", + "ResponsibilitySystem", +] diff --git a/src/crewai/responsibility/accountability.py b/src/crewai/responsibility/accountability.py new file mode 100644 index 000000000..89d4b85da --- /dev/null +++ b/src/crewai/responsibility/accountability.py @@ -0,0 +1,210 @@ +""" +Accountability logging and tracking system. +""" + +from collections import defaultdict +from datetime import datetime, timedelta +from typing import Any + +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.responsibility.models import AccountabilityRecord +from crewai.task import Task + + +class AccountabilityLogger: + """Logs and tracks agent actions for accountability.""" + + def __init__(self): + self.records: list[AccountabilityRecord] = [] + self.agent_records: dict[str, list[AccountabilityRecord]] = defaultdict(list) + self._setup_event_listeners() + + def log_action( + self, + agent: BaseAgent, + action_type: str, + action_description: str, + task: Task | None = None, + context: dict[str, Any] | None = None + ) -> AccountabilityRecord: + """Log an agent action.""" + agent_id = self._get_agent_id(agent) + task_id = str(task.id) if task else None + + record = AccountabilityRecord( + agent_id=agent_id, + action_type=action_type, + action_description=action_description, + task_id=task_id, + context=context or {} + ) + + self.records.append(record) + self.agent_records[agent_id].append(record) + + return record + + def log_decision( + self, + agent: BaseAgent, + decision: str, + reasoning: str, + task: Task | None = None, + alternatives_considered: list[str] | None = None + ) -> AccountabilityRecord: + """Log an agent decision with reasoning.""" + context = { + "reasoning": reasoning, + "alternatives_considered": alternatives_considered or [] + } + + return self.log_action( + agent=agent, + action_type="decision", + action_description=decision, + task=task, + context=context + ) + + def log_delegation( + self, + delegating_agent: BaseAgent, + receiving_agent: BaseAgent, + task: Task, + delegation_reason: str + ) -> AccountabilityRecord: + """Log task delegation between agents.""" + context = { + "receiving_agent_id": self._get_agent_id(receiving_agent), + "receiving_agent_role": receiving_agent.role, + "delegation_reason": delegation_reason + } + + return self.log_action( + agent=delegating_agent, + action_type="delegation", + action_description=f"Delegated task to {receiving_agent.role}", + task=task, + context=context + ) + + def log_task_completion( + self, + agent: BaseAgent, + task: Task, + success: bool, + outcome_description: str, + completion_time: float | None = None + ) -> AccountabilityRecord: + """Log task completion with outcome.""" + context = { + "completion_time": completion_time, + "task_description": task.description + } + + record = self.log_action( + agent=agent, + action_type="task_completion", + action_description=f"Completed task: {task.description[:100]}...", + task=task, + context=context + ) + + record.set_outcome(outcome_description, success) + return record + + def get_agent_records( + self, + agent: BaseAgent, + action_type: str | None = None, + since: datetime | None = None + ) -> list[AccountabilityRecord]: + """Get accountability records for a specific agent.""" + agent_id = 
self._get_agent_id(agent) + records = self.agent_records.get(agent_id, []) + + if action_type: + records = [r for r in records if r.action_type == action_type] + + if since: + records = [r for r in records if r.timestamp >= since] + + return records + + def get_task_records(self, task: Task) -> list[AccountabilityRecord]: + """Get all accountability records related to a specific task.""" + task_id = str(task.id) + return [r for r in self.records if r.task_id == task_id] + + def get_delegation_chain(self, task: Task) -> list[AccountabilityRecord]: + """Get the delegation chain for a task.""" + task_records = self.get_task_records(task) + delegation_records = [r for r in task_records if r.action_type == "delegation"] + + delegation_records.sort(key=lambda r: r.timestamp) + return delegation_records + + def generate_accountability_report( + self, + agent: BaseAgent | None = None, + time_period: timedelta | None = None + ) -> dict[str, Any]: + """Generate an accountability report.""" + since = datetime.utcnow() - time_period if time_period else None + + if agent: + records = self.get_agent_records(agent, since=since) + agent_id = self._get_agent_id(agent) + else: + records = self.records + if since: + records = [r for r in records if r.timestamp >= since] + agent_id = "all_agents" + + action_counts = defaultdict(int) + success_counts = defaultdict(int) + failure_counts = defaultdict(int) + + for record in records: + action_counts[record.action_type] += 1 + if record.success is True: + success_counts[record.action_type] += 1 + elif record.success is False: + failure_counts[record.action_type] += 1 + + success_rates = {} + for action_type in action_counts: + total = success_counts[action_type] + failure_counts[action_type] + if total > 0: + success_rates[action_type] = success_counts[action_type] / total + else: + success_rates[action_type] = None + + return { + "agent_id": agent_id, + "report_period": { + "start": since.isoformat() if since else None, + "end": datetime.utcnow().isoformat() + }, + "total_records": len(records), + "action_counts": dict(action_counts), + "success_counts": dict(success_counts), + "failure_counts": dict(failure_counts), + "success_rates": success_rates, + "recent_actions": [ + { + "timestamp": r.timestamp.isoformat(), + "action_type": r.action_type, + "description": r.action_description, + "success": r.success + } + for r in sorted(records, key=lambda x: x.timestamp, reverse=True)[:10] + ] + } + + def _setup_event_listeners(self) -> None: + """Set up event listeners for automatic logging.""" + + def _get_agent_id(self, agent: BaseAgent) -> str: + """Get a unique identifier for an agent.""" + return f"{agent.role}_{id(agent)}" diff --git a/src/crewai/responsibility/assignment.py b/src/crewai/responsibility/assignment.py new file mode 100644 index 000000000..a95c5cb0d --- /dev/null +++ b/src/crewai/responsibility/assignment.py @@ -0,0 +1,251 @@ +""" +Mathematical responsibility assignment algorithms. 
+""" + +import math +from enum import Enum + +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.responsibility.hierarchy import CapabilityHierarchy +from crewai.responsibility.models import ResponsibilityAssignment, TaskRequirement +from crewai.task import Task + + +class AssignmentStrategy(str, Enum): + """Different strategies for responsibility assignment.""" + GREEDY = "greedy" # Assign to best available agent + BALANCED = "balanced" # Balance workload across agents + OPTIMAL = "optimal" # Optimize for overall system performance + + +class ResponsibilityCalculator: + """Calculates and assigns responsibilities using mathematical algorithms.""" + + def __init__(self, hierarchy: CapabilityHierarchy): + self.hierarchy = hierarchy + self.current_workloads: dict[str, int] = {} # agent_id -> current task count + + def calculate_responsibility_assignment( + self, + task: Task, + requirements: list[TaskRequirement], + strategy: AssignmentStrategy = AssignmentStrategy.GREEDY, + exclude_agents: list[BaseAgent] | None = None + ) -> ResponsibilityAssignment | None: + """Calculate the best responsibility assignment for a task.""" + exclude_agent_ids = set() + if exclude_agents: + exclude_agent_ids = {self.hierarchy._get_agent_id(agent) for agent in exclude_agents} + + if strategy == AssignmentStrategy.GREEDY: + return self._greedy_assignment(task, requirements, exclude_agent_ids) + if strategy == AssignmentStrategy.BALANCED: + return self._balanced_assignment(task, requirements, exclude_agent_ids) + if strategy == AssignmentStrategy.OPTIMAL: + return self._optimal_assignment(task, requirements, exclude_agent_ids) + raise ValueError(f"Unknown assignment strategy: {strategy}") + + def calculate_multi_agent_assignment( + self, + task: Task, + requirements: list[TaskRequirement], + max_agents: int = 3, + strategy: AssignmentStrategy = AssignmentStrategy.OPTIMAL + ) -> list[ResponsibilityAssignment]: + """Calculate assignment for tasks requiring multiple agents.""" + assignments = [] + used_agents = set() + + sorted_requirements = sorted(requirements, key=lambda r: r.weight, reverse=True) + + for i, requirement in enumerate(sorted_requirements): + if len(assignments) >= max_agents: + break + + single_req_assignment = self.calculate_responsibility_assignment( + task, [requirement], strategy, + exclude_agents=[self.hierarchy.agents[agent_id] for agent_id in used_agents] + ) + + if single_req_assignment: + single_req_assignment.responsibility_score *= (1.0 / (i + 1)) # Diminishing returns + assignments.append(single_req_assignment) + used_agents.add(single_req_assignment.agent_id) + + return assignments + + def update_workload(self, agent: BaseAgent, workload_change: int) -> None: + """Update the current workload for an agent.""" + agent_id = self.hierarchy._get_agent_id(agent) + current = self.current_workloads.get(agent_id, 0) + self.current_workloads[agent_id] = max(0, current + workload_change) + + def get_workload_distribution(self) -> dict[str, int]: + """Get current workload distribution across all agents.""" + return self.current_workloads.copy() + + def _greedy_assignment( + self, + task: Task, + requirements: list[TaskRequirement], + exclude_agent_ids: set + ) -> ResponsibilityAssignment | None: + """Assign to the agent with highest capability match score.""" + best_match = self.hierarchy.get_best_agent_for_task(requirements, exclude_agent_ids) + + if not best_match: + return None + + agent, score, matches = best_match + agent_id = self.hierarchy._get_agent_id(agent) + + 
return ResponsibilityAssignment( + agent_id=agent_id, + task_id=str(task.id), + responsibility_score=score, + capability_matches=matches, + reasoning=f"Greedy assignment: highest capability match score ({score:.3f})" + ) + + def _balanced_assignment( + self, + task: Task, + requirements: list[TaskRequirement], + exclude_agent_ids: set + ) -> ResponsibilityAssignment | None: + """Assign considering both capability and current workload.""" + capable_agents = self.hierarchy.find_capable_agents(requirements, minimum_match_score=0.3) + + if not capable_agents: + return None + + best_agent = None + best_score = -1.0 + best_matches = [] + + for agent, capability_score in capable_agents: + agent_id = self.hierarchy._get_agent_id(agent) + + if agent_id in exclude_agent_ids: + continue + + current_workload = self.current_workloads.get(agent_id, 0) + workload_penalty = self._calculate_workload_penalty(current_workload) + + combined_score = capability_score * (1.0 - workload_penalty) + + if combined_score > best_score: + best_score = combined_score + best_agent = agent + _, best_matches = self.hierarchy._calculate_detailed_capability_match(agent_id, requirements) + + if best_agent: + agent_id = self.hierarchy._get_agent_id(best_agent) + return ResponsibilityAssignment( + agent_id=agent_id, + task_id=str(task.id), + responsibility_score=best_score, + capability_matches=best_matches, + reasoning=f"Balanced assignment: capability ({capability_score:.3f}) with workload consideration" + ) + + return None + + def _optimal_assignment( + self, + task: Task, + requirements: list[TaskRequirement], + exclude_agent_ids: set + ) -> ResponsibilityAssignment | None: + """Assign using optimization for overall system performance.""" + capable_agents = self.hierarchy.find_capable_agents(requirements, minimum_match_score=0.2) + + if not capable_agents: + return None + + best_agent = None + best_score = -1.0 + best_matches = [] + + for agent, capability_score in capable_agents: + agent_id = self.hierarchy._get_agent_id(agent) + + if agent_id in exclude_agent_ids: + continue + + optimization_score = self._calculate_optimization_score( + agent_id, capability_score, requirements + ) + + if optimization_score > best_score: + best_score = optimization_score + best_agent = agent + _, best_matches = self.hierarchy._calculate_detailed_capability_match(agent_id, requirements) + + if best_agent: + agent_id = self.hierarchy._get_agent_id(best_agent) + return ResponsibilityAssignment( + agent_id=agent_id, + task_id=str(task.id), + responsibility_score=best_score, + capability_matches=best_matches, + reasoning=f"Optimal assignment: multi-factor optimization score ({best_score:.3f})" + ) + + return None + + def _calculate_workload_penalty(self, current_workload: int) -> float: + """Calculate penalty based on current workload.""" + if current_workload == 0: + return 0.0 + + return min(0.8, 1.0 - math.exp(-current_workload / 3.0)) + + def _calculate_optimization_score( + self, + agent_id: str, + capability_score: float, + requirements: list[TaskRequirement] + ) -> float: + """Calculate multi-factor optimization score.""" + score = capability_score + + current_workload = self.current_workloads.get(agent_id, 0) + workload_factor = 1.0 - self._calculate_workload_penalty(current_workload) + + agent_capabilities = self.hierarchy.agent_capabilities.get(agent_id, []) + specialization_bonus = self._calculate_specialization_bonus(agent_capabilities, requirements) + + reliability_factor = 1.0 # Placeholder for future performance integration + + 
return ( + score * 0.5 + # 50% capability match + workload_factor * 0.2 + # 20% workload consideration + specialization_bonus * 0.2 + # 20% specialization bonus + reliability_factor * 0.1 # 10% reliability + ) + + def _calculate_specialization_bonus( + self, + agent_capabilities: list, + requirements: list[TaskRequirement] + ) -> float: + """Calculate bonus for agents with specialized capabilities.""" + if not agent_capabilities or not requirements: + return 0.0 + + high_proficiency_matches = 0 + total_matches = 0 + + for capability in agent_capabilities: + for requirement in requirements: + if self.hierarchy._capabilities_match(capability, requirement): + total_matches += 1 + if capability.proficiency_level >= 0.8: + high_proficiency_matches += 1 + + if total_matches == 0: + return 0.0 + + specialization_ratio = high_proficiency_matches / total_matches + return min(0.3, specialization_ratio * 0.3) # Max 30% bonus diff --git a/src/crewai/responsibility/hierarchy.py b/src/crewai/responsibility/hierarchy.py new file mode 100644 index 000000000..790018437 --- /dev/null +++ b/src/crewai/responsibility/hierarchy.py @@ -0,0 +1,257 @@ +""" +Capability-based agent hierarchy management. +""" + +from collections import defaultdict, deque + +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.responsibility.models import ( + AgentCapability, + CapabilityType, + TaskRequirement, +) + + +class CapabilityHierarchy: + """Manages capability-based agent hierarchy and relationships.""" + + def __init__(self): + self.agents: dict[str, BaseAgent] = {} + self.agent_capabilities: dict[str, list[AgentCapability]] = defaultdict(list) + self.capability_index: dict[str, set[str]] = defaultdict(set) # capability_name -> agent_ids + self.hierarchy_relationships: dict[str, set[str]] = defaultdict(set) # supervisor -> subordinates + + def add_agent(self, agent: BaseAgent, capabilities: list[AgentCapability]) -> None: + """Add an agent with their capabilities to the hierarchy.""" + agent_id = self._get_agent_id(agent) + self.agents[agent_id] = agent + self.agent_capabilities[agent_id] = capabilities + + for capability in capabilities: + self.capability_index[capability.name].add(agent_id) + + def remove_agent(self, agent: BaseAgent) -> None: + """Remove an agent from the hierarchy.""" + agent_id = self._get_agent_id(agent) + + if agent_id in self.agents: + for capability in self.agent_capabilities[agent_id]: + self.capability_index[capability.name].discard(agent_id) + + for supervisor_id in self.hierarchy_relationships: + self.hierarchy_relationships[supervisor_id].discard(agent_id) + if agent_id in self.hierarchy_relationships: + del self.hierarchy_relationships[agent_id] + + del self.agents[agent_id] + del self.agent_capabilities[agent_id] + + def set_supervision_relationship(self, supervisor: BaseAgent, subordinate: BaseAgent) -> None: + """Establish a supervision relationship between agents.""" + supervisor_id = self._get_agent_id(supervisor) + subordinate_id = self._get_agent_id(subordinate) + + if supervisor_id in self.agents and subordinate_id in self.agents: + self.hierarchy_relationships[supervisor_id].add(subordinate_id) + + def get_agent_capabilities(self, agent: BaseAgent) -> list[AgentCapability]: + """Get capabilities for a specific agent.""" + agent_id = self._get_agent_id(agent) + return self.agent_capabilities.get(agent_id, []) + + def update_agent_capability( + self, + agent: BaseAgent, + capability_name: str, + new_proficiency: float, + new_confidence: float + ) -> bool: + 
"""Update a specific capability for an agent.""" + agent_id = self._get_agent_id(agent) + + if agent_id not in self.agent_capabilities: + return False + + for capability in self.agent_capabilities[agent_id]: + if capability.name == capability_name: + capability.update_proficiency(new_proficiency, new_confidence) + return True + + return False + + def find_capable_agents( + self, + requirements: list[TaskRequirement], + minimum_match_score: float = 0.5 + ) -> list[tuple[BaseAgent, float]]: + """Find agents capable of handling the given requirements.""" + agent_scores = [] + + for agent_id, agent in self.agents.items(): + score = self._calculate_capability_match_score(agent_id, requirements) + if score >= minimum_match_score: + agent_scores.append((agent, score)) + + agent_scores.sort(key=lambda x: x[1], reverse=True) + return agent_scores + + def get_best_agent_for_task( + self, + requirements: list[TaskRequirement], + exclude_agents: set[str] | None = None + ) -> tuple[BaseAgent, float, list[str]] | None: + """Get the best agent for a task based on capability requirements.""" + exclude_agents = exclude_agents or set() + best_agent = None + best_score = 0.0 + best_matches = [] + + for agent_id, agent in self.agents.items(): + if agent_id in exclude_agents: + continue + + score, matches = self._calculate_detailed_capability_match(agent_id, requirements) + if score > best_score: + best_score = score + best_agent = agent + best_matches = matches + + if best_agent: + return best_agent, best_score, best_matches + return None + + def get_subordinates(self, supervisor: BaseAgent) -> list[BaseAgent]: + """Get all subordinates of a supervisor agent.""" + supervisor_id = self._get_agent_id(supervisor) + subordinate_ids = self.hierarchy_relationships.get(supervisor_id, set()) + return [self.agents[sub_id] for sub_id in subordinate_ids if sub_id in self.agents] + + def get_hierarchy_path(self, from_agent: BaseAgent, to_agent: BaseAgent) -> list[BaseAgent] | None: + """Find the shortest path in the hierarchy between two agents.""" + from_id = self._get_agent_id(from_agent) + to_id = self._get_agent_id(to_agent) + + if from_id not in self.agents or to_id not in self.agents: + return None + + queue = deque([(from_id, [from_id])]) + visited = {from_id} + + while queue: + current_id, path = queue.popleft() + + if current_id == to_id: + return [self.agents[agent_id] for agent_id in path] + + for subordinate_id in self.hierarchy_relationships.get(current_id, set()): + if subordinate_id not in visited: + visited.add(subordinate_id) + queue.append((subordinate_id, [*path, subordinate_id])) + + return None + + def get_capability_distribution(self) -> dict[CapabilityType, dict[str, int]]: + """Get distribution of capabilities across all agents.""" + distribution = defaultdict(lambda: defaultdict(int)) + + for capabilities in self.agent_capabilities.values(): + for capability in capabilities: + proficiency_level = "high" if capability.proficiency_level >= 0.8 else \ + "medium" if capability.proficiency_level >= 0.5 else "low" + distribution[capability.capability_type][proficiency_level] += 1 + + return dict(distribution) + + def _get_agent_id(self, agent: BaseAgent) -> str: + """Get a unique identifier for an agent.""" + return f"{agent.role}_{id(agent)}" + + def _calculate_capability_match_score( + self, + agent_id: str, + requirements: list[TaskRequirement] + ) -> float: + """Calculate how well an agent's capabilities match task requirements.""" + if not requirements: + return 1.0 + + agent_capabilities = 
self.agent_capabilities.get(agent_id, []) + if not agent_capabilities: + return 0.0 + + total_weight = sum(req.weight for req in requirements) + if total_weight == 0: + return 0.0 + + weighted_score = 0.0 + + for requirement in requirements: + best_match_score = 0.0 + + for capability in agent_capabilities: + if self._capabilities_match(capability, requirement): + proficiency_score = min(capability.proficiency_level / requirement.minimum_proficiency, 1.0) + confidence_factor = capability.confidence_score + match_score = proficiency_score * confidence_factor + best_match_score = max(best_match_score, match_score) + + weighted_score += best_match_score * requirement.weight + + return weighted_score / total_weight + + def _calculate_detailed_capability_match( + self, + agent_id: str, + requirements: list[TaskRequirement] + ) -> tuple[float, list[str]]: + """Calculate detailed capability match with matched capability names.""" + if not requirements: + return 1.0, [] + + agent_capabilities = self.agent_capabilities.get(agent_id, []) + if not agent_capabilities: + return 0.0, [] + + total_weight = sum(req.weight for req in requirements) + if total_weight == 0: + return 0.0, [] + + weighted_score = 0.0 + matched_capabilities = [] + + for requirement in requirements: + best_match_score = 0.0 + best_match_capability = None + + for capability in agent_capabilities: + if self._capabilities_match(capability, requirement): + proficiency_score = min(capability.proficiency_level / requirement.minimum_proficiency, 1.0) + confidence_factor = capability.confidence_score + match_score = proficiency_score * confidence_factor + + if match_score > best_match_score: + best_match_score = match_score + best_match_capability = capability.name + + if best_match_capability: + matched_capabilities.append(best_match_capability) + + weighted_score += best_match_score * requirement.weight + + return weighted_score / total_weight, matched_capabilities + + def _capabilities_match(self, capability: AgentCapability, requirement: TaskRequirement) -> bool: + """Check if a capability matches a requirement.""" + if capability.name.lower() == requirement.capability_name.lower(): + return True + + if capability.capability_type == requirement.capability_type: + return True + + capability_keywords = set(kw.lower() for kw in capability.keywords) + requirement_keywords = set(kw.lower() for kw in requirement.keywords) + + if capability_keywords.intersection(requirement_keywords): + return True + + return False diff --git a/src/crewai/responsibility/models.py b/src/crewai/responsibility/models.py new file mode 100644 index 000000000..f87b18445 --- /dev/null +++ b/src/crewai/responsibility/models.py @@ -0,0 +1,188 @@ +""" +Data models for the formal responsibility tracking system. 
+""" + +from datetime import datetime +from enum import Enum +from typing import Any +from uuid import UUID, uuid4 + +from pydantic import BaseModel, Field + + +class CapabilityType(str, Enum): + """Types of capabilities an agent can have.""" + TECHNICAL = "technical" + ANALYTICAL = "analytical" + CREATIVE = "creative" + COMMUNICATION = "communication" + LEADERSHIP = "leadership" + DOMAIN_SPECIFIC = "domain_specific" + + +class AgentCapability(BaseModel): + """Represents a specific capability of an agent.""" + + id: UUID = Field(default_factory=uuid4) + name: str = Field(..., description="Name of the capability") + capability_type: CapabilityType = Field(..., description="Type of capability") + proficiency_level: float = Field( + ..., + ge=0.0, + le=1.0, + description="Proficiency level from 0.0 to 1.0" + ) + confidence_score: float = Field( + ..., + ge=0.0, + le=1.0, + description="Confidence in this capability assessment" + ) + description: str | None = Field(None, description="Detailed description of the capability") + keywords: list[str] = Field(default_factory=list, description="Keywords associated with this capability") + last_updated: datetime = Field(default_factory=datetime.utcnow) + + def update_proficiency(self, new_level: float, confidence: float) -> None: + """Update proficiency level and confidence.""" + self.proficiency_level = max(0.0, min(1.0, new_level)) + self.confidence_score = max(0.0, min(1.0, confidence)) + self.last_updated = datetime.utcnow() + + +class ResponsibilityAssignment(BaseModel): + """Represents the assignment of responsibility for a task to an agent.""" + + id: UUID = Field(default_factory=uuid4) + agent_id: str = Field(..., description="ID of the assigned agent") + task_id: str = Field(..., description="ID of the task") + responsibility_score: float = Field( + ..., + ge=0.0, + le=1.0, + description="Calculated responsibility score" + ) + capability_matches: list[str] = Field( + default_factory=list, + description="Capabilities that matched for this assignment" + ) + reasoning: str = Field(..., description="Explanation for this assignment") + assigned_at: datetime = Field(default_factory=datetime.utcnow) + completed_at: datetime | None = Field(None) + success: bool | None = Field(None, description="Whether the assignment was successful") + + def mark_completed(self, success: bool) -> None: + """Mark the assignment as completed.""" + self.completed_at = datetime.utcnow() + self.success = success + + +class AccountabilityRecord(BaseModel): + """Records agent actions and decisions for accountability tracking.""" + + id: UUID = Field(default_factory=uuid4) + agent_id: str = Field(..., description="ID of the agent") + action_type: str = Field(..., description="Type of action taken") + action_description: str = Field(..., description="Description of the action") + task_id: str | None = Field(None, description="Related task ID if applicable") + context: dict[str, Any] = Field(default_factory=dict, description="Additional context") + outcome: str | None = Field(None, description="Outcome of the action") + success: bool | None = Field(None, description="Whether the action was successful") + timestamp: datetime = Field(default_factory=datetime.utcnow) + + def set_outcome(self, outcome: str, success: bool) -> None: + """Set the outcome of the action.""" + self.outcome = outcome + self.success = success + + +class PerformanceMetrics(BaseModel): + """Performance metrics for an agent.""" + + agent_id: str = Field(..., description="ID of the agent") + total_tasks: int 
= Field(default=0, description="Total number of tasks assigned") + successful_tasks: int = Field(default=0, description="Number of successful tasks") + failed_tasks: int = Field(default=0, description="Number of failed tasks") + average_completion_time: float = Field(default=0.0, description="Average task completion time in seconds") + quality_score: float = Field( + default=0.5, + ge=0.0, + le=1.0, + description="Overall quality score" + ) + efficiency_score: float = Field( + default=0.5, + ge=0.0, + le=1.0, + description="Efficiency score based on completion times" + ) + reliability_score: float = Field( + default=0.5, + ge=0.0, + le=1.0, + description="Reliability score based on success rate" + ) + last_updated: datetime = Field(default_factory=datetime.utcnow) + + @property + def success_rate(self) -> float: + """Calculate success rate.""" + if self.total_tasks == 0: + return 0.0 + return self.successful_tasks / self.total_tasks + + def update_metrics( + self, + task_success: bool, + completion_time: float, + quality_score: float | None = None + ) -> None: + """Update performance metrics with new task result.""" + self.total_tasks += 1 + if task_success: + self.successful_tasks += 1 + else: + self.failed_tasks += 1 + + alpha = 0.1 # Learning rate + + if self.total_tasks == 1: + self.average_completion_time = completion_time + else: + self.average_completion_time = ( + alpha * completion_time + (1 - alpha) * self.average_completion_time + ) + + self.reliability_score = self.success_rate + + if completion_time > 0: + normalized_time = min(completion_time / 3600, 1.0) # Normalize to hours, cap at 1 + self.efficiency_score = max(0.1, 1.0 - normalized_time) + + if quality_score is not None: + self.quality_score = ( + alpha * quality_score + (1 - alpha) * self.quality_score + ) + + self.last_updated = datetime.utcnow() + + +class TaskRequirement(BaseModel): + """Represents capability requirements for a task.""" + + capability_name: str = Field(..., description="Name of required capability") + capability_type: CapabilityType = Field(..., description="Type of required capability") + minimum_proficiency: float = Field( + ..., + ge=0.0, + le=1.0, + description="Minimum required proficiency level" + ) + weight: float = Field( + default=1.0, + ge=0.0, + description="Weight/importance of this requirement" + ) + keywords: list[str] = Field( + default_factory=list, + description="Keywords that help match capabilities" + ) diff --git a/src/crewai/responsibility/performance.py b/src/crewai/responsibility/performance.py new file mode 100644 index 000000000..1da7ce7da --- /dev/null +++ b/src/crewai/responsibility/performance.py @@ -0,0 +1,232 @@ +""" +Performance-based capability adjustment system. 
+""" + +from datetime import timedelta + +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.responsibility.hierarchy import CapabilityHierarchy +from crewai.responsibility.models import AgentCapability, PerformanceMetrics + + +class PerformanceTracker: + """Tracks agent performance and adjusts capabilities accordingly.""" + + def __init__(self, hierarchy: CapabilityHierarchy): + self.hierarchy = hierarchy + self.performance_metrics: dict[str, PerformanceMetrics] = {} + self.learning_rate = 0.1 + self.adjustment_threshold = 0.05 # Minimum change to trigger capability update + + def record_task_completion( + self, + agent: BaseAgent, + task_success: bool, + completion_time: float, + quality_score: float | None = None, + capability_used: str | None = None + ) -> None: + """Record a task completion and update performance metrics.""" + agent_id = self._get_agent_id(agent) + + if agent_id not in self.performance_metrics: + self.performance_metrics[agent_id] = PerformanceMetrics(agent_id=agent_id) + + metrics = self.performance_metrics[agent_id] + metrics.update_metrics(task_success, completion_time, quality_score) + + if capability_used and task_success is not None: + self._update_capability_based_on_performance( + agent, capability_used, task_success, quality_score + ) + + def get_performance_metrics(self, agent: BaseAgent) -> PerformanceMetrics | None: + """Get performance metrics for an agent.""" + agent_id = self._get_agent_id(agent) + return self.performance_metrics.get(agent_id) + + def adjust_capabilities_based_on_performance( + self, + agent: BaseAgent, + performance_window: timedelta = timedelta(days=7) + ) -> list[tuple[str, float, float]]: + """Adjust agent capabilities based on recent performance.""" + agent_id = self._get_agent_id(agent) + metrics = self.performance_metrics.get(agent_id) + + if not metrics: + return [] + + adjustments = [] + agent_capabilities = self.hierarchy.get_agent_capabilities(agent) + + for capability in agent_capabilities: + old_proficiency = capability.proficiency_level + old_confidence = capability.confidence_score + + new_proficiency, new_confidence = self._calculate_adjusted_capability( + capability, metrics + ) + + proficiency_change = abs(new_proficiency - old_proficiency) + confidence_change = abs(new_confidence - old_confidence) + + if proficiency_change >= self.adjustment_threshold or confidence_change >= self.adjustment_threshold: + self.hierarchy.update_agent_capability( + agent, capability.name, new_proficiency, new_confidence + ) + adjustments.append((capability.name, new_proficiency - old_proficiency, new_confidence - old_confidence)) + + return adjustments + + def get_performance_trends( + self, + agent: BaseAgent, + capability_name: str | None = None + ) -> dict[str, list[float]]: + """Get performance trends for an agent.""" + agent_id = self._get_agent_id(agent) + metrics = self.performance_metrics.get(agent_id) + + if not metrics: + return {} + + return { + "success_rate": [metrics.success_rate], + "quality_score": [metrics.quality_score], + "efficiency_score": [metrics.efficiency_score], + "reliability_score": [metrics.reliability_score] + } + + def identify_improvement_opportunities( + self, + agent: BaseAgent + ) -> list[dict[str, any]]: + """Identify areas where an agent could improve.""" + agent_id = self._get_agent_id(agent) + metrics = self.performance_metrics.get(agent_id) + + if not metrics: + return [] + + opportunities = [] + + if metrics.success_rate < 0.7: + opportunities.append({ + "area": 
"success_rate", + "current_value": metrics.success_rate, + "recommendation": "Focus on task completion accuracy and problem-solving skills" + }) + + if metrics.quality_score < 0.6: + opportunities.append({ + "area": "quality", + "current_value": metrics.quality_score, + "recommendation": "Improve attention to detail and output quality" + }) + + if metrics.efficiency_score < 0.5: + opportunities.append({ + "area": "efficiency", + "current_value": metrics.efficiency_score, + "recommendation": "Work on time management and process optimization" + }) + + return opportunities + + def compare_agent_performance( + self, + agents: list[BaseAgent], + metric: str = "overall" + ) -> list[tuple[BaseAgent, float]]: + """Compare performance across multiple agents.""" + agent_scores = [] + + for agent in agents: + agent_id = self._get_agent_id(agent) + metrics = self.performance_metrics.get(agent_id) + + if not metrics: + continue + + if metric == "overall": + score = ( + metrics.success_rate * 0.4 + + metrics.quality_score * 0.3 + + metrics.efficiency_score * 0.2 + + metrics.reliability_score * 0.1 + ) + elif metric == "success_rate": + score = metrics.success_rate + elif metric == "quality": + score = metrics.quality_score + elif metric == "efficiency": + score = metrics.efficiency_score + elif metric == "reliability": + score = metrics.reliability_score + else: + continue + + agent_scores.append((agent, score)) + + agent_scores.sort(key=lambda x: x[1], reverse=True) + return agent_scores + + def _update_capability_based_on_performance( + self, + agent: BaseAgent, + capability_name: str, + task_success: bool, + quality_score: float | None + ) -> None: + """Update a specific capability based on task performance.""" + agent_capabilities = self.hierarchy.get_agent_capabilities(agent) + + for capability in agent_capabilities: + if capability.name == capability_name: + if task_success: + proficiency_adjustment = self.learning_rate * 0.1 # Small positive adjustment + confidence_adjustment = self.learning_rate * 0.05 + else: + proficiency_adjustment = -self.learning_rate * 0.05 # Small negative adjustment + confidence_adjustment = -self.learning_rate * 0.1 + + if quality_score is not None: + quality_factor = (quality_score - 0.5) * 2 # Scale to -1 to 1 + proficiency_adjustment *= (1 + quality_factor * 0.5) + + new_proficiency = max(0.0, min(1.0, capability.proficiency_level + proficiency_adjustment)) + new_confidence = max(0.0, min(1.0, capability.confidence_score + confidence_adjustment)) + + self.hierarchy.update_agent_capability( + agent, capability_name, new_proficiency, new_confidence + ) + break + + def _calculate_adjusted_capability( + self, + capability: AgentCapability, + metrics: PerformanceMetrics + ) -> tuple[float, float]: + """Calculate adjusted capability values based on performance metrics.""" + performance_factor = ( + metrics.success_rate * 0.4 + + metrics.quality_score * 0.3 + + metrics.efficiency_score * 0.2 + + metrics.reliability_score * 0.1 + ) + + adjustment_magnitude = (performance_factor - 0.5) * self.learning_rate + + new_proficiency = capability.proficiency_level + adjustment_magnitude + new_proficiency = max(0.0, min(1.0, new_proficiency)) + + confidence_adjustment = (metrics.reliability_score - 0.5) * self.learning_rate * 0.5 + new_confidence = capability.confidence_score + confidence_adjustment + new_confidence = max(0.0, min(1.0, new_confidence)) + + return new_proficiency, new_confidence + + def _get_agent_id(self, agent: BaseAgent) -> str: + """Get a unique identifier for an 
agent.""" + return f"{agent.role}_{id(agent)}" diff --git a/src/crewai/responsibility/system.py b/src/crewai/responsibility/system.py new file mode 100644 index 000000000..09d00a4dd --- /dev/null +++ b/src/crewai/responsibility/system.py @@ -0,0 +1,259 @@ +""" +Main responsibility system that coordinates all components. +""" + +from datetime import datetime, timedelta +from typing import Any + +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.responsibility.accountability import AccountabilityLogger +from crewai.responsibility.assignment import ( + AssignmentStrategy, + ResponsibilityCalculator, +) +from crewai.responsibility.hierarchy import CapabilityHierarchy +from crewai.responsibility.models import ( + AgentCapability, + ResponsibilityAssignment, + TaskRequirement, +) +from crewai.responsibility.performance import PerformanceTracker +from crewai.task import Task + + +class ResponsibilitySystem: + """Main system that coordinates all responsibility tracking components.""" + + def __init__(self): + self.hierarchy = CapabilityHierarchy() + self.calculator = ResponsibilityCalculator(self.hierarchy) + self.accountability = AccountabilityLogger() + self.performance = PerformanceTracker(self.hierarchy) + self.enabled = True + + def register_agent( + self, + agent: BaseAgent, + capabilities: list[AgentCapability], + supervisor: BaseAgent | None = None + ) -> None: + """Register an agent with the responsibility system.""" + if not self.enabled: + return + + self.hierarchy.add_agent(agent, capabilities) + + if supervisor: + self.hierarchy.set_supervision_relationship(supervisor, agent) + + self.accountability.log_action( + agent=agent, + action_type="registration", + action_description=f"Agent registered with {len(capabilities)} capabilities", + context={"capabilities": [cap.name for cap in capabilities]} + ) + + def assign_task_responsibility( + self, + task: Task, + requirements: list[TaskRequirement], + strategy: AssignmentStrategy = AssignmentStrategy.GREEDY, + exclude_agents: list[BaseAgent] | None = None + ) -> ResponsibilityAssignment | None: + """Assign responsibility for a task to the best agent.""" + if not self.enabled: + return None + + assignment = self.calculator.calculate_responsibility_assignment( + task, requirements, strategy, exclude_agents + ) + + if assignment: + agent = self._get_agent_by_id(assignment.agent_id) + if agent: + self.calculator.update_workload(agent, 1) + + self.accountability.log_action( + agent=agent, + action_type="task_assignment", + action_description=f"Assigned responsibility for task: {task.description[:100]}...", + task=task, + context={ + "responsibility_score": assignment.responsibility_score, + "capability_matches": assignment.capability_matches, + "strategy": strategy.value + } + ) + + return assignment + + def complete_task( + self, + agent: BaseAgent, + task: Task, + success: bool, + completion_time: float, + quality_score: float | None = None, + outcome_description: str = "" + ) -> None: + """Record task completion and update performance metrics.""" + if not self.enabled: + return + + self.performance.record_task_completion( + agent, success, completion_time, quality_score + ) + + self.calculator.update_workload(agent, -1) + + self.accountability.log_task_completion( + agent, task, success, outcome_description, completion_time + ) + + adjustments = self.performance.adjust_capabilities_based_on_performance(agent) + if adjustments: + self.accountability.log_action( + agent=agent, + action_type="capability_adjustment", + 
action_description="Capabilities adjusted based on performance", + context={"adjustments": adjustments} + ) + + def delegate_task( + self, + delegating_agent: BaseAgent, + receiving_agent: BaseAgent, + task: Task, + reason: str + ) -> None: + """Record task delegation between agents.""" + if not self.enabled: + return + + self.calculator.update_workload(delegating_agent, -1) + self.calculator.update_workload(receiving_agent, 1) + + self.accountability.log_delegation( + delegating_agent, receiving_agent, task, reason + ) + + def get_agent_status(self, agent: BaseAgent) -> dict[str, Any]: + """Get comprehensive status for an agent.""" + if not self.enabled: + return {} + + agent_id = self.hierarchy._get_agent_id(agent) + capabilities = self.hierarchy.get_agent_capabilities(agent) + performance = self.performance.get_performance_metrics(agent) + recent_records = self.accountability.get_agent_records( + agent, since=datetime.utcnow() - timedelta(days=7) + ) + current_workload = self.calculator.current_workloads.get(agent_id, 0) + + return { + "agent_id": agent_id, + "role": agent.role, + "capabilities": [ + { + "name": cap.name, + "type": cap.capability_type.value, + "proficiency": cap.proficiency_level, + "confidence": cap.confidence_score + } + for cap in capabilities + ], + "performance": { + "success_rate": performance.success_rate if performance else 0.0, + "quality_score": performance.quality_score if performance else 0.0, + "efficiency_score": performance.efficiency_score if performance else 0.0, + "total_tasks": performance.total_tasks if performance else 0 + } if performance else None, + "current_workload": current_workload, + "recent_activity_count": len(recent_records) + } + + def get_system_overview(self) -> dict[str, Any]: + """Get overview of the entire responsibility system.""" + if not self.enabled: + return {"enabled": False} + + total_agents = len(self.hierarchy.agents) + capability_distribution = self.hierarchy.get_capability_distribution() + workload_distribution = self.calculator.get_workload_distribution() + + all_performance = list(self.performance.performance_metrics.values()) + avg_success_rate = sum(p.success_rate for p in all_performance) / len(all_performance) if all_performance else 0.0 + avg_quality = sum(p.quality_score for p in all_performance) / len(all_performance) if all_performance else 0.0 + + return { + "enabled": True, + "total_agents": total_agents, + "capability_distribution": capability_distribution, + "workload_distribution": workload_distribution, + "system_performance": { + "average_success_rate": avg_success_rate, + "average_quality_score": avg_quality, + "total_tasks_completed": sum(p.total_tasks for p in all_performance) + }, + "total_accountability_records": len(self.accountability.records) + } + + def generate_recommendations(self) -> list[dict[str, Any]]: + """Generate system-wide recommendations for improvement.""" + if not self.enabled: + return [] + + recommendations = [] + + workloads = self.calculator.get_workload_distribution() + if workloads: + max_workload = max(workloads.values()) + min_workload = min(workloads.values()) + + if max_workload - min_workload > 3: # Significant imbalance + recommendations.append({ + "type": "workload_balancing", + "priority": "high", + "description": "Workload imbalance detected. 
Consider redistributing tasks.", + "details": {"max_workload": max_workload, "min_workload": min_workload} + }) + + capability_dist = self.hierarchy.get_capability_distribution() + for cap_type, levels in capability_dist.items(): + total_agents_with_cap = sum(levels.values()) + if total_agents_with_cap < 2: # Too few agents with this capability + recommendations.append({ + "type": "capability_gap", + "priority": "medium", + "description": f"Limited coverage for {cap_type.value} capabilities", + "details": {"capability_type": cap_type.value, "agent_count": total_agents_with_cap} + }) + + for agent_id, metrics in self.performance.performance_metrics.items(): + if metrics.success_rate < 0.6: # Low success rate + agent = self._get_agent_by_id(agent_id) + if agent: + recommendations.append({ + "type": "performance_improvement", + "priority": "high", + "description": f"Agent {agent.role} has low success rate", + "details": { + "agent_role": agent.role, + "success_rate": metrics.success_rate, + "improvement_opportunities": self.performance.identify_improvement_opportunities(agent) + } + }) + + return recommendations + + def enable_system(self) -> None: + """Enable the responsibility system.""" + self.enabled = True + + def disable_system(self) -> None: + """Disable the responsibility system.""" + self.enabled = False + + def _get_agent_by_id(self, agent_id: str) -> BaseAgent | None: + """Get agent by ID.""" + return self.hierarchy.agents.get(agent_id) diff --git a/src/crewai/tools/agent_tools/delegate_work_tool.py b/src/crewai/tools/agent_tools/delegate_work_tool.py index 9dbf6c920..3c5fdcf87 100644 --- a/src/crewai/tools/agent_tools/delegate_work_tool.py +++ b/src/crewai/tools/agent_tools/delegate_work_tool.py @@ -27,4 +27,19 @@ class DelegateWorkTool(BaseAgentTool): **kwargs, ) -> str: coworker = self._get_coworker(coworker, **kwargs) + + if hasattr(self, 'agents') and self.agents: + delegating_agent = kwargs.get('delegating_agent') + if delegating_agent and hasattr(delegating_agent, 'responsibility_system'): + responsibility_system = delegating_agent.responsibility_system + if responsibility_system and responsibility_system.enabled: + task_obj = kwargs.get('task_obj') + if task_obj: + responsibility_system.delegate_task( + delegating_agent=delegating_agent, + receiving_agent=coworker, + task=task_obj, + reason=f"Delegation based on capability match for: {task[:100]}..." + ) + return self._execute(coworker, task, context) diff --git a/tests/responsibility/__init__.py b/tests/responsibility/__init__.py new file mode 100644 index 000000000..dc4f7c4a5 --- /dev/null +++ b/tests/responsibility/__init__.py @@ -0,0 +1,3 @@ +""" +Tests for the formal responsibility tracking system. +""" diff --git a/tests/responsibility/test_accountability.py b/tests/responsibility/test_accountability.py new file mode 100644 index 000000000..ae00db621 --- /dev/null +++ b/tests/responsibility/test_accountability.py @@ -0,0 +1,199 @@ +""" +Tests for accountability logging system. 
+""" + +import pytest +from datetime import datetime, timedelta +from unittest.mock import Mock + +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.task import Task +from crewai.responsibility.accountability import AccountabilityLogger + + +class TestAccountabilityLogger: + @pytest.fixture + def logger(self): + return AccountabilityLogger() + + @pytest.fixture + def mock_agent(self): + agent = Mock(spec=BaseAgent) + agent.role = "Test Agent" + return agent + + @pytest.fixture + def mock_task(self): + task = Mock(spec=Task) + task.id = "test_task_1" + task.description = "Test task description" + return task + + def test_log_action(self, logger, mock_agent, mock_task): + context = {"complexity": "high", "priority": "urgent"} + + record = logger.log_action( + agent=mock_agent, + action_type="task_execution", + action_description="Executed data processing task", + task=mock_task, + context=context + ) + + assert record.agent_id == "Test Agent_" + str(id(mock_agent)) + assert record.action_type == "task_execution" + assert record.action_description == "Executed data processing task" + assert record.task_id == "test_task_1" + assert record.context["complexity"] == "high" + assert len(logger.records) == 1 + + def test_log_decision(self, logger, mock_agent, mock_task): + alternatives = ["Option A", "Option B", "Option C"] + + record = logger.log_decision( + agent=mock_agent, + decision="Chose Option A", + reasoning="Best performance characteristics", + task=mock_task, + alternatives_considered=alternatives + ) + + assert record.action_type == "decision" + assert record.action_description == "Chose Option A" + assert record.context["reasoning"] == "Best performance characteristics" + assert record.context["alternatives_considered"] == alternatives + + def test_log_delegation(self, logger, mock_task): + delegating_agent = Mock(spec=BaseAgent) + delegating_agent.role = "Manager" + + receiving_agent = Mock(spec=BaseAgent) + receiving_agent.role = "Developer" + + record = logger.log_delegation( + delegating_agent=delegating_agent, + receiving_agent=receiving_agent, + task=mock_task, + delegation_reason="Specialized expertise required" + ) + + assert record.action_type == "delegation" + assert "Delegated task to Developer" in record.action_description + assert record.context["receiving_agent_role"] == "Developer" + assert record.context["delegation_reason"] == "Specialized expertise required" + + def test_log_task_completion(self, logger, mock_agent, mock_task): + record = logger.log_task_completion( + agent=mock_agent, + task=mock_task, + success=True, + outcome_description="Task completed successfully with high quality", + completion_time=1800.0 + ) + + assert record.action_type == "task_completion" + assert record.success is True + assert record.outcome == "Task completed successfully with high quality" + assert record.context["completion_time"] == 1800.0 + + def test_get_agent_records(self, logger, mock_agent, mock_task): + logger.log_action(mock_agent, "action1", "Description 1", mock_task) + logger.log_action(mock_agent, "action2", "Description 2", mock_task) + logger.log_decision(mock_agent, "decision1", "Reasoning", mock_task) + + all_records = logger.get_agent_records(mock_agent) + assert len(all_records) == 3 + + decision_records = logger.get_agent_records(mock_agent, action_type="decision") + assert len(decision_records) == 1 + assert decision_records[0].action_type == "decision" + + recent_time = datetime.utcnow() - timedelta(minutes=1) + recent_records = 
logger.get_agent_records(mock_agent, since=recent_time) + assert len(recent_records) == 3 # All should be recent + + def test_get_task_records(self, logger, mock_agent, mock_task): + other_task = Mock(spec=Task) + other_task.id = "other_task" + + logger.log_action(mock_agent, "action1", "Description 1", mock_task) + logger.log_action(mock_agent, "action2", "Description 2", other_task) + logger.log_action(mock_agent, "action3", "Description 3", mock_task) + + task_records = logger.get_task_records(mock_task) + assert len(task_records) == 2 + + for record in task_records: + assert record.task_id == "test_task_1" + + def test_get_delegation_chain(self, logger, mock_task): + manager = Mock(spec=BaseAgent) + manager.role = "Manager" + + supervisor = Mock(spec=BaseAgent) + supervisor.role = "Supervisor" + + developer = Mock(spec=BaseAgent) + developer.role = "Developer" + + logger.log_delegation(manager, supervisor, mock_task, "Initial delegation") + logger.log_delegation(supervisor, developer, mock_task, "Further delegation") + + chain = logger.get_delegation_chain(mock_task) + assert len(chain) == 2 + + assert chain[0].context["receiving_agent_role"] == "Supervisor" + assert chain[1].context["receiving_agent_role"] == "Developer" + + def test_generate_accountability_report(self, logger, mock_agent, mock_task): + record1 = logger.log_action(mock_agent, "task_execution", "Task 1", mock_task) + record1.set_outcome("Success", True) + + record2 = logger.log_action(mock_agent, "task_execution", "Task 2", mock_task) + record2.set_outcome("Failed", False) + + record3 = logger.log_decision(mock_agent, "Decision 1", "Reasoning", mock_task) + record3.set_outcome("Good decision", True) + + report = logger.generate_accountability_report(agent=mock_agent) + + assert report["total_records"] == 3 + assert report["action_counts"]["task_execution"] == 2 + assert report["action_counts"]["decision"] == 1 + assert report["success_counts"]["task_execution"] == 1 + assert report["failure_counts"]["task_execution"] == 1 + assert report["success_rates"]["task_execution"] == 0.5 + assert report["success_rates"]["decision"] == 1.0 + + assert len(report["recent_actions"]) == 3 + + def test_generate_system_wide_report(self, logger, mock_task): + agent1 = Mock(spec=BaseAgent) + agent1.role = "Agent 1" + + agent2 = Mock(spec=BaseAgent) + agent2.role = "Agent 2" + + logger.log_action(agent1, "task_execution", "Task 1", mock_task) + logger.log_action(agent2, "task_execution", "Task 2", mock_task) + + report = logger.generate_accountability_report() + + assert report["agent_id"] == "all_agents" + assert report["total_records"] == 2 + assert report["action_counts"]["task_execution"] == 2 + + def test_time_filtered_report(self, logger, mock_agent, mock_task): + logger.log_action(mock_agent, "old_action", "Old action", mock_task) + + report = logger.generate_accountability_report( + agent=mock_agent, + time_period=timedelta(hours=1) + ) + + assert report["total_records"] == 1 + + report = logger.generate_accountability_report( + agent=mock_agent, + time_period=timedelta(seconds=1) + ) diff --git a/tests/responsibility/test_assignment.py b/tests/responsibility/test_assignment.py new file mode 100644 index 000000000..6087f6b84 --- /dev/null +++ b/tests/responsibility/test_assignment.py @@ -0,0 +1,221 @@ +""" +Tests for mathematical responsibility assignment. 
+""" + +import pytest +from unittest.mock import Mock + +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.task import Task +from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement +from crewai.responsibility.hierarchy import CapabilityHierarchy +from crewai.responsibility.assignment import ResponsibilityCalculator, AssignmentStrategy + + +class TestResponsibilityCalculator: + @pytest.fixture + def hierarchy(self): + return CapabilityHierarchy() + + @pytest.fixture + def calculator(self, hierarchy): + return ResponsibilityCalculator(hierarchy) + + @pytest.fixture + def mock_task(self): + task = Mock(spec=Task) + task.id = "test_task_1" + task.description = "Test task description" + return task + + @pytest.fixture + def python_agent(self, hierarchy): + agent = Mock(spec=BaseAgent) + agent.role = "Python Developer" + + capability = AgentCapability( + name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + proficiency_level=0.9, + confidence_score=0.8, + keywords=["python", "programming"] + ) + + hierarchy.add_agent(agent, [capability]) + return agent + + @pytest.fixture + def analysis_agent(self, hierarchy): + agent = Mock(spec=BaseAgent) + agent.role = "Data Analyst" + + capability = AgentCapability( + name="Data Analysis", + capability_type=CapabilityType.ANALYTICAL, + proficiency_level=0.8, + confidence_score=0.9, + keywords=["data", "analysis"] + ) + + hierarchy.add_agent(agent, [capability]) + return agent + + def test_greedy_assignment(self, calculator, mock_task, python_agent): + requirements = [ + TaskRequirement( + capability_name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.5, + weight=1.0 + ) + ] + + assignment = calculator.calculate_responsibility_assignment( + mock_task, requirements, AssignmentStrategy.GREEDY + ) + + assert assignment is not None + assert assignment.task_id == "test_task_1" + assert assignment.responsibility_score > 0.5 + assert "Python Programming" in assignment.capability_matches + assert "Greedy assignment" in assignment.reasoning + + def test_balanced_assignment(self, calculator, mock_task, python_agent, analysis_agent): + calculator.update_workload(python_agent, 5) # High workload + calculator.update_workload(analysis_agent, 1) # Low workload + + requirements = [ + TaskRequirement( + capability_name="General Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.3, + weight=1.0 + ) + ] + + assignment = calculator.calculate_responsibility_assignment( + mock_task, requirements, AssignmentStrategy.BALANCED + ) + + assert assignment is not None + assert "Balanced assignment" in assignment.reasoning + + def test_optimal_assignment(self, calculator, mock_task, python_agent): + requirements = [ + TaskRequirement( + capability_name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.5, + weight=1.0 + ) + ] + + assignment = calculator.calculate_responsibility_assignment( + mock_task, requirements, AssignmentStrategy.OPTIMAL + ) + + assert assignment is not None + assert "Optimal assignment" in assignment.reasoning + + def test_multi_agent_assignment(self, calculator, mock_task, python_agent, analysis_agent): + requirements = [ + TaskRequirement( + capability_name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.5, + weight=1.0 + ), + TaskRequirement( + capability_name="Data Analysis", + capability_type=CapabilityType.ANALYTICAL, + 
minimum_proficiency=0.5, + weight=0.8 + ) + ] + + assignments = calculator.calculate_multi_agent_assignment( + mock_task, requirements, max_agents=2 + ) + + assert len(assignments) <= 2 + assert len(assignments) > 0 + + agent_ids = [assignment.agent_id for assignment in assignments] + assert len(agent_ids) == len(set(agent_ids)) + + def test_workload_update(self, calculator, python_agent): + initial_workload = calculator.current_workloads.get( + calculator.hierarchy._get_agent_id(python_agent), 0 + ) + + calculator.update_workload(python_agent, 3) + + new_workload = calculator.current_workloads.get( + calculator.hierarchy._get_agent_id(python_agent), 0 + ) + + assert new_workload == initial_workload + 3 + + calculator.update_workload(python_agent, -2) + + final_workload = calculator.current_workloads.get( + calculator.hierarchy._get_agent_id(python_agent), 0 + ) + + assert final_workload == new_workload - 2 + + def test_workload_distribution(self, calculator, python_agent, analysis_agent): + calculator.update_workload(python_agent, 3) + calculator.update_workload(analysis_agent, 1) + + distribution = calculator.get_workload_distribution() + + python_id = calculator.hierarchy._get_agent_id(python_agent) + analysis_id = calculator.hierarchy._get_agent_id(analysis_agent) + + assert distribution[python_id] == 3 + assert distribution[analysis_id] == 1 + + def test_exclude_agents(self, calculator, mock_task, python_agent, analysis_agent): + requirements = [ + TaskRequirement( + capability_name="Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.3, + weight=1.0 + ) + ] + + assignment = calculator.calculate_responsibility_assignment( + mock_task, requirements, AssignmentStrategy.GREEDY, + exclude_agents=[python_agent] + ) + + if assignment: # If any agent was assigned + python_id = calculator.hierarchy._get_agent_id(python_agent) + assert assignment.agent_id != python_id + + def test_no_capable_agents(self, calculator, mock_task): + requirements = [ + TaskRequirement( + capability_name="Quantum Computing", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.9, + weight=1.0 + ) + ] + + assignment = calculator.calculate_responsibility_assignment( + mock_task, requirements, AssignmentStrategy.GREEDY + ) + + assert assignment is None + + def test_workload_penalty_calculation(self, calculator): + assert calculator._calculate_workload_penalty(0) == 0.0 + + penalty_1 = calculator._calculate_workload_penalty(1) + penalty_5 = calculator._calculate_workload_penalty(5) + + assert penalty_1 < penalty_5 # Higher workload should have higher penalty + assert penalty_5 <= 0.8 # Should not exceed maximum penalty diff --git a/tests/responsibility/test_hierarchy.py b/tests/responsibility/test_hierarchy.py new file mode 100644 index 000000000..e96c3532a --- /dev/null +++ b/tests/responsibility/test_hierarchy.py @@ -0,0 +1,208 @@ +""" +Tests for capability-based agent hierarchy. 
+""" + +import pytest +from unittest.mock import Mock + +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement +from crewai.responsibility.hierarchy import CapabilityHierarchy + + +class TestCapabilityHierarchy: + @pytest.fixture + def hierarchy(self): + return CapabilityHierarchy() + + @pytest.fixture + def mock_agent(self): + agent = Mock(spec=BaseAgent) + agent.role = "Test Agent" + return agent + + @pytest.fixture + def python_capability(self): + return AgentCapability( + name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + proficiency_level=0.8, + confidence_score=0.9, + keywords=["python", "programming"] + ) + + @pytest.fixture + def analysis_capability(self): + return AgentCapability( + name="Data Analysis", + capability_type=CapabilityType.ANALYTICAL, + proficiency_level=0.7, + confidence_score=0.8, + keywords=["data", "analysis", "statistics"] + ) + + def test_add_agent(self, hierarchy, mock_agent, python_capability): + capabilities = [python_capability] + hierarchy.add_agent(mock_agent, capabilities) + + assert len(hierarchy.agents) == 1 + assert len(hierarchy.agent_capabilities) == 1 + assert "Python Programming" in hierarchy.capability_index + + def test_remove_agent(self, hierarchy, mock_agent, python_capability): + capabilities = [python_capability] + hierarchy.add_agent(mock_agent, capabilities) + + assert len(hierarchy.agents) == 1 + + hierarchy.remove_agent(mock_agent) + + assert len(hierarchy.agents) == 0 + assert len(hierarchy.agent_capabilities) == 0 + assert len(hierarchy.capability_index["Python Programming"]) == 0 + + def test_supervision_relationship(self, hierarchy): + supervisor = Mock(spec=BaseAgent) + supervisor.role = "Supervisor" + subordinate = Mock(spec=BaseAgent) + subordinate.role = "Subordinate" + + hierarchy.add_agent(supervisor, []) + hierarchy.add_agent(subordinate, []) + + hierarchy.set_supervision_relationship(supervisor, subordinate) + + subordinates = hierarchy.get_subordinates(supervisor) + assert len(subordinates) == 1 + assert subordinates[0] == subordinate + + def test_update_agent_capability(self, hierarchy, mock_agent, python_capability): + hierarchy.add_agent(mock_agent, [python_capability]) + + success = hierarchy.update_agent_capability( + mock_agent, "Python Programming", 0.9, 0.95 + ) + + assert success is True + + capabilities = hierarchy.get_agent_capabilities(mock_agent) + updated_cap = next(cap for cap in capabilities if cap.name == "Python Programming") + assert updated_cap.proficiency_level == 0.9 + assert updated_cap.confidence_score == 0.95 + + def test_find_capable_agents(self, hierarchy, mock_agent, python_capability): + hierarchy.add_agent(mock_agent, [python_capability]) + + requirements = [ + TaskRequirement( + capability_name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.5, + weight=1.0 + ) + ] + + capable_agents = hierarchy.find_capable_agents(requirements) + + assert len(capable_agents) == 1 + assert capable_agents[0][0] == mock_agent + assert capable_agents[0][1] > 0.5 # Should have a good match score + + def test_get_best_agent_for_task(self, hierarchy, python_capability, analysis_capability): + agent1 = Mock(spec=BaseAgent) + agent1.role = "Python Developer" + agent2 = Mock(spec=BaseAgent) + agent2.role = "Data Analyst" + + hierarchy.add_agent(agent1, [python_capability]) + hierarchy.add_agent(agent2, [analysis_capability]) + + requirements = [ + 
TaskRequirement( + capability_name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.5, + weight=1.0 + ) + ] + + result = hierarchy.get_best_agent_for_task(requirements) + + assert result is not None + best_agent, score, matches = result + assert best_agent == agent1 # Python developer should be chosen + assert "Python Programming" in matches + + def test_capability_distribution(self, hierarchy, python_capability, analysis_capability): + agent1 = Mock(spec=BaseAgent) + agent1.role = "Developer" + agent2 = Mock(spec=BaseAgent) + agent2.role = "Analyst" + + hierarchy.add_agent(agent1, [python_capability]) + hierarchy.add_agent(agent2, [analysis_capability]) + + distribution = hierarchy.get_capability_distribution() + + assert CapabilityType.TECHNICAL in distribution + assert CapabilityType.ANALYTICAL in distribution + assert distribution[CapabilityType.TECHNICAL]["high"] == 1 # Python capability is high proficiency + assert distribution[CapabilityType.ANALYTICAL]["medium"] == 1 # Analysis capability is medium proficiency + + def test_hierarchy_path(self, hierarchy): + manager = Mock(spec=BaseAgent) + manager.role = "Manager" + supervisor = Mock(spec=BaseAgent) + supervisor.role = "Supervisor" + worker = Mock(spec=BaseAgent) + worker.role = "Worker" + + hierarchy.add_agent(manager, []) + hierarchy.add_agent(supervisor, []) + hierarchy.add_agent(worker, []) + + hierarchy.set_supervision_relationship(manager, supervisor) + hierarchy.set_supervision_relationship(supervisor, worker) + + path = hierarchy.get_hierarchy_path(manager, worker) + + assert path is not None + assert len(path) == 3 + assert path[0] == manager + assert path[1] == supervisor + assert path[2] == worker + + def test_capabilities_match(self, hierarchy, python_capability): + requirement = TaskRequirement( + capability_name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.5 + ) + + assert hierarchy._capabilities_match(python_capability, requirement) is True + + requirement2 = TaskRequirement( + capability_name="Different Name", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.5 + ) + + assert hierarchy._capabilities_match(python_capability, requirement2) is True + + requirement3 = TaskRequirement( + capability_name="Different Name", + capability_type=CapabilityType.ANALYTICAL, + minimum_proficiency=0.5, + keywords=["python"] + ) + + assert hierarchy._capabilities_match(python_capability, requirement3) is True + + requirement4 = TaskRequirement( + capability_name="Different Name", + capability_type=CapabilityType.ANALYTICAL, + minimum_proficiency=0.5, + keywords=["java"] + ) + + assert hierarchy._capabilities_match(python_capability, requirement4) is False diff --git a/tests/responsibility/test_integration.py b/tests/responsibility/test_integration.py new file mode 100644 index 000000000..006197c72 --- /dev/null +++ b/tests/responsibility/test_integration.py @@ -0,0 +1,285 @@ +""" +Integration tests for the responsibility tracking system. 
+""" + +import pytest +from unittest.mock import Mock + +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.task import Task +from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement +from crewai.responsibility.system import ResponsibilitySystem +from crewai.responsibility.assignment import AssignmentStrategy + + +class TestResponsibilitySystemIntegration: + @pytest.fixture + def system(self): + return ResponsibilitySystem() + + @pytest.fixture + def python_agent(self): + agent = Mock(spec=BaseAgent) + agent.role = "Python Developer" + return agent + + @pytest.fixture + def analysis_agent(self): + agent = Mock(spec=BaseAgent) + agent.role = "Data Analyst" + return agent + + @pytest.fixture + def python_capability(self): + return AgentCapability( + name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + proficiency_level=0.9, + confidence_score=0.8, + keywords=["python", "programming", "development"] + ) + + @pytest.fixture + def analysis_capability(self): + return AgentCapability( + name="Data Analysis", + capability_type=CapabilityType.ANALYTICAL, + proficiency_level=0.8, + confidence_score=0.9, + keywords=["data", "analysis", "statistics"] + ) + + @pytest.fixture + def mock_task(self): + task = Mock(spec=Task) + task.id = "integration_test_task" + task.description = "Complex data processing task requiring Python skills" + return task + + def test_full_workflow(self, system, python_agent, python_capability, mock_task): + """Test complete workflow from agent registration to task completion.""" + + system.register_agent(python_agent, [python_capability]) + + status = system.get_agent_status(python_agent) + assert status["role"] == "Python Developer" + assert len(status["capabilities"]) == 1 + assert status["capabilities"][0]["name"] == "Python Programming" + + requirements = [ + TaskRequirement( + capability_name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.5, + weight=1.0 + ) + ] + + assignment = system.assign_task_responsibility(mock_task, requirements) + + assert assignment is not None + assert assignment.task_id == "integration_test_task" + assert assignment.responsibility_score > 0.5 + + updated_status = system.get_agent_status(python_agent) + assert updated_status["current_workload"] == 1 + + system.complete_task( + agent=python_agent, + task=mock_task, + success=True, + completion_time=1800.0, + quality_score=0.9, + outcome_description="Task completed successfully" + ) + + final_status = system.get_agent_status(python_agent) + assert final_status["performance"]["total_tasks"] == 1 + assert final_status["performance"]["success_rate"] == 1.0 + assert final_status["current_workload"] == 0 # Should be decremented + + def test_multi_agent_scenario(self, system, python_agent, analysis_agent, + python_capability, analysis_capability, mock_task): + """Test scenario with multiple agents and capabilities.""" + + system.register_agent(python_agent, [python_capability]) + system.register_agent(analysis_agent, [analysis_capability]) + + requirements = [ + TaskRequirement( + capability_name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.7, + weight=1.0 + ), + TaskRequirement( + capability_name="Data Analysis", + capability_type=CapabilityType.ANALYTICAL, + minimum_proficiency=0.6, + weight=0.8 + ) + ] + + greedy_assignment = system.assign_task_responsibility( + mock_task, requirements, AssignmentStrategy.GREEDY + ) + + assert 
greedy_assignment is not None + + system.calculator.update_workload(python_agent, 5) + + balanced_assignment = system.assign_task_responsibility( + mock_task, requirements, AssignmentStrategy.BALANCED + ) + + assert balanced_assignment is not None + + def test_delegation_workflow(self, system, python_agent, analysis_agent, + python_capability, analysis_capability, mock_task): + """Test task delegation between agents.""" + + system.register_agent(python_agent, [python_capability], supervisor=None) + system.register_agent(analysis_agent, [analysis_capability], supervisor=python_agent) + + system.delegate_task( + delegating_agent=python_agent, + receiving_agent=analysis_agent, + task=mock_task, + reason="Analysis expertise required" + ) + + python_status = system.get_agent_status(python_agent) + analysis_status = system.get_agent_status(analysis_agent) + + assert analysis_status["current_workload"] > 0 + + delegation_records = system.accountability.get_agent_records( + python_agent, action_type="delegation" + ) + assert len(delegation_records) > 0 + + def test_performance_based_capability_adjustment(self, system, python_agent, + python_capability, mock_task): + """Test that capabilities are adjusted based on performance.""" + + system.register_agent(python_agent, [python_capability]) + + initial_capabilities = system.hierarchy.get_agent_capabilities(python_agent) + initial_proficiency = initial_capabilities[0].proficiency_level + + for i in range(5): + task = Mock(spec=Task) + task.id = f"task_{i}" + task.description = f"Task {i}" + + system.complete_task( + agent=python_agent, + task=task, + success=True, + completion_time=1800.0, + quality_score=0.9 + ) + + updated_capabilities = system.hierarchy.get_agent_capabilities(python_agent) + + assert len(updated_capabilities) == 1 + + def test_system_overview_and_recommendations(self, system, python_agent, + analysis_agent, python_capability, + analysis_capability): + """Test system overview and recommendation generation.""" + + system.register_agent(python_agent, [python_capability]) + system.register_agent(analysis_agent, [analysis_capability]) + + overview = system.get_system_overview() + + assert overview["enabled"] is True + assert overview["total_agents"] == 2 + assert "capability_distribution" in overview + assert "system_performance" in overview + + recommendations = system.generate_recommendations() + + assert isinstance(recommendations, list) + + def test_system_enable_disable(self, system, python_agent, python_capability, mock_task): + """Test enabling and disabling the responsibility system.""" + + assert system.enabled is True + + system.register_agent(python_agent, [python_capability]) + + requirements = [ + TaskRequirement( + capability_name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.5, + weight=1.0 + ) + ] + + assignment = system.assign_task_responsibility(mock_task, requirements) + assert assignment is not None + + system.disable_system() + assert system.enabled is False + + disabled_assignment = system.assign_task_responsibility(mock_task, requirements) + assert disabled_assignment is None + + disabled_status = system.get_agent_status(python_agent) + assert disabled_status == {} + + system.enable_system() + assert system.enabled is True + + enabled_assignment = system.assign_task_responsibility(mock_task, requirements) + assert enabled_assignment is not None + + def test_accountability_tracking_integration(self, system, python_agent, + python_capability, mock_task): + """Test that 
all operations are properly logged for accountability.""" + + system.register_agent(python_agent, [python_capability]) + + registration_records = system.accountability.get_agent_records( + python_agent, action_type="registration" + ) + assert len(registration_records) == 1 + + requirements = [ + TaskRequirement( + capability_name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.5, + weight=1.0 + ) + ] + + assignment = system.assign_task_responsibility(mock_task, requirements) + + assignment_records = system.accountability.get_agent_records( + python_agent, action_type="task_assignment" + ) + assert len(assignment_records) == 1 + + system.complete_task( + agent=python_agent, + task=mock_task, + success=True, + completion_time=1800.0, + quality_score=0.9 + ) + + completion_records = system.accountability.get_agent_records( + python_agent, action_type="task_completion" + ) + assert len(completion_records) == 1 + + report = system.accountability.generate_accountability_report(agent=python_agent) + + assert report["total_records"] >= 3 # At least registration, assignment, completion + assert "registration" in report["action_counts"] + assert "task_assignment" in report["action_counts"] + assert "task_completion" in report["action_counts"] diff --git a/tests/responsibility/test_models.py b/tests/responsibility/test_models.py new file mode 100644 index 000000000..8b43d1c8f --- /dev/null +++ b/tests/responsibility/test_models.py @@ -0,0 +1,187 @@ +""" +Tests for responsibility tracking data models. +""" + +import pytest +from datetime import datetime, timedelta +from uuid import uuid4 + +from crewai.responsibility.models import ( + AgentCapability, + CapabilityType, + ResponsibilityAssignment, + AccountabilityRecord, + PerformanceMetrics, + TaskRequirement +) + + +class TestAgentCapability: + def test_create_capability(self): + capability = AgentCapability( + name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + proficiency_level=0.8, + confidence_score=0.9, + description="Expert in Python development", + keywords=["python", "programming", "development"] + ) + + assert capability.name == "Python Programming" + assert capability.capability_type == CapabilityType.TECHNICAL + assert capability.proficiency_level == 0.8 + assert capability.confidence_score == 0.9 + assert "python" in capability.keywords + + def test_update_proficiency(self): + capability = AgentCapability( + name="Data Analysis", + capability_type=CapabilityType.ANALYTICAL, + proficiency_level=0.5, + confidence_score=0.6 + ) + + old_updated = capability.last_updated + capability.update_proficiency(0.7, 0.8) + + assert capability.proficiency_level == 0.7 + assert capability.confidence_score == 0.8 + assert capability.last_updated > old_updated + + def test_proficiency_bounds(self): + capability = AgentCapability( + name="Test", + capability_type=CapabilityType.TECHNICAL, + proficiency_level=0.5, + confidence_score=0.5 + ) + + capability.update_proficiency(1.5, 1.2) + assert capability.proficiency_level == 1.0 + assert capability.confidence_score == 1.0 + + capability.update_proficiency(-0.5, -0.2) + assert capability.proficiency_level == 0.0 + assert capability.confidence_score == 0.0 + + +class TestResponsibilityAssignment: + def test_create_assignment(self): + assignment = ResponsibilityAssignment( + agent_id="agent_1", + task_id="task_1", + responsibility_score=0.85, + capability_matches=["Python Programming", "Data Analysis"], + reasoning="Best match for technical 
requirements" + ) + + assert assignment.agent_id == "agent_1" + assert assignment.task_id == "task_1" + assert assignment.responsibility_score == 0.85 + assert len(assignment.capability_matches) == 2 + assert assignment.success is None + + def test_mark_completed(self): + assignment = ResponsibilityAssignment( + agent_id="agent_1", + task_id="task_1", + responsibility_score=0.85, + reasoning="Test assignment" + ) + + assert assignment.completed_at is None + assert assignment.success is None + + assignment.mark_completed(True) + + assert assignment.completed_at is not None + assert assignment.success is True + + +class TestAccountabilityRecord: + def test_create_record(self): + record = AccountabilityRecord( + agent_id="agent_1", + action_type="task_execution", + action_description="Executed data analysis task", + task_id="task_1", + context={"complexity": "high", "duration": 3600} + ) + + assert record.agent_id == "agent_1" + assert record.action_type == "task_execution" + assert record.context["complexity"] == "high" + assert record.outcome is None + + def test_set_outcome(self): + record = AccountabilityRecord( + agent_id="agent_1", + action_type="decision", + action_description="Chose algorithm X" + ) + + record.set_outcome("Algorithm performed well", True) + + assert record.outcome == "Algorithm performed well" + assert record.success is True + + +class TestPerformanceMetrics: + def test_create_metrics(self): + metrics = PerformanceMetrics(agent_id="agent_1") + + assert metrics.agent_id == "agent_1" + assert metrics.total_tasks == 0 + assert metrics.success_rate == 0.0 + assert metrics.quality_score == 0.5 + + def test_update_metrics_success(self): + metrics = PerformanceMetrics(agent_id="agent_1") + + metrics.update_metrics(True, 1800, 0.8) + + assert metrics.total_tasks == 1 + assert metrics.successful_tasks == 1 + assert metrics.failed_tasks == 0 + assert metrics.success_rate == 1.0 + assert metrics.average_completion_time == 1800 + assert metrics.reliability_score == 1.0 + + def test_update_metrics_failure(self): + metrics = PerformanceMetrics(agent_id="agent_1") + + metrics.update_metrics(False, 3600) + + assert metrics.total_tasks == 1 + assert metrics.successful_tasks == 0 + assert metrics.failed_tasks == 1 + assert metrics.success_rate == 0.0 + + def test_update_metrics_mixed(self): + metrics = PerformanceMetrics(agent_id="agent_1") + + metrics.update_metrics(True, 1800, 0.8) + metrics.update_metrics(False, 3600, 0.3) + metrics.update_metrics(True, 2400, 0.9) + + assert metrics.total_tasks == 3 + assert metrics.successful_tasks == 2 + assert metrics.failed_tasks == 1 + assert abs(metrics.success_rate - 2/3) < 0.001 + + +class TestTaskRequirement: + def test_create_requirement(self): + requirement = TaskRequirement( + capability_name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + minimum_proficiency=0.7, + weight=1.5, + keywords=["python", "coding"] + ) + + assert requirement.capability_name == "Python Programming" + assert requirement.capability_type == CapabilityType.TECHNICAL + assert requirement.minimum_proficiency == 0.7 + assert requirement.weight == 1.5 + assert "python" in requirement.keywords diff --git a/tests/responsibility/test_performance.py b/tests/responsibility/test_performance.py new file mode 100644 index 000000000..7d96ff116 --- /dev/null +++ b/tests/responsibility/test_performance.py @@ -0,0 +1,226 @@ +""" +Tests for performance-based capability adjustment. 
+""" + +import pytest +from datetime import timedelta +from unittest.mock import Mock + +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.responsibility.models import AgentCapability, CapabilityType, PerformanceMetrics +from crewai.responsibility.hierarchy import CapabilityHierarchy +from crewai.responsibility.performance import PerformanceTracker + + +class TestPerformanceTracker: + @pytest.fixture + def hierarchy(self): + return CapabilityHierarchy() + + @pytest.fixture + def tracker(self, hierarchy): + return PerformanceTracker(hierarchy) + + @pytest.fixture + def mock_agent(self, hierarchy): + agent = Mock(spec=BaseAgent) + agent.role = "Test Agent" + + capability = AgentCapability( + name="Python Programming", + capability_type=CapabilityType.TECHNICAL, + proficiency_level=0.7, + confidence_score=0.8 + ) + + hierarchy.add_agent(agent, [capability]) + return agent + + def test_record_task_completion_success(self, tracker, mock_agent): + tracker.record_task_completion( + agent=mock_agent, + task_success=True, + completion_time=1800.0, + quality_score=0.9 + ) + + metrics = tracker.get_performance_metrics(mock_agent) + + assert metrics is not None + assert metrics.total_tasks == 1 + assert metrics.successful_tasks == 1 + assert metrics.failed_tasks == 0 + assert metrics.success_rate == 1.0 + assert metrics.average_completion_time == 1800.0 + assert metrics.quality_score > 0.5 # Should be updated towards 0.9 + + def test_record_task_completion_failure(self, tracker, mock_agent): + tracker.record_task_completion( + agent=mock_agent, + task_success=False, + completion_time=3600.0, + quality_score=0.3 + ) + + metrics = tracker.get_performance_metrics(mock_agent) + + assert metrics is not None + assert metrics.total_tasks == 1 + assert metrics.successful_tasks == 0 + assert metrics.failed_tasks == 1 + assert metrics.success_rate == 0.0 + + def test_multiple_task_completions(self, tracker, mock_agent): + tracker.record_task_completion(mock_agent, True, 1800.0, 0.8) + tracker.record_task_completion(mock_agent, False, 3600.0, 0.4) + tracker.record_task_completion(mock_agent, True, 2400.0, 0.9) + + metrics = tracker.get_performance_metrics(mock_agent) + + assert metrics.total_tasks == 3 + assert metrics.successful_tasks == 2 + assert metrics.failed_tasks == 1 + assert abs(metrics.success_rate - 2/3) < 0.001 + + def test_capability_adjustment_on_success(self, tracker, mock_agent): + initial_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent) + initial_proficiency = initial_capabilities[0].proficiency_level + + tracker.record_task_completion( + agent=mock_agent, + task_success=True, + completion_time=1800.0, + quality_score=0.9, + capability_used="Python Programming" + ) + + updated_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent) + updated_proficiency = updated_capabilities[0].proficiency_level + + assert updated_proficiency >= initial_proficiency + + def test_capability_adjustment_on_failure(self, tracker, mock_agent): + initial_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent) + initial_proficiency = initial_capabilities[0].proficiency_level + + tracker.record_task_completion( + agent=mock_agent, + task_success=False, + completion_time=3600.0, + quality_score=0.2, + capability_used="Python Programming" + ) + + updated_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent) + updated_proficiency = updated_capabilities[0].proficiency_level + + assert updated_proficiency <= initial_proficiency + + def 
test_adjust_capabilities_based_on_performance(self, tracker, mock_agent): + for _ in range(5): + tracker.record_task_completion(mock_agent, True, 1800.0, 0.9) + for _ in range(2): + tracker.record_task_completion(mock_agent, False, 3600.0, 0.3) + + adjustments = tracker.adjust_capabilities_based_on_performance(mock_agent) + + assert isinstance(adjustments, list) + + def test_get_performance_trends(self, tracker, mock_agent): + tracker.record_task_completion(mock_agent, True, 1800.0, 0.8) + tracker.record_task_completion(mock_agent, True, 2000.0, 0.9) + + trends = tracker.get_performance_trends(mock_agent) + + assert "success_rate" in trends + assert "quality_score" in trends + assert "efficiency_score" in trends + assert "reliability_score" in trends + + assert len(trends["success_rate"]) > 0 + + def test_identify_improvement_opportunities(self, tracker, mock_agent): + tracker.record_task_completion(mock_agent, False, 7200.0, 0.3) # Long time, low quality + tracker.record_task_completion(mock_agent, False, 6000.0, 0.4) + tracker.record_task_completion(mock_agent, True, 5400.0, 0.5) + + opportunities = tracker.identify_improvement_opportunities(mock_agent) + + assert isinstance(opportunities, list) + assert len(opportunities) > 0 + + areas = [opp["area"] for opp in opportunities] + assert "success_rate" in areas or "quality" in areas or "efficiency" in areas + + def test_compare_agent_performance(self, tracker, hierarchy): + agent1 = Mock(spec=BaseAgent) + agent1.role = "Agent 1" + agent2 = Mock(spec=BaseAgent) + agent2.role = "Agent 2" + + capability = AgentCapability( + name="Test Capability", + capability_type=CapabilityType.TECHNICAL, + proficiency_level=0.7, + confidence_score=0.8 + ) + + hierarchy.add_agent(agent1, [capability]) + hierarchy.add_agent(agent2, [capability]) + + tracker.record_task_completion(agent1, True, 1800.0, 0.9) # Good performance + tracker.record_task_completion(agent1, True, 2000.0, 0.8) + + tracker.record_task_completion(agent2, False, 3600.0, 0.4) # Poor performance + tracker.record_task_completion(agent2, True, 4000.0, 0.5) + + comparison = tracker.compare_agent_performance([agent1, agent2], metric="overall") + + assert len(comparison) == 2 + assert comparison[0][1] > comparison[1][1] # First agent should have higher score + + success_comparison = tracker.compare_agent_performance([agent1, agent2], metric="success_rate") + assert len(success_comparison) == 2 + + def test_learning_rate_effect(self, tracker, mock_agent): + original_learning_rate = tracker.learning_rate + + tracker.learning_rate = 0.5 + + initial_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent) + initial_proficiency = initial_capabilities[0].proficiency_level + + tracker.record_task_completion( + mock_agent, True, 1800.0, 0.9, capability_used="Python Programming" + ) + + high_lr_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent) + high_lr_proficiency = high_lr_capabilities[0].proficiency_level + + tracker.hierarchy.update_agent_capability( + mock_agent, "Python Programming", initial_proficiency, 0.8 + ) + tracker.learning_rate = 0.01 + + tracker.record_task_completion( + mock_agent, True, 1800.0, 0.9, capability_used="Python Programming" + ) + + low_lr_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent) + low_lr_proficiency = low_lr_capabilities[0].proficiency_level + + high_lr_change = abs(high_lr_proficiency - initial_proficiency) + low_lr_change = abs(low_lr_proficiency - initial_proficiency) + + assert high_lr_change > low_lr_change + + 
tracker.learning_rate = original_learning_rate + + def test_performance_metrics_creation(self, tracker, mock_agent): + assert tracker.get_performance_metrics(mock_agent) is None + + tracker.record_task_completion(mock_agent, True, 1800.0) + + metrics = tracker.get_performance_metrics(mock_agent) + assert metrics is not None + assert metrics.agent_id == tracker._get_agent_id(mock_agent)
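
A minimal usage sketch of the API this patch introduces (ResponsibilitySystem, AgentCapability, TaskRequirement, AssignmentStrategy), assuming the modules are importable from crewai.responsibility as added above; the agent, task, and numeric scores are illustrative values, not part of the diff.

from crewai import Agent, Task
from crewai.responsibility.assignment import AssignmentStrategy
from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement
from crewai.responsibility.system import ResponsibilitySystem

# Coordinator that ties hierarchy, assignment, accountability and performance together.
system = ResponsibilitySystem()

# Register an agent with one declared capability (illustrative values).
developer = Agent(
    role="Python Developer",
    goal="Ship reliable Python tooling",
    backstory="Backend engineer focused on data pipelines",
)
system.register_agent(
    developer,
    capabilities=[
        AgentCapability(
            name="Python Programming",
            capability_type=CapabilityType.TECHNICAL,
            proficiency_level=0.85,
            confidence_score=0.8,
            keywords=["python", "programming"],
        )
    ],
)

# Describe what the task needs and let the calculator pick the best agent.
task = Task(
    description="Write a script that cleans and summarizes a CSV export",
    expected_output="A runnable Python script plus a short summary report",
)
requirements = [
    TaskRequirement(
        capability_name="Python Programming",
        capability_type=CapabilityType.TECHNICAL,
        minimum_proficiency=0.6,
        weight=1.0,
        keywords=["python"],
    )
]
assignment = system.assign_task_responsibility(
    task, requirements, strategy=AssignmentStrategy.GREEDY
)

# Record the outcome so metrics and capabilities are adjusted and the action is logged.
if assignment is not None:
    system.complete_task(
        agent=developer,
        task=task,
        success=True,
        completion_time=1200.0,
        quality_score=0.9,
        outcome_description="Script delivered and verified",
    )

print(system.get_agent_status(developer))
print(system.get_system_overview())

GREEDY picks the single best capability match; as exercised in the assignment tests above, BALANCED additionally weighs each agent's current workload, and OPTIMAL is the third strategy defined in this patch.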