From d4b28287d3f1864658de2856d72edc56f0e78d69 Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Mon, 26 Jan 2026 16:04:19 +0100 Subject: [PATCH] Add call_id to LLM events for correlating requests When monitoring LLM events, consumers need to know which events belong to the same API call. Before this change, there was no way to correlate LLMCallStartedEvent, LLMStreamChunkEvent, and LLMCallCompletedEvent belonging to the same request. --- .../src/crewai/events/types/llm_events.py | 1 + lib/crewai/src/crewai/llm.py | 376 ++++++++++-------- lib/crewai/src/crewai/llms/base_llm.py | 38 +- .../llms/providers/anthropic/completion.py | 160 ++++---- .../crewai/llms/providers/azure/completion.py | 124 +++--- .../llms/providers/bedrock/completion.py | 316 ++++++++------- .../llms/providers/gemini/completion.py | 182 ++++----- .../llms/providers/openai/completion.py | 146 +++---- .../test_llm_call_events_share_call_id.yaml | 108 +++++ ...ate_llm_calls_have_different_call_ids.yaml | 215 ++++++++++ ...eaming_chunks_share_call_id_with_call.yaml | 143 +++++++ lib/crewai/tests/test_streaming.py | 10 + lib/crewai/tests/utilities/test_events.py | 105 ++++- 13 files changed, 1296 insertions(+), 628 deletions(-) create mode 100644 lib/crewai/tests/cassettes/utilities/test_llm_call_events_share_call_id.yaml create mode 100644 lib/crewai/tests/cassettes/utilities/test_separate_llm_calls_have_different_call_ids.yaml create mode 100644 lib/crewai/tests/cassettes/utilities/test_streaming_chunks_share_call_id_with_call.yaml diff --git a/lib/crewai/src/crewai/events/types/llm_events.py b/lib/crewai/src/crewai/events/types/llm_events.py index 161b8a2a0..87087f100 100644 --- a/lib/crewai/src/crewai/events/types/llm_events.py +++ b/lib/crewai/src/crewai/events/types/llm_events.py @@ -10,6 +10,7 @@ class LLMEventBase(BaseEvent): from_task: Any | None = None from_agent: Any | None = None model: str | None = None + call_id: str def __init__(self, **data: Any) -> None: if data.get("from_task"): diff --git a/lib/crewai/src/crewai/llm.py b/lib/crewai/src/crewai/llm.py index c607f1615..902a3d310 100644 --- a/lib/crewai/src/crewai/llm.py +++ b/lib/crewai/src/crewai/llm.py @@ -37,7 +37,7 @@ from crewai.events.types.tool_usage_events import ( ToolUsageFinishedEvent, ToolUsageStartedEvent, ) -from crewai.llms.base_llm import BaseLLM +from crewai.llms.base_llm import BaseLLM, get_current_call_id, llm_call_context from crewai.llms.constants import ( ANTHROPIC_MODELS, AZURE_MODELS, @@ -770,7 +770,7 @@ class LLM(BaseLLM): chunk_content = None response_id = None - if hasattr(chunk,'id'): + if hasattr(chunk, "id"): response_id = chunk.id # Safely extract content from various chunk formats @@ -827,7 +827,7 @@ class LLM(BaseLLM): available_functions=available_functions, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if result is not None: @@ -849,7 +849,8 @@ class LLM(BaseLLM): from_task=from_task, from_agent=from_agent, call_type=LLMCallType.LLM_CALL, - response_id=response_id + response_id=response_id, + call_id=get_current_call_id(), ), ) # --- 4) Fallback to non-streaming if no content received @@ -1015,7 +1016,10 @@ class LLM(BaseLLM): crewai_event_bus.emit( self, event=LLMCallFailedEvent( - error=str(e), from_task=from_task, from_agent=from_agent + error=str(e), + from_task=from_task, + from_agent=from_agent, + call_id=get_current_call_id(), ), ) raise Exception(f"Failed to get streaming response: {e!s}") from e @@ -1048,7 +1052,8 @@ class LLM(BaseLLM): from_task=from_task, 
from_agent=from_agent, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, + call_id=get_current_call_id(), ), ) @@ -1476,7 +1481,8 @@ class LLM(BaseLLM): chunk=chunk_content, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, + call_id=get_current_call_id(), ), ) @@ -1619,7 +1625,12 @@ class LLM(BaseLLM): logging.error(f"Error executing function '{function_name}': {e}") crewai_event_bus.emit( self, - event=LLMCallFailedEvent(error=f"Tool execution error: {e!s}"), + event=LLMCallFailedEvent( + error=f"Tool execution error: {e!s}", + from_task=from_task, + from_agent=from_agent, + call_id=get_current_call_id(), + ), ) crewai_event_bus.emit( self, @@ -1669,108 +1680,117 @@ class LLM(BaseLLM): ValueError: If response format is not supported LLMContextLengthExceededError: If input exceeds model's context limit """ - crewai_event_bus.emit( - self, - event=LLMCallStartedEvent( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - model=self.model, - ), - ) + with llm_call_context() as call_id: + crewai_event_bus.emit( + self, + event=LLMCallStartedEvent( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + model=self.model, + call_id=call_id, + ), + ) - # --- 2) Validate parameters before proceeding with the call - self._validate_call_params() + # --- 2) Validate parameters before proceeding with the call + self._validate_call_params() - # --- 3) Convert string messages to proper format if needed - if isinstance(messages, str): - messages = [{"role": "user", "content": messages}] - # --- 4) Handle O1 model special case (system messages not supported) - if "o1" in self.model.lower(): - for message in messages: - if message.get("role") == "system": - msg_role: Literal["assistant"] = "assistant" - message["role"] = msg_role + # --- 3) Convert string messages to proper format if needed + if isinstance(messages, str): + messages = [{"role": "user", "content": messages}] + # --- 4) Handle O1 model special case (system messages not supported) + if "o1" in self.model.lower(): + for message in messages: + if message.get("role") == "system": + msg_role: Literal["assistant"] = "assistant" + message["role"] = msg_role - if not self._invoke_before_llm_call_hooks(messages, from_agent): - raise ValueError("LLM call blocked by before_llm_call hook") + if not self._invoke_before_llm_call_hooks(messages, from_agent): + raise ValueError("LLM call blocked by before_llm_call hook") - # --- 5) Set up callbacks if provided - with suppress_warnings(): - if callbacks and len(callbacks) > 0: - self.set_callbacks(callbacks) - try: - # --- 6) Prepare parameters for the completion call - params = self._prepare_completion_params(messages, tools) - # --- 7) Make the completion call and handle response - if self.stream: - result = self._handle_streaming_response( - params=params, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, - ) - else: - result = self._handle_non_streaming_response( - params=params, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, - ) - - if isinstance(result, str): - result = self._invoke_after_llm_call_hooks( - messages, result, from_agent 
- ) - - return result - except LLMContextLengthExceededError: - # Re-raise LLMContextLengthExceededError as it should be handled - # by the CrewAgentExecutor._invoke_loop method, which can then decide - # whether to summarize the content or abort based on the respect_context_window flag - raise - except Exception as e: - unsupported_stop = "Unsupported parameter" in str( - e - ) and "'stop'" in str(e) - - if unsupported_stop: - if ( - "additional_drop_params" in self.additional_params - and isinstance( - self.additional_params["additional_drop_params"], list + # --- 5) Set up callbacks if provided + with suppress_warnings(): + if callbacks and len(callbacks) > 0: + self.set_callbacks(callbacks) + try: + # --- 6) Prepare parameters for the completion call + params = self._prepare_completion_params(messages, tools) + # --- 7) Make the completion call and handle response + if self.stream: + result = self._handle_streaming_response( + params=params, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, ) - ): - self.additional_params["additional_drop_params"].append("stop") else: - self.additional_params = {"additional_drop_params": ["stop"]} + result = self._handle_non_streaming_response( + params=params, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, + ) - logging.info("Retrying LLM call without the unsupported 'stop'") + if isinstance(result, str): + result = self._invoke_after_llm_call_hooks( + messages, result, from_agent + ) - return self.call( - messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, + return result + except LLMContextLengthExceededError: + # Re-raise LLMContextLengthExceededError as it should be handled + # by the CrewAgentExecutor._invoke_loop method, which can then decide + # whether to summarize the content or abort based on the respect_context_window flag + raise + except Exception as e: + unsupported_stop = "Unsupported parameter" in str( + e + ) and "'stop'" in str(e) + + if unsupported_stop: + if ( + "additional_drop_params" in self.additional_params + and isinstance( + self.additional_params["additional_drop_params"], list + ) + ): + self.additional_params["additional_drop_params"].append( + "stop" + ) + else: + self.additional_params = { + "additional_drop_params": ["stop"] + } + + logging.info("Retrying LLM call without the unsupported 'stop'") + + return self.call( + messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, + ) + + crewai_event_bus.emit( + self, + event=LLMCallFailedEvent( + error=str(e), + from_task=from_task, + from_agent=from_agent, + call_id=get_current_call_id(), + ), ) - - crewai_event_bus.emit( - self, - event=LLMCallFailedEvent( - error=str(e), from_task=from_task, from_agent=from_agent - ), - ) - raise + raise async def acall( self, @@ -1808,43 +1828,54 @@ class LLM(BaseLLM): ValueError: If response format is not supported LLMContextLengthExceededError: If input exceeds model's context limit """ - crewai_event_bus.emit( - self, - event=LLMCallStartedEvent( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - 
model=self.model, - ), - ) + with llm_call_context() as call_id: + crewai_event_bus.emit( + self, + event=LLMCallStartedEvent( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + model=self.model, + call_id=call_id, + ), + ) - self._validate_call_params() + self._validate_call_params() - if isinstance(messages, str): - messages = [{"role": "user", "content": messages}] + if isinstance(messages, str): + messages = [{"role": "user", "content": messages}] - # Process file attachments asynchronously before preparing params - messages = await self._aprocess_message_files(messages) + # Process file attachments asynchronously before preparing params + messages = await self._aprocess_message_files(messages) - if "o1" in self.model.lower(): - for message in messages: - if message.get("role") == "system": - msg_role: Literal["assistant"] = "assistant" - message["role"] = msg_role + if "o1" in self.model.lower(): + for message in messages: + if message.get("role") == "system": + msg_role: Literal["assistant"] = "assistant" + message["role"] = msg_role - with suppress_warnings(): - if callbacks and len(callbacks) > 0: - self.set_callbacks(callbacks) - try: - params = self._prepare_completion_params( - messages, tools, skip_file_processing=True - ) + with suppress_warnings(): + if callbacks and len(callbacks) > 0: + self.set_callbacks(callbacks) + try: + params = self._prepare_completion_params( + messages, tools, skip_file_processing=True + ) - if self.stream: - return await self._ahandle_streaming_response( + if self.stream: + return await self._ahandle_streaming_response( + params=params, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, + ) + + return await self._ahandle_non_streaming_response( params=params, callbacks=callbacks, available_functions=available_functions, @@ -1852,52 +1883,50 @@ class LLM(BaseLLM): from_agent=from_agent, response_model=response_model, ) + except LLMContextLengthExceededError: + raise + except Exception as e: + unsupported_stop = "Unsupported parameter" in str( + e + ) and "'stop'" in str(e) - return await self._ahandle_non_streaming_response( - params=params, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, - ) - except LLMContextLengthExceededError: - raise - except Exception as e: - unsupported_stop = "Unsupported parameter" in str( - e - ) and "'stop'" in str(e) + if unsupported_stop: + if ( + "additional_drop_params" in self.additional_params + and isinstance( + self.additional_params["additional_drop_params"], list + ) + ): + self.additional_params["additional_drop_params"].append( + "stop" + ) + else: + self.additional_params = { + "additional_drop_params": ["stop"] + } - if unsupported_stop: - if ( - "additional_drop_params" in self.additional_params - and isinstance( - self.additional_params["additional_drop_params"], list + logging.info("Retrying LLM call without the unsupported 'stop'") + + return await self.acall( + messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, ) - ): - self.additional_params["additional_drop_params"].append("stop") - else: - self.additional_params = {"additional_drop_params": ["stop"]} - logging.info("Retrying LLM call 
without the unsupported 'stop'") - - return await self.acall( - messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, + crewai_event_bus.emit( + self, + event=LLMCallFailedEvent( + error=str(e), + from_task=from_task, + from_agent=from_agent, + call_id=get_current_call_id(), + ), ) - - crewai_event_bus.emit( - self, - event=LLMCallFailedEvent( - error=str(e), from_task=from_task, from_agent=from_agent - ), - ) - raise + raise def _handle_emit_call_events( self, @@ -1925,6 +1954,7 @@ class LLM(BaseLLM): from_task=from_task, from_agent=from_agent, model=self.model, + call_id=get_current_call_id(), ), ) diff --git a/lib/crewai/src/crewai/llms/base_llm.py b/lib/crewai/src/crewai/llms/base_llm.py index 2a6def197..0044b9571 100644 --- a/lib/crewai/src/crewai/llms/base_llm.py +++ b/lib/crewai/src/crewai/llms/base_llm.py @@ -7,11 +7,15 @@ in CrewAI, including common functionality for native SDK implementations. from __future__ import annotations from abc import ABC, abstractmethod +from collections.abc import Generator +from contextlib import contextmanager +import contextvars from datetime import datetime import json import logging import re from typing import TYPE_CHECKING, Any, Final +import uuid from pydantic import BaseModel @@ -50,6 +54,32 @@ DEFAULT_CONTEXT_WINDOW_SIZE: Final[int] = 4096 DEFAULT_SUPPORTS_STOP_WORDS: Final[bool] = True _JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTALL) +_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar( + "_current_call_id", default=None +) + + +@contextmanager +def llm_call_context() -> Generator[str, None, None]: + """Context manager that establishes an LLM call scope with a unique call_id.""" + call_id = str(uuid.uuid4()) + token = _current_call_id.set(call_id) + try: + yield call_id + finally: + _current_call_id.reset(token) + + +def get_current_call_id() -> str: + """Get current call_id from context""" + call_id = _current_call_id.get() + if call_id is None: + logging.warning( + "LLM event emitted outside call context - generating fallback call_id" + ) + return str(uuid.uuid4()) + return call_id + class BaseLLM(ABC): """Abstract base class for LLM implementations. @@ -351,6 +381,7 @@ class BaseLLM(ABC): from_task=from_task, from_agent=from_agent, model=self.model, + call_id=get_current_call_id(), ), ) @@ -374,6 +405,7 @@ class BaseLLM(ABC): from_task=from_task, from_agent=from_agent, model=self.model, + call_id=get_current_call_id(), ), ) @@ -394,6 +426,7 @@ class BaseLLM(ABC): from_task=from_task, from_agent=from_agent, model=self.model, + call_id=get_current_call_id(), ), ) @@ -404,7 +437,7 @@ class BaseLLM(ABC): from_agent: Agent | None = None, tool_call: dict[str, Any] | None = None, call_type: LLMCallType | None = None, - response_id: str | None = None + response_id: str | None = None, ) -> None: """Emit stream chunk event. 
@@ -427,7 +460,8 @@ class BaseLLM(ABC): from_task=from_task, from_agent=from_agent, call_type=call_type, - response_id=response_id + response_id=response_id, + call_id=get_current_call_id(), ), ) diff --git a/lib/crewai/src/crewai/llms/providers/anthropic/completion.py b/lib/crewai/src/crewai/llms/providers/anthropic/completion.py index 1d626a7f4..e779adf88 100644 --- a/lib/crewai/src/crewai/llms/providers/anthropic/completion.py +++ b/lib/crewai/src/crewai/llms/providers/anthropic/completion.py @@ -9,7 +9,7 @@ from anthropic.types import ThinkingBlock from pydantic import BaseModel from crewai.events.types.llm_events import LLMCallType -from crewai.llms.base_llm import BaseLLM +from crewai.llms.base_llm import BaseLLM, llm_call_context from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( @@ -207,33 +207,44 @@ class AnthropicCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - try: - # Emit call started event - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) + with llm_call_context(): + try: + # Emit call started event + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) - # Format messages for Anthropic - formatted_messages, system_message = self._format_messages_for_anthropic( - messages - ) + # Format messages for Anthropic + formatted_messages, system_message = ( + self._format_messages_for_anthropic(messages) + ) - if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent): - raise ValueError("LLM call blocked by before_llm_call hook") + if not self._invoke_before_llm_call_hooks( + formatted_messages, from_agent + ): + raise ValueError("LLM call blocked by before_llm_call hook") - # Prepare completion parameters - completion_params = self._prepare_completion_params( - formatted_messages, system_message, tools - ) + # Prepare completion parameters + completion_params = self._prepare_completion_params( + formatted_messages, system_message, tools + ) - # Handle streaming vs non-streaming - if self.stream: - return self._handle_streaming_completion( + # Handle streaming vs non-streaming + if self.stream: + return self._handle_streaming_completion( + completion_params, + available_functions, + from_task, + from_agent, + response_model, + ) + + return self._handle_completion( completion_params, available_functions, from_task, @@ -241,21 +252,13 @@ class AnthropicCompletion(BaseLLM): response_model, ) - return self._handle_completion( - completion_params, - available_functions, - from_task, - from_agent, - response_model, - ) - - except Exception as e: - error_msg = f"Anthropic API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + except Exception as e: + error_msg = f"Anthropic API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise async def acall( self, @@ -280,26 +283,35 @@ class AnthropicCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - try: - self._emit_call_started_event( - 
messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) - formatted_messages, system_message = self._format_messages_for_anthropic( - messages - ) + formatted_messages, system_message = ( + self._format_messages_for_anthropic(messages) + ) - completion_params = self._prepare_completion_params( - formatted_messages, system_message, tools - ) + completion_params = self._prepare_completion_params( + formatted_messages, system_message, tools + ) - if self.stream: - return await self._ahandle_streaming_completion( + if self.stream: + return await self._ahandle_streaming_completion( + completion_params, + available_functions, + from_task, + from_agent, + response_model, + ) + + return await self._ahandle_completion( completion_params, available_functions, from_task, @@ -307,21 +319,13 @@ class AnthropicCompletion(BaseLLM): response_model, ) - return await self._ahandle_completion( - completion_params, - available_functions, - from_task, - from_agent, - response_model, - ) - - except Exception as e: - error_msg = f"Anthropic API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + except Exception as e: + error_msg = f"Anthropic API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise def _prepare_completion_params( self, @@ -704,7 +708,7 @@ class AnthropicCompletion(BaseLLM): for event in stream: if hasattr(event, "message") and hasattr(event.message, "id"): response_id = event.message.id - + if hasattr(event, "delta") and hasattr(event.delta, "text"): text_delta = event.delta.text full_response += text_delta @@ -712,7 +716,7 @@ class AnthropicCompletion(BaseLLM): chunk=text_delta, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if event.type == "content_block_start": @@ -739,7 +743,7 @@ class AnthropicCompletion(BaseLLM): "index": block_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) elif event.type == "content_block_delta": if event.delta.type == "input_json_delta": @@ -763,7 +767,7 @@ class AnthropicCompletion(BaseLLM): "index": block_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) final_message: Message = stream.get_final_message() @@ -1133,7 +1137,7 @@ class AnthropicCompletion(BaseLLM): chunk=text_delta, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if event.type == "content_block_start": @@ -1160,7 +1164,7 @@ class AnthropicCompletion(BaseLLM): "index": block_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) elif event.type == "content_block_delta": if event.delta.type == "input_json_delta": @@ -1184,7 +1188,7 @@ class AnthropicCompletion(BaseLLM): "index": block_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) final_message: Message = await stream.get_final_message() diff --git a/lib/crewai/src/crewai/llms/providers/azure/completion.py 
b/lib/crewai/src/crewai/llms/providers/azure/completion.py index a3aed7f4b..5caf5e78a 100644 --- a/lib/crewai/src/crewai/llms/providers/azure/completion.py +++ b/lib/crewai/src/crewai/llms/providers/azure/completion.py @@ -43,7 +43,7 @@ try: ) from crewai.events.types.llm_events import LLMCallType - from crewai.llms.base_llm import BaseLLM + from crewai.llms.base_llm import BaseLLM, llm_call_context except ImportError: raise ImportError( @@ -288,31 +288,42 @@ class AzureCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - try: - # Emit call started event - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) + with llm_call_context(): + try: + # Emit call started event + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) - # Format messages for Azure - formatted_messages = self._format_messages_for_azure(messages) + # Format messages for Azure + formatted_messages = self._format_messages_for_azure(messages) - if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent): - raise ValueError("LLM call blocked by before_llm_call hook") + if not self._invoke_before_llm_call_hooks( + formatted_messages, from_agent + ): + raise ValueError("LLM call blocked by before_llm_call hook") - # Prepare completion parameters - completion_params = self._prepare_completion_params( - formatted_messages, tools, response_model - ) + # Prepare completion parameters + completion_params = self._prepare_completion_params( + formatted_messages, tools, response_model + ) - # Handle streaming vs non-streaming - if self.stream: - return self._handle_streaming_completion( + # Handle streaming vs non-streaming + if self.stream: + return self._handle_streaming_completion( + completion_params, + available_functions, + from_task, + from_agent, + response_model, + ) + + return self._handle_completion( completion_params, available_functions, from_task, @@ -320,16 +331,8 @@ class AzureCompletion(BaseLLM): response_model, ) - return self._handle_completion( - completion_params, - available_functions, - from_task, - from_agent, - response_model, - ) - - except Exception as e: - return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value] + except Exception as e: + return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value] async def acall( # type: ignore[return] self, @@ -355,24 +358,33 @@ class AzureCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) - formatted_messages = self._format_messages_for_azure(messages) + formatted_messages = self._format_messages_for_azure(messages) - completion_params = self._prepare_completion_params( - formatted_messages, tools, response_model - ) + completion_params = self._prepare_completion_params( + formatted_messages, tools, response_model + ) - if self.stream: - return await 
self._ahandle_streaming_completion( + if self.stream: + return await self._ahandle_streaming_completion( + completion_params, + available_functions, + from_task, + from_agent, + response_model, + ) + + return await self._ahandle_completion( completion_params, available_functions, from_task, @@ -380,16 +392,8 @@ class AzureCompletion(BaseLLM): response_model, ) - return await self._ahandle_completion( - completion_params, - available_functions, - from_task, - from_agent, - response_model, - ) - - except Exception as e: - self._handle_api_error(e, from_task, from_agent) + except Exception as e: + self._handle_api_error(e, from_task, from_agent) def _prepare_completion_params( self, @@ -726,7 +730,7 @@ class AzureCompletion(BaseLLM): """ if update.choices: choice = update.choices[0] - response_id = update.id if hasattr(update,"id") else None + response_id = update.id if hasattr(update, "id") else None if choice.delta and choice.delta.content: content_delta = choice.delta.content full_response += content_delta @@ -734,7 +738,7 @@ class AzureCompletion(BaseLLM): chunk=content_delta, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if choice.delta and choice.delta.tool_calls: @@ -769,7 +773,7 @@ class AzureCompletion(BaseLLM): "index": idx, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) return full_response diff --git a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py index 73fefc478..4f85cd02e 100644 --- a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py +++ b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py @@ -11,7 +11,7 @@ from pydantic import BaseModel from typing_extensions import Required from crewai.events.types.llm_events import LLMCallType -from crewai.llms.base_llm import BaseLLM +from crewai.llms.base_llm import BaseLLM, llm_call_context from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededError, @@ -299,77 +299,89 @@ class BedrockCompletion(BaseLLM): response_model: type[BaseModel] | None = None, ) -> str | Any: """Call AWS Bedrock Converse API.""" - try: - # Emit call started event - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) - - # Format messages for Converse API - formatted_messages, system_message = self._format_messages_for_converse( - messages - ) - - if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent): - raise ValueError("LLM call blocked by before_llm_call hook") - - # Prepare request body - body: BedrockConverseRequestBody = { - "inferenceConfig": self._get_inference_config(), - } - - # Add system message if present - if system_message: - body["system"] = cast( - "list[SystemContentBlockTypeDef]", - cast(object, [{"text": system_message}]), + with llm_call_context(): + try: + # Emit call started event + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, ) - # Add tool config if present or if messages contain tool content - # Bedrock requires toolConfig when messages have toolUse/toolResult - if tools: - tool_config: ToolConfigurationTypeDef = { - "tools": cast( - "Sequence[ToolTypeDef]", - 
cast(object, self._format_tools_for_converse(tools)), - ) + # Format messages for Converse API + formatted_messages, system_message = self._format_messages_for_converse( + messages + ) + + if not self._invoke_before_llm_call_hooks( + formatted_messages, from_agent + ): + raise ValueError("LLM call blocked by before_llm_call hook") + + # Prepare request body + body: BedrockConverseRequestBody = { + "inferenceConfig": self._get_inference_config(), } - body["toolConfig"] = tool_config - elif self._messages_contain_tool_content(formatted_messages): - # Create minimal toolConfig from tool history in messages - tools_from_history = self._extract_tools_from_message_history( - formatted_messages - ) - if tools_from_history: - body["toolConfig"] = cast( - "ToolConfigurationTypeDef", - cast(object, {"tools": tools_from_history}), + + # Add system message if present + if system_message: + body["system"] = cast( + "list[SystemContentBlockTypeDef]", + cast(object, [{"text": system_message}]), ) - # Add optional advanced features if configured - if self.guardrail_config: - guardrail_config: GuardrailConfigurationTypeDef = cast( - "GuardrailConfigurationTypeDef", cast(object, self.guardrail_config) - ) - body["guardrailConfig"] = guardrail_config + # Add tool config if present or if messages contain tool content + # Bedrock requires toolConfig when messages have toolUse/toolResult + if tools: + tool_config: ToolConfigurationTypeDef = { + "tools": cast( + "Sequence[ToolTypeDef]", + cast(object, self._format_tools_for_converse(tools)), + ) + } + body["toolConfig"] = tool_config + elif self._messages_contain_tool_content(formatted_messages): + # Create minimal toolConfig from tool history in messages + tools_from_history = self._extract_tools_from_message_history( + formatted_messages + ) + if tools_from_history: + body["toolConfig"] = cast( + "ToolConfigurationTypeDef", + cast(object, {"tools": tools_from_history}), + ) - if self.additional_model_request_fields: - body["additionalModelRequestFields"] = ( - self.additional_model_request_fields - ) + # Add optional advanced features if configured + if self.guardrail_config: + guardrail_config: GuardrailConfigurationTypeDef = cast( + "GuardrailConfigurationTypeDef", + cast(object, self.guardrail_config), + ) + body["guardrailConfig"] = guardrail_config - if self.additional_model_response_field_paths: - body["additionalModelResponseFieldPaths"] = ( - self.additional_model_response_field_paths - ) + if self.additional_model_request_fields: + body["additionalModelRequestFields"] = ( + self.additional_model_request_fields + ) - if self.stream: - return self._handle_streaming_converse( + if self.additional_model_response_field_paths: + body["additionalModelResponseFieldPaths"] = ( + self.additional_model_response_field_paths + ) + + if self.stream: + return self._handle_streaming_converse( + formatted_messages, + body, + available_functions, + from_task, + from_agent, + ) + + return self._handle_converse( formatted_messages, body, available_functions, @@ -377,25 +389,17 @@ class BedrockCompletion(BaseLLM): from_agent, ) - return self._handle_converse( - formatted_messages, - body, - available_functions, - from_task, - from_agent, - ) + except Exception as e: + if is_context_length_exceeded(e): + logging.error(f"Context window exceeded: {e}") + raise LLMContextLengthExceededError(str(e)) from e - except Exception as e: - if is_context_length_exceeded(e): - logging.error(f"Context window exceeded: {e}") - raise LLMContextLengthExceededError(str(e)) from e - - 
error_msg = f"AWS Bedrock API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + error_msg = f"AWS Bedrock API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise async def acall( self, @@ -431,87 +435,93 @@ class BedrockCompletion(BaseLLM): 'Install with: uv add "crewai[bedrock-async]"' ) - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) - - formatted_messages, system_message = self._format_messages_for_converse( - messages - ) - - body: BedrockConverseRequestBody = { - "inferenceConfig": self._get_inference_config(), - } - - if system_message: - body["system"] = cast( - "list[SystemContentBlockTypeDef]", - cast(object, [{"text": system_message}]), + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, ) - # Add tool config if present or if messages contain tool content - # Bedrock requires toolConfig when messages have toolUse/toolResult - if tools: - tool_config: ToolConfigurationTypeDef = { - "tools": cast( - "Sequence[ToolTypeDef]", - cast(object, self._format_tools_for_converse(tools)), - ) + formatted_messages, system_message = self._format_messages_for_converse( + messages + ) + + body: BedrockConverseRequestBody = { + "inferenceConfig": self._get_inference_config(), } - body["toolConfig"] = tool_config - elif self._messages_contain_tool_content(formatted_messages): - # Create minimal toolConfig from tool history in messages - tools_from_history = self._extract_tools_from_message_history( - formatted_messages - ) - if tools_from_history: - body["toolConfig"] = cast( - "ToolConfigurationTypeDef", - cast(object, {"tools": tools_from_history}), + + if system_message: + body["system"] = cast( + "list[SystemContentBlockTypeDef]", + cast(object, [{"text": system_message}]), ) - if self.guardrail_config: - guardrail_config: GuardrailConfigurationTypeDef = cast( - "GuardrailConfigurationTypeDef", cast(object, self.guardrail_config) - ) - body["guardrailConfig"] = guardrail_config + # Add tool config if present or if messages contain tool content + # Bedrock requires toolConfig when messages have toolUse/toolResult + if tools: + tool_config: ToolConfigurationTypeDef = { + "tools": cast( + "Sequence[ToolTypeDef]", + cast(object, self._format_tools_for_converse(tools)), + ) + } + body["toolConfig"] = tool_config + elif self._messages_contain_tool_content(formatted_messages): + # Create minimal toolConfig from tool history in messages + tools_from_history = self._extract_tools_from_message_history( + formatted_messages + ) + if tools_from_history: + body["toolConfig"] = cast( + "ToolConfigurationTypeDef", + cast(object, {"tools": tools_from_history}), + ) - if self.additional_model_request_fields: - body["additionalModelRequestFields"] = ( - self.additional_model_request_fields - ) + if self.guardrail_config: + guardrail_config: GuardrailConfigurationTypeDef = cast( + "GuardrailConfigurationTypeDef", + cast(object, self.guardrail_config), + ) + body["guardrailConfig"] = guardrail_config - if self.additional_model_response_field_paths: - body["additionalModelResponseFieldPaths"] = ( - 
self.additional_model_response_field_paths - ) + if self.additional_model_request_fields: + body["additionalModelRequestFields"] = ( + self.additional_model_request_fields + ) - if self.stream: - return await self._ahandle_streaming_converse( + if self.additional_model_response_field_paths: + body["additionalModelResponseFieldPaths"] = ( + self.additional_model_response_field_paths + ) + + if self.stream: + return await self._ahandle_streaming_converse( + formatted_messages, + body, + available_functions, + from_task, + from_agent, + ) + + return await self._ahandle_converse( formatted_messages, body, available_functions, from_task, from_agent ) - return await self._ahandle_converse( - formatted_messages, body, available_functions, from_task, from_agent - ) + except Exception as e: + if is_context_length_exceeded(e): + logging.error(f"Context window exceeded: {e}") + raise LLMContextLengthExceededError(str(e)) from e - except Exception as e: - if is_context_length_exceeded(e): - logging.error(f"Context window exceeded: {e}") - raise LLMContextLengthExceededError(str(e)) from e - - error_msg = f"AWS Bedrock API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + error_msg = f"AWS Bedrock API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise def _handle_converse( self, @@ -805,7 +815,7 @@ class BedrockCompletion(BaseLLM): "index": tool_use_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) elif "contentBlockStop" in event: logging.debug("Content block stopped in stream") @@ -1174,7 +1184,7 @@ class BedrockCompletion(BaseLLM): chunk=text_chunk, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) elif "toolUse" in delta and current_tool_use: tool_input = delta["toolUse"].get("input", "") diff --git a/lib/crewai/src/crewai/llms/providers/gemini/completion.py b/lib/crewai/src/crewai/llms/providers/gemini/completion.py index 9687f3d4f..1ec6ab167 100644 --- a/lib/crewai/src/crewai/llms/providers/gemini/completion.py +++ b/lib/crewai/src/crewai/llms/providers/gemini/completion.py @@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, Literal, cast from pydantic import BaseModel from crewai.events.types.llm_events import LLMCallType -from crewai.llms.base_llm import BaseLLM +from crewai.llms.base_llm import BaseLLM, llm_call_context from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededError, @@ -282,32 +282,44 @@ class GeminiCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) - self.tools = tools + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) + self.tools = tools - formatted_content, system_instruction = self._format_messages_for_gemini( - messages - ) + formatted_content, system_instruction = ( + self._format_messages_for_gemini(messages) + ) - messages_for_hooks = 
self._convert_contents_to_dict(formatted_content) + messages_for_hooks = self._convert_contents_to_dict(formatted_content) - if not self._invoke_before_llm_call_hooks(messages_for_hooks, from_agent): - raise ValueError("LLM call blocked by before_llm_call hook") + if not self._invoke_before_llm_call_hooks( + messages_for_hooks, from_agent + ): + raise ValueError("LLM call blocked by before_llm_call hook") - config = self._prepare_generation_config( - system_instruction, tools, response_model - ) + config = self._prepare_generation_config( + system_instruction, tools, response_model + ) - if self.stream: - return self._handle_streaming_completion( + if self.stream: + return self._handle_streaming_completion( + formatted_content, + config, + available_functions, + from_task, + from_agent, + response_model, + ) + + return self._handle_completion( formatted_content, config, available_functions, @@ -316,29 +328,20 @@ class GeminiCompletion(BaseLLM): response_model, ) - return self._handle_completion( - formatted_content, - config, - available_functions, - from_task, - from_agent, - response_model, - ) - - except APIError as e: - error_msg = f"Google Gemini API error: {e.code} - {e.message}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise - except Exception as e: - error_msg = f"Google Gemini API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + except APIError as e: + error_msg = f"Google Gemini API error: {e.code} - {e.message}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise + except Exception as e: + error_msg = f"Google Gemini API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise async def acall( self, @@ -364,27 +367,37 @@ class GeminiCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) - self.tools = tools + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) + self.tools = tools - formatted_content, system_instruction = self._format_messages_for_gemini( - messages - ) + formatted_content, system_instruction = ( + self._format_messages_for_gemini(messages) + ) - config = self._prepare_generation_config( - system_instruction, tools, response_model - ) + config = self._prepare_generation_config( + system_instruction, tools, response_model + ) - if self.stream: - return await self._ahandle_streaming_completion( + if self.stream: + return await self._ahandle_streaming_completion( + formatted_content, + config, + available_functions, + from_task, + from_agent, + response_model, + ) + + return await self._ahandle_completion( formatted_content, config, available_functions, @@ -393,29 +406,20 @@ class GeminiCompletion(BaseLLM): response_model, ) - return await self._ahandle_completion( - formatted_content, - config, - available_functions, - from_task, - from_agent, - response_model, - ) - - except APIError as e: - error_msg = f"Google Gemini API 
error: {e.code} - {e.message}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise - except Exception as e: - error_msg = f"Google Gemini API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + except APIError as e: + error_msg = f"Google Gemini API error: {e.code} - {e.message}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise + except Exception as e: + error_msg = f"Google Gemini API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise def _prepare_generation_config( self, @@ -790,7 +794,7 @@ class GeminiCompletion(BaseLLM): Returns: Tuple of (updated full_response, updated function_calls, updated usage_data) """ - response_id=chunk.response_id if hasattr(chunk,"response_id") else None + response_id = chunk.response_id if hasattr(chunk, "response_id") else None if chunk.usage_metadata: usage_data = self._extract_token_usage(chunk) @@ -800,7 +804,7 @@ class GeminiCompletion(BaseLLM): chunk=chunk.text, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if chunk.candidates: @@ -837,7 +841,7 @@ class GeminiCompletion(BaseLLM): "index": call_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) return full_response, function_calls, usage_data diff --git a/lib/crewai/src/crewai/llms/providers/openai/completion.py b/lib/crewai/src/crewai/llms/providers/openai/completion.py index 56a6fa2cb..28a1caec0 100644 --- a/lib/crewai/src/crewai/llms/providers/openai/completion.py +++ b/lib/crewai/src/crewai/llms/providers/openai/completion.py @@ -17,7 +17,7 @@ from openai.types.responses import Response from pydantic import BaseModel from crewai.events.types.llm_events import LLMCallType -from crewai.llms.base_llm import BaseLLM +from crewai.llms.base_llm import BaseLLM, llm_call_context from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( @@ -382,23 +382,35 @@ class OpenAICompletion(BaseLLM): Returns: Completion response or tool call result. 
""" - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) - formatted_messages = self._format_messages(messages) + formatted_messages = self._format_messages(messages) - if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent): - raise ValueError("LLM call blocked by before_llm_call hook") + if not self._invoke_before_llm_call_hooks( + formatted_messages, from_agent + ): + raise ValueError("LLM call blocked by before_llm_call hook") - if self.api == "responses": - return self._call_responses( + if self.api == "responses": + return self._call_responses( + messages=formatted_messages, + tools=tools, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, + ) + + return self._call_completions( messages=formatted_messages, tools=tools, available_functions=available_functions, @@ -407,22 +419,13 @@ class OpenAICompletion(BaseLLM): response_model=response_model, ) - return self._call_completions( - messages=formatted_messages, - tools=tools, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, - ) - - except Exception as e: - error_msg = f"OpenAI API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + except Exception as e: + error_msg = f"OpenAI API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise def _call_completions( self, @@ -479,20 +482,30 @@ class OpenAICompletion(BaseLLM): Returns: Completion response or tool call result. 
""" - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) - formatted_messages = self._format_messages(messages) + formatted_messages = self._format_messages(messages) - if self.api == "responses": - return await self._acall_responses( + if self.api == "responses": + return await self._acall_responses( + messages=formatted_messages, + tools=tools, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, + ) + + return await self._acall_completions( messages=formatted_messages, tools=tools, available_functions=available_functions, @@ -501,22 +514,13 @@ class OpenAICompletion(BaseLLM): response_model=response_model, ) - return await self._acall_completions( - messages=formatted_messages, - tools=tools, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, - ) - - except Exception as e: - error_msg = f"OpenAI API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + except Exception as e: + error_msg = f"OpenAI API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise async def _acall_completions( self, @@ -1060,7 +1064,7 @@ class OpenAICompletion(BaseLLM): chunk=delta_text, from_task=from_task, from_agent=from_agent, - response_id=response_id_stream + response_id=response_id_stream, ) elif event.type == "response.function_call_arguments.delta": @@ -1709,7 +1713,7 @@ class OpenAICompletion(BaseLLM): **parse_params, response_format=response_model ) as stream: for chunk in stream: - response_id_stream=chunk.id if hasattr(chunk,"id") else None + response_id_stream = chunk.id if hasattr(chunk, "id") else None if chunk.type == "content.delta": delta_content = chunk.delta @@ -1718,7 +1722,7 @@ class OpenAICompletion(BaseLLM): chunk=delta_content, from_task=from_task, from_agent=from_agent, - response_id=response_id_stream + response_id=response_id_stream, ) final_completion = stream.get_final_completion() @@ -1748,7 +1752,9 @@ class OpenAICompletion(BaseLLM): usage_data = {"total_tokens": 0} for completion_chunk in completion_stream: - response_id_stream=completion_chunk.id if hasattr(completion_chunk,"id") else None + response_id_stream = ( + completion_chunk.id if hasattr(completion_chunk, "id") else None + ) if hasattr(completion_chunk, "usage") and completion_chunk.usage: usage_data = self._extract_openai_token_usage(completion_chunk) @@ -1766,7 +1772,7 @@ class OpenAICompletion(BaseLLM): chunk=chunk_delta.content, from_task=from_task, from_agent=from_agent, - response_id=response_id_stream + response_id=response_id_stream, ) if chunk_delta.tool_calls: @@ -1805,7 +1811,7 @@ class OpenAICompletion(BaseLLM): "index": tool_calls[tool_index]["index"], }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id_stream + response_id=response_id_stream, ) self._track_token_usage_internal(usage_data) @@ -2017,7 +2023,7 @@ class OpenAICompletion(BaseLLM): accumulated_content = "" usage_data = {"total_tokens": 0} 
async for chunk in completion_stream: - response_id_stream=chunk.id if hasattr(chunk,"id") else None + response_id_stream = chunk.id if hasattr(chunk, "id") else None if hasattr(chunk, "usage") and chunk.usage: usage_data = self._extract_openai_token_usage(chunk) @@ -2035,7 +2041,7 @@ class OpenAICompletion(BaseLLM): chunk=delta.content, from_task=from_task, from_agent=from_agent, - response_id=response_id_stream + response_id=response_id_stream, ) self._track_token_usage_internal(usage_data) @@ -2071,7 +2077,7 @@ class OpenAICompletion(BaseLLM): usage_data = {"total_tokens": 0} async for chunk in stream: - response_id_stream=chunk.id if hasattr(chunk,"id") else None + response_id_stream = chunk.id if hasattr(chunk, "id") else None if hasattr(chunk, "usage") and chunk.usage: usage_data = self._extract_openai_token_usage(chunk) @@ -2089,7 +2095,7 @@ class OpenAICompletion(BaseLLM): chunk=chunk_delta.content, from_task=from_task, from_agent=from_agent, - response_id=response_id_stream + response_id=response_id_stream, ) if chunk_delta.tool_calls: @@ -2128,7 +2134,7 @@ class OpenAICompletion(BaseLLM): "index": tool_calls[tool_index]["index"], }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id_stream + response_id=response_id_stream, ) self._track_token_usage_internal(usage_data) diff --git a/lib/crewai/tests/cassettes/utilities/test_llm_call_events_share_call_id.yaml b/lib/crewai/tests/cassettes/utilities/test_llm_call_events_share_call_id.yaml new file mode 100644 index 000000000..2370a9d04 --- /dev/null +++ b/lib/crewai/tests/cassettes/utilities/test_llm_call_events_share_call_id.yaml @@ -0,0 +1,108 @@ +interactions: +- request: + body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '71' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.13.0 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: "{\n \"id\": \"chatcmpl-D2HpUSxS5LeHwDTELElWlC5CDMzmr\",\n \"object\": + \"chat.completion\",\n \"created\": 1769437564,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n + \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": + \"assistant\",\n \"content\": \"Hi there! 
How can I assist you today?\",\n + \ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": + null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": + 9,\n \"completion_tokens\": 10,\n \"total_tokens\": 19,\n \"prompt_tokens_details\": + {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": + {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": + 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": + \"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n" + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Mon, 26 Jan 2026 14:26:05 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '460' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '477' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/cassettes/utilities/test_separate_llm_calls_have_different_call_ids.yaml b/lib/crewai/tests/cassettes/utilities/test_separate_llm_calls_have_different_call_ids.yaml new file mode 100644 index 000000000..419c5e006 --- /dev/null +++ b/lib/crewai/tests/cassettes/utilities/test_separate_llm_calls_have_different_call_ids.yaml @@ -0,0 +1,215 @@ +interactions: +- request: + body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '71' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.13.0 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: "{\n \"id\": \"chatcmpl-D2HpStmyOpe9DrthWBlDdMZfVMJ1u\",\n \"object\": + \"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n + \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": + \"assistant\",\n \"content\": \"Hi! 
How can I assist you today?\",\n + \ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": + null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": + 9,\n \"completion_tokens\": 9,\n \"total_tokens\": 18,\n \"prompt_tokens_details\": + {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": + {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": + 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": + \"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n" + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Mon, 26 Jan 2026 14:26:02 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '415' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '434' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +- request: + body: '{"messages":[{"role":"user","content":"Say bye"}],"model":"gpt-4o-mini"}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '72' + content-type: + - application/json + cookie: + - COOKIE-XXX + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.13.0 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: "{\n \"id\": \"chatcmpl-D2HpS1DP0Xd3tmWt5PBincVrdU7yw\",\n \"object\": + \"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n + \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": + \"assistant\",\n \"content\": \"Goodbye! If you have more questions + in the future, feel free to reach out. 
Have a great day!\",\n \"refusal\": + null,\n \"annotations\": []\n },\n \"logprobs\": null,\n + \ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": + 9,\n \"completion_tokens\": 23,\n \"total_tokens\": 32,\n \"prompt_tokens_details\": + {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": + {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": + 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": + \"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n" + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Mon, 26 Jan 2026 14:26:03 GMT + Server: + - cloudflare + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '964' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '979' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/cassettes/utilities/test_streaming_chunks_share_call_id_with_call.yaml b/lib/crewai/tests/cassettes/utilities/test_streaming_chunks_share_call_id_with_call.yaml new file mode 100644 index 000000000..7b04d21a3 --- /dev/null +++ b/lib/crewai/tests/cassettes/utilities/test_streaming_chunks_share_call_id_with_call.yaml @@ -0,0 +1,143 @@ +interactions: +- request: + body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini","stream":true,"stream_options":{"include_usage":true}}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '125' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.13.0 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: 'data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"rVIyGQF2E"} + + + data: 
{"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"Hi"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"ZGVqV7ZDm"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"vnfm7IxlIB"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + How"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"o8F35ZZ"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + can"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"kiBzGe3"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + I"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"cbGT2RWgx"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + assist"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"DtxR"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + you"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"6y6Co8J"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + today"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"SZOmm"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"s9Bc0HqlPg"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":null,"obfuscation":"u9aar"} + + + data: 
{"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[],"usage":{"prompt_tokens":9,"completion_tokens":9,"total_tokens":18,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}},"obfuscation":"5hudm8ySqh39"} + + + data: [DONE] + + + ' + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Mon, 26 Jan 2026 14:26:04 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '260' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '275' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/test_streaming.py b/lib/crewai/tests/test_streaming.py index 5860755ff..8eb63694e 100644 --- a/lib/crewai/tests/test_streaming.py +++ b/lib/crewai/tests/test_streaming.py @@ -217,6 +217,7 @@ class TestCrewKickoffStreaming: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="Hello ", + call_id="test-call-id", ), ) crewai_event_bus.emit( @@ -224,6 +225,7 @@ class TestCrewKickoffStreaming: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="World!", + call_id="test-call-id", ), ) return mock_output @@ -284,6 +286,7 @@ class TestCrewKickoffStreaming: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="", + call_id="test-call-id", tool_call=ToolCall( id="call-123", function=FunctionCall( @@ -364,6 +367,7 @@ class TestCrewKickoffStreamingAsync: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="Async ", + call_id="test-call-id", ), ) crewai_event_bus.emit( @@ -371,6 +375,7 @@ class TestCrewKickoffStreamingAsync: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="Stream!", + call_id="test-call-id", ), ) return mock_output @@ -451,6 +456,7 @@ class TestFlowKickoffStreaming: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="Flow ", + call_id="test-call-id", ), ) crewai_event_bus.emit( @@ -458,6 +464,7 @@ class TestFlowKickoffStreaming: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="output!", + call_id="test-call-id", ), ) return "done" @@ -545,6 +552,7 @@ class TestFlowKickoffStreamingAsync: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="Async flow ", + call_id="test-call-id", ), ) await asyncio.sleep(0.01) @@ -553,6 +561,7 @@ class TestFlowKickoffStreamingAsync: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="stream!", + call_id="test-call-id", ), ) await asyncio.sleep(0.01) @@ -686,6 +695,7 @@ class TestStreamingEdgeCases: type="llm_stream_chunk", chunk="Task 1", task_name="First 
task", + call_id="test-call-id", ), ) return mock_output diff --git a/lib/crewai/tests/utilities/test_events.py b/lib/crewai/tests/utilities/test_events.py index 789f1f43e..81ef321d6 100644 --- a/lib/crewai/tests/utilities/test_events.py +++ b/lib/crewai/tests/utilities/test_events.py @@ -984,8 +984,8 @@ def test_streaming_fallback_to_non_streaming(): def mock_call(messages, tools=None, callbacks=None, available_functions=None): nonlocal fallback_called # Emit a couple of chunks to simulate partial streaming - crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id = "Id")) - crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id = "Id")) + crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id="Id", call_id="test-call-id")) + crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id="Id", call_id="test-call-id")) # Mark that fallback would be called fallback_called = True @@ -1041,7 +1041,7 @@ def test_streaming_empty_response_handling(): def mock_call(messages, tools=None, callbacks=None, available_functions=None): # Emit a few empty chunks for _ in range(3): - crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="",response_id="id")) + crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="", response_id="id", call_id="test-call-id")) # Return the default message for empty responses return "I apologize, but I couldn't generate a proper response. Please try again or rephrase your request." @@ -1280,6 +1280,105 @@ def test_llm_emits_event_with_lite_agent(): assert set(all_agent_id) == {str(agent.id)} +# ----------- CALL_ID CORRELATION TESTS ----------- + + +@pytest.mark.vcr() +def test_llm_call_events_share_call_id(): + """All events from a single LLM call should share the same call_id.""" + import uuid + + events = [] + condition = threading.Condition() + + @crewai_event_bus.on(LLMCallStartedEvent) + def on_start(source, event): + with condition: + events.append(event) + condition.notify() + + @crewai_event_bus.on(LLMCallCompletedEvent) + def on_complete(source, event): + with condition: + events.append(event) + condition.notify() + + llm = LLM(model="gpt-4o-mini") + llm.call("Say hi") + + with condition: + success = condition.wait_for(lambda: len(events) >= 2, timeout=10) + assert success, "Timeout waiting for LLM events" + + # Behavior: all events from the call share the same call_id + assert len(events) == 2 + assert events[0].call_id == events[1].call_id + # call_id should be a valid UUID + uuid.UUID(events[0].call_id) + + +@pytest.mark.vcr() +def test_streaming_chunks_share_call_id_with_call(): + """Streaming chunks should share call_id with started/completed events.""" + events = [] + condition = threading.Condition() + + @crewai_event_bus.on(LLMCallStartedEvent) + def on_start(source, event): + with condition: + events.append(event) + condition.notify() + + @crewai_event_bus.on(LLMStreamChunkEvent) + def on_chunk(source, event): + with condition: + events.append(event) + condition.notify() + + @crewai_event_bus.on(LLMCallCompletedEvent) + def on_complete(source, event): + with condition: + events.append(event) + condition.notify() + + llm = LLM(model="gpt-4o-mini", stream=True) + llm.call("Say hi") + + with condition: + # Wait for at least started, some chunks, and completed + success = condition.wait_for(lambda: len(events) >= 3, timeout=10) + assert success, "Timeout waiting for streaming events" + + # Behavior: all events (started, chunks, 
completed) share the same call_id + call_ids = {e.call_id for e in events} + assert len(call_ids) == 1 + + +@pytest.mark.vcr() +def test_separate_llm_calls_have_different_call_ids(): + """Different LLM calls should have different call_ids.""" + call_ids = [] + condition = threading.Condition() + + @crewai_event_bus.on(LLMCallStartedEvent) + def on_start(source, event): + with condition: + call_ids.append(event.call_id) + condition.notify() + + llm = LLM(model="gpt-4o-mini") + llm.call("Say hi") + llm.call("Say bye") + + with condition: + success = condition.wait_for(lambda: len(call_ids) >= 2, timeout=10) + assert success, "Timeout waiting for LLM call events" + + # Behavior: each call has its own call_id + assert len(call_ids) == 2 + assert call_ids[0] != call_ids[1] + + # ----------- HUMAN FEEDBACK EVENTS -----------
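
For event consumers, a minimal sketch of how the new call_id field can be used to group LLMCallStartedEvent, LLMStreamChunkEvent, and LLMCallCompletedEvent instances that belong to the same request. The handler registration mirrors what the tests above do; the event-bus import path, handler names, and the defaultdict-based grouping are illustrative assumptions, not part of this patch:

    from collections import defaultdict
    from typing import Any

    from crewai.events import crewai_event_bus  # assumed import path; match your project's imports
    from crewai.events.types.llm_events import (
        LLMCallCompletedEvent,
        LLMCallStartedEvent,
        LLMStreamChunkEvent,
    )

    # Events carrying the same call_id belong to the same underlying API call.
    events_by_call: dict[str, list[Any]] = defaultdict(list)

    @crewai_event_bus.on(LLMCallStartedEvent)
    def on_started(source: Any, event: LLMCallStartedEvent) -> None:
        events_by_call[event.call_id].append(event)

    @crewai_event_bus.on(LLMStreamChunkEvent)
    def on_chunk(source: Any, event: LLMStreamChunkEvent) -> None:
        events_by_call[event.call_id].append(event)

    @crewai_event_bus.on(LLMCallCompletedEvent)
    def on_completed(source: Any, event: LLMCallCompletedEvent) -> None:
        events_by_call[event.call_id].append(event)
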