mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-11 09:08:31 +00:00
This commit adds a conditional check to the `_save_file` method of the `Task` class so that the output file's directory is created only when it does not already exist. This prevents saving results to a file from failing when the target directory is missing.
296 lines
10 KiB
Python
296 lines
10 KiB
Python
import threading
|
|
import uuid
|
|
from typing import Any, Dict, List, Optional, Type
|
|
import os
|
|
|
|
from langchain_openai import ChatOpenAI
|
|
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
|
|
from pydantic_core import PydanticCustomError
|
|
|
|
from crewai.agent import Agent
|
|
from crewai.tasks.task_output import TaskOutput
|
|
from crewai.utilities import I18N, Converter, ConverterError, Printer
|
|
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
|
|
|
|
|
|
class Task(BaseModel):
    """Class that represents a task to be executed.

    Each task must have a description, an expected output and an agent responsible for execution.

    Attributes:
        agent: Agent responsible for task execution. Represents entity performing task.
        async_execution: Boolean flag indicating asynchronous task execution.
        callback: Function/object executed post task completion for additional actions.
        config: Dictionary containing task-specific configuration parameters.
        context: List of Task instances providing task context or input data.
        description: Descriptive text detailing task's purpose and execution.
        expected_output: Clear definition of expected task outcome.
        output_file: File path for storing task output.
        output_json: Pydantic model for structuring JSON output.
        output_pydantic: Pydantic model for task output.
        tools: List of tools/resources limited for task execution.
        output: TaskOutput stored by `_execute` once the task has run.
        human_input: Whether a human should review the agent's final answer.
        id: Auto-generated unique identifier; may not be supplied by the user.
    """

    class Config:
        arbitrary_types_allowed = True

    __hash__ = object.__hash__  # type: ignore
    used_tools: int = 0
    tools_errors: int = 0
    delegations: int = 0
    i18n: I18N = I18N()
    # Only populated by `execute` when `async_execution` is set.
    thread: Optional[threading.Thread] = None
    prompt_context: Optional[str] = None
    description: str = Field(description="Description of the actual task.")
    expected_output: str = Field(
        description="Clear definition of expected output for the task."
    )
    config: Optional[Dict[str, Any]] = Field(
        description="Configuration for the agent",
        default=None,
    )
    callback: Optional[Any] = Field(
        description="Callback to be executed after the task is completed.", default=None
    )
    agent: Optional[Agent] = Field(
        description="Agent responsible for execution the task.", default=None
    )
    context: Optional[List["Task"]] = Field(
        description="Other tasks that will have their output used as context for this task.",
        default=None,
    )
    async_execution: Optional[bool] = Field(
        description="Whether the task should be executed asynchronously or not.",
        default=False,
    )
    output_json: Optional[Type[BaseModel]] = Field(
        description="A Pydantic model to be used to create a JSON output.",
        default=None,
    )
    output_pydantic: Optional[Type[BaseModel]] = Field(
        description="A Pydantic model to be used to create a Pydantic output.",
        default=None,
    )
    output_file: Optional[str] = Field(
        description="A file path to be used to create a file output.",
        default=None,
    )
    output: Optional[TaskOutput] = Field(
        description="Task output, it's final result after being executed", default=None
    )
    tools: Optional[List[Any]] = Field(
        default_factory=list,
        description="Tools the agent is limited to use for this task.",
    )
    id: UUID4 = Field(
        default_factory=uuid.uuid4,
        frozen=True,
        description="Unique identifier for the object, not set by user.",
    )
    human_input: Optional[bool] = Field(
        description="Whether the task should have a human review the final answer of the agent",
        default=False,
    )

    # Untouched templates, captured on the first `interpolate_inputs` call so
    # that later calls always format from the original text.
    _original_description: str | None = None
    _original_expected_output: str | None = None

    def __init__(__pydantic_self__, **data):
        """Create the task, spreading an optional ``config`` mapping into the field values.

        The entries of ``config`` are passed to the model constructor as keyword
        arguments next to ``data``; a key present in both raises ``TypeError``.
        """
        # `None` is the declared default of the `config` field, so an explicit
        # `config=None` must behave like "not given" instead of failing on `**None`.
        config = data.pop("config", None) or {}
        super().__init__(**config, **data)

    @field_validator("id", mode="before")
    @classmethod
    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
        """Reject any user-supplied value for ``id``.

        Raises:
            PydanticCustomError: If a truthy ``id`` value is passed in.
        """
        if v:
            raise PydanticCustomError(
                "may_not_set_field", "This field is not to be set by the user.", {}
            )

    @model_validator(mode="after")
    def set_attributes_based_on_config(self) -> "Task":
        """Set attributes based on the agent configuration."""
        if self.config:
            for key, value in self.config.items():
                setattr(self, key, value)
        return self

    @model_validator(mode="after")
    def check_tools(self):
        """Default the task's tools to the agent's tools when none were given."""
        if not self.tools and self.agent and self.agent.tools:
            # `tools` is Optional, so an explicit `tools=None` has to be
            # replaced by a list before it can be extended.
            if self.tools is None:
                self.tools = []
            self.tools.extend(self.agent.tools)
        return self

    @model_validator(mode="after")
    def check_output(self):
        """Ensure at most one structured output type is configured.

        Raises:
            PydanticCustomError: If both ``output_json`` and ``output_pydantic`` are set.
        """
        if self.output_json and self.output_pydantic:
            raise PydanticCustomError(
                "output_type",
                "Only one output type can be set, either output_pydantic or output_json.",
                {},
            )
        return self

    def execute(
        self,
        agent: Agent | None = None,
        context: Optional[str] = None,
        tools: Optional[List[Any]] = None,
    ) -> str:
        """Execute the task.

        Args:
            agent: Agent to run the task with; falls back to ``self.agent``.
            context: Context text handed to the agent. It is replaced by the
                newline-joined raw outputs of the ``self.context`` tasks when
                those are set.
            tools: Tools for this run; falls back to ``self.tools``.

        Returns:
            Output of the task. When ``async_execution`` is set, the work is
            started on ``self.thread`` and nothing is returned here; ``_execute``
            stores the result on ``self.output``.

        Raises:
            Exception: If neither ``agent`` nor ``self.agent`` is available.
        """
        agent = agent or self.agent
        if not agent:
            raise Exception(
                f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
            )

        if self.context:
            context_outputs = []
            for task in self.context:
                # Wait for asynchronous context tasks; `thread` stays None
                # until such a task has actually been started.
                if task.async_execution and task.thread:
                    task.thread.join()
                if task.output:
                    context_outputs.append(task.output.raw_output)
            context = "\n".join(context_outputs)

        self.prompt_context = context
        tools = tools or self.tools

        if self.async_execution:
            self.thread = threading.Thread(
                target=self._execute, args=(agent, self, context, tools)
            )
            self.thread.start()
        else:
            result = self._execute(
                task=self,
                agent=agent,
                context=context,
                tools=tools,
            )
            return result

    def _execute(self, agent, task, context, tools):
        """Run the agent, record the result on ``self.output`` and fire the callback.

        Returns:
            The exported output produced by ``_export_output``.
        """
        result = agent.execute_task(
            task=task,
            context=context,
            tools=tools,
        )

        exported_output = self._export_output(result)

        self.output = TaskOutput(
            description=self.description,
            exported_output=exported_output,
            raw_output=result,
        )

        if self.callback:
            self.callback(self.output)

        return exported_output

    def prompt(self) -> str:
        """Prompt the task.

        Returns:
            Prompt of the task.
        """
        output = self.i18n.slice("expected_output").format(
            expected_output=self.expected_output
        )
        tasks_slices = [self.description, output]
        return "\n".join(tasks_slices)

    def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
        """Interpolate inputs into the task description and expected output."""
        if self._original_description is None:
            self._original_description = self.description
        if self._original_expected_output is None:
            self._original_expected_output = self.expected_output

        if inputs:
            self.description = self._original_description.format(**inputs)
            self.expected_output = self._original_expected_output.format(**inputs)

    def increment_tools_errors(self) -> None:
        """Increment the tools errors counter."""
        self.tools_errors += 1

    def increment_delegations(self) -> None:
        """Increment the delegations counter."""
        self.delegations += 1

    def _export_output(self, result: str) -> Any:
        """Convert the raw agent result into the configured output type.

        With ``output_pydantic`` or ``output_json`` set, the raw text is first
        validated directly against the model; if that fails, a ``Converter``
        backed by the agent's LLM is used. When the converter fails as well, the
        raw text is used instead. If ``output_file`` is set, the content is
        written through ``_save_file``.

        Args:
            result: Raw text produced by the agent.

        Returns:
            The model instance, its dumped dict, the converter's JSON result, or
            the raw text, depending on configuration and conversion success.
        """
        exported_result = result
        instructions = "I'm gonna convert this raw text into valid JSON."

        if self.output_pydantic or self.output_json:
            model = self.output_pydantic or self.output_json

            # try to convert task_output directly to pydantic/json
            # NOTE(review): both returns below leave before the `output_file`
            # write further down - confirm whether that is intentional.
            try:
                exported_result = model.model_validate_json(result)
                if self.output_json:
                    return exported_result.model_dump()
                return exported_result
            except Exception:
                # Deliberate best effort: fall through to the LLM converter.
                pass

            llm = self.agent.function_calling_llm or self.agent.llm

            if not self._is_gpt(llm):
                model_schema = PydanticSchemaParser(model=model).get_schema()
                instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"

            converter = Converter(
                llm=llm, text=result, model=model, instructions=instructions
            )

            if self.output_pydantic:
                exported_result = converter.to_pydantic()
            elif self.output_json:
                exported_result = converter.to_json()

            if isinstance(exported_result, ConverterError):
                Printer().print(
                    content=f"{exported_result.message} Using raw output instead.",
                    color="red",
                )
                exported_result = result

        if self.output_file:
            # Serialise only real model instances: after a failed conversion
            # `exported_result` is the raw string, which has no JSON dump method.
            if isinstance(exported_result, BaseModel):
                content = exported_result.model_dump_json()
            else:
                content = exported_result
            self._save_file(content)

        return exported_result

    def _is_gpt(self, llm) -> bool:
        """Return True when ``llm`` is a ``ChatOpenAI`` without a custom API base."""
        return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None

    def _save_file(self, result: Any) -> None:
        """Write ``result`` to ``self.output_file``, creating its directory if needed.

        Args:
            result: Text content to write.
        """
        directory = os.path.dirname(self.output_file)

        # `dirname` is "" for a bare file name such as "report.txt", and
        # `os.makedirs("")` raises, so only create a directory when one is named.
        # `exist_ok` also tolerates another writer creating it concurrently.
        if directory:
            os.makedirs(directory, exist_ok=True)

        # Explicit encoding so non-ASCII output does not depend on the locale.
        with open(self.output_file, "w", encoding="utf-8") as file:
            file.write(result)
        return None

    def __repr__(self):
        """Return a short representation showing description and expected output."""
        return f"Task(description={self.description}, expected_output={self.expected_output})"
|