Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
3add062b4b chore: re-trigger CI tests
Co-Authored-By: João <joao@crewai.com>
2026-01-10 21:10:20 +00:00
Devin AI
0a9fb72cfe fix: resolve type signature incompatibility in LongTermMemory
This commit fixes the type signature incompatibility issue in the
LongTermMemory class (issue #4213). The save(), asave(), search(), and
asearch() methods now have signatures compatible with the Memory base class,
following the Liskov Substitution Principle.

Changes:
- Renamed 'item' parameter to 'value' in save() and asave() methods
- Added 'metadata' parameter to save() and asave() methods for LSP compliance
- Renamed 'task' parameter to 'query' in search() and asearch() methods
- Renamed 'latest_n' parameter to 'limit' in search() and asearch() methods
- Added 'score_threshold' parameter to search() and asearch() methods
- Removed '# type: ignore' comments that were suppressing the type errors
- Updated existing tests to use new parameter names
- Added comprehensive tests to verify type signature compatibility

Co-Authored-By: João <joao@crewai.com>
2026-01-10 21:06:56 +00:00
13 changed files with 193 additions and 364 deletions
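The renames listed in the commit message change how callers invoke `LongTermMemory` directly. Below is a minimal call-site sketch, assuming the new keyword names described above (`value`/`metadata` for `save()`, `query`/`limit`/`score_threshold` for `search()`) and reusing the `LongTermMemoryItem` fields that appear in the tests further down; it is an illustration, not code from this diff.

```python
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem

memory = LongTermMemory()

item = LongTermMemoryItem(
    agent="test_agent",
    task="summarize findings",
    expected_output="a short summary",
    datetime="2026-01-10T21:00:00",
    quality=0.9,
    metadata={"task": "summarize findings", "quality": 0.9},
)

# Old call sites: memory.save(item) / memory.search("summarize findings", latest_n=3)
# New call sites use the Memory base class keyword names:
memory.save(value=item)
results = memory.search(query="summarize findings", limit=3, score_threshold=0.6)
```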

View File

@@ -574,10 +574,6 @@ When you run this Flow, the output will change based on the random boolean value
### Human in the Loop (human feedback)
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>
The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.
```python Code

View File

@@ -7,10 +7,6 @@ mode: "wide"
## Overview
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Make sure to update your installation before using this feature.
</Note>
The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It allows you to pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.
This is particularly valuable for:

View File

@@ -11,10 +11,10 @@ Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelli
CrewAI offers two main approaches for implementing human-in-the-loop workflows:
| Approach | Best For | Integration | Version |
|----------|----------|-------------|---------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |
| Approach | Best For | Integration |
|----------|----------|-------------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide |
<Tip>
If you're building flows and want to add human review steps with routing based on feedback, check out the [Human Feedback in Flows](/en/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.

View File

@@ -567,10 +567,6 @@ Fourth method running
### Human in the Loop (인간 피드백)
<Note>
`@human_feedback` 데코레이터는 **CrewAI 버전 1.8.0 이상**이 필요합니다.
</Note>
`@human_feedback` 데코레이터는 인간의 피드백을 수집하기 위해 플로우 실행을 일시 중지하는 human-in-the-loop 워크플로우를 가능하게 합니다. 이는 승인 게이트, 품질 검토, 인간의 판단이 필요한 결정 지점에 유용합니다.
```python Code

View File

@@ -7,10 +7,6 @@ mode: "wide"
## 개요
<Note>
`@human_feedback` 데코레이터는 **CrewAI 버전 1.8.0 이상**이 필요합니다. 이 기능을 사용하기 전에 설치를 업데이트하세요.
</Note>
`@human_feedback` 데코레이터는 CrewAI Flow 내에서 직접 human-in-the-loop(HITL) 워크플로우를 가능하게 합니다. Flow 실행을 일시 중지하고, 인간에게 검토를 위해 출력을 제시하고, 피드백을 수집하고, 선택적으로 피드백 결과에 따라 다른 리스너로 라우팅할 수 있습니다.
이는 특히 다음과 같은 경우에 유용합니다:

View File

@@ -5,22 +5,9 @@ icon: "user-check"
mode: "wide"
---
휴먼 인 더 루프(HITL, Human-in-the-Loop)는 인공지능과 인간의 전문 지식을 결합하여 의사결정을 강화하고 작업 결과를 향상시키는 강력한 접근 방식입니다. CrewAI는 필요에 따라 HITL을 구현하는 여러 가지 방법을 제공합니다.
휴먼 인 더 루프(HITL, Human-in-the-Loop)는 인공지능과 인간의 전문 지식을 결합하여 의사결정을 강화하고 작업 결과를 향상시키는 강력한 접근 방식입니다. 이 가이드에서는 CrewAI 내에서 HITL을 구현하는 방법을 안내합니다.
## HITL 접근 방식 선택
CrewAI는 human-in-the-loop 워크플로우를 구현하기 위한 두 가지 주요 접근 방식을 제공합니다:
| 접근 방식 | 적합한 용도 | 통합 | 버전 |
|----------|----------|-------------|---------|
| **Flow 기반** (`@human_feedback` 데코레이터) | 로컬 개발, 콘솔 기반 검토, 동기식 워크플로우 | [Flow에서 인간 피드백](/ko/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook 기반** (Enterprise) | 프로덕션 배포, 비동기 워크플로우, 외부 통합 (Slack, Teams 등) | 이 가이드 | - |
<Tip>
Flow를 구축하면서 피드백을 기반으로 라우팅하는 인간 검토 단계를 추가하려면 `@human_feedback` 데코레이터에 대한 [Flow에서 인간 피드백](/ko/learn/human-feedback-in-flows) 가이드를 참조하세요.
</Tip>
## Webhook 기반 HITL 워크플로우 설정
## HITL 워크플로우 설정
<Steps>
<Step title="작업 구성">

View File

@@ -309,10 +309,6 @@ Ao executar esse Flow, a saída será diferente dependendo do valor booleano ale
### Human in the Loop (feedback humano)
<Note>
O decorador `@human_feedback` requer **CrewAI versão 1.8.0 ou superior**.
</Note>
O decorador `@human_feedback` permite fluxos de trabalho human-in-the-loop, pausando a execução do flow para coletar feedback de um humano. Isso é útil para portões de aprovação, revisão de qualidade e pontos de decisão que requerem julgamento humano.
```python Code

View File

@@ -7,10 +7,6 @@ mode: "wide"
## Visão Geral
<Note>
O decorador `@human_feedback` requer **CrewAI versão 1.8.0 ou superior**. Certifique-se de atualizar sua instalação antes de usar este recurso.
</Note>
O decorador `@human_feedback` permite fluxos de trabalho human-in-the-loop (HITL) diretamente nos CrewAI Flows. Ele permite pausar a execução do flow, apresentar a saída para um humano revisar, coletar seu feedback e, opcionalmente, rotear para diferentes listeners com base no resultado do feedback.
Isso é particularmente valioso para:

View File

@@ -5,22 +5,9 @@ icon: "user-check"
mode: "wide"
---
Human-in-the-Loop (HITL) é uma abordagem poderosa que combina a inteligência artificial com a experiência humana para aprimorar a tomada de decisões e melhorar os resultados das tarefas. CrewAI oferece várias maneiras de implementar HITL dependendo das suas necessidades.
Human-in-the-Loop (HITL) é uma abordagem poderosa que combina a inteligência artificial com a experiência humana para aprimorar a tomada de decisões e melhorar os resultados das tarefas. Este guia mostra como implementar HITL dentro da CrewAI.
## Escolhendo Sua Abordagem HITL
CrewAI oferece duas abordagens principais para implementar workflows human-in-the-loop:
| Abordagem | Melhor Para | Integração | Versão |
|----------|----------|-------------|---------|
| **Baseada em Flow** (decorador `@human_feedback`) | Desenvolvimento local, revisão via console, workflows síncronos | [Feedback Humano em Flows](/pt-BR/learn/human-feedback-in-flows) | **1.8.0+** |
| **Baseada em Webhook** (Enterprise) | Deployments em produção, workflows assíncronos, integrações externas (Slack, Teams, etc.) | Este guia | - |
<Tip>
Se você está construindo flows e deseja adicionar etapas de revisão humana com roteamento baseado em feedback, confira o guia [Feedback Humano em Flows](/pt-BR/learn/human-feedback-in-flows) para o decorador `@human_feedback`.
</Tip>
## Configurando Workflows HITL Baseados em Webhook
## Configurando Workflows HITL
<Steps>
<Step title="Configure sua Tarefa">

View File

@@ -209,9 +209,10 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
# Handle telemetry
span = self.execution_spans.pop(source, None)
span = self.execution_spans.get(source)
if span:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
# Pass task name if it exists
task_name = get_task_name(source)
@@ -221,10 +222,11 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
span = self.execution_spans.pop(source, None)
span = self.execution_spans.get(source)
if span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
# Pass task name if it exists
task_name = get_task_name(source)
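This hunk trades `execution_spans.pop(source, None)` for `execution_spans.get(source)` plus writing `None` back into the dict. A stand-alone sketch of the difference between the two dictionary patterns (plain Python, no CrewAI types), which is also what the deleted `execution_spans` tests further down exercised:

```python
# pop-based cleanup: the task's entry is removed outright.
execution_spans = {"task-1": "span-1"}
span = execution_spans.pop("task-1", None)
assert span == "span-1"
assert "task-1" not in execution_spans  # dict stays bounded

# get-based cleanup: the value is read, then overwritten with None,
# so the key itself is never removed.
execution_spans = {"task-1": "span-1"}
span = execution_spans.get("task-1")
execution_spans["task-1"] = None
assert "task-1" in execution_spans  # one entry lingers per task ever seen
```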

View File

@@ -33,13 +33,24 @@ class LongTermMemory(Memory):
storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage()
super().__init__(storage=storage)
def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
def save(
self,
value: LongTermMemoryItem,
metadata: dict[str, Any] | None = None,
) -> None:
"""Save an item to long-term memory.
Args:
value: The LongTermMemoryItem to save.
metadata: Optional metadata dict (not used, metadata is extracted from the
LongTermMemoryItem). Included for supertype compatibility.
"""
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
value=item.task,
metadata=item.metadata,
agent_role=item.agent,
value=value.task,
metadata=value.metadata,
agent_role=value.agent,
source_type="long_term_memory",
from_agent=self.agent,
from_task=self.task,
@@ -48,23 +59,23 @@ class LongTermMemory(Memory):
start_time = time.time()
try:
metadata = item.metadata
metadata.update(
{"agent": item.agent, "expected_output": item.expected_output}
item_metadata = value.metadata
item_metadata.update(
{"agent": value.agent, "expected_output": value.expected_output}
)
self.storage.save(
task_description=item.task,
score=metadata["quality"],
metadata=metadata,
datetime=item.datetime,
task_description=value.task,
score=item_metadata["quality"],
metadata=item_metadata,
datetime=value.datetime,
)
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=item.task,
metadata=item.metadata,
agent_role=item.agent,
value=value.task,
metadata=value.metadata,
agent_role=value.agent,
save_time_ms=(time.time() - start_time) * 1000,
source_type="long_term_memory",
from_agent=self.agent,
@@ -75,25 +86,28 @@ class LongTermMemory(Memory):
crewai_event_bus.emit(
self,
event=MemorySaveFailedEvent(
value=item.task,
metadata=item.metadata,
agent_role=item.agent,
value=value.task,
metadata=value.metadata,
agent_role=value.agent,
error=str(e),
source_type="long_term_memory",
),
)
raise
def search( # type: ignore[override]
def search(
self,
task: str,
latest_n: int = 3,
query: str,
limit: int = 3,
score_threshold: float = 0.6,
) -> list[dict[str, Any]]:
"""Search long-term memory for relevant entries.
Args:
task: The task description to search for.
latest_n: Maximum number of results to return.
query: The task description to search for.
limit: Maximum number of results to return.
score_threshold: Minimum similarity score for results (not used for
long-term memory, included for supertype compatibility).
Returns:
List of matching memory entries.
@@ -101,8 +115,8 @@ class LongTermMemory(Memory):
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(
query=task,
limit=latest_n,
query=query,
limit=limit,
source_type="long_term_memory",
from_agent=self.agent,
from_task=self.task,
@@ -111,14 +125,14 @@ class LongTermMemory(Memory):
start_time = time.time()
try:
results = self.storage.load(task, latest_n)
results = self.storage.load(query, limit)
crewai_event_bus.emit(
self,
event=MemoryQueryCompletedEvent(
query=task,
query=query,
results=results,
limit=latest_n,
limit=limit,
query_time_ms=(time.time() - start_time) * 1000,
source_type="long_term_memory",
from_agent=self.agent,
@@ -131,26 +145,32 @@ class LongTermMemory(Memory):
crewai_event_bus.emit(
self,
event=MemoryQueryFailedEvent(
query=task,
limit=latest_n,
query=query,
limit=limit,
error=str(e),
source_type="long_term_memory",
),
)
raise
async def asave(self, item: LongTermMemoryItem) -> None: # type: ignore[override]
async def asave(
self,
value: LongTermMemoryItem,
metadata: dict[str, Any] | None = None,
) -> None:
"""Save an item to long-term memory asynchronously.
Args:
item: The LongTermMemoryItem to save.
value: The LongTermMemoryItem to save.
metadata: Optional metadata dict (not used, metadata is extracted from the
LongTermMemoryItem). Included for supertype compatibility.
"""
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
value=item.task,
metadata=item.metadata,
agent_role=item.agent,
value=value.task,
metadata=value.metadata,
agent_role=value.agent,
source_type="long_term_memory",
from_agent=self.agent,
from_task=self.task,
@@ -159,23 +179,23 @@ class LongTermMemory(Memory):
start_time = time.time()
try:
metadata = item.metadata
metadata.update(
{"agent": item.agent, "expected_output": item.expected_output}
item_metadata = value.metadata
item_metadata.update(
{"agent": value.agent, "expected_output": value.expected_output}
)
await self.storage.asave(
task_description=item.task,
score=metadata["quality"],
metadata=metadata,
datetime=item.datetime,
task_description=value.task,
score=item_metadata["quality"],
metadata=item_metadata,
datetime=value.datetime,
)
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=item.task,
metadata=item.metadata,
agent_role=item.agent,
value=value.task,
metadata=value.metadata,
agent_role=value.agent,
save_time_ms=(time.time() - start_time) * 1000,
source_type="long_term_memory",
from_agent=self.agent,
@@ -186,25 +206,28 @@ class LongTermMemory(Memory):
crewai_event_bus.emit(
self,
event=MemorySaveFailedEvent(
value=item.task,
metadata=item.metadata,
agent_role=item.agent,
value=value.task,
metadata=value.metadata,
agent_role=value.agent,
error=str(e),
source_type="long_term_memory",
),
)
raise
async def asearch( # type: ignore[override]
async def asearch(
self,
task: str,
latest_n: int = 3,
query: str,
limit: int = 3,
score_threshold: float = 0.6,
) -> list[dict[str, Any]]:
"""Search long-term memory asynchronously.
Args:
task: The task description to search for.
latest_n: Maximum number of results to return.
query: The task description to search for.
limit: Maximum number of results to return.
score_threshold: Minimum similarity score for results (not used for
long-term memory, included for supertype compatibility).
Returns:
List of matching memory entries.
@@ -212,8 +235,8 @@ class LongTermMemory(Memory):
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(
query=task,
limit=latest_n,
query=query,
limit=limit,
source_type="long_term_memory",
from_agent=self.agent,
from_task=self.task,
@@ -222,14 +245,14 @@ class LongTermMemory(Memory):
start_time = time.time()
try:
results = await self.storage.aload(task, latest_n)
results = await self.storage.aload(query, limit)
crewai_event_bus.emit(
self,
event=MemoryQueryCompletedEvent(
query=task,
query=query,
results=results,
limit=latest_n,
limit=limit,
query_time_ms=(time.time() - start_time) * 1000,
source_type="long_term_memory",
from_agent=self.agent,
@@ -242,8 +265,8 @@ class LongTermMemory(Memory):
crewai_event_bus.emit(
self,
event=MemoryQueryFailedEvent(
query=task,
limit=latest_n,
query=query,
limit=limit,
error=str(e),
source_type="long_term_memory",
),
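With the keyword names aligned to the base class, callers typed against `Memory` can use a `LongTermMemory` instance interchangeably. A minimal polymorphism sketch, assuming the `Memory.search()` parameter names implied by the tests below (`query`, `limit`, `score_threshold`); the `recall` helper is hypothetical, not a CrewAI API:

```python
from typing import Any

from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.memory import Memory


def recall(memory: Memory, query: str) -> list[dict[str, Any]]:
    # Hypothetical helper written against the base class; it works for any
    # Memory subclass only if overrides keep the same keyword names,
    # which is what this change restores for LongTermMemory.
    return memory.search(query=query, limit=3, score_threshold=0.6)


results = recall(LongTermMemory(), "test_task")
```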

View File

@@ -1,243 +0,0 @@
"""Tests for EventListener execution_spans cleanup to prevent memory leaks."""
import asyncio
from unittest.mock import MagicMock, patch
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.tasks.task_output import TaskOutput
class MockAgent:
"""Mock agent for testing."""
def __init__(self, role: str = "test_role"):
self.role = role
self.crew = MagicMock()
class MockTask:
"""Mock task for testing."""
def __init__(self, task_id: str = "test_task"):
self.id = task_id
self.name = "Test Task"
self.description = "A test task description"
self.agent = MockAgent()
@pytest.fixture
def event_listener():
"""Create a fresh EventListener instance for testing."""
EventListener._instance = None
EventListener._initialized = False
listener = EventListener()
listener.setup_listeners(crewai_event_bus)
return listener
@pytest.fixture
def mock_task():
"""Create a mock task for testing."""
return MockTask()
@pytest.fixture
def mock_task_output():
"""Create a mock task output for testing."""
return TaskOutput(
description="Test task description",
raw="Test output",
agent="test_agent",
)
@pytest.mark.asyncio
async def test_execution_spans_removed_on_task_completed(
event_listener, mock_task, mock_task_output
):
"""Test that execution_spans entries are properly removed when a task completes.
This test verifies the fix for the memory leak where completed tasks were
setting execution_spans[source] = None instead of removing the key entirely.
"""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended"):
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
assert mock_task in event_listener.execution_spans
assert event_listener.execution_spans[mock_task] == mock_span
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
future = crewai_event_bus.emit(mock_task, completed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_removed_on_task_failed(event_listener, mock_task):
"""Test that execution_spans entries are properly removed when a task fails.
This test verifies the fix for the memory leak where failed tasks were
setting execution_spans[source] = None instead of removing the key entirely.
"""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended"):
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
assert mock_task in event_listener.execution_spans
assert event_listener.execution_spans[mock_task] == mock_span
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
future = crewai_event_bus.emit(mock_task, failed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_dict_size_does_not_grow_unbounded(
event_listener, mock_task_output
):
"""Test that execution_spans dictionary size remains bounded after many tasks.
This test simulates the memory leak scenario where many tasks complete/fail
and verifies that the dictionary doesn't grow unboundedly.
"""
num_tasks = 100
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended"):
mock_task_started.return_value = MagicMock()
for i in range(num_tasks):
task = MockTask(task_id=f"task_{i}")
start_event = TaskStartedEvent(context="test context", task=task)
future = crewai_event_bus.emit(task, start_event)
if future:
await asyncio.wrap_future(future)
if i % 2 == 0:
completed_event = TaskCompletedEvent(
output=mock_task_output, task=task
)
future = crewai_event_bus.emit(task, completed_event)
else:
failed_event = TaskFailedEvent(error="Test error", task=task)
future = crewai_event_bus.emit(task, failed_event)
if future:
await asyncio.wrap_future(future)
assert len(event_listener.execution_spans) == 0
@pytest.mark.asyncio
async def test_execution_spans_handles_missing_task_gracefully(
event_listener, mock_task, mock_task_output
):
"""Test that completing/failing a task not in execution_spans doesn't cause errors.
This ensures the fix using pop(source, None) handles edge cases gracefully.
"""
with patch.object(event_listener._telemetry, "task_ended"):
assert mock_task not in event_listener.execution_spans
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
future = crewai_event_bus.emit(mock_task, completed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_execution_spans_handles_missing_task_on_failure_gracefully(
event_listener, mock_task
):
"""Test that failing a task not in execution_spans doesn't cause errors.
This ensures the fix using pop(source, None) handles edge cases gracefully.
"""
with patch.object(event_listener._telemetry, "task_ended"):
assert mock_task not in event_listener.execution_spans
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
future = crewai_event_bus.emit(mock_task, failed_event)
if future:
await asyncio.wrap_future(future)
assert mock_task not in event_listener.execution_spans
@pytest.mark.asyncio
async def test_telemetry_task_ended_called_with_span_on_completion(
event_listener, mock_task, mock_task_output
):
"""Test that telemetry.task_ended is called with the correct span on completion."""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended") as mock_task_ended:
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
completed_event = TaskCompletedEvent(output=mock_task_output, task=mock_task)
future = crewai_event_bus.emit(mock_task, completed_event)
if future:
await asyncio.wrap_future(future)
mock_task_ended.assert_called_once_with(
mock_span, mock_task, mock_task.agent.crew
)
@pytest.mark.asyncio
async def test_telemetry_task_ended_called_with_span_on_failure(
event_listener, mock_task
):
"""Test that telemetry.task_ended is called with the correct span on failure."""
with patch.object(event_listener._telemetry, "task_started") as mock_task_started:
with patch.object(event_listener._telemetry, "task_ended") as mock_task_ended:
mock_span = MagicMock()
mock_task_started.return_value = mock_span
start_event = TaskStartedEvent(context="test context", task=mock_task)
future = crewai_event_bus.emit(mock_task, start_event)
if future:
await asyncio.wrap_future(future)
failed_event = TaskFailedEvent(error="Test error", task=mock_task)
future = crewai_event_bus.emit(mock_task, failed_event)
if future:
await asyncio.wrap_future(future)
mock_task_ended.assert_called_once_with(
mock_span, mock_task, mock_task.agent.crew
)

View File

@@ -1,5 +1,7 @@
import inspect
import threading
from collections import defaultdict
from typing import Any
from unittest.mock import ANY
import pytest
@@ -13,6 +15,7 @@ from crewai.events.types.memory_events import (
)
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.memory import Memory
@pytest.fixture
@@ -114,7 +117,7 @@ def test_long_term_memory_search_events(long_term_memory):
test_query = "test query"
long_term_memory.search(test_query, latest_n=5)
long_term_memory.search(test_query, limit=5)
with condition:
success = condition.wait_for(
@@ -174,10 +177,104 @@ def test_save_and_search(long_term_memory):
metadata={"task": "test_task", "quality": 0.5},
)
long_term_memory.save(memory)
find = long_term_memory.search("test_task", latest_n=5)[0]
find = long_term_memory.search("test_task", limit=5)[0]
assert find["score"] == 0.5
assert find["datetime"] == "test_datetime"
assert find["metadata"]["agent"] == "test_agent"
assert find["metadata"]["quality"] == 0.5
assert find["metadata"]["task"] == "test_task"
assert find["metadata"]["expected_output"] == "test_output"
class TestLongTermMemoryTypeSignatureCompatibility:
    """Tests to verify LongTermMemory method signatures are compatible with Memory base class.

    These tests ensure that the Liskov Substitution Principle is maintained and that
    LongTermMemory can be used polymorphically wherever Memory is expected.
    """

    def test_save_signature_has_value_parameter(self):
        """Test that save() uses 'value' parameter name matching Memory base class."""
        sig = inspect.signature(LongTermMemory.save)
        params = list(sig.parameters.keys())
        assert "value" in params, "save() should have 'value' parameter for LSP compliance"
        assert "metadata" in params, "save() should have 'metadata' parameter for LSP compliance"

    def test_save_signature_has_metadata_with_default(self):
        """Test that save() has metadata parameter with default value."""
        sig = inspect.signature(LongTermMemory.save)
        metadata_param = sig.parameters.get("metadata")
        assert metadata_param is not None, "save() should have 'metadata' parameter"
        assert metadata_param.default is None, "metadata should default to None"

    def test_search_signature_has_query_parameter(self):
        """Test that search() uses 'query' parameter name matching Memory base class."""
        sig = inspect.signature(LongTermMemory.search)
        params = list(sig.parameters.keys())
        assert "query" in params, "search() should have 'query' parameter for LSP compliance"
        assert "limit" in params, "search() should have 'limit' parameter for LSP compliance"
        assert "score_threshold" in params, "search() should have 'score_threshold' parameter for LSP compliance"

    def test_search_signature_has_score_threshold_with_default(self):
        """Test that search() has score_threshold parameter with default value."""
        sig = inspect.signature(LongTermMemory.search)
        score_threshold_param = sig.parameters.get("score_threshold")
        assert score_threshold_param is not None, "search() should have 'score_threshold' parameter"
        assert score_threshold_param.default == 0.6, "score_threshold should default to 0.6"

    def test_asave_signature_has_value_parameter(self):
        """Test that asave() uses 'value' parameter name matching Memory base class."""
        sig = inspect.signature(LongTermMemory.asave)
        params = list(sig.parameters.keys())
        assert "value" in params, "asave() should have 'value' parameter for LSP compliance"
        assert "metadata" in params, "asave() should have 'metadata' parameter for LSP compliance"

    def test_asearch_signature_has_query_parameter(self):
        """Test that asearch() uses 'query' parameter name matching Memory base class."""
        sig = inspect.signature(LongTermMemory.asearch)
        params = list(sig.parameters.keys())
        assert "query" in params, "asearch() should have 'query' parameter for LSP compliance"
        assert "limit" in params, "asearch() should have 'limit' parameter for LSP compliance"
        assert "score_threshold" in params, "asearch() should have 'score_threshold' parameter for LSP compliance"

    def test_long_term_memory_is_subclass_of_memory(self):
        """Test that LongTermMemory is a proper subclass of Memory."""
        assert issubclass(LongTermMemory, Memory), "LongTermMemory should be a subclass of Memory"

    def test_save_with_metadata_parameter(self, long_term_memory):
        """Test that save() can be called with the metadata parameter (even if unused)."""
        memory_item = LongTermMemoryItem(
            agent="test_agent",
            task="test_task_with_metadata",
            expected_output="test_output",
            datetime="test_datetime",
            quality=0.8,
            metadata={"task": "test_task_with_metadata", "quality": 0.8},
        )
        long_term_memory.save(value=memory_item, metadata={"extra": "data"})
        results = long_term_memory.search(query="test_task_with_metadata", limit=1)
        assert len(results) > 0
        assert results[0]["metadata"]["agent"] == "test_agent"

    def test_search_with_score_threshold_parameter(self, long_term_memory):
        """Test that search() can be called with the score_threshold parameter."""
        memory_item = LongTermMemoryItem(
            agent="test_agent",
            task="test_task_score_threshold",
            expected_output="test_output",
            datetime="test_datetime",
            quality=0.9,
            metadata={"task": "test_task_score_threshold", "quality": 0.9},
        )
        long_term_memory.save(value=memory_item)
        results = long_term_memory.search(
            query="test_task_score_threshold",
            limit=5,
            score_threshold=0.5,
        )
        assert isinstance(results, list)

    @pytest.fixture
    def long_term_memory(self):
        """Fixture to create a LongTermMemory instance for this test class."""
        return LongTermMemory()
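The inspect-based checks above generalize beyond CrewAI. Here is a self-contained sketch of the same technique with a toy base/subclass pair (the class and helper names are illustrative only, not part of this diff):

```python
import inspect


class Base:
    def search(self, query: str, limit: int = 3, score_threshold: float = 0.6) -> list:
        return []


class Child(Base):
    # Renaming 'query' to 'task' here would break keyword callers that are
    # typed against Base, and would be reported by the check below.
    def search(self, query: str, limit: int = 3, score_threshold: float = 0.6) -> list:
        return []


def assert_compatible_keywords(base_method, override) -> None:
    """Fail if the override drops or renames any parameter of the base method."""
    base_params = set(inspect.signature(base_method).parameters)
    override_params = set(inspect.signature(override).parameters)
    missing = base_params - override_params
    assert not missing, f"override is missing parameters: {sorted(missing)}"


assert_compatible_keywords(Base.search, Child.search)
```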