mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-03 14:09:24 +00:00
fix: enforce per-entry artifact TTL and align cleanup scope
- Store each artifact's own expiry (expires_at) instead of a shared stored_at; prune by per-entry expiry on store and enforce it on resolve. Previously a short-TTL store could evict long-TTL entries and expired handles stayed resolvable until the next write. - Derive the artifact scope from the executor's crew or the agent's crew so the native and ReAct paths agree with the crew-id Crew passes to clear_artifact_scope, preventing crew-bound artifacts from lingering under a task scope until TTL.
This commit is contained in:
@@ -869,7 +869,9 @@ class CrewAgentExecutor(BaseAgentExecutor):
|
||||
store_if_artifact,
|
||||
)
|
||||
|
||||
scope_id = artifact_scope_id(self.crew, self.task)
|
||||
scope_id = artifact_scope_id(
|
||||
self.crew or getattr(self.agent, "crew", None), self.task
|
||||
)
|
||||
|
||||
args_dict, parse_error = parse_tool_call_args(
|
||||
func_args, func_name, call_id, original_tool
|
||||
|
||||
@@ -1767,7 +1767,9 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
|
||||
return parse_error
|
||||
args_dict: dict[str, Any] = parsed_args or {}
|
||||
|
||||
scope_id = artifact_scope_id(self.crew, self.task)
|
||||
scope_id = artifact_scope_id(
|
||||
self.crew or getattr(self.agent, "crew", None), self.task
|
||||
)
|
||||
|
||||
# Get agent_key for event tracking
|
||||
agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown"
|
||||
|
||||
@@ -96,7 +96,7 @@ class FileArtifact:
|
||||
class _Entry:
|
||||
artifact: FileArtifact
|
||||
scope_id: str | None
|
||||
stored_at: float
|
||||
expires_at: float | None
|
||||
|
||||
|
||||
class _ArtifactStore:
|
||||
@@ -119,18 +119,23 @@ class _ArtifactStore:
|
||||
) -> str:
|
||||
handle_id = str(uuid4())
|
||||
with self._lock:
|
||||
self._prune_locked(ttl)
|
||||
self._prune_locked()
|
||||
self._entries[handle_id] = _Entry(
|
||||
artifact=artifact,
|
||||
scope_id=str(scope_id) if scope_id is not None else None,
|
||||
stored_at=time.monotonic(),
|
||||
expires_at=(time.monotonic() + ttl) if ttl > 0 else None,
|
||||
)
|
||||
return f"{_HANDLE_SCHEME}://{handle_id}"
|
||||
|
||||
def resolve(self, handle_id: str) -> FileArtifact | None:
|
||||
with self._lock:
|
||||
entry = self._entries.get(handle_id)
|
||||
return entry.artifact if entry is not None else None
|
||||
if entry is None:
|
||||
return None
|
||||
if entry.expires_at is not None and entry.expires_at <= time.monotonic():
|
||||
del self._entries[handle_id]
|
||||
return None
|
||||
return entry.artifact
|
||||
|
||||
def clear_scope(self, scope_id: str) -> None:
|
||||
scope = str(scope_id)
|
||||
@@ -140,12 +145,13 @@ class _ArtifactStore:
|
||||
]:
|
||||
del self._entries[handle_id]
|
||||
|
||||
def _prune_locked(self, ttl: int) -> None:
|
||||
if ttl <= 0:
|
||||
return
|
||||
cutoff = time.monotonic() - ttl
|
||||
def _prune_locked(self) -> None:
|
||||
"""Drop entries whose per-entry TTL has elapsed. Caller holds the lock."""
|
||||
now = time.monotonic()
|
||||
for handle_id in [
|
||||
hid for hid, entry in self._entries.items() if entry.stored_at < cutoff
|
||||
hid
|
||||
for hid, entry in self._entries.items()
|
||||
if entry.expires_at is not None and entry.expires_at <= now
|
||||
]:
|
||||
del self._entries[handle_id]
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import re
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -284,17 +285,33 @@ class TestNativeExecutorWiring:
|
||||
|
||||
|
||||
class TestTtlPrune:
|
||||
@staticmethod
|
||||
def _expire(handle: str) -> None:
|
||||
"""Force a stored handle's per-entry TTL into the past."""
|
||||
entry = _store._entries[handle.rsplit("/", 1)[-1]]
|
||||
entry.expires_at = time.monotonic() - 1
|
||||
|
||||
def test_expired_handle_does_not_resolve(self) -> None:
|
||||
handle = _handle_in(store_artifact(FileArtifact(data=b"old"), ttl=3600))
|
||||
self._expire(handle)
|
||||
# An expired handle is enforced on lookup, not just on the next write.
|
||||
assert resolve_artifact_handles(handle) == handle
|
||||
|
||||
def test_short_ttl_store_does_not_evict_long_ttl_entries(self) -> None:
|
||||
keep = _handle_in(store_artifact(FileArtifact(data=b"keep"), ttl=3600))
|
||||
# A later short-TTL store must prune only by each entry's own expiry,
|
||||
# never by the current call's ttl.
|
||||
store_artifact(FileArtifact(data=b"tiny"), ttl=1)
|
||||
assert base64.b64decode(resolve_artifact_handles(keep)) == b"keep"
|
||||
|
||||
def test_expired_entries_are_pruned_on_next_store(self) -> None:
|
||||
stale = _handle_in(store_artifact(FileArtifact(data=b"old"), ttl=3600))
|
||||
stale_id = stale.rsplit("/", 1)[-1]
|
||||
# Force the entry to look old, then trigger a prune via another store.
|
||||
_store._entries[stale_id].stored_at -= 7200
|
||||
self._expire(stale)
|
||||
store_artifact(FileArtifact(data=b"new"), ttl=3600)
|
||||
assert resolve_artifact_handles(stale) == stale
|
||||
assert stale.rsplit("/", 1)[-1] not in _store._entries
|
||||
|
||||
def test_ttl_zero_disables_pruning(self) -> None:
|
||||
def test_ttl_zero_never_expires(self) -> None:
|
||||
handle = _handle_in(store_artifact(FileArtifact(data=b"keep"), ttl=0))
|
||||
handle_id = handle.rsplit("/", 1)[-1]
|
||||
_store._entries[handle_id].stored_at -= 99999
|
||||
assert _store._entries[handle.rsplit("/", 1)[-1]].expires_at is None
|
||||
store_artifact(FileArtifact(data=b"another"), ttl=0)
|
||||
assert base64.b64decode(resolve_artifact_handles(handle)) == b"keep"
|
||||
|
||||
Reference in New Issue
Block a user