diff --git a/pyproject.toml b/pyproject.toml index 8a23d6c76..b99c1e6ca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -131,6 +131,7 @@ select = [ "I001", # sort imports "I002", # remove unused imports ] +ignore = ["E501"] # ignore line too long [tool.mypy] exclude = ["src/crewai/cli/templates", "tests"] diff --git a/src/crewai/tasks/conditional_task.py b/src/crewai/tasks/conditional_task.py index d2edee12f..376eddab8 100644 --- a/src/crewai/tasks/conditional_task.py +++ b/src/crewai/tasks/conditional_task.py @@ -1,4 +1,7 @@ -from typing import Any, Callable +"""Conditional task execution based on previous task output.""" + +from collections.abc import Callable +from typing import Any from pydantic import Field @@ -8,37 +11,54 @@ from crewai.tasks.task_output import TaskOutput class ConditionalTask(Task): - """ - A task that can be conditionally executed based on the output of another task. - Note: This cannot be the only task you have in your crew and cannot be the first since its needs context from the previous task. + """A task that can be conditionally executed based on the output of another task. + + This task type allows for dynamic workflow execution based on the results of + previous tasks in the crew execution chain. + + Attributes: + condition: Function that evaluates previous task output to determine execution. + + Notes: + - Cannot be the only task in your crew + - Cannot be the first task since it needs context from the previous task """ - condition: Callable[[TaskOutput], bool] = Field( + condition: Callable[[TaskOutput], bool] | None = Field( default=None, - description="Maximum number of retries for an agent to execute a task when an error occurs.", + description="Function that determines whether the task should be executed based on previous task output.", ) def __init__( self, - condition: Callable[[Any], bool], + condition: Callable[[Any], bool] | None = None, **kwargs, ): super().__init__(**kwargs) self.condition = condition def should_execute(self, context: TaskOutput) -> bool: - """ - Determines whether the conditional task should be executed based on the provided context. + """Determines whether the conditional task should be executed based on the provided context. Args: - context (Any): The context or output from the previous task that will be evaluated by the condition. + context: The output from the previous task that will be evaluated by the condition. Returns: - bool: True if the task should be executed, False otherwise. + True if the task should be executed, False otherwise. + + Raises: + ValueError: If no condition function is set. """ + if self.condition is None: + raise ValueError("No condition function set for conditional task") return self.condition(context) - def get_skipped_task_output(self): + def get_skipped_task_output(self) -> TaskOutput: + """Generate a TaskOutput for when the conditional task is skipped. + + Returns: + Empty TaskOutput with RAW format indicating the task was skipped. + """ return TaskOutput( description=self.description, raw="", diff --git a/src/crewai/tasks/output_format.py b/src/crewai/tasks/output_format.py index dbea9ffcf..825ee9ea0 100644 --- a/src/crewai/tasks/output_format.py +++ b/src/crewai/tasks/output_format.py @@ -1,8 +1,16 @@ +"""Task output format definitions for CrewAI.""" + from enum import Enum class OutputFormat(str, Enum): - """Enum that represents the output format of a task.""" + """Enum that represents the output format of a task. + + Attributes: + JSON: Output as JSON dictionary format + PYDANTIC: Output as Pydantic model instance + RAW: Output as raw unprocessed string + """ JSON = "json" PYDANTIC = "pydantic" diff --git a/src/crewai/tasks/task_output.py b/src/crewai/tasks/task_output.py index b0e8aecd4..ba9f95c18 100644 --- a/src/crewai/tasks/task_output.py +++ b/src/crewai/tasks/task_output.py @@ -1,5 +1,7 @@ +"""Task output representation and formatting.""" + import json -from typing import Any, Dict, Optional +from typing import Any from pydantic import BaseModel, Field, model_validator @@ -7,19 +9,31 @@ from crewai.tasks.output_format import OutputFormat class TaskOutput(BaseModel): - """Class that represents the result of a task.""" + """Class that represents the result of a task. + + Attributes: + description: Description of the task + name: Optional name of the task + expected_output: Expected output of the task + summary: Summary of the task (auto-generated from description) + raw: Raw output of the task + pydantic: Pydantic model output of the task + json_dict: JSON dictionary output of the task + agent: Agent that executed the task + output_format: Output format of the task (JSON, PYDANTIC, or RAW) + """ description: str = Field(description="Description of the task") - name: Optional[str] = Field(description="Name of the task", default=None) - expected_output: Optional[str] = Field( + name: str | None = Field(description="Name of the task", default=None) + expected_output: str | None = Field( description="Expected output of the task", default=None ) - summary: Optional[str] = Field(description="Summary of the task", default=None) + summary: str | None = Field(description="Summary of the task", default=None) raw: str = Field(description="Raw output of the task", default="") - pydantic: Optional[BaseModel] = Field( + pydantic: BaseModel | None = Field( description="Pydantic output of task", default=None ) - json_dict: Optional[Dict[str, Any]] = Field( + json_dict: dict[str, Any] | None = Field( description="JSON dictionary of task", default=None ) agent: str = Field(description="Agent that executed the task") @@ -29,13 +43,28 @@ class TaskOutput(BaseModel): @model_validator(mode="after") def set_summary(self): - """Set the summary field based on the description.""" + """Set the summary field based on the description. + + Returns: + Self with updated summary field. + """ excerpt = " ".join(self.description.split(" ")[:10]) self.summary = f"{excerpt}..." return self @property - def json(self) -> Optional[str]: + def json(self) -> str | None: # type: ignore[override] + """Get the JSON string representation of the task output. + + Returns: + JSON string representation of the task output. + + Raises: + ValueError: If output format is not JSON. + + Notes: + TODO: Refactor to use model_dump_json() to avoid BaseModel method conflict + """ if self.output_format != OutputFormat.JSON: raise ValueError( """ @@ -47,8 +76,13 @@ class TaskOutput(BaseModel): return json.dumps(self.json_dict) - def to_dict(self) -> Dict[str, Any]: - """Convert json_output and pydantic_output to a dictionary.""" + def to_dict(self) -> dict[str, Any]: + """Convert json_output and pydantic_output to a dictionary. + + Returns: + Dictionary representation of the task output. Prioritizes json_dict + over pydantic model dump if both are available. + """ output_dict = {} if self.json_dict: output_dict.update(self.json_dict)