From 14e704698052e6976a9f97d2dcd23181f413d151 Mon Sep 17 00:00:00 2001 From: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> Date: Wed, 12 Jun 2024 10:18:52 -0700 Subject: [PATCH] added usage_metrics to full output (#756) * added extra parameter for kickoff to return token usage count after result * added output_token_usage to class and in full_output * logger duplicated * added more types * added usage_metrics to full output instead * added more to the description on full_output * possible mispacing --- src/crewai/crew.py | 76 +++++++++++++++++++--------------- src/crewai/task.py | 25 ++++++----- src/crewai/utilities/logger.py | 1 - tests/crew_test.py | 6 +++ 4 files changed, 62 insertions(+), 46 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 974fdb9b3..8c91202db 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -48,7 +48,7 @@ class Crew(BaseModel): max_rpm: Maximum number of requests per minute for the crew execution to be respected. prompt_file: Path to the prompt json file to be used for the crew. id: A unique identifier for the crew instance. - full_output: Whether the crew should return the full output with all tasks outputs or just the final output. + full_output: Whether the crew should return the full output with all tasks outputs and token usage metrics or just the final output. task_callback: Callback to be executed after each task for every agents execution. step_callback: Callback to be executed after each step for every agents execution. share_crew: Whether you want to share the complete crew information and execution with crewAI to make the library better, and allow us to train models. @@ -59,8 +59,7 @@ class Crew(BaseModel): _rpm_controller: RPMController = PrivateAttr() _logger: Logger = PrivateAttr() _file_handler: FileHandler = PrivateAttr() - _cache_handler: InstanceOf[CacheHandler] = PrivateAttr( - default=CacheHandler()) + _cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler()) _short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr() _long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr() _entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr() @@ -85,7 +84,7 @@ class Crew(BaseModel): ) full_output: Optional[bool] = Field( default=False, - description="Whether the crew should return the full output with all tasks outputs or just the final output.", + description="Whether the crew should return the full output with all tasks outputs and token usage metrics or just the final output.", ) manager_llm: Optional[Any] = Field( description="Language model that will run the agent.", default=None @@ -155,8 +154,7 @@ class Crew(BaseModel): self._logger = Logger(self.verbose) if self.output_log_file: self._file_handler = FileHandler(self.output_log_file) - self._rpm_controller = RPMController( - max_rpm=self.max_rpm, logger=self._logger) + self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger) self._telemetry = Telemetry() self._telemetry.set_tracer() self._telemetry.crew_creation(self) @@ -167,7 +165,9 @@ class Crew(BaseModel): """Set private attributes.""" if self.memory: self._long_term_memory = LongTermMemory() - self._short_term_memory = ShortTermMemory(crew=self, embedder_config=self.embedder) + self._short_term_memory = ShortTermMemory( + crew=self, embedder_config=self.embedder + ) self._entity_memory = EntityMemory(crew=self, embedder_config=self.embedder) return self @@ -242,7 +242,10 @@ class Crew(BaseModel): del task_config["agent"] 
return Task(**task_config, agent=task_agent) - def kickoff(self, inputs: Optional[Dict[str, Any]] = {}) -> str: + def kickoff( + self, + inputs: Optional[Dict[str, Any]] = {}, + ) -> str: """Starts the crew to work on its assigned tasks.""" self._execution_span = self._telemetry.crew_execution_span(self) # type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]" @@ -271,7 +274,6 @@ class Crew(BaseModel): result, manager_metrics = self._run_hierarchical_process() # type: ignore # Cannot determine type of "manager_metrics" metrics.append(manager_metrics) - else: raise NotImplementedError( f"The process '{self.process}' is not implemented yet." @@ -303,10 +305,12 @@ class Crew(BaseModel): return results - async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = {}) -> Union[str, Dict]: + async def kickoff_async( + self, inputs: Optional[Dict[str, Any]] = {} + ) -> Union[str, Dict]: """Asynchronous kickoff method to start the crew execution.""" return await asyncio.to_thread(self.kickoff, inputs) - + async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[Any]: async def run_crew(input_data): crew = self.copy() @@ -318,9 +322,8 @@ class Crew(BaseModel): return await crew.kickoff_async() - tasks = [asyncio.create_task(run_crew(input_data)) - for input_data in inputs] - + tasks = [asyncio.create_task(run_crew(input_data)) for input_data in inputs] + results = await asyncio.gather(*tasks) return results @@ -341,8 +344,7 @@ class Crew(BaseModel): task.tools += AgentTools(agents=agents_for_delegation).tools() role = task.agent.role if task.agent is not None else "None" - self._logger.log("debug", f"== Working Agent: { - role}", color="bold_purple") + self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple") self._logger.log( "info", f"== Starting Task: {task.description}", color="bold_purple" ) @@ -353,20 +355,21 @@ class Crew(BaseModel): ) output = task.execute(context=task_output) + if not task.async_execution: task_output = output role = task.agent.role if task.agent is not None else "None" - self._logger.log("debug", f"== [{role}] Task output: { - task_output}\n\n") + self._logger.log("debug", f"== [{role}] Task output: {task_output}\n\n") if self.output_log_file: - self._file_handler.log( - agent=role, task=task_output, status="completed") + self._file_handler.log(agent=role, task=task_output, status="completed") self._finish_execution(task_output) - return self._format_output(task_output) - + # type: ignore # Item "None" of "Agent | None" has no attribute "_token_process" + token_usage = task.agent._token_process.get_summary() + # type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str") + return self._format_output(task_output, token_usage) def _run_hierarchical_process(self) -> str: """Creates and assigns a manager agent to make sure the crew completes the tasks.""" @@ -382,8 +385,7 @@ class Crew(BaseModel): manager = Agent( role=i18n.retrieve("hierarchical_manager_agent", "role"), goal=i18n.retrieve("hierarchical_manager_agent", "goal"), - backstory=i18n.retrieve( - "hierarchical_manager_agent", "backstory"), + backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"), tools=AgentTools(agents=self.agents).tools(), llm=self.manager_llm, verbose=True, @@ -403,8 +405,7 @@ class Crew(BaseModel): agent=manager, context=task_output, tools=manager.tools ) - self._logger.log( - "debug", f"[{manager.role}] Task output: {task_output}") + 
self._logger.log("debug", f"[{manager.role}] Task output: {task_output}") if self.output_log_file: self._file_handler.log( @@ -413,11 +414,14 @@ class Crew(BaseModel): self._finish_execution(task_output) # type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str") - return self._format_output(task_output), manager._token_process.get_summary() + manager_token_usage = manager._token_process.get_summary() + return self._format_output( + task_output, manager_token_usage + ), manager_token_usage def copy(self): """Create a deep copy of the Crew.""" - + exclude = { "id", "_rpm_controller", @@ -427,7 +431,7 @@ class Crew(BaseModel): "_cache_handler", "_short_term_memory", "_long_term_memory", - "_entity_memory" + "_entity_memory", "agents", "tasks", } @@ -445,7 +449,6 @@ class Crew(BaseModel): return copied_crew - def _set_tasks_callbacks(self) -> None: """Sets callback for every task suing task_callback""" for task in self.tasks: @@ -454,18 +457,23 @@ class Crew(BaseModel): def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None: """Interpolates the inputs in the tasks and agents.""" - [task.interpolate_inputs( - # type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None) - inputs) for task in self.tasks] + [ + task.interpolate_inputs( + # type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None) + inputs + ) + for task in self.tasks + ] # type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None) [agent.interpolate_inputs(inputs) for agent in self.agents] - def _format_output(self, output: str) -> str: + def _format_output(self, output: str, token_usage: Optional[Dict[str, Any]]) -> str: """Formats the output of the crew execution.""" if self.full_output: return { # type: ignore # Incompatible return value type (got "dict[str, Sequence[str | TaskOutput | None]]", expected "str") "final_output": output, "tasks_outputs": [task.output for task in self.tasks if task], + "usage_metrics": token_usage, } else: return output diff --git a/src/crewai/task.py b/src/crewai/task.py index 2c618fe8d..359f5871f 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -236,8 +236,7 @@ class Task(BaseModel): if inputs: self.description = self._original_description.format(**inputs) - self.expected_output = self._original_expected_output.format( - **inputs) + self.expected_output = self._original_expected_output.format(**inputs) def increment_tools_errors(self) -> None: """Increment the tools errors counter.""" @@ -259,11 +258,18 @@ class Task(BaseModel): copied_data = self.model_dump(exclude=exclude) copied_data = {k: v for k, v in copied_data.items() if v is not None} - cloned_context = [task.copy() for task in self.context] if self.context else None + cloned_context = ( + [task.copy() for task in self.context] if self.context else None + ) cloned_agent = self.agent.copy() if self.agent else None cloned_tools = deepcopy(self.tools) if self.tools else None - copied_task = Task(**copied_data, context=cloned_context, agent=cloned_agent, tools=cloned_tools) + copied_task = Task( + **copied_data, + context=cloned_context, + agent=cloned_agent, + tools=cloned_tools, + ) return copied_task def _export_output(self, result: str) -> Any: @@ -287,8 +293,7 @@ class Task(BaseModel): if match: try: # type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json" - exported_result = model.model_validate_json( - match.group(0)) + 
exported_result = model.model_validate_json(match.group(0)) if self.output_json: # type: ignore # "str" has no attribute "model_dump" return exported_result.model_dump() @@ -302,8 +307,7 @@ class Task(BaseModel): if not self._is_gpt(llm): # type: ignore # Argument "model" to "PydanticSchemaParser" has incompatible type "type[BaseModel] | None"; expected "type[BaseModel]" model_schema = PydanticSchemaParser(model=model).get_schema() - instructions = f"{ - instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}" + instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}" converter = Converter( llm=llm, text=result, model=model, instructions=instructions @@ -316,8 +320,7 @@ class Task(BaseModel): if isinstance(exported_result, ConverterError): Printer().print( - content=f"{ - exported_result.message} Using raw output instead.", + content=f"{exported_result.message} Using raw output instead.", color="red", ) exported_result = result @@ -342,7 +345,7 @@ class Task(BaseModel): os.makedirs(directory) # type: ignore # Argument 1 to "open" has incompatible type "str | None"; expected "int | str | bytes | PathLike[str] | PathLike[bytes]" - with open(self.output_file, "w", encoding='utf-8') as file: + with open(self.output_file, "w", encoding="utf-8") as file: file.write(result) return None diff --git a/src/crewai/utilities/logger.py b/src/crewai/utilities/logger.py index 55eeb02cc..0e19ee5e1 100644 --- a/src/crewai/utilities/logger.py +++ b/src/crewai/utilities/logger.py @@ -1,6 +1,5 @@ from datetime import datetime from crewai.utilities.printer import Printer -from datetime import datetime class Logger: diff --git a/tests/crew_test.py b/tests/crew_test.py index 3c4d087e3..2be57821f 100644 --- a/tests/crew_test.py +++ b/tests/crew_test.py @@ -384,6 +384,12 @@ def test_crew_full_ouput(): assert result == { "final_output": "Hello! It is a delight to receive your message. I trust this response finds you in good spirits. It's indeed a pleasure to connect with you too.", "tasks_outputs": [task1.output, task2.output], + "usage_metrics": { + "completion_tokens": 109, + "prompt_tokens": 330, + "successful_requests": 2, + "total_tokens": 439, + }, }
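
A minimal usage sketch of the behavior this patch adds: with full_output=True, kickoff() returns a dict whose "usage_metrics" entry carries the token summary collected from the executing agent (the manager agent in the hierarchical process). The agent and task definitions below are illustrative placeholders, not taken from the repository; only the Crew/kickoff call shape and the result keys ("final_output", "tasks_outputs", "usage_metrics") come from the diff and test above.

from crewai import Agent, Crew, Task

# Illustrative agent and task; role, goal, backstory, and description are placeholders.
researcher = Agent(
    role="Researcher",
    goal="Summarize a topic in one paragraph",
    backstory="An analyst who writes concise summaries.",
)
summary_task = Task(
    description="Write a one-paragraph summary of {topic}.",
    expected_output="A single paragraph.",
    agent=researcher,
)

# full_output=True switches the return type of kickoff() from a plain string to a dict.
crew = Crew(agents=[researcher], tasks=[summary_task], full_output=True)
result = crew.kickoff(inputs={"topic": "token usage tracking"})

print(result["final_output"])   # final task result
print(result["tasks_outputs"])  # per-task TaskOutput objects
# usage_metrics mirrors the agent's _token_process summary, e.g. keys like
# total_tokens, prompt_tokens, completion_tokens, successful_requests (see test above).
print(result["usage_metrics"])

Note that in the sequential path the summary is read from the last task's agent, while the hierarchical path reports the manager agent's usage; with full_output left at its default of False, kickoff() still returns only the final output string.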