Files
crewAI/src/crewai/tasks/task_output.py
Devin AI 5d3c34b3ea fix: Improve type annotations across multiple files
- Replace Optional[set[str]] with Union[set[str], None] in json methods
- Fix add_nodes_to_network call parameters in flow_visualizer
- Add __base__=BaseModel to create_model call in structured_tool
- Clean up imports in provider.py

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-01-01 21:29:15 +00:00

76 lines
2.6 KiB
Python

import json
from typing import Any, Callable, Dict, Optional, Union
from pydantic import BaseModel, Field, model_validator
from crewai.tasks.output_format import OutputFormat
class TaskOutput(BaseModel):
    """Result produced by a task.

    Carries the raw text output, optional structured forms of that output
    (a pydantic model and/or a JSON dictionary), and descriptive metadata
    about the task and the agent that executed it.
    """

    description: str = Field(description="Description of the task")
    name: Optional[str] = Field(default=None, description="Name of the task")
    expected_output: Optional[str] = Field(
        default=None, description="Expected output of the task"
    )
    summary: Optional[str] = Field(default=None, description="Summary of the task")
    raw: str = Field(default="", description="Raw output of the task")
    pydantic: Optional[BaseModel] = Field(
        default=None, description="Pydantic output of task"
    )
    json_dict: Optional[Dict[str, Any]] = Field(
        default=None, description="JSON dictionary of task"
    )
    agent: str = Field(description="Agent that executed the task")
    output_format: OutputFormat = Field(
        default=OutputFormat.RAW, description="Output format of the task"
    )

    @model_validator(mode="after")
    def set_summary(self):
        """Populate ``summary`` from the start of ``description``.

        The description is split on single spaces, the first ten pieces are
        re-joined with spaces, and ``"..."`` is appended unconditionally.
        Any previously supplied ``summary`` value is overwritten.
        """
        leading_pieces = self.description.split(" ")[:10]
        self.summary = " ".join(leading_pieces) + "..."
        return self

    def json(
        self,
        *,
        include: Union[set[str], None] = None,
        exclude: Union[set[str], None] = None,
        by_alias: bool = False,
        exclude_unset: bool = False,
        exclude_defaults: bool = False,
        exclude_none: bool = False,
        encoder: Optional[Callable[[Any], Any]] = None,
        models_as_dict: bool = True,
        **dumps_kwargs: Any,
    ) -> str:
        """Serialize ``json_dict`` to a JSON string.

        Only ``encoder`` (forwarded to ``json.dumps`` as ``default``) and
        ``dumps_kwargs`` (forwarded verbatim) influence the result. The other
        keyword-only parameters are accepted but not read by this method
        (presumably kept for signature compatibility with the pydantic base
        class -- confirm).

        Raises:
            ValueError: If ``output_format`` is not ``OutputFormat.JSON``.
        """
        if self.output_format != OutputFormat.JSON:
            # The message intentionally keeps its leading and trailing newline.
            wrong_format_message = (
                "\n"
                "Invalid output format requested.\n"
                "If you would like to access the JSON output,\n"
                "please make sure to set the output_json property for the task\n"
            )
            raise ValueError(wrong_format_message)

        return json.dumps(self.json_dict, default=encoder, **dumps_kwargs)

    def to_dict(self) -> Dict[str, Any]:
        """Return the structured output as a new dictionary.

        A truthy ``json_dict`` takes precedence; otherwise a truthy
        ``pydantic`` model is dumped via ``model_dump()``. When neither is
        truthy an empty dictionary is returned.
        """
        if self.json_dict:
            return {**self.json_dict}
        if self.pydantic:
            return {**self.pydantic.model_dump()}
        return {}

    def __str__(self) -> str:
        """Render the first truthy of ``pydantic``, ``json_dict``; else ``raw``."""
        for structured in (self.pydantic, self.json_dict):
            if structured:
                return str(structured)
        return self.raw