Merge branch 'main' of github.com:crewAIInc/crewAI into better/event-emitter

This commit is contained in:
Lorenze Jay
2025-02-19 13:24:26 -08:00
2 changed files with 50 additions and 12 deletions

View File

@@ -58,7 +58,7 @@ class PersistenceDecorator:
_printer = Printer() # Class-level printer instance _printer = Printer() # Class-level printer instance
@classmethod @classmethod
def persist_state(cls, flow_instance: Any, method_name: str, persistence_instance: FlowPersistence) -> None: def persist_state(cls, flow_instance: Any, method_name: str, persistence_instance: FlowPersistence, verbose: bool = False) -> None:
"""Persist flow state with proper error handling and logging. """Persist flow state with proper error handling and logging.
This method handles the persistence of flow state data, including proper This method handles the persistence of flow state data, including proper
@@ -68,6 +68,7 @@ class PersistenceDecorator:
flow_instance: The flow instance whose state to persist flow_instance: The flow instance whose state to persist
method_name: Name of the method that triggered persistence method_name: Name of the method that triggered persistence
persistence_instance: The persistence backend to use persistence_instance: The persistence backend to use
verbose: Whether to log persistence operations
Raises: Raises:
ValueError: If flow has no state or state lacks an ID ValueError: If flow has no state or state lacks an ID
@@ -88,9 +89,10 @@ class PersistenceDecorator:
if not flow_uuid: if not flow_uuid:
raise ValueError("Flow state must have an 'id' field for persistence") raise ValueError("Flow state must have an 'id' field for persistence")
# Log state saving with consistent message # Log state saving only if verbose is True
cls._printer.print(LOG_MESSAGES["save_state"].format(flow_uuid), color="cyan") if verbose:
logger.info(LOG_MESSAGES["save_state"].format(flow_uuid)) cls._printer.print(LOG_MESSAGES["save_state"].format(flow_uuid), color="cyan")
logger.info(LOG_MESSAGES["save_state"].format(flow_uuid))
try: try:
persistence_instance.save_state( persistence_instance.save_state(
@@ -115,7 +117,7 @@ class PersistenceDecorator:
raise ValueError(error_msg) from e raise ValueError(error_msg) from e
def persist(persistence: Optional[FlowPersistence] = None): def persist(persistence: Optional[FlowPersistence] = None, verbose: bool = False):
"""Decorator to persist flow state. """Decorator to persist flow state.
This decorator can be applied at either the class level or method level. This decorator can be applied at either the class level or method level.
@@ -126,6 +128,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
Args: Args:
persistence: Optional FlowPersistence implementation to use. persistence: Optional FlowPersistence implementation to use.
If not provided, uses SQLiteFlowPersistence. If not provided, uses SQLiteFlowPersistence.
verbose: Whether to log persistence operations. Defaults to False.
Returns: Returns:
A decorator that can be applied to either a class or method A decorator that can be applied to either a class or method
@@ -135,13 +138,12 @@ def persist(persistence: Optional[FlowPersistence] = None):
RuntimeError: If state persistence fails RuntimeError: If state persistence fails
Example: Example:
@persist # Class-level persistence with default SQLite @persist(verbose=True) # Class-level persistence with logging
class MyFlow(Flow[MyState]): class MyFlow(Flow[MyState]):
@start() @start()
def begin(self): def begin(self):
pass pass
""" """
def decorator(target: Union[Type, Callable[..., T]]) -> Union[Type, Callable[..., T]]: def decorator(target: Union[Type, Callable[..., T]]) -> Union[Type, Callable[..., T]]:
"""Decorator that handles both class and method decoration.""" """Decorator that handles both class and method decoration."""
actual_persistence = persistence or SQLiteFlowPersistence() actual_persistence = persistence or SQLiteFlowPersistence()
@@ -179,7 +181,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
@functools.wraps(original_method) @functools.wraps(original_method)
async def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: async def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
result = await original_method(self, *args, **kwargs) result = await original_method(self, *args, **kwargs)
PersistenceDecorator.persist_state(self, method_name, actual_persistence) PersistenceDecorator.persist_state(self, method_name, actual_persistence, verbose)
return result return result
return method_wrapper return method_wrapper
@@ -199,7 +201,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
@functools.wraps(original_method) @functools.wraps(original_method)
def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
result = original_method(self, *args, **kwargs) result = original_method(self, *args, **kwargs)
PersistenceDecorator.persist_state(self, method_name, actual_persistence) PersistenceDecorator.persist_state(self, method_name, actual_persistence, verbose)
return result return result
return method_wrapper return method_wrapper
@@ -228,7 +230,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
result = await method_coro result = await method_coro
else: else:
result = method_coro result = method_coro
PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence) PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence, verbose)
return result return result
for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]: for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]:
@@ -240,7 +242,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
@functools.wraps(method) @functools.wraps(method)
def method_sync_wrapper(flow_instance: Any, *args: Any, **kwargs: Any) -> T: def method_sync_wrapper(flow_instance: Any, *args: Any, **kwargs: Any) -> T:
result = method(flow_instance, *args, **kwargs) result = method(flow_instance, *args, **kwargs)
PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence) PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence, verbose)
return result return result
for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]: for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]:

View File

@@ -17,7 +17,7 @@ class TestState(FlowState):
message: str = "" message: str = ""
def test_persist_decorator_saves_state(tmp_path): def test_persist_decorator_saves_state(tmp_path, caplog):
"""Test that @persist decorator saves state in SQLite.""" """Test that @persist decorator saves state in SQLite."""
db_path = os.path.join(tmp_path, "test_flows.db") db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path) persistence = SQLiteFlowPersistence(db_path)
@@ -174,3 +174,39 @@ def test_multiple_method_persistence(tmp_path):
final_state = flow2.state final_state = flow2.state
assert final_state.counter == 99999 assert final_state.counter == 99999
assert final_state.message == "Step 99999" assert final_state.message == "Step 99999"
def test_persist_decorator_verbose_logging(tmp_path, caplog):
"""Test that @persist decorator's verbose parameter controls logging."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
# Test with verbose=False (default)
class QuietFlow(Flow[Dict[str, str]]):
initial_state = dict()
@start()
@persist(persistence) # Default verbose=False
def init_step(self):
self.state["message"] = "Hello, World!"
self.state["id"] = "test-uuid-1"
flow = QuietFlow(persistence=persistence)
flow.kickoff()
assert "Saving flow state to memory for ID: test-uuid-1" not in caplog.text
# Clear the log
caplog.clear()
# Test with verbose=True
class VerboseFlow(Flow[Dict[str, str]]):
initial_state = dict()
@start()
@persist(persistence, verbose=True)
def init_step(self):
self.state["message"] = "Hello, World!"
self.state["id"] = "test-uuid-2"
flow = VerboseFlow(persistence=persistence)
flow.kickoff()
assert "Saving flow state to memory for ID: test-uuid-2" in caplog.text