Compare commits

...

4 Commits

Author SHA1 Message Date
Brandon Hancock (bhancock_ai)
3d148090d9 Merge branch 'main' into bugfix/fix-type-error-in-token-counter 2025-02-26 13:25:02 -05:00
Brandon Hancock (bhancock_ai)
8fedbe49cb Add support for python 3.10 (#2230) 2025-02-26 13:24:31 -05:00
Lorenze Jay
82cfd353b3 Merge branch 'main' into bugfix/fix-type-error-in-token-counter 2025-02-26 10:01:19 -08:00
Brandon Hancock
0903bbeca2 Fix type issue 2025-02-25 12:18:52 -05:00
2 changed files with 33 additions and 16 deletions

View File

@@ -4,7 +4,7 @@ SQLite-based implementation of flow state persistence.
import json
import sqlite3
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Union
@@ -34,6 +34,7 @@ class SQLiteFlowPersistence(FlowPersistence):
ValueError: If db_path is invalid
"""
from crewai.utilities.paths import db_storage_path
# Get path from argument or default location
path = db_path or str(Path(db_storage_path()) / "flow_states.db")
@@ -46,7 +47,8 @@ class SQLiteFlowPersistence(FlowPersistence):
def init_db(self) -> None:
"""Create the necessary tables if they don't exist."""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
conn.execute(
"""
CREATE TABLE IF NOT EXISTS flow_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
flow_uuid TEXT NOT NULL,
@@ -54,12 +56,15 @@ class SQLiteFlowPersistence(FlowPersistence):
timestamp DATETIME NOT NULL,
state_json TEXT NOT NULL
)
""")
"""
)
# Add index for faster UUID lookups
conn.execute("""
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_flow_states_uuid
ON flow_states(flow_uuid)
""")
"""
)
def save_state(
self,
@@ -85,19 +90,22 @@ class SQLiteFlowPersistence(FlowPersistence):
)
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
conn.execute(
"""
INSERT INTO flow_states (
flow_uuid,
method_name,
timestamp,
state_json
) VALUES (?, ?, ?, ?)
""", (
flow_uuid,
method_name,
datetime.utcnow().isoformat(),
json.dumps(state_dict),
))
""",
(
flow_uuid,
method_name,
datetime.now(timezone.utc).isoformat(),
json.dumps(state_dict),
),
)
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
"""Load the most recent state for a given flow UUID.
@@ -109,13 +117,16 @@ class SQLiteFlowPersistence(FlowPersistence):
The most recent state as a dictionary, or None if no state exists
"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
cursor = conn.execute(
"""
SELECT state_json
FROM flow_states
WHERE flow_uuid = ?
ORDER BY id DESC
LIMIT 1
""", (flow_uuid,))
""",
(flow_uuid,),
)
row = cursor.fetchone()
if row:

View File

@@ -30,8 +30,14 @@ class TokenCalcHandler(CustomLogger):
if hasattr(usage, "prompt_tokens"):
self.token_cost_process.sum_prompt_tokens(usage.prompt_tokens)
if hasattr(usage, "completion_tokens"):
self.token_cost_process.sum_completion_tokens(usage.completion_tokens)
if hasattr(usage, "prompt_tokens_details") and usage.prompt_tokens_details:
self.token_cost_process.sum_completion_tokens(
usage.completion_tokens
)
if (
hasattr(usage, "prompt_tokens_details")
and usage.prompt_tokens_details
and usage.prompt_tokens_details.cached_tokens
):
self.token_cost_process.sum_cached_prompt_tokens(
usage.prompt_tokens_details.cached_tokens
)