Compare commits

...

3 Commits

Author SHA1 Message Date
Devin AI
d2c4040dd6 fix: properly complete Future when async task execution fails
This fixes GitHub issue #4072 where an async task that errors would keep
its thread alive because the Future was never completed.

The issue was in the _execute_task_async method which didn't handle
exceptions from _execute_core. When an exception was raised, the
future.set_result() was never called, leaving the Future in an incomplete
state. This caused future.result() to block forever.

The fix wraps the _execute_core call in a try-except block and calls
future.set_exception(e) when an exception occurs, ensuring the Future
is always properly completed.

Added tests:
- test_execute_async_basic: Basic threaded async execution
- test_execute_async_exception_completes_future: Regression test for #4072
- test_execute_async_exception_sets_end_time: Verify end_time is set on error
- test_execute_async_exception_does_not_hang: Verify no hang on error

Co-Authored-By: João <joao@crewai.com>
2025-12-11 14:19:23 +00:00
Greyson LaLonde
8ef9fe2cab fix: check platform compat for windows signals 2025-12-11 08:38:19 -05:00
Alex Larionov
807f97114f fix: set rpm controller timer as daemon to prevent process hang
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2025-12-11 02:59:55 -05:00
6 changed files with 109 additions and 11 deletions

View File

@@ -19,9 +19,9 @@ class SignalType(IntEnum):
SIGTERM = signal.SIGTERM
SIGINT = signal.SIGINT
SIGHUP = signal.SIGHUP
SIGTSTP = signal.SIGTSTP
SIGCONT = signal.SIGCONT
SIGHUP = getattr(signal, "SIGHUP", 1)
SIGTSTP = getattr(signal, "SIGTSTP", 20)
SIGCONT = getattr(signal, "SIGCONT", 18)
class SigTermEvent(BaseEvent):

View File

@@ -494,7 +494,11 @@ class Task(BaseModel):
future: Future[TaskOutput],
) -> None:
"""Execute the task asynchronously with context handling."""
result = self._execute_core(agent, context, tools)
try:
result = self._execute_core(agent, context, tools)
except Exception as e:
future.set_exception(e)
return
future.set_result(result)
async def aexecute_sync(

View File

@@ -174,9 +174,12 @@ class Telemetry:
self._register_signal_handler(signal.SIGTERM, SigTermEvent, shutdown=True)
self._register_signal_handler(signal.SIGINT, SigIntEvent, shutdown=True)
self._register_signal_handler(signal.SIGHUP, SigHupEvent, shutdown=False)
self._register_signal_handler(signal.SIGTSTP, SigTStpEvent, shutdown=False)
self._register_signal_handler(signal.SIGCONT, SigContEvent, shutdown=False)
if hasattr(signal, "SIGHUP"):
self._register_signal_handler(signal.SIGHUP, SigHupEvent, shutdown=False)
if hasattr(signal, "SIGTSTP"):
self._register_signal_handler(signal.SIGTSTP, SigTStpEvent, shutdown=False)
if hasattr(signal, "SIGCONT"):
self._register_signal_handler(signal.SIGCONT, SigContEvent, shutdown=False)
def _register_signal_handler(
self,

View File

@@ -79,6 +79,7 @@ class RPMController(BaseModel):
self._current_rpm = 0
if not self._shutdown_flag:
self._timer = threading.Timer(60.0, self._reset_request_count)
self._timer.daemon = True
self._timer.start()
if self._lock:

View File

@@ -27,9 +27,9 @@ class TestSignalType:
"""Verify SignalType maps to correct signal numbers."""
assert SignalType.SIGTERM == signal.SIGTERM
assert SignalType.SIGINT == signal.SIGINT
assert SignalType.SIGHUP == signal.SIGHUP
assert SignalType.SIGTSTP == signal.SIGTSTP
assert SignalType.SIGCONT == signal.SIGCONT
assert SignalType.SIGHUP == getattr(signal, "SIGHUP", 1)
assert SignalType.SIGTSTP == getattr(signal, "SIGTSTP", 20)
assert SignalType.SIGCONT == getattr(signal, "SIGCONT", 18)
class TestSignalEvents:

View File

@@ -383,4 +383,94 @@ class TestAsyncTaskOutput:
assert result.description == "Test description"
assert result.expected_output == "Test expected"
assert result.raw == "Test result"
assert result.agent == "Test Agent"
assert result.agent == "Test Agent"
class TestThreadedAsyncExecution:
"""Tests for threaded async task execution (execute_async with Future)."""
@patch("crewai.Agent.execute_task")
def test_execute_async_basic(
self, mock_execute: MagicMock, test_agent: Agent
) -> None:
"""Test basic threaded async task execution."""
mock_execute.return_value = "Async task result"
task = Task(
description="Test task description",
expected_output="Test expected output",
agent=test_agent,
)
future = task.execute_async()
result = future.result(timeout=5)
assert result is not None
assert isinstance(result, TaskOutput)
assert result.raw == "Async task result"
assert result.agent == "Test Agent"
mock_execute.assert_called_once()
@patch("crewai.Agent.execute_task")
def test_execute_async_exception_completes_future(
self, mock_execute: MagicMock, test_agent: Agent
) -> None:
"""Test that execute_async properly completes the Future when an exception occurs.
This is a regression test for GitHub issue #4072 where an async task that
errors would keep its thread alive because the Future was never completed.
"""
mock_execute.side_effect = ValueError("Something happened here")
task = Task(
description="Test task description",
expected_output="Test expected output",
agent=test_agent,
)
future = task.execute_async()
with pytest.raises(ValueError) as exc_info:
future.result(timeout=5)
assert "Something happened here" in str(exc_info.value)
@patch("crewai.Agent.execute_task")
def test_execute_async_exception_sets_end_time(
self, mock_execute: MagicMock, test_agent: Agent
) -> None:
"""Test that execute_async sets end_time even when an exception occurs."""
mock_execute.side_effect = RuntimeError("Test error")
task = Task(
description="Test task description",
expected_output="Test expected output",
agent=test_agent,
)
future = task.execute_async()
with pytest.raises(RuntimeError):
future.result(timeout=5)
assert task.end_time is not None
@patch("crewai.Agent.execute_task")
def test_execute_async_exception_does_not_hang(
self, mock_execute: MagicMock, test_agent: Agent
) -> None:
"""Test that execute_async does not hang when an exception occurs.
This test verifies that the Future is properly completed with an exception,
allowing future.result() to return immediately instead of blocking forever.
"""
mock_execute.side_effect = Exception("Task execution failed")
task = Task(
description="Test task description",
expected_output="Test expected output",
agent=test_agent,
)
future = task.execute_async()
with pytest.raises(Exception) as exc_info:
future.result(timeout=1)
assert "Task execution failed" in str(exc_info.value)