Compare commits

...

8 Commits

Author SHA1 Message Date
Devin AI
acff336363 Fix remaining lint issues: N818, RET504, and S101 errors
- Rename ToolUsageLimitExceeded to ToolUsageLimitExceededError (N818)
- Remove unnecessary assignments before return statements (RET504)
- Add noqa S101 comments to all assert statements in test file
- Fix all remaining lint errors while preserving functionality

Co-Authored-By: João <joao@crewai.com>
2025-09-10 02:03:43 +00:00
Devin AI
bbf76d0e42 Fix CI issues: type-checker, lint errors (S101, RET504, B904)
- Add type ignore for create_model call to fix type-checker error
- Fix exception chaining (B904) by using 'from e' syntax
- Fix unnecessary assignment (RET504) by returning directly
- Add noqa comments for S101 assert detection in test file

Co-Authored-By: João <joao@crewai.com>
2025-09-10 01:58:41 +00:00
Devin AI
a533e111e8 Fix duplicate tool execution in structured_tool.invoke()
- Remove duplicate function call on line 285 that caused tools to execute twice
- Add comprehensive test to prevent regression
- Fixes issue #3489

Co-Authored-By: João <joao@crewai.com>
2025-09-10 01:51:27 +00:00
Samarth Rawat
6676d94ba1 Doc Fix: fixed number of memory types (#3288)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* Update memory.mdx

* Update memory.mdx

---------

Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-09-09 14:11:56 -04:00
Greyson LaLonde
d5126d159b chore: improve typing and docs in agents leaf files (#3461)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
- Add typing and Google-style docstrings to agents leaf files
- Add TODO notes
2025-09-08 11:57:34 -04:00
Greyson LaLonde
fa06aea8d5 chore: modernize security module typing (#3469)
- Disable E501, apply Ruff formatting
- Update typing (Self, BeforeValidator), remove dead code
- Convert Fingerprint to Pydantic dataclass and fix serialization/copy behavior
- Add TODO for dynamic namespace config
2025-09-08 11:52:59 -04:00
Greyson LaLonde
f936e0f69b chore: enhance typing and documentation in tasks module (#3467)
- Disable E501 line length linting rule
- Add Google-style docstrings to tasks leaf file
- Modernize typing and docs in task_output.py
- Improve typing and documentation in conditional_task.py
2025-09-08 11:42:23 -04:00
Greyson LaLonde
37c5e88d02 ci: configure pre-commit hooks and github actions to use uv run (#3479) 2025-09-08 11:30:28 -04:00
15 changed files with 397 additions and 257 deletions

View File

@@ -32,7 +32,7 @@ jobs:
run: uv python install ${{ matrix.python-version }}
- name: Install dependencies
run: uv sync --dev --no-install-project
run: uv sync --dev --all-extras --no-install-project
- name: Get changed Python files
id: changed-files

View File

@@ -1,14 +1,18 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.12.11
- repo: local
hooks:
- id: ruff
args: ["--config", "pyproject.toml"]
name: ruff
entry: uv run ruff check
language: system
types: [python]
- id: ruff-format
args: ["--config", "pyproject.toml"]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.17.1
hooks:
name: ruff-format
entry: uv run ruff format
language: system
types: [python]
- id: mypy
args: ["--config-file", "pyproject.toml"]
name: mypy
entry: uv run mypy
language: system
types: [python]

View File

@@ -7,7 +7,7 @@ mode: "wide"
## Overview
The CrewAI framework provides a sophisticated memory system designed to significantly enhance AI agent capabilities. CrewAI offers **three distinct memory approaches** that serve different use cases:
The CrewAI framework provides a sophisticated memory system designed to significantly enhance AI agent capabilities. CrewAI offers **two distinct memory approaches** that serve different use cases:
1. **Basic Memory System** - Built-in short-term, long-term, and entity memory
2. **External Memory** - Standalone external memory providers

View File

@@ -131,6 +131,7 @@ select = [
"I001", # sort imports
"I002", # remove unused imports
]
ignore = ["E501"] # ignore line too long
[tool.mypy]
exclude = ["src/crewai/cli/templates", "tests"]

View File

@@ -1,29 +1,58 @@
"""Base converter adapter for structured output conversion."""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from crewai.agents.agent_adapters.base_agent_adapter import BaseAgentAdapter
from crewai.task import Task
class BaseConverterAdapter(ABC):
"""Base class for all converter adapters in CrewAI.
"""Abstract base class for converter adapters in CrewAI.
This abstract class defines the common interface and functionality that all
converter adapters must implement for converting structured output.
Defines the common interface for converting agent outputs to structured formats.
All converter adapters must implement the methods defined here.
"""
def __init__(self, agent_adapter):
def __init__(self, agent_adapter: BaseAgentAdapter) -> None:
"""Initialize the converter adapter.
Args:
agent_adapter: The agent adapter to configure for structured output.
"""
self.agent_adapter = agent_adapter
@abstractmethod
def configure_structured_output(self, task) -> None:
def configure_structured_output(self, task: Task) -> None:
"""Configure agents to return structured output.
Must support json and pydantic output.
Must support both JSON and Pydantic output formats.
Args:
task: The task requiring structured output.
"""
pass
@abstractmethod
def enhance_system_prompt(self, base_prompt: str) -> str:
"""Enhance the system prompt with structured output instructions."""
pass
"""Enhance the system prompt with structured output instructions.
Args:
base_prompt: The original system prompt.
Returns:
Enhanced prompt with structured output guidance.
"""
@abstractmethod
def post_process_result(self, result: str) -> str:
"""Post-process the result to ensure it matches the expected format: string."""
pass
"""Post-process the result to ensure proper string format.
Args:
result: The raw result from agent execution.
Returns:
Processed result as a string.
"""

View File

@@ -1,29 +1,32 @@
"""Base output converter for transforming text into structured formats."""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Optional
from typing import Any
from pydantic import BaseModel, Field
class OutputConverter(BaseModel, ABC):
"""
Abstract base class for converting task results into structured formats.
"""Abstract base class for converting text to structured formats.
This class provides a framework for converting unstructured text into
either Pydantic models or JSON, tailored for specific agent requirements.
It uses a language model to interpret and structure the input text based
on given instructions.
Uses language models to transform unstructured text into either Pydantic models
or JSON objects based on provided instructions and target schemas.
Attributes:
text (str): The input text to be converted.
llm (Any): The language model used for conversion.
model (Any): The target model for structuring the output.
instructions (str): Specific instructions for the conversion process.
max_attempts (int): Maximum number of conversion attempts (default: 3).
text: The input text to be converted.
llm: The language model used for conversion.
model: The target Pydantic model class for structuring output.
instructions: Specific instructions for the conversion process.
max_attempts: Maximum number of conversion attempts (default: 3).
"""
text: str = Field(description="Text to be converted.")
llm: Any = Field(description="The language model to be used to convert the text.")
model: Any = Field(description="The model to be used to convert the text.")
model: type[BaseModel] = Field(
description="The model to be used to convert the text."
)
instructions: str = Field(description="Conversion instructions to the LLM.")
max_attempts: int = Field(
description="Max number of attempts to try to get the output formatted.",
@@ -31,11 +34,23 @@ class OutputConverter(BaseModel, ABC):
)
@abstractmethod
def to_pydantic(self, current_attempt=1) -> BaseModel:
"""Convert text to pydantic."""
pass
def to_pydantic(self, current_attempt: int = 1) -> BaseModel:
"""Convert text to a Pydantic model instance.
Args:
current_attempt: Current attempt number for retry logic.
Returns:
Pydantic model instance with structured data.
"""
@abstractmethod
def to_json(self, current_attempt=1) -> dict:
"""Convert text to json."""
pass
def to_json(self, current_attempt: int = 1) -> dict[str, Any]:
"""Convert text to a JSON dictionary.
Args:
current_attempt: Current attempt number for retry logic.
Returns:
Dictionary containing structured JSON data.
"""

View File

@@ -1,15 +1,45 @@
from typing import Any, Dict, Optional
"""Cache handler for tool usage results."""
from typing import Any
from pydantic import BaseModel, PrivateAttr
class CacheHandler(BaseModel):
"""Callback handler for tool usage."""
"""Handles caching of tool execution results.
_cache: Dict[str, Any] = PrivateAttr(default_factory=dict)
Provides in-memory caching for tool outputs based on tool name and input.
def add(self, tool, input, output):
Notes:
- TODO: Make thread-safe.
"""
_cache: dict[str, Any] = PrivateAttr(default_factory=dict)
def add(self, tool: str, input: str, output: Any) -> None:
"""Add a tool result to the cache.
Args:
tool: Name of the tool.
input: Input string used for the tool.
output: Output result from tool execution.
Notes:
- TODO: Rename 'input' parameter to avoid shadowing builtin.
"""
self._cache[f"{tool}-{input}"] = output
def read(self, tool, input) -> Optional[str]:
def read(self, tool: str, input: str) -> Any | None:
"""Retrieve a cached tool result.
Args:
tool: Name of the tool.
input: Input string used for the tool.
Returns:
Cached result if found, None otherwise.
Notes:
- TODO: Rename 'input' parameter to avoid shadowing builtin.
"""
return self._cache.get(f"{tool}-{input}")

View File

@@ -0,0 +1,15 @@
"""Security constants for CrewAI.
This module contains security-related constants used throughout the security module.
Notes:
- TODO: Determine if CREW_AI_NAMESPACE should be made dynamic or configurable
"""
from typing import Annotated
from uuid import UUID
CREW_AI_NAMESPACE: Annotated[
UUID,
"Create a deterministic UUID using v5 (SHA-1). Custom namespace for CrewAI to enhance security.",
] = UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")

View File

@@ -1,130 +1,123 @@
"""
Fingerprint Module
"""Fingerprint Module
This module provides functionality for generating and validating unique identifiers
for CrewAI agents. These identifiers are used for tracking, auditing, and security.
"""
import uuid
from datetime import datetime
from typing import Any, Dict, Optional
from typing import Annotated, Any
from uuid import UUID, uuid4, uuid5
from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic import BaseModel, BeforeValidator, Field, PrivateAttr
from typing_extensions import Self
from crewai.security.constants import CREW_AI_NAMESPACE
def _validate_metadata(v: Any) -> dict[str, Any]:
"""Validate that metadata is a dictionary with string keys and valid values."""
if not isinstance(v, dict):
raise ValueError("Metadata must be a dictionary")
# Validate that all keys are strings
for key, value in v.items():
if not isinstance(key, str):
raise ValueError(f"Metadata keys must be strings, got {type(key)}")
# Validate nested dictionaries (prevent deeply nested structures)
if isinstance(value, dict):
# Check for nested dictionaries (limit depth to 1)
for nested_key, nested_value in value.items():
if not isinstance(nested_key, str):
raise ValueError(
f"Nested metadata keys must be strings, got {type(nested_key)}"
)
if isinstance(nested_value, dict):
raise ValueError("Metadata can only be nested one level deep")
# Check for maximum metadata size (prevent DoS)
if len(str(v)) > 10_000: # Limit metadata size to 10KB
raise ValueError("Metadata size exceeds maximum allowed (10KB)")
return v
class Fingerprint(BaseModel):
"""
A class for generating and managing unique identifiers for agents.
"""A class for generating and managing unique identifiers for agents.
Each agent has dual identifiers:
- Human-readable ID: For debugging and reference (derived from role if not specified)
- Fingerprint UUID: Unique runtime identifier for tracking and auditing
Attributes:
uuid_str (str): String representation of the UUID for this fingerprint, auto-generated
created_at (datetime): When this fingerprint was created, auto-generated
metadata (Dict[str, Any]): Additional metadata associated with this fingerprint
uuid_str: String representation of the UUID for this fingerprint, auto-generated
created_at: When this fingerprint was created, auto-generated
metadata: Additional metadata associated with this fingerprint
"""
uuid_str: str = Field(default_factory=lambda: str(uuid.uuid4()), description="String representation of the UUID")
created_at: datetime = Field(default_factory=datetime.now, description="When this fingerprint was created")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata for this fingerprint")
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_validator('metadata')
@classmethod
def validate_metadata(cls, v):
"""Validate that metadata is a dictionary with string keys and valid values."""
if not isinstance(v, dict):
raise ValueError("Metadata must be a dictionary")
# Validate that all keys are strings
for key, value in v.items():
if not isinstance(key, str):
raise ValueError(f"Metadata keys must be strings, got {type(key)}")
# Validate nested dictionaries (prevent deeply nested structures)
if isinstance(value, dict):
# Check for nested dictionaries (limit depth to 1)
for nested_key, nested_value in value.items():
if not isinstance(nested_key, str):
raise ValueError(f"Nested metadata keys must be strings, got {type(nested_key)}")
if isinstance(nested_value, dict):
raise ValueError("Metadata can only be nested one level deep")
# Check for maximum metadata size (prevent DoS)
if len(str(v)) > 10000: # Limit metadata size to 10KB
raise ValueError("Metadata size exceeds maximum allowed (10KB)")
return v
def __init__(self, **data):
"""Initialize a Fingerprint with auto-generated uuid_str and created_at."""
# Remove uuid_str and created_at from data to ensure they're auto-generated
if 'uuid_str' in data:
data.pop('uuid_str')
if 'created_at' in data:
data.pop('created_at')
# Call the parent constructor with the modified data
super().__init__(**data)
_uuid_str: str = PrivateAttr(default_factory=lambda: str(uuid4()))
_created_at: datetime = PrivateAttr(default_factory=datetime.now)
metadata: Annotated[dict[str, Any], BeforeValidator(_validate_metadata)] = Field(
default_factory=dict
)
@property
def uuid(self) -> uuid.UUID:
def uuid_str(self) -> str:
"""Get the string representation of the UUID for this fingerprint."""
return self._uuid_str
@property
def created_at(self) -> datetime:
"""Get the creation timestamp for this fingerprint."""
return self._created_at
@property
def uuid(self) -> UUID:
"""Get the UUID object for this fingerprint."""
return uuid.UUID(self.uuid_str)
return UUID(self.uuid_str)
@classmethod
def _generate_uuid(cls, seed: str) -> str:
"""
Generate a deterministic UUID based on a seed string.
"""Generate a deterministic UUID based on a seed string.
Args:
seed (str): The seed string to use for UUID generation
seed: The seed string to use for UUID generation
Returns:
str: A string representation of the UUID consistently generated from the seed
A string representation of the UUID consistently generated from the seed
"""
if not isinstance(seed, str):
raise ValueError("Seed must be a string")
if not seed.strip():
raise ValueError("Seed cannot be empty or whitespace")
# Create a deterministic UUID using v5 (SHA-1)
# Custom namespace for CrewAI to enhance security
# Using a unique namespace specific to CrewAI to reduce collision risks
CREW_AI_NAMESPACE = uuid.UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479')
return str(uuid.uuid5(CREW_AI_NAMESPACE, seed))
return str(uuid5(CREW_AI_NAMESPACE, seed))
@classmethod
def generate(cls, seed: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> 'Fingerprint':
"""
Static factory method to create a new Fingerprint.
def generate(
cls, seed: str | None = None, metadata: dict[str, Any] | None = None
) -> Self:
"""Static factory method to create a new Fingerprint.
Args:
seed (Optional[str]): A string to use as seed for the UUID generation.
seed: A string to use as seed for the UUID generation.
If None, a random UUID is generated.
metadata (Optional[Dict[str, Any]]): Additional metadata to store with the fingerprint.
metadata: Additional metadata to store with the fingerprint.
Returns:
Fingerprint: A new Fingerprint instance
A new Fingerprint instance
"""
fingerprint = cls(metadata=metadata or {})
if seed:
# For seed-based generation, we need to manually set the uuid_str after creation
object.__setattr__(fingerprint, 'uuid_str', cls._generate_uuid(seed))
# For seed-based generation, we need to manually set the _uuid_str after creation
fingerprint.__dict__["_uuid_str"] = cls._generate_uuid(seed)
return fingerprint
def __str__(self) -> str:
"""String representation of the fingerprint (the UUID)."""
return self.uuid_str
def __eq__(self, other) -> bool:
def __eq__(self, other: Any) -> bool:
"""Compare fingerprints by their UUID."""
if isinstance(other, Fingerprint):
if type(other) is Fingerprint:
return self.uuid_str == other.uuid_str
return False
@@ -132,29 +125,27 @@ class Fingerprint(BaseModel):
"""Hash of the fingerprint (based on UUID)."""
return hash(self.uuid_str)
def to_dict(self) -> Dict[str, Any]:
"""
Convert the fingerprint to a dictionary representation.
def to_dict(self) -> dict[str, Any]:
"""Convert the fingerprint to a dictionary representation.
Returns:
Dict[str, Any]: Dictionary representation of the fingerprint
Dictionary representation of the fingerprint
"""
return {
"uuid_str": self.uuid_str,
"created_at": self.created_at.isoformat(),
"metadata": self.metadata
"metadata": self.metadata,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Fingerprint':
"""
Create a Fingerprint from a dictionary representation.
def from_dict(cls, data: dict[str, Any]) -> Self:
"""Create a Fingerprint from a dictionary representation.
Args:
data (Dict[str, Any]): Dictionary representation of a fingerprint
data: Dictionary representation of a fingerprint
Returns:
Fingerprint: A new Fingerprint instance
A new Fingerprint instance
"""
if not data:
return cls()
@@ -163,8 +154,10 @@ class Fingerprint(BaseModel):
# For consistency with existing stored fingerprints, we need to manually set these
if "uuid_str" in data:
object.__setattr__(fingerprint, 'uuid_str', data["uuid_str"])
fingerprint.__dict__["_uuid_str"] = data["uuid_str"]
if "created_at" in data and isinstance(data["created_at"], str):
object.__setattr__(fingerprint, 'created_at', datetime.fromisoformat(data["created_at"]))
fingerprint.__dict__["_created_at"] = datetime.fromisoformat(
data["created_at"]
)
return fingerprint

View File

@@ -1,5 +1,4 @@
"""
Security Configuration Module
"""Security Configuration Module
This module provides configuration for CrewAI security features, including:
- Authentication settings
@@ -10,9 +9,10 @@ The SecurityConfig class is the primary interface for managing security settings
in CrewAI applications.
"""
from typing import Any, Dict, Optional
from typing import Any
from pydantic import BaseModel, ConfigDict, Field, model_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator
from typing_extensions import Self
from crewai.security.fingerprint import Fingerprint
@@ -28,7 +28,6 @@ class SecurityConfig(BaseModel):
- Impersonation/delegation tokens *TODO*
Attributes:
version (str): Version of the security configuration
fingerprint (Fingerprint): The unique fingerprint automatically generated for the component
"""
@@ -37,80 +36,52 @@ class SecurityConfig(BaseModel):
# Note: Cannot use frozen=True as existing tests modify the fingerprint property
)
version: str = Field(
default="1.0.0",
description="Version of the security configuration"
)
fingerprint: Fingerprint = Field(
default_factory=Fingerprint,
description="Unique identifier for the component"
default_factory=Fingerprint, description="Unique identifier for the component"
)
def is_compatible(self, min_version: str) -> bool:
"""
Check if this security configuration is compatible with the minimum required version.
Args:
min_version (str): Minimum required version in semver format (e.g., "1.0.0")
Returns:
bool: True if this configuration is compatible, False otherwise
"""
# Simple version comparison (can be enhanced with packaging.version if needed)
current = [int(x) for x in self.version.split(".")]
minimum = [int(x) for x in min_version.split(".")]
# Compare major, minor, patch versions
for c, m in zip(current, minimum):
if c > m:
return True
if c < m:
return False
return True
@model_validator(mode='before')
@field_validator("fingerprint", mode="before")
@classmethod
def validate_fingerprint(cls, values):
def validate_fingerprint(cls, v: Any) -> Fingerprint:
"""Ensure fingerprint is properly initialized."""
if isinstance(values, dict):
# Handle case where fingerprint is not provided or is None
if 'fingerprint' not in values or values['fingerprint'] is None:
values['fingerprint'] = Fingerprint()
# Handle case where fingerprint is a string (seed)
elif isinstance(values['fingerprint'], str):
if not values['fingerprint'].strip():
raise ValueError("Fingerprint seed cannot be empty")
values['fingerprint'] = Fingerprint.generate(seed=values['fingerprint'])
return values
if v is None:
return Fingerprint()
if isinstance(v, str):
if not v.strip():
raise ValueError("Fingerprint seed cannot be empty")
return Fingerprint.generate(seed=v)
if isinstance(v, dict):
return Fingerprint.from_dict(v)
if isinstance(v, Fingerprint):
return v
def to_dict(self) -> Dict[str, Any]:
raise ValueError(f"Invalid fingerprint type: {type(v)}")
def to_dict(self) -> dict[str, Any]:
"""
Convert the security config to a dictionary.
Returns:
Dict[str, Any]: Dictionary representation of the security config
Dictionary representation of the security config
"""
result = {
"fingerprint": self.fingerprint.to_dict()
}
return result
return {"fingerprint": self.fingerprint.to_dict()}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'SecurityConfig':
def from_dict(cls, data: dict[str, Any]) -> Self:
"""
Create a SecurityConfig from a dictionary.
Args:
data (Dict[str, Any]): Dictionary representation of a security config
data: Dictionary representation of a security config
Returns:
SecurityConfig: A new SecurityConfig instance
A new SecurityConfig instance
"""
# Make a copy to avoid modifying the original
data_copy = data.copy()
fingerprint_data = data_copy.pop("fingerprint", None)
fingerprint = Fingerprint.from_dict(fingerprint_data) if fingerprint_data else Fingerprint()
fingerprint_data = data.get("fingerprint")
fingerprint = (
Fingerprint.from_dict(fingerprint_data)
if fingerprint_data
else Fingerprint()
)
return cls(fingerprint=fingerprint)

View File

@@ -1,4 +1,7 @@
from typing import Any, Callable
"""Conditional task execution based on previous task output."""
from collections.abc import Callable
from typing import Any
from pydantic import Field
@@ -8,37 +11,54 @@ from crewai.tasks.task_output import TaskOutput
class ConditionalTask(Task):
"""
A task that can be conditionally executed based on the output of another task.
Note: This cannot be the only task you have in your crew and cannot be the first since its needs context from the previous task.
"""A task that can be conditionally executed based on the output of another task.
This task type allows for dynamic workflow execution based on the results of
previous tasks in the crew execution chain.
Attributes:
condition: Function that evaluates previous task output to determine execution.
Notes:
- Cannot be the only task in your crew
- Cannot be the first task since it needs context from the previous task
"""
condition: Callable[[TaskOutput], bool] = Field(
condition: Callable[[TaskOutput], bool] | None = Field(
default=None,
description="Maximum number of retries for an agent to execute a task when an error occurs.",
description="Function that determines whether the task should be executed based on previous task output.",
)
def __init__(
self,
condition: Callable[[Any], bool],
condition: Callable[[Any], bool] | None = None,
**kwargs,
):
super().__init__(**kwargs)
self.condition = condition
def should_execute(self, context: TaskOutput) -> bool:
"""
Determines whether the conditional task should be executed based on the provided context.
"""Determines whether the conditional task should be executed based on the provided context.
Args:
context (Any): The context or output from the previous task that will be evaluated by the condition.
context: The output from the previous task that will be evaluated by the condition.
Returns:
bool: True if the task should be executed, False otherwise.
True if the task should be executed, False otherwise.
Raises:
ValueError: If no condition function is set.
"""
if self.condition is None:
raise ValueError("No condition function set for conditional task")
return self.condition(context)
def get_skipped_task_output(self):
def get_skipped_task_output(self) -> TaskOutput:
"""Generate a TaskOutput for when the conditional task is skipped.
Returns:
Empty TaskOutput with RAW format indicating the task was skipped.
"""
return TaskOutput(
description=self.description,
raw="",

View File

@@ -1,8 +1,16 @@
"""Task output format definitions for CrewAI."""
from enum import Enum
class OutputFormat(str, Enum):
"""Enum that represents the output format of a task."""
"""Enum that represents the output format of a task.
Attributes:
JSON: Output as JSON dictionary format
PYDANTIC: Output as Pydantic model instance
RAW: Output as raw unprocessed string
"""
JSON = "json"
PYDANTIC = "pydantic"

View File

@@ -1,5 +1,7 @@
"""Task output representation and formatting."""
import json
from typing import Any, Dict, Optional
from typing import Any
from pydantic import BaseModel, Field, model_validator
@@ -7,19 +9,31 @@ from crewai.tasks.output_format import OutputFormat
class TaskOutput(BaseModel):
"""Class that represents the result of a task."""
"""Class that represents the result of a task.
Attributes:
description: Description of the task
name: Optional name of the task
expected_output: Expected output of the task
summary: Summary of the task (auto-generated from description)
raw: Raw output of the task
pydantic: Pydantic model output of the task
json_dict: JSON dictionary output of the task
agent: Agent that executed the task
output_format: Output format of the task (JSON, PYDANTIC, or RAW)
"""
description: str = Field(description="Description of the task")
name: Optional[str] = Field(description="Name of the task", default=None)
expected_output: Optional[str] = Field(
name: str | None = Field(description="Name of the task", default=None)
expected_output: str | None = Field(
description="Expected output of the task", default=None
)
summary: Optional[str] = Field(description="Summary of the task", default=None)
summary: str | None = Field(description="Summary of the task", default=None)
raw: str = Field(description="Raw output of the task", default="")
pydantic: Optional[BaseModel] = Field(
pydantic: BaseModel | None = Field(
description="Pydantic output of task", default=None
)
json_dict: Optional[Dict[str, Any]] = Field(
json_dict: dict[str, Any] | None = Field(
description="JSON dictionary of task", default=None
)
agent: str = Field(description="Agent that executed the task")
@@ -29,13 +43,28 @@ class TaskOutput(BaseModel):
@model_validator(mode="after")
def set_summary(self):
"""Set the summary field based on the description."""
"""Set the summary field based on the description.
Returns:
Self with updated summary field.
"""
excerpt = " ".join(self.description.split(" ")[:10])
self.summary = f"{excerpt}..."
return self
@property
def json(self) -> Optional[str]:
def json(self) -> str | None: # type: ignore[override]
"""Get the JSON string representation of the task output.
Returns:
JSON string representation of the task output.
Raises:
ValueError: If output format is not JSON.
Notes:
TODO: Refactor to use model_dump_json() to avoid BaseModel method conflict
"""
if self.output_format != OutputFormat.JSON:
raise ValueError(
"""
@@ -47,8 +76,13 @@ class TaskOutput(BaseModel):
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary."""
def to_dict(self) -> dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary.
Returns:
Dictionary representation of the task output. Prioritizes json_dict
over pydantic model dump if both are available.
"""
output_dict = {}
if self.json_dict:
output_dict.update(self.json_dict)

View File

@@ -16,7 +16,7 @@ if TYPE_CHECKING:
from crewai.tools.base_tool import BaseTool
class ToolUsageLimitExceeded(Exception):
class ToolUsageLimitExceededError(Exception):
"""Exception raised when a tool has reached its maximum usage limit."""
pass
@@ -164,7 +164,7 @@ class CrewStructuredTool:
# Create model
schema_name = f"{name.title()}Schema"
return create_model(schema_name, **fields)
return create_model(schema_name, **fields) # type: ignore[call-overload]
def _validate_function_signature(self) -> None:
"""Validate that the function signature matches the args schema."""
@@ -207,13 +207,13 @@ class CrewStructuredTool:
raw_args = json.loads(raw_args)
except json.JSONDecodeError as e:
raise ValueError(f"Failed to parse arguments as JSON: {e}")
raise ValueError(f"Failed to parse arguments as JSON: {e}") from e
try:
validated_args = self.args_schema.model_validate(raw_args)
return validated_args.model_dump()
except Exception as e:
raise ValueError(f"Arguments validation failed: {e}")
raise ValueError(f"Arguments validation failed: {e}") from e
async def ainvoke(
self,
@@ -234,7 +234,7 @@ class CrewStructuredTool:
parsed_args = self._parse_args(input)
if self.has_reached_max_usage_count():
raise ToolUsageLimitExceeded(
raise ToolUsageLimitExceededError(
f"Tool '{self.name}' has reached its maximum usage limit of {self.max_usage_count}. You should not use the {self.name} tool again."
)
@@ -267,23 +267,20 @@ class CrewStructuredTool:
parsed_args = self._parse_args(input)
if self.has_reached_max_usage_count():
raise ToolUsageLimitExceeded(
raise ToolUsageLimitExceededError(
f"Tool '{self.name}' has reached its maximum usage limit of {self.max_usage_count}. You should not use the {self.name} tool again."
)
self._increment_usage_count()
if inspect.iscoroutinefunction(self.func):
result = asyncio.run(self.func(**parsed_args, **kwargs))
return result
return asyncio.run(self.func(**parsed_args, **kwargs))
try:
result = self.func(**parsed_args, **kwargs)
except Exception:
raise
result = self.func(**parsed_args, **kwargs)
if asyncio.iscoroutine(result):
return asyncio.run(result)

View File

@@ -34,10 +34,10 @@ def test_initialization(basic_function, schema_class):
args_schema=schema_class,
)
assert tool.name == "test_tool"
assert tool.description == "Test tool description"
assert tool.func == basic_function
assert tool.args_schema == schema_class
assert tool.name == "test_tool" # noqa: S101
assert tool.description == "Test tool description" # noqa: S101
assert tool.func == basic_function # noqa: S101
assert tool.args_schema == schema_class # noqa: S101
def test_from_function(basic_function):
"""Test creating tool from function"""
@@ -45,10 +45,10 @@ def test_from_function(basic_function):
func=basic_function, name="test_tool", description="Test description"
)
assert tool.name == "test_tool"
assert tool.description == "Test description"
assert tool.func == basic_function
assert isinstance(tool.args_schema, type(BaseModel))
assert tool.name == "test_tool" # noqa: S101
assert tool.description == "Test description" # noqa: S101
assert tool.func == basic_function # noqa: S101
assert isinstance(tool.args_schema, type(BaseModel)) # noqa: S101
def test_validate_function_signature(basic_function, schema_class):
"""Test function signature validation"""
@@ -68,23 +68,23 @@ async def test_ainvoke(basic_function):
tool = CrewStructuredTool.from_function(func=basic_function, name="test_tool")
result = await tool.ainvoke(input={"param1": "test"})
assert result == "test 0"
assert result == "test 0" # noqa: S101
def test_parse_args_dict(basic_function):
"""Test parsing dictionary arguments"""
tool = CrewStructuredTool.from_function(func=basic_function, name="test_tool")
parsed = tool._parse_args({"param1": "test", "param2": 42})
assert parsed["param1"] == "test"
assert parsed["param2"] == 42
assert parsed["param1"] == "test" # noqa: S101
assert parsed["param2"] == 42 # noqa: S101
def test_parse_args_string(basic_function):
"""Test parsing string arguments"""
tool = CrewStructuredTool.from_function(func=basic_function, name="test_tool")
parsed = tool._parse_args('{"param1": "test", "param2": 42}')
assert parsed["param1"] == "test"
assert parsed["param2"] == 42
assert parsed["param1"] == "test" # noqa: S101
assert parsed["param2"] == 42 # noqa: S101
def test_complex_types():
"""Test handling of complex parameter types"""
@@ -97,7 +97,7 @@ def test_complex_types():
func=complex_func, name="test_tool", description="Test complex types"
)
result = tool.invoke({"nested": {"key": "value"}, "items": [1, 2, 3]})
assert result == "Processed 3 items with 1 nested keys"
assert result == "Processed 3 items with 1 nested keys" # noqa: S101
def test_schema_inheritance():
"""Test tool creation with inherited schema"""
@@ -117,7 +117,7 @@ def test_schema_inheritance():
)
result = tool.invoke({"base_param": "test", "extra_param": 42})
assert result == "test 42"
assert result == "test 42" # noqa: S101
def test_default_values_in_schema():
"""Test handling of default values in schema"""
@@ -136,13 +136,39 @@ def test_default_values_in_schema():
# Test with minimal parameters
result = tool.invoke({"required_param": "test"})
assert result == "test default None"
assert result == "test default None" # noqa: S101
# Test with all parameters
result = tool.invoke(
{"required_param": "test", "optional_param": "custom", "nullable_param": 42}
)
assert result == "test custom 42"
assert result == "test custom 42" # noqa: S101
def test_tool_not_executed_twice():
"""Test that tool function is only executed once per invoke call (bug #3489)"""
call_count = 0
def counting_func(param: str) -> str:
"""Function that counts how many times it's called."""
nonlocal call_count
call_count += 1
return f"Called {call_count} times with {param}"
tool = CrewStructuredTool.from_function(
func=counting_func, name="counting_tool", description="Counts calls"
)
call_count = 0
result = tool.invoke({"param": "test"})
assert call_count == 1, f"Expected function to be called once, but was called {call_count} times" # noqa: S101
assert result == "Called 1 times with test" # noqa: S101
result = tool.invoke({"param": "test2"})
assert call_count == 2, f"Expected function to be called twice total, but was called {call_count} times" # noqa: S101
assert result == "Called 2 times with test2" # noqa: S101
@pytest.fixture
def custom_tool_decorator():
@@ -178,22 +204,21 @@ def build_simple_crew(tool):
description="Use the custom tool result as answer.", agent=agent1, expected_output="Use the tool result"
)
crew = Crew(agents=[agent1], tasks=[say_hi_task])
return crew
return Crew(agents=[agent1], tasks=[say_hi_task])
@pytest.mark.vcr(filter_headers=["authorization"])
def test_async_tool_using_within_isolated_crew(custom_tool):
crew = build_simple_crew(custom_tool)
result = crew.kickoff()
assert result.raw == "Hello World from Custom Tool"
assert result.raw == "Hello World from Custom Tool" # noqa: S101
@pytest.mark.vcr(filter_headers=["authorization"])
def test_async_tool_using_decorator_within_isolated_crew(custom_tool_decorator):
crew = build_simple_crew(custom_tool_decorator)
result = crew.kickoff()
assert result.raw == "Hello World from Custom Tool"
assert result.raw == "Hello World from Custom Tool" # noqa: S101
@pytest.mark.vcr(filter_headers=["authorization"])
def test_async_tool_within_flow(custom_tool):
@@ -205,12 +230,11 @@ def test_async_tool_within_flow(custom_tool):
@start()
async def start(self):
crew = build_simple_crew(custom_tool)
result = await crew.kickoff_async()
return result
return await crew.kickoff_async()
flow = StructuredExampleFlow()
result = flow.kickoff()
assert result.raw == "Hello World from Custom Tool"
assert result.raw == "Hello World from Custom Tool" # noqa: S101
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -222,9 +246,8 @@ def test_async_tool_using_decorator_within_flow(custom_tool_decorator):
@start()
async def start(self):
crew = build_simple_crew(custom_tool_decorator)
result = await crew.kickoff_async()
return result
return await crew.kickoff_async()
flow = StructuredExampleFlow()
result = flow.kickoff()
assert result.raw == "Hello World from Custom Tool"
assert result.raw == "Hello World from Custom Tool" # noqa: S101