Mirror of https://github.com/crewAIInc/crewAI.git, synced 2026-01-27 00:58:13 +00:00

Compare commits — 2 commits: devin/1768 ... devin/1768

| Author | SHA1 | Date |
|--------|------------|------|
|        | 3add062b4b |      |
|        | 0a9fb72cfe |      |
@@ -574,10 +574,6 @@ When you run this Flow, the output will change based on the random boolean value

### Human in the Loop (human feedback)

<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>

The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.

```python Code
@@ -7,10 +7,6 @@ mode: "wide"

## Overview

<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Make sure to update your installation before using this feature.
</Note>

The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It allows you to pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.

This is particularly valuable for:
@@ -11,10 +11,10 @@ Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelli

CrewAI offers two main approaches for implementing human-in-the-loop workflows:

| Approach | Best For | Integration | Version |
|----------|----------|-------------|---------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |

| Approach | Best For | Integration |
|----------|----------|-------------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide |

<Tip>
If you're building flows and want to add human review steps with routing based on feedback, check out the [Human Feedback in Flows](/en/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.
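
To make the flow-based row concrete, here is a minimal sketch of an approval gate inside a CrewAI Flow. It is illustrative only: `Flow`, `start`, and `listen` are the usual flow primitives, but the exact import path and parameters of `@human_feedback` are assumptions here; the linked Human Feedback in Flows guide documents the real API.

```python
# Illustrative sketch only -- the @human_feedback import location and arguments
# are assumptions, not taken from this diff; see /en/learn/human-feedback-in-flows.
from crewai.flow.flow import Flow, listen, start
from crewai.flow import human_feedback  # assumed import path

class ReleaseNotesFlow(Flow):
    @start()
    def draft_notes(self) -> str:
        return "Draft release notes for v1.8.0..."

    @human_feedback  # pauses execution and collects a reviewer's feedback (console-based)
    @listen(draft_notes)
    def review_notes(self, draft: str) -> str:
        # The reviewer sees this output; their feedback can route to different listeners.
        return draft
```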
@@ -567,10 +567,6 @@ Fourth method running

### Human in the Loop (human feedback)

<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>

The `@human_feedback` decorator enables human-in-the-loop workflows that pause flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.

```python Code
@@ -7,10 +7,6 @@ mode: "wide"

## Overview

<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Update your installation before using this feature.
</Note>

The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It lets you pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.

This is particularly valuable for:
@@ -5,22 +5,9 @@ icon: "user-check"
mode: "wide"
---

Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. CrewAI offers several ways to implement HITL depending on your needs.
Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. This guide walks you through implementing HITL within CrewAI.

## Choosing Your HITL Approach

CrewAI offers two main approaches for implementing human-in-the-loop workflows:

| Approach | Best For | Integration | Version |
|----------|----------|-------------|---------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/ko/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |

<Tip>
If you're building flows and want to add human review steps that route based on feedback, see the [Human Feedback in Flows](/ko/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.
</Tip>

## Setting Up Webhook-Based HITL Workflows
## Setting Up HITL Workflows

<Steps>
<Step title="Configure Your Task">
@@ -309,10 +309,6 @@ When you run this Flow, the output will differ depending on the random boolean value

### Human in the Loop (human feedback)

<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>

The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.

```python Code
@@ -7,10 +7,6 @@ mode: "wide"

## Overview

<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Make sure to update your installation before using this feature.
</Note>

The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It lets you pause flow execution, present the output for a human to review, collect their feedback, and optionally route to different listeners based on the feedback outcome.

This is particularly valuable for:
@@ -5,22 +5,9 @@ icon: "user-check"
mode: "wide"
---

Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. CrewAI offers several ways to implement HITL depending on your needs.
Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. This guide shows how to implement HITL within CrewAI.

## Choosing Your HITL Approach

CrewAI offers two main approaches for implementing human-in-the-loop workflows:

| Approach | Best For | Integration | Version |
|----------|----------|-------------|---------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/pt-BR/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |

<Tip>
If you're building flows and want to add human review steps with routing based on feedback, check out the [Human Feedback in Flows](/pt-BR/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.
</Tip>

## Setting Up Webhook-Based HITL Workflows
## Setting Up HITL Workflows

<Steps>
<Step title="Configure Your Task">
@@ -209,9 +209,10 @@ class EventListener(BaseEventListener):
        @crewai_event_bus.on(TaskCompletedEvent)
        def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
            # Handle telemetry
            span = self.execution_spans.pop(source, None)
            span = self.execution_spans.get(source)
            if span:
                self._telemetry.task_ended(span, source, source.agent.crew)
            self.execution_spans[source] = None

            # Pass task name if it exists
            task_name = get_task_name(source)

@@ -221,10 +222,11 @@ class EventListener(BaseEventListener):
        @crewai_event_bus.on(TaskFailedEvent)
        def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
            span = self.execution_spans.pop(source, None)
            span = self.execution_spans.get(source)
            if span:
                if source.agent and source.agent.crew:
                    self._telemetry.task_ended(span, source, source.agent.crew)
                self.execution_spans[source] = None

            # Pass task name if it exists
            task_name = get_task_name(source)
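
The two cleanup strategies shown in this hunk differ in how the span map is emptied. A small standalone sketch (plain Python, no CrewAI imports) of why `dict.pop(key, None)` keeps the map bounded while assigning `None` leaves one entry behind per finished task:

```python
# Standalone illustration of the two cleanup strategies in the hunk above.
spans: dict[str, object] = {}

def finish_with_none(task_id: str) -> None:
    span = spans.get(task_id)
    if span:
        pass  # report telemetry here
    spans[task_id] = None            # key stays behind for every finished task

def finish_with_pop(task_id: str) -> None:
    span = spans.pop(task_id, None)  # removes the key; dict size stays bounded
    if span:
        pass  # report telemetry here

for i in range(1000):
    spans[f"task_{i}"] = object()
    finish_with_none(f"task_{i}")
print(len(spans))  # 1000 entries retained

spans.clear()
for i in range(1000):
    spans[f"task_{i}"] = object()
    finish_with_pop(f"task_{i}")
print(len(spans))  # 0
```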
@@ -33,13 +33,24 @@ class LongTermMemory(Memory):
        storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage()
        super().__init__(storage=storage)

    def save(self, item: LongTermMemoryItem) -> None:  # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
    def save(
        self,
        value: LongTermMemoryItem,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Save an item to long-term memory.

        Args:
            value: The LongTermMemoryItem to save.
            metadata: Optional metadata dict (not used, metadata is extracted from the
                LongTermMemoryItem). Included for supertype compatibility.
        """
        crewai_event_bus.emit(
            self,
            event=MemorySaveStartedEvent(
                value=item.task,
                metadata=item.metadata,
                agent_role=item.agent,
                value=value.task,
                metadata=value.metadata,
                agent_role=value.agent,
                source_type="long_term_memory",
                from_agent=self.agent,
                from_task=self.task,
@@ -48,23 +59,23 @@ class LongTermMemory(Memory):

        start_time = time.time()
        try:
            metadata = item.metadata
            metadata.update(
                {"agent": item.agent, "expected_output": item.expected_output}
            item_metadata = value.metadata
            item_metadata.update(
                {"agent": value.agent, "expected_output": value.expected_output}
            )
            self.storage.save(
                task_description=item.task,
                score=metadata["quality"],
                metadata=metadata,
                datetime=item.datetime,
                task_description=value.task,
                score=item_metadata["quality"],
                metadata=item_metadata,
                datetime=value.datetime,
            )

            crewai_event_bus.emit(
                self,
                event=MemorySaveCompletedEvent(
                    value=item.task,
                    metadata=item.metadata,
                    agent_role=item.agent,
                    value=value.task,
                    metadata=value.metadata,
                    agent_role=value.agent,
                    save_time_ms=(time.time() - start_time) * 1000,
                    source_type="long_term_memory",
                    from_agent=self.agent,
@@ -75,25 +86,28 @@ class LongTermMemory(Memory):
            crewai_event_bus.emit(
                self,
                event=MemorySaveFailedEvent(
                    value=item.task,
                    metadata=item.metadata,
                    agent_role=item.agent,
                    value=value.task,
                    metadata=value.metadata,
                    agent_role=value.agent,
                    error=str(e),
                    source_type="long_term_memory",
                ),
            )
            raise

    def search(  # type: ignore[override]
    def search(
        self,
        task: str,
        latest_n: int = 3,
        query: str,
        limit: int = 3,
        score_threshold: float = 0.6,
    ) -> list[dict[str, Any]]:
        """Search long-term memory for relevant entries.

        Args:
            task: The task description to search for.
            latest_n: Maximum number of results to return.
            query: The task description to search for.
            limit: Maximum number of results to return.
            score_threshold: Minimum similarity score for results (not used for
                long-term memory, included for supertype compatibility).

        Returns:
            List of matching memory entries.
@@ -101,8 +115,8 @@ class LongTermMemory(Memory):
        crewai_event_bus.emit(
            self,
            event=MemoryQueryStartedEvent(
                query=task,
                limit=latest_n,
                query=query,
                limit=limit,
                source_type="long_term_memory",
                from_agent=self.agent,
                from_task=self.task,
@@ -111,14 +125,14 @@ class LongTermMemory(Memory):

        start_time = time.time()
        try:
            results = self.storage.load(task, latest_n)
            results = self.storage.load(query, limit)

            crewai_event_bus.emit(
                self,
                event=MemoryQueryCompletedEvent(
                    query=task,
                    query=query,
                    results=results,
                    limit=latest_n,
                    limit=limit,
                    query_time_ms=(time.time() - start_time) * 1000,
                    source_type="long_term_memory",
                    from_agent=self.agent,
@@ -131,26 +145,32 @@ class LongTermMemory(Memory):
            crewai_event_bus.emit(
                self,
                event=MemoryQueryFailedEvent(
                    query=task,
                    limit=latest_n,
                    query=query,
                    limit=limit,
                    error=str(e),
                    source_type="long_term_memory",
                ),
            )
            raise

    async def asave(self, item: LongTermMemoryItem) -> None:  # type: ignore[override]
    async def asave(
        self,
        value: LongTermMemoryItem,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Save an item to long-term memory asynchronously.

        Args:
            item: The LongTermMemoryItem to save.
            value: The LongTermMemoryItem to save.
            metadata: Optional metadata dict (not used, metadata is extracted from the
                LongTermMemoryItem). Included for supertype compatibility.
        """
        crewai_event_bus.emit(
            self,
            event=MemorySaveStartedEvent(
                value=item.task,
                metadata=item.metadata,
                agent_role=item.agent,
                value=value.task,
                metadata=value.metadata,
                agent_role=value.agent,
                source_type="long_term_memory",
                from_agent=self.agent,
                from_task=self.task,
@@ -159,23 +179,23 @@ class LongTermMemory(Memory):

        start_time = time.time()
        try:
            metadata = item.metadata
            metadata.update(
                {"agent": item.agent, "expected_output": item.expected_output}
            item_metadata = value.metadata
            item_metadata.update(
                {"agent": value.agent, "expected_output": value.expected_output}
            )
            await self.storage.asave(
                task_description=item.task,
                score=metadata["quality"],
                metadata=metadata,
                datetime=item.datetime,
                task_description=value.task,
                score=item_metadata["quality"],
                metadata=item_metadata,
                datetime=value.datetime,
            )

            crewai_event_bus.emit(
                self,
                event=MemorySaveCompletedEvent(
                    value=item.task,
                    metadata=item.metadata,
                    agent_role=item.agent,
                    value=value.task,
                    metadata=value.metadata,
                    agent_role=value.agent,
                    save_time_ms=(time.time() - start_time) * 1000,
                    source_type="long_term_memory",
                    from_agent=self.agent,
@@ -186,25 +206,28 @@ class LongTermMemory(Memory):
            crewai_event_bus.emit(
                self,
                event=MemorySaveFailedEvent(
                    value=item.task,
                    metadata=item.metadata,
                    agent_role=item.agent,
                    value=value.task,
                    metadata=value.metadata,
                    agent_role=value.agent,
                    error=str(e),
                    source_type="long_term_memory",
                ),
            )
            raise

    async def asearch(  # type: ignore[override]
    async def asearch(
        self,
        task: str,
        latest_n: int = 3,
        query: str,
        limit: int = 3,
        score_threshold: float = 0.6,
    ) -> list[dict[str, Any]]:
        """Search long-term memory asynchronously.

        Args:
            task: The task description to search for.
            latest_n: Maximum number of results to return.
            query: The task description to search for.
            limit: Maximum number of results to return.
            score_threshold: Minimum similarity score for results (not used for
                long-term memory, included for supertype compatibility).

        Returns:
            List of matching memory entries.
@@ -212,8 +235,8 @@ class LongTermMemory(Memory):
        crewai_event_bus.emit(
            self,
            event=MemoryQueryStartedEvent(
                query=task,
                limit=latest_n,
                query=query,
                limit=limit,
                source_type="long_term_memory",
                from_agent=self.agent,
                from_task=self.task,
@@ -222,14 +245,14 @@ class LongTermMemory(Memory):

        start_time = time.time()
        try:
            results = await self.storage.aload(task, latest_n)
            results = await self.storage.aload(query, limit)

            crewai_event_bus.emit(
                self,
                event=MemoryQueryCompletedEvent(
                    query=task,
                    query=query,
                    results=results,
                    limit=latest_n,
                    limit=limit,
                    query_time_ms=(time.time() - start_time) * 1000,
                    source_type="long_term_memory",
                    from_agent=self.agent,
@@ -242,8 +265,8 @@ class LongTermMemory(Memory):
            crewai_event_bus.emit(
                self,
                event=MemoryQueryFailedEvent(
                    query=task,
                    limit=latest_n,
                    query=query,
                    limit=limit,
                    error=str(e),
                    source_type="long_term_memory",
                ),
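
For reference, a usage sketch of the calling convention implied by the `value`/`metadata` and `query`/`limit`/`score_threshold` keyword names in this hunk; the argument names mirror the test calls that appear later in this compare, not additional documented behavior.

```python
# Usage sketch for the value/metadata and query/limit/score_threshold signatures
# shown in the hunk above; argument names mirror the test calls in this compare.
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem

memory = LongTermMemory()
item = LongTermMemoryItem(
    agent="researcher",
    task="summarize the quarterly report",
    expected_output="a one-page summary",
    datetime="2026-01-27T00:00:00",  # stored as-is; the tests pass a plain string here
    quality=0.9,
    metadata={"task": "summarize the quarterly report", "quality": 0.9},
)
memory.save(value=item)  # metadata kwarg is accepted but unused per the docstring above
results = memory.search(
    query="quarterly report",
    limit=3,
    score_threshold=0.6,  # accepted for supertype compatibility, per the docstring above
)
```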
@@ -1,243 +0,0 @@
"""Tests for EventListener execution_spans cleanup to prevent memory leaks."""

import asyncio
from unittest.mock import MagicMock, patch

import pytest

from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
from crewai.events.types.task_events import (
    TaskCompletedEvent,
    TaskFailedEvent,
    TaskStartedEvent,
)
from crewai.tasks.task_output import TaskOutput


class MockAgent:
    """Mock agent for testing."""

    def __init__(self, role: str = "test_role"):
        self.role = role
        self.crew = MagicMock()


class MockTask:
    """Mock task for testing."""

    def __init__(self, task_id: str = "test_task"):
        self.id = task_id
        self.name = "Test Task"
        self.description = "A test task description"
        self.agent = MockAgent()


@pytest.fixture
def event_listener():
    """Create a fresh EventListener instance for testing."""
    EventListener._instance = None
    EventListener._initialized = False
    listener = EventListener()
    listener.setup_listeners(crewai_event_bus)
    return listener


@pytest.fixture
def mock_task():
    """Create a mock task for testing."""
    return MockTask()


@pytest.fixture
def mock_task_output():
    """Create a mock task output for testing."""
    return TaskOutput(
        description="Test task description",
        raw="Test output",
        agent="test_agent",
    )


@pytest.mark.asyncio
async def test_execution_spans_removed_on_task_completed(
    event_listener, mock_task, mock_task_output
):
    """Test that execution_spans entries are properly removed when a task completes.

    This test verifies the fix for the memory leak where completed tasks were
    setting execution_spans[source] = None instead of removing the key entirely.
    """
    with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
        with patch.object(event_listener._telemetry, "task_ended"):
            mock_span = MagicMock()
            mock_task_started.return_value = mock_span

            start_event = TaskStartedEvent(context="test context", task=mock_task)
            future = crewai_event_bus.emit(mock_task, start_event)
            if future:
                await asyncio.wrap_future(future)

            assert mock_task in event_listener.execution_spans
            assert event_listener.execution_spans[mock_task] == mock_span

            completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
            future = crewai_event_bus.emit(mock_task, completed_event)
            if future:
                await asyncio.wrap_future(future)

            assert mock_task not in event_listener.execution_spans


@pytest.mark.asyncio
async def test_execution_spans_removed_on_task_failed(event_listener, mock_task):
    """Test that execution_spans entries are properly removed when a task fails.

    This test verifies the fix for the memory leak where failed tasks were
    setting execution_spans[source] = None instead of removing the key entirely.
    """
    with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
        with patch.object(event_listener._telemetry, "task_ended"):
            mock_span = MagicMock()
            mock_task_started.return_value = mock_span

            start_event = TaskStartedEvent(context="test context", task=mock_task)
            future = crewai_event_bus.emit(mock_task, start_event)
            if future:
                await asyncio.wrap_future(future)

            assert mock_task in event_listener.execution_spans
            assert event_listener.execution_spans[mock_task] == mock_span

            failed_event = TaskFailedEvent(error="Test error", task=mock_task)
            future = crewai_event_bus.emit(mock_task, failed_event)
            if future:
                await asyncio.wrap_future(future)

            assert mock_task not in event_listener.execution_spans


@pytest.mark.asyncio
async def test_execution_spans_dict_size_does_not_grow_unbounded(
    event_listener, mock_task_output
):
    """Test that execution_spans dictionary size remains bounded after many tasks.

    This test simulates the memory leak scenario where many tasks complete/fail
    and verifies that the dictionary doesn't grow unboundedly.
    """
    num_tasks = 100

    with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
        with patch.object(event_listener._telemetry, "task_ended"):
            mock_task_started.return_value = MagicMock()

            for i in range(num_tasks):
                task = MockTask(task_id=f"task_{i}")

                start_event = TaskStartedEvent(context="test context", task=task)
                future = crewai_event_bus.emit(task, start_event)
                if future:
                    await asyncio.wrap_future(future)

                if i % 2 == 0:
                    completed_event = TaskCompletedEvent(
                        output=mock_task_output, task=task
                    )
                    future = crewai_event_bus.emit(task, completed_event)
                else:
                    failed_event = TaskFailedEvent(error="Test error", task=task)
                    future = crewai_event_bus.emit(task, failed_event)

                if future:
                    await asyncio.wrap_future(future)

            assert len(event_listener.execution_spans) == 0


@pytest.mark.asyncio
async def test_execution_spans_handles_missing_task_gracefully(
    event_listener, mock_task, mock_task_output
):
    """Test that completing/failing a task not in execution_spans doesn't cause errors.

    This ensures the fix using pop(source, None) handles edge cases gracefully.
    """
    with patch.object(event_listener._telemetry, "task_ended"):
        assert mock_task not in event_listener.execution_spans

        completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
        future = crewai_event_bus.emit(mock_task, completed_event)
        if future:
            await asyncio.wrap_future(future)

        assert mock_task not in event_listener.execution_spans


@pytest.mark.asyncio
async def test_execution_spans_handles_missing_task_on_failure_gracefully(
    event_listener, mock_task
):
    """Test that failing a task not in execution_spans doesn't cause errors.

    This ensures the fix using pop(source, None) handles edge cases gracefully.
    """
    with patch.object(event_listener._telemetry, "task_ended"):
        assert mock_task not in event_listener.execution_spans

        failed_event = TaskFailedEvent(error="Test error", task=mock_task)
        future = crewai_event_bus.emit(mock_task, failed_event)
        if future:
            await asyncio.wrap_future(future)

        assert mock_task not in event_listener.execution_spans


@pytest.mark.asyncio
async def test_telemetry_task_ended_called_with_span_on_completion(
    event_listener, mock_task, mock_task_output
):
    """Test that telemetry.task_ended is called with the correct span on completion."""
    with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
        with patch.object(event_listener._telemetry, "task_ended") as mock_task_ended:
            mock_span = MagicMock()
            mock_task_started.return_value = mock_span

            start_event = TaskStartedEvent(context="test context", task=mock_task)
            future = crewai_event_bus.emit(mock_task, start_event)
            if future:
                await asyncio.wrap_future(future)

            completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
            future = crewai_event_bus.emit(mock_task, completed_event)
            if future:
                await asyncio.wrap_future(future)

            mock_task_ended.assert_called_once_with(
                mock_span, mock_task, mock_task.agent.crew
            )


@pytest.mark.asyncio
async def test_telemetry_task_ended_called_with_span_on_failure(
    event_listener, mock_task
):
    """Test that telemetry.task_ended is called with the correct span on failure."""
    with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
        with patch.object(event_listener._telemetry, "task_ended") as mock_task_ended:
            mock_span = MagicMock()
            mock_task_started.return_value = mock_span

            start_event = TaskStartedEvent(context="test context", task=mock_task)
            future = crewai_event_bus.emit(mock_task, start_event)
            if future:
                await asyncio.wrap_future(future)

            failed_event = TaskFailedEvent(error="Test error", task=mock_task)
            future = crewai_event_bus.emit(mock_task, failed_event)
            if future:
                await asyncio.wrap_future(future)

            mock_task_ended.assert_called_once_with(
                mock_span, mock_task, mock_task.agent.crew
            )
@@ -1,5 +1,7 @@
import inspect
import threading
from collections import defaultdict
from typing import Any
from unittest.mock import ANY

import pytest
@@ -13,6 +15,7 @@ from crewai.events.types.memory_events import (
)
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.memory import Memory


@pytest.fixture
@@ -114,7 +117,7 @@ def test_long_term_memory_search_events(long_term_memory):

    test_query = "test query"

    long_term_memory.search(test_query, latest_n=5)
    long_term_memory.search(test_query, limit=5)

    with condition:
        success = condition.wait_for(
@@ -174,10 +177,104 @@ def test_save_and_search(long_term_memory):
        metadata={"task": "test_task", "quality": 0.5},
    )
    long_term_memory.save(memory)
    find = long_term_memory.search("test_task", latest_n=5)[0]
    find = long_term_memory.search("test_task", limit=5)[0]
    assert find["score"] == 0.5
    assert find["datetime"] == "test_datetime"
    assert find["metadata"]["agent"] == "test_agent"
    assert find["metadata"]["quality"] == 0.5
    assert find["metadata"]["task"] == "test_task"
    assert find["metadata"]["expected_output"] == "test_output"


class TestLongTermMemoryTypeSignatureCompatibility:
    """Tests to verify LongTermMemory method signatures are compatible with Memory base class.

    These tests ensure that the Liskov Substitution Principle is maintained and that
    LongTermMemory can be used polymorphically wherever Memory is expected.
    """

    def test_save_signature_has_value_parameter(self):
        """Test that save() uses 'value' parameter name matching Memory base class."""
        sig = inspect.signature(LongTermMemory.save)
        params = list(sig.parameters.keys())
        assert "value" in params, "save() should have 'value' parameter for LSP compliance"
        assert "metadata" in params, "save() should have 'metadata' parameter for LSP compliance"

    def test_save_signature_has_metadata_with_default(self):
        """Test that save() has metadata parameter with default value."""
        sig = inspect.signature(LongTermMemory.save)
        metadata_param = sig.parameters.get("metadata")
        assert metadata_param is not None, "save() should have 'metadata' parameter"
        assert metadata_param.default is None, "metadata should default to None"

    def test_search_signature_has_query_parameter(self):
        """Test that search() uses 'query' parameter name matching Memory base class."""
        sig = inspect.signature(LongTermMemory.search)
        params = list(sig.parameters.keys())
        assert "query" in params, "search() should have 'query' parameter for LSP compliance"
        assert "limit" in params, "search() should have 'limit' parameter for LSP compliance"
        assert "score_threshold" in params, "search() should have 'score_threshold' parameter for LSP compliance"

    def test_search_signature_has_score_threshold_with_default(self):
        """Test that search() has score_threshold parameter with default value."""
        sig = inspect.signature(LongTermMemory.search)
        score_threshold_param = sig.parameters.get("score_threshold")
        assert score_threshold_param is not None, "search() should have 'score_threshold' parameter"
        assert score_threshold_param.default == 0.6, "score_threshold should default to 0.6"

    def test_asave_signature_has_value_parameter(self):
        """Test that asave() uses 'value' parameter name matching Memory base class."""
        sig = inspect.signature(LongTermMemory.asave)
        params = list(sig.parameters.keys())
        assert "value" in params, "asave() should have 'value' parameter for LSP compliance"
        assert "metadata" in params, "asave() should have 'metadata' parameter for LSP compliance"

    def test_asearch_signature_has_query_parameter(self):
        """Test that asearch() uses 'query' parameter name matching Memory base class."""
        sig = inspect.signature(LongTermMemory.asearch)
        params = list(sig.parameters.keys())
        assert "query" in params, "asearch() should have 'query' parameter for LSP compliance"
        assert "limit" in params, "asearch() should have 'limit' parameter for LSP compliance"
        assert "score_threshold" in params, "asearch() should have 'score_threshold' parameter for LSP compliance"

    def test_long_term_memory_is_subclass_of_memory(self):
        """Test that LongTermMemory is a proper subclass of Memory."""
        assert issubclass(LongTermMemory, Memory), "LongTermMemory should be a subclass of Memory"

    def test_save_with_metadata_parameter(self, long_term_memory):
        """Test that save() can be called with the metadata parameter (even if unused)."""
        memory_item = LongTermMemoryItem(
            agent="test_agent",
            task="test_task_with_metadata",
            expected_output="test_output",
            datetime="test_datetime",
            quality=0.8,
            metadata={"task": "test_task_with_metadata", "quality": 0.8},
        )
        long_term_memory.save(value=memory_item, metadata={"extra": "data"})
        results = long_term_memory.search(query="test_task_with_metadata", limit=1)
        assert len(results) > 0
        assert results[0]["metadata"]["agent"] == "test_agent"

    def test_search_with_score_threshold_parameter(self, long_term_memory):
        """Test that search() can be called with the score_threshold parameter."""
        memory_item = LongTermMemoryItem(
            agent="test_agent",
            task="test_task_score_threshold",
            expected_output="test_output",
            datetime="test_datetime",
            quality=0.9,
            metadata={"task": "test_task_score_threshold", "quality": 0.9},
        )
        long_term_memory.save(value=memory_item)
        results = long_term_memory.search(
            query="test_task_score_threshold",
            limit=5,
            score_threshold=0.5,
        )
        assert isinstance(results, list)

    @pytest.fixture
    def long_term_memory(self):
        """Fixture to create a LongTermMemory instance for this test class."""
        return LongTermMemory()
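
The point of these signature tests is substitutability: code written against the `Memory` base type should work unchanged with `LongTermMemory`. A minimal sketch of that polymorphic use, assuming only the keyword interface asserted above:

```python
# Sketch of polymorphic use through the Memory base type, assuming the shared
# search(query=..., limit=...) keyword interface asserted by the tests above.
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.memory import Memory

def recall(store: Memory, query: str) -> list[dict]:
    # Works for any Memory subclass whose search() keeps the base keyword names.
    return store.search(query=query, limit=3)

results = recall(LongTermMemory(), "quarterly report")
```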