fix: Resolve merge conflict and maintain organized imports

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2024-12-22 04:31:39 +00:00
4 changed files with 525 additions and 18 deletions

View File

@@ -6,7 +6,7 @@ icon: list-check
## Overview of a Task
In the CrewAI framework, a `Task` is a specific assignment completed by an `Agent`.
In the CrewAI framework, a `Task` is a specific assignment completed by an `Agent`.
Tasks provide all necessary details for execution, such as a description, the agent responsible, required tools, and more, facilitating a wide range of action complexities.
@@ -263,8 +263,148 @@ analysis_task = Task(
)
```
## Task Guardrails
Task guardrails provide a way to validate and transform task outputs before they
are passed to the next task. This feature helps ensure data quality and provides
efeedback to agents when their output doesn't meet specific criteria.
### Using Task Guardrails
To add a guardrail to a task, provide a validation function through the `guardrail` parameter:
```python Code
from typing import Tuple, Union, Dict, Any
def validate_blog_content(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
"""Validate blog content meets requirements."""
try:
# Check word count
word_count = len(result.split())
if word_count > 200:
return (False, {
"error": "Blog content exceeds 200 words",
"code": "WORD_COUNT_ERROR",
"context": {"word_count": word_count}
})
# Additional validation logic here
return (True, result.strip())
except Exception as e:
return (False, {
"error": "Unexpected error during validation",
"code": "SYSTEM_ERROR"
})
blog_task = Task(
description="Write a blog post about AI",
expected_output="A blog post under 200 words",
agent=blog_agent,
guardrail=validate_blog_content # Add the guardrail function
)
```
### Guardrail Function Requirements
1. **Function Signature**:
- Must accept exactly one parameter (the task output)
- Should return a tuple of `(bool, Any)`
- Type hints are recommended but optional
2. **Return Values**:
- Success: Return `(True, validated_result)`
- Failure: Return `(False, error_details)`
### Error Handling Best Practices
1. **Structured Error Responses**:
```python Code
def validate_with_context(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
try:
# Main validation logic
validated_data = perform_validation(result)
return (True, validated_data)
except ValidationError as e:
return (False, {
"error": str(e),
"code": "VALIDATION_ERROR",
"context": {"input": result}
})
except Exception as e:
return (False, {
"error": "Unexpected error",
"code": "SYSTEM_ERROR"
})
```
2. **Error Categories**:
- Use specific error codes
- Include relevant context
- Provide actionable feedback
3. **Validation Chain**:
```python Code
from typing import Any, Dict, List, Tuple, Union
def complex_validation(result: str) -> Tuple[bool, Union[str, Dict[str, Any]]]:
"""Chain multiple validation steps."""
# Step 1: Basic validation
if not result:
return (False, {"error": "Empty result", "code": "EMPTY_INPUT"})
# Step 2: Content validation
try:
validated = validate_content(result)
if not validated:
return (False, {"error": "Invalid content", "code": "CONTENT_ERROR"})
# Step 3: Format validation
formatted = format_output(validated)
return (True, formatted)
except Exception as e:
return (False, {
"error": str(e),
"code": "VALIDATION_ERROR",
"context": {"step": "content_validation"}
})
```
### Handling Guardrail Results
When a guardrail returns `(False, error)`:
1. The error is sent back to the agent
2. The agent attempts to fix the issue
3. The process repeats until:
- The guardrail returns `(True, result)`
- Maximum retries are reached
Example with retry handling:
```python Code
from typing import Optional, Tuple, Union
def validate_json_output(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
"""Validate and parse JSON output."""
try:
# Try to parse as JSON
data = json.loads(result)
return (True, data)
except json.JSONDecodeError as e:
return (False, {
"error": "Invalid JSON format",
"code": "JSON_ERROR",
"context": {"line": e.lineno, "column": e.colno}
})
task = Task(
description="Generate a JSON report",
expected_output="A valid JSON object",
agent=analyst,
guardrail=validate_json_output,
max_retries=3 # Limit retry attempts
)
```
## Getting Structured Consistent Outputs from Tasks
When you need to ensure that a task outputs a structured and consistent format, you can use the `output_pydantic` or `output_json` properties on a task. These properties allow you to define the expected output structure, making it easier to parse and utilize the results in your application.
<Note>
It's also important to note that the output of the final task of a crew becomes the final output of the actual crew itself.
@@ -608,6 +748,114 @@ While creating and executing tasks, certain validation mechanisms are in place t
These validations help in maintaining the consistency and reliability of task executions within the crewAI framework.
## Task Guardrails
Task guardrails provide a powerful way to validate, transform, or filter task outputs before they are passed to the next task. Guardrails are optional functions that execute before the next task starts, allowing you to ensure that task outputs meet specific requirements or formats.
### Basic Usage
```python Code
from typing import Tuple, Union
from crewai import Task
def validate_json_output(result: str) -> Tuple[bool, Union[dict, str]]:
"""Validate that the output is valid JSON."""
try:
json_data = json.loads(result)
return (True, json_data)
except json.JSONDecodeError:
return (False, "Output must be valid JSON")
task = Task(
description="Generate JSON data",
expected_output="Valid JSON object",
guardrail=validate_json_output
)
```
### How Guardrails Work
1. **Optional Attribute**: Guardrails are an optional attribute at the task level, allowing you to add validation only where needed.
2. **Execution Timing**: The guardrail function is executed before the next task starts, ensuring valid data flow between tasks.
3. **Return Format**: Guardrails must return a tuple of `(success, data)`:
- If `success` is `True`, `data` is the validated/transformed result
- If `success` is `False`, `data` is the error message
4. **Result Routing**:
- On success (`True`), the result is automatically passed to the next task
- On failure (`False`), the error is sent back to the agent to generate a new answer
### Common Use Cases
#### Data Format Validation
```python Code
def validate_email_format(result: str) -> Tuple[bool, Union[str, str]]:
"""Ensure the output contains a valid email address."""
import re
email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
if re.match(email_pattern, result.strip()):
return (True, result.strip())
return (False, "Output must be a valid email address")
```
#### Content Filtering
```python Code
def filter_sensitive_info(result: str) -> Tuple[bool, Union[str, str]]:
"""Remove or validate sensitive information."""
sensitive_patterns = ['SSN:', 'password:', 'secret:']
for pattern in sensitive_patterns:
if pattern.lower() in result.lower():
return (False, f"Output contains sensitive information ({pattern})")
return (True, result)
```
#### Data Transformation
```python Code
def normalize_phone_number(result: str) -> Tuple[bool, Union[str, str]]:
"""Ensure phone numbers are in a consistent format."""
import re
digits = re.sub(r'\D', '', result)
if len(digits) == 10:
formatted = f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
return (True, formatted)
return (False, "Output must be a 10-digit phone number")
```
### Advanced Features
#### Chaining Multiple Validations
```python Code
def chain_validations(*validators):
"""Chain multiple validators together."""
def combined_validator(result):
for validator in validators:
success, data = validator(result)
if not success:
return (False, data)
result = data
return (True, result)
return combined_validator
# Usage
task = Task(
description="Get user contact info",
expected_output="Email and phone",
guardrail=chain_validations(
validate_email_format,
filter_sensitive_info
)
)
```
#### Custom Retry Logic
```python Code
task = Task(
description="Generate data",
expected_output="Valid data",
guardrail=validate_data,
max_retries=5 # Override default retry limit
)
```
## Creating Directories when Saving Files
You can now specify if a task should create directories when saving its output to a file. This is particularly useful for organizing outputs and ensuring that file paths are correctly structured.
@@ -629,7 +877,7 @@ save_output_task = Task(
## Conclusion
Tasks are the driving force behind the actions of agents in CrewAI.
By properly defining tasks and their outcomes, you set the stage for your AI agents to work effectively, either independently or as a collaborative unit.
Equipping tasks with appropriate tools, understanding the execution process, and following robust validation practices are crucial for maximizing CrewAI's potential,
Tasks are the driving force behind the actions of agents in CrewAI.
By properly defining tasks and their outcomes, you set the stage for your AI agents to work effectively, either independently or as a collaborative unit.
Equipping tasks with appropriate tools, understanding the execution process, and following robust validation practices are crucial for maximizing CrewAI's potential,
ensuring agents are effectively prepared for their assignments and that tasks are executed as intended.

View File

@@ -1,4 +1,5 @@
import datetime
import inspect
import json
import logging
import threading
@@ -7,18 +8,7 @@ from concurrent.futures import Future
from copy import copy
from hashlib import md5
from pathlib import Path
from typing import (
Any,
Callable,
ClassVar,
Dict,
List,
Optional,
Set,
Tuple,
Type,
Union,
)
from typing import Any, Callable, ClassVar, Dict, List, Optional, Set, Tuple, Type, Union
from opentelemetry.trace import Span
from pydantic import (
@@ -124,6 +114,55 @@ class Task(BaseModel):
default=None,
)
processed_by_agents: Set[str] = Field(default_factory=set)
guardrail: Optional[Callable[[TaskOutput], Tuple[bool, Any]]] = Field(
default=None,
description="Function to validate task output before proceeding to next task"
)
max_retries: int = Field(
default=3,
description="Maximum number of retries when guardrail fails"
)
retry_count: int = Field(
default=0,
description="Current number of retries"
)
@field_validator("guardrail")
@classmethod
def validate_guardrail_function(cls, v: Optional[Callable]) -> Optional[Callable]:
"""Validate that the guardrail function has the correct signature and behavior.
While type hints provide static checking, this validator ensures runtime safety by:
1. Verifying the function accepts exactly one parameter (the TaskOutput)
2. Checking return type annotations match Tuple[bool, Any] if present
3. Providing clear, immediate error messages for debugging
This runtime validation is crucial because:
- Type hints are optional and can be ignored at runtime
- Function signatures need immediate validation before task execution
- Clear error messages help users debug guardrail implementation issues
Args:
v: The guardrail function to validate
Returns:
The validated guardrail function
Raises:
ValueError: If the function signature is invalid or return annotation
doesn't match Tuple[bool, Any]
"""
if v is not None:
sig = inspect.signature(v)
if len(sig.parameters) != 1:
raise ValueError("Guardrail function must accept exactly one parameter")
# Check return annotation if present, but don't require it
return_annotation = sig.return_annotation
if return_annotation != inspect.Signature.empty:
if not (return_annotation == Tuple[bool, Any] or str(return_annotation) == 'Tuple[bool, Any]'):
raise ValueError("If return type is annotated, it must be Tuple[bool, Any]")
return v
_telemetry: Telemetry = PrivateAttr(default_factory=Telemetry)
_execution_span: Optional[Span] = PrivateAttr(default=None)
@@ -268,7 +307,6 @@ class Task(BaseModel):
)
pydantic_output, json_output = self._export_output(result)
task_output = TaskOutput(
name=self.name,
description=self.description,
@@ -279,6 +317,37 @@ class Task(BaseModel):
agent=agent.role,
output_format=self._get_output_format(),
)
if self.guardrail:
guardrail_result = GuardrailResult.from_tuple(self.guardrail(task_output))
if not guardrail_result.success:
if self.retry_count >= self.max_retries:
raise Exception(
f"Task failed guardrail validation after {self.max_retries} retries. "
f"Last error: {guardrail_result.error}"
)
self.retry_count += 1
context = (
f"### Previous attempt failed validation: {guardrail_result.error}\n\n\n"
f"### Previous result:\n{task_output.raw}\n\n\n"
"Try again, making sure to address the validation error."
)
return self._execute_core(agent, context, tools)
if guardrail_result.result is None:
raise Exception(
"Task guardrail returned None as result. This is not allowed."
)
if isinstance(guardrail_result.result, str):
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(guardrail_result.result)
task_output.pydantic = pydantic_output
task_output.json_dict = json_output
elif isinstance(guardrail_result.result, TaskOutput):
task_output = guardrail_result.result
self.output = task_output
self._set_end_execution_time(start_time)

View File

@@ -0,0 +1,56 @@
"""
Module for handling task guardrail validation results.
This module provides the GuardrailResult class which standardizes
the way task guardrails return their validation results.
"""
from typing import Any, Optional, Tuple, Union
from pydantic import BaseModel, field_validator
class GuardrailResult(BaseModel):
"""Result from a task guardrail execution.
This class standardizes the return format of task guardrails,
converting tuple responses into a structured format that can
be easily handled by the task execution system.
Attributes:
success (bool): Whether the guardrail validation passed
result (Any, optional): The validated/transformed result if successful
error (str, optional): Error message if validation failed
"""
success: bool
result: Optional[Any] = None
error: Optional[str] = None
@field_validator("result", "error")
@classmethod
def validate_result_error_exclusivity(cls, v: Any, info) -> Any:
values = info.data
if "success" in values:
if values["success"] and v and "error" in values and values["error"]:
raise ValueError("Cannot have both result and error when success is True")
if not values["success"] and v and "result" in values and values["result"]:
raise ValueError("Cannot have both result and error when success is False")
return v
@classmethod
def from_tuple(cls, result: Tuple[bool, Union[Any, str]]) -> "GuardrailResult":
"""Create a GuardrailResult from a validation tuple.
Args:
result: A tuple of (success, data) where data is either
the validated result or error message.
Returns:
GuardrailResult: A new instance with the tuple data.
"""
success, data = result
return cls(
success=success,
result=data if success else None,
error=data if not success else None
)

View File

@@ -0,0 +1,134 @@
"""Tests for task guardrails functionality."""
from unittest.mock import Mock
import pytest
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
def test_task_without_guardrail():
"""Test that tasks work normally without guardrails (backward compatibility)."""
agent = Mock()
agent.role = "test_agent"
agent.execute_task.return_value = "test result"
agent.crew = None
task = Task(
description="Test task",
expected_output="Output"
)
result = task.execute_sync(agent=agent)
assert isinstance(result, TaskOutput)
assert result.raw == "test result"
def test_task_with_successful_guardrail():
"""Test that successful guardrail validation passes transformed result."""
def guardrail(result: TaskOutput):
return (True, result.raw.upper())
agent = Mock()
agent.role = "test_agent"
agent.execute_task.return_value = "test result"
agent.crew = None
task = Task(
description="Test task",
expected_output="Output",
guardrail=guardrail
)
result = task.execute_sync(agent=agent)
assert isinstance(result, TaskOutput)
assert result.raw == "TEST RESULT"
def test_task_with_failing_guardrail():
"""Test that failing guardrail triggers retry with error context."""
def guardrail(result: TaskOutput):
return (False, "Invalid format")
agent = Mock()
agent.role = "test_agent"
agent.execute_task.side_effect = [
"bad result",
"good result"
]
agent.crew = None
task = Task(
description="Test task",
expected_output="Output",
guardrail=guardrail,
max_retries=1
)
# First execution fails guardrail, second succeeds
agent.execute_task.side_effect = ["bad result", "good result"]
with pytest.raises(Exception) as exc_info:
task.execute_sync(agent=agent)
assert "Task failed guardrail validation" in str(exc_info.value)
assert task.retry_count == 1
def test_task_with_guardrail_retries():
"""Test that guardrail respects max_retries configuration."""
def guardrail(result: TaskOutput):
return (False, "Invalid format")
agent = Mock()
agent.role = "test_agent"
agent.execute_task.return_value = "bad result"
agent.crew = None
task = Task(
description="Test task",
expected_output="Output",
guardrail=guardrail,
max_retries=2
)
with pytest.raises(Exception) as exc_info:
task.execute_sync(agent=agent)
assert task.retry_count == 2
assert "Task failed guardrail validation after 2 retries" in str(exc_info.value)
assert "Invalid format" in str(exc_info.value)
def test_guardrail_error_in_context():
"""Test that guardrail error is passed in context for retry."""
def guardrail(result: TaskOutput):
return (False, "Expected JSON, got string")
agent = Mock()
agent.role = "test_agent"
agent.crew = None
task = Task(
description="Test task",
expected_output="Output",
guardrail=guardrail,
max_retries=1
)
# Mock execute_task to succeed on second attempt
first_call = True
def execute_task(task, context, tools):
nonlocal first_call
if first_call:
first_call = False
return "invalid"
return '{"valid": "json"}'
agent.execute_task.side_effect = execute_task
with pytest.raises(Exception) as exc_info:
task.execute_sync(agent=agent)
assert "Task failed guardrail validation" in str(exc_info.value)
assert "Expected JSON, got string" in str(exc_info.value)