Compare commits

..

2 Commits

Author           SHA1        Message                                             Date
Greyson LaLonde  eda4430d6f  Merge branch 'main' into gl/fix/hitl-flow-plot      2026-02-02 13:37:12 -05:00
Greyson LaLonde  3f264b4cc8  fix: add human_feedback metadata and visualization  2026-02-02 13:18:22 -05:00
81 changed files with 2175 additions and 20922 deletions

View File

@@ -5,12 +5,7 @@
version: 2
updates:
- package-ecosystem: uv
directory: "/"
- package-ecosystem: uv # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "weekly"
groups:
security-updates:
applies-to: security-updates
patterns:
- "*"

View File

@@ -1 +0,0 @@
3.13

View File

@@ -2,95 +2,29 @@
from __future__ import annotations
from collections.abc import Callable
import logging
from typing import TYPE_CHECKING, Any
from crewai.tools import BaseTool
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
from crewai.utilities.string_utils import sanitize_tool_name
from pydantic import BaseModel
from crewai_tools.adapters.tool_collection import ToolCollection
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from mcp import StdioServerParameters
from mcp.types import CallToolResult, TextContent, Tool
from mcpadapt.core import MCPAdapt, ToolAdapter
logger = logging.getLogger(__name__)
from mcpadapt.core import MCPAdapt
from mcpadapt.crewai_adapter import CrewAIAdapter
try:
from mcp import StdioServerParameters
from mcp.types import CallToolResult, TextContent, Tool
from mcpadapt.core import MCPAdapt, ToolAdapter
class CrewAIToolAdapter(ToolAdapter):
"""Adapter that creates CrewAI tools with properly normalized JSON schemas.
This adapter bypasses mcpadapt's model creation which adds invalid null values
to field schemas, instead using CrewAI's own schema utilities.
"""
def adapt(
self,
func: Callable[[dict[str, Any] | None], CallToolResult],
mcp_tool: Tool,
) -> BaseTool:
"""Adapt a MCP tool to a CrewAI tool.
Args:
func: The function to call when the tool is invoked.
mcp_tool: The MCP tool definition to adapt.
Returns:
A CrewAI BaseTool instance.
"""
tool_name = sanitize_tool_name(mcp_tool.name)
tool_description = mcp_tool.description or ""
args_model = create_model_from_schema(mcp_tool.inputSchema)
class CrewAIMCPTool(BaseTool):
name: str = tool_name
description: str = tool_description
args_schema: type[BaseModel] = args_model
def _run(self, **kwargs: Any) -> Any:
result = func(kwargs)
if len(result.content) == 1:
first_content = result.content[0]
if isinstance(first_content, TextContent):
return first_content.text
return str(first_content)
return str(
[
content.text
for content in result.content
if isinstance(content, TextContent)
]
)
def _generate_description(self) -> None:
schema = self.args_schema.model_json_schema()
schema.pop("$defs", None)
self.description = (
f"Tool Name: {self.name}\n"
f"Tool Arguments: {schema}\n"
f"Tool Description: {self.description}"
)
return CrewAIMCPTool()
async def async_adapt(self, afunc: Any, mcp_tool: Tool) -> Any:
"""Async adaptation is not supported by CrewAI."""
raise NotImplementedError("async is not supported by the CrewAI framework.")
from mcpadapt.core import MCPAdapt
from mcpadapt.crewai_adapter import CrewAIAdapter
MCP_AVAILABLE = True
except ImportError as e:
logger.debug(f"MCP packages not available: {e}")
except ImportError:
MCP_AVAILABLE = False
@@ -100,6 +34,9 @@ class MCPServerAdapter:
Note: tools can only be accessed after the server has been started with the
`start()` method.
Attributes:
tools: The CrewAI tools available from the MCP server.
Usage:
# context manager + stdio
with MCPServerAdapter(...) as tools:
@@ -152,9 +89,7 @@ class MCPServerAdapter:
super().__init__()
self._adapter = None
self._tools = None
self._tool_names = (
[sanitize_tool_name(name) for name in tool_names] if tool_names else None
)
self._tool_names = list(tool_names) if tool_names else None
if not MCP_AVAILABLE:
import click
@@ -165,7 +100,7 @@ class MCPServerAdapter:
import subprocess
try:
subprocess.run(["uv", "add", "mcp crewai-tools'[mcp]'"], check=True) # noqa: S607
subprocess.run(["uv", "add", "mcp crewai-tools[mcp]"], check=True) # noqa: S607
except subprocess.CalledProcessError as e:
raise ImportError("Failed to install mcp package") from e
@@ -177,7 +112,7 @@ class MCPServerAdapter:
try:
self._serverparams = serverparams
self._adapter = MCPAdapt(
self._serverparams, CrewAIToolAdapter(), connect_timeout
self._serverparams, CrewAIAdapter(), connect_timeout
)
self.start()
@@ -189,13 +124,13 @@ class MCPServerAdapter:
logger.error(f"Error during stop cleanup: {stop_e}")
raise RuntimeError(f"Failed to initialize MCP Adapter: {e}") from e
def start(self) -> None:
def start(self):
"""Start the MCP server and initialize the tools."""
self._tools = self._adapter.__enter__() # type: ignore[union-attr]
self._tools = self._adapter.__enter__()
def stop(self) -> None:
def stop(self):
"""Stop the MCP server."""
self._adapter.__exit__(None, None, None) # type: ignore[union-attr]
self._adapter.__exit__(None, None, None)
@property
def tools(self) -> ToolCollection[BaseTool]:
@@ -217,19 +152,12 @@ class MCPServerAdapter:
return tools_collection.filter_by_names(self._tool_names)
return tools_collection
def __enter__(self) -> ToolCollection[BaseTool]:
"""Enter the context manager.
Note that `__init__()` already starts the MCP server,
so tools should already be available.
def __enter__(self):
"""Enter the context manager. Note that `__init__()` already starts the MCP server.
So tools should already be available.
"""
return self.tools
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: Any,
) -> None:
def __exit__(self, exc_type, exc_value, traceback):
"""Exit the context manager."""
self._adapter.__exit__(exc_type, exc_value, traceback) # type: ignore[union-attr]
return self._adapter.__exit__(exc_type, exc_value, traceback)
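
For reference, a minimal usage sketch of the MCPServerAdapter context-manager flow shown above, assuming a local stdio MCP server; the uvx command and server name are placeholders, not taken from this diff:

from mcp import StdioServerParameters
from crewai_tools import MCPServerAdapter

# Placeholder stdio server; substitute a real MCP server command.
server_params = StdioServerParameters(command="uvx", args=["example-mcp-server"])

# __init__() already starts the server, so tools are ready inside the block.
with MCPServerAdapter(server_params) as tools:
    for tool in tools:
        print(tool.name, "-", tool.description)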

View File

@@ -8,9 +8,8 @@ from typing import Any
from crewai.tools.base_tool import BaseTool, EnvVar
from pydantic import BaseModel
from pydantic.fields import FieldInfo
from pydantic.json_schema import GenerateJsonSchema
from pydantic_core import PydanticOmit, PydanticUndefined
from pydantic_core import PydanticOmit
from crewai_tools import tools
@@ -45,9 +44,6 @@ class ToolSpecExtractor:
schema = self._unwrap_schema(core_schema)
fields = schema.get("schema", {}).get("fields", {})
# Use model_fields to get defaults (handles both default and default_factory)
model_fields = tool_class.model_fields
tool_info = {
"name": tool_class.__name__,
"humanized_name": self._extract_field_default(
@@ -58,9 +54,9 @@ class ToolSpecExtractor:
).strip(),
"run_params_schema": self._extract_params(fields.get("args_schema")),
"init_params_schema": self._extract_init_params(tool_class),
"env_vars": self._extract_env_vars_from_model_fields(model_fields),
"package_dependencies": self._extract_package_deps_from_model_fields(
model_fields
"env_vars": self._extract_env_vars(fields.get("env_vars")),
"package_dependencies": self._extract_field_default(
fields.get("package_dependencies"), fallback=[]
),
}
@@ -107,27 +103,10 @@ class ToolSpecExtractor:
return {}
@staticmethod
def _get_field_default(field: FieldInfo | None) -> Any:
"""Get default value from a FieldInfo, handling both default and default_factory."""
if not field:
return None
default_value = field.default
if default_value is PydanticUndefined or default_value is None:
if field.default_factory:
return field.default_factory()
return None
return default_value
@staticmethod
def _extract_env_vars_from_model_fields(
model_fields: dict[str, FieldInfo],
def _extract_env_vars(
env_vars_field: dict[str, Any] | None,
) -> list[dict[str, Any]]:
default_value = ToolSpecExtractor._get_field_default(
model_fields.get("env_vars")
)
if not default_value:
if not env_vars_field:
return []
return [
@@ -137,22 +116,10 @@ class ToolSpecExtractor:
"required": env_var.required,
"default": env_var.default,
}
for env_var in default_value
for env_var in env_vars_field.get("schema", {}).get("default", [])
if isinstance(env_var, EnvVar)
]
@staticmethod
def _extract_package_deps_from_model_fields(
model_fields: dict[str, FieldInfo],
) -> list[str]:
default_value = ToolSpecExtractor._get_field_default(
model_fields.get("package_dependencies")
)
if not isinstance(default_value, list):
return []
return default_value
@staticmethod
def _extract_init_params(tool_class: type[BaseTool]) -> dict[str, Any]:
ignored_init_params = [
@@ -185,7 +152,7 @@ class ToolSpecExtractor:
if __name__ == "__main__":
output_file = Path(__file__).parent.parent.parent / "tool.specs.json"
output_file = Path(__file__).parent / "tool.specs.json"
extractor = ToolSpecExtractor()
extractor.extract_all_tools()
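
The _get_field_default helper in the diff above exists because Pydantic v2 stores plain defaults and factory defaults differently; a small standalone sketch of that distinction, using an illustrative model:

from pydantic import BaseModel, Field
from pydantic_core import PydanticUndefined


class ExampleTool(BaseModel):
    # Plain default: available directly on FieldInfo.default
    name: str = "example"
    # Factory default: FieldInfo.default is PydanticUndefined and the value
    # only materializes by calling FieldInfo.default_factory()
    package_dependencies: list[str] = Field(default_factory=lambda: ["requests"])


fields = ExampleTool.model_fields
assert fields["name"].default == "example"
assert fields["package_dependencies"].default is PydanticUndefined
assert fields["package_dependencies"].default_factory() == ["requests"]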

View File

@@ -1,17 +1,12 @@
from datetime import datetime
import json
import os
import time
from typing import Annotated, Any, ClassVar, Literal
from typing import Any, ClassVar
from crewai.tools import BaseTool, EnvVar
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from pydantic.types import StringConstraints
import requests
load_dotenv()
def _save_results_to_file(content: str) -> None:
"""Saves the search results to a file."""
@@ -20,72 +15,37 @@ def _save_results_to_file(content: str) -> None:
file.write(content)
FreshnessPreset = Literal["pd", "pw", "pm", "py"]
FreshnessRange = Annotated[
str, StringConstraints(pattern=r"^\d{4}-\d{2}-\d{2}to\d{4}-\d{2}-\d{2}$")
]
Freshness = FreshnessPreset | FreshnessRange
SafeSearch = Literal["off", "moderate", "strict"]
class BraveSearchToolSchema(BaseModel):
"""Input for BraveSearchTool"""
"""Input for BraveSearchTool."""
query: str = Field(..., description="Search query to perform")
country: str | None = Field(
default=None,
description="Country code for geo-targeting (e.g., 'US', 'BR').",
)
search_language: str | None = Field(
default=None,
description="Language code for the search results (e.g., 'en', 'es').",
)
count: int | None = Field(
default=None,
description="The maximum number of results to return. Actual number may be less.",
)
offset: int | None = Field(
default=None, description="Skip the first N result sets/pages. Max is 9."
)
safesearch: SafeSearch | None = Field(
default=None,
description="Filter out explicit content. Options: off/moderate/strict",
)
spellcheck: bool | None = Field(
default=None,
description="Attempt to correct spelling errors in the search query.",
)
freshness: Freshness | None = Field(
default=None,
description="Enforce freshness of results. Options: pd/pw/pm/py, or YYYY-MM-DDtoYYYY-MM-DD",
)
text_decorations: bool | None = Field(
default=None,
description="Include markup to highlight search terms in the results.",
)
extra_snippets: bool | None = Field(
default=None,
description="Include up to 5 text snippets for each page if possible.",
)
operators: bool | None = Field(
default=None,
description="Whether to apply search operators (e.g., site:example.com).",
search_query: str = Field(
..., description="Mandatory search query you want to use to search the internet"
)
# TODO: Extend support to additional endpoints (e.g., /images, /news, etc.)
class BraveSearchTool(BaseTool):
"""A tool that performs web searches using the Brave Search API."""
"""BraveSearchTool - A tool for performing web searches using the Brave Search API.
name: str = "Brave Search"
This module provides functionality to search the internet using Brave's Search API,
supporting customizable result counts and country-specific searches.
Dependencies:
- requests
- pydantic
- python-dotenv (for API key management)
"""
name: str = "Brave Web Search the internet"
description: str = (
"A tool that performs web searches using the Brave Search API. "
"Results are returned as structured JSON data."
"A tool that can be used to search the internet with a search_query."
)
args_schema: type[BaseModel] = BraveSearchToolSchema
search_url: str = "https://api.search.brave.com/res/v1/web/search"
country: str | None = ""
n_results: int = 10
save_file: bool = False
_last_request_time: ClassVar[float] = 0
_min_request_interval: ClassVar[float] = 1.0 # seconds
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
@@ -95,9 +55,6 @@ class BraveSearchTool(BaseTool):
),
]
)
# Rate limiting parameters
_last_request_time: ClassVar[float] = 0
_min_request_interval: ClassVar[float] = 1.0 # seconds
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -116,64 +73,19 @@ class BraveSearchTool(BaseTool):
self._min_request_interval - (current_time - self._last_request_time)
)
BraveSearchTool._last_request_time = time.time()
# Construct and send the request
try:
# Maintain both "search_query" and "query" for backwards compatibility
query = kwargs.get("search_query") or kwargs.get("query")
if not query:
raise ValueError("Query is required")
payload = {"q": query}
if country := kwargs.get("country"):
payload["country"] = country
if search_language := kwargs.get("search_language"):
payload["search_language"] = search_language
# Fallback to deprecated n_results parameter if no count is provided
count = kwargs.get("count")
if count is not None:
payload["count"] = count
else:
payload["count"] = self.n_results
# Offset may be 0, so avoid truthiness check
offset = kwargs.get("offset")
if offset is not None:
payload["offset"] = offset
if safesearch := kwargs.get("safesearch"):
payload["safesearch"] = safesearch
search_query = kwargs.get("search_query") or kwargs.get("query")
if not search_query:
raise ValueError("Search query is required")
save_file = kwargs.get("save_file", self.save_file)
if freshness := kwargs.get("freshness"):
payload["freshness"] = freshness
n_results = kwargs.get("n_results", self.n_results)
# Boolean parameters
spellcheck = kwargs.get("spellcheck")
if spellcheck is not None:
payload["spellcheck"] = spellcheck
payload = {"q": search_query, "count": n_results}
text_decorations = kwargs.get("text_decorations")
if text_decorations is not None:
payload["text_decorations"] = text_decorations
if self.country != "":
payload["country"] = self.country
extra_snippets = kwargs.get("extra_snippets")
if extra_snippets is not None:
payload["extra_snippets"] = extra_snippets
operators = kwargs.get("operators")
if operators is not None:
payload["operators"] = operators
# Limit the result types to "web" since there is presently no
# handling of other types like "discussions", "faq", "infobox",
# "news", "videos", or "locations".
payload["result_filter"] = "web"
# Setup Request Headers
headers = {
"X-Subscription-Token": os.environ["BRAVE_API_KEY"],
"Accept": "application/json",
@@ -185,32 +97,25 @@ class BraveSearchTool(BaseTool):
response.raise_for_status() # Handle non-200 responses
results = response.json()
# TODO: Handle other result types like "discussions", "faq", etc.
web_results_items = []
if "web" in results:
web_results = results["web"]["results"]
for result in web_results:
url = result.get("url")
title = result.get("title")
# If, for whatever reason, this entry does not have a title
# or url, skip it.
if not url or not title:
results = results["web"]["results"]
string = []
for result in results:
try:
string.append(
"\n".join(
[
f"Title: {result['title']}",
f"Link: {result['url']}",
f"Snippet: {result['description']}",
"---",
]
)
)
except KeyError: # noqa: PERF203
continue
item = {
"url": url,
"title": title,
}
description = result.get("description")
if description:
item["description"] = description
snippets = result.get("extra_snippets")
if snippets:
item["snippets"] = snippets
web_results_items.append(item)
content = json.dumps(web_results_items)
content = "\n".join(string)
except requests.RequestException as e:
return f"Error performing search: {e!s}"
except KeyError as e:
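
One side of the diff above gives BraveSearchTool a richer argument schema and structured JSON output; a minimal usage sketch of that variant, assuming BRAVE_API_KEY is exported and that count/safesearch are forwarded as shown:

import json
import os

from crewai_tools import BraveSearchTool

assert os.environ.get("BRAVE_API_KEY"), "Brave Search API key required"

tool = BraveSearchTool()

# "query" is the primary field; "search_query" is still accepted for
# backwards compatibility in _run().
raw = tool.run(query="CrewAI framework", count=5, safesearch="moderate")

# On success raw is a JSON list of {url, title, description, snippets} items;
# on request failure it is an error string instead.
for item in json.loads(raw):
    print(item["title"], "->", item["url"])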

View File

@@ -4,7 +4,7 @@ import os
import re
from typing import Any
from crewai.tools import BaseTool, EnvVar
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
@@ -137,21 +137,7 @@ class StagehandTool(BaseTool):
- 'observe': For finding elements in a specific area
"""
args_schema: type[BaseModel] = StagehandToolSchema
package_dependencies: list[str] = Field(default_factory=lambda: ["stagehand<=0.5.9"])
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
name="BROWSERBASE_API_KEY",
description="API key for Browserbase services",
required=False,
),
EnvVar(
name="BROWSERBASE_PROJECT_ID",
description="Project ID for Browserbase services",
required=False,
),
]
)
package_dependencies: list[str] = Field(default_factory=lambda: ["stagehand"])
# Stagehand configuration
api_key: str | None = None

View File

@@ -23,26 +23,23 @@ class MockTool(BaseTool):
)
my_parameter: str = Field("This is default value", description="What a description")
my_parameter_bool: bool = Field(False)
# Use default_factory like real tools do (not direct default)
package_dependencies: list[str] = Field(
default_factory=lambda: ["this-is-a-required-package", "another-required-package"]
)
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
name="SERPER_API_KEY",
description="API key for Serper",
required=True,
default=None,
),
EnvVar(
name="API_RATE_LIMIT",
description="API rate limit",
required=False,
default="100",
),
]
["this-is-a-required-package", "another-required-package"], description=""
)
env_vars: list[EnvVar] = [
EnvVar(
name="SERPER_API_KEY",
description="API key for Serper",
required=True,
default=None,
),
EnvVar(
name="API_RATE_LIMIT",
description="API rate limit",
required=False,
default="100",
),
]
@pytest.fixture

View File

@@ -1,9 +1,7 @@
import json
from unittest.mock import patch
import pytest
from crewai_tools.tools.brave_search_tool.brave_search_tool import BraveSearchTool
import pytest
@pytest.fixture
@@ -32,43 +30,16 @@ def test_brave_tool_search(mock_get, brave_tool):
}
mock_get.return_value.json.return_value = mock_response
result = brave_tool.run(query="test")
result = brave_tool.run(search_query="test")
assert "Test Title" in result
assert "http://test.com" in result
@patch("requests.get")
def test_brave_tool(mock_get):
mock_response = {
"web": {
"results": [
{
"title": "Brave Browser",
"url": "https://brave.com",
"description": "Brave Browser description",
}
]
}
}
mock_get.return_value.json.return_value = mock_response
tool = BraveSearchTool(n_results=2)
result = tool.run(query="Brave Browser")
assert result is not None
# Parse JSON so we can examine the structure
data = json.loads(result)
assert isinstance(data, list)
assert len(data) >= 1
# First item should have expected fields: title, url, and description
first = data[0]
assert "title" in first
assert first["title"] == "Brave Browser"
assert "url" in first
assert first["url"] == "https://brave.com"
assert "description" in first
assert first["description"] == "Brave Browser description"
def test_brave_tool():
tool = BraveSearchTool(
n_results=2,
)
tool.run(search_query="ChatGPT")
if __name__ == "__main__":

File diff suppressed because it is too large.

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
# Core Dependencies
"pydantic~=2.11.9",
"openai>=1.83.0,<3",
"openai~=1.83.0",
"instructor>=1.3.3",
# Text Processing
"pdfplumber~=0.11.4",
@@ -78,7 +78,7 @@ voyageai = [
"voyageai~=0.3.5",
]
litellm = [
"litellm>=1.74.9,<3",
"litellm~=1.74.9",
]
bedrock = [
"boto3~=1.40.45",

View File

@@ -118,8 +118,6 @@ MCP_TOOL_EXECUTION_TIMEOUT: Final[int] = 30
MCP_DISCOVERY_TIMEOUT: Final[int] = 15
MCP_MAX_RETRIES: Final[int] = 3
_passthrough_exceptions: tuple[type[Exception], ...] = ()
# Simple in-memory cache for MCP tool schemas (duration: 5 minutes)
_mcp_schema_cache: dict[str, Any] = {}
_cache_ttl: Final[int] = 300 # 5 minutes
@@ -481,8 +479,6 @@ class Agent(BaseAgent):
),
)
raise e
if isinstance(e, _passthrough_exceptions):
raise
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(
@@ -715,8 +711,6 @@ class Agent(BaseAgent):
),
)
raise e
if isinstance(e, _passthrough_exceptions):
raise
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
crewai_event_bus.emit(

View File

@@ -37,10 +37,9 @@ class BaseAgentAdapter(BaseAgent, ABC):
tools: Optional list of BaseTool instances to be configured
"""
@abstractmethod
def configure_structured_output(self, task: Any) -> None:
def configure_structured_output(self, structured_output: Any) -> None:
"""Configure the structured output for the specific agent implementation.
Args:
task: The task object containing output format specifications.
structured_output: The structured output to be configured
"""

View File

@@ -16,7 +16,6 @@ from crewai.agents.agent_adapters.openai_agents.protocols import (
)
from crewai.tools import BaseTool
from crewai.utilities.import_utils import require
from crewai.utilities.pydantic_schema_utils import force_additional_properties_false
from crewai.utilities.string_utils import sanitize_tool_name
@@ -136,9 +135,7 @@ class OpenAIAgentToolAdapter(BaseToolAdapter):
for tool in tools:
schema: dict[str, Any] = tool.args_schema.model_json_schema()
schema = force_additional_properties_false(schema)
schema.update({"type": "object"})
schema.update({"additionalProperties": False, "type": "object"})
openai_tool: OpenAIFunctionTool = cast(
OpenAIFunctionTool,

View File

@@ -4,6 +4,7 @@ import time
from typing import TYPE_CHECKING
from crewai.agents.parser import AgentFinish
from crewai.events.event_listener import event_listener
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.utilities.converter import ConverterError
@@ -137,3 +138,52 @@ class CrewAgentExecutorMixin:
content="Long term memory is enabled, but entity memory is not enabled. Please configure entity memory or set memory=True to automatically enable it.",
color="bold_yellow",
)
def _ask_human_input(self, final_answer: str) -> str:
"""Prompt human input with mode-appropriate messaging.
Note: The final answer is already displayed via the AgentLogsExecutionEvent
panel, so we only show the feedback prompt here.
"""
from rich.panel import Panel
from rich.text import Text
formatter = event_listener.formatter
formatter.pause_live_updates()
try:
# Training mode prompt (single iteration)
if self.crew and getattr(self.crew, "_train", False):
prompt_text = (
"TRAINING MODE: Provide feedback to improve the agent's performance.\n\n"
"This will be used to train better versions of the agent.\n"
"Please provide detailed feedback about the result quality and reasoning process."
)
title = "🎓 Training Feedback Required"
# Regular human-in-the-loop prompt (multiple iterations)
else:
prompt_text = (
"Provide feedback on the Final Result above.\n\n"
"• If you are happy with the result, simply hit Enter without typing anything.\n"
"• Otherwise, provide specific improvement requests.\n"
"• You can provide multiple rounds of feedback until satisfied."
)
title = "💬 Human Feedback Required"
content = Text()
content.append(prompt_text, style="yellow")
prompt_panel = Panel(
content,
title=title,
border_style="yellow",
padding=(1, 2),
)
formatter.console.print(prompt_panel)
response = input()
if response.strip() != "":
formatter.console.print("\n[cyan]Processing your feedback...[/cyan]")
return response
finally:
formatter.resume_live_updates()
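
The pause/resume calls around input() above exist because a rich Live display otherwise repaints over the prompt; a generic rich-only sketch of the same pattern, independent of CrewAI's formatter:

from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.text import Text

console = Console()

with Live(Text("agent working..."), console=console, refresh_per_second=4) as live:
    # ... the agent produces its final answer here ...
    live.stop()  # release the terminal so the prompt renders cleanly
    console.print(
        Panel(
            Text("Provide feedback, or press Enter to accept.", style="yellow"),
            title="Human Feedback Required",
            border_style="yellow",
            padding=(1, 2),
        )
    )
    feedback = input()
    live.start()  # resume live rendering for any follow-up iterations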

View File

@@ -19,7 +19,6 @@ from crewai.agents.parser import (
AgentFinish,
OutputParserError,
)
from crewai.core.providers.human_input import ExecutorContext, get_provider
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.logging_events import (
AgentLogsExecutionEvent,
@@ -176,16 +175,15 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
"""
return self.llm.supports_stop_words() if self.llm else False
def _setup_messages(self, inputs: dict[str, Any]) -> None:
"""Set up messages for the agent execution.
def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute the agent with given inputs.
Args:
inputs: Input dictionary containing prompt variables.
"""
provider = get_provider()
if provider.setup_messages(cast(ExecutorContext, cast(object, self))):
return
Returns:
Dictionary with agent output.
"""
if "system" in self.prompt:
system_prompt = self._format_prompt(
cast(str, self.prompt.get("system", "")), inputs
@@ -199,19 +197,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
self.messages.append(format_message_for_llm(user_prompt))
provider.post_setup_messages(cast(ExecutorContext, cast(object, self)))
def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute the agent with given inputs.
Args:
inputs: Input dictionary containing prompt variables.
Returns:
Dictionary with agent output.
"""
self._setup_messages(inputs)
self._inject_multimodal_files(inputs)
self._show_start_logs()
@@ -814,7 +799,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
agent_key=agent_key,
),
)
error_event_emitted = False
track_delegation_if_needed(func_name, args_dict, self.task)
@@ -897,7 +881,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
error=e,
),
)
error_event_emitted = True
elif max_usage_reached and original_tool:
# Return error message when max usage limit is reached
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
@@ -925,20 +908,20 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
color="red",
)
if not error_event_emitted:
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
@@ -987,7 +970,18 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
Dictionary with agent output.
"""
self._setup_messages(inputs)
if "system" in self.prompt:
system_prompt = self._format_prompt(
cast(str, self.prompt.get("system", "")), inputs
)
user_prompt = self._format_prompt(
cast(str, self.prompt.get("user", "")), inputs
)
self.messages.append(format_message_for_llm(system_prompt, role="system"))
self.messages.append(format_message_for_llm(user_prompt))
else:
user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
self.messages.append(format_message_for_llm(user_prompt))
await self._ainject_multimodal_files(inputs)
@@ -1497,7 +1491,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
return prompt.replace("{tools}", inputs["tools"])
def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
"""Process human feedback via the configured provider.
"""Process human feedback.
Args:
formatted_answer: Initial agent result.
@@ -1505,8 +1499,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
Final answer after feedback.
"""
provider = get_provider()
return provider.handle_feedback(formatted_answer, self)
output_str = (
formatted_answer.output
if isinstance(formatted_answer.output, str)
else formatted_answer.output.model_dump_json()
)
human_feedback = self._ask_human_input(output_str)
if self._is_training_mode():
return self._handle_training_feedback(formatted_answer, human_feedback)
return self._handle_regular_feedback(formatted_answer, human_feedback)
def _is_training_mode(self) -> bool:
"""Check if training mode is active.
@@ -1516,18 +1519,74 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
"""
return bool(self.crew and self.crew._train)
def _format_feedback_message(self, feedback: str) -> LLMMessage:
"""Format feedback as a message for the LLM.
def _handle_training_feedback(
self, initial_answer: AgentFinish, feedback: str
) -> AgentFinish:
"""Process training feedback.
Args:
feedback: User feedback string.
initial_answer: Initial agent output.
feedback: Training feedback.
Returns:
Formatted message dict.
Improved answer.
"""
return format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
self._handle_crew_training_output(initial_answer, feedback)
self.messages.append(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
improved_answer = self._invoke_loop()
self._handle_crew_training_output(improved_answer)
self.ask_for_human_input = False
return improved_answer
def _handle_regular_feedback(
self, current_answer: AgentFinish, initial_feedback: str
) -> AgentFinish:
"""Process regular feedback iteratively.
Args:
current_answer: Current agent output.
initial_feedback: Initial user feedback.
Returns:
Final answer after iterations.
"""
feedback = initial_feedback
answer = current_answer
while self.ask_for_human_input:
# If the user provides a blank response, assume they are happy with the result
if feedback.strip() == "":
self.ask_for_human_input = False
else:
answer = self._process_feedback_iteration(feedback)
output_str = (
answer.output
if isinstance(answer.output, str)
else answer.output.model_dump_json()
)
feedback = self._ask_human_input(output_str)
return answer
def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
"""Process single feedback iteration.
Args:
feedback: User feedback.
Returns:
Updated agent response.
"""
self.messages.append(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
return self._invoke_loop()
@classmethod
def __get_pydantic_core_schema__(

View File

@@ -1,13 +1,10 @@
from typing import Any
DEFAULT_CREWAI_ENTERPRISE_URL = "https://app.crewai.com"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_PROVIDER = "workos"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE = "client_01JNJQWBJ4SPFN3SWJM5T7BDG8"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID = "client_01JYT06R59SP0NXYGD994NFXXX"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN = "login.crewai.com"
ENV_VARS: dict[str, list[dict[str, Any]]] = {
ENV_VARS = {
"openai": [
{
"prompt": "Enter your OPENAI API key (press Enter to skip)",
@@ -115,7 +112,7 @@ ENV_VARS: dict[str, list[dict[str, Any]]] = {
}
PROVIDERS: list[str] = [
PROVIDERS = [
"openai",
"anthropic",
"gemini",
@@ -130,7 +127,7 @@ PROVIDERS: list[str] = [
"sambanova",
]
MODELS: dict[str, list[str]] = {
MODELS = {
"openai": [
"gpt-4",
"gpt-4.1",

View File

@@ -3,7 +3,6 @@ import shutil
import sys
import click
import tomli
from crewai.cli.constants import ENV_VARS, MODELS
from crewai.cli.provider import (
@@ -14,31 +13,7 @@ from crewai.cli.provider import (
from crewai.cli.utils import copy_template, load_env_vars, write_env_file
def get_reserved_script_names() -> set[str]:
"""Get reserved script names from pyproject.toml template.
Returns:
Set of reserved script names that would conflict with crew folder names.
"""
package_dir = Path(__file__).parent
template_path = package_dir / "templates" / "crew" / "pyproject.toml"
with open(template_path, "r") as f:
template_content = f.read()
template_content = template_content.replace("{{folder_name}}", "_placeholder_")
template_content = template_content.replace("{{name}}", "placeholder")
template_content = template_content.replace("{{crew_name}}", "Placeholder")
template_data = tomli.loads(template_content)
script_names = set(template_data.get("project", {}).get("scripts", {}).keys())
script_names.discard("_placeholder_")
return script_names
def create_folder_structure(
name: str, parent_folder: str | None = None
) -> tuple[Path, str, str]:
def create_folder_structure(name, parent_folder=None):
import keyword
import re
@@ -76,14 +51,6 @@ def create_folder_structure(
f"Project name '{name}' would generate invalid Python module name '{folder_name}'"
)
reserved_names = get_reserved_script_names()
if folder_name in reserved_names:
raise ValueError(
f"Project name '{name}' would generate folder name '{folder_name}' which is reserved. "
f"Reserved names are: {', '.join(sorted(reserved_names))}. "
"Please choose a different name."
)
class_name = name.replace("_", " ").replace("-", " ").title().replace(" ", "")
class_name = re.sub(r"[^a-zA-Z0-9_]", "", class_name)
@@ -147,9 +114,7 @@ def create_folder_structure(
return folder_path, folder_name, class_name
def copy_template_files(
folder_path: Path, name: str, class_name: str, parent_folder: str | None
) -> None:
def copy_template_files(folder_path, name, class_name, parent_folder):
package_dir = Path(__file__).parent
templates_dir = package_dir / "templates" / "crew"
@@ -190,12 +155,7 @@ def copy_template_files(
copy_template(src_file, dst_file, name, class_name, folder_path.name)
def create_crew(
name: str,
provider: str | None = None,
skip_provider: bool = False,
parent_folder: str | None = None,
) -> None:
def create_crew(name, provider=None, skip_provider=False, parent_folder=None):
folder_path, folder_name, class_name = create_folder_structure(name, parent_folder)
env_vars = load_env_vars(folder_path)
if not skip_provider:
@@ -229,9 +189,7 @@ def create_crew(
if selected_provider is None: # User typed 'q'
click.secho("Exiting...", fg="yellow")
sys.exit(0)
if selected_provider and isinstance(
selected_provider, str
): # Valid selection
if selected_provider: # Valid selection
break
click.secho(
"No provider selected. Please try again or press 'q' to exit.", fg="red"

View File

@@ -1,10 +1,8 @@
from collections import defaultdict
from collections.abc import Sequence
import json
import os
from pathlib import Path
import time
from typing import Any
import certifi
import click
@@ -13,15 +11,16 @@ import requests
from crewai.cli.constants import JSON_URL, MODELS, PROVIDERS
def select_choice(prompt_message: str, choices: Sequence[str]) -> str | None:
"""Presents a list of choices to the user and prompts them to select one.
def select_choice(prompt_message, choices):
"""
Presents a list of choices to the user and prompts them to select one.
Args:
prompt_message: The message to display to the user before presenting the choices.
choices: A list of options to present to the user.
- prompt_message (str): The message to display to the user before presenting the choices.
- choices (list): A list of options to present to the user.
Returns:
The selected choice from the list, or None if the user chooses to quit.
- str: The selected choice from the list, or None if the user chooses to quit.
"""
provider_models = get_provider_data()
@@ -53,14 +52,16 @@ def select_choice(prompt_message: str, choices: Sequence[str]) -> str | None:
)
def select_provider(provider_models: dict[str, list[str]]) -> str | None | bool:
"""Presents a list of providers to the user and prompts them to select one.
def select_provider(provider_models):
"""
Presents a list of providers to the user and prompts them to select one.
Args:
provider_models: A dictionary of provider models.
- provider_models (dict): A dictionary of provider models.
Returns:
The selected provider, None if user explicitly quits, or False if no selection.
- str: The selected provider
- None: If user explicitly quits
"""
predefined_providers = [p.lower() for p in PROVIDERS]
all_providers = sorted(set(predefined_providers + list(provider_models.keys())))
@@ -79,15 +80,16 @@ def select_provider(provider_models: dict[str, list[str]]) -> str | None | bool:
return provider.lower() if provider else False
def select_model(provider: str, provider_models: dict[str, list[str]]) -> str | None:
"""Presents a list of models for a given provider to the user and prompts them to select one.
def select_model(provider, provider_models):
"""
Presents a list of models for a given provider to the user and prompts them to select one.
Args:
provider: The provider for which to select a model.
provider_models: A dictionary of provider models.
- provider (str): The provider for which to select a model.
- provider_models (dict): A dictionary of provider models.
Returns:
The selected model, or None if the operation is aborted or an invalid selection is made.
- str: The selected model, or None if the operation is aborted or an invalid selection is made.
"""
predefined_providers = [p.lower() for p in PROVIDERS]
@@ -105,17 +107,16 @@ def select_model(provider: str, provider_models: dict[str, list[str]]) -> str |
)
def load_provider_data(cache_file: Path, cache_expiry: int) -> dict[str, Any] | None:
"""Loads provider data from a cache file if it exists and is not expired.
If the cache is expired or corrupted, it fetches the data from the web.
def load_provider_data(cache_file, cache_expiry):
"""
Loads provider data from a cache file if it exists and is not expired. If the cache is expired or corrupted, it fetches the data from the web.
Args:
cache_file: The path to the cache file.
cache_expiry: The cache expiry time in seconds.
- cache_file (Path): The path to the cache file.
- cache_expiry (int): The cache expiry time in seconds.
Returns:
The loaded provider data or None if the operation fails.
- dict or None: The loaded provider data or None if the operation fails.
"""
current_time = time.time()
if (
@@ -136,31 +137,32 @@ def load_provider_data(cache_file: Path, cache_expiry: int) -> dict[str, Any] |
return fetch_provider_data(cache_file)
def read_cache_file(cache_file: Path) -> dict[str, Any] | None:
"""Reads and returns the JSON content from a cache file.
def read_cache_file(cache_file):
"""
Reads and returns the JSON content from a cache file. Returns None if the file contains invalid JSON.
Args:
cache_file: The path to the cache file.
- cache_file (Path): The path to the cache file.
Returns:
The JSON content of the cache file or None if the JSON is invalid.
- dict or None: The JSON content of the cache file or None if the JSON is invalid.
"""
try:
with open(cache_file, "r") as f:
data: dict[str, Any] = json.load(f)
return data
return json.load(f)
except json.JSONDecodeError:
return None
def fetch_provider_data(cache_file: Path) -> dict[str, Any] | None:
"""Fetches provider data from a specified URL and caches it to a file.
def fetch_provider_data(cache_file):
"""
Fetches provider data from a specified URL and caches it to a file.
Args:
cache_file: The path to the cache file.
- cache_file (Path): The path to the cache file.
Returns:
The fetched provider data or None if the operation fails.
- dict or None: The fetched provider data or None if the operation fails.
"""
ssl_config = os.environ["SSL_CERT_FILE"] = certifi.where()
@@ -178,39 +180,36 @@ def fetch_provider_data(cache_file: Path) -> dict[str, Any] | None:
return None
def download_data(response: requests.Response) -> dict[str, Any]:
"""Downloads data from a given HTTP response and returns the JSON content.
def download_data(response):
"""
Downloads data from a given HTTP response and returns the JSON content.
Args:
response: The HTTP response object.
- response (requests.Response): The HTTP response object.
Returns:
The JSON content of the response.
- dict: The JSON content of the response.
"""
total_size = int(response.headers.get("content-length", 0))
block_size = 8192
data_chunks: list[bytes] = []
bar: Any
data_chunks = []
with click.progressbar(
length=total_size, label="Downloading", show_pos=True
) as bar:
) as progress_bar:
for chunk in response.iter_content(block_size):
if chunk:
data_chunks.append(chunk)
bar.update(len(chunk))
progress_bar.update(len(chunk))
data_content = b"".join(data_chunks)
result: dict[str, Any] = json.loads(data_content.decode("utf-8"))
return result
return json.loads(data_content.decode("utf-8"))
def get_provider_data() -> dict[str, list[str]] | None:
"""Retrieves provider data from a cache file.
Filters out models based on provider criteria, and returns a dictionary of providers
mapped to their models.
def get_provider_data():
"""
Retrieves provider data from a cache file, filters out models based on provider criteria, and returns a dictionary of providers mapped to their models.
Returns:
A dictionary of providers mapped to their models or None if the operation fails.
- dict or None: A dictionary of providers mapped to their models or None if the operation fails.
"""
cache_dir = Path.home() / ".crewai"
cache_dir.mkdir(exist_ok=True)

View File

@@ -1,107 +1,6 @@
"""Version utilities for CrewAI CLI."""
from collections.abc import Mapping
from datetime import datetime, timedelta
from functools import lru_cache
import importlib.metadata
import json
from pathlib import Path
from typing import Any, cast
from urllib import request
from urllib.error import URLError
import appdirs
from packaging.version import InvalidVersion, parse
@lru_cache(maxsize=1)
def _get_cache_file() -> Path:
"""Get the path to the version cache file.
Cached to avoid repeated filesystem operations.
"""
cache_dir = Path(appdirs.user_cache_dir("crewai"))
cache_dir.mkdir(parents=True, exist_ok=True)
return cache_dir / "version_cache.json"
def get_crewai_version() -> str:
"""Get the version number of CrewAI running the CLI."""
"""Get the version number of CrewAI running the CLI"""
return importlib.metadata.version("crewai")
def _is_cache_valid(cache_data: Mapping[str, Any]) -> bool:
"""Check if the cache is still valid, less than 24 hours old."""
if "timestamp" not in cache_data:
return False
try:
cache_time = datetime.fromisoformat(str(cache_data["timestamp"]))
return datetime.now() - cache_time < timedelta(hours=24)
except (ValueError, TypeError):
return False
def get_latest_version_from_pypi(timeout: int = 2) -> str | None:
"""Get the latest version of CrewAI from PyPI.
Args:
timeout: Request timeout in seconds.
Returns:
Latest version string or None if unable to fetch.
"""
cache_file = _get_cache_file()
if cache_file.exists():
try:
cache_data = json.loads(cache_file.read_text())
if _is_cache_valid(cache_data):
return cast(str | None, cache_data.get("version"))
except (json.JSONDecodeError, OSError):
pass
try:
with request.urlopen(
"https://pypi.org/pypi/crewai/json", timeout=timeout
) as response:
data = json.loads(response.read())
latest_version = cast(str, data["info"]["version"])
cache_data = {
"version": latest_version,
"timestamp": datetime.now().isoformat(),
}
cache_file.write_text(json.dumps(cache_data))
return latest_version
except (URLError, json.JSONDecodeError, KeyError, OSError):
return None
def check_version() -> tuple[str, str | None]:
"""Check current and latest versions.
Returns:
Tuple of (current_version, latest_version).
latest_version is None if unable to fetch from PyPI.
"""
current = get_crewai_version()
latest = get_latest_version_from_pypi()
return current, latest
def is_newer_version_available() -> tuple[bool, str, str | None]:
"""Check if a newer version is available.
Returns:
Tuple of (is_newer, current_version, latest_version).
"""
current, latest = check_version()
if latest is None:
return False, current, None
try:
return parse(latest) > parse(current), current, latest
except (InvalidVersion, TypeError):
return False, current, latest
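
The import of is_newer_version_available in the console formatter diff further below suggests these helpers drive an upgrade notice; a minimal sketch of that usage (the message wording is illustrative):

from crewai.cli.version import is_newer_version_available

newer, current, latest = is_newer_version_available()
if newer:
    print(f"A newer crewai release is available: {current} -> {latest}")
else:
    print(f"crewai {current} is up to date (latest known: {latest})")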

View File

@@ -1 +0,0 @@
"""Core crewAI components and interfaces."""

View File

@@ -1 +0,0 @@
"""Provider interfaces for extensible crewAI components."""

View File

@@ -1,78 +0,0 @@
"""Content processor provider for extensible content processing."""
from contextvars import ContextVar
from typing import Any, Protocol, runtime_checkable
@runtime_checkable
class ContentProcessorProvider(Protocol):
"""Protocol for content processing during task execution."""
def process(self, content: str, context: dict[str, Any] | None = None) -> str:
"""Process content before use.
Args:
content: The content to process.
context: Optional context information.
Returns:
The processed content.
"""
...
class NoOpContentProcessor:
"""Default processor that returns content unchanged."""
def process(self, content: str, context: dict[str, Any] | None = None) -> str:
"""Return content unchanged.
Args:
content: The content to process.
context: Optional context information (unused).
Returns:
The original content unchanged.
"""
return content
_content_processor: ContextVar[ContentProcessorProvider | None] = ContextVar(
"_content_processor", default=None
)
_default_processor = NoOpContentProcessor()
def get_processor() -> ContentProcessorProvider:
"""Get the current content processor.
Returns:
The registered content processor or the default no-op processor.
"""
processor = _content_processor.get()
if processor is not None:
return processor
return _default_processor
def set_processor(processor: ContentProcessorProvider) -> None:
"""Set the content processor for the current context.
Args:
processor: The content processor to use.
"""
_content_processor.set(processor)
def process_content(content: str, context: dict[str, Any] | None = None) -> str:
"""Process content using the registered processor.
Args:
content: The content to process.
context: Optional context information.
Returns:
The processed content.
"""
return get_processor().process(content, context)
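
ContentProcessorProvider is a runtime-checkable Protocol, so any object with a matching process() can be registered; a minimal sketch with an illustrative redaction processor (the import path is assumed from the surrounding package docstrings, since this view does not show file names):

from typing import Any

# Assumed module path; the diff does not name the file explicitly.
from crewai.core.providers.content_processor import process_content, set_processor


class RedactingProcessor:
    """Masks a placeholder secret marker before content is used."""

    def process(self, content: str, context: dict[str, Any] | None = None) -> str:
        return content.replace("SECRET", "[redacted]")


set_processor(RedactingProcessor())
print(process_content("token=SECRET"))  # -> token=[redacted]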

View File

@@ -1,304 +0,0 @@
"""Human input provider for HITL (Human-in-the-Loop) flows."""
from __future__ import annotations
from contextvars import ContextVar, Token
from typing import TYPE_CHECKING, Protocol, runtime_checkable
if TYPE_CHECKING:
from crewai.agent.core import Agent
from crewai.agents.parser import AgentFinish
from crewai.crew import Crew
from crewai.llms.base_llm import BaseLLM
from crewai.task import Task
from crewai.utilities.types import LLMMessage
class ExecutorContext(Protocol):
"""Context interface for human input providers to interact with executor."""
task: Task | None
crew: Crew | None
messages: list[LLMMessage]
ask_for_human_input: bool
llm: BaseLLM
agent: Agent
def _invoke_loop(self) -> AgentFinish:
"""Invoke the agent loop and return the result."""
...
def _is_training_mode(self) -> bool:
"""Check if training mode is active."""
...
def _handle_crew_training_output(
self,
result: AgentFinish,
human_feedback: str | None = None,
) -> None:
"""Handle training output."""
...
def _format_feedback_message(self, feedback: str) -> LLMMessage:
"""Format feedback as a message."""
...
@runtime_checkable
class HumanInputProvider(Protocol):
"""Protocol for human input handling.
Implementations handle the full feedback flow:
- Sync: prompt user, loop until satisfied
- Async: raise exception for external handling
"""
def setup_messages(self, context: ExecutorContext) -> bool:
"""Set up messages for execution.
Called before standard message setup. Allows providers to handle
conversation resumption or other custom message initialization.
Args:
context: Executor context with messages list to modify.
Returns:
True if messages were set up (skip standard setup),
False to use standard setup.
"""
...
def post_setup_messages(self, context: ExecutorContext) -> None:
"""Called after standard message setup.
Allows providers to modify messages after standard setup completes.
Only called when setup_messages returned False.
Args:
context: Executor context with messages list to modify.
"""
...
def handle_feedback(
self,
formatted_answer: AgentFinish,
context: ExecutorContext,
) -> AgentFinish:
"""Handle the full human feedback flow.
Args:
formatted_answer: The agent's current answer.
context: Executor context for callbacks.
Returns:
The final answer after feedback processing.
Raises:
Exception: Async implementations may raise to signal external handling.
"""
...
@staticmethod
def _get_output_string(answer: AgentFinish) -> str:
"""Extract output string from answer.
Args:
answer: The agent's finished answer.
Returns:
String representation of the output.
"""
if isinstance(answer.output, str):
return answer.output
return answer.output.model_dump_json()
class SyncHumanInputProvider(HumanInputProvider):
"""Default synchronous human input via terminal."""
def setup_messages(self, context: ExecutorContext) -> bool:
"""Use standard message setup.
Args:
context: Executor context (unused).
Returns:
False to use standard setup.
"""
return False
def post_setup_messages(self, context: ExecutorContext) -> None:
"""No-op for sync provider.
Args:
context: Executor context (unused).
"""
def handle_feedback(
self,
formatted_answer: AgentFinish,
context: ExecutorContext,
) -> AgentFinish:
"""Handle feedback synchronously with terminal prompts.
Args:
formatted_answer: The agent's current answer.
context: Executor context for callbacks.
Returns:
The final answer after feedback processing.
"""
feedback = self._prompt_input(context.crew)
if context._is_training_mode():
return self._handle_training_feedback(formatted_answer, feedback, context)
return self._handle_regular_feedback(formatted_answer, feedback, context)
@staticmethod
def _handle_training_feedback(
initial_answer: AgentFinish,
feedback: str,
context: ExecutorContext,
) -> AgentFinish:
"""Process training feedback (single iteration).
Args:
initial_answer: The agent's initial answer.
feedback: Human feedback string.
context: Executor context for callbacks.
Returns:
Improved answer after processing feedback.
"""
context._handle_crew_training_output(initial_answer, feedback)
context.messages.append(context._format_feedback_message(feedback))
improved_answer = context._invoke_loop()
context._handle_crew_training_output(improved_answer)
context.ask_for_human_input = False
return improved_answer
def _handle_regular_feedback(
self,
current_answer: AgentFinish,
initial_feedback: str,
context: ExecutorContext,
) -> AgentFinish:
"""Process regular feedback with iteration loop.
Args:
current_answer: The agent's current answer.
initial_feedback: Initial human feedback string.
context: Executor context for callbacks.
Returns:
Final answer after all feedback iterations.
"""
feedback = initial_feedback
answer = current_answer
while context.ask_for_human_input:
if feedback.strip() == "":
context.ask_for_human_input = False
else:
context.messages.append(context._format_feedback_message(feedback))
answer = context._invoke_loop()
feedback = self._prompt_input(context.crew)
return answer
@staticmethod
def _prompt_input(crew: Crew | None) -> str:
"""Show rich panel and prompt for input.
Args:
crew: The crew instance for context.
Returns:
User input string from terminal.
"""
from rich.panel import Panel
from rich.text import Text
from crewai.events.event_listener import event_listener
formatter = event_listener.formatter
formatter.pause_live_updates()
try:
if crew and getattr(crew, "_train", False):
prompt_text = (
"TRAINING MODE: Provide feedback to improve the agent's performance.\n\n"
"This will be used to train better versions of the agent.\n"
"Please provide detailed feedback about the result quality and reasoning process."
)
title = "🎓 Training Feedback Required"
else:
prompt_text = (
"Provide feedback on the Final Result above.\n\n"
"• If you are happy with the result, simply hit Enter without typing anything.\n"
"• Otherwise, provide specific improvement requests.\n"
"• You can provide multiple rounds of feedback until satisfied."
)
title = "💬 Human Feedback Required"
content = Text()
content.append(prompt_text, style="yellow")
prompt_panel = Panel(
content,
title=title,
border_style="yellow",
padding=(1, 2),
)
formatter.console.print(prompt_panel)
response = input()
if response.strip() != "":
formatter.console.print("\n[cyan]Processing your feedback...[/cyan]")
return response
finally:
formatter.resume_live_updates()
_provider: ContextVar[HumanInputProvider | None] = ContextVar(
"human_input_provider",
default=None,
)
def get_provider() -> HumanInputProvider:
"""Get the current human input provider.
Returns:
The current provider, or a new SyncHumanInputProvider if none set.
"""
provider = _provider.get()
if provider is None:
initialized_provider = SyncHumanInputProvider()
set_provider(initialized_provider)
return initialized_provider
return provider
def set_provider(provider: HumanInputProvider) -> Token[HumanInputProvider | None]:
"""Set the human input provider for the current context.
Args:
provider: The provider to use.
Returns:
Token that can be used to reset to previous value.
"""
return _provider.set(provider)
def reset_provider(token: Token[HumanInputProvider | None]) -> None:
"""Reset the provider to its previous value.
Args:
token: Token returned from set_provider.
"""
_provider.reset(token)
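
Since HumanInputProvider is a Protocol, an alternative frontend only needs the three methods; a minimal non-interactive sketch that auto-approves the first answer (module path assumed, as above; this is not the library's shipped behavior):

from crewai.agents.parser import AgentFinish

# Assumed module path; the diff does not name the file explicitly.
from crewai.core.providers.human_input import ExecutorContext, set_provider


class AutoApproveProvider:
    """Accepts the agent's first answer without prompting a human."""

    def setup_messages(self, context: ExecutorContext) -> bool:
        return False  # fall back to the executor's standard message setup

    def post_setup_messages(self, context: ExecutorContext) -> None:
        pass

    def handle_feedback(
        self, formatted_answer: AgentFinish, context: ExecutorContext
    ) -> AgentFinish:
        context.ask_for_human_input = False
        return formatted_answer


token = set_provider(AutoApproveProvider())  # keep the token for reset_provider() later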

View File

@@ -751,8 +751,6 @@ class Crew(FlowTrackable, BaseModel):
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
result = self._post_kickoff(result)
self.usage_metrics = self.calculate_usage_metrics()
return result
@@ -766,9 +764,6 @@ class Crew(FlowTrackable, BaseModel):
clear_files(self.id)
detach(token)
def _post_kickoff(self, result: CrewOutput) -> CrewOutput:
return result
def kickoff_for_each(
self,
inputs: list[dict[str, Any]],
@@ -941,8 +936,6 @@ class Crew(FlowTrackable, BaseModel):
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
result = self._post_kickoff(result)
self.usage_metrics = self.calculate_usage_metrics()
return result
@@ -1188,9 +1181,6 @@ class Crew(FlowTrackable, BaseModel):
self.manager_agent = manager
manager.crew = self
def _get_execution_start_index(self, tasks: list[Task]) -> int | None:
return None
def _execute_tasks(
self,
tasks: list[Task],
@@ -1207,9 +1197,6 @@ class Crew(FlowTrackable, BaseModel):
Returns:
CrewOutput: Final output of the crew
"""
custom_start = self._get_execution_start_index(tasks)
if custom_start is not None:
start_index = custom_start
task_outputs: list[TaskOutput] = []
futures: list[tuple[Task, Future[TaskOutput], int]] = []
@@ -1318,10 +1305,8 @@ class Crew(FlowTrackable, BaseModel):
if files:
supported_types: list[str] = []
if agent and agent.llm and agent.llm.supports_multimodal():
provider = (
getattr(agent.llm, "provider", None)
or getattr(agent.llm, "model", None)
or "openai"
provider = getattr(agent.llm, "provider", None) or getattr(
agent.llm, "model", "openai"
)
api = getattr(agent.llm, "api", None)
supported_types = get_supported_content_types(provider, api)
@@ -2026,13 +2011,7 @@ class Crew(FlowTrackable, BaseModel):
@staticmethod
def _show_tracing_disabled_message() -> None:
"""Show a message when tracing is disabled."""
from crewai.events.listeners.tracing.utils import (
has_user_declined_tracing,
should_suppress_tracing_messages,
)
if should_suppress_tracing_messages():
return
from crewai.events.listeners.tracing.utils import has_user_declined_tracing
console = Console()

View File

@@ -195,7 +195,6 @@ __all__ = [
"ToolUsageFinishedEvent",
"ToolUsageStartedEvent",
"ToolValidateInputErrorEvent",
"_extension_exports",
"crewai_event_bus",
]
@@ -211,29 +210,14 @@ _AGENT_EVENT_MAPPING = {
"LiteAgentExecutionStartedEvent": "crewai.events.types.agent_events",
}
_extension_exports: dict[str, Any] = {}
def __getattr__(name: str) -> Any:
"""Lazy import for agent events and registered extensions."""
"""Lazy import for agent events to avoid circular imports."""
if name in _AGENT_EVENT_MAPPING:
import importlib
module_path = _AGENT_EVENT_MAPPING[name]
module = importlib.import_module(module_path)
return getattr(module, name)
if name in _extension_exports:
import importlib
value = _extension_exports[name]
if isinstance(value, str):
module_path, _, attr_name = value.rpartition(".")
if module_path:
module = importlib.import_module(module_path)
return getattr(module, attr_name)
return importlib.import_module(value)
return value
msg = f"module {__name__!r} has no attribute {name!r}"
raise AttributeError(msg)

View File

@@ -227,39 +227,6 @@ class CrewAIEventsBus:
return decorator
def off(
self,
event_type: type[BaseEvent],
handler: Callable[..., Any],
) -> None:
"""Unregister an event handler for a specific event type.
Args:
event_type: The event class to stop listening for
handler: The handler function to unregister
"""
with self._rwlock.w_locked():
if event_type in self._sync_handlers:
existing_sync = self._sync_handlers[event_type]
if handler in existing_sync:
self._sync_handlers[event_type] = existing_sync - {handler}
if not self._sync_handlers[event_type]:
del self._sync_handlers[event_type]
if event_type in self._async_handlers:
existing_async = self._async_handlers[event_type]
if handler in existing_async:
self._async_handlers[event_type] = existing_async - {handler}
if not self._async_handlers[event_type]:
del self._async_handlers[event_type]
if event_type in self._handler_dependencies:
self._handler_dependencies[event_type].pop(handler, None)
if not self._handler_dependencies[event_type]:
del self._handler_dependencies[event_type]
self._execution_plan_cache.pop(event_type, None)
def _call_handlers(
self,
source: Any,
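
A small sketch pairing the off() method shown above with the bus's decorator-style on() registration; the handler body is illustrative:

from crewai.events import ToolUsageFinishedEvent, crewai_event_bus


def log_tool_finish(source, event):
    print(f"{event.tool_name} finished with args {event.tool_args}")


# on(event_type) returns a decorator, so it can also be applied directly:
crewai_event_bus.on(ToolUsageFinishedEvent)(log_tool_finish)

# ... later, detach the handler so it no longer fires
crewai_event_bus.off(ToolUsageFinishedEvent, log_tool_finish)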

View File

@@ -797,13 +797,7 @@ class TraceCollectionListener(BaseEventListener):
from rich.console import Console
from rich.panel import Panel
from crewai.events.listeners.tracing.utils import (
has_user_declined_tracing,
should_suppress_tracing_messages,
)
if should_suppress_tracing_messages():
return
from crewai.events.listeners.tracing.utils import has_user_declined_tracing
console = Console()

View File

@@ -1,4 +1,3 @@
from collections.abc import Callable
from contextvars import ContextVar, Token
from datetime import datetime
import getpass
@@ -27,35 +26,6 @@ logger = logging.getLogger(__name__)
_tracing_enabled: ContextVar[bool | None] = ContextVar("_tracing_enabled", default=None)
_first_time_trace_hook: ContextVar[Callable[[], bool] | None] = ContextVar(
"_first_time_trace_hook", default=None
)
_suppress_tracing_messages: ContextVar[bool] = ContextVar(
"_suppress_tracing_messages", default=False
)
def set_suppress_tracing_messages(suppress: bool) -> object:
"""Set whether to suppress tracing-related console messages.
Args:
suppress: True to suppress messages, False to show them.
Returns:
A token that can be used to restore the previous value.
"""
return _suppress_tracing_messages.set(suppress)
def should_suppress_tracing_messages() -> bool:
"""Check if tracing messages should be suppressed.
Returns:
True if messages should be suppressed, False otherwise.
"""
return _suppress_tracing_messages.get()
def should_enable_tracing(*, override: bool | None = None) -> bool:
"""Determine if tracing should be enabled.
@@ -437,13 +407,10 @@ def truncate_messages(
def should_auto_collect_first_time_traces() -> bool:
"""True if we should auto-collect traces for first-time user.
Returns:
True if first-time user AND telemetry not disabled AND tracing not explicitly enabled, False otherwise.
"""
hook = _first_time_trace_hook.get()
if hook is not None:
return hook()
if _is_test_environment():
return False
@@ -465,9 +432,6 @@ def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
if _is_test_environment():
return False
if should_suppress_tracing_messages():
return False
try:
import threading
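
The suppress-messages helpers in the hunk above are a small ContextVar set/reset pattern. A self-contained sketch of that pattern with illustrative names (this is a standalone example, not an importable CrewAI API):

```python
from contextvars import ContextVar, Token

_suppress_messages: ContextVar[bool] = ContextVar("_suppress_messages", default=False)


def set_suppress_messages(suppress: bool) -> Token:
    """Flip the flag and return a token so the previous value can be restored."""
    return _suppress_messages.set(suppress)


def should_suppress_messages() -> bool:
    return _suppress_messages.get()


# Typical scoped usage: suppress for a block, then restore the prior value.
token = set_suppress_messages(True)
try:
    assert should_suppress_messages() is True
finally:
    _suppress_messages.reset(token)
```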

View File

@@ -10,7 +10,6 @@ class LLMEventBase(BaseEvent):
from_task: Any | None = None
from_agent: Any | None = None
model: str | None = None
call_id: str
def __init__(self, **data: Any) -> None:
if data.get("from_task"):

View File

@@ -16,7 +16,7 @@ class ToolUsageEvent(BaseEvent):
tool_name: str
tool_args: dict[str, Any] | str
tool_class: str | None = None
run_attempts: int = 0
run_attempts: int | None = None
delegations: int | None = None
agent: Any | None = None
task_name: str | None = None
@@ -26,7 +26,7 @@ class ToolUsageEvent(BaseEvent):
model_config = ConfigDict(arbitrary_types_allowed=True)
def __init__(self, **data: Any) -> None:
def __init__(self, **data):
if data.get("from_task"):
task = data["from_task"]
data["task_id"] = str(task.id)
@@ -96,10 +96,10 @@ class ToolExecutionErrorEvent(BaseEvent):
type: str = "tool_execution_error"
tool_name: str
tool_args: dict[str, Any]
tool_class: Callable[..., Any]
tool_class: Callable
agent: Any | None = None
def __init__(self, **data: Any) -> None:
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint:

View File

@@ -1,45 +1,11 @@
from contextvars import ContextVar
import os
import threading
from typing import Any, ClassVar, cast
from typing import Any, ClassVar
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.text import Text
from crewai.cli.version import is_newer_version_available
_disable_version_check: ContextVar[bool] = ContextVar(
"_disable_version_check", default=False
)
_suppress_console_output: ContextVar[bool] = ContextVar(
"_suppress_console_output", default=False
)
def set_suppress_console_output(suppress: bool) -> object:
"""Set whether to suppress all console output.
Args:
suppress: True to suppress output, False to show it.
Returns:
A token that can be used to restore the previous value.
"""
return _suppress_console_output.set(suppress)
def should_suppress_console_output() -> bool:
"""Check if console output should be suppressed.
Returns:
True if output should be suppressed, False otherwise.
"""
return _suppress_console_output.get()
class ConsoleFormatter:
tool_usage_counts: ClassVar[dict[str, int]] = {}
@@ -69,56 +35,13 @@ class ConsoleFormatter:
padding=(1, 2),
)
def _show_version_update_message_if_needed(self) -> None:
"""Show version update message if a newer version is available.
Only displays when verbose mode is enabled and not running in CI/CD.
"""
if not self.verbose:
return
if _disable_version_check.get():
return
if os.getenv("CI", "").lower() in ("true", "1"):
return
if os.getenv("CREWAI_DISABLE_VERSION_CHECK", "").lower() in ("true", "1"):
return
try:
is_newer, current, latest = is_newer_version_available()
if is_newer and latest:
message = f"""A new version of CrewAI is available!
Current version: {current}
Latest version: {latest}
To update, run: uv sync --upgrade-package crewai"""
panel = Panel(
message,
title="✨ Update Available ✨",
border_style="yellow",
padding=(1, 2),
)
self.console.print(panel)
self.console.print()
except Exception: # noqa: S110
# Silently ignore errors in version check - it's non-critical
pass
def _show_tracing_disabled_message_if_needed(self) -> None:
"""Show tracing disabled message if tracing is not enabled."""
from crewai.events.listeners.tracing.utils import (
has_user_declined_tracing,
is_tracing_enabled_in_context,
should_suppress_tracing_messages,
)
if should_suppress_tracing_messages():
return
if not is_tracing_enabled_in_context():
if has_user_declined_tracing():
message = """Info: Tracing is disabled.
@@ -170,8 +93,6 @@ To enable tracing, do any one of these:
def print(self, *args: Any, **kwargs: Any) -> None:
"""Print to console. Simplified to only handle panel-based output."""
if should_suppress_console_output():
return
# Skip blank lines during streaming
if len(args) == 0 and self._is_streaming:
return
@@ -255,10 +176,9 @@ To enable tracing, do any one of these:
if not self.verbose:
return
# Reset the crew completion event for this new crew execution
ConsoleFormatter.crew_completion_printed.clear()
self._show_version_update_message_if_needed()
content = self.create_status_content(
"Crew Execution Started",
crew_name,
@@ -317,8 +237,6 @@ To enable tracing, do any one of these:
def handle_flow_started(self, flow_name: str, flow_id: str) -> None:
"""Show flow started panel."""
self._show_version_update_message_if_needed()
content = Text()
content.append("Flow Started\n", style="blue bold")
content.append("Name: ", style="white")
@@ -528,9 +446,6 @@ To enable tracing, do any one of these:
if not self.verbose:
return
if should_suppress_console_output():
return
self._is_streaming = True
self._last_stream_call_type = call_type
@@ -970,7 +885,7 @@ To enable tracing, do any one of these:
is_a2a_delegation = False
try:
output_data = json.loads(cast(str, formatted_answer.output))
output_data = json.loads(formatted_answer.output)
if isinstance(output_data, dict):
if output_data.get("is_a2a") is True:
is_a2a_delegation = True

View File

@@ -18,7 +18,6 @@ from crewai.agents.parser import (
AgentFinish,
OutputParserError,
)
from crewai.core.providers.human_input import get_provider
from crewai.events.event_bus import crewai_event_bus
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled_in_context,
@@ -42,12 +41,7 @@ from crewai.hooks.tool_hooks import (
get_after_tool_call_hooks,
get_before_tool_call_hooks,
)
from crewai.hooks.types import (
AfterLLMCallHookCallable,
AfterLLMCallHookType,
BeforeLLMCallHookCallable,
BeforeLLMCallHookType,
)
from crewai.hooks.types import AfterLLMCallHookType, BeforeLLMCallHookType
from crewai.utilities.agent_utils import (
convert_tools_to_openai_schema,
enforce_rpm_limit,
@@ -197,12 +191,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._instance_id = str(uuid4())[:8]
self.before_llm_call_hooks: list[
BeforeLLMCallHookType | BeforeLLMCallHookCallable
] = []
self.after_llm_call_hooks: list[
AfterLLMCallHookType | AfterLLMCallHookCallable
] = []
self.before_llm_call_hooks: list[BeforeLLMCallHookType] = []
self.after_llm_call_hooks: list[AfterLLMCallHookType] = []
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
self.after_llm_call_hooks.extend(get_after_llm_call_hooks())
@@ -217,51 +207,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
)
self._state = AgentReActState()
@property
def messages(self) -> list[LLMMessage]:
"""Delegate to state for ExecutorContext conformance."""
return self._state.messages
@messages.setter
def messages(self, value: list[LLMMessage]) -> None:
"""Delegate to state for ExecutorContext conformance."""
self._state.messages = value
@property
def ask_for_human_input(self) -> bool:
"""Delegate to state for ExecutorContext conformance."""
return self._state.ask_for_human_input
@ask_for_human_input.setter
def ask_for_human_input(self, value: bool) -> None:
"""Delegate to state for ExecutorContext conformance."""
self._state.ask_for_human_input = value
def _invoke_loop(self) -> AgentFinish:
"""Invoke the agent loop and return the result.
Required by ExecutorContext protocol.
"""
self._state.iterations = 0
self._state.is_finished = False
self._state.current_answer = None
self.kickoff()
answer = self._state.current_answer
if not isinstance(answer, AgentFinish):
raise RuntimeError("Agent loop did not produce a final answer")
return answer
def _format_feedback_message(self, feedback: str) -> LLMMessage:
"""Format feedback as a message for the LLM.
Required by ExecutorContext protocol.
"""
return format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
def _ensure_flow_initialized(self) -> None:
"""Ensure Flow.__init__() has been called.
@@ -355,6 +300,16 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"""
return self._state
@property
def messages(self) -> list[LLMMessage]:
"""Compatibility property for mixin - returns state messages."""
return self._state.messages
@messages.setter
def messages(self, value: list[LLMMessage]) -> None:
"""Set state messages."""
self._state.messages = value
@property
def iterations(self) -> int:
"""Compatibility property for mixin - returns state iterations."""
@@ -734,7 +689,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
agent_key=agent_key,
),
)
error_event_emitted = False
track_delegation_if_needed(func_name, args_dict, self.task)
@@ -810,7 +764,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
error=e,
),
)
error_event_emitted = True
elif max_usage_reached and original_tool:
# Return error message when max usage limit is reached
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
@@ -839,20 +792,20 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
color="red",
)
if not error_event_emitted:
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
@@ -1366,8 +1319,17 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
Returns:
Final answer after feedback.
"""
provider = get_provider()
return provider.handle_feedback(formatted_answer, self)
output_str = (
str(formatted_answer.output)
if isinstance(formatted_answer.output, BaseModel)
else formatted_answer.output
)
human_feedback = self._ask_human_input(output_str)
if self._is_training_mode():
return self._handle_training_feedback(formatted_answer, human_feedback)
return self._handle_regular_feedback(formatted_answer, human_feedback)
def _is_training_mode(self) -> bool:
"""Check if training mode is active.
@@ -1377,6 +1339,101 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"""
return bool(self.crew and self.crew._train)
def _handle_training_feedback(
self, initial_answer: AgentFinish, feedback: str
) -> AgentFinish:
"""Process training feedback and generate improved answer.
Args:
initial_answer: Initial agent output.
feedback: Training feedback.
Returns:
Improved answer.
"""
self._handle_crew_training_output(initial_answer, feedback)
self.state.messages.append(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
# Re-run flow for improved answer
self.state.iterations = 0
self.state.is_finished = False
self.state.current_answer = None
self.kickoff()
# Get improved answer from state
improved_answer = self.state.current_answer
if not isinstance(improved_answer, AgentFinish):
raise RuntimeError(
"Training feedback iteration did not produce final answer"
)
self._handle_crew_training_output(improved_answer)
self.state.ask_for_human_input = False
return improved_answer
def _handle_regular_feedback(
self, current_answer: AgentFinish, initial_feedback: str
) -> AgentFinish:
"""Process regular feedback iteratively until user is satisfied.
Args:
current_answer: Current agent output.
initial_feedback: Initial user feedback.
Returns:
Final answer after iterations.
"""
feedback = initial_feedback
answer = current_answer
while self.state.ask_for_human_input:
if feedback.strip() == "":
self.state.ask_for_human_input = False
else:
answer = self._process_feedback_iteration(feedback)
output_str = (
str(answer.output)
if isinstance(answer.output, BaseModel)
else answer.output
)
feedback = self._ask_human_input(output_str)
return answer
def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
"""Process a single feedback iteration and generate updated response.
Args:
feedback: User feedback.
Returns:
Updated agent response.
"""
self.state.messages.append(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
# Re-run flow
self.state.iterations = 0
self.state.is_finished = False
self.state.current_answer = None
self.kickoff()
# Get answer from state
answer = self.state.current_answer
if not isinstance(answer, AgentFinish):
raise RuntimeError("Feedback iteration did not produce final answer")
return answer
@classmethod
def __get_pydantic_core_schema__(
cls, _source_type: Any, _handler: GetCoreSchemaHandler

View File

@@ -28,8 +28,6 @@ Example:
```
"""
from typing import Any
from crewai.flow.async_feedback.providers import ConsoleProvider
from crewai.flow.async_feedback.types import (
HumanFeedbackPending,
@@ -43,15 +41,4 @@ __all__ = [
"HumanFeedbackPending",
"HumanFeedbackProvider",
"PendingFeedbackContext",
"_extension_exports",
]
_extension_exports: dict[str, Any] = {}
def __getattr__(name: str) -> Any:
"""Support extensions via dynamic attribute lookup."""
if name in _extension_exports:
return _extension_exports[name]
msg = f"module {__name__!r} has no attribute {name!r}"
raise AttributeError(msg)

View File

@@ -45,7 +45,6 @@ from crewai.events.listeners.tracing.utils import (
has_user_declined_tracing,
set_tracing_enabled,
should_enable_tracing,
should_suppress_tracing_messages,
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
@@ -514,11 +513,17 @@ class FlowMeta(type):
and attr_value.__is_router__
):
routers.add(attr_name)
possible_returns = get_possible_return_constants(attr_value)
if possible_returns:
router_paths[attr_name] = possible_returns
if (
hasattr(attr_value, "__router_paths__")
and attr_value.__router_paths__
):
router_paths[attr_name] = attr_value.__router_paths__
else:
router_paths[attr_name] = []
possible_returns = get_possible_return_constants(attr_value)
if possible_returns:
router_paths[attr_name] = possible_returns
else:
router_paths[attr_name] = []
# Handle start methods that are also routers (e.g., @human_feedback with emit)
if (
@@ -2075,14 +2080,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
racing_members,
other_listeners,
listener_result,
current_triggering_event_id,
triggering_event_id,
)
else:
tasks = [
self._execute_single_listener(
listener_name,
listener_result,
current_triggering_event_id,
listener_name, listener_result, triggering_event_id
)
for listener_name in listeners_triggered
]
@@ -2629,8 +2632,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
@staticmethod
def _show_tracing_disabled_message() -> None:
"""Show a message when tracing is disabled."""
if should_suppress_tracing_messages():
return
console = Console()
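
In the FlowMeta hunk above, an explicit `__router_paths__` attribute (e.g. attached by a decorator such as `@human_feedback(emit=[...])`) takes precedence over inspecting the method body for literal return values. A minimal sketch of that resolution order; `_possible_return_constants` is a trivial placeholder for CrewAI's AST-based helper:

```python
from collections.abc import Callable
from typing import Any


def _possible_return_constants(method: Callable[..., Any]) -> list[str]:
    # Placeholder for the AST-based inspection of literal return values.
    return []


def resolve_router_paths(method: Callable[..., Any]) -> list[str]:
    explicit = getattr(method, "__router_paths__", None)
    if explicit:  # decorator-provided paths win
        return list(explicit)
    return _possible_return_constants(method) or []


def approve(feedback: str) -> str:
    return "approved" if feedback else "rejected"


# A decorator with emit=["approved", "rejected"] would attach this attribute.
approve.__router_paths__ = ["approved", "rejected"]
assert resolve_router_paths(approve) == ["approved", "rejected"]
```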

View File

@@ -1025,7 +1025,7 @@ class TriggeredByHighlighter {
const isAndOrRouter = edge.dashes || edge.label === "AND";
const highlightColor = isAndOrRouter
? "{{ CREWAI_ORANGE }}"
? (edge.color?.color || "{{ CREWAI_ORANGE }}")
: getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim();
const updateData = {
@@ -1080,7 +1080,7 @@ class TriggeredByHighlighter {
// Keep the original edge color instead of turning gray
const isAndOrRouter = edge.dashes || edge.label === "AND";
const baseColor = isAndOrRouter
? "{{ CREWAI_ORANGE }}"
? (edge.color?.color || "{{ CREWAI_ORANGE }}")
: getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim();
// Convert color to rgba with opacity for vis.js
@@ -1142,7 +1142,7 @@ class TriggeredByHighlighter {
const defaultColor =
edge.dashes || edge.label === "AND"
? "{{ CREWAI_ORANGE }}"
? (edge.color?.color || "{{ CREWAI_ORANGE }}")
: getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim();
const currentOpacity = edge.opacity !== undefined ? edge.opacity : 1.0;
const currentWidth =
@@ -1253,7 +1253,7 @@ class TriggeredByHighlighter {
const defaultColor =
edge.dashes || edge.label === "AND"
? "{{ CREWAI_ORANGE }}"
? (edge.color?.color || "{{ CREWAI_ORANGE }}")
: getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim();
const currentOpacity = edge.opacity !== undefined ? edge.opacity : 1.0;
const currentWidth =
@@ -2370,7 +2370,7 @@ class NetworkManager {
this.edges.forEach((edge) => {
let edgeColor;
if (edge.dashes || edge.label === "AND") {
edgeColor = "{{ CREWAI_ORANGE }}";
edgeColor = edge.color?.color || "{{ CREWAI_ORANGE }}";
} else {
edgeColor = orEdgeColor;
}

View File

@@ -129,7 +129,7 @@ def _create_edges_from_condition(
edges: list[StructureEdge] = []
if isinstance(condition, str):
if condition in nodes:
if condition in nodes and condition != target:
edges.append(
StructureEdge(
source=condition,
@@ -140,7 +140,7 @@ def _create_edges_from_condition(
)
elif callable(condition) and hasattr(condition, "__name__"):
method_name = condition.__name__
if method_name in nodes:
if method_name in nodes and method_name != target:
edges.append(
StructureEdge(
source=method_name,
@@ -163,7 +163,7 @@ def _create_edges_from_condition(
is_router_path=False,
)
for trigger in triggers
if trigger in nodes
if trigger in nodes and trigger != target
)
else:
for sub_cond in conditions_list:
@@ -196,9 +196,34 @@ def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
node_metadata["type"] = "start"
start_methods.append(method_name)
if (
hasattr(method, "__human_feedback_config__")
and method.__human_feedback_config__
):
config = method.__human_feedback_config__
node_metadata["is_human_feedback"] = True
node_metadata["human_feedback_message"] = config.message
if config.emit:
node_metadata["human_feedback_emit"] = list(config.emit)
if config.llm:
llm_str = (
config.llm
if isinstance(config.llm, str)
else str(type(config.llm).__name__)
)
node_metadata["human_feedback_llm"] = llm_str
if config.default_outcome:
node_metadata["human_feedback_default_outcome"] = config.default_outcome
if hasattr(method, "__is_router__") and method.__is_router__:
node_metadata["is_router"] = True
node_metadata["type"] = "router"
if "is_human_feedback" not in node_metadata:
node_metadata["type"] = "router"
else:
node_metadata["type"] = "human_feedback"
router_methods.append(method_name)
if method_name in flow._router_paths:
@@ -317,7 +342,7 @@ def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
is_router_path=False,
)
for trigger_method in methods
if str(trigger_method) in nodes
if str(trigger_method) in nodes and str(trigger_method) != listener_name
)
elif is_flow_condition_dict(condition_data):
edges.extend(
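
The node-metadata block added above copies fields off a `__human_feedback_config__` object attached to the decorated method. A compact sketch of that extraction; `HumanFeedbackConfig` here is an illustrative stand-in for the decorator's config type:

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class HumanFeedbackConfig:
    message: str
    emit: list[str] = field(default_factory=list)
    llm: Any = None
    default_outcome: str | None = None


def extract_feedback_metadata(method: Any) -> dict[str, Any]:
    """Copy human-feedback config fields into node metadata, mirroring the hunk above."""
    metadata: dict[str, Any] = {}
    config = getattr(method, "__human_feedback_config__", None)
    if not config:
        return metadata
    metadata["is_human_feedback"] = True
    metadata["human_feedback_message"] = config.message
    if config.emit:
        metadata["human_feedback_emit"] = list(config.emit)
    if config.llm:
        metadata["human_feedback_llm"] = (
            config.llm if isinstance(config.llm, str) else type(config.llm).__name__
        )
    if config.default_outcome:
        metadata["human_feedback_default_outcome"] = config.default_outcome
    return metadata
```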

View File

@@ -81,6 +81,7 @@ class JSExtension(Extension):
CREWAI_ORANGE = "#FF5A50"
HITL_BLUE = "#4A90E2"
DARK_GRAY = "#333333"
WHITE = "#FFFFFF"
GRAY = "#666666"
@@ -225,6 +226,7 @@ def render_interactive(
nodes_list: list[dict[str, Any]] = []
for name, metadata in dag["nodes"].items():
node_type: str = metadata.get("type", "listen")
is_human_feedback: bool = metadata.get("is_human_feedback", False)
color_config: dict[str, Any]
font_color: str
@@ -241,6 +243,17 @@ def render_interactive(
}
font_color = "var(--node-text-color)"
border_width = 3
elif node_type == "human_feedback":
color_config = {
"background": "var(--node-bg-router)",
"border": HITL_BLUE,
"highlight": {
"background": "var(--node-bg-router)",
"border": HITL_BLUE,
},
}
font_color = "var(--node-text-color)"
border_width = 3
elif node_type == "router":
color_config = {
"background": "var(--node-bg-router)",
@@ -266,16 +279,57 @@ def render_interactive(
title_parts: list[str] = []
type_badge_bg: str = (
CREWAI_ORANGE if node_type in ["start", "router"] else DARK_GRAY
)
display_type = node_type
type_badge_bg: str
if node_type == "human_feedback":
type_badge_bg = HITL_BLUE
display_type = "HITL"
elif node_type in ["start", "router"]:
type_badge_bg = CREWAI_ORANGE
else:
type_badge_bg = DARK_GRAY
title_parts.append(f"""
<div style="border-bottom: 1px solid rgba(102,102,102,0.15); padding-bottom: 8px; margin-bottom: 10px;">
<div style="font-size: 13px; font-weight: 700; color: {DARK_GRAY}; margin-bottom: 6px;">{name}</div>
<span style="display: inline-block; background: {type_badge_bg}; color: white; padding: 2px 8px; border-radius: 4px; font-size: 10px; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px;">{node_type}</span>
<span style="display: inline-block; background: {type_badge_bg}; color: white; padding: 2px 8px; border-radius: 4px; font-size: 10px; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px;">{display_type}</span>
</div>
""")
if is_human_feedback:
feedback_msg = metadata.get("human_feedback_message", "")
if feedback_msg:
title_parts.append(f"""
<div style="margin-bottom: 8px;">
<div style="font-size: 10px; text-transform: uppercase; color: {GRAY}; letter-spacing: 0.5px; margin-bottom: 4px; font-weight: 600;">👤 Human Feedback</div>
<div style="background: rgba(74,144,226,0.08); padding: 6px 8px; border-radius: 4px; font-size: 11px; color: {DARK_GRAY}; border: 1px solid rgba(74,144,226,0.2); line-height: 1.4;">{feedback_msg}</div>
</div>
""")
if metadata.get("human_feedback_emit"):
emit_options = metadata["human_feedback_emit"]
emit_items = "".join(
[
f'<li style="margin: 3px 0;"><code style="background: rgba(74,144,226,0.08); padding: 2px 6px; border-radius: 3px; font-size: 10px; color: {HITL_BLUE}; border: 1px solid rgba(74,144,226,0.2); font-weight: 600;">{opt}</code></li>'
for opt in emit_options
]
)
title_parts.append(f"""
<div style="margin-bottom: 8px;">
<div style="font-size: 10px; text-transform: uppercase; color: {GRAY}; letter-spacing: 0.5px; margin-bottom: 4px; font-weight: 600;">Outcomes</div>
<ul style="list-style: none; padding: 0; margin: 0;">{emit_items}</ul>
</div>
""")
if metadata.get("human_feedback_llm"):
llm_model = metadata["human_feedback_llm"]
title_parts.append(f"""
<div style="margin-bottom: 8px;">
<div style="font-size: 10px; text-transform: uppercase; color: {GRAY}; letter-spacing: 0.5px; margin-bottom: 3px; font-weight: 600;">LLM</div>
<span style="display: inline-block; background: rgba(102,102,102,0.08); padding: 3px 8px; border-radius: 4px; font-size: 10px; color: {DARK_GRAY}; border: 1px solid rgba(102,102,102,0.12);">{llm_model}</span>
</div>
""")
if metadata.get("condition_type"):
condition = metadata["condition_type"]
if condition == "AND":
@@ -309,7 +363,7 @@ def render_interactive(
</div>
""")
if metadata.get("router_paths"):
if metadata.get("router_paths") and not is_human_feedback:
paths = metadata["router_paths"]
paths_items = "".join(
[
@@ -365,7 +419,11 @@ def render_interactive(
edge_dashes: bool | list[int] = False
if edge["is_router_path"]:
edge_color = CREWAI_ORANGE
source_node = dag["nodes"].get(edge["source"], {})
if source_node.get("is_human_feedback", False):
edge_color = HITL_BLUE
else:
edge_color = CREWAI_ORANGE
edge_dashes = [15, 10]
if "router_path_label" in edge:
edge_label = edge["router_path_label"]
@@ -417,6 +475,7 @@ def render_interactive(
css_content = css_content.replace("'{{ DARK_GRAY }}'", DARK_GRAY)
css_content = css_content.replace("'{{ GRAY }}'", GRAY)
css_content = css_content.replace("'{{ CREWAI_ORANGE }}'", CREWAI_ORANGE)
css_content = css_content.replace("'{{ HITL_BLUE }}'", HITL_BLUE)
css_output_path.write_text(css_content, encoding="utf-8")
@@ -430,6 +489,7 @@ def render_interactive(
js_content = js_content.replace("{{ DARK_GRAY }}", DARK_GRAY)
js_content = js_content.replace("{{ GRAY }}", GRAY)
js_content = js_content.replace("{{ CREWAI_ORANGE }}", CREWAI_ORANGE)
js_content = js_content.replace("{{ HITL_BLUE }}", HITL_BLUE)
js_content = js_content.replace("'{{ nodeData }}'", dag_nodes_json)
js_content = js_content.replace("'{{ dagData }}'", dag_full_json)
js_content = js_content.replace("'{{ nodes_list_json }}'", json.dumps(nodes_list))
@@ -441,6 +501,7 @@ def render_interactive(
html_content = template.render(
CREWAI_ORANGE=CREWAI_ORANGE,
HITL_BLUE=HITL_BLUE,
DARK_GRAY=DARK_GRAY,
WHITE=WHITE,
GRAY=GRAY,
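
The badge logic above reduces to a small node-type to color/label mapping. A sketch with the constants copied from the diff (the helper name is illustrative):

```python
CREWAI_ORANGE = "#FF5A50"
HITL_BLUE = "#4A90E2"
DARK_GRAY = "#333333"


def badge_style(node_type: str) -> tuple[str, str]:
    """Return (badge background, display label) for a node's type badge."""
    if node_type == "human_feedback":
        return HITL_BLUE, "HITL"
    if node_type in ("start", "router"):
        return CREWAI_ORANGE, node_type
    return DARK_GRAY, node_type


assert badge_style("human_feedback") == ("#4A90E2", "HITL")
assert badge_style("listen") == ("#333333", "listen")
```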

View File

@@ -21,6 +21,11 @@ class NodeMetadata(TypedDict, total=False):
class_signature: str
class_name: str
class_line_number: int
is_human_feedback: bool
human_feedback_message: str
human_feedback_emit: list[str]
human_feedback_llm: str
human_feedback_default_outcome: str
class StructureEdge(TypedDict, total=False):

View File

@@ -3,12 +3,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any, cast
from crewai.events.event_listener import event_listener
from crewai.hooks.types import (
AfterLLMCallHookCallable,
AfterLLMCallHookType,
BeforeLLMCallHookCallable,
BeforeLLMCallHookType,
)
from crewai.hooks.types import AfterLLMCallHookType, BeforeLLMCallHookType
from crewai.utilities.printer import Printer
@@ -154,12 +149,12 @@ class LLMCallHookContext:
event_listener.formatter.resume_live_updates()
_before_llm_call_hooks: list[BeforeLLMCallHookType | BeforeLLMCallHookCallable] = []
_after_llm_call_hooks: list[AfterLLMCallHookType | AfterLLMCallHookCallable] = []
_before_llm_call_hooks: list[BeforeLLMCallHookType] = []
_after_llm_call_hooks: list[AfterLLMCallHookType] = []
def register_before_llm_call_hook(
hook: BeforeLLMCallHookType | BeforeLLMCallHookCallable,
hook: BeforeLLMCallHookType,
) -> None:
"""Register a global before_llm_call hook.
@@ -195,7 +190,7 @@ def register_before_llm_call_hook(
def register_after_llm_call_hook(
hook: AfterLLMCallHookType | AfterLLMCallHookCallable,
hook: AfterLLMCallHookType,
) -> None:
"""Register a global after_llm_call hook.
@@ -222,9 +217,7 @@ def register_after_llm_call_hook(
_after_llm_call_hooks.append(hook)
def get_before_llm_call_hooks() -> list[
BeforeLLMCallHookType | BeforeLLMCallHookCallable
]:
def get_before_llm_call_hooks() -> list[BeforeLLMCallHookType]:
"""Get all registered global before_llm_call hooks.
Returns:
@@ -233,7 +226,7 @@ def get_before_llm_call_hooks() -> list[
return _before_llm_call_hooks.copy()
def get_after_llm_call_hooks() -> list[AfterLLMCallHookType | AfterLLMCallHookCallable]:
def get_after_llm_call_hooks() -> list[AfterLLMCallHookType]:
"""Get all registered global after_llm_call hooks.
Returns:
@@ -243,7 +236,7 @@ def get_after_llm_call_hooks() -> list[AfterLLMCallHookType | AfterLLMCallHookCa
def unregister_before_llm_call_hook(
hook: BeforeLLMCallHookType | BeforeLLMCallHookCallable,
hook: BeforeLLMCallHookType,
) -> bool:
"""Unregister a specific global before_llm_call hook.
@@ -269,7 +262,7 @@ def unregister_before_llm_call_hook(
def unregister_after_llm_call_hook(
hook: AfterLLMCallHookType | AfterLLMCallHookCallable,
hook: AfterLLMCallHookType,
) -> bool:
"""Unregister a specific global after_llm_call hook.

View File

@@ -3,12 +3,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any
from crewai.events.event_listener import event_listener
from crewai.hooks.types import (
AfterToolCallHookCallable,
AfterToolCallHookType,
BeforeToolCallHookCallable,
BeforeToolCallHookType,
)
from crewai.hooks.types import AfterToolCallHookType, BeforeToolCallHookType
from crewai.utilities.printer import Printer
@@ -117,12 +112,12 @@ class ToolCallHookContext:
# Global hook registries
_before_tool_call_hooks: list[BeforeToolCallHookType | BeforeToolCallHookCallable] = []
_after_tool_call_hooks: list[AfterToolCallHookType | AfterToolCallHookCallable] = []
_before_tool_call_hooks: list[BeforeToolCallHookType] = []
_after_tool_call_hooks: list[AfterToolCallHookType] = []
def register_before_tool_call_hook(
hook: BeforeToolCallHookType | BeforeToolCallHookCallable,
hook: BeforeToolCallHookType,
) -> None:
"""Register a global before_tool_call hook.
@@ -159,7 +154,7 @@ def register_before_tool_call_hook(
def register_after_tool_call_hook(
hook: AfterToolCallHookType | AfterToolCallHookCallable,
hook: AfterToolCallHookType,
) -> None:
"""Register a global after_tool_call hook.
@@ -189,9 +184,7 @@ def register_after_tool_call_hook(
_after_tool_call_hooks.append(hook)
def get_before_tool_call_hooks() -> list[
BeforeToolCallHookType | BeforeToolCallHookCallable
]:
def get_before_tool_call_hooks() -> list[BeforeToolCallHookType]:
"""Get all registered global before_tool_call hooks.
Returns:
@@ -200,9 +193,7 @@ def get_before_tool_call_hooks() -> list[
return _before_tool_call_hooks.copy()
def get_after_tool_call_hooks() -> list[
AfterToolCallHookType | AfterToolCallHookCallable
]:
def get_after_tool_call_hooks() -> list[AfterToolCallHookType]:
"""Get all registered global after_tool_call hooks.
Returns:
@@ -212,7 +203,7 @@ def get_after_tool_call_hooks() -> list[
def unregister_before_tool_call_hook(
hook: BeforeToolCallHookType | BeforeToolCallHookCallable,
hook: BeforeToolCallHookType,
) -> bool:
"""Unregister a specific global before_tool_call hook.
@@ -238,7 +229,7 @@ def unregister_before_tool_call_hook(
def unregister_after_tool_call_hook(
hook: AfterToolCallHookType | AfterToolCallHookCallable,
hook: AfterToolCallHookType,
) -> bool:
"""Unregister a specific global after_tool_call hook.

View File

@@ -1 +0,0 @@
"""Knowledge source utilities."""

View File

@@ -1,70 +0,0 @@
"""Helper utilities for knowledge sources."""
from typing import Any, ClassVar
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource
from crewai.knowledge.source.excel_knowledge_source import ExcelKnowledgeSource
from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource
from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource
from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
class SourceHelper:
"""Helper class for creating and managing knowledge sources."""
SUPPORTED_FILE_TYPES: ClassVar[list[str]] = [
".csv",
".pdf",
".json",
".txt",
".xlsx",
".xls",
]
_FILE_TYPE_MAP: ClassVar[dict[str, type[BaseKnowledgeSource]]] = {
".csv": CSVKnowledgeSource,
".pdf": PDFKnowledgeSource,
".json": JSONKnowledgeSource,
".txt": TextFileKnowledgeSource,
".xlsx": ExcelKnowledgeSource,
".xls": ExcelKnowledgeSource,
}
@classmethod
def is_supported_file(cls, file_path: str) -> bool:
"""Check if a file type is supported.
Args:
file_path: Path to the file.
Returns:
True if the file type is supported.
"""
return file_path.lower().endswith(tuple(cls.SUPPORTED_FILE_TYPES))
@classmethod
def get_source(
cls, file_path: str, metadata: dict[str, Any] | None = None
) -> BaseKnowledgeSource:
"""Create appropriate KnowledgeSource based on file extension.
Args:
file_path: Path to the file.
metadata: Optional metadata to attach to the source.
Returns:
The appropriate KnowledgeSource instance.
Raises:
ValueError: If the file type is not supported.
"""
if not cls.is_supported_file(file_path):
raise ValueError(f"Unsupported file type: {file_path}")
lower_path = file_path.lower()
for ext, source_cls in cls._FILE_TYPE_MAP.items():
if lower_path.endswith(ext):
return source_cls(file_path=[file_path], metadata=metadata)
raise ValueError(f"Unsupported file type: {file_path}")

View File

@@ -37,7 +37,7 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.llms.base_llm import BaseLLM, get_current_call_id, llm_call_context
from crewai.llms.base_llm import BaseLLM
from crewai.llms.constants import (
ANTHROPIC_MODELS,
AZURE_MODELS,
@@ -770,7 +770,7 @@ class LLM(BaseLLM):
chunk_content = None
response_id = None
if hasattr(chunk, "id"):
if hasattr(chunk,'id'):
response_id = chunk.id
# Safely extract content from various chunk formats
@@ -827,7 +827,7 @@ class LLM(BaseLLM):
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_id=response_id,
response_id=response_id
)
if result is not None:
@@ -849,8 +849,7 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
call_type=LLMCallType.LLM_CALL,
response_id=response_id,
call_id=get_current_call_id(),
response_id=response_id
),
)
# --- 4) Fallback to non-streaming if no content received
@@ -1016,10 +1015,7 @@ class LLM(BaseLLM):
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise Exception(f"Failed to get streaming response: {e!s}") from e
@@ -1052,8 +1048,7 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
call_type=LLMCallType.TOOL_CALL,
response_id=response_id,
call_id=get_current_call_id(),
response_id=response_id
),
)
@@ -1481,8 +1476,7 @@ class LLM(BaseLLM):
chunk=chunk_content,
from_task=from_task,
from_agent=from_agent,
response_id=response_id,
call_id=get_current_call_id(),
response_id=response_id
),
)
@@ -1625,12 +1619,7 @@ class LLM(BaseLLM):
logging.error(f"Error executing function '{function_name}': {e}")
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=f"Tool execution error: {e!s}",
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
event=LLMCallFailedEvent(error=f"Tool execution error: {e!s}"),
)
crewai_event_bus.emit(
self,
@@ -1680,117 +1669,108 @@ class LLM(BaseLLM):
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
with llm_call_context() as call_id:
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=call_id,
),
)
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
# --- 2) Validate parameters before proceeding with the call
self._validate_call_params()
# --- 2) Validate parameters before proceeding with the call
self._validate_call_params()
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
if not self._invoke_before_llm_call_hooks(messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
else:
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
logging.info("Retrying LLM call without the unsupported 'stop'")
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
result = self._handle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
raise
else:
result = self._handle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
if isinstance(result, str):
result = self._invoke_after_llm_call_hooks(
messages, result, from_agent
)
return result
except LLMContextLengthExceededError:
# Re-raise LLMContextLengthExceededError as it should be handled
# by the CrewAgentExecutor._invoke_loop method, which can then decide
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
logging.info("Retrying LLM call without the unsupported 'stop'")
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
async def acall(
self,
@@ -1828,54 +1808,43 @@ class LLM(BaseLLM):
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
with llm_call_context() as call_id:
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=call_id,
),
)
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
self._validate_call_params()
self._validate_call_params()
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# Process file attachments asynchronously before preparing params
messages = await self._aprocess_message_files(messages)
# Process file attachments asynchronously before preparing params
messages = await self._aprocess_message_files(messages)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
msg_role: Literal["assistant"] = "assistant"
message["role"] = msg_role
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(
messages, tools, skip_file_processing=True
)
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
params = self._prepare_completion_params(
messages, tools, skip_file_processing=True
)
if self.stream:
return await self._ahandle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._ahandle_non_streaming_response(
if self.stream:
return await self._ahandle_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
@@ -1883,50 +1852,52 @@ class LLM(BaseLLM):
from_agent=from_agent,
response_model=response_model,
)
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append(
"stop"
)
else:
self.additional_params = {
"additional_drop_params": ["stop"]
}
return await self._ahandle_non_streaming_response(
params=params,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except LLMContextLengthExceededError:
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
logging.info("Retrying LLM call without the unsupported 'stop'")
return await self.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e),
from_task=from_task,
from_agent=from_agent,
call_id=get_current_call_id(),
),
logging.info("Retrying LLM call without the unsupported 'stop'")
return await self.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
raise
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise
def _handle_emit_call_events(
self,
@@ -1954,7 +1925,6 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
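
The retry branch above keys off the provider error text and records `stop` in `additional_drop_params` before retrying once. A small sketch of just that bookkeeping; the helper names are illustrative, not CrewAI API:

```python
from typing import Any


def should_retry_without_stop(error: Exception) -> bool:
    text = str(error)
    return "Unsupported parameter" in text and "'stop'" in text


def mark_stop_dropped(additional_params: dict[str, Any]) -> None:
    """Record 'stop' under additional_drop_params so the retried call omits it."""
    drop = additional_params.setdefault("additional_drop_params", [])
    if isinstance(drop, list) and "stop" not in drop:
        drop.append("stop")


params: dict[str, Any] = {}
err = ValueError("Unsupported parameter: 'stop' is not supported with this model.")
if should_retry_without_stop(err):
    mark_stop_dropped(params)
assert params == {"additional_drop_params": ["stop"]}
```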

View File

@@ -7,15 +7,11 @@ in CrewAI, including common functionality for native SDK implementations.
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Generator
from contextlib import contextmanager
import contextvars
from datetime import datetime
import json
import logging
import re
from typing import TYPE_CHECKING, Any, Final
import uuid
from pydantic import BaseModel
@@ -54,32 +50,6 @@ DEFAULT_CONTEXT_WINDOW_SIZE: Final[int] = 4096
DEFAULT_SUPPORTS_STOP_WORDS: Final[bool] = True
_JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTALL)
_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"_current_call_id", default=None
)
@contextmanager
def llm_call_context() -> Generator[str, None, None]:
"""Context manager that establishes an LLM call scope with a unique call_id."""
call_id = str(uuid.uuid4())
token = _current_call_id.set(call_id)
try:
yield call_id
finally:
_current_call_id.reset(token)
def get_current_call_id() -> str:
"""Get current call_id from context"""
call_id = _current_call_id.get()
if call_id is None:
logging.warning(
"LLM event emitted outside call context - generating fallback call_id"
)
return str(uuid.uuid4())
return call_id
class BaseLLM(ABC):
"""Abstract base class for LLM implementations.
@@ -381,7 +351,6 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -405,7 +374,6 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -426,7 +394,6 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
model=self.model,
call_id=get_current_call_id(),
),
)
@@ -461,7 +428,6 @@ class BaseLLM(ABC):
from_agent=from_agent,
call_type=call_type,
response_id=response_id,
call_id=get_current_call_id(),
),
)
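
For reference, the call-scope helpers shown in the hunk above are small enough to reproduce in isolation: a ContextVar holds a per-call UUID for the duration of a `with` block so nested event emissions can read it. Names follow the diff, but this sketch is standalone rather than an import from `crewai.llms.base_llm`:

```python
from collections.abc import Generator
from contextlib import contextmanager
import contextvars
import uuid

_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
    "_current_call_id", default=None
)


@contextmanager
def llm_call_context() -> Generator[str, None, None]:
    """Establish a call scope with a unique call_id and clean it up afterwards."""
    call_id = str(uuid.uuid4())
    token = _current_call_id.set(call_id)
    try:
        yield call_id
    finally:
        _current_call_id.reset(token)


def get_current_call_id() -> str:
    """Return the scoped call_id, or a fresh fallback outside any call scope."""
    return _current_call_id.get() or str(uuid.uuid4())


with llm_call_context() as call_id:
    assert get_current_call_id() == call_id
```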

View File

@@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any, Final, Literal, TypeGuard, cast
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.base_llm import BaseLLM
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -266,46 +266,35 @@ class AnthropicCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Anthropic
formatted_messages, system_message = (
self._format_messages_for_anthropic(messages)
)
# Format messages for Anthropic
formatted_messages, system_message = self._format_messages_for_anthropic(
messages
)
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools, available_functions
)
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
@@ -313,13 +302,21 @@ class AnthropicCompletion(BaseLLM):
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
return self._handle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -345,37 +342,28 @@ class AnthropicCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages, system_message = (
self._format_messages_for_anthropic(messages)
)
formatted_messages, system_message = self._format_messages_for_anthropic(
messages
)
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools, available_functions
)
completion_params = self._prepare_completion_params(
formatted_messages, system_message, tools
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
@@ -383,20 +371,27 @@ class AnthropicCompletion(BaseLLM):
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
error_msg = f"Anthropic API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _prepare_completion_params(
self,
messages: list[LLMMessage],
system_message: str | None = None,
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Prepare parameters for Anthropic messages API.
@@ -404,8 +399,6 @@ class AnthropicCompletion(BaseLLM):
messages: Formatted messages for Anthropic
system_message: Extracted system message
tools: Tool definitions
available_functions: Available functions for tool calling. When provided
with a single tool, tool_choice is automatically set to force tool use.
Returns:
Parameters dictionary for Anthropic API
@@ -431,13 +424,7 @@ class AnthropicCompletion(BaseLLM):
# Handle tools for Claude 3+
if tools and self.supports_tools:
converted_tools = self._convert_tools_for_interference(tools)
params["tools"] = converted_tools
if available_functions and len(converted_tools) == 1:
tool_name = converted_tools[0].get("name")
if tool_name and tool_name in available_functions:
params["tool_choice"] = {"type": "tool", "name": tool_name}
params["tools"] = self._convert_tools_for_interference(tools)
if self.thinking:
if isinstance(self.thinking, AnthropicThinkingConfig):
@@ -739,11 +726,15 @@ class AnthropicCompletion(BaseLLM):
)
return list(tool_uses)
result = self._execute_first_tool(
tool_uses, available_functions, from_task, from_agent
# Handle tool use conversation flow internally
return self._handle_tool_use_conversation(
response,
tool_uses,
params,
available_functions,
from_task,
from_agent,
)
if result is not None:
return result
content = ""
thinking_blocks: list[ThinkingBlock] = []
@@ -944,12 +935,14 @@ class AnthropicCompletion(BaseLLM):
if not available_functions:
return list(tool_uses)
# Execute first tool and return result directly
result = self._execute_first_tool(
tool_uses, available_functions, from_task, from_agent
return self._handle_tool_use_conversation(
final_message,
tool_uses,
params,
available_functions,
from_task,
from_agent,
)
if result is not None:
return result
full_response = self._apply_stop_words(full_response)
@@ -1008,41 +1001,6 @@ class AnthropicCompletion(BaseLLM):
return tool_results
def _execute_first_tool(
self,
tool_uses: list[ToolUseBlock | BetaToolUseBlock],
available_functions: dict[str, Any],
from_task: Any | None = None,
from_agent: Any | None = None,
) -> Any | None:
"""Execute the first tool from the tool_uses list and return its result.
This is used when available_functions is provided, to directly execute
the tool and return its result (matching OpenAI behavior for use cases
like reasoning_handler).
Args:
tool_uses: List of tool use blocks from Claude's response
available_functions: Available functions for tool calling
from_task: Task that initiated the call
from_agent: Agent that initiated the call
Returns:
The result of the first tool execution, or None if execution failed
"""
tool_use = tool_uses[0]
function_name = tool_use.name
function_args = cast(dict[str, Any], tool_use.input)
return self._handle_tool_execution(
function_name=function_name,
function_args=function_args,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# TODO: we drop this
def _handle_tool_use_conversation(
self,
initial_response: Message | BetaMessage,
@@ -1258,11 +1216,14 @@ class AnthropicCompletion(BaseLLM):
)
return list(tool_uses)
result = self._execute_first_tool(
tool_uses, available_functions, from_task, from_agent
return await self._ahandle_tool_use_conversation(
response,
tool_uses,
params,
available_functions,
from_task,
from_agent,
)
if result is not None:
return result
content = ""
if response.content:
@@ -1443,11 +1404,14 @@ class AnthropicCompletion(BaseLLM):
if not available_functions:
return list(tool_uses)
result = self._execute_first_tool(
tool_uses, available_functions, from_task, from_agent
return await self._ahandle_tool_use_conversation(
final_message,
tool_uses,
params,
available_functions,
from_task,
from_agent,
)
if result is not None:
return result
full_response = self._apply_stop_words(full_response)
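
The single-tool forcing shown in the `_prepare_completion_params` hunk above is a short pattern: when exactly one converted tool is also locally executable, set `tool_choice` to force it. A sketch following the dict shapes in the diff; the helper name is illustrative:

```python
from typing import Any


def apply_tools(
    params: dict[str, Any],
    converted_tools: list[dict[str, Any]],
    available_functions: dict[str, Any] | None,
) -> None:
    """Attach tools and, for a single locally-callable tool, force its use."""
    params["tools"] = converted_tools
    if available_functions and len(converted_tools) == 1:
        tool_name = converted_tools[0].get("name")
        if tool_name and tool_name in available_functions:
            params["tool_choice"] = {"type": "tool", "name": tool_name}


params: dict[str, Any] = {}
apply_tools(
    params,
    converted_tools=[{"name": "lookup", "input_schema": {"type": "object"}}],
    available_functions={"lookup": lambda **kwargs: None},
)
assert params["tool_choice"] == {"type": "tool", "name": "lookup"}
```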

View File

@@ -43,7 +43,7 @@ try:
)
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.base_llm import BaseLLM
except ImportError:
raise ImportError(
@@ -293,44 +293,32 @@ class AzureCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
# Format messages for Azure
formatted_messages = self._format_messages_for_azure(messages)
# Format messages for Azure
formatted_messages = self._format_messages_for_azure(messages)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
# Prepare completion parameters
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
# Handle streaming vs non-streaming
if self.stream:
return self._handle_streaming_completion(
completion_params,
available_functions,
from_task,
@@ -338,8 +326,16 @@ class AzureCompletion(BaseLLM):
effective_response_model,
)
except Exception as e:
return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value]
return self._handle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
return self._handle_api_error(e, from_task, from_agent) # type: ignore[func-returns-value]
async def acall( # type: ignore[return]
self,
@@ -365,35 +361,25 @@ class AzureCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
effective_response_model = response_model or self.response_format
effective_response_model = response_model or self.response_format
formatted_messages = self._format_messages_for_azure(messages)
formatted_messages = self._format_messages_for_azure(messages)
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
completion_params = self._prepare_completion_params(
formatted_messages, tools, effective_response_model
)
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
if self.stream:
return await self._ahandle_streaming_completion(
completion_params,
available_functions,
from_task,
@@ -401,8 +387,16 @@ class AzureCompletion(BaseLLM):
effective_response_model,
)
except Exception as e:
self._handle_api_error(e, from_task, from_agent)
return await self._ahandle_completion(
completion_params,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except Exception as e:
self._handle_api_error(e, from_task, from_agent)
def _prepare_completion_params(
self,

View File

@@ -11,7 +11,7 @@ from pydantic import BaseModel
from typing_extensions import Required
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.base_llm import BaseLLM
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -378,90 +378,77 @@ class BedrockCompletion(BaseLLM):
"""Call AWS Bedrock Converse API."""
effective_response_model = response_model or self.response_format
with llm_call_context():
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
try:
# Emit call started event
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
# Format messages for Converse API
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare request body
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
# Add system message if present
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
)
# Format messages for Converse API
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
# Prepare request body
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
# Add system message if present
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
# Add optional advanced features if configured
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef", cast(object, self.guardrail_config)
)
body["guardrailConfig"] = guardrail_config
# Add optional advanced features if configured
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef",
cast(object, self.guardrail_config),
)
body["guardrailConfig"] = guardrail_config
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.stream:
return self._handle_streaming_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_converse(
if self.stream:
return self._handle_streaming_converse(
formatted_messages,
body,
available_functions,
@@ -470,17 +457,26 @@ class BedrockCompletion(BaseLLM):
effective_response_model,
)
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
return self._handle_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
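The comments in this hunk note that Bedrock's Converse API rejects a conversation containing toolUse/toolResult blocks unless a toolConfig is supplied. A hedged sketch of what that request looks like with boto3; the region, model id, and tool spec are placeholders rather than values from the diff:

import boto3

client = boto3.client("bedrock-runtime", region_name="us-east-1")

tool_config = {
    "tools": [{
        "toolSpec": {
            "name": "get_weather",
            "description": "Look up the weather for a city",
            "inputSchema": {"json": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            }},
        }
    }]
}

response = client.converse(
    modelId="anthropic.claude-3-5-haiku-20241022-v1:0",
    system=[{"text": "You are a helpful assistant."}],
    messages=[{"role": "user", "content": [{"text": "Weather in Paris?"}]}],
    inferenceConfig={"maxTokens": 512, "temperature": 0.2},
    toolConfig=tool_config,  # required once the history contains toolUse/toolResult blocks
)

for block in response["output"]["message"]["content"]:
    if "toolUse" in block:
        print("model requested tool:", block["toolUse"]["name"], block["toolUse"]["input"])
    elif "text" in block:
        print(block["text"])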
async def acall(
self,
@@ -518,80 +514,69 @@ class BedrockCompletion(BaseLLM):
'Install with: uv add "crewai[bedrock-async]"'
)
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
}
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
)
formatted_messages, system_message = self._format_messages_for_converse(
messages
)
body: BedrockConverseRequestBody = {
"inferenceConfig": self._get_inference_config(),
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
if system_message:
body["system"] = cast(
"list[SystemContentBlockTypeDef]",
cast(object, [{"text": system_message}]),
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
"Sequence[ToolTypeDef]",
cast(object, self._format_tools_for_converse(tools)),
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef", cast(object, self.guardrail_config)
)
body["guardrailConfig"] = guardrail_config
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
"GuardrailConfigurationTypeDef",
cast(object, self.guardrail_config),
)
body["guardrailConfig"] = guardrail_config
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.additional_model_request_fields:
body["additionalModelRequestFields"] = (
self.additional_model_request_fields
)
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.additional_model_response_field_paths:
body["additionalModelResponseFieldPaths"] = (
self.additional_model_response_field_paths
)
if self.stream:
return await self._ahandle_streaming_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_converse(
if self.stream:
return await self._ahandle_streaming_converse(
formatted_messages,
body,
available_functions,
@@ -600,17 +585,26 @@ class BedrockCompletion(BaseLLM):
effective_response_model,
)
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
return await self._ahandle_converse(
formatted_messages,
body,
available_functions,
from_task,
from_agent,
effective_response_model,
)
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
if is_context_length_exceeded(e):
logging.error(f"Context window exceeded: {e}")
raise LLMContextLengthExceededError(str(e)) from e
error_msg = f"AWS Bedrock API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _handle_converse(
self,

View File

@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, Literal, cast
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.base_llm import BaseLLM
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -293,45 +293,33 @@ class GeminiCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
formatted_content, system_instruction = (
self._format_messages_for_gemini(messages)
)
formatted_content, system_instruction = self._format_messages_for_gemini(
messages
)
messages_for_hooks = self._convert_contents_to_dict(formatted_content)
messages_for_hooks = self._convert_contents_to_dict(formatted_content)
if not self._invoke_before_llm_call_hooks(
messages_for_hooks, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(messages_for_hooks, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
if self.stream:
return self._handle_streaming_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return self._handle_completion(
if self.stream:
return self._handle_streaming_completion(
formatted_content,
config,
available_functions,
@@ -340,20 +328,29 @@ class GeminiCompletion(BaseLLM):
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
return self._handle_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def acall(
self,
@@ -379,38 +376,28 @@ class GeminiCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
self.tools = tools
effective_response_model = response_model or self.response_format
formatted_content, system_instruction = (
self._format_messages_for_gemini(messages)
)
formatted_content, system_instruction = self._format_messages_for_gemini(
messages
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
config = self._prepare_generation_config(
system_instruction, tools, effective_response_model
)
if self.stream:
return await self._ahandle_streaming_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
return await self._ahandle_completion(
if self.stream:
return await self._ahandle_streaming_completion(
formatted_content,
config,
available_functions,
@@ -419,20 +406,29 @@ class GeminiCompletion(BaseLLM):
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
return await self._ahandle_completion(
formatted_content,
config,
available_functions,
from_task,
from_agent,
effective_response_model,
)
except APIError as e:
error_msg = f"Google Gemini API error: {e.code} - {e.message}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
except Exception as e:
error_msg = f"Google Gemini API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
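Gemini keeps the system prompt out of the contents list, which is why this code splits _format_messages_for_gemini output into contents plus a separate system instruction that travels in the generation config. A rough standalone equivalent with the google-genai SDK, assuming an API key in the environment; the model name is illustrative:

from google import genai
from google.genai import types

client = genai.Client()  # reads GEMINI_API_KEY / GOOGLE_API_KEY from the environment

# System text goes in the config, not in contents.
config = types.GenerateContentConfig(
    system_instruction="You are a terse assistant.",
    temperature=0.2,
)

response = client.models.generate_content(
    model="gemini-2.0-flash",
    contents="Summarize the Converse API in one sentence.",
    config=config,
)
print(response.text)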
def _prepare_generation_config(
self,

View File

@@ -17,7 +17,7 @@ from openai.types.responses import Response
from pydantic import BaseModel
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.base_llm import BaseLLM
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -382,35 +382,23 @@ class OpenAICompletion(BaseLLM):
Returns:
Completion response or tool call result.
"""
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages(messages)
formatted_messages = self._format_messages(messages)
if not self._invoke_before_llm_call_hooks(
formatted_messages, from_agent
):
raise ValueError("LLM call blocked by before_llm_call hook")
if not self._invoke_before_llm_call_hooks(formatted_messages, from_agent):
raise ValueError("LLM call blocked by before_llm_call hook")
if self.api == "responses":
return self._call_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return self._call_completions(
if self.api == "responses":
return self._call_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
@@ -419,13 +407,22 @@ class OpenAICompletion(BaseLLM):
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
return self._call_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
def _call_completions(
self,
@@ -482,30 +479,20 @@ class OpenAICompletion(BaseLLM):
Returns:
Completion response or tool call result.
"""
with llm_call_context():
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
try:
self._emit_call_started_event(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
formatted_messages = self._format_messages(messages)
formatted_messages = self._format_messages(messages)
if self.api == "responses":
return await self._acall_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return await self._acall_completions(
if self.api == "responses":
return await self._acall_responses(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
@@ -514,13 +501,22 @@ class OpenAICompletion(BaseLLM):
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
return await self._acall_completions(
messages=formatted_messages,
tools=tools,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
error_msg = f"OpenAI API call failed: {e!s}"
logging.error(error_msg)
self._emit_call_failed_event(
error=error_msg, from_task=from_task, from_agent=from_agent
)
raise
async def _acall_completions(
self,
@@ -1525,16 +1521,13 @@ class OpenAICompletion(BaseLLM):
) -> list[dict[str, Any]]:
"""Convert CrewAI tool format to OpenAI function calling format."""
from crewai.llms.providers.utils.common import safe_tool_conversion
from crewai.utilities.pydantic_schema_utils import (
force_additional_properties_false,
)
openai_tools = []
for tool in tools:
name, description, parameters = safe_tool_conversion(tool, "OpenAI")
openai_tool: dict[str, Any] = {
openai_tool = {
"type": "function",
"function": {
"name": name,
@@ -1544,11 +1537,10 @@ class OpenAICompletion(BaseLLM):
}
if parameters:
params_dict = (
parameters if isinstance(parameters, dict) else dict(parameters)
)
params_dict = force_additional_properties_false(params_dict)
openai_tool["function"]["parameters"] = params_dict
if isinstance(parameters, dict):
openai_tool["function"]["parameters"] = parameters # type: ignore
else:
openai_tool["function"]["parameters"] = dict(parameters)
openai_tools.append(openai_tool)
return openai_tools
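For reference, the payload this helper builds is the standard OpenAI function-calling shape. A minimal hedged sketch of one converted tool passed to chat completions; the tool and model names are illustrative, and the additionalProperties comment reflects what the removed force_additional_properties_false pass enforced:

from openai import OpenAI

client = OpenAI()

openai_tools = [{
    "type": "function",
    "function": {
        "name": "add",
        "description": "Add two integers",
        "parameters": {
            "type": "object",
            "properties": {"a": {"type": "integer"}, "b": {"type": "integer"}},
            "required": ["a", "b"],
            "additionalProperties": False,  # what force_additional_properties_false used to set
        },
    },
}]

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Add 2 and 3."}],
    tools=openai_tools,
)
print(response.choices[0].message.tool_calls)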

View File

@@ -27,8 +27,6 @@ if TYPE_CHECKING:
from crewai import Agent, Task
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.crews.crew_output import CrewOutput
from crewai.hooks.llm_hooks import LLMCallHookContext
from crewai.hooks.tool_hooks import ToolCallHookContext
from crewai.project.wrappers import (
CrewInstance,
OutputJsonClass,
@@ -36,8 +34,6 @@ if TYPE_CHECKING:
)
from crewai.tasks.task_output import TaskOutput
_post_initialize_crew_hooks: list[Callable[[Any], None]] = []
class AgentConfig(TypedDict, total=False):
"""Type definition for agent configuration dictionary.
@@ -270,9 +266,6 @@ class CrewBaseMeta(type):
instance.map_all_agent_variables()
instance.map_all_task_variables()
for hook in _post_initialize_crew_hooks:
hook(instance)
original_methods = {
name: method
for name, method in cls.__dict__.items()
@@ -492,61 +485,47 @@ def _register_crew_hooks(instance: CrewInstance, cls: type) -> None:
if has_agent_filter:
agents_filter = hook_method._filter_agents
def make_filtered_before_llm(
bound_fn: Callable[[LLMCallHookContext], bool | None],
agents_list: list[str],
) -> Callable[[LLMCallHookContext], bool | None]:
def filtered(context: LLMCallHookContext) -> bool | None:
def make_filtered_before_llm(bound_fn, agents_list):
def filtered(context):
if context.agent and context.agent.role not in agents_list:
return None
return bound_fn(context)
return filtered
before_llm_hook = make_filtered_before_llm(bound_hook, agents_filter)
final_hook = make_filtered_before_llm(bound_hook, agents_filter)
else:
before_llm_hook = bound_hook
final_hook = bound_hook
register_before_llm_call_hook(before_llm_hook)
instance._registered_hook_functions.append(
("before_llm_call", before_llm_hook)
)
register_before_llm_call_hook(final_hook)
instance._registered_hook_functions.append(("before_llm_call", final_hook))
if hasattr(hook_method, "is_after_llm_call_hook"):
if has_agent_filter:
agents_filter = hook_method._filter_agents
def make_filtered_after_llm(
bound_fn: Callable[[LLMCallHookContext], str | None],
agents_list: list[str],
) -> Callable[[LLMCallHookContext], str | None]:
def filtered(context: LLMCallHookContext) -> str | None:
def make_filtered_after_llm(bound_fn, agents_list):
def filtered(context):
if context.agent and context.agent.role not in agents_list:
return None
return bound_fn(context)
return filtered
after_llm_hook = make_filtered_after_llm(bound_hook, agents_filter)
final_hook = make_filtered_after_llm(bound_hook, agents_filter)
else:
after_llm_hook = bound_hook
final_hook = bound_hook
register_after_llm_call_hook(after_llm_hook)
instance._registered_hook_functions.append(
("after_llm_call", after_llm_hook)
)
register_after_llm_call_hook(final_hook)
instance._registered_hook_functions.append(("after_llm_call", final_hook))
if hasattr(hook_method, "is_before_tool_call_hook"):
if has_tool_filter or has_agent_filter:
tools_filter = getattr(hook_method, "_filter_tools", None)
agents_filter = getattr(hook_method, "_filter_agents", None)
def make_filtered_before_tool(
bound_fn: Callable[[ToolCallHookContext], bool | None],
tools_list: list[str] | None,
agents_list: list[str] | None,
) -> Callable[[ToolCallHookContext], bool | None]:
def filtered(context: ToolCallHookContext) -> bool | None:
def make_filtered_before_tool(bound_fn, tools_list, agents_list):
def filtered(context):
if tools_list and context.tool_name not in tools_list:
return None
if (
@@ -559,28 +538,22 @@ def _register_crew_hooks(instance: CrewInstance, cls: type) -> None:
return filtered
before_tool_hook = make_filtered_before_tool(
final_hook = make_filtered_before_tool(
bound_hook, tools_filter, agents_filter
)
else:
before_tool_hook = bound_hook
final_hook = bound_hook
register_before_tool_call_hook(before_tool_hook)
instance._registered_hook_functions.append(
("before_tool_call", before_tool_hook)
)
register_before_tool_call_hook(final_hook)
instance._registered_hook_functions.append(("before_tool_call", final_hook))
if hasattr(hook_method, "is_after_tool_call_hook"):
if has_tool_filter or has_agent_filter:
tools_filter = getattr(hook_method, "_filter_tools", None)
agents_filter = getattr(hook_method, "_filter_agents", None)
def make_filtered_after_tool(
bound_fn: Callable[[ToolCallHookContext], str | None],
tools_list: list[str] | None,
agents_list: list[str] | None,
) -> Callable[[ToolCallHookContext], str | None]:
def filtered(context: ToolCallHookContext) -> str | None:
def make_filtered_after_tool(bound_fn, tools_list, agents_list):
def filtered(context):
if tools_list and context.tool_name not in tools_list:
return None
if (
@@ -593,16 +566,14 @@ def _register_crew_hooks(instance: CrewInstance, cls: type) -> None:
return filtered
after_tool_hook = make_filtered_after_tool(
final_hook = make_filtered_after_tool(
bound_hook, tools_filter, agents_filter
)
else:
after_tool_hook = bound_hook
final_hook = bound_hook
register_after_tool_call_hook(after_tool_hook)
instance._registered_hook_functions.append(
("after_tool_call", after_tool_hook)
)
register_after_tool_call_hook(final_hook)
instance._registered_hook_functions.append(("after_tool_call", final_hook))
instance._hooks_being_registered = False
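The filtered-hook helpers in this file all follow one closure pattern: wrap the bound hook so it returns None (meaning "does not apply") when the calling agent or tool is not in the filter list. A generic, self-contained sketch of that pattern; the Context class and role names are made up for illustration:

from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class Context:
    agent_role: str | None


def make_filtered(
    bound_fn: Callable[[Context], str | None],
    allowed_roles: list[str],
) -> Callable[[Context], str | None]:
    def filtered(context: Context) -> str | None:
        # Returning None signals "this hook does not apply here"; the registry moves on.
        if context.agent_role not in allowed_roles:
            return None
        return bound_fn(context)

    return filtered


hook = make_filtered(lambda ctx: f"handled by {ctx.agent_role}", ["Researcher"])
print(hook(Context("Researcher")))  # handled by Researcher
print(hook(Context("Writer")))      # None -> skipped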

View File

@@ -72,8 +72,6 @@ class CrewInstance(Protocol):
__crew_metadata__: CrewMetadata
_mcp_server_adapter: Any
_all_methods: dict[str, Callable[..., Any]]
_registered_hook_functions: list[tuple[str, Callable[..., Any]]]
_hooks_being_registered: bool
agents: list[Agent]
tasks: list[Task]
base_directory: Path

View File

@@ -31,7 +31,6 @@ from pydantic_core import PydanticCustomError
from typing_extensions import Self
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.core.providers.content_processor import process_content
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.task_events import (
TaskCompletedEvent,
@@ -497,7 +496,6 @@ class Task(BaseModel):
tools: list[BaseTool] | None = None,
) -> TaskOutput:
"""Execute the task synchronously."""
self.start_time = datetime.datetime.now()
return self._execute_core(agent, context, tools)
@property
@@ -538,7 +536,6 @@ class Task(BaseModel):
) -> None:
"""Execute the task asynchronously with context handling."""
try:
self.start_time = datetime.datetime.now()
result = self._execute_core(agent, context, tools)
future.set_result(result)
except Exception as e:
@@ -551,7 +548,6 @@ class Task(BaseModel):
tools: list[BaseTool] | None = None,
) -> TaskOutput:
"""Execute the task asynchronously using native async/await."""
self.start_time = datetime.datetime.now()
return await self._aexecute_core(agent, context, tools)
async def _aexecute_core(
@@ -570,6 +566,8 @@ class Task(BaseModel):
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
self.start_time = datetime.datetime.now()
self.prompt_context = context
tools = tools or self.tools or []
@@ -581,8 +579,6 @@ class Task(BaseModel):
tools=tools,
)
self._post_agent_execution(agent)
if not self._guardrails and not self._guardrail:
pydantic_output, json_output = self._export_output(result)
else:
@@ -665,6 +661,8 @@ class Task(BaseModel):
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
self.start_time = datetime.datetime.now()
self.prompt_context = context
tools = tools or self.tools or []
@@ -676,8 +674,6 @@ class Task(BaseModel):
tools=tools,
)
self._post_agent_execution(agent)
if not self._guardrails and not self._guardrail:
pydantic_output, json_output = self._export_output(result)
else:
@@ -745,9 +741,6 @@ class Task(BaseModel):
finally:
clear_task_files(self.id)
def _post_agent_execution(self, agent: BaseAgent) -> None:
pass
def prompt(self) -> str:
"""Generates the task prompt with optional markdown formatting.
@@ -870,11 +863,6 @@ Follow these guidelines:
except ValueError as e:
raise ValueError(f"Error interpolating description: {e!s}") from e
self.description = process_content(self.description, {"task": self})
self._original_expected_output = process_content(
self._original_expected_output, {"task": self}
)
try:
self.expected_output = interpolate_only(
input_string=self._original_expected_output, inputs=inputs

View File

@@ -6,7 +6,6 @@ Classes:
HallucinationGuardrail: Placeholder guardrail that validates task outputs.
"""
from collections.abc import Callable
from typing import Any
from crewai.llm import LLM
@@ -14,36 +13,32 @@ from crewai.tasks.task_output import TaskOutput
from crewai.utilities.logger import Logger
_validate_output_hook: Callable[..., tuple[bool, Any]] | None = None
class HallucinationGuardrail:
"""Placeholder for the HallucinationGuardrail feature.
Attributes:
context: Optional reference context that outputs would be checked against.
context: The reference context that outputs would be checked against.
llm: The language model that would be used for evaluation.
threshold: Optional minimum faithfulness score that would be required to pass.
tool_response: Optional tool response information that would be used in evaluation.
Examples:
>>> # Basic usage without context (uses task expected_output as context)
>>> guardrail = HallucinationGuardrail(llm=agent.llm)
>>> # With context for reference
>>> # Basic usage with default verdict logic
>>> guardrail = HallucinationGuardrail(
... llm=agent.llm,
... context="AI helps with various tasks including analysis and generation.",
... llm=agent.llm,
... )
>>> # With custom threshold for stricter validation
>>> strict_guardrail = HallucinationGuardrail(
... context="Quantum computing uses qubits in superposition.",
... llm=agent.llm,
... threshold=8.0, # Require score >= 8 to pass
... threshold=8.0, # Would require score >= 8 to pass in enterprise version
... )
>>> # With tool response for additional context
>>> guardrail_with_tools = HallucinationGuardrail(
... context="The current weather data",
... llm=agent.llm,
... tool_response="Weather API returned: Temperature 22°C, Humidity 65%",
... )
@@ -51,17 +46,16 @@ class HallucinationGuardrail:
def __init__(
self,
context: str,
llm: LLM,
context: str | None = None,
threshold: float | None = None,
tool_response: str = "",
):
"""Initialize the HallucinationGuardrail placeholder.
Args:
context: The reference context that outputs would be checked against.
llm: The language model that would be used for evaluation.
context: Optional reference context that outputs would be checked against.
If not provided, the task's expected_output will be used as context.
threshold: Optional minimum faithfulness score that would be required to pass.
tool_response: Optional tool response information that would be used in evaluation.
"""
@@ -84,17 +78,16 @@ class HallucinationGuardrail:
def __call__(self, task_output: TaskOutput) -> tuple[bool, Any]:
"""Validate a task output against hallucination criteria.
In the open source, this method always returns that the output is valid.
Args:
task_output: The output to be validated.
Returns:
A tuple containing:
- True if validation passed, False otherwise
- The raw task output if valid, or error feedback if invalid
- True
- The raw task output
"""
if callable(_validate_output_hook):
return _validate_output_hook(self, task_output)
self._logger.log(
"warning",
"Premium hallucination detection skipped (use for free at https://app.crewai.com)\n",

View File

@@ -1,10 +1,6 @@
import asyncio
from collections.abc import Coroutine
import inspect
from typing import Any
from pydantic import BaseModel, Field
from typing_extensions import TypeIs
from crewai.agent import Agent
from crewai.lite_agent_output import LiteAgentOutput
@@ -12,13 +8,6 @@ from crewai.llms.base_llm import BaseLLM
from crewai.tasks.task_output import TaskOutput
def _is_coroutine(
obj: LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput],
) -> TypeIs[Coroutine[Any, Any, LiteAgentOutput]]:
"""Check if obj is a coroutine for type narrowing."""
return inspect.iscoroutine(obj)
class LLMGuardrailResult(BaseModel):
valid: bool = Field(
description="Whether the task output complies with the guardrail"
@@ -73,10 +62,7 @@ class LLMGuardrail:
- If the Task result complies with the guardrail, saying that is valid
"""
kickoff_result = agent.kickoff(query, response_format=LLMGuardrailResult)
if _is_coroutine(kickoff_result):
return asyncio.run(kickoff_result)
return kickoff_result
return agent.kickoff(query, response_format=LLMGuardrailResult)
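The removed _is_coroutine helper existed because kickoff may hand back either a result or a coroutine, and TypeIs lets the type checker narrow that union before asyncio.run. A standalone sketch of the pattern; maybe_async is a hypothetical stand-in, not the real agent API:

import asyncio
from collections.abc import Coroutine
import inspect
from typing import Any

from typing_extensions import TypeIs


def _is_coroutine(obj: str | Coroutine[Any, Any, str]) -> TypeIs[Coroutine[Any, Any, str]]:
    """Narrow str | Coroutine[..., str] down to the coroutine branch."""
    return inspect.iscoroutine(obj)


async def maybe_async() -> str:  # hypothetical stand-in for agent.kickoff(...)
    return "validated"


def run(obj: str | Coroutine[Any, Any, str]) -> str:
    if _is_coroutine(obj):
        return asyncio.run(obj)  # the type checker now knows obj is a coroutine
    return obj


print(run("already a result"))
print(run(maybe_async()))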
def __call__(self, task_output: TaskOutput) -> tuple[bool, Any]:
"""Validates the output of a task based on specified criteria.

View File

@@ -903,7 +903,7 @@ class Telemetry:
{
"id": str(task.id),
"description": task.description,
"output": task.output.raw if task.output else "",
"output": task.output.raw_output,
}
for task in crew.tasks
]
@@ -923,9 +923,6 @@ class Telemetry:
value: The attribute value.
"""
if span is None:
return
def _operation() -> None:
return span.set_attribute(key, value)

View File

@@ -270,7 +270,6 @@ class ToolUsage:
result = None # type: ignore
should_retry = False
available_tool = None
error_event_emitted = False
try:
if self.tools_handler and self.tools_handler.cache:
@@ -409,7 +408,6 @@ class ToolUsage:
except Exception as e:
self.on_tool_error(tool=tool, tool_calling=calling, e=e)
error_event_emitted = True
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
@@ -437,7 +435,7 @@ class ToolUsage:
result = self._format_result(result=result)
finally:
if started_event_emitted and not error_event_emitted:
if started_event_emitted:
self.on_tool_use_finished(
tool=tool,
tool_calling=calling,
@@ -502,7 +500,6 @@ class ToolUsage:
result = None # type: ignore
should_retry = False
available_tool = None
error_event_emitted = False
try:
if self.tools_handler and self.tools_handler.cache:
@@ -641,7 +638,6 @@ class ToolUsage:
except Exception as e:
self.on_tool_error(tool=tool, tool_calling=calling, e=e)
error_event_emitted = True
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
@@ -669,7 +665,7 @@ class ToolUsage:
result = self._format_result(result=result)
finally:
if started_event_emitted and not error_event_emitted:
if started_event_emitted:
self.on_tool_use_finished(
tool=tool,
tool_calling=calling,

View File

@@ -42,8 +42,6 @@ if TYPE_CHECKING:
from crewai.llm import LLM
from crewai.task import Task
_create_plus_client_hook: Callable[[], Any] | None = None
class SummaryContent(TypedDict):
"""Structure for summary content entries.
@@ -93,11 +91,7 @@ def parse_tools(tools: list[BaseTool]) -> list[CrewStructuredTool]:
for tool in tools:
if isinstance(tool, CrewAITool):
structured_tool = tool.to_structured_tool()
structured_tool.current_usage_count = 0
if structured_tool._original_tool:
structured_tool._original_tool.current_usage_count = 0
tools_list.append(structured_tool)
tools_list.append(tool.to_structured_tool())
else:
raise ValueError("Tool is not a CrewStructuredTool or BaseTool")
@@ -824,13 +818,10 @@ def load_agent_from_repository(from_repository: str) -> dict[str, Any]:
if from_repository:
import importlib
if callable(_create_plus_client_hook):
client = _create_plus_client_hook()
else:
from crewai.cli.authentication.token import get_auth_token
from crewai.cli.plus_api import PlusAPI
from crewai.cli.authentication.token import get_auth_token
from crewai.cli.plus_api import PlusAPI
client = PlusAPI(api_key=get_auth_token())
client = PlusAPI(api_key=get_auth_token())
_print_current_organization()
response = client.get_agent(from_repository)
if response.status_code == 404:

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
from collections import defaultdict
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
from pydantic import BaseModel, Field, InstanceOf
from rich.box import HEAVY_EDGE
@@ -36,13 +36,7 @@ class CrewEvaluator:
iteration: The current iteration of the evaluation.
"""
def __init__(
self,
crew: Crew,
eval_llm: InstanceOf[BaseLLM] | str | None = None,
openai_model_name: str | None = None,
llm: InstanceOf[BaseLLM] | str | None = None,
) -> None:
def __init__(self, crew: Crew, eval_llm: InstanceOf[BaseLLM]) -> None:
self.crew = crew
self.llm = eval_llm
self.tasks_scores: defaultdict[int, list[float]] = defaultdict(list)
@@ -92,9 +86,7 @@ class CrewEvaluator:
"""
self.iteration = iteration
def print_crew_evaluation_result(
self, token_usage: list[dict[str, Any]] | None = None
) -> None:
def print_crew_evaluation_result(self) -> None:
"""
Prints the evaluation result of the crew in a table.
A Crew with 2 tasks using the command crewai test -n 3
@@ -212,7 +204,7 @@ class CrewEvaluator:
CrewTestResultEvent(
quality=quality_score,
execution_duration=current_task.execution_duration,
model=getattr(self.llm, "model", str(self.llm)),
model=self.llm.model,
crew_name=self.crew.name,
crew=self.crew,
),

View File

@@ -4,8 +4,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Final, Literal, NamedTuple
from crewai.events.utils.console_formatter import should_suppress_console_output
if TYPE_CHECKING:
from _typeshed import SupportsWrite
@@ -79,8 +77,6 @@ class Printer:
file: A file-like object (stream); defaults to the current sys.stdout.
flush: Whether to forcibly flush the stream.
"""
if should_suppress_console_output():
return
if isinstance(content, str):
content = [ColoredText(content, color)]
print(

View File

@@ -19,10 +19,9 @@ from collections.abc import Callable
from copy import deepcopy
import datetime
import logging
from typing import TYPE_CHECKING, Annotated, Any, Final, Literal, TypedDict, Union
from typing import TYPE_CHECKING, Annotated, Any, Literal, Union
import uuid
import jsonref # type: ignore[import-untyped]
from pydantic import (
UUID1,
UUID3,
@@ -70,21 +69,6 @@ else:
EmailStr = str
class JsonSchemaInfo(TypedDict):
"""Inner structure for JSON schema metadata."""
name: str
strict: Literal[True]
schema: dict[str, Any]
class ModelDescription(TypedDict):
"""Return type for generate_model_description."""
type: Literal["json_schema"]
json_schema: JsonSchemaInfo
def resolve_refs(schema: dict[str, Any]) -> dict[str, Any]:
"""Recursively resolve all local $refs in the given JSON Schema using $defs as the source.
@@ -143,102 +127,6 @@ def add_key_in_dict_recursively(
return d
def force_additional_properties_false(d: Any) -> Any:
"""Force additionalProperties=false on all object-type dicts recursively.
OpenAI strict mode requires all objects to have additionalProperties=false.
This function overwrites any existing value to ensure compliance.
Also ensures objects have properties and required arrays, even if empty,
as OpenAI strict mode requires these for all object types.
Args:
d: The dictionary/list to modify.
Returns:
The modified dictionary/list.
"""
if isinstance(d, dict):
if d.get("type") == "object":
d["additionalProperties"] = False
if "properties" not in d:
d["properties"] = {}
if "required" not in d:
d["required"] = []
for v in d.values():
force_additional_properties_false(v)
elif isinstance(d, list):
for i in d:
force_additional_properties_false(i)
return d
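To see what this removed normalizer did, here is a compact standalone copy applied to a nested schema; OpenAI's strict structured-output mode rejects object schemas that lack additionalProperties=false, and the sample schema below is made up for illustration:

from typing import Any


def force_additional_properties_false(d: Any) -> Any:
    # Minimal standalone version of the helper above.
    if isinstance(d, dict):
        if d.get("type") == "object":
            d["additionalProperties"] = False
            d.setdefault("properties", {})
            d.setdefault("required", [])
        for v in d.values():
            force_additional_properties_false(v)
    elif isinstance(d, list):
        for item in d:
            force_additional_properties_false(item)
    return d


schema = {
    "type": "object",
    "properties": {"address": {"type": "object", "properties": {"city": {"type": "string"}}}},
}
print(force_additional_properties_false(schema))
# Both the outer object and the nested "address" object now carry additionalProperties=False
# plus explicit properties/required keys, as strict mode expects.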
OPENAI_SUPPORTED_FORMATS: Final[
set[Literal["date-time", "date", "time", "duration"]]
] = {
"date-time",
"date",
"time",
"duration",
}
def strip_unsupported_formats(d: Any) -> Any:
"""Remove format annotations that OpenAI strict mode doesn't support.
OpenAI only supports: date-time, date, time, duration.
Other formats like uri, email, uuid etc. cause validation errors.
Args:
d: The dictionary/list to modify.
Returns:
The modified dictionary/list.
"""
if isinstance(d, dict):
format_value = d.get("format")
if (
isinstance(format_value, str)
and format_value not in OPENAI_SUPPORTED_FORMATS
):
del d["format"]
for v in d.values():
strip_unsupported_formats(v)
elif isinstance(d, list):
for i in d:
strip_unsupported_formats(i)
return d
def ensure_type_in_schemas(d: Any) -> Any:
"""Ensure all schema objects in anyOf/oneOf have a 'type' key.
OpenAI strict mode requires every schema to have a 'type' key.
Empty schemas {} in anyOf/oneOf are converted to {"type": "object"}.
Args:
d: The dictionary/list to modify.
Returns:
The modified dictionary/list.
"""
if isinstance(d, dict):
for key in ("anyOf", "oneOf"):
if key in d:
schema_list = d[key]
for i, schema in enumerate(schema_list):
if isinstance(schema, dict) and schema == {}:
schema_list[i] = {"type": "object"}
else:
ensure_type_in_schemas(schema)
for v in d.values():
ensure_type_in_schemas(v)
elif isinstance(d, list):
for item in d:
ensure_type_in_schemas(item)
return d
def fix_discriminator_mappings(schema: dict[str, Any]) -> dict[str, Any]:
"""Replace '#/$defs/...' references in discriminator.mapping with just the model name.
@@ -375,49 +263,7 @@ def ensure_all_properties_required(schema: dict[str, Any]) -> dict[str, Any]:
return schema
def strip_null_from_types(schema: dict[str, Any]) -> dict[str, Any]:
"""Remove null type from anyOf/type arrays.
Pydantic generates `T | None` for optional fields, which creates schemas with
null in the type. However, for MCP tools, optional fields should be omitted
entirely rather than sent as null. This function strips null from types.
Args:
schema: JSON schema dictionary.
Returns:
Modified schema with null types removed.
"""
if isinstance(schema, dict):
if "anyOf" in schema:
any_of = schema["anyOf"]
non_null = [opt for opt in any_of if opt.get("type") != "null"]
if len(non_null) == 1:
schema.pop("anyOf")
schema.update(non_null[0])
elif len(non_null) > 1:
schema["anyOf"] = non_null
type_value = schema.get("type")
if isinstance(type_value, list) and "null" in type_value:
non_null_types = [t for t in type_value if t != "null"]
if len(non_null_types) == 1:
schema["type"] = non_null_types[0]
elif len(non_null_types) > 1:
schema["type"] = non_null_types
for value in schema.values():
if isinstance(value, dict):
strip_null_from_types(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
strip_null_from_types(item)
return schema
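The strip_null_from_types docstring refers to how Pydantic encodes T | None; a quick standalone check of what that schema actually looks like, with an illustrative model:

from pydantic import BaseModel


class Query(BaseModel):
    term: str
    limit: int | None = None  # optional field -> anyOf with a null branch


print(Query.model_json_schema()["properties"]["limit"])
# Roughly: {'anyOf': [{'type': 'integer'}, {'type': 'null'}], 'default': None, 'title': 'Limit'}
# strip_null_from_types collapses that anyOf to {'type': 'integer'} so MCP tools
# simply omit the field instead of sending an explicit null.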
def generate_model_description(model: type[BaseModel]) -> ModelDescription:
def generate_model_description(model: type[BaseModel]) -> dict[str, Any]:
"""Generate JSON schema description of a Pydantic model.
This function takes a Pydantic model class and returns its JSON schema,
@@ -428,13 +274,17 @@ def generate_model_description(model: type[BaseModel]) -> ModelDescription:
model: A Pydantic model class.
Returns:
A ModelDescription with JSON schema representation of the model.
A JSON schema dictionary representation of the model.
"""
json_schema = model.model_json_schema(ref_template="#/$defs/{model}")
json_schema = force_additional_properties_false(json_schema)
json_schema = strip_unsupported_formats(json_schema)
json_schema = ensure_type_in_schemas(json_schema)
json_schema = add_key_in_dict_recursively(
json_schema,
key="additionalProperties",
value=False,
criteria=lambda d: d.get("type") == "object"
and "additionalProperties" not in d,
)
json_schema = resolve_refs(json_schema)
@@ -442,7 +292,6 @@ def generate_model_description(model: type[BaseModel]) -> ModelDescription:
json_schema = fix_discriminator_mappings(json_schema)
json_schema = convert_oneof_to_anyof(json_schema)
json_schema = ensure_all_properties_required(json_schema)
json_schema = strip_null_from_types(json_schema)
return {
"type": "json_schema",
@@ -527,19 +376,14 @@ def create_model_from_schema( # type: ignore[no-any-unimported]
>>> person.name
'John'
"""
json_schema = dict(jsonref.replace_refs(json_schema, proxies=False))
effective_root = root_schema or json_schema
json_schema = force_additional_properties_false(json_schema)
effective_root = force_additional_properties_false(effective_root)
if "allOf" in json_schema:
json_schema = _merge_all_of_schemas(json_schema["allOf"], effective_root)
if "title" not in json_schema and "title" in (root_schema or {}):
json_schema["title"] = (root_schema or {}).get("title")
model_name = json_schema.get("title") or "DynamicModel"
model_name = json_schema.get("title", "DynamicModel")
field_definitions = {
name: _json_schema_to_pydantic_field(
name, prop, json_schema.get("required", []), effective_root
@@ -547,11 +391,9 @@ def create_model_from_schema( # type: ignore[no-any-unimported]
for name, prop in (json_schema.get("properties", {}) or {}).items()
}
effective_config = __config__ or ConfigDict(extra="forbid")
return create_model_base(
model_name,
__config__=effective_config,
__config__=__config__,
__base__=__base__,
__module__=__module__,
__validators__=__validators__,
@@ -730,10 +572,8 @@ def _json_schema_to_pydantic_type(
any_of_schemas = json_schema.get("anyOf", []) + json_schema.get("oneOf", [])
if any_of_schemas:
any_of_types = [
_json_schema_to_pydantic_type(
schema, root_schema, name_=f"{name_ or 'Union'}Option{i}"
)
for i, schema in enumerate(any_of_schemas)
_json_schema_to_pydantic_type(schema, root_schema)
for schema in any_of_schemas
]
return Union[tuple(any_of_types)] # noqa: UP007
@@ -769,7 +609,7 @@ def _json_schema_to_pydantic_type(
if properties:
json_schema_ = json_schema.copy()
if json_schema_.get("title") is None:
json_schema_["title"] = name_ or "DynamicModel"
json_schema_["title"] = name_
return create_model_from_schema(json_schema_, root_schema=root_schema)
return dict
if type_ == "null":

View File

@@ -19,7 +19,6 @@ def to_serializable(
exclude: set[str] | None = None,
max_depth: int = 5,
_current_depth: int = 0,
_ancestors: set[int] | None = None,
) -> Serializable:
"""Converts a Python object into a JSON-compatible representation.
@@ -32,7 +31,6 @@ def to_serializable(
exclude: Set of keys to exclude from the result.
max_depth: Maximum recursion depth. Defaults to 5.
_current_depth: Current recursion depth (for internal use).
_ancestors: Set of ancestor object ids for cycle detection (for internal use).
Returns:
Serializable: A JSON-compatible structure.
@@ -43,29 +41,16 @@ def to_serializable(
if exclude is None:
exclude = set()
if _ancestors is None:
_ancestors = set()
if isinstance(obj, (str, int, float, bool, type(None))):
return obj
if isinstance(obj, uuid.UUID):
return str(obj)
if isinstance(obj, (date, datetime)):
return obj.isoformat()
object_id = id(obj)
if object_id in _ancestors:
return f"<circular_ref:{type(obj).__name__}>"
new_ancestors = _ancestors | {object_id}
if isinstance(obj, (list, tuple, set)):
return [
to_serializable(
item,
exclude=exclude,
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
item, max_depth=max_depth, _current_depth=_current_depth + 1
)
for item in obj
]
@@ -76,7 +61,6 @@ def to_serializable(
exclude=exclude,
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
)
for key, value in obj.items()
if key not in exclude
@@ -87,16 +71,12 @@ def to_serializable(
obj=obj.model_dump(exclude=exclude),
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
)
except Exception:
try:
return {
_to_serializable_key(k): to_serializable(
v,
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
v, max_depth=max_depth, _current_depth=_current_depth + 1
)
for k, v in obj.__dict__.items()
if k not in (exclude or set())
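The _ancestors parameter being diffed here guards to_serializable against self-referential objects by tracking the ids of objects already on the recursion path. A small standalone illustration of that guard:

from typing import Any


def serialize(obj: Any, _ancestors: frozenset[int] = frozenset()) -> Any:
    if isinstance(obj, (str, int, float, bool, type(None))):
        return obj
    if id(obj) in _ancestors:
        return f"<circular_ref:{type(obj).__name__}>"  # stop instead of recursing forever
    ancestors = _ancestors | {id(obj)}
    if isinstance(obj, dict):
        return {str(k): serialize(v, ancestors) for k, v in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        return [serialize(item, ancestors) for item in obj]
    return str(obj)


node: dict[str, Any] = {"name": "root"}
node["self"] = node  # cycle
print(serialize(node))  # {'name': 'root', 'self': '<circular_ref:dict>'}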

View File

@@ -51,10 +51,6 @@ class ConcreteAgentAdapter(BaseAgentAdapter):
# Dummy implementation for MCP tools
return []
def configure_structured_output(self, task: Any) -> None:
# Dummy implementation for structured output
pass
async def aexecute_task(
self,
task: Any,

View File

@@ -703,8 +703,6 @@ def test_agent_definition_based_on_dict():
# test for human input
@pytest.mark.vcr()
def test_agent_human_input():
from crewai.core.providers.human_input import SyncHumanInputProvider
# Agent configuration
config = {
"role": "test role",
@@ -722,7 +720,7 @@ def test_agent_human_input():
human_input=True,
)
# Side effect function for _prompt_input to simulate multiple feedback iterations
# Side effect function for _ask_human_input to simulate multiple feedback iterations
feedback_responses = iter(
[
"Don't say hi, say Hello instead!", # First feedback: instruct change
@@ -730,16 +728,16 @@ def test_agent_human_input():
]
)
def prompt_input_side_effect(*args, **kwargs):
def ask_human_input_side_effect(*args, **kwargs):
return next(feedback_responses)
# Patch both _prompt_input on provider and _invoke_loop to avoid real API/network calls.
# Patch both _ask_human_input and _invoke_loop to avoid real API/network calls.
with (
patch.object(
SyncHumanInputProvider,
"_prompt_input",
side_effect=prompt_input_side_effect,
) as mock_prompt_input,
CrewAgentExecutor,
"_ask_human_input",
side_effect=ask_human_input_side_effect,
) as mock_human_input,
patch.object(
CrewAgentExecutor,
"_invoke_loop",
@@ -751,7 +749,7 @@ def test_agent_human_input():
# Assertions to ensure the agent behaves correctly.
# It should have requested feedback twice.
assert mock_prompt_input.call_count == 2
assert mock_human_input.call_count == 2
# The final result should be processed to "Hello"
assert output.strip().lower() == "hello"

View File

@@ -235,13 +235,8 @@ class TestAsyncAgentExecutor:
mock_crew: MagicMock, mock_tools_handler: MagicMock
) -> None:
"""Test that multiple ainvoke calls can run concurrently."""
max_concurrent = 0
current_concurrent = 0
lock = asyncio.Lock()
async def create_and_run_executor(executor_id: int) -> dict[str, Any]:
nonlocal max_concurrent, current_concurrent
executor = CrewAgentExecutor(
llm=mock_llm,
task=mock_task,
@@ -257,13 +252,7 @@ class TestAsyncAgentExecutor:
)
async def delayed_response(*args: Any, **kwargs: Any) -> str:
nonlocal max_concurrent, current_concurrent
async with lock:
current_concurrent += 1
max_concurrent = max(max_concurrent, current_concurrent)
await asyncio.sleep(0.01)
async with lock:
current_concurrent -= 1
await asyncio.sleep(0.05)
return f"Thought: Done\nFinal Answer: Result from executor {executor_id}"
with patch(
@@ -284,15 +273,19 @@ class TestAsyncAgentExecutor:
}
)
import time
start = time.time()
results = await asyncio.gather(
create_and_run_executor(1),
create_and_run_executor(2),
create_and_run_executor(3),
)
elapsed = time.time() - start
assert len(results) == 3
assert all("output" in r for r in results)
assert max_concurrent > 1, f"Expected concurrent execution, max concurrent was {max_concurrent}"
assert elapsed < 0.15, f"Expected concurrent execution, took {elapsed}s"
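The assertion being replaced above measured concurrency directly by tracking the peak number of in-flight calls rather than relying on wall-clock time. A self-contained sketch of that measurement pattern, independent of the executor under test:

import asyncio

max_concurrent = 0
current_concurrent = 0
lock = asyncio.Lock()


async def worker(i: int) -> str:
    global max_concurrent, current_concurrent
    async with lock:
        current_concurrent += 1
        max_concurrent = max(max_concurrent, current_concurrent)
    await asyncio.sleep(0.01)  # simulated LLM latency
    async with lock:
        current_concurrent -= 1
    return f"result {i}"


async def main() -> None:
    results = await asyncio.gather(*(worker(i) for i in range(3)))
    assert len(results) == 3
    assert max_concurrent > 1, f"expected overlap, peak was {max_concurrent}"


asyncio.run(main())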
class TestAsyncLLMResponseHelper:

View File

@@ -1,102 +0,0 @@
interactions:
- request:
body: '{"max_tokens":4096,"messages":[{"role":"user","content":"Calculate 5 +
3 using the simple_calculator tool with operation ''add''."}],"model":"claude-3-5-haiku-20241022","stream":false,"tool_choice":{"type":"tool","name":"simple_calculator"},"tools":[{"name":"simple_calculator","description":"Perform
simple math operations","input_schema":{"type":"object","properties":{"operation":{"type":"string","enum":["add","multiply"],"description":"The
operation to perform"},"a":{"type":"integer","description":"First number"},"b":{"type":"integer","description":"Second
number"}},"required":["operation","a","b"]}}]}'
headers:
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
anthropic-version:
- '2023-06-01'
connection:
- keep-alive
content-length:
- '608'
content-type:
- application/json
host:
- api.anthropic.com
user-agent:
- X-USER-AGENT-XXX
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 0.73.0
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
x-stainless-timeout:
- NOT_GIVEN
method: POST
uri: https://api.anthropic.com/v1/messages
response:
body:
string: '{"model":"claude-3-5-haiku-20241022","id":"msg_01Q2F83aAeqqTCxsd8WpZjK7","type":"message","role":"assistant","content":[{"type":"tool_use","id":"toolu_01BW4XkHnhRVM5JZsvoaQKw5","name":"simple_calculator","input":{"operation":"add","a":5,"b":3}}],"stop_reason":"tool_use","stop_sequence":null,"usage":{"input_tokens":498,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":67,"service_tier":"standard"}}'
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Security-Policy:
- CSP-FILTERED
Content-Type:
- application/json
Date:
- Tue, 03 Feb 2026 23:26:35 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Robots-Tag:
- none
anthropic-organization-id:
- ANTHROPIC-ORGANIZATION-ID-XXX
anthropic-ratelimit-input-tokens-limit:
- ANTHROPIC-RATELIMIT-INPUT-TOKENS-LIMIT-XXX
anthropic-ratelimit-input-tokens-remaining:
- ANTHROPIC-RATELIMIT-INPUT-TOKENS-REMAINING-XXX
anthropic-ratelimit-input-tokens-reset:
- ANTHROPIC-RATELIMIT-INPUT-TOKENS-RESET-XXX
anthropic-ratelimit-output-tokens-limit:
- ANTHROPIC-RATELIMIT-OUTPUT-TOKENS-LIMIT-XXX
anthropic-ratelimit-output-tokens-remaining:
- ANTHROPIC-RATELIMIT-OUTPUT-TOKENS-REMAINING-XXX
anthropic-ratelimit-output-tokens-reset:
- ANTHROPIC-RATELIMIT-OUTPUT-TOKENS-RESET-XXX
anthropic-ratelimit-requests-limit:
- '4000'
anthropic-ratelimit-requests-remaining:
- '3999'
anthropic-ratelimit-requests-reset:
- '2026-02-03T23:26:34Z'
anthropic-ratelimit-tokens-limit:
- ANTHROPIC-RATELIMIT-TOKENS-LIMIT-XXX
anthropic-ratelimit-tokens-remaining:
- ANTHROPIC-RATELIMIT-TOKENS-REMAINING-XXX
anthropic-ratelimit-tokens-reset:
- ANTHROPIC-RATELIMIT-TOKENS-RESET-XXX
cf-cache-status:
- DYNAMIC
request-id:
- REQUEST-ID-XXX
strict-transport-security:
- STS-XXX
x-envoy-upstream-service-time:
- '1228'
status:
code: 200
message: OK
version: 1

View File

@@ -1,108 +0,0 @@
interactions:
- request:
body: '{"max_tokens":4096,"messages":[{"role":"user","content":"Create a simple
plan to say hello. Use the create_reasoning_plan tool."}],"model":"claude-3-5-haiku-20241022","stream":false,"tool_choice":{"type":"tool","name":"create_reasoning_plan"},"tools":[{"name":"create_reasoning_plan","description":"Create
a structured reasoning plan for completing a task","input_schema":{"type":"object","properties":{"plan":{"type":"string","description":"High-level
plan description"},"steps":{"type":"array","items":{"type":"object"},"description":"List
of steps to execute"},"ready":{"type":"boolean","description":"Whether the plan
is ready to execute"}},"required":["plan","steps","ready"]}}]}'
headers:
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
anthropic-version:
- '2023-06-01'
connection:
- keep-alive
content-length:
- '684'
content-type:
- application/json
host:
- api.anthropic.com
user-agent:
- X-USER-AGENT-XXX
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 0.73.0
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
x-stainless-timeout:
- NOT_GIVEN
method: POST
uri: https://api.anthropic.com/v1/messages
response:
body:
string: '{"model":"claude-3-5-haiku-20241022","id":"msg_01HLuGgGRFseMdhTYAhkKtfz","type":"message","role":"assistant","content":[{"type":"tool_use","id":"toolu_01GQAUFHffGzMd3ufA6YRMZF","name":"create_reasoning_plan","input":{"plan":"Say
hello in a friendly and straightforward manner","steps":[{"description":"Take
a deep breath","action":"Pause and relax"},{"description":"Smile","action":"Prepare
a warm facial expression"},{"description":"Greet the person","action":"Say
''Hello!''"},{"description":"Wait for response","action":"Listen and be ready
to continue conversation"}],"ready":true}}],"stop_reason":"tool_use","stop_sequence":null,"usage":{"input_tokens":513,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":162,"service_tier":"standard"}}'
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Security-Policy:
- CSP-FILTERED
Content-Type:
- application/json
Date:
- Tue, 03 Feb 2026 23:26:38 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Robots-Tag:
- none
anthropic-organization-id:
- ANTHROPIC-ORGANIZATION-ID-XXX
anthropic-ratelimit-input-tokens-limit:
- ANTHROPIC-RATELIMIT-INPUT-TOKENS-LIMIT-XXX
anthropic-ratelimit-input-tokens-remaining:
- ANTHROPIC-RATELIMIT-INPUT-TOKENS-REMAINING-XXX
anthropic-ratelimit-input-tokens-reset:
- ANTHROPIC-RATELIMIT-INPUT-TOKENS-RESET-XXX
anthropic-ratelimit-output-tokens-limit:
- ANTHROPIC-RATELIMIT-OUTPUT-TOKENS-LIMIT-XXX
anthropic-ratelimit-output-tokens-remaining:
- ANTHROPIC-RATELIMIT-OUTPUT-TOKENS-REMAINING-XXX
anthropic-ratelimit-output-tokens-reset:
- ANTHROPIC-RATELIMIT-OUTPUT-TOKENS-RESET-XXX
anthropic-ratelimit-requests-limit:
- '4000'
anthropic-ratelimit-requests-remaining:
- '3999'
anthropic-ratelimit-requests-reset:
- '2026-02-03T23:26:35Z'
anthropic-ratelimit-tokens-limit:
- ANTHROPIC-RATELIMIT-TOKENS-LIMIT-XXX
anthropic-ratelimit-tokens-remaining:
- ANTHROPIC-RATELIMIT-TOKENS-REMAINING-XXX
anthropic-ratelimit-tokens-reset:
- ANTHROPIC-RATELIMIT-TOKENS-RESET-XXX
cf-cache-status:
- DYNAMIC
request-id:
- REQUEST-ID-XXX
strict-transport-security:
- STS-XXX
x-envoy-upstream-service-time:
- '2994'
status:
code: 200
message: OK
version: 1

View File

@@ -1,108 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '71'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpUSxS5LeHwDTELElWlC5CDMzmr\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437564,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hi there! How can I assist you today?\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 10,\n \"total_tokens\": 19,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:05 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '460'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '477'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,215 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '71'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpStmyOpe9DrthWBlDdMZfVMJ1u\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hi! How can I assist you today?\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 9,\n \"total_tokens\": 18,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:02 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '415'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '434'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"user","content":"Say bye"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '72'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2HpS1DP0Xd3tmWt5PBincVrdU7yw\",\n \"object\":
\"chat.completion\",\n \"created\": 1769437562,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Goodbye! If you have more questions
in the future, feel free to reach out. Have a great day!\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
9,\n \"completion_tokens\": 23,\n \"total_tokens\": 32,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Mon, 26 Jan 2026 14:26:03 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '964'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '979'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,143 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Say hi"}],"model":"gpt-4o-mini","stream":true,"stream_options":{"include_usage":true}}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '125'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.0
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: 'data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"rVIyGQF2E"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"Hi"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"ZGVqV7ZDm"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"vnfm7IxlIB"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
How"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"o8F35ZZ"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
can"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"kiBzGe3"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
I"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"cbGT2RWgx"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
assist"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"DtxR"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
you"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"6y6Co8J"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"
today"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"SZOmm"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}],"usage":null,"obfuscation":"s9Bc0HqlPg"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}],"usage":null,"obfuscation":"u9aar"}
data: {"id":"chatcmpl-D2HpUGTvIFKBsR9Xd6XRT4AuFXzbz","object":"chat.completion.chunk","created":1769437564,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_29330a9688","choices":[],"usage":{"prompt_tokens":9,"completion_tokens":9,"total_tokens":18,"prompt_tokens_details":{"cached_tokens":0,"audio_tokens":0},"completion_tokens_details":{"reasoning_tokens":0,"audio_tokens":0,"accepted_prediction_tokens":0,"rejected_prediction_tokens":0}},"obfuscation":"5hudm8ySqh39"}
data: [DONE]
'
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- text/event-stream; charset=utf-8
Date:
- Mon, 26 Jan 2026 14:26:04 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '260'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '275'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -296,23 +296,6 @@ def test_create_folder_structure_folder_name_validation():
shutil.rmtree(folder_path)
def test_create_folder_structure_rejects_reserved_names():
"""Test that reserved script names are rejected to prevent pyproject.toml conflicts."""
with tempfile.TemporaryDirectory() as temp_dir:
reserved_names = ["test", "train", "replay", "run_crew", "run_with_trigger"]
for reserved_name in reserved_names:
with pytest.raises(ValueError, match="which is reserved"):
create_folder_structure(reserved_name, parent_folder=temp_dir)
with pytest.raises(ValueError, match="which is reserved"):
create_folder_structure(f"{reserved_name}/", parent_folder=temp_dir)
capitalized = reserved_name.capitalize()
with pytest.raises(ValueError, match="which is reserved"):
create_folder_structure(capitalized, parent_folder=temp_dir)
@mock.patch("crewai.cli.create_crew.create_folder_structure")
@mock.patch("crewai.cli.create_crew.copy_template")
@mock.patch("crewai.cli.create_crew.load_env_vars")

View File

@@ -1,20 +1,10 @@
"""Test for version management."""
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from crewai import __version__
from crewai.cli.version import (
_get_cache_file,
_is_cache_valid,
get_crewai_version,
get_latest_version_from_pypi,
is_newer_version_available,
)
from crewai.cli.version import get_crewai_version
def test_dynamic_versioning_consistency() -> None:
def test_dynamic_versioning_consistency():
"""Test that dynamic versioning provides consistent version across all access methods."""
cli_version = get_crewai_version()
package_version = __version__
@@ -25,186 +15,3 @@ def test_dynamic_versioning_consistency() -> None:
# Version should not be empty
assert package_version is not None
assert len(package_version.strip()) > 0
class TestVersionChecking:
"""Test version checking utilities."""
def test_get_crewai_version(self) -> None:
"""Test getting current crewai version."""
version = get_crewai_version()
assert isinstance(version, str)
assert len(version) > 0
def test_get_cache_file(self) -> None:
"""Test cache file path generation."""
cache_file = _get_cache_file()
assert isinstance(cache_file, Path)
assert cache_file.name == "version_cache.json"
def test_is_cache_valid_with_fresh_cache(self) -> None:
"""Test cache validation with fresh cache."""
cache_data = {"timestamp": datetime.now().isoformat(), "version": "1.0.0"}
assert _is_cache_valid(cache_data) is True
def test_is_cache_valid_with_stale_cache(self) -> None:
"""Test cache validation with stale cache."""
old_time = datetime.now() - timedelta(hours=25)
cache_data = {"timestamp": old_time.isoformat(), "version": "1.0.0"}
assert _is_cache_valid(cache_data) is False
def test_is_cache_valid_with_missing_timestamp(self) -> None:
"""Test cache validation with missing timestamp."""
cache_data = {"version": "1.0.0"}
assert _is_cache_valid(cache_data) is False
@patch("crewai.cli.version.Path.exists")
@patch("crewai.cli.version.request.urlopen")
def test_get_latest_version_from_pypi_success(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test successful PyPI version fetch."""
# Mock cache not existing to force fetch from PyPI
mock_exists.return_value = False
mock_response = MagicMock()
mock_response.read.return_value = b'{"info": {"version": "2.0.0"}}'
mock_urlopen.return_value.__enter__.return_value = mock_response
version = get_latest_version_from_pypi()
assert version == "2.0.0"
@patch("crewai.cli.version.Path.exists")
@patch("crewai.cli.version.request.urlopen")
def test_get_latest_version_from_pypi_failure(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test PyPI version fetch failure."""
from urllib.error import URLError
# Mock cache not existing to force fetch from PyPI
mock_exists.return_value = False
mock_urlopen.side_effect = URLError("Network error")
version = get_latest_version_from_pypi()
assert version is None
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_true(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when newer version is available."""
mock_current.return_value = "1.0.0"
mock_latest.return_value = "2.0.0"
is_newer, current, latest = is_newer_version_available()
assert is_newer is True
assert current == "1.0.0"
assert latest == "2.0.0"
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_false(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when no newer version is available."""
mock_current.return_value = "2.0.0"
mock_latest.return_value = "2.0.0"
is_newer, current, latest = is_newer_version_available()
assert is_newer is False
assert current == "2.0.0"
assert latest == "2.0.0"
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_with_none_latest(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when PyPI fetch fails."""
mock_current.return_value = "1.0.0"
mock_latest.return_value = None
is_newer, current, latest = is_newer_version_available()
assert is_newer is False
assert current == "1.0.0"
assert latest is None
class TestConsoleFormatterVersionCheck:
"""Test version check display in ConsoleFormatter."""
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": ""})
def test_version_message_shows_when_update_available_and_verbose(
self, mock_check: MagicMock
) -> None:
"""Test version message shows when update available and verbose enabled."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
assert mock_print.call_count == 2
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
def test_version_message_hides_when_verbose_false(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when verbose disabled."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=False)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
def test_version_message_hides_when_no_update_available(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when no update available."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (False, "2.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": "true"})
def test_version_message_hides_in_ci_environment(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when running in CI/CD."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": "1"})
def test_version_message_hides_in_ci_environment_with_numeric_value(
self, mock_check: MagicMock
) -> None:
"""Test version message hidden when CI=1."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
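The removed cache tests above pin down a simple freshness rule: a current timestamp is valid, a 25-hour-old one is stale, and a missing timestamp is never valid. Below is a minimal standalone sketch of that rule, assuming a 24-hour TTL (the exact cutoff is not visible in this diff); it is not the actual crewai.cli.version implementation.

from datetime import datetime, timedelta
from typing import Any

CACHE_TTL = timedelta(hours=24)  # assumed TTL, consistent with the 25-hour stale case above


def is_cache_valid(cache_data: dict[str, Any]) -> bool:
    """Return True if the cached timestamp is present, parseable, and fresh."""
    timestamp = cache_data.get("timestamp")
    if not timestamp:
        return False
    try:
        cached_at = datetime.fromisoformat(timestamp)
    except ValueError:
        return False
    return datetime.now() - cached_at < CACHE_TTL


# Mirrors the three cases the removed tests covered.
assert is_cache_valid({"timestamp": datetime.now().isoformat(), "version": "1.0.0"})
assert not is_cache_valid({"timestamp": (datetime.now() - timedelta(hours=25)).isoformat()})
assert not is_cache_valid({"version": "1.0.0"})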

View File

@@ -177,40 +177,4 @@ class TestTriggeredByScope:
raise ValueError("test error")
except ValueError:
pass
assert get_triggering_event_id() is None
def test_agent_scope_preserved_after_tool_error_event() -> None:
from crewai.events import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
push_event_scope("crew-1", "crew_kickoff_started")
push_event_scope("task-1", "task_started")
push_event_scope("agent-1", "agent_execution_started")
crewai_event_bus.emit(
None,
ToolUsageStartedEvent(
tool_name="test_tool",
tool_args={},
agent_key="test_agent",
)
)
crewai_event_bus.emit(
None,
ToolUsageErrorEvent(
tool_name="test_tool",
tool_args={},
agent_key="test_agent",
error=ValueError("test error"),
)
)
crewai_event_bus.flush()
assert get_current_parent_id() == "agent-1"
assert get_triggering_event_id() is None

View File

@@ -45,6 +45,85 @@ def test_anthropic_completion_is_used_when_claude_provider():
def test_anthropic_tool_use_conversation_flow():
"""
Test that the Anthropic completion properly handles tool use conversation flow
"""
from unittest.mock import Mock, patch
from crewai.llms.providers.anthropic.completion import AnthropicCompletion
from anthropic.types.tool_use_block import ToolUseBlock
# Create AnthropicCompletion instance
completion = AnthropicCompletion(model="claude-3-5-sonnet-20241022")
# Mock tool function
def mock_weather_tool(location: str) -> str:
return f"The weather in {location} is sunny and 75°F"
available_functions = {"get_weather": mock_weather_tool}
# Mock the Anthropic client responses
with patch.object(completion.client.messages, 'create') as mock_create:
# Mock initial response with tool use - need to properly mock ToolUseBlock
mock_tool_use = Mock(spec=ToolUseBlock)
mock_tool_use.type = "tool_use"
mock_tool_use.id = "tool_123"
mock_tool_use.name = "get_weather"
mock_tool_use.input = {"location": "San Francisco"}
mock_initial_response = Mock()
mock_initial_response.content = [mock_tool_use]
mock_initial_response.usage = Mock()
mock_initial_response.usage.input_tokens = 100
mock_initial_response.usage.output_tokens = 50
# Mock final response after tool result - properly mock text content
mock_text_block = Mock()
mock_text_block.type = "text"
# Set the text attribute as a string, not another Mock
mock_text_block.configure_mock(text="Based on the weather data, it's a beautiful day in San Francisco with sunny skies and 75°F temperature.")
mock_final_response = Mock()
mock_final_response.content = [mock_text_block]
mock_final_response.usage = Mock()
mock_final_response.usage.input_tokens = 150
mock_final_response.usage.output_tokens = 75
# Configure mock to return different responses on successive calls
mock_create.side_effect = [mock_initial_response, mock_final_response]
# Test the call
messages = [{"role": "user", "content": "What's the weather like in San Francisco?"}]
result = completion.call(
messages=messages,
available_functions=available_functions
)
# Verify the result contains the final response
assert "beautiful day in San Francisco" in result
assert "sunny skies" in result
assert "75°F" in result
# Verify that two API calls were made (initial + follow-up)
assert mock_create.call_count == 2
# Verify the second call includes tool results
second_call_args = mock_create.call_args_list[1][1] # kwargs of second call
messages_in_second_call = second_call_args["messages"]
# Should have original user message + assistant tool use + user tool result
assert len(messages_in_second_call) == 3
assert messages_in_second_call[0]["role"] == "user"
assert messages_in_second_call[1]["role"] == "assistant"
assert messages_in_second_call[2]["role"] == "user"
# Verify tool result format
tool_result = messages_in_second_call[2]["content"][0]
assert tool_result["type"] == "tool_result"
assert tool_result["tool_use_id"] == "tool_123"
assert "sunny and 75°F" in tool_result["content"]
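For orientation, the three-message exchange those assertions describe has roughly this shape (values are illustrative and follow the Anthropic Messages API structure the test mocks; this is not captured provider output):

# Hypothetical message sequence matching the assertions above.
conversation = [
    {"role": "user", "content": "What's the weather like in San Francisco?"},
    {
        "role": "assistant",
        "content": [
            {"type": "tool_use", "id": "tool_123", "name": "get_weather",
             "input": {"location": "San Francisco"}},
        ],
    },
    {
        "role": "user",
        "content": [
            {"type": "tool_result", "tool_use_id": "tool_123",
             "content": "The weather in San Francisco is sunny and 75°F"},
        ],
    },
]
assert [m["role"] for m in conversation] == ["user", "assistant", "user"]
assert conversation[2]["content"][0]["tool_use_id"] == "tool_123"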
def test_anthropic_completion_module_is_imported():
"""
Test that the completion module is properly imported when using Anthropic provider
@@ -795,125 +874,6 @@ def test_anthropic_function_calling():
# =============================================================================
@pytest.mark.vcr(filter_headers=["authorization", "x-api-key"])
def test_anthropic_tool_execution_with_available_functions():
"""
Test that Anthropic provider correctly executes tools when available_functions is provided.
This specifically tests the fix for double llm_call_completed emission - when
available_functions is provided, _handle_tool_execution is called which already
emits llm_call_completed, so the caller should not emit it again.
The test verifies:
1. The tool is called with correct arguments
2. The tool result is returned directly (not wrapped in conversation)
3. The result is valid JSON matching the tool output format
"""
import json
llm = LLM(model="anthropic/claude-3-5-haiku-20241022")
# Simple tool that returns a formatted string
def create_reasoning_plan(plan: str, steps: list, ready: bool) -> str:
"""Create a reasoning plan with steps."""
return json.dumps({"plan": plan, "steps": steps, "ready": ready})
tools = [
{
"name": "create_reasoning_plan",
"description": "Create a structured reasoning plan for completing a task",
"input_schema": {
"type": "object",
"properties": {
"plan": {
"type": "string",
"description": "High-level plan description"
},
"steps": {
"type": "array",
"items": {"type": "object"},
"description": "List of steps to execute"
},
"ready": {
"type": "boolean",
"description": "Whether the plan is ready to execute"
}
},
"required": ["plan", "steps", "ready"]
}
}
]
result = llm.call(
messages=[{"role": "user", "content": "Create a simple plan to say hello. Use the create_reasoning_plan tool."}],
tools=tools,
available_functions={"create_reasoning_plan": create_reasoning_plan}
)
# Verify result is valid JSON from the tool
assert result is not None
assert isinstance(result, str)
# Parse the result to verify it's valid JSON
parsed_result = json.loads(result)
assert "plan" in parsed_result
assert "steps" in parsed_result
assert "ready" in parsed_result
@pytest.mark.vcr(filter_headers=["authorization", "x-api-key"])
def test_anthropic_tool_execution_returns_tool_result_directly():
"""
Test that when available_functions is provided, the tool result is returned directly
without additional LLM conversation (matching OpenAI behavior for reasoning_handler).
"""
llm = LLM(model="anthropic/claude-3-5-haiku-20241022")
call_count = 0
def simple_calculator(operation: str, a: int, b: int) -> str:
"""Perform a simple calculation."""
nonlocal call_count
call_count += 1
if operation == "add":
return str(a + b)
elif operation == "multiply":
return str(a * b)
return "Unknown operation"
tools = [
{
"name": "simple_calculator",
"description": "Perform simple math operations",
"input_schema": {
"type": "object",
"properties": {
"operation": {
"type": "string",
"enum": ["add", "multiply"],
"description": "The operation to perform"
},
"a": {"type": "integer", "description": "First number"},
"b": {"type": "integer", "description": "Second number"}
},
"required": ["operation", "a", "b"]
}
}
]
result = llm.call(
messages=[{"role": "user", "content": "Calculate 5 + 3 using the simple_calculator tool with operation 'add'."}],
tools=tools,
available_functions={"simple_calculator": simple_calculator}
)
# Tool should have been called exactly once
assert call_count == 1, f"Expected tool to be called once, got {call_count}"
# Result should be the direct tool output
assert result == "8", f"Expected '8' but got '{result}'"
@pytest.mark.vcr()
def test_anthropic_agent_kickoff_structured_output_without_tools():
"""

View File

@@ -2,7 +2,6 @@ from unittest.mock import MagicMock, patch
import pytest
from crewai.events.event_listener import event_listener
from crewai.core.providers.human_input import SyncHumanInputProvider
class TestFlowHumanInputIntegration:
@@ -25,9 +24,14 @@ class TestFlowHumanInputIntegration:
@patch("builtins.input", return_value="")
def test_human_input_pauses_flow_updates(self, mock_input):
"""Test that human input pauses Flow status updates."""
provider = SyncHumanInputProvider()
crew = MagicMock()
crew._train = False
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
)
executor = CrewAgentExecutorMixin()
executor.crew = MagicMock()
executor.crew._train = False
executor._printer = MagicMock()
formatter = event_listener.formatter
@@ -35,7 +39,7 @@ class TestFlowHumanInputIntegration:
patch.object(formatter, "pause_live_updates") as mock_pause,
patch.object(formatter, "resume_live_updates") as mock_resume,
):
result = provider._prompt_input(crew)
result = executor._ask_human_input("Test result")
mock_pause.assert_called_once()
mock_resume.assert_called_once()
@@ -45,9 +49,14 @@ class TestFlowHumanInputIntegration:
@patch("builtins.input", side_effect=["feedback", ""])
def test_multiple_human_input_rounds(self, mock_input):
"""Test multiple rounds of human input with Flow status management."""
provider = SyncHumanInputProvider()
crew = MagicMock()
crew._train = False
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
)
executor = CrewAgentExecutorMixin()
executor.crew = MagicMock()
executor.crew._train = False
executor._printer = MagicMock()
formatter = event_listener.formatter
@@ -66,10 +75,10 @@ class TestFlowHumanInputIntegration:
formatter, "resume_live_updates", side_effect=track_resume
),
):
result1 = provider._prompt_input(crew)
result1 = executor._ask_human_input("Test result 1")
assert result1 == "feedback"
result2 = provider._prompt_input(crew)
result2 = executor._ask_human_input("Test result 2")
assert result2 == ""
assert len(pause_calls) == 2
@@ -94,9 +103,14 @@ class TestFlowHumanInputIntegration:
def test_pause_resume_exception_handling(self):
"""Test that resume is called even if exception occurs during human input."""
provider = SyncHumanInputProvider()
crew = MagicMock()
crew._train = False
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
)
executor = CrewAgentExecutorMixin()
executor.crew = MagicMock()
executor.crew._train = False
executor._printer = MagicMock()
formatter = event_listener.formatter
@@ -108,16 +122,21 @@ class TestFlowHumanInputIntegration:
),
):
with pytest.raises(KeyboardInterrupt):
provider._prompt_input(crew)
executor._ask_human_input("Test result")
mock_pause.assert_called_once()
mock_resume.assert_called_once()
def test_training_mode_human_input(self):
"""Test human input in training mode."""
provider = SyncHumanInputProvider()
crew = MagicMock()
crew._train = True
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
)
executor = CrewAgentExecutorMixin()
executor.crew = MagicMock()
executor.crew._train = True
executor._printer = MagicMock()
formatter = event_listener.formatter
@@ -127,7 +146,7 @@ class TestFlowHumanInputIntegration:
patch.object(formatter.console, "print") as mock_console_print,
patch("builtins.input", return_value="training feedback"),
):
result = provider._prompt_input(crew)
result = executor._ask_human_input("Test result")
mock_pause.assert_called_once()
mock_resume.assert_called_once()
@@ -142,4 +161,4 @@ class TestFlowHumanInputIntegration:
for call in call_args
if call[0]
)
assert training_panel_found
assert training_panel_found
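The exception case above fixes the contract these tests rely on: live updates are paused before prompting and resumed even if the prompt raises. A generic sketch of that contract, using plain callables rather than the real executor or formatter types (the helper name is made up):

from typing import Callable


def ask_with_paused_updates(
    pause: Callable[[], None],
    resume: Callable[[], None],
    prompt: Callable[[], str] = input,
) -> str:
    """Pause live output, prompt the human, and always resume afterwards."""
    pause()
    try:
        return prompt()
    finally:
        # Runs even on KeyboardInterrupt, matching the exception-handling test.
        resume()


# e.g. ask_with_paused_updates(formatter.pause_live_updates, formatter.resume_live_updates)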

View File

@@ -8,6 +8,7 @@ from pathlib import Path
import pytest
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.human_feedback import human_feedback
from crewai.flow.visualization import (
build_flow_structure,
visualize_flow_structure,
@@ -667,4 +668,180 @@ def test_no_warning_for_properly_typed_router(caplog):
# No warnings should be logged
warning_messages = [r.message for r in caplog.records if r.levelno >= logging.WARNING]
assert not any("Could not determine return paths" in msg for msg in warning_messages)
assert not any("Found listeners waiting for triggers" in msg for msg in warning_messages)
assert not any("Found listeners waiting for triggers" in msg for msg in warning_messages)
def test_human_feedback_node_metadata():
"""Test that human feedback nodes have correct metadata."""
from typing import Literal
class HITLFlow(Flow):
"""Flow with human-in-the-loop feedback."""
@start()
@human_feedback(
message="Please review the output:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review_content(self) -> Literal["approved", "rejected"]:
return "approved"
@listen("approved")
def on_approved(self):
return "published"
@listen("rejected")
def on_rejected(self):
return "discarded"
flow = HITLFlow()
structure = build_flow_structure(flow)
review_node = structure["nodes"]["review_content"]
assert review_node["is_human_feedback"] is True
assert review_node["type"] == "human_feedback"
assert review_node["human_feedback_message"] == "Please review the output:"
assert review_node["human_feedback_emit"] == ["approved", "rejected"]
assert review_node["human_feedback_llm"] == "gpt-4o-mini"
def test_human_feedback_visualization_includes_hitl_data():
"""Test that visualization includes human feedback data in HTML."""
from typing import Literal
class HITLFlow(Flow):
"""Flow with human-in-the-loop feedback."""
@start()
@human_feedback(
message="Please review the output:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review_content(self) -> Literal["approved", "rejected"]:
return "approved"
@listen("approved")
def on_approved(self):
return "published"
flow = HITLFlow()
structure = build_flow_structure(flow)
html_file = visualize_flow_structure(structure, "test_hitl.html", show=False)
html_path = Path(html_file)
js_file = html_path.parent / f"{html_path.stem}_script.js"
js_content = js_file.read_text(encoding="utf-8")
assert "HITL" in js_content
assert "Please review the output:" in js_content
assert "approved" in js_content
assert "rejected" in js_content
assert "#4A90E2" in js_content
def test_human_feedback_without_emit_metadata():
"""Test that human feedback without emit has correct metadata."""
class HITLSimpleFlow(Flow):
"""Flow with simple human feedback (no routing)."""
@start()
@human_feedback(message="Please provide feedback:")
def review_step(self):
return "content"
@listen(review_step)
def next_step(self):
return "done"
flow = HITLSimpleFlow()
structure = build_flow_structure(flow)
review_node = structure["nodes"]["review_step"]
assert review_node["is_human_feedback"] is True
assert "is_router" not in review_node or review_node["is_router"] is False
assert review_node["type"] == "start"
assert review_node["human_feedback_message"] == "Please provide feedback:"
def test_human_feedback_with_default_outcome():
"""Test that human feedback with default outcome includes it in metadata."""
from typing import Literal
class HITLDefaultFlow(Flow):
"""Flow with human feedback that has a default outcome."""
@start()
@human_feedback(
message="Review this:",
emit=["approved", "needs_work"],
llm="gpt-4o-mini",
default_outcome="needs_work",
)
def review(self) -> Literal["approved", "needs_work"]:
return "approved"
@listen("approved")
def on_approved(self):
return "published"
@listen("needs_work")
def on_needs_work(self):
return "revised"
flow = HITLDefaultFlow()
structure = build_flow_structure(flow)
review_node = structure["nodes"]["review"]
assert review_node["is_human_feedback"] is True
assert review_node["human_feedback_default_outcome"] == "needs_work"
def test_mixed_router_and_human_feedback():
"""Test flow with both regular routers and human feedback routers."""
from typing import Literal
class MixedFlow(Flow):
"""Flow with both regular routers and HITL."""
@start()
def init(self):
return "initialized"
@router(init)
def auto_decision(self) -> Literal["path_a", "path_b"]:
return "path_a"
@listen("path_a")
@human_feedback(
message="Review this step:",
emit=["continue", "stop"],
llm="gpt-4o-mini",
)
def human_review(self) -> Literal["continue", "stop"]:
return "continue"
@listen("continue")
def proceed(self):
return "done"
@listen("stop")
def halt(self):
return "halted"
flow = MixedFlow()
structure = build_flow_structure(flow)
auto_node = structure["nodes"]["auto_decision"]
assert auto_node["type"] == "router"
assert auto_node["is_router"] is True
assert "is_human_feedback" not in auto_node or auto_node["is_human_feedback"] is False
human_node = structure["nodes"]["human_review"]
assert human_node["type"] == "human_feedback"
assert human_node["is_router"] is True
assert human_node["is_human_feedback"] is True
assert human_node["human_feedback_message"] == "Review this step:"
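Read together, these assertions imply a node-metadata shape along the following lines for a HITL step; the dict below is an illustration assembled from the asserted keys, not the exact structure build_flow_structure emits:

# Illustrative human-feedback node entry, inferred from the assertions above.
review_node = {
    "type": "human_feedback",             # reported as "start" when the step is a plain @start() without emit routing
    "is_human_feedback": True,
    "is_router": True,                    # True only when emit=[...] turns the step into a router
    "human_feedback_message": "Review this step:",
    "human_feedback_emit": ["continue", "stop"],
    "human_feedback_llm": "gpt-4o-mini",
    "human_feedback_default_outcome": "needs_work",  # present only when default_outcome is passed
}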

View File

@@ -217,7 +217,6 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Hello ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -225,7 +224,6 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="World!",
call_id="test-call-id",
),
)
return mock_output
@@ -286,7 +284,6 @@ class TestCrewKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="",
call_id="test-call-id",
tool_call=ToolCall(
id="call-123",
function=FunctionCall(
@@ -367,7 +364,6 @@ class TestCrewKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Async ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -375,7 +371,6 @@ class TestCrewKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Stream!",
call_id="test-call-id",
),
)
return mock_output
@@ -456,7 +451,6 @@ class TestFlowKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Flow ",
call_id="test-call-id",
),
)
crewai_event_bus.emit(
@@ -464,7 +458,6 @@ class TestFlowKickoffStreaming:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="output!",
call_id="test-call-id",
),
)
return "done"
@@ -552,7 +545,6 @@ class TestFlowKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="Async flow ",
call_id="test-call-id",
),
)
await asyncio.sleep(0.01)
@@ -561,7 +553,6 @@ class TestFlowKickoffStreamingAsync:
LLMStreamChunkEvent(
type="llm_stream_chunk",
chunk="stream!",
call_id="test-call-id",
),
)
await asyncio.sleep(0.01)
@@ -695,7 +686,6 @@ class TestStreamingEdgeCases:
type="llm_stream_chunk",
chunk="Task 1",
task_name="First task",
call_id="test-call-id",
),
)
return mock_output

View File

@@ -249,8 +249,6 @@ def test_guardrail_emits_events(sample_agent):
result = task.execute_sync(agent=sample_agent)
crewai_event_bus.flush(timeout=10.0)
with condition:
success = condition.wait_for(
lambda: len(started_guardrail) >= 2 and len(completed_guardrail) >= 2,
@@ -269,8 +267,6 @@ def test_guardrail_emits_events(sample_agent):
task.execute_sync(agent=sample_agent)
crewai_event_bus.flush(timeout=10.0)
with condition:
success = condition.wait_for(
lambda: len(started_guardrail) >= 3 and len(completed_guardrail) >= 3,

View File

@@ -10,9 +10,7 @@ from crewai import Agent, Task
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolSelectionErrorEvent,
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
ToolValidateInputErrorEvent,
)
from crewai.tools import BaseTool
@@ -746,78 +744,3 @@ def test_tool_usage_finished_event_with_cached_result():
assert isinstance(event.started_at, datetime.datetime)
assert isinstance(event.finished_at, datetime.datetime)
assert event.type == "tool_usage_finished"
def test_tool_error_does_not_emit_finished_event():
from crewai.tools.tool_calling import ToolCalling
class FailingTool(BaseTool):
name: str = "Failing Tool"
description: str = "A tool that always fails"
def _run(self, **kwargs) -> str:
raise ValueError("Intentional failure")
failing_tool = FailingTool().to_structured_tool()
mock_agent = MagicMock()
mock_agent.key = "test_agent_key"
mock_agent.role = "test_agent_role"
mock_agent._original_role = "test_agent_role"
mock_agent.verbose = False
mock_agent.fingerprint = None
mock_agent.i18n.tools.return_value = {"name": "Add Image"}
mock_agent.i18n.errors.return_value = "Error: {error}"
mock_agent.i18n.slice.return_value = "Available tools: {tool_names}"
mock_task = MagicMock()
mock_task.delegations = 0
mock_task.name = "Test Task"
mock_task.description = "A test task"
mock_task.id = "test-task-id"
mock_action = MagicMock()
mock_action.tool = "failing_tool"
mock_action.tool_input = "{}"
tool_usage = ToolUsage(
tools_handler=MagicMock(cache=None, last_used_tool=None),
tools=[failing_tool],
task=mock_task,
function_calling_llm=None,
agent=mock_agent,
action=mock_action,
)
started_events = []
error_events = []
finished_events = []
error_received = threading.Event()
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_started(source, event):
if event.tool_name == "failing_tool":
started_events.append(event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_error(source, event):
if event.tool_name == "failing_tool":
error_events.append(event)
error_received.set()
@crewai_event_bus.on(ToolUsageFinishedEvent)
def on_finished(source, event):
if event.tool_name == "failing_tool":
finished_events.append(event)
tool_calling = ToolCalling(tool_name="failing_tool", arguments={})
tool_usage.use(calling=tool_calling, tool_string="Action: failing_tool")
assert error_received.wait(timeout=5), "Timeout waiting for error event"
crewai_event_bus.flush()
assert len(started_events) >= 1, "Expected at least one ToolUsageStartedEvent"
assert len(error_events) >= 1, "Expected at least one ToolUsageErrorEvent"
assert len(finished_events) == 0, (
"ToolUsageFinishedEvent should NOT be emitted after ToolUsageErrorEvent"
)

View File

@@ -984,8 +984,8 @@ def test_streaming_fallback_to_non_streaming():
def mock_call(messages, tools=None, callbacks=None, available_functions=None):
nonlocal fallback_called
# Emit a couple of chunks to simulate partial streaming
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id="Id", call_id="test-call-id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id="Id", call_id="test-call-id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id = "Id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id = "Id"))
# Mark that fallback would be called
fallback_called = True
@@ -1041,7 +1041,7 @@ def test_streaming_empty_response_handling():
def mock_call(messages, tools=None, callbacks=None, available_functions=None):
# Emit a few empty chunks
for _ in range(3):
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="", response_id="id", call_id="test-call-id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="",response_id="id"))
# Return the default message for empty responses
return "I apologize, but I couldn't generate a proper response. Please try again or rephrase your request."
@@ -1280,105 +1280,6 @@ def test_llm_emits_event_with_lite_agent():
assert set(all_agent_id) == {str(agent.id)}
# ----------- CALL_ID CORRELATION TESTS -----------
@pytest.mark.vcr()
def test_llm_call_events_share_call_id():
"""All events from a single LLM call should share the same call_id."""
import uuid
events = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_complete(source, event):
with condition:
events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Say hi")
with condition:
success = condition.wait_for(lambda: len(events) >= 2, timeout=10)
assert success, "Timeout waiting for LLM events"
# Behavior: all events from the call share the same call_id
assert len(events) == 2
assert events[0].call_id == events[1].call_id
# call_id should be a valid UUID
uuid.UUID(events[0].call_id)
@pytest.mark.vcr()
def test_streaming_chunks_share_call_id_with_call():
"""Streaming chunks should share call_id with started/completed events."""
events = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMStreamChunkEvent)
def on_chunk(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_complete(source, event):
with condition:
events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini", stream=True)
llm.call("Say hi")
with condition:
# Wait for at least started, some chunks, and completed
success = condition.wait_for(lambda: len(events) >= 3, timeout=10)
assert success, "Timeout waiting for streaming events"
# Behavior: all events (started, chunks, completed) share the same call_id
call_ids = {e.call_id for e in events}
assert len(call_ids) == 1
@pytest.mark.vcr()
def test_separate_llm_calls_have_different_call_ids():
"""Different LLM calls should have different call_ids."""
call_ids = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
call_ids.append(event.call_id)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Say hi")
llm.call("Say bye")
with condition:
success = condition.wait_for(lambda: len(call_ids) >= 2, timeout=10)
assert success, "Timeout waiting for LLM call events"
# Behavior: each call has its own call_id
assert len(call_ids) == 2
assert call_ids[0] != call_ids[1]
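The removed call_id tests above assert one correlation property: every event emitted for a single LLM call carries the same call_id, and separate calls get distinct ones. A standalone sketch of consuming events under that property (stand-in event type, not the crewai event classes):

from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Event:
    call_id: str
    kind: str        # "started", "chunk", or "completed"
    payload: str = ""


def group_by_call(events: list[Event]) -> dict[str, list[Event]]:
    """Bucket events by call_id so one LLM call's started/chunk/completed stay together."""
    grouped: dict[str, list[Event]] = defaultdict(list)
    for event in events:
        grouped[event.call_id].append(event)
    return dict(grouped)


events = [
    Event("call-1", "started"),
    Event("call-1", "chunk", "Hi"),
    Event("call-1", "completed"),
    Event("call-2", "started"),  # a second call gets its own call_id
]
assert len(group_by_call(events)) == 2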
# ----------- HUMAN FEEDBACK EVENTS -----------

View File

@@ -28,7 +28,7 @@ dev = [
"boto3-stubs[bedrock-runtime]==1.40.54",
"types-psycopg2==2.9.21.20251012",
"types-pymysql==1.1.0.20250916",
"types-aiofiles~=25.1.0",
"types-aiofiles~=24.1.0",
]

uv.lock (generated, 12 changed lines)
View File

@@ -51,7 +51,7 @@ dev = [
{ name = "pytest-timeout", specifier = "==2.4.0" },
{ name = "pytest-xdist", specifier = "==3.8.0" },
{ name = "ruff", specifier = "==0.14.7" },
{ name = "types-aiofiles", specifier = "~=25.1.0" },
{ name = "types-aiofiles", specifier = "~=24.1.0" },
{ name = "types-appdirs", specifier = "==1.4.*" },
{ name = "types-psycopg2", specifier = "==2.9.21.20251012" },
{ name = "types-pymysql", specifier = "==1.1.0.20250916" },
@@ -1294,10 +1294,10 @@ requires-dist = [
{ name = "json-repair", specifier = "~=0.25.2" },
{ name = "json5", specifier = "~=0.10.0" },
{ name = "jsonref", specifier = "~=1.1.0" },
{ name = "litellm", marker = "extra == 'litellm'", specifier = ">=1.74.9,<3" },
{ name = "litellm", marker = "extra == 'litellm'", specifier = "~=1.74.9" },
{ name = "mcp", specifier = "~=1.23.1" },
{ name = "mem0ai", marker = "extra == 'mem0'", specifier = "~=0.1.94" },
{ name = "openai", specifier = ">=1.83.0,<3" },
{ name = "openai", specifier = "~=1.83.0" },
{ name = "openpyxl", specifier = "~=3.1.5" },
{ name = "openpyxl", marker = "extra == 'openpyxl'", specifier = "~=3.1.5" },
{ name = "opentelemetry-api", specifier = "~=1.34.0" },
@@ -8282,11 +8282,11 @@ wheels = [
[[package]]
name = "types-aiofiles"
version = "25.1.0.20251011"
version = "24.1.0.20250822"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/84/6c/6d23908a8217e36704aa9c79d99a620f2fdd388b66a4b7f72fbc6b6ff6c6/types_aiofiles-25.1.0.20251011.tar.gz", hash = "sha256:1c2b8ab260cb3cd40c15f9d10efdc05a6e1e6b02899304d80dfa0410e028d3ff", size = 14535, upload-time = "2025-10-11T02:44:51.237Z" }
sdist = { url = "https://files.pythonhosted.org/packages/19/48/c64471adac9206cc844afb33ed311ac5a65d2f59df3d861e0f2d0cad7414/types_aiofiles-24.1.0.20250822.tar.gz", hash = "sha256:9ab90d8e0c307fe97a7cf09338301e3f01a163e39f3b529ace82466355c84a7b", size = 14484, upload-time = "2025-08-22T03:02:23.039Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/71/0f/76917bab27e270bb6c32addd5968d69e558e5b6f7fb4ac4cbfa282996a96/types_aiofiles-25.1.0.20251011-py3-none-any.whl", hash = "sha256:8ff8de7f9d42739d8f0dadcceeb781ce27cd8d8c4152d4a7c52f6b20edb8149c", size = 14338, upload-time = "2025-10-11T02:44:50.054Z" },
{ url = "https://files.pythonhosted.org/packages/bc/8e/5e6d2215e1d8f7c2a94c6e9d0059ae8109ce0f5681956d11bb0a228cef04/types_aiofiles-24.1.0.20250822-py3-none-any.whl", hash = "sha256:0ec8f8909e1a85a5a79aed0573af7901f53120dd2a29771dd0b3ef48e12328b0", size = 14322, upload-time = "2025-08-22T03:02:21.918Z" },
]
[[package]]