feat: Implement formal responsibility tracking system for CrewAI

- Add capability-based agent hierarchy with mathematical scoring
- Implement responsibility assignment algorithms (greedy, balanced, optimal)
- Add comprehensive accountability logging and tracking
- Implement performance-based capability adjustment with learning rates
- Integrate with existing Agent and Crew classes seamlessly
- Add 58 comprehensive tests covering all functionality
- Include example usage demonstrating all features

Addresses issue #3491 with four key features:
1. Capability-Based Agent Hierarchy
2. Mathematical Responsibility Assignment
3. Accountability Logging
4. Performance-Based Capability Adjustment

The system is fully backward compatible and opt-in: existing crews
continue to work without modification.
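
For reviewers, a minimal usage sketch distilled from the tests below. The
`dev_agent` and `task` variables are hypothetical placeholders; the imports
and calls mirror the API the tests exercise:

from crewai.responsibility.system import ResponsibilitySystem
from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement
from crewai.responsibility.assignment import AssignmentStrategy

system = ResponsibilitySystem()

# Opt in by registering an agent together with its declared capabilities.
system.register_agent(dev_agent, [
    AgentCapability(
        name="Python Programming",
        capability_type=CapabilityType.TECHNICAL,
        proficiency_level=0.9,
        confidence_score=0.8,
        keywords=["python", "programming"],
    )
])

# Score and assign responsibility for a task under a chosen strategy.
requirements = [
    TaskRequirement(
        capability_name="Python Programming",
        capability_type=CapabilityType.TECHNICAL,
        minimum_proficiency=0.5,
        weight=1.0,
    )
]
assignment = system.assign_task_responsibility(task, requirements, AssignmentStrategy.GREEDY)

# Report the outcome; performance-based capability adjustment runs from here.
system.complete_task(agent=dev_agent, task=task, success=True,
                     completion_time=1800.0, quality_score=0.9)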

Co-Authored-By: João <joao@crewai.com>
Author: Devin AI
Date: 2025-09-10 11:36:31 +00:00
commit b6c2493111
parent 6676d94ba1

18 changed files with 3116 additions and 0 deletions


@@ -0,0 +1,3 @@
"""
Tests for the formal responsibility tracking system.
"""


@@ -0,0 +1,199 @@
"""
Tests for accountability logging system.
"""
import pytest
from datetime import datetime, timedelta
from unittest.mock import Mock
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
from crewai.responsibility.accountability import AccountabilityLogger
class TestAccountabilityLogger:
@pytest.fixture
def logger(self):
return AccountabilityLogger()
@pytest.fixture
def mock_agent(self):
agent = Mock(spec=BaseAgent)
agent.role = "Test Agent"
return agent
@pytest.fixture
def mock_task(self):
task = Mock(spec=Task)
task.id = "test_task_1"
task.description = "Test task description"
return task
def test_log_action(self, logger, mock_agent, mock_task):
context = {"complexity": "high", "priority": "urgent"}
record = logger.log_action(
agent=mock_agent,
action_type="task_execution",
action_description="Executed data processing task",
task=mock_task,
context=context
)
assert record.agent_id == "Test Agent_" + str(id(mock_agent))
assert record.action_type == "task_execution"
assert record.action_description == "Executed data processing task"
assert record.task_id == "test_task_1"
assert record.context["complexity"] == "high"
assert len(logger.records) == 1
def test_log_decision(self, logger, mock_agent, mock_task):
alternatives = ["Option A", "Option B", "Option C"]
record = logger.log_decision(
agent=mock_agent,
decision="Chose Option A",
reasoning="Best performance characteristics",
task=mock_task,
alternatives_considered=alternatives
)
assert record.action_type == "decision"
assert record.action_description == "Chose Option A"
assert record.context["reasoning"] == "Best performance characteristics"
assert record.context["alternatives_considered"] == alternatives
def test_log_delegation(self, logger, mock_task):
delegating_agent = Mock(spec=BaseAgent)
delegating_agent.role = "Manager"
receiving_agent = Mock(spec=BaseAgent)
receiving_agent.role = "Developer"
record = logger.log_delegation(
delegating_agent=delegating_agent,
receiving_agent=receiving_agent,
task=mock_task,
delegation_reason="Specialized expertise required"
)
assert record.action_type == "delegation"
assert "Delegated task to Developer" in record.action_description
assert record.context["receiving_agent_role"] == "Developer"
assert record.context["delegation_reason"] == "Specialized expertise required"
def test_log_task_completion(self, logger, mock_agent, mock_task):
record = logger.log_task_completion(
agent=mock_agent,
task=mock_task,
success=True,
outcome_description="Task completed successfully with high quality",
completion_time=1800.0
)
assert record.action_type == "task_completion"
assert record.success is True
assert record.outcome == "Task completed successfully with high quality"
assert record.context["completion_time"] == 1800.0
def test_get_agent_records(self, logger, mock_agent, mock_task):
logger.log_action(mock_agent, "action1", "Description 1", mock_task)
logger.log_action(mock_agent, "action2", "Description 2", mock_task)
logger.log_decision(mock_agent, "decision1", "Reasoning", mock_task)
all_records = logger.get_agent_records(mock_agent)
assert len(all_records) == 3
decision_records = logger.get_agent_records(mock_agent, action_type="decision")
assert len(decision_records) == 1
assert decision_records[0].action_type == "decision"
recent_time = datetime.utcnow() - timedelta(minutes=1)
recent_records = logger.get_agent_records(mock_agent, since=recent_time)
assert len(recent_records) == 3 # All should be recent
def test_get_task_records(self, logger, mock_agent, mock_task):
other_task = Mock(spec=Task)
other_task.id = "other_task"
logger.log_action(mock_agent, "action1", "Description 1", mock_task)
logger.log_action(mock_agent, "action2", "Description 2", other_task)
logger.log_action(mock_agent, "action3", "Description 3", mock_task)
task_records = logger.get_task_records(mock_task)
assert len(task_records) == 2
for record in task_records:
assert record.task_id == "test_task_1"
def test_get_delegation_chain(self, logger, mock_task):
manager = Mock(spec=BaseAgent)
manager.role = "Manager"
supervisor = Mock(spec=BaseAgent)
supervisor.role = "Supervisor"
developer = Mock(spec=BaseAgent)
developer.role = "Developer"
logger.log_delegation(manager, supervisor, mock_task, "Initial delegation")
logger.log_delegation(supervisor, developer, mock_task, "Further delegation")
chain = logger.get_delegation_chain(mock_task)
assert len(chain) == 2
assert chain[0].context["receiving_agent_role"] == "Supervisor"
assert chain[1].context["receiving_agent_role"] == "Developer"
def test_generate_accountability_report(self, logger, mock_agent, mock_task):
record1 = logger.log_action(mock_agent, "task_execution", "Task 1", mock_task)
record1.set_outcome("Success", True)
record2 = logger.log_action(mock_agent, "task_execution", "Task 2", mock_task)
record2.set_outcome("Failed", False)
record3 = logger.log_decision(mock_agent, "Decision 1", "Reasoning", mock_task)
record3.set_outcome("Good decision", True)
report = logger.generate_accountability_report(agent=mock_agent)
assert report["total_records"] == 3
assert report["action_counts"]["task_execution"] == 2
assert report["action_counts"]["decision"] == 1
assert report["success_counts"]["task_execution"] == 1
assert report["failure_counts"]["task_execution"] == 1
assert report["success_rates"]["task_execution"] == 0.5
assert report["success_rates"]["decision"] == 1.0
assert len(report["recent_actions"]) == 3
def test_generate_system_wide_report(self, logger, mock_task):
agent1 = Mock(spec=BaseAgent)
agent1.role = "Agent 1"
agent2 = Mock(spec=BaseAgent)
agent2.role = "Agent 2"
logger.log_action(agent1, "task_execution", "Task 1", mock_task)
logger.log_action(agent2, "task_execution", "Task 2", mock_task)
report = logger.generate_accountability_report()
assert report["agent_id"] == "all_agents"
assert report["total_records"] == 2
assert report["action_counts"]["task_execution"] == 2
def test_time_filtered_report(self, logger, mock_agent, mock_task):
logger.log_action(mock_agent, "old_action", "Old action", mock_task)
report = logger.generate_accountability_report(
agent=mock_agent,
time_period=timedelta(hours=1)
)
assert report["total_records"] == 1
report = logger.generate_accountability_report(
agent=mock_agent,
time_period=timedelta(seconds=1)
)
assert report["total_records"] == 1  # a just-logged action still falls inside a one-second window

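The report shape these tests pin down (total_records, per-type action/success/
failure counts, success_rates, recent_actions) suggests an aggregation along
these lines. A sketch, assuming each AccountabilityRecord carries a timestamp
and that _get_agent_id is the id-derivation helper the tests imply; the
committed implementation may differ:

from collections import Counter
from datetime import datetime

def generate_accountability_report(self, agent=None, time_period=None):
    # Narrow to one agent and/or a trailing time window before aggregating.
    agent_id = self._get_agent_id(agent) if agent is not None else "all_agents"
    records = [r for r in self.records
               if agent is None or r.agent_id == agent_id]
    if time_period is not None:
        cutoff = datetime.utcnow() - time_period
        records = [r for r in records if r.timestamp >= cutoff]
    action_counts = Counter(r.action_type for r in records)
    success_counts = Counter(r.action_type for r in records if r.success is True)
    failure_counts = Counter(r.action_type for r in records if r.success is False)
    # A type's success rate only counts records whose outcome has been set.
    success_rates = {
        t: success_counts[t] / (success_counts[t] + failure_counts[t])
        for t in action_counts
        if success_counts[t] + failure_counts[t] > 0
    }
    return {
        "agent_id": agent_id,
        "total_records": len(records),
        "action_counts": dict(action_counts),
        "success_counts": dict(success_counts),
        "failure_counts": dict(failure_counts),
        "success_rates": success_rates,
        "recent_actions": records[-10:],
    }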

@@ -0,0 +1,221 @@
"""
Tests for mathematical responsibility assignment.
"""
import pytest
from unittest.mock import Mock
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement
from crewai.responsibility.hierarchy import CapabilityHierarchy
from crewai.responsibility.assignment import ResponsibilityCalculator, AssignmentStrategy
class TestResponsibilityCalculator:
@pytest.fixture
def hierarchy(self):
return CapabilityHierarchy()
@pytest.fixture
def calculator(self, hierarchy):
return ResponsibilityCalculator(hierarchy)
@pytest.fixture
def mock_task(self):
task = Mock(spec=Task)
task.id = "test_task_1"
task.description = "Test task description"
return task
@pytest.fixture
def python_agent(self, hierarchy):
agent = Mock(spec=BaseAgent)
agent.role = "Python Developer"
capability = AgentCapability(
name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
proficiency_level=0.9,
confidence_score=0.8,
keywords=["python", "programming"]
)
hierarchy.add_agent(agent, [capability])
return agent
@pytest.fixture
def analysis_agent(self, hierarchy):
agent = Mock(spec=BaseAgent)
agent.role = "Data Analyst"
capability = AgentCapability(
name="Data Analysis",
capability_type=CapabilityType.ANALYTICAL,
proficiency_level=0.8,
confidence_score=0.9,
keywords=["data", "analysis"]
)
hierarchy.add_agent(agent, [capability])
return agent
def test_greedy_assignment(self, calculator, mock_task, python_agent):
requirements = [
TaskRequirement(
capability_name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.5,
weight=1.0
)
]
assignment = calculator.calculate_responsibility_assignment(
mock_task, requirements, AssignmentStrategy.GREEDY
)
assert assignment is not None
assert assignment.task_id == "test_task_1"
assert assignment.responsibility_score > 0.5
assert "Python Programming" in assignment.capability_matches
assert "Greedy assignment" in assignment.reasoning
def test_balanced_assignment(self, calculator, mock_task, python_agent, analysis_agent):
calculator.update_workload(python_agent, 5) # High workload
calculator.update_workload(analysis_agent, 1) # Low workload
requirements = [
TaskRequirement(
capability_name="General Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.3,
weight=1.0
)
]
assignment = calculator.calculate_responsibility_assignment(
mock_task, requirements, AssignmentStrategy.BALANCED
)
assert assignment is not None
assert "Balanced assignment" in assignment.reasoning
def test_optimal_assignment(self, calculator, mock_task, python_agent):
requirements = [
TaskRequirement(
capability_name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.5,
weight=1.0
)
]
assignment = calculator.calculate_responsibility_assignment(
mock_task, requirements, AssignmentStrategy.OPTIMAL
)
assert assignment is not None
assert "Optimal assignment" in assignment.reasoning
def test_multi_agent_assignment(self, calculator, mock_task, python_agent, analysis_agent):
requirements = [
TaskRequirement(
capability_name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.5,
weight=1.0
),
TaskRequirement(
capability_name="Data Analysis",
capability_type=CapabilityType.ANALYTICAL,
minimum_proficiency=0.5,
weight=0.8
)
]
assignments = calculator.calculate_multi_agent_assignment(
mock_task, requirements, max_agents=2
)
assert len(assignments) <= 2
assert len(assignments) > 0
agent_ids = [assignment.agent_id for assignment in assignments]
assert len(agent_ids) == len(set(agent_ids))
def test_workload_update(self, calculator, python_agent):
initial_workload = calculator.current_workloads.get(
calculator.hierarchy._get_agent_id(python_agent), 0
)
calculator.update_workload(python_agent, 3)
new_workload = calculator.current_workloads.get(
calculator.hierarchy._get_agent_id(python_agent), 0
)
assert new_workload == initial_workload + 3
calculator.update_workload(python_agent, -2)
final_workload = calculator.current_workloads.get(
calculator.hierarchy._get_agent_id(python_agent), 0
)
assert final_workload == new_workload - 2
def test_workload_distribution(self, calculator, python_agent, analysis_agent):
calculator.update_workload(python_agent, 3)
calculator.update_workload(analysis_agent, 1)
distribution = calculator.get_workload_distribution()
python_id = calculator.hierarchy._get_agent_id(python_agent)
analysis_id = calculator.hierarchy._get_agent_id(analysis_agent)
assert distribution[python_id] == 3
assert distribution[analysis_id] == 1
def test_exclude_agents(self, calculator, mock_task, python_agent, analysis_agent):
requirements = [
TaskRequirement(
capability_name="Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.3,
weight=1.0
)
]
assignment = calculator.calculate_responsibility_assignment(
mock_task, requirements, AssignmentStrategy.GREEDY,
exclude_agents=[python_agent]
)
if assignment: # If any agent was assigned
python_id = calculator.hierarchy._get_agent_id(python_agent)
assert assignment.agent_id != python_id
def test_no_capable_agents(self, calculator, mock_task):
requirements = [
TaskRequirement(
capability_name="Quantum Computing",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.9,
weight=1.0
)
]
assignment = calculator.calculate_responsibility_assignment(
mock_task, requirements, AssignmentStrategy.GREEDY
)
assert assignment is None
def test_workload_penalty_calculation(self, calculator):
assert calculator._calculate_workload_penalty(0) == 0.0
penalty_1 = calculator._calculate_workload_penalty(1)
penalty_5 = calculator._calculate_workload_penalty(5)
assert penalty_1 < penalty_5 # Higher workload should have higher penalty
assert penalty_5 <= 0.8 # Should not exceed maximum penalty

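test_workload_penalty_calculation only constrains the shape of the penalty
curve: zero when idle, increasing with workload, never above 0.8. One curve
satisfying those constraints (an assumption, not necessarily the committed
formula):

import math

def _calculate_workload_penalty(self, workload: int) -> float:
    # Saturating penalty: 0.0 for an idle agent, rising toward the 0.8 cap.
    return 0.8 * (1.0 - math.exp(-0.3 * workload))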

@@ -0,0 +1,208 @@
"""
Tests for capability-based agent hierarchy.
"""
import pytest
from unittest.mock import Mock
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement
from crewai.responsibility.hierarchy import CapabilityHierarchy
class TestCapabilityHierarchy:
@pytest.fixture
def hierarchy(self):
return CapabilityHierarchy()
@pytest.fixture
def mock_agent(self):
agent = Mock(spec=BaseAgent)
agent.role = "Test Agent"
return agent
@pytest.fixture
def python_capability(self):
return AgentCapability(
name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
proficiency_level=0.8,
confidence_score=0.9,
keywords=["python", "programming"]
)
@pytest.fixture
def analysis_capability(self):
return AgentCapability(
name="Data Analysis",
capability_type=CapabilityType.ANALYTICAL,
proficiency_level=0.7,
confidence_score=0.8,
keywords=["data", "analysis", "statistics"]
)
def test_add_agent(self, hierarchy, mock_agent, python_capability):
capabilities = [python_capability]
hierarchy.add_agent(mock_agent, capabilities)
assert len(hierarchy.agents) == 1
assert len(hierarchy.agent_capabilities) == 1
assert "Python Programming" in hierarchy.capability_index
def test_remove_agent(self, hierarchy, mock_agent, python_capability):
capabilities = [python_capability]
hierarchy.add_agent(mock_agent, capabilities)
assert len(hierarchy.agents) == 1
hierarchy.remove_agent(mock_agent)
assert len(hierarchy.agents) == 0
assert len(hierarchy.agent_capabilities) == 0
assert len(hierarchy.capability_index["Python Programming"]) == 0
def test_supervision_relationship(self, hierarchy):
supervisor = Mock(spec=BaseAgent)
supervisor.role = "Supervisor"
subordinate = Mock(spec=BaseAgent)
subordinate.role = "Subordinate"
hierarchy.add_agent(supervisor, [])
hierarchy.add_agent(subordinate, [])
hierarchy.set_supervision_relationship(supervisor, subordinate)
subordinates = hierarchy.get_subordinates(supervisor)
assert len(subordinates) == 1
assert subordinates[0] == subordinate
def test_update_agent_capability(self, hierarchy, mock_agent, python_capability):
hierarchy.add_agent(mock_agent, [python_capability])
success = hierarchy.update_agent_capability(
mock_agent, "Python Programming", 0.9, 0.95
)
assert success is True
capabilities = hierarchy.get_agent_capabilities(mock_agent)
updated_cap = next(cap for cap in capabilities if cap.name == "Python Programming")
assert updated_cap.proficiency_level == 0.9
assert updated_cap.confidence_score == 0.95
def test_find_capable_agents(self, hierarchy, mock_agent, python_capability):
hierarchy.add_agent(mock_agent, [python_capability])
requirements = [
TaskRequirement(
capability_name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.5,
weight=1.0
)
]
capable_agents = hierarchy.find_capable_agents(requirements)
assert len(capable_agents) == 1
assert capable_agents[0][0] == mock_agent
assert capable_agents[0][1] > 0.5 # Should have a good match score
def test_get_best_agent_for_task(self, hierarchy, python_capability, analysis_capability):
agent1 = Mock(spec=BaseAgent)
agent1.role = "Python Developer"
agent2 = Mock(spec=BaseAgent)
agent2.role = "Data Analyst"
hierarchy.add_agent(agent1, [python_capability])
hierarchy.add_agent(agent2, [analysis_capability])
requirements = [
TaskRequirement(
capability_name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.5,
weight=1.0
)
]
result = hierarchy.get_best_agent_for_task(requirements)
assert result is not None
best_agent, score, matches = result
assert best_agent == agent1 # Python developer should be chosen
assert "Python Programming" in matches
def test_capability_distribution(self, hierarchy, python_capability, analysis_capability):
agent1 = Mock(spec=BaseAgent)
agent1.role = "Developer"
agent2 = Mock(spec=BaseAgent)
agent2.role = "Analyst"
hierarchy.add_agent(agent1, [python_capability])
hierarchy.add_agent(agent2, [analysis_capability])
distribution = hierarchy.get_capability_distribution()
assert CapabilityType.TECHNICAL in distribution
assert CapabilityType.ANALYTICAL in distribution
assert distribution[CapabilityType.TECHNICAL]["high"] == 1 # Python capability is high proficiency
assert distribution[CapabilityType.ANALYTICAL]["medium"] == 1 # Analysis capability is medium proficiency
def test_hierarchy_path(self, hierarchy):
manager = Mock(spec=BaseAgent)
manager.role = "Manager"
supervisor = Mock(spec=BaseAgent)
supervisor.role = "Supervisor"
worker = Mock(spec=BaseAgent)
worker.role = "Worker"
hierarchy.add_agent(manager, [])
hierarchy.add_agent(supervisor, [])
hierarchy.add_agent(worker, [])
hierarchy.set_supervision_relationship(manager, supervisor)
hierarchy.set_supervision_relationship(supervisor, worker)
path = hierarchy.get_hierarchy_path(manager, worker)
assert path is not None
assert len(path) == 3
assert path[0] == manager
assert path[1] == supervisor
assert path[2] == worker
def test_capabilities_match(self, hierarchy, python_capability):
requirement = TaskRequirement(
capability_name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.5
)
assert hierarchy._capabilities_match(python_capability, requirement) is True
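# requirement2 below differs in name but shares the capability type.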
requirement2 = TaskRequirement(
capability_name="Different Name",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.5
)
assert hierarchy._capabilities_match(python_capability, requirement2) is True
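# requirement3 differs in name and type but shares the "python" keyword.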
requirement3 = TaskRequirement(
capability_name="Different Name",
capability_type=CapabilityType.ANALYTICAL,
minimum_proficiency=0.5,
keywords=["python"]
)
assert hierarchy._capabilities_match(python_capability, requirement3) is True
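# requirement4 shares nothing: different name, different type, no keyword overlap.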
requirement4 = TaskRequirement(
capability_name="Different Name",
capability_type=CapabilityType.ANALYTICAL,
minimum_proficiency=0.5,
keywords=["java"]
)
assert hierarchy._capabilities_match(python_capability, requirement4) is False

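Taken together, the four assertions in test_capabilities_match imply a
three-way disjunction. A sketch of matching logic consistent with them (the
committed implementation may differ, e.g. in how minimum_proficiency is
enforced elsewhere):

def _capabilities_match(self, capability, requirement) -> bool:
    # A capability satisfies a requirement on an exact name match,
    # a matching capability type, or any shared keyword.
    if capability.name == requirement.capability_name:
        return True
    if capability.capability_type == requirement.capability_type:
        return True
    return bool(set(capability.keywords) & set(requirement.keywords or []))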

@@ -0,0 +1,285 @@
"""
Integration tests for the responsibility tracking system.
"""
import pytest
from unittest.mock import Mock
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement
from crewai.responsibility.system import ResponsibilitySystem
from crewai.responsibility.assignment import AssignmentStrategy
class TestResponsibilitySystemIntegration:
@pytest.fixture
def system(self):
return ResponsibilitySystem()
@pytest.fixture
def python_agent(self):
agent = Mock(spec=BaseAgent)
agent.role = "Python Developer"
return agent
@pytest.fixture
def analysis_agent(self):
agent = Mock(spec=BaseAgent)
agent.role = "Data Analyst"
return agent
@pytest.fixture
def python_capability(self):
return AgentCapability(
name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
proficiency_level=0.9,
confidence_score=0.8,
keywords=["python", "programming", "development"]
)
@pytest.fixture
def analysis_capability(self):
return AgentCapability(
name="Data Analysis",
capability_type=CapabilityType.ANALYTICAL,
proficiency_level=0.8,
confidence_score=0.9,
keywords=["data", "analysis", "statistics"]
)
@pytest.fixture
def mock_task(self):
task = Mock(spec=Task)
task.id = "integration_test_task"
task.description = "Complex data processing task requiring Python skills"
return task
def test_full_workflow(self, system, python_agent, python_capability, mock_task):
"""Test complete workflow from agent registration to task completion."""
system.register_agent(python_agent, [python_capability])
status = system.get_agent_status(python_agent)
assert status["role"] == "Python Developer"
assert len(status["capabilities"]) == 1
assert status["capabilities"][0]["name"] == "Python Programming"
requirements = [
TaskRequirement(
capability_name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.5,
weight=1.0
)
]
assignment = system.assign_task_responsibility(mock_task, requirements)
assert assignment is not None
assert assignment.task_id == "integration_test_task"
assert assignment.responsibility_score > 0.5
updated_status = system.get_agent_status(python_agent)
assert updated_status["current_workload"] == 1
system.complete_task(
agent=python_agent,
task=mock_task,
success=True,
completion_time=1800.0,
quality_score=0.9,
outcome_description="Task completed successfully"
)
final_status = system.get_agent_status(python_agent)
assert final_status["performance"]["total_tasks"] == 1
assert final_status["performance"]["success_rate"] == 1.0
assert final_status["current_workload"] == 0 # Should be decremented
def test_multi_agent_scenario(self, system, python_agent, analysis_agent,
python_capability, analysis_capability, mock_task):
"""Test scenario with multiple agents and capabilities."""
system.register_agent(python_agent, [python_capability])
system.register_agent(analysis_agent, [analysis_capability])
requirements = [
TaskRequirement(
capability_name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.7,
weight=1.0
),
TaskRequirement(
capability_name="Data Analysis",
capability_type=CapabilityType.ANALYTICAL,
minimum_proficiency=0.6,
weight=0.8
)
]
greedy_assignment = system.assign_task_responsibility(
mock_task, requirements, AssignmentStrategy.GREEDY
)
assert greedy_assignment is not None
system.calculator.update_workload(python_agent, 5)
balanced_assignment = system.assign_task_responsibility(
mock_task, requirements, AssignmentStrategy.BALANCED
)
assert balanced_assignment is not None
def test_delegation_workflow(self, system, python_agent, analysis_agent,
python_capability, analysis_capability, mock_task):
"""Test task delegation between agents."""
system.register_agent(python_agent, [python_capability], supervisor=None)
system.register_agent(analysis_agent, [analysis_capability], supervisor=python_agent)
system.delegate_task(
delegating_agent=python_agent,
receiving_agent=analysis_agent,
task=mock_task,
reason="Analysis expertise required"
)
analysis_status = system.get_agent_status(analysis_agent)
assert analysis_status["current_workload"] > 0
delegation_records = system.accountability.get_agent_records(
python_agent, action_type="delegation"
)
assert len(delegation_records) > 0
def test_performance_based_capability_adjustment(self, system, python_agent,
python_capability, mock_task):
"""Test that capabilities are adjusted based on performance."""
system.register_agent(python_agent, [python_capability])
initial_capabilities = system.hierarchy.get_agent_capabilities(python_agent)
initial_proficiency = initial_capabilities[0].proficiency_level
for i in range(5):
task = Mock(spec=Task)
task.id = f"task_{i}"
task.description = f"Task {i}"
system.complete_task(
agent=python_agent,
task=task,
success=True,
completion_time=1800.0,
quality_score=0.9
)
updated_capabilities = system.hierarchy.get_agent_capabilities(python_agent)
assert len(updated_capabilities) == 1
def test_system_overview_and_recommendations(self, system, python_agent,
analysis_agent, python_capability,
analysis_capability):
"""Test system overview and recommendation generation."""
system.register_agent(python_agent, [python_capability])
system.register_agent(analysis_agent, [analysis_capability])
overview = system.get_system_overview()
assert overview["enabled"] is True
assert overview["total_agents"] == 2
assert "capability_distribution" in overview
assert "system_performance" in overview
recommendations = system.generate_recommendations()
assert isinstance(recommendations, list)
def test_system_enable_disable(self, system, python_agent, python_capability, mock_task):
"""Test enabling and disabling the responsibility system."""
assert system.enabled is True
system.register_agent(python_agent, [python_capability])
requirements = [
TaskRequirement(
capability_name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.5,
weight=1.0
)
]
assignment = system.assign_task_responsibility(mock_task, requirements)
assert assignment is not None
system.disable_system()
assert system.enabled is False
disabled_assignment = system.assign_task_responsibility(mock_task, requirements)
assert disabled_assignment is None
disabled_status = system.get_agent_status(python_agent)
assert disabled_status == {}
system.enable_system()
assert system.enabled is True
enabled_assignment = system.assign_task_responsibility(mock_task, requirements)
assert enabled_assignment is not None
def test_accountability_tracking_integration(self, system, python_agent,
python_capability, mock_task):
"""Test that all operations are properly logged for accountability."""
system.register_agent(python_agent, [python_capability])
registration_records = system.accountability.get_agent_records(
python_agent, action_type="registration"
)
assert len(registration_records) == 1
requirements = [
TaskRequirement(
capability_name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.5,
weight=1.0
)
]
system.assign_task_responsibility(mock_task, requirements)
assignment_records = system.accountability.get_agent_records(
python_agent, action_type="task_assignment"
)
assert len(assignment_records) == 1
system.complete_task(
agent=python_agent,
task=mock_task,
success=True,
completion_time=1800.0,
quality_score=0.9
)
completion_records = system.accountability.get_agent_records(
python_agent, action_type="task_completion"
)
assert len(completion_records) == 1
report = system.accountability.generate_accountability_report(agent=python_agent)
assert report["total_records"] >= 3 # At least registration, assignment, completion
assert "registration" in report["action_counts"]
assert "task_assignment" in report["action_counts"]
assert "task_completion" in report["action_counts"]


@@ -0,0 +1,187 @@
"""
Tests for responsibility tracking data models.
"""
import pytest
from crewai.responsibility.models import (
AgentCapability,
CapabilityType,
ResponsibilityAssignment,
AccountabilityRecord,
PerformanceMetrics,
TaskRequirement
)
class TestAgentCapability:
def test_create_capability(self):
capability = AgentCapability(
name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
proficiency_level=0.8,
confidence_score=0.9,
description="Expert in Python development",
keywords=["python", "programming", "development"]
)
assert capability.name == "Python Programming"
assert capability.capability_type == CapabilityType.TECHNICAL
assert capability.proficiency_level == 0.8
assert capability.confidence_score == 0.9
assert "python" in capability.keywords
def test_update_proficiency(self):
capability = AgentCapability(
name="Data Analysis",
capability_type=CapabilityType.ANALYTICAL,
proficiency_level=0.5,
confidence_score=0.6
)
old_updated = capability.last_updated
capability.update_proficiency(0.7, 0.8)
assert capability.proficiency_level == 0.7
assert capability.confidence_score == 0.8
assert capability.last_updated > old_updated
def test_proficiency_bounds(self):
capability = AgentCapability(
name="Test",
capability_type=CapabilityType.TECHNICAL,
proficiency_level=0.5,
confidence_score=0.5
)
capability.update_proficiency(1.5, 1.2)
assert capability.proficiency_level == 1.0
assert capability.confidence_score == 1.0
capability.update_proficiency(-0.5, -0.2)
assert capability.proficiency_level == 0.0
assert capability.confidence_score == 0.0
class TestResponsibilityAssignment:
def test_create_assignment(self):
assignment = ResponsibilityAssignment(
agent_id="agent_1",
task_id="task_1",
responsibility_score=0.85,
capability_matches=["Python Programming", "Data Analysis"],
reasoning="Best match for technical requirements"
)
assert assignment.agent_id == "agent_1"
assert assignment.task_id == "task_1"
assert assignment.responsibility_score == 0.85
assert len(assignment.capability_matches) == 2
assert assignment.success is None
def test_mark_completed(self):
assignment = ResponsibilityAssignment(
agent_id="agent_1",
task_id="task_1",
responsibility_score=0.85,
reasoning="Test assignment"
)
assert assignment.completed_at is None
assert assignment.success is None
assignment.mark_completed(True)
assert assignment.completed_at is not None
assert assignment.success is True
class TestAccountabilityRecord:
def test_create_record(self):
record = AccountabilityRecord(
agent_id="agent_1",
action_type="task_execution",
action_description="Executed data analysis task",
task_id="task_1",
context={"complexity": "high", "duration": 3600}
)
assert record.agent_id == "agent_1"
assert record.action_type == "task_execution"
assert record.context["complexity"] == "high"
assert record.outcome is None
def test_set_outcome(self):
record = AccountabilityRecord(
agent_id="agent_1",
action_type="decision",
action_description="Chose algorithm X"
)
record.set_outcome("Algorithm performed well", True)
assert record.outcome == "Algorithm performed well"
assert record.success is True
class TestPerformanceMetrics:
def test_create_metrics(self):
metrics = PerformanceMetrics(agent_id="agent_1")
assert metrics.agent_id == "agent_1"
assert metrics.total_tasks == 0
assert metrics.success_rate == 0.0
assert metrics.quality_score == 0.5
def test_update_metrics_success(self):
metrics = PerformanceMetrics(agent_id="agent_1")
metrics.update_metrics(True, 1800, 0.8)
assert metrics.total_tasks == 1
assert metrics.successful_tasks == 1
assert metrics.failed_tasks == 0
assert metrics.success_rate == 1.0
assert metrics.average_completion_time == 1800
assert metrics.reliability_score == 1.0
def test_update_metrics_failure(self):
metrics = PerformanceMetrics(agent_id="agent_1")
metrics.update_metrics(False, 3600)
assert metrics.total_tasks == 1
assert metrics.successful_tasks == 0
assert metrics.failed_tasks == 1
assert metrics.success_rate == 0.0
def test_update_metrics_mixed(self):
metrics = PerformanceMetrics(agent_id="agent_1")
metrics.update_metrics(True, 1800, 0.8)
metrics.update_metrics(False, 3600, 0.3)
metrics.update_metrics(True, 2400, 0.9)
assert metrics.total_tasks == 3
assert metrics.successful_tasks == 2
assert metrics.failed_tasks == 1
assert abs(metrics.success_rate - 2/3) < 0.001
class TestTaskRequirement:
def test_create_requirement(self):
requirement = TaskRequirement(
capability_name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
minimum_proficiency=0.7,
weight=1.5,
keywords=["python", "coding"]
)
assert requirement.capability_name == "Python Programming"
assert requirement.capability_type == CapabilityType.TECHNICAL
assert requirement.minimum_proficiency == 0.7
assert requirement.weight == 1.5
assert "python" in requirement.keywords

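test_proficiency_bounds fixes the clamping behaviour exactly. A minimal sketch
of update_proficiency consistent with it, assuming last_updated is a datetime
field:

from datetime import datetime

def update_proficiency(self, proficiency_level: float, confidence_score: float) -> None:
    # Clamp both scores into [0.0, 1.0] and refresh the timestamp.
    self.proficiency_level = max(0.0, min(1.0, proficiency_level))
    self.confidence_score = max(0.0, min(1.0, confidence_score))
    self.last_updated = datetime.utcnow()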

@@ -0,0 +1,226 @@
"""
Tests for performance-based capability adjustment.
"""
import pytest
from unittest.mock import Mock
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.responsibility.models import AgentCapability, CapabilityType
from crewai.responsibility.hierarchy import CapabilityHierarchy
from crewai.responsibility.performance import PerformanceTracker
class TestPerformanceTracker:
@pytest.fixture
def hierarchy(self):
return CapabilityHierarchy()
@pytest.fixture
def tracker(self, hierarchy):
return PerformanceTracker(hierarchy)
@pytest.fixture
def mock_agent(self, hierarchy):
agent = Mock(spec=BaseAgent)
agent.role = "Test Agent"
capability = AgentCapability(
name="Python Programming",
capability_type=CapabilityType.TECHNICAL,
proficiency_level=0.7,
confidence_score=0.8
)
hierarchy.add_agent(agent, [capability])
return agent
def test_record_task_completion_success(self, tracker, mock_agent):
tracker.record_task_completion(
agent=mock_agent,
task_success=True,
completion_time=1800.0,
quality_score=0.9
)
metrics = tracker.get_performance_metrics(mock_agent)
assert metrics is not None
assert metrics.total_tasks == 1
assert metrics.successful_tasks == 1
assert metrics.failed_tasks == 0
assert metrics.success_rate == 1.0
assert metrics.average_completion_time == 1800.0
assert metrics.quality_score > 0.5 # Should be updated towards 0.9
def test_record_task_completion_failure(self, tracker, mock_agent):
tracker.record_task_completion(
agent=mock_agent,
task_success=False,
completion_time=3600.0,
quality_score=0.3
)
metrics = tracker.get_performance_metrics(mock_agent)
assert metrics is not None
assert metrics.total_tasks == 1
assert metrics.successful_tasks == 0
assert metrics.failed_tasks == 1
assert metrics.success_rate == 0.0
def test_multiple_task_completions(self, tracker, mock_agent):
tracker.record_task_completion(mock_agent, True, 1800.0, 0.8)
tracker.record_task_completion(mock_agent, False, 3600.0, 0.4)
tracker.record_task_completion(mock_agent, True, 2400.0, 0.9)
metrics = tracker.get_performance_metrics(mock_agent)
assert metrics.total_tasks == 3
assert metrics.successful_tasks == 2
assert metrics.failed_tasks == 1
assert abs(metrics.success_rate - 2/3) < 0.001
def test_capability_adjustment_on_success(self, tracker, mock_agent):
initial_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
initial_proficiency = initial_capabilities[0].proficiency_level
tracker.record_task_completion(
agent=mock_agent,
task_success=True,
completion_time=1800.0,
quality_score=0.9,
capability_used="Python Programming"
)
updated_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
updated_proficiency = updated_capabilities[0].proficiency_level
assert updated_proficiency >= initial_proficiency
def test_capability_adjustment_on_failure(self, tracker, mock_agent):
initial_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
initial_proficiency = initial_capabilities[0].proficiency_level
tracker.record_task_completion(
agent=mock_agent,
task_success=False,
completion_time=3600.0,
quality_score=0.2,
capability_used="Python Programming"
)
updated_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
updated_proficiency = updated_capabilities[0].proficiency_level
assert updated_proficiency <= initial_proficiency
def test_adjust_capabilities_based_on_performance(self, tracker, mock_agent):
for _ in range(5):
tracker.record_task_completion(mock_agent, True, 1800.0, 0.9)
for _ in range(2):
tracker.record_task_completion(mock_agent, False, 3600.0, 0.3)
adjustments = tracker.adjust_capabilities_based_on_performance(mock_agent)
assert isinstance(adjustments, list)
def test_get_performance_trends(self, tracker, mock_agent):
tracker.record_task_completion(mock_agent, True, 1800.0, 0.8)
tracker.record_task_completion(mock_agent, True, 2000.0, 0.9)
trends = tracker.get_performance_trends(mock_agent)
assert "success_rate" in trends
assert "quality_score" in trends
assert "efficiency_score" in trends
assert "reliability_score" in trends
assert len(trends["success_rate"]) > 0
def test_identify_improvement_opportunities(self, tracker, mock_agent):
tracker.record_task_completion(mock_agent, False, 7200.0, 0.3) # Long time, low quality
tracker.record_task_completion(mock_agent, False, 6000.0, 0.4)
tracker.record_task_completion(mock_agent, True, 5400.0, 0.5)
opportunities = tracker.identify_improvement_opportunities(mock_agent)
assert isinstance(opportunities, list)
assert len(opportunities) > 0
areas = [opp["area"] for opp in opportunities]
assert "success_rate" in areas or "quality" in areas or "efficiency" in areas
def test_compare_agent_performance(self, tracker, hierarchy):
agent1 = Mock(spec=BaseAgent)
agent1.role = "Agent 1"
agent2 = Mock(spec=BaseAgent)
agent2.role = "Agent 2"
capability = AgentCapability(
name="Test Capability",
capability_type=CapabilityType.TECHNICAL,
proficiency_level=0.7,
confidence_score=0.8
)
hierarchy.add_agent(agent1, [capability])
hierarchy.add_agent(agent2, [capability])
tracker.record_task_completion(agent1, True, 1800.0, 0.9) # Good performance
tracker.record_task_completion(agent1, True, 2000.0, 0.8)
tracker.record_task_completion(agent2, False, 3600.0, 0.4) # Poor performance
tracker.record_task_completion(agent2, True, 4000.0, 0.5)
comparison = tracker.compare_agent_performance([agent1, agent2], metric="overall")
assert len(comparison) == 2
assert comparison[0][1] > comparison[1][1] # First agent should have higher score
success_comparison = tracker.compare_agent_performance([agent1, agent2], metric="success_rate")
assert len(success_comparison) == 2
def test_learning_rate_effect(self, tracker, mock_agent):
original_learning_rate = tracker.learning_rate
tracker.learning_rate = 0.5
initial_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
initial_proficiency = initial_capabilities[0].proficiency_level
tracker.record_task_completion(
mock_agent, True, 1800.0, 0.9, capability_used="Python Programming"
)
high_lr_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
high_lr_proficiency = high_lr_capabilities[0].proficiency_level
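# Restore the original proficiency so the low-rate run starts from the same baseline.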
tracker.hierarchy.update_agent_capability(
mock_agent, "Python Programming", initial_proficiency, 0.8
)
tracker.learning_rate = 0.01
tracker.record_task_completion(
mock_agent, True, 1800.0, 0.9, capability_used="Python Programming"
)
low_lr_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
low_lr_proficiency = low_lr_capabilities[0].proficiency_level
high_lr_change = abs(high_lr_proficiency - initial_proficiency)
low_lr_change = abs(low_lr_proficiency - initial_proficiency)
assert high_lr_change > low_lr_change
tracker.learning_rate = original_learning_rate
def test_performance_metrics_creation(self, tracker, mock_agent):
assert tracker.get_performance_metrics(mock_agent) is None
tracker.record_task_completion(mock_agent, True, 1800.0)
metrics = tracker.get_performance_metrics(mock_agent)
assert metrics is not None
assert metrics.agent_id == tracker._get_agent_id(mock_agent)
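
The learning-rate tests only require that proficiency moves toward the
observed outcome in proportion to tracker.learning_rate. An exponential-
moving-average update of this shape satisfies them; this is an assumption
about the internals, not the committed code:

def _nudge_proficiency(self, capability, task_success: bool, quality_score: float) -> None:
    # Move proficiency toward the quality signal on success and away on
    # failure, scaled by the learning rate; update_proficiency clamps to [0, 1].
    target = quality_score if task_success else min(quality_score, capability.proficiency_level)
    new_level = capability.proficiency_level + self.learning_rate * (
        target - capability.proficiency_level
    )
    capability.update_proficiency(new_level, capability.confidence_score)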