mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-25 12:22:38 +00:00
feat: add governance policy framework for ungoverned call sites (OWASP Agentic Top 10)
Addresses issue #5280 - Security audit identified 266 ungoverned call sites that could benefit from governance checks per OWASP Agentic Top 10 standards. Changes: - Add security/governance.py with SubprocessPolicy, HttpPolicy, ToolPolicy, GovernanceConfig classes supporting allowlist/blocklist and custom validators - Integrate governance into SecurityConfig for crew-level configuration - Add subprocess governance check in agent _validate_docker_installation - Add tool governance checks in execute_tool_and_check_finality (sync/async) - Add tool governance checks in crew_agent_executor native tool call path - Export governance types from security module - Add 42 comprehensive tests covering all policy types and integration points Governance is permissive by default (allows all) to maintain backward compatibility. Users can configure policies to restrict operations. Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -1216,16 +1216,32 @@ class Agent(BaseAgent):
|
||||
self._logger.log("warning", f"Failed to inject date: {e!s}")
|
||||
|
||||
def _validate_docker_installation(self) -> None:
|
||||
"""Check if Docker is installed and running."""
|
||||
"""Check if Docker is installed and running.
|
||||
|
||||
Validates the subprocess command against the agent's governance
|
||||
policy before execution (OWASP ASI08: Uncontrolled Code Execution).
|
||||
"""
|
||||
from crewai.security.governance import GovernanceError
|
||||
|
||||
docker_path = shutil.which("docker")
|
||||
if not docker_path:
|
||||
raise RuntimeError(
|
||||
f"Docker is not installed. Please install Docker to use code execution with agent: {self.role}"
|
||||
)
|
||||
|
||||
command = [str(docker_path), "info"]
|
||||
|
||||
# Validate subprocess command against governance policy
|
||||
try:
|
||||
self.security_config.governance.validate_subprocess(command)
|
||||
except GovernanceError as e:
|
||||
raise RuntimeError(
|
||||
f"Governance policy blocked Docker validation for agent '{self.role}': {e}"
|
||||
) from e
|
||||
|
||||
try:
|
||||
subprocess.run( # noqa: S603
|
||||
[str(docker_path), "info"],
|
||||
command,
|
||||
check=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
|
||||
@@ -962,6 +962,22 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
structured_tool = structured
|
||||
break
|
||||
|
||||
# Governance policy check (OWASP ASI02: Tool Misuse & Exploitation)
|
||||
governance_blocked = False
|
||||
if self.crew and hasattr(self.crew, "security_config"):
|
||||
from crewai.security.governance import GovernanceError
|
||||
|
||||
try:
|
||||
self.crew.security_config.governance.validate_tool(
|
||||
func_name, args_dict or {}
|
||||
)
|
||||
except GovernanceError as gov_err:
|
||||
governance_blocked = True
|
||||
result = (
|
||||
f"Tool execution blocked by governance policy. "
|
||||
f"Tool: {func_name}. Reason: {gov_err.detail}"
|
||||
)
|
||||
|
||||
hook_blocked = False
|
||||
before_hook_context = ToolCallHookContext(
|
||||
tool_name=func_name,
|
||||
@@ -972,20 +988,23 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
crew=self.crew,
|
||||
)
|
||||
before_hooks = get_before_tool_call_hooks()
|
||||
try:
|
||||
for hook in before_hooks:
|
||||
hook_result = hook(before_hook_context)
|
||||
if hook_result is False:
|
||||
hook_blocked = True
|
||||
break
|
||||
except Exception as hook_error:
|
||||
if self.agent.verbose:
|
||||
self._printer.print(
|
||||
content=f"Error in before_tool_call hook: {hook_error}",
|
||||
color="red",
|
||||
)
|
||||
if not governance_blocked:
|
||||
try:
|
||||
for hook in before_hooks:
|
||||
hook_result = hook(before_hook_context)
|
||||
if hook_result is False:
|
||||
hook_blocked = True
|
||||
break
|
||||
except Exception as hook_error:
|
||||
if self.agent.verbose:
|
||||
self._printer.print(
|
||||
content=f"Error in before_tool_call hook: {hook_error}",
|
||||
color="red",
|
||||
)
|
||||
|
||||
if hook_blocked:
|
||||
if governance_blocked:
|
||||
result = result # already set above
|
||||
elif hook_blocked:
|
||||
result = f"Tool execution blocked by hook. Tool: {func_name}"
|
||||
elif max_usage_reached and original_tool:
|
||||
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
|
||||
|
||||
@@ -8,7 +8,22 @@ This module provides security-related functionality for CrewAI, including:
|
||||
"""
|
||||
|
||||
from crewai.security.fingerprint import Fingerprint
|
||||
from crewai.security.governance import (
|
||||
GovernanceConfig,
|
||||
GovernanceError,
|
||||
HttpPolicy,
|
||||
SubprocessPolicy,
|
||||
ToolPolicy,
|
||||
)
|
||||
from crewai.security.security_config import SecurityConfig
|
||||
|
||||
|
||||
__all__ = ["Fingerprint", "SecurityConfig"]
|
||||
__all__ = [
|
||||
"Fingerprint",
|
||||
"GovernanceConfig",
|
||||
"GovernanceError",
|
||||
"HttpPolicy",
|
||||
"SecurityConfig",
|
||||
"SubprocessPolicy",
|
||||
"ToolPolicy",
|
||||
]
|
||||
|
||||
380
lib/crewai/src/crewai/security/governance.py
Normal file
380
lib/crewai/src/crewai/security/governance.py
Normal file
@@ -0,0 +1,380 @@
|
||||
"""Security Governance Module
|
||||
|
||||
This module provides configurable governance policies for CrewAI to address
|
||||
ungoverned call sites identified by security audits (OWASP Agentic Top 10).
|
||||
|
||||
Governance policies allow users to define allowlists and blocklists for:
|
||||
- Subprocess execution (ASI08: Uncontrolled Code Execution)
|
||||
- HTTP requests (ASI07: Data Leakage & Exfiltration)
|
||||
- Tool invocations (ASI02: Tool Misuse & Exploitation)
|
||||
|
||||
Each policy validates operations before they execute and raises
|
||||
GovernanceError when a policy violation is detected.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
import logging
|
||||
import re
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GovernanceError(Exception):
|
||||
"""Raised when a governance policy blocks an operation.
|
||||
|
||||
Attributes:
|
||||
category: The governance category that was violated
|
||||
(e.g., 'subprocess', 'http', 'tool').
|
||||
detail: A human-readable description of the violation.
|
||||
"""
|
||||
|
||||
def __init__(self, category: str, detail: str) -> None:
|
||||
self.category = category
|
||||
self.detail = detail
|
||||
super().__init__(f"[{category}] {detail}")
|
||||
|
||||
|
||||
class SubprocessPolicy(BaseModel):
|
||||
"""Policy governing subprocess execution.
|
||||
|
||||
Controls which subprocess commands are allowed or blocked during
|
||||
agent execution. By default, all commands are allowed unless
|
||||
explicitly configured.
|
||||
|
||||
Attributes:
|
||||
allowed_commands: If set, only these command basenames are permitted.
|
||||
Example: ["docker", "git", "uv"]
|
||||
blocked_commands: Commands that are always blocked, even if they
|
||||
appear in allowed_commands.
|
||||
Example: ["rm", "shutdown"]
|
||||
allow_shell: Whether shell=True is permitted in subprocess calls.
|
||||
Defaults to False for security.
|
||||
custom_validator: Optional callable that receives (command, kwargs)
|
||||
and returns True to allow or False to block.
|
||||
"""
|
||||
|
||||
allowed_commands: list[str] | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Allowlist of command basenames. "
|
||||
"If None, all commands are allowed (unless blocked)."
|
||||
),
|
||||
)
|
||||
blocked_commands: list[str] = Field(
|
||||
default_factory=list,
|
||||
description="Blocklist of command basenames that are always denied.",
|
||||
)
|
||||
allow_shell: bool = Field(
|
||||
default=False,
|
||||
description="Whether shell=True is permitted in subprocess calls.",
|
||||
)
|
||||
custom_validator: Callable[[list[str], dict[str, Any]], bool] | None = Field(
|
||||
default=None,
|
||||
exclude=True,
|
||||
description=(
|
||||
"Optional callable(command_list, kwargs) -> bool. "
|
||||
"Return True to allow, False to block."
|
||||
),
|
||||
)
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
def validate_command(
|
||||
self, command: list[str], *, shell: bool = False, **kwargs: Any
|
||||
) -> None:
|
||||
"""Validate a subprocess command against this policy.
|
||||
|
||||
Args:
|
||||
command: The command as a list of strings (e.g., ["docker", "info"]).
|
||||
shell: Whether shell mode is requested.
|
||||
**kwargs: Additional subprocess keyword arguments.
|
||||
|
||||
Raises:
|
||||
GovernanceError: If the command violates this policy.
|
||||
"""
|
||||
if not command:
|
||||
raise GovernanceError("subprocess", "Empty command is not allowed.")
|
||||
|
||||
if shell and not self.allow_shell:
|
||||
raise GovernanceError(
|
||||
"subprocess",
|
||||
"shell=True is not permitted by the subprocess policy.",
|
||||
)
|
||||
|
||||
cmd_basename = command[0].rsplit("/", 1)[-1]
|
||||
|
||||
if cmd_basename in self.blocked_commands:
|
||||
raise GovernanceError(
|
||||
"subprocess",
|
||||
f"Command '{cmd_basename}' is blocked by policy.",
|
||||
)
|
||||
|
||||
if self.allowed_commands is not None and cmd_basename not in self.allowed_commands:
|
||||
raise GovernanceError(
|
||||
"subprocess",
|
||||
f"Command '{cmd_basename}' is not in the allowed commands list: "
|
||||
f"{self.allowed_commands}.",
|
||||
)
|
||||
|
||||
if self.custom_validator is not None:
|
||||
if not self.custom_validator(command, kwargs):
|
||||
raise GovernanceError(
|
||||
"subprocess",
|
||||
f"Command '{' '.join(command)}' was rejected by custom validator.",
|
||||
)
|
||||
|
||||
logger.debug("Subprocess governance: allowed command '%s'", " ".join(command))
|
||||
|
||||
|
||||
class HttpPolicy(BaseModel):
|
||||
"""Policy governing HTTP requests.
|
||||
|
||||
Controls which HTTP endpoints agents are allowed to call. By default,
|
||||
all requests are allowed unless explicitly configured.
|
||||
|
||||
Attributes:
|
||||
allowed_domains: If set, only requests to these domains are allowed.
|
||||
Example: ["api.openai.com", "api.anthropic.com"]
|
||||
blocked_domains: Domains that are always blocked.
|
||||
Example: ["evil.example.com"]
|
||||
allowed_url_patterns: Regex patterns that URLs must match.
|
||||
Example: [r"https://api\\.openai\\.com/.*"]
|
||||
custom_validator: Optional callable that receives (url, method, kwargs)
|
||||
and returns True to allow or False to block.
|
||||
"""
|
||||
|
||||
allowed_domains: list[str] | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Allowlist of domains. "
|
||||
"If None, all domains are allowed (unless blocked)."
|
||||
),
|
||||
)
|
||||
blocked_domains: list[str] = Field(
|
||||
default_factory=list,
|
||||
description="Blocklist of domains that are always denied.",
|
||||
)
|
||||
allowed_url_patterns: list[str] | None = Field(
|
||||
default=None,
|
||||
description="Regex patterns that requested URLs must match.",
|
||||
)
|
||||
custom_validator: Callable[[str, str, dict[str, Any]], bool] | None = Field(
|
||||
default=None,
|
||||
exclude=True,
|
||||
description=(
|
||||
"Optional callable(url, method, kwargs) -> bool. "
|
||||
"Return True to allow, False to block."
|
||||
),
|
||||
)
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
def validate_request(
|
||||
self, url: str, method: str = "GET", **kwargs: Any
|
||||
) -> None:
|
||||
"""Validate an HTTP request against this policy.
|
||||
|
||||
Args:
|
||||
url: The target URL.
|
||||
method: HTTP method (GET, POST, etc.).
|
||||
**kwargs: Additional request keyword arguments.
|
||||
|
||||
Raises:
|
||||
GovernanceError: If the request violates this policy.
|
||||
"""
|
||||
parsed = urlparse(url)
|
||||
domain = parsed.hostname or ""
|
||||
|
||||
if domain in self.blocked_domains:
|
||||
raise GovernanceError(
|
||||
"http",
|
||||
f"Domain '{domain}' is blocked by policy.",
|
||||
)
|
||||
|
||||
if self.allowed_domains is not None and domain not in self.allowed_domains:
|
||||
raise GovernanceError(
|
||||
"http",
|
||||
f"Domain '{domain}' is not in the allowed domains list: "
|
||||
f"{self.allowed_domains}.",
|
||||
)
|
||||
|
||||
if self.allowed_url_patterns is not None:
|
||||
matched = any(
|
||||
re.match(pattern, url) for pattern in self.allowed_url_patterns
|
||||
)
|
||||
if not matched:
|
||||
raise GovernanceError(
|
||||
"http",
|
||||
f"URL '{url}' does not match any allowed URL pattern.",
|
||||
)
|
||||
|
||||
if self.custom_validator is not None:
|
||||
if not self.custom_validator(url, method, kwargs):
|
||||
raise GovernanceError(
|
||||
"http",
|
||||
f"Request to '{url}' ({method}) was rejected by custom validator.",
|
||||
)
|
||||
|
||||
logger.debug("HTTP governance: allowed %s %s", method, url)
|
||||
|
||||
|
||||
class ToolPolicy(BaseModel):
|
||||
"""Policy governing tool invocations.
|
||||
|
||||
Controls which tools agents are allowed to use. By default, all tools
|
||||
are allowed unless explicitly configured.
|
||||
|
||||
Attributes:
|
||||
allowed_tools: If set, only tools with these names can be invoked.
|
||||
Example: ["search", "read_file"]
|
||||
blocked_tools: Tools that are always blocked.
|
||||
Example: ["delete_database", "execute_code"]
|
||||
custom_validator: Optional callable that receives
|
||||
(tool_name, tool_input) and returns True to allow or False to block.
|
||||
"""
|
||||
|
||||
allowed_tools: list[str] | None = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Allowlist of tool names. "
|
||||
"If None, all tools are allowed (unless blocked)."
|
||||
),
|
||||
)
|
||||
blocked_tools: list[str] = Field(
|
||||
default_factory=list,
|
||||
description="Blocklist of tool names that are always denied.",
|
||||
)
|
||||
custom_validator: Callable[[str, dict[str, Any]], bool] | None = Field(
|
||||
default=None,
|
||||
exclude=True,
|
||||
description=(
|
||||
"Optional callable(tool_name, tool_input) -> bool. "
|
||||
"Return True to allow, False to block."
|
||||
),
|
||||
)
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
def validate_tool(
|
||||
self, tool_name: str, tool_input: dict[str, Any] | None = None
|
||||
) -> None:
|
||||
"""Validate a tool invocation against this policy.
|
||||
|
||||
Args:
|
||||
tool_name: Name of the tool being invoked.
|
||||
tool_input: Input arguments for the tool.
|
||||
|
||||
Raises:
|
||||
GovernanceError: If the tool invocation violates this policy.
|
||||
"""
|
||||
if tool_name in self.blocked_tools:
|
||||
raise GovernanceError(
|
||||
"tool",
|
||||
f"Tool '{tool_name}' is blocked by policy.",
|
||||
)
|
||||
|
||||
if self.allowed_tools is not None and tool_name not in self.allowed_tools:
|
||||
raise GovernanceError(
|
||||
"tool",
|
||||
f"Tool '{tool_name}' is not in the allowed tools list: "
|
||||
f"{self.allowed_tools}.",
|
||||
)
|
||||
|
||||
if self.custom_validator is not None:
|
||||
if not self.custom_validator(tool_name, tool_input or {}):
|
||||
raise GovernanceError(
|
||||
"tool",
|
||||
f"Tool '{tool_name}' was rejected by custom validator.",
|
||||
)
|
||||
|
||||
logger.debug("Tool governance: allowed tool '%s'", tool_name)
|
||||
|
||||
|
||||
class GovernanceConfig(BaseModel):
|
||||
"""Aggregated governance configuration for a CrewAI crew.
|
||||
|
||||
Combines subprocess, HTTP, and tool policies into a single
|
||||
configuration object that can be attached to a Crew's SecurityConfig.
|
||||
|
||||
Example:
|
||||
>>> governance = GovernanceConfig(
|
||||
... subprocess_policy=SubprocessPolicy(
|
||||
... allowed_commands=["docker", "git"],
|
||||
... blocked_commands=["rm"],
|
||||
... ),
|
||||
... http_policy=HttpPolicy(
|
||||
... allowed_domains=["api.openai.com"],
|
||||
... ),
|
||||
... tool_policy=ToolPolicy(
|
||||
... blocked_tools=["delete_database"],
|
||||
... ),
|
||||
... )
|
||||
>>> crew = Crew(
|
||||
... security_config=SecurityConfig(governance=governance),
|
||||
... ...
|
||||
... )
|
||||
"""
|
||||
|
||||
subprocess_policy: SubprocessPolicy = Field(
|
||||
default_factory=SubprocessPolicy,
|
||||
description="Policy for subprocess execution governance.",
|
||||
)
|
||||
http_policy: HttpPolicy = Field(
|
||||
default_factory=HttpPolicy,
|
||||
description="Policy for HTTP request governance.",
|
||||
)
|
||||
tool_policy: ToolPolicy = Field(
|
||||
default_factory=ToolPolicy,
|
||||
description="Policy for tool invocation governance.",
|
||||
)
|
||||
|
||||
def validate_subprocess(
|
||||
self, command: list[str], *, shell: bool = False, **kwargs: Any
|
||||
) -> None:
|
||||
"""Validate a subprocess command.
|
||||
|
||||
Args:
|
||||
command: The command as a list of strings.
|
||||
shell: Whether shell mode is requested.
|
||||
**kwargs: Additional subprocess keyword arguments.
|
||||
|
||||
Raises:
|
||||
GovernanceError: If the command violates the subprocess policy.
|
||||
"""
|
||||
self.subprocess_policy.validate_command(command, shell=shell, **kwargs)
|
||||
|
||||
def validate_http(
|
||||
self, url: str, method: str = "GET", **kwargs: Any
|
||||
) -> None:
|
||||
"""Validate an HTTP request.
|
||||
|
||||
Args:
|
||||
url: The target URL.
|
||||
method: HTTP method.
|
||||
**kwargs: Additional request keyword arguments.
|
||||
|
||||
Raises:
|
||||
GovernanceError: If the request violates the HTTP policy.
|
||||
"""
|
||||
self.http_policy.validate_request(url, method, **kwargs)
|
||||
|
||||
def validate_tool(
|
||||
self, tool_name: str, tool_input: dict[str, Any] | None = None
|
||||
) -> None:
|
||||
"""Validate a tool invocation.
|
||||
|
||||
Args:
|
||||
tool_name: Name of the tool.
|
||||
tool_input: Input arguments for the tool.
|
||||
|
||||
Raises:
|
||||
GovernanceError: If the tool invocation violates the tool policy.
|
||||
"""
|
||||
self.tool_policy.validate_tool(tool_name, tool_input)
|
||||
@@ -9,12 +9,15 @@ The SecurityConfig class is the primary interface for managing security settings
|
||||
in CrewAI applications.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field, field_validator
|
||||
from typing_extensions import Self
|
||||
|
||||
from crewai.security.fingerprint import Fingerprint
|
||||
from crewai.security.governance import GovernanceConfig
|
||||
|
||||
|
||||
class SecurityConfig(BaseModel):
|
||||
@@ -39,6 +42,13 @@ class SecurityConfig(BaseModel):
|
||||
fingerprint: Fingerprint = Field(
|
||||
default_factory=Fingerprint, description="Unique identifier for the component"
|
||||
)
|
||||
governance: GovernanceConfig = Field(
|
||||
default_factory=GovernanceConfig,
|
||||
description=(
|
||||
"Governance policies for controlling subprocess execution, "
|
||||
"HTTP requests, and tool invocations."
|
||||
),
|
||||
)
|
||||
|
||||
@field_validator("fingerprint", mode="before")
|
||||
@classmethod
|
||||
@@ -64,7 +74,10 @@ class SecurityConfig(BaseModel):
|
||||
Returns:
|
||||
Dictionary representation of the security config
|
||||
"""
|
||||
return {"fingerprint": self.fingerprint.to_dict()}
|
||||
return {
|
||||
"fingerprint": self.fingerprint.to_dict(),
|
||||
"governance": self.governance.model_dump(),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict[str, Any]) -> Self:
|
||||
@@ -84,4 +97,11 @@ class SecurityConfig(BaseModel):
|
||||
else Fingerprint()
|
||||
)
|
||||
|
||||
return cls(fingerprint=fingerprint)
|
||||
governance_data = data.get("governance")
|
||||
governance = (
|
||||
GovernanceConfig(**governance_data)
|
||||
if governance_data
|
||||
else GovernanceConfig()
|
||||
)
|
||||
|
||||
return cls(fingerprint=fingerprint, governance=governance)
|
||||
|
||||
@@ -10,6 +10,7 @@ from crewai.hooks.tool_hooks import (
|
||||
get_before_tool_call_hooks,
|
||||
)
|
||||
from crewai.security.fingerprint import Fingerprint
|
||||
from crewai.security.governance import GovernanceError
|
||||
from crewai.tools.structured_tool import CrewStructuredTool
|
||||
from crewai.tools.tool_types import ToolResult
|
||||
from crewai.tools.tool_usage import ToolUsage, ToolUsageError
|
||||
@@ -95,6 +96,21 @@ async def aexecute_tool_and_check_finality(
|
||||
tool = tool_name_to_tool_map.get(sanitized_tool_name)
|
||||
if tool:
|
||||
tool_input = tool_calling.arguments if tool_calling.arguments else {}
|
||||
|
||||
# Governance policy check (OWASP ASI02: Tool Misuse & Exploitation)
|
||||
if crew and hasattr(crew, "security_config"):
|
||||
try:
|
||||
crew.security_config.governance.validate_tool(
|
||||
sanitized_tool_name, tool_input
|
||||
)
|
||||
except GovernanceError as e:
|
||||
blocked_message = (
|
||||
f"Tool execution blocked by governance policy. "
|
||||
f"Tool: {tool_calling.tool_name}. Reason: {e.detail}"
|
||||
)
|
||||
logger.log("warning", blocked_message)
|
||||
return ToolResult(blocked_message, False)
|
||||
|
||||
hook_context = ToolCallHookContext(
|
||||
tool_name=tool_calling.tool_name,
|
||||
tool_input=tool_input,
|
||||
@@ -215,6 +231,21 @@ def execute_tool_and_check_finality(
|
||||
tool = tool_name_to_tool_map.get(sanitized_tool_name)
|
||||
if tool:
|
||||
tool_input = tool_calling.arguments if tool_calling.arguments else {}
|
||||
|
||||
# Governance policy check (OWASP ASI02: Tool Misuse & Exploitation)
|
||||
if crew and hasattr(crew, "security_config"):
|
||||
try:
|
||||
crew.security_config.governance.validate_tool(
|
||||
sanitized_tool_name, tool_input
|
||||
)
|
||||
except GovernanceError as e:
|
||||
blocked_message = (
|
||||
f"Tool execution blocked by governance policy. "
|
||||
f"Tool: {tool_calling.tool_name}. Reason: {e.detail}"
|
||||
)
|
||||
logger.log("warning", blocked_message)
|
||||
return ToolResult(blocked_message, False)
|
||||
|
||||
hook_context = ToolCallHookContext(
|
||||
tool_name=tool_calling.tool_name,
|
||||
tool_input=tool_input,
|
||||
|
||||
577
lib/crewai/tests/security/test_governance.py
Normal file
577
lib/crewai/tests/security/test_governance.py
Normal file
@@ -0,0 +1,577 @@
|
||||
"""Tests for the security governance module.
|
||||
|
||||
Tests cover:
|
||||
- SubprocessPolicy: command allowlist/blocklist, shell validation, custom validators
|
||||
- HttpPolicy: domain allowlist/blocklist, URL pattern matching, custom validators
|
||||
- ToolPolicy: tool allowlist/blocklist, custom validators
|
||||
- GovernanceConfig: aggregated policy validation
|
||||
- Integration with SecurityConfig
|
||||
- Integration with agent subprocess calls
|
||||
- Integration with tool execution governance
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.security import (
|
||||
GovernanceConfig,
|
||||
GovernanceError,
|
||||
HttpPolicy,
|
||||
SecurityConfig,
|
||||
SubprocessPolicy,
|
||||
ToolPolicy,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SubprocessPolicy tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSubprocessPolicy:
|
||||
"""Tests for SubprocessPolicy."""
|
||||
|
||||
def test_default_policy_allows_all(self) -> None:
|
||||
"""Default policy with no restrictions should allow any command."""
|
||||
policy = SubprocessPolicy()
|
||||
# Should not raise
|
||||
policy.validate_command(["docker", "info"])
|
||||
policy.validate_command(["git", "status"])
|
||||
policy.validate_command(["python", "-c", "print('hello')"])
|
||||
|
||||
def test_allowed_commands_allowlist(self) -> None:
|
||||
"""Only commands in the allowlist should be permitted."""
|
||||
policy = SubprocessPolicy(allowed_commands=["docker", "git"])
|
||||
policy.validate_command(["docker", "info"]) # allowed
|
||||
policy.validate_command(["git", "status"]) # allowed
|
||||
|
||||
with pytest.raises(GovernanceError, match="not in the allowed commands list"):
|
||||
policy.validate_command(["rm", "-rf", "/"])
|
||||
|
||||
def test_blocked_commands_blocklist(self) -> None:
|
||||
"""Blocked commands should always be denied."""
|
||||
policy = SubprocessPolicy(blocked_commands=["rm", "shutdown"])
|
||||
policy.validate_command(["docker", "info"]) # allowed
|
||||
|
||||
with pytest.raises(GovernanceError, match="blocked by policy"):
|
||||
policy.validate_command(["rm", "-rf", "/"])
|
||||
|
||||
with pytest.raises(GovernanceError, match="blocked by policy"):
|
||||
policy.validate_command(["shutdown", "-h", "now"])
|
||||
|
||||
def test_blocked_takes_precedence_over_allowed(self) -> None:
|
||||
"""A command in both allowed and blocked should be denied."""
|
||||
policy = SubprocessPolicy(
|
||||
allowed_commands=["docker", "rm"],
|
||||
blocked_commands=["rm"],
|
||||
)
|
||||
policy.validate_command(["docker", "info"]) # allowed
|
||||
|
||||
with pytest.raises(GovernanceError, match="blocked by policy"):
|
||||
policy.validate_command(["rm", "-rf", "/"])
|
||||
|
||||
def test_shell_not_allowed_by_default(self) -> None:
|
||||
"""shell=True should be blocked by default."""
|
||||
policy = SubprocessPolicy()
|
||||
|
||||
with pytest.raises(GovernanceError, match="shell=True is not permitted"):
|
||||
policy.validate_command(["echo", "hello"], shell=True)
|
||||
|
||||
def test_shell_allowed_when_configured(self) -> None:
|
||||
"""shell=True should be allowed when explicitly configured."""
|
||||
policy = SubprocessPolicy(allow_shell=True)
|
||||
# Should not raise
|
||||
policy.validate_command(["echo", "hello"], shell=True)
|
||||
|
||||
def test_empty_command_rejected(self) -> None:
|
||||
"""Empty command list should be rejected."""
|
||||
policy = SubprocessPolicy()
|
||||
|
||||
with pytest.raises(GovernanceError, match="Empty command"):
|
||||
policy.validate_command([])
|
||||
|
||||
def test_custom_validator_allows(self) -> None:
|
||||
"""Custom validator returning True should allow execution."""
|
||||
validator = MagicMock(return_value=True)
|
||||
policy = SubprocessPolicy(custom_validator=validator)
|
||||
policy.validate_command(["docker", "info"])
|
||||
validator.assert_called_once()
|
||||
|
||||
def test_custom_validator_blocks(self) -> None:
|
||||
"""Custom validator returning False should block execution."""
|
||||
validator = MagicMock(return_value=False)
|
||||
policy = SubprocessPolicy(custom_validator=validator)
|
||||
|
||||
with pytest.raises(GovernanceError, match="rejected by custom validator"):
|
||||
policy.validate_command(["docker", "info"])
|
||||
|
||||
def test_command_basename_extraction(self) -> None:
|
||||
"""Full paths should be reduced to basename for matching."""
|
||||
policy = SubprocessPolicy(allowed_commands=["docker"])
|
||||
# Should match "docker" even with full path
|
||||
policy.validate_command(["/usr/bin/docker", "info"])
|
||||
|
||||
def test_governance_error_attributes(self) -> None:
|
||||
"""GovernanceError should carry category and detail."""
|
||||
policy = SubprocessPolicy(blocked_commands=["rm"])
|
||||
|
||||
with pytest.raises(GovernanceError) as exc_info:
|
||||
policy.validate_command(["rm", "-rf", "/"])
|
||||
|
||||
error = exc_info.value
|
||||
assert error.category == "subprocess"
|
||||
assert "rm" in error.detail
|
||||
assert "blocked by policy" in error.detail
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HttpPolicy tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestHttpPolicy:
|
||||
"""Tests for HttpPolicy."""
|
||||
|
||||
def test_default_policy_allows_all(self) -> None:
|
||||
"""Default policy with no restrictions should allow any URL."""
|
||||
policy = HttpPolicy()
|
||||
policy.validate_request("https://api.openai.com/v1/chat")
|
||||
policy.validate_request("https://example.com/data", method="POST")
|
||||
|
||||
def test_allowed_domains_allowlist(self) -> None:
|
||||
"""Only requests to allowed domains should be permitted."""
|
||||
policy = HttpPolicy(allowed_domains=["api.openai.com", "api.anthropic.com"])
|
||||
policy.validate_request("https://api.openai.com/v1/chat")
|
||||
|
||||
with pytest.raises(GovernanceError, match="not in the allowed domains list"):
|
||||
policy.validate_request("https://evil.example.com/steal")
|
||||
|
||||
def test_blocked_domains_blocklist(self) -> None:
|
||||
"""Blocked domains should always be denied."""
|
||||
policy = HttpPolicy(blocked_domains=["evil.example.com"])
|
||||
policy.validate_request("https://api.openai.com/v1/chat")
|
||||
|
||||
with pytest.raises(GovernanceError, match="blocked by policy"):
|
||||
policy.validate_request("https://evil.example.com/steal")
|
||||
|
||||
def test_blocked_takes_precedence_over_allowed(self) -> None:
|
||||
"""A domain in both allowed and blocked should be denied."""
|
||||
policy = HttpPolicy(
|
||||
allowed_domains=["api.openai.com", "evil.example.com"],
|
||||
blocked_domains=["evil.example.com"],
|
||||
)
|
||||
|
||||
with pytest.raises(GovernanceError, match="blocked by policy"):
|
||||
policy.validate_request("https://evil.example.com/data")
|
||||
|
||||
def test_url_pattern_matching(self) -> None:
|
||||
"""URLs should be validated against regex patterns."""
|
||||
policy = HttpPolicy(
|
||||
allowed_url_patterns=[r"https://api\.openai\.com/.*"]
|
||||
)
|
||||
policy.validate_request("https://api.openai.com/v1/chat")
|
||||
|
||||
with pytest.raises(GovernanceError, match="does not match any allowed URL pattern"):
|
||||
policy.validate_request("https://evil.com/steal")
|
||||
|
||||
def test_custom_validator_allows(self) -> None:
|
||||
"""Custom validator returning True should allow the request."""
|
||||
validator = MagicMock(return_value=True)
|
||||
policy = HttpPolicy(custom_validator=validator)
|
||||
policy.validate_request("https://api.openai.com/v1/chat", method="POST")
|
||||
validator.assert_called_once()
|
||||
|
||||
def test_custom_validator_blocks(self) -> None:
|
||||
"""Custom validator returning False should block the request."""
|
||||
validator = MagicMock(return_value=False)
|
||||
policy = HttpPolicy(custom_validator=validator)
|
||||
|
||||
with pytest.raises(GovernanceError, match="rejected by custom validator"):
|
||||
policy.validate_request("https://api.openai.com/v1/chat")
|
||||
|
||||
def test_governance_error_attributes(self) -> None:
|
||||
"""GovernanceError should carry category and detail for HTTP."""
|
||||
policy = HttpPolicy(blocked_domains=["evil.com"])
|
||||
|
||||
with pytest.raises(GovernanceError) as exc_info:
|
||||
policy.validate_request("https://evil.com/data")
|
||||
|
||||
error = exc_info.value
|
||||
assert error.category == "http"
|
||||
assert "evil.com" in error.detail
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ToolPolicy tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestToolPolicy:
|
||||
"""Tests for ToolPolicy."""
|
||||
|
||||
def test_default_policy_allows_all(self) -> None:
|
||||
"""Default policy with no restrictions should allow any tool."""
|
||||
policy = ToolPolicy()
|
||||
policy.validate_tool("search", {"query": "hello"})
|
||||
policy.validate_tool("read_file", {"path": "/tmp/test"})
|
||||
|
||||
def test_allowed_tools_allowlist(self) -> None:
|
||||
"""Only tools in the allowlist should be permitted."""
|
||||
policy = ToolPolicy(allowed_tools=["search", "read_file"])
|
||||
policy.validate_tool("search", {"query": "hello"})
|
||||
|
||||
with pytest.raises(GovernanceError, match="not in the allowed tools list"):
|
||||
policy.validate_tool("delete_database", {})
|
||||
|
||||
def test_blocked_tools_blocklist(self) -> None:
|
||||
"""Blocked tools should always be denied."""
|
||||
policy = ToolPolicy(blocked_tools=["delete_database", "execute_code"])
|
||||
policy.validate_tool("search", {"query": "hello"})
|
||||
|
||||
with pytest.raises(GovernanceError, match="blocked by policy"):
|
||||
policy.validate_tool("delete_database", {})
|
||||
|
||||
def test_blocked_takes_precedence_over_allowed(self) -> None:
|
||||
"""A tool in both allowed and blocked should be denied."""
|
||||
policy = ToolPolicy(
|
||||
allowed_tools=["search", "delete_database"],
|
||||
blocked_tools=["delete_database"],
|
||||
)
|
||||
|
||||
with pytest.raises(GovernanceError, match="blocked by policy"):
|
||||
policy.validate_tool("delete_database", {})
|
||||
|
||||
def test_custom_validator_allows(self) -> None:
|
||||
"""Custom validator returning True should allow the tool."""
|
||||
validator = MagicMock(return_value=True)
|
||||
policy = ToolPolicy(custom_validator=validator)
|
||||
policy.validate_tool("search", {"query": "hello"})
|
||||
validator.assert_called_once_with("search", {"query": "hello"})
|
||||
|
||||
def test_custom_validator_blocks(self) -> None:
|
||||
"""Custom validator returning False should block the tool."""
|
||||
validator = MagicMock(return_value=False)
|
||||
policy = ToolPolicy(custom_validator=validator)
|
||||
|
||||
with pytest.raises(GovernanceError, match="rejected by custom validator"):
|
||||
policy.validate_tool("search", {"query": "hello"})
|
||||
|
||||
def test_none_input_handled(self) -> None:
|
||||
"""None input should be handled gracefully."""
|
||||
policy = ToolPolicy()
|
||||
policy.validate_tool("search", None)
|
||||
|
||||
def test_governance_error_attributes(self) -> None:
|
||||
"""GovernanceError should carry category and detail for tools."""
|
||||
policy = ToolPolicy(blocked_tools=["danger"])
|
||||
|
||||
with pytest.raises(GovernanceError) as exc_info:
|
||||
policy.validate_tool("danger", {})
|
||||
|
||||
error = exc_info.value
|
||||
assert error.category == "tool"
|
||||
assert "danger" in error.detail
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GovernanceConfig tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGovernanceConfig:
|
||||
"""Tests for the aggregated GovernanceConfig."""
|
||||
|
||||
def test_default_governance_allows_all(self) -> None:
|
||||
"""Default GovernanceConfig should allow all operations."""
|
||||
config = GovernanceConfig()
|
||||
config.validate_subprocess(["docker", "info"])
|
||||
config.validate_http("https://api.openai.com/v1/chat")
|
||||
config.validate_tool("search", {"query": "hello"})
|
||||
|
||||
def test_governance_with_all_policies(self) -> None:
|
||||
"""GovernanceConfig should enforce all configured policies."""
|
||||
config = GovernanceConfig(
|
||||
subprocess_policy=SubprocessPolicy(
|
||||
allowed_commands=["docker"],
|
||||
blocked_commands=["rm"],
|
||||
),
|
||||
http_policy=HttpPolicy(
|
||||
allowed_domains=["api.openai.com"],
|
||||
),
|
||||
tool_policy=ToolPolicy(
|
||||
blocked_tools=["delete_database"],
|
||||
),
|
||||
)
|
||||
|
||||
# Allowed operations
|
||||
config.validate_subprocess(["docker", "info"])
|
||||
config.validate_http("https://api.openai.com/v1/chat")
|
||||
config.validate_tool("search", {"query": "hello"})
|
||||
|
||||
# Blocked operations
|
||||
with pytest.raises(GovernanceError, match="subprocess"):
|
||||
config.validate_subprocess(["rm", "-rf", "/"])
|
||||
|
||||
with pytest.raises(GovernanceError, match="http"):
|
||||
config.validate_http("https://evil.com/steal")
|
||||
|
||||
with pytest.raises(GovernanceError, match="tool"):
|
||||
config.validate_tool("delete_database", {})
|
||||
|
||||
def test_governance_serialization(self) -> None:
|
||||
"""GovernanceConfig should be serializable."""
|
||||
config = GovernanceConfig(
|
||||
subprocess_policy=SubprocessPolicy(
|
||||
allowed_commands=["docker"],
|
||||
blocked_commands=["rm"],
|
||||
),
|
||||
http_policy=HttpPolicy(
|
||||
blocked_domains=["evil.com"],
|
||||
),
|
||||
tool_policy=ToolPolicy(
|
||||
blocked_tools=["danger"],
|
||||
),
|
||||
)
|
||||
|
||||
data = config.model_dump()
|
||||
assert data["subprocess_policy"]["allowed_commands"] == ["docker"]
|
||||
assert data["subprocess_policy"]["blocked_commands"] == ["rm"]
|
||||
assert data["http_policy"]["blocked_domains"] == ["evil.com"]
|
||||
assert data["tool_policy"]["blocked_tools"] == ["danger"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SecurityConfig integration tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSecurityConfigGovernance:
|
||||
"""Tests for governance integration in SecurityConfig."""
|
||||
|
||||
def test_security_config_has_default_governance(self) -> None:
|
||||
"""SecurityConfig should have default governance that allows all."""
|
||||
config = SecurityConfig()
|
||||
assert config.governance is not None
|
||||
assert isinstance(config.governance, GovernanceConfig)
|
||||
|
||||
# Default governance allows everything
|
||||
config.governance.validate_subprocess(["docker", "info"])
|
||||
config.governance.validate_http("https://api.openai.com/v1/chat")
|
||||
config.governance.validate_tool("search", {})
|
||||
|
||||
def test_security_config_with_custom_governance(self) -> None:
|
||||
"""SecurityConfig should accept a custom GovernanceConfig."""
|
||||
governance = GovernanceConfig(
|
||||
subprocess_policy=SubprocessPolicy(blocked_commands=["rm"]),
|
||||
tool_policy=ToolPolicy(blocked_tools=["danger"]),
|
||||
)
|
||||
config = SecurityConfig(governance=governance)
|
||||
|
||||
with pytest.raises(GovernanceError):
|
||||
config.governance.validate_subprocess(["rm", "-rf", "/"])
|
||||
|
||||
with pytest.raises(GovernanceError):
|
||||
config.governance.validate_tool("danger", {})
|
||||
|
||||
def test_security_config_to_dict_includes_governance(self) -> None:
|
||||
"""to_dict() should include governance configuration."""
|
||||
governance = GovernanceConfig(
|
||||
subprocess_policy=SubprocessPolicy(blocked_commands=["rm"]),
|
||||
)
|
||||
config = SecurityConfig(governance=governance)
|
||||
config_dict = config.to_dict()
|
||||
|
||||
assert "governance" in config_dict
|
||||
assert "subprocess_policy" in config_dict["governance"]
|
||||
assert config_dict["governance"]["subprocess_policy"]["blocked_commands"] == ["rm"]
|
||||
|
||||
def test_security_config_from_dict_with_governance(self) -> None:
|
||||
"""from_dict() should restore governance configuration."""
|
||||
original = SecurityConfig(
|
||||
governance=GovernanceConfig(
|
||||
subprocess_policy=SubprocessPolicy(blocked_commands=["rm"]),
|
||||
http_policy=HttpPolicy(blocked_domains=["evil.com"]),
|
||||
tool_policy=ToolPolicy(blocked_tools=["danger"]),
|
||||
)
|
||||
)
|
||||
|
||||
config_dict = original.to_dict()
|
||||
restored = SecurityConfig.from_dict(config_dict)
|
||||
|
||||
assert restored.governance is not None
|
||||
assert "rm" in restored.governance.subprocess_policy.blocked_commands
|
||||
assert "evil.com" in restored.governance.http_policy.blocked_domains
|
||||
assert "danger" in restored.governance.tool_policy.blocked_tools
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent subprocess governance integration tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAgentSubprocessGovernance:
|
||||
"""Tests for governance enforcement in agent subprocess calls."""
|
||||
|
||||
def test_validate_docker_blocked_by_governance(self) -> None:
|
||||
"""Agent._validate_docker_installation should respect subprocess governance."""
|
||||
from crewai.agent.core import Agent
|
||||
|
||||
agent = Agent(
|
||||
role="test",
|
||||
goal="test",
|
||||
backstory="test",
|
||||
security_config=SecurityConfig(
|
||||
governance=GovernanceConfig(
|
||||
subprocess_policy=SubprocessPolicy(
|
||||
blocked_commands=["docker"],
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
with patch("shutil.which", return_value="/usr/bin/docker"):
|
||||
with pytest.raises(RuntimeError, match="Governance policy blocked"):
|
||||
agent._validate_docker_installation()
|
||||
|
||||
def test_validate_docker_allowed_by_governance(self) -> None:
|
||||
"""Agent._validate_docker_installation should pass with permissive governance."""
|
||||
from crewai.agent.core import Agent
|
||||
|
||||
agent = Agent(
|
||||
role="test",
|
||||
goal="test",
|
||||
backstory="test",
|
||||
security_config=SecurityConfig(
|
||||
governance=GovernanceConfig(
|
||||
subprocess_policy=SubprocessPolicy(
|
||||
allowed_commands=["docker"],
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
with (
|
||||
patch("shutil.which", return_value="/usr/bin/docker"),
|
||||
patch("subprocess.run") as mock_run,
|
||||
):
|
||||
mock_run.return_value = MagicMock(returncode=0)
|
||||
# Should not raise
|
||||
agent._validate_docker_installation()
|
||||
|
||||
def test_validate_docker_not_in_allowlist(self) -> None:
|
||||
"""Subprocess governance should block docker when not in allowlist."""
|
||||
from crewai.agent.core import Agent
|
||||
|
||||
agent = Agent(
|
||||
role="test",
|
||||
goal="test",
|
||||
backstory="test",
|
||||
security_config=SecurityConfig(
|
||||
governance=GovernanceConfig(
|
||||
subprocess_policy=SubprocessPolicy(
|
||||
allowed_commands=["git"], # docker not allowed
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
with patch("shutil.which", return_value="/usr/bin/docker"):
|
||||
with pytest.raises(RuntimeError, match="Governance policy blocked"):
|
||||
agent._validate_docker_installation()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tool governance integration tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestToolGovernanceIntegration:
|
||||
"""Tests for tool governance enforcement in the tool execution path."""
|
||||
|
||||
def test_tool_blocked_by_governance_sync(self) -> None:
|
||||
"""execute_tool_and_check_finality should block tools per governance."""
|
||||
from crewai.agents.parser import AgentAction
|
||||
from crewai.security.security_config import SecurityConfig
|
||||
from crewai.utilities.tool_utils import execute_tool_and_check_finality
|
||||
|
||||
# Create a mock crew with governance that blocks 'danger_tool'
|
||||
mock_crew = MagicMock()
|
||||
mock_crew.verbose = False
|
||||
mock_crew.security_config = SecurityConfig(
|
||||
governance=GovernanceConfig(
|
||||
tool_policy=ToolPolicy(blocked_tools=["danger_tool"]),
|
||||
)
|
||||
)
|
||||
|
||||
# Create a mock tool with attributes needed by ToolUsage internals
|
||||
mock_tool = MagicMock()
|
||||
mock_tool.name = "Danger Tool"
|
||||
mock_tool.result_as_answer = False
|
||||
mock_tool.description = "Danger Tool: A dangerous tool, args: {}"
|
||||
|
||||
agent_action = AgentAction(
|
||||
tool="danger_tool",
|
||||
tool_input="{}",
|
||||
text='Action: Danger Tool\nAction Input: {}',
|
||||
thought="",
|
||||
)
|
||||
|
||||
from crewai.utilities.i18n import get_i18n
|
||||
|
||||
result = execute_tool_and_check_finality(
|
||||
agent_action=agent_action,
|
||||
tools=[mock_tool],
|
||||
i18n=get_i18n(),
|
||||
crew=mock_crew,
|
||||
)
|
||||
|
||||
assert "blocked by governance policy" in result.result
|
||||
|
||||
def test_tool_allowed_by_governance(self) -> None:
|
||||
"""Tools not blocked by governance should proceed normally."""
|
||||
governance = GovernanceConfig(
|
||||
tool_policy=ToolPolicy(
|
||||
allowed_tools=["search_tool"],
|
||||
),
|
||||
)
|
||||
|
||||
# Allowed tool should not raise
|
||||
governance.validate_tool("search_tool", {"query": "test"})
|
||||
|
||||
# Disallowed tool should raise
|
||||
with pytest.raises(GovernanceError):
|
||||
governance.validate_tool("other_tool", {})
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GovernanceError tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGovernanceError:
|
||||
"""Tests for the GovernanceError exception."""
|
||||
|
||||
def test_error_message_format(self) -> None:
|
||||
"""GovernanceError message should include category and detail."""
|
||||
error = GovernanceError("subprocess", "Command 'rm' is blocked")
|
||||
assert str(error) == "[subprocess] Command 'rm' is blocked"
|
||||
assert error.category == "subprocess"
|
||||
assert error.detail == "Command 'rm' is blocked"
|
||||
|
||||
def test_error_inherits_from_exception(self) -> None:
|
||||
"""GovernanceError should be catchable as Exception."""
|
||||
with pytest.raises(Exception):
|
||||
raise GovernanceError("test", "test detail")
|
||||
|
||||
def test_error_categories(self) -> None:
|
||||
"""Different governance categories should be properly tracked."""
|
||||
subprocess_err = GovernanceError("subprocess", "blocked")
|
||||
http_err = GovernanceError("http", "blocked")
|
||||
tool_err = GovernanceError("tool", "blocked")
|
||||
|
||||
assert subprocess_err.category == "subprocess"
|
||||
assert http_err.category == "http"
|
||||
assert tool_err.category == "tool"
|
||||
Reference in New Issue
Block a user