Files
crewAI/src/crewai/task.py
Lorenze Jay 7addda9398
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Lorenze/better tracing events (#3382)
* feat: implement tool usage limit exception handling

- Introduced `ToolUsageLimitExceeded` exception to manage maximum usage limits for tools.
- Enhanced `CrewStructuredTool` to check and raise this exception when the usage limit is reached.
- Updated `_run` and `_execute` methods to include usage limit checks and handle exceptions appropriately, improving reliability and user feedback.

* feat: enhance PlusAPI and ToolUsage with task metadata

- Removed the `send_trace_batch` method from PlusAPI to streamline the API.
- Added timeout parameters to trace event methods in PlusAPI for improved reliability.
- Updated ToolUsage to include task metadata (task name and ID) in event emissions, enhancing traceability and context during tool usage.
- Refactored event handling in LLM and ToolUsage events to ensure task information is consistently captured.

* feat: enhance memory and event handling with task and agent metadata

- Added task and agent metadata to various memory and event classes, improving traceability and context during memory operations.
- Updated the `ContextualMemory` and `Memory` classes to associate tasks and agents, allowing for better context management.
- Enhanced event emissions in `LLM`, `ToolUsage`, and memory events to include task and agent information, facilitating improved debugging and monitoring.
- Refactored event handling to ensure consistent capture of task and agent details across the system.

* drop

* refactor: clean up unused imports in memory and event modules

- Removed unused TYPE_CHECKING imports from long_term_memory.py to streamline the code.
- Eliminated unnecessary import from memory_events.py, enhancing clarity and maintainability.

* fix memory tests

* fix task_completed payload

* fix: remove unused test agent variable in external memory tests

* refactor: remove unused agent parameter from Memory class save method

- Eliminated the agent parameter from the save method in the Memory class to streamline the code and improve clarity.
- Updated the TraceBatchManager class by moving initialization of attributes into the constructor for better organization and readability.

* refactor: enhance ExecutionState and ReasoningEvent classes with optional task and agent identifiers

- Added optional `current_agent_id` and `current_task_id` attributes to the `ExecutionState` class for better tracking of agent and task states.
- Updated the `from_task` attribute in the `ReasoningEvent` class to use `Optional[Any]` instead of a specific type, improving flexibility in event handling.

* refactor: update ExecutionState class by removing unused agent and task identifiers

- Removed the `current_agent_id` and `current_task_id` attributes from the `ExecutionState` class to simplify the code and enhance clarity.
- Adjusted the import statements to include `Optional` for better type handling.

* refactor: streamline LLM event handling in LiteAgent

- Removed unused LLM event emissions (LLMCallStartedEvent, LLMCallCompletedEvent, LLMCallFailedEvent) from the LiteAgent class to simplify the code and improve performance.
- Adjusted the flow of LLM response handling by eliminating unnecessary event bus interactions, enhancing clarity and maintainability.

* flow ownership and not emitting events when a crew is done

* refactor: remove unused agent parameter from ShortTermMemory save method

- Eliminated the agent parameter from the save method in the ShortTermMemory class to streamline the code and improve clarity.
- This change enhances the maintainability of the memory management system by reducing unnecessary complexity.

* runtype check fix

* fixing tests

* fix lints

* fix: update event assertions in test_llm_emits_event_with_lite_agent

- Adjusted the expected counts for completed and started events in the test to reflect the correct behavior of the LiteAgent.
- Updated assertions for agent roles and IDs to match the expected values after recent changes in event handling.

* fix: update task name assertions in event tests

- Modified assertions in `test_stream_llm_emits_event_with_task_and_agent_info` and `test_llm_emits_event_with_task_and_agent_info` to use `task.description` as a fallback for `task.name`. This ensures that the tests correctly validate the task name even when it is not explicitly set.

* fix: update test assertions for output values and improve readability

- Updated assertions in `test_output_json_dict_hierarchical` to reflect the correct expected score value.
- Enhanced readability of assertions in `test_output_pydantic_to_another_task` and `test_key` by formatting the error messages for clarity.
- These changes ensure that the tests accurately validate the expected outputs and improve overall code quality.

* test fixes

* fix crew_test

* added another fixture

* fix: ensure agent and task assignments in contextual memory are conditional

- Updated the ContextualMemory class to check for the existence of short-term, long-term, external, and extended memory before assigning agent and task attributes. This prevents potential attribute errors when memory types are not initialized.
2025-08-26 09:09:46 -07:00

813 lines
30 KiB
Python

import datetime
import inspect
import json
import logging
import threading
import uuid
from concurrent.futures import Future
from copy import copy
from hashlib import md5
from pathlib import Path
from typing import (
Any,
Callable,
ClassVar,
Dict,
List,
Optional,
Set,
Tuple,
Type,
Union,
get_args,
get_origin,
)
from pydantic import (
UUID4,
BaseModel,
Field,
PrivateAttr,
field_validator,
model_validator,
)
from pydantic_core import PydanticCustomError
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.security import Fingerprint, SecurityConfig
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified
from crewai.utilities.guardrail import process_guardrail, GuardrailResult
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import interpolate_only
class Task(BaseModel):
"""Class that represents a task to be executed.
Each task must have a description, an expected output and an agent responsible for execution.
Attributes:
agent: Agent responsible for task execution. Represents entity performing task.
async_execution: Boolean flag indicating asynchronous task execution.
callback: Function/object executed post task completion for additional actions.
config: Dictionary containing task-specific configuration parameters.
context: List of Task instances providing task context or input data.
description: Descriptive text detailing task's purpose and execution.
expected_output: Clear definition of expected task outcome.
output_file: File path for storing task output.
create_directory: Whether to create the directory for output_file if it doesn't exist.
output_json: Pydantic model for structuring JSON output.
output_pydantic: Pydantic model for task output.
security_config: Security configuration including fingerprinting.
tools: List of tools/resources limited for task execution.
allow_crewai_trigger_context: Optional flag to control crewai_trigger_payload injection.
None (default): Auto-inject for first task only.
True: Always inject trigger payload for this task.
False: Never inject trigger payload, even for first task.
"""
__hash__ = object.__hash__ # type: ignore
logger: ClassVar[logging.Logger] = logging.getLogger(__name__)
used_tools: int = 0
tools_errors: int = 0
delegations: int = 0
i18n: I18N = I18N()
name: Optional[str] = Field(default=None)
prompt_context: Optional[str] = None
description: str = Field(description="Description of the actual task.")
expected_output: str = Field(
description="Clear definition of expected output for the task."
)
config: Optional[Dict[str, Any]] = Field(
description="Configuration for the agent",
default=None,
)
callback: Optional[Any] = Field(
description="Callback to be executed after the task is completed.", default=None
)
agent: Optional[BaseAgent] = Field(
description="Agent responsible for execution the task.", default=None
)
context: Union[List["Task"], None, _NotSpecified] = Field(
description="Other tasks that will have their output used as context for this task.",
default=NOT_SPECIFIED,
)
async_execution: Optional[bool] = Field(
description="Whether the task should be executed asynchronously or not.",
default=False,
)
output_json: Optional[Type[BaseModel]] = Field(
description="A Pydantic model to be used to create a JSON output.",
default=None,
)
output_pydantic: Optional[Type[BaseModel]] = Field(
description="A Pydantic model to be used to create a Pydantic output.",
default=None,
)
output_file: Optional[str] = Field(
description="A file path to be used to create a file output.",
default=None,
)
create_directory: Optional[bool] = Field(
description="Whether to create the directory for output_file if it doesn't exist.",
default=True,
)
output: Optional[TaskOutput] = Field(
description="Task output, it's final result after being executed", default=None
)
tools: Optional[List[BaseTool]] = Field(
default_factory=list,
description="Tools the agent is limited to use for this task.",
)
security_config: SecurityConfig = Field(
default_factory=SecurityConfig,
description="Security configuration for the task.",
)
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
description="Unique identifier for the object, not set by user.",
)
human_input: Optional[bool] = Field(
description="Whether the task should have a human review the final answer of the agent",
default=False,
)
markdown: Optional[bool] = Field(
description="Whether the task should instruct the agent to return the final answer formatted in Markdown",
default=False,
)
converter_cls: Optional[Type[Converter]] = Field(
description="A converter class used to export structured output",
default=None,
)
processed_by_agents: Set[str] = Field(default_factory=set)
guardrail: Optional[Union[Callable[[TaskOutput], Tuple[bool, Any]], str]] = Field(
default=None,
description="Function or string description of a guardrail to validate task output before proceeding to next task",
)
max_retries: int = Field(
default=3, description="Maximum number of retries when guardrail fails"
)
retry_count: int = Field(default=0, description="Current number of retries")
start_time: Optional[datetime.datetime] = Field(
default=None, description="Start time of the task execution"
)
end_time: Optional[datetime.datetime] = Field(
default=None, description="End time of the task execution"
)
allow_crewai_trigger_context: Optional[bool] = Field(
default=None,
description="Whether this task should append 'Trigger Payload: {crewai_trigger_payload}' to the task description when crewai_trigger_payload exists in crew inputs.",
)
model_config = {"arbitrary_types_allowed": True}
@field_validator("guardrail")
@classmethod
def validate_guardrail_function(
cls, v: Optional[str | Callable]
) -> Optional[str | Callable]:
"""
If v is a callable, validate that the guardrail function has the correct signature and behavior.
If v is a string, return it as is.
While type hints provide static checking, this validator ensures runtime safety by:
1. Verifying the function accepts exactly one parameter (the TaskOutput)
2. Checking return type annotations match Tuple[bool, Any] if present
3. Providing clear, immediate error messages for debugging
This runtime validation is crucial because:
- Type hints are optional and can be ignored at runtime
- Function signatures need immediate validation before task execution
- Clear error messages help users debug guardrail implementation issues
Args:
v: The guardrail function to validate or a string describing the guardrail task
Returns:
The validated guardrail function or a string describing the guardrail task
Raises:
ValueError: If the function signature is invalid or return annotation
doesn't match Tuple[bool, Any]
"""
if v is not None and callable(v):
sig = inspect.signature(v)
positional_args = [
param
for param in sig.parameters.values()
if param.default is inspect.Parameter.empty
]
if len(positional_args) != 1:
raise ValueError("Guardrail function must accept exactly one parameter")
# Check return annotation if present, but don't require it
return_annotation = sig.return_annotation
if return_annotation != inspect.Signature.empty:
return_annotation_args = get_args(return_annotation)
if not (
get_origin(return_annotation) is tuple
and len(return_annotation_args) == 2
and return_annotation_args[0] is bool
and (
return_annotation_args[1] is Any
or return_annotation_args[1] is str
or return_annotation_args[1] is TaskOutput
or return_annotation_args[1] == Union[str, TaskOutput]
)
):
raise ValueError(
"If return type is annotated, it must be Tuple[bool, Any]"
)
return v
_guardrail: Optional[Callable] = PrivateAttr(default=None)
_original_description: Optional[str] = PrivateAttr(default=None)
_original_expected_output: Optional[str] = PrivateAttr(default=None)
_original_output_file: Optional[str] = PrivateAttr(default=None)
_thread: Optional[threading.Thread] = PrivateAttr(default=None)
@model_validator(mode="before")
@classmethod
def process_model_config(cls, values):
return process_config(values, cls)
@model_validator(mode="after")
def validate_required_fields(self):
required_fields = ["description", "expected_output"]
for field in required_fields:
if getattr(self, field) is None:
raise ValueError(
f"{field} must be provided either directly or through config"
)
return self
@model_validator(mode="after")
def ensure_guardrail_is_callable(self) -> "Task":
if callable(self.guardrail):
self._guardrail = self.guardrail
elif isinstance(self.guardrail, str):
from crewai.tasks.llm_guardrail import LLMGuardrail
assert self.agent is not None
self._guardrail = LLMGuardrail(
description=self.guardrail, llm=self.agent.llm
)
return self
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
)
@field_validator("output_file")
@classmethod
def output_file_validation(cls, value: Optional[str]) -> Optional[str]:
"""Validate the output file path.
Args:
value: The output file path to validate. Can be None or a string.
If the path contains template variables (e.g. {var}), leading slashes are preserved.
For regular paths, leading slashes are stripped.
Returns:
The validated and potentially modified path, or None if no path was provided.
Raises:
ValueError: If the path contains invalid characters, path traversal attempts,
or other security concerns.
"""
if value is None:
return None
# Basic security checks
if ".." in value:
raise ValueError(
"Path traversal attempts are not allowed in output_file paths"
)
# Check for shell expansion first
if value.startswith("~") or value.startswith("$"):
raise ValueError(
"Shell expansion characters are not allowed in output_file paths"
)
# Then check other shell special characters
if any(char in value for char in ["|", ">", "<", "&", ";"]):
raise ValueError(
"Shell special characters are not allowed in output_file paths"
)
# Don't strip leading slash if it's a template path with variables
if "{" in value or "}" in value:
# Validate template variable format
template_vars = [part.split("}")[0] for part in value.split("{")[1:]]
for var in template_vars:
if not var.isidentifier():
raise ValueError(f"Invalid template variable name: {var}")
return value
# Strip leading slash for regular paths
if value.startswith("/"):
return value[1:]
return value
@model_validator(mode="after")
def set_attributes_based_on_config(self) -> "Task":
"""Set attributes based on the agent configuration."""
if self.config:
for key, value in self.config.items():
setattr(self, key, value)
return self
@model_validator(mode="after")
def check_tools(self):
"""Check if the tools are set."""
if not self.tools and self.agent and self.agent.tools:
self.tools.extend(self.agent.tools)
return self
@model_validator(mode="after")
def check_output(self):
"""Check if an output type is set."""
output_types = [self.output_json, self.output_pydantic]
if len([type for type in output_types if type]) > 1:
raise PydanticCustomError(
"output_type",
"Only one output type can be set, either output_pydantic or output_json.",
{},
)
return self
def execute_sync(
self,
agent: Optional[BaseAgent] = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> TaskOutput:
"""Execute the task synchronously."""
return self._execute_core(agent, context, tools)
@property
def key(self) -> str:
description = self._original_description or self.description
expected_output = self._original_expected_output or self.expected_output
source = [description, expected_output]
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
@property
def execution_duration(self) -> float | None:
if not self.start_time or not self.end_time:
return None
return (self.end_time - self.start_time).total_seconds()
def execute_async(
self,
agent: BaseAgent | None = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> Future[TaskOutput]:
"""Execute the task asynchronously."""
future: Future[TaskOutput] = Future()
threading.Thread(
daemon=True,
target=self._execute_task_async,
args=(agent, context, tools, future),
).start()
return future
def _execute_task_async(
self,
agent: Optional[BaseAgent],
context: Optional[str],
tools: Optional[List[Any]],
future: Future[TaskOutput],
) -> None:
"""Execute the task asynchronously with context handling."""
result = self._execute_core(agent, context, tools)
future.set_result(result)
def _execute_core(
self,
agent: Optional[BaseAgent],
context: Optional[str],
tools: Optional[List[Any]],
) -> TaskOutput:
"""Run the core execution logic of the task."""
try:
agent = agent or self.agent
self.agent = agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
self.start_time = datetime.datetime.now()
self.prompt_context = context
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self))
result = agent.execute_task(
task=self,
context=context,
tools=tools,
)
pydantic_output, json_output = self._export_output(result)
task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
expected_output=self.expected_output,
raw=result,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
)
if self._guardrail:
guardrail_result = process_guardrail(
output=task_output,
guardrail=self._guardrail,
retry_count=self.retry_count,
)
if not guardrail_result.success:
if self.retry_count >= self.max_retries:
raise Exception(
f"Task failed guardrail validation after {self.max_retries} retries. "
f"Last error: {guardrail_result.error}"
)
self.retry_count += 1
context = self.i18n.errors("validation_error").format(
guardrail_result_error=guardrail_result.error,
task_output=task_output.raw,
)
printer = Printer()
printer.print(
content=f"Guardrail blocked, retrying, due to: {guardrail_result.error}\n",
color="yellow",
)
return self._execute_core(agent, context, tools)
if guardrail_result.result is None:
raise Exception(
"Task guardrail returned None as result. This is not allowed."
)
if isinstance(guardrail_result.result, str):
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(
guardrail_result.result
)
task_output.pydantic = pydantic_output
task_output.json_dict = json_output
elif isinstance(guardrail_result.result, TaskOutput):
task_output = guardrail_result.result
self.output = task_output
self.end_time = datetime.datetime.now()
if self.callback:
self.callback(self.output)
crew = self.agent.crew # type: ignore[union-attr]
if crew and crew.task_callback and crew.task_callback != self.callback:
crew.task_callback(self.output)
if self.output_file:
content = (
json_output
if json_output
else (
pydantic_output.model_dump_json() if pydantic_output else result
)
)
self._save_file(content)
crewai_event_bus.emit(
self, TaskCompletedEvent(output=task_output, task=self)
)
return task_output
except Exception as e:
self.end_time = datetime.datetime.now()
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self))
raise e # Re-raise the exception after emitting the event
def _process_guardrail(self, task_output: TaskOutput) -> GuardrailResult:
assert self._guardrail is not None
from crewai.utilities.events import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
crewai_event_bus.emit(
self,
LLMGuardrailStartedEvent(
guardrail=self._guardrail, retry_count=self.retry_count
),
)
try:
result = self._guardrail(task_output)
guardrail_result = GuardrailResult.from_tuple(result)
except Exception as e:
guardrail_result = GuardrailResult(
success=False, result=None, error=f"Guardrail execution error: {str(e)}"
)
crewai_event_bus.emit(
self,
LLMGuardrailCompletedEvent(
success=guardrail_result.success,
result=guardrail_result.result,
error=guardrail_result.error,
retry_count=self.retry_count,
),
)
return guardrail_result
def prompt(self) -> str:
"""Generates the task prompt with optional markdown formatting.
When the markdown attribute is True, instructions for formatting the
response in Markdown syntax will be added to the prompt.
Returns:
str: The formatted prompt string containing the task description,
expected output, and optional markdown formatting instructions.
"""
description = self.description
should_inject = self.allow_crewai_trigger_context
if should_inject and self.agent:
crew = getattr(self.agent, "crew", None)
if crew and hasattr(crew, "_inputs") and crew._inputs:
trigger_payload = crew._inputs.get("crewai_trigger_payload")
if trigger_payload is not None:
description += f"\n\nTrigger Payload: {trigger_payload}"
tasks_slices = [description]
output = self.i18n.slice("expected_output").format(
expected_output=self.expected_output
)
tasks_slices = [description, output]
if self.markdown:
markdown_instruction = """Your final answer MUST be formatted in Markdown syntax.
Follow these guidelines:
- Use # for headers
- Use ** for bold text
- Use * for italic text
- Use - or * for bullet points
- Use `code` for inline code
- Use ```language for code blocks"""
tasks_slices.append(markdown_instruction)
return "\n".join(tasks_slices)
def interpolate_inputs_and_add_conversation_history(
self, inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]]
) -> None:
"""Interpolate inputs into the task description, expected output, and output file path.
Add conversation history if present.
Args:
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, and floats.
Raises:
ValueError: If a required template variable is missing from inputs.
"""
if self._original_description is None:
self._original_description = self.description
if self._original_expected_output is None:
self._original_expected_output = self.expected_output
if self.output_file is not None and self._original_output_file is None:
self._original_output_file = self.output_file
if not inputs:
return
try:
self.description = interpolate_only(
input_string=self._original_description, inputs=inputs
)
except KeyError as e:
raise ValueError(
f"Missing required template variable '{e.args[0]}' in description"
) from e
except ValueError as e:
raise ValueError(f"Error interpolating description: {str(e)}") from e
try:
self.expected_output = interpolate_only(
input_string=self._original_expected_output, inputs=inputs
)
except (KeyError, ValueError) as e:
raise ValueError(f"Error interpolating expected_output: {str(e)}") from e
if self.output_file is not None:
try:
self.output_file = interpolate_only(
input_string=self._original_output_file, inputs=inputs
)
except (KeyError, ValueError) as e:
raise ValueError(
f"Error interpolating output_file path: {str(e)}"
) from e
if "crew_chat_messages" in inputs and inputs["crew_chat_messages"]:
conversation_instruction = self.i18n.slice(
"conversation_history_instruction"
)
crew_chat_messages_json = str(inputs["crew_chat_messages"])
try:
crew_chat_messages = json.loads(crew_chat_messages_json)
except json.JSONDecodeError as e:
print("An error occurred while parsing crew chat messages:", e)
raise
conversation_history = "\n".join(
f"{msg['role'].capitalize()}: {msg['content']}"
for msg in crew_chat_messages
if isinstance(msg, dict) and "role" in msg and "content" in msg
)
self.description += (
f"\n\n{conversation_instruction}\n\n{conversation_history}"
)
def increment_tools_errors(self) -> None:
"""Increment the tools errors counter."""
self.tools_errors += 1
def increment_delegations(self, agent_name: Optional[str]) -> None:
"""Increment the delegations counter."""
if agent_name:
self.processed_by_agents.add(agent_name)
self.delegations += 1
def copy(
self, agents: List["BaseAgent"], task_mapping: Dict[str, "Task"]
) -> "Task":
"""Creates a deep copy of the Task while preserving its original class type.
Args:
agents: List of agents available for the task.
task_mapping: Dictionary mapping task IDs to Task instances.
Returns:
A copy of the task with the same class type as the original.
"""
exclude = {
"id",
"agent",
"context",
"tools",
}
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
cloned_context = (
[task_mapping[context_task.key] for context_task in self.context]
if isinstance(self.context, list)
else None
)
def get_agent_by_role(role: str) -> Union["BaseAgent", None]:
return next((agent for agent in agents if agent.role == role), None)
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
cloned_tools = copy(self.tools) if self.tools else []
copied_task = self.__class__(
**copied_data,
context=cloned_context,
agent=cloned_agent,
tools=cloned_tools,
)
return copied_task
def _export_output(
self, result: str
) -> Tuple[Optional[BaseModel], Optional[Dict[str, Any]]]:
pydantic_output: Optional[BaseModel] = None
json_output: Optional[Dict[str, Any]] = None
if self.output_pydantic or self.output_json:
model_output = convert_to_model(
result,
self.output_pydantic,
self.output_json,
self.agent,
self.converter_cls,
)
if isinstance(model_output, BaseModel):
pydantic_output = model_output
elif isinstance(model_output, dict):
json_output = model_output
elif isinstance(model_output, str):
try:
json_output = json.loads(model_output)
except json.JSONDecodeError:
json_output = None
return pydantic_output, json_output
def _get_output_format(self) -> OutputFormat:
if self.output_json:
return OutputFormat.JSON
if self.output_pydantic:
return OutputFormat.PYDANTIC
return OutputFormat.RAW
def _save_file(self, result: Union[Dict, str, Any]) -> None:
"""Save task output to a file.
Note:
For cross-platform file writing, especially on Windows, consider using FileWriterTool
from the crewai_tools package:
pip install 'crewai[tools]'
from crewai_tools import FileWriterTool
Args:
result: The result to save to the file. Can be a dict or any stringifiable object.
Raises:
ValueError: If output_file is not set
RuntimeError: If there is an error writing to the file. For cross-platform
compatibility, especially on Windows, use FileWriterTool from crewai_tools
package.
"""
if self.output_file is None:
raise ValueError("output_file is not set.")
FILEWRITER_RECOMMENDATION = (
"For cross-platform file writing, especially on Windows, "
"use FileWriterTool from crewai_tools package."
)
try:
resolved_path = Path(self.output_file).expanduser().resolve()
directory = resolved_path.parent
if self.create_directory and not directory.exists():
directory.mkdir(parents=True, exist_ok=True)
elif not self.create_directory and not directory.exists():
raise RuntimeError(
f"Directory {directory} does not exist and create_directory is False"
)
with resolved_path.open("w", encoding="utf-8") as file:
if isinstance(result, dict):
import json
json.dump(result, file, ensure_ascii=False, indent=2)
else:
file.write(str(result))
except (OSError, IOError) as e:
raise RuntimeError(
"\n".join(
[f"Failed to save output file: {e}", FILEWRITER_RECOMMENDATION]
)
)
return None
def __repr__(self):
return f"Task(description={self.description}, expected_output={self.expected_output})"
@property
def fingerprint(self) -> Fingerprint:
"""Get the fingerprint of the task.
Returns:
Fingerprint: The fingerprint of the task
"""
return self.security_config.fingerprint