mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 16:48:30 +00:00
fix: address Bugbot feedback - prevent in-place mutation and don't persist completed task IDs
- Make defensive copy of a2a_task_ids_by_endpoint dict to avoid in-place mutation - Don't persist completed task IDs since A2A protocol rejects terminal state task IDs - Update task_id_config locally for current loop only, not in shared dict - Update tests to verify correct behavior: - Completed task IDs are NOT persisted for reuse - Each new delegation gets a fresh task_id (None) - Completed task IDs are still tracked in reference_task_ids Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -537,8 +537,9 @@ def _delegate_to_a2a(
|
||||
# Use endpoint-scoped task IDs to prevent reusing task IDs across different A2A agents
|
||||
# This fixes the issue where delegating to a second A2A agent fails because the task_id
|
||||
# from the first agent is in "completed" state
|
||||
a2a_task_ids_by_endpoint: dict[str, str] = task_config.get(
|
||||
"a2a_task_ids_by_endpoint", {}
|
||||
# Make a defensive copy to avoid in-place mutation of task.config
|
||||
a2a_task_ids_by_endpoint: dict[str, str] = dict(
|
||||
task_config.get("a2a_task_ids_by_endpoint", {})
|
||||
)
|
||||
task_id_config = a2a_task_ids_by_endpoint.get(agent_id)
|
||||
|
||||
@@ -582,9 +583,9 @@ def _delegate_to_a2a(
|
||||
if conversation_history:
|
||||
latest_message = conversation_history[-1]
|
||||
if latest_message.task_id is not None:
|
||||
# Update task_id_config for the current loop iteration only
|
||||
# Don't persist to a2a_task_ids_by_endpoint yet - wait until we know the status
|
||||
task_id_config = latest_message.task_id
|
||||
# Store the task_id scoped to this endpoint for multi-turn conversations
|
||||
a2a_task_ids_by_endpoint[agent_id] = task_id_config
|
||||
if latest_message.context_id is not None:
|
||||
context_id = latest_message.context_id
|
||||
|
||||
@@ -593,10 +594,11 @@ def _delegate_to_a2a(
|
||||
a2a_result["status"] == "completed"
|
||||
and agent_config.trust_remote_completion_status
|
||||
):
|
||||
# Don't persist completed task IDs - they can't be reused
|
||||
# (A2A protocol rejects task IDs in terminal state)
|
||||
# Only add to reference_task_ids for tracking purposes
|
||||
if task.config is None:
|
||||
task.config = {}
|
||||
# Persist endpoint-scoped task IDs for future delegations
|
||||
task.config["a2a_task_ids_by_endpoint"] = a2a_task_ids_by_endpoint
|
||||
if (
|
||||
task_id_config is not None
|
||||
and task_id_config not in reference_task_ids
|
||||
|
||||
@@ -116,14 +116,6 @@ def test_sequential_delegation_to_multiple_endpoints_uses_separate_task_ids():
|
||||
|
||||
assert result_a == "Done by http://endpoint-a.com/"
|
||||
|
||||
# Verify the endpoint-scoped task IDs are stored in task.config
|
||||
assert task.config is not None
|
||||
assert "a2a_task_ids_by_endpoint" in task.config
|
||||
assert (
|
||||
task.config["a2a_task_ids_by_endpoint"]["http://endpoint-a.com/"]
|
||||
== "task-id-for-http://endpoint-a.com/"
|
||||
)
|
||||
|
||||
# Second delegation to endpoint B
|
||||
result_b = _delegate_to_a2a(
|
||||
self=agent,
|
||||
@@ -150,12 +142,12 @@ def test_sequential_delegation_to_multiple_endpoints_uses_separate_task_ids():
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
|
||||
def test_multi_turn_conversation_with_same_endpoint_reuses_task_id():
|
||||
"""Multi-turn conversations with the same endpoint should reuse the task_id.
|
||||
def test_completed_task_ids_are_not_persisted_for_reuse():
|
||||
"""Completed task IDs should NOT be persisted for reuse.
|
||||
|
||||
This test ensures that the fix for issue #4166 doesn't break multi-turn
|
||||
conversations with the same endpoint. When trust_remote_completion_status=True,
|
||||
the task_id should be stored and reused for subsequent calls to the same endpoint.
|
||||
The A2A protocol rejects task IDs that are in terminal state (completed/failed).
|
||||
This test verifies that completed task IDs are not stored in task.config
|
||||
for future delegations, so each new delegation gets a fresh task_id.
|
||||
"""
|
||||
from crewai.a2a.wrapper import _delegate_to_a2a
|
||||
from crewai import Agent, Task
|
||||
@@ -187,7 +179,7 @@ def test_multi_turn_conversation_with_same_endpoint_reuses_task_id():
|
||||
|
||||
# Create a mock message with a task_id
|
||||
mock_message = MagicMock()
|
||||
mock_message.task_id = "persistent-task-id"
|
||||
mock_message.task_id = "completed-task-id"
|
||||
mock_message.context_id = None
|
||||
|
||||
return {
|
||||
@@ -219,15 +211,14 @@ def test_multi_turn_conversation_with_same_endpoint_reuses_task_id():
|
||||
original_task_description="test",
|
||||
)
|
||||
|
||||
# Verify the task_id was stored in the endpoint-scoped dictionary
|
||||
assert task.config is not None
|
||||
assert "a2a_task_ids_by_endpoint" in task.config
|
||||
stored_task_id = task.config["a2a_task_ids_by_endpoint"].get(
|
||||
"http://test-endpoint.com/"
|
||||
)
|
||||
assert stored_task_id == "persistent-task-id"
|
||||
# Verify that completed task IDs are NOT stored in a2a_task_ids_by_endpoint
|
||||
# because they can't be reused (A2A protocol rejects terminal state task IDs)
|
||||
if task.config is not None:
|
||||
a2a_task_ids = task.config.get("a2a_task_ids_by_endpoint", {})
|
||||
# The endpoint should NOT have a stored task_id since it completed
|
||||
assert "http://test-endpoint.com/" not in a2a_task_ids
|
||||
|
||||
# Second delegation to the SAME endpoint should use the stored task_id
|
||||
# Second delegation to the SAME endpoint should also get a fresh task_id
|
||||
_delegate_to_a2a(
|
||||
self=agent,
|
||||
agent_response=MockResponse(),
|
||||
@@ -239,17 +230,20 @@ def test_multi_turn_conversation_with_same_endpoint_reuses_task_id():
|
||||
original_task_description="test",
|
||||
)
|
||||
|
||||
# Verify that the second call used the stored task_id
|
||||
# Verify that BOTH calls used None as task_id (fresh task for each)
|
||||
# because completed task IDs are not persisted
|
||||
assert len(task_ids_used) == 2
|
||||
# First call should have no task_id (new conversation)
|
||||
assert task_ids_used[0] is None
|
||||
# Second call should reuse the task_id from the first call
|
||||
assert task_ids_used[1] == "persistent-task-id"
|
||||
assert task_ids_used[0] is None # First call - new conversation
|
||||
assert task_ids_used[1] is None # Second call - also new (completed IDs not reused)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
|
||||
def test_endpoint_scoped_task_ids_are_persisted_to_task_config():
|
||||
"""Verify that endpoint-scoped task IDs are properly persisted to task.config."""
|
||||
def test_reference_task_ids_are_tracked_for_completed_tasks():
|
||||
"""Completed task IDs should be added to reference_task_ids for tracking.
|
||||
|
||||
While completed task IDs can't be reused for new delegations, they should
|
||||
still be tracked in reference_task_ids for context/history purposes.
|
||||
"""
|
||||
from crewai.a2a.wrapper import _delegate_to_a2a
|
||||
from crewai import Agent, Task
|
||||
|
||||
@@ -305,10 +299,7 @@ def test_endpoint_scoped_task_ids_are_persisted_to_task_config():
|
||||
original_task_description="test",
|
||||
)
|
||||
|
||||
# Verify the endpoint-scoped task IDs are stored
|
||||
# Verify the completed task_id is tracked in reference_task_ids
|
||||
assert task.config is not None
|
||||
assert "a2a_task_ids_by_endpoint" in task.config
|
||||
assert (
|
||||
task.config["a2a_task_ids_by_endpoint"]["http://test-endpoint.com/"]
|
||||
== "unique-task-id-123"
|
||||
)
|
||||
assert "reference_task_ids" in task.config
|
||||
assert "unique-task-id-123" in task.config["reference_task_ids"]
|
||||
|
||||
Reference in New Issue
Block a user