feat: conversation-aware memory extraction for Agent.kickoff()

When Agent.kickoff() completes, the overridden _save_to_memory now
includes the full conversation history (up to 20 turns) alongside
task metadata and the final result. This produces significantly
better memory extraction for standalone agent sessions where the
conversation thread carries important context.

The base _save_to_memory (used by Crew-based execution) remains
unchanged since each Task already encodes its own context.

Inspired by Iris's memory service which proved that thread-context-
aware extraction is dramatically more accurate than single-content
extraction.

Tests: 8 new tests covering conversation inclusion, task metadata,
fallback behavior, read-only memory, truncation, and scoped saves.
This commit is contained in:
Joao Moura
2026-04-25 17:48:32 -07:00
parent cb46a1c4ba
commit 62d097a761
3 changed files with 318 additions and 1 deletions

View File

@@ -2536,6 +2536,78 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): # type: ignor
return "initialized"
def _save_to_memory(self, output: AgentFinish) -> None:
"""Save to memory with full conversation context.
Overrides the base implementation to include the full conversation
history (``self.state.messages``) when extracting memories, not just
the task description and final result. This produces significantly
better memory extraction for standalone Agent.kickoff() sessions
where the conversation thread carries important context.
For Crew-based execution the base implementation (task + result) is
sufficient because each task already encodes its own context.
"""
if self.agent is None:
return
from crewai.agents.agent_builder.base_agent_executor import sanitize_scope_name
memory = getattr(self.agent, "memory", None) or (
getattr(self.crew, "_memory", None) if self.crew else None
)
if memory is None or memory.read_only:
return
# Build context from the full conversation thread
conversation_lines: list[str] = []
for msg in self.state.messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if content and isinstance(content, str):
conversation_lines.append(f"{role}: {content[:500]}")
if conversation_lines:
# Include conversation context + task metadata + final result
parts = []
if self.task:
parts.append(f"Task: {self.task.description}")
parts.append(f"Agent: {self.agent.role}")
parts.append(f"Expected result: {self.task.expected_output}")
parts.append("")
parts.append("Conversation:")
# Last 20 turns to stay within token limits
parts.extend(conversation_lines[-20:])
parts.append("")
parts.append(f"Final result: {output.text}")
raw = "\n".join(parts)
else:
# Fallback: same as base implementation
raw = (
f"Task: {self.task.description if self.task else '(none)'}\n"
f"Agent: {self.agent.role}\n"
f"Expected result: {self.task.expected_output if self.task else '(none)'}\n"
f"Result: {output.text}"
)
try:
extracted = memory.extract_memories(raw)
if extracted:
base_root = getattr(memory, "root_scope", None)
if isinstance(base_root, str) and base_root:
agent_role = self.agent.role or "unknown"
sanitized_role = sanitize_scope_name(agent_role)
agent_root = f"{base_root.rstrip('/')}/agent/{sanitized_role}"
if not agent_root.startswith("/"):
agent_root = "/" + agent_root
memory.remember_many(
extracted, agent_role=self.agent.role, root_scope=agent_root
)
else:
memory.remember_many(extracted, agent_role=self.agent.role)
except Exception as e:
self.agent._logger.log("error", f"Failed to save to memory: {e}")
def invoke(
self, inputs: dict[str, Any]
) -> dict[str, Any] | Coroutine[Any, Any, dict[str, Any]]:

View File

@@ -0,0 +1,245 @@
"""Tests for conversation-aware memory extraction in Agent.kickoff().
Verifies that the overridden _save_to_memory in AgentExecutor includes
the full conversation history when extracting memories, not just the
task description and final result.
"""
from __future__ import annotations
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from crewai.agents.parser import AgentFinish
class FakeMemory:
"""Minimal memory stub for testing."""
read_only = False
root_scope = None
def __init__(self) -> None:
self.extracted_content: str = ""
self.remembered: list[str] = []
def extract_memories(self, content: str) -> list[str]:
self.extracted_content = content
return [f"fact from: {content[:40]}"]
def remember_many(
self,
contents: list[str],
agent_role: str | None = None,
root_scope: str | None = None,
) -> list[Any]:
self.remembered.extend(contents)
return []
class FakeAgent:
"""Minimal agent stub."""
def __init__(self, role: str = "Test Agent", memory: Any = None) -> None:
self.role = role
self.memory = memory
self._logger = MagicMock()
class FakeTask:
"""Minimal task stub."""
def __init__(
self,
description: str = "Test task",
expected_output: str = "A result",
) -> None:
self.description = description
self.expected_output = expected_output
def _build_executor(
agent: FakeAgent,
task: FakeTask | None = None,
messages: list[dict[str, str]] | None = None,
):
"""Build an AgentExecutor with model_construct to skip validation."""
import threading
from crewai.experimental.agent_executor import AgentExecutor, AgentExecutorState
state = AgentExecutorState()
if messages:
state.messages = messages # type: ignore[assignment]
executor = AgentExecutor.model_construct(
agent=agent,
task=task,
crew=None,
llm=MagicMock(),
callbacks=None,
prompt=None,
original_tools=[],
tools=[],
)
executor._state = state
executor._methods = {}
executor._method_outputs = []
executor._completed_methods = set()
executor._fired_or_listeners = set()
executor._pending_and_listeners = {}
executor._method_execution_counts = {}
executor._method_call_counts = {}
executor._event_futures = []
executor._human_feedback_method_outputs = {}
executor._input_history = []
executor._is_execution_resuming = False
executor._state_lock = threading.Lock()
executor._or_listeners_lock = threading.Lock()
executor._execution_lock = threading.Lock()
executor._finalize_lock = threading.Lock()
executor._finalize_called = False
executor._is_executing = False
executor._has_been_invoked = False
executor._last_parser_error = None
executor._last_context_error = None
executor._step_executor = None
executor._planner_observer = None
return executor
class TestConversationAwareMemory:
"""Tests for the overridden _save_to_memory that includes conversation."""
def test_conversation_included_in_extraction(self):
"""Memory extraction should include conversation turns."""
memory = FakeMemory()
agent = FakeAgent(memory=memory)
task = FakeTask(description="Find the weather")
messages = [
{"role": "user", "content": "What's the weather in SF?"},
{"role": "assistant", "content": "Let me check..."},
{"role": "user", "content": "Also check NYC please"},
{"role": "assistant", "content": "SF: 62F sunny. NYC: 45F cloudy."},
]
executor = _build_executor(agent, task, messages)
output = AgentFinish(output="SF: 62F sunny. NYC: 45F cloudy.", text="SF: 62F sunny. NYC: 45F cloudy.", thought="")
executor._save_to_memory(output)
# The extracted content should contain conversation turns
assert "user: What's the weather in SF?" in memory.extracted_content
assert "user: Also check NYC please" in memory.extracted_content
assert "Conversation:" in memory.extracted_content
assert "Final result:" in memory.extracted_content
assert len(memory.remembered) > 0
def test_task_metadata_included(self):
"""Task description and expected output should still be present."""
memory = FakeMemory()
agent = FakeAgent(memory=memory)
task = FakeTask(description="Analyze sales data", expected_output="A report")
messages = [{"role": "user", "content": "Run the analysis"}]
executor = _build_executor(agent, task, messages)
output = AgentFinish(output="Report ready", text="Report ready", thought="")
executor._save_to_memory(output)
assert "Task: Analyze sales data" in memory.extracted_content
assert "Expected result: A report" in memory.extracted_content
def test_no_messages_falls_back(self):
"""With no conversation history, falls back to task+result format."""
memory = FakeMemory()
agent = FakeAgent(memory=memory)
task = FakeTask(description="Simple task")
executor = _build_executor(agent, task, messages=[])
output = AgentFinish(output="Done", text="Done", thought="")
executor._save_to_memory(output)
# Fallback format
assert "Task: Simple task" in memory.extracted_content
assert "Result: Done" in memory.extracted_content
# No "Conversation:" header
assert "Conversation:" not in memory.extracted_content
def test_no_memory_is_noop(self):
"""No memory attached should not raise."""
agent = FakeAgent(memory=None)
task = FakeTask()
executor = _build_executor(agent, task)
output = AgentFinish(output="Done", text="Done", thought="")
executor._save_to_memory(output) # should not raise
def test_read_only_memory_is_noop(self):
"""Read-only memory should not attempt to save."""
memory = FakeMemory()
memory.read_only = True
agent = FakeAgent(memory=memory)
task = FakeTask()
executor = _build_executor(agent, task, messages=[{"role": "user", "content": "hello"}])
output = AgentFinish(output="Done", text="Done", thought="")
executor._save_to_memory(output)
assert memory.extracted_content == ""
assert len(memory.remembered) == 0
def test_long_conversation_truncated(self):
"""Only the last 20 turns should be included."""
memory = FakeMemory()
agent = FakeAgent(memory=memory)
task = FakeTask()
messages = [
{"role": "user", "content": f"message {i}"}
for i in range(30)
]
executor = _build_executor(agent, task, messages)
output = AgentFinish(output="Done", text="Done", thought="")
executor._save_to_memory(output)
# Should NOT include the first 10 messages
assert "message 0" not in memory.extracted_content
assert "message 9" not in memory.extracted_content
# Should include the last 20
assert "message 10" in memory.extracted_content
assert "message 29" in memory.extracted_content
def test_scoped_memory_saves_to_agent_root(self):
"""When memory has a root_scope, saves should go under /root/agent/<role>."""
memory = FakeMemory()
memory.root_scope = "/company"
agent = FakeAgent(role="Researcher", memory=memory)
task = FakeTask()
messages = [{"role": "user", "content": "Research AI trends"}]
executor = _build_executor(agent, task, messages)
output = AgentFinish(output="Report", text="Report", thought="")
executor._save_to_memory(output)
assert len(memory.remembered) > 0
def test_no_task_still_works(self):
"""Agent.kickoff() without a task should still extract from conversation."""
memory = FakeMemory()
agent = FakeAgent(memory=memory)
messages = [
{"role": "user", "content": "Remember that Joe prefers dark mode"},
{"role": "assistant", "content": "Got it, noted."},
]
executor = _build_executor(agent, task=None, messages=messages)
output = AgentFinish(output="Noted.", text="Noted.", thought="")
executor._save_to_memory(output)
assert "user: Remember that Joe prefers dark mode" in memory.extracted_content

2
uv.lock generated
View File

@@ -13,7 +13,7 @@ resolution-markers = [
]
[options]
exclude-newer = "2026-04-22T16:00:00Z"
exclude-newer = "2026-04-23T07:00:00Z"
[manifest]
members = [