Major overhaul of TaskOutput and CrewOutput. Updated all tests to work with the new changes. Still need to add a few final tricky async tests, plus a few more to verify output types on TaskOutput and CrewOutput.
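For context, a minimal sketch of how the new output objects are meant to be consumed after this change. The crew setup itself is illustrative; field and method names are taken from the diff below:

from crewai import Agent, Crew, Task

researcher = Agent(role="Researcher", goal="Research a topic", backstory="...")
task = Task(
    description="Summarize {topic}",
    expected_output="A short summary",
    agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])

# kickoff() now returns a CrewOutput instead of a plain string
crew_output = crew.kickoff(inputs={"topic": "AI agents"})
print(crew_output.raw_output())  # aggregated raw text of the final task outputs
for task_output in crew_output.tasks_output:
    print(task_output.summary, "->", task_output.result())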

This commit is contained in:
Brandon Hancock
2024-06-21 16:13:59 -04:00
parent 5c504f4087
commit ee4a996de3
13 changed files with 28954 additions and 5453 deletions

View File

@@ -6,15 +6,15 @@ from typing import Any, Dict, List, Optional, Tuple, Union
from langchain_core.callbacks import BaseCallbackHandler
from pydantic import (
UUID4,
BaseModel,
ConfigDict,
Field,
InstanceOf,
Json,
PrivateAttr,
field_validator,
model_validator,
)
from pydantic_core import PydanticCustomError
@@ -26,9 +26,11 @@ from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.process import Process
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools import AgentTools
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs
class Crew(BaseModel):
@@ -246,12 +248,12 @@ class Crew(BaseModel):
def kickoff(
self,
inputs: Optional[Dict[str, Any]] = {},
inputs: Optional[Dict[str, Any]] = None,
) -> CrewOutput:
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self)
# type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
self._interpolate_inputs(inputs)
if inputs is not None:
self._interpolate_inputs(inputs)
self._set_tasks_callbacks()
i18n = I18N(prompt_file=self.prompt_file)
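With the mutable `{}` default replaced by `None`, interpolation now runs only when inputs are actually supplied. A small sketch of both call styles, assuming `crew` is a configured `Crew` instance:

# The interpolation pass happens only for the first call; the second skips it.
output_with_inputs = crew.kickoff(inputs={"topic": "LLM evaluation"})
output_without_inputs = crew.kickoff()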
@@ -297,12 +299,7 @@ class Crew(BaseModel):
for input_data in inputs:
crew = self.copy()
for task in crew.tasks:
task.interpolate_inputs(input_data)
for agent in crew.agents:
agent.interpolate_inputs(input_data)
output = crew.kickoff()
output = crew.kickoff(inputs=input_data)
results.append(output)
return results
@@ -317,12 +314,7 @@ class Crew(BaseModel):
async def run_crew(input_data):
crew = self.copy()
for task in crew.tasks:
task.interpolate_inputs(input_data)
for agent in crew.agents:
agent.interpolate_inputs(input_data)
return await crew.kickoff_async()
return await crew.kickoff_async(inputs=input_data)
tasks = [asyncio.create_task(run_crew(input_data)) for input_data in inputs]
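Since `kickoff_async(inputs=...)` now handles interpolation itself, the per-input loop reduces to copy-and-kickoff. A self-contained sketch of the pattern used above:

import asyncio

async def kickoff_for_each_async_sketch(crew, inputs_list):
    """Run one copied crew per input concurrently (mirrors the loop above)."""

    async def run_crew(input_data):
        copied_crew = crew.copy()
        return await copied_crew.kickoff_async(inputs=input_data)

    tasks = [asyncio.create_task(run_crew(input_data)) for input_data in inputs_list]
    return await asyncio.gather(*tasks)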
@@ -336,15 +328,8 @@ class Crew(BaseModel):
def _run_sequential_process(self) -> CrewOutput:
"""Executes tasks sequentially and returns the final output."""
# TODO: Check to see if we need to be clearing task output after each task
task_output = ""
futures: List[Tuple[Task, Future]] = []
def _process_task_result(task, output):
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== [{role}] Task output: {output}\n\n")
if self.output_log_file:
self._file_handler.log(agent=role, task=output, status="completed")
task_outputs: List[TaskOutput] = []
futures: List[Tuple[Task, Future[TaskOutput]]] = []
for task in self.tasks:
if task.agent.allow_delegation: # type: ignore # Item "None" of "Agent | None" has no attribute "allow_delegation"
@@ -366,47 +351,55 @@ class Crew(BaseModel):
)
if task.async_execution:
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
future = task.execute_async(
agent=task.agent, context=task_output, tools=task.tools
agent=task.agent, context=context, tools=task.tools
)
futures.append(( task, future))
futures.append((task, future))
else:
# Before executing a synchronous task, wait for all async tasks to complete
if futures:
task_output = ""
# Clear task_outputs before processing async tasks
task_outputs = []
for future_task, future in futures:
output = future.result()
task_output += output + "\n\n\n"
_process_task_result(future_task, output)
task_output = future.result()
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
# Clear the futures list after processing all async results
futures.clear()
output = task.execute_sync(
agent=task.agent, context=task_output, tools=task.tools
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
task_output = task.execute_sync(
agent=task.agent, context=context, tools=task.tools
)
task_output = output
_process_task_result(task, output)
# TODO: Check with Joao to see if we want to add or ignore outputs from async tasks
# Process any remaining async results
task_outputs = [task_output]
self._process_task_result(task, task_output)
if futures:
task_output = ""
# Clear task_outputs before processing async tasks
task_outputs = []
for future_task, future in futures:
output = future.result()
task_output += output + "\n\n\n"
_process_task_result(future_task, output)
task_output = future.result()
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
self._finish_execution(task_output)
final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs)
self._finish_execution(final_string_output)
# type: ignore # Item "None" of "Agent | None" has no attribute "_token_process"
token_usage = task.agent._token_process.get_summary()
return self._format_output(task_output, token_usage)
return self._format_output(task_outputs, token_usage)
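The bookkeeping above follows one pattern in both branches: pending async futures are drained into `task_outputs`, each result is logged via `_process_task_result`, and the raw outputs are joined into the context for the next task. A stripped-down sketch of that drain step (simplified to a standalone function rather than a method):

from concurrent.futures import Future
from typing import Callable, List, Tuple

from crewai.task import Task
from crewai.tasks.task_output import TaskOutput

def drain_async_futures(
    futures: List[Tuple[Task, Future[TaskOutput]]],
    process_result: Callable[[Task, TaskOutput], None],
) -> List[TaskOutput]:
    """Resolve all pending async task futures into a fresh task_outputs list."""
    task_outputs: List[TaskOutput] = []  # async results replace the buffer
    for future_task, future in futures:
        task_output = future.result()  # blocks until the async task finishes
        task_outputs.append(task_output)
        process_result(future_task, task_output)
    futures.clear()  # all pending async work has been consumed
    return task_outputs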
# TODO: Update this to mimic the async and sync execution of tasks in the sequential process
def _process_task_result(self, task: Task, output: TaskOutput) -> None:
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== [{role}] Task output: {output}\n\n")
if self.output_log_file:
self._file_handler.log(agent=role, task=output, status="completed")
# TODO: Ask Joao if the agent connected to a task can delegate to other agents like we
# can in the sequential process, or whether only the manager can delegate in the hierarchical process.
def _run_hierarchical_process(self) -> Tuple[CrewOutput, Dict[str, Any]]:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
i18n = I18N(prompt_file=self.prompt_file)
if self.manager_agent is not None:
self.manager_agent.allow_delegation = True
@@ -424,16 +417,12 @@ class Crew(BaseModel):
verbose=True,
)
task_output = ""
futures: List[Tuple[Task, Future]] = []
def _process_task_result(task, output):
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== [{role}] Task output: {output}\n\n")
if self.output_log_file:
self._file_handler.log(agent=role, task=output, status="completed")
task_outputs: List[TaskOutput] = []
futures: List[Tuple[Task, Future[TaskOutput]]] = []
for task in self.tasks:
# TODO: In sequential, we allow delegation but we ignore it here. Confirm with Joao.
self._logger.log("debug", f"Working Agent: {manager.role}")
self._logger.log("info", f"Starting Task: {task.description}")
@@ -443,40 +432,45 @@ class Crew(BaseModel):
)
if task.async_execution:
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
future = task.execute_async(
agent=manager, context=task_output, tools=manager.tools
agent=manager, context=context, tools=manager.tools
)
futures.append((task, future))
else:
# Before executing a synchronous task, wait for all async tasks to complete
if futures:
task_output = "" # Clear task_output before processing async tasks
# Clear task_outputs before processing async tasks
task_outputs = []
for future_task, future in futures:
output = future.result()
task_output += output + "\n\n\n"
_process_task_result(future_task, output)
task_output = future.result()
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
# Clear the futures list after processing all async results
futures.clear()
output = task.execute_sync(
agent=manager, context=task_output, tools=manager.tools
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
task_output = task.execute_sync(
agent=manager, context=context, tools=manager.tools
)
task_output = output # Set task_output to the new result for sync tasks
_process_task_result(task, output)
task_outputs = [task_output]
self._process_task_result(task, task_output)
# Process any remaining async results
if futures:
task_output = "" # Clear task_output before processing async tasks
# Clear task_outputs before processing async tasks
task_outputs = []
for future_task, future in futures:
output = future.result()
task_output += output + "\n\n\n"
_process_task_result(future_task, output)
task_output = future.result()
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
self._finish_execution(task_output)
final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs)
self._finish_execution(final_string_output)
manager_token_usage = manager._token_process.get_summary()
return (
self._format_output(task_output, manager_token_usage),
self._format_output(task_outputs, manager_token_usage),
manager_token_usage,
)
@@ -529,25 +523,22 @@ class Crew(BaseModel):
[agent.interpolate_inputs(inputs) for agent in self.agents]
def _format_output(
self, output: str, token_usage: Optional[Dict[str, Any]]
self, output: List[TaskOutput], token_usage: Optional[Dict[str, Any]]
) -> CrewOutput:
"""
Formats the output of the crew execution.
"""
print("Crew Output: ", output)
print("Crew output type: ", type(output))
print("SELF TASKS: ", self.tasks)
print("Tasks Output: ", [task.output for task in self.tasks if task])
return CrewOutput(
final_output=output,
output=output,
tasks_output=[task.output for task in self.tasks if task],
token_output=token_usage,
)
def _finish_execution(self, output: str) -> None:
def _finish_execution(self, final_string_output: str) -> None:
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
self._telemetry.end_crew(self, output)
self._telemetry.end_crew(self, final_string_output)
def __repr__(self):
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"

View File

@@ -1,15 +1,13 @@
from typing import Any, Dict, Union
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs
# TODO: Potentially add in JSON_OUTPUT, PYDANTIC_OUTPUT, etc.
class CrewOutput(BaseModel):
final_output: Union[str, Dict, BaseModel] = Field(
description="Final output of the crew"
)
output: List[TaskOutput] = Field(description="Result of the final task")
tasks_output: list[TaskOutput] = Field(
description="Output of each task", default=[]
)
@@ -17,5 +15,19 @@ class CrewOutput(BaseModel):
description="Processed token summary", default={}
)
def result(self) -> Union[str, BaseModel, Dict[str, Any]]:
"""Return the result of the task based on the available output."""
return self.output.result()
def raw_output(self) -> str:
"""Return the raw output of the task."""
return aggregate_raw_outputs_from_task_outputs(self.output)
def to_output_dict(self) -> Dict[str, Any]:
return self.output.to_output_dict()
def __getitem__(self, key: str) -> Any:
return self.output[key]
def __str__(self):
return self.final_output
return str(self.raw_output())
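A hedged construction example for the reworked `CrewOutput`. `TaskOutput`'s import path is shown in this diff; `CrewOutput`'s is not, so adjust that import to match the repository layout:

from crewai.tasks.task_output import TaskOutput
# CrewOutput's module path is not shown in this diff; adjust as needed.
from crewai.crew_output import CrewOutput

finals = [
    TaskOutput(description="Write the report", raw_output="Report text", agent="Writer")
]
crew_output = CrewOutput(
    output=finals,  # result of the final task(s)
    tasks_output=finals,  # one TaskOutput per executed task
    token_output={"total_tokens": 512},  # illustrative token summary
)
print(crew_output.raw_output())  # "Report text"
print(str(crew_output))  # __str__ delegates to raw_output()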

View File

@@ -151,16 +151,16 @@ class Task(BaseModel):
agent: Optional[Agent] = None,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> str:
) -> TaskOutput:
"""Execute the task synchronously."""
return self._execute_task_sync(agent, context, tools)
return self._execute_core(agent, context, tools)
def execute_async(
self,
agent: Optional[Agent] = None,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> Future:
) -> Future[TaskOutput]:
"""Execute the task asynchronously."""
future = Future()
threading.Thread(
@@ -168,21 +168,12 @@ class Task(BaseModel):
).start()
return future
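`execute_async` now resolves to a `TaskOutput` rather than a raw string, so callers unwrap the future the same way the crew loops do. A minimal sketch, assuming `task` is a configured `Task` with an assigned `agent`:

# future is a Future[TaskOutput]; result() blocks until the thread finishes.
future = task.execute_async(agent=agent, context=None, tools=task.tools)
task_output = future.result()
print(task_output.raw_output)  # raw LLM result string
print(task_output.result())  # structured output when pydantic/json is available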
def _execute_task_sync(
self,
agent: Optional[Agent],
context: Optional[str],
tools: Optional[List[Any]],
) -> str:
"""Execute the task synchronously with context handling."""
return self._execute_core(agent, context, tools)
def _execute_task_async(
self,
agent: Optional[Agent],
context: Optional[str],
tools: Optional[List[Any]],
future: Future,
future: Future[TaskOutput],
) -> None:
"""Execute the task asynchronously with context handling."""
result = self._execute_core(agent, context, tools)
@@ -193,7 +184,7 @@ class Task(BaseModel):
agent: Optional[Agent],
context: Optional[str],
tools: Optional[List[Any]],
) -> str:
) -> TaskOutput:
"""Run the core execution logic of the task."""
agent = agent or self.agent
if not agent:
@@ -221,17 +212,19 @@ class Task(BaseModel):
exported_output = self._export_output(result)
self.output = TaskOutput(
task_output = TaskOutput(
description=self.description,
exported_output=exported_output,
raw_output=result,
pydantic_output=exported_output["pydantic"],
json_output=exported_output["json"],
agent=agent.role,
)
self.output = task_output
if self.callback:
self.callback(self.output)
return exported_output
return task_output
def prompt(self) -> str:
"""Prompt the task.
@@ -292,70 +285,92 @@ class Task(BaseModel):
)
return copied_task
def _export_output(self, result: str) -> Union[str, dict, BaseModel]:
exported_result = result
instructions = "I'm gonna convert this raw text into valid JSON."
def _export_output(
self, result: str
) -> Dict[str, Union[BaseModel, Dict[str, Any]]]:
output = {
"pydantic": None,
"json": None,
}
if self.output_pydantic or self.output_json:
model = self.output_pydantic or self.output_json
model_output = self._convert_to_model(result)
output["pydantic"] = (
model_output if isinstance(model_output, BaseModel) else None
)
output["json"] = model_output if isinstance(model_output, dict) else None
# try to convert task_output directly to pydantic/json
if self.output_file:
self._save_output(result)
return output
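The contract here: `_export_output` returns both conversion attempts keyed by kind, and `_execute_core` splits them into `pydantic_output` and `json_output` on the new `TaskOutput`. A sketch of that handoff, assuming `task` is a `Task` with `output_json` or `output_pydantic` set:

from crewai.tasks.task_output import TaskOutput

raw_result = '{"title": "Q2 Review", "score": 8}'
exported_output = task._export_output(raw_result)  # {"pydantic": ..., "json": ...}
task_output = TaskOutput(
    description=task.description,
    raw_output=raw_result,
    pydantic_output=exported_output["pydantic"],
    json_output=exported_output["json"],
    agent="Analyst",
)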
def _convert_to_model(self, result: str) -> Union[dict, BaseModel, str]:
model = self.output_pydantic or self.output_json
try:
return self._validate_model(result, model)
except Exception:
return self._handle_partial_json(result, model)
def _validate_model(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel]:
exported_result = model.model_validate_json(result)
if self.output_json:
return exported_result.model_dump()
return exported_result
def _handle_partial_json(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel, str]:
match = re.search(r"({.*})", result, re.DOTALL)
if match:
try:
# type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json"
exported_result = model.model_validate_json(result)
exported_result = model.model_validate_json(match.group(0))
if self.output_json:
# type: ignore # "str" has no attribute "model_dump"
return exported_result.model_dump()
return exported_result
except Exception:
# sometimes the response contains valid JSON in the middle of text
match = re.search(r"({.*})", result, re.DOTALL)
if match:
try:
# type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json"
exported_result = model.model_validate_json(match.group(0))
if self.output_json:
# type: ignore # "str" has no attribute "model_dump"
return exported_result.model_dump()
return exported_result
except Exception:
pass
pass
# type: ignore # Item "None" of "Agent | None" has no attribute "function_calling_llm"
llm = self.agent.function_calling_llm or self.agent.llm
return self._convert_with_instructions(result, model)
if not self._is_gpt(llm):
# type: ignore # Argument "model" to "PydanticSchemaParser" has incompatible type "type[BaseModel] | None"; expected "type[BaseModel]"
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
def _convert_with_instructions(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel, str]:
llm = self.agent.function_calling_llm or self.agent.llm
instructions = self._get_conversion_instructions(model, llm)
converter = Converter(
llm=llm, text=result, model=model, instructions=instructions
)
exported_result = (
converter.to_pydantic() if self.output_pydantic else converter.to_json()
)
if isinstance(exported_result, ConverterError):
Printer().print(
content=f"{exported_result.message} Using raw output instead.",
color="red",
)
if self.output_pydantic:
exported_result = converter.to_pydantic()
elif self.output_json:
exported_result = converter.to_json()
exported_result = result
if self.output_file:
content = (
# type: ignore # "str" has no attribute "json"
exported_result
if not self.output_pydantic
else exported_result.json()
)
self._save_file(content)
return result
return exported_result
def _get_conversion_instructions(self, model: Type[BaseModel], llm: Any) -> str:
instructions = "I'm gonna convert this raw text into valid JSON."
if not self._is_gpt(llm):
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
return instructions
def _save_output(self, content: str) -> None:
directory = os.path.dirname(self.output_file)
if directory and not os.path.exists(directory):
os.makedirs(directory)
with open(self.output_file, "w", encoding="utf-8") as file:
file.write(content)
def _is_gpt(self, llm) -> bool:
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
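End to end, a task with `output_pydantic` now yields a `TaskOutput` whose `pydantic_output` is populated when direct validation, the partial-JSON regex, or the LLM-backed converter succeeds. A hedged sketch (requires a configured LLM to actually run):

from typing import List

from pydantic import BaseModel

from crewai import Agent, Task

class ResearchSummary(BaseModel):
    title: str
    key_points: List[str]

analyst = Agent(role="Analyst", goal="Summarize research", backstory="...")
task = Task(
    description="Summarize the findings as structured data",
    expected_output="A title and a list of key points",
    agent=analyst,
    output_pydantic=ResearchSummary,
)

task_output = task.execute_sync(agent=analyst)
summary = task_output.pydantic_output  # ResearchSummary instance, or None on failure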

View File

@@ -1,24 +1,56 @@
from typing import Optional, Union
from typing import Any, Dict, Optional, Union
from pydantic import BaseModel, Field, model_validator
# TODO: This is a breaking change. Confirm with @joao
class TaskOutput(BaseModel):
"""Class that represents the result of a task."""
description: str = Field(description="Description of the task")
summary: Optional[str] = Field(description="Summary of the task", default=None)
exported_output: Union[str, BaseModel] = Field(
description="Output of the task", default=None
raw_output: str = Field(description="Result of the task")
pydantic_output: Optional[BaseModel] = Field(
description="Pydantic model output", default=None
)
json_output: Optional[Dict[str, Any]] = Field(
description="JSON output", default=None
)
agent: str = Field(description="Agent that executed the task")
@model_validator(mode="after")
def set_summary(self):
"""Set the summary field based on the description."""
excerpt = " ".join(self.description.split(" ")[:10])
self.summary = f"{excerpt}..."
return self
def result(self):
return self.exported_output
# TODO: Ask @joao what is the desired behavior here
def result(self) -> Union[str, BaseModel, Dict[str, Any]]:
"""Return the result of the task based on the available output."""
if self.pydantic_output:
return self.pydantic_output
elif self.json_output:
return self.json_output
else:
return self.raw_output
def __getitem__(self, key: str) -> Any:
"""Retrieve a value from the pydantic_output or json_output based on the key."""
if self.pydantic_output and hasattr(self.pydantic_output, key):
return getattr(self.pydantic_output, key)
if self.json_output and key in self.json_output:
return self.json_output[key]
raise KeyError(f"Key '{key}' not found in pydantic_output or json_output")
def to_output_dict(self) -> Dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary."""
output_dict = {}
if self.json_output:
output_dict.update(self.json_output)
if self.pydantic_output:
output_dict.update(self.pydantic_output.dict())
return output_dict
def __str__(self) -> str:
return self.raw_output
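The accessor precedence in practice: `result()` prefers `pydantic_output`, then `json_output`, then the raw string, and `__getitem__` checks both structured outputs. A small self-contained example:

from pydantic import BaseModel

from crewai.tasks.task_output import TaskOutput

class Report(BaseModel):
    title: str
    score: int

out = TaskOutput(
    description="Score the report for clarity and accuracy",
    raw_output='{"title": "Q2 Review", "score": 8}',
    pydantic_output=Report(title="Q2 Review", score=8),
    agent="Reviewer",
)

assert out.result() == out.pydantic_output  # pydantic wins over json and raw
assert out["score"] == 8  # looked up via __getitem__ on pydantic_output
assert out.to_output_dict() == {"title": "Q2 Review", "score": 8}
assert str(out) == out.raw_output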

View File

@@ -273,7 +273,7 @@ class Telemetry:
except Exception:
pass
def end_crew(self, crew, output):
def end_crew(self, crew, final_string_output):
if (self.ready) and (crew.share_crew):
try:
self._add_attribute(
@@ -281,7 +281,9 @@ class Telemetry:
"crewai_version",
pkg_resources.get_distribution("crewai").version,
)
self._add_attribute(crew._execution_span, "crew_output", output)
self._add_attribute(
crew._execution_span, "crew_output", final_string_output
)
self._add_attribute(
crew._execution_span,
"crew_tasks_output",

View File

@@ -1,9 +1,9 @@
from .converter import Converter, ConverterError
from .fileHandler import FileHandler
from .i18n import I18N
from .instructor import Instructor
from .logger import Logger
from .parser import YamlParser
from .printer import Printer
from .prompts import Prompts
from .rpm_controller import RPMController

View File

@@ -0,0 +1,12 @@
from typing import List
from crewai.tasks.task_output import TaskOutput
def aggregate_raw_outputs_from_task_outputs(task_outputs: List[TaskOutput]) -> str:
"""Generate string context from the task outputs."""
dividers = "\n\n----------\n\n"
# Join task outputs with dividers
context = dividers.join(output.raw_output for output in task_outputs)
return context
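Usage is straightforward: given a list of `TaskOutput`s, the helper joins their `raw_output` strings with the divider. A quick example:

from crewai.tasks.task_output import TaskOutput
from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs

outputs = [
    TaskOutput(description="First task", raw_output="alpha", agent="A"),
    TaskOutput(description="Second task", raw_output="beta", agent="B"),
]
context = aggregate_raw_outputs_from_task_outputs(outputs)
assert context == "alpha\n\n----------\n\nbeta"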