mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-30 14:52:36 +00:00
fix: add collision-resistant task run IDs across crew/flow executions
Fixes #4607 - Add property to Task that combines the deterministic (MD5 of description+expected_output) with the unique instance uid=1000(ubuntu) gid=1000(ubuntu) groups=1000(ubuntu),27(sudo),999(docker) (uuid4) to produce a collision-resistant identifier per execution. - Update telemetry utils and spans to emit alongside the existing and attributes. - Keep unchanged for structural identity (crew composition, copy/clone mapping). - Add regression tests asserting unique run_ids across repeated runs with identical task definitions. Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -510,6 +510,21 @@ class Task(BaseModel):
|
||||
|
||||
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
|
||||
|
||||
@property
|
||||
def run_id(self) -> str:
|
||||
"""Generate a collision-resistant run ID for this task execution.
|
||||
|
||||
Combines the deterministic task key with the unique instance UUID
|
||||
to produce an identifier that is unique per task instance while
|
||||
still being tied to the task's structural identity.
|
||||
|
||||
Returns:
|
||||
str: A collision-resistant hexadecimal run ID.
|
||||
"""
|
||||
return md5(
|
||||
f"{self.key}|{self.id}".encode(), usedforsecurity=False
|
||||
).hexdigest()
|
||||
|
||||
@property
|
||||
def execution_duration(self) -> float | None:
|
||||
if not self.start_time or not self.end_time:
|
||||
|
||||
@@ -360,6 +360,7 @@ class Telemetry:
|
||||
{
|
||||
"key": task.key,
|
||||
"id": str(task.id),
|
||||
"run_id": task.run_id,
|
||||
"description": task.description,
|
||||
"expected_output": task.expected_output,
|
||||
"async_execution?": task.async_execution,
|
||||
@@ -444,6 +445,7 @@ class Telemetry:
|
||||
{
|
||||
"key": task.key,
|
||||
"id": str(task.id),
|
||||
"run_id": task.run_id,
|
||||
"async_execution?": task.async_execution,
|
||||
"human_input?": task.human_input,
|
||||
"agent_role": (
|
||||
@@ -852,6 +854,7 @@ class Telemetry:
|
||||
[
|
||||
{
|
||||
"id": str(task.id),
|
||||
"run_id": task.run_id,
|
||||
"description": task.description,
|
||||
"expected_output": task.expected_output,
|
||||
"async_execution?": task.async_execution,
|
||||
@@ -902,6 +905,7 @@ class Telemetry:
|
||||
[
|
||||
{
|
||||
"id": str(task.id),
|
||||
"run_id": task.run_id,
|
||||
"description": task.description,
|
||||
"output": task.output.raw if task.output else "",
|
||||
}
|
||||
|
||||
@@ -80,6 +80,7 @@ def add_task_attributes(
|
||||
"""
|
||||
add_attribute_fn(span, "task_key", task.key)
|
||||
add_attribute_fn(span, "task_id", str(task.id))
|
||||
add_attribute_fn(span, "task_run_id", task.run_id)
|
||||
|
||||
if include_fingerprint and hasattr(task, "fingerprint") and task.fingerprint:
|
||||
add_attribute_fn(span, "task_fingerprint", task.fingerprint.uuid_str)
|
||||
|
||||
@@ -1093,6 +1093,107 @@ def test_key():
|
||||
)
|
||||
|
||||
|
||||
def test_run_id_is_collision_resistant():
|
||||
"""Test that run_id is unique across different task instances with the same definition."""
|
||||
description = "Analyze the market trends"
|
||||
expected_output = "A detailed market analysis report."
|
||||
|
||||
task1 = Task(description=description, expected_output=expected_output)
|
||||
task2 = Task(description=description, expected_output=expected_output)
|
||||
|
||||
# Same definition should produce the same key
|
||||
assert task1.key == task2.key, (
|
||||
"Tasks with identical definitions should have the same key."
|
||||
)
|
||||
|
||||
# Different instances should produce different run_ids
|
||||
assert task1.run_id != task2.run_id, (
|
||||
"Tasks with identical definitions but different instances should have different run_ids."
|
||||
)
|
||||
|
||||
|
||||
def test_run_id_is_stable_for_same_instance():
|
||||
"""Test that run_id is deterministic for the same task instance."""
|
||||
task = Task(
|
||||
description="Analyze the market trends",
|
||||
expected_output="A detailed market analysis report.",
|
||||
)
|
||||
|
||||
# run_id should be stable across multiple accesses
|
||||
assert task.run_id == task.run_id, (
|
||||
"run_id should be stable for the same task instance."
|
||||
)
|
||||
|
||||
|
||||
def test_run_id_format():
|
||||
"""Test that run_id is a valid hex digest string."""
|
||||
task = Task(
|
||||
description="Analyze the market trends",
|
||||
expected_output="A detailed market analysis report.",
|
||||
)
|
||||
|
||||
# run_id should be a 32-character hex string (MD5 digest)
|
||||
assert len(task.run_id) == 32, "run_id should be a 32-character hex digest."
|
||||
assert all(c in "0123456789abcdef" for c in task.run_id), (
|
||||
"run_id should only contain hex characters."
|
||||
)
|
||||
|
||||
|
||||
def test_run_id_incorporates_key_and_id():
|
||||
"""Test that run_id is derived from both key and id."""
|
||||
task = Task(
|
||||
description="Analyze the market trends",
|
||||
expected_output="A detailed market analysis report.",
|
||||
)
|
||||
|
||||
expected_run_id = md5(
|
||||
f"{task.key}|{task.id}".encode(), usedforsecurity=False
|
||||
).hexdigest()
|
||||
|
||||
assert task.run_id == expected_run_id, (
|
||||
"run_id should be the MD5 hash of key|id."
|
||||
)
|
||||
|
||||
|
||||
def test_run_id_unique_across_repeated_crew_runs():
|
||||
"""Test that equivalent task-name fixtures across runs produce unique run_ids.
|
||||
|
||||
This is the core regression test for issue #4607: repeated crew/flow
|
||||
executions with the same task names must not collide on run IDs.
|
||||
"""
|
||||
description = "Summarize the quarterly report"
|
||||
expected_output = "A concise summary of key findings."
|
||||
|
||||
collected_run_ids: set[str] = set()
|
||||
num_runs = 50
|
||||
|
||||
for _ in range(num_runs):
|
||||
task = Task(description=description, expected_output=expected_output)
|
||||
collected_run_ids.add(task.run_id)
|
||||
|
||||
assert len(collected_run_ids) == num_runs, (
|
||||
f"Expected {num_runs} unique run_ids across repeated runs, "
|
||||
f"but only got {len(collected_run_ids)}."
|
||||
)
|
||||
|
||||
|
||||
def test_run_id_stable_after_interpolation():
|
||||
"""Test that run_id remains stable after input interpolation."""
|
||||
task = Task(
|
||||
description="Analyze {topic} trends",
|
||||
expected_output="A report about {topic}.",
|
||||
)
|
||||
run_id_before = task.run_id
|
||||
|
||||
task.interpolate_inputs_and_add_conversation_history(inputs={"topic": "AI"})
|
||||
run_id_after = task.run_id
|
||||
|
||||
assert run_id_before == run_id_after, (
|
||||
"run_id should remain stable after input interpolation since "
|
||||
"key uses original description and id doesn't change."
|
||||
)
|
||||
|
||||
|
||||
def test_output_file_validation(tmp_path):
|
||||
"""Test output file path validation."""
|
||||
# Valid paths
|
||||
|
||||
Reference in New Issue
Block a user