The majority of tasks are working now. Need to fix the converter class.

This commit is contained in:
Brandon Hancock
2024-07-09 15:40:39 -04:00
parent ecc3d913da
commit 7518cb9def
10 changed files with 3332 additions and 198 deletions

View File

@@ -254,6 +254,27 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def validate_end_with_at_most_one_async_task(self):
"""Validates that the crew ends with at most one asynchronous task."""
final_async_task_count = 0
# Traverse tasks backward
for task in reversed(self.tasks):
if task.async_execution:
final_async_task_count += 1
else:
break # Stop traversing as soon as a non-async task is encountered
if final_async_task_count > 1:
raise PydanticCustomError(
"async_task_count",
"The crew must end with at most one asynchronous task.",
{},
)
return self
def _setup_from_config(self):
assert self.config is not None, "Config should not be None."
@@ -347,8 +368,7 @@ class Crew(BaseModel):
if self.process == Process.sequential:
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
result, manager_metrics = self._run_hierarchical_process() # type: ignore # Incompatible types in assignment (expression has type "str | dict[str, Any]", variable has type "str")
metrics.append(manager_metrics)
result = self._run_hierarchical_process() # type: ignore # Incompatible types in assignment (expression has type "str | dict[str, Any]", variable has type "str")
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
@@ -502,11 +522,6 @@ class Crew(BaseModel):
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs)
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
# Important: There should only be one task output in the list
# If there are more or 0, something went wrong.
if len(task_outputs) != 1:
@@ -516,10 +531,15 @@ class Crew(BaseModel):
final_task_output = task_outputs[0]
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
return CrewOutput(
_raw=final_task_output.raw,
_pydantic=final_task_output.pydantic,
_json=final_task_output.json,
raw=final_task_output.raw,
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
tasks_output=[task.output for task in self.tasks if task.output],
token_usage=token_usage,
)
@@ -530,7 +550,8 @@ class Crew(BaseModel):
if self.output_log_file:
self._file_handler.log(agent=role, task=output, status="completed")
def _run_hierarchical_process(self) -> Tuple[CrewOutput, Dict[str, Any]]:
# TODO: @joao, Breaking change. Changed return type. Usage metrics is included in crewoutput
def _run_hierarchical_process(self) -> CrewOutput:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
i18n = I18N(prompt_file=self.prompt_file)
if self.manager_agent is not None:
@@ -564,7 +585,11 @@ class Crew(BaseModel):
)
if task.async_execution:
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
context = (
aggregate_raw_outputs_from_tasks(task.context)
if task.context
else aggregate_raw_outputs_from_task_outputs(task_outputs)
)
future = task.execute_async(
agent=manager, context=context, tools=manager.tools
)
@@ -582,7 +607,11 @@ class Crew(BaseModel):
# Clear the futures list after processing all async results
futures.clear()
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
context = (
aggregate_raw_outputs_from_tasks(task.context)
if task.context
else aggregate_raw_outputs_from_task_outputs(task_outputs)
)
task_output = task.execute_sync(
agent=manager, context=context, tools=manager.tools
)
@@ -598,14 +627,26 @@ class Crew(BaseModel):
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs)
# Important: There should only be one task output in the list
# If there are more or 0, something went wrong.
if len(task_outputs) != 1:
raise ValueError(
"Something went wrong. Kickoff should return only one task output."
)
final_task_output = task_outputs[0]
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
return (
self._format_output(task_outputs, token_usage),
token_usage,
return CrewOutput(
raw=final_task_output.raw,
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
tasks_output=[task.output for task in self.tasks if task.output],
token_usage=token_usage,
)
def copy(self):

View File

@@ -1,6 +1,7 @@
import json
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field, PrivateAttr
from pydantic import BaseModel, Field
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
@@ -9,9 +10,13 @@ from crewai.tasks.task_output import TaskOutput
class CrewOutput(BaseModel):
"""Class that represents the result of a crew."""
_raw: str = PrivateAttr(default="")
_pydantic: Optional[BaseModel] = PrivateAttr(default=None)
_json: Optional[Dict[str, Any]] = PrivateAttr(default=None)
raw: str = Field(description="Raw output of crew", default="")
pydantic: Optional[BaseModel] = Field(
description="Pydantic output of Crew", default=None
)
json_dict: Optional[Dict[str, Any]] = Field(
description="JSON dict output of Crew", default=None
)
tasks_output: list[TaskOutput] = Field(
description="Output of each task", default=[]
)
@@ -19,32 +24,30 @@ class CrewOutput(BaseModel):
description="Processed token summary", default={}
)
@property
def raw(self) -> str:
return self._raw
# TODO: Joao - Adding this safety check breaks when people want to see
# the full output of a CrewOutput.
# @property
# def pydantic(self) -> Optional[BaseModel]:
# # Check if the final task output included a pydantic model
# if self.tasks_output[-1].output_format != OutputFormat.PYDANTIC:
# raise ValueError(
# "No pydantic model found in the final task. Please make sure to set the output_pydantic property in the final task in your crew."
# )
# return self._pydantic
@property
def pydantic(self) -> Optional[BaseModel]:
# Check if the final task output included a pydantic model
if self.tasks_output[-1].output_format != OutputFormat.PYDANTIC:
raise ValueError(
"No pydantic model found in the final task. Please make sure to set the output_pydantic property in the final task in your crew."
)
return self._pydantic
@property
def json(self) -> Optional[Dict[str, Any]]:
def json(self) -> Optional[str]:
if self.tasks_output[-1].output_format != OutputFormat.JSON:
raise ValueError(
"No JSON output found in the final task. Please make sure to set the output_json property in the final task in your crew."
)
return self._json
return json.dumps(self.json_dict)
def to_output_dict(self) -> Dict[str, Any]:
if self.json:
return self.json
def to_dict(self) -> Dict[str, Any]:
if self.json_dict:
return self.json_dict
if self.pydantic:
return self.pydantic.model_dump()
raise ValueError("No output to convert to dictionary")
@@ -52,6 +55,6 @@ class CrewOutput(BaseModel):
def __str__(self):
if self.pydantic:
return str(self.pydantic)
if self.json:
return str(self.json)
if self.json_dict:
return str(self.json_dict)
return self.raw

View File

@@ -1,3 +1,4 @@
import json
import os
import re
import threading
@@ -99,6 +100,10 @@ class Task(BaseModel):
description="Whether the task should have a human review the final answer of the agent",
default=False,
)
converter_cls: Optional[Type[Converter]] = Field(
description="A converter class used to export structured output",
default=None,
)
_telemetry: Telemetry
_execution_span: Span | None = None
@@ -216,16 +221,13 @@ class Task(BaseModel):
tools=tools,
)
if self.output_file:
self._save_output(result)
pydantic_output, json_output = self._export_output(result)
task_output = TaskOutput(
description=self.description,
raw=result,
pydantic=pydantic_output,
json=json_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
)
@@ -238,6 +240,15 @@ class Task(BaseModel):
self._telemetry.task_ended(self._execution_span, self)
self._execution_span = None
if self.output_file:
content = (
json_output
if json_output
else pydantic_output.model_dump_json() if pydantic_output else result
)
print("CALLING SAVE FILE", content)
self._save_file(content)
return task_output
def prompt(self) -> str:
@@ -312,10 +323,19 @@ class Task(BaseModel):
if self.output_pydantic or self.output_json:
model_output = self._convert_to_model(result)
print("MODEL OUTPUT", model_output)
pydantic_output = (
model_output if isinstance(model_output, BaseModel) else None
)
json_output = model_output if isinstance(model_output, dict) else None
print("PYDANTIC OUTPUT", pydantic_output)
if isinstance(model_output, str):
try:
json_output = json.loads(model_output)
except json.JSONDecodeError:
json_output = None
else:
json_output = model_output if isinstance(model_output, dict) else None
print("JSON OUTPUT", json_output)
return pydantic_output, json_output
@@ -327,13 +347,18 @@ class Task(BaseModel):
try:
return self._validate_model(result, model)
except Exception:
print("EXCEPTION IN _convert_to_model")
return self._handle_partial_json(result, model)
def _validate_model(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel]:
print("VALIDATE MODEL - RESULT", result)
print("VALIDATE MODEL - MODEL", model)
exported_result = model.model_validate_json(result)
print("EXPORTED RESULT", exported_result)
if self.output_json:
print("HERE IN _validate_model", self.output_json)
return exported_result.model_dump()
return exported_result
@@ -344,10 +369,12 @@ class Task(BaseModel):
if match:
try:
exported_result = model.model_validate_json(match.group(0))
print("EXPORTED RESULT in handle_partial", exported_result)
if self.output_json:
return exported_result.model_dump()
return exported_result
except Exception:
print("EXCEPTION IN _handle_partial_json")
pass
return self._convert_with_instructions(result, model)
@@ -355,16 +382,21 @@ class Task(BaseModel):
def _convert_with_instructions(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel, str]:
print("CONVERT WITH INSTRUCTIONS - RESULT", result)
print("CONVERT WITH INSTRUCTIONS - model", model)
llm = self.agent.function_calling_llm or self.agent.llm
instructions = self._get_conversion_instructions(model, llm)
converter = Converter(
llm=llm, text=result, model=model, instructions=instructions
)
print("CONVERTER", converter)
exported_result = (
converter.to_pydantic() if self.output_pydantic else converter.to_json()
)
print("EXPORTED RESULT IN CONVERT WITH INSTRUCTIONS", exported_result)
if isinstance(exported_result, ConverterError):
Printer().print(
content=f"{exported_result.message} Using raw output instead.",
@@ -402,6 +434,7 @@ class Task(BaseModel):
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
def _save_file(self, result: Any) -> None:
print("SAVING FILE with content", result)
directory = os.path.dirname(self.output_file) # type: ignore # Value of type variable "AnyOrLiteralStr" of "dirname" cannot be "str | None"
if directory and not os.path.exists(directory):

View File

@@ -1,3 +1,4 @@
import json
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field, model_validator
@@ -11,12 +12,14 @@ class TaskOutput(BaseModel):
description: str = Field(description="Description of the task")
summary: Optional[str] = Field(description="Summary of the task", default=None)
raw: str = Field(
description="Result of the task"
description="Raw output of the task", default=""
) # TODO: @joao: breaking change, by renaming raw_output to raw, but now consistent with CrewOutput
pydantic: Optional[BaseModel] = Field(
description="Pydantic model output", default=None
description="Pydantic output of task", default=None
)
json_dict: Optional[Dict[str, Any]] = Field(
description="JSON dictionary of task", default=None
)
json: Optional[Dict[str, Any]] = Field(description="JSON output", default=None)
agent: str = Field(description="Agent that executed the task")
output_format: OutputFormat = Field(
description="Output format of the task", default=OutputFormat.RAW
@@ -29,11 +32,40 @@ class TaskOutput(BaseModel):
self.summary = f"{excerpt}..."
return self
# TODO: Joao - Adding this safety check breaks when people want to see
# the full output of a TaskOutput or CrewOutput.
# @property
# def pydantic(self) -> Optional[BaseModel]:
# # Check if the final task output included a pydantic model
# if self.output_format != OutputFormat.PYDANTIC:
# raise ValueError(
# """
# Invalid output format requested.
# If you would like to access the pydantic model,
# please make sure to set the output_pydantic property for the task.
# """
# )
# return self._pydantic
@property
def json(self) -> Optional[str]:
if self.output_format != OutputFormat.JSON:
raise ValueError(
"""
Invalid output format requested.
If you would like to access the JSON output,
please make sure to set the output_json property for the task
"""
)
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary."""
output_dict = {}
if self.json:
output_dict.update(self.json)
if self.json_dict:
output_dict.update(self.json_dict)
if self.pydantic:
output_dict.update(self.pydantic.model_dump())
return output_dict
@@ -41,6 +73,6 @@ class TaskOutput(BaseModel):
def __str__(self) -> str:
if self.pydantic:
return str(self.pydantic)
if self.json:
return str(self.json)
if self.json_dict:
return str(self.json_dict)
return self.raw