fix: Replace deprecated typing imports with built-in types

- Replace Dict, List, Set, Tuple with dict, list, set, tuple throughout codebase
- Add missing type annotations to crew_events.py methods
- Add proper type annotations to test_crew_cancellation.py
- Use type: ignore[method-assign] comments for mock assignments
- Maintain backward compatibility while modernizing type hints

This resolves lint and type-checker failures in CI while preserving
the cancellation functionality.

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2025-09-04 02:07:02 +00:00
parent 3a54cc859a
commit 3619d4dc50
108 changed files with 662 additions and 672 deletions

View File

@@ -5,11 +5,8 @@ import logging
from typing import (
Any,
Callable,
Dict,
Generic,
List,
Optional,
Set,
Type,
TypeVar,
Union,
@@ -57,10 +54,10 @@ class FlowState(BaseModel):
# Type variables with explicit bounds
T = TypeVar(
"T", bound=Union[Dict[str, Any], BaseModel]
"T", bound=Union[dict[str, Any], BaseModel]
) # Generic flow state type parameter
StateT = TypeVar(
"StateT", bound=Union[Dict[str, Any], BaseModel]
"StateT", bound=Union[dict[str, Any], BaseModel]
) # State validation type parameter
@@ -436,14 +433,14 @@ class FlowMeta(type):
class Flow(Generic[T], metaclass=FlowMeta):
"""Base class for all flows.
Type parameter T must be either Dict[str, Any] or a subclass of BaseModel."""
Type parameter T must be either dict[str, Any] or a subclass of BaseModel."""
_printer = Printer()
_start_methods: List[str] = []
_listeners: Dict[str, tuple[str, List[str]]] = {}
_routers: Set[str] = set()
_router_paths: Dict[str, List[str]] = {}
_start_methods: list[str] = []
_listeners: dict[str, tuple[str, list[str]]] = {}
_routers: set[str] = set()
_router_paths: dict[str, list[str]] = {}
initial_state: Union[Type[T], T, None] = None
name: Optional[str] = None
tracing: Optional[bool] = False
@@ -468,11 +465,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
**kwargs: Additional state values to initialize or override
"""
# Initialize basic instance attributes
self._methods: Dict[str, Callable] = {}
self._method_execution_counts: Dict[str, int] = {}
self._pending_and_listeners: Dict[str, Set[str]] = {}
self._method_outputs: List[Any] = [] # List to store all method outputs
self._completed_methods: Set[str] = set() # Track completed methods for reload
self._methods: dict[str, Callable] = {}
self._method_execution_counts: dict[str, int] = {}
self._pending_and_listeners: dict[str, set[str]] = {}
self._method_outputs: list[Any] = [] # List to store all method outputs
self._completed_methods: set[str] = set() # Track completed methods for reload
self._persistence: Optional[FlowPersistence] = persistence
self._is_execution_resuming: bool = False
@@ -600,7 +597,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return self._state
@property
def method_outputs(self) -> List[Any]:
def method_outputs(self) -> list[Any]:
"""Returns the list of all outputs from executed methods."""
return self._method_outputs
@@ -637,7 +634,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
except (AttributeError, TypeError):
return "" # Safely handle any unexpected attribute access issues
def _initialize_state(self, inputs: Dict[str, Any]) -> None:
def _initialize_state(self, inputs: dict[str, Any]) -> None:
"""Initialize or update flow state with new inputs.
Args:
@@ -691,7 +688,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
else:
raise TypeError("State must be a BaseModel instance or a dictionary.")
def _restore_state(self, stored_state: Dict[str, Any]) -> None:
def _restore_state(self, stored_state: dict[str, Any]) -> None:
"""Restore flow state from persistence.
Args:
@@ -783,7 +780,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
elif hasattr(self._state, field_name):
object.__setattr__(self._state, field_name, value)
def _apply_state_updates(self, updates: Dict[str, Any]) -> None:
def _apply_state_updates(self, updates: dict[str, Any]) -> None:
"""Apply multiple state updates efficiently."""
if isinstance(self._state, dict):
self._state.update(updates)
@@ -792,7 +789,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
if hasattr(self._state, key):
object.__setattr__(self._state, key, value)
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
def kickoff(self, inputs: Optional[dict[str, Any]] = None) -> Any:
"""
Start the flow execution in a synchronous context.
@@ -805,7 +802,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return asyncio.run(run_flow())
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
async def kickoff_async(self, inputs: Optional[dict[str, Any]] = None) -> Any:
"""
Start the flow execution asynchronously.
@@ -1109,7 +1106,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
def _find_triggered_methods(
self, trigger_method: str, router_only: bool
) -> List[str]:
) -> list[str]:
"""
Finds all methods that should be triggered based on conditions.
@@ -1126,7 +1123,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns
-------
List[str]
list[str]
Names of methods that should be triggered.
Notes

View File

@@ -7,7 +7,7 @@ traversal attacks and ensure paths remain within allowed boundaries.
import os
from pathlib import Path
from typing import List, Union
from typing import Union
def safe_path_join(*parts: str, root: Union[str, Path, None] = None) -> str:
@@ -101,7 +101,7 @@ def validate_path_exists(path: Union[str, Path], file_type: str = "file") -> str
raise ValueError(f"Invalid path: {str(e)}")
def list_files(directory: Union[str, Path], pattern: str = "*") -> List[str]:
def list_files(directory: Union[str, Path], pattern: str = "*") -> list[str]:
"""
Safely list files in a directory matching a pattern.
@@ -114,7 +114,7 @@ def list_files(directory: Union[str, Path], pattern: str = "*") -> List[str]:
Returns
-------
List[str]
list[str]
List of matching file paths.
Raises

View File

@@ -4,7 +4,7 @@ CrewAI Flow Persistence.
This module provides interfaces and implementations for persisting flow states.
"""
from typing import Any, Dict, TypeVar, Union
from typing import Any, TypeVar, Union
from pydantic import BaseModel
@@ -14,5 +14,5 @@ from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
__all__ = ["FlowPersistence", "persist", "SQLiteFlowPersistence"]
StateType = TypeVar('StateType', bound=Union[Dict[str, Any], BaseModel])
DictStateType = Dict[str, Any]
StateType = TypeVar('StateType', bound=Union[dict[str, Any], BaseModel])
DictStateType = dict[str, Any]

View File

@@ -1,7 +1,7 @@
"""Base class for flow state persistence."""
import abc
from typing import Any, Dict, Optional, Union
from typing import Any, Optional, Union
from pydantic import BaseModel
@@ -29,7 +29,7 @@ class FlowPersistence(abc.ABC):
self,
flow_uuid: str,
method_name: str,
state_data: Union[Dict[str, Any], BaseModel]
state_data: Union[dict[str, Any], BaseModel]
) -> None:
"""Persist the flow state after method completion.
@@ -41,7 +41,7 @@ class FlowPersistence(abc.ABC):
pass
@abc.abstractmethod
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
def load_state(self, flow_uuid: str) -> Optional[dict[str, Any]]:
"""Load the most recent state for a given flow UUID.
Args:

View File

@@ -6,7 +6,7 @@ import json
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import Any, Optional, Union
from pydantic import BaseModel
@@ -70,7 +70,7 @@ class SQLiteFlowPersistence(FlowPersistence):
self,
flow_uuid: str,
method_name: str,
state_data: Union[Dict[str, Any], BaseModel],
state_data: Union[dict[str, Any], BaseModel],
) -> None:
"""Save the current flow state to SQLite.
@@ -107,7 +107,7 @@ class SQLiteFlowPersistence(FlowPersistence):
),
)
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
def load_state(self, flow_uuid: str) -> Optional[dict[str, Any]]:
"""Load the most recent state for a given flow UUID.
Args:

View File

@@ -17,10 +17,10 @@ import ast
import inspect
import textwrap
from collections import defaultdict, deque
from typing import Any, Deque, Dict, List, Optional, Set, Union
from typing import Any, Deque, Optional, Union
def get_possible_return_constants(function: Any) -> Optional[List[str]]:
def get_possible_return_constants(function: Any) -> Optional[list[str]]:
try:
source = inspect.getsource(function)
except OSError:
@@ -94,7 +94,7 @@ def get_possible_return_constants(function: Any) -> Optional[List[str]]:
return list(return_values) if return_values else None
def calculate_node_levels(flow: Any) -> Dict[str, int]:
def calculate_node_levels(flow: Any) -> dict[str, int]:
"""
Calculate the hierarchical level of each node in the flow.
@@ -108,7 +108,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
Returns
-------
Dict[str, int]
dict[str, int]
Dictionary mapping method names to their hierarchical levels.
Notes
@@ -118,10 +118,10 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
- Handles both OR and AND conditions for listeners
- Processes router paths separately
"""
levels: Dict[str, int] = {}
levels: dict[str, int] = {}
queue: Deque[str] = deque()
visited: Set[str] = set()
pending_and_listeners: Dict[str, Set[str]] = {}
visited: set[str] = set()
pending_and_listeners: dict[str, set[str]] = {}
# Make all start methods at level 0
for method_name, method in flow._methods.items():
@@ -172,7 +172,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
return levels
def count_outgoing_edges(flow: Any) -> Dict[str, int]:
def count_outgoing_edges(flow: Any) -> dict[str, int]:
"""
Count the number of outgoing edges for each method in the flow.
@@ -183,7 +183,7 @@ def count_outgoing_edges(flow: Any) -> Dict[str, int]:
Returns
-------
Dict[str, int]
dict[str, int]
Dictionary mapping method names to their outgoing edge count.
"""
counts = {}
@@ -197,7 +197,7 @@ def count_outgoing_edges(flow: Any) -> Dict[str, int]:
return counts
def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
def build_ancestor_dict(flow: Any) -> dict[str, set[str]]:
"""
Build a dictionary mapping each node to its ancestor nodes.
@@ -208,11 +208,11 @@ def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
Returns
-------
Dict[str, Set[str]]
dict[str, set[str]]
Dictionary mapping each node to a set of its ancestor nodes.
"""
ancestors: Dict[str, Set[str]] = {node: set() for node in flow._methods}
visited: Set[str] = set()
ancestors: dict[str, set[str]] = {node: set() for node in flow._methods}
visited: set[str] = set()
for node in flow._methods:
if node not in visited:
dfs_ancestors(node, ancestors, visited, flow)
@@ -220,7 +220,7 @@ def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
def dfs_ancestors(
node: str, ancestors: Dict[str, Set[str]], visited: Set[str], flow: Any
node: str, ancestors: dict[str, set[str]], visited: set[str], flow: Any
) -> None:
"""
Perform depth-first search to build ancestor relationships.
@@ -229,9 +229,9 @@ def dfs_ancestors(
----------
node : str
Current node being processed.
ancestors : Dict[str, Set[str]]
ancestors : dict[str, set[str]]
Dictionary tracking ancestor relationships.
visited : Set[str]
visited : set[str]
Set of already visited nodes.
flow : Any
The flow instance being analyzed.
@@ -265,7 +265,7 @@ def dfs_ancestors(
def is_ancestor(
node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]
node: str, ancestor_candidate: str, ancestors: dict[str, set[str]]
) -> bool:
"""
Check if one node is an ancestor of another.
@@ -287,7 +287,7 @@ def is_ancestor(
return ancestor_candidate in ancestors.get(node, set())
def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
def build_parent_children_dict(flow: Any) -> dict[str, list[str]]:
"""
Build a dictionary mapping parent nodes to their children.
@@ -298,7 +298,7 @@ def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
Returns
-------
Dict[str, List[str]]
dict[str, list[str]]
Dictionary mapping parent method names to lists of their child method names.
Notes
@@ -307,7 +307,7 @@ def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
- Maps router methods to their paths and listeners
- Children lists are sorted for consistent ordering
"""
parent_children: Dict[str, List[str]] = {}
parent_children: dict[str, list[str]] = {}
# Map listeners to their trigger methods
for listener_name, (_, trigger_methods) in flow._listeners.items():
@@ -332,7 +332,7 @@ def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
def get_child_index(
parent: str, child: str, parent_children: Dict[str, List[str]]
parent: str, child: str, parent_children: dict[str, list[str]]
) -> int:
"""
Get the index of a child node in its parent's sorted children list.
@@ -343,7 +343,7 @@ def get_child_index(
The parent node name.
child : str
The child node name to find the index for.
parent_children : Dict[str, List[str]]
parent_children : dict[str, list[str]]
Dictionary mapping parents to their children lists.
Returns

View File

@@ -17,7 +17,7 @@ Example
import ast
import inspect
from typing import Any, Dict, List, Tuple, Union
from typing import Any, Union
from .utils import (
build_ancestor_dict,
@@ -73,8 +73,8 @@ def method_calls_crew(method: Any) -> bool:
def add_nodes_to_network(
net: Any,
flow: Any,
node_positions: Dict[str, Tuple[float, float]],
node_styles: Dict[str, Dict[str, Any]]
node_positions: dict[str, tuple[float, float]],
node_styles: dict[str, dict[str, Any]]
) -> None:
"""
Add nodes to the network visualization with appropriate styling.
@@ -85,9 +85,9 @@ def add_nodes_to_network(
The pyvis Network instance to add nodes to.
flow : Any
The flow instance containing method information.
node_positions : Dict[str, Tuple[float, float]]
node_positions : dict[str, tuple[float, float]]
Dictionary mapping node names to their (x, y) positions.
node_styles : Dict[str, Dict[str, Any]]
node_styles : dict[str, dict[str, Any]]
Dictionary containing style configurations for different node types.
Notes
@@ -138,10 +138,10 @@ def add_nodes_to_network(
def compute_positions(
flow: Any,
node_levels: Dict[str, int],
node_levels: dict[str, int],
y_spacing: float = 150,
x_spacing: float = 300
) -> Dict[str, Tuple[float, float]]:
) -> dict[str, tuple[float, float]]:
"""
Compute the (x, y) positions for each node in the flow graph.
@@ -149,7 +149,7 @@ def compute_positions(
----------
flow : Any
The flow instance to compute positions for.
node_levels : Dict[str, int]
node_levels : dict[str, int]
Dictionary mapping node names to their hierarchical levels.
y_spacing : float, optional
Vertical spacing between levels, by default 150.
@@ -158,11 +158,11 @@ def compute_positions(
Returns
-------
Dict[str, Tuple[float, float]]
dict[str, tuple[float, float]]
Dictionary mapping node names to their (x, y) coordinates.
"""
level_nodes: Dict[int, List[str]] = {}
node_positions: Dict[str, Tuple[float, float]] = {}
level_nodes: dict[int, list[str]] = {}
node_positions: dict[str, tuple[float, float]] = {}
for method_name, level in node_levels.items():
level_nodes.setdefault(level, []).append(method_name)
@@ -180,10 +180,10 @@ def compute_positions(
def add_edges(
net: Any,
flow: Any,
node_positions: Dict[str, Tuple[float, float]],
colors: Dict[str, str]
node_positions: dict[str, tuple[float, float]],
colors: dict[str, str]
) -> None:
edge_smooth: Dict[str, Union[str, float]] = {"type": "continuous"} # Default value
edge_smooth: dict[str, Union[str, float]] = {"type": "continuous"} # Default value
"""
Add edges to the network visualization with appropriate styling.
@@ -193,9 +193,9 @@ def add_edges(
The pyvis Network instance to add edges to.
flow : Any
The flow instance containing edge information.
node_positions : Dict[str, Tuple[float, float]]
node_positions : dict[str, tuple[float, float]]
Dictionary mapping node names to their positions.
colors : Dict[str, str]
colors : dict[str, str]
Dictionary mapping edge types to their colors.
Notes