mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-07 15:18:29 +00:00
Compare commits
2 Commits
gl/chore/u
...
devin/1762
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ce8462f2f3 | ||
|
|
f1bc535ff8 |
@@ -2,7 +2,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Callable
|
||||
from concurrent.futures import Future
|
||||
from concurrent.futures import Future, TimeoutError as FutureTimeoutError
|
||||
from copy import copy as shallow_copy
|
||||
from hashlib import md5
|
||||
import json
|
||||
@@ -39,6 +39,7 @@ from crewai.events.listeners.tracing.trace_listener import (
|
||||
TraceCollectionListener,
|
||||
)
|
||||
from crewai.events.listeners.tracing.utils import (
|
||||
is_tracing_disabled,
|
||||
is_tracing_enabled,
|
||||
should_auto_collect_first_time_traces,
|
||||
)
|
||||
@@ -315,7 +316,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
self._cache_handler = CacheHandler()
|
||||
event_listener = EventListener() # type: ignore[no-untyped-call]
|
||||
|
||||
if (
|
||||
if not is_tracing_disabled() and (
|
||||
is_tracing_enabled()
|
||||
or self.tracing
|
||||
or should_auto_collect_first_time_traces()
|
||||
@@ -604,6 +605,34 @@ class Crew(FlowTrackable, BaseModel):
|
||||
CrewTrainingHandler(TRAINING_DATA_FILE).initialize_file()
|
||||
CrewTrainingHandler(filename).initialize_file()
|
||||
|
||||
def _wait_for_event_handlers(
|
||||
self, future: Future[None] | None, timeout: float = 30.0
|
||||
) -> None:
|
||||
"""Wait for event handlers to complete with timeout.
|
||||
|
||||
Args:
|
||||
future: Future returned from event bus emit, or None
|
||||
timeout: Maximum time to wait in seconds (default: 30.0)
|
||||
"""
|
||||
if future is None:
|
||||
return
|
||||
|
||||
try:
|
||||
future.result(timeout=timeout)
|
||||
except FutureTimeoutError:
|
||||
self._logger.log(
|
||||
"warning",
|
||||
f"Event handlers did not complete within {timeout}s timeout. "
|
||||
"This may indicate slow or blocked handlers.",
|
||||
color="yellow",
|
||||
)
|
||||
except Exception as e:
|
||||
self._logger.log(
|
||||
"warning",
|
||||
f"Error waiting for event handlers: {e}",
|
||||
color="yellow",
|
||||
)
|
||||
|
||||
def train(
|
||||
self, n_iterations: int, filename: str, inputs: dict[str, Any] | None = None
|
||||
) -> None:
|
||||
@@ -671,10 +700,11 @@ class Crew(FlowTrackable, BaseModel):
|
||||
inputs = {}
|
||||
inputs = before_callback(inputs)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
future = crewai_event_bus.emit(
|
||||
self,
|
||||
CrewKickoffStartedEvent(crew_name=self.name, inputs=inputs),
|
||||
)
|
||||
self._wait_for_event_handlers(future)
|
||||
|
||||
# Starts the crew to work on its assigned tasks.
|
||||
self._task_output_handler.reset()
|
||||
@@ -717,10 +747,11 @@ class Crew(FlowTrackable, BaseModel):
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
crewai_event_bus.emit(
|
||||
future = crewai_event_bus.emit(
|
||||
self,
|
||||
CrewKickoffFailedEvent(error=str(e), crew_name=self.name),
|
||||
)
|
||||
self._wait_for_event_handlers(future)
|
||||
raise
|
||||
finally:
|
||||
detach(token)
|
||||
@@ -1162,7 +1193,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
final_string_output = final_task_output.raw
|
||||
self._finish_execution(final_string_output)
|
||||
self.token_usage = self.calculate_usage_metrics()
|
||||
crewai_event_bus.emit(
|
||||
future = crewai_event_bus.emit(
|
||||
self,
|
||||
CrewKickoffCompletedEvent(
|
||||
crew_name=self.name,
|
||||
@@ -1170,6 +1201,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
total_tokens=self.token_usage.total_tokens,
|
||||
),
|
||||
)
|
||||
self._wait_for_event_handlers(future)
|
||||
return CrewOutput(
|
||||
raw=final_task_output.raw,
|
||||
pydantic=final_task_output.pydantic,
|
||||
|
||||
@@ -27,6 +27,21 @@ def is_tracing_enabled() -> bool:
|
||||
return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true"
|
||||
|
||||
|
||||
def is_tracing_disabled() -> bool:
|
||||
"""Check if tracing is explicitly disabled via environment variables.
|
||||
|
||||
Returns True if any of the disable flags are set to true.
|
||||
"""
|
||||
disable_flags = [
|
||||
"CREWAI_DISABLE_TRACING",
|
||||
"CREWAI_DISABLE_TRACKING",
|
||||
"OTEL_SDK_DISABLED",
|
||||
]
|
||||
return any(
|
||||
os.getenv(flag, "false").lower() == "true" for flag in disable_flags
|
||||
)
|
||||
|
||||
|
||||
def on_first_execution_tracing_confirmation() -> bool:
|
||||
if _is_test_environment():
|
||||
return False
|
||||
|
||||
176
lib/crewai/tests/test_kickoff_for_each_hang.py
Normal file
176
lib/crewai/tests/test_kickoff_for_each_hang.py
Normal file
@@ -0,0 +1,176 @@
|
||||
"""Test for issue #3871: kickoff_for_each() hang fix."""
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.crew_events import CrewKickoffCompletedEvent
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def simple_crew():
|
||||
"""Create a simple crew for testing."""
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
verbose=False,
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False,
|
||||
)
|
||||
|
||||
return crew
|
||||
|
||||
|
||||
def test_kickoff_for_each_waits_for_event_handlers(simple_crew):
|
||||
"""Test that kickoff_for_each waits for event handlers to complete.
|
||||
|
||||
This test verifies the fix for issue #3871 by registering a slow
|
||||
sync handler and ensuring kickoff_for_each waits for it to complete.
|
||||
"""
|
||||
handler_completed = threading.Event()
|
||||
handler_call_count = 0
|
||||
|
||||
def slow_handler(source, event):
|
||||
nonlocal handler_call_count
|
||||
handler_call_count += 1
|
||||
time.sleep(0.1) # Simulate slow handler
|
||||
handler_completed.set()
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
crewai_event_bus.register_handler(
|
||||
CrewKickoffCompletedEvent,
|
||||
slow_handler,
|
||||
)
|
||||
|
||||
# Mock the task execution to avoid actual LLM calls
|
||||
from types import SimpleNamespace
|
||||
|
||||
def mock_execute_tasks(self, tasks):
|
||||
return [SimpleNamespace(raw="Test output", pydantic=None, json_dict=None)]
|
||||
|
||||
with patch.object(Crew, '_execute_tasks', mock_execute_tasks):
|
||||
start_time = time.time()
|
||||
results = simple_crew.kickoff_for_each(
|
||||
inputs=[{"test": "input1"}, {"test": "input2"}]
|
||||
)
|
||||
elapsed_time = time.time() - start_time
|
||||
|
||||
# Verify results were returned
|
||||
assert len(results) == 2
|
||||
|
||||
# Verify handler was called for each kickoff
|
||||
assert handler_call_count == 2
|
||||
|
||||
# Verify the execution waited for handlers (should take at least 0.18s for 2 handlers)
|
||||
assert elapsed_time >= 0.18, (
|
||||
f"kickoff_for_each returned too quickly ({elapsed_time:.3f}s), "
|
||||
"suggesting it didn't wait for event handlers"
|
||||
)
|
||||
|
||||
# Verify handler completed
|
||||
assert handler_completed.is_set()
|
||||
|
||||
|
||||
def test_kickoff_waits_for_event_handlers_on_error(simple_crew):
|
||||
"""Test that kickoff waits for event handlers even when an error occurs."""
|
||||
handler_completed = threading.Event()
|
||||
|
||||
def error_handler(source, event):
|
||||
time.sleep(0.1) # Simulate slow handler
|
||||
handler_completed.set()
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
from crewai.events.types.crew_events import CrewKickoffFailedEvent
|
||||
crewai_event_bus.register_handler(
|
||||
CrewKickoffFailedEvent,
|
||||
error_handler,
|
||||
)
|
||||
|
||||
# Mock the task execution to raise an error
|
||||
with patch.object(Crew, '_run_sequential_process', side_effect=RuntimeError("Test error")):
|
||||
start_time = time.time()
|
||||
with pytest.raises(RuntimeError, match="Test error"):
|
||||
simple_crew.kickoff()
|
||||
elapsed_time = time.time() - start_time
|
||||
|
||||
# Verify the execution waited for handlers (should take at least 0.09s)
|
||||
assert elapsed_time >= 0.09, (
|
||||
f"kickoff returned too quickly ({elapsed_time:.3f}s), "
|
||||
"suggesting it didn't wait for error event handlers"
|
||||
)
|
||||
|
||||
# Verify handler completed
|
||||
assert handler_completed.is_set()
|
||||
|
||||
|
||||
def test_tracing_disabled_flag_respected():
|
||||
"""Test that CREWAI_DISABLE_TRACING flag prevents tracing setup."""
|
||||
from crewai.events.listeners.tracing.utils import is_tracing_disabled
|
||||
|
||||
# Test with CREWAI_DISABLE_TRACING=true
|
||||
with patch.dict(os.environ, {"CREWAI_DISABLE_TRACING": "true"}):
|
||||
assert is_tracing_disabled() is True
|
||||
|
||||
# Test with OTEL_SDK_DISABLED=true
|
||||
with patch.dict(os.environ, {"OTEL_SDK_DISABLED": "true"}):
|
||||
assert is_tracing_disabled() is True
|
||||
|
||||
# Test with CREWAI_DISABLE_TRACKING=true
|
||||
with patch.dict(os.environ, {"CREWAI_DISABLE_TRACKING": "true"}):
|
||||
assert is_tracing_disabled() is True
|
||||
|
||||
# Test with no disable flags
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
assert is_tracing_disabled() is False
|
||||
|
||||
|
||||
def test_tracing_not_enabled_when_disabled_flag_set():
|
||||
"""Test that tracing is not enabled when disable flag is set."""
|
||||
from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener
|
||||
|
||||
# Mock TraceCollectionListener.setup_listeners to track if it's called
|
||||
with patch.object(TraceCollectionListener, 'setup_listeners') as mock_setup:
|
||||
with patch.dict(os.environ, {
|
||||
"CREWAI_DISABLE_TRACING": "true",
|
||||
"CREWAI_TESTING": "true", # Prevent first-time auto-collection
|
||||
}):
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
verbose=False,
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False,
|
||||
)
|
||||
|
||||
# Verify setup_listeners was not called
|
||||
mock_setup.assert_not_called()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
Reference in New Issue
Block a user