Compare commits

..

5 Commits

Author SHA1 Message Date
Vinicius Brasil
2f0cdf14c7 Add call_id to LLM events for correlating requests
When monitoring LLM events, consumers need to know which events belong
to the same API call. Before this change, there was no way to correlate
LLMCallStartedEvent, LLMStreamChunkEvent, and LLMCallCompletedEvent
belonging to the same request.
2026-02-02 17:04:03 -03:00
Sampson
8c6436234b adds additional search params (#4321)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Introduces support for additional Brave Search API web-search parameters.
2026-02-02 11:17:02 -08:00
Lucas Gomide
96bde4510b feat: auto update tools.specs (#4341) 2026-02-02 12:52:00 -05:00
Greyson LaLonde
9d7f45376a fix: use contextvars for flow execution context 2026-02-02 11:24:02 -05:00
Thiago Moretto
536447ab0e declare stagehand package as dep for StagehandTool (#4336) 2026-02-02 09:45:47 -05:00
33 changed files with 1622 additions and 822 deletions

View File

@@ -0,0 +1,63 @@
name: Generate Tool Specifications
on:
pull_request:
branches:
- main
paths:
- 'lib/crewai-tools/src/crewai_tools/**'
workflow_dispatch:
permissions:
contents: write
pull-requests: write
jobs:
generate-specs:
runs-on: ubuntu-latest
env:
PYTHONUNBUFFERED: 1
steps:
- name: Generate GitHub App token
id: app-token
uses: tibdex/github-app-token@v2
with:
app_id: ${{ secrets.CREWAI_TOOL_SPECS_APP_ID }}
private_key: ${{ secrets.CREWAI_TOOL_SPECS_PRIVATE_KEY }}
- name: Checkout code
uses: actions/checkout@v4
with:
ref: ${{ github.head_ref }}
token: ${{ steps.app-token.outputs.token }}
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
version: "0.8.4"
python-version: "3.12"
enable-cache: true
- name: Install the project
working-directory: lib/crewai-tools
run: uv sync --dev --all-extras
- name: Generate tool specifications
working-directory: lib/crewai-tools
run: uv run python src/crewai_tools/generate_tool_specs.py
- name: Check for changes and commit
run: |
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
git add lib/crewai-tools/tool.specs.json
if git diff --quiet --staged; then
echo "No changes detected in tool.specs.json"
else
echo "Changes detected in tool.specs.json, committing..."
git commit -m "chore: update tool specifications"
git push
fi

View File

@@ -1,12 +1,17 @@
from datetime import datetime
import json
import os
import time
from typing import Any, ClassVar
from typing import Annotated, Any, ClassVar, Literal
from crewai.tools import BaseTool, EnvVar
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from pydantic.types import StringConstraints
import requests
load_dotenv()
def _save_results_to_file(content: str) -> None:
"""Saves the search results to a file."""
@@ -15,37 +20,72 @@ def _save_results_to_file(content: str) -> None:
file.write(content)
class BraveSearchToolSchema(BaseModel):
"""Input for BraveSearchTool."""
FreshnessPreset = Literal["pd", "pw", "pm", "py"]
FreshnessRange = Annotated[
str, StringConstraints(pattern=r"^\d{4}-\d{2}-\d{2}to\d{4}-\d{2}-\d{2}$")
]
Freshness = FreshnessPreset | FreshnessRange
SafeSearch = Literal["off", "moderate", "strict"]
search_query: str = Field(
..., description="Mandatory search query you want to use to search the internet"
class BraveSearchToolSchema(BaseModel):
"""Input for BraveSearchTool"""
query: str = Field(..., description="Search query to perform")
country: str | None = Field(
default=None,
description="Country code for geo-targeting (e.g., 'US', 'BR').",
)
search_language: str | None = Field(
default=None,
description="Language code for the search results (e.g., 'en', 'es').",
)
count: int | None = Field(
default=None,
description="The maximum number of results to return. Actual number may be less.",
)
offset: int | None = Field(
default=None, description="Skip the first N result sets/pages. Max is 9."
)
safesearch: SafeSearch | None = Field(
default=None,
description="Filter out explicit content. Options: off/moderate/strict",
)
spellcheck: bool | None = Field(
default=None,
description="Attempt to correct spelling errors in the search query.",
)
freshness: Freshness | None = Field(
default=None,
description="Enforce freshness of results. Options: pd/pw/pm/py, or YYYY-MM-DDtoYYYY-MM-DD",
)
text_decorations: bool | None = Field(
default=None,
description="Include markup to highlight search terms in the results.",
)
extra_snippets: bool | None = Field(
default=None,
description="Include up to 5 text snippets for each page if possible.",
)
operators: bool | None = Field(
default=None,
description="Whether to apply search operators (e.g., site:example.com).",
)
# TODO: Extend support to additional endpoints (e.g., /images, /news, etc.)
class BraveSearchTool(BaseTool):
"""BraveSearchTool - A tool for performing web searches using the Brave Search API.
"""A tool that performs web searches using the Brave Search API."""
This module provides functionality to search the internet using Brave's Search API,
supporting customizable result counts and country-specific searches.
Dependencies:
- requests
- pydantic
- python-dotenv (for API key management)
"""
name: str = "Brave Web Search the internet"
name: str = "Brave Search"
description: str = (
"A tool that can be used to search the internet with a search_query."
"A tool that performs web searches using the Brave Search API. "
"Results are returned as structured JSON data."
)
args_schema: type[BaseModel] = BraveSearchToolSchema
search_url: str = "https://api.search.brave.com/res/v1/web/search"
country: str | None = ""
n_results: int = 10
save_file: bool = False
_last_request_time: ClassVar[float] = 0
_min_request_interval: ClassVar[float] = 1.0 # seconds
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
@@ -55,6 +95,9 @@ class BraveSearchTool(BaseTool):
),
]
)
# Rate limiting parameters
_last_request_time: ClassVar[float] = 0
_min_request_interval: ClassVar[float] = 1.0 # seconds
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -73,19 +116,64 @@ class BraveSearchTool(BaseTool):
self._min_request_interval - (current_time - self._last_request_time)
)
BraveSearchTool._last_request_time = time.time()
# Construct and send the request
try:
search_query = kwargs.get("search_query") or kwargs.get("query")
if not search_query:
raise ValueError("Search query is required")
# Maintain both "search_query" and "query" for backwards compatibility
query = kwargs.get("search_query") or kwargs.get("query")
if not query:
raise ValueError("Query is required")
payload = {"q": query}
if country := kwargs.get("country"):
payload["country"] = country
if search_language := kwargs.get("search_language"):
payload["search_language"] = search_language
# Fallback to deprecated n_results parameter if no count is provided
count = kwargs.get("count")
if count is not None:
payload["count"] = count
else:
payload["count"] = self.n_results
# Offset may be 0, so avoid truthiness check
offset = kwargs.get("offset")
if offset is not None:
payload["offset"] = offset
if safesearch := kwargs.get("safesearch"):
payload["safesearch"] = safesearch
save_file = kwargs.get("save_file", self.save_file)
n_results = kwargs.get("n_results", self.n_results)
if freshness := kwargs.get("freshness"):
payload["freshness"] = freshness
payload = {"q": search_query, "count": n_results}
# Boolean parameters
spellcheck = kwargs.get("spellcheck")
if spellcheck is not None:
payload["spellcheck"] = spellcheck
if self.country != "":
payload["country"] = self.country
text_decorations = kwargs.get("text_decorations")
if text_decorations is not None:
payload["text_decorations"] = text_decorations
extra_snippets = kwargs.get("extra_snippets")
if extra_snippets is not None:
payload["extra_snippets"] = extra_snippets
operators = kwargs.get("operators")
if operators is not None:
payload["operators"] = operators
# Limit the result types to "web" since there is presently no
# handling of other types like "discussions", "faq", "infobox",
# "news", "videos", or "locations".
payload["result_filter"] = "web"
# Setup Request Headers
headers = {
"X-Subscription-Token": os.environ["BRAVE_API_KEY"],
"Accept": "application/json",
@@ -97,25 +185,32 @@ class BraveSearchTool(BaseTool):
response.raise_for_status() # Handle non-200 responses
results = response.json()
# TODO: Handle other result types like "discussions", "faq", etc.
web_results_items = []
if "web" in results:
results = results["web"]["results"]
string = []
for result in results:
try:
string.append(
"\n".join(
[
f"Title: {result['title']}",
f"Link: {result['url']}",
f"Snippet: {result['description']}",
"---",
]
)
)
except KeyError: # noqa: PERF203
continue
web_results = results["web"]["results"]
content = "\n".join(string)
for result in web_results:
url = result.get("url")
title = result.get("title")
# If, for whatever reason, this entry does not have a title
# or url, skip it.
if not url or not title:
continue
item = {
"url": url,
"title": title,
}
description = result.get("description")
if description:
item["description"] = description
snippets = result.get("extra_snippets")
if snippets:
item["snippets"] = snippets
web_results_items.append(item)
content = json.dumps(web_results_items)
except requests.RequestException as e:
return f"Error performing search: {e!s}"
except KeyError as e:

View File

@@ -137,6 +137,7 @@ class StagehandTool(BaseTool):
- 'observe': For finding elements in a specific area
"""
args_schema: type[BaseModel] = StagehandToolSchema
package_dependencies: list[str] = Field(default_factory=lambda: ["stagehand"])
# Stagehand configuration
api_key: str | None = None

View File

@@ -1,8 +1,10 @@
import json
from unittest.mock import patch
from crewai_tools.tools.brave_search_tool.brave_search_tool import BraveSearchTool
import pytest
from crewai_tools.tools.brave_search_tool.brave_search_tool import BraveSearchTool
@pytest.fixture
def brave_tool():
@@ -30,16 +32,43 @@ def test_brave_tool_search(mock_get, brave_tool):
}
mock_get.return_value.json.return_value = mock_response
result = brave_tool.run(search_query="test")
result = brave_tool.run(query="test")
assert "Test Title" in result
assert "http://test.com" in result
def test_brave_tool():
tool = BraveSearchTool(
n_results=2,
)
tool.run(search_query="ChatGPT")
@patch("requests.get")
def test_brave_tool(mock_get):
mock_response = {
"web": {
"results": [
{
"title": "Brave Browser",
"url": "https://brave.com",
"description": "Brave Browser description",
}
]
}
}
mock_get.return_value.json.return_value = mock_response
tool = BraveSearchTool(n_results=2)
result = tool.run(query="Brave Browser")
assert result is not None
# Parse JSON so we can examine the structure
data = json.loads(result)
assert isinstance(data, list)
assert len(data) >= 1
# First item should have expected fields: title, url, and description
first = data[0]
assert "title" in first
assert first["title"] == "Brave Browser"
assert "url" in first
assert first["url"] == "https://brave.com"
assert "description" in first
assert first["description"] == "Brave Browser description"
if __name__ == "__main__":

View File

@@ -129,25 +129,20 @@ PROVIDERS = [
MODELS = {
"openai": [
"gpt-4",
"gpt-4.1",
"gpt-4.1-mini-2025-04-14",
"gpt-4.1-nano-2025-04-14",
"gpt-4o",
"gpt-4o-mini",
"gpt-4-turbo",
"gpt-4.1",
"gpt-4.1-mini",
"gpt-4.1-nano",
"o1",
"o1-mini",
"o1-preview",
"o3",
"o3-mini",
"o4-mini",
],
"anthropic": [
"claude-sonnet-4-5-20250514",
"claude-3-7-sonnet-20250219",
"claude-3-5-sonnet-20241022",
"claude-3-5-haiku-20241022",
"claude-3-5-sonnet-20240620",
"claude-3-sonnet-20240229",
"claude-3-opus-20240229",
"claude-3-haiku-20240307",
],
"gemini": [
"gemini/gemini-3-pro-preview",
@@ -235,15 +230,13 @@ MODELS = {
"nvidia_nim/baichuan-inc/baichuan2-13b-chat",
],
"groq": [
"groq/llama-3.3-70b-versatile",
"groq/llama-3.3-70b-specdec",
"groq/llama-3.1-8b-instant",
"groq/llama-3.2-3b-preview",
"groq/llama-3.2-1b-preview",
"groq/mixtral-8x7b-32768",
"groq/llama-3.1-70b-versatile",
"groq/llama-3.1-405b-reasoning",
"groq/gemma2-9b-it",
"groq/gemma-7b-it",
],
"ollama": ["ollama/llama3.2", "ollama/llama3.3", "ollama/mixtral", "ollama/deepseek-r1"],
"ollama": ["ollama/llama3.1", "ollama/mixtral"],
"watson": [
"watsonx/meta-llama/llama-3-1-70b-instruct",
"watsonx/meta-llama/llama-3-1-8b-instruct",

View File

@@ -10,6 +10,7 @@ class LLMEventBase(BaseEvent):
from_task: Any | None = None
from_agent: Any | None = None
model: str | None = None
call_id: str
def __init__(self, **data: Any) -> None:
if data.get("from_task"):

View File

@@ -8,11 +8,13 @@ Example:
from crewai.flow import Flow, start, human_feedback
from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending
class SlackProvider(HumanFeedbackProvider):
def request_feedback(self, context, flow):
self.send_slack_notification(context)
raise HumanFeedbackPending(context=context)
class MyFlow(Flow):
@start()
@human_feedback(
@@ -26,12 +28,13 @@ Example:
```
"""
from crewai.flow.async_feedback.providers import ConsoleProvider
from crewai.flow.async_feedback.types import (
HumanFeedbackPending,
HumanFeedbackProvider,
PendingFeedbackContext,
)
from crewai.flow.async_feedback.providers import ConsoleProvider
__all__ = [
"ConsoleProvider",

View File

@@ -6,10 +6,11 @@ provider that collects feedback via console input.
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from crewai.flow.async_feedback.types import PendingFeedbackContext
if TYPE_CHECKING:
from crewai.flow.flow import Flow
@@ -27,6 +28,7 @@ class ConsoleProvider:
```python
from crewai.flow.async_feedback import ConsoleProvider
# Explicitly use console provider
@human_feedback(
message="Review this:",
@@ -49,7 +51,7 @@ class ConsoleProvider:
def request_feedback(
self,
context: PendingFeedbackContext,
flow: Flow,
flow: Flow[Any],
) -> str:
"""Request feedback via console input (blocking).

View File

@@ -10,6 +10,7 @@ from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
if TYPE_CHECKING:
from crewai.flow.flow import Flow
@@ -155,7 +156,7 @@ class HumanFeedbackPending(Exception): # noqa: N818 - Not an error, a control f
callback_info={
"slack_channel": "#reviews",
"thread_id": ticket_id,
}
},
)
```
"""
@@ -232,7 +233,7 @@ class HumanFeedbackProvider(Protocol):
callback_info={
"channel": self.channel,
"thread_id": thread_id,
}
},
)
```
"""
@@ -240,7 +241,7 @@ class HumanFeedbackProvider(Protocol):
def request_feedback(
self,
context: PendingFeedbackContext,
flow: Flow,
flow: Flow[Any],
) -> str:
"""Request feedback from a human.

View File

@@ -1,4 +1,5 @@
from typing import Final, Literal
AND_CONDITION: Final[Literal["AND"]] = "AND"
OR_CONDITION: Final[Literal["OR"]] = "OR"

View File

@@ -58,6 +58,7 @@ from crewai.events.types.flow_events import (
MethodExecutionStartedEvent,
)
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowConditions,
@@ -1540,6 +1541,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx)
flow_token = attach(ctx)
flow_id_token = None
request_id_token = None
if current_flow_id.get() is None:
flow_id_token = current_flow_id.set(self.flow_id)
if current_flow_request_id.get() is None:
request_id_token = current_flow_request_id.set(self.flow_id)
try:
# Reset flow state for fresh execution unless restoring from persistence
is_restoring = inputs and "id" in inputs and self._persistence is not None
@@ -1717,6 +1725,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
return final_output
finally:
if request_id_token is not None:
current_flow_request_id.reset(request_id_token)
if flow_id_token is not None:
current_flow_id.reset(flow_id_token)
detach(flow_token)
async def akickoff(

View File

@@ -8,6 +8,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import HumanFeedbackProvider

View File

@@ -0,0 +1,16 @@
"""Flow execution context management.
This module provides context variables for tracking flow execution state across
async boundaries and nested function calls.
"""
import contextvars
current_flow_request_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_request_id", default=None
)
current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_id", default=None
)

View File

@@ -1,46 +1,22 @@
import inspect
from typing import Any
from pydantic import BaseModel, Field, InstanceOf, model_validator
from pydantic import BaseModel, model_validator
from typing_extensions import Self
from crewai.flow.flow import Flow
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
class FlowTrackable(BaseModel):
"""Mixin that tracks the Flow instance that instantiated the object, e.g. a
Flow instance that created a Crew or Agent.
"""Mixin that tracks flow execution context for objects created within flows.
Automatically finds and stores a reference to the parent Flow instance by
inspecting the call stack.
When a Crew or Agent is instantiated inside a flow execution, this mixin
automatically captures the flow ID and request ID from context variables,
enabling proper tracking and association with the parent flow execution.
"""
parent_flow: InstanceOf[Flow[Any]] | None = Field(
default=None,
description="The parent flow of the instance, if it was created inside a flow.",
)
@model_validator(mode="after")
def _set_parent_flow(self) -> Self:
max_depth = 8
frame = inspect.currentframe()
try:
if frame is None:
return self
frame = frame.f_back
for _ in range(max_depth):
if frame is None:
break
candidate = frame.f_locals.get("self")
if isinstance(candidate, Flow):
self.parent_flow = candidate
break
frame = frame.f_back
finally:
del frame
def _set_flow_context(self) -> Self:
request_id = current_flow_request_id.get()
if request_id:
self._request_id = request_id
self._flow_id = current_flow_id.get()
return self

View File

@@ -11,6 +11,7 @@ Example (synchronous, default):
```python
from crewai.flow import Flow, start, listen, human_feedback
class ReviewFlow(Flow):
@start()
@human_feedback(
@@ -32,11 +33,13 @@ Example (asynchronous with custom provider):
from crewai.flow import Flow, start, human_feedback
from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending
class SlackProvider(HumanFeedbackProvider):
def request_feedback(self, context, flow):
self.send_notification(context)
raise HumanFeedbackPending(context=context)
class ReviewFlow(Flow):
@start()
@human_feedback(
@@ -229,6 +232,7 @@ def human_feedback(
def review_document(self):
return document_content
@listen("approved")
def publish(self):
print(f"Publishing: {self.last_human_feedback.output}")
@@ -265,7 +269,7 @@ def human_feedback(
def decorator(func: F) -> F:
"""Inner decorator that wraps the function."""
def _request_feedback(flow_instance: Flow, method_output: Any) -> str:
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
"""Request feedback using provider or default console."""
from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -291,19 +295,16 @@ def human_feedback(
effective_provider = flow_config.hitl_provider
if effective_provider is not None:
# Use provider (may raise HumanFeedbackPending for async providers)
return effective_provider.request_feedback(context, flow_instance)
else:
# Use default console input (local development)
return flow_instance._request_human_feedback(
message=message,
output=method_output,
metadata=metadata,
emit=emit,
)
return flow_instance._request_human_feedback(
message=message,
output=method_output,
metadata=metadata,
emit=emit,
)
def _process_feedback(
flow_instance: Flow,
flow_instance: Flow[Any],
method_output: Any,
raw_feedback: str,
) -> HumanFeedbackResult | str:
@@ -319,12 +320,14 @@ def human_feedback(
# No default and no feedback - use first outcome
collapsed_outcome = emit[0]
elif emit:
# Collapse feedback to outcome using LLM
collapsed_outcome = flow_instance._collapse_to_outcome(
feedback=raw_feedback,
outcomes=emit,
llm=llm,
)
if llm is not None:
collapsed_outcome = flow_instance._collapse_to_outcome(
feedback=raw_feedback,
outcomes=emit,
llm=llm,
)
else:
collapsed_outcome = emit[0]
# Create result
result = HumanFeedbackResult(
@@ -349,7 +352,7 @@ def human_feedback(
if asyncio.iscoroutinefunction(func):
# Async wrapper
@wraps(func)
async def async_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any:
async def async_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
# Execute the original method
method_output = await func(self, *args, **kwargs)
@@ -363,7 +366,7 @@ def human_feedback(
else:
# Sync wrapper
@wraps(func)
def sync_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any:
def sync_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
# Execute the original method
method_output = func(self, *args, **kwargs)
@@ -397,11 +400,10 @@ def human_feedback(
)
wrapper.__is_flow_method__ = True
# Make it a router if emit specified
if emit:
wrapper.__is_router__ = True
wrapper.__router_paths__ = list(emit)
return wrapper # type: ignore[return-value]
return wrapper # type: ignore[no-any-return]
return decorator

View File

@@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any
from pydantic import BaseModel
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -103,4 +104,3 @@ class FlowPersistence(ABC):
Args:
flow_uuid: Unique identifier for the flow instance
"""
pass

View File

@@ -15,6 +15,7 @@ from pydantic import BaseModel
from crewai.flow.persistence.base import FlowPersistence
from crewai.utilities.paths import db_storage_path
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -176,7 +177,8 @@ class SQLiteFlowPersistence(FlowPersistence):
row = cursor.fetchone()
if row:
return json.loads(row[0])
result = json.loads(row[0])
return result if isinstance(result, dict) else None
return None
def save_pending_feedback(
@@ -196,7 +198,6 @@ class SQLiteFlowPersistence(FlowPersistence):
state_data: Current state data
"""
# Import here to avoid circular imports
from crewai.flow.async_feedback.types import PendingFeedbackContext
# Convert state_data to dict
if isinstance(state_data, BaseModel):

View File

@@ -37,7 +37,7 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, get_current_call_id, llm_call_context
from crewai.llms.constants import (
ANTHROPIC_MODELS,
AZURE_MODELS,
@@ -770,7 +770,7 @@ class LLM(BaseLLM):
chunk_content = None
response_id = None
if hasattr(chunk,'id'):
if hasattr(chunk, "id"):
response_id = chunk.id
# Safely extract content from various chunk formats
@@ -827,7 +827,7 @@ class LLM(BaseLLM):
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_id=response_id
response_id=response_id,
)
if result is not None:
@@ -849,7 +849,8 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
call_type=LLMCallType.LLM_CALL,
response_id=response_id
response_id=response_id,
call_id=get_current_call_id(),
),
)
# --- 4) Fallback to non-streaming if no content received
@@ -1015,7 +1016,10 @@ class LLM(BaseLLM):
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
)
raise Exception(f"Failed to get streaming response: {e!s}") from e
@@ -1048,7 +1052,8 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
call_type=LLMCallType.TOOL_CALL,
response_id=response_id
response_id=response_id,
call_id=get_current_call_id(),
),
)
@@ -1476,7 +1481,8 @@ class LLM(BaseLLM):
chunk=chunk_content,
from_task=from_task,
from_agent=from_agent,
response_id=response_id
response_id=response_id,
call_id=get_current_call_id(),
),
)
@@ -1619,7 +1625,12 @@ class LLM(BaseLLM):
logging.error(f"Error executing function '{function_name}': {e}")
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=f"Tool execution error: {e!s}"),
event=LLMCallFailedEvent(
error=f"Tool execution error: {e!s}",
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
)
crewai_event_bus.emit(
self,
@@ -1669,108 +1680,117 @@ class LLM(BaseLLM):
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
with llm_call_context() as call_id:
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=call_id,
),
)
# --- 2) Validate parameters before proceeding with the call
self._validate_call_params()
# --- 2) Validate parameters before proceeding with the call
self._validate_call_params()
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
if not self._invoke_before_llm_call_hooks(messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
else:
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
logging.info("Retrying LLM call without the unsupported 'stop'")
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
logging.info("Retrying LLM call without the unsupported 'stop'")
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
raise
async def acall(
self,
@@ -1808,43 +1828,54 @@ class LLM(BaseLLM):
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
with llm_call_context() as call_id:
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=call_id,
),
)
self._validate_call_params()
self._validate_call_params()
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# Process file attachments asynchronously before preparing params
messages = await self._aprocess_message_files(messages)
# Process file attachments asynchronously before preparing params
messages = await self._aprocess_message_files(messages)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(
messages, tools, skip_file_processing=True
)
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(
messages, tools, skip_file_processing=True
)
if self.stream:
return await self._ahandle_streaming_response(
if self.stream:
return await self._ahandle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._ahandle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
@@ -1852,52 +1883,50 @@ class LLM(BaseLLM):
from_agent=from_agent,
response_model=response_model,
)
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
return await self._ahandle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
logging.info("Retrying LLM call without the unsupported 'stop'")
return await self.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
logging.info("Retrying LLM call without the unsupported 'stop'")
return await self.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
raise
def _handle_emit_call_events(
self,
@@ -1925,6 +1954,7 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)

View File

@@ -7,11 +7,15 @@ in CrewAI, including common functionality for native SDK implementations.
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Generator
from contextlib import contextmanager
import contextvars
from datetime import datetime
import json
import logging
import re
from typing import TYPE_CHECKING, Any, Final
import uuid
from pydantic import BaseModel
@@ -50,6 +54,32 @@ DEFAULT_CONTEXT_WINDOW_SIZE: Final[int] = 4096
DEFAULT_SUPPORTS_STOP_WORDS: Final[bool] = True
_JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTALL)
_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"_current_call_id", default=None
)
@contextmanager
def llm_call_context() -> Generator[str, None, None]:
"""Context manager that establishes an LLM call scope with a unique call_id."""
call_id = str(uuid.uuid4())
token = _current_call_id.set(call_id)
try:
yield call_id
finally:
_current_call_id.reset(token)
def get_current_call_id() -> str:
"""Get current call_id from context"""
call_id = _current_call_id.get()
if call_id is None:
logging.warning(
"LLM event emitted outside call context - generating fallback call_id"
)
return str(uuid.uuid4())
return call_id
class BaseLLM(ABC):
"""Abstract base class for LLM implementations.
@@ -351,6 +381,7 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -374,6 +405,7 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -394,6 +426,7 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -428,6 +461,7 @@ class BaseLLM(ABC):
from_agent=from_agent,
call_type=call_type,
response_id=response_id,
call_id=get_current_call_id(),
),
)

View File

@@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any, Final, Literal, TypeGuard, cast
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -266,35 +266,46 @@ class AnthropicCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Anthropic
formatted_messages, system_message = self._format_messages_for_anthropic(
messages
)
# Format messages for Anthropic
formatted_messages, system_message = (
self._format_messages_for_anthropic(messages)
)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
completion_params,
available_functions,
from_task,
@@ -302,21 +313,13 @@ class AnthropicCompletion(BaseLLM):
effective_response_model,
)
return self._handle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -342,28 +345,37 @@ class AnthropicCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages, system_message = self._format_messages_for_anthropic(
messages
)
formatted_messages, system_message = (
self._format_messages_for_anthropic(messages)
)
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
if self.stream:
return await self._ahandle_streaming_completion(
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
@@ -371,21 +383,13 @@ class AnthropicCompletion(BaseLLM):
effective_response_model,
)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _prepare_completion_params(
self,

View File

@@ -43,7 +43,7 @@ try:
)
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
except ImportError:
raise ImportError(
@@ -293,32 +293,44 @@ class AzureCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Azure
formatted_messages = self._format_messages_for_azure(messages)
effective_response_model = response_model or self.response_format
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# Format messages for Azure
formatted_messages = self._format_messages_for_azure(messages)
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
completion_params,
available_functions,
from_task,
@@ -326,16 +338,8 @@ class AzureCompletion(BaseLLM):
effective_response_model,
)
return self._handle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value]
except Exception as e:
return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value]
async def acall( # type: ignore[return]
self,
@@ -361,25 +365,35 @@ class AzureCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages_for_azure(messages)
effective_response_model = response_model or self.response_format
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
formatted_messages = self._format_messages_for_azure(messages)
if self.stream:
return await self._ahandle_streaming_completion(
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
@@ -387,16 +401,8 @@ class AzureCompletion(BaseLLM):
effective_response_model,
)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
self._handle_api_error(e, from_task, from_agent)
except Exception as e:
self._handle_api_error(e, from_task, from_agent)
def _prepare_completion_params(
self,

View File

@@ -11,7 +11,7 @@ from pydantic import BaseModel
from typing_extensions import Required
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -378,77 +378,90 @@ class BedrockCompletion(BaseLLM):
"""Call AWS Bedrock Converse API."""
effective_response_model = response_model or self.response_format
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Converse API
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare request body
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
# Add system message if present
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
# Format messages for Converse API
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare request body
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
# Add system message if present
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
)
# Add optional advanced features if configured
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef", cast(object, self.guardrail_config)
)
body["guardrailConfig"] = guardrail_config
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
# Add optional advanced features if configured
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef",
cast(object, self.guardrail_config),
)
body["guardrailConfig"] = guardrail_config
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.stream:
return self._handle_streaming_converse(
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.stream:
return self._handle_streaming_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_converse(
formatted_messages,
body,
available_functions,
@@ -457,26 +470,17 @@ class BedrockCompletion(BaseLLM):
effective_response_model,
)
return self._handle_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -514,69 +518,80 @@ class BedrockCompletion(BaseLLM):
'Install with: uv add "crewai[bedrock-async]"'
)
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
)
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef", cast(object, self.guardrail_config)
)
body["guardrailConfig"] = guardrail_config
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef",
cast(object, self.guardrail_config),
)
body["guardrailConfig"] = guardrail_config
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.stream:
return await self._ahandle_streaming_converse(
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.stream:
return await self._ahandle_streaming_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_converse(
formatted_messages,
body,
available_functions,
@@ -585,26 +600,17 @@ class BedrockCompletion(BaseLLM):
effective_response_model,
)
return await self._ahandle_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _handle_converse(
self,

View File

@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, Literal, cast
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -293,33 +293,45 @@ class GeminiCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
formatted_content, system_instruction = self._format_messages_for_gemini(
messages
)
formatted_content, system_instruction = (
self._format_messages_for_gemini(messages)
)
messages_for_hooks = self._convert_contents_to_dict(formatted_content)
messages_for_hooks = self._convert_contents_to_dict(formatted_content)
if not self._invoke_before_llm_call_hooks(messages_for_hooks, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(
messages_for_hooks, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
if self.stream:
return self._handle_streaming_completion(
if self.stream:
return self._handle_streaming_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
formatted_content,
config,
available_functions,
@@ -328,29 +340,20 @@ class GeminiCompletion(BaseLLM):
effective_response_model,
)
return self._handle_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -376,28 +379,38 @@ class GeminiCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
formatted_content, system_instruction = self._format_messages_for_gemini(
messages
)
formatted_content, system_instruction = (
self._format_messages_for_gemini(messages)
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
if self.stream:
return await self._ahandle_streaming_completion(
if self.stream:
return await self._ahandle_streaming_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
formatted_content,
config,
available_functions,
@@ -406,29 +419,20 @@ class GeminiCompletion(BaseLLM):
effective_response_model,
)
return await self._ahandle_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _prepare_generation_config(
self,

View File

@@ -17,7 +17,7 @@ from openai.types.responses import Response
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -382,23 +382,35 @@ class OpenAICompletion(BaseLLM):
Returns:
Completion response or tool call result.
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages(messages)
formatted_messages = self._format_messages(messages)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
if self.api == "responses":
return self._call_responses(
if self.api == "responses":
return self._call_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return self._call_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
@@ -407,22 +419,13 @@ class OpenAICompletion(BaseLLM):
response_model=response_model,
)
return self._call_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _call_completions(
self,
@@ -479,20 +482,30 @@ class OpenAICompletion(BaseLLM):
Returns:
Completion response or tool call result.
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages(messages)
formatted_messages = self._format_messages(messages)
if self.api == "responses":
return await self._acall_responses(
if self.api == "responses":
return await self._acall_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._acall_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
@@ -501,22 +514,13 @@ class OpenAICompletion(BaseLLM):
response_model=response_model,
)
return await self._acall_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def _acall_completions(
self,

View File

@@ -299,14 +299,16 @@ class TestFlow(Flow):
return agent.kickoff("Test query")
def verify_agent_parent_flow(result, agent, flow):
"""Verify that both the result and agent have the correct parent flow."""
assert result.parent_flow is flow
def verify_agent_flow_context(result, agent, flow):
"""Verify that both the result and agent have the correct flow context."""
assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
assert agent is not None
assert agent.parent_flow is flow
assert agent._flow_id == flow.flow_id # type: ignore[attr-defined]
assert agent._request_id == flow.flow_id # type: ignore[attr-defined]
def test_sets_parent_flow_when_inside_flow():
def test_sets_flow_context_when_inside_flow():
"""Test that an Agent can be created and executed inside a Flow context."""
captured_event = None

View File

@@ -0,0 +1,108 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '71'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpUSxS5LeHwDTELElWlC5CDMzmr\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437564,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hi there! How can I assist you today?\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 10,\n \"total_tokens\": 19,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:05 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '460'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '477'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,215 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '71'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpStmyOpe9DrthWBlDdMZfVMJ1u\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hi! How can I assist you today?\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 9,\n \"total_tokens\": 18,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:02 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '415'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '434'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"user","content":"Say bye"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '72'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpS1DP0Xd3tmWt5PBincVrdU7yw\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Goodbye! If you have more questions
in the future, feel free to reach out. Have a great day!\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 23,\n \"total_tokens\": 32,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:03 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '964'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '979'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,143 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini","stream":true,"stream_options":{"include_usage":true}}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '125'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: 'data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"rVIyGQF2E"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"Hi"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"ZGVqV7ZDm"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"vnfm7IxlIB"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
How"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"o8F35ZZ"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
can"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"kiBzGe3"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
I"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"cbGT2RWgx"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
assist"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"DtxR"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
you"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"6y6Co8J"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
today"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"SZOmm"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"s9Bc0HqlPg"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":null,"obfuscation":"u9aar"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[],"usage":{"prompt_tokens":9,"completion_tokens":9,"total_tokens":18,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}},"obfuscation":"5hudm8ySqh39"}
data: [DONE]
'
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- text/event-stream; charset=utf-8
Date:
- Mon, 26 Jan 2026 14:26:04 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '260'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '275'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -18,68 +18,3 @@ def test_huggingface_models():
"""Test that Huggingface models are properly configured."""
assert "huggingface" in MODELS
assert len(MODELS["huggingface"]) > 0
def test_openai_models_include_latest():
"""Test that OpenAI models include the latest models."""
assert "openai" in MODELS
openai_models = MODELS["openai"]
assert len(openai_models) > 0
assert "gpt-4o" in openai_models
assert "gpt-4o-mini" in openai_models
assert "o1" in openai_models
assert "o3" in openai_models
assert "o3-mini" in openai_models
def test_anthropic_models_include_latest():
"""Test that Anthropic models include the latest Claude models."""
assert "anthropic" in MODELS
anthropic_models = MODELS["anthropic"]
assert len(anthropic_models) > 0
assert "claude-3-7-sonnet-20250219" in anthropic_models
assert "claude-3-5-sonnet-20241022" in anthropic_models
assert "claude-3-5-haiku-20241022" in anthropic_models
def test_groq_models_include_latest():
"""Test that Groq models include the latest Llama models."""
assert "groq" in MODELS
groq_models = MODELS["groq"]
assert len(groq_models) > 0
assert "groq/llama-3.3-70b-versatile" in groq_models
def test_ollama_models_include_latest():
"""Test that Ollama models include the latest models."""
assert "ollama" in MODELS
ollama_models = MODELS["ollama"]
assert len(ollama_models) > 0
assert "ollama/llama3.2" in ollama_models
assert "ollama/llama3.3" in ollama_models
def test_all_providers_have_models():
"""Test that all providers in PROVIDERS have corresponding models in MODELS."""
providers_with_models = [
"openai",
"anthropic",
"gemini",
"nvidia_nim",
"groq",
"ollama",
"watson",
"bedrock",
"huggingface",
"sambanova",
]
for provider in providers_with_models:
assert provider in MODELS, f"Provider {provider} should have models defined"
assert len(MODELS[provider]) > 0, f"Provider {provider} should have at least one model"
def test_all_providers_have_env_vars_or_defaults():
"""Test that all providers have environment variable configurations."""
for provider in PROVIDERS:
if provider in ENV_VARS:
assert len(ENV_VARS[provider]) > 0, f"Provider {provider} should have env var config"

View File

@@ -4520,7 +4520,7 @@ def test_crew_copy_with_memory():
pytest.fail(f"Copying crew raised an unexpected exception: {e}")
def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow():
def test_sets_flow_context_when_using_crewbase_pattern_inside_flow():
@CrewBase
class TestCrew:
agents_config = None
@@ -4582,10 +4582,11 @@ def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow():
flow.kickoff()
assert captured_crew is not None
assert captured_crew.parent_flow is flow
assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined]
assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined]
def test_sets_parent_flow_when_outside_flow(researcher, writer):
def test_sets_flow_context_when_outside_flow(researcher, writer):
crew = Crew(
agents=[researcher, writer],
process=Process.sequential,
@@ -4594,11 +4595,12 @@ def test_sets_parent_flow_when_outside_flow(researcher, writer):
Task(description="Task 2", expected_output="output", agent=writer),
],
)
assert crew.parent_flow is None
assert not hasattr(crew, "_flow_id")
assert not hasattr(crew, "_request_id")
@pytest.mark.vcr()
def test_sets_parent_flow_when_inside_flow(researcher, writer):
def test_sets_flow_context_when_inside_flow(researcher, writer):
class MyFlow(Flow):
@start()
def start(self):
@@ -4615,7 +4617,8 @@ def test_sets_parent_flow_when_inside_flow(researcher, writer):
flow = MyFlow()
result = flow.kickoff()
assert result.parent_flow is flow
assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
def test_reset_knowledge_with_no_crew_knowledge(researcher, writer):

View File

@@ -217,6 +217,7 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Hello ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -224,6 +225,7 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="World!",
call_id="test-call-id",
),
)
return mock_output
@@ -284,6 +286,7 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="",
call_id="test-call-id",
tool_call=ToolCall(
id="call-123",
function=FunctionCall(
@@ -364,6 +367,7 @@ class TestCrewKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Async ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -371,6 +375,7 @@ class TestCrewKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Stream!",
call_id="test-call-id",
),
)
return mock_output
@@ -451,6 +456,7 @@ class TestFlowKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Flow ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -458,6 +464,7 @@ class TestFlowKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="output!",
call_id="test-call-id",
),
)
return "done"
@@ -545,6 +552,7 @@ class TestFlowKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Async flow ",
call_id="test-call-id",
),
)
await asyncio.sleep(0.01)
@@ -553,6 +561,7 @@ class TestFlowKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="stream!",
call_id="test-call-id",
),
)
await asyncio.sleep(0.01)
@@ -686,6 +695,7 @@ class TestStreamingEdgeCases:
type="llm_stream_chunk",
chunk="Task 1",
task_name="First task",
call_id="test-call-id",
),
)
return mock_output

View File

@@ -984,8 +984,8 @@ def test_streaming_fallback_to_non_streaming():
def mock_call(messages, tools=None, callbacks=None, available_functions=None):
nonlocal fallback_called
# Emit a couple of chunks to simulate partial streaming
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id = "Id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id = "Id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id="Id", call_id="test-call-id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id="Id", call_id="test-call-id"))
# Mark that fallback would be called
fallback_called = True
@@ -1041,7 +1041,7 @@ def test_streaming_empty_response_handling():
def mock_call(messages, tools=None, callbacks=None, available_functions=None):
# Emit a few empty chunks
for _ in range(3):
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="",response_id="id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="", response_id="id", call_id="test-call-id"))
# Return the default message for empty responses
return "I apologize, but I couldn't generate a proper response. Please try again or rephrase your request."
@@ -1280,6 +1280,105 @@ def test_llm_emits_event_with_lite_agent():
assert set(all_agent_id) == {str(agent.id)}
# ----------- CALL_ID CORRELATION TESTS -----------
@pytest.mark.vcr()
def test_llm_call_events_share_call_id():
"""All events from a single LLM call should share the same call_id."""
import uuid
events = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_complete(source, event):
with condition:
events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Say hi")
with condition:
success = condition.wait_for(lambda: len(events) >= 2, timeout=10)
assert success, "Timeout waiting for LLM events"
# Behavior: all events from the call share the same call_id
assert len(events) == 2
assert events[0].call_id == events[1].call_id
# call_id should be a valid UUID
uuid.UUID(events[0].call_id)
@pytest.mark.vcr()
def test_streaming_chunks_share_call_id_with_call():
"""Streaming chunks should share call_id with started/completed events."""
events = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMStreamChunkEvent)
def on_chunk(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_complete(source, event):
with condition:
events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini", stream=True)
llm.call("Say hi")
with condition:
# Wait for at least started, some chunks, and completed
success = condition.wait_for(lambda: len(events) >= 3, timeout=10)
assert success, "Timeout waiting for streaming events"
# Behavior: all events (started, chunks, completed) share the same call_id
call_ids = {e.call_id for e in events}
assert len(call_ids) == 1
@pytest.mark.vcr()
def test_separate_llm_calls_have_different_call_ids():
"""Different LLM calls should have different call_ids."""
call_ids = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
call_ids.append(event.call_id)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Say hi")
llm.call("Say bye")
with condition:
success = condition.wait_for(lambda: len(call_ids) >= 2, timeout=10)
assert success, "Timeout waiting for LLM call events"
# Behavior: each call has its own call_id
assert len(call_ids) == 2
assert call_ids[0] != call_ids[1]
# ----------- HUMAN FEEDBACK EVENTS -----------