fix(flow): add execution_id separate from state.id
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

* fix(flow): add execution_id separate from state.id (COR-48)

  When a consumer passes `id` in `kickoff(inputs=...)`, that value
  overwrites the flow's state.id — which was also being used as the
  execution tracking identity for telemetry, tracing, and external
  correlation. Two kickoffs sharing the same consumer id ended up
  with the same tracking id, breaking any downstream system that
  joins on it.

  Introduces `Flow.execution_id`: a stable per-run identifier stored
  as a `PrivateAttr` on the `Flow` model, exposed via property +
  setter. It defaults to a fresh `uuid4` per instance, is never
  touched by `inputs["id"]`, and can be assigned by outer systems
  that already have an execution identity (e.g. a task id).

  Switches the `current_flow_id` / `current_flow_request_id`
  ContextVars to seed from `execution_id` so OTel spans emitted by
  `FlowTrackable` children correlate on the stable tracking key.

  `state.id` keeps its existing override semantics for
  persistence/restore — consumers resuming a persisted flow via
  `inputs["id"]` work exactly as before.

  Adds tests covering default uniqueness per instance, immunity to
  consumer `inputs["id"]`, context-var propagation, absence from
  serialized state, and parity for dict-state flows.

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
This commit is contained in:
Tiago Freire
2026-04-23 17:48:14 -03:00
committed by GitHub
parent 69d777ca50
commit b0e2fda105
3 changed files with 155 additions and 6 deletions

View File

@@ -1074,6 +1074,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict)
_input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list)
_state: Any = PrivateAttr(default=None)
_execution_id: str = PrivateAttr(default_factory=lambda: str(uuid4()))
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
class _FlowGeneric(cls): # type: ignore[valid-type,misc]
@@ -1864,6 +1865,27 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
except (AttributeError, TypeError):
return "" # Safely handle any unexpected attribute access issues
@property
def execution_id(self) -> str:
"""Stable identifier for this flow execution.
Separate from ``flow_id`` / ``state.id``, which consumers may
override via ``kickoff(inputs={"id": ...})`` to resume a persisted
flow. ``execution_id`` is never affected by ``inputs`` and stays
stable for the lifetime of a single run, so it is the correct key
for telemetry, tracing, and any external correlation that must
uniquely identify a single execution even when callers pass an
``id`` in ``inputs``.
Defaults to a fresh ``uuid4`` per ``Flow`` instance; assign to
override when an outer system already has an execution identity.
"""
return self._execution_id
@execution_id.setter
def execution_id(self, value: str) -> None:
self._execution_id = value
def _initialize_state(self, inputs: dict[str, Any]) -> None:
"""Initialize or update flow state with new inputs.
@@ -2177,9 +2199,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
flow_id_token = None
request_id_token = None
if current_flow_id.get() is None:
flow_id_token = current_flow_id.set(self.flow_id)
flow_id_token = current_flow_id.set(self.execution_id)
if current_flow_request_id.get() is None:
request_id_token = current_flow_request_id.set(self.flow_id)
request_id_token = current_flow_request_id.set(self.execution_id)
try:
# Reset flow state for fresh execution unless restoring from persistence

View File

@@ -4519,8 +4519,8 @@ def test_sets_flow_context_when_using_crewbase_pattern_inside_flow():
flow.kickoff()
assert captured_crew is not None
assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined]
assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined]
assert captured_crew._flow_id == flow.execution_id # type: ignore[attr-defined]
assert captured_crew._request_id == flow.execution_id # type: ignore[attr-defined]
def test_sets_flow_context_when_outside_flow(researcher, writer):
@@ -4554,8 +4554,8 @@ def test_sets_flow_context_when_inside_flow(researcher, writer):
flow = MyFlow()
result = flow.kickoff()
assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
assert result._flow_id == flow.execution_id # type: ignore[attr-defined]
assert result._request_id == flow.execution_id # type: ignore[attr-defined]
def test_reset_knowledge_with_no_crew_knowledge(researcher, writer):

View File

@@ -0,0 +1,127 @@
"""Regression tests for ``Flow.execution_id``.
``execution_id`` is the stable tracking identifier for a single flow run.
It must stay independent of ``state.id`` so that consumers passing an
``id`` in ``inputs`` (used for persistence restore) cannot destabilize
the identity used by telemetry, tracing, and external correlation.
"""
from __future__ import annotations
from typing import Any
import pytest
from crewai.flow.flow import Flow, FlowState, start
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
class _CaptureState(FlowState):
captured_flow_id: str = ""
captured_state_id: str = ""
captured_current_flow_id: str = ""
captured_execution_id: str = ""
class _IdentityCaptureFlow(Flow[_CaptureState]):
initial_state = _CaptureState
@start()
def capture(self) -> None:
self.state.captured_flow_id = self.flow_id
self.state.captured_state_id = self.state.id
self.state.captured_current_flow_id = current_flow_id.get() or ""
self.state.captured_execution_id = self.execution_id
def test_execution_id_defaults_to_fresh_uuid_per_instance() -> None:
a = _IdentityCaptureFlow()
b = _IdentityCaptureFlow()
assert a.execution_id
assert b.execution_id
assert a.execution_id != b.execution_id
def test_execution_id_survives_consumer_id_in_inputs() -> None:
flow = _IdentityCaptureFlow()
original_execution_id = flow.execution_id
flow.kickoff(inputs={"id": "consumer-supplied-id"})
assert flow.state.id == "consumer-supplied-id"
assert flow.flow_id == "consumer-supplied-id"
assert flow.execution_id == original_execution_id
assert flow.execution_id != "consumer-supplied-id"
def test_two_runs_with_same_consumer_id_have_distinct_execution_ids() -> None:
flow_a = _IdentityCaptureFlow()
flow_b = _IdentityCaptureFlow()
colliding_id = "shared-consumer-id"
flow_a.kickoff(inputs={"id": colliding_id})
flow_b.kickoff(inputs={"id": colliding_id})
assert flow_a.state.id == colliding_id
assert flow_b.state.id == colliding_id
assert flow_a.execution_id != flow_b.execution_id
def test_execution_id_is_writable() -> None:
flow = _IdentityCaptureFlow()
flow.execution_id = "external-task-id"
assert flow.execution_id == "external-task-id"
flow.kickoff(inputs={"id": "consumer-supplied-id"})
assert flow.execution_id == "external-task-id"
assert flow.state.id == "consumer-supplied-id"
def test_current_flow_id_context_var_matches_execution_id() -> None:
flow = _IdentityCaptureFlow()
flow.execution_id = "external-task-id"
flow.kickoff(inputs={"id": "consumer-supplied-id"})
assert flow.state.captured_current_flow_id == "external-task-id"
assert flow.state.captured_flow_id == "consumer-supplied-id"
assert flow.state.captured_execution_id == "external-task-id"
def test_execution_id_not_included_in_serialized_state() -> None:
flow = _IdentityCaptureFlow()
flow.execution_id = "external-task-id"
flow.kickoff()
dumped = flow.state.model_dump()
assert "execution_id" not in dumped
assert "_execution_id" not in dumped
assert dumped["id"] == flow.state.id
def test_dict_state_flow_also_exposes_stable_execution_id() -> None:
class DictFlow(Flow[dict[str, Any]]):
initial_state = dict # type: ignore[assignment]
@start()
def noop(self) -> None:
pass
flow = DictFlow()
original = flow.execution_id
flow.kickoff(inputs={"id": "consumer-supplied-id"})
assert flow.state["id"] == "consumer-supplied-id"
assert flow.execution_id == original
@pytest.fixture(autouse=True)
def _reset_flow_context_vars():
yield
for var in (current_flow_id, current_flow_request_id):
try:
var.set(None)
except LookupError:
# ContextVar was never set in this context; nothing to reset.
pass