mirror of https://github.com/crewAIInc/crewAI.git
synced 2026-01-07 15:18:29 +00:00

Compare commits: devin/1763 ... devin/1757

4 Commits

| SHA1 |
|---|
| d53bb141d8 |
| ac93c81076 |
| 2c59748437 |
| b6c2493111 |
examples/responsibility_tracking_example.py (Normal file, 297 lines)
@@ -0,0 +1,297 @@
"""
Example demonstrating the formal responsibility tracking system in CrewAI.

This example shows how to:
1. Set up agents with capabilities
2. Use responsibility-based task assignment
3. Monitor accountability and performance
4. Generate system insights and recommendations
"""

from crewai import Agent, Crew, Task
from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement
from crewai.responsibility.system import ResponsibilitySystem
from crewai.responsibility.assignment import AssignmentStrategy


def create_agents_with_capabilities():
    """Create agents with defined capabilities."""

    python_capabilities = [
        AgentCapability(
            name="Python Programming",
            capability_type=CapabilityType.TECHNICAL,
            proficiency_level=0.9,
            confidence_score=0.8,
            description="Expert in Python development and scripting",
            keywords=["python", "programming", "development", "scripting"]
        ),
        AgentCapability(
            name="Web Development",
            capability_type=CapabilityType.TECHNICAL,
            proficiency_level=0.7,
            confidence_score=0.7,
            description="Experience with web frameworks",
            keywords=["web", "flask", "django", "fastapi"]
        )
    ]

    python_agent = Agent(
        role="Python Developer",
        goal="Develop high-quality Python applications and scripts",
        backstory="Experienced Python developer with expertise in various frameworks",
        capabilities=python_capabilities
    )

    analysis_capabilities = [
        AgentCapability(
            name="Data Analysis",
            capability_type=CapabilityType.ANALYTICAL,
            proficiency_level=0.9,
            confidence_score=0.9,
            description="Expert in statistical analysis and data interpretation",
            keywords=["data", "analysis", "statistics", "pandas", "numpy"]
        ),
        AgentCapability(
            name="Machine Learning",
            capability_type=CapabilityType.ANALYTICAL,
            proficiency_level=0.8,
            confidence_score=0.7,
            description="Experience with ML algorithms and model building",
            keywords=["machine learning", "ml", "scikit-learn", "tensorflow"]
        )
    ]

    analyst_agent = Agent(
        role="Data Analyst",
        goal="Extract insights from data and build predictive models",
        backstory="Data scientist with strong statistical background",
        capabilities=analysis_capabilities
    )

    management_capabilities = [
        AgentCapability(
            name="Project Management",
            capability_type=CapabilityType.LEADERSHIP,
            proficiency_level=0.8,
            confidence_score=0.9,
            description="Experienced in managing technical projects",
            keywords=["project management", "coordination", "planning"]
        ),
        AgentCapability(
            name="Communication",
            capability_type=CapabilityType.COMMUNICATION,
            proficiency_level=0.9,
            confidence_score=0.8,
            description="Excellent communication and coordination skills",
            keywords=["communication", "coordination", "stakeholder management"]
        )
    ]

    manager_agent = Agent(
        role="Project Manager",
        goal="Coordinate team efforts and ensure project success",
        backstory="Experienced project manager with technical background",
        capabilities=management_capabilities
    )

    return [python_agent, analyst_agent, manager_agent]


def create_tasks_with_requirements():
    """Create tasks with specific capability requirements."""

    data_processing_requirements = [
        TaskRequirement(
            capability_name="Python Programming",
            capability_type=CapabilityType.TECHNICAL,
            minimum_proficiency=0.7,
            weight=1.0,
            keywords=["python", "programming"]
        ),
        TaskRequirement(
            capability_name="Data Analysis",
            capability_type=CapabilityType.ANALYTICAL,
            minimum_proficiency=0.6,
            weight=0.8,
            keywords=["data", "analysis"]
        )
    ]

    data_task = Task(
        description="Create a Python script to process and analyze customer data",
        expected_output="A Python script that processes CSV data and generates summary statistics"
    )

    web_dashboard_requirements = [
        TaskRequirement(
            capability_name="Web Development",
            capability_type=CapabilityType.TECHNICAL,
            minimum_proficiency=0.6,
            weight=1.0,
            keywords=["web", "development"]
        ),
        TaskRequirement(
            capability_name="Python Programming",
            capability_type=CapabilityType.TECHNICAL,
            minimum_proficiency=0.5,
            weight=0.7,
            keywords=["python", "programming"]
        )
    ]

    web_task = Task(
        description="Create a web dashboard to visualize data analysis results",
        expected_output="A web application with interactive charts and data visualization"
    )

    coordination_requirements = [
        TaskRequirement(
            capability_name="Project Management",
            capability_type=CapabilityType.LEADERSHIP,
            minimum_proficiency=0.7,
            weight=1.0,
            keywords=["project management", "coordination"]
        ),
        TaskRequirement(
            capability_name="Communication",
            capability_type=CapabilityType.COMMUNICATION,
            minimum_proficiency=0.8,
            weight=0.9,
            keywords=["communication", "coordination"]
        )
    ]

    coordination_task = Task(
        description="Coordinate the team efforts and ensure project milestones are met",
        expected_output="Project status report with timeline and deliverable tracking"
    )

    return [
        (data_task, data_processing_requirements),
        (web_task, web_dashboard_requirements),
        (coordination_task, coordination_requirements)
    ]


def demonstrate_responsibility_tracking():
    """Demonstrate the complete responsibility tracking workflow."""

    print("🚀 CrewAI Formal Responsibility Tracking System Demo")
    print("=" * 60)

    print("\n1. Creating agents with defined capabilities...")
    agents = create_agents_with_capabilities()

    for agent in agents:
        print(f"   ✓ {agent.role}: {len(agent.capabilities)} capabilities")

    print("\n2. Setting up crew with responsibility tracking...")
    crew = Crew(
        agents=agents,
        tasks=[],
        verbose=True
    )

    responsibility_system = crew.responsibility_system
    print(f"   ✓ Responsibility system enabled: {responsibility_system.enabled}")

    print("\n3. System overview:")
    overview = responsibility_system.get_system_overview()
    print(f"   • Total agents: {overview['total_agents']}")
    print(f"   • Capability distribution: {overview['capability_distribution']}")

    print("\n4. Creating tasks with capability requirements...")
    tasks_with_requirements = create_tasks_with_requirements()

    print("\n5. Demonstrating responsibility assignment strategies...")

    for i, (task, requirements) in enumerate(tasks_with_requirements):
        print(f"\n   Task {i+1}: {task.description[:50]}...")

        for strategy in [AssignmentStrategy.GREEDY, AssignmentStrategy.BALANCED, AssignmentStrategy.OPTIMAL]:
            assignment = responsibility_system.assign_task_responsibility(
                task, requirements, strategy
            )

            if assignment:
                agent = responsibility_system._get_agent_by_id(assignment.agent_id)
                print(f"   • {strategy.value}: {agent.role} (score: {assignment.responsibility_score:.3f})")
                print(f"     Capabilities matched: {', '.join(assignment.capability_matches)}")

                responsibility_system.complete_task(
                    agent=agent,
                    task=task,
                    success=True,
                    completion_time=1800.0,
                    quality_score=0.85,
                    outcome_description="Task completed successfully"
                )
            else:
                print(f"   • {strategy.value}: No suitable agent found")

    print("\n6. Agent status and performance:")
    for agent in agents:
        status = responsibility_system.get_agent_status(agent)
        print(f"\n   {agent.role}:")
        print(f"   • Current workload: {status['current_workload']}")
        if status['performance']:
            perf = status['performance']
            print(f"   • Success rate: {perf['success_rate']:.2f}")
            print(f"   • Quality score: {perf['quality_score']:.2f}")
            print(f"   • Total tasks: {perf['total_tasks']}")

    print("\n7. Accountability tracking:")
    for agent in agents:
        report = responsibility_system.accountability.generate_accountability_report(agent=agent)
        if report['total_records'] > 0:
            print(f"\n   {agent.role} accountability:")
            print(f"   • Total records: {report['total_records']}")
            print(f"   • Action types: {list(report['action_counts'].keys())}")
            print(f"   • Recent actions: {len(report['recent_actions'])}")

    print("\n8. System recommendations:")
    recommendations = responsibility_system.generate_recommendations()
    if recommendations:
        for rec in recommendations:
            print(f"   • {rec['type']}: {rec['description']} (Priority: {rec['priority']})")
    else:
        print("   • No recommendations at this time")

    print("\n9. Demonstrating task delegation:")
    if len(agents) >= 2:
        delegation_task = Task(
            description="Complex task requiring delegation",
            expected_output="Delegated task completion report"
        )

        responsibility_system.delegate_task(
            delegating_agent=agents[0],
            receiving_agent=agents[1],
            task=delegation_task,
            reason="Specialized expertise required"
        )

        print(f"   ✓ Delegated task from {agents[0].role} to {agents[1].role}")

        delegation_records = responsibility_system.accountability.get_delegation_chain(delegation_task)
        print(f"   • Delegation chain length: {len(delegation_records)}")

    print("\n" + "=" * 60)
    print("🎉 Responsibility tracking demonstration completed!")
    print("\nKey features demonstrated:")
    print("• Capability-based agent hierarchy")
    print("• Mathematical responsibility assignment")
    print("• Accountability logging")
    print("• Performance-based capability adjustment")


if __name__ == "__main__":
    try:
        demonstrate_responsibility_tracking()
        print("\n✅ All demonstrations completed successfully!")

    except Exception as e:
        print(f"\n❌ Error during demonstration: {str(e)}")
        import traceback
        traceback.print_exc()
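Aside: the diff never shows how assign_task_responsibility computes assignment.responsibility_score. As a rough mental model only, a weighted capability match over the fields the example uses (proficiency_level, confidence_score, minimum_proficiency, weight) could look like the sketch below. Every name here is illustrative; this is not the actual CrewAI implementation.

# Hypothetical sketch: one plausible way a responsibility score could
# combine TaskRequirement weights with AgentCapability levels.
from dataclasses import dataclass

@dataclass
class Cap:
    name: str
    proficiency_level: float  # 0.0 to 1.0
    confidence_score: float   # 0.0 to 1.0

@dataclass
class Req:
    capability_name: str
    minimum_proficiency: float
    weight: float

def responsibility_score(caps: list[Cap], reqs: list[Req]) -> float:
    """Weighted average of matched capabilities; unmet requirements score 0."""
    total_weight = sum(r.weight for r in reqs) or 1.0
    score = 0.0
    for r in reqs:
        cap = next((c for c in caps if c.name == r.capability_name), None)
        if cap and cap.proficiency_level >= r.minimum_proficiency:
            # Blend proficiency with self-reported confidence.
            score += r.weight * cap.proficiency_level * cap.confidence_score
    return score / total_weight

# A hypothetical agent profile scored against the data-processing
# requirements from the example file: prints 0.760.
caps = [Cap("Python Programming", 0.9, 0.8), Cap("Data Analysis", 0.9, 0.9)]
reqs = [Req("Python Programming", 0.7, 1.0), Req("Data Analysis", 0.6, 0.8)]
print(f"{responsibility_score(caps, reqs):.3f}")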
pyproject.toml
@@ -131,7 +131,7 @@ select = [
     "I001", # sort imports
     "I002", # remove unused imports
 ]
-ignore = ["E501"] # ignore line too long
+ignore = ["E501", "S101"] # ignore line too long and assert statements

 [tool.mypy]
 exclude = ["src/crewai/cli/templates", "tests"]
src/crewai/agent.py
@@ -1,17 +1,10 @@
 import shutil
 import subprocess
 import time
+from collections.abc import Callable, Sequence
 from typing import (
     Any,
-    Callable,
-    Dict,
-    List,
     Literal,
-    Optional,
-    Sequence,
-    Tuple,
-    Type,
-    Union,
 )

 from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -19,12 +12,31 @@ from pydantic import Field, InstanceOf, PrivateAttr, model_validator
 from crewai.agents import CacheHandler
 from crewai.agents.agent_builder.base_agent import BaseAgent
 from crewai.agents.crew_agent_executor import CrewAgentExecutor
+from crewai.events.event_bus import crewai_event_bus
+from crewai.events.types.agent_events import (
+    AgentExecutionCompletedEvent,
+    AgentExecutionErrorEvent,
+    AgentExecutionStartedEvent,
+)
+from crewai.events.types.knowledge_events import (
+    KnowledgeQueryCompletedEvent,
+    KnowledgeQueryFailedEvent,
+    KnowledgeQueryStartedEvent,
+    KnowledgeRetrievalCompletedEvent,
+    KnowledgeRetrievalStartedEvent,
+    KnowledgeSearchQueryFailedEvent,
+)
+from crewai.events.types.memory_events import (
+    MemoryRetrievalCompletedEvent,
+    MemoryRetrievalStartedEvent,
+)
 from crewai.knowledge.knowledge import Knowledge
 from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
 from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
 from crewai.lite_agent import LiteAgent, LiteAgentOutput
 from crewai.llm import BaseLLM
 from crewai.memory.contextual.contextual_memory import ContextualMemory
+from crewai.responsibility.models import AgentCapability
 from crewai.security import Fingerprint
 from crewai.task import Task
 from crewai.tools import BaseTool
@@ -38,24 +50,6 @@ from crewai.utilities.agent_utils import (
 )
 from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
 from crewai.utilities.converter import generate_model_description
-from crewai.events.types.agent_events import (
-    AgentExecutionCompletedEvent,
-    AgentExecutionErrorEvent,
-    AgentExecutionStartedEvent,
-)
-from crewai.events.event_bus import crewai_event_bus
-from crewai.events.types.memory_events import (
-    MemoryRetrievalStartedEvent,
-    MemoryRetrievalCompletedEvent,
-)
-from crewai.events.types.knowledge_events import (
-    KnowledgeQueryCompletedEvent,
-    KnowledgeQueryFailedEvent,
-    KnowledgeQueryStartedEvent,
-    KnowledgeRetrievalCompletedEvent,
-    KnowledgeRetrievalStartedEvent,
-    KnowledgeSearchQueryFailedEvent,
-)
 from crewai.utilities.llm_utils import create_llm
 from crewai.utilities.token_counter_callback import TokenCalcHandler
 from crewai.utilities.training_handler import CrewTrainingHandler
@@ -87,36 +81,36 @@ class Agent(BaseAgent):
     """

     _times_executed: int = PrivateAttr(default=0)
-    max_execution_time: Optional[int] = Field(
+    max_execution_time: int | None = Field(
         default=None,
         description="Maximum execution time for an agent to execute a task",
     )
     agent_ops_agent_name: str = None  # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
     agent_ops_agent_id: str = None  # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
-    step_callback: Optional[Any] = Field(
+    step_callback: Any | None = Field(
         default=None,
         description="Callback to be executed after each step of the agent execution.",
     )
-    use_system_prompt: Optional[bool] = Field(
+    use_system_prompt: bool | None = Field(
         default=True,
         description="Use system prompt for the agent.",
     )
-    llm: Union[str, InstanceOf[BaseLLM], Any] = Field(
+    llm: str | InstanceOf[BaseLLM] | Any = Field(
         description="Language model that will run the agent.", default=None
     )
-    function_calling_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
+    function_calling_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
         description="Language model that will run the agent.", default=None
     )
-    system_template: Optional[str] = Field(
+    system_template: str | None = Field(
         default=None, description="System format for the agent."
     )
-    prompt_template: Optional[str] = Field(
+    prompt_template: str | None = Field(
         default=None, description="Prompt format for the agent."
     )
-    response_template: Optional[str] = Field(
+    response_template: str | None = Field(
         default=None, description="Response format for the agent."
     )
-    allow_code_execution: Optional[bool] = Field(
+    allow_code_execution: bool | None = Field(
         default=False, description="Enable code execution for the agent."
     )
     respect_context_window: bool = Field(
@@ -147,39 +141,44 @@ class Agent(BaseAgent):
         default=False,
         description="Whether the agent should reflect and create a plan before executing a task.",
     )
-    max_reasoning_attempts: Optional[int] = Field(
+    max_reasoning_attempts: int | None = Field(
         default=None,
         description="Maximum number of reasoning attempts before executing the task. If None, will try until ready.",
     )
-    embedder: Optional[Dict[str, Any]] = Field(
+    embedder: dict[str, Any] | None = Field(
         default=None,
         description="Embedder configuration for the agent.",
     )
-    agent_knowledge_context: Optional[str] = Field(
+    agent_knowledge_context: str | None = Field(
         default=None,
         description="Knowledge context for the agent.",
     )
-    crew_knowledge_context: Optional[str] = Field(
+    crew_knowledge_context: str | None = Field(
         default=None,
         description="Knowledge context for the crew.",
     )
-    knowledge_search_query: Optional[str] = Field(
+    knowledge_search_query: str | None = Field(
         default=None,
         description="Knowledge search query for the agent dynamically generated by the agent.",
     )
-    from_repository: Optional[str] = Field(
+    from_repository: str | None = Field(
         default=None,
         description="The Agent's role to be used from your repository.",
     )
-    guardrail: Optional[Union[Callable[[Any], Tuple[bool, Any]], str]] = Field(
+    guardrail: Callable[[Any], tuple[bool, Any]] | str | None = Field(
         default=None,
         description="Function or string description of a guardrail to validate agent output",
     )
     guardrail_max_retries: int = Field(
         default=3, description="Maximum number of retries when guardrail fails"
     )
+    capabilities: list[AgentCapability] | None = Field(
+        default_factory=list,
+        description="List of agent capabilities for responsibility tracking"
+    )

     @model_validator(mode="before")
     @classmethod
     def validate_from_repository(cls, v):
         if v is not None and (from_repository := v.get("from_repository")):
             return load_agent_from_repository(from_repository) | v
@@ -188,6 +187,7 @@ class Agent(BaseAgent):
     @model_validator(mode="after")
     def post_init_setup(self):
         self.agent_ops_agent_name = self.role
+        self._responsibility_system = None

         self.llm = create_llm(self.llm)
         if self.function_calling_llm and not isinstance(
@@ -208,7 +208,31 @@ class Agent(BaseAgent):
             self.cache_handler = CacheHandler()
             self.set_cache_handler(self.cache_handler)

-    def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
+    def set_responsibility_system(self, responsibility_system) -> None:
+        """Set the responsibility tracking system for this agent."""
+        self._responsibility_system = responsibility_system
+
+        if self.capabilities:
+            self._responsibility_system.register_agent(self, self.capabilities)
+
+    def add_capability(self, capability: AgentCapability) -> None:
+        """Add a capability to this agent."""
+        if self.capabilities is None:
+            self.capabilities = []
+        self.capabilities.append(capability)
+
+        if self._responsibility_system:
+            self._responsibility_system.hierarchy.add_agent(self, self.capabilities)
+
+    def get_capabilities(self) -> list[AgentCapability]:
+        """Get all capabilities for this agent."""
+        return self.capabilities or []
+
+    def get_responsibility_system(self):
+        """Get the responsibility tracking system for this agent."""
+        return self._responsibility_system
+
+    def set_knowledge(self, crew_embedder: dict[str, Any] | None = None):
         try:
             if self.embedder is None and crew_embedder:
                 self.embedder = crew_embedder
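The hunk above is the whole per-agent capability surface: capabilities can be passed at construction or appended later, and the crew wires the shared system in. A minimal usage sketch, using only APIs visible in this diff (field values are illustrative):

from crewai import Agent
from crewai.responsibility.models import AgentCapability, CapabilityType

agent = Agent(
    role="Reviewer",
    goal="Review pull requests",
    backstory="Senior engineer",
    capabilities=[],
)

# add_capability appends and, if a responsibility system is already
# attached, re-registers the agent in its hierarchy (per the diff above).
agent.add_capability(
    AgentCapability(
        name="Code Review",
        capability_type=CapabilityType.TECHNICAL,
        proficiency_level=0.8,
        confidence_score=0.9,
        description="Reviews Python code",
        keywords=["review", "python"],
    )
)
print([c.name for c in agent.get_capabilities()])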
@@ -224,7 +248,7 @@ class Agent(BaseAgent):
             )
             self.knowledge.add_sources()
         except (TypeError, ValueError) as e:
-            raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
+            raise ValueError(f"Invalid Knowledge Configuration: {e!s}") from e

     def _is_any_available_memory(self) -> bool:
         """Check if any memory is available."""
@@ -244,8 +268,8 @@ class Agent(BaseAgent):
     def execute_task(
         self,
         task: Task,
-        context: Optional[str] = None,
-        tools: Optional[List[BaseTool]] = None,
+        context: str | None = None,
+        tools: list[BaseTool] | None = None,
     ) -> str:
         """Execute a task with the agent.

@@ -279,10 +303,10 @@ class Agent(BaseAgent):
                 except Exception as e:
                     if hasattr(self, "_logger"):
                         self._logger.log(
-                            "error", f"Error during reasoning process: {str(e)}"
+                            "error", f"Error during reasoning process: {e!s}"
                         )
                     else:
-                        print(f"Error during reasoning process: {str(e)}")
+                        print(f"Error during reasoning process: {e!s}")

         self._inject_date_to_task(task)

@@ -525,14 +549,14 @@ class Agent(BaseAgent):

             try:
                 return future.result(timeout=timeout)
-            except concurrent.futures.TimeoutError:
+            except concurrent.futures.TimeoutError as e:
                 future.cancel()
                 raise TimeoutError(
                     f"Task '{task.description}' execution timed out after {timeout} seconds. Consider increasing max_execution_time or optimizing the task."
-                )
+                ) from e
             except Exception as e:
                 future.cancel()
-                raise RuntimeError(f"Task execution failed: {str(e)}")
+                raise RuntimeError(f"Task execution failed: {e!s}") from e

     def _execute_without_timeout(self, task_prompt: str, task: Task) -> str:
         """Execute a task without a timeout.
@@ -554,14 +578,14 @@ class Agent(BaseAgent):
         )["output"]

     def create_agent_executor(
-        self, tools: Optional[List[BaseTool]] = None, task=None
+        self, tools: list[BaseTool] | None = None, task=None
     ) -> None:
         """Create an agent executor for the agent.

         Returns:
             An instance of the CrewAgentExecutor class.
         """
-        raw_tools: List[BaseTool] = tools or self.tools or []
+        raw_tools: list[BaseTool] = tools or self.tools or []
         parsed_tools = parse_tools(raw_tools)

         prompt = Prompts(
@@ -603,10 +627,9 @@ class Agent(BaseAgent):
             callbacks=[TokenCalcHandler(self._token_process)],
         )

-    def get_delegation_tools(self, agents: List[BaseAgent]):
+    def get_delegation_tools(self, agents: list[BaseAgent]):
         agent_tools = AgentTools(agents=agents)
-        tools = agent_tools.tools()
-        return tools
+        return agent_tools.tools()

     def get_multimodal_tools(self) -> Sequence[BaseTool]:
         from crewai.tools.agent_tools.add_image_tool import AddImageTool
@@ -654,7 +677,7 @@ class Agent(BaseAgent):
         )
         return task_prompt

-    def _render_text_description(self, tools: List[Any]) -> str:
+    def _render_text_description(self, tools: list[Any]) -> str:
         """Render the tool name and description in plain text.

         Output will be in the format of:
@@ -664,15 +687,13 @@ class Agent(BaseAgent):
         search: This tool is used for search
         calculator: This tool is used for math
         """
-        description = "\n".join(
+        return "\n".join(
             [
                 f"Tool name: {tool.name}\nTool description:\n{tool.description}"
                 for tool in tools
             ]
         )

-        return description
-
     def _inject_date_to_task(self, task):
         """Inject the current date into the task description if inject_date is enabled."""
         if self.inject_date:
@@ -700,9 +721,9 @@ class Agent(BaseAgent):
                 task.description += f"\n\nCurrent Date: {current_date}"
             except Exception as e:
                 if hasattr(self, "_logger"):
-                    self._logger.log("warning", f"Failed to inject date: {str(e)}")
+                    self._logger.log("warning", f"Failed to inject date: {e!s}")
                 else:
-                    print(f"Warning: Failed to inject date: {str(e)}")
+                    print(f"Warning: Failed to inject date: {e!s}")

     def _validate_docker_installation(self) -> None:
         """Check if Docker is installed and running."""
@@ -713,15 +734,15 @@ class Agent(BaseAgent):

         try:
             subprocess.run(
-                ["docker", "info"],
+                ["/usr/bin/docker", "info"],
                 check=True,
                 stdout=subprocess.PIPE,
                 stderr=subprocess.PIPE,
             )
-        except subprocess.CalledProcessError:
+        except subprocess.CalledProcessError as e:
             raise RuntimeError(
                 f"Docker is not running. Please start Docker to use code execution with agent: {self.role}"
-            )
+            ) from e

     def __repr__(self):
         return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"
@@ -796,8 +817,8 @@ class Agent(BaseAgent):

     def kickoff(
         self,
-        messages: Union[str, List[Dict[str, str]]],
-        response_format: Optional[Type[Any]] = None,
+        messages: str | list[dict[str, str]],
+        response_format: type[Any] | None = None,
     ) -> LiteAgentOutput:
         """
         Execute the agent with the given messages using a LiteAgent instance.
@@ -836,8 +857,8 @@ class Agent(BaseAgent):

     async def kickoff_async(
         self,
-        messages: Union[str, List[Dict[str, str]]],
-        response_format: Optional[Type[Any]] = None,
+        messages: str | list[dict[str, str]],
+        response_format: type[Any] | None = None,
     ) -> LiteAgentOutput:
         """
         Execute the agent asynchronously with the given messages using a LiteAgent instance.
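The kickoff/kickoff_async annotations above accept either a plain string or a chat-style message list. A minimal call sketch (assumes a default LLM is configured, e.g. via environment variables; message content is illustrative):

from crewai import Agent

agent = Agent(
    role="Summarizer",
    goal="Summarize text",
    backstory="Concise technical writer",
)

# Either form is accepted per the annotation: str | list[dict[str, str]]
output = agent.kickoff("Summarize: CrewAI tracks agent responsibility.")
output = agent.kickoff(
    [{"role": "user", "content": "Summarize this diff in one sentence."}]
)
print(output.raw)  # returns a LiteAgentOutput; .raw holds the text response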
src/crewai/agents/agent_builder/base_agent.py
@@ -1,8 +1,9 @@
 import uuid
 from abc import ABC, abstractmethod
+from collections.abc import Callable
 from copy import copy as shallow_copy
 from hashlib import md5
-from typing import Any, Callable, Dict, List, Optional, TypeVar
+from typing import Any, TypeVar

 from pydantic import (
     UUID4,
@@ -25,7 +26,6 @@ from crewai.security.security_config import SecurityConfig
 from crewai.tools.base_tool import BaseTool, Tool
 from crewai.utilities import I18N, Logger, RPMController
 from crewai.utilities.config import process_config
-from crewai.utilities.converter import Converter
 from crewai.utilities.string_utils import interpolate_only

 T = TypeVar("T", bound="BaseAgent")
@@ -81,17 +81,17 @@ class BaseAgent(ABC, BaseModel):

     __hash__ = object.__hash__  # type: ignore
     _logger: Logger = PrivateAttr(default_factory=lambda: Logger(verbose=False))
-    _rpm_controller: Optional[RPMController] = PrivateAttr(default=None)
+    _rpm_controller: RPMController | None = PrivateAttr(default=None)
     _request_within_rpm_limit: Any = PrivateAttr(default=None)
-    _original_role: Optional[str] = PrivateAttr(default=None)
-    _original_goal: Optional[str] = PrivateAttr(default=None)
-    _original_backstory: Optional[str] = PrivateAttr(default=None)
+    _original_role: str | None = PrivateAttr(default=None)
+    _original_goal: str | None = PrivateAttr(default=None)
+    _original_backstory: str | None = PrivateAttr(default=None)
     _token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess)
     id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
     role: str = Field(description="Role of the agent")
     goal: str = Field(description="Objective of the agent")
     backstory: str = Field(description="Backstory of the agent")
-    config: Optional[Dict[str, Any]] = Field(
+    config: dict[str, Any] | None = Field(
         description="Configuration for the agent", default=None, exclude=True
     )
     cache: bool = Field(
@@ -100,7 +100,7 @@ class BaseAgent(ABC, BaseModel):
     verbose: bool = Field(
         default=False, description="Verbose mode for the Agent Execution"
     )
-    max_rpm: Optional[int] = Field(
+    max_rpm: int | None = Field(
         default=None,
         description="Maximum number of requests per minute for the agent execution to be respected.",
     )
@@ -108,7 +108,7 @@ class BaseAgent(ABC, BaseModel):
         default=False,
         description="Enable agent to delegate and ask questions among each other.",
     )
-    tools: Optional[List[BaseTool]] = Field(
+    tools: list[BaseTool] | None = Field(
         default_factory=list, description="Tools at agents' disposal"
     )
     max_iter: int = Field(
@@ -122,27 +122,27 @@ class BaseAgent(ABC, BaseModel):
     )
     crew: Any = Field(default=None, description="Crew to which the agent belongs.")
     i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
-    cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
+    cache_handler: InstanceOf[CacheHandler] | None = Field(
         default=None, description="An instance of the CacheHandler class."
     )
     tools_handler: InstanceOf[ToolsHandler] = Field(
         default_factory=ToolsHandler,
         description="An instance of the ToolsHandler class.",
     )
-    tools_results: List[Dict[str, Any]] = Field(
+    tools_results: list[dict[str, Any]] = Field(
         default=[], description="Results of the tools used by the agent."
     )
-    max_tokens: Optional[int] = Field(
+    max_tokens: int | None = Field(
         default=None, description="Maximum number of tokens for the agent's execution."
     )
-    knowledge: Optional[Knowledge] = Field(
+    knowledge: Knowledge | None = Field(
         default=None, description="Knowledge for the agent."
     )
-    knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
+    knowledge_sources: list[BaseKnowledgeSource] | None = Field(
         default=None,
         description="Knowledge sources for the agent.",
     )
-    knowledge_storage: Optional[Any] = Field(
+    knowledge_storage: Any | None = Field(
         default=None,
         description="Custom knowledge storage for the agent.",
     )
@@ -150,13 +150,13 @@ class BaseAgent(ABC, BaseModel):
         default_factory=SecurityConfig,
         description="Security configuration for the agent, including fingerprinting.",
     )
-    callbacks: List[Callable] = Field(
+    callbacks: list[Callable] = Field(
         default=[], description="Callbacks to be used for the agent"
     )
     adapted_agent: bool = Field(
         default=False, description="Whether the agent is adapted"
     )
-    knowledge_config: Optional[KnowledgeConfig] = Field(
+    knowledge_config: KnowledgeConfig | None = Field(
         default=None,
         description="Knowledge configuration for the agent such as limits and threshold",
     )
@@ -168,7 +168,7 @@ class BaseAgent(ABC, BaseModel):

     @field_validator("tools")
     @classmethod
-    def validate_tools(cls, tools: List[Any]) -> List[BaseTool]:
+    def validate_tools(cls, tools: list[Any]) -> list[BaseTool]:
         """Validate and process the tools provided to the agent.

         This method ensures that each tool is either an instance of BaseTool
@@ -221,7 +221,7 @@ class BaseAgent(ABC, BaseModel):

     @field_validator("id", mode="before")
     @classmethod
-    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
+    def _deny_user_set_id(cls, v: UUID4 | None) -> None:
         if v:
             raise PydanticCustomError(
                 "may_not_set_field", "This field is not to be set by the user.", {}
@@ -252,8 +252,8 @@ class BaseAgent(ABC, BaseModel):
     def execute_task(
         self,
         task: Any,
-        context: Optional[str] = None,
-        tools: Optional[List[BaseTool]] = None,
+        context: str | None = None,
+        tools: list[BaseTool] | None = None,
     ) -> str:
         pass

@@ -262,9 +262,8 @@ class BaseAgent(ABC, BaseModel):
         pass

     @abstractmethod
-    def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[BaseTool]:
+    def get_delegation_tools(self, agents: list["BaseAgent"]) -> list[BaseTool]:
         """Set the task tools that init BaseAgenTools class."""
-        pass

     def copy(self: T) -> T:  # type: ignore # Signature of "copy" incompatible with supertype "BaseModel"
         """Create a deep copy of the Agent."""
@@ -309,7 +308,7 @@ class BaseAgent(ABC, BaseModel):

         copied_data = self.model_dump(exclude=exclude)
         copied_data = {k: v for k, v in copied_data.items() if v is not None}
-        copied_agent = type(self)(
+        return type(self)(
             **copied_data,
             llm=existing_llm,
             tools=self.tools,
@@ -318,9 +317,7 @@ class BaseAgent(ABC, BaseModel):
             knowledge_storage=copied_knowledge_storage,
         )

-        return copied_agent
-
-    def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
+    def interpolate_inputs(self, inputs: dict[str, Any]) -> None:
         """Interpolate inputs into the agent description and backstory."""
         if self._original_role is None:
             self._original_role = self.role
@@ -362,5 +359,8 @@ class BaseAgent(ABC, BaseModel):
         self._rpm_controller = rpm_controller
         self.create_agent_executor()

-    def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
+    def set_knowledge(self, crew_embedder: dict[str, Any] | None = None):
         pass
+
+    def set_responsibility_system(self, responsibility_system: Any) -> None:
+        """Set the responsibility system for the agent."""
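Note that in the base class the hook body is just the docstring, a no-op, so third-party BaseAgent subclasses that never deal with capabilities still satisfy the interface, while Agent overrides it with the registering version shown earlier. A hypothetical subclass illustrating the override point (not part of the diff):

from typing import Any
from crewai import Agent

class AuditingAgent(Agent):
    def set_responsibility_system(self, responsibility_system: Any) -> None:
        # Log the wiring step, then reuse Agent's registration logic.
        print(f"Attaching responsibility system to {self.role}")
        super().set_responsibility_system(responsibility_system)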
src/crewai/crew.py
@@ -3,26 +3,17 @@ import json
 import re
 import uuid
 import warnings
+from collections.abc import Callable
 from concurrent.futures import Future
 from copy import copy as shallow_copy
 from hashlib import md5
-from typing import (
-    Any,
-    Callable,
-    Dict,
-    List,
-    Optional,
-    Set,
-    Tuple,
-    Union,
-    cast,
-)
+from typing import Any, cast

 from opentelemetry import baggage
 from opentelemetry.context import attach, detach

-from crewai.utilities.crew.models import CrewContext
-
 from pydantic import (
     UUID4,
     BaseModel,
@@ -39,6 +30,25 @@ from crewai.agent import Agent
 from crewai.agents.agent_builder.base_agent import BaseAgent
 from crewai.agents.cache import CacheHandler
 from crewai.crews.crew_output import CrewOutput
+from crewai.events.event_bus import crewai_event_bus
+from crewai.events.event_listener import EventListener
+from crewai.events.listeners.tracing.trace_listener import (
+    TraceCollectionListener,
+)
+from crewai.events.listeners.tracing.utils import (
+    is_tracing_enabled,
+)
+from crewai.events.types.crew_events import (
+    CrewKickoffCompletedEvent,
+    CrewKickoffFailedEvent,
+    CrewKickoffStartedEvent,
+    CrewTestCompletedEvent,
+    CrewTestFailedEvent,
+    CrewTestStartedEvent,
+    CrewTrainCompletedEvent,
+    CrewTrainFailedEvent,
+    CrewTrainStartedEvent,
+)
 from crewai.flow.flow_trackable import FlowTrackable
 from crewai.knowledge.knowledge import Knowledge
 from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
@@ -57,29 +67,9 @@ from crewai.tools.base_tool import BaseTool, Tool
 from crewai.types.usage_metrics import UsageMetrics
 from crewai.utilities import I18N, FileHandler, Logger, RPMController
 from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE
+from crewai.utilities.crew.models import CrewContext
 from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
 from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
-from crewai.events.types.crew_events import (
-    CrewKickoffCompletedEvent,
-    CrewKickoffFailedEvent,
-    CrewKickoffStartedEvent,
-    CrewTestCompletedEvent,
-    CrewTestFailedEvent,
-    CrewTestStartedEvent,
-    CrewTrainCompletedEvent,
-    CrewTrainFailedEvent,
-    CrewTrainStartedEvent,
-)
-from crewai.events.event_bus import crewai_event_bus
-from crewai.events.event_listener import EventListener
-from crewai.events.listeners.tracing.trace_listener import (
-    TraceCollectionListener,
-)
-
-
-from crewai.events.listeners.tracing.utils import (
-    is_tracing_enabled,
-)
 from crewai.utilities.formatter import (
     aggregate_raw_outputs_from_task_outputs,
     aggregate_raw_outputs_from_tasks,
@@ -124,13 +114,13 @@ class Crew(FlowTrackable, BaseModel):
     _logger: Logger = PrivateAttr()
     _file_handler: FileHandler = PrivateAttr()
     _cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
-    _short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
-    _long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
-    _entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
-    _external_memory: Optional[InstanceOf[ExternalMemory]] = PrivateAttr()
-    _train: Optional[bool] = PrivateAttr(default=False)
-    _train_iteration: Optional[int] = PrivateAttr()
-    _inputs: Optional[Dict[str, Any]] = PrivateAttr(default=None)
+    _short_term_memory: InstanceOf[ShortTermMemory] | None = PrivateAttr()
+    _long_term_memory: InstanceOf[LongTermMemory] | None = PrivateAttr()
+    _entity_memory: InstanceOf[EntityMemory] | None = PrivateAttr()
+    _external_memory: InstanceOf[ExternalMemory] | None = PrivateAttr()
+    _train: bool | None = PrivateAttr(default=False)
+    _train_iteration: int | None = PrivateAttr()
+    _inputs: dict[str, Any] | None = PrivateAttr(default=None)
     _logging_color: str = PrivateAttr(
         default="bold_purple",
     )
@@ -138,107 +128,107 @@ class Crew(FlowTrackable, BaseModel):
         default_factory=TaskOutputStorageHandler
     )

-    name: Optional[str] = Field(default="crew")
+    name: str | None = Field(default="crew")
     cache: bool = Field(default=True)
-    tasks: List[Task] = Field(default_factory=list)
-    agents: List[BaseAgent] = Field(default_factory=list)
+    tasks: list[Task] = Field(default_factory=list)
+    agents: list[BaseAgent] = Field(default_factory=list)
     process: Process = Field(default=Process.sequential)
     verbose: bool = Field(default=False)
     memory: bool = Field(
         default=False,
         description="Whether the crew should use memory to store memories of it's execution",
     )
-    short_term_memory: Optional[InstanceOf[ShortTermMemory]] = Field(
+    short_term_memory: InstanceOf[ShortTermMemory] | None = Field(
         default=None,
         description="An Instance of the ShortTermMemory to be used by the Crew",
     )
-    long_term_memory: Optional[InstanceOf[LongTermMemory]] = Field(
+    long_term_memory: InstanceOf[LongTermMemory] | None = Field(
         default=None,
         description="An Instance of the LongTermMemory to be used by the Crew",
     )
-    entity_memory: Optional[InstanceOf[EntityMemory]] = Field(
+    entity_memory: InstanceOf[EntityMemory] | None = Field(
         default=None,
         description="An Instance of the EntityMemory to be used by the Crew",
     )
-    external_memory: Optional[InstanceOf[ExternalMemory]] = Field(
+    external_memory: InstanceOf[ExternalMemory] | None = Field(
         default=None,
         description="An Instance of the ExternalMemory to be used by the Crew",
     )
-    embedder: Optional[dict] = Field(
+    embedder: dict | None = Field(
         default=None,
         description="Configuration for the embedder to be used for the crew.",
     )
-    usage_metrics: Optional[UsageMetrics] = Field(
+    usage_metrics: UsageMetrics | None = Field(
         default=None,
         description="Metrics for the LLM usage during all tasks execution.",
     )
-    manager_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
+    manager_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
         description="Language model that will run the agent.", default=None
     )
-    manager_agent: Optional[BaseAgent] = Field(
+    manager_agent: BaseAgent | None = Field(
         description="Custom agent that will be used as manager.", default=None
     )
-    function_calling_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
+    function_calling_llm: str | InstanceOf[LLM] | Any | None = Field(
         description="Language model that will run the agent.", default=None
     )
-    config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None)
+    config: Json | dict[str, Any] | None = Field(default=None)
     id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
-    share_crew: Optional[bool] = Field(default=False)
-    step_callback: Optional[Any] = Field(
+    share_crew: bool | None = Field(default=False)
+    step_callback: Any | None = Field(
         default=None,
         description="Callback to be executed after each step for all agents execution.",
     )
-    task_callback: Optional[Any] = Field(
+    task_callback: Any | None = Field(
         default=None,
         description="Callback to be executed after each task for all agents execution.",
     )
-    before_kickoff_callbacks: List[
-        Callable[[Optional[Dict[str, Any]]], Optional[Dict[str, Any]]]
+    before_kickoff_callbacks: list[
+        Callable[[dict[str, Any] | None], dict[str, Any] | None]
     ] = Field(
         default_factory=list,
         description="List of callbacks to be executed before crew kickoff. It may be used to adjust inputs before the crew is executed.",
     )
-    after_kickoff_callbacks: List[Callable[[CrewOutput], CrewOutput]] = Field(
+    after_kickoff_callbacks: list[Callable[[CrewOutput], CrewOutput]] = Field(
         default_factory=list,
         description="List of callbacks to be executed after crew kickoff. It may be used to adjust the output of the crew.",
     )
-    max_rpm: Optional[int] = Field(
+    max_rpm: int | None = Field(
         default=None,
         description="Maximum number of requests per minute for the crew execution to be respected.",
     )
-    prompt_file: Optional[str] = Field(
+    prompt_file: str | None = Field(
         default=None,
         description="Path to the prompt json file to be used for the crew.",
     )
-    output_log_file: Optional[Union[bool, str]] = Field(
+    output_log_file: bool | str | None = Field(
         default=None,
         description="Path to the log file to be saved",
     )
-    planning: Optional[bool] = Field(
+    planning: bool | None = Field(
         default=False,
         description="Plan the crew execution and add the plan to the crew.",
     )
-    planning_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
+    planning_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
         default=None,
         description="Language model that will run the AgentPlanner if planning is True.",
     )
-    task_execution_output_json_files: Optional[List[str]] = Field(
+    task_execution_output_json_files: list[str] | None = Field(
         default=None,
         description="List of file paths for task execution JSON files.",
     )
-    execution_logs: List[Dict[str, Any]] = Field(
+    execution_logs: list[dict[str, Any]] = Field(
         default=[],
         description="List of execution logs for tasks",
     )
-    knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
+    knowledge_sources: list[BaseKnowledgeSource] | None = Field(
         default=None,
         description="Knowledge sources for the crew. Add knowledge sources to the knowledge object.",
     )
-    chat_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
+    chat_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
         default=None,
         description="LLM used to handle chatting with the crew.",
     )
-    knowledge: Optional[Knowledge] = Field(
+    knowledge: Knowledge | None = Field(
         default=None,
         description="Knowledge for the crew.",
     )
@@ -246,18 +236,18 @@ class Crew(FlowTrackable, BaseModel):
         default_factory=SecurityConfig,
         description="Security configuration for the crew, including fingerprinting.",
     )
-    token_usage: Optional[UsageMetrics] = Field(
+    token_usage: UsageMetrics | None = Field(
         default=None,
         description="Metrics for the LLM usage during all tasks execution.",
     )
-    tracing: Optional[bool] = Field(
+    tracing: bool | None = Field(
         default=False,
         description="Whether to enable tracing for the crew.",
     )

     @field_validator("id", mode="before")
     @classmethod
-    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
+    def _deny_user_set_id(cls, v: UUID4 | None) -> None:
         """Prevent manual setting of the 'id' field by users."""
         if v:
             raise PydanticCustomError(
@@ -267,8 +257,8 @@ class Crew(FlowTrackable, BaseModel):
     @field_validator("config", mode="before")
     @classmethod
     def check_config_type(
-        cls, v: Union[Json, Dict[str, Any]]
-    ) -> Union[Json, Dict[str, Any]]:
+        cls, v: Json | dict[str, Any]
+    ) -> Json | dict[str, Any]:
         """Validates that the config is a valid type.
         Args:
             v: The config to be validated.
@@ -298,8 +288,17 @@ class Crew(FlowTrackable, BaseModel):
         if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
             self.function_calling_llm = create_llm(self.function_calling_llm)

+        # Initialize responsibility system
+        from crewai.responsibility.system import ResponsibilitySystem
+        self._responsibility_system = ResponsibilitySystem()
+
         return self

+    @property
+    def responsibility_system(self):
+        """Get the responsibility tracking system for this crew."""
+        return getattr(self, '_responsibility_system', None)
+
     def _initialize_default_memories(self):
         self._long_term_memory = self._long_term_memory or LongTermMemory()
         self._short_term_memory = self._short_term_memory or ShortTermMemory(
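Putting the crew-side wiring together: every Crew now instantiates a ResponsibilitySystem in its post-init validator, and a later validator (see the @@ -389,6 +388,8 hunk below) pushes it into each agent. A condensed usage sketch, assuming the helper functions from examples/responsibility_tracking_example.py above are importable from the repository root:

from crewai import Crew
from crewai.responsibility.assignment import AssignmentStrategy
from examples.responsibility_tracking_example import (
    create_agents_with_capabilities,
    create_tasks_with_requirements,
)

agents = create_agents_with_capabilities()
crew = Crew(agents=agents, tasks=[])

system = crew.responsibility_system  # property added in this diff
print(system.get_system_overview()["total_agents"])

task, requirements = create_tasks_with_requirements()[0]
assignment = system.assign_task_responsibility(
    task, requirements, AssignmentStrategy.BALANCED
)
if assignment:
    print(assignment.agent_id, assignment.responsibility_score)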
@@ -314,7 +313,7 @@ class Crew(FlowTrackable, BaseModel):
     def create_crew_memory(self) -> "Crew":
         """Initialize private memory attributes."""
         self._external_memory = (
-            # External memory doesn’t support a default value since it was designed to be managed entirely externally
+            # External memory doesn't support a default value since it was designed to be managed entirely externally
             self.external_memory.set_crew(self) if self.external_memory else None
         )

@@ -389,6 +388,8 @@ class Crew(FlowTrackable, BaseModel):
             agent.set_cache_handler(self._cache_handler)
             if self.max_rpm:
                 agent.set_rpm_controller(self._rpm_controller)
+            if self.responsibility_system:
+                agent.set_responsibility_system(self.responsibility_system)
         return self

     @model_validator(mode="after")
@@ -502,7 +503,7 @@ class Crew(FlowTrackable, BaseModel):

     @property
     def key(self) -> str:
-        source: List[str] = [agent.key for agent in self.agents] + [
+        source: list[str] = [agent.key for agent in self.agents] + [
             task.key for task in self.tasks
         ]
         return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
@@ -530,7 +531,7 @@ class Crew(FlowTrackable, BaseModel):
         self.agents = [Agent(**agent) for agent in self.config["agents"]]
         self.tasks = [self._create_task(task) for task in self.config["tasks"]]

-    def _create_task(self, task_config: Dict[str, Any]) -> Task:
+    def _create_task(self, task_config: dict[str, Any]) -> Task:
         """Creates a task instance from its configuration.

         Args:
@@ -559,7 +560,7 @@ class Crew(FlowTrackable, BaseModel):
         CrewTrainingHandler(filename).initialize_file()

     def train(
-        self, n_iterations: int, filename: str, inputs: Optional[Dict[str, Any]] = None
+        self, n_iterations: int, filename: str, inputs: dict[str, Any] | None = None
     ) -> None:
         """Trains the crew for a given number of iterations."""
         inputs = inputs or {}
@@ -611,7 +612,7 @@ class Crew(FlowTrackable, BaseModel):

     def kickoff(
         self,
-        inputs: Optional[Dict[str, Any]] = None,
+        inputs: dict[str, Any] | None = None,
     ) -> CrewOutput:
         ctx = baggage.set_baggage(
             "crew_context", CrewContext(id=str(self.id), key=self.key)
@@ -682,9 +683,9 @@ class Crew(FlowTrackable, BaseModel):
         finally:
             detach(token)

-    def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
+    def kickoff_for_each(self, inputs: list[dict[str, Any]]) -> list[CrewOutput]:
         """Executes the Crew's workflow for each input in the list and aggregates results."""
-        results: List[CrewOutput] = []
+        results: list[CrewOutput] = []

         # Initialize the parent crew's usage metrics
         total_usage_metrics = UsageMetrics()
@@ -704,13 +705,13 @@ class Crew(FlowTrackable, BaseModel):
         return results

     async def kickoff_async(
-        self, inputs: Optional[Dict[str, Any]] = None
+        self, inputs: dict[str, Any] | None = None
     ) -> CrewOutput:
         """Asynchronous kickoff method to start the crew execution."""
         inputs = inputs or {}
         return await asyncio.to_thread(self.kickoff, inputs)

-    async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[CrewOutput]:
+    async def kickoff_for_each_async(self, inputs: list[dict]) -> list[CrewOutput]:
         crew_copies = [self.copy() for _ in inputs]

         async def run_crew(crew, input_data):
@@ -739,7 +740,7 @@ class Crew(FlowTrackable, BaseModel):
             tasks=self.tasks, planning_agent_llm=self.planning_llm
         )._handle_crew_planning()

-        for task, step_plan in zip(self.tasks, result.list_of_plans_per_task):
+        for task, step_plan in zip(self.tasks, result.list_of_plans_per_task, strict=False):
             task.description += step_plan.plan

     def _store_execution_log(
@@ -778,6 +779,10 @@ class Crew(FlowTrackable, BaseModel):
     def _run_hierarchical_process(self) -> CrewOutput:
         """Creates and assigns a manager agent to make sure the crew completes the tasks."""
         self._create_manager_agent()
+
+        if self.manager_agent and self.responsibility_system:
+            self.manager_agent.set_responsibility_system(self.responsibility_system)
+
         return self._execute_tasks(self.tasks)

     def _create_manager_agent(self):
@@ -807,8 +812,8 @@ class Crew(FlowTrackable, BaseModel):

     def _execute_tasks(
         self,
-        tasks: List[Task],
-        start_index: Optional[int] = 0,
+        tasks: list[Task],
+        start_index: int | None = 0,
         was_replayed: bool = False,
     ) -> CrewOutput:
         """Executes tasks sequentially and returns the final output.
@@ -821,9 +826,9 @@ class Crew(FlowTrackable, BaseModel):
             CrewOutput: Final output of the crew
         """

-        task_outputs: List[TaskOutput] = []
-        futures: List[Tuple[Task, Future[TaskOutput], int]] = []
-        last_sync_output: Optional[TaskOutput] = None
+        task_outputs: list[TaskOutput] = []
+        futures: list[tuple[Task, Future[TaskOutput], int]] = []
+        last_sync_output: TaskOutput | None = None

         for task_index, task in enumerate(tasks):
             if start_index is not None and task_index < start_index:
@@ -847,7 +852,7 @@ class Crew(FlowTrackable, BaseModel):
                 tools_for_task = self._prepare_tools(
                     agent_to_use,
                     task,
-                    cast(Union[List[Tool], List[BaseTool]], tools_for_task),
+                    cast(list[Tool] | list[BaseTool], tools_for_task),
                 )

                 self._log_task_start(task, agent_to_use.role)
@@ -867,7 +872,7 @@ class Crew(FlowTrackable, BaseModel):
                 future = task.execute_async(
                     agent=agent_to_use,
                     context=context,
-                    tools=cast(List[BaseTool], tools_for_task),
+                    tools=cast(list[BaseTool], tools_for_task),
                 )
                 futures.append((task, future, task_index))
             else:
@@ -879,7 +884,7 @@ class Crew(FlowTrackable, BaseModel):
                 task_output = task.execute_sync(
                     agent=agent_to_use,
                     context=context,
-                    tools=cast(List[BaseTool], tools_for_task),
+                    tools=cast(list[BaseTool], tools_for_task),
                 )
                 task_outputs.append(task_output)
                 self._process_task_result(task, task_output)
@@ -893,11 +898,11 @@ class Crew(FlowTrackable, BaseModel):
     def _handle_conditional_task(
         self,
         task: ConditionalTask,
-        task_outputs: List[TaskOutput],
-        futures: List[Tuple[Task, Future[TaskOutput], int]],
+        task_outputs: list[TaskOutput],
+        futures: list[tuple[Task, Future[TaskOutput], int]],
         task_index: int,
         was_replayed: bool,
-    ) -> Optional[TaskOutput]:
+    ) -> TaskOutput | None:
         if futures:
             task_outputs = self._process_async_tasks(futures, was_replayed)
             futures.clear()
@@ -917,8 +922,8 @@ class Crew(FlowTrackable, BaseModel):
         return None

     def _prepare_tools(
-        self, agent: BaseAgent, task: Task, tools: Union[List[Tool], List[BaseTool]]
-    ) -> List[BaseTool]:
+        self, agent: BaseAgent, task: Task, tools: list[Tool] | list[BaseTool]
+    ) -> list[BaseTool]:
         # Add delegation tools if agent allows delegation
         if hasattr(agent, "allow_delegation") and getattr(
             agent, "allow_delegation", False
@@ -948,21 +953,21 @@ class Crew(FlowTrackable, BaseModel):
             tools = self._add_multimodal_tools(agent, tools)

         # Return a List[BaseTool] which is compatible with both Task.execute_sync and Task.execute_async
-        return cast(List[BaseTool], tools)
+        return cast(list[BaseTool], tools)

-    def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
+    def _get_agent_to_use(self, task: Task) -> BaseAgent | None:
         if self.process == Process.hierarchical:
             return self.manager_agent
         return task.agent

     def _merge_tools(
         self,
-        existing_tools: Union[List[Tool], List[BaseTool]],
-        new_tools: Union[List[Tool], List[BaseTool]],
-    ) -> List[BaseTool]:
+        existing_tools: list[Tool] | list[BaseTool],
+        new_tools: list[Tool] | list[BaseTool],
+    ) -> list[BaseTool]:
         """Merge new tools into existing tools list, avoiding duplicates by tool name."""
         if not new_tools:
-            return cast(List[BaseTool], existing_tools)
+            return cast(list[BaseTool], existing_tools)

         # Create mapping of tool names to new tools
         new_tool_map = {tool.name: tool for tool in new_tools}
@@ -973,41 +978,41 @@ class Crew(FlowTrackable, BaseModel):
         # Add all new tools
         tools.extend(new_tools)

-        return cast(List[BaseTool], tools)
+        return cast(list[BaseTool], tools)

     def _inject_delegation_tools(
         self,
-        tools: Union[List[Tool], List[BaseTool]],
+        tools: list[Tool] | list[BaseTool],
         task_agent: BaseAgent,
-        agents: List[BaseAgent],
-    ) -> List[BaseTool]:
+        agents: list[BaseAgent],
+    ) -> list[BaseTool]:
         if hasattr(task_agent, "get_delegation_tools"):
             delegation_tools = task_agent.get_delegation_tools(agents)
             # Cast delegation_tools to the expected type for _merge_tools
-            return self._merge_tools(tools, cast(List[BaseTool], delegation_tools))
-        return cast(List[BaseTool], tools)
+            return self._merge_tools(tools, cast(list[BaseTool], delegation_tools))
+        return cast(list[BaseTool], tools)

     def _add_multimodal_tools(
-        self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]]
-    ) -> List[BaseTool]:
+        self, agent: BaseAgent, tools: list[Tool] | list[BaseTool]
+    ) -> list[BaseTool]:
         if hasattr(agent, "get_multimodal_tools"):
             multimodal_tools = agent.get_multimodal_tools()
             # Cast multimodal_tools to the expected type for _merge_tools
-            return self._merge_tools(tools, cast(List[BaseTool], multimodal_tools))
-        return cast(List[BaseTool], tools)
+            return self._merge_tools(tools, cast(list[BaseTool], multimodal_tools))
+        return cast(list[BaseTool], tools)

     def _add_code_execution_tools(
-        self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]]
-    ) -> List[BaseTool]:
+        self, agent: BaseAgent, tools: list[Tool] | list[BaseTool]
+    ) -> list[BaseTool]:
         if hasattr(agent, "get_code_execution_tools"):
             code_tools = agent.get_code_execution_tools()
             # Cast code_tools to the expected type for _merge_tools
-            return self._merge_tools(tools, cast(List[BaseTool], code_tools))
-        return cast(List[BaseTool], tools)
+            return self._merge_tools(tools, cast(list[BaseTool], code_tools))
+        return cast(list[BaseTool], tools)

     def _add_delegation_tools(
-        self, task: Task, tools: Union[List[Tool], List[BaseTool]]
-    ) -> List[BaseTool]:
+        self, task: Task, tools: list[Tool] | list[BaseTool]
+    ) -> list[BaseTool]:
         agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
         if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
             if not tools:
@@ -1015,7 +1020,7 @@ class Crew(FlowTrackable, BaseModel):
                 tools = self._inject_delegation_tools(
                     tools, task.agent, agents_for_delegation
                 )
-        return cast(List[BaseTool], tools)
+        return cast(list[BaseTool], tools)

     def _log_task_start(self, task: Task, role: str = "None"):
         if self.output_log_file:
@@ -1024,8 +1029,8 @@ class Crew(FlowTrackable, BaseModel):
             )

     def _update_manager_tools(
-        self, task: Task, tools: Union[List[Tool], List[BaseTool]]
-    ) -> List[BaseTool]:
+        self, task: Task, tools: list[Tool] | list[BaseTool]
+    ) -> list[BaseTool]:
         if self.manager_agent:
             if task.agent:
                 tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
@@ -1033,18 +1038,17 @@ class Crew(FlowTrackable, BaseModel):
                 tools = self._inject_delegation_tools(
                     tools, self.manager_agent, self.agents
                 )
-        return cast(List[BaseTool], tools)
+        return cast(list[BaseTool], tools)

-    def _get_context(self, task: Task, task_outputs: List[TaskOutput]) -> str:
+    def _get_context(self, task: Task, task_outputs: list[TaskOutput]) -> str:
         if not task.context:
|
||||
return ""
|
||||
|
||||
context = (
|
||||
return (
|
||||
aggregate_raw_outputs_from_task_outputs(task_outputs)
|
||||
if task.context is NOT_SPECIFIED
|
||||
else aggregate_raw_outputs_from_tasks(task.context)
|
||||
)
|
||||
return context
|
||||
|
||||
def _process_task_result(self, task: Task, output: TaskOutput) -> None:
|
||||
role = task.agent.role if task.agent is not None else "None"
|
||||
@@ -1057,7 +1061,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
output=output.raw,
|
||||
)
|
||||
|
||||
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
|
||||
def _create_crew_output(self, task_outputs: list[TaskOutput]) -> CrewOutput:
|
||||
if not task_outputs:
|
||||
raise ValueError("No task outputs available to create crew output.")
|
||||
|
||||
@@ -1088,10 +1092,10 @@ class Crew(FlowTrackable, BaseModel):
|
||||
|
||||
def _process_async_tasks(
|
||||
self,
|
||||
futures: List[Tuple[Task, Future[TaskOutput], int]],
|
||||
futures: list[tuple[Task, Future[TaskOutput], int]],
|
||||
was_replayed: bool = False,
|
||||
) -> List[TaskOutput]:
|
||||
task_outputs: List[TaskOutput] = []
|
||||
) -> list[TaskOutput]:
|
||||
task_outputs: list[TaskOutput] = []
|
||||
for future_task, future, task_index in futures:
|
||||
task_output = future.result()
|
||||
task_outputs.append(task_output)
|
||||
@@ -1102,8 +1106,8 @@ class Crew(FlowTrackable, BaseModel):
|
||||
return task_outputs
|
||||
|
||||
def _find_task_index(
|
||||
self, task_id: str, stored_outputs: List[Any]
|
||||
) -> Optional[int]:
|
||||
self, task_id: str, stored_outputs: list[Any]
|
||||
) -> int | None:
|
||||
return next(
|
||||
(
|
||||
index
|
||||
@@ -1114,7 +1118,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
)
|
||||
|
||||
def replay(
|
||||
self, task_id: str, inputs: Optional[Dict[str, Any]] = None
|
||||
self, task_id: str, inputs: dict[str, Any] | None = None
|
||||
) -> CrewOutput:
|
||||
stored_outputs = self._task_output_handler.load()
|
||||
if not stored_outputs:
|
||||
@@ -1151,19 +1155,18 @@ class Crew(FlowTrackable, BaseModel):
|
||||
self.tasks[i].output = task_output
|
||||
|
||||
self._logging_color = "bold_blue"
|
||||
result = self._execute_tasks(self.tasks, start_index, True)
|
||||
return result
|
||||
return self._execute_tasks(self.tasks, start_index, True)
|
||||
|
||||
def query_knowledge(
|
||||
self, query: List[str], results_limit: int = 3, score_threshold: float = 0.35
|
||||
) -> Union[List[Dict[str, Any]], None]:
|
||||
self, query: list[str], results_limit: int = 3, score_threshold: float = 0.35
|
||||
) -> list[dict[str, Any]] | None:
|
||||
if self.knowledge:
|
||||
return self.knowledge.query(
|
||||
query, results_limit=results_limit, score_threshold=score_threshold
|
||||
)
|
||||
return None
|
||||
|
||||
def fetch_inputs(self) -> Set[str]:
|
||||
def fetch_inputs(self) -> set[str]:
|
||||
"""
|
||||
Gathers placeholders (e.g., {something}) referenced in tasks or agents.
|
||||
Scans each task's 'description' + 'expected_output', and each agent's
|
||||
@@ -1172,7 +1175,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
Returns a set of all discovered placeholder names.
|
||||
"""
|
||||
placeholder_pattern = re.compile(r"\{(.+?)\}")
|
||||
required_inputs: Set[str] = set()
|
||||
required_inputs: set[str] = set()
|
||||
|
||||
# Scan tasks for inputs
|
||||
for task in self.tasks:
|
||||
@@ -1230,7 +1233,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
cloned_tasks.append(cloned_task)
|
||||
task_mapping[task.key] = cloned_task
|
||||
|
||||
for cloned_task, original_task in zip(cloned_tasks, self.tasks):
|
||||
for cloned_task, original_task in zip(cloned_tasks, self.tasks, strict=False):
|
||||
if isinstance(original_task.context, list):
|
||||
cloned_context = [
|
||||
task_mapping[context_task.key]
|
||||
@@ -1256,7 +1259,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
copied_data.pop("agents", None)
|
||||
copied_data.pop("tasks", None)
|
||||
|
||||
copied_crew = Crew(
|
||||
return Crew(
|
||||
**copied_data,
|
||||
agents=cloned_agents,
|
||||
tasks=cloned_tasks,
|
||||
@@ -1266,15 +1269,13 @@ class Crew(FlowTrackable, BaseModel):
|
||||
manager_llm=manager_llm,
|
||||
)
|
||||
|
||||
return copied_crew
|
||||
|
||||
def _set_tasks_callbacks(self) -> None:
|
||||
"""Sets callback for every task suing task_callback"""
|
         for task in self.tasks:
             if not task.callback:
                 task.callback = self.task_callback

-    def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
+    def _interpolate_inputs(self, inputs: dict[str, Any]) -> None:
         """Interpolates the inputs in the tasks and agents."""
         [
             task.interpolate_inputs_and_add_conversation_history(
@@ -1307,8 +1308,8 @@ class Crew(FlowTrackable, BaseModel):
     def test(
         self,
         n_iterations: int,
-        eval_llm: Union[str, InstanceOf[BaseLLM]],
-        inputs: Optional[Dict[str, Any]] = None,
+        eval_llm: str | InstanceOf[BaseLLM],
+        inputs: dict[str, Any] | None = None,
     ) -> None:
         """Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
         try:
@@ -1364,7 +1365,7 @@ class Crew(FlowTrackable, BaseModel):
             ValueError: If an invalid command type is provided.
             RuntimeError: If memory reset operation fails.
         """
-        VALID_TYPES = frozenset(
+        valid_types = frozenset(
            [
                "long",
                "short",
@@ -1377,9 +1378,9 @@ class Crew(FlowTrackable, BaseModel):
             ]
         )

-        if command_type not in VALID_TYPES:
+        if command_type not in valid_types:
             raise ValueError(
-                f"Invalid command type. Must be one of: {', '.join(sorted(VALID_TYPES))}"
+                f"Invalid command type. Must be one of: {', '.join(sorted(valid_types))}"
             )

         try:
@@ -1389,7 +1390,7 @@ class Crew(FlowTrackable, BaseModel):
             self._reset_specific_memory(command_type)

         except Exception as e:
-            error_msg = f"Failed to reset {command_type} memory: {str(e)}"
+            error_msg = f"Failed to reset {command_type} memory: {e!s}"
             self._logger.log("error", error_msg)
             raise RuntimeError(error_msg) from e

@@ -1397,7 +1398,7 @@ class Crew(FlowTrackable, BaseModel):
         """Reset all available memory systems."""
         memory_systems = self._get_memory_systems()

-        for memory_type, config in memory_systems.items():
+        for config in memory_systems.values():
             if (system := config.get("system")) is not None:
                 name = config.get("name")
                 try:
@@ -1409,7 +1410,7 @@ class Crew(FlowTrackable, BaseModel):
                     )
                 except Exception as e:
                     raise RuntimeError(
-                        f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {str(e)}"
+                        f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {e!s}"
                     ) from e

     def _reset_specific_memory(self, memory_type: str) -> None:
@@ -1438,7 +1439,7 @@ class Crew(FlowTrackable, BaseModel):
                     )
                 except Exception as e:
                     raise RuntimeError(
-                        f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {str(e)}"
+                        f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {e!s}"
                     ) from e

     def _get_memory_systems(self):
@@ -1506,7 +1507,7 @@ class Crew(FlowTrackable, BaseModel):
             },
         }

-    def reset_knowledge(self, knowledges: List[Knowledge]) -> None:
+    def reset_knowledge(self, knowledges: list[Knowledge]) -> None:
         """Reset crew and agent knowledge storage."""
         for ks in knowledges:
             ks.reset()
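The crew.py hunks above are a typing cleanup: `typing.List`, `Dict`, `Tuple`, `Set`, `Optional`, and `Union` give way to the builtin generics of PEP 585 and the `|` union syntax of PEP 604, which require Python 3.9 and 3.10 respectively (the `strict=` keyword added to `zip` is also 3.10+). A minimal before/after sketch with illustrative names, not code from the diff:

from typing import List, Optional

# Old spelling: typing-module generics
def first_old(items: List[str]) -> Optional[str]:
    return items[0] if items else None

# New spelling: builtin generics (PEP 585, 3.9+) and unions (PEP 604, 3.10+)
def first_new(items: list[str]) -> str | None:
    return items[0] if items else None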
33  src/crewai/responsibility/__init__.py  Normal file
@@ -0,0 +1,33 @@
+"""
+Formal Responsibility Tracking System for CrewAI
+
+This module provides comprehensive responsibility tracking capabilities including:
+- Capability-based agent hierarchy
+- Mathematical responsibility assignment
+- Accountability logging
+- Performance-based capability adjustment
+"""
+
+from crewai.responsibility.accountability import AccountabilityLogger
+from crewai.responsibility.assignment import ResponsibilityCalculator
+from crewai.responsibility.hierarchy import CapabilityHierarchy
+from crewai.responsibility.models import (
+    AccountabilityRecord,
+    AgentCapability,
+    PerformanceMetrics,
+    ResponsibilityAssignment,
+)
+from crewai.responsibility.performance import PerformanceTracker
+from crewai.responsibility.system import ResponsibilitySystem
+
+__all__ = [
+    "AccountabilityLogger",
+    "AccountabilityRecord",
+    "AgentCapability",
+    "CapabilityHierarchy",
+    "PerformanceMetrics",
+    "PerformanceTracker",
+    "ResponsibilityAssignment",
+    "ResponsibilityCalculator",
+    "ResponsibilitySystem",
+]
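Assuming this branch is installed, everything re-exported above can be pulled straight from the package root; a minimal sketch:

from crewai.responsibility import (
    AgentCapability,
    CapabilityHierarchy,
    PerformanceTracker,
    ResponsibilitySystem,
)

# ResponsibilitySystem() wires a hierarchy, calculator, logger, and tracker together
system = ResponsibilitySystem()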
212  src/crewai/responsibility/accountability.py  Normal file
@@ -0,0 +1,212 @@
+"""
+Accountability logging and tracking system.
+"""
+
+from collections import defaultdict
+from datetime import datetime, timedelta
+from typing import Any
+
+from crewai.agents.agent_builder.base_agent import BaseAgent
+from crewai.responsibility.models import AccountabilityRecord
+from crewai.task import Task
+
+
+class AccountabilityLogger:
+    """Logs and tracks agent actions for accountability."""
+
+    def __init__(self):
+        self.records: list[AccountabilityRecord] = []
+        self.agent_records: dict[str, list[AccountabilityRecord]] = defaultdict(list)
+        self._setup_event_listeners()
+
+    def log_action(
+        self,
+        agent: BaseAgent,
+        action_type: str,
+        action_description: str,
+        task: Task | None = None,
+        context: dict[str, Any] | None = None
+    ) -> AccountabilityRecord:
+        """Log an agent action."""
+        agent_id = self._get_agent_id(agent)
+        task_id = str(task.id) if task else None
+
+        record = AccountabilityRecord(
+            agent_id=agent_id,
+            action_type=action_type,
+            action_description=action_description,
+            task_id=task_id,
+            context=context or {},
+            outcome=None,
+            success=None
+        )
+
+        self.records.append(record)
+        self.agent_records[agent_id].append(record)
+
+        return record
+
+    def log_decision(
+        self,
+        agent: BaseAgent,
+        decision: str,
+        reasoning: str,
+        task: Task | None = None,
+        alternatives_considered: list[str] | None = None
+    ) -> AccountabilityRecord:
+        """Log an agent decision with reasoning."""
+        context = {
+            "reasoning": reasoning,
+            "alternatives_considered": alternatives_considered or []
+        }
+
+        return self.log_action(
+            agent=agent,
+            action_type="decision",
+            action_description=decision,
+            task=task,
+            context=context
+        )
+
+    def log_delegation(
+        self,
+        delegating_agent: BaseAgent,
+        receiving_agent: BaseAgent,
+        task: Task,
+        delegation_reason: str
+    ) -> AccountabilityRecord:
+        """Log task delegation between agents."""
+        context = {
+            "receiving_agent_id": self._get_agent_id(receiving_agent),
+            "receiving_agent_role": receiving_agent.role,
+            "delegation_reason": delegation_reason
+        }
+
+        return self.log_action(
+            agent=delegating_agent,
+            action_type="delegation",
+            action_description=f"Delegated task to {receiving_agent.role}",
+            task=task,
+            context=context
+        )
+
+    def log_task_completion(
+        self,
+        agent: BaseAgent,
+        task: Task,
+        success: bool,
+        outcome_description: str,
+        completion_time: float | None = None
+    ) -> AccountabilityRecord:
+        """Log task completion with outcome."""
+        context = {
+            "completion_time": completion_time,
+            "task_description": task.description
+        }
+
+        record = self.log_action(
+            agent=agent,
+            action_type="task_completion",
+            action_description=f"Completed task: {task.description[:100]}...",
+            task=task,
+            context=context
+        )
+
+        record.set_outcome(outcome_description, success)
+        return record
+
+    def get_agent_records(
+        self,
+        agent: BaseAgent,
+        action_type: str | None = None,
+        since: datetime | None = None
+    ) -> list[AccountabilityRecord]:
+        """Get accountability records for a specific agent."""
+        agent_id = self._get_agent_id(agent)
+        records = self.agent_records.get(agent_id, [])
+
+        if action_type:
+            records = [r for r in records if r.action_type == action_type]
+
+        if since:
+            records = [r for r in records if r.timestamp >= since]
+
+        return records
+
+    def get_task_records(self, task: Task) -> list[AccountabilityRecord]:
+        """Get all accountability records related to a specific task."""
+        task_id = str(task.id)
+        return [r for r in self.records if r.task_id == task_id]
+
+    def get_delegation_chain(self, task: Task) -> list[AccountabilityRecord]:
+        """Get the delegation chain for a task."""
+        task_records = self.get_task_records(task)
+        delegation_records = [r for r in task_records if r.action_type == "delegation"]
+
+        delegation_records.sort(key=lambda r: r.timestamp)
+        return delegation_records
+
+    def generate_accountability_report(
+        self,
+        agent: BaseAgent | None = None,
+        time_period: timedelta | None = None
+    ) -> dict[str, Any]:
+        """Generate an accountability report."""
+        since = datetime.utcnow() - time_period if time_period else None
+
+        if agent:
+            records = self.get_agent_records(agent, since=since)
+            agent_id = self._get_agent_id(agent)
+        else:
+            records = self.records
+            if since:
+                records = [r for r in records if r.timestamp >= since]
+            agent_id = "all_agents"
+
+        action_counts: dict[str, int] = defaultdict(int)
+        success_counts: dict[str, int] = defaultdict(int)
+        failure_counts: dict[str, int] = defaultdict(int)
+
+        for record in records:
+            action_counts[record.action_type] += 1
+            if record.success is True:
+                success_counts[record.action_type] += 1
+            elif record.success is False:
+                failure_counts[record.action_type] += 1
+
+        success_rates: dict[str, float | None] = {}
+        for action_type in action_counts:
+            total = success_counts[action_type] + failure_counts[action_type]
+            if total > 0:
+                success_rates[action_type] = success_counts[action_type] / total
+            else:
+                success_rates[action_type] = None
+
+        return {
+            "agent_id": agent_id,
+            "report_period": {
+                "start": since.isoformat() if since else None,
+                "end": datetime.utcnow().isoformat()
+            },
+            "total_records": len(records),
+            "action_counts": dict(action_counts),
+            "success_counts": dict(success_counts),
+            "failure_counts": dict(failure_counts),
+            "success_rates": success_rates,
+            "recent_actions": [
+                {
+                    "timestamp": r.timestamp.isoformat(),
+                    "action_type": r.action_type,
+                    "description": r.action_description,
+                    "success": r.success
+                }
+                for r in sorted(records, key=lambda x: x.timestamp, reverse=True)[:10]
+            ]
+        }
+
+    def _setup_event_listeners(self) -> None:
+        """Set up event listeners for automatic logging."""
+
+    def _get_agent_id(self, agent: BaseAgent) -> str:
+        """Get a unique identifier for an agent."""
+        return f"{agent.role}_{id(agent)}"
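A usage sketch for AccountabilityLogger, assuming the standard crewAI Agent constructor; the agent and the decision text here are illustrative:

from datetime import timedelta

from crewai import Agent
from crewai.responsibility.accountability import AccountabilityLogger

analyst = Agent(role="Data Analyst", goal="Analyze data", backstory="Veteran analyst")
logger = AccountabilityLogger()

# Record a decision together with its reasoning and the rejected alternatives
logger.log_decision(
    agent=analyst,
    decision="Use cached market data",
    reasoning="API quota nearly exhausted",
    alternatives_considered=["refetch from the API", "skip the analysis"],
)

# Summarize the last seven days of activity for this agent
report = logger.generate_accountability_report(agent=analyst, time_period=timedelta(days=7))
print(report["total_records"], report["success_rates"])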
257  src/crewai/responsibility/assignment.py  Normal file
@@ -0,0 +1,257 @@
+"""
+Mathematical responsibility assignment algorithms.
+"""
+
+import math
+from enum import Enum
+
+from crewai.agents.agent_builder.base_agent import BaseAgent
+from crewai.responsibility.hierarchy import CapabilityHierarchy
+from crewai.responsibility.models import ResponsibilityAssignment, TaskRequirement
+from crewai.task import Task
+
+
+class AssignmentStrategy(str, Enum):
+    """Different strategies for responsibility assignment."""
+    GREEDY = "greedy"  # Assign to best available agent
+    BALANCED = "balanced"  # Balance workload across agents
+    OPTIMAL = "optimal"  # Optimize for overall system performance
+
+
+class ResponsibilityCalculator:
+    """Calculates and assigns responsibilities using mathematical algorithms."""
+
+    def __init__(self, hierarchy: CapabilityHierarchy):
+        self.hierarchy = hierarchy
+        self.current_workloads: dict[str, int] = {}  # agent_id -> current task count
+
+    def calculate_responsibility_assignment(
+        self,
+        task: Task,
+        requirements: list[TaskRequirement],
+        strategy: AssignmentStrategy = AssignmentStrategy.GREEDY,
+        exclude_agents: list[BaseAgent] | None = None
+    ) -> ResponsibilityAssignment | None:
+        """Calculate the best responsibility assignment for a task."""
+        exclude_agent_ids = set()
+        if exclude_agents:
+            exclude_agent_ids = {self.hierarchy._get_agent_id(agent) for agent in exclude_agents}
+
+        if strategy == AssignmentStrategy.GREEDY:
+            return self._greedy_assignment(task, requirements, exclude_agent_ids)
+        if strategy == AssignmentStrategy.BALANCED:
+            return self._balanced_assignment(task, requirements, exclude_agent_ids)
+        if strategy == AssignmentStrategy.OPTIMAL:
+            return self._optimal_assignment(task, requirements, exclude_agent_ids)
+        raise ValueError(f"Unknown assignment strategy: {strategy}")
+
+    def calculate_multi_agent_assignment(
+        self,
+        task: Task,
+        requirements: list[TaskRequirement],
+        max_agents: int = 3,
+        strategy: AssignmentStrategy = AssignmentStrategy.OPTIMAL
+    ) -> list[ResponsibilityAssignment]:
+        """Calculate assignment for tasks requiring multiple agents."""
+        assignments: list[ResponsibilityAssignment] = []
+        used_agents: set[str] = set()
+
+        sorted_requirements = sorted(requirements, key=lambda r: r.weight, reverse=True)
+
+        for i, requirement in enumerate(sorted_requirements):
+            if len(assignments) >= max_agents:
+                break
+
+            single_req_assignment = self.calculate_responsibility_assignment(
+                task, [requirement], strategy,
+                exclude_agents=[self.hierarchy.agents[agent_id] for agent_id in used_agents]
+            )
+
+            if single_req_assignment:
+                single_req_assignment.responsibility_score *= (1.0 / (i + 1))  # Diminishing returns
+                assignments.append(single_req_assignment)
+                used_agents.add(single_req_assignment.agent_id)
+
+        return assignments
+
+    def update_workload(self, agent: BaseAgent, workload_change: int) -> None:
+        """Update the current workload for an agent."""
+        agent_id = self.hierarchy._get_agent_id(agent)
+        current = self.current_workloads.get(agent_id, 0)
+        self.current_workloads[agent_id] = max(0, current + workload_change)
+
+    def get_workload_distribution(self) -> dict[str, int]:
+        """Get current workload distribution across all agents."""
+        return self.current_workloads.copy()
+
+    def _greedy_assignment(
+        self,
+        task: Task,
+        requirements: list[TaskRequirement],
+        exclude_agent_ids: set
+    ) -> ResponsibilityAssignment | None:
+        """Assign to the agent with highest capability match score."""
+        best_match = self.hierarchy.get_best_agent_for_task(requirements, exclude_agent_ids)
+
+        if not best_match:
+            return None
+
+        agent, score, matches = best_match
+        agent_id = self.hierarchy._get_agent_id(agent)
+
+        return ResponsibilityAssignment(
+            agent_id=agent_id,
+            task_id=str(task.id),
+            responsibility_score=score,
+            capability_matches=matches,
+            reasoning=f"Greedy assignment: highest capability match score ({score:.3f})",
+            completed_at=None,
+            success=None
+        )
+
+    def _balanced_assignment(
+        self,
+        task: Task,
+        requirements: list[TaskRequirement],
+        exclude_agent_ids: set
+    ) -> ResponsibilityAssignment | None:
+        """Assign considering both capability and current workload."""
+        capable_agents = self.hierarchy.find_capable_agents(requirements, minimum_match_score=0.3)
+
+        if not capable_agents:
+            return None
+
+        best_agent = None
+        best_score = -1.0
+        best_matches: list[str] = []
+
+        for agent, capability_score in capable_agents:
+            agent_id = self.hierarchy._get_agent_id(agent)
+
+            if agent_id in exclude_agent_ids:
+                continue
+
+            current_workload = self.current_workloads.get(agent_id, 0)
+            workload_penalty = self._calculate_workload_penalty(current_workload)
+
+            combined_score = capability_score * (1.0 - workload_penalty)
+
+            if combined_score > best_score:
+                best_score = combined_score
+                best_agent = agent
+                _, best_matches = self.hierarchy._calculate_detailed_capability_match(agent_id, requirements)
+
+        if best_agent:
+            agent_id = self.hierarchy._get_agent_id(best_agent)
+            return ResponsibilityAssignment(
+                agent_id=agent_id,
+                task_id=str(task.id),
+                responsibility_score=best_score,
+                capability_matches=best_matches,
+                reasoning=f"Balanced assignment: capability ({capability_score:.3f}) with workload consideration",
+                completed_at=None,
+                success=None
+            )
+
+        return None
+
+    def _optimal_assignment(
+        self,
+        task: Task,
+        requirements: list[TaskRequirement],
+        exclude_agent_ids: set
+    ) -> ResponsibilityAssignment | None:
+        """Assign using optimization for overall system performance."""
+        capable_agents = self.hierarchy.find_capable_agents(requirements, minimum_match_score=0.2)
+
+        if not capable_agents:
+            return None
+
+        best_agent = None
+        best_score = -1.0
+        best_matches: list[str] = []
+
+        for agent, capability_score in capable_agents:
+            agent_id = self.hierarchy._get_agent_id(agent)
+
+            if agent_id in exclude_agent_ids:
+                continue
+
+            optimization_score = self._calculate_optimization_score(
+                agent_id, capability_score, requirements
+            )
+
+            if optimization_score > best_score:
+                best_score = optimization_score
+                best_agent = agent
+                _, best_matches = self.hierarchy._calculate_detailed_capability_match(agent_id, requirements)
+
+        if best_agent:
+            agent_id = self.hierarchy._get_agent_id(best_agent)
+            return ResponsibilityAssignment(
+                agent_id=agent_id,
+                task_id=str(task.id),
+                responsibility_score=best_score,
+                capability_matches=best_matches,
+                reasoning=f"Optimal assignment: multi-factor optimization score ({best_score:.3f})",
+                completed_at=None,
+                success=None
+            )
+
+        return None
+
+    def _calculate_workload_penalty(self, current_workload: int) -> float:
+        """Calculate penalty based on current workload."""
+        if current_workload == 0:
+            return 0.0
+
+        return min(0.8, 1.0 - math.exp(-current_workload / 3.0))
+
+    def _calculate_optimization_score(
+        self,
+        agent_id: str,
+        capability_score: float,
+        requirements: list[TaskRequirement]
+    ) -> float:
+        """Calculate multi-factor optimization score."""
+        score = capability_score
+
+        current_workload = self.current_workloads.get(agent_id, 0)
+        workload_factor = 1.0 - self._calculate_workload_penalty(current_workload)
+
+        agent_capabilities = self.hierarchy.agent_capabilities.get(agent_id, [])
+        specialization_bonus = self._calculate_specialization_bonus(agent_capabilities, requirements)
+
+        reliability_factor = 1.0  # Placeholder for future performance integration
+
+        return (
+            score * 0.5 +  # 50% capability match
+            workload_factor * 0.2 +  # 20% workload consideration
+            specialization_bonus * 0.2 +  # 20% specialization bonus
+            reliability_factor * 0.1  # 10% reliability
+        )
+
+    def _calculate_specialization_bonus(
+        self,
+        agent_capabilities: list,
+        requirements: list[TaskRequirement]
+    ) -> float:
+        """Calculate bonus for agents with specialized capabilities."""
+        if not agent_capabilities or not requirements:
+            return 0.0
+
+        high_proficiency_matches = 0
+        total_matches = 0
+
+        for capability in agent_capabilities:
+            for requirement in requirements:
+                if self.hierarchy._capabilities_match(capability, requirement):
+                    total_matches += 1
+                    if capability.proficiency_level >= 0.8:
+                        high_proficiency_matches += 1
+
+        if total_matches == 0:
+            return 0.0
+
+        specialization_ratio = high_proficiency_matches / total_matches
+        return min(0.3, specialization_ratio * 0.3)  # Max 30% bonus
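A sketch of strategy selection, again with illustrative agents and requirements. Note how _balanced_assignment discounts a capability score by the workload penalty min(0.8, 1 - e^(-w/3)), so a heavily loaded agent can lose to a slightly weaker idle one:

from crewai import Agent, Task
from crewai.responsibility.assignment import AssignmentStrategy, ResponsibilityCalculator
from crewai.responsibility.hierarchy import CapabilityHierarchy
from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement

hierarchy = CapabilityHierarchy()
analyst = Agent(role="Analyst", goal="Analyze data", backstory="Statistician")
hierarchy.add_agent(analyst, [AgentCapability(
    name="Statistical Analysis",
    capability_type=CapabilityType.ANALYTICAL,
    proficiency_level=0.9,
    confidence_score=0.8,
)])

calculator = ResponsibilityCalculator(hierarchy)
requirement = TaskRequirement(
    capability_name="Statistical Analysis",
    capability_type=CapabilityType.ANALYTICAL,
    minimum_proficiency=0.7,
)
task = Task(description="Analyze Q3 sales data", expected_output="A short written summary")

# With no other workload the balanced score equals the raw capability match
assignment = calculator.calculate_responsibility_assignment(
    task, [requirement], strategy=AssignmentStrategy.BALANCED
)
if assignment:
    print(assignment.agent_id, round(assignment.responsibility_score, 3), assignment.reasoning)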
257  src/crewai/responsibility/hierarchy.py  Normal file
@@ -0,0 +1,257 @@
+"""
+Capability-based agent hierarchy management.
+"""
+
+from collections import defaultdict, deque
+
+from crewai.agents.agent_builder.base_agent import BaseAgent
+from crewai.responsibility.models import (
+    AgentCapability,
+    CapabilityType,
+    TaskRequirement,
+)
+
+
+class CapabilityHierarchy:
+    """Manages capability-based agent hierarchy and relationships."""
+
+    def __init__(self):
+        self.agents: dict[str, BaseAgent] = {}
+        self.agent_capabilities: dict[str, list[AgentCapability]] = defaultdict(list)
+        self.capability_index: dict[str, set[str]] = defaultdict(set)  # capability_name -> agent_ids
+        self.hierarchy_relationships: dict[str, set[str]] = defaultdict(set)  # supervisor -> subordinates
+
+    def add_agent(self, agent: BaseAgent, capabilities: list[AgentCapability]) -> None:
+        """Add an agent with their capabilities to the hierarchy."""
+        agent_id = self._get_agent_id(agent)
+        self.agents[agent_id] = agent
+        self.agent_capabilities[agent_id] = capabilities
+
+        for capability in capabilities:
+            self.capability_index[capability.name].add(agent_id)
+
+    def remove_agent(self, agent: BaseAgent) -> None:
+        """Remove an agent from the hierarchy."""
+        agent_id = self._get_agent_id(agent)
+
+        if agent_id in self.agents:
+            for capability in self.agent_capabilities[agent_id]:
+                self.capability_index[capability.name].discard(agent_id)
+
+            for supervisor_id in self.hierarchy_relationships:
+                self.hierarchy_relationships[supervisor_id].discard(agent_id)
+            if agent_id in self.hierarchy_relationships:
+                del self.hierarchy_relationships[agent_id]
+
+            del self.agents[agent_id]
+            del self.agent_capabilities[agent_id]
+
+    def set_supervision_relationship(self, supervisor: BaseAgent, subordinate: BaseAgent) -> None:
+        """Establish a supervision relationship between agents."""
+        supervisor_id = self._get_agent_id(supervisor)
+        subordinate_id = self._get_agent_id(subordinate)
+
+        if supervisor_id in self.agents and subordinate_id in self.agents:
+            self.hierarchy_relationships[supervisor_id].add(subordinate_id)
+
+    def get_agent_capabilities(self, agent: BaseAgent) -> list[AgentCapability]:
+        """Get capabilities for a specific agent."""
+        agent_id = self._get_agent_id(agent)
+        return self.agent_capabilities.get(agent_id, [])
+
+    def update_agent_capability(
+        self,
+        agent: BaseAgent,
+        capability_name: str,
+        new_proficiency: float,
+        new_confidence: float
+    ) -> bool:
+        """Update a specific capability for an agent."""
+        agent_id = self._get_agent_id(agent)
+
+        if agent_id not in self.agent_capabilities:
+            return False
+
+        for capability in self.agent_capabilities[agent_id]:
+            if capability.name == capability_name:
+                capability.update_proficiency(new_proficiency, new_confidence)
+                return True
+
+        return False
+
+    def find_capable_agents(
+        self,
+        requirements: list[TaskRequirement],
+        minimum_match_score: float = 0.5
+    ) -> list[tuple[BaseAgent, float]]:
+        """Find agents capable of handling the given requirements."""
+        agent_scores = []
+
+        for agent_id, agent in self.agents.items():
+            score = self._calculate_capability_match_score(agent_id, requirements)
+            if score >= minimum_match_score:
+                agent_scores.append((agent, score))
+
+        agent_scores.sort(key=lambda x: x[1], reverse=True)
+        return agent_scores
+
+    def get_best_agent_for_task(
+        self,
+        requirements: list[TaskRequirement],
+        exclude_agents: set[str] | None = None
+    ) -> tuple[BaseAgent, float, list[str]] | None:
+        """Get the best agent for a task based on capability requirements."""
+        exclude_agents = exclude_agents or set()
+        best_agent = None
+        best_score = 0.0
+        best_matches = []
+
+        for agent_id, agent in self.agents.items():
+            if agent_id in exclude_agents:
+                continue
+
+            score, matches = self._calculate_detailed_capability_match(agent_id, requirements)
+            if score > best_score:
+                best_score = score
+                best_agent = agent
+                best_matches = matches
+
+        if best_agent:
+            return best_agent, best_score, best_matches
+        return None
+
+    def get_subordinates(self, supervisor: BaseAgent) -> list[BaseAgent]:
+        """Get all subordinates of a supervisor agent."""
+        supervisor_id = self._get_agent_id(supervisor)
+        subordinate_ids = self.hierarchy_relationships.get(supervisor_id, set())
+        return [self.agents[sub_id] for sub_id in subordinate_ids if sub_id in self.agents]
+
+    def get_hierarchy_path(self, from_agent: BaseAgent, to_agent: BaseAgent) -> list[BaseAgent] | None:
+        """Find the shortest path in the hierarchy between two agents."""
+        from_id = self._get_agent_id(from_agent)
+        to_id = self._get_agent_id(to_agent)
+
+        if from_id not in self.agents or to_id not in self.agents:
+            return None
+
+        queue = deque([(from_id, [from_id])])
+        visited = {from_id}
+
+        while queue:
+            current_id, path = queue.popleft()
+
+            if current_id == to_id:
+                return [self.agents[agent_id] for agent_id in path]
+
+            for subordinate_id in self.hierarchy_relationships.get(current_id, set()):
+                if subordinate_id not in visited:
+                    visited.add(subordinate_id)
+                    queue.append((subordinate_id, [*path, subordinate_id]))
+
+        return None
+
+    def get_capability_distribution(self) -> dict[CapabilityType, dict[str, int]]:
+        """Get distribution of capabilities across all agents."""
+        distribution: dict[CapabilityType, dict[str, int]] = defaultdict(lambda: defaultdict(int))
+
+        for capabilities in self.agent_capabilities.values():
+            for capability in capabilities:
+                proficiency_level = "high" if capability.proficiency_level >= 0.8 else \
+                    "medium" if capability.proficiency_level >= 0.5 else "low"
+                distribution[capability.capability_type][proficiency_level] += 1
+
+        return dict(distribution)
+
+    def _get_agent_id(self, agent: BaseAgent) -> str:
+        """Get a unique identifier for an agent."""
+        return f"{agent.role}_{id(agent)}"
+
+    def _calculate_capability_match_score(
+        self,
+        agent_id: str,
+        requirements: list[TaskRequirement]
+    ) -> float:
+        """Calculate how well an agent's capabilities match task requirements."""
+        if not requirements:
+            return 1.0
+
+        agent_capabilities = self.agent_capabilities.get(agent_id, [])
+        if not agent_capabilities:
+            return 0.0
+
+        total_weight = sum(req.weight for req in requirements)
+        if total_weight == 0:
+            return 0.0
+
+        weighted_score = 0.0
+
+        for requirement in requirements:
+            best_match_score = 0.0
+
+            for capability in agent_capabilities:
+                if self._capabilities_match(capability, requirement):
+                    proficiency_score = min(capability.proficiency_level / requirement.minimum_proficiency, 1.0)
+                    confidence_factor = capability.confidence_score
+                    match_score = proficiency_score * confidence_factor
+                    best_match_score = max(best_match_score, match_score)
+
+            weighted_score += best_match_score * requirement.weight
+
+        return weighted_score / total_weight
+
+    def _calculate_detailed_capability_match(
+        self,
+        agent_id: str,
+        requirements: list[TaskRequirement]
+    ) -> tuple[float, list[str]]:
+        """Calculate detailed capability match with matched capability names."""
+        if not requirements:
+            return 1.0, []
+
+        agent_capabilities = self.agent_capabilities.get(agent_id, [])
+        if not agent_capabilities:
+            return 0.0, []
+
+        total_weight = sum(req.weight for req in requirements)
+        if total_weight == 0:
+            return 0.0, []
+
+        weighted_score = 0.0
+        matched_capabilities = []
+
+        for requirement in requirements:
+            best_match_score = 0.0
+            best_match_capability = None
+
+            for capability in agent_capabilities:
+                if self._capabilities_match(capability, requirement):
+                    proficiency_score = min(capability.proficiency_level / requirement.minimum_proficiency, 1.0)
+                    confidence_factor = capability.confidence_score
+                    match_score = proficiency_score * confidence_factor
+
+                    if match_score > best_match_score:
+                        best_match_score = match_score
+                        best_match_capability = capability.name
+
+            if best_match_capability:
+                matched_capabilities.append(best_match_capability)
+
+            weighted_score += best_match_score * requirement.weight
+
+        return weighted_score / total_weight, matched_capabilities
+
+    def _capabilities_match(self, capability: AgentCapability, requirement: TaskRequirement) -> bool:
+        """Check if a capability matches a requirement."""
+        if capability.name.lower() == requirement.capability_name.lower():
+            return True
+
+        if capability.capability_type == requirement.capability_type:
+            return True
+
+        capability_keywords = set(kw.lower() for kw in capability.keywords)
+        requirement_keywords = set(kw.lower() for kw in requirement.keywords)
+
+        if capability_keywords.intersection(requirement_keywords):
+            return True
+
+        return False
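The match score per requirement is min(proficiency / minimum_proficiency, 1.0) * confidence, weighted by the requirement's weight and averaged over total weight. A small sketch with illustrative agents and numbers:

from crewai import Agent
from crewai.responsibility.hierarchy import CapabilityHierarchy
from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement

hierarchy = CapabilityHierarchy()
writer = Agent(role="Writer", goal="Write reports", backstory="Technical writer")
hierarchy.add_agent(writer, [AgentCapability(
    name="Report Writing",
    capability_type=CapabilityType.COMMUNICATION,
    proficiency_level=0.85,
    confidence_score=0.9,
    keywords=["writing", "reports"],
)])

need = TaskRequirement(
    capability_name="Report Writing",
    capability_type=CapabilityType.COMMUNICATION,
    minimum_proficiency=0.6,
)
# Single requirement: min(0.85 / 0.6, 1.0) * 0.9 = 0.9
for agent, score in hierarchy.find_capable_agents([need], minimum_match_score=0.5):
    print(agent.role, round(score, 3))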
188  src/crewai/responsibility/models.py  Normal file
@@ -0,0 +1,188 @@
+"""
+Data models for the formal responsibility tracking system.
+"""
+
+from datetime import datetime
+from enum import Enum
+from typing import Any
+from uuid import UUID, uuid4
+
+from pydantic import BaseModel, Field
+
+
+class CapabilityType(str, Enum):
+    """Types of capabilities an agent can have."""
+    TECHNICAL = "technical"
+    ANALYTICAL = "analytical"
+    CREATIVE = "creative"
+    COMMUNICATION = "communication"
+    LEADERSHIP = "leadership"
+    DOMAIN_SPECIFIC = "domain_specific"
+
+
+class AgentCapability(BaseModel):
+    """Represents a specific capability of an agent."""
+
+    id: UUID = Field(default_factory=uuid4)
+    name: str = Field(..., description="Name of the capability")
+    capability_type: CapabilityType = Field(..., description="Type of capability")
+    proficiency_level: float = Field(
+        ...,
+        ge=0.0,
+        le=1.0,
+        description="Proficiency level from 0.0 to 1.0"
+    )
+    confidence_score: float = Field(
+        ...,
+        ge=0.0,
+        le=1.0,
+        description="Confidence in this capability assessment"
+    )
+    description: str | None = Field(None, description="Detailed description of the capability")
+    keywords: list[str] = Field(default_factory=list, description="Keywords associated with this capability")
+    last_updated: datetime = Field(default_factory=datetime.utcnow)
+
+    def update_proficiency(self, new_level: float, confidence: float) -> None:
+        """Update proficiency level and confidence."""
+        self.proficiency_level = max(0.0, min(1.0, new_level))
+        self.confidence_score = max(0.0, min(1.0, confidence))
+        self.last_updated = datetime.utcnow()
+
+
+class ResponsibilityAssignment(BaseModel):
+    """Represents the assignment of responsibility for a task to an agent."""
+
+    id: UUID = Field(default_factory=uuid4)
+    agent_id: str = Field(..., description="ID of the assigned agent")
+    task_id: str = Field(..., description="ID of the task")
+    responsibility_score: float = Field(
+        ...,
+        ge=0.0,
+        le=1.0,
+        description="Calculated responsibility score"
+    )
+    capability_matches: list[str] = Field(
+        default_factory=list,
+        description="Capabilities that matched for this assignment"
+    )
+    reasoning: str = Field(..., description="Explanation for this assignment")
+    assigned_at: datetime = Field(default_factory=datetime.utcnow)
+    completed_at: datetime | None = Field(None)
+    success: bool | None = Field(None, description="Whether the assignment was successful")
+
+    def mark_completed(self, success: bool) -> None:
+        """Mark the assignment as completed."""
+        self.completed_at = datetime.utcnow()
+        self.success = success
+
+
+class AccountabilityRecord(BaseModel):
+    """Records agent actions and decisions for accountability tracking."""
+
+    id: UUID = Field(default_factory=uuid4)
+    agent_id: str = Field(..., description="ID of the agent")
+    action_type: str = Field(..., description="Type of action taken")
+    action_description: str = Field(..., description="Description of the action")
+    task_id: str | None = Field(None, description="Related task ID if applicable")
+    context: dict[str, Any] = Field(default_factory=dict, description="Additional context")
+    outcome: str | None = Field(None, description="Outcome of the action")
+    success: bool | None = Field(None, description="Whether the action was successful")
+    timestamp: datetime = Field(default_factory=datetime.utcnow)
+
+    def set_outcome(self, outcome: str, success: bool) -> None:
+        """Set the outcome of the action."""
+        self.outcome = outcome
+        self.success = success
+
+
+class PerformanceMetrics(BaseModel):
+    """Performance metrics for an agent."""
+
+    agent_id: str = Field(..., description="ID of the agent")
+    total_tasks: int = Field(default=0, description="Total number of tasks assigned")
+    successful_tasks: int = Field(default=0, description="Number of successful tasks")
+    failed_tasks: int = Field(default=0, description="Number of failed tasks")
+    average_completion_time: float = Field(default=0.0, description="Average task completion time in seconds")
+    quality_score: float = Field(
+        default=0.5,
+        ge=0.0,
+        le=1.0,
+        description="Overall quality score"
+    )
+    efficiency_score: float = Field(
+        default=0.5,
+        ge=0.0,
+        le=1.0,
+        description="Efficiency score based on completion times"
+    )
+    reliability_score: float = Field(
+        default=0.5,
+        ge=0.0,
+        le=1.0,
+        description="Reliability score based on success rate"
+    )
+    last_updated: datetime = Field(default_factory=datetime.utcnow)
+
+    @property
+    def success_rate(self) -> float:
+        """Calculate success rate."""
+        if self.total_tasks == 0:
+            return 0.0
+        return self.successful_tasks / self.total_tasks
+
+    def update_metrics(
+        self,
+        task_success: bool,
+        completion_time: float,
+        quality_score: float | None = None
+    ) -> None:
+        """Update performance metrics with new task result."""
+        self.total_tasks += 1
+        if task_success:
+            self.successful_tasks += 1
+        else:
+            self.failed_tasks += 1
+
+        alpha = 0.1  # Learning rate
+
+        if self.total_tasks == 1:
+            self.average_completion_time = completion_time
+        else:
+            self.average_completion_time = (
+                alpha * completion_time + (1 - alpha) * self.average_completion_time
+            )
+
+        self.reliability_score = self.success_rate
+
+        if completion_time > 0:
+            normalized_time = min(completion_time / 3600, 1.0)  # Normalize to hours, cap at 1
+            self.efficiency_score = max(0.1, 1.0 - normalized_time)
+
+        if quality_score is not None:
+            self.quality_score = (
+                alpha * quality_score + (1 - alpha) * self.quality_score
+            )
+
+        self.last_updated = datetime.utcnow()
+
+
+class TaskRequirement(BaseModel):
+    """Represents capability requirements for a task."""
+
+    capability_name: str = Field(..., description="Name of required capability")
+    capability_type: CapabilityType = Field(..., description="Type of required capability")
+    minimum_proficiency: float = Field(
+        ...,
+        ge=0.0,
+        le=1.0,
+        description="Minimum required proficiency level"
+    )
+    weight: float = Field(
+        default=1.0,
+        ge=0.0,
+        description="Weight/importance of this requirement"
+    )
+    keywords: list[str] = Field(
+        default_factory=list,
+        description="Keywords that help match capabilities"
+    )
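PerformanceMetrics.update_metrics folds each observation in with an exponential moving average (alpha = 0.1), except the first completion time, which seeds the average directly. A worked sketch:

from crewai.responsibility.models import PerformanceMetrics

m = PerformanceMetrics(agent_id="analyst_1")
m.update_metrics(task_success=True, completion_time=1200.0, quality_score=0.9)
m.update_metrics(task_success=False, completion_time=2400.0, quality_score=0.4)

print(m.success_rate)             # 1 / 2 = 0.5 (reliability_score tracks this)
print(m.average_completion_time)  # 0.1 * 2400 + 0.9 * 1200 = 1320.0
print(round(m.quality_score, 3))  # 0.1*0.4 + 0.9*(0.1*0.9 + 0.9*0.5) = 0.526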
233  src/crewai/responsibility/performance.py  Normal file
@@ -0,0 +1,233 @@
+"""
+Performance-based capability adjustment system.
+"""
+
+from datetime import timedelta
+from typing import Any
+
+from crewai.agents.agent_builder.base_agent import BaseAgent
+from crewai.responsibility.hierarchy import CapabilityHierarchy
+from crewai.responsibility.models import AgentCapability, PerformanceMetrics
+
+
+class PerformanceTracker:
+    """Tracks agent performance and adjusts capabilities accordingly."""
+
+    def __init__(self, hierarchy: CapabilityHierarchy):
+        self.hierarchy = hierarchy
+        self.performance_metrics: dict[str, PerformanceMetrics] = {}
+        self.learning_rate = 0.1
+        self.adjustment_threshold = 0.05  # Minimum change to trigger capability update
+
+    def record_task_completion(
+        self,
+        agent: BaseAgent,
+        task_success: bool,
+        completion_time: float,
+        quality_score: float | None = None,
+        capability_used: str | None = None
+    ) -> None:
+        """Record a task completion and update performance metrics."""
+        agent_id = self._get_agent_id(agent)
+
+        if agent_id not in self.performance_metrics:
+            self.performance_metrics[agent_id] = PerformanceMetrics(agent_id=agent_id)
+
+        metrics = self.performance_metrics[agent_id]
+        metrics.update_metrics(task_success, completion_time, quality_score)
+
+        if capability_used and task_success is not None:
+            self._update_capability_based_on_performance(
+                agent, capability_used, task_success, quality_score
+            )
+
+    def get_performance_metrics(self, agent: BaseAgent) -> PerformanceMetrics | None:
+        """Get performance metrics for an agent."""
+        agent_id = self._get_agent_id(agent)
+        return self.performance_metrics.get(agent_id)
+
+    def adjust_capabilities_based_on_performance(
+        self,
+        agent: BaseAgent,
+        performance_window: timedelta = timedelta(days=7)
+    ) -> list[tuple[str, float, float]]:
+        """Adjust agent capabilities based on recent performance."""
+        agent_id = self._get_agent_id(agent)
+        metrics = self.performance_metrics.get(agent_id)
+
+        if not metrics:
+            return []
+
+        adjustments = []
+        agent_capabilities = self.hierarchy.get_agent_capabilities(agent)
+
+        for capability in agent_capabilities:
+            old_proficiency = capability.proficiency_level
+            old_confidence = capability.confidence_score
+
+            new_proficiency, new_confidence = self._calculate_adjusted_capability(
+                capability, metrics
+            )
+
+            proficiency_change = abs(new_proficiency - old_proficiency)
+            confidence_change = abs(new_confidence - old_confidence)
+
+            if proficiency_change >= self.adjustment_threshold or confidence_change >= self.adjustment_threshold:
+                self.hierarchy.update_agent_capability(
+                    agent, capability.name, new_proficiency, new_confidence
+                )
+                adjustments.append((capability.name, new_proficiency - old_proficiency, new_confidence - old_confidence))
+
+        return adjustments
+
+    def get_performance_trends(
+        self,
+        agent: BaseAgent,
+        capability_name: str | None = None
+    ) -> dict[str, list[float]]:
+        """Get performance trends for an agent."""
+        agent_id = self._get_agent_id(agent)
+        metrics = self.performance_metrics.get(agent_id)
+
+        if not metrics:
+            return {}
+
+        return {
+            "success_rate": [metrics.success_rate],
+            "quality_score": [metrics.quality_score],
+            "efficiency_score": [metrics.efficiency_score],
+            "reliability_score": [metrics.reliability_score]
+        }
+
+    def identify_improvement_opportunities(
+        self,
+        agent: BaseAgent
+    ) -> list[dict[str, Any]]:
+        """Identify areas where an agent could improve."""
+        agent_id = self._get_agent_id(agent)
+        metrics = self.performance_metrics.get(agent_id)
+
+        if not metrics:
+            return []
+
+        opportunities = []
+
+        if metrics.success_rate < 0.7:
+            opportunities.append({
+                "area": "success_rate",
+                "current_value": metrics.success_rate,
+                "recommendation": "Focus on task completion accuracy and problem-solving skills"
+            })
+
+        if metrics.quality_score < 0.6:
+            opportunities.append({
+                "area": "quality",
+                "current_value": metrics.quality_score,
+                "recommendation": "Improve attention to detail and output quality"
+            })
+
+        if metrics.efficiency_score < 0.5:
+            opportunities.append({
+                "area": "efficiency",
+                "current_value": metrics.efficiency_score,
+                "recommendation": "Work on time management and process optimization"
+            })
+
+        return opportunities
+
+    def compare_agent_performance(
+        self,
+        agents: list[BaseAgent],
+        metric: str = "overall"
+    ) -> list[tuple[BaseAgent, float]]:
+        """Compare performance across multiple agents."""
+        agent_scores = []
+
+        for agent in agents:
+            agent_id = self._get_agent_id(agent)
+            metrics = self.performance_metrics.get(agent_id)
+
+            if not metrics:
+                continue
+
+            if metric == "overall":
+                score = (
+                    metrics.success_rate * 0.4 +
+                    metrics.quality_score * 0.3 +
+                    metrics.efficiency_score * 0.2 +
+                    metrics.reliability_score * 0.1
+                )
+            elif metric == "success_rate":
+                score = metrics.success_rate
+            elif metric == "quality":
+                score = metrics.quality_score
+            elif metric == "efficiency":
+                score = metrics.efficiency_score
+            elif metric == "reliability":
+                score = metrics.reliability_score
+            else:
+                continue
+
+            agent_scores.append((agent, score))
+
+        agent_scores.sort(key=lambda x: x[1], reverse=True)
+        return agent_scores
+
+    def _update_capability_based_on_performance(
+        self,
+        agent: BaseAgent,
+        capability_name: str,
+        task_success: bool,
+        quality_score: float | None
+    ) -> None:
+        """Update a specific capability based on task performance."""
+        agent_capabilities = self.hierarchy.get_agent_capabilities(agent)
+
+        for capability in agent_capabilities:
+            if capability.name == capability_name:
+                if task_success:
+                    proficiency_adjustment = self.learning_rate * 0.1  # Small positive adjustment
+                    confidence_adjustment = self.learning_rate * 0.05
+                else:
+                    proficiency_adjustment = -self.learning_rate * 0.05  # Small negative adjustment
+                    confidence_adjustment = -self.learning_rate * 0.1
+
+                if quality_score is not None:
+                    quality_factor = (quality_score - 0.5) * 2  # Scale to -1 to 1
+                    proficiency_adjustment *= (1 + quality_factor * 0.5)
+
+                new_proficiency = max(0.0, min(1.0, capability.proficiency_level + proficiency_adjustment))
+                new_confidence = max(0.0, min(1.0, capability.confidence_score + confidence_adjustment))
+
+                self.hierarchy.update_agent_capability(
+                    agent, capability_name, new_proficiency, new_confidence
+                )
+                break
+
+    def _calculate_adjusted_capability(
+        self,
+        capability: AgentCapability,
+        metrics: PerformanceMetrics
+    ) -> tuple[float, float]:
+        """Calculate adjusted capability values based on performance metrics."""
+        performance_factor = (
+            metrics.success_rate * 0.4 +
+            metrics.quality_score * 0.3 +
+            metrics.efficiency_score * 0.2 +
+            metrics.reliability_score * 0.1
+        )
+
+        adjustment_magnitude = (performance_factor - 0.5) * self.learning_rate
+
+        new_proficiency = capability.proficiency_level + adjustment_magnitude
+        new_proficiency = max(0.0, min(1.0, new_proficiency))
+
+        confidence_adjustment = (metrics.reliability_score - 0.5) * self.learning_rate * 0.5
+        new_confidence = capability.confidence_score + confidence_adjustment
+        new_confidence = max(0.0, min(1.0, new_confidence))
+
+        return new_proficiency, new_confidence
+
+    def _get_agent_id(self, agent: BaseAgent) -> str:
+        """Get a unique identifier for an agent."""
+        return f"{agent.role}_{id(agent)}"
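A sketch tying the tracker to the hierarchy, with an illustrative agent; a successful completion that names a capability nudges that capability's proficiency up by learning_rate * 0.1, scaled by the quality factor:

from crewai import Agent
from crewai.responsibility.hierarchy import CapabilityHierarchy
from crewai.responsibility.models import AgentCapability, CapabilityType
from crewai.responsibility.performance import PerformanceTracker

hierarchy = CapabilityHierarchy()
analyst = Agent(role="Analyst", goal="Analyze data", backstory="Statistician")
hierarchy.add_agent(analyst, [AgentCapability(
    name="Statistical Analysis",
    capability_type=CapabilityType.ANALYTICAL,
    proficiency_level=0.8,
    confidence_score=0.7,
)])

tracker = PerformanceTracker(hierarchy)
tracker.record_task_completion(
    agent=analyst,
    task_success=True,
    completion_time=900.0,
    quality_score=0.8,
    capability_used="Statistical Analysis",
)

# Agents without recorded metrics are skipped in the comparison
for agent, score in tracker.compare_agent_performance([analyst], metric="overall"):
    print(agent.role, round(score, 3))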
259
src/crewai/responsibility/system.py
Normal file
259
src/crewai/responsibility/system.py
Normal file
@@ -0,0 +1,259 @@
"""
Main responsibility system that coordinates all components.
"""

from datetime import datetime, timedelta
from typing import Any

from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.responsibility.accountability import AccountabilityLogger
from crewai.responsibility.assignment import (
    AssignmentStrategy,
    ResponsibilityCalculator,
)
from crewai.responsibility.hierarchy import CapabilityHierarchy
from crewai.responsibility.models import (
    AgentCapability,
    ResponsibilityAssignment,
    TaskRequirement,
)
from crewai.responsibility.performance import PerformanceTracker
from crewai.task import Task


class ResponsibilitySystem:
    """Main system that coordinates all responsibility tracking components."""

    def __init__(self):
        self.hierarchy = CapabilityHierarchy()
        self.calculator = ResponsibilityCalculator(self.hierarchy)
        self.accountability = AccountabilityLogger()
        self.performance = PerformanceTracker(self.hierarchy)
        self.enabled = True

    def register_agent(
        self,
        agent: BaseAgent,
        capabilities: list[AgentCapability],
        supervisor: BaseAgent | None = None
    ) -> None:
        """Register an agent with the responsibility system."""
        if not self.enabled:
            return

        self.hierarchy.add_agent(agent, capabilities)

        if supervisor:
            self.hierarchy.set_supervision_relationship(supervisor, agent)

        self.accountability.log_action(
            agent=agent,
            action_type="registration",
            action_description=f"Agent registered with {len(capabilities)} capabilities",
            context={"capabilities": [cap.name for cap in capabilities]}
        )

    def assign_task_responsibility(
        self,
        task: Task,
        requirements: list[TaskRequirement],
        strategy: AssignmentStrategy = AssignmentStrategy.GREEDY,
        exclude_agents: list[BaseAgent] | None = None
    ) -> ResponsibilityAssignment | None:
        """Assign responsibility for a task to the best agent."""
        if not self.enabled:
            return None

        assignment = self.calculator.calculate_responsibility_assignment(
            task, requirements, strategy, exclude_agents
        )

        if assignment:
            agent = self._get_agent_by_id(assignment.agent_id)
            if agent:
                self.calculator.update_workload(agent, 1)

                self.accountability.log_action(
                    agent=agent,
                    action_type="task_assignment",
                    action_description=f"Assigned responsibility for task: {task.description[:100]}...",
                    task=task,
                    context={
                        "responsibility_score": assignment.responsibility_score,
                        "capability_matches": assignment.capability_matches,
                        "strategy": strategy.value
                    }
                )

        return assignment

    def complete_task(
        self,
        agent: BaseAgent,
        task: Task,
        success: bool,
        completion_time: float,
        quality_score: float | None = None,
        outcome_description: str = ""
    ) -> None:
        """Record task completion and update performance metrics."""
        if not self.enabled:
            return

        self.performance.record_task_completion(
            agent, success, completion_time, quality_score
        )

        self.calculator.update_workload(agent, -1)

        self.accountability.log_task_completion(
            agent, task, success, outcome_description, completion_time
        )

        adjustments = self.performance.adjust_capabilities_based_on_performance(agent)
        if adjustments:
            self.accountability.log_action(
                agent=agent,
                action_type="capability_adjustment",
                action_description="Capabilities adjusted based on performance",
                context={"adjustments": adjustments}
            )

    def delegate_task(
        self,
        delegating_agent: BaseAgent,
        receiving_agent: BaseAgent,
        task: Task,
        reason: str
    ) -> None:
        """Record task delegation between agents."""
        if not self.enabled:
            return

        self.calculator.update_workload(delegating_agent, -1)
        self.calculator.update_workload(receiving_agent, 1)

        self.accountability.log_delegation(
            delegating_agent, receiving_agent, task, reason
        )

    def get_agent_status(self, agent: BaseAgent) -> dict[str, Any]:
        """Get comprehensive status for an agent."""
        if not self.enabled:
            return {}

        agent_id = self.hierarchy._get_agent_id(agent)
        capabilities = self.hierarchy.get_agent_capabilities(agent)
        performance = self.performance.get_performance_metrics(agent)
        recent_records = self.accountability.get_agent_records(
            agent, since=datetime.utcnow() - timedelta(days=7)
        )
        current_workload = self.calculator.current_workloads.get(agent_id, 0)

        return {
            "agent_id": agent_id,
            "role": agent.role,
            "capabilities": [
                {
                    "name": cap.name,
                    "type": cap.capability_type.value,
                    "proficiency": cap.proficiency_level,
                    "confidence": cap.confidence_score
                }
                for cap in capabilities
            ],
            "performance": {
                "success_rate": performance.success_rate,
                "quality_score": performance.quality_score,
                "efficiency_score": performance.efficiency_score,
                "total_tasks": performance.total_tasks
            } if performance else None,
            "current_workload": current_workload,
            "recent_activity_count": len(recent_records)
        }

    def get_system_overview(self) -> dict[str, Any]:
        """Get an overview of the entire responsibility system."""
        if not self.enabled:
            return {"enabled": False}

        total_agents = len(self.hierarchy.agents)
        capability_distribution = self.hierarchy.get_capability_distribution()
        workload_distribution = self.calculator.get_workload_distribution()

        all_performance = list(self.performance.performance_metrics.values())
        avg_success_rate = sum(p.success_rate for p in all_performance) / len(all_performance) if all_performance else 0.0
        avg_quality = sum(p.quality_score for p in all_performance) / len(all_performance) if all_performance else 0.0

        return {
            "enabled": True,
            "total_agents": total_agents,
            "capability_distribution": capability_distribution,
            "workload_distribution": workload_distribution,
            "system_performance": {
                "average_success_rate": avg_success_rate,
                "average_quality_score": avg_quality,
                "total_tasks_completed": sum(p.total_tasks for p in all_performance)
            },
            "total_accountability_records": len(self.accountability.records)
        }

    def generate_recommendations(self) -> list[dict[str, Any]]:
        """Generate system-wide recommendations for improvement."""
        if not self.enabled:
            return []

        recommendations = []

        workloads = self.calculator.get_workload_distribution()
        if workloads:
            max_workload = max(workloads.values())
            min_workload = min(workloads.values())

            if max_workload - min_workload > 3:  # Significant imbalance
                recommendations.append({
                    "type": "workload_balancing",
                    "priority": "high",
                    "description": "Workload imbalance detected. Consider redistributing tasks.",
                    "details": {"max_workload": max_workload, "min_workload": min_workload}
                })

        capability_dist = self.hierarchy.get_capability_distribution()
        for cap_type, levels in capability_dist.items():
            total_agents_with_cap = sum(levels.values())
            if total_agents_with_cap < 2:  # Too few agents with this capability
                recommendations.append({
                    "type": "capability_gap",
                    "priority": "medium",
                    "description": f"Limited coverage for {cap_type.value} capabilities",
                    "details": {"capability_type": cap_type.value, "agent_count": total_agents_with_cap}
                })

        for agent_id, metrics in self.performance.performance_metrics.items():
            if metrics.success_rate < 0.6:  # Low success rate
                agent = self._get_agent_by_id(agent_id)
                if agent:
                    recommendations.append({
                        "type": "performance_improvement",
                        "priority": "high",
                        "description": f"Agent {agent.role} has a low success rate",
                        "details": {
                            "agent_role": agent.role,
                            "success_rate": metrics.success_rate,
                            "improvement_opportunities": self.performance.identify_improvement_opportunities(agent)
                        }
                    })

        return recommendations

    def enable_system(self) -> None:
        """Enable the responsibility system."""
        self.enabled = True

    def disable_system(self) -> None:
        """Disable the responsibility system."""
        self.enabled = False

    def _get_agent_by_id(self, agent_id: str) -> BaseAgent | None:
        """Get an agent by its ID."""
        return self.hierarchy.agents.get(agent_id)
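Taken together, the class above is the single entry point for registration, assignment, completion, and reporting. A hedged end-to-end sketch follows; dev, task, and python_capability are assumed to be built as in the example file, and this is not part of the diff itself.

from crewai.responsibility.models import CapabilityType, TaskRequirement
from crewai.responsibility.system import ResponsibilitySystem

system = ResponsibilitySystem()
system.register_agent(dev, [python_capability])  # dev: a BaseAgent built elsewhere

requirements = [
    TaskRequirement(
        capability_name="Python Programming",
        capability_type=CapabilityType.TECHNICAL,
        minimum_proficiency=0.5,
        weight=1.0,
    )
]

assignment = system.assign_task_responsibility(task, requirements)
if assignment:
    system.complete_task(dev, task, success=True, completion_time=1800.0, quality_score=0.9)

print(system.get_agent_status(dev)["current_workload"])  # back to 0 after completion
print(system.generate_recommendations())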
@@ -27,4 +27,19 @@ class DelegateWorkTool(BaseAgentTool):
         **kwargs,
     ) -> str:
         coworker = self._get_coworker(coworker, **kwargs)
+
+        if hasattr(self, 'agents') and self.agents:
+            delegating_agent = kwargs.get('delegating_agent')
+            if delegating_agent and hasattr(delegating_agent, 'responsibility_system'):
+                responsibility_system = delegating_agent.responsibility_system
+                if responsibility_system and responsibility_system.enabled:
+                    task_obj = kwargs.get('task_obj')
+                    if task_obj:
+                        responsibility_system.delegate_task(
+                            delegating_agent=delegating_agent,
+                            receiving_agent=coworker,
+                            task=task_obj,
+                            reason=f"Delegation based on capability match for: {task[:100]}..."
+                        )
+
         return self._execute(coworker, task, context)
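The hook above only fires when the caller threads the extra keywords through to the tool. A hypothetical call site, purely illustrative: tool, manager_agent, and current_task are assumed to exist, and the keyword names mirror the kwargs.get(...) reads in the hunk.

result = tool.run(
    coworker="Data Analyst",
    task="Summarize last quarter's metrics",
    context="Raw metrics attached",
    delegating_agent=manager_agent,  # must expose .responsibility_system
    task_obj=current_task,           # the Task handed to delegate_task()
)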
@@ -1,6 +1,6 @@
 import threading
 import time
-from typing import Optional
+from typing import Any

 from pydantic import BaseModel, Field, PrivateAttr, model_validator

@@ -12,11 +12,11 @@ from crewai.utilities.logger import Logger
 class RPMController(BaseModel):
     """Manages requests per minute limiting."""

-    max_rpm: Optional[int] = Field(default=None)
+    max_rpm: int | None = Field(default=None)
     logger: Logger = Field(default_factory=lambda: Logger(verbose=False))
     _current_rpm: int = PrivateAttr(default=0)
-    _timer: Optional[threading.Timer] = PrivateAttr(default=None)
-    _lock: Optional[threading.Lock] = PrivateAttr(default=None)
+    _timer: Any = PrivateAttr(default=None)
+    _lock: Any = PrivateAttr(default=None)
     _shutdown_flag: bool = PrivateAttr(default=False)

     @model_validator(mode="after")
@@ -35,7 +35,7 @@ class RPMController(BaseModel):
         if self.max_rpm is not None and self._current_rpm < self.max_rpm:
             self._current_rpm += 1
             return True
-        elif self.max_rpm is not None:
+        if self.max_rpm is not None:
             self.logger.log(
                 "info", "Max RPM reached, waiting for next minute to start."
             )
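The hunk trades typing.Optional for PEP 604 unions on public fields and loosens the private attributes to Any, since Pydantic does not validate private attributes and threading objects need no schema. A minimal sketch of the resulting pattern, assuming Pydantic v2 semantics; the Throttle model is illustrative, not from the diff.

import threading
from typing import Any

from pydantic import BaseModel, Field, PrivateAttr


class Throttle(BaseModel):
    max_rpm: int | None = Field(default=None)  # equivalent to Optional[int]
    _lock: Any = PrivateAttr(default=None)     # arbitrary runtime object, unvalidated

    def model_post_init(self, __context: Any) -> None:
        self._lock = threading.Lock()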
3
tests/responsibility/__init__.py
Normal file
@@ -0,0 +1,3 @@
"""
Tests for the formal responsibility tracking system.
"""
199
tests/responsibility/test_accountability.py
Normal file
@@ -0,0 +1,199 @@
"""
Tests for accountability logging system.
"""

import pytest
from datetime import datetime, timedelta
from unittest.mock import Mock

from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
from crewai.responsibility.accountability import AccountabilityLogger


class TestAccountabilityLogger:
    @pytest.fixture
    def logger(self):
        return AccountabilityLogger()

    @pytest.fixture
    def mock_agent(self):
        agent = Mock(spec=BaseAgent)
        agent.role = "Test Agent"
        return agent

    @pytest.fixture
    def mock_task(self):
        task = Mock(spec=Task)
        task.id = "test_task_1"
        task.description = "Test task description"
        return task

    def test_log_action(self, logger, mock_agent, mock_task):
        context = {"complexity": "high", "priority": "urgent"}

        record = logger.log_action(
            agent=mock_agent,
            action_type="task_execution",
            action_description="Executed data processing task",
            task=mock_task,
            context=context
        )

        assert record.agent_id == "Test Agent_" + str(id(mock_agent))
        assert record.action_type == "task_execution"
        assert record.action_description == "Executed data processing task"
        assert record.task_id == "test_task_1"
        assert record.context["complexity"] == "high"
        assert len(logger.records) == 1

    def test_log_decision(self, logger, mock_agent, mock_task):
        alternatives = ["Option A", "Option B", "Option C"]

        record = logger.log_decision(
            agent=mock_agent,
            decision="Chose Option A",
            reasoning="Best performance characteristics",
            task=mock_task,
            alternatives_considered=alternatives
        )

        assert record.action_type == "decision"
        assert record.action_description == "Chose Option A"
        assert record.context["reasoning"] == "Best performance characteristics"
        assert record.context["alternatives_considered"] == alternatives

    def test_log_delegation(self, logger, mock_task):
        delegating_agent = Mock(spec=BaseAgent)
        delegating_agent.role = "Manager"

        receiving_agent = Mock(spec=BaseAgent)
        receiving_agent.role = "Developer"

        record = logger.log_delegation(
            delegating_agent=delegating_agent,
            receiving_agent=receiving_agent,
            task=mock_task,
            delegation_reason="Specialized expertise required"
        )

        assert record.action_type == "delegation"
        assert "Delegated task to Developer" in record.action_description
        assert record.context["receiving_agent_role"] == "Developer"
        assert record.context["delegation_reason"] == "Specialized expertise required"

    def test_log_task_completion(self, logger, mock_agent, mock_task):
        record = logger.log_task_completion(
            agent=mock_agent,
            task=mock_task,
            success=True,
            outcome_description="Task completed successfully with high quality",
            completion_time=1800.0
        )

        assert record.action_type == "task_completion"
        assert record.success is True
        assert record.outcome == "Task completed successfully with high quality"
        assert record.context["completion_time"] == 1800.0

    def test_get_agent_records(self, logger, mock_agent, mock_task):
        logger.log_action(mock_agent, "action1", "Description 1", mock_task)
        logger.log_action(mock_agent, "action2", "Description 2", mock_task)
        logger.log_decision(mock_agent, "decision1", "Reasoning", mock_task)

        all_records = logger.get_agent_records(mock_agent)
        assert len(all_records) == 3

        decision_records = logger.get_agent_records(mock_agent, action_type="decision")
        assert len(decision_records) == 1
        assert decision_records[0].action_type == "decision"

        recent_time = datetime.utcnow() - timedelta(minutes=1)
        recent_records = logger.get_agent_records(mock_agent, since=recent_time)
        assert len(recent_records) == 3  # All should be recent

    def test_get_task_records(self, logger, mock_agent, mock_task):
        other_task = Mock(spec=Task)
        other_task.id = "other_task"

        logger.log_action(mock_agent, "action1", "Description 1", mock_task)
        logger.log_action(mock_agent, "action2", "Description 2", other_task)
        logger.log_action(mock_agent, "action3", "Description 3", mock_task)

        task_records = logger.get_task_records(mock_task)
        assert len(task_records) == 2

        for record in task_records:
            assert record.task_id == "test_task_1"

    def test_get_delegation_chain(self, logger, mock_task):
        manager = Mock(spec=BaseAgent)
        manager.role = "Manager"

        supervisor = Mock(spec=BaseAgent)
        supervisor.role = "Supervisor"

        developer = Mock(spec=BaseAgent)
        developer.role = "Developer"

        logger.log_delegation(manager, supervisor, mock_task, "Initial delegation")
        logger.log_delegation(supervisor, developer, mock_task, "Further delegation")

        chain = logger.get_delegation_chain(mock_task)
        assert len(chain) == 2

        assert chain[0].context["receiving_agent_role"] == "Supervisor"
        assert chain[1].context["receiving_agent_role"] == "Developer"

    def test_generate_accountability_report(self, logger, mock_agent, mock_task):
        record1 = logger.log_action(mock_agent, "task_execution", "Task 1", mock_task)
        record1.set_outcome("Success", True)

        record2 = logger.log_action(mock_agent, "task_execution", "Task 2", mock_task)
        record2.set_outcome("Failed", False)

        record3 = logger.log_decision(mock_agent, "Decision 1", "Reasoning", mock_task)
        record3.set_outcome("Good decision", True)

        report = logger.generate_accountability_report(agent=mock_agent)

        assert report["total_records"] == 3
        assert report["action_counts"]["task_execution"] == 2
        assert report["action_counts"]["decision"] == 1
        assert report["success_counts"]["task_execution"] == 1
        assert report["failure_counts"]["task_execution"] == 1
        assert report["success_rates"]["task_execution"] == 0.5
        assert report["success_rates"]["decision"] == 1.0

        assert len(report["recent_actions"]) == 3

    def test_generate_system_wide_report(self, logger, mock_task):
        agent1 = Mock(spec=BaseAgent)
        agent1.role = "Agent 1"

        agent2 = Mock(spec=BaseAgent)
        agent2.role = "Agent 2"

        logger.log_action(agent1, "task_execution", "Task 1", mock_task)
        logger.log_action(agent2, "task_execution", "Task 2", mock_task)

        report = logger.generate_accountability_report()

        assert report["agent_id"] == "all_agents"
        assert report["total_records"] == 2
        assert report["action_counts"]["task_execution"] == 2

    def test_time_filtered_report(self, logger, mock_agent, mock_task):
        logger.log_action(mock_agent, "old_action", "Old action", mock_task)

        report = logger.generate_accountability_report(
            agent=mock_agent,
            time_period=timedelta(hours=1)
        )

        assert report["total_records"] == 1

        report = logger.generate_accountability_report(
            agent=mock_agent,
            time_period=timedelta(seconds=1)
        )
221
tests/responsibility/test_assignment.py
Normal file
@@ -0,0 +1,221 @@
"""
Tests for mathematical responsibility assignment.
"""

import pytest
from unittest.mock import Mock

from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement
from crewai.responsibility.hierarchy import CapabilityHierarchy
from crewai.responsibility.assignment import ResponsibilityCalculator, AssignmentStrategy


class TestResponsibilityCalculator:
    @pytest.fixture
    def hierarchy(self):
        return CapabilityHierarchy()

    @pytest.fixture
    def calculator(self, hierarchy):
        return ResponsibilityCalculator(hierarchy)

    @pytest.fixture
    def mock_task(self):
        task = Mock(spec=Task)
        task.id = "test_task_1"
        task.description = "Test task description"
        return task

    @pytest.fixture
    def python_agent(self, hierarchy):
        agent = Mock(spec=BaseAgent)
        agent.role = "Python Developer"

        capability = AgentCapability(
            name="Python Programming",
            capability_type=CapabilityType.TECHNICAL,
            proficiency_level=0.9,
            confidence_score=0.8,
            keywords=["python", "programming"]
        )

        hierarchy.add_agent(agent, [capability])
        return agent

    @pytest.fixture
    def analysis_agent(self, hierarchy):
        agent = Mock(spec=BaseAgent)
        agent.role = "Data Analyst"

        capability = AgentCapability(
            name="Data Analysis",
            capability_type=CapabilityType.ANALYTICAL,
            proficiency_level=0.8,
            confidence_score=0.9,
            keywords=["data", "analysis"]
        )

        hierarchy.add_agent(agent, [capability])
        return agent

    def test_greedy_assignment(self, calculator, mock_task, python_agent):
        requirements = [
            TaskRequirement(
                capability_name="Python Programming",
                capability_type=CapabilityType.TECHNICAL,
                minimum_proficiency=0.5,
                weight=1.0
            )
        ]

        assignment = calculator.calculate_responsibility_assignment(
            mock_task, requirements, AssignmentStrategy.GREEDY
        )

        assert assignment is not None
        assert assignment.task_id == "test_task_1"
        assert assignment.responsibility_score > 0.5
        assert "Python Programming" in assignment.capability_matches
        assert "Greedy assignment" in assignment.reasoning

    def test_balanced_assignment(self, calculator, mock_task, python_agent, analysis_agent):
        calculator.update_workload(python_agent, 5)  # High workload
        calculator.update_workload(analysis_agent, 1)  # Low workload

        requirements = [
            TaskRequirement(
                capability_name="General Programming",
                capability_type=CapabilityType.TECHNICAL,
                minimum_proficiency=0.3,
                weight=1.0
            )
        ]

        assignment = calculator.calculate_responsibility_assignment(
            mock_task, requirements, AssignmentStrategy.BALANCED
        )

        assert assignment is not None
        assert "Balanced assignment" in assignment.reasoning

    def test_optimal_assignment(self, calculator, mock_task, python_agent):
        requirements = [
            TaskRequirement(
                capability_name="Python Programming",
                capability_type=CapabilityType.TECHNICAL,
                minimum_proficiency=0.5,
                weight=1.0
            )
        ]

        assignment = calculator.calculate_responsibility_assignment(
            mock_task, requirements, AssignmentStrategy.OPTIMAL
        )

        assert assignment is not None
        assert "Optimal assignment" in assignment.reasoning

    def test_multi_agent_assignment(self, calculator, mock_task, python_agent, analysis_agent):
        requirements = [
            TaskRequirement(
                capability_name="Python Programming",
                capability_type=CapabilityType.TECHNICAL,
                minimum_proficiency=0.5,
                weight=1.0
            ),
            TaskRequirement(
                capability_name="Data Analysis",
                capability_type=CapabilityType.ANALYTICAL,
                minimum_proficiency=0.5,
                weight=0.8
            )
        ]

        assignments = calculator.calculate_multi_agent_assignment(
            mock_task, requirements, max_agents=2
        )

        assert len(assignments) <= 2
        assert len(assignments) > 0

        agent_ids = [assignment.agent_id for assignment in assignments]
        assert len(agent_ids) == len(set(agent_ids))

    def test_workload_update(self, calculator, python_agent):
        initial_workload = calculator.current_workloads.get(
            calculator.hierarchy._get_agent_id(python_agent), 0
        )

        calculator.update_workload(python_agent, 3)

        new_workload = calculator.current_workloads.get(
            calculator.hierarchy._get_agent_id(python_agent), 0
        )

        assert new_workload == initial_workload + 3

        calculator.update_workload(python_agent, -2)

        final_workload = calculator.current_workloads.get(
            calculator.hierarchy._get_agent_id(python_agent), 0
        )

        assert final_workload == new_workload - 2

    def test_workload_distribution(self, calculator, python_agent, analysis_agent):
        calculator.update_workload(python_agent, 3)
        calculator.update_workload(analysis_agent, 1)

        distribution = calculator.get_workload_distribution()

        python_id = calculator.hierarchy._get_agent_id(python_agent)
        analysis_id = calculator.hierarchy._get_agent_id(analysis_agent)

        assert distribution[python_id] == 3
        assert distribution[analysis_id] == 1

    def test_exclude_agents(self, calculator, mock_task, python_agent, analysis_agent):
        requirements = [
            TaskRequirement(
                capability_name="Programming",
                capability_type=CapabilityType.TECHNICAL,
                minimum_proficiency=0.3,
                weight=1.0
            )
        ]

        assignment = calculator.calculate_responsibility_assignment(
            mock_task, requirements, AssignmentStrategy.GREEDY,
            exclude_agents=[python_agent]
        )

        if assignment:  # If any agent was assigned
            python_id = calculator.hierarchy._get_agent_id(python_agent)
            assert assignment.agent_id != python_id

    def test_no_capable_agents(self, calculator, mock_task):
        requirements = [
            TaskRequirement(
                capability_name="Quantum Computing",
                capability_type=CapabilityType.TECHNICAL,
                minimum_proficiency=0.9,
                weight=1.0
            )
        ]

        assignment = calculator.calculate_responsibility_assignment(
            mock_task, requirements, AssignmentStrategy.GREEDY
        )

        assert assignment is None

    def test_workload_penalty_calculation(self, calculator):
        assert calculator._calculate_workload_penalty(0) == 0.0

        penalty_1 = calculator._calculate_workload_penalty(1)
        penalty_5 = calculator._calculate_workload_penalty(5)

        assert penalty_1 < penalty_5  # Higher workload should have higher penalty
        assert penalty_5 <= 0.8  # Should not exceed maximum penalty
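The contract checked by the last test (zero penalty at no load, monotonically increasing, capped at 0.8) admits many curves. One function satisfying it, as a hedged sketch rather than the implementation under test:

def workload_penalty(workload: int, cap: float = 0.8, rate: float = 0.25) -> float:
    # Geometric approach to the cap: 0 at no load, strictly increasing, always < cap.
    if workload <= 0:
        return 0.0
    return cap * (1 - (1 - rate) ** workload)

assert workload_penalty(0) == 0.0
assert workload_penalty(1) < workload_penalty(5) <= 0.8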
208
tests/responsibility/test_hierarchy.py
Normal file
@@ -0,0 +1,208 @@
"""
Tests for capability-based agent hierarchy.
"""

import pytest
from unittest.mock import Mock

from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.responsibility.models import AgentCapability, CapabilityType, TaskRequirement
from crewai.responsibility.hierarchy import CapabilityHierarchy


class TestCapabilityHierarchy:
    @pytest.fixture
    def hierarchy(self):
        return CapabilityHierarchy()

    @pytest.fixture
    def mock_agent(self):
        agent = Mock(spec=BaseAgent)
        agent.role = "Test Agent"
        return agent

    @pytest.fixture
    def python_capability(self):
        return AgentCapability(
            name="Python Programming",
            capability_type=CapabilityType.TECHNICAL,
            proficiency_level=0.8,
            confidence_score=0.9,
            keywords=["python", "programming"]
        )

    @pytest.fixture
    def analysis_capability(self):
        return AgentCapability(
            name="Data Analysis",
            capability_type=CapabilityType.ANALYTICAL,
            proficiency_level=0.7,
            confidence_score=0.8,
            keywords=["data", "analysis", "statistics"]
        )

    def test_add_agent(self, hierarchy, mock_agent, python_capability):
        capabilities = [python_capability]
        hierarchy.add_agent(mock_agent, capabilities)

        assert len(hierarchy.agents) == 1
        assert len(hierarchy.agent_capabilities) == 1
        assert "Python Programming" in hierarchy.capability_index

    def test_remove_agent(self, hierarchy, mock_agent, python_capability):
        capabilities = [python_capability]
        hierarchy.add_agent(mock_agent, capabilities)

        assert len(hierarchy.agents) == 1

        hierarchy.remove_agent(mock_agent)

        assert len(hierarchy.agents) == 0
        assert len(hierarchy.agent_capabilities) == 0
        assert len(hierarchy.capability_index["Python Programming"]) == 0

    def test_supervision_relationship(self, hierarchy):
        supervisor = Mock(spec=BaseAgent)
        supervisor.role = "Supervisor"
        subordinate = Mock(spec=BaseAgent)
        subordinate.role = "Subordinate"

        hierarchy.add_agent(supervisor, [])
        hierarchy.add_agent(subordinate, [])

        hierarchy.set_supervision_relationship(supervisor, subordinate)

        subordinates = hierarchy.get_subordinates(supervisor)
        assert len(subordinates) == 1
        assert subordinates[0] == subordinate

    def test_update_agent_capability(self, hierarchy, mock_agent, python_capability):
        hierarchy.add_agent(mock_agent, [python_capability])

        success = hierarchy.update_agent_capability(
            mock_agent, "Python Programming", 0.9, 0.95
        )

        assert success is True

        capabilities = hierarchy.get_agent_capabilities(mock_agent)
        updated_cap = next(cap for cap in capabilities if cap.name == "Python Programming")
        assert updated_cap.proficiency_level == 0.9
        assert updated_cap.confidence_score == 0.95

    def test_find_capable_agents(self, hierarchy, mock_agent, python_capability):
        hierarchy.add_agent(mock_agent, [python_capability])

        requirements = [
            TaskRequirement(
                capability_name="Python Programming",
                capability_type=CapabilityType.TECHNICAL,
                minimum_proficiency=0.5,
                weight=1.0
            )
        ]

        capable_agents = hierarchy.find_capable_agents(requirements)

        assert len(capable_agents) == 1
        assert capable_agents[0][0] == mock_agent
        assert capable_agents[0][1] > 0.5  # Should have a good match score

    def test_get_best_agent_for_task(self, hierarchy, python_capability, analysis_capability):
        agent1 = Mock(spec=BaseAgent)
        agent1.role = "Python Developer"
        agent2 = Mock(spec=BaseAgent)
        agent2.role = "Data Analyst"

        hierarchy.add_agent(agent1, [python_capability])
        hierarchy.add_agent(agent2, [analysis_capability])

        requirements = [
            TaskRequirement(
                capability_name="Python Programming",
                capability_type=CapabilityType.TECHNICAL,
                minimum_proficiency=0.5,
                weight=1.0
            )
        ]

        result = hierarchy.get_best_agent_for_task(requirements)

        assert result is not None
        best_agent, score, matches = result
        assert best_agent == agent1  # Python developer should be chosen
        assert "Python Programming" in matches

    def test_capability_distribution(self, hierarchy, python_capability, analysis_capability):
        agent1 = Mock(spec=BaseAgent)
        agent1.role = "Developer"
        agent2 = Mock(spec=BaseAgent)
        agent2.role = "Analyst"

        hierarchy.add_agent(agent1, [python_capability])
        hierarchy.add_agent(agent2, [analysis_capability])

        distribution = hierarchy.get_capability_distribution()

        assert CapabilityType.TECHNICAL in distribution
        assert CapabilityType.ANALYTICAL in distribution
        assert distribution[CapabilityType.TECHNICAL]["high"] == 1  # Python capability is high proficiency
        assert distribution[CapabilityType.ANALYTICAL]["medium"] == 1  # Analysis capability is medium proficiency

    def test_hierarchy_path(self, hierarchy):
        manager = Mock(spec=BaseAgent)
        manager.role = "Manager"
        supervisor = Mock(spec=BaseAgent)
        supervisor.role = "Supervisor"
        worker = Mock(spec=BaseAgent)
        worker.role = "Worker"

        hierarchy.add_agent(manager, [])
        hierarchy.add_agent(supervisor, [])
        hierarchy.add_agent(worker, [])

        hierarchy.set_supervision_relationship(manager, supervisor)
        hierarchy.set_supervision_relationship(supervisor, worker)

        path = hierarchy.get_hierarchy_path(manager, worker)

        assert path is not None
        assert len(path) == 3
        assert path[0] == manager
        assert path[1] == supervisor
        assert path[2] == worker

    def test_capabilities_match(self, hierarchy, python_capability):
        requirement = TaskRequirement(
            capability_name="Python Programming",
            capability_type=CapabilityType.TECHNICAL,
            minimum_proficiency=0.5
        )

        assert hierarchy._capabilities_match(python_capability, requirement) is True

        requirement2 = TaskRequirement(
            capability_name="Different Name",
            capability_type=CapabilityType.TECHNICAL,
            minimum_proficiency=0.5
        )

        assert hierarchy._capabilities_match(python_capability, requirement2) is True

        requirement3 = TaskRequirement(
            capability_name="Different Name",
            capability_type=CapabilityType.ANALYTICAL,
            minimum_proficiency=0.5,
            keywords=["python"]
        )

        assert hierarchy._capabilities_match(python_capability, requirement3) is True

        requirement4 = TaskRequirement(
            capability_name="Different Name",
            capability_type=CapabilityType.ANALYTICAL,
            minimum_proficiency=0.5,
            keywords=["java"]
        )

        assert hierarchy._capabilities_match(python_capability, requirement4) is False
286
tests/responsibility/test_integration.py
Normal file
@@ -0,0 +1,286 @@
"""
Integration tests for the responsibility tracking system.
"""

from unittest.mock import Mock

import pytest

from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.responsibility.assignment import AssignmentStrategy
from crewai.responsibility.models import (
    AgentCapability,
    CapabilityType,
    TaskRequirement,
)
from crewai.responsibility.system import ResponsibilitySystem
from crewai.task import Task


class TestResponsibilitySystemIntegration:
    @pytest.fixture
    def system(self):
        return ResponsibilitySystem()

    @pytest.fixture
    def python_agent(self):
        agent = Mock(spec=BaseAgent)
        agent.role = "Python Developer"
        return agent

    @pytest.fixture
    def analysis_agent(self):
        agent = Mock(spec=BaseAgent)
        agent.role = "Data Analyst"
        return agent

    @pytest.fixture
    def python_capability(self):
        return AgentCapability(
            name="Python Programming",
            capability_type=CapabilityType.TECHNICAL,
            proficiency_level=0.9,
            confidence_score=0.8,
            keywords=["python", "programming", "development"]
        )

    @pytest.fixture
    def analysis_capability(self):
        return AgentCapability(
            name="Data Analysis",
            capability_type=CapabilityType.ANALYTICAL,
            proficiency_level=0.8,
            confidence_score=0.9,
            keywords=["data", "analysis", "statistics"]
        )

    @pytest.fixture
    def mock_task(self):
        task = Mock(spec=Task)
        task.id = "integration_test_task"
        task.description = "Complex data processing task requiring Python skills"
        return task

    def test_full_workflow(self, system, python_agent, python_capability, mock_task):
        """Test the complete workflow from agent registration to task completion."""
        system.register_agent(python_agent, [python_capability])

        status = system.get_agent_status(python_agent)
        assert status["role"] == "Python Developer"
        assert len(status["capabilities"]) == 1
        assert status["capabilities"][0]["name"] == "Python Programming"

        requirements = [
            TaskRequirement(
                capability_name="Python Programming",
                capability_type=CapabilityType.TECHNICAL,
                minimum_proficiency=0.5,
                weight=1.0
            )
        ]

        assignment = system.assign_task_responsibility(mock_task, requirements)

        assert assignment is not None
        assert assignment.task_id == "integration_test_task"
        assert assignment.responsibility_score > 0.5

        updated_status = system.get_agent_status(python_agent)
        assert updated_status["current_workload"] == 1

        system.complete_task(
            agent=python_agent,
            task=mock_task,
            success=True,
            completion_time=1800.0,
            quality_score=0.9,
            outcome_description="Task completed successfully"
        )

        final_status = system.get_agent_status(python_agent)
        assert final_status["performance"]["total_tasks"] == 1
        assert final_status["performance"]["success_rate"] == 1.0
        assert final_status["current_workload"] == 0  # Should be decremented

    def test_multi_agent_scenario(self, system, python_agent, analysis_agent,
                                  python_capability, analysis_capability, mock_task):
        """Test a scenario with multiple agents and capabilities."""
        system.register_agent(python_agent, [python_capability])
        system.register_agent(analysis_agent, [analysis_capability])

        requirements = [
            TaskRequirement(
                capability_name="Python Programming",
                capability_type=CapabilityType.TECHNICAL,
                minimum_proficiency=0.7,
                weight=1.0
            ),
            TaskRequirement(
                capability_name="Data Analysis",
                capability_type=CapabilityType.ANALYTICAL,
                minimum_proficiency=0.6,
                weight=0.8
            )
        ]

        greedy_assignment = system.assign_task_responsibility(
            mock_task, requirements, AssignmentStrategy.GREEDY
        )

        assert greedy_assignment is not None

        system.calculator.update_workload(python_agent, 5)

        balanced_assignment = system.assign_task_responsibility(
            mock_task, requirements, AssignmentStrategy.BALANCED
        )

        assert balanced_assignment is not None

    def test_delegation_workflow(self, system, python_agent, analysis_agent,
                                 python_capability, analysis_capability, mock_task):
        """Test task delegation between agents."""
        system.register_agent(python_agent, [python_capability], supervisor=None)
        system.register_agent(analysis_agent, [analysis_capability], supervisor=python_agent)

        system.delegate_task(
            delegating_agent=python_agent,
            receiving_agent=analysis_agent,
            task=mock_task,
            reason="Analysis expertise required"
        )

        analysis_status = system.get_agent_status(analysis_agent)

        assert analysis_status["current_workload"] > 0

        delegation_records = system.accountability.get_agent_records(
            python_agent, action_type="delegation"
        )
        assert len(delegation_records) > 0

    def test_performance_based_capability_adjustment(self, system, python_agent,
                                                     python_capability, mock_task):
        """Test that capabilities are adjusted based on performance."""
        system.register_agent(python_agent, [python_capability])

        for i in range(5):
            task = Mock(spec=Task)
            task.id = f"task_{i}"
            task.description = f"Task {i}"

            system.complete_task(
                agent=python_agent,
                task=task,
                success=True,
                completion_time=1800.0,
                quality_score=0.9
            )

        updated_capabilities = system.hierarchy.get_agent_capabilities(python_agent)

        assert len(updated_capabilities) == 1

    def test_system_overview_and_recommendations(self, system, python_agent,
                                                 analysis_agent, python_capability,
                                                 analysis_capability):
        """Test system overview and recommendation generation."""
        system.register_agent(python_agent, [python_capability])
        system.register_agent(analysis_agent, [analysis_capability])

        overview = system.get_system_overview()

        assert overview["enabled"] is True
        assert overview["total_agents"] == 2
        assert "capability_distribution" in overview
        assert "system_performance" in overview

        recommendations = system.generate_recommendations()

        assert isinstance(recommendations, list)

    def test_system_enable_disable(self, system, python_agent, python_capability, mock_task):
        """Test enabling and disabling the responsibility system."""
        assert system.enabled is True

        system.register_agent(python_agent, [python_capability])

        requirements = [
            TaskRequirement(
                capability_name="Python Programming",
                capability_type=CapabilityType.TECHNICAL,
                minimum_proficiency=0.5,
                weight=1.0
            )
        ]

        assignment = system.assign_task_responsibility(mock_task, requirements)
        assert assignment is not None

        system.disable_system()
        assert system.enabled is False

        disabled_assignment = system.assign_task_responsibility(mock_task, requirements)
        assert disabled_assignment is None

        disabled_status = system.get_agent_status(python_agent)
        assert disabled_status == {}

        system.enable_system()
        assert system.enabled is True

        enabled_assignment = system.assign_task_responsibility(mock_task, requirements)
        assert enabled_assignment is not None

    def test_accountability_tracking_integration(self, system, python_agent,
                                                 python_capability, mock_task):
        """Test that all operations are properly logged for accountability."""
        system.register_agent(python_agent, [python_capability])

        registration_records = system.accountability.get_agent_records(
            python_agent, action_type="registration"
        )
        assert len(registration_records) == 1

        requirements = [
            TaskRequirement(
                capability_name="Python Programming",
                capability_type=CapabilityType.TECHNICAL,
                minimum_proficiency=0.5,
                weight=1.0
            )
        ]

        system.assign_task_responsibility(mock_task, requirements)

        assignment_records = system.accountability.get_agent_records(
            python_agent, action_type="task_assignment"
        )
        assert len(assignment_records) == 1

        system.complete_task(
            agent=python_agent,
            task=mock_task,
            success=True,
            completion_time=1800.0,
            quality_score=0.9
        )

        completion_records = system.accountability.get_agent_records(
            python_agent, action_type="task_completion"
        )
        assert len(completion_records) == 1

        report = system.accountability.generate_accountability_report(agent=python_agent)

        assert report["total_records"] >= 3  # At least registration, assignment, completion
        assert "registration" in report["action_counts"]
        assert "task_assignment" in report["action_counts"]
        assert "task_completion" in report["action_counts"]
187
tests/responsibility/test_models.py
Normal file
@@ -0,0 +1,187 @@
"""
Tests for responsibility tracking data models.
"""

import pytest
from datetime import datetime, timedelta
from uuid import uuid4

from crewai.responsibility.models import (
    AgentCapability,
    CapabilityType,
    ResponsibilityAssignment,
    AccountabilityRecord,
    PerformanceMetrics,
    TaskRequirement
)


class TestAgentCapability:
    def test_create_capability(self):
        capability = AgentCapability(
            name="Python Programming",
            capability_type=CapabilityType.TECHNICAL,
            proficiency_level=0.8,
            confidence_score=0.9,
            description="Expert in Python development",
            keywords=["python", "programming", "development"]
        )

        assert capability.name == "Python Programming"
        assert capability.capability_type == CapabilityType.TECHNICAL
        assert capability.proficiency_level == 0.8
        assert capability.confidence_score == 0.9
        assert "python" in capability.keywords

    def test_update_proficiency(self):
        capability = AgentCapability(
            name="Data Analysis",
            capability_type=CapabilityType.ANALYTICAL,
            proficiency_level=0.5,
            confidence_score=0.6
        )

        old_updated = capability.last_updated
        capability.update_proficiency(0.7, 0.8)

        assert capability.proficiency_level == 0.7
        assert capability.confidence_score == 0.8
        assert capability.last_updated > old_updated

    def test_proficiency_bounds(self):
        capability = AgentCapability(
            name="Test",
            capability_type=CapabilityType.TECHNICAL,
            proficiency_level=0.5,
            confidence_score=0.5
        )

        capability.update_proficiency(1.5, 1.2)
        assert capability.proficiency_level == 1.0
        assert capability.confidence_score == 1.0

        capability.update_proficiency(-0.5, -0.2)
        assert capability.proficiency_level == 0.0
        assert capability.confidence_score == 0.0


class TestResponsibilityAssignment:
    def test_create_assignment(self):
        assignment = ResponsibilityAssignment(
            agent_id="agent_1",
            task_id="task_1",
            responsibility_score=0.85,
            capability_matches=["Python Programming", "Data Analysis"],
            reasoning="Best match for technical requirements"
        )

        assert assignment.agent_id == "agent_1"
        assert assignment.task_id == "task_1"
        assert assignment.responsibility_score == 0.85
        assert len(assignment.capability_matches) == 2
        assert assignment.success is None

    def test_mark_completed(self):
        assignment = ResponsibilityAssignment(
            agent_id="agent_1",
            task_id="task_1",
            responsibility_score=0.85,
            reasoning="Test assignment"
        )

        assert assignment.completed_at is None
        assert assignment.success is None

        assignment.mark_completed(True)

        assert assignment.completed_at is not None
        assert assignment.success is True


class TestAccountabilityRecord:
    def test_create_record(self):
        record = AccountabilityRecord(
            agent_id="agent_1",
            action_type="task_execution",
            action_description="Executed data analysis task",
            task_id="task_1",
            context={"complexity": "high", "duration": 3600}
        )

        assert record.agent_id == "agent_1"
        assert record.action_type == "task_execution"
        assert record.context["complexity"] == "high"
        assert record.outcome is None

    def test_set_outcome(self):
        record = AccountabilityRecord(
            agent_id="agent_1",
            action_type="decision",
            action_description="Chose algorithm X"
        )

        record.set_outcome("Algorithm performed well", True)

        assert record.outcome == "Algorithm performed well"
        assert record.success is True


class TestPerformanceMetrics:
    def test_create_metrics(self):
        metrics = PerformanceMetrics(agent_id="agent_1")

        assert metrics.agent_id == "agent_1"
        assert metrics.total_tasks == 0
        assert metrics.success_rate == 0.0
        assert metrics.quality_score == 0.5

    def test_update_metrics_success(self):
        metrics = PerformanceMetrics(agent_id="agent_1")

        metrics.update_metrics(True, 1800, 0.8)

        assert metrics.total_tasks == 1
        assert metrics.successful_tasks == 1
        assert metrics.failed_tasks == 0
        assert metrics.success_rate == 1.0
        assert metrics.average_completion_time == 1800
        assert metrics.reliability_score == 1.0

    def test_update_metrics_failure(self):
        metrics = PerformanceMetrics(agent_id="agent_1")

        metrics.update_metrics(False, 3600)

        assert metrics.total_tasks == 1
        assert metrics.successful_tasks == 0
        assert metrics.failed_tasks == 1
        assert metrics.success_rate == 0.0

    def test_update_metrics_mixed(self):
        metrics = PerformanceMetrics(agent_id="agent_1")

        metrics.update_metrics(True, 1800, 0.8)
        metrics.update_metrics(False, 3600, 0.3)
        metrics.update_metrics(True, 2400, 0.9)

        assert metrics.total_tasks == 3
        assert metrics.successful_tasks == 2
        assert metrics.failed_tasks == 1
        assert abs(metrics.success_rate - 2 / 3) < 0.001


class TestTaskRequirement:
    def test_create_requirement(self):
        requirement = TaskRequirement(
            capability_name="Python Programming",
            capability_type=CapabilityType.TECHNICAL,
            minimum_proficiency=0.7,
            weight=1.5,
            keywords=["python", "coding"]
        )

        assert requirement.capability_name == "Python Programming"
        assert requirement.capability_type == CapabilityType.TECHNICAL
        assert requirement.minimum_proficiency == 0.7
        assert requirement.weight == 1.5
        assert "python" in requirement.keywords
226
tests/responsibility/test_performance.py
Normal file
@@ -0,0 +1,226 @@
"""
Tests for performance-based capability adjustment.
"""

import pytest
from datetime import timedelta
from unittest.mock import Mock

from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.responsibility.models import AgentCapability, CapabilityType, PerformanceMetrics
from crewai.responsibility.hierarchy import CapabilityHierarchy
from crewai.responsibility.performance import PerformanceTracker


class TestPerformanceTracker:
    @pytest.fixture
    def hierarchy(self):
        return CapabilityHierarchy()

    @pytest.fixture
    def tracker(self, hierarchy):
        return PerformanceTracker(hierarchy)

    @pytest.fixture
    def mock_agent(self, hierarchy):
        agent = Mock(spec=BaseAgent)
        agent.role = "Test Agent"

        capability = AgentCapability(
            name="Python Programming",
            capability_type=CapabilityType.TECHNICAL,
            proficiency_level=0.7,
            confidence_score=0.8
        )

        hierarchy.add_agent(agent, [capability])
        return agent

    def test_record_task_completion_success(self, tracker, mock_agent):
        tracker.record_task_completion(
            agent=mock_agent,
            task_success=True,
            completion_time=1800.0,
            quality_score=0.9
        )

        metrics = tracker.get_performance_metrics(mock_agent)

        assert metrics is not None
        assert metrics.total_tasks == 1
        assert metrics.successful_tasks == 1
        assert metrics.failed_tasks == 0
        assert metrics.success_rate == 1.0
        assert metrics.average_completion_time == 1800.0
        assert metrics.quality_score > 0.5  # Should be updated towards 0.9

    def test_record_task_completion_failure(self, tracker, mock_agent):
        tracker.record_task_completion(
            agent=mock_agent,
            task_success=False,
            completion_time=3600.0,
            quality_score=0.3
        )

        metrics = tracker.get_performance_metrics(mock_agent)

        assert metrics is not None
        assert metrics.total_tasks == 1
        assert metrics.successful_tasks == 0
        assert metrics.failed_tasks == 1
        assert metrics.success_rate == 0.0

    def test_multiple_task_completions(self, tracker, mock_agent):
        tracker.record_task_completion(mock_agent, True, 1800.0, 0.8)
        tracker.record_task_completion(mock_agent, False, 3600.0, 0.4)
        tracker.record_task_completion(mock_agent, True, 2400.0, 0.9)

        metrics = tracker.get_performance_metrics(mock_agent)

        assert metrics.total_tasks == 3
        assert metrics.successful_tasks == 2
        assert metrics.failed_tasks == 1
        assert abs(metrics.success_rate - 2 / 3) < 0.001

    def test_capability_adjustment_on_success(self, tracker, mock_agent):
        initial_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
        initial_proficiency = initial_capabilities[0].proficiency_level

        tracker.record_task_completion(
            agent=mock_agent,
            task_success=True,
            completion_time=1800.0,
            quality_score=0.9,
            capability_used="Python Programming"
        )

        updated_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
        updated_proficiency = updated_capabilities[0].proficiency_level

        assert updated_proficiency >= initial_proficiency

    def test_capability_adjustment_on_failure(self, tracker, mock_agent):
        initial_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
        initial_proficiency = initial_capabilities[0].proficiency_level

        tracker.record_task_completion(
            agent=mock_agent,
            task_success=False,
            completion_time=3600.0,
            quality_score=0.2,
            capability_used="Python Programming"
        )

        updated_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
        updated_proficiency = updated_capabilities[0].proficiency_level

        assert updated_proficiency <= initial_proficiency

    def test_adjust_capabilities_based_on_performance(self, tracker, mock_agent):
        for _ in range(5):
            tracker.record_task_completion(mock_agent, True, 1800.0, 0.9)
        for _ in range(2):
            tracker.record_task_completion(mock_agent, False, 3600.0, 0.3)

        adjustments = tracker.adjust_capabilities_based_on_performance(mock_agent)

        assert isinstance(adjustments, list)

    def test_get_performance_trends(self, tracker, mock_agent):
        tracker.record_task_completion(mock_agent, True, 1800.0, 0.8)
        tracker.record_task_completion(mock_agent, True, 2000.0, 0.9)

        trends = tracker.get_performance_trends(mock_agent)

        assert "success_rate" in trends
        assert "quality_score" in trends
        assert "efficiency_score" in trends
        assert "reliability_score" in trends

        assert len(trends["success_rate"]) > 0

    def test_identify_improvement_opportunities(self, tracker, mock_agent):
        tracker.record_task_completion(mock_agent, False, 7200.0, 0.3)  # Long time, low quality
        tracker.record_task_completion(mock_agent, False, 6000.0, 0.4)
        tracker.record_task_completion(mock_agent, True, 5400.0, 0.5)

        opportunities = tracker.identify_improvement_opportunities(mock_agent)

        assert isinstance(opportunities, list)
        assert len(opportunities) > 0

        areas = [opp["area"] for opp in opportunities]
        assert "success_rate" in areas or "quality" in areas or "efficiency" in areas

    def test_compare_agent_performance(self, tracker, hierarchy):
        agent1 = Mock(spec=BaseAgent)
        agent1.role = "Agent 1"
        agent2 = Mock(spec=BaseAgent)
        agent2.role = "Agent 2"

        capability = AgentCapability(
            name="Test Capability",
            capability_type=CapabilityType.TECHNICAL,
            proficiency_level=0.7,
            confidence_score=0.8
        )

        hierarchy.add_agent(agent1, [capability])
        hierarchy.add_agent(agent2, [capability])

        tracker.record_task_completion(agent1, True, 1800.0, 0.9)  # Good performance
        tracker.record_task_completion(agent1, True, 2000.0, 0.8)

        tracker.record_task_completion(agent2, False, 3600.0, 0.4)  # Poor performance
        tracker.record_task_completion(agent2, True, 4000.0, 0.5)

        comparison = tracker.compare_agent_performance([agent1, agent2], metric="overall")

        assert len(comparison) == 2
        assert comparison[0][1] > comparison[1][1]  # First agent should have a higher score

        success_comparison = tracker.compare_agent_performance([agent1, agent2], metric="success_rate")
        assert len(success_comparison) == 2

    def test_learning_rate_effect(self, tracker, mock_agent):
        original_learning_rate = tracker.learning_rate

        tracker.learning_rate = 0.5

        initial_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
        initial_proficiency = initial_capabilities[0].proficiency_level

        tracker.record_task_completion(
            mock_agent, True, 1800.0, 0.9, capability_used="Python Programming"
        )

        high_lr_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
        high_lr_proficiency = high_lr_capabilities[0].proficiency_level

        tracker.hierarchy.update_agent_capability(
            mock_agent, "Python Programming", initial_proficiency, 0.8
        )
        tracker.learning_rate = 0.01

        tracker.record_task_completion(
            mock_agent, True, 1800.0, 0.9, capability_used="Python Programming"
        )

        low_lr_capabilities = tracker.hierarchy.get_agent_capabilities(mock_agent)
        low_lr_proficiency = low_lr_capabilities[0].proficiency_level

        high_lr_change = abs(high_lr_proficiency - initial_proficiency)
        low_lr_change = abs(low_lr_proficiency - initial_proficiency)

        assert high_lr_change > low_lr_change

        tracker.learning_rate = original_learning_rate

    def test_performance_metrics_creation(self, tracker, mock_agent):
        assert tracker.get_performance_metrics(mock_agent) is None

        tracker.record_task_completion(mock_agent, True, 1800.0)

        metrics = tracker.get_performance_metrics(mock_agent)
        assert metrics is not None
        assert metrics.agent_id == tracker._get_agent_id(mock_agent)