Files
crewAI/src/crewai/task.py
2024-02-24 03:11:41 -03:00

217 lines
7.1 KiB
Python

import threading
import uuid
from typing import Any, List, Optional, Type
from langchain_openai import ChatOpenAI
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError
from crewai.agent import Agent
from crewai.tasks.task_output import TaskOutput
from crewai.utilities import I18N, Converter, ConverterError, Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
class Task(BaseModel):
"""Class that represent a task to be executed."""
class Config:
arbitrary_types_allowed = True
__hash__ = object.__hash__ # type: ignore
used_tools: int = 0
i18n: I18N = I18N()
thread: threading.Thread = None
description: str = Field(description="Description of the actual task.")
callback: Optional[Any] = Field(
description="Callback to be executed after the task is completed.", default=None
)
agent: Optional[Agent] = Field(
description="Agent responsible for execution the task.", default=None
)
expected_output: Optional[str] = Field(
description="Clear definition of expected output for the task.",
default=None,
)
context: Optional[List["Task"]] = Field(
description="Other tasks that will have their output used as context for this task.",
default=None,
)
async_execution: Optional[bool] = Field(
description="Whether the task should be executed asynchronously or not.",
default=False,
)
output_json: Optional[Type[BaseModel]] = Field(
description="A Pydantic model to be used to create a JSON output.",
default=None,
)
output_pydantic: Optional[Type[BaseModel]] = Field(
description="A Pydantic model to be used to create a Pydantic output.",
default=None,
)
output_file: Optional[str] = Field(
description="A file path to be used to create a file output.",
default=None,
)
output: Optional[TaskOutput] = Field(
description="Task output, it's final result after being executed", default=None
)
tools: Optional[List[Any]] = Field(
default_factory=list,
description="Tools the agent is limited to use for this task.",
)
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
description="Unique identifier for the object, not set by user.",
)
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
)
@model_validator(mode="after")
def check_tools(self):
"""Check if the tools are set."""
if not self.tools and self.agent and self.agent.tools:
self.tools.extend(self.agent.tools)
return self
@model_validator(mode="after")
def check_output(self):
"""Check if an output type is set."""
output_types = [self.output_json, self.output_pydantic]
if len([type for type in output_types if type]) > 1:
raise PydanticCustomError(
"output_type",
"Only one output type can be set, either output_pydantic or output_json.",
{},
)
return self
def execute(
self,
agent: Agent | None = None,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> str:
"""Execute the task.
Returns:
Output of the task.
"""
agent = agent or self.agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
if self.context:
context = []
for task in self.context:
if task.async_execution:
task.thread.join()
if task and task.output:
context.append(task.output.raw_output)
context = "\n".join(context)
tools = tools or self.tools
if self.async_execution:
self.thread = threading.Thread(
target=self._execute, args=(agent, self, context, tools)
)
self.thread.start()
else:
result = self._execute(
task=self,
agent=agent,
context=context,
tools=tools,
)
return result
def _execute(self, agent, task, context, tools):
result = agent.execute_task(
task=task,
context=context,
tools=tools,
)
exported_output = self._export_output(result)
self.output = TaskOutput(
description=self.description,
exported_output=exported_output,
raw_output=result,
)
if self.callback:
self.callback(self.output)
return exported_output
def prompt(self) -> str:
"""Prompt the task.
Returns:
Prompt of the task.
"""
tasks_slices = [self.description]
if self.expected_output:
output = self.i18n.slice("expected_output").format(
expected_output=self.expected_output
)
tasks_slices = [self.description, output]
return "\n".join(tasks_slices)
def _export_output(self, result: str) -> Any:
exported_result = result
instructions = "I'm gonna convert this raw text into valid JSON."
if self.output_pydantic or self.output_json:
model = self.output_pydantic or self.output_json
llm = self.agent.function_calling_llm or self.agent.llm
if not self._is_gpt(llm):
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
converter = Converter(
llm=llm, text=result, model=model, instructions=instructions
)
if self.output_pydantic:
exported_result = converter.to_pydantic()
elif self.output_json:
exported_result = converter.to_json()
if isinstance(exported_result, ConverterError):
Printer().print(
content=f"{exported_result.message} Using raw output instead.",
color="red",
)
exported_result = result
if self.output_file:
content = (
exported_result if not self.output_pydantic else exported_result.json()
)
self._save_file(content)
return exported_result
def _is_gpt(self, llm) -> bool:
return isinstance(llm, ChatOpenAI) and llm.openai_api_base == None
def _save_file(self, result: Any) -> None:
with open(self.output_file, "w") as file:
file.write(result)
return None