import asyncio
import json
import uuid
from typing import Any, Dict, List, Optional, Tuple, Union

from langchain_core.callbacks import BaseCallbackHandler
from pydantic import (
    UUID4,
    BaseModel,
    ConfigDict,
    Field,
    InstanceOf,
    Json,
    PrivateAttr,
    field_validator,
    model_validator,
)
from pydantic_core import PydanticCustomError

from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache import CacheHandler
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.process import Process
from crewai.task import Task
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools import AgentTools
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.training_handler import CrewTrainingHandler

try:
    import agentops
except ImportError:
    agentops = None

class Crew(BaseModel):
    """
    Represents a group of agents, defining how they should collaborate and the tasks they should perform.

    Attributes:
        tasks: List of tasks assigned to the crew.
        agents: List of agents that are part of this crew.
        manager_llm: The language model that will run the manager agent.
        manager_agent: Custom agent that will be used as manager.
        memory: Whether the crew should use memory to store memories of its execution.
        manager_callbacks: The callback handlers to be executed by the manager agent when the hierarchical process is used.
        cache: Whether the crew should use a cache to store the results of the tools' execution.
        function_calling_llm: The language model that will run the tool calling for all the agents.
        process: The process flow that the crew will follow (e.g., sequential, hierarchical).
        verbose: Indicates the verbosity level for logging during execution.
        config: Configuration settings for the crew.
        max_rpm: Maximum number of requests per minute for the crew execution to be respected.
        prompt_file: Path to the prompt json file to be used for the crew.
        id: A unique identifier for the crew instance.
        full_output: Whether the crew should return the full output with all tasks outputs and token usage metrics or just the final output.
        task_callback: Callback to be executed after each task for every agent's execution.
        step_callback: Callback to be executed after each step for every agent's execution.
        share_crew: Whether you want to share the complete crew information and execution with crewAI to make the library better, and allow us to train models.
    """

    __hash__ = object.__hash__  # type: ignore
    _execution_span: Any = PrivateAttr()
    _rpm_controller: RPMController = PrivateAttr()
    _logger: Logger = PrivateAttr()
    _file_handler: FileHandler = PrivateAttr()
    _cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
    _short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
    _long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
    _entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
    _train: Optional[bool] = PrivateAttr(default=False)
    _train_iteration: Optional[int] = PrivateAttr()

    cache: bool = Field(default=True)
    model_config = ConfigDict(arbitrary_types_allowed=True)
    tasks: List[Task] = Field(default_factory=list)
    agents: List[BaseAgent] = Field(default_factory=list)
    process: Process = Field(default=Process.sequential)
    verbose: Union[int, bool] = Field(default=0)
    memory: bool = Field(
        default=False,
        description="Whether the crew should use memory to store memories of its execution",
    )
    embedder: Optional[dict] = Field(
        default={"provider": "openai"},
        description="Configuration for the embedder to be used for the crew.",
    )
    usage_metrics: Optional[dict] = Field(
        default=None,
        description="Metrics for the LLM usage during all tasks execution.",
    )
    full_output: Optional[bool] = Field(
        default=False,
        description="Whether the crew should return the full output with all tasks outputs and token usage metrics or just the final output.",
    )
    manager_llm: Optional[Any] = Field(
        description="Language model that will run the manager agent.", default=None
    )
    manager_agent: Optional[BaseAgent] = Field(
        description="Custom agent that will be used as manager.", default=None
    )
    manager_callbacks: Optional[List[InstanceOf[BaseCallbackHandler]]] = Field(
        default=None,
        description="A list of callback handlers to be executed by the manager agent when hierarchical process is used",
    )
    function_calling_llm: Optional[Any] = Field(
        description="Language model that will run the tool calling for all the agents.",
        default=None,
    )
    config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None)
    id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
    share_crew: Optional[bool] = Field(default=False)
    step_callback: Optional[Any] = Field(
        default=None,
        description="Callback to be executed after each step for all agents' execution.",
    )
    task_callback: Optional[Any] = Field(
        default=None,
        description="Callback to be executed after each task for all agents' execution.",
    )
    max_rpm: Optional[int] = Field(
        default=None,
        description="Maximum number of requests per minute for the crew execution to be respected.",
    )
    prompt_file: Optional[str] = Field(
        default=None,
        description="Path to the prompt json file to be used for the crew.",
    )
    output_log_file: Optional[Union[bool, str]] = Field(
        default=False,
        description="Path to the log file for crew execution output, or True to log to the default file.",
    )

    @field_validator("id", mode="before")
    @classmethod
    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
        """Prevent manual setting of the 'id' field by users."""
        if v:
            raise PydanticCustomError(
                "may_not_set_field", "The 'id' field cannot be set by the user.", {}
            )

    @field_validator("config", mode="before")
    @classmethod
    def check_config_type(
        cls, v: Union[Json, Dict[str, Any]]
    ) -> Union[Json, Dict[str, Any]]:
        """Validates that the config is a valid type.
        Args:
            v: The config to be validated.
        Returns:
            The config if it is valid.
        """

        # TODO: Improve typing
        return json.loads(v) if isinstance(v, Json) else v  # type: ignore

    @model_validator(mode="after")
    def set_private_attrs(self) -> "Crew":
        """Set private attributes."""
        self._cache_handler = CacheHandler()
        self._logger = Logger(self.verbose)
        if self.output_log_file:
            self._file_handler = FileHandler(self.output_log_file)
        self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
        self._telemetry = Telemetry()
        self._telemetry.set_tracer()
        self._telemetry.crew_creation(self)
        return self

    @model_validator(mode="after")
    def create_crew_memory(self) -> "Crew":
        """Create the crew's memory stores when memory is enabled."""
        if self.memory:
            self._long_term_memory = LongTermMemory()
            self._short_term_memory = ShortTermMemory(
                crew=self, embedder_config=self.embedder
            )
            self._entity_memory = EntityMemory(crew=self, embedder_config=self.embedder)
        return self

    @model_validator(mode="after")
    def check_manager_llm(self):
        """Validates that the language model is set when using hierarchical process."""
        if self.process == Process.hierarchical:
            if not self.manager_llm and not self.manager_agent:
                raise PydanticCustomError(
                    "missing_manager_llm_or_manager_agent",
                    "Attribute `manager_llm` or `manager_agent` is required when using hierarchical process.",
                    {},
                )

            if (self.manager_agent is not None) and (
                self.agents.count(self.manager_agent) > 0
            ):
                raise PydanticCustomError(
                    "manager_agent_in_agents",
                    "Manager agent should not be included in agents list.",
                    {},
                )

        return self

    @model_validator(mode="after")
    def check_config(self):
        """Validates that the crew is properly configured with agents and tasks."""
        if not self.config and not self.tasks and not self.agents:
            raise PydanticCustomError(
                "missing_keys",
                "Either 'agents' and 'tasks' need to be set or 'config'.",
                {},
            )

        if self.config:
            self._setup_from_config()

        if self.agents:
            for agent in self.agents:
                if self.cache:
                    agent.set_cache_handler(self._cache_handler)
                if self.max_rpm:
                    agent.set_rpm_controller(self._rpm_controller)
        return self

    @model_validator(mode="after")
    def validate_tasks(self):
        if self.process == Process.sequential:
            for task in self.tasks:
                if task.agent is None:
                    raise PydanticCustomError(
                        "missing_agent_in_task",
                        f"Sequential process error: Agent is missing in the task with the following description: {task.description}",  # type: ignore # Argument of type "str" cannot be assigned to parameter "message_template" of type "LiteralString"
                        {},
                    )

        return self

    @model_validator(mode="after")
    def check_tasks_in_hierarchical_process_not_async(self):
        """Validates that tasks in a hierarchical process are not flagged with async_execution."""
        if self.process == Process.hierarchical:
            for task in self.tasks:
                if task.async_execution:
                    raise PydanticCustomError(
                        "async_execution_in_hierarchical_process",
                        "Hierarchical process error: Tasks cannot be flagged with async_execution.",
                        {},
                    )

        return self

    def _setup_from_config(self):
        """Initializes agents and tasks from the provided config."""
        assert self.config is not None, "Config should not be None."

        if not self.config.get("agents") or not self.config.get("tasks"):
            raise PydanticCustomError(
                "missing_keys_in_config", "Config should have 'agents' and 'tasks'.", {}
            )

        self.process = self.config.get("process", self.process)
        self.agents = [Agent(**agent) for agent in self.config["agents"]]
        self.tasks = [self._create_task(task) for task in self.config["tasks"]]

    def _create_task(self, task_config: Dict[str, Any]) -> Task:
        """Creates a task instance from its configuration.

        Args:
            task_config: The configuration of the task.

        Returns:
            A task instance.
        """
        task_agent = next(
            agt for agt in self.agents if agt.role == task_config["agent"]
        )
        del task_config["agent"]
        return Task(**task_config, agent=task_agent)
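
    # Illustrative sketch of the `config` shape that `_setup_from_config` expects:
    # top-level "agents" and "tasks" keys (as checked above), where each entry must
    # match the `Agent` / `Task` constructors; the nested fields below are placeholders.
    #
    #     config = {
    #         "agents": [{"role": "Researcher", "goal": "...", "backstory": "..."}],
    #         "tasks": [{"description": "...", "agent": "Researcher"}],
    #     }
    #     crew = Crew(config=config)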

    def _setup_for_training(self) -> None:
        """Sets up the crew for training."""
        self._train = True

        for task in self.tasks:
            task.human_input = True

        for agent in self.agents:
            agent.allow_delegation = False

        CrewTrainingHandler(TRAINING_DATA_FILE).initialize_file()
        CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).initialize_file()

    def train(self, n_iterations: int, inputs: Optional[Dict[str, Any]] = {}) -> None:
        """Trains the crew for a given number of iterations."""
        self._setup_for_training()

        for n_iteration in range(n_iterations):
            self._train_iteration = n_iteration
            self.kickoff(inputs=inputs)

        training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()

        for agent in self.agents:
            result = TaskEvaluator(agent).evaluate_training_data(
                training_data=training_data, agent_id=str(agent.id)
            )

            CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).save_trained_data(
                agent_id=str(agent.role), trained_data=result.model_dump()
            )
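
    # Illustrative training sketch (hypothetical values; `crew` is assumed to be a
    # configured Crew instance). Each iteration runs `kickoff` with human input
    # enabled and delegation disabled, then evaluates the collected training data:
    #
    #     crew.train(n_iterations=2, inputs={"topic": "example topic"})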

    def kickoff(
        self,
        inputs: Optional[Dict[str, Any]] = {},
    ) -> Union[str, Dict[str, Any]]:
        """Starts the crew to work on its assigned tasks."""
        self._execution_span = self._telemetry.crew_execution_span(self, inputs)

        self._interpolate_inputs(inputs)  # type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
        self._set_tasks_callbacks()

        i18n = I18N(prompt_file=self.prompt_file)

        for agent in self.agents:
            agent.i18n = i18n
            agent.crew = self  # type: ignore[attr-defined]
            # TODO: Create an AgentFunctionCalling protocol for future refactoring
            if not agent.function_calling_llm:  # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
                agent.function_calling_llm = self.function_calling_llm  # type: ignore # "BaseAgent" has no attribute "function_calling_llm"

            if agent.allow_code_execution:  # type: ignore # "BaseAgent" has no attribute "allow_code_execution"
                agent.tools += agent.get_code_execution_tools()  # type: ignore # "BaseAgent" has no attribute "get_code_execution_tools"; maybe "get_delegation_tools"?

            if not agent.step_callback:  # type: ignore # "BaseAgent" has no attribute "step_callback"
                agent.step_callback = self.step_callback  # type: ignore # "BaseAgent" has no attribute "step_callback"

            agent.create_agent_executor()

        metrics = []

        if self.process == Process.sequential:
            result = self._run_sequential_process()
        elif self.process == Process.hierarchical:
            result, manager_metrics = self._run_hierarchical_process()  # type: ignore # Incompatible types in assignment (expression has type "str | dict[str, Any]", variable has type "str")
            metrics.append(manager_metrics)
        else:
            raise NotImplementedError(
                f"The process '{self.process}' is not implemented yet."
            )

        metrics += [agent._token_process.get_summary() for agent in self.agents]

        self.usage_metrics = {
            key: sum([m[key] for m in metrics if m is not None]) for key in metrics[0]
        }

        return result

    def kickoff_for_each(
        self, inputs: List[Dict[str, Any]]
    ) -> List[Union[str, Dict[str, Any]]]:
        """Executes the Crew's workflow for each input in the list and aggregates results."""
        results = []

        # Initialize the parent crew's usage metrics
        total_usage_metrics = {
            "total_tokens": 0,
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "successful_requests": 0,
        }

        for input_data in inputs:
            crew = self.copy()

            output = crew.kickoff(inputs=input_data)

            if crew.usage_metrics:
                for key in total_usage_metrics:
                    total_usage_metrics[key] += crew.usage_metrics.get(key, 0)

            results.append(output)

        self.usage_metrics = total_usage_metrics
        return results
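
    # Illustrative sketch: running the same crew over several inputs (placeholder
    # values). Each input is executed against an independent copy of the crew and
    # the parent crew aggregates the usage metrics:
    #
    #     outputs = crew.kickoff_for_each(
    #         inputs=[{"topic": "topic A"}, {"topic": "topic B"}]
    #     )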

    async def kickoff_async(
        self, inputs: Optional[Dict[str, Any]] = {}
    ) -> Union[str, Dict]:
        """Asynchronous kickoff method to start the crew execution."""
        return await asyncio.to_thread(self.kickoff, inputs)

    async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[Any]:
        """Asynchronously executes the Crew's workflow for each input and aggregates results."""
        crew_copies = [self.copy() for _ in inputs]

        async def run_crew(crew, input_data):
            return await crew.kickoff_async(inputs=input_data)

        tasks = [
            asyncio.create_task(run_crew(crew_copies[i], inputs[i]))
            for i in range(len(inputs))
        ]

        results = await asyncio.gather(*tasks)

        total_usage_metrics = {
            "total_tokens": 0,
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "successful_requests": 0,
        }
        for crew in crew_copies:
            if crew.usage_metrics:
                for key in total_usage_metrics:
                    total_usage_metrics[key] += crew.usage_metrics.get(key, 0)

        self.usage_metrics = total_usage_metrics

        return results
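
    # Illustrative async sketch (placeholder inputs; `crew` is assumed to be a
    # configured Crew instance). `kickoff_async` offloads the synchronous `kickoff`
    # to a worker thread; `kickoff_for_each_async` runs one crew copy per input
    # concurrently via asyncio.gather:
    #
    #     async def main():
    #         single = await crew.kickoff_async(inputs={"topic": "topic A"})
    #         many = await crew.kickoff_for_each_async(
    #             inputs=[{"topic": "topic A"}, {"topic": "topic B"}]
    #         )
    #         return single, many
    #
    #     asyncio.run(main())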

    def _run_sequential_process(self) -> Union[str, Dict[str, Any]]:
        """Executes tasks sequentially and returns the final output."""
        task_output = None

        for task in self.tasks:
            if task.agent and task.agent.allow_delegation:
                agents_for_delegation = [
                    agent for agent in self.agents if agent != task.agent
                ]
                if len(self.agents) > 1 and len(agents_for_delegation) > 0:
                    task.tools += task.agent.get_delegation_tools(agents_for_delegation)

            role = task.agent.role if task.agent is not None else "None"
            self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple")
            self._logger.log(
                "info", f"== Starting Task: {task.description}", color="bold_purple"
            )

            if self.output_log_file:
                self._file_handler.log(
                    agent=role, task=task.description, status="started"
                )
            output = task.execute(context=task_output)

            if not task.async_execution:
                task_output = output

            role = task.agent.role if task.agent is not None else "None"
            self._logger.log("debug", f"== [{role}] Task output: {task_output}\n\n")

            if self.output_log_file:
                self._file_handler.log(agent=role, task=task_output, status="completed")

        self._finish_execution(task_output)

        token_usage = self.calculate_usage_metrics()

        return self._format_output(task_output if task_output else "", token_usage)

    def _run_hierarchical_process(
        self,
    ) -> Tuple[Union[str, Dict[str, Any]], Dict[str, Any]]:
        """Creates and assigns a manager agent to make sure the crew completes the tasks."""

        i18n = I18N(prompt_file=self.prompt_file)
        if self.manager_agent is not None:
            self.manager_agent.allow_delegation = True
            manager = self.manager_agent
            if manager.tools is not None and len(manager.tools) > 0:
                raise Exception("Manager agent should not have tools")
            manager.tools = self.manager_agent.get_delegation_tools(self.agents)
        else:
            manager = Agent(
                role=i18n.retrieve("hierarchical_manager_agent", "role"),
                goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
                backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
                tools=AgentTools(agents=self.agents).tools(),
                llm=self.manager_llm,
                verbose=self.verbose,
            )
            self.manager_agent = manager

        task_output = None

        for task in self.tasks:
            self._logger.log("debug", f"Working Agent: {manager.role}")
            self._logger.log("info", f"Starting Task: {task.description}")

            if self.output_log_file:
                self._file_handler.log(
                    agent=manager.role, task=task.description, status="started"
                )

            if task.agent:
                manager.tools = task.agent.get_delegation_tools([task.agent])
            else:
                manager.tools = manager.get_delegation_tools(self.agents)
            task_output = task.execute(
                agent=manager, context=task_output, tools=manager.tools
            )

            self._logger.log("debug", f"[{manager.role}] Task output: {task_output}")
            if self.output_log_file:
                self._file_handler.log(
                    agent=manager.role, task=task_output, status="completed"
                )

        self._finish_execution(task_output)

        token_usage = self.calculate_usage_metrics()

        return self._format_output(
            task_output if task_output else "", token_usage
        ), token_usage

    def copy(self):
        """Create a deep copy of the Crew."""

        exclude = {
            "id",
            "_rpm_controller",
            "_logger",
            "_execution_span",
            "_file_handler",
            "_cache_handler",
            "_short_term_memory",
            "_long_term_memory",
            "_entity_memory",
            "_telemetry",
            "agents",
            "tasks",
        }

        cloned_agents = [agent.copy() for agent in self.agents]
        cloned_tasks = [task.copy(cloned_agents) for task in self.tasks]

        copied_data = self.model_dump(exclude=exclude)
        copied_data = {k: v for k, v in copied_data.items() if v is not None}

        copied_data.pop("agents", None)
        copied_data.pop("tasks", None)

        copied_crew = Crew(**copied_data, agents=cloned_agents, tasks=cloned_tasks)

        return copied_crew

    def _set_tasks_callbacks(self) -> None:
        """Sets the callback for every task using task_callback."""
        for task in self.tasks:
            if not task.callback:
                task.callback = self.task_callback

    def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
        """Interpolates the inputs in the tasks and agents."""
        for task in self.tasks:
            task.interpolate_inputs(inputs)
        for agent in self.agents:
            agent.interpolate_inputs(inputs)

    def _format_output(
        self, output: str, token_usage: Optional[Dict[str, Any]] = None
    ) -> Union[str, Dict[str, Any]]:
        """
        Formats the output of the crew execution.
        If full_output is True, returns a dictionary with the final output, all task
        outputs, and the usage metrics; otherwise returns the final output string.
        """

        if self.full_output:
            return {
                "final_output": output,
                "tasks_outputs": [task.output for task in self.tasks if task],
                "usage_metrics": token_usage,
            }
        else:
            return output
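
    # Illustrative sketch of the dictionary returned when `full_output` is True
    # (keys taken from the branch above; values are placeholders):
    #
    #     {
    #         "final_output": "...",
    #         "tasks_outputs": [task.output, ...],
    #         "usage_metrics": {"total_tokens": 0, "prompt_tokens": 0, ...},
    #     }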

    def _finish_execution(self, output) -> None:
        """Stops the RPM counter if enabled, ends the agentops session, and records crew-end telemetry."""
        if self.max_rpm:
            self._rpm_controller.stop_rpm_counter()
        if agentops:
            agentops.end_session(
                end_state="Success", end_state_reason="Finished Execution"
            )
        self._telemetry.end_crew(self, output)

    def calculate_usage_metrics(self) -> Dict[str, int]:
        """Calculates and returns the usage metrics."""
        total_usage_metrics = {
            "total_tokens": 0,
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "successful_requests": 0,
        }

        for agent in self.agents:
            if hasattr(agent, "_token_process"):
                token_sum = agent._token_process.get_summary()
                for key in total_usage_metrics:
                    total_usage_metrics[key] += token_sum.get(key, 0)

        if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
            token_sum = self.manager_agent._token_process.get_summary()
            for key in total_usage_metrics:
                total_usage_metrics[key] += token_sum.get(key, 0)

        return total_usage_metrics

    def __repr__(self):
        return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"