Compare commits

...

3 Commits

Author SHA1 Message Date
Devin AI
6cfc105d54 chore: re-trigger CI
Co-Authored-By: João <joao@crewai.com>
2026-01-12 06:19:45 +00:00
Devin AI
703e0f6191 fix: memory leak in execution_spans dictionary
Fix memory leak in EventListener where completed/failed tasks were never
fully removed from the execution_spans dictionary. Instead of removing
entries, the code was setting them to None, causing task objects to
remain referenced indefinitely and preventing garbage collection.

Changes:
- Replace 'self.execution_spans[source] = None' with
  'self.execution_spans.pop(source, None)' in both TaskCompletedEvent
  and TaskFailedEvent handlers
- Add comprehensive tests to verify execution_spans cleanup behavior

This fixes unbounded memory growth in long-running processes or systems
executing many tasks.

Fixes #4222

Co-Authored-By: João <joao@crewai.com>
2026-01-12 06:16:23 +00:00
Joao Moura
b858d705a8 updating docs
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
2026-01-11 16:02:55 -08:00
11 changed files with 303 additions and 12 deletions

View File

@@ -574,6 +574,10 @@ When you run this Flow, the output will change based on the random boolean value
### Human in the Loop (human feedback)
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>
The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.
```python Code

View File

@@ -7,6 +7,10 @@ mode: "wide"
## Overview
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Make sure to update your installation before using this feature.
</Note>
The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It allows you to pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.
This is particularly valuable for:

View File

@@ -11,10 +11,10 @@ Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelli
CrewAI offers two main approaches for implementing human-in-the-loop workflows:
| Approach | Best For | Integration |
|----------|----------|-------------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide |
| Approach | Best For | Integration | Version |
|----------|----------|-------------|---------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |
<Tip>
If you're building flows and want to add human review steps with routing based on feedback, check out the [Human Feedback in Flows](/en/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.

View File

@@ -567,6 +567,10 @@ Fourth method running
### Human in the Loop (인간 피드백)
<Note>
`@human_feedback` 데코레이터는 **CrewAI 버전 1.8.0 이상**이 필요합니다.
</Note>
`@human_feedback` 데코레이터는 인간의 피드백을 수집하기 위해 플로우 실행을 일시 중지하는 human-in-the-loop 워크플로우를 가능하게 합니다. 이는 승인 게이트, 품질 검토, 인간의 판단이 필요한 결정 지점에 유용합니다.
```python Code

View File

@@ -7,6 +7,10 @@ mode: "wide"
## 개요
<Note>
`@human_feedback` 데코레이터는 **CrewAI 버전 1.8.0 이상**이 필요합니다. 이 기능을 사용하기 전에 설치를 업데이트하세요.
</Note>
`@human_feedback` 데코레이터는 CrewAI Flow 내에서 직접 human-in-the-loop(HITL) 워크플로우를 가능하게 합니다. Flow 실행을 일시 중지하고, 인간에게 검토를 위해 출력을 제시하고, 피드백을 수집하고, 선택적으로 피드백 결과에 따라 다른 리스너로 라우팅할 수 있습니다.
이는 특히 다음과 같은 경우에 유용합니다:

View File

@@ -5,9 +5,22 @@ icon: "user-check"
mode: "wide"
---
휴먼 인 더 루프(HITL, Human-in-the-Loop)는 인공지능과 인간의 전문 지식을 결합하여 의사결정을 강화하고 작업 결과를 향상시키는 강력한 접근 방식입니다. 이 가이드에서는 CrewAI 내에서 HITL을 구현하는 방법을 안내합니다.
휴먼 인 더 루프(HITL, Human-in-the-Loop)는 인공지능과 인간의 전문 지식을 결합하여 의사결정을 강화하고 작업 결과를 향상시키는 강력한 접근 방식입니다. CrewAI는 필요에 따라 HITL을 구현하는 여러 가지 방법을 제공합니다.
## HITL 워크플로우 설정
## HITL 접근 방식 선택
CrewAI는 human-in-the-loop 워크플로우를 구현하기 위한 두 가지 주요 접근 방식을 제공합니다:
| 접근 방식 | 적합한 용도 | 통합 | 버전 |
|----------|----------|-------------|---------|
| **Flow 기반** (`@human_feedback` 데코레이터) | 로컬 개발, 콘솔 기반 검토, 동기식 워크플로우 | [Flow에서 인간 피드백](/ko/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook 기반** (Enterprise) | 프로덕션 배포, 비동기 워크플로우, 외부 통합 (Slack, Teams 등) | 이 가이드 | - |
<Tip>
Flow를 구축하면서 피드백을 기반으로 라우팅하는 인간 검토 단계를 추가하려면 `@human_feedback` 데코레이터에 대한 [Flow에서 인간 피드백](/ko/learn/human-feedback-in-flows) 가이드를 참조하세요.
</Tip>
## Webhook 기반 HITL 워크플로우 설정
<Steps>
<Step title="작업 구성">

View File

@@ -309,6 +309,10 @@ Ao executar esse Flow, a saída será diferente dependendo do valor booleano ale
### Human in the Loop (feedback humano)
<Note>
O decorador `@human_feedback` requer **CrewAI versão 1.8.0 ou superior**.
</Note>
O decorador `@human_feedback` permite fluxos de trabalho human-in-the-loop, pausando a execução do flow para coletar feedback de um humano. Isso é útil para portões de aprovação, revisão de qualidade e pontos de decisão que requerem julgamento humano.
```python Code

View File

@@ -7,6 +7,10 @@ mode: "wide"
## Visão Geral
<Note>
O decorador `@human_feedback` requer **CrewAI versão 1.8.0 ou superior**. Certifique-se de atualizar sua instalação antes de usar este recurso.
</Note>
O decorador `@human_feedback` permite fluxos de trabalho human-in-the-loop (HITL) diretamente nos CrewAI Flows. Ele permite pausar a execução do flow, apresentar a saída para um humano revisar, coletar seu feedback e, opcionalmente, rotear para diferentes listeners com base no resultado do feedback.
Isso é particularmente valioso para:

View File

@@ -5,9 +5,22 @@ icon: "user-check"
mode: "wide"
---
Human-in-the-Loop (HITL) é uma abordagem poderosa que combina a inteligência artificial com a experiência humana para aprimorar a tomada de decisões e melhorar os resultados das tarefas. Este guia mostra como implementar HITL dentro da CrewAI.
Human-in-the-Loop (HITL) é uma abordagem poderosa que combina a inteligência artificial com a experiência humana para aprimorar a tomada de decisões e melhorar os resultados das tarefas. CrewAI oferece várias maneiras de implementar HITL dependendo das suas necessidades.
## Configurando Workflows HITL
## Escolhendo Sua Abordagem HITL
CrewAI oferece duas abordagens principais para implementar workflows human-in-the-loop:
| Abordagem | Melhor Para | Integração | Versão |
|----------|----------|-------------|---------|
| **Baseada em Flow** (decorador `@human_feedback`) | Desenvolvimento local, revisão via console, workflows síncronos | [Feedback Humano em Flows](/pt-BR/learn/human-feedback-in-flows) | **1.8.0+** |
| **Baseada em Webhook** (Enterprise) | Deployments em produção, workflows assíncronos, integrações externas (Slack, Teams, etc.) | Este guia | - |
<Tip>
Se você está construindo flows e deseja adicionar etapas de revisão humana com roteamento baseado em feedback, confira o guia [Feedback Humano em Flows](/pt-BR/learn/human-feedback-in-flows) para o decorador `@human_feedback`.
</Tip>
## Configurando Workflows HITL Baseados em Webhook
<Steps>
<Step title="Configure sua Tarefa">

View File

@@ -209,10 +209,9 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
# Handle telemetry
span = self.execution_spans.get(source)
span = self.execution_spans.pop(source, None)
if span:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
# Pass task name if it exists
task_name = get_task_name(source)
@@ -222,11 +221,10 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
span = self.execution_spans.get(source)
span = self.execution_spans.pop(source, None)
if span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
# Pass task name if it exists
task_name = get_task_name(source)

View File

@@ -0,0 +1,243 @@
"""Tests for EventListener execution_spans cleanup to prevent memory leaks."""
import asyncio
from unittest.mock import MagicMock, patch
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.tasks.task_output import TaskOutput
class MockAgent:
"""Mock agent for testing."""
def __init__(self, role: str = "test_role"):
self.role = role
self.crew = MagicMock()
class MockTask:
"""Mock task for testing."""
def __init__(self, task_id: str = "test_task"):
self.id = task_id
self.name = "Test Task"
self.description = "A test task description"
self.agent = MockAgent()
@pytest.fixture
def event_listener():
"""Create a fresh EventListener instance for testing."""
EventListener._instance = None
EventListener._initialized = False
listener = EventListener()
listener.setup_listeners(crewai_event_bus)
return listener
@pytest.fixture
def mock_task():
"""Create a mock task for testing."""
return MockTask()
@pytest.fixture
def mock_task_output():
"""Create a mock task output for testing."""
return TaskOutput(
description="Test task description",
raw="Test output",
agent="test_agent",
)
@pytest.mark.asyncio
async def test_execution_spans_removed_on_task_completed(
event_listener, mock_task, mock_task_output
):
"""Test that execution_spans entries are properly removed when a task completes.
This test verifies the fix for the memory leak where completed tasks were
setting execution_spans[source] = None instead of removing the key entirely.
"""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended"):
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
assert mock_task in event_listener.execution_spans
assert event_listener.execution_spans[mock_task] == mock_span
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
future = crewai_event_bus.emit(mock_task, completed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_removed_on_task_failed(event_listener, mock_task):
"""Test that execution_spans entries are properly removed when a task fails.
This test verifies the fix for the memory leak where failed tasks were
setting execution_spans[source] = None instead of removing the key entirely.
"""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended"):
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
assert mock_task in event_listener.execution_spans
assert event_listener.execution_spans[mock_task] == mock_span
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
future = crewai_event_bus.emit(mock_task, failed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_dict_size_does_not_grow_unbounded(
event_listener, mock_task_output
):
"""Test that execution_spans dictionary size remains bounded after many tasks.
This test simulates the memory leak scenario where many tasks complete/fail
and verifies that the dictionary doesn't grow unboundedly.
"""
num_tasks = 100
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended"):
mock_task_started.return_value = MagicMock()
for i in range(num_tasks):
task = MockTask(task_id=f"task_{i}")
start_event = TaskStartedEvent(context="test context", task=task)
future = crewai_event_bus.emit(task, start_event)
if future:
await asyncio.wrap_future(future)
if i % 2 == 0:
completed_event = TaskCompletedEvent(
output=mock_task_output, task=task
)
future = crewai_event_bus.emit(task, completed_event)
else:
failed_event = TaskFailedEvent(error="Test error", task=task)
future = crewai_event_bus.emit(task, failed_event)
if future:
await asyncio.wrap_future(future)
assert len(event_listener.execution_spans) == 0
@pytest.mark.asyncio
async def test_execution_spans_handles_missing_task_gracefully(
event_listener, mock_task, mock_task_output
):
"""Test that completing/failing a task not in execution_spans doesn't cause errors.
This ensures the fix using pop(source, None) handles edge cases gracefully.
"""
with patch.object(event_listener._telemetry, "task_ended"):
assert mock_task not in event_listener.execution_spans
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
future = crewai_event_bus.emit(mock_task, completed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_handles_missing_task_on_failure_gracefully(
event_listener, mock_task
):
"""Test that failing a task not in execution_spans doesn't cause errors.
This ensures the fix using pop(source, None) handles edge cases gracefully.
"""
with patch.object(event_listener._telemetry, "task_ended"):
assert mock_task not in event_listener.execution_spans
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
future = crewai_event_bus.emit(mock_task, failed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_telemetry_task_ended_called_with_span_on_completion(
event_listener, mock_task, mock_task_output
):
"""Test that telemetry.task_ended is called with the correct span on completion."""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended") as mock_task_ended:
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
future = crewai_event_bus.emit(mock_task, completed_event)
if future:
await asyncio.wrap_future(future)
mock_task_ended.assert_called_once_with(
mock_span, mock_task, mock_task.agent.crew
)
@pytest.mark.asyncio
async def test_telemetry_task_ended_called_with_span_on_failure(
event_listener, mock_task
):
"""Test that telemetry.task_ended is called with the correct span on failure."""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended") as mock_task_ended:
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
future = crewai_event_bus.emit(mock_task, failed_event)
if future:
await asyncio.wrap_future(future)
mock_task_ended.assert_called_once_with(
mock_span, mock_task, mock_task.agent.crew
)