Compare commits

3 Commits

Author SHA1 Message Date
Devin AI
6cfc105d54 chore: re-trigger CI
Co-Authored-By: João <joao@crewai.com>
2026-01-12 06:19:45 +00:00
Devin AI
703e0f6191 fix: memory leak in execution_spans dictionary
Fix memory leak in EventListener where completed/failed tasks were never
fully removed from the execution_spans dictionary. Instead of removing
entries, the code was setting them to None, causing task objects to
remain referenced indefinitely and preventing garbage collection.

Changes:
- Replace 'self.execution_spans[source] = None' with
  'self.execution_spans.pop(source, None)' in both TaskCompletedEvent
  and TaskFailedEvent handlers
- Add comprehensive tests to verify execution_spans cleanup behavior

This fixes unbounded memory growth in long-running processes or systems
executing many tasks.

Fixes #4222

Co-Authored-By: João <joao@crewai.com>
2026-01-12 06:16:23 +00:00
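In plain terms, this is the pattern the commit replaces, as a minimal runnable sketch (the `Task` class below is a stand-in for illustration, not the real crewAI task):

```python
# Minimal sketch of the leak this commit fixes (simplified from the diff further down).

class Task:
    """Stand-in for a crewAI task object, used only for illustration."""

execution_spans: dict[Task, object] = {}

# Leaky pattern: assigning None keeps the Task *key* referenced by the dict,
# so the task object can never be garbage collected.
task = Task()
execution_spans[task] = object()   # span recorded when the task starts
execution_spans[task] = None       # "cleanup" that never removes the key
assert task in execution_spans

# Fixed pattern: pop() removes both key and value, and is a no-op if the
# key is already gone.
execution_spans.pop(task, None)
assert task not in execution_spans
```

`dict.pop(key, None)` also tolerates a missing key, which is why the handlers stay safe when a completion or failure event arrives for an untracked task.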
Joao Moura
b858d705a8 updating docs
2026-01-11 16:02:55 -08:00
13 changed files with 425 additions and 224 deletions

View File

@@ -574,6 +574,10 @@ When you run this Flow, the output will change based on the random boolean value
### Human in the Loop (human feedback)
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>
The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.
```python Code

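The example that follows this note in the docs is truncated in this compare view. As a purely hypothetical sketch of what a `@human_feedback` flow might look like (the import path, decorator parameter, and routing behavior below are assumptions, not confirmed by this diff):

```python
# Hypothetical sketch only: the decorator's real import path and parameters
# are not shown in this diff, so everything below is an assumption.
from crewai.flow.flow import Flow, listen, start

# Assumed import location for the decorator described in the docs above:
from crewai.flow.flow import human_feedback


class ApprovalFlow(Flow):
    @start()
    def draft(self) -> str:
        return "Draft announcement ..."

    @human_feedback("Approve this draft?")  # assumed: pauses for console review
    @listen(draft)
    def review(self, draft: str) -> str:
        # Per the docs text, execution pauses here until a human submits
        # feedback, which can optionally route to different listeners.
        return draft
```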
View File

@@ -7,6 +7,10 @@ mode: "wide"
## Overview
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Make sure to update your installation before using this feature.
</Note>
The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It allows you to pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.
This is particularly valuable for:

View File

@@ -11,10 +11,10 @@ Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelli
CrewAI offers two main approaches for implementing human-in-the-loop workflows:
| Approach | Best For | Integration |
|----------|----------|-------------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide |
| Approach | Best For | Integration | Version |
|----------|----------|-------------|---------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |
<Tip>
If you're building flows and want to add human review steps with routing based on feedback, check out the [Human Feedback in Flows](/en/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.

View File

@@ -567,6 +567,10 @@ Fourth method running
### Human in the Loop (human feedback)
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>
The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.
```python Code

View File

@@ -7,6 +7,10 @@ mode: "wide"
## Overview
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Make sure to update your installation before using this feature.
</Note>
The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It allows you to pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.
This is particularly valuable for:

View File

@@ -5,9 +5,22 @@ icon: "user-check"
mode: "wide"
---
Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. This guide walks you through implementing HITL within CrewAI.
Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. CrewAI offers several ways to implement HITL depending on your needs.
## Setting Up HITL Workflows
## Choosing Your HITL Approach
CrewAI offers two main approaches for implementing human-in-the-loop workflows:
| Approach | Best For | Integration | Version |
|----------|----------|-------------|---------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/ko/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |
<Tip>
If you're building flows and want to add human review steps with routing based on feedback, check out the [Human Feedback in Flows](/ko/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.
</Tip>
## Setting Up Webhook-Based HITL Workflows
<Steps>
<Step title="Configure your Task">

View File

@@ -309,6 +309,10 @@ When you run this Flow, the output will differ depending on the random boolean value
### Human in the Loop (human feedback)
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>
The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.
```python Code

View File

@@ -7,6 +7,10 @@ mode: "wide"
## Overview
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Make sure to update your installation before using this feature.
</Note>
The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It allows you to pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.
Isso é particularmente valioso para:

View File

@@ -5,9 +5,22 @@ icon: "user-check"
mode: "wide"
---
Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. This guide shows you how to implement HITL within CrewAI.
Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. CrewAI offers several ways to implement HITL depending on your needs.
## Setting Up HITL Workflows
## Choosing Your HITL Approach
CrewAI offers two main approaches for implementing human-in-the-loop workflows:
| Approach | Best For | Integration | Version |
|----------|----------|-------------|---------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/pt-BR/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |
<Tip>
If you're building flows and want to add human review steps with routing based on feedback, check out the [Human Feedback in Flows](/pt-BR/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.
</Tip>
## Setting Up Webhook-Based HITL Workflows
<Steps>
<Step title="Configure your Task">

View File

@@ -209,10 +209,9 @@ class EventListener(BaseEventListener):
         @crewai_event_bus.on(TaskCompletedEvent)
         def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
             # Handle telemetry
-            span = self.execution_spans.get(source)
+            span = self.execution_spans.pop(source, None)
             if span:
                 self._telemetry.task_ended(span, source, source.agent.crew)
-            self.execution_spans[source] = None
             # Pass task name if it exists
             task_name = get_task_name(source)
@@ -222,11 +221,10 @@ class EventListener(BaseEventListener):
         @crewai_event_bus.on(TaskFailedEvent)
         def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
-            span = self.execution_spans.get(source)
+            span = self.execution_spans.pop(source, None)
             if span:
                 if source.agent and source.agent.crew:
                     self._telemetry.task_ended(span, source, source.agent.crew)
-            self.execution_spans[source] = None
             # Pass task name if it exists
             task_name = get_task_name(source)

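The tests added later in this diff assert on dictionary membership; a stricter standalone check of the same property (not part of this PR, just an illustration) can use weak references to confirm the popped key really becomes collectable:

```python
# Illustration only (not from this PR): verifying collectability with weakref.
import gc
import weakref


class Task:
    """Stand-in object used as a dictionary key."""

spans: dict[Task, object] = {}
task = Task()
spans[task] = object()   # span recorded on task start

ref = weakref.ref(task)
spans.pop(task, None)    # the fix: drop the dict's reference to the key
del task                 # drop our own reference
gc.collect()
assert ref() is None     # the task was garbage collected -> no leak
```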
View File

@@ -341,7 +341,6 @@ class AccumulatedToolArgs(BaseModel):
class LLM(BaseLLM):
completion_cost: float | None = None
_callback_lock: threading.RLock = threading.RLock()
def __new__(cls, model: str, is_litellm: bool = False, **kwargs: Any) -> LLM:
"""Factory method that routes to native SDK or falls back to LiteLLM.
@@ -1145,7 +1144,7 @@ class LLM(BaseLLM):
if response_model:
params["response_model"] = response_model
response = litellm.completion(**params)
if hasattr(response,"usage") and not isinstance(response.usage, type) and response.usage:
usage_info = response.usage
self._track_token_usage_internal(usage_info)
@@ -1364,7 +1363,7 @@ class LLM(BaseLLM):
"""
full_response = ""
chunk_count = 0
usage_info = None
accumulated_tool_args: defaultdict[int, AccumulatedToolArgs] = defaultdict(
@@ -1658,92 +1657,78 @@ class LLM(BaseLLM):
raise ValueError("LLM call blocked by before_llm_call hook")
# --- 5) Set up callbacks if provided
# Use a class-level lock to synchronize access to global litellm callbacks.
# This prevents race conditions when multiple LLM instances call set_callbacks
# concurrently, which could cause callbacks to be removed before they fire.
with suppress_warnings():
with LLM._callback_lock:
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
else:
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
logging.info(
"Retrying LLM call without the unsupported 'stop'"
)
# Recursive call happens inside the lock since we're using
# a reentrant-safe pattern (the lock is released when we
# exit the with block, and the recursive call will acquire
# it again)
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
raise
else:
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
logging.info("Retrying LLM call without the unsupported 'stop'")
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
async def acall(
self,
@@ -1805,27 +1790,14 @@ class LLM(BaseLLM):
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
# Use a class-level lock to synchronize access to global litellm callbacks.
# This prevents race conditions when multiple LLM instances call set_callbacks
# concurrently, which could cause callbacks to be removed before they fire.
with suppress_warnings():
with LLM._callback_lock:
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(messages, tools)
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(messages, tools)
if self.stream:
return await self._ahandle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._ahandle_non_streaming_response(
if self.stream:
return await self._ahandle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
@@ -1833,49 +1805,52 @@ class LLM(BaseLLM):
from_agent=from_agent,
response_model=response_model,
)
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
return await self._ahandle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
logging.info(
"Retrying LLM call without the unsupported 'stop'"
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
return await self.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
logging.info("Retrying LLM call without the unsupported 'stop'")
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
return await self.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
raise
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
def _handle_emit_call_events(
self,

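One piece of behavior preserved on both sides of the diff above is the retry when a provider rejects the `stop` parameter. In isolation, that pattern looks roughly like this sketch (simplified: the real code mutates `self.additional_params` and re-enters `call()`; `additional_drop_params` is litellm's mechanism for stripping parameters before they reach the provider):

```python
# Simplified sketch of the retry-on-unsupported-'stop' pattern from the diff
# above; the real implementation retries via a recursive self.call().
import litellm


def completion_with_stop_fallback(params: dict) -> object:
    try:
        return litellm.completion(**params)
    except Exception as e:
        if "Unsupported parameter" in str(e) and "'stop'" in str(e):
            # Tell litellm to drop 'stop' before sending the request, then retry.
            params.setdefault("additional_drop_params", []).append("stop")
            return litellm.completion(**params)
        raise
```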
View File

@@ -0,0 +1,243 @@
"""Tests for EventListener execution_spans cleanup to prevent memory leaks."""
import asyncio
from unittest.mock import MagicMock, patch
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.tasks.task_output import TaskOutput
class MockAgent:
"""Mock agent for testing."""
def __init__(self, role: str = "test_role"):
self.role = role
self.crew = MagicMock()
class MockTask:
"""Mock task for testing."""
def __init__(self, task_id: str = "test_task"):
self.id = task_id
self.name = "Test Task"
self.description = "A test task description"
self.agent = MockAgent()
@pytest.fixture
def event_listener():
"""Create a fresh EventListener instance for testing."""
EventListener._instance = None
EventListener._initialized = False
listener = EventListener()
listener.setup_listeners(crewai_event_bus)
return listener
@pytest.fixture
def mock_task():
"""Create a mock task for testing."""
return MockTask()
@pytest.fixture
def mock_task_output():
"""Create a mock task output for testing."""
return TaskOutput(
description="Test task description",
raw="Test output",
agent="test_agent",
)
@pytest.mark.asyncio
async def test_execution_spans_removed_on_task_completed(
event_listener, mock_task, mock_task_output
):
"""Test that execution_spans entries are properly removed when a task completes.
This test verifies the fix for the memory leak where completed tasks were
setting execution_spans[source] = None instead of removing the key entirely.
"""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended"):
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
assert mock_task in event_listener.execution_spans
assert event_listener.execution_spans[mock_task] == mock_span
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
future = crewai_event_bus.emit(mock_task, completed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_removed_on_task_failed(event_listener, mock_task):
"""Test that execution_spans entries are properly removed when a task fails.
This test verifies the fix for the memory leak where failed tasks were
setting execution_spans[source] = None instead of removing the key entirely.
"""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended"):
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
assert mock_task in event_listener.execution_spans
assert event_listener.execution_spans[mock_task] == mock_span
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
future = crewai_event_bus.emit(mock_task, failed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_dict_size_does_not_grow_unbounded(
event_listener, mock_task_output
):
"""Test that execution_spans dictionary size remains bounded after many tasks.
This test simulates the memory leak scenario where many tasks complete/fail
and verifies that the dictionary doesn't grow unboundedly.
"""
num_tasks = 100
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended"):
mock_task_started.return_value = MagicMock()
for i in range(num_tasks):
task = MockTask(task_id=f"task_{i}")
start_event = TaskStartedEvent(context="test context", task=task)
future = crewai_event_bus.emit(task, start_event)
if future:
await asyncio.wrap_future(future)
if i % 2 == 0:
completed_event = TaskCompletedEvent(
output=mock_task_output, task=task
)
future = crewai_event_bus.emit(task, completed_event)
else:
failed_event = TaskFailedEvent(error="Test error", task=task)
future = crewai_event_bus.emit(task, failed_event)
if future:
await asyncio.wrap_future(future)
assert len(event_listener.execution_spans) == 0
@pytest.mark.asyncio
async def test_execution_spans_handles_missing_task_gracefully(
event_listener, mock_task, mock_task_output
):
"""Test that completing/failing a task not in execution_spans doesn't cause errors.
This ensures the fix using pop(source, None) handles edge cases gracefully.
"""
with patch.object(event_listener._telemetry, "task_ended"):
assert mock_task not in event_listener.execution_spans
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
future = crewai_event_bus.emit(mock_task, completed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_handles_missing_task_on_failure_gracefully(
event_listener, mock_task
):
"""Test that failing a task not in execution_spans doesn't cause errors.
This ensures the fix using pop(source, None) handles edge cases gracefully.
"""
with patch.object(event_listener._telemetry, "task_ended"):
assert mock_task not in event_listener.execution_spans
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
future = crewai_event_bus.emit(mock_task, failed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_telemetry_task_ended_called_with_span_on_completion(
event_listener, mock_task, mock_task_output
):
"""Test that telemetry.task_ended is called with the correct span on completion."""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended") as mock_task_ended:
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
future = crewai_event_bus.emit(mock_task, completed_event)
if future:
await asyncio.wrap_future(future)
mock_task_ended.assert_called_once_with(
mock_span, mock_task, mock_task.agent.crew
)
@pytest.mark.asyncio
async def test_telemetry_task_ended_called_with_span_on_failure(
event_listener, mock_task
):
"""Test that telemetry.task_ended is called with the correct span on failure."""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended") as mock_task_ended:
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
future = crewai_event_bus.emit(mock_task, failed_event)
if future:
await asyncio.wrap_future(future)
mock_task_ended.assert_called_once_with(
mock_span, mock_task, mock_task.agent.crew
)

View File

@@ -1,6 +1,6 @@
import logging
import os
import threading
from time import sleep
from unittest.mock import MagicMock, patch
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
@@ -18,15 +18,9 @@ from pydantic import BaseModel
import pytest
# TODO: This test fails without print statement, which makes me think that something is happening asynchronously that we need to eventually fix and dive deeper into at a later date
@pytest.mark.vcr()
def test_llm_callback_replacement():
"""Test that callbacks are properly isolated between LLM instances.
This test verifies that the race condition fix (using _callback_lock) works
correctly. Previously, this test required a sleep(5) workaround because
callbacks were being modified globally without synchronization, causing
one LLM instance's callbacks to interfere with another's.
"""
llm1 = LLM(model="gpt-4o-mini", is_litellm=True)
llm2 = LLM(model="gpt-4o-mini", is_litellm=True)
@@ -43,6 +37,7 @@ def test_llm_callback_replacement():
messages=[{"role": "user", "content": "Hello, world from another agent!"}],
callbacks=[calc_handler_2],
)
sleep(5)
usage_metrics_2 = calc_handler_2.token_cost_process.get_summary()
# The first handler should not have been updated
@@ -51,66 +46,6 @@ def test_llm_callback_replacement():
assert usage_metrics_1 == calc_handler_1.token_cost_process.get_summary()
def test_llm_callback_lock_prevents_race_condition():
"""Test that the _callback_lock prevents race conditions in concurrent LLM calls.
This test verifies that multiple threads can safely call LLM.call() with
different callbacks without interfering with each other. The lock ensures
that callbacks are properly isolated between concurrent calls.
"""
num_threads = 5
results: list[int] = []
errors: list[Exception] = []
lock = threading.Lock()
def make_llm_call(thread_id: int, mock_completion: MagicMock) -> None:
try:
llm = LLM(model="gpt-4o-mini", is_litellm=True)
calc_handler = TokenCalcHandler(token_cost_process=TokenProcess())
mock_message = MagicMock()
mock_message.content = f"Response from thread {thread_id}"
mock_choice = MagicMock()
mock_choice.message = mock_message
mock_response = MagicMock()
mock_response.choices = [mock_choice]
mock_response.usage = {
"prompt_tokens": 10,
"completion_tokens": 10,
"total_tokens": 20,
}
mock_completion.return_value = mock_response
llm.call(
messages=[{"role": "user", "content": f"Hello from thread {thread_id}"}],
callbacks=[calc_handler],
)
usage = calc_handler.token_cost_process.get_summary()
with lock:
results.append(usage.successful_requests)
except Exception as e:
with lock:
errors.append(e)
with patch("litellm.completion") as mock_completion:
threads = [
threading.Thread(target=make_llm_call, args=(i, mock_completion))
for i in range(num_threads)
]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0, f"Errors occurred: {errors}"
assert len(results) == num_threads
assert all(
r == 1 for r in results
), f"Expected all callbacks to have 1 successful request, got {results}"
@pytest.mark.vcr()
def test_llm_call_with_string_input():
llm = LLM(model="gpt-4o-mini")
@@ -1054,4 +989,4 @@ async def test_usage_info_streaming_with_acall():
assert llm._token_usage["completion_tokens"] > 0
assert llm._token_usage["total_tokens"] > 0
assert len(result) > 0
assert len(result) > 0