Compare commits

...

4 Commits

Author SHA1 Message Date
Devin AI
d6a79703d3 Fix type-checker error: replace JSONEncodeError with proper exception types
- Use json.JSONDecodeError, TypeError, ValueError instead of non-existent JSONEncodeError
- Addresses mypy type-checker failure in CI

Co-Authored-By: João <joao@crewai.com>
2025-06-01 07:03:26 +00:00
Devin AI
c1eb48bd6e Add error handling for JSON serialization failures
- Wrap json.dumps() in try-catch block to handle JSONEncodeError
- Raise RuntimeError with descriptive message for serialization failures
- Addresses PR review feedback for better fault tolerance

Co-Authored-By: João <joao@crewai.com>
2025-06-01 06:58:27 +00:00
Devin AI
bcc77cb831 Fix lint: remove unused pytest import
Co-Authored-By: João <joao@crewai.com>
2025-06-01 06:57:58 +00:00
Devin AI
f47ed4f1f1 Fix Flow persistence with nested Pydantic models (issue #2929)
- Use CrewJSONEncoder in SQLiteFlowPersistence.save_state() to properly serialize nested Pydantic models
- Add comprehensive test for nested Pydantic model persistence
- Resolves RuntimeError: Object of type CustomPydanticModel is not JSON serializable

Co-Authored-By: João <joao@crewai.com>
2025-06-01 06:54:48 +00:00
2 changed files with 63 additions and 16 deletions

View File

@@ -11,6 +11,7 @@ from typing import Any, Dict, Optional, Union
from pydantic import BaseModel from pydantic import BaseModel
from crewai.flow.persistence.base import FlowPersistence from crewai.flow.persistence.base import FlowPersistence
from crewai.utilities.crew_json_encoder import CrewJSONEncoder
class SQLiteFlowPersistence(FlowPersistence): class SQLiteFlowPersistence(FlowPersistence):
@@ -89,23 +90,26 @@ class SQLiteFlowPersistence(FlowPersistence):
f"state_data must be either a Pydantic BaseModel or dict, got {type(state_data)}" f"state_data must be either a Pydantic BaseModel or dict, got {type(state_data)}"
) )
with sqlite3.connect(self.db_path) as conn: try:
conn.execute( with sqlite3.connect(self.db_path) as conn:
""" conn.execute(
INSERT INTO flow_states ( """
flow_uuid, INSERT INTO flow_states (
method_name,
timestamp,
state_json
) VALUES (?, ?, ?, ?)
""",
(
flow_uuid, flow_uuid,
method_name, method_name,
datetime.now(timezone.utc).isoformat(), timestamp,
json.dumps(state_dict), state_json
), ) VALUES (?, ?, ?, ?)
) """,
(
flow_uuid,
method_name,
datetime.now(timezone.utc).isoformat(),
json.dumps(state_dict, cls=CrewJSONEncoder),
),
)
except (json.JSONDecodeError, TypeError, ValueError) as e:
raise RuntimeError(f"Failed to serialize flow state: {str(e)}") from e
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]: def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
"""Load the most recent state for a given flow UUID. """Load the most recent state for a given flow UUID.

View File

@@ -3,7 +3,6 @@
import os import os
from typing import Dict from typing import Dict
import pytest
from pydantic import BaseModel from pydantic import BaseModel
from crewai.flow.flow import Flow, FlowState, listen, start from crewai.flow.flow import Flow, FlowState, listen, start
@@ -208,3 +207,47 @@ def test_persist_decorator_verbose_logging(tmp_path, caplog):
flow = VerboseFlow(persistence=persistence) flow = VerboseFlow(persistence=persistence)
flow.kickoff() flow.kickoff()
assert "Saving flow state" in caplog.text assert "Saving flow state" in caplog.text
def test_nested_pydantic_model_persistence(tmp_path):
"""Test persistence with nested Pydantic models (issue #2929)."""
from pydantic import Field
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class CustomObject(BaseModel):
field_x: float | None = Field(description="foo bar", default=None)
class CustomState(FlowState):
custom_field: CustomObject | None = None
class NestedPydanticFlow(Flow[CustomState]):
initial_state = CustomState
@start()
@persist(persistence, verbose=True)
def set_nested_object(self):
self.state.custom_field = CustomObject(field_x=42.0)
flow = NestedPydanticFlow(persistence=persistence)
flow.kickoff()
saved_state = persistence.load_state(flow.state.id)
assert saved_state is not None
assert saved_state["custom_field"] is not None
assert saved_state["custom_field"]["field_x"] == 42.0
class NullNestedFlow(Flow[CustomState]):
initial_state = CustomState
@start()
@persist(persistence)
def set_null_nested(self):
self.state.custom_field = None
flow2 = NullNestedFlow(persistence=persistence)
flow2.kickoff()
saved_state2 = persistence.load_state(flow2.state.id)
assert saved_state2 is not None
assert saved_state2["custom_field"] is None