diff --git a/src/crewai/security/constants.py b/src/crewai/security/constants.py new file mode 100644 index 000000000..c16a52665 --- /dev/null +++ b/src/crewai/security/constants.py @@ -0,0 +1,15 @@ +"""Security constants for CrewAI. + +This module contains security-related constants used throughout the security module. + +Notes: + - TODO: Determine if CREW_AI_NAMESPACE should be made dynamic or configurable +""" + +from typing import Annotated +from uuid import UUID + +CREW_AI_NAMESPACE: Annotated[ + UUID, + "Create a deterministic UUID using v5 (SHA-1). Custom namespace for CrewAI to enhance security.", +] = UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479") diff --git a/src/crewai/security/fingerprint.py b/src/crewai/security/fingerprint.py index 982c62492..1ed21405c 100644 --- a/src/crewai/security/fingerprint.py +++ b/src/crewai/security/fingerprint.py @@ -1,130 +1,123 @@ -""" -Fingerprint Module +"""Fingerprint Module This module provides functionality for generating and validating unique identifiers for CrewAI agents. These identifiers are used for tracking, auditing, and security. """ -import uuid from datetime import datetime -from typing import Any, Dict, Optional +from typing import Annotated, Any +from uuid import UUID, uuid4, uuid5 -from pydantic import BaseModel, ConfigDict, Field, field_validator +from pydantic import BaseModel, BeforeValidator, Field, PrivateAttr +from typing_extensions import Self + +from crewai.security.constants import CREW_AI_NAMESPACE + + +def _validate_metadata(v: Any) -> dict[str, Any]: + """Validate that metadata is a dictionary with string keys and valid values.""" + if not isinstance(v, dict): + raise ValueError("Metadata must be a dictionary") + + # Validate that all keys are strings + for key, value in v.items(): + if not isinstance(key, str): + raise ValueError(f"Metadata keys must be strings, got {type(key)}") + + # Validate nested dictionaries (prevent deeply nested structures) + if isinstance(value, dict): + # Check for nested dictionaries (limit depth to 1) + for nested_key, nested_value in value.items(): + if not isinstance(nested_key, str): + raise ValueError( + f"Nested metadata keys must be strings, got {type(nested_key)}" + ) + if isinstance(nested_value, dict): + raise ValueError("Metadata can only be nested one level deep") + + # Check for maximum metadata size (prevent DoS) + if len(str(v)) > 10_000: # Limit metadata size to 10KB + raise ValueError("Metadata size exceeds maximum allowed (10KB)") + + return v class Fingerprint(BaseModel): - """ - A class for generating and managing unique identifiers for agents. + """A class for generating and managing unique identifiers for agents. Each agent has dual identifiers: - Human-readable ID: For debugging and reference (derived from role if not specified) - Fingerprint UUID: Unique runtime identifier for tracking and auditing Attributes: - uuid_str (str): String representation of the UUID for this fingerprint, auto-generated - created_at (datetime): When this fingerprint was created, auto-generated - metadata (Dict[str, Any]): Additional metadata associated with this fingerprint + uuid_str: String representation of the UUID for this fingerprint, auto-generated + created_at: When this fingerprint was created, auto-generated + metadata: Additional metadata associated with this fingerprint """ - uuid_str: str = Field(default_factory=lambda: str(uuid.uuid4()), description="String representation of the UUID") - created_at: datetime = Field(default_factory=datetime.now, description="When this fingerprint was created") - metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata for this fingerprint") - - model_config = ConfigDict(arbitrary_types_allowed=True) - - @field_validator('metadata') - @classmethod - def validate_metadata(cls, v): - """Validate that metadata is a dictionary with string keys and valid values.""" - if not isinstance(v, dict): - raise ValueError("Metadata must be a dictionary") - - # Validate that all keys are strings - for key, value in v.items(): - if not isinstance(key, str): - raise ValueError(f"Metadata keys must be strings, got {type(key)}") - - # Validate nested dictionaries (prevent deeply nested structures) - if isinstance(value, dict): - # Check for nested dictionaries (limit depth to 1) - for nested_key, nested_value in value.items(): - if not isinstance(nested_key, str): - raise ValueError(f"Nested metadata keys must be strings, got {type(nested_key)}") - if isinstance(nested_value, dict): - raise ValueError("Metadata can only be nested one level deep") - - # Check for maximum metadata size (prevent DoS) - if len(str(v)) > 10000: # Limit metadata size to 10KB - raise ValueError("Metadata size exceeds maximum allowed (10KB)") - - return v - - def __init__(self, **data): - """Initialize a Fingerprint with auto-generated uuid_str and created_at.""" - # Remove uuid_str and created_at from data to ensure they're auto-generated - if 'uuid_str' in data: - data.pop('uuid_str') - if 'created_at' in data: - data.pop('created_at') - - # Call the parent constructor with the modified data - super().__init__(**data) + _uuid_str: str = PrivateAttr(default_factory=lambda: str(uuid4())) + _created_at: datetime = PrivateAttr(default_factory=datetime.now) + metadata: Annotated[dict[str, Any], BeforeValidator(_validate_metadata)] = Field( + default_factory=dict + ) @property - def uuid(self) -> uuid.UUID: + def uuid_str(self) -> str: + """Get the string representation of the UUID for this fingerprint.""" + return self._uuid_str + + @property + def created_at(self) -> datetime: + """Get the creation timestamp for this fingerprint.""" + return self._created_at + + @property + def uuid(self) -> UUID: """Get the UUID object for this fingerprint.""" - return uuid.UUID(self.uuid_str) + return UUID(self.uuid_str) @classmethod def _generate_uuid(cls, seed: str) -> str: - """ - Generate a deterministic UUID based on a seed string. + """Generate a deterministic UUID based on a seed string. Args: - seed (str): The seed string to use for UUID generation + seed: The seed string to use for UUID generation Returns: - str: A string representation of the UUID consistently generated from the seed + A string representation of the UUID consistently generated from the seed """ - if not isinstance(seed, str): - raise ValueError("Seed must be a string") - if not seed.strip(): raise ValueError("Seed cannot be empty or whitespace") - - # Create a deterministic UUID using v5 (SHA-1) - # Custom namespace for CrewAI to enhance security - # Using a unique namespace specific to CrewAI to reduce collision risks - CREW_AI_NAMESPACE = uuid.UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479') - return str(uuid.uuid5(CREW_AI_NAMESPACE, seed)) + return str(uuid5(CREW_AI_NAMESPACE, seed)) @classmethod - def generate(cls, seed: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> 'Fingerprint': - """ - Static factory method to create a new Fingerprint. + def generate( + cls, seed: str | None = None, metadata: dict[str, Any] | None = None + ) -> Self: + """Static factory method to create a new Fingerprint. Args: - seed (Optional[str]): A string to use as seed for the UUID generation. + seed: A string to use as seed for the UUID generation. If None, a random UUID is generated. - metadata (Optional[Dict[str, Any]]): Additional metadata to store with the fingerprint. + metadata: Additional metadata to store with the fingerprint. Returns: - Fingerprint: A new Fingerprint instance + A new Fingerprint instance """ fingerprint = cls(metadata=metadata or {}) if seed: - # For seed-based generation, we need to manually set the uuid_str after creation - object.__setattr__(fingerprint, 'uuid_str', cls._generate_uuid(seed)) + # For seed-based generation, we need to manually set the _uuid_str after creation + fingerprint.__dict__["_uuid_str"] = cls._generate_uuid(seed) return fingerprint def __str__(self) -> str: """String representation of the fingerprint (the UUID).""" return self.uuid_str - def __eq__(self, other) -> bool: + def __eq__(self, other: Any) -> bool: """Compare fingerprints by their UUID.""" - if isinstance(other, Fingerprint): + if type(other) is Fingerprint: return self.uuid_str == other.uuid_str return False @@ -132,29 +125,27 @@ class Fingerprint(BaseModel): """Hash of the fingerprint (based on UUID).""" return hash(self.uuid_str) - def to_dict(self) -> Dict[str, Any]: - """ - Convert the fingerprint to a dictionary representation. + def to_dict(self) -> dict[str, Any]: + """Convert the fingerprint to a dictionary representation. Returns: - Dict[str, Any]: Dictionary representation of the fingerprint + Dictionary representation of the fingerprint """ return { "uuid_str": self.uuid_str, "created_at": self.created_at.isoformat(), - "metadata": self.metadata + "metadata": self.metadata, } @classmethod - def from_dict(cls, data: Dict[str, Any]) -> 'Fingerprint': - """ - Create a Fingerprint from a dictionary representation. + def from_dict(cls, data: dict[str, Any]) -> Self: + """Create a Fingerprint from a dictionary representation. Args: - data (Dict[str, Any]): Dictionary representation of a fingerprint + data: Dictionary representation of a fingerprint Returns: - Fingerprint: A new Fingerprint instance + A new Fingerprint instance """ if not data: return cls() @@ -163,8 +154,10 @@ class Fingerprint(BaseModel): # For consistency with existing stored fingerprints, we need to manually set these if "uuid_str" in data: - object.__setattr__(fingerprint, 'uuid_str', data["uuid_str"]) + fingerprint.__dict__["_uuid_str"] = data["uuid_str"] if "created_at" in data and isinstance(data["created_at"], str): - object.__setattr__(fingerprint, 'created_at', datetime.fromisoformat(data["created_at"])) + fingerprint.__dict__["_created_at"] = datetime.fromisoformat( + data["created_at"] + ) return fingerprint diff --git a/src/crewai/security/security_config.py b/src/crewai/security/security_config.py index 9f680de42..8f037a0db 100644 --- a/src/crewai/security/security_config.py +++ b/src/crewai/security/security_config.py @@ -1,5 +1,4 @@ -""" -Security Configuration Module +"""Security Configuration Module This module provides configuration for CrewAI security features, including: - Authentication settings @@ -10,9 +9,10 @@ The SecurityConfig class is the primary interface for managing security settings in CrewAI applications. """ -from typing import Any, Dict, Optional +from typing import Any -from pydantic import BaseModel, ConfigDict, Field, model_validator +from pydantic import BaseModel, ConfigDict, Field, field_validator +from typing_extensions import Self from crewai.security.fingerprint import Fingerprint @@ -28,7 +28,6 @@ class SecurityConfig(BaseModel): - Impersonation/delegation tokens *TODO* Attributes: - version (str): Version of the security configuration fingerprint (Fingerprint): The unique fingerprint automatically generated for the component """ @@ -37,80 +36,52 @@ class SecurityConfig(BaseModel): # Note: Cannot use frozen=True as existing tests modify the fingerprint property ) - version: str = Field( - default="1.0.0", - description="Version of the security configuration" - ) - fingerprint: Fingerprint = Field( - default_factory=Fingerprint, - description="Unique identifier for the component" + default_factory=Fingerprint, description="Unique identifier for the component" ) - - def is_compatible(self, min_version: str) -> bool: - """ - Check if this security configuration is compatible with the minimum required version. - - Args: - min_version (str): Minimum required version in semver format (e.g., "1.0.0") - - Returns: - bool: True if this configuration is compatible, False otherwise - """ - # Simple version comparison (can be enhanced with packaging.version if needed) - current = [int(x) for x in self.version.split(".")] - minimum = [int(x) for x in min_version.split(".")] - - # Compare major, minor, patch versions - for c, m in zip(current, minimum): - if c > m: - return True - if c < m: - return False - return True - @model_validator(mode='before') + @field_validator("fingerprint", mode="before") @classmethod - def validate_fingerprint(cls, values): + def validate_fingerprint(cls, v: Any) -> Fingerprint: """Ensure fingerprint is properly initialized.""" - if isinstance(values, dict): - # Handle case where fingerprint is not provided or is None - if 'fingerprint' not in values or values['fingerprint'] is None: - values['fingerprint'] = Fingerprint() - # Handle case where fingerprint is a string (seed) - elif isinstance(values['fingerprint'], str): - if not values['fingerprint'].strip(): - raise ValueError("Fingerprint seed cannot be empty") - values['fingerprint'] = Fingerprint.generate(seed=values['fingerprint']) - return values + if v is None: + return Fingerprint() + if isinstance(v, str): + if not v.strip(): + raise ValueError("Fingerprint seed cannot be empty") + return Fingerprint.generate(seed=v) + if isinstance(v, dict): + return Fingerprint.from_dict(v) + if isinstance(v, Fingerprint): + return v - def to_dict(self) -> Dict[str, Any]: + raise ValueError(f"Invalid fingerprint type: {type(v)}") + + def to_dict(self) -> dict[str, Any]: """ Convert the security config to a dictionary. Returns: - Dict[str, Any]: Dictionary representation of the security config + Dictionary representation of the security config """ - result = { - "fingerprint": self.fingerprint.to_dict() - } - return result + return {"fingerprint": self.fingerprint.to_dict()} @classmethod - def from_dict(cls, data: Dict[str, Any]) -> 'SecurityConfig': + def from_dict(cls, data: dict[str, Any]) -> Self: """ Create a SecurityConfig from a dictionary. Args: - data (Dict[str, Any]): Dictionary representation of a security config + data: Dictionary representation of a security config Returns: - SecurityConfig: A new SecurityConfig instance + A new SecurityConfig instance """ - # Make a copy to avoid modifying the original - data_copy = data.copy() - - fingerprint_data = data_copy.pop("fingerprint", None) - fingerprint = Fingerprint.from_dict(fingerprint_data) if fingerprint_data else Fingerprint() + fingerprint_data = data.get("fingerprint") + fingerprint = ( + Fingerprint.from_dict(fingerprint_data) + if fingerprint_data + else Fingerprint() + ) return cls(fingerprint=fingerprint)