mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
Docstring, Error Handling, and Type Hints Improvements (#1828)
* docs: add comprehensive docstrings to Flow class and methods - Added NumPy-style docstrings to all decorator functions - Added detailed documentation to Flow class methods - Included parameter types, return types, and examples - Enhanced documentation clarity and completeness Co-Authored-By: Joe Moura <joao@crewai.com> * feat: add secure path handling utilities - Add path_utils.py with safe path handling functions - Implement path validation and security checks - Integrate secure path handling in flow_visualizer.py - Add path validation in html_template_handler.py - Add comprehensive error handling for path operations Co-Authored-By: Joe Moura <joao@crewai.com> * docs: add comprehensive docstrings and type hints to flow utils (#1819) Co-Authored-By: Joe Moura <joao@crewai.com> * fix: add type annotations and fix import sorting Co-Authored-By: Joe Moura <joao@crewai.com> * fix: add type annotations to flow utils and visualization utils Co-Authored-By: Joe Moura <joao@crewai.com> * fix: resolve import sorting and type annotation issues Co-Authored-By: Joe Moura <joao@crewai.com> * fix: properly initialize and update edge_smooth variable Co-Authored-By: Joe Moura <joao@crewai.com> --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Joe Moura <joao@crewai.com>
This commit is contained in:
committed by
GitHub
parent
ba0965ef87
commit
45b802a625
@@ -30,7 +30,47 @@ from crewai.telemetry import Telemetry
|
||||
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
|
||||
|
||||
|
||||
def start(condition=None):
|
||||
def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
|
||||
"""
|
||||
Marks a method as a flow's starting point.
|
||||
|
||||
This decorator designates a method as an entry point for the flow execution.
|
||||
It can optionally specify conditions that trigger the start based on other
|
||||
method executions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
condition : Optional[Union[str, dict, Callable]], optional
|
||||
Defines when the start method should execute. Can be:
|
||||
- str: Name of a method that triggers this start
|
||||
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
|
||||
- Callable: A method reference that triggers this start
|
||||
Default is None, meaning unconditional start.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Callable
|
||||
A decorator function that marks the method as a flow start point.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the condition format is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @start() # Unconditional start
|
||||
>>> def begin_flow(self):
|
||||
... pass
|
||||
|
||||
>>> @start("method_name") # Start after specific method
|
||||
>>> def conditional_start(self):
|
||||
... pass
|
||||
|
||||
>>> @start(and_("method1", "method2")) # Start after multiple methods
|
||||
>>> def complex_start(self):
|
||||
... pass
|
||||
"""
|
||||
def decorator(func):
|
||||
func.__is_start_method__ = True
|
||||
if condition is not None:
|
||||
@@ -56,7 +96,42 @@ def start(condition=None):
|
||||
return decorator
|
||||
|
||||
|
||||
def listen(condition):
|
||||
def listen(condition: Union[str, dict, Callable]) -> Callable:
|
||||
"""
|
||||
Creates a listener that executes when specified conditions are met.
|
||||
|
||||
This decorator sets up a method to execute in response to other method
|
||||
executions in the flow. It supports both simple and complex triggering
|
||||
conditions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
condition : Union[str, dict, Callable]
|
||||
Specifies when the listener should execute. Can be:
|
||||
- str: Name of a method that triggers this listener
|
||||
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
|
||||
- Callable: A method reference that triggers this listener
|
||||
|
||||
Returns
|
||||
-------
|
||||
Callable
|
||||
A decorator function that sets up the method as a listener.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the condition format is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @listen("process_data") # Listen to single method
|
||||
>>> def handle_processed_data(self):
|
||||
... pass
|
||||
|
||||
>>> @listen(or_("success", "failure")) # Listen to multiple methods
|
||||
>>> def handle_completion(self):
|
||||
... pass
|
||||
"""
|
||||
def decorator(func):
|
||||
if isinstance(condition, str):
|
||||
func.__trigger_methods__ = [condition]
|
||||
@@ -80,7 +155,47 @@ def listen(condition):
|
||||
return decorator
|
||||
|
||||
|
||||
def router(condition):
|
||||
def router(condition: Union[str, dict, Callable]) -> Callable:
|
||||
"""
|
||||
Creates a routing method that directs flow execution based on conditions.
|
||||
|
||||
This decorator marks a method as a router, which can dynamically determine
|
||||
the next steps in the flow based on its return value. Routers are triggered
|
||||
by specified conditions and can return constants that determine which path
|
||||
the flow should take.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
condition : Union[str, dict, Callable]
|
||||
Specifies when the router should execute. Can be:
|
||||
- str: Name of a method that triggers this router
|
||||
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
|
||||
- Callable: A method reference that triggers this router
|
||||
|
||||
Returns
|
||||
-------
|
||||
Callable
|
||||
A decorator function that sets up the method as a router.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the condition format is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @router("check_status")
|
||||
>>> def route_based_on_status(self):
|
||||
... if self.state.status == "success":
|
||||
... return SUCCESS
|
||||
... return FAILURE
|
||||
|
||||
>>> @router(and_("validate", "process"))
|
||||
>>> def complex_routing(self):
|
||||
... if all([self.state.valid, self.state.processed]):
|
||||
... return CONTINUE
|
||||
... return STOP
|
||||
"""
|
||||
def decorator(func):
|
||||
func.__is_router__ = True
|
||||
# Handle conditions like listen/start
|
||||
@@ -106,7 +221,39 @@ def router(condition):
|
||||
return decorator
|
||||
|
||||
|
||||
def or_(*conditions):
|
||||
def or_(*conditions: Union[str, dict, Callable]) -> dict:
|
||||
"""
|
||||
Combines multiple conditions with OR logic for flow control.
|
||||
|
||||
Creates a condition that is satisfied when any of the specified conditions
|
||||
are met. This is used with @start, @listen, or @router decorators to create
|
||||
complex triggering conditions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
*conditions : Union[str, dict, Callable]
|
||||
Variable number of conditions that can be:
|
||||
- str: Method names
|
||||
- dict: Existing condition dictionaries
|
||||
- Callable: Method references
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
A condition dictionary with format:
|
||||
{"type": "OR", "methods": list_of_method_names}
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If any condition is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @listen(or_("success", "timeout"))
|
||||
>>> def handle_completion(self):
|
||||
... pass
|
||||
"""
|
||||
methods = []
|
||||
for condition in conditions:
|
||||
if isinstance(condition, dict) and "methods" in condition:
|
||||
@@ -120,7 +267,39 @@ def or_(*conditions):
|
||||
return {"type": "OR", "methods": methods}
|
||||
|
||||
|
||||
def and_(*conditions):
|
||||
def and_(*conditions: Union[str, dict, Callable]) -> dict:
|
||||
"""
|
||||
Combines multiple conditions with AND logic for flow control.
|
||||
|
||||
Creates a condition that is satisfied only when all specified conditions
|
||||
are met. This is used with @start, @listen, or @router decorators to create
|
||||
complex triggering conditions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
*conditions : Union[str, dict, Callable]
|
||||
Variable number of conditions that can be:
|
||||
- str: Method names
|
||||
- dict: Existing condition dictionaries
|
||||
- Callable: Method references
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
A condition dictionary with format:
|
||||
{"type": "AND", "methods": list_of_method_names}
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If any condition is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @listen(and_("validated", "processed"))
|
||||
>>> def handle_complete_data(self):
|
||||
... pass
|
||||
"""
|
||||
methods = []
|
||||
for condition in conditions:
|
||||
if isinstance(condition, dict) and "methods" in condition:
|
||||
@@ -286,6 +465,23 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return final_output
|
||||
|
||||
async def _execute_start_method(self, start_method_name: str) -> None:
|
||||
"""
|
||||
Executes a flow's start method and its triggered listeners.
|
||||
|
||||
This internal method handles the execution of methods marked with @start
|
||||
decorator and manages the subsequent chain of listener executions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
start_method_name : str
|
||||
The name of the start method to execute.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Executes the start method and captures its result
|
||||
- Triggers execution of any listeners waiting on this start method
|
||||
- Part of the flow's initialization sequence
|
||||
"""
|
||||
result = await self._execute_method(
|
||||
start_method_name, self._methods[start_method_name]
|
||||
)
|
||||
@@ -306,6 +502,28 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return result
|
||||
|
||||
async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
|
||||
"""
|
||||
Executes all listeners and routers triggered by a method completion.
|
||||
|
||||
This internal method manages the execution flow by:
|
||||
1. First executing all triggered routers sequentially
|
||||
2. Then executing all triggered listeners in parallel
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trigger_method : str
|
||||
The name of the method that triggered these listeners.
|
||||
result : Any
|
||||
The result from the triggering method, passed to listeners
|
||||
that accept parameters.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Routers are executed sequentially to maintain flow control
|
||||
- Each router's result becomes the new trigger_method
|
||||
- Normal listeners are executed in parallel for efficiency
|
||||
- Listeners can receive the trigger method's result as a parameter
|
||||
"""
|
||||
# First, handle routers repeatedly until no router triggers anymore
|
||||
while True:
|
||||
routers_triggered = self._find_triggered_methods(
|
||||
@@ -335,6 +553,33 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
def _find_triggered_methods(
|
||||
self, trigger_method: str, router_only: bool
|
||||
) -> List[str]:
|
||||
"""
|
||||
Finds all methods that should be triggered based on conditions.
|
||||
|
||||
This internal method evaluates both OR and AND conditions to determine
|
||||
which methods should be executed next in the flow.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trigger_method : str
|
||||
The name of the method that just completed execution.
|
||||
router_only : bool
|
||||
If True, only consider router methods.
|
||||
If False, only consider non-router methods.
|
||||
|
||||
Returns
|
||||
-------
|
||||
List[str]
|
||||
Names of methods that should be triggered.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Handles both OR and AND conditions:
|
||||
* OR: Triggers if any condition is met
|
||||
* AND: Triggers only when all conditions are met
|
||||
- Maintains state for AND conditions using _pending_and_listeners
|
||||
- Separates router and normal listener evaluation
|
||||
"""
|
||||
triggered = []
|
||||
for listener_name, (condition_type, methods) in self._listeners.items():
|
||||
is_router = listener_name in self._routers
|
||||
@@ -363,6 +608,33 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return triggered
|
||||
|
||||
async def _execute_single_listener(self, listener_name: str, result: Any) -> None:
|
||||
"""
|
||||
Executes a single listener method with proper event handling.
|
||||
|
||||
This internal method manages the execution of an individual listener,
|
||||
including parameter inspection, event emission, and error handling.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
listener_name : str
|
||||
The name of the listener method to execute.
|
||||
result : Any
|
||||
The result from the triggering method, which may be passed
|
||||
to the listener if it accepts parameters.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Inspects method signature to determine if it accepts the trigger result
|
||||
- Emits events for method execution start and finish
|
||||
- Handles errors gracefully with detailed logging
|
||||
- Recursively triggers listeners of this listener
|
||||
- Supports both parameterized and parameter-less listeners
|
||||
|
||||
Error Handling
|
||||
-------------
|
||||
Catches and logs any exceptions during execution, preventing
|
||||
individual listener failures from breaking the entire flow.
|
||||
"""
|
||||
try:
|
||||
method = self._methods[listener_name]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user