added usage_metrics to full output (#756)

* added an extra parameter for kickoff to return the token usage count alongside the result

* added output_token_usage to the class and to full_output

* removed a duplicated logger import

* added more types

* added usage_metrics to the full output instead

* expanded the description of full_output

* fixed possible misspacing

Lorenze Jay
2024-06-12 10:18:52 -07:00
committed by GitHub
parent a1a48888c3
commit d3b6640b4a
4 changed files with 62 additions and 46 deletions
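
In practice, the new behavior looks like this. A minimal usage sketch, assuming hypothetical agent and task definitions; only the full_output flag and the shape of the return value come from this commit:

from crewai import Agent, Crew, Task

# Hypothetical agent/task for illustration; not part of this commit.
greeter = Agent(role="Greeter", goal="Greet people warmly", backstory="A friendly assistant.")
greeting = Task(description="Say hello.", expected_output="A short greeting.", agent=greeter)

crew = Crew(agents=[greeter], tasks=[greeting], full_output=True)
result = crew.kickoff()

# With full_output=True, kickoff() now returns a dict:
#   result["final_output"]  - the final task's output
#   result["tasks_outputs"] - one entry per task
#   result["usage_metrics"] - the token usage summary added by this commit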

View File

@@ -48,7 +48,7 @@ class Crew(BaseModel):
max_rpm: Maximum number of requests per minute for the crew execution to be respected.
prompt_file: Path to the prompt json file to be used for the crew.
id: A unique identifier for the crew instance.
-full_output: Whether the crew should return the full output with all tasks outputs or just the final output.
+full_output: Whether the crew should return the full output with all tasks outputs and token usage metrics or just the final output.
task_callback: Callback to be executed after each task for every agents execution.
step_callback: Callback to be executed after each step for every agents execution.
share_crew: Whether you want to share the complete crew information and execution with crewAI to make the library better, and allow us to train models.
@@ -59,8 +59,7 @@ class Crew(BaseModel):
_rpm_controller: RPMController = PrivateAttr()
_logger: Logger = PrivateAttr()
_file_handler: FileHandler = PrivateAttr()
-_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(
-    default=CacheHandler())
+_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
@@ -85,7 +84,7 @@ class Crew(BaseModel):
)
full_output: Optional[bool] = Field(
default=False,
description="Whether the crew should return the full output with all tasks outputs or just the final output.",
description="Whether the crew should return the full output with all tasks outputs and token usage metrics or just the final output.",
)
manager_llm: Optional[Any] = Field(
description="Language model that will run the agent.", default=None
@@ -155,8 +154,7 @@ class Crew(BaseModel):
self._logger = Logger(self.verbose)
if self.output_log_file:
self._file_handler = FileHandler(self.output_log_file)
-self._rpm_controller = RPMController(
-    max_rpm=self.max_rpm, logger=self._logger)
+self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
self._telemetry = Telemetry()
self._telemetry.set_tracer()
self._telemetry.crew_creation(self)
@@ -167,7 +165,9 @@ class Crew(BaseModel):
"""Set private attributes."""
if self.memory:
self._long_term_memory = LongTermMemory()
-self._short_term_memory = ShortTermMemory(crew=self, embedder_config=self.embedder)
+self._short_term_memory = ShortTermMemory(
+    crew=self, embedder_config=self.embedder
+)
self._entity_memory = EntityMemory(crew=self, embedder_config=self.embedder)
return self
@@ -242,7 +242,10 @@ class Crew(BaseModel):
del task_config["agent"]
return Task(**task_config, agent=task_agent)
-def kickoff(self, inputs: Optional[Dict[str, Any]] = {}) -> str:
+def kickoff(
+    self,
+    inputs: Optional[Dict[str, Any]] = {},
+) -> str:
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self)
# type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
@@ -271,7 +274,6 @@ class Crew(BaseModel):
result, manager_metrics = self._run_hierarchical_process()
# type: ignore # Cannot determine type of "manager_metrics"
metrics.append(manager_metrics)
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
@@ -303,10 +305,12 @@ class Crew(BaseModel):
return results
-async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = {}) -> Union[str, Dict]:
+async def kickoff_async(
+    self, inputs: Optional[Dict[str, Any]] = {}
+) -> Union[str, Dict]:
"""Asynchronous kickoff method to start the crew execution."""
return await asyncio.to_thread(self.kickoff, inputs)
async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[Any]:
async def run_crew(input_data):
crew = self.copy()
@@ -318,9 +322,8 @@ class Crew(BaseModel):
return await crew.kickoff_async()
-tasks = [asyncio.create_task(run_crew(input_data))
-         for input_data in inputs]
+tasks = [asyncio.create_task(run_crew(input_data)) for input_data in inputs]
results = await asyncio.gather(*tasks)
return results
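
For context, the comprehension above fans each input dict out to its own copy of the crew and gathers the results concurrently. A brief usage sketch, with crew construction assumed as in the earlier example and placeholder input keys:

import asyncio

# Each dict parameterizes one copied crew run (see run_crew above).
results = asyncio.run(
    crew.kickoff_for_each_async(inputs=[{"topic": "pricing"}, {"topic": "tokens"}])
)
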
@@ -341,8 +344,7 @@ class Crew(BaseModel):
task.tools += AgentTools(agents=agents_for_delegation).tools()
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== Working Agent: {
role}", color="bold_purple")
self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple")
self._logger.log(
"info", f"== Starting Task: {task.description}", color="bold_purple"
)
@@ -353,20 +355,21 @@ class Crew(BaseModel):
)
output = task.execute(context=task_output)
if not task.async_execution:
task_output = output
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== [{role}] Task output: {
task_output}\n\n")
self._logger.log("debug", f"== [{role}] Task output: {task_output}\n\n")
if self.output_log_file:
-self._file_handler.log(
-    agent=role, task=task_output, status="completed")
+self._file_handler.log(agent=role, task=task_output, status="completed")
self._finish_execution(task_output)
-return self._format_output(task_output)
+# type: ignore # Item "None" of "Agent | None" has no attribute "_token_process"
+token_usage = task.agent._token_process.get_summary()
+# type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
+return self._format_output(task_output, token_usage)
def _run_hierarchical_process(self) -> str:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
@@ -382,8 +385,7 @@ class Crew(BaseModel):
manager = Agent(
role=i18n.retrieve("hierarchical_manager_agent", "role"),
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
-backstory=i18n.retrieve(
-    "hierarchical_manager_agent", "backstory"),
+backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
tools=AgentTools(agents=self.agents).tools(),
llm=self.manager_llm,
verbose=True,
@@ -403,8 +405,7 @@ class Crew(BaseModel):
agent=manager, context=task_output, tools=manager.tools
)
-self._logger.log(
-    "debug", f"[{manager.role}] Task output: {task_output}")
+self._logger.log("debug", f"[{manager.role}] Task output: {task_output}")
if self.output_log_file:
self._file_handler.log(
@@ -413,11 +414,14 @@ class Crew(BaseModel):
self._finish_execution(task_output)
# type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
-return self._format_output(task_output), manager._token_process.get_summary()
+manager_token_usage = manager._token_process.get_summary()
+return self._format_output(
+    task_output, manager_token_usage
+), manager_token_usage
def copy(self):
"""Create a deep copy of the Crew."""
exclude = {
"id",
"_rpm_controller",
@@ -427,7 +431,7 @@ class Crew(BaseModel):
"_cache_handler",
"_short_term_memory",
"_long_term_memory",
"_entity_memory"
"_entity_memory",
"agents",
"tasks",
}
@@ -445,7 +449,6 @@ class Crew(BaseModel):
return copied_crew
def _set_tasks_callbacks(self) -> None:
"""Sets callback for every task suing task_callback"""
for task in self.tasks:
@@ -454,18 +457,23 @@ class Crew(BaseModel):
def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
"""Interpolates the inputs in the tasks and agents."""
-[task.interpolate_inputs(
-    # type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None)
-    inputs) for task in self.tasks]
+[
+    task.interpolate_inputs(
+        # type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None)
+        inputs
+    )
+    for task in self.tasks
+]
# type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None)
[agent.interpolate_inputs(inputs) for agent in self.agents]
-def _format_output(self, output: str) -> str:
+def _format_output(self, output: str, token_usage: Optional[Dict[str, Any]]) -> str:
"""Formats the output of the crew execution."""
if self.full_output:
return { # type: ignore # Incompatible return value type (got "dict[str, Sequence[str | TaskOutput | None]]", expected "str")
"final_output": output,
"tasks_outputs": [task.output for task in self.tasks if task],
"usage_metrics": token_usage,
}
else:
return output
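
The _token_process object consulted above is not part of this diff, so its implementation is not shown. As a rough sketch only, an accumulator of the following shape would produce the summary dict asserted in the test file below; the class name, the record() method, and the internals are assumptions, with only get_summary()'s output keys taken from this commit:

from typing import Dict

class TokenAccumulator:
    # Hypothetical stand-in for the token process referenced above.
    def __init__(self) -> None:
        self.prompt_tokens = 0
        self.completion_tokens = 0
        self.successful_requests = 0

    def record(self, prompt_tokens: int, completion_tokens: int) -> None:
        # Assumed hook: called once per successful LLM request.
        self.prompt_tokens += prompt_tokens
        self.completion_tokens += completion_tokens
        self.successful_requests += 1

    def get_summary(self) -> Dict[str, int]:
        return {
            "completion_tokens": self.completion_tokens,
            "prompt_tokens": self.prompt_tokens,
            "successful_requests": self.successful_requests,
            "total_tokens": self.prompt_tokens + self.completion_tokens,
        }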

View File

@@ -236,8 +236,7 @@ class Task(BaseModel):
if inputs:
self.description = self._original_description.format(**inputs)
-self.expected_output = self._original_expected_output.format(
-    **inputs)
+self.expected_output = self._original_expected_output.format(**inputs)
def increment_tools_errors(self) -> None:
"""Increment the tools errors counter."""
@@ -259,11 +258,18 @@ class Task(BaseModel):
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
-cloned_context = [task.copy() for task in self.context] if self.context else None
+cloned_context = (
+    [task.copy() for task in self.context] if self.context else None
+)
cloned_agent = self.agent.copy() if self.agent else None
cloned_tools = deepcopy(self.tools) if self.tools else None
-copied_task = Task(**copied_data, context=cloned_context, agent=cloned_agent, tools=cloned_tools)
+copied_task = Task(
+    **copied_data,
+    context=cloned_context,
+    agent=cloned_agent,
+    tools=cloned_tools,
+)
return copied_task
def _export_output(self, result: str) -> Any:
@@ -287,8 +293,7 @@ class Task(BaseModel):
if match:
try:
# type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json"
-exported_result = model.model_validate_json(
-    match.group(0))
+exported_result = model.model_validate_json(match.group(0))
if self.output_json:
# type: ignore # "str" has no attribute "model_dump"
return exported_result.model_dump()
@@ -302,8 +307,7 @@ class Task(BaseModel):
if not self._is_gpt(llm):
# type: ignore # Argument "model" to "PydanticSchemaParser" has incompatible type "type[BaseModel] | None"; expected "type[BaseModel]"
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions = f"{
instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
converter = Converter(
llm=llm, text=result, model=model, instructions=instructions
@@ -316,8 +320,7 @@ class Task(BaseModel):
if isinstance(exported_result, ConverterError):
Printer().print(
content=f"{
exported_result.message} Using raw output instead.",
content=f"{exported_result.message} Using raw output instead.",
color="red",
)
exported_result = result
@@ -342,7 +345,7 @@ class Task(BaseModel):
os.makedirs(directory)
# type: ignore # Argument 1 to "open" has incompatible type "str | None"; expected "int | str | bytes | PathLike[str] | PathLike[bytes]"
with open(self.output_file, "w", encoding='utf-8') as file:
with open(self.output_file, "w", encoding="utf-8") as file:
file.write(result)
return None
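
The interpolation reformatted at the top of this file is plain str.format substitution; a hypothetical example of the observable behavior, with task fields invented for illustration:

task = Task(
    description="Research {topic} and summarize the findings.",
    expected_output="Three bullet points about {topic}.",
    agent=researcher,  # assumed to exist
)
task.interpolate_inputs({"topic": "token usage tracking"})
# task.description is now "Research token usage tracking and summarize the findings."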

View File

@@ -1,6 +1,5 @@
from datetime import datetime
from crewai.utilities.printer import Printer
-from datetime import datetime
class Logger:

View File

@@ -384,6 +384,12 @@ def test_crew_full_ouput():
assert result == {
"final_output": "Hello! It is a delight to receive your message. I trust this response finds you in good spirits. It's indeed a pleasure to connect with you too.",
"tasks_outputs": [task1.output, task2.output],
"usage_metrics": {
"completion_tokens": 109,
"prompt_tokens": 330,
"successful_requests": 2,
"total_tokens": 439,
},
}
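
As a sanity check on the asserted numbers: 330 prompt tokens plus 109 completion tokens equals the 439 total tokens reported, accumulated across the 2 successful requests made by the two tasks.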