Adding support to export tasks as json, pydantic objects, and save as file

This commit is contained in:
João Moura
2024-02-19 22:46:34 -03:00
parent 3cfc8dd4e0
commit a649eb8555
3 changed files with 70 additions and 9 deletions

View File

@@ -41,7 +41,6 @@ class Crew(BaseModel):
full_output: Whether the crew should return the full output with all tasks outputs or just the final output.
step_callback: Callback to be executed after each step for every agent's execution.
share_crew: Whether you want to share the complete crew information and execution with crewAI to make the library better, and allow us to train models.
_cache_handler: Handles caching for the crew's operations.
"""
__hash__ = object.__hash__ # type: ignore

View File

@@ -1,13 +1,13 @@
import threading
import uuid
from typing import Any, List, Optional
from typing import Any, List, Optional, Type
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError
from crewai.agent import Agent
from crewai.tasks.task_output import TaskOutput
from crewai.utilities import I18N
from crewai.utilities import I18N, Instructor
class Task(BaseModel):
@@ -39,10 +39,22 @@ class Task(BaseModel):
description="Whether the task should be executed asynchronously or not.",
default=False,
)
output_json: Optional[Type[BaseModel]] = Field(
description="A Pydantic model to be used to create a JSON output.",
default=None,
)
output_pydantic: Optional[Type[BaseModel]] = Field(
description="A Pydantic model to be used to create a Pydantic output.",
default=None,
)
output_file: Optional[str] = Field(
description="A file path to be used to create a file output.",
default=None,
)
output: Optional[TaskOutput] = Field(
description="Task output, it's final result after being executed", default=None
)
tools: List[Any] = Field(
tools: Optional[List[Any]] = Field(
default_factory=list,
description="Tools the agent is limited to use for this task.",
)
@@ -67,6 +79,18 @@ class Task(BaseModel):
self.tools.extend(self.agent.tools)
return self
@model_validator(mode="after")
def check_output(self):
    """Validate that at most one structured output type is configured.

    `output_json` and `output_pydantic` are mutually exclusive; raises a
    PydanticCustomError when both are set.
    """
    configured = [opt for opt in (self.output_json, self.output_pydantic) if opt]
    if len(configured) > 1:
        raise PydanticCustomError(
            "output_type",
            "Only one output type can be set, either output_pydantic or output_json.",
            {},
        )
    return self
def execute(
self,
agent: Agent | None = None,
@@ -115,9 +139,19 @@ class Task(BaseModel):
context=context,
tools=tools,
)
self.output = TaskOutput(description=self.description, result=result)
self.callback(self.output) if self.callback else None
return result
exported_output = self._export_output(result)
self.output = TaskOutput(
description=self.description,
exported_output=exported_output,
raw_output=result,
)
if self.callback:
self.callback(self.output)
return exported_output
def prompt(self) -> str:
"""Prompt the task.
@@ -133,3 +167,28 @@ class Task(BaseModel):
)
tasks_slices = [self.description, output]
return "\n".join(tasks_slices)
def _export_output(self, result: str) -> Any:
if self.output_pydantic or self.output_json:
model = self.output_pydantic or self.output_json
instructor = Instructor(
agent=self.agent,
content=result,
model=model,
)
if self.output_pydantic:
result = instructor.to_pydantic()
elif self.output_json:
result = instructor.to_json()
if self.output_file:
content = result if not self.output_pydantic else result.json()
self._save_file(content)
return result
def _save_file(self, result: Any) -> None:
with open(self.output_file, "w") as file:
file.write(result)
return None

View File

@@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, Union
from pydantic import BaseModel, Field, model_validator
@@ -8,7 +8,10 @@ class TaskOutput(BaseModel):
description: str = Field(description="Description of the task")
summary: Optional[str] = Field(description="Summary of the task", default=None)
result: str = Field(description="Result of the task")
exported_output: Union[str, BaseModel] = Field(
description="Output of the task", default=None
)
raw_output: str = Field(description="Result of the task")
@model_validator(mode="after")
def set_summary(self):