Compare commits

..

11 Commits

Author SHA1 Message Date
dependabot[bot]
d3f424fd8f chore(deps-dev): bump types-aiofiles
Some checks are pending
Build uv cache / build-cache (3.10) (push) Waiting to run
Build uv cache / build-cache (3.11) (push) Waiting to run
Build uv cache / build-cache (3.12) (push) Waiting to run
Build uv cache / build-cache (3.13) (push) Waiting to run
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Notify Downstream / notify-downstream (push) Waiting to run
Bumps [types-aiofiles](https://github.com/typeshed-internal/stub_uploader) from 24.1.0.20250822 to 25.1.0.20251011.
- [Commits](https://github.com/typeshed-internal/stub_uploader/commits)

---
updated-dependencies:
- dependency-name: types-aiofiles
  dependency-version: 25.1.0.20251011
  dependency-type: direct:development
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-02-03 12:02:28 -05:00
Matt Aitchison
fee9445067 fix: add .python-version to fix Dependabot uv updates (#4352)
Dependabot's uv updater defaults to Python 3.14.2, which is incompatible
with the project's requires-python constraint (>=3.10, <3.14). Adding
.python-version pins the Python version to 3.13 for dependency updates.

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-02-03 10:55:01 -06:00
Greyson LaLonde
a3c01265ee feat: add version check & integrate update notices 2026-02-03 10:17:50 -05:00
Matt Aitchison
aa7e7785bc chore: group dependabot security updates into single PR (#4351)
Configure dependabot to batch security updates together while keeping
regular version updates as separate PRs.
2026-02-03 08:53:28 -06:00
Thiago Moretto
e30645e855 limit stagehand dep version to 0.5.9 due breaking changes (#4339)
* limit to 0.5.9 due breaking changes + add env vars requirements

* fix tool spec extract that was ignoring with default

* original tool spec

* update spec
2026-02-03 09:43:24 -05:00
Greyson LaLonde
c1d2801be2 fix: reject reserved script names for crew folders 2026-02-03 09:16:55 -05:00
Greyson LaLonde
6a8483fcb6 fix: resolve race condition in guardrail event emission test 2026-02-03 09:06:48 -05:00
Greyson LaLonde
5fb602dff2 fix: replace timing-based concurrency test with state tracking 2026-02-03 08:58:51 -05:00
Greyson LaLonde
b90cff580a fix: relax openai and litellm dependency constraints 2026-02-03 08:51:55 -05:00
Vini Brasil
576b74b2ef Add call_id to LLM events for correlating requests (#4281)
When monitoring LLM events, consumers need to know which events belong
to the same API call. Before this change, there was no way to correlate
LLMCallStartedEvent, LLMStreamChunkEvent, and LLMCallCompletedEvent
belonging to the same request.
2026-02-03 10:10:33 -03:00
Greyson LaLonde
7590d4c6e3 fix: enforce additionalProperties=false in schemas
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
* fix: enforce additionalProperties=false in schemas

* fix: ensure nested items have required properties
2026-02-02 22:19:04 -05:00
39 changed files with 18787 additions and 1500 deletions

View File

@@ -5,7 +5,12 @@
version: 2
updates:
- package-ecosystem: uv # See documentation for possible values
directory: "/" # Location of package manifests
- package-ecosystem: uv
directory: "/"
schedule:
interval: "weekly"
groups:
security-updates:
applies-to: security-updates
patterns:
- "*"

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.13

View File

@@ -8,8 +8,9 @@ from typing import Any
from crewai.tools.base_tool import BaseTool, EnvVar
from pydantic import BaseModel
from pydantic.fields import FieldInfo
from pydantic.json_schema import GenerateJsonSchema
from pydantic_core import PydanticOmit
from pydantic_core import PydanticOmit, PydanticUndefined
from crewai_tools import tools
@@ -44,6 +45,9 @@ class ToolSpecExtractor:
schema = self._unwrap_schema(core_schema)
fields = schema.get("schema", {}).get("fields", {})
# Use model_fields to get defaults (handles both default and default_factory)
model_fields = tool_class.model_fields
tool_info = {
"name": tool_class.__name__,
"humanized_name": self._extract_field_default(
@@ -54,9 +58,9 @@ class ToolSpecExtractor:
).strip(),
"run_params_schema": self._extract_params(fields.get("args_schema")),
"init_params_schema": self._extract_init_params(tool_class),
"env_vars": self._extract_env_vars(fields.get("env_vars")),
"package_dependencies": self._extract_field_default(
fields.get("package_dependencies"), fallback=[]
"env_vars": self._extract_env_vars_from_model_fields(model_fields),
"package_dependencies": self._extract_package_deps_from_model_fields(
model_fields
),
}
@@ -103,10 +107,27 @@ class ToolSpecExtractor:
return {}
@staticmethod
def _extract_env_vars(
env_vars_field: dict[str, Any] | None,
def _get_field_default(field: FieldInfo | None) -> Any:
"""Get default value from a FieldInfo, handling both default and default_factory."""
if not field:
return None
default_value = field.default
if default_value is PydanticUndefined or default_value is None:
if field.default_factory:
return field.default_factory()
return None
return default_value
@staticmethod
def _extract_env_vars_from_model_fields(
model_fields: dict[str, FieldInfo],
) -> list[dict[str, Any]]:
if not env_vars_field:
default_value = ToolSpecExtractor._get_field_default(
model_fields.get("env_vars")
)
if not default_value:
return []
return [
@@ -116,10 +137,22 @@ class ToolSpecExtractor:
"required": env_var.required,
"default": env_var.default,
}
for env_var in env_vars_field.get("schema", {}).get("default", [])
for env_var in default_value
if isinstance(env_var, EnvVar)
]
@staticmethod
def _extract_package_deps_from_model_fields(
model_fields: dict[str, FieldInfo],
) -> list[str]:
default_value = ToolSpecExtractor._get_field_default(
model_fields.get("package_dependencies")
)
if not isinstance(default_value, list):
return []
return default_value
@staticmethod
def _extract_init_params(tool_class: type[BaseTool]) -> dict[str, Any]:
ignored_init_params = [
@@ -152,7 +185,7 @@ class ToolSpecExtractor:
if __name__ == "__main__":
output_file = Path(__file__).parent / "tool.specs.json"
output_file = Path(__file__).parent.parent.parent / "tool.specs.json"
extractor = ToolSpecExtractor()
extractor.extract_all_tools()

View File

@@ -13,16 +13,10 @@ from crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder impor
from crewai_tools.tools.crewai_platform_tools.crewai_platform_tools import (
CrewaiPlatformTools,
)
from crewai_tools.tools.crewai_platform_tools.file_hook import (
process_file_markers,
register_file_processing_hook,
)
__all__ = [
"CrewAIPlatformActionTool",
"CrewaiPlatformToolBuilder",
"CrewaiPlatformTools",
"process_file_markers",
"register_file_processing_hook",
]

View File

@@ -2,8 +2,6 @@
import json
import os
import re
import tempfile
from typing import Any
from crewai.tools import BaseTool
@@ -16,26 +14,6 @@ from crewai_tools.tools.crewai_platform_tools.misc import (
get_platform_integration_token,
)
_FILE_MARKER_PREFIX = "__CREWAI_FILE__"
_MIME_TO_EXTENSION = {
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
"application/vnd.ms-excel": ".xls",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
"application/msword": ".doc",
"application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
"application/vnd.ms-powerpoint": ".ppt",
"application/pdf": ".pdf",
"image/png": ".png",
"image/jpeg": ".jpg",
"image/gif": ".gif",
"image/webp": ".webp",
"text/plain": ".txt",
"text/csv": ".csv",
"application/json": ".json",
"application/zip": ".zip",
}
class CrewAIPlatformActionTool(BaseTool):
action_name: str = Field(default="", description="The name of the action")
@@ -93,18 +71,10 @@ class CrewAIPlatformActionTool(BaseTool):
url=api_url,
headers=headers,
json=payload,
timeout=300,
stream=True,
timeout=60,
verify=os.environ.get("CREWAI_FACTORY", "false").lower() != "true",
)
content_type = response.headers.get("Content-Type", "")
# Check if response is binary (non-JSON)
if "application/json" not in content_type:
return self._handle_binary_response(response)
# Normal JSON response
data = response.json()
if not response.ok:
if isinstance(data, dict):
@@ -121,49 +91,3 @@ class CrewAIPlatformActionTool(BaseTool):
except Exception as e:
return f"Error executing action {self.action_name}: {e!s}"
def _handle_binary_response(self, response: requests.Response) -> str:
"""Handle binary streaming response from the API.
Streams the binary content to a temporary file and returns a marker
that can be processed by the file hook to inject the file into the
LLM context.
Args:
response: The streaming HTTP response with binary content.
Returns:
A file marker string in the format:
__CREWAI_FILE__:filename:content_type:file_path
"""
content_type = response.headers.get("Content-Type", "application/octet-stream")
filename = self._extract_filename_from_headers(response.headers)
extension = self._get_file_extension(content_type, filename)
with tempfile.NamedTemporaryFile(
delete=False, suffix=extension, prefix="crewai_"
) as tmp_file:
for chunk in response.iter_content(chunk_size=8192):
tmp_file.write(chunk)
tmp_path = tmp_file.name
return f"{_FILE_MARKER_PREFIX}:{filename}:{content_type}:{tmp_path}"
def _extract_filename_from_headers(
self, headers: requests.structures.CaseInsensitiveDict
) -> str:
content_disposition = headers.get("Content-Disposition", "")
if content_disposition:
match = re.search(r'filename="?([^";\s]+)"?', content_disposition)
if match:
return match.group(1)
return "downloaded_file"
def _get_file_extension(self, content_type: str, filename: str) -> str:
if "." in filename:
return "." + filename.rsplit(".", 1)[-1]
base_content_type = content_type.split(";")[0].strip()
return _MIME_TO_EXTENSION.get(base_content_type, "")

View File

@@ -6,9 +6,6 @@ from crewai_tools.adapters.tool_collection import ToolCollection
from crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder import (
CrewaiPlatformToolBuilder,
)
from crewai_tools.tools.crewai_platform_tools.file_hook import (
register_file_processing_hook,
)
logger = logging.getLogger(__name__)
@@ -25,8 +22,6 @@ def CrewaiPlatformTools( # noqa: N802
Returns:
A list of BaseTool instances for platform actions
"""
register_file_processing_hook()
builder = CrewaiPlatformToolBuilder(apps=apps)
return builder.tools() # type: ignore

View File

@@ -1,132 +0,0 @@
"""File processing hook for CrewAI Platform Tools.
This module provides a hook that processes file markers returned by platform tools
and injects the files into the LLM context for native file handling.
"""
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from crewai.hooks.tool_hooks import ToolCallHookContext
logger = logging.getLogger(__name__)
_FILE_MARKER_PREFIX = "__CREWAI_FILE__"
_hook_registered = False
def process_file_markers(context: ToolCallHookContext) -> str | None:
"""Process file markers in tool results and inject files into context.
This hook detects file markers returned by platform tools (e.g., download_file)
and converts them into FileInput objects that are attached to the hook context.
The agent executor will then inject these files into the tool message for
native LLM file handling.
The marker format is:
__CREWAI_FILE__:filename:content_type:file_path
Args:
context: The tool call hook context containing the tool result.
Returns:
A human-readable message if a file was processed, None otherwise.
"""
result = context.tool_result
if not result or not result.startswith(_FILE_MARKER_PREFIX):
return None
try:
parts = result.split(":", 3)
if len(parts) < 4:
logger.warning(f"Invalid file marker format: {result[:100]}")
return None
_, filename, content_type, file_path = parts
if not os.path.isfile(file_path):
logger.error(f"File not found: {file_path}")
return f"Error: Downloaded file not found at {file_path}"
try:
from crewai_files import File
except ImportError:
logger.warning(
"crewai_files not installed. File will not be attached to LLM context."
)
return (
f"Downloaded file: {filename} ({content_type}). "
f"File saved at: {file_path}. "
"Note: Install crewai_files for native LLM file handling."
)
file = File(source=file_path, content_type=content_type, filename=filename)
context.files = {filename: file}
file_size = os.path.getsize(file_path)
size_str = _format_file_size(file_size)
return f"Downloaded file: {filename} ({content_type}, {size_str}). File is attached for LLM analysis."
except Exception as e:
logger.exception(f"Error processing file marker: {e}")
return f"Error processing downloaded file: {e}"
def _format_file_size(size_bytes: int) -> str:
"""Format file size in human-readable format.
Args:
size_bytes: Size in bytes.
Returns:
Human-readable size string.
"""
if size_bytes < 1024:
return f"{size_bytes} bytes"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.1f} MB"
else:
return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"
def register_file_processing_hook() -> bool:
"""Register the file processing hook globally.
This function should be called once during application initialization
to enable automatic file injection for platform tools.
Returns:
True if the hook was registered, False if it was already registered
or if registration failed.
"""
global _hook_registered
if _hook_registered:
logger.debug("File processing hook already registered")
return False
try:
from crewai.hooks import register_after_tool_call_hook
register_after_tool_call_hook(process_file_markers)
_hook_registered = True
logger.info("File processing hook registered successfully")
return True
except ImportError:
logger.warning(
"crewai.hooks not available. File processing hook not registered."
)
return False
except Exception as e:
logger.exception(f"Failed to register file processing hook: {e}")
return False

View File

@@ -4,7 +4,7 @@ import os
import re
from typing import Any
from crewai.tools import BaseTool
from crewai.tools import BaseTool, EnvVar
from pydantic import BaseModel, Field
@@ -137,7 +137,21 @@ class StagehandTool(BaseTool):
- 'observe': For finding elements in a specific area
"""
args_schema: type[BaseModel] = StagehandToolSchema
package_dependencies: list[str] = Field(default_factory=lambda: ["stagehand"])
package_dependencies: list[str] = Field(default_factory=lambda: ["stagehand<=0.5.9"])
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
name="BROWSERBASE_API_KEY",
description="API key for Browserbase services",
required=False,
),
EnvVar(
name="BROWSERBASE_PROJECT_ID",
description="Project ID for Browserbase services",
required=False,
),
]
)
# Stagehand configuration
api_key: str | None = None

View File

@@ -23,23 +23,26 @@ class MockTool(BaseTool):
)
my_parameter: str = Field("This is default value", description="What a description")
my_parameter_bool: bool = Field(False)
# Use default_factory like real tools do (not direct default)
package_dependencies: list[str] = Field(
["this-is-a-required-package", "another-required-package"], description=""
default_factory=lambda: ["this-is-a-required-package", "another-required-package"]
)
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
name="SERPER_API_KEY",
description="API key for Serper",
required=True,
default=None,
),
EnvVar(
name="API_RATE_LIMIT",
description="API rate limit",
required=False,
default="100",
),
]
)
env_vars: list[EnvVar] = [
EnvVar(
name="SERPER_API_KEY",
description="API key for Serper",
required=True,
default=None,
),
EnvVar(
name="API_RATE_LIMIT",
description="API rate limit",
required=False,
default="100",
),
]
@pytest.fixture

View File

@@ -2,7 +2,6 @@ import unittest
from unittest.mock import Mock, patch
from crewai_tools.tools.crewai_platform_tools import CrewaiPlatformTools
from crewai_tools.tools.crewai_platform_tools import file_hook
class TestCrewaiPlatformTools(unittest.TestCase):
@@ -114,64 +113,3 @@ class TestCrewaiPlatformTools(unittest.TestCase):
with self.assertRaises(ValueError) as context:
CrewaiPlatformTools(apps=["github"])
assert "No platform integration token found" in str(context.exception)
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token"})
@patch(
"crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder.requests.get"
)
@patch(
"crewai_tools.tools.crewai_platform_tools.crewai_platform_tools.register_file_processing_hook"
)
def test_crewai_platform_tools_registers_file_hook(
self, mock_register_hook, mock_get
):
mock_response = Mock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {"actions": {"github": []}}
mock_get.return_value = mock_response
CrewaiPlatformTools(apps=["github"])
mock_register_hook.assert_called_once()
class TestFileHook(unittest.TestCase):
def setUp(self):
file_hook._hook_registered = False
def tearDown(self):
file_hook._hook_registered = False
@patch("crewai.hooks.register_after_tool_call_hook")
def test_register_hook_is_idempotent(self, mock_register):
"""Test hook registration succeeds once and is idempotent."""
assert file_hook.register_file_processing_hook() is True
assert file_hook._hook_registered is True
mock_register.assert_called_once_with(file_hook.process_file_markers)
# Second call should return False and not register again
assert file_hook.register_file_processing_hook() is False
mock_register.assert_called_once()
def test_process_file_markers_ignores_non_file_results(self):
"""Test that non-file-marker results return None."""
test_cases = [
None, # Empty result
"Regular tool output", # Non-marker
"__CREWAI_FILE__:incomplete", # Invalid format (missing parts)
]
for tool_result in test_cases:
mock_context = Mock()
mock_context.tool_result = tool_result
assert file_hook.process_file_markers(mock_context) is None
def test_format_file_size(self):
"""Test file size formatting across units."""
cases = [
(500, "500 bytes"),
(1024, "1.0 KB"),
(1536, "1.5 KB"),
(1024 * 1024, "1.0 MB"),
(1024 * 1024 * 1024, "1.0 GB"),
]
for size_bytes, expected in cases:
assert file_hook._format_file_size(size_bytes) == expected

File diff suppressed because it is too large Load Diff

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
# Core Dependencies
"pydantic~=2.11.9",
"openai~=1.83.0",
"openai>=1.83.0,<3",
"instructor>=1.3.3",
# Text Processing
"pdfplumber~=0.11.4",
@@ -78,7 +78,7 @@ voyageai = [
"voyageai~=0.3.5",
]
litellm = [
"litellm~=1.74.9",
"litellm>=1.74.9,<3",
]
bedrock = [
"boto3~=1.40.45",

View File

@@ -930,10 +930,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
"name": func_name,
"content": result,
}
if after_hook_context.files:
tool_message["files"] = after_hook_context.files
self.messages.append(tool_message)
# Log the tool execution

View File

@@ -1,10 +1,13 @@
from typing import Any
DEFAULT_CREWAI_ENTERPRISE_URL = "https://app.crewai.com"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_PROVIDER = "workos"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE = "client_01JNJQWBJ4SPFN3SWJM5T7BDG8"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID = "client_01JYT06R59SP0NXYGD994NFXXX"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN = "login.crewai.com"
ENV_VARS = {
ENV_VARS: dict[str, list[dict[str, Any]]] = {
"openai": [
{
"prompt": "Enter your OPENAI API key (press Enter to skip)",
@@ -112,7 +115,7 @@ ENV_VARS = {
}
PROVIDERS = [
PROVIDERS: list[str] = [
"openai",
"anthropic",
"gemini",
@@ -127,7 +130,7 @@ PROVIDERS = [
"sambanova",
]
MODELS = {
MODELS: dict[str, list[str]] = {
"openai": [
"gpt-4",
"gpt-4.1",

View File

@@ -3,6 +3,7 @@ import shutil
import sys
import click
import tomli
from crewai.cli.constants import ENV_VARS, MODELS
from crewai.cli.provider import (
@@ -13,7 +14,31 @@ from crewai.cli.provider import (
from crewai.cli.utils import copy_template, load_env_vars, write_env_file
def create_folder_structure(name, parent_folder=None):
def get_reserved_script_names() -> set[str]:
"""Get reserved script names from pyproject.toml template.
Returns:
Set of reserved script names that would conflict with crew folder names.
"""
package_dir = Path(__file__).parent
template_path = package_dir / "templates" / "crew" / "pyproject.toml"
with open(template_path, "r") as f:
template_content = f.read()
template_content = template_content.replace("{{folder_name}}", "_placeholder_")
template_content = template_content.replace("{{name}}", "placeholder")
template_content = template_content.replace("{{crew_name}}", "Placeholder")
template_data = tomli.loads(template_content)
script_names = set(template_data.get("project", {}).get("scripts", {}).keys())
script_names.discard("_placeholder_")
return script_names
def create_folder_structure(
name: str, parent_folder: str | None = None
) -> tuple[Path, str, str]:
import keyword
import re
@@ -51,6 +76,14 @@ def create_folder_structure(name, parent_folder=None):
f"Project name '{name}' would generate invalid Python module name '{folder_name}'"
)
reserved_names = get_reserved_script_names()
if folder_name in reserved_names:
raise ValueError(
f"Project name '{name}' would generate folder name '{folder_name}' which is reserved. "
f"Reserved names are: {', '.join(sorted(reserved_names))}. "
"Please choose a different name."
)
class_name = name.replace("_", " ").replace("-", " ").title().replace(" ", "")
class_name = re.sub(r"[^a-zA-Z0-9_]", "", class_name)
@@ -114,7 +147,9 @@ def create_folder_structure(name, parent_folder=None):
return folder_path, folder_name, class_name
def copy_template_files(folder_path, name, class_name, parent_folder):
def copy_template_files(
folder_path: Path, name: str, class_name: str, parent_folder: str | None
) -> None:
package_dir = Path(__file__).parent
templates_dir = package_dir / "templates" / "crew"
@@ -155,7 +190,12 @@ def copy_template_files(folder_path, name, class_name, parent_folder):
copy_template(src_file, dst_file, name, class_name, folder_path.name)
def create_crew(name, provider=None, skip_provider=False, parent_folder=None):
def create_crew(
name: str,
provider: str | None = None,
skip_provider: bool = False,
parent_folder: str | None = None,
) -> None:
folder_path, folder_name, class_name = create_folder_structure(name, parent_folder)
env_vars = load_env_vars(folder_path)
if not skip_provider:
@@ -189,7 +229,9 @@ def create_crew(name, provider=None, skip_provider=False, parent_folder=None):
if selected_provider is None: # User typed 'q'
click.secho("Exiting...", fg="yellow")
sys.exit(0)
if selected_provider: # Valid selection
if selected_provider and isinstance(
selected_provider, str
): # Valid selection
break
click.secho(
"No provider selected. Please try again or press 'q' to exit.", fg="red"

View File

@@ -1,8 +1,10 @@
from collections import defaultdict
from collections.abc import Sequence
import json
import os
from pathlib import Path
import time
from typing import Any
import certifi
import click
@@ -11,16 +13,15 @@ import requests
from crewai.cli.constants import JSON_URL, MODELS, PROVIDERS
def select_choice(prompt_message, choices):
"""
Presents a list of choices to the user and prompts them to select one.
def select_choice(prompt_message: str, choices: Sequence[str]) -> str | None:
"""Presents a list of choices to the user and prompts them to select one.
Args:
- prompt_message (str): The message to display to the user before presenting the choices.
- choices (list): A list of options to present to the user.
prompt_message: The message to display to the user before presenting the choices.
choices: A list of options to present to the user.
Returns:
- str: The selected choice from the list, or None if the user chooses to quit.
The selected choice from the list, or None if the user chooses to quit.
"""
provider_models = get_provider_data()
@@ -52,16 +53,14 @@ def select_choice(prompt_message, choices):
)
def select_provider(provider_models):
"""
Presents a list of providers to the user and prompts them to select one.
def select_provider(provider_models: dict[str, list[str]]) -> str | None | bool:
"""Presents a list of providers to the user and prompts them to select one.
Args:
- provider_models (dict): A dictionary of provider models.
provider_models: A dictionary of provider models.
Returns:
- str: The selected provider
- None: If user explicitly quits
The selected provider, None if user explicitly quits, or False if no selection.
"""
predefined_providers = [p.lower() for p in PROVIDERS]
all_providers = sorted(set(predefined_providers + list(provider_models.keys())))
@@ -80,16 +79,15 @@ def select_provider(provider_models):
return provider.lower() if provider else False
def select_model(provider, provider_models):
"""
Presents a list of models for a given provider to the user and prompts them to select one.
def select_model(provider: str, provider_models: dict[str, list[str]]) -> str | None:
"""Presents a list of models for a given provider to the user and prompts them to select one.
Args:
- provider (str): The provider for which to select a model.
- provider_models (dict): A dictionary of provider models.
provider: The provider for which to select a model.
provider_models: A dictionary of provider models.
Returns:
- str: The selected model, or None if the operation is aborted or an invalid selection is made.
The selected model, or None if the operation is aborted or an invalid selection is made.
"""
predefined_providers = [p.lower() for p in PROVIDERS]
@@ -107,16 +105,17 @@ def select_model(provider, provider_models):
)
def load_provider_data(cache_file, cache_expiry):
"""
Loads provider data from a cache file if it exists and is not expired. If the cache is expired or corrupted, it fetches the data from the web.
def load_provider_data(cache_file: Path, cache_expiry: int) -> dict[str, Any] | None:
"""Loads provider data from a cache file if it exists and is not expired.
If the cache is expired or corrupted, it fetches the data from the web.
Args:
- cache_file (Path): The path to the cache file.
- cache_expiry (int): The cache expiry time in seconds.
cache_file: The path to the cache file.
cache_expiry: The cache expiry time in seconds.
Returns:
- dict or None: The loaded provider data or None if the operation fails.
The loaded provider data or None if the operation fails.
"""
current_time = time.time()
if (
@@ -137,32 +136,31 @@ def load_provider_data(cache_file, cache_expiry):
return fetch_provider_data(cache_file)
def read_cache_file(cache_file):
"""
Reads and returns the JSON content from a cache file. Returns None if the file contains invalid JSON.
def read_cache_file(cache_file: Path) -> dict[str, Any] | None:
"""Reads and returns the JSON content from a cache file.
Args:
- cache_file (Path): The path to the cache file.
cache_file: The path to the cache file.
Returns:
- dict or None: The JSON content of the cache file or None if the JSON is invalid.
The JSON content of the cache file or None if the JSON is invalid.
"""
try:
with open(cache_file, "r") as f:
return json.load(f)
data: dict[str, Any] = json.load(f)
return data
except json.JSONDecodeError:
return None
def fetch_provider_data(cache_file):
"""
Fetches provider data from a specified URL and caches it to a file.
def fetch_provider_data(cache_file: Path) -> dict[str, Any] | None:
"""Fetches provider data from a specified URL and caches it to a file.
Args:
- cache_file (Path): The path to the cache file.
cache_file: The path to the cache file.
Returns:
- dict or None: The fetched provider data or None if the operation fails.
The fetched provider data or None if the operation fails.
"""
ssl_config = os.environ["SSL_CERT_FILE"] = certifi.where()
@@ -180,36 +178,39 @@ def fetch_provider_data(cache_file):
return None
def download_data(response):
"""
Downloads data from a given HTTP response and returns the JSON content.
def download_data(response: requests.Response) -> dict[str, Any]:
"""Downloads data from a given HTTP response and returns the JSON content.
Args:
- response (requests.Response): The HTTP response object.
response: The HTTP response object.
Returns:
- dict: The JSON content of the response.
The JSON content of the response.
"""
total_size = int(response.headers.get("content-length", 0))
block_size = 8192
data_chunks = []
data_chunks: list[bytes] = []
bar: Any
with click.progressbar(
length=total_size, label="Downloading", show_pos=True
) as progress_bar:
) as bar:
for chunk in response.iter_content(block_size):
if chunk:
data_chunks.append(chunk)
progress_bar.update(len(chunk))
bar.update(len(chunk))
data_content = b"".join(data_chunks)
return json.loads(data_content.decode("utf-8"))
result: dict[str, Any] = json.loads(data_content.decode("utf-8"))
return result
def get_provider_data():
"""
Retrieves provider data from a cache file, filters out models based on provider criteria, and returns a dictionary of providers mapped to their models.
def get_provider_data() -> dict[str, list[str]] | None:
"""Retrieves provider data from a cache file.
Filters out models based on provider criteria, and returns a dictionary of providers
mapped to their models.
Returns:
- dict or None: A dictionary of providers mapped to their models or None if the operation fails.
A dictionary of providers mapped to their models or None if the operation fails.
"""
cache_dir = Path.home() / ".crewai"
cache_dir.mkdir(exist_ok=True)

View File

@@ -1,6 +1,107 @@
"""Version utilities for CrewAI CLI."""
from collections.abc import Mapping
from datetime import datetime, timedelta
from functools import lru_cache
import importlib.metadata
import json
from pathlib import Path
from typing import Any, cast
from urllib import request
from urllib.error import URLError
import appdirs
from packaging.version import InvalidVersion, parse
@lru_cache(maxsize=1)
def _get_cache_file() -> Path:
"""Get the path to the version cache file.
Cached to avoid repeated filesystem operations.
"""
cache_dir = Path(appdirs.user_cache_dir("crewai"))
cache_dir.mkdir(parents=True, exist_ok=True)
return cache_dir / "version_cache.json"
def get_crewai_version() -> str:
"""Get the version number of CrewAI running the CLI"""
"""Get the version number of CrewAI running the CLI."""
return importlib.metadata.version("crewai")
def _is_cache_valid(cache_data: Mapping[str, Any]) -> bool:
"""Check if the cache is still valid, less than 24 hours old."""
if "timestamp" not in cache_data:
return False
try:
cache_time = datetime.fromisoformat(str(cache_data["timestamp"]))
return datetime.now() - cache_time < timedelta(hours=24)
except (ValueError, TypeError):
return False
def get_latest_version_from_pypi(timeout: int = 2) -> str | None:
"""Get the latest version of CrewAI from PyPI.
Args:
timeout: Request timeout in seconds.
Returns:
Latest version string or None if unable to fetch.
"""
cache_file = _get_cache_file()
if cache_file.exists():
try:
cache_data = json.loads(cache_file.read_text())
if _is_cache_valid(cache_data):
return cast(str | None, cache_data.get("version"))
except (json.JSONDecodeError, OSError):
pass
try:
with request.urlopen(
"https://pypi.org/pypi/crewai/json", timeout=timeout
) as response:
data = json.loads(response.read())
latest_version = cast(str, data["info"]["version"])
cache_data = {
"version": latest_version,
"timestamp": datetime.now().isoformat(),
}
cache_file.write_text(json.dumps(cache_data))
return latest_version
except (URLError, json.JSONDecodeError, KeyError, OSError):
return None
def check_version() -> tuple[str, str | None]:
"""Check current and latest versions.
Returns:
Tuple of (current_version, latest_version).
latest_version is None if unable to fetch from PyPI.
"""
current = get_crewai_version()
latest = get_latest_version_from_pypi()
return current, latest
def is_newer_version_available() -> tuple[bool, str, str | None]:
"""Check if a newer version is available.
Returns:
Tuple of (is_newer, current_version, latest_version).
"""
current, latest = check_version()
if latest is None:
return False, current, None
try:
return parse(latest) > parse(current), current, latest
except (InvalidVersion, TypeError):
return False, current, latest

View File

@@ -10,6 +10,7 @@ class LLMEventBase(BaseEvent):
from_task: Any | None = None
from_agent: Any | None = None
model: str | None = None
call_id: str
def __init__(self, **data: Any) -> None:
if data.get("from_task"):

View File

@@ -1,11 +1,14 @@
import os
import threading
from typing import Any, ClassVar
from typing import Any, ClassVar, cast
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.text import Text
from crewai.cli.version import is_newer_version_available
class ConsoleFormatter:
tool_usage_counts: ClassVar[dict[str, int]] = {}
@@ -35,6 +38,39 @@ class ConsoleFormatter:
padding=(1, 2),
)
def _show_version_update_message_if_needed(self) -> None:
"""Show version update message if a newer version is available.
Only displays when verbose mode is enabled and not running in CI/CD.
"""
if not self.verbose:
return
if os.getenv("CI", "").lower() in ("true", "1"):
return
try:
is_newer, current, latest = is_newer_version_available()
if is_newer and latest:
message = f"""A new version of CrewAI is available!
Current version: {current}
Latest version: {latest}
To update, run: uv sync --upgrade-package crewai"""
panel = Panel(
message,
title="✨ Update Available ✨",
border_style="yellow",
padding=(1, 2),
)
self.console.print(panel)
self.console.print()
except Exception: # noqa: S110
# Silently ignore errors in version check - it's non-critical
pass
def _show_tracing_disabled_message_if_needed(self) -> None:
"""Show tracing disabled message if tracing is not enabled."""
from crewai.events.listeners.tracing.utils import (
@@ -176,9 +212,10 @@ To enable tracing, do any one of these:
if not self.verbose:
return
# Reset the crew completion event for this new crew execution
ConsoleFormatter.crew_completion_printed.clear()
self._show_version_update_message_if_needed()
content = self.create_status_content(
"Crew Execution Started",
crew_name,
@@ -237,6 +274,8 @@ To enable tracing, do any one of these:
def handle_flow_started(self, flow_name: str, flow_id: str) -> None:
"""Show flow started panel."""
self._show_version_update_message_if_needed()
content = Text()
content.append("Flow Started\n", style="blue bold")
content.append("Name: ", style="white")
@@ -885,7 +924,7 @@ To enable tracing, do any one of these:
is_a2a_delegation = False
try:
output_data = json.loads(formatted_answer.output)
output_data = json.loads(cast(str, formatted_answer.output))
if isinstance(output_data, dict):
if output_data.get("is_a2a") is True:
is_a2a_delegation = True

View File

@@ -814,10 +814,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"name": func_name,
"content": result,
}
if after_hook_context.files:
tool_message["files"] = after_hook_context.files
self.state.messages.append(tool_message)
# Log the tool execution

View File

@@ -5,7 +5,6 @@ from typing import TYPE_CHECKING, Any
from crewai.events.event_listener import event_listener
from crewai.hooks.types import AfterToolCallHookType, BeforeToolCallHookType
from crewai.utilities.printer import Printer
from crewai.utilities.types import FileInput
if TYPE_CHECKING:
@@ -35,9 +34,6 @@ class ToolCallHookContext:
crew: Crew instance (may be None)
tool_result: Tool execution result (only set for after_tool_call hooks).
Can be modified by returning a new string from after_tool_call hook.
files: Optional dictionary of files to attach to the tool message.
Can be set by after_tool_call hooks to inject files into the LLM context.
These files will be formatted according to the LLM provider's requirements.
"""
def __init__(
@@ -68,7 +64,6 @@ class ToolCallHookContext:
self.task = task
self.crew = crew
self.tool_result = tool_result
self.files: dict[str, FileInput] | None = None
def request_human_input(
self,

View File

@@ -37,7 +37,7 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, get_current_call_id, llm_call_context
from crewai.llms.constants import (
ANTHROPIC_MODELS,
AZURE_MODELS,
@@ -770,7 +770,7 @@ class LLM(BaseLLM):
chunk_content = None
response_id = None
if hasattr(chunk,'id'):
if hasattr(chunk, "id"):
response_id = chunk.id
# Safely extract content from various chunk formats
@@ -827,7 +827,7 @@ class LLM(BaseLLM):
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_id=response_id
response_id=response_id,
)
if result is not None:
@@ -849,7 +849,8 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
call_type=LLMCallType.LLM_CALL,
response_id=response_id
response_id=response_id,
call_id=get_current_call_id(),
),
)
# --- 4) Fallback to non-streaming if no content received
@@ -1015,7 +1016,10 @@ class LLM(BaseLLM):
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
)
raise Exception(f"Failed to get streaming response: {e!s}") from e
@@ -1048,7 +1052,8 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
call_type=LLMCallType.TOOL_CALL,
response_id=response_id
response_id=response_id,
call_id=get_current_call_id(),
),
)
@@ -1476,7 +1481,8 @@ class LLM(BaseLLM):
chunk=chunk_content,
from_task=from_task,
from_agent=from_agent,
response_id=response_id
response_id=response_id,
call_id=get_current_call_id(),
),
)
@@ -1619,7 +1625,12 @@ class LLM(BaseLLM):
logging.error(f"Error executing function '{function_name}': {e}")
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=f"Tool execution error: {e!s}"),
event=LLMCallFailedEvent(
error=f"Tool execution error: {e!s}",
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
)
crewai_event_bus.emit(
self,
@@ -1669,108 +1680,117 @@ class LLM(BaseLLM):
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
with llm_call_context() as call_id:
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=call_id,
),
)
# --- 2) Validate parameters before proceeding with the call
self._validate_call_params()
# --- 2) Validate parameters before proceeding with the call
self._validate_call_params()
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
if not self._invoke_before_llm_call_hooks(messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
else:
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
logging.info("Retrying LLM call without the unsupported 'stop'")
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
logging.info("Retrying LLM call without the unsupported 'stop'")
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
raise
async def acall(
self,
@@ -1808,43 +1828,54 @@ class LLM(BaseLLM):
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
with llm_call_context() as call_id:
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=call_id,
),
)
self._validate_call_params()
self._validate_call_params()
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# Process file attachments asynchronously before preparing params
messages = await self._aprocess_message_files(messages)
# Process file attachments asynchronously before preparing params
messages = await self._aprocess_message_files(messages)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(
messages, tools, skip_file_processing=True
)
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(
messages, tools, skip_file_processing=True
)
if self.stream:
return await self._ahandle_streaming_response(
if self.stream:
return await self._ahandle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._ahandle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
@@ -1852,52 +1883,50 @@ class LLM(BaseLLM):
from_agent=from_agent,
response_model=response_model,
)
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
return await self._ahandle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
logging.info("Retrying LLM call without the unsupported 'stop'")
return await self.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
logging.info("Retrying LLM call without the unsupported 'stop'")
return await self.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
raise
def _handle_emit_call_events(
self,
@@ -1925,6 +1954,7 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)

View File

@@ -7,11 +7,15 @@ in CrewAI, including common functionality for native SDK implementations.
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Generator
from contextlib import contextmanager
import contextvars
from datetime import datetime
import json
import logging
import re
from typing import TYPE_CHECKING, Any, Final
import uuid
from pydantic import BaseModel
@@ -50,6 +54,32 @@ DEFAULT_CONTEXT_WINDOW_SIZE: Final[int] = 4096
DEFAULT_SUPPORTS_STOP_WORDS: Final[bool] = True
_JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTALL)
_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"_current_call_id", default=None
)
@contextmanager
def llm_call_context() -> Generator[str, None, None]:
"""Context manager that establishes an LLM call scope with a unique call_id."""
call_id = str(uuid.uuid4())
token = _current_call_id.set(call_id)
try:
yield call_id
finally:
_current_call_id.reset(token)
def get_current_call_id() -> str:
"""Get current call_id from context"""
call_id = _current_call_id.get()
if call_id is None:
logging.warning(
"LLM event emitted outside call context - generating fallback call_id"
)
return str(uuid.uuid4())
return call_id
class BaseLLM(ABC):
"""Abstract base class for LLM implementations.
@@ -351,6 +381,7 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -374,6 +405,7 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -394,6 +426,7 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -428,6 +461,7 @@ class BaseLLM(ABC):
from_agent=from_agent,
call_type=call_type,
response_id=response_id,
call_id=get_current_call_id(),
),
)

View File

@@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any, Final, Literal, TypeGuard, cast
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -266,35 +266,46 @@ class AnthropicCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Anthropic
formatted_messages, system_message = self._format_messages_for_anthropic(
messages
)
# Format messages for Anthropic
formatted_messages, system_message = (
self._format_messages_for_anthropic(messages)
)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
completion_params,
available_functions,
from_task,
@@ -302,21 +313,13 @@ class AnthropicCompletion(BaseLLM):
effective_response_model,
)
return self._handle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -342,28 +345,37 @@ class AnthropicCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages, system_message = self._format_messages_for_anthropic(
messages
)
formatted_messages, system_message = (
self._format_messages_for_anthropic(messages)
)
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
if self.stream:
return await self._ahandle_streaming_completion(
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
@@ -371,21 +383,13 @@ class AnthropicCompletion(BaseLLM):
effective_response_model,
)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _prepare_completion_params(
self,

View File

@@ -43,7 +43,7 @@ try:
)
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
except ImportError:
raise ImportError(
@@ -293,32 +293,44 @@ class AzureCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Azure
formatted_messages = self._format_messages_for_azure(messages)
effective_response_model = response_model or self.response_format
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# Format messages for Azure
formatted_messages = self._format_messages_for_azure(messages)
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
completion_params,
available_functions,
from_task,
@@ -326,16 +338,8 @@ class AzureCompletion(BaseLLM):
effective_response_model,
)
return self._handle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value]
except Exception as e:
return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value]
async def acall( # type: ignore[return]
self,
@@ -361,25 +365,35 @@ class AzureCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages_for_azure(messages)
effective_response_model = response_model or self.response_format
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
formatted_messages = self._format_messages_for_azure(messages)
if self.stream:
return await self._ahandle_streaming_completion(
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
@@ -387,16 +401,8 @@ class AzureCompletion(BaseLLM):
effective_response_model,
)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
self._handle_api_error(e, from_task, from_agent)
except Exception as e:
self._handle_api_error(e, from_task, from_agent)
def _prepare_completion_params(
self,

View File

@@ -11,7 +11,7 @@ from pydantic import BaseModel
from typing_extensions import Required
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -378,77 +378,90 @@ class BedrockCompletion(BaseLLM):
"""Call AWS Bedrock Converse API."""
effective_response_model = response_model or self.response_format
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Converse API
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare request body
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
# Add system message if present
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
# Format messages for Converse API
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare request body
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
# Add system message if present
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
)
# Add optional advanced features if configured
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef", cast(object, self.guardrail_config)
)
body["guardrailConfig"] = guardrail_config
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
# Add optional advanced features if configured
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef",
cast(object, self.guardrail_config),
)
body["guardrailConfig"] = guardrail_config
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.stream:
return self._handle_streaming_converse(
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.stream:
return self._handle_streaming_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_converse(
formatted_messages,
body,
available_functions,
@@ -457,26 +470,17 @@ class BedrockCompletion(BaseLLM):
effective_response_model,
)
return self._handle_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -514,69 +518,80 @@ class BedrockCompletion(BaseLLM):
'Install with: uv add "crewai[bedrock-async]"'
)
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
)
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef", cast(object, self.guardrail_config)
)
body["guardrailConfig"] = guardrail_config
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef",
cast(object, self.guardrail_config),
)
body["guardrailConfig"] = guardrail_config
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.stream:
return await self._ahandle_streaming_converse(
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.stream:
return await self._ahandle_streaming_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_converse(
formatted_messages,
body,
available_functions,
@@ -585,26 +600,17 @@ class BedrockCompletion(BaseLLM):
effective_response_model,
)
return await self._ahandle_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _handle_converse(
self,

View File

@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, Literal, cast
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -293,33 +293,45 @@ class GeminiCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
formatted_content, system_instruction = self._format_messages_for_gemini(
messages
)
formatted_content, system_instruction = (
self._format_messages_for_gemini(messages)
)
messages_for_hooks = self._convert_contents_to_dict(formatted_content)
messages_for_hooks = self._convert_contents_to_dict(formatted_content)
if not self._invoke_before_llm_call_hooks(messages_for_hooks, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(
messages_for_hooks, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
if self.stream:
return self._handle_streaming_completion(
if self.stream:
return self._handle_streaming_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
formatted_content,
config,
available_functions,
@@ -328,29 +340,20 @@ class GeminiCompletion(BaseLLM):
effective_response_model,
)
return self._handle_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -376,28 +379,38 @@ class GeminiCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
formatted_content, system_instruction = self._format_messages_for_gemini(
messages
)
formatted_content, system_instruction = (
self._format_messages_for_gemini(messages)
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
if self.stream:
return await self._ahandle_streaming_completion(
if self.stream:
return await self._ahandle_streaming_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
formatted_content,
config,
available_functions,
@@ -406,29 +419,20 @@ class GeminiCompletion(BaseLLM):
effective_response_model,
)
return await self._ahandle_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _prepare_generation_config(
self,

View File

@@ -17,7 +17,7 @@ from openai.types.responses import Response
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -382,23 +382,35 @@ class OpenAICompletion(BaseLLM):
Returns:
Completion response or tool call result.
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages(messages)
formatted_messages = self._format_messages(messages)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
if self.api == "responses":
return self._call_responses(
if self.api == "responses":
return self._call_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return self._call_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
@@ -407,22 +419,13 @@ class OpenAICompletion(BaseLLM):
response_model=response_model,
)
return self._call_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _call_completions(
self,
@@ -479,20 +482,30 @@ class OpenAICompletion(BaseLLM):
Returns:
Completion response or tool call result.
"""
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages(messages)
formatted_messages = self._format_messages(messages)
if self.api == "responses":
return await self._acall_responses(
if self.api == "responses":
return await self._acall_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._acall_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
@@ -501,22 +514,13 @@ class OpenAICompletion(BaseLLM):
response_model=response_model,
)
return await self._acall_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def _acall_completions(
self,

View File

@@ -235,8 +235,13 @@ class TestAsyncAgentExecutor:
mock_crew: MagicMock, mock_tools_handler: MagicMock
) -> None:
"""Test that multiple ainvoke calls can run concurrently."""
max_concurrent = 0
current_concurrent = 0
lock = asyncio.Lock()
async def create_and_run_executor(executor_id: int) -> dict[str, Any]:
nonlocal max_concurrent, current_concurrent
executor = CrewAgentExecutor(
llm=mock_llm,
task=mock_task,
@@ -252,7 +257,13 @@ class TestAsyncAgentExecutor:
)
async def delayed_response(*args: Any, **kwargs: Any) -> str:
await asyncio.sleep(0.05)
nonlocal max_concurrent, current_concurrent
async with lock:
current_concurrent += 1
max_concurrent = max(max_concurrent, current_concurrent)
await asyncio.sleep(0.01)
async with lock:
current_concurrent -= 1
return f"Thought: Done\nFinal Answer: Result from executor {executor_id}"
with patch(
@@ -273,19 +284,15 @@ class TestAsyncAgentExecutor:
}
)
import time
start = time.time()
results = await asyncio.gather(
create_and_run_executor(1),
create_and_run_executor(2),
create_and_run_executor(3),
)
elapsed = time.time() - start
assert len(results) == 3
assert all("output" in r for r in results)
assert elapsed < 0.15, f"Expected concurrent execution, took {elapsed}s"
assert max_concurrent > 1, f"Expected concurrent execution, max concurrent was {max_concurrent}"
class TestAsyncLLMResponseHelper:

View File

@@ -0,0 +1,108 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '71'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpUSxS5LeHwDTELElWlC5CDMzmr\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437564,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hi there! How can I assist you today?\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 10,\n \"total_tokens\": 19,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:05 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '460'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '477'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,215 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '71'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpStmyOpe9DrthWBlDdMZfVMJ1u\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hi! How can I assist you today?\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 9,\n \"total_tokens\": 18,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:02 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '415'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '434'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"user","content":"Say bye"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '72'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpS1DP0Xd3tmWt5PBincVrdU7yw\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Goodbye! If you have more questions
in the future, feel free to reach out. Have a great day!\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 23,\n \"total_tokens\": 32,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:03 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '964'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '979'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,143 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini","stream":true,"stream_options":{"include_usage":true}}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '125'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: 'data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"rVIyGQF2E"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"Hi"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"ZGVqV7ZDm"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"vnfm7IxlIB"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
How"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"o8F35ZZ"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
can"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"kiBzGe3"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
I"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"cbGT2RWgx"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
assist"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"DtxR"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
you"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"6y6Co8J"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
today"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"SZOmm"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"s9Bc0HqlPg"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":null,"obfuscation":"u9aar"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[],"usage":{"prompt_tokens":9,"completion_tokens":9,"total_tokens":18,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}},"obfuscation":"5hudm8ySqh39"}
data: [DONE]
'
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- text/event-stream; charset=utf-8
Date:
- Mon, 26 Jan 2026 14:26:04 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '260'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '275'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -296,6 +296,23 @@ def test_create_folder_structure_folder_name_validation():
shutil.rmtree(folder_path)
def test_create_folder_structure_rejects_reserved_names():
"""Test that reserved script names are rejected to prevent pyproject.toml conflicts."""
with tempfile.TemporaryDirectory() as temp_dir:
reserved_names = ["test", "train", "replay", "run_crew", "run_with_trigger"]
for reserved_name in reserved_names:
with pytest.raises(ValueError, match="which is reserved"):
create_folder_structure(reserved_name, parent_folder=temp_dir)
with pytest.raises(ValueError, match="which is reserved"):
create_folder_structure(f"{reserved_name}/", parent_folder=temp_dir)
capitalized = reserved_name.capitalize()
with pytest.raises(ValueError, match="which is reserved"):
create_folder_structure(capitalized, parent_folder=temp_dir)
@mock.patch("crewai.cli.create_crew.create_folder_structure")
@mock.patch("crewai.cli.create_crew.copy_template")
@mock.patch("crewai.cli.create_crew.load_env_vars")

View File

@@ -1,10 +1,20 @@
"""Test for version management."""
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from crewai import __version__
from crewai.cli.version import get_crewai_version
from crewai.cli.version import (
_get_cache_file,
_is_cache_valid,
get_crewai_version,
get_latest_version_from_pypi,
is_newer_version_available,
)
def test_dynamic_versioning_consistency():
def test_dynamic_versioning_consistency() -> None:
"""Test that dynamic versioning provides consistent version across all access methods."""
cli_version = get_crewai_version()
package_version = __version__
@@ -15,3 +25,186 @@ def test_dynamic_versioning_consistency():
# Version should not be empty
assert package_version is not None
assert len(package_version.strip()) > 0
class TestVersionChecking:
"""Test version checking utilities."""
def test_get_crewai_version(self) -> None:
"""Test getting current crewai version."""
version = get_crewai_version()
assert isinstance(version, str)
assert len(version) > 0
def test_get_cache_file(self) -> None:
"""Test cache file path generation."""
cache_file = _get_cache_file()
assert isinstance(cache_file, Path)
assert cache_file.name == "version_cache.json"
def test_is_cache_valid_with_fresh_cache(self) -> None:
"""Test cache validation with fresh cache."""
cache_data = {"timestamp": datetime.now().isoformat(), "version": "1.0.0"}
assert _is_cache_valid(cache_data) is True
def test_is_cache_valid_with_stale_cache(self) -> None:
"""Test cache validation with stale cache."""
old_time = datetime.now() - timedelta(hours=25)
cache_data = {"timestamp": old_time.isoformat(), "version": "1.0.0"}
assert _is_cache_valid(cache_data) is False
def test_is_cache_valid_with_missing_timestamp(self) -> None:
"""Test cache validation with missing timestamp."""
cache_data = {"version": "1.0.0"}
assert _is_cache_valid(cache_data) is False
@patch("crewai.cli.version.Path.exists")
@patch("crewai.cli.version.request.urlopen")
def test_get_latest_version_from_pypi_success(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test successful PyPI version fetch."""
# Mock cache not existing to force fetch from PyPI
mock_exists.return_value = False
mock_response = MagicMock()
mock_response.read.return_value = b'{"info": {"version": "2.0.0"}}'
mock_urlopen.return_value.__enter__.return_value = mock_response
version = get_latest_version_from_pypi()
assert version == "2.0.0"
@patch("crewai.cli.version.Path.exists")
@patch("crewai.cli.version.request.urlopen")
def test_get_latest_version_from_pypi_failure(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test PyPI version fetch failure."""
from urllib.error import URLError
# Mock cache not existing to force fetch from PyPI
mock_exists.return_value = False
mock_urlopen.side_effect = URLError("Network error")
version = get_latest_version_from_pypi()
assert version is None
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_true(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when newer version is available."""
mock_current.return_value = "1.0.0"
mock_latest.return_value = "2.0.0"
is_newer, current, latest = is_newer_version_available()
assert is_newer is True
assert current == "1.0.0"
assert latest == "2.0.0"
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_false(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when no newer version is available."""
mock_current.return_value = "2.0.0"
mock_latest.return_value = "2.0.0"
is_newer, current, latest = is_newer_version_available()
assert is_newer is False
assert current == "2.0.0"
assert latest == "2.0.0"
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_with_none_latest(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when PyPI fetch fails."""
mock_current.return_value = "1.0.0"
mock_latest.return_value = None
is_newer, current, latest = is_newer_version_available()
assert is_newer is False
assert current == "1.0.0"
assert latest is None
class TestConsoleFormatterVersionCheck:
"""Test version check display in ConsoleFormatter."""
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": ""})
def test_version_message_shows_when_update_available_and_verbose(
self, mock_check: MagicMock
) -> None:
"""Test version message shows when update available and verbose enabled."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
assert mock_print.call_count == 2
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
def test_version_message_hides_when_verbose_false(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when verbose disabled."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=False)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
def test_version_message_hides_when_no_update_available(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when no update available."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (False, "2.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": "true"})
def test_version_message_hides_in_ci_environment(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when running in CI/CD."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": "1"})
def test_version_message_hides_in_ci_environment_with_numeric_value(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when CI=1."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()

View File

@@ -217,6 +217,7 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Hello ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -224,6 +225,7 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="World!",
call_id="test-call-id",
),
)
return mock_output
@@ -284,6 +286,7 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="",
call_id="test-call-id",
tool_call=ToolCall(
id="call-123",
function=FunctionCall(
@@ -364,6 +367,7 @@ class TestCrewKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Async ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -371,6 +375,7 @@ class TestCrewKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Stream!",
call_id="test-call-id",
),
)
return mock_output
@@ -451,6 +456,7 @@ class TestFlowKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Flow ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -458,6 +464,7 @@ class TestFlowKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="output!",
call_id="test-call-id",
),
)
return "done"
@@ -545,6 +552,7 @@ class TestFlowKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Async flow ",
call_id="test-call-id",
),
)
await asyncio.sleep(0.01)
@@ -553,6 +561,7 @@ class TestFlowKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="stream!",
call_id="test-call-id",
),
)
await asyncio.sleep(0.01)
@@ -686,6 +695,7 @@ class TestStreamingEdgeCases:
type="llm_stream_chunk",
chunk="Task 1",
task_name="First task",
call_id="test-call-id",
),
)
return mock_output

View File

@@ -249,6 +249,8 @@ def test_guardrail_emits_events(sample_agent):
result = task.execute_sync(agent=sample_agent)
crewai_event_bus.flush(timeout=10.0)
with condition:
success = condition.wait_for(
lambda: len(started_guardrail) >= 2 and len(completed_guardrail) >= 2,
@@ -267,6 +269,8 @@ def test_guardrail_emits_events(sample_agent):
task.execute_sync(agent=sample_agent)
crewai_event_bus.flush(timeout=10.0)
with condition:
success = condition.wait_for(
lambda: len(started_guardrail) >= 3 and len(completed_guardrail) >= 3,

View File

@@ -984,8 +984,8 @@ def test_streaming_fallback_to_non_streaming():
def mock_call(messages, tools=None, callbacks=None, available_functions=None):
nonlocal fallback_called
# Emit a couple of chunks to simulate partial streaming
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id = "Id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id = "Id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id="Id", call_id="test-call-id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id="Id", call_id="test-call-id"))
# Mark that fallback would be called
fallback_called = True
@@ -1041,7 +1041,7 @@ def test_streaming_empty_response_handling():
def mock_call(messages, tools=None, callbacks=None, available_functions=None):
# Emit a few empty chunks
for _ in range(3):
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="",response_id="id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="", response_id="id", call_id="test-call-id"))
# Return the default message for empty responses
return "I apologize, but I couldn't generate a proper response. Please try again or rephrase your request."
@@ -1280,6 +1280,105 @@ def test_llm_emits_event_with_lite_agent():
assert set(all_agent_id) == {str(agent.id)}
# ----------- CALL_ID CORRELATION TESTS -----------
@pytest.mark.vcr()
def test_llm_call_events_share_call_id():
"""All events from a single LLM call should share the same call_id."""
import uuid
events = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_complete(source, event):
with condition:
events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Say hi")
with condition:
success = condition.wait_for(lambda: len(events) >= 2, timeout=10)
assert success, "Timeout waiting for LLM events"
# Behavior: all events from the call share the same call_id
assert len(events) == 2
assert events[0].call_id == events[1].call_id
# call_id should be a valid UUID
uuid.UUID(events[0].call_id)
@pytest.mark.vcr()
def test_streaming_chunks_share_call_id_with_call():
"""Streaming chunks should share call_id with started/completed events."""
events = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMStreamChunkEvent)
def on_chunk(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_complete(source, event):
with condition:
events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini", stream=True)
llm.call("Say hi")
with condition:
# Wait for at least started, some chunks, and completed
success = condition.wait_for(lambda: len(events) >= 3, timeout=10)
assert success, "Timeout waiting for streaming events"
# Behavior: all events (started, chunks, completed) share the same call_id
call_ids = {e.call_id for e in events}
assert len(call_ids) == 1
@pytest.mark.vcr()
def test_separate_llm_calls_have_different_call_ids():
"""Different LLM calls should have different call_ids."""
call_ids = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
call_ids.append(event.call_id)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Say hi")
llm.call("Say bye")
with condition:
success = condition.wait_for(lambda: len(call_ids) >= 2, timeout=10)
assert success, "Timeout waiting for LLM call events"
# Behavior: each call has its own call_id
assert len(call_ids) == 2
assert call_ids[0] != call_ids[1]
# ----------- HUMAN FEEDBACK EVENTS -----------

View File

@@ -28,7 +28,7 @@ dev = [
"boto3-stubs[bedrock-runtime]==1.40.54",
"types-psycopg2==2.9.21.20251012",
"types-pymysql==1.1.0.20250916",
"types-aiofiles~=24.1.0",
"types-aiofiles~=25.1.0",
]

12
uv.lock generated
View File

@@ -51,7 +51,7 @@ dev = [
{ name = "pytest-timeout", specifier = "==2.4.0" },
{ name = "pytest-xdist", specifier = "==3.8.0" },
{ name = "ruff", specifier = "==0.14.7" },
{ name = "types-aiofiles", specifier = "~=24.1.0" },
{ name = "types-aiofiles", specifier = "~=25.1.0" },
{ name = "types-appdirs", specifier = "==1.4.*" },
{ name = "types-psycopg2", specifier = "==2.9.21.20251012" },
{ name = "types-pymysql", specifier = "==1.1.0.20250916" },
@@ -1294,10 +1294,10 @@ requires-dist = [
{ name = "json-repair", specifier = "~=0.25.2" },
{ name = "json5", specifier = "~=0.10.0" },
{ name = "jsonref", specifier = "~=1.1.0" },
{ name = "litellm", marker = "extra == 'litellm'", specifier = "~=1.74.9" },
{ name = "litellm", marker = "extra == 'litellm'", specifier = ">=1.74.9,<3" },
{ name = "mcp", specifier = "~=1.23.1" },
{ name = "mem0ai", marker = "extra == 'mem0'", specifier = "~=0.1.94" },
{ name = "openai", specifier = "~=1.83.0" },
{ name = "openai", specifier = ">=1.83.0,<3" },
{ name = "openpyxl", specifier = "~=3.1.5" },
{ name = "openpyxl", marker = "extra == 'openpyxl'", specifier = "~=3.1.5" },
{ name = "opentelemetry-api", specifier = "~=1.34.0" },
@@ -8282,11 +8282,11 @@ wheels = [
[[package]]
name = "types-aiofiles"
version = "24.1.0.20250822"
version = "25.1.0.20251011"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/19/48/c64471adac9206cc844afb33ed311ac5a65d2f59df3d861e0f2d0cad7414/types_aiofiles-24.1.0.20250822.tar.gz", hash = "sha256:9ab90d8e0c307fe97a7cf09338301e3f01a163e39f3b529ace82466355c84a7b", size = 14484, upload-time = "2025-08-22T03:02:23.039Z" }
sdist = { url = "https://files.pythonhosted.org/packages/84/6c/6d23908a8217e36704aa9c79d99a620f2fdd388b66a4b7f72fbc6b6ff6c6/types_aiofiles-25.1.0.20251011.tar.gz", hash = "sha256:1c2b8ab260cb3cd40c15f9d10efdc05a6e1e6b02899304d80dfa0410e028d3ff", size = 14535, upload-time = "2025-10-11T02:44:51.237Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bc/8e/5e6d2215e1d8f7c2a94c6e9d0059ae8109ce0f5681956d11bb0a228cef04/types_aiofiles-24.1.0.20250822-py3-none-any.whl", hash = "sha256:0ec8f8909e1a85a5a79aed0573af7901f53120dd2a29771dd0b3ef48e12328b0", size = 14322, upload-time = "2025-08-22T03:02:21.918Z" },
{ url = "https://files.pythonhosted.org/packages/71/0f/76917bab27e270bb6c32addd5968d69e558e5b6f7fb4ac4cbfa282996a96/types_aiofiles-25.1.0.20251011-py3-none-any.whl", hash = "sha256:8ff8de7f9d42739d8f0dadcceeb781ce27cd8d8c4152d4a7c52f6b20edb8149c", size = 14338, upload-time = "2025-10-11T02:44:50.054Z" },
]
[[package]]