feat: automatic root_scope for hierarchical memory isolation (#5035)

* feat: automatic root_scope for hierarchical memory isolation

Crews and flows now automatically scope their memories hierarchically.
The encoding flow's LLM-inferred scope becomes a sub-scope under the
structural root, preventing memory pollution across crews/agents.

Scope hierarchy:
  /crew/{crew_name}/agent/{agent_role}/{llm-inferred}
  /flow/{flow_name}/{llm-inferred}

Changes:
- Memory class: new root_scope field, passed through remember/remember_many
- EncodingFlow: prepends root_scope to resolved scope in both fast path
  (Group A) and LLM path (Group C/D)
- Crew: auto-sets root_scope=/crew/{sanitized_name} on memory creation
- Agent executor: extends crew root with /agent/{sanitized_role} per save
- Flow: auto-sets root_scope=/flow/{sanitized_name} on memory creation
- New utils: sanitize_scope_name, normalize_scope_path, join_scope_paths

Backward compatible — no root_scope means no prefix (existing behavior).
Old memories at '/' remain accessible.

51 new tests, all existing tests pass.

* ci: retrigger tests

* fix: don't auto-set root_scope on user-provided Memory instances

When users pass their own Memory instance to a Crew (memory=mem),
respect their configuration — don't auto-set root_scope.
Auto-scoping only applies when memory=True (Crew creates Memory).

Fixes: test_crew_memory_with_google_vertex_embedder which passes
Memory(embedder=...) to Crew and expects remember(scope='/test')
to produce scope '/test', not '/crew/crew/test'.

* fix: address 6 review comments — true scope isolation for reads, writes, and consolidation

1. Constrain similarity search to root_scope boundary (no cross-crew consolidation)
2. Remove unused self._root_scope from EncodingFlow
3. Apply root_scope to recall/list/info/reset (true read isolation)
4. Only extend agent root_scope when crew has one (backward compat)
5. Fix docstring example for sanitize_scope_name
6. Verify code comments match behavior

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Joao Moura <joao@crewai.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
alex-clawd
2026-03-23 22:56:10 -07:00
committed by GitHub
parent 949d7f1091
commit dd9ae02159
7 changed files with 1578 additions and 34 deletions

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from crewai.agents.parser import AgentFinish
from crewai.memory.utils import sanitize_scope_name
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import sanitize_tool_name
@@ -26,7 +27,12 @@ class CrewAgentExecutorMixin:
_printer: Printer = Printer()
def _save_to_memory(self, output: AgentFinish) -> None:
"""Save task result to unified memory (memory or crew._memory)."""
"""Save task result to unified memory (memory or crew._memory).
Extends the memory's root_scope with agent-specific path segment
(e.g., '/crew/research-crew/agent/researcher') so that agent memories
are scoped hierarchically under their crew.
"""
memory = getattr(self.agent, "memory", None) or (
getattr(self.crew, "_memory", None) if self.crew else None
)
@@ -43,6 +49,21 @@ class CrewAgentExecutorMixin:
)
extracted = memory.extract_memories(raw)
if extracted:
memory.remember_many(extracted, agent_role=self.agent.role)
# Get the memory's existing root_scope
base_root = getattr(memory, "root_scope", None)
if isinstance(base_root, str) and base_root:
# Memory has a root_scope — extend it with agent info
agent_role = self.agent.role or "unknown"
sanitized_role = sanitize_scope_name(agent_role)
agent_root = f"{base_root.rstrip('/')}/agent/{sanitized_role}"
if not agent_root.startswith("/"):
agent_root = "/" + agent_root
memory.remember_many(
extracted, agent_role=self.agent.role, root_scope=agent_root
)
else:
# No base root_scope — don't inject one, preserve backward compat
memory.remember_many(extracted, agent_role=self.agent.role)
except Exception as e:
self.agent._logger.log("error", f"Failed to save to memory: {e}")

View File

@@ -357,7 +357,18 @@ class Crew(FlowTrackable, BaseModel):
@model_validator(mode="after")
def create_crew_memory(self) -> Crew:
"""Initialize unified memory, respecting crew embedder config."""
"""Initialize unified memory, respecting crew embedder config.
When memory is enabled, sets a hierarchical root_scope based on the
crew name (e.g. '/crew/research-crew') so that all memories saved by
this crew and its agents are organized under a consistent namespace.
"""
from crewai.memory.utils import sanitize_scope_name
# Compute sanitized crew name for root_scope
crew_name = sanitize_scope_name(self.name or "crew")
crew_root_scope = f"/crew/{crew_name}"
if self.memory is True:
from crewai.memory.unified_memory import Memory
@@ -366,9 +377,10 @@ class Crew(FlowTrackable, BaseModel):
from crewai.rag.embeddings.factory import build_embedder
embedder = build_embedder(self.embedder) # type: ignore[arg-type]
self._memory = Memory(embedder=embedder)
self._memory = Memory(embedder=embedder, root_scope=crew_root_scope)
elif self.memory:
# User passed a Memory / MemoryScope / MemorySlice instance
# Respect user's configuration — don't auto-set root_scope
self._memory = self.memory
else:
self._memory = None

View File

@@ -905,7 +905,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Internal flows (RecallFlow, EncodingFlow) set _skip_auto_memory
# to avoid creating a wasteful standalone Memory instance.
if self.memory is None and not getattr(self, "_skip_auto_memory", False):
self.memory = Memory()
from crewai.memory.utils import sanitize_scope_name
flow_name = sanitize_scope_name(self.name or self.__class__.__name__)
self.memory = Memory(root_scope=f"/flow/{flow_name}")
# Register all flow-related methods
for method_name in dir(self):

View File

@@ -28,6 +28,7 @@ from crewai.memory.analyze import (
analyze_for_save,
)
from crewai.memory.types import MemoryConfig, MemoryRecord, embed_texts
from crewai.memory.utils import join_scope_paths
logger = logging.getLogger(__name__)
@@ -48,6 +49,8 @@ class ItemState(BaseModel):
importance: float | None = None
source: str | None = None
private: bool = False
# Structural root scope prefix for hierarchical scoping
root_scope: str | None = None
# Resolved values
resolved_scope: str = "/"
resolved_categories: list[str] = Field(default_factory=list)
@@ -104,6 +107,14 @@ class EncodingFlow(Flow[EncodingState]):
embedder: Any,
config: MemoryConfig | None = None,
) -> None:
"""Initialize the encoding flow.
Args:
storage: Storage backend for persisting memories.
llm: LLM instance for analysis.
embedder: Embedder for generating vectors.
config: Optional memory configuration.
"""
super().__init__(suppress_flow_events=True)
self._storage = storage
self._llm = llm
@@ -180,10 +191,18 @@ class EncodingFlow(Flow[EncodingState]):
def _search_one(
item: ItemState,
) -> list[tuple[MemoryRecord, float]]:
scope_prefix = item.scope if item.scope and item.scope.strip("/") else None
# Use root_scope as the search boundary, then narrow by explicit scope if provided
effective_prefix = None
if item.root_scope:
effective_prefix = item.root_scope.rstrip("/")
if item.scope and item.scope.strip("/"):
effective_prefix = effective_prefix + "/" + item.scope.strip("/")
elif item.scope and item.scope.strip("/"):
effective_prefix = item.scope
return self._storage.search( # type: ignore[no-any-return]
item.embedding,
scope_prefix=scope_prefix,
scope_prefix=effective_prefix,
categories=None,
limit=self._config.consolidation_limit,
min_score=0.0,
@@ -253,9 +272,16 @@ class EncodingFlow(Flow[EncodingState]):
existing_scopes: list[str] = []
existing_categories: list[str] = []
if any_needs_fields:
existing_scopes = self._storage.list_scopes("/") or ["/"]
# Constrain scope/category suggestions to root_scope boundary
# Check if any active item has root_scope
active_root = next(
(it.root_scope for it in items if not it.dropped and it.root_scope),
None,
)
scope_search_root = active_root if active_root else "/"
existing_scopes = self._storage.list_scopes(scope_search_root) or ["/"]
existing_categories = list(
self._storage.list_categories(scope_prefix=None).keys()
self._storage.list_categories(scope_prefix=active_root).keys()
)
# Classify items and submit LLM calls
@@ -321,7 +347,13 @@ class EncodingFlow(Flow[EncodingState]):
for i, future in save_futures.items():
analysis = future.result()
item = items[i]
item.resolved_scope = item.scope or analysis.suggested_scope or "/"
# Determine inner scope from explicit scope or LLM-inferred
inner_scope = item.scope or analysis.suggested_scope or "/"
# Join root_scope with inner scope if root_scope is set
if item.root_scope:
item.resolved_scope = join_scope_paths(item.root_scope, inner_scope)
else:
item.resolved_scope = inner_scope
item.resolved_categories = (
item.categories
if item.categories is not None
@@ -353,8 +385,18 @@ class EncodingFlow(Flow[EncodingState]):
pool.shutdown(wait=False)
def _apply_defaults(self, item: ItemState) -> None:
"""Apply caller values with config defaults (fast path)."""
item.resolved_scope = item.scope or "/"
"""Apply caller values with config defaults (fast path).
If root_scope is set, prepends it to the inner scope to create the
final resolved_scope.
"""
inner_scope = item.scope or "/"
# Join root_scope with inner scope if root_scope is set
if item.root_scope:
item.resolved_scope = join_scope_paths(item.root_scope, inner_scope)
else:
item.resolved_scope = inner_scope if inner_scope != "/" else "/"
item.resolved_categories = item.categories or []
item.resolved_metadata = item.metadata or {}
item.resolved_importance = (

View File

@@ -31,6 +31,7 @@ from crewai.memory.types import (
compute_composite_score,
embed_text,
)
from crewai.memory.utils import join_scope_paths
from crewai.rag.embeddings.factory import build_embedder
from crewai.rag.embeddings.providers.openai.types import OpenAIProviderSpec
@@ -126,6 +127,14 @@ class Memory(BaseModel):
default=False,
description="If True, remember() and remember_many() are silent no-ops.",
)
root_scope: str | None = Field(
default=None,
description=(
"Structural root scope prefix. When set, LLM-inferred or explicit scopes "
"are nested under this root. For example, a crew with root_scope='/crew/research' "
"will store memories at '/crew/research/<inferred_scope>'."
),
)
_config: MemoryConfig = PrivateAttr()
_llm_instance: BaseLLM | None = PrivateAttr(default=None)
@@ -297,11 +306,26 @@ class Memory(BaseModel):
importance: float | None = None,
source: str | None = None,
private: bool = False,
root_scope: str | None = None,
) -> list[MemoryRecord]:
"""Run the batch EncodingFlow for one or more items. No event emission.
This is the core encoding logic shared by ``remember()`` and
``remember_many()``. Events are managed by the calling method.
Args:
contents: List of text content to encode and store.
scope: Optional explicit scope (inner scope, nested under root_scope).
categories: Optional categories for all items.
metadata: Optional metadata for all items.
importance: Optional importance score for all items.
source: Optional source identifier for all items.
private: Whether items are private.
root_scope: Structural root scope prefix. LLM-inferred or explicit
scopes are nested under this root.
Returns:
List of created MemoryRecord instances.
"""
from crewai.memory.encoding_flow import EncodingFlow
@@ -320,6 +344,7 @@ class Memory(BaseModel):
"importance": importance,
"source": source,
"private": private,
"root_scope": root_scope,
}
for c in contents
]
@@ -340,6 +365,7 @@ class Memory(BaseModel):
source: str | None = None,
private: bool = False,
agent_role: str | None = None,
root_scope: str | None = None,
) -> MemoryRecord | None:
"""Store a single item in memory (synchronous).
@@ -349,13 +375,15 @@ class Memory(BaseModel):
Args:
content: Text to remember.
scope: Optional scope path; inferred if None.
scope: Optional scope path (inner scope); inferred if None.
categories: Optional categories; inferred if None.
metadata: Optional metadata; merged with LLM-extracted if inferred.
importance: Optional importance 0-1; inferred if None.
source: Optional provenance identifier (e.g. user ID, session ID).
private: If True, only visible to recall from the same source.
agent_role: Optional agent role for event metadata.
root_scope: Optional root scope override. If provided, this overrides
the instance-level root_scope for this call only.
Returns:
The created MemoryRecord, or None if this memory is read-only.
@@ -365,6 +393,10 @@ class Memory(BaseModel):
"""
if self.read_only:
return None
# Determine effective root_scope: per-call override takes precedence
effective_root = root_scope if root_scope is not None else self.root_scope
_source_type = "unified_memory"
try:
crewai_event_bus.emit(
@@ -388,6 +420,7 @@ class Memory(BaseModel):
importance,
source,
private,
effective_root,
)
records = future.result()
record = records[0] if records else None
@@ -426,6 +459,7 @@ class Memory(BaseModel):
source: str | None = None,
private: bool = False,
agent_role: str | None = None,
root_scope: str | None = None,
) -> list[MemoryRecord]:
"""Store multiple items in memory (non-blocking).
@@ -440,13 +474,15 @@ class Memory(BaseModel):
Args:
contents: List of text items to remember.
scope: Optional scope applied to all items.
scope: Optional scope (inner scope) applied to all items.
categories: Optional categories applied to all items.
metadata: Optional metadata applied to all items.
importance: Optional importance applied to all items.
source: Optional provenance identifier applied to all items.
private: Privacy flag applied to all items.
agent_role: Optional agent role for event metadata.
root_scope: Optional root scope override. If provided, this overrides
the instance-level root_scope for this call only.
Returns:
Empty list (records are not available until the background save completes).
@@ -454,6 +490,9 @@ class Memory(BaseModel):
if not contents or self.read_only:
return []
# Determine effective root_scope: per-call override takes precedence
effective_root = root_scope if root_scope is not None else self.root_scope
self._submit_save(
self._background_encode_batch,
contents,
@@ -464,6 +503,7 @@ class Memory(BaseModel):
source,
private,
agent_role,
effective_root,
)
return []
@@ -477,6 +517,7 @@ class Memory(BaseModel):
source: str | None,
private: bool,
agent_role: str | None,
root_scope: str | None = None,
) -> list[MemoryRecord]:
"""Run the encoding pipeline in a background thread with event emission.
@@ -486,6 +527,20 @@ class Memory(BaseModel):
All ``emit`` calls are wrapped in try/except to handle the case where
the event bus shuts down before the background save finishes (e.g.
during process exit).
Args:
contents: List of text content to encode.
scope: Optional inner scope for all items.
categories: Optional categories for all items.
metadata: Optional metadata for all items.
importance: Optional importance for all items.
source: Optional source identifier for all items.
private: Whether items are private.
agent_role: Optional agent role for event metadata.
root_scope: Optional root scope prefix for hierarchical scoping.
Returns:
List of created MemoryRecord instances.
"""
try:
crewai_event_bus.emit(
@@ -502,7 +557,14 @@ class Memory(BaseModel):
try:
start = time.perf_counter()
records = self._encode_batch(
contents, scope, categories, metadata, importance, source, private
contents,
scope,
categories,
metadata,
importance,
source,
private,
root_scope,
)
elapsed_ms = (time.perf_counter() - start) * 1000
except RuntimeError:
@@ -575,6 +637,14 @@ class Memory(BaseModel):
# so that the search sees all persisted records.
self.drain_writes()
# Apply root_scope as default scope_prefix for read isolation
effective_scope = scope
if effective_scope is None and self.root_scope:
effective_scope = self.root_scope
elif effective_scope is not None and self.root_scope:
# Nest provided scope under root
effective_scope = join_scope_paths(self.root_scope, effective_scope)
_source = "unified_memory"
try:
crewai_event_bus.emit(
@@ -595,7 +665,7 @@ class Memory(BaseModel):
else:
raw = self._storage.search(
embedding,
scope_prefix=scope,
scope_prefix=effective_scope,
categories=categories,
limit=limit,
min_score=0.0,
@@ -630,7 +700,7 @@ class Memory(BaseModel):
flow.kickoff(
inputs={
"query": query,
"scope": scope,
"scope": effective_scope,
"categories": categories or [],
"limit": limit,
"source": source,
@@ -684,11 +754,24 @@ class Memory(BaseModel):
) -> int:
"""Delete memories matching criteria.
Args:
scope: Scope to delete from. If None and root_scope is set, deletes
only within root_scope.
categories: Filter by categories.
older_than: Delete records older than this datetime.
metadata_filter: Filter by metadata fields.
record_ids: Specific record IDs to delete.
Returns:
Number of records deleted.
"""
effective_scope = scope
if effective_scope is None and self.root_scope:
effective_scope = self.root_scope
elif effective_scope is not None and self.root_scope:
effective_scope = join_scope_paths(self.root_scope, effective_scope)
return self._storage.delete(
scope_prefix=scope,
scope_prefix=effective_scope,
categories=categories,
record_ids=record_ids,
older_than=older_than,
@@ -763,9 +846,21 @@ class Memory(BaseModel):
read_only=read_only,
)
def list_scopes(self, path: str = "/") -> list[str]:
"""List immediate child scopes under path."""
return self._storage.list_scopes(path)
def list_scopes(self, path: str | None = None) -> list[str]:
"""List immediate child scopes under path.
Args:
path: Scope path to list children of. If None and root_scope is set,
defaults to root_scope. Otherwise defaults to '/'.
"""
effective_path = path
if effective_path is None and self.root_scope:
effective_path = self.root_scope
elif effective_path is not None and self.root_scope:
effective_path = join_scope_paths(self.root_scope, effective_path)
elif effective_path is None:
effective_path = "/"
return self._storage.list_scopes(effective_path)
def list_records(
self, scope: str | None = None, limit: int = 200, offset: int = 0
@@ -773,20 +868,52 @@ class Memory(BaseModel):
"""List records in a scope, newest first.
Args:
scope: Optional scope path prefix to filter by.
scope: Optional scope path prefix to filter by. If None and root_scope
is set, defaults to root_scope.
limit: Maximum number of records to return.
offset: Number of records to skip (for pagination).
"""
effective_scope = scope
if effective_scope is None and self.root_scope:
effective_scope = self.root_scope
elif effective_scope is not None and self.root_scope:
effective_scope = join_scope_paths(self.root_scope, effective_scope)
return self._storage.list_records(
scope_prefix=scope, limit=limit, offset=offset
scope_prefix=effective_scope, limit=limit, offset=offset
)
def info(self, path: str = "/") -> ScopeInfo:
"""Return scope info for path."""
return self._storage.get_scope_info(path)
def info(self, path: str | None = None) -> ScopeInfo:
"""Return scope info for path.
Args:
path: Scope path to get info for. If None and root_scope is set,
defaults to root_scope. Otherwise defaults to '/'.
"""
effective_path = path
if effective_path is None and self.root_scope:
effective_path = self.root_scope
elif effective_path is not None and self.root_scope:
effective_path = join_scope_paths(self.root_scope, effective_path)
elif effective_path is None:
effective_path = "/"
return self._storage.get_scope_info(effective_path)
def tree(self, path: str | None = None, max_depth: int = 3) -> str:
"""Return a formatted tree of scopes (string).
Args:
path: Root path for the tree. If None and root_scope is set,
defaults to root_scope. Otherwise defaults to '/'.
max_depth: Maximum depth to traverse.
"""
effective_path = path
if effective_path is None and self.root_scope:
effective_path = self.root_scope
elif effective_path is not None and self.root_scope:
effective_path = join_scope_paths(self.root_scope, effective_path)
elif effective_path is None:
effective_path = "/"
def tree(self, path: str = "/", max_depth: int = 3) -> str:
"""Return a formatted tree of scopes (string)."""
lines: list[str] = []
def _walk(p: str, depth: int, prefix: str) -> None:
@@ -797,16 +924,36 @@ class Memory(BaseModel):
for child in info.child_scopes[:20]:
_walk(child, depth + 1, prefix + " ")
_walk(path.rstrip("/") or "/", 0, "")
return "\n".join(lines) if lines else f"{path or '/'} (0 records)"
_walk(effective_path.rstrip("/") or "/", 0, "")
return "\n".join(lines) if lines else f"{effective_path or '/'} (0 records)"
def list_categories(self, path: str | None = None) -> dict[str, int]:
"""List categories and counts; path=None means global."""
return self._storage.list_categories(scope_prefix=path)
"""List categories and counts.
Args:
path: Scope path to filter categories by. If None and root_scope is set,
defaults to root_scope.
"""
effective_path = path
if effective_path is None and self.root_scope:
effective_path = self.root_scope
elif effective_path is not None and self.root_scope:
effective_path = join_scope_paths(self.root_scope, effective_path)
return self._storage.list_categories(scope_prefix=effective_path)
def reset(self, scope: str | None = None) -> None:
"""Reset (delete all) memories in scope. None = all."""
self._storage.reset(scope_prefix=scope)
"""Reset (delete all) memories in scope.
Args:
scope: Scope to reset. If None and root_scope is set, resets only
within root_scope. If None and no root_scope, resets all.
"""
effective_scope = scope
if effective_scope is None and self.root_scope:
effective_scope = self.root_scope
elif effective_scope is not None and self.root_scope:
effective_scope = join_scope_paths(self.root_scope, effective_scope)
self._storage.reset(scope_prefix=effective_scope)
async def aextract_memories(self, content: str) -> list[str]:
"""Async variant of extract_memories."""

View File

@@ -0,0 +1,110 @@
"""Utility functions for the unified memory system."""
from __future__ import annotations
import re
def sanitize_scope_name(name: str) -> str:
    """Sanitize a raw name into a token usable inside a scope path.

    The result is lowercase and contains only ``a-z``, ``0-9``, underscore,
    and hyphen: every other character becomes a hyphen, runs of hyphens are
    collapsed to one, and leading/trailing hyphens are removed.

    Args:
        name: The raw name to sanitize (e.g. crew name, agent role, flow class name).

    Returns:
        A sanitized string safe for use in scope paths. Returns 'unknown' if the
        result would be empty.

    Examples:
        >>> sanitize_scope_name("Research Crew")
        'research-crew'
        >>> sanitize_scope_name("Agent #1 (Main)")
        'agent-1-main'
        >>> sanitize_scope_name("café_worker")
        'caf-_worker'
    """
    if not name:
        return "unknown"
    lowered = name.strip().lower()
    # Anything outside the allowed alphabet becomes a hyphen...
    cleaned = re.sub(r"[^a-z0-9_-]", "-", lowered)
    # ...then hyphen runs (including pre-existing ones) collapse to a single
    # hyphen, and edge hyphens are dropped.
    cleaned = re.sub(r"-{2,}", "-", cleaned).strip("-")
    return cleaned if cleaned else "unknown"
def normalize_scope_path(path: str) -> str:
    """Normalize a scope path by removing double slashes and ensuring proper format.

    Args:
        path: The raw scope path (e.g. '/crew/MyCrewName//agent//role').

    Returns:
        A normalized path with leading slash, no trailing slash, no double slashes.
        Returns '/' for empty or root-only paths.

    Examples:
        >>> normalize_scope_path("/crew/test//agent//")
        '/crew/test/agent'
        >>> normalize_scope_path("")
        '/'
        >>> normalize_scope_path("crew/test")
        '/crew/test'
    """
    # Splitting on '/' and discarding empty pieces simultaneously collapses
    # repeated slashes and drops leading/trailing ones.
    segments = [part for part in (path or "").split("/") if part]
    if not segments:
        return "/"
    return "/" + "/".join(segments)
def join_scope_paths(root: str | None, inner: str | None) -> str:
"""Join a root scope with an inner scope, handling edge cases properly.
Args:
root: The root scope prefix (e.g. '/crew/research-crew').
inner: The inner scope (e.g. '/market-trends' or 'market-trends').
Returns:
The combined, normalized scope path.
Examples:
>>> join_scope_paths("/crew/test", "/market-trends")
'/crew/test/market-trends'
>>> join_scope_paths("/crew/test", "market-trends")
'/crew/test/market-trends'
>>> join_scope_paths("/crew/test", "/")
'/crew/test'
>>> join_scope_paths("/crew/test", None)
'/crew/test'
>>> join_scope_paths(None, "/market-trends")
'/market-trends'
>>> join_scope_paths(None, None)
'/'
"""
# Normalize both parts
root = root.rstrip("/") if root else ""
inner = inner.strip("/") if inner else ""
if root and inner:
result = f"{root}/{inner}"
elif root:
result = root
elif inner:
result = f"/{inner}"
else:
result = "/"
return normalize_scope_path(result)

File diff suppressed because it is too large Load Diff