Compare commits

...

1 Commit

Author: Vinicius Brasil
SHA1: d4b28287d3
Date: 2026-01-26 16:16:42 +01:00

Add call_id to LLM events for correlating requests

When monitoring LLM events, consumers need to know which events belong
to the same API call. Before this change, there was no way to correlate
LLMCallStartedEvent, LLMStreamChunkEvent, and LLMCallCompletedEvent
belonging to the same request.
13 changed files with 1296 additions and 628 deletions
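
With the shared call_id, a monitoring consumer can group the started, chunk, completed, and failed events of a single API request. A minimal listener sketch follows; the import paths and the crewai_event_bus.on(...) decorator usage are assumptions inferred from how the bus is used in this diff, so adjust them to the actual package layout.

from collections import defaultdict

# Assumed import locations; the diff only shows that crewai_event_bus and
# the llm_events types exist, not where consumers import them from.
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import (
    LLMCallCompletedEvent,
    LLMCallStartedEvent,
    LLMStreamChunkEvent,
)

events_by_call = defaultdict(list)

@crewai_event_bus.on(LLMCallStartedEvent)
def on_started(source, event):
    events_by_call[event.call_id].append(event)

@crewai_event_bus.on(LLMStreamChunkEvent)
def on_chunk(source, event):
    events_by_call[event.call_id].append(event)

@crewai_event_bus.on(LLMCallCompletedEvent)
def on_completed(source, event):
    # All events sharing this call_id belong to one LLM API call.
    events_by_call[event.call_id].append(event)

Streaming chunks can then be attributed to the request that produced them, which was not possible before this change.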

View File

@@ -10,6 +10,7 @@ class LLMEventBase(BaseEvent):
from_task: Any | None = None from_task: Any | None = None
from_agent: Any | None = None from_agent: Any | None = None
model: str | None = None model: str | None = None
call_id: str
def __init__(self, **data: Any) -> None: def __init__(self, **data: Any) -> None:
if data.get("from_task"): if data.get("from_task"):

View File

@@ -37,7 +37,7 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent, ToolUsageFinishedEvent,
ToolUsageStartedEvent, ToolUsageStartedEvent,
) )
from crewai.llms.base_llm import BaseLLM from crewai.llms.base_llm import BaseLLM, get_current_call_id, llm_call_context
from crewai.llms.constants import ( from crewai.llms.constants import (
ANTHROPIC_MODELS, ANTHROPIC_MODELS,
AZURE_MODELS, AZURE_MODELS,
@@ -770,7 +770,7 @@ class LLM(BaseLLM):
chunk_content = None chunk_content = None
response_id = None response_id = None
if hasattr(chunk,'id'): if hasattr(chunk, "id"):
response_id = chunk.id response_id = chunk.id
# Safely extract content from various chunk formats # Safely extract content from various chunk formats
@@ -827,7 +827,7 @@ class LLM(BaseLLM):
available_functions=available_functions, available_functions=available_functions,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_id=response_id response_id=response_id,
) )
if result is not None: if result is not None:
@@ -849,7 +849,8 @@ class LLM(BaseLLM):
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
call_type=LLMCallType.LLM_CALL, call_type=LLMCallType.LLM_CALL,
response_id=response_id response_id=response_id,
call_id=get_current_call_id(),
), ),
) )
# --- 4) Fallback to non-streaming if no content received # --- 4) Fallback to non-streaming if no content received
@@ -1015,7 +1016,10 @@ class LLM(BaseLLM):
crewai_event_bus.emit( crewai_event_bus.emit(
self, self,
event=LLMCallFailedEvent( event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
), ),
) )
raise Exception(f"Failed to get streaming response: {e!s}") from e raise Exception(f"Failed to get streaming response: {e!s}") from e
@@ -1048,7 +1052,8 @@ class LLM(BaseLLM):
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
call_type=LLMCallType.TOOL_CALL, call_type=LLMCallType.TOOL_CALL,
response_id=response_id response_id=response_id,
call_id=get_current_call_id(),
), ),
) )
@@ -1476,7 +1481,8 @@ class LLM(BaseLLM):
chunk=chunk_content, chunk=chunk_content,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_id=response_id response_id=response_id,
call_id=get_current_call_id(),
), ),
) )
@@ -1619,7 +1625,12 @@ class LLM(BaseLLM):
logging.error(f"Error executing function '{function_name}': {e}") logging.error(f"Error executing function '{function_name}': {e}")
crewai_event_bus.emit( crewai_event_bus.emit(
self, self,
event=LLMCallFailedEvent(error=f"Tool execution error: {e!s}"), event=LLMCallFailedEvent(
error=f"Tool execution error: {e!s}",
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
) )
crewai_event_bus.emit( crewai_event_bus.emit(
self, self,
@@ -1669,108 +1680,117 @@ class LLM(BaseLLM):
ValueError: If response format is not supported ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit LLMContextLengthExceededError: If input exceeds model's context limit
""" """
crewai_event_bus.emit( with llm_call_context() as call_id:
self, crewai_event_bus.emit(
event=LLMCallStartedEvent( self,
messages=messages, event=LLMCallStartedEvent(
tools=tools, messages=messages,
callbacks=callbacks, tools=tools,
available_functions=available_functions, callbacks=callbacks,
from_task=from_task, available_functions=available_functions,
from_agent=from_agent, from_task=from_task,
model=self.model, from_agent=from_agent,
), model=self.model,
) call_id=call_id,
),
)
# --- 2) Validate parameters before proceeding with the call # --- 2) Validate parameters before proceeding with the call
self._validate_call_params() self._validate_call_params()
# --- 3) Convert string messages to proper format if needed # --- 3) Convert string messages to proper format if needed
if isinstance(messages, str): if isinstance(messages, str):
messages = [{"role": "user", "content": messages}] messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported) # --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower(): if "o1" in self.model.lower():
for message in messages: for message in messages:
if message.get("role") == "system": if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant" msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role message["role"] = msg_role
if not self._invoke_before_llm_call_hooks(messages, from_agent): if not self._invoke_before_llm_call_hooks(messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook") raise ValueError("LLM call blocked by before_llm_call hook")
# --- 5) Set up callbacks if provided # --- 5) Set up callbacks if provided
with suppress_warnings(): with suppress_warnings():
if callbacks and len(callbacks) > 0: if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks) self.set_callbacks(callbacks)
try: try:
# --- 6) Prepare parameters for the completion call # --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools) params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response # --- 7) Make the completion call and handle response
if self.stream: if self.stream:
result = self._handle_streaming_response( result = self._handle_streaming_response(
params=params, params=params,
callbacks=callbacks, callbacks=callbacks,
available_functions=available_functions, available_functions=available_functions,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_model=response_model, response_model=response_model,
)
else:
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
) )
):
self.additional_params["additional_drop_params"].append("stop")
else: else:
self.additional_params = {"additional_drop_params": ["stop"]} result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
logging.info("Retrying LLM call without the unsupported 'stop'") if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return self.call( return result
messages, except LLMContextLengthExceededError:
tools=tools, # Re-raise LLMContextLengthExceededError as it should be handled
callbacks=callbacks, # by the CrewAgentExecutor._invoke_loop method, which can then decide
available_functions=available_functions, # whether to summarize the content or abort based on the respect_context_window flag
from_task=from_task, raise
from_agent=from_agent, except Exception as e:
response_model=response_model, unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
logging.info("Retrying LLM call without the unsupported 'stop'")
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
) )
raise
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
async def acall( async def acall(
self, self,
@@ -1808,43 +1828,54 @@ class LLM(BaseLLM):
ValueError: If response format is not supported ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit LLMContextLengthExceededError: If input exceeds model's context limit
""" """
crewai_event_bus.emit( with llm_call_context() as call_id:
self, crewai_event_bus.emit(
event=LLMCallStartedEvent( self,
messages=messages, event=LLMCallStartedEvent(
tools=tools, messages=messages,
callbacks=callbacks, tools=tools,
available_functions=available_functions, callbacks=callbacks,
from_task=from_task, available_functions=available_functions,
from_agent=from_agent, from_task=from_task,
model=self.model, from_agent=from_agent,
), model=self.model,
) call_id=call_id,
),
)
self._validate_call_params() self._validate_call_params()
if isinstance(messages, str): if isinstance(messages, str):
messages = [{"role": "user", "content": messages}] messages = [{"role": "user", "content": messages}]
# Process file attachments asynchronously before preparing params # Process file attachments asynchronously before preparing params
messages = await self._aprocess_message_files(messages) messages = await self._aprocess_message_files(messages)
if "o1" in self.model.lower(): if "o1" in self.model.lower():
for message in messages: for message in messages:
if message.get("role") == "system": if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant" msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role message["role"] = msg_role
with suppress_warnings(): with suppress_warnings():
if callbacks and len(callbacks) > 0: if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks) self.set_callbacks(callbacks)
try: try:
params = self._prepare_completion_params( params = self._prepare_completion_params(
messages, tools, skip_file_processing=True messages, tools, skip_file_processing=True
) )
if self.stream: if self.stream:
return await self._ahandle_streaming_response( return await self._ahandle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._ahandle_non_streaming_response(
params=params, params=params,
callbacks=callbacks, callbacks=callbacks,
available_functions=available_functions, available_functions=available_functions,
@@ -1852,52 +1883,50 @@ class LLM(BaseLLM):
from_agent=from_agent, from_agent=from_agent,
response_model=response_model, response_model=response_model,
) )
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
return await self._ahandle_non_streaming_response( if unsupported_stop:
params=params, if (
callbacks=callbacks, "additional_drop_params" in self.additional_params
available_functions=available_functions, and isinstance(
from_task=from_task, self.additional_params["additional_drop_params"], list
from_agent=from_agent, )
response_model=response_model, ):
) self.additional_params["additional_drop_params"].append(
except LLMContextLengthExceededError: "stop"
raise )
except Exception as e: else:
unsupported_stop = "Unsupported parameter" in str( self.additional_params = {
e "additional_drop_params": ["stop"]
) and "'stop'" in str(e) }
if unsupported_stop: logging.info("Retrying LLM call without the unsupported 'stop'")
if (
"additional_drop_params" in self.additional_params return await self.acall(
and isinstance( messages,
self.additional_params["additional_drop_params"], list tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
) )
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
logging.info("Retrying LLM call without the unsupported 'stop'") crewai_event_bus.emit(
self,
return await self.acall( event=LLMCallFailedEvent(
messages, error=str(e),
tools=tools, from_task=from_task,
callbacks=callbacks, from_agent=from_agent,
available_functions=available_functions, call_id=get_current_call_id(),
from_task=from_task, ),
from_agent=from_agent,
response_model=response_model,
) )
raise
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
def _handle_emit_call_events( def _handle_emit_call_events(
self, self,
@@ -1925,6 +1954,7 @@ class LLM(BaseLLM):
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
model=self.model, model=self.model,
call_id=get_current_call_id(),
), ),
) )

View File

@@ -7,11 +7,15 @@ in CrewAI, including common functionality for native SDK implementations.
from __future__ import annotations from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections.abc import Generator
from contextlib import contextmanager
import contextvars
from datetime import datetime from datetime import datetime
import json import json
import logging import logging
import re import re
from typing import TYPE_CHECKING, Any, Final from typing import TYPE_CHECKING, Any, Final
import uuid
from pydantic import BaseModel from pydantic import BaseModel
@@ -50,6 +54,32 @@ DEFAULT_CONTEXT_WINDOW_SIZE: Final[int] = 4096
DEFAULT_SUPPORTS_STOP_WORDS: Final[bool] = True DEFAULT_SUPPORTS_STOP_WORDS: Final[bool] = True
_JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTALL) _JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTALL)
_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"_current_call_id", default=None
)
@contextmanager
def llm_call_context() -> Generator[str, None, None]:
"""Context manager that establishes an LLM call scope with a unique call_id."""
call_id = str(uuid.uuid4())
token = _current_call_id.set(call_id)
try:
yield call_id
finally:
_current_call_id.reset(token)
def get_current_call_id() -> str:
"""Get current call_id from context"""
call_id = _current_call_id.get()
if call_id is None:
logging.warning(
"LLM event emitted outside call context - generating fallback call_id"
)
return str(uuid.uuid4())
return call_id
class BaseLLM(ABC): class BaseLLM(ABC):
"""Abstract base class for LLM implementations. """Abstract base class for LLM implementations.
@@ -351,6 +381,7 @@ class BaseLLM(ABC):
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
model=self.model, model=self.model,
call_id=get_current_call_id(),
), ),
) )
@@ -374,6 +405,7 @@ class BaseLLM(ABC):
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
model=self.model, model=self.model,
call_id=get_current_call_id(),
), ),
) )
@@ -394,6 +426,7 @@ class BaseLLM(ABC):
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
model=self.model, model=self.model,
call_id=get_current_call_id(),
), ),
) )
@@ -404,7 +437,7 @@ class BaseLLM(ABC):
from_agent: Agent | None = None, from_agent: Agent | None = None,
tool_call: dict[str, Any] | None = None, tool_call: dict[str, Any] | None = None,
call_type: LLMCallType | None = None, call_type: LLMCallType | None = None,
response_id: str | None = None response_id: str | None = None,
) -> None: ) -> None:
"""Emit stream chunk event. """Emit stream chunk event.
@@ -427,7 +460,8 @@ class BaseLLM(ABC):
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
call_type=call_type, call_type=call_type,
response_id=response_id response_id=response_id,
call_id=get_current_call_id(),
), ),
) )
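
The contextvar plumbing added in this file can be exercised on its own. A standalone sketch that copies the pattern above (no crewai imports), showing that code running inside the with-block observes the same id and that the id is cleared on exit:

import contextvars
import uuid
from contextlib import contextmanager

_current_call_id = contextvars.ContextVar("_current_call_id", default=None)

@contextmanager
def llm_call_context():
    """Establish an LLM call scope with a unique call_id."""
    call_id = str(uuid.uuid4())
    token = _current_call_id.set(call_id)
    try:
        yield call_id
    finally:
        _current_call_id.reset(token)

with llm_call_context() as call_id:
    # Any helper invoked here (event emission included) reads the same id.
    assert _current_call_id.get() == call_id

# Outside the scope the variable is reset; get_current_call_id() falls back
# to a fresh UUID and logs a warning in that situation.
assert _current_call_id.get() is None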

View File

@@ -9,7 +9,7 @@ from anthropic.types import ThinkingBlock
from pydantic import BaseModel from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import ( from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -207,33 +207,44 @@ class AnthropicCompletion(BaseLLM):
Returns: Returns:
Chat completion response or tool call result Chat completion response or tool call result
""" """
try: with llm_call_context():
# Emit call started event try:
self._emit_call_started_event( # Emit call started event
messages=messages, self._emit_call_started_event(
tools=tools, messages=messages,
callbacks=callbacks, tools=tools,
available_functions=available_functions, callbacks=callbacks,
from_task=from_task, available_functions=available_functions,
from_agent=from_agent, from_task=from_task,
) from_agent=from_agent,
)
# Format messages for Anthropic # Format messages for Anthropic
formatted_messages, system_message = self._format_messages_for_anthropic( formatted_messages, system_message = (
messages self._format_messages_for_anthropic(messages)
) )
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent): if not self._invoke_before_llm_call_hooks(
raise ValueError("LLM call blocked by before_llm_call hook") formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare completion parameters # Prepare completion parameters
completion_params = self._prepare_completion_params( completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools formatted_messages, system_message, tools
) )
# Handle streaming vs non-streaming # Handle streaming vs non-streaming
if self.stream: if self.stream:
return self._handle_streaming_completion( return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
response_model,
)
return self._handle_completion(
completion_params, completion_params,
available_functions, available_functions,
from_task, from_task,
@@ -241,21 +252,13 @@ class AnthropicCompletion(BaseLLM):
response_model, response_model,
) )
return self._handle_completion( except Exception as e:
completion_params, error_msg = f"Anthropic API call failed: {e!s}"
available_functions, logging.error(error_msg)
from_task, self._emit_call_failed_event(
from_agent, error=error_msg, from_task=from_task, from_agent=from_agent
response_model, )
) raise
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall( async def acall(
self, self,
@@ -280,26 +283,35 @@ class AnthropicCompletion(BaseLLM):
Returns: Returns:
Chat completion response or tool call result Chat completion response or tool call result
""" """
try: with llm_call_context():
self._emit_call_started_event( try:
messages=messages, self._emit_call_started_event(
tools=tools, messages=messages,
callbacks=callbacks, tools=tools,
available_functions=available_functions, callbacks=callbacks,
from_task=from_task, available_functions=available_functions,
from_agent=from_agent, from_task=from_task,
) from_agent=from_agent,
)
formatted_messages, system_message = self._format_messages_for_anthropic( formatted_messages, system_message = (
messages self._format_messages_for_anthropic(messages)
) )
completion_params = self._prepare_completion_params( completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools formatted_messages, system_message, tools
) )
if self.stream: if self.stream:
return await self._ahandle_streaming_completion( return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
response_model,
)
return await self._ahandle_completion(
completion_params, completion_params,
available_functions, available_functions,
from_task, from_task,
@@ -307,21 +319,13 @@ class AnthropicCompletion(BaseLLM):
response_model, response_model,
) )
return await self._ahandle_completion( except Exception as e:
completion_params, error_msg = f"Anthropic API call failed: {e!s}"
available_functions, logging.error(error_msg)
from_task, self._emit_call_failed_event(
from_agent, error=error_msg, from_task=from_task, from_agent=from_agent
response_model, )
) raise
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _prepare_completion_params( def _prepare_completion_params(
self, self,
@@ -712,7 +716,7 @@ class AnthropicCompletion(BaseLLM):
chunk=text_delta, chunk=text_delta,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_id=response_id response_id=response_id,
) )
if event.type == "content_block_start": if event.type == "content_block_start":
@@ -739,7 +743,7 @@ class AnthropicCompletion(BaseLLM):
"index": block_index, "index": block_index,
}, },
call_type=LLMCallType.TOOL_CALL, call_type=LLMCallType.TOOL_CALL,
response_id=response_id response_id=response_id,
) )
elif event.type == "content_block_delta": elif event.type == "content_block_delta":
if event.delta.type == "input_json_delta": if event.delta.type == "input_json_delta":
@@ -763,7 +767,7 @@ class AnthropicCompletion(BaseLLM):
"index": block_index, "index": block_index,
}, },
call_type=LLMCallType.TOOL_CALL, call_type=LLMCallType.TOOL_CALL,
response_id=response_id response_id=response_id,
) )
final_message: Message = stream.get_final_message() final_message: Message = stream.get_final_message()
@@ -1133,7 +1137,7 @@ class AnthropicCompletion(BaseLLM):
chunk=text_delta, chunk=text_delta,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_id=response_id response_id=response_id,
) )
if event.type == "content_block_start": if event.type == "content_block_start":
@@ -1160,7 +1164,7 @@ class AnthropicCompletion(BaseLLM):
"index": block_index, "index": block_index,
}, },
call_type=LLMCallType.TOOL_CALL, call_type=LLMCallType.TOOL_CALL,
response_id=response_id response_id=response_id,
) )
elif event.type == "content_block_delta": elif event.type == "content_block_delta":
if event.delta.type == "input_json_delta": if event.delta.type == "input_json_delta":
@@ -1184,7 +1188,7 @@ class AnthropicCompletion(BaseLLM):
"index": block_index, "index": block_index,
}, },
call_type=LLMCallType.TOOL_CALL, call_type=LLMCallType.TOOL_CALL,
response_id=response_id response_id=response_id,
) )
final_message: Message = await stream.get_final_message() final_message: Message = await stream.get_final_message()

View File

@@ -43,7 +43,7 @@ try:
) )
from crewai.events.types.llm_events import LLMCallType from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM from crewai.llms.base_llm import BaseLLM, llm_call_context
except ImportError: except ImportError:
raise ImportError( raise ImportError(
@@ -288,31 +288,42 @@ class AzureCompletion(BaseLLM):
Returns: Returns:
Chat completion response or tool call result Chat completion response or tool call result
""" """
try: with llm_call_context():
# Emit call started event try:
self._emit_call_started_event( # Emit call started event
messages=messages, self._emit_call_started_event(
tools=tools, messages=messages,
callbacks=callbacks, tools=tools,
available_functions=available_functions, callbacks=callbacks,
from_task=from_task, available_functions=available_functions,
from_agent=from_agent, from_task=from_task,
) from_agent=from_agent,
)
# Format messages for Azure # Format messages for Azure
formatted_messages = self._format_messages_for_azure(messages) formatted_messages = self._format_messages_for_azure(messages)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent): if not self._invoke_before_llm_call_hooks(
raise ValueError("LLM call blocked by before_llm_call hook") formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare completion parameters # Prepare completion parameters
completion_params = self._prepare_completion_params( completion_params = self._prepare_completion_params(
formatted_messages, tools, response_model formatted_messages, tools, response_model
) )
# Handle streaming vs non-streaming # Handle streaming vs non-streaming
if self.stream: if self.stream:
return self._handle_streaming_completion( return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
response_model,
)
return self._handle_completion(
completion_params, completion_params,
available_functions, available_functions,
from_task, from_task,
@@ -320,16 +331,8 @@ class AzureCompletion(BaseLLM):
response_model, response_model,
) )
return self._handle_completion( except Exception as e:
completion_params, return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value]
available_functions,
from_task,
from_agent,
response_model,
)
except Exception as e:
return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value]
async def acall( # type: ignore[return] async def acall( # type: ignore[return]
self, self,
@@ -355,24 +358,33 @@ class AzureCompletion(BaseLLM):
Returns: Returns:
Chat completion response or tool call result Chat completion response or tool call result
""" """
try: with llm_call_context():
self._emit_call_started_event( try:
messages=messages, self._emit_call_started_event(
tools=tools, messages=messages,
callbacks=callbacks, tools=tools,
available_functions=available_functions, callbacks=callbacks,
from_task=from_task, available_functions=available_functions,
from_agent=from_agent, from_task=from_task,
) from_agent=from_agent,
)
formatted_messages = self._format_messages_for_azure(messages) formatted_messages = self._format_messages_for_azure(messages)
completion_params = self._prepare_completion_params( completion_params = self._prepare_completion_params(
formatted_messages, tools, response_model formatted_messages, tools, response_model
) )
if self.stream: if self.stream:
return await self._ahandle_streaming_completion( return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
response_model,
)
return await self._ahandle_completion(
completion_params, completion_params,
available_functions, available_functions,
from_task, from_task,
@@ -380,16 +392,8 @@ class AzureCompletion(BaseLLM):
response_model, response_model,
) )
return await self._ahandle_completion( except Exception as e:
completion_params, self._handle_api_error(e, from_task, from_agent)
available_functions,
from_task,
from_agent,
response_model,
)
except Exception as e:
self._handle_api_error(e, from_task, from_agent)
def _prepare_completion_params( def _prepare_completion_params(
self, self,
@@ -726,7 +730,7 @@ class AzureCompletion(BaseLLM):
""" """
if update.choices: if update.choices:
choice = update.choices[0] choice = update.choices[0]
response_id = update.id if hasattr(update,"id") else None response_id = update.id if hasattr(update, "id") else None
if choice.delta and choice.delta.content: if choice.delta and choice.delta.content:
content_delta = choice.delta.content content_delta = choice.delta.content
full_response += content_delta full_response += content_delta
@@ -734,7 +738,7 @@ class AzureCompletion(BaseLLM):
chunk=content_delta, chunk=content_delta,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_id=response_id response_id=response_id,
) )
if choice.delta and choice.delta.tool_calls: if choice.delta and choice.delta.tool_calls:
@@ -769,7 +773,7 @@ class AzureCompletion(BaseLLM):
"index": idx, "index": idx,
}, },
call_type=LLMCallType.TOOL_CALL, call_type=LLMCallType.TOOL_CALL,
response_id=response_id response_id=response_id,
) )
return full_response return full_response

View File

@@ -11,7 +11,7 @@ from pydantic import BaseModel
from typing_extensions import Required from typing_extensions import Required
from crewai.events.types.llm_events import LLMCallType from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import ( from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError, LLMContextLengthExceededError,
@@ -299,77 +299,89 @@ class BedrockCompletion(BaseLLM):
response_model: type[BaseModel] | None = None, response_model: type[BaseModel] | None = None,
) -> str | Any: ) -> str | Any:
"""Call AWS Bedrock Converse API.""" """Call AWS Bedrock Converse API."""
try: with llm_call_context():
# Emit call started event try:
self._emit_call_started_event( # Emit call started event
messages=messages, self._emit_call_started_event(
tools=tools, messages=messages,
callbacks=callbacks, tools=tools,
available_functions=available_functions, callbacks=callbacks,
from_task=from_task, available_functions=available_functions,
from_agent=from_agent, from_task=from_task,
) from_agent=from_agent,
# Format messages for Converse API
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare request body
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
# Add system message if present
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
) )
# Add tool config if present or if messages contain tool content # Format messages for Converse API
# Bedrock requires toolConfig when messages have toolUse/toolResult formatted_messages, system_message = self._format_messages_for_converse(
if tools: messages
tool_config: ToolConfigurationTypeDef = { )
"tools": cast(
"Sequence[ToolTypeDef]", if not self._invoke_before_llm_call_hooks(
cast(object, self._format_tools_for_converse(tools)), formatted_messages, from_agent
) ):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare request body
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
} }
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages): # Add system message if present
# Create minimal toolConfig from tool history in messages if system_message:
tools_from_history = self._extract_tools_from_message_history( body["system"] = cast(
formatted_messages "list[SystemContentBlockTypeDef]",
) cast(object, [{"text": system_message}]),
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
) )
# Add optional advanced features if configured # Add tool config if present or if messages contain tool content
if self.guardrail_config: # Bedrock requires toolConfig when messages have toolUse/toolResult
guardrail_config: GuardrailConfigurationTypeDef = cast( if tools:
"GuardrailConfigurationTypeDef", cast(object, self.guardrail_config) tool_config: ToolConfigurationTypeDef = {
) "tools": cast(
body["guardrailConfig"] = guardrail_config "Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
if self.additional_model_request_fields: # Add optional advanced features if configured
body["additionalModelRequestFields"] = ( if self.guardrail_config:
self.additional_model_request_fields guardrail_config: GuardrailConfigurationTypeDef = cast(
) "GuardrailConfigurationTypeDef",
cast(object, self.guardrail_config),
)
body["guardrailConfig"] = guardrail_config
if self.additional_model_response_field_paths: if self.additional_model_request_fields:
body["additionalModelResponseFieldPaths"] = ( body["additionalModelRequestFields"] = (
self.additional_model_response_field_paths self.additional_model_request_fields
) )
if self.stream: if self.additional_model_response_field_paths:
return self._handle_streaming_converse( body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.stream:
return self._handle_streaming_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
)
return self._handle_converse(
formatted_messages, formatted_messages,
body, body,
available_functions, available_functions,
@@ -377,25 +389,17 @@ class BedrockCompletion(BaseLLM):
from_agent, from_agent,
) )
return self._handle_converse( except Exception as e:
formatted_messages, if is_context_length_exceeded(e):
body, logging.error(f"Context window exceeded: {e}")
available_functions, raise LLMContextLengthExceededError(str(e)) from e
from_task,
from_agent,
)
except Exception as e: error_msg = f"AWS Bedrock API call failed: {e!s}"
if is_context_length_exceeded(e): logging.error(error_msg)
logging.error(f"Context window exceeded: {e}") self._emit_call_failed_event(
raise LLMContextLengthExceededError(str(e)) from e error=error_msg, from_task=from_task, from_agent=from_agent
)
error_msg = f"AWS Bedrock API call failed: {e!s}" raise
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall( async def acall(
self, self,
@@ -431,87 +435,93 @@ class BedrockCompletion(BaseLLM):
'Install with: uv add "crewai[bedrock-async]"' 'Install with: uv add "crewai[bedrock-async]"'
) )
try: with llm_call_context():
self._emit_call_started_event( try:
messages=messages, self._emit_call_started_event(
tools=tools, messages=messages,
callbacks=callbacks, tools=tools,
available_functions=available_functions, callbacks=callbacks,
from_task=from_task, available_functions=available_functions,
from_agent=from_agent, from_task=from_task,
) from_agent=from_agent,
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
) )
# Add tool config if present or if messages contain tool content formatted_messages, system_message = self._format_messages_for_converse(
# Bedrock requires toolConfig when messages have toolUse/toolResult messages
if tools: )
tool_config: ToolConfigurationTypeDef = {
"tools": cast( body: BedrockConverseRequestBody = {
"Sequence[ToolTypeDef]", "inferenceConfig": self._get_inference_config(),
cast(object, self._format_tools_for_converse(tools)),
)
} }
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages): if system_message:
# Create minimal toolConfig from tool history in messages body["system"] = cast(
tools_from_history = self._extract_tools_from_message_history( "list[SystemContentBlockTypeDef]",
formatted_messages cast(object, [{"text": system_message}]),
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
) )
if self.guardrail_config: # Add tool config if present or if messages contain tool content
guardrail_config: GuardrailConfigurationTypeDef = cast( # Bedrock requires toolConfig when messages have toolUse/toolResult
"GuardrailConfigurationTypeDef", cast(object, self.guardrail_config) if tools:
) tool_config: ToolConfigurationTypeDef = {
body["guardrailConfig"] = guardrail_config "tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
if self.additional_model_request_fields: if self.guardrail_config:
body["additionalModelRequestFields"] = ( guardrail_config: GuardrailConfigurationTypeDef = cast(
self.additional_model_request_fields "GuardrailConfigurationTypeDef",
) cast(object, self.guardrail_config),
)
body["guardrailConfig"] = guardrail_config
if self.additional_model_response_field_paths: if self.additional_model_request_fields:
body["additionalModelResponseFieldPaths"] = ( body["additionalModelRequestFields"] = (
self.additional_model_response_field_paths self.additional_model_request_fields
) )
if self.stream: if self.additional_model_response_field_paths:
return await self._ahandle_streaming_converse( body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.stream:
return await self._ahandle_streaming_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
)
return await self._ahandle_converse(
formatted_messages, body, available_functions, from_task, from_agent formatted_messages, body, available_functions, from_task, from_agent
) )
return await self._ahandle_converse( except Exception as e:
formatted_messages, body, available_functions, from_task, from_agent if is_context_length_exceeded(e):
) logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
except Exception as e: error_msg = f"AWS Bedrock API call failed: {e!s}"
if is_context_length_exceeded(e): logging.error(error_msg)
logging.error(f"Context window exceeded: {e}") self._emit_call_failed_event(
raise LLMContextLengthExceededError(str(e)) from e error=error_msg, from_task=from_task, from_agent=from_agent
)
error_msg = f"AWS Bedrock API call failed: {e!s}" raise
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _handle_converse( def _handle_converse(
self, self,
@@ -805,7 +815,7 @@ class BedrockCompletion(BaseLLM):
"index": tool_use_index, "index": tool_use_index,
}, },
call_type=LLMCallType.TOOL_CALL, call_type=LLMCallType.TOOL_CALL,
response_id=response_id response_id=response_id,
) )
elif "contentBlockStop" in event: elif "contentBlockStop" in event:
logging.debug("Content block stopped in stream") logging.debug("Content block stopped in stream")
@@ -1174,7 +1184,7 @@ class BedrockCompletion(BaseLLM):
chunk=text_chunk, chunk=text_chunk,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_id=response_id response_id=response_id,
) )
elif "toolUse" in delta and current_tool_use: elif "toolUse" in delta and current_tool_use:
tool_input = delta["toolUse"].get("input", "") tool_input = delta["toolUse"].get("input", "")

View File

@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, Literal, cast
from pydantic import BaseModel from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import ( from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError, LLMContextLengthExceededError,
@@ -282,32 +282,44 @@ class GeminiCompletion(BaseLLM):
Returns: Returns:
Chat completion response or tool call result Chat completion response or tool call result
""" """
try: with llm_call_context():
self._emit_call_started_event( try:
messages=messages, self._emit_call_started_event(
tools=tools, messages=messages,
callbacks=callbacks, tools=tools,
available_functions=available_functions, callbacks=callbacks,
from_task=from_task, available_functions=available_functions,
from_agent=from_agent, from_task=from_task,
) from_agent=from_agent,
self.tools = tools )
self.tools = tools
formatted_content, system_instruction = self._format_messages_for_gemini( formatted_content, system_instruction = (
messages self._format_messages_for_gemini(messages)
) )
messages_for_hooks = self._convert_contents_to_dict(formatted_content) messages_for_hooks = self._convert_contents_to_dict(formatted_content)
if not self._invoke_before_llm_call_hooks(messages_for_hooks, from_agent): if not self._invoke_before_llm_call_hooks(
raise ValueError("LLM call blocked by before_llm_call hook") messages_for_hooks, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
config = self._prepare_generation_config( config = self._prepare_generation_config(
system_instruction, tools, response_model system_instruction, tools, response_model
) )
if self.stream: if self.stream:
return self._handle_streaming_completion( return self._handle_streaming_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
response_model,
)
return self._handle_completion(
formatted_content, formatted_content,
config, config,
available_functions, available_functions,
@@ -316,29 +328,20 @@ class GeminiCompletion(BaseLLM):
response_model, response_model,
) )
return self._handle_completion( except APIError as e:
formatted_content, error_msg = f"Google Gemini API error: {e.code} - {e.message}"
config, logging.error(error_msg)
available_functions, self._emit_call_failed_event(
from_task, error=error_msg, from_task=from_task, from_agent=from_agent
from_agent, )
response_model, raise
) except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
except APIError as e: logging.error(error_msg)
error_msg = f"Google Gemini API error: {e.code} - {e.message}" self._emit_call_failed_event(
logging.error(error_msg) error=error_msg, from_task=from_task, from_agent=from_agent
self._emit_call_failed_event( )
error=error_msg, from_task=from_task, from_agent=from_agent raise
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall( async def acall(
self, self,
@@ -364,27 +367,37 @@ class GeminiCompletion(BaseLLM):
Returns: Returns:
Chat completion response or tool call result Chat completion response or tool call result
""" """
try: with llm_call_context():
self._emit_call_started_event( try:
messages=messages, self._emit_call_started_event(
tools=tools, messages=messages,
callbacks=callbacks, tools=tools,
available_functions=available_functions, callbacks=callbacks,
from_task=from_task, available_functions=available_functions,
from_agent=from_agent, from_task=from_task,
) from_agent=from_agent,
self.tools = tools )
self.tools = tools
formatted_content, system_instruction = self._format_messages_for_gemini( formatted_content, system_instruction = (
messages self._format_messages_for_gemini(messages)
) )
config = self._prepare_generation_config( config = self._prepare_generation_config(
system_instruction, tools, response_model system_instruction, tools, response_model
) )
if self.stream: if self.stream:
return await self._ahandle_streaming_completion( return await self._ahandle_streaming_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
response_model,
)
return await self._ahandle_completion(
formatted_content, formatted_content,
config, config,
available_functions, available_functions,
@@ -393,29 +406,20 @@ class GeminiCompletion(BaseLLM):
response_model, response_model,
) )
return await self._ahandle_completion( except APIError as e:
formatted_content, error_msg = f"Google Gemini API error: {e.code} - {e.message}"
config, logging.error(error_msg)
available_functions, self._emit_call_failed_event(
from_task, error=error_msg, from_task=from_task, from_agent=from_agent
from_agent, )
response_model, raise
) except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
except APIError as e: logging.error(error_msg)
error_msg = f"Google Gemini API error: {e.code} - {e.message}" self._emit_call_failed_event(
logging.error(error_msg) error=error_msg, from_task=from_task, from_agent=from_agent
self._emit_call_failed_event( )
error=error_msg, from_task=from_task, from_agent=from_agent raise
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _prepare_generation_config( def _prepare_generation_config(
self, self,
@@ -790,7 +794,7 @@ class GeminiCompletion(BaseLLM):
Returns: Returns:
Tuple of (updated full_response, updated function_calls, updated usage_data) Tuple of (updated full_response, updated function_calls, updated usage_data)
""" """
response_id=chunk.response_id if hasattr(chunk,"response_id") else None response_id = chunk.response_id if hasattr(chunk, "response_id") else None
if chunk.usage_metadata: if chunk.usage_metadata:
usage_data = self._extract_token_usage(chunk) usage_data = self._extract_token_usage(chunk)
@@ -800,7 +804,7 @@ class GeminiCompletion(BaseLLM):
chunk=chunk.text, chunk=chunk.text,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_id=response_id response_id=response_id,
) )
if chunk.candidates: if chunk.candidates:
@@ -837,7 +841,7 @@ class GeminiCompletion(BaseLLM):
"index": call_index, "index": call_index,
}, },
call_type=LLMCallType.TOOL_CALL, call_type=LLMCallType.TOOL_CALL,
response_id=response_id response_id=response_id,
) )
return full_response, function_calls, usage_data return full_response, function_calls, usage_data

View File

@@ -17,7 +17,7 @@ from openai.types.responses import Response
from pydantic import BaseModel from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import ( from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -382,23 +382,35 @@ class OpenAICompletion(BaseLLM):
Returns: Returns:
Completion response or tool call result. Completion response or tool call result.
""" """
try: with llm_call_context():
self._emit_call_started_event( try:
messages=messages, self._emit_call_started_event(
tools=tools, messages=messages,
callbacks=callbacks, tools=tools,
available_functions=available_functions, callbacks=callbacks,
from_task=from_task, available_functions=available_functions,
from_agent=from_agent, from_task=from_task,
) from_agent=from_agent,
)
formatted_messages = self._format_messages(messages) formatted_messages = self._format_messages(messages)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent): if not self._invoke_before_llm_call_hooks(
raise ValueError("LLM call blocked by before_llm_call hook") formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
if self.api == "responses": if self.api == "responses":
return self._call_responses( return self._call_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return self._call_completions(
messages=formatted_messages, messages=formatted_messages,
tools=tools, tools=tools,
available_functions=available_functions, available_functions=available_functions,
@@ -407,22 +419,13 @@ class OpenAICompletion(BaseLLM):
response_model=response_model, response_model=response_model,
) )
return self._call_completions( except Exception as e:
messages=formatted_messages, error_msg = f"OpenAI API call failed: {e!s}"
tools=tools, logging.error(error_msg)
available_functions=available_functions, self._emit_call_failed_event(
from_task=from_task, error=error_msg, from_task=from_task, from_agent=from_agent
from_agent=from_agent, )
response_model=response_model, raise
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _call_completions( def _call_completions(
self, self,
@@ -479,20 +482,30 @@ class OpenAICompletion(BaseLLM):
Returns: Returns:
Completion response or tool call result. Completion response or tool call result.
""" """
try: with llm_call_context():
self._emit_call_started_event( try:
messages=messages, self._emit_call_started_event(
tools=tools, messages=messages,
callbacks=callbacks, tools=tools,
available_functions=available_functions, callbacks=callbacks,
from_task=from_task, available_functions=available_functions,
from_agent=from_agent, from_task=from_task,
) from_agent=from_agent,
)
formatted_messages = self._format_messages(messages) formatted_messages = self._format_messages(messages)
if self.api == "responses": if self.api == "responses":
return await self._acall_responses( return await self._acall_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._acall_completions(
messages=formatted_messages, messages=formatted_messages,
tools=tools, tools=tools,
available_functions=available_functions, available_functions=available_functions,
@@ -501,22 +514,13 @@ class OpenAICompletion(BaseLLM):
response_model=response_model, response_model=response_model,
) )
return await self._acall_completions( except Exception as e:
messages=formatted_messages, error_msg = f"OpenAI API call failed: {e!s}"
tools=tools, logging.error(error_msg)
available_functions=available_functions, self._emit_call_failed_event(
from_task=from_task, error=error_msg, from_task=from_task, from_agent=from_agent
from_agent=from_agent, )
response_model=response_model, raise
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def _acall_completions( async def _acall_completions(
self, self,
@@ -1060,7 +1064,7 @@ class OpenAICompletion(BaseLLM):
chunk=delta_text, chunk=delta_text,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_id=response_id_stream response_id=response_id_stream,
) )
elif event.type == "response.function_call_arguments.delta": elif event.type == "response.function_call_arguments.delta":
@@ -1709,7 +1713,7 @@ class OpenAICompletion(BaseLLM):
**parse_params, response_format=response_model **parse_params, response_format=response_model
) as stream: ) as stream:
for chunk in stream: for chunk in stream:
response_id_stream=chunk.id if hasattr(chunk,"id") else None response_id_stream = chunk.id if hasattr(chunk, "id") else None
if chunk.type == "content.delta": if chunk.type == "content.delta":
delta_content = chunk.delta delta_content = chunk.delta
@@ -1718,7 +1722,7 @@ class OpenAICompletion(BaseLLM):
chunk=delta_content, chunk=delta_content,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_id=response_id_stream response_id=response_id_stream,
) )
final_completion = stream.get_final_completion() final_completion = stream.get_final_completion()
@@ -1748,7 +1752,9 @@ class OpenAICompletion(BaseLLM):
usage_data = {"total_tokens": 0} usage_data = {"total_tokens": 0}
for completion_chunk in completion_stream: for completion_chunk in completion_stream:
response_id_stream=completion_chunk.id if hasattr(completion_chunk,"id") else None response_id_stream = (
completion_chunk.id if hasattr(completion_chunk, "id") else None
)
if hasattr(completion_chunk, "usage") and completion_chunk.usage: if hasattr(completion_chunk, "usage") and completion_chunk.usage:
usage_data = self._extract_openai_token_usage(completion_chunk) usage_data = self._extract_openai_token_usage(completion_chunk)
@@ -1766,7 +1772,7 @@ class OpenAICompletion(BaseLLM):
chunk=chunk_delta.content, chunk=chunk_delta.content,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_id=response_id_stream response_id=response_id_stream,
) )
if chunk_delta.tool_calls: if chunk_delta.tool_calls:
@@ -1805,7 +1811,7 @@ class OpenAICompletion(BaseLLM):
"index": tool_calls[tool_index]["index"], "index": tool_calls[tool_index]["index"],
}, },
call_type=LLMCallType.TOOL_CALL, call_type=LLMCallType.TOOL_CALL,
response_id=response_id_stream response_id=response_id_stream,
) )
self._track_token_usage_internal(usage_data) self._track_token_usage_internal(usage_data)
@@ -2017,7 +2023,7 @@ class OpenAICompletion(BaseLLM):
accumulated_content = "" accumulated_content = ""
usage_data = {"total_tokens": 0} usage_data = {"total_tokens": 0}
async for chunk in completion_stream: async for chunk in completion_stream:
response_id_stream=chunk.id if hasattr(chunk,"id") else None response_id_stream = chunk.id if hasattr(chunk, "id") else None
if hasattr(chunk, "usage") and chunk.usage: if hasattr(chunk, "usage") and chunk.usage:
usage_data = self._extract_openai_token_usage(chunk) usage_data = self._extract_openai_token_usage(chunk)
@@ -2035,7 +2041,7 @@ class OpenAICompletion(BaseLLM):
chunk=delta.content, chunk=delta.content,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_id=response_id_stream response_id=response_id_stream,
) )
self._track_token_usage_internal(usage_data) self._track_token_usage_internal(usage_data)
@@ -2071,7 +2077,7 @@ class OpenAICompletion(BaseLLM):
usage_data = {"total_tokens": 0} usage_data = {"total_tokens": 0}
async for chunk in stream: async for chunk in stream:
response_id_stream=chunk.id if hasattr(chunk,"id") else None response_id_stream = chunk.id if hasattr(chunk, "id") else None
if hasattr(chunk, "usage") and chunk.usage: if hasattr(chunk, "usage") and chunk.usage:
usage_data = self._extract_openai_token_usage(chunk) usage_data = self._extract_openai_token_usage(chunk)
@@ -2089,7 +2095,7 @@ class OpenAICompletion(BaseLLM):
chunk=chunk_delta.content, chunk=chunk_delta.content,
from_task=from_task, from_task=from_task,
from_agent=from_agent, from_agent=from_agent,
response_id=response_id_stream response_id=response_id_stream,
) )
if chunk_delta.tool_calls: if chunk_delta.tool_calls:
@@ -2128,7 +2134,7 @@ class OpenAICompletion(BaseLLM):
"index": tool_calls[tool_index]["index"], "index": tool_calls[tool_index]["index"],
}, },
call_type=LLMCallType.TOOL_CALL, call_type=LLMCallType.TOOL_CALL,
response_id=response_id_stream response_id=response_id_stream,
) )
self._track_token_usage_internal(usage_data) self._track_token_usage_internal(usage_data)
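Note on the helpers used above: this diff calls get_current_call_id() and imports llm_call_context from crewai.llms.base_llm, but their bodies are not shown in this hunk. Below is a minimal sketch of how such helpers could work, assuming a contextvars-based design and Python 3.10+; only the two names come from the diff, everything else is an assumption rather than the actual implementation.

# Hypothetical sketch of a contextvars-based call-id helper; names mirror the
# imports in this diff (get_current_call_id, llm_call_context), but the real
# implementation in crewai.llms.base_llm may differ.
import contextvars
import uuid
from contextlib import contextmanager

_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
    "llm_call_id", default=None
)


def get_current_call_id() -> str | None:
    # Return the call_id of the LLM call currently in progress, if any.
    return _current_call_id.get()


@contextmanager
def llm_call_context():
    # Generate a fresh call_id and keep it visible to everything emitted while
    # the call runs (started, stream chunk, completed, failed events).
    token = _current_call_id.set(str(uuid.uuid4()))
    try:
        yield _current_call_id.get()
    finally:
        _current_call_id.reset(token)

With a context manager like this wrapping each provider call, every event emitted inside the call (including the streaming chunk emissions shown in this hunk) picks up the same id via get_current_call_id().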

View File

@@ -0,0 +1,108 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '71'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpUSxS5LeHwDTELElWlC5CDMzmr\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437564,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hi there! How can I assist you today?\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 10,\n \"total_tokens\": 19,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:05 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '460'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '477'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,215 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '71'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpStmyOpe9DrthWBlDdMZfVMJ1u\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hi! How can I assist you today?\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 9,\n \"total_tokens\": 18,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:02 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '415'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '434'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"user","content":"Say bye"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '72'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpS1DP0Xd3tmWt5PBincVrdU7yw\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Goodbye! If you have more questions
in the future, feel free to reach out. Have a great day!\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 23,\n \"total_tokens\": 32,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:03 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '964'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '979'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,143 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini","stream":true,"stream_options":{"include_usage":true}}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '125'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: 'data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"rVIyGQF2E"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"Hi"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"ZGVqV7ZDm"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"vnfm7IxlIB"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
How"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"o8F35ZZ"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
can"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"kiBzGe3"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
I"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"cbGT2RWgx"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
assist"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"DtxR"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
you"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"6y6Co8J"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
today"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"SZOmm"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"s9Bc0HqlPg"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":null,"obfuscation":"u9aar"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[],"usage":{"prompt_tokens":9,"completion_tokens":9,"total_tokens":18,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}},"obfuscation":"5hudm8ySqh39"}
data: [DONE]
'
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- text/event-stream; charset=utf-8
Date:
- Mon, 26 Jan 2026 14:26:04 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '260'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '275'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -217,6 +217,7 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent( LLMStreamChunkEvent(
type="llm_stream_chunk", type="llm_stream_chunk",
chunk="Hello ", chunk="Hello ",
call_id="test-call-id",
), ),
) )
crewai_event_bus.emit( crewai_event_bus.emit(
@@ -224,6 +225,7 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent( LLMStreamChunkEvent(
type="llm_stream_chunk", type="llm_stream_chunk",
chunk="World!", chunk="World!",
call_id="test-call-id",
), ),
) )
return mock_output return mock_output
@@ -284,6 +286,7 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent( LLMStreamChunkEvent(
type="llm_stream_chunk", type="llm_stream_chunk",
chunk="", chunk="",
call_id="test-call-id",
tool_call=ToolCall( tool_call=ToolCall(
id="call-123", id="call-123",
function=FunctionCall( function=FunctionCall(
@@ -364,6 +367,7 @@ class TestCrewKickoffStreamingAsync:
LLMStreamChunkEvent( LLMStreamChunkEvent(
type="llm_stream_chunk", type="llm_stream_chunk",
chunk="Async ", chunk="Async ",
call_id="test-call-id",
), ),
) )
crewai_event_bus.emit( crewai_event_bus.emit(
@@ -371,6 +375,7 @@ class TestCrewKickoffStreamingAsync:
LLMStreamChunkEvent( LLMStreamChunkEvent(
type="llm_stream_chunk", type="llm_stream_chunk",
chunk="Stream!", chunk="Stream!",
call_id="test-call-id",
), ),
) )
return mock_output return mock_output
@@ -451,6 +456,7 @@ class TestFlowKickoffStreaming:
LLMStreamChunkEvent( LLMStreamChunkEvent(
type="llm_stream_chunk", type="llm_stream_chunk",
chunk="Flow ", chunk="Flow ",
call_id="test-call-id",
), ),
) )
crewai_event_bus.emit( crewai_event_bus.emit(
@@ -458,6 +464,7 @@ class TestFlowKickoffStreaming:
LLMStreamChunkEvent( LLMStreamChunkEvent(
type="llm_stream_chunk", type="llm_stream_chunk",
chunk="output!", chunk="output!",
call_id="test-call-id",
), ),
) )
return "done" return "done"
@@ -545,6 +552,7 @@ class TestFlowKickoffStreamingAsync:
LLMStreamChunkEvent( LLMStreamChunkEvent(
type="llm_stream_chunk", type="llm_stream_chunk",
chunk="Async flow ", chunk="Async flow ",
call_id="test-call-id",
), ),
) )
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
@@ -553,6 +561,7 @@ class TestFlowKickoffStreamingAsync:
LLMStreamChunkEvent( LLMStreamChunkEvent(
type="llm_stream_chunk", type="llm_stream_chunk",
chunk="stream!", chunk="stream!",
call_id="test-call-id",
), ),
) )
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
@@ -686,6 +695,7 @@ class TestStreamingEdgeCases:
type="llm_stream_chunk", type="llm_stream_chunk",
chunk="Task 1", chunk="Task 1",
task_name="First task", task_name="First task",
call_id="test-call-id",
), ),
) )
return mock_output return mock_output

View File

@@ -984,8 +984,8 @@ def test_streaming_fallback_to_non_streaming():
def mock_call(messages, tools=None, callbacks=None, available_functions=None): def mock_call(messages, tools=None, callbacks=None, available_functions=None):
nonlocal fallback_called nonlocal fallback_called
# Emit a couple of chunks to simulate partial streaming # Emit a couple of chunks to simulate partial streaming
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id = "Id")) crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id="Id", call_id="test-call-id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id = "Id")) crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id="Id", call_id="test-call-id"))
# Mark that fallback would be called # Mark that fallback would be called
fallback_called = True fallback_called = True
@@ -1041,7 +1041,7 @@ def test_streaming_empty_response_handling():
def mock_call(messages, tools=None, callbacks=None, available_functions=None): def mock_call(messages, tools=None, callbacks=None, available_functions=None):
# Emit a few empty chunks # Emit a few empty chunks
for _ in range(3): for _ in range(3):
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="",response_id="id")) crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="", response_id="id", call_id="test-call-id"))
# Return the default message for empty responses # Return the default message for empty responses
return "I apologize, but I couldn't generate a proper response. Please try again or rephrase your request." return "I apologize, but I couldn't generate a proper response. Please try again or rephrase your request."
@@ -1280,6 +1280,105 @@ def test_llm_emits_event_with_lite_agent():
assert set(all_agent_id) == {str(agent.id)} assert set(all_agent_id) == {str(agent.id)}
# ----------- CALL_ID CORRELATION TESTS -----------


@pytest.mark.vcr()
def test_llm_call_events_share_call_id():
    """All events from a single LLM call should share the same call_id."""
    import uuid

    events = []
    condition = threading.Condition()

    @crewai_event_bus.on(LLMCallStartedEvent)
    def on_start(source, event):
        with condition:
            events.append(event)
            condition.notify()

    @crewai_event_bus.on(LLMCallCompletedEvent)
    def on_complete(source, event):
        with condition:
            events.append(event)
            condition.notify()

    llm = LLM(model="gpt-4o-mini")
    llm.call("Say hi")

    with condition:
        success = condition.wait_for(lambda: len(events) >= 2, timeout=10)
        assert success, "Timeout waiting for LLM events"

    # Behavior: all events from the call share the same call_id
    assert len(events) == 2
    assert events[0].call_id == events[1].call_id
    # call_id should be a valid UUID
    uuid.UUID(events[0].call_id)


@pytest.mark.vcr()
def test_streaming_chunks_share_call_id_with_call():
    """Streaming chunks should share call_id with started/completed events."""
    events = []
    condition = threading.Condition()

    @crewai_event_bus.on(LLMCallStartedEvent)
    def on_start(source, event):
        with condition:
            events.append(event)
            condition.notify()

    @crewai_event_bus.on(LLMStreamChunkEvent)
    def on_chunk(source, event):
        with condition:
            events.append(event)
            condition.notify()

    @crewai_event_bus.on(LLMCallCompletedEvent)
    def on_complete(source, event):
        with condition:
            events.append(event)
            condition.notify()

    llm = LLM(model="gpt-4o-mini", stream=True)
    llm.call("Say hi")

    with condition:
        # Wait for at least started, some chunks, and completed
        success = condition.wait_for(lambda: len(events) >= 3, timeout=10)
        assert success, "Timeout waiting for streaming events"

    # Behavior: all events (started, chunks, completed) share the same call_id
    call_ids = {e.call_id for e in events}
    assert len(call_ids) == 1


@pytest.mark.vcr()
def test_separate_llm_calls_have_different_call_ids():
    """Different LLM calls should have different call_ids."""
    call_ids = []
    condition = threading.Condition()

    @crewai_event_bus.on(LLMCallStartedEvent)
    def on_start(source, event):
        with condition:
            call_ids.append(event.call_id)
            condition.notify()

    llm = LLM(model="gpt-4o-mini")
    llm.call("Say hi")
    llm.call("Say bye")

    with condition:
        success = condition.wait_for(lambda: len(call_ids) >= 2, timeout=10)
        assert success, "Timeout waiting for LLM call events"

    # Behavior: each call has its own call_id
    assert len(call_ids) == 2
    assert call_ids[0] != call_ids[1]
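For event consumers, the behaviour asserted above means events can be bucketed per API call. A small illustrative sketch, assuming crewai_event_bus.on(EventType) can be used as a plain decorator exactly as in the tests above; the grouping logic is only an example of how a monitoring consumer might use call_id, not part of this change.

# Hypothetical monitoring consumer: group LLM events by call_id so that the
# started/chunk/completed events of one API call can be inspected together.
from collections import defaultdict

events_by_call: dict[str, list] = defaultdict(list)


def _collect(source, event):
    # Every LLM event now carries call_id, so one bucket per API call.
    events_by_call[event.call_id].append(event)


for event_type in (LLMCallStartedEvent, LLMStreamChunkEvent, LLMCallCompletedEvent):
    crewai_event_bus.on(event_type)(_collect)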
# ----------- HUMAN FEEDBACK EVENTS ----------- # ----------- HUMAN FEEDBACK EVENTS -----------