mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-03 16:22:49 +00:00
Merge in main to bugfix/kickoff-for-each-usage-metrics
This commit is contained in:
@@ -5,19 +5,20 @@ from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from langchain_core.callbacks import BaseCallbackHandler
|
||||
from pydantic import (
|
||||
UUID4,
|
||||
BaseModel,
|
||||
ConfigDict,
|
||||
Field,
|
||||
InstanceOf,
|
||||
Json,
|
||||
PrivateAttr,
|
||||
field_validator,
|
||||
model_validator,
|
||||
UUID4,
|
||||
BaseModel,
|
||||
ConfigDict,
|
||||
Field,
|
||||
InstanceOf,
|
||||
Json,
|
||||
PrivateAttr,
|
||||
field_validator,
|
||||
model_validator,
|
||||
)
|
||||
from pydantic_core import PydanticCustomError
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.agents.cache import CacheHandler
|
||||
from crewai.memory.entity.entity_memory import EntityMemory
|
||||
from crewai.memory.long_term.long_term_memory import LongTermMemory
|
||||
@@ -27,6 +28,8 @@ from crewai.task import Task
|
||||
from crewai.telemetry import Telemetry
|
||||
from crewai.tools.agent_tools import AgentTools
|
||||
from crewai.utilities import I18N, FileHandler, Logger, RPMController
|
||||
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
|
||||
from crewai.utilities.training_handler import CrewTrainingHandler
|
||||
|
||||
|
||||
class Crew(BaseModel):
|
||||
@@ -63,11 +66,13 @@ class Crew(BaseModel):
|
||||
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
|
||||
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
|
||||
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
|
||||
_train: Optional[bool] = PrivateAttr(default=False)
|
||||
_train_iteration: Optional[int] = PrivateAttr()
|
||||
|
||||
cache: bool = Field(default=False)
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
tasks: List[Task] = Field(default_factory=list)
|
||||
agents: List[Agent] = Field(default_factory=list)
|
||||
agents: List[BaseAgent] = Field(default_factory=list)
|
||||
process: Process = Field(default=Process.sequential)
|
||||
verbose: Union[int, bool] = Field(default=0)
|
||||
memory: bool = Field(
|
||||
@@ -89,7 +94,7 @@ class Crew(BaseModel):
|
||||
manager_llm: Optional[Any] = Field(
|
||||
description="Language model that will run the agent.", default=None
|
||||
)
|
||||
manager_agent: Optional[Any] = Field(
|
||||
manager_agent: Optional[BaseAgent] = Field(
|
||||
description="Custom agent that will be used as manager.", default=None
|
||||
)
|
||||
manager_callbacks: Optional[List[InstanceOf[BaseCallbackHandler]]] = Field(
|
||||
@@ -242,17 +247,41 @@ class Crew(BaseModel):
|
||||
del task_config["agent"]
|
||||
return Task(**task_config, agent=task_agent)
|
||||
|
||||
def _setup_for_training(self) -> None:
|
||||
"""Sets up the crew for training."""
|
||||
self._train = True
|
||||
|
||||
for task in self.tasks:
|
||||
task.human_input = True
|
||||
|
||||
for agent in self.agents:
|
||||
agent.allow_delegation = False
|
||||
|
||||
def train(self, n_iterations: int, inputs: Optional[Dict[str, Any]] = {}) -> None:
|
||||
"""Trains the crew for a given number of iterations."""
|
||||
self._setup_for_training()
|
||||
|
||||
for n_iteration in range(n_iterations):
|
||||
self._train_iteration = n_iteration
|
||||
self.kickoff(inputs=inputs)
|
||||
|
||||
training_data = CrewTrainingHandler("training_data.pkl").load()
|
||||
|
||||
for agent in self.agents:
|
||||
result = TaskEvaluator(agent).evaluate_training_data(
|
||||
training_data=training_data, agent_id=str(agent.id)
|
||||
)
|
||||
|
||||
CrewTrainingHandler("trained_agents_data.pkl").save_trained_data(
|
||||
agent_id=str(agent.role), trained_data=result.model_dump()
|
||||
)
|
||||
|
||||
def kickoff(
|
||||
self,
|
||||
inputs: Optional[Dict[str, Any]] = {},
|
||||
) -> Union[str, Dict[str, Any]]:
|
||||
"""Starts the crew to work on its assigned tasks."""
|
||||
print(f"CREW ID {self.id} - KICKING OFF CREW")
|
||||
print(
|
||||
f"CREW ID {self.id} - callbacks",
|
||||
[agent.llm.callbacks for agent in self.agents],
|
||||
)
|
||||
self._execution_span = self._telemetry.crew_execution_span(self)
|
||||
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
|
||||
# type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
|
||||
self._interpolate_inputs(inputs)
|
||||
self._set_tasks_callbacks()
|
||||
@@ -260,12 +289,21 @@ class Crew(BaseModel):
|
||||
i18n = I18N(prompt_file=self.prompt_file)
|
||||
|
||||
for agent in self.agents:
|
||||
# type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
|
||||
agent.i18n = i18n
|
||||
agent.crew = self
|
||||
|
||||
if not agent.function_calling_llm:
|
||||
# type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
|
||||
agent.crew = self # type: ignore[attr-defined]
|
||||
# TODO: Create an AgentFunctionCalling protocol for future refactoring
|
||||
if (
|
||||
hasattr(agent, "function_calling_llm")
|
||||
and not agent.function_calling_llm
|
||||
):
|
||||
agent.function_calling_llm = self.function_calling_llm
|
||||
if not agent.step_callback:
|
||||
|
||||
if hasattr(agent, "allow_code_execution") and agent.allow_code_execution:
|
||||
agent.tools += agent.get_code_execution_tools()
|
||||
|
||||
if hasattr(agent, "step_callback") and not agent.step_callback:
|
||||
agent.step_callback = self.step_callback
|
||||
|
||||
agent.create_agent_executor()
|
||||
@@ -283,10 +321,10 @@ class Crew(BaseModel):
|
||||
raise NotImplementedError(
|
||||
f"The process '{self.process}' is not implemented yet."
|
||||
)
|
||||
|
||||
metrics = metrics + [
|
||||
agent._token_process.get_summary() for agent in self.agents
|
||||
]
|
||||
|
||||
self.usage_metrics = {
|
||||
key: sum([m[key] for m in metrics if m is not None]) for key in metrics[0]
|
||||
}
|
||||
@@ -327,7 +365,6 @@ class Crew(BaseModel):
|
||||
"""Asynchronous kickoff method to start the crew execution."""
|
||||
return await asyncio.to_thread(self.kickoff, inputs)
|
||||
|
||||
# TODO: IF THERE ARE MULTIPLE INPUTS, THE USAGE METRICS FOR FIRST ONE COMES BACK AS 0.
|
||||
async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[Any]:
|
||||
crew_copies = [self.copy() for _ in inputs]
|
||||
|
||||
@@ -356,28 +393,17 @@ class Crew(BaseModel):
|
||||
|
||||
return results
|
||||
|
||||
def train(self, n_iterations: int) -> None:
|
||||
# TODO: Implement training
|
||||
pass
|
||||
|
||||
def _run_sequential_process(self) -> Union[str, Dict[str, Any]]:
|
||||
def _run_sequential_process(self) -> str:
|
||||
"""Executes tasks sequentially and returns the final output."""
|
||||
task_output = ""
|
||||
total_token_usage = {
|
||||
"total_tokens": 0,
|
||||
"prompt_tokens": 0,
|
||||
"completion_tokens": 0,
|
||||
"successful_requests": 0,
|
||||
}
|
||||
|
||||
for task in self.tasks:
|
||||
print("TASK DESCRIPTION", task.description)
|
||||
if task.agent.allow_delegation: # type: ignore # Item "None" of "Agent | None" has no attribute "allow_delegation"
|
||||
agents_for_delegation = [
|
||||
agent for agent in self.agents if agent != task.agent
|
||||
]
|
||||
if len(self.agents) > 1 and len(agents_for_delegation) > 0:
|
||||
task.tools += AgentTools(agents=agents_for_delegation).tools()
|
||||
task.tools += task.agent.get_delegation_tools(agents_for_delegation)
|
||||
|
||||
role = task.agent.role if task.agent is not None else "None"
|
||||
self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple")
|
||||
@@ -389,7 +415,6 @@ class Crew(BaseModel):
|
||||
self._file_handler.log(
|
||||
agent=role, task=task.description, status="started"
|
||||
)
|
||||
|
||||
output = task.execute(context=task_output)
|
||||
|
||||
if not task.async_execution:
|
||||
@@ -401,18 +426,12 @@ class Crew(BaseModel):
|
||||
if self.output_log_file:
|
||||
self._file_handler.log(agent=role, task=task_output, status="completed")
|
||||
|
||||
for agent in self.agents:
|
||||
print("INSPECTING AGENT", agent.role)
|
||||
agent_token_usage = agent._token_process.get_summary()
|
||||
print("AGENT TOKEN USAGE", agent_token_usage)
|
||||
for key in total_token_usage:
|
||||
total_token_usage[key] += agent_token_usage.get(key, 0)
|
||||
|
||||
self._finish_execution(task_output)
|
||||
# type: ignore # Item "None" of "Agent | None" has no attribute "_token_process")
|
||||
|
||||
token_usage = self._calculate_usage_metrics()
|
||||
|
||||
# type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
|
||||
return self._format_output(task_output, total_token_usage)
|
||||
return self._format_output(task_output, token_usage)
|
||||
|
||||
def _run_hierarchical_process(self) -> Union[str, Dict[str, Any]]:
|
||||
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
|
||||
@@ -423,7 +442,7 @@ class Crew(BaseModel):
|
||||
manager = self.manager_agent
|
||||
if len(manager.tools) > 0:
|
||||
raise Exception("Manager agent should not have tools")
|
||||
manager.tools = AgentTools(agents=self.agents).tools()
|
||||
manager.tools = self.manager_agent.get_delegation_tools(self.agents)
|
||||
else:
|
||||
manager = Agent(
|
||||
role=i18n.retrieve("hierarchical_manager_agent", "role"),
|
||||
@@ -431,10 +450,12 @@ class Crew(BaseModel):
|
||||
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
|
||||
tools=AgentTools(agents=self.agents).tools(),
|
||||
llm=self.manager_llm,
|
||||
verbose=True,
|
||||
verbose=self.verbose,
|
||||
)
|
||||
self.manager_agent = manager
|
||||
|
||||
task_output = ""
|
||||
|
||||
for task in self.tasks:
|
||||
self._logger.log("debug", f"Working Agent: {manager.role}")
|
||||
self._logger.log("info", f"Starting Task: {task.description}")
|
||||
@@ -449,19 +470,19 @@ class Crew(BaseModel):
|
||||
)
|
||||
|
||||
self._logger.log("debug", f"[{manager.role}] Task output: {task_output}")
|
||||
|
||||
if self.output_log_file:
|
||||
self._file_handler.log(
|
||||
agent=manager.role, task=task_output, status="completed"
|
||||
)
|
||||
|
||||
# TODO: GET TOKENS USAGE CALCULATED INCLUDING MANAGER
|
||||
self._finish_execution(task_output)
|
||||
|
||||
# type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
|
||||
manager_token_usage = manager._token_process.get_summary()
|
||||
token_usage = self._calculate_usage_metrics()
|
||||
|
||||
return (
|
||||
self._format_output(task_output, manager_token_usage),
|
||||
manager_token_usage,
|
||||
self._format_output(task_output, token_usage),
|
||||
token_usage,
|
||||
)
|
||||
|
||||
def copy(self):
|
||||
@@ -511,17 +532,17 @@ class Crew(BaseModel):
|
||||
for task in self.tasks
|
||||
]
|
||||
# type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None)
|
||||
[agent.interpolate_inputs(inputs) for agent in self.agents]
|
||||
for agent in self.agents:
|
||||
agent.interpolate_inputs(inputs)
|
||||
|
||||
def _format_output(
|
||||
self, output: str, token_usage: Optional[Dict[str, Any]]
|
||||
self, output: str, token_usage: Optional[Dict[str, Any]] = None
|
||||
) -> Union[str, Dict[str, Any]]:
|
||||
"""
|
||||
Formats the output of the crew execution.
|
||||
If full_output is True, then returned data type will be a dictionary else returned outputs are string
|
||||
"""
|
||||
|
||||
print("token_usage passed to _format_output", token_usage)
|
||||
if self.full_output:
|
||||
return { # type: ignore # Incompatible return value type (got "dict[str, Sequence[str | TaskOutput | None]]", expected "str")
|
||||
"final_output": output,
|
||||
@@ -536,5 +557,38 @@ class Crew(BaseModel):
|
||||
self._rpm_controller.stop_rpm_counter()
|
||||
self._telemetry.end_crew(self, output)
|
||||
|
||||
def _calculate_usage_metrics(
|
||||
self,
|
||||
) -> Dict[str, int]:
|
||||
total_usage_metrics = {
|
||||
"total_tokens": 0,
|
||||
"prompt_tokens": 0,
|
||||
"completion_tokens": 0,
|
||||
"successful_requests": 0,
|
||||
}
|
||||
|
||||
for agent in self.agents:
|
||||
if hasattr(agent, "_token_process"):
|
||||
token_sum = agent._token_process.get_summary()
|
||||
total_usage_metrics = {
|
||||
key: total_usage_metrics[key] + token_sum[key]
|
||||
for key in total_usage_metrics
|
||||
}
|
||||
|
||||
if self.manager_agent:
|
||||
token_sum = self.manager_agent._token_process.get_summary()
|
||||
total_usage_metrics = {
|
||||
key: total_usage_metrics[key] + token_sum[key]
|
||||
for key in total_usage_metrics
|
||||
}
|
||||
|
||||
return total_usage_metrics
|
||||
|
||||
def __repr__(self):
|
||||
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"
|
||||
|
||||
def aggregate_token_usage(self, token_usage_list: List[Dict[str, Any]]):
|
||||
return {
|
||||
key: sum([m[key] for m in token_usage_list if m is not None])
|
||||
for key in token_usage_list[0]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user