Compare commits

...

1 Commits

Author SHA1 Message Date
Joao Moura
92d0694a54 feat: automatic root_scope for hierarchical memory isolation
Crews and flows now automatically scope their memories hierarchically.
The encoding flow's LLM-inferred scope becomes a sub-scope under the
structural root, preventing memory pollution across crews/agents.

Scope hierarchy:
  /crew/{crew_name}/agent/{agent_role}/{llm-inferred}
  /flow/{flow_name}/{llm-inferred}

Changes:
- Memory class: new root_scope field, passed through remember/remember_many
- EncodingFlow: prepends root_scope to resolved scope in both fast path
  (Group A) and LLM path (Group C/D)
- Crew: auto-sets root_scope=/crew/{sanitized_name} on memory creation
- Agent executor: extends crew root with /agent/{sanitized_role} per save
- Flow: auto-sets root_scope=/flow/{sanitized_name} on memory creation
- New utils: sanitize_scope_name, normalize_scope_path, join_scope_paths

Backward compatible — no root_scope means no prefix (existing behavior).
Old memories at '/' remain accessible.

51 new tests, all existing tests pass.
2026-03-23 19:59:31 -07:00
7 changed files with 1074 additions and 11 deletions

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from crewai.agents.parser import AgentFinish from crewai.agents.parser import AgentFinish
from crewai.memory.utils import sanitize_scope_name
from crewai.utilities.printer import Printer from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import sanitize_tool_name from crewai.utilities.string_utils import sanitize_tool_name
@@ -26,7 +27,12 @@ class CrewAgentExecutorMixin:
_printer: Printer = Printer() _printer: Printer = Printer()
def _save_to_memory(self, output: AgentFinish) -> None: def _save_to_memory(self, output: AgentFinish) -> None:
"""Save task result to unified memory (memory or crew._memory).""" """Save task result to unified memory (memory or crew._memory).
Extends the memory's root_scope with agent-specific path segment
(e.g., '/crew/research-crew/agent/researcher') so that agent memories
are scoped hierarchically under their crew.
"""
memory = getattr(self.agent, "memory", None) or ( memory = getattr(self.agent, "memory", None) or (
getattr(self.crew, "_memory", None) if self.crew else None getattr(self.crew, "_memory", None) if self.crew else None
) )
@@ -43,6 +49,20 @@ class CrewAgentExecutorMixin:
) )
extracted = memory.extract_memories(raw) extracted = memory.extract_memories(raw)
if extracted: if extracted:
memory.remember_many(extracted, agent_role=self.agent.role) # Build agent-specific root_scope that extends the crew's root
agent_role = self.agent.role or "unknown"
sanitized_role = sanitize_scope_name(agent_role)
# Get the memory's existing root_scope and extend with agent info
base_root = getattr(memory, "root_scope", None) or ""
# Construct agent root: base_root + /agent/<role>
agent_root = f"{base_root.rstrip('/')}/agent/{sanitized_role}"
# Ensure leading slash
if not agent_root.startswith("/"):
agent_root = "/" + agent_root
memory.remember_many(
extracted, agent_role=self.agent.role, root_scope=agent_root
)
except Exception as e: except Exception as e:
self.agent._logger.log("error", f"Failed to save to memory: {e}") self.agent._logger.log("error", f"Failed to save to memory: {e}")

View File

@@ -357,7 +357,18 @@ class Crew(FlowTrackable, BaseModel):
@model_validator(mode="after") @model_validator(mode="after")
def create_crew_memory(self) -> Crew: def create_crew_memory(self) -> Crew:
"""Initialize unified memory, respecting crew embedder config.""" """Initialize unified memory, respecting crew embedder config.
When memory is enabled, sets a hierarchical root_scope based on the
crew name (e.g. '/crew/research-crew') so that all memories saved by
this crew and its agents are organized under a consistent namespace.
"""
from crewai.memory.utils import sanitize_scope_name
# Compute sanitized crew name for root_scope
crew_name = sanitize_scope_name(self.name or "crew")
crew_root_scope = f"/crew/{crew_name}"
if self.memory is True: if self.memory is True:
from crewai.memory.unified_memory import Memory from crewai.memory.unified_memory import Memory
@@ -366,10 +377,13 @@ class Crew(FlowTrackable, BaseModel):
from crewai.rag.embeddings.factory import build_embedder from crewai.rag.embeddings.factory import build_embedder
embedder = build_embedder(self.embedder) # type: ignore[arg-type] embedder = build_embedder(self.embedder) # type: ignore[arg-type]
self._memory = Memory(embedder=embedder) self._memory = Memory(embedder=embedder, root_scope=crew_root_scope)
elif self.memory: elif self.memory:
# User passed a Memory / MemoryScope / MemorySlice instance # User passed a Memory / MemoryScope / MemorySlice instance
self._memory = self.memory self._memory = self.memory
# Set root_scope only if not already set (don't override user config)
if hasattr(self._memory, "root_scope") and self._memory.root_scope is None:
self._memory.root_scope = crew_root_scope
else: else:
self._memory = None self._memory = None

View File

@@ -905,7 +905,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Internal flows (RecallFlow, EncodingFlow) set _skip_auto_memory # Internal flows (RecallFlow, EncodingFlow) set _skip_auto_memory
# to avoid creating a wasteful standalone Memory instance. # to avoid creating a wasteful standalone Memory instance.
if self.memory is None and not getattr(self, "_skip_auto_memory", False): if self.memory is None and not getattr(self, "_skip_auto_memory", False):
self.memory = Memory() from crewai.memory.utils import sanitize_scope_name
flow_name = sanitize_scope_name(self.name or self.__class__.__name__)
self.memory = Memory(root_scope=f"/flow/{flow_name}")
# Register all flow-related methods # Register all flow-related methods
for method_name in dir(self): for method_name in dir(self):

View File

@@ -28,6 +28,7 @@ from crewai.memory.analyze import (
analyze_for_save, analyze_for_save,
) )
from crewai.memory.types import MemoryConfig, MemoryRecord, embed_texts from crewai.memory.types import MemoryConfig, MemoryRecord, embed_texts
from crewai.memory.utils import join_scope_paths
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -48,6 +49,8 @@ class ItemState(BaseModel):
importance: float | None = None importance: float | None = None
source: str | None = None source: str | None = None
private: bool = False private: bool = False
# Structural root scope prefix for hierarchical scoping
root_scope: str | None = None
# Resolved values # Resolved values
resolved_scope: str = "/" resolved_scope: str = "/"
resolved_categories: list[str] = Field(default_factory=list) resolved_categories: list[str] = Field(default_factory=list)
@@ -103,12 +106,24 @@ class EncodingFlow(Flow[EncodingState]):
llm: Any, llm: Any,
embedder: Any, embedder: Any,
config: MemoryConfig | None = None, config: MemoryConfig | None = None,
root_scope: str | None = None,
) -> None: ) -> None:
"""Initialize the encoding flow.
Args:
storage: Storage backend for persisting memories.
llm: LLM instance for analysis.
embedder: Embedder for generating vectors.
config: Optional memory configuration.
root_scope: Structural root scope prefix. LLM-inferred or explicit
scopes are nested under this root.
"""
super().__init__(suppress_flow_events=True) super().__init__(suppress_flow_events=True)
self._storage = storage self._storage = storage
self._llm = llm self._llm = llm
self._embedder = embedder self._embedder = embedder
self._config = config or MemoryConfig() self._config = config or MemoryConfig()
self._root_scope = root_scope
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Step 1: Batch embed (ONE embedder call) # Step 1: Batch embed (ONE embedder call)
@@ -321,7 +336,13 @@ class EncodingFlow(Flow[EncodingState]):
for i, future in save_futures.items(): for i, future in save_futures.items():
analysis = future.result() analysis = future.result()
item = items[i] item = items[i]
item.resolved_scope = item.scope or analysis.suggested_scope or "/" # Determine inner scope from explicit scope or LLM-inferred
inner_scope = item.scope or analysis.suggested_scope or "/"
# Join root_scope with inner scope if root_scope is set
if item.root_scope:
item.resolved_scope = join_scope_paths(item.root_scope, inner_scope)
else:
item.resolved_scope = inner_scope
item.resolved_categories = ( item.resolved_categories = (
item.categories item.categories
if item.categories is not None if item.categories is not None
@@ -353,8 +374,18 @@ class EncodingFlow(Flow[EncodingState]):
pool.shutdown(wait=False) pool.shutdown(wait=False)
def _apply_defaults(self, item: ItemState) -> None: def _apply_defaults(self, item: ItemState) -> None:
"""Apply caller values with config defaults (fast path).""" """Apply caller values with config defaults (fast path).
item.resolved_scope = item.scope or "/"
If root_scope is set, prepends it to the inner scope to create the
final resolved_scope.
"""
inner_scope = item.scope or "/"
# Join root_scope with inner scope if root_scope is set
if item.root_scope:
item.resolved_scope = join_scope_paths(item.root_scope, inner_scope)
else:
item.resolved_scope = inner_scope if inner_scope != "/" else "/"
item.resolved_categories = item.categories or [] item.resolved_categories = item.categories or []
item.resolved_metadata = item.metadata or {} item.resolved_metadata = item.metadata or {}
item.resolved_importance = ( item.resolved_importance = (

View File

@@ -126,6 +126,14 @@ class Memory(BaseModel):
default=False, default=False,
description="If True, remember() and remember_many() are silent no-ops.", description="If True, remember() and remember_many() are silent no-ops.",
) )
root_scope: str | None = Field(
default=None,
description=(
"Structural root scope prefix. When set, LLM-inferred or explicit scopes "
"are nested under this root. For example, a crew with root_scope='/crew/research' "
"will store memories at '/crew/research/<inferred_scope>'."
),
)
_config: MemoryConfig = PrivateAttr() _config: MemoryConfig = PrivateAttr()
_llm_instance: BaseLLM | None = PrivateAttr(default=None) _llm_instance: BaseLLM | None = PrivateAttr(default=None)
@@ -297,11 +305,26 @@ class Memory(BaseModel):
importance: float | None = None, importance: float | None = None,
source: str | None = None, source: str | None = None,
private: bool = False, private: bool = False,
root_scope: str | None = None,
) -> list[MemoryRecord]: ) -> list[MemoryRecord]:
"""Run the batch EncodingFlow for one or more items. No event emission. """Run the batch EncodingFlow for one or more items. No event emission.
This is the core encoding logic shared by ``remember()`` and This is the core encoding logic shared by ``remember()`` and
``remember_many()``. Events are managed by the calling method. ``remember_many()``. Events are managed by the calling method.
Args:
contents: List of text content to encode and store.
scope: Optional explicit scope (inner scope, nested under root_scope).
categories: Optional categories for all items.
metadata: Optional metadata for all items.
importance: Optional importance score for all items.
source: Optional source identifier for all items.
private: Whether items are private.
root_scope: Structural root scope prefix. LLM-inferred or explicit
scopes are nested under this root.
Returns:
List of created MemoryRecord instances.
""" """
from crewai.memory.encoding_flow import EncodingFlow from crewai.memory.encoding_flow import EncodingFlow
@@ -310,6 +333,7 @@ class Memory(BaseModel):
llm=self._llm, llm=self._llm,
embedder=self._embedder, embedder=self._embedder,
config=self._config, config=self._config,
root_scope=root_scope,
) )
items_input = [ items_input = [
{ {
@@ -320,6 +344,7 @@ class Memory(BaseModel):
"importance": importance, "importance": importance,
"source": source, "source": source,
"private": private, "private": private,
"root_scope": root_scope,
} }
for c in contents for c in contents
] ]
@@ -340,6 +365,7 @@ class Memory(BaseModel):
source: str | None = None, source: str | None = None,
private: bool = False, private: bool = False,
agent_role: str | None = None, agent_role: str | None = None,
root_scope: str | None = None,
) -> MemoryRecord | None: ) -> MemoryRecord | None:
"""Store a single item in memory (synchronous). """Store a single item in memory (synchronous).
@@ -349,13 +375,15 @@ class Memory(BaseModel):
Args: Args:
content: Text to remember. content: Text to remember.
scope: Optional scope path; inferred if None. scope: Optional scope path (inner scope); inferred if None.
categories: Optional categories; inferred if None. categories: Optional categories; inferred if None.
metadata: Optional metadata; merged with LLM-extracted if inferred. metadata: Optional metadata; merged with LLM-extracted if inferred.
importance: Optional importance 0-1; inferred if None. importance: Optional importance 0-1; inferred if None.
source: Optional provenance identifier (e.g. user ID, session ID). source: Optional provenance identifier (e.g. user ID, session ID).
private: If True, only visible to recall from the same source. private: If True, only visible to recall from the same source.
agent_role: Optional agent role for event metadata. agent_role: Optional agent role for event metadata.
root_scope: Optional root scope override. If provided, this overrides
the instance-level root_scope for this call only.
Returns: Returns:
The created MemoryRecord, or None if this memory is read-only. The created MemoryRecord, or None if this memory is read-only.
@@ -365,6 +393,10 @@ class Memory(BaseModel):
""" """
if self.read_only: if self.read_only:
return None return None
# Determine effective root_scope: per-call override takes precedence
effective_root = root_scope if root_scope is not None else self.root_scope
_source_type = "unified_memory" _source_type = "unified_memory"
try: try:
crewai_event_bus.emit( crewai_event_bus.emit(
@@ -388,6 +420,7 @@ class Memory(BaseModel):
importance, importance,
source, source,
private, private,
effective_root,
) )
records = future.result() records = future.result()
record = records[0] if records else None record = records[0] if records else None
@@ -426,6 +459,7 @@ class Memory(BaseModel):
source: str | None = None, source: str | None = None,
private: bool = False, private: bool = False,
agent_role: str | None = None, agent_role: str | None = None,
root_scope: str | None = None,
) -> list[MemoryRecord]: ) -> list[MemoryRecord]:
"""Store multiple items in memory (non-blocking). """Store multiple items in memory (non-blocking).
@@ -440,13 +474,15 @@ class Memory(BaseModel):
Args: Args:
contents: List of text items to remember. contents: List of text items to remember.
scope: Optional scope applied to all items. scope: Optional scope (inner scope) applied to all items.
categories: Optional categories applied to all items. categories: Optional categories applied to all items.
metadata: Optional metadata applied to all items. metadata: Optional metadata applied to all items.
importance: Optional importance applied to all items. importance: Optional importance applied to all items.
source: Optional provenance identifier applied to all items. source: Optional provenance identifier applied to all items.
private: Privacy flag applied to all items. private: Privacy flag applied to all items.
agent_role: Optional agent role for event metadata. agent_role: Optional agent role for event metadata.
root_scope: Optional root scope override. If provided, this overrides
the instance-level root_scope for this call only.
Returns: Returns:
Empty list (records are not available until the background save completes). Empty list (records are not available until the background save completes).
@@ -454,6 +490,9 @@ class Memory(BaseModel):
if not contents or self.read_only: if not contents or self.read_only:
return [] return []
# Determine effective root_scope: per-call override takes precedence
effective_root = root_scope if root_scope is not None else self.root_scope
self._submit_save( self._submit_save(
self._background_encode_batch, self._background_encode_batch,
contents, contents,
@@ -464,6 +503,7 @@ class Memory(BaseModel):
source, source,
private, private,
agent_role, agent_role,
effective_root,
) )
return [] return []
@@ -477,6 +517,7 @@ class Memory(BaseModel):
source: str | None, source: str | None,
private: bool, private: bool,
agent_role: str | None, agent_role: str | None,
root_scope: str | None = None,
) -> list[MemoryRecord]: ) -> list[MemoryRecord]:
"""Run the encoding pipeline in a background thread with event emission. """Run the encoding pipeline in a background thread with event emission.
@@ -486,6 +527,20 @@ class Memory(BaseModel):
All ``emit`` calls are wrapped in try/except to handle the case where All ``emit`` calls are wrapped in try/except to handle the case where
the event bus shuts down before the background save finishes (e.g. the event bus shuts down before the background save finishes (e.g.
during process exit). during process exit).
Args:
contents: List of text content to encode.
scope: Optional inner scope for all items.
categories: Optional categories for all items.
metadata: Optional metadata for all items.
importance: Optional importance for all items.
source: Optional source identifier for all items.
private: Whether items are private.
agent_role: Optional agent role for event metadata.
root_scope: Optional root scope prefix for hierarchical scoping.
Returns:
List of created MemoryRecord instances.
""" """
try: try:
crewai_event_bus.emit( crewai_event_bus.emit(
@@ -502,7 +557,14 @@ class Memory(BaseModel):
try: try:
start = time.perf_counter() start = time.perf_counter()
records = self._encode_batch( records = self._encode_batch(
contents, scope, categories, metadata, importance, source, private contents,
scope,
categories,
metadata,
importance,
source,
private,
root_scope,
) )
elapsed_ms = (time.perf_counter() - start) * 1000 elapsed_ms = (time.perf_counter() - start) * 1000
except RuntimeError: except RuntimeError:

View File

@@ -0,0 +1,110 @@
"""Utility functions for the unified memory system."""
from __future__ import annotations
import re
def sanitize_scope_name(name: str) -> str:
"""Sanitize a name for use in hierarchical scope paths.
Converts to lowercase, replaces non-alphanumeric chars (except underscore
and hyphen) with hyphens, collapses multiple hyphens, strips leading/trailing
hyphens.
Args:
name: The raw name to sanitize (e.g. crew name, agent role, flow class name).
Returns:
A sanitized string safe for use in scope paths. Returns 'unknown' if the
result would be empty.
Examples:
>>> sanitize_scope_name("Research Crew")
'research-crew'
>>> sanitize_scope_name("Agent #1 (Main)")
'agent-1-main'
>>> sanitize_scope_name("café_worker")
'caf-worker'
"""
if not name:
return "unknown"
name = name.lower().strip()
# Replace any character that's not alphanumeric, underscore, or hyphen with hyphen
name = re.sub(r"[^a-z0-9_-]", "-", name)
# Collapse multiple hyphens into one
name = re.sub(r"-+", "-", name)
# Strip leading/trailing hyphens
name = name.strip("-")
return name or "unknown"
def normalize_scope_path(path: str) -> str:
"""Normalize a scope path by removing double slashes and ensuring proper format.
Args:
path: The raw scope path (e.g. '/crew/MyCrewName//agent//role').
Returns:
A normalized path with leading slash, no trailing slash, no double slashes.
Returns '/' for empty or root-only paths.
Examples:
>>> normalize_scope_path("/crew/test//agent//")
'/crew/test/agent'
>>> normalize_scope_path("")
'/'
>>> normalize_scope_path("crew/test")
'/crew/test'
"""
if not path or path == "/":
return "/"
# Collapse multiple slashes
path = re.sub(r"/+", "/", path)
# Ensure leading slash
if not path.startswith("/"):
path = "/" + path
# Remove trailing slash (unless it's just '/')
if len(path) > 1:
path = path.rstrip("/")
return path
def join_scope_paths(root: str | None, inner: str | None) -> str:
"""Join a root scope with an inner scope, handling edge cases properly.
Args:
root: The root scope prefix (e.g. '/crew/research-crew').
inner: The inner scope (e.g. '/market-trends' or 'market-trends').
Returns:
The combined, normalized scope path.
Examples:
>>> join_scope_paths("/crew/test", "/market-trends")
'/crew/test/market-trends'
>>> join_scope_paths("/crew/test", "market-trends")
'/crew/test/market-trends'
>>> join_scope_paths("/crew/test", "/")
'/crew/test'
>>> join_scope_paths("/crew/test", None)
'/crew/test'
>>> join_scope_paths(None, "/market-trends")
'/market-trends'
>>> join_scope_paths(None, None)
'/'
"""
# Normalize both parts
root = root.rstrip("/") if root else ""
inner = inner.strip("/") if inner else ""
if root and inner:
result = f"{root}/{inner}"
elif root:
result = root
elif inner:
result = f"/{inner}"
else:
result = "/"
return normalize_scope_path(result)

View File

@@ -0,0 +1,823 @@
"""Tests for hierarchical root_scope functionality in unified memory.
Root scope is a structural prefix that is set automatically by crews and flows.
The LLM's encoding flow still infers a semantic inner scope, but the final
resolved scope = root_scope + '/' + llm_inferred_scope.
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from crewai.memory.types import MemoryRecord
from crewai.memory.utils import (
join_scope_paths,
normalize_scope_path,
sanitize_scope_name,
)
# --- Utility function tests ---
class TestSanitizeScopeName:
"""Tests for sanitize_scope_name utility."""
def test_simple_name(self) -> None:
assert sanitize_scope_name("research") == "research"
def test_name_with_spaces(self) -> None:
assert sanitize_scope_name("Research Crew") == "research-crew"
def test_name_with_special_chars(self) -> None:
assert sanitize_scope_name("Agent #1 (Main)") == "agent-1-main"
def test_name_with_unicode(self) -> None:
# Unicode characters get replaced with hyphens
result = sanitize_scope_name("café_worker")
# é becomes -, and the underscore is preserved, so café_worker -> caf-_worker
assert result == "caf-_worker"
def test_name_with_underscores(self) -> None:
# Underscores are preserved
assert sanitize_scope_name("test_agent") == "test_agent"
def test_name_with_hyphens(self) -> None:
assert sanitize_scope_name("my-crew") == "my-crew"
def test_multiple_spaces_collapsed(self) -> None:
assert sanitize_scope_name("foo bar") == "foo-bar"
def test_leading_trailing_spaces(self) -> None:
assert sanitize_scope_name(" crew ") == "crew"
def test_empty_string_returns_unknown(self) -> None:
assert sanitize_scope_name("") == "unknown"
def test_only_special_chars_returns_unknown(self) -> None:
assert sanitize_scope_name("@#$%") == "unknown"
def test_none_input_returns_unknown(self) -> None:
assert sanitize_scope_name(None) == "unknown" # type: ignore[arg-type]
class TestNormalizeScopePath:
"""Tests for normalize_scope_path utility."""
def test_simple_path(self) -> None:
assert normalize_scope_path("/crew/test") == "/crew/test"
def test_double_slashes_collapsed(self) -> None:
assert normalize_scope_path("/crew//test//agent") == "/crew/test/agent"
def test_trailing_slash_removed(self) -> None:
assert normalize_scope_path("/crew/test/") == "/crew/test"
def test_missing_leading_slash_added(self) -> None:
assert normalize_scope_path("crew/test") == "/crew/test"
def test_empty_string_returns_root(self) -> None:
assert normalize_scope_path("") == "/"
def test_root_only_returns_root(self) -> None:
assert normalize_scope_path("/") == "/"
def test_multiple_trailing_slashes(self) -> None:
assert normalize_scope_path("/crew///") == "/crew"
class TestJoinScopePaths:
"""Tests for join_scope_paths utility."""
def test_basic_join(self) -> None:
assert join_scope_paths("/crew/test", "/market-trends") == "/crew/test/market-trends"
def test_inner_without_leading_slash(self) -> None:
assert join_scope_paths("/crew/test", "market-trends") == "/crew/test/market-trends"
def test_root_with_trailing_slash(self) -> None:
assert join_scope_paths("/crew/test/", "/inner") == "/crew/test/inner"
def test_root_only_inner_slash(self) -> None:
assert join_scope_paths("/crew/test", "/") == "/crew/test"
def test_root_only_inner_none(self) -> None:
assert join_scope_paths("/crew/test", None) == "/crew/test"
def test_no_root_with_inner(self) -> None:
assert join_scope_paths(None, "/market-trends") == "/market-trends"
def test_both_none(self) -> None:
assert join_scope_paths(None, None) == "/"
def test_empty_strings(self) -> None:
assert join_scope_paths("", "") == "/"
def test_root_empty_inner_value(self) -> None:
assert join_scope_paths("", "inner") == "/inner"
# --- Memory root_scope tests ---
@pytest.fixture
def mock_embedder() -> MagicMock:
"""Embedder mock that returns one embedding per input text (batch-aware)."""
m = MagicMock()
m.side_effect = lambda texts: [[0.1] * 1536 for _ in texts]
return m
class TestMemoryRootScope:
"""Tests for Memory class root_scope field."""
def test_memory_with_root_scope_prepends_to_explicit_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""When root_scope is set and explicit scope is provided, they combine."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/research-crew",
)
record = mem.remember(
"Test content",
scope="/market-trends",
categories=["test"],
importance=0.7,
)
assert record is not None
assert record.scope == "/crew/research-crew/market-trends"
def test_memory_without_root_scope_uses_explicit_scope_directly(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""When root_scope is None, explicit scope is used as-is (backward compat)."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
)
record = mem.remember(
"Test content",
scope="/explicit",
categories=["test"],
importance=0.7,
)
assert record is not None
assert record.scope == "/explicit"
def test_memory_root_scope_with_llm_inferred_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""When root_scope is set and scope is inferred by LLM, they combine."""
from crewai.memory.analyze import ExtractedMetadata, MemoryAnalysis
from crewai.memory.unified_memory import Memory
llm = MagicMock()
llm.supports_function_calling.return_value = True
llm.call.return_value = MemoryAnalysis(
suggested_scope="/quarterly-results",
categories=["finance"],
importance=0.8,
extracted_metadata=ExtractedMetadata(),
)
mem = Memory(
storage=str(tmp_path / "db"),
llm=llm,
embedder=mock_embedder,
root_scope="/flow/mypipeline",
)
# Don't provide scope - let LLM infer it
record = mem.remember("Q1 revenue was $1M")
assert record is not None
assert record.scope == "/flow/mypipeline/quarterly-results"
def test_memory_root_scope_per_call_override(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Per-call root_scope overrides instance-level root_scope."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/base",
)
record = mem.remember(
"Test content",
scope="/inner",
categories=["test"],
importance=0.7,
root_scope="/override/path", # Override instance-level
)
assert record is not None
assert record.scope == "/override/path/inner"
def test_remember_many_with_root_scope(
self, tmp_path: Path,
) -> None:
"""remember_many respects root_scope for all items."""
from crewai.memory.unified_memory import Memory
# Use distinct embeddings to avoid intra-batch dedup
call_count = 0
def distinct_embedder(texts: list[str]) -> list[list[float]]:
nonlocal call_count
result = []
for i, _ in enumerate(texts):
emb = [0.0] * 1536
emb[(call_count + i) % 1536] = 1.0
result.append(emb)
call_count += len(texts)
return result
mock_embedder = MagicMock(side_effect=distinct_embedder)
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/batch-crew",
)
mem.remember_many(
["Fact A", "Fact B"],
scope="/decisions",
categories=["test"],
importance=0.7,
)
mem.drain_writes()
records = mem.list_records()
assert len(records) == 2
for record in records:
assert record.scope == "/crew/batch-crew/decisions"
def test_remember_many_per_call_root_scope_override(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""remember_many accepts per-call root_scope override."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/default",
)
mem.remember_many(
["Fact A"],
scope="/inner",
categories=["test"],
importance=0.7,
root_scope="/agent/researcher", # Per-call override
)
mem.drain_writes()
records = mem.list_records()
assert len(records) == 1
assert records[0].scope == "/agent/researcher/inner"
class TestRootScopePathNormalization:
"""Tests for proper path normalization with root_scope."""
def test_no_double_slashes_in_result(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Final scope should not have double slashes."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/test/", # Trailing slash
)
record = mem.remember(
"Test",
scope="/inner/", # Both have slashes
categories=["test"],
importance=0.5,
)
assert record is not None
assert "//" not in record.scope
assert record.scope == "/crew/test/inner"
def test_leading_slash_always_present(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Final scope should always have leading slash."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="crew/test", # No leading slash
)
record = mem.remember(
"Test",
scope="inner", # No leading slash
categories=["test"],
importance=0.5,
)
assert record is not None
assert record.scope.startswith("/")
def test_root_scope_with_root_inner_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""When inner scope is '/', result is just the root_scope."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/test",
)
record = mem.remember(
"Test",
scope="/", # Root scope
categories=["test"],
importance=0.5,
)
assert record is not None
assert record.scope == "/crew/test"
class TestCrewAutoScoping:
"""Tests for automatic root_scope assignment in Crew."""
def test_crew_memory_true_sets_root_scope(self) -> None:
"""Creating Crew with memory=True auto-sets root_scope."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
agent = Agent(
role="Researcher",
goal="Research",
backstory="Expert researcher",
llm="gpt-4o-mini",
)
task = Task(
description="Do research",
expected_output="Report",
agent=agent,
)
crew = Crew(
name="Research Crew",
agents=[agent],
tasks=[task],
memory=True,
)
assert crew._memory is not None
assert hasattr(crew._memory, "root_scope")
assert crew._memory.root_scope == "/crew/research-crew"
def test_crew_memory_instance_gets_root_scope_if_not_set(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""User-provided Memory instance gets root_scope if not already set."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.unified_memory import Memory
from crewai.task import Task
# Memory without root_scope
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
)
assert mem.root_scope is None
agent = Agent(
role="Tester",
goal="Test",
backstory="Tester",
llm="gpt-4o-mini",
)
task = Task(
description="Test",
expected_output="Results",
agent=agent,
)
crew = Crew(
name="Test Crew",
agents=[agent],
tasks=[task],
memory=mem,
)
assert crew._memory is mem
assert crew._memory.root_scope == "/crew/test-crew"
def test_crew_respects_existing_root_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""User-provided Memory with existing root_scope is not overwritten."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.unified_memory import Memory
from crewai.task import Task
# Memory with explicit root_scope
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/custom/path",
)
agent = Agent(
role="Tester",
goal="Test",
backstory="Tester",
llm="gpt-4o-mini",
)
task = Task(
description="Test",
expected_output="Results",
agent=agent,
)
crew = Crew(
name="Test Crew",
agents=[agent],
tasks=[task],
memory=mem,
)
assert crew._memory.root_scope == "/custom/path" # Not overwritten
def test_crew_sanitizes_name_for_root_scope(self) -> None:
"""Crew name with special chars is sanitized for root_scope."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
agent = Agent(
role="Agent",
goal="Goal",
backstory="Story",
llm="gpt-4o-mini",
)
task = Task(
description="Task",
expected_output="Output",
agent=agent,
)
crew = Crew(
name="My Awesome Crew #1!",
agents=[agent],
tasks=[task],
memory=True,
)
assert crew._memory.root_scope == "/crew/my-awesome-crew-1"
class TestAgentScopeExtension:
"""Tests for agent scope extension in BaseAgentExecutorMixin."""
def test_agent_save_extends_crew_root_scope(self) -> None:
"""Agent._save_to_memory extends crew's root_scope with agent info."""
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
)
from crewai.agents.parser import AgentFinish
from crewai.utilities.printer import Printer
mock_memory = MagicMock()
mock_memory.read_only = False
mock_memory.root_scope = "/crew/research-crew"
mock_memory.extract_memories.return_value = ["Fact A"]
mock_agent = MagicMock()
mock_agent.memory = mock_memory
mock_agent._logger = MagicMock()
mock_agent.role = "Researcher"
mock_task = MagicMock()
mock_task.description = "Research task"
mock_task.expected_output = "Report"
class MinimalExecutor(CrewAgentExecutorMixin):
crew = None
agent = mock_agent
task = mock_task
iterations = 0
max_iter = 1
messages = []
_i18n = MagicMock()
_printer = Printer()
executor = MinimalExecutor()
executor._save_to_memory(AgentFinish(thought="", output="Result", text="Result"))
mock_memory.remember_many.assert_called_once()
call_kwargs = mock_memory.remember_many.call_args.kwargs
assert call_kwargs["root_scope"] == "/crew/research-crew/agent/researcher"
def test_agent_save_sanitizes_role(self) -> None:
"""Agent role with special chars is sanitized for scope path."""
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
)
from crewai.agents.parser import AgentFinish
from crewai.utilities.printer import Printer
mock_memory = MagicMock()
mock_memory.read_only = False
mock_memory.root_scope = "/crew/test"
mock_memory.extract_memories.return_value = ["Fact"]
mock_agent = MagicMock()
mock_agent.memory = mock_memory
mock_agent._logger = MagicMock()
mock_agent.role = "Senior Research Analyst #1"
mock_task = MagicMock()
mock_task.description = "Task"
mock_task.expected_output = "Output"
class MinimalExecutor(CrewAgentExecutorMixin):
crew = None
agent = mock_agent
task = mock_task
iterations = 0
max_iter = 1
messages = []
_i18n = MagicMock()
_printer = Printer()
executor = MinimalExecutor()
executor._save_to_memory(AgentFinish(thought="", output="R", text="R"))
call_kwargs = mock_memory.remember_many.call_args.kwargs
assert call_kwargs["root_scope"] == "/crew/test/agent/senior-research-analyst-1"
class TestFlowAutoScoping:
"""Tests for automatic root_scope assignment in Flow."""
def test_flow_auto_memory_sets_root_scope(self) -> None:
"""Flow auto-creates memory with root_scope set to /flow/<class_name>."""
from crewai.flow.flow import Flow
from crewai.memory.unified_memory import Memory
class MyPipelineFlow(Flow):
pass
flow = MyPipelineFlow()
assert flow.memory is not None
assert isinstance(flow.memory, Memory)
assert flow.memory.root_scope == "/flow/mypipelineflow"
def test_flow_with_name_uses_name_for_root_scope(self) -> None:
"""Flow with custom name uses that name for root_scope."""
from crewai.flow.flow import Flow
from crewai.memory.unified_memory import Memory
class MyFlow(Flow):
name = "Custom Pipeline"
flow = MyFlow()
assert flow.memory is not None
assert isinstance(flow.memory, Memory)
assert flow.memory.root_scope == "/flow/custom-pipeline"
def test_flow_user_provided_memory_not_overwritten(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""User-provided memory on Flow is not modified."""
from crewai.flow.flow import Flow
from crewai.memory.unified_memory import Memory
user_memory = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/custom/scope",
)
class MyFlow(Flow):
memory = user_memory
flow = MyFlow()
assert flow.memory is user_memory
assert flow.memory.root_scope == "/custom/scope"
class TestBackwardCompatibility:
"""Tests ensuring backward compatibility with existing behavior."""
def test_memory_without_root_scope_works_normally(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Memory without root_scope behaves exactly as before."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
)
assert mem.root_scope is None
record = mem.remember(
"Test content",
scope="/explicit",
categories=["test"],
importance=0.7,
)
assert record.scope == "/explicit"
def test_crew_without_name_uses_default(self) -> None:
"""Crew without name uses 'crew' as default for root_scope."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
agent = Agent(
role="Agent",
goal="Goal",
backstory="Story",
llm="gpt-4o-mini",
)
task = Task(
description="Task",
expected_output="Output",
agent=agent,
)
# No name provided - uses default "crew"
crew = Crew(
agents=[agent],
tasks=[task],
memory=True,
)
assert crew._memory.root_scope == "/crew/crew"
def test_old_memories_at_root_still_accessible(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Old memories stored at '/' are still accessible."""
from crewai.memory.unified_memory import Memory
# Create memory and store at root (old behavior)
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
)
record = mem.remember(
"Old memory at root",
scope="/",
categories=["old"],
importance=0.5,
)
assert record.scope == "/"
# Recall from root should find it
matches = mem.recall("Old memory", scope="/", depth="shallow")
assert len(matches) >= 1
class TestEncodingFlowRootScope:
"""Tests for root_scope handling in EncodingFlow."""
def test_encoding_flow_fast_path_with_root_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Group A (fast path) items properly prepend root_scope."""
from crewai.memory.encoding_flow import ItemState
# Test _apply_defaults directly on an ItemState without going through Flow
# since Flow.state is a property without a setter
item = ItemState(
content="Test",
scope="/inner", # Explicit
categories=["cat"], # Explicit
importance=0.5, # Explicit
root_scope="/crew/test",
)
# Manually test the join_scope_paths logic that _apply_defaults uses
from crewai.memory.utils import join_scope_paths
inner_scope = item.scope or "/"
if item.root_scope:
resolved = join_scope_paths(item.root_scope, inner_scope)
else:
resolved = inner_scope
assert resolved == "/crew/test/inner"
def test_encoding_flow_llm_path_with_root_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Group C (LLM path) items properly prepend root_scope to inferred scope."""
from crewai.memory.analyze import ExtractedMetadata, MemoryAnalysis
from crewai.memory.unified_memory import Memory
llm = MagicMock()
llm.supports_function_calling.return_value = True
llm.call.return_value = MemoryAnalysis(
suggested_scope="/llm-inferred",
categories=["auto"],
importance=0.7,
extracted_metadata=ExtractedMetadata(),
)
mem = Memory(
storage=str(tmp_path / "db"),
llm=llm,
embedder=mock_embedder,
root_scope="/flow/pipeline",
)
# No explicit scope/categories/importance -> goes through LLM
record = mem.remember("Content for LLM analysis")
assert record is not None
assert record.scope == "/flow/pipeline/llm-inferred"
class TestMemoryScopeWithRootScope:
"""Tests for MemoryScope interaction with root_scope."""
def test_memory_scope_remembers_within_root_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""MemoryScope with underlying Memory that has root_scope works correctly."""
from crewai.memory.memory_scope import MemoryScope
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/test",
)
# Create a MemoryScope
scope = MemoryScope(memory=mem, root_path="/agent/1")
# Remember through the scope
record = scope.remember(
"Scoped content",
scope="/task", # Inner scope within MemoryScope
categories=["test"],
importance=0.5,
)
# The MemoryScope prepends its root_path, then Memory prepends root_scope
# MemoryScope.remember prepends /agent/1 to /task -> /agent/1/task
# Then Memory's root_scope /crew/test gets prepended by encoding flow
# Final: /crew/test/agent/1/task
assert record is not None
# Note: MemoryScope builds the scope before calling memory.remember
# So the scope it passes is /agent/1/task, which then gets root_scope prepended
assert record.scope.startswith("/crew/test/agent/1")