diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index f5af4a426..643d146cb 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -2,7 +2,7 @@ from __future__ import annotations import asyncio from collections.abc import Callable -from concurrent.futures import Future +from concurrent.futures import Future, TimeoutError as FutureTimeoutError from copy import copy as shallow_copy from hashlib import md5 import json @@ -39,6 +39,7 @@ from crewai.events.listeners.tracing.trace_listener import ( TraceCollectionListener, ) from crewai.events.listeners.tracing.utils import ( + is_tracing_disabled, is_tracing_enabled, should_auto_collect_first_time_traces, ) @@ -315,7 +316,7 @@ class Crew(FlowTrackable, BaseModel): self._cache_handler = CacheHandler() event_listener = EventListener() # type: ignore[no-untyped-call] - if ( + if not is_tracing_disabled() and ( is_tracing_enabled() or self.tracing or should_auto_collect_first_time_traces() @@ -604,6 +605,34 @@ class Crew(FlowTrackable, BaseModel): CrewTrainingHandler(TRAINING_DATA_FILE).initialize_file() CrewTrainingHandler(filename).initialize_file() + def _wait_for_event_handlers( + self, future: Future[None] | None, timeout: float = 30.0 + ) -> None: + """Wait for event handlers to complete with timeout. + + Args: + future: Future returned from event bus emit, or None + timeout: Maximum time to wait in seconds (default: 30.0) + """ + if future is None: + return + + try: + future.result(timeout=timeout) + except FutureTimeoutError: + self._logger.log( + "warning", + f"Event handlers did not complete within {timeout}s timeout. " + "This may indicate slow or blocked handlers.", + color="yellow", + ) + except Exception as e: + self._logger.log( + "warning", + f"Error waiting for event handlers: {e}", + color="yellow", + ) + def train( self, n_iterations: int, filename: str, inputs: dict[str, Any] | None = None ) -> None: @@ -671,10 +700,11 @@ class Crew(FlowTrackable, BaseModel): inputs = {} inputs = before_callback(inputs) - crewai_event_bus.emit( + future = crewai_event_bus.emit( self, CrewKickoffStartedEvent(crew_name=self.name, inputs=inputs), ) + self._wait_for_event_handlers(future) # Starts the crew to work on its assigned tasks. self._task_output_handler.reset() @@ -717,10 +747,11 @@ class Crew(FlowTrackable, BaseModel): return result except Exception as e: - crewai_event_bus.emit( + future = crewai_event_bus.emit( self, CrewKickoffFailedEvent(error=str(e), crew_name=self.name), ) + self._wait_for_event_handlers(future) raise finally: detach(token) @@ -1162,7 +1193,7 @@ class Crew(FlowTrackable, BaseModel): final_string_output = final_task_output.raw self._finish_execution(final_string_output) self.token_usage = self.calculate_usage_metrics() - crewai_event_bus.emit( + future = crewai_event_bus.emit( self, CrewKickoffCompletedEvent( crew_name=self.name, @@ -1170,6 +1201,7 @@ class Crew(FlowTrackable, BaseModel): total_tokens=self.token_usage.total_tokens, ), ) + self._wait_for_event_handlers(future) return CrewOutput( raw=final_task_output.raw, pydantic=final_task_output.pydantic, diff --git a/lib/crewai/src/crewai/events/listeners/tracing/utils.py b/lib/crewai/src/crewai/events/listeners/tracing/utils.py index 9c5a30a05..26c0e7f20 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/utils.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/utils.py @@ -27,6 +27,21 @@ def is_tracing_enabled() -> bool: return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true" +def is_tracing_disabled() -> bool: + """Check if tracing is explicitly disabled via environment variables. + + Returns True if any of the disable flags are set to true. + """ + disable_flags = [ + "CREWAI_DISABLE_TRACING", + "CREWAI_DISABLE_TRACKING", + "OTEL_SDK_DISABLED", + ] + return any( + os.getenv(flag, "false").lower() == "true" for flag in disable_flags + ) + + def on_first_execution_tracing_confirmation() -> bool: if _is_test_environment(): return False diff --git a/lib/crewai/tests/test_kickoff_for_each_hang.py b/lib/crewai/tests/test_kickoff_for_each_hang.py new file mode 100644 index 000000000..4bf02bdee --- /dev/null +++ b/lib/crewai/tests/test_kickoff_for_each_hang.py @@ -0,0 +1,185 @@ +"""Test for issue #3871: kickoff_for_each() hang fix.""" +import os +import threading +import time +from unittest.mock import Mock, patch + +import pytest + +from crewai import Agent, Crew, Task +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.crew_events import CrewKickoffCompletedEvent + + +@pytest.fixture +def simple_crew(): + """Create a simple crew for testing.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + verbose=False, + ) + + task = Task( + description="Test task", + expected_output="Test output", + agent=agent, + ) + + crew = Crew( + agents=[agent], + tasks=[task], + verbose=False, + ) + + return crew + + +def test_kickoff_for_each_waits_for_event_handlers(simple_crew): + """Test that kickoff_for_each waits for event handlers to complete. + + This test verifies the fix for issue #3871 by registering a slow + sync handler and ensuring kickoff_for_each waits for it to complete. + """ + handler_completed = threading.Event() + handler_call_count = 0 + + def slow_handler(source, event): + nonlocal handler_call_count + handler_call_count += 1 + time.sleep(0.1) # Simulate slow handler + handler_completed.set() + + with crewai_event_bus.scoped_handlers(): + crewai_event_bus.register_handler( + CrewKickoffCompletedEvent, + slow_handler, + ) + + # Mock the task execution to avoid actual LLM calls + with patch.object(simple_crew, '_run_sequential_process') as mock_run: + mock_output = Mock() + mock_output.raw = "Test output" + mock_output.pydantic = None + mock_output.json_dict = None + mock_run.return_value = Mock( + raw="Test output", + pydantic=None, + json_dict=None, + tasks_output=[mock_output], + token_usage=Mock(total_tokens=0), + ) + + start_time = time.time() + results = simple_crew.kickoff_for_each( + inputs=[{"test": "input1"}, {"test": "input2"}] + ) + elapsed_time = time.time() - start_time + + # Verify results were returned + assert len(results) == 2 + + # Verify handler was called for each kickoff + assert handler_call_count == 2 + + # Verify the execution waited for handlers (should take at least 0.2s for 2 handlers) + assert elapsed_time >= 0.2, ( + f"kickoff_for_each returned too quickly ({elapsed_time:.3f}s), " + "suggesting it didn't wait for event handlers" + ) + + # Verify handler completed + assert handler_completed.is_set() + + +def test_kickoff_waits_for_event_handlers_on_error(simple_crew): + """Test that kickoff waits for event handlers even when an error occurs.""" + handler_completed = threading.Event() + + def error_handler(source, event): + time.sleep(0.1) # Simulate slow handler + handler_completed.set() + + with crewai_event_bus.scoped_handlers(): + from crewai.events.types.crew_events import CrewKickoffFailedEvent + crewai_event_bus.register_handler( + CrewKickoffFailedEvent, + error_handler, + ) + + # Mock the task execution to raise an error + with patch.object(simple_crew, '_run_sequential_process') as mock_run: + mock_run.side_effect = RuntimeError("Test error") + + start_time = time.time() + with pytest.raises(RuntimeError, match="Test error"): + simple_crew.kickoff() + elapsed_time = time.time() - start_time + + # Verify the execution waited for handlers (should take at least 0.1s) + assert elapsed_time >= 0.1, ( + f"kickoff returned too quickly ({elapsed_time:.3f}s), " + "suggesting it didn't wait for error event handlers" + ) + + # Verify handler completed + assert handler_completed.is_set() + + +def test_tracing_disabled_flag_respected(): + """Test that CREWAI_DISABLE_TRACING flag prevents tracing setup.""" + from crewai.events.listeners.tracing.utils import is_tracing_disabled + + # Test with CREWAI_DISABLE_TRACING=true + with patch.dict(os.environ, {"CREWAI_DISABLE_TRACING": "true"}): + assert is_tracing_disabled() is True + + # Test with OTEL_SDK_DISABLED=true + with patch.dict(os.environ, {"OTEL_SDK_DISABLED": "true"}): + assert is_tracing_disabled() is True + + # Test with CREWAI_DISABLE_TRACKING=true + with patch.dict(os.environ, {"CREWAI_DISABLE_TRACKING": "true"}): + assert is_tracing_disabled() is True + + # Test with no disable flags + with patch.dict(os.environ, {}, clear=True): + assert is_tracing_disabled() is False + + +def test_tracing_not_enabled_when_disabled_flag_set(): + """Test that tracing is not enabled when disable flag is set.""" + from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener + + # Mock TraceCollectionListener.setup_listeners to track if it's called + with patch.object(TraceCollectionListener, 'setup_listeners') as mock_setup: + with patch.dict(os.environ, { + "CREWAI_DISABLE_TRACING": "true", + "CREWAI_TESTING": "true", # Prevent first-time auto-collection + }): + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + verbose=False, + ) + + task = Task( + description="Test task", + expected_output="Test output", + agent=agent, + ) + + crew = Crew( + agents=[agent], + tasks=[task], + verbose=False, + ) + + # Verify setup_listeners was not called + mock_setup.assert_not_called() + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])