mirror of https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 08:08:32 +00:00
chore: enhance typing and documentation in tasks module (#3467)
- Disable E501 line length linting rule
- Add Google-style docstrings to tasks leaf file
- Modernize typing and docs in task_output.py
- Improve typing and documentation in conditional_task.py
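For context on the "modernize typing" bullets: the commit replaces `typing.Optional`/`typing.Dict` with PEP 604 unions and built-in generics, and moves `Callable` to `collections.abc`. A minimal before/after sketch (the `run` function is illustrative, not from the codebase):

```python
# Before: the style this commit removes
from typing import Any, Callable, Dict, Optional

def run(condition: Optional[Callable[[Any], bool]]) -> Optional[Dict[str, Any]]: ...

# After: built-in generics and `X | None` unions (Python 3.10+),
# with Callable imported from collections.abc
from collections.abc import Callable
from typing import Any

def run(condition: Callable[[Any], bool] | None) -> dict[str, Any] | None: ...
```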
pyproject.toml
@@ -131,6 +131,7 @@ select = [
     "I001", # sort imports
     "I002", # remove unused imports
 ]
+ignore = ["E501"] # ignore line too long
 
 [tool.mypy]
 exclude = ["src/crewai/cli/templates", "tests"]
src/crewai/tasks/conditional_task.py
@@ -1,4 +1,7 @@
-from typing import Any, Callable
+"""Conditional task execution based on previous task output."""
+
+from collections.abc import Callable
+from typing import Any
 
 from pydantic import Field
 
@@ -8,37 +11,54 @@ from crewai.tasks.task_output import TaskOutput
 
 
 class ConditionalTask(Task):
-    """
-    A task that can be conditionally executed based on the output of another task.
-    Note: This cannot be the only task you have in your crew and cannot be the first since its needs context from the previous task.
+    """A task that can be conditionally executed based on the output of another task.
+
+    This task type allows for dynamic workflow execution based on the results of
+    previous tasks in the crew execution chain.
+
+    Attributes:
+        condition: Function that evaluates previous task output to determine execution.
+
+    Notes:
+        - Cannot be the only task in your crew
+        - Cannot be the first task since it needs context from the previous task
     """
 
-    condition: Callable[[TaskOutput], bool] = Field(
+    condition: Callable[[TaskOutput], bool] | None = Field(
         default=None,
-        description="Maximum number of retries for an agent to execute a task when an error occurs.",
+        description="Function that determines whether the task should be executed based on previous task output.",
     )
 
     def __init__(
         self,
-        condition: Callable[[Any], bool],
+        condition: Callable[[Any], bool] | None = None,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.condition = condition
 
     def should_execute(self, context: TaskOutput) -> bool:
-        """
-        Determines whether the conditional task should be executed based on the provided context.
+        """Determines whether the conditional task should be executed based on the provided context.
 
         Args:
-            context (Any): The context or output from the previous task that will be evaluated by the condition.
+            context: The output from the previous task that will be evaluated by the condition.
 
         Returns:
-            bool: True if the task should be executed, False otherwise.
+            True if the task should be executed, False otherwise.
+
+        Raises:
+            ValueError: If no condition function is set.
         """
+        if self.condition is None:
+            raise ValueError("No condition function set for conditional task")
         return self.condition(context)
 
-    def get_skipped_task_output(self):
+    def get_skipped_task_output(self) -> TaskOutput:
+        """Generate a TaskOutput for when the conditional task is skipped.
+
+        Returns:
+            Empty TaskOutput with RAW format indicating the task was skipped.
+        """
         return TaskOutput(
             description=self.description,
             raw="",
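A usage sketch of the updated `ConditionalTask`: the condition now matches `Callable[[TaskOutput], bool] | None`, and `should_execute` raises if it was left unset. The agents and task descriptions below are hypothetical:

```python
from crewai import Agent, Crew, Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput


def has_findings(output: TaskOutput) -> bool:
    # Run the follow-up task only if the previous task produced raw output.
    return bool(output.raw.strip())


researcher = Agent(role="Researcher", goal="Gather data", backstory="...")  # hypothetical
writer = Agent(role="Writer", goal="Summarize data", backstory="...")  # hypothetical

gather = Task(description="Gather data", expected_output="Raw findings", agent=researcher)
summarize = ConditionalTask(
    condition=has_findings,  # evaluated against the previous task's TaskOutput
    description="Summarize the findings",
    expected_output="A short summary",
    agent=writer,
)

# Per the docstring notes: not the only task in the crew, and not the first.
crew = Crew(agents=[researcher, writer], tasks=[gather, summarize])
```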
src/crewai/tasks/output_format.py
@@ -1,8 +1,16 @@
+"""Task output format definitions for CrewAI."""
+
 from enum import Enum
 
 
 class OutputFormat(str, Enum):
-    """Enum that represents the output format of a task."""
+    """Enum that represents the output format of a task.
+
+    Attributes:
+        JSON: Output as JSON dictionary format
+        PYDANTIC: Output as Pydantic model instance
+        RAW: Output as raw unprocessed string
+    """
 
     JSON = "json"
     PYDANTIC = "pydantic"
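Since `OutputFormat` subclasses `str`, members compare equal to their plain string values; a quick sketch (the `RAW` member referenced by the new docstring sits just below this hunk's context):

```python
from crewai.tasks.output_format import OutputFormat

assert OutputFormat.JSON == "json"  # str subclass: equal to its value
fmt = OutputFormat("pydantic")      # lookup by value
assert fmt is OutputFormat.PYDANTIC
```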
src/crewai/tasks/task_output.py
@@ -1,5 +1,7 @@
+"""Task output representation and formatting."""
+
 import json
-from typing import Any, Dict, Optional
+from typing import Any
 
 from pydantic import BaseModel, Field, model_validator
 
@@ -7,19 +9,31 @@ from crewai.tasks.output_format import OutputFormat
 
 
 class TaskOutput(BaseModel):
-    """Class that represents the result of a task."""
+    """Class that represents the result of a task.
+
+    Attributes:
+        description: Description of the task
+        name: Optional name of the task
+        expected_output: Expected output of the task
+        summary: Summary of the task (auto-generated from description)
+        raw: Raw output of the task
+        pydantic: Pydantic model output of the task
+        json_dict: JSON dictionary output of the task
+        agent: Agent that executed the task
+        output_format: Output format of the task (JSON, PYDANTIC, or RAW)
+    """
 
     description: str = Field(description="Description of the task")
-    name: Optional[str] = Field(description="Name of the task", default=None)
-    expected_output: Optional[str] = Field(
+    name: str | None = Field(description="Name of the task", default=None)
+    expected_output: str | None = Field(
         description="Expected output of the task", default=None
     )
-    summary: Optional[str] = Field(description="Summary of the task", default=None)
+    summary: str | None = Field(description="Summary of the task", default=None)
     raw: str = Field(description="Raw output of the task", default="")
-    pydantic: Optional[BaseModel] = Field(
+    pydantic: BaseModel | None = Field(
         description="Pydantic output of task", default=None
     )
-    json_dict: Optional[Dict[str, Any]] = Field(
+    json_dict: dict[str, Any] | None = Field(
         description="JSON dictionary of task", default=None
     )
     agent: str = Field(description="Agent that executed the task")
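A construction sketch against the modernized field types; the values are illustrative. Only `description` and `agent` lack defaults here, and the former `Optional[...]` fields still default to `None`:

```python
from crewai.tasks.task_output import TaskOutput

out = TaskOutput(
    description="Summarize quarterly sales figures for the report",
    agent="Analyst",
    raw="Sales grew 12% quarter over quarter.",
)
assert out.name is None and out.pydantic is None and out.json_dict is None
```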
@@ -29,13 +43,28 @@ class TaskOutput(BaseModel):
 
     @model_validator(mode="after")
     def set_summary(self):
-        """Set the summary field based on the description."""
+        """Set the summary field based on the description.
+
+        Returns:
+            Self with updated summary field.
+        """
         excerpt = " ".join(self.description.split(" ")[:10])
         self.summary = f"{excerpt}..."
         return self
 
     @property
-    def json(self) -> Optional[str]:
+    def json(self) -> str | None:  # type: ignore[override]
+        """Get the JSON string representation of the task output.
+
+        Returns:
+            JSON string representation of the task output.
+
+        Raises:
+            ValueError: If output format is not JSON.
+
+        Notes:
+            TODO: Refactor to use model_dump_json() to avoid BaseModel method conflict
+        """
         if self.output_format != OutputFormat.JSON:
             raise ValueError(
                 """
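A behavioral sketch of the documented `ValueError` path, assuming the `output_format` field (context elided between these hunks) defaults to `OutputFormat.RAW`; values are illustrative:

```python
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput

ok = TaskOutput(
    description="Extract structured fields",
    agent="Parser",
    json_dict={"status": "ok"},
    output_format=OutputFormat.JSON,
)
assert ok.json == '{"status": "ok"}'  # serialized from json_dict

plain = TaskOutput(description="Plain text result", agent="Writer", raw="hello")
try:
    plain.json  # output format is not JSON, so the property raises
except ValueError:
    pass
```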
@@ -47,8 +76,13 @@ class TaskOutput(BaseModel):
 
         return json.dumps(self.json_dict)
 
-    def to_dict(self) -> Dict[str, Any]:
-        """Convert json_output and pydantic_output to a dictionary."""
+    def to_dict(self) -> dict[str, Any]:
+        """Convert json_output and pydantic_output to a dictionary.
+
+        Returns:
+            Dictionary representation of the task output. Prioritizes json_dict
+            over pydantic model dump if both are available.
+        """
         output_dict = {}
         if self.json_dict:
             output_dict.update(self.json_dict)
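And a sketch of the priority described in the new `to_dict` docstring (the `Result` model is hypothetical, standing in for any Pydantic output; the `pydantic` fallback branch falls outside this hunk):

```python
from pydantic import BaseModel

from crewai.tasks.task_output import TaskOutput


class Result(BaseModel):
    score: int


both = TaskOutput(
    description="Score the answer",
    agent="Judge",
    json_dict={"score": 10},
    pydantic=Result(score=9),
)
# json_dict wins over the pydantic model dump when both are set.
assert both.to_dict() == {"score": 10}
```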