From ecc3d913da72f6e5cbce400fd43051d25dc3fafb Mon Sep 17 00:00:00 2001 From: Brandon Hancock Date: Tue, 9 Jul 2024 10:27:39 -0400 Subject: [PATCH] Implement major fixes from yesterdays group conversation. Now working on tests. --- src/crewai/crew.py | 54 ++++++++++++++--------- src/crewai/crews/crew_output.py | 73 ++++++++++++++++--------------- src/crewai/task.py | 72 ++++++++++++++---------------- src/crewai/tasks/__init__.py | 4 ++ src/crewai/tasks/output_format.py | 9 ++++ src/crewai/tasks/task_output.py | 52 +++++++++------------- src/crewai/utilities/formatter.py | 10 ++++- 7 files changed, 144 insertions(+), 130 deletions(-) create mode 100644 src/crewai/tasks/output_format.py diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 0fd1170f4..5a991aed6 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -33,7 +33,10 @@ from crewai.tools.agent_tools import AgentTools from crewai.utilities import I18N, FileHandler, Logger, RPMController from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE from crewai.utilities.evaluators.task_evaluator import TaskEvaluator -from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs +from crewai.utilities.formatter import ( + aggregate_raw_outputs_from_task_outputs, + aggregate_raw_outputs_from_tasks, +) from crewai.utilities.training_handler import CrewTrainingHandler try: @@ -319,7 +322,6 @@ class Crew(BaseModel): self._execution_span = self._telemetry.crew_execution_span(self, inputs) if inputs is not None: self._interpolate_inputs(inputs) - self._interpolate_inputs(inputs) self._set_tasks_callbacks() i18n = I18N(prompt_file=self.prompt_file) @@ -385,9 +387,7 @@ class Crew(BaseModel): self.usage_metrics = total_usage_metrics return results - async def kickoff_async( - self, inputs: Optional[CrewOutput] = {} - ) -> Union[str, Dict]: + async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = {}) -> CrewOutput: """Asynchronous kickoff method to start the crew execution.""" return await asyncio.to_thread(self.kickoff, inputs) @@ -459,9 +459,13 @@ class Crew(BaseModel): self._file_handler.log( agent=role, task=task.description, status="started" ) - # TODO: IF USER OVERRIDE THE CONTEXT, PASS THAT + if task.async_execution: - context = aggregate_raw_outputs_from_task_outputs(task_outputs) + context = ( + aggregate_raw_outputs_from_tasks(task.context) + if task.context + else aggregate_raw_outputs_from_task_outputs(task_outputs) + ) future = task.execute_async( agent=task.agent, context=context, tools=task.tools ) @@ -479,7 +483,11 @@ class Crew(BaseModel): # Clear the futures list after processing all async results futures.clear() - context = aggregate_raw_outputs_from_task_outputs(task_outputs) + context = ( + aggregate_raw_outputs_from_tasks(task.context) + if task.context + else aggregate_raw_outputs_from_task_outputs(task_outputs) + ) task_output = task.execute_sync( agent=task.agent, context=context, tools=task.tools ) @@ -499,7 +507,22 @@ class Crew(BaseModel): token_usage = self.calculate_usage_metrics() - return self._format_output(task_outputs, token_usage) + # Important: There should only be one task output in the list + # If there are more or 0, something went wrong. + if len(task_outputs) != 1: + raise ValueError( + "Something went wrong. Kickoff should return only one task output." 
+ ) + + final_task_output = task_outputs[0] + + return CrewOutput( + _raw=final_task_output.raw, + _pydantic=final_task_output.pydantic, + _json=final_task_output.json, + tasks_output=[task.output for task in self.tasks if task.output], + token_usage=token_usage, + ) def _process_task_result(self, task: Task, output: TaskOutput) -> None: role = task.agent.role if task.agent is not None else "None" @@ -530,6 +553,7 @@ class Crew(BaseModel): task_outputs: List[TaskOutput] = [] futures: List[Tuple[Task, Future[TaskOutput]]] = [] + # TODO: IF USER OVERRIDE THE CONTEXT, PASS THAT for task in self.tasks: self._logger.log("debug", f"Working Agent: {manager.role}") self._logger.log("info", f"Starting Task: {task.description}") @@ -634,18 +658,6 @@ class Crew(BaseModel): for agent in self.agents: agent.interpolate_inputs(inputs) - def _format_output( - self, output: List[TaskOutput], token_usage: Optional[Dict[str, Any]] - ) -> CrewOutput: - """ - Formats the output of the crew execution. - """ - return CrewOutput( - output=output, - tasks_output=[task.output for task in self.tasks if task], - token_usage=token_usage, - ) - def _finish_execution(self, final_string_output: str) -> None: if self.max_rpm: self._rpm_controller.stop_rpm_counter() diff --git a/src/crewai/crews/crew_output.py b/src/crewai/crews/crew_output.py index 2a997af60..c44a7501f 100644 --- a/src/crewai/crews/crew_output.py +++ b/src/crewai/crews/crew_output.py @@ -1,14 +1,17 @@ -from typing import Any, Dict, List +from typing import Any, Dict, Optional -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, PrivateAttr +from crewai.tasks.output_format import OutputFormat from crewai.tasks.task_output import TaskOutput -from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs class CrewOutput(BaseModel): - output: List[TaskOutput] = Field(description="Result of the final task") - # TODO: TYPED OUTPUT + """Class that represents the result of a crew.""" + + _raw: str = PrivateAttr(default="") + _pydantic: Optional[BaseModel] = PrivateAttr(default=None) + _json: Optional[Dict[str, Any]] = PrivateAttr(default=None) tasks_output: list[TaskOutput] = Field( description="Output of each task", default=[] ) @@ -16,41 +19,39 @@ class CrewOutput(BaseModel): description="Processed token summary", default={} ) - # TODO: Support 1 output by default - # TODO: GIVE THEM THE OPTION TO ACCESS - # TODO: RESULT get's back a string + @property + def raw(self) -> str: + return self._raw - # TODO: Ask @joao what is the desired behavior here - def result( - self, - ) -> List[str | BaseModel | Dict[str, Any]]: - """Return the result of the task based on the available output.""" - if len(self.output) == 1: - return self.output[0].result() + @property + def pydantic(self) -> Optional[BaseModel]: + # Check if the final task output included a pydantic model + if self.tasks_output[-1].output_format != OutputFormat.PYDANTIC: + raise ValueError( + "No pydantic model found in the final task. Please make sure to set the output_pydantic property in the final task in your crew." + ) - results = [output.result() for output in self.output] - return results + return self._pydantic - # TODO: RESULT PYDANTIC - # TODO: RESULT JSON D + @property + def json(self) -> Optional[Dict[str, Any]]: + if self.tasks_output[-1].output_format != OutputFormat.JSON: + raise ValueError( + "No JSON output found in the final task. Please make sure to set the output_json property in the final task in your crew." 
+ ) - def raw_output(self) -> str: - """Return the raw output of the task.""" - return aggregate_raw_outputs_from_task_outputs(self.output) + return self._json - def to_output_dict(self) -> List[Dict[str, Any]]: - output_dict = [output.to_output_dict() for output in self.output] - return output_dict + def to_output_dict(self) -> Dict[str, Any]: + if self.json: + return self.json + if self.pydantic: + return self.pydantic.model_dump() + raise ValueError("No output to convert to dictionary") - def __getitem__(self, key: str) -> Any: - if len(self.output) == 0: - return None - elif len(self.output) == 1: - return self.output[0][key] - else: - return [output[key] for output in self.output] - - # TODO: Confirm with Joao that we want to print the raw output and not the object def __str__(self): - # TODO: GRAB LAST TASK AND CALL RESULT ON IT. - return str(self.raw_output()) + if self.pydantic: + return str(self.pydantic) + if self.json: + return str(self.json) + return self.raw diff --git a/src/crewai/task.py b/src/crewai/task.py index 06d2808df..15dd61f9d 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -4,7 +4,7 @@ import threading import uuid from concurrent.futures import Future from copy import copy -from typing import Any, Dict, List, Optional, Type, Union +from typing import Any, Dict, List, Optional, Tuple, Type, Union from langchain_openai import ChatOpenAI from opentelemetry.trace import Span @@ -12,10 +12,10 @@ from pydantic import UUID4, BaseModel, Field, field_validator, model_validator from pydantic_core import PydanticCustomError from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.tasks.output_format import OutputFormat from crewai.tasks.task_output import TaskOutput from crewai.telemetry.telemetry import Telemetry from crewai.utilities.converter import Converter, ConverterError -from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs from crewai.utilities.i18n import I18N from crewai.utilities.printer import Printer from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser @@ -159,18 +159,6 @@ class Task(BaseModel): ) return self - def wait_for_completion(self) -> str | BaseModel: - """Wait for asynchronous task completion and return the output.""" - assert self.async_execution, "Task is not set to be executed asynchronously." - - if self._future: - self._future.result() # Wait for the future to complete - self._future = None - - assert self.output, "Task output is not set." 
- - return self.output.exported_output - def execute_sync( self, agent: Optional[BaseAgent] = None, @@ -219,31 +207,27 @@ class Task(BaseModel): self._execution_span = self._telemetry.task_started(crew=agent.crew, task=self) - if self.context: - task_outputs: List[TaskOutput] = [] - for task in self.context: - if task.async_execution: - task.wait_for_completion() - if task.output: - task_outputs.append(task.output) - context = aggregate_raw_outputs_from_task_outputs(task_outputs) - self.prompt_context = context - tools = tools or self.tools + tools = tools or self.tools or [] result = agent.execute_task( task=self, context=context, tools=tools, ) - exported_output = self._export_output(result) + + if self.output_file: + self._save_output(result) + + pydantic_output, json_output = self._export_output(result) task_output = TaskOutput( description=self.description, - raw_output=result, - pydantic_output=exported_output["pydantic"], - json_output=exported_output["json"], + raw=result, + pydantic=pydantic_output, + json=json_output, agent=agent.role, + output_format=self._get_output_format(), ) self.output = task_output @@ -289,7 +273,7 @@ class Task(BaseModel): """Increment the delegations counter.""" self.delegations += 1 - def copy(self, agents: Optional[List["BaseAgent"]] = None) -> "Task": + def copy(self, agents: List["BaseAgent"]) -> "Task": """Create a deep copy of the Task.""" exclude = { "id", @@ -322,26 +306,24 @@ class Task(BaseModel): def _export_output( self, result: str - ) -> Dict[str, Union[BaseModel, Dict[str, Any]]]: - output = { - "pydantic": None, - "json": None, - } + ) -> Tuple[Optional[BaseModel], Optional[Dict[str, Any]]]: + pydantic_output: Optional[BaseModel] = None + json_output: Optional[Dict[str, Any]] = None if self.output_pydantic or self.output_json: model_output = self._convert_to_model(result) - output["pydantic"] = ( + pydantic_output = ( model_output if isinstance(model_output, BaseModel) else None ) - output["json"] = model_output if isinstance(model_output, dict) else None + json_output = model_output if isinstance(model_output, dict) else None - if self.output_file: - self._save_output(output["raw"]) - - return output + return pydantic_output, json_output def _convert_to_model(self, result: str) -> Union[dict, BaseModel, str]: model = self.output_pydantic or self.output_json + if model is None: + return result + try: return self._validate_model(result, model) except Exception: @@ -392,6 +374,13 @@ class Task(BaseModel): return exported_result + def _get_output_format(self) -> OutputFormat: + if self.output_json: + return OutputFormat.JSON + if self.output_pydantic: + return OutputFormat.PYDANTIC + return OutputFormat.RAW + def _get_conversion_instructions(self, model: Type[BaseModel], llm: Any) -> str: instructions = "I'm gonna convert this raw text into valid JSON." 
if not self._is_gpt(llm): @@ -400,6 +389,9 @@ class Task(BaseModel): return instructions def _save_output(self, content: str) -> None: + if not self.output_file: + raise Exception("Output file path is not set.") + directory = os.path.dirname(self.output_file) if directory and not os.path.exists(directory): os.makedirs(directory) diff --git a/src/crewai/tasks/__init__.py b/src/crewai/tasks/__init__.py index e69de29bb..d26db2093 100644 --- a/src/crewai/tasks/__init__.py +++ b/src/crewai/tasks/__init__.py @@ -0,0 +1,4 @@ +from crewai.tasks.output_format import OutputFormat +from crewai.tasks.task_output import TaskOutput + +__all__ = ["OutputFormat", "TaskOutput"] diff --git a/src/crewai/tasks/output_format.py b/src/crewai/tasks/output_format.py new file mode 100644 index 000000000..dbea9ffcf --- /dev/null +++ b/src/crewai/tasks/output_format.py @@ -0,0 +1,9 @@ +from enum import Enum + + +class OutputFormat(str, Enum): + """Enum that represents the output format of a task.""" + + JSON = "json" + PYDANTIC = "pydantic" + RAW = "raw" diff --git a/src/crewai/tasks/task_output.py b/src/crewai/tasks/task_output.py index 633e10fc9..58a47e287 100644 --- a/src/crewai/tasks/task_output.py +++ b/src/crewai/tasks/task_output.py @@ -1,24 +1,26 @@ -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, Optional from pydantic import BaseModel, Field, model_validator +from crewai.tasks.output_format import OutputFormat + -# TODO: This is a breaking change. Confirm with @joao class TaskOutput(BaseModel): """Class that represents the result of a task.""" - # TODO: MAKE SURE TO FULLY SUPPORT OUTPUT FILE - description: str = Field(description="Description of the task") summary: Optional[str] = Field(description="Summary of the task", default=None) - raw_output: str = Field(description="Result of the task") - pydantic_output: Optional[BaseModel] = Field( + raw: str = Field( + description="Result of the task" + ) # TODO: @joao: breaking change, by renaming raw_output to raw, but now consistent with CrewOutput + pydantic: Optional[BaseModel] = Field( description="Pydantic model output", default=None ) - json_output: Optional[Dict[str, Any]] = Field( - description="JSON output", default=None - ) + json: Optional[Dict[str, Any]] = Field(description="JSON output", default=None) agent: str = Field(description="Agent that executed the task") + output_format: OutputFormat = Field( + description="Output format of the task", default=OutputFormat.RAW + ) @model_validator(mode="after") def set_summary(self): @@ -27,32 +29,18 @@ class TaskOutput(BaseModel): self.summary = f"{excerpt}..." 
return self - # TODO: Ask @joao what is the desired behavior here - def result(self) -> Union[str, BaseModel, Dict[str, Any]]: - """Return the result of the task based on the available output.""" - if self.pydantic_output: - return self.pydantic_output - elif self.json_output: - return self.json_output - else: - return self.raw_output - - def __getitem__(self, key: str) -> Any: - """Retrieve a value from the pydantic_output or json_output based on the key.""" - if self.pydantic_output and hasattr(self.pydantic_output, key): - return getattr(self.pydantic_output, key) - if self.json_output and key in self.json_output: - return self.json_output[key] - raise KeyError(f"Key '{key}' not found in pydantic_output or json_output") - def to_dict(self) -> Dict[str, Any]: """Convert json_output and pydantic_output to a dictionary.""" output_dict = {} - if self.json_output: - output_dict.update(self.json_output) - if self.pydantic_output: - output_dict.update(self.pydantic_output.model_dump()) + if self.json: + output_dict.update(self.json) + if self.pydantic: + output_dict.update(self.pydantic.model_dump()) return output_dict def __str__(self) -> str: - return self.raw_output + if self.pydantic: + return str(self.pydantic) + if self.json: + return str(self.json) + return self.raw diff --git a/src/crewai/utilities/formatter.py b/src/crewai/utilities/formatter.py index 13c891641..34da6cc43 100644 --- a/src/crewai/utilities/formatter.py +++ b/src/crewai/utilities/formatter.py @@ -1,5 +1,6 @@ from typing import List +from crewai.task import Task from crewai.tasks.task_output import TaskOutput @@ -8,5 +9,12 @@ def aggregate_raw_outputs_from_task_outputs(task_outputs: List[TaskOutput]) -> s dividers = "\n\n----------\n\n" # Join task outputs with dividers - context = dividers.join(output.raw_output for output in task_outputs) + context = dividers.join(output.raw for output in task_outputs) return context + + +def aggregate_raw_outputs_from_tasks(tasks: List[Task]) -> str: + """Generate string context from the tasks.""" + task_outputs = [task.output for task in tasks if task.output is not None] + + return aggregate_raw_outputs_from_task_outputs(task_outputs)
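
Usage sketch of the new CrewOutput API wired up in this patch (a minimal, illustrative example only: the researcher agent and summary_task below are placeholders, not part of the patch, and a configured LLM/API key is assumed):

    from crewai import Agent, Crew, Task
    from crewai.tasks import OutputFormat

    researcher = Agent(
        role="Researcher",
        goal="Summarize {topic}",
        backstory="An analyst who writes concise summaries.",
    )
    summary_task = Task(
        description="Write a short summary about {topic}.",
        expected_output="A one-paragraph summary.",
        agent=researcher,
    )

    crew = Crew(agents=[researcher], tasks=[summary_task])
    result = crew.kickoff(inputs={"topic": "AI"})

    # kickoff() now returns a CrewOutput instead of a plain string/dict.
    print(result.raw)                  # raw output of the final task
    for task_output in result.tasks_output:
        print(task_output.raw)         # TaskOutput.raw_output was renamed to .raw

    # .pydantic and .json raise a ValueError unless the final task was
    # configured with output_pydantic / output_json, so check the new
    # output_format field before converting to a dictionary.
    if result.tasks_output[-1].output_format == OutputFormat.JSON:
        data = result.to_output_dict()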