From 2355ec0733101f4ce4228a26c5820d7ebe2cd3a9 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Sun, 30 Nov 2025 17:44:40 -0500 Subject: [PATCH] feat: create sys event types and handler feat: add system event types and handler chore: add tests and improve signal-related error logging --- .../listeners/tracing/trace_listener.py | 11 + .../src/crewai/events/types/system_events.py | 102 +++++++++ lib/crewai/src/crewai/telemetry/telemetry.py | 76 +++++++ .../tests/events/types/test_system_events.py | 197 ++++++++++++++++++ 4 files changed, 386 insertions(+) create mode 100644 lib/crewai/src/crewai/events/types/system_events.py create mode 100644 lib/crewai/tests/events/types/test_system_events.py diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index f8cc43572..c8f7000cd 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -71,6 +71,7 @@ from crewai.events.types.reasoning_events import ( AgentReasoningFailedEvent, AgentReasoningStartedEvent, ) +from crewai.events.types.system_events import SignalEvent, on_signal from crewai.events.types.task_events import ( TaskCompletedEvent, TaskFailedEvent, @@ -159,6 +160,7 @@ class TraceCollectionListener(BaseEventListener): self._register_flow_event_handlers(crewai_event_bus) self._register_context_event_handlers(crewai_event_bus) self._register_action_event_handlers(crewai_event_bus) + self._register_system_event_handlers(crewai_event_bus) self._listeners_setup = True @@ -458,6 +460,15 @@ class TraceCollectionListener(BaseEventListener): ) -> None: self._handle_action_event("knowledge_query_failed", source, event) + def _register_system_event_handlers(self, event_bus: CrewAIEventsBus) -> None: + """Register handlers for system signal events (SIGTERM, SIGINT, etc.).""" + + @on_signal + def handle_signal(source: Any, event: SignalEvent) -> None: + """Flush trace batch on system signals to prevent data loss.""" + if self.batch_manager.is_batch_initialized(): + self.batch_manager.finalize_batch() + def _initialize_crew_batch(self, source: Any, event: Any) -> None: """Initialize trace batch. diff --git a/lib/crewai/src/crewai/events/types/system_events.py b/lib/crewai/src/crewai/events/types/system_events.py new file mode 100644 index 000000000..b17b14c04 --- /dev/null +++ b/lib/crewai/src/crewai/events/types/system_events.py @@ -0,0 +1,102 @@ +"""System signal event types for CrewAI. + +This module contains event types for system-level signals like SIGTERM, +allowing listeners to perform cleanup operations before process termination. +""" + +from collections.abc import Callable +from enum import IntEnum +import signal +from typing import Annotated, Literal, TypeVar + +from pydantic import Field, TypeAdapter + +from crewai.events.base_events import BaseEvent + + +class SignalType(IntEnum): + """Enumeration of supported system signals.""" + + SIGTERM = signal.SIGTERM + SIGINT = signal.SIGINT + SIGHUP = signal.SIGHUP + SIGTSTP = signal.SIGTSTP + SIGCONT = signal.SIGCONT + + +class SigTermEvent(BaseEvent): + """Event emitted when SIGTERM is received.""" + + type: Literal["SIGTERM"] = "SIGTERM" + signal_number: SignalType = SignalType.SIGTERM + reason: str | None = None + + +class SigIntEvent(BaseEvent): + """Event emitted when SIGINT is received.""" + + type: Literal["SIGINT"] = "SIGINT" + signal_number: SignalType = SignalType.SIGINT + reason: str | None = None + + +class SigHupEvent(BaseEvent): + """Event emitted when SIGHUP is received.""" + + type: Literal["SIGHUP"] = "SIGHUP" + signal_number: SignalType = SignalType.SIGHUP + reason: str | None = None + + +class SigTStpEvent(BaseEvent): + """Event emitted when SIGTSTP is received. + + Note: SIGSTOP cannot be caught - it immediately suspends the process. + """ + + type: Literal["SIGTSTP"] = "SIGTSTP" + signal_number: SignalType = SignalType.SIGTSTP + reason: str | None = None + + +class SigContEvent(BaseEvent): + """Event emitted when SIGCONT is received.""" + + type: Literal["SIGCONT"] = "SIGCONT" + signal_number: SignalType = SignalType.SIGCONT + reason: str | None = None + + +SignalEvent = Annotated[ + SigTermEvent | SigIntEvent | SigHupEvent | SigTStpEvent | SigContEvent, + Field(discriminator="type"), +] + +signal_event_adapter: TypeAdapter[SignalEvent] = TypeAdapter(SignalEvent) + +SIGNAL_EVENT_TYPES: tuple[type[BaseEvent], ...] = ( + SigTermEvent, + SigIntEvent, + SigHupEvent, + SigTStpEvent, + SigContEvent, +) + + +T = TypeVar("T", bound=Callable[[object, SignalEvent], None]) + + +def on_signal(func: T) -> T: + """Decorator to register a handler for all signal events. + + Args: + func: Handler function that receives (source, event) arguments. + + Returns: + The original function, registered for all signal event types. + """ + from crewai.events.event_bus import crewai_event_bus + + for event_type in SIGNAL_EVENT_TYPES: + crewai_event_bus.on(event_type)(func) + return func diff --git a/lib/crewai/src/crewai/telemetry/telemetry.py b/lib/crewai/src/crewai/telemetry/telemetry.py index e07fe1577..848ba1133 100644 --- a/lib/crewai/src/crewai/telemetry/telemetry.py +++ b/lib/crewai/src/crewai/telemetry/telemetry.py @@ -9,12 +9,14 @@ data is collected. Users can opt-in to share more complete data using the from __future__ import annotations import asyncio +import atexit from collections.abc import Callable from importlib.metadata import version import json import logging import os import platform +import signal import threading from typing import TYPE_CHECKING, Any @@ -31,6 +33,14 @@ from opentelemetry.sdk.trace.export import ( from opentelemetry.trace import Span from typing_extensions import Self +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.system_events import ( + SigContEvent, + SigHupEvent, + SigIntEvent, + SigTStpEvent, + SigTermEvent, +) from crewai.telemetry.constants import ( CREWAI_TELEMETRY_BASE_URL, CREWAI_TELEMETRY_SERVICE_NAME, @@ -121,6 +131,7 @@ class Telemetry: ) self.provider.add_span_processor(processor) + self._register_shutdown_handlers() self.ready = True except Exception as e: if isinstance( @@ -155,6 +166,71 @@ class Telemetry: self.ready = False self.trace_set = False + def _register_shutdown_handlers(self) -> None: + """Register handlers for graceful shutdown on process exit and signals.""" + atexit.register(self._shutdown) + + self._original_handlers: dict[int, Any] = {} + + self._register_signal_handler(signal.SIGTERM, SigTermEvent, shutdown=True) + self._register_signal_handler(signal.SIGINT, SigIntEvent, shutdown=True) + self._register_signal_handler(signal.SIGHUP, SigHupEvent, shutdown=False) + self._register_signal_handler(signal.SIGTSTP, SigTStpEvent, shutdown=False) + self._register_signal_handler(signal.SIGCONT, SigContEvent, shutdown=False) + + def _register_signal_handler( + self, + sig: signal.Signals, + event_class: type, + shutdown: bool = False, + ) -> None: + """Register a signal handler that emits an event. + + Args: + sig: The signal to handle. + event_class: The event class to instantiate and emit. + shutdown: Whether to trigger shutdown on this signal. + """ + try: + original_handler = signal.getsignal(sig) + self._original_handlers[sig] = original_handler + + def handler(signum: int, frame: Any) -> None: + crewai_event_bus.emit(self, event_class()) + + if shutdown: + self._shutdown() + + if original_handler not in (signal.SIG_DFL, signal.SIG_IGN, None): + if callable(original_handler): + original_handler(signum, frame) + elif shutdown: + raise SystemExit(0) + + signal.signal(sig, handler) + except ValueError as e: + logger.warning( + f"Cannot register {sig.name} handler: not running in main thread", + exc_info=e, + ) + except OSError as e: + logger.warning(f"Cannot register {sig.name} handler: {e}", exc_info=e) + + def _shutdown(self) -> None: + """Flush and shutdown the telemetry provider on process exit. + + Uses a short timeout to avoid blocking process shutdown. + """ + if not self.ready: + return + + try: + self.provider.force_flush(timeout_millis=5000) + self.provider.shutdown() + self.ready = False + except Exception as e: + logger.debug(f"Telemetry shutdown failed: {e}") + def _safe_telemetry_operation( self, operation: Callable[[], Span | None] ) -> Span | None: diff --git a/lib/crewai/tests/events/types/test_system_events.py b/lib/crewai/tests/events/types/test_system_events.py new file mode 100644 index 000000000..2109d428b --- /dev/null +++ b/lib/crewai/tests/events/types/test_system_events.py @@ -0,0 +1,197 @@ +"""Tests for system signal events.""" + +import signal +from unittest.mock import MagicMock, patch + +import pytest + +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.system_events import ( + SIGNAL_EVENT_TYPES, + SignalEvent, + SignalType, + SigContEvent, + SigHupEvent, + SigIntEvent, + SigTermEvent, + SigTStpEvent, + on_signal, + signal_event_adapter, +) + + +class TestSignalType: + """Tests for SignalType enum.""" + + def test_signal_type_values(self) -> None: + """Verify SignalType maps to correct signal numbers.""" + assert SignalType.SIGTERM == signal.SIGTERM + assert SignalType.SIGINT == signal.SIGINT + assert SignalType.SIGHUP == signal.SIGHUP + assert SignalType.SIGTSTP == signal.SIGTSTP + assert SignalType.SIGCONT == signal.SIGCONT + + +class TestSignalEvents: + """Tests for individual signal event classes.""" + + def test_sigterm_event_defaults(self) -> None: + """Test SigTermEvent has correct defaults.""" + event = SigTermEvent() + assert event.type == "SIGTERM" + assert event.signal_number == SignalType.SIGTERM + assert event.reason is None + + def test_sigterm_event_with_reason(self) -> None: + """Test SigTermEvent can be created with a reason.""" + event = SigTermEvent(reason="graceful shutdown") + assert event.reason == "graceful shutdown" + + def test_sigint_event_defaults(self) -> None: + """Test SigIntEvent has correct defaults.""" + event = SigIntEvent() + assert event.type == "SIGINT" + assert event.signal_number == SignalType.SIGINT + + def test_sighup_event_defaults(self) -> None: + """Test SigHupEvent has correct defaults.""" + event = SigHupEvent() + assert event.type == "SIGHUP" + assert event.signal_number == SignalType.SIGHUP + + def test_sigtstp_event_defaults(self) -> None: + """Test SigTStpEvent has correct defaults.""" + event = SigTStpEvent() + assert event.type == "SIGTSTP" + assert event.signal_number == SignalType.SIGTSTP + + def test_sigcont_event_defaults(self) -> None: + """Test SigContEvent has correct defaults.""" + event = SigContEvent() + assert event.type == "SIGCONT" + assert event.signal_number == SignalType.SIGCONT + + +class TestSignalEventAdapter: + """Tests for the Pydantic discriminated union adapter.""" + + def test_adapter_parses_sigterm(self) -> None: + """Test adapter correctly parses SIGTERM event.""" + data = {"type": "SIGTERM", "reason": "test"} + event = signal_event_adapter.validate_python(data) + assert isinstance(event, SigTermEvent) + assert event.reason == "test" + + def test_adapter_parses_sigint(self) -> None: + """Test adapter correctly parses SIGINT event.""" + data = {"type": "SIGINT"} + event = signal_event_adapter.validate_python(data) + assert isinstance(event, SigIntEvent) + + def test_adapter_parses_sighup(self) -> None: + """Test adapter correctly parses SIGHUP event.""" + data = {"type": "SIGHUP"} + event = signal_event_adapter.validate_python(data) + assert isinstance(event, SigHupEvent) + + def test_adapter_parses_sigtstp(self) -> None: + """Test adapter correctly parses SIGTSTP event.""" + data = {"type": "SIGTSTP"} + event = signal_event_adapter.validate_python(data) + assert isinstance(event, SigTStpEvent) + + def test_adapter_parses_sigcont(self) -> None: + """Test adapter correctly parses SIGCONT event.""" + data = {"type": "SIGCONT"} + event = signal_event_adapter.validate_python(data) + assert isinstance(event, SigContEvent) + + def test_adapter_rejects_invalid_type(self) -> None: + """Test adapter rejects unknown signal type.""" + data = {"type": "SIGKILL"} + with pytest.raises(Exception): + signal_event_adapter.validate_python(data) + + +class TestSignalEventTypes: + """Tests for SIGNAL_EVENT_TYPES constant.""" + + def test_contains_all_event_types(self) -> None: + """Verify SIGNAL_EVENT_TYPES contains all signal events.""" + assert SigTermEvent in SIGNAL_EVENT_TYPES + assert SigIntEvent in SIGNAL_EVENT_TYPES + assert SigHupEvent in SIGNAL_EVENT_TYPES + assert SigTStpEvent in SIGNAL_EVENT_TYPES + assert SigContEvent in SIGNAL_EVENT_TYPES + assert len(SIGNAL_EVENT_TYPES) == 5 + + +class TestOnSignalDecorator: + """Tests for the @on_signal decorator.""" + + def test_decorator_registers_for_all_signals(self) -> None: + """Test that @on_signal registers handler for all signal event types.""" + import threading + + received_types: set[str] = set() + condition = threading.Condition() + expected_count = len(SIGNAL_EVENT_TYPES) + + @on_signal + def test_handler(source: object, event: SignalEvent) -> None: + with condition: + received_types.add(event.type) + condition.notify_all() + + for event_class in SIGNAL_EVENT_TYPES: + crewai_event_bus.emit(self, event_class()) + + with condition: + condition.wait_for(lambda: len(received_types) >= expected_count, timeout=5.0) + + assert "SIGTERM" in received_types + assert "SIGINT" in received_types + assert "SIGHUP" in received_types + assert "SIGTSTP" in received_types + assert "SIGCONT" in received_types + + def test_decorator_returns_original_function(self) -> None: + """Test that @on_signal returns the original function.""" + + def my_handler(source: object, event: SignalEvent) -> None: + pass + + decorated = on_signal(my_handler) + assert decorated is my_handler + + def test_decorator_preserves_function_name(self) -> None: + """Test that @on_signal preserves function metadata.""" + + @on_signal + def my_named_handler(source: object, event: SignalEvent) -> None: + """My docstring.""" + pass + + assert my_named_handler.__name__ == "my_named_handler" + assert my_named_handler.__doc__ == "My docstring." + + +class TestSignalEventSerialization: + """Tests for event serialization.""" + + def test_sigterm_to_dict(self) -> None: + """Test SigTermEvent serializes correctly.""" + event = SigTermEvent(reason="test reason") + data = event.model_dump() + assert data["type"] == "SIGTERM" + assert data["signal_number"] == signal.SIGTERM + assert data["reason"] == "test reason" + + def test_roundtrip_serialization(self) -> None: + """Test events can be serialized and deserialized.""" + original = SigTermEvent(reason="roundtrip test") + serialized = original.model_dump() + restored = signal_event_adapter.validate_python(serialized) + assert isinstance(restored, SigTermEvent) + assert restored.reason == original.reason + assert restored.type == original.type \ No newline at end of file