diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 8172e7a70..95e6a9a15 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1074,6 +1074,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): _human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict) _input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list) _state: Any = PrivateAttr(default=None) + _execution_id: str = PrivateAttr(default_factory=lambda: str(uuid4())) def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override] class _FlowGeneric(cls): # type: ignore[valid-type,misc] @@ -1864,6 +1865,27 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): except (AttributeError, TypeError): return "" # Safely handle any unexpected attribute access issues + @property + def execution_id(self) -> str: + """Stable identifier for this flow execution. + + Separate from ``flow_id`` / ``state.id``, which consumers may + override via ``kickoff(inputs={"id": ...})`` to resume a persisted + flow. ``execution_id`` is never affected by ``inputs`` and stays + stable for the lifetime of a single run, so it is the correct key + for telemetry, tracing, and any external correlation that must + uniquely identify a single execution even when callers pass an + ``id`` in ``inputs``. + + Defaults to a fresh ``uuid4`` per ``Flow`` instance; assign to + override when an outer system already has an execution identity. + """ + return self._execution_id + + @execution_id.setter + def execution_id(self, value: str) -> None: + self._execution_id = value + def _initialize_state(self, inputs: dict[str, Any]) -> None: """Initialize or update flow state with new inputs. @@ -2177,9 +2199,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): flow_id_token = None request_id_token = None if current_flow_id.get() is None: - flow_id_token = current_flow_id.set(self.flow_id) + flow_id_token = current_flow_id.set(self.execution_id) if current_flow_request_id.get() is None: - request_id_token = current_flow_request_id.set(self.flow_id) + request_id_token = current_flow_request_id.set(self.execution_id) try: # Reset flow state for fresh execution unless restoring from persistence diff --git a/lib/crewai/tests/test_crew.py b/lib/crewai/tests/test_crew.py index 3d6fe4602..c3b8127a9 100644 --- a/lib/crewai/tests/test_crew.py +++ b/lib/crewai/tests/test_crew.py @@ -4519,8 +4519,8 @@ def test_sets_flow_context_when_using_crewbase_pattern_inside_flow(): flow.kickoff() assert captured_crew is not None - assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined] - assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined] + assert captured_crew._flow_id == flow.execution_id # type: ignore[attr-defined] + assert captured_crew._request_id == flow.execution_id # type: ignore[attr-defined] def test_sets_flow_context_when_outside_flow(researcher, writer): @@ -4554,8 +4554,8 @@ def test_sets_flow_context_when_inside_flow(researcher, writer): flow = MyFlow() result = flow.kickoff() - assert result._flow_id == flow.flow_id # type: ignore[attr-defined] - assert result._request_id == flow.flow_id # type: ignore[attr-defined] + assert result._flow_id == flow.execution_id # type: ignore[attr-defined] + assert result._request_id == flow.execution_id # type: ignore[attr-defined] def test_reset_knowledge_with_no_crew_knowledge(researcher, writer): diff --git a/lib/crewai/tests/test_flow_execution_id.py b/lib/crewai/tests/test_flow_execution_id.py new file mode 100644 index 000000000..95088d4b6 --- /dev/null +++ b/lib/crewai/tests/test_flow_execution_id.py @@ -0,0 +1,127 @@ +"""Regression tests for ``Flow.execution_id``. + +``execution_id`` is the stable tracking identifier for a single flow run. +It must stay independent of ``state.id`` so that consumers passing an +``id`` in ``inputs`` (used for persistence restore) cannot destabilize +the identity used by telemetry, tracing, and external correlation. +""" + +from __future__ import annotations + +from typing import Any + +import pytest +from crewai.flow.flow import Flow, FlowState, start +from crewai.flow.flow_context import current_flow_id, current_flow_request_id + + +class _CaptureState(FlowState): + captured_flow_id: str = "" + captured_state_id: str = "" + captured_current_flow_id: str = "" + captured_execution_id: str = "" + + +class _IdentityCaptureFlow(Flow[_CaptureState]): + initial_state = _CaptureState + + @start() + def capture(self) -> None: + self.state.captured_flow_id = self.flow_id + self.state.captured_state_id = self.state.id + self.state.captured_current_flow_id = current_flow_id.get() or "" + self.state.captured_execution_id = self.execution_id + + +def test_execution_id_defaults_to_fresh_uuid_per_instance() -> None: + a = _IdentityCaptureFlow() + b = _IdentityCaptureFlow() + + assert a.execution_id + assert b.execution_id + assert a.execution_id != b.execution_id + + +def test_execution_id_survives_consumer_id_in_inputs() -> None: + flow = _IdentityCaptureFlow() + original_execution_id = flow.execution_id + + flow.kickoff(inputs={"id": "consumer-supplied-id"}) + + assert flow.state.id == "consumer-supplied-id" + assert flow.flow_id == "consumer-supplied-id" + assert flow.execution_id == original_execution_id + assert flow.execution_id != "consumer-supplied-id" + + +def test_two_runs_with_same_consumer_id_have_distinct_execution_ids() -> None: + flow_a = _IdentityCaptureFlow() + flow_b = _IdentityCaptureFlow() + + colliding_id = "shared-consumer-id" + flow_a.kickoff(inputs={"id": colliding_id}) + flow_b.kickoff(inputs={"id": colliding_id}) + + assert flow_a.state.id == colliding_id + assert flow_b.state.id == colliding_id + assert flow_a.execution_id != flow_b.execution_id + + +def test_execution_id_is_writable() -> None: + flow = _IdentityCaptureFlow() + flow.execution_id = "external-task-id" + + assert flow.execution_id == "external-task-id" + + flow.kickoff(inputs={"id": "consumer-supplied-id"}) + assert flow.execution_id == "external-task-id" + assert flow.state.id == "consumer-supplied-id" + + +def test_current_flow_id_context_var_matches_execution_id() -> None: + flow = _IdentityCaptureFlow() + flow.execution_id = "external-task-id" + + flow.kickoff(inputs={"id": "consumer-supplied-id"}) + + assert flow.state.captured_current_flow_id == "external-task-id" + assert flow.state.captured_flow_id == "consumer-supplied-id" + assert flow.state.captured_execution_id == "external-task-id" + + +def test_execution_id_not_included_in_serialized_state() -> None: + flow = _IdentityCaptureFlow() + flow.execution_id = "external-task-id" + flow.kickoff() + + dumped = flow.state.model_dump() + assert "execution_id" not in dumped + assert "_execution_id" not in dumped + assert dumped["id"] == flow.state.id + + +def test_dict_state_flow_also_exposes_stable_execution_id() -> None: + class DictFlow(Flow[dict[str, Any]]): + initial_state = dict # type: ignore[assignment] + + @start() + def noop(self) -> None: + pass + + flow = DictFlow() + original = flow.execution_id + flow.kickoff(inputs={"id": "consumer-supplied-id"}) + + assert flow.state["id"] == "consumer-supplied-id" + assert flow.execution_id == original + + +@pytest.fixture(autouse=True) +def _reset_flow_context_vars(): + yield + for var in (current_flow_id, current_flow_request_id): + try: + var.set(None) + except LookupError: + # ContextVar was never set in this context; nothing to reset. + pass