Compare commits


1 Commit

Author SHA1 Message Date
Devin AI
ca1f8fd7e0 feat: add MCP Discovery tool for dynamic MCP server discovery
This adds a new MCPDiscoveryTool that enables CrewAI agents to dynamically
discover MCP servers based on natural language queries using the MCP
Discovery API.

Features:
- Semantic search for MCP servers using natural language
- Support for constraints (max latency, required features, excluded servers)
- Returns server recommendations with installation instructions, capabilities,
  and performance metrics
- Includes comprehensive test coverage

Closes #4249

Co-Authored-By: João <joao@crewai.com>
2026-01-17 19:38:49 +00:00
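
For reference, a minimal usage sketch based on the example in the tool's docstring added by this commit (the `backstory` value below is illustrative):

```python
from crewai import Agent
from crewai_tools import MCPDiscoveryTool

# Instantiate the discovery tool; no pre-configured MCP servers are required.
discovery_tool = MCPDiscoveryTool()

agent = Agent(
    role="Researcher",
    goal="Research and analyze data",
    backstory="Illustrative agent that provisions its own MCP tooling.",
    tools=[discovery_tool],
)
# At runtime the agent can call the tool with a natural language query such as
# "database with authentication" and receive ranked server recommendations.
```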
35 changed files with 6559 additions and 7720 deletions

View File

@@ -375,13 +375,10 @@ In this section, you'll find detailed examples that help you select, configure,
GOOGLE_API_KEY=<your-api-key>
GEMINI_API_KEY=<your-api-key>
# For Vertex AI Express mode (API key authentication)
GOOGLE_GENAI_USE_VERTEXAI=true
GOOGLE_API_KEY=<your-api-key>
# For Vertex AI with service account
# Optional - for Vertex AI
GOOGLE_CLOUD_PROJECT=<your-project-id>
GOOGLE_CLOUD_LOCATION=<location> # Defaults to us-central1
GOOGLE_GENAI_USE_VERTEXAI=true # Set to use Vertex AI
```
**Basic Usage:**
@@ -415,35 +412,7 @@ In this section, you'll find detailed examples that help you select, configure,
)
```
**Vertex AI Express Mode (API Key Authentication):**
Vertex AI Express mode allows you to use Vertex AI with simple API key authentication instead of service account credentials. This is the quickest way to get started with Vertex AI.
To enable Express mode, set both environment variables in your `.env` file:
```toml .env
GOOGLE_GENAI_USE_VERTEXAI=true
GOOGLE_API_KEY=<your-api-key>
```
Then use the LLM as usual:
```python Code
from crewai import LLM
llm = LLM(
model="gemini/gemini-2.0-flash",
temperature=0.7
)
```
<Info>
To get an Express mode API key:
- New Google Cloud users: Get an [express mode API key](https://cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey)
- Existing Google Cloud users: Get a [Google Cloud API key bound to a service account](https://cloud.google.com/docs/authentication/api-keys)
For more details, see the [Vertex AI Express mode documentation](https://docs.cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey).
</Info>
**Vertex AI Configuration (Service Account):**
**Vertex AI Configuration:**
```python Code
from crewai import LLM
@@ -455,10 +424,10 @@ In this section, you'll find detailed examples that help you select, configure,
```
**Supported Environment Variables:**
- `GOOGLE_API_KEY` or `GEMINI_API_KEY`: Your Google API key (required for Gemini API and Vertex AI Express mode)
- `GOOGLE_GENAI_USE_VERTEXAI`: Set to `true` to use Vertex AI (required for Express mode)
- `GOOGLE_CLOUD_PROJECT`: Google Cloud project ID (for Vertex AI with service account)
- `GOOGLE_API_KEY` or `GEMINI_API_KEY`: Your Google API key (required for Gemini API)
- `GOOGLE_CLOUD_PROJECT`: Google Cloud project ID (for Vertex AI)
- `GOOGLE_CLOUD_LOCATION`: GCP location (defaults to `us-central1`)
- `GOOGLE_GENAI_USE_VERTEXAI`: Set to `true` to use Vertex AI
**Features:**
- Native function calling support for Gemini 1.5+ and 2.x models
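
As a brief illustration of the variables above, a minimal sketch that reuses the model name from the earlier examples and assumes the Vertex AI variables are already exported (e.g. via `.env`):

```python
from crewai import LLM

# Assumes GOOGLE_GENAI_USE_VERTEXAI=true, GOOGLE_CLOUD_PROJECT, and optionally
# GOOGLE_CLOUD_LOCATION are set in the environment before this code runs.
llm = LLM(
    model="gemini/gemini-2.0-flash",
    temperature=0.7,
)
```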

View File

@@ -107,7 +107,7 @@ There are several places in the CrewAI code where you can specify the model to use
## Provider Configuration Examples
CrewAI supports a wide range of LLM providers, each offering unique features, authentication methods, and model capabilities.
CrewAI supports a wide range of LLM providers, each offering unique features, authentication methods, and model capabilities.
In this section, you'll find detailed examples that help you select, configure, and optimize the LLM that best fits your project's needs.
<AccordionGroup>
@@ -153,8 +153,8 @@ CrewAI supports a wide range of LLM providers, each offering unique features, authentication methods, and model
</Accordion>
<Accordion title="Meta-Llama">
Meta's Llama API provides access to Meta's family of large language models.
The API is available at [Meta Llama API](https://llama.developer.meta.com?utm_source=partner-crewai&utm_medium=website).
Meta's Llama API provides access to Meta's family of large language models.
The API is available at [Meta Llama API](https://llama.developer.meta.com?utm_source=partner-crewai&utm_medium=website).
Set the following environment variables in your `.env` file:
```toml Code
@@ -207,20 +207,11 @@ CrewAI supports a wide range of LLM providers, each offering unique features, authentication methods, and model
Set your API key in your `.env` file. If you need a key, or need to find an existing one, check [AI Studio](https://aistudio.google.com/apikey).
```toml .env
# For Gemini API (one of the following)
GOOGLE_API_KEY=<your-api-key>
# https://ai.google.dev/gemini-api/docs/api-key
GEMINI_API_KEY=<your-api-key>
# For Vertex AI Express mode (API key authentication)
GOOGLE_GENAI_USE_VERTEXAI=true
GOOGLE_API_KEY=<your-api-key>
# For Vertex AI with service account
GOOGLE_CLOUD_PROJECT=<your-project-id>
GOOGLE_CLOUD_LOCATION=<location> # Defaults to us-central1
```
**Basic Usage:**
Example usage in your CrewAI project:
```python Code
from crewai import LLM
@@ -230,34 +221,6 @@ CrewAI supports a wide range of LLM providers, each offering unique features, authentication methods, and model
)
```
**Vertex AI Express Mode (API Key Authentication):**
Vertex AI Express mode allows you to use Vertex AI with simple API key authentication instead of service account credentials. It is the quickest way to get started with Vertex AI.
To enable Express mode, set both environment variables in your `.env` file:
```toml .env
GOOGLE_GENAI_USE_VERTEXAI=true
GOOGLE_API_KEY=<your-api-key>
```
Then use the LLM as usual:
```python Code
from crewai import LLM
llm = LLM(
model="gemini/gemini-2.0-flash",
temperature=0.7
)
```
<Info>
To get an Express mode API key:
- New Google Cloud users: Get an [express mode API key](https://cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey)
- Existing Google Cloud users: Get a [Google Cloud API key bound to a service account](https://cloud.google.com/docs/authentication/api-keys)
For more details, see the [Vertex AI Express mode documentation](https://docs.cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey).
</Info>
### Gemini Models
Google offers a range of powerful models optimized for different use cases.
@@ -513,7 +476,7 @@ CrewAI supports a wide range of LLM providers, each offering unique features, authentication methods, and model
<Accordion title="Local NVIDIA NIM Deployed using WSL2">
NVIDIA NIM lets you run powerful LLMs locally on a Windows machine through WSL2 (Windows Subsystem for Linux).
NVIDIA NIM lets you run powerful LLMs locally on a Windows machine through WSL2 (Windows Subsystem for Linux).
This approach leverages your Nvidia GPU for private, secure, and cost-effective AI inference without relying on cloud services.
It is ideal for development, testing, or production environments that need data privacy or offline capability.
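A minimal sketch of pointing CrewAI at such a local deployment, assuming the NIM container exposes an OpenAI-compatible endpoint on `http://localhost:8000/v1` and that the model name matches what you actually deployed (both are assumptions here):

```python
from crewai import LLM

local_llm = LLM(
    model="openai/meta/llama-3.1-8b-instruct",  # illustrative; must match your NIM
    base_url="http://localhost:8000/v1",        # assumed local NIM endpoint
    api_key="not-needed",                       # placeholder for a local server
)
```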
@@ -991,4 +954,4 @@ Learn how to get the most out of your LLM configuration:
llm = LLM(model="openai/gpt-4o") # 128K tokens
```
</Tab>
</Tabs>
</Tabs>

View File

@@ -79,7 +79,7 @@ There are different places in the CrewAI code where you can specify the model to use
# Advanced configuration with detailed parameters
llm = LLM(
model="openai/gpt-4",
model="openai/gpt-4",
temperature=0.8,
max_tokens=150,
top_p=0.9,
@@ -207,20 +207,11 @@ In this section, you'll find detailed examples that help you select, configure,
Set your API key in your `.env` file. If you need a key, or need to find an existing one, check [AI Studio](https://aistudio.google.com/apikey).
```toml .env
# For Gemini API (one of the following)
GOOGLE_API_KEY=<your-api-key>
# https://ai.google.dev/gemini-api/docs/api-key
GEMINI_API_KEY=<your-api-key>
# For Vertex AI Express mode (API key authentication)
GOOGLE_GENAI_USE_VERTEXAI=true
GOOGLE_API_KEY=<your-api-key>
# For Vertex AI with service account
GOOGLE_CLOUD_PROJECT=<your-project-id>
GOOGLE_CLOUD_LOCATION=<location> # Defaults to us-central1
```
**Basic Usage:**
Example usage in your CrewAI project:
```python Code
from crewai import LLM
@@ -230,34 +221,6 @@ In this section, you'll find detailed examples that help you select, configure,
)
```
**Vertex AI Express Mode (API Key Authentication):**
Vertex AI Express mode allows you to use Vertex AI with simple API key authentication instead of service account credentials. It is the quickest way to get started with Vertex AI.
To enable Express mode, set both environment variables in your `.env` file:
```toml .env
GOOGLE_GENAI_USE_VERTEXAI=true
GOOGLE_API_KEY=<your-api-key>
```
Then use the LLM as usual:
```python Code
from crewai import LLM
llm = LLM(
model="gemini/gemini-2.0-flash",
temperature=0.7
)
```
<Info>
To get an Express mode API key:
- New Google Cloud users: Get an [express mode API key](https://cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey)
- Existing Google Cloud users: Get a [Google Cloud API key bound to a service account](https://cloud.google.com/docs/authentication/api-keys)
For more details, see the [Vertex AI Express mode documentation](https://docs.cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey).
</Info>
### Gemini Models
Google offers a variety of powerful models optimized for different use cases.
@@ -860,7 +823,7 @@ Learn how to get the most out of your LLM configuration:
Remember to monitor your token usage regularly and adjust your settings to optimize cost and performance.
</Info>
</Accordion>
<Accordion title="Descartar Parâmetros Adicionais">
O CrewAI usa Litellm internamente para chamadas LLM, permitindo descartar parâmetros adicionais desnecessários para seu caso de uso. Isso pode simplificar seu código e reduzir a complexidade da configuração do LLM.
Por exemplo, se não precisar enviar o parâmetro <code>stop</code>, basta omiti-lo na chamada do LLM:
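A minimal sketch of that call, assuming the `LLM` class forwards `additional_drop_params` to LiteLLM (an assumption here, not part of this diff):

```python
from crewai import LLM

llm = LLM(
    model="openai/gpt-4o",
    additional_drop_params=["stop"],  # ask LiteLLM not to send the stop parameter
)
```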
@@ -919,4 +882,4 @@ Learn how to get the most out of your LLM configuration:
llm = LLM(model="openai/gpt-4o") # 128K tokens
```
</Tab>
</Tabs>
</Tabs>

View File

@@ -89,6 +89,9 @@ from crewai_tools.tools.jina_scrape_website_tool.jina_scrape_website_tool import
from crewai_tools.tools.json_search_tool.json_search_tool import JSONSearchTool
from crewai_tools.tools.linkup.linkup_search_tool import LinkupSearchTool
from crewai_tools.tools.llamaindex_tool.llamaindex_tool import LlamaIndexTool
from crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool import (
MCPDiscoveryTool,
)
from crewai_tools.tools.mdx_search_tool.mdx_search_tool import MDXSearchTool
from crewai_tools.tools.merge_agent_handler_tool.merge_agent_handler_tool import (
MergeAgentHandlerTool,
@@ -236,6 +239,7 @@ __all__ = [
"JinaScrapeWebsiteTool",
"LinkupSearchTool",
"LlamaIndexTool",
"MCPDiscoveryTool",
"MCPServerAdapter",
"MDXSearchTool",
"MergeAgentHandlerTool",

View File

@@ -0,0 +1,16 @@
from crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool import (
    MCPDiscoveryResult,
    MCPDiscoveryTool,
    MCPDiscoveryToolSchema,
    MCPServerMetrics,
    MCPServerRecommendation,
)

__all__ = [
    "MCPDiscoveryResult",
    "MCPDiscoveryTool",
    "MCPDiscoveryToolSchema",
    "MCPServerMetrics",
    "MCPServerRecommendation",
]

View File

@@ -0,0 +1,414 @@
"""MCP Discovery Tool for CrewAI agents.
This tool enables agents to dynamically discover MCP servers based on
natural language queries using the MCP Discovery API.
"""
import json
import logging
import os
from typing import Any, TypedDict
from crewai.tools import BaseTool, EnvVar
from pydantic import BaseModel, Field
import requests
logger = logging.getLogger(__name__)
class MCPServerMetrics(TypedDict, total=False):
"""Performance metrics for an MCP server."""
avg_latency_ms: float | None
uptime_pct: float | None
last_checked: str | None
class MCPServerRecommendation(TypedDict, total=False):
"""A recommended MCP server from the discovery API."""
server: str
npm_package: str
install_command: str
confidence: float
description: str
capabilities: list[str]
metrics: MCPServerMetrics
docs_url: str
github_url: str
class MCPDiscoveryResult(TypedDict):
"""Result from the MCP Discovery API."""
recommendations: list[MCPServerRecommendation]
total_found: int
query_time_ms: int
class MCPDiscoveryConstraints(BaseModel):
"""Constraints for MCP server discovery."""
max_latency_ms: int | None = Field(
default=None,
description="Maximum acceptable latency in milliseconds",
)
required_features: list[str] | None = Field(
default=None,
description="List of required features/capabilities",
)
exclude_servers: list[str] | None = Field(
default=None,
description="List of server names to exclude from results",
)
class MCPDiscoveryToolSchema(BaseModel):
"""Input schema for MCPDiscoveryTool."""
need: str = Field(
...,
description=(
"Natural language description of what you need. "
"For example: 'database with authentication', 'email automation', "
"'file storage', 'web scraping'"
),
)
constraints: MCPDiscoveryConstraints | None = Field(
default=None,
description="Optional constraints to filter results",
)
limit: int = Field(
default=5,
description="Maximum number of recommendations to return (1-10)",
ge=1,
le=10,
)
class MCPDiscoveryTool(BaseTool):
"""Tool for discovering MCP servers dynamically.
This tool uses the MCP Discovery API to find MCP servers that match
a natural language description of what the agent needs. It enables
agents to dynamically discover and select the best MCP servers for
their tasks without requiring pre-configuration.
Example:
```python
from crewai import Agent
from crewai_tools import MCPDiscoveryTool
discovery_tool = MCPDiscoveryTool()
agent = Agent(
role='Researcher',
tools=[discovery_tool],
goal='Research and analyze data'
)
# The agent can now discover MCP servers dynamically:
# discover_mcp_server(need="database with authentication")
# Returns: Supabase MCP server with installation instructions
```
Attributes:
name: The name of the tool.
description: A description of what the tool does.
args_schema: The Pydantic model for input validation.
base_url: The base URL for the MCP Discovery API.
timeout: Request timeout in seconds.
"""
name: str = "Discover MCP Server"
description: str = (
"Discover MCP (Model Context Protocol) servers that match your needs. "
"Use this tool to find the best MCP server for any task using natural "
"language. Returns server recommendations with installation instructions, "
"capabilities, and performance metrics."
)
args_schema: type[BaseModel] = MCPDiscoveryToolSchema
base_url: str = "https://mcp-discovery-production.up.railway.app"
timeout: int = 30
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
name="MCP_DISCOVERY_API_KEY",
description="API key for MCP Discovery (optional for free tier)",
required=False,
),
]
)
def _build_request_payload(
self,
need: str,
constraints: MCPDiscoveryConstraints | None,
limit: int,
) -> dict[str, Any]:
"""Build the request payload for the discovery API.
Args:
need: Natural language description of what is needed.
constraints: Optional constraints to filter results.
limit: Maximum number of recommendations.
Returns:
Dictionary containing the request payload.
"""
payload: dict[str, Any] = {
"need": need,
"limit": limit,
}
if constraints:
constraints_dict: dict[str, Any] = {}
if constraints.max_latency_ms is not None:
constraints_dict["max_latency_ms"] = constraints.max_latency_ms
if constraints.required_features:
constraints_dict["required_features"] = constraints.required_features
if constraints.exclude_servers:
constraints_dict["exclude_servers"] = constraints.exclude_servers
if constraints_dict:
payload["constraints"] = constraints_dict
return payload
def _make_api_request(self, payload: dict[str, Any]) -> dict[str, Any]:
"""Make a request to the MCP Discovery API.
Args:
payload: The request payload.
Returns:
The API response as a dictionary.
Raises:
ValueError: If the API returns an empty response.
requests.exceptions.RequestException: If the request fails.
"""
url = f"{self.base_url}/api/v1/discover"
headers = {
"Content-Type": "application/json",
}
api_key = os.environ.get("MCP_DISCOVERY_API_KEY")
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
response = None
try:
response = requests.post(
url,
headers=headers,
json=payload,
timeout=self.timeout,
)
response.raise_for_status()
results = response.json()
if not results:
logger.error("Empty response from MCP Discovery API")
raise ValueError("Empty response from MCP Discovery API")
return results
except requests.exceptions.RequestException as e:
error_msg = f"Error making request to MCP Discovery API: {e}"
if response is not None and hasattr(response, "content"):
error_msg += (
f"\nResponse content: "
f"{response.content.decode('utf-8', errors='replace')}"
)
logger.error(error_msg)
raise
except json.JSONDecodeError as e:
if response is not None and hasattr(response, "content"):
logger.error(f"Error decoding JSON response: {e}")
logger.error(
f"Response content: "
f"{response.content.decode('utf-8', errors='replace')}"
)
else:
logger.error(
f"Error decoding JSON response: {e} (No response content available)"
)
raise
def _process_single_recommendation(
self, rec: dict[str, Any]
) -> MCPServerRecommendation | None:
"""Process a single recommendation from the API.
Args:
rec: Raw recommendation dictionary from the API.
Returns:
Processed MCPServerRecommendation or None if malformed.
"""
try:
metrics_data = rec.get("metrics", {}) if isinstance(rec, dict) else {}
metrics: MCPServerMetrics = {
"avg_latency_ms": metrics_data.get("avg_latency_ms"),
"uptime_pct": metrics_data.get("uptime_pct"),
"last_checked": metrics_data.get("last_checked"),
}
recommendation: MCPServerRecommendation = {
"server": rec.get("server", ""),
"npm_package": rec.get("npm_package", ""),
"install_command": rec.get("install_command", ""),
"confidence": rec.get("confidence", 0.0),
"description": rec.get("description", ""),
"capabilities": rec.get("capabilities", []),
"metrics": metrics,
"docs_url": rec.get("docs_url", ""),
"github_url": rec.get("github_url", ""),
}
return recommendation
except (KeyError, TypeError, AttributeError) as e:
logger.warning(f"Skipping malformed recommendation: {rec}, error: {e}")
return None
def _process_recommendations(
self, recommendations: list[dict[str, Any]]
) -> list[MCPServerRecommendation]:
"""Process and validate server recommendations.
Args:
recommendations: Raw recommendations from the API.
Returns:
List of processed MCPServerRecommendation objects.
"""
processed: list[MCPServerRecommendation] = []
for rec in recommendations:
result = self._process_single_recommendation(rec)
if result is not None:
processed.append(result)
return processed
def _format_result(self, result: MCPDiscoveryResult) -> str:
"""Format the discovery result as a human-readable string.
Args:
result: The discovery result to format.
Returns:
A formatted string representation of the result.
"""
if not result["recommendations"]:
return "No MCP servers found matching your requirements."
lines = [
f"Found {result['total_found']} MCP server(s) "
f"(query took {result['query_time_ms']}ms):\n"
]
for i, rec in enumerate(result["recommendations"], 1):
confidence_pct = rec.get("confidence", 0) * 100
lines.append(f"{i}. {rec.get('server', 'Unknown')} ({confidence_pct:.0f}% confidence)")
lines.append(f" Description: {rec.get('description', 'N/A')}")
lines.append(f" Capabilities: {', '.join(rec.get('capabilities', []))}")
lines.append(f" Install: {rec.get('install_command', 'N/A')}")
lines.append(f" NPM Package: {rec.get('npm_package', 'N/A')}")
metrics = rec.get("metrics", {})
if metrics.get("avg_latency_ms") is not None:
lines.append(f" Avg Latency: {metrics['avg_latency_ms']}ms")
if metrics.get("uptime_pct") is not None:
lines.append(f" Uptime: {metrics['uptime_pct']}%")
if rec.get("docs_url"):
lines.append(f" Docs: {rec['docs_url']}")
if rec.get("github_url"):
lines.append(f" GitHub: {rec['github_url']}")
lines.append("")
return "\n".join(lines)
def _run(self, **kwargs: Any) -> str:
"""Execute the MCP discovery operation.
Args:
**kwargs: Keyword arguments matching MCPDiscoveryToolSchema.
Returns:
A formatted string with discovered MCP servers.
Raises:
ValueError: If required parameters are missing.
"""
need: str | None = kwargs.get("need")
if not need:
raise ValueError("'need' parameter is required")
constraints_data = kwargs.get("constraints")
constraints: MCPDiscoveryConstraints | None = None
if constraints_data:
if isinstance(constraints_data, dict):
constraints = MCPDiscoveryConstraints(**constraints_data)
elif isinstance(constraints_data, MCPDiscoveryConstraints):
constraints = constraints_data
limit: int = kwargs.get("limit", 5)
payload = self._build_request_payload(need, constraints, limit)
response = self._make_api_request(payload)
recommendations = self._process_recommendations(
response.get("recommendations", [])
)
result: MCPDiscoveryResult = {
"recommendations": recommendations,
"total_found": response.get("total_found", len(recommendations)),
"query_time_ms": response.get("query_time_ms", 0),
}
return self._format_result(result)
def discover(
self,
need: str,
constraints: MCPDiscoveryConstraints | None = None,
limit: int = 5,
) -> MCPDiscoveryResult:
"""Discover MCP servers matching the given requirements.
This is a convenience method that returns structured data instead
of a formatted string.
Args:
need: Natural language description of what is needed.
constraints: Optional constraints to filter results.
limit: Maximum number of recommendations (1-10).
Returns:
MCPDiscoveryResult containing server recommendations.
Example:
```python
tool = MCPDiscoveryTool()
result = tool.discover(
need="database with authentication",
constraints=MCPDiscoveryConstraints(
max_latency_ms=200,
required_features=["auth", "realtime"]
),
limit=3
)
for rec in result["recommendations"]:
print(f"{rec['server']}: {rec['description']}")
```
"""
payload = self._build_request_payload(need, constraints, limit)
response = self._make_api_request(payload)
recommendations = self._process_recommendations(
response.get("recommendations", [])
)
return {
"recommendations": recommendations,
"total_found": response.get("total_found", len(recommendations)),
"query_time_ms": response.get("query_time_ms", 0),
}

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,452 @@
"""Tests for the MCP Discovery Tool."""
import json
from unittest.mock import MagicMock, patch
import pytest
import requests
from crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool import (
MCPDiscoveryConstraints,
MCPDiscoveryResult,
MCPDiscoveryTool,
MCPDiscoveryToolSchema,
MCPServerMetrics,
MCPServerRecommendation,
)
@pytest.fixture
def mock_api_response() -> dict:
"""Create a mock API response for testing."""
return {
"recommendations": [
{
"server": "sqlite-server",
"npm_package": "@modelcontextprotocol/server-sqlite",
"install_command": "npx -y @modelcontextprotocol/server-sqlite",
"confidence": 0.38,
"description": "SQLite database server for MCP.",
"capabilities": ["sqlite", "sql", "database", "embedded"],
"metrics": {
"avg_latency_ms": 50.0,
"uptime_pct": 99.9,
"last_checked": "2026-01-17T10:30:00Z",
},
"docs_url": "https://modelcontextprotocol.io/docs/servers/sqlite",
"github_url": "https://github.com/modelcontextprotocol/servers",
},
{
"server": "postgres-server",
"npm_package": "@modelcontextprotocol/server-postgres",
"install_command": "npx -y @modelcontextprotocol/server-postgres",
"confidence": 0.33,
"description": "PostgreSQL database server for MCP.",
"capabilities": ["postgres", "sql", "database", "queries"],
"metrics": {
"avg_latency_ms": None,
"uptime_pct": None,
"last_checked": None,
},
"docs_url": "https://modelcontextprotocol.io/docs/servers/postgres",
"github_url": "https://github.com/modelcontextprotocol/servers",
},
],
"total_found": 2,
"query_time_ms": 245,
}
@pytest.fixture
def discovery_tool() -> MCPDiscoveryTool:
"""Create an MCPDiscoveryTool instance for testing."""
return MCPDiscoveryTool()
class TestMCPDiscoveryToolSchema:
"""Tests for MCPDiscoveryToolSchema."""
def test_schema_with_required_fields(self) -> None:
"""Test schema with only required fields."""
schema = MCPDiscoveryToolSchema(need="database server")
assert schema.need == "database server"
assert schema.constraints is None
assert schema.limit == 5
def test_schema_with_all_fields(self) -> None:
"""Test schema with all fields."""
constraints = MCPDiscoveryConstraints(
max_latency_ms=200,
required_features=["auth", "realtime"],
exclude_servers=["deprecated-server"],
)
schema = MCPDiscoveryToolSchema(
need="database with authentication",
constraints=constraints,
limit=3,
)
assert schema.need == "database with authentication"
assert schema.constraints is not None
assert schema.constraints.max_latency_ms == 200
assert schema.constraints.required_features == ["auth", "realtime"]
assert schema.constraints.exclude_servers == ["deprecated-server"]
assert schema.limit == 3
def test_schema_limit_validation(self) -> None:
"""Test that limit is validated to be between 1 and 10."""
with pytest.raises(ValueError):
MCPDiscoveryToolSchema(need="test", limit=0)
with pytest.raises(ValueError):
MCPDiscoveryToolSchema(need="test", limit=11)
class TestMCPDiscoveryConstraints:
"""Tests for MCPDiscoveryConstraints."""
def test_empty_constraints(self) -> None:
"""Test creating empty constraints."""
constraints = MCPDiscoveryConstraints()
assert constraints.max_latency_ms is None
assert constraints.required_features is None
assert constraints.exclude_servers is None
def test_full_constraints(self) -> None:
"""Test creating constraints with all fields."""
constraints = MCPDiscoveryConstraints(
max_latency_ms=100,
required_features=["feature1", "feature2"],
exclude_servers=["server1", "server2"],
)
assert constraints.max_latency_ms == 100
assert constraints.required_features == ["feature1", "feature2"]
assert constraints.exclude_servers == ["server1", "server2"]
class TestMCPDiscoveryTool:
"""Tests for MCPDiscoveryTool."""
def test_tool_initialization(self, discovery_tool: MCPDiscoveryTool) -> None:
"""Test tool initialization with default values."""
assert discovery_tool.name == "Discover MCP Server"
assert "MCP" in discovery_tool.description
assert discovery_tool.base_url == "https://mcp-discovery-production.up.railway.app"
assert discovery_tool.timeout == 30
def test_build_request_payload_basic(
self, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test building request payload with basic parameters."""
payload = discovery_tool._build_request_payload(
need="database server",
constraints=None,
limit=5,
)
assert payload == {"need": "database server", "limit": 5}
def test_build_request_payload_with_constraints(
self, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test building request payload with constraints."""
constraints = MCPDiscoveryConstraints(
max_latency_ms=200,
required_features=["auth"],
exclude_servers=["old-server"],
)
payload = discovery_tool._build_request_payload(
need="database",
constraints=constraints,
limit=3,
)
assert payload["need"] == "database"
assert payload["limit"] == 3
assert "constraints" in payload
assert payload["constraints"]["max_latency_ms"] == 200
assert payload["constraints"]["required_features"] == ["auth"]
assert payload["constraints"]["exclude_servers"] == ["old-server"]
def test_build_request_payload_partial_constraints(
self, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test building request payload with partial constraints."""
constraints = MCPDiscoveryConstraints(max_latency_ms=100)
payload = discovery_tool._build_request_payload(
need="test",
constraints=constraints,
limit=5,
)
assert payload["constraints"] == {"max_latency_ms": 100}
def test_process_recommendations(
self, discovery_tool: MCPDiscoveryTool, mock_api_response: dict
) -> None:
"""Test processing recommendations from API response."""
recommendations = discovery_tool._process_recommendations(
mock_api_response["recommendations"]
)
assert len(recommendations) == 2
first_rec = recommendations[0]
assert first_rec["server"] == "sqlite-server"
assert first_rec["npm_package"] == "@modelcontextprotocol/server-sqlite"
assert first_rec["confidence"] == 0.38
assert first_rec["capabilities"] == ["sqlite", "sql", "database", "embedded"]
assert first_rec["metrics"]["avg_latency_ms"] == 50.0
assert first_rec["metrics"]["uptime_pct"] == 99.9
second_rec = recommendations[1]
assert second_rec["server"] == "postgres-server"
assert second_rec["metrics"]["avg_latency_ms"] is None
def test_process_recommendations_with_malformed_data(
self, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test processing recommendations with malformed data."""
malformed_recommendations = [
{"server": "valid-server", "confidence": 0.5},
None,
{"invalid": "data"},
]
recommendations = discovery_tool._process_recommendations(
malformed_recommendations
)
assert len(recommendations) >= 1
assert recommendations[0]["server"] == "valid-server"
def test_format_result_with_recommendations(
self, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test formatting results with recommendations."""
result: MCPDiscoveryResult = {
"recommendations": [
{
"server": "test-server",
"npm_package": "@test/server",
"install_command": "npx -y @test/server",
"confidence": 0.85,
"description": "A test server",
"capabilities": ["test", "demo"],
"metrics": {
"avg_latency_ms": 100.0,
"uptime_pct": 99.5,
"last_checked": "2026-01-17T10:00:00Z",
},
"docs_url": "https://example.com/docs",
"github_url": "https://github.com/test/server",
}
],
"total_found": 1,
"query_time_ms": 150,
}
formatted = discovery_tool._format_result(result)
assert "Found 1 MCP server(s)" in formatted
assert "test-server" in formatted
assert "85% confidence" in formatted
assert "A test server" in formatted
assert "test, demo" in formatted
assert "npx -y @test/server" in formatted
assert "100.0ms" in formatted
assert "99.5%" in formatted
def test_format_result_empty(self, discovery_tool: MCPDiscoveryTool) -> None:
"""Test formatting results with no recommendations."""
result: MCPDiscoveryResult = {
"recommendations": [],
"total_found": 0,
"query_time_ms": 50,
}
formatted = discovery_tool._format_result(result)
assert "No MCP servers found" in formatted
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_make_api_request_success(
self,
mock_post: MagicMock,
discovery_tool: MCPDiscoveryTool,
mock_api_response: dict,
) -> None:
"""Test successful API request."""
mock_response = MagicMock()
mock_response.json.return_value = mock_api_response
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
result = discovery_tool._make_api_request({"need": "database", "limit": 5})
assert result == mock_api_response
mock_post.assert_called_once()
call_args = mock_post.call_args
assert call_args[1]["json"] == {"need": "database", "limit": 5}
assert call_args[1]["timeout"] == 30
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_make_api_request_with_api_key(
self,
mock_post: MagicMock,
discovery_tool: MCPDiscoveryTool,
mock_api_response: dict,
) -> None:
"""Test API request with API key."""
mock_response = MagicMock()
mock_response.json.return_value = mock_api_response
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
with patch.dict("os.environ", {"MCP_DISCOVERY_API_KEY": "test-key"}):
discovery_tool._make_api_request({"need": "test", "limit": 5})
call_args = mock_post.call_args
assert "Authorization" in call_args[1]["headers"]
assert call_args[1]["headers"]["Authorization"] == "Bearer test-key"
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_make_api_request_empty_response(
self, mock_post: MagicMock, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test API request with empty response."""
mock_response = MagicMock()
mock_response.json.return_value = {}
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
with pytest.raises(ValueError, match="Empty response"):
discovery_tool._make_api_request({"need": "test", "limit": 5})
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_make_api_request_network_error(
self, mock_post: MagicMock, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test API request with network error."""
mock_post.side_effect = requests.exceptions.ConnectionError("Network error")
with pytest.raises(requests.exceptions.ConnectionError):
discovery_tool._make_api_request({"need": "test", "limit": 5})
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_make_api_request_json_decode_error(
self, mock_post: MagicMock, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test API request with JSON decode error."""
mock_response = MagicMock()
mock_response.json.side_effect = json.JSONDecodeError("Error", "", 0)
mock_response.raise_for_status.return_value = None
mock_response.content = b"invalid json"
mock_post.return_value = mock_response
with pytest.raises(json.JSONDecodeError):
discovery_tool._make_api_request({"need": "test", "limit": 5})
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_run_success(
self,
mock_post: MagicMock,
discovery_tool: MCPDiscoveryTool,
mock_api_response: dict,
) -> None:
"""Test successful _run execution."""
mock_response = MagicMock()
mock_response.json.return_value = mock_api_response
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
result = discovery_tool._run(need="database server")
assert "sqlite-server" in result
assert "postgres-server" in result
assert "Found 2 MCP server(s)" in result
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_run_with_constraints(
self,
mock_post: MagicMock,
discovery_tool: MCPDiscoveryTool,
mock_api_response: dict,
) -> None:
"""Test _run with constraints."""
mock_response = MagicMock()
mock_response.json.return_value = mock_api_response
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
result = discovery_tool._run(
need="database",
constraints={"max_latency_ms": 100, "required_features": ["sql"]},
limit=3,
)
assert "sqlite-server" in result
call_args = mock_post.call_args
payload = call_args[1]["json"]
assert payload["constraints"]["max_latency_ms"] == 100
assert payload["constraints"]["required_features"] == ["sql"]
assert payload["limit"] == 3
def test_run_missing_need_parameter(
self, discovery_tool: MCPDiscoveryTool
) -> None:
"""Test _run with missing need parameter."""
with pytest.raises(ValueError, match="'need' parameter is required"):
discovery_tool._run()
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_discover_method(
self,
mock_post: MagicMock,
discovery_tool: MCPDiscoveryTool,
mock_api_response: dict,
) -> None:
"""Test the discover convenience method."""
mock_response = MagicMock()
mock_response.json.return_value = mock_api_response
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
result = discovery_tool.discover(
need="database",
constraints=MCPDiscoveryConstraints(max_latency_ms=200),
limit=3,
)
assert isinstance(result, dict)
assert "recommendations" in result
assert "total_found" in result
assert "query_time_ms" in result
assert len(result["recommendations"]) == 2
assert result["total_found"] == 2
@patch("crewai_tools.tools.mcp_discovery_tool.mcp_discovery_tool.requests.post")
def test_discover_returns_structured_data(
self,
mock_post: MagicMock,
discovery_tool: MCPDiscoveryTool,
mock_api_response: dict,
) -> None:
"""Test that discover returns properly structured data."""
mock_response = MagicMock()
mock_response.json.return_value = mock_api_response
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
result = discovery_tool.discover(need="database")
first_rec = result["recommendations"][0]
assert "server" in first_rec
assert "npm_package" in first_rec
assert "install_command" in first_rec
assert "confidence" in first_rec
assert "description" in first_rec
assert "capabilities" in first_rec
assert "metrics" in first_rec
assert "docs_url" in first_rec
assert "github_url" in first_rec
class TestMCPDiscoveryToolIntegration:
"""Integration tests for MCPDiscoveryTool (requires network)."""
@pytest.mark.skip(reason="Integration test - requires network access")
def test_real_api_call(self) -> None:
"""Test actual API call to MCP Discovery service."""
tool = MCPDiscoveryTool()
result = tool._run(need="database", limit=3)
assert "MCP server" in result or "No MCP servers found" in result

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine, Sequence
from collections.abc import Callable, Sequence
import shutil
import subprocess
import time
@@ -34,11 +34,6 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
@@ -48,10 +43,10 @@ from crewai.events.types.memory_events import (
MemoryRetrievalCompletedEvent,
MemoryRetrievalStartedEvent,
)
from crewai.experimental.agent_executor import AgentExecutor
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.lite_agent_output import LiteAgentOutput
from crewai.lite_agent import LiteAgent
from crewai.llms.base_llm import BaseLLM
from crewai.mcp import (
MCPClient,
@@ -69,18 +64,15 @@ from crewai.security.fingerprint import Fingerprint
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities.agent_utils import (
get_tool_names,
is_inside_event_loop,
load_agent_from_repository,
parse_tools,
render_text_description_and_args,
)
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import Converter, ConverterError
from crewai.utilities.guardrail import process_guardrail
from crewai.utilities.converter import Converter
from crewai.utilities.guardrail_types import GuardrailType
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.prompts import Prompts, StandardPromptResult, SystemPromptResult
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -97,9 +89,9 @@ if TYPE_CHECKING:
from crewai_tools import CodeInterpreterTool
from crewai.agents.agent_builder.base_agent import PlatformAppOrAction
from crewai.lite_agent_output import LiteAgentOutput
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.types import LLMMessage
@@ -121,7 +113,7 @@ class Agent(BaseAgent):
The agent can also have memory, can operate in verbose mode, and can delegate tasks to other agents.
Attributes:
agent_executor: An instance of the CrewAgentExecutor or AgentExecutor class.
agent_executor: An instance of the CrewAgentExecutor or CrewAgentExecutorFlow class.
role: The role of the agent.
goal: The objective of the agent.
backstory: The backstory of the agent.
@@ -246,9 +238,9 @@ class Agent(BaseAgent):
Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of any number of A2AConfig/A2AClientConfig with a single A2AServerConfig.
""",
)
executor_class: type[CrewAgentExecutor] | type[AgentExecutor] = Field(
executor_class: type[CrewAgentExecutor] | type[CrewAgentExecutorFlow] = Field(
default=CrewAgentExecutor,
description="Class to use for the agent executor. Defaults to CrewAgentExecutor, can optionally use AgentExecutor.",
description="Class to use for the agent executor. Defaults to CrewAgentExecutor, can optionally use CrewAgentExecutorFlow.",
)
@model_validator(mode="before")
@@ -1591,25 +1583,26 @@ class Agent(BaseAgent):
)
return None
def _prepare_kickoff(
def kickoff(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
) -> tuple[AgentExecutor, dict[str, str], dict[str, Any], list[CrewStructuredTool]]:
"""Prepare common setup for kickoff execution.
) -> LiteAgentOutput:
"""
Execute the agent with the given messages using a LiteAgent instance.
This method handles all the common preparation logic shared between
kickoff() and kickoff_async(), including tool processing, prompt building,
executor creation, and input formatting.
This method is useful when you want to use the Agent configuration but
with the simpler and more direct execution flow of LiteAgent.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
response_format: Optional Pydantic model for structured output.
Returns:
Tuple of (executor, inputs, agent_info, parsed_tools) ready for execution.
LiteAgentOutput: The result of the agent execution.
"""
# Process platform apps and MCP tools
if self.apps:
platform_tools = self.get_platform_tools(self.apps)
if platform_tools and self.tools is not None:
@@ -1619,359 +1612,25 @@ class Agent(BaseAgent):
if mcps and self.tools is not None:
self.tools.extend(mcps)
# Prepare tools
raw_tools: list[BaseTool] = self.tools or []
parsed_tools = parse_tools(raw_tools)
# Build agent_info for backward-compatible event emission
agent_info = {
"id": self.id,
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,
"tools": raw_tools,
"verbose": self.verbose,
}
# Build prompt for standalone execution
prompt = Prompts(
agent=self,
has_tools=len(raw_tools) > 0,
i18n=self.i18n,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
prompt_template=self.prompt_template,
response_template=self.response_template,
).task_execution()
# Prepare stop words
stop_words = [self.i18n.slice("observation")]
if self.response_template:
stop_words.append(
self.response_template.split("{{ .Response }}")[1].strip()
)
# Get RPM limit function
rpm_limit_fn = (
self._rpm_controller.check_or_wait if self._rpm_controller else None
)
# Create the executor for standalone mode (no crew, no task)
executor = AgentExecutor(
task=None,
crew=None,
llm=cast(BaseLLM, self.llm),
agent=self,
prompt=prompt,
max_iter=self.max_iter,
tools=parsed_tools,
tools_names=get_tool_names(parsed_tools),
stop_words=stop_words,
tools_description=render_text_description_and_args(parsed_tools),
tools_handler=self.tools_handler,
original_tools=raw_tools,
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
lite_agent = LiteAgent(
id=self.id,
role=self.role,
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
respect_context_window=self.respect_context_window,
request_within_rpm_limit=rpm_limit_fn,
callbacks=[TokenCalcHandler(self._token_process)],
response_model=response_format,
verbose=self.verbose,
response_format=response_format,
i18n=self.i18n,
original_agent=self,
guardrail=self.guardrail,
guardrail_max_retries=self.guardrail_max_retries,
)
# Format messages
if isinstance(messages, str):
formatted_messages = messages
else:
formatted_messages = "\n".join(
str(msg.get("content", "")) for msg in messages if msg.get("content")
)
# Build the input dict for the executor
inputs = {
"input": formatted_messages,
"tool_names": get_tool_names(parsed_tools),
"tools": render_text_description_and_args(parsed_tools),
}
return executor, inputs, agent_info, parsed_tools
def kickoff(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]:
"""
Execute the agent with the given messages using the AgentExecutor.
This method provides standalone agent execution without requiring a Crew.
It supports tools, response formatting, and guardrails.
When called from within a Flow (sync or async method), this automatically
detects the event loop and returns a coroutine that the Flow framework
awaits. Users don't need to handle async explicitly.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
response_format: Optional Pydantic model for structured output.
Returns:
LiteAgentOutput: The result of the agent execution.
When inside a Flow, returns a coroutine that resolves to LiteAgentOutput.
Note:
For explicit async usage outside of Flow, use kickoff_async() directly.
"""
# Magic auto-async: if inside event loop (e.g., inside a Flow),
# return coroutine for Flow to await
if is_inside_event_loop():
return self.kickoff_async(messages, response_format)
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format
)
try:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
),
)
output = self._execute_and_build_output(executor, inputs, response_format)
if self.guardrail is not None:
output = self._process_kickoff_guardrail(
output=output,
executor=executor,
inputs=inputs,
response_format=response_format,
)
crewai_event_bus.emit(
self,
event=LiteAgentExecutionCompletedEvent(
agent_info=agent_info,
output=output.raw,
),
)
return output
except Exception as e:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionErrorEvent(
agent_info=agent_info,
error=str(e),
),
)
raise
def _execute_and_build_output(
self,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent and build the output object.
Args:
executor: The executor instance.
inputs: Input dictionary for execution.
response_format: Optional response format.
Returns:
LiteAgentOutput with raw output, formatted result, and metrics.
"""
import json
# Execute the agent (this is called from sync path, so invoke returns dict)
result = cast(dict[str, Any], executor.invoke(inputs))
raw_output = result.get("output", "")
# Handle response format conversion
formatted_result: BaseModel | None = None
if response_format:
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
converter = Converter(
llm=self.llm,
text=raw_output,
model=response_format,
instructions=instructions,
)
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
pass # Keep raw output if conversion fails
# Get token usage metrics
if isinstance(self.llm, BaseLLM):
usage_metrics = self.llm.get_token_usage_summary()
else:
usage_metrics = self._token_process.get_summary()
return LiteAgentOutput(
raw=raw_output,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=executor.messages,
)
async def _execute_and_build_output_async(
self,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent asynchronously and build the output object.
This is the async version of _execute_and_build_output that uses
invoke_async() for native async execution within event loops.
Args:
executor: The executor instance.
inputs: Input dictionary for execution.
response_format: Optional response format.
Returns:
LiteAgentOutput with raw output, formatted result, and metrics.
"""
import json
# Execute the agent asynchronously
result = await executor.invoke_async(inputs)
raw_output = result.get("output", "")
# Handle response format conversion
formatted_result: BaseModel | None = None
if response_format:
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
converter = Converter(
llm=self.llm,
text=raw_output,
model=response_format,
instructions=instructions,
)
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
pass # Keep raw output if conversion fails
# Get token usage metrics
if isinstance(self.llm, BaseLLM):
usage_metrics = self.llm.get_token_usage_summary()
else:
usage_metrics = self._token_process.get_summary()
return LiteAgentOutput(
raw=raw_output,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=executor.messages,
)
def _process_kickoff_guardrail(
self,
output: LiteAgentOutput,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
retry_count: int = 0,
) -> LiteAgentOutput:
"""Process guardrail for kickoff execution with retry logic.
Args:
output: Current agent output.
executor: The executor instance.
inputs: Input dictionary for re-execution.
response_format: Optional response format.
retry_count: Current retry count.
Returns:
Validated/updated output.
"""
from crewai.utilities.guardrail_types import GuardrailCallable
# Ensure guardrail is callable
guardrail_callable: GuardrailCallable
if isinstance(self.guardrail, str):
from crewai.tasks.llm_guardrail import LLMGuardrail
guardrail_callable = cast(
GuardrailCallable,
LLMGuardrail(description=self.guardrail, llm=cast(BaseLLM, self.llm)),
)
elif callable(self.guardrail):
guardrail_callable = self.guardrail
else:
# Should not happen if called from kickoff with guardrail check
return output
guardrail_result = process_guardrail(
output=output,
guardrail=guardrail_callable,
retry_count=retry_count,
event_source=self,
from_agent=self,
)
if not guardrail_result.success:
if retry_count >= self.guardrail_max_retries:
raise ValueError(
f"Agent's guardrail failed validation after {self.guardrail_max_retries} retries. "
f"Last error: {guardrail_result.error}"
)
# Add feedback and re-execute
executor._append_message_to_state(
guardrail_result.error or "Guardrail validation failed",
role="user",
)
# Re-execute and build new output
output = self._execute_and_build_output(executor, inputs, response_format)
# Recursively retry guardrail
return self._process_kickoff_guardrail(
output=output,
executor=executor,
inputs=inputs,
response_format=response_format,
retry_count=retry_count + 1,
)
# Apply guardrail result if available
if guardrail_result.result is not None:
if isinstance(guardrail_result.result, str):
output.raw = guardrail_result.result
elif isinstance(guardrail_result.result, BaseModel):
output.pydantic = guardrail_result.result
return output
return lite_agent.kickoff(messages)
async def kickoff_async(
self,
@@ -1979,11 +1638,9 @@ class Agent(BaseAgent):
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""
Execute the agent asynchronously with the given messages.
Execute the agent asynchronously with the given messages using a LiteAgent instance.
This is the async version of the kickoff method that uses native async
execution. It is designed for use within async contexts, such as when
called from within an async Flow method.
This is the async version of the kickoff method.
Args:
messages: Either a string query or a list of message dictionaries.
@@ -1994,48 +1651,21 @@ class Agent(BaseAgent):
Returns:
LiteAgentOutput: The result of the agent execution.
"""
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format
lite_agent = LiteAgent(
role=self.role,
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
respect_context_window=self.respect_context_window,
verbose=self.verbose,
response_format=response_format,
i18n=self.i18n,
original_agent=self,
guardrail=self.guardrail,
guardrail_max_retries=self.guardrail_max_retries,
)
try:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
),
)
output = await self._execute_and_build_output_async(
executor, inputs, response_format
)
if self.guardrail is not None:
output = self._process_kickoff_guardrail(
output=output,
executor=executor,
inputs=inputs,
response_format=response_format,
)
crewai_event_bus.emit(
self,
event=LiteAgentExecutionCompletedEvent(
agent_info=agent_info,
output=output.raw,
),
)
return output
except Exception as e:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionErrorEvent(
agent_info=agent_info,
error=str(e),
),
)
raise
return await lite_agent.kickoff_async(messages)
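
A minimal sketch of the standalone path described in the new kickoff docstring, where `Agent.kickoff()` builds a `LiteAgent` under the hood and returns a `LiteAgentOutput` (role, goal, and backstory below are illustrative):

```python
from crewai import Agent

agent = Agent(
    role="Researcher",
    goal="Answer ad-hoc questions without a Crew",
    backstory="Illustrative agent used to show the LiteAgent-backed kickoff path.",
)

output = agent.kickoff("Summarize the benefits of dynamic MCP server discovery.")
print(output.raw)  # LiteAgentOutput exposes the raw model response
```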

View File

@@ -21,9 +21,9 @@ if TYPE_CHECKING:
class CrewAgentExecutorMixin:
crew: Crew | None
crew: Crew
agent: Agent
task: Task | None
task: Task
iterations: int
max_iter: int
messages: list[LLMMessage]

View File

@@ -1,4 +1,4 @@
from crewai.experimental.agent_executor import AgentExecutor, CrewAgentExecutorFlow
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.experimental.evaluation import (
AgentEvaluationResult,
AgentEvaluator,
@@ -23,9 +23,8 @@ from crewai.experimental.evaluation import (
__all__ = [
"AgentEvaluationResult",
"AgentEvaluator",
"AgentExecutor",
"BaseEvaluator",
"CrewAgentExecutorFlow", # Deprecated alias for AgentExecutor
"CrewAgentExecutorFlow",
"EvaluationScore",
"EvaluationTraceCallback",
"ExperimentResult",

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from collections.abc import Callable, Coroutine
from collections.abc import Callable
import threading
from typing import TYPE_CHECKING, Any, Literal, cast
from uuid import uuid4
@@ -37,7 +37,6 @@ from crewai.utilities.agent_utils import (
handle_unknown_error,
has_reached_max_iterations,
is_context_length_exceeded,
is_inside_event_loop,
process_llm_response,
)
from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -74,17 +73,13 @@ class AgentReActState(BaseModel):
ask_for_human_input: bool = Field(default=False)
class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"""Agent Executor for both standalone agents and crew-bound agents.
class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
"""Flow-based executor matching CrewAgentExecutor interface.
Inherits from:
- Flow[AgentReActState]: Provides flow orchestration capabilities
- CrewAgentExecutorMixin: Provides memory methods (short/long/external term)
This executor can operate in two modes:
- Standalone mode: When crew and task are None (used by Agent.kickoff())
- Crew mode: When crew and task are provided (used by Agent.execute_task())
Note: Multiple instances may be created during agent initialization
(cache setup, RPM controller setup, etc.) but only the final instance
should execute tasks via invoke().
@@ -93,6 +88,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
def __init__(
self,
llm: BaseLLM,
task: Task,
crew: Crew,
agent: Agent,
prompt: SystemPromptResult | StandardPromptResult,
max_iter: int,
@@ -101,8 +98,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
stop_words: list[str],
tools_description: str,
tools_handler: ToolsHandler,
task: Task | None = None,
crew: Crew | None = None,
step_callback: Any = None,
original_tools: list[BaseTool] | None = None,
function_calling_llm: BaseLLM | Any | None = None,
@@ -116,6 +111,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
Args:
llm: Language model instance.
task: Task to execute.
crew: Crew instance.
agent: Agent to execute.
prompt: Prompt templates.
max_iter: Maximum iterations.
@@ -124,8 +121,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
stop_words: Stop word list.
tools_description: Tool descriptions.
tools_handler: Tool handler instance.
task: Optional task to execute (None for standalone agent execution).
crew: Optional crew instance (None for standalone agent execution).
step_callback: Optional step callback.
original_tools: Original tool list.
function_calling_llm: Optional function calling LLM.
@@ -136,9 +131,9 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"""
self._i18n: I18N = i18n or get_i18n()
self.llm = llm
self.task: Task | None = task
self.task = task
self.agent = agent
self.crew: Crew | None = crew
self.crew = crew
self.prompt = prompt
self.tools = tools
self.tools_names = tools_names
@@ -183,6 +178,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
else self.stop
)
)
self._state = AgentReActState()
def _ensure_flow_initialized(self) -> None:
@@ -268,7 +264,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=None,
response_model=self.response_model,
executor_context=self,
)
@@ -453,99 +449,9 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "initialized"
def invoke(
self, inputs: dict[str, Any]
) -> dict[str, Any] | Coroutine[Any, Any, dict[str, Any]]:
def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute agent with given inputs.
When called from within an existing event loop (e.g., inside a Flow),
this method returns a coroutine that should be awaited. The Flow
framework handles this automatically.
Args:
inputs: Input dictionary containing prompt variables.
Returns:
Dictionary with agent output, or a coroutine if inside an event loop.
"""
# Magic auto-async: if inside event loop, return coroutine for Flow to await
if is_inside_event_loop():
return self.invoke_async(inputs)
self._ensure_flow_initialized()
with self._execution_lock:
if self._is_executing:
raise RuntimeError(
"Executor is already running. "
"Cannot invoke the same executor instance concurrently."
)
self._is_executing = True
self._has_been_invoked = True
try:
# Reset state for fresh execution
self.state.messages.clear()
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False
if "system" in self.prompt:
prompt = cast("SystemPromptResult", self.prompt)
system_prompt = self._format_prompt(prompt["system"], inputs)
user_prompt = self._format_prompt(prompt["user"], inputs)
self.state.messages.append(
format_message_for_llm(system_prompt, role="system")
)
self.state.messages.append(format_message_for_llm(user_prompt))
else:
user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
self.state.messages.append(format_message_for_llm(user_prompt))
self.state.ask_for_human_input = bool(
inputs.get("ask_for_human_input", False)
)
self.kickoff()
formatted_answer = self.state.current_answer
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if self.state.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
self._create_external_memory(formatted_answer)
return {"output": formatted_answer.output}
except AssertionError:
fail_text = Text()
fail_text.append("", style="red bold")
fail_text.append(
"Agent failed to reach a final answer. This is likely a bug - please report it.",
style="red",
)
self._console.print(fail_text)
raise
except Exception as e:
handle_unknown_error(self._printer, e)
raise
finally:
self._is_executing = False
async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute agent asynchronously with given inputs.
This method is designed for use within async contexts, such as when
the agent is called from within an async Flow method. It uses
kickoff_async() directly instead of running in a separate thread.
Args:
inputs: Input dictionary containing prompt variables.
@@ -586,8 +492,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
inputs.get("ask_for_human_input", False)
)
# Use async kickoff directly since we're already in an async context
await self.kickoff_async()
self.kickoff()
formatted_answer = self.state.current_answer
@@ -678,14 +583,11 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
if self.agent is None:
raise ValueError("Agent cannot be None")
if self.task is None:
return
crewai_event_bus.emit(
self.agent,
AgentLogsStartedEvent(
agent_role=self.agent.role,
task_description=self.task.description,
task_description=(self.task.description if self.task else "Not Found"),
verbose=self.agent.verbose
or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)),
),
@@ -719,12 +621,10 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
result: Agent's final output.
human_feedback: Optional feedback from human.
"""
# Early return if no crew (standalone mode)
if self.crew is None:
return
agent_id = str(self.agent.id)
train_iteration = getattr(self.crew, "_train_iteration", None)
train_iteration = (
getattr(self.crew, "_train_iteration", None) if self.crew else None
)
if train_iteration is None or not isinstance(train_iteration, int):
train_error = Text()
@@ -906,7 +806,3 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
requiring arbitrary_types_allowed=True.
"""
return core_schema.any_schema()
# Backward compatibility alias (deprecated)
CrewAgentExecutorFlow = AgentExecutor
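The invoke() logic above turns on whether the caller is already inside a running asyncio event loop: if so, a coroutine is handed back for the Flow machinery to await; if not, execution runs synchronously. A minimal standalone sketch of that detection pattern (illustrative names, not the CrewAI implementation):

```python
import asyncio


def inside_event_loop() -> bool:
    # get_running_loop() raises RuntimeError when no loop is running in this thread.
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False


class SketchExecutor:
    async def _run(self, inputs: dict) -> dict:
        await asyncio.sleep(0)  # stand-in for the real ReAct loop
        return {"output": f"processed {inputs}"}

    def invoke(self, inputs: dict):
        if inside_event_loop():
            # Caller is async (e.g. a Flow method): return a coroutine to be awaited.
            return self._run(inputs)
        # Plain synchronous caller: drive the coroutine to completion here.
        return asyncio.run(self._run(inputs))


if __name__ == "__main__":
    print(SketchExecutor().invoke({"q": "2+2"}))  # synchronous path

    async def flow_method():
        return await SketchExecutor().invoke({"q": "2+2"})  # coroutine path

    print(asyncio.run(flow_method()))
```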


@@ -12,7 +12,6 @@ from concurrent.futures import Future
import copy
import inspect
import logging
import threading
from typing import (
TYPE_CHECKING,
Any,
@@ -65,7 +64,6 @@ from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.types import FlowExecutionData, FlowMethodName, PendingListenerKey
from crewai.flow.utils import (
_extract_all_methods,
_extract_all_methods_recursive,
_normalize_condition,
get_possible_return_constants,
is_flow_condition_dict,
@@ -75,7 +73,6 @@ from crewai.flow.utils import (
is_simple_flow_condition,
)
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
from crewai.flow.human_feedback import HumanFeedbackResult
@@ -399,62 +396,6 @@ def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition
return {"type": AND_CONDITION, "conditions": processed_conditions}
class StateProxy(Generic[T]):
"""Proxy that provides thread-safe access to flow state.
Wraps state objects (dict or BaseModel) and uses a lock for all write
operations to prevent race conditions when parallel listeners modify state.
"""
__slots__ = ("_proxy_lock", "_proxy_state")
def __init__(self, state: T, lock: threading.Lock) -> None:
object.__setattr__(self, "_proxy_state", state)
object.__setattr__(self, "_proxy_lock", lock)
def __getattr__(self, name: str) -> Any:
return getattr(object.__getattribute__(self, "_proxy_state"), name)
def __setattr__(self, name: str, value: Any) -> None:
if name in ("_proxy_state", "_proxy_lock"):
object.__setattr__(self, name, value)
else:
with object.__getattribute__(self, "_proxy_lock"):
setattr(object.__getattribute__(self, "_proxy_state"), name, value)
def __getitem__(self, key: str) -> Any:
return object.__getattribute__(self, "_proxy_state")[key]
def __setitem__(self, key: str, value: Any) -> None:
with object.__getattribute__(self, "_proxy_lock"):
object.__getattribute__(self, "_proxy_state")[key] = value
def __delitem__(self, key: str) -> None:
with object.__getattribute__(self, "_proxy_lock"):
del object.__getattribute__(self, "_proxy_state")[key]
def __contains__(self, key: str) -> bool:
return key in object.__getattribute__(self, "_proxy_state")
def __repr__(self) -> str:
return repr(object.__getattribute__(self, "_proxy_state"))
def _unwrap(self) -> T:
"""Return the underlying state object."""
return cast(T, object.__getattribute__(self, "_proxy_state"))
def model_dump(self) -> dict[str, Any]:
"""Return state as a dictionary.
Works for both dict and BaseModel underlying states.
"""
state = object.__getattribute__(self, "_proxy_state")
if isinstance(state, dict):
return state
result: dict[str, Any] = state.model_dump()
return result
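StateProxy above exists so that parallel listeners can write to shared flow state without racing on the underlying object: reads delegate to the wrapped state, writes are serialized by a lock. A condensed, standalone sketch of the same idea (not the full class above; compound read-modify-write sequences would still need their own locking):

```python
import threading
from typing import Any


class LockedProxy:
    """Reads pass through to the wrapped state; attribute writes take a shared lock."""

    __slots__ = ("_state", "_lock")

    def __init__(self, state: Any, lock: threading.Lock) -> None:
        object.__setattr__(self, "_state", state)
        object.__setattr__(self, "_lock", lock)

    def __getattr__(self, name: str) -> Any:
        return getattr(object.__getattribute__(self, "_state"), name)

    def __setattr__(self, name: str, value: Any) -> None:
        with object.__getattribute__(self, "_lock"):
            setattr(object.__getattribute__(self, "_state"), name, value)


class State:
    def __init__(self) -> None:
        self.last_writer = ""


proxy = LockedProxy(State(), threading.Lock())


def listener(name: str) -> None:
    # Each write below is a single attribute set, serialized by the proxy's lock.
    proxy.last_writer = name
    setattr(proxy, f"result_{name}", f"done by {name}")


threads = [threading.Thread(target=listener, args=(f"listener_{i}",)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(proxy.last_writer)        # one of the listener names
print(proxy.result_listener_0)  # "done by listener_0"
```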
class FlowMeta(type):
def __new__(
mcs,
@@ -578,12 +519,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._methods: dict[FlowMethodName, FlowMethod[Any, Any]] = {}
self._method_execution_counts: dict[FlowMethodName, int] = {}
self._pending_and_listeners: dict[PendingListenerKey, set[FlowMethodName]] = {}
self._fired_or_listeners: set[FlowMethodName] = (
set()
) # Track OR listeners that already fired
self._method_outputs: list[Any] = [] # list to store all method outputs
self._state_lock = threading.Lock()
self._or_listeners_lock = threading.Lock()
self._completed_methods: set[FlowMethodName] = (
set()
) # Track completed methods for reload
@@ -628,182 +564,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
method = method.__get__(self, self.__class__)
self._methods[method.__name__] = method
def _mark_or_listener_fired(self, listener_name: FlowMethodName) -> bool:
"""Mark an OR listener as fired atomically.
Args:
listener_name: The name of the OR listener to mark.
Returns:
True if this call was the first to fire the listener.
False if the listener was already fired.
"""
with self._or_listeners_lock:
if listener_name in self._fired_or_listeners:
return False
self._fired_or_listeners.add(listener_name)
return True
def _clear_or_listeners(self) -> None:
"""Clear fired OR listeners for cyclic flows."""
with self._or_listeners_lock:
self._fired_or_listeners.clear()
def _discard_or_listener(self, listener_name: FlowMethodName) -> None:
"""Discard a single OR listener from the fired set."""
with self._or_listeners_lock:
self._fired_or_listeners.discard(listener_name)
def _build_racing_groups(self) -> dict[frozenset[FlowMethodName], FlowMethodName]:
"""Identify groups of methods that race for the same OR listener.
Analyzes the flow graph to find listeners with OR conditions that have
multiple trigger methods. These trigger methods form a "racing group"
where only the first to complete should trigger the OR listener.
Only methods that are EXCLUSIVELY sources for the OR listener are included
in the racing group. Methods that are also triggers for other listeners
(e.g., AND conditions) are not cancelled when another racing source wins.
Returns:
Dictionary mapping frozensets of racing method names to their
shared OR listener name.
Example:
If we have `@listen(or_(method_a, method_b))` on `handler`,
and method_a/method_b aren't used elsewhere,
this returns: {frozenset({'method_a', 'method_b'}): 'handler'}
"""
racing_groups: dict[frozenset[FlowMethodName], FlowMethodName] = {}
method_to_listeners: dict[FlowMethodName, set[FlowMethodName]] = {}
for listener_name, condition_data in self._listeners.items():
if is_simple_flow_condition(condition_data):
_, methods = condition_data
for m in methods:
method_to_listeners.setdefault(m, set()).add(listener_name)
elif is_flow_condition_dict(condition_data):
all_methods = _extract_all_methods_recursive(condition_data)
for m in all_methods:
method_name = FlowMethodName(m) if isinstance(m, str) else m
method_to_listeners.setdefault(method_name, set()).add(
listener_name
)
for listener_name, condition_data in self._listeners.items():
if listener_name in self._routers:
continue
trigger_methods: set[FlowMethodName] = set()
if is_simple_flow_condition(condition_data):
condition_type, methods = condition_data
if condition_type == OR_CONDITION and len(methods) > 1:
trigger_methods = set(methods)
elif is_flow_condition_dict(condition_data):
top_level_type = condition_data.get("type", OR_CONDITION)
if top_level_type == OR_CONDITION:
all_methods = _extract_all_methods_recursive(condition_data)
if len(all_methods) > 1:
trigger_methods = set(
FlowMethodName(m) if isinstance(m, str) else m
for m in all_methods
)
if trigger_methods:
exclusive_methods = {
m
for m in trigger_methods
if method_to_listeners.get(m, set()) == {listener_name}
}
if len(exclusive_methods) > 1:
racing_groups[frozenset(exclusive_methods)] = listener_name
return racing_groups
def _get_racing_group_for_listeners(
self,
listener_names: list[FlowMethodName],
) -> tuple[frozenset[FlowMethodName], FlowMethodName] | None:
"""Check if the given listeners form a racing group.
Args:
listener_names: List of listener method names being executed.
Returns:
Tuple of (racing_members, or_listener_name) if these listeners race,
None otherwise.
"""
if not hasattr(self, "_racing_groups_cache"):
self._racing_groups_cache = self._build_racing_groups()
listener_set = set(listener_names)
for racing_members, or_listener in self._racing_groups_cache.items():
if racing_members & listener_set:
racing_subset = racing_members & listener_set
if len(racing_subset) > 1:
return (frozenset(racing_subset), or_listener)
return None
async def _execute_racing_listeners(
self,
racing_listeners: frozenset[FlowMethodName],
other_listeners: list[FlowMethodName],
result: Any,
) -> None:
"""Execute racing listeners with first-wins semantics.
Racing listeners are executed in parallel, but once the first one
completes, the others are cancelled. Non-racing listeners in the
same batch are executed normally in parallel.
Args:
racing_listeners: Set of listener names that race for an OR condition.
other_listeners: Other listeners to execute in parallel (not racing).
result: The result from the triggering method.
"""
racing_tasks = [
asyncio.create_task(
self._execute_single_listener(name, result),
name=str(name),
)
for name in racing_listeners
]
other_tasks = [
asyncio.create_task(
self._execute_single_listener(name, result),
name=str(name),
)
for name in other_listeners
]
if racing_tasks:
for coro in asyncio.as_completed(racing_tasks):
try:
await coro
except Exception as e:
logger.debug(f"Racing listener failed: {e}")
continue
break
for task in racing_tasks:
if not task.done():
task.cancel()
if other_tasks:
await asyncio.gather(*other_tasks, return_exceptions=True)
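The racing execution above is "first wins": racing listeners start in parallel, the first one to finish keeps its result, the rest are cancelled, and non-racing listeners in the same batch still run to completion. A self-contained sketch of those semantics, using asyncio.wait rather than the exact loop above:

```python
import asyncio


async def listener(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} finished"


async def run_batch(racing: dict[str, float], others: dict[str, float]) -> str:
    racing_tasks = [asyncio.create_task(listener(n, d), name=n) for n, d in racing.items()]
    other_tasks = [asyncio.create_task(listener(n, d), name=n) for n, d in others.items()]

    # First racing listener to complete wins; the rest are cancelled.
    done, pending = await asyncio.wait(racing_tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    winner = next(iter(done)).result()

    # Non-racing listeners in the same batch are not cancelled.
    await asyncio.gather(*other_tasks, return_exceptions=True)
    return winner


print(asyncio.run(run_batch({"fast_path": 0.01, "slow_path": 1.0}, {"audit_log": 0.05})))
```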
@classmethod
def from_pending(
cls,
flow_id: str,
persistence: FlowPersistence | None = None,
**kwargs: Any,
) -> Flow[Any]:
) -> "Flow[Any]":
"""Create a Flow instance from a pending feedback state.
This classmethod is used to restore a flow that was paused waiting
@@ -864,7 +631,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return instance
@property
def pending_feedback(self) -> PendingFeedbackContext | None:
def pending_feedback(self) -> "PendingFeedbackContext | None":
"""Get the pending feedback context if this flow is waiting for feedback.
Returns:
@@ -949,9 +716,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
Raises:
ValueError: If no pending feedback context exists
"""
from datetime import datetime
from crewai.flow.human_feedback import HumanFeedbackResult
from datetime import datetime
if self._pending_feedback_context is None:
raise ValueError(
@@ -974,14 +740,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
# No default and no feedback - use first outcome
collapsed_outcome = emit[0]
elif emit:
if llm is not None:
collapsed_outcome = self._collapse_to_outcome(
feedback=feedback,
outcomes=emit,
llm=llm,
)
else:
collapsed_outcome = emit[0]
# Collapse feedback to outcome using LLM
collapsed_outcome = self._collapse_to_outcome(
feedback=feedback,
outcomes=emit,
llm=llm,
)
# Create result
result = HumanFeedbackResult(
@@ -1020,16 +784,21 @@ class Flow(Generic[T], metaclass=FlowMeta):
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
self._is_execution_resuming = False
final_result: Any = result
# Determine what to pass to listeners
try:
if emit and collapsed_outcome:
# Router behavior - the outcome itself triggers listeners
# First, add the outcome to method outputs as a router would
self._method_outputs.append(collapsed_outcome)
await self._execute_listeners(
FlowMethodName(collapsed_outcome),
result,
# Then trigger listeners for the outcome (e.g., "approved" triggers @listen("approved"))
final_result = await self._execute_listeners(
FlowMethodName(collapsed_outcome), # Use outcome as trigger
result, # Pass HumanFeedbackResult to listeners
)
else:
await self._execute_listeners(
# Normal behavior - pass the HumanFeedbackResult
final_result = await self._execute_listeners(
FlowMethodName(context.method_name),
result,
)
@@ -1125,17 +894,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Handle case where initial_state is a type (class)
if isinstance(self.initial_state, type):
state_class: type[T] = self.initial_state
if issubclass(state_class, FlowState):
return state_class()
if issubclass(state_class, BaseModel):
model_fields = getattr(state_class, "model_fields", None)
if issubclass(self.initial_state, FlowState):
return self.initial_state() # Uses model defaults
if issubclass(self.initial_state, BaseModel):
# Validate that the model has an id field
model_fields = getattr(self.initial_state, "model_fields", None)
if not model_fields or "id" not in model_fields:
raise ValueError("Flow state model must have an 'id' field")
model_instance = state_class()
if not getattr(model_instance, "id", None):
object.__setattr__(model_instance, "id", str(uuid4()))
return model_instance
instance = self.initial_state()
# Ensure id is set - generate UUID if empty
if not getattr(instance, "id", None):
object.__setattr__(instance, "id", str(uuid4()))
return instance
if self.initial_state is dict:
return cast(T, {"id": str(uuid4())})
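The structured-state branch above requires the state model to expose an `id` field and backfills a UUID when the instance comes back without one. A small sketch of a compliant state model and the same validation, assuming Pydantic v2 (`model_fields`):

```python
from uuid import uuid4

from pydantic import BaseModel, Field


class ResearchState(BaseModel):
    # The flow machinery expects an `id` field; default to a fresh UUID.
    id: str = Field(default_factory=lambda: str(uuid4()))
    topic: str = ""
    findings: list[str] = Field(default_factory=list)


def create_initial_state(state_class: type[BaseModel]) -> BaseModel:
    # Mirror of the checks above: reject models without an `id` field,
    # and backfill an id if the instance somehow has an empty one.
    if "id" not in state_class.model_fields:
        raise ValueError("Flow state model must have an 'id' field")
    instance = state_class()
    if not getattr(instance, "id", None):
        object.__setattr__(instance, "id", str(uuid4()))
    return instance


state = create_initial_state(ResearchState)
print(state.id, repr(state.topic))
```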
@@ -1200,7 +970,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
@property
def state(self) -> T:
return StateProxy(self._state, self._state_lock) # type: ignore[return-value]
return self._state
@property
def method_outputs(self) -> list[Any]:
@@ -1525,7 +1295,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_and_listeners.clear()
self._clear_or_listeners()
else:
# We're restoring from persistence, set the flag
self._is_execution_resuming = True
@@ -1577,26 +1346,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._initialize_state(inputs)
try:
# Determine which start methods to execute at kickoff
# Conditional start methods (with __trigger_methods__) are only triggered by their conditions
# UNLESS there are no unconditional starts (then all starts run as entry points)
unconditional_starts = [
start_method
for start_method in self._start_methods
if not getattr(
self._methods.get(start_method), "__trigger_methods__", None
)
]
# If there are unconditional starts, only run those at kickoff
# If there are NO unconditional starts, run all starts (including conditional ones)
starts_to_execute = (
unconditional_starts
if unconditional_starts
else self._start_methods
)
tasks = [
self._execute_start_method(start_method)
for start_method in starts_to_execute
for start_method in self._start_methods
]
await asyncio.gather(*tasks)
except Exception as e:
@@ -1679,14 +1431,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
self._event_futures.clear()
if not self.suppress_flow_events:
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
return final_output
finally:
@@ -1730,8 +1481,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(start_method_name)
# Also clear fired OR listeners to allow them to fire again in new cycle
self._clear_or_listeners()
method = self._methods[start_method_name]
enhanced_method = self._inject_trigger_payload_for_start_method(method)
@@ -1754,25 +1503,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
if self.last_human_feedback is not None
else result
)
racing_group = self._get_racing_group_for_listeners(
listeners_for_result
)
if racing_group:
racing_members, _ = racing_group
other_listeners = [
name
for name in listeners_for_result
if name not in racing_members
]
await self._execute_racing_listeners(
racing_members, other_listeners, listener_result
)
else:
tasks = [
self._execute_single_listener(listener_name, listener_result)
for listener_name in listeners_for_result
]
await asyncio.gather(*tasks)
tasks = [
self._execute_single_listener(listener_name, listener_result)
for listener_name in listeners_for_result
]
await asyncio.gather(*tasks)
else:
await self._execute_listeners(start_method_name, result)
@@ -1838,19 +1573,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
if future:
self._event_futures.append(future)
if asyncio.iscoroutinefunction(method):
result = await method(*args, **kwargs)
else:
# Run sync methods in thread pool for isolation
# This allows Agent.kickoff() to work synchronously inside Flow methods
import contextvars
ctx = contextvars.copy_context()
result = await asyncio.to_thread(ctx.run, method, *args, **kwargs)
# Auto-await coroutines returned from sync methods (enables AgentExecutor pattern)
if asyncio.iscoroutine(result):
result = await result
result = (
await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs)
)
self._method_outputs.append(result)
self._method_execution_counts[method_name] = (
@@ -1997,27 +1724,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
listener_result = router_result_to_feedback.get(
str(current_trigger), result
)
racing_group = self._get_racing_group_for_listeners(
listeners_triggered
)
if racing_group:
racing_members, _ = racing_group
other_listeners = [
name
for name in listeners_triggered
if name not in racing_members
]
await self._execute_racing_listeners(
racing_members, other_listeners, listener_result
)
else:
tasks = [
self._execute_single_listener(
listener_name, listener_result
)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
tasks = [
self._execute_single_listener(listener_name, listener_result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
if current_trigger in router_results:
# Find start methods triggered by this router result
@@ -2034,16 +1745,14 @@ class Flow(Generic[T], metaclass=FlowMeta):
should_trigger = current_trigger in all_methods
if should_trigger:
# Execute conditional start method triggered by router result
# Only execute if this is a cycle (method was already completed)
if method_name in self._completed_methods:
# For cyclic re-execution, temporarily clear resumption flag
# For router-triggered start methods in cycles, temporarily clear resumption flag
# to allow cyclic execution
was_resuming = self._is_execution_resuming
self._is_execution_resuming = False
await self._execute_start_method(method_name)
self._is_execution_resuming = was_resuming
else:
# First-time execution of conditional start
await self._execute_start_method(method_name)
def _evaluate_condition(
self,
@@ -2141,21 +1850,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
condition_type, methods = condition_data
if condition_type == OR_CONDITION:
# Only trigger multi-source OR listeners (or_(A, B, C)) once - skip if already fired
# Simple single-method listeners fire every time their trigger occurs
# Routers also fire every time - they're decision points
has_multiple_triggers = len(methods) > 1
should_check_fired = has_multiple_triggers and not is_router
if (
not should_check_fired
or listener_name not in self._fired_or_listeners
):
if trigger_method in methods:
triggered.append(listener_name)
# Only track multi-source OR listeners (not single-method or routers)
if should_check_fired:
self._fired_or_listeners.add(listener_name)
if trigger_method in methods:
triggered.append(listener_name)
elif condition_type == AND_CONDITION:
pending_key = PendingListenerKey(listener_name)
if pending_key not in self._pending_and_listeners:
@@ -2168,26 +1864,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._pending_and_listeners.pop(pending_key, None)
elif is_flow_condition_dict(condition_data):
# For complex conditions, check if top-level is OR and track accordingly
top_level_type = condition_data.get("type", OR_CONDITION)
is_or_based = top_level_type == OR_CONDITION
# Only track multi-source OR conditions (multiple sub-conditions), not routers
sub_conditions = condition_data.get("conditions", [])
has_multiple_triggers = is_or_based and len(sub_conditions) > 1
should_check_fired = has_multiple_triggers and not is_router
# Skip compound OR-based listeners that have already fired
if should_check_fired and listener_name in self._fired_or_listeners:
continue
if self._evaluate_condition(
condition_data, trigger_method, listener_name
):
triggered.append(listener_name)
# Track compound OR-based listeners so they only fire once
if should_check_fired:
self._fired_or_listeners.add(listener_name)
return triggered
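The trigger bookkeeping above reduces to two rules: an OR listener fires as soon as any of its trigger methods completes, while an AND listener fires only once all of them have, with outstanding triggers tracked in between. A standalone simulation of that bookkeeping (not the CrewAI data structures):

```python
# listener -> (condition type, trigger methods)
LISTENERS = {
    "build_report": ("AND", {"fetch_data", "clean_data"}),
    "raise_alert": ("OR", {"fetch_data", "timeout"}),
}

pending_and: dict[str, set[str]] = {}


def triggered_by(completed_method: str) -> list[str]:
    fired: list[str] = []
    for listener, (kind, triggers) in LISTENERS.items():
        if completed_method not in triggers:
            continue
        if kind == "OR":
            fired.append(listener)
        else:  # AND: track outstanding triggers until all have completed
            remaining = pending_and.setdefault(listener, set(triggers))
            remaining.discard(completed_method)
            if not remaining:
                fired.append(listener)
                pending_and.pop(listener)
    return fired


print(triggered_by("fetch_data"))   # ['raise_alert']  (OR fires, AND still waiting)
print(triggered_by("clean_data"))   # ['build_report'] (AND now satisfied)
```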
@@ -2216,22 +1896,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
if self._is_execution_resuming:
# During resumption, skip execution but continue listeners
await self._execute_listeners(listener_name, None)
# For routers, also check if any conditional starts they triggered are completed
# If so, continue their chains
if listener_name in self._routers:
for start_method_name in self._start_methods:
if (
start_method_name in self._listeners
and start_method_name in self._completed_methods
):
# This conditional start was executed, continue its chain
await self._execute_start_method(start_method_name)
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(listener_name)
# Also clear from fired OR listeners for cyclic flows
self._discard_or_listener(listener_name)
try:
method = self._methods[listener_name]
@@ -2264,25 +1931,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
if self.last_human_feedback is not None
else listener_result
)
racing_group = self._get_racing_group_for_listeners(
listeners_for_result
)
if racing_group:
racing_members, _ = racing_group
other_listeners = [
name
for name in listeners_for_result
if name not in racing_members
]
await self._execute_racing_listeners(
racing_members, other_listeners, feedback_result
)
else:
tasks = [
self._execute_single_listener(name, feedback_result)
for name in listeners_for_result
]
await asyncio.gather(*tasks)
tasks = [
self._execute_single_listener(name, feedback_result)
for name in listeners_for_result
]
await asyncio.gather(*tasks)
except Exception as e:
# Don't log HumanFeedbackPending as an error - it's expected control flow
@@ -2396,7 +2049,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
from crewai.utilities.i18n import get_i18n
llm_instance: BaseLLMClass
# Get or create LLM instance
if isinstance(llm, str):
llm_instance = LLM(model=llm)
elif isinstance(llm, BaseLLMClass):
@@ -2431,23 +2084,26 @@ class Flow(Generic[T], metaclass=FlowMeta):
response_model=FeedbackOutcome,
)
# Parse the response - LLM returns JSON string when using response_model
if isinstance(response, str):
import json
try:
parsed = json.loads(response)
return str(parsed.get("outcome", outcomes[0]))
return parsed.get("outcome", outcomes[0])
except json.JSONDecodeError:
# Not valid JSON, might be raw outcome string
response_clean = response.strip()
for outcome in outcomes:
if outcome.lower() == response_clean.lower():
return outcome
return outcomes[0]
elif isinstance(response, FeedbackOutcome):
return str(response.outcome)
return response.outcome
elif hasattr(response, "outcome"):
return str(response.outcome)
return response.outcome
else:
# Unexpected type, fall back to first outcome
logger.warning(f"Unexpected response type: {type(response)}")
return outcomes[0]
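The response handling above follows a defensive chain: parse the reply as JSON and read `outcome`, otherwise try to match the raw text against the allowed outcomes, and fall back to the first outcome when nothing matches. A compact sketch of that chain:

```python
import json


def collapse_to_outcome(response: str, outcomes: list[str]) -> str:
    """Map a raw LLM reply onto one of the allowed outcomes, defaulting safely."""
    try:
        parsed = json.loads(response)
        candidate = str(parsed.get("outcome", outcomes[0]))
    except json.JSONDecodeError:
        candidate = response.strip()
    # Accept only known outcomes (case-insensitive); otherwise fall back.
    for outcome in outcomes:
        if outcome.lower() == candidate.lower():
            return outcome
    return outcomes[0]


print(collapse_to_outcome('{"outcome": "approved"}', ["approved", "rejected"]))  # approved
print(collapse_to_outcome("Rejected", ["approved", "rejected"]))                 # rejected
print(collapse_to_outcome("no idea", ["approved", "rejected"]))                  # approved
```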


@@ -61,7 +61,7 @@ class PersistenceDecorator:
@classmethod
def persist_state(
cls,
flow_instance: Flow[Any],
flow_instance: Flow,
method_name: str,
persistence_instance: FlowPersistence,
verbose: bool = False,
@@ -90,13 +90,7 @@ class PersistenceDecorator:
flow_uuid: str | None = None
if isinstance(state, dict):
flow_uuid = state.get("id")
elif hasattr(state, "_unwrap"):
unwrapped = state._unwrap()
if isinstance(unwrapped, dict):
flow_uuid = unwrapped.get("id")
else:
flow_uuid = getattr(unwrapped, "id", None)
elif isinstance(state, BaseModel) or hasattr(state, "id"):
elif isinstance(state, BaseModel):
flow_uuid = getattr(state, "id", None)
if not flow_uuid:
@@ -110,11 +104,10 @@ class PersistenceDecorator:
logger.info(LOG_MESSAGES["save_state"].format(flow_uuid))
try:
state_data = state._unwrap() if hasattr(state, "_unwrap") else state
persistence_instance.save_state(
flow_uuid=flow_uuid,
method_name=method_name,
state_data=state_data,
state_data=state,
)
except Exception as e:
error_msg = LOG_MESSAGES["save_error"].format(method_name, str(e))
@@ -133,9 +126,7 @@ class PersistenceDecorator:
raise ValueError(error_msg) from e
def persist(
persistence: FlowPersistence | None = None, verbose: bool = False
) -> Callable[[type | Callable[..., T]], type | Callable[..., T]]:
def persist(persistence: FlowPersistence | None = None, verbose: bool = False):
"""Decorator to persist flow state.
This decorator can be applied at either the class level or method level.
@@ -198,8 +189,8 @@ def persist(
if asyncio.iscoroutinefunction(method):
# Create a closure to capture the current name and method
def create_async_wrapper(
method_name: str, original_method: Callable[..., Any]
) -> Callable[..., Any]:
method_name: str, original_method: Callable
):
@functools.wraps(original_method)
async def method_wrapper(
self: Any, *args: Any, **kwargs: Any
@@ -230,8 +221,8 @@ def persist(
else:
# Create a closure to capture the current name and method
def create_sync_wrapper(
method_name: str, original_method: Callable[..., Any]
) -> Callable[..., Any]:
method_name: str, original_method: Callable
):
@functools.wraps(original_method)
def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
result = original_method(self, *args, **kwargs)
@@ -277,7 +268,7 @@ def persist(
PersistenceDecorator.persist_state(
flow_instance, method.__name__, actual_persistence, verbose
)
return cast(T, result)
return result
for attr in [
"__is_start_method__",


@@ -10,7 +10,6 @@ from typing import (
get_origin,
)
import uuid
import warnings
from pydantic import (
UUID4,
@@ -81,11 +80,6 @@ class LiteAgent(FlowTrackable, BaseModel):
"""
A lightweight agent that can process messages and use tools.
.. deprecated::
LiteAgent is deprecated and will be removed in a future version.
Use ``Agent().kickoff(messages)`` instead, which provides the same
functionality with additional features like memory and knowledge support.
This agent is simpler than the full Agent class, focusing on direct execution
rather than task delegation. It's designed to be used for simple interactions
where a full crew is not needed.
@@ -170,18 +164,6 @@ class LiteAgent(FlowTrackable, BaseModel):
default_factory=get_after_llm_call_hooks
)
@model_validator(mode="after")
def emit_deprecation_warning(self) -> Self:
"""Emit deprecation warning for LiteAgent usage."""
warnings.warn(
"LiteAgent is deprecated and will be removed in a future version. "
"Use Agent().kickoff(messages) instead, which provides the same "
"functionality with additional features like memory and knowledge support.",
DeprecationWarning,
stacklevel=2,
)
return self
@model_validator(mode="after")
def setup_llm(self) -> Self:
"""Set up the LLM and other components after initialization."""


@@ -410,14 +410,8 @@ class LLM(BaseLLM):
# FALLBACK to LiteLLM
if not LITELLM_AVAILABLE:
logger.error(
f"Model '{model}' requires LiteLLM but it is not installed. "
"Install it with: pip install 'crewai[litellm]' or pip install litellm"
)
raise ImportError(
f"Model '{model}' requires LiteLLM for inference but LiteLLM is not installed. "
"Please install it with: pip install 'crewai[litellm]' or pip install litellm"
) from None
logger.error("LiteLLM is not available, falling back to LiteLLM")
raise ImportError("Fallback to LiteLLM is not available") from None
instance = object.__new__(cls)
super(LLM, instance).__init__(model=model, is_litellm=True, **kwargs)
@@ -1150,7 +1144,7 @@ class LLM(BaseLLM):
if response_model:
params["response_model"] = response_model
response = litellm.completion(**params)
if hasattr(response,"usage") and not isinstance(response.usage, type) and response.usage:
usage_info = response.usage
self._track_token_usage_internal(usage_info)
@@ -1369,7 +1363,7 @@ class LLM(BaseLLM):
"""
full_response = ""
chunk_count = 0
usage_info = None
accumulated_tool_args: defaultdict[int, AccumulatedToolArgs] = defaultdict(


@@ -54,21 +54,15 @@ class GeminiCompletion(BaseLLM):
safety_settings: dict[str, Any] | None = None,
client_params: dict[str, Any] | None = None,
interceptor: BaseInterceptor[Any, Any] | None = None,
use_vertexai: bool | None = None,
**kwargs: Any,
):
"""Initialize Google Gemini chat completion client.
Args:
model: Gemini model name (e.g., 'gemini-2.0-flash-001', 'gemini-1.5-pro')
api_key: Google API key for Gemini API authentication.
Defaults to GOOGLE_API_KEY or GEMINI_API_KEY env var.
NOTE: Cannot be used with Vertex AI (project parameter). Use Gemini API instead.
project: Google Cloud project ID for Vertex AI with ADC authentication.
Requires Application Default Credentials (gcloud auth application-default login).
NOTE: Vertex AI does NOT support API keys, only OAuth2/ADC.
If both api_key and project are set, api_key takes precedence.
location: Google Cloud location (for Vertex AI with ADC, defaults to 'us-central1')
api_key: Google API key (defaults to GOOGLE_API_KEY or GEMINI_API_KEY env var)
project: Google Cloud project ID (for Vertex AI)
location: Google Cloud location (for Vertex AI, defaults to 'us-central1')
temperature: Sampling temperature (0-2)
top_p: Nucleus sampling parameter
top_k: Top-k sampling parameter
@@ -79,12 +73,6 @@ class GeminiCompletion(BaseLLM):
client_params: Additional parameters to pass to the Google Gen AI Client constructor.
Supports parameters like http_options, credentials, debug_config, etc.
interceptor: HTTP interceptor (not yet supported for Gemini).
use_vertexai: Whether to use Vertex AI instead of Gemini API.
- True: Use Vertex AI (with ADC or Express mode with API key)
- False: Use Gemini API (explicitly override env var)
- None (default): Check GOOGLE_GENAI_USE_VERTEXAI env var
When using Vertex AI with API key (Express mode), http_options with
api_version="v1" is automatically configured.
**kwargs: Additional parameters
"""
if interceptor is not None:
@@ -107,8 +95,7 @@ class GeminiCompletion(BaseLLM):
self.project = project or os.getenv("GOOGLE_CLOUD_PROJECT")
self.location = location or os.getenv("GOOGLE_CLOUD_LOCATION") or "us-central1"
if use_vertexai is None:
use_vertexai = os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "").lower() == "true"
use_vertexai = os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "").lower() == "true"
self.client = self._initialize_client(use_vertexai)
@@ -159,34 +146,13 @@ class GeminiCompletion(BaseLLM):
Returns:
Initialized Google Gen AI Client
Note:
Google Gen AI SDK has two distinct endpoints with different auth requirements:
- Gemini API (generativelanguage.googleapis.com): Supports API key authentication
- Vertex AI (aiplatform.googleapis.com): Only supports OAuth2/ADC, NO API keys
When vertexai=True is set, it routes to aiplatform.googleapis.com which rejects
API keys. Use Gemini API endpoint for API key authentication instead.
"""
client_params = {}
if self.client_params:
client_params.update(self.client_params)
# Determine authentication mode based on available credentials
has_api_key = bool(self.api_key)
has_project = bool(self.project)
if has_api_key and has_project:
logging.warning(
"Both API key and project provided. Using API key authentication. "
"Project/location parameters are ignored when using API keys. "
"To use Vertex AI with ADC, remove the api_key parameter."
)
has_project = False
# Vertex AI with ADC (project without API key)
if (use_vertexai or has_project) and not has_api_key:
if use_vertexai or self.project:
client_params.update(
{
"vertexai": True,
@@ -195,20 +161,12 @@ class GeminiCompletion(BaseLLM):
}
)
# API key authentication (works with both Gemini API and Vertex AI Express)
elif has_api_key:
client_params.pop("api_key", None)
elif self.api_key:
client_params["api_key"] = self.api_key
# Vertex AI Express mode: API key + vertexai=True + http_options with api_version="v1"
# See: https://cloud.google.com/vertex-ai/generative-ai/docs/start/quickstart?usertype=apikey
if use_vertexai:
client_params["vertexai"] = True
client_params["http_options"] = types.HttpOptions(api_version="v1")
else:
# This ensures we use the Gemini API (generativelanguage.googleapis.com)
client_params["vertexai"] = False
# Clean up project/location (not allowed with API key)
client_params.pop("vertexai", None)
client_params.pop("project", None)
client_params.pop("location", None)
@@ -217,13 +175,10 @@ class GeminiCompletion(BaseLLM):
return genai.Client(**client_params)
except Exception as e:
raise ValueError(
"Authentication required. Provide one of:\n"
" 1. API key via GOOGLE_API_KEY or GEMINI_API_KEY environment variable\n"
" (use_vertexai=True is optional for Vertex AI with API key)\n"
" 2. For Vertex AI with ADC: Set GOOGLE_CLOUD_PROJECT and run:\n"
" gcloud auth application-default login\n"
" 3. Pass api_key parameter directly to LLM constructor\n"
"Either GOOGLE_API_KEY/GEMINI_API_KEY (for Gemini API) or "
"GOOGLE_CLOUD_PROJECT (for Vertex AI) must be set"
) from e
return genai.Client(**client_params)
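Both versions of `_initialize_client` shown here ultimately choose between two google-genai authentication modes: an API key for the Gemini API, or a Google Cloud project with Application Default Credentials for Vertex AI. A condensed sketch of that choice, assuming the `google-genai` package is installed (this is not the exact library logic):

```python
import os

from google import genai


def make_gemini_client() -> genai.Client:
    api_key = os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_API_KEY")
    project = os.getenv("GOOGLE_CLOUD_PROJECT")
    location = os.getenv("GOOGLE_CLOUD_LOCATION", "us-central1")

    if project and not api_key:
        # Vertex AI path: relies on Application Default Credentials.
        return genai.Client(vertexai=True, project=project, location=location)
    if api_key:
        # Gemini API path: plain API-key authentication.
        return genai.Client(api_key=api_key)
    raise ValueError(
        "Set GOOGLE_API_KEY/GEMINI_API_KEY or GOOGLE_CLOUD_PROJECT before creating the client"
    )
```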
def _get_client_params(self) -> dict[str, Any]:
@@ -247,8 +202,6 @@ class GeminiCompletion(BaseLLM):
"location": self.location,
}
)
if self.api_key:
params["api_key"] = self.api_key
elif self.api_key:
params["api_key"] = self.api_key


@@ -1,6 +1,5 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
import json
import re
@@ -55,23 +54,6 @@ console = Console()
_MULTIPLE_NEWLINES: Final[re.Pattern[str]] = re.compile(r"\n+")
def is_inside_event_loop() -> bool:
"""Check if code is currently running inside an asyncio event loop.
This is used to detect when code is being called from within an async context
(e.g., inside a Flow). In such cases, callers should return a coroutine
instead of executing synchronously to avoid nested event loop errors.
Returns:
True if inside a running event loop, False otherwise.
"""
try:
asyncio.get_running_loop()
return True
except RuntimeError:
return False
def parse_tools(tools: list[BaseTool]) -> list[CrewStructuredTool]:
"""Parse tools to be used for the task.


@@ -1,4 +1,4 @@
"""Unit tests for AgentExecutor.
"""Unit tests for CrewAgentExecutorFlow.
Tests the Flow-based agent executor implementation including state management,
flow methods, routing logic, and error handling.
@@ -8,9 +8,9 @@ from unittest.mock import Mock, patch
import pytest
from crewai.experimental.agent_executor import (
from crewai.experimental.crew_agent_executor_flow import (
AgentReActState,
AgentExecutor,
CrewAgentExecutorFlow,
)
from crewai.agents.parser import AgentAction, AgentFinish
@@ -43,8 +43,8 @@ class TestAgentReActState:
assert state.ask_for_human_input is True
class TestAgentExecutor:
"""Test AgentExecutor class."""
class TestCrewAgentExecutorFlow:
"""Test CrewAgentExecutorFlow class."""
@pytest.fixture
def mock_dependencies(self):
@@ -87,8 +87,8 @@ class TestAgentExecutor:
}
def test_executor_initialization(self, mock_dependencies):
"""Test AgentExecutor initialization."""
executor = AgentExecutor(**mock_dependencies)
"""Test CrewAgentExecutorFlow initialization."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
assert executor.llm == mock_dependencies["llm"]
assert executor.task == mock_dependencies["task"]
@@ -100,9 +100,9 @@ class TestAgentExecutor:
def test_initialize_reasoning(self, mock_dependencies):
"""Test flow entry point."""
with patch.object(
AgentExecutor, "_show_start_logs"
CrewAgentExecutorFlow, "_show_start_logs"
) as mock_show_start:
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
result = executor.initialize_reasoning()
assert result == "initialized"
@@ -110,7 +110,7 @@ class TestAgentExecutor:
def test_check_max_iterations_not_reached(self, mock_dependencies):
"""Test routing when iterations < max."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.iterations = 5
result = executor.check_max_iterations()
@@ -118,7 +118,7 @@ class TestAgentExecutor:
def test_check_max_iterations_reached(self, mock_dependencies):
"""Test routing when iterations >= max."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.iterations = 10
result = executor.check_max_iterations()
@@ -126,7 +126,7 @@ class TestAgentExecutor:
def test_route_by_answer_type_action(self, mock_dependencies):
"""Test routing for AgentAction."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="search", tool_input="query", text="action text"
)
@@ -136,7 +136,7 @@ class TestAgentExecutor:
def test_route_by_answer_type_finish(self, mock_dependencies):
"""Test routing for AgentFinish."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentFinish(
thought="final thoughts", output="Final answer", text="complete"
)
@@ -146,7 +146,7 @@ class TestAgentExecutor:
def test_continue_iteration(self, mock_dependencies):
"""Test iteration continuation."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
result = executor.continue_iteration()
@@ -154,8 +154,8 @@ class TestAgentExecutor:
def test_finalize_success(self, mock_dependencies):
"""Test finalize with valid AgentFinish."""
with patch.object(AgentExecutor, "_show_logs") as mock_show_logs:
executor = AgentExecutor(**mock_dependencies)
with patch.object(CrewAgentExecutorFlow, "_show_logs") as mock_show_logs:
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentFinish(
thought="final thinking", output="Done", text="complete"
)
@@ -168,7 +168,7 @@ class TestAgentExecutor:
def test_finalize_failure(self, mock_dependencies):
"""Test finalize skips when given AgentAction instead of AgentFinish."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="search", tool_input="query", text="action text"
)
@@ -181,7 +181,7 @@ class TestAgentExecutor:
def test_format_prompt(self, mock_dependencies):
"""Test prompt formatting."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
inputs = {"input": "test input", "tool_names": "tool1, tool2", "tools": "desc"}
result = executor._format_prompt("Prompt {input} {tool_names} {tools}", inputs)
@@ -192,18 +192,18 @@ class TestAgentExecutor:
def test_is_training_mode_false(self, mock_dependencies):
"""Test training mode detection when not in training."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
assert executor._is_training_mode() is False
def test_is_training_mode_true(self, mock_dependencies):
"""Test training mode detection when in training."""
mock_dependencies["crew"]._train = True
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
assert executor._is_training_mode() is True
def test_append_message_to_state(self, mock_dependencies):
"""Test message appending to state."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
initial_count = len(executor.state.messages)
executor._append_message_to_state("test message")
@@ -216,7 +216,7 @@ class TestAgentExecutor:
callback = Mock()
mock_dependencies["step_callback"] = callback
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
answer = AgentFinish(thought="thinking", output="test", text="final")
executor._invoke_step_callback(answer)
@@ -226,14 +226,14 @@ class TestAgentExecutor:
def test_invoke_step_callback_none(self, mock_dependencies):
"""Test step callback when none provided."""
mock_dependencies["step_callback"] = None
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
# Should not raise error
executor._invoke_step_callback(
AgentFinish(thought="thinking", output="test", text="final")
)
@patch("crewai.experimental.agent_executor.handle_output_parser_exception")
@patch("crewai.experimental.crew_agent_executor_flow.handle_output_parser_exception")
def test_recover_from_parser_error(
self, mock_handle_exception, mock_dependencies
):
@@ -242,7 +242,7 @@ class TestAgentExecutor:
mock_handle_exception.return_value = None
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor._last_parser_error = OutputParserError("test error")
initial_iterations = executor.state.iterations
@@ -252,12 +252,12 @@ class TestAgentExecutor:
assert executor.state.iterations == initial_iterations + 1
mock_handle_exception.assert_called_once()
@patch("crewai.experimental.agent_executor.handle_context_length")
@patch("crewai.experimental.crew_agent_executor_flow.handle_context_length")
def test_recover_from_context_length(
self, mock_handle_context, mock_dependencies
):
"""Test recovery from context length error."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor._last_context_error = Exception("context too long")
initial_iterations = executor.state.iterations
@@ -270,16 +270,16 @@ class TestAgentExecutor:
def test_use_stop_words_property(self, mock_dependencies):
"""Test use_stop_words property."""
mock_dependencies["llm"].supports_stop_words.return_value = True
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
assert executor.use_stop_words is True
mock_dependencies["llm"].supports_stop_words.return_value = False
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
assert executor.use_stop_words is False
def test_compatibility_properties(self, mock_dependencies):
"""Test compatibility properties for mixin."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.messages = [{"role": "user", "content": "test"}]
executor.state.iterations = 5
@@ -321,8 +321,8 @@ class TestFlowErrorHandling:
"tools_handler": Mock(),
}
@patch("crewai.experimental.agent_executor.get_llm_response")
@patch("crewai.experimental.agent_executor.enforce_rpm_limit")
@patch("crewai.experimental.crew_agent_executor_flow.get_llm_response")
@patch("crewai.experimental.crew_agent_executor_flow.enforce_rpm_limit")
def test_call_llm_parser_error(
self, mock_enforce_rpm, mock_get_llm, mock_dependencies
):
@@ -332,15 +332,15 @@ class TestFlowErrorHandling:
mock_enforce_rpm.return_value = None
mock_get_llm.side_effect = OutputParserError("parse failed")
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
result = executor.call_llm_and_parse()
assert result == "parser_error"
assert executor._last_parser_error is not None
@patch("crewai.experimental.agent_executor.get_llm_response")
@patch("crewai.experimental.agent_executor.enforce_rpm_limit")
@patch("crewai.experimental.agent_executor.is_context_length_exceeded")
@patch("crewai.experimental.crew_agent_executor_flow.get_llm_response")
@patch("crewai.experimental.crew_agent_executor_flow.enforce_rpm_limit")
@patch("crewai.experimental.crew_agent_executor_flow.is_context_length_exceeded")
def test_call_llm_context_error(
self,
mock_is_context_exceeded,
@@ -353,7 +353,7 @@ class TestFlowErrorHandling:
mock_get_llm.side_effect = Exception("context length")
mock_is_context_exceeded.return_value = True
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
result = executor.call_llm_and_parse()
assert result == "context_error"
@@ -397,10 +397,10 @@ class TestFlowInvoke:
"tools_handler": Mock(),
}
@patch.object(AgentExecutor, "kickoff")
@patch.object(AgentExecutor, "_create_short_term_memory")
@patch.object(AgentExecutor, "_create_long_term_memory")
@patch.object(AgentExecutor, "_create_external_memory")
@patch.object(CrewAgentExecutorFlow, "kickoff")
@patch.object(CrewAgentExecutorFlow, "_create_short_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_long_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_external_memory")
def test_invoke_success(
self,
mock_external_memory,
@@ -410,7 +410,7 @@ class TestFlowInvoke:
mock_dependencies,
):
"""Test successful invoke without human feedback."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
# Mock kickoff to set the final answer in state
def mock_kickoff_side_effect():
@@ -429,10 +429,10 @@ class TestFlowInvoke:
mock_long_term_memory.assert_called_once()
mock_external_memory.assert_called_once()
@patch.object(AgentExecutor, "kickoff")
@patch.object(CrewAgentExecutorFlow, "kickoff")
def test_invoke_failure_no_agent_finish(self, mock_kickoff, mock_dependencies):
"""Test invoke fails without AgentFinish."""
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="test", tool_input="test", text="action text"
)
@@ -442,10 +442,10 @@ class TestFlowInvoke:
with pytest.raises(RuntimeError, match="without reaching a final answer"):
executor.invoke(inputs)
@patch.object(AgentExecutor, "kickoff")
@patch.object(AgentExecutor, "_create_short_term_memory")
@patch.object(AgentExecutor, "_create_long_term_memory")
@patch.object(AgentExecutor, "_create_external_memory")
@patch.object(CrewAgentExecutorFlow, "kickoff")
@patch.object(CrewAgentExecutorFlow, "_create_short_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_long_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_external_memory")
def test_invoke_with_system_prompt(
self,
mock_external_memory,
@@ -459,7 +459,7 @@ class TestFlowInvoke:
"system": "System: {input}",
"user": "User: {input} {tool_names} {tools}",
}
executor = AgentExecutor(**mock_dependencies)
executor = CrewAgentExecutorFlow(**mock_dependencies)
def mock_kickoff_side_effect():
executor.state.current_answer = AgentFinish(


@@ -72,53 +72,62 @@ class ResearchResult(BaseModel):
@pytest.mark.vcr()
@pytest.mark.parametrize("verbose", [True, False])
def test_agent_kickoff_preserves_parameters(verbose):
"""Test that Agent.kickoff() uses the correct parameters from the Agent."""
def test_lite_agent_created_with_correct_parameters(monkeypatch, verbose):
"""Test that LiteAgent is created with the correct parameters when Agent.kickoff() is called."""
# Create a test agent with specific parameters
mock_llm = Mock(spec=LLM)
mock_llm.call.return_value = "Final Answer: Test response"
mock_llm.stop = []
from crewai.types.usage_metrics import UsageMetrics
mock_usage_metrics = UsageMetrics(
total_tokens=100,
prompt_tokens=50,
completion_tokens=50,
cached_prompt_tokens=0,
successful_requests=1,
)
mock_llm.get_token_usage_summary.return_value = mock_usage_metrics
llm = LLM(model="gpt-4o-mini")
custom_tools = [WebSearchTool(), CalculatorTool()]
max_iter = 10
max_execution_time = 300
agent = Agent(
role="Test Agent",
goal="Test Goal",
backstory="Test Backstory",
llm=mock_llm,
llm=llm,
tools=custom_tools,
max_iter=max_iter,
max_execution_time=max_execution_time,
verbose=verbose,
)
# Call kickoff and verify it works
result = agent.kickoff("Test query")
# Create a mock to capture the created LiteAgent
created_lite_agent = None
original_lite_agent = LiteAgent
# Verify the agent was configured correctly
assert agent.role == "Test Agent"
assert agent.goal == "Test Goal"
assert agent.backstory == "Test Backstory"
assert len(agent.tools) == 2
assert isinstance(agent.tools[0], WebSearchTool)
assert isinstance(agent.tools[1], CalculatorTool)
assert agent.max_iter == max_iter
assert agent.verbose == verbose
# Define a mock LiteAgent class that captures its arguments
class MockLiteAgent(original_lite_agent):
def __init__(self, **kwargs):
nonlocal created_lite_agent
created_lite_agent = kwargs
super().__init__(**kwargs)
# Verify kickoff returned a result
assert result is not None
assert result.raw is not None
# Patch the LiteAgent class
monkeypatch.setattr("crewai.agent.core.LiteAgent", MockLiteAgent)
# Call kickoff to create the LiteAgent
agent.kickoff("Test query")
# Verify all parameters were passed correctly
assert created_lite_agent is not None
assert created_lite_agent["role"] == "Test Agent"
assert created_lite_agent["goal"] == "Test Goal"
assert created_lite_agent["backstory"] == "Test Backstory"
assert created_lite_agent["llm"] == llm
assert len(created_lite_agent["tools"]) == 2
assert isinstance(created_lite_agent["tools"][0], WebSearchTool)
assert isinstance(created_lite_agent["tools"][1], CalculatorTool)
assert created_lite_agent["max_iterations"] == max_iter
assert created_lite_agent["max_execution_time"] == max_execution_time
assert created_lite_agent["verbose"] == verbose
assert created_lite_agent["response_format"] is None
# Test with a response_format
class TestResponse(BaseModel):
test_field: str
agent.kickoff("Test query", response_format=TestResponse)
assert created_lite_agent["response_format"] == TestResponse
@pytest.mark.vcr()
@@ -301,8 +310,7 @@ def verify_agent_parent_flow(result, agent, flow):
def test_sets_parent_flow_when_inside_flow():
"""Test that an Agent can be created and executed inside a Flow context."""
captured_event = None
captured_agent = None
mock_llm = Mock(spec=LLM)
mock_llm.call.return_value = "Test response"
@@ -335,17 +343,15 @@ def test_sets_parent_flow_when_inside_flow():
event_received = threading.Event()
@crewai_event_bus.on(LiteAgentExecutionStartedEvent)
def capture_event(source, event):
nonlocal captured_event
captured_event = event
def capture_agent(source, event):
nonlocal captured_agent
captured_agent = source
event_received.set()
result = flow.kickoff()
flow.kickoff()
assert event_received.wait(timeout=5), "Timeout waiting for agent execution event"
assert captured_event is not None
assert captured_event.agent_info["role"] == "Test Agent"
assert result is not None
assert captured_agent.parent_flow is flow
@pytest.mark.vcr()
@@ -367,14 +373,16 @@ def test_guardrail_is_called_using_string():
@crewai_event_bus.on(LLMGuardrailStartedEvent)
def capture_guardrail_started(source, event):
assert isinstance(source, Agent)
assert isinstance(source, LiteAgent)
assert source.original_agent == agent
with condition:
guardrail_events["started"].append(event)
condition.notify()
@crewai_event_bus.on(LLMGuardrailCompletedEvent)
def capture_guardrail_completed(source, event):
assert isinstance(source, Agent)
assert isinstance(source, LiteAgent)
assert source.original_agent == agent
with condition:
guardrail_events["completed"].append(event)
condition.notify()
@@ -675,151 +683,3 @@ def test_agent_kickoff_with_mcp_tools(mock_get_mcp_tools):
# Verify MCP tools were retrieved
mock_get_mcp_tools.assert_called_once_with("https://mcp.exa.ai/mcp?api_key=test_exa_key&profile=research")
# ============================================================================
# Tests for LiteAgent inside Flow (magic auto-async pattern)
# ============================================================================
from crewai.flow.flow import listen
@pytest.mark.vcr()
def test_lite_agent_inside_flow_sync():
"""Test that LiteAgent.kickoff() works magically inside a Flow.
This tests the "magic auto-async" pattern where calling agent.kickoff()
from within a Flow automatically detects the event loop and returns a
coroutine that the Flow framework awaits. Users don't need to use async/await.
"""
# Track execution
execution_log = []
class TestFlow(Flow):
@start()
def run_agent(self):
execution_log.append("flow_started")
agent = Agent(
role="Test Agent",
goal="Answer questions",
backstory="A helpful test assistant",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
# Magic: just call kickoff() normally - it auto-detects Flow context
result = agent.kickoff(messages="What is 2+2? Reply with just the number.")
execution_log.append("agent_completed")
return result
flow = TestFlow()
result = flow.kickoff()
# Verify the flow executed successfully
assert "flow_started" in execution_log
assert "agent_completed" in execution_log
assert result is not None
assert isinstance(result, LiteAgentOutput)
@pytest.mark.vcr()
def test_lite_agent_inside_flow_with_tools():
"""Test that LiteAgent with tools works correctly inside a Flow."""
class TestFlow(Flow):
@start()
def run_agent_with_tools(self):
agent = Agent(
role="Calculator Agent",
goal="Perform calculations",
backstory="A math expert",
llm=LLM(model="gpt-4o-mini"),
tools=[CalculatorTool()],
verbose=False,
)
result = agent.kickoff(messages="Calculate 10 * 5")
return result
flow = TestFlow()
result = flow.kickoff()
assert result is not None
assert isinstance(result, LiteAgentOutput)
assert result.raw is not None
@pytest.mark.vcr()
def test_multiple_agents_in_same_flow():
"""Test that multiple LiteAgents can run sequentially in the same Flow."""
class MultiAgentFlow(Flow):
@start()
def first_step(self):
agent1 = Agent(
role="First Agent",
goal="Greet users",
backstory="A friendly greeter",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
return agent1.kickoff(messages="Say hello")
@listen(first_step)
def second_step(self, first_result):
agent2 = Agent(
role="Second Agent",
goal="Say goodbye",
backstory="A polite farewell agent",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
return agent2.kickoff(messages="Say goodbye")
flow = MultiAgentFlow()
result = flow.kickoff()
assert result is not None
assert isinstance(result, LiteAgentOutput)
@pytest.mark.vcr()
def test_lite_agent_kickoff_async_inside_flow():
"""Test that Agent.kickoff_async() works correctly from async Flow methods."""
class AsyncAgentFlow(Flow):
@start()
async def async_agent_step(self):
agent = Agent(
role="Async Test Agent",
goal="Answer questions asynchronously",
backstory="An async helper",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
result = await agent.kickoff_async(messages="What is 3+3?")
return result
flow = AsyncAgentFlow()
result = flow.kickoff()
assert result is not None
assert isinstance(result, LiteAgentOutput)
@pytest.mark.vcr()
def test_lite_agent_standalone_still_works():
"""Test that LiteAgent.kickoff() still works normally outside of a Flow.
This verifies that the magic auto-async pattern doesn't break standalone usage
where there's no event loop running.
"""
agent = Agent(
role="Standalone Agent",
goal="Answer questions",
backstory="A helpful assistant",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
# This should work normally - no Flow, no event loop
result = agent.kickoff(messages="What is 5+5? Reply with just the number.")
assert result is not None
assert isinstance(result, LiteAgentOutput)
assert result.raw is not None

View File

@@ -1,119 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Test Agent. A helpful
test assistant\nYour personal goal is: Answer questions\nTo give my best complete
final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task:
What is 2+2? Reply with just the number.\n\nBegin! This is VERY important to
you, use the tools available and give your best Final Answer, your job depends
on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '673'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7b0HjL79y39EkUcMLrRhPFe3XGj\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444914,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: 4\",\n \"refusal\": null,\n \"annotations\": []\n },\n
\ \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n
\ \"usage\": {\n \"prompt_tokens\": 136,\n \"completion_tokens\": 13,\n
\ \"total_tokens\": 149,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_8bbc38b4db\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:55 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '857'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '341'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '358'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,255 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Calculator Agent. A math
expert\nYour personal goal is: Perform calculations\nYou ONLY have access to
the following tools, and should NEVER make up tools that are not listed here:\n\nTool
Name: calculate\nTool Arguments: {\n \"properties\": {\n \"expression\":
{\n \"title\": \"Expression\",\n \"type\": \"string\"\n }\n },\n \"required\":
[\n \"expression\"\n ],\n \"title\": \"CalculatorToolSchema\",\n \"type\":
\"object\",\n \"additionalProperties\": false\n}\nTool Description: Calculate
the result of a mathematical expression.\n\nIMPORTANT: Use the following format
in your response:\n\n```\nThought: you should always think about what to do\nAction:
the action to take, only one name of [calculate], just the name, exactly as
it''s written.\nAction Input: the input to the action, just a simple JSON object,
enclosed in curly braces, using \" to wrap keys and values.\nObservation: the
result of the action\n```\n\nOnce all necessary information is gathered, return
the following format:\n\n```\nThought: I now know the final answer\nFinal Answer:
the final answer to the original input question\n```"},{"role":"user","content":"\nCurrent
Task: Calculate 10 * 5\n\nBegin! This is VERY important to you, use the tools
available and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1403'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7avghVPSpszLmlbHpwDQlWDoD6O\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444909,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Thought: I need to calculate the expression
10 * 5.\\nAction: calculate\\nAction Input: {\\\"expression\\\":\\\"10 * 5\\\"}\\nObservation:
50\",\n \"refusal\": null,\n \"annotations\": []\n },\n
\ \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n
\ \"usage\": {\n \"prompt_tokens\": 291,\n \"completion_tokens\": 33,\n
\ \"total_tokens\": 324,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_c4585b5b9c\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:49 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '939'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '579'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '598'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Calculator Agent. A math
expert\nYour personal goal is: Perform calculations\nYou ONLY have access to
the following tools, and should NEVER make up tools that are not listed here:\n\nTool
Name: calculate\nTool Arguments: {\n \"properties\": {\n \"expression\":
{\n \"title\": \"Expression\",\n \"type\": \"string\"\n }\n },\n \"required\":
[\n \"expression\"\n ],\n \"title\": \"CalculatorToolSchema\",\n \"type\":
\"object\",\n \"additionalProperties\": false\n}\nTool Description: Calculate
the result of a mathematical expression.\n\nIMPORTANT: Use the following format
in your response:\n\n```\nThought: you should always think about what to do\nAction:
the action to take, only one name of [calculate], just the name, exactly as
it''s written.\nAction Input: the input to the action, just a simple JSON object,
enclosed in curly braces, using \" to wrap keys and values.\nObservation: the
result of the action\n```\n\nOnce all necessary information is gathered, return
the following format:\n\n```\nThought: I now know the final answer\nFinal Answer:
the final answer to the original input question\n```"},{"role":"user","content":"\nCurrent
Task: Calculate 10 * 5\n\nBegin! This is VERY important to you, use the tools
available and give your best Final Answer, your job depends on it!\n\nThought:"},{"role":"assistant","content":"Thought:
I need to calculate the expression 10 * 5.\nAction: calculate\nAction Input:
{\"expression\":\"10 * 5\"}\nObservation: The result of 10 * 5 is 50"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1591'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7avDhDZCLvv8v2dh8ZQRrLdci6A\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444909,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Thought: I now know the final answer.\\nFinal
Answer: 50\",\n \"refusal\": null,\n \"annotations\": []\n },\n
\ \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n
\ \"usage\": {\n \"prompt_tokens\": 337,\n \"completion_tokens\": 14,\n
\ \"total_tokens\": 351,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_c4585b5b9c\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:50 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '864'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '429'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '457'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,119 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Async Test Agent. An async
helper\nYour personal goal is: Answer questions asynchronously\nTo give my best
complete final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task:
What is 3+3?\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '657'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7atOGxtc4y3oYNI62WiQ0Vogsdv\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444907,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: The sum of 3 + 3 is 6. Therefore, the outcome is that if you add three
and three together, you will arrive at the total of six.\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
131,\n \"completion_tokens\": 46,\n \"total_tokens\": 177,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:48 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '983'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '944'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1192'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,119 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Standalone Agent. A helpful
assistant\nYour personal goal is: Answer questions\nTo give my best complete
final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task:
What is 5+5? Reply with just the number.\n\nBegin! This is VERY important to
you, use the tools available and give your best Final Answer, your job depends
on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '674'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7azhPwUHQ0p5tdhxSAmLPoE8UgC\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444913,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: 10\",\n \"refusal\": null,\n \"annotations\": []\n },\n
\ \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n
\ \"usage\": {\n \"prompt_tokens\": 136,\n \"completion_tokens\": 13,\n
\ \"total_tokens\": 149,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:54 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '858'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '455'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '583'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,239 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are First Agent. A friendly
greeter\nYour personal goal is: Greet users\nTo give my best complete final
answer to the task respond using the exact following format:\n\nThought: I now
can give a great answer\nFinal Answer: Your final answer must be the great and
the most complete as possible, it must be outcome described.\n\nI MUST use these
formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say
hello\n\nBegin! This is VERY important to you, use the tools available and give
your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '632'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-CyRKzgODZ9yn3F9OkaXsscLk2Ln3N\",\n \"object\":
\"chat.completion\",\n \"created\": 1768520801,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: Hello! Welcome! I'm so glad to see you here. If you need any assistance
or have any questions, feel free to ask. Have a wonderful day!\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
127,\n \"completion_tokens\": 43,\n \"total_tokens\": 170,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_c4585b5b9c\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 23:46:42 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '990'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '880'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1160'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Second Agent. A polite
farewell agent\nYour personal goal is: Say goodbye\nTo give my best complete
final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task:
Say goodbye\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '640'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-CyRL1Ua2PkK5xXPp3KeF0AnGAk3JP\",\n \"object\":
\"chat.completion\",\n \"created\": 1768520803,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: As we reach the end of our conversation, I want to express my gratitude
for the time we've shared. It's been a pleasure assisting you, and I hope
you found our interaction helpful and enjoyable. Remember, whenever you need
assistance, I'm just a message away. Wishing you all the best in your future
endeavors. Goodbye and take care!\",\n \"refusal\": null,\n \"annotations\":
[]\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 126,\n \"completion_tokens\":
79,\n \"total_tokens\": 205,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 23:46:44 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '1189'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '1363'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1605'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,75 +0,0 @@
interactions:
- request:
body: '{"contents": [{"parts": [{"text": "\nCurrent Task: What is the capital
of Japan?\n\nThis is the expected criteria for your final answer: The capital
of Japan\nyou MUST return the actual complete content as the final answer, not
a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "role":
"user"}], "systemInstruction": {"parts": [{"text": "You are Research Assistant.
You are a helpful research assistant.\nYour personal goal is: Find information
about the capital of Japan\nTo give my best complete final answer to the task
respond using the exact following format:\n\nThought: I now can give a great
answer\nFinal Answer: Your final answer must be the great and the most complete
as possible, it must be outcome described.\n\nI MUST use these formats, my job
depends on it!"}], "role": "user"}, "generationConfig": {"stopSequences": ["\nObservation:"]}}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- '*/*'
accept-encoding:
- ACCEPT-ENCODING-XXX
connection:
- keep-alive
content-length:
- '952'
content-type:
- application/json
host:
- aiplatform.googleapis.com
x-goog-api-client:
- google-genai-sdk/1.59.0 gl-python/3.13.3
x-goog-api-key:
- X-GOOG-API-KEY-XXX
method: POST
uri: https://aiplatform.googleapis.com/v1/publishers/google/models/gemini-2.0-flash-exp:generateContent
response:
body:
string: "{\n \"candidates\": [\n {\n \"content\": {\n \"role\":
\"model\",\n \"parts\": [\n {\n \"text\": \"The
capital of Japan is Tokyo.\\nFinal Answer: Tokyo\\n\"\n }\n ]\n
\ },\n \"finishReason\": \"STOP\",\n \"avgLogprobs\": -0.017845841554495003\n
\ }\n ],\n \"usageMetadata\": {\n \"promptTokenCount\": 163,\n \"candidatesTokenCount\":
13,\n \"totalTokenCount\": 176,\n \"trafficType\": \"ON_DEMAND\",\n
\ \"promptTokensDetails\": [\n {\n \"modality\": \"TEXT\",\n
\ \"tokenCount\": 163\n }\n ],\n \"candidatesTokensDetails\":
[\n {\n \"modality\": \"TEXT\",\n \"tokenCount\": 13\n
\ }\n ]\n },\n \"modelVersion\": \"gemini-2.0-flash-exp\",\n \"createTime\":
\"2026-01-15T22:27:38.066749Z\",\n \"responseId\": \"2mlpab2JBNOFidsPh5GigQs\"\n}\n"
headers:
Alt-Svc:
- h3=":443"; ma=2592000,h3-29=":443"; ma=2592000
Content-Type:
- application/json; charset=UTF-8
Date:
- Thu, 15 Jan 2026 22:27:38 GMT
Server:
- scaffolding on HTTPServer2
Transfer-Encoding:
- chunked
Vary:
- Origin
- X-Origin
- Referer
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
X-Frame-Options:
- X-FRAME-OPTIONS-XXX
X-XSS-Protection:
- '0'
content-length:
- '786'
status:
code: 200
message: OK
version: 1

File diff suppressed because one or more lines are too long

View File

@@ -1,528 +1,456 @@
interactions:
- request:
body: "{\"messages\":[{\"role\":\"system\",\"content\":\"You are Guardrail Agent.
You are a expert at validating the output of a task. By providing effective
feedback if the output is not valid.\\nYour personal goal is: Validate the output
of the task\\nTo give my best complete final answer to the task respond using
the exact following format:\\n\\nThought: I now can give a great answer\\nFinal
Answer: Your final answer must be the great and the most complete as possible,
it must be outcome described.\\n\\nI MUST use these formats, my job depends
on it!\"},{\"role\":\"user\",\"content\":\"\\nCurrent Task: \\n Ensure
the following task result complies with the given guardrail.\\n\\n Task
result:\\n \\n Lorem Ipsum is simply dummy text of the printing
and typesetting industry. Lorem Ipsum has been the industry's standard dummy
text ever\\n \\n\\n Guardrail:\\n Ensure the result has
less than 10 words\\n\\n Your task:\\n - Confirm if the Task result
complies with the guardrail.\\n - If not, provide clear feedback explaining
what is wrong (e.g., by how much it violates the rule, or what specific part
fails).\\n - Focus only on identifying issues \u2014 do not propose corrections.\\n
\ - If the Task result complies with the guardrail, saying that is valid\\n
\ \\n\\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\\n\\nThought:\"}],\"model\":\"gpt-4o\"}"
body: '{"trace_id": "00000000-0000-0000-0000-000000000000", "execution_type": "crew", "user_identifier": null, "execution_context": {"crew_fingerprint": null, "crew_name": "Unknown Crew", "flow_name": null, "crewai_version": "1.3.0", "privacy_level": "standard"}, "execution_metadata": {"expected_duration_estimate": 300, "agent_count": 0, "task_count": 0, "flow_method_count": 0, "execution_started_at": "2025-11-05T22:19:56.074812+00:00"}}'
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate, zstd
Connection:
- keep-alive
Content-Length:
- '434'
Content-Type:
- application/json
User-Agent:
- X-USER-AGENT-XXX
- CrewAI-CLI/1.3.0
X-Crewai-Version:
- 1.3.0
method: POST
uri: https://app.crewai.com/crewai_plus/api/v1/tracing/batches
response:
body:
string: '{"error":"bad_credentials","message":"Bad credentials"}'
headers:
Connection:
- keep-alive
Content-Length:
- '55'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 05 Nov 2025 22:19:56 GMT
cache-control:
- no-store
content-security-policy:
- 'default-src ''self'' *.app.crewai.com app.crewai.com; script-src ''self'' ''unsafe-inline'' *.app.crewai.com app.crewai.com https://cdn.jsdelivr.net/npm/apexcharts https://www.gstatic.com https://run.pstmn.io https://apis.google.com https://apis.google.com/js/api.js https://accounts.google.com https://accounts.google.com/gsi/client https://cdnjs.cloudflare.com/ajax/libs/normalize/8.0.1/normalize.min.css.map https://*.google.com https://docs.google.com https://slides.google.com https://js.hs-scripts.com https://js.sentry-cdn.com https://browser.sentry-cdn.com https://www.googletagmanager.com https://js-na1.hs-scripts.com https://js.hubspot.com http://js-na1.hs-scripts.com https://bat.bing.com https://cdn.amplitude.com https://cdn.segment.com https://d1d3n03t5zntha.cloudfront.net/ https://descriptusercontent.com https://edge.fullstory.com https://googleads.g.doubleclick.net https://js.hs-analytics.net https://js.hs-banner.com https://js.hsadspixel.net https://js.hscollectedforms.net
https://js.usemessages.com https://snap.licdn.com https://static.cloudflareinsights.com https://static.reo.dev https://www.google-analytics.com https://share.descript.com/; style-src ''self'' ''unsafe-inline'' *.app.crewai.com app.crewai.com https://cdn.jsdelivr.net/npm/apexcharts; img-src ''self'' data: *.app.crewai.com app.crewai.com https://zeus.tools.crewai.com https://dashboard.tools.crewai.com https://cdn.jsdelivr.net https://forms.hsforms.com https://track.hubspot.com https://px.ads.linkedin.com https://px4.ads.linkedin.com https://www.google.com https://www.google.com.br; font-src ''self'' data: *.app.crewai.com app.crewai.com; connect-src ''self'' *.app.crewai.com app.crewai.com https://zeus.tools.crewai.com https://connect.useparagon.com/ https://zeus.useparagon.com/* https://*.useparagon.com/* https://run.pstmn.io https://connect.tools.crewai.com/ https://*.sentry.io https://www.google-analytics.com https://edge.fullstory.com https://rs.fullstory.com https://api.hubspot.com
https://forms.hscollectedforms.net https://api.hubapi.com https://px.ads.linkedin.com https://px4.ads.linkedin.com https://google.com/pagead/form-data/16713662509 https://google.com/ccm/form-data/16713662509 https://www.google.com/ccm/collect https://worker-actionkit.tools.crewai.com https://api.reo.dev; frame-src ''self'' *.app.crewai.com app.crewai.com https://connect.useparagon.com/ https://zeus.tools.crewai.com https://zeus.useparagon.com/* https://connect.tools.crewai.com/ https://docs.google.com https://drive.google.com https://slides.google.com https://accounts.google.com https://*.google.com https://app.hubspot.com/ https://td.doubleclick.net https://www.googletagmanager.com/ https://www.youtube.com https://share.descript.com'
expires:
- '0'
permissions-policy:
- camera=(), microphone=(self), geolocation=()
pragma:
- no-cache
referrer-policy:
- strict-origin-when-cross-origin
strict-transport-security:
- max-age=63072000; includeSubDomains
vary:
- Accept
x-content-type-options:
- nosniff
x-frame-options:
- SAMEORIGIN
x-permitted-cross-domain-policies:
- none
x-request-id:
- 230c6cb5-92c7-448d-8c94-e5548a9f4259
x-runtime:
- '0.073220'
x-xss-protection:
- 1; mode=block
status:
code: 401
message: Unauthorized
- request:
body: '{"messages":[{"role":"system","content":"You are Guardrail Agent. You are a expert at validating the output of a task. By providing effective feedback if the output is not valid.\nYour personal goal is: Validate the output of the task\n\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!Ensure your final answer strictly adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\": {\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\": {\n \"properties\": {\n \"valid\": {\n \"description\": \"Whether the task output complies with the guardrail\",\n \"title\": \"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\": {\n \"anyOf\":
[\n {\n \"type\": \"string\"\n },\n {\n \"type\": \"null\"\n }\n ],\n \"default\": null,\n \"description\": \"A feedback about the task output if it is not valid\",\n \"title\": \"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\": \"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\": false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python."},{"role":"user","content":"\n Ensure the following task result complies with the given guardrail.\n\n Task result:\n \n Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry''s standard dummy text ever\n \n\n Guardrail:\n Ensure
the result has less than 10 words\n\n Your task:\n - Confirm if the Task result complies with the guardrail.\n - If not, provide clear feedback explaining what is wrong (e.g., by how much it violates the rule, or what specific part fails).\n - Focus only on identifying issues — do not propose corrections.\n - If the Task result complies with the guardrail, saying that is valid\n "}],"model":"gpt-4o"}'
headers:
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '1467'
- '2452'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
- MacOS
x-stainless-package-version:
- 1.83.0
- 1.109.1
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
- '600'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
- 3.12.9
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7yHRYTZi8yzRbcODnKr92keLKCb\",\n \"object\":
\"chat.completion\",\n \"created\": 1768446357,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"The task result provided has more than
10 words. I will count the words to verify this.\\n\\nThe task result is the
following text:\\n\\\"Lorem Ipsum is simply dummy text of the printing and
typesetting industry. Lorem Ipsum has been the industry's standard dummy text
ever\\\"\\n\\nCounting the words:\\n\\n1. Lorem \\n2. Ipsum \\n3. is \\n4.
simply \\n5. dummy \\n6. text \\n7. of \\n8. the \\n9. printing \\n10. and
\\n11. typesetting \\n12. industry. \\n13. Lorem \\n14. Ipsum \\n15. has \\n16.
been \\n17. the \\n18. industry's \\n19. standard \\n20. dummy \\n21. text
\\n22. ever\\n\\nThe total word count is 22.\\n\\nThought: I now can give
a great answer\\nFinal Answer: The task result does not comply with the guardrail.
It contains 22 words, which exceeds the limit of 10 words.\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
285,\n \"completion_tokens\": 195,\n \"total_tokens\": 480,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_deacdd5f6f\"\n}\n"
string: "{\n \"id\": \"chatcmpl-CYg96Riy2RJRxnBHvoROukymP9wvs\",\n \"object\": \"chat.completion\",\n \"created\": 1762381196,\n \"model\": \"gpt-4o-2024-08-06\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"Thought: I need to check if the task result meets the requirement of having less than 10 words.\\n\\nFinal Answer: {\\n \\\"valid\\\": false,\\n \\\"feedback\\\": \\\"The task result contains more than 10 words, violating the guardrail. The text provided contains about 21 words.\\\"\\n}\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 489,\n \"completion_tokens\": 61,\n \"total_tokens\": 550,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\"\
: 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": \"default\",\n \"system_fingerprint\": \"fp_cbf1785567\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
- REDACTED-RAY
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 03:05:59 GMT
- Wed, 05 Nov 2025 22:19:58 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
- __cf_bm=REDACTED; path=/; expires=Wed, 05-Nov-25 22:49:58 GMT; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
- _cfuvid=REDACTED; path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Strict-Transport-Security:
- STS-XXX
- max-age=31536000; includeSubDomains; preload
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
- nosniff
access-control-expose-headers:
- ACCESS-CONTROL-XXX
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '1557'
openai-organization:
- OPENAI-ORG-XXX
- user-hortuttj2f3qtmxyik2zxf4q
openai-processing-ms:
- '2130'
- '2201'
openai-project:
- OPENAI-PROJECT-XXX
- proj_fL4UBWR1CMpAAdgzaSKqsVvA
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '2147'
- '2401'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
- '500'
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
- '30000'
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
- '499'
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
- '29439'
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
- 120ms
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
- 1.122s
x-request-id:
- X-REQUEST-ID-XXX
- req_REDACTED
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"Ensure your final answer strictly
adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\":
{\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\":
{\n \"properties\": {\n \"valid\": {\n \"description\":
\"Whether the task output complies with the guardrail\",\n \"title\":
\"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\":
{\n \"anyOf\": [\n {\n \"type\": \"string\"\n },\n {\n \"type\":
\"null\"\n }\n ],\n \"default\": null,\n \"description\":
\"A feedback about the task output if it is not valid\",\n \"title\":
\"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\":
\"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\":
false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output.
Ensure the final output does not include any code block markers like ```json
or ```python."},{"role":"user","content":"The task result does not comply with
the guardrail. It contains 22 words, which exceeds the limit of 10 words."}],"model":"gpt-4o","response_format":{"type":"json_schema","json_schema":{"schema":{"properties":{"valid":{"description":"Whether
the task output complies with the guardrail","title":"Valid","type":"boolean"},"feedback":{"anyOf":[{"type":"string"},{"type":"null"}],"description":"A
feedback about the task output if it is not valid","title":"Feedback"}},"required":["valid","feedback"],"title":"LLMGuardrailResult","type":"object","additionalProperties":false},"name":"LLMGuardrailResult","strict":true}},"stream":false}'
body: '{"messages":[{"role":"system","content":"Ensure your final answer strictly adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\": {\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\": {\n \"properties\": {\n \"valid\": {\n \"description\": \"Whether the task output complies with the guardrail\",\n \"title\": \"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\": {\n \"anyOf\": [\n {\n \"type\": \"string\"\n },\n {\n \"type\": \"null\"\n }\n ],\n \"default\": null,\n \"description\": \"A feedback about the task output if it is not valid\",\n \"title\": \"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\": \"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\":
false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python."},{"role":"user","content":"{\n \"valid\": false,\n \"feedback\": \"The task result contains more than 10 words, violating the guardrail. The text provided contains about 21 words.\"\n}"}],"model":"gpt-4o","response_format":{"type":"json_schema","json_schema":{"schema":{"properties":{"valid":{"description":"Whether the task output complies with the guardrail","title":"Valid","type":"boolean"},"feedback":{"anyOf":[{"type":"string"},{"type":"null"}],"description":"A feedback about the task output if it is not valid","title":"Feedback"}},"required":["valid","feedback"],"title":"LLMGuardrailResult","type":"object","additionalProperties":false},"name":"LLMGuardrailResult","strict":true}},"stream":false}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '1835'
- '1884'
content-type:
- application/json
cookie:
- COOKIE-XXX
- __cf_bm=REDACTED; _cfuvid=REDACTED
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
- arm64
x-stainless-async:
- 'false'
x-stainless-helper-method:
- beta.chat.completions.parse
- chat.completions.parse
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
- MacOS
x-stainless-package-version:
- 1.83.0
- 1.109.1
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
- '600'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
- 3.12.9
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7yJiPCk4fXuogyT5e8XeGRLCSf8\",\n \"object\":
\"chat.completion\",\n \"created\": 1768446359,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"{\\\"valid\\\":false,\\\"feedback\\\":\\\"The
task output exceeds the word limit of 10 words by containing 22 words.\\\"}\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
363,\n \"completion_tokens\": 25,\n \"total_tokens\": 388,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_a0e9480a2f\"\n}\n"
string: "{\n \"id\": \"chatcmpl-CYg98QlZ8NTrQ69676MpXXyCoZJT8\",\n \"object\": \"chat.completion\",\n \"created\": 1762381198,\n \"model\": \"gpt-4o-2024-08-06\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"{\\\"valid\\\":false,\\\"feedback\\\":\\\"The task result contains more than 10 words, violating the guardrail. The text provided contains about 21 words.\\\"}\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 374,\n \"completion_tokens\": 32,\n \"total_tokens\": 406,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n\
\ \"service_tier\": \"default\",\n \"system_fingerprint\": \"fp_cbf1785567\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
- REDACTED-RAY
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 03:05:59 GMT
- Wed, 05 Nov 2025 22:19:59 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
- max-age=31536000; includeSubDomains; preload
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
- nosniff
access-control-expose-headers:
- ACCESS-CONTROL-XXX
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '913'
openai-organization:
- OPENAI-ORG-XXX
- user-hortuttj2f3qtmxyik2zxf4q
openai-processing-ms:
- '488'
- '419'
openai-project:
- OPENAI-PROJECT-XXX
- proj_fL4UBWR1CMpAAdgzaSKqsVvA
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '507'
- '432'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
- '500'
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
- '30000'
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
- '499'
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
- '29702'
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
- 120ms
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
- 596ms
x-request-id:
- X-REQUEST-ID-XXX
- req_REDACTED
status:
code: 200
message: OK
- request:
body: "{\"messages\":[{\"role\":\"system\",\"content\":\"You are Guardrail Agent.
You are a expert at validating the output of a task. By providing effective
feedback if the output is not valid.\\nYour personal goal is: Validate the output
of the task\\nTo give my best complete final answer to the task respond using
the exact following format:\\n\\nThought: I now can give a great answer\\nFinal
Answer: Your final answer must be the great and the most complete as possible,
it must be outcome described.\\n\\nI MUST use these formats, my job depends
on it!\"},{\"role\":\"user\",\"content\":\"\\nCurrent Task: \\n Ensure
the following task result complies with the given guardrail.\\n\\n Task
result:\\n \\n Lorem Ipsum is simply dummy text of the printing
and typesetting industry. Lorem Ipsum has been the industry's standard dummy
text ever\\n \\n\\n Guardrail:\\n Ensure the result has
less than 500 words\\n\\n Your task:\\n - Confirm if the Task
result complies with the guardrail.\\n - If not, provide clear feedback
explaining what is wrong (e.g., by how much it violates the rule, or what specific
part fails).\\n - Focus only on identifying issues \u2014 do not propose
corrections.\\n - If the Task result complies with the guardrail, saying
that is valid\\n \\n\\nBegin! This is VERY important to you, use the
tools available and give your best Final Answer, your job depends on it!\\n\\nThought:\"}],\"model\":\"gpt-4o\"}"
body: '{"messages":[{"role":"system","content":"You are Guardrail Agent. You are a expert at validating the output of a task. By providing effective feedback if the output is not valid.\nYour personal goal is: Validate the output of the task\n\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!Ensure your final answer strictly adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\": {\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\": {\n \"properties\": {\n \"valid\": {\n \"description\": \"Whether the task output complies with the guardrail\",\n \"title\": \"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\": {\n \"anyOf\":
[\n {\n \"type\": \"string\"\n },\n {\n \"type\": \"null\"\n }\n ],\n \"default\": null,\n \"description\": \"A feedback about the task output if it is not valid\",\n \"title\": \"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\": \"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\": false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python."},{"role":"user","content":"\n Ensure the following task result complies with the given guardrail.\n\n Task result:\n \n Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry''s standard dummy text ever\n \n\n Guardrail:\n Ensure
the result has less than 500 words\n\n Your task:\n - Confirm if the Task result complies with the guardrail.\n - If not, provide clear feedback explaining what is wrong (e.g., by how much it violates the rule, or what specific part fails).\n - Focus only on identifying issues — do not propose corrections.\n - If the Task result complies with the guardrail, saying that is valid\n "}],"model":"gpt-4o"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '1468'
- '2453'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
- MacOS
x-stainless-package-version:
- 1.83.0
- 1.109.1
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
- '600'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
- 3.12.9
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7yKa0rmi2YoTLpyXt9hjeLt2rTI\",\n \"object\":
\"chat.completion\",\n \"created\": 1768446360,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"First, I'll count the number of words
in the Task result to ensure it complies with the guardrail. \\n\\nThe Task
result is: \\\"Lorem Ipsum is simply dummy text of the printing and typesetting
industry. Lorem Ipsum has been the industry's standard dummy text ever.\\\"\\n\\nBy
counting the words: \\n1. Lorem\\n2. Ipsum\\n3. is\\n4. simply\\n5. dummy\\n6.
text\\n7. of\\n8. the\\n9. printing\\n10. and\\n11. typesetting\\n12. industry\\n13.
Lorem\\n14. Ipsum\\n15. has\\n16. been\\n17. the\\n18. industry's\\n19. standard\\n20.
dummy\\n21. text\\n22. ever\\n\\nThere are 22 words total in the Task result.\\n\\nI
need to verify if the count of 22 words is less than the guardrail limit of
500 words.\\n\\nThought: I now can give a great answer\\nFinal Answer: The
Task result complies with the guardrail as it contains 22 words, which is
less than the 500-word limit. Therefore, the output is valid.\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
285,\n \"completion_tokens\": 227,\n \"total_tokens\": 512,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_deacdd5f6f\"\n}\n"
string: "{\n \"id\": \"chatcmpl-CYgBMV6fu7EvV2BqzMdJaKyLAg1WW\",\n \"object\": \"chat.completion\",\n \"created\": 1762381336,\n \"model\": \"gpt-4o-2024-08-06\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"Thought: I now can give a great answer\\nFinal Answer: {\\\"valid\\\": true, \\\"feedback\\\": null}\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 489,\n \"completion_tokens\": 23,\n \"total_tokens\": 512,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": \"default\",\n \"system_fingerprint\"\
: \"fp_cbf1785567\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
- REDACTED-RAY
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 03:06:02 GMT
- Wed, 05 Nov 2025 22:22:16 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
- __cf_bm=REDACTED; path=/; expires=Wed, 05-Nov-25 22:52:16 GMT; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
- _cfuvid=REDACTED; path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Strict-Transport-Security:
- STS-XXX
- max-age=31536000; includeSubDomains; preload
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
- nosniff
access-control-expose-headers:
- ACCESS-CONTROL-XXX
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '1668'
openai-organization:
- OPENAI-ORG-XXX
- user-hortuttj2f3qtmxyik2zxf4q
openai-processing-ms:
- '2502'
- '327'
openai-project:
- OPENAI-PROJECT-XXX
- proj_fL4UBWR1CMpAAdgzaSKqsVvA
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '2522'
- '372'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
- '500'
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
- '30000'
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
- '499'
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
- '29438'
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
- 120ms
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
- 1.124s
x-request-id:
- X-REQUEST-ID-XXX
- req_REDACTED
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"Ensure your final answer strictly
adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\":
{\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\":
{\n \"properties\": {\n \"valid\": {\n \"description\":
\"Whether the task output complies with the guardrail\",\n \"title\":
\"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\":
{\n \"anyOf\": [\n {\n \"type\": \"string\"\n },\n {\n \"type\":
\"null\"\n }\n ],\n \"default\": null,\n \"description\":
\"A feedback about the task output if it is not valid\",\n \"title\":
\"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\":
\"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\":
false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output.
Ensure the final output does not include any code block markers like ```json
or ```python."},{"role":"user","content":"The Task result complies with the
guardrail as it contains 22 words, which is less than the 500-word limit. Therefore,
the output is valid."}],"model":"gpt-4o","response_format":{"type":"json_schema","json_schema":{"schema":{"properties":{"valid":{"description":"Whether
the task output complies with the guardrail","title":"Valid","type":"boolean"},"feedback":{"anyOf":[{"type":"string"},{"type":"null"}],"description":"A
feedback about the task output if it is not valid","title":"Feedback"}},"required":["valid","feedback"],"title":"LLMGuardrailResult","type":"object","additionalProperties":false},"name":"LLMGuardrailResult","strict":true}},"stream":false}'
body: '{"messages":[{"role":"system","content":"Ensure your final answer strictly adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\": {\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\": {\n \"properties\": {\n \"valid\": {\n \"description\": \"Whether the task output complies with the guardrail\",\n \"title\": \"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\": {\n \"anyOf\": [\n {\n \"type\": \"string\"\n },\n {\n \"type\": \"null\"\n }\n ],\n \"default\": null,\n \"description\": \"A feedback about the task output if it is not valid\",\n \"title\": \"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\": \"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\":
false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python."},{"role":"user","content":"{\"valid\": true, \"feedback\": null}"}],"model":"gpt-4o","response_format":{"type":"json_schema","json_schema":{"schema":{"properties":{"valid":{"description":"Whether the task output complies with the guardrail","title":"Valid","type":"boolean"},"feedback":{"anyOf":[{"type":"string"},{"type":"null"}],"description":"A feedback about the task output if it is not valid","title":"Feedback"}},"required":["valid","feedback"],"title":"LLMGuardrailResult","type":"object","additionalProperties":false},"name":"LLMGuardrailResult","strict":true}},"stream":false}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '1864'
- '1762'
content-type:
- application/json
cookie:
- COOKIE-XXX
- __cf_bm=REDACTED; _cfuvid=REDACTED
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
- arm64
x-stainless-async:
- 'false'
x-stainless-helper-method:
- beta.chat.completions.parse
- chat.completions.parse
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
- MacOS
x-stainless-package-version:
- 1.83.0
- 1.109.1
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
- '600'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
- 3.12.9
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7yMAjNYSCz2foZPEcSVCuapzF8y\",\n \"object\":
\"chat.completion\",\n \"created\": 1768446362,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"{\\\"valid\\\":true,\\\"feedback\\\":null}\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
369,\n \"completion_tokens\": 9,\n \"total_tokens\": 378,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_a0e9480a2f\"\n}\n"
string: "{\n \"id\": \"chatcmpl-CYgBMU20R45qGGaLN6vNAmW1NR4R6\",\n \"object\": \"chat.completion\",\n \"created\": 1762381336,\n \"model\": \"gpt-4o-2024-08-06\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"{\\\"valid\\\":true,\\\"feedback\\\":null}\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 347,\n \"completion_tokens\": 9,\n \"total_tokens\": 356,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": \"default\",\n \"system_fingerprint\": \"fp_cbf1785567\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
- REDACTED-RAY
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 03:06:03 GMT
- Wed, 05 Nov 2025 22:22:17 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
- max-age=31536000; includeSubDomains; preload
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
- nosniff
access-control-expose-headers:
- ACCESS-CONTROL-XXX
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '837'
openai-organization:
- OPENAI-ORG-XXX
- user-hortuttj2f3qtmxyik2zxf4q
openai-processing-ms:
- '413'
- '1081'
openai-project:
- OPENAI-PROJECT-XXX
- proj_fL4UBWR1CMpAAdgzaSKqsVvA
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '650'
- '1241'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
- '500'
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
- '30000'
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
- '499'
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
- '29478'
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
- 120ms
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
- 1.042s
x-request-id:
- X-REQUEST-ID-XXX
- req_REDACTED
status:
code: 200
message: OK
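The YAML interactions above are VCR cassettes: recorded HTTP exchanges that the guardrail tests replay instead of calling api.openai.com. A minimal sketch of how such a cassette is typically consumed with vcrpy / pytest-recording; the cassette path and test name below are illustrative assumptions, not taken from this diff:

```python
import pytest
import vcr

@pytest.mark.vcr()  # pytest-recording: replays the cassette named after this test
def test_guardrail_with_recorded_responses():
    ...  # HTTP calls to api.openai.com are answered from the recorded interactions

def replay_explicitly():
    # Equivalent replay without the plugin; the cassette path is an assumed example.
    with vcr.use_cassette("tests/cassettes/llm_guardrail.yaml", filter_headers=["authorization"]):
        ...
```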

View File

@@ -728,39 +728,3 @@ def test_google_streaming_returns_usage_metrics():
assert result.token_usage.prompt_tokens > 0
assert result.token_usage.completion_tokens > 0
assert result.token_usage.successful_requests >= 1
@pytest.mark.vcr()
def test_google_express_mode_works() -> None:
"""
Test Google Vertex AI Express mode with API key authentication.
This tests Vertex AI Express mode (aiplatform.googleapis.com) with API key
authentication.
"""
with patch.dict(os.environ, {"GOOGLE_GENAI_USE_VERTEXAI": "true"}):
agent = Agent(
role="Research Assistant",
goal="Find information about the capital of Japan",
backstory="You are a helpful research assistant.",
llm=LLM(
model="gemini/gemini-2.0-flash-exp",
),
verbose=True,
)
task = Task(
description="What is the capital of Japan?",
expected_output="The capital of Japan",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result.token_usage is not None
assert result.token_usage.total_tokens > 0
assert result.token_usage.prompt_tokens > 0
assert result.token_usage.completion_tokens > 0
assert result.token_usage.successful_requests >= 1

View File

@@ -1202,8 +1202,7 @@ def test_complex_and_or_branching():
)
assert execution_order.index("branch_2b") > min_branch_1_index
# Final should be after both 2a and 2b
# Final should be last and after both 2a and 2b
assert execution_order[-1] == "final"
assert execution_order.index("final") > execution_order.index("branch_2a")
assert execution_order.index("final") > execution_order.index("branch_2b")
@@ -1256,11 +1255,10 @@ def test_conditional_router_paths_exclusivity():
def test_state_consistency_across_parallel_branches():
"""Test that state remains consistent when branches execute in parallel.
"""Test that state remains consistent when branches execute sequentially.
Note: Branches triggered by the same parent execute in parallel for efficiency.
Thread-safe state access via StateProxy ensures no race conditions.
We check the execution order to ensure the branches execute in parallel.
Note: Branches triggered by the same parent execute sequentially, not in parallel.
This ensures predictable state mutations and prevents race conditions.
"""
execution_order = []
@@ -1297,14 +1295,12 @@ def test_state_consistency_across_parallel_branches():
flow = StateConsistencyFlow()
flow.kickoff()
assert "branch_a" in execution_order
assert "branch_b" in execution_order
assert "verify_state" in execution_order
# Branches execute sequentially, so branch_a runs first, then branch_b
assert flow.state["branch_a_value"] == 10 # Sees initial value
assert flow.state["branch_b_value"] == 11 # Sees value after branch_a increment
assert flow.state["branch_a_value"] is not None
assert flow.state["branch_b_value"] is not None
assert flow.state["counter"] == 16
# Final counter should reflect both increments sequentially
assert flow.state["counter"] == 16 # 10 + 1 + 5
def test_deeply_nested_conditions():
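The docstring and assertion changes above describe listeners on the same parent running one after the other, so shared state mutations stay predictable. A minimal sketch of that pattern, assuming the usual CrewAI flow imports and the unstructured dict state these tests use; names and values mirror the asserts above rather than this repository's exact test code:

```python
from crewai.flow.flow import Flow, listen, start

class TwoBranchFlow(Flow):
    @start()
    def begin(self):
        self.state["counter"] = 10

    @listen(begin)
    def branch_a(self):
        # Reads the counter before incrementing it.
        self.state["branch_a_value"] = self.state["counter"]
        self.state["counter"] += 1

    @listen(begin)
    def branch_b(self):
        # If branch_a ran first, this sees 11; the final counter is 16 either way.
        self.state["branch_b_value"] = self.state["counter"]
        self.state["counter"] += 5

flow = TwoBranchFlow()
flow.kickoff()
print(flow.state["counter"])  # 16 (10 + 1 + 5)
```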

View File

@@ -247,4 +247,4 @@ def test_persistence_with_base_model(tmp_path):
assert message.role == "user"
assert message.type == "text"
assert message.content == "Hello, World!"
assert isinstance(flow.state._unwrap(), State)
assert isinstance(flow.state, State)

View File

@@ -737,33 +737,6 @@ def test_native_provider_falls_back_to_litellm_when_not_in_supported_list():
assert llm.model == "groq/llama-3.1-70b-versatile"
def test_litellm_fallback_raises_helpful_error_when_litellm_not_available():
"""Test that when LiteLLM is not available, a helpful error message is raised."""
with patch("crewai.llm.LITELLM_AVAILABLE", False):
with patch("crewai.llm.SUPPORTED_NATIVE_PROVIDERS", ["openai", "anthropic"]):
with pytest.raises(ImportError) as excinfo:
LLM(model="groq/llama-3.1-70b-versatile", is_litellm=False)
error_message = str(excinfo.value)
assert "groq/llama-3.1-70b-versatile" in error_message
assert "LiteLLM" in error_message
assert "pip install" in error_message
assert "crewai[litellm]" in error_message
def test_litellm_fallback_error_includes_model_name():
"""Test that the LiteLLM fallback error includes the model name for clarity."""
with patch("crewai.llm.LITELLM_AVAILABLE", False):
with patch("crewai.llm.SUPPORTED_NATIVE_PROVIDERS", ["openai", "anthropic"]):
test_model = "together/meta-llama/Llama-3-70b"
with pytest.raises(ImportError) as excinfo:
LLM(model=test_model, is_litellm=False)
error_message = str(excinfo.value)
assert test_model in error_message
assert "requires LiteLLM for inference" in error_message
def test_prefixed_models_with_valid_constants_use_native_sdk():
"""Test that models with native provider prefixes use native SDK when model is in constants."""
# Test openai/ prefix with actual OpenAI model in constants → Native SDK
@@ -1016,4 +989,4 @@ async def test_usage_info_streaming_with_acall():
assert llm._token_usage["completion_tokens"] > 0
assert llm._token_usage["total_tokens"] > 0
assert len(result) > 0
assert len(result) > 0
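The removed tests above exercised a fallback path: when a model's provider is not handled natively and LiteLLM is not installed, constructing the LLM raises an ImportError that names the model and points at the `crewai[litellm]` extra. A rough sketch of that kind of guard, built only from what the removed assertions imply; the function name, message text, and flag handling here are assumptions, not CrewAI internals:

```python
# Stand-ins for the module-level values the removed tests patched.
LITELLM_AVAILABLE = False
SUPPORTED_NATIVE_PROVIDERS = ["openai", "anthropic"]

def check_inference_backend(model: str) -> None:
    provider = model.split("/", 1)[0]
    if provider not in SUPPORTED_NATIVE_PROVIDERS and not LITELLM_AVAILABLE:
        raise ImportError(
            f"The model '{model}' requires LiteLLM for inference, but LiteLLM is not "
            "installed. Install it with: pip install 'crewai[litellm]'"
        )

try:
    check_inference_backend("groq/llama-3.1-70b-versatile")
except ImportError as exc:
    print(exc)  # mentions the model name, LiteLLM, and the install hint
```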

View File

@@ -185,8 +185,8 @@ def test_task_guardrail_process_output(task_output):
result = guardrail(task_output)
assert result[0] is False
# Check that feedback is provided (wording varies by LLM)
assert result[1] == "The task output exceeds the word limit of 10 words by containing 22 words."
assert result[1] == "The task result contains more than 10 words, violating the guardrail. The text provided contains about 21 words."
guardrail = LLMGuardrail(
description="Ensure the result has less than 500 words", llm=LLM(model="gpt-4o")

View File

@@ -348,11 +348,11 @@ def test_agent_emits_execution_error_event(base_agent, base_task):
error_message = "Error happening while sending prompt to model."
base_agent.max_retry_limit = 0
# Patch at the class level since agent_executor is created lazily
with patch.object(
CrewAgentExecutor, "invoke", side_effect=Exception(error_message)
):
CrewAgentExecutor, "invoke", wraps=base_agent.agent_executor.invoke
) as invoke_mock:
invoke_mock.side_effect = Exception(error_message)
with pytest.raises(Exception): # noqa: B017
base_agent.execute_task(
task=base_task,
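The patching change above moves from replacing `invoke` outright to wrapping the real method and then setting `side_effect` on the mock. With `unittest.mock`, when both `wraps` and `side_effect` are set, `side_effect` takes precedence, so the call still fails while the mock records it against the real attribute. A small self-contained illustration; the class here is made up for the example:

```python
from unittest.mock import patch

class Greeter:
    def greet(self) -> str:
        return "hello"

g = Greeter()

with patch.object(Greeter, "greet", wraps=g.greet) as greet_mock:
    greet_mock.side_effect = RuntimeError("boom")  # overrides the wrapped call
    try:
        g.greet()
    except RuntimeError as exc:
        print(exc)            # boom
    print(greet_mock.called)  # True: the failure was still routed through the mock
```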

uv.lock (generated): 8507 lines changed

File diff suppressed because it is too large