diff --git a/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py b/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py index 9dd1e2396..6d01f1e27 100644 --- a/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py +++ b/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py @@ -3,6 +3,7 @@ from __future__ import annotations from typing import TYPE_CHECKING from crewai.agents.parser import AgentFinish +from crewai.memory.utils import sanitize_scope_name from crewai.utilities.printer import Printer from crewai.utilities.string_utils import sanitize_tool_name @@ -26,7 +27,12 @@ class CrewAgentExecutorMixin: _printer: Printer = Printer() def _save_to_memory(self, output: AgentFinish) -> None: - """Save task result to unified memory (memory or crew._memory).""" + """Save task result to unified memory (memory or crew._memory). + + Extends the memory's root_scope with agent-specific path segment + (e.g., '/crew/research-crew/agent/researcher') so that agent memories + are scoped hierarchically under their crew. + """ memory = getattr(self.agent, "memory", None) or ( getattr(self.crew, "_memory", None) if self.crew else None ) @@ -43,6 +49,21 @@ class CrewAgentExecutorMixin: ) extracted = memory.extract_memories(raw) if extracted: - memory.remember_many(extracted, agent_role=self.agent.role) + # Get the memory's existing root_scope + base_root = getattr(memory, "root_scope", None) + + if isinstance(base_root, str) and base_root: + # Memory has a root_scope — extend it with agent info + agent_role = self.agent.role or "unknown" + sanitized_role = sanitize_scope_name(agent_role) + agent_root = f"{base_root.rstrip('/')}/agent/{sanitized_role}" + if not agent_root.startswith("/"): + agent_root = "/" + agent_root + memory.remember_many( + extracted, agent_role=self.agent.role, root_scope=agent_root + ) + else: + # No base root_scope — don't inject one, preserve backward compat + memory.remember_many(extracted, agent_role=self.agent.role) except Exception as e: self.agent._logger.log("error", f"Failed to save to memory: {e}") diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index c5156888c..e130dce7b 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -357,7 +357,18 @@ class Crew(FlowTrackable, BaseModel): @model_validator(mode="after") def create_crew_memory(self) -> Crew: - """Initialize unified memory, respecting crew embedder config.""" + """Initialize unified memory, respecting crew embedder config. + + When memory is enabled, sets a hierarchical root_scope based on the + crew name (e.g. '/crew/research-crew') so that all memories saved by + this crew and its agents are organized under a consistent namespace. + """ + from crewai.memory.utils import sanitize_scope_name + + # Compute sanitized crew name for root_scope + crew_name = sanitize_scope_name(self.name or "crew") + crew_root_scope = f"/crew/{crew_name}" + if self.memory is True: from crewai.memory.unified_memory import Memory @@ -366,9 +377,10 @@ class Crew(FlowTrackable, BaseModel): from crewai.rag.embeddings.factory import build_embedder embedder = build_embedder(self.embedder) # type: ignore[arg-type] - self._memory = Memory(embedder=embedder) + self._memory = Memory(embedder=embedder, root_scope=crew_root_scope) elif self.memory: # User passed a Memory / MemoryScope / MemorySlice instance + # Respect user's configuration — don't auto-set root_scope self._memory = self.memory else: self._memory = None diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 66d84e60e..48bf887c4 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -905,7 +905,10 @@ class Flow(Generic[T], metaclass=FlowMeta): # Internal flows (RecallFlow, EncodingFlow) set _skip_auto_memory # to avoid creating a wasteful standalone Memory instance. if self.memory is None and not getattr(self, "_skip_auto_memory", False): - self.memory = Memory() + from crewai.memory.utils import sanitize_scope_name + + flow_name = sanitize_scope_name(self.name or self.__class__.__name__) + self.memory = Memory(root_scope=f"/flow/{flow_name}") # Register all flow-related methods for method_name in dir(self): diff --git a/lib/crewai/src/crewai/memory/encoding_flow.py b/lib/crewai/src/crewai/memory/encoding_flow.py index cd1babb2d..158054490 100644 --- a/lib/crewai/src/crewai/memory/encoding_flow.py +++ b/lib/crewai/src/crewai/memory/encoding_flow.py @@ -28,6 +28,7 @@ from crewai.memory.analyze import ( analyze_for_save, ) from crewai.memory.types import MemoryConfig, MemoryRecord, embed_texts +from crewai.memory.utils import join_scope_paths logger = logging.getLogger(__name__) @@ -48,6 +49,8 @@ class ItemState(BaseModel): importance: float | None = None source: str | None = None private: bool = False + # Structural root scope prefix for hierarchical scoping + root_scope: str | None = None # Resolved values resolved_scope: str = "/" resolved_categories: list[str] = Field(default_factory=list) @@ -104,6 +107,14 @@ class EncodingFlow(Flow[EncodingState]): embedder: Any, config: MemoryConfig | None = None, ) -> None: + """Initialize the encoding flow. + + Args: + storage: Storage backend for persisting memories. + llm: LLM instance for analysis. + embedder: Embedder for generating vectors. + config: Optional memory configuration. + """ super().__init__(suppress_flow_events=True) self._storage = storage self._llm = llm @@ -180,10 +191,18 @@ class EncodingFlow(Flow[EncodingState]): def _search_one( item: ItemState, ) -> list[tuple[MemoryRecord, float]]: - scope_prefix = item.scope if item.scope and item.scope.strip("/") else None + # Use root_scope as the search boundary, then narrow by explicit scope if provided + effective_prefix = None + if item.root_scope: + effective_prefix = item.root_scope.rstrip("/") + if item.scope and item.scope.strip("/"): + effective_prefix = effective_prefix + "/" + item.scope.strip("/") + elif item.scope and item.scope.strip("/"): + effective_prefix = item.scope + return self._storage.search( # type: ignore[no-any-return] item.embedding, - scope_prefix=scope_prefix, + scope_prefix=effective_prefix, categories=None, limit=self._config.consolidation_limit, min_score=0.0, @@ -253,9 +272,16 @@ class EncodingFlow(Flow[EncodingState]): existing_scopes: list[str] = [] existing_categories: list[str] = [] if any_needs_fields: - existing_scopes = self._storage.list_scopes("/") or ["/"] + # Constrain scope/category suggestions to root_scope boundary + # Check if any active item has root_scope + active_root = next( + (it.root_scope for it in items if not it.dropped and it.root_scope), + None, + ) + scope_search_root = active_root if active_root else "/" + existing_scopes = self._storage.list_scopes(scope_search_root) or ["/"] existing_categories = list( - self._storage.list_categories(scope_prefix=None).keys() + self._storage.list_categories(scope_prefix=active_root).keys() ) # Classify items and submit LLM calls @@ -321,7 +347,13 @@ class EncodingFlow(Flow[EncodingState]): for i, future in save_futures.items(): analysis = future.result() item = items[i] - item.resolved_scope = item.scope or analysis.suggested_scope or "/" + # Determine inner scope from explicit scope or LLM-inferred + inner_scope = item.scope or analysis.suggested_scope or "/" + # Join root_scope with inner scope if root_scope is set + if item.root_scope: + item.resolved_scope = join_scope_paths(item.root_scope, inner_scope) + else: + item.resolved_scope = inner_scope item.resolved_categories = ( item.categories if item.categories is not None @@ -353,8 +385,18 @@ class EncodingFlow(Flow[EncodingState]): pool.shutdown(wait=False) def _apply_defaults(self, item: ItemState) -> None: - """Apply caller values with config defaults (fast path).""" - item.resolved_scope = item.scope or "/" + """Apply caller values with config defaults (fast path). + + If root_scope is set, prepends it to the inner scope to create the + final resolved_scope. + """ + inner_scope = item.scope or "/" + # Join root_scope with inner scope if root_scope is set + if item.root_scope: + item.resolved_scope = join_scope_paths(item.root_scope, inner_scope) + else: + item.resolved_scope = inner_scope if inner_scope != "/" else "/" + item.resolved_categories = item.categories or [] item.resolved_metadata = item.metadata or {} item.resolved_importance = ( diff --git a/lib/crewai/src/crewai/memory/unified_memory.py b/lib/crewai/src/crewai/memory/unified_memory.py index 74761c0bb..488e3c94a 100644 --- a/lib/crewai/src/crewai/memory/unified_memory.py +++ b/lib/crewai/src/crewai/memory/unified_memory.py @@ -31,6 +31,7 @@ from crewai.memory.types import ( compute_composite_score, embed_text, ) +from crewai.memory.utils import join_scope_paths from crewai.rag.embeddings.factory import build_embedder from crewai.rag.embeddings.providers.openai.types import OpenAIProviderSpec @@ -126,6 +127,14 @@ class Memory(BaseModel): default=False, description="If True, remember() and remember_many() are silent no-ops.", ) + root_scope: str | None = Field( + default=None, + description=( + "Structural root scope prefix. When set, LLM-inferred or explicit scopes " + "are nested under this root. For example, a crew with root_scope='/crew/research' " + "will store memories at '/crew/research/'." + ), + ) _config: MemoryConfig = PrivateAttr() _llm_instance: BaseLLM | None = PrivateAttr(default=None) @@ -297,11 +306,26 @@ class Memory(BaseModel): importance: float | None = None, source: str | None = None, private: bool = False, + root_scope: str | None = None, ) -> list[MemoryRecord]: """Run the batch EncodingFlow for one or more items. No event emission. This is the core encoding logic shared by ``remember()`` and ``remember_many()``. Events are managed by the calling method. + + Args: + contents: List of text content to encode and store. + scope: Optional explicit scope (inner scope, nested under root_scope). + categories: Optional categories for all items. + metadata: Optional metadata for all items. + importance: Optional importance score for all items. + source: Optional source identifier for all items. + private: Whether items are private. + root_scope: Structural root scope prefix. LLM-inferred or explicit + scopes are nested under this root. + + Returns: + List of created MemoryRecord instances. """ from crewai.memory.encoding_flow import EncodingFlow @@ -320,6 +344,7 @@ class Memory(BaseModel): "importance": importance, "source": source, "private": private, + "root_scope": root_scope, } for c in contents ] @@ -340,6 +365,7 @@ class Memory(BaseModel): source: str | None = None, private: bool = False, agent_role: str | None = None, + root_scope: str | None = None, ) -> MemoryRecord | None: """Store a single item in memory (synchronous). @@ -349,13 +375,15 @@ class Memory(BaseModel): Args: content: Text to remember. - scope: Optional scope path; inferred if None. + scope: Optional scope path (inner scope); inferred if None. categories: Optional categories; inferred if None. metadata: Optional metadata; merged with LLM-extracted if inferred. importance: Optional importance 0-1; inferred if None. source: Optional provenance identifier (e.g. user ID, session ID). private: If True, only visible to recall from the same source. agent_role: Optional agent role for event metadata. + root_scope: Optional root scope override. If provided, this overrides + the instance-level root_scope for this call only. Returns: The created MemoryRecord, or None if this memory is read-only. @@ -365,6 +393,10 @@ class Memory(BaseModel): """ if self.read_only: return None + + # Determine effective root_scope: per-call override takes precedence + effective_root = root_scope if root_scope is not None else self.root_scope + _source_type = "unified_memory" try: crewai_event_bus.emit( @@ -388,6 +420,7 @@ class Memory(BaseModel): importance, source, private, + effective_root, ) records = future.result() record = records[0] if records else None @@ -426,6 +459,7 @@ class Memory(BaseModel): source: str | None = None, private: bool = False, agent_role: str | None = None, + root_scope: str | None = None, ) -> list[MemoryRecord]: """Store multiple items in memory (non-blocking). @@ -440,13 +474,15 @@ class Memory(BaseModel): Args: contents: List of text items to remember. - scope: Optional scope applied to all items. + scope: Optional scope (inner scope) applied to all items. categories: Optional categories applied to all items. metadata: Optional metadata applied to all items. importance: Optional importance applied to all items. source: Optional provenance identifier applied to all items. private: Privacy flag applied to all items. agent_role: Optional agent role for event metadata. + root_scope: Optional root scope override. If provided, this overrides + the instance-level root_scope for this call only. Returns: Empty list (records are not available until the background save completes). @@ -454,6 +490,9 @@ class Memory(BaseModel): if not contents or self.read_only: return [] + # Determine effective root_scope: per-call override takes precedence + effective_root = root_scope if root_scope is not None else self.root_scope + self._submit_save( self._background_encode_batch, contents, @@ -464,6 +503,7 @@ class Memory(BaseModel): source, private, agent_role, + effective_root, ) return [] @@ -477,6 +517,7 @@ class Memory(BaseModel): source: str | None, private: bool, agent_role: str | None, + root_scope: str | None = None, ) -> list[MemoryRecord]: """Run the encoding pipeline in a background thread with event emission. @@ -486,6 +527,20 @@ class Memory(BaseModel): All ``emit`` calls are wrapped in try/except to handle the case where the event bus shuts down before the background save finishes (e.g. during process exit). + + Args: + contents: List of text content to encode. + scope: Optional inner scope for all items. + categories: Optional categories for all items. + metadata: Optional metadata for all items. + importance: Optional importance for all items. + source: Optional source identifier for all items. + private: Whether items are private. + agent_role: Optional agent role for event metadata. + root_scope: Optional root scope prefix for hierarchical scoping. + + Returns: + List of created MemoryRecord instances. """ try: crewai_event_bus.emit( @@ -502,7 +557,14 @@ class Memory(BaseModel): try: start = time.perf_counter() records = self._encode_batch( - contents, scope, categories, metadata, importance, source, private + contents, + scope, + categories, + metadata, + importance, + source, + private, + root_scope, ) elapsed_ms = (time.perf_counter() - start) * 1000 except RuntimeError: @@ -575,6 +637,14 @@ class Memory(BaseModel): # so that the search sees all persisted records. self.drain_writes() + # Apply root_scope as default scope_prefix for read isolation + effective_scope = scope + if effective_scope is None and self.root_scope: + effective_scope = self.root_scope + elif effective_scope is not None and self.root_scope: + # Nest provided scope under root + effective_scope = join_scope_paths(self.root_scope, effective_scope) + _source = "unified_memory" try: crewai_event_bus.emit( @@ -595,7 +665,7 @@ class Memory(BaseModel): else: raw = self._storage.search( embedding, - scope_prefix=scope, + scope_prefix=effective_scope, categories=categories, limit=limit, min_score=0.0, @@ -630,7 +700,7 @@ class Memory(BaseModel): flow.kickoff( inputs={ "query": query, - "scope": scope, + "scope": effective_scope, "categories": categories or [], "limit": limit, "source": source, @@ -684,11 +754,24 @@ class Memory(BaseModel): ) -> int: """Delete memories matching criteria. + Args: + scope: Scope to delete from. If None and root_scope is set, deletes + only within root_scope. + categories: Filter by categories. + older_than: Delete records older than this datetime. + metadata_filter: Filter by metadata fields. + record_ids: Specific record IDs to delete. + Returns: Number of records deleted. """ + effective_scope = scope + if effective_scope is None and self.root_scope: + effective_scope = self.root_scope + elif effective_scope is not None and self.root_scope: + effective_scope = join_scope_paths(self.root_scope, effective_scope) return self._storage.delete( - scope_prefix=scope, + scope_prefix=effective_scope, categories=categories, record_ids=record_ids, older_than=older_than, @@ -763,9 +846,21 @@ class Memory(BaseModel): read_only=read_only, ) - def list_scopes(self, path: str = "/") -> list[str]: - """List immediate child scopes under path.""" - return self._storage.list_scopes(path) + def list_scopes(self, path: str | None = None) -> list[str]: + """List immediate child scopes under path. + + Args: + path: Scope path to list children of. If None and root_scope is set, + defaults to root_scope. Otherwise defaults to '/'. + """ + effective_path = path + if effective_path is None and self.root_scope: + effective_path = self.root_scope + elif effective_path is not None and self.root_scope: + effective_path = join_scope_paths(self.root_scope, effective_path) + elif effective_path is None: + effective_path = "/" + return self._storage.list_scopes(effective_path) def list_records( self, scope: str | None = None, limit: int = 200, offset: int = 0 @@ -773,20 +868,52 @@ class Memory(BaseModel): """List records in a scope, newest first. Args: - scope: Optional scope path prefix to filter by. + scope: Optional scope path prefix to filter by. If None and root_scope + is set, defaults to root_scope. limit: Maximum number of records to return. offset: Number of records to skip (for pagination). """ + effective_scope = scope + if effective_scope is None and self.root_scope: + effective_scope = self.root_scope + elif effective_scope is not None and self.root_scope: + effective_scope = join_scope_paths(self.root_scope, effective_scope) return self._storage.list_records( - scope_prefix=scope, limit=limit, offset=offset + scope_prefix=effective_scope, limit=limit, offset=offset ) - def info(self, path: str = "/") -> ScopeInfo: - """Return scope info for path.""" - return self._storage.get_scope_info(path) + def info(self, path: str | None = None) -> ScopeInfo: + """Return scope info for path. + + Args: + path: Scope path to get info for. If None and root_scope is set, + defaults to root_scope. Otherwise defaults to '/'. + """ + effective_path = path + if effective_path is None and self.root_scope: + effective_path = self.root_scope + elif effective_path is not None and self.root_scope: + effective_path = join_scope_paths(self.root_scope, effective_path) + elif effective_path is None: + effective_path = "/" + return self._storage.get_scope_info(effective_path) + + def tree(self, path: str | None = None, max_depth: int = 3) -> str: + """Return a formatted tree of scopes (string). + + Args: + path: Root path for the tree. If None and root_scope is set, + defaults to root_scope. Otherwise defaults to '/'. + max_depth: Maximum depth to traverse. + """ + effective_path = path + if effective_path is None and self.root_scope: + effective_path = self.root_scope + elif effective_path is not None and self.root_scope: + effective_path = join_scope_paths(self.root_scope, effective_path) + elif effective_path is None: + effective_path = "/" - def tree(self, path: str = "/", max_depth: int = 3) -> str: - """Return a formatted tree of scopes (string).""" lines: list[str] = [] def _walk(p: str, depth: int, prefix: str) -> None: @@ -797,16 +924,36 @@ class Memory(BaseModel): for child in info.child_scopes[:20]: _walk(child, depth + 1, prefix + " ") - _walk(path.rstrip("/") or "/", 0, "") - return "\n".join(lines) if lines else f"{path or '/'} (0 records)" + _walk(effective_path.rstrip("/") or "/", 0, "") + return "\n".join(lines) if lines else f"{effective_path or '/'} (0 records)" def list_categories(self, path: str | None = None) -> dict[str, int]: - """List categories and counts; path=None means global.""" - return self._storage.list_categories(scope_prefix=path) + """List categories and counts. + + Args: + path: Scope path to filter categories by. If None and root_scope is set, + defaults to root_scope. + """ + effective_path = path + if effective_path is None and self.root_scope: + effective_path = self.root_scope + elif effective_path is not None and self.root_scope: + effective_path = join_scope_paths(self.root_scope, effective_path) + return self._storage.list_categories(scope_prefix=effective_path) def reset(self, scope: str | None = None) -> None: - """Reset (delete all) memories in scope. None = all.""" - self._storage.reset(scope_prefix=scope) + """Reset (delete all) memories in scope. + + Args: + scope: Scope to reset. If None and root_scope is set, resets only + within root_scope. If None and no root_scope, resets all. + """ + effective_scope = scope + if effective_scope is None and self.root_scope: + effective_scope = self.root_scope + elif effective_scope is not None and self.root_scope: + effective_scope = join_scope_paths(self.root_scope, effective_scope) + self._storage.reset(scope_prefix=effective_scope) async def aextract_memories(self, content: str) -> list[str]: """Async variant of extract_memories.""" diff --git a/lib/crewai/src/crewai/memory/utils.py b/lib/crewai/src/crewai/memory/utils.py new file mode 100644 index 000000000..4a6a3a005 --- /dev/null +++ b/lib/crewai/src/crewai/memory/utils.py @@ -0,0 +1,110 @@ +"""Utility functions for the unified memory system.""" + +from __future__ import annotations + +import re + + +def sanitize_scope_name(name: str) -> str: + """Sanitize a name for use in hierarchical scope paths. + + Converts to lowercase, replaces non-alphanumeric chars (except underscore + and hyphen) with hyphens, collapses multiple hyphens, strips leading/trailing + hyphens. + + Args: + name: The raw name to sanitize (e.g. crew name, agent role, flow class name). + + Returns: + A sanitized string safe for use in scope paths. Returns 'unknown' if the + result would be empty. + + Examples: + >>> sanitize_scope_name("Research Crew") + 'research-crew' + >>> sanitize_scope_name("Agent #1 (Main)") + 'agent-1-main' + >>> sanitize_scope_name("café_worker") + 'caf-_worker' + """ + if not name: + return "unknown" + name = name.lower().strip() + # Replace any character that's not alphanumeric, underscore, or hyphen with hyphen + name = re.sub(r"[^a-z0-9_-]", "-", name) + # Collapse multiple hyphens into one + name = re.sub(r"-+", "-", name) + # Strip leading/trailing hyphens + name = name.strip("-") + return name or "unknown" + + +def normalize_scope_path(path: str) -> str: + """Normalize a scope path by removing double slashes and ensuring proper format. + + Args: + path: The raw scope path (e.g. '/crew/MyCrewName//agent//role'). + + Returns: + A normalized path with leading slash, no trailing slash, no double slashes. + Returns '/' for empty or root-only paths. + + Examples: + >>> normalize_scope_path("/crew/test//agent//") + '/crew/test/agent' + >>> normalize_scope_path("") + '/' + >>> normalize_scope_path("crew/test") + '/crew/test' + """ + if not path or path == "/": + return "/" + # Collapse multiple slashes + path = re.sub(r"/+", "/", path) + # Ensure leading slash + if not path.startswith("/"): + path = "/" + path + # Remove trailing slash (unless it's just '/') + if len(path) > 1: + path = path.rstrip("/") + return path + + +def join_scope_paths(root: str | None, inner: str | None) -> str: + """Join a root scope with an inner scope, handling edge cases properly. + + Args: + root: The root scope prefix (e.g. '/crew/research-crew'). + inner: The inner scope (e.g. '/market-trends' or 'market-trends'). + + Returns: + The combined, normalized scope path. + + Examples: + >>> join_scope_paths("/crew/test", "/market-trends") + '/crew/test/market-trends' + >>> join_scope_paths("/crew/test", "market-trends") + '/crew/test/market-trends' + >>> join_scope_paths("/crew/test", "/") + '/crew/test' + >>> join_scope_paths("/crew/test", None) + '/crew/test' + >>> join_scope_paths(None, "/market-trends") + '/market-trends' + >>> join_scope_paths(None, None) + '/' + """ + # Normalize both parts + root = root.rstrip("/") if root else "" + inner = inner.strip("/") if inner else "" + + if root and inner: + result = f"{root}/{inner}" + elif root: + result = root + elif inner: + result = f"/{inner}" + else: + result = "/" + + return normalize_scope_path(result) diff --git a/lib/crewai/tests/memory/test_memory_root_scope.py b/lib/crewai/tests/memory/test_memory_root_scope.py new file mode 100644 index 000000000..8b0c382af --- /dev/null +++ b/lib/crewai/tests/memory/test_memory_root_scope.py @@ -0,0 +1,1209 @@ +"""Tests for hierarchical root_scope functionality in unified memory. + +Root scope is a structural prefix that is set automatically by crews and flows. +The LLM's encoding flow still infers a semantic inner scope, but the final +resolved scope = root_scope + '/' + llm_inferred_scope. +""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from crewai.memory.types import MemoryRecord +from crewai.memory.utils import ( + join_scope_paths, + normalize_scope_path, + sanitize_scope_name, +) + + +# --- Utility function tests --- + + +class TestSanitizeScopeName: + """Tests for sanitize_scope_name utility.""" + + def test_simple_name(self) -> None: + assert sanitize_scope_name("research") == "research" + + def test_name_with_spaces(self) -> None: + assert sanitize_scope_name("Research Crew") == "research-crew" + + def test_name_with_special_chars(self) -> None: + assert sanitize_scope_name("Agent #1 (Main)") == "agent-1-main" + + def test_name_with_unicode(self) -> None: + # Unicode characters get replaced with hyphens + result = sanitize_scope_name("café_worker") + # é becomes -, and the underscore is preserved, so café_worker -> caf-_worker + assert result == "caf-_worker" + + def test_name_with_underscores(self) -> None: + # Underscores are preserved + assert sanitize_scope_name("test_agent") == "test_agent" + + def test_name_with_hyphens(self) -> None: + assert sanitize_scope_name("my-crew") == "my-crew" + + def test_multiple_spaces_collapsed(self) -> None: + assert sanitize_scope_name("foo bar") == "foo-bar" + + def test_leading_trailing_spaces(self) -> None: + assert sanitize_scope_name(" crew ") == "crew" + + def test_empty_string_returns_unknown(self) -> None: + assert sanitize_scope_name("") == "unknown" + + def test_only_special_chars_returns_unknown(self) -> None: + assert sanitize_scope_name("@#$%") == "unknown" + + def test_none_input_returns_unknown(self) -> None: + assert sanitize_scope_name(None) == "unknown" # type: ignore[arg-type] + + +class TestNormalizeScopePath: + """Tests for normalize_scope_path utility.""" + + def test_simple_path(self) -> None: + assert normalize_scope_path("/crew/test") == "/crew/test" + + def test_double_slashes_collapsed(self) -> None: + assert normalize_scope_path("/crew//test//agent") == "/crew/test/agent" + + def test_trailing_slash_removed(self) -> None: + assert normalize_scope_path("/crew/test/") == "/crew/test" + + def test_missing_leading_slash_added(self) -> None: + assert normalize_scope_path("crew/test") == "/crew/test" + + def test_empty_string_returns_root(self) -> None: + assert normalize_scope_path("") == "/" + + def test_root_only_returns_root(self) -> None: + assert normalize_scope_path("/") == "/" + + def test_multiple_trailing_slashes(self) -> None: + assert normalize_scope_path("/crew///") == "/crew" + + +class TestJoinScopePaths: + """Tests for join_scope_paths utility.""" + + def test_basic_join(self) -> None: + assert join_scope_paths("/crew/test", "/market-trends") == "/crew/test/market-trends" + + def test_inner_without_leading_slash(self) -> None: + assert join_scope_paths("/crew/test", "market-trends") == "/crew/test/market-trends" + + def test_root_with_trailing_slash(self) -> None: + assert join_scope_paths("/crew/test/", "/inner") == "/crew/test/inner" + + def test_root_only_inner_slash(self) -> None: + assert join_scope_paths("/crew/test", "/") == "/crew/test" + + def test_root_only_inner_none(self) -> None: + assert join_scope_paths("/crew/test", None) == "/crew/test" + + def test_no_root_with_inner(self) -> None: + assert join_scope_paths(None, "/market-trends") == "/market-trends" + + def test_both_none(self) -> None: + assert join_scope_paths(None, None) == "/" + + def test_empty_strings(self) -> None: + assert join_scope_paths("", "") == "/" + + def test_root_empty_inner_value(self) -> None: + assert join_scope_paths("", "inner") == "/inner" + + +# --- Memory root_scope tests --- + + +@pytest.fixture +def mock_embedder() -> MagicMock: + """Embedder mock that returns one embedding per input text (batch-aware).""" + m = MagicMock() + m.side_effect = lambda texts: [[0.1] * 1536 for _ in texts] + return m + + +class TestMemoryRootScope: + """Tests for Memory class root_scope field.""" + + def test_memory_with_root_scope_prepends_to_explicit_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """When root_scope is set and explicit scope is provided, they combine.""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/crew/research-crew", + ) + + record = mem.remember( + "Test content", + scope="/market-trends", + categories=["test"], + importance=0.7, + ) + + assert record is not None + assert record.scope == "/crew/research-crew/market-trends" + + def test_memory_without_root_scope_uses_explicit_scope_directly( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """When root_scope is None, explicit scope is used as-is (backward compat).""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + ) + + record = mem.remember( + "Test content", + scope="/explicit", + categories=["test"], + importance=0.7, + ) + + assert record is not None + assert record.scope == "/explicit" + + def test_memory_root_scope_with_llm_inferred_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """When root_scope is set and scope is inferred by LLM, they combine.""" + from crewai.memory.analyze import ExtractedMetadata, MemoryAnalysis + from crewai.memory.unified_memory import Memory + + llm = MagicMock() + llm.supports_function_calling.return_value = True + llm.call.return_value = MemoryAnalysis( + suggested_scope="/quarterly-results", + categories=["finance"], + importance=0.8, + extracted_metadata=ExtractedMetadata(), + ) + + mem = Memory( + storage=str(tmp_path / "db"), + llm=llm, + embedder=mock_embedder, + root_scope="/flow/mypipeline", + ) + + # Don't provide scope - let LLM infer it + record = mem.remember("Q1 revenue was $1M") + + assert record is not None + assert record.scope == "/flow/mypipeline/quarterly-results" + + def test_memory_root_scope_per_call_override( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """Per-call root_scope overrides instance-level root_scope.""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/crew/base", + ) + + record = mem.remember( + "Test content", + scope="/inner", + categories=["test"], + importance=0.7, + root_scope="/override/path", # Override instance-level + ) + + assert record is not None + assert record.scope == "/override/path/inner" + + def test_remember_many_with_root_scope( + self, tmp_path: Path, + ) -> None: + """remember_many respects root_scope for all items.""" + from crewai.memory.unified_memory import Memory + + # Use distinct embeddings to avoid intra-batch dedup + call_count = 0 + + def distinct_embedder(texts: list[str]) -> list[list[float]]: + nonlocal call_count + result = [] + for i, _ in enumerate(texts): + emb = [0.0] * 1536 + emb[(call_count + i) % 1536] = 1.0 + result.append(emb) + call_count += len(texts) + return result + + mock_embedder = MagicMock(side_effect=distinct_embedder) + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/crew/batch-crew", + ) + + mem.remember_many( + ["Fact A", "Fact B"], + scope="/decisions", + categories=["test"], + importance=0.7, + ) + mem.drain_writes() + + records = mem.list_records() + assert len(records) == 2 + for record in records: + assert record.scope == "/crew/batch-crew/decisions" + + def test_remember_many_per_call_root_scope_override( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """remember_many accepts per-call root_scope override.""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/default", + ) + + mem.remember_many( + ["Fact A"], + scope="/inner", + categories=["test"], + importance=0.7, + root_scope="/agent/researcher", # Per-call override + ) + mem.drain_writes() + + # Use a global memory view to see all records (not scoped to /default) + mem_global = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + ) + records = mem_global.list_records() + assert len(records) == 1 + assert records[0].scope == "/agent/researcher/inner" + + +class TestRootScopePathNormalization: + """Tests for proper path normalization with root_scope.""" + + def test_no_double_slashes_in_result( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """Final scope should not have double slashes.""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/crew/test/", # Trailing slash + ) + + record = mem.remember( + "Test", + scope="/inner/", # Both have slashes + categories=["test"], + importance=0.5, + ) + + assert record is not None + assert "//" not in record.scope + assert record.scope == "/crew/test/inner" + + def test_leading_slash_always_present( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """Final scope should always have leading slash.""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="crew/test", # No leading slash + ) + + record = mem.remember( + "Test", + scope="inner", # No leading slash + categories=["test"], + importance=0.5, + ) + + assert record is not None + assert record.scope.startswith("/") + + def test_root_scope_with_root_inner_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """When inner scope is '/', result is just the root_scope.""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/crew/test", + ) + + record = mem.remember( + "Test", + scope="/", # Root scope + categories=["test"], + importance=0.5, + ) + + assert record is not None + assert record.scope == "/crew/test" + + +class TestCrewAutoScoping: + """Tests for automatic root_scope assignment in Crew.""" + + def test_crew_memory_true_sets_root_scope(self) -> None: + """Creating Crew with memory=True auto-sets root_scope.""" + from crewai.agent import Agent + from crewai.crew import Crew + from crewai.task import Task + + agent = Agent( + role="Researcher", + goal="Research", + backstory="Expert researcher", + llm="gpt-4o-mini", + ) + task = Task( + description="Do research", + expected_output="Report", + agent=agent, + ) + + crew = Crew( + name="Research Crew", + agents=[agent], + tasks=[task], + memory=True, + ) + + assert crew._memory is not None + assert hasattr(crew._memory, "root_scope") + assert crew._memory.root_scope == "/crew/research-crew" + + def test_crew_memory_instance_preserves_no_root_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """User-provided Memory instance is not modified — root_scope stays None.""" + from crewai.agent import Agent + from crewai.crew import Crew + from crewai.memory.unified_memory import Memory + from crewai.task import Task + + # Memory without root_scope + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + ) + assert mem.root_scope is None + + agent = Agent( + role="Tester", + goal="Test", + backstory="Tester", + llm="gpt-4o-mini", + ) + task = Task( + description="Test", + expected_output="Results", + agent=agent, + ) + + crew = Crew( + name="Test Crew", + agents=[agent], + tasks=[task], + memory=mem, + ) + + assert crew._memory is mem + # User-provided Memory is not auto-scoped — respect their config + assert crew._memory.root_scope is None + + def test_crew_respects_existing_root_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """User-provided Memory with existing root_scope is not overwritten.""" + from crewai.agent import Agent + from crewai.crew import Crew + from crewai.memory.unified_memory import Memory + from crewai.task import Task + + # Memory with explicit root_scope + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/custom/path", + ) + + agent = Agent( + role="Tester", + goal="Test", + backstory="Tester", + llm="gpt-4o-mini", + ) + task = Task( + description="Test", + expected_output="Results", + agent=agent, + ) + + crew = Crew( + name="Test Crew", + agents=[agent], + tasks=[task], + memory=mem, + ) + + assert crew._memory.root_scope == "/custom/path" # Not overwritten + + def test_crew_sanitizes_name_for_root_scope(self) -> None: + """Crew name with special chars is sanitized for root_scope.""" + from crewai.agent import Agent + from crewai.crew import Crew + from crewai.task import Task + + agent = Agent( + role="Agent", + goal="Goal", + backstory="Story", + llm="gpt-4o-mini", + ) + task = Task( + description="Task", + expected_output="Output", + agent=agent, + ) + + crew = Crew( + name="My Awesome Crew #1!", + agents=[agent], + tasks=[task], + memory=True, + ) + + assert crew._memory.root_scope == "/crew/my-awesome-crew-1" + + +class TestAgentScopeExtension: + """Tests for agent scope extension in BaseAgentExecutorMixin.""" + + def test_agent_save_extends_crew_root_scope(self) -> None: + """Agent._save_to_memory extends crew's root_scope with agent info.""" + from crewai.agents.agent_builder.base_agent_executor_mixin import ( + CrewAgentExecutorMixin, + ) + from crewai.agents.parser import AgentFinish + from crewai.utilities.printer import Printer + + mock_memory = MagicMock() + mock_memory.read_only = False + mock_memory.root_scope = "/crew/research-crew" + mock_memory.extract_memories.return_value = ["Fact A"] + + mock_agent = MagicMock() + mock_agent.memory = mock_memory + mock_agent._logger = MagicMock() + mock_agent.role = "Researcher" + + mock_task = MagicMock() + mock_task.description = "Research task" + mock_task.expected_output = "Report" + + class MinimalExecutor(CrewAgentExecutorMixin): + crew = None + agent = mock_agent + task = mock_task + iterations = 0 + max_iter = 1 + messages = [] + _i18n = MagicMock() + _printer = Printer() + + executor = MinimalExecutor() + executor._save_to_memory(AgentFinish(thought="", output="Result", text="Result")) + + mock_memory.remember_many.assert_called_once() + call_kwargs = mock_memory.remember_many.call_args.kwargs + assert call_kwargs["root_scope"] == "/crew/research-crew/agent/researcher" + + def test_agent_save_sanitizes_role(self) -> None: + """Agent role with special chars is sanitized for scope path.""" + from crewai.agents.agent_builder.base_agent_executor_mixin import ( + CrewAgentExecutorMixin, + ) + from crewai.agents.parser import AgentFinish + from crewai.utilities.printer import Printer + + mock_memory = MagicMock() + mock_memory.read_only = False + mock_memory.root_scope = "/crew/test" + mock_memory.extract_memories.return_value = ["Fact"] + + mock_agent = MagicMock() + mock_agent.memory = mock_memory + mock_agent._logger = MagicMock() + mock_agent.role = "Senior Research Analyst #1" + + mock_task = MagicMock() + mock_task.description = "Task" + mock_task.expected_output = "Output" + + class MinimalExecutor(CrewAgentExecutorMixin): + crew = None + agent = mock_agent + task = mock_task + iterations = 0 + max_iter = 1 + messages = [] + _i18n = MagicMock() + _printer = Printer() + + executor = MinimalExecutor() + executor._save_to_memory(AgentFinish(thought="", output="R", text="R")) + + call_kwargs = mock_memory.remember_many.call_args.kwargs + assert call_kwargs["root_scope"] == "/crew/test/agent/senior-research-analyst-1" + + +class TestFlowAutoScoping: + """Tests for automatic root_scope assignment in Flow.""" + + def test_flow_auto_memory_sets_root_scope(self) -> None: + """Flow auto-creates memory with root_scope set to /flow/.""" + from crewai.flow.flow import Flow + from crewai.memory.unified_memory import Memory + + class MyPipelineFlow(Flow): + pass + + flow = MyPipelineFlow() + + assert flow.memory is not None + assert isinstance(flow.memory, Memory) + assert flow.memory.root_scope == "/flow/mypipelineflow" + + def test_flow_with_name_uses_name_for_root_scope(self) -> None: + """Flow with custom name uses that name for root_scope.""" + from crewai.flow.flow import Flow + from crewai.memory.unified_memory import Memory + + class MyFlow(Flow): + name = "Custom Pipeline" + + flow = MyFlow() + + assert flow.memory is not None + assert isinstance(flow.memory, Memory) + assert flow.memory.root_scope == "/flow/custom-pipeline" + + def test_flow_user_provided_memory_not_overwritten( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """User-provided memory on Flow is not modified.""" + from crewai.flow.flow import Flow + from crewai.memory.unified_memory import Memory + + user_memory = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/custom/scope", + ) + + class MyFlow(Flow): + memory = user_memory + + flow = MyFlow() + + assert flow.memory is user_memory + assert flow.memory.root_scope == "/custom/scope" + + +class TestBackwardCompatibility: + """Tests ensuring backward compatibility with existing behavior.""" + + def test_memory_without_root_scope_works_normally( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """Memory without root_scope behaves exactly as before.""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + ) + + assert mem.root_scope is None + + record = mem.remember( + "Test content", + scope="/explicit", + categories=["test"], + importance=0.7, + ) + + assert record.scope == "/explicit" + + def test_crew_without_name_uses_default(self) -> None: + """Crew without name uses 'crew' as default for root_scope.""" + from crewai.agent import Agent + from crewai.crew import Crew + from crewai.task import Task + + agent = Agent( + role="Agent", + goal="Goal", + backstory="Story", + llm="gpt-4o-mini", + ) + task = Task( + description="Task", + expected_output="Output", + agent=agent, + ) + + # No name provided - uses default "crew" + crew = Crew( + agents=[agent], + tasks=[task], + memory=True, + ) + + assert crew._memory.root_scope == "/crew/crew" + + def test_old_memories_at_root_still_accessible( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """Old memories stored at '/' are still accessible.""" + from crewai.memory.unified_memory import Memory + + # Create memory and store at root (old behavior) + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + ) + + record = mem.remember( + "Old memory at root", + scope="/", + categories=["old"], + importance=0.5, + ) + assert record.scope == "/" + + # Recall from root should find it + matches = mem.recall("Old memory", scope="/", depth="shallow") + assert len(matches) >= 1 + + +class TestEncodingFlowRootScope: + """Tests for root_scope handling in EncodingFlow.""" + + def test_encoding_flow_fast_path_with_root_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """Group A (fast path) items properly prepend root_scope.""" + from crewai.memory.encoding_flow import ItemState + + # Test _apply_defaults directly on an ItemState without going through Flow + # since Flow.state is a property without a setter + item = ItemState( + content="Test", + scope="/inner", # Explicit + categories=["cat"], # Explicit + importance=0.5, # Explicit + root_scope="/crew/test", + ) + + # Manually test the join_scope_paths logic that _apply_defaults uses + from crewai.memory.utils import join_scope_paths + + inner_scope = item.scope or "/" + if item.root_scope: + resolved = join_scope_paths(item.root_scope, inner_scope) + else: + resolved = inner_scope + + assert resolved == "/crew/test/inner" + + def test_encoding_flow_llm_path_with_root_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """Group C (LLM path) items properly prepend root_scope to inferred scope.""" + from crewai.memory.analyze import ExtractedMetadata, MemoryAnalysis + from crewai.memory.unified_memory import Memory + + llm = MagicMock() + llm.supports_function_calling.return_value = True + llm.call.return_value = MemoryAnalysis( + suggested_scope="/llm-inferred", + categories=["auto"], + importance=0.7, + extracted_metadata=ExtractedMetadata(), + ) + + mem = Memory( + storage=str(tmp_path / "db"), + llm=llm, + embedder=mock_embedder, + root_scope="/flow/pipeline", + ) + + # No explicit scope/categories/importance -> goes through LLM + record = mem.remember("Content for LLM analysis") + + assert record is not None + assert record.scope == "/flow/pipeline/llm-inferred" + + +class TestMemoryScopeWithRootScope: + """Tests for MemoryScope interaction with root_scope.""" + + def test_memory_scope_remembers_within_root_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """MemoryScope with underlying Memory that has root_scope works correctly.""" + from crewai.memory.memory_scope import MemoryScope + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/crew/test", + ) + + # Create a MemoryScope + scope = MemoryScope(memory=mem, root_path="/agent/1") + + # Remember through the scope + record = scope.remember( + "Scoped content", + scope="/task", # Inner scope within MemoryScope + categories=["test"], + importance=0.5, + ) + + # The MemoryScope prepends its root_path, then Memory prepends root_scope + # MemoryScope.remember prepends /agent/1 to /task -> /agent/1/task + # Then Memory's root_scope /crew/test gets prepended by encoding flow + # Final: /crew/test/agent/1/task + assert record is not None + # Note: MemoryScope builds the scope before calling memory.remember + # So the scope it passes is /agent/1/task, which then gets root_scope prepended + assert record.scope.startswith("/crew/test/agent/1") + + +class TestReadIsolation: + """Tests for root_scope read isolation (recall, list, info, reset).""" + + def test_recall_with_root_scope_only_returns_scoped_records( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """recall() with root_scope returns only records within that scope.""" + from crewai.memory.unified_memory import Memory + + # Create memory without root_scope and store some records + mem_global = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + ) + + # Store records at different scopes + mem_global.remember( + "Global record", + scope="/other/scope", + categories=["global"], + importance=0.5, + ) + mem_global.remember( + "Crew A record", + scope="/crew/crew-a/inner", + categories=["crew-a"], + importance=0.5, + ) + mem_global.remember( + "Crew B record", + scope="/crew/crew-b/inner", + categories=["crew-b"], + importance=0.5, + ) + + # Create a scoped view for crew-a + mem_scoped = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/crew/crew-a", + ) + + # recall() should only find crew-a records + results = mem_scoped.recall("record", depth="shallow") + assert len(results) == 1 + assert results[0].record.scope == "/crew/crew-a/inner" + + def test_recall_with_root_scope_and_explicit_scope_nests( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """recall() with root_scope + explicit scope combines them.""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/crew/test", + ) + + mem.remember( + "Nested record", + scope="/inner/deep", + categories=["test"], + importance=0.5, + ) + + # recall with explicit scope should nest under root_scope + results = mem.recall("record", scope="/inner", depth="shallow") + assert len(results) == 1 + assert results[0].record.scope == "/crew/test/inner/deep" + + def test_recall_without_root_scope_works_globally( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """recall() without root_scope searches globally (backward compat).""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + ) + + mem.remember( + "Record A", + scope="/scope-a", + categories=["test"], + importance=0.5, + ) + mem.remember( + "Record B", + scope="/scope-b", + categories=["test"], + importance=0.5, + ) + + # recall without scope should find all records + results = mem.recall("record", depth="shallow") + assert len(results) == 2 + + def test_list_records_defaults_to_root_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """list_records() with root_scope defaults to that scope.""" + from crewai.memory.unified_memory import Memory + + # Store records at different scopes + mem_global = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + ) + + mem_global.remember("Global", scope="/other", categories=["x"], importance=0.5) + mem_global.remember("Scoped", scope="/crew/a/inner", categories=["x"], importance=0.5) + + # Create scoped memory + mem_scoped = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/crew/a", + ) + + # list_records() without scope should only show /crew/a records + records = mem_scoped.list_records() + assert len(records) == 1 + assert records[0].scope == "/crew/a/inner" + + def test_list_scopes_defaults_to_root_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """list_scopes() with root_scope defaults to that scope.""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + ) + + mem.remember("A", scope="/crew/a/child1", categories=["x"], importance=0.5) + mem.remember("B", scope="/crew/a/child2", categories=["x"], importance=0.5) + mem.remember("C", scope="/crew/b/other", categories=["x"], importance=0.5) + + mem_scoped = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/crew/a", + ) + + # list_scopes() should only show children under /crew/a + scopes = mem_scoped.list_scopes() + assert "/crew/a/child1" in scopes or "child1" in str(scopes) + assert "/crew/b" not in scopes + + def test_info_defaults_to_root_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """info() with root_scope defaults to that scope.""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + ) + + mem.remember("A", scope="/crew/a/inner", categories=["x"], importance=0.5) + mem.remember("B", scope="/other/inner", categories=["x"], importance=0.5) + + mem_scoped = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/crew/a", + ) + + # info() should only count records under /crew/a + scope_info = mem_scoped.info() + assert scope_info.record_count == 1 + + def test_reset_with_root_scope_only_deletes_scoped_records( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """reset() with root_scope only deletes within that scope.""" + from crewai.memory.unified_memory import Memory + + mem = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + ) + + mem.remember("A", scope="/crew/a/inner", categories=["x"], importance=0.5) + mem.remember("B", scope="/other/inner", categories=["x"], importance=0.5) + + mem_scoped = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + root_scope="/crew/a", + ) + + # reset() should only delete /crew/a records + mem_scoped.reset() + + # Check with a fresh global memory instance to avoid stale table references + mem_fresh = Memory( + storage=str(tmp_path / "db"), + llm=MagicMock(), + embedder=mock_embedder, + ) + records = mem_fresh.list_records() + assert len(records) == 1 + assert records[0].scope == "/other/inner" + + +class TestAgentExecutorBackwardCompat: + """Tests for agent executor backward compatibility.""" + + def test_agent_executor_no_root_scope_when_memory_has_none(self) -> None: + """Agent executor doesn't inject root_scope when memory has none.""" + from crewai.agents.agent_builder.base_agent_executor_mixin import ( + CrewAgentExecutorMixin, + ) + from crewai.agents.parser import AgentFinish + from crewai.utilities.printer import Printer + + mock_memory = MagicMock() + mock_memory.read_only = False + mock_memory.root_scope = None # No root_scope set + mock_memory.extract_memories.return_value = ["Fact A"] + + mock_agent = MagicMock() + mock_agent.memory = mock_memory + mock_agent._logger = MagicMock() + mock_agent.role = "Researcher" + + mock_task = MagicMock() + mock_task.description = "Task" + mock_task.expected_output = "Output" + + class MinimalExecutor(CrewAgentExecutorMixin): + crew = None + agent = mock_agent + task = mock_task + iterations = 0 + max_iter = 1 + messages = [] + _i18n = MagicMock() + _printer = Printer() + + executor = MinimalExecutor() + executor._save_to_memory(AgentFinish(thought="", output="R", text="R")) + + # Should NOT pass root_scope when memory has none + mock_memory.remember_many.assert_called_once() + call_kwargs = mock_memory.remember_many.call_args.kwargs + assert "root_scope" not in call_kwargs + + def test_agent_executor_extends_root_scope_when_memory_has_one(self) -> None: + """Agent executor extends root_scope when memory has one.""" + from crewai.agents.agent_builder.base_agent_executor_mixin import ( + CrewAgentExecutorMixin, + ) + from crewai.agents.parser import AgentFinish + from crewai.utilities.printer import Printer + + mock_memory = MagicMock() + mock_memory.read_only = False + mock_memory.root_scope = "/crew/test" # Has root_scope + mock_memory.extract_memories.return_value = ["Fact A"] + + mock_agent = MagicMock() + mock_agent.memory = mock_memory + mock_agent._logger = MagicMock() + mock_agent.role = "Researcher" + + mock_task = MagicMock() + mock_task.description = "Task" + mock_task.expected_output = "Output" + + class MinimalExecutor(CrewAgentExecutorMixin): + crew = None + agent = mock_agent + task = mock_task + iterations = 0 + max_iter = 1 + messages = [] + _i18n = MagicMock() + _printer = Printer() + + executor = MinimalExecutor() + executor._save_to_memory(AgentFinish(thought="", output="R", text="R")) + + # Should pass extended root_scope + mock_memory.remember_many.assert_called_once() + call_kwargs = mock_memory.remember_many.call_args.kwargs + assert call_kwargs["root_scope"] == "/crew/test/agent/researcher" + + +class TestConsolidationIsolation: + """Tests for consolidation staying within root_scope boundary.""" + + def test_consolidation_search_constrained_to_root_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """Consolidation similarity search is constrained to root_scope.""" + from crewai.memory.encoding_flow import EncodingFlow, ItemState + from crewai.memory.types import MemoryConfig + + mock_storage = MagicMock() + mock_storage.search.return_value = [] + + flow = EncodingFlow( + storage=mock_storage, + llm=MagicMock(), + embedder=mock_embedder, + config=MemoryConfig(), + ) + + # Create item with root_scope + item = ItemState( + content="Test", + scope="/inner", + root_scope="/crew/a", + embedding=[0.1] * 1536, + ) + flow.state.items = [item] + + # Run parallel_find_similar + flow.parallel_find_similar() + + # Check that search was called with correct scope_prefix + mock_storage.search.assert_called_once() + call_kwargs = mock_storage.search.call_args.kwargs + # Should be /crew/a/inner (root + inner combined) + assert call_kwargs["scope_prefix"] == "/crew/a/inner" + + def test_consolidation_search_without_root_scope( + self, tmp_path: Path, mock_embedder: MagicMock + ) -> None: + """Consolidation without root_scope searches by explicit scope only.""" + from crewai.memory.encoding_flow import EncodingFlow, ItemState + from crewai.memory.types import MemoryConfig + + mock_storage = MagicMock() + mock_storage.search.return_value = [] + + flow = EncodingFlow( + storage=mock_storage, + llm=MagicMock(), + embedder=mock_embedder, + config=MemoryConfig(), + ) + + # Create item without root_scope + item = ItemState( + content="Test", + scope="/inner", + root_scope=None, + embedding=[0.1] * 1536, + ) + flow.state.items = [item] + + # Run parallel_find_similar + flow.parallel_find_similar() + + # Check that search was called with explicit scope only + mock_storage.search.assert_called_once() + call_kwargs = mock_storage.search.call_args.kwargs + assert call_kwargs["scope_prefix"] == "/inner"