Compare commits

..

1 Commits

Author SHA1 Message Date
Gui Vieira
413e0363d0 Document monorepo deployments 2026-06-02 18:24:11 -03:00
21 changed files with 3305 additions and 2249 deletions

View File

@@ -509,6 +509,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/update-crew",
@@ -1028,6 +1029,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/update-crew",
@@ -1514,6 +1516,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/update-crew",
@@ -1999,6 +2002,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/update-crew",
@@ -2484,6 +2488,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/update-crew",
@@ -2979,6 +2984,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/update-crew",
@@ -3474,6 +3480,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/update-crew",
@@ -3969,6 +3976,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/update-crew",
@@ -4464,6 +4472,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/update-crew",
@@ -4948,6 +4957,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/update-crew",
@@ -5432,6 +5442,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/update-crew",
@@ -5916,6 +5927,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/training-crews",
@@ -6402,6 +6414,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/training-crews",
@@ -6886,6 +6899,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/training-crews",
@@ -7373,6 +7387,7 @@
"en/enterprise/guides/build-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/monorepo-deployments",
"en/enterprise/guides/private-package-registry",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/training-crews",

View File

@@ -164,6 +164,12 @@ You need to push your crew to a GitHub repository. If you haven't created a crew
![Select Repository](/images/enterprise/select-repo.png)
</Frame>
<Tip>
If your Crew or Flow is inside a monorepo subfolder, expand **Advanced**
and set a working directory before deploying. See
[Monorepo Deployments](/en/enterprise/guides/monorepo-deployments).
</Tip>
</Step>
<Step title="Set Environment Variables">

View File

@@ -0,0 +1,225 @@
---
title: "Monorepo Deployments"
description: "Deploy a Crew or Flow from a subfolder in a larger repository"
icon: "folder-tree"
mode: "wide"
---
<Note>
Use a working directory when your Crew or Flow lives inside a larger
repository. CrewAI AMP validates, builds, tests, and runs the automation from
that subfolder instead of the repository root.
</Note>
## When to Use This
Monorepo deployments are useful when one repository contains multiple
automations, shared packages, or other application code:
```text
company-ai/
|-- uv.lock
|-- packages/
| `-- shared_tools/
`-- crews/
|-- support_agent/
| |-- pyproject.toml
| `-- src/
| `-- support_agent/
| |-- main.py
| `-- crew.py
`-- research_flow/
|-- pyproject.toml
`-- src/
`-- research_flow/
`-- main.py
```
To deploy `support_agent`, set the working directory to:
```text
crews/support_agent
```
AMP still pulls or uploads the whole repository, but it treats the selected
folder as the automation project root.
## What the Working Directory Controls
When a working directory is set, AMP uses that folder for:
- Project validation, including `pyproject.toml`, `src/`, and the Crew or Flow entry point
- Dependency installation with `uv`
- The running process working directory
- The `CREW_ROOT_DIR` environment variable
Leaving the field empty keeps the existing behavior and uses the repository
root.
## Supported Sources
You can set a working directory when creating a deployment from:
- A connected GitHub repository
- A Git repository configured in AMP
- A ZIP upload
<Info>
Configure working directories in the AMP web interface. The
`crewai deploy create` CLI flow does not prompt for this field.
</Info>
You can also add or change the working directory on an existing deployment from
the deployment's **Settings** page. The change takes effect on the next deploy.
<Warning>
Working directories and auto-deploy cannot be used together. If a deployment
has a working directory, auto-deploy is disabled for that deployment. Turn
auto-deploy off before setting a working directory.
</Warning>
## Configure a New Deployment
<Steps>
<Step title="Open Deploy from Code">
In CrewAI AMP, create a new deployment and choose your source: GitHub, Git
Repository, or ZIP upload.
</Step>
<Step title="Select the repository, branch, or ZIP file">
Choose the repository and branch that contain your monorepo, or upload a ZIP
file whose root contains the monorepo contents.
</Step>
<Step title="Open Advanced settings">
Expand the **Advanced** section in the deploy form.
</Step>
<Step title="Enter the working directory">
Enter the path from the repository root to the Crew or Flow project:
```text
crews/support_agent
```
Do not include a leading slash.
</Step>
<Step title="Deploy">
Add any required environment variables, then start the deployment.
</Step>
</Steps>
## Configure an Existing Deployment
<Steps>
<Step title="Open the deployment settings">
Go to your automation in AMP and open **Settings**.
</Step>
<Step title="Turn off auto-deploy if needed">
If auto-deploy is enabled, disable it first. The working directory field is
unavailable while auto-deploy is on.
</Step>
<Step title="Set the working directory">
In **Basic settings**, enter the subfolder path, such as:
```text
crews/support_agent
```
</Step>
<Step title="Redeploy">
Save the setting and redeploy the automation. The new working directory is
used on the next deploy.
</Step>
</Steps>
## Path Rules
The working directory must be a relative path inside the repository or ZIP root.
| Rule | Example |
|------|---------|
| Use a relative path | `crews/support_agent` |
| Do not start with `/` | `/crews/support_agent` is invalid |
| Do not use `.` or `..` path segments | `crews/../support_agent` is invalid |
| Use only letters, numbers, dashes, underscores, dots, and forward slashes | `crews/support agent` is invalid |
| Keep the path at 255 characters or fewer | Longer paths are rejected |
AMP trims leading and trailing whitespace, collapses repeated slashes, and
removes trailing slashes. A blank value uses the repository root.
## Lock Files and UV Workspaces
The selected folder must contain the automation's `pyproject.toml` and `src/`
directory. A `uv.lock` or `poetry.lock` file can live either in the selected
folder or at the repository root.
This supports both common monorepo layouts:
<Tabs>
<Tab title="Project lock file">
```text
company-ai/
`-- crews/
`-- support_agent/
|-- pyproject.toml
|-- uv.lock
`-- src/
`-- support_agent/
`-- main.py
```
</Tab>
<Tab title="Workspace lock file">
```text
company-ai/
|-- uv.lock
|-- packages/
| `-- shared_tools/
`-- crews/
`-- support_agent/
|-- pyproject.toml
`-- src/
`-- support_agent/
`-- main.py
```
</Tab>
</Tabs>
<Tip>
If your automation imports shared packages from elsewhere in the monorepo,
declare those packages in `pyproject.toml` using UV workspace, path, or source
configuration. AMP runs the automation from the selected folder, so shared
code should be installed as a dependency instead of relying on the repository
root being on the Python path.
</Tip>
## Troubleshooting
### Working Directory Not Found
Check that the path is relative to the repository or ZIP root. For ZIP uploads,
the ZIP contents must include the working directory path exactly as entered.
### Missing pyproject.toml
The working directory should point to the Crew or Flow project folder, not just
to a parent folder that contains several projects.
### Missing uv.lock or poetry.lock
Commit a lock file either in the selected project folder or in the repository
root. For UV workspaces, keeping `uv.lock` at the workspace root is supported.
### Auto-Deploy Is Unavailable
Auto-deploy is disabled while a working directory is set. Use manual redeploys
or trigger redeployments from CI/CD with the AMP API instead.
<Card title="Deploy to AMP" icon="rocket" href="/en/enterprise/guides/deploy-to-amp">
Continue with the deployment guide after choosing your monorepo working
directory.
</Card>

View File

@@ -6,6 +6,7 @@ from crewai.flow.async_feedback import (
)
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow_config import flow_config
from crewai.flow.flow_serializer import flow_structure
from crewai.flow.human_feedback import HumanFeedbackResult, human_feedback
from crewai.flow.input_provider import InputProvider, InputResponse
from crewai.flow.persistence import persist
@@ -29,6 +30,7 @@ __all__ = [
"and_",
"build_flow_structure",
"flow_config",
"flow_structure",
"human_feedback",
"listen",
"or_",

View File

@@ -2,302 +2,35 @@
plus the ``or_`` / ``and_`` condition combinators.
These decorators wrap user methods into the typed wrappers defined in
``flow_wrappers`` and record their trigger conditions. This module also
projects Python Flow classes into the neutral Flow Definition contract.
Execution happens in ``runtime``.
``flow_wrappers`` and record their trigger conditions. The structural model
those conditions feed is built in ``flow_definition``; execution happens in
``runtime``.
"""
from __future__ import annotations
from collections.abc import Callable, Sequence
from enum import Enum
import inspect
import json
import logging
from typing import (
Any,
Literal,
ParamSpec,
TypeVar,
get_args,
get_origin,
get_type_hints,
)
from pydantic import BaseModel
from typing_extensions import TypeIs
from collections.abc import Callable
from typing import Any, ParamSpec, TypeVar
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_definition import (
FlowConfigDefinition,
FlowDefinition,
FlowDefinitionCondition,
FlowDefinitionDiagnostic,
FlowHumanFeedbackDefinition,
FlowMethodDefinition,
FlowPersistenceDefinition,
FlowStateDefinition,
_extract_all_methods,
is_flow_condition_dict,
is_flow_method_callable,
is_flow_method_name,
)
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowConditions,
FlowMethod,
ListenMethod,
RouterMethod,
SimpleFlowCondition,
StartMethod,
)
from crewai.flow.types import FlowMethodCallable, FlowMethodName
P = ParamSpec("P")
R = TypeVar("R")
logger = logging.getLogger(__name__)
__all__ = ["and_", "listen", "or_", "router", "start"]
def is_flow_method_name(obj: Any) -> TypeIs[FlowMethodName]:
"""Check if the object is a valid flow method name.
Args:
obj: The object to check.
Returns:
True if the object is a valid flow method name, False otherwise.
"""
return isinstance(obj, str)
def is_flow_method_callable(obj: Any) -> TypeIs[FlowMethodCallable[..., Any]]:
"""Check if the object is a callable flow method.
Args:
obj: The object to check.
Returns:
True if the object is a callable, False otherwise.
"""
return callable(obj) and hasattr(obj, "__name__")
def is_flow_condition_list(obj: Any) -> TypeIs[FlowConditions]:
"""Check if the object is a list of FlowCondition dictionaries.
Args:
obj: The object to check.
Returns:
True if the object is a list of FlowCondition dictionaries, False otherwise.
"""
if not isinstance(obj, list):
return False
for item in obj:
if not (is_flow_method_name(item) or is_flow_condition_dict(item)):
return False
return True
def is_simple_flow_condition(obj: Any) -> TypeIs[SimpleFlowCondition]:
"""Check if the object is a simple flow condition tuple.
Args:
obj: The object to check.
Returns:
True if the object is a (condition_type, methods) tuple, False otherwise.
"""
return (
isinstance(obj, tuple)
and len(obj) == 2
and isinstance(obj[0], str)
and isinstance(obj[1], list)
)
def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]:
"""Check if the object carries Flow method wrapper metadata.
Args:
obj: The object to check.
Returns:
True if the object is a FlowMethod wrapper or compatible method object.
"""
return (
hasattr(obj, "__is_flow_method__")
or hasattr(obj, "__is_start_method__")
or hasattr(obj, "__trigger_methods__")
or hasattr(obj, "__is_router__")
)
def is_flow_condition_dict(obj: Any) -> TypeIs[FlowCondition]:
"""Check if the object matches the FlowCondition structure.
Args:
obj: The object to check.
Returns:
True if the object is a valid FlowCondition dictionary, False otherwise.
"""
if not isinstance(obj, dict):
return False
type_value = obj.get("type")
if type_value not in ("AND", "OR"):
return False
if "conditions" in obj:
conditions = obj["conditions"]
if not isinstance(conditions, list):
return False
for cond in conditions:
if not (
isinstance(cond, str)
or (isinstance(cond, dict) and is_flow_condition_dict(cond))
):
return False
if "methods" in obj:
methods = obj["methods"]
if not (isinstance(methods, list) and all(isinstance(m, str) for m in methods)):
return False
allowed_keys = {"type", "conditions", "methods"}
if not set(obj).issubset(allowed_keys):
return False
return True
def _extract_all_methods_recursive(
condition: str | FlowCondition | dict[str, Any] | list[Any],
flow: Any | None = None,
) -> list[FlowMethodName]:
if is_flow_method_name(condition):
if flow is not None:
if condition in flow._methods:
return [condition]
return []
return [condition]
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
methods = []
for sub_cond in normalized.get("conditions", []):
methods.extend(_extract_all_methods_recursive(sub_cond, flow))
return methods
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods_recursive(item, flow))
return methods
return []
def _normalize_condition(
condition: FlowConditions | FlowCondition | FlowMethodName,
) -> FlowCondition:
if is_flow_method_name(condition):
return {"type": OR_CONDITION, "conditions": [condition]}
if is_flow_condition_dict(condition):
if "conditions" in condition:
return condition
if "methods" in condition:
return {"type": condition["type"], "conditions": condition["methods"]}
return condition
if is_flow_condition_list(condition):
return {"type": OR_CONDITION, "conditions": condition}
raise ValueError(f"Cannot normalize condition: {condition}")
def _extract_all_methods(
condition: str | FlowCondition | dict[str, Any] | list[Any],
) -> list[FlowMethodName]:
if is_flow_method_name(condition):
return [condition]
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
cond_type = normalized.get("type", OR_CONDITION)
if cond_type == AND_CONDITION:
return [
sub_cond
for sub_cond in normalized.get("conditions", [])
if is_flow_method_name(sub_cond)
]
return []
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods(item))
return methods
return []
def _unwrap_function(function: Any) -> Any:
if hasattr(function, "__func__"):
function = function.__func__
if hasattr(function, "__wrapped__"):
wrapped = function.__wrapped__
if hasattr(wrapped, "unwrap"):
return wrapped.unwrap()
return wrapped
if hasattr(function, "unwrap"):
return function.unwrap()
return function
def _string_values_from_annotation(annotation: Any) -> list[str]:
if annotation is inspect.Signature.empty or isinstance(annotation, str):
return []
if isinstance(annotation, type) and issubclass(annotation, Enum):
return [member.value for member in annotation if isinstance(member.value, str)]
origin = get_origin(annotation)
if origin is None:
return []
args = get_args(annotation)
if origin is Literal or getattr(origin, "__name__", "") == "Literal":
return [arg for arg in args if isinstance(arg, str)]
values: list[str] = []
for arg in args:
values.extend(_string_values_from_annotation(arg))
return values
def _return_annotation(function: Any) -> Any:
unwrapped = _unwrap_function(function)
try:
return get_type_hints(unwrapped, include_extras=True).get(
"return", inspect.Signature.empty
)
except (NameError, TypeError, ValueError):
try:
return inspect.signature(unwrapped).return_annotation
except (TypeError, ValueError):
return inspect.Signature.empty
def _get_router_return_paths(function: Any) -> list[str] | None:
values = _string_values_from_annotation(_return_annotation(function))
return list(dict.fromkeys(values)) if values else None
def _normalize_router_paths(paths: Sequence[Any] | str) -> list[str]:
if isinstance(paths, str):
return [str(paths)]
return list(dict.fromkeys(str(path) for path in paths))
def start(
condition: str | FlowCondition | Callable[..., Any] | None = None,
@@ -336,6 +69,14 @@ def start(
"""
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
"""Decorator that wraps a function as a start method.
Args:
func: The function to wrap as a start method.
Returns:
A StartMethod wrapper around the function.
"""
wrapper = StartMethod(func)
if condition is not None:
@@ -395,6 +136,14 @@ def listen(
"""
def decorator(func: Callable[P, R]) -> ListenMethod[P, R]:
"""Decorator that wraps a function as a listener method.
Args:
func: The function to wrap as a listener method.
Returns:
A ListenMethod wrapper around the function.
"""
wrapper = ListenMethod(func)
if is_flow_method_name(condition):
@@ -426,8 +175,6 @@ def listen(
def router(
condition: str | FlowCondition | Callable[..., Any],
*,
paths: Sequence[str] | str | None = None,
) -> Callable[[Callable[P, R]], RouterMethod[P, R]]:
"""Creates a routing method that directs flow execution based on conditions.
@@ -441,9 +188,6 @@ def router(
- str: Name of a method that triggers this router
- FlowCondition: Result from or_() or and_(), including nested conditions
- Callable[..., Any]: A method reference that triggers this router
paths: Optional explicit router output labels for static FlowDefinition
and visualization. If omitted, Literal/Enum return annotations are
used when available.
Returns:
A decorator function that wraps the method as a router and preserves its signature.
@@ -463,13 +207,17 @@ def router(
... if all([self.state.valid, self.state.processed]):
... return "CONTINUE"
... return "STOP"
>>> @router("check_status", paths=["SUCCESS", "FAILURE"])
>>> def explicit_routing(self):
... return "SUCCESS"
"""
def decorator(func: Callable[P, R]) -> RouterMethod[P, R]:
"""Decorator that wraps a function as a router method.
Args:
func: The function to wrap as a router method.
Returns:
A RouterMethod wrapper around the function.
"""
wrapper = RouterMethod(func)
if is_flow_method_name(condition):
@@ -494,13 +242,6 @@ def router(
raise ValueError(
"Condition must be a method, string, or a result of or_() or and_()"
)
if paths is not None:
wrapper.__router_paths__ = _normalize_router_paths(paths)
else:
inferred_paths = _get_router_return_paths(func)
if inferred_paths:
wrapper.__router_paths__ = inferred_paths
return wrapper
return decorator
@@ -577,362 +318,3 @@ def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition
else:
raise ValueError("Invalid condition in and_()")
return {"type": AND_CONDITION, "conditions": processed_conditions}
def _object_ref(value: Any) -> str:
target = value if isinstance(value, type) else type(value)
module = getattr(target, "__module__", "")
qualname = getattr(target, "__qualname__", getattr(target, "__name__", ""))
return f"{module}:{qualname}" if module and qualname else repr(value)
def _is_json_serializable(value: Any) -> bool:
try:
json.dumps(value)
except (TypeError, ValueError):
return False
return True
def _serialize_static_value(
value: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> Any:
if value is None or _is_json_serializable(value):
return value
to_config = getattr(value, "to_config_dict", None)
if callable(to_config):
try:
config = to_config()
if _is_json_serializable(config):
return config
except Exception:
logger.debug(
"Failed to serialize %s via to_config_dict().",
path,
exc_info=True,
)
if isinstance(value, BaseModel):
try:
data = value.model_dump(mode="json")
if _is_json_serializable(data):
return data
except Exception:
logger.debug(
"Failed to serialize %s via Pydantic model_dump().",
path,
exc_info=True,
)
ref = _object_ref(value)
diagnostics.append(
FlowDefinitionDiagnostic(
code="non_serializable_value",
path=path,
message=f"value is not fully serializable; preserved import reference {ref}",
)
)
return {"ref": ref}
def _state_ref(value: Any) -> str | None:
if value is None:
return None
target = value if isinstance(value, type) else type(value)
module = getattr(target, "__module__", None)
qualname = getattr(target, "__qualname__", None)
if module and qualname:
return f"{module}:{qualname}"
return None
def _build_state_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowStateDefinition | None:
from pydantic import BaseModel as PydanticBaseModel
state_value = getattr(flow_class, "_initial_state_t", None)
initial_state = getattr(flow_class, "initial_state", None)
if initial_state is not None:
state_value = initial_state
if state_value is None:
return None
if state_value is dict or isinstance(state_value, dict):
default = None
if isinstance(state_value, dict):
default = _serialize_static_value(state_value, diagnostics, "state.default")
return FlowStateDefinition(type="dict", default=default)
if isinstance(state_value, type) and issubclass(state_value, PydanticBaseModel):
return FlowStateDefinition(type="pydantic", ref=_state_ref(state_value))
if isinstance(state_value, PydanticBaseModel):
return FlowStateDefinition(
type="pydantic",
ref=_state_ref(state_value),
default=_serialize_static_value(state_value, diagnostics, "state.default"),
)
diagnostics.append(
FlowDefinitionDiagnostic(
code="unknown_state_type",
path="state",
message=f"could not serialize state type {_object_ref(state_value)}",
)
)
return FlowStateDefinition(type="unknown", ref=_state_ref(state_value))
def _build_config_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowConfigDefinition:
config_field_names = set(FlowConfigDefinition.model_fields)
field_defaults = {
name: field.default
for name, field in getattr(flow_class, "model_fields", {}).items()
if name in config_field_names
}
values: dict[str, Any] = {}
for field_name, default in field_defaults.items():
value = getattr(flow_class, field_name, default)
values[field_name] = _serialize_static_value(
value, diagnostics, f"config.{field_name}"
)
return FlowConfigDefinition(**values)
def _definition_condition_from_runtime(condition: Any) -> FlowDefinitionCondition:
if is_flow_method_name(condition):
return str(condition)
if is_flow_method_callable(condition):
return str(condition.__name__)
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
key = "and" if normalized.get("type") == AND_CONDITION else "or"
return {
key: [
_definition_condition_from_runtime(sub_condition)
for sub_condition in normalized.get("conditions", [])
]
}
if isinstance(condition, list):
return {"or": [_definition_condition_from_runtime(item) for item in condition]}
return str(condition)
def _condition_from_method_metadata(method: Any) -> FlowDefinitionCondition | None:
trigger_condition = getattr(method, "__trigger_condition__", None)
if trigger_condition is not None:
return _definition_condition_from_runtime(trigger_condition)
trigger_methods = getattr(method, "__trigger_methods__", None)
if trigger_methods is None:
return None
condition_type = getattr(method, "__condition_type__", OR_CONDITION)
method_names = [str(method_name) for method_name in trigger_methods]
if condition_type == AND_CONDITION:
return {"and": method_names}
if len(method_names) == 1:
return method_names[0]
return {"or": method_names}
def _build_human_feedback_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowHumanFeedbackDefinition | None:
config = getattr(method, "__human_feedback_config__", None)
if config is None:
return None
emit = getattr(config, "emit", None)
return FlowHumanFeedbackDefinition(
message=str(config.message),
emit=[str(value) for value in emit] if emit is not None else None,
llm=_serialize_static_value(
getattr(config, "llm", None), diagnostics, f"{path}.llm"
),
default_outcome=getattr(config, "default_outcome", None),
metadata=getattr(config, "metadata", None),
provider=_serialize_static_value(
getattr(config, "provider", None), diagnostics, f"{path}.provider"
),
learn=bool(getattr(config, "learn", False)),
learn_source=str(getattr(config, "learn_source", "hitl")),
learn_strict=bool(getattr(config, "learn_strict", False)),
)
def _build_persistence_definition(
value: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowPersistenceDefinition | None:
config = getattr(value, "__flow_persistence_config__", None)
if config is None:
return None
persistence = getattr(config, "persistence", None)
verbose = bool(getattr(config, "verbose", False))
return FlowPersistenceDefinition(
enabled=True,
verbose=verbose,
persistence=_serialize_static_value(
persistence, diagnostics, f"{path}.persistence"
),
)
def _iter_flow_methods(flow_class: type) -> dict[str, Any]:
methods: dict[str, Any] = {}
for attr_name in dir(flow_class):
if attr_name.startswith("_"):
continue
try:
attr_value = getattr(flow_class, attr_name)
except AttributeError:
continue
if is_flow_method(attr_value):
methods[attr_name] = attr_value
# A wrapped method whose name collides with a base Flow model field
# (e.g. ``checkpoint``) is absorbed by Pydantic as a field; the underlying
# function is preserved as the field default. Recover those so the
# definition still reflects every method once the class is built.
for field_name, field in getattr(flow_class, "model_fields", {}).items():
if field_name in methods or field_name.startswith("_"):
continue
default = getattr(field, "default", None)
if is_flow_method(default):
methods[field_name] = default
return methods
def _build_flow_definition_from_class(
flow_class: type,
namespace: dict[str, Any] | None = None,
) -> FlowDefinition:
diagnostics: list[FlowDefinitionDiagnostic] = []
methods: dict[str, FlowMethodDefinition] = {}
flow_methods = _iter_flow_methods(flow_class)
if namespace is not None:
for attr_name, attr_value in namespace.items():
if is_flow_method(attr_value):
flow_methods[attr_name] = attr_value
for method_name, method in flow_methods.items():
is_start = bool(getattr(method, "__is_start_method__", False))
is_router = bool(getattr(method, "__is_router__", False))
condition = _condition_from_method_metadata(method)
human_feedback = _build_human_feedback_definition(
method, diagnostics, f"methods.{method_name}.human_feedback"
)
if human_feedback and human_feedback.emit:
is_router = True
if not is_start:
start_value: bool | FlowDefinitionCondition | None = None
elif condition is not None:
start_value = condition
else:
start_value = True
method_definition = FlowMethodDefinition(
start=start_value,
listen=condition if not is_start else None,
router=is_router,
human_feedback=human_feedback,
persist=_build_persistence_definition(
method, diagnostics, f"methods.{method_name}.persist"
),
)
router_paths = getattr(method, "__router_paths__", None)
if router_paths and not (human_feedback and human_feedback.emit):
method_definition.returns = [str(path) for path in router_paths]
methods[method_name] = method_definition
description = None
docstring = flow_class.__doc__
if docstring:
description = docstring.strip()
definition = FlowDefinition(
name=getattr(flow_class, "__name__", "Flow"),
description=description,
state=_build_state_definition(flow_class, diagnostics),
config=_build_config_definition(flow_class, diagnostics),
persist=_build_persistence_definition(flow_class, diagnostics, "persist"),
methods=methods,
diagnostics=diagnostics,
)
definition.diagnostics.extend(definition.validate_contract())
definition.log_diagnostics()
return definition
def build_flow_definition(
flow_class: type,
namespace: dict[str, Any] | None = None,
) -> FlowDefinition:
"""Build a FlowDefinition from a Python Flow class."""
return _build_flow_definition_from_class(flow_class, namespace)
def extract_flow_definition(
namespace: dict[str, Any],
) -> tuple[list[str], dict[str, Any], set[str], dict[str, Any]]:
"""Extract the structural flow registries from a Python class namespace."""
start_methods = []
listeners = {}
router_paths = {}
routers = set()
for attr_name, attr_value in namespace.items():
if is_flow_method(attr_value):
if hasattr(attr_value, "__is_start_method__"):
start_methods.append(attr_name)
if (
hasattr(attr_value, "__trigger_methods__")
and attr_value.__trigger_methods__ is not None
):
methods = attr_value.__trigger_methods__
condition_type = getattr(attr_value, "__condition_type__", OR_CONDITION)
if (
hasattr(attr_value, "__trigger_condition__")
and attr_value.__trigger_condition__ is not None
):
listeners[attr_name] = attr_value.__trigger_condition__
else:
listeners[attr_name] = (condition_type, methods)
if hasattr(attr_value, "__is_router__") and attr_value.__is_router__:
routers.add(attr_name)
if (
hasattr(attr_value, "__router_paths__")
and attr_value.__router_paths__
):
router_paths[attr_name] = attr_value.__router_paths__
else:
router_paths[attr_name] = []
if (
hasattr(attr_value, "__is_start_method__")
and hasattr(attr_value, "__is_router__")
and attr_value.__is_router__
):
routers.add(attr_name)
if (
hasattr(attr_value, "__router_paths__")
and attr_value.__router_paths__
):
router_paths[attr_name] = attr_value.__router_paths__
else:
router_paths[attr_name] = []
return start_methods, listeners, routers, router_paths

View File

@@ -3,8 +3,8 @@
The implementation now lives in three modules, split by concern:
- ``crewai.flow.dsl`` -- authoring decorators (``@start`` / ``@listen`` /
``@router``, ``or_`` / ``and_``) and Python Flow class projection
- ``crewai.flow.flow_definition`` -- the serializable Flow Definition contract
``@router``, ``or_`` / ``and_``)
- ``crewai.flow.flow_definition`` -- the structural model extracted from the DSL
- ``crewai.flow.runtime`` -- the Flow execution engine and state
Prefer importing from those modules in new code; this module preserves the

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,592 @@
"""Flow structure serializer for introspecting Flow classes.
This module provides the flow_structure() function that analyzes a Flow class
and returns a JSON-serializable dictionary describing its graph structure.
This is used by Studio UI to render a visual flow graph.
Example:
>>> from crewai.flow import Flow, start, listen
>>> from crewai.flow.flow_serializer import flow_structure
>>>
>>> class MyFlow(Flow):
... @start()
... def begin(self):
... return "started"
...
... @listen(begin)
... def process(self):
... return "done"
>>>
>>> structure = flow_structure(MyFlow)
>>> print(structure["name"])
'MyFlow'
"""
from __future__ import annotations
import inspect
import logging
import re
import textwrap
from typing import Any, TypedDict, get_args, get_origin
from pydantic import BaseModel
from pydantic_core import PydanticUndefined
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowMethod,
ListenMethod,
RouterMethod,
StartMethod,
)
logger = logging.getLogger(__name__)
class MethodInfo(TypedDict, total=False):
"""Information about a single flow method.
Attributes:
name: The method name.
type: Method type - start, listen, router, or start_router.
trigger_methods: List of method names that trigger this method.
condition_type: 'AND' or 'OR' for composite conditions, null otherwise.
router_paths: For routers, the possible route names returned.
has_human_feedback: Whether the method has @human_feedback decorator.
has_crew: Whether the method body references a Crew.
"""
name: str
type: str
trigger_methods: list[str]
condition_type: str | None
router_paths: list[str]
has_human_feedback: bool
has_crew: bool
class EdgeInfo(TypedDict, total=False):
"""Information about an edge between flow methods.
Attributes:
from_method: Source method name.
to_method: Target method name.
edge_type: Type of edge - 'listen' or 'route'.
condition: Route name for router edges, null for listen edges.
"""
from_method: str
to_method: str
edge_type: str
condition: str | None
class StateFieldInfo(TypedDict, total=False):
"""Information about a state field.
Attributes:
name: Field name.
type: Field type as string.
default: Default value if any.
"""
name: str
type: str
default: Any
class StateSchemaInfo(TypedDict, total=False):
"""Information about the flow's state schema.
Attributes:
fields: List of field information.
"""
fields: list[StateFieldInfo]
class FlowStructureInfo(TypedDict, total=False):
"""Complete flow structure information.
Attributes:
name: Flow class name.
description: Flow docstring if available.
methods: List of method information.
edges: List of edge information.
state_schema: State schema if typed, null otherwise.
inputs: Detected flow inputs if available.
"""
name: str
description: str | None
methods: list[MethodInfo]
edges: list[EdgeInfo]
state_schema: StateSchemaInfo | None
inputs: list[str]
def _get_method_type(
method_name: str,
method: Any,
start_methods: list[str],
routers: set[str],
) -> str:
"""Determine the type of a flow method.
Args:
method_name: Name of the method.
method: The method object.
start_methods: List of start method names.
routers: Set of router method names.
Returns:
One of: 'start', 'listen', 'router', or 'start_router'.
"""
is_start = method_name in start_methods or getattr(
method, "__is_start_method__", False
)
is_router = method_name in routers or getattr(method, "__is_router__", False)
if is_start and is_router:
return "start_router"
if is_start:
return "start"
if is_router:
return "router"
return "listen"
def _has_human_feedback(method: Any) -> bool:
"""Check if a method has the @human_feedback decorator.
Args:
method: The method object to check.
Returns:
True if the method has __human_feedback_config__ attribute.
"""
return hasattr(method, "__human_feedback_config__")
def _detect_crew_reference(method: Any) -> bool:
"""Detect if a method body references a Crew.
Checks for patterns like:
- .crew() method calls
- Crew( instantiation
- References to Crew class in type hints
Note:
This is a **best-effort heuristic for UI hints**, not a guarantee.
Uses inspect.getsource + regex which can false-positive on comments
or string literals, and may fail on dynamically generated methods
or lambdas. Do not rely on this for correctness-critical logic.
Args:
method: The method object to inspect.
Returns:
True if crew reference detected, False otherwise.
"""
try:
func = method
if hasattr(method, "_meth"):
func = method._meth
elif hasattr(method, "__wrapped__"):
func = method.__wrapped__
source = inspect.getsource(func)
source = textwrap.dedent(source)
crew_patterns = [
r"\.crew\(\)", # .crew() method call
r"Crew\s*\(", # Crew( instantiation
r":\s*Crew\b", # Type hint with Crew
r"->.*Crew", # Return type hint with Crew
]
for pattern in crew_patterns:
if re.search(pattern, source):
return True
return False
except (OSError, TypeError):
return False
def _extract_trigger_methods(method: Any) -> tuple[list[str], str | None]:
"""Extract trigger methods and condition type from a method.
Args:
method: The method object to inspect.
Returns:
Tuple of (trigger_methods list, condition_type or None).
"""
trigger_methods: list[str] = []
condition_type: str | None = None
if hasattr(method, "__trigger_methods__") and method.__trigger_methods__:
trigger_methods = [str(m) for m in method.__trigger_methods__]
# For complex conditions (or_/and_ combinators), extract from __trigger_condition__
if (
not trigger_methods
and hasattr(method, "__trigger_condition__")
and method.__trigger_condition__
):
trigger_condition = method.__trigger_condition__
trigger_methods = _extract_all_methods_from_condition(trigger_condition)
if hasattr(method, "__condition_type__") and method.__condition_type__:
condition_type = str(method.__condition_type__)
return trigger_methods, condition_type
def _extract_router_paths(
method: Any, router_paths_registry: dict[str, list[str]]
) -> list[str]:
"""Extract router paths for a router method.
Args:
method: The method object.
router_paths_registry: The class-level _router_paths dict.
Returns:
List of possible route names.
"""
method_name = getattr(method, "__name__", "")
if hasattr(method, "__router_paths__") and method.__router_paths__:
return [str(p) for p in method.__router_paths__]
if method_name in router_paths_registry:
return [str(p) for p in router_paths_registry[method_name]]
return []
def _extract_all_methods_from_condition(
condition: str | FlowCondition | dict[str, Any] | list[Any],
) -> list[str]:
"""Extract all method names from a condition tree recursively.
Args:
condition: Can be a string, FlowCondition tuple, dict, or list.
Returns:
List of all method names found in the condition.
"""
if isinstance(condition, str):
return [condition]
if isinstance(condition, tuple) and len(condition) == 2:
# FlowCondition: (condition_type, methods_list)
_, methods = condition
if isinstance(methods, list):
result: list[str] = []
for m in methods:
result.extend(_extract_all_methods_from_condition(m))
return result
return []
if isinstance(condition, dict):
conditions_list = condition.get("conditions", [])
dict_methods: list[str] = []
for sub_cond in conditions_list:
dict_methods.extend(_extract_all_methods_from_condition(sub_cond))
return dict_methods
if isinstance(condition, list):
list_methods: list[str] = []
for item in condition:
list_methods.extend(_extract_all_methods_from_condition(item))
return list_methods
return []
def _generate_edges(
listeners: dict[str, tuple[str, list[str]] | FlowCondition],
routers: set[str],
router_paths: dict[str, list[str]],
all_methods: set[str],
) -> list[EdgeInfo]:
"""Generate edges from listeners and routers.
Args:
listeners: Map of listener_name -> (condition_type, trigger_methods) or FlowCondition.
routers: Set of router method names.
router_paths: Map of router_name -> possible return values.
all_methods: Set of all method names in the flow.
Returns:
List of EdgeInfo dictionaries.
"""
edges: list[EdgeInfo] = []
for listener_name, condition_data in listeners.items():
trigger_methods: list[str] = []
if isinstance(condition_data, tuple) and len(condition_data) == 2:
_condition_type, methods = condition_data
trigger_methods = [str(m) for m in methods]
elif isinstance(condition_data, dict):
trigger_methods = _extract_all_methods_from_condition(condition_data)
edges.extend(
EdgeInfo(
from_method=trigger,
to_method=listener_name,
edge_type="listen",
condition=None,
)
for trigger in trigger_methods
if trigger in all_methods
)
for router_name, paths in router_paths.items():
for path in paths:
for listener_name, condition_data in listeners.items():
path_triggers: list[str] = []
if isinstance(condition_data, tuple) and len(condition_data) == 2:
_, methods = condition_data
path_triggers = [str(m) for m in methods]
elif isinstance(condition_data, dict):
path_triggers = _extract_all_methods_from_condition(condition_data)
if str(path) in path_triggers:
edges.append(
EdgeInfo(
from_method=router_name,
to_method=listener_name,
edge_type="route",
condition=str(path),
)
)
return edges
def _extract_state_schema(flow_class: type) -> StateSchemaInfo | None:
"""Extract state schema from a Flow class.
Checks for:
- Generic type parameter (Flow[MyState])
- initial_state class attribute
Args:
flow_class: The Flow class to inspect.
Returns:
StateSchemaInfo if a Pydantic model state is detected, None otherwise.
"""
state_type: type | None = None
# _initial_state_t is set by Flow.__class_getitem__
if hasattr(flow_class, "_initial_state_t"):
state_type = flow_class._initial_state_t
if state_type is None and hasattr(flow_class, "initial_state"):
initial_state = flow_class.initial_state
if isinstance(initial_state, type) and issubclass(initial_state, BaseModel):
state_type = initial_state
elif isinstance(initial_state, BaseModel):
state_type = type(initial_state)
if state_type is None and hasattr(flow_class, "__orig_bases__"):
for base in flow_class.__orig_bases__:
origin = get_origin(base)
if origin is not None:
args = get_args(base)
if args:
candidate = args[0]
if isinstance(candidate, type) and issubclass(candidate, BaseModel):
state_type = candidate
break
if state_type is None or not issubclass(state_type, BaseModel):
return None
fields: list[StateFieldInfo] = []
try:
model_fields = state_type.model_fields
for field_name, field_info in model_fields.items():
field_type_str = "Any"
if field_info.annotation is not None:
field_type_str = str(field_info.annotation)
field_type_str = field_type_str.replace("typing.", "")
field_type_str = field_type_str.replace("<class '", "").replace(
"'>", ""
)
default_value = None
if (
field_info.default is not PydanticUndefined
and field_info.default is not None
and not callable(field_info.default)
):
try:
default_value = field_info.default
except Exception:
default_value = str(field_info.default)
fields.append(
StateFieldInfo(
name=field_name,
type=field_type_str,
default=default_value,
)
)
except Exception:
logger.debug(
"Failed to extract state schema fields for %s", flow_class.__name__
)
return StateSchemaInfo(fields=fields) if fields else None
def _detect_flow_inputs(flow_class: type) -> list[str]:
"""Detect flow input parameters.
Inspects the __init__ signature for custom parameters beyond standard Flow params.
Args:
flow_class: The Flow class to inspect.
Returns:
List of detected input names.
"""
inputs: list[str] = []
try:
init_method = flow_class.__init__ # type: ignore[misc]
init_sig = inspect.signature(init_method)
standard_params = {
"self",
"persistence",
"tracing",
"suppress_flow_events",
"max_method_calls",
"kwargs",
}
inputs.extend(
param_name
for param_name in init_sig.parameters
if param_name not in standard_params and not param_name.startswith("_")
)
except Exception:
logger.debug(
"Failed to detect inputs from __init__ for %s", flow_class.__name__
)
return inputs
def flow_structure(flow_class: type) -> FlowStructureInfo:
"""Introspect a Flow class and return its structure as a JSON-serializable dict.
This function analyzes a Flow CLASS (not instance) and returns complete
information about its graph structure including methods, edges, and state.
Args:
flow_class: A Flow class (not an instance) to introspect.
Returns:
FlowStructureInfo dictionary containing:
- name: Flow class name
- description: Docstring if available
- methods: List of method info dicts
- edges: List of edge info dicts
- state_schema: State schema if typed, None otherwise
- inputs: Detected input names
Raises:
TypeError: If flow_class is not a class.
Example:
>>> structure = flow_structure(MyFlow)
>>> print(structure["name"])
'MyFlow'
>>> for method in structure["methods"]:
... print(method["name"], method["type"])
"""
if not isinstance(flow_class, type):
raise TypeError(
f"flow_structure requires a Flow class, not an instance. "
f"Got {type(flow_class).__name__}"
)
start_methods: list[str] = getattr(flow_class, "_start_methods", [])
listeners: dict[str, Any] = getattr(flow_class, "_listeners", {})
routers: set[str] = getattr(flow_class, "_routers", set())
router_paths_registry: dict[str, list[str]] = getattr(
flow_class, "_router_paths", {}
)
methods: list[MethodInfo] = []
all_method_names: set[str] = set()
for attr_name in dir(flow_class):
if attr_name.startswith("_"):
continue
try:
attr = getattr(flow_class, attr_name)
except AttributeError:
continue
is_flow_method = (
isinstance(attr, (FlowMethod, StartMethod, ListenMethod, RouterMethod))
or hasattr(attr, "__is_flow_method__")
or hasattr(attr, "__is_start_method__")
or hasattr(attr, "__trigger_methods__")
or hasattr(attr, "__is_router__")
)
if not is_flow_method:
continue
all_method_names.add(attr_name)
method_type = _get_method_type(attr_name, attr, start_methods, routers)
trigger_methods, condition_type = _extract_trigger_methods(attr)
router_paths_list: list[str] = []
if method_type in ("router", "start_router"):
router_paths_list = _extract_router_paths(attr, router_paths_registry)
has_hf = _has_human_feedback(attr)
has_crew = _detect_crew_reference(attr)
method_info = MethodInfo(
name=attr_name,
type=method_type,
trigger_methods=trigger_methods,
condition_type=condition_type,
router_paths=router_paths_list,
has_human_feedback=has_hf,
has_crew=has_crew,
)
methods.append(method_info)
edges = _generate_edges(listeners, routers, router_paths_registry, all_method_names)
state_schema = _extract_state_schema(flow_class)
inputs = _detect_flow_inputs(flow_class)
description: str | None = None
if flow_class.__doc__:
description = flow_class.__doc__.strip()
return FlowStructureInfo(
name=flow_class.__name__,
description=description,
methods=methods,
edges=edges,
state_schema=state_schema,
inputs=inputs,
)

View File

@@ -18,17 +18,6 @@ R = TypeVar("R")
FlowConditionType: TypeAlias = Literal["OR", "AND"]
SimpleFlowCondition: TypeAlias = tuple[FlowConditionType, list[FlowMethodName]]
__all__ = [
"FlowCondition",
"FlowConditionType",
"FlowConditions",
"FlowMethod",
"ListenMethod",
"RouterMethod",
"SimpleFlowCondition",
"StartMethod",
]
class FlowCondition(TypedDict, total=False):
"""Type definition for flow trigger conditions.
@@ -86,7 +75,6 @@ class FlowMethod(Generic[P, R]):
"__is_router__",
"__router_paths__",
"__human_feedback_config__",
"__flow_persistence_config__",
"_hf_llm", # Live LLM object for HITL resume
]:
if hasattr(meth, attr):
@@ -177,4 +165,3 @@ class RouterMethod(FlowMethod[P, R]):
__trigger_methods__: list[FlowMethodName] | None = None
__condition_type__: FlowConditionType | None = None
__trigger_condition__: FlowCondition | None = None
__router_paths__: list[str] | None = None

View File

@@ -78,10 +78,14 @@ logger = logging.getLogger(__name__)
F = TypeVar("F", bound=Callable[..., Any])
__all__ = ["HumanFeedbackResult", "human_feedback"]
def _serialize_llm_for_context(llm: Any) -> dict[str, Any] | str | None:
"""Serialize a BaseLLM object to a dict preserving full config.
Delegates to ``llm.to_config_dict()`` when available (BaseLLM and
subclasses). Falls back to extracting the model string with provider
prefix for unknown LLM types.
"""
to_config: Callable[[], dict[str, Any]] | None = getattr(
llm, "to_config_dict", None
)
@@ -99,6 +103,13 @@ def _serialize_llm_for_context(llm: Any) -> dict[str, Any] | str | None:
def _deserialize_llm_from_context(
llm_data: dict[str, Any] | str | None,
) -> BaseLLM | None:
"""Reconstruct an LLM instance from serialized context data.
Handles both the new dict format (with full config) and the legacy
string format (model name only) for backward compatibility.
Returns a BaseLLM instance, or None if llm_data is None.
"""
if llm_data is None:
return None
@@ -345,12 +356,20 @@ def human_feedback(
raise ValueError("default_outcome requires emit to be specified.")
def decorator(func: F) -> F:
"""Inner decorator that wraps the function."""
def _get_hitl_prompt(key: str) -> str:
"""Read a HITL prompt from the i18n translations."""
from crewai.utilities.i18n import I18N_DEFAULT
return I18N_DEFAULT.slice(key)
def _resolve_llm_instance() -> Any:
"""Resolve the ``llm`` parameter to a BaseLLM instance.
Uses the SAME model specified in the decorator so pre-review,
distillation, and outcome collapsing all share one model.
"""
if llm is None:
from crewai.llm import LLM
@@ -364,6 +383,7 @@ def human_feedback(
def _pre_review_with_lessons(
flow_instance: Flow[Any], method_output: Any
) -> Any:
"""Recall past HITL lessons and use LLM to pre-review the output."""
try:
mem = flow_instance.memory
if mem is None:
@@ -411,6 +431,7 @@ def human_feedback(
def _distill_and_store_lessons(
flow_instance: Flow[Any], method_output: Any, raw_feedback: str
) -> None:
"""Extract generalizable lessons from output + feedback, store in memory."""
try:
mem = flow_instance.memory
if mem is None:
@@ -464,6 +485,7 @@ def human_feedback(
def _build_feedback_context(
flow_instance: Flow[Any], method_output: Any
) -> tuple[Any, Any]:
"""Build the PendingFeedbackContext and resolve the effective provider."""
from crewai.flow.async_feedback.types import PendingFeedbackContext
context = PendingFeedbackContext(
@@ -487,6 +509,7 @@ def human_feedback(
return context, effective_provider
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
"""Request feedback using provider or default console (sync)."""
context, effective_provider = _build_feedback_context(
flow_instance, method_output
)
@@ -512,6 +535,7 @@ def human_feedback(
async def _request_feedback_async(
flow_instance: Flow[Any], method_output: Any
) -> str:
"""Request feedback, awaiting the provider if it returns a coroutine."""
context, effective_provider = _build_feedback_context(
flow_instance, method_output
)
@@ -535,6 +559,7 @@ def human_feedback(
method_output: Any,
raw_feedback: str,
) -> HumanFeedbackResult | str:
"""Process feedback and return result or outcome."""
collapsed_outcome: str | None = None
if not raw_feedback.strip():
@@ -636,9 +661,6 @@ def human_feedback(
"__condition_type__",
"__trigger_condition__",
"__is_flow_method__",
"__flow_persistence_config__",
"__is_router__",
"__router_paths__",
]:
if hasattr(func, attr):
setattr(wrapper, attr, getattr(func, attr))

View File

@@ -31,7 +31,7 @@ import logging
from typing import TYPE_CHECKING, Any, Final, TypeVar, cast
from crewai_core.printer import PRINTER
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
@@ -44,8 +44,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
T = TypeVar("T")
__all__ = ["PersistenceDecorator", "persist"]
LOG_MESSAGES: Final[dict[str, str]] = {
"save_state": "Saving flow state to memory for ID: {}",
"save_error": "Failed to persist state for method {}: {}",
@@ -54,37 +52,6 @@ LOG_MESSAGES: Final[dict[str, str]] = {
}
class _FlowPersistenceConfig(BaseModel):
persistence: Any = None
verbose: bool = False
model_config = ConfigDict(arbitrary_types_allowed=True)
def _stamp_persistence_metadata(
target: Any,
persistence: FlowPersistence,
verbose: bool,
) -> None:
target.__flow_persistence_config__ = _FlowPersistenceConfig(
persistence=persistence,
verbose=verbose,
)
_PRESERVED_FLOW_ATTRS: Final[tuple[str, ...]] = (
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__trigger_condition__",
"__is_router__",
"__router_paths__",
"__human_feedback_config__",
"__flow_persistence_config__",
"_hf_llm",
)
class PersistenceDecorator:
"""Class to handle flow state persistence with consistent logging."""
@@ -196,10 +163,10 @@ def persist(
"""
def decorator(target: type | Callable[..., T]) -> type | Callable[..., T]:
"""Decorator that handles both class and method decoration."""
actual_persistence = persistence or SQLiteFlowPersistence()
if isinstance(target, type):
_stamp_persistence_metadata(target, actual_persistence, verbose)
original_init = target.__init__ # type: ignore[misc]
@functools.wraps(original_init)
@@ -244,7 +211,12 @@ def persist(
wrapped = create_async_wrapper(name, method)
for attr in _PRESERVED_FLOW_ATTRS:
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__is_router__",
]:
if hasattr(method, attr):
setattr(wrapped, attr, getattr(method, attr))
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
@@ -267,7 +239,12 @@ def persist(
wrapped = create_sync_wrapper(name, method)
for attr in _PRESERVED_FLOW_ATTRS:
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__is_router__",
]:
if hasattr(method, attr):
setattr(wrapped, attr, getattr(method, attr))
wrapped.__is_flow_method__ = True # type: ignore[attr-defined]
@@ -277,7 +254,6 @@ def persist(
return target
method = target
method.__is_flow_method__ = True # type: ignore[attr-defined]
_stamp_persistence_metadata(method, actual_persistence, verbose)
if asyncio.iscoroutinefunction(method):
@@ -295,13 +271,15 @@ def persist(
)
return cast(T, result)
for attr in _PRESERVED_FLOW_ATTRS:
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__is_router__",
]:
if hasattr(method, attr):
setattr(method_async_wrapper, attr, getattr(method, attr))
method_async_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
_stamp_persistence_metadata(
method_async_wrapper, actual_persistence, verbose
)
return cast(Callable[..., T], method_async_wrapper)
@functools.wraps(method)
@@ -312,11 +290,15 @@ def persist(
)
return result
for attr in _PRESERVED_FLOW_ATTRS:
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__is_router__",
]:
if hasattr(method, attr):
setattr(method_sync_wrapper, attr, getattr(method, attr))
method_sync_wrapper.__is_flow_method__ = True # type: ignore[attr-defined]
_stamp_persistence_metadata(method_sync_wrapper, actual_persistence, verbose)
return cast(Callable[..., T], method_sync_wrapper)
return decorator

View File

@@ -1,8 +1,9 @@
"""Flow runtime: the Flow execution engine, its metaclass, and state proxies.
Holds the Flow class (kickoff/resume/listener dispatch), the FlowMeta
metaclass (Pydantic model construction; Python class extraction is delegated
to ``dsl.extract_flow_definition``), and the thread-safe state proxies.
metaclass (Pydantic model construction; structural extraction is delegated to
``flow_definition.extract_flow_definition``), and the thread-safe state
proxies. The authoring decorators live in ``crewai.flow.dsl``.
"""
from __future__ import annotations
@@ -84,19 +85,17 @@ from crewai.events.types.flow_events import (
MethodExecutionStartedEvent,
)
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.dsl import (
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_definition import (
_extract_all_methods,
_extract_all_methods_recursive,
_normalize_condition,
build_flow_definition,
extract_flow_definition,
is_flow_condition_dict,
is_flow_method,
is_flow_method_name,
is_simple_flow_condition,
)
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_definition import FlowDefinition
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowMethod,
@@ -594,9 +593,6 @@ class FlowMeta(ModelMetaclass):
cls._listeners = listeners # type: ignore[attr-defined]
cls._routers = routers # type: ignore[attr-defined]
cls._router_paths = router_paths # type: ignore[attr-defined]
# The static FlowDefinition is built lazily (on first access via
# ``Flow.flow_definition()`` or visualization), not at class-definition
# time, to avoid AST parsing and diagnostic logging on every import.
return cls
@@ -617,19 +613,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_listeners: ClassVar[dict[FlowMethodName, SimpleFlowCondition | FlowCondition]] = {}
_routers: ClassVar[set[FlowMethodName]] = set()
_router_paths: ClassVar[dict[FlowMethodName, list[FlowMethodName]]] = {}
_flow_definition: ClassVar[FlowDefinition | None] = None
entity_type: Literal["flow"] = "flow"
@classmethod
def flow_definition(cls) -> FlowDefinition:
"""Return the static Flow Definition built from this Flow class."""
flow_definition = cls.__dict__.get("_flow_definition")
if flow_definition is None:
flow_definition = build_flow_definition(cls)
cls._flow_definition = flow_definition
return flow_definition
initial_state: Annotated[ # type: ignore[type-arg]
type[BaseModel] | type[dict] | dict[str, Any] | BaseModel | None,
BeforeValidator(_deserialize_initial_state),

View File

@@ -0,0 +1,53 @@
"""Backwards-compatible shim. The implementation moved to ``crewai.flow.flow_definition``.
Import from ``crewai.flow.flow_definition`` directly in new code.
"""
from crewai.flow.flow_definition import (
_extract_all_methods,
_extract_all_methods_recursive,
_extract_string_literals_from_type_annotation,
_normalize_condition,
_unwrap_function,
build_ancestor_dict,
build_parent_children_dict,
calculate_node_levels,
count_outgoing_edges,
dfs_ancestors,
extract_flow_definition,
get_child_index,
get_possible_return_constants,
is_ancestor,
is_flow_condition_dict,
is_flow_condition_list,
is_flow_method,
is_flow_method_callable,
is_flow_method_name,
is_simple_flow_condition,
process_router_paths,
)
__all__ = [
"_extract_all_methods",
"_extract_all_methods_recursive",
"_extract_string_literals_from_type_annotation",
"_normalize_condition",
"_unwrap_function",
"build_ancestor_dict",
"build_parent_children_dict",
"calculate_node_levels",
"count_outgoing_edges",
"dfs_ancestors",
"extract_flow_definition",
"get_child_index",
"get_possible_return_constants",
"is_ancestor",
"is_flow_condition_dict",
"is_flow_condition_list",
"is_flow_method",
"is_flow_method_callable",
"is_flow_method_name",
"is_simple_flow_condition",
"process_router_paths",
]

View File

@@ -1,259 +0,0 @@
from __future__ import annotations
from collections import defaultdict, deque
from typing import Any, cast
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_definition import (
FlowDefinition,
FlowDefinitionCondition,
FlowMethodDefinition,
_extract_definition_condition_atoms,
)
__all__: list[str] = []
def _definition_from(value: Any) -> FlowDefinition:
if isinstance(value, FlowDefinition):
return value
flow_class = value if isinstance(value, type) else value.__class__
flow_definition = getattr(flow_class, "flow_definition", None)
if callable(flow_definition):
return cast(FlowDefinition, flow_definition())
raise TypeError(
"Flow visualization analysis requires a FlowDefinition or a Flow class/instance "
"with flow_definition()."
)
def _method_condition(method: FlowMethodDefinition) -> FlowDefinitionCondition | None:
if method.listen is not None:
return method.listen
if isinstance(method.start, str | dict):
return method.start
return None
def _condition_type(condition: FlowDefinitionCondition | None) -> str | None:
if isinstance(condition, str):
return OR_CONDITION
if isinstance(condition, dict):
if "and" in condition:
return AND_CONDITION
if "or" in condition:
return OR_CONDITION
return None
def _router_paths(method: FlowMethodDefinition) -> list[str]:
if method.human_feedback and method.human_feedback.emit:
return [str(path) for path in method.human_feedback.emit]
if method.returns:
return [str(path) for path in method.returns]
return []
def _condition_atoms(method: FlowMethodDefinition) -> list[str]:
condition = _method_condition(method)
if condition is None:
return []
return _extract_definition_condition_atoms(condition)
def _direct_or_triggers(method: FlowMethodDefinition) -> list[str]:
condition = _method_condition(method)
if condition is None:
return []
return _extract_direct_or_triggers(condition)
def _extract_direct_or_triggers(condition: FlowDefinitionCondition) -> list[str]:
if isinstance(condition, str):
return [condition]
if "and" in condition:
return []
triggers: list[str] = []
for sub_condition in cast(list[FlowDefinitionCondition], condition.get("or", [])):
triggers.extend(_extract_direct_or_triggers(sub_condition))
return triggers
def _parent_children_from_definition(
definition: FlowDefinition,
) -> dict[str, list[str]]:
method_names = set(definition.methods)
parent_children: dict[str, list[str]] = {}
for child_name, method in definition.methods.items():
for trigger in _condition_atoms(method):
if trigger in method_names:
parent_children.setdefault(trigger, [])
if child_name not in parent_children[trigger]:
parent_children[trigger].append(child_name)
for router_name, method in definition.methods.items():
if not method.router:
continue
for path in _router_paths(method):
for child_name, child in definition.methods.items():
if child_name == router_name:
continue
if path in _direct_or_triggers(child):
parent_children.setdefault(router_name, [])
if child_name not in parent_children[router_name]:
parent_children[router_name].append(child_name)
return parent_children
def _calculate_node_levels(flow_or_definition: Any) -> dict[str, int]:
definition = _definition_from(flow_or_definition)
levels: dict[str, int] = {}
queue: deque[str] = deque()
visited: set[str] = set()
pending_and_listeners: dict[str, set[str]] = {}
for method_name, method in definition.methods.items():
if method.is_start:
levels[method_name] = 0
queue.append(method_name)
or_listeners = defaultdict(list)
and_listeners = defaultdict(set)
for listener_name, method in definition.methods.items():
condition = _method_condition(method)
condition_type = _condition_type(condition)
trigger_methods = _condition_atoms(method)
if condition_type == OR_CONDITION:
for trigger in trigger_methods:
or_listeners[trigger].append(listener_name)
elif condition_type == AND_CONDITION:
and_listeners[listener_name] = set(trigger_methods)
while queue:
current = queue.popleft()
current_level = levels[current]
visited.add(current)
for listener_name in or_listeners[current]:
if listener_name not in levels or levels[listener_name] > current_level + 1:
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
for listener_name, required_methods in and_listeners.items():
if current in required_methods:
pending_and_listeners.setdefault(listener_name, set()).add(current)
if required_methods == pending_and_listeners[listener_name]:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
_process_router_paths(definition, current, current_level, levels, queue)
max_level = max(levels.values()) if levels else 0
for method_name in definition.methods:
if method_name not in levels:
levels[method_name] = max_level + 1
return levels
def _count_outgoing_edges(flow_or_definition: Any) -> dict[str, int]:
definition = _definition_from(flow_or_definition)
parent_children = _parent_children_from_definition(definition)
return {
method_name: len(parent_children.get(method_name, []))
for method_name in definition.methods
}
def _build_ancestor_dict(flow_or_definition: Any) -> dict[str, set[str]]:
definition = _definition_from(flow_or_definition)
ancestors: dict[str, set[str]] = {node: set() for node in definition.methods}
parent_children = _parent_children_from_definition(definition)
worklist: deque[str] = deque()
for parent, children in parent_children.items():
for child in children:
if (
child in ancestors
and parent != child
and parent not in ancestors[child]
):
ancestors[child].add(parent)
worklist.append(child)
_dfs_ancestors(worklist, ancestors, parent_children)
return ancestors
def _dfs_ancestors(
worklist: deque[str],
ancestors: dict[str, set[str]],
parent_children: dict[str, list[str]],
) -> None:
while worklist:
parent = worklist.popleft()
for child in parent_children.get(parent, []):
if child not in ancestors:
continue
next_ancestors = set(ancestors[parent])
if parent != child:
next_ancestors.add(parent)
next_ancestors.discard(child)
if not next_ancestors.issubset(ancestors[child]):
ancestors[child].update(next_ancestors)
worklist.append(child)
def _is_ancestor(
node: str, ancestor_candidate: str, ancestors: dict[str, set[str]]
) -> bool:
return ancestor_candidate in ancestors.get(node, set())
def _build_parent_children_dict(flow_or_definition: Any) -> dict[str, list[str]]:
return _parent_children_from_definition(_definition_from(flow_or_definition))
def _get_child_index(
parent: str, child: str, parent_children: dict[str, list[str]]
) -> int:
children = parent_children.get(parent, [])
children.sort()
return children.index(child)
def _process_router_paths(
flow_or_definition: Any,
current: str,
current_level: int,
levels: dict[str, int],
queue: deque[str],
) -> None:
definition = _definition_from(flow_or_definition)
method = definition.methods.get(current)
if method is None or not method.router:
return
for path in _router_paths(method):
for listener_name, listener in definition.methods.items():
if listener_name == current:
continue
if path in _direct_or_triggers(listener):
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
queue.append(listener_name)

View File

@@ -1,118 +1,131 @@
"""Flow structure builder for definition-only Flow visualization."""
"""Flow structure builder for analyzing Flow execution."""
from __future__ import annotations
from collections import defaultdict
import inspect
import logging
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_definition import (
FlowDefinition,
FlowDefinitionCondition,
FlowMethodDefinition,
from crewai.flow.flow_wrappers import FlowCondition
from crewai.flow.types import FlowMethodName
from crewai.flow.utils import (
is_flow_condition_dict,
is_simple_flow_condition,
)
from crewai.flow.visualization.schema import extract_method_signature
from crewai.flow.visualization.types import FlowStructure, NodeMetadata, StructureEdge
logger = logging.getLogger(__name__)
__all__ = ["build_flow_structure", "calculate_execution_paths"]
if TYPE_CHECKING:
from crewai.flow.flow import Flow
def _definition_condition_items(
condition: dict[str, Any],
key: str,
) -> list[FlowDefinitionCondition]:
return cast(list[FlowDefinitionCondition], condition.get(key, []))
def _extract_direct_or_triggers(
condition: str | dict[str, Any] | list[Any] | FlowCondition,
) -> list[str]:
"""Extract direct OR-level trigger strings from a condition.
This function extracts strings that would directly trigger a listener,
meaning they appear at the top level of an OR condition. Strings nested
inside AND conditions are NOT considered direct triggers for router paths.
def _definition_condition_parts(
condition: dict[str, Any],
) -> tuple[str, list[FlowDefinitionCondition]]:
if "and" in condition:
return AND_CONDITION, _definition_condition_items(condition, "and")
return OR_CONDITION, _definition_condition_items(condition, "or")
For example:
- or_("a", "b") -> ["a", "b"] (both are direct triggers)
- and_("a", "b") -> [] (neither are direct triggers, both required)
- or_(and_("a", "b"), "c") -> ["c"] (only "c" is a direct trigger)
Args:
condition: Can be a string, dict, or list.
def _condition_type_from_definition(
condition: FlowDefinitionCondition | None,
) -> str | None:
Returns:
List of direct OR-level trigger strings.
"""
if isinstance(condition, str):
return [condition]
if isinstance(condition, dict):
if "and" in condition:
return AND_CONDITION
if "or" in condition:
return OR_CONDITION
if isinstance(condition, str):
return OR_CONDITION
return None
cond_type = condition.get("type", OR_CONDITION)
conditions_list = condition.get("conditions", [])
def _runtime_condition_from_definition(
condition: FlowDefinitionCondition,
) -> str | dict[str, Any]:
if isinstance(condition, str):
return condition
condition_type, conditions = _definition_condition_parts(condition)
return {
"type": condition_type,
"conditions": [_runtime_condition_from_definition(item) for item in conditions],
}
def _method_trigger_condition(
method_definition: FlowMethodDefinition,
) -> FlowDefinitionCondition | None:
if method_definition.listen is not None:
return method_definition.listen
if isinstance(method_definition.start, str | dict):
return method_definition.start
return None
def _method_router_paths(method_definition: FlowMethodDefinition) -> list[str]:
if method_definition.human_feedback and method_definition.human_feedback.emit:
return [str(path) for path in method_definition.human_feedback.emit]
if method_definition.returns:
return [str(path) for path in method_definition.returns]
if cond_type == OR_CONDITION:
strings = []
for sub_cond in conditions_list:
strings.extend(_extract_direct_or_triggers(sub_cond))
return strings
return []
if isinstance(condition, list):
strings = []
for item in condition:
strings.extend(_extract_direct_or_triggers(item))
return strings
if callable(condition) and hasattr(condition, "__name__"):
return [condition.__name__]
return []
def _extract_direct_or_triggers(
condition: FlowDefinitionCondition,
) -> list[str]:
if isinstance(condition, str):
return [condition]
condition_type, conditions = _definition_condition_parts(condition)
if condition_type == AND_CONDITION:
return []
strings: list[str] = []
for sub_condition in conditions:
strings.extend(_extract_direct_or_triggers(sub_condition))
return strings
def _extract_all_trigger_names(
condition: FlowDefinitionCondition,
condition: str | dict[str, Any] | list[Any] | FlowCondition,
) -> list[str]:
"""Extract ALL trigger names from a condition for display purposes.
Unlike _extract_direct_or_triggers, this extracts ALL strings and method
names from the entire condition tree, including those nested in AND conditions.
This is used for displaying trigger information in the UI.
For example:
- or_("a", "b") -> ["a", "b"]
- and_("a", "b") -> ["a", "b"]
- or_(and_("a", method_6), method_4) -> ["a", "method_6", "method_4"]
Args:
condition: Can be a string, dict, or list.
Returns:
List of all trigger names found in the condition.
"""
if isinstance(condition, str):
return [condition]
_, conditions = _definition_condition_parts(condition)
strings: list[str] = []
for sub_condition in conditions:
strings.extend(_extract_all_trigger_names(sub_condition))
return strings
if isinstance(condition, dict):
conditions_list = condition.get("conditions", [])
strings = []
for sub_cond in conditions_list:
strings.extend(_extract_all_trigger_names(sub_cond))
return strings
if isinstance(condition, list):
strings = []
for item in condition:
strings.extend(_extract_all_trigger_names(item))
return strings
if callable(condition) and hasattr(condition, "__name__"):
return [condition.__name__]
return []
def _create_edges_from_condition(
condition: FlowDefinitionCondition,
condition: str | dict[str, Any] | list[Any] | FlowCondition,
target: str,
nodes: dict[str, NodeMetadata],
) -> list[StructureEdge]:
"""Create edges from a condition tree, preserving AND/OR semantics.
This function recursively processes the condition tree and creates edges
with the appropriate condition_type for each trigger.
For AND conditions, all triggers get edges with condition_type="AND".
For OR conditions, triggers get edges with condition_type="OR".
Args:
condition: The condition tree (string, dict, or list).
target: The target node name.
nodes: Dictionary of all nodes for validation.
Returns:
List of StructureEdge objects representing the condition.
"""
edges: list[StructureEdge] = []
if isinstance(condition, str):
@@ -125,8 +138,21 @@ def _create_edges_from_condition(
is_router_path=False,
)
)
elif callable(condition) and hasattr(condition, "__name__"):
method_name = condition.__name__
if method_name in nodes:
edges.append(
StructureEdge(
source=method_name,
target=target,
condition_type=OR_CONDITION,
is_router_path=False,
)
)
elif isinstance(condition, dict):
cond_type, conditions = _definition_condition_parts(condition)
cond_type = condition.get("type", OR_CONDITION)
conditions_list = condition.get("conditions", [])
if cond_type == AND_CONDITION:
triggers = _extract_all_trigger_names(condition)
edges.extend(
@@ -140,138 +166,271 @@ def _create_edges_from_condition(
if trigger in nodes
)
else:
for sub_condition in conditions:
edges.extend(_create_edges_from_condition(sub_condition, target, nodes))
for sub_cond in conditions_list:
edges.extend(_create_edges_from_condition(sub_cond, target, nodes))
elif isinstance(condition, list):
for item in condition:
edges.extend(_create_edges_from_condition(item, target, nodes))
return edges
def _flow_definition_from(
flow_or_definition: Flow[Any] | type[Flow[Any]] | FlowDefinition,
) -> FlowDefinition:
if isinstance(flow_or_definition, FlowDefinition):
return flow_or_definition
def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
"""Build a structure representation of a Flow's execution.
flow_class = (
flow_or_definition
if isinstance(flow_or_definition, type)
else type(flow_or_definition)
)
flow_definition = getattr(flow_class, "flow_definition", None)
if callable(flow_definition):
return cast(FlowDefinition, flow_definition())
raise TypeError(
"build_flow_structure requires a FlowDefinition or a Flow class/instance "
"with flow_definition()."
)
Args:
flow: Flow instance to analyze.
def build_flow_structure(
flow_or_definition: Flow[Any] | type[Flow[Any]] | FlowDefinition,
) -> FlowStructure:
"""Build a visualization structure projection from a FlowDefinition."""
definition = _flow_definition_from(flow_or_definition)
Returns:
Dictionary with nodes, edges, start_methods, and router_methods.
"""
nodes: dict[str, NodeMetadata] = {}
edges: list[StructureEdge] = []
start_methods: list[str] = []
router_methods: list[str] = []
for method_name, method_definition in definition.methods.items():
node_metadata: NodeMetadata = {"type": "listen", "class_name": definition.name}
for method_name, method in flow._methods.items():
node_metadata: NodeMetadata = {"type": "listen"}
if method_definition.is_start:
if hasattr(method, "__is_start_method__") and method.__is_start_method__:
node_metadata["type"] = "start"
start_methods.append(method_name)
if method_definition.router:
if hasattr(method, "__is_router__") and method.__is_router__:
node_metadata["is_router"] = True
node_metadata["type"] = "router"
router_methods.append(method_name)
router_paths = _method_router_paths(method_definition)
if router_paths:
node_metadata["router_paths"] = router_paths
trigger_condition = _method_trigger_condition(method_definition)
condition_type = _condition_type_from_definition(trigger_condition)
if condition_type is not None and trigger_condition is not None:
node_metadata["trigger_condition_type"] = condition_type
node_metadata["condition_type"] = condition_type
extracted = _extract_all_trigger_names(trigger_condition)
if extracted:
node_metadata["trigger_methods"] = extracted
runtime_condition = _runtime_condition_from_definition(trigger_condition)
if isinstance(runtime_condition, dict):
node_metadata["trigger_condition"] = runtime_condition
if method_name in flow._router_paths:
node_metadata["router_paths"] = [
str(p) for p in flow._router_paths[method_name]
]
if hasattr(method, "__trigger_methods__") and method.__trigger_methods__:
node_metadata["trigger_methods"] = [
str(m) for m in method.__trigger_methods__
]
if hasattr(method, "__condition_type__") and method.__condition_type__:
node_metadata["trigger_condition_type"] = method.__condition_type__
if "condition_type" not in node_metadata:
node_metadata["condition_type"] = method.__condition_type__
if node_metadata.get("is_router") and "condition_type" not in node_metadata:
node_metadata["condition_type"] = "IF"
nodes[method_name] = node_metadata
if (
hasattr(method, "__trigger_condition__")
and method.__trigger_condition__ is not None
):
node_metadata["trigger_condition"] = method.__trigger_condition__
for method_name, method_definition in definition.methods.items():
trigger_condition = _method_trigger_condition(method_definition)
if trigger_condition is None:
continue
edges.extend(
_create_edges_from_condition(trigger_condition, method_name, nodes)
if "trigger_methods" not in node_metadata:
extracted = _extract_all_trigger_names(method.__trigger_condition__)
if extracted:
node_metadata["trigger_methods"] = extracted
node_metadata["method_signature"] = extract_method_signature(
method, method_name
)
all_string_triggers: set[str] = set()
for method_definition in definition.methods.values():
trigger_condition = _method_trigger_condition(method_definition)
if trigger_condition is None:
try:
source_code = inspect.getsource(method)
node_metadata["source_code"] = source_code
try:
source_lines, start_line = inspect.getsourcelines(method)
node_metadata["source_lines"] = source_lines
node_metadata["source_start_line"] = start_line
except (OSError, TypeError):
pass
try:
source_file = inspect.getsourcefile(method)
if source_file:
node_metadata["source_file"] = source_file
except (OSError, TypeError):
try:
class_file = inspect.getsourcefile(flow.__class__)
if class_file:
node_metadata["source_file"] = class_file
except (OSError, TypeError):
pass
except (OSError, TypeError):
pass
try:
class_obj = flow.__class__
if class_obj:
class_name = class_obj.__name__
bases = class_obj.__bases__
if bases:
base_strs = []
for base in bases:
if hasattr(base, "__name__"):
if hasattr(base, "__origin__"):
base_strs.append(str(base))
else:
base_strs.append(base.__name__)
else:
base_strs.append(str(base))
try:
source_lines = inspect.getsource(class_obj).split("\n")
_, class_start_line = inspect.getsourcelines(class_obj)
for idx, line in enumerate(source_lines):
stripped = line.strip()
if stripped.startswith("class ") and class_name in stripped:
class_signature = stripped.rstrip(":")
node_metadata["class_signature"] = class_signature
node_metadata["class_line_number"] = (
class_start_line + idx
)
break
except (OSError, TypeError):
class_signature = f"class {class_name}({', '.join(base_strs)})"
node_metadata["class_signature"] = class_signature
else:
class_signature = f"class {class_name}"
node_metadata["class_signature"] = class_signature
node_metadata["class_name"] = class_name
except (OSError, TypeError, AttributeError):
pass
nodes[method_name] = node_metadata
for listener_name, condition_data in flow._listeners.items():
if listener_name in router_methods:
continue
for trigger in _extract_direct_or_triggers(trigger_condition):
if trigger not in nodes:
all_string_triggers.add(trigger)
if is_simple_flow_condition(condition_data):
cond_type, methods = condition_data
edges.extend(
StructureEdge(
source=str(trigger_method),
target=str(listener_name),
condition_type=cond_type,
is_router_path=False,
)
for trigger_method in methods
if str(trigger_method) in nodes
)
elif is_flow_condition_dict(condition_data):
edges.extend(
_create_edges_from_condition(condition_data, str(listener_name), nodes)
)
for method_name, node_metadata in nodes.items(): # type: ignore[assignment]
if node_metadata.get("is_router") and "trigger_methods" in node_metadata:
trigger_methods = node_metadata["trigger_methods"]
condition_type = node_metadata.get("trigger_condition_type", OR_CONDITION)
if "trigger_condition" in node_metadata:
edges.extend(
_create_edges_from_condition(
node_metadata["trigger_condition"], # type: ignore[arg-type]
method_name,
nodes,
)
)
else:
edges.extend(
StructureEdge(
source=trigger_method,
target=method_name,
condition_type=condition_type,
is_router_path=False,
)
for trigger_method in trigger_methods
if trigger_method in nodes
)
all_string_triggers: set[str] = set()
for condition_data in flow._listeners.values():
if is_simple_flow_condition(condition_data):
_, methods = condition_data
for m in methods:
if str(m) not in nodes: # It's a string trigger, not a method name
all_string_triggers.add(str(m))
elif is_flow_condition_dict(condition_data):
for trigger in _extract_direct_or_triggers(condition_data):
if trigger not in nodes:
all_string_triggers.add(trigger)
all_router_outputs: set[str] = set()
for router_method_name in router_methods:
router_paths = _method_router_paths(definition.methods[router_method_name])
if router_paths and router_method_name in nodes:
nodes[router_method_name]["router_paths"] = router_paths
all_router_outputs.update(router_paths)
if router_method_name not in flow._router_paths:
flow._router_paths[FlowMethodName(router_method_name)] = []
if not router_paths:
current_paths = flow._router_paths.get(FlowMethodName(router_method_name), [])
if current_paths and router_method_name in nodes:
nodes[router_method_name]["router_paths"] = [str(p) for p in current_paths]
all_router_outputs.update(str(p) for p in current_paths)
if not current_paths:
logger.warning(
f"Router paths for '{router_method_name}' are dynamic or not "
f"statically inferable; static visualization may omit route edges."
f"Could not determine return paths for router '{router_method_name}'. "
f"Add a return type annotation like "
f"'-> Literal[\"path1\", \"path2\"]' or '-> YourEnum' "
f"to enable proper flow visualization."
)
orphaned_triggers = all_string_triggers - all_router_outputs
if orphaned_triggers:
logger.warning(
f"Static visualization could not match listener triggers "
f"{orphaned_triggers} to explicit router outputs. "
f"Dynamic router values may still trigger these listeners at runtime."
logger.error(
f"Found listeners waiting for triggers {orphaned_triggers} "
f"but no router outputs these values explicitly. "
f"If your router returns a non-static value, check that your router has proper return type annotations."
)
for router_method_name in router_methods:
router_paths = _method_router_paths(definition.methods[router_method_name])
if router_method_name not in flow._router_paths:
continue
router_paths = flow._router_paths[FlowMethodName(router_method_name)]
for path in router_paths:
for listener_name, method_definition in definition.methods.items():
for listener_name, condition_data in flow._listeners.items():
if listener_name == router_method_name:
continue
trigger_condition = _method_trigger_condition(method_definition)
if trigger_condition is None:
continue
trigger_strings_from_cond = _extract_direct_or_triggers(
trigger_condition
)
trigger_strings_from_cond: list[str] = []
if is_simple_flow_condition(condition_data):
_, methods = condition_data
trigger_strings_from_cond = [str(m) for m in methods]
elif is_flow_condition_dict(condition_data):
trigger_strings_from_cond = _extract_direct_or_triggers(
condition_data
)
if str(path) in trigger_strings_from_cond:
edges.append(
StructureEdge(
source=router_method_name,
target=listener_name,
target=str(listener_name),
condition_type=None,
is_router_path=True,
router_path_label=str(path),
)
)
for start_method in flow._start_methods:
if start_method not in nodes and start_method in flow._methods:
method = flow._methods[start_method]
nodes[str(start_method)] = NodeMetadata(type="start")
if hasattr(method, "__trigger_methods__") and method.__trigger_methods__:
nodes[str(start_method)]["trigger_methods"] = [
str(m) for m in method.__trigger_methods__
]
if hasattr(method, "__condition_type__") and method.__condition_type__:
nodes[str(start_method)]["condition_type"] = method.__condition_type__
return FlowStructure(
nodes=nodes,
edges=edges,
@@ -307,6 +466,15 @@ def calculate_execution_paths(structure: FlowStructure) -> int:
return 0
def count_paths_from(node: str, visited: set[str]) -> int:
"""Recursively count execution paths from a given node.
Args:
node: Node name to start counting from.
visited: Set of already visited nodes to prevent cycles.
Returns:
Number of execution paths from this node to terminal nodes.
"""
if node in terminal_nodes:
return 1

View File

@@ -0,0 +1,104 @@
"""OpenAPI schema conversion utilities for Flow methods."""
import inspect
from typing import Any, get_args, get_origin
def type_to_openapi_schema(type_hint: Any) -> dict[str, Any]:
"""Convert Python type hint to OpenAPI schema.
Args:
type_hint: Python type hint to convert.
Returns:
OpenAPI schema dictionary.
"""
if type_hint is inspect.Parameter.empty:
return {}
if type_hint is None or type_hint is type(None):
return {"type": "null"}
if hasattr(type_hint, "__module__") and hasattr(type_hint, "__name__"):
if type_hint.__module__ == "typing" and type_hint.__name__ == "Any":
return {}
type_str = str(type_hint)
if type_str == "typing.Any" or type_str == "<class 'typing.Any'>":
return {}
if isinstance(type_hint, str):
return {"type": type_hint}
origin = get_origin(type_hint)
args = get_args(type_hint)
if type_hint is str:
return {"type": "string"}
if type_hint is int:
return {"type": "integer"}
if type_hint is float:
return {"type": "number"}
if type_hint is bool:
return {"type": "boolean"}
if type_hint is dict or origin is dict:
if args and len(args) > 1:
return {
"type": "object",
"additionalProperties": type_to_openapi_schema(args[1]),
}
return {"type": "object"}
if type_hint is list or origin is list:
if args:
return {"type": "array", "items": type_to_openapi_schema(args[0])}
return {"type": "array"}
if hasattr(type_hint, "__name__"):
return {"type": "object", "className": type_hint.__name__}
return {}
def extract_method_signature(method: Any, method_name: str) -> dict[str, Any]:
"""Extract method signature as OpenAPI schema with documentation.
Args:
method: Method to analyze.
method_name: Method name.
Returns:
Dictionary with operationId, parameters, returns, summary, and description.
"""
try:
sig = inspect.signature(method)
parameters = {}
for param_name, param in sig.parameters.items():
if param_name == "self":
continue
parameters[param_name] = type_to_openapi_schema(param.annotation)
return_type = type_to_openapi_schema(sig.return_annotation)
docstring = inspect.getdoc(method)
result: dict[str, Any] = {
"operationId": method_name,
"parameters": parameters,
"returns": return_type,
}
if docstring:
lines = docstring.strip().split("\n")
summary = lines[0].strip()
if summary:
result["summary"] = summary
if len(lines) > 1:
description = "\n".join(line.strip() for line in lines[1:]).strip()
if description:
result["description"] = description
return result
except Exception:
return {"operationId": method_name, "parameters": {}, "returns": {}}

View File

@@ -3,9 +3,6 @@
from typing import Any, TypedDict
__all__ = ["FlowStructure", "NodeMetadata", "StructureEdge"]
class NodeMetadata(TypedDict, total=False):
"""Metadata for a single node in the flow structure."""
@@ -16,7 +13,14 @@ class NodeMetadata(TypedDict, total=False):
trigger_condition_type: str | None
trigger_methods: list[str]
trigger_condition: dict[str, Any] | None
method_signature: dict[str, Any]
source_code: str
source_lines: list[str]
source_start_line: int
source_file: str
class_signature: str
class_name: str
class_line_number: int
class StructureEdge(TypedDict, total=False):

View File

@@ -1,602 +0,0 @@
"""Tests for the static Flow Definition contract."""
import ast
from enum import Enum
import importlib
import inspect
import logging
from pathlib import Path
from typing import Literal
from pydantic import BaseModel
from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start
import crewai.flow.flow_definition as flow_definition
import crewai.flow.visualization.builder as visualization_builder
def test_flow_public_exports_are_explicit():
import crewai.flow.dsl as flow_dsl
import crewai.flow.visualization as flow_visualization
import crewai.flow.visualization.analysis as visualization_analysis
flow_package = importlib.import_module("crewai.flow")
assert "FlowDefinition" not in flow_package.__all__
assert "FlowDefinitionDiagnostic" not in flow_package.__all__
assert "build_flow_definition" not in flow_package.__all__
assert "flow_structure" not in flow_package.__all__
assert set(flow_dsl.__all__) == {"and_", "listen", "or_", "router", "start"}
assert set(flow_definition.__all__) == {
"FlowConfigDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
"FlowHumanFeedbackDefinition",
"FlowMethodDefinition",
"FlowPersistenceDefinition",
"FlowStateDefinition",
}
assert "build_flow_structure" in flow_visualization.__all__
assert "calculate_node_levels" not in flow_visualization.__all__
assert visualization_analysis.__all__ == []
def test_private_flow_helpers_do_not_have_docstrings():
import crewai.flow.dsl as flow_dsl
import crewai.flow.flow_wrappers as flow_wrappers
import crewai.flow.human_feedback as human_feedback
import crewai.flow.persistence.decorators as persistence_decorators
import crewai.flow.visualization.analysis as visualization_analysis
import crewai.flow.visualization.types as visualization_types
modules = [
flow_dsl,
flow_definition,
flow_wrappers,
human_feedback,
persistence_decorators,
visualization_analysis,
visualization_builder,
visualization_types,
]
violations: list[str] = []
for module in modules:
source_path = Path(inspect.getsourcefile(module) or "")
tree = ast.parse(source_path.read_text())
stack: list[ast.AST] = []
if getattr(module, "__all__", None) == [] and ast.get_docstring(tree):
violations.append(f"{source_path}:1:<module>")
class PrivateDocstringVisitor(ast.NodeVisitor):
def visit_ClassDef(self, node: ast.ClassDef) -> None:
self._check_docstring(node)
stack.append(node)
self.generic_visit(node)
stack.pop()
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
self._check_docstring(node)
stack.append(node)
self.generic_visit(node)
stack.pop()
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
self._check_docstring(node)
stack.append(node)
self.generic_visit(node)
stack.pop()
def _check_docstring(
self,
node: ast.ClassDef | ast.FunctionDef | ast.AsyncFunctionDef,
) -> None:
is_dunder = node.name.startswith("__") and node.name.endswith("__")
is_private_name = node.name.startswith("_") and not is_dunder
is_nested_function = any(
isinstance(parent, (ast.FunctionDef, ast.AsyncFunctionDef))
for parent in stack
)
if (is_private_name or is_nested_function) and ast.get_docstring(node):
violations.append(f"{source_path}:{node.lineno}:{node.name}")
PrivateDocstringVisitor().visit(tree)
assert violations == []
def test_flow_definition_contract_is_dsl_agnostic():
source_path = Path(inspect.getsourcefile(flow_definition) or "")
source = source_path.read_text()
assert "DSL" not in source
assert "flow_wrappers" not in source
assert "build_flow_definition" not in source
assert "extract_flow_definition" not in source
def test_flow_definition_maps_dsl_to_static_contract():
class ContractState(BaseModel):
topic: str = ""
class ContractFlow(Flow[ContractState]):
"""A flow with every core DSL role."""
initial_state = ContractState
stream = True
max_method_calls = 7
@start()
def begin(self):
return "started"
@listen(begin)
def process(self):
return "processed"
@router(process)
def decide(self):
return "approved"
@listen(or_("approved", "revise"))
@human_feedback(
message="Review this output.",
emit=["done", "revise"],
llm="gpt-4o-mini",
default_outcome="done",
metadata={"team": "qa"},
learn=True,
learn_source="hitl",
learn_strict=True,
)
def review(self):
return "review"
@listen(and_(begin, process))
def audit(self):
return "audit"
definition = ContractFlow.flow_definition()
assert definition.schema_ == "crewai.flow/v1"
assert definition.name == "ContractFlow"
assert definition.description == "A flow with every core DSL role."
assert definition.state is not None
assert definition.state.type == "pydantic"
assert definition.state.ref and "ContractState" in definition.state.ref
assert definition.config.stream is True
assert definition.config.max_method_calls == 7
assert definition.methods["begin"].start is True
assert definition.methods["process"].listen == "begin"
decide = definition.methods["decide"]
assert decide.listen == "process"
assert decide.router is True
assert decide.returns is None
review = definition.methods["review"]
assert review.listen == {"or": ["approved", "revise"]}
assert review.router is True
assert review.returns is None
assert review.human_feedback is not None
assert review.human_feedback.emit == ["done", "revise"]
assert review.human_feedback.default_outcome == "done"
assert review.human_feedback.metadata == {"team": "qa"}
assert review.human_feedback.learn is True
assert review.human_feedback.learn_strict is True
assert definition.methods["audit"].listen == {"and": ["begin", "process"]}
assert definition.diagnostics == []
def test_flow_definition_classifies_start_router_from_human_feedback_emit():
class StartRouterFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["continue", "stop"],
llm="gpt-4o-mini",
)
def entry_point(self):
return "data"
@listen("continue")
def proceed(self):
return "proceeding"
@listen("stop")
def halt(self):
return "halted"
definition = StartRouterFlow.flow_definition()
entry_point = definition.methods["entry_point"]
assert entry_point.is_start is True
assert entry_point.router is True
assert entry_point.human_feedback is not None
assert entry_point.human_feedback.emit == ["continue", "stop"]
assert entry_point.returns is None
def test_flow_definition_round_trips_json_and_yaml():
class RoundTripFlow(Flow):
@start()
def begin(self):
return "started"
@router(begin)
def decide(self):
return "left"
@listen("left")
def left(self):
return "left"
definition = RoundTripFlow.flow_definition()
json_round_trip = flow_definition.FlowDefinition.from_json(definition.to_json())
yaml_round_trip = flow_definition.FlowDefinition.from_yaml(definition.to_yaml())
assert json_round_trip.to_dict() == definition.to_dict()
assert yaml_round_trip.to_dict() == definition.to_dict()
assert yaml_round_trip.methods["decide"].router is True
assert yaml_round_trip.methods["decide"].listen == "begin"
def test_flow_definition_detects_persist_metadata():
@persist(verbose=True)
class PersistedFlow(Flow[dict]):
initial_state = {}
@start()
def begin(self):
return "started"
@persist(verbose=False)
@listen(begin)
def checkpoint(self):
return "saved"
definition = PersistedFlow.flow_definition()
assert definition.persist is not None
assert definition.persist.enabled is True
assert definition.persist.verbose is True
assert definition.methods["begin"].persist is None
method_persist = definition.methods["checkpoint"].persist
assert method_persist is not None
assert method_persist.enabled is True
assert method_persist.verbose is False
def test_flow_definition_allows_dynamic_router_returns(caplog):
caplog.set_level(logging.WARNING, logger="crewai.flow.flow_definition")
class DynamicRouterFlow(Flow):
@start()
def begin(self):
return "started"
@router(begin)
def decide(self):
return self.state["dynamic_path"]
definition = DynamicRouterFlow.flow_definition()
assert definition.methods["decide"].returns is None
assert definition.diagnostics == []
assert caplog.records == []
def test_flow_definition_infers_literal_router_returns():
class LiteralRouterFlow(Flow):
@start()
def begin(self):
return "started"
@router(begin)
def decide(self) -> Literal["left", "right"]:
return "left"
@listen("left")
def left(self):
return "left"
@listen("right")
def right(self):
return "right"
definition = LiteralRouterFlow.flow_definition()
assert definition.methods["decide"].returns == ["left", "right"]
def test_flow_definition_infers_enum_router_returns():
class Decision(str, Enum):
APPROVE = "approve"
REJECT = "reject"
class EnumRouterFlow(Flow):
@start()
def begin(self):
return "started"
@router(begin)
def decide(self) -> Decision:
return Decision.APPROVE
@listen("approve")
def approve(self):
return "approve"
@listen("reject")
def reject(self):
return "reject"
definition = EnumRouterFlow.flow_definition()
assert definition.methods["decide"].returns == ["approve", "reject"]
def test_flow_definition_infers_literal_union_router_returns():
class LiteralUnionRouterFlow(Flow):
@start()
def begin(self):
return "started"
@router(begin)
def decide(self) -> Literal["left"] | Literal["right"]:
return "left"
@listen("left")
def left(self):
return "left"
@listen("right")
def right(self):
return "right"
definition = LiteralUnionRouterFlow.flow_definition()
assert definition.methods["decide"].returns == ["left", "right"]
def test_flow_definition_does_not_infer_unannotated_router_body_returns():
class UnannotatedRouterFlow(Flow):
@start()
def begin(self):
return "started"
@router(begin)
def decide(self):
return "left"
@listen("left")
def left(self):
return "left"
definition = UnannotatedRouterFlow.flow_definition()
assert definition.methods["decide"].returns is None
def test_flow_definition_accepts_explicit_router_paths():
class ExplicitRouterFlow(Flow):
@start()
def begin(self):
return "started"
@router(begin, paths=["left", "right", "left"])
def decide(self):
return self.state["dynamic_path"]
@listen("left")
def left(self):
return "left"
@listen("right")
def right(self):
return "right"
definition = ExplicitRouterFlow.flow_definition()
assert definition.methods["decide"].returns == ["left", "right"]
def test_flow_definition_preserves_diagnostics_loaded_from_contract():
definition = flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LoadedDiagnosticsFlow",
"methods": {
"decision": {
"router": True,
"returns": ["continue"],
}
},
"diagnostics": [
{
"code": "serialized_warning",
"message": "Preserved serialized diagnostic",
"severity": "warning",
"path": "methods.decision",
},
{
"code": "router_without_trigger",
"message": "router: true requires either start or listen",
"severity": "error",
"path": "methods.decision",
},
],
}
)
codes = [diagnostic.code for diagnostic in definition.diagnostics]
assert "serialized_warning" in codes
assert codes.count("router_without_trigger") == 1
def test_router_human_feedback_preserves_existing_router_metadata():
class RouterHumanFeedbackFlow(Flow):
@start()
def begin(self):
return "started"
@human_feedback(message="Review route:")
@router(begin, paths=["approved", "rejected"])
def decide(self):
return "approved"
@listen("approved")
def approved(self):
return "approved"
definition = RouterHumanFeedbackFlow.flow_definition()
method = definition.methods["decide"]
assert method.router is True
assert method.listen == "begin"
assert method.returns == ["approved", "rejected"]
assert method.human_feedback is not None
def test_dynamic_router_does_not_log_at_class_definition_time(caplog):
caplog.set_level(logging.WARNING, logger="crewai.flow.flow_definition")
class LazyDynamicRouterFlow(Flow):
@start()
def begin(self):
return "started"
@router(begin)
def decide(self):
return self.state["dynamic_path"]
# No diagnostics should be logged merely by defining the class -- the
# FlowDefinition is built lazily.
assert caplog.records == []
# Explicit access still should not log visualization-only diagnostics.
definition = LazyDynamicRouterFlow.flow_definition()
assert definition.diagnostics == []
assert caplog.records == []
def test_dynamic_router_string_listener_is_valid_contract():
class DynamicRouterListenerFlow(Flow):
@start()
def begin(self):
return "started"
@router(begin)
def decide(self):
return self.state["dynamic_path"]
@listen("dynamic_path")
def handle(self):
return "handled"
definition = DynamicRouterListenerFlow.flow_definition()
assert definition.diagnostics == []
def test_static_string_listener_is_allowed_by_contract():
definition = flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "TypoFlow",
"methods": {
"begin": {"start": True},
"handle": {"listen": "begni"},
},
}
)
assert definition.diagnostics == []
def test_start_false_not_classified_as_start_method():
definition = flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "ExplicitNonStartFlow",
"methods": {
"begin": {"start": True},
"handle": {"start": False, "listen": "begin"},
},
}
)
assert definition.methods["begin"].is_start is True
assert definition.methods["handle"].is_start is False
class ExplicitNonStartFlow(Flow):
@start()
def begin(self):
return "started"
@listen(begin)
def handle(self):
return "handled"
# Attach the loaded contract (with explicit ``start: false``) so the
# projections read from it rather than rebuilding from the DSL.
ExplicitNonStartFlow._flow_definition = definition
flow = ExplicitNonStartFlow()
viz_structure = visualization_builder.build_flow_structure(flow)
assert "handle" not in viz_structure["start_methods"]
assert viz_structure["nodes"]["handle"]["type"] != "start"
def test_flow_definition_cache_is_not_inherited_by_subclasses():
class ParentFlow(Flow):
@start()
def begin(self):
return "begin"
parent_definition = ParentFlow.flow_definition()
class ChildFlow(ParentFlow):
@listen(ParentFlow.begin)
def child_step(self):
return "child"
child_definition = ChildFlow.flow_definition()
assert parent_definition.name == "ParentFlow"
assert child_definition.name == "ChildFlow"
assert child_definition is not parent_definition
assert set(child_definition.methods) == {"begin", "child_step"}
def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
caplog.set_level(logging.WARNING, logger="crewai.flow.flow_definition")
definition = flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LoadedFlow",
"methods": {
"decision": {
"router": True,
"returns": ["continue"],
}
},
}
)
assert any(
diagnostic.code == "router_without_trigger"
for diagnostic in definition.diagnostics
)
assert any(
record.levelno == logging.ERROR
and "LoadedFlow" in record.message
and "router_without_trigger" in record.message
for record in caplog.records
)

View File

@@ -0,0 +1,818 @@
"""Tests for flow_serializer.py - Flow structure serialization for Studio UI."""
from typing import Literal
import pytest
from pydantic import BaseModel, Field
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow_serializer import flow_structure
from crewai.flow.human_feedback import human_feedback
class TestSimpleLinearFlow:
"""Test simple linear flow (start → listen → listen)."""
def test_linear_flow_structure(self):
"""Test a simple sequential flow structure."""
class LinearFlow(Flow):
"""A simple linear flow for testing."""
@start()
def begin(self):
return "started"
@listen(begin)
def process(self):
return "processed"
@listen(process)
def finalize(self):
return "done"
structure = flow_structure(LinearFlow)
assert structure["name"] == "LinearFlow"
assert structure["description"] == "A simple linear flow for testing."
assert len(structure["methods"]) == 3
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["begin"]["type"] == "start"
assert method_map["process"]["type"] == "listen"
assert method_map["finalize"]["type"] == "listen"
assert len(structure["edges"]) == 2
edge_pairs = [(e["from_method"], e["to_method"]) for e in structure["edges"]]
assert ("begin", "process") in edge_pairs
assert ("process", "finalize") in edge_pairs
for edge in structure["edges"]:
assert edge["edge_type"] == "listen"
assert edge["condition"] is None
class TestRouterFlow:
"""Test flow with router branching."""
def test_router_flow_structure(self):
"""Test a flow with router that branches to different paths."""
class BranchingFlow(Flow):
@start()
def init(self):
return "initialized"
@router(init)
def decide(self) -> Literal["path_a", "path_b"]:
return "path_a"
@listen("path_a")
def handle_a(self):
return "handled_a"
@listen("path_b")
def handle_b(self):
return "handled_b"
structure = flow_structure(BranchingFlow)
assert structure["name"] == "BranchingFlow"
assert len(structure["methods"]) == 4
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["init"]["type"] == "start"
assert method_map["decide"]["type"] == "router"
assert method_map["handle_a"]["type"] == "listen"
assert method_map["handle_b"]["type"] == "listen"
assert "path_a" in method_map["decide"]["router_paths"]
assert "path_b" in method_map["decide"]["router_paths"]
# Should have: init -> decide (listen), decide -> handle_a (route), decide -> handle_b (route)
listen_edges = [e for e in structure["edges"] if e["edge_type"] == "listen"]
route_edges = [e for e in structure["edges"] if e["edge_type"] == "route"]
assert len(listen_edges) == 1
assert listen_edges[0]["from_method"] == "init"
assert listen_edges[0]["to_method"] == "decide"
assert len(route_edges) == 2
route_targets = {e["to_method"] for e in route_edges}
assert "handle_a" in route_targets
assert "handle_b" in route_targets
route_conditions = {e["to_method"]: e["condition"] for e in route_edges}
assert route_conditions["handle_a"] == "path_a"
assert route_conditions["handle_b"] == "path_b"
class TestAndOrConditions:
"""Test flow with AND/OR conditions."""
def test_and_condition_flow(self):
"""Test a flow where a method waits for multiple methods (AND)."""
class AndConditionFlow(Flow):
@start()
def step_a(self):
return "a"
@start()
def step_b(self):
return "b"
@listen(and_(step_a, step_b))
def converge(self):
return "converged"
structure = flow_structure(AndConditionFlow)
assert len(structure["methods"]) == 3
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["step_a"]["type"] == "start"
assert method_map["step_b"]["type"] == "start"
assert method_map["converge"]["type"] == "listen"
assert method_map["converge"]["condition_type"] == "AND"
triggers = method_map["converge"]["trigger_methods"]
assert "step_a" in triggers
assert "step_b" in triggers
converge_edges = [e for e in structure["edges"] if e["to_method"] == "converge"]
assert len(converge_edges) == 2
def test_or_condition_flow(self):
"""Test a flow where a method is triggered by any of multiple methods (OR)."""
class OrConditionFlow(Flow):
@start()
def path_1(self):
return "1"
@start()
def path_2(self):
return "2"
@listen(or_(path_1, path_2))
def handle_any(self):
return "handled"
structure = flow_structure(OrConditionFlow)
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["handle_any"]["condition_type"] == "OR"
triggers = method_map["handle_any"]["trigger_methods"]
assert "path_1" in triggers
assert "path_2" in triggers
class TestHumanFeedbackMethods:
"""Test flow with @human_feedback decorated methods."""
def test_human_feedback_detection(self):
"""Test that human feedback methods are correctly identified."""
class HumanFeedbackFlow(Flow):
@start()
@human_feedback(
message="Please review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review_step(self):
return "content to review"
@listen("approved")
def handle_approved(self):
return "approved"
@listen("rejected")
def handle_rejected(self):
return "rejected"
structure = flow_structure(HumanFeedbackFlow)
method_map = {m["name"]: m for m in structure["methods"]}
# review_step should have human feedback
assert method_map["review_step"]["has_human_feedback"] is True
# It's a start+router (due to emit)
assert method_map["review_step"]["type"] == "start_router"
assert "approved" in method_map["review_step"]["router_paths"]
assert "rejected" in method_map["review_step"]["router_paths"]
# Other methods should not have human feedback
assert method_map["handle_approved"]["has_human_feedback"] is False
assert method_map["handle_rejected"]["has_human_feedback"] is False
def test_listen_plus_human_feedback_router_edges(self):
"""Test that @listen + @human_feedback(emit=...) generates router edges.
This is the pattern used in the whitepaper generator:
a listener method that also acts as a router via @human_feedback(emit=[...]).
The serializer must generate edges from this method to listeners of its emit paths.
"""
class ReviewFlow(Flow):
@start()
def generate(self):
return "content"
@listen(generate)
@human_feedback(
message="Review this:",
emit=["approved", "needs_changes", "cancelled"],
llm="gpt-4o-mini",
)
def review(self):
return "review result"
@listen("approved")
def handle_approved(self):
return "done"
@listen("needs_changes")
def handle_changes(self):
return "regenerating"
@listen("cancelled")
def handle_cancelled(self):
return "cancelled"
structure = flow_structure(ReviewFlow)
method_map = {m["name"]: m for m in structure["methods"]}
edge_set = {(e["from_method"], e["to_method"], e.get("condition")) for e in structure["edges"]}
# review should be detected as a router with the emit paths
assert method_map["review"]["type"] == "router"
assert set(method_map["review"]["router_paths"]) == {"approved", "needs_changes", "cancelled"}
assert method_map["review"]["has_human_feedback"] is True
assert ("generate", "review", None) in edge_set
assert ("review", "handle_approved", "approved") in edge_set
assert ("review", "handle_changes", "needs_changes") in edge_set
assert ("review", "handle_cancelled", "cancelled") in edge_set
class TestCrewReferences:
"""Test detection of Crew references in method bodies."""
def test_crew_detection_with_crew_call(self):
"""Test that .crew() calls are detected."""
class FlowWithCrew(Flow):
@start()
def run_crew(self):
return "result"
@listen(run_crew)
def no_crew(self):
return "done"
structure = flow_structure(FlowWithCrew)
method_map = {m["name"]: m for m in structure["methods"]}
# Note: Since the actual .crew() call is in a comment/string,
# We're testing the mechanism exists.
assert "has_crew" in method_map["run_crew"]
assert "has_crew" in method_map["no_crew"]
def test_no_crew_when_absent(self):
"""Test that methods without Crew refs return has_crew=False."""
class SimpleNonCrewFlow(Flow):
@start()
def calculate(self):
return 1 + 1
@listen(calculate)
def display(self):
return "result"
structure = flow_structure(SimpleNonCrewFlow)
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["calculate"]["has_crew"] is False
assert method_map["display"]["has_crew"] is False
class TestTypedStateSchema:
"""Test flow with typed Pydantic state."""
def test_pydantic_state_schema_extraction(self):
"""Test extracting state schema from a Flow with Pydantic state."""
class MyState(BaseModel):
counter: int = 0
message: str = ""
items: list[str] = Field(default_factory=list)
class TypedStateFlow(Flow[MyState]):
initial_state = MyState
@start()
def increment(self):
self.state.counter += 1
return self.state.counter
@listen(increment)
def display(self):
return f"Count: {self.state.counter}"
structure = flow_structure(TypedStateFlow)
assert structure["state_schema"] is not None
fields = structure["state_schema"]["fields"]
field_names = {f["name"] for f in fields}
assert "counter" in field_names
assert "message" in field_names
assert "items" in field_names
field_map = {f["name"]: f for f in fields}
assert "int" in field_map["counter"]["type"]
assert "str" in field_map["message"]["type"]
assert field_map["counter"]["default"] == 0
assert field_map["message"]["default"] == ""
def test_dict_state_returns_none(self):
"""Test that flows using dict state return None for state_schema."""
class DictStateFlow(Flow):
@start()
def begin(self):
self.state["count"] = 1
return "started"
structure = flow_structure(DictStateFlow)
assert structure["state_schema"] is None
class TestEdgeCases:
"""Test edge cases and special scenarios."""
def test_start_router_combo(self):
"""Test a method that is both @start and a router (via human_feedback emit)."""
class StartRouterFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["continue", "stop"],
llm="gpt-4o-mini",
)
def entry_point(self):
return "data"
@listen("continue")
def proceed(self):
return "proceeding"
@listen("stop")
def halt(self):
return "halted"
structure = flow_structure(StartRouterFlow)
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["entry_point"]["type"] == "start_router"
assert method_map["entry_point"]["has_human_feedback"] is True
assert "continue" in method_map["entry_point"]["router_paths"]
assert "stop" in method_map["entry_point"]["router_paths"]
def test_multiple_start_methods(self):
"""Test a flow with multiple start methods."""
class MultiStartFlow(Flow):
@start()
def start_a(self):
return "a"
@start()
def start_b(self):
return "b"
@listen(and_(start_a, start_b))
def combine(self):
return "combined"
structure = flow_structure(MultiStartFlow)
start_methods = [m for m in structure["methods"] if m["type"] == "start"]
assert len(start_methods) == 2
start_names = {m["name"] for m in start_methods}
assert "start_a" in start_names
assert "start_b" in start_names
def test_orphan_methods(self):
"""Test that orphan methods (not connected to flow) are still captured."""
class FlowWithOrphan(Flow):
@start()
def begin(self):
return "started"
@listen(begin)
def connected(self):
return "connected"
@listen("never_triggered")
def orphan(self):
return "orphan"
structure = flow_structure(FlowWithOrphan)
method_names = {m["name"] for m in structure["methods"]}
assert "orphan" in method_names
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["orphan"]["trigger_methods"] == ["never_triggered"]
def test_empty_flow(self):
"""Test building structure for a flow with no methods."""
class EmptyFlow(Flow):
pass
structure = flow_structure(EmptyFlow)
assert structure["name"] == "EmptyFlow"
assert structure["methods"] == []
assert structure["edges"] == []
assert structure["state_schema"] is None
def test_flow_with_docstring(self):
"""Test that flow docstring is captured."""
class DocumentedFlow(Flow):
"""This is a well-documented flow.
It has multiple lines of documentation.
"""
@start()
def begin(self):
return "started"
structure = flow_structure(DocumentedFlow)
assert structure["description"] is not None
assert "well-documented flow" in structure["description"]
def test_flow_without_docstring(self):
"""Test that missing docstring returns None."""
class UndocumentedFlow(Flow):
@start()
def begin(self):
return "started"
structure = flow_structure(UndocumentedFlow)
assert structure["description"] is None
def test_nested_conditions(self):
"""Test flow with nested AND/OR conditions."""
class NestedConditionFlow(Flow):
@start()
def a(self):
return "a"
@start()
def b(self):
return "b"
@start()
def c(self):
return "c"
@listen(or_(and_(a, b), c))
def complex_trigger(self):
return "triggered"
structure = flow_structure(NestedConditionFlow)
method_map = {m["name"]: m for m in structure["methods"]}
triggers = method_map["complex_trigger"]["trigger_methods"]
assert len(triggers) == 3
assert "a" in triggers
assert "b" in triggers
assert "c" in triggers
class TestErrorHandling:
"""Test error handling and validation."""
def test_instance_raises_type_error(self):
"""Test that passing an instance raises TypeError."""
class TestFlow(Flow):
@start()
def begin(self):
return "started"
flow_instance = TestFlow()
with pytest.raises(TypeError) as exc_info:
flow_structure(flow_instance)
assert "requires a Flow class, not an instance" in str(exc_info.value)
def test_non_class_raises_type_error(self):
"""Test that passing non-class raises TypeError."""
with pytest.raises(TypeError):
flow_structure("not a class")
with pytest.raises(TypeError):
flow_structure(123)
class TestEdgeGeneration:
"""Test edge generation in various scenarios."""
def test_all_edges_generated_correctly(self):
"""Verify all edges are correctly generated for a complex flow."""
class ComplexFlow(Flow):
@start()
def entry(self):
return "started"
@listen(entry)
def step_1(self):
return "step_1"
@router(step_1)
def branch(self) -> Literal["left", "right"]:
return "left"
@listen("left")
def left_path(self):
return "left_done"
@listen("right")
def right_path(self):
return "right_done"
@listen(or_(left_path, right_path))
def converge(self):
return "done"
structure = flow_structure(ComplexFlow)
edges = structure["edges"]
listen_edges = [(e["from_method"], e["to_method"]) for e in edges if e["edge_type"] == "listen"]
assert ("entry", "step_1") in listen_edges
assert ("step_1", "branch") in listen_edges
assert ("left_path", "converge") in listen_edges
assert ("right_path", "converge") in listen_edges
route_edges = [(e["from_method"], e["to_method"], e["condition"]) for e in edges if e["edge_type"] == "route"]
assert ("branch", "left_path", "left") in route_edges
assert ("branch", "right_path", "right") in route_edges
def test_router_edge_conditions(self):
"""Test that router edge conditions are properly set."""
class RouterConditionFlow(Flow):
@start()
def begin(self):
return "start"
@router(begin)
def route(self) -> Literal["option_1", "option_2", "option_3"]:
return "option_1"
@listen("option_1")
def handle_1(self):
return "1"
@listen("option_2")
def handle_2(self):
return "2"
@listen("option_3")
def handle_3(self):
return "3"
structure = flow_structure(RouterConditionFlow)
route_edges = [e for e in structure["edges"] if e["edge_type"] == "route"]
assert len(route_edges) == 3
conditions = {e["to_method"]: e["condition"] for e in route_edges}
assert conditions["handle_1"] == "option_1"
assert conditions["handle_2"] == "option_2"
assert conditions["handle_3"] == "option_3"
class TestMethodTypeClassification:
"""Test method type classification."""
def test_all_method_types(self):
"""Test classification of all method types."""
class AllTypesFlow(Flow):
@start()
def start_only(self):
return "start"
@listen(start_only)
def listen_only(self):
return "listen"
@router(listen_only)
def router_only(self) -> Literal["path"]:
return "path"
@listen("path")
def after_router(self):
return "after"
@start()
@human_feedback(
message="Review",
emit=["yes", "no"],
llm="gpt-4o-mini",
)
def start_and_router(self):
return "data"
structure = flow_structure(AllTypesFlow)
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["start_only"]["type"] == "start"
assert method_map["listen_only"]["type"] == "listen"
assert method_map["router_only"]["type"] == "router"
assert method_map["after_router"]["type"] == "listen"
assert method_map["start_and_router"]["type"] == "start_router"
class TestInputDetection:
"""Test flow input detection."""
def test_inputs_list_exists(self):
"""Test that inputs list is always present."""
class SimpleFlow(Flow):
@start()
def begin(self):
return "started"
structure = flow_structure(SimpleFlow)
assert "inputs" in structure
assert isinstance(structure["inputs"], list)
class TestJsonSerializable:
"""Test that output is JSON serializable."""
def test_structure_is_json_serializable(self):
"""Test that the entire structure can be JSON serialized."""
import json
class MyState(BaseModel):
value: int = 0
class SerializableFlow(Flow[MyState]):
"""Test flow for JSON serialization."""
initial_state = MyState
@start()
@human_feedback(
message="Review",
emit=["ok", "not_ok"],
llm="gpt-4o-mini",
)
def begin(self):
return "data"
@listen("ok")
def proceed(self):
return "done"
structure = flow_structure(SerializableFlow)
json_str = json.dumps(structure)
assert json_str is not None
parsed = json.loads(json_str)
assert parsed["name"] == "SerializableFlow"
assert len(parsed["methods"]) > 0
class TestFlowInheritance:
"""Test flow inheritance scenarios."""
def test_child_flow_inherits_parent_methods(self):
"""Test that FlowB inheriting from FlowA includes methods from both.
Note: FlowMeta propagates methods but does NOT fully propagate the
_listeners registry from parent classes. This means edges defined
in the parent class (e.g., parent_start -> parent_process) may not
appear in the child's structure. This is a known FlowMeta limitation.
"""
class FlowA(Flow):
"""Parent flow with start method."""
@start()
def parent_start(self):
return "parent started"
@listen(parent_start)
def parent_process(self):
return "parent processed"
class FlowB(FlowA):
"""Child flow with additional methods."""
@listen(FlowA.parent_process)
def child_continue(self):
return "child continued"
@listen(child_continue)
def child_finalize(self):
return "child finalized"
structure = flow_structure(FlowB)
assert structure["name"] == "FlowB"
method_names = {m["name"] for m in structure["methods"]}
assert "parent_start" in method_names
assert "parent_process" in method_names
assert "child_continue" in method_names
assert "child_finalize" in method_names
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["parent_start"]["type"] == "start"
assert method_map["parent_process"]["type"] == "listen"
assert method_map["child_continue"]["type"] == "listen"
assert method_map["child_finalize"]["type"] == "listen"
edge_pairs = [(e["from_method"], e["to_method"]) for e in structure["edges"]]
assert ("parent_process", "child_continue") in edge_pairs
assert ("child_continue", "child_finalize") in edge_pairs
# KNOWN LIMITATION: Edges defined in parent class (parent_start -> parent_process)
# are NOT propagated to child's _listeners registry by FlowMeta.
# This is a FlowMeta limitation, not a serializer bug.
def test_child_flow_can_override_parent_method(self):
"""Test that child can override parent methods."""
class BaseFlow(Flow):
@start()
def begin(self):
return "base begin"
@listen(begin)
def process(self):
return "base process"
class ExtendedFlow(BaseFlow):
@listen(BaseFlow.begin)
def process(self):
return "extended process"
@listen(process)
def finalize(self):
return "extended finalize"
structure = flow_structure(ExtendedFlow)
method_names = {m["name"] for m in structure["methods"]}
assert "begin" in method_names
assert "process" in method_names
assert "finalize" in method_names
# Should have 3 methods total (not 4, since process is overridden)
assert len(structure["methods"]) == 3

View File

@@ -8,7 +8,6 @@ from pathlib import Path
import pytest
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow_definition import FlowDefinition
from crewai.flow.visualization import (
build_flow_structure,
visualize_flow_structure,
@@ -77,16 +76,6 @@ class ComplexFlow(Flow):
return "complete"
def _attach_flow_definition(flow_class: type[Flow], methods: dict[str, object]) -> None:
flow_class._flow_definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": flow_class.__name__,
"methods": methods,
}
)
def test_build_flow_structure_simple():
"""Test building structure for a simple sequential flow."""
flow = SimpleFlow()
@@ -109,47 +98,6 @@ def test_build_flow_structure_simple():
assert edge["condition_type"] == "OR"
def test_build_flow_structure_from_flow_class():
"""Test building structure from a Flow class via its FlowDefinition."""
structure = build_flow_structure(SimpleFlow)
assert set(structure["nodes"]) == {"begin", "process"}
assert structure["start_methods"] == ["begin"]
assert structure["nodes"]["begin"]["class_name"] == "SimpleFlow"
def test_build_flow_structure_from_flow_definition():
"""Test building visualization directly from a FlowDefinition."""
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "DefinedFlow",
"methods": {
"begin": {"start": True},
"decide": {
"listen": "begin",
"router": True,
"returns": ["done"],
},
"finish": {"listen": "done"},
},
}
)
structure = build_flow_structure(definition)
assert set(structure["nodes"]) == {"begin", "decide", "finish"}
assert structure["start_methods"] == ["begin"]
assert structure["router_methods"] == ["decide"]
assert structure["nodes"]["begin"]["class_name"] == "DefinedFlow"
assert any(
edge["source"] == "decide"
and edge["target"] == "finish"
and edge["router_path_label"] == "done"
for edge in structure["edges"]
)
def test_build_flow_structure_with_router():
"""Test building structure for a flow with router."""
flow = RouterFlow()
@@ -163,10 +111,13 @@ def test_build_flow_structure_with_router():
router_node = structure["nodes"]["decide"]
assert router_node["type"] == "router"
assert "router_paths" not in router_node
if "router_paths" in router_node:
assert len(router_node["router_paths"]) >= 1
assert any("path" in path for path in router_node["router_paths"])
router_edges = [edge for edge in structure["edges"] if edge["is_router_path"]]
assert router_edges == []
assert len(router_edges) >= 1
def test_build_flow_structure_with_and_or_conditions():
@@ -256,25 +207,31 @@ def test_visualize_flow_structure_json_data():
assert "path_b" in js_content
def test_node_metadata_omits_source_info():
"""Test that definition-only visualization omits Python source metadata."""
def test_node_metadata_includes_source_info():
"""Test that nodes include source code and line number information."""
flow = SimpleFlow()
structure = build_flow_structure(flow)
for node_metadata in structure["nodes"].values():
assert "source_code" not in node_metadata
assert "source_lines" not in node_metadata
assert "source_start_line" not in node_metadata
assert "source_file" not in node_metadata
for node_name, node_metadata in structure["nodes"].items():
assert node_metadata["source_code"] is not None
assert len(node_metadata["source_code"]) > 0
assert node_metadata["source_start_line"] is not None
assert node_metadata["source_start_line"] > 0
assert node_metadata["source_file"] is not None
assert node_metadata["source_file"].endswith(".py")
def test_node_metadata_omits_method_signature():
"""Test that definition-only visualization omits Python method signatures."""
def test_node_metadata_includes_method_signature():
"""Test that nodes include method signature information."""
flow = SimpleFlow()
structure = build_flow_structure(flow)
begin_node = structure["nodes"]["begin"]
assert "method_signature" not in begin_node
assert begin_node["method_signature"] is not None
assert "operationId" in begin_node["method_signature"]
assert begin_node["method_signature"]["operationId"] == "begin"
assert "parameters" in begin_node["method_signature"]
assert "returns" in begin_node["method_signature"]
def test_router_node_has_correct_metadata():
@@ -285,7 +242,10 @@ def test_router_node_has_correct_metadata():
router_node = structure["nodes"]["decide"]
assert router_node["type"] == "router"
assert router_node["is_router"] is True
assert "router_paths" not in router_node
assert router_node["router_paths"] is not None
assert len(router_node["router_paths"]) == 2
assert "path_a" in router_node["router_paths"]
assert "path_b" in router_node["router_paths"]
def test_listen_node_has_trigger_methods():
@@ -357,15 +317,16 @@ def test_topological_path_counting():
assert len(structure["edges"]) > 0
def test_class_metadata_comes_from_definition():
"""Test that nodes include only definition-derived class metadata."""
def test_class_signature_metadata():
"""Test that nodes include class signature information."""
flow = SimpleFlow()
structure = build_flow_structure(flow)
for node_metadata in structure["nodes"].values():
for node_name, node_metadata in structure["nodes"].items():
assert node_metadata["class_name"] is not None
assert node_metadata["class_name"] == "SimpleFlow"
assert "class_signature" not in node_metadata
assert node_metadata["class_signature"] is not None
assert "SimpleFlow" in node_metadata["class_signature"]
def test_visualization_plot_method():
@@ -400,15 +361,6 @@ def test_router_paths_to_string_conditions():
return "handled_b"
flow = RouterToStringFlow()
_attach_flow_definition(
RouterToStringFlow,
{
"init": {"start": True},
"decide": {"listen": "init", "router": True, "returns": ["path_a", "path_b"]},
"handle_either": {"listen": {"or": ["path_a", "path_b"]}},
"handle_b_only": {"listen": "path_b"},
},
)
structure = build_flow_structure(flow)
decide_node = structure["nodes"]["decide"]
@@ -455,16 +407,6 @@ def test_router_paths_not_in_and_conditions():
return "step_3_done"
flow = RouterAndConditionFlow()
_attach_flow_definition(
RouterAndConditionFlow,
{
"init": {"start": True},
"decide": {"listen": "init", "router": True, "returns": ["path_a"]},
"step_1": {"listen": "path_a"},
"step_2_and": {"listen": {"and": ["path_a", "step_1"]}},
"step_3_or": {"listen": {"or": [{"and": ["path_a", "step_1"]}, "path_a"]}},
},
)
structure = build_flow_structure(flow)
router_edges = [edge for edge in structure["edges"] if edge["is_router_path"]]
@@ -476,66 +418,6 @@ def test_router_paths_not_in_and_conditions():
assert "step_2_and" not in targets
def test_analysis_router_paths_only_match_direct_or_triggers():
"""Test analysis helpers align router paths with builder direct-OR semantics."""
from crewai.flow.visualization.analysis import (
_build_parent_children_dict,
_calculate_node_levels,
_count_outgoing_edges,
)
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "RouterAndAnalysisFlow",
"methods": {
"init": {"start": True},
"decide": {
"listen": "init",
"router": True,
"returns": ["path_a"],
},
"step_1": {"listen": "path_a"},
"step_2_and": {"listen": {"and": ["path_a", "step_1"]}},
"step_3_or": {
"listen": {"or": [{"and": ["path_a", "step_1"]}, "path_a"]}
},
},
}
)
parent_children = _build_parent_children_dict(definition)
levels = _calculate_node_levels(definition)
outgoing_edges = _count_outgoing_edges(definition)
assert parent_children["decide"] == ["step_1", "step_3_or"]
assert levels["step_2_and"] > levels["step_1"]
assert outgoing_edges["decide"] == 2
def test_analysis_ancestors_propagate_after_late_parent_merge():
"""Test merged graph ancestors continue downstream after a later parent."""
from crewai.flow.visualization.analysis import _build_ancestor_dict
definition = FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "MergedAncestorFlow",
"methods": {
"A": {"start": True},
"B": {"listen": "A"},
"D": {"listen": {"or": ["B", "C"]}},
"E": {"listen": "D"},
"C": {"listen": "A"},
},
}
)
ancestors = _build_ancestor_dict(definition)
assert ancestors["E"] == {"A", "B", "C", "D"}
def test_chained_routers_no_self_loops():
"""Test that chained routers don't create self-referencing edges.
@@ -572,17 +454,6 @@ def test_chained_routers_no_self_loops():
return "need_auth"
flow = ChainedRouterFlow()
_attach_flow_definition(
ChainedRouterFlow,
{
"entrance": {"start": True},
"session_in_cache": {"listen": "entrance", "router": True, "returns": ["exp"]},
"check_exp": {"listen": "exp", "router": True, "returns": ["auth"]},
"call_ai_auth": {"listen": "auth", "router": True, "returns": ["action"]},
"forward_to_action": {"listen": "action"},
"forward_to_authenticate": {"listen": "authenticate"},
},
)
structure = build_flow_structure(flow)
for edge in structure["edges"]:
@@ -652,16 +523,6 @@ def test_routers_with_shared_output_strings():
return "skipped"
flow = SharedOutputRouterFlow()
_attach_flow_definition(
SharedOutputRouterFlow,
{
"start": {"start": True},
"router_a": {"listen": "start", "router": True, "returns": ["auth"]},
"router_b": {"listen": "auth", "router": True, "returns": ["done"]},
"finalize": {"listen": "done"},
"handle_skip": {"listen": "skip"},
},
)
structure = build_flow_structure(flow)
for edge in structure["edges"]:
@@ -719,19 +580,18 @@ def test_warning_for_router_without_paths(caplog):
build_flow_structure(flow)
assert any(
"Router paths for 'dynamic_router' are dynamic" in record.message
"Could not determine return paths for router 'dynamic_router'" in record.message
for record in caplog.records
)
assert any(
"Static visualization could not match listener triggers" in record.message
"Found listeners waiting for triggers" in record.message
for record in caplog.records
)
assert not any(record.levelno >= logging.ERROR for record in caplog.records)
def test_warning_for_orphaned_listeners(caplog):
"""Test that a warning is logged when a trigger has no explicit router output."""
"""Test that an error is logged when listeners wait for triggers no router outputs."""
import logging
from typing import Literal
@@ -755,33 +615,19 @@ def test_warning_for_orphaned_listeners(caplog):
return "orphan"
flow = OrphanedListenerFlow()
_attach_flow_definition(
OrphanedListenerFlow,
{
"begin": {"start": True},
"my_router": {
"listen": "begin",
"router": True,
"returns": ["option_a", "option_b"],
},
"handle_a": {"listen": "option_a"},
"handle_orphan": {"listen": "option_c"},
},
)
with caplog.at_level(logging.WARNING):
with caplog.at_level(logging.ERROR):
build_flow_structure(flow)
assert any(
"Static visualization could not match listener triggers" in record.message
"Found listeners waiting for triggers" in record.message
and "option_c" in record.message
for record in caplog.records
)
assert not any(record.levelno >= logging.ERROR for record in caplog.records)
def test_no_warning_for_explicit_contract_router_paths(caplog):
"""Test no warning is logged when router paths are declared in the contract."""
def test_no_warning_for_properly_typed_router(caplog):
"""Test that no warning is logged when router has proper type annotations."""
import logging
from typing import Literal
@@ -805,27 +651,11 @@ def test_no_warning_for_explicit_contract_router_paths(caplog):
return "b"
flow = ProperlyTypedRouterFlow()
_attach_flow_definition(
ProperlyTypedRouterFlow,
{
"begin": {"start": True},
"typed_router": {
"listen": "begin",
"router": True,
"returns": ["path_a", "path_b"],
},
"handle_a": {"listen": "path_a"},
"handle_b": {"listen": "path_b"},
},
)
with caplog.at_level(logging.WARNING):
build_flow_structure(flow)
# No warnings should be logged
warning_messages = [r.message for r in caplog.records if r.levelno >= logging.WARNING]
assert not any("Router paths for" in msg for msg in warning_messages)
assert not any(
"Static visualization could not match listener triggers" in msg
for msg in warning_messages
)
assert not any("Could not determine return paths" in msg for msg in warning_messages)
assert not any("Found listeners waiting for triggers" in msg for msg in warning_messages)

View File

@@ -13,7 +13,7 @@ from unittest.mock import MagicMock, patch
import pytest
from crewai.flow import Flow, human_feedback, listen, persist, start
from crewai.flow import Flow, human_feedback, listen, start
from crewai.flow.human_feedback import (
HumanFeedbackConfig,
HumanFeedbackResult,
@@ -91,22 +91,6 @@ class TestHumanFeedbackValidation:
assert hasattr(test_method, "__human_feedback_config__")
assert not hasattr(test_method, "__is_router__") or not test_method.__is_router__
def test_persist_preserves_hf_llm_attribute(self):
"""Test @persist preserves the live LLM stashed by @human_feedback."""
llm = object()
@persist()
@human_feedback(
message="Review this:",
emit=["approve", "reject"],
llm=llm,
)
def test_method(self):
return "output"
assert hasattr(test_method, "_hf_llm")
assert test_method._hf_llm is llm
class TestHumanFeedbackConfig:
"""Tests for HumanFeedbackConfig dataclass."""