Bugfix/kickoff for each usage metrics (#844)

* WIP. Figuring out disconnect issue.

* Cleaned up logs now that I've isolated the issue to the LLM

* more wip.

* WIP. It looks like usage metrics has always been broken for async

* Update parent crew who is managing for_each loop

* Merge in main to bugfix/kickoff-for-each-usage-metrics

* Clean up code for review

* Add new tests

* Final cleanup. Ready for review.

* Moving copy functionality from Agent to BaseAgent

* Fix renaming issue

* Fix linting errors

* use BaseAgent instead of Agent where applicable
This commit is contained in:
Brandon Hancock (bhancock_ai)
2024-07-03 14:30:53 -04:00
committed by GitHub
parent 706f4cd74a
commit 57fc079267
11 changed files with 1761 additions and 307 deletions

View File

@@ -7,16 +7,15 @@ from langchain.tools.render import render_text_description
from langchain_core.agents import AgentAction
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI
from pydantic import Field, InstanceOf, model_validator
from crewai.agents import CacheHandler, CrewAgentExecutor, CrewAgentParser
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.tools.agent_tools import AgentTools
from crewai.utilities import Prompts, Converter
from crewai.utilities import Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.utilities.training_handler import CrewTrainingHandler
agentops = None
@@ -91,7 +90,6 @@ class Agent(BaseAgent):
response_template: Optional[str] = Field(
default=None, description="Response format for the agent."
)
allow_code_execution: Optional[bool] = Field(
default=False, description="Enable code execution for the agent."
)
@@ -103,7 +101,7 @@ class Agent(BaseAgent):
@model_validator(mode="after")
def set_agent_executor(self) -> "Agent":
"""Ensure agent executor and token process is set."""
"""Ensure agent executor and token process are set."""
if hasattr(self.llm, "model_name"):
token_handler = TokenCalcHandler(self.llm.model_name, self._token_process)

View File

@@ -1,23 +1,26 @@
from copy import deepcopy
import uuid
from typing import Any, Dict, List, Optional
from abc import ABC, abstractmethod
from copy import copy as shallow_copy
from typing import Any, Dict, List, Optional, TypeVar
from pydantic import (
UUID4,
BaseModel,
ConfigDict,
Field,
InstanceOf,
PrivateAttr,
field_validator,
model_validator,
ConfigDict,
PrivateAttr,
)
from pydantic_core import PydanticCustomError
from crewai.utilities import I18N, RPMController, Logger
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.tools_handler import ToolsHandler
from crewai.utilities.token_counter_callback import TokenProcess
from crewai.utilities import I18N, Logger, RPMController
T = TypeVar("T", bound="BaseAgent")
class BaseAgent(ABC, BaseModel):
@@ -188,6 +191,31 @@ class BaseAgent(ABC, BaseModel):
"""Get the converter class for the agent to create json/pydantic outputs."""
pass
def copy(self: T) -> T:
"""Create a deep copy of the Agent."""
exclude = {
"id",
"_logger",
"_rpm_controller",
"_request_within_rpm_limit",
"_token_process",
"agent_executor",
"tools",
"tools_handler",
"cache_handler",
"llm",
}
# Copy llm and clear callbacks
existing_llm = shallow_copy(self.llm)
existing_llm.callbacks = []
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
copied_agent = type(self)(**copied_data, llm=existing_llm, tools=self.tools)
return copied_agent
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
"""Interpolate inputs into the agent description and backstory."""
if self._original_role is None:
@@ -217,35 +245,6 @@ class BaseAgent(ABC, BaseModel):
def increment_formatting_errors(self) -> None:
self.formatting_errors += 1
def copy(self):
exclude = {
"id",
"_logger",
"_rpm_controller",
"_request_within_rpm_limit",
"token_process",
"agent_executor",
"tools",
"tools_handler",
"cache_handler",
"crew",
"llm",
}
copied_data = self.model_dump(exclude=exclude, exclude_unset=True)
copied_agent = self.__class__(**copied_data)
# Copy mutable attributes separately
copied_agent.tools = deepcopy(self.tools)
copied_agent.config = deepcopy(self.config)
# Preserve original values for interpolation
copied_agent._original_role = self._original_role
copied_agent._original_goal = self._original_goal
copied_agent._original_backstory = self._original_backstory
return copied_agent
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.

View File

@@ -299,16 +299,13 @@ class Crew(BaseModel):
# type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
agent.crew = self # type: ignore[attr-defined]
# TODO: Create an AgentFunctionCalling protocol for future refactoring
if (
hasattr(agent, "function_calling_llm")
and not agent.function_calling_llm
):
if not agent.function_calling_llm:
agent.function_calling_llm = self.function_calling_llm
if hasattr(agent, "allow_code_execution") and agent.allow_code_execution:
if agent.allow_code_execution:
agent.tools += agent.get_code_execution_tools()
if hasattr(agent, "step_callback") and not agent.step_callback:
if not agent.step_callback:
agent.step_callback = self.step_callback
agent.create_agent_executor()
@@ -326,9 +323,7 @@ class Crew(BaseModel):
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
metrics = metrics + [
agent._token_process.get_summary() for agent in self.agents
]
metrics += [agent._token_process.get_summary() for agent in self.agents]
self.usage_metrics = {
key: sum([m[key] for m in metrics if m is not None]) for key in metrics[0]
@@ -336,21 +331,32 @@ class Crew(BaseModel):
return result
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List:
def kickoff_for_each(
self, inputs: List[Dict[str, Any]]
) -> List[Union[str, Dict[str, Any]]]:
"""Executes the Crew's workflow for each input in the list and aggregates results."""
results = []
# Initialize the parent crew's usage metrics
total_usage_metrics = {
"total_tokens": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"successful_requests": 0,
}
for input_data in inputs:
crew = self.copy()
for task in crew.tasks:
task.interpolate_inputs(input_data)
for agent in crew.agents:
agent.interpolate_inputs(input_data)
output = crew.kickoff(inputs=input_data)
if crew.usage_metrics:
for key in total_usage_metrics:
total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
output = crew.kickoff()
results.append(output)
self.usage_metrics = total_usage_metrics
return results
async def kickoff_async(
@@ -360,26 +366,37 @@ class Crew(BaseModel):
return await asyncio.to_thread(self.kickoff, inputs)
async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[Any]:
async def run_crew(input_data):
crew = self.copy()
crew_copies = [self.copy() for _ in inputs]
for task in crew.tasks:
task.interpolate_inputs(input_data)
for agent in crew.agents:
agent.interpolate_inputs(input_data)
async def run_crew(crew, input_data):
return await crew.kickoff_async(inputs=input_data)
return await crew.kickoff_async()
tasks = [asyncio.create_task(run_crew(input_data)) for input_data in inputs]
tasks = [
asyncio.create_task(run_crew(crew_copies[i], inputs[i]))
for i in range(len(inputs))
]
results = await asyncio.gather(*tasks)
total_usage_metrics = {
"total_tokens": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"successful_requests": 0,
}
for crew in crew_copies:
if crew.usage_metrics:
for key in total_usage_metrics:
total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
self.usage_metrics = total_usage_metrics
return results
def _run_sequential_process(self) -> str:
"""Executes tasks sequentially and returns the final output."""
task_output = ""
token_usage = []
for task in self.tasks:
if task.agent.allow_delegation: # type: ignore # Item "None" of "Agent | None" has no attribute "allow_delegation"
agents_for_delegation = [
@@ -405,18 +422,16 @@ class Crew(BaseModel):
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== [{role}] Task output: {task_output}\n\n")
token_summ = task.agent._token_process.get_summary()
token_usage.append(token_summ)
if self.output_log_file:
self._file_handler.log(agent=role, task=task_output, status="completed")
token_usage_formatted = self.aggregate_token_usage(token_usage)
self._finish_execution(task_output)
token_usage = self.calculate_usage_metrics()
# type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
return self._format_output(task_output, token_usage_formatted)
return self._format_output(task_output, token_usage)
def _run_hierarchical_process(self) -> Union[str, Dict[str, Any]]:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
@@ -437,9 +452,10 @@ class Crew(BaseModel):
llm=self.manager_llm,
verbose=self.verbose,
)
self.manager_agent = manager
task_output = ""
token_usage = []
for task in self.tasks:
self._logger.log("debug", f"Working Agent: {manager.role}")
self._logger.log("info", f"Starting Task: {task.description}")
@@ -454,9 +470,6 @@ class Crew(BaseModel):
)
self._logger.log("debug", f"[{manager.role}] Task output: {task_output}")
if hasattr(task, "agent._token_process"):
token_summ = task.agent._token_process.get_summary()
token_usage.append(token_summ)
if self.output_log_file:
self._file_handler.log(
agent=manager.role, task=task_output, status="completed"
@@ -465,13 +478,9 @@ class Crew(BaseModel):
self._finish_execution(task_output)
# type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
manager_token_usage = manager._token_process.get_summary()
token_usage.append(manager_token_usage)
token_usage_formatted = self.aggregate_token_usage(token_usage)
token_usage = self.calculate_usage_metrics()
return self._format_output(
task_output, token_usage_formatted
), manager_token_usage
return self._format_output(task_output, token_usage), token_usage
def copy(self):
"""Create a deep copy of the Crew."""
@@ -486,12 +495,13 @@ class Crew(BaseModel):
"_short_term_memory",
"_long_term_memory",
"_entity_memory",
"_telemetry",
"agents",
"tasks",
}
cloned_agents = [agent.copy() for agent in self.agents]
cloned_tasks = [task.copy() for task in self.tasks]
cloned_tasks = [task.copy(cloned_agents) for task in self.tasks]
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
@@ -529,6 +539,7 @@ class Crew(BaseModel):
Formats the output of the crew execution.
If full_output is True, then returned data type will be a dictionary else returned outputs are string
"""
if self.full_output:
return { # type: ignore # Incompatible return value type (got "dict[str, Sequence[str | TaskOutput | None]]", expected "str")
"final_output": output,
@@ -547,11 +558,27 @@ class Crew(BaseModel):
)
self._telemetry.end_crew(self, output)
def calculate_usage_metrics(self) -> Dict[str, int]:
"""Calculates and returns the usage metrics."""
total_usage_metrics = {
"total_tokens": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"successful_requests": 0,
}
for agent in self.agents:
if hasattr(agent, "_token_process"):
token_sum = agent._token_process.get_summary()
for key in total_usage_metrics:
total_usage_metrics[key] += token_sum.get(key, 0)
if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
token_sum = self.manager_agent._token_process.get_summary()
for key in total_usage_metrics:
total_usage_metrics[key] += token_sum.get(key, 0)
return total_usage_metrics
def __repr__(self):
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"
def aggregate_token_usage(self, token_usage_list: List[Dict[str, Any]]):
return {
key: sum([m[key] for m in token_usage_list if m is not None])
for key in token_usage_list[0]
}

View File

@@ -2,8 +2,8 @@ import os
import re
import threading
import uuid
from copy import deepcopy
from typing import Any, Dict, List, Optional, Type
from copy import copy
from typing import Any, Dict, List, Optional, Type, Union
from langchain_openai import ChatOpenAI
from opentelemetry.trace import Span
@@ -13,7 +13,9 @@ from pydantic_core import PydanticCustomError
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import I18N, ConverterError, Printer
from crewai.utilities.converter import ConverterError
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
@@ -216,7 +218,7 @@ class Task(BaseModel):
)
return result
def _execute(self, agent, task, context, tools):
def _execute(self, agent: "BaseAgent", task, context, tools):
result = agent.execute_task(
task=task,
context=context,
@@ -274,7 +276,7 @@ class Task(BaseModel):
"""Increment the delegations counter."""
self.delegations += 1
def copy(self):
def copy(self, agents: Optional[List["BaseAgent"]] = None) -> "Task":
"""Create a deep copy of the Task."""
exclude = {
"id",
@@ -289,8 +291,12 @@ class Task(BaseModel):
cloned_context = (
[task.copy() for task in self.context] if self.context else None
)
cloned_agent = self.agent.copy() if self.agent else None
cloned_tools = deepcopy(self.tools) if self.tools else []
def get_agent_by_role(role: str) -> Union["BaseAgent", None]:
return next((agent for agent in agents if agent.role == role), None)
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
cloned_tools = copy(self.tools) if self.tools else []
copied_task = Task(
**copied_data,
@@ -298,6 +304,7 @@ class Task(BaseModel):
agent=cloned_agent,
tools=cloned_tools,
)
return copied_task
def _export_output(self, result: str) -> Any:
@@ -329,7 +336,7 @@ class Task(BaseModel):
except Exception:
pass
# type: ignore # Item "None" of "Agent | None" has no attribute "function_calling_llm"
# type: ignore # Item "None" of "BaseAgent | None" has no attribute "function_calling_llm"
llm = getattr(self.agent, "function_calling_llm", None) or self.agent.llm
if not self._is_gpt(llm):
# type: ignore # Argument "model" to "PydanticSchemaParser" has incompatible type "type[BaseModel] | None"; expected "type[BaseModel]"
@@ -355,7 +362,9 @@ class Task(BaseModel):
if self.output_file:
content = (
# type: ignore # "str" has no attribute "json"
exported_result if not self.output_pydantic else exported_result.json()
exported_result
if not self.output_pydantic
else exported_result.model_dump_json()
)
self._save_file(content)

View File

@@ -8,18 +8,18 @@ from crewai.agents.agent_builder.utilities.base_token_process import TokenProces
class TokenCalcHandler(BaseCallbackHandler):
model: str = ""
model_name: str = ""
token_cost_process: TokenProcess
def __init__(self, model, token_cost_process):
self.model = model
def __init__(self, model_name, token_cost_process):
self.model_name = model_name
self.token_cost_process = token_cost_process
def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> None:
try:
encoding = tiktoken.encoding_for_model(self.model)
encoding = tiktoken.encoding_for_model(self.model_name)
except KeyError:
encoding = tiktoken.get_encoding("cl100k_base")