Compare commits


1 Commit

Author:  Devin AI
SHA1:    dba6aef61c
Message: Fix flow not terminating on CTRL+C (#2611)
         Co-Authored-By: Joe Moura <joao@crewai.com>
Date:    2025-04-15 19:12:52 +00:00
10 changed files with 84 additions and 76 deletions

View File

@@ -1,33 +0,0 @@
-name: Notify Downstream
-
-on:
-  push:
-    branches:
-      - main
-
-permissions:
-  contents: read
-
-jobs:
-  notify-downstream:
-    runs-on: ubuntu-latest
-    steps:
-      - name: Generate GitHub App token
-        id: app-token
-        uses: tibdex/github-app-token@v2
-        with:
-          app_id: ${{ secrets.OSS_SYNC_APP_ID }}
-          private_key: ${{ secrets.OSS_SYNC_APP_PRIVATE_KEY }}
-
-      - name: Notify Repo B
-        uses: peter-evans/repository-dispatch@v3
-        with:
-          token: ${{ steps.app-token.outputs.token }}
-          repository: ${{ secrets.OSS_SYNC_DOWNSTREAM_REPO }}
-          event-type: upstream-commit
-          client-payload: |
-            {
-              "commit_sha": "${{ github.sha }}"
-            }

View File

@@ -288,20 +288,26 @@ To add a guardrail to a task, provide a validation function through the `guardra
 ```python Code
 from typing import Tuple, Union, Dict, Any
-from crewai import TaskOutput

-def validate_blog_content(result: TaskOutput) -> Tuple[bool, Any]:
+def validate_blog_content(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
     """Validate blog content meets requirements."""
     try:
         # Check word count
         word_count = len(result.split())
         if word_count > 200:
-            return (False, "Blog content exceeds 200 words")
+            return (False, {
+                "error": "Blog content exceeds 200 words",
+                "code": "WORD_COUNT_ERROR",
+                "context": {"word_count": word_count}
+            })

         # Additional validation logic here
         return (True, result.strip())
     except Exception as e:
-        return (False, "Unexpected error during validation")
+        return (False, {
+            "error": "Unexpected error during validation",
+            "code": "SYSTEM_ERROR"
+        })

 blog_task = Task(
     description="Write a blog post about AI",
@@ -319,24 +325,29 @@ blog_task = Task(
    - Type hints are recommended but optional

 2. **Return Values**:
-   - On success: it returns a tuple of `(bool, Any)`. For example: `(True, validated_result)`
-   - On Failure: it returns a tuple of `(bool, str)`. For example: `(False, "Error message explain the failure")`
+   - Success: Return `(True, validated_result)`
+   - Failure: Return `(False, error_details)`

 ### Error Handling Best Practices

 1. **Structured Error Responses**:

 ```python Code
-from crewai import TaskOutput

-def validate_with_context(result: TaskOutput) -> Tuple[bool, Any]:
+def validate_with_context(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
     try:
         # Main validation logic
         validated_data = perform_validation(result)
         return (True, validated_data)
     except ValidationError as e:
-        return (False, f"VALIDATION_ERROR: {str(e)}")
+        return (False, {
+            "error": str(e),
+            "code": "VALIDATION_ERROR",
+            "context": {"input": result}
+        })
     except Exception as e:
-        return (False, str(e))
+        return (False, {
+            "error": "Unexpected error",
+            "code": "SYSTEM_ERROR"
+        })
 ```

 2. **Error Categories**:
@@ -347,25 +358,28 @@ def validate_with_context(result: TaskOutput) -> Tuple[bool, Any]:
 3. **Validation Chain**:

 ```python Code
 from typing import Any, Dict, List, Tuple, Union
-from crewai import TaskOutput

-def complex_validation(result: TaskOutput) -> Tuple[bool, Any]:
+def complex_validation(result: str) -> Tuple[bool, Union[str, Dict[str, Any]]]:
     """Chain multiple validation steps."""
     # Step 1: Basic validation
     if not result:
-        return (False, "Empty result")
+        return (False, {"error": "Empty result", "code": "EMPTY_INPUT"})

     # Step 2: Content validation
     try:
         validated = validate_content(result)
         if not validated:
-            return (False, "Invalid content")
+            return (False, {"error": "Invalid content", "code": "CONTENT_ERROR"})

         # Step 3: Format validation
         formatted = format_output(validated)
         return (True, formatted)
     except Exception as e:
-        return (False, str(e))
+        return (False, {
+            "error": str(e),
+            "code": "VALIDATION_ERROR",
+            "context": {"step": "content_validation"}
+        })
 ```
### Handling Guardrail Results
@@ -380,16 +394,19 @@ When a guardrail returns `(False, error)`:
 Example with retry handling:

 ```python Code
 from typing import Optional, Tuple, Union
-from crewai import TaskOutput, Task

-def validate_json_output(result: TaskOutput) -> Tuple[bool, Any]:
+def validate_json_output(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
     """Validate and parse JSON output."""
     try:
         # Try to parse as JSON
         data = json.loads(result)
         return (True, data)
     except json.JSONDecodeError as e:
-        return (False, "Invalid JSON format")
+        return (False, {
+            "error": "Invalid JSON format",
+            "code": "JSON_ERROR",
+            "context": {"line": e.lineno, "column": e.colno}
+        })

 task = Task(
     description="Generate a JSON report",
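
A note on the hunks above: the updated docs standardize guardrail failures on a structured dict rather than a bare string. Below is a minimal, self-contained sketch of how a caller might branch on that shape; the `error`/`code`/`context` keys follow the diff, while the standalone `validate` function and sample input are illustrative, not crewAI API.

```python
from typing import Any, Dict, Tuple, Union

def validate(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
    """Toy guardrail following the documented return contract."""
    word_count = len(result.split())
    if word_count > 200:
        return (False, {
            "error": "Blog content exceeds 200 words",
            "code": "WORD_COUNT_ERROR",
            "context": {"word_count": word_count},
        })
    return (True, result.strip())

ok, payload = validate("a short draft")
if ok:
    print("validated:", payload)
else:
    # Structured errors let retry logic branch on `code` instead of parsing strings.
    print(f"[{payload['code']}] {payload['error']}")
```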

View File

@@ -535,7 +535,6 @@ class Agent(BaseAgent):
             verbose=self.verbose,
             response_format=response_format,
             i18n=self.i18n,
-            original_agent=self,
         )

         return await lite_agent.kickoff_async(messages)

View File

@@ -273,9 +273,11 @@ def get_crew(crew_path: str = "crew.py", require: bool = False) -> Crew | None:

        for attr_name in dir(module):
            attr = getattr(module, attr_name)
            try:
-                if callable(attr) and hasattr(attr, "crew"):
-                    crew_instance = attr().crew()
-                    return crew_instance
+                if isinstance(attr, Crew) and hasattr(attr, "kickoff"):
+                    print(
+                        f"Found valid crew object in attribute '{attr_name}' at {crew_os_path}."
+                    )
+                    return attr
            except Exception as e:
                print(f"Error processing attribute {attr_name}: {e}")

View File

@@ -1399,12 +1399,12 @@ class Crew(BaseModel):
             RuntimeError: If the specified memory system fails to reset
         """
         reset_functions = {
-            "long": (getattr(self, "_long_term_memory", None), "long term"),
-            "short": (getattr(self, "_short_term_memory", None), "short term"),
-            "entity": (getattr(self, "_entity_memory", None), "entity"),
-            "knowledge": (getattr(self, "knowledge", None), "knowledge"),
-            "kickoff_outputs": (getattr(self, "_task_output_handler", None), "task output"),
-            "external": (getattr(self, "_external_memory", None), "external"),
+            "long": (self._long_term_memory, "long term"),
+            "short": (self._short_term_memory, "short term"),
+            "entity": (self._entity_memory, "entity"),
+            "knowledge": (self.knowledge, "knowledge"),
+            "kickoff_outputs": (self._task_output_handler, "task output"),
+            "external": (self._external_memory, "external"),
         }
         memory_system, name = reset_functions[memory_type]
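
One observation on this hunk: dropping `getattr(..., None)` in favor of direct attribute access means a missing memory attribute now raises `AttributeError` when the dict is built, instead of silently mapping to `None`. A minimal sketch of the dispatch-table pattern, with stub objects in place of the real memory backends:

```python
class StubMemory:
    """Stand-in for a crewAI memory backend; illustrative only."""

    def __init__(self, label: str) -> None:
        self.label = label

    def reset(self) -> None:
        print(f"reset {self.label} memory")

_long_term_memory = StubMemory("long term")
_short_term_memory = StubMemory("short term")

# Key -> (memory object, human-readable name), mirroring the diff.
reset_functions = {
    "long": (_long_term_memory, "long term"),
    "short": (_short_term_memory, "short term"),
}

memory_system, name = reset_functions["long"]  # KeyError on unknown types
memory_system.reset()
```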

View File

@@ -719,7 +719,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
         async def run_flow():
             return await self.kickoff_async(inputs)

-        return asyncio.run(run_flow())
+        try:
+            return asyncio.run(run_flow())
+        except KeyboardInterrupt:
+            raise

     async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
         """

View File

@@ -21,7 +21,7 @@ class SQLiteFlowPersistence(FlowPersistence):
     moderate performance requirements.
     """

-    db_path: str
+    db_path: str  # Type annotation for instance variable

     def __init__(self, db_path: Optional[str] = None):
         """Initialize SQLite persistence.

View File

@@ -1,2 +0,0 @@
-CREWAI_TELEMETRY_BASE_URL: str = "https://telemetry.crewai.com:4319"
-CREWAI_TELEMETRY_SERVICE_NAME: str = "crewAI-telemetry"

View File

@@ -9,11 +9,6 @@ from contextlib import contextmanager
 from importlib.metadata import version
 from typing import TYPE_CHECKING, Any, Optional

-from crewai.telemetry.constants import (
-    CREWAI_TELEMETRY_BASE_URL,
-    CREWAI_TELEMETRY_SERVICE_NAME,
-)

 @contextmanager
 def suppress_warnings():
@@ -57,15 +52,16 @@ class Telemetry:
             return

         try:
+            telemetry_endpoint = "https://telemetry.crewai.com:4319"
             self.resource = Resource(
-                attributes={SERVICE_NAME: CREWAI_TELEMETRY_SERVICE_NAME},
+                attributes={SERVICE_NAME: "crewAI-telemetry"},
             )

             with suppress_warnings():
                 self.provider = TracerProvider(resource=self.resource)
                 processor = BatchSpanProcessor(
                     OTLPSpanExporter(
-                        endpoint=f"{CREWAI_TELEMETRY_BASE_URL}/v1/traces",
+                        endpoint=f"{telemetry_endpoint}/v1/traces",
                         timeout=30,
                     )
                 )
@@ -79,12 +75,12 @@ class Telemetry:
         ):
             raise  # Re-raise the exception to not interfere with system signals
         self.ready = False

     def _is_telemetry_disabled(self) -> bool:
         """Check if telemetry should be disabled based on environment variables."""
         return (
-            os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true"
-            or os.getenv("CREWAI_DISABLE_TELEMETRY", "false").lower() == "true"
+            os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true" or
+            os.getenv("CREWAI_DISABLE_TELEMETRY", "false").lower() == "true"
         )

     def set_tracer(self):
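
`_is_telemetry_disabled` above is a plain environment-variable kill switch: either opt-out variable disables telemetry, compared case-insensitively. A standalone sketch, with the variable names taken from the diff and the demo value illustrative:

```python
import os

def is_telemetry_disabled() -> bool:
    # Either opt-out variable disables telemetry; values compare case-insensitively.
    return (
        os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true" or
        os.getenv("CREWAI_DISABLE_TELEMETRY", "false").lower() == "true"
    )

os.environ["CREWAI_DISABLE_TELEMETRY"] = "TRUE"
print(is_telemetry_disabled())  # True
```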

View File

@@ -755,3 +755,29 @@ def test_multiple_routers_from_same_trigger():
     assert execution_order.index("anemia_analysis") > execution_order.index(
         "anemia_router"
     )
+
+
+def test_flow_keyboard_interrupt_handling():
+    """Test that a flow properly terminates when a keyboard interrupt is received."""
+    execution_order = []
+
+    class KeyboardInterruptFlow(Flow):
+        @start()
+        def step_1(self):
+            execution_order.append("step_1")
+
+        @listen(step_1)
+        def step_2(self):
+            execution_order.append("step_2")
+            raise KeyboardInterrupt()
+
+        @listen(step_2)
+        def step_3(self):
+            execution_order.append("step_3")
+
+    flow = KeyboardInterruptFlow()
+
+    with pytest.raises(KeyboardInterrupt):
+        flow.kickoff()
+
+    assert execution_order == ["step_1", "step_2"]