From 1c4f44af808c32fff8fb01c3a23bc0594d365d3e Mon Sep 17 00:00:00 2001 From: Vidit Ostwal <110953813+Vidit-Ostwal@users.noreply.github.com> Date: Thu, 8 Jan 2026 00:12:27 +0530 Subject: [PATCH 1/3] Adding usage info in llm.py (#4172) * Adding usage info everywhere * Changing the check * Changing the logic * Adding tests * Adding casellets * Minor change * Fixing testcase * remove the duplicated test case, thanks to cursor * Adding async test cases * Updating test case --------- Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> --- lib/crewai/src/crewai/llm.py | 28 ++- ...t_usage_info_non_streaming_with_acall.yaml | 113 +++++++++++ ...st_usage_info_non_streaming_with_call.yaml | 113 +++++++++++ .../test_usage_info_streaming_with_acall.yaml | 179 ++++++++++++++++++ .../test_usage_info_streaming_with_call.yaml | 179 ++++++++++++++++++ lib/crewai/tests/test_llm.py | 113 +++++++++++ 6 files changed, 715 insertions(+), 10 deletions(-) create mode 100644 lib/crewai/tests/cassettes/test_usage_info_non_streaming_with_acall.yaml create mode 100644 lib/crewai/tests/cassettes/test_usage_info_non_streaming_with_call.yaml create mode 100644 lib/crewai/tests/cassettes/test_usage_info_streaming_with_acall.yaml create mode 100644 lib/crewai/tests/cassettes/test_usage_info_streaming_with_call.yaml diff --git a/lib/crewai/src/crewai/llm.py b/lib/crewai/src/crewai/llm.py index 77053deeb..8bc1fe648 100644 --- a/lib/crewai/src/crewai/llm.py +++ b/lib/crewai/src/crewai/llm.py @@ -925,11 +925,12 @@ class LLM(BaseLLM): except Exception as e: logging.debug(f"Error checking for tool calls: {e}") + # Track token usage and log callbacks if available in streaming mode + if usage_info: + self._track_token_usage_internal(usage_info) + self._handle_streaming_callbacks(callbacks, usage_info, last_chunk) + if not tool_calls or not available_functions: - # Track token usage and log callbacks if available in streaming mode - if usage_info: - self._track_token_usage_internal(usage_info) - self._handle_streaming_callbacks(callbacks, usage_info, last_chunk) if response_model and self.is_litellm: instructor_instance = InternalInstructor( @@ -962,12 +963,7 @@ class LLM(BaseLLM): if tool_result is not None: return tool_result - # --- 10) Track token usage and log callbacks if available in streaming mode - if usage_info: - self._track_token_usage_internal(usage_info) - self._handle_streaming_callbacks(callbacks, usage_info, last_chunk) - - # --- 11) Emit completion event and return response + # --- 10) Emit completion event and return response self._handle_emit_call_events( response=full_response, call_type=LLMCallType.LLM_CALL, @@ -1148,6 +1144,10 @@ class LLM(BaseLLM): if response_model: params["response_model"] = response_model response = litellm.completion(**params) + + if hasattr(response,"usage") and not isinstance(response.usage, type) and response.usage: + usage_info = response.usage + self._track_token_usage_internal(usage_info) except ContextWindowExceededError as e: # Convert litellm's context window error to our own exception type @@ -1273,6 +1273,10 @@ class LLM(BaseLLM): params["response_model"] = response_model response = await litellm.acompletion(**params) + if hasattr(response,"usage") and not isinstance(response.usage, type) and response.usage: + usage_info = response.usage + self._track_token_usage_internal(usage_info) + except ContextWindowExceededError as e: raise LLMContextLengthExceededError(str(e)) from e @@ -1359,6 +1363,7 @@ class LLM(BaseLLM): """ full_response = "" chunk_count = 0 
+ usage_info = None accumulated_tool_args: defaultdict[int, AccumulatedToolArgs] = defaultdict( @@ -1444,6 +1449,9 @@ class LLM(BaseLLM): end_time=0, ) + if usage_info: + self._track_token_usage_internal(usage_info) + if accumulated_tool_args and available_functions: # Convert accumulated tool args to ChatCompletionDeltaToolCall objects tool_calls_list: list[ChatCompletionDeltaToolCall] = [ diff --git a/lib/crewai/tests/cassettes/test_usage_info_non_streaming_with_acall.yaml b/lib/crewai/tests/cassettes/test_usage_info_non_streaming_with_acall.yaml new file mode 100644 index 000000000..b123e9f99 --- /dev/null +++ b/lib/crewai/tests/cassettes/test_usage_info_non_streaming_with_acall.yaml @@ -0,0 +1,113 @@ +interactions: +- request: + body: '{"messages":[{"role":"user","content":"Tell me a joke."}],"model":"gpt-4o-mini","stop":[]}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '90' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - async:asyncio + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 2.14.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.11.14 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: "{\n \"id\": \"chatcmpl-CvErx9mbnUKFHKkhPChO93eUzKJqy\",\n \"object\": + \"chat.completion\",\n \"created\": 1767757889,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n + \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": + \"assistant\",\n \"content\": \"Why did the scarecrow win an award? 
+ \\n\\nBecause he was outstanding in his field!\",\n \"refusal\": null,\n + \ \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": + \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 12,\n \"completion_tokens\": + 18,\n \"total_tokens\": 30,\n \"prompt_tokens_details\": {\n \"cached_tokens\": + 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": + {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": + 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": + \"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n" + headers: + Access-Control-Expose-Headers: + - ACCESS-CONTROL-XXX + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Wed, 07 Jan 2026 03:51:29 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + content-length: + - '887' + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '466' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '483' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/cassettes/test_usage_info_non_streaming_with_call.yaml b/lib/crewai/tests/cassettes/test_usage_info_non_streaming_with_call.yaml new file mode 100644 index 000000000..ec0f25047 --- /dev/null +++ b/lib/crewai/tests/cassettes/test_usage_info_non_streaming_with_call.yaml @@ -0,0 +1,113 @@ +interactions: +- request: + body: '{"messages":[{"role":"user","content":"Tell me a joke."}],"model":"gpt-4o-mini","stop":[]}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '90' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 2.14.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.11.14 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: "{\n \"id\": \"chatcmpl-CugAsv9iAHdiGddGDHcZWEp7ZV7cB\",\n \"object\": + \"chat.completion\",\n \"created\": 1767624522,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n + \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": + \"assistant\",\n \"content\": \"Why don't skeletons fight each other? 
+ \\n\\nThey don't have the guts!\",\n \"refusal\": null,\n \"annotations\": + []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n + \ }\n ],\n \"usage\": {\n \"prompt_tokens\": 12,\n \"completion_tokens\": + 15,\n \"total_tokens\": 27,\n \"prompt_tokens_details\": {\n \"cached_tokens\": + 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": + {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": + 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": + \"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n" + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Mon, 05 Jan 2026 14:48:43 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + content-length: + - '874' + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '424' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '1017' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/cassettes/test_usage_info_streaming_with_acall.yaml b/lib/crewai/tests/cassettes/test_usage_info_streaming_with_acall.yaml new file mode 100644 index 000000000..c2257d581 --- /dev/null +++ b/lib/crewai/tests/cassettes/test_usage_info_streaming_with_acall.yaml @@ -0,0 +1,179 @@ +interactions: +- request: + body: '{"messages":[{"role":"user","content":"Tell me a joke."}],"model":"gpt-4o-mini","stop":[],"stream":true,"stream_options":{"include_usage":true}}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '144' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - async:asyncio + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 2.14.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.11.14 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: 'data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"k9LESwMhk"} + + + data: 
{"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"Why"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"tYMBX9z8"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + did"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"X5lpC48"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + the"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"Ns5pnmO"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + scare"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"cUTYl"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"crow"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"ZvHPszH"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + win"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"pLKQ5rM"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + an"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"Yl8vxgvM"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + award"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"xfxd0"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"SFxdiZP3Uh"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + \n\n"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"Sysruv"} + + + data: 
{"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"Because"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"OeZH"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + he"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"epBJpPYm"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + was"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"5Bofkug"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + outstanding"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"ylIDIBTCqSLy3tA"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + in"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"lLi2lQc4"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + his"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"fi47Jij"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + field"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"Kkiyw"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"RMcUfqa93e"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":null,"obfuscation":"rAtJI"} + + + data: {"id":"chatcmpl-CvF96exJN1ZmQQ0zfOWhGs2UqetwZ","object":"chat.completion.chunk","created":1767758952,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[],"usage":{"prompt_tokens":12,"completion_tokens":18,"total_tokens":30,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}},"obfuscation":"aTyTRaiahL"} + + + data: [DONE] + + + ' + headers: 
+ Access-Control-Expose-Headers: + - ACCESS-CONTROL-XXX + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Wed, 07 Jan 2026 04:09:13 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '243' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '645' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/cassettes/test_usage_info_streaming_with_call.yaml b/lib/crewai/tests/cassettes/test_usage_info_streaming_with_call.yaml new file mode 100644 index 000000000..dc10b77ec --- /dev/null +++ b/lib/crewai/tests/cassettes/test_usage_info_streaming_with_call.yaml @@ -0,0 +1,179 @@ +interactions: +- request: + body: '{"messages":[{"role":"user","content":"Tell me a joke."}],"model":"gpt-4o-mini","stop":[],"stream":true,"stream_options":{"include_usage":true}}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '144' + content-type: + - application/json + cookie: + - COOKIE-XXX + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 2.14.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.11.14 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: 'data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"SVnFynat2"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"Why"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"M0Y4Qurw"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + did"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"LknkzkM"} + + + data: 
{"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + the"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"45ePnqI"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + scare"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"DsJ1r"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"crow"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"9sXjMg0"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + win"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"UlTRXCu"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + an"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"He218dPh"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + award"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"CO1Dc"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"nHS3XxEjuW"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + \n\n"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"IhBQDR"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"Because"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"TJzX"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + he"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"AjRyStfn"} + + + data: 
{"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + was"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"2AZtzyA"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + outstanding"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"XfziOItr8wziIap"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + in"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"7hXp54s6"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + his"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"RPmgnK3"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + field"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"uqtNk"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"Wziup4uj7N"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":null,"obfuscation":"q9paY"} + + + data: {"id":"chatcmpl-CugAuE9ctOkFjqIbmxWZpxeNX7gWt","object":"chat.completion.chunk","created":1767624524,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[],"usage":{"prompt_tokens":12,"completion_tokens":18,"total_tokens":30,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}},"obfuscation":"TWmOWpZx0s"} + + + data: [DONE] + + + ' + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Mon, 05 Jan 2026 14:48:44 GMT + Server: + - cloudflare + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '227' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '645' + x-openai-proxy-wasm: + - v0.1 + 
x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/test_llm.py b/lib/crewai/tests/test_llm.py index 6f3bcd70a..a8b6a7a3f 100644 --- a/lib/crewai/tests/test_llm.py +++ b/lib/crewai/tests/test_llm.py @@ -877,3 +877,116 @@ def test_validate_model_in_constants(): LLM._validate_model_in_constants("anthropic.claude-future-v1:0", "bedrock") is True ) + +@pytest.mark.vcr(record_mode="once",decode_compressed_response=True) +def test_usage_info_non_streaming_with_call(): + llm = LLM(model="gpt-4o-mini", is_litellm=True) + assert llm._token_usage == { + "total_tokens": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "successful_requests": 0, + "cached_prompt_tokens": 0, + } + assert llm.stream is False + + with patch.object( + llm, "_handle_non_streaming_response", wraps=llm._handle_non_streaming_response + ) as mock_handle: + llm.call("Tell me a joke.") + mock_handle.assert_called_once() + + assert llm._token_usage["total_tokens"] > 0 + assert llm._token_usage["prompt_tokens"] > 0 + assert llm._token_usage["completion_tokens"] > 0 + assert llm._token_usage["successful_requests"] == 1 + + +@pytest.mark.vcr(record_mode="once",decode_compressed_response=True) +def test_usage_info_streaming_with_call(): + llm = LLM(model="gpt-4o-mini", is_litellm=True, stream=True) + assert llm._token_usage == { + "total_tokens": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "successful_requests": 0, + "cached_prompt_tokens": 0, + } + assert llm.stream is True + + with patch.object( + llm, "_handle_streaming_response", wraps=llm._handle_streaming_response + ) as mock_handle: + llm.call("Tell me a joke.") + mock_handle.assert_called_once() + + assert llm._token_usage["total_tokens"] > 0 + assert llm._token_usage["prompt_tokens"] > 0 + assert llm._token_usage["completion_tokens"] > 0 + assert llm._token_usage["successful_requests"] == 1 + + +@pytest.mark.asyncio +@pytest.mark.vcr(record_mode="once",decode_compressed_response=True,match_on=["method", "scheme", "host", "path", "body"]) +async def test_usage_info_non_streaming_with_acall(): + llm = LLM( + model="openai/gpt-4o-mini", + is_litellm=True, + stream=False, + ) + + # sanity check + assert llm._token_usage == { + "total_tokens": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "successful_requests": 0, + "cached_prompt_tokens": 0, + } + + with patch.object( + llm, "_ahandle_non_streaming_response", wraps=llm._ahandle_non_streaming_response + ) as mock_handle: + result = await llm.acall("Tell me a joke.") + mock_handle.assert_called_once() + + # token usage assertions (robust) + assert llm._token_usage["successful_requests"] == 1 + assert llm._token_usage["prompt_tokens"] > 0 + assert llm._token_usage["completion_tokens"] > 0 + assert llm._token_usage["total_tokens"] > 0 + + assert len(result) > 0 + + +@pytest.mark.asyncio +@pytest.mark.vcr(record_mode="none",decode_compressed_response=True,match_on=["method", "scheme", "host", "path", "body"]) +async def test_usage_info_streaming_with_acall(): + llm = LLM( + model="gpt-4o-mini", + is_litellm=True, + stream=True, + ) + + assert llm.stream is True + 
assert llm._token_usage == { + "total_tokens": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "successful_requests": 0, + "cached_prompt_tokens": 0, + } + + with patch.object( + llm, "_ahandle_streaming_response", wraps=llm._ahandle_streaming_response + ) as mock_handle: + result = await llm.acall("Tell me a joke.") + mock_handle.assert_called_once() + + + assert llm._token_usage["successful_requests"] == 1 + assert llm._token_usage["prompt_tokens"] > 0 + assert llm._token_usage["completion_tokens"] > 0 + assert llm._token_usage["total_tokens"] > 0 + + assert len(result) > 0 \ No newline at end of file From a760923c50817ae1fd497e069eb68babaf30de1d Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 7 Jan 2026 14:01:36 -0500 Subject: [PATCH 2/3] fix: handle missing a2a dep as optional --- lib/crewai/src/crewai/a2a/__init__.py | 10 ---------- lib/crewai/src/crewai/a2a/config.py | 17 ++++++++++++++--- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/lib/crewai/src/crewai/a2a/__init__.py b/lib/crewai/src/crewai/a2a/__init__.py index e5065ddd9..288e805f6 100644 --- a/lib/crewai/src/crewai/a2a/__init__.py +++ b/lib/crewai/src/crewai/a2a/__init__.py @@ -1,18 +1,8 @@ """Agent-to-Agent (A2A) protocol communication module for CrewAI.""" from crewai.a2a.config import A2AConfig -from crewai.a2a.errors import A2APollingTimeoutError -from crewai.a2a.updates import ( - PollingConfig, - PushNotificationConfig, - StreamingConfig, -) __all__ = [ "A2AConfig", - "A2APollingTimeoutError", - "PollingConfig", - "PushNotificationConfig", - "StreamingConfig", ] diff --git a/lib/crewai/src/crewai/a2a/config.py b/lib/crewai/src/crewai/a2a/config.py index d7ac85bae..26d3ce121 100644 --- a/lib/crewai/src/crewai/a2a/config.py +++ b/lib/crewai/src/crewai/a2a/config.py @@ -5,7 +5,7 @@ This module is separate from experimental.a2a to avoid circular imports. from __future__ import annotations -from typing import Annotated, ClassVar +from typing import Annotated, Any, ClassVar from pydantic import ( BaseModel, @@ -17,7 +17,12 @@ from pydantic import ( ) from crewai.a2a.auth.schemas import AuthScheme -from crewai.a2a.updates import StreamingConfig, UpdateConfig + + +try: + from crewai.a2a.updates import UpdateConfig +except ImportError: + UpdateConfig = Any # type: ignore[misc,assignment] http_url_adapter = TypeAdapter(HttpUrl) @@ -30,6 +35,12 @@ Url = Annotated[ ] +def _get_default_update_config() -> UpdateConfig: + from crewai.a2a.updates import StreamingConfig + + return StreamingConfig() + + class A2AConfig(BaseModel): """Configuration for A2A protocol integration. 
@@ -68,6 +79,6 @@ class A2AConfig(BaseModel): description="If True, return A2A result directly when completed", ) updates: UpdateConfig = Field( - default_factory=StreamingConfig, + default_factory=_get_default_update_config, description="Update mechanism config", ) From 67953b3a6a57d0f8b18474aea8f8835a5fcd99fd Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 7 Jan 2026 14:07:40 -0500 Subject: [PATCH 3/3] feat: a2a native async chain --- lib/crewai/src/crewai/a2a/types.py | 23 + lib/crewai/src/crewai/a2a/utils.py | 346 +++++++----- lib/crewai/src/crewai/a2a/wrapper.py | 792 +++++++++++++++++++++------ 3 files changed, 852 insertions(+), 309 deletions(-) diff --git a/lib/crewai/src/crewai/a2a/types.py b/lib/crewai/src/crewai/a2a/types.py index fca22d8bb..217b59467 100644 --- a/lib/crewai/src/crewai/a2a/types.py +++ b/lib/crewai/src/crewai/a2a/types.py @@ -4,6 +4,16 @@ from typing import Any, Literal, Protocol, TypedDict, runtime_checkable from typing_extensions import NotRequired +from crewai.a2a.updates import ( + PollingConfig, + PollingHandler, + PushNotificationConfig, + PushNotificationHandler, + StreamingConfig, + StreamingHandler, + UpdateConfig, +) + @runtime_checkable class AgentResponseProtocol(Protocol): @@ -36,3 +46,16 @@ class PartsDict(TypedDict): text: str metadata: NotRequired[PartsMetadataDict] + + +PollingHandlerType = type[PollingHandler] +StreamingHandlerType = type[StreamingHandler] +PushNotificationHandlerType = type[PushNotificationHandler] + +HandlerType = PollingHandlerType | StreamingHandlerType | PushNotificationHandlerType + +HANDLER_REGISTRY: dict[type[UpdateConfig], HandlerType] = { + PollingConfig: PollingHandler, + StreamingConfig: StreamingHandler, + PushNotificationConfig: PushNotificationHandler, +} diff --git a/lib/crewai/src/crewai/a2a/utils.py b/lib/crewai/src/crewai/a2a/utils.py index 42f5f44e0..4b3ba23e9 100644 --- a/lib/crewai/src/crewai/a2a/utils.py +++ b/lib/crewai/src/crewai/a2a/utils.py @@ -34,13 +34,15 @@ from crewai.a2a.auth.utils import ( ) from crewai.a2a.config import A2AConfig from crewai.a2a.task_helpers import TaskStateResult -from crewai.a2a.types import PartsDict, PartsMetadataDict +from crewai.a2a.types import ( + HANDLER_REGISTRY, + HandlerType, + PartsDict, + PartsMetadataDict, +) from crewai.a2a.updates import ( PollingConfig, - PollingHandler, PushNotificationConfig, - PushNotificationHandler, - StreamingConfig, StreamingHandler, UpdateConfig, ) @@ -60,17 +62,6 @@ if TYPE_CHECKING: from crewai.a2a.auth.schemas import AuthScheme -HandlerType = ( - type[PollingHandler] | type[StreamingHandler] | type[PushNotificationHandler] -) - -HANDLER_REGISTRY: dict[type[UpdateConfig], HandlerType] = { - PollingConfig: PollingHandler, - StreamingConfig: StreamingHandler, - PushNotificationConfig: PushNotificationHandler, -} - - def get_handler(config: UpdateConfig | None) -> HandlerType: """Get the handler class for a given update config. @@ -92,24 +83,14 @@ def _fetch_agent_card_cached( timeout: int, _ttl_hash: int, ) -> AgentCard: - """Cached version of fetch_agent_card with auth support. 
- - Args: - endpoint: A2A agent endpoint URL - auth_hash: Hash of the auth object - timeout: Request timeout - _ttl_hash: Time-based hash for cache invalidation - - Returns: - Cached AgentCard - """ + """Cached sync version of fetch_agent_card.""" auth = _auth_store.get(auth_hash) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete( - _fetch_agent_card_async(endpoint=endpoint, auth=auth, timeout=timeout) + _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) ) finally: loop.close() @@ -159,47 +140,74 @@ def fetch_agent_card( asyncio.set_event_loop(loop) try: return loop.run_until_complete( - _fetch_agent_card_async(endpoint=endpoint, auth=auth, timeout=timeout) + afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout) ) finally: loop.close() +async def afetch_agent_card( + endpoint: str, + auth: AuthScheme | None = None, + timeout: int = 30, + use_cache: bool = True, +) -> AgentCard: + """Fetch AgentCard from an A2A endpoint asynchronously. + + Native async implementation. Use this when running in an async context. + + Args: + endpoint: A2A agent endpoint URL (AgentCard URL). + auth: Optional AuthScheme for authentication. + timeout: Request timeout in seconds. + use_cache: Whether to use caching (default True). + + Returns: + AgentCard object with agent capabilities and skills. + + Raises: + httpx.HTTPStatusError: If the request fails. + A2AClientHTTPError: If authentication fails. + """ + if use_cache: + if auth: + auth_data = auth.model_dump_json( + exclude={ + "_access_token", + "_token_expires_at", + "_refresh_token", + "_authorization_callback", + } + ) + auth_hash = hash((type(auth).__name__, auth_data)) + else: + auth_hash = 0 + _auth_store[auth_hash] = auth + agent_card: AgentCard = await _afetch_agent_card_cached( + endpoint, auth_hash, timeout + ) + return agent_card + + return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) + + @cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator] -async def _fetch_agent_card_async_cached( +async def _afetch_agent_card_cached( endpoint: str, auth_hash: int, timeout: int, ) -> AgentCard: - """Cached async implementation of AgentCard fetching. - - Args: - endpoint: A2A agent endpoint URL - auth_hash: Hash of the auth object - timeout: Request timeout in seconds - - Returns: - Cached AgentCard object - """ + """Cached async implementation of AgentCard fetching.""" auth = _auth_store.get(auth_hash) - return await _fetch_agent_card_async(endpoint=endpoint, auth=auth, timeout=timeout) + return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) -async def _fetch_agent_card_async( +async def _afetch_agent_card_impl( endpoint: str, auth: AuthScheme | None, timeout: int, ) -> AgentCard: - """Async implementation of AgentCard fetching. - - Args: - endpoint: A2A agent endpoint URL - auth: Optional AuthScheme for authentication - timeout: Request timeout in seconds - - Returns: - AgentCard object - """ + """Internal async implementation of AgentCard fetching.""" if "/.well-known/agent-card.json" in endpoint: base_url = endpoint.replace("/.well-known/agent-card.json", "") agent_card_path = "/.well-known/agent-card.json" @@ -268,35 +276,114 @@ def execute_a2a_delegation( turn_number: int | None = None, updates: UpdateConfig | None = None, ) -> TaskStateResult: - """Execute a task delegation to a remote A2A agent with multi-turn support. + """Execute a task delegation to a remote A2A agent synchronously. 
+ + This is the sync wrapper around aexecute_a2a_delegation. For async contexts, + use aexecute_a2a_delegation directly. Args: - endpoint: A2A agent endpoint URL - auth: Optional AuthScheme for authentication - timeout: Request timeout in seconds - task_description: The task to delegate - context: Optional context information - context_id: Context ID for correlating messages/tasks - task_id: Specific task identifier - reference_task_ids: List of related task IDs - metadata: Additional metadata - extensions: Protocol extensions for custom fields - conversation_history: Previous Message objects from conversation - agent_id: Agent identifier for logging - agent_role: Role of the CrewAI agent delegating the task - agent_branch: Optional agent tree branch for logging - response_model: Optional Pydantic model for structured outputs - turn_number: Optional turn number for multi-turn conversations - updates: Update mechanism config from A2AConfig.updates + endpoint: A2A agent endpoint URL. + auth: Optional AuthScheme for authentication. + timeout: Request timeout in seconds. + task_description: The task to delegate. + context: Optional context information. + context_id: Context ID for correlating messages/tasks. + task_id: Specific task identifier. + reference_task_ids: List of related task IDs. + metadata: Additional metadata. + extensions: Protocol extensions for custom fields. + conversation_history: Previous Message objects from conversation. + agent_id: Agent identifier for logging. + agent_role: Role of the CrewAI agent delegating the task. + agent_branch: Optional agent tree branch for logging. + response_model: Optional Pydantic model for structured outputs. + turn_number: Optional turn number for multi-turn conversations. + updates: Update mechanism config from A2AConfig.updates. Returns: - TaskStateResult with status, result/error, history, and agent_card + TaskStateResult with status, result/error, history, and agent_card. """ - is_multiturn = bool(conversation_history and len(conversation_history) > 0) - if turn_number is None: - turn_number = ( - len([m for m in (conversation_history or []) if m.role == Role.user]) + 1 + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete( + aexecute_a2a_delegation( + endpoint=endpoint, + auth=auth, + timeout=timeout, + task_description=task_description, + context=context, + context_id=context_id, + task_id=task_id, + reference_task_ids=reference_task_ids, + metadata=metadata, + extensions=extensions, + conversation_history=conversation_history, + agent_id=agent_id, + agent_role=agent_role, + agent_branch=agent_branch, + response_model=response_model, + turn_number=turn_number, + updates=updates, + ) ) + finally: + loop.close() + + +async def aexecute_a2a_delegation( + endpoint: str, + auth: AuthScheme | None, + timeout: int, + task_description: str, + context: str | None = None, + context_id: str | None = None, + task_id: str | None = None, + reference_task_ids: list[str] | None = None, + metadata: dict[str, Any] | None = None, + extensions: dict[str, Any] | None = None, + conversation_history: list[Message] | None = None, + agent_id: str | None = None, + agent_role: Role | None = None, + agent_branch: Any | None = None, + response_model: type[BaseModel] | None = None, + turn_number: int | None = None, + updates: UpdateConfig | None = None, +) -> TaskStateResult: + """Execute a task delegation to a remote A2A agent asynchronously. + + Native async implementation with multi-turn support. 
Use this when running + in an async context (e.g., with Crew.akickoff() or agent.aexecute_task()). + + Args: + endpoint: A2A agent endpoint URL. + auth: Optional AuthScheme for authentication. + timeout: Request timeout in seconds. + task_description: The task to delegate. + context: Optional context information. + context_id: Context ID for correlating messages/tasks. + task_id: Specific task identifier. + reference_task_ids: List of related task IDs. + metadata: Additional metadata. + extensions: Protocol extensions for custom fields. + conversation_history: Previous Message objects from conversation. + agent_id: Agent identifier for logging. + agent_role: Role of the CrewAI agent delegating the task. + agent_branch: Optional agent tree branch for logging. + response_model: Optional Pydantic model for structured outputs. + turn_number: Optional turn number for multi-turn conversations. + updates: Update mechanism config from A2AConfig.updates. + + Returns: + TaskStateResult with status, result/error, history, and agent_card. + """ + if conversation_history is None: + conversation_history = [] + + is_multiturn = len(conversation_history) > 0 + if turn_number is None: + turn_number = len([m for m in conversation_history if m.role == Role.user]) + 1 + crewai_event_bus.emit( agent_branch, A2ADelegationStartedEvent( @@ -308,48 +395,41 @@ def execute_a2a_delegation( ), ) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - result = loop.run_until_complete( - _execute_a2a_delegation_async( - endpoint=endpoint, - auth=auth, - timeout=timeout, - task_description=task_description, - context=context, - context_id=context_id, - task_id=task_id, - reference_task_ids=reference_task_ids, - metadata=metadata, - extensions=extensions, - conversation_history=conversation_history or [], - is_multiturn=is_multiturn, - turn_number=turn_number, - agent_branch=agent_branch, - agent_id=agent_id, - agent_role=agent_role, - response_model=response_model, - updates=updates, - ) - ) + result = await _aexecute_a2a_delegation_impl( + endpoint=endpoint, + auth=auth, + timeout=timeout, + task_description=task_description, + context=context, + context_id=context_id, + task_id=task_id, + reference_task_ids=reference_task_ids, + metadata=metadata, + extensions=extensions, + conversation_history=conversation_history, + is_multiturn=is_multiturn, + turn_number=turn_number, + agent_branch=agent_branch, + agent_id=agent_id, + agent_role=agent_role, + response_model=response_model, + updates=updates, + ) - crewai_event_bus.emit( - agent_branch, - A2ADelegationCompletedEvent( - status=result["status"], - result=result.get("result"), - error=result.get("error"), - is_multiturn=is_multiturn, - ), - ) + crewai_event_bus.emit( + agent_branch, + A2ADelegationCompletedEvent( + status=result["status"], + result=result.get("result"), + error=result.get("error"), + is_multiturn=is_multiturn, + ), + ) - return result - finally: - loop.close() + return result -async def _execute_a2a_delegation_async( +async def _aexecute_a2a_delegation_impl( endpoint: str, auth: AuthScheme | None, timeout: int, @@ -361,39 +441,15 @@ async def _execute_a2a_delegation_async( metadata: dict[str, Any] | None, extensions: dict[str, Any] | None, conversation_history: list[Message], - is_multiturn: bool = False, - turn_number: int = 1, - agent_branch: Any | None = None, - agent_id: str | None = None, - agent_role: str | None = None, - response_model: type[BaseModel] | None = None, - updates: UpdateConfig | None = None, + is_multiturn: bool, 
+ turn_number: int, + agent_branch: Any | None, + agent_id: str | None, + agent_role: str | None, + response_model: type[BaseModel] | None, + updates: UpdateConfig | None, ) -> TaskStateResult: - """Async implementation of A2A delegation with multi-turn support. - - Args: - endpoint: A2A agent endpoint URL - auth: Optional AuthScheme for authentication - timeout: Request timeout in seconds - task_description: Task to delegate - context: Optional context - context_id: Context ID for correlation - task_id: Specific task identifier - reference_task_ids: Related task IDs - metadata: Additional metadata - extensions: Protocol extensions - conversation_history: Previous Message objects - is_multiturn: Whether this is a multi-turn conversation - turn_number: Current turn number - agent_branch: Agent tree branch for logging - agent_id: Agent identifier for logging - agent_role: Agent role for logging - response_model: Optional Pydantic model for structured outputs - updates: Update mechanism config - - Returns: - TaskStateResult with status, result/error, history, and agent_card - """ + """Internal async implementation of A2A delegation.""" if auth: auth_data = auth.model_dump_json( exclude={ @@ -407,7 +463,7 @@ async def _execute_a2a_delegation_async( else: auth_hash = 0 _auth_store[auth_hash] = auth - agent_card = await _fetch_agent_card_async_cached( + agent_card = await _afetch_agent_card_cached( endpoint=endpoint, auth_hash=auth_hash, timeout=timeout ) diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index 63c445921..358d0fc79 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -5,7 +5,8 @@ Wraps agent classes with A2A delegation capabilities. from __future__ import annotations -from collections.abc import Callable +import asyncio +from collections.abc import Callable, Coroutine from concurrent.futures import ThreadPoolExecutor, as_completed from functools import wraps from types import MethodType @@ -26,6 +27,8 @@ from crewai.a2a.templates import ( ) from crewai.a2a.types import AgentResponseProtocol from crewai.a2a.utils import ( + aexecute_a2a_delegation, + afetch_agent_card, execute_a2a_delegation, fetch_agent_card, get_a2a_agents_and_response_model, @@ -48,15 +51,15 @@ if TYPE_CHECKING: def wrap_agent_with_a2a_instance( agent: Agent, extension_registry: ExtensionRegistry | None = None ) -> None: - """Wrap an agent instance's execute_task method with A2A support. + """Wrap an agent instance's execute_task and aexecute_task methods with A2A support. This function modifies the agent instance by wrapping its execute_task - method to add A2A delegation capabilities. Should only be called when - the agent has a2a configuration set. + and aexecute_task methods to add A2A delegation capabilities. Should only + be called when the agent has a2a configuration set. Args: - agent: The agent instance to wrap - extension_registry: Optional registry of A2A extensions for injecting tools and custom logic + agent: The agent instance to wrap. + extension_registry: Optional registry of A2A extensions. 
""" if extension_registry is None: extension_registry = ExtensionRegistry() @@ -64,6 +67,7 @@ def wrap_agent_with_a2a_instance( extension_registry.inject_all_tools(agent) original_execute_task = agent.execute_task.__func__ # type: ignore[attr-defined] + original_aexecute_task = agent.aexecute_task.__func__ # type: ignore[attr-defined] @wraps(original_execute_task) def execute_task_with_a2a( @@ -72,17 +76,7 @@ def wrap_agent_with_a2a_instance( context: str | None = None, tools: list[BaseTool] | None = None, ) -> str: - """Execute task with A2A delegation support. - - Args: - self: The agent instance - task: The task to execute - context: Optional context for task execution - tools: Optional tools available to the agent - - Returns: - Task execution result - """ + """Execute task with A2A delegation support (sync).""" if not self.a2a: return original_execute_task(self, task, context, tools) # type: ignore[no-any-return] @@ -99,7 +93,34 @@ def wrap_agent_with_a2a_instance( extension_registry=extension_registry, ) + @wraps(original_aexecute_task) + async def aexecute_task_with_a2a( + self: Agent, + task: Task, + context: str | None = None, + tools: list[BaseTool] | None = None, + ) -> str: + """Execute task with A2A delegation support (async).""" + if not self.a2a: + return await original_aexecute_task(self, task, context, tools) # type: ignore[no-any-return] + + a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) + + return await _aexecute_task_with_a2a( + self=self, + a2a_agents=a2a_agents, + original_fn=original_aexecute_task, + task=task, + agent_response_model=agent_response_model, + context=context, + tools=tools, + extension_registry=extension_registry, + ) + object.__setattr__(agent, "execute_task", MethodType(execute_task_with_a2a, agent)) + object.__setattr__( + agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent) + ) def _fetch_card_from_config( @@ -353,15 +374,7 @@ IMPORTANT: You have the ability to delegate this task to remote A2A agents. def _parse_agent_response( raw_result: str | dict[str, Any], agent_response_model: type[BaseModel] ) -> BaseModel | str | dict[str, Any]: - """Parse LLM output as AgentResponse or return raw agent response. - - Args: - raw_result: Raw output from LLM - agent_response_model: The agent response model - - Returns: - Parsed AgentResponse, or raw result if parsing fails - """ + """Parse LLM output as AgentResponse or return raw agent response.""" if agent_response_model: try: if isinstance(raw_result, str): @@ -373,6 +386,246 @@ def _parse_agent_response( return raw_result +def _handle_max_turns_exceeded( + conversation_history: list[Message], + max_turns: int, +) -> str: + """Handle the case when max turns is exceeded. + + Shared logic for both sync and async delegation. + + Returns: + Final message if found in history. + + Raises: + Exception: If no final message found and max turns exceeded. 
+ """ + if conversation_history: + for msg in reversed(conversation_history): + if msg.role == Role.agent: + text_parts = [ + part.root.text for part in msg.parts if part.root.kind == "text" + ] + final_message = ( + " ".join(text_parts) if text_parts else "Conversation completed" + ) + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="completed", + final_result=final_message, + error=None, + total_turns=max_turns, + ), + ) + return final_message + + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="failed", + final_result=None, + error=f"Conversation exceeded maximum turns ({max_turns})", + total_turns=max_turns, + ), + ) + raise Exception(f"A2A conversation exceeded maximum turns ({max_turns})") + + +def _process_response_result( + raw_result: str, + disable_structured_output: bool, + turn_num: int, + agent_role: str, + agent_response_model: type[BaseModel], +) -> tuple[str | None, str | None]: + """Process LLM response and determine next action. + + Shared logic for both sync and async handlers. + + Returns: + Tuple of (final_result, next_request). + """ + if disable_structured_output: + final_turn_number = turn_num + 1 + result_text = str(raw_result) + crewai_event_bus.emit( + None, + A2AMessageSentEvent( + message=result_text, + turn_number=final_turn_number, + is_multiturn=True, + agent_role=agent_role, + ), + ) + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="completed", + final_result=result_text, + error=None, + total_turns=final_turn_number, + ), + ) + return result_text, None + + llm_response = _parse_agent_response( + raw_result=raw_result, agent_response_model=agent_response_model + ) + + if isinstance(llm_response, BaseModel) and isinstance( + llm_response, AgentResponseProtocol + ): + if not llm_response.is_a2a: + final_turn_number = turn_num + 1 + crewai_event_bus.emit( + None, + A2AMessageSentEvent( + message=str(llm_response.message), + turn_number=final_turn_number, + is_multiturn=True, + agent_role=agent_role, + ), + ) + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="completed", + final_result=str(llm_response.message), + error=None, + total_turns=final_turn_number, + ), + ) + return str(llm_response.message), None + return None, str(llm_response.message) + + return str(raw_result), None + + +def _prepare_agent_cards_dict( + a2a_result: TaskStateResult, + agent_id: str, + agent_cards: dict[str, AgentCard] | None, +) -> dict[str, AgentCard]: + """Prepare agent cards dictionary from result and existing cards. + + Shared logic for both sync and async response handlers. + """ + agent_cards_dict = agent_cards or {} + if "agent_card" in a2a_result and agent_id not in agent_cards_dict: + agent_cards_dict[agent_id] = a2a_result["agent_card"] + return agent_cards_dict + + +def _prepare_delegation_context( + self: Agent, + agent_response: AgentResponseProtocol, + task: Task, + original_task_description: str | None, +) -> tuple[ + list[A2AConfig], + type[BaseModel], + str, + str, + A2AConfig, + str | None, + str | None, + dict[str, Any] | None, + dict[str, Any] | None, + list[str], + str, + int, +]: + """Prepare delegation context from agent response and task. + + Shared logic for both sync and async delegation. + + Returns: + Tuple containing all the context values needed for delegation. 
+ """ + a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) + agent_ids = tuple(config.endpoint for config in a2a_agents) + current_request = str(agent_response.message) + + if hasattr(agent_response, "a2a_ids") and agent_response.a2a_ids: + agent_id = agent_response.a2a_ids[0] + else: + agent_id = agent_ids[0] if agent_ids else "" + + if agent_id and agent_id not in agent_ids: + raise ValueError( + f"Unknown A2A agent ID(s): {agent_response.a2a_ids} not in {agent_ids}" + ) + + agent_config = next(filter(lambda x: x.endpoint == agent_id, a2a_agents)) + task_config = task.config or {} + context_id = task_config.get("context_id") + task_id_config = task_config.get("task_id") + metadata = task_config.get("metadata") + extensions = task_config.get("extensions") + reference_task_ids = task_config.get("reference_task_ids", []) + + if original_task_description is None: + original_task_description = task.description + + max_turns = agent_config.max_turns + + return ( + a2a_agents, + agent_response_model, + current_request, + agent_id, + agent_config, + context_id, + task_id_config, + metadata, + extensions, + reference_task_ids, + original_task_description, + max_turns, + ) + + +def _handle_task_completion( + a2a_result: TaskStateResult, + task: Task, + task_id_config: str | None, + reference_task_ids: list[str], + agent_config: A2AConfig, + turn_num: int, +) -> tuple[str | None, str | None, list[str]]: + """Handle task completion state including reference task updates. + + Shared logic for both sync and async delegation. + + Returns: + Tuple of (result_if_trusted, updated_task_id, updated_reference_task_ids). + """ + if a2a_result["status"] == TaskState.completed: + if task_id_config is not None and task_id_config not in reference_task_ids: + reference_task_ids.append(task_id_config) + if task.config is None: + task.config = {} + task.config["reference_task_ids"] = reference_task_ids + task_id_config = None + + if agent_config.trust_remote_completion_status: + result_text = a2a_result.get("result", "") + final_turn_number = turn_num + 1 + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="completed", + final_result=result_text, + error=None, + total_turns=final_turn_number, + ), + ) + return str(result_text), task_id_config, reference_task_ids + + return None, task_id_config, reference_task_ids + + def _handle_agent_response_and_continue( self: Agent, a2a_result: TaskStateResult, @@ -413,9 +666,7 @@ def _handle_agent_response_and_continue( - final_result is not None if conversation should end - current_request is the next message to send if continuing """ - agent_cards_dict = agent_cards or {} - if "agent_card" in a2a_result and agent_id not in agent_cards_dict: - agent_cards_dict[agent_id] = a2a_result["agent_card"] + agent_cards_dict = _prepare_agent_cards_dict(a2a_result, agent_id, agent_cards) task.description, disable_structured_output = _augment_prompt_with_a2a( a2a_agents=a2a_agents, @@ -436,61 +687,14 @@ def _handle_agent_response_and_continue( if disable_structured_output: task.response_model = original_response_model - if disable_structured_output: - final_turn_number = turn_num + 1 - result_text = str(raw_result) - crewai_event_bus.emit( - None, - A2AMessageSentEvent( - message=result_text, - turn_number=final_turn_number, - is_multiturn=True, - agent_role=self.role, - ), - ) - crewai_event_bus.emit( - None, - A2AConversationCompletedEvent( - status="completed", - final_result=result_text, - error=None, - total_turns=final_turn_number, 
- ), - ) - return result_text, None - - llm_response = _parse_agent_response( - raw_result=raw_result, agent_response_model=agent_response_model + return _process_response_result( + raw_result=raw_result, + disable_structured_output=disable_structured_output, + turn_num=turn_num, + agent_role=self.role, + agent_response_model=agent_response_model, ) - if isinstance(llm_response, BaseModel) and isinstance( - llm_response, AgentResponseProtocol - ): - if not llm_response.is_a2a: - final_turn_number = turn_num + 1 - crewai_event_bus.emit( - None, - A2AMessageSentEvent( - message=str(llm_response.message), - turn_number=final_turn_number, - is_multiturn=True, - agent_role=self.role, - ), - ) - crewai_event_bus.emit( - None, - A2AConversationCompletedEvent( - status="completed", - final_result=str(llm_response.message), - error=None, - total_turns=final_turn_number, - ), - ) - return str(llm_response.message), None - return None, str(llm_response.message) - - return str(raw_result), None - def _delegate_to_a2a( self: Agent, @@ -522,34 +726,24 @@ def _delegate_to_a2a( Raises: ImportError: If a2a-sdk is not installed """ - a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) - agent_ids = tuple(config.endpoint for config in a2a_agents) - current_request = str(agent_response.message) - - if hasattr(agent_response, "a2a_ids") and agent_response.a2a_ids: - agent_id = agent_response.a2a_ids[0] - else: - agent_id = agent_ids[0] if agent_ids else "" - - if agent_id and agent_id not in agent_ids: - raise ValueError( - f"Unknown A2A agent ID(s): {agent_response.a2a_ids} not in {agent_ids}" - ) - - agent_config = next(filter(lambda x: x.endpoint == agent_id, a2a_agents)) - task_config = task.config or {} - context_id = task_config.get("context_id") - task_id_config = task_config.get("task_id") - metadata = task_config.get("metadata") - extensions = task_config.get("extensions") - - reference_task_ids = task_config.get("reference_task_ids", []) - - if original_task_description is None: - original_task_description = task.description + ( + a2a_agents, + agent_response_model, + current_request, + agent_id, + agent_config, + context_id, + task_id_config, + metadata, + extensions, + reference_task_ids, + original_task_description, + max_turns, + ) = _prepare_delegation_context( + self, agent_response, task, original_task_description + ) conversation_history: list[Message] = [] - max_turns = agent_config.max_turns try: for turn_num in range(max_turns): @@ -589,33 +783,18 @@ def _delegate_to_a2a( context_id = latest_message.context_id if a2a_result["status"] in [TaskState.completed, TaskState.input_required]: - if a2a_result["status"] == TaskState.completed: - if ( - task_id_config is not None - and task_id_config not in reference_task_ids - ): - reference_task_ids.append(task_id_config) - if task.config is None: - task.config = {} - task.config["reference_task_ids"] = reference_task_ids - task_id_config = None - - if ( - a2a_result["status"] == TaskState.completed - and agent_config.trust_remote_completion_status - ): - result_text = a2a_result.get("result", "") - final_turn_number = turn_num + 1 - crewai_event_bus.emit( - None, - A2AConversationCompletedEvent( - status="completed", - final_result=result_text, - error=None, - total_turns=final_turn_number, - ), + trusted_result, task_id_config, reference_task_ids = ( + _handle_task_completion( + a2a_result, + task, + task_id_config, + reference_task_ids, + agent_config, + turn_num, ) - return str(result_text) + ) + if trusted_result is 
not None: + return trusted_result final_result, next_request = _handle_agent_response_and_continue( self=self, @@ -681,36 +860,321 @@ def _delegate_to_a2a( ) return f"A2A delegation failed: {error_msg}" - if conversation_history: - for msg in reversed(conversation_history): - if msg.role == Role.agent: - text_parts = [ - part.root.text for part in msg.parts if part.root.kind == "text" - ] - final_message = ( - " ".join(text_parts) if text_parts else "Conversation completed" - ) - crewai_event_bus.emit( - None, - A2AConversationCompletedEvent( - status="completed", - final_result=final_message, - error=None, - total_turns=max_turns, - ), - ) - return final_message - - crewai_event_bus.emit( - None, - A2AConversationCompletedEvent( - status="failed", - final_result=None, - error=f"Conversation exceeded maximum turns ({max_turns})", - total_turns=max_turns, - ), - ) - raise Exception(f"A2A conversation exceeded maximum turns ({max_turns})") + return _handle_max_turns_exceeded(conversation_history, max_turns) + + finally: + task.description = original_task_description + + +async def _afetch_card_from_config( + config: A2AConfig, +) -> tuple[A2AConfig, AgentCard | Exception]: + """Fetch agent card from A2A config asynchronously.""" + try: + card = await afetch_agent_card( + endpoint=config.endpoint, + auth=config.auth, + timeout=config.timeout, + ) + return config, card + except Exception as e: + return config, e + + +async def _afetch_agent_cards_concurrently( + a2a_agents: list[A2AConfig], +) -> tuple[dict[str, AgentCard], dict[str, str]]: + """Fetch agent cards concurrently for multiple A2A agents using asyncio.""" + agent_cards: dict[str, AgentCard] = {} + failed_agents: dict[str, str] = {} + + tasks = [_afetch_card_from_config(config) for config in a2a_agents] + results = await asyncio.gather(*tasks) + + for config, result in results: + if isinstance(result, Exception): + if config.fail_fast: + raise RuntimeError( + f"Failed to fetch agent card from {config.endpoint}. " + f"Ensure the A2A agent is running and accessible. 
Error: {result}" + ) from result + failed_agents[config.endpoint] = str(result) + else: + agent_cards[config.endpoint] = result + + return agent_cards, failed_agents + + +async def _aexecute_task_with_a2a( + self: Agent, + a2a_agents: list[A2AConfig], + original_fn: Callable[..., Coroutine[Any, Any, str]], + task: Task, + agent_response_model: type[BaseModel], + context: str | None, + tools: list[BaseTool] | None, + extension_registry: ExtensionRegistry, +) -> str: + """Async version of _execute_task_with_a2a.""" + original_description: str = task.description + original_output_pydantic = task.output_pydantic + original_response_model = task.response_model + + agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_agents) + + if not agent_cards and a2a_agents and failed_agents: + unavailable_agents_text = "" + for endpoint, error in failed_agents.items(): + unavailable_agents_text += f" - {endpoint}: {error}\n" + + notice = UNAVAILABLE_AGENTS_NOTICE_TEMPLATE.substitute( + unavailable_agents=unavailable_agents_text + ) + task.description = f"{original_description}{notice}" + + try: + return await original_fn(self, task, context, tools) + finally: + task.description = original_description + + task.description, _ = _augment_prompt_with_a2a( + a2a_agents=a2a_agents, + task_description=original_description, + agent_cards=agent_cards, + failed_agents=failed_agents, + extension_registry=extension_registry, + ) + task.response_model = agent_response_model + + try: + raw_result = await original_fn(self, task, context, tools) + agent_response = _parse_agent_response( + raw_result=raw_result, agent_response_model=agent_response_model + ) + + if extension_registry and isinstance(agent_response, BaseModel): + agent_response = extension_registry.process_response_with_all( + agent_response, {} + ) + + if isinstance(agent_response, BaseModel) and isinstance( + agent_response, AgentResponseProtocol + ): + if agent_response.is_a2a: + return await _adelegate_to_a2a( + self, + agent_response=agent_response, + task=task, + original_fn=original_fn, + context=context, + tools=tools, + agent_cards=agent_cards, + original_task_description=original_description, + extension_registry=extension_registry, + ) + return str(agent_response.message) + + return raw_result + finally: + task.description = original_description + task.output_pydantic = original_output_pydantic + task.response_model = original_response_model + + +async def _ahandle_agent_response_and_continue( + self: Agent, + a2a_result: TaskStateResult, + agent_id: str, + agent_cards: dict[str, AgentCard] | None, + a2a_agents: list[A2AConfig], + original_task_description: str, + conversation_history: list[Message], + turn_num: int, + max_turns: int, + task: Task, + original_fn: Callable[..., Coroutine[Any, Any, str]], + context: str | None, + tools: list[BaseTool] | None, + agent_response_model: type[BaseModel], + remote_task_completed: bool = False, +) -> tuple[str | None, str | None]: + """Async version of _handle_agent_response_and_continue.""" + agent_cards_dict = _prepare_agent_cards_dict(a2a_result, agent_id, agent_cards) + + task.description, disable_structured_output = _augment_prompt_with_a2a( + a2a_agents=a2a_agents, + task_description=original_task_description, + conversation_history=conversation_history, + turn_num=turn_num, + max_turns=max_turns, + agent_cards=agent_cards_dict, + remote_task_completed=remote_task_completed, + ) + + original_response_model = task.response_model + if disable_structured_output: + task.response_model = 
None + + raw_result = await original_fn(self, task, context, tools) + + if disable_structured_output: + task.response_model = original_response_model + + return _process_response_result( + raw_result=raw_result, + disable_structured_output=disable_structured_output, + turn_num=turn_num, + agent_role=self.role, + agent_response_model=agent_response_model, + ) + + +async def _adelegate_to_a2a( + self: Agent, + agent_response: AgentResponseProtocol, + task: Task, + original_fn: Callable[..., Coroutine[Any, Any, str]], + context: str | None, + tools: list[BaseTool] | None, + agent_cards: dict[str, AgentCard] | None = None, + original_task_description: str | None = None, + extension_registry: ExtensionRegistry | None = None, +) -> str: + """Async version of _delegate_to_a2a.""" + ( + a2a_agents, + agent_response_model, + current_request, + agent_id, + agent_config, + context_id, + task_id_config, + metadata, + extensions, + reference_task_ids, + original_task_description, + max_turns, + ) = _prepare_delegation_context( + self, agent_response, task, original_task_description + ) + + conversation_history: list[Message] = [] + + try: + for turn_num in range(max_turns): + console_formatter = getattr(crewai_event_bus, "_console", None) + agent_branch = None + if console_formatter: + agent_branch = getattr( + console_formatter, "current_agent_branch", None + ) or getattr(console_formatter, "current_task_branch", None) + + a2a_result = await aexecute_a2a_delegation( + endpoint=agent_config.endpoint, + auth=agent_config.auth, + timeout=agent_config.timeout, + task_description=current_request, + context_id=context_id, + task_id=task_id_config, + reference_task_ids=reference_task_ids, + metadata=metadata, + extensions=extensions, + conversation_history=conversation_history, + agent_id=agent_id, + agent_role=Role.user, + agent_branch=agent_branch, + response_model=agent_config.response_model, + turn_number=turn_num + 1, + updates=agent_config.updates, + ) + + conversation_history = a2a_result.get("history", []) + + if conversation_history: + latest_message = conversation_history[-1] + if latest_message.task_id is not None: + task_id_config = latest_message.task_id + if latest_message.context_id is not None: + context_id = latest_message.context_id + + if a2a_result["status"] in [TaskState.completed, TaskState.input_required]: + trusted_result, task_id_config, reference_task_ids = ( + _handle_task_completion( + a2a_result, + task, + task_id_config, + reference_task_ids, + agent_config, + turn_num, + ) + ) + if trusted_result is not None: + return trusted_result + + final_result, next_request = await _ahandle_agent_response_and_continue( + self=self, + a2a_result=a2a_result, + agent_id=agent_id, + agent_cards=agent_cards, + a2a_agents=a2a_agents, + original_task_description=original_task_description, + conversation_history=conversation_history, + turn_num=turn_num, + max_turns=max_turns, + task=task, + original_fn=original_fn, + context=context, + tools=tools, + agent_response_model=agent_response_model, + remote_task_completed=(a2a_result["status"] == TaskState.completed), + ) + + if final_result is not None: + return final_result + + if next_request is not None: + current_request = next_request + + continue + + error_msg = a2a_result.get("error", "Unknown error") + + final_result, next_request = await _ahandle_agent_response_and_continue( + self=self, + a2a_result=a2a_result, + agent_id=agent_id, + agent_cards=agent_cards, + a2a_agents=a2a_agents, + original_task_description=original_task_description, + 
conversation_history=conversation_history, + turn_num=turn_num, + max_turns=max_turns, + task=task, + original_fn=original_fn, + context=context, + tools=tools, + agent_response_model=agent_response_model, + ) + + if final_result is not None: + return final_result + + if next_request is not None: + current_request = next_request + continue + + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="failed", + final_result=None, + error=error_msg, + total_turns=turn_num + 1, + ), + ) + return f"A2A delegation failed: {error_msg}" + + return _handle_max_turns_exceeded(conversation_history, max_turns) finally: task.description = original_task_description
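
Editorial note on the wrapping pattern used above: the diff intercepts both `execute_task` and `aexecute_task` on a single agent instance by grabbing the unbound originals via `__func__`, wrapping them, and rebinding with `object.__setattr__` + `MethodType` (presumably to bypass the pydantic model's validated attribute assignment), delegating to A2A only when `agent.a2a` is set. The following is a minimal, self-contained sketch of that pattern with toy names — `ToyAgent` and `wrap_with_delegation` are illustrative stand-ins, not crewai APIs; the real patch additionally threads `Task`, `tools`, the extension registry, and the A2A delegation helpers through the wrappers.

    import asyncio
    from functools import wraps
    from types import MethodType


    class ToyAgent:
        """Stand-in for the real Agent; only the attributes the sketch needs."""

        def __init__(self, a2a: bool = False) -> None:
            self.a2a = a2a

        def execute_task(self, task: str) -> str:
            return f"local:{task}"

        async def aexecute_task(self, task: str) -> str:
            return f"local-async:{task}"


    def wrap_with_delegation(agent: ToyAgent) -> None:
        """Replace both execute methods on this single instance.

        Mirrors the patch: keep the unbound originals via __func__, wrap them,
        and rebind with object.__setattr__ + MethodType so only this instance
        is affected and the class itself is left untouched.
        """
        original_execute = agent.execute_task.__func__
        original_aexecute = agent.aexecute_task.__func__

        @wraps(original_execute)
        def execute_with_delegation(self: ToyAgent, task: str) -> str:
            if not self.a2a:
                return original_execute(self, task)
            # Placeholder for the real synchronous A2A round-trip.
            return f"delegated:{task}"

        @wraps(original_aexecute)
        async def aexecute_with_delegation(self: ToyAgent, task: str) -> str:
            if not self.a2a:
                return await original_aexecute(self, task)
            # Placeholder for aexecute_a2a_delegation in the real patch.
            return f"delegated-async:{task}"

        object.__setattr__(
            agent, "execute_task", MethodType(execute_with_delegation, agent)
        )
        object.__setattr__(
            agent, "aexecute_task", MethodType(aexecute_with_delegation, agent)
        )


    if __name__ == "__main__":
        local, remote = ToyAgent(a2a=False), ToyAgent(a2a=True)
        wrap_with_delegation(local)
        wrap_with_delegation(remote)
        print(local.execute_task("t1"))                 # local:t1
        print(remote.execute_task("t1"))                # delegated:t1
        print(asyncio.run(remote.aexecute_task("t2")))  # delegated-async:t2

Binding per instance rather than patching the class keeps unwrapped agents untouched, which matches the sync/async pairing the diff introduces: both wrappers fall through to the stored originals when no A2A configuration is present.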