Compare commits

...

3 Commits

Author SHA1 Message Date
lorenzejay
88cbf6bd1a refactor: update MemoryPromptConfig to allow custom prompt strings
* Removed the static method for online people research and replaced it with a constructor for MemoryPromptConfig that accepts custom strings for save, extract, and query systems.
* Updated the corresponding test to validate the new configuration approach, ensuring flexibility in memory prompt handling.
2026-04-07 17:53:36 -07:00
Lorenze Jay
eeeb90c3a8 Merge branch 'main' into lorenze/imp/memory-prompt-influence 2026-04-07 17:50:11 -07:00
lorenzejay
1c9e8d21c0 feat: introduce MemoryPromptConfig for customizable memory prompts
* Added MemoryPromptConfig class to allow users to override default memory prompts for various operations (save, query, extract, consolidation).
* Updated relevant functions and classes to utilize the new configuration, enabling more flexible and context-specific memory handling.
* Enhanced tests to validate the functionality of the new prompt configuration and its integration within the memory processing flows.
2026-04-07 17:49:06 -07:00
10 changed files with 192 additions and 24 deletions

View File

@@ -81,6 +81,7 @@ _track_install_async()
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
"Memory": ("crewai.memory.unified_memory", "Memory"),
"MemoryPromptConfig": ("crewai.memory.types", "MemoryPromptConfig"),
}
@@ -233,6 +234,7 @@ __all__ = [
"Knowledge",
"LLMGuardrail",
"Memory",
"MemoryPromptConfig",
"PlanningConfig",
"Process",
"RuntimeState",

View File

@@ -8,7 +8,7 @@ from typing import Any
from pydantic import BaseModel, ConfigDict, Field
from crewai.memory.types import MemoryRecord, ScopeInfo
from crewai.memory.types import MemoryPromptConfig, MemoryRecord, ScopeInfo
from crewai.utilities.i18n import get_i18n
@@ -140,19 +140,23 @@ class ConsolidationPlan(BaseModel):
)
def _get_prompt(key: str) -> str:
"""Retrieve a memory prompt from the i18n translations.
Args:
key: The prompt key under the "memory" section.
Returns:
The prompt string.
"""
def _memory_prompt_line(
    memory_prompt: MemoryPromptConfig | None,
    key: str,
) -> str:
    """Return the prompt text for ``key``.

    A non-blank string set on ``memory_prompt`` wins; otherwise the bundled
    i18n translation under the ``memory`` section is used.
    """
    override = getattr(memory_prompt, key, None) if memory_prompt is not None else None
    if isinstance(override, str) and override.strip():
        return override
    return get_i18n().memory(key)
def extract_memories_from_content(content: str, llm: Any) -> list[str]:
def extract_memories_from_content(
content: str,
llm: Any,
memory_prompt: MemoryPromptConfig | None = None,
) -> list[str]:
"""Use the LLM to extract discrete memory statements from raw content.
This is a pure helper: it does NOT store anything. Callers should call
@@ -164,15 +168,21 @@ def extract_memories_from_content(content: str, llm: Any) -> list[str]:
Args:
content: Raw text (e.g. task description + result dump).
llm: The LLM instance to use.
memory_prompt: Optional per-step prompt strings (see ``MemoryPromptConfig``).
Returns:
List of short, self-contained memory statements (or [content] on failure).
"""
if not (content or "").strip():
return []
user = _get_prompt("extract_memories_user").format(content=content)
user = _memory_prompt_line(memory_prompt, "extract_memories_user").format(
content=content
)
messages = [
{"role": "system", "content": _get_prompt("extract_memories_system")},
{
"role": "system",
"content": _memory_prompt_line(memory_prompt, "extract_memories_system"),
},
{"role": "user", "content": user},
]
try:
@@ -202,6 +212,7 @@ def analyze_query(
available_scopes: list[str],
scope_info: ScopeInfo | None,
llm: Any,
memory_prompt: MemoryPromptConfig | None = None,
) -> QueryAnalysis:
"""Use the LLM to analyze a recall query.
@@ -212,6 +223,7 @@ def analyze_query(
available_scopes: Scope paths that exist in the store.
scope_info: Optional info about the current scope.
llm: The LLM instance to use.
memory_prompt: Optional per-step prompt strings.
Returns:
QueryAnalysis with keywords, suggested_scopes, complexity, recall_queries, time_filter.
@@ -219,13 +231,16 @@ def analyze_query(
scope_desc = ""
if scope_info:
scope_desc = f"Current scope has {scope_info.record_count} records, categories: {scope_info.categories}"
user = _get_prompt("query_user").format(
user = _memory_prompt_line(memory_prompt, "query_user").format(
query=query,
available_scopes=available_scopes or ["/"],
scope_desc=scope_desc,
)
messages = [
{"role": "system", "content": _get_prompt("query_system")},
{
"role": "system",
"content": _memory_prompt_line(memory_prompt, "query_system"),
},
{"role": "user", "content": user},
]
try:
@@ -269,6 +284,7 @@ def analyze_for_save(
existing_scopes: list[str],
existing_categories: list[str],
llm: Any,
memory_prompt: MemoryPromptConfig | None = None,
) -> MemoryAnalysis:
"""Infer scope, categories, importance, and metadata for a single memory.
@@ -280,17 +296,21 @@ def analyze_for_save(
existing_scopes: Current scope paths in the memory store.
existing_categories: Current categories in use.
llm: The LLM instance to use.
memory_prompt: Optional per-step prompt strings.
Returns:
MemoryAnalysis with suggested_scope, categories, importance, extracted_metadata.
"""
user = _get_prompt("save_user").format(
user = _memory_prompt_line(memory_prompt, "save_user").format(
content=content,
existing_scopes=existing_scopes or ["/"],
existing_categories=existing_categories or [],
)
messages = [
{"role": "system", "content": _get_prompt("save_system")},
{
"role": "system",
"content": _memory_prompt_line(memory_prompt, "save_system"),
},
{"role": "user", "content": user},
]
try:
@@ -322,6 +342,7 @@ def analyze_for_consolidation(
new_content: str,
existing_records: list[MemoryRecord],
llm: Any,
memory_prompt: MemoryPromptConfig | None = None,
) -> ConsolidationPlan:
"""Decide insert/update/delete for a single memory against similar existing records.
@@ -332,6 +353,7 @@ def analyze_for_consolidation(
new_content: The new content to store.
existing_records: Existing records that are semantically similar.
llm: The LLM instance to use.
memory_prompt: Optional per-step prompt strings.
Returns:
ConsolidationPlan with actions per record and whether to insert the new content.
@@ -345,12 +367,15 @@ def analyze_for_consolidation(
f"- id={r.id} | scope={r.scope} | importance={r.importance:.2f} | created={created}\n"
f" content: {r.content[:200]}{'...' if len(r.content) > 200 else ''}"
)
user = _get_prompt("consolidation_user").format(
user = _memory_prompt_line(memory_prompt, "consolidation_user").format(
new_content=new_content,
records_summary="\n\n".join(records_lines),
)
messages = [
{"role": "system", "content": _get_prompt("consolidation_system")},
{
"role": "system",
"content": _memory_prompt_line(memory_prompt, "consolidation_system"),
},
{"role": "user", "content": user},
]
try:

View File

@@ -314,6 +314,7 @@ class EncodingFlow(Flow[EncodingState]):
item.content,
list(item.similar_records),
self._llm,
self._config.memory_prompt,
)
elif not fields_provided and not has_similar:
# Group C: field resolution only
@@ -324,6 +325,7 @@ class EncodingFlow(Flow[EncodingState]):
existing_scopes,
existing_categories,
self._llm,
self._config.memory_prompt,
)
else:
# Group D: both in parallel
@@ -334,6 +336,7 @@ class EncodingFlow(Flow[EncodingState]):
existing_scopes,
existing_categories,
self._llm,
self._config.memory_prompt,
)
consol_futures[i] = pool.submit(
contextvars.copy_context().run,
@@ -341,6 +344,7 @@ class EncodingFlow(Flow[EncodingState]):
item.content,
list(item.similar_records),
self._llm,
self._config.memory_prompt,
)
# Collect field-resolution results

View File

@@ -227,6 +227,7 @@ class RecallFlow(Flow[RecallState]):
available,
scope_info,
self._llm,
self._config.memory_prompt,
)
self.state.query_analysis = analysis

View File

@@ -6,7 +6,7 @@ from datetime import datetime
from typing import Any
from uuid import uuid4
from pydantic import BaseModel, Field
from pydantic import BaseModel, ConfigDict, Field
# When searching the vector store, we ask for more results than the caller
@@ -132,6 +132,28 @@ class ScopeInfo(BaseModel):
)
class MemoryPromptConfig(BaseModel):
    """Per-step overrides for memory LLM prompts (analogous to ``PlanningConfig``).

    Each field name corresponds to a translation key under ``memory`` in
    ``translations/en.json``. Setting a field replaces the bundled prompt for
    that step; fields left as ``None`` fall back to the default i18n text.
    Override templates must keep the same ``str.format`` placeholders as the
    defaults (e.g. ``save_user`` expects ``{content}``, ``{existing_scopes}``,
    ``{existing_categories}``).
    """

    model_config = ConfigDict(extra="forbid")

    # Scope/category/importance inference when saving a memory.
    save_system: str | None = None
    save_user: str | None = None
    # Recall-query analysis.
    query_system: str | None = None
    query_user: str | None = None
    # Extraction of discrete memory statements from raw content.
    extract_memories_system: str | None = None
    extract_memories_user: str | None = None
    # Insert/update/delete decisions against similar existing records.
    consolidation_system: str | None = None
    consolidation_user: str | None = None
class MemoryConfig(BaseModel):
"""Internal configuration for memory scoring, consolidation, and recall behavior.
@@ -141,6 +163,11 @@ class MemoryConfig(BaseModel):
compute_composite_score.
"""
memory_prompt: MemoryPromptConfig | None = Field(
default=None,
description="Per-step prompt strings overriding bundled memory prompts.",
)
# -- Composite score weights --
# The recall composite score is:
# semantic_weight * similarity + recency_weight * decay + importance_weight * importance

View File

@@ -9,7 +9,13 @@ import threading
import time
from typing import TYPE_CHECKING, Annotated, Any, Literal
from pydantic import BaseModel, ConfigDict, Field, PlainValidator, PrivateAttr
from pydantic import (
BaseModel,
ConfigDict,
Field,
PlainValidator,
PrivateAttr,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
@@ -26,6 +32,7 @@ from crewai.memory.storage.backend import StorageBackend
from crewai.memory.types import (
MemoryConfig,
MemoryMatch,
MemoryPromptConfig,
MemoryRecord,
ScopeInfo,
compute_composite_score,
@@ -59,6 +66,10 @@ class Memory(BaseModel):
Works without agent/crew. Uses LLM to infer scope, categories, importance on save.
Uses RecallFlow for adaptive-depth recall. Supports scope/slice views and
pluggable storage (LanceDB default).
Override LLM prompts per step via ``memory_prompt`` (same idea as
``PlanningConfig.system_prompt`` / ``plan_prompt``): set only the strings you
need; the rest stay on bundled translations.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -135,6 +146,13 @@ class Memory(BaseModel):
"will store memories at '/crew/research/<inferred_scope>'."
),
)
memory_prompt: MemoryPromptConfig | None = Field(
default=None,
description=(
"Optional prompt strings for save, query, extract, and consolidation steps. "
"See MemoryPromptConfig; unset fields use translations/en.json defaults."
),
)
_config: MemoryConfig = PrivateAttr()
_llm_instance: BaseLLM | None = PrivateAttr(default=None)
@@ -181,6 +199,7 @@ class Memory(BaseModel):
def model_post_init(self, __context: Any) -> None:
"""Initialize runtime state from field values."""
self._config = MemoryConfig(
memory_prompt=self.memory_prompt,
recency_weight=self.recency_weight,
semantic_weight=self.semantic_weight,
importance_weight=self.importance_weight,
@@ -638,7 +657,9 @@ class Memory(BaseModel):
Returns:
List of short, self-contained memory statements.
"""
return extract_memories_from_content(content, self._llm)
return extract_memories_from_content(
content, self._llm, self._config.memory_prompt
)
def recall(
self,

View File

@@ -51,6 +51,7 @@ from crewai.telemetry.utils import (
add_crew_and_task_attributes,
add_crew_attributes,
close_span,
crew_memory_span_attribute_value,
)
from crewai.utilities.logger_utils import suppress_warnings
from crewai.utilities.string_utils import sanitize_tool_name
@@ -280,7 +281,11 @@ class Telemetry:
self._add_attribute(span, "python_version", platform.python_version())
add_crew_attributes(span, crew, self._add_attribute)
self._add_attribute(span, "crew_process", crew.process)
self._add_attribute(span, "crew_memory", crew.memory)
self._add_attribute(
span,
"crew_memory",
crew_memory_span_attribute_value(crew.memory),
)
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
self._add_attribute(span, "crew_number_of_agents", len(crew.agents))

View File

@@ -16,6 +16,19 @@ if TYPE_CHECKING:
from crewai.task import Task
def crew_memory_span_attribute_value(memory: Any) -> bool | str:
"""Serialize ``Crew.memory`` for OpenTelemetry span attributes.
OTLP only allows bool, str, bytes, int, float, and homogeneous sequences
of those types — not arbitrary objects like :class:`~crewai.memory.unified_memory.Memory`.
"""
if memory is None or memory is False:
return False
if memory is True:
return True
return type(memory).__name__
def add_agent_fingerprint_to_span(
span: Span, agent: Any, add_attribute_fn: Callable[[Span, str, Any], None]
) -> None:

View File

@@ -650,6 +650,58 @@ def test_remember_survives_llm_failure(
assert mem._storage.count() == 1
# --- Per-Memory prompt config (MemoryPromptConfig) ---
def test_memory_prompt_config_custom_strings() -> None:
    """Library stays domain-agnostic; apps pass their own MemoryPromptConfig."""
    from crewai.memory.types import MemoryPromptConfig

    cfg = MemoryPromptConfig(
        save_system="Prefer categories: search_query, exa_search, result_domain.",
        extract_memories_system="Record Exa queries and canonical URLs first.",
        query_system="Distill recall_queries toward domains and past queries.",
    )

    # Each configured field keeps its app-specific wording verbatim.
    expected_fragments = {
        "save_system": "search_query",
        "extract_memories_system": "Exa",
        "query_system": "recall_queries",
    }
    for field_name, fragment in expected_fragments.items():
        assert fragment in (getattr(cfg, field_name) or "")
def test_memory_prompt_overrides_save_system_used_in_analyze(tmp_path: Path) -> None:
    """A ``save_system`` override must reach the system message sent to the LLM."""
    from crewai.memory.analyze import analyze_for_save
    from crewai.memory.types import MemoryPromptConfig
    from crewai.memory.unified_memory import Memory

    override_text = "CUSTOM_SAVE_SYSTEM_OVERRIDE"
    fake_llm = MagicMock()
    fake_llm.supports_function_calling.return_value = False
    fake_llm.call.return_value = (
        '{"suggested_scope": "/", "categories": [], "importance": 0.5, '
        '"extracted_metadata": {"entities": [], "dates": [], "topics": []}}'
    )

    memory = Memory(
        storage=str(tmp_path / "ov_db"),
        embedder=MagicMock(),
        llm=fake_llm,
        memory_prompt=MemoryPromptConfig(save_system=override_text),
    )
    # The override survives the Memory -> MemoryConfig handoff.
    assert memory._config.memory_prompt is not None
    assert memory._config.memory_prompt.save_system == override_text

    analyze_for_save(
        "hello",
        existing_scopes=["/"],
        existing_categories=[],
        llm=fake_llm,
        memory_prompt=memory._config.memory_prompt,
    )

    # First positional arg of llm.call is the messages list; the system
    # message must carry the custom string, not the bundled translation.
    system_message = fake_llm.call.call_args[0][0][0]
    assert system_message["role"] == "system"
    assert system_message["content"] == override_text
# --- Agent.kickoff() memory integration ---

View File

@@ -3,8 +3,9 @@ import threading
from unittest.mock import patch
import pytest
from crewai import Agent, Crew, Task
from crewai import Agent, Crew, Memory, Task
from crewai.telemetry import Telemetry
from crewai.telemetry.utils import crew_memory_span_attribute_value
from opentelemetry import trace
@@ -159,3 +160,20 @@ def test_no_signal_handler_traceback_in_non_main_thread():
mock_holder["logger"].debug.assert_any_call(
"Skipping signal handler registration: not running in main thread"
)
@pytest.mark.parametrize(
    ("raw_value", "expected"),
    [
        (False, False),
        (None, False),
        (True, True),
    ],
)
def test_crew_memory_span_attribute_value_primitives(raw_value, expected):
    """Primitive ``crew.memory`` values map to OTLP-safe booleans (identity-checked)."""
    assert crew_memory_span_attribute_value(raw_value) is expected
def test_crew_memory_span_attribute_value_memory_instance():
    """A Memory object must collapse to a primitive string for OTLP spans."""
    attr_value = crew_memory_span_attribute_value(Memory())
    assert attr_value == "Memory"