mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 16:18:30 +00:00
Implement Flow state export method (#2134)
This commit implements a method for exporting the state of a flow into a JSON-serializable dictionary. The idea is producing a human-readable version of state that can be inspected or consumed by other systems, hence JSON and not pickling or marshalling. I consider it an export because it's a one-way process, meaning it cannot be loaded back into Python because of complex types.
This commit is contained in:
52
src/crewai/flow/state_utils.py
Normal file
52
src/crewai/flow/state_utils.py
Normal file
@@ -0,0 +1,52 @@
|
||||
from datetime import date, datetime
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.flow import Flow
|
||||
|
||||
|
||||
def export_state(flow: Flow) -> dict[str, Any]:
|
||||
"""Exports the Flow's internal state as JSON-compatible data structures.
|
||||
|
||||
Performs a one-way transformation of a Flow's state into basic Python types
|
||||
that can be safely serialized to JSON. To prevent infinite recursion with
|
||||
circular references, the conversion is limited to a depth of 5 levels.
|
||||
|
||||
Args:
|
||||
flow: The Flow object whose state needs to be exported
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: The transformed state using JSON-compatible Python
|
||||
types.
|
||||
"""
|
||||
return _to_serializable(flow._state)
|
||||
|
||||
|
||||
def _to_serializable(obj: Any, max_depth: int = 5, _current_depth: int = 0) -> Any:
|
||||
if _current_depth >= max_depth:
|
||||
return repr(obj)
|
||||
|
||||
if isinstance(obj, (str, int, float, bool, type(None))):
|
||||
return obj
|
||||
elif isinstance(obj, (date, datetime)):
|
||||
return obj.isoformat()
|
||||
elif isinstance(obj, (list, tuple, set)):
|
||||
return [_to_serializable(item, max_depth, _current_depth + 1) for item in obj]
|
||||
elif isinstance(obj, dict):
|
||||
return {
|
||||
_to_serializable_key(key): _to_serializable(
|
||||
value, max_depth, _current_depth + 1
|
||||
)
|
||||
for key, value in obj.items()
|
||||
}
|
||||
elif isinstance(obj, BaseModel):
|
||||
return _to_serializable(obj.model_dump(), max_depth, _current_depth + 1)
|
||||
else:
|
||||
return repr(obj)
|
||||
|
||||
|
||||
def _to_serializable_key(key: Any) -> str:
|
||||
if isinstance(key, (str, int)):
|
||||
return str(key)
|
||||
return f"key_{id(key)}_{repr(key)}"
|
||||
Reference in New Issue
Block a user