diff --git a/lib/crewai/src/crewai/events/types/llm_events.py b/lib/crewai/src/crewai/events/types/llm_events.py index 161b8a2a0..87087f100 100644 --- a/lib/crewai/src/crewai/events/types/llm_events.py +++ b/lib/crewai/src/crewai/events/types/llm_events.py @@ -10,6 +10,7 @@ class LLMEventBase(BaseEvent): from_task: Any | None = None from_agent: Any | None = None model: str | None = None + call_id: str def __init__(self, **data: Any) -> None: if data.get("from_task"): diff --git a/lib/crewai/src/crewai/llm.py b/lib/crewai/src/crewai/llm.py index c607f1615..902a3d310 100644 --- a/lib/crewai/src/crewai/llm.py +++ b/lib/crewai/src/crewai/llm.py @@ -37,7 +37,7 @@ from crewai.events.types.tool_usage_events import ( ToolUsageFinishedEvent, ToolUsageStartedEvent, ) -from crewai.llms.base_llm import BaseLLM +from crewai.llms.base_llm import BaseLLM, get_current_call_id, llm_call_context from crewai.llms.constants import ( ANTHROPIC_MODELS, AZURE_MODELS, @@ -770,7 +770,7 @@ class LLM(BaseLLM): chunk_content = None response_id = None - if hasattr(chunk,'id'): + if hasattr(chunk, "id"): response_id = chunk.id # Safely extract content from various chunk formats @@ -827,7 +827,7 @@ class LLM(BaseLLM): available_functions=available_functions, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if result is not None: @@ -849,7 +849,8 @@ class LLM(BaseLLM): from_task=from_task, from_agent=from_agent, call_type=LLMCallType.LLM_CALL, - response_id=response_id + response_id=response_id, + call_id=get_current_call_id(), ), ) # --- 4) Fallback to non-streaming if no content received @@ -1015,7 +1016,10 @@ class LLM(BaseLLM): crewai_event_bus.emit( self, event=LLMCallFailedEvent( - error=str(e), from_task=from_task, from_agent=from_agent + error=str(e), + from_task=from_task, + from_agent=from_agent, + call_id=get_current_call_id(), ), ) raise Exception(f"Failed to get streaming response: {e!s}") from e @@ -1048,7 +1052,8 @@ class LLM(BaseLLM): from_task=from_task, from_agent=from_agent, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, + call_id=get_current_call_id(), ), ) @@ -1476,7 +1481,8 @@ class LLM(BaseLLM): chunk=chunk_content, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, + call_id=get_current_call_id(), ), ) @@ -1619,7 +1625,12 @@ class LLM(BaseLLM): logging.error(f"Error executing function '{function_name}': {e}") crewai_event_bus.emit( self, - event=LLMCallFailedEvent(error=f"Tool execution error: {e!s}"), + event=LLMCallFailedEvent( + error=f"Tool execution error: {e!s}", + from_task=from_task, + from_agent=from_agent, + call_id=get_current_call_id(), + ), ) crewai_event_bus.emit( self, @@ -1669,108 +1680,117 @@ class LLM(BaseLLM): ValueError: If response format is not supported LLMContextLengthExceededError: If input exceeds model's context limit """ - crewai_event_bus.emit( - self, - event=LLMCallStartedEvent( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - model=self.model, - ), - ) + with llm_call_context() as call_id: + crewai_event_bus.emit( + self, + event=LLMCallStartedEvent( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + model=self.model, + call_id=call_id, + ), + ) - # --- 2) Validate parameters before proceeding with the call - self._validate_call_params() + # --- 2) Validate parameters before proceeding with the call + self._validate_call_params() - # --- 3) Convert string messages to proper format if needed - if isinstance(messages, str): - messages = [{"role": "user", "content": messages}] - # --- 4) Handle O1 model special case (system messages not supported) - if "o1" in self.model.lower(): - for message in messages: - if message.get("role") == "system": - msg_role: Literal["assistant"] = "assistant" - message["role"] = msg_role + # --- 3) Convert string messages to proper format if needed + if isinstance(messages, str): + messages = [{"role": "user", "content": messages}] + # --- 4) Handle O1 model special case (system messages not supported) + if "o1" in self.model.lower(): + for message in messages: + if message.get("role") == "system": + msg_role: Literal["assistant"] = "assistant" + message["role"] = msg_role - if not self._invoke_before_llm_call_hooks(messages, from_agent): - raise ValueError("LLM call blocked by before_llm_call hook") + if not self._invoke_before_llm_call_hooks(messages, from_agent): + raise ValueError("LLM call blocked by before_llm_call hook") - # --- 5) Set up callbacks if provided - with suppress_warnings(): - if callbacks and len(callbacks) > 0: - self.set_callbacks(callbacks) - try: - # --- 6) Prepare parameters for the completion call - params = self._prepare_completion_params(messages, tools) - # --- 7) Make the completion call and handle response - if self.stream: - result = self._handle_streaming_response( - params=params, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, - ) - else: - result = self._handle_non_streaming_response( - params=params, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, - ) - - if isinstance(result, str): - result = self._invoke_after_llm_call_hooks( - messages, result, from_agent - ) - - return result - except LLMContextLengthExceededError: - # Re-raise LLMContextLengthExceededError as it should be handled - # by the CrewAgentExecutor._invoke_loop method, which can then decide - # whether to summarize the content or abort based on the respect_context_window flag - raise - except Exception as e: - unsupported_stop = "Unsupported parameter" in str( - e - ) and "'stop'" in str(e) - - if unsupported_stop: - if ( - "additional_drop_params" in self.additional_params - and isinstance( - self.additional_params["additional_drop_params"], list + # --- 5) Set up callbacks if provided + with suppress_warnings(): + if callbacks and len(callbacks) > 0: + self.set_callbacks(callbacks) + try: + # --- 6) Prepare parameters for the completion call + params = self._prepare_completion_params(messages, tools) + # --- 7) Make the completion call and handle response + if self.stream: + result = self._handle_streaming_response( + params=params, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, ) - ): - self.additional_params["additional_drop_params"].append("stop") else: - self.additional_params = {"additional_drop_params": ["stop"]} + result = self._handle_non_streaming_response( + params=params, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, + ) - logging.info("Retrying LLM call without the unsupported 'stop'") + if isinstance(result, str): + result = self._invoke_after_llm_call_hooks( + messages, result, from_agent + ) - return self.call( - messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, + return result + except LLMContextLengthExceededError: + # Re-raise LLMContextLengthExceededError as it should be handled + # by the CrewAgentExecutor._invoke_loop method, which can then decide + # whether to summarize the content or abort based on the respect_context_window flag + raise + except Exception as e: + unsupported_stop = "Unsupported parameter" in str( + e + ) and "'stop'" in str(e) + + if unsupported_stop: + if ( + "additional_drop_params" in self.additional_params + and isinstance( + self.additional_params["additional_drop_params"], list + ) + ): + self.additional_params["additional_drop_params"].append( + "stop" + ) + else: + self.additional_params = { + "additional_drop_params": ["stop"] + } + + logging.info("Retrying LLM call without the unsupported 'stop'") + + return self.call( + messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, + ) + + crewai_event_bus.emit( + self, + event=LLMCallFailedEvent( + error=str(e), + from_task=from_task, + from_agent=from_agent, + call_id=get_current_call_id(), + ), ) - - crewai_event_bus.emit( - self, - event=LLMCallFailedEvent( - error=str(e), from_task=from_task, from_agent=from_agent - ), - ) - raise + raise async def acall( self, @@ -1808,43 +1828,54 @@ class LLM(BaseLLM): ValueError: If response format is not supported LLMContextLengthExceededError: If input exceeds model's context limit """ - crewai_event_bus.emit( - self, - event=LLMCallStartedEvent( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - model=self.model, - ), - ) + with llm_call_context() as call_id: + crewai_event_bus.emit( + self, + event=LLMCallStartedEvent( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + model=self.model, + call_id=call_id, + ), + ) - self._validate_call_params() + self._validate_call_params() - if isinstance(messages, str): - messages = [{"role": "user", "content": messages}] + if isinstance(messages, str): + messages = [{"role": "user", "content": messages}] - # Process file attachments asynchronously before preparing params - messages = await self._aprocess_message_files(messages) + # Process file attachments asynchronously before preparing params + messages = await self._aprocess_message_files(messages) - if "o1" in self.model.lower(): - for message in messages: - if message.get("role") == "system": - msg_role: Literal["assistant"] = "assistant" - message["role"] = msg_role + if "o1" in self.model.lower(): + for message in messages: + if message.get("role") == "system": + msg_role: Literal["assistant"] = "assistant" + message["role"] = msg_role - with suppress_warnings(): - if callbacks and len(callbacks) > 0: - self.set_callbacks(callbacks) - try: - params = self._prepare_completion_params( - messages, tools, skip_file_processing=True - ) + with suppress_warnings(): + if callbacks and len(callbacks) > 0: + self.set_callbacks(callbacks) + try: + params = self._prepare_completion_params( + messages, tools, skip_file_processing=True + ) - if self.stream: - return await self._ahandle_streaming_response( + if self.stream: + return await self._ahandle_streaming_response( + params=params, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, + ) + + return await self._ahandle_non_streaming_response( params=params, callbacks=callbacks, available_functions=available_functions, @@ -1852,52 +1883,50 @@ class LLM(BaseLLM): from_agent=from_agent, response_model=response_model, ) + except LLMContextLengthExceededError: + raise + except Exception as e: + unsupported_stop = "Unsupported parameter" in str( + e + ) and "'stop'" in str(e) - return await self._ahandle_non_streaming_response( - params=params, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, - ) - except LLMContextLengthExceededError: - raise - except Exception as e: - unsupported_stop = "Unsupported parameter" in str( - e - ) and "'stop'" in str(e) + if unsupported_stop: + if ( + "additional_drop_params" in self.additional_params + and isinstance( + self.additional_params["additional_drop_params"], list + ) + ): + self.additional_params["additional_drop_params"].append( + "stop" + ) + else: + self.additional_params = { + "additional_drop_params": ["stop"] + } - if unsupported_stop: - if ( - "additional_drop_params" in self.additional_params - and isinstance( - self.additional_params["additional_drop_params"], list + logging.info("Retrying LLM call without the unsupported 'stop'") + + return await self.acall( + messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, ) - ): - self.additional_params["additional_drop_params"].append("stop") - else: - self.additional_params = {"additional_drop_params": ["stop"]} - logging.info("Retrying LLM call without the unsupported 'stop'") - - return await self.acall( - messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, + crewai_event_bus.emit( + self, + event=LLMCallFailedEvent( + error=str(e), + from_task=from_task, + from_agent=from_agent, + call_id=get_current_call_id(), + ), ) - - crewai_event_bus.emit( - self, - event=LLMCallFailedEvent( - error=str(e), from_task=from_task, from_agent=from_agent - ), - ) - raise + raise def _handle_emit_call_events( self, @@ -1925,6 +1954,7 @@ class LLM(BaseLLM): from_task=from_task, from_agent=from_agent, model=self.model, + call_id=get_current_call_id(), ), ) diff --git a/lib/crewai/src/crewai/llms/base_llm.py b/lib/crewai/src/crewai/llms/base_llm.py index 2a6def197..0044b9571 100644 --- a/lib/crewai/src/crewai/llms/base_llm.py +++ b/lib/crewai/src/crewai/llms/base_llm.py @@ -7,11 +7,15 @@ in CrewAI, including common functionality for native SDK implementations. from __future__ import annotations from abc import ABC, abstractmethod +from collections.abc import Generator +from contextlib import contextmanager +import contextvars from datetime import datetime import json import logging import re from typing import TYPE_CHECKING, Any, Final +import uuid from pydantic import BaseModel @@ -50,6 +54,32 @@ DEFAULT_CONTEXT_WINDOW_SIZE: Final[int] = 4096 DEFAULT_SUPPORTS_STOP_WORDS: Final[bool] = True _JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTALL) +_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar( + "_current_call_id", default=None +) + + +@contextmanager +def llm_call_context() -> Generator[str, None, None]: + """Context manager that establishes an LLM call scope with a unique call_id.""" + call_id = str(uuid.uuid4()) + token = _current_call_id.set(call_id) + try: + yield call_id + finally: + _current_call_id.reset(token) + + +def get_current_call_id() -> str: + """Get current call_id from context""" + call_id = _current_call_id.get() + if call_id is None: + logging.warning( + "LLM event emitted outside call context - generating fallback call_id" + ) + return str(uuid.uuid4()) + return call_id + class BaseLLM(ABC): """Abstract base class for LLM implementations. @@ -351,6 +381,7 @@ class BaseLLM(ABC): from_task=from_task, from_agent=from_agent, model=self.model, + call_id=get_current_call_id(), ), ) @@ -374,6 +405,7 @@ class BaseLLM(ABC): from_task=from_task, from_agent=from_agent, model=self.model, + call_id=get_current_call_id(), ), ) @@ -394,6 +426,7 @@ class BaseLLM(ABC): from_task=from_task, from_agent=from_agent, model=self.model, + call_id=get_current_call_id(), ), ) @@ -404,7 +437,7 @@ class BaseLLM(ABC): from_agent: Agent | None = None, tool_call: dict[str, Any] | None = None, call_type: LLMCallType | None = None, - response_id: str | None = None + response_id: str | None = None, ) -> None: """Emit stream chunk event. @@ -427,7 +460,8 @@ class BaseLLM(ABC): from_task=from_task, from_agent=from_agent, call_type=call_type, - response_id=response_id + response_id=response_id, + call_id=get_current_call_id(), ), ) diff --git a/lib/crewai/src/crewai/llms/providers/anthropic/completion.py b/lib/crewai/src/crewai/llms/providers/anthropic/completion.py index 1d626a7f4..e779adf88 100644 --- a/lib/crewai/src/crewai/llms/providers/anthropic/completion.py +++ b/lib/crewai/src/crewai/llms/providers/anthropic/completion.py @@ -9,7 +9,7 @@ from anthropic.types import ThinkingBlock from pydantic import BaseModel from crewai.events.types.llm_events import LLMCallType -from crewai.llms.base_llm import BaseLLM +from crewai.llms.base_llm import BaseLLM, llm_call_context from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( @@ -207,33 +207,44 @@ class AnthropicCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - try: - # Emit call started event - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) + with llm_call_context(): + try: + # Emit call started event + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) - # Format messages for Anthropic - formatted_messages, system_message = self._format_messages_for_anthropic( - messages - ) + # Format messages for Anthropic + formatted_messages, system_message = ( + self._format_messages_for_anthropic(messages) + ) - if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent): - raise ValueError("LLM call blocked by before_llm_call hook") + if not self._invoke_before_llm_call_hooks( + formatted_messages, from_agent + ): + raise ValueError("LLM call blocked by before_llm_call hook") - # Prepare completion parameters - completion_params = self._prepare_completion_params( - formatted_messages, system_message, tools - ) + # Prepare completion parameters + completion_params = self._prepare_completion_params( + formatted_messages, system_message, tools + ) - # Handle streaming vs non-streaming - if self.stream: - return self._handle_streaming_completion( + # Handle streaming vs non-streaming + if self.stream: + return self._handle_streaming_completion( + completion_params, + available_functions, + from_task, + from_agent, + response_model, + ) + + return self._handle_completion( completion_params, available_functions, from_task, @@ -241,21 +252,13 @@ class AnthropicCompletion(BaseLLM): response_model, ) - return self._handle_completion( - completion_params, - available_functions, - from_task, - from_agent, - response_model, - ) - - except Exception as e: - error_msg = f"Anthropic API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + except Exception as e: + error_msg = f"Anthropic API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise async def acall( self, @@ -280,26 +283,35 @@ class AnthropicCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) - formatted_messages, system_message = self._format_messages_for_anthropic( - messages - ) + formatted_messages, system_message = ( + self._format_messages_for_anthropic(messages) + ) - completion_params = self._prepare_completion_params( - formatted_messages, system_message, tools - ) + completion_params = self._prepare_completion_params( + formatted_messages, system_message, tools + ) - if self.stream: - return await self._ahandle_streaming_completion( + if self.stream: + return await self._ahandle_streaming_completion( + completion_params, + available_functions, + from_task, + from_agent, + response_model, + ) + + return await self._ahandle_completion( completion_params, available_functions, from_task, @@ -307,21 +319,13 @@ class AnthropicCompletion(BaseLLM): response_model, ) - return await self._ahandle_completion( - completion_params, - available_functions, - from_task, - from_agent, - response_model, - ) - - except Exception as e: - error_msg = f"Anthropic API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + except Exception as e: + error_msg = f"Anthropic API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise def _prepare_completion_params( self, @@ -704,7 +708,7 @@ class AnthropicCompletion(BaseLLM): for event in stream: if hasattr(event, "message") and hasattr(event.message, "id"): response_id = event.message.id - + if hasattr(event, "delta") and hasattr(event.delta, "text"): text_delta = event.delta.text full_response += text_delta @@ -712,7 +716,7 @@ class AnthropicCompletion(BaseLLM): chunk=text_delta, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if event.type == "content_block_start": @@ -739,7 +743,7 @@ class AnthropicCompletion(BaseLLM): "index": block_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) elif event.type == "content_block_delta": if event.delta.type == "input_json_delta": @@ -763,7 +767,7 @@ class AnthropicCompletion(BaseLLM): "index": block_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) final_message: Message = stream.get_final_message() @@ -1133,7 +1137,7 @@ class AnthropicCompletion(BaseLLM): chunk=text_delta, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if event.type == "content_block_start": @@ -1160,7 +1164,7 @@ class AnthropicCompletion(BaseLLM): "index": block_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) elif event.type == "content_block_delta": if event.delta.type == "input_json_delta": @@ -1184,7 +1188,7 @@ class AnthropicCompletion(BaseLLM): "index": block_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) final_message: Message = await stream.get_final_message() diff --git a/lib/crewai/src/crewai/llms/providers/azure/completion.py b/lib/crewai/src/crewai/llms/providers/azure/completion.py index a3aed7f4b..5caf5e78a 100644 --- a/lib/crewai/src/crewai/llms/providers/azure/completion.py +++ b/lib/crewai/src/crewai/llms/providers/azure/completion.py @@ -43,7 +43,7 @@ try: ) from crewai.events.types.llm_events import LLMCallType - from crewai.llms.base_llm import BaseLLM + from crewai.llms.base_llm import BaseLLM, llm_call_context except ImportError: raise ImportError( @@ -288,31 +288,42 @@ class AzureCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - try: - # Emit call started event - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) + with llm_call_context(): + try: + # Emit call started event + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) - # Format messages for Azure - formatted_messages = self._format_messages_for_azure(messages) + # Format messages for Azure + formatted_messages = self._format_messages_for_azure(messages) - if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent): - raise ValueError("LLM call blocked by before_llm_call hook") + if not self._invoke_before_llm_call_hooks( + formatted_messages, from_agent + ): + raise ValueError("LLM call blocked by before_llm_call hook") - # Prepare completion parameters - completion_params = self._prepare_completion_params( - formatted_messages, tools, response_model - ) + # Prepare completion parameters + completion_params = self._prepare_completion_params( + formatted_messages, tools, response_model + ) - # Handle streaming vs non-streaming - if self.stream: - return self._handle_streaming_completion( + # Handle streaming vs non-streaming + if self.stream: + return self._handle_streaming_completion( + completion_params, + available_functions, + from_task, + from_agent, + response_model, + ) + + return self._handle_completion( completion_params, available_functions, from_task, @@ -320,16 +331,8 @@ class AzureCompletion(BaseLLM): response_model, ) - return self._handle_completion( - completion_params, - available_functions, - from_task, - from_agent, - response_model, - ) - - except Exception as e: - return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value] + except Exception as e: + return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value] async def acall( # type: ignore[return] self, @@ -355,24 +358,33 @@ class AzureCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) - formatted_messages = self._format_messages_for_azure(messages) + formatted_messages = self._format_messages_for_azure(messages) - completion_params = self._prepare_completion_params( - formatted_messages, tools, response_model - ) + completion_params = self._prepare_completion_params( + formatted_messages, tools, response_model + ) - if self.stream: - return await self._ahandle_streaming_completion( + if self.stream: + return await self._ahandle_streaming_completion( + completion_params, + available_functions, + from_task, + from_agent, + response_model, + ) + + return await self._ahandle_completion( completion_params, available_functions, from_task, @@ -380,16 +392,8 @@ class AzureCompletion(BaseLLM): response_model, ) - return await self._ahandle_completion( - completion_params, - available_functions, - from_task, - from_agent, - response_model, - ) - - except Exception as e: - self._handle_api_error(e, from_task, from_agent) + except Exception as e: + self._handle_api_error(e, from_task, from_agent) def _prepare_completion_params( self, @@ -726,7 +730,7 @@ class AzureCompletion(BaseLLM): """ if update.choices: choice = update.choices[0] - response_id = update.id if hasattr(update,"id") else None + response_id = update.id if hasattr(update, "id") else None if choice.delta and choice.delta.content: content_delta = choice.delta.content full_response += content_delta @@ -734,7 +738,7 @@ class AzureCompletion(BaseLLM): chunk=content_delta, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if choice.delta and choice.delta.tool_calls: @@ -769,7 +773,7 @@ class AzureCompletion(BaseLLM): "index": idx, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) return full_response diff --git a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py index 73fefc478..4f85cd02e 100644 --- a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py +++ b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py @@ -11,7 +11,7 @@ from pydantic import BaseModel from typing_extensions import Required from crewai.events.types.llm_events import LLMCallType -from crewai.llms.base_llm import BaseLLM +from crewai.llms.base_llm import BaseLLM, llm_call_context from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededError, @@ -299,77 +299,89 @@ class BedrockCompletion(BaseLLM): response_model: type[BaseModel] | None = None, ) -> str | Any: """Call AWS Bedrock Converse API.""" - try: - # Emit call started event - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) - - # Format messages for Converse API - formatted_messages, system_message = self._format_messages_for_converse( - messages - ) - - if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent): - raise ValueError("LLM call blocked by before_llm_call hook") - - # Prepare request body - body: BedrockConverseRequestBody = { - "inferenceConfig": self._get_inference_config(), - } - - # Add system message if present - if system_message: - body["system"] = cast( - "list[SystemContentBlockTypeDef]", - cast(object, [{"text": system_message}]), + with llm_call_context(): + try: + # Emit call started event + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, ) - # Add tool config if present or if messages contain tool content - # Bedrock requires toolConfig when messages have toolUse/toolResult - if tools: - tool_config: ToolConfigurationTypeDef = { - "tools": cast( - "Sequence[ToolTypeDef]", - cast(object, self._format_tools_for_converse(tools)), - ) + # Format messages for Converse API + formatted_messages, system_message = self._format_messages_for_converse( + messages + ) + + if not self._invoke_before_llm_call_hooks( + formatted_messages, from_agent + ): + raise ValueError("LLM call blocked by before_llm_call hook") + + # Prepare request body + body: BedrockConverseRequestBody = { + "inferenceConfig": self._get_inference_config(), } - body["toolConfig"] = tool_config - elif self._messages_contain_tool_content(formatted_messages): - # Create minimal toolConfig from tool history in messages - tools_from_history = self._extract_tools_from_message_history( - formatted_messages - ) - if tools_from_history: - body["toolConfig"] = cast( - "ToolConfigurationTypeDef", - cast(object, {"tools": tools_from_history}), + + # Add system message if present + if system_message: + body["system"] = cast( + "list[SystemContentBlockTypeDef]", + cast(object, [{"text": system_message}]), ) - # Add optional advanced features if configured - if self.guardrail_config: - guardrail_config: GuardrailConfigurationTypeDef = cast( - "GuardrailConfigurationTypeDef", cast(object, self.guardrail_config) - ) - body["guardrailConfig"] = guardrail_config + # Add tool config if present or if messages contain tool content + # Bedrock requires toolConfig when messages have toolUse/toolResult + if tools: + tool_config: ToolConfigurationTypeDef = { + "tools": cast( + "Sequence[ToolTypeDef]", + cast(object, self._format_tools_for_converse(tools)), + ) + } + body["toolConfig"] = tool_config + elif self._messages_contain_tool_content(formatted_messages): + # Create minimal toolConfig from tool history in messages + tools_from_history = self._extract_tools_from_message_history( + formatted_messages + ) + if tools_from_history: + body["toolConfig"] = cast( + "ToolConfigurationTypeDef", + cast(object, {"tools": tools_from_history}), + ) - if self.additional_model_request_fields: - body["additionalModelRequestFields"] = ( - self.additional_model_request_fields - ) + # Add optional advanced features if configured + if self.guardrail_config: + guardrail_config: GuardrailConfigurationTypeDef = cast( + "GuardrailConfigurationTypeDef", + cast(object, self.guardrail_config), + ) + body["guardrailConfig"] = guardrail_config - if self.additional_model_response_field_paths: - body["additionalModelResponseFieldPaths"] = ( - self.additional_model_response_field_paths - ) + if self.additional_model_request_fields: + body["additionalModelRequestFields"] = ( + self.additional_model_request_fields + ) - if self.stream: - return self._handle_streaming_converse( + if self.additional_model_response_field_paths: + body["additionalModelResponseFieldPaths"] = ( + self.additional_model_response_field_paths + ) + + if self.stream: + return self._handle_streaming_converse( + formatted_messages, + body, + available_functions, + from_task, + from_agent, + ) + + return self._handle_converse( formatted_messages, body, available_functions, @@ -377,25 +389,17 @@ class BedrockCompletion(BaseLLM): from_agent, ) - return self._handle_converse( - formatted_messages, - body, - available_functions, - from_task, - from_agent, - ) + except Exception as e: + if is_context_length_exceeded(e): + logging.error(f"Context window exceeded: {e}") + raise LLMContextLengthExceededError(str(e)) from e - except Exception as e: - if is_context_length_exceeded(e): - logging.error(f"Context window exceeded: {e}") - raise LLMContextLengthExceededError(str(e)) from e - - error_msg = f"AWS Bedrock API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + error_msg = f"AWS Bedrock API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise async def acall( self, @@ -431,87 +435,93 @@ class BedrockCompletion(BaseLLM): 'Install with: uv add "crewai[bedrock-async]"' ) - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) - - formatted_messages, system_message = self._format_messages_for_converse( - messages - ) - - body: BedrockConverseRequestBody = { - "inferenceConfig": self._get_inference_config(), - } - - if system_message: - body["system"] = cast( - "list[SystemContentBlockTypeDef]", - cast(object, [{"text": system_message}]), + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, ) - # Add tool config if present or if messages contain tool content - # Bedrock requires toolConfig when messages have toolUse/toolResult - if tools: - tool_config: ToolConfigurationTypeDef = { - "tools": cast( - "Sequence[ToolTypeDef]", - cast(object, self._format_tools_for_converse(tools)), - ) + formatted_messages, system_message = self._format_messages_for_converse( + messages + ) + + body: BedrockConverseRequestBody = { + "inferenceConfig": self._get_inference_config(), } - body["toolConfig"] = tool_config - elif self._messages_contain_tool_content(formatted_messages): - # Create minimal toolConfig from tool history in messages - tools_from_history = self._extract_tools_from_message_history( - formatted_messages - ) - if tools_from_history: - body["toolConfig"] = cast( - "ToolConfigurationTypeDef", - cast(object, {"tools": tools_from_history}), + + if system_message: + body["system"] = cast( + "list[SystemContentBlockTypeDef]", + cast(object, [{"text": system_message}]), ) - if self.guardrail_config: - guardrail_config: GuardrailConfigurationTypeDef = cast( - "GuardrailConfigurationTypeDef", cast(object, self.guardrail_config) - ) - body["guardrailConfig"] = guardrail_config + # Add tool config if present or if messages contain tool content + # Bedrock requires toolConfig when messages have toolUse/toolResult + if tools: + tool_config: ToolConfigurationTypeDef = { + "tools": cast( + "Sequence[ToolTypeDef]", + cast(object, self._format_tools_for_converse(tools)), + ) + } + body["toolConfig"] = tool_config + elif self._messages_contain_tool_content(formatted_messages): + # Create minimal toolConfig from tool history in messages + tools_from_history = self._extract_tools_from_message_history( + formatted_messages + ) + if tools_from_history: + body["toolConfig"] = cast( + "ToolConfigurationTypeDef", + cast(object, {"tools": tools_from_history}), + ) - if self.additional_model_request_fields: - body["additionalModelRequestFields"] = ( - self.additional_model_request_fields - ) + if self.guardrail_config: + guardrail_config: GuardrailConfigurationTypeDef = cast( + "GuardrailConfigurationTypeDef", + cast(object, self.guardrail_config), + ) + body["guardrailConfig"] = guardrail_config - if self.additional_model_response_field_paths: - body["additionalModelResponseFieldPaths"] = ( - self.additional_model_response_field_paths - ) + if self.additional_model_request_fields: + body["additionalModelRequestFields"] = ( + self.additional_model_request_fields + ) - if self.stream: - return await self._ahandle_streaming_converse( + if self.additional_model_response_field_paths: + body["additionalModelResponseFieldPaths"] = ( + self.additional_model_response_field_paths + ) + + if self.stream: + return await self._ahandle_streaming_converse( + formatted_messages, + body, + available_functions, + from_task, + from_agent, + ) + + return await self._ahandle_converse( formatted_messages, body, available_functions, from_task, from_agent ) - return await self._ahandle_converse( - formatted_messages, body, available_functions, from_task, from_agent - ) + except Exception as e: + if is_context_length_exceeded(e): + logging.error(f"Context window exceeded: {e}") + raise LLMContextLengthExceededError(str(e)) from e - except Exception as e: - if is_context_length_exceeded(e): - logging.error(f"Context window exceeded: {e}") - raise LLMContextLengthExceededError(str(e)) from e - - error_msg = f"AWS Bedrock API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + error_msg = f"AWS Bedrock API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise def _handle_converse( self, @@ -805,7 +815,7 @@ class BedrockCompletion(BaseLLM): "index": tool_use_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) elif "contentBlockStop" in event: logging.debug("Content block stopped in stream") @@ -1174,7 +1184,7 @@ class BedrockCompletion(BaseLLM): chunk=text_chunk, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) elif "toolUse" in delta and current_tool_use: tool_input = delta["toolUse"].get("input", "") diff --git a/lib/crewai/src/crewai/llms/providers/gemini/completion.py b/lib/crewai/src/crewai/llms/providers/gemini/completion.py index 9687f3d4f..1ec6ab167 100644 --- a/lib/crewai/src/crewai/llms/providers/gemini/completion.py +++ b/lib/crewai/src/crewai/llms/providers/gemini/completion.py @@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, Literal, cast from pydantic import BaseModel from crewai.events.types.llm_events import LLMCallType -from crewai.llms.base_llm import BaseLLM +from crewai.llms.base_llm import BaseLLM, llm_call_context from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededError, @@ -282,32 +282,44 @@ class GeminiCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) - self.tools = tools + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) + self.tools = tools - formatted_content, system_instruction = self._format_messages_for_gemini( - messages - ) + formatted_content, system_instruction = ( + self._format_messages_for_gemini(messages) + ) - messages_for_hooks = self._convert_contents_to_dict(formatted_content) + messages_for_hooks = self._convert_contents_to_dict(formatted_content) - if not self._invoke_before_llm_call_hooks(messages_for_hooks, from_agent): - raise ValueError("LLM call blocked by before_llm_call hook") + if not self._invoke_before_llm_call_hooks( + messages_for_hooks, from_agent + ): + raise ValueError("LLM call blocked by before_llm_call hook") - config = self._prepare_generation_config( - system_instruction, tools, response_model - ) + config = self._prepare_generation_config( + system_instruction, tools, response_model + ) - if self.stream: - return self._handle_streaming_completion( + if self.stream: + return self._handle_streaming_completion( + formatted_content, + config, + available_functions, + from_task, + from_agent, + response_model, + ) + + return self._handle_completion( formatted_content, config, available_functions, @@ -316,29 +328,20 @@ class GeminiCompletion(BaseLLM): response_model, ) - return self._handle_completion( - formatted_content, - config, - available_functions, - from_task, - from_agent, - response_model, - ) - - except APIError as e: - error_msg = f"Google Gemini API error: {e.code} - {e.message}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise - except Exception as e: - error_msg = f"Google Gemini API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + except APIError as e: + error_msg = f"Google Gemini API error: {e.code} - {e.message}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise + except Exception as e: + error_msg = f"Google Gemini API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise async def acall( self, @@ -364,27 +367,37 @@ class GeminiCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) - self.tools = tools + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) + self.tools = tools - formatted_content, system_instruction = self._format_messages_for_gemini( - messages - ) + formatted_content, system_instruction = ( + self._format_messages_for_gemini(messages) + ) - config = self._prepare_generation_config( - system_instruction, tools, response_model - ) + config = self._prepare_generation_config( + system_instruction, tools, response_model + ) - if self.stream: - return await self._ahandle_streaming_completion( + if self.stream: + return await self._ahandle_streaming_completion( + formatted_content, + config, + available_functions, + from_task, + from_agent, + response_model, + ) + + return await self._ahandle_completion( formatted_content, config, available_functions, @@ -393,29 +406,20 @@ class GeminiCompletion(BaseLLM): response_model, ) - return await self._ahandle_completion( - formatted_content, - config, - available_functions, - from_task, - from_agent, - response_model, - ) - - except APIError as e: - error_msg = f"Google Gemini API error: {e.code} - {e.message}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise - except Exception as e: - error_msg = f"Google Gemini API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + except APIError as e: + error_msg = f"Google Gemini API error: {e.code} - {e.message}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise + except Exception as e: + error_msg = f"Google Gemini API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise def _prepare_generation_config( self, @@ -790,7 +794,7 @@ class GeminiCompletion(BaseLLM): Returns: Tuple of (updated full_response, updated function_calls, updated usage_data) """ - response_id=chunk.response_id if hasattr(chunk,"response_id") else None + response_id = chunk.response_id if hasattr(chunk, "response_id") else None if chunk.usage_metadata: usage_data = self._extract_token_usage(chunk) @@ -800,7 +804,7 @@ class GeminiCompletion(BaseLLM): chunk=chunk.text, from_task=from_task, from_agent=from_agent, - response_id=response_id + response_id=response_id, ) if chunk.candidates: @@ -837,7 +841,7 @@ class GeminiCompletion(BaseLLM): "index": call_index, }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id + response_id=response_id, ) return full_response, function_calls, usage_data diff --git a/lib/crewai/src/crewai/llms/providers/openai/completion.py b/lib/crewai/src/crewai/llms/providers/openai/completion.py index 56a6fa2cb..28a1caec0 100644 --- a/lib/crewai/src/crewai/llms/providers/openai/completion.py +++ b/lib/crewai/src/crewai/llms/providers/openai/completion.py @@ -17,7 +17,7 @@ from openai.types.responses import Response from pydantic import BaseModel from crewai.events.types.llm_events import LLMCallType -from crewai.llms.base_llm import BaseLLM +from crewai.llms.base_llm import BaseLLM, llm_call_context from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( @@ -382,23 +382,35 @@ class OpenAICompletion(BaseLLM): Returns: Completion response or tool call result. """ - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) - formatted_messages = self._format_messages(messages) + formatted_messages = self._format_messages(messages) - if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent): - raise ValueError("LLM call blocked by before_llm_call hook") + if not self._invoke_before_llm_call_hooks( + formatted_messages, from_agent + ): + raise ValueError("LLM call blocked by before_llm_call hook") - if self.api == "responses": - return self._call_responses( + if self.api == "responses": + return self._call_responses( + messages=formatted_messages, + tools=tools, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, + ) + + return self._call_completions( messages=formatted_messages, tools=tools, available_functions=available_functions, @@ -407,22 +419,13 @@ class OpenAICompletion(BaseLLM): response_model=response_model, ) - return self._call_completions( - messages=formatted_messages, - tools=tools, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, - ) - - except Exception as e: - error_msg = f"OpenAI API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + except Exception as e: + error_msg = f"OpenAI API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise def _call_completions( self, @@ -479,20 +482,30 @@ class OpenAICompletion(BaseLLM): Returns: Completion response or tool call result. """ - try: - self._emit_call_started_event( - messages=messages, - tools=tools, - callbacks=callbacks, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - ) + with llm_call_context(): + try: + self._emit_call_started_event( + messages=messages, + tools=tools, + callbacks=callbacks, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + ) - formatted_messages = self._format_messages(messages) + formatted_messages = self._format_messages(messages) - if self.api == "responses": - return await self._acall_responses( + if self.api == "responses": + return await self._acall_responses( + messages=formatted_messages, + tools=tools, + available_functions=available_functions, + from_task=from_task, + from_agent=from_agent, + response_model=response_model, + ) + + return await self._acall_completions( messages=formatted_messages, tools=tools, available_functions=available_functions, @@ -501,22 +514,13 @@ class OpenAICompletion(BaseLLM): response_model=response_model, ) - return await self._acall_completions( - messages=formatted_messages, - tools=tools, - available_functions=available_functions, - from_task=from_task, - from_agent=from_agent, - response_model=response_model, - ) - - except Exception as e: - error_msg = f"OpenAI API call failed: {e!s}" - logging.error(error_msg) - self._emit_call_failed_event( - error=error_msg, from_task=from_task, from_agent=from_agent - ) - raise + except Exception as e: + error_msg = f"OpenAI API call failed: {e!s}" + logging.error(error_msg) + self._emit_call_failed_event( + error=error_msg, from_task=from_task, from_agent=from_agent + ) + raise async def _acall_completions( self, @@ -1060,7 +1064,7 @@ class OpenAICompletion(BaseLLM): chunk=delta_text, from_task=from_task, from_agent=from_agent, - response_id=response_id_stream + response_id=response_id_stream, ) elif event.type == "response.function_call_arguments.delta": @@ -1709,7 +1713,7 @@ class OpenAICompletion(BaseLLM): **parse_params, response_format=response_model ) as stream: for chunk in stream: - response_id_stream=chunk.id if hasattr(chunk,"id") else None + response_id_stream = chunk.id if hasattr(chunk, "id") else None if chunk.type == "content.delta": delta_content = chunk.delta @@ -1718,7 +1722,7 @@ class OpenAICompletion(BaseLLM): chunk=delta_content, from_task=from_task, from_agent=from_agent, - response_id=response_id_stream + response_id=response_id_stream, ) final_completion = stream.get_final_completion() @@ -1748,7 +1752,9 @@ class OpenAICompletion(BaseLLM): usage_data = {"total_tokens": 0} for completion_chunk in completion_stream: - response_id_stream=completion_chunk.id if hasattr(completion_chunk,"id") else None + response_id_stream = ( + completion_chunk.id if hasattr(completion_chunk, "id") else None + ) if hasattr(completion_chunk, "usage") and completion_chunk.usage: usage_data = self._extract_openai_token_usage(completion_chunk) @@ -1766,7 +1772,7 @@ class OpenAICompletion(BaseLLM): chunk=chunk_delta.content, from_task=from_task, from_agent=from_agent, - response_id=response_id_stream + response_id=response_id_stream, ) if chunk_delta.tool_calls: @@ -1805,7 +1811,7 @@ class OpenAICompletion(BaseLLM): "index": tool_calls[tool_index]["index"], }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id_stream + response_id=response_id_stream, ) self._track_token_usage_internal(usage_data) @@ -2017,7 +2023,7 @@ class OpenAICompletion(BaseLLM): accumulated_content = "" usage_data = {"total_tokens": 0} async for chunk in completion_stream: - response_id_stream=chunk.id if hasattr(chunk,"id") else None + response_id_stream = chunk.id if hasattr(chunk, "id") else None if hasattr(chunk, "usage") and chunk.usage: usage_data = self._extract_openai_token_usage(chunk) @@ -2035,7 +2041,7 @@ class OpenAICompletion(BaseLLM): chunk=delta.content, from_task=from_task, from_agent=from_agent, - response_id=response_id_stream + response_id=response_id_stream, ) self._track_token_usage_internal(usage_data) @@ -2071,7 +2077,7 @@ class OpenAICompletion(BaseLLM): usage_data = {"total_tokens": 0} async for chunk in stream: - response_id_stream=chunk.id if hasattr(chunk,"id") else None + response_id_stream = chunk.id if hasattr(chunk, "id") else None if hasattr(chunk, "usage") and chunk.usage: usage_data = self._extract_openai_token_usage(chunk) @@ -2089,7 +2095,7 @@ class OpenAICompletion(BaseLLM): chunk=chunk_delta.content, from_task=from_task, from_agent=from_agent, - response_id=response_id_stream + response_id=response_id_stream, ) if chunk_delta.tool_calls: @@ -2128,7 +2134,7 @@ class OpenAICompletion(BaseLLM): "index": tool_calls[tool_index]["index"], }, call_type=LLMCallType.TOOL_CALL, - response_id=response_id_stream + response_id=response_id_stream, ) self._track_token_usage_internal(usage_data) diff --git a/lib/crewai/tests/cassettes/utilities/test_llm_call_events_share_call_id.yaml b/lib/crewai/tests/cassettes/utilities/test_llm_call_events_share_call_id.yaml new file mode 100644 index 000000000..2370a9d04 --- /dev/null +++ b/lib/crewai/tests/cassettes/utilities/test_llm_call_events_share_call_id.yaml @@ -0,0 +1,108 @@ +interactions: +- request: + body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '71' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.13.0 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: "{\n \"id\": \"chatcmpl-D2HpUSxS5LeHwDTELElWlC5CDMzmr\",\n \"object\": + \"chat.completion\",\n \"created\": 1769437564,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n + \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": + \"assistant\",\n \"content\": \"Hi there! How can I assist you today?\",\n + \ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": + null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": + 9,\n \"completion_tokens\": 10,\n \"total_tokens\": 19,\n \"prompt_tokens_details\": + {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": + {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": + 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": + \"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n" + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Mon, 26 Jan 2026 14:26:05 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '460' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '477' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/cassettes/utilities/test_separate_llm_calls_have_different_call_ids.yaml b/lib/crewai/tests/cassettes/utilities/test_separate_llm_calls_have_different_call_ids.yaml new file mode 100644 index 000000000..419c5e006 --- /dev/null +++ b/lib/crewai/tests/cassettes/utilities/test_separate_llm_calls_have_different_call_ids.yaml @@ -0,0 +1,215 @@ +interactions: +- request: + body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '71' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.13.0 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: "{\n \"id\": \"chatcmpl-D2HpStmyOpe9DrthWBlDdMZfVMJ1u\",\n \"object\": + \"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n + \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": + \"assistant\",\n \"content\": \"Hi! How can I assist you today?\",\n + \ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": + null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": + 9,\n \"completion_tokens\": 9,\n \"total_tokens\": 18,\n \"prompt_tokens_details\": + {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": + {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": + 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": + \"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n" + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Mon, 26 Jan 2026 14:26:02 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '415' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '434' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +- request: + body: '{"messages":[{"role":"user","content":"Say bye"}],"model":"gpt-4o-mini"}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '72' + content-type: + - application/json + cookie: + - COOKIE-XXX + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.13.0 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: "{\n \"id\": \"chatcmpl-D2HpS1DP0Xd3tmWt5PBincVrdU7yw\",\n \"object\": + \"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n + \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": + \"assistant\",\n \"content\": \"Goodbye! If you have more questions + in the future, feel free to reach out. Have a great day!\",\n \"refusal\": + null,\n \"annotations\": []\n },\n \"logprobs\": null,\n + \ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": + 9,\n \"completion_tokens\": 23,\n \"total_tokens\": 32,\n \"prompt_tokens_details\": + {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": + {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": + 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": + \"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n" + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Mon, 26 Jan 2026 14:26:03 GMT + Server: + - cloudflare + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '964' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '979' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/cassettes/utilities/test_streaming_chunks_share_call_id_with_call.yaml b/lib/crewai/tests/cassettes/utilities/test_streaming_chunks_share_call_id_with_call.yaml new file mode 100644 index 000000000..7b04d21a3 --- /dev/null +++ b/lib/crewai/tests/cassettes/utilities/test_streaming_chunks_share_call_id_with_call.yaml @@ -0,0 +1,143 @@ +interactions: +- request: + body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini","stream":true,"stream_options":{"include_usage":true}}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '125' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.13.0 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: 'data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"rVIyGQF2E"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"Hi"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"ZGVqV7ZDm"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"vnfm7IxlIB"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + How"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"o8F35ZZ"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + can"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"kiBzGe3"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + I"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"cbGT2RWgx"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + assist"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"DtxR"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + you"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"6y6Co8J"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":" + today"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"SZOmm"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"s9Bc0HqlPg"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":null,"obfuscation":"u9aar"} + + + data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[],"usage":{"prompt_tokens":9,"completion_tokens":9,"total_tokens":18,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}},"obfuscation":"5hudm8ySqh39"} + + + data: [DONE] + + + ' + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Mon, 26 Jan 2026 14:26:04 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '260' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '275' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/test_streaming.py b/lib/crewai/tests/test_streaming.py index 5860755ff..8eb63694e 100644 --- a/lib/crewai/tests/test_streaming.py +++ b/lib/crewai/tests/test_streaming.py @@ -217,6 +217,7 @@ class TestCrewKickoffStreaming: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="Hello ", + call_id="test-call-id", ), ) crewai_event_bus.emit( @@ -224,6 +225,7 @@ class TestCrewKickoffStreaming: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="World!", + call_id="test-call-id", ), ) return mock_output @@ -284,6 +286,7 @@ class TestCrewKickoffStreaming: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="", + call_id="test-call-id", tool_call=ToolCall( id="call-123", function=FunctionCall( @@ -364,6 +367,7 @@ class TestCrewKickoffStreamingAsync: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="Async ", + call_id="test-call-id", ), ) crewai_event_bus.emit( @@ -371,6 +375,7 @@ class TestCrewKickoffStreamingAsync: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="Stream!", + call_id="test-call-id", ), ) return mock_output @@ -451,6 +456,7 @@ class TestFlowKickoffStreaming: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="Flow ", + call_id="test-call-id", ), ) crewai_event_bus.emit( @@ -458,6 +464,7 @@ class TestFlowKickoffStreaming: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="output!", + call_id="test-call-id", ), ) return "done" @@ -545,6 +552,7 @@ class TestFlowKickoffStreamingAsync: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="Async flow ", + call_id="test-call-id", ), ) await asyncio.sleep(0.01) @@ -553,6 +561,7 @@ class TestFlowKickoffStreamingAsync: LLMStreamChunkEvent( type="llm_stream_chunk", chunk="stream!", + call_id="test-call-id", ), ) await asyncio.sleep(0.01) @@ -686,6 +695,7 @@ class TestStreamingEdgeCases: type="llm_stream_chunk", chunk="Task 1", task_name="First task", + call_id="test-call-id", ), ) return mock_output diff --git a/lib/crewai/tests/utilities/test_events.py b/lib/crewai/tests/utilities/test_events.py index 789f1f43e..81ef321d6 100644 --- a/lib/crewai/tests/utilities/test_events.py +++ b/lib/crewai/tests/utilities/test_events.py @@ -984,8 +984,8 @@ def test_streaming_fallback_to_non_streaming(): def mock_call(messages, tools=None, callbacks=None, available_functions=None): nonlocal fallback_called # Emit a couple of chunks to simulate partial streaming - crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id = "Id")) - crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id = "Id")) + crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id="Id", call_id="test-call-id")) + crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id="Id", call_id="test-call-id")) # Mark that fallback would be called fallback_called = True @@ -1041,7 +1041,7 @@ def test_streaming_empty_response_handling(): def mock_call(messages, tools=None, callbacks=None, available_functions=None): # Emit a few empty chunks for _ in range(3): - crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="",response_id="id")) + crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="", response_id="id", call_id="test-call-id")) # Return the default message for empty responses return "I apologize, but I couldn't generate a proper response. Please try again or rephrase your request." @@ -1280,6 +1280,105 @@ def test_llm_emits_event_with_lite_agent(): assert set(all_agent_id) == {str(agent.id)} +# ----------- CALL_ID CORRELATION TESTS ----------- + + +@pytest.mark.vcr() +def test_llm_call_events_share_call_id(): + """All events from a single LLM call should share the same call_id.""" + import uuid + + events = [] + condition = threading.Condition() + + @crewai_event_bus.on(LLMCallStartedEvent) + def on_start(source, event): + with condition: + events.append(event) + condition.notify() + + @crewai_event_bus.on(LLMCallCompletedEvent) + def on_complete(source, event): + with condition: + events.append(event) + condition.notify() + + llm = LLM(model="gpt-4o-mini") + llm.call("Say hi") + + with condition: + success = condition.wait_for(lambda: len(events) >= 2, timeout=10) + assert success, "Timeout waiting for LLM events" + + # Behavior: all events from the call share the same call_id + assert len(events) == 2 + assert events[0].call_id == events[1].call_id + # call_id should be a valid UUID + uuid.UUID(events[0].call_id) + + +@pytest.mark.vcr() +def test_streaming_chunks_share_call_id_with_call(): + """Streaming chunks should share call_id with started/completed events.""" + events = [] + condition = threading.Condition() + + @crewai_event_bus.on(LLMCallStartedEvent) + def on_start(source, event): + with condition: + events.append(event) + condition.notify() + + @crewai_event_bus.on(LLMStreamChunkEvent) + def on_chunk(source, event): + with condition: + events.append(event) + condition.notify() + + @crewai_event_bus.on(LLMCallCompletedEvent) + def on_complete(source, event): + with condition: + events.append(event) + condition.notify() + + llm = LLM(model="gpt-4o-mini", stream=True) + llm.call("Say hi") + + with condition: + # Wait for at least started, some chunks, and completed + success = condition.wait_for(lambda: len(events) >= 3, timeout=10) + assert success, "Timeout waiting for streaming events" + + # Behavior: all events (started, chunks, completed) share the same call_id + call_ids = {e.call_id for e in events} + assert len(call_ids) == 1 + + +@pytest.mark.vcr() +def test_separate_llm_calls_have_different_call_ids(): + """Different LLM calls should have different call_ids.""" + call_ids = [] + condition = threading.Condition() + + @crewai_event_bus.on(LLMCallStartedEvent) + def on_start(source, event): + with condition: + call_ids.append(event.call_id) + condition.notify() + + llm = LLM(model="gpt-4o-mini") + llm.call("Say hi") + llm.call("Say bye") + + with condition: + success = condition.wait_for(lambda: len(call_ids) >= 2, timeout=10) + assert success, "Timeout waiting for LLM call events" + + # Behavior: each call has its own call_id + assert len(call_ids) == 2 + assert call_ids[0] != call_ids[1] + + # ----------- HUMAN FEEDBACK EVENTS -----------