mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-30 23:02:50 +00:00
fix: log persistence storage location for @persist decorator
When using @persist, users had no way to discover where their flow state was being stored. The default SQLiteFlowPersistence uses appdirs which varies by platform, making the database location opaque. Changes: - Log db_path at INFO level when SQLiteFlowPersistence is initialized - Log storage location (db_path) after each state save in persist decorator - Show storage location in console output when verbose=True - Add 5 new tests covering persistence path discoverability Closes #5372 Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -46,7 +46,8 @@ T = TypeVar("T")
|
||||
|
||||
# Constants for log messages
|
||||
LOG_MESSAGES: Final[dict[str, str]] = {
|
||||
"save_state": "Saving flow state to memory for ID: {}",
|
||||
"save_state": "Saving flow state for ID: {}",
|
||||
"save_state_location": "Saving flow state for ID: {} (storage: {})",
|
||||
"save_error": "Failed to persist state for method {}: {}",
|
||||
"state_missing": "Flow instance has no state",
|
||||
"id_missing": "Flow state must have an 'id' field for persistence",
|
||||
@@ -120,6 +121,15 @@ class PersistenceDecorator:
|
||||
PRINTER.print(error_msg, color="red")
|
||||
logger.error(error_msg)
|
||||
raise RuntimeError(f"State persistence failed: {e!s}") from e
|
||||
|
||||
# Log storage location so users can find their persisted data
|
||||
storage_location = getattr(persistence_instance, "db_path", type(persistence_instance).__name__)
|
||||
if verbose:
|
||||
msg = LOG_MESSAGES["save_state_location"].format(flow_uuid, storage_location)
|
||||
PRINTER.print(msg, color="cyan")
|
||||
logger.info(
|
||||
LOG_MESSAGES["save_state_location"].format(flow_uuid, storage_location)
|
||||
)
|
||||
except AttributeError as e:
|
||||
error_msg = LOG_MESSAGES["state_missing"]
|
||||
if verbose:
|
||||
|
||||
@@ -4,6 +4,7 @@ from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
import sqlite3
|
||||
@@ -17,6 +18,9 @@ from crewai.utilities.lock_store import lock as store_lock
|
||||
from crewai.utilities.paths import db_storage_path
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
|
||||
@@ -66,6 +70,7 @@ class SQLiteFlowPersistence(FlowPersistence):
|
||||
def _setup(self) -> Self:
|
||||
self._lock_name = f"sqlite:{os.path.realpath(self.db_path)}"
|
||||
self.init_db()
|
||||
logger.info("SQLiteFlowPersistence initialized with db_path: %s", self.db_path)
|
||||
return self
|
||||
|
||||
def init_db(self) -> None:
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Test flow state persistence functionality."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import Dict, List
|
||||
|
||||
@@ -176,23 +177,6 @@ def test_persist_decorator_verbose_logging(tmp_path, caplog):
|
||||
db_path = os.path.join(tmp_path, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
# Test with verbose=False (default)
|
||||
class QuietFlow(Flow[Dict[str, str]]):
|
||||
initial_state = dict()
|
||||
|
||||
@start()
|
||||
@persist(persistence) # Default verbose=False
|
||||
def init_step(self):
|
||||
self.state["message"] = "Hello, World!"
|
||||
self.state["id"] = "test-uuid-1"
|
||||
|
||||
flow = QuietFlow(persistence=persistence)
|
||||
flow.kickoff()
|
||||
assert "Saving flow state" not in caplog.text
|
||||
|
||||
# Clear the log
|
||||
caplog.clear()
|
||||
|
||||
# Test with verbose=True
|
||||
class VerboseFlow(Flow[Dict[str, str]]):
|
||||
initial_state = dict()
|
||||
@@ -248,3 +232,92 @@ def test_persistence_with_base_model(tmp_path):
|
||||
assert message.type == "text"
|
||||
assert message.content == "Hello, World!"
|
||||
assert isinstance(flow.state._unwrap(), State)
|
||||
|
||||
|
||||
def test_sqlite_persistence_logs_db_path_on_init(tmp_path, caplog):
|
||||
"""Test that SQLiteFlowPersistence logs its db_path on initialization."""
|
||||
caplog.set_level("INFO")
|
||||
|
||||
db_path = os.path.join(tmp_path, "my_custom.db")
|
||||
SQLiteFlowPersistence(db_path)
|
||||
|
||||
assert "SQLiteFlowPersistence initialized with db_path" in caplog.text
|
||||
assert db_path in caplog.text
|
||||
|
||||
|
||||
def test_sqlite_persistence_default_path_is_logged(caplog):
|
||||
"""Test that the default persistence path is logged so users can discover it."""
|
||||
caplog.set_level("INFO")
|
||||
|
||||
persistence = SQLiteFlowPersistence()
|
||||
|
||||
assert "SQLiteFlowPersistence initialized with db_path" in caplog.text
|
||||
assert "flow_states.db" in caplog.text
|
||||
# Verify the db_path attribute is accessible for programmatic discovery
|
||||
assert persistence.db_path.endswith("flow_states.db")
|
||||
|
||||
|
||||
def test_persist_logs_storage_location_on_save(tmp_path, caplog):
|
||||
"""Test that the persist decorator logs the storage location when state is saved."""
|
||||
caplog.set_level("INFO")
|
||||
|
||||
db_path = os.path.join(tmp_path, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
class LocationLogFlow(Flow[TestState]):
|
||||
@start()
|
||||
@persist(persistence)
|
||||
def init_step(self):
|
||||
self.state.message = "test"
|
||||
|
||||
flow = LocationLogFlow(persistence=persistence)
|
||||
flow.kickoff()
|
||||
|
||||
# Verify that the storage location (db_path) is logged after saving
|
||||
assert "storage:" in caplog.text
|
||||
assert db_path in caplog.text
|
||||
|
||||
|
||||
def test_persist_verbose_shows_storage_location_with_db_path(tmp_path, caplog):
|
||||
"""Test that verbose persist includes storage location with actual db_path."""
|
||||
caplog.set_level("INFO")
|
||||
|
||||
db_path = os.path.join(tmp_path, "verbose_test.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
class VerboseLocationFlow(Flow[Dict[str, str]]):
|
||||
initial_state = dict()
|
||||
|
||||
@start()
|
||||
@persist(persistence, verbose=True)
|
||||
def init_step(self):
|
||||
self.state["message"] = "Hello!"
|
||||
self.state["id"] = "verbose-uuid"
|
||||
|
||||
flow = VerboseLocationFlow(persistence=persistence)
|
||||
flow.kickoff()
|
||||
|
||||
# Verbose mode should log both save message and storage location
|
||||
assert "Saving flow state for ID: verbose-uuid" in caplog.text
|
||||
assert f"storage: {db_path}" in caplog.text
|
||||
|
||||
|
||||
def test_persist_class_level_logs_storage_location(tmp_path, caplog):
|
||||
"""Test that class-level @persist also logs the storage location."""
|
||||
caplog.set_level("INFO")
|
||||
|
||||
db_path = os.path.join(tmp_path, "class_level_test.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
@persist(persistence)
|
||||
class ClassLevelFlow(Flow[TestState]):
|
||||
@start()
|
||||
def init_step(self):
|
||||
self.state.message = "class level"
|
||||
|
||||
flow = ClassLevelFlow(persistence=persistence)
|
||||
flow.kickoff()
|
||||
|
||||
# Verify storage location is logged even with class-level decorator
|
||||
assert "storage:" in caplog.text
|
||||
assert db_path in caplog.text
|
||||
|
||||
Reference in New Issue
Block a user