Compare commits

...

10 Commits

Author SHA1 Message Date
Rip&Tear
1ffbb3f76e Delete SLACK_SUMMARY.md 2026-03-10 00:10:43 +08:00
Rip&Tear
4f37f59dc4 Delete SECURITY_FIX_F001.md 2026-03-10 00:10:19 +08:00
Cursor Agent
9365b270e0 Add Slack summary for security fix
Co-authored-by: Rip&Tear <theCyberTech@users.noreply.github.com>
2026-03-09 14:32:32 +00:00
Cursor Agent
31ab821bb6 Add security fix documentation for F-001
Co-authored-by: Rip&Tear <theCyberTech@users.noreply.github.com>
2026-03-09 14:31:08 +00:00
Cursor Agent
6ee0cacd7c [SECURITY] Fix sandbox escape vulnerability in CodeInterpreterTool (F-001)
This commit addresses a critical security vulnerability where the CodeInterpreterTool
could be exploited via sandbox escape attacks when Docker was unavailable.

Changes:
- Remove insecure fallback to restricted sandbox in run_code_safety()
- Now fails closed with RuntimeError when Docker is unavailable
- Mark run_code_in_restricted_sandbox() as deprecated and insecure
- Add clear security warnings to SandboxPython class documentation
- Update tests to reflect secure-by-default behavior
- Add test demonstrating the sandbox escape vulnerability
- Update README with security requirements and best practices

The previous implementation would fall back to a Python-based 'restricted sandbox'
when Docker was unavailable. However, this sandbox could be easily bypassed using
Python object introspection to recover the original __import__ function, allowing
arbitrary module access and command execution on the host.

The fix enforces Docker as a requirement for safe code execution. Users who cannot
use Docker must explicitly enable unsafe_mode=True, acknowledging the security risks.

Security Impact:
- Prevents RCE via sandbox escape when Docker is unavailable
- Enforces fail-closed security model
- Maintains backward compatibility via unsafe_mode flag

References:
- https://docs.crewai.com/tools/ai-ml/codeinterpretertool

Co-authored-by: Rip&Tear <theCyberTech@users.noreply.github.com>
2026-03-09 14:30:14 +00:00
Lucas Gomide
adef605410 fix: add missing list/dict methods to LockedListProxy and LockedDictProxy
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
2026-03-09 09:38:35 -04:00
Greyson LaLonde
cd42bcf035 refactor(memory): convert memory classes to serializable
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* refactor(memory): convert Memory, MemoryScope, and MemorySlice to BaseModel

* fix(test): update mock memory attribute from _read_only to read_only

* fix: handle re-validation in wrap validators and patch BaseModel class in tests
2026-03-08 23:08:10 -04:00
Greyson LaLonde
bc45a7fbe3 feat: create action for nightly releases
Some checks failed
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2026-03-06 18:32:52 -05:00
Matt Aitchison
87759cdb14 fix(deps): bump gitpython to >=3.1.41 to resolve CVE path traversal vulnerability (#4740)
Some checks failed
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
GitPython ==3.1.38 is affected by a high-severity path traversal
vulnerability (dependabot alert #1). Bump to >=3.1.41,<4 which
includes the fix.
2026-03-05 12:41:24 -06:00
Tiago Freire
059cb93aeb fix(executor): propagate contextvars context to parallel tool call threads
ThreadPoolExecutor threads do not inherit the calling thread's contextvars
context, causing _event_id_stack and _current_celery_task_id to be empty
in worker threads. This broke OTel span parenting for parallel tool calls
(missing parent_event_id) and lost the Celery task ID in the enterprise
tracking layer ([Task ID: no-task]).

Fix by capturing an independent context copy per submission via
contextvars.copy_context().run in CrewAgentExecutor._handle_native_tool_calls,
so each worker thread starts with the correct inherited context without
sharing mutable state across threads.
2026-03-05 08:20:09 -05:00
18 changed files with 722 additions and 237 deletions

127
.github/workflows/nightly.yml vendored Normal file
View File

@@ -0,0 +1,127 @@
name: Nightly Canary Release
on:
schedule:
- cron: '0 6 * * *' # daily at 6am UTC
workflow_dispatch:
jobs:
check:
name: Check for new commits
runs-on: ubuntu-latest
permissions:
contents: read
outputs:
has_changes: ${{ steps.check.outputs.has_changes }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Check for commits in last 24h
id: check
run: |
RECENT=$(git log --since="24 hours ago" --oneline | head -1)
if [ -n "$RECENT" ]; then
echo "has_changes=true" >> "$GITHUB_OUTPUT"
else
echo "has_changes=false" >> "$GITHUB_OUTPUT"
fi
build:
name: Build nightly packages
needs: check
if: needs.check.outputs.has_changes == 'true' || github.event_name == 'workflow_dispatch'
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install uv
uses: astral-sh/setup-uv@v4
- name: Stamp nightly versions
run: |
DATE=$(date +%Y%m%d)
for init_file in \
lib/crewai/src/crewai/__init__.py \
lib/crewai-tools/src/crewai_tools/__init__.py \
lib/crewai-files/src/crewai_files/__init__.py; do
CURRENT=$(python -c "
import re
text = open('$init_file').read()
print(re.search(r'__version__\s*=\s*\"(.*?)\"\s*$', text, re.MULTILINE).group(1))
")
NIGHTLY="${CURRENT}.dev${DATE}"
sed -i "s/__version__ = .*/__version__ = \"${NIGHTLY}\"/" "$init_file"
echo "$init_file: $CURRENT -> $NIGHTLY"
done
# Update cross-package dependency pins to nightly versions
sed -i "s/\"crewai-tools==[^\"]*\"/\"crewai-tools==${NIGHTLY}\"/" lib/crewai/pyproject.toml
sed -i "s/\"crewai==[^\"]*\"/\"crewai==${NIGHTLY}\"/" lib/crewai-tools/pyproject.toml
echo "Updated cross-package dependency pins to ${NIGHTLY}"
- name: Build packages
run: |
uv build --all-packages
rm dist/.gitignore
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: dist
path: dist/
publish:
name: Publish nightly to PyPI
needs: build
runs-on: ubuntu-latest
environment:
name: pypi
url: https://pypi.org/p/crewai
permissions:
id-token: write
contents: read
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
version: "0.8.4"
python-version: "3.12"
enable-cache: false
- name: Download artifacts
uses: actions/download-artifact@v4
with:
name: dist
path: dist
- name: Publish to PyPI
env:
UV_PUBLISH_TOKEN: ${{ secrets.PYPI_API_TOKEN }}
run: |
failed=0
for package in dist/*; do
if [[ "$package" == *"crewai_devtools"* ]]; then
echo "Skipping private package: $package"
continue
fi
echo "Publishing $package"
if ! uv publish "$package"; then
echo "Failed to publish $package"
failed=1
fi
done
if [ $failed -eq 1 ]; then
echo "Some packages failed to publish"
exit 1
fi

View File

@@ -108,7 +108,7 @@ stagehand = [
"stagehand>=0.4.1",
]
github = [
"gitpython==3.1.38",
"gitpython>=3.1.41,<4",
"PyGithub==1.59.1",
]
rag = [

View File

@@ -1,13 +1,27 @@
# CodeInterpreterTool
## Description
This tool is used to give the Agent the ability to run code (Python3) from the code generated by the Agent itself. The code is executed in a sandboxed environment, so it is safe to run any code.
This tool is used to give the Agent the ability to run code (Python3) from the code generated by the Agent itself. The code is executed in a Docker container for secure isolation.
It is incredible useful since it allows the Agent to generate code, run it in the same environment, get the result and use it to make decisions.
It is incredibly useful since it allows the Agent to generate code, run it in an isolated environment, get the result and use it to make decisions.
## ⚠️ Security Requirements
**Docker is REQUIRED** for safe code execution. The tool will refuse to execute code without Docker to prevent security vulnerabilities.
### Why Docker is Required
Previous versions included a "restricted sandbox" fallback when Docker was unavailable. This has been **removed** due to critical security vulnerabilities:
- The Python-based sandbox could be escaped via object introspection
- Attackers could recover the original `__import__` function and access any module
- This allowed arbitrary command execution on the host system
**Docker provides real process isolation** and is the only secure way to execute untrusted code.
## Requirements
- Docker
- **Docker (REQUIRED)** - Install from [docker.com](https://docs.docker.com/get-docker/)
## Installation
Install the crewai_tools package
@@ -17,7 +31,9 @@ pip install 'crewai[tools]'
## Example
Remember that when using this tool, the code must be generated by the Agent itself. The code must be a Python3 code. And it will take some time for the first time to run because it needs to build the Docker image.
Remember that when using this tool, the code must be generated by the Agent itself. The code must be Python3 code. It will take some time the first time to run because it needs to build the Docker image.
### Basic Usage (Docker Container - Recommended)
```python
from crewai_tools import CodeInterpreterTool
@@ -28,7 +44,9 @@ Agent(
)
```
Or if you need to pass your own Dockerfile just do this
### Custom Dockerfile
If you need to pass your own Dockerfile:
```python
from crewai_tools import CodeInterpreterTool
@@ -39,15 +57,39 @@ Agent(
)
```
If it is difficult to connect to docker daemon automatically (especially for macOS users), you can do this to setup docker host manually
### Manual Docker Host Configuration
If it is difficult to connect to the Docker daemon automatically (especially for macOS users), you can set up the Docker host manually:
```python
from crewai_tools import CodeInterpreterTool
Agent(
...
tools=[CodeInterpreterTool(user_docker_base_url="<Docker Host Base Url>",
user_dockerfile_path="<Dockerfile_path>")],
tools=[CodeInterpreterTool(
user_docker_base_url="<Docker Host Base Url>",
user_dockerfile_path="<Dockerfile_path>"
)],
)
```
### Unsafe Mode (NOT RECOMMENDED)
If you absolutely cannot use Docker and **fully trust the code source**, you can use unsafe mode:
```python
from crewai_tools import CodeInterpreterTool
# WARNING: Only use with fully trusted code!
Agent(
...
tools=[CodeInterpreterTool(unsafe_mode=True)],
)
```
**⚠️ SECURITY WARNING:** `unsafe_mode=True` executes code directly on the host without any isolation. Only use this if:
- You completely trust the code being executed
- You understand the security risks
- You cannot install Docker in your environment
For production use, **always use Docker** (the default mode).

View File

@@ -50,11 +50,16 @@ class CodeInterpreterSchema(BaseModel):
class SandboxPython:
"""A restricted Python execution environment for running code safely.
"""INSECURE: A restricted Python execution environment with known vulnerabilities.
This class provides methods to safely execute Python code by restricting access to
potentially dangerous modules and built-in functions. It creates a sandboxed
environment where harmful operations are blocked.
WARNING: This class does NOT provide real security isolation and is vulnerable to
sandbox escape attacks via Python object introspection. Attackers can recover the
original __import__ function and bypass all restrictions.
DO NOT USE for untrusted code execution. Use Docker containers instead.
This class attempts to restrict access to dangerous modules and built-in functions
but provides no real security boundary against a motivated attacker.
"""
BLOCKED_MODULES: ClassVar[set[str]] = {
@@ -299,8 +304,8 @@ class CodeInterpreterTool(BaseTool):
def run_code_safety(self, code: str, libraries_used: list[str]) -> str:
"""Runs code in the safest available environment.
Attempts to run code in Docker if available, falls back to a restricted
sandbox if Docker is not available.
Requires Docker to be available for secure code execution. Fails closed
if Docker is not available to prevent sandbox escape vulnerabilities.
Args:
code: The Python code to execute as a string.
@@ -308,10 +313,24 @@ class CodeInterpreterTool(BaseTool):
Returns:
The output of the executed code as a string.
Raises:
RuntimeError: If Docker is not available, as the restricted sandbox
is vulnerable to escape attacks and should not be used
for untrusted code execution.
"""
if self._check_docker_available():
return self.run_code_in_docker(code, libraries_used)
return self.run_code_in_restricted_sandbox(code)
error_msg = (
"Docker is required for safe code execution but is not available. "
"The restricted sandbox fallback has been removed due to security vulnerabilities "
"that allow sandbox escape via Python object introspection. "
"Please install Docker (https://docs.docker.com/get-docker/) or use unsafe_mode=True "
"if you trust the code source and understand the security risks."
)
Printer.print(error_msg, color="bold_red")
raise RuntimeError(error_msg)
def run_code_in_docker(self, code: str, libraries_used: list[str]) -> str:
"""Runs Python code in a Docker container for safe isolation.
@@ -342,10 +361,19 @@ class CodeInterpreterTool(BaseTool):
@staticmethod
def run_code_in_restricted_sandbox(code: str) -> str:
"""Runs Python code in a restricted sandbox environment.
"""DEPRECATED AND INSECURE: Runs Python code in a restricted sandbox environment.
Executes the code with restricted access to potentially dangerous modules and
built-in functions for basic safety when Docker is not available.
WARNING: This method is vulnerable to sandbox escape attacks via Python object
introspection and should NOT be used for untrusted code execution. It has been
deprecated and is only kept for backward compatibility with trusted code.
The "restricted" environment can be bypassed by attackers who can:
- Use object graph introspection to recover the original __import__ function
- Access any Python module including os, subprocess, sys, etc.
- Execute arbitrary commands on the host system
Use run_code_in_docker() for secure code execution, or run_code_unsafe()
if you explicitly acknowledge the security risks.
Args:
code: The Python code to execute as a string.
@@ -354,7 +382,10 @@ class CodeInterpreterTool(BaseTool):
The value of the 'result' variable from the executed code,
or an error message if execution failed.
"""
Printer.print("Running code in restricted sandbox", color="yellow")
Printer.print(
"WARNING: Running code in INSECURE restricted sandbox (vulnerable to escape attacks)",
color="bold_red"
)
exec_locals: dict[str, Any] = {}
try:
SandboxPython.exec(code=code, locals_=exec_locals)

View File

@@ -76,24 +76,24 @@ print("This is line 2")"""
)
def test_restricted_sandbox_basic_code_execution(printer_mock, docker_unavailable_mock):
"""Test basic code execution."""
def test_docker_unavailable_raises_error(printer_mock, docker_unavailable_mock):
"""Test that execution fails when Docker is unavailable in safe mode."""
tool = CodeInterpreterTool()
code = """
result = 2 + 2
print(result)
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"Running code in restricted sandbox", color="yellow"
)
assert result == 4
with pytest.raises(RuntimeError) as exc_info:
tool.run(code=code, libraries_used=[])
assert "Docker is required for safe code execution" in str(exc_info.value)
assert "sandbox escape" in str(exc_info.value)
def test_restricted_sandbox_running_with_blocked_modules(
printer_mock, docker_unavailable_mock
):
"""Test that restricted modules cannot be imported."""
"""Test that restricted modules cannot be imported when using the deprecated sandbox directly."""
tool = CodeInterpreterTool()
restricted_modules = SandboxPython.BLOCKED_MODULES
@@ -102,18 +102,17 @@ def test_restricted_sandbox_running_with_blocked_modules(
import {module}
result = "Import succeeded"
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"Running code in restricted sandbox", color="yellow"
)
# Note: run_code_in_restricted_sandbox is deprecated and insecure
# This test verifies the old behavior but should not be used in production
result = tool.run_code_in_restricted_sandbox(code)
assert f"An error occurred: Importing '{module}' is not allowed" in result
def test_restricted_sandbox_running_with_blocked_builtins(
printer_mock, docker_unavailable_mock
):
"""Test that restricted builtins are not available."""
"""Test that restricted builtins are not available when using the deprecated sandbox directly."""
tool = CodeInterpreterTool()
restricted_builtins = SandboxPython.UNSAFE_BUILTINS
@@ -122,25 +121,23 @@ def test_restricted_sandbox_running_with_blocked_builtins(
{builtin}("test")
result = "Builtin available"
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"Running code in restricted sandbox", color="yellow"
)
# Note: run_code_in_restricted_sandbox is deprecated and insecure
# This test verifies the old behavior but should not be used in production
result = tool.run_code_in_restricted_sandbox(code)
assert f"An error occurred: name '{builtin}' is not defined" in result
def test_restricted_sandbox_running_with_no_result_variable(
printer_mock, docker_unavailable_mock
):
"""Test behavior when no result variable is set."""
"""Test behavior when no result variable is set in deprecated sandbox."""
tool = CodeInterpreterTool()
code = """
x = 10
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"Running code in restricted sandbox", color="yellow"
)
# Note: run_code_in_restricted_sandbox is deprecated and insecure
# This test verifies the old behavior but should not be used in production
result = tool.run_code_in_restricted_sandbox(code)
assert result == "No result variable found."
@@ -172,3 +169,40 @@ result = eval("5/1")
"WARNING: Running code in unsafe mode", color="bold_magenta"
)
assert 5.0 == result
def test_sandbox_escape_vulnerability_demonstration(printer_mock):
"""Demonstrate that the restricted sandbox is vulnerable to escape attacks.
This test shows that an attacker can use Python object introspection to bypass
the restricted sandbox and access blocked modules like 'os'. This is why the
sandbox should never be used for untrusted code execution.
NOTE: This test uses the deprecated run_code_in_restricted_sandbox directly
to demonstrate the vulnerability. In production, Docker is now required.
"""
tool = CodeInterpreterTool()
# Classic Python sandbox escape via object introspection
escape_code = """
# Recover the real __import__ function via object introspection
for cls in ().__class__.__bases__[0].__subclasses__():
if cls.__name__ == 'catch_warnings':
# Get the real builtins module
real_builtins = cls()._module.__builtins__
real_import = real_builtins['__import__']
# Now we can import os and execute commands
os = real_import('os')
# Demonstrate we have escaped the sandbox
result = "SANDBOX_ESCAPED" if hasattr(os, 'system') else "FAILED"
break
"""
# The deprecated sandbox is vulnerable to this attack
result = tool.run_code_in_restricted_sandbox(escape_code)
# This demonstrates the vulnerability - the attacker can escape
assert result == "SANDBOX_ESCAPED", (
"The restricted sandbox was bypassed via object introspection. "
"This is why Docker is now required for safe code execution."
)

View File

@@ -30,12 +30,9 @@ class CrewAgentExecutorMixin:
memory = getattr(self.agent, "memory", None) or (
getattr(self.crew, "_memory", None) if self.crew else None
)
if memory is None or not self.task or getattr(memory, "_read_only", False):
if memory is None or not self.task or memory.read_only:
return
if (
f"Action: {sanitize_tool_name('Delegate work to coworker')}"
in output.text
):
if f"Action: {sanitize_tool_name('Delegate work to coworker')}" in output.text:
return
try:
raw = (
@@ -48,6 +45,4 @@ class CrewAgentExecutorMixin:
if extracted:
memory.remember_many(extracted, agent_role=self.agent.role)
except Exception as e:
self.agent._logger.log(
"error", f"Failed to save to memory: {e}"
)
self.agent._logger.log("error", f"Failed to save to memory: {e}")

View File

@@ -8,6 +8,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable
import contextvars
from concurrent.futures import ThreadPoolExecutor, as_completed
import inspect
import logging
@@ -755,6 +756,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {
pool.submit(
contextvars.copy_context().run,
self._execute_single_native_tool_call,
call_id=call_id,
func_name=func_name,

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
import contextvars
from collections.abc import Callable, Coroutine
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
@@ -728,7 +729,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
max_workers = min(8, len(runnable_tool_calls))
with ThreadPoolExecutor(max_workers=max_workers) as pool:
future_to_idx = {
pool.submit(self._execute_single_native_tool_call, tool_call): idx
pool.submit(contextvars.copy_context().run, self._execute_single_native_tool_call, tool_call): idx
for idx, tool_call in enumerate(runnable_tool_calls)
}
ordered_results: list[dict[str, Any] | None] = [None] * len(

View File

@@ -497,6 +497,50 @@ class LockedListProxy(list, Generic[T]): # type: ignore[type-arg]
def __bool__(self) -> bool:
return bool(self._list)
def index(self, value: T, start: SupportsIndex = 0, stop: SupportsIndex | None = None) -> int: # type: ignore[override]
if stop is None:
return self._list.index(value, start)
return self._list.index(value, start, stop)
def count(self, value: T) -> int:
return self._list.count(value)
def sort(self, *, key: Any = None, reverse: bool = False) -> None:
with self._lock:
self._list.sort(key=key, reverse=reverse)
def reverse(self) -> None:
with self._lock:
self._list.reverse()
def copy(self) -> list[T]:
return self._list.copy()
def __add__(self, other: list[T]) -> list[T]:
return self._list + other
def __radd__(self, other: list[T]) -> list[T]:
return other + self._list
def __iadd__(self, other: Iterable[T]) -> LockedListProxy[T]:
with self._lock:
self._list += list(other)
return self
def __mul__(self, n: SupportsIndex) -> list[T]:
return self._list * n
def __rmul__(self, n: SupportsIndex) -> list[T]:
return self._list * n
def __imul__(self, n: SupportsIndex) -> LockedListProxy[T]:
with self._lock:
self._list *= n
return self
def __reversed__(self) -> Iterator[T]:
return reversed(self._list)
def __eq__(self, other: object) -> bool:
"""Compare based on the underlying list contents."""
if isinstance(other, LockedListProxy):
@@ -579,6 +623,23 @@ class LockedDictProxy(dict, Generic[T]): # type: ignore[type-arg]
def __bool__(self) -> bool:
return bool(self._dict)
def copy(self) -> dict[str, T]:
return self._dict.copy()
def __or__(self, other: dict[str, T]) -> dict[str, T]:
return self._dict | other
def __ror__(self, other: dict[str, T]) -> dict[str, T]:
return other | self._dict
def __ior__(self, other: dict[str, T]) -> LockedDictProxy[T]:
with self._lock:
self._dict |= other
return self
def __reversed__(self) -> Iterator[str]:
return reversed(self._dict)
def __eq__(self, other: object) -> bool:
"""Compare based on the underlying dict contents."""
if isinstance(other, LockedDictProxy):
@@ -620,6 +681,10 @@ class StateProxy(Generic[T]):
if name in ("_proxy_state", "_proxy_lock"):
object.__setattr__(self, name, value)
else:
if isinstance(value, LockedListProxy):
value = value._list
elif isinstance(value, LockedDictProxy):
value = value._dict
with object.__getattribute__(self, "_proxy_lock"):
setattr(object.__getattribute__(self, "_proxy_state"), name, value)

View File

@@ -600,7 +600,7 @@ class LiteAgent(FlowTrackable, BaseModel):
def _save_to_memory(self, output_text: str) -> None:
"""Extract discrete memories from the run and remember each. No-op if _memory is None or read-only."""
if self._memory is None or getattr(self._memory, "_read_only", False):
if self._memory is None or self._memory.read_only:
return
input_str = self._get_last_user_content() or "User request"
try:

View File

@@ -3,11 +3,9 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING, Any
from typing import Any, Literal
if TYPE_CHECKING:
from crewai.memory.unified_memory import Memory
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, model_validator
from crewai.memory.types import (
_RECALL_OVERSAMPLE_FACTOR,
@@ -15,22 +13,38 @@ from crewai.memory.types import (
MemoryRecord,
ScopeInfo,
)
from crewai.memory.unified_memory import Memory
class MemoryScope:
class MemoryScope(BaseModel):
"""View of Memory restricted to a root path. All operations are scoped under that path."""
def __init__(self, memory: Memory, root_path: str) -> None:
"""Initialize scope.
model_config = ConfigDict(arbitrary_types_allowed=True)
Args:
memory: The underlying Memory instance.
root_path: Root path for this scope (e.g. /agent/1).
"""
self._memory = memory
self._root = root_path.rstrip("/") or ""
if self._root and not self._root.startswith("/"):
self._root = "/" + self._root
root_path: str = Field(default="/")
_memory: Memory = PrivateAttr()
_root: str = PrivateAttr()
@model_validator(mode="wrap")
@classmethod
def _accept_memory(cls, data: Any, handler: Any) -> MemoryScope:
"""Extract memory dependency and normalize root path before validation."""
if isinstance(data, MemoryScope):
return data
memory = data.pop("memory")
instance: MemoryScope = handler(data)
instance._memory = memory
root = instance.root_path.rstrip("/") or ""
if root and not root.startswith("/"):
root = "/" + root
instance._root = root
return instance
@property
def read_only(self) -> bool:
"""Whether the underlying memory is read-only."""
return self._memory.read_only
def _scope_path(self, scope: str | None) -> str:
if not scope or scope == "/":
@@ -52,7 +66,7 @@ class MemoryScope:
importance: float | None = None,
source: str | None = None,
private: bool = False,
) -> MemoryRecord:
) -> MemoryRecord | None:
"""Remember content; scope is relative to this scope's root."""
path = self._scope_path(scope)
return self._memory.remember(
@@ -71,7 +85,7 @@ class MemoryScope:
scope: str | None = None,
categories: list[str] | None = None,
limit: int = 10,
depth: str = "deep",
depth: Literal["shallow", "deep"] = "deep",
source: str | None = None,
include_private: bool = False,
) -> list[MemoryMatch]:
@@ -138,34 +152,34 @@ class MemoryScope:
"""Return a narrower scope under this scope."""
child = path.strip("/")
if not child:
return MemoryScope(self._memory, self._root or "/")
return MemoryScope(memory=self._memory, root_path=self._root or "/")
base = self._root.rstrip("/") or ""
new_root = f"{base}/{child}" if base else f"/{child}"
return MemoryScope(self._memory, new_root)
return MemoryScope(memory=self._memory, root_path=new_root)
class MemorySlice:
class MemorySlice(BaseModel):
"""View over multiple scopes: recall searches all, remember is a no-op when read_only."""
def __init__(
self,
memory: Memory,
scopes: list[str],
categories: list[str] | None = None,
read_only: bool = True,
) -> None:
"""Initialize slice.
model_config = ConfigDict(arbitrary_types_allowed=True)
Args:
memory: The underlying Memory instance.
scopes: List of scope paths to include.
categories: Optional category filter for recall.
read_only: If True, remember() is a silent no-op.
"""
self._memory = memory
self._scopes = [s.rstrip("/") or "/" for s in scopes]
self._categories = categories
self._read_only = read_only
scopes: list[str] = Field(default_factory=list)
categories: list[str] | None = Field(default=None)
read_only: bool = Field(default=True)
_memory: Memory = PrivateAttr()
@model_validator(mode="wrap")
@classmethod
def _accept_memory(cls, data: Any, handler: Any) -> MemorySlice:
"""Extract memory dependency and normalize scopes before validation."""
if isinstance(data, MemorySlice):
return data
memory = data.pop("memory")
data["scopes"] = [s.rstrip("/") or "/" for s in data.get("scopes", [])]
instance: MemorySlice = handler(data)
instance._memory = memory
return instance
def remember(
self,
@@ -178,7 +192,7 @@ class MemorySlice:
private: bool = False,
) -> MemoryRecord | None:
"""Remember into an explicit scope. No-op when read_only=True."""
if self._read_only:
if self.read_only:
return None
return self._memory.remember(
content,
@@ -196,14 +210,14 @@ class MemorySlice:
scope: str | None = None,
categories: list[str] | None = None,
limit: int = 10,
depth: str = "deep",
depth: Literal["shallow", "deep"] = "deep",
source: str | None = None,
include_private: bool = False,
) -> list[MemoryMatch]:
"""Recall across all slice scopes; results merged and re-ranked."""
cats = categories or self._categories
cats = categories or self.categories
all_matches: list[MemoryMatch] = []
for sc in self._scopes:
for sc in self.scopes:
matches = self._memory.recall(
query,
scope=sc,
@@ -231,7 +245,7 @@ class MemorySlice:
def list_scopes(self, path: str = "/") -> list[str]:
"""List scopes across all slice roots."""
out: list[str] = []
for sc in self._scopes:
for sc in self.scopes:
full = f"{sc.rstrip('/')}{path}" if sc != "/" else path
out.extend(self._memory.list_scopes(full))
return sorted(set(out))
@@ -243,15 +257,23 @@ class MemorySlice:
oldest: datetime | None = None
newest: datetime | None = None
children: list[str] = []
for sc in self._scopes:
for sc in self.scopes:
full = f"{sc.rstrip('/')}{path}" if sc != "/" else path
inf = self._memory.info(full)
total_records += inf.record_count
all_categories.update(inf.categories)
if inf.oldest_record:
oldest = inf.oldest_record if oldest is None else min(oldest, inf.oldest_record)
oldest = (
inf.oldest_record
if oldest is None
else min(oldest, inf.oldest_record)
)
if inf.newest_record:
newest = inf.newest_record if newest is None else max(newest, inf.newest_record)
newest = (
inf.newest_record
if newest is None
else max(newest, inf.newest_record)
)
children.extend(inf.child_scopes)
return ScopeInfo(
path=path,
@@ -265,7 +287,7 @@ class MemorySlice:
def list_categories(self, path: str | None = None) -> dict[str, int]:
"""Categories and counts across slice scopes."""
counts: dict[str, int] = {}
for sc in self._scopes:
for sc in self.scopes:
full = (f"{sc.rstrip('/')}{path}" if sc != "/" else path) if path else sc
for k, v in self._memory.list_categories(full).items():
counts[k] = counts.get(k, 0) + v

View File

@@ -6,7 +6,9 @@ from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
import threading
import time
from typing import TYPE_CHECKING, Any, Literal
from typing import TYPE_CHECKING, Annotated, Any, Literal
from pydantic import BaseModel, ConfigDict, Field, PlainValidator, PrivateAttr
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
@@ -39,13 +41,18 @@ if TYPE_CHECKING:
)
def _passthrough(v: Any) -> Any:
"""PlainValidator that accepts any value, bypassing strict union discrimination."""
return v
def _default_embedder() -> OpenAIEmbeddingFunction:
"""Build default OpenAI embedder for memory."""
spec: OpenAIProviderSpec = {"provider": "openai", "config": {}}
return build_embedder(spec)
class Memory:
class Memory(BaseModel):
"""Unified memory: standalone, LLM-analyzed, with intelligent recall flow.
Works without agent/crew. Uses LLM to infer scope, categories, importance on save.
@@ -53,116 +60,119 @@ class Memory:
pluggable storage (LanceDB default).
"""
def __init__(
self,
llm: BaseLLM | str = "gpt-4o-mini",
storage: StorageBackend | str = "lancedb",
embedder: Any = None,
# -- Scoring weights --
# These three weights control how recall results are ranked.
# The composite score is: semantic_weight * similarity + recency_weight * decay + importance_weight * importance.
# They should sum to ~1.0 for intuitive scoring.
recency_weight: float = 0.3,
semantic_weight: float = 0.5,
importance_weight: float = 0.2,
# How quickly old memories lose relevance. The recency score halves every
# N days (exponential decay). Lower = faster forgetting; higher = longer relevance.
recency_half_life_days: int = 30,
# -- Consolidation --
# When remembering new content, if an existing record has similarity >= this
# threshold, the LLM is asked to merge/update/delete. Set to 1.0 to disable.
consolidation_threshold: float = 0.85,
# Max existing records to compare against when checking for consolidation.
consolidation_limit: int = 5,
# -- Save defaults --
# Importance assigned to new memories when no explicit value is given and
# the LLM analysis path is skipped (all fields provided by the caller).
default_importance: float = 0.5,
# -- Recall depth control --
# These thresholds govern the RecallFlow router that decides between
# returning results immediately ("synthesize") vs. doing an extra
# LLM-driven exploration round ("explore_deeper").
# confidence >= confidence_threshold_high => always synthesize
# confidence < confidence_threshold_low => explore deeper (if budget > 0)
# complex query + confidence < complex_query_threshold => explore deeper
confidence_threshold_high: float = 0.8,
confidence_threshold_low: float = 0.5,
complex_query_threshold: float = 0.7,
# How many LLM-driven exploration rounds the RecallFlow is allowed to run.
# 0 = always shallow (vector search only); higher = more thorough but slower.
exploration_budget: int = 1,
# Queries shorter than this skip LLM analysis (saving ~1-3s).
# Longer queries (full task descriptions) benefit from LLM distillation.
query_analysis_threshold: int = 200,
# When True, all write operations (remember, remember_many) are silently
# skipped. Useful for sharing a read-only view of memory across agents
# without any of them persisting new memories.
read_only: bool = False,
) -> None:
"""Initialize Memory.
model_config = ConfigDict(arbitrary_types_allowed=True)
Args:
llm: LLM for analysis (model name or BaseLLM instance).
storage: Backend: "lancedb" or a StorageBackend instance.
embedder: Embedding callable, provider config dict, or None (default OpenAI).
recency_weight: Weight for recency in the composite relevance score.
semantic_weight: Weight for semantic similarity in the composite relevance score.
importance_weight: Weight for importance in the composite relevance score.
recency_half_life_days: Recency score halves every N days (exponential decay).
consolidation_threshold: Similarity above which consolidation is triggered on save.
consolidation_limit: Max existing records to compare during consolidation.
default_importance: Default importance when not provided or inferred.
confidence_threshold_high: Recall confidence above which results are returned directly.
confidence_threshold_low: Recall confidence below which deeper exploration is triggered.
complex_query_threshold: For complex queries, explore deeper below this confidence.
exploration_budget: Number of LLM-driven exploration rounds during deep recall.
query_analysis_threshold: Queries shorter than this skip LLM analysis during deep recall.
read_only: If True, remember() and remember_many() are silent no-ops.
"""
self._read_only = read_only
llm: Annotated[BaseLLM | str, PlainValidator(_passthrough)] = Field(
default="gpt-4o-mini",
description="LLM for analysis (model name or BaseLLM instance).",
)
storage: Annotated[StorageBackend | str, PlainValidator(_passthrough)] = Field(
default="lancedb",
description="Storage backend instance or path string.",
)
embedder: Any = Field(
default=None,
description="Embedding callable, provider config dict, or None for default OpenAI.",
)
recency_weight: float = Field(
default=0.3,
description="Weight for recency in the composite relevance score.",
)
semantic_weight: float = Field(
default=0.5,
description="Weight for semantic similarity in the composite relevance score.",
)
importance_weight: float = Field(
default=0.2,
description="Weight for importance in the composite relevance score.",
)
recency_half_life_days: int = Field(
default=30,
description="Recency score halves every N days (exponential decay).",
)
consolidation_threshold: float = Field(
default=0.85,
description="Similarity above which consolidation is triggered on save.",
)
consolidation_limit: int = Field(
default=5,
description="Max existing records to compare during consolidation.",
)
default_importance: float = Field(
default=0.5,
description="Default importance when not provided or inferred.",
)
confidence_threshold_high: float = Field(
default=0.8,
description="Recall confidence above which results are returned directly.",
)
confidence_threshold_low: float = Field(
default=0.5,
description="Recall confidence below which deeper exploration is triggered.",
)
complex_query_threshold: float = Field(
default=0.7,
description="For complex queries, explore deeper below this confidence.",
)
exploration_budget: int = Field(
default=1,
description="Number of LLM-driven exploration rounds during deep recall.",
)
query_analysis_threshold: int = Field(
default=200,
description="Queries shorter than this skip LLM analysis during deep recall.",
)
read_only: bool = Field(
default=False,
description="If True, remember() and remember_many() are silent no-ops.",
)
_config: MemoryConfig = PrivateAttr()
_llm_instance: BaseLLM | None = PrivateAttr(default=None)
_embedder_instance: Any = PrivateAttr(default=None)
_storage: StorageBackend = PrivateAttr()
_save_pool: ThreadPoolExecutor = PrivateAttr(
default_factory=lambda: ThreadPoolExecutor(
max_workers=1, thread_name_prefix="memory-save"
)
)
_pending_saves: list[Future[Any]] = PrivateAttr(default_factory=list)
_pending_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
def model_post_init(self, __context: Any) -> None:
"""Initialize runtime state from field values."""
self._config = MemoryConfig(
recency_weight=recency_weight,
semantic_weight=semantic_weight,
importance_weight=importance_weight,
recency_half_life_days=recency_half_life_days,
consolidation_threshold=consolidation_threshold,
consolidation_limit=consolidation_limit,
default_importance=default_importance,
confidence_threshold_high=confidence_threshold_high,
confidence_threshold_low=confidence_threshold_low,
complex_query_threshold=complex_query_threshold,
exploration_budget=exploration_budget,
query_analysis_threshold=query_analysis_threshold,
recency_weight=self.recency_weight,
semantic_weight=self.semantic_weight,
importance_weight=self.importance_weight,
recency_half_life_days=self.recency_half_life_days,
consolidation_threshold=self.consolidation_threshold,
consolidation_limit=self.consolidation_limit,
default_importance=self.default_importance,
confidence_threshold_high=self.confidence_threshold_high,
confidence_threshold_low=self.confidence_threshold_low,
complex_query_threshold=self.complex_query_threshold,
exploration_budget=self.exploration_budget,
query_analysis_threshold=self.query_analysis_threshold,
)
# Store raw config for lazy initialization. LLM and embedder are only
# built on first access so that Memory() never fails at construction
# time (e.g. when auto-created by Flow without an API key set).
self._llm_config: BaseLLM | str = llm
self._llm_instance: BaseLLM | None = None if isinstance(llm, str) else llm
self._embedder_config: Any = embedder
self._embedder_instance: Any = (
embedder
if (embedder is not None and not isinstance(embedder, dict))
self._llm_instance = None if isinstance(self.llm, str) else self.llm
self._embedder_instance = (
self.embedder
if (self.embedder is not None and not isinstance(self.embedder, dict))
else None
)
if isinstance(storage, str):
if isinstance(self.storage, str):
from crewai.memory.storage.lancedb_storage import LanceDBStorage
self._storage = LanceDBStorage() if storage == "lancedb" else LanceDBStorage(path=storage)
self._storage = (
LanceDBStorage()
if self.storage == "lancedb"
else LanceDBStorage(path=self.storage)
)
else:
self._storage = storage
# Background save queue. max_workers=1 serializes saves to avoid
# concurrent storage mutations (two saves finding the same similar
# record and both trying to update/delete it). Within each save,
# the parallel LLM calls still run on their own thread pool.
self._save_pool = ThreadPoolExecutor(
max_workers=1, thread_name_prefix="memory-save"
)
self._pending_saves: list[Future[Any]] = []
self._pending_lock = threading.Lock()
self._storage = self.storage
_MEMORY_DOCS_URL = "https://docs.crewai.com/concepts/memory"
@@ -173,11 +183,7 @@ class Memory:
from crewai.llm import LLM
try:
model_name = (
self._llm_config
if isinstance(self._llm_config, str)
else str(self._llm_config)
)
model_name = self.llm if isinstance(self.llm, str) else str(self.llm)
self._llm_instance = LLM(model=model_name)
except Exception as e:
raise RuntimeError(
@@ -197,8 +203,8 @@ class Memory:
"""Lazy embedder initialization -- only created when first needed."""
if self._embedder_instance is None:
try:
if isinstance(self._embedder_config, dict):
self._embedder_instance = build_embedder(self._embedder_config)
if isinstance(self.embedder, dict):
self._embedder_instance = build_embedder(self.embedder)
else:
self._embedder_instance = _default_embedder()
except Exception as e:
@@ -356,7 +362,7 @@ class Memory:
Raises:
Exception: On save failure (events emitted).
"""
if self._read_only:
if self.read_only:
return None
_source_type = "unified_memory"
try:
@@ -444,7 +450,7 @@ class Memory:
Returns:
Empty list (records are not available until the background save completes).
"""
if not contents or self._read_only:
if not contents or self.read_only:
return []
self._submit_save(

View File

@@ -121,7 +121,7 @@ def create_memory_tools(memory: Any) -> list[BaseTool]:
description=i18n.tools("recall_memory"),
),
]
if not getattr(memory, "_read_only", False):
if not memory.read_only:
tools.append(
RememberTool(
memory=memory,

View File

@@ -1136,7 +1136,7 @@ def test_lite_agent_memory_instance_recall_and_save_called():
successful_requests=1,
)
mock_memory = Mock()
mock_memory._read_only = False
mock_memory.read_only = False
mock_memory.recall.return_value = []
mock_memory.extract_memories.return_value = ["Fact one.", "Fact two."]

View File

@@ -172,8 +172,8 @@ def test_memory_scope_slice(tmp_path: Path, mock_embedder: MagicMock) -> None:
sc = mem.scope("/agent/1")
assert sc._root in ("/agent/1", "/agent/1/")
sl = mem.slice(["/a", "/b"], read_only=True)
assert sl._read_only is True
assert "/a" in sl._scopes and "/b" in sl._scopes
assert sl.read_only is True
assert "/a" in sl.scopes and "/b" in sl.scopes
def test_memory_list_scopes_info_tree(tmp_path: Path, mock_embedder: MagicMock) -> None:
@@ -198,7 +198,7 @@ def test_memory_scope_remember_recall(tmp_path: Path, mock_embedder: MagicMock)
from crewai.memory.memory_scope import MemoryScope
mem = Memory(storage=str(tmp_path / "db5"), llm=MagicMock(), embedder=mock_embedder)
scope = MemoryScope(mem, "/crew/1")
scope = MemoryScope(memory=mem, root_path="/crew/1")
scope.remember("Scoped note", scope="/", categories=[], importance=0.5, metadata={})
results = scope.recall("note", limit=5, depth="shallow")
assert len(results) >= 1
@@ -213,7 +213,7 @@ def test_memory_slice_recall(tmp_path: Path, mock_embedder: MagicMock) -> None:
mem = Memory(storage=str(tmp_path / "db6"), llm=MagicMock(), embedder=mock_embedder)
mem.remember("In scope A", scope="/a", categories=[], importance=0.5, metadata={})
sl = MemorySlice(mem, ["/a"], read_only=True)
sl = MemorySlice(memory=mem, scopes=["/a"], read_only=True)
matches = sl.recall("scope", limit=5, depth="shallow")
assert isinstance(matches, list)
@@ -223,7 +223,7 @@ def test_memory_slice_remember_is_noop_when_read_only(tmp_path: Path, mock_embed
from crewai.memory.memory_scope import MemorySlice
mem = Memory(storage=str(tmp_path / "db7"), llm=MagicMock(), embedder=mock_embedder)
sl = MemorySlice(mem, ["/a"], read_only=True)
sl = MemorySlice(memory=mem, scopes=["/a"], read_only=True)
result = sl.remember("x", scope="/a")
assert result is None
assert mem.list_records() == []
@@ -319,7 +319,7 @@ def test_executor_save_to_memory_calls_extract_then_remember_per_item() -> None:
from crewai.agents.parser import AgentFinish
mock_memory = MagicMock()
mock_memory._read_only = False
mock_memory.read_only = False
mock_memory.extract_memories.return_value = ["Fact A.", "Fact B."]
mock_agent = MagicMock()
@@ -360,7 +360,7 @@ def test_executor_save_to_memory_skips_delegation_output() -> None:
from crewai.utilities.string_utils import sanitize_tool_name
mock_memory = MagicMock()
mock_memory._read_only = False
mock_memory.read_only = False
mock_agent = MagicMock()
mock_agent.memory = mock_memory
mock_agent._logger = MagicMock()
@@ -393,7 +393,7 @@ def test_memory_scope_extract_memories_delegates() -> None:
mock_memory = MagicMock()
mock_memory.extract_memories.return_value = ["Scoped fact."]
scope = MemoryScope(mock_memory, "/agent/1")
scope = MemoryScope(memory=mock_memory, root_path="/agent/1")
result = scope.extract_memories("Some content")
mock_memory.extract_memories.assert_called_once_with("Some content")
assert result == ["Scoped fact."]
@@ -405,7 +405,7 @@ def test_memory_slice_extract_memories_delegates() -> None:
mock_memory = MagicMock()
mock_memory.extract_memories.return_value = ["Sliced fact."]
sl = MemorySlice(mock_memory, ["/a", "/b"], read_only=True)
sl = MemorySlice(memory=mock_memory, scopes=["/a", "/b"], read_only=True)
result = sl.extract_memories("Some content")
mock_memory.extract_memories.assert_called_once_with("Some content")
assert result == ["Sliced fact."]
@@ -670,10 +670,10 @@ def test_agent_kickoff_memory_recall_and_save(tmp_path: Path, mock_embedder: Mag
verbose=False,
)
# Mock recall to verify it's called, but return real results
with patch.object(mem, "recall", wraps=mem.recall) as recall_mock, \
patch.object(mem, "extract_memories", return_value=["PostgreSQL is used."]) as extract_mock, \
patch.object(mem, "remember_many", wraps=mem.remember_many) as remember_many_mock:
# Patch on the class to avoid Pydantic BaseModel __delattr__ restriction
with patch.object(Memory, "recall", wraps=mem.recall) as recall_mock, \
patch.object(Memory, "extract_memories", return_value=["PostgreSQL is used."]) as extract_mock, \
patch.object(Memory, "remember_many", wraps=mem.remember_many) as remember_many_mock:
result = agent.kickoff("What database do we use?")
assert result is not None

View File

@@ -36,7 +36,7 @@ from crewai.flow import Flow, start
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.llm import LLM
from crewai.memory.unified_memory import Memory
from crewai.process import Process
from crewai.project import CrewBase, agent, before_kickoff, crew, task
from crewai.task import Task
@@ -2618,9 +2618,9 @@ def test_memory_remember_called_after_task():
)
with patch.object(
crew._memory, "extract_memories", wraps=crew._memory.extract_memories
Memory, "extract_memories", wraps=crew._memory.extract_memories
) as extract_mock, patch.object(
crew._memory, "remember", wraps=crew._memory.remember
Memory, "remember", wraps=crew._memory.remember
) as remember_mock:
crew.kickoff()
@@ -4773,13 +4773,13 @@ def test_memory_remember_receives_task_content():
# Mock extract_memories to return fake memories and capture the raw input.
# No wraps= needed -- the test only checks what args it receives, not the output.
patch.object(
crew._memory, "extract_memories", return_value=["Fake memory."]
Memory, "extract_memories", return_value=["Fake memory."]
) as extract_mock,
# Mock recall to avoid LLM calls for query analysis (not in cassette).
patch.object(crew._memory, "recall", return_value=[]),
patch.object(Memory, "recall", return_value=[]),
# Mock remember_many to prevent the background save from triggering
# LLM calls (field resolution) that aren't in the cassette.
patch.object(crew._memory, "remember_many", return_value=[]),
patch.object(Memory, "remember_many", return_value=[]),
):
crew.kickoff()

View File

@@ -1893,3 +1893,163 @@ def test_or_condition_self_listen_fires_once():
flow = OrSelfListenFlow()
flow.kickoff()
assert call_count == 1
class ListState(BaseModel):
items: list = []
class DictState(BaseModel):
data: dict = {}
class _ListFlow(Flow[ListState]):
@start()
def populate(self):
self.state.items = [3, 1, 4, 1, 5, 9, 2, 6]
class _DictFlow(Flow[DictState]):
@start()
def populate(self):
self.state.data = {"a": 1, "b": 2, "c": 3}
def _make_list_flow():
flow = _ListFlow()
flow.kickoff()
return flow
def _make_dict_flow():
flow = _DictFlow()
flow.kickoff()
return flow
def test_locked_list_proxy_index():
flow = _make_list_flow()
assert flow.state.items.index(4) == 2
assert flow.state.items.index(1, 2) == 3
def test_locked_list_proxy_index_missing_raises():
flow = _make_list_flow()
with pytest.raises(ValueError):
flow.state.items.index(999)
def test_locked_list_proxy_count():
flow = _make_list_flow()
assert flow.state.items.count(1) == 2
assert flow.state.items.count(999) == 0
def test_locked_list_proxy_sort():
flow = _make_list_flow()
flow.state.items.sort()
assert list(flow.state.items) == [1, 1, 2, 3, 4, 5, 6, 9]
def test_locked_list_proxy_sort_reverse():
flow = _make_list_flow()
flow.state.items.sort(reverse=True)
assert list(flow.state.items) == [9, 6, 5, 4, 3, 2, 1, 1]
def test_locked_list_proxy_sort_key():
flow = _make_list_flow()
flow.state.items.sort(key=lambda x: -x)
assert list(flow.state.items) == [9, 6, 5, 4, 3, 2, 1, 1]
def test_locked_list_proxy_reverse():
flow = _make_list_flow()
original = list(flow.state.items)
flow.state.items.reverse()
assert list(flow.state.items) == list(reversed(original))
def test_locked_list_proxy_copy():
flow = _make_list_flow()
copied = flow.state.items.copy()
assert copied == [3, 1, 4, 1, 5, 9, 2, 6]
assert isinstance(copied, list)
copied.append(999)
assert 999 not in flow.state.items
def test_locked_list_proxy_add():
flow = _make_list_flow()
result = flow.state.items + [10, 11]
assert result == [3, 1, 4, 1, 5, 9, 2, 6, 10, 11]
assert len(flow.state.items) == 8
def test_locked_list_proxy_radd():
flow = _make_list_flow()
result = [0] + flow.state.items
assert result[0] == 0
assert len(result) == 9
def test_locked_list_proxy_iadd():
flow = _make_list_flow()
flow.state.items += [10]
assert 10 in flow.state.items
# Verify no deadlock: mutations must still work after +=
flow.state.items.append(99)
assert 99 in flow.state.items
def test_locked_list_proxy_mul():
flow = _make_list_flow()
result = flow.state.items * 2
assert len(result) == 16
def test_locked_list_proxy_rmul():
flow = _make_list_flow()
result = 2 * flow.state.items
assert len(result) == 16
def test_locked_list_proxy_reversed():
flow = _make_list_flow()
original = list(flow.state.items)
assert list(reversed(flow.state.items)) == list(reversed(original))
def test_locked_dict_proxy_copy():
flow = _make_dict_flow()
copied = flow.state.data.copy()
assert copied == {"a": 1, "b": 2, "c": 3}
assert isinstance(copied, dict)
copied["z"] = 99
assert "z" not in flow.state.data
def test_locked_dict_proxy_or():
flow = _make_dict_flow()
result = flow.state.data | {"d": 4}
assert result == {"a": 1, "b": 2, "c": 3, "d": 4}
assert "d" not in flow.state.data
def test_locked_dict_proxy_ror():
flow = _make_dict_flow()
result = {"z": 0} | flow.state.data
assert result == {"z": 0, "a": 1, "b": 2, "c": 3}
def test_locked_dict_proxy_ior():
flow = _make_dict_flow()
flow.state.data |= {"d": 4}
assert flow.state.data["d"] == 4
# Verify no deadlock: mutations must still work after |=
flow.state.data["e"] = 5
assert flow.state.data["e"] == 5
def test_locked_dict_proxy_reversed():
flow = _make_dict_flow()
assert list(reversed(flow.state.data)) == ["c", "b", "a"]

8
uv.lock generated
View File

@@ -1426,7 +1426,7 @@ requires-dist = [
{ name = "docker", specifier = "~=7.1.0" },
{ name = "exa-py", marker = "extra == 'exa-py'", specifier = ">=1.8.7" },
{ name = "firecrawl-py", marker = "extra == 'firecrawl-py'", specifier = ">=1.8.0" },
{ name = "gitpython", marker = "extra == 'github'", specifier = "==3.1.38" },
{ name = "gitpython", marker = "extra == 'github'", specifier = ">=3.1.41,<4" },
{ name = "hyperbrowser", marker = "extra == 'hyperbrowser'", specifier = ">=0.18.0" },
{ name = "langchain-apify", marker = "extra == 'apify'", specifier = ">=0.1.2,<1.0.0" },
{ name = "linkup-sdk", marker = "extra == 'linkup-sdk'", specifier = ">=0.2.2" },
@@ -2201,14 +2201,14 @@ wheels = [
[[package]]
name = "gitpython"
version = "3.1.38"
version = "3.1.46"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "gitdb" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b3/45/cee7af549b6fa33f04531e402693a772b776cd9f845a2cbeca99cfac3331/GitPython-3.1.38.tar.gz", hash = "sha256:4d683e8957c8998b58ddb937e3e6cd167215a180e1ffd4da769ab81c620a89fe", size = 200632, upload-time = "2023-10-17T06:09:52.235Z" }
sdist = { url = "https://files.pythonhosted.org/packages/df/b5/59d16470a1f0dfe8c793f9ef56fd3826093fc52b3bd96d6b9d6c26c7e27b/gitpython-3.1.46.tar.gz", hash = "sha256:400124c7d0ef4ea03f7310ac2fbf7151e09ff97f2a3288d64a440c584a29c37f", size = 215371, upload-time = "2026-01-01T15:37:32.073Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3c/ae/044453eacd5a526d3f242ccd77e38ee8219c65e0b132562b551bd67c61a4/GitPython-3.1.38-py3-none-any.whl", hash = "sha256:9e98b672ffcb081c2c8d5aa630d4251544fb040fb158863054242f24a2a2ba30", size = 190573, upload-time = "2023-10-17T06:09:50.18Z" },
{ url = "https://files.pythonhosted.org/packages/6a/09/e21df6aef1e1ffc0c816f0522ddc3f6dcded766c3261813131c78a704470/gitpython-3.1.46-py3-none-any.whl", hash = "sha256:79812ed143d9d25b6d176a10bb511de0f9c67b1fa641d82097b0ab90398a2058", size = 208620, upload-time = "2026-01-01T15:37:30.574Z" },
]
[[package]]