Compare commits

...

1 Commits

Author SHA1 Message Date
Devin AI
154544c155 fix: add collision-resistant task run IDs across crew/flow executions
Fixes #4607

- Add  property to Task that combines the deterministic
  (MD5 of description+expected_output) with the unique instance uid=1000(ubuntu) gid=1000(ubuntu) groups=1000(ubuntu),27(sudo),999(docker)
  (uuid4) to produce a collision-resistant identifier per execution.
- Update telemetry utils and spans to emit  alongside
  the existing  and  attributes.
- Keep  unchanged for structural identity (crew composition,
  copy/clone mapping).
- Add regression tests asserting unique run_ids across repeated
  runs with identical task definitions.

Co-Authored-By: João <joao@crewai.com>
2026-02-26 12:09:30 +00:00
4 changed files with 121 additions and 0 deletions

View File

@@ -510,6 +510,21 @@ class Task(BaseModel):
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
@property
def run_id(self) -> str:
"""Generate a collision-resistant run ID for this task execution.
Combines the deterministic task key with the unique instance UUID
to produce an identifier that is unique per task instance while
still being tied to the task's structural identity.
Returns:
str: A collision-resistant hexadecimal run ID.
"""
return md5(
f"{self.key}|{self.id}".encode(), usedforsecurity=False
).hexdigest()
@property
def execution_duration(self) -> float | None:
if not self.start_time or not self.end_time:

View File

@@ -360,6 +360,7 @@ class Telemetry:
{
"key": task.key,
"id": str(task.id),
"run_id": task.run_id,
"description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
@@ -444,6 +445,7 @@ class Telemetry:
{
"key": task.key,
"id": str(task.id),
"run_id": task.run_id,
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": (
@@ -852,6 +854,7 @@ class Telemetry:
[
{
"id": str(task.id),
"run_id": task.run_id,
"description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
@@ -902,6 +905,7 @@ class Telemetry:
[
{
"id": str(task.id),
"run_id": task.run_id,
"description": task.description,
"output": task.output.raw if task.output else "",
}

View File

@@ -80,6 +80,7 @@ def add_task_attributes(
"""
add_attribute_fn(span, "task_key", task.key)
add_attribute_fn(span, "task_id", str(task.id))
add_attribute_fn(span, "task_run_id", task.run_id)
if include_fingerprint and hasattr(task, "fingerprint") and task.fingerprint:
add_attribute_fn(span, "task_fingerprint", task.fingerprint.uuid_str)

View File

@@ -1093,6 +1093,107 @@ def test_key():
)
def test_run_id_is_collision_resistant():
"""Test that run_id is unique across different task instances with the same definition."""
description = "Analyze the market trends"
expected_output = "A detailed market analysis report."
task1 = Task(description=description, expected_output=expected_output)
task2 = Task(description=description, expected_output=expected_output)
# Same definition should produce the same key
assert task1.key == task2.key, (
"Tasks with identical definitions should have the same key."
)
# Different instances should produce different run_ids
assert task1.run_id != task2.run_id, (
"Tasks with identical definitions but different instances should have different run_ids."
)
def test_run_id_is_stable_for_same_instance():
"""Test that run_id is deterministic for the same task instance."""
task = Task(
description="Analyze the market trends",
expected_output="A detailed market analysis report.",
)
# run_id should be stable across multiple accesses
assert task.run_id == task.run_id, (
"run_id should be stable for the same task instance."
)
def test_run_id_format():
"""Test that run_id is a valid hex digest string."""
task = Task(
description="Analyze the market trends",
expected_output="A detailed market analysis report.",
)
# run_id should be a 32-character hex string (MD5 digest)
assert len(task.run_id) == 32, "run_id should be a 32-character hex digest."
assert all(c in "0123456789abcdef" for c in task.run_id), (
"run_id should only contain hex characters."
)
def test_run_id_incorporates_key_and_id():
"""Test that run_id is derived from both key and id."""
task = Task(
description="Analyze the market trends",
expected_output="A detailed market analysis report.",
)
expected_run_id = md5(
f"{task.key}|{task.id}".encode(), usedforsecurity=False
).hexdigest()
assert task.run_id == expected_run_id, (
"run_id should be the MD5 hash of key|id."
)
def test_run_id_unique_across_repeated_crew_runs():
"""Test that equivalent task-name fixtures across runs produce unique run_ids.
This is the core regression test for issue #4607: repeated crew/flow
executions with the same task names must not collide on run IDs.
"""
description = "Summarize the quarterly report"
expected_output = "A concise summary of key findings."
collected_run_ids: set[str] = set()
num_runs = 50
for _ in range(num_runs):
task = Task(description=description, expected_output=expected_output)
collected_run_ids.add(task.run_id)
assert len(collected_run_ids) == num_runs, (
f"Expected {num_runs} unique run_ids across repeated runs, "
f"but only got {len(collected_run_ids)}."
)
def test_run_id_stable_after_interpolation():
"""Test that run_id remains stable after input interpolation."""
task = Task(
description="Analyze {topic} trends",
expected_output="A report about {topic}.",
)
run_id_before = task.run_id
task.interpolate_inputs_and_add_conversation_history(inputs={"topic": "AI"})
run_id_after = task.run_id
assert run_id_before == run_id_after, (
"run_id should remain stable after input interpolation since "
"key uses original description and id doesn't change."
)
def test_output_file_validation(tmp_path):
"""Test output file path validation."""
# Valid paths