mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-15 11:58:31 +00:00
feat: create sys event types and handler
feat: add system event types and handler chore: add tests and improve signal-related error logging
This commit is contained in:
@@ -71,6 +71,7 @@ from crewai.events.types.reasoning_events import (
|
||||
AgentReasoningFailedEvent,
|
||||
AgentReasoningStartedEvent,
|
||||
)
|
||||
from crewai.events.types.system_events import SignalEvent, on_signal
|
||||
from crewai.events.types.task_events import (
|
||||
TaskCompletedEvent,
|
||||
TaskFailedEvent,
|
||||
@@ -159,6 +160,7 @@ class TraceCollectionListener(BaseEventListener):
|
||||
self._register_flow_event_handlers(crewai_event_bus)
|
||||
self._register_context_event_handlers(crewai_event_bus)
|
||||
self._register_action_event_handlers(crewai_event_bus)
|
||||
self._register_system_event_handlers(crewai_event_bus)
|
||||
|
||||
self._listeners_setup = True
|
||||
|
||||
@@ -458,6 +460,15 @@ class TraceCollectionListener(BaseEventListener):
|
||||
) -> None:
|
||||
self._handle_action_event("knowledge_query_failed", source, event)
|
||||
|
||||
def _register_system_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
|
||||
"""Register handlers for system signal events (SIGTERM, SIGINT, etc.)."""
|
||||
|
||||
@on_signal
|
||||
def handle_signal(source: Any, event: SignalEvent) -> None:
|
||||
"""Flush trace batch on system signals to prevent data loss."""
|
||||
if self.batch_manager.is_batch_initialized():
|
||||
self.batch_manager.finalize_batch()
|
||||
|
||||
def _initialize_crew_batch(self, source: Any, event: Any) -> None:
|
||||
"""Initialize trace batch.
|
||||
|
||||
|
||||
102
lib/crewai/src/crewai/events/types/system_events.py
Normal file
102
lib/crewai/src/crewai/events/types/system_events.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""System signal event types for CrewAI.
|
||||
|
||||
This module contains event types for system-level signals like SIGTERM,
|
||||
allowing listeners to perform cleanup operations before process termination.
|
||||
"""
|
||||
|
||||
from collections.abc import Callable
|
||||
from enum import IntEnum
|
||||
import signal
|
||||
from typing import Annotated, Literal, TypeVar
|
||||
|
||||
from pydantic import Field, TypeAdapter
|
||||
|
||||
from crewai.events.base_events import BaseEvent
|
||||
|
||||
|
||||
class SignalType(IntEnum):
|
||||
"""Enumeration of supported system signals."""
|
||||
|
||||
SIGTERM = signal.SIGTERM
|
||||
SIGINT = signal.SIGINT
|
||||
SIGHUP = signal.SIGHUP
|
||||
SIGTSTP = signal.SIGTSTP
|
||||
SIGCONT = signal.SIGCONT
|
||||
|
||||
|
||||
class SigTermEvent(BaseEvent):
|
||||
"""Event emitted when SIGTERM is received."""
|
||||
|
||||
type: Literal["SIGTERM"] = "SIGTERM"
|
||||
signal_number: SignalType = SignalType.SIGTERM
|
||||
reason: str | None = None
|
||||
|
||||
|
||||
class SigIntEvent(BaseEvent):
|
||||
"""Event emitted when SIGINT is received."""
|
||||
|
||||
type: Literal["SIGINT"] = "SIGINT"
|
||||
signal_number: SignalType = SignalType.SIGINT
|
||||
reason: str | None = None
|
||||
|
||||
|
||||
class SigHupEvent(BaseEvent):
|
||||
"""Event emitted when SIGHUP is received."""
|
||||
|
||||
type: Literal["SIGHUP"] = "SIGHUP"
|
||||
signal_number: SignalType = SignalType.SIGHUP
|
||||
reason: str | None = None
|
||||
|
||||
|
||||
class SigTStpEvent(BaseEvent):
|
||||
"""Event emitted when SIGTSTP is received.
|
||||
|
||||
Note: SIGSTOP cannot be caught - it immediately suspends the process.
|
||||
"""
|
||||
|
||||
type: Literal["SIGTSTP"] = "SIGTSTP"
|
||||
signal_number: SignalType = SignalType.SIGTSTP
|
||||
reason: str | None = None
|
||||
|
||||
|
||||
class SigContEvent(BaseEvent):
|
||||
"""Event emitted when SIGCONT is received."""
|
||||
|
||||
type: Literal["SIGCONT"] = "SIGCONT"
|
||||
signal_number: SignalType = SignalType.SIGCONT
|
||||
reason: str | None = None
|
||||
|
||||
|
||||
SignalEvent = Annotated[
|
||||
SigTermEvent | SigIntEvent | SigHupEvent | SigTStpEvent | SigContEvent,
|
||||
Field(discriminator="type"),
|
||||
]
|
||||
|
||||
signal_event_adapter: TypeAdapter[SignalEvent] = TypeAdapter(SignalEvent)
|
||||
|
||||
SIGNAL_EVENT_TYPES: tuple[type[BaseEvent], ...] = (
|
||||
SigTermEvent,
|
||||
SigIntEvent,
|
||||
SigHupEvent,
|
||||
SigTStpEvent,
|
||||
SigContEvent,
|
||||
)
|
||||
|
||||
|
||||
T = TypeVar("T", bound=Callable[[object, SignalEvent], None])
|
||||
|
||||
|
||||
def on_signal(func: T) -> T:
|
||||
"""Decorator to register a handler for all signal events.
|
||||
|
||||
Args:
|
||||
func: Handler function that receives (source, event) arguments.
|
||||
|
||||
Returns:
|
||||
The original function, registered for all signal event types.
|
||||
"""
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
|
||||
for event_type in SIGNAL_EVENT_TYPES:
|
||||
crewai_event_bus.on(event_type)(func)
|
||||
return func
|
||||
@@ -9,12 +9,14 @@ data is collected. Users can opt-in to share more complete data using the
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import atexit
|
||||
from collections.abc import Callable
|
||||
from importlib.metadata import version
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import signal
|
||||
import threading
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
@@ -31,6 +33,14 @@ from opentelemetry.sdk.trace.export import (
|
||||
from opentelemetry.trace import Span
|
||||
from typing_extensions import Self
|
||||
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.system_events import (
|
||||
SigContEvent,
|
||||
SigHupEvent,
|
||||
SigIntEvent,
|
||||
SigTStpEvent,
|
||||
SigTermEvent,
|
||||
)
|
||||
from crewai.telemetry.constants import (
|
||||
CREWAI_TELEMETRY_BASE_URL,
|
||||
CREWAI_TELEMETRY_SERVICE_NAME,
|
||||
@@ -121,6 +131,7 @@ class Telemetry:
|
||||
)
|
||||
|
||||
self.provider.add_span_processor(processor)
|
||||
self._register_shutdown_handlers()
|
||||
self.ready = True
|
||||
except Exception as e:
|
||||
if isinstance(
|
||||
@@ -155,6 +166,71 @@ class Telemetry:
|
||||
self.ready = False
|
||||
self.trace_set = False
|
||||
|
||||
def _register_shutdown_handlers(self) -> None:
|
||||
"""Register handlers for graceful shutdown on process exit and signals."""
|
||||
atexit.register(self._shutdown)
|
||||
|
||||
self._original_handlers: dict[int, Any] = {}
|
||||
|
||||
self._register_signal_handler(signal.SIGTERM, SigTermEvent, shutdown=True)
|
||||
self._register_signal_handler(signal.SIGINT, SigIntEvent, shutdown=True)
|
||||
self._register_signal_handler(signal.SIGHUP, SigHupEvent, shutdown=False)
|
||||
self._register_signal_handler(signal.SIGTSTP, SigTStpEvent, shutdown=False)
|
||||
self._register_signal_handler(signal.SIGCONT, SigContEvent, shutdown=False)
|
||||
|
||||
def _register_signal_handler(
|
||||
self,
|
||||
sig: signal.Signals,
|
||||
event_class: type,
|
||||
shutdown: bool = False,
|
||||
) -> None:
|
||||
"""Register a signal handler that emits an event.
|
||||
|
||||
Args:
|
||||
sig: The signal to handle.
|
||||
event_class: The event class to instantiate and emit.
|
||||
shutdown: Whether to trigger shutdown on this signal.
|
||||
"""
|
||||
try:
|
||||
original_handler = signal.getsignal(sig)
|
||||
self._original_handlers[sig] = original_handler
|
||||
|
||||
def handler(signum: int, frame: Any) -> None:
|
||||
crewai_event_bus.emit(self, event_class())
|
||||
|
||||
if shutdown:
|
||||
self._shutdown()
|
||||
|
||||
if original_handler not in (signal.SIG_DFL, signal.SIG_IGN, None):
|
||||
if callable(original_handler):
|
||||
original_handler(signum, frame)
|
||||
elif shutdown:
|
||||
raise SystemExit(0)
|
||||
|
||||
signal.signal(sig, handler)
|
||||
except ValueError as e:
|
||||
logger.warning(
|
||||
f"Cannot register {sig.name} handler: not running in main thread",
|
||||
exc_info=e,
|
||||
)
|
||||
except OSError as e:
|
||||
logger.warning(f"Cannot register {sig.name} handler: {e}", exc_info=e)
|
||||
|
||||
def _shutdown(self) -> None:
|
||||
"""Flush and shutdown the telemetry provider on process exit.
|
||||
|
||||
Uses a short timeout to avoid blocking process shutdown.
|
||||
"""
|
||||
if not self.ready:
|
||||
return
|
||||
|
||||
try:
|
||||
self.provider.force_flush(timeout_millis=5000)
|
||||
self.provider.shutdown()
|
||||
self.ready = False
|
||||
except Exception as e:
|
||||
logger.debug(f"Telemetry shutdown failed: {e}")
|
||||
|
||||
def _safe_telemetry_operation(
|
||||
self, operation: Callable[[], Span | None]
|
||||
) -> Span | None:
|
||||
|
||||
197
lib/crewai/tests/events/types/test_system_events.py
Normal file
197
lib/crewai/tests/events/types/test_system_events.py
Normal file
@@ -0,0 +1,197 @@
|
||||
"""Tests for system signal events."""
|
||||
|
||||
import signal
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.system_events import (
|
||||
SIGNAL_EVENT_TYPES,
|
||||
SignalEvent,
|
||||
SignalType,
|
||||
SigContEvent,
|
||||
SigHupEvent,
|
||||
SigIntEvent,
|
||||
SigTermEvent,
|
||||
SigTStpEvent,
|
||||
on_signal,
|
||||
signal_event_adapter,
|
||||
)
|
||||
|
||||
|
||||
class TestSignalType:
|
||||
"""Tests for SignalType enum."""
|
||||
|
||||
def test_signal_type_values(self) -> None:
|
||||
"""Verify SignalType maps to correct signal numbers."""
|
||||
assert SignalType.SIGTERM == signal.SIGTERM
|
||||
assert SignalType.SIGINT == signal.SIGINT
|
||||
assert SignalType.SIGHUP == signal.SIGHUP
|
||||
assert SignalType.SIGTSTP == signal.SIGTSTP
|
||||
assert SignalType.SIGCONT == signal.SIGCONT
|
||||
|
||||
|
||||
class TestSignalEvents:
|
||||
"""Tests for individual signal event classes."""
|
||||
|
||||
def test_sigterm_event_defaults(self) -> None:
|
||||
"""Test SigTermEvent has correct defaults."""
|
||||
event = SigTermEvent()
|
||||
assert event.type == "SIGTERM"
|
||||
assert event.signal_number == SignalType.SIGTERM
|
||||
assert event.reason is None
|
||||
|
||||
def test_sigterm_event_with_reason(self) -> None:
|
||||
"""Test SigTermEvent can be created with a reason."""
|
||||
event = SigTermEvent(reason="graceful shutdown")
|
||||
assert event.reason == "graceful shutdown"
|
||||
|
||||
def test_sigint_event_defaults(self) -> None:
|
||||
"""Test SigIntEvent has correct defaults."""
|
||||
event = SigIntEvent()
|
||||
assert event.type == "SIGINT"
|
||||
assert event.signal_number == SignalType.SIGINT
|
||||
|
||||
def test_sighup_event_defaults(self) -> None:
|
||||
"""Test SigHupEvent has correct defaults."""
|
||||
event = SigHupEvent()
|
||||
assert event.type == "SIGHUP"
|
||||
assert event.signal_number == SignalType.SIGHUP
|
||||
|
||||
def test_sigtstp_event_defaults(self) -> None:
|
||||
"""Test SigTStpEvent has correct defaults."""
|
||||
event = SigTStpEvent()
|
||||
assert event.type == "SIGTSTP"
|
||||
assert event.signal_number == SignalType.SIGTSTP
|
||||
|
||||
def test_sigcont_event_defaults(self) -> None:
|
||||
"""Test SigContEvent has correct defaults."""
|
||||
event = SigContEvent()
|
||||
assert event.type == "SIGCONT"
|
||||
assert event.signal_number == SignalType.SIGCONT
|
||||
|
||||
|
||||
class TestSignalEventAdapter:
|
||||
"""Tests for the Pydantic discriminated union adapter."""
|
||||
|
||||
def test_adapter_parses_sigterm(self) -> None:
|
||||
"""Test adapter correctly parses SIGTERM event."""
|
||||
data = {"type": "SIGTERM", "reason": "test"}
|
||||
event = signal_event_adapter.validate_python(data)
|
||||
assert isinstance(event, SigTermEvent)
|
||||
assert event.reason == "test"
|
||||
|
||||
def test_adapter_parses_sigint(self) -> None:
|
||||
"""Test adapter correctly parses SIGINT event."""
|
||||
data = {"type": "SIGINT"}
|
||||
event = signal_event_adapter.validate_python(data)
|
||||
assert isinstance(event, SigIntEvent)
|
||||
|
||||
def test_adapter_parses_sighup(self) -> None:
|
||||
"""Test adapter correctly parses SIGHUP event."""
|
||||
data = {"type": "SIGHUP"}
|
||||
event = signal_event_adapter.validate_python(data)
|
||||
assert isinstance(event, SigHupEvent)
|
||||
|
||||
def test_adapter_parses_sigtstp(self) -> None:
|
||||
"""Test adapter correctly parses SIGTSTP event."""
|
||||
data = {"type": "SIGTSTP"}
|
||||
event = signal_event_adapter.validate_python(data)
|
||||
assert isinstance(event, SigTStpEvent)
|
||||
|
||||
def test_adapter_parses_sigcont(self) -> None:
|
||||
"""Test adapter correctly parses SIGCONT event."""
|
||||
data = {"type": "SIGCONT"}
|
||||
event = signal_event_adapter.validate_python(data)
|
||||
assert isinstance(event, SigContEvent)
|
||||
|
||||
def test_adapter_rejects_invalid_type(self) -> None:
|
||||
"""Test adapter rejects unknown signal type."""
|
||||
data = {"type": "SIGKILL"}
|
||||
with pytest.raises(Exception):
|
||||
signal_event_adapter.validate_python(data)
|
||||
|
||||
|
||||
class TestSignalEventTypes:
|
||||
"""Tests for SIGNAL_EVENT_TYPES constant."""
|
||||
|
||||
def test_contains_all_event_types(self) -> None:
|
||||
"""Verify SIGNAL_EVENT_TYPES contains all signal events."""
|
||||
assert SigTermEvent in SIGNAL_EVENT_TYPES
|
||||
assert SigIntEvent in SIGNAL_EVENT_TYPES
|
||||
assert SigHupEvent in SIGNAL_EVENT_TYPES
|
||||
assert SigTStpEvent in SIGNAL_EVENT_TYPES
|
||||
assert SigContEvent in SIGNAL_EVENT_TYPES
|
||||
assert len(SIGNAL_EVENT_TYPES) == 5
|
||||
|
||||
|
||||
class TestOnSignalDecorator:
|
||||
"""Tests for the @on_signal decorator."""
|
||||
|
||||
def test_decorator_registers_for_all_signals(self) -> None:
|
||||
"""Test that @on_signal registers handler for all signal event types."""
|
||||
import threading
|
||||
|
||||
received_types: set[str] = set()
|
||||
condition = threading.Condition()
|
||||
expected_count = len(SIGNAL_EVENT_TYPES)
|
||||
|
||||
@on_signal
|
||||
def test_handler(source: object, event: SignalEvent) -> None:
|
||||
with condition:
|
||||
received_types.add(event.type)
|
||||
condition.notify_all()
|
||||
|
||||
for event_class in SIGNAL_EVENT_TYPES:
|
||||
crewai_event_bus.emit(self, event_class())
|
||||
|
||||
with condition:
|
||||
condition.wait_for(lambda: len(received_types) >= expected_count, timeout=5.0)
|
||||
|
||||
assert "SIGTERM" in received_types
|
||||
assert "SIGINT" in received_types
|
||||
assert "SIGHUP" in received_types
|
||||
assert "SIGTSTP" in received_types
|
||||
assert "SIGCONT" in received_types
|
||||
|
||||
def test_decorator_returns_original_function(self) -> None:
|
||||
"""Test that @on_signal returns the original function."""
|
||||
|
||||
def my_handler(source: object, event: SignalEvent) -> None:
|
||||
pass
|
||||
|
||||
decorated = on_signal(my_handler)
|
||||
assert decorated is my_handler
|
||||
|
||||
def test_decorator_preserves_function_name(self) -> None:
|
||||
"""Test that @on_signal preserves function metadata."""
|
||||
|
||||
@on_signal
|
||||
def my_named_handler(source: object, event: SignalEvent) -> None:
|
||||
"""My docstring."""
|
||||
pass
|
||||
|
||||
assert my_named_handler.__name__ == "my_named_handler"
|
||||
assert my_named_handler.__doc__ == "My docstring."
|
||||
|
||||
|
||||
class TestSignalEventSerialization:
|
||||
"""Tests for event serialization."""
|
||||
|
||||
def test_sigterm_to_dict(self) -> None:
|
||||
"""Test SigTermEvent serializes correctly."""
|
||||
event = SigTermEvent(reason="test reason")
|
||||
data = event.model_dump()
|
||||
assert data["type"] == "SIGTERM"
|
||||
assert data["signal_number"] == signal.SIGTERM
|
||||
assert data["reason"] == "test reason"
|
||||
|
||||
def test_roundtrip_serialization(self) -> None:
|
||||
"""Test events can be serialized and deserialized."""
|
||||
original = SigTermEvent(reason="roundtrip test")
|
||||
serialized = original.model_dump()
|
||||
restored = signal_event_adapter.validate_python(serialized)
|
||||
assert isinstance(restored, SigTermEvent)
|
||||
assert restored.reason == original.reason
|
||||
assert restored.type == original.type
|
||||
Reference in New Issue
Block a user