diff --git a/lib/crewai/src/crewai/__init__.py b/lib/crewai/src/crewai/__init__.py index e82b92511..ab47aa634 100644 --- a/lib/crewai/src/crewai/__init__.py +++ b/lib/crewai/src/crewai/__init__.py @@ -81,6 +81,7 @@ _track_install_async() _LAZY_IMPORTS: dict[str, tuple[str, str]] = { "Memory": ("crewai.memory.unified_memory", "Memory"), + "MemoryPromptConfig": ("crewai.memory.types", "MemoryPromptConfig"), } @@ -210,6 +211,7 @@ __all__ = [ "Knowledge", "LLMGuardrail", "Memory", + "MemoryPromptConfig", "PlanningConfig", "Process", "RuntimeState", diff --git a/lib/crewai/src/crewai/memory/analyze.py b/lib/crewai/src/crewai/memory/analyze.py index e700f4281..2709c57fc 100644 --- a/lib/crewai/src/crewai/memory/analyze.py +++ b/lib/crewai/src/crewai/memory/analyze.py @@ -8,7 +8,7 @@ from typing import Any from pydantic import BaseModel, ConfigDict, Field -from crewai.memory.types import MemoryRecord, ScopeInfo +from crewai.memory.types import MemoryPromptConfig, MemoryRecord, ScopeInfo from crewai.utilities.i18n import get_i18n @@ -140,19 +140,23 @@ class ConsolidationPlan(BaseModel): ) -def _get_prompt(key: str) -> str: - """Retrieve a memory prompt from the i18n translations. - - Args: - key: The prompt key under the "memory" section. - - Returns: - The prompt string. - """ +def _memory_prompt_line( + memory_prompt: MemoryPromptConfig | None, + key: str, +) -> str: + """Resolve one memory prompt: override string or bundled translation.""" + if memory_prompt is not None: + raw = getattr(memory_prompt, key, None) + if isinstance(raw, str) and raw.strip(): + return raw return get_i18n().memory(key) -def extract_memories_from_content(content: str, llm: Any) -> list[str]: +def extract_memories_from_content( + content: str, + llm: Any, + memory_prompt: MemoryPromptConfig | None = None, +) -> list[str]: """Use the LLM to extract discrete memory statements from raw content. This is a pure helper: it does NOT store anything. Callers should call @@ -164,15 +168,21 @@ def extract_memories_from_content(content: str, llm: Any) -> list[str]: Args: content: Raw text (e.g. task description + result dump). llm: The LLM instance to use. + memory_prompt: Optional per-step prompt strings (see ``MemoryPromptConfig``). Returns: List of short, self-contained memory statements (or [content] on failure). """ if not (content or "").strip(): return [] - user = _get_prompt("extract_memories_user").format(content=content) + user = _memory_prompt_line(memory_prompt, "extract_memories_user").format( + content=content + ) messages = [ - {"role": "system", "content": _get_prompt("extract_memories_system")}, + { + "role": "system", + "content": _memory_prompt_line(memory_prompt, "extract_memories_system"), + }, {"role": "user", "content": user}, ] try: @@ -202,6 +212,7 @@ def analyze_query( available_scopes: list[str], scope_info: ScopeInfo | None, llm: Any, + memory_prompt: MemoryPromptConfig | None = None, ) -> QueryAnalysis: """Use the LLM to analyze a recall query. @@ -212,6 +223,7 @@ def analyze_query( available_scopes: Scope paths that exist in the store. scope_info: Optional info about the current scope. llm: The LLM instance to use. + memory_prompt: Optional per-step prompt strings. Returns: QueryAnalysis with keywords, suggested_scopes, complexity, recall_queries, time_filter. @@ -219,13 +231,16 @@ def analyze_query( scope_desc = "" if scope_info: scope_desc = f"Current scope has {scope_info.record_count} records, categories: {scope_info.categories}" - user = _get_prompt("query_user").format( + user = _memory_prompt_line(memory_prompt, "query_user").format( query=query, available_scopes=available_scopes or ["/"], scope_desc=scope_desc, ) messages = [ - {"role": "system", "content": _get_prompt("query_system")}, + { + "role": "system", + "content": _memory_prompt_line(memory_prompt, "query_system"), + }, {"role": "user", "content": user}, ] try: @@ -269,6 +284,7 @@ def analyze_for_save( existing_scopes: list[str], existing_categories: list[str], llm: Any, + memory_prompt: MemoryPromptConfig | None = None, ) -> MemoryAnalysis: """Infer scope, categories, importance, and metadata for a single memory. @@ -280,17 +296,21 @@ def analyze_for_save( existing_scopes: Current scope paths in the memory store. existing_categories: Current categories in use. llm: The LLM instance to use. + memory_prompt: Optional per-step prompt strings. Returns: MemoryAnalysis with suggested_scope, categories, importance, extracted_metadata. """ - user = _get_prompt("save_user").format( + user = _memory_prompt_line(memory_prompt, "save_user").format( content=content, existing_scopes=existing_scopes or ["/"], existing_categories=existing_categories or [], ) messages = [ - {"role": "system", "content": _get_prompt("save_system")}, + { + "role": "system", + "content": _memory_prompt_line(memory_prompt, "save_system"), + }, {"role": "user", "content": user}, ] try: @@ -322,6 +342,7 @@ def analyze_for_consolidation( new_content: str, existing_records: list[MemoryRecord], llm: Any, + memory_prompt: MemoryPromptConfig | None = None, ) -> ConsolidationPlan: """Decide insert/update/delete for a single memory against similar existing records. @@ -332,6 +353,7 @@ def analyze_for_consolidation( new_content: The new content to store. existing_records: Existing records that are semantically similar. llm: The LLM instance to use. + memory_prompt: Optional per-step prompt strings. Returns: ConsolidationPlan with actions per record and whether to insert the new content. @@ -345,12 +367,15 @@ def analyze_for_consolidation( f"- id={r.id} | scope={r.scope} | importance={r.importance:.2f} | created={created}\n" f" content: {r.content[:200]}{'...' if len(r.content) > 200 else ''}" ) - user = _get_prompt("consolidation_user").format( + user = _memory_prompt_line(memory_prompt, "consolidation_user").format( new_content=new_content, records_summary="\n\n".join(records_lines), ) messages = [ - {"role": "system", "content": _get_prompt("consolidation_system")}, + { + "role": "system", + "content": _memory_prompt_line(memory_prompt, "consolidation_system"), + }, {"role": "user", "content": user}, ] try: diff --git a/lib/crewai/src/crewai/memory/encoding_flow.py b/lib/crewai/src/crewai/memory/encoding_flow.py index acd025d55..baf94c9c9 100644 --- a/lib/crewai/src/crewai/memory/encoding_flow.py +++ b/lib/crewai/src/crewai/memory/encoding_flow.py @@ -314,6 +314,7 @@ class EncodingFlow(Flow[EncodingState]): item.content, list(item.similar_records), self._llm, + self._config.memory_prompt, ) elif not fields_provided and not has_similar: # Group C: field resolution only @@ -324,6 +325,7 @@ class EncodingFlow(Flow[EncodingState]): existing_scopes, existing_categories, self._llm, + self._config.memory_prompt, ) else: # Group D: both in parallel @@ -334,6 +336,7 @@ class EncodingFlow(Flow[EncodingState]): existing_scopes, existing_categories, self._llm, + self._config.memory_prompt, ) consol_futures[i] = pool.submit( contextvars.copy_context().run, @@ -341,6 +344,7 @@ class EncodingFlow(Flow[EncodingState]): item.content, list(item.similar_records), self._llm, + self._config.memory_prompt, ) # Collect field-resolution results diff --git a/lib/crewai/src/crewai/memory/recall_flow.py b/lib/crewai/src/crewai/memory/recall_flow.py index 3a058f27b..3bc0b1a54 100644 --- a/lib/crewai/src/crewai/memory/recall_flow.py +++ b/lib/crewai/src/crewai/memory/recall_flow.py @@ -227,6 +227,7 @@ class RecallFlow(Flow[RecallState]): available, scope_info, self._llm, + self._config.memory_prompt, ) self.state.query_analysis = analysis diff --git a/lib/crewai/src/crewai/memory/types.py b/lib/crewai/src/crewai/memory/types.py index e787b569d..174ca2bf1 100644 --- a/lib/crewai/src/crewai/memory/types.py +++ b/lib/crewai/src/crewai/memory/types.py @@ -6,7 +6,7 @@ from datetime import datetime from typing import Any from uuid import uuid4 -from pydantic import BaseModel, Field +from pydantic import BaseModel, ConfigDict, Field # When searching the vector store, we ask for more results than the caller @@ -132,6 +132,126 @@ class ScopeInfo(BaseModel): ) +class MemoryPromptConfig(BaseModel): + """Configuration for memory LLM prompts (like ``PlanningConfig`` for planning). + + Field names match translation keys under ``memory`` in ``translations/en.json``. + When set, the string replaces the bundled prompt for that step; omitted keys + keep the default i18n text. Templates must include the same ``str.format`` + placeholders as the defaults (e.g. ``save_user`` uses ``{content}``, + ``{existing_scopes}``, ``{existing_categories}``). + """ + + model_config = ConfigDict(extra="forbid") + + save_system: str | None = None + save_user: str | None = None + query_system: str | None = None + query_user: str | None = None + extract_memories_system: str | None = None + extract_memories_user: str | None = None + consolidation_system: str | None = None + consolidation_user: str | None = None + + @classmethod + def for_online_people_research(cls) -> MemoryPromptConfig: + """Prompt profile for open-web people research (e.g. Exa). + + Biases memory toward **search provenance** (queries, domains, URLs, dead + ends, strong sources) and away from storing fragile biographical detail + or speculation as high-confidence fact. Pair with ``EXASearchTool`` and + explicit scopes/categories in ``remember()`` when you need determinism. + """ + return cls( + save_system=( + "You analyze content for a hierarchical memory used during " + "online people research.\n" + "Prioritize durable *research-process* signals over raw biographical claims:\n" + "- suggested_scope: use paths like /subjects/, /searches/queries, " + "/sources/domains when the content is about where or how you searched.\n" + "- categories: include tags such as search_query, result_domain, " + "source_url, exa_search, dead_end, preferred_source, methodology " + "when applicable. Use subject_profile only for well-sourced factual " + "snippets tied to a named URL or document - not for gossip or guesses.\n" + "- importance: higher (0.7-1.0) for reusable search lessons " + "(e.g. 'site X returned 404', 'LinkedIn useful for role Y', " + "'query phrasing Z worked'). Lower (0.2-0.5) for one-off personal " + "minutiae or unverified claims.\n" + "- extracted_metadata.entities: prefer domains, publication names, " + "and search keywords; only list people when the text explicitly " + "anchors a fact to a source.\n" + "Given the content and existing scopes and categories, output:\n" + "1. suggested_scope\n2. categories\n3. importance\n4. extracted_metadata" + ), + save_user=( + "Content to store:\n{content}\n\n" + "Existing scopes: {existing_scopes}\n" + "Existing categories: {existing_categories}\n\n" + "Return the analysis as structured output." + ), + query_system=( + "You analyze a query against memory for people-research crews.\n" + "Favor recalling *how and where* the team already searched:\n" + "- keywords: include domains, site names, query stems, and subject " + "handles - not only the person's name.\n" + "- suggested_scopes: prefer /searches, /sources, /subjects as relevant.\n" + "- recall_queries: produce phrases that would match memories about " + "Exa queries, domains tried, and conclusions about source quality.\n" + "- complexity: 'complex' if the question asks to synthesize many sources; " + "else 'simple'.\n" + "- time_filter: ISO date if the query implies a time window; else null.\n" + "Given the query and available scopes, output:\n" + "1. keywords\n2. suggested_scopes\n3. complexity\n4. recall_queries\n" + "5. time_filter" + ), + query_user=( + "Query: {query}\n\n" + "Available scopes: {available_scopes}\n" + "{scope_desc}\n\n" + "Return the analysis as structured output." + ), + extract_memories_system=( + "Extract discrete memory statements from research logs (tool outputs, " + "Exa results, agent notes).\n" + "HIGH priority - always capture when present:\n" + "- Exact or paraphrased search queries used and which engine/tool " + "(e.g. Exa).\n" + "- Domains and canonical URLs of pages opened; HTTP errors or empty results.\n" + "- Judgments about source usefulness (e.g. 'official bio on company site', " + "'social profile - verify elsewhere').\n" + "MEDIUM priority:\n" + "- Verifiable facts explicitly tied to a source in the text " + "(quote which source).\n" + "LOW priority / often skip:\n" + "- Speculation, reputation gossip, or detailed private life unless " + "the task explicitly requires it and a primary source is named.\n" + "Each memory: one clear sentence, no duplicate ideas.\n" + 'Output JSON with a single key "memories" whose value is a list of strings.' + ), + extract_memories_user=( + "Content:\n{content}\n\n" + "Extract memory statements as described. Return structured output." + ), + consolidation_system=( + "You consolidate new research content with similar existing memories.\n" + "Prefer keeping separate memories when they differ by URL, domain, or " + "search query - even if about the same person.\n" + "Merge or update when the new text clearly refines the same source " + "or same search finding.\n" + "Delete only when a memory is clearly wrong and superseded.\n" + "Be conservative: prefer 'keep' when unsure.\n" + "For each existing memory: keep | update | delete. " + "Set insert_new when the new content adds a distinct query, URL, or " + "source judgment." + ), + consolidation_user=( + "New content to consider storing:\n{new_content}\n\n" + "Existing similar memories:\n{records_summary}\n\n" + "Return the consolidation plan as structured output." + ), + ) + + class MemoryConfig(BaseModel): """Internal configuration for memory scoring, consolidation, and recall behavior. @@ -141,6 +261,11 @@ class MemoryConfig(BaseModel): compute_composite_score. """ + memory_prompt: MemoryPromptConfig | None = Field( + default=None, + description="Per-step prompt strings overriding bundled memory prompts.", + ) + # -- Composite score weights -- # The recall composite score is: # semantic_weight * similarity + recency_weight * decay + importance_weight * importance diff --git a/lib/crewai/src/crewai/memory/unified_memory.py b/lib/crewai/src/crewai/memory/unified_memory.py index d879bace0..bfc1eec94 100644 --- a/lib/crewai/src/crewai/memory/unified_memory.py +++ b/lib/crewai/src/crewai/memory/unified_memory.py @@ -9,7 +9,13 @@ import threading import time from typing import TYPE_CHECKING, Annotated, Any, Literal -from pydantic import BaseModel, ConfigDict, Field, PlainValidator, PrivateAttr +from pydantic import ( + BaseModel, + ConfigDict, + Field, + PlainValidator, + PrivateAttr, +) from crewai.events.event_bus import crewai_event_bus from crewai.events.types.memory_events import ( @@ -26,6 +32,7 @@ from crewai.memory.storage.backend import StorageBackend from crewai.memory.types import ( MemoryConfig, MemoryMatch, + MemoryPromptConfig, MemoryRecord, ScopeInfo, compute_composite_score, @@ -59,6 +66,10 @@ class Memory(BaseModel): Works without agent/crew. Uses LLM to infer scope, categories, importance on save. Uses RecallFlow for adaptive-depth recall. Supports scope/slice views and pluggable storage (LanceDB default). + + Override LLM prompts per step via ``memory_prompt`` (same idea as + ``PlanningConfig.system_prompt`` / ``plan_prompt``): set only the strings you + need; the rest stay on bundled translations. """ model_config = ConfigDict(arbitrary_types_allowed=True) @@ -135,6 +146,13 @@ class Memory(BaseModel): "will store memories at '/crew/research/'." ), ) + memory_prompt: MemoryPromptConfig | None = Field( + default=None, + description=( + "Optional prompt strings for save, query, extract, and consolidation steps. " + "See MemoryPromptConfig; unset fields use translations/en.json defaults." + ), + ) _config: MemoryConfig = PrivateAttr() _llm_instance: BaseLLM | None = PrivateAttr(default=None) @@ -181,6 +199,7 @@ class Memory(BaseModel): def model_post_init(self, __context: Any) -> None: """Initialize runtime state from field values.""" self._config = MemoryConfig( + memory_prompt=self.memory_prompt, recency_weight=self.recency_weight, semantic_weight=self.semantic_weight, importance_weight=self.importance_weight, @@ -638,7 +657,9 @@ class Memory(BaseModel): Returns: List of short, self-contained memory statements. """ - return extract_memories_from_content(content, self._llm) + return extract_memories_from_content( + content, self._llm, self._config.memory_prompt + ) def recall( self, diff --git a/lib/crewai/src/crewai/telemetry/telemetry.py b/lib/crewai/src/crewai/telemetry/telemetry.py index 7809c5b4c..c93a06ff2 100644 --- a/lib/crewai/src/crewai/telemetry/telemetry.py +++ b/lib/crewai/src/crewai/telemetry/telemetry.py @@ -51,6 +51,7 @@ from crewai.telemetry.utils import ( add_crew_and_task_attributes, add_crew_attributes, close_span, + crew_memory_span_attribute_value, ) from crewai.utilities.logger_utils import suppress_warnings from crewai.utilities.string_utils import sanitize_tool_name @@ -280,7 +281,11 @@ class Telemetry: self._add_attribute(span, "python_version", platform.python_version()) add_crew_attributes(span, crew, self._add_attribute) self._add_attribute(span, "crew_process", crew.process) - self._add_attribute(span, "crew_memory", crew.memory) + self._add_attribute( + span, + "crew_memory", + crew_memory_span_attribute_value(crew.memory), + ) self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks)) self._add_attribute(span, "crew_number_of_agents", len(crew.agents)) diff --git a/lib/crewai/src/crewai/telemetry/utils.py b/lib/crewai/src/crewai/telemetry/utils.py index c6b649a30..0e818d94d 100644 --- a/lib/crewai/src/crewai/telemetry/utils.py +++ b/lib/crewai/src/crewai/telemetry/utils.py @@ -16,6 +16,19 @@ if TYPE_CHECKING: from crewai.task import Task +def crew_memory_span_attribute_value(memory: Any) -> bool | str: + """Serialize ``Crew.memory`` for OpenTelemetry span attributes. + + OTLP only allows bool, str, bytes, int, float, and homogeneous sequences + of those types — not arbitrary objects like :class:`~crewai.memory.unified_memory.Memory`. + """ + if memory is None or memory is False: + return False + if memory is True: + return True + return type(memory).__name__ + + def add_agent_fingerprint_to_span( span: Span, agent: Any, add_attribute_fn: Callable[[Span, str, Any], None] ) -> None: diff --git a/lib/crewai/tests/memory/test_unified_memory.py b/lib/crewai/tests/memory/test_unified_memory.py index f36bf0c2b..7c7ef4345 100644 --- a/lib/crewai/tests/memory/test_unified_memory.py +++ b/lib/crewai/tests/memory/test_unified_memory.py @@ -664,6 +664,53 @@ def test_remember_survives_llm_failure( assert mem._storage.count() == 1 +# --- Per-Memory prompt config (MemoryPromptConfig) --- + + +def test_memory_prompt_overrides_for_online_people_research() -> None: + from crewai.memory.types import MemoryPromptConfig + + po = MemoryPromptConfig.for_online_people_research() + assert po.save_system and "search_query" in po.save_system + assert po.extract_memories_system and "Exa" in po.extract_memories_system + assert po.query_system and "recall_queries" in po.query_system + + +def test_memory_prompt_overrides_save_system_used_in_analyze(tmp_path: Path) -> None: + from crewai.memory.analyze import analyze_for_save + from crewai.memory.types import MemoryPromptConfig + from crewai.memory.unified_memory import Memory + + custom_system = "CUSTOM_SAVE_SYSTEM_OVERRIDE" + llm = MagicMock() + llm.supports_function_calling.return_value = False + llm.call.return_value = ( + '{"suggested_scope": "/", "categories": [], "importance": 0.5, ' + '"extracted_metadata": {"entities": [], "dates": [], "topics": []}}' + ) + + mem = Memory( + storage=str(tmp_path / "ov_db"), + embedder=MagicMock(), + llm=llm, + memory_prompt=MemoryPromptConfig(save_system=custom_system), + ) + assert mem._config.memory_prompt is not None + assert mem._config.memory_prompt.save_system == custom_system + + analyze_for_save( + "hello", + existing_scopes=["/"], + existing_categories=[], + llm=llm, + memory_prompt=mem._config.memory_prompt, + ) + call_args = llm.call.call_args + messages = call_args[0][0] + assert messages[0]["role"] == "system" + assert messages[0]["content"] == custom_system + + # --- Agent.kickoff() memory integration --- diff --git a/lib/crewai/tests/telemetry/test_telemetry.py b/lib/crewai/tests/telemetry/test_telemetry.py index d0564982d..c72f0fd44 100644 --- a/lib/crewai/tests/telemetry/test_telemetry.py +++ b/lib/crewai/tests/telemetry/test_telemetry.py @@ -3,8 +3,9 @@ import threading from unittest.mock import patch import pytest -from crewai import Agent, Crew, Task +from crewai import Agent, Crew, Memory, Task from crewai.telemetry import Telemetry +from crewai.telemetry.utils import crew_memory_span_attribute_value from opentelemetry import trace @@ -159,3 +160,20 @@ def test_no_signal_handler_traceback_in_non_main_thread(): mock_holder["logger"].debug.assert_any_call( "Skipping signal handler registration: not running in main thread" ) + + +@pytest.mark.parametrize( + ("memory", "expected"), + [ + (False, False), + (None, False), + (True, True), + ], +) +def test_crew_memory_span_attribute_value_primitives(memory, expected): + assert crew_memory_span_attribute_value(memory) is expected + + +def test_crew_memory_span_attribute_value_memory_instance(): + """Custom Memory instances must become a primitive string for OTLP.""" + assert crew_memory_span_attribute_value(Memory()) == "Memory"