Compare commits

...

1 Commits

Author SHA1 Message Date
Joao Moura
92d0694a54 feat: automatic root_scope for hierarchical memory isolation
Crews and flows now automatically scope their memories hierarchically.
The encoding flow's LLM-inferred scope becomes a sub-scope under the
structural root, preventing memory pollution across crews/agents.

Scope hierarchy:
  /crew/{crew_name}/agent/{agent_role}/{llm-inferred}
  /flow/{flow_name}/{llm-inferred}

Changes:
- Memory class: new root_scope field, passed through remember/remember_many
- EncodingFlow: prepends root_scope to resolved scope in both fast path
  (Group A) and LLM path (Group C/D)
- Crew: auto-sets root_scope=/crew/{sanitized_name} on memory creation
- Agent executor: extends crew root with /agent/{sanitized_role} per save
- Flow: auto-sets root_scope=/flow/{sanitized_name} on memory creation
- New utils: sanitize_scope_name, normalize_scope_path, join_scope_paths

Backward compatible — no root_scope means no prefix (existing behavior).
Old memories at '/' remain accessible.

51 new tests, all existing tests pass.
2026-03-23 19:59:31 -07:00
7 changed files with 1074 additions and 11 deletions

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from crewai.agents.parser import AgentFinish
from crewai.memory.utils import sanitize_scope_name
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import sanitize_tool_name
@@ -26,7 +27,12 @@ class CrewAgentExecutorMixin:
_printer: Printer = Printer()
def _save_to_memory(self, output: AgentFinish) -> None:
"""Save task result to unified memory (memory or crew._memory)."""
"""Save task result to unified memory (memory or crew._memory).
Extends the memory's root_scope with agent-specific path segment
(e.g., '/crew/research-crew/agent/researcher') so that agent memories
are scoped hierarchically under their crew.
"""
memory = getattr(self.agent, "memory", None) or (
getattr(self.crew, "_memory", None) if self.crew else None
)
@@ -43,6 +49,20 @@ class CrewAgentExecutorMixin:
)
extracted = memory.extract_memories(raw)
if extracted:
memory.remember_many(extracted, agent_role=self.agent.role)
# Build agent-specific root_scope that extends the crew's root
agent_role = self.agent.role or "unknown"
sanitized_role = sanitize_scope_name(agent_role)
# Get the memory's existing root_scope and extend with agent info
base_root = getattr(memory, "root_scope", None) or ""
# Construct agent root: base_root + /agent/<role>
agent_root = f"{base_root.rstrip('/')}/agent/{sanitized_role}"
# Ensure leading slash
if not agent_root.startswith("/"):
agent_root = "/" + agent_root
memory.remember_many(
extracted, agent_role=self.agent.role, root_scope=agent_root
)
except Exception as e:
self.agent._logger.log("error", f"Failed to save to memory: {e}")

View File

@@ -357,7 +357,18 @@ class Crew(FlowTrackable, BaseModel):
@model_validator(mode="after")
def create_crew_memory(self) -> Crew:
"""Initialize unified memory, respecting crew embedder config."""
"""Initialize unified memory, respecting crew embedder config.
When memory is enabled, sets a hierarchical root_scope based on the
crew name (e.g. '/crew/research-crew') so that all memories saved by
this crew and its agents are organized under a consistent namespace.
"""
from crewai.memory.utils import sanitize_scope_name
# Compute sanitized crew name for root_scope
crew_name = sanitize_scope_name(self.name or "crew")
crew_root_scope = f"/crew/{crew_name}"
if self.memory is True:
from crewai.memory.unified_memory import Memory
@@ -366,10 +377,13 @@ class Crew(FlowTrackable, BaseModel):
from crewai.rag.embeddings.factory import build_embedder
embedder = build_embedder(self.embedder) # type: ignore[arg-type]
self._memory = Memory(embedder=embedder)
self._memory = Memory(embedder=embedder, root_scope=crew_root_scope)
elif self.memory:
# User passed a Memory / MemoryScope / MemorySlice instance
self._memory = self.memory
# Set root_scope only if not already set (don't override user config)
if hasattr(self._memory, "root_scope") and self._memory.root_scope is None:
self._memory.root_scope = crew_root_scope
else:
self._memory = None

View File

@@ -905,7 +905,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Internal flows (RecallFlow, EncodingFlow) set _skip_auto_memory
# to avoid creating a wasteful standalone Memory instance.
if self.memory is None and not getattr(self, "_skip_auto_memory", False):
self.memory = Memory()
from crewai.memory.utils import sanitize_scope_name
flow_name = sanitize_scope_name(self.name or self.__class__.__name__)
self.memory = Memory(root_scope=f"/flow/{flow_name}")
# Register all flow-related methods
for method_name in dir(self):

View File

@@ -28,6 +28,7 @@ from crewai.memory.analyze import (
analyze_for_save,
)
from crewai.memory.types import MemoryConfig, MemoryRecord, embed_texts
from crewai.memory.utils import join_scope_paths
logger = logging.getLogger(__name__)
@@ -48,6 +49,8 @@ class ItemState(BaseModel):
importance: float | None = None
source: str | None = None
private: bool = False
# Structural root scope prefix for hierarchical scoping
root_scope: str | None = None
# Resolved values
resolved_scope: str = "/"
resolved_categories: list[str] = Field(default_factory=list)
@@ -103,12 +106,24 @@ class EncodingFlow(Flow[EncodingState]):
llm: Any,
embedder: Any,
config: MemoryConfig | None = None,
root_scope: str | None = None,
) -> None:
"""Initialize the encoding flow.
Args:
storage: Storage backend for persisting memories.
llm: LLM instance for analysis.
embedder: Embedder for generating vectors.
config: Optional memory configuration.
root_scope: Structural root scope prefix. LLM-inferred or explicit
scopes are nested under this root.
"""
super().__init__(suppress_flow_events=True)
self._storage = storage
self._llm = llm
self._embedder = embedder
self._config = config or MemoryConfig()
self._root_scope = root_scope
# ------------------------------------------------------------------
# Step 1: Batch embed (ONE embedder call)
@@ -321,7 +336,13 @@ class EncodingFlow(Flow[EncodingState]):
for i, future in save_futures.items():
analysis = future.result()
item = items[i]
item.resolved_scope = item.scope or analysis.suggested_scope or "/"
# Determine inner scope from explicit scope or LLM-inferred
inner_scope = item.scope or analysis.suggested_scope or "/"
# Join root_scope with inner scope if root_scope is set
if item.root_scope:
item.resolved_scope = join_scope_paths(item.root_scope, inner_scope)
else:
item.resolved_scope = inner_scope
item.resolved_categories = (
item.categories
if item.categories is not None
@@ -353,8 +374,18 @@ class EncodingFlow(Flow[EncodingState]):
pool.shutdown(wait=False)
def _apply_defaults(self, item: ItemState) -> None:
"""Apply caller values with config defaults (fast path)."""
item.resolved_scope = item.scope or "/"
"""Apply caller values with config defaults (fast path).
If root_scope is set, prepends it to the inner scope to create the
final resolved_scope.
"""
inner_scope = item.scope or "/"
# Join root_scope with inner scope if root_scope is set
if item.root_scope:
item.resolved_scope = join_scope_paths(item.root_scope, inner_scope)
else:
item.resolved_scope = inner_scope if inner_scope != "/" else "/"
item.resolved_categories = item.categories or []
item.resolved_metadata = item.metadata or {}
item.resolved_importance = (

View File

@@ -126,6 +126,14 @@ class Memory(BaseModel):
default=False,
description="If True, remember() and remember_many() are silent no-ops.",
)
root_scope: str | None = Field(
default=None,
description=(
"Structural root scope prefix. When set, LLM-inferred or explicit scopes "
"are nested under this root. For example, a crew with root_scope='/crew/research' "
"will store memories at '/crew/research/<inferred_scope>'."
),
)
_config: MemoryConfig = PrivateAttr()
_llm_instance: BaseLLM | None = PrivateAttr(default=None)
@@ -297,11 +305,26 @@ class Memory(BaseModel):
importance: float | None = None,
source: str | None = None,
private: bool = False,
root_scope: str | None = None,
) -> list[MemoryRecord]:
"""Run the batch EncodingFlow for one or more items. No event emission.
This is the core encoding logic shared by ``remember()`` and
``remember_many()``. Events are managed by the calling method.
Args:
contents: List of text content to encode and store.
scope: Optional explicit scope (inner scope, nested under root_scope).
categories: Optional categories for all items.
metadata: Optional metadata for all items.
importance: Optional importance score for all items.
source: Optional source identifier for all items.
private: Whether items are private.
root_scope: Structural root scope prefix. LLM-inferred or explicit
scopes are nested under this root.
Returns:
List of created MemoryRecord instances.
"""
from crewai.memory.encoding_flow import EncodingFlow
@@ -310,6 +333,7 @@ class Memory(BaseModel):
llm=self._llm,
embedder=self._embedder,
config=self._config,
root_scope=root_scope,
)
items_input = [
{
@@ -320,6 +344,7 @@ class Memory(BaseModel):
"importance": importance,
"source": source,
"private": private,
"root_scope": root_scope,
}
for c in contents
]
@@ -340,6 +365,7 @@ class Memory(BaseModel):
source: str | None = None,
private: bool = False,
agent_role: str | None = None,
root_scope: str | None = None,
) -> MemoryRecord | None:
"""Store a single item in memory (synchronous).
@@ -349,13 +375,15 @@ class Memory(BaseModel):
Args:
content: Text to remember.
scope: Optional scope path; inferred if None.
scope: Optional scope path (inner scope); inferred if None.
categories: Optional categories; inferred if None.
metadata: Optional metadata; merged with LLM-extracted if inferred.
importance: Optional importance 0-1; inferred if None.
source: Optional provenance identifier (e.g. user ID, session ID).
private: If True, only visible to recall from the same source.
agent_role: Optional agent role for event metadata.
root_scope: Optional root scope override. If provided, this overrides
the instance-level root_scope for this call only.
Returns:
The created MemoryRecord, or None if this memory is read-only.
@@ -365,6 +393,10 @@ class Memory(BaseModel):
"""
if self.read_only:
return None
# Determine effective root_scope: per-call override takes precedence
effective_root = root_scope if root_scope is not None else self.root_scope
_source_type = "unified_memory"
try:
crewai_event_bus.emit(
@@ -388,6 +420,7 @@ class Memory(BaseModel):
importance,
source,
private,
effective_root,
)
records = future.result()
record = records[0] if records else None
@@ -426,6 +459,7 @@ class Memory(BaseModel):
source: str | None = None,
private: bool = False,
agent_role: str | None = None,
root_scope: str | None = None,
) -> list[MemoryRecord]:
"""Store multiple items in memory (non-blocking).
@@ -440,13 +474,15 @@ class Memory(BaseModel):
Args:
contents: List of text items to remember.
scope: Optional scope applied to all items.
scope: Optional scope (inner scope) applied to all items.
categories: Optional categories applied to all items.
metadata: Optional metadata applied to all items.
importance: Optional importance applied to all items.
source: Optional provenance identifier applied to all items.
private: Privacy flag applied to all items.
agent_role: Optional agent role for event metadata.
root_scope: Optional root scope override. If provided, this overrides
the instance-level root_scope for this call only.
Returns:
Empty list (records are not available until the background save completes).
@@ -454,6 +490,9 @@ class Memory(BaseModel):
if not contents or self.read_only:
return []
# Determine effective root_scope: per-call override takes precedence
effective_root = root_scope if root_scope is not None else self.root_scope
self._submit_save(
self._background_encode_batch,
contents,
@@ -464,6 +503,7 @@ class Memory(BaseModel):
source,
private,
agent_role,
effective_root,
)
return []
@@ -477,6 +517,7 @@ class Memory(BaseModel):
source: str | None,
private: bool,
agent_role: str | None,
root_scope: str | None = None,
) -> list[MemoryRecord]:
"""Run the encoding pipeline in a background thread with event emission.
@@ -486,6 +527,20 @@ class Memory(BaseModel):
All ``emit`` calls are wrapped in try/except to handle the case where
the event bus shuts down before the background save finishes (e.g.
during process exit).
Args:
contents: List of text content to encode.
scope: Optional inner scope for all items.
categories: Optional categories for all items.
metadata: Optional metadata for all items.
importance: Optional importance for all items.
source: Optional source identifier for all items.
private: Whether items are private.
agent_role: Optional agent role for event metadata.
root_scope: Optional root scope prefix for hierarchical scoping.
Returns:
List of created MemoryRecord instances.
"""
try:
crewai_event_bus.emit(
@@ -502,7 +557,14 @@ class Memory(BaseModel):
try:
start = time.perf_counter()
records = self._encode_batch(
contents, scope, categories, metadata, importance, source, private
contents,
scope,
categories,
metadata,
importance,
source,
private,
root_scope,
)
elapsed_ms = (time.perf_counter() - start) * 1000
except RuntimeError:

View File

@@ -0,0 +1,110 @@
"""Utility functions for the unified memory system."""
from __future__ import annotations
import re
def sanitize_scope_name(name: str) -> str:
"""Sanitize a name for use in hierarchical scope paths.
Converts to lowercase, replaces non-alphanumeric chars (except underscore
and hyphen) with hyphens, collapses multiple hyphens, strips leading/trailing
hyphens.
Args:
name: The raw name to sanitize (e.g. crew name, agent role, flow class name).
Returns:
A sanitized string safe for use in scope paths. Returns 'unknown' if the
result would be empty.
Examples:
>>> sanitize_scope_name("Research Crew")
'research-crew'
>>> sanitize_scope_name("Agent #1 (Main)")
'agent-1-main'
>>> sanitize_scope_name("café_worker")
'caf-worker'
"""
if not name:
return "unknown"
name = name.lower().strip()
# Replace any character that's not alphanumeric, underscore, or hyphen with hyphen
name = re.sub(r"[^a-z0-9_-]", "-", name)
# Collapse multiple hyphens into one
name = re.sub(r"-+", "-", name)
# Strip leading/trailing hyphens
name = name.strip("-")
return name or "unknown"
def normalize_scope_path(path: str) -> str:
"""Normalize a scope path by removing double slashes and ensuring proper format.
Args:
path: The raw scope path (e.g. '/crew/MyCrewName//agent//role').
Returns:
A normalized path with leading slash, no trailing slash, no double slashes.
Returns '/' for empty or root-only paths.
Examples:
>>> normalize_scope_path("/crew/test//agent//")
'/crew/test/agent'
>>> normalize_scope_path("")
'/'
>>> normalize_scope_path("crew/test")
'/crew/test'
"""
if not path or path == "/":
return "/"
# Collapse multiple slashes
path = re.sub(r"/+", "/", path)
# Ensure leading slash
if not path.startswith("/"):
path = "/" + path
# Remove trailing slash (unless it's just '/')
if len(path) > 1:
path = path.rstrip("/")
return path
def join_scope_paths(root: str | None, inner: str | None) -> str:
"""Join a root scope with an inner scope, handling edge cases properly.
Args:
root: The root scope prefix (e.g. '/crew/research-crew').
inner: The inner scope (e.g. '/market-trends' or 'market-trends').
Returns:
The combined, normalized scope path.
Examples:
>>> join_scope_paths("/crew/test", "/market-trends")
'/crew/test/market-trends'
>>> join_scope_paths("/crew/test", "market-trends")
'/crew/test/market-trends'
>>> join_scope_paths("/crew/test", "/")
'/crew/test'
>>> join_scope_paths("/crew/test", None)
'/crew/test'
>>> join_scope_paths(None, "/market-trends")
'/market-trends'
>>> join_scope_paths(None, None)
'/'
"""
# Normalize both parts
root = root.rstrip("/") if root else ""
inner = inner.strip("/") if inner else ""
if root and inner:
result = f"{root}/{inner}"
elif root:
result = root
elif inner:
result = f"/{inner}"
else:
result = "/"
return normalize_scope_path(result)

View File

@@ -0,0 +1,823 @@
"""Tests for hierarchical root_scope functionality in unified memory.
Root scope is a structural prefix that is set automatically by crews and flows.
The LLM's encoding flow still infers a semantic inner scope, but the final
resolved scope = root_scope + '/' + llm_inferred_scope.
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from crewai.memory.types import MemoryRecord
from crewai.memory.utils import (
join_scope_paths,
normalize_scope_path,
sanitize_scope_name,
)
# --- Utility function tests ---
class TestSanitizeScopeName:
"""Tests for sanitize_scope_name utility."""
def test_simple_name(self) -> None:
assert sanitize_scope_name("research") == "research"
def test_name_with_spaces(self) -> None:
assert sanitize_scope_name("Research Crew") == "research-crew"
def test_name_with_special_chars(self) -> None:
assert sanitize_scope_name("Agent #1 (Main)") == "agent-1-main"
def test_name_with_unicode(self) -> None:
# Unicode characters get replaced with hyphens
result = sanitize_scope_name("café_worker")
# é becomes -, and the underscore is preserved, so café_worker -> caf-_worker
assert result == "caf-_worker"
def test_name_with_underscores(self) -> None:
# Underscores are preserved
assert sanitize_scope_name("test_agent") == "test_agent"
def test_name_with_hyphens(self) -> None:
assert sanitize_scope_name("my-crew") == "my-crew"
def test_multiple_spaces_collapsed(self) -> None:
assert sanitize_scope_name("foo bar") == "foo-bar"
def test_leading_trailing_spaces(self) -> None:
assert sanitize_scope_name(" crew ") == "crew"
def test_empty_string_returns_unknown(self) -> None:
assert sanitize_scope_name("") == "unknown"
def test_only_special_chars_returns_unknown(self) -> None:
assert sanitize_scope_name("@#$%") == "unknown"
def test_none_input_returns_unknown(self) -> None:
assert sanitize_scope_name(None) == "unknown" # type: ignore[arg-type]
class TestNormalizeScopePath:
"""Tests for normalize_scope_path utility."""
def test_simple_path(self) -> None:
assert normalize_scope_path("/crew/test") == "/crew/test"
def test_double_slashes_collapsed(self) -> None:
assert normalize_scope_path("/crew//test//agent") == "/crew/test/agent"
def test_trailing_slash_removed(self) -> None:
assert normalize_scope_path("/crew/test/") == "/crew/test"
def test_missing_leading_slash_added(self) -> None:
assert normalize_scope_path("crew/test") == "/crew/test"
def test_empty_string_returns_root(self) -> None:
assert normalize_scope_path("") == "/"
def test_root_only_returns_root(self) -> None:
assert normalize_scope_path("/") == "/"
def test_multiple_trailing_slashes(self) -> None:
assert normalize_scope_path("/crew///") == "/crew"
class TestJoinScopePaths:
"""Tests for join_scope_paths utility."""
def test_basic_join(self) -> None:
assert join_scope_paths("/crew/test", "/market-trends") == "/crew/test/market-trends"
def test_inner_without_leading_slash(self) -> None:
assert join_scope_paths("/crew/test", "market-trends") == "/crew/test/market-trends"
def test_root_with_trailing_slash(self) -> None:
assert join_scope_paths("/crew/test/", "/inner") == "/crew/test/inner"
def test_root_only_inner_slash(self) -> None:
assert join_scope_paths("/crew/test", "/") == "/crew/test"
def test_root_only_inner_none(self) -> None:
assert join_scope_paths("/crew/test", None) == "/crew/test"
def test_no_root_with_inner(self) -> None:
assert join_scope_paths(None, "/market-trends") == "/market-trends"
def test_both_none(self) -> None:
assert join_scope_paths(None, None) == "/"
def test_empty_strings(self) -> None:
assert join_scope_paths("", "") == "/"
def test_root_empty_inner_value(self) -> None:
assert join_scope_paths("", "inner") == "/inner"
# --- Memory root_scope tests ---
@pytest.fixture
def mock_embedder() -> MagicMock:
"""Embedder mock that returns one embedding per input text (batch-aware)."""
m = MagicMock()
m.side_effect = lambda texts: [[0.1] * 1536 for _ in texts]
return m
class TestMemoryRootScope:
"""Tests for Memory class root_scope field."""
def test_memory_with_root_scope_prepends_to_explicit_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""When root_scope is set and explicit scope is provided, they combine."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/research-crew",
)
record = mem.remember(
"Test content",
scope="/market-trends",
categories=["test"],
importance=0.7,
)
assert record is not None
assert record.scope == "/crew/research-crew/market-trends"
def test_memory_without_root_scope_uses_explicit_scope_directly(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""When root_scope is None, explicit scope is used as-is (backward compat)."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
)
record = mem.remember(
"Test content",
scope="/explicit",
categories=["test"],
importance=0.7,
)
assert record is not None
assert record.scope == "/explicit"
def test_memory_root_scope_with_llm_inferred_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""When root_scope is set and scope is inferred by LLM, they combine."""
from crewai.memory.analyze import ExtractedMetadata, MemoryAnalysis
from crewai.memory.unified_memory import Memory
llm = MagicMock()
llm.supports_function_calling.return_value = True
llm.call.return_value = MemoryAnalysis(
suggested_scope="/quarterly-results",
categories=["finance"],
importance=0.8,
extracted_metadata=ExtractedMetadata(),
)
mem = Memory(
storage=str(tmp_path / "db"),
llm=llm,
embedder=mock_embedder,
root_scope="/flow/mypipeline",
)
# Don't provide scope - let LLM infer it
record = mem.remember("Q1 revenue was $1M")
assert record is not None
assert record.scope == "/flow/mypipeline/quarterly-results"
def test_memory_root_scope_per_call_override(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Per-call root_scope overrides instance-level root_scope."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/base",
)
record = mem.remember(
"Test content",
scope="/inner",
categories=["test"],
importance=0.7,
root_scope="/override/path", # Override instance-level
)
assert record is not None
assert record.scope == "/override/path/inner"
def test_remember_many_with_root_scope(
self, tmp_path: Path,
) -> None:
"""remember_many respects root_scope for all items."""
from crewai.memory.unified_memory import Memory
# Use distinct embeddings to avoid intra-batch dedup
call_count = 0
def distinct_embedder(texts: list[str]) -> list[list[float]]:
nonlocal call_count
result = []
for i, _ in enumerate(texts):
emb = [0.0] * 1536
emb[(call_count + i) % 1536] = 1.0
result.append(emb)
call_count += len(texts)
return result
mock_embedder = MagicMock(side_effect=distinct_embedder)
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/batch-crew",
)
mem.remember_many(
["Fact A", "Fact B"],
scope="/decisions",
categories=["test"],
importance=0.7,
)
mem.drain_writes()
records = mem.list_records()
assert len(records) == 2
for record in records:
assert record.scope == "/crew/batch-crew/decisions"
def test_remember_many_per_call_root_scope_override(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""remember_many accepts per-call root_scope override."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/default",
)
mem.remember_many(
["Fact A"],
scope="/inner",
categories=["test"],
importance=0.7,
root_scope="/agent/researcher", # Per-call override
)
mem.drain_writes()
records = mem.list_records()
assert len(records) == 1
assert records[0].scope == "/agent/researcher/inner"
class TestRootScopePathNormalization:
"""Tests for proper path normalization with root_scope."""
def test_no_double_slashes_in_result(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Final scope should not have double slashes."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/test/", # Trailing slash
)
record = mem.remember(
"Test",
scope="/inner/", # Both have slashes
categories=["test"],
importance=0.5,
)
assert record is not None
assert "//" not in record.scope
assert record.scope == "/crew/test/inner"
def test_leading_slash_always_present(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Final scope should always have leading slash."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="crew/test", # No leading slash
)
record = mem.remember(
"Test",
scope="inner", # No leading slash
categories=["test"],
importance=0.5,
)
assert record is not None
assert record.scope.startswith("/")
def test_root_scope_with_root_inner_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""When inner scope is '/', result is just the root_scope."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/test",
)
record = mem.remember(
"Test",
scope="/", # Root scope
categories=["test"],
importance=0.5,
)
assert record is not None
assert record.scope == "/crew/test"
class TestCrewAutoScoping:
"""Tests for automatic root_scope assignment in Crew."""
def test_crew_memory_true_sets_root_scope(self) -> None:
"""Creating Crew with memory=True auto-sets root_scope."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
agent = Agent(
role="Researcher",
goal="Research",
backstory="Expert researcher",
llm="gpt-4o-mini",
)
task = Task(
description="Do research",
expected_output="Report",
agent=agent,
)
crew = Crew(
name="Research Crew",
agents=[agent],
tasks=[task],
memory=True,
)
assert crew._memory is not None
assert hasattr(crew._memory, "root_scope")
assert crew._memory.root_scope == "/crew/research-crew"
def test_crew_memory_instance_gets_root_scope_if_not_set(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""User-provided Memory instance gets root_scope if not already set."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.unified_memory import Memory
from crewai.task import Task
# Memory without root_scope
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
)
assert mem.root_scope is None
agent = Agent(
role="Tester",
goal="Test",
backstory="Tester",
llm="gpt-4o-mini",
)
task = Task(
description="Test",
expected_output="Results",
agent=agent,
)
crew = Crew(
name="Test Crew",
agents=[agent],
tasks=[task],
memory=mem,
)
assert crew._memory is mem
assert crew._memory.root_scope == "/crew/test-crew"
def test_crew_respects_existing_root_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""User-provided Memory with existing root_scope is not overwritten."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.unified_memory import Memory
from crewai.task import Task
# Memory with explicit root_scope
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/custom/path",
)
agent = Agent(
role="Tester",
goal="Test",
backstory="Tester",
llm="gpt-4o-mini",
)
task = Task(
description="Test",
expected_output="Results",
agent=agent,
)
crew = Crew(
name="Test Crew",
agents=[agent],
tasks=[task],
memory=mem,
)
assert crew._memory.root_scope == "/custom/path" # Not overwritten
def test_crew_sanitizes_name_for_root_scope(self) -> None:
"""Crew name with special chars is sanitized for root_scope."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
agent = Agent(
role="Agent",
goal="Goal",
backstory="Story",
llm="gpt-4o-mini",
)
task = Task(
description="Task",
expected_output="Output",
agent=agent,
)
crew = Crew(
name="My Awesome Crew #1!",
agents=[agent],
tasks=[task],
memory=True,
)
assert crew._memory.root_scope == "/crew/my-awesome-crew-1"
class TestAgentScopeExtension:
"""Tests for agent scope extension in BaseAgentExecutorMixin."""
def test_agent_save_extends_crew_root_scope(self) -> None:
"""Agent._save_to_memory extends crew's root_scope with agent info."""
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
)
from crewai.agents.parser import AgentFinish
from crewai.utilities.printer import Printer
mock_memory = MagicMock()
mock_memory.read_only = False
mock_memory.root_scope = "/crew/research-crew"
mock_memory.extract_memories.return_value = ["Fact A"]
mock_agent = MagicMock()
mock_agent.memory = mock_memory
mock_agent._logger = MagicMock()
mock_agent.role = "Researcher"
mock_task = MagicMock()
mock_task.description = "Research task"
mock_task.expected_output = "Report"
class MinimalExecutor(CrewAgentExecutorMixin):
crew = None
agent = mock_agent
task = mock_task
iterations = 0
max_iter = 1
messages = []
_i18n = MagicMock()
_printer = Printer()
executor = MinimalExecutor()
executor._save_to_memory(AgentFinish(thought="", output="Result", text="Result"))
mock_memory.remember_many.assert_called_once()
call_kwargs = mock_memory.remember_many.call_args.kwargs
assert call_kwargs["root_scope"] == "/crew/research-crew/agent/researcher"
def test_agent_save_sanitizes_role(self) -> None:
"""Agent role with special chars is sanitized for scope path."""
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
)
from crewai.agents.parser import AgentFinish
from crewai.utilities.printer import Printer
mock_memory = MagicMock()
mock_memory.read_only = False
mock_memory.root_scope = "/crew/test"
mock_memory.extract_memories.return_value = ["Fact"]
mock_agent = MagicMock()
mock_agent.memory = mock_memory
mock_agent._logger = MagicMock()
mock_agent.role = "Senior Research Analyst #1"
mock_task = MagicMock()
mock_task.description = "Task"
mock_task.expected_output = "Output"
class MinimalExecutor(CrewAgentExecutorMixin):
crew = None
agent = mock_agent
task = mock_task
iterations = 0
max_iter = 1
messages = []
_i18n = MagicMock()
_printer = Printer()
executor = MinimalExecutor()
executor._save_to_memory(AgentFinish(thought="", output="R", text="R"))
call_kwargs = mock_memory.remember_many.call_args.kwargs
assert call_kwargs["root_scope"] == "/crew/test/agent/senior-research-analyst-1"
class TestFlowAutoScoping:
"""Tests for automatic root_scope assignment in Flow."""
def test_flow_auto_memory_sets_root_scope(self) -> None:
"""Flow auto-creates memory with root_scope set to /flow/<class_name>."""
from crewai.flow.flow import Flow
from crewai.memory.unified_memory import Memory
class MyPipelineFlow(Flow):
pass
flow = MyPipelineFlow()
assert flow.memory is not None
assert isinstance(flow.memory, Memory)
assert flow.memory.root_scope == "/flow/mypipelineflow"
def test_flow_with_name_uses_name_for_root_scope(self) -> None:
"""Flow with custom name uses that name for root_scope."""
from crewai.flow.flow import Flow
from crewai.memory.unified_memory import Memory
class MyFlow(Flow):
name = "Custom Pipeline"
flow = MyFlow()
assert flow.memory is not None
assert isinstance(flow.memory, Memory)
assert flow.memory.root_scope == "/flow/custom-pipeline"
def test_flow_user_provided_memory_not_overwritten(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""User-provided memory on Flow is not modified."""
from crewai.flow.flow import Flow
from crewai.memory.unified_memory import Memory
user_memory = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/custom/scope",
)
class MyFlow(Flow):
memory = user_memory
flow = MyFlow()
assert flow.memory is user_memory
assert flow.memory.root_scope == "/custom/scope"
class TestBackwardCompatibility:
"""Tests ensuring backward compatibility with existing behavior."""
def test_memory_without_root_scope_works_normally(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Memory without root_scope behaves exactly as before."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
)
assert mem.root_scope is None
record = mem.remember(
"Test content",
scope="/explicit",
categories=["test"],
importance=0.7,
)
assert record.scope == "/explicit"
def test_crew_without_name_uses_default(self) -> None:
"""Crew without name uses 'crew' as default for root_scope."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
agent = Agent(
role="Agent",
goal="Goal",
backstory="Story",
llm="gpt-4o-mini",
)
task = Task(
description="Task",
expected_output="Output",
agent=agent,
)
# No name provided - uses default "crew"
crew = Crew(
agents=[agent],
tasks=[task],
memory=True,
)
assert crew._memory.root_scope == "/crew/crew"
def test_old_memories_at_root_still_accessible(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Old memories stored at '/' are still accessible."""
from crewai.memory.unified_memory import Memory
# Create memory and store at root (old behavior)
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
)
record = mem.remember(
"Old memory at root",
scope="/",
categories=["old"],
importance=0.5,
)
assert record.scope == "/"
# Recall from root should find it
matches = mem.recall("Old memory", scope="/", depth="shallow")
assert len(matches) >= 1
class TestEncodingFlowRootScope:
"""Tests for root_scope handling in EncodingFlow."""
def test_encoding_flow_fast_path_with_root_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Group A (fast path) items properly prepend root_scope."""
from crewai.memory.encoding_flow import ItemState
# Test _apply_defaults directly on an ItemState without going through Flow
# since Flow.state is a property without a setter
item = ItemState(
content="Test",
scope="/inner", # Explicit
categories=["cat"], # Explicit
importance=0.5, # Explicit
root_scope="/crew/test",
)
# Manually test the join_scope_paths logic that _apply_defaults uses
from crewai.memory.utils import join_scope_paths
inner_scope = item.scope or "/"
if item.root_scope:
resolved = join_scope_paths(item.root_scope, inner_scope)
else:
resolved = inner_scope
assert resolved == "/crew/test/inner"
def test_encoding_flow_llm_path_with_root_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Group C (LLM path) items properly prepend root_scope to inferred scope."""
from crewai.memory.analyze import ExtractedMetadata, MemoryAnalysis
from crewai.memory.unified_memory import Memory
llm = MagicMock()
llm.supports_function_calling.return_value = True
llm.call.return_value = MemoryAnalysis(
suggested_scope="/llm-inferred",
categories=["auto"],
importance=0.7,
extracted_metadata=ExtractedMetadata(),
)
mem = Memory(
storage=str(tmp_path / "db"),
llm=llm,
embedder=mock_embedder,
root_scope="/flow/pipeline",
)
# No explicit scope/categories/importance -> goes through LLM
record = mem.remember("Content for LLM analysis")
assert record is not None
assert record.scope == "/flow/pipeline/llm-inferred"
class TestMemoryScopeWithRootScope:
"""Tests for MemoryScope interaction with root_scope."""
def test_memory_scope_remembers_within_root_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""MemoryScope with underlying Memory that has root_scope works correctly."""
from crewai.memory.memory_scope import MemoryScope
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/test",
)
# Create a MemoryScope
scope = MemoryScope(memory=mem, root_path="/agent/1")
# Remember through the scope
record = scope.remember(
"Scoped content",
scope="/task", # Inner scope within MemoryScope
categories=["test"],
importance=0.5,
)
# The MemoryScope prepends its root_path, then Memory prepends root_scope
# MemoryScope.remember prepends /agent/1 to /task -> /agent/1/task
# Then Memory's root_scope /crew/test gets prepended by encoding flow
# Final: /crew/test/agent/1/task
assert record is not None
# Note: MemoryScope builds the scope before calling memory.remember
# So the scope it passes is /agent/1/task, which then gets root_scope prepended
assert record.scope.startswith("/crew/test/agent/1")