Compare commits

..

1 Commits

Author SHA1 Message Date
Devin AI
fae812ffb7 feat: add ToolSearchTool for on-demand tool discovery
Implements Anthropic's Tool Search Tool pattern for on-demand tool loading,
reducing token consumption when working with large tool libraries.

Features:
- ToolSearchTool class that searches through a catalog of tools
- Keyword-based search with relevance scoring (default)
- Regex-based search as alternative strategy
- Support for custom search functions
- Tool catalog management (add, remove, list tools)
- Returns JSON with tool definitions including name, description, and args_schema

Closes #4224

Co-Authored-By: João <joao@crewai.com>
2026-01-12 09:19:16 +00:00
32 changed files with 795 additions and 2455 deletions

View File

@@ -291,7 +291,6 @@
"en/observability/arize-phoenix",
"en/observability/braintrust",
"en/observability/datadog",
"en/observability/galileo",
"en/observability/langdb",
"en/observability/langfuse",
"en/observability/langtrace",
@@ -743,7 +742,6 @@
"pt-BR/observability/arize-phoenix",
"pt-BR/observability/braintrust",
"pt-BR/observability/datadog",
"pt-BR/observability/galileo",
"pt-BR/observability/langdb",
"pt-BR/observability/langfuse",
"pt-BR/observability/langtrace",
@@ -1205,7 +1203,6 @@
"ko/observability/arize-phoenix",
"ko/observability/braintrust",
"ko/observability/datadog",
"ko/observability/galileo",
"ko/observability/langdb",
"ko/observability/langfuse",
"ko/observability/langtrace",

View File

@@ -91,10 +91,6 @@ The `A2AConfig` class accepts the following parameters:
Update mechanism for receiving task status. Options: `StreamingConfig`, `PollingConfig`, or `PushNotificationConfig`.
</ParamField>
<ParamField path="transport_protocol" type="Literal['JSONRPC', 'GRPC', 'HTTP+JSON']" default="JSONRPC">
Transport protocol for A2A communication. Options: `JSONRPC` (default), `GRPC`, or `HTTP+JSON`.
</ParamField>
## Authentication
For A2A agents that require authentication, use one of the provided auth schemes:

View File

@@ -1,115 +0,0 @@
---
title: Galileo
description: Galileo integration for CrewAI tracing and evaluation
icon: telescope
mode: "wide"
---
## Overview
This guide demonstrates how to integrate **Galileo** with **CrewAI**
for comprehensive tracing and Evaluation Engineering.
By the end of this guide, you will be able to trace your CrewAI agents,
monitor their performance, and evaluate their behaviour with
Galileo's powerful observability platform.
> **What is Galileo?** [Galileo](https://galileo.ai) is AI evaluation and observability
platform that delivers end-to-end tracing, evaluation,
and monitoring for AI applications. It enables teams to capture ground truth,
create robust guardrails, and run systematic experiments with
built-in experiment tracking and performance analytics—ensuring reliability,
transparency, and continuous improvement across the AI lifecycle.
## Getting started
This tutorial follows the [CrewAI quickstart](/en/quickstart) and shows how to add
Galileo's [CrewAIEventListener](https://v2docs.galileo.ai/sdk-api/python/reference/handlers/crewai/handler),
an event handler.
For more information, see Galileos
[Add Galileo to a CrewAI Application](https://v2docs.galileo.ai/how-to-guides/third-party-integrations/add-galileo-to-crewai/add-galileo-to-crewai)
how-to guide.
> **Note** This tutorial assumes you have completed the [CrewAI quickstart](/en/quickstart).
If you want a completed comprehensive example, see the Galileo
[CrewAI sdk-example repo](https://github.com/rungalileo/sdk-examples/tree/main/python/agent/crew-ai).
### Step 1: Install dependencies
Install the required dependencies for your app.
Create a virtual environment using your preferred method,
then install dependencies inside that environment using your
preferred tool:
```bash
uv add galileo
```
### Step 2: Add to the .env file from the [CrewAI quickstart](/en/quickstart)
```bash
# Your Galileo API key
GALILEO_API_KEY="your-galileo-api-key"
# Your Galileo project name
GALILEO_PROJECT="your-galileo-project-name"
# The name of the Log stream you want to use for logging
GALILEO_LOG_STREAM="your-galileo-log-stream "
```
### Step 3: Add the Galileo event listener
To enable logging with Galileo, you need to create an instance of the `CrewAIEventListener`.
Import the Galileo CrewAI handler package by
adding the following code at the top of your main.py file:
```python
from galileo.handlers.crewai.handler import CrewAIEventListener
```
At the start of your run function, create the event listener:
```python
def run():
# Create the event listener
CrewAIEventListener()
# The rest of your existing code goes here
```
When you create the listener instance, it is automatically
registered with CrewAI.
### Step 4: Run your crew
Run your crew with the CrewAI CLI:
```bash
crewai run
```
### Step 5: View the traces in Galileo
Once your crew has finished, the traces will be flushed and appear in Galileo.
![Galileo trace view](/images/galileo-trace-veiw.png)
## Understanding the Galileo Integration
Galileo integrates with CrewAI by registering an event listener
that captures Crew execution events (e.g., agent actions, tool calls, model responses)
and forwards them to Galileo for observability and evaluation.
### Understanding the event listener
Creating a `CrewAIEventListener()` instance is all thats
required to enable Galileo for a CrewAI run. When instantiated, the listener:
- Automatically registers itself with CrewAI
- Reads Galileo configuration from environment variables
- Logs all run data to the Galileo project and log stream specified by
`GALILEO_PROJECT` and `GALILEO_LOG_STREAM`
No additional configuration or code changes are required.
All data from this run is logged to the Galileo project and
log stream specified by your environment configuration
(for example, GALILEO_PROJECT and GALILEO_LOG_STREAM).

Binary file not shown.

Before

Width:  |  Height:  |  Size: 239 KiB

View File

@@ -1,115 +0,0 @@
---
title: Galileo 갈릴레오
description: CrewAI 추적 및 평가를 위한 Galileo 통합
icon: telescope
mode: "wide"
---
## 개요
이 가이드는 **Galileo**를 **CrewAI**와 통합하는 방법을 보여줍니다.
포괄적인 추적 및 평가 엔지니어링을 위한 것입니다.
이 가이드가 끝나면 CrewAI 에이전트를 추적할 수 있게 됩니다.
성과를 모니터링하고 행동을 평가합니다.
Galileo의 강력한 관측 플랫폼.
> **갈릴레오(Galileo)란 무엇인가요?**[Galileo](https://galileo.ai/)는 AI 평가 및 관찰 가능성입니다.
엔드투엔드 추적, 평가,
AI 애플리케이션 모니터링. 이를 통해 팀은 실제 사실을 포착할 수 있습니다.
견고한 가드레일을 만들고 체계적인 실험을 실행하세요.
내장된 실험 추적 및 성능 분석으로 신뢰성 보장
AI 수명주기 전반에 걸쳐 투명성과 지속적인 개선을 제공합니다.
## 시작하기
이 튜토리얼은 [CrewAI 빠른 시작](/ko/quickstart.mdx)을 따르며 추가하는 방법을 보여줍니다.
갈릴레오의 [CrewAIEventListener](https://v2docs.galileo.ai/sdk-api/python/reference/handlers/crewai/handler),
이벤트 핸들러.
자세한 내용은 갈릴레오 문서를 참고하세요.
[CrewAI 애플리케이션에 Galileo 추가](https://v2docs.galileo.ai/how-to-guides/third-party-integrations/add-galileo-to-crewai/add-galileo-to-crewai)
방법 안내.
> **참고**이 튜토리얼에서는 [CrewAI 빠른 시작](/ko/quickstart.mdx)을 완료했다고 가정합니다.
완전한 포괄적인 예제를 원한다면 Galileo
[CrewAI SDK 예제 저장소](https://github.com/rungalileo/sdk-examples/tree/main/python/agent/crew-ai).
### 1단계: 종속성 설치
앱에 필요한 종속성을 설치합니다.
원하는 방법으로 가상 환경을 생성하고,
그런 다음 다음을 사용하여 해당 환경 내에 종속성을 설치하십시오.
선호하는 도구:
```bash
uv add galileo
```
### 2단계: [CrewAI 빠른 시작](/ko/quickstart.mdx)에서 .env 파일에 추가
```bash
# Your Galileo API key
GALILEO_API_KEY="your-galileo-api-key"
# Your Galileo project name
GALILEO_PROJECT="your-galileo-project-name"
# The name of the Log stream you want to use for logging
GALILEO_LOG_STREAM="your-galileo-log-stream "
```
### 3단계: Galileo 이벤트 리스너 추가
Galileo로 로깅을 활성화하려면 `CrewAIEventListener`의 인스턴스를 생성해야 합니다.
다음을 통해 Galileo CrewAI 핸들러 패키지를 가져옵니다.
main.py 파일 상단에 다음 코드를 추가하세요.
```python
from galileo.handlers.crewai.handler import CrewAIEventListener
```
실행 함수 시작 시 이벤트 리스너를 생성합니다.
```python
def run():
# Create the event listener
CrewAIEventListener()
# The rest of your existing code goes here
```
리스너 인스턴스를 생성하면 자동으로
CrewAI에 등록되었습니다.
### 4단계: Crew Agent 실행
CrewAI CLI를 사용하여 Crew Agent를 실행하세요.
```bash
crewai run
```
### 5단계: Galileo에서 추적 보기
승무원 에이전트가 완료되면 흔적이 플러시되어 Galileo에 나타납니다.
![Galileo trace view](/images/galileo-trace-veiw.png)
## 갈릴레오 통합 이해
Galileo는 이벤트 리스너를 등록하여 CrewAI와 통합됩니다.
승무원 실행 이벤트(예: 에이전트 작업, 도구 호출, 모델 응답)를 캡처합니다.
관찰 가능성과 평가를 위해 이를 갈릴레오에 전달합니다.
### 이벤트 리스너 이해
`CrewAIEventListener()` 인스턴스를 생성하는 것이 전부입니다.
CrewAI 실행을 위해 Galileo를 활성화하는 데 필요합니다. 인스턴스화되면 리스너는 다음을 수행합니다.
-CrewAI에 자동으로 등록됩니다.
-환경 변수에서 Galileo 구성을 읽습니다.
-모든 실행 데이터를 Galileo 프로젝트 및 다음에서 지정한 로그 스트림에 기록합니다.
`GALILEO_PROJECT` 및 `GALILEO_LOG_STREAM`
추가 구성이나 코드 변경이 필요하지 않습니다.
이 실행의 모든 데이터는 Galileo 프로젝트에 기록되며
환경 구성에 따라 지정된 로그 스트림
(예: GALILEO_PROJECT 및 GALILEO_LOG_STREAM)

View File

@@ -1,115 +0,0 @@
---
title: Galileo Galileu
description: Integração Galileo para rastreamento e avaliação CrewAI
icon: telescope
mode: "wide"
---
## Visão geral
Este guia demonstra como integrar o **Galileo**com o **CrewAI**
para rastreamento abrangente e engenharia de avaliação.
Ao final deste guia, você será capaz de rastrear seus agentes CrewAI,
monitorar seu desempenho e avaliar seu comportamento com
A poderosa plataforma de observabilidade do Galileo.
> **O que é Galileo?**[Galileo](https://galileo.ai/) é avaliação e observabilidade de IA
plataforma que oferece rastreamento, avaliação e
e monitoramento de aplicações de IA. Ele permite que as equipes capturem a verdade,
criar grades de proteção robustas e realizar experimentos sistemáticos com
rastreamento de experimentos integrado e análise de desempenho -garantindo confiabilidade,
transparência e melhoria contínua em todo o ciclo de vida da IA.
## Primeiros passos
Este tutorial segue o [CrewAI Quickstart](pt-BR/quickstart) e mostra como adicionar
[CrewAIEventListener] do Galileo(https://v2docs.galileo.ai/sdk-api/python/reference/handlers/crewai/handler),
um manipulador de eventos.
Para mais informações, consulte Galileu
[Adicionar Galileo a um aplicativo CrewAI](https://v2docs.galileo.ai/how-to-guides/third-party-integrations/add-galileo-to-crewai/add-galileo-to-crewai)
guia prático.
> **Observação**Este tutorial pressupõe que você concluiu o [CrewAI Quickstart](pt-BR/quickstart).
Se você quiser um exemplo completo e abrangente, consulte o Galileo
[Repositório de exemplo SDK da CrewAI](https://github.com/rungalileo/sdk-examples/tree/main/python/agent/crew-ai).
### Etapa 1: instalar dependências
Instale as dependências necessárias para seu aplicativo.
Crie um ambiente virtual usando seu método preferido,
em seguida, instale dependências dentro desse ambiente usando seu
ferramenta preferida:
```bash
uv add galileo
```
### Etapa 2: adicione ao arquivo .env do [CrewAI Quickstart](/pt-BR/quickstart)
```bash
# Your Galileo API key
GALILEO_API_KEY="your-galileo-api-key"
# Your Galileo project name
GALILEO_PROJECT="your-galileo-project-name"
# The name of the Log stream you want to use for logging
GALILEO_LOG_STREAM="your-galileo-log-stream "
```
### Etapa 3: adicionar o ouvinte de eventos Galileo
Para habilitar o registro com Galileo, você precisa criar uma instância do `CrewAIEventListener`.
Importe o pacote manipulador Galileo CrewAI por
adicionando o seguinte código no topo do seu arquivo main.py:
```python
from galileo.handlers.crewai.handler import CrewAIEventListener
```
No início da sua função run, crie o ouvinte de evento:
```python
def run():
# Create the event listener
CrewAIEventListener()
# The rest of your existing code goes here
```
Quando você cria a instância do listener, ela é automaticamente
registrado na CrewAI.
### Etapa 4: administre sua Crew
Administre sua Crew com o CrewAI CLI:
```bash
crewai run
```
### Passo 5: Visualize os traços no Galileo
Assim que sua tripulação terminar, os rastros serão eliminados e aparecerão no Galileo.
![Galileo trace view](/images/galileo-trace-veiw.png)
## Compreendendo a integração do Galileo
Galileo se integra ao CrewAI registrando um ouvinte de evento
que captura eventos de execução da tripulação (por exemplo, ações do agente, chamadas de ferramentas, respostas do modelo)
e os encaminha ao Galileo para observabilidade e avaliação.
### Compreendendo o ouvinte de eventos
Criar uma instância `CrewAIEventListener()` é tudo o que você precisa
necessário para habilitar o Galileo para uma execução do CrewAI. Quando instanciado, o ouvinte:
-Registra-se automaticamente no CrewAI
-Lê a configuração do Galileo a partir de variáveis de ambiente
-Registra todos os dados de execução no projeto Galileo e fluxo de log especificado por
`GALILEO_PROJECT` e `GALILEO_LOG_STREAM`
Nenhuma configuração adicional ou alterações de código são necessárias.
Todos os dados desta execução são registados no projecto Galileo e
fluxo de log especificado pela configuração do seu ambiente
(por exemplo, GALILEO_PROJECT e GALILEO_LOG_STREAM).

View File

@@ -5,7 +5,7 @@ This module is separate from experimental.a2a to avoid circular imports.
from __future__ import annotations
from typing import Annotated, Any, ClassVar, Literal
from typing import Annotated, Any, ClassVar
from pydantic import (
BaseModel,
@@ -53,7 +53,6 @@ class A2AConfig(BaseModel):
fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
trust_remote_completion_status: If True, return A2A agent's result directly when completed.
updates: Update mechanism config.
transport_protocol: A2A transport protocol (grpc, jsonrpc, http+json).
"""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
@@ -83,7 +82,3 @@ class A2AConfig(BaseModel):
default_factory=_get_default_update_config,
description="Update mechanism config",
)
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field(
default="JSONRPC",
description="Specified mode of A2A transport protocol",
)

View File

@@ -7,7 +7,7 @@ from collections.abc import AsyncIterator, MutableMapping
from contextlib import asynccontextmanager
from functools import lru_cache
import time
from typing import TYPE_CHECKING, Any, Literal
from typing import TYPE_CHECKING, Any
import uuid
from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory
@@ -18,6 +18,7 @@ from a2a.types import (
PushNotificationConfig as A2APushNotificationConfig,
Role,
TextPart,
TransportProtocol,
)
from aiocache import cached # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
@@ -258,7 +259,6 @@ async def _afetch_agent_card_impl(
def execute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -282,23 +282,6 @@ def execute_a2a_delegation(
use aexecute_a2a_delegation directly.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL)
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
auth: Optional AuthScheme for authentication (Bearer, OAuth2, API Key, HTTP Basic/Digest)
timeout: Request timeout in seconds
task_description: The task to delegate
context: Optional context information
context_id: Context ID for correlating messages/tasks
task_id: Specific task identifier
reference_task_ids: List of related task IDs
metadata: Additional metadata (external_id, request_id, etc.)
extensions: Protocol extensions for custom fields
conversation_history: Previous Message objects from conversation
agent_id: Agent identifier for logging
agent_role: Role of the CrewAI agent delegating the task
agent_branch: Optional agent tree branch for logging
response_model: Optional Pydantic model for structured outputs
turn_number: Optional turn number for multi-turn conversations
endpoint: A2A agent endpoint URL.
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
@@ -340,7 +323,6 @@ def execute_a2a_delegation(
agent_role=agent_role,
agent_branch=agent_branch,
response_model=response_model,
transport_protocol=transport_protocol,
turn_number=turn_number,
updates=updates,
)
@@ -351,7 +333,6 @@ def execute_a2a_delegation(
async def aexecute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -375,23 +356,6 @@ async def aexecute_a2a_delegation(
in an async context (e.g., with Crew.akickoff() or agent.aexecute_task()).
Args:
endpoint: A2A agent endpoint URL
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
auth: Optional AuthScheme for authentication
timeout: Request timeout in seconds
task_description: Task to delegate
context: Optional context
context_id: Context ID for correlation
task_id: Specific task identifier
reference_task_ids: Related task IDs
metadata: Additional metadata
extensions: Protocol extensions
conversation_history: Previous Message objects
turn_number: Current turn number
agent_branch: Agent tree branch for logging
agent_id: Agent identifier for logging
agent_role: Agent role for logging
response_model: Optional Pydantic model for structured outputs
endpoint: A2A agent endpoint URL.
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
@@ -450,7 +414,6 @@ async def aexecute_a2a_delegation(
agent_role=agent_role,
response_model=response_model,
updates=updates,
transport_protocol=transport_protocol,
)
crewai_event_bus.emit(
@@ -468,7 +431,6 @@ async def aexecute_a2a_delegation(
async def _aexecute_a2a_delegation_impl(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -562,6 +524,7 @@ async def _aexecute_a2a_delegation_impl(
extensions=extensions,
)
transport_protocol = TransportProtocol("JSONRPC")
new_messages: list[Message] = [*conversation_history, message]
crewai_event_bus.emit(
None,
@@ -633,7 +596,7 @@ async def _aexecute_a2a_delegation_impl(
@asynccontextmanager
async def _create_a2a_client(
agent_card: AgentCard,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
transport_protocol: TransportProtocol,
timeout: int,
headers: MutableMapping[str, str],
streaming: bool,
@@ -677,7 +640,7 @@ async def _create_a2a_client(
config = ClientConfig(
httpx_client=httpx_client,
supported_transports=[transport_protocol],
supported_transports=[str(transport_protocol.value)],
streaming=streaming and not use_polling,
polling=use_polling,
accepted_output_modes=["application/json"],

View File

@@ -771,7 +771,6 @@ def _delegate_to_a2a(
response_model=agent_config.response_model,
turn_number=turn_num + 1,
updates=agent_config.updates,
transport_protocol=agent_config.transport_protocol,
)
conversation_history = a2a_result.get("history", [])
@@ -1086,7 +1085,6 @@ async def _adelegate_to_a2a(
agent_branch=agent_branch,
response_model=agent_config.response_model,
turn_number=turn_num + 1,
transport_protocol=agent_config.transport_protocol,
updates=agent_config.updates,
)

View File

@@ -709,17 +709,9 @@ class Agent(BaseAgent):
raw_tools: list[BaseTool] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)
use_native_tool_calling = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and len(raw_tools) > 0
)
prompt = Prompts(
agent=self,
has_tools=len(raw_tools) > 0,
use_native_tool_calling=use_native_tool_calling,
i18n=self.i18n,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
@@ -727,8 +719,6 @@ class Agent(BaseAgent):
response_template=self.response_template,
).task_execution()
print("prompt", prompt)
stop_words = [self.i18n.slice("observation")]
if self.response_template:

View File

@@ -236,30 +236,14 @@ def process_tool_results(agent: Agent, result: Any) -> Any:
def save_last_messages(agent: Agent) -> None:
"""Save the last messages from agent executor.
Sanitizes messages to be compatible with TaskOutput's LLMMessage type,
which only accepts 'user', 'assistant', 'system' roles and requires
content to be a string or list (not None).
Args:
agent: The agent instance.
"""
if not agent.agent_executor or not hasattr(agent.agent_executor, "messages"):
agent._last_messages = []
return
sanitized_messages = []
for msg in agent.agent_executor.messages:
role = msg.get("role", "")
# Only include messages with valid LLMMessage roles
if role not in ("user", "assistant", "system"):
continue
# Ensure content is not None (can happen with tool call assistant messages)
content = msg.get("content")
if content is None:
content = ""
sanitized_messages.append({"role": role, "content": content})
agent._last_messages = sanitized_messages
agent._last_messages = (
agent.agent_executor.messages.copy()
if agent.agent_executor and hasattr(agent.agent_executor, "messages")
else []
)
def prepare_tools(

View File

@@ -30,7 +30,6 @@ from crewai.hooks.llm_hooks import (
)
from crewai.utilities.agent_utils import (
aget_llm_response,
convert_tools_to_openai_schema,
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
@@ -216,33 +215,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _invoke_loop(self) -> AgentFinish:
"""Execute agent loop until completion.
Checks if the LLM supports native function calling and uses that
approach if available, otherwise falls back to the ReAct text pattern.
Returns:
Final answer from the agent.
"""
# Check if model supports native function calling
use_native_tools = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and self.original_tools
)
if use_native_tools:
return self._invoke_loop_native_tools()
# Fall back to ReAct text-based pattern
return self._invoke_loop_react()
def _invoke_loop_react(self) -> AgentFinish:
"""Execute agent loop using ReAct text-based pattern.
This is the traditional approach where tool definitions are embedded
in the prompt and the LLM outputs Action/Action Input text that is
parsed to execute tools.
Returns:
Final answer from the agent.
"""
@@ -272,10 +244,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
response_model=self.response_model,
executor_context=self,
)
print("--------------------------------")
print("get_llm_response answer", answer)
print("--------------------------------")
# breakpoint()
if self.response_model is not None:
try:
self.response_model.model_validate_json(answer)
@@ -365,338 +333,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
def _invoke_loop_native_tools(self) -> AgentFinish:
"""Execute agent loop using native function calling.
This method uses the LLM's native tool/function calling capability
instead of the text-based ReAct pattern. The LLM directly returns
structured tool calls which are executed and results fed back.
Returns:
Final answer from the agent.
"""
print("--------------------------------")
print("invoke_loop_native_tools")
print("--------------------------------")
# Convert tools to OpenAI schema format
if not self.original_tools:
# No tools available, fall back to simple LLM call
return self._invoke_loop_native_no_tools()
openai_tools, available_functions = convert_tools_to_openai_schema(
self.original_tools
)
while True:
try:
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
None,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
)
self._show_logs(formatted_answer)
return formatted_answer
enforce_rpm_limit(self.request_within_rpm_limit)
# Debug: Show messages being sent to LLM
print("--------------------------------")
print(f"Messages count: {len(self.messages)}")
for i, msg in enumerate(self.messages):
role = msg.get("role", "unknown")
content = msg.get("content", "")
if content:
preview = (
content[:200] + "..." if len(content) > 200 else content
)
else:
preview = "(no content)"
print(f" [{i}] {role}: {preview}")
print("--------------------------------")
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution
# via _handle_native_tool_calls to properly manage message history.
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
tools=openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
print("--------------------------------")
print("invoke_loop_native_tools answer", answer)
print("--------------------------------")
# print("get_llm_response answer", answer[:500] + "...")
# Check if the response is a list of tool calls
if (
isinstance(answer, list)
and answer
and self._is_tool_call_list(answer)
):
# Handle tool calls - execute tools and add results to messages
self._handle_native_tool_calls(answer, available_functions)
# Continue loop to let LLM analyze results and decide next steps
continue
# Text or other response - handle as potential final answer
if isinstance(answer, str):
# Text response - this is the final answer
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(formatted_answer)
self._append_message(answer) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
# Unexpected response type, treat as final answer
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(formatted_answer)
self._append_message(str(answer)) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
raise e
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
continue
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
def _invoke_loop_native_no_tools(self) -> AgentFinish:
"""Execute a simple LLM call when no tools are available.
Returns:
Final answer from the agent.
"""
enforce_rpm_limit(self.request_within_rpm_limit)
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._show_logs(formatted_answer)
return formatted_answer
def _is_tool_call_list(self, response: list[Any]) -> bool:
"""Check if a response is a list of tool calls.
Args:
response: The response to check.
Returns:
True if the response appears to be a list of tool calls.
"""
if not response:
return False
first_item = response[0]
# OpenAI-style
if hasattr(first_item, "function") or (
isinstance(first_item, dict) and "function" in first_item
):
return True
# Anthropic-style
if (
hasattr(first_item, "type")
and getattr(first_item, "type", None) == "tool_use"
):
return True
if hasattr(first_item, "name") and hasattr(first_item, "input"):
return True
# Gemini-style
if hasattr(first_item, "function_call") and first_item.function_call:
return True
return False
def _handle_native_tool_calls(
self,
tool_calls: list[Any],
available_functions: dict[str, Callable[..., Any]],
) -> None:
"""Handle a single native tool call from the LLM.
Executes only the FIRST tool call and appends the result to message history.
This enables sequential tool execution with reflection after each tool,
allowing the LLM to reason about results before deciding on next steps.
Args:
tool_calls: List of tool calls from the LLM (only first is processed).
available_functions: Dict mapping function names to callables.
"""
from datetime import datetime
import json
from crewai.events import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
if not tool_calls:
return
# Only process the FIRST tool call for sequential execution with reflection
tool_call = tool_calls[0]
# Extract tool call info - handle OpenAI-style, Anthropic-style, and Gemini-style
if hasattr(tool_call, "function"):
# OpenAI-style: has .function.name and .function.arguments
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.function.name
func_args = tool_call.function.arguments
elif hasattr(tool_call, "function_call") and tool_call.function_call:
# Gemini-style: has .function_call.name and .function_call.args
call_id = f"call_{id(tool_call)}"
func_name = tool_call.function_call.name
func_args = (
dict(tool_call.function_call.args)
if tool_call.function_call.args
else {}
)
elif hasattr(tool_call, "name") and hasattr(tool_call, "input"):
# Anthropic format: has .name and .input (ToolUseBlock)
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.name
func_args = tool_call.input # Already a dict in Anthropic
elif isinstance(tool_call, dict):
call_id = tool_call.get("id", f"call_{id(tool_call)}")
func_info = tool_call.get("function", {})
func_name = func_info.get("name", "") or tool_call.get("name", "")
func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
else:
return
# Append assistant message with single tool call
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": func_args
if isinstance(func_args, str)
else json.dumps(func_args),
},
}
],
}
self.messages.append(assistant_message)
# Parse arguments for the single tool call
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
),
)
# Execute the tool
print(f"Using Tool: {func_name}")
result = "Tool not found"
if func_name in available_functions:
try:
tool_func = available_functions[func_name]
result = tool_func(**args_dict)
if not isinstance(result, str):
result = str(result)
except Exception as e:
result = f"Error executing tool: {e}"
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"content": result,
}
self.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
self._printer.print(
content=f"Tool {func_name} executed with result: {result[:200]}...",
color="green",
)
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.messages.append(reasoning_message)
async def ainvoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute the agent asynchronously with given inputs.
@@ -746,29 +382,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
async def _ainvoke_loop(self) -> AgentFinish:
"""Execute agent loop asynchronously until completion.
Checks if the LLM supports native function calling and uses that
approach if available, otherwise falls back to the ReAct text pattern.
Returns:
Final answer from the agent.
"""
# Check if model supports native function calling
use_native_tools = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and self.original_tools
)
if use_native_tools:
return await self._ainvoke_loop_native_tools()
# Fall back to ReAct text-based pattern
return await self._ainvoke_loop_react()
async def _ainvoke_loop_react(self) -> AgentFinish:
"""Execute agent loop asynchronously using ReAct text-based pattern.
Returns:
Final answer from the agent.
"""
@@ -882,139 +495,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
async def _ainvoke_loop_native_tools(self) -> AgentFinish:
"""Execute agent loop asynchronously using native function calling.
This method uses the LLM's native tool/function calling capability
instead of the text-based ReAct pattern.
Returns:
Final answer from the agent.
"""
# Convert tools to OpenAI schema format
if not self.original_tools:
return await self._ainvoke_loop_native_no_tools()
openai_tools, available_functions = convert_tools_to_openai_schema(
self.original_tools
)
while True:
try:
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
None,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
)
self._show_logs(formatted_answer)
return formatted_answer
enforce_rpm_limit(self.request_within_rpm_limit)
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution
# via _handle_native_tool_calls to properly manage message history.
answer = await aget_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
tools=openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
print("--------------------------------")
print("native llm completion answer", answer)
print("--------------------------------")
# Check if the response is a list of tool calls
if (
isinstance(answer, list)
and answer
and self._is_tool_call_list(answer)
):
# Handle tool calls - execute tools and add results to messages
self._handle_native_tool_calls(answer, available_functions)
# Continue loop to let LLM analyze results and decide next steps
continue
# Text or other response - handle as potential final answer
if isinstance(answer, str):
# Text response - this is the final answer
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(formatted_answer)
self._append_message(answer) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
# Unexpected response type, treat as final answer
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(formatted_answer)
self._append_message(str(answer)) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
raise e
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
continue
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
async def _ainvoke_loop_native_no_tools(self) -> AgentFinish:
"""Execute a simple async LLM call when no tools are available.
Returns:
Final answer from the agent.
"""
enforce_rpm_limit(self.request_within_rpm_limit)
answer = await aget_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._show_logs(formatted_answer)
return formatted_answer
def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> AgentAction | AgentFinish:

View File

@@ -378,12 +378,6 @@ class EventListener(BaseEventListener):
self.formatter.handle_llm_tool_usage_finished(
event.tool_name,
)
else:
self.formatter.handle_tool_usage_finished(
event.tool_name,
event.output,
getattr(event, "run_attempts", None),
)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source: Any, event: ToolUsageErrorEvent) -> None:

View File

@@ -366,32 +366,6 @@ To enable tracing, do any one of these:
self.print_panel(content, f"🔧 Tool Execution Started (#{iteration})", "yellow")
def handle_tool_usage_finished(
self,
tool_name: str,
output: str,
run_attempts: int | None = None,
) -> None:
"""Handle tool usage finished event with panel display."""
if not self.verbose:
return
iteration = self.tool_usage_counts.get(tool_name, 1)
content = Text()
content.append("Tool Completed\n", style="green bold")
content.append("Tool: ", style="white")
content.append(f"{tool_name}\n", style="green bold")
if output:
content.append("Output: ", style="white")
content.append(f"{output}\n", style="green")
self.print_panel(
content, f"✅ Tool Execution Completed (#{iteration})", "green"
)
def handle_tool_usage_error(
self,
tool_name: str,

View File

@@ -1,8 +1,6 @@
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime
import json
import threading
from typing import TYPE_CHECKING, Any, Literal, cast
from uuid import uuid4
@@ -19,24 +17,16 @@ from crewai.agents.parser import (
OutputParserError,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled_in_context,
)
from crewai.events.types.logging_events import (
AgentLogsExecutionEvent,
AgentLogsStartedEvent,
)
from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.flow.flow import Flow, listen, or_, router, start
from crewai.hooks.llm_hooks import (
get_after_llm_call_hooks,
get_before_llm_call_hooks,
)
from crewai.utilities.agent_utils import (
convert_tools_to_openai_schema,
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
@@ -81,8 +71,6 @@ class AgentReActState(BaseModel):
current_answer: AgentAction | AgentFinish | None = Field(default=None)
is_finished: bool = Field(default=False)
ask_for_human_input: bool = Field(default=False)
use_native_tools: bool = Field(default=False)
pending_tool_calls: list[Any] = Field(default_factory=list)
class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@@ -191,10 +179,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
)
)
# Native tool calling support
self._openai_tools: list[dict[str, Any]] = []
self._available_functions: dict[str, Callable[..., Any]] = {}
self._state = AgentReActState()
def _ensure_flow_initialized(self) -> None:
@@ -205,66 +189,14 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
Only the instance that actually executes via invoke() will emit events.
"""
if not self._flow_initialized:
current_tracing = is_tracing_enabled_in_context()
# Now call Flow's __init__ which will replace self._state
# with Flow's managed state. Suppress flow events since this is
# an agent executor, not a user-facing flow.
super().__init__(
suppress_flow_events=True,
tracing=current_tracing if current_tracing else None,
)
self._flow_initialized = True
def _check_native_tool_support(self) -> bool:
"""Check if LLM supports native function calling.
Returns:
True if the LLM supports native function calling and tools are available.
"""
return (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and bool(self.original_tools)
)
def _setup_native_tools(self) -> None:
"""Convert tools to OpenAI schema format for native function calling."""
if self.original_tools:
self._openai_tools, self._available_functions = (
convert_tools_to_openai_schema(self.original_tools)
)
def _is_tool_call_list(self, response: list[Any]) -> bool:
"""Check if a response is a list of tool calls.
Args:
response: The response to check.
Returns:
True if the response appears to be a list of tool calls.
"""
if not response:
return False
first_item = response[0]
# Check for OpenAI-style tool call structure
if hasattr(first_item, "function") or (
isinstance(first_item, dict) and "function" in first_item
):
return True
# Check for Anthropic-style tool call structure (ToolUseBlock)
if (
hasattr(first_item, "type")
and getattr(first_item, "type", None) == "tool_use"
):
return True
if hasattr(first_item, "name") and hasattr(first_item, "input"):
return True
# Check for Gemini-style function call (Part with function_call)
if hasattr(first_item, "function_call") and first_item.function_call:
return True
return False
@property
def use_stop_words(self) -> bool:
"""Check to determine if stop words are being used.
@@ -297,11 +229,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def initialize_reasoning(self) -> Literal["initialized"]:
"""Initialize the reasoning flow and emit agent start logs."""
self._show_start_logs()
# Check for native tool support on first iteration
if self.state.iterations == 0:
self.state.use_native_tools = self._check_native_tool_support()
if self.state.use_native_tools:
self._setup_native_tools()
return "initialized"
@listen("force_final_answer")
@@ -376,69 +303,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
handle_unknown_error(self._printer, e)
raise
@listen("continue_reasoning_native")
def call_llm_native_tools(
self,
) -> Literal["native_tool_calls", "native_finished", "context_error"]:
"""Execute LLM call with native function calling.
Returns routing decision based on whether tool calls or final answer.
"""
try:
enforce_rpm_limit(self.request_within_rpm_limit)
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution.
answer = get_llm_response(
llm=self.llm,
messages=list(self.state.messages),
callbacks=self.callbacks,
printer=self._printer,
tools=self._openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
# Check if the response is a list of tool calls
if isinstance(answer, list) and answer and self._is_tool_call_list(answer):
# Store tool calls for sequential processing
self.state.pending_tool_calls = list(answer)
return "native_tool_calls"
# Text response - this is the final answer
if isinstance(answer, str):
self.state.current_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(answer)
return "native_finished"
# Unexpected response type, treat as final answer
self.state.current_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(str(answer))
return "native_finished"
except Exception as e:
if is_context_length_exceeded(e):
self._last_context_error = e
return "context_error"
if e.__class__.__module__.startswith("litellm"):
raise e
handle_unknown_error(self._printer, e)
raise
@router(call_llm_and_parse)
def route_by_answer_type(self) -> Literal["execute_tool", "agent_finished"]:
"""Route based on whether answer is AgentAction or AgentFinish."""
@@ -494,14 +358,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.is_finished = True
return "tool_result_is_final"
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.state.messages.append(reasoning_message)
return "tool_completed"
except Exception as e:
@@ -511,143 +367,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self._console.print(error_text)
raise
@listen("native_tool_calls")
def execute_native_tool(self) -> Literal["native_tool_completed"]:
"""Execute a single native tool call and inject reasoning prompt.
Processes only the FIRST tool call from pending_tool_calls for
sequential execution with reflection after each tool.
"""
if not self.state.pending_tool_calls:
return "native_tool_completed"
tool_call = self.state.pending_tool_calls[0]
self.state.pending_tool_calls = [] # Clear pending calls
# Extract tool call info - handle OpenAI, Anthropic, and Gemini formats
if hasattr(tool_call, "function"):
# OpenAI format: has .function.name and .function.arguments
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.function.name
func_args = tool_call.function.arguments
elif hasattr(tool_call, "function_call") and tool_call.function_call:
# Gemini format: has .function_call.name and .function_call.args
call_id = f"call_{id(tool_call)}"
func_name = tool_call.function_call.name
func_args = (
dict(tool_call.function_call.args)
if tool_call.function_call.args
else {}
)
elif hasattr(tool_call, "name") and hasattr(tool_call, "input"):
# Anthropic format: has .name and .input (ToolUseBlock)
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.name
func_args = tool_call.input # Already a dict in Anthropic
elif isinstance(tool_call, dict):
call_id = tool_call.get("id", f"call_{id(tool_call)}")
func_info = tool_call.get("function", {})
func_name = func_info.get("name", "") or tool_call.get("name", "")
func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
else:
return "native_tool_completed"
# Append assistant message with single tool call
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": func_args
if isinstance(func_args, str)
else json.dumps(func_args),
},
}
],
}
self.state.messages.append(assistant_message)
# Parse arguments for the single tool call
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
),
)
# Execute the tool
result = "Tool not found"
if func_name in self._available_functions:
try:
tool_func = self._available_functions[func_name]
result = tool_func(**args_dict)
if not isinstance(result, str):
result = str(result)
except Exception as e:
result = f"Error executing tool: {e}"
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"content": result,
}
self.state.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
self._printer.print(
content=f"Tool {func_name} executed with result: {result[:200]}...",
color="green",
)
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.state.messages.append(reasoning_message)
return "native_tool_completed"
@router(execute_native_tool)
def increment_native_and_continue(self) -> Literal["initialized"]:
"""Increment iteration counter after native tool execution."""
self.state.iterations += 1
return "initialized"
@listen("initialized")
def continue_iteration(self) -> Literal["check_iteration"]:
"""Bridge listener that connects iteration loop back to iteration check."""
@@ -656,14 +375,10 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@router(or_(initialize_reasoning, continue_iteration))
def check_max_iterations(
self,
) -> Literal[
"force_final_answer", "continue_reasoning", "continue_reasoning_native"
]:
) -> Literal["force_final_answer", "continue_reasoning"]:
"""Check if max iterations reached before proceeding with reasoning."""
if has_reached_max_iterations(self.state.iterations, self.max_iter):
return "force_final_answer"
if self.state.use_native_tools:
return "continue_reasoning_native"
return "continue_reasoning"
@router(execute_tool_action)
@@ -672,7 +387,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.iterations += 1
return "initialized"
@listen(or_("agent_finished", "tool_result_is_final", "native_finished"))
@listen(or_("agent_finished", "tool_result_is_final"))
def finalize(self) -> Literal["completed", "skipped"]:
"""Finalize execution and emit completion logs."""
if self.state.current_answer is None:
@@ -760,8 +475,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False
self.state.use_native_tools = False
self.state.pending_tool_calls = []
if "system" in self.prompt:
prompt = cast("SystemPromptResult", self.prompt)

View File

@@ -1,5 +1,4 @@
import inspect
from typing import Any
from pydantic import BaseModel, Field, InstanceOf, model_validator
from typing_extensions import Self
@@ -15,14 +14,14 @@ class FlowTrackable(BaseModel):
inspecting the call stack.
"""
parent_flow: InstanceOf[Flow[Any]] | None = Field(
parent_flow: InstanceOf[Flow] | None = Field(
default=None,
description="The parent flow of the instance, if it was created inside a flow.",
)
@model_validator(mode="after")
def _set_parent_flow(self) -> Self:
max_depth = 8
max_depth = 5
frame = inspect.currentframe()
try:

View File

@@ -931,6 +931,7 @@ class LLM(BaseLLM):
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
if not tool_calls or not available_functions:
if response_model and self.is_litellm:
instructor_instance = InternalInstructor(
content=full_response,
@@ -1143,12 +1144,8 @@ class LLM(BaseLLM):
if response_model:
params["response_model"] = response_model
response = litellm.completion(**params)
if (
hasattr(response, "usage")
and not isinstance(response.usage, type)
and response.usage
):
if hasattr(response,"usage") and not isinstance(response.usage, type) and response.usage:
usage_info = response.usage
self._track_token_usage_internal(usage_info)
@@ -1202,19 +1199,16 @@ class LLM(BaseLLM):
)
return text_response
# --- 6) If there are tool calls but no available functions, return the tool calls
# This allows the caller (e.g., executor) to handle tool execution
if tool_calls and not available_functions:
# --- 6) If there is no text response, no available functions, but there are tool calls, return the tool calls
if tool_calls and not available_functions and not text_response:
return tool_calls
# --- 7) Handle tool calls if present (execute when available_functions provided)
if tool_calls and available_functions:
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
# --- 7) Handle tool calls if present
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
# --- 8) If tool call handling didn't return a result, emit completion event and return text response
self._handle_emit_call_events(
response=text_response,
@@ -1279,11 +1273,7 @@ class LLM(BaseLLM):
params["response_model"] = response_model
response = await litellm.acompletion(**params)
if (
hasattr(response, "usage")
and not isinstance(response.usage, type)
and response.usage
):
if hasattr(response,"usage") and not isinstance(response.usage, type) and response.usage:
usage_info = response.usage
self._track_token_usage_internal(usage_info)
@@ -1331,18 +1321,14 @@ class LLM(BaseLLM):
)
return text_response
# If there are tool calls but no available functions, return the tool calls
# This allows the caller (e.g., executor) to handle tool execution
if tool_calls and not available_functions:
if tool_calls and not available_functions and not text_response:
return tool_calls
# Handle tool calls if present (execute when available_functions provided)
if tool_calls and available_functions:
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
self._handle_emit_call_events(
response=text_response,
@@ -1377,7 +1363,7 @@ class LLM(BaseLLM):
"""
full_response = ""
chunk_count = 0
usage_info = None
accumulated_tool_args: defaultdict[int, AccumulatedToolArgs] = defaultdict(

View File

@@ -445,7 +445,7 @@ class BaseLLM(ABC):
from_agent=from_agent,
)
return result
return str(result)
except Exception as e:
error_msg = f"Error executing function '{function_name}': {e!s}"

View File

@@ -418,7 +418,6 @@ class AnthropicCompletion(BaseLLM):
- System messages are separate from conversation messages
- Messages must alternate between user and assistant
- First message must be from user
- Tool results must be in user messages with tool_result content blocks
- When thinking is enabled, assistant messages must start with thinking blocks
Args:
@@ -432,7 +431,6 @@ class AnthropicCompletion(BaseLLM):
formatted_messages: list[LLMMessage] = []
system_message: str | None = None
pending_tool_results: list[dict[str, Any]] = []
for message in base_formatted:
role = message.get("role")
@@ -443,47 +441,16 @@ class AnthropicCompletion(BaseLLM):
system_message += f"\n\n{content}"
else:
system_message = cast(str, content)
elif role == "tool":
# Convert OpenAI-style tool message to Anthropic tool_result format
# These will be collected and added as a user message
tool_call_id = message.get("tool_call_id", "")
tool_result = {
"type": "tool_result",
"tool_use_id": tool_call_id,
"content": content if content else "",
}
pending_tool_results.append(tool_result)
elif role == "assistant":
# First, flush any pending tool results as a user message
if pending_tool_results:
formatted_messages.append(
{"role": "user", "content": pending_tool_results}
)
pending_tool_results = []
else:
role_str = role if role is not None else "user"
# Handle assistant message with tool_calls (convert to Anthropic format)
tool_calls = message.get("tool_calls", [])
if tool_calls:
assistant_content: list[dict[str, Any]] = []
for tc in tool_calls:
if isinstance(tc, dict):
func = tc.get("function", {})
tool_use = {
"type": "tool_use",
"id": tc.get("id", ""),
"name": func.get("name", ""),
"input": json.loads(func.get("arguments", "{}"))
if isinstance(func.get("arguments"), str)
else func.get("arguments", {}),
}
assistant_content.append(tool_use)
if assistant_content:
formatted_messages.append(
{"role": "assistant", "content": assistant_content}
)
elif isinstance(content, list):
formatted_messages.append({"role": "assistant", "content": content})
elif self.thinking and self.previous_thinking_blocks:
if isinstance(content, list):
formatted_messages.append({"role": role_str, "content": content})
elif (
role_str == "assistant"
and self.thinking
and self.previous_thinking_blocks
):
structured_content = cast(
list[dict[str, Any]],
[
@@ -492,34 +459,14 @@ class AnthropicCompletion(BaseLLM):
],
)
formatted_messages.append(
LLMMessage(role="assistant", content=structured_content)
LLMMessage(role=role_str, content=structured_content)
)
else:
content_str = content if content is not None else ""
formatted_messages.append(
LLMMessage(role="assistant", content=content_str)
)
else:
# User message - first flush any pending tool results
if pending_tool_results:
formatted_messages.append(
{"role": "user", "content": pending_tool_results}
)
pending_tool_results = []
role_str = role if role is not None else "user"
if isinstance(content, list):
formatted_messages.append({"role": role_str, "content": content})
else:
content_str = content if content is not None else ""
formatted_messages.append(
LLMMessage(role=role_str, content=content_str)
)
# Flush any remaining pending tool results
if pending_tool_results:
formatted_messages.append({"role": "user", "content": pending_tool_results})
# Ensure first message is from user (Anthropic requirement)
if not formatted_messages:
# If no messages, add a default user message
@@ -579,19 +526,13 @@ class AnthropicCompletion(BaseLLM):
return structured_json
# Check if Claude wants to use tools
if response.content:
if response.content and available_functions:
tool_uses = [
block for block in response.content if isinstance(block, ToolUseBlock)
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
# This allows the executor to manage tool execution with proper
# message history and post-tool reasoning prompts
if not available_functions:
return list(tool_uses)
# Handle tool use conversation flow internally
# Handle tool use conversation flow
return self._handle_tool_use_conversation(
response,
tool_uses,
@@ -755,7 +696,7 @@ class AnthropicCompletion(BaseLLM):
return structured_json
if final_message.content:
if final_message.content and available_functions:
tool_uses = [
block
for block in final_message.content
@@ -763,11 +704,7 @@ class AnthropicCompletion(BaseLLM):
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
# Handle tool use conversation flow internally
# Handle tool use conversation flow
return self._handle_tool_use_conversation(
final_message,
tool_uses,
@@ -996,16 +933,12 @@ class AnthropicCompletion(BaseLLM):
return structured_json
if response.content:
if response.content and available_functions:
tool_uses = [
block for block in response.content if isinstance(block, ToolUseBlock)
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
return await self._ahandle_tool_use_conversation(
response,
tool_uses,
@@ -1146,7 +1079,7 @@ class AnthropicCompletion(BaseLLM):
return structured_json
if final_message.content:
if final_message.content and available_functions:
tool_uses = [
block
for block in final_message.content
@@ -1154,10 +1087,6 @@ class AnthropicCompletion(BaseLLM):
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
return await self._ahandle_tool_use_conversation(
final_message,
tool_uses,

View File

@@ -443,7 +443,7 @@ class AzureCompletion(BaseLLM):
params["presence_penalty"] = self.presence_penalty
if self.max_tokens is not None:
params["max_tokens"] = self.max_tokens
if self.stop and self.supports_stop_words():
if self.stop:
params["stop"] = self.stop
# Handle tools/functions for Azure OpenAI models
@@ -514,31 +514,10 @@ class AzureCompletion(BaseLLM):
for message in base_formatted:
role = message.get("role", "user") # Default to user if no role
# Handle None content - Azure requires string content
content = message.get("content") or ""
content = message.get("content", "")
# Handle tool role messages - keep as tool role for Azure OpenAI
if role == "tool":
tool_call_id = message.get("tool_call_id", "unknown")
azure_messages.append(
{
"role": "tool",
"tool_call_id": tool_call_id,
"content": content,
}
)
# Handle assistant messages with tool_calls
elif role == "assistant" and message.get("tool_calls"):
tool_calls = message.get("tool_calls", [])
azure_msg: LLMMessage = {
"role": "assistant",
"content": content, # Already defaulted to "" above
"tool_calls": tool_calls,
}
azure_messages.append(azure_msg)
else:
# Azure AI Inference requires both 'role' and 'content'
azure_messages.append({"role": role, "content": content})
# Azure AI Inference requires both 'role' and 'content'
azure_messages.append({"role": role, "content": content})
return azure_messages
@@ -625,11 +604,6 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
)
# If there are tool_calls but no available_functions, return the tool_calls
# This allows the caller (e.g., executor) to handle tool execution
if message.tool_calls and not available_functions:
return list(message.tool_calls)
# Handle tool calls
if message.tool_calls and available_functions:
tool_call = message.tool_calls[0] # Handle first tool call
@@ -801,21 +775,6 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
)
# If there are tool_calls but no available_functions, return them
# in OpenAI-compatible format for executor to handle
if tool_calls and not available_functions:
return [
{
"id": call_data.get("id", f"call_{idx}"),
"type": "function",
"function": {
"name": call_data["name"],
"arguments": call_data["arguments"],
},
}
for idx, call_data in tool_calls.items()
]
# Handle completed tool calls
if tool_calls and available_functions:
for call_data in tool_calls.values():
@@ -972,28 +931,8 @@ class AzureCompletion(BaseLLM):
return self.is_openai_model
def supports_stop_words(self) -> bool:
"""Check if the model supports stop words.
Models using the Responses API (GPT-5 family, o-series reasoning models,
computer-use-preview) do not support stop sequences.
See: https://learn.microsoft.com/en-us/azure/ai-foundry/foundry-models/concepts/models-sold-directly-by-azure
"""
model_lower = self.model.lower() if self.model else ""
if "gpt-5" in model_lower:
return False
o_series_models = ["o1", "o3", "o4", "o1-mini", "o3-mini", "o4-mini"]
responses_api_models = ["computer-use-preview"]
unsupported_stop_models = o_series_models + responses_api_models
for unsupported in unsupported_stop_models:
if unsupported in model_lower:
return False
return True
"""Check if the model supports stop words."""
return True # Most Azure models support stop sequences
def get_context_window_size(self) -> int:
"""Get the context window size for the model."""

View File

@@ -606,17 +606,6 @@ class GeminiCompletion(BaseLLM):
if response.candidates and (self.tools or available_functions):
candidate = response.candidates[0]
if candidate.content and candidate.content.parts:
# Collect function call parts
function_call_parts = [
part for part in candidate.content.parts if part.function_call
]
# If there are function calls but no available_functions,
# return them for the executor to handle (like OpenAI/Anthropic)
if function_call_parts and not available_functions:
return function_call_parts
# Otherwise execute the tools internally
for part in candidate.content.parts:
if part.function_call:
function_name = part.function_call.name
@@ -731,7 +720,7 @@ class GeminiCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str | list[dict[str, Any]]:
) -> str:
"""Finalize streaming response with usage tracking, function execution, and events.
Args:
@@ -749,21 +738,6 @@ class GeminiCompletion(BaseLLM):
"""
self._track_token_usage_internal(usage_data)
# If there are function calls but no available_functions,
# return them for the executor to handle
if function_calls and not available_functions:
return [
{
"id": call_data["id"],
"function": {
"name": call_data["name"],
"arguments": json.dumps(call_data["args"]),
},
"type": "function",
}
for call_data in function_calls.values()
]
# Handle completed function calls
if function_calls and available_functions:
for call_data in function_calls.values():

View File

@@ -428,12 +428,6 @@ class OpenAICompletion(BaseLLM):
choice: Choice = response.choices[0]
message = choice.message
# If there are tool_calls but no available_functions, return the tool_calls
# This allows the caller (e.g., executor) to handle tool execution
if message.tool_calls and not available_functions:
return list(message.tool_calls)
# If there are tool_calls and available_functions, execute the tools
if message.tool_calls and available_functions:
tool_call = message.tool_calls[0]
function_name = tool_call.function.name
@@ -731,15 +725,6 @@ class OpenAICompletion(BaseLLM):
choice: Choice = response.choices[0]
message = choice.message
# If there are tool_calls but no available_functions, return the tool_calls
# This allows the caller (e.g., executor) to handle tool execution
if message.tool_calls and not available_functions:
print("--------------------------------")
print("lorenze tool_calls", list(message.tool_calls))
print("--------------------------------")
return list(message.tool_calls)
# If there are tool_calls and available_functions, execute the tools
if message.tool_calls and available_functions:
tool_call = message.tool_calls[0]
function_name = tool_call.function.name

View File

@@ -1,9 +1,12 @@
from crewai.tools.base_tool import BaseTool, EnvVar, tool
from crewai.tools.tool_search_tool import SearchStrategy, ToolSearchTool
__all__ = [
"BaseTool",
"EnvVar",
"SearchStrategy",
"ToolSearchTool",
"tool",
]

View File

@@ -0,0 +1,333 @@
"""Tool Search Tool for on-demand tool discovery.
This module implements a Tool Search Tool that allows agents to dynamically
discover and load tools on-demand, reducing token consumption when working
with large tool libraries.
Inspired by Anthropic's Tool Search Tool approach for on-demand tool loading.
"""
from __future__ import annotations
from collections.abc import Callable, Sequence
from enum import Enum
import json
import re
from typing import Any
from pydantic import BaseModel, Field
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.pydantic_schema_utils import generate_model_description
class SearchStrategy(str, Enum):
"""Search strategy for tool discovery."""
KEYWORD = "keyword"
REGEX = "regex"
class ToolSearchResult(BaseModel):
"""Result from a tool search operation."""
name: str = Field(description="The name of the tool")
description: str = Field(description="The description of the tool")
args_schema: dict[str, Any] = Field(
description="The JSON schema for the tool's arguments"
)
class ToolSearchToolSchema(BaseModel):
"""Schema for the Tool Search Tool arguments."""
query: str = Field(
description="The search query to find relevant tools. Use keywords that describe the capability you need."
)
max_results: int = Field(
default=5,
description="Maximum number of tools to return. Default is 5.",
ge=1,
le=20,
)
class ToolSearchTool(BaseTool):
"""A tool that searches through a catalog of tools to find relevant ones.
This tool enables on-demand tool discovery, allowing agents to work with
large tool libraries without loading all tool definitions upfront. Instead
of consuming tokens with all tool definitions, the agent can search for
relevant tools when needed.
Example:
```python
from crewai.tools import BaseTool, ToolSearchTool
# Create your tools
search_tool = MySearchTool()
scrape_tool = MyScrapeWebsiteTool()
database_tool = MyDatabaseTool()
# Create a tool search tool with your tool catalog
tool_search = ToolSearchTool(
tool_catalog=[search_tool, scrape_tool, database_tool],
search_strategy=SearchStrategy.KEYWORD,
)
# Use with an agent - only the tool_search is loaded initially
agent = Agent(
role="Researcher",
tools=[tool_search], # Other tools discovered on-demand
)
```
Attributes:
tool_catalog: List of tools available for search.
search_strategy: Strategy to use for searching (keyword or regex).
custom_search_fn: Optional custom search function for advanced matching.
"""
name: str = Field(
default="Tool Search",
description="The name of the tool search tool.",
)
description: str = Field(
default="Search for available tools by describing the capability you need. Returns tool definitions that match your query.",
description="Description of what the tool search tool does.",
)
args_schema: type[BaseModel] = Field(
default=ToolSearchToolSchema,
description="The schema for the tool search arguments.",
)
tool_catalog: list[BaseTool | CrewStructuredTool] = Field(
default_factory=list,
description="List of tools available for search.",
)
search_strategy: SearchStrategy = Field(
default=SearchStrategy.KEYWORD,
description="Strategy to use for searching tools.",
)
custom_search_fn: Callable[
[str, Sequence[BaseTool | CrewStructuredTool]], list[BaseTool | CrewStructuredTool]
] | None = Field(
default=None,
description="Optional custom search function for advanced matching.",
)
def _run(self, query: str, max_results: int = 5) -> str:
"""Search for tools matching the query.
Args:
query: The search query to find relevant tools.
max_results: Maximum number of tools to return.
Returns:
JSON string containing the matching tool definitions.
"""
if not self.tool_catalog:
return json.dumps(
{
"status": "error",
"message": "No tools available in the catalog.",
"tools": [],
}
)
if self.custom_search_fn:
matching_tools = self.custom_search_fn(query, self.tool_catalog)
elif self.search_strategy == SearchStrategy.REGEX:
matching_tools = self._regex_search(query)
else:
matching_tools = self._keyword_search(query)
matching_tools = matching_tools[:max_results]
if not matching_tools:
return json.dumps(
{
"status": "no_results",
"message": f"No tools found matching query: '{query}'. Try different keywords.",
"tools": [],
}
)
tool_results = []
for tool in matching_tools:
tool_info = self._get_tool_info(tool)
tool_results.append(tool_info)
return json.dumps(
{
"status": "success",
"message": f"Found {len(tool_results)} tool(s) matching your query.",
"tools": tool_results,
},
indent=2,
)
def _keyword_search(
self, query: str
) -> list[BaseTool | CrewStructuredTool]:
"""Search tools using keyword matching.
Args:
query: The search query.
Returns:
List of matching tools sorted by relevance.
"""
query_lower = query.lower()
query_words = set(query_lower.split())
scored_tools: list[tuple[float, BaseTool | CrewStructuredTool]] = []
for tool in self.tool_catalog:
score = self._calculate_keyword_score(tool, query_lower, query_words)
if score > 0:
scored_tools.append((score, tool))
scored_tools.sort(key=lambda x: x[0], reverse=True)
return [tool for _, tool in scored_tools]
def _calculate_keyword_score(
self,
tool: BaseTool | CrewStructuredTool,
query_lower: str,
query_words: set[str],
) -> float:
"""Calculate relevance score for a tool based on keyword matching.
Args:
tool: The tool to score.
query_lower: Lowercase query string.
query_words: Set of query words.
Returns:
Relevance score (higher is better).
"""
score = 0.0
tool_name_lower = tool.name.lower()
tool_desc_lower = tool.description.lower()
if query_lower in tool_name_lower:
score += 10.0
if query_lower in tool_desc_lower:
score += 5.0
for word in query_words:
if len(word) < 2:
continue
if word in tool_name_lower:
score += 3.0
if word in tool_desc_lower:
score += 1.0
return score
def _regex_search(
self, query: str
) -> list[BaseTool | CrewStructuredTool]:
"""Search tools using regex pattern matching.
Args:
query: The regex pattern to search for.
Returns:
List of matching tools.
"""
try:
pattern = re.compile(query, re.IGNORECASE)
except re.error:
pattern = re.compile(re.escape(query), re.IGNORECASE)
return [
tool
for tool in self.tool_catalog
if pattern.search(tool.name) or pattern.search(tool.description)
]
def _get_tool_info(self, tool: BaseTool | CrewStructuredTool) -> dict[str, Any]:
"""Get tool information as a dictionary.
Args:
tool: The tool to get information from.
Returns:
Dictionary containing tool name, description, and args schema.
"""
if isinstance(tool, BaseTool):
schema_dict = generate_model_description(tool.args_schema)
args_schema = schema_dict.get("json_schema", {}).get("schema", {})
else:
args_schema = tool.args_schema.model_json_schema()
return {
"name": tool.name,
"description": self._get_original_description(tool),
"args_schema": args_schema,
}
def _get_original_description(self, tool: BaseTool | CrewStructuredTool) -> str:
"""Get the original description of a tool without the generated schema.
Args:
tool: The tool to get the description from.
Returns:
The original tool description.
"""
description = tool.description
if "Tool Description:" in description:
parts = description.split("Tool Description:")
if len(parts) > 1:
return parts[1].strip()
return description
def add_tool(self, tool: BaseTool | CrewStructuredTool) -> None:
"""Add a tool to the catalog.
Args:
tool: The tool to add.
"""
self.tool_catalog.append(tool)
def add_tools(self, tools: Sequence[BaseTool | CrewStructuredTool]) -> None:
"""Add multiple tools to the catalog.
Args:
tools: The tools to add.
"""
self.tool_catalog.extend(tools)
def remove_tool(self, tool_name: str) -> bool:
"""Remove a tool from the catalog by name.
Args:
tool_name: The name of the tool to remove.
Returns:
True if the tool was removed, False if not found.
"""
for i, tool in enumerate(self.tool_catalog):
if tool.name == tool_name:
self.tool_catalog.pop(i)
return True
return False
def get_catalog_size(self) -> int:
"""Get the number of tools in the catalog.
Returns:
The number of tools in the catalog.
"""
return len(self.tool_catalog)
def list_tool_names(self) -> list[str]:
"""List all tool names in the catalog.
Returns:
List of tool names.
"""
return [tool.name for tool in self.tool_catalog]

View File

@@ -11,9 +11,6 @@
"role_playing": "You are {role}. {backstory}\nYour personal goal is: {goal}",
"tools": "\nYou ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\n{tools}\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought: you should always think about what to do\nAction: the action to take, only one name of [{tool_names}], just the name, exactly as it's written.\nAction Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce all necessary information is gathered, return the following format:\n\n```\nThought: I now know the final answer\nFinal Answer: the final answer to the original input question\n```",
"no_tools": "\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!",
"native_tools": "\nUse available tools to gather information and complete your task.",
"native_task": "\nCurrent Task: {input}\n\nThis is VERY important to you, your job depends on it!",
"post_tool_reasoning": "PAUSE and THINK before responding.\n\nInternally consider (DO NOT output these steps):\n- What key insights did the tool provide?\n- Have I fulfilled ALL requirements from my original instructions (e.g., minimum tool calls, specific sources)?\n- Do I have enough information to fully answer the task?\n\nIF you have NOT met all requirements or need more information: Call another tool now.\n\nIF you have met all requirements and have sufficient information: Provide ONLY your final answer in the format specified by the task's expected output. Do NOT include reasoning steps, analysis sections, or meta-commentary. Just deliver the answer.",
"format": "I MUST either use a tool (use one at time) OR give my best final answer not both at the same time. When responding, I must use the following format:\n\n```\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action, dictionary enclosed in curly braces\nObservation: the result of the action\n```\nThis Thought/Action/Action Input/Result can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```",
"final_answer_format": "If you don't need to use any more tools, you must give your best complete final answer, make sure it satisfies the expected criteria, use the EXACT format below:\n\n```\nThought: I now can give a great answer\nFinal Answer: my best complete final answer to the task.\n\n```",
"format_without_tools": "\nSorry, I didn't use the right format. I MUST either use a tool (among the available ones), OR give my best final answer.\nHere is the expected format I must follow:\n\n```\nQuestion: the input question you must answer\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n```\n This Thought/Action/Action Input/Result process can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```",

View File

@@ -108,65 +108,6 @@ def render_text_description_and_args(
return "\n".join(tool_strings)
def convert_tools_to_openai_schema(
tools: Sequence[BaseTool | CrewStructuredTool],
) -> tuple[list[dict[str, Any]], dict[str, Callable[..., Any]]]:
"""Convert CrewAI tools to OpenAI function calling format.
This function converts CrewAI BaseTool and CrewStructuredTool objects
into the OpenAI-compatible tool schema format that can be passed to
LLM providers for native function calling.
Args:
tools: List of CrewAI tool objects to convert.
Returns:
Tuple containing:
- List of OpenAI-format tool schema dictionaries
- Dict mapping tool names to their callable run() methods
Example:
>>> tools = [CalculatorTool(), SearchTool()]
>>> schemas, functions = convert_tools_to_openai_schema(tools)
>>> # schemas can be passed to llm.call(tools=schemas)
>>> # functions can be passed to llm.call(available_functions=functions)
"""
openai_tools: list[dict[str, Any]] = []
available_functions: dict[str, Callable[..., Any]] = {}
for tool in tools:
# Get the JSON schema for tool parameters
parameters: dict[str, Any] = {}
if hasattr(tool, "args_schema") and tool.args_schema is not None:
try:
parameters = tool.args_schema.model_json_schema()
# Remove title and description from schema root as they're redundant
parameters.pop("title", None)
parameters.pop("description", None)
except Exception:
parameters = {}
# Extract original description from formatted description
# BaseTool formats description as "Tool Name: ...\nTool Arguments: ...\nTool Description: {original}"
description = tool.description
if "Tool Description:" in description:
# Extract the original description after "Tool Description:"
description = description.split("Tool Description:")[-1].strip()
schema: dict[str, Any] = {
"type": "function",
"function": {
"name": tool.name,
"description": description,
"parameters": parameters,
},
}
openai_tools.append(schema)
available_functions[tool.name] = tool.run
return openai_tools, available_functions
def has_reached_max_iterations(iterations: int, max_iterations: int) -> bool:
"""Check if the maximum number of iterations has been reached.
@@ -293,13 +234,11 @@ def get_llm_response(
messages: list[LLMMessage],
callbacks: list[TokenCalcHandler],
printer: Printer,
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Callable[..., Any]] | None = None,
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | LiteAgent | None = None,
) -> str | Any:
) -> str:
"""Call the LLM and return the response, handling any invalid responses.
Args:
@@ -307,16 +246,13 @@ def get_llm_response(
messages: The messages to send to the LLM.
callbacks: List of callbacks for the LLM call.
printer: Printer instance for output.
tools: Optional list of tool schemas for native function calling.
available_functions: Optional dict mapping function names to callables.
from_task: Optional task context for the LLM call.
from_agent: Optional agent context for the LLM call.
response_model: Optional Pydantic model for structured outputs.
executor_context: Optional executor context for hook invocation.
Returns:
The response from the LLM as a string, or tool call results if
native function calling is used.
The response from the LLM as a string.
Raises:
Exception: If an error occurs.
@@ -331,9 +267,7 @@ def get_llm_response(
try:
answer = llm.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent, # type: ignore[arg-type]
response_model=response_model,
@@ -355,13 +289,11 @@ async def aget_llm_response(
messages: list[LLMMessage],
callbacks: list[TokenCalcHandler],
printer: Printer,
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Callable[..., Any]] | None = None,
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | None = None,
) -> str | Any:
) -> str:
"""Call the LLM asynchronously and return the response.
Args:
@@ -369,16 +301,13 @@ async def aget_llm_response(
messages: The messages to send to the LLM.
callbacks: List of callbacks for the LLM call.
printer: Printer instance for output.
tools: Optional list of tool schemas for native function calling.
available_functions: Optional dict mapping function names to callables.
from_task: Optional task context for the LLM call.
from_agent: Optional agent context for the LLM call.
response_model: Optional Pydantic model for structured outputs.
executor_context: Optional executor context for hook invocation.
Returns:
The response from the LLM as a string, or tool call results if
native function calling is used.
The response from the LLM as a string.
Raises:
Exception: If an error occurs.
@@ -392,9 +321,7 @@ async def aget_llm_response(
try:
answer = await llm.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent, # type: ignore[arg-type]
response_model=response_model,

View File

@@ -22,9 +22,7 @@ class SystemPromptResult(StandardPromptResult):
user: Annotated[str, "The user prompt component"]
COMPONENTS = Literal[
"role_playing", "tools", "no_tools", "native_tools", "task", "native_task"
]
COMPONENTS = Literal["role_playing", "tools", "no_tools", "task"]
class Prompts(BaseModel):
@@ -38,10 +36,6 @@ class Prompts(BaseModel):
has_tools: bool = Field(
default=False, description="Indicates if the agent has access to tools"
)
use_native_tool_calling: bool = Field(
default=False,
description="Whether to use native function calling instead of ReAct format",
)
system_template: str | None = Field(
default=None, description="Custom system prompt template"
)
@@ -64,24 +58,12 @@ class Prompts(BaseModel):
A dictionary containing the constructed prompt(s).
"""
slices: list[COMPONENTS] = ["role_playing"]
# When using native tool calling with tools, use native_tools instructions
# When using ReAct pattern with tools, use tools instructions
# When no tools are available, use no_tools instructions
if self.has_tools:
if self.use_native_tool_calling:
slices.append("native_tools")
else:
slices.append("tools")
slices.append("tools")
else:
slices.append("no_tools")
system: str = self._build_prompt(slices)
# Use native_task for native tool calling (no "Thought:" prompt)
# Use task for ReAct pattern (includes "Thought:" prompt)
task_slice: COMPONENTS = (
"native_task" if self.use_native_tool_calling else "task"
)
slices.append(task_slice)
slices.append("task")
if (
not self.system_template
@@ -90,7 +72,7 @@ class Prompts(BaseModel):
):
return SystemPromptResult(
system=system,
user=self._build_prompt([task_slice]),
user=self._build_prompt(["task"]),
prompt=self._build_prompt(slices),
)
return StandardPromptResult(

View File

@@ -1,479 +0,0 @@
"""Integration tests for native tool calling functionality.
These tests verify that agents can use native function calling
when the LLM supports it, across multiple providers.
"""
from __future__ import annotations
import os
from typing import Any
from unittest.mock import patch, MagicMock
import pytest
from pydantic import BaseModel, Field
from crewai import Agent, Crew, Task
from crewai.llm import LLM
from crewai.tools.base_tool import BaseTool
# Check for optional provider availability
try:
import anthropic
HAS_ANTHROPIC = True
except ImportError:
HAS_ANTHROPIC = False
try:
import google.genai
HAS_GOOGLE_GENAI = True
except ImportError:
HAS_GOOGLE_GENAI = False
try:
import boto3
HAS_BOTO3 = True
except ImportError:
HAS_BOTO3 = False
class CalculatorInput(BaseModel):
"""Input schema for calculator tool."""
expression: str = Field(description="Mathematical expression to evaluate")
class CalculatorTool(BaseTool):
"""A calculator tool that performs mathematical calculations."""
name: str = "calculator"
description: str = "Perform mathematical calculations. Use this for any math operations."
args_schema: type[BaseModel] = CalculatorInput
def _run(self, expression: str) -> str:
"""Execute the calculation."""
try:
# Safe evaluation for basic math
result = eval(expression) # noqa: S307
return f"The result of {expression} is {result}"
except Exception as e:
return f"Error calculating {expression}: {e}"
class WeatherInput(BaseModel):
"""Input schema for weather tool."""
location: str = Field(description="City name to get weather for")
class WeatherTool(BaseTool):
"""A mock weather tool for testing."""
name: str = "get_weather"
description: str = "Get the current weather for a location"
args_schema: type[BaseModel] = WeatherInput
def _run(self, location: str) -> str:
"""Get weather (mock implementation)."""
return f"The weather in {location} is sunny with a temperature of 72°F"
@pytest.fixture
def calculator_tool() -> CalculatorTool:
"""Create a calculator tool for testing."""
return CalculatorTool()
@pytest.fixture
def weather_tool() -> WeatherTool:
"""Create a weather tool for testing."""
return WeatherTool()
# =============================================================================
# OpenAI Provider Tests
# =============================================================================
class TestOpenAINativeToolCalling:
"""Tests for native tool calling with OpenAI models."""
@pytest.mark.vcr()
def test_openai_agent_with_native_tool_calling(
self, calculator_tool: CalculatorTool
) -> None:
"""Test OpenAI agent can use native tool calling."""
agent = Agent(
role="Math Assistant",
goal="Help users with mathematical calculations",
backstory="You are a helpful math assistant.",
tools=[calculator_tool],
llm=LLM(model="gpt-4o-mini"),
verbose=False,
max_iter=3,
)
task = Task(
description="Calculate what is 15 * 8",
expected_output="The result of the calculation",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.raw is not None
assert "120" in str(result.raw)
def test_openai_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test OpenAI agent kickoff with mocked LLM call."""
llm = LLM(model="gpt-4o-mini")
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Anthropic Provider Tests
# =============================================================================
@pytest.mark.skipif(not HAS_ANTHROPIC, reason="anthropic package not installed")
class TestAnthropicNativeToolCalling:
"""Tests for native tool calling with Anthropic models."""
@pytest.fixture(autouse=True)
def mock_anthropic_api_key(self):
"""Mock ANTHROPIC_API_KEY for tests."""
if "ANTHROPIC_API_KEY" not in os.environ:
with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "test-key"}):
yield
else:
yield
@pytest.mark.vcr()
def test_anthropic_agent_with_native_tool_calling(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Anthropic agent can use native tool calling."""
agent = Agent(
role="Math Assistant",
goal="Help users with mathematical calculations",
backstory="You are a helpful math assistant.",
tools=[calculator_tool],
llm=LLM(model="anthropic/claude-3-5-haiku-20241022"),
verbose=False,
max_iter=3,
)
task = Task(
description="Calculate what is 15 * 8",
expected_output="The result of the calculation",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.raw is not None
def test_anthropic_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Anthropic agent kickoff with mocked LLM call."""
llm = LLM(model="anthropic/claude-3-5-haiku-20241022")
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Google/Gemini Provider Tests
# =============================================================================
@pytest.mark.skipif(not HAS_GOOGLE_GENAI, reason="google-genai package not installed")
class TestGeminiNativeToolCalling:
"""Tests for native tool calling with Gemini models."""
@pytest.fixture(autouse=True)
def mock_google_api_key(self):
"""Mock GOOGLE_API_KEY for tests."""
with patch.dict(os.environ, {"GOOGLE_API_KEY": "test-key"}):
yield
@pytest.mark.vcr()
def test_gemini_agent_with_native_tool_calling(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Gemini agent can use native tool calling."""
agent = Agent(
role="Math Assistant",
goal="Help users with mathematical calculations",
backstory="You are a helpful math assistant.",
tools=[calculator_tool],
llm=LLM(model="gemini/gemini-2.0-flash-001"),
verbose=False,
max_iter=3,
)
task = Task(
description="Calculate what is 15 * 8",
expected_output="The result of the calculation",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.raw is not None
def test_gemini_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Gemini agent kickoff with mocked LLM call."""
llm = LLM(model="gemini/gemini-2.0-flash-001")
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Azure Provider Tests
# =============================================================================
class TestAzureNativeToolCalling:
"""Tests for native tool calling with Azure OpenAI models."""
@pytest.fixture(autouse=True)
def mock_azure_env(self):
"""Mock Azure environment variables for tests."""
env_vars = {
"AZURE_API_KEY": "test-key",
"AZURE_API_BASE": "https://test.openai.azure.com",
"AZURE_API_VERSION": "2024-02-15-preview",
}
with patch.dict(os.environ, env_vars):
yield
def test_azure_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Azure agent kickoff with mocked LLM call."""
llm = LLM(
model="azure/gpt-4o-mini",
api_key="test-key",
base_url="https://test.openai.azure.com",
)
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Bedrock Provider Tests
# =============================================================================
@pytest.mark.skipif(not HAS_BOTO3, reason="boto3 package not installed")
class TestBedrockNativeToolCalling:
"""Tests for native tool calling with AWS Bedrock models."""
@pytest.fixture(autouse=True)
def mock_aws_env(self):
"""Mock AWS environment variables for tests."""
env_vars = {
"AWS_ACCESS_KEY_ID": "test-key",
"AWS_SECRET_ACCESS_KEY": "test-secret",
"AWS_REGION": "us-east-1",
}
with patch.dict(os.environ, env_vars):
yield
def test_bedrock_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Bedrock agent kickoff with mocked LLM call."""
llm = LLM(model="bedrock/anthropic.claude-3-haiku-20240307-v1:0")
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Cross-Provider Native Tool Calling Behavior Tests
# =============================================================================
class TestNativeToolCallingBehavior:
"""Tests for native tool calling behavior across providers."""
def test_supports_function_calling_check(self) -> None:
"""Test that supports_function_calling() is properly checked."""
# OpenAI should support function calling
openai_llm = LLM(model="gpt-4o-mini")
assert hasattr(openai_llm, "supports_function_calling")
assert openai_llm.supports_function_calling() is True
@pytest.mark.skipif(not HAS_ANTHROPIC, reason="anthropic package not installed")
def test_anthropic_supports_function_calling(self) -> None:
"""Test that Anthropic models support function calling."""
with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "test-key"}):
llm = LLM(model="anthropic/claude-3-5-haiku-20241022")
assert hasattr(llm, "supports_function_calling")
assert llm.supports_function_calling() is True
@pytest.mark.skipif(not HAS_GOOGLE_GENAI, reason="google-genai package not installed")
def test_gemini_supports_function_calling(self) -> None:
"""Test that Gemini models support function calling."""
# with patch.dict(os.environ, {"GOOGLE_API_KEY": "test-key"}):
print("GOOGLE_API_KEY", os.getenv("GOOGLE_API_KEY"))
llm = LLM(model="gemini/gemini-2.5-flash")
assert hasattr(llm, "supports_function_calling")
# Gemini uses supports_tools property
assert llm.supports_function_calling() is True
# =============================================================================
# Token Usage Tests
# =============================================================================
class TestNativeToolCallingTokenUsage:
"""Tests for token usage with native tool calling."""
@pytest.mark.vcr()
def test_openai_native_tool_calling_token_usage(
self, calculator_tool: CalculatorTool
) -> None:
"""Test token usage tracking with OpenAI native tool calling."""
agent = Agent(
role="Calculator",
goal="Perform calculations efficiently",
backstory="You calculate things.",
tools=[calculator_tool],
llm=LLM(model="gpt-4o-mini"),
verbose=False,
max_iter=3,
)
task = Task(
description="What is 100 / 4?",
expected_output="The result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.token_usage is not None
assert result.token_usage.total_tokens > 0
assert result.token_usage.successful_requests >= 1
print(f"\n[OPENAI NATIVE TOOL CALLING TOKEN USAGE]")
print(f" Prompt tokens: {result.token_usage.prompt_tokens}")
print(f" Completion tokens: {result.token_usage.completion_tokens}")
print(f" Total tokens: {result.token_usage.total_tokens}")

View File

@@ -515,94 +515,6 @@ def test_azure_supports_stop_words():
assert llm.supports_stop_words() == True
def test_azure_gpt5_models_do_not_support_stop_words():
"""
Test that GPT-5 family models do not support stop words.
GPT-5 models use the Responses API which doesn't support stop sequences.
See: https://learn.microsoft.com/en-us/azure/ai-foundry/foundry-models/concepts/models-sold-directly-by-azure
"""
# GPT-5 base models
gpt5_models = [
"azure/gpt-5",
"azure/gpt-5-mini",
"azure/gpt-5-nano",
"azure/gpt-5-chat",
# GPT-5.1 series
"azure/gpt-5.1",
"azure/gpt-5.1-chat",
"azure/gpt-5.1-codex",
"azure/gpt-5.1-codex-mini",
# GPT-5.2 series
"azure/gpt-5.2",
"azure/gpt-5.2-chat",
]
for model_name in gpt5_models:
llm = LLM(model=model_name)
assert llm.supports_stop_words() == False, f"Expected {model_name} to NOT support stop words"
def test_azure_o_series_models_do_not_support_stop_words():
"""
Test that o-series reasoning models do not support stop words.
"""
o_series_models = [
"azure/o1",
"azure/o1-mini",
"azure/o3",
"azure/o3-mini",
"azure/o4",
"azure/o4-mini",
]
for model_name in o_series_models:
llm = LLM(model=model_name)
assert llm.supports_stop_words() == False, f"Expected {model_name} to NOT support stop words"
def test_azure_responses_api_models_do_not_support_stop_words():
"""
Test that models using the Responses API do not support stop words.
"""
responses_api_models = [
"azure/computer-use-preview",
]
for model_name in responses_api_models:
llm = LLM(model=model_name)
assert llm.supports_stop_words() == False, f"Expected {model_name} to NOT support stop words"
def test_azure_stop_words_not_included_for_unsupported_models():
"""
Test that stop words are not included in completion params for models that don't support them.
"""
with patch.dict(os.environ, {
"AZURE_API_KEY": "test-key",
"AZURE_ENDPOINT": "https://models.inference.ai.azure.com"
}):
# Test GPT-5 model - stop should NOT be included even if set
llm_gpt5 = LLM(
model="azure/gpt-5-nano",
stop=["STOP", "END"]
)
params = llm_gpt5._prepare_completion_params(
messages=[{"role": "user", "content": "test"}]
)
assert "stop" not in params, "stop should not be included for GPT-5 models"
# Test regular model - stop SHOULD be included
llm_gpt4 = LLM(
model="azure/gpt-4",
stop=["STOP", "END"]
)
params = llm_gpt4._prepare_completion_params(
messages=[{"role": "user", "content": "test"}]
)
assert "stop" in params, "stop should be included for GPT-4 models"
assert params["stop"] == ["STOP", "END"]
def test_azure_context_window_size():
"""
Test that Azure models return correct context window sizes

View File

@@ -4500,71 +4500,6 @@ def test_crew_copy_with_memory():
pytest.fail(f"Copying crew raised an unexpected exception: {e}")
def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow():
@CrewBase
class TestCrew:
agents_config = None
tasks_config = None
agents: list[BaseAgent]
tasks: list[Task]
@agent
def researcher(self) -> Agent:
return Agent(
role="Researcher",
goal="Research things",
backstory="Expert researcher",
)
@agent
def writer_agent(self) -> Agent:
return Agent(
role="Writer",
goal="Write things",
backstory="Expert writer",
)
@task
def research_task(self) -> Task:
return Task(
description="Test task for researcher",
expected_output="output",
agent=self.researcher(),
)
@task
def write_task(self) -> Task:
return Task(
description="Test task for writer",
expected_output="output",
agent=self.writer_agent(),
)
@crew
def crew(self) -> Crew:
return Crew(
agents=self.agents,
tasks=self.tasks,
process=Process.sequential,
)
captured_crew = None
class MyFlow(Flow):
@start()
def start_method(self):
nonlocal captured_crew
captured_crew = TestCrew().crew()
return captured_crew
flow = MyFlow()
flow.kickoff()
assert captured_crew is not None
assert captured_crew.parent_flow is flow
def test_sets_parent_flow_when_outside_flow(researcher, writer):
crew = Crew(
agents=[researcher, writer],

View File

@@ -0,0 +1,393 @@
"""Tests for the ToolSearchTool functionality."""
import json
import pytest
from pydantic import BaseModel
from crewai.tools import BaseTool, SearchStrategy, ToolSearchTool
class MockSearchTool(BaseTool):
"""A mock search tool for testing."""
name: str = "Web Search"
description: str = "Search the web for information on any topic."
def _run(self, query: str) -> str:
return f"Search results for: {query}"
class MockDatabaseTool(BaseTool):
"""A mock database tool for testing."""
name: str = "Database Query"
description: str = "Query a SQL database to retrieve data."
def _run(self, query: str) -> str:
return f"Database results for: {query}"
class MockScrapeTool(BaseTool):
"""A mock web scraping tool for testing."""
name: str = "Web Scraper"
description: str = "Scrape content from websites and extract text."
def _run(self, url: str) -> str:
return f"Scraped content from: {url}"
class MockEmailTool(BaseTool):
"""A mock email tool for testing."""
name: str = "Send Email"
description: str = "Send an email to a specified recipient."
def _run(self, to: str, subject: str, body: str) -> str:
return f"Email sent to {to}"
class MockCalculatorTool(BaseTool):
"""A mock calculator tool for testing."""
name: str = "Calculator"
description: str = "Perform mathematical calculations and arithmetic operations."
def _run(self, expression: str) -> str:
return f"Result: {eval(expression)}"
@pytest.fixture
def sample_tools() -> list[BaseTool]:
"""Create a list of sample tools for testing."""
return [
MockSearchTool(),
MockDatabaseTool(),
MockScrapeTool(),
MockEmailTool(),
MockCalculatorTool(),
]
@pytest.fixture
def tool_search(sample_tools: list[BaseTool]) -> ToolSearchTool:
"""Create a ToolSearchTool with sample tools."""
return ToolSearchTool(tool_catalog=sample_tools)
class TestToolSearchToolCreation:
"""Tests for ToolSearchTool creation and initialization."""
def test_create_tool_search_with_empty_catalog(self) -> None:
"""Test creating a ToolSearchTool with an empty catalog."""
tool_search = ToolSearchTool()
assert tool_search.name == "Tool Search"
assert tool_search.tool_catalog == []
assert tool_search.search_strategy == SearchStrategy.KEYWORD
def test_create_tool_search_with_tools(self, sample_tools: list[BaseTool]) -> None:
"""Test creating a ToolSearchTool with a list of tools."""
tool_search = ToolSearchTool(tool_catalog=sample_tools)
assert len(tool_search.tool_catalog) == 5
assert tool_search.get_catalog_size() == 5
def test_create_tool_search_with_regex_strategy(
self, sample_tools: list[BaseTool]
) -> None:
"""Test creating a ToolSearchTool with regex search strategy."""
tool_search = ToolSearchTool(
tool_catalog=sample_tools, search_strategy=SearchStrategy.REGEX
)
assert tool_search.search_strategy == SearchStrategy.REGEX
def test_create_tool_search_with_custom_name(self) -> None:
"""Test creating a ToolSearchTool with a custom name."""
tool_search = ToolSearchTool(name="My Tool Finder")
assert tool_search.name == "My Tool Finder"
class TestToolSearchKeywordSearch:
"""Tests for keyword-based tool search."""
def test_search_by_exact_name(self, tool_search: ToolSearchTool) -> None:
"""Test searching for a tool by its exact name."""
result = tool_search._run("Web Search")
result_data = json.loads(result)
assert result_data["status"] == "success"
assert len(result_data["tools"]) >= 1
assert result_data["tools"][0]["name"] == "Web Search"
def test_search_by_partial_name(self, tool_search: ToolSearchTool) -> None:
"""Test searching for a tool by partial name."""
result = tool_search._run("Search")
result_data = json.loads(result)
assert result_data["status"] == "success"
assert len(result_data["tools"]) >= 1
tool_names = [t["name"] for t in result_data["tools"]]
assert "Web Search" in tool_names
def test_search_by_description_keyword(self, tool_search: ToolSearchTool) -> None:
"""Test searching for a tool by keyword in description."""
result = tool_search._run("database")
result_data = json.loads(result)
assert result_data["status"] == "success"
assert len(result_data["tools"]) >= 1
tool_names = [t["name"] for t in result_data["tools"]]
assert "Database Query" in tool_names
def test_search_with_multiple_keywords(self, tool_search: ToolSearchTool) -> None:
"""Test searching with multiple keywords."""
result = tool_search._run("web scrape content")
result_data = json.loads(result)
assert result_data["status"] == "success"
assert len(result_data["tools"]) >= 1
tool_names = [t["name"] for t in result_data["tools"]]
assert "Web Scraper" in tool_names
def test_search_no_results(self, tool_search: ToolSearchTool) -> None:
"""Test searching with a query that returns no results."""
result = tool_search._run("xyznonexistent123abc")
result_data = json.loads(result)
assert result_data["status"] == "no_results"
assert len(result_data["tools"]) == 0
def test_search_max_results_limit(self, tool_search: ToolSearchTool) -> None:
"""Test that max_results limits the number of returned tools."""
result = tool_search._run("tool", max_results=2)
result_data = json.loads(result)
assert result_data["status"] == "success"
assert len(result_data["tools"]) <= 2
def test_search_empty_catalog(self) -> None:
"""Test searching with an empty tool catalog."""
tool_search = ToolSearchTool()
result = tool_search._run("search")
result_data = json.loads(result)
assert result_data["status"] == "error"
assert "No tools available" in result_data["message"]
class TestToolSearchRegexSearch:
"""Tests for regex-based tool search."""
def test_regex_search_simple_pattern(
self, sample_tools: list[BaseTool]
) -> None:
"""Test regex search with a simple pattern."""
tool_search = ToolSearchTool(
tool_catalog=sample_tools, search_strategy=SearchStrategy.REGEX
)
result = tool_search._run("Web.*")
result_data = json.loads(result)
assert result_data["status"] == "success"
tool_names = [t["name"] for t in result_data["tools"]]
assert "Web Search" in tool_names or "Web Scraper" in tool_names
def test_regex_search_case_insensitive(
self, sample_tools: list[BaseTool]
) -> None:
"""Test that regex search is case insensitive."""
tool_search = ToolSearchTool(
tool_catalog=sample_tools, search_strategy=SearchStrategy.REGEX
)
result = tool_search._run("email")
result_data = json.loads(result)
assert result_data["status"] == "success"
tool_names = [t["name"] for t in result_data["tools"]]
assert "Send Email" in tool_names
def test_regex_search_invalid_pattern_fallback(
self, sample_tools: list[BaseTool]
) -> None:
"""Test that invalid regex patterns are escaped and still work."""
tool_search = ToolSearchTool(
tool_catalog=sample_tools, search_strategy=SearchStrategy.REGEX
)
result = tool_search._run("[invalid(regex")
result_data = json.loads(result)
assert result_data["status"] in ["success", "no_results"]
class TestToolSearchCustomSearch:
"""Tests for custom search function."""
def test_custom_search_function(self, sample_tools: list[BaseTool]) -> None:
"""Test using a custom search function."""
def custom_search(
query: str, tools: list[BaseTool]
) -> list[BaseTool]:
return [t for t in tools if "email" in t.name.lower()]
tool_search = ToolSearchTool(
tool_catalog=sample_tools, custom_search_fn=custom_search
)
result = tool_search._run("anything")
result_data = json.loads(result)
assert result_data["status"] == "success"
assert len(result_data["tools"]) == 1
assert result_data["tools"][0]["name"] == "Send Email"
class TestToolSearchCatalogManagement:
"""Tests for tool catalog management."""
def test_add_tool(self, tool_search: ToolSearchTool) -> None:
"""Test adding a tool to the catalog."""
initial_size = tool_search.get_catalog_size()
class NewTool(BaseTool):
name: str = "New Tool"
description: str = "A new tool for testing."
def _run(self) -> str:
return "New tool result"
tool_search.add_tool(NewTool())
assert tool_search.get_catalog_size() == initial_size + 1
def test_add_tools(self, tool_search: ToolSearchTool) -> None:
"""Test adding multiple tools to the catalog."""
initial_size = tool_search.get_catalog_size()
class NewTool1(BaseTool):
name: str = "New Tool 1"
description: str = "First new tool."
def _run(self) -> str:
return "Result 1"
class NewTool2(BaseTool):
name: str = "New Tool 2"
description: str = "Second new tool."
def _run(self) -> str:
return "Result 2"
tool_search.add_tools([NewTool1(), NewTool2()])
assert tool_search.get_catalog_size() == initial_size + 2
def test_remove_tool(self, tool_search: ToolSearchTool) -> None:
"""Test removing a tool from the catalog."""
initial_size = tool_search.get_catalog_size()
result = tool_search.remove_tool("Web Search")
assert result is True
assert tool_search.get_catalog_size() == initial_size - 1
def test_remove_nonexistent_tool(self, tool_search: ToolSearchTool) -> None:
"""Test removing a tool that doesn't exist."""
initial_size = tool_search.get_catalog_size()
result = tool_search.remove_tool("Nonexistent Tool")
assert result is False
assert tool_search.get_catalog_size() == initial_size
def test_list_tool_names(self, tool_search: ToolSearchTool) -> None:
"""Test listing all tool names in the catalog."""
names = tool_search.list_tool_names()
assert len(names) == 5
assert "Web Search" in names
assert "Database Query" in names
assert "Web Scraper" in names
assert "Send Email" in names
assert "Calculator" in names
class TestToolSearchResultFormat:
"""Tests for the format of search results."""
def test_result_contains_tool_info(self, tool_search: ToolSearchTool) -> None:
"""Test that search results contain complete tool information."""
result = tool_search._run("Calculator")
result_data = json.loads(result)
assert result_data["status"] == "success"
tool_info = result_data["tools"][0]
assert "name" in tool_info
assert "description" in tool_info
assert "args_schema" in tool_info
assert tool_info["name"] == "Calculator"
def test_result_args_schema_format(self, tool_search: ToolSearchTool) -> None:
"""Test that args_schema is properly formatted."""
result = tool_search._run("Email")
result_data = json.loads(result)
assert result_data["status"] == "success"
tool_info = result_data["tools"][0]
assert "args_schema" in tool_info
args_schema = tool_info["args_schema"]
assert isinstance(args_schema, dict)
class TestToolSearchIntegration:
"""Integration tests for ToolSearchTool."""
def test_tool_search_as_base_tool(self, sample_tools: list[BaseTool]) -> None:
"""Test that ToolSearchTool works as a BaseTool."""
tool_search = ToolSearchTool(tool_catalog=sample_tools)
assert isinstance(tool_search, BaseTool)
assert tool_search.name == "Tool Search"
assert "search" in tool_search.description.lower()
def test_tool_search_to_structured_tool(
self, sample_tools: list[BaseTool]
) -> None:
"""Test converting ToolSearchTool to structured tool."""
tool_search = ToolSearchTool(tool_catalog=sample_tools)
structured = tool_search.to_structured_tool()
assert structured.name == "Tool Search"
assert structured.args_schema is not None
def test_tool_search_run_method(self, tool_search: ToolSearchTool) -> None:
"""Test the run method of ToolSearchTool."""
result = tool_search.run(query="search", max_results=3)
assert isinstance(result, str)
result_data = json.loads(result)
assert "status" in result_data
assert "tools" in result_data
class TestToolSearchScoring:
"""Tests for the keyword scoring algorithm."""
def test_exact_name_match_scores_highest(
self, sample_tools: list[BaseTool]
) -> None:
"""Test that exact name matches score higher than partial matches."""
tool_search = ToolSearchTool(tool_catalog=sample_tools)
result = tool_search._run("Web Search")
result_data = json.loads(result)
assert result_data["status"] == "success"
assert result_data["tools"][0]["name"] == "Web Search"
def test_name_match_scores_higher_than_description(
self, sample_tools: list[BaseTool]
) -> None:
"""Test that name matches score higher than description matches."""
tool_search = ToolSearchTool(tool_catalog=sample_tools)
result = tool_search._run("Calculator")
result_data = json.loads(result)
assert result_data["status"] == "success"
assert result_data["tools"][0]["name"] == "Calculator"

View File

@@ -1,214 +0,0 @@
"""Tests for agent utility functions."""
from __future__ import annotations
from typing import Any
import pytest
from pydantic import BaseModel, Field
from crewai.tools.base_tool import BaseTool
from crewai.utilities.agent_utils import convert_tools_to_openai_schema
class CalculatorInput(BaseModel):
"""Input schema for calculator tool."""
expression: str = Field(description="Mathematical expression to evaluate")
class CalculatorTool(BaseTool):
"""A simple calculator tool for testing."""
name: str = "calculator"
description: str = "Perform mathematical calculations"
args_schema: type[BaseModel] = CalculatorInput
def _run(self, expression: str) -> str:
"""Execute the calculation."""
try:
result = eval(expression) # noqa: S307
return str(result)
except Exception as e:
return f"Error: {e}"
class SearchInput(BaseModel):
"""Input schema for search tool."""
query: str = Field(description="Search query")
max_results: int = Field(default=10, description="Maximum number of results")
class SearchTool(BaseTool):
"""A search tool for testing."""
name: str = "web_search"
description: str = "Search the web for information"
args_schema: type[BaseModel] = SearchInput
def _run(self, query: str, max_results: int = 10) -> str:
"""Execute the search."""
return f"Search results for '{query}' (max {max_results})"
class NoSchemaTool(BaseTool):
"""A tool without an args schema for testing edge cases."""
name: str = "simple_tool"
description: str = "A simple tool with no schema"
def _run(self, **kwargs: Any) -> str:
"""Execute the tool."""
return "Simple tool executed"
class TestConvertToolsToOpenaiSchema:
"""Tests for convert_tools_to_openai_schema function."""
def test_converts_single_tool(self) -> None:
"""Test converting a single tool to OpenAI schema."""
tools = [CalculatorTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
assert len(schemas) == 1
assert len(functions) == 1
schema = schemas[0]
assert schema["type"] == "function"
assert schema["function"]["name"] == "calculator"
assert schema["function"]["description"] == "Perform mathematical calculations"
assert "properties" in schema["function"]["parameters"]
assert "expression" in schema["function"]["parameters"]["properties"]
def test_converts_multiple_tools(self) -> None:
"""Test converting multiple tools to OpenAI schema."""
tools = [CalculatorTool(), SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
assert len(schemas) == 2
assert len(functions) == 2
# Check calculator
calc_schema = next(s for s in schemas if s["function"]["name"] == "calculator")
assert calc_schema["function"]["description"] == "Perform mathematical calculations"
# Check search
search_schema = next(s for s in schemas if s["function"]["name"] == "web_search")
assert search_schema["function"]["description"] == "Search the web for information"
assert "query" in search_schema["function"]["parameters"]["properties"]
assert "max_results" in search_schema["function"]["parameters"]["properties"]
def test_functions_dict_contains_callables(self) -> None:
"""Test that the functions dict maps names to callable run methods."""
tools = [CalculatorTool(), SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
assert "calculator" in functions
assert "web_search" in functions
assert callable(functions["calculator"])
assert callable(functions["web_search"])
def test_function_can_be_called(self) -> None:
"""Test that the returned function can be called."""
tools = [CalculatorTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
result = functions["calculator"](expression="2 + 2")
assert result == "4"
def test_empty_tools_list(self) -> None:
"""Test with an empty tools list."""
schemas, functions = convert_tools_to_openai_schema([])
assert schemas == []
assert functions == {}
def test_schema_has_required_fields(self) -> None:
"""Test that the schema includes required fields information."""
tools = [SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
schema = schemas[0]
params = schema["function"]["parameters"]
# Should have required array
assert "required" in params
assert "query" in params["required"]
def test_tool_without_args_schema(self) -> None:
"""Test converting a tool that doesn't have an args_schema."""
# Create a minimal tool without args_schema
class MinimalTool(BaseTool):
name: str = "minimal"
description: str = "A minimal tool"
def _run(self) -> str:
return "done"
tools = [MinimalTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
assert len(schemas) == 1
schema = schemas[0]
assert schema["function"]["name"] == "minimal"
# Parameters should be empty dict or have minimal schema
assert isinstance(schema["function"]["parameters"], dict)
def test_schema_structure_matches_openai_format(self) -> None:
"""Test that the schema structure matches OpenAI's expected format."""
tools = [CalculatorTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
schema = schemas[0]
# Top level must have "type": "function"
assert schema["type"] == "function"
# Must have "function" key with nested structure
assert "function" in schema
func = schema["function"]
# Function must have name and description
assert "name" in func
assert "description" in func
assert isinstance(func["name"], str)
assert isinstance(func["description"], str)
# Parameters should be a valid JSON schema
assert "parameters" in func
params = func["parameters"]
assert isinstance(params, dict)
def test_removes_redundant_schema_fields(self) -> None:
"""Test that redundant title and description are removed from parameters."""
tools = [CalculatorTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
params = schemas[0]["function"]["parameters"]
# Title should be removed as it's redundant with function name
assert "title" not in params
def test_preserves_field_descriptions(self) -> None:
"""Test that field descriptions are preserved in the schema."""
tools = [SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
params = schemas[0]["function"]["parameters"]
query_prop = params["properties"]["query"]
# Field description should be preserved
assert "description" in query_prop
assert query_prop["description"] == "Search query"
def test_preserves_default_values(self) -> None:
"""Test that default values are preserved in the schema."""
tools = [SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
params = schemas[0]["function"]["parameters"]
max_results_prop = params["properties"]["max_results"]
# Default value should be preserved
assert "default" in max_results_prop
assert max_results_prop["default"] == 10