diff --git a/lib/crewai/src/crewai/events/signal_manager.py b/lib/crewai/src/crewai/events/signal_manager.py
new file mode 100644
index 000000000..3d13ffa1b
--- /dev/null
+++ b/lib/crewai/src/crewai/events/signal_manager.py
@@ -0,0 +1,179 @@
+"""System signal manager for CrewAI.
+
+This module provides a singleton manager that bridges OS signals to the CrewAI
+event bus, independent of telemetry settings. This ensures that signal events
+(SigTermEvent, SigIntEvent, etc.) are always emitted when signals are received,
+regardless of whether telemetry is enabled or disabled.
+"""
+
+from __future__ import annotations
+
+import logging
+import signal
+import threading
+from typing import TYPE_CHECKING, Any
+
+from typing_extensions import Self
+
+from crewai.events.event_bus import crewai_event_bus
+
+
+if TYPE_CHECKING:
+    from collections.abc import Callable
+
+    from crewai.events.base_events import BaseEvent
+
+    EventFactory = Callable[[], BaseEvent]
+
+logger = logging.getLogger(__name__)
+
+
+class SystemSignalManager:
+    """Singleton manager for bridging OS signals to the CrewAI event bus.
+
+    This class registers signal handlers that emit corresponding events to the
+    event bus, allowing any code to listen for system signals via the event
+    system. It operates independently of telemetry settings.
+
+    The manager supports handler chaining: when a signal handler is registered,
+    it preserves any previously registered handler and calls it after emitting
+    the event. This allows user code to register handlers before or after
+    CrewAI initialization.
+
+    Attributes:
+        _instance: Singleton instance of the manager.
+        _lock: Thread lock for singleton initialization.
+        _original_handlers: Mapping of signals to the foreign handler that the
+            manager's wrapper chains to.
+        _registered_signals: Set of signals that have been registered.
+        _registrations: Mapping of signals to the (event_factory, shutdown)
+            pair used to rebuild the wrapper in ensure_handlers_installed().
+        _installed_handlers: Mapping of signals to the wrapper the manager
+            most recently installed, used to recognise its own handler.
+    """
+
+    _instance: Self | None = None
+    _lock: threading.Lock = threading.Lock()
+
+    def __new__(cls) -> Self:
+        """Create or return the singleton instance.
+
+        Returns:
+            The singleton SystemSignalManager instance.
+        """
+        if cls._instance is None:
+            with cls._lock:
+                if cls._instance is None:
+                    cls._instance = super().__new__(cls)
+                    cls._instance._initialized = False
+        return cls._instance
+
+    def __init__(self) -> None:
+        """Initialize the signal manager.
+
+        This is safe to call multiple times; initialization only happens once.
+        """
+        if getattr(self, "_initialized", False):
+            return
+
+        self._initialized: bool = True
+        self._original_handlers: dict[signal.Signals, Any] = {}
+        self._registered_signals: set[signal.Signals] = set()
+        self._registrations: dict[signal.Signals, tuple[EventFactory, bool]] = {}
+        self._installed_handlers: dict[signal.Signals, Any] = {}
+        self._handler_lock = threading.Lock()
+
+    def register_signal(
+        self,
+        sig: signal.Signals,
+        event_factory: EventFactory,
+        shutdown: bool = False,
+    ) -> None:
+        """Register a signal handler that emits an event to the event bus.
+
+        This method can be called multiple times for the same signal. Each call
+        re-reads the current signal handler and wraps it, ensuring that any
+        handlers registered after the initial setup are still called. If the
+        current handler is the wrapper this manager installed earlier, it is
+        replaced instead of wrapped, so the event is emitted once per signal.
+
+        Args:
+            sig: The signal to handle (e.g., signal.SIGTERM).
+            event_factory: A callable that creates the event to emit.
+            shutdown: If True, raise SystemExit(0) after handling if there
+                was no original handler to call.
+        """
+        with self._handler_lock:
+            self._install_handler(sig, event_factory, shutdown)
+
+    def _install_handler(
+        self,
+        sig: signal.Signals,
+        event_factory: EventFactory,
+        shutdown: bool,
+    ) -> None:
+        """Install the event-emitting wrapper for one signal.
+
+        The caller must hold ``_handler_lock``. Failures to install are logged
+        and leave the manager's bookkeeping for the signal untouched.
+
+        Args:
+            sig: The signal to handle.
+            event_factory: A callable that creates the event to emit.
+            shutdown: If True, raise SystemExit(0) after handling if there
+                was no original handler to call.
+        """
+        try:
+            current_handler = signal.getsignal(sig)
+            installed_handler = self._installed_handlers.get(sig)
+
+            if installed_handler is not None and current_handler is installed_handler:
+                # Our own wrapper is still on top. Wrapping it again would emit
+                # the event twice, so keep chaining to the handler it wrapped.
+                original_handler = self._original_handlers.get(sig)
+            else:
+                original_handler = current_handler
+                self._original_handlers[sig] = original_handler
+
+            def handler(signum: int, frame: Any) -> None:
+                crewai_event_bus.emit(self, event_factory())
+
+                if original_handler not in (signal.SIG_DFL, signal.SIG_IGN, None):
+                    if callable(original_handler):
+                        original_handler(signum, frame)
+                elif shutdown:
+                    raise SystemExit(0)
+
+            signal.signal(sig, handler)
+            self._installed_handlers[sig] = handler
+            self._registrations[sig] = (event_factory, shutdown)
+            self._registered_signals.add(sig)
+        except ValueError as e:
+            logger.warning(
+                f"Cannot register {sig.name} handler: not running in main thread",
+                exc_info=e,
+            )
+        except OSError as e:
+            logger.warning(f"Cannot register {sig.name} handler: {e}", exc_info=e)
+
+    def ensure_handlers_installed(self) -> None:
+        """Ensure signal handlers are installed, re-wrapping if necessary.
+
+        This method can be called to reinstall signal handlers, which is useful
+        when user code has registered handlers after CrewAI's initial setup.
+        The handlers will be re-registered to wrap any new handlers that were
+        installed since the last registration. Signals whose current handler is
+        still the manager's wrapper are left alone.
+
+        This is a no-op if called before any signals have been registered via
+        register_signal().
+        """
+        with self._handler_lock:
+            # Snapshot the items: _install_handler rewrites _registrations.
+            for sig, (event_factory, shutdown) in list(self._registrations.items()):
+                if signal.getsignal(sig) is self._installed_handlers.get(sig):
+                    continue
+                self._install_handler(sig, event_factory, shutdown)
+
+
+system_signal_manager: SystemSignalManager = SystemSignalManager()
diff --git a/lib/crewai/src/crewai/events/types/system_events.py b/lib/crewai/src/crewai/events/types/system_events.py
index b17b14c04..13abe49cf 100644
--- a/lib/crewai/src/crewai/events/types/system_events.py
+++ b/lib/crewai/src/crewai/events/types/system_events.py
@@ -100,3 +100,34 @@ def on_signal(func: T) -> T:
     for event_type in SIGNAL_EVENT_TYPES:
         crewai_event_bus.on(event_type)(func)
     return func
+
+
+def _register_default_system_signal_handlers() -> None:
+    """Register default signal handlers that emit events to the event bus.
+
+    This function is called at module import time to ensure signal events
+    are always available, regardless of telemetry settings. It bridges OS
+    signals to the CrewAI event bus so that @on_signal handlers work.
+
+    Signals are looked up by name and skipped when the running platform does
+    not define them (for example SIGHUP, SIGTSTP and SIGCONT on Windows), so
+    importing this module never raises AttributeError.
+    """
+    from crewai.events.signal_manager import system_signal_manager
+
+    default_signals = (
+        ("SIGTERM", SigTermEvent, True),
+        ("SIGINT", SigIntEvent, True),
+        ("SIGHUP", SigHupEvent, False),
+        ("SIGTSTP", SigTStpEvent, False),
+        ("SIGCONT", SigContEvent, False),
+    )
+    for signal_name, event_class, shutdown in default_signals:
+        sig = getattr(signal, signal_name, None)
+        if sig is None:
+            # This platform does not define the signal; nothing to bridge.
+            continue
+        system_signal_manager.register_signal(sig, event_class, shutdown=shutdown)
+
+
+_register_default_system_signal_handlers()
diff --git a/lib/crewai/src/crewai/telemetry/telemetry.py b/lib/crewai/src/crewai/telemetry/telemetry.py
index 848ba1133..62a1549a9 100644
--- a/lib/crewai/src/crewai/telemetry/telemetry.py
+++ b/lib/crewai/src/crewai/telemetry/telemetry.py
@@ -16,7 +16,6 @@ import json
 import logging
 import os
 import platform
-import signal
 import threading
 from typing import TYPE_CHECKING, Any
 
@@ -35,11 +34,9 @@ from typing_extensions import Self
 
 from crewai.events.event_bus import crewai_event_bus
 from crewai.events.types.system_events import (
-    SigContEvent,
-    SigHupEvent,
     SigIntEvent,
-    SigTStpEvent,
     SigTermEvent,
+    SignalEvent,
 )
 from crewai.telemetry.constants import (
     CREWAI_TELEMETRY_BASE_URL,
@@ -131,7 +128,7 @@ class Telemetry:
             )
 
             self.provider.add_span_processor(processor)
-            self._register_shutdown_handlers()
+            self._register_telemetry_shutdown_handlers()
             self.ready = True
         except Exception as e:
             if isinstance(
@@ -166,55 +163,24 @@ class Telemetry:
         self.ready = False
         self.trace_set = False
 
-    def _register_shutdown_handlers(self) -> None:
-        """Register handlers for graceful shutdown on process exit and signals."""
+    def _register_telemetry_shutdown_handlers(self) -> None:
+        """Register handlers for graceful telemetry shutdown on process exit and signals.
+
+        This method registers:
+        1. An atexit handler for normal process exit
+        2. Event bus handlers for SIGTERM and SIGINT to flush telemetry before shutdown
+
+        Note: The actual OS signal handlers are registered by SystemSignalManager
+        (via system_events module import), which emits events to the event bus.
+        Telemetry subscribes to these events to perform cleanup.
+        """
         atexit.register(self._shutdown)
 
-        self._original_handlers: dict[int, Any] = {}
+        def _on_shutdown_signal(source: object, event: SignalEvent) -> None:
+            self._shutdown()
 
-        self._register_signal_handler(signal.SIGTERM, SigTermEvent, shutdown=True)
-        self._register_signal_handler(signal.SIGINT, SigIntEvent, shutdown=True)
-        self._register_signal_handler(signal.SIGHUP, SigHupEvent, shutdown=False)
-        self._register_signal_handler(signal.SIGTSTP, SigTStpEvent, shutdown=False)
-        self._register_signal_handler(signal.SIGCONT, SigContEvent, shutdown=False)
-
-    def _register_signal_handler(
-        self,
-        sig: signal.Signals,
-        event_class: type,
-        shutdown: bool = False,
-    ) -> None:
-        """Register a signal handler that emits an event.
-
-        Args:
-            sig: The signal to handle.
-            event_class: The event class to instantiate and emit.
-            shutdown: Whether to trigger shutdown on this signal.
-        """
-        try:
-            original_handler = signal.getsignal(sig)
-            self._original_handlers[sig] = original_handler
-
-            def handler(signum: int, frame: Any) -> None:
-                crewai_event_bus.emit(self, event_class())
-
-                if shutdown:
-                    self._shutdown()
-
-                if original_handler not in (signal.SIG_DFL, signal.SIG_IGN, None):
-                    if callable(original_handler):
-                        original_handler(signum, frame)
-                elif shutdown:
-                    raise SystemExit(0)
-
-            signal.signal(sig, handler)
-        except ValueError as e:
-            logger.warning(
-                f"Cannot register {sig.name} handler: not running in main thread",
-                exc_info=e,
-            )
-        except OSError as e:
-            logger.warning(f"Cannot register {sig.name} handler: {e}", exc_info=e)
+        crewai_event_bus.on(SigTermEvent)(_on_shutdown_signal)
+        crewai_event_bus.on(SigIntEvent)(_on_shutdown_signal)
 
     def _shutdown(self) -> None:
         """Flush and shutdown the telemetry provider on process exit.
diff --git a/lib/crewai/tests/events/test_signal_manager.py b/lib/crewai/tests/events/test_signal_manager.py
new file mode 100644
index 000000000..b2b0777a9
--- /dev/null
+++ b/lib/crewai/tests/events/test_signal_manager.py
@@ -0,0 +1,314 @@
+"""Tests for SystemSignalManager and signal event decoupling from telemetry.
+
+These tests verify that:
+1. Signal events work when telemetry is disabled
+2. Signal handler chaining works correctly
+3. SystemSignalManager properly bridges OS signals to the event bus
+"""
+
+import os
+import signal
+import subprocess
+import sys
+import textwrap
+import time
+from unittest.mock import patch
+
+import pytest
+
+from crewai.events.event_bus import crewai_event_bus
+from crewai.events.signal_manager import SystemSignalManager, system_signal_manager
+from crewai.events.types.system_events import (
+    SigTermEvent,
+    on_signal,
+)
+
+
+class TestSystemSignalManager:
+    """Tests for SystemSignalManager class."""
+
+    def test_singleton_pattern(self) -> None:
+        """Test that SystemSignalManager is a singleton."""
+        manager1 = SystemSignalManager()
+        manager2 = SystemSignalManager()
+        assert manager1 is manager2
+
+    def test_global_instance_is_singleton(self) -> None:
+        """Test that the global system_signal_manager is the singleton instance."""
+        manager = SystemSignalManager()
+        assert manager is system_signal_manager
+
+    def test_register_signal_stores_original_handler(self) -> None:
+        """Test that register_signal stores the original handler."""
+        manager = SystemSignalManager()
+        original = signal.getsignal(signal.SIGUSR1)
+
+        try:
+            manager.register_signal(signal.SIGUSR1, SigTermEvent, shutdown=False)
+            assert signal.SIGUSR1 in manager._original_handlers
+        finally:
+            signal.signal(signal.SIGUSR1, original)
+
+    def test_register_signal_emits_event(self) -> None:
+        """Test that registered signal handler emits event to event bus."""
+        import threading
+
+        received_events: list[SigTermEvent] = []
+        condition = threading.Condition()
+
+        @crewai_event_bus.on(SigTermEvent)
+        def handler(source: object, event: SigTermEvent) -> None:
+            with condition:
+                received_events.append(event)
+                condition.notify_all()
+
+        manager = SystemSignalManager()
+        original = signal.getsignal(signal.SIGUSR1)
+
+        try:
+            manager.register_signal(signal.SIGUSR1, SigTermEvent, shutdown=False)
+            os.kill(os.getpid(), signal.SIGUSR1)
+
+            with condition:
+                condition.wait_for(lambda: len(received_events) >= 1, timeout=5.0)
+
+            assert len(received_events) >= 1
+            assert isinstance(received_events[0], SigTermEvent)
+        finally:
+            signal.signal(signal.SIGUSR1, original)
+
+    def test_register_signal_twice_keeps_foreign_original(self) -> None:
+        """Test that re-registering does not wrap the manager's own handler."""
+        manager = SystemSignalManager()
+        original = signal.getsignal(signal.SIGUSR1)
+
+        try:
+            manager.register_signal(signal.SIGUSR1, SigTermEvent, shutdown=False)
+            manager.register_signal(signal.SIGUSR1, SigTermEvent, shutdown=False)
+            assert manager._original_handlers[signal.SIGUSR1] is original
+        finally:
+            signal.signal(signal.SIGUSR1, original)
+
+    def test_ensure_handlers_installed_wraps_later_handler(self) -> None:
+        """Test that ensure_handlers_installed wraps a handler set afterwards."""
+        manager = SystemSignalManager()
+        original = signal.getsignal(signal.SIGUSR1)
+
+        def user_handler(signum: int, frame: object) -> None:
+            pass
+
+        try:
+            manager.register_signal(signal.SIGUSR1, SigTermEvent, shutdown=False)
+            signal.signal(signal.SIGUSR1, user_handler)
+
+            manager.ensure_handlers_installed()
+
+            assert signal.getsignal(signal.SIGUSR1) is not user_handler
+            assert manager._original_handlers[signal.SIGUSR1] is user_handler
+        finally:
+            signal.signal(signal.SIGUSR1, original)
+
+
+class TestSignalEventsWithTelemetryDisabled:
+    """Tests verifying signal events work when telemetry is disabled.
+
+    These tests use subprocess to avoid interfering with pytest's signal handling.
+    """
+
+    @pytest.mark.timeout(30)
+    def test_on_signal_handler_fires_with_telemetry_disabled(self) -> None:
+        """Test that @on_signal handlers fire even when telemetry is disabled.
+
+        This is the core fix for issue #4041: signal events should work
+        regardless of the CREWAI_DISABLE_TELEMETRY setting.
+        """
+        script = textwrap.dedent('''
+            import os
+            import sys
+            import time
+
+            os.environ["CREWAI_DISABLE_TELEMETRY"] = "true"
+
+            from crewai.events.types.system_events import SignalEvent, on_signal
+
+            @on_signal
+            def user_signal_handler(source: object, event: SignalEvent) -> None:
+                print(f"[USER_HANDLER] Received event type={event.type}", flush=True)
+
+            print(f"[READY] PID={os.getpid()}", flush=True)
+            sys.stdout.flush()
+
+            while True:
+                time.sleep(0.1)
+        ''')
+
+        env = os.environ.copy()
+        env["CREWAI_DISABLE_TELEMETRY"] = "true"
+
+        proc = subprocess.Popen(
+            [sys.executable, "-c", script],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            env=env,
+            text=True,
+        )
+
+        try:
+            ready_line = proc.stdout.readline()
+            assert "[READY]" in ready_line, f"Process not ready: {ready_line}"
+
+            time.sleep(0.5)
+            proc.send_signal(signal.SIGTERM)
+
+            stdout, stderr = proc.communicate(timeout=10)
+            full_output = ready_line + stdout
+
+            assert "[USER_HANDLER]" in full_output, (
+                f"User handler did not fire. Output: {full_output}, Stderr: {stderr}"
+            )
+        finally:
+            if proc.poll() is None:
+                proc.kill()
+                proc.wait()
+
+    @pytest.mark.timeout(30)
+    def test_sigint_handler_fires_with_telemetry_disabled(self) -> None:
+        """Test that SIGINT events work when telemetry is disabled."""
+        script = textwrap.dedent('''
+            import os
+            import sys
+            import time
+
+            os.environ["CREWAI_DISABLE_TELEMETRY"] = "true"
+
+            from crewai.events.types.system_events import SigIntEvent
+            from crewai.events.event_bus import crewai_event_bus
+
+            @crewai_event_bus.on(SigIntEvent)
+            def sigint_handler(source: object, event: SigIntEvent) -> None:
+                print(f"[SIGINT_HANDLER] Received SIGINT event", flush=True)
+
+            print(f"[READY] PID={os.getpid()}", flush=True)
+            sys.stdout.flush()
+
+            while True:
+                time.sleep(0.1)
+        ''')
+
+        env = os.environ.copy()
+        env["CREWAI_DISABLE_TELEMETRY"] = "true"
+
+        proc = subprocess.Popen(
+            [sys.executable, "-c", script],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            env=env,
+            text=True,
+        )
+
+        try:
+            ready_line = proc.stdout.readline()
+            assert "[READY]" in ready_line, f"Process not ready: {ready_line}"
+
+            time.sleep(0.5)
+            proc.send_signal(signal.SIGINT)
+
+            stdout, stderr = proc.communicate(timeout=10)
+            full_output = ready_line + stdout
+
+            assert "[SIGINT_HANDLER]" in full_output, (
+                f"SIGINT handler did not fire. Output: {full_output}, Stderr: {stderr}"
+            )
+        finally:
+            if proc.poll() is None:
+                proc.kill()
+                proc.wait()
+
+
+class TestSignalHandlerChaining:
+    """Tests verifying signal handler chaining works correctly.
+
+    These tests verify that when user code registers a signal handler before
+    CrewAI, the CrewAI handler properly chains to the original handler.
+    """
+
+    @pytest.mark.timeout(30)
+    def test_baseline_handler_called_after_crewai_handler(self) -> None:
+        """Test that baseline OS handler is called after CrewAI emits the event.
+
+        This tests the scenario where user code registers a signal handler
+        before CrewAI imports. The CrewAI handler should emit the event and
+        then call the original handler.
+        """
+        script = textwrap.dedent('''
+            import os
+            import signal
+            import sys
+            import time
+            from typing import Any
+
+            def baseline_handler(signum: int, frame: Any) -> None:
+                print("[BASELINE_HANDLER] Signal received", flush=True)
+
+            signal.signal(signal.SIGTERM, baseline_handler)
+
+            from crewai.events.types.system_events import SignalEvent, on_signal
+
+            @on_signal
+            def user_signal_handler(source: object, event: SignalEvent) -> None:
+                print(f"[USER_HANDLER] Received event type={event.type}", flush=True)
+
+            print(f"[READY] PID={os.getpid()}", flush=True)
+            sys.stdout.flush()
+
+            while True:
+                time.sleep(0.1)
+        ''')
+
+        proc = subprocess.Popen(
+            [sys.executable, "-c", script],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            text=True,
+        )
+
+        try:
+            ready_line = proc.stdout.readline()
+            assert "[READY]" in ready_line, f"Process not ready: {ready_line}"
+
+            time.sleep(0.5)
+            proc.send_signal(signal.SIGTERM)
+
+            time.sleep(1.0)
+
+            proc.send_signal(signal.SIGKILL)
+            stdout, stderr = proc.communicate(timeout=5)
+            full_output = ready_line + stdout
+
+            assert "[USER_HANDLER]" in full_output, (
+                f"User handler did not fire. Output: {full_output}, Stderr: {stderr}"
+            )
+            assert "[BASELINE_HANDLER]" in full_output, (
+                f"Baseline handler did not fire. Output: {full_output}, Stderr: {stderr}"
+            )
+        finally:
+            if proc.poll() is None:
+                proc.kill()
+                proc.wait()
+
+
+class TestTelemetrySignalIntegration:
+    """Tests for Telemetry's integration with the signal event system."""
+
+    def test_telemetry_registers_shutdown_handlers_on_event_bus(self) -> None:
+        """Test that Telemetry registers shutdown handlers on the event bus."""
+        with patch("crewai.telemetry.telemetry.TracerProvider"):
+            with patch("crewai.telemetry.telemetry.BatchSpanProcessor"):
+                with patch("crewai.telemetry.telemetry.SafeOTLPSpanExporter"):
+                    with patch.dict(os.environ, {"CREWAI_DISABLE_TELEMETRY": "false"}):
+                        from crewai.telemetry.telemetry import Telemetry
+
+                        Telemetry._instance = None
+                        telemetry = Telemetry()
+
+                        assert telemetry.ready is True