Compare commits

...

3 Commits

Author SHA1 Message Date
Devin AI
3758361d74 fix: address review comments - remove unused import and deduplicate log messages
Co-Authored-By: João <joao@crewai.com>
2026-04-09 03:49:41 +00:00
Devin AI
e22d1852fe style: format decorators.py with ruff
Co-Authored-By: João <joao@crewai.com>
2026-04-09 03:42:50 +00:00
Devin AI
f1b7e57095 fix: log persistence storage location for @persist decorator
When using @persist, users had no way to discover where their flow state
was being stored. The default SQLiteFlowPersistence uses appdirs which
varies by platform, making the database location opaque.

Changes:
- Log db_path at INFO level when SQLiteFlowPersistence is initialized
- Log storage location (db_path) after each state save in persist decorator
- Show storage location in console output when verbose=True
- Add 5 new tests covering persistence path discoverability

Closes #5372

Co-Authored-By: João <joao@crewai.com>
2026-04-09 03:40:12 +00:00
3 changed files with 104 additions and 25 deletions

View File

@@ -46,7 +46,7 @@ T = TypeVar("T")
# Constants for log messages
LOG_MESSAGES: Final[dict[str, str]] = {
"save_state": "Saving flow state to memory for ID: {}",
"save_state": "Saving flow state for ID: {} (storage: {})",
"save_error": "Failed to persist state for method {}: {}",
"state_missing": "Flow instance has no state",
"id_missing": "Flow state must have an 'id' field for persistence",
@@ -100,13 +100,6 @@ class PersistenceDecorator:
if not flow_uuid:
raise ValueError("Flow state must have an 'id' field for persistence")
# Log state saving only if verbose is True
if verbose:
PRINTER.print(
LOG_MESSAGES["save_state"].format(flow_uuid), color="cyan"
)
logger.info(LOG_MESSAGES["save_state"].format(flow_uuid))
try:
state_data = state._unwrap() if hasattr(state, "_unwrap") else state
persistence_instance.save_state(
@@ -120,6 +113,15 @@ class PersistenceDecorator:
PRINTER.print(error_msg, color="red")
logger.error(error_msg)
raise RuntimeError(f"State persistence failed: {e!s}") from e
# Log storage location so users can find their persisted data
storage_location = getattr(
persistence_instance, "db_path", type(persistence_instance).__name__
)
msg = LOG_MESSAGES["save_state"].format(flow_uuid, storage_location)
if verbose:
PRINTER.print(msg, color="cyan")
logger.info(msg)
except AttributeError as e:
error_msg = LOG_MESSAGES["state_missing"]
if verbose:

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
from datetime import datetime, timezone
import json
import logging
import os
from pathlib import Path
import sqlite3
@@ -17,6 +18,9 @@ from crewai.utilities.lock_store import lock as store_lock
from crewai.utilities.paths import db_storage_path
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -66,6 +70,7 @@ class SQLiteFlowPersistence(FlowPersistence):
def _setup(self) -> Self:
self._lock_name = f"sqlite:{os.path.realpath(self.db_path)}"
self.init_db()
logger.info("SQLiteFlowPersistence initialized with db_path: %s", self.db_path)
return self
def init_db(self) -> None:

View File

@@ -176,23 +176,6 @@ def test_persist_decorator_verbose_logging(tmp_path, caplog):
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
# Test with verbose=False (default)
class QuietFlow(Flow[Dict[str, str]]):
initial_state = dict()
@start()
@persist(persistence) # Default verbose=False
def init_step(self):
self.state["message"] = "Hello, World!"
self.state["id"] = "test-uuid-1"
flow = QuietFlow(persistence=persistence)
flow.kickoff()
assert "Saving flow state" not in caplog.text
# Clear the log
caplog.clear()
# Test with verbose=True
class VerboseFlow(Flow[Dict[str, str]]):
initial_state = dict()
@@ -248,3 +231,92 @@ def test_persistence_with_base_model(tmp_path):
assert message.type == "text"
assert message.content == "Hello, World!"
assert isinstance(flow.state._unwrap(), State)
def test_sqlite_persistence_logs_db_path_on_init(tmp_path, caplog):
"""Test that SQLiteFlowPersistence logs its db_path on initialization."""
caplog.set_level("INFO")
db_path = os.path.join(tmp_path, "my_custom.db")
SQLiteFlowPersistence(db_path)
assert "SQLiteFlowPersistence initialized with db_path" in caplog.text
assert db_path in caplog.text
def test_sqlite_persistence_default_path_is_logged(caplog):
"""Test that the default persistence path is logged so users can discover it."""
caplog.set_level("INFO")
persistence = SQLiteFlowPersistence()
assert "SQLiteFlowPersistence initialized with db_path" in caplog.text
assert "flow_states.db" in caplog.text
# Verify the db_path attribute is accessible for programmatic discovery
assert persistence.db_path.endswith("flow_states.db")
def test_persist_logs_storage_location_on_save(tmp_path, caplog):
"""Test that the persist decorator logs the storage location when state is saved."""
caplog.set_level("INFO")
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class LocationLogFlow(Flow[TestState]):
@start()
@persist(persistence)
def init_step(self):
self.state.message = "test"
flow = LocationLogFlow(persistence=persistence)
flow.kickoff()
# Verify that the storage location (db_path) is logged after saving
assert "storage:" in caplog.text
assert db_path in caplog.text
def test_persist_verbose_shows_storage_location_with_db_path(tmp_path, caplog):
"""Test that verbose persist includes storage location with actual db_path."""
caplog.set_level("INFO")
db_path = os.path.join(tmp_path, "verbose_test.db")
persistence = SQLiteFlowPersistence(db_path)
class VerboseLocationFlow(Flow[Dict[str, str]]):
initial_state = dict()
@start()
@persist(persistence, verbose=True)
def init_step(self):
self.state["message"] = "Hello!"
self.state["id"] = "verbose-uuid"
flow = VerboseLocationFlow(persistence=persistence)
flow.kickoff()
# Verbose mode should log both save message and storage location
assert "Saving flow state for ID: verbose-uuid" in caplog.text
assert f"storage: {db_path}" in caplog.text
def test_persist_class_level_logs_storage_location(tmp_path, caplog):
"""Test that class-level @persist also logs the storage location."""
caplog.set_level("INFO")
db_path = os.path.join(tmp_path, "class_level_test.db")
persistence = SQLiteFlowPersistence(db_path)
@persist(persistence)
class ClassLevelFlow(Flow[TestState]):
@start()
def init_step(self):
self.state.message = "class level"
flow = ClassLevelFlow(persistence=persistence)
flow.kickoff()
# Verify storage location is logged even with class-level decorator
assert "storage:" in caplog.text
assert db_path in caplog.text