From b0e2fda105c2e0c05c7abb1f53800443ffd582ea Mon Sep 17 00:00:00 2001 From: Tiago Freire Date: Thu, 23 Apr 2026 17:48:14 -0300 Subject: [PATCH] fix(flow): add execution_id separate from state.id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(flow): add execution_id separate from state.id (COR-48) When a consumer passes `id` in `kickoff(inputs=...)`, that value overwrites the flow's state.id — which was also being used as the execution tracking identity for telemetry, tracing, and external correlation. Two kickoffs sharing the same consumer id ended up with the same tracking id, breaking any downstream system that joins on it. Introduces `Flow.execution_id`: a stable per-run identifier stored as a `PrivateAttr` on the `Flow` model, exposed via property + setter. It defaults to a fresh `uuid4` per instance, is never touched by `inputs["id"]`, and can be assigned by outer systems that already have an execution identity (e.g. a task id). Switches the `current_flow_id` / `current_flow_request_id` ContextVars to seed from `execution_id` so OTel spans emitted by `FlowTrackable` children correlate on the stable tracking key. `state.id` keeps its existing override semantics for persistence/restore — consumers resuming a persisted flow via `inputs["id"]` work exactly as before. Adds tests covering default uniqueness per instance, immunity to consumer `inputs["id"]`, context-var propagation, absence from serialized state, and parity for dict-state flows. Co-authored-by: Greyson LaLonde --- lib/crewai/src/crewai/flow/flow.py | 26 ++++- lib/crewai/tests/test_crew.py | 8 +- lib/crewai/tests/test_flow_execution_id.py | 127 +++++++++++++++++++++ 3 files changed, 155 insertions(+), 6 deletions(-) create mode 100644 lib/crewai/tests/test_flow_execution_id.py diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 8172e7a70..95e6a9a15 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1074,6 +1074,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): _human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict) _input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list) _state: Any = PrivateAttr(default=None) + _execution_id: str = PrivateAttr(default_factory=lambda: str(uuid4())) def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override] class _FlowGeneric(cls): # type: ignore[valid-type,misc] @@ -1864,6 +1865,27 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): except (AttributeError, TypeError): return "" # Safely handle any unexpected attribute access issues + @property + def execution_id(self) -> str: + """Stable identifier for this flow execution. + + Separate from ``flow_id`` / ``state.id``, which consumers may + override via ``kickoff(inputs={"id": ...})`` to resume a persisted + flow. ``execution_id`` is never affected by ``inputs`` and stays + stable for the lifetime of a single run, so it is the correct key + for telemetry, tracing, and any external correlation that must + uniquely identify a single execution even when callers pass an + ``id`` in ``inputs``. + + Defaults to a fresh ``uuid4`` per ``Flow`` instance; assign to + override when an outer system already has an execution identity. + """ + return self._execution_id + + @execution_id.setter + def execution_id(self, value: str) -> None: + self._execution_id = value + def _initialize_state(self, inputs: dict[str, Any]) -> None: """Initialize or update flow state with new inputs. @@ -2177,9 +2199,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): flow_id_token = None request_id_token = None if current_flow_id.get() is None: - flow_id_token = current_flow_id.set(self.flow_id) + flow_id_token = current_flow_id.set(self.execution_id) if current_flow_request_id.get() is None: - request_id_token = current_flow_request_id.set(self.flow_id) + request_id_token = current_flow_request_id.set(self.execution_id) try: # Reset flow state for fresh execution unless restoring from persistence diff --git a/lib/crewai/tests/test_crew.py b/lib/crewai/tests/test_crew.py index 3d6fe4602..c3b8127a9 100644 --- a/lib/crewai/tests/test_crew.py +++ b/lib/crewai/tests/test_crew.py @@ -4519,8 +4519,8 @@ def test_sets_flow_context_when_using_crewbase_pattern_inside_flow(): flow.kickoff() assert captured_crew is not None - assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined] - assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined] + assert captured_crew._flow_id == flow.execution_id # type: ignore[attr-defined] + assert captured_crew._request_id == flow.execution_id # type: ignore[attr-defined] def test_sets_flow_context_when_outside_flow(researcher, writer): @@ -4554,8 +4554,8 @@ def test_sets_flow_context_when_inside_flow(researcher, writer): flow = MyFlow() result = flow.kickoff() - assert result._flow_id == flow.flow_id # type: ignore[attr-defined] - assert result._request_id == flow.flow_id # type: ignore[attr-defined] + assert result._flow_id == flow.execution_id # type: ignore[attr-defined] + assert result._request_id == flow.execution_id # type: ignore[attr-defined] def test_reset_knowledge_with_no_crew_knowledge(researcher, writer): diff --git a/lib/crewai/tests/test_flow_execution_id.py b/lib/crewai/tests/test_flow_execution_id.py new file mode 100644 index 000000000..95088d4b6 --- /dev/null +++ b/lib/crewai/tests/test_flow_execution_id.py @@ -0,0 +1,127 @@ +"""Regression tests for ``Flow.execution_id``. + +``execution_id`` is the stable tracking identifier for a single flow run. +It must stay independent of ``state.id`` so that consumers passing an +``id`` in ``inputs`` (used for persistence restore) cannot destabilize +the identity used by telemetry, tracing, and external correlation. +""" + +from __future__ import annotations + +from typing import Any + +import pytest +from crewai.flow.flow import Flow, FlowState, start +from crewai.flow.flow_context import current_flow_id, current_flow_request_id + + +class _CaptureState(FlowState): + captured_flow_id: str = "" + captured_state_id: str = "" + captured_current_flow_id: str = "" + captured_execution_id: str = "" + + +class _IdentityCaptureFlow(Flow[_CaptureState]): + initial_state = _CaptureState + + @start() + def capture(self) -> None: + self.state.captured_flow_id = self.flow_id + self.state.captured_state_id = self.state.id + self.state.captured_current_flow_id = current_flow_id.get() or "" + self.state.captured_execution_id = self.execution_id + + +def test_execution_id_defaults_to_fresh_uuid_per_instance() -> None: + a = _IdentityCaptureFlow() + b = _IdentityCaptureFlow() + + assert a.execution_id + assert b.execution_id + assert a.execution_id != b.execution_id + + +def test_execution_id_survives_consumer_id_in_inputs() -> None: + flow = _IdentityCaptureFlow() + original_execution_id = flow.execution_id + + flow.kickoff(inputs={"id": "consumer-supplied-id"}) + + assert flow.state.id == "consumer-supplied-id" + assert flow.flow_id == "consumer-supplied-id" + assert flow.execution_id == original_execution_id + assert flow.execution_id != "consumer-supplied-id" + + +def test_two_runs_with_same_consumer_id_have_distinct_execution_ids() -> None: + flow_a = _IdentityCaptureFlow() + flow_b = _IdentityCaptureFlow() + + colliding_id = "shared-consumer-id" + flow_a.kickoff(inputs={"id": colliding_id}) + flow_b.kickoff(inputs={"id": colliding_id}) + + assert flow_a.state.id == colliding_id + assert flow_b.state.id == colliding_id + assert flow_a.execution_id != flow_b.execution_id + + +def test_execution_id_is_writable() -> None: + flow = _IdentityCaptureFlow() + flow.execution_id = "external-task-id" + + assert flow.execution_id == "external-task-id" + + flow.kickoff(inputs={"id": "consumer-supplied-id"}) + assert flow.execution_id == "external-task-id" + assert flow.state.id == "consumer-supplied-id" + + +def test_current_flow_id_context_var_matches_execution_id() -> None: + flow = _IdentityCaptureFlow() + flow.execution_id = "external-task-id" + + flow.kickoff(inputs={"id": "consumer-supplied-id"}) + + assert flow.state.captured_current_flow_id == "external-task-id" + assert flow.state.captured_flow_id == "consumer-supplied-id" + assert flow.state.captured_execution_id == "external-task-id" + + +def test_execution_id_not_included_in_serialized_state() -> None: + flow = _IdentityCaptureFlow() + flow.execution_id = "external-task-id" + flow.kickoff() + + dumped = flow.state.model_dump() + assert "execution_id" not in dumped + assert "_execution_id" not in dumped + assert dumped["id"] == flow.state.id + + +def test_dict_state_flow_also_exposes_stable_execution_id() -> None: + class DictFlow(Flow[dict[str, Any]]): + initial_state = dict # type: ignore[assignment] + + @start() + def noop(self) -> None: + pass + + flow = DictFlow() + original = flow.execution_id + flow.kickoff(inputs={"id": "consumer-supplied-id"}) + + assert flow.state["id"] == "consumer-supplied-id" + assert flow.execution_id == original + + +@pytest.fixture(autouse=True) +def _reset_flow_context_vars(): + yield + for var in (current_flow_id, current_flow_request_id): + try: + var.set(None) + except LookupError: + # ContextVar was never set in this context; nothing to reset. + pass