Add source to LLM Guardrail events (#3572)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Update Test Durations / update-durations (3.10) (push) Has been cancelled
Update Test Durations / update-durations (3.11) (push) Has been cancelled
Update Test Durations / update-durations (3.12) (push) Has been cancelled
Update Test Durations / update-durations (3.13) (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

This commit adds the source attribute to LLM Guardrail event calls to
identify the Lite Agent or Task that executed the guardrail.
This commit is contained in:
Vini Brasil
2025-09-22 11:58:00 +09:00
committed by GitHub
parent 9c1096dbdc
commit aa8dc9d77f
5 changed files with 111 additions and 152 deletions

View File

@@ -5,20 +5,14 @@ import logging
import threading
import uuid
import warnings
from collections.abc import Callable
from concurrent.futures import Future
from copy import copy
from hashlib import md5
from pathlib import Path
from typing import (
Any,
Callable,
ClassVar,
Dict,
List,
Optional,
Set,
Tuple,
Type,
Union,
get_args,
get_origin,
@@ -35,20 +29,20 @@ from pydantic import (
from pydantic_core import PydanticCustomError
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_types import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.security import Fingerprint, SecurityConfig
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified
from crewai.utilities.guardrail import process_guardrail, GuardrailResult
from crewai.utilities.converter import Converter, convert_to_model
from crewai.events.event_types import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.utilities.guardrail import process_guardrail
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import interpolate_only
@@ -85,50 +79,50 @@ class Task(BaseModel):
tools_errors: int = 0
delegations: int = 0
i18n: I18N = I18N()
name: Optional[str] = Field(default=None)
prompt_context: Optional[str] = None
name: str | None = Field(default=None)
prompt_context: str | None = None
description: str = Field(description="Description of the actual task.")
expected_output: str = Field(
description="Clear definition of expected output for the task."
)
config: Optional[Dict[str, Any]] = Field(
config: dict[str, Any] | None = Field(
description="Configuration for the agent",
default=None,
)
callback: Optional[Any] = Field(
callback: Any | None = Field(
description="Callback to be executed after the task is completed.", default=None
)
agent: Optional[BaseAgent] = Field(
agent: BaseAgent | None = Field(
description="Agent responsible for execution the task.", default=None
)
context: Union[List["Task"], None, _NotSpecified] = Field(
context: list["Task"] | None | _NotSpecified = Field(
description="Other tasks that will have their output used as context for this task.",
default=NOT_SPECIFIED,
)
async_execution: Optional[bool] = Field(
async_execution: bool | None = Field(
description="Whether the task should be executed asynchronously or not.",
default=False,
)
output_json: Optional[Type[BaseModel]] = Field(
output_json: type[BaseModel] | None = Field(
description="A Pydantic model to be used to create a JSON output.",
default=None,
)
output_pydantic: Optional[Type[BaseModel]] = Field(
output_pydantic: type[BaseModel] | None = Field(
description="A Pydantic model to be used to create a Pydantic output.",
default=None,
)
output_file: Optional[str] = Field(
output_file: str | None = Field(
description="A file path to be used to create a file output.",
default=None,
)
create_directory: Optional[bool] = Field(
create_directory: bool | None = Field(
description="Whether to create the directory for output_file if it doesn't exist.",
default=True,
)
output: Optional[TaskOutput] = Field(
output: TaskOutput | None = Field(
description="Task output, it's final result after being executed", default=None
)
tools: Optional[List[BaseTool]] = Field(
tools: list[BaseTool] | None = Field(
default_factory=list,
description="Tools the agent is limited to use for this task.",
)
@@ -141,24 +135,24 @@ class Task(BaseModel):
frozen=True,
description="Unique identifier for the object, not set by user.",
)
human_input: Optional[bool] = Field(
human_input: bool | None = Field(
description="Whether the task should have a human review the final answer of the agent",
default=False,
)
markdown: Optional[bool] = Field(
markdown: bool | None = Field(
description="Whether the task should instruct the agent to return the final answer formatted in Markdown",
default=False,
)
converter_cls: Optional[Type[Converter]] = Field(
converter_cls: type[Converter] | None = Field(
description="A converter class used to export structured output",
default=None,
)
processed_by_agents: Set[str] = Field(default_factory=set)
guardrail: Optional[Union[Callable[[TaskOutput], Tuple[bool, Any]], str]] = Field(
processed_by_agents: set[str] = Field(default_factory=set)
guardrail: Callable[[TaskOutput], tuple[bool, Any]] | str | None = Field(
default=None,
description="Function or string description of a guardrail to validate task output before proceeding to next task",
)
max_retries: Optional[int] = Field(
max_retries: int | None = Field(
default=None,
description="[DEPRECATED] Maximum number of retries when guardrail fails. Use guardrail_max_retries instead. Will be removed in v1.0.0",
)
@@ -166,13 +160,13 @@ class Task(BaseModel):
default=3, description="Maximum number of retries when guardrail fails"
)
retry_count: int = Field(default=0, description="Current number of retries")
start_time: Optional[datetime.datetime] = Field(
start_time: datetime.datetime | None = Field(
default=None, description="Start time of the task execution"
)
end_time: Optional[datetime.datetime] = Field(
end_time: datetime.datetime | None = Field(
default=None, description="End time of the task execution"
)
allow_crewai_trigger_context: Optional[bool] = Field(
allow_crewai_trigger_context: bool | None = Field(
default=None,
description="Whether this task should append 'Trigger Payload: {crewai_trigger_payload}' to the task description when crewai_trigger_payload exists in crew inputs.",
)
@@ -181,8 +175,8 @@ class Task(BaseModel):
@field_validator("guardrail")
@classmethod
def validate_guardrail_function(
cls, v: Optional[str | Callable]
) -> Optional[str | Callable]:
cls, v: str | Callable | None
) -> str | Callable | None:
"""
If v is a callable, validate that the guardrail function has the correct signature and behavior.
If v is a string, return it as is.
@@ -229,7 +223,7 @@ class Task(BaseModel):
return_annotation_args[1] is Any
or return_annotation_args[1] is str
or return_annotation_args[1] is TaskOutput
or return_annotation_args[1] == Union[str, TaskOutput]
or return_annotation_args[1] == str | TaskOutput
)
):
raise ValueError(
@@ -237,11 +231,11 @@ class Task(BaseModel):
)
return v
_guardrail: Optional[Callable] = PrivateAttr(default=None)
_original_description: Optional[str] = PrivateAttr(default=None)
_original_expected_output: Optional[str] = PrivateAttr(default=None)
_original_output_file: Optional[str] = PrivateAttr(default=None)
_thread: Optional[threading.Thread] = PrivateAttr(default=None)
_guardrail: Callable | None = PrivateAttr(default=None)
_original_description: str | None = PrivateAttr(default=None)
_original_expected_output: str | None = PrivateAttr(default=None)
_original_output_file: str | None = PrivateAttr(default=None)
_thread: threading.Thread | None = PrivateAttr(default=None)
@model_validator(mode="before")
@classmethod
@@ -265,7 +259,9 @@ class Task(BaseModel):
elif isinstance(self.guardrail, str):
from crewai.tasks.llm_guardrail import LLMGuardrail
assert self.agent is not None
if self.agent is None:
raise ValueError("Agent is required to use LLMGuardrail")
self._guardrail = LLMGuardrail(
description=self.guardrail, llm=self.agent.llm
)
@@ -274,7 +270,7 @@ class Task(BaseModel):
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
def _deny_user_set_id(cls, v: UUID4 | None) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
@@ -282,7 +278,7 @@ class Task(BaseModel):
@field_validator("output_file")
@classmethod
def output_file_validation(cls, value: Optional[str]) -> Optional[str]:
def output_file_validation(cls, value: str | None) -> str | None:
"""Validate the output file path.
Args:
@@ -307,7 +303,7 @@ class Task(BaseModel):
)
# Check for shell expansion first
if value.startswith("~") or value.startswith("$"):
if value.startswith(("~", "$")):
raise ValueError(
"Shell expansion characters are not allowed in output_file paths"
)
@@ -373,9 +369,9 @@ class Task(BaseModel):
def execute_sync(
self,
agent: Optional[BaseAgent] = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
agent: BaseAgent | None = None,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> TaskOutput:
"""Execute the task synchronously."""
return self._execute_core(agent, context, tools)
@@ -397,8 +393,8 @@ class Task(BaseModel):
def execute_async(
self,
agent: BaseAgent | None = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> Future[TaskOutput]:
"""Execute the task asynchronously."""
future: Future[TaskOutput] = Future()
@@ -411,9 +407,9 @@ class Task(BaseModel):
def _execute_task_async(
self,
agent: Optional[BaseAgent],
context: Optional[str],
tools: Optional[List[Any]],
agent: BaseAgent | None,
context: str | None,
tools: list[Any] | None,
future: Future[TaskOutput],
) -> None:
"""Execute the task asynchronously with context handling."""
@@ -422,9 +418,9 @@ class Task(BaseModel):
def _execute_core(
self,
agent: Optional[BaseAgent],
context: Optional[str],
tools: Optional[List[Any]],
agent: BaseAgent | None,
context: str | None,
tools: list[Any] | None,
) -> TaskOutput:
"""Run the core execution logic of the task."""
try:
@@ -465,6 +461,7 @@ class Task(BaseModel):
output=task_output,
guardrail=self._guardrail,
retry_count=self.retry_count,
event_source=self,
)
if not guardrail_result.success:
if self.retry_count >= self.guardrail_max_retries:
@@ -528,41 +525,6 @@ class Task(BaseModel):
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self))
raise e # Re-raise the exception after emitting the event
def _process_guardrail(self, task_output: TaskOutput) -> GuardrailResult:
assert self._guardrail is not None
from crewai.events.event_types import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from crewai.events.event_bus import crewai_event_bus
crewai_event_bus.emit(
self,
LLMGuardrailStartedEvent(
guardrail=self._guardrail, retry_count=self.retry_count
),
)
try:
result = self._guardrail(task_output)
guardrail_result = GuardrailResult.from_tuple(result)
except Exception as e:
guardrail_result = GuardrailResult(
success=False, result=None, error=f"Guardrail execution error: {str(e)}"
)
crewai_event_bus.emit(
self,
LLMGuardrailCompletedEvent(
success=guardrail_result.success,
result=guardrail_result.result,
error=guardrail_result.error,
retry_count=self.retry_count,
),
)
return guardrail_result
def prompt(self) -> str:
"""Generates the task prompt with optional markdown formatting.
@@ -604,7 +566,7 @@ Follow these guidelines:
return "\n".join(tasks_slices)
def interpolate_inputs_and_add_conversation_history(
self, inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]]
self, inputs: dict[str, str | int | float | dict[str, Any] | list[Any]]
) -> None:
"""Interpolate inputs into the task description, expected output, and output file path.
Add conversation history if present.
@@ -635,14 +597,14 @@ Follow these guidelines:
f"Missing required template variable '{e.args[0]}' in description"
) from e
except ValueError as e:
raise ValueError(f"Error interpolating description: {str(e)}") from e
raise ValueError(f"Error interpolating description: {e!s}") from e
try:
self.expected_output = interpolate_only(
input_string=self._original_expected_output, inputs=inputs
)
except (KeyError, ValueError) as e:
raise ValueError(f"Error interpolating expected_output: {str(e)}") from e
raise ValueError(f"Error interpolating expected_output: {e!s}") from e
if self.output_file is not None:
try:
@@ -650,11 +612,9 @@ Follow these guidelines:
input_string=self._original_output_file, inputs=inputs
)
except (KeyError, ValueError) as e:
raise ValueError(
f"Error interpolating output_file path: {str(e)}"
) from e
raise ValueError(f"Error interpolating output_file path: {e!s}") from e
if "crew_chat_messages" in inputs and inputs["crew_chat_messages"]:
if inputs.get("crew_chat_messages"):
conversation_instruction = self.i18n.slice(
"conversation_history_instruction"
)
@@ -681,14 +641,14 @@ Follow these guidelines:
"""Increment the tools errors counter."""
self.tools_errors += 1
def increment_delegations(self, agent_name: Optional[str]) -> None:
def increment_delegations(self, agent_name: str | None) -> None:
"""Increment the delegations counter."""
if agent_name:
self.processed_by_agents.add(agent_name)
self.delegations += 1
def copy(
self, agents: List["BaseAgent"], task_mapping: Dict[str, "Task"]
def copy( # type: ignore
self, agents: list["BaseAgent"], task_mapping: dict[str, "Task"]
) -> "Task":
"""Creates a deep copy of the Task while preserving its original class type.
@@ -721,20 +681,18 @@ Follow these guidelines:
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
cloned_tools = copy(self.tools) if self.tools else []
copied_task = self.__class__(
return self.__class__(
**copied_data,
context=cloned_context,
agent=cloned_agent,
tools=cloned_tools,
)
return copied_task
def _export_output(
self, result: str
) -> Tuple[Optional[BaseModel], Optional[Dict[str, Any]]]:
pydantic_output: Optional[BaseModel] = None
json_output: Optional[Dict[str, Any]] = None
) -> tuple[BaseModel | None, dict[str, Any] | None]:
pydantic_output: BaseModel | None = None
json_output: dict[str, Any] | None = None
if self.output_pydantic or self.output_json:
model_output = convert_to_model(
@@ -764,7 +722,7 @@ Follow these guidelines:
return OutputFormat.PYDANTIC
return OutputFormat.RAW
def _save_file(self, result: Union[Dict, str, Any]) -> None:
def _save_file(self, result: dict | str | Any) -> None:
"""Save task output to a file.
Note:
@@ -785,7 +743,7 @@ Follow these guidelines:
if self.output_file is None:
raise ValueError("output_file is not set.")
FILEWRITER_RECOMMENDATION = (
filewriter_recommendation = (
"For cross-platform file writing, especially on Windows, "
"use FileWriterTool from crewai_tools package."
)
@@ -811,10 +769,10 @@ Follow these guidelines:
except (OSError, IOError) as e:
raise RuntimeError(
"\n".join(
[f"Failed to save output file: {e}", FILEWRITER_RECOMMENDATION]
[f"Failed to save output file: {e}", filewriter_recommendation]
)
)
return None
) from e
return
def __repr__(self):
return f"Task(description={self.description}, expected_output={self.expected_output})"