Compare commits

..

2 Commits

Author SHA1 Message Date
Greyson LaLonde
eda4430d6f Merge branch 'main' into gl/fix/hitl-flow-plot 2026-02-02 13:37:12 -05:00
Greyson LaLonde
3f264b4cc8 fix: add human_feedback metadata and visualization 2026-02-02 13:18:22 -05:00
41 changed files with 1559 additions and 19024 deletions

View File

@@ -5,12 +5,7 @@
version: 2
updates:
- package-ecosystem: uv
directory: "/"
- package-ecosystem: uv # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "weekly"
groups:
security-updates:
applies-to: security-updates
patterns:
- "*"

View File

@@ -1 +0,0 @@
3.13

View File

@@ -8,9 +8,8 @@ from typing import Any
from crewai.tools.base_tool import BaseTool, EnvVar
from pydantic import BaseModel
from pydantic.fields import FieldInfo
from pydantic.json_schema import GenerateJsonSchema
from pydantic_core import PydanticOmit, PydanticUndefined
from pydantic_core import PydanticOmit
from crewai_tools import tools
@@ -45,9 +44,6 @@ class ToolSpecExtractor:
schema = self._unwrap_schema(core_schema)
fields = schema.get("schema", {}).get("fields", {})
# Use model_fields to get defaults (handles both default and default_factory)
model_fields = tool_class.model_fields
tool_info = {
"name": tool_class.__name__,
"humanized_name": self._extract_field_default(
@@ -58,9 +54,9 @@ class ToolSpecExtractor:
).strip(),
"run_params_schema": self._extract_params(fields.get("args_schema")),
"init_params_schema": self._extract_init_params(tool_class),
"env_vars": self._extract_env_vars_from_model_fields(model_fields),
"package_dependencies": self._extract_package_deps_from_model_fields(
model_fields
"env_vars": self._extract_env_vars(fields.get("env_vars")),
"package_dependencies": self._extract_field_default(
fields.get("package_dependencies"), fallback=[]
),
}
@@ -107,27 +103,10 @@ class ToolSpecExtractor:
return {}
@staticmethod
def _get_field_default(field: FieldInfo | None) -> Any:
"""Get default value from a FieldInfo, handling both default and default_factory."""
if not field:
return None
default_value = field.default
if default_value is PydanticUndefined or default_value is None:
if field.default_factory:
return field.default_factory()
return None
return default_value
@staticmethod
def _extract_env_vars_from_model_fields(
model_fields: dict[str, FieldInfo],
def _extract_env_vars(
env_vars_field: dict[str, Any] | None,
) -> list[dict[str, Any]]:
default_value = ToolSpecExtractor._get_field_default(
model_fields.get("env_vars")
)
if not default_value:
if not env_vars_field:
return []
return [
@@ -137,22 +116,10 @@ class ToolSpecExtractor:
"required": env_var.required,
"default": env_var.default,
}
for env_var in default_value
for env_var in env_vars_field.get("schema", {}).get("default", [])
if isinstance(env_var, EnvVar)
]
@staticmethod
def _extract_package_deps_from_model_fields(
model_fields: dict[str, FieldInfo],
) -> list[str]:
default_value = ToolSpecExtractor._get_field_default(
model_fields.get("package_dependencies")
)
if not isinstance(default_value, list):
return []
return default_value
@staticmethod
def _extract_init_params(tool_class: type[BaseTool]) -> dict[str, Any]:
ignored_init_params = [
@@ -185,7 +152,7 @@ class ToolSpecExtractor:
if __name__ == "__main__":
output_file = Path(__file__).parent.parent.parent / "tool.specs.json"
output_file = Path(__file__).parent / "tool.specs.json"
extractor = ToolSpecExtractor()
extractor.extract_all_tools()

View File

@@ -1,17 +1,12 @@
from datetime import datetime
import json
import os
import time
from typing import Annotated, Any, ClassVar, Literal
from typing import Any, ClassVar
from crewai.tools import BaseTool, EnvVar
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from pydantic.types import StringConstraints
import requests
load_dotenv()
def _save_results_to_file(content: str) -> None:
"""Saves the search results to a file."""
@@ -20,72 +15,37 @@ def _save_results_to_file(content: str) -> None:
file.write(content)
FreshnessPreset = Literal["pd", "pw", "pm", "py"]
FreshnessRange = Annotated[
str, StringConstraints(pattern=r"^\d{4}-\d{2}-\d{2}to\d{4}-\d{2}-\d{2}$")
]
Freshness = FreshnessPreset | FreshnessRange
SafeSearch = Literal["off", "moderate", "strict"]
class BraveSearchToolSchema(BaseModel):
"""Input for BraveSearchTool"""
"""Input for BraveSearchTool."""
query: str = Field(..., description="Search query to perform")
country: str | None = Field(
default=None,
description="Country code for geo-targeting (e.g., 'US', 'BR').",
)
search_language: str | None = Field(
default=None,
description="Language code for the search results (e.g., 'en', 'es').",
)
count: int | None = Field(
default=None,
description="The maximum number of results to return. Actual number may be less.",
)
offset: int | None = Field(
default=None, description="Skip the first N result sets/pages. Max is 9."
)
safesearch: SafeSearch | None = Field(
default=None,
description="Filter out explicit content. Options: off/moderate/strict",
)
spellcheck: bool | None = Field(
default=None,
description="Attempt to correct spelling errors in the search query.",
)
freshness: Freshness | None = Field(
default=None,
description="Enforce freshness of results. Options: pd/pw/pm/py, or YYYY-MM-DDtoYYYY-MM-DD",
)
text_decorations: bool | None = Field(
default=None,
description="Include markup to highlight search terms in the results.",
)
extra_snippets: bool | None = Field(
default=None,
description="Include up to 5 text snippets for each page if possible.",
)
operators: bool | None = Field(
default=None,
description="Whether to apply search operators (e.g., site:example.com).",
search_query: str = Field(
..., description="Mandatory search query you want to use to search the internet"
)
# TODO: Extend support to additional endpoints (e.g., /images, /news, etc.)
class BraveSearchTool(BaseTool):
"""A tool that performs web searches using the Brave Search API."""
"""BraveSearchTool - A tool for performing web searches using the Brave Search API.
name: str = "Brave Search"
This module provides functionality to search the internet using Brave's Search API,
supporting customizable result counts and country-specific searches.
Dependencies:
- requests
- pydantic
- python-dotenv (for API key management)
"""
name: str = "Brave Web Search the internet"
description: str = (
"A tool that performs web searches using the Brave Search API. "
"Results are returned as structured JSON data."
"A tool that can be used to search the internet with a search_query."
)
args_schema: type[BaseModel] = BraveSearchToolSchema
search_url: str = "https://api.search.brave.com/res/v1/web/search"
country: str | None = ""
n_results: int = 10
save_file: bool = False
_last_request_time: ClassVar[float] = 0
_min_request_interval: ClassVar[float] = 1.0 # seconds
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
@@ -95,9 +55,6 @@ class BraveSearchTool(BaseTool):
),
]
)
# Rate limiting parameters
_last_request_time: ClassVar[float] = 0
_min_request_interval: ClassVar[float] = 1.0 # seconds
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -116,64 +73,19 @@ class BraveSearchTool(BaseTool):
self._min_request_interval - (current_time - self._last_request_time)
)
BraveSearchTool._last_request_time = time.time()
# Construct and send the request
try:
# Maintain both "search_query" and "query" for backwards compatibility
query = kwargs.get("search_query") or kwargs.get("query")
if not query:
raise ValueError("Query is required")
payload = {"q": query}
if country := kwargs.get("country"):
payload["country"] = country
if search_language := kwargs.get("search_language"):
payload["search_language"] = search_language
# Fallback to deprecated n_results parameter if no count is provided
count = kwargs.get("count")
if count is not None:
payload["count"] = count
else:
payload["count"] = self.n_results
# Offset may be 0, so avoid truthiness check
offset = kwargs.get("offset")
if offset is not None:
payload["offset"] = offset
if safesearch := kwargs.get("safesearch"):
payload["safesearch"] = safesearch
search_query = kwargs.get("search_query") or kwargs.get("query")
if not search_query:
raise ValueError("Search query is required")
save_file = kwargs.get("save_file", self.save_file)
if freshness := kwargs.get("freshness"):
payload["freshness"] = freshness
n_results = kwargs.get("n_results", self.n_results)
# Boolean parameters
spellcheck = kwargs.get("spellcheck")
if spellcheck is not None:
payload["spellcheck"] = spellcheck
payload = {"q": search_query, "count": n_results}
text_decorations = kwargs.get("text_decorations")
if text_decorations is not None:
payload["text_decorations"] = text_decorations
if self.country != "":
payload["country"] = self.country
extra_snippets = kwargs.get("extra_snippets")
if extra_snippets is not None:
payload["extra_snippets"] = extra_snippets
operators = kwargs.get("operators")
if operators is not None:
payload["operators"] = operators
# Limit the result types to "web" since there is presently no
# handling of other types like "discussions", "faq", "infobox",
# "news", "videos", or "locations".
payload["result_filter"] = "web"
# Setup Request Headers
headers = {
"X-Subscription-Token": os.environ["BRAVE_API_KEY"],
"Accept": "application/json",
@@ -185,32 +97,25 @@ class BraveSearchTool(BaseTool):
response.raise_for_status() # Handle non-200 responses
results = response.json()
# TODO: Handle other result types like "discussions", "faq", etc.
web_results_items = []
if "web" in results:
web_results = results["web"]["results"]
for result in web_results:
url = result.get("url")
title = result.get("title")
# If, for whatever reason, this entry does not have a title
# or url, skip it.
if not url or not title:
results = results["web"]["results"]
string = []
for result in results:
try:
string.append(
"\n".join(
[
f"Title: {result['title']}",
f"Link: {result['url']}",
f"Snippet: {result['description']}",
"---",
]
)
)
except KeyError: # noqa: PERF203
continue
item = {
"url": url,
"title": title,
}
description = result.get("description")
if description:
item["description"] = description
snippets = result.get("extra_snippets")
if snippets:
item["snippets"] = snippets
web_results_items.append(item)
content = json.dumps(web_results_items)
content = "\n".join(string)
except requests.RequestException as e:
return f"Error performing search: {e!s}"
except KeyError as e:

View File

@@ -4,7 +4,7 @@ import os
import re
from typing import Any
from crewai.tools import BaseTool, EnvVar
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
@@ -137,21 +137,7 @@ class StagehandTool(BaseTool):
- 'observe': For finding elements in a specific area
"""
args_schema: type[BaseModel] = StagehandToolSchema
package_dependencies: list[str] = Field(default_factory=lambda: ["stagehand<=0.5.9"])
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
name="BROWSERBASE_API_KEY",
description="API key for Browserbase services",
required=False,
),
EnvVar(
name="BROWSERBASE_PROJECT_ID",
description="Project ID for Browserbase services",
required=False,
),
]
)
package_dependencies: list[str] = Field(default_factory=lambda: ["stagehand"])
# Stagehand configuration
api_key: str | None = None

View File

@@ -23,26 +23,23 @@ class MockTool(BaseTool):
)
my_parameter: str = Field("This is default value", description="What a description")
my_parameter_bool: bool = Field(False)
# Use default_factory like real tools do (not direct default)
package_dependencies: list[str] = Field(
default_factory=lambda: ["this-is-a-required-package", "another-required-package"]
)
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
name="SERPER_API_KEY",
description="API key for Serper",
required=True,
default=None,
),
EnvVar(
name="API_RATE_LIMIT",
description="API rate limit",
required=False,
default="100",
),
]
["this-is-a-required-package", "another-required-package"], description=""
)
env_vars: list[EnvVar] = [
EnvVar(
name="SERPER_API_KEY",
description="API key for Serper",
required=True,
default=None,
),
EnvVar(
name="API_RATE_LIMIT",
description="API rate limit",
required=False,
default="100",
),
]
@pytest.fixture

View File

@@ -1,9 +1,7 @@
import json
from unittest.mock import patch
import pytest
from crewai_tools.tools.brave_search_tool.brave_search_tool import BraveSearchTool
import pytest
@pytest.fixture
@@ -32,43 +30,16 @@ def test_brave_tool_search(mock_get, brave_tool):
}
mock_get.return_value.json.return_value = mock_response
result = brave_tool.run(query="test")
result = brave_tool.run(search_query="test")
assert "Test Title" in result
assert "http://test.com" in result
@patch("requests.get")
def test_brave_tool(mock_get):
mock_response = {
"web": {
"results": [
{
"title": "Brave Browser",
"url": "https://brave.com",
"description": "Brave Browser description",
}
]
}
}
mock_get.return_value.json.return_value = mock_response
tool = BraveSearchTool(n_results=2)
result = tool.run(query="Brave Browser")
assert result is not None
# Parse JSON so we can examine the structure
data = json.loads(result)
assert isinstance(data, list)
assert len(data) >= 1
# First item should have expected fields: title, url, and description
first = data[0]
assert "title" in first
assert first["title"] == "Brave Browser"
assert "url" in first
assert first["url"] == "https://brave.com"
assert "description" in first
assert first["description"] == "Brave Browser description"
def test_brave_tool():
tool = BraveSearchTool(
n_results=2,
)
tool.run(search_query="ChatGPT")
if __name__ == "__main__":

File diff suppressed because it is too large Load Diff

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
# Core Dependencies
"pydantic~=2.11.9",
"openai>=1.83.0,<3",
"openai~=1.83.0",
"instructor>=1.3.3",
# Text Processing
"pdfplumber~=0.11.4",
@@ -78,7 +78,7 @@ voyageai = [
"voyageai~=0.3.5",
]
litellm = [
"litellm>=1.74.9,<3",
"litellm~=1.74.9",
]
bedrock = [
"boto3~=1.40.45",

View File

@@ -16,7 +16,6 @@ from crewai.agents.agent_adapters.openai_agents.protocols import (
)
from crewai.tools import BaseTool
from crewai.utilities.import_utils import require
from crewai.utilities.pydantic_schema_utils import force_additional_properties_false
from crewai.utilities.string_utils import sanitize_tool_name
@@ -136,9 +135,7 @@ class OpenAIAgentToolAdapter(BaseToolAdapter):
for tool in tools:
schema: dict[str, Any] = tool.args_schema.model_json_schema()
schema = force_additional_properties_false(schema)
schema.update({"type": "object"})
schema.update({"additionalProperties": False, "type": "object"})
openai_tool: OpenAIFunctionTool = cast(
OpenAIFunctionTool,

View File

@@ -1,13 +1,10 @@
from typing import Any
DEFAULT_CREWAI_ENTERPRISE_URL = "https://app.crewai.com"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_PROVIDER = "workos"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE = "client_01JNJQWBJ4SPFN3SWJM5T7BDG8"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID = "client_01JYT06R59SP0NXYGD994NFXXX"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN = "login.crewai.com"
ENV_VARS: dict[str, list[dict[str, Any]]] = {
ENV_VARS = {
"openai": [
{
"prompt": "Enter your OPENAI API key (press Enter to skip)",
@@ -115,7 +112,7 @@ ENV_VARS: dict[str, list[dict[str, Any]]] = {
}
PROVIDERS: list[str] = [
PROVIDERS = [
"openai",
"anthropic",
"gemini",
@@ -130,7 +127,7 @@ PROVIDERS: list[str] = [
"sambanova",
]
MODELS: dict[str, list[str]] = {
MODELS = {
"openai": [
"gpt-4",
"gpt-4.1",

View File

@@ -3,7 +3,6 @@ import shutil
import sys
import click
import tomli
from crewai.cli.constants import ENV_VARS, MODELS
from crewai.cli.provider import (
@@ -14,31 +13,7 @@ from crewai.cli.provider import (
from crewai.cli.utils import copy_template, load_env_vars, write_env_file
def get_reserved_script_names() -> set[str]:
"""Get reserved script names from pyproject.toml template.
Returns:
Set of reserved script names that would conflict with crew folder names.
"""
package_dir = Path(__file__).parent
template_path = package_dir / "templates" / "crew" / "pyproject.toml"
with open(template_path, "r") as f:
template_content = f.read()
template_content = template_content.replace("{{folder_name}}", "_placeholder_")
template_content = template_content.replace("{{name}}", "placeholder")
template_content = template_content.replace("{{crew_name}}", "Placeholder")
template_data = tomli.loads(template_content)
script_names = set(template_data.get("project", {}).get("scripts", {}).keys())
script_names.discard("_placeholder_")
return script_names
def create_folder_structure(
name: str, parent_folder: str | None = None
) -> tuple[Path, str, str]:
def create_folder_structure(name, parent_folder=None):
import keyword
import re
@@ -76,14 +51,6 @@ def create_folder_structure(
f"Project name '{name}' would generate invalid Python module name '{folder_name}'"
)
reserved_names = get_reserved_script_names()
if folder_name in reserved_names:
raise ValueError(
f"Project name '{name}' would generate folder name '{folder_name}' which is reserved. "
f"Reserved names are: {', '.join(sorted(reserved_names))}. "
"Please choose a different name."
)
class_name = name.replace("_", " ").replace("-", " ").title().replace(" ", "")
class_name = re.sub(r"[^a-zA-Z0-9_]", "", class_name)
@@ -147,9 +114,7 @@ def create_folder_structure(
return folder_path, folder_name, class_name
def copy_template_files(
folder_path: Path, name: str, class_name: str, parent_folder: str | None
) -> None:
def copy_template_files(folder_path, name, class_name, parent_folder):
package_dir = Path(__file__).parent
templates_dir = package_dir / "templates" / "crew"
@@ -190,12 +155,7 @@ def copy_template_files(
copy_template(src_file, dst_file, name, class_name, folder_path.name)
def create_crew(
name: str,
provider: str | None = None,
skip_provider: bool = False,
parent_folder: str | None = None,
) -> None:
def create_crew(name, provider=None, skip_provider=False, parent_folder=None):
folder_path, folder_name, class_name = create_folder_structure(name, parent_folder)
env_vars = load_env_vars(folder_path)
if not skip_provider:
@@ -229,9 +189,7 @@ def create_crew(
if selected_provider is None: # User typed 'q'
click.secho("Exiting...", fg="yellow")
sys.exit(0)
if selected_provider and isinstance(
selected_provider, str
): # Valid selection
if selected_provider: # Valid selection
break
click.secho(
"No provider selected. Please try again or press 'q' to exit.", fg="red"

View File

@@ -1,10 +1,8 @@
from collections import defaultdict
from collections.abc import Sequence
import json
import os
from pathlib import Path
import time
from typing import Any
import certifi
import click
@@ -13,15 +11,16 @@ import requests
from crewai.cli.constants import JSON_URL, MODELS, PROVIDERS
def select_choice(prompt_message: str, choices: Sequence[str]) -> str | None:
"""Presents a list of choices to the user and prompts them to select one.
def select_choice(prompt_message, choices):
"""
Presents a list of choices to the user and prompts them to select one.
Args:
prompt_message: The message to display to the user before presenting the choices.
choices: A list of options to present to the user.
- prompt_message (str): The message to display to the user before presenting the choices.
- choices (list): A list of options to present to the user.
Returns:
The selected choice from the list, or None if the user chooses to quit.
- str: The selected choice from the list, or None if the user chooses to quit.
"""
provider_models = get_provider_data()
@@ -53,14 +52,16 @@ def select_choice(prompt_message: str, choices: Sequence[str]) -> str | None:
)
def select_provider(provider_models: dict[str, list[str]]) -> str | None | bool:
"""Presents a list of providers to the user and prompts them to select one.
def select_provider(provider_models):
"""
Presents a list of providers to the user and prompts them to select one.
Args:
provider_models: A dictionary of provider models.
- provider_models (dict): A dictionary of provider models.
Returns:
The selected provider, None if user explicitly quits, or False if no selection.
- str: The selected provider
- None: If user explicitly quits
"""
predefined_providers = [p.lower() for p in PROVIDERS]
all_providers = sorted(set(predefined_providers + list(provider_models.keys())))
@@ -79,15 +80,16 @@ def select_provider(provider_models: dict[str, list[str]]) -> str | None | bool:
return provider.lower() if provider else False
def select_model(provider: str, provider_models: dict[str, list[str]]) -> str | None:
"""Presents a list of models for a given provider to the user and prompts them to select one.
def select_model(provider, provider_models):
"""
Presents a list of models for a given provider to the user and prompts them to select one.
Args:
provider: The provider for which to select a model.
provider_models: A dictionary of provider models.
- provider (str): The provider for which to select a model.
- provider_models (dict): A dictionary of provider models.
Returns:
The selected model, or None if the operation is aborted or an invalid selection is made.
- str: The selected model, or None if the operation is aborted or an invalid selection is made.
"""
predefined_providers = [p.lower() for p in PROVIDERS]
@@ -105,17 +107,16 @@ def select_model(provider: str, provider_models: dict[str, list[str]]) -> str |
)
def load_provider_data(cache_file: Path, cache_expiry: int) -> dict[str, Any] | None:
"""Loads provider data from a cache file if it exists and is not expired.
If the cache is expired or corrupted, it fetches the data from the web.
def load_provider_data(cache_file, cache_expiry):
"""
Loads provider data from a cache file if it exists and is not expired. If the cache is expired or corrupted, it fetches the data from the web.
Args:
cache_file: The path to the cache file.
cache_expiry: The cache expiry time in seconds.
- cache_file (Path): The path to the cache file.
- cache_expiry (int): The cache expiry time in seconds.
Returns:
The loaded provider data or None if the operation fails.
- dict or None: The loaded provider data or None if the operation fails.
"""
current_time = time.time()
if (
@@ -136,31 +137,32 @@ def load_provider_data(cache_file: Path, cache_expiry: int) -> dict[str, Any] |
return fetch_provider_data(cache_file)
def read_cache_file(cache_file: Path) -> dict[str, Any] | None:
"""Reads and returns the JSON content from a cache file.
def read_cache_file(cache_file):
"""
Reads and returns the JSON content from a cache file. Returns None if the file contains invalid JSON.
Args:
cache_file: The path to the cache file.
- cache_file (Path): The path to the cache file.
Returns:
The JSON content of the cache file or None if the JSON is invalid.
- dict or None: The JSON content of the cache file or None if the JSON is invalid.
"""
try:
with open(cache_file, "r") as f:
data: dict[str, Any] = json.load(f)
return data
return json.load(f)
except json.JSONDecodeError:
return None
def fetch_provider_data(cache_file: Path) -> dict[str, Any] | None:
"""Fetches provider data from a specified URL and caches it to a file.
def fetch_provider_data(cache_file):
"""
Fetches provider data from a specified URL and caches it to a file.
Args:
cache_file: The path to the cache file.
- cache_file (Path): The path to the cache file.
Returns:
The fetched provider data or None if the operation fails.
- dict or None: The fetched provider data or None if the operation fails.
"""
ssl_config = os.environ["SSL_CERT_FILE"] = certifi.where()
@@ -178,39 +180,36 @@ def fetch_provider_data(cache_file: Path) -> dict[str, Any] | None:
return None
def download_data(response: requests.Response) -> dict[str, Any]:
"""Downloads data from a given HTTP response and returns the JSON content.
def download_data(response):
"""
Downloads data from a given HTTP response and returns the JSON content.
Args:
response: The HTTP response object.
- response (requests.Response): The HTTP response object.
Returns:
The JSON content of the response.
- dict: The JSON content of the response.
"""
total_size = int(response.headers.get("content-length", 0))
block_size = 8192
data_chunks: list[bytes] = []
bar: Any
data_chunks = []
with click.progressbar(
length=total_size, label="Downloading", show_pos=True
) as bar:
) as progress_bar:
for chunk in response.iter_content(block_size):
if chunk:
data_chunks.append(chunk)
bar.update(len(chunk))
progress_bar.update(len(chunk))
data_content = b"".join(data_chunks)
result: dict[str, Any] = json.loads(data_content.decode("utf-8"))
return result
return json.loads(data_content.decode("utf-8"))
def get_provider_data() -> dict[str, list[str]] | None:
"""Retrieves provider data from a cache file.
Filters out models based on provider criteria, and returns a dictionary of providers
mapped to their models.
def get_provider_data():
"""
Retrieves provider data from a cache file, filters out models based on provider criteria, and returns a dictionary of providers mapped to their models.
Returns:
A dictionary of providers mapped to their models or None if the operation fails.
- dict or None: A dictionary of providers mapped to their models or None if the operation fails.
"""
cache_dir = Path.home() / ".crewai"
cache_dir.mkdir(exist_ok=True)

View File

@@ -1,107 +1,6 @@
"""Version utilities for CrewAI CLI."""
from collections.abc import Mapping
from datetime import datetime, timedelta
from functools import lru_cache
import importlib.metadata
import json
from pathlib import Path
from typing import Any, cast
from urllib import request
from urllib.error import URLError
import appdirs
from packaging.version import InvalidVersion, parse
@lru_cache(maxsize=1)
def _get_cache_file() -> Path:
"""Get the path to the version cache file.
Cached to avoid repeated filesystem operations.
"""
cache_dir = Path(appdirs.user_cache_dir("crewai"))
cache_dir.mkdir(parents=True, exist_ok=True)
return cache_dir / "version_cache.json"
def get_crewai_version() -> str:
"""Get the version number of CrewAI running the CLI."""
"""Get the version number of CrewAI running the CLI"""
return importlib.metadata.version("crewai")
def _is_cache_valid(cache_data: Mapping[str, Any]) -> bool:
"""Check if the cache is still valid, less than 24 hours old."""
if "timestamp" not in cache_data:
return False
try:
cache_time = datetime.fromisoformat(str(cache_data["timestamp"]))
return datetime.now() - cache_time < timedelta(hours=24)
except (ValueError, TypeError):
return False
def get_latest_version_from_pypi(timeout: int = 2) -> str | None:
"""Get the latest version of CrewAI from PyPI.
Args:
timeout: Request timeout in seconds.
Returns:
Latest version string or None if unable to fetch.
"""
cache_file = _get_cache_file()
if cache_file.exists():
try:
cache_data = json.loads(cache_file.read_text())
if _is_cache_valid(cache_data):
return cast(str | None, cache_data.get("version"))
except (json.JSONDecodeError, OSError):
pass
try:
with request.urlopen(
"https://pypi.org/pypi/crewai/json", timeout=timeout
) as response:
data = json.loads(response.read())
latest_version = cast(str, data["info"]["version"])
cache_data = {
"version": latest_version,
"timestamp": datetime.now().isoformat(),
}
cache_file.write_text(json.dumps(cache_data))
return latest_version
except (URLError, json.JSONDecodeError, KeyError, OSError):
return None
def check_version() -> tuple[str, str | None]:
"""Check current and latest versions.
Returns:
Tuple of (current_version, latest_version).
latest_version is None if unable to fetch from PyPI.
"""
current = get_crewai_version()
latest = get_latest_version_from_pypi()
return current, latest
def is_newer_version_available() -> tuple[bool, str, str | None]:
"""Check if a newer version is available.
Returns:
Tuple of (is_newer, current_version, latest_version).
"""
current, latest = check_version()
if latest is None:
return False, current, None
try:
return parse(latest) > parse(current), current, latest
except (InvalidVersion, TypeError):
return False, current, latest

View File

@@ -10,7 +10,6 @@ class LLMEventBase(BaseEvent):
from_task: Any | None = None
from_agent: Any | None = None
model: str | None = None
call_id: str
def __init__(self, **data: Any) -> None:
if data.get("from_task"):

View File

@@ -1,14 +1,11 @@
import os
import threading
from typing import Any, ClassVar, cast
from typing import Any, ClassVar
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.text import Text
from crewai.cli.version import is_newer_version_available
class ConsoleFormatter:
tool_usage_counts: ClassVar[dict[str, int]] = {}
@@ -38,39 +35,6 @@ class ConsoleFormatter:
padding=(1, 2),
)
def _show_version_update_message_if_needed(self) -> None:
"""Show version update message if a newer version is available.
Only displays when verbose mode is enabled and not running in CI/CD.
"""
if not self.verbose:
return
if os.getenv("CI", "").lower() in ("true", "1"):
return
try:
is_newer, current, latest = is_newer_version_available()
if is_newer and latest:
message = f"""A new version of CrewAI is available!
Current version: {current}
Latest version: {latest}
To update, run: uv sync --upgrade-package crewai"""
panel = Panel(
message,
title="✨ Update Available ✨",
border_style="yellow",
padding=(1, 2),
)
self.console.print(panel)
self.console.print()
except Exception: # noqa: S110
# Silently ignore errors in version check - it's non-critical
pass
def _show_tracing_disabled_message_if_needed(self) -> None:
"""Show tracing disabled message if tracing is not enabled."""
from crewai.events.listeners.tracing.utils import (
@@ -212,10 +176,9 @@ To enable tracing, do any one of these:
if not self.verbose:
return
# Reset the crew completion event for this new crew execution
ConsoleFormatter.crew_completion_printed.clear()
self._show_version_update_message_if_needed()
content = self.create_status_content(
"Crew Execution Started",
crew_name,
@@ -274,8 +237,6 @@ To enable tracing, do any one of these:
def handle_flow_started(self, flow_name: str, flow_id: str) -> None:
"""Show flow started panel."""
self._show_version_update_message_if_needed()
content = Text()
content.append("Flow Started\n", style="blue bold")
content.append("Name: ", style="white")
@@ -924,7 +885,7 @@ To enable tracing, do any one of these:
is_a2a_delegation = False
try:
output_data = json.loads(cast(str, formatted_answer.output))
output_data = json.loads(formatted_answer.output)
if isinstance(output_data, dict):
if output_data.get("is_a2a") is True:
is_a2a_delegation = True

View File

@@ -513,11 +513,17 @@ class FlowMeta(type):
and attr_value.__is_router__
):
routers.add(attr_name)
possible_returns = get_possible_return_constants(attr_value)
if possible_returns:
router_paths[attr_name] = possible_returns
if (
hasattr(attr_value, "__router_paths__")
and attr_value.__router_paths__
):
router_paths[attr_name] = attr_value.__router_paths__
else:
router_paths[attr_name] = []
possible_returns = get_possible_return_constants(attr_value)
if possible_returns:
router_paths[attr_name] = possible_returns
else:
router_paths[attr_name] = []
# Handle start methods that are also routers (e.g., @human_feedback with emit)
if (

View File

@@ -1025,7 +1025,7 @@ class TriggeredByHighlighter {
const isAndOrRouter = edge.dashes || edge.label === "AND";
const highlightColor = isAndOrRouter
? "{{ CREWAI_ORANGE }}"
? (edge.color?.color || "{{ CREWAI_ORANGE }}")
: getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim();
const updateData = {
@@ -1080,7 +1080,7 @@ class TriggeredByHighlighter {
// Keep the original edge color instead of turning gray
const isAndOrRouter = edge.dashes || edge.label === "AND";
const baseColor = isAndOrRouter
? "{{ CREWAI_ORANGE }}"
? (edge.color?.color || "{{ CREWAI_ORANGE }}")
: getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim();
// Convert color to rgba with opacity for vis.js
@@ -1142,7 +1142,7 @@ class TriggeredByHighlighter {
const defaultColor =
edge.dashes || edge.label === "AND"
? "{{ CREWAI_ORANGE }}"
? (edge.color?.color || "{{ CREWAI_ORANGE }}")
: getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim();
const currentOpacity = edge.opacity !== undefined ? edge.opacity : 1.0;
const currentWidth =
@@ -1253,7 +1253,7 @@ class TriggeredByHighlighter {
const defaultColor =
edge.dashes || edge.label === "AND"
? "{{ CREWAI_ORANGE }}"
? (edge.color?.color || "{{ CREWAI_ORANGE }}")
: getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim();
const currentOpacity = edge.opacity !== undefined ? edge.opacity : 1.0;
const currentWidth =
@@ -2370,7 +2370,7 @@ class NetworkManager {
this.edges.forEach((edge) => {
let edgeColor;
if (edge.dashes || edge.label === "AND") {
edgeColor = "{{ CREWAI_ORANGE }}";
edgeColor = edge.color?.color || "{{ CREWAI_ORANGE }}";
} else {
edgeColor = orEdgeColor;
}

View File

@@ -129,7 +129,7 @@ def _create_edges_from_condition(
edges: list[StructureEdge] = []
if isinstance(condition, str):
if condition in nodes:
if condition in nodes and condition != target:
edges.append(
StructureEdge(
source=condition,
@@ -140,7 +140,7 @@ def _create_edges_from_condition(
)
elif callable(condition) and hasattr(condition, "__name__"):
method_name = condition.__name__
if method_name in nodes:
if method_name in nodes and method_name != target:
edges.append(
StructureEdge(
source=method_name,
@@ -163,7 +163,7 @@ def _create_edges_from_condition(
is_router_path=False,
)
for trigger in triggers
if trigger in nodes
if trigger in nodes and trigger != target
)
else:
for sub_cond in conditions_list:
@@ -196,9 +196,34 @@ def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
node_metadata["type"] = "start"
start_methods.append(method_name)
if (
hasattr(method, "__human_feedback_config__")
and method.__human_feedback_config__
):
config = method.__human_feedback_config__
node_metadata["is_human_feedback"] = True
node_metadata["human_feedback_message"] = config.message
if config.emit:
node_metadata["human_feedback_emit"] = list(config.emit)
if config.llm:
llm_str = (
config.llm
if isinstance(config.llm, str)
else str(type(config.llm).__name__)
)
node_metadata["human_feedback_llm"] = llm_str
if config.default_outcome:
node_metadata["human_feedback_default_outcome"] = config.default_outcome
if hasattr(method, "__is_router__") and method.__is_router__:
node_metadata["is_router"] = True
node_metadata["type"] = "router"
if "is_human_feedback" not in node_metadata:
node_metadata["type"] = "router"
else:
node_metadata["type"] = "human_feedback"
router_methods.append(method_name)
if method_name in flow._router_paths:
@@ -317,7 +342,7 @@ def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
is_router_path=False,
)
for trigger_method in methods
if str(trigger_method) in nodes
if str(trigger_method) in nodes and str(trigger_method) != listener_name
)
elif is_flow_condition_dict(condition_data):
edges.extend(

View File

@@ -81,6 +81,7 @@ class JSExtension(Extension):
CREWAI_ORANGE = "#FF5A50"
HITL_BLUE = "#4A90E2"
DARK_GRAY = "#333333"
WHITE = "#FFFFFF"
GRAY = "#666666"
@@ -225,6 +226,7 @@ def render_interactive(
nodes_list: list[dict[str, Any]] = []
for name, metadata in dag["nodes"].items():
node_type: str = metadata.get("type", "listen")
is_human_feedback: bool = metadata.get("is_human_feedback", False)
color_config: dict[str, Any]
font_color: str
@@ -241,6 +243,17 @@ def render_interactive(
}
font_color = "var(--node-text-color)"
border_width = 3
elif node_type == "human_feedback":
color_config = {
"background": "var(--node-bg-router)",
"border": HITL_BLUE,
"highlight": {
"background": "var(--node-bg-router)",
"border": HITL_BLUE,
},
}
font_color = "var(--node-text-color)"
border_width = 3
elif node_type == "router":
color_config = {
"background": "var(--node-bg-router)",
@@ -266,16 +279,57 @@ def render_interactive(
title_parts: list[str] = []
type_badge_bg: str = (
CREWAI_ORANGE if node_type in ["start", "router"] else DARK_GRAY
)
display_type = node_type
type_badge_bg: str
if node_type == "human_feedback":
type_badge_bg = HITL_BLUE
display_type = "HITL"
elif node_type in ["start", "router"]:
type_badge_bg = CREWAI_ORANGE
else:
type_badge_bg = DARK_GRAY
title_parts.append(f"""
<div style="border-bottom: 1px solid rgba(102,102,102,0.15); padding-bottom: 8px; margin-bottom: 10px;">
<div style="font-size: 13px; font-weight: 700; color: {DARK_GRAY}; margin-bottom: 6px;">{name}</div>
<span style="display: inline-block; background: {type_badge_bg}; color: white; padding: 2px 8px; border-radius: 4px; font-size: 10px; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px;">{node_type}</span>
<span style="display: inline-block; background: {type_badge_bg}; color: white; padding: 2px 8px; border-radius: 4px; font-size: 10px; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px;">{display_type}</span>
</div>
""")
if is_human_feedback:
feedback_msg = metadata.get("human_feedback_message", "")
if feedback_msg:
title_parts.append(f"""
<div style="margin-bottom: 8px;">
<div style="font-size: 10px; text-transform: uppercase; color: {GRAY}; letter-spacing: 0.5px; margin-bottom: 4px; font-weight: 600;">👤 Human Feedback</div>
<div style="background: rgba(74,144,226,0.08); padding: 6px 8px; border-radius: 4px; font-size: 11px; color: {DARK_GRAY}; border: 1px solid rgba(74,144,226,0.2); line-height: 1.4;">{feedback_msg}</div>
</div>
""")
if metadata.get("human_feedback_emit"):
emit_options = metadata["human_feedback_emit"]
emit_items = "".join(
[
f'<li style="margin: 3px 0;"><code style="background: rgba(74,144,226,0.08); padding: 2px 6px; border-radius: 3px; font-size: 10px; color: {HITL_BLUE}; border: 1px solid rgba(74,144,226,0.2); font-weight: 600;">{opt}</code></li>'
for opt in emit_options
]
)
title_parts.append(f"""
<div style="margin-bottom: 8px;">
<div style="font-size: 10px; text-transform: uppercase; color: {GRAY}; letter-spacing: 0.5px; margin-bottom: 4px; font-weight: 600;">Outcomes</div>
<ul style="list-style: none; padding: 0; margin: 0;">{emit_items}</ul>
</div>
""")
if metadata.get("human_feedback_llm"):
llm_model = metadata["human_feedback_llm"]
title_parts.append(f"""
<div style="margin-bottom: 8px;">
<div style="font-size: 10px; text-transform: uppercase; color: {GRAY}; letter-spacing: 0.5px; margin-bottom: 3px; font-weight: 600;">LLM</div>
<span style="display: inline-block; background: rgba(102,102,102,0.08); padding: 3px 8px; border-radius: 4px; font-size: 10px; color: {DARK_GRAY}; border: 1px solid rgba(102,102,102,0.12);">{llm_model}</span>
</div>
""")
if metadata.get("condition_type"):
condition = metadata["condition_type"]
if condition == "AND":
@@ -309,7 +363,7 @@ def render_interactive(
</div>
""")
if metadata.get("router_paths"):
if metadata.get("router_paths") and not is_human_feedback:
paths = metadata["router_paths"]
paths_items = "".join(
[
@@ -365,7 +419,11 @@ def render_interactive(
edge_dashes: bool | list[int] = False
if edge["is_router_path"]:
edge_color = CREWAI_ORANGE
source_node = dag["nodes"].get(edge["source"], {})
if source_node.get("is_human_feedback", False):
edge_color = HITL_BLUE
else:
edge_color = CREWAI_ORANGE
edge_dashes = [15, 10]
if "router_path_label" in edge:
edge_label = edge["router_path_label"]
@@ -417,6 +475,7 @@ def render_interactive(
css_content = css_content.replace("'{{ DARK_GRAY }}'", DARK_GRAY)
css_content = css_content.replace("'{{ GRAY }}'", GRAY)
css_content = css_content.replace("'{{ CREWAI_ORANGE }}'", CREWAI_ORANGE)
css_content = css_content.replace("'{{ HITL_BLUE }}'", HITL_BLUE)
css_output_path.write_text(css_content, encoding="utf-8")
@@ -430,6 +489,7 @@ def render_interactive(
js_content = js_content.replace("{{ DARK_GRAY }}", DARK_GRAY)
js_content = js_content.replace("{{ GRAY }}", GRAY)
js_content = js_content.replace("{{ CREWAI_ORANGE }}", CREWAI_ORANGE)
js_content = js_content.replace("{{ HITL_BLUE }}", HITL_BLUE)
js_content = js_content.replace("'{{ nodeData }}'", dag_nodes_json)
js_content = js_content.replace("'{{ dagData }}'", dag_full_json)
js_content = js_content.replace("'{{ nodes_list_json }}'", json.dumps(nodes_list))
@@ -441,6 +501,7 @@ def render_interactive(
html_content = template.render(
CREWAI_ORANGE=CREWAI_ORANGE,
HITL_BLUE=HITL_BLUE,
DARK_GRAY=DARK_GRAY,
WHITE=WHITE,
GRAY=GRAY,

View File

@@ -21,6 +21,11 @@ class NodeMetadata(TypedDict, total=False):
class_signature: str
class_name: str
class_line_number: int
is_human_feedback: bool
human_feedback_message: str
human_feedback_emit: list[str]
human_feedback_llm: str
human_feedback_default_outcome: str
class StructureEdge(TypedDict, total=False):

View File

@@ -37,7 +37,7 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.llms.base_llm import BaseLLM, get_current_call_id, llm_call_context
from crewai.llms.base_llm import BaseLLM
from crewai.llms.constants import (
ANTHROPIC_MODELS,
AZURE_MODELS,
@@ -770,7 +770,7 @@ class LLM(BaseLLM):
chunk_content = None
response_id = None
if hasattr(chunk, "id"):
if hasattr(chunk,'id'):
response_id = chunk.id
# Safely extract content from various chunk formats
@@ -827,7 +827,7 @@ class LLM(BaseLLM):
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_id=response_id,
response_id=response_id
)
if result is not None:
@@ -849,8 +849,7 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
call_type=LLMCallType.LLM_CALL,
response_id=response_id,
call_id=get_current_call_id(),
response_id=response_id
),
)
# --- 4) Fallback to non-streaming if no content received
@@ -1016,10 +1015,7 @@ class LLM(BaseLLM):
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise Exception(f"Failed to get streaming response: {e!s}") from e
@@ -1052,8 +1048,7 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
call_type=LLMCallType.TOOL_CALL,
response_id=response_id,
call_id=get_current_call_id(),
response_id=response_id
),
)
@@ -1481,8 +1476,7 @@ class LLM(BaseLLM):
chunk=chunk_content,
from_task=from_task,
from_agent=from_agent,
response_id=response_id,
call_id=get_current_call_id(),
response_id=response_id
),
)
@@ -1625,12 +1619,7 @@ class LLM(BaseLLM):
logging.error(f"Error executing function '{function_name}': {e}")
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=f"Tool execution error: {e!s}",
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
event=LLMCallFailedEvent(error=f"Tool execution error: {e!s}"),
)
crewai_event_bus.emit(
self,
@@ -1680,117 +1669,108 @@ class LLM(BaseLLM):
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
with llm_call_context() as call_id:
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=call_id,
),
)
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
# --- 2) Validate parameters before proceeding with the call
self._validate_call_params()
# --- 2) Validate parameters before proceeding with the call
self._validate_call_params()
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
if not self._invoke_before_llm_call_hooks(messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
else:
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
logging.info("Retrying LLM call without the unsupported 'stop'")
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
raise
else:
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
logging.info("Retrying LLM call without the unsupported 'stop'")
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
async def acall(
self,
@@ -1828,54 +1808,43 @@ class LLM(BaseLLM):
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
with llm_call_context() as call_id:
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=call_id,
),
)
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
self._validate_call_params()
self._validate_call_params()
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# Process file attachments asynchronously before preparing params
messages = await self._aprocess_message_files(messages)
# Process file attachments asynchronously before preparing params
messages = await self._aprocess_message_files(messages)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(
messages, tools, skip_file_processing=True
)
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(
messages, tools, skip_file_processing=True
)
if self.stream:
return await self._ahandle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._ahandle_non_streaming_response(
if self.stream:
return await self._ahandle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
@@ -1883,50 +1852,52 @@ class LLM(BaseLLM):
from_agent=from_agent,
response_model=response_model,
)
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
return await self._ahandle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
logging.info("Retrying LLM call without the unsupported 'stop'")
return await self.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
logging.info("Retrying LLM call without the unsupported 'stop'")
return await self.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
raise
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
def _handle_emit_call_events(
self,
@@ -1954,7 +1925,6 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)

View File

@@ -7,15 +7,11 @@ in CrewAI, including common functionality for native SDK implementations.
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Generator
from contextlib import contextmanager
import contextvars
from datetime import datetime
import json
import logging
import re
from typing import TYPE_CHECKING, Any, Final
import uuid
from pydantic import BaseModel
@@ -54,32 +50,6 @@ DEFAULT_CONTEXT_WINDOW_SIZE: Final[int] = 4096
DEFAULT_SUPPORTS_STOP_WORDS: Final[bool] = True
_JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTALL)
_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"_current_call_id", default=None
)
@contextmanager
def llm_call_context() -> Generator[str, None, None]:
"""Context manager that establishes an LLM call scope with a unique call_id."""
call_id = str(uuid.uuid4())
token = _current_call_id.set(call_id)
try:
yield call_id
finally:
_current_call_id.reset(token)
def get_current_call_id() -> str:
"""Get current call_id from context"""
call_id = _current_call_id.get()
if call_id is None:
logging.warning(
"LLM event emitted outside call context - generating fallback call_id"
)
return str(uuid.uuid4())
return call_id
class BaseLLM(ABC):
"""Abstract base class for LLM implementations.
@@ -381,7 +351,6 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -405,7 +374,6 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -426,7 +394,6 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -461,7 +428,6 @@ class BaseLLM(ABC):
from_agent=from_agent,
call_type=call_type,
response_id=response_id,
call_id=get_current_call_id(),
),
)

View File

@@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any, Final, Literal, TypeGuard, cast
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.base_llm import BaseLLM
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -266,46 +266,35 @@ class AnthropicCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Anthropic
formatted_messages, system_message = (
self._format_messages_for_anthropic(messages)
)
# Format messages for Anthropic
formatted_messages, system_message = self._format_messages_for_anthropic(
messages
)
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
@@ -313,13 +302,21 @@ class AnthropicCompletion(BaseLLM):
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
return self._handle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -345,37 +342,28 @@ class AnthropicCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages, system_message = (
self._format_messages_for_anthropic(messages)
)
formatted_messages, system_message = self._format_messages_for_anthropic(
messages
)
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
@@ -383,13 +371,21 @@ class AnthropicCompletion(BaseLLM):
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _prepare_completion_params(
self,

View File

@@ -43,7 +43,7 @@ try:
)
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.base_llm import BaseLLM
except ImportError:
raise ImportError(
@@ -293,44 +293,32 @@ class AzureCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
# Format messages for Azure
formatted_messages = self._format_messages_for_azure(messages)
# Format messages for Azure
formatted_messages = self._format_messages_for_azure(messages)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
@@ -338,8 +326,16 @@ class AzureCompletion(BaseLLM):
effective_response_model,
)
except Exception as e:
return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value]
return self._handle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value]
async def acall( # type: ignore[return]
self,
@@ -365,35 +361,25 @@ class AzureCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
formatted_messages = self._format_messages_for_azure(messages)
formatted_messages = self._format_messages_for_azure(messages)
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
@@ -401,8 +387,16 @@ class AzureCompletion(BaseLLM):
effective_response_model,
)
except Exception as e:
self._handle_api_error(e, from_task, from_agent)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
self._handle_api_error(e, from_task, from_agent)
def _prepare_completion_params(
self,

View File

@@ -11,7 +11,7 @@ from pydantic import BaseModel
from typing_extensions import Required
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.base_llm import BaseLLM
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -378,90 +378,77 @@ class BedrockCompletion(BaseLLM):
"""Call AWS Bedrock Converse API."""
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Converse API
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare request body
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
# Add system message if present
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
)
# Format messages for Converse API
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare request body
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
# Add system message if present
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
# Add optional advanced features if configured
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef", cast(object, self.guardrail_config)
)
body["guardrailConfig"] = guardrail_config
# Add optional advanced features if configured
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef",
cast(object, self.guardrail_config),
)
body["guardrailConfig"] = guardrail_config
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.stream:
return self._handle_streaming_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_converse(
if self.stream:
return self._handle_streaming_converse(
formatted_messages,
body,
available_functions,
@@ -470,17 +457,26 @@ class BedrockCompletion(BaseLLM):
effective_response_model,
)
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
return self._handle_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -518,80 +514,69 @@ class BedrockCompletion(BaseLLM):
'Install with: uv add "crewai[bedrock-async]"'
)
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
)
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef", cast(object, self.guardrail_config)
)
body["guardrailConfig"] = guardrail_config
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef",
cast(object, self.guardrail_config),
)
body["guardrailConfig"] = guardrail_config
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.stream:
return await self._ahandle_streaming_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_converse(
if self.stream:
return await self._ahandle_streaming_converse(
formatted_messages,
body,
available_functions,
@@ -600,17 +585,26 @@ class BedrockCompletion(BaseLLM):
effective_response_model,
)
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
return await self._ahandle_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _handle_converse(
self,

View File

@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, Literal, cast
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.base_llm import BaseLLM
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -293,45 +293,33 @@ class GeminiCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
formatted_content, system_instruction = (
self._format_messages_for_gemini(messages)
)
formatted_content, system_instruction = self._format_messages_for_gemini(
messages
)
messages_for_hooks = self._convert_contents_to_dict(formatted_content)
messages_for_hooks = self._convert_contents_to_dict(formatted_content)
if not self._invoke_before_llm_call_hooks(
messages_for_hooks, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(messages_for_hooks, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
if self.stream:
return self._handle_streaming_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
if self.stream:
return self._handle_streaming_completion(
formatted_content,
config,
available_functions,
@@ -340,20 +328,29 @@ class GeminiCompletion(BaseLLM):
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
return self._handle_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -379,38 +376,28 @@ class GeminiCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
formatted_content, system_instruction = (
self._format_messages_for_gemini(messages)
)
formatted_content, system_instruction = self._format_messages_for_gemini(
messages
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
if self.stream:
return await self._ahandle_streaming_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
if self.stream:
return await self._ahandle_streaming_completion(
formatted_content,
config,
available_functions,
@@ -419,20 +406,29 @@ class GeminiCompletion(BaseLLM):
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
return await self._ahandle_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _prepare_generation_config(
self,

View File

@@ -17,7 +17,7 @@ from openai.types.responses import Response
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.base_llm import BaseLLM
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -382,35 +382,23 @@ class OpenAICompletion(BaseLLM):
Returns:
Completion response or tool call result.
"""
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages(messages)
formatted_messages = self._format_messages(messages)
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if self.api == "responses":
return self._call_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return self._call_completions(
if self.api == "responses":
return self._call_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
@@ -419,13 +407,22 @@ class OpenAICompletion(BaseLLM):
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
return self._call_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _call_completions(
self,
@@ -482,30 +479,20 @@ class OpenAICompletion(BaseLLM):
Returns:
Completion response or tool call result.
"""
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages(messages)
formatted_messages = self._format_messages(messages)
if self.api == "responses":
return await self._acall_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._acall_completions(
if self.api == "responses":
return await self._acall_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
@@ -514,13 +501,22 @@ class OpenAICompletion(BaseLLM):
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
return await self._acall_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def _acall_completions(
self,
@@ -1525,16 +1521,13 @@ class OpenAICompletion(BaseLLM):
) -> list[dict[str, Any]]:
"""Convert CrewAI tool format to OpenAI function calling format."""
from crewai.llms.providers.utils.common import safe_tool_conversion
from crewai.utilities.pydantic_schema_utils import (
force_additional_properties_false,
)
openai_tools = []
for tool in tools:
name, description, parameters = safe_tool_conversion(tool, "OpenAI")
openai_tool: dict[str, Any] = {
openai_tool = {
"type": "function",
"function": {
"name": name,
@@ -1544,11 +1537,10 @@ class OpenAICompletion(BaseLLM):
}
if parameters:
params_dict = (
parameters if isinstance(parameters, dict) else dict(parameters)
)
params_dict = force_additional_properties_false(params_dict)
openai_tool["function"]["parameters"] = params_dict
if isinstance(parameters, dict):
openai_tool["function"]["parameters"] = parameters # type: ignore
else:
openai_tool["function"]["parameters"] = dict(parameters)
openai_tools.append(openai_tool)
return openai_tools

View File

@@ -127,36 +127,6 @@ def add_key_in_dict_recursively(
return d
def force_additional_properties_false(d: Any) -> Any:
"""Force additionalProperties=false on all object-type dicts recursively.
OpenAI strict mode requires all objects to have additionalProperties=false.
This function overwrites any existing value to ensure compliance.
Also ensures objects have properties and required arrays, even if empty,
as OpenAI strict mode requires these for all object types.
Args:
d: The dictionary/list to modify.
Returns:
The modified dictionary/list.
"""
if isinstance(d, dict):
if d.get("type") == "object":
d["additionalProperties"] = False
if "properties" not in d:
d["properties"] = {}
if "required" not in d:
d["required"] = []
for v in d.values():
force_additional_properties_false(v)
elif isinstance(d, list):
for i in d:
force_additional_properties_false(i)
return d
def fix_discriminator_mappings(schema: dict[str, Any]) -> dict[str, Any]:
"""Replace '#/$defs/...' references in discriminator.mapping with just the model name.
@@ -308,7 +278,13 @@ def generate_model_description(model: type[BaseModel]) -> dict[str, Any]:
"""
json_schema = model.model_json_schema(ref_template="#/$defs/{model}")
json_schema = force_additional_properties_false(json_schema)
json_schema = add_key_in_dict_recursively(
json_schema,
key="additionalProperties",
value=False,
criteria=lambda d: d.get("type") == "object"
and "additionalProperties" not in d,
)
json_schema = resolve_refs(json_schema)
@@ -402,9 +378,6 @@ def create_model_from_schema( # type: ignore[no-any-unimported]
"""
effective_root = root_schema or json_schema
json_schema = force_additional_properties_false(json_schema)
effective_root = force_additional_properties_false(effective_root)
if "allOf" in json_schema:
json_schema = _merge_all_of_schemas(json_schema["allOf"], effective_root)
if "title" not in json_schema and "title" in (root_schema or {}):

View File

@@ -235,13 +235,8 @@ class TestAsyncAgentExecutor:
mock_crew: MagicMock, mock_tools_handler: MagicMock
) -> None:
"""Test that multiple ainvoke calls can run concurrently."""
max_concurrent = 0
current_concurrent = 0
lock = asyncio.Lock()
async def create_and_run_executor(executor_id: int) -> dict[str, Any]:
nonlocal max_concurrent, current_concurrent
executor = CrewAgentExecutor(
llm=mock_llm,
task=mock_task,
@@ -257,13 +252,7 @@ class TestAsyncAgentExecutor:
)
async def delayed_response(*args: Any, **kwargs: Any) -> str:
nonlocal max_concurrent, current_concurrent
async with lock:
current_concurrent += 1
max_concurrent = max(max_concurrent, current_concurrent)
await asyncio.sleep(0.01)
async with lock:
current_concurrent -= 1
await asyncio.sleep(0.05)
return f"Thought: Done\nFinal Answer: Result from executor {executor_id}"
with patch(
@@ -284,15 +273,19 @@ class TestAsyncAgentExecutor:
}
)
import time
start = time.time()
results = await asyncio.gather(
create_and_run_executor(1),
create_and_run_executor(2),
create_and_run_executor(3),
)
elapsed = time.time() - start
assert len(results) == 3
assert all("output" in r for r in results)
assert max_concurrent > 1, f"Expected concurrent execution, max concurrent was {max_concurrent}"
assert elapsed < 0.15, f"Expected concurrent execution, took {elapsed}s"
class TestAsyncLLMResponseHelper:

View File

@@ -1,108 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '71'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpUSxS5LeHwDTELElWlC5CDMzmr\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437564,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hi there! How can I assist you today?\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 10,\n \"total_tokens\": 19,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:05 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '460'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '477'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,215 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '71'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpStmyOpe9DrthWBlDdMZfVMJ1u\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hi! How can I assist you today?\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 9,\n \"total_tokens\": 18,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:02 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '415'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '434'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"user","content":"Say bye"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '72'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpS1DP0Xd3tmWt5PBincVrdU7yw\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Goodbye! If you have more questions
in the future, feel free to reach out. Have a great day!\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 23,\n \"total_tokens\": 32,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:03 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '964'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '979'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,143 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini","stream":true,"stream_options":{"include_usage":true}}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '125'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: 'data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"rVIyGQF2E"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"Hi"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"ZGVqV7ZDm"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"vnfm7IxlIB"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
How"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"o8F35ZZ"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
can"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"kiBzGe3"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
I"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"cbGT2RWgx"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
assist"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"DtxR"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
you"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"6y6Co8J"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
today"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"SZOmm"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"s9Bc0HqlPg"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":null,"obfuscation":"u9aar"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[],"usage":{"prompt_tokens":9,"completion_tokens":9,"total_tokens":18,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}},"obfuscation":"5hudm8ySqh39"}
data: [DONE]
'
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- text/event-stream; charset=utf-8
Date:
- Mon, 26 Jan 2026 14:26:04 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '260'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '275'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -296,23 +296,6 @@ def test_create_folder_structure_folder_name_validation():
shutil.rmtree(folder_path)
def test_create_folder_structure_rejects_reserved_names():
"""Test that reserved script names are rejected to prevent pyproject.toml conflicts."""
with tempfile.TemporaryDirectory() as temp_dir:
reserved_names = ["test", "train", "replay", "run_crew", "run_with_trigger"]
for reserved_name in reserved_names:
with pytest.raises(ValueError, match="which is reserved"):
create_folder_structure(reserved_name, parent_folder=temp_dir)
with pytest.raises(ValueError, match="which is reserved"):
create_folder_structure(f"{reserved_name}/", parent_folder=temp_dir)
capitalized = reserved_name.capitalize()
with pytest.raises(ValueError, match="which is reserved"):
create_folder_structure(capitalized, parent_folder=temp_dir)
@mock.patch("crewai.cli.create_crew.create_folder_structure")
@mock.patch("crewai.cli.create_crew.copy_template")
@mock.patch("crewai.cli.create_crew.load_env_vars")

View File

@@ -1,20 +1,10 @@
"""Test for version management."""
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from crewai import __version__
from crewai.cli.version import (
_get_cache_file,
_is_cache_valid,
get_crewai_version,
get_latest_version_from_pypi,
is_newer_version_available,
)
from crewai.cli.version import get_crewai_version
def test_dynamic_versioning_consistency() -> None:
def test_dynamic_versioning_consistency():
"""Test that dynamic versioning provides consistent version across all access methods."""
cli_version = get_crewai_version()
package_version = __version__
@@ -25,186 +15,3 @@ def test_dynamic_versioning_consistency() -> None:
# Version should not be empty
assert package_version is not None
assert len(package_version.strip()) > 0
class TestVersionChecking:
"""Test version checking utilities."""
def test_get_crewai_version(self) -> None:
"""Test getting current crewai version."""
version = get_crewai_version()
assert isinstance(version, str)
assert len(version) > 0
def test_get_cache_file(self) -> None:
"""Test cache file path generation."""
cache_file = _get_cache_file()
assert isinstance(cache_file, Path)
assert cache_file.name == "version_cache.json"
def test_is_cache_valid_with_fresh_cache(self) -> None:
"""Test cache validation with fresh cache."""
cache_data = {"timestamp": datetime.now().isoformat(), "version": "1.0.0"}
assert _is_cache_valid(cache_data) is True
def test_is_cache_valid_with_stale_cache(self) -> None:
"""Test cache validation with stale cache."""
old_time = datetime.now() - timedelta(hours=25)
cache_data = {"timestamp": old_time.isoformat(), "version": "1.0.0"}
assert _is_cache_valid(cache_data) is False
def test_is_cache_valid_with_missing_timestamp(self) -> None:
"""Test cache validation with missing timestamp."""
cache_data = {"version": "1.0.0"}
assert _is_cache_valid(cache_data) is False
@patch("crewai.cli.version.Path.exists")
@patch("crewai.cli.version.request.urlopen")
def test_get_latest_version_from_pypi_success(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test successful PyPI version fetch."""
# Mock cache not existing to force fetch from PyPI
mock_exists.return_value = False
mock_response = MagicMock()
mock_response.read.return_value = b'{"info": {"version": "2.0.0"}}'
mock_urlopen.return_value.__enter__.return_value = mock_response
version = get_latest_version_from_pypi()
assert version == "2.0.0"
@patch("crewai.cli.version.Path.exists")
@patch("crewai.cli.version.request.urlopen")
def test_get_latest_version_from_pypi_failure(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test PyPI version fetch failure."""
from urllib.error import URLError
# Mock cache not existing to force fetch from PyPI
mock_exists.return_value = False
mock_urlopen.side_effect = URLError("Network error")
version = get_latest_version_from_pypi()
assert version is None
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_true(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when newer version is available."""
mock_current.return_value = "1.0.0"
mock_latest.return_value = "2.0.0"
is_newer, current, latest = is_newer_version_available()
assert is_newer is True
assert current == "1.0.0"
assert latest == "2.0.0"
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_false(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when no newer version is available."""
mock_current.return_value = "2.0.0"
mock_latest.return_value = "2.0.0"
is_newer, current, latest = is_newer_version_available()
assert is_newer is False
assert current == "2.0.0"
assert latest == "2.0.0"
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_with_none_latest(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when PyPI fetch fails."""
mock_current.return_value = "1.0.0"
mock_latest.return_value = None
is_newer, current, latest = is_newer_version_available()
assert is_newer is False
assert current == "1.0.0"
assert latest is None
class TestConsoleFormatterVersionCheck:
"""Test version check display in ConsoleFormatter."""
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": ""})
def test_version_message_shows_when_update_available_and_verbose(
self, mock_check: MagicMock
) -> None:
"""Test version message shows when update available and verbose enabled."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
assert mock_print.call_count == 2
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
def test_version_message_hides_when_verbose_false(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when verbose disabled."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=False)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
def test_version_message_hides_when_no_update_available(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when no update available."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (False, "2.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": "true"})
def test_version_message_hides_in_ci_environment(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when running in CI/CD."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": "1"})
def test_version_message_hides_in_ci_environment_with_numeric_value(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when CI=1."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()

View File

@@ -8,6 +8,7 @@ from pathlib import Path
import pytest
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.human_feedback import human_feedback
from crewai.flow.visualization import (
build_flow_structure,
visualize_flow_structure,
@@ -667,4 +668,180 @@ def test_no_warning_for_properly_typed_router(caplog):
# No warnings should be logged
warning_messages = [r.message for r in caplog.records if r.levelno >= logging.WARNING]
assert not any("Could not determine return paths" in msg for msg in warning_messages)
assert not any("Found listeners waiting for triggers" in msg for msg in warning_messages)
assert not any("Found listeners waiting for triggers" in msg for msg in warning_messages)
def test_human_feedback_node_metadata():
"""Test that human feedback nodes have correct metadata."""
from typing import Literal
class HITLFlow(Flow):
"""Flow with human-in-the-loop feedback."""
@start()
@human_feedback(
message="Please review the output:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review_content(self) -> Literal["approved", "rejected"]:
return "approved"
@listen("approved")
def on_approved(self):
return "published"
@listen("rejected")
def on_rejected(self):
return "discarded"
flow = HITLFlow()
structure = build_flow_structure(flow)
review_node = structure["nodes"]["review_content"]
assert review_node["is_human_feedback"] is True
assert review_node["type"] == "human_feedback"
assert review_node["human_feedback_message"] == "Please review the output:"
assert review_node["human_feedback_emit"] == ["approved", "rejected"]
assert review_node["human_feedback_llm"] == "gpt-4o-mini"
def test_human_feedback_visualization_includes_hitl_data():
"""Test that visualization includes human feedback data in HTML."""
from typing import Literal
class HITLFlow(Flow):
"""Flow with human-in-the-loop feedback."""
@start()
@human_feedback(
message="Please review the output:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review_content(self) -> Literal["approved", "rejected"]:
return "approved"
@listen("approved")
def on_approved(self):
return "published"
flow = HITLFlow()
structure = build_flow_structure(flow)
html_file = visualize_flow_structure(structure, "test_hitl.html", show=False)
html_path = Path(html_file)
js_file = html_path.parent / f"{html_path.stem}_script.js"
js_content = js_file.read_text(encoding="utf-8")
assert "HITL" in js_content
assert "Please review the output:" in js_content
assert "approved" in js_content
assert "rejected" in js_content
assert "#4A90E2" in js_content
def test_human_feedback_without_emit_metadata():
"""Test that human feedback without emit has correct metadata."""
class HITLSimpleFlow(Flow):
"""Flow with simple human feedback (no routing)."""
@start()
@human_feedback(message="Please provide feedback:")
def review_step(self):
return "content"
@listen(review_step)
def next_step(self):
return "done"
flow = HITLSimpleFlow()
structure = build_flow_structure(flow)
review_node = structure["nodes"]["review_step"]
assert review_node["is_human_feedback"] is True
assert "is_router" not in review_node or review_node["is_router"] is False
assert review_node["type"] == "start"
assert review_node["human_feedback_message"] == "Please provide feedback:"
def test_human_feedback_with_default_outcome():
"""Test that human feedback with default outcome includes it in metadata."""
from typing import Literal
class HITLDefaultFlow(Flow):
"""Flow with human feedback that has a default outcome."""
@start()
@human_feedback(
message="Review this:",
emit=["approved", "needs_work"],
llm="gpt-4o-mini",
default_outcome="needs_work",
)
def review(self) -> Literal["approved", "needs_work"]:
return "approved"
@listen("approved")
def on_approved(self):
return "published"
@listen("needs_work")
def on_needs_work(self):
return "revised"
flow = HITLDefaultFlow()
structure = build_flow_structure(flow)
review_node = structure["nodes"]["review"]
assert review_node["is_human_feedback"] is True
assert review_node["human_feedback_default_outcome"] == "needs_work"
def test_mixed_router_and_human_feedback():
"""Test flow with both regular routers and human feedback routers."""
from typing import Literal
class MixedFlow(Flow):
"""Flow with both regular routers and HITL."""
@start()
def init(self):
return "initialized"
@router(init)
def auto_decision(self) -> Literal["path_a", "path_b"]:
return "path_a"
@listen("path_a")
@human_feedback(
message="Review this step:",
emit=["continue", "stop"],
llm="gpt-4o-mini",
)
def human_review(self) -> Literal["continue", "stop"]:
return "continue"
@listen("continue")
def proceed(self):
return "done"
@listen("stop")
def halt(self):
return "halted"
flow = MixedFlow()
structure = build_flow_structure(flow)
auto_node = structure["nodes"]["auto_decision"]
assert auto_node["type"] == "router"
assert auto_node["is_router"] is True
assert "is_human_feedback" not in auto_node or auto_node["is_human_feedback"] is False
human_node = structure["nodes"]["human_review"]
assert human_node["type"] == "human_feedback"
assert human_node["is_router"] is True
assert human_node["is_human_feedback"] is True
assert human_node["human_feedback_message"] == "Review this step:"

View File

@@ -217,7 +217,6 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Hello ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -225,7 +224,6 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="World!",
call_id="test-call-id",
),
)
return mock_output
@@ -286,7 +284,6 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="",
call_id="test-call-id",
tool_call=ToolCall(
id="call-123",
function=FunctionCall(
@@ -367,7 +364,6 @@ class TestCrewKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Async ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -375,7 +371,6 @@ class TestCrewKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Stream!",
call_id="test-call-id",
),
)
return mock_output
@@ -456,7 +451,6 @@ class TestFlowKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Flow ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -464,7 +458,6 @@ class TestFlowKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="output!",
call_id="test-call-id",
),
)
return "done"
@@ -552,7 +545,6 @@ class TestFlowKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Async flow ",
call_id="test-call-id",
),
)
await asyncio.sleep(0.01)
@@ -561,7 +553,6 @@ class TestFlowKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="stream!",
call_id="test-call-id",
),
)
await asyncio.sleep(0.01)
@@ -695,7 +686,6 @@ class TestStreamingEdgeCases:
type="llm_stream_chunk",
chunk="Task 1",
task_name="First task",
call_id="test-call-id",
),
)
return mock_output

View File

@@ -249,8 +249,6 @@ def test_guardrail_emits_events(sample_agent):
result = task.execute_sync(agent=sample_agent)
crewai_event_bus.flush(timeout=10.0)
with condition:
success = condition.wait_for(
lambda: len(started_guardrail) >= 2 and len(completed_guardrail) >= 2,
@@ -269,8 +267,6 @@ def test_guardrail_emits_events(sample_agent):
task.execute_sync(agent=sample_agent)
crewai_event_bus.flush(timeout=10.0)
with condition:
success = condition.wait_for(
lambda: len(started_guardrail) >= 3 and len(completed_guardrail) >= 3,

View File

@@ -984,8 +984,8 @@ def test_streaming_fallback_to_non_streaming():
def mock_call(messages, tools=None, callbacks=None, available_functions=None):
nonlocal fallback_called
# Emit a couple of chunks to simulate partial streaming
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id="Id", call_id="test-call-id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id="Id", call_id="test-call-id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id = "Id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id = "Id"))
# Mark that fallback would be called
fallback_called = True
@@ -1041,7 +1041,7 @@ def test_streaming_empty_response_handling():
def mock_call(messages, tools=None, callbacks=None, available_functions=None):
# Emit a few empty chunks
for _ in range(3):
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="", response_id="id", call_id="test-call-id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="",response_id="id"))
# Return the default message for empty responses
return "I apologize, but I couldn't generate a proper response. Please try again or rephrase your request."
@@ -1280,105 +1280,6 @@ def test_llm_emits_event_with_lite_agent():
assert set(all_agent_id) == {str(agent.id)}
# ----------- CALL_ID CORRELATION TESTS -----------
@pytest.mark.vcr()
def test_llm_call_events_share_call_id():
"""All events from a single LLM call should share the same call_id."""
import uuid
events = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_complete(source, event):
with condition:
events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Say hi")
with condition:
success = condition.wait_for(lambda: len(events) >= 2, timeout=10)
assert success, "Timeout waiting for LLM events"
# Behavior: all events from the call share the same call_id
assert len(events) == 2
assert events[0].call_id == events[1].call_id
# call_id should be a valid UUID
uuid.UUID(events[0].call_id)
@pytest.mark.vcr()
def test_streaming_chunks_share_call_id_with_call():
"""Streaming chunks should share call_id with started/completed events."""
events = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMStreamChunkEvent)
def on_chunk(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_complete(source, event):
with condition:
events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini", stream=True)
llm.call("Say hi")
with condition:
# Wait for at least started, some chunks, and completed
success = condition.wait_for(lambda: len(events) >= 3, timeout=10)
assert success, "Timeout waiting for streaming events"
# Behavior: all events (started, chunks, completed) share the same call_id
call_ids = {e.call_id for e in events}
assert len(call_ids) == 1
@pytest.mark.vcr()
def test_separate_llm_calls_have_different_call_ids():
"""Different LLM calls should have different call_ids."""
call_ids = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
call_ids.append(event.call_id)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Say hi")
llm.call("Say bye")
with condition:
success = condition.wait_for(lambda: len(call_ids) >= 2, timeout=10)
assert success, "Timeout waiting for LLM call events"
# Behavior: each call has its own call_id
assert len(call_ids) == 2
assert call_ids[0] != call_ids[1]
# ----------- HUMAN FEEDBACK EVENTS -----------

View File

@@ -28,7 +28,7 @@ dev = [
"boto3-stubs[bedrock-runtime]==1.40.54",
"types-psycopg2==2.9.21.20251012",
"types-pymysql==1.1.0.20250916",
"types-aiofiles~=25.1.0",
"types-aiofiles~=24.1.0",
]

12
uv.lock generated
View File

@@ -51,7 +51,7 @@ dev = [
{ name = "pytest-timeout", specifier = "==2.4.0" },
{ name = "pytest-xdist", specifier = "==3.8.0" },
{ name = "ruff", specifier = "==0.14.7" },
{ name = "types-aiofiles", specifier = "~=25.1.0" },
{ name = "types-aiofiles", specifier = "~=24.1.0" },
{ name = "types-appdirs", specifier = "==1.4.*" },
{ name = "types-psycopg2", specifier = "==2.9.21.20251012" },
{ name = "types-pymysql", specifier = "==1.1.0.20250916" },
@@ -1294,10 +1294,10 @@ requires-dist = [
{ name = "json-repair", specifier = "~=0.25.2" },
{ name = "json5", specifier = "~=0.10.0" },
{ name = "jsonref", specifier = "~=1.1.0" },
{ name = "litellm", marker = "extra == 'litellm'", specifier = ">=1.74.9,<3" },
{ name = "litellm", marker = "extra == 'litellm'", specifier = "~=1.74.9" },
{ name = "mcp", specifier = "~=1.23.1" },
{ name = "mem0ai", marker = "extra == 'mem0'", specifier = "~=0.1.94" },
{ name = "openai", specifier = ">=1.83.0,<3" },
{ name = "openai", specifier = "~=1.83.0" },
{ name = "openpyxl", specifier = "~=3.1.5" },
{ name = "openpyxl", marker = "extra == 'openpyxl'", specifier = "~=3.1.5" },
{ name = "opentelemetry-api", specifier = "~=1.34.0" },
@@ -8282,11 +8282,11 @@ wheels = [
[[package]]
name = "types-aiofiles"
version = "25.1.0.20251011"
version = "24.1.0.20250822"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/84/6c/6d23908a8217e36704aa9c79d99a620f2fdd388b66a4b7f72fbc6b6ff6c6/types_aiofiles-25.1.0.20251011.tar.gz", hash = "sha256:1c2b8ab260cb3cd40c15f9d10efdc05a6e1e6b02899304d80dfa0410e028d3ff", size = 14535, upload-time = "2025-10-11T02:44:51.237Z" }
sdist = { url = "https://files.pythonhosted.org/packages/19/48/c64471adac9206cc844afb33ed311ac5a65d2f59df3d861e0f2d0cad7414/types_aiofiles-24.1.0.20250822.tar.gz", hash = "sha256:9ab90d8e0c307fe97a7cf09338301e3f01a163e39f3b529ace82466355c84a7b", size = 14484, upload-time = "2025-08-22T03:02:23.039Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/71/0f/76917bab27e270bb6c32addd5968d69e558e5b6f7fb4ac4cbfa282996a96/types_aiofiles-25.1.0.20251011-py3-none-any.whl", hash = "sha256:8ff8de7f9d42739d8f0dadcceeb781ce27cd8d8c4152d4a7c52f6b20edb8149c", size = 14338, upload-time = "2025-10-11T02:44:50.054Z" },
{ url = "https://files.pythonhosted.org/packages/bc/8e/5e6d2215e1d8f7c2a94c6e9d0059ae8109ce0f5681956d11bb0a228cef04/types_aiofiles-24.1.0.20250822-py3-none-any.whl", hash = "sha256:0ec8f8909e1a85a5a79aed0573af7901f53120dd2a29771dd0b3ef48e12328b0", size = 14322, upload-time = "2025-08-22T03:02:21.918Z" },
]
[[package]]