From 218dc82bf70df6285ae38255f6afde88f0e162c0 Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Wed, 17 Jun 2026 19:37:52 -0700 Subject: [PATCH] Replace flow diagnostics with logging (#6212) This commit removes flow diagnostics from the definition. These were used for logging only, and should not be coupled to the definition. --- lib/crewai/src/crewai/flow/dsl/_utils.py | 76 ++---- lib/crewai/src/crewai/flow/flow_definition.py | 229 ++++++------------ lib/crewai/tests/test_flow_definition.py | 63 +++-- 3 files changed, 127 insertions(+), 241 deletions(-) diff --git a/lib/crewai/src/crewai/flow/dsl/_utils.py b/lib/crewai/src/crewai/flow/dsl/_utils.py index 35b8006da..99d60a9e3 100644 --- a/lib/crewai/src/crewai/flow/dsl/_utils.py +++ b/lib/crewai/src/crewai/flow/dsl/_utils.py @@ -14,7 +14,6 @@ from crewai.flow.flow_definition import ( FlowConversationalDefinition, FlowConversationalRouterDefinition, FlowDefinition, - FlowDefinitionDiagnostic, FlowDictStateDefinition, FlowHumanFeedbackDefinition, FlowMethodDefinition, @@ -23,6 +22,7 @@ from crewai.flow.flow_definition import ( FlowStateDefinition, FlowUnknownStateDefinition, _object_ref, + log_flow_definition_issues, ) from crewai.flow.flow_wrappers import ( FlowMethod, @@ -116,7 +116,6 @@ def _is_json_serializable(value: Any) -> bool: def _serialize_static_value( value: Any, - diagnostics: list[FlowDefinitionDiagnostic], path: str, ) -> Any: if value is None or _is_json_serializable(value): @@ -148,12 +147,11 @@ def _serialize_static_value( ) ref = _object_ref(value) - diagnostics.append( - FlowDefinitionDiagnostic( - code="non_serializable_value", - path=path, - message=f"value is not fully serializable; preserved import reference {ref}", - ) + logger.warning( + "Flow definition value at %s is not fully serializable; " + "preserved import reference %s.", + path, + ref, ) return {"ref": ref} @@ -169,10 +167,7 @@ def _state_ref(value: Any) -> str | None: return None -def _build_state_definition( - flow_class: type, - diagnostics: list[FlowDefinitionDiagnostic], -) -> FlowStateDefinition | None: +def _build_state_definition(flow_class: type) -> FlowStateDefinition | None: from pydantic import BaseModel as PydanticBaseModel state_value = getattr(flow_class, "_initial_state_t", None) @@ -187,29 +182,23 @@ def _build_state_definition( if state_value is dict or isinstance(state_value, dict): default = None if isinstance(state_value, dict): - default = _serialize_static_value(state_value, diagnostics, "state.default") + default = _serialize_static_value(state_value, "state.default") return FlowDictStateDefinition(default=default) if isinstance(state_value, type) and issubclass(state_value, PydanticBaseModel): return FlowPydanticStateDefinition(ref=_state_ref(state_value)) if isinstance(state_value, PydanticBaseModel): return FlowPydanticStateDefinition( ref=_state_ref(state_value), - default=_serialize_static_value(state_value, diagnostics, "state.default"), - ) - diagnostics.append( - FlowDefinitionDiagnostic( - code="unknown_state_type", - path="state", - message=f"could not serialize state type {_object_ref(state_value)}", + default=_serialize_static_value(state_value, "state.default"), ) + logger.warning( + "Flow definition state could not serialize state type %s.", + _object_ref(state_value), ) return FlowUnknownStateDefinition(ref=_state_ref(state_value)) -def _build_config_definition( - flow_class: type, - diagnostics: list[FlowDefinitionDiagnostic], -) -> FlowConfigDefinition: +def _build_config_definition(flow_class: type) -> FlowConfigDefinition: config_field_names = set(FlowConfigDefinition.model_fields) field_defaults = { name: field.get_default(call_default_factory=True) @@ -225,15 +214,12 @@ def _build_config_definition( value if value is None or isinstance(value, str) else _object_ref(value) ) else: - values[field_name] = _serialize_static_value( - value, diagnostics, f"config.{field_name}" - ) + values[field_name] = _serialize_static_value(value, f"config.{field_name}") return FlowConfigDefinition(**values) def _build_human_feedback_definition( method: Any, - diagnostics: list[FlowDefinitionDiagnostic], path: str, ) -> FlowHumanFeedbackDefinition | None: config = getattr(method, "__human_feedback_config__", None) @@ -248,7 +234,7 @@ def _build_human_feedback_definition( llm=getattr(config, "llm", None), default_outcome=getattr(config, "default_outcome", None), metadata=_serialize_static_value( - getattr(config, "metadata", None), diagnostics, f"{path}.metadata" + getattr(config, "metadata", None), f"{path}.metadata" ), provider=getattr(config, "provider", None), learn=bool(getattr(config, "learn", False)), @@ -273,7 +259,6 @@ def _build_persistence_definition(value: Any) -> FlowPersistenceDefinition | Non def _build_conversational_router_definition( router_config: Any, - diagnostics: list[FlowDefinitionDiagnostic], path: str, ) -> FlowConversationalRouterDefinition | None: if router_config is None: @@ -284,12 +269,9 @@ def _build_conversational_router_definition( prompt=getattr(router_config, "prompt", None), response_format=_serialize_static_value( getattr(router_config, "response_format", None), - diagnostics, f"{path}.response_format", ), - llm=_serialize_static_value( - getattr(router_config, "llm", None), diagnostics, f"{path}.llm" - ), + llm=_serialize_static_value(getattr(router_config, "llm", None), f"{path}.llm"), routes=[str(route) for route in routes] if routes is not None else None, route_descriptions=getattr(router_config, "route_descriptions", None), default_intent=getattr(router_config, "default_intent", "converse"), @@ -300,7 +282,6 @@ def _build_conversational_router_definition( def _build_conversational_definition( flow_class: type, - diagnostics: list[FlowDefinitionDiagnostic], ) -> FlowConversationalDefinition | None: if not _is_conversational_flow(flow_class): return None @@ -324,12 +305,9 @@ def _build_conversational_definition( return FlowConversationalDefinition( enabled=True, system_prompt=getattr(config, "system_prompt", None), - llm=_serialize_static_value( - getattr(config, "llm", None), diagnostics, "conversational.llm" - ), + llm=_serialize_static_value(getattr(config, "llm", None), "conversational.llm"), router=_build_conversational_router_definition( getattr(config, "router", None), - diagnostics, "conversational.router", ), answer_from_history_prompt=getattr(config, "answer_from_history_prompt", None), @@ -340,12 +318,10 @@ def _build_conversational_definition( ), intent_llm=_serialize_static_value( getattr(config, "intent_llm", None), - diagnostics, "conversational.intent_llm", ), answer_from_history_llm=_serialize_static_value( getattr(config, "answer_from_history_llm", None), - diagnostics, "conversational.answer_from_history_llm", ), visible_agent_outputs=( @@ -365,7 +341,6 @@ def _build_conversational_definition( def _build_method_definition( method: Any, - diagnostics: list[FlowDefinitionDiagnostic], path: str, ) -> FlowMethodDefinition: fragment = _get_flow_method_definition(method) @@ -376,9 +351,7 @@ def _build_method_definition( deep=True, update={"do": _method_action(method)} ) - human_feedback = _build_human_feedback_definition( - method, diagnostics, f"{path}.human_feedback" - ) + human_feedback = _build_human_feedback_definition(method, f"{path}.human_feedback") if human_feedback is not None: method_definition.human_feedback = human_feedback if human_feedback.emit: @@ -444,7 +417,6 @@ def _build_flow_definition_from_class( flow_class: type, namespace: dict[str, Any] | None = None, ) -> FlowDefinition: - diagnostics: list[FlowDefinitionDiagnostic] = [] methods: dict[str, FlowMethodDefinition] = {} flow_methods = _iter_flow_methods(flow_class) if namespace is not None: @@ -456,7 +428,7 @@ def _build_flow_definition_from_class( for method_name, method in flow_methods.items(): methods[method_name] = _build_method_definition( - method, diagnostics, f"methods.{method_name}" + method, f"methods.{method_name}" ) description = None @@ -467,15 +439,13 @@ def _build_flow_definition_from_class( definition = FlowDefinition( name=getattr(flow_class, "__name__", "Flow"), description=description, - state=_build_state_definition(flow_class, diagnostics), - config=_build_config_definition(flow_class, diagnostics), + state=_build_state_definition(flow_class), + config=_build_config_definition(flow_class), persist=_build_persistence_definition(flow_class), - conversational=_build_conversational_definition(flow_class, diagnostics), + conversational=_build_conversational_definition(flow_class), methods=methods, - diagnostics=diagnostics, ) - definition.diagnostics.extend(definition.validate_contract()) - definition.log_diagnostics() + log_flow_definition_issues(definition) return definition diff --git a/lib/crewai/src/crewai/flow/flow_definition.py b/lib/crewai/src/crewai/flow/flow_definition.py index 9897cdc1e..230dfe058 100644 --- a/lib/crewai/src/crewai/flow/flow_definition.py +++ b/lib/crewai/src/crewai/flow/flow_definition.py @@ -45,7 +45,6 @@ __all__ = [ "FlowCrewActionDefinition", "FlowDefinition", "FlowDefinitionCondition", - "FlowDefinitionDiagnostic", "FlowDictStateDefinition", "FlowEachActionDefinition", "FlowEachInnerActionDefinition", @@ -70,29 +69,6 @@ def _object_ref(value: Any) -> str: return f"{module}:{qualname}" if module and qualname else repr(value) -class FlowDefinitionDiagnostic(BaseModel): - """A non-fatal Flow Definition build or validation diagnostic.""" - - code: str = Field( - description="Stable diagnostic identifier for tooling and tests.", - examples=["router_without_trigger"], - ) - message: str = Field( - description="Human-readable explanation of the diagnostic.", - examples=["router: true requires either start or listen"], - ) - severity: Literal["warning", "error"] = Field( - default="warning", - description="Diagnostic severity. Errors indicate an invalid or incomplete contract.", - examples=["error"], - ) - path: str | None = Field( - default=None, - description="Dot path to the definition field that produced the diagnostic.", - examples=["methods.decide"], - ) - - class FlowDictStateDefinition(BaseModel): """Static description of a plain dictionary Flow state contract.""" @@ -702,20 +678,6 @@ class FlowDefinition(BaseModel): } ], ) - diagnostics: list[FlowDefinitionDiagnostic] = Field( - default_factory=list, - description="Validation diagnostics attached to this definition.", - examples=[ - [ - { - "code": "router_without_trigger", - "message": "router: true requires either start or listen", - "severity": "error", - "path": "methods.decide", - } - ] - ], - ) @model_validator(mode="after") def _validate_method_names(self) -> FlowDefinition: @@ -742,13 +704,9 @@ class FlowDefinition(BaseModel): @classmethod def from_dict(cls, data: dict[str, Any]) -> FlowDefinition: - """Load a definition from a dictionary and attach diagnostics.""" - serialized_diagnostics = _deserialize_diagnostics(data.get("diagnostics", [])) + """Load a definition from a dictionary.""" definition = cls.model_validate(data) - definition.diagnostics = _merge_diagnostics( - serialized_diagnostics, definition.validate_contract() - ) - definition.log_diagnostics() + log_flow_definition_issues(definition) return definition @classmethod @@ -769,122 +727,81 @@ class FlowDefinition(BaseModel): """Return the JSON Schema for the Flow Definition contract.""" return cls.model_json_schema(by_alias=True) - def validate_contract(self) -> list[FlowDefinitionDiagnostic]: - """Validate the static contract without rejecting dynamic routing.""" - diagnostics: list[FlowDefinitionDiagnostic] = [] - for method_name, method in self.methods.items(): - path = f"methods.{method_name}" - if method.router and not method.is_start and method.listen is None: - diagnostics.append( - FlowDefinitionDiagnostic( - code="router_without_trigger", - severity="error", - path=path, - message="router: true requires either start or listen", - ) - ) - if method.emit and not method.router: - diagnostics.append( - FlowDefinitionDiagnostic( - code="emit_without_router", - path=f"{path}.emit", - message="emit is only used by routers to declare downstream events", - ) - ) - if method.human_feedback: - human_feedback_config = method.human_feedback - if human_feedback_config.emit and not human_feedback_config.llm: - diagnostics.append( - FlowDefinitionDiagnostic( - code="human_feedback_llm_required", - severity="error", - path=f"{path}.human_feedback.llm", - message="llm is required when human_feedback.emit is set", - ) - ) - if ( - human_feedback_config.default_outcome is not None - and not human_feedback_config.emit - ): - diagnostics.append( - FlowDefinitionDiagnostic( - code="human_feedback_default_requires_emit", - severity="error", - path=f"{path}.human_feedback.default_outcome", - message="default_outcome requires human_feedback.emit", - ) - ) - elif ( - human_feedback_config.default_outcome is not None - and human_feedback_config.emit - ): - if ( - human_feedback_config.default_outcome - not in human_feedback_config.emit - ): - diagnostics.append( - FlowDefinitionDiagnostic( - code="human_feedback_default_not_in_emit", - severity="error", - path=f"{path}.human_feedback.default_outcome", - message="default_outcome must be one of human_feedback.emit", - ) - ) - - return diagnostics - - def with_diagnostics(self) -> FlowDefinition: - """Attach fresh diagnostics and return this definition.""" - self.diagnostics = self.validate_contract() - self.log_diagnostics() - return self - - def log_diagnostics(self) -> None: - """Emit all attached diagnostics through the flow definition logger.""" - _log_flow_definition_diagnostics(self.name, self.diagnostics) - - -def _log_flow_definition_diagnostics( - definition_name: str, - diagnostics: list[FlowDefinitionDiagnostic], -) -> None: - for diagnostic in diagnostics: - level = logging.ERROR if diagnostic.severity == "error" else logging.WARNING - path = f" at {diagnostic.path}" if diagnostic.path else "" - logger.log( - level, - "Flow definition diagnostic for %s%s [%s]: %s", - definition_name, - path, - diagnostic.code, - diagnostic.message, - ) - - -def _deserialize_diagnostics(value: Any) -> list[FlowDefinitionDiagnostic]: - return [FlowDefinitionDiagnostic.model_validate(item) for item in value or []] - def _validate_step_name(name: str, *, field: str) -> None: if not isinstance(name, str) or not _STEP_NAME_PATTERN.fullmatch(name): raise ValueError(f"{field} must match {_STEP_NAME_PATTERN.pattern}") -def _merge_diagnostics( - *diagnostic_groups: list[FlowDefinitionDiagnostic], -) -> list[FlowDefinitionDiagnostic]: - diagnostics: list[FlowDefinitionDiagnostic] = [] - seen: set[tuple[str, str, str | None, str]] = set() - for group in diagnostic_groups: - for diagnostic in group: - key = ( - diagnostic.code, - diagnostic.severity, - diagnostic.path, - diagnostic.message, +def log_flow_definition_issues(definition: FlowDefinition) -> None: + for method_name, method in definition.methods.items(): + path = f"methods.{method_name}" + if method.router and not method.is_start and method.listen is None: + _log_flow_definition_issue( + definition.name, + code="router_without_trigger", + severity="error", + path=path, + message="router: true requires either start or listen", ) - if key in seen: - continue - seen.add(key) - diagnostics.append(diagnostic) - return diagnostics + if method.emit and not method.router: + _log_flow_definition_issue( + definition.name, + code="emit_without_router", + path=f"{path}.emit", + message="emit is only used by routers to declare downstream events", + ) + if method.human_feedback: + human_feedback_config = method.human_feedback + if human_feedback_config.emit and not human_feedback_config.llm: + _log_flow_definition_issue( + definition.name, + code="human_feedback_llm_required", + severity="error", + path=f"{path}.human_feedback.llm", + message="llm is required when human_feedback.emit is set", + ) + if ( + human_feedback_config.default_outcome is not None + and not human_feedback_config.emit + ): + _log_flow_definition_issue( + definition.name, + code="human_feedback_default_requires_emit", + severity="error", + path=f"{path}.human_feedback.default_outcome", + message="default_outcome requires human_feedback.emit", + ) + elif ( + human_feedback_config.default_outcome is not None + and human_feedback_config.emit + and human_feedback_config.default_outcome + not in human_feedback_config.emit + ): + _log_flow_definition_issue( + definition.name, + code="human_feedback_default_not_in_emit", + severity="error", + path=f"{path}.human_feedback.default_outcome", + message="default_outcome must be one of human_feedback.emit", + ) + + +def _log_flow_definition_issue( + definition_name: str, + *, + code: str, + message: str, + severity: Literal["warning", "error"] = "warning", + path: str | None = None, +) -> None: + level = logging.ERROR if severity == "error" else logging.WARNING + location = f" at {path}" if path else "" + logger.log( + level, + "Flow definition issue for %s%s [%s]: %s", + definition_name, + location, + code, + message, + ) diff --git a/lib/crewai/tests/test_flow_definition.py b/lib/crewai/tests/test_flow_definition.py index 7aaa4d31a..f947291b8 100644 --- a/lib/crewai/tests/test_flow_definition.py +++ b/lib/crewai/tests/test_flow_definition.py @@ -44,7 +44,6 @@ def test_flow_public_exports_are_explicit(): "FlowCrewActionDefinition", "FlowDefinition", "FlowDefinitionCondition", - "FlowDefinitionDiagnostic", "FlowDictStateDefinition", "FlowEachActionDefinition", "FlowEachInnerActionDefinition", @@ -69,6 +68,7 @@ def test_flow_definition_json_schema_carries_reference_descriptions(): assert schema["properties"]["schema"]["description"] assert schema["properties"]["methods"]["description"] + assert "diagnostics" not in schema["properties"] method_properties = defs["FlowMethodDefinition"]["properties"] assert method_properties["do"]["description"] == "Action executed when this method runs." @@ -79,7 +79,11 @@ def test_flow_definition_json_schema_carries_reference_descriptions(): assert "not interpolated" in script_properties["code"]["description"] assert "not sandboxed" in script_properties["code"]["description"] - state_schema = schema["properties"]["state"]["anyOf"][0] + state_schema = next( + branch + for branch in schema["properties"]["state"]["anyOf"] + if "discriminator" in branch + ) assert state_schema["discriminator"]["propertyName"] == "type" assert state_schema["discriminator"]["mapping"] == { "dict": "#/$defs/FlowDictStateDefinition", @@ -126,7 +130,6 @@ def test_flow_definition_json_schema_carries_field_examples_only(): "FlowConfigDefinition", "FlowPersistenceDefinition", "FlowHumanFeedbackDefinition", - "FlowDefinitionDiagnostic", ]: model_schema = schema if model_name == "FlowDefinition" else defs[model_name] assert "examples" not in model_schema @@ -332,7 +335,7 @@ def test_flow_definition_maps_dsl_to_static_contract(): assert review.human_feedback.learn_strict is True assert definition.methods["audit"].listen == {"and": ["begin", "process"]} - assert definition.diagnostics == [] + assert "diagnostics" not in definition.to_dict() def test_flow_definition_excludes_conversational_builtins_for_regular_flows(): @@ -414,7 +417,8 @@ def test_flow_definition_uses_collapsed_conversational_router_start(): assert methods["route_conversation"].router is True -def test_flow_definition_serializes_human_feedback_metadata(): +def test_flow_definition_serializes_human_feedback_metadata(caplog): + caplog.set_level(logging.WARNING, logger="crewai.flow.dsl._utils") marker = object() class MetadataFlow(Flow): @@ -433,9 +437,9 @@ def test_flow_definition_serializes_human_feedback_metadata(): assert review.human_feedback is not None assert review.human_feedback.metadata == {"ref": "builtins:dict"} assert any( - diagnostic.code == "non_serializable_value" - and diagnostic.path == "methods.review.human_feedback.metadata" - for diagnostic in definition.diagnostics + "methods.review.human_feedback.metadata" in record.message + and "not fully serializable" in record.message + for record in caplog.records ) definition.to_json() @@ -674,7 +678,6 @@ def test_flow_definition_allows_dynamic_router_emit(): definition = DynamicRouterFlow.flow_definition() assert definition.methods["decide"].emit is None - assert definition.diagnostics == [] def test_flow_definition_infers_literal_router_emit(): @@ -827,16 +830,15 @@ def test_flow_definition_accepts_explicit_router_events(): assert definition.methods["decide"].emit == ["left", "right"] -def test_flow_definition_preserves_diagnostics_loaded_from_contract(): +def test_flow_definition_ignores_legacy_diagnostics_loaded_from_contract(): definition = flow_definition.FlowDefinition.from_dict( { "schema": "crewai.flow/v1", "name": "LoadedDiagnosticsFlow", "methods": { - "decision": { - "do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.decision"}, - "router": True, - "emit": ["continue"], + "begin": { + "do": {"ref": "loaded_flows:LoadedDiagnosticsFlow.begin"}, + "start": True, } }, "diagnostics": [ @@ -856,13 +858,13 @@ def test_flow_definition_preserves_diagnostics_loaded_from_contract(): } ) - codes = [diagnostic.code for diagnostic in definition.diagnostics] - assert "serialized_warning" in codes - assert codes.count("router_without_trigger") == 1 + assert "diagnostics" not in definition.to_dict() -def test_router_start_false_without_listen_reports_missing_trigger(): - definition = flow_definition.FlowDefinition.from_dict( +def test_router_start_false_without_listen_logs_missing_trigger(caplog): + caplog.set_level(logging.ERROR, logger="crewai.flow.flow_definition") + + flow_definition.FlowDefinition.from_dict( { "schema": "crewai.flow/v1", "name": "LoadedFlow", @@ -878,9 +880,10 @@ def test_router_start_false_without_listen_reports_missing_trigger(): ) assert any( - diagnostic.code == "router_without_trigger" - and diagnostic.path == "methods.decision" - for diagnostic in definition.diagnostics + record.levelno == logging.ERROR + and "router_without_trigger" in record.message + and "methods.decision" in record.message + for record in caplog.records ) @@ -908,7 +911,7 @@ def test_router_human_feedback_preserves_existing_router_metadata(): assert method.human_feedback is not None -def test_dynamic_router_flow_definition_has_no_diagnostics(): +def test_dynamic_router_flow_definition_allows_dynamic_emit(): class LazyDynamicRouterFlow(Flow): @start() def begin(self): @@ -919,7 +922,7 @@ def test_dynamic_router_flow_definition_has_no_diagnostics(): return self.state["dynamic_event"] definition = LazyDynamicRouterFlow.flow_definition() - assert definition.diagnostics == [] + assert definition.methods["decide"].emit is None def test_dynamic_router_string_listener_is_valid_contract(): @@ -938,7 +941,7 @@ def test_dynamic_router_string_listener_is_valid_contract(): definition = DynamicRouterListenerFlow.flow_definition() - assert definition.diagnostics == [] + assert definition.methods["handle"].listen == "dynamic_event" def test_static_string_listener_is_allowed_by_contract(): @@ -958,7 +961,7 @@ def test_static_string_listener_is_allowed_by_contract(): }, } ) - assert definition.diagnostics == [] + assert definition.methods["handle"].listen == "begni" def test_start_false_not_classified_as_start_method(): @@ -1023,10 +1026,10 @@ def test_flow_definition_cache_is_not_reused_by_subclasses(): assert set(child_definition.methods) == {"child_step"} -def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog): +def test_flow_definition_logs_validation_issues_when_loaded_from_contract(caplog): caplog.set_level(logging.WARNING, logger="crewai.flow.flow_definition") - definition = flow_definition.FlowDefinition.from_dict( + flow_definition.FlowDefinition.from_dict( { "schema": "crewai.flow/v1", "name": "LoadedFlow", @@ -1040,10 +1043,6 @@ def test_flow_definition_logs_diagnostics_when_loaded_from_contract(caplog): } ) - assert any( - diagnostic.code == "router_without_trigger" - for diagnostic in definition.diagnostics - ) assert any( record.levelno == logging.ERROR and "LoadedFlow" in record.message