Compare commits

...

2 Commits

Author SHA1 Message Date
Brandon Hancock (bhancock_ai)
2e900c6443 Merge branch 'main' into bugfix/utc-in-python-3_10 2025-02-26 13:21:50 -05:00
Brandon Hancock
5a8d9019fa Add support for python 3.10 2025-02-25 16:05:22 -05:00

View File

@@ -4,7 +4,7 @@ SQLite-based implementation of flow state persistence.
import json import json
import sqlite3 import sqlite3
from datetime import datetime from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Optional, Union from typing import Any, Dict, Optional, Union
@@ -34,6 +34,7 @@ class SQLiteFlowPersistence(FlowPersistence):
ValueError: If db_path is invalid ValueError: If db_path is invalid
""" """
from crewai.utilities.paths import db_storage_path from crewai.utilities.paths import db_storage_path
# Get path from argument or default location # Get path from argument or default location
path = db_path or str(Path(db_storage_path()) / "flow_states.db") path = db_path or str(Path(db_storage_path()) / "flow_states.db")
@@ -46,7 +47,8 @@ class SQLiteFlowPersistence(FlowPersistence):
def init_db(self) -> None: def init_db(self) -> None:
"""Create the necessary tables if they don't exist.""" """Create the necessary tables if they don't exist."""
with sqlite3.connect(self.db_path) as conn: with sqlite3.connect(self.db_path) as conn:
conn.execute(""" conn.execute(
"""
CREATE TABLE IF NOT EXISTS flow_states ( CREATE TABLE IF NOT EXISTS flow_states (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
flow_uuid TEXT NOT NULL, flow_uuid TEXT NOT NULL,
@@ -54,12 +56,15 @@ class SQLiteFlowPersistence(FlowPersistence):
timestamp DATETIME NOT NULL, timestamp DATETIME NOT NULL,
state_json TEXT NOT NULL state_json TEXT NOT NULL
) )
""") """
)
# Add index for faster UUID lookups # Add index for faster UUID lookups
conn.execute(""" conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_flow_states_uuid CREATE INDEX IF NOT EXISTS idx_flow_states_uuid
ON flow_states(flow_uuid) ON flow_states(flow_uuid)
""") """
)
def save_state( def save_state(
self, self,
@@ -85,19 +90,22 @@ class SQLiteFlowPersistence(FlowPersistence):
) )
with sqlite3.connect(self.db_path) as conn: with sqlite3.connect(self.db_path) as conn:
conn.execute(""" conn.execute(
"""
INSERT INTO flow_states ( INSERT INTO flow_states (
flow_uuid, flow_uuid,
method_name, method_name,
timestamp, timestamp,
state_json state_json
) VALUES (?, ?, ?, ?) ) VALUES (?, ?, ?, ?)
""", ( """,
flow_uuid, (
method_name, flow_uuid,
datetime.utcnow().isoformat(), method_name,
json.dumps(state_dict), datetime.now(timezone.utc).isoformat(),
)) json.dumps(state_dict),
),
)
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]: def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
"""Load the most recent state for a given flow UUID. """Load the most recent state for a given flow UUID.
@@ -109,13 +117,16 @@ class SQLiteFlowPersistence(FlowPersistence):
The most recent state as a dictionary, or None if no state exists The most recent state as a dictionary, or None if no state exists
""" """
with sqlite3.connect(self.db_path) as conn: with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(""" cursor = conn.execute(
"""
SELECT state_json SELECT state_json
FROM flow_states FROM flow_states
WHERE flow_uuid = ? WHERE flow_uuid = ?
ORDER BY id DESC ORDER BY id DESC
LIMIT 1 LIMIT 1
""", (flow_uuid,)) """,
(flow_uuid,),
)
row = cursor.fetchone() row = cursor.fetchone()
if row: if row: