Replace flow diagnostics with logging (#6212)

This commit removes flow diagnostics from the definition. These were
used for logging only, and should not be coupled to the definition.
This commit is contained in:
Vinicius Brasil
2026-06-17 19:37:52 -07:00
committed by GitHub
parent 7374486f00
commit 218dc82bf7
3 changed files with 127 additions and 241 deletions

View File

@@ -14,7 +14,6 @@ from crewai.flow.flow_definition import (
FlowConversationalDefinition,
FlowConversationalRouterDefinition,
FlowDefinition,
FlowDefinitionDiagnostic,
FlowDictStateDefinition,
FlowHumanFeedbackDefinition,
FlowMethodDefinition,
@@ -23,6 +22,7 @@ from crewai.flow.flow_definition import (
FlowStateDefinition,
FlowUnknownStateDefinition,
_object_ref,
log_flow_definition_issues,
)
from crewai.flow.flow_wrappers import (
FlowMethod,
@@ -116,7 +116,6 @@ def _is_json_serializable(value: Any) -> bool:
def _serialize_static_value(
value: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> Any:
if value is None or _is_json_serializable(value):
@@ -148,12 +147,11 @@ def _serialize_static_value(
)
ref = _object_ref(value)
diagnostics.append(
FlowDefinitionDiagnostic(
code="non_serializable_value",
path=path,
message=f"value is not fully serializable; preserved import reference {ref}",
)
logger.warning(
"Flow definition value at %s is not fully serializable; "
"preserved import reference %s.",
path,
ref,
)
return {"ref": ref}
@@ -169,10 +167,7 @@ def _state_ref(value: Any) -> str | None:
return None
def _build_state_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowStateDefinition | None:
def _build_state_definition(flow_class: type) -> FlowStateDefinition | None:
from pydantic import BaseModel as PydanticBaseModel
state_value = getattr(flow_class, "_initial_state_t", None)
@@ -187,29 +182,23 @@ def _build_state_definition(
if state_value is dict or isinstance(state_value, dict):
default = None
if isinstance(state_value, dict):
default = _serialize_static_value(state_value, diagnostics, "state.default")
default = _serialize_static_value(state_value, "state.default")
return FlowDictStateDefinition(default=default)
if isinstance(state_value, type) and issubclass(state_value, PydanticBaseModel):
return FlowPydanticStateDefinition(ref=_state_ref(state_value))
if isinstance(state_value, PydanticBaseModel):
return FlowPydanticStateDefinition(
ref=_state_ref(state_value),
default=_serialize_static_value(state_value, diagnostics, "state.default"),
)
diagnostics.append(
FlowDefinitionDiagnostic(
code="unknown_state_type",
path="state",
message=f"could not serialize state type {_object_ref(state_value)}",
default=_serialize_static_value(state_value, "state.default"),
)
logger.warning(
"Flow definition state could not serialize state type %s.",
_object_ref(state_value),
)
return FlowUnknownStateDefinition(ref=_state_ref(state_value))
def _build_config_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowConfigDefinition:
def _build_config_definition(flow_class: type) -> FlowConfigDefinition:
config_field_names = set(FlowConfigDefinition.model_fields)
field_defaults = {
name: field.get_default(call_default_factory=True)
@@ -225,15 +214,12 @@ def _build_config_definition(
value if value is None or isinstance(value, str) else _object_ref(value)
)
else:
values[field_name] = _serialize_static_value(
value, diagnostics, f"config.{field_name}"
)
values[field_name] = _serialize_static_value(value, f"config.{field_name}")
return FlowConfigDefinition(**values)
def _build_human_feedback_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowHumanFeedbackDefinition | None:
config = getattr(method, "__human_feedback_config__", None)
@@ -248,7 +234,7 @@ def _build_human_feedback_definition(
llm=getattr(config, "llm", None),
default_outcome=getattr(config, "default_outcome", None),
metadata=_serialize_static_value(
getattr(config, "metadata", None), diagnostics, f"{path}.metadata"
getattr(config, "metadata", None), f"{path}.metadata"
),
provider=getattr(config, "provider", None),
learn=bool(getattr(config, "learn", False)),
@@ -273,7 +259,6 @@ def _build_persistence_definition(value: Any) -> FlowPersistenceDefinition | Non
def _build_conversational_router_definition(
router_config: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowConversationalRouterDefinition | None:
if router_config is None:
@@ -284,12 +269,9 @@ def _build_conversational_router_definition(
prompt=getattr(router_config, "prompt", None),
response_format=_serialize_static_value(
getattr(router_config, "response_format", None),
diagnostics,
f"{path}.response_format",
),
llm=_serialize_static_value(
getattr(router_config, "llm", None), diagnostics, f"{path}.llm"
),
llm=_serialize_static_value(getattr(router_config, "llm", None), f"{path}.llm"),
routes=[str(route) for route in routes] if routes is not None else None,
route_descriptions=getattr(router_config, "route_descriptions", None),
default_intent=getattr(router_config, "default_intent", "converse"),
@@ -300,7 +282,6 @@ def _build_conversational_router_definition(
def _build_conversational_definition(
flow_class: type,
diagnostics: list[FlowDefinitionDiagnostic],
) -> FlowConversationalDefinition | None:
if not _is_conversational_flow(flow_class):
return None
@@ -324,12 +305,9 @@ def _build_conversational_definition(
return FlowConversationalDefinition(
enabled=True,
system_prompt=getattr(config, "system_prompt", None),
llm=_serialize_static_value(
getattr(config, "llm", None), diagnostics, "conversational.llm"
),
llm=_serialize_static_value(getattr(config, "llm", None), "conversational.llm"),
router=_build_conversational_router_definition(
getattr(config, "router", None),
diagnostics,
"conversational.router",
),
answer_from_history_prompt=getattr(config, "answer_from_history_prompt", None),
@@ -340,12 +318,10 @@ def _build_conversational_definition(
),
intent_llm=_serialize_static_value(
getattr(config, "intent_llm", None),
diagnostics,
"conversational.intent_llm",
),
answer_from_history_llm=_serialize_static_value(
getattr(config, "answer_from_history_llm", None),
diagnostics,
"conversational.answer_from_history_llm",
),
visible_agent_outputs=(
@@ -365,7 +341,6 @@ def _build_conversational_definition(
def _build_method_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],
path: str,
) -> FlowMethodDefinition:
fragment = _get_flow_method_definition(method)
@@ -376,9 +351,7 @@ def _build_method_definition(
deep=True, update={"do": _method_action(method)}
)
human_feedback = _build_human_feedback_definition(
method, diagnostics, f"{path}.human_feedback"
)
human_feedback = _build_human_feedback_definition(method, f"{path}.human_feedback")
if human_feedback is not None:
method_definition.human_feedback = human_feedback
if human_feedback.emit:
@@ -444,7 +417,6 @@ def _build_flow_definition_from_class(
flow_class: type,
namespace: dict[str, Any] | None = None,
) -> FlowDefinition:
diagnostics: list[FlowDefinitionDiagnostic] = []
methods: dict[str, FlowMethodDefinition] = {}
flow_methods = _iter_flow_methods(flow_class)
if namespace is not None:
@@ -456,7 +428,7 @@ def _build_flow_definition_from_class(
for method_name, method in flow_methods.items():
methods[method_name] = _build_method_definition(
method, diagnostics, f"methods.{method_name}"
method, f"methods.{method_name}"
)
description = None
@@ -467,15 +439,13 @@ def _build_flow_definition_from_class(
definition = FlowDefinition(
name=getattr(flow_class, "__name__", "Flow"),
description=description,
state=_build_state_definition(flow_class, diagnostics),
config=_build_config_definition(flow_class, diagnostics),
state=_build_state_definition(flow_class),
config=_build_config_definition(flow_class),
persist=_build_persistence_definition(flow_class),
conversational=_build_conversational_definition(flow_class, diagnostics),
conversational=_build_conversational_definition(flow_class),
methods=methods,
diagnostics=diagnostics,
)
definition.diagnostics.extend(definition.validate_contract())
definition.log_diagnostics()
log_flow_definition_issues(definition)
return definition

View File

@@ -45,7 +45,6 @@ __all__ = [
"FlowCrewActionDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
"FlowDictStateDefinition",
"FlowEachActionDefinition",
"FlowEachInnerActionDefinition",
@@ -70,29 +69,6 @@ def _object_ref(value: Any) -> str:
return f"{module}:{qualname}" if module and qualname else repr(value)
class FlowDefinitionDiagnostic(BaseModel):
    """A non-fatal Flow Definition build or validation diagnostic.

    Each diagnostic pairs a stable ``code`` with a human-readable
    ``message``, a ``severity`` (``"warning"`` unless stated otherwise) and
    an optional dot ``path`` naming the definition field it refers to.
    """

    # Stable identifier; required, no default.
    code: str = Field(
        description="Stable diagnostic identifier for tooling and tests.",
        examples=["router_without_trigger"],
    )
    # Free-form explanation; required, no default.
    message: str = Field(
        description="Human-readable explanation of the diagnostic.",
        examples=["router: true requires either start or listen"],
    )
    # Restricted to the two literal values; falls back to "warning".
    severity: Literal["warning", "error"] = Field(
        default="warning",
        description="Diagnostic severity. Errors indicate an invalid or incomplete contract.",
        examples=["error"],
    )
    # Optional location; None when the diagnostic is not tied to a field.
    path: str | None = Field(
        default=None,
        description="Dot path to the definition field that produced the diagnostic.",
        examples=["methods.decide"],
    )
class FlowDictStateDefinition(BaseModel):
"""Static description of a plain dictionary Flow state contract."""
@@ -702,20 +678,6 @@ class FlowDefinition(BaseModel):
}
],
)
diagnostics: list[FlowDefinitionDiagnostic] = Field(
default_factory=list,
description="Validation diagnostics attached to this definition.",
examples=[
[
{
"code": "router_without_trigger",
"message": "router: true requires either start or listen",
"severity": "error",
"path": "methods.decide",
}
]
],
)
@model_validator(mode="after")
def _validate_method_names(self) -> FlowDefinition:
@@ -742,13 +704,9 @@ class FlowDefinition(BaseModel):
@classmethod
def from_dict(cls, data: dict[str, Any]) -> FlowDefinition:
"""Load a definition from a dictionary and attach diagnostics."""
serialized_diagnostics = _deserialize_diagnostics(data.get("diagnostics", []))
"""Load a definition from a dictionary."""
definition = cls.model_validate(data)
definition.diagnostics = _merge_diagnostics(
serialized_diagnostics, definition.validate_contract()
)
definition.log_diagnostics()
log_flow_definition_issues(definition)
return definition
@classmethod
@@ -769,122 +727,81 @@ class FlowDefinition(BaseModel):
"""Return the JSON Schema for the Flow Definition contract."""
return cls.model_json_schema(by_alias=True)
def validate_contract(self) -> list[FlowDefinitionDiagnostic]:
    """Collect contract diagnostics for every method of this definition.

    Nothing is raised; each problem found is appended to the returned list
    in method iteration order. Per method the checks are, in order:

    * a router that is neither a start method nor has a ``listen`` trigger;
    * ``emit`` declared on a method that is not a router;
    * human feedback with ``emit`` but no ``llm``;
    * human feedback with a ``default_outcome`` but no ``emit``;
    * human feedback whose ``default_outcome`` is not one of ``emit``.

    Returns:
        The diagnostics found, possibly empty.
    """
    found: list[FlowDefinitionDiagnostic] = []

    for name, spec in self.methods.items():
        base = f"methods.{name}"

        if spec.router and not (spec.is_start or spec.listen is not None):
            found.append(
                FlowDefinitionDiagnostic(
                    code="router_without_trigger",
                    severity="error",
                    path=base,
                    message="router: true requires either start or listen",
                )
            )

        if spec.emit and not spec.router:
            # Severity is left unset here so the model default applies.
            found.append(
                FlowDefinitionDiagnostic(
                    code="emit_without_router",
                    path=f"{base}.emit",
                    message="emit is only used by routers to declare downstream events",
                )
            )

        feedback = spec.human_feedback
        if not feedback:
            continue

        if feedback.emit and not feedback.llm:
            found.append(
                FlowDefinitionDiagnostic(
                    code="human_feedback_llm_required",
                    severity="error",
                    path=f"{base}.human_feedback.llm",
                    message="llm is required when human_feedback.emit is set",
                )
            )

        # The remaining checks only concern an explicitly set default outcome.
        if feedback.default_outcome is None:
            continue

        if not feedback.emit:
            found.append(
                FlowDefinitionDiagnostic(
                    code="human_feedback_default_requires_emit",
                    severity="error",
                    path=f"{base}.human_feedback.default_outcome",
                    message="default_outcome requires human_feedback.emit",
                )
            )
        elif feedback.default_outcome not in feedback.emit:
            found.append(
                FlowDefinitionDiagnostic(
                    code="human_feedback_default_not_in_emit",
                    severity="error",
                    path=f"{base}.human_feedback.default_outcome",
                    message="default_outcome must be one of human_feedback.emit",
                )
            )

    return found
def with_diagnostics(self) -> FlowDefinition:
    """Recompute diagnostics, store them, log them, and return ``self``.

    The stored ``diagnostics`` list is replaced by the result of
    ``validate_contract()`` rather than extended.
    """
    fresh = self.validate_contract()
    self.diagnostics = fresh
    self.log_diagnostics()
    return self
def log_diagnostics(self) -> None:
    """Forward this definition's name and attached diagnostics to the logger helper."""
    _log_flow_definition_diagnostics(
        definition_name=self.name,
        diagnostics=self.diagnostics,
    )
def _log_flow_definition_diagnostics(
    definition_name: str,
    diagnostics: list[FlowDefinitionDiagnostic],
) -> None:
    """Log each diagnostic through the module logger.

    Args:
        definition_name: Name of the flow definition, included in every record.
        diagnostics: Diagnostics to emit, one log record each.

    Diagnostics with severity ``"error"`` are logged at ``logging.ERROR``;
    all others at ``logging.WARNING``. The `` at <path>`` fragment is
    omitted when the diagnostic has no path.
    """
    for entry in diagnostics:
        if entry.severity == "error":
            log_level = logging.ERROR
        else:
            log_level = logging.WARNING

        location = ""
        if entry.path:
            location = f" at {entry.path}"

        logger.log(
            log_level,
            "Flow definition diagnostic for %s%s [%s]: %s",
            definition_name,
            location,
            entry.code,
            entry.message,
        )
def _deserialize_diagnostics(value: Any) -> list[FlowDefinitionDiagnostic]:
    """Validate raw items into ``FlowDefinitionDiagnostic`` instances.

    A falsy ``value`` (such as ``None`` or an empty list) yields an empty
    list; otherwise each item is passed to ``model_validate`` in order.
    """
    parsed: list[FlowDefinitionDiagnostic] = []
    for raw in value or []:
        parsed.append(FlowDefinitionDiagnostic.model_validate(raw))
    return parsed
def _validate_step_name(name: str, *, field: str) -> None:
    """Ensure ``name`` is a string that fully matches ``_STEP_NAME_PATTERN``.

    Args:
        name: Candidate step name.
        field: Label used at the start of the error message.

    Raises:
        ValueError: If ``name`` is not a string or does not match the pattern.
    """
    if isinstance(name, str) and _STEP_NAME_PATTERN.fullmatch(name):
        return
    raise ValueError(f"{field} must match {_STEP_NAME_PATTERN.pattern}")
def _merge_diagnostics(
*diagnostic_groups: list[FlowDefinitionDiagnostic],
) -> list[FlowDefinitionDiagnostic]:
diagnostics: list[FlowDefinitionDiagnostic] = []
seen: set[tuple[str, str, str | None, str]] = set()
for group in diagnostic_groups:
for diagnostic in group:
key = (
diagnostic.code,
diagnostic.severity,
diagnostic.path,
diagnostic.message,
def log_flow_definition_issues(definition: FlowDefinition) -> None:
for method_name, method in definition.methods.items():
path = f"methods.{method_name}"
if method.router and not method.is_start and method.listen is None:
_log_flow_definition_issue(
definition.name,
code="router_without_trigger",
severity="error",
path=path,
message="router: true requires either start or listen",
)
if key in seen:
continue
seen.add(key)
diagnostics.append(diagnostic)
return diagnostics
if method.emit and not method.router:
_log_flow_definition_issue(
definition.name,
code="emit_without_router",
path=f"{path}.emit",
message="emit is only used by routers to declare downstream events",
)
if method.human_feedback:
human_feedback_config = method.human_feedback
if human_feedback_config.emit and not human_feedback_config.llm:
_log_flow_definition_issue(
definition.name,
code="human_feedback_llm_required",
severity="error",
path=f"{path}.human_feedback.llm",
message="llm is required when human_feedback.emit is set",
)
if (
human_feedback_config.default_outcome is not None
and not human_feedback_config.emit
):
_log_flow_definition_issue(
definition.name,
code="human_feedback_default_requires_emit",
severity="error",
path=f"{path}.human_feedback.default_outcome",
message="default_outcome requires human_feedback.emit",
)
elif (
human_feedback_config.default_outcome is not None
and human_feedback_config.emit
and human_feedback_config.default_outcome
not in human_feedback_config.emit
):
_log_flow_definition_issue(
definition.name,
code="human_feedback_default_not_in_emit",
severity="error",
path=f"{path}.human_feedback.default_outcome",
message="default_outcome must be one of human_feedback.emit",
)
def _log_flow_definition_issue(
    definition_name: str,
    *,
    code: str,
    message: str,
    severity: Literal["warning", "error"] = "warning",
    path: str | None = None,
) -> None:
    """Emit a single flow definition issue through the module logger.

    Args:
        definition_name: Name of the flow definition the issue belongs to.
        code: Identifier of the issue, rendered in square brackets.
        message: Human-readable explanation appended to the record.
        severity: ``"error"`` logs at ``logging.ERROR``; anything else logs
            at ``logging.WARNING``.
        path: Optional dot path; when falsy the `` at <path>`` fragment is
            left out of the record.
    """
    if severity == "error":
        log_level = logging.ERROR
    else:
        log_level = logging.WARNING

    where = ""
    if path:
        where = f" at {path}"

    logger.log(
        log_level,
        "Flow definition issue for %s%s [%s]: %s",
        definition_name,
        where,
        code,
        message,
    )

View File

@@ -44,7 +44,6 @@ def test_flow_public_exports_are_explicit():
"FlowCrewActionDefinition",
"FlowDefinition",
"FlowDefinitionCondition",
"FlowDefinitionDiagnostic",
"FlowDictStateDefinition",
"FlowEachActionDefinition",
"FlowEachInnerActionDefinition",
@@ -69,6 +68,7 @@ def test_flow_definition_json_schema_carries_reference_descriptions():
assert schema["properties"]["schema"]["description"]
assert schema["properties"]["methods"]["description"]
assert "diagnostics" not in schema["properties"]
method_properties = defs["FlowMethodDefinition"]["properties"]
assert method_properties["do"]["description"] == "Action executed when this method runs."
@@ -79,7 +79,11 @@ def test_flow_definition_json_schema_carries_reference_descriptions():
assert "not interpolated" in script_properties["code"]["description"]
assert "not sandboxed" in script_properties["code"]["description"]
state_schema = schema["properties"]["state"]["anyOf"][0]
state_schema = next(
branch
for branch in schema["properties"]["state"]["anyOf"]
if "discriminator" in branch
)
assert state_schema["discriminator"]["propertyName"] == "type"
assert state_schema["discriminator"]["mapping"] == {
"dict": "#/$defs/FlowDictStateDefinition",
@@ -126,7 +130,6 @@ def test_flow_definition_json_schema_carries_field_examples_only():
"FlowConfigDefinition",
"FlowPersistenceDefinition",
"FlowHumanFeedbackDefinition",
"FlowDefinitionDiagnostic",
]:
model_schema = schema if model_name == "FlowDefinition" else defs[model_name]
assert "examples" not in model_schema
@@ -332,7 +335,7 @@ def test_flow_definition_maps_dsl_to_static_contract():
assert review.human_feedback.learn_strict is True
assert definition.methods["audit"].listen == {"and": ["begin", "process"]}
assert definition.diagnostics == []
assert "diagnostics" not in definition.to_dict()
def test_flow_definition_excludes_conversational_builtins_for_regular_flows():
@@ -414,7 +417,8 @@ def test_flow_definition_uses_collapsed_conversational_router_start():
assert methods["route_conversation"].router is True
def test_flow_definition_serializes_human_feedback_metadata():
def test_flow_definition_serializes_human_feedback_metadata(caplog):
caplog.set_level(logging.WARNING, logger="crewai.flow.dsl._utils")
marker = object()
class MetadataFlow(Flow):
@@ -433,9 +437,9 @@ def test_flow_definition_serializes_human_feedback_metadata():
assert review.human_feedback is not None
assert review.human_feedback.metadata == {"ref": "builtins:dict"}
assert any(
diagnostic.code == "non_serializable_value"
and diagnostic.path == "methods.review.human_feedback.metadata"
for diagnostic in definition.diagnostics
"methods.review.human_feedback.metadata" in record.message
and "not fully serializable" in record.message
for record in caplog.records
)
definition.to_json()
@@ -674,7 +678,6 @@ def test_flow_definition_allows_dynamic_router_emit():
definition = DynamicRouterFlow.flow_definition()
assert definition.methods["decide"].emit is None
assert definition.diagnostics == []
def test_flow_definition_infers_literal_router_emit():
@@ -827,16 +830,15 @@ def test_flow_definition_accepts_explicit_router_events():
assert definition.methods["decide"].emit == ["left", "right"]
def test_flow_definition_preserves_diagnostics_loaded_from_contract():
def test_flow_definition_ignores_legacy_diagnostics_loaded_from_contract():
definition = flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LoadedDiagnosticsFlow",
"methods": {
"decision": {
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"},
"router": True,
"emit": ["continue"],
"begin": {
"do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.begin"},
"start": True,
}
},
"diagnostics": [
@@ -856,13 +858,13 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract():
}
)
codes = [diagnostic.code for diagnostic in definition.diagnostics]
assert "serialized_warning" in codes
assert codes.count("router_without_trigger") == 1
assert "diagnostics" not in definition.to_dict()
def test_router_start_false_without_listen_reports_missing_trigger():
definition = flow_definition.FlowDefinition.from_dict(
def test_router_start_false_without_listen_logs_missing_trigger(caplog):
caplog.set_level(logging.ERROR, logger="crewai.flow.flow_definition")
flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LoadedFlow",
@@ -878,9 +880,10 @@ def test_router_start_false_without_listen_reports_missing_trigger():
)
assert any(
diagnostic.code == "router_without_trigger"
and diagnostic.path == "methods.decision"
for diagnostic in definition.diagnostics
record.levelno == logging.ERROR
and "router_without_trigger" in record.message
and "methods.decision" in record.message
for record in caplog.records
)
@@ -908,7 +911,7 @@ def test_router_human_feedback_preserves_existing_router_metadata():
assert method.human_feedback is not None
def test_dynamic_router_flow_definition_has_no_diagnostics():
def test_dynamic_router_flow_definition_allows_dynamic_emit():
class LazyDynamicRouterFlow(Flow):
@start()
def begin(self):
@@ -919,7 +922,7 @@ def test_dynamic_router_flow_definition_has_no_diagnostics():
return self.state["dynamic_event"]
definition = LazyDynamicRouterFlow.flow_definition()
assert definition.diagnostics == []
assert definition.methods["decide"].emit is None
def test_dynamic_router_string_listener_is_valid_contract():
@@ -938,7 +941,7 @@ def test_dynamic_router_string_listener_is_valid_contract():
definition = DynamicRouterListenerFlow.flow_definition()
assert definition.diagnostics == []
assert definition.methods["handle"].listen == "dynamic_event"
def test_static_string_listener_is_allowed_by_contract():
@@ -958,7 +961,7 @@ def test_static_string_listener_is_allowed_by_contract():
},
}
)
assert definition.diagnostics == []
assert definition.methods["handle"].listen == "begni"
def test_start_false_not_classified_as_start_method():
@@ -1023,10 +1026,10 @@ def test_flow_definition_cache_is_not_reused_by_subclasses():
assert set(child_definition.methods) == {"child_step"}
def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
def test_flow_definition_logs_validation_issues_when_loaded_from_contract(caplog):
caplog.set_level(logging.WARNING, logger="crewai.flow.flow_definition")
definition = flow_definition.FlowDefinition.from_dict(
flow_definition.FlowDefinition.from_dict(
{
"schema": "crewai.flow/v1",
"name": "LoadedFlow",
@@ -1040,10 +1043,6 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog):
}
)
assert any(
diagnostic.code == "router_without_trigger"
for diagnostic in definition.diagnostics
)
assert any(
record.levelno == logging.ERROR
and "LoadedFlow" in record.message