Compare commits

...

12 Commits

Author SHA1 Message Date
dependabot[bot]
d3f424fd8f chore(deps-dev): bump types-aiofiles
Some checks failed
Build uv cache / build-cache (3.10) (push) Waiting to run
Build uv cache / build-cache (3.11) (push) Waiting to run
Build uv cache / build-cache (3.12) (push) Waiting to run
Build uv cache / build-cache (3.13) (push) Waiting to run
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Bumps [types-aiofiles](https://github.com/typeshed-internal/stub_uploader) from 24.1.0.20250822 to 25.1.0.20251011.
- [Commits](https://github.com/typeshed-internal/stub_uploader/commits)

---
updated-dependencies:
- dependency-name: types-aiofiles
  dependency-version: 25.1.0.20251011
  dependency-type: direct:development
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-02-03 12:02:28 -05:00
Matt Aitchison
fee9445067 fix: add .python-version to fix Dependabot uv updates (#4352)
Dependabot's uv updater defaults to Python 3.14.2, which is incompatible
with the project's requires-python constraint (>=3.10, <3.14). Adding
.python-version pins the Python version to 3.13 for dependency updates.

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-02-03 10:55:01 -06:00
Greyson LaLonde
a3c01265ee feat: add version check & integrate update notices 2026-02-03 10:17:50 -05:00
Matt Aitchison
aa7e7785bc chore: group dependabot security updates into single PR (#4351)
Configure dependabot to batch security updates together while keeping
regular version updates as separate PRs.
2026-02-03 08:53:28 -06:00
Thiago Moretto
e30645e855 limit stagehand dep version to 0.5.9 due breaking changes (#4339)
* limit to 0.5.9 due breaking changes + add env vars requirements

* fix tool spec extract that was ignoring with default

* original tool spec

* update spec
2026-02-03 09:43:24 -05:00
Greyson LaLonde
c1d2801be2 fix: reject reserved script names for crew folders 2026-02-03 09:16:55 -05:00
Greyson LaLonde
6a8483fcb6 fix: resolve race condition in guardrail event emission test 2026-02-03 09:06:48 -05:00
Greyson LaLonde
5fb602dff2 fix: replace timing-based concurrency test with state tracking 2026-02-03 08:58:51 -05:00
Greyson LaLonde
b90cff580a fix: relax openai and litellm dependency constraints 2026-02-03 08:51:55 -05:00
Vini Brasil
576b74b2ef Add call_id to LLM events for correlating requests (#4281)
When monitoring LLM events, consumers need to know which events belong
to the same API call. Before this change, there was no way to correlate
LLMCallStartedEvent, LLMStreamChunkEvent, and LLMCallCompletedEvent
belonging to the same request.
2026-02-03 10:10:33 -03:00
Greyson LaLonde
7590d4c6e3 fix: enforce additionalProperties=false in schemas
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
* fix: enforce additionalProperties=false in schemas

* fix: ensure nested items have required properties
2026-02-02 22:19:04 -05:00
Sampson
8c6436234b adds additional search params (#4321)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Introduces support for additional Brave Search API web-search parameters.
2026-02-02 11:17:02 -08:00
35 changed files with 19008 additions and 1269 deletions

View File

@@ -5,7 +5,12 @@
version: 2
updates:
- package-ecosystem: uv # See documentation for possible values
directory: "/" # Location of package manifests
- package-ecosystem: uv
directory: "/"
schedule:
interval: "weekly"
groups:
security-updates:
applies-to: security-updates
patterns:
- "*"

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.13

View File

@@ -8,8 +8,9 @@ from typing import Any
from crewai.tools.base_tool import BaseTool, EnvVar
from pydantic import BaseModel
from pydantic.fields import FieldInfo
from pydantic.json_schema import GenerateJsonSchema
from pydantic_core import PydanticOmit
from pydantic_core import PydanticOmit, PydanticUndefined
from crewai_tools import tools
@@ -44,6 +45,9 @@ class ToolSpecExtractor:
schema = self._unwrap_schema(core_schema)
fields = schema.get("schema", {}).get("fields", {})
# Use model_fields to get defaults (handles both default and default_factory)
model_fields = tool_class.model_fields
tool_info = {
"name": tool_class.__name__,
"humanized_name": self._extract_field_default(
@@ -54,9 +58,9 @@ class ToolSpecExtractor:
).strip(),
"run_params_schema": self._extract_params(fields.get("args_schema")),
"init_params_schema": self._extract_init_params(tool_class),
"env_vars": self._extract_env_vars(fields.get("env_vars")),
"package_dependencies": self._extract_field_default(
fields.get("package_dependencies"), fallback=[]
"env_vars": self._extract_env_vars_from_model_fields(model_fields),
"package_dependencies": self._extract_package_deps_from_model_fields(
model_fields
),
}
@@ -103,10 +107,27 @@ class ToolSpecExtractor:
return {}
@staticmethod
def _extract_env_vars(
env_vars_field: dict[str, Any] | None,
def _get_field_default(field: FieldInfo | None) -> Any:
"""Get default value from a FieldInfo, handling both default and default_factory."""
if not field:
return None
default_value = field.default
if default_value is PydanticUndefined or default_value is None:
if field.default_factory:
return field.default_factory()
return None
return default_value
@staticmethod
def _extract_env_vars_from_model_fields(
model_fields: dict[str, FieldInfo],
) -> list[dict[str, Any]]:
if not env_vars_field:
default_value = ToolSpecExtractor._get_field_default(
model_fields.get("env_vars")
)
if not default_value:
return []
return [
@@ -116,10 +137,22 @@ class ToolSpecExtractor:
"required": env_var.required,
"default": env_var.default,
}
for env_var in env_vars_field.get("schema", {}).get("default", [])
for env_var in default_value
if isinstance(env_var, EnvVar)
]
@staticmethod
def _extract_package_deps_from_model_fields(
model_fields: dict[str, FieldInfo],
) -> list[str]:
default_value = ToolSpecExtractor._get_field_default(
model_fields.get("package_dependencies")
)
if not isinstance(default_value, list):
return []
return default_value
@staticmethod
def _extract_init_params(tool_class: type[BaseTool]) -> dict[str, Any]:
ignored_init_params = [
@@ -152,7 +185,7 @@ class ToolSpecExtractor:
if __name__ == "__main__":
output_file = Path(__file__).parent / "tool.specs.json"
output_file = Path(__file__).parent.parent.parent / "tool.specs.json"
extractor = ToolSpecExtractor()
extractor.extract_all_tools()

View File

@@ -1,12 +1,17 @@
from datetime import datetime
import json
import os
import time
from typing import Any, ClassVar
from typing import Annotated, Any, ClassVar, Literal
from crewai.tools import BaseTool, EnvVar
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from pydantic.types import StringConstraints
import requests
load_dotenv()
def _save_results_to_file(content: str) -> None:
"""Saves the search results to a file."""
@@ -15,37 +20,72 @@ def _save_results_to_file(content: str) -> None:
file.write(content)
class BraveSearchToolSchema(BaseModel):
"""Input for BraveSearchTool."""
FreshnessPreset = Literal["pd", "pw", "pm", "py"]
FreshnessRange = Annotated[
str, StringConstraints(pattern=r"^\d{4}-\d{2}-\d{2}to\d{4}-\d{2}-\d{2}$")
]
Freshness = FreshnessPreset | FreshnessRange
SafeSearch = Literal["off", "moderate", "strict"]
search_query: str = Field(
..., description="Mandatory search query you want to use to search the internet"
class BraveSearchToolSchema(BaseModel):
"""Input for BraveSearchTool"""
query: str = Field(..., description="Search query to perform")
country: str | None = Field(
default=None,
description="Country code for geo-targeting (e.g., 'US', 'BR').",
)
search_language: str | None = Field(
default=None,
description="Language code for the search results (e.g., 'en', 'es').",
)
count: int | None = Field(
default=None,
description="The maximum number of results to return. Actual number may be less.",
)
offset: int | None = Field(
default=None, description="Skip the first N result sets/pages. Max is 9."
)
safesearch: SafeSearch | None = Field(
default=None,
description="Filter out explicit content. Options: off/moderate/strict",
)
spellcheck: bool | None = Field(
default=None,
description="Attempt to correct spelling errors in the search query.",
)
freshness: Freshness | None = Field(
default=None,
description="Enforce freshness of results. Options: pd/pw/pm/py, or YYYY-MM-DDtoYYYY-MM-DD",
)
text_decorations: bool | None = Field(
default=None,
description="Include markup to highlight search terms in the results.",
)
extra_snippets: bool | None = Field(
default=None,
description="Include up to 5 text snippets for each page if possible.",
)
operators: bool | None = Field(
default=None,
description="Whether to apply search operators (e.g., site:example.com).",
)
# TODO: Extend support to additional endpoints (e.g., /images, /news, etc.)
class BraveSearchTool(BaseTool):
"""BraveSearchTool - A tool for performing web searches using the Brave Search API.
"""A tool that performs web searches using the Brave Search API."""
This module provides functionality to search the internet using Brave's Search API,
supporting customizable result counts and country-specific searches.
Dependencies:
- requests
- pydantic
- python-dotenv (for API key management)
"""
name: str = "Brave Web Search the internet"
name: str = "Brave Search"
description: str = (
"A tool that can be used to search the internet with a search_query."
"A tool that performs web searches using the Brave Search API. "
"Results are returned as structured JSON data."
)
args_schema: type[BaseModel] = BraveSearchToolSchema
search_url: str = "https://api.search.brave.com/res/v1/web/search"
country: str | None = ""
n_results: int = 10
save_file: bool = False
_last_request_time: ClassVar[float] = 0
_min_request_interval: ClassVar[float] = 1.0 # seconds
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
@@ -55,6 +95,9 @@ class BraveSearchTool(BaseTool):
),
]
)
# Rate limiting parameters
_last_request_time: ClassVar[float] = 0
_min_request_interval: ClassVar[float] = 1.0 # seconds
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -73,19 +116,64 @@ class BraveSearchTool(BaseTool):
self._min_request_interval - (current_time - self._last_request_time)
)
BraveSearchTool._last_request_time = time.time()
# Construct and send the request
try:
search_query = kwargs.get("search_query") or kwargs.get("query")
if not search_query:
raise ValueError("Search query is required")
# Maintain both "search_query" and "query" for backwards compatibility
query = kwargs.get("search_query") or kwargs.get("query")
if not query:
raise ValueError("Query is required")
payload = {"q": query}
if country := kwargs.get("country"):
payload["country"] = country
if search_language := kwargs.get("search_language"):
payload["search_language"] = search_language
# Fallback to deprecated n_results parameter if no count is provided
count = kwargs.get("count")
if count is not None:
payload["count"] = count
else:
payload["count"] = self.n_results
# Offset may be 0, so avoid truthiness check
offset = kwargs.get("offset")
if offset is not None:
payload["offset"] = offset
if safesearch := kwargs.get("safesearch"):
payload["safesearch"] = safesearch
save_file = kwargs.get("save_file", self.save_file)
n_results = kwargs.get("n_results", self.n_results)
if freshness := kwargs.get("freshness"):
payload["freshness"] = freshness
payload = {"q": search_query, "count": n_results}
# Boolean parameters
spellcheck = kwargs.get("spellcheck")
if spellcheck is not None:
payload["spellcheck"] = spellcheck
if self.country != "":
payload["country"] = self.country
text_decorations = kwargs.get("text_decorations")
if text_decorations is not None:
payload["text_decorations"] = text_decorations
extra_snippets = kwargs.get("extra_snippets")
if extra_snippets is not None:
payload["extra_snippets"] = extra_snippets
operators = kwargs.get("operators")
if operators is not None:
payload["operators"] = operators
# Limit the result types to "web" since there is presently no
# handling of other types like "discussions", "faq", "infobox",
# "news", "videos", or "locations".
payload["result_filter"] = "web"
# Setup Request Headers
headers = {
"X-Subscription-Token": os.environ["BRAVE_API_KEY"],
"Accept": "application/json",
@@ -97,25 +185,32 @@ class BraveSearchTool(BaseTool):
response.raise_for_status() # Handle non-200 responses
results = response.json()
# TODO: Handle other result types like "discussions", "faq", etc.
web_results_items = []
if "web" in results:
results = results["web"]["results"]
string = []
for result in results:
try:
string.append(
"\n".join(
[
f"Title: {result['title']}",
f"Link: {result['url']}",
f"Snippet: {result['description']}",
"---",
]
)
)
except KeyError: # noqa: PERF203
continue
web_results = results["web"]["results"]
content = "\n".join(string)
for result in web_results:
url = result.get("url")
title = result.get("title")
# If, for whatever reason, this entry does not have a title
# or url, skip it.
if not url or not title:
continue
item = {
"url": url,
"title": title,
}
description = result.get("description")
if description:
item["description"] = description
snippets = result.get("extra_snippets")
if snippets:
item["snippets"] = snippets
web_results_items.append(item)
content = json.dumps(web_results_items)
except requests.RequestException as e:
return f"Error performing search: {e!s}"
except KeyError as e:

View File

@@ -4,7 +4,7 @@ import os
import re
from typing import Any
from crewai.tools import BaseTool
from crewai.tools import BaseTool, EnvVar
from pydantic import BaseModel, Field
@@ -137,7 +137,21 @@ class StagehandTool(BaseTool):
- 'observe': For finding elements in a specific area
"""
args_schema: type[BaseModel] = StagehandToolSchema
package_dependencies: list[str] = Field(default_factory=lambda: ["stagehand"])
package_dependencies: list[str] = Field(default_factory=lambda: ["stagehand<=0.5.9"])
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
name="BROWSERBASE_API_KEY",
description="API key for Browserbase services",
required=False,
),
EnvVar(
name="BROWSERBASE_PROJECT_ID",
description="Project ID for Browserbase services",
required=False,
),
]
)
# Stagehand configuration
api_key: str | None = None

View File

@@ -23,23 +23,26 @@ class MockTool(BaseTool):
)
my_parameter: str = Field("This is default value", description="What a description")
my_parameter_bool: bool = Field(False)
# Use default_factory like real tools do (not direct default)
package_dependencies: list[str] = Field(
["this-is-a-required-package", "another-required-package"], description=""
default_factory=lambda: ["this-is-a-required-package", "another-required-package"]
)
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
name="SERPER_API_KEY",
description="API key for Serper",
required=True,
default=None,
),
EnvVar(
name="API_RATE_LIMIT",
description="API rate limit",
required=False,
default="100",
),
]
)
env_vars: list[EnvVar] = [
EnvVar(
name="SERPER_API_KEY",
description="API key for Serper",
required=True,
default=None,
),
EnvVar(
name="API_RATE_LIMIT",
description="API rate limit",
required=False,
default="100",
),
]
@pytest.fixture

View File

@@ -1,8 +1,10 @@
import json
from unittest.mock import patch
from crewai_tools.tools.brave_search_tool.brave_search_tool import BraveSearchTool
import pytest
from crewai_tools.tools.brave_search_tool.brave_search_tool import BraveSearchTool
@pytest.fixture
def brave_tool():
@@ -30,16 +32,43 @@ def test_brave_tool_search(mock_get, brave_tool):
}
mock_get.return_value.json.return_value = mock_response
result = brave_tool.run(search_query="test")
result = brave_tool.run(query="test")
assert "Test Title" in result
assert "http://test.com" in result
def test_brave_tool():
tool = BraveSearchTool(
n_results=2,
)
tool.run(search_query="ChatGPT")
@patch("requests.get")
def test_brave_tool(mock_get):
mock_response = {
"web": {
"results": [
{
"title": "Brave Browser",
"url": "https://brave.com",
"description": "Brave Browser description",
}
]
}
}
mock_get.return_value.json.return_value = mock_response
tool = BraveSearchTool(n_results=2)
result = tool.run(query="Brave Browser")
assert result is not None
# Parse JSON so we can examine the structure
data = json.loads(result)
assert isinstance(data, list)
assert len(data) >= 1
# First item should have expected fields: title, url, and description
first = data[0]
assert "title" in first
assert first["title"] == "Brave Browser"
assert "url" in first
assert first["url"] == "https://brave.com"
assert "description" in first
assert first["description"] == "Brave Browser description"
if __name__ == "__main__":

File diff suppressed because it is too large Load Diff

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
# Core Dependencies
"pydantic~=2.11.9",
"openai~=1.83.0",
"openai>=1.83.0,<3",
"instructor>=1.3.3",
# Text Processing
"pdfplumber~=0.11.4",
@@ -78,7 +78,7 @@ voyageai = [
"voyageai~=0.3.5",
]
litellm = [
"litellm~=1.74.9",
"litellm>=1.74.9,<3",
]
bedrock = [
"boto3~=1.40.45",

View File

@@ -16,6 +16,7 @@ from crewai.agents.agent_adapters.openai_agents.protocols import (
)
from crewai.tools import BaseTool
from crewai.utilities.import_utils import require
from crewai.utilities.pydantic_schema_utils import force_additional_properties_false
from crewai.utilities.string_utils import sanitize_tool_name
@@ -135,7 +136,9 @@ class OpenAIAgentToolAdapter(BaseToolAdapter):
for tool in tools:
schema: dict[str, Any] = tool.args_schema.model_json_schema()
schema.update({"additionalProperties": False, "type": "object"})
schema = force_additional_properties_false(schema)
schema.update({"type": "object"})
openai_tool: OpenAIFunctionTool = cast(
OpenAIFunctionTool,

View File

@@ -1,10 +1,13 @@
from typing import Any
DEFAULT_CREWAI_ENTERPRISE_URL = "https://app.crewai.com"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_PROVIDER = "workos"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE = "client_01JNJQWBJ4SPFN3SWJM5T7BDG8"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID = "client_01JYT06R59SP0NXYGD994NFXXX"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN = "login.crewai.com"
ENV_VARS = {
ENV_VARS: dict[str, list[dict[str, Any]]] = {
"openai": [
{
"prompt": "Enter your OPENAI API key (press Enter to skip)",
@@ -112,7 +115,7 @@ ENV_VARS = {
}
PROVIDERS = [
PROVIDERS: list[str] = [
"openai",
"anthropic",
"gemini",
@@ -127,7 +130,7 @@ PROVIDERS = [
"sambanova",
]
MODELS = {
MODELS: dict[str, list[str]] = {
"openai": [
"gpt-4",
"gpt-4.1",

View File

@@ -3,6 +3,7 @@ import shutil
import sys
import click
import tomli
from crewai.cli.constants import ENV_VARS, MODELS
from crewai.cli.provider import (
@@ -13,7 +14,31 @@ from crewai.cli.provider import (
from crewai.cli.utils import copy_template, load_env_vars, write_env_file
def create_folder_structure(name, parent_folder=None):
def get_reserved_script_names() -> set[str]:
"""Get reserved script names from pyproject.toml template.
Returns:
Set of reserved script names that would conflict with crew folder names.
"""
package_dir = Path(__file__).parent
template_path = package_dir / "templates" / "crew" / "pyproject.toml"
with open(template_path, "r") as f:
template_content = f.read()
template_content = template_content.replace("{{folder_name}}", "_placeholder_")
template_content = template_content.replace("{{name}}", "placeholder")
template_content = template_content.replace("{{crew_name}}", "Placeholder")
template_data = tomli.loads(template_content)
script_names = set(template_data.get("project", {}).get("scripts", {}).keys())
script_names.discard("_placeholder_")
return script_names
def create_folder_structure(
name: str, parent_folder: str | None = None
) -> tuple[Path, str, str]:
import keyword
import re
@@ -51,6 +76,14 @@ def create_folder_structure(name, parent_folder=None):
f"Project name '{name}' would generate invalid Python module name '{folder_name}'"
)
reserved_names = get_reserved_script_names()
if folder_name in reserved_names:
raise ValueError(
f"Project name '{name}' would generate folder name '{folder_name}' which is reserved. "
f"Reserved names are: {', '.join(sorted(reserved_names))}. "
"Please choose a different name."
)
class_name = name.replace("_", " ").replace("-", " ").title().replace(" ", "")
class_name = re.sub(r"[^a-zA-Z0-9_]", "", class_name)
@@ -114,7 +147,9 @@ def create_folder_structure(name, parent_folder=None):
return folder_path, folder_name, class_name
def copy_template_files(folder_path, name, class_name, parent_folder):
def copy_template_files(
folder_path: Path, name: str, class_name: str, parent_folder: str | None
) -> None:
package_dir = Path(__file__).parent
templates_dir = package_dir / "templates" / "crew"
@@ -155,7 +190,12 @@ def copy_template_files(folder_path, name, class_name, parent_folder):
copy_template(src_file, dst_file, name, class_name, folder_path.name)
def create_crew(name, provider=None, skip_provider=False, parent_folder=None):
def create_crew(
name: str,
provider: str | None = None,
skip_provider: bool = False,
parent_folder: str | None = None,
) -> None:
folder_path, folder_name, class_name = create_folder_structure(name, parent_folder)
env_vars = load_env_vars(folder_path)
if not skip_provider:
@@ -189,7 +229,9 @@ def create_crew(name, provider=None, skip_provider=False, parent_folder=None):
if selected_provider is None: # User typed 'q'
click.secho("Exiting...", fg="yellow")
sys.exit(0)
if selected_provider: # Valid selection
if selected_provider and isinstance(
selected_provider, str
): # Valid selection
break
click.secho(
"No provider selected. Please try again or press 'q' to exit.", fg="red"

View File

@@ -1,8 +1,10 @@
from collections import defaultdict
from collections.abc import Sequence
import json
import os
from pathlib import Path
import time
from typing import Any
import certifi
import click
@@ -11,16 +13,15 @@ import requests
from crewai.cli.constants import JSON_URL, MODELS, PROVIDERS
def select_choice(prompt_message, choices):
"""
Presents a list of choices to the user and prompts them to select one.
def select_choice(prompt_message: str, choices: Sequence[str]) -> str | None:
"""Presents a list of choices to the user and prompts them to select one.
Args:
- prompt_message (str): The message to display to the user before presenting the choices.
- choices (list): A list of options to present to the user.
prompt_message: The message to display to the user before presenting the choices.
choices: A list of options to present to the user.
Returns:
- str: The selected choice from the list, or None if the user chooses to quit.
The selected choice from the list, or None if the user chooses to quit.
"""
provider_models = get_provider_data()
@@ -52,16 +53,14 @@ def select_choice(prompt_message, choices):
)
def select_provider(provider_models):
"""
Presents a list of providers to the user and prompts them to select one.
def select_provider(provider_models: dict[str, list[str]]) -> str | None | bool:
"""Presents a list of providers to the user and prompts them to select one.
Args:
- provider_models (dict): A dictionary of provider models.
provider_models: A dictionary of provider models.
Returns:
- str: The selected provider
- None: If user explicitly quits
The selected provider, None if user explicitly quits, or False if no selection.
"""
predefined_providers = [p.lower() for p in PROVIDERS]
all_providers = sorted(set(predefined_providers + list(provider_models.keys())))
@@ -80,16 +79,15 @@ def select_provider(provider_models):
return provider.lower() if provider else False
def select_model(provider, provider_models):
"""
Presents a list of models for a given provider to the user and prompts them to select one.
def select_model(provider: str, provider_models: dict[str, list[str]]) -> str | None:
"""Presents a list of models for a given provider to the user and prompts them to select one.
Args:
- provider (str): The provider for which to select a model.
- provider_models (dict): A dictionary of provider models.
provider: The provider for which to select a model.
provider_models: A dictionary of provider models.
Returns:
- str: The selected model, or None if the operation is aborted or an invalid selection is made.
The selected model, or None if the operation is aborted or an invalid selection is made.
"""
predefined_providers = [p.lower() for p in PROVIDERS]
@@ -107,16 +105,17 @@ def select_model(provider, provider_models):
)
def load_provider_data(cache_file, cache_expiry):
"""
Loads provider data from a cache file if it exists and is not expired. If the cache is expired or corrupted, it fetches the data from the web.
def load_provider_data(cache_file: Path, cache_expiry: int) -> dict[str, Any] | None:
"""Loads provider data from a cache file if it exists and is not expired.
If the cache is expired or corrupted, it fetches the data from the web.
Args:
- cache_file (Path): The path to the cache file.
- cache_expiry (int): The cache expiry time in seconds.
cache_file: The path to the cache file.
cache_expiry: The cache expiry time in seconds.
Returns:
- dict or None: The loaded provider data or None if the operation fails.
The loaded provider data or None if the operation fails.
"""
current_time = time.time()
if (
@@ -137,32 +136,31 @@ def load_provider_data(cache_file, cache_expiry):
return fetch_provider_data(cache_file)
def read_cache_file(cache_file):
"""
Reads and returns the JSON content from a cache file. Returns None if the file contains invalid JSON.
def read_cache_file(cache_file: Path) -> dict[str, Any] | None:
"""Reads and returns the JSON content from a cache file.
Args:
- cache_file (Path): The path to the cache file.
cache_file: The path to the cache file.
Returns:
- dict or None: The JSON content of the cache file or None if the JSON is invalid.
The JSON content of the cache file or None if the JSON is invalid.
"""
try:
with open(cache_file, "r") as f:
return json.load(f)
data: dict[str, Any] = json.load(f)
return data
except json.JSONDecodeError:
return None
def fetch_provider_data(cache_file):
"""
Fetches provider data from a specified URL and caches it to a file.
def fetch_provider_data(cache_file: Path) -> dict[str, Any] | None:
"""Fetches provider data from a specified URL and caches it to a file.
Args:
- cache_file (Path): The path to the cache file.
cache_file: The path to the cache file.
Returns:
- dict or None: The fetched provider data or None if the operation fails.
The fetched provider data or None if the operation fails.
"""
ssl_config = os.environ["SSL_CERT_FILE"] = certifi.where()
@@ -180,36 +178,39 @@ def fetch_provider_data(cache_file):
return None
def download_data(response):
"""
Downloads data from a given HTTP response and returns the JSON content.
def download_data(response: requests.Response) -> dict[str, Any]:
"""Downloads data from a given HTTP response and returns the JSON content.
Args:
- response (requests.Response): The HTTP response object.
response: The HTTP response object.
Returns:
- dict: The JSON content of the response.
The JSON content of the response.
"""
total_size = int(response.headers.get("content-length", 0))
block_size = 8192
data_chunks = []
data_chunks: list[bytes] = []
bar: Any
with click.progressbar(
length=total_size, label="Downloading", show_pos=True
) as progress_bar:
) as bar:
for chunk in response.iter_content(block_size):
if chunk:
data_chunks.append(chunk)
progress_bar.update(len(chunk))
bar.update(len(chunk))
data_content = b"".join(data_chunks)
return json.loads(data_content.decode("utf-8"))
result: dict[str, Any] = json.loads(data_content.decode("utf-8"))
return result
def get_provider_data():
"""
Retrieves provider data from a cache file, filters out models based on provider criteria, and returns a dictionary of providers mapped to their models.
def get_provider_data() -> dict[str, list[str]] | None:
"""Retrieves provider data from a cache file.
Filters out models based on provider criteria, and returns a dictionary of providers
mapped to their models.
Returns:
- dict or None: A dictionary of providers mapped to their models or None if the operation fails.
A dictionary of providers mapped to their models or None if the operation fails.
"""
cache_dir = Path.home() / ".crewai"
cache_dir.mkdir(exist_ok=True)

View File

@@ -1,6 +1,107 @@
"""Version utilities for CrewAI CLI."""
from collections.abc import Mapping
from datetime import datetime, timedelta
from functools import lru_cache
import importlib.metadata
import json
from pathlib import Path
from typing import Any, cast
from urllib import request
from urllib.error import URLError
import appdirs
from packaging.version import InvalidVersion, parse
@lru_cache(maxsize=1)
def _get_cache_file() -> Path:
"""Get the path to the version cache file.
Cached to avoid repeated filesystem operations.
"""
cache_dir = Path(appdirs.user_cache_dir("crewai"))
cache_dir.mkdir(parents=True, exist_ok=True)
return cache_dir / "version_cache.json"
def get_crewai_version() -> str:
"""Get the version number of CrewAI running the CLI"""
"""Get the version number of CrewAI running the CLI."""
return importlib.metadata.version("crewai")
def _is_cache_valid(cache_data: Mapping[str, Any]) -> bool:
"""Check if the cache is still valid, less than 24 hours old."""
if "timestamp" not in cache_data:
return False
try:
cache_time = datetime.fromisoformat(str(cache_data["timestamp"]))
return datetime.now() - cache_time < timedelta(hours=24)
except (ValueError, TypeError):
return False
def get_latest_version_from_pypi(timeout: int = 2) -> str | None:
"""Get the latest version of CrewAI from PyPI.
Args:
timeout: Request timeout in seconds.
Returns:
Latest version string or None if unable to fetch.
"""
cache_file = _get_cache_file()
if cache_file.exists():
try:
cache_data = json.loads(cache_file.read_text())
if _is_cache_valid(cache_data):
return cast(str | None, cache_data.get("version"))
except (json.JSONDecodeError, OSError):
pass
try:
with request.urlopen(
"https://pypi.org/pypi/crewai/json", timeout=timeout
) as response:
data = json.loads(response.read())
latest_version = cast(str, data["info"]["version"])
cache_data = {
"version": latest_version,
"timestamp": datetime.now().isoformat(),
}
cache_file.write_text(json.dumps(cache_data))
return latest_version
except (URLError, json.JSONDecodeError, KeyError, OSError):
return None
def check_version() -> tuple[str, str | None]:
"""Check current and latest versions.
Returns:
Tuple of (current_version, latest_version).
latest_version is None if unable to fetch from PyPI.
"""
current = get_crewai_version()
latest = get_latest_version_from_pypi()
return current, latest
def is_newer_version_available() -> tuple[bool, str, str | None]:
"""Check if a newer version is available.
Returns:
Tuple of (is_newer, current_version, latest_version).
"""
current, latest = check_version()
if latest is None:
return False, current, None
try:
return parse(latest) > parse(current), current, latest
except (InvalidVersion, TypeError):
return False, current, latest

View File

@@ -10,6 +10,7 @@ class LLMEventBase(BaseEvent):
from_task: Any | None = None
from_agent: Any | None = None
model: str | None = None
call_id: str
def __init__(self, **data: Any) -> None:
if data.get("from_task"):

View File

@@ -1,11 +1,14 @@
import os
import threading
from typing import Any, ClassVar
from typing import Any, ClassVar, cast
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.text import Text
from crewai.cli.version import is_newer_version_available
class ConsoleFormatter:
tool_usage_counts: ClassVar[dict[str, int]] = {}
@@ -35,6 +38,39 @@ class ConsoleFormatter:
padding=(1, 2),
)
def _show_version_update_message_if_needed(self) -> None:
"""Show version update message if a newer version is available.
Only displays when verbose mode is enabled and not running in CI/CD.
"""
if not self.verbose:
return
if os.getenv("CI", "").lower() in ("true", "1"):
return
try:
is_newer, current, latest = is_newer_version_available()
if is_newer and latest:
message = f"""A new version of CrewAI is available!
Current version: {current}
Latest version: {latest}
To update, run: uv sync --upgrade-package crewai"""
panel = Panel(
message,
title="✨ Update Available ✨",
border_style="yellow",
padding=(1, 2),
)
self.console.print(panel)
self.console.print()
except Exception: # noqa: S110
# Silently ignore errors in version check - it's non-critical
pass
def _show_tracing_disabled_message_if_needed(self) -> None:
"""Show tracing disabled message if tracing is not enabled."""
from crewai.events.listeners.tracing.utils import (
@@ -176,9 +212,10 @@ To enable tracing, do any one of these:
if not self.verbose:
return
# Reset the crew completion event for this new crew execution
ConsoleFormatter.crew_completion_printed.clear()
self._show_version_update_message_if_needed()
content = self.create_status_content(
"Crew Execution Started",
crew_name,
@@ -237,6 +274,8 @@ To enable tracing, do any one of these:
def handle_flow_started(self, flow_name: str, flow_id: str) -> None:
"""Show flow started panel."""
self._show_version_update_message_if_needed()
content = Text()
content.append("Flow Started\n", style="blue bold")
content.append("Name: ", style="white")
@@ -885,7 +924,7 @@ To enable tracing, do any one of these:
is_a2a_delegation = False
try:
output_data = json.loads(formatted_answer.output)
output_data = json.loads(cast(str, formatted_answer.output))
if isinstance(output_data, dict):
if output_data.get("is_a2a") is True:
is_a2a_delegation = True

View File

@@ -37,7 +37,7 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, get_current_call_id, llm_call_context
from crewai.llms.constants import (
ANTHROPIC_MODELS,
AZURE_MODELS,
@@ -770,7 +770,7 @@ class LLM(BaseLLM):
chunk_content = None
response_id = None
if hasattr(chunk,'id'):
if hasattr(chunk, "id"):
response_id = chunk.id
# Safely extract content from various chunk formats
@@ -827,7 +827,7 @@ class LLM(BaseLLM):
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_id=response_id
response_id=response_id,
)
if result is not None:
@@ -849,7 +849,8 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
call_type=LLMCallType.LLM_CALL,
response_id=response_id
response_id=response_id,
call_id=get_current_call_id(),
),
)
# --- 4) Fallback to non-streaming if no content received
@@ -1015,7 +1016,10 @@ class LLM(BaseLLM):
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
)
raise Exception(f"Failed to get streaming response: {e!s}") from e
@@ -1048,7 +1052,8 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
call_type=LLMCallType.TOOL_CALL,
response_id=response_id
response_id=response_id,
call_id=get_current_call_id(),
),
)
@@ -1476,7 +1481,8 @@ class LLM(BaseLLM):
chunk=chunk_content,
from_task=from_task,
from_agent=from_agent,
response_id=response_id
response_id=response_id,
call_id=get_current_call_id(),
),
)
@@ -1619,7 +1625,12 @@ class LLM(BaseLLM):
logging.error(f"Error executing function '{function_name}': {e}")
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=f"Tool execution error: {e!s}"),
event=LLMCallFailedEvent(
error=f"Tool execution error: {e!s}",
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
)
crewai_event_bus.emit(
self,
@@ -1669,108 +1680,117 @@ class LLM(BaseLLM):
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
with llm_call_context() as call_id:
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=call_id,
),
)
# --- 2) Validate parameters before proceeding with the call
self._validate_call_params()
# --- 2) Validate parameters before proceeding with the call
self._validate_call_params()
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
if not self._invoke_before_llm_call_hooks(messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
else:
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
logging.info("Retrying LLM call without the unsupported 'stop'")
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
logging.info("Retrying LLM call without the unsupported 'stop'")
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
raise
async def acall(
self,
@@ -1808,43 +1828,54 @@ class LLM(BaseLLM):
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
with llm_call_context() as call_id:
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=call_id,
),
)
self._validate_call_params()
self._validate_call_params()
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# Process file attachments asynchronously before preparing params
messages = await self._aprocess_message_files(messages)
# Process file attachments asynchronously before preparing params
messages = await self._aprocess_message_files(messages)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(
messages, tools, skip_file_processing=True
)
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(
messages, tools, skip_file_processing=True
)
if self.stream:
return await self._ahandle_streaming_response(
if self.stream:
return await self._ahandle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._ahandle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
@@ -1852,52 +1883,50 @@ class LLM(BaseLLM):
from_agent=from_agent,
response_model=response_model,
)
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
return await self._ahandle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
logging.info("Retrying LLM call without the unsupported 'stop'")
return await self.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
logging.info("Retrying LLM call without the unsupported 'stop'")
return await self.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
raise
def _handle_emit_call_events(
self,
@@ -1925,6 +1954,7 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)

View File

@@ -7,11 +7,15 @@ in CrewAI, including common functionality for native SDK implementations.
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Generator
from contextlib import contextmanager
import contextvars
from datetime import datetime
import json
import logging
import re
from typing import TYPE_CHECKING, Any, Final
import uuid
from pydantic import BaseModel
@@ -50,6 +54,32 @@ DEFAULT_CONTEXT_WINDOW_SIZE: Final[int] = 4096
DEFAULT_SUPPORTS_STOP_WORDS: Final[bool] = True
_JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTALL)
_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"_current_call_id", default=None
)
@contextmanager
def llm_call_context() -> Generator[str, None, None]:
"""Context manager that establishes an LLM call scope with a unique call_id."""
call_id = str(uuid.uuid4())
token = _current_call_id.set(call_id)
try:
yield call_id
finally:
_current_call_id.reset(token)
def get_current_call_id() -> str:
"""Get current call_id from context"""
call_id = _current_call_id.get()
if call_id is None:
logging.warning(
"LLM event emitted outside call context - generating fallback call_id"
)
return str(uuid.uuid4())
return call_id
class BaseLLM(ABC):
"""Abstract base class for LLM implementations.
@@ -351,6 +381,7 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -374,6 +405,7 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -394,6 +426,7 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -428,6 +461,7 @@ class BaseLLM(ABC):
from_agent=from_agent,
call_type=call_type,
response_id=response_id,
call_id=get_current_call_id(),
),
)

View File

@@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any, Final, Literal, TypeGuard, cast
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -266,35 +266,46 @@ class AnthropicCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Anthropic
formatted_messages, system_message = self._format_messages_for_anthropic(
messages
)
# Format messages for Anthropic
formatted_messages, system_message = (
self._format_messages_for_anthropic(messages)
)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
completion_params,
available_functions,
from_task,
@@ -302,21 +313,13 @@ class AnthropicCompletion(BaseLLM):
effective_response_model,
)
return self._handle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -342,28 +345,37 @@ class AnthropicCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages, system_message = self._format_messages_for_anthropic(
messages
)
formatted_messages, system_message = (
self._format_messages_for_anthropic(messages)
)
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
if self.stream:
return await self._ahandle_streaming_completion(
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
@@ -371,21 +383,13 @@ class AnthropicCompletion(BaseLLM):
effective_response_model,
)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _prepare_completion_params(
self,

View File

@@ -43,7 +43,7 @@ try:
)
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
except ImportError:
raise ImportError(
@@ -293,32 +293,44 @@ class AzureCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Azure
formatted_messages = self._format_messages_for_azure(messages)
effective_response_model = response_model or self.response_format
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# Format messages for Azure
formatted_messages = self._format_messages_for_azure(messages)
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
completion_params,
available_functions,
from_task,
@@ -326,16 +338,8 @@ class AzureCompletion(BaseLLM):
effective_response_model,
)
return self._handle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value]
except Exception as e:
return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value]
async def acall( # type: ignore[return]
self,
@@ -361,25 +365,35 @@ class AzureCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages_for_azure(messages)
effective_response_model = response_model or self.response_format
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
formatted_messages = self._format_messages_for_azure(messages)
if self.stream:
return await self._ahandle_streaming_completion(
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
@@ -387,16 +401,8 @@ class AzureCompletion(BaseLLM):
effective_response_model,
)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
self._handle_api_error(e, from_task, from_agent)
except Exception as e:
self._handle_api_error(e, from_task, from_agent)
def _prepare_completion_params(
self,

View File

@@ -11,7 +11,7 @@ from pydantic import BaseModel
from typing_extensions import Required
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -378,77 +378,90 @@ class BedrockCompletion(BaseLLM):
"""Call AWS Bedrock Converse API."""
effective_response_model = response_model or self.response_format
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Converse API
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare request body
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
# Add system message if present
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
# Format messages for Converse API
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare request body
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
# Add system message if present
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
)
# Add optional advanced features if configured
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef", cast(object, self.guardrail_config)
)
body["guardrailConfig"] = guardrail_config
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
# Add optional advanced features if configured
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef",
cast(object, self.guardrail_config),
)
body["guardrailConfig"] = guardrail_config
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.stream:
return self._handle_streaming_converse(
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.stream:
return self._handle_streaming_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_converse(
formatted_messages,
body,
available_functions,
@@ -457,26 +470,17 @@ class BedrockCompletion(BaseLLM):
effective_response_model,
)
return self._handle_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -514,69 +518,80 @@ class BedrockCompletion(BaseLLM):
'Install with: uv add "crewai[bedrock-async]"'
)
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
)
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef", cast(object, self.guardrail_config)
)
body["guardrailConfig"] = guardrail_config
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef",
cast(object, self.guardrail_config),
)
body["guardrailConfig"] = guardrail_config
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.stream:
return await self._ahandle_streaming_converse(
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.stream:
return await self._ahandle_streaming_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_converse(
formatted_messages,
body,
available_functions,
@@ -585,26 +600,17 @@ class BedrockCompletion(BaseLLM):
effective_response_model,
)
return await self._ahandle_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _handle_converse(
self,

View File

@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, Literal, cast
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -293,33 +293,45 @@ class GeminiCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
formatted_content, system_instruction = self._format_messages_for_gemini(
messages
)
formatted_content, system_instruction = (
self._format_messages_for_gemini(messages)
)
messages_for_hooks = self._convert_contents_to_dict(formatted_content)
messages_for_hooks = self._convert_contents_to_dict(formatted_content)
if not self._invoke_before_llm_call_hooks(messages_for_hooks, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(
messages_for_hooks, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
if self.stream:
return self._handle_streaming_completion(
if self.stream:
return self._handle_streaming_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
formatted_content,
config,
available_functions,
@@ -328,29 +340,20 @@ class GeminiCompletion(BaseLLM):
effective_response_model,
)
return self._handle_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -376,28 +379,38 @@ class GeminiCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
formatted_content, system_instruction = self._format_messages_for_gemini(
messages
)
formatted_content, system_instruction = (
self._format_messages_for_gemini(messages)
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
if self.stream:
return await self._ahandle_streaming_completion(
if self.stream:
return await self._ahandle_streaming_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
formatted_content,
config,
available_functions,
@@ -406,29 +419,20 @@ class GeminiCompletion(BaseLLM):
effective_response_model,
)
return await self._ahandle_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _prepare_generation_config(
self,

View File

@@ -17,7 +17,7 @@ from openai.types.responses import Response
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -382,23 +382,35 @@ class OpenAICompletion(BaseLLM):
Returns:
Completion response or tool call result.
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages(messages)
formatted_messages = self._format_messages(messages)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
if self.api == "responses":
return self._call_responses(
if self.api == "responses":
return self._call_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return self._call_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
@@ -407,22 +419,13 @@ class OpenAICompletion(BaseLLM):
response_model=response_model,
)
return self._call_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _call_completions(
self,
@@ -479,20 +482,30 @@ class OpenAICompletion(BaseLLM):
Returns:
Completion response or tool call result.
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages(messages)
formatted_messages = self._format_messages(messages)
if self.api == "responses":
return await self._acall_responses(
if self.api == "responses":
return await self._acall_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._acall_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
@@ -501,22 +514,13 @@ class OpenAICompletion(BaseLLM):
response_model=response_model,
)
return await self._acall_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def _acall_completions(
self,
@@ -1521,13 +1525,16 @@ class OpenAICompletion(BaseLLM):
) -> list[dict[str, Any]]:
"""Convert CrewAI tool format to OpenAI function calling format."""
from crewai.llms.providers.utils.common import safe_tool_conversion
from crewai.utilities.pydantic_schema_utils import (
force_additional_properties_false,
)
openai_tools = []
for tool in tools:
name, description, parameters = safe_tool_conversion(tool, "OpenAI")
openai_tool = {
openai_tool: dict[str, Any] = {
"type": "function",
"function": {
"name": name,
@@ -1537,10 +1544,11 @@ class OpenAICompletion(BaseLLM):
}
if parameters:
if isinstance(parameters, dict):
openai_tool["function"]["parameters"] = parameters # type: ignore
else:
openai_tool["function"]["parameters"] = dict(parameters)
params_dict = (
parameters if isinstance(parameters, dict) else dict(parameters)
)
params_dict = force_additional_properties_false(params_dict)
openai_tool["function"]["parameters"] = params_dict
openai_tools.append(openai_tool)
return openai_tools

View File

@@ -127,6 +127,36 @@ def add_key_in_dict_recursively(
return d
def force_additional_properties_false(d: Any) -> Any:
"""Force additionalProperties=false on all object-type dicts recursively.
OpenAI strict mode requires all objects to have additionalProperties=false.
This function overwrites any existing value to ensure compliance.
Also ensures objects have properties and required arrays, even if empty,
as OpenAI strict mode requires these for all object types.
Args:
d: The dictionary/list to modify.
Returns:
The modified dictionary/list.
"""
if isinstance(d, dict):
if d.get("type") == "object":
d["additionalProperties"] = False
if "properties" not in d:
d["properties"] = {}
if "required" not in d:
d["required"] = []
for v in d.values():
force_additional_properties_false(v)
elif isinstance(d, list):
for i in d:
force_additional_properties_false(i)
return d
def fix_discriminator_mappings(schema: dict[str, Any]) -> dict[str, Any]:
"""Replace '#/$defs/...' references in discriminator.mapping with just the model name.
@@ -278,13 +308,7 @@ def generate_model_description(model: type[BaseModel]) -> dict[str, Any]:
"""
json_schema = model.model_json_schema(ref_template="#/$defs/{model}")
json_schema = add_key_in_dict_recursively(
json_schema,
key="additionalProperties",
value=False,
criteria=lambda d: d.get("type") == "object"
and "additionalProperties" not in d,
)
json_schema = force_additional_properties_false(json_schema)
json_schema = resolve_refs(json_schema)
@@ -378,6 +402,9 @@ def create_model_from_schema( # type: ignore[no-any-unimported]
"""
effective_root = root_schema or json_schema
json_schema = force_additional_properties_false(json_schema)
effective_root = force_additional_properties_false(effective_root)
if "allOf" in json_schema:
json_schema = _merge_all_of_schemas(json_schema["allOf"], effective_root)
if "title" not in json_schema and "title" in (root_schema or {}):

View File

@@ -235,8 +235,13 @@ class TestAsyncAgentExecutor:
mock_crew: MagicMock, mock_tools_handler: MagicMock
) -> None:
"""Test that multiple ainvoke calls can run concurrently."""
max_concurrent = 0
current_concurrent = 0
lock = asyncio.Lock()
async def create_and_run_executor(executor_id: int) -> dict[str, Any]:
nonlocal max_concurrent, current_concurrent
executor = CrewAgentExecutor(
llm=mock_llm,
task=mock_task,
@@ -252,7 +257,13 @@ class TestAsyncAgentExecutor:
)
async def delayed_response(*args: Any, **kwargs: Any) -> str:
await asyncio.sleep(0.05)
nonlocal max_concurrent, current_concurrent
async with lock:
current_concurrent += 1
max_concurrent = max(max_concurrent, current_concurrent)
await asyncio.sleep(0.01)
async with lock:
current_concurrent -= 1
return f"Thought: Done\nFinal Answer: Result from executor {executor_id}"
with patch(
@@ -273,19 +284,15 @@ class TestAsyncAgentExecutor:
}
)
import time
start = time.time()
results = await asyncio.gather(
create_and_run_executor(1),
create_and_run_executor(2),
create_and_run_executor(3),
)
elapsed = time.time() - start
assert len(results) == 3
assert all("output" in r for r in results)
assert elapsed < 0.15, f"Expected concurrent execution, took {elapsed}s"
assert max_concurrent > 1, f"Expected concurrent execution, max concurrent was {max_concurrent}"
class TestAsyncLLMResponseHelper:

View File

@@ -0,0 +1,108 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '71'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpUSxS5LeHwDTELElWlC5CDMzmr\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437564,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hi there! How can I assist you today?\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 10,\n \"total_tokens\": 19,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:05 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '460'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '477'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,215 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '71'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpStmyOpe9DrthWBlDdMZfVMJ1u\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hi! How can I assist you today?\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 9,\n \"total_tokens\": 18,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:02 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '415'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '434'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"user","content":"Say bye"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '72'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpS1DP0Xd3tmWt5PBincVrdU7yw\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Goodbye! If you have more questions
in the future, feel free to reach out. Have a great day!\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 23,\n \"total_tokens\": 32,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:03 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '964'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '979'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,143 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini","stream":true,"stream_options":{"include_usage":true}}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '125'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: 'data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"rVIyGQF2E"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"Hi"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"ZGVqV7ZDm"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"vnfm7IxlIB"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
How"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"o8F35ZZ"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
can"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"kiBzGe3"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
I"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"cbGT2RWgx"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
assist"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"DtxR"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
you"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"6y6Co8J"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
today"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"SZOmm"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"s9Bc0HqlPg"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":null,"obfuscation":"u9aar"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[],"usage":{"prompt_tokens":9,"completion_tokens":9,"total_tokens":18,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}},"obfuscation":"5hudm8ySqh39"}
data: [DONE]
'
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- text/event-stream; charset=utf-8
Date:
- Mon, 26 Jan 2026 14:26:04 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '260'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '275'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -296,6 +296,23 @@ def test_create_folder_structure_folder_name_validation():
shutil.rmtree(folder_path)
def test_create_folder_structure_rejects_reserved_names():
"""Test that reserved script names are rejected to prevent pyproject.toml conflicts."""
with tempfile.TemporaryDirectory() as temp_dir:
reserved_names = ["test", "train", "replay", "run_crew", "run_with_trigger"]
for reserved_name in reserved_names:
with pytest.raises(ValueError, match="which is reserved"):
create_folder_structure(reserved_name, parent_folder=temp_dir)
with pytest.raises(ValueError, match="which is reserved"):
create_folder_structure(f"{reserved_name}/", parent_folder=temp_dir)
capitalized = reserved_name.capitalize()
with pytest.raises(ValueError, match="which is reserved"):
create_folder_structure(capitalized, parent_folder=temp_dir)
@mock.patch("crewai.cli.create_crew.create_folder_structure")
@mock.patch("crewai.cli.create_crew.copy_template")
@mock.patch("crewai.cli.create_crew.load_env_vars")

View File

@@ -1,10 +1,20 @@
"""Test for version management."""
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from crewai import __version__
from crewai.cli.version import get_crewai_version
from crewai.cli.version import (
_get_cache_file,
_is_cache_valid,
get_crewai_version,
get_latest_version_from_pypi,
is_newer_version_available,
)
def test_dynamic_versioning_consistency():
def test_dynamic_versioning_consistency() -> None:
"""Test that dynamic versioning provides consistent version across all access methods."""
cli_version = get_crewai_version()
package_version = __version__
@@ -15,3 +25,186 @@ def test_dynamic_versioning_consistency():
# Version should not be empty
assert package_version is not None
assert len(package_version.strip()) > 0
class TestVersionChecking:
"""Test version checking utilities."""
def test_get_crewai_version(self) -> None:
"""Test getting current crewai version."""
version = get_crewai_version()
assert isinstance(version, str)
assert len(version) > 0
def test_get_cache_file(self) -> None:
"""Test cache file path generation."""
cache_file = _get_cache_file()
assert isinstance(cache_file, Path)
assert cache_file.name == "version_cache.json"
def test_is_cache_valid_with_fresh_cache(self) -> None:
"""Test cache validation with fresh cache."""
cache_data = {"timestamp": datetime.now().isoformat(), "version": "1.0.0"}
assert _is_cache_valid(cache_data) is True
def test_is_cache_valid_with_stale_cache(self) -> None:
"""Test cache validation with stale cache."""
old_time = datetime.now() - timedelta(hours=25)
cache_data = {"timestamp": old_time.isoformat(), "version": "1.0.0"}
assert _is_cache_valid(cache_data) is False
def test_is_cache_valid_with_missing_timestamp(self) -> None:
"""Test cache validation with missing timestamp."""
cache_data = {"version": "1.0.0"}
assert _is_cache_valid(cache_data) is False
@patch("crewai.cli.version.Path.exists")
@patch("crewai.cli.version.request.urlopen")
def test_get_latest_version_from_pypi_success(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test successful PyPI version fetch."""
# Mock cache not existing to force fetch from PyPI
mock_exists.return_value = False
mock_response = MagicMock()
mock_response.read.return_value = b'{"info": {"version": "2.0.0"}}'
mock_urlopen.return_value.__enter__.return_value = mock_response
version = get_latest_version_from_pypi()
assert version == "2.0.0"
@patch("crewai.cli.version.Path.exists")
@patch("crewai.cli.version.request.urlopen")
def test_get_latest_version_from_pypi_failure(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test PyPI version fetch failure."""
from urllib.error import URLError
# Mock cache not existing to force fetch from PyPI
mock_exists.return_value = False
mock_urlopen.side_effect = URLError("Network error")
version = get_latest_version_from_pypi()
assert version is None
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_true(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when newer version is available."""
mock_current.return_value = "1.0.0"
mock_latest.return_value = "2.0.0"
is_newer, current, latest = is_newer_version_available()
assert is_newer is True
assert current == "1.0.0"
assert latest == "2.0.0"
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_false(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when no newer version is available."""
mock_current.return_value = "2.0.0"
mock_latest.return_value = "2.0.0"
is_newer, current, latest = is_newer_version_available()
assert is_newer is False
assert current == "2.0.0"
assert latest == "2.0.0"
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_with_none_latest(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when PyPI fetch fails."""
mock_current.return_value = "1.0.0"
mock_latest.return_value = None
is_newer, current, latest = is_newer_version_available()
assert is_newer is False
assert current == "1.0.0"
assert latest is None
class TestConsoleFormatterVersionCheck:
"""Test version check display in ConsoleFormatter."""
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": ""})
def test_version_message_shows_when_update_available_and_verbose(
self, mock_check: MagicMock
) -> None:
"""Test version message shows when update available and verbose enabled."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
assert mock_print.call_count == 2
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
def test_version_message_hides_when_verbose_false(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when verbose disabled."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=False)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
def test_version_message_hides_when_no_update_available(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when no update available."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (False, "2.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": "true"})
def test_version_message_hides_in_ci_environment(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when running in CI/CD."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": "1"})
def test_version_message_hides_in_ci_environment_with_numeric_value(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when CI=1."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()

View File

@@ -217,6 +217,7 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Hello ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -224,6 +225,7 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="World!",
call_id="test-call-id",
),
)
return mock_output
@@ -284,6 +286,7 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="",
call_id="test-call-id",
tool_call=ToolCall(
id="call-123",
function=FunctionCall(
@@ -364,6 +367,7 @@ class TestCrewKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Async ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -371,6 +375,7 @@ class TestCrewKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Stream!",
call_id="test-call-id",
),
)
return mock_output
@@ -451,6 +456,7 @@ class TestFlowKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Flow ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -458,6 +464,7 @@ class TestFlowKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="output!",
call_id="test-call-id",
),
)
return "done"
@@ -545,6 +552,7 @@ class TestFlowKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Async flow ",
call_id="test-call-id",
),
)
await asyncio.sleep(0.01)
@@ -553,6 +561,7 @@ class TestFlowKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="stream!",
call_id="test-call-id",
),
)
await asyncio.sleep(0.01)
@@ -686,6 +695,7 @@ class TestStreamingEdgeCases:
type="llm_stream_chunk",
chunk="Task 1",
task_name="First task",
call_id="test-call-id",
),
)
return mock_output

View File

@@ -249,6 +249,8 @@ def test_guardrail_emits_events(sample_agent):
result = task.execute_sync(agent=sample_agent)
crewai_event_bus.flush(timeout=10.0)
with condition:
success = condition.wait_for(
lambda: len(started_guardrail) >= 2 and len(completed_guardrail) >= 2,
@@ -267,6 +269,8 @@ def test_guardrail_emits_events(sample_agent):
task.execute_sync(agent=sample_agent)
crewai_event_bus.flush(timeout=10.0)
with condition:
success = condition.wait_for(
lambda: len(started_guardrail) >= 3 and len(completed_guardrail) >= 3,

View File

@@ -984,8 +984,8 @@ def test_streaming_fallback_to_non_streaming():
def mock_call(messages, tools=None, callbacks=None, available_functions=None):
nonlocal fallback_called
# Emit a couple of chunks to simulate partial streaming
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id = "Id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id = "Id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id="Id", call_id="test-call-id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id="Id", call_id="test-call-id"))
# Mark that fallback would be called
fallback_called = True
@@ -1041,7 +1041,7 @@ def test_streaming_empty_response_handling():
def mock_call(messages, tools=None, callbacks=None, available_functions=None):
# Emit a few empty chunks
for _ in range(3):
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="",response_id="id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="", response_id="id", call_id="test-call-id"))
# Return the default message for empty responses
return "I apologize, but I couldn't generate a proper response. Please try again or rephrase your request."
@@ -1280,6 +1280,105 @@ def test_llm_emits_event_with_lite_agent():
assert set(all_agent_id) == {str(agent.id)}
# ----------- CALL_ID CORRELATION TESTS -----------
@pytest.mark.vcr()
def test_llm_call_events_share_call_id():
"""All events from a single LLM call should share the same call_id."""
import uuid
events = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_complete(source, event):
with condition:
events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Say hi")
with condition:
success = condition.wait_for(lambda: len(events) >= 2, timeout=10)
assert success, "Timeout waiting for LLM events"
# Behavior: all events from the call share the same call_id
assert len(events) == 2
assert events[0].call_id == events[1].call_id
# call_id should be a valid UUID
uuid.UUID(events[0].call_id)
@pytest.mark.vcr()
def test_streaming_chunks_share_call_id_with_call():
"""Streaming chunks should share call_id with started/completed events."""
events = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMStreamChunkEvent)
def on_chunk(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_complete(source, event):
with condition:
events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini", stream=True)
llm.call("Say hi")
with condition:
# Wait for at least started, some chunks, and completed
success = condition.wait_for(lambda: len(events) >= 3, timeout=10)
assert success, "Timeout waiting for streaming events"
# Behavior: all events (started, chunks, completed) share the same call_id
call_ids = {e.call_id for e in events}
assert len(call_ids) == 1
@pytest.mark.vcr()
def test_separate_llm_calls_have_different_call_ids():
"""Different LLM calls should have different call_ids."""
call_ids = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
call_ids.append(event.call_id)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Say hi")
llm.call("Say bye")
with condition:
success = condition.wait_for(lambda: len(call_ids) >= 2, timeout=10)
assert success, "Timeout waiting for LLM call events"
# Behavior: each call has its own call_id
assert len(call_ids) == 2
assert call_ids[0] != call_ids[1]
# ----------- HUMAN FEEDBACK EVENTS -----------

View File

@@ -28,7 +28,7 @@ dev = [
"boto3-stubs[bedrock-runtime]==1.40.54",
"types-psycopg2==2.9.21.20251012",
"types-pymysql==1.1.0.20250916",
"types-aiofiles~=24.1.0",
"types-aiofiles~=25.1.0",
]

12
uv.lock generated
View File

@@ -51,7 +51,7 @@ dev = [
{ name = "pytest-timeout", specifier = "==2.4.0" },
{ name = "pytest-xdist", specifier = "==3.8.0" },
{ name = "ruff", specifier = "==0.14.7" },
{ name = "types-aiofiles", specifier = "~=24.1.0" },
{ name = "types-aiofiles", specifier = "~=25.1.0" },
{ name = "types-appdirs", specifier = "==1.4.*" },
{ name = "types-psycopg2", specifier = "==2.9.21.20251012" },
{ name = "types-pymysql", specifier = "==1.1.0.20250916" },
@@ -1294,10 +1294,10 @@ requires-dist = [
{ name = "json-repair", specifier = "~=0.25.2" },
{ name = "json5", specifier = "~=0.10.0" },
{ name = "jsonref", specifier = "~=1.1.0" },
{ name = "litellm", marker = "extra == 'litellm'", specifier = "~=1.74.9" },
{ name = "litellm", marker = "extra == 'litellm'", specifier = ">=1.74.9,<3" },
{ name = "mcp", specifier = "~=1.23.1" },
{ name = "mem0ai", marker = "extra == 'mem0'", specifier = "~=0.1.94" },
{ name = "openai", specifier = "~=1.83.0" },
{ name = "openai", specifier = ">=1.83.0,<3" },
{ name = "openpyxl", specifier = "~=3.1.5" },
{ name = "openpyxl", marker = "extra == 'openpyxl'", specifier = "~=3.1.5" },
{ name = "opentelemetry-api", specifier = "~=1.34.0" },
@@ -8282,11 +8282,11 @@ wheels = [
[[package]]
name = "types-aiofiles"
version = "24.1.0.20250822"
version = "25.1.0.20251011"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/19/48/c64471adac9206cc844afb33ed311ac5a65d2f59df3d861e0f2d0cad7414/types_aiofiles-24.1.0.20250822.tar.gz", hash = "sha256:9ab90d8e0c307fe97a7cf09338301e3f01a163e39f3b529ace82466355c84a7b", size = 14484, upload-time = "2025-08-22T03:02:23.039Z" }
sdist = { url = "https://files.pythonhosted.org/packages/84/6c/6d23908a8217e36704aa9c79d99a620f2fdd388b66a4b7f72fbc6b6ff6c6/types_aiofiles-25.1.0.20251011.tar.gz", hash = "sha256:1c2b8ab260cb3cd40c15f9d10efdc05a6e1e6b02899304d80dfa0410e028d3ff", size = 14535, upload-time = "2025-10-11T02:44:51.237Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bc/8e/5e6d2215e1d8f7c2a94c6e9d0059ae8109ce0f5681956d11bb0a228cef04/types_aiofiles-24.1.0.20250822-py3-none-any.whl", hash = "sha256:0ec8f8909e1a85a5a79aed0573af7901f53120dd2a29771dd0b3ef48e12328b0", size = 14322, upload-time = "2025-08-22T03:02:21.918Z" },
{ url = "https://files.pythonhosted.org/packages/71/0f/76917bab27e270bb6c32addd5968d69e558e5b6f7fb4ac4cbfa282996a96/types_aiofiles-25.1.0.20251011-py3-none-any.whl", hash = "sha256:8ff8de7f9d42739d8f0dadcceeb781ce27cd8d8c4152d4a7c52f6b20edb8149c", size = 14338, upload-time = "2025-10-11T02:44:50.054Z" },
]
[[package]]