feat: Add task guardrails feature

Add support for custom code guardrails in tasks that validate outputs
before proceeding to the next task. Features include:

- Optional task-level guardrail function
- Pre-next-task execution timing
- Tuple return format (success, data)
- Automatic result/error routing
- Configurable retry mechanism
- Comprehensive documentation and tests

Link to Devin run: https://app.devin.ai/sessions/39f6cfd6c5a24d25a7bd70ce070ed29a

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2024-12-11 05:01:27 +00:00
parent ec89e003c8
commit 8bfadec4bf
4 changed files with 325 additions and 1 deletions

View File

@@ -6,7 +6,7 @@ from concurrent.futures import Future
from copy import copy
from hashlib import md5
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union
from opentelemetry.trace import Span
from pydantic import (
@@ -22,6 +22,7 @@ from pydantic_core import PydanticCustomError
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.tasks.guardrail_result import GuardrailResult
from crewai.telemetry.telemetry import Telemetry
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
@@ -110,6 +111,18 @@ class Task(BaseModel):
default=None,
)
processed_by_agents: Set[str] = Field(default_factory=set)
guardrail: Optional[Callable[[Any], Tuple[bool, Any]]] = Field(
default=None,
description="Function to validate task output before proceeding to next task"
)
max_retries: int = Field(
default=3,
description="Maximum number of retries when guardrail fails"
)
retry_count: int = Field(
default=0,
description="Current number of retries"
)
_telemetry: Telemetry = PrivateAttr(default_factory=Telemetry)
_execution_span: Optional[Span] = PrivateAttr(default=None)
@@ -253,6 +266,22 @@ class Task(BaseModel):
tools=tools,
)
# Add guardrail validation
if self.guardrail:
guardrail_result = GuardrailResult.from_tuple(self.guardrail(result))
if not guardrail_result.success:
if self.retry_count >= self.max_retries:
raise Exception(
f"Task failed guardrail validation after {self.max_retries} retries. "
f"Last error: {guardrail_result.error}"
)
self.retry_count += 1
context = f"Previous attempt failed validation: {guardrail_result.error}\nPlease try again."
return self._execute_core(agent, context, tools)
result = guardrail_result.result
pydantic_output, json_output = self._export_output(result)
task_output = TaskOutput(

View File

@@ -0,0 +1,44 @@
"""
Module for handling task guardrail validation results.
This module provides the GuardrailResult class which standardizes
the way task guardrails return their validation results.
"""
from typing import Any, Optional, Tuple, Union
from pydantic import BaseModel
class GuardrailResult(BaseModel):
"""Result from a task guardrail execution.
This class standardizes the return format of task guardrails,
converting tuple responses into a structured format that can
be easily handled by the task execution system.
Attributes:
success (bool): Whether the guardrail validation passed
result (Any, optional): The validated/transformed result if successful
error (str, optional): Error message if validation failed
"""
success: bool
result: Optional[Any] = None
error: Optional[str] = None
@classmethod
def from_tuple(cls, result: Tuple[bool, Union[Any, str]]) -> "GuardrailResult":
"""Create a GuardrailResult from a validation tuple.
Args:
result: A tuple of (success, data) where data is either
the validated result or error message.
Returns:
GuardrailResult: A new instance with the tuple data.
"""
success, data = result
return cls(
success=success,
result=data if success else None,
error=data if not success else None
)