diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index 8e26e7e38..c365cdb90 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -537,8 +537,9 @@ def _delegate_to_a2a( # Use endpoint-scoped task IDs to prevent reusing task IDs across different A2A agents # This fixes the issue where delegating to a second A2A agent fails because the task_id # from the first agent is in "completed" state - a2a_task_ids_by_endpoint: dict[str, str] = task_config.get( - "a2a_task_ids_by_endpoint", {} + # Make a defensive copy to avoid in-place mutation of task.config + a2a_task_ids_by_endpoint: dict[str, str] = dict( + task_config.get("a2a_task_ids_by_endpoint", {}) ) task_id_config = a2a_task_ids_by_endpoint.get(agent_id) @@ -582,9 +583,9 @@ def _delegate_to_a2a( if conversation_history: latest_message = conversation_history[-1] if latest_message.task_id is not None: + # Update task_id_config for the current loop iteration only + # Don't persist to a2a_task_ids_by_endpoint yet - wait until we know the status task_id_config = latest_message.task_id - # Store the task_id scoped to this endpoint for multi-turn conversations - a2a_task_ids_by_endpoint[agent_id] = task_id_config if latest_message.context_id is not None: context_id = latest_message.context_id @@ -593,10 +594,11 @@ def _delegate_to_a2a( a2a_result["status"] == "completed" and agent_config.trust_remote_completion_status ): + # Don't persist completed task IDs - they can't be reused + # (A2A protocol rejects task IDs in terminal state) + # Only add to reference_task_ids for tracking purposes if task.config is None: task.config = {} - # Persist endpoint-scoped task IDs for future delegations - task.config["a2a_task_ids_by_endpoint"] = a2a_task_ids_by_endpoint if ( task_id_config is not None and task_id_config not in reference_task_ids diff --git a/lib/crewai/tests/agents/test_a2a_multiple_endpoints.py b/lib/crewai/tests/agents/test_a2a_multiple_endpoints.py index 332af8cc9..d0496fc4a 100644 --- a/lib/crewai/tests/agents/test_a2a_multiple_endpoints.py +++ b/lib/crewai/tests/agents/test_a2a_multiple_endpoints.py @@ -116,14 +116,6 @@ def test_sequential_delegation_to_multiple_endpoints_uses_separate_task_ids(): assert result_a == "Done by http://endpoint-a.com/" - # Verify the endpoint-scoped task IDs are stored in task.config - assert task.config is not None - assert "a2a_task_ids_by_endpoint" in task.config - assert ( - task.config["a2a_task_ids_by_endpoint"]["http://endpoint-a.com/"] - == "task-id-for-http://endpoint-a.com/" - ) - # Second delegation to endpoint B result_b = _delegate_to_a2a( self=agent, @@ -150,12 +142,12 @@ def test_sequential_delegation_to_multiple_endpoints_uses_separate_task_ids(): @pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") -def test_multi_turn_conversation_with_same_endpoint_reuses_task_id(): - """Multi-turn conversations with the same endpoint should reuse the task_id. +def test_completed_task_ids_are_not_persisted_for_reuse(): + """Completed task IDs should NOT be persisted for reuse. - This test ensures that the fix for issue #4166 doesn't break multi-turn - conversations with the same endpoint. When trust_remote_completion_status=True, - the task_id should be stored and reused for subsequent calls to the same endpoint. + The A2A protocol rejects task IDs that are in terminal state (completed/failed). + This test verifies that completed task IDs are not stored in task.config + for future delegations, so each new delegation gets a fresh task_id. """ from crewai.a2a.wrapper import _delegate_to_a2a from crewai import Agent, Task @@ -187,7 +179,7 @@ def test_multi_turn_conversation_with_same_endpoint_reuses_task_id(): # Create a mock message with a task_id mock_message = MagicMock() - mock_message.task_id = "persistent-task-id" + mock_message.task_id = "completed-task-id" mock_message.context_id = None return { @@ -219,15 +211,14 @@ def test_multi_turn_conversation_with_same_endpoint_reuses_task_id(): original_task_description="test", ) - # Verify the task_id was stored in the endpoint-scoped dictionary - assert task.config is not None - assert "a2a_task_ids_by_endpoint" in task.config - stored_task_id = task.config["a2a_task_ids_by_endpoint"].get( - "http://test-endpoint.com/" - ) - assert stored_task_id == "persistent-task-id" + # Verify that completed task IDs are NOT stored in a2a_task_ids_by_endpoint + # because they can't be reused (A2A protocol rejects terminal state task IDs) + if task.config is not None: + a2a_task_ids = task.config.get("a2a_task_ids_by_endpoint", {}) + # The endpoint should NOT have a stored task_id since it completed + assert "http://test-endpoint.com/" not in a2a_task_ids - # Second delegation to the SAME endpoint should use the stored task_id + # Second delegation to the SAME endpoint should also get a fresh task_id _delegate_to_a2a( self=agent, agent_response=MockResponse(), @@ -239,17 +230,20 @@ def test_multi_turn_conversation_with_same_endpoint_reuses_task_id(): original_task_description="test", ) - # Verify that the second call used the stored task_id + # Verify that BOTH calls used None as task_id (fresh task for each) + # because completed task IDs are not persisted assert len(task_ids_used) == 2 - # First call should have no task_id (new conversation) - assert task_ids_used[0] is None - # Second call should reuse the task_id from the first call - assert task_ids_used[1] == "persistent-task-id" + assert task_ids_used[0] is None # First call - new conversation + assert task_ids_used[1] is None # Second call - also new (completed IDs not reused) @pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") -def test_endpoint_scoped_task_ids_are_persisted_to_task_config(): - """Verify that endpoint-scoped task IDs are properly persisted to task.config.""" +def test_reference_task_ids_are_tracked_for_completed_tasks(): + """Completed task IDs should be added to reference_task_ids for tracking. + + While completed task IDs can't be reused for new delegations, they should + still be tracked in reference_task_ids for context/history purposes. + """ from crewai.a2a.wrapper import _delegate_to_a2a from crewai import Agent, Task @@ -305,10 +299,7 @@ def test_endpoint_scoped_task_ids_are_persisted_to_task_config(): original_task_description="test", ) - # Verify the endpoint-scoped task IDs are stored + # Verify the completed task_id is tracked in reference_task_ids assert task.config is not None - assert "a2a_task_ids_by_endpoint" in task.config - assert ( - task.config["a2a_task_ids_by_endpoint"]["http://test-endpoint.com/"] - == "unique-task-id-123" - ) + assert "reference_task_ids" in task.config + assert "unique-task-id-123" in task.config["reference_task_ids"]