mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-15 02:58:30 +00:00
fix: memory leak in execution_spans dictionary
Fix memory leak in EventListener where completed/failed tasks were never fully removed from the execution_spans dictionary. Instead of removing entries, the code was setting them to None, causing task objects to remain referenced indefinitely and preventing garbage collection. Changes: - Replace 'self.execution_spans[source] = None' with 'self.execution_spans.pop(source, None)' in both TaskCompletedEvent and TaskFailedEvent handlers - Add comprehensive tests to verify execution_spans cleanup behavior This fixes unbounded memory growth in long-running processes or systems executing many tasks. Fixes #4222 Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -209,10 +209,9 @@ class EventListener(BaseEventListener):
|
||||
@crewai_event_bus.on(TaskCompletedEvent)
|
||||
def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
|
||||
# Handle telemetry
|
||||
span = self.execution_spans.get(source)
|
||||
span = self.execution_spans.pop(source, None)
|
||||
if span:
|
||||
self._telemetry.task_ended(span, source, source.agent.crew)
|
||||
self.execution_spans[source] = None
|
||||
|
||||
# Pass task name if it exists
|
||||
task_name = get_task_name(source)
|
||||
@@ -222,11 +221,10 @@ class EventListener(BaseEventListener):
|
||||
|
||||
@crewai_event_bus.on(TaskFailedEvent)
|
||||
def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
|
||||
span = self.execution_spans.get(source)
|
||||
span = self.execution_spans.pop(source, None)
|
||||
if span:
|
||||
if source.agent and source.agent.crew:
|
||||
self._telemetry.task_ended(span, source, source.agent.crew)
|
||||
self.execution_spans[source] = None
|
||||
|
||||
# Pass task name if it exists
|
||||
task_name = get_task_name(source)
|
||||
|
||||
243
lib/crewai/tests/events/test_event_listener_execution_spans.py
Normal file
243
lib/crewai/tests/events/test_event_listener_execution_spans.py
Normal file
@@ -0,0 +1,243 @@
|
||||
"""Tests for EventListener execution_spans cleanup to prevent memory leaks."""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.event_listener import EventListener
|
||||
from crewai.events.types.task_events import (
|
||||
TaskCompletedEvent,
|
||||
TaskFailedEvent,
|
||||
TaskStartedEvent,
|
||||
)
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
|
||||
|
||||
class MockAgent:
|
||||
"""Mock agent for testing."""
|
||||
|
||||
def __init__(self, role: str = "test_role"):
|
||||
self.role = role
|
||||
self.crew = MagicMock()
|
||||
|
||||
|
||||
class MockTask:
|
||||
"""Mock task for testing."""
|
||||
|
||||
def __init__(self, task_id: str = "test_task"):
|
||||
self.id = task_id
|
||||
self.name = "Test Task"
|
||||
self.description = "A test task description"
|
||||
self.agent = MockAgent()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def event_listener():
|
||||
"""Create a fresh EventListener instance for testing."""
|
||||
EventListener._instance = None
|
||||
EventListener._initialized = False
|
||||
listener = EventListener()
|
||||
listener.setup_listeners(crewai_event_bus)
|
||||
return listener
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_task():
|
||||
"""Create a mock task for testing."""
|
||||
return MockTask()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_task_output():
|
||||
"""Create a mock task output for testing."""
|
||||
return TaskOutput(
|
||||
description="Test task description",
|
||||
raw="Test output",
|
||||
agent="test_agent",
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execution_spans_removed_on_task_completed(
|
||||
event_listener, mock_task, mock_task_output
|
||||
):
|
||||
"""Test that execution_spans entries are properly removed when a task completes.
|
||||
|
||||
This test verifies the fix for the memory leak where completed tasks were
|
||||
setting execution_spans[source] = None instead of removing the key entirely.
|
||||
"""
|
||||
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
|
||||
with patch.object(event_listener._telemetry, "task_ended"):
|
||||
mock_span = MagicMock()
|
||||
mock_task_started.return_value = mock_span
|
||||
|
||||
start_event = TaskStartedEvent(context="test context", task=mock_task)
|
||||
future = crewai_event_bus.emit(mock_task, start_event)
|
||||
if future:
|
||||
await asyncio.wrap_future(future)
|
||||
|
||||
assert mock_task in event_listener.execution_spans
|
||||
assert event_listener.execution_spans[mock_task] == mock_span
|
||||
|
||||
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
|
||||
future = crewai_event_bus.emit(mock_task, completed_event)
|
||||
if future:
|
||||
await asyncio.wrap_future(future)
|
||||
|
||||
assert mock_task not in event_listener.execution_spans
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execution_spans_removed_on_task_failed(event_listener, mock_task):
|
||||
"""Test that execution_spans entries are properly removed when a task fails.
|
||||
|
||||
This test verifies the fix for the memory leak where failed tasks were
|
||||
setting execution_spans[source] = None instead of removing the key entirely.
|
||||
"""
|
||||
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
|
||||
with patch.object(event_listener._telemetry, "task_ended"):
|
||||
mock_span = MagicMock()
|
||||
mock_task_started.return_value = mock_span
|
||||
|
||||
start_event = TaskStartedEvent(context="test context", task=mock_task)
|
||||
future = crewai_event_bus.emit(mock_task, start_event)
|
||||
if future:
|
||||
await asyncio.wrap_future(future)
|
||||
|
||||
assert mock_task in event_listener.execution_spans
|
||||
assert event_listener.execution_spans[mock_task] == mock_span
|
||||
|
||||
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
|
||||
future = crewai_event_bus.emit(mock_task, failed_event)
|
||||
if future:
|
||||
await asyncio.wrap_future(future)
|
||||
|
||||
assert mock_task not in event_listener.execution_spans
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execution_spans_dict_size_does_not_grow_unbounded(
|
||||
event_listener, mock_task_output
|
||||
):
|
||||
"""Test that execution_spans dictionary size remains bounded after many tasks.
|
||||
|
||||
This test simulates the memory leak scenario where many tasks complete/fail
|
||||
and verifies that the dictionary doesn't grow unboundedly.
|
||||
"""
|
||||
num_tasks = 100
|
||||
|
||||
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
|
||||
with patch.object(event_listener._telemetry, "task_ended"):
|
||||
mock_task_started.return_value = MagicMock()
|
||||
|
||||
for i in range(num_tasks):
|
||||
task = MockTask(task_id=f"task_{i}")
|
||||
|
||||
start_event = TaskStartedEvent(context="test context", task=task)
|
||||
future = crewai_event_bus.emit(task, start_event)
|
||||
if future:
|
||||
await asyncio.wrap_future(future)
|
||||
|
||||
if i % 2 == 0:
|
||||
completed_event = TaskCompletedEvent(
|
||||
output=mock_task_output, task=task
|
||||
)
|
||||
future = crewai_event_bus.emit(task, completed_event)
|
||||
else:
|
||||
failed_event = TaskFailedEvent(error="Test error", task=task)
|
||||
future = crewai_event_bus.emit(task, failed_event)
|
||||
|
||||
if future:
|
||||
await asyncio.wrap_future(future)
|
||||
|
||||
assert len(event_listener.execution_spans) == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execution_spans_handles_missing_task_gracefully(
|
||||
event_listener, mock_task, mock_task_output
|
||||
):
|
||||
"""Test that completing/failing a task not in execution_spans doesn't cause errors.
|
||||
|
||||
This ensures the fix using pop(source, None) handles edge cases gracefully.
|
||||
"""
|
||||
with patch.object(event_listener._telemetry, "task_ended"):
|
||||
assert mock_task not in event_listener.execution_spans
|
||||
|
||||
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
|
||||
future = crewai_event_bus.emit(mock_task, completed_event)
|
||||
if future:
|
||||
await asyncio.wrap_future(future)
|
||||
|
||||
assert mock_task not in event_listener.execution_spans
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execution_spans_handles_missing_task_on_failure_gracefully(
|
||||
event_listener, mock_task
|
||||
):
|
||||
"""Test that failing a task not in execution_spans doesn't cause errors.
|
||||
|
||||
This ensures the fix using pop(source, None) handles edge cases gracefully.
|
||||
"""
|
||||
with patch.object(event_listener._telemetry, "task_ended"):
|
||||
assert mock_task not in event_listener.execution_spans
|
||||
|
||||
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
|
||||
future = crewai_event_bus.emit(mock_task, failed_event)
|
||||
if future:
|
||||
await asyncio.wrap_future(future)
|
||||
|
||||
assert mock_task not in event_listener.execution_spans
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_telemetry_task_ended_called_with_span_on_completion(
|
||||
event_listener, mock_task, mock_task_output
|
||||
):
|
||||
"""Test that telemetry.task_ended is called with the correct span on completion."""
|
||||
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
|
||||
with patch.object(event_listener._telemetry, "task_ended") as mock_task_ended:
|
||||
mock_span = MagicMock()
|
||||
mock_task_started.return_value = mock_span
|
||||
|
||||
start_event = TaskStartedEvent(context="test context", task=mock_task)
|
||||
future = crewai_event_bus.emit(mock_task, start_event)
|
||||
if future:
|
||||
await asyncio.wrap_future(future)
|
||||
|
||||
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
|
||||
future = crewai_event_bus.emit(mock_task, completed_event)
|
||||
if future:
|
||||
await asyncio.wrap_future(future)
|
||||
|
||||
mock_task_ended.assert_called_once_with(
|
||||
mock_span, mock_task, mock_task.agent.crew
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_telemetry_task_ended_called_with_span_on_failure(
|
||||
event_listener, mock_task
|
||||
):
|
||||
"""Test that telemetry.task_ended is called with the correct span on failure."""
|
||||
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
|
||||
with patch.object(event_listener._telemetry, "task_ended") as mock_task_ended:
|
||||
mock_span = MagicMock()
|
||||
mock_task_started.return_value = mock_span
|
||||
|
||||
start_event = TaskStartedEvent(context="test context", task=mock_task)
|
||||
future = crewai_event_bus.emit(mock_task, start_event)
|
||||
if future:
|
||||
await asyncio.wrap_future(future)
|
||||
|
||||
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
|
||||
future = crewai_event_bus.emit(mock_task, failed_event)
|
||||
if future:
|
||||
await asyncio.wrap_future(future)
|
||||
|
||||
mock_task_ended.assert_called_once_with(
|
||||
mock_span, mock_task, mock_task.agent.crew
|
||||
)
|
||||
Reference in New Issue
Block a user