Compare commits

..

3 Commits

Author SHA1 Message Date
Devin AI
4f13153483 fix: handle None value for a2a_task_ids_by_endpoint in config
Address Bugbot feedback - handle the case where a2a_task_ids_by_endpoint
is explicitly set to None (e.g., from JSON/YAML config) to avoid a
TypeError when calling dict() on None.

Co-Authored-By: João <joao@crewai.com>
2026-01-01 08:17:38 +00:00
Devin AI
f171805092 fix: address Bugbot feedback - prevent in-place mutation and don't persist completed task IDs
- Make a defensive copy of the a2a_task_ids_by_endpoint dict to avoid in-place mutation
- Don't persist completed task IDs, since the A2A protocol rejects task IDs in a terminal state
- Update task_id_config locally for current loop only, not in shared dict
- Update tests to verify correct behavior:
  - Completed task IDs are NOT persisted for reuse
  - Each new delegation gets a fresh task_id (None)
  - Completed task IDs are still tracked in reference_task_ids

Co-Authored-By: João <joao@crewai.com>
2026-01-01 08:03:00 +00:00
Devin AI
ba56c73ac1 fix: use endpoint-scoped task IDs for A2A delegations
This fixes issue #4166 where delegating to a second A2A agent fails
because the task_id from the first agent is in 'completed' state.

The fix introduces endpoint-scoped task ID storage in task.config
using the a2a_task_ids_by_endpoint dictionary. This ensures that:
- Each A2A endpoint gets its own task_id
- Multi-turn conversations with the same endpoint reuse the task_id
- Sequential delegations to different endpoints use separate task_ids

Added tests to verify:
- Sequential delegation to multiple endpoints uses separate task IDs
- Multi-turn conversations with same endpoint reuse task IDs
- Endpoint-scoped task IDs are properly persisted to task.config

Co-Authored-By: João <joao@crewai.com>
2026-01-01 07:44:52 +00:00
8 changed files with 364 additions and 657 deletions
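
For orientation before the diffs, here is a minimal sketch of the per-endpoint bookkeeping the commit messages above describe, kept in task.config. The key names match the diff below; the endpoint URLs and task IDs are purely illustrative.

# Illustrative only: shape of the endpoint-scoped bookkeeping kept in task.config.
# The keys match the diff below; the endpoint URLs and IDs are made up.
task_config = {
    "a2a_task_ids_by_endpoint": {
        # at most one live task_id per remote A2A endpoint (never a completed one)
        "http://endpoint-a.com/": "task-123",
    },
    "reference_task_ids": [
        # completed task IDs are only tracked here, never reused
        "task-older-456",
    ],
}

# Looking up the task_id for a delegation is scoped to the target endpoint,
# so a second endpoint starts with task_id=None instead of reusing a
# completed ID from the first endpoint.
task_id = task_config["a2a_task_ids_by_endpoint"].get("http://endpoint-b.com/")  # -> None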

View File

@@ -531,10 +531,20 @@ def _delegate_to_a2a(
agent_config = next(filter(lambda x: x.endpoint == agent_id, a2a_agents))
task_config = task.config or {}
context_id = task_config.get("context_id")
task_id_config = task_config.get("task_id")
metadata = task_config.get("metadata")
extensions = task_config.get("extensions")
# Use endpoint-scoped task IDs to prevent reusing task IDs across different A2A agents
# This fixes the issue where delegating to a second A2A agent fails because the task_id
# from the first agent is in "completed" state
# Make a defensive copy to avoid in-place mutation of task.config
# Handle case where value is explicitly None (e.g., from JSON/YAML config)
existing_task_ids = task_config.get("a2a_task_ids_by_endpoint")
a2a_task_ids_by_endpoint: dict[str, str] = (
dict(existing_task_ids) if existing_task_ids else {}
)
task_id_config = a2a_task_ids_by_endpoint.get(agent_id)
reference_task_ids = task_config.get("reference_task_ids", [])
if original_task_description is None:
@@ -575,6 +585,8 @@ def _delegate_to_a2a(
if conversation_history:
latest_message = conversation_history[-1]
if latest_message.task_id is not None:
# Update task_id_config for the current loop iteration only
# Don't persist to a2a_task_ids_by_endpoint yet - wait until we know the status
task_id_config = latest_message.task_id
if latest_message.context_id is not None:
context_id = latest_message.context_id
@@ -584,13 +596,16 @@ def _delegate_to_a2a(
a2a_result["status"] == "completed"
and agent_config.trust_remote_completion_status
):
# Don't persist completed task IDs - they can't be reused
# (A2A protocol rejects task IDs in terminal state)
# Only add to reference_task_ids for tracking purposes
if task.config is None:
task.config = {}
if (
task_id_config is not None
and task_id_config not in reference_task_ids
):
reference_task_ids.append(task_id_config)
if task.config is None:
task.config = {}
task.config["reference_task_ids"] = reference_task_ids
result_text = a2a_result.get("result", "")
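
The hunk above implements that behavior inside _delegate_to_a2a. As a condensed, standalone sketch of the same pattern (simplified names, not the actual implementation):

# Sketch of the endpoint-scoped lookup and completed-ID handling shown above.
def resolve_endpoint_task_id(task_config: dict | None, endpoint: str) -> str | None:
    task_config = task_config or {}
    # .get() may return None explicitly (e.g. loaded from JSON/YAML), so guard
    # before calling dict(); the copy avoids mutating task.config in place.
    existing = task_config.get("a2a_task_ids_by_endpoint")
    by_endpoint: dict[str, str] = dict(existing) if existing else {}
    return by_endpoint.get(endpoint)

def record_completed_task_id(task_config: dict, task_id: str | None) -> None:
    # Completed task IDs are terminal in the A2A protocol, so they are only
    # appended to reference_task_ids and never written back to the
    # per-endpoint map for reuse.
    refs = task_config.setdefault("reference_task_ids", [])
    if task_id is not None and task_id not in refs:
        refs.append(task_id)

Keeping the copy local means the current delegation loop can still update its own task_id state without mutating task.config, while completed IDs only ever land in reference_task_ids.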

View File

@@ -7,7 +7,6 @@ from copy import copy as shallow_copy
from hashlib import md5
import json
import re
import threading
from typing import (
Any,
cast,
@@ -204,10 +203,6 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="Metrics for the LLM usage during all tasks execution.",
)
workflow_token_metrics: Any | None = Field(
default=None,
description="Detailed per-agent and per-task token metrics.",
)
manager_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
description="Language model that will run the agent.", default=None
)
@@ -949,36 +944,17 @@ class Crew(FlowTrackable, BaseModel):
continue
if task.async_execution:
# Capture token usage before async task execution
tokens_before = self._get_agent_token_usage(exec_data.agent)
context = self._get_context(
task, [last_sync_output] if last_sync_output else []
)
# Wrap task execution to capture tokens immediately after completion
# Use default arguments to bind loop variables at definition time (fixes B023)
agent = exec_data.agent
tools = exec_data.tools
async def _wrapped_task_execution(
_task=task,
_agent=agent,
_tools=tools,
_context=context,
):
result = await _task.aexecute_sync(
agent=_agent,
context=_context,
tools=_tools,
async_task = asyncio.create_task(
task.aexecute_sync(
agent=exec_data.agent,
context=context,
tools=exec_data.tools,
)
# Capture tokens immediately after task completes
# This reduces (but doesn't eliminate) race conditions
tokens_after = self._get_agent_token_usage(_agent)
return result, tokens_after
async_task = asyncio.create_task(_wrapped_task_execution())
pending_tasks.append((task, async_task, task_index, exec_data.agent, tokens_before))
)
pending_tasks.append((task, async_task, task_index))
else:
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(
@@ -986,22 +962,12 @@ class Crew(FlowTrackable, BaseModel):
)
pending_tasks.clear()
# Capture token usage before task execution
tokens_before = self._get_agent_token_usage(exec_data.agent)
context = self._get_context(task, task_outputs)
task_output = await task.aexecute_sync(
agent=exec_data.agent,
context=context,
tools=exec_data.tools,
)
# Capture token usage after task execution and attach to task output
tokens_after = self._get_agent_token_usage(exec_data.agent)
task_output = self._attach_task_token_metrics(
task_output, task, exec_data.agent, tokens_before, tokens_after
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
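
The hunk above reverts the async path to scheduling each task directly with asyncio.create_task and remembering a (task, handle, index) tuple to await later. A minimal, self-contained sketch of that scheduling pattern, with illustrative names (run_task, pending) rather than CrewAI APIs:

import asyncio

async def run_task(index: int) -> str:
    # Stand-in for task.aexecute_sync(...); just simulates async work.
    await asyncio.sleep(0.01)
    return f"output-{index}"

async def main() -> None:
    pending: list[tuple[int, asyncio.Task[str]]] = []
    for index in range(3):
        # Schedule immediately, collect the handle alongside its index.
        handle = asyncio.create_task(run_task(index))
        pending.append((index, handle))
    # Later, drain the pending handles in order and collect their outputs.
    outputs = [await handle for _, handle in pending]
    print(outputs)

asyncio.run(main())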
@@ -1015,7 +981,7 @@ class Crew(FlowTrackable, BaseModel):
self,
task: ConditionalTask,
task_outputs: list[TaskOutput],
pending_tasks: list[tuple[Task, asyncio.Task[tuple[TaskOutput, Any]], int, Any, Any]],
pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]],
task_index: int,
was_replayed: bool,
) -> TaskOutput | None:
@@ -1030,20 +996,13 @@ class Crew(FlowTrackable, BaseModel):
async def _aprocess_async_tasks(
self,
pending_tasks: list[tuple[Task, asyncio.Task[tuple[TaskOutput, Any]], int, Any, Any]],
pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]],
was_replayed: bool = False,
) -> list[TaskOutput]:
"""Process pending async tasks and return their outputs."""
task_outputs: list[TaskOutput] = []
for future_task, async_task, task_index, agent, tokens_before in pending_tasks:
# Unwrap the result which includes both output and tokens_after
task_output, tokens_after = await async_task
# Attach token metrics using the captured tokens_after
task_output = self._attach_task_token_metrics(
task_output, future_task, agent, tokens_before, tokens_after
)
for future_task, async_task, task_index in pending_tasks:
task_output = await async_task
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
self._store_execution_log(
@@ -1163,14 +1122,9 @@ class Crew(FlowTrackable, BaseModel):
"""
task_outputs: list[TaskOutput] = []
futures: list[tuple[Task, Future[TaskOutput | tuple[TaskOutput, Any]], int, Any, Any]] = []
futures: list[tuple[Task, Future[TaskOutput], int]] = []
last_sync_output: TaskOutput | None = None
# Per-agent locks to serialize async task execution for accurate token tracking
# This ensures that when multiple async tasks from the same agent run,
# they execute one at a time so token deltas can be accurately attributed
agent_locks: dict[str, threading.Lock] = {}
for task_index, task in enumerate(tasks):
exec_data, task_outputs, last_sync_output = prepare_task_execution(
self, task, task_index, start_index, task_outputs, last_sync_output
@@ -1190,50 +1144,23 @@ class Crew(FlowTrackable, BaseModel):
context = self._get_context(
task, [last_sync_output] if last_sync_output else []
)
# Get or create a lock for this agent to serialize async task execution
# This ensures accurate per-task token tracking
agent_id = str(getattr(exec_data.agent, 'id', id(exec_data.agent)))
if agent_id not in agent_locks:
agent_locks[agent_id] = threading.Lock()
agent_lock = agent_locks[agent_id]
# Create a token capture callback that will be called inside the thread
# after task completion (while still holding the lock)
def create_token_callback(agent: Any = exec_data.agent) -> Any:
return self._get_agent_token_usage(agent)
future = task.execute_async(
agent=exec_data.agent,
context=context,
tools=exec_data.tools,
token_capture_callback=create_token_callback,
agent_execution_lock=agent_lock,
)
# Note: tokens_before is no longer captured here since it will be
# captured inside the thread after acquiring the lock
futures.append((task, future, task_index, exec_data.agent, None))
futures.append((task, future, task_index))
else:
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
# Capture token usage before task execution
tokens_before = self._get_agent_token_usage(exec_data.agent)
context = self._get_context(task, task_outputs)
task_output = task.execute_sync(
agent=exec_data.agent,
context=context,
tools=exec_data.tools,
)
# Capture token usage after task execution and attach to task output
tokens_after = self._get_agent_token_usage(exec_data.agent)
task_output = self._attach_task_token_metrics(
task_output, task, exec_data.agent, tokens_before, tokens_after
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
@@ -1247,7 +1174,7 @@ class Crew(FlowTrackable, BaseModel):
self,
task: ConditionalTask,
task_outputs: list[TaskOutput],
futures: list[tuple[Task, Future[TaskOutput | tuple[TaskOutput, Any, Any]], int, Any, Any]],
futures: list[tuple[Task, Future[TaskOutput], int]],
task_index: int,
was_replayed: bool,
) -> TaskOutput | None:
@@ -1474,38 +1401,16 @@ class Crew(FlowTrackable, BaseModel):
json_dict=final_task_output.json_dict,
tasks_output=task_outputs,
token_usage=self.token_usage,
token_metrics=getattr(self, 'workflow_token_metrics', None),
)
def _process_async_tasks(
self,
futures: list[tuple[Task, Future[TaskOutput | tuple[TaskOutput, Any, Any]], int, Any, Any]],
futures: list[tuple[Task, Future[TaskOutput], int]],
was_replayed: bool = False,
) -> list[TaskOutput]:
"""Process completed async tasks and attach token metrics.
The futures contain either:
- TaskOutput (if no token tracking was enabled)
- tuple of (TaskOutput, tokens_before, tokens_after) (if token tracking was enabled)
Token tracking is enabled when the task was executed with a token_capture_callback
and agent_execution_lock, which ensures accurate per-task token attribution even
when multiple async tasks from the same agent run concurrently.
"""
task_outputs: list[TaskOutput] = []
for future_task, future, task_index, agent, _ in futures:
result = future.result()
# Check if result is a tuple (token tracking enabled) or just TaskOutput
if isinstance(result, tuple) and len(result) == 3:
task_output, tokens_before, tokens_after = result
task_output = self._attach_task_token_metrics(
task_output, future_task, agent, tokens_before, tokens_after
)
else:
# No token tracking - result is just TaskOutput
task_output = result
for future_task, future, task_index in futures:
task_output = future.result()
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
self._store_execution_log(
@@ -1711,83 +1616,12 @@ class Crew(FlowTrackable, BaseModel):
def calculate_usage_metrics(self) -> UsageMetrics:
"""Calculates and returns the usage metrics."""
from crewai.types.usage_metrics import (
AgentTokenMetrics,
WorkflowTokenMetrics,
)
total_usage_metrics = UsageMetrics()
# Preserve existing workflow_token_metrics if it exists (has per_task data)
if hasattr(self, 'workflow_token_metrics') and self.workflow_token_metrics:
workflow_metrics = self.workflow_token_metrics
else:
workflow_metrics = WorkflowTokenMetrics()
# Build per-agent metrics from per-task data (more accurate)
# This avoids the cumulative token issue where all agents show the same total
# Key by agent_id to handle multiple agents with the same role
agent_token_sums = {}
agent_info_map = {} # Map agent_id to (agent_name, agent_id)
# First, build a map of all agents by their ID
for agent in self.agents:
agent_role = getattr(agent, 'role', 'Unknown Agent')
agent_id = str(getattr(agent, 'id', ''))
agent_info_map[agent_id] = (agent_role, agent_id)
if workflow_metrics.per_task:
# Sum up tokens for each agent from their tasks
# We need to find which agent_id corresponds to each task's agent_name
for task_metrics in workflow_metrics.per_task.values():
agent_name = task_metrics.agent_name
# Find the agent_id for this agent_name from agent_info_map
# For now, we'll use the agent_name as a temporary key but this needs improvement
# TODO: Store agent_id in TaskTokenMetrics to avoid this lookup
matching_agent_ids = [aid for aid, (name, _) in agent_info_map.items() if name == agent_name]
# Use the first matching agent_id (limitation: can't distinguish between same-role agents)
# This is better than nothing but ideally we'd store agent_id in TaskTokenMetrics
for agent_id in matching_agent_ids:
if agent_id not in agent_token_sums:
agent_token_sums[agent_id] = {
'total_tokens': 0,
'prompt_tokens': 0,
'cached_prompt_tokens': 0,
'completion_tokens': 0,
'successful_requests': 0
}
# Only add to the first matching agent (this is the limitation)
agent_token_sums[agent_id]['total_tokens'] += task_metrics.total_tokens
agent_token_sums[agent_id]['prompt_tokens'] += task_metrics.prompt_tokens
agent_token_sums[agent_id]['cached_prompt_tokens'] += task_metrics.cached_prompt_tokens
agent_token_sums[agent_id]['completion_tokens'] += task_metrics.completion_tokens
agent_token_sums[agent_id]['successful_requests'] += task_metrics.successful_requests
break # Only add to first matching agent
# Create per-agent metrics from the summed task data, keyed by agent_id
for agent in self.agents:
agent_role = getattr(agent, 'role', 'Unknown Agent')
agent_id = str(getattr(agent, 'id', ''))
if agent_id in agent_token_sums:
# Use accurate per-task summed data
sums = agent_token_sums[agent_id]
agent_metrics = AgentTokenMetrics(
agent_name=agent_role,
agent_id=agent_id,
total_tokens=sums['total_tokens'],
prompt_tokens=sums['prompt_tokens'],
cached_prompt_tokens=sums['cached_prompt_tokens'],
completion_tokens=sums['completion_tokens'],
successful_requests=sums['successful_requests']
)
# Key by agent_id to avoid collision for agents with same role
workflow_metrics.per_agent[agent_id] = agent_metrics
# Still get total usage for overall metrics
if isinstance(agent.llm, BaseLLM):
llm_usage = agent.llm.get_token_usage_summary()
total_usage_metrics.add_usage_metrics(llm_usage)
else:
# fallback litellm
@@ -1795,65 +1629,22 @@ class Crew(FlowTrackable, BaseModel):
token_sum = agent._token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
if self.manager_agent:
manager_role = getattr(self.manager_agent, 'role', 'Manager Agent')
manager_id = str(getattr(self.manager_agent, 'id', ''))
if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
token_sum = self.manager_agent._token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
if hasattr(self.manager_agent, "_token_process"):
token_sum = self.manager_agent._token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
if (
self.manager_agent
and hasattr(self.manager_agent, "llm")
and hasattr(self.manager_agent.llm, "get_token_usage_summary")
):
if isinstance(self.manager_agent.llm, BaseLLM):
llm_usage = self.manager_agent.llm.get_token_usage_summary()
else:
llm_usage = self.manager_agent.llm._token_process.get_summary()
# Create per-agent metrics for manager
manager_metrics = AgentTokenMetrics(
agent_name=manager_role,
agent_id=manager_id,
total_tokens=token_sum.total_tokens,
prompt_tokens=token_sum.prompt_tokens,
cached_prompt_tokens=token_sum.cached_prompt_tokens,
completion_tokens=token_sum.completion_tokens,
successful_requests=token_sum.successful_requests
)
workflow_metrics.per_agent[manager_role] = manager_metrics
total_usage_metrics.add_usage_metrics(llm_usage)
if (
hasattr(self.manager_agent, "llm")
and hasattr(self.manager_agent.llm, "get_token_usage_summary")
):
if isinstance(self.manager_agent.llm, BaseLLM):
llm_usage = self.manager_agent.llm.get_token_usage_summary()
else:
llm_usage = self.manager_agent.llm._token_process.get_summary()
total_usage_metrics.add_usage_metrics(llm_usage)
# Update or create manager metrics
if manager_role in workflow_metrics.per_agent:
workflow_metrics.per_agent[manager_role].total_tokens += llm_usage.total_tokens
workflow_metrics.per_agent[manager_role].prompt_tokens += llm_usage.prompt_tokens
workflow_metrics.per_agent[manager_role].cached_prompt_tokens += llm_usage.cached_prompt_tokens
workflow_metrics.per_agent[manager_role].completion_tokens += llm_usage.completion_tokens
workflow_metrics.per_agent[manager_role].successful_requests += llm_usage.successful_requests
else:
manager_metrics = AgentTokenMetrics(
agent_name=manager_role,
agent_id=manager_id,
total_tokens=llm_usage.total_tokens,
prompt_tokens=llm_usage.prompt_tokens,
cached_prompt_tokens=llm_usage.cached_prompt_tokens,
completion_tokens=llm_usage.completion_tokens,
successful_requests=llm_usage.successful_requests
)
workflow_metrics.per_agent[manager_role] = manager_metrics
# Set workflow-level totals
workflow_metrics.total_tokens = total_usage_metrics.total_tokens
workflow_metrics.prompt_tokens = total_usage_metrics.prompt_tokens
workflow_metrics.cached_prompt_tokens = total_usage_metrics.cached_prompt_tokens
workflow_metrics.completion_tokens = total_usage_metrics.completion_tokens
workflow_metrics.successful_requests = total_usage_metrics.successful_requests
# Store workflow metrics (preserving per_task data)
self.workflow_token_metrics = workflow_metrics
self.usage_metrics = total_usage_metrics
return total_usage_metrics
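
After the revert, calculate_usage_metrics goes back to summing each agent's usage into a single UsageMetrics via add_usage_metrics (the accumulator shown in the usage_metrics.py hunk further below). A reduced sketch under that assumption; the per_agent_usage list is stand-in data, not a real Crew attribute:

from crewai.types.usage_metrics import UsageMetrics

per_agent_usage = [
    UsageMetrics(total_tokens=120, prompt_tokens=100, completion_tokens=20, successful_requests=2),
    UsageMetrics(total_tokens=80, prompt_tokens=60, completion_tokens=20, successful_requests=1),
]

total = UsageMetrics()
for usage in per_agent_usage:
    # add_usage_metrics accumulates each field onto the running total.
    total.add_usage_metrics(usage)

print(total.total_tokens, total.successful_requests)  # 200 3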
@@ -2127,56 +1918,3 @@ To enable tracing, do any one of these:
padding=(1, 2),
)
console.print(panel)
def _get_agent_token_usage(self, agent: BaseAgent | None) -> UsageMetrics:
"""Get current token usage for an agent."""
if not agent:
return UsageMetrics()
if isinstance(agent.llm, BaseLLM):
return agent.llm.get_token_usage_summary()
if hasattr(agent, "_token_process"):
return agent._token_process.get_summary()
return UsageMetrics()
def _attach_task_token_metrics(
self,
task_output: TaskOutput,
task: Task,
agent: BaseAgent | None,
tokens_before: UsageMetrics,
tokens_after: UsageMetrics
) -> TaskOutput:
"""Attach per-task token metrics to the task output."""
from crewai.types.usage_metrics import TaskTokenMetrics
if not agent:
return task_output
# Calculate the delta (tokens used by this specific task)
task_tokens = TaskTokenMetrics(
task_name=getattr(task, 'name', None) or task.description[:50],
task_id=str(getattr(task, 'id', '')),
agent_name=getattr(agent, 'role', 'Unknown Agent'),
total_tokens=tokens_after.total_tokens - tokens_before.total_tokens,
prompt_tokens=tokens_after.prompt_tokens - tokens_before.prompt_tokens,
cached_prompt_tokens=tokens_after.cached_prompt_tokens - tokens_before.cached_prompt_tokens,
completion_tokens=tokens_after.completion_tokens - tokens_before.completion_tokens,
successful_requests=tokens_after.successful_requests - tokens_before.successful_requests
)
# Attach to task output
task_output.usage_metrics = task_tokens
# Store in workflow metrics
if not hasattr(self, 'workflow_token_metrics') or self.workflow_token_metrics is None:
from crewai.types.usage_metrics import WorkflowTokenMetrics
self.workflow_token_metrics = WorkflowTokenMetrics()
# Use task_id in the key to prevent collision when multiple tasks have the same name
task_key = f"{task_tokens.task_id}_{task_tokens.task_name}_{task_tokens.agent_name}"
self.workflow_token_metrics.per_task[task_key] = task_tokens
return task_output

View File

@@ -7,7 +7,7 @@ from pydantic import BaseModel, Field
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.types.usage_metrics import UsageMetrics, WorkflowTokenMetrics
from crewai.types.usage_metrics import UsageMetrics
class CrewOutput(BaseModel):
@@ -26,10 +26,6 @@ class CrewOutput(BaseModel):
token_usage: UsageMetrics = Field(
description="Processed token summary", default_factory=UsageMetrics
)
token_metrics: WorkflowTokenMetrics | None = Field(
description="Detailed per-agent and per-task token metrics",
default=None
)
@property
def json(self) -> str | None: # type: ignore[override]

View File

@@ -1,6 +1,5 @@
from __future__ import annotations
from collections.abc import Callable
from concurrent.futures import Future
from copy import copy as shallow_copy
import datetime
@@ -477,34 +476,13 @@ class Task(BaseModel):
agent: BaseAgent | None = None,
context: str | None = None,
tools: list[BaseTool] | None = None,
token_capture_callback: Callable[[], Any] | None = None,
agent_execution_lock: threading.Lock | None = None,
) -> Future[TaskOutput | tuple[TaskOutput, Any, Any]]:
"""Execute the task asynchronously.
Args:
agent: The agent to execute the task.
context: Context for the task execution.
tools: Tools available for the task.
token_capture_callback: Optional callback to capture token usage.
If provided, the future will return a tuple of
(TaskOutput, tokens_before, tokens_after) instead of just TaskOutput.
The callback is called twice: once before task execution (after
acquiring the lock if one is provided) and once after task completion.
agent_execution_lock: Optional lock to serialize task execution for
the same agent. This is used to ensure accurate per-task token
tracking when multiple async tasks from the same agent run
concurrently.
Returns:
Future containing TaskOutput, or tuple of (TaskOutput, tokens_before, tokens_after)
if token_capture_callback is provided.
"""
future: Future[TaskOutput | tuple[TaskOutput, Any, Any]] = Future()
) -> Future[TaskOutput]:
"""Execute the task asynchronously."""
future: Future[TaskOutput] = Future()
threading.Thread(
daemon=True,
target=self._execute_task_async,
args=(agent, context, tools, future, token_capture_callback, agent_execution_lock),
args=(agent, context, tools, future),
).start()
return future
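
With the reverted signature above, execute_async returns a plain concurrent.futures Future[TaskOutput] backed by a daemon thread, so callers simply block on future.result(). A hedged usage sketch; the agent and task setup mirrors the tests in this comparison, and actually running it would invoke the agent's configured LLM:

from crewai import Agent, Task

agent = Agent(role="Researcher", goal="Research topics", backstory="You are a researcher")
task = Task(description="Research topic", expected_output="Research output", agent=agent)

# Kicks off execution in a background thread and returns a Future[TaskOutput].
future = task.execute_async(agent=agent, context=None, tools=None)
task_output = future.result(timeout=60)  # blocks until the worker thread finishes
print(task_output.raw)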
@@ -513,45 +491,14 @@ class Task(BaseModel):
agent: BaseAgent | None,
context: str | None,
tools: list[Any] | None,
future: Future[TaskOutput | tuple[TaskOutput, Any, Any]],
token_capture_callback: Callable[[], Any] | None = None,
agent_execution_lock: threading.Lock | None = None,
future: Future[TaskOutput],
) -> None:
"""Execute the task asynchronously with context handling.
If agent_execution_lock is provided, the task execution will be
serialized with other tasks using the same lock. This ensures
accurate per-task token tracking by:
1. Capturing tokens_before after acquiring the lock
2. Executing the task
3. Capturing tokens_after immediately after completion
4. Releasing the lock
If token_capture_callback is provided, it will be called twice:
once before task execution and once after, both while holding the lock.
"""
"""Execute the task asynchronously with context handling."""
try:
if agent_execution_lock:
with agent_execution_lock:
if token_capture_callback:
tokens_before = token_capture_callback()
result = self._execute_core(agent, context, tools)
if token_capture_callback:
tokens_after = token_capture_callback()
future.set_result((result, tokens_before, tokens_after))
else:
future.set_result(result)
else:
if token_capture_callback:
tokens_before = token_capture_callback()
result = self._execute_core(agent, context, tools)
if token_capture_callback:
tokens_after = token_capture_callback()
future.set_result((result, tokens_before, tokens_after))
else:
future.set_result(result)
result = self._execute_core(agent, context, tools)
future.set_result(result)
except Exception as e:
future.set_exception(e)
future.set_exception(e)
async def aexecute_sync(
self,

View File

@@ -6,7 +6,6 @@ from typing import Any
from pydantic import BaseModel, Field, model_validator
from crewai.tasks.output_format import OutputFormat
from crewai.types.usage_metrics import TaskTokenMetrics
from crewai.utilities.types import LLMMessage
@@ -23,7 +22,6 @@ class TaskOutput(BaseModel):
json_dict: JSON dictionary output of the task
agent: Agent that executed the task
output_format: Output format of the task (JSON, PYDANTIC, or RAW)
usage_metrics: Token usage metrics for this specific task
"""
description: str = Field(description="Description of the task")
@@ -44,10 +42,6 @@ class TaskOutput(BaseModel):
description="Output format of the task", default=OutputFormat.RAW
)
messages: list[LLMMessage] = Field(description="Messages of the task", default=[])
usage_metrics: TaskTokenMetrics | None = Field(
description="Token usage metrics for this task",
default=None
)
@model_validator(mode="after")
def set_summary(self):

View File

@@ -44,74 +44,3 @@ class UsageMetrics(BaseModel):
self.cached_prompt_tokens += usage_metrics.cached_prompt_tokens
self.completion_tokens += usage_metrics.completion_tokens
self.successful_requests += usage_metrics.successful_requests
class AgentTokenMetrics(BaseModel):
"""Token usage metrics for a specific agent.
Attributes:
agent_name: Name/role of the agent
agent_id: Unique identifier for the agent
total_tokens: Total tokens used by this agent
prompt_tokens: Prompt tokens used by this agent
completion_tokens: Completion tokens used by this agent
successful_requests: Number of successful LLM requests
"""
agent_name: str = Field(description="Name/role of the agent")
agent_id: str | None = Field(default=None, description="Unique identifier for the agent")
total_tokens: int = Field(default=0, description="Total tokens used by this agent")
prompt_tokens: int = Field(default=0, description="Prompt tokens used by this agent")
cached_prompt_tokens: int = Field(default=0, description="Cached prompt tokens used by this agent")
completion_tokens: int = Field(default=0, description="Completion tokens used by this agent")
successful_requests: int = Field(default=0, description="Number of successful LLM requests")
class TaskTokenMetrics(BaseModel):
"""Token usage metrics for a specific task.
Attributes:
task_name: Name of the task
task_id: Unique identifier for the task
agent_name: Name of the agent that executed the task
total_tokens: Total tokens used for this task
prompt_tokens: Prompt tokens used for this task
completion_tokens: Completion tokens used for this task
successful_requests: Number of successful LLM requests
"""
task_name: str = Field(description="Name of the task")
task_id: str | None = Field(default=None, description="Unique identifier for the task")
agent_name: str = Field(description="Name of the agent that executed the task")
total_tokens: int = Field(default=0, description="Total tokens used for this task")
prompt_tokens: int = Field(default=0, description="Prompt tokens used for this task")
cached_prompt_tokens: int = Field(default=0, description="Cached prompt tokens used for this task")
completion_tokens: int = Field(default=0, description="Completion tokens used for this task")
successful_requests: int = Field(default=0, description="Number of successful LLM requests")
class WorkflowTokenMetrics(BaseModel):
"""Complete token usage metrics for a crew workflow.
Attributes:
total_tokens: Total tokens used across entire workflow
prompt_tokens: Total prompt tokens used
completion_tokens: Total completion tokens used
successful_requests: Total successful requests
per_agent: Dictionary mapping agent names to their token metrics
per_task: Dictionary mapping task names to their token metrics
"""
total_tokens: int = Field(default=0, description="Total tokens used across entire workflow")
prompt_tokens: int = Field(default=0, description="Total prompt tokens used")
cached_prompt_tokens: int = Field(default=0, description="Total cached prompt tokens used")
completion_tokens: int = Field(default=0, description="Total completion tokens used")
successful_requests: int = Field(default=0, description="Total successful requests")
per_agent: dict[str, AgentTokenMetrics] = Field(
default_factory=dict,
description="Token metrics per agent"
)
per_task: dict[str, TaskTokenMetrics] = Field(
default_factory=dict,
description="Token metrics per task"
)

View File

@@ -0,0 +1,305 @@
"""Test A2A delegation to multiple endpoints sequentially.
This test file covers the bug fix for issue #4166 where delegating to a second
A2A agent fails because the task_id from the first agent is in "completed" state.
"""
from unittest.mock import MagicMock, patch
import pytest
from crewai.a2a.config import A2AConfig
try:
from a2a.types import Message, Part, Role, TextPart
A2A_SDK_INSTALLED = True
except ImportError:
A2A_SDK_INSTALLED = False
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_sequential_delegation_to_multiple_endpoints_uses_separate_task_ids():
"""When delegating to multiple A2A endpoints sequentially, each should get a unique task_id.
This test verifies the fix for issue #4166 where the second A2A delegation
fails with 'Task is in terminal state: completed' because the task_id from
the first delegation was being reused.
"""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai import Agent, Task
# Configure agent with two A2A endpoints
a2a_configs = [
A2AConfig(
endpoint="http://endpoint-a.com",
trust_remote_completion_status=True,
),
A2AConfig(
endpoint="http://endpoint-b.com",
trust_remote_completion_status=True,
),
]
agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_configs,
)
task = Task(description="test", expected_output="test", agent=agent)
# First delegation to endpoint A
class MockResponseA:
is_a2a = True
message = "Please help with task A"
a2a_ids = ["http://endpoint-a.com/"]
# Second delegation to endpoint B
class MockResponseB:
is_a2a = True
message = "Please help with task B"
a2a_ids = ["http://endpoint-b.com/"]
task_ids_used = []
def mock_execute_a2a_delegation(**kwargs):
"""Track the task_id used for each delegation."""
task_ids_used.append(kwargs.get("task_id"))
endpoint = kwargs.get("endpoint")
# Create a mock message with a task_id
mock_message = MagicMock()
mock_message.task_id = f"task-id-for-{endpoint}"
mock_message.context_id = None
return {
"status": "completed",
"result": f"Done by {endpoint}",
"history": [mock_message],
}
with (
patch(
"crewai.a2a.wrapper.execute_a2a_delegation",
side_effect=mock_execute_a2a_delegation,
) as mock_execute,
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card_a = MagicMock()
mock_card_a.name = "Agent A"
mock_card_b = MagicMock()
mock_card_b.name = "Agent B"
mock_fetch.return_value = (
{
"http://endpoint-a.com/": mock_card_a,
"http://endpoint-b.com/": mock_card_b,
},
{},
)
# First delegation to endpoint A
result_a = _delegate_to_a2a(
self=agent,
agent_response=MockResponseA(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={
"http://endpoint-a.com/": mock_card_a,
"http://endpoint-b.com/": mock_card_b,
},
original_task_description="test",
)
assert result_a == "Done by http://endpoint-a.com/"
# Second delegation to endpoint B
result_b = _delegate_to_a2a(
self=agent,
agent_response=MockResponseB(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={
"http://endpoint-a.com/": mock_card_a,
"http://endpoint-b.com/": mock_card_b,
},
original_task_description="test",
)
assert result_b == "Done by http://endpoint-b.com/"
# Verify that the second delegation used a different (None) task_id
# The first call should have task_id=None (no prior task_id for endpoint A)
# The second call should also have task_id=None (no prior task_id for endpoint B)
assert len(task_ids_used) == 2
assert task_ids_used[0] is None # First delegation to endpoint A
assert task_ids_used[1] is None # Second delegation to endpoint B (not reusing A's task_id)
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_completed_task_ids_are_not_persisted_for_reuse():
"""Completed task IDs should NOT be persisted for reuse.
The A2A protocol rejects task IDs that are in terminal state (completed/failed).
This test verifies that completed task IDs are not stored in task.config
for future delegations, so each new delegation gets a fresh task_id.
"""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai import Agent, Task
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=True,
)
agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)
task = Task(description="test", expected_output="test", agent=agent)
class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]
task_ids_used = []
def mock_execute_a2a_delegation(**kwargs):
"""Track the task_id used for each call."""
task_ids_used.append(kwargs.get("task_id"))
# Create a mock message with a task_id
mock_message = MagicMock()
mock_message.task_id = "completed-task-id"
mock_message.context_id = None
return {
"status": "completed",
"result": "Done",
"history": [mock_message],
}
with (
patch(
"crewai.a2a.wrapper.execute_a2a_delegation",
side_effect=mock_execute_a2a_delegation,
),
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = MagicMock()
mock_card.name = "Test"
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
# First delegation
_delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)
# Verify that completed task IDs are NOT stored in a2a_task_ids_by_endpoint
# because they can't be reused (A2A protocol rejects terminal state task IDs)
if task.config is not None:
a2a_task_ids = task.config.get("a2a_task_ids_by_endpoint", {})
# The endpoint should NOT have a stored task_id since it completed
assert "http://test-endpoint.com/" not in a2a_task_ids
# Second delegation to the SAME endpoint should also get a fresh task_id
_delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)
# Verify that BOTH calls used None as task_id (fresh task for each)
# because completed task IDs are not persisted
assert len(task_ids_used) == 2
assert task_ids_used[0] is None # First call - new conversation
assert task_ids_used[1] is None # Second call - also new (completed IDs not reused)
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_reference_task_ids_are_tracked_for_completed_tasks():
"""Completed task IDs should be added to reference_task_ids for tracking.
While completed task IDs can't be reused for new delegations, they should
still be tracked in reference_task_ids for context/history purposes.
"""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai import Agent, Task
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=True,
)
agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)
task = Task(description="test", expected_output="test", agent=agent)
class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]
def mock_execute_a2a_delegation(**kwargs):
mock_message = MagicMock()
mock_message.task_id = "unique-task-id-123"
mock_message.context_id = None
return {
"status": "completed",
"result": "Done",
"history": [mock_message],
}
with (
patch(
"crewai.a2a.wrapper.execute_a2a_delegation",
side_effect=mock_execute_a2a_delegation,
),
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = MagicMock()
mock_card.name = "Test"
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
_delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)
# Verify the completed task_id is tracked in reference_task_ids
assert task.config is not None
assert "reference_task_ids" in task.config
assert "unique-task-id-123" in task.config["reference_task_ids"]

View File

@@ -4768,220 +4768,3 @@ def test_ensure_exchanged_messages_are_propagated_to_external_memory():
assert "Researcher" in messages[0]["content"]
assert messages[1]["role"] == "user"
assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"]
def test_async_task_token_tracking_uses_per_agent_lock():
"""Test that async tasks from the same agent use per-agent locks for accurate token tracking.
This test verifies the fix for the race condition described in issue #4168:
When multiple tasks with async_execution=True are executed by the same agent,
the per-agent lock ensures that token tracking is accurate by serializing
task execution and capturing tokens_before/tokens_after inside the thread.
"""
from crewai.types.usage_metrics import TaskTokenMetrics
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="You are a researcher",
allow_delegation=False,
)
task1 = Task(
description="Research topic 1",
expected_output="Research output 1",
agent=agent,
async_execution=True,
)
task2 = Task(
description="Research topic 2",
expected_output="Research output 2",
agent=agent,
async_execution=True,
)
task3 = Task(
description="Summarize research",
expected_output="Summary",
agent=agent,
async_execution=False,
)
crew = Crew(agents=[agent], tasks=[task1, task2, task3])
mock_output = TaskOutput(
description="Test output",
raw="Test result",
agent="Researcher",
)
execution_order = []
lock_acquisitions = []
original_execute_core = Task._execute_core
def mock_execute_core(self, agent, context, tools):
execution_order.append(self.description)
return mock_output
with patch.object(Task, "_execute_core", mock_execute_core):
with patch.object(
crew,
"_get_agent_token_usage",
side_effect=[
UsageMetrics(total_tokens=100, prompt_tokens=80, completion_tokens=20, successful_requests=1),
UsageMetrics(total_tokens=150, prompt_tokens=120, completion_tokens=30, successful_requests=2),
UsageMetrics(total_tokens=150, prompt_tokens=120, completion_tokens=30, successful_requests=2),
UsageMetrics(total_tokens=200, prompt_tokens=160, completion_tokens=40, successful_requests=3),
UsageMetrics(total_tokens=200, prompt_tokens=160, completion_tokens=40, successful_requests=3),
UsageMetrics(total_tokens=250, prompt_tokens=200, completion_tokens=50, successful_requests=4),
]
):
result = crew.kickoff()
assert len(result.tasks_output) == 3
for task_output in result.tasks_output:
if hasattr(task_output, 'usage_metrics') and task_output.usage_metrics:
assert isinstance(task_output.usage_metrics, TaskTokenMetrics)
def test_async_task_token_callback_captures_tokens_inside_thread():
"""Test that token capture callback is called inside the thread for async tasks.
This verifies that tokens_before and tokens_after are captured inside the thread
(after acquiring the lock), not when the task is queued.
"""
from concurrent.futures import Future
import time
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="You are a researcher",
allow_delegation=False,
)
task = Task(
description="Research topic",
expected_output="Research output",
agent=agent,
)
callback_call_times = []
callback_thread_ids = []
main_thread_id = threading.current_thread().ident
def token_callback():
callback_call_times.append(time.time())
callback_thread_ids.append(threading.current_thread().ident)
return UsageMetrics(total_tokens=100, prompt_tokens=80, completion_tokens=20, successful_requests=1)
mock_output = TaskOutput(
description="Test output",
raw="Test result",
agent="Researcher",
)
with patch.object(Task, "_execute_core", return_value=mock_output):
lock = threading.Lock()
future = task.execute_async(
agent=agent,
context=None,
tools=None,
token_capture_callback=token_callback,
agent_execution_lock=lock,
)
result = future.result(timeout=10)
assert isinstance(result, tuple)
assert len(result) == 3
task_output, tokens_before, tokens_after = result
assert len(callback_call_times) == 2
assert len(callback_thread_ids) == 2
for thread_id in callback_thread_ids:
assert thread_id != main_thread_id
assert callback_thread_ids[0] == callback_thread_ids[1]
def test_async_task_per_agent_lock_serializes_execution():
"""Test that per-agent lock serializes async task execution for the same agent.
This test verifies that when multiple async tasks from the same agent are executed,
the per-agent lock ensures they run one at a time (serialized), not concurrently.
"""
import time
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="You are a researcher",
allow_delegation=False,
)
task1 = Task(
description="Research topic 1",
expected_output="Research output 1",
agent=agent,
)
task2 = Task(
description="Research topic 2",
expected_output="Research output 2",
agent=agent,
)
execution_times = []
mock_output = TaskOutput(
description="Test output",
raw="Test result",
agent="Researcher",
)
def slow_execute_core(self, agent, context, tools):
start_time = time.time()
time.sleep(0.1)
end_time = time.time()
execution_times.append((start_time, end_time))
return mock_output
with patch.object(Task, "_execute_core", slow_execute_core):
lock = threading.Lock()
def token_callback():
return UsageMetrics(total_tokens=100, prompt_tokens=80, completion_tokens=20, successful_requests=1)
future1 = task1.execute_async(
agent=agent,
context=None,
tools=None,
token_capture_callback=token_callback,
agent_execution_lock=lock,
)
future2 = task2.execute_async(
agent=agent,
context=None,
tools=None,
token_capture_callback=token_callback,
agent_execution_lock=lock,
)
result1 = future1.result(timeout=10)
result2 = future2.result(timeout=10)
assert len(execution_times) == 2
start1, end1 = execution_times[0]
start2, end2 = execution_times[1]
if start1 < start2:
assert end1 <= start2, "Tasks should not overlap when using the same lock"
else:
assert end2 <= start1, "Tasks should not overlap when using the same lock"