mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 15:48:29 +00:00
Fix issue #3871: kickoff_for_each() hangs after completion
This commit fixes the threading issue where kickoff_for_each() would hang indefinitely after successful completion. The root cause was that event handlers scheduled in the ThreadPoolExecutor were not being awaited, leaving non-daemon worker threads active and preventing process exit.

Changes:
1. Added a _wait_for_event_handlers() helper method to safely wait on event bus futures with a timeout (30s default) and proper error handling.
2. Modified kickoff() to wait on all three lifecycle event emissions:
   - CrewKickoffStartedEvent
   - CrewKickoffCompletedEvent
   - CrewKickoffFailedEvent
3. Added an is_tracing_disabled() helper to respect the CREWAI_DISABLE_TRACING, CREWAI_DISABLE_TRACKING, and OTEL_SDK_DISABLED environment variables.
4. Updated the tracing enable logic to check the disable flags first, preventing tracing setup when it is explicitly disabled.
5. Added comprehensive tests covering:
   - kickoff_for_each() waiting for event handlers
   - kickoff() waiting for event handlers on error
   - Tracing disable flags being respected

This ensures that each kickoff() call fully processes its event handlers before returning, preventing the accumulation of pending executor tasks that would block process exit in kickoff_for_each().

Fixes #3871

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -2,7 +2,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Callable
|
||||
from concurrent.futures import Future
|
||||
from concurrent.futures import Future, TimeoutError as FutureTimeoutError
|
||||
from copy import copy as shallow_copy
|
||||
from hashlib import md5
|
||||
import json
|
||||
@@ -39,6 +39,7 @@ from crewai.events.listeners.tracing.trace_listener import (
|
||||
TraceCollectionListener,
|
||||
)
|
||||
from crewai.events.listeners.tracing.utils import (
|
||||
is_tracing_disabled,
|
||||
is_tracing_enabled,
|
||||
should_auto_collect_first_time_traces,
|
||||
)
|
||||
@@ -315,7 +316,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
self._cache_handler = CacheHandler()
|
||||
event_listener = EventListener() # type: ignore[no-untyped-call]
|
||||
|
||||
if (
|
||||
if not is_tracing_disabled() and (
|
||||
is_tracing_enabled()
|
||||
or self.tracing
|
||||
or should_auto_collect_first_time_traces()
|
||||
@@ -604,6 +605,34 @@ class Crew(FlowTrackable, BaseModel):
|
||||
CrewTrainingHandler(TRAINING_DATA_FILE).initialize_file()
|
||||
CrewTrainingHandler(filename).initialize_file()
|
||||
|
||||
def _wait_for_event_handlers(
|
||||
self, future: Future[None] | None, timeout: float = 30.0
|
||||
) -> None:
|
||||
"""Wait for event handlers to complete with timeout.
|
||||
|
||||
Args:
|
||||
future: Future returned from event bus emit, or None
|
||||
timeout: Maximum time to wait in seconds (default: 30.0)
|
||||
"""
|
||||
if future is None:
|
||||
return
|
||||
|
||||
try:
|
||||
future.result(timeout=timeout)
|
||||
except FutureTimeoutError:
|
||||
self._logger.log(
|
||||
"warning",
|
||||
f"Event handlers did not complete within {timeout}s timeout. "
|
||||
"This may indicate slow or blocked handlers.",
|
||||
color="yellow",
|
||||
)
|
||||
except Exception as e:
|
||||
self._logger.log(
|
||||
"warning",
|
||||
f"Error waiting for event handlers: {e}",
|
||||
color="yellow",
|
||||
)
|
||||
|
||||
def train(
|
||||
self, n_iterations: int, filename: str, inputs: dict[str, Any] | None = None
|
||||
) -> None:
|
||||
@@ -671,10 +700,11 @@ class Crew(FlowTrackable, BaseModel):
|
||||
inputs = {}
|
||||
inputs = before_callback(inputs)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
future = crewai_event_bus.emit(
|
||||
self,
|
||||
CrewKickoffStartedEvent(crew_name=self.name, inputs=inputs),
|
||||
)
|
||||
self._wait_for_event_handlers(future)
|
||||
|
||||
# Starts the crew to work on its assigned tasks.
|
||||
self._task_output_handler.reset()
|
||||
@@ -717,10 +747,11 @@ class Crew(FlowTrackable, BaseModel):
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
crewai_event_bus.emit(
|
||||
future = crewai_event_bus.emit(
|
||||
self,
|
||||
CrewKickoffFailedEvent(error=str(e), crew_name=self.name),
|
||||
)
|
||||
self._wait_for_event_handlers(future)
|
||||
raise
|
||||
finally:
|
||||
detach(token)
|
||||
@@ -1162,7 +1193,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
final_string_output = final_task_output.raw
|
||||
self._finish_execution(final_string_output)
|
||||
self.token_usage = self.calculate_usage_metrics()
|
||||
crewai_event_bus.emit(
|
||||
future = crewai_event_bus.emit(
|
||||
self,
|
||||
CrewKickoffCompletedEvent(
|
||||
crew_name=self.name,
|
||||
@@ -1170,6 +1201,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
total_tokens=self.token_usage.total_tokens,
|
||||
),
|
||||
)
|
||||
self._wait_for_event_handlers(future)
|
||||
return CrewOutput(
|
||||
raw=final_task_output.raw,
|
||||
pydantic=final_task_output.pydantic,
|
||||
|
||||
@@ -27,6 +27,21 @@ def is_tracing_enabled() -> bool:
|
||||
return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true"
|
||||
|
||||
|
||||
def is_tracing_disabled() -> bool:
    """Check if tracing is explicitly disabled via environment variables.

    Returns:
        True if any of CREWAI_DISABLE_TRACING, CREWAI_DISABLE_TRACKING or
        OTEL_SDK_DISABLED is set to "true"; False otherwise. The comparison
        is case-insensitive and ignores surrounding whitespace.
    """
    disable_flags = (
        "CREWAI_DISABLE_TRACING",
        "CREWAI_DISABLE_TRACKING",
        "OTEL_SDK_DISABLED",
    )
    # strip() so values such as "true " (trailing space from a shell `set` or
    # a .env file) are not silently treated as "not disabled".
    return any(
        os.getenv(flag, "").strip().lower() == "true" for flag in disable_flags
    )
|
||||
|
||||
|
||||
def on_first_execution_tracing_confirmation() -> bool:
|
||||
if _is_test_environment():
|
||||
return False
|
||||
|
||||
185
lib/crewai/tests/test_kickoff_for_each_hang.py
Normal file
185
lib/crewai/tests/test_kickoff_for_each_hang.py
Normal file
@@ -0,0 +1,185 @@
|
||||
"""Test for issue #3871: kickoff_for_each() hang fix."""
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.crew_events import CrewKickoffCompletedEvent
|
||||
|
||||
|
||||
@pytest.fixture
def simple_crew():
    """Provide a minimal, non-verbose crew: one agent assigned to one task."""
    test_agent = Agent(
        role="Test Agent",
        goal="Test goal",
        backstory="Test backstory",
        verbose=False,
    )
    test_task = Task(
        description="Test task",
        expected_output="Test output",
        agent=test_agent,
    )
    return Crew(agents=[test_agent], tasks=[test_task], verbose=False)
|
||||
|
||||
|
||||
def test_kickoff_for_each_waits_for_event_handlers(simple_crew):
    """Test that kickoff_for_each waits for event handlers to complete.

    This test verifies the fix for issue #3871 by registering a slow
    sync handler and ensuring kickoff_for_each waits for it to complete.
    """
    counter_lock = threading.Lock()
    handler_call_count = 0
    handler_done_count = 0

    def slow_handler(source, event):
        nonlocal handler_call_count, handler_done_count
        with counter_lock:
            handler_call_count += 1
        time.sleep(0.1)  # Simulate slow handler
        with counter_lock:
            handler_done_count += 1

    with crewai_event_bus.scoped_handlers():
        crewai_event_bus.register_handler(
            CrewKickoffCompletedEvent,
            slow_handler,
        )

        # Mock the task execution to avoid actual LLM calls.
        # NOTE(review): the code that emits CrewKickoffCompletedEvent is not
        # visible from this test; confirm the event is still emitted when
        # _run_sequential_process is patched out, otherwise the handler
        # assertions below cannot pass.
        with patch.object(simple_crew, '_run_sequential_process') as mock_run:
            mock_output = Mock()
            mock_output.raw = "Test output"
            mock_output.pydantic = None
            mock_output.json_dict = None
            mock_run.return_value = Mock(
                raw="Test output",
                pydantic=None,
                json_dict=None,
                tasks_output=[mock_output],
                token_usage=Mock(total_tokens=0),
            )

            # Monotonic clock: wall-clock time.time() can jump (NTP, DST)
            # and would make the elapsed-time assertion flaky.
            start_time = time.monotonic()
            results = simple_crew.kickoff_for_each(
                inputs=[{"test": "input1"}, {"test": "input2"}]
            )
            elapsed_time = time.monotonic() - start_time

            # Verify results were returned
            assert len(results) == 2

            # Verify handler was called for each kickoff
            assert handler_call_count == 2

            # Verify the execution waited for handlers (should take at least 0.2s for 2 handlers)
            assert elapsed_time >= 0.2, (
                f"kickoff_for_each returned too quickly ({elapsed_time:.3f}s), "
                "suggesting it didn't wait for event handlers"
            )

            # Verify every handler invocation ran to completion, not just one
            assert handler_done_count == 2
|
||||
|
||||
|
||||
def test_kickoff_waits_for_event_handlers_on_error(simple_crew):
    """Test that kickoff waits for event handlers even when an error occurs."""
    from crewai.events.types.crew_events import CrewKickoffFailedEvent

    handler_completed = threading.Event()

    def error_handler(source, event):
        time.sleep(0.1)  # Simulate slow handler
        handler_completed.set()

    with crewai_event_bus.scoped_handlers():
        crewai_event_bus.register_handler(
            CrewKickoffFailedEvent,
            error_handler,
        )

        # Mock the task execution to raise an error
        with patch.object(simple_crew, '_run_sequential_process') as mock_run:
            mock_run.side_effect = RuntimeError("Test error")

            # Monotonic clock: wall-clock time.time() can jump (NTP, DST)
            # and would make the elapsed-time assertion flaky.
            start_time = time.monotonic()
            with pytest.raises(RuntimeError, match="Test error"):
                simple_crew.kickoff()
            elapsed_time = time.monotonic() - start_time

            # Verify the execution waited for handlers (should take at least 0.1s)
            assert elapsed_time >= 0.1, (
                f"kickoff returned too quickly ({elapsed_time:.3f}s), "
                "suggesting it didn't wait for error event handlers"
            )

            # Verify handler completed
            assert handler_completed.is_set()
|
||||
|
||||
|
||||
def test_tracing_disabled_flag_respected():
    """Test that each tracing disable flag is honoured by is_tracing_disabled.

    Covers CREWAI_DISABLE_TRACING, OTEL_SDK_DISABLED and
    CREWAI_DISABLE_TRACKING individually, plus the case where none is set.
    """
    from crewai.events.listeners.tracing.utils import is_tracing_disabled

    disable_flags = (
        "CREWAI_DISABLE_TRACING",
        "OTEL_SDK_DISABLED",
        "CREWAI_DISABLE_TRACKING",
    )
    for flag in disable_flags:
        # clear=True so only the flag under test is present; otherwise a flag
        # already set in the ambient environment could mask a broken one.
        with patch.dict(os.environ, {flag: "true"}, clear=True):
            assert is_tracing_disabled() is True, f"{flag}=true was not respected"

    # Test with no disable flags
    with patch.dict(os.environ, {}, clear=True):
        assert is_tracing_disabled() is False
|
||||
|
||||
|
||||
def test_tracing_not_enabled_when_disabled_flag_set():
    """Test that tracing is not enabled when disable flag is set.

    Builds a Crew while CREWAI_DISABLE_TRACING is "true" and checks that
    TraceCollectionListener.setup_listeners is never called.
    """
    from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener

    disabled_env = {
        "CREWAI_DISABLE_TRACING": "true",
        "CREWAI_TESTING": "true",  # Prevent first-time auto-collection
    }

    # Mock TraceCollectionListener.setup_listeners to track if it's called
    with patch.object(
        TraceCollectionListener, 'setup_listeners'
    ) as mock_setup, patch.dict(os.environ, disabled_env):
        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            verbose=False,
        )

        task = Task(
            description="Test task",
            expected_output="Test output",
            agent=agent,
        )

        # Constructed only for its side effects; the instance is not needed.
        Crew(
            agents=[agent],
            tasks=[task],
            verbose=False,
        )

        # Verify setup_listeners was not called
        mock_setup.assert_not_called()
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Propagate pytest's return code as the process exit status so that
    # running this file directly reports failures to the shell / CI.
    raise SystemExit(pytest.main([__file__, "-v"]))
|
||||
Reference in New Issue
Block a user