Compare commits


1 Commit

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| Greyson LaLonde | 4b2d5633c1 | chore: add commitizen to pre-commit hooks | 2025-07-09 09:35:02 -04:00 |
109 changed files with 638 additions and 8314 deletions

.gitignore vendored (3 changes)
View File

@@ -26,5 +26,4 @@ test_flow.html
crewairules.mdc
plan.md
conceptual_plan.md
build_image
chromadb-*.lock
build_image

View File

@@ -5,3 +5,7 @@ repos:
- id: ruff
args: ["--fix"]
- id: ruff-format
- repo: https://github.com/commitizen-tools/commitizen
rev: v3.13.0
hooks:
- id: commitizen

View File

@@ -9,7 +9,12 @@
},
"favicon": "/images/favicon.svg",
"contextual": {
"options": ["copy", "view", "chatgpt", "claude"]
"options": [
"copy",
"view",
"chatgpt",
"claude"
]
},
"navigation": {
"languages": [
@@ -32,6 +37,11 @@
"href": "https://chatgpt.com/g/g-qqTuUWsBY-crewai-assistant",
"icon": "robot"
},
{
"anchor": "Get Help",
"href": "mailto:support@crewai.com",
"icon": "headset"
},
{
"anchor": "Releases",
"href": "https://github.com/crewAIInc/crewAI/releases",
@@ -45,22 +55,32 @@
"groups": [
{
"group": "Get Started",
"pages": ["en/introduction", "en/installation", "en/quickstart"]
"pages": [
"en/introduction",
"en/installation",
"en/quickstart"
]
},
{
"group": "Guides",
"pages": [
{
"group": "Strategy",
"pages": ["en/guides/concepts/evaluating-use-cases"]
"pages": [
"en/guides/concepts/evaluating-use-cases"
]
},
{
"group": "Agents",
"pages": ["en/guides/agents/crafting-effective-agents"]
"pages": [
"en/guides/agents/crafting-effective-agents"
]
},
{
"group": "Crews",
"pages": ["en/guides/crews/first-crew"]
"pages": [
"en/guides/crews/first-crew"
]
},
{
"group": "Flows",
@@ -74,6 +94,7 @@
"pages": [
"en/guides/advanced/customizing-prompts",
"en/guides/advanced/fingerprinting"
]
}
]
@@ -161,9 +182,7 @@
"en/tools/search-research/websitesearchtool",
"en/tools/search-research/codedocssearchtool",
"en/tools/search-research/youtubechannelsearchtool",
"en/tools/search-research/youtubevideosearchtool",
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool"
"en/tools/search-research/youtubevideosearchtool"
]
},
{
@@ -222,7 +241,6 @@
"en/observability/langtrace",
"en/observability/maxim",
"en/observability/mlflow",
"en/observability/neatlogs",
"en/observability/openlit",
"en/observability/opik",
"en/observability/patronus-evaluation",
@@ -256,7 +274,9 @@
},
{
"group": "Telemetry",
"pages": ["en/telemetry"]
"pages": [
"en/telemetry"
]
}
]
},
@@ -265,7 +285,9 @@
"groups": [
{
"group": "Getting Started",
"pages": ["en/enterprise/introduction"]
"pages": [
"en/enterprise/introduction"
]
},
{
"group": "Features",
@@ -320,7 +342,9 @@
},
{
"group": "Resources",
"pages": ["en/enterprise/resources/frequently-asked-questions"]
"pages": [
"en/enterprise/resources/frequently-asked-questions"
]
}
]
},
@@ -329,7 +353,9 @@
"groups": [
{
"group": "Getting Started",
"pages": ["en/api-reference/introduction"]
"pages": [
"en/api-reference/introduction"
]
},
{
"group": "Endpoints",
@@ -339,13 +365,16 @@
},
{
"tab": "Examples",
"groups": [
"groups": [
{
"group": "Examples",
"pages": ["en/examples/example"]
"pages": [
"en/examples/example"
]
}
]
}
]
},
{
@@ -367,6 +396,11 @@
"href": "https://chatgpt.com/g/g-qqTuUWsBY-crewai-assistant",
"icon": "robot"
},
{
"anchor": "Obter Ajuda",
"href": "mailto:support@crewai.com",
"icon": "headset"
},
{
"anchor": "Lançamentos",
"href": "https://github.com/crewAIInc/crewAI/releases",
@@ -391,15 +425,21 @@
"pages": [
{
"group": "Estratégia",
"pages": ["pt-BR/guides/concepts/evaluating-use-cases"]
"pages": [
"pt-BR/guides/concepts/evaluating-use-cases"
]
},
{
"group": "Agentes",
"pages": ["pt-BR/guides/agents/crafting-effective-agents"]
"pages": [
"pt-BR/guides/agents/crafting-effective-agents"
]
},
{
"group": "Crews",
"pages": ["pt-BR/guides/crews/first-crew"]
"pages": [
"pt-BR/guides/crews/first-crew"
]
},
{
"group": "Flows",
@@ -592,7 +632,9 @@
},
{
"group": "Telemetria",
"pages": ["pt-BR/telemetry"]
"pages": [
"pt-BR/telemetry"
]
}
]
},
@@ -601,7 +643,9 @@
"groups": [
{
"group": "Começando",
"pages": ["pt-BR/enterprise/introduction"]
"pages": [
"pt-BR/enterprise/introduction"
]
},
{
"group": "Funcionalidades",
@@ -666,7 +710,9 @@
"groups": [
{
"group": "Começando",
"pages": ["pt-BR/api-reference/introduction"]
"pages": [
"pt-BR/api-reference/introduction"
]
},
{
"group": "Endpoints",
@@ -676,13 +722,16 @@
},
{
"tab": "Exemplos",
"groups": [
"groups": [
{
"group": "Exemplos",
"pages": ["pt-BR/examples/example"]
"pages": [
"pt-BR/examples/example"
]
}
]
}
]
}
]

View File

@@ -32,7 +32,6 @@ A crew in crewAI represents a collaborative group of agents working together to
| **Prompt File** _(optional)_ | `prompt_file` | Path to the prompt JSON file to be used for the crew. |
| **Planning** *(optional)* | `planning` | Adds planning ability to the Crew. When activated, before each Crew iteration all Crew data is sent to an AgentPlanner that plans the tasks, and this plan is added to each task description. |
| **Planning LLM** *(optional)* | `planning_llm` | The language model used by the AgentPlanner in a planning process. |
| **Knowledge Sources** _(optional)_ | `knowledge_sources` | Knowledge sources available at the crew level, accessible to all the agents. |
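For a concrete picture of how these attributes fit together, here is a minimal sketch (agent, task, and model names are placeholders, not taken from this page):

```python Code
from crewai import Agent, Crew, Process, Task

researcher = Agent(
    role="Researcher",
    goal="Collect background material",
    backstory="Thorough and methodical.",
    max_rpm=30,  # superseded by the crew-level max_rpm below
)

research = Task(
    description="Summarize the topic",
    expected_output="A short summary",
    agent=researcher,
)

crew = Crew(
    agents=[researcher],
    tasks=[research],
    process=Process.sequential,
    planning=True,               # crew data is sent to the AgentPlanner before each iteration
    planning_llm="gpt-4o-mini",  # assumed model name for the planner
    max_rpm=10,                  # crew-wide request limit; overrides the agent's max_rpm
)
```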
<Tip>
**Crew Max RPM**: The `max_rpm` attribute sets the maximum number of requests per minute the crew can perform to avoid rate limits and will override individual agents' `max_rpm` settings if you set it.

View File

@@ -270,7 +270,7 @@ In this section, you'll find detailed examples that help you select, configure,
from crewai import LLM
llm = LLM(
model="gemini-1.5-pro-latest", # or vertex_ai/gemini-1.5-pro-latest
model="gemini/gemini-1.5-pro-latest",
temperature=0.7,
vertex_credentials=vertex_credentials_json
)

View File

@@ -623,7 +623,7 @@ for provider in providers_to_test:
**Model not found errors:**
```python
# Verify model availability
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
configurator = EmbeddingConfigurator()
try:
@@ -712,7 +712,7 @@ crew = Crew(
memory_config={
"provider": "mem0",
"config": {"user_id": "john"},
"user_memory": {} # DEPRECATED: Will be removed in version 0.156.0 or on 2025-08-04, use external_memory instead
"user_memory": {} # Required - triggers user memory initialization
},
process=Process.sequential,
verbose=True
@@ -720,16 +720,7 @@ crew = Crew(
```
### Advanced Mem0 Configuration
When using the Mem0 Client, you can customize the memory configuration further by using parameters such as 'includes', 'excludes', 'custom_categories', 'infer', and 'run_id' ('run_id' applies to short-term memory only).
You can find more details in the [Mem0 documentation](https://docs.mem0.ai/).
```python
new_categories = [
{"lifestyle_management_concerns": "Tracks daily routines, habits, hobbies and interests including cooking, time management and work-life balance"},
{"seeking_structure": "Documents goals around creating routines, schedules, and organized systems in various life areas"},
{"personal_information": "Basic information about the user including name, preferences, and personality traits"}
]
crew = Crew(
agents=[...],
tasks=[...],
@@ -741,11 +732,6 @@ crew = Crew(
"org_id": "my_org_id", # Optional
"project_id": "my_project_id", # Optional
"api_key": "custom-api-key" # Optional - overrides env var
"run_id": "my_run_id", # Optional - for short-term memory
"includes": "include1", # Optional
"excludes": "exclude1", # Optional
"infer": True # Optional defaults to True
"custom_categories": new_categories # Optional - custom categories for user memory
},
"user_memory": {}
}
@@ -775,8 +761,7 @@ crew = Crew(
"provider": "openai",
"config": {"api_key": "your-api-key", "model": "text-embedding-3-small"}
}
},
"infer": True # Optional defaults to True
}
},
"user_memory": {}
}

View File

@@ -54,11 +54,9 @@ crew = Crew(
| **Markdown** _(optional)_ | `markdown` | `Optional[bool]` | Whether the task should instruct the agent to return the final answer formatted in Markdown. Defaults to False. |
| **Config** _(optional)_ | `config` | `Optional[Dict[str, Any]]` | Task-specific configuration parameters. |
| **Output File** _(optional)_ | `output_file` | `Optional[str]` | File path for storing the task output. |
| **Create Directory** _(optional)_ | `create_directory` | `Optional[bool]` | Whether to create the directory for output_file if it doesn't exist. Defaults to True. |
| **Output JSON** _(optional)_ | `output_json` | `Optional[Type[BaseModel]]` | A Pydantic model to structure the JSON output. |
| **Output Pydantic** _(optional)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | A Pydantic model for task output. |
| **Callback** _(optional)_ | `callback` | `Optional[Any]` | Function/object to be executed after task completion. |
| **Guardrail** _(optional)_ | `guardrail` | `Optional[Callable]` | Function to validate task output before proceeding to next task. |
## Creating Tasks
@@ -334,11 +332,9 @@ Task guardrails provide a way to validate and transform task outputs before they
are passed to the next task. This feature helps ensure data quality and provides
feedback to agents when their output doesn't meet specific criteria.
Guardrails are implemented as Python functions that contain custom validation logic, giving you complete control over the validation process and ensuring reliable, deterministic results.
### Using Task Guardrails
### Function-Based Guardrails
To add a function-based guardrail to a task, provide a validation function through the `guardrail` parameter:
To add a guardrail to a task, provide a validation function through the `guardrail` parameter:
```python Code
from typing import Tuple, Union, Dict, Any
@@ -376,7 +372,9 @@ blog_task = Task(
- On success: it returns a tuple of `(bool, Any)`. For example: `(True, validated_result)`
- On failure: it returns a tuple of `(bool, str)`. For example: `(False, "Error message explaining the failure")`
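As a minimal sketch of that contract (the function name and the check are illustrative only):

```python Code
from typing import Any, Tuple

def ensure_non_empty(result: str) -> Tuple[bool, Any]:
    """Return (True, data) on success or (False, error message) on failure."""
    cleaned = result.strip()
    if cleaned:
        return (True, cleaned)
    return (False, "Output was empty; please return a non-empty answer")
```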
### LLMGuardrail
The `LLMGuardrail` class offers a robust mechanism for validating task outputs.
### Error Handling Best Practices
@@ -800,91 +798,184 @@ While creating and executing tasks, certain validation mechanisms are in place t
These validations help in maintaining the consistency and reliability of task executions within the crewAI framework.
## Task Guardrails
Task guardrails provide a powerful way to validate, transform, or filter task outputs before they are passed to the next task. Guardrails are optional functions that execute before the next task starts, allowing you to ensure that task outputs meet specific requirements or formats.
### Basic Usage
#### Define your own logic to validate
```python Code
import json
from typing import Tuple, Union
from crewai import Task
def validate_json_output(result: str) -> Tuple[bool, Union[dict, str]]:
"""Validate that the output is valid JSON."""
try:
json_data = json.loads(result)
return (True, json_data)
except json.JSONDecodeError:
return (False, "Output must be valid JSON")
task = Task(
description="Generate JSON data",
expected_output="Valid JSON object",
guardrail=validate_json_output
)
```
#### Leverage a no-code approach for validation
```python Code
from crewai import Task
task = Task(
description="Generate JSON data",
expected_output="Valid JSON object",
guardrail="Ensure the response is a valid JSON object"
)
```
#### Using YAML
```yaml
research_task:
...
guardrail: make sure each bullet contains a minimum of 100 words
...
```
```python Code
@CrewBase
class InternalCrew:
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
...
@task
def research_task(self):
return Task(config=self.tasks_config["research_task"]) # type: ignore[index]
...
```
#### Use custom models for code generation
```python Code
from crewai import Task
from crewai.llm import LLM
task = Task(
description="Generate JSON data",
expected_output="Valid JSON object",
guardrail=LLMGuardrail(
description="Ensure the response is a valid JSON object",
llm=LLM(model="gpt-4o-mini"),
)
)
```
### How Guardrails Work
1. **Optional Attribute**: Guardrails are an optional attribute at the task level, allowing you to add validation only where needed.
2. **Execution Timing**: The guardrail function is executed before the next task starts, ensuring valid data flow between tasks.
3. **Return Format**: Guardrails must return a tuple of `(success, data)`:
- If `success` is `True`, `data` is the validated/transformed result
- If `success` is `False`, `data` is the error message
4. **Result Routing**:
- On success (`True`), the result is automatically passed to the next task
- On failure (`False`), the error is sent back to the agent to generate a new answer
### Common Use Cases
#### Data Format Validation
```python Code
def validate_email_format(result: str) -> Tuple[bool, Union[str, str]]:
"""Ensure the output contains a valid email address."""
import re
email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
if re.match(email_pattern, result.strip()):
return (True, result.strip())
return (False, "Output must be a valid email address")
```
#### Content Filtering
```python Code
def filter_sensitive_info(result: str) -> Tuple[bool, Union[str, str]]:
"""Remove or validate sensitive information."""
sensitive_patterns = ['SSN:', 'password:', 'secret:']
for pattern in sensitive_patterns:
if pattern.lower() in result.lower():
return (False, f"Output contains sensitive information ({pattern})")
return (True, result)
```
#### Data Transformation
```python Code
def normalize_phone_number(result: str) -> Tuple[bool, Union[str, str]]:
"""Ensure phone numbers are in a consistent format."""
import re
digits = re.sub(r'\D', '', result)
if len(digits) == 10:
formatted = f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
return (True, formatted)
return (False, "Output must be a 10-digit phone number")
```
### Advanced Features
#### Chaining Multiple Validations
```python Code
def chain_validations(*validators):
"""Chain multiple validators together."""
def combined_validator(result):
for validator in validators:
success, data = validator(result)
if not success:
return (False, data)
result = data
return (True, result)
return combined_validator
# Usage
task = Task(
description="Get user contact info",
expected_output="Email and phone",
guardrail=chain_validations(
validate_email_format,
filter_sensitive_info
)
)
```
#### Custom Retry Logic
```python Code
task = Task(
description="Generate data",
expected_output="Valid data",
guardrail=validate_data,
max_retries=5 # Override default retry limit
)
```
## Creating Directories when Saving Files
The `create_directory` parameter controls whether CrewAI should automatically create directories when saving task outputs to files. This feature is particularly useful for organizing outputs and ensuring that file paths are correctly structured, especially when working with complex project hierarchies.
### Default Behavior
By default, `create_directory=True`, which means CrewAI will automatically create any missing directories in the output file path:
You can now specify if a task should create directories when saving its output to a file. This is particularly useful for organizing outputs and ensuring that file paths are correctly structured.
```python Code
# Default behavior - directories are created automatically
report_task = Task(
description='Generate a comprehensive market analysis report',
expected_output='A detailed market analysis with charts and insights',
agent=analyst_agent,
output_file='reports/2025/market_analysis.md', # Creates 'reports/2025/' if it doesn't exist
markdown=True
# ...
save_output_task = Task(
description='Save the summarized AI news to a file',
expected_output='File saved successfully',
agent=research_agent,
tools=[file_save_tool],
output_file='outputs/ai_news_summary.txt',
create_directory=True
)
```
### Disabling Directory Creation
If you want to prevent automatic directory creation and ensure that the directory already exists, set `create_directory=False`:
```python Code
# Strict mode - directory must already exist
strict_output_task = Task(
description='Save critical data that requires existing infrastructure',
expected_output='Data saved to pre-configured location',
agent=data_agent,
output_file='secure/vault/critical_data.json',
create_directory=False # Will raise RuntimeError if 'secure/vault/' doesn't exist
)
```
### YAML Configuration
You can also configure this behavior in your YAML task definitions:
```yaml tasks.yaml
analysis_task:
description: >
Generate quarterly financial analysis
expected_output: >
A comprehensive financial report with quarterly insights
agent: financial_analyst
output_file: reports/quarterly/q4_2024_analysis.pdf
create_directory: true # Automatically create 'reports/quarterly/' directory
audit_task:
description: >
Perform compliance audit and save to existing audit directory
expected_output: >
A compliance audit report
agent: auditor
output_file: audit/compliance_report.md
create_directory: false # Directory must already exist
```
### Use Cases
**Automatic Directory Creation (`create_directory=True`):**
- Development and prototyping environments
- Dynamic report generation with date-based folders
- Automated workflows where directory structure may vary
- Multi-tenant applications with user-specific folders
**Manual Directory Management (`create_directory=False`):**
- Production environments with strict file system controls
- Security-sensitive applications where directories must be pre-configured
- Systems with specific permission requirements
- Compliance environments where directory creation is audited
### Error Handling
When `create_directory=False` and the directory doesn't exist, CrewAI will raise a `RuntimeError`:
```python Code
try:
result = crew.kickoff()
except RuntimeError as e:
# Handle missing directory error
print(f"Directory creation failed: {e}")
# Create directory manually or use fallback location
#...
```
Check out the video below to see how to use structured outputs in CrewAI:

View File

@@ -1,356 +0,0 @@
---
title: LangDB Integration
description: How to use LangDB AI Gateway with CrewAI
icon: database
---
<img src="https://raw.githubusercontent.com/LangDB/assets/main/langdb-crewai-header.png" alt="LangDB CrewAI Header Image" width="70%" />
## Introduction
LangDB is the fastest enterprise AI gateway that enhances CrewAI with production-ready observability and optimization features. It provides:
- **Complete end-to-end tracing** of every agent interaction and LLM call
- **Real-time cost monitoring** and optimization across 250+ LLMs
- **Performance analytics** with detailed metrics and insights
- **Secure governance** for enterprise AI deployments
- **OpenAI-compatible APIs** for seamless integration
- **Fine-grained control** over agent workflows and resource usage
### Installation & Setup
<Steps>
<Step title="Install the required packages">
```bash
pip install -U crewai langdb
```
</Step>
<Step title="Set up environment variables" icon="lock">
Configure your LangDB credentials from the [LangDB dashboard](https://app.langdb.ai/):
```bash
export LANGDB_API_KEY="your_langdb_api_key"
export LANGDB_PROJECT_ID="your_project_id"
```
</Step>
<Step title="Initialize LangDB with CrewAI">
The integration requires a single initialization call before creating your agents:
```python
from langdb import LangDB
from crewai import Agent, Task, Crew, LLM
# Initialize LangDB tracing
LangDB.init()
# Create LLM instance - LangDB automatically traces all calls
llm = LLM(
model="gpt-4o",
temperature=0.7
)
# Create your agents as usual
@agent
def research_agent(self) -> Agent:
return Agent(
role="Senior Research Analyst",
goal="Conduct comprehensive research on assigned topics",
backstory="You are an expert researcher with deep analytical skills.",
llm=llm,
verbose=True
)
```
</Step>
</Steps>
## Key Features
### 1. Comprehensive Observability
LangDB provides complete visibility into your CrewAI agent workflows with minimal setup overhead.
<Tabs>
<Tab title="Request Tracing">
LangDB automatically captures every LLM interaction in your crew execution:
```python
from langdb import LangDB
from crewai import Agent, Task, Crew, LLM
# Initialize with custom trace metadata
LangDB.init(
metadata={
"environment": "production",
"crew_type": "research_workflow",
"user_id": "user_123"
}
)
# All agent interactions are automatically traced
crew = Crew(
agents=[research_agent, writer_agent],
tasks=[research_task, writing_task],
verbose=True
)
# Execute with full tracing
result = crew.kickoff(inputs={"topic": "AI trends 2025"})
```
View detailed traces in the LangDB dashboard showing:
- Complete agent conversation flows
- Tool usage and function calls
- Task execution timelines
- LLM request/response pairs
</Tab>
<Tab title="Performance Metrics">
LangDB tracks comprehensive performance metrics for your crews:
- **Execution Time**: Total and per-task execution duration
- **Token Usage**: Input/output tokens for cost optimization
- **Success Rates**: Task completion and failure analytics
- **Latency Analysis**: Response times and bottleneck identification
```python
# Access metrics programmatically
from langdb import LangDB
# Get crew execution metrics
metrics = LangDB.get_metrics(
project_id="your_project_id",
filters={
"crew_type": "research_workflow",
"time_range": "last_24h"
}
)
print(f"Average execution time: {metrics.avg_execution_time}")
print(f"Total cost: ${metrics.total_cost}")
print(f"Success rate: {metrics.success_rate}%")
```
</Tab>
<Tab title="Cost Monitoring">
Track and optimize AI spending across your CrewAI deployments:
```python
from langdb import LangDB
# Initialize with cost tracking
LangDB.init(
cost_tracking=True,
budget_alerts={
"daily_limit": 100.0, # $100 daily limit
"alert_threshold": 0.8 # Alert at 80% of limit
}
)
# LangDB automatically tracks costs for all LLM calls
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
# View cost breakdown
cost_report = LangDB.get_cost_report(
breakdown_by=["model", "agent", "task"]
)
```
Features include:
- Real-time cost tracking across all models
- Budget alerts and spending limits
- Cost optimization recommendations
- Detailed cost attribution by agent and task
</Tab>
</Tabs>
### 2. Advanced Analytics & Insights
LangDB provides powerful analytics to optimize your CrewAI workflows.
<Tabs>
<Tab title="Agent Performance Analysis">
Analyze individual agent performance and identify optimization opportunities:
```python
from langdb import LangDB
# Get agent-specific analytics
analytics = LangDB.get_agent_analytics(
agent_role="Senior Research Analyst",
time_range="last_week"
)
print(f"Average task completion time: {analytics.avg_completion_time}")
print(f"Most used tools: {analytics.top_tools}")
print(f"Success rate: {analytics.success_rate}%")
print(f"Cost per task: ${analytics.cost_per_task}")
```
</Tab>
<Tab title="Workflow Optimization">
Identify bottlenecks and optimization opportunities in your crew workflows:
```python
# Analyze crew workflow patterns
workflow_analysis = LangDB.analyze_workflow(
crew_id="research_crew_v1",
optimization_focus=["speed", "cost", "quality"]
)
# Get optimization recommendations
recommendations = workflow_analysis.recommendations
for rec in recommendations:
print(f"Optimization: {rec.type}")
print(f"Potential savings: {rec.estimated_savings}")
print(f"Implementation: {rec.implementation_guide}")
```
</Tab>
</Tabs>
### 3. Production-Ready Features
<CardGroup cols="2">
<Card title="Error Monitoring" icon="exclamation-triangle" href="https://docs.langdb.ai/features/error-monitoring">
Automatic detection and alerting for agent failures, LLM errors, and workflow issues.
</Card>
<Card title="Rate Limiting" icon="gauge" href="https://docs.langdb.ai/features/rate-limiting">
Intelligent rate limiting to prevent API quota exhaustion and optimize throughput.
</Card>
<Card title="Caching" icon="bolt" href="https://docs.langdb.ai/features/caching">
Smart caching of LLM responses to reduce costs and improve response times.
</Card>
<Card title="Load Balancing" icon="scale-balanced" href="https://docs.langdb.ai/features/load-balancing">
Distribute requests across multiple LLM providers for reliability and performance.
</Card>
</CardGroup>
### 4. Enterprise Security & Governance
LangDB provides enterprise-grade security features for production CrewAI deployments:
```python
from langdb import LangDB
# Initialize with security configurations
LangDB.init(
security_config={
"pii_detection": True,
"content_filtering": True,
"audit_logging": True,
"data_retention_days": 90
}
)
# All crew interactions are automatically secured
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
```
Security features include:
- **PII Detection**: Automatic detection and redaction of sensitive information
- **Content Filtering**: Block inappropriate or harmful content
- **Audit Logging**: Complete audit trails for compliance
- **Data Governance**: Configurable data retention and privacy controls
## Advanced Configuration
### Custom Metadata and Filtering
Add custom metadata to enable powerful filtering and analytics:
```python
from langdb import LangDB
from crewai import Agent, Crew, Task
# Initialize with rich metadata
LangDB.init(
metadata={
"environment": "production",
"team": "research_team",
"version": "v2.1.0",
"customer_tier": "enterprise"
}
)
# Add task-specific metadata
@task
def research_task(self) -> Task:
return Task(
description="Research the latest AI trends",
expected_output="Comprehensive research report",
agent=research_agent,
metadata={
"task_type": "research",
"priority": "high",
"estimated_duration": "30min"
}
)
```
### Multi-Environment Setup
Configure different LangDB projects for different environments:
```python
import os
from langdb import LangDB
# Environment-specific configuration
environment = os.getenv("ENVIRONMENT", "development")
if environment == "production":
LangDB.init(
project_id="prod_project_id",
sampling_rate=1.0, # Trace all requests
cost_tracking=True
)
elif environment == "staging":
LangDB.init(
project_id="staging_project_id",
sampling_rate=0.5, # Sample 50% of requests
cost_tracking=False
)
else:
LangDB.init(
project_id="dev_project_id",
sampling_rate=0.1, # Sample 10% of requests
cost_tracking=False
)
```
## Best Practices
### Development Phase
- Use detailed tracing to understand agent behavior patterns
- Monitor resource usage during testing and development
- Set up cost alerts to prevent unexpected spending
- Implement comprehensive error handling and monitoring
### Production Phase
- Enable full request tracing for complete observability
- Set up automated alerts for performance degradation
- Implement cost optimization strategies based on analytics
- Use metadata for detailed filtering and analysis
### Continuous Improvement
- Regular performance reviews using LangDB analytics
- A/B testing of different agent configurations
- Cost optimization based on usage patterns
- Workflow optimization using bottleneck analysis
## Getting Started
1. **Sign up** for a LangDB account at [app.langdb.ai](https://app.langdb.ai)
2. **Install** the LangDB package: `pip install langdb`
3. **Initialize** LangDB in your CrewAI application
4. **Deploy** your crews with automatic observability
5. **Monitor** and optimize using the LangDB dashboard
<Card title="LangDB Documentation" icon="book" href="https://docs.langdb.ai">
Explore comprehensive LangDB documentation and advanced features
</Card>
LangDB transforms your CrewAI agents into production-ready, observable, and optimized AI workflows with minimal code changes and maximum insights.

View File

@@ -1,134 +0,0 @@
---
title: Neatlogs Integration
description: Understand, debug, and share your CrewAI agent runs
icon: magnifying-glass-chart
---
# Introduction
Neatlogs helps you **see what your agent did**, **understand why**, and **share it**.
It captures every step: thoughts, tool calls, responses, evaluations. No raw logs. Just clear, structured traces. Great for debugging and collaboration.
## Why use Neatlogs?
CrewAI agents use multiple tools and reasoning steps. When something goes wrong, you need context — not just errors.
Neatlogs lets you:
- Follow the full decision path
- Add feedback directly on steps
- Chat with the trace using AI assistant
- Share runs publicly for feedback
- Turn insights into tasks
All in one place.
Manage your traces effortlessly
![Traces](/images/neatlogs-1.png)
![Trace Response](/images/neatlogs-2.png)
The best UX to view a CrewAI trace. Post comments anywhere you want. Use AI to debug.
![Trace Details](/images/neatlogs-3.png)
![Ai Chat Bot With A Trace](/images/neatlogs-4.png)
![Comments Drawer](/images/neatlogs-5.png)
## Core Features
- **Trace Viewer**: Track thoughts, tools, and decisions in sequence
- **Inline Comments**: Tag teammates on any trace step
- **Feedback & Evaluation**: Mark outputs as correct or incorrect
- **Error Highlighting**: Automatic flagging of API/tool failures
- **Task Conversion**: Convert comments into assigned tasks
- **Ask the Trace (AI)**: Chat with your trace using Neatlogs AI bot
- **Public Sharing**: Publish trace links to your community
## Quick Setup with CrewAI
<Steps>
<Step title="Sign Up & Get API Key">
Visit [neatlogs.com](https://neatlogs.com/?utm_source=crewAI-docs), create a project, copy the API key.
</Step>
<Step title="Install SDK">
```bash
pip install neatlogs
```
(Latest version 0.8.0, Python 3.8+; MIT license)
</Step>
<Step title="Initialize Neatlogs">
Before starting Crew agents, add:
```python
import neatlogs
neatlogs.init("YOUR_PROJECT_API_KEY")
```
Agents run as usual. Neatlogs captures everything automatically.
</Step>
</Steps>
## Under the Hood
According to GitHub, Neatlogs:
- Captures thoughts, tool calls, responses, errors, and token stats
- Supports AI-powered task generation and robust evaluation workflows
All with just two lines of code.
## Watch It Work
### 🔍 Full Demo (4min)
<iframe
width="100%"
height="315"
src="https://www.youtube.com/embed/8KDme9T2I7Q?si=b8oHteaBwFNs_Duk"
title="YouTube video player"
frameBorder="0"
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture"
allowFullScreen
></iframe>
### ⚙️ CrewAI Integration (30s)
<iframe
className="w-full aspect-video rounded-xl"
src="https://www.loom.com/embed/9c78b552af43452bb3e4783cb8d91230?sid=e9d7d370-a91a-49b0-809e-2f375d9e801d"
title="Loom video player"
frameBorder="0"
allowFullScreen
></iframe>
## Links & Support
- 📘 [Neatlogs Docs](https://docs.neatlogs.com/)
- 🔐 [Dashboard & API Key](https://app.neatlogs.com/)
- 🐦 [Follow on Twitter](https://twitter.com/neatlogs)
- 📧 Contact: hello@neatlogs.com
- 🛠 [GitHub SDK](https://github.com/NeatLogs/neatlogs)
## TL;DR
With just:
```bash
pip install neatlogs
import neatlogs
neatlogs.init("YOUR_API_KEY")
```
You can now capture, understand, share, and act on your CrewAI agent runs in seconds.
No setup overhead. Full trace transparency. Full team collaboration.

View File

@@ -56,10 +56,6 @@ Observability is crucial for understanding how your CrewAI agents perform, ident
<Card title="Weave" icon="network-wired" href="/en/observability/weave">
Weights & Biases platform for tracking and evaluating AI applications.
</Card>
<Card title="LangDB" icon="database" href="/en/observability/langdb">
Enterprise AI gateway with comprehensive tracing, cost optimization, and performance analytics.
</Card>
</CardGroup>
### Evaluation & Quality Assurance

View File

@@ -44,14 +44,6 @@ These tools enable your agents to search the web, research topics, and find info
<Card title="YouTube Video Search" icon="play" href="/en/tools/search-research/youtubevideosearchtool">
Find and analyze YouTube videos by topic, keyword, or criteria.
</Card>
<Card title="Tavily Search Tool" icon="magnifying-glass" href="/en/tools/search-research/tavilysearchtool">
Comprehensive web search using Tavily's AI-powered search API.
</Card>
<Card title="Tavily Extractor Tool" icon="file-text" href="/en/tools/search-research/tavilyextractortool">
Extract structured content from web pages using the Tavily API.
</Card>
</CardGroup>
## **Common Use Cases**
@@ -63,19 +55,17 @@ These tools enable your agents to search the web, research topics, and find info
- **Academic Research**: Find scholarly articles and technical papers
```python
from crewai_tools import SerperDevTool, GitHubSearchTool, YoutubeVideoSearchTool, TavilySearchTool, TavilyExtractorTool
from crewai_tools import SerperDevTool, GitHubSearchTool, YoutubeVideoSearchTool
# Create research tools
web_search = SerperDevTool()
code_search = GitHubSearchTool()
video_research = YoutubeVideoSearchTool()
tavily_search = TavilySearchTool()
content_extractor = TavilyExtractorTool()
# Add to your agent
agent = Agent(
role="Research Analyst",
tools=[web_search, code_search, video_research, tavily_search, content_extractor],
tools=[web_search, code_search, video_research],
goal="Gather comprehensive information on any topic"
)
```

View File

@@ -6,6 +6,10 @@ icon: google
# `SerperDevTool`
<Note>
We are still working on improving tools, so there might be unexpected behavior or changes in the future.
</Note>
## Description
This tool is designed to perform a semantic search for a specified query across content on the internet. It utilizes the [serper.dev](https://serper.dev) API
@@ -13,12 +17,6 @@ to fetch and display the most relevant search results based on the query provide
## Installation
To effectively use the `SerperDevTool`, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` package is installed in your Python environment.
2. **API Key Acquisition**: Acquire a `serper.dev` API key by registering for a free account at `serper.dev`.
3. **Environment Configuration**: Store your obtained API key in an environment variable named `SERPER_API_KEY` to facilitate its use by the tool.
To incorporate this tool into your project, follow the installation instructions below:
```shell
@@ -36,6 +34,14 @@ from crewai_tools import SerperDevTool
tool = SerperDevTool()
```
## Steps to Get Started
To effectively use the `SerperDevTool`, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` package is installed in your Python environment.
2. **API Key Acquisition**: Acquire a `serper.dev` API key by registering for a free account at `serper.dev`.
3. **Environment Configuration**: Store your obtained API key in an environment variable named `SERPER_API_KEY` to facilitate its use by the tool.
## Parameters
The `SerperDevTool` comes with several parameters that will be passed to the API:
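As a hedged sketch of how such parameters are typically passed at construction time (the parameter names below are assumptions, not confirmed by this page):

```python Code
from crewai_tools import SerperDevTool

# Parameter names are assumptions; consult the full parameter table for the supported set
tool = SerperDevTool(
    n_results=10,  # assumed: number of search results to return
    country="us",  # assumed: country code forwarded to the serper.dev API
)
```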

View File

@@ -1,139 +0,0 @@
---
title: "Tavily Extractor Tool"
description: "Extract structured content from web pages using the Tavily API"
icon: "file-text"
---
The `TavilyExtractorTool` allows CrewAI agents to extract structured content from web pages using the Tavily API. It can process single URLs or lists of URLs and provides options for controlling the extraction depth and including images.
## Installation
To use the `TavilyExtractorTool`, you need to install the `tavily-python` library:
```shell
pip install 'crewai[tools]' tavily-python
```
You also need to set your Tavily API key as an environment variable:
```bash
export TAVILY_API_KEY='your-tavily-api-key'
```
## Example Usage
Here's how to initialize and use the `TavilyExtractorTool` within a CrewAI agent:
```python
import os
from crewai import Agent, Task, Crew
from crewai_tools import TavilyExtractorTool
# Ensure TAVILY_API_KEY is set in your environment
# os.environ["TAVILY_API_KEY"] = "YOUR_API_KEY"
# Initialize the tool
tavily_tool = TavilyExtractorTool()
# Create an agent that uses the tool
extractor_agent = Agent(
role='Web Content Extractor',
goal='Extract key information from specified web pages',
backstory='You are an expert at extracting relevant content from websites using the Tavily API.',
tools=[tavily_tool],
verbose=True
)
# Define a task for the agent
extract_task = Task(
description='Extract the main content from the URL https://example.com using basic extraction depth.',
expected_output='A JSON string containing the extracted content from the URL.',
agent=extractor_agent
)
# Create and run the crew
crew = Crew(
agents=[extractor_agent],
tasks=[extract_task],
verbose=2
)
result = crew.kickoff()
print(result)
```
## Configuration Options
The `TavilyExtractorTool` accepts the following arguments:
- `urls` (Union[List[str], str]): **Required**. A single URL string or a list of URL strings to extract data from.
- `include_images` (Optional[bool]): Whether to include images in the extraction results. Defaults to `False`.
- `extract_depth` (Literal["basic", "advanced"]): The depth of extraction. Use `"basic"` for faster, surface-level extraction or `"advanced"` for more comprehensive extraction. Defaults to `"basic"`.
- `timeout` (int): The maximum time in seconds to wait for the extraction request to complete. Defaults to `60`.
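A minimal direct invocation using those arguments might look like the following sketch (in a crew, the agent normally calls the tool itself):

```python
from crewai_tools import TavilyExtractorTool

tavily_tool = TavilyExtractorTool()

# Direct call with the arguments listed above
raw_json = tavily_tool.run(
    urls=["https://example.com", "https://anotherexample.org"],
    extract_depth="advanced",  # "basic" is faster; "advanced" is more thorough
    include_images=False,
    timeout=60,
)
print(raw_json)
```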
## Advanced Usage
### Multiple URLs with Advanced Extraction
```python
# Example with multiple URLs and advanced extraction
multi_extract_task = Task(
description='Extract content from https://example.com and https://anotherexample.org using advanced extraction.',
expected_output='A JSON string containing the extracted content from both URLs.',
agent=extractor_agent
)
# Configure the tool with custom parameters
custom_extractor = TavilyExtractorTool(
extract_depth='advanced',
include_images=True,
timeout=120
)
agent_with_custom_tool = Agent(
role="Advanced Content Extractor",
goal="Extract comprehensive content with images",
tools=[custom_extractor]
)
```
### Tool Parameters
You can customize the tool's behavior by setting parameters during initialization:
```python
# Initialize with custom configuration
extractor_tool = TavilyExtractorTool(
extract_depth='advanced', # More comprehensive extraction
include_images=True, # Include image results
timeout=90 # Custom timeout
)
```
## Features
- **Single or Multiple URLs**: Extract content from one URL or process multiple URLs in a single request
- **Configurable Depth**: Choose between basic (fast) and advanced (comprehensive) extraction modes
- **Image Support**: Optionally include images in the extraction results
- **Structured Output**: Returns well-formatted JSON containing the extracted content
- **Error Handling**: Robust handling of network timeouts and extraction errors
## Response Format
The tool returns a JSON string representing the structured data extracted from the provided URL(s). The exact structure depends on the content of the pages and the `extract_depth` used.
Common response elements include:
- **Title**: The page title
- **Content**: Main text content of the page
- **Images**: Image URLs and metadata (when `include_images=True`)
- **Metadata**: Additional page information like author, description, etc.
## Use Cases
- **Content Analysis**: Extract and analyze content from competitor websites
- **Research**: Gather structured data from multiple sources for analysis
- **Content Migration**: Extract content from existing websites for migration
- **Monitoring**: Regular extraction of content for change detection
- **Data Collection**: Systematic extraction of information from web sources
Refer to the [Tavily API documentation](https://docs.tavily.com/docs/tavily-api/python-sdk#extract) for detailed information about the response structure and available options.

View File

@@ -1,122 +0,0 @@
---
title: "Tavily Search Tool"
description: "Perform comprehensive web searches using the Tavily Search API"
icon: "magnifying-glass"
---
The `TavilySearchTool` provides an interface to the Tavily Search API, enabling CrewAI agents to perform comprehensive web searches. It allows for specifying search depth, topics, time ranges, included/excluded domains, and whether to include direct answers, raw content, or images in the results.
## Installation
To use the `TavilySearchTool`, you need to install the `tavily-python` library:
```shell
pip install 'crewai[tools]' tavily-python
```
## Environment Variables
Ensure your Tavily API key is set as an environment variable:
```bash
export TAVILY_API_KEY='your_tavily_api_key'
```
## Example Usage
Here's how to initialize and use the `TavilySearchTool` within a CrewAI agent:
```python
import os
from crewai import Agent, Task, Crew
from crewai_tools import TavilySearchTool
# Ensure the TAVILY_API_KEY environment variable is set
# os.environ["TAVILY_API_KEY"] = "YOUR_TAVILY_API_KEY"
# Initialize the tool
tavily_tool = TavilySearchTool()
# Create an agent that uses the tool
researcher = Agent(
role='Market Researcher',
goal='Find information about the latest AI trends',
backstory='An expert market researcher specializing in technology.',
tools=[tavily_tool],
verbose=True
)
# Create a task for the agent
research_task = Task(
description='Search for the top 3 AI trends in 2024.',
expected_output='A JSON report summarizing the top 3 AI trends found.',
agent=researcher
)
# Form the crew and kick it off
crew = Crew(
agents=[researcher],
tasks=[research_task],
verbose=2
)
result = crew.kickoff()
print(result)
```
## Configuration Options
The `TavilySearchTool` accepts the following arguments during initialization or when calling the `run` method:
- `query` (str): **Required**. The search query string.
- `search_depth` (Literal["basic", "advanced"], optional): The depth of the search. Defaults to `"basic"`.
- `topic` (Literal["general", "news", "finance"], optional): The topic to focus the search on. Defaults to `"general"`.
- `time_range` (Literal["day", "week", "month", "year"], optional): The time range for the search. Defaults to `None`.
- `days` (int, optional): The number of days to search back. Relevant if `time_range` is not set. Defaults to `7`.
- `max_results` (int, optional): The maximum number of search results to return. Defaults to `5`.
- `include_domains` (Sequence[str], optional): A list of domains to prioritize in the search. Defaults to `None`.
- `exclude_domains` (Sequence[str], optional): A list of domains to exclude from the search. Defaults to `None`.
- `include_answer` (Union[bool, Literal["basic", "advanced"]], optional): Whether to include a direct answer synthesized from the search results. Defaults to `False`.
- `include_raw_content` (bool, optional): Whether to include the raw HTML content of the searched pages. Defaults to `False`.
- `include_images` (bool, optional): Whether to include image results. Defaults to `False`.
- `timeout` (int, optional): The request timeout in seconds. Defaults to `60`.
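A minimal direct invocation using these arguments might look like the following sketch (in a crew, the agent normally calls the tool itself):

```python
from crewai_tools import TavilySearchTool

tavily_tool = TavilySearchTool()

# Direct call with the arguments listed above
results_json = tavily_tool.run(
    query="latest AI trends",
    search_depth="advanced",
    topic="news",
    max_results=5,
    include_answer=True,
)
print(results_json)
```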
## Advanced Usage
You can configure the tool with custom parameters:
```python
# Example: Initialize with specific parameters
custom_tavily_tool = TavilySearchTool(
search_depth='advanced',
max_results=10,
include_answer=True
)
# The agent will use these defaults
agent_with_custom_tool = Agent(
role="Advanced Researcher",
goal="Conduct detailed research with comprehensive results",
tools=[custom_tavily_tool]
)
```
## Features
- **Comprehensive Search**: Access to Tavily's powerful search index
- **Configurable Depth**: Choose between basic and advanced search modes
- **Topic Filtering**: Focus searches on general, news, or finance topics
- **Time Range Control**: Limit results to specific time periods
- **Domain Control**: Include or exclude specific domains
- **Direct Answers**: Get synthesized answers from search results
- **Content Filtering**: Prevent context window issues with automatic content truncation
## Response Format
The tool returns search results as a JSON string containing:
- Search results with titles, URLs, and content snippets
- Optional direct answers to queries
- Optional image results
- Optional raw HTML content (when enabled)
Content for each result is automatically truncated to prevent context window issues while maintaining the most relevant information.

View File

@@ -1,100 +0,0 @@
---
title: Serper Scrape Website
description: The `SerperScrapeWebsiteTool` is designed to scrape websites and extract clean, readable content using Serper's scraping API.
icon: globe
---
# `SerperScrapeWebsiteTool`
## Description
This tool is designed to scrape website content and extract clean, readable text from any website URL. It utilizes the [serper.dev](https://serper.dev) scraping API to fetch and process web pages, optionally including markdown formatting for better structure and readability.
## Installation
To effectively use the `SerperScrapeWebsiteTool`, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` package is installed in your Python environment.
2. **API Key Acquisition**: Acquire a `serper.dev` API key by registering for an account at `serper.dev`.
3. **Environment Configuration**: Store your obtained API key in an environment variable named `SERPER_API_KEY` to facilitate its use by the tool.
To incorporate this tool into your project, follow the installation instructions below:
```shell
pip install 'crewai[tools]'
```
## Example
The following example demonstrates how to initialize the tool and scrape a website:
```python Code
from crewai_tools import SerperScrapeWebsiteTool
# Initialize the tool for website scraping capabilities
tool = SerperScrapeWebsiteTool()
# Scrape a website with markdown formatting
result = tool.run(url="https://example.com", include_markdown=True)
```
## Arguments
The `SerperScrapeWebsiteTool` accepts the following arguments:
- **url**: Required. The URL of the website to scrape.
- **include_markdown**: Optional. Whether to include markdown formatting in the scraped content. Defaults to `True`.
## Example with Parameters
Here is an example demonstrating how to use the tool with different parameters:
```python Code
from crewai_tools import SerperScrapeWebsiteTool
tool = SerperScrapeWebsiteTool()
# Scrape with markdown formatting (default)
markdown_result = tool.run(
url="https://docs.crewai.com",
include_markdown=True
)
# Scrape without markdown formatting for plain text
plain_result = tool.run(
url="https://docs.crewai.com",
include_markdown=False
)
print("Markdown formatted content:")
print(markdown_result)
print("\nPlain text content:")
print(plain_result)
```
## Use Cases
The `SerperScrapeWebsiteTool` is particularly useful for:
- **Content Analysis**: Extract and analyze website content for research purposes
- **Data Collection**: Gather structured information from web pages
- **Documentation Processing**: Convert web-based documentation into readable formats
- **Competitive Analysis**: Scrape competitor websites for market research
- **Content Migration**: Extract content from existing websites for migration purposes
## Error Handling
The tool includes comprehensive error handling for:
- **Network Issues**: Handles connection timeouts and network errors gracefully
- **API Errors**: Provides detailed error messages for API-related issues
- **Invalid URLs**: Validates and reports issues with malformed URLs
- **Authentication**: Clear error messages for missing or invalid API keys
## Security Considerations
- Always store your `SERPER_API_KEY` in environment variables, never hardcode it in your source code
- Be mindful of rate limits imposed by the Serper API
- Respect robots.txt and website terms of service when scraping content
- Consider implementing delays between requests for large-scale scraping operations

Binary files not shown (5 images removed; previous sizes: 222 KiB, 329 KiB, 590 KiB, 216 KiB, 277 KiB).

View File

@@ -76,7 +76,6 @@ Exemplo:
crewai train -n 10 -f my_training_data.pkl
```
```python
# Exemplo de uso programático do comando train
n_iterations = 2
inputs = {"topic": "Treinamento CrewAI"}
@@ -84,13 +83,12 @@ filename = "seu_modelo.pkl"
try:
SuaCrew().crew().train(
n_iterations=n_iterations,
inputs=inputs,
n_iterations=n_iterations,
inputs=inputs,
filename=filename
)
except Exception as e:
raise Exception(f"Ocorreu um erro ao treinar a crew: {e}")
```
### 4. Replay
@@ -103,7 +101,7 @@ crewai replay [OPTIONS]
- `-t, --task_id TEXT`: Reexecuta o crew a partir deste task ID, incluindo todas as tarefas subsequentes
Exemplo:
```shell Terminal
```shell Terminal
crewai replay -t task_123456
```
@@ -149,7 +147,7 @@ crewai test [OPTIONS]
- `-m, --model TEXT`: Modelo LLM para executar os testes no Crew (padrão: "gpt-4o-mini")
Exemplo:
```shell Terminal
```shell Terminal
crewai test -n 5 -m gpt-3.5-turbo
```
@@ -203,7 +201,10 @@ def crew(self) -> Crew:
Implemente o crew ou flow no [CrewAI Enterprise](https://app.crewai.com).
- **Autenticação**: Você precisa estar autenticado para implementar no CrewAI Enterprise.
Você pode fazer login ou criar uma conta com:
```shell Terminal
crewai signup
```
Caso já tenha uma conta, você pode fazer login com:
```shell Terminal
crewai login
```
@@ -250,7 +251,7 @@ Você deve estar autenticado no CrewAI Enterprise para usar estes comandos de ge
- **Implantar o Crew**: Depois de autenticado, você pode implantar seu crew ou flow no CrewAI Enterprise.
```shell Terminal
crewai deploy push
```
```
- Inicia o processo de deployment na plataforma CrewAI Enterprise.
- Após a iniciação bem-sucedida, será exibida a mensagem Deployment created successfully! juntamente com o Nome do Deployment e um Deployment ID (UUID) único.
@@ -323,4 +324,4 @@ Ao escolher um provedor, o CLI solicitará que você informe o nome da chave e a
Veja o seguinte link para o nome de chave de cada provedor:
* [LiteLLM Providers](https://docs.litellm.ai/docs/providers)
* [LiteLLM Providers](https://docs.litellm.ai/docs/providers)

View File

@@ -268,7 +268,7 @@ Nesta seção, você encontrará exemplos detalhados que ajudam a selecionar, co
from crewai import LLM
llm = LLM(
model="gemini-1.5-pro-latest", # or vertex_ai/gemini-1.5-pro-latest
model="gemini/gemini-1.5-pro-latest",
temperature=0.7,
vertex_credentials=vertex_credentials_json
)

View File

@@ -623,7 +623,7 @@ for provider in providers_to_test:
**Erros de modelo não encontrado:**
```python
# Verifique disponibilidade do modelo
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
configurator = EmbeddingConfigurator()
try:

View File

@@ -54,11 +54,9 @@ crew = Crew(
| **Markdown** _(opcional)_ | `markdown` | `Optional[bool]` | Se a tarefa deve instruir o agente a retornar a resposta final formatada em Markdown. O padrão é False. |
| **Config** _(opcional)_ | `config` | `Optional[Dict[str, Any]]` | Parâmetros de configuração específicos da tarefa. |
| **Arquivo de Saída** _(opcional)_| `output_file` | `Optional[str]` | Caminho do arquivo para armazenar a saída da tarefa. |
| **Criar Diretório** _(opcional)_ | `create_directory` | `Optional[bool]` | Se deve criar o diretório para output_file caso não exista. O padrão é True. |
| **Saída JSON** _(opcional)_ | `output_json` | `Optional[Type[BaseModel]]` | Um modelo Pydantic para estruturar a saída em JSON. |
| **Output Pydantic** _(opcional)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | Um modelo Pydantic para a saída da tarefa. |
| **Callback** _(opcional)_ | `callback` | `Optional[Any]` | Função/objeto a ser executado após a conclusão da tarefa. |
| **Guardrail** _(opcional)_ | `guardrail` | `Optional[Callable]` | Função para validar a saída da tarefa antes de prosseguir para a próxima tarefa. |
## Criando Tarefas
@@ -332,11 +330,9 @@ analysis_task = Task(
Guardrails (trilhas de proteção) de tarefas fornecem uma maneira de validar e transformar as saídas das tarefas antes que elas sejam passadas para a próxima tarefa. Esse recurso assegura a qualidade dos dados e oferece feedback aos agentes quando sua saída não atende a critérios específicos.
Guardrails são implementados como funções Python que contêm lógica de validação customizada, proporcionando controle total sobre o processo de validação e garantindo resultados confiáveis e determinísticos.
### Usando Guardrails em Tarefas
### Guardrails Baseados em Função
Para adicionar um guardrail baseado em função a uma tarefa, forneça uma função de validação por meio do parâmetro `guardrail`:
Para adicionar um guardrail a uma tarefa, forneça uma função de validação por meio do parâmetro `guardrail`:
```python Code
from typing import Tuple, Union, Dict, Any
@@ -374,7 +370,9 @@ blog_task = Task(
- Em caso de sucesso: retorna uma tupla `(True, resultado_validado)`
- Em caso de falha: retorna uma tupla `(False, "mensagem de erro explicando a falha")`
### LLMGuardrail
A classe `LLMGuardrail` oferece um mecanismo robusto para validação das saídas das tarefas.
### Melhores Práticas de Tratamento de Erros
@@ -825,7 +823,26 @@ task = Task(
)
```
#### Use uma abordagem no-code para validação
```python Code
from crewai import Task
task = Task(
description="Gerar dados em JSON",
expected_output="Objeto JSON válido",
guardrail="Garanta que a resposta é um objeto JSON válido"
)
```
#### Usando YAML
```yaml
research_task:
...
guardrail: garanta que cada bullet tenha no mínimo 100 palavras
...
```
```python Code
@CrewBase
@@ -941,87 +958,21 @@ task = Task(
## Criando Diretórios ao Salvar Arquivos
O parâmetro `create_directory` controla se o CrewAI deve criar automaticamente diretórios ao salvar saídas de tarefas em arquivos. Este recurso é particularmente útil para organizar outputs e garantir que os caminhos de arquivos estejam estruturados corretamente, especialmente ao trabalhar com hierarquias de projetos complexas.
### Comportamento Padrão
Por padrão, `create_directory=True`, o que significa que o CrewAI criará automaticamente qualquer diretório ausente no caminho do arquivo de saída:
Agora é possível especificar se uma tarefa deve criar diretórios ao salvar sua saída em arquivo. Isso é útil para organizar outputs e garantir que os caminhos estejam corretos.
```python Code
# Comportamento padrão - diretórios são criados automaticamente
report_task = Task(
description='Gerar um relatório abrangente de análise de mercado',
expected_output='Uma análise detalhada de mercado com gráficos e insights',
agent=analyst_agent,
output_file='reports/2025/market_analysis.md', # Cria 'reports/2025/' se não existir
markdown=True
# ...
save_output_task = Task(
description='Salve o resumo das notícias de IA em um arquivo',
expected_output='Arquivo salvo com sucesso',
agent=research_agent,
tools=[file_save_tool],
output_file='outputs/ai_news_summary.txt',
create_directory=True
)
```
### Desabilitando a Criação de Diretórios
Se você quiser evitar a criação automática de diretórios e garantir que o diretório já exista, defina `create_directory=False`:
```python Code
# Modo estrito - o diretório já deve existir
strict_output_task = Task(
description='Salvar dados críticos que requerem infraestrutura existente',
expected_output='Dados salvos em localização pré-configurada',
agent=data_agent,
output_file='secure/vault/critical_data.json',
create_directory=False # Gerará RuntimeError se 'secure/vault/' não existir
)
```
### Configuração YAML
Você também pode configurar este comportamento em suas definições de tarefas YAML:
```yaml tasks.yaml
analysis_task:
description: >
Gerar análise financeira trimestral
expected_output: >
Um relatório financeiro abrangente com insights trimestrais
agent: financial_analyst
output_file: reports/quarterly/q4_2024_analysis.pdf
create_directory: true # Criar automaticamente o diretório 'reports/quarterly/'
audit_task:
description: >
Realizar auditoria de conformidade e salvar no diretório de auditoria existente
expected_output: >
Um relatório de auditoria de conformidade
agent: auditor
output_file: audit/compliance_report.md
create_directory: false # O diretório já deve existir
```
### Casos de Uso
**Criação Automática de Diretórios (`create_directory=True`):**
- Ambientes de desenvolvimento e prototipagem
- Geração dinâmica de relatórios com pastas baseadas em datas
- Fluxos de trabalho automatizados onde a estrutura de diretórios pode variar
- Aplicações multi-tenant com pastas específicas do usuário
**Gerenciamento Manual de Diretórios (`create_directory=False`):**
- Ambientes de produção com controles rígidos do sistema de arquivos
- Aplicações sensíveis à segurança onde diretórios devem ser pré-configurados
- Sistemas com requisitos específicos de permissão
- Ambientes de conformidade onde a criação de diretórios é auditada
### Tratamento de Erros
Quando `create_directory=False` e o diretório não existe, o CrewAI gerará um `RuntimeError`:
```python Code
try:
result = crew.kickoff()
except RuntimeError as e:
# Tratar erro de diretório ausente
print(f"Falha na criação do diretório: {e}")
# Criar diretório manualmente ou usar local alternativo
#...
```
Veja o vídeo abaixo para aprender como utilizar saídas estruturadas no CrewAI:

View File

@@ -11,7 +11,7 @@ dependencies = [
# Core Dependencies
"pydantic>=2.4.2",
"openai>=1.13.3",
"litellm==1.74.3",
"litellm==1.72.6",
"instructor>=1.3.3",
# Text Processing
"pdfplumber>=0.11.4",
@@ -39,7 +39,6 @@ dependencies = [
"tomli>=2.0.2",
"blinker>=1.9.0",
"json5>=0.10.0",
"portalocker==2.7.0",
]
[project.urls]
@@ -48,7 +47,7 @@ Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools~=0.58.0"]
tools = ["crewai-tools~=0.49.0"]
embeddings = [
"tiktoken~=0.8.0"
]

View File

@@ -54,7 +54,7 @@ def _track_install_async():
_track_install_async()
__version__ = "0.150.0"
__version__ = "0.140.0"
__all__ = [
"Agent",
"Crew",

View File

@@ -210,6 +210,7 @@ class Agent(BaseAgent):
sources=self.knowledge_sources,
embedder=self.embedder,
collection_name=self.role,
storage=self.knowledge_storage or None,
)
self.knowledge.add_sources()
except (TypeError, ValueError) as e:
@@ -340,8 +341,7 @@ class Agent(BaseAgent):
self.knowledge_config.model_dump() if self.knowledge_config else {}
)
if self.knowledge or (self.crew and self.crew.knowledge):
if self.knowledge:
crewai_event_bus.emit(
self,
event=KnowledgeRetrievalStartedEvent(
@@ -353,28 +353,25 @@ class Agent(BaseAgent):
task_prompt
)
if self.knowledge_search_query:
# Querying agent specific knowledge
if self.knowledge:
agent_knowledge_snippets = self.knowledge.query(
[self.knowledge_search_query], **knowledge_config
)
if agent_knowledge_snippets:
self.agent_knowledge_context = extract_knowledge_context(
agent_knowledge_snippets
)
if self.agent_knowledge_context:
task_prompt += self.agent_knowledge_context
# Querying crew specific knowledge
knowledge_snippets = self.crew.query_knowledge(
agent_knowledge_snippets = self.knowledge.query(
[self.knowledge_search_query], **knowledge_config
)
if knowledge_snippets:
self.crew_knowledge_context = extract_knowledge_context(
knowledge_snippets
if agent_knowledge_snippets:
self.agent_knowledge_context = extract_knowledge_context(
agent_knowledge_snippets
)
if self.crew_knowledge_context:
task_prompt += self.crew_knowledge_context
if self.agent_knowledge_context:
task_prompt += self.agent_knowledge_context
if self.crew:
knowledge_snippets = self.crew.query_knowledge(
[self.knowledge_search_query], **knowledge_config
)
if knowledge_snippets:
self.crew_knowledge_context = extract_knowledge_context(
knowledge_snippets
)
if self.crew_knowledge_context:
task_prompt += self.crew_knowledge_context
crewai_event_bus.emit(
self,

View File

@@ -120,8 +120,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
raise
except Exception as e:
handle_unknown_error(self._printer, e)
raise
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
else:
raise e
if self.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)

View File

@@ -5,4 +5,4 @@ AUTH0_AUDIENCE = "https://crewai.us.auth0.com/api/v2/"
WORKOS_DOMAIN = "login.crewai.com"
WORKOS_CLI_CONNECT_APP_ID = "client_01JYT06R59SP0NXYGD994NFXXX"
WORKOS_ENVIRONMENT_ID = "client_01JNJQWBJ4SPFN3SWJM5T7BDG8"
WORKOS_ENVIRONMENT_ID = "client_01JNJQWB4HG8T5980R5VHP057C"

View File

@@ -30,9 +30,6 @@ def validate_jwt_token(
jwk_client = PyJWKClient(jwks_url)
signing_key = jwk_client.get_signing_key_from_jwt(jwt_token)
_unverified_decoded_token = jwt.decode(
jwt_token, options={"verify_signature": False}
)
decoded_token = jwt.decode(
jwt_token,
signing_key.key,
@@ -52,15 +49,9 @@ def validate_jwt_token(
except jwt.ExpiredSignatureError:
raise Exception("Token has expired.")
except jwt.InvalidAudienceError:
actual_audience = _unverified_decoded_token.get("aud", "[no audience found]")
raise Exception(
f"Invalid token audience. Got: '{actual_audience}'. Expected: '{audience}'"
)
raise Exception(f"Invalid token audience. Expected: '{audience}'")
except jwt.InvalidIssuerError:
actual_issuer = _unverified_decoded_token.get("iss", "[no issuer found]")
raise Exception(
f"Invalid token issuer. Got: '{actual_issuer}'. Expected: '{issuer}'"
)
raise Exception(f"Invalid token issuer. Expected: '{issuer}'")
except jwt.MissingRequiredClaimError as e:
raise Exception(f"Token is missing required claims: {str(e)}")
except jwt.exceptions.PyJWKClientError as e:

View File

@@ -26,7 +26,7 @@ class PlusAPIMixin:
"Please sign up/login to CrewAI+ before using the CLI.",
style="bold red",
)
console.print("Run 'crewai login' to sign up/login.", style="bold green")
console.print("Run 'crewai signup' to sign up/login.", style="bold green")
raise SystemExit
def _validate_response(self, response: requests.Response) -> None:

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.150.0,<1.0.0"
"crewai[tools]>=0.140.0,<1.0.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.150.0,<1.0.0",
"crewai[tools]>=0.140.0,<1.0.0",
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.150.0"
"crewai[tools]>=0.140.0"
]
[tool.crewai]

View File

@@ -161,7 +161,7 @@ class Crew(FlowTrackable, BaseModel):
)
user_memory: Optional[InstanceOf[UserMemory]] = Field(
default=None,
description="DEPRECATED: Will be removed in version 0.156.0 or on 2025-08-04, whichever comes first. Use external_memory instead.",
description="An instance of the UserMemory to be used by the Crew to store/fetch memories of a specific user.",
)
external_memory: Optional[InstanceOf[ExternalMemory]] = Field(
default=None,
@@ -327,7 +327,7 @@ class Crew(FlowTrackable, BaseModel):
self._short_term_memory = self.short_term_memory
self._entity_memory = self.entity_memory
# UserMemory will be removed in version 0.156.0 or on 2025-08-04, whichever comes first
# UserMemory is gonna to be deprecated in the future, but we have to initialize a default value for now
self._user_memory = None
if self.memory:
@@ -1255,7 +1255,6 @@ class Crew(FlowTrackable, BaseModel):
if self.external_memory:
copied_data["external_memory"] = self.external_memory.model_copy(deep=True)
if self.user_memory:
# DEPRECATED: UserMemory will be removed in version 0.156.0 or on 2025-08-04
copied_data["user_memory"] = self.user_memory.model_copy(deep=True)
copied_data.pop("agents", None)
@@ -1332,7 +1331,6 @@ class Crew(FlowTrackable, BaseModel):
),
)
test_crew = self.copy()
evaluator = CrewEvaluator(test_crew, llm_instance)
for i in range(1, n_iterations + 1):

View File

@@ -1,40 +0,0 @@
from crewai.experimental.evaluation import (
BaseEvaluator,
EvaluationScore,
MetricCategory,
AgentEvaluationResult,
SemanticQualityEvaluator,
GoalAlignmentEvaluator,
ReasoningEfficiencyEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
EvaluationTraceCallback,
create_evaluation_callbacks,
AgentEvaluator,
create_default_evaluator,
ExperimentRunner,
ExperimentResults,
ExperimentResult,
)
__all__ = [
"BaseEvaluator",
"EvaluationScore",
"MetricCategory",
"AgentEvaluationResult",
"SemanticQualityEvaluator",
"GoalAlignmentEvaluator",
"ReasoningEfficiencyEvaluator",
"ToolSelectionEvaluator",
"ParameterExtractionEvaluator",
"ToolInvocationEvaluator",
"EvaluationTraceCallback",
"create_evaluation_callbacks",
"AgentEvaluator",
"create_default_evaluator",
"ExperimentRunner",
"ExperimentResults",
"ExperimentResult"
]

View File

@@ -1,51 +0,0 @@
from crewai.experimental.evaluation.base_evaluator import (
BaseEvaluator,
EvaluationScore,
MetricCategory,
AgentEvaluationResult
)
from crewai.experimental.evaluation.metrics import (
SemanticQualityEvaluator,
GoalAlignmentEvaluator,
ReasoningEfficiencyEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator
)
from crewai.experimental.evaluation.evaluation_listener import (
EvaluationTraceCallback,
create_evaluation_callbacks
)
from crewai.experimental.evaluation.agent_evaluator import (
AgentEvaluator,
create_default_evaluator
)
from crewai.experimental.evaluation.experiment import (
ExperimentRunner,
ExperimentResults,
ExperimentResult
)
__all__ = [
"BaseEvaluator",
"EvaluationScore",
"MetricCategory",
"AgentEvaluationResult",
"SemanticQualityEvaluator",
"GoalAlignmentEvaluator",
"ReasoningEfficiencyEvaluator",
"ToolSelectionEvaluator",
"ParameterExtractionEvaluator",
"ToolInvocationEvaluator",
"EvaluationTraceCallback",
"create_evaluation_callbacks",
"AgentEvaluator",
"create_default_evaluator",
"ExperimentRunner",
"ExperimentResults",
"ExperimentResult"
]

View File

@@ -1,245 +0,0 @@
import threading
from typing import Any
from crewai.experimental.evaluation.base_evaluator import AgentEvaluationResult, AggregationStrategy
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.evaluation_display import EvaluationDisplayFormatter
from crewai.utilities.events.agent_events import AgentEvaluationStartedEvent, AgentEvaluationCompletedEvent, AgentEvaluationFailedEvent
from crewai.experimental.evaluation import BaseEvaluator, create_evaluation_callbacks
from collections.abc import Sequence
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.events.task_events import TaskCompletedEvent
from crewai.utilities.events.agent_events import LiteAgentExecutionCompletedEvent
from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult, EvaluationScore, MetricCategory
class ExecutionState:
def __init__(self):
self.traces = {}
self.current_agent_id: str | None = None
self.current_task_id: str | None = None
self.iteration = 1
self.iterations_results = {}
self.agent_evaluators = {}
class AgentEvaluator:
def __init__(
self,
agents: list[Agent],
evaluators: Sequence[BaseEvaluator] | None = None,
):
self.agents: list[Agent] = agents
self.evaluators: Sequence[BaseEvaluator] | None = evaluators
self.callback = create_evaluation_callbacks()
self.console_formatter = ConsoleFormatter()
self.display_formatter = EvaluationDisplayFormatter()
self._thread_local: threading.local = threading.local()
for agent in self.agents:
self._execution_state.agent_evaluators[str(agent.id)] = self.evaluators
self._subscribe_to_events()
@property
def _execution_state(self) -> ExecutionState:
if not hasattr(self._thread_local, 'execution_state'):
self._thread_local.execution_state = ExecutionState()
return self._thread_local.execution_state
def _subscribe_to_events(self) -> None:
from typing import cast
crewai_event_bus.register_handler(TaskCompletedEvent, cast(Any, self._handle_task_completed))
crewai_event_bus.register_handler(LiteAgentExecutionCompletedEvent, cast(Any, self._handle_lite_agent_completed))
def _handle_task_completed(self, source: Any, event: TaskCompletedEvent) -> None:
assert event.task is not None
agent = event.task.agent
if agent and str(getattr(agent, 'id', 'unknown')) in self._execution_state.agent_evaluators:
self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=str(event.task.id))
state = ExecutionState()
state.current_agent_id = str(agent.id)
state.current_task_id = str(event.task.id)
assert state.current_agent_id is not None and state.current_task_id is not None
trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
if not trace:
return
result = self.evaluate(
agent=agent,
task=event.task,
execution_trace=trace,
final_output=event.output,
state=state
)
current_iteration = self._execution_state.iteration
if current_iteration not in self._execution_state.iterations_results:
self._execution_state.iterations_results[current_iteration] = {}
if agent.role not in self._execution_state.iterations_results[current_iteration]:
self._execution_state.iterations_results[current_iteration][agent.role] = []
self._execution_state.iterations_results[current_iteration][agent.role].append(result)
def _handle_lite_agent_completed(self, source: object, event: LiteAgentExecutionCompletedEvent) -> None:
agent_info = event.agent_info
agent_id = str(agent_info["id"])
if agent_id in self._execution_state.agent_evaluators:
state = ExecutionState()
state.current_agent_id = agent_id
state.current_task_id = "lite_task"
target_agent = None
for agent in self.agents:
if str(agent.id) == agent_id:
target_agent = agent
break
if not target_agent:
return
assert state.current_agent_id is not None and state.current_task_id is not None
trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
if not trace:
return
result = self.evaluate(
agent=target_agent,
execution_trace=trace,
final_output=event.output,
state=state
)
current_iteration = self._execution_state.iteration
if current_iteration not in self._execution_state.iterations_results:
self._execution_state.iterations_results[current_iteration] = {}
agent_role = target_agent.role
if agent_role not in self._execution_state.iterations_results[current_iteration]:
self._execution_state.iterations_results[current_iteration][agent_role] = []
self._execution_state.iterations_results[current_iteration][agent_role].append(result)
def set_iteration(self, iteration: int) -> None:
self._execution_state.iteration = iteration
def reset_iterations_results(self) -> None:
self._execution_state.iterations_results = {}
def get_evaluation_results(self) -> dict[str, list[AgentEvaluationResult]]:
if self._execution_state.iterations_results and self._execution_state.iteration in self._execution_state.iterations_results:
return self._execution_state.iterations_results[self._execution_state.iteration]
return {}
def display_results_with_iterations(self) -> None:
self.display_formatter.display_summary_results(self._execution_state.iterations_results)
def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = True) -> dict[str, AgentAggregatedEvaluationResult]:
agent_results = {}
with crewai_event_bus.scoped_handlers():
task_results = self.get_evaluation_results()
for agent_role, results in task_results.items():
if not results:
continue
agent_id = results[0].agent_id
aggregated_result = self.display_formatter._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=results,
strategy=strategy
)
agent_results[agent_role] = aggregated_result
if self._execution_state.iterations_results and self._execution_state.iteration == max(self._execution_state.iterations_results.keys(), default=0):
self.display_results_with_iterations()
if include_evaluation_feedback:
self.display_evaluation_with_feedback()
return agent_results
def display_evaluation_with_feedback(self) -> None:
self.display_formatter.display_evaluation_with_feedback(self._execution_state.iterations_results)
def evaluate(
self,
agent: Agent,
execution_trace: dict[str, Any],
final_output: Any,
state: ExecutionState,
task: Task | None = None,
) -> AgentEvaluationResult:
result = AgentEvaluationResult(
agent_id=state.current_agent_id or str(agent.id),
task_id=state.current_task_id or (str(task.id) if task else "unknown_task")
)
assert self.evaluators is not None
task_id = str(task.id) if task else None
for evaluator in self.evaluators:
try:
self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id)
score = evaluator.evaluate(
agent=agent,
task=task,
execution_trace=execution_trace,
final_output=final_output
)
result.metrics[evaluator.metric_category] = score
self.emit_evaluation_completed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, metric_category=evaluator.metric_category, score=score)
except Exception as e:
self.emit_evaluation_failed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, error=str(e))
self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}")
return result
def emit_evaluation_started_event(self, agent_role: str, agent_id: str, task_id: str | None = None):
crewai_event_bus.emit(
self,
AgentEvaluationStartedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration)
)
def emit_evaluation_completed_event(self, agent_role: str, agent_id: str, task_id: str | None = None, metric_category: MetricCategory | None = None, score: EvaluationScore | None = None):
crewai_event_bus.emit(
self,
AgentEvaluationCompletedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, metric_category=metric_category, score=score)
)
def emit_evaluation_failed_event(self, agent_role: str, agent_id: str, error: str, task_id: str | None = None):
crewai_event_bus.emit(
self,
AgentEvaluationFailedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, error=error)
)
def create_default_evaluator(agents: list[Agent], llm: None = None):
from crewai.experimental.evaluation import (
GoalAlignmentEvaluator,
SemanticQualityEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator
)
evaluators = [
GoalAlignmentEvaluator(llm=llm),
SemanticQualityEvaluator(llm=llm),
ToolSelectionEvaluator(llm=llm),
ParameterExtractionEvaluator(llm=llm),
ToolInvocationEvaluator(llm=llm),
ReasoningEfficiencyEvaluator(llm=llm),
]
return AgentEvaluator(evaluators=evaluators, agents=agents)

View File

@@ -1,125 +0,0 @@
import abc
import enum
from enum import Enum
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.task import Task
from crewai.llm import BaseLLM
from crewai.utilities.llm_utils import create_llm
class MetricCategory(enum.Enum):
GOAL_ALIGNMENT = "goal_alignment"
SEMANTIC_QUALITY = "semantic_quality"
REASONING_EFFICIENCY = "reasoning_efficiency"
TOOL_SELECTION = "tool_selection"
PARAMETER_EXTRACTION = "parameter_extraction"
TOOL_INVOCATION = "tool_invocation"
def title(self):
return self.value.replace('_', ' ').title()
class EvaluationScore(BaseModel):
score: float | None = Field(
default=5.0,
description="Numeric score from 0-10 where 0 is worst and 10 is best, None if not applicable",
ge=0.0,
le=10.0
)
feedback: str = Field(
default="",
description="Detailed feedback explaining the evaluation score"
)
raw_response: str | None = Field(
default=None,
description="Raw response from the evaluator (e.g., LLM)"
)
def __str__(self) -> str:
if self.score is None:
return f"Score: N/A - {self.feedback}"
return f"Score: {self.score:.1f}/10 - {self.feedback}"
class BaseEvaluator(abc.ABC):
def __init__(self, llm: BaseLLM | None = None):
self.llm: BaseLLM | None = create_llm(llm)
@property
@abc.abstractmethod
def metric_category(self) -> MetricCategory:
pass
@abc.abstractmethod
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
pass
class AgentEvaluationResult(BaseModel):
agent_id: str = Field(description="ID of the evaluated agent")
task_id: str = Field(description="ID of the task that was executed")
metrics: Dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict,
description="Evaluation scores for each metric category"
)
class AggregationStrategy(Enum):
SIMPLE_AVERAGE = "simple_average" # Equal weight to all tasks
WEIGHTED_BY_COMPLEXITY = "weighted_by_complexity" # Weight by task complexity
BEST_PERFORMANCE = "best_performance" # Use best scores across tasks
WORST_PERFORMANCE = "worst_performance" # Use worst scores across tasks
class AgentAggregatedEvaluationResult(BaseModel):
agent_id: str = Field(
default="",
description="ID of the agent"
)
agent_role: str = Field(
default="",
description="Role of the agent"
)
task_count: int = Field(
default=0,
description="Number of tasks included in this aggregation"
)
aggregation_strategy: AggregationStrategy = Field(
default=AggregationStrategy.SIMPLE_AVERAGE,
description="Strategy used for aggregation"
)
metrics: Dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict,
description="Aggregated metrics across all tasks"
)
task_results: List[str] = Field(
default_factory=list,
description="IDs of tasks included in this aggregation"
)
overall_score: Optional[float] = Field(
default=None,
description="Overall score for this agent"
)
def __str__(self) -> str:
result = f"Agent Evaluation: {self.agent_role}\n"
result += f"Strategy: {self.aggregation_strategy.value}\n"
result += f"Tasks evaluated: {self.task_count}\n"
for category, score in self.metrics.items():
result += f"\n\n- {category.value.upper()}: {score.score}/10\n"
if score.feedback:
detailed_feedback = "\n ".join(score.feedback.split('\n'))
result += f" {detailed_feedback}\n"
return result

View File

@@ -1,333 +0,0 @@
from collections import defaultdict
from typing import Dict, Any, List
from rich.table import Table
from rich.box import HEAVY_EDGE, ROUNDED
from collections.abc import Sequence
from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy, AgentEvaluationResult, MetricCategory
from crewai.experimental.evaluation import EvaluationScore
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.llm_utils import create_llm
class EvaluationDisplayFormatter:
def __init__(self):
self.console_formatter = ConsoleFormatter()
def display_evaluation_with_feedback(self, iterations_results: Dict[int, Dict[str, List[Any]]]):
if not iterations_results:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
return
all_agent_roles: set[str] = set()
for iter_results in iterations_results.values():
all_agent_roles.update(iter_results.keys())
for agent_role in sorted(all_agent_roles):
self.console_formatter.print(f"\n[bold cyan]Agent: {agent_role}[/bold cyan]")
for iter_num, results in sorted(iterations_results.items()):
if agent_role not in results or not results[agent_role]:
continue
agent_results = results[agent_role]
agent_id = agent_results[0].agent_id
aggregated_result = self._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=agent_results,
)
self.console_formatter.print(f"\n[bold]Iteration {iter_num}[/bold]")
table = Table(box=ROUNDED)
table.add_column("Metric", style="cyan")
table.add_column("Score (1-10)", justify="center")
table.add_column("Feedback", style="green")
if aggregated_result.metrics:
for metric, evaluation_score in aggregated_result.metrics.items():
score = evaluation_score.score
if isinstance(score, (int, float)):
if score >= 8.0:
score_text = f"[green]{score:.1f}[/green]"
elif score >= 6.0:
score_text = f"[cyan]{score:.1f}[/cyan]"
elif score >= 4.0:
score_text = f"[yellow]{score:.1f}[/yellow]"
else:
score_text = f"[red]{score:.1f}[/red]"
else:
score_text = "[dim]N/A[/dim]"
table.add_section()
table.add_row(
metric.title(),
score_text,
evaluation_score.feedback or ""
)
if aggregated_result.overall_score is not None:
overall_score = aggregated_result.overall_score
if overall_score >= 8.0:
overall_color = "green"
elif overall_score >= 6.0:
overall_color = "cyan"
elif overall_score >= 4.0:
overall_color = "yellow"
else:
overall_color = "red"
table.add_section()
table.add_row(
"Overall Score",
f"[{overall_color}]{overall_score:.1f}[/]",
"Overall agent evaluation score"
)
self.console_formatter.print(table)
def display_summary_results(self, iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]]):
if not iterations_results:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
return
self.console_formatter.print("\n")
table = Table(title="Agent Performance Scores \n (1-10 Higher is better)", box=HEAVY_EDGE)
table.add_column("Agent/Metric", style="cyan")
for iter_num in sorted(iterations_results.keys()):
run_label = f"Run {iter_num}"
table.add_column(run_label, justify="center")
table.add_column("Avg. Total", justify="center")
all_agent_roles: set[str] = set()
for results in iterations_results.values():
all_agent_roles.update(results.keys())
for agent_role in sorted(all_agent_roles):
agent_scores_by_iteration = {}
agent_metrics_by_iteration = {}
for iter_num, results in sorted(iterations_results.items()):
if agent_role not in results or not results[agent_role]:
continue
agent_results = results[agent_role]
agent_id = agent_results[0].agent_id
aggregated_result = self._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=agent_results,
strategy=AggregationStrategy.SIMPLE_AVERAGE
)
valid_scores = [score.score for score in aggregated_result.metrics.values()
if score.score is not None]
if valid_scores:
avg_score = sum(valid_scores) / len(valid_scores)
agent_scores_by_iteration[iter_num] = avg_score
agent_metrics_by_iteration[iter_num] = aggregated_result.metrics
if not agent_scores_by_iteration:
continue
avg_across_iterations = sum(agent_scores_by_iteration.values()) / len(agent_scores_by_iteration)
row = [f"[bold]{agent_role}[/bold]"]
for iter_num in sorted(iterations_results.keys()):
if iter_num in agent_scores_by_iteration:
score = agent_scores_by_iteration[iter_num]
if score >= 8.0:
color = "green"
elif score >= 6.0:
color = "cyan"
elif score >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[bold {color}]{score:.1f}[/]")
else:
row.append("-")
if avg_across_iterations >= 8.0:
color = "green"
elif avg_across_iterations >= 6.0:
color = "cyan"
elif avg_across_iterations >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[bold {color}]{avg_across_iterations:.1f}[/]")
table.add_row(*row)
all_metrics: set[Any] = set()
for metrics in agent_metrics_by_iteration.values():
all_metrics.update(metrics.keys())
for metric in sorted(all_metrics, key=lambda x: x.value):
metric_scores = []
row = [f" - {metric.title()}"]
for iter_num in sorted(iterations_results.keys()):
if (iter_num in agent_metrics_by_iteration and
metric in agent_metrics_by_iteration[iter_num]):
metric_score = agent_metrics_by_iteration[iter_num][metric].score
if metric_score is not None:
metric_scores.append(metric_score)
if metric_score >= 8.0:
color = "green"
elif metric_score >= 6.0:
color = "cyan"
elif metric_score >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[{color}]{metric_score:.1f}[/]")
else:
row.append("[dim]N/A[/dim]")
else:
row.append("-")
if metric_scores:
avg = sum(metric_scores) / len(metric_scores)
if avg >= 8.0:
color = "green"
elif avg >= 6.0:
color = "cyan"
elif avg >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[{color}]{avg:.1f}[/]")
else:
row.append("-")
table.add_row(*row)
table.add_row(*[""] * (len(sorted(iterations_results.keys())) + 2))
self.console_formatter.print(table)
self.console_formatter.print("\n")
def _aggregate_agent_results(
self,
agent_id: str,
agent_role: str,
results: Sequence[AgentEvaluationResult],
strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE,
) -> AgentAggregatedEvaluationResult:
metrics_by_category: dict[MetricCategory, list[EvaluationScore]] = defaultdict(list)
for result in results:
for metric_name, evaluation_score in result.metrics.items():
metrics_by_category[metric_name].append(evaluation_score)
aggregated_metrics: dict[MetricCategory, EvaluationScore] = {}
for category, scores in metrics_by_category.items():
valid_scores = [s.score for s in scores if s.score is not None]
avg_score = sum(valid_scores) / len(valid_scores) if valid_scores else None
feedbacks = [s.feedback for s in scores if s.feedback]
feedback_summary = None
if feedbacks:
if len(feedbacks) > 1:
feedback_summary = self._summarize_feedbacks(
agent_role=agent_role,
metric=category.title(),
feedbacks=feedbacks,
scores=[s.score for s in scores],
strategy=strategy
)
else:
feedback_summary = feedbacks[0]
aggregated_metrics[category] = EvaluationScore(
score=avg_score,
feedback=feedback_summary
)
overall_score = None
if aggregated_metrics:
valid_scores = [m.score for m in aggregated_metrics.values() if m.score is not None]
if valid_scores:
overall_score = sum(valid_scores) / len(valid_scores)
return AgentAggregatedEvaluationResult(
agent_id=agent_id,
agent_role=agent_role,
metrics=aggregated_metrics,
overall_score=overall_score,
task_count=len(results),
aggregation_strategy=strategy
)
def _summarize_feedbacks(
self,
agent_role: str,
metric: str,
feedbacks: List[str],
scores: List[float | None],
strategy: AggregationStrategy
) -> str:
if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks):
return "\n\n".join([f"Feedback {i+1}: {fb}" for i, fb in enumerate(feedbacks)])
try:
llm = create_llm()
formatted_feedbacks = []
for i, (feedback, score) in enumerate(zip(feedbacks, scores)):
if len(feedback) > 500:
feedback = feedback[:500] + "..."
score_text = f"{score:.1f}" if score is not None else "N/A"
formatted_feedbacks.append(f"Feedback #{i+1} (Score: {score_text}):\n{feedback}")
all_feedbacks = "\n\n" + "\n\n---\n\n".join(formatted_feedbacks)
strategy_guidance = ""
if strategy == AggregationStrategy.BEST_PERFORMANCE:
strategy_guidance = "Focus on the highest-scoring aspects and strengths demonstrated."
elif strategy == AggregationStrategy.WORST_PERFORMANCE:
strategy_guidance = "Focus on areas that need improvement and common issues across tasks."
else:
strategy_guidance = "Provide a balanced analysis of strengths and weaknesses across all tasks."
prompt = [
{"role": "system", "content": f"""You are an expert evaluator creating a comprehensive summary of agent performance feedback.
Your job is to synthesize multiple feedback points about the same metric across different tasks.
Create a concise, insightful summary that captures the key patterns and themes from all feedback.
{strategy_guidance}
Your summary should be:
1. Specific and concrete (not vague or general)
2. Focused on actionable insights
3. Highlighting patterns across tasks
4. 150-250 words in length
The summary should be directly usable as final feedback for the agent's performance on this metric."""},
{"role": "user", "content": f"""I need a synthesized summary of the following feedback for:
Agent Role: {agent_role}
Metric: {metric.title()}
{all_feedbacks}
"""}
]
assert llm is not None
response = llm.call(prompt)
return response
except Exception:
return "Synthesized from multiple tasks: " + "\n\n".join([f"- {fb[:500]}..." for fb in feedbacks])

View File

@@ -1,234 +0,0 @@
from datetime import datetime
from typing import Any, Dict, Optional
from collections.abc import Sequence
from crewai.agent import Agent
from crewai.task import Task
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
LiteAgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent
)
from crewai.utilities.events.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
ToolExecutionErrorEvent,
ToolSelectionErrorEvent,
ToolValidateInputErrorEvent
)
from crewai.utilities.events.llm_events import (
LLMCallStartedEvent,
LLMCallCompletedEvent
)
class EvaluationTraceCallback(BaseEventListener):
"""Event listener for collecting execution traces for evaluation.
This listener attaches to the event bus to collect detailed information
about the execution process, including agent steps, tool uses, knowledge
retrievals, and final output - all for use in agent evaluation.
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if not hasattr(self, "_initialized") or not self._initialized:
super().__init__()
self.traces = {}
self.current_agent_id = None
self.current_task_id = None
self._initialized = True
def setup_listeners(self, event_bus: CrewAIEventsBus):
@event_bus.on(AgentExecutionStartedEvent)
def on_agent_started(source, event: AgentExecutionStartedEvent):
self.on_agent_start(event.agent, event.task)
@event_bus.on(LiteAgentExecutionStartedEvent)
def on_lite_agent_started(source, event: LiteAgentExecutionStartedEvent):
self.on_lite_agent_start(event.agent_info)
@event_bus.on(AgentExecutionCompletedEvent)
def on_agent_completed(source, event: AgentExecutionCompletedEvent):
self.on_agent_finish(event.agent, event.task, event.output)
@event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_completed(source, event: LiteAgentExecutionCompletedEvent):
self.on_lite_agent_finish(event.output)
@event_bus.on(ToolUsageFinishedEvent)
def on_tool_completed(source, event: ToolUsageFinishedEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.output, success=True)
@event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="usage_error")
@event_bus.on(ToolExecutionErrorEvent)
def on_tool_execution_error(source, event: ToolExecutionErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="execution_error")
@event_bus.on(ToolSelectionErrorEvent)
def on_tool_selection_error(source, event: ToolSelectionErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="selection_error")
@event_bus.on(ToolValidateInputErrorEvent)
def on_tool_validate_input_error(source, event: ToolValidateInputErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="validation_error")
@event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
self.on_llm_call_start(event.messages, event.tools)
@event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
self.on_llm_call_end(event.messages, event.response)
def on_lite_agent_start(self, agent_info: dict[str, Any]):
self.current_agent_id = agent_info['id']
self.current_task_id = "lite_task"
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
self._init_trace(
trace_key=trace_key,
agent_id=self.current_agent_id,
task_id=self.current_task_id,
tool_uses=[],
llm_calls=[],
start_time=datetime.now(),
final_output=None
)
def _init_trace(self, trace_key: str, **kwargs: Any):
self.traces[trace_key] = kwargs
def on_agent_start(self, agent: Agent, task: Task):
self.current_agent_id = agent.id
self.current_task_id = task.id
trace_key = f"{agent.id}_{task.id}"
self._init_trace(
trace_key=trace_key,
agent_id=agent.id,
task_id=task.id,
tool_uses=[],
llm_calls=[],
start_time=datetime.now(),
final_output=None
)
def on_agent_finish(self, agent: Agent, task: Task, output: Any):
trace_key = f"{agent.id}_{task.id}"
if trace_key in self.traces:
self.traces[trace_key]["final_output"] = output
self.traces[trace_key]["end_time"] = datetime.now()
self._reset_current()
def _reset_current(self):
self.current_agent_id = None
self.current_task_id = None
def on_lite_agent_finish(self, output: Any):
trace_key = f"{self.current_agent_id}_lite_task"
if trace_key in self.traces:
self.traces[trace_key]["final_output"] = output
self.traces[trace_key]["end_time"] = datetime.now()
self._reset_current()
def on_tool_use(self, tool_name: str, tool_args: dict[str, Any] | str, result: Any,
success: bool = True, error_type: str | None = None):
if not self.current_agent_id or not self.current_task_id:
return
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
if trace_key in self.traces:
tool_use = {
"tool": tool_name,
"args": tool_args,
"result": result,
"success": success,
"timestamp": datetime.now()
}
# Add error information if applicable
if not success and error_type:
tool_use["error"] = True
tool_use["error_type"] = error_type
self.traces[trace_key]["tool_uses"].append(tool_use)
def on_llm_call_start(self, messages: str | Sequence[dict[str, Any]] | None, tools: Sequence[dict[str, Any]] | None = None):
if not self.current_agent_id or not self.current_task_id:
return
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
if trace_key not in self.traces:
return
self.current_llm_call = {
"messages": messages,
"tools": tools,
"start_time": datetime.now(),
"response": None,
"end_time": None
}
def on_llm_call_end(self, messages: str | list[dict[str, Any]] | None, response: Any):
if not self.current_agent_id or not self.current_task_id:
return
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
if trace_key not in self.traces:
return
total_tokens = 0
if hasattr(response, "usage") and hasattr(response.usage, "total_tokens"):
total_tokens = response.usage.total_tokens
current_time = datetime.now()
start_time = None
if hasattr(self, "current_llm_call") and self.current_llm_call:
start_time = self.current_llm_call.get("start_time")
if not start_time:
start_time = current_time
llm_call = {
"messages": messages,
"response": response,
"start_time": start_time,
"end_time": current_time,
"total_tokens": total_tokens
}
self.traces[trace_key]["llm_calls"].append(llm_call)
if hasattr(self, "current_llm_call"):
self.current_llm_call = {}
def get_trace(self, agent_id: str, task_id: str) -> Optional[Dict[str, Any]]:
trace_key = f"{agent_id}_{task_id}"
return self.traces.get(trace_key)
def create_evaluation_callbacks() -> EvaluationTraceCallback:
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
callback = EvaluationTraceCallback()
callback.setup_listeners(crewai_event_bus)
return callback

View File

@@ -1,8 +0,0 @@
from crewai.experimental.evaluation.experiment.runner import ExperimentRunner
from crewai.experimental.evaluation.experiment.result import ExperimentResults, ExperimentResult
__all__ = [
"ExperimentRunner",
"ExperimentResults",
"ExperimentResult"
]

View File

@@ -1,122 +0,0 @@
import json
import os
from datetime import datetime, timezone
from typing import Any
from pydantic import BaseModel
class ExperimentResult(BaseModel):
identifier: str
inputs: dict[str, Any]
score: int | dict[str, int | float]
expected_score: int | dict[str, int | float]
passed: bool
agent_evaluations: dict[str, Any] | None = None
class ExperimentResults:
def __init__(self, results: list[ExperimentResult], metadata: dict[str, Any] | None = None):
self.results = results
self.metadata = metadata or {}
self.timestamp = datetime.now(timezone.utc)
from crewai.experimental.evaluation.experiment.result_display import ExperimentResultsDisplay
self.display = ExperimentResultsDisplay()
def to_json(self, filepath: str | None = None) -> dict[str, Any]:
data = {
"timestamp": self.timestamp.isoformat(),
"metadata": self.metadata,
"results": [r.model_dump(exclude={"agent_evaluations"}) for r in self.results]
}
if filepath:
with open(filepath, 'w') as f:
json.dump(data, f, indent=2)
self.display.console.print(f"[green]Results saved to {filepath}[/green]")
return data
def compare_with_baseline(self, baseline_filepath: str, save_current: bool = True, print_summary: bool = False) -> dict[str, Any]:
baseline_runs = []
if os.path.exists(baseline_filepath) and os.path.getsize(baseline_filepath) > 0:
try:
with open(baseline_filepath, 'r') as f:
baseline_data = json.load(f)
if isinstance(baseline_data, dict) and "timestamp" in baseline_data:
baseline_runs = [baseline_data]
elif isinstance(baseline_data, list):
baseline_runs = baseline_data
except (json.JSONDecodeError, FileNotFoundError) as e:
self.display.console.print(f"[yellow]Warning: Could not load baseline file: {str(e)}[/yellow]")
if not baseline_runs:
if save_current:
current_data = self.to_json()
with open(baseline_filepath, 'w') as f:
json.dump([current_data], f, indent=2)
self.display.console.print(f"[green]Saved current results as new baseline to {baseline_filepath}[/green]")
return {"is_baseline": True, "changes": {}}
baseline_runs.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
latest_run = baseline_runs[0]
comparison = self._compare_with_run(latest_run)
if print_summary:
self.display.comparison_summary(comparison, latest_run["timestamp"])
if save_current:
current_data = self.to_json()
baseline_runs.append(current_data)
with open(baseline_filepath, 'w') as f:
json.dump(baseline_runs, f, indent=2)
self.display.console.print(f"[green]Added current results to baseline file {baseline_filepath}[/green]")
return comparison
def _compare_with_run(self, baseline_run: dict[str, Any]) -> dict[str, Any]:
baseline_results = baseline_run.get("results", [])
baseline_lookup = {}
for result in baseline_results:
test_identifier = result.get("identifier")
if test_identifier:
baseline_lookup[test_identifier] = result
improved = []
regressed = []
unchanged = []
new_tests = []
for result in self.results:
test_identifier = result.identifier
if not test_identifier or test_identifier not in baseline_lookup:
new_tests.append(test_identifier)
continue
baseline_result = baseline_lookup[test_identifier]
baseline_passed = baseline_result.get("passed", False)
if result.passed and not baseline_passed:
improved.append(test_identifier)
elif not result.passed and baseline_passed:
regressed.append(test_identifier)
else:
unchanged.append(test_identifier)
missing_tests = []
current_test_identifiers = {result.identifier for result in self.results}
for result in baseline_results:
test_identifier = result.get("identifier")
if test_identifier and test_identifier not in current_test_identifiers:
missing_tests.append(test_identifier)
return {
"improved": improved,
"regressed": regressed,
"unchanged": unchanged,
"new_tests": new_tests,
"missing_tests": missing_tests,
"total_compared": len(improved) + len(regressed) + len(unchanged),
"baseline_timestamp": baseline_run.get("timestamp", "unknown")
}

View File

@@ -1,70 +0,0 @@
from typing import Dict, Any
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from crewai.experimental.evaluation.experiment.result import ExperimentResults
class ExperimentResultsDisplay:
def __init__(self):
self.console = Console()
def summary(self, experiment_results: ExperimentResults):
total = len(experiment_results.results)
passed = sum(1 for r in experiment_results.results if r.passed)
table = Table(title="Experiment Summary")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green")
table.add_row("Total Test Cases", str(total))
table.add_row("Passed", str(passed))
table.add_row("Failed", str(total - passed))
table.add_row("Success Rate", f"{(passed / total * 100):.1f}%" if total > 0 else "N/A")
self.console.print(table)
def comparison_summary(self, comparison: Dict[str, Any], baseline_timestamp: str):
self.console.print(Panel(f"[bold]Comparison with baseline run from {baseline_timestamp}[/bold]",
expand=False))
table = Table(title="Results Comparison")
table.add_column("Metric", style="cyan")
table.add_column("Count", style="white")
table.add_column("Details", style="dim")
improved = comparison.get("improved", [])
if improved:
details = ", ".join([f"{test_identifier}" for test_identifier in improved[:3]])
if len(improved) > 3:
details += f" and {len(improved) - 3} more"
table.add_row("✅ Improved", str(len(improved)), details)
else:
table.add_row("✅ Improved", "0", "")
regressed = comparison.get("regressed", [])
if regressed:
details = ", ".join([f"{test_identifier}" for test_identifier in regressed[:3]])
if len(regressed) > 3:
details += f" and {len(regressed) - 3} more"
table.add_row("❌ Regressed", str(len(regressed)), details, style="red")
else:
table.add_row("❌ Regressed", "0", "")
unchanged = comparison.get("unchanged", [])
table.add_row("⏺ Unchanged", str(len(unchanged)), "")
new_tests = comparison.get("new_tests", [])
if new_tests:
details = ", ".join(new_tests[:3])
if len(new_tests) > 3:
details += f" and {len(new_tests) - 3} more"
table.add_row(" New Tests", str(len(new_tests)), details)
missing_tests = comparison.get("missing_tests", [])
if missing_tests:
details = ", ".join(missing_tests[:3])
if len(missing_tests) > 3:
details += f" and {len(missing_tests) - 3} more"
table.add_row(" Missing Tests", str(len(missing_tests)), details)
self.console.print(table)

View File

@@ -1,125 +0,0 @@
from collections import defaultdict
from hashlib import md5
from typing import Any
from crewai import Crew, Agent
from crewai.experimental.evaluation import AgentEvaluator, create_default_evaluator
from crewai.experimental.evaluation.experiment.result_display import ExperimentResultsDisplay
from crewai.experimental.evaluation.experiment.result import ExperimentResults, ExperimentResult
from crewai.experimental.evaluation.evaluation_display import AgentAggregatedEvaluationResult
class ExperimentRunner:
def __init__(self, dataset: list[dict[str, Any]]):
self.dataset = dataset or []
self.evaluator: AgentEvaluator | None = None
self.display = ExperimentResultsDisplay()
def run(self, crew: Crew | None = None, agents: list[Agent] | None = None, print_summary: bool = False) -> ExperimentResults:
if crew and not agents:
agents = crew.agents
assert agents is not None
self.evaluator = create_default_evaluator(agents=agents)
results = []
for test_case in self.dataset:
self.evaluator.reset_iterations_results()
result = self._run_test_case(test_case=test_case, crew=crew, agents=agents)
results.append(result)
experiment_results = ExperimentResults(results)
if print_summary:
self.display.summary(experiment_results)
return experiment_results
def _run_test_case(self, test_case: dict[str, Any], agents: list[Agent], crew: Crew | None = None) -> ExperimentResult:
inputs = test_case["inputs"]
expected_score = test_case["expected_score"]
identifier = test_case.get("identifier") or md5(str(test_case).encode(), usedforsecurity=False).hexdigest()
try:
self.display.console.print(f"[dim]Running crew with input: {str(inputs)[:50]}...[/dim]")
self.display.console.print("\n")
if crew:
crew.kickoff(inputs=inputs)
else:
for agent in agents:
agent.kickoff(**inputs)
assert self.evaluator is not None
agent_evaluations = self.evaluator.get_agent_evaluation()
actual_score = self._extract_scores(agent_evaluations)
passed = self._assert_scores(expected_score, actual_score)
return ExperimentResult(
identifier=identifier,
inputs=inputs,
score=actual_score,
expected_score=expected_score,
passed=passed,
agent_evaluations=agent_evaluations
)
except Exception as e:
self.display.console.print(f"[red]Error running test case: {str(e)}[/red]")
return ExperimentResult(
identifier=identifier,
inputs=inputs,
score=0,
expected_score=expected_score,
passed=False
)
def _extract_scores(self, agent_evaluations: dict[str, AgentAggregatedEvaluationResult]) -> float | dict[str, float]:
all_scores: dict[str, list[float]] = defaultdict(list)
for evaluation in agent_evaluations.values():
for metric_name, score in evaluation.metrics.items():
if score.score is not None:
all_scores[metric_name.value].append(score.score)
avg_scores = {m: sum(s)/len(s) for m, s in all_scores.items()}
if len(avg_scores) == 1:
return list(avg_scores.values())[0]
return avg_scores
def _assert_scores(self, expected: float | dict[str, float],
actual: float | dict[str, float]) -> bool:
"""
Compare expected and actual scores, and return whether the test case passed.
The rules for comparison are as follows:
- If both expected and actual scores are single numbers, the actual score must be >= expected.
- If expected is a single number and actual is a dict, compare against the average of actual values.
- If expected is a dict and actual is a single number, actual must be >= all expected values.
- If both are dicts, actual must have matching keys with values >= expected values.
"""
if isinstance(expected, (int, float)) and isinstance(actual, (int, float)):
return actual >= expected
if isinstance(expected, dict) and isinstance(actual, (int, float)):
return all(actual >= exp_score for exp_score in expected.values())
if isinstance(expected, (int, float)) and isinstance(actual, dict):
if not actual:
return False
avg_score = sum(actual.values()) / len(actual)
return avg_score >= expected
if isinstance(expected, dict) and isinstance(actual, dict):
if not expected:
return True
matching_keys = set(expected.keys()) & set(actual.keys())
if not matching_keys:
return False
# All matching keys must have actual >= expected
return all(actual[key] >= expected[key] for key in matching_keys)
return False

View File

@@ -1,30 +0,0 @@
"""Robust JSON parsing utilities for evaluation responses."""
import json
import re
from typing import Any
def extract_json_from_llm_response(text: str) -> dict[str, Any]:
try:
return json.loads(text)
except json.JSONDecodeError:
pass
json_patterns = [
# Standard markdown code blocks with json
r'```json\s*([\s\S]*?)\s*```',
# Code blocks without language specifier
r'```\s*([\s\S]*?)\s*```',
# Inline code with JSON
r'`([{\\[].*[}\]])`',
]
for pattern in json_patterns:
matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL)
for match in matches:
try:
return json.loads(match.strip())
except json.JSONDecodeError:
continue
raise ValueError("No valid JSON found in the response")

View File

@@ -1,26 +0,0 @@
from crewai.experimental.evaluation.metrics.reasoning_metrics import (
ReasoningEfficiencyEvaluator
)
from crewai.experimental.evaluation.metrics.tools_metrics import (
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator
)
from crewai.experimental.evaluation.metrics.goal_metrics import (
GoalAlignmentEvaluator
)
from crewai.experimental.evaluation.metrics.semantic_quality_metrics import (
SemanticQualityEvaluator
)
__all__ = [
"ReasoningEfficiencyEvaluator",
"ToolSelectionEvaluator",
"ParameterExtractionEvaluator",
"ToolInvocationEvaluator",
"GoalAlignmentEvaluator",
"SemanticQualityEvaluator"
]

View File

@@ -1,69 +0,0 @@
from typing import Any, Dict
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
class GoalAlignmentEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.GOAL_ALIGNMENT
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}\nExpected output: {task.expected_output}\n"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent's output aligns with its assigned task goal.
Score the agent's goal alignment on a scale from 0-10 where:
- 0: Complete misalignment, agent did not understand or attempt the task goal
- 5: Partial alignment, agent attempted the task but missed key requirements
- 10: Perfect alignment, agent fully satisfied all task requirements
Consider:
1. Did the agent correctly interpret the task goal?
2. Did the final output directly address the requirements?
3. Did the agent focus on relevant aspects of the task?
4. Did the agent provide all requested information or deliverables?
Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string).
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Agent goal: {agent.goal}
{task_context}
Agent's final output:
{final_output}
Evaluate how well the agent's output aligns with the assigned task goal.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
assert evaluation_data is not None
return EvaluationScore(
score=evaluation_data.get("score", 0),
feedback=evaluation_data.get("feedback", response),
raw_response=response
)
except Exception:
return EvaluationScore(
score=None,
feedback=f"Failed to parse evaluation. Raw response: {response}",
raw_response=response
)

View File

@@ -1,361 +0,0 @@
"""Agent reasoning efficiency evaluators.
This module provides evaluator implementations for:
- Reasoning efficiency
- Loop detection
- Thinking-to-action ratio
"""
import logging
import re
from enum import Enum
from typing import Any, Dict, List, Tuple
import numpy as np
from collections.abc import Sequence
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.tasks.task_output import TaskOutput
class ReasoningPatternType(Enum):
EFFICIENT = "efficient" # Good reasoning flow
LOOP = "loop" # Agent is stuck in a loop
VERBOSE = "verbose" # Agent is unnecessarily verbose
INDECISIVE = "indecisive" # Agent struggles to make decisions
SCATTERED = "scattered" # Agent jumps between topics without focus
class ReasoningEfficiencyEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.REASONING_EFFICIENCY
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: TaskOutput | str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}\nExpected output: {task.expected_output}\n"
llm_calls = execution_trace.get("llm_calls", [])
if not llm_calls or len(llm_calls) < 2:
return EvaluationScore(
score=None,
feedback="Insufficient LLM calls to evaluate reasoning efficiency."
)
total_calls = len(llm_calls)
total_tokens = sum(call.get("total_tokens", 0) for call in llm_calls)
avg_tokens_per_call = total_tokens / total_calls if total_calls > 0 else 0
time_intervals = []
has_reliable_timing = True
for i in range(1, len(llm_calls)):
start_time = llm_calls[i-1].get("end_time")
end_time = llm_calls[i].get("start_time")
if start_time and end_time and start_time != end_time:
try:
interval = end_time - start_time
time_intervals.append(interval.total_seconds() if hasattr(interval, 'total_seconds') else 0)
except Exception:
has_reliable_timing = False
else:
has_reliable_timing = False
loop_detected, loop_details = self._detect_loops(llm_calls)
pattern_analysis = self._analyze_reasoning_patterns(llm_calls)
efficiency_metrics = {
"total_llm_calls": total_calls,
"total_tokens": total_tokens,
"avg_tokens_per_call": avg_tokens_per_call,
"reasoning_pattern": pattern_analysis["primary_pattern"].value,
"loops_detected": loop_detected,
}
if has_reliable_timing and time_intervals:
efficiency_metrics["avg_time_between_calls"] = np.mean(time_intervals)
loop_info = f"Detected {len(loop_details)} potential reasoning loops." if loop_detected else "No significant reasoning loops detected."
call_samples = self._get_call_samples(llm_calls)
final_output = final_output.raw if isinstance(final_output, TaskOutput) else final_output
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the reasoning efficiency of an AI agent's thought process.
Evaluate the agent's reasoning efficiency across these five key subcategories:
1. Focus (0-10): How well the agent stays on topic and avoids unnecessary tangents
2. Progression (0-10): How effectively the agent builds on previous thoughts rather than repeating or circling
3. Decision Quality (0-10): How decisively and appropriately the agent makes decisions
4. Conciseness (0-10): How efficiently the agent communicates without unnecessary verbosity
5. Loop Avoidance (0-10): How well the agent avoids getting stuck in repetitive thinking patterns
For each subcategory, provide a score from 0-10 where:
- 0: Completely inefficient
- 5: Moderately efficient
- 10: Highly efficient
The overall score should be a weighted average of these subcategories.
Return your evaluation as JSON with the following structure:
{
"overall_score": float,
"scores": {
"focus": float,
"progression": float,
"decision_quality": float,
"conciseness": float,
"loop_avoidance": float
},
"feedback": string (general feedback about overall reasoning efficiency),
"optimization_suggestions": string (concrete suggestions for improving reasoning efficiency),
"detected_patterns": string (describe any inefficient reasoning patterns you observe)
}"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Reasoning efficiency metrics:
- Total LLM calls: {efficiency_metrics["total_llm_calls"]}
- Average tokens per call: {efficiency_metrics["avg_tokens_per_call"]:.1f}
- Primary reasoning pattern: {efficiency_metrics["reasoning_pattern"]}
- {loop_info}
{"- Average time between calls: {:.2f} seconds".format(efficiency_metrics.get("avg_time_between_calls", 0)) if "avg_time_between_calls" in efficiency_metrics else ""}
Sample of agent reasoning flow (chronological sequence):
{call_samples}
Agent's final output:
{final_output[:500]}... (truncated)
Evaluate the reasoning efficiency of this agent based on these interaction patterns.
Identify any inefficient reasoning patterns and provide specific suggestions for optimization.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
scores = evaluation_data.get("scores", {})
focus = scores.get("focus", 5.0)
progression = scores.get("progression", 5.0)
decision_quality = scores.get("decision_quality", 5.0)
conciseness = scores.get("conciseness", 5.0)
loop_avoidance = scores.get("loop_avoidance", 5.0)
overall_score = evaluation_data.get("overall_score", evaluation_data.get("score", 5.0))
feedback = evaluation_data.get("feedback", "No detailed feedback provided.")
optimization_suggestions = evaluation_data.get("optimization_suggestions", "No specific suggestions provided.")
detailed_feedback = "Reasoning Efficiency Evaluation:\n"
detailed_feedback += f"• Focus: {focus}/10 - Staying on topic without tangents\n"
detailed_feedback += f"• Progression: {progression}/10 - Building on previous thinking\n"
detailed_feedback += f"• Decision Quality: {decision_quality}/10 - Making appropriate decisions\n"
detailed_feedback += f"• Conciseness: {conciseness}/10 - Communicating efficiently\n"
detailed_feedback += f"• Loop Avoidance: {loop_avoidance}/10 - Avoiding repetitive patterns\n\n"
detailed_feedback += f"Feedback:\n{feedback}\n\n"
detailed_feedback += f"Optimization Suggestions:\n{optimization_suggestions}"
return EvaluationScore(
score=float(overall_score),
feedback=detailed_feedback,
raw_response=response
)
except Exception as e:
logging.warning(f"Failed to parse reasoning efficiency evaluation: {e}")
return EvaluationScore(
score=None,
feedback=f"Failed to parse reasoning efficiency evaluation. Raw response: {response[:200]}...",
raw_response=response
)
def _detect_loops(self, llm_calls: List[Dict]) -> Tuple[bool, List[Dict]]:
loop_details = []
messages = []
for call in llm_calls:
content = call.get("response", "")
if isinstance(content, str):
messages.append(content)
elif isinstance(content, list) and len(content) > 0:
# Handle message list format
for msg in content:
if isinstance(msg, dict) and "content" in msg:
messages.append(msg["content"])
# Simple n-gram based similarity detection
# For a more robust implementation, consider using embedding-based similarity
for i in range(len(messages) - 2):
for j in range(i + 1, len(messages) - 1):
# Check for repeated patterns (simplistic approach)
# A more sophisticated approach would use semantic similarity
similarity = self._calculate_text_similarity(messages[i], messages[j])
if similarity > 0.7: # Arbitrary threshold
loop_details.append({
"first_occurrence": i,
"second_occurrence": j,
"similarity": similarity,
"snippet": messages[i][:100] + "..."
})
return len(loop_details) > 0, loop_details
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
text1 = re.sub(r'\s+', ' ', text1.lower()).strip()
text2 = re.sub(r'\s+', ' ', text2.lower()).strip()
# Simple Jaccard similarity on word sets
words1 = set(text1.split())
words2 = set(text2.split())
intersection = len(words1.intersection(words2))
union = len(words1.union(words2))
return intersection / union if union > 0 else 0.0
def _analyze_reasoning_patterns(self, llm_calls: List[Dict]) -> Dict[str, Any]:
call_lengths = []
response_times = []
for call in llm_calls:
content = call.get("response", "")
if isinstance(content, str):
call_lengths.append(len(content))
elif isinstance(content, list) and len(content) > 0:
# Handle message list format
total_length = 0
for msg in content:
if isinstance(msg, dict) and "content" in msg:
total_length += len(msg["content"])
call_lengths.append(total_length)
start_time = call.get("start_time")
end_time = call.get("end_time")
if start_time and end_time:
try:
response_times.append(end_time - start_time)
except Exception:
pass
avg_length = np.mean(call_lengths) if call_lengths else 0
std_length = np.std(call_lengths) if call_lengths else 0
length_trend = self._calculate_trend(call_lengths)
primary_pattern = ReasoningPatternType.EFFICIENT
details = "Agent demonstrates efficient reasoning patterns."
loop_score = self._calculate_loop_likelihood(call_lengths, response_times)
if loop_score > 0.7:
primary_pattern = ReasoningPatternType.LOOP
details = "Agent appears to be stuck in repetitive thinking patterns."
elif avg_length > 1000 and std_length / avg_length < 0.3:
primary_pattern = ReasoningPatternType.VERBOSE
details = "Agent is consistently verbose across interactions."
elif len(llm_calls) > 10 and length_trend > 0.5:
primary_pattern = ReasoningPatternType.INDECISIVE
details = "Agent shows signs of indecisiveness with increasing message lengths."
elif std_length / avg_length > 0.8:
primary_pattern = ReasoningPatternType.SCATTERED
details = "Agent shows inconsistent reasoning flow with highly variable responses."
return {
"primary_pattern": primary_pattern,
"details": details,
"metrics": {
"avg_length": avg_length,
"std_length": std_length,
"length_trend": length_trend,
"loop_score": loop_score
}
}
def _calculate_trend(self, values: Sequence[float | int]) -> float:
if not values or len(values) < 2:
return 0.0
try:
x = np.arange(len(values))
y = np.array(values)
# Simple linear regression
slope = np.polyfit(x, y, 1)[0]
# Normalize slope to -1 to 1 range
max_possible_slope = max(values) - min(values)
if max_possible_slope > 0:
normalized_slope = slope / max_possible_slope
return max(min(normalized_slope, 1.0), -1.0)
return 0.0
except Exception:
return 0.0
def _calculate_loop_likelihood(self, call_lengths: Sequence[float], response_times: Sequence[float]) -> float:
if not call_lengths or len(call_lengths) < 3:
return 0.0
indicators = []
if len(call_lengths) >= 4:
repeated_lengths = 0
for i in range(len(call_lengths) - 2):
ratio = call_lengths[i] / call_lengths[i + 2] if call_lengths[i + 2] > 0 else 0
if 0.85 <= ratio <= 1.15:
repeated_lengths += 1
length_repetition_score = repeated_lengths / (len(call_lengths) - 2)
indicators.append(length_repetition_score)
if response_times and len(response_times) >= 3:
try:
std_time = np.std(response_times)
mean_time = np.mean(response_times)
if mean_time > 0:
time_consistency = 1.0 - (std_time / mean_time)
indicators.append(max(0, time_consistency - 0.3) * 1.5)
except Exception:
pass
return np.mean(indicators) if indicators else 0.0
def _get_call_samples(self, llm_calls: List[Dict]) -> str:
samples = []
if len(llm_calls) <= 6:
sample_indices = list(range(len(llm_calls)))
else:
sample_indices = [0, 1, len(llm_calls) // 2 - 1, len(llm_calls) // 2,
len(llm_calls) - 2, len(llm_calls) - 1]
for idx in sample_indices:
call = llm_calls[idx]
content = call.get("response", "")
if isinstance(content, str):
sample = content
elif isinstance(content, list) and len(content) > 0:
sample_parts = []
for msg in content:
if isinstance(msg, dict) and "content" in msg:
sample_parts.append(msg["content"])
sample = "\n".join(sample_parts)
else:
sample = str(content)
truncated = sample[:200] + "..." if len(sample) > 200 else sample
samples.append(f"Call {idx + 1}:\n{truncated}\n")
return "\n".join(samples)

View File

@@ -1,68 +0,0 @@
from typing import Any, Dict
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
class SemanticQualityEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.SEMANTIC_QUALITY
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the semantic quality of an AI agent's output.
Score the semantic quality on a scale from 0-10 where:
- 0: Completely incoherent, confusing, or logically flawed output
- 5: Moderately clear and logical output with some issues
- 10: Exceptionally clear, coherent, and logically sound output
Consider:
1. Is the output well-structured and organized?
2. Is the reasoning logical and well-supported?
3. Is the language clear, precise, and appropriate for the task?
4. Are claims supported by evidence when appropriate?
5. Is the output free from contradictions and logical fallacies?
Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string).
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Agent's final output:
{final_output}
Evaluate the semantic quality and reasoning of this output.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
assert evaluation_data is not None
return EvaluationScore(
score=float(evaluation_data["score"]) if evaluation_data.get("score") is not None else None,
feedback=evaluation_data.get("feedback", response),
raw_response=response
)
except Exception:
return EvaluationScore(
score=None,
feedback=f"Failed to parse evaluation. Raw response: {response}",
raw_response=response
)

View File

@@ -1,410 +0,0 @@
import json
from typing import Dict, Any
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.agent import Agent
from crewai.task import Task
class ToolSelectionEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.TOOL_SELECTION
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
unique_tool_types = set([tool.get("tool", "Unknown tool") for tool in tool_uses])
if tool_count == 0:
if not agent.tools:
return EvaluationScore(
score=None,
feedback="Agent had no tools available to use."
)
else:
return EvaluationScore(
score=None,
feedback="Agent had tools available but didn't use any."
)
available_tools_info = ""
if agent.tools:
for tool in agent.tools:
available_tools_info += f"- {tool.name}: {tool.description}\n"
else:
available_tools_info = "No tools available"
tool_types_summary = "Tools selected by the agent:\n"
for tool_type in sorted(unique_tool_types):
tool_types_summary += f"- {tool_type}\n"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing if an AI agent selected the most appropriate tools for a given task.
You must evaluate based on these 2 criteria:
1. Relevance (0-10): Were the tools chosen directly aligned with the task's goals?
2. Coverage (0-10): Did the agent select ALL appropriate tools from the AVAILABLE tools?
IMPORTANT:
- ONLY consider tools that are listed as available to the agent
- DO NOT suggest tools that aren't in the 'Available tools' list
- DO NOT evaluate the quality or accuracy of tool outputs/results
- DO NOT evaluate how many times each tool was used
- DO NOT evaluate how the agent used the parameters
- DO NOT evaluate whether the agent interpreted the task correctly
Focus ONLY on whether the correct CATEGORIES of tools were selected from what was available.
Return your evaluation as JSON with these fields:
- scores: {"relevance": number, "coverage": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on tool selection decisions from available tools)
- improvement_suggestions: string (ONLY suggest better selection from the AVAILABLE tools list, NOT new tools)
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Available tools for this agent:
{available_tools_info}
{tool_types_summary}
Based ONLY on the task description and comparing the AVAILABLE tools with those that were selected (listed above), evaluate if the agent selected the appropriate tool types for this task.
IMPORTANT:
- ONLY evaluate selection from tools listed as available
- DO NOT suggest new tools that aren't in the available tools list
- DO NOT evaluate tool usage or results
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
scores = evaluation_data.get("scores", {})
relevance = scores.get("relevance", 5.0)
coverage = scores.get("coverage", 5.0)
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Tool Selection Evaluation:\n"
feedback += f"• Relevance: {relevance}/10 - Selection of appropriate tool types for the task\n"
feedback += f"• Coverage: {coverage}/10 - Selection of all necessary tool types\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating tool selection: {e}",
raw_response=response
)
class ParameterExtractionEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.PARAMETER_EXTRACTION
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
if tool_count == 0:
return EvaluationScore(
score=None,
feedback="No tool usage detected. Cannot evaluate parameter extraction."
)
validation_errors = []
for tool_use in tool_uses:
if not tool_use.get("success", True) and tool_use.get("error_type") == "validation_error":
validation_errors.append({
"tool": tool_use.get("tool", "Unknown tool"),
"error": tool_use.get("result"),
"args": tool_use.get("args", {})
})
validation_error_rate = len(validation_errors) / tool_count if tool_count > 0 else 0
param_samples = []
for i, tool_use in enumerate(tool_uses[:5]):
tool_name = tool_use.get("tool", "Unknown tool")
tool_args = tool_use.get("args", {})
success = tool_use.get("success", True) and not tool_use.get("error", False)
error_type = tool_use.get("error_type", "") if not success else ""
is_validation_error = error_type == "validation_error"
sample = f"Tool use #{i+1} - {tool_name}:\n"
sample += f"- Parameters: {json.dumps(tool_args, indent=2)}\n"
sample += f"- Success: {'No' if not success else 'Yes'}"
if is_validation_error:
sample += " (PARAMETER VALIDATION ERROR)\n"
sample += f"- Error: {tool_use.get('result', 'Unknown error')}"
elif not success:
sample += f" (Other error: {error_type})\n"
param_samples.append(sample)
validation_errors_info = ""
if validation_errors:
validation_errors_info = f"\nParameter validation errors detected: {len(validation_errors)} ({validation_error_rate:.1%} of tool uses)\n"
for i, err in enumerate(validation_errors[:3]):
tool_name = err.get("tool", "Unknown tool")
error_msg = err.get("error", "Unknown error")
args = err.get("args", {})
validation_errors_info += f"\nValidation Error #{i+1}:\n- Tool: {tool_name}\n- Args: {json.dumps(args, indent=2)}\n- Error: {error_msg}"
if len(validation_errors) > 3:
validation_errors_info += f"\n...and {len(validation_errors) - 3} more validation errors."
param_samples_text = "\n\n".join(param_samples)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent extracts and formats PARAMETER VALUES for tool calls.
Your job is to evaluate ONLY whether the agent used the correct parameter VALUES, not whether the right tools were selected or how the tools were invoked.
Evaluate parameter extraction based on these criteria:
1. Accuracy (0-10): Are parameter values correctly identified from the context/task?
2. Formatting (0-10): Are values formatted correctly for each tool's requirements?
3. Completeness (0-10): Are all required parameter values provided, with no missing information?
IMPORTANT: DO NOT evaluate:
- Whether the right tool was chosen (that's the ToolSelectionEvaluator's job)
- How the tools were structurally invoked (that's the ToolInvocationEvaluator's job)
- The quality of results from tools
Focus ONLY on the PARAMETER VALUES - whether they were correctly extracted from the context, properly formatted, and complete.
Validation errors are important signals that parameter values weren't properly extracted or formatted.
Return your evaluation as JSON with these fields:
- scores: {"accuracy": number, "formatting": number, "completeness": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on parameter value extraction quality)
- improvement_suggestions: string (concrete suggestions for better parameter VALUE extraction)
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Parameter extraction examples:
{param_samples_text}
{validation_errors_info}
Evaluate the quality of the agent's parameter extraction for this task.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
scores = evaluation_data.get("scores", {})
accuracy = scores.get("accuracy", 5.0)
formatting = scores.get("formatting", 5.0)
completeness = scores.get("completeness", 5.0)
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Parameter Extraction Evaluation:\n"
feedback += f"• Accuracy: {accuracy}/10 - Correctly identifying required parameters\n"
feedback += f"• Formatting: {formatting}/10 - Properly formatting parameters for tools\n"
feedback += f"• Completeness: {completeness}/10 - Including all necessary information\n\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating parameter extraction: {e}",
raw_response=response
)
class ToolInvocationEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.TOOL_INVOCATION
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_errors = []
tool_count = len(tool_uses)
if tool_count == 0:
return EvaluationScore(
score=None,
feedback="No tool usage detected. Cannot evaluate tool invocation."
)
for tool_use in tool_uses:
if not tool_use.get("success", True) or tool_use.get("error", False):
error_info = {
"tool": tool_use.get("tool", "Unknown tool"),
"error": tool_use.get("result"),
"error_type": tool_use.get("error_type", "unknown_error")
}
tool_errors.append(error_info)
error_rate = len(tool_errors) / tool_count if tool_count > 0 else 0
error_types = {}
for error in tool_errors:
error_type = error.get("error_type", "unknown_error")
if error_type not in error_types:
error_types[error_type] = 0
error_types[error_type] += 1
invocation_samples = []
for i, tool_use in enumerate(tool_uses[:5]):
tool_name = tool_use.get("tool", "Unknown tool")
tool_args = tool_use.get("args", {})
success = tool_use.get("success", True) and not tool_use.get("error", False)
error_type = tool_use.get("error_type", "") if not success else ""
error_msg = tool_use.get("result", "No error") if not success else "No error"
sample = f"Tool invocation #{i+1}:\n"
sample += f"- Tool: {tool_name}\n"
sample += f"- Parameters: {json.dumps(tool_args, indent=2)}\n"
sample += f"- Success: {'No' if not success else 'Yes'}\n"
if not success:
sample += f"- Error type: {error_type}\n"
sample += f"- Error: {error_msg}"
invocation_samples.append(sample)
error_type_summary = ""
if error_types:
error_type_summary = "Error type breakdown:\n"
for error_type, count in error_types.items():
error_type_summary += f"- {error_type}: {count} occurrences ({(count/tool_count):.1%})\n"
invocation_samples_text = "\n\n".join(invocation_samples)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how correctly an AI agent's tool invocations are STRUCTURED.
Your job is to evaluate ONLY the structural and syntactical aspects of how the agent called tools, NOT which tools were selected or what parameter values were used.
Evaluate the agent's tool invocation based on these criteria:
1. Structure (0-10): Does the tool call follow the expected syntax and format?
2. Error Handling (0-10): Does the agent handle tool errors appropriately?
3. Invocation Patterns (0-10): Are tool calls properly sequenced, batched, or managed?
Error types that indicate invocation issues:
- execution_error: The tool was called correctly but failed during execution
- usage_error: General errors in how the tool was used structurally
IMPORTANT: DO NOT evaluate:
- Whether the right tool was chosen (that's the ToolSelectionEvaluator's job)
- Whether the parameter values are correct (that's the ParameterExtractionEvaluator's job)
- The quality of results from tools
Focus ONLY on HOW tools were invoked - the structure, format, and handling of the invocation process.
Return your evaluation as JSON with these fields:
- scores: {"structure": number, "error_handling": number, "invocation_patterns": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on structural aspects of tool invocation)
- improvement_suggestions: string (concrete suggestions for better structuring of tool calls)
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Tool invocation examples:
{invocation_samples_text}
Tool error rate: {error_rate:.2%} ({len(tool_errors)} errors out of {tool_count} invocations)
{error_type_summary}
Evaluate the quality of the agent's tool invocation structure during this task.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
scores = evaluation_data.get("scores", {})
structure = scores.get("structure", 5.0)
error_handling = scores.get("error_handling", 5.0)
invocation_patterns = scores.get("invocation_patterns", 5.0)
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Tool Invocation Evaluation:\n"
feedback += f"• Structure: {structure}/10 - Following proper syntax and format\n"
feedback += f"• Error Handling: {error_handling}/10 - Appropriately handling tool errors\n"
feedback += f"• Invocation Patterns: {invocation_patterns}/10 - Proper sequencing and management of calls\n\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating tool invocation: {e}",
raw_response=response
)
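For reference, a well-formed LLM reply to the ToolSelectionEvaluator prompt above would look like the sketch below; the field names come from the system prompt, while the scores and wording are made-up placeholders.
# Hypothetical LLM reply matching the JSON contract described in the prompt above;
# all values are placeholders.
example_response = """{
  "scores": {"relevance": 8, "coverage": 6},
  "overall_score": 7.0,
  "feedback": "The selected search tool fits the research goal.",
  "improvement_suggestions": "Also select the available scraper tool to gather source details."
}"""

data = extract_json_from_llm_response(example_response)
# The evaluator then reads data["scores"] and data["overall_score"] as shown above.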

View File

@@ -1,52 +0,0 @@
import inspect
from typing_extensions import Any
import warnings
from crewai.experimental.evaluation.experiment import ExperimentResults, ExperimentRunner
from crewai import Crew, Agent
def assert_experiment_successfully(experiment_results: ExperimentResults, baseline_filepath: str | None = None) -> None:
failed_tests = [result for result in experiment_results.results if not result.passed]
if failed_tests:
detailed_failures: list[str] = []
for result in failed_tests:
expected = result.expected_score
actual = result.score
detailed_failures.append(f"- {result.identifier}: expected {expected}, got {actual}")
failure_details = "\n".join(detailed_failures)
raise AssertionError(f"The following test cases failed:\n{failure_details}")
baseline_filepath = baseline_filepath or _get_baseline_filepath_fallback()
comparison = experiment_results.compare_with_baseline(baseline_filepath=baseline_filepath)
assert_experiment_no_regression(comparison)
def assert_experiment_no_regression(comparison_result: dict[str, list[str]]) -> None:
regressed = comparison_result.get("regressed", [])
if regressed:
raise AssertionError(f"Regression detected! The following tests that previously passed now fail: {regressed}")
missing_tests = comparison_result.get("missing_tests", [])
if missing_tests:
warnings.warn(
f"Warning: {len(missing_tests)} tests from the baseline are missing in the current run: {missing_tests}",
UserWarning
)
def run_experiment(dataset: list[dict[str, Any]], crew: Crew | None = None, agents: list[Agent] | None = None, verbose: bool = False) -> ExperimentResults:
runner = ExperimentRunner(dataset=dataset)
return runner.run(agents=agents, crew=crew, print_summary=verbose)
def _get_baseline_filepath_fallback() -> str:
test_func_name = "experiment_fallback"
try:
current_frame = inspect.currentframe()
if current_frame is not None:
test_func_name = current_frame.f_back.f_back.f_code.co_name # type: ignore[union-attr]
except Exception:
...
return f"{test_func_name}_results.json"

View File

@@ -436,7 +436,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
_routers: Set[str] = set()
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
name: Optional[str] = None
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
class _FlowGeneric(cls): # type: ignore
@@ -474,7 +473,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowCreatedEvent(
type="flow_created",
flow_name=self.name or self.__class__.__name__,
flow_name=self.__class__.__name__,
),
)
@@ -770,7 +769,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
flow_name=self.__class__.__name__,
inputs=inputs,
),
)
@@ -793,7 +792,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
flow_name=self.__class__.__name__,
result=final_output,
),
)
@@ -835,7 +834,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self.__class__.__name__,
params=dumped_params,
state=self._copy_state(),
),
@@ -857,7 +856,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self.__class__.__name__,
state=self._copy_state(),
result=result,
),
@@ -870,7 +869,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
flow_name=self.__class__.__name__,
error=e,
),
)
@@ -1077,7 +1076,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowPlotEvent(
type="flow_plot",
flow_name=self.name or self.__class__.__name__,
flow_name=self.__class__.__name__,
),
)
plot_flow(self, filename)

View File

@@ -0,0 +1,55 @@
from abc import ABC, abstractmethod
from typing import List
import numpy as np
class BaseEmbedder(ABC):
"""
Abstract base class for text embedding models
"""
@abstractmethod
def embed_chunks(self, chunks: List[str]) -> np.ndarray:
"""
Generate embeddings for a list of text chunks
Args:
chunks: List of text chunks to embed
Returns:
Array of embeddings
"""
pass
@abstractmethod
def embed_texts(self, texts: List[str]) -> np.ndarray:
"""
Generate embeddings for a list of texts
Args:
texts: List of texts to embed
Returns:
Array of embeddings
"""
pass
@abstractmethod
def embed_text(self, text: str) -> np.ndarray:
"""
Generate embedding for a single text
Args:
text: Text to embed
Returns:
Embedding array
"""
pass
@property
@abstractmethod
def dimension(self) -> int:
"""Get the dimension of the embeddings"""
pass
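To illustrate the contract, here is a toy concrete subclass; the hash-based vectors are purely hypothetical, not semantically meaningful, and not an embedder shipped with the library.
# Hypothetical illustration only: a deterministic hash-based embedder that
# satisfies the BaseEmbedder interface defined above.
import hashlib

class ToyHashEmbedder(BaseEmbedder):
    def __init__(self, dimension: int = 8):
        self._dimension = dimension

    def embed_text(self, text: str) -> np.ndarray:
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        values = [digest[i % len(digest)] / 255.0 for i in range(self._dimension)]
        return np.array(values, dtype=np.float32)

    def embed_texts(self, texts: List[str]) -> np.ndarray:
        return np.stack([self.embed_text(t) for t in texts])

    def embed_chunks(self, chunks: List[str]) -> np.ndarray:
        return self.embed_texts(chunks)

    @property
    def dimension(self) -> int:
        return self._dimension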

View File

@@ -13,12 +13,11 @@ from chromadb.api.types import OneOrMany
from chromadb.config import Settings
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.chromadb import sanitize_collection_name
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger
from crewai.utilities.paths import db_storage_path
from crewai.utilities.chromadb import create_persistent_client
@contextlib.contextmanager
@@ -85,11 +84,14 @@ class KnowledgeStorage(BaseKnowledgeStorage):
raise Exception("Collection not initialized")
def initialize_knowledge_storage(self):
self.app = create_persistent_client(
path=os.path.join(db_storage_path(), "knowledge"),
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
try:
collection_name = (
f"knowledge_{self.collection_name}"
@@ -109,8 +111,9 @@ class KnowledgeStorage(BaseKnowledgeStorage):
def reset(self):
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
if not self.app:
self.app = create_persistent_client(
path=base_path, settings=Settings(allow_reset=True)
self.app = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app.reset()

View File

@@ -28,7 +28,7 @@ from pydantic import (
InstanceOf,
PrivateAttr,
model_validator,
field_validator
field_validator,
)
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -40,7 +40,7 @@ from crewai.agents.parser import (
OutputParserException,
)
from crewai.flow.flow_trackable import FlowTrackable
from crewai.llm import LLM, BaseLLM
from crewai.llm import LLM
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities import I18N
@@ -135,7 +135,7 @@ class LiteAgent(FlowTrackable, BaseModel):
role: str = Field(description="Role of the agent")
goal: str = Field(description="Goal of the agent")
backstory: str = Field(description="Backstory of the agent")
llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
default=None, description="Language model that will run the agent"
)
tools: List[BaseTool] = Field(
@@ -209,8 +209,8 @@ class LiteAgent(FlowTrackable, BaseModel):
def setup_llm(self):
"""Set up the LLM and other components after initialization."""
self.llm = create_llm(self.llm)
if not isinstance(self.llm, BaseLLM):
raise ValueError(f"Expected LLM instance of type BaseLLM, got {type(self.llm).__name__}")
if not isinstance(self.llm, LLM):
raise ValueError("Unable to create LLM instance")
# Initialize callbacks
token_callback = TokenCalcHandler(token_cost_process=self._token_process)
@@ -232,8 +232,7 @@ class LiteAgent(FlowTrackable, BaseModel):
elif isinstance(self.guardrail, str):
from crewai.tasks.llm_guardrail import LLMGuardrail
if not isinstance(self.llm, BaseLLM):
raise TypeError(f"Guardrail requires LLM instance of type BaseLLM, got {type(self.llm).__name__}")
assert isinstance(self.llm, LLM)
self._guardrail = LLMGuardrail(description=self.guardrail, llm=self.llm)
@@ -305,7 +304,6 @@ class LiteAgent(FlowTrackable, BaseModel):
"""
# Create agent info for event emission
agent_info = {
"id": self.id,
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,
@@ -539,7 +537,6 @@ class LiteAgent(FlowTrackable, BaseModel):
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(
messages=self._messages,
response=answer,
call_type=LLMCallType.LLM_CALL,
from_agent=self,
@@ -622,4 +619,4 @@ class LiteAgent(FlowTrackable, BaseModel):
def _append_message(self, text: str, role: str = "assistant") -> None:
"""Append a message to the message list with the given role."""
self._messages.append(format_message_for_llm(text, role=role))
self._messages.append(format_message_for_llm(text, role=role))

View File

@@ -59,7 +59,6 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
load_dotenv()
litellm.suppress_debug_info = True
class FilteredStream(io.TextIOBase):
_lock = None
@@ -77,7 +76,9 @@ class FilteredStream(io.TextIOBase):
# Skip common noisy LiteLLM banners and any other lines that contain "litellm"
if (
"litellm.info:" in lower_s
"give feedback / get help" in lower_s
or "litellm.info:" in lower_s
or "litellm" in lower_s
or "Consider using a smaller input or implementing a text splitting strategy" in lower_s
):
return 0
@@ -507,6 +508,7 @@ class LLM(BaseLLM):
# Enable tool calls using streaming
if "tool_calls" in delta:
tool_calls = delta["tool_calls"]
if tool_calls:
result = self._handle_streaming_tool_calls(
tool_calls=tool_calls,
@@ -515,7 +517,6 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
)
if result is not None:
chunk_content = result
@@ -630,7 +631,7 @@ class LLM(BaseLLM):
# Log token usage if available in streaming mode
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# Emit completion event and return response
self._handle_emit_call_events(response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
return full_response
# --- 9) Handle tool calls if present
@@ -642,7 +643,7 @@ class LLM(BaseLLM):
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# --- 11) Emit completion event and return response
self._handle_emit_call_events(response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
return full_response
except ContextWindowExceededError as e:
@@ -654,7 +655,7 @@ class LLM(BaseLLM):
logging.error(f"Error in streaming response: {str(e)}")
if full_response.strip():
logging.warning(f"Returning partial response despite error: {str(e)}")
self._handle_emit_call_events(response=full_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
return full_response
# Emit failed event and re-raise the exception
@@ -759,7 +760,7 @@ class LLM(BaseLLM):
available_functions: Optional[Dict[str, Any]] = None,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
) -> str | Any:
) -> str:
"""Handle a non-streaming response from the LLM.
Args:
@@ -783,11 +784,13 @@ class LLM(BaseLLM):
# Convert litellm's context window error to our own exception type
# for consistent handling in the rest of the codebase
raise LLMContextLengthExceededException(str(e))
# --- 2) Extract response message and content
response_message = cast(Choices, cast(ModelResponse, response).choices)[
0
].message
text_response = response_message.content or ""
# --- 3) Handle callbacks with usage info
if callbacks and len(callbacks) > 0:
for callback in callbacks:
@@ -800,23 +803,22 @@ class LLM(BaseLLM):
start_time=0,
end_time=0,
)
# --- 4) Check for tool calls
tool_calls = getattr(response_message, "tool_calls", [])
# --- 5) If no tool calls or no available functions, return the text response directly as long as there is a text response
if (not tool_calls or not available_functions) and text_response:
self._handle_emit_call_events(response=text_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
# --- 5) If no tool calls or no available functions, return the text response directly
if not tool_calls or not available_functions:
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task, from_agent)
return text_response
# --- 6) If there is no text response, no available functions, but there are tool calls, return the tool calls
elif tool_calls and not available_functions and not text_response:
return tool_calls
# --- 7) Handle tool calls if present
# --- 6) Handle tool calls if present
tool_result = self._handle_tool_call(tool_calls, available_functions)
if tool_result is not None:
return tool_result
# --- 8) If tool call handling didn't return a result, emit completion event and return text response
self._handle_emit_call_events(response=text_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
# --- 7) If tool call handling didn't return a result, emit completion event and return text response
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task, from_agent)
return text_response
def _handle_tool_call(
@@ -859,7 +861,6 @@ class LLM(BaseLLM):
tool_args=function_args,
),
)
result = fn(**function_args)
crewai_event_bus.emit(
self,
@@ -873,7 +874,7 @@ class LLM(BaseLLM):
)
# --- 3.3) Emit success event
self._handle_emit_call_events(response=result, call_type=LLMCallType.TOOL_CALL)
self._handle_emit_call_events(result, LLMCallType.TOOL_CALL)
return result
except Exception as e:
# --- 3.4) Handle execution errors
@@ -950,18 +951,22 @@ class LLM(BaseLLM):
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
message["role"] = "assistant"
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
return self._handle_streaming_response(
@@ -978,48 +983,25 @@ class LLM(BaseLLM):
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(e) and "'stop'" in str(e)
if unsupported_stop:
if "additional_drop_params" in self.additional_params and isinstance(self.additional_params["additional_drop_params"], list):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
logging.info(
"Retrying LLM call without the unsupported 'stop'"
)
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=str(e), from_task=from_task, from_agent=from_agent),
)
logging.error(f"LiteLLM call failed: {str(e)}")
raise
def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None, from_agent: Optional[Any] = None, messages: str | list[dict[str, Any]] | None = None):
def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None, from_agent: Optional[Any] = None):
"""Handle the events for the LLM call.
Args:
response (str): The response from the LLM call.
call_type (str): The type of call, either "tool_call" or "llm_call".
from_task: Optional task object
from_agent: Optional agent object
messages: Optional messages object
"""
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(messages=messages, response=response, call_type=call_type, from_task=from_task, from_agent=from_agent),
event=LLMCallCompletedEvent(response=response, call_type=call_type, from_task=from_task, from_agent=from_agent),
)
def _format_messages_for_provider(
@@ -1072,15 +1054,6 @@ class LLM(BaseLLM):
messages.append({"role": "user", "content": "Please continue."})
return messages
# TODO: Remove this code after merging PR https://github.com/BerriAI/litellm/pull/10917
# Ollama doesn't support the last message being 'assistant'
if "ollama" in self.model.lower() and messages and messages[-1]["role"] == "assistant":
messages = messages.copy()
messages.append(
{"role": "user", "content": ""}
)
return messages
# Handle Anthropic models
if not self.is_anthropic:
return messages

View File

@@ -108,7 +108,6 @@ class ContextualMemory:
def _fetch_user_context(self, query: str) -> str:
"""
DEPRECATED: Will be removed in version 0.156.0 or on 2025-08-04, whichever comes first.
Fetches and formats relevant user information from User Memory.
Args:
query (str): The search query to find relevant user memories.

View File

@@ -4,6 +4,7 @@ from typing import Any, Dict, List
from mem0 import Memory, MemoryClient
from crewai.memory.storage.interface import Storage
from crewai.utilities.chromadb import sanitize_collection_name
MAX_AGENT_ID_LENGTH_MEM0 = 255
@@ -12,150 +13,135 @@ class Mem0Storage(Storage):
"""
Extends Storage to handle embedding and searching across entities using Mem0.
"""
def __init__(self, type, crew=None, config=None):
super().__init__()
self._validate_type(type)
self.memory_type = type
self.crew = crew
# TODO: Memory config will be removed in the future; the config will be passed as a parameter instead
self.config = config or getattr(crew, "memory_config", {}).get("config", {}) or {}
self._validate_user_id()
self._extract_config_values()
self._initialize_memory()
def _validate_type(self, type):
supported_types = {"user", "short_term", "long_term", "entities", "external"}
supported_types = ["user", "short_term", "long_term", "entities", "external"]
if type not in supported_types:
raise ValueError(
f"Invalid type '{type}' for Mem0Storage. Must be one of: {', '.join(supported_types)}"
f"Invalid type '{type}' for Mem0Storage. Must be one of: "
+ ", ".join(supported_types)
)
def _validate_user_id(self):
if self.memory_type == "user" and not self.config.get("user_id", ""):
self.memory_type = type
self.crew = crew
self.config = config or {}
# TODO: Memory config will be removed in the future; the config will be passed as a parameter instead
self.memory_config = self.config or getattr(crew, "memory_config", {}) or {}
# A user ID is required for the "user" memory type since it's used as a unique identifier for the user.
user_id = self._get_user_id()
if type == "user" and not user_id:
raise ValueError("User ID is required for user memory type")
def _extract_config_values(self):
cfg = self.config
self.mem0_run_id = cfg.get("run_id")
self.includes = cfg.get("includes")
self.excludes = cfg.get("excludes")
self.custom_categories = cfg.get("custom_categories")
self.infer = cfg.get("infer", True)
# API key in memory config overrides the environment variable
config = self._get_config()
mem0_api_key = config.get("api_key") or os.getenv("MEM0_API_KEY")
mem0_org_id = config.get("org_id")
mem0_project_id = config.get("project_id")
mem0_local_config = config.get("local_mem0_config")
def _initialize_memory(self):
api_key = self.config.get("api_key") or os.getenv("MEM0_API_KEY")
org_id = self.config.get("org_id")
project_id = self.config.get("project_id")
local_config = self.config.get("local_mem0_config")
if api_key:
self.memory = (
MemoryClient(api_key=api_key, org_id=org_id, project_id=project_id)
if org_id and project_id
else MemoryClient(api_key=api_key)
)
if self.custom_categories:
self.memory.update_project(custom_categories=self.custom_categories)
# Initialize MemoryClient or Memory based on the presence of the mem0_api_key
if mem0_api_key:
if mem0_org_id and mem0_project_id:
self.memory = MemoryClient(
api_key=mem0_api_key, org_id=mem0_org_id, project_id=mem0_project_id
)
else:
self.memory = MemoryClient(api_key=mem0_api_key)
else:
self.memory = (
Memory.from_config(local_config)
if local_config and len(local_config)
else Memory()
)
if mem0_local_config and len(mem0_local_config):
self.memory = Memory.from_config(mem0_local_config)
else:
self.memory = Memory()
def _create_filter_for_search(self):
def _sanitize_role(self, role: str) -> str:
"""
Returns:
dict: A filter dictionary containing AND conditions for querying data.
- Includes user_id if memory_type is 'external'.
- Includes run_id if memory_type is 'short_term' and mem0_run_id is present.
Sanitizes agent roles to ensure valid directory names.
"""
filter = {
"AND": []
}
# Add user_id condition if the memory type is external
if self.memory_type == "external":
filter["AND"].append({"user_id": self.config.get("user_id", "")})
# Add run_id condition if the memory type is short_term and a run ID is set
if self.memory_type == "short_term" and self.mem0_run_id:
filter["AND"].append({"run_id": self.mem0_run_id})
return filter
return role.replace("\n", "").replace(" ", "_").replace("/", "_")
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
user_id = self.config.get("user_id", "")
assistant_message = [{"role" : "assistant","content" : value}]
base_metadata = {
"short_term": "short_term",
"long_term": "long_term",
"entities": "entity",
"external": "external"
}
# Shared base params
params: dict[str, Any] = {
"metadata": {"type": base_metadata[self.memory_type], **metadata},
"infer": self.infer
}
if self.memory_type == "external":
params["user_id"] = user_id
if params:
# MemoryClient-specific overrides
if isinstance(self.memory, MemoryClient):
params["includes"] = self.includes
params["excludes"] = self.excludes
params["output_format"] = "v1.1"
params["version"]="v2"
if self.memory_type == "short_term":
params["run_id"] = self.mem0_run_id
self.memory.add(assistant_message, **params)
def search(self,query: str,limit: int = 3,score_threshold: float = 0.35) -> List[Any]:
params = {
"query": query,
"limit": limit,
"version": "v2",
"output_format": "v1.1"
user_id = self._get_user_id()
agent_name = self._get_agent_name()
params = None
if self.memory_type == "short_term":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "short_term", **metadata},
}
if user_id := self.config.get("user_id", ""):
elif self.memory_type == "long_term":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "long_term", **metadata},
}
elif self.memory_type == "entities":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "entity", **metadata},
}
elif self.memory_type == "external":
params = {
"user_id": user_id,
"agent_id": agent_name,
"metadata": {"type": "external", **metadata},
}
if params:
if isinstance(self.memory, MemoryClient):
params["output_format"] = "v1.1"
self.memory.add(value, **params)
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
) -> List[Any]:
params = {"query": query, "limit": limit, "output_format": "v1.1"}
if user_id := self._get_user_id():
params["user_id"] = user_id
memory_type_map = {
"short_term": {"type": "short_term"},
"long_term": {"type": "long_term"},
"entities": {"type": "entity"},
"external": {"type": "external"},
}
if self.memory_type in memory_type_map:
params["metadata"] = memory_type_map[self.memory_type]
if self.memory_type == "short_term":
params["run_id"] = self.mem0_run_id
agent_name = self._get_agent_name()
if self.memory_type == "short_term":
params["agent_id"] = agent_name
params["metadata"] = {"type": "short_term"}
elif self.memory_type == "long_term":
params["agent_id"] = agent_name
params["metadata"] = {"type": "long_term"}
elif self.memory_type == "entities":
params["agent_id"] = agent_name
params["metadata"] = {"type": "entity"}
elif self.memory_type == "external":
params["agent_id"] = agent_name
params["metadata"] = {"type": "external"}
# Discard the filters for now since we create the filters
# automatically when the crew is created.
params["filters"] = self._create_filter_for_search()
params['threshold'] = score_threshold
if isinstance(self.memory, Memory):
del params["metadata"], params["version"], params["run_id"], params['output_format']
del params["metadata"], params["output_format"]
results = self.memory.search(**params)
return [r for r in results["results"]]
return [r for r in results["results"] if r["score"] >= score_threshold]
def _get_user_id(self) -> str:
return self._get_config().get("user_id", "")
def _get_agent_name(self) -> str:
if not self.crew:
return ""
agents = self.crew.agents
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = "_".join(agents)
return sanitize_collection_name(name=agents,max_collection_length=MAX_AGENT_ID_LENGTH_MEM0)
def _get_config(self) -> Dict[str, Any]:
return self.config or getattr(self, "memory_config", {}).get("config", {}) or {}
def reset(self):
if self.memory:
self.memory.reset()
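For reference, the filter built by _create_filter_for_search above resolves to shapes like the following; the IDs are placeholders.
# Illustrative placeholders; the structures mirror _create_filter_for_search above.
external_filter = {"AND": [{"user_id": "user_123"}]}   # memory_type == "external"
short_term_filter = {"AND": [{"run_id": "run_abc"}]}   # memory_type == "short_term" with a run_id set
default_filter = {"AND": []}                           # all other cases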

View File

@@ -4,12 +4,12 @@ import logging
import os
import shutil
import uuid
from typing import Any, Dict, List, Optional
from chromadb.api import ClientAPI
from crewai.rag.storage.base_rag_storage import BaseRAGStorage
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
from crewai.utilities.chromadb import create_persistent_client
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path
@@ -60,15 +60,17 @@ class RAGStorage(BaseRAGStorage):
self.embedder_config = configurator.configure_embedder(self.embedder_config)
def _initialize_app(self):
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
self.app = create_persistent_client(
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
self.collection = self.app.get_or_create_collection(
name=self.type, embedding_function=self.embedder_config
)

View File

@@ -14,8 +14,7 @@ class UserMemory(Memory):
def __init__(self, crew=None):
warnings.warn(
"UserMemory is deprecated and will be removed in version 0.156.0 "
"or on 2025-08-04, whichever comes first. "
"UserMemory is deprecated and will be removed in a future version. "
"Please use ExternalMemory instead.",
DeprecationWarning,
stacklevel=2,

View File

@@ -1,16 +1,8 @@
import warnings
from typing import Any, Dict, Optional
class UserMemoryItem:
def __init__(self, data: Any, user: str, metadata: Optional[Dict[str, Any]] = None):
warnings.warn(
"UserMemoryItem is deprecated and will be removed in version 0.156.0 "
"or on 2025-08-04, whichever comes first. "
"Please use ExternalMemory instead.",
DeprecationWarning,
stacklevel=2,
)
self.data = data
self.user = user
self.metadata = metadata if metadata is not None else {}

View File

@@ -1 +0,0 @@
"""RAG (Retrieval-Augmented Generation) infrastructure for CrewAI."""

View File

@@ -1 +0,0 @@
"""Embedding components for RAG infrastructure."""

View File

@@ -1 +0,0 @@
"""Storage components for RAG infrastructure."""

View File

@@ -67,7 +67,6 @@ class Task(BaseModel):
description: Descriptive text detailing task's purpose and execution.
expected_output: Clear definition of expected task outcome.
output_file: File path for storing task output.
create_directory: Whether to create the directory for output_file if it doesn't exist.
output_json: Pydantic model for structuring JSON output.
output_pydantic: Pydantic model for task output.
security_config: Security configuration including fingerprinting.
@@ -116,10 +115,6 @@ class Task(BaseModel):
description="A file path to be used to create a file output.",
default=None,
)
create_directory: Optional[bool] = Field(
description="Whether to create the directory for output_file if it doesn't exist.",
default=True,
)
output: Optional[TaskOutput] = Field(
description="Task output, it's final result after being executed", default=None
)
@@ -758,10 +753,8 @@ Follow these guidelines:
resolved_path = Path(self.output_file).expanduser().resolve()
directory = resolved_path.parent
if self.create_directory and not directory.exists():
if not directory.exists():
directory.mkdir(parents=True, exist_ok=True)
elif not self.create_directory and not directory.exists():
raise RuntimeError(f"Directory {directory} does not exist and create_directory is False")
with resolved_path.open("w", encoding="utf-8") as file:
if isinstance(result, dict):

View File

@@ -1,9 +1,10 @@
from typing import Any, Tuple
from typing import Any, Optional, Tuple
from pydantic import BaseModel, Field
from crewai.agent import Agent, LiteAgentOutput
from crewai.llm import BaseLLM
from crewai.llm import LLM
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
@@ -31,11 +32,11 @@ class LLMGuardrail:
def __init__(
self,
description: str,
llm: BaseLLM,
llm: LLM,
):
self.description = description
self.llm: BaseLLM = llm
self.llm: LLM = llm
def _validate_output(self, task_output: TaskOutput) -> LiteAgentOutput:
agent = Agent(

View File

@@ -10,6 +10,7 @@ from .rpm_controller import RPMController
from .exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from .embedding_configurator import EmbeddingConfigurator
__all__ = [
"Converter",
@@ -23,4 +24,5 @@ __all__ = [
"RPMController",
"YamlParser",
"LLMContextLengthExceededException",
"EmbeddingConfigurator",
]

View File

@@ -157,6 +157,10 @@ def get_llm_response(
from_agent=from_agent,
)
except Exception as e:
printer.print(
content=f"Error during LLM call: {e}",
color="red",
)
raise e
if not answer:
printer.print(
@@ -228,17 +232,12 @@ def handle_unknown_error(printer: Any, exception: Exception) -> None:
printer: Printer instance for output
exception: The exception that occurred
"""
error_message = str(exception)
if "litellm" in error_message:
return
printer.print(
content="An unknown error occurred. Please check the details below.",
color="red",
)
printer.print(
content=f"Error details: {error_message}",
content=f"Error details: {exception}",
color="red",
)

View File

@@ -1,10 +1,6 @@
import re
import portalocker
from chromadb import PersistentClient
from hashlib import md5
from typing import Optional
MIN_COLLECTION_LENGTH = 3
MAX_COLLECTION_LENGTH = 63
DEFAULT_COLLECTION = "default_collection"
@@ -64,16 +60,3 @@ def sanitize_collection_name(name: Optional[str], max_collection_length: int = M
sanitized = sanitized[:-1] + "z"
return sanitized
def create_persistent_client(path: str, **kwargs):
"""
Creates a persistent client for ChromaDB with a lock file to prevent
concurrent creations. Works for both multi-threads and multi-processes
environments.
"""
lockfile = f"chromadb-{md5(path.encode(), usedforsecurity=False).hexdigest()}.lock"
with portalocker.Lock(lockfile):
client = PersistentClient(path=path, **kwargs)
return client
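The removed helper above serialized ChromaDB client creation behind a portalocker file lock. A hedged sketch of the call-site usage it supported, mirroring the KnowledgeStorage and RAGStorage hunks earlier in this diff; the path and collection name are illustrative.
# Sketch of how the removed helper was used; path and collection name are placeholders.
# db_storage_path comes from crewai.utilities.paths, as imported in the KnowledgeStorage hunk.
import os
from chromadb.config import Settings

client = create_persistent_client(
    path=os.path.join(db_storage_path(), "knowledge"),
    settings=Settings(allow_reset=True),
)
collection = client.get_or_create_collection(name="knowledge_default")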

View File

@@ -155,7 +155,6 @@ class CrewEvaluator:
)
console = Console()
console.print("\n")
console.print(table)
def evaluate(self, task_output: TaskOutput):

View File

@@ -17,9 +17,6 @@ from .agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentEvaluationStartedEvent,
AgentEvaluationCompletedEvent,
AgentEvaluationFailedEvent,
)
from .task_events import (
TaskStartedEvent,
@@ -77,9 +74,6 @@ __all__ = [
"AgentExecutionStartedEvent",
"AgentExecutionCompletedEvent",
"AgentExecutionErrorEvent",
"AgentEvaluationStartedEvent",
"AgentEvaluationCompletedEvent",
"AgentEvaluationFailedEvent",
"TaskStartedEvent",
"TaskCompletedEvent",
"TaskFailedEvent",

View File

@@ -123,28 +123,3 @@ class AgentLogsExecutionEvent(BaseEvent):
type: str = "agent_logs_execution"
model_config = {"arbitrary_types_allowed": True}
# Agent Eval events
class AgentEvaluationStartedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
type: str = "agent_evaluation_started"
class AgentEvaluationCompletedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
metric_category: Any
score: Any
type: str = "agent_evaluation_completed"
class AgentEvaluationFailedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
error: str
type: str = "agent_evaluation_failed"

View File

@@ -1,5 +1,6 @@
from datetime import datetime, timezone
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from crewai.utilities.serialization import to_serializable
@@ -8,7 +9,7 @@ from crewai.utilities.serialization import to_serializable
class BaseEvent(BaseModel):
"""Base class for all events"""
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
timestamp: datetime = Field(default_factory=datetime.now)
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"
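
The only change in this hunk is the timestamp default: one side uses a timezone-aware UTC default_factory, the other a naive local datetime.now. A standalone sketch of the practical difference, relying on standard-library behavior only:

from datetime import datetime, timezone

aware = datetime.now(timezone.utc)   # tzinfo is timezone.utc
naive = datetime.now()               # tzinfo is None (local wall-clock time)

print(aware.tzinfo, naive.tzinfo)
# Mixing the two styles is error-prone: subtracting or comparing an aware and
# a naive datetime raises TypeError ("can't subtract offset-naive and
# offset-aware datetimes"), so every BaseEvent should stick to one default.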

View File

@@ -4,7 +4,6 @@ from .agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
)
from .crew_events import (
CrewKickoffCompletedEvent,
@@ -81,7 +80,6 @@ EventTypes = Union[
CrewTrainFailedEvent,
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
LiteAgentExecutionCompletedEvent,
TaskStartedEvent,
TaskCompletedEvent,
TaskFailedEvent,

View File

@@ -48,8 +48,8 @@ class LLMCallStartedEvent(LLMEventBase):
"""
type: str = "llm_call_started"
messages: Optional[Union[str, List[Dict[str, Any]]]] = None
tools: Optional[List[dict[str, Any]]] = None
messages: Union[str, List[Dict[str, Any]]]
tools: Optional[List[dict]] = None
callbacks: Optional[List[Any]] = None
available_functions: Optional[Dict[str, Any]] = None
@@ -58,10 +58,10 @@ class LLMCallCompletedEvent(LLMEventBase):
"""Event emitted when a LLM call completes"""
type: str = "llm_call_completed"
messages: str | list[dict[str, Any]] | None = None
response: Any
call_type: LLMCallType
class LLMCallFailedEvent(LLMEventBase):
"""Event emitted when a LLM call fails"""

View File

@@ -1896,80 +1896,6 @@ def test_agent_with_knowledge_sources_generate_search_query():
assert "red" in result.raw.lower()
@pytest.mark.vcr(record_mode='none', filter_headers=["authorization"])
def test_agent_with_knowledge_with_no_crewai_knowledge():
mock_knowledge = MagicMock(spec=Knowledge)
agent = Agent(
role="Information Agent",
goal="Provide information based on knowledge sources",
backstory="You have access to specific knowledge sources.",
llm=LLM(model="openrouter/openai/gpt-4o-mini",api_key=os.getenv('OPENROUTER_API_KEY')),
knowledge=mock_knowledge
)
# Create a task that requires the agent to use the knowledge
task = Task(
description="What is Vidit's favorite color?",
expected_output="Vidit's favorclearite color.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
crew.kickoff()
mock_knowledge.query.assert_called_once()
@pytest.mark.vcr(record_mode='none', filter_headers=["authorization"])
def test_agent_with_only_crewai_knowledge():
mock_knowledge = MagicMock(spec=Knowledge)
agent = Agent(
role="Information Agent",
goal="Provide information based on knowledge sources",
backstory="You have access to specific knowledge sources.",
llm=LLM(model="openrouter/openai/gpt-4o-mini",api_key=os.getenv('OPENROUTER_API_KEY'))
)
# Create a task that requires the agent to use the knowledge
task = Task(
description="What is Vidit's favorite color?",
expected_output="Vidit's favorclearite color.",
agent=agent
)
crew = Crew(agents=[agent], tasks=[task],knowledge=mock_knowledge)
crew.kickoff()
mock_knowledge.query.assert_called_once()
@pytest.mark.vcr(record_mode='none', filter_headers=["authorization"])
def test_agent_knowledege_with_crewai_knowledge():
crew_knowledge = MagicMock(spec=Knowledge)
agent_knowledge = MagicMock(spec=Knowledge)
agent = Agent(
role="Information Agent",
goal="Provide information based on knowledge sources",
backstory="You have access to specific knowledge sources.",
llm=LLM(model="openrouter/openai/gpt-4o-mini",api_key=os.getenv('OPENROUTER_API_KEY')),
knowledge=agent_knowledge
)
# Create a task that requires the agent to use the knowledge
task = Task(
description="What is Vidit's favorite color?",
expected_output="Vidit's favorclearite color.",
agent=agent,
)
crew = Crew(agents=[agent],tasks=[task],knowledge=crew_knowledge)
crew.kickoff()
agent_knowledge.query.assert_called_once()
crew_knowledge.query.assert_called_once()
@pytest.mark.vcr(filter_headers=["authorization"])
def test_litellm_auth_error_handling():
"""Test that LiteLLM authentication errors are handled correctly and not retried."""
@@ -2010,6 +1936,7 @@ def test_crew_agent_executor_litellm_auth_error():
from litellm.exceptions import AuthenticationError
from crewai.agents.tools_handler import ToolsHandler
from crewai.utilities import Printer
# Create an agent and executor
agent = Agent(
@@ -2042,6 +1969,7 @@ def test_crew_agent_executor_litellm_auth_error():
# Mock the LLM call to raise AuthenticationError
with (
patch.object(LLM, "call") as mock_llm_call,
patch.object(Printer, "print") as mock_printer,
pytest.raises(AuthenticationError) as exc_info,
):
mock_llm_call.side_effect = AuthenticationError(
@@ -2055,6 +1983,13 @@ def test_crew_agent_executor_litellm_auth_error():
}
)
# Verify error handling messages
error_message = f"Error during LLM call: {str(mock_llm_call.side_effect)}"
mock_printer.assert_any_call(
content=error_message,
color="red",
)
# Verify the call was only made once (no retries)
mock_llm_call.assert_called_once()

View File

@@ -1,237 +0,0 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Test Agent. An agent
created for testing purposes\nYour personal goal is: Complete test tasks successfully\n\nTo
give my best complete final answer to the task respond using the exact following
format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
answer must be the great and the most complete as possible, it must be outcome
described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
"content": "Complete this task successfully"}], "model": "gpt-4o-mini", "stop":
["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '583'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFNNb9swDL3nVxA6J0U+HKTNbd0woMAOw7Bu6LbCUCXa1iqLgkgnzYr8
98FKWqdbB+wiQHx81OMj9TgCUM6qNSjTaDFt9JNL+TZ7N/dfrusPN01NyV6vPk3f/mrl5vLrXI17
Bt39RCNPrDNDbfQojsIBNgm1YF91tlrOl+fzxXKWgZYs+p5WR5kUNGldcJP5dF5MpqvJ7PzIbsgZ
ZLWG7yMAgMd89jqDxQe1hun4KdIis65RrZ+TAFQi30eUZnYsOogaD6ChIBiy9M8NdXUja7iCQFsw
OkDtNgga6l4/6MBbTAA/wnsXtIc3+b6Gjx41I8REG2cRWoStkwakQeCIxlXOgEXRzjNQgvzigwBV
OUU038OOOgiIFhr0MdPHoIOFK9g67wEDdwlBCI7OIjgB7oxB5qrzfpeznxRokIZS3wwk5EiB8ey0
54RVx7r3PXTenwA6BBLdzy27fXtE9s/+eqpjojv+g6oqFxw3ZULNFHovWSiqjO5HALd5jt2L0aiY
qI1SCt1jfu7i4lBODdszgEVxBIVE+yE+KxbjV8qVR79PFkEZbRq0A3XYGt1ZRyfA6KTpv9W8VvvQ
uAv1/5QfAGMwCtoyJrTOvOx4SEvYf65/pT2bnAUrxrRxBktxmPpBWKx05w8rr3jHgm1ZuVBjiskd
9r6K5aLQy0LjxcKo0X70GwAA//8DAMz2wVUFBAAA
headers:
CF-RAY:
- 95f93ea9af627e0b-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 15 Jul 2025 12:25:54 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=GRZmZLrjW5ZRHNmUJa4ccrMcy20D1rmeqK6Ptlv0mRY-1752582354-1.0.1.1-xKd_yga48Eedech5TRlThlEpDgsB2whxkWHlCyAGOVMqMcvH1Ju9FdXYbuQ9NdUQcVxPLgiGM35lYhqSLVQiXDyK01dnyp2Gvm560FBN9DY;
path=/; expires=Tue, 15-Jul-25 12:55:54 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=MYqswpSR7sqr4kGp6qZVkaL7HDYwMiww49PeN9QBP.A-1752582354973-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '4047'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '4440'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999885'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_5704c0f206a927ddc12aa1a19b612a75
status:
code: 200
message: OK
- request:
body: '{"messages": [{"role": "system", "content": "You are an expert evaluator
assessing how well an AI agent''s output aligns with its assigned task goal.\n\nScore
the agent''s goal alignment on a scale from 0-10 where:\n- 0: Complete misalignment,
agent did not understand or attempt the task goal\n- 5: Partial alignment, agent
attempted the task but missed key requirements\n- 10: Perfect alignment, agent
fully satisfied all task requirements\n\nConsider:\n1. Did the agent correctly
interpret the task goal?\n2. Did the final output directly address the requirements?\n3.
Did the agent focus on relevant aspects of the task?\n4. Did the agent provide
all requested information or deliverables?\n\nReturn your evaluation as JSON
with fields ''score'' (number) and ''feedback'' (string).\n"}, {"role": "user",
"content": "\nAgent role: Test Agent\nAgent goal: Complete test tasks successfully\n\n\nAgent''s
final output:\nPlease provide me with the specific details or context of the
task you need help with, and I will ensure to complete it successfully and provide
a thorough response.\n\nEvaluate how well the agent''s output aligns with the
assigned task goal.\n"}], "model": "gpt-4o-mini", "stop": []}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '1196'
content-type:
- application/json
cookie:
- __cf_bm=GRZmZLrjW5ZRHNmUJa4ccrMcy20D1rmeqK6Ptlv0mRY-1752582354-1.0.1.1-xKd_yga48Eedech5TRlThlEpDgsB2whxkWHlCyAGOVMqMcvH1Ju9FdXYbuQ9NdUQcVxPLgiGM35lYhqSLVQiXDyK01dnyp2Gvm560FBN9DY;
_cfuvid=MYqswpSR7sqr4kGp6qZVkaL7HDYwMiww49PeN9QBP.A-1752582354973-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAA4xUy27bQAy8+yuIPdtGbMdN4FvbSxM0QIsEKNA6MJhdSmK82hWWVFwj8L8XKz/k
9AH0ogOHnOFjVq8DAMPOLMDYCtXWjR990O+TT7dfZs/v5OtFy/ef7++mxfu7j83t/cONGeaK+PRM
Vo9VYxvrxpNyDHvYJkKlzDq5mk/n19PZfN4BdXTkc1nZ6OgyjmoOPJpeTC9HF1ejyfWhuopsScwC
fgwAAF67b+4zOPppFnAxPEZqEsGSzOKUBGBS9DliUIRFMagZ9qCNQSl0rb8uA8DSiI2JlmYB0+E+
UBC5J7TrHFuah4oASwoKjh2EqOCojkE0oRIgWE+YoA2OUhZzHEqIBWhFoChrKCP6IWwqthWwgEY4
bItASbRLEpDWWhIpWu+3Y7gJooRuCKyAsiYHRUxQx0TgSJG9DIGDY4ua5RA82nVW5cDKqPxCWYhC
iSXBhrU69TOGbxV7ysxSxY0Awoa951AGkq69/do67QLZk8vBJsUXdgQYtoBWW/SQSJoYpFPq2Ptp
MLjTttC51DFXVIPjRFb9drw0y7A7v0uiohXM3git92cAhhAVs7c6RzwekN3JAz6WTYpP8lupKTiw
VKtEKDHke4vGxnTobgDw2HmtfWMf06RYN7rSuKZObjo7eM30Fu/R6yOoUdH38dnkCLzhWx1ud+ZW
Y9FW5PrS3trYOo5nwOBs6j+7+Rv3fnIO5f/Q94C11Ci5VZPIsX07cZ+WKP8B/pV22nLXsBFKL2xp
pUwpX8JRga3fv0sjW1GqVwWHklKTuHuc+ZKD3eAXAAAA//8DADksFsafBAAA
headers:
CF-RAY:
- 95f93ec73a1c7e0b-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 15 Jul 2025 12:25:57 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '1544'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '1546'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999732'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_44930ba12ad8d1e3f0beed1d5e3d8b0c
status:
code: 200
message: OK
version: 1
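
The system prompt recorded in this cassette asks the judge model to return JSON with 'score' (number) and 'feedback' (string) fields, and the deleted evaluator tests later in this diff expect a "Failed to parse" fallback when the reply is not valid JSON. A small sketch of that parse-with-fallback shape (the helper name and return type are illustrative, not the repository's implementation):

import json
from typing import Optional, Tuple

def parse_verdict(raw: str) -> Tuple[Optional[float], str]:
    """Extract (score, feedback) from a judge reply, tolerating bad JSON."""
    try:
        data = json.loads(raw)
        return float(data["score"]), str(data.get("feedback", ""))
    except (json.JSONDecodeError, KeyError, TypeError, ValueError):
        return None, f"Failed to parse evaluation response: {raw[:80]}"

print(parse_verdict('{"score": 8.5, "feedback": "aligned"}'))  # (8.5, 'aligned')
print(parse_verdict("Invalid JSON response"))                  # (None, 'Failed to parse ...')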

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,123 +0,0 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Test Agent. An agent
created for testing purposes\nYour personal goal is: Complete test tasks successfully\nTo
give my best complete final answer to the task respond using the exact following
format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
answer must be the great and the most complete as possible, it must be outcome
described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
"content": "\nCurrent Task: Test task description\n\nThis is the expected criteria
for your final answer: Expected test output\nyou MUST return the actual complete
content as the final answer, not a summary.\n\nBegin! This is VERY important
to you, use the tools available and give your best Final Answer, your job depends
on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '879'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFTBbhtHDL3rK4g5rwRbtaNYt9RoEaNoUaBODm0DgZnh7jKe5WyHXDmO
4X8vZiRLcupDLwvsPPLxPQ45jzMAx8GtwfkezQ9jnP9oeLv98N5+vfl9+4v89Mf76+XV7XDz8Yc/
r39T15SM9PkLeXvOWvg0jJGMk+xgnwmNCuv56nJ5+XZ1tbqswJACxZLWjTa/SPOBhefLs+XF/Gw1
P3+7z+4Te1K3hr9mAACP9Vt0SqCvbg1nzfPJQKrYkVsfggBcTrGcOFRlNRRzzRH0SYykSr8BSffg
UaDjLQFCV2QDit5TBvhbfmbBCO/q/xpue1ZgBesJ6OtI3iiAkRqkycbJGrjv2ffgk5S6CqkFhECG
HClAIPWZx9Kkgtz3aJVq37vChXoH2qcpBogp3UHkO1rAbU/QViW7Os8hLD5OgQBjBCFfOpEfgKVN
ecBSpoFAQxK1jMbSgY+Y2R6aWjJTT6K8JSHVBlACYOgpk3gCS4DyADqS55YpQDdxoMhCuoCbgwKf
tpSB0PeAJdaKseKpOsn0z8SZBhJrgESnXERY8S0JRsxWulkoilkKkDJ0JJQx8jcKi13DX3pWyuWm
FPDQN8jU7mW3KRfdSaj2r5ZLMEmgXOYg7K5OlcQYI1Cs4vSFavSVmLWnsDgdnEztpFiGV6YYTwAU
SVYbXkf20x55OgxpTN2Y02f9LtW1LKz9JhNqkjKQaml0FX2aAXyqyzC9mG835jSMtrF0R7Xc+Zvz
HZ877uARvXqzBy0ZxuP58nLVvMK32Q2rnqyT8+h7CsfU4+7hFDidALMT1/9V8xr3zjlL93/oj4D3
NBqFzZgpsH/p+BiW6Utd0dfDDl2ugl2ZK/a0MaZcbiJQi1PcPRxOH9Ro2LQsHeUxc309yk3Onmb/
AgAA//8DAAbYfvVABQAA
headers:
CF-RAY:
- 95f9c7ffa8331b11-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 15 Jul 2025 13:59:38 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=J_xe1AP.B5P6D2GVMCesyioeS5E9DnYT34rbwQUefFc-1752587978-1.0.1.1-5Dflk5cAj6YCsOSVbCFWWSpXpw_mXsczIdzWzs2h2OwDL01HQbduE5LAToy67sfjFjHeeO4xRrqPLUQpySy2QqyHXbI_fzX4UAt3.UdwHxU;
path=/; expires=Tue, 15-Jul-25 14:29:38 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=0rTD8RMpxBQQy42jzmum16_eoRtWNfaZMG_TJkhGS7I-1752587978437-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '2623'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '2626'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999813'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_ccc347e91010713379c920aa0efd1f4f
status:
code: 200
message: OK
version: 1

View File

@@ -1,150 +0,0 @@
interactions:
- request:
body: '{"model": "openai/gpt-4o-mini", "messages": [{"role": "system", "content":
"Your goal is to rewrite the user query so that it is optimized for retrieval
from a vector database. Consider how the query will be used to find relevant
documents, and aim to make it more specific and context-aware. \n\n Do not include
any other text than the rewritten query, especially any preamble or postamble
and only add expected output format if its relevant to the rewritten query.
\n\n Focus on the key words of the intended task and to retrieve the most relevant
information. \n\n There will be some extra context provided that might need
to be removed such as expected_output formats structured_outputs and other instructions."},
{"role": "user", "content": "The original query is: What is Vidit''s favorite
color?\n\nThis is the expected criteria for your final answer: Vidit''s favorclearite
color.\nyou MUST return the actual complete content as the final answer, not
a summary.."}], "stream": false, "stop": ["\nObservation:"]}'
headers:
accept:
- '*/*'
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '1017'
content-type:
- application/json
host:
- openrouter.ai
http-referer:
- https://litellm.ai
user-agent:
- litellm/1.68.0
x-title:
- liteLLM
method: POST
uri: https://openrouter.ai/api/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//4lKAAS4AAAAA//90kE1vE0EMhv9K9V64TMrmgyadG8ceECAhhIrQarrj
3bidHY/GTgSK9r+jpUpaJLja78djn8ARHgPlxXK72a6X6+12szhq7Id72d2V8b58/nbzQb98gkOp
cuRIFR4fC+X3d3AYJVKChxTKgd8OxRYbWYycGQ7y8EidwaPbB7vuZCyJjCXDoasUjCL8S61Dtxfu
SOG/n5BkKFUeFD4fUnLoObPu20pBJcNDTQoccjA+UvufLedIP+Ebh5FUw0DwJ1RJBI+gymoh20wj
2SjPpF85sr3Rqz4cpbLRVSdJ6jUcKvUHDenM81zFeXgeTNMPB/2lRuMMM1Atlf8k9qVt1rer3WrV
3DZwOJw5SpWxWGvyRFnnR7ybQc4/usxvHEwspBfhbun+NreRLHDSObUL3Z7iRdxM/wh9rb/c8coy
Tb8BAAD//wMAqVt3JyMCAAA=
headers:
Access-Control-Allow-Origin:
- '*'
CF-RAY:
- 9402cb503aec46c0-BOM
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Thu, 15 May 2025 12:56:14 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
Vary:
- Accept-Encoding
x-clerk-auth-message:
- Invalid JWT form. A JWT consists of three parts separated by dots. (reason=token-invalid,
token-carrier=header)
x-clerk-auth-reason:
- token-invalid
x-clerk-auth-status:
- signed-out
status:
code: 200
message: OK
- request:
body: '{"model": "openai/gpt-4o-mini", "messages": [{"role": "system", "content":
"You are Information Agent. You have access to specific knowledge sources.\nYour
personal goal is: Provide information based on knowledge sources\nTo give my
best complete final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"}, {"role": "user", "content": "\nCurrent
Task: What is Vidit''s favorite color?\n\nThis is the expected criteria for
your final answer: Vidit''s favorclearite color.\nyou MUST return the actual
complete content as the final answer, not a summary.\n\nBegin! This is VERY
important to you, use the tools available and give your best Final Answer, your
job depends on it!\n\nThought:"}], "stream": false, "stop": ["\nObservation:"]}'
headers:
accept:
- '*/*'
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '951'
content-type:
- application/json
host:
- openrouter.ai
http-referer:
- https://litellm.ai
user-agent:
- litellm/1.68.0
x-title:
- liteLLM
method: POST
uri: https://openrouter.ai/api/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//4lKAAS4AAAAA///iQjABAAAA//90kE9rG0EMxb/K8C69jNON7WJ7boFS
CD2ENm2g/1jGs/Ja7aw0zIydBuPvXjbBcQrtUU9P0u/pAO7g0JNMLhfzxexytli8mdy8r7c6/3Lb
v13eff00088fPj7AImXdc0cZDjeJ5OoaFoN2FOGgicTz6z7VyVwnAwvDQtc/KVQ4hK2vF0GHFKmy
CixCJl+pgzuftQhb5UAF7tsBUfuUdV3gZBejxYaFy7bN5IsKHErVBAvxlffU/qfL0tFvuMZioFJ8
T3AHZI0EB18Kl+qljjQqlWQkvTai9yZ4MT3vyXjTj6DGS7mnbMx3ecfio7l6rJ25447rq2I2fq+Z
K5mgUbPhYtZxRxewyLTZFR9PMZ4IWfon4Xj8YVEeSqVhzNBTTpkfQTapbWar6XI6bVYNLHYn/JR1
SLWt+oukjP9rRv7Ta8/6yqJq9fGsLFf27+m2o+o5lnFt8GFL3bO5Of5j60v/c5AXI8fjHwAAAP//
AwDEkP8dZgIAAA==
headers:
Access-Control-Allow-Origin:
- '*'
CF-RAY:
- 9402cb55c9fe46c0-BOM
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Thu, 15 May 2025 12:56:15 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
Vary:
- Accept-Encoding
x-clerk-auth-message:
- Invalid JWT form. A JWT consists of three parts separated by dots. (reason=token-invalid,
token-carrier=header)
x-clerk-auth-reason:
- token-invalid
x-clerk-auth-status:
- signed-out
status:
code: 200
message: OK
version: 1

View File

@@ -1,151 +0,0 @@
interactions:
- request:
body: '{"model": "openai/gpt-4o-mini", "messages": [{"role": "system", "content":
"Your goal is to rewrite the user query so that it is optimized for retrieval
from a vector database. Consider how the query will be used to find relevant
documents, and aim to make it more specific and context-aware. \n\n Do not include
any other text than the rewritten query, especially any preamble or postamble
and only add expected output format if its relevant to the rewritten query.
\n\n Focus on the key words of the intended task and to retrieve the most relevant
information. \n\n There will be some extra context provided that might need
to be removed such as expected_output formats structured_outputs and other instructions."},
{"role": "user", "content": "The original query is: What is Vidit''s favorite
color?\n\nThis is the expected criteria for your final answer: Vidit''s favorclearite
color.\nyou MUST return the actual complete content as the final answer, not
a summary.."}], "stream": false, "stop": ["\nObservation:"]}'
headers:
accept:
- '*/*'
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '1017'
content-type:
- application/json
host:
- openrouter.ai
http-referer:
- https://litellm.ai
user-agent:
- litellm/1.68.0
x-title:
- liteLLM
method: POST
uri: https://openrouter.ai/api/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//4lKAAS4AAAAA//90kE1vE0EMhv9K9V64TGCbNGQ7N46gIg6IXhBaTWed
Xbez49HYiaii/e9oqRKKBFf7/XjsE7iHx0B5db272W2uN++b3ep585k+jcmo/XqnYXvX5m/3cChV
jtxThceXQvnDRzhM0lOChxTKgd8NxVY3spo4Mxzk4ZGiwSOOwd5GmUoiY8lwiJWCUQ9/qW0d4igc
SeG/n5BkKFUeFD4fUnLYc2Ydu0pBJcNDTQoccjA+UvefLeeefsI3DhOphoHgT6iSCB5BldVCtoVG
slFeSO+5Z3ujV/twlMpGV1GSVDhU2h80pDPOSxPn4WUwzz8c9FmNpoVloFoq/w7cl67Z3K7b9bq5
beBwOGOUKlOxzuSJsi5/2C4c5xdd5lsHEwvpj7Bt3N/mricLnHRJjSGO1F/EzfyP0Nf6yx2vLPP8
CwAA//8DAOHu/cIiAgAA
headers:
Access-Control-Allow-Origin:
- '*'
CF-RAY:
- 9402c73df9d8859c-BOM
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Thu, 15 May 2025 12:53:27 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
Vary:
- Accept-Encoding
x-clerk-auth-message:
- Invalid JWT form. A JWT consists of three parts separated by dots. (reason=token-invalid,
token-carrier=header)
x-clerk-auth-reason:
- token-invalid
x-clerk-auth-status:
- signed-out
status:
code: 200
message: OK
- request:
body: '{"model": "openai/gpt-4o-mini", "messages": [{"role": "system", "content":
"You are Information Agent. You have access to specific knowledge sources.\nYour
personal goal is: Provide information based on knowledge sources\nTo give my
best complete final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"}, {"role": "user", "content": "\nCurrent
Task: What is Vidit''s favorite color?\n\nThis is the expected criteria for
your final answer: Vidit''s favorclearite color.\nyou MUST return the actual
complete content as the final answer, not a summary.\n\nBegin! This is VERY
important to you, use the tools available and give your best Final Answer, your
job depends on it!\n\nThought:"}], "stream": false, "stop": ["\nObservation:"]}'
headers:
accept:
- '*/*'
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '951'
content-type:
- application/json
host:
- openrouter.ai
http-referer:
- https://litellm.ai
user-agent:
- litellm/1.68.0
x-title:
- liteLLM
method: POST
uri: https://openrouter.ai/api/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//4lKAAS4AAAAA///iQjABAAAA//90kUGPEzEMhf+K5QuXdJmlpbvkthIg
emFXQoIDoMpNPFNDJo6STLul6n9H09KyIDjmxc9+/rxH8Wix4zi5vpndTK+n8+Z2wo9vXj28fHff
vW4+PNT5j1l6/wkNpqwb8ZzR4n3ieLdAg716DmhRE0eS512qk5lOeomCBnX1jV1Fi25N9cppnwJX
0YgGXWaq7NH+HmvQrVUcF7Sf9xi0S1lXBW0cQjDYSpSyXmamohEtlqoJDUaqsuHlf34len5E2xjs
uRTqGO0eswZGi1SKlEqxjmk0Vo5j0gVE3YKjCJ1sGAi6MShQLFvOAF/iW4kU4O74tvBRvNRnBVra
aJbK4DRoBikQtcJWPIcdeHVDz7GyB4mQhlUQF3ZAG5JAq8BQdMiOi4GisBiHj+ZftIHA87hePeY5
5cjcUfYSO1hLgZLYSSvurxRXaDBzOxQKZ4gnPhK7k3A4fDVYdqVyPxLsOKcsRwxtWvoVOZo3vm3Q
4HCGl7L2qS6rfudYxus1I73zYS/69NZg1UrhorwYD/yHe+m5koQytnXk1uwvxc3hH12f1l8WeWI5
HH4CAAD//wMAhZKqO+QCAAA=
headers:
Access-Control-Allow-Origin:
- '*'
CF-RAY:
- 9402c7459f3f859c-BOM
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Thu, 15 May 2025 12:53:28 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
Vary:
- Accept-Encoding
x-clerk-auth-message:
- Invalid JWT form. A JWT consists of three parts separated by dots. (reason=token-invalid,
token-carrier=header)
x-clerk-auth-reason:
- token-invalid
x-clerk-auth-status:
- signed-out
status:
code: 200
message: OK
version: 1

View File

@@ -1,150 +0,0 @@
interactions:
- request:
body: '{"model": "openai/gpt-4o-mini", "messages": [{"role": "system", "content":
"Your goal is to rewrite the user query so that it is optimized for retrieval
from a vector database. Consider how the query will be used to find relevant
documents, and aim to make it more specific and context-aware. \n\n Do not include
any other text than the rewritten query, especially any preamble or postamble
and only add expected output format if its relevant to the rewritten query.
\n\n Focus on the key words of the intended task and to retrieve the most relevant
information. \n\n There will be some extra context provided that might need
to be removed such as expected_output formats structured_outputs and other instructions."},
{"role": "user", "content": "The original query is: What is Vidit''s favorite
color?\n\nThis is the expected criteria for your final answer: Vidit''s favorclearite
color.\nyou MUST return the actual complete content as the final answer, not
a summary.."}], "stream": false, "stop": ["\nObservation:"]}'
headers:
accept:
- '*/*'
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '1017'
content-type:
- application/json
host:
- openrouter.ai
http-referer:
- https://litellm.ai
user-agent:
- litellm/1.68.0
x-title:
- liteLLM
method: POST
uri: https://openrouter.ai/api/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//4lKAAS4AAAAA//90kE1PIzEMhv8Kei97Sdnplwq5gTgAF8ShcFitRmnG
nTFk4ihxq11V899Xs6gFJLja78djH8ANLFqKk+lqsZpP56vpYqJhublfP1eP65v1i79Lt9fdMwxS
lj03lGHxkChe3cGgl4YCLCRRdPyzTTpZyKTnyDCQzQt5hYXvnJ576VMgZYkw8JmcUgP7XmvgO2FP
BfbXAUHalGVTYOMuBIMtRy5dnckVibAoKgkG0Snvqf5my7GhP7CVQU+luJZgD8gSCBauFC7qoo40
EpXiSPrEDeuPcrZ1e8msdOYlSIZBpu2uuHDEeWvi2L4NhuG3QflblPqRpaWcMv8P3Ka6ml/OLmaz
6rKCwe6IkbL0SWuVV4pl/MNy5Di+6DRfGqioC+/Ci8p8NtcNqeNQxlTvfEfNSVwNX4R+1J/u+GAZ
hn8AAAD//wMAIwJ79CICAAA=
headers:
Access-Control-Allow-Origin:
- '*'
CF-RAY:
- 9402c9db99ec4722-BOM
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Thu, 15 May 2025 12:55:14 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
Vary:
- Accept-Encoding
x-clerk-auth-message:
- Invalid JWT form. A JWT consists of three parts separated by dots. (reason=token-invalid,
token-carrier=header)
x-clerk-auth-reason:
- token-invalid
x-clerk-auth-status:
- signed-out
status:
code: 200
message: OK
- request:
body: '{"model": "openai/gpt-4o-mini", "messages": [{"role": "system", "content":
"You are Information Agent. You have access to specific knowledge sources.\nYour
personal goal is: Provide information based on knowledge sources\nTo give my
best complete final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"}, {"role": "user", "content": "\nCurrent
Task: What is Vidit''s favorite color?\n\nThis is the expected criteria for
your final answer: Vidit''s favorclearite color.\nyou MUST return the actual
complete content as the final answer, not a summary.\n\nBegin! This is VERY
important to you, use the tools available and give your best Final Answer, your
job depends on it!\n\nThought:"}], "stream": false, "stop": ["\nObservation:"]}'
headers:
accept:
- '*/*'
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '951'
content-type:
- application/json
host:
- openrouter.ai
http-referer:
- https://litellm.ai
user-agent:
- litellm/1.68.0
x-title:
- liteLLM
method: POST
uri: https://openrouter.ai/api/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//4lKAAS4AAAAA///iQjABAAAA//90kN1qGzEQRl9FfNdyul4nday73ARy
VUpLE2jLIu+O15NoZ4QkOy1moa/R1+uTlE1wnEB7qU/zc84cwB0cepLZfHm+XMwXy/nF7II/3d7V
H+tOPvsS3le3d+keFjHpnjtKcPgQSa5uYDFoRwEOGkk8v+tjmZ3rbGBhWOj6ntoCh3bry1mrQwxU
WAUWbSJfqIM7rbVot8otZbivBwTtY9J1hpNdCBYbFs7bJpHPKnDIRSMsxBfeU/OfX5aOfsBVFgPl
7HuCOyBpIDj4nDkXL2WiUSkkE+mNEX00rRfT856MN/0EarzkR0rGfJNrFh/M1dPbmS/ccfnz63c2
G7/XxIVMq0GT4WzWYUdnsEi02WUfjiLPjCz9czCO3y3yz1xomCx6SjHxE8omNtViVV/WdbWqYLE7
CsSkQyxN0QeSPF2wmgyOxz3lK4uixYdTcrmyb7ubjornkKexrW+31L0UV+M/pr6ufxF51TKOfwEA
AP//AwBybekMaAIAAA==
headers:
Access-Control-Allow-Origin:
- '*'
CF-RAY:
- 9402c9e1b94a4722-BOM
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Thu, 15 May 2025 12:55:15 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
Vary:
- Accept-Encoding
x-clerk-auth-message:
- Invalid JWT form. A JWT consists of three parts separated by dots. (reason=token-invalid,
token-carrier=header)
x-clerk-auth-reason:
- token-invalid
x-clerk-auth-status:
- signed-out
status:
code: 200
message: OK
version: 1

View File

@@ -1,209 +0,0 @@
interactions:
- request:
body: '{"messages": [{"role": "user", "content": "What is the capital of France?"}],
"model": "o1-mini", "stop": ["stop"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '115'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.75.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.75.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"error\": {\n \"message\": \"Unsupported parameter: 'stop'
is not supported with this model.\",\n \"type\": \"invalid_request_error\",\n
\ \"param\": \"stop\",\n \"code\": \"unsupported_parameter\"\n }\n}"
headers:
CF-RAY:
- 961215744c94cb45-GIG
Connection:
- keep-alive
Content-Length:
- '196'
Content-Type:
- application/json
Date:
- Fri, 18 Jul 2025 12:46:46 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=KwJ1K47OHX4n2TZN8bMW37yKzKyK__S4HbTiCfyWjXM-1752842806-1.0.1.1-lweHFR7Kv2v7hT5I6xxYVz_7Ruu6aBdEgpJrSWrMxi_ficAeWC0oDeQ.0w2Lr1WRejIjqqcwSgdl6RixF2qEkjJZfS0pz_Vjjqexe44ayp4;
path=/; expires=Fri, 18-Jul-25 13:16:46 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=zv09c6bwcgNsYU80ah3wXzqeaIKyt_h61EAh_XRA87I-1752842806652-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '20'
openai-project:
- proj_xitITlrFeen7zjNSzML82h9x
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '32'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999990'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_7be4715c3ee32aa406eacb68c7cc966e
status:
code: 400
message: Bad Request
- request:
body: '{"messages": [{"role": "user", "content": "What is the capital of France?"}],
"model": "o1-mini"}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '97'
content-type:
- application/json
cookie:
- __cf_bm=KwJ1K47OHX4n2TZN8bMW37yKzKyK__S4HbTiCfyWjXM-1752842806-1.0.1.1-lweHFR7Kv2v7hT5I6xxYVz_7Ruu6aBdEgpJrSWrMxi_ficAeWC0oDeQ.0w2Lr1WRejIjqqcwSgdl6RixF2qEkjJZfS0pz_Vjjqexe44ayp4;
_cfuvid=zv09c6bwcgNsYU80ah3wXzqeaIKyt_h61EAh_XRA87I-1752842806652-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.75.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.75.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAA3RSwU7jMBC95ytGPlYNakJhQ2/sgSsg7QUhFA32pJni2JHtwFao/76yC3XQwsWH
efOe35uZ9wJAsBIbELLHIIdRl78nGvaqOt/dPDxf71/fdg/9bXO3e5ETXt+LZWTY5x3J8Mk6k3YY
NQW25ghLRxgoqla/LupmXTeXqwQMVpGONFuVAxsu61W9LldXZVV/MHvLkrzYwGMBAPCe3ujRKPor
NpB0UmUg73FLYnNqAhDO6lgR6D37gCaIZQalNYFMsv2nJ5A4ckANtoMbh0YSsIfF4g4d+8XibM50
1E0eo3MzaT0D0BgbMCZPnp8+kMPJZceGfd86Qm9N/NkHO4qEHgqAp5R6+hJEjM4OY2iDfaEkW62P
ciLPOYPNJxhsQJ3rV83yG7VWUUDWfjY1IVH2pDIzjxgnxXYGFLNs/5v5TvuYm802q1yuf9TPgJQ0
BlLt6Eix/Jo4tzmKZ/hT22nIybHw5F5ZUhuYXFyEog4nfTwQ4fc+0NB2bLbkRsfpSuKui0PxDwAA
//8DAN7IUy8kAwAA
headers:
CF-RAY:
- 961216c3f9837e07-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Fri, 18 Jul 2025 12:47:41 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '1027'
openai-project:
- proj_xitITlrFeen7zjNSzML82h9x
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '1029'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999990'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_19a0763b09f0410b9d09598078a04cd6
status:
code: 200
message: OK
version: 1

View File

@@ -1,206 +0,0 @@
interactions:
- request:
body: '{"messages": [{"role": "user", "content": "What is the capital of France?"}],
"model": "o1-mini", "stop": ["stop"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '115'
content-type:
- application/json
cookie:
- __cf_bm=KwJ1K47OHX4n2TZN8bMW37yKzKyK__S4HbTiCfyWjXM-1752842806-1.0.1.1-lweHFR7Kv2v7hT5I6xxYVz_7Ruu6aBdEgpJrSWrMxi_ficAeWC0oDeQ.0w2Lr1WRejIjqqcwSgdl6RixF2qEkjJZfS0pz_Vjjqexe44ayp4;
_cfuvid=zv09c6bwcgNsYU80ah3wXzqeaIKyt_h61EAh_XRA87I-1752842806652-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.75.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.75.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"error\": {\n \"message\": \"Unsupported parameter: 'stop'
is not supported with this model.\",\n \"type\": \"invalid_request_error\",\n
\ \"param\": \"stop\",\n \"code\": \"unsupported_parameter\"\n }\n}"
headers:
CF-RAY:
- 961220323a627e05-GRU
Connection:
- keep-alive
Content-Length:
- '196'
Content-Type:
- application/json
Date:
- Fri, 18 Jul 2025 12:54:06 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '9'
openai-project:
- proj_xitITlrFeen7zjNSzML82h9x
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '11'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999990'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_e8d7880c5977029062d8487d215e5282
status:
code: 400
message: Bad Request
- request:
body: '{"messages": [{"role": "user", "content": "What is the capital of France?"}],
"model": "o1-mini"}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '97'
content-type:
- application/json
cookie:
- __cf_bm=KwJ1K47OHX4n2TZN8bMW37yKzKyK__S4HbTiCfyWjXM-1752842806-1.0.1.1-lweHFR7Kv2v7hT5I6xxYVz_7Ruu6aBdEgpJrSWrMxi_ficAeWC0oDeQ.0w2Lr1WRejIjqqcwSgdl6RixF2qEkjJZfS0pz_Vjjqexe44ayp4;
_cfuvid=zv09c6bwcgNsYU80ah3wXzqeaIKyt_h61EAh_XRA87I-1752842806652-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.75.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.75.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.11.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAA3SSQW/bMAyF7/4Vgo5BXCSeV6c5bkAPPTVbMaAYCoOT6JitLAkSPbQo8t8HKWns
Yu1FB3181HsUXwshJGm5FVL1wGrwpvw2In/fXY3Pcd/sftzf9ENvnurm569dc9/IZVK4P4+o+E11
odzgDTI5e8QqIDCmruvma7Wpv1T1ZQaD02iSzK3LgSyV1aqqy9VVua5Oyt6Rwii34nchhBCv+Uwe
rcZnuRWr5dvNgDHCHuX2XCSEDM6kGwkxUmSwLJcTVM4y2mz7rkehwBODEa4T1wGsQkFRLBa3ECgu
FhdzZcBujJCc29GYGQBrHUNKnj0/nMjh7LIjS7FvA0J0Nr0c2XmZ6aEQ4iGnHt8FkT64wXPL7glz
23V9bCenOc/h5kTZMZgZuKyWH/RrNTKQibO5SQWqRz1JpyHDqMnNQDFL97+dj3ofk5Pdz5xVm08f
mIBS6Bl16wNqUu9DT2UB0yZ+Vnaec7YsI4a/pLBlwpD+QmMHoznuiIwvkXFoO7J7DD5QXpT03cWh
+AcAAP//AwAo/zsSJwMAAA==
headers:
CF-RAY:
- 961220338bd47e05-GRU
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Fri, 18 Jul 2025 12:54:08 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '1280'
openai-project:
- proj_xitITlrFeen7zjNSzML82h9x
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-envoy-upstream-service-time:
- '1286'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999990'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_b7390d46fa4e14380d42162cb22045df
status:
code: 200
message: OK
version: 1

View File

@@ -27,7 +27,7 @@ class TestValidateToken(unittest.TestCase):
audience="app_id_xxxx",
)
mock_jwt.decode.assert_called_with(
mock_jwt.decode.assert_called_once_with(
"aaaaa.bbbbbb.cccccc",
"mock_signing_key",
algorithms=["RS256"],

View File

@@ -1,28 +0,0 @@
import pytest
from unittest.mock import MagicMock
from crewai.agent import Agent
from crewai.task import Task
class BaseEvaluationMetricsTest:
@pytest.fixture
def mock_agent(self):
agent = MagicMock(spec=Agent)
agent.id = "test_agent_id"
agent.role = "Test Agent"
agent.goal = "Test goal"
agent.tools = []
return agent
@pytest.fixture
def mock_task(self):
task = MagicMock(spec=Task)
task.description = "Test task description"
task.expected_output = "Test expected output"
return task
@pytest.fixture
def execution_trace(self):
return {
"thinking": ["I need to analyze this data carefully"],
"actions": ["Gathered information", "Analyzed data"]
}

View File

@@ -1,59 +0,0 @@
from unittest.mock import patch, MagicMock
from tests.experimental.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from crewai.experimental.evaluation.base_evaluator import EvaluationScore
from crewai.experimental.evaluation.metrics.goal_metrics import GoalAlignmentEvaluator
from crewai.utilities.llm_utils import LLM
class TestGoalAlignmentEvaluator(BaseEvaluationMetricsTest):
@patch("crewai.utilities.llm_utils.create_llm")
def test_evaluate_success(self, mock_create_llm, mock_agent, mock_task, execution_trace):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"score": 8.5,
"feedback": "The agent correctly understood the task and produced relevant output."
}
"""
mock_create_llm.return_value = mock_llm
evaluator = GoalAlignmentEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="This is the final output"
)
assert isinstance(result, EvaluationScore)
assert result.score == 8.5
assert "correctly understood the task" in result.feedback
mock_llm.call.assert_called_once()
prompt = mock_llm.call.call_args[0][0]
assert len(prompt) >= 2
assert "system" in prompt[0]["role"]
assert "user" in prompt[1]["role"]
assert mock_agent.role in prompt[1]["content"]
assert mock_task.description in prompt[1]["content"]
@patch("crewai.utilities.llm_utils.create_llm")
def test_evaluate_error_handling(self, mock_create_llm, mock_agent, mock_task, execution_trace):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = "Invalid JSON response"
mock_create_llm.return_value = mock_llm
evaluator = GoalAlignmentEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="This is the final output"
)
assert isinstance(result, EvaluationScore)
assert result.score is None
assert "Failed to parse" in result.feedback

View File

@@ -1,166 +0,0 @@
import pytest
from unittest.mock import patch, MagicMock
from typing import List, Dict, Any
from crewai.tasks.task_output import TaskOutput
from crewai.experimental.evaluation.metrics.reasoning_metrics import (
ReasoningEfficiencyEvaluator,
)
from tests.experimental.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from crewai.utilities.llm_utils import LLM
from crewai.experimental.evaluation.base_evaluator import EvaluationScore
class TestReasoningEfficiencyEvaluator(BaseEvaluationMetricsTest):
@pytest.fixture
def mock_output(self):
output = MagicMock(spec=TaskOutput)
output.raw = "This is the test output"
return output
@pytest.fixture
def llm_calls(self) -> List[Dict[str, Any]]:
return [
{
"prompt": "How should I approach this task?",
"response": "I'll first research the topic, then compile findings.",
"timestamp": 1626987654
},
{
"prompt": "What resources should I use?",
"response": "I'll use relevant academic papers and reliable websites.",
"timestamp": 1626987754
},
{
"prompt": "How should I structure the output?",
"response": "I'll organize information clearly with headings and bullet points.",
"timestamp": 1626987854
}
]
def test_insufficient_llm_calls(self, mock_agent, mock_task, mock_output):
execution_trace = {"llm_calls": []}
evaluator = ReasoningEfficiencyEvaluator()
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output=mock_output
)
assert isinstance(result, EvaluationScore)
assert result.score is None
assert "Insufficient LLM calls" in result.feedback
@patch("crewai.utilities.llm_utils.create_llm")
def test_successful_evaluation(self, mock_create_llm, mock_agent, mock_task, mock_output, llm_calls):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"scores": {
"focus": 8.0,
"progression": 7.0,
"decision_quality": 7.5,
"conciseness": 8.0,
"loop_avoidance": 9.0
},
"overall_score": 7.9,
"feedback": "The agent demonstrated good reasoning efficiency.",
"optimization_suggestions": "The agent could improve by being more concise."
}
"""
mock_create_llm.return_value = mock_llm
# Setup execution trace with sufficient LLM calls
execution_trace = {"llm_calls": llm_calls}
# Mock the _detect_loops method to return a simple result
evaluator = ReasoningEfficiencyEvaluator(llm=mock_llm)
evaluator._detect_loops = MagicMock(return_value=(False, []))
# Evaluate
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output=mock_output
)
# Assertions
assert isinstance(result, EvaluationScore)
assert result.score == 7.9
assert "The agent demonstrated good reasoning efficiency" in result.feedback
assert "Reasoning Efficiency Evaluation:" in result.feedback
assert "• Focus: 8.0/10" in result.feedback
# Verify LLM was called
mock_llm.call.assert_called_once()
@patch("crewai.utilities.llm_utils.create_llm")
def test_parse_error_handling(self, mock_create_llm, mock_agent, mock_task, mock_output, llm_calls):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = "Invalid JSON response"
mock_create_llm.return_value = mock_llm
# Setup execution trace
execution_trace = {"llm_calls": llm_calls}
# Mock the _detect_loops method
evaluator = ReasoningEfficiencyEvaluator(llm=mock_llm)
evaluator._detect_loops = MagicMock(return_value=(False, []))
# Evaluate
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output=mock_output
)
# Assertions for error handling
assert isinstance(result, EvaluationScore)
assert result.score is None
assert "Failed to parse reasoning efficiency evaluation" in result.feedback
@patch("crewai.utilities.llm_utils.create_llm")
def test_loop_detection(self, mock_create_llm, mock_agent, mock_task, mock_output):
# Setup LLM calls with a repeating pattern
repetitive_llm_calls = [
{"prompt": "How to solve?", "response": "I'll try method A", "timestamp": 1000},
{"prompt": "Let me try method A", "response": "It didn't work", "timestamp": 1100},
{"prompt": "How to solve?", "response": "I'll try method A again", "timestamp": 1200},
{"prompt": "Let me try method A", "response": "It didn't work", "timestamp": 1300},
{"prompt": "How to solve?", "response": "I'll try method A one more time", "timestamp": 1400}
]
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"scores": {
"focus": 6.0,
"progression": 3.0,
"decision_quality": 4.0,
"conciseness": 6.0,
"loop_avoidance": 2.0
},
"overall_score": 4.2,
"feedback": "The agent is stuck in a reasoning loop.",
"optimization_suggestions": "The agent should try different approaches when one fails."
}
"""
mock_create_llm.return_value = mock_llm
execution_trace = {"llm_calls": repetitive_llm_calls}
evaluator = ReasoningEfficiencyEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output=mock_output
)
assert isinstance(result, EvaluationScore)
assert result.score == 4.2
assert "• Loop Avoidance: 2.0/10" in result.feedback

View File

@@ -1,82 +0,0 @@
from unittest.mock import patch, MagicMock
from crewai.experimental.evaluation.base_evaluator import EvaluationScore
from crewai.experimental.evaluation.metrics.semantic_quality_metrics import SemanticQualityEvaluator
from tests.experimental.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
from crewai.utilities.llm_utils import LLM
class TestSemanticQualityEvaluator(BaseEvaluationMetricsTest):
@patch("crewai.utilities.llm_utils.create_llm")
def test_evaluate_success(self, mock_create_llm, mock_agent, mock_task, execution_trace):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"score": 8.5,
"feedback": "The output is clear, coherent, and logically structured."
}
"""
mock_create_llm.return_value = mock_llm
evaluator = SemanticQualityEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="This is a well-structured analysis of the data."
)
assert isinstance(result, EvaluationScore)
assert result.score == 8.5
assert "clear, coherent" in result.feedback
mock_llm.call.assert_called_once()
prompt = mock_llm.call.call_args[0][0]
assert len(prompt) >= 2
assert "system" in prompt[0]["role"]
assert "user" in prompt[1]["role"]
assert mock_agent.role in prompt[1]["content"]
assert mock_task.description in prompt[1]["content"]
@patch("crewai.utilities.llm_utils.create_llm")
def test_evaluate_with_empty_output(self, mock_create_llm, mock_agent, mock_task, execution_trace):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"score": 2.0,
"feedback": "The output is empty or minimal, lacking substance."
}
"""
mock_create_llm.return_value = mock_llm
evaluator = SemanticQualityEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output=""
)
assert isinstance(result, EvaluationScore)
assert result.score == 2.0
assert "empty or minimal" in result.feedback
@patch("crewai.utilities.llm_utils.create_llm")
def test_evaluate_error_handling(self, mock_create_llm, mock_agent, mock_task, execution_trace):
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = "Invalid JSON response"
mock_create_llm.return_value = mock_llm
evaluator = SemanticQualityEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="This is the output."
)
assert isinstance(result, EvaluationScore)
assert result.score is None
assert "Failed to parse" in result.feedback

View File

@@ -1,230 +0,0 @@
from unittest.mock import patch, MagicMock
from crewai.experimental.evaluation.metrics.tools_metrics import (
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator
)
from crewai.utilities.llm_utils import LLM
from tests.experimental.evaluation.metrics.base_evaluation_metrics_test import BaseEvaluationMetricsTest
class TestToolSelectionEvaluator(BaseEvaluationMetricsTest):
def test_no_tools_available(self, mock_task, mock_agent):
# Create agent with no tools
mock_agent.tools = []
execution_trace = {"tool_uses": []}
evaluator = ToolSelectionEvaluator()
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score is None
assert "no tools available" in result.feedback.lower()
def test_tools_available_but_none_used(self, mock_agent, mock_task):
mock_agent.tools = ["tool1", "tool2"]
execution_trace = {"tool_uses": []}
evaluator = ToolSelectionEvaluator()
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score is None
assert "had tools available but didn't use any" in result.feedback.lower()
@patch("crewai.utilities.llm_utils.create_llm")
def test_successful_evaluation(self, mock_create_llm, mock_agent, mock_task):
# Setup mock LLM response
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"overall_score": 8.5,
"feedback": "The agent made good tool selections."
}
"""
mock_create_llm.return_value = mock_llm
# Setup execution trace with tool uses
execution_trace = {
"tool_uses": [
{"tool": "search_tool", "input": {"query": "test query"}, "output": "search results"},
{"tool": "calculator", "input": {"expression": "2+2"}, "output": "4"}
]
}
evaluator = ToolSelectionEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score == 8.5
assert "The agent made good tool selections" in result.feedback
# Verify LLM was called with correct prompt
mock_llm.call.assert_called_once()
prompt = mock_llm.call.call_args[0][0]
assert isinstance(prompt, list)
assert len(prompt) >= 2
assert "system" in prompt[0]["role"]
assert "user" in prompt[1]["role"]
class TestParameterExtractionEvaluator(BaseEvaluationMetricsTest):
def test_no_tool_uses(self, mock_agent, mock_task):
execution_trace = {"tool_uses": []}
evaluator = ParameterExtractionEvaluator()
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score is None
assert "no tool usage" in result.feedback.lower()
@patch("crewai.utilities.llm_utils.create_llm")
def test_successful_evaluation(self, mock_create_llm, mock_agent, mock_task):
mock_agent.tools = ["tool1", "tool2"]
# Setup mock LLM response
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"overall_score": 9.0,
"feedback": "The agent extracted parameters correctly."
}
"""
mock_create_llm.return_value = mock_llm
# Setup execution trace with tool uses
execution_trace = {
"tool_uses": [
{
"tool": "search_tool",
"input": {"query": "test query"},
"output": "search results",
"error": None
},
{
"tool": "calculator",
"input": {"expression": "2+2"},
"output": "4",
"error": None
}
]
}
evaluator = ParameterExtractionEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score == 9.0
assert "The agent extracted parameters correctly" in result.feedback
class TestToolInvocationEvaluator(BaseEvaluationMetricsTest):
def test_no_tool_uses(self, mock_agent, mock_task):
execution_trace = {"tool_uses": []}
evaluator = ToolInvocationEvaluator()
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score is None
assert "no tool usage" in result.feedback.lower()
@patch("crewai.utilities.llm_utils.create_llm")
def test_successful_evaluation(self, mock_create_llm, mock_agent, mock_task):
mock_agent.tools = ["tool1", "tool2"]
# Setup mock LLM response
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"overall_score": 8.0,
"feedback": "The agent invoked tools correctly."
}
"""
mock_create_llm.return_value = mock_llm
# Setup execution trace with tool uses
execution_trace = {
"tool_uses": [
{"tool": "search_tool", "input": {"query": "test query"}, "output": "search results"},
{"tool": "calculator", "input": {"expression": "2+2"}, "output": "4"}
]
}
evaluator = ToolInvocationEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score == 8.0
assert "The agent invoked tools correctly" in result.feedback
@patch("crewai.utilities.llm_utils.create_llm")
def test_evaluation_with_errors(self, mock_create_llm, mock_agent, mock_task):
mock_agent.tools = ["tool1", "tool2"]
# Setup mock LLM response
mock_llm = MagicMock(spec=LLM)
mock_llm.call.return_value = """
{
"overall_score": 5.5,
"feedback": "The agent had some errors in tool invocation."
}
"""
mock_create_llm.return_value = mock_llm
# Setup execution trace with tool uses including errors
execution_trace = {
"tool_uses": [
{
"tool": "search_tool",
"input": {"query": "test query"},
"output": "search results",
"error": None
},
{
"tool": "calculator",
"input": {"expression": "2+"},
"output": None,
"error": "Invalid expression"
}
]
}
evaluator = ToolInvocationEvaluator(llm=mock_llm)
result = evaluator.evaluate(
agent=mock_agent,
task=mock_task,
execution_trace=execution_trace,
final_output="Final output"
)
assert result.score == 5.5
assert "The agent had some errors in tool invocation" in result.feedback

View File

@@ -1,278 +0,0 @@
import pytest

from crewai.agent import Agent
from crewai.task import Task
from crewai.crew import Crew
from crewai.experimental.evaluation.agent_evaluator import AgentEvaluator
from crewai.experimental.evaluation.base_evaluator import AgentEvaluationResult
from crewai.experimental.evaluation import (
    GoalAlignmentEvaluator,
    SemanticQualityEvaluator,
    ToolSelectionEvaluator,
    ParameterExtractionEvaluator,
    ToolInvocationEvaluator,
    ReasoningEfficiencyEvaluator,
    MetricCategory,
    EvaluationScore
)
from crewai.utilities.events.agent_events import AgentEvaluationStartedEvent, AgentEvaluationCompletedEvent, AgentEvaluationFailedEvent
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.experimental.evaluation import create_default_evaluator


class TestAgentEvaluator:
    @pytest.fixture
    def mock_crew(self):
        agent = Agent(
            role="Test Agent",
            goal="Complete test tasks successfully",
            backstory="An agent created for testing purposes",
            allow_delegation=False,
            verbose=False
        )
        task = Task(
            description="Test task description",
            agent=agent,
            expected_output="Expected test output"
        )
        crew = Crew(
            agents=[agent],
            tasks=[task]
        )
        return crew

    def test_set_iteration(self):
        agent_evaluator = AgentEvaluator(agents=[])
        agent_evaluator.set_iteration(3)
        assert agent_evaluator._execution_state.iteration == 3

    @pytest.mark.vcr(filter_headers=["authorization"])
    def test_evaluate_current_iteration(self, mock_crew):
        agent_evaluator = AgentEvaluator(agents=mock_crew.agents, evaluators=[GoalAlignmentEvaluator()])
        mock_crew.kickoff()

        results = agent_evaluator.get_evaluation_results()
        assert isinstance(results, dict)

        agent, = mock_crew.agents
        task, = mock_crew.tasks

        assert len(mock_crew.agents) == 1
        assert agent.role in results
        assert len(results[agent.role]) == 1

        result, = results[agent.role]
        assert isinstance(result, AgentEvaluationResult)
        assert result.agent_id == str(agent.id)
        assert result.task_id == str(task.id)

        goal_alignment, = result.metrics.values()
        assert goal_alignment.score == 5.0
        expected_feedback = "The agent's output demonstrates an understanding of the need for a comprehensive document outlining task"
        assert expected_feedback in goal_alignment.feedback
        assert goal_alignment.raw_response is not None
        assert '"score": 5' in goal_alignment.raw_response

    def test_create_default_evaluator(self, mock_crew):
        agent_evaluator = create_default_evaluator(agents=mock_crew.agents)
        assert isinstance(agent_evaluator, AgentEvaluator)
        assert agent_evaluator.agents == mock_crew.agents

        expected_types = [
            GoalAlignmentEvaluator,
            SemanticQualityEvaluator,
            ToolSelectionEvaluator,
            ParameterExtractionEvaluator,
            ToolInvocationEvaluator,
            ReasoningEfficiencyEvaluator
        ]
        assert len(agent_evaluator.evaluators) == len(expected_types)
        for evaluator, expected_type in zip(agent_evaluator.evaluators, expected_types):
            assert isinstance(evaluator, expected_type)

    @pytest.mark.vcr(filter_headers=["authorization"])
    def test_eval_lite_agent(self):
        agent = Agent(
            role="Test Agent",
            goal="Complete test tasks successfully",
            backstory="An agent created for testing purposes",
        )

        with crewai_event_bus.scoped_handlers():
            events = {}

            @crewai_event_bus.on(AgentEvaluationStartedEvent)
            def capture_started(source, event):
                events["started"] = event

            @crewai_event_bus.on(AgentEvaluationCompletedEvent)
            def capture_completed(source, event):
                events["completed"] = event

            @crewai_event_bus.on(AgentEvaluationFailedEvent)
            def capture_failed(source, event):
                events["failed"] = event

            agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[GoalAlignmentEvaluator()])
            agent.kickoff(messages="Complete this task successfully")

            assert events.keys() == {"started", "completed"}

            assert events["started"].agent_id == str(agent.id)
            assert events["started"].agent_role == agent.role
            assert events["started"].task_id is None
            assert events["started"].iteration == 1

            assert events["completed"].agent_id == str(agent.id)
            assert events["completed"].agent_role == agent.role
            assert events["completed"].task_id is None
            assert events["completed"].iteration == 1
            assert events["completed"].metric_category == MetricCategory.GOAL_ALIGNMENT
            assert isinstance(events["completed"].score, EvaluationScore)
            assert events["completed"].score.score == 2.0

            results = agent_evaluator.get_evaluation_results()
            assert isinstance(results, dict)

            result, = results[agent.role]
            assert isinstance(result, AgentEvaluationResult)
            assert result.agent_id == str(agent.id)
            assert result.task_id == "lite_task"

            goal_alignment, = result.metrics.values()
            assert goal_alignment.score == 2.0
            expected_feedback = "The agent did not demonstrate a clear understanding of the task goal, which is to complete test tasks successfully"
            assert expected_feedback in goal_alignment.feedback
            assert goal_alignment.raw_response is not None
            assert '"score": 2' in goal_alignment.raw_response

    @pytest.mark.vcr(filter_headers=["authorization"])
    def test_eval_specific_agents_from_crew(self, mock_crew):
        agent = Agent(
            role="Test Agent Eval",
            goal="Complete test tasks successfully",
            backstory="An agent created for testing purposes",
        )
        task = Task(
            description="Test task description",
            agent=agent,
            expected_output="Expected test output"
        )
        mock_crew.agents.append(agent)
        mock_crew.tasks.append(task)

        with crewai_event_bus.scoped_handlers():
            events = {}

            @crewai_event_bus.on(AgentEvaluationStartedEvent)
            def capture_started(source, event):
                events["started"] = event

            @crewai_event_bus.on(AgentEvaluationCompletedEvent)
            def capture_completed(source, event):
                events["completed"] = event

            @crewai_event_bus.on(AgentEvaluationFailedEvent)
            def capture_failed(source, event):
                events["failed"] = event

            agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[GoalAlignmentEvaluator()])
            mock_crew.kickoff()

            assert events.keys() == {"started", "completed"}

            assert events["started"].agent_id == str(agent.id)
            assert events["started"].agent_role == agent.role
            assert events["started"].task_id == str(task.id)
            assert events["started"].iteration == 1

            assert events["completed"].agent_id == str(agent.id)
            assert events["completed"].agent_role == agent.role
            assert events["completed"].task_id == str(task.id)
            assert events["completed"].iteration == 1
            assert events["completed"].metric_category == MetricCategory.GOAL_ALIGNMENT
            assert isinstance(events["completed"].score, EvaluationScore)
            assert events["completed"].score.score == 5.0

            results = agent_evaluator.get_evaluation_results()
            assert isinstance(results, dict)
            assert len(results.keys()) == 1

            result, = results[agent.role]
            assert isinstance(result, AgentEvaluationResult)
            assert result.agent_id == str(agent.id)
            assert result.task_id == str(task.id)

            goal_alignment, = result.metrics.values()
            assert goal_alignment.score == 5.0
            expected_feedback = "The agent provided a thorough guide on how to conduct a test task but failed to produce specific expected output"
            assert expected_feedback in goal_alignment.feedback
            assert goal_alignment.raw_response is not None
            assert '"score": 5' in goal_alignment.raw_response

    @pytest.mark.vcr(filter_headers=["authorization"])
    def test_failed_evaluation(self, mock_crew):
        agent, = mock_crew.agents
        task, = mock_crew.tasks

        with crewai_event_bus.scoped_handlers():
            events = {}

            @crewai_event_bus.on(AgentEvaluationStartedEvent)
            def capture_started(source, event):
                events["started"] = event

            @crewai_event_bus.on(AgentEvaluationCompletedEvent)
            def capture_completed(source, event):
                events["completed"] = event

            @crewai_event_bus.on(AgentEvaluationFailedEvent)
            def capture_failed(source, event):
                events["failed"] = event

            # Create a mock evaluator that will raise an exception
            from crewai.experimental.evaluation.base_evaluator import BaseEvaluator
            from crewai.experimental.evaluation import MetricCategory

            class FailingEvaluator(BaseEvaluator):
                metric_category = MetricCategory.GOAL_ALIGNMENT

                def evaluate(self, agent, task, execution_trace, final_output):
                    raise ValueError("Forced evaluation failure")

            agent_evaluator = AgentEvaluator(agents=[agent], evaluators=[FailingEvaluator()])
            mock_crew.kickoff()

            assert events.keys() == {"started", "failed"}

            assert events["started"].agent_id == str(agent.id)
            assert events["started"].agent_role == agent.role
            assert events["started"].task_id == str(task.id)
            assert events["started"].iteration == 1

            assert events["failed"].agent_id == str(agent.id)
            assert events["failed"].agent_role == agent.role
            assert events["failed"].task_id == str(task.id)
            assert events["failed"].iteration == 1
            assert events["failed"].error == "Forced evaluation failure"

            results = agent_evaluator.get_evaluation_results()
            result, = results[agent.role]
            assert isinstance(result, AgentEvaluationResult)
            assert result.agent_id == str(agent.id)
            assert result.task_id == str(task.id)
            assert result.metrics == {}

View File

@@ -1,111 +0,0 @@
import pytest
from unittest.mock import MagicMock, patch

from crewai.experimental.evaluation.experiment.result import ExperimentResult, ExperimentResults


class TestExperimentResult:
    @pytest.fixture
    def mock_results(self):
        return [
            ExperimentResult(
                identifier="test-1",
                inputs={"query": "What is the capital of France?"},
                score=10,
                expected_score=7,
                passed=True
            ),
            ExperimentResult(
                identifier="test-2",
                inputs={"query": "Who wrote Hamlet?"},
                score={"relevance": 9, "factuality": 8},
                expected_score={"relevance": 7, "factuality": 7},
                passed=True,
                agent_evaluations={"agent1": {"metrics": {"goal_alignment": {"score": 9}}}}
            ),
            ExperimentResult(
                identifier="test-3",
                inputs={"query": "Any query"},
                score={"relevance": 9, "factuality": 8},
                expected_score={"relevance": 7, "factuality": 7},
                passed=False,
                agent_evaluations={"agent1": {"metrics": {"goal_alignment": {"score": 9}}}}
            ),
            ExperimentResult(
                identifier="test-4",
                inputs={"query": "Another query"},
                score={"relevance": 9, "factuality": 8},
                expected_score={"relevance": 7, "factuality": 7},
                passed=True,
                agent_evaluations={"agent1": {"metrics": {"goal_alignment": {"score": 9}}}}
            ),
            ExperimentResult(
                identifier="test-6",
                inputs={"query": "Yet another query"},
                score={"relevance": 9, "factuality": 8},
                expected_score={"relevance": 7, "factuality": 7},
                passed=True,
                agent_evaluations={"agent1": {"metrics": {"goal_alignment": {"score": 9}}}}
            )
        ]

    @patch('os.path.exists', return_value=True)
    @patch('os.path.getsize', return_value=1)
    @patch('json.load')
    @patch('builtins.open', new_callable=MagicMock)
    def test_experiment_results_compare_with_baseline(self, mock_open, mock_json_load, mock_path_getsize, mock_path_exists, mock_results):
        baseline_data = {
            "timestamp": "2023-01-01T00:00:00+00:00",
            "results": [
                {
                    "identifier": "test-1",
                    "inputs": {"query": "What is the capital of France?"},
                    "score": 7,
                    "expected_score": 7,
                    "passed": False
                },
                {
                    "identifier": "test-2",
                    "inputs": {"query": "Who wrote Hamlet?"},
                    "score": {"relevance": 8, "factuality": 7},
                    "expected_score": {"relevance": 7, "factuality": 7},
                    "passed": True
                },
                {
                    "identifier": "test-3",
                    "inputs": {"query": "Any query"},
                    "score": {"relevance": 8, "factuality": 7},
                    "expected_score": {"relevance": 7, "factuality": 7},
                    "passed": True
                },
                {
                    "identifier": "test-4",
                    "inputs": {"query": "Another query"},
                    "score": {"relevance": 8, "factuality": 7},
                    "expected_score": {"relevance": 7, "factuality": 7},
                    "passed": True
                },
                {
                    "identifier": "test-5",
                    "inputs": {"query": "Another query"},
                    "score": {"relevance": 8, "factuality": 7},
                    "expected_score": {"relevance": 7, "factuality": 7},
                    "passed": True
                }
            ]
        }
        mock_json_load.return_value = baseline_data

        results = ExperimentResults(results=mock_results)
        results.display = MagicMock()

        comparison = results.compare_with_baseline(baseline_filepath="baseline.json")

        assert "baseline_timestamp" in comparison
        assert comparison["baseline_timestamp"] == "2023-01-01T00:00:00+00:00"
        assert comparison["improved"] == ["test-1"]
        assert comparison["regressed"] == ["test-3"]
        assert comparison["unchanged"] == ["test-2", "test-4"]
        assert comparison["new_tests"] == ["test-6"]
        assert comparison["missing_tests"] == ["test-5"]

View File

@@ -1,197 +0,0 @@
import pytest
from unittest.mock import MagicMock, patch

from crewai.crew import Crew
from crewai.experimental.evaluation.experiment.runner import ExperimentRunner
from crewai.experimental.evaluation.experiment.result import ExperimentResults
from crewai.experimental.evaluation.evaluation_display import AgentAggregatedEvaluationResult
from crewai.experimental.evaluation.base_evaluator import MetricCategory, EvaluationScore


class TestExperimentRunner:
    @pytest.fixture
    def mock_crew(self):
        return MagicMock(llm=Crew)

    @pytest.fixture
    def mock_evaluator_results(self):
        agent_evaluation = AgentAggregatedEvaluationResult(
            agent_id="Test Agent",
            agent_role="Test Agent Role",
            metrics={
                MetricCategory.GOAL_ALIGNMENT: EvaluationScore(
                    score=9,
                    feedback="Test feedback for goal alignment",
                    raw_response="Test raw response for goal alignment"
                ),
                MetricCategory.REASONING_EFFICIENCY: EvaluationScore(
                    score=None,
                    feedback="Reasoning efficiency not applicable",
                    raw_response="Reasoning efficiency not applicable"
                ),
                MetricCategory.PARAMETER_EXTRACTION: EvaluationScore(
                    score=7,
                    feedback="Test parameter extraction explanation",
                    raw_response="Test raw output"
                ),
                MetricCategory.TOOL_SELECTION: EvaluationScore(
                    score=8,
                    feedback="Test tool selection explanation",
                    raw_response="Test raw output"
                )
            }
        )
        return {"Test Agent": agent_evaluation}

    @patch('crewai.experimental.evaluation.experiment.runner.create_default_evaluator')
    def test_run_success(self, mock_create_evaluator, mock_crew, mock_evaluator_results):
        dataset = [
            {
                "identifier": "test-case-1",
                "inputs": {"query": "Test query 1"},
                "expected_score": 8
            },
            {
                "identifier": "test-case-2",
                "inputs": {"query": "Test query 2"},
                "expected_score": {"goal_alignment": 7}
            },
            {
                "inputs": {"query": "Test query 3"},
                "expected_score": {"tool_selection": 9}
            }
        ]

        mock_evaluator = MagicMock()
        mock_evaluator.get_agent_evaluation.return_value = mock_evaluator_results
        mock_evaluator.reset_iterations_results = MagicMock()
        mock_create_evaluator.return_value = mock_evaluator

        runner = ExperimentRunner(dataset=dataset)
        results = runner.run(crew=mock_crew)

        assert isinstance(results, ExperimentResults)
        result_1, result_2, result_3 = results.results
        assert len(results.results) == 3

        assert result_1.identifier == "test-case-1"
        assert result_1.inputs == {"query": "Test query 1"}
        assert result_1.expected_score == 8
        assert result_1.passed is True

        assert result_2.identifier == "test-case-2"
        assert result_2.inputs == {"query": "Test query 2"}
        assert isinstance(result_2.expected_score, dict)
        assert "goal_alignment" in result_2.expected_score
        assert result_2.passed is True

        assert result_3.identifier == "c2ed49e63aa9a83af3ca382794134fd5"
        assert result_3.inputs == {"query": "Test query 3"}
        assert isinstance(result_3.expected_score, dict)
        assert "tool_selection" in result_3.expected_score
        assert result_3.passed is False

        assert mock_crew.kickoff.call_count == 3
        mock_crew.kickoff.assert_any_call(inputs={"query": "Test query 1"})
        mock_crew.kickoff.assert_any_call(inputs={"query": "Test query 2"})
        mock_crew.kickoff.assert_any_call(inputs={"query": "Test query 3"})
        assert mock_evaluator.reset_iterations_results.call_count == 3
        assert mock_evaluator.get_agent_evaluation.call_count == 3

    @patch('crewai.experimental.evaluation.experiment.runner.create_default_evaluator')
    def test_run_success_with_unknown_metric(self, mock_create_evaluator, mock_crew, mock_evaluator_results):
        dataset = [
            {
                "identifier": "test-case-2",
                "inputs": {"query": "Test query 2"},
                "expected_score": {"goal_alignment": 7, "unknown_metric": 8}
            }
        ]

        mock_evaluator = MagicMock()
        mock_evaluator.get_agent_evaluation.return_value = mock_evaluator_results
        mock_evaluator.reset_iterations_results = MagicMock()
        mock_create_evaluator.return_value = mock_evaluator

        runner = ExperimentRunner(dataset=dataset)
        results = runner.run(crew=mock_crew)

        result, = results.results
        assert result.identifier == "test-case-2"
        assert result.inputs == {"query": "Test query 2"}
        assert isinstance(result.expected_score, dict)
        assert "goal_alignment" in result.expected_score.keys()
        assert "unknown_metric" in result.expected_score.keys()
        assert result.passed is True

    @patch('crewai.experimental.evaluation.experiment.runner.create_default_evaluator')
    def test_run_success_with_single_metric_evaluator_and_expected_specific_metric(self, mock_create_evaluator, mock_crew, mock_evaluator_results):
        dataset = [
            {
                "identifier": "test-case-2",
                "inputs": {"query": "Test query 2"},
                "expected_score": {"goal_alignment": 7}
            }
        ]

        mock_evaluator = MagicMock()
        # Override the fixture results so only the goal alignment metric is reported
        mock_evaluator_results["Test Agent"].metrics = {
            MetricCategory.GOAL_ALIGNMENT: EvaluationScore(
                score=9,
                feedback="Test feedback for goal alignment",
                raw_response="Test raw response for goal alignment"
            )
        }
        mock_evaluator.get_agent_evaluation.return_value = mock_evaluator_results
        mock_evaluator.reset_iterations_results = MagicMock()
        mock_create_evaluator.return_value = mock_evaluator

        runner = ExperimentRunner(dataset=dataset)
        results = runner.run(crew=mock_crew)

        result, = results.results
        assert result.identifier == "test-case-2"
        assert result.inputs == {"query": "Test query 2"}
        assert isinstance(result.expected_score, dict)
        assert "goal_alignment" in result.expected_score.keys()
        assert result.passed is True

    @patch('crewai.experimental.evaluation.experiment.runner.create_default_evaluator')
    def test_run_success_when_expected_metric_is_not_available(self, mock_create_evaluator, mock_crew, mock_evaluator_results):
        dataset = [
            {
                "identifier": "test-case-2",
                "inputs": {"query": "Test query 2"},
                "expected_score": {"unknown_metric": 7}
            }
        ]

        mock_evaluator = MagicMock()
        # Override the fixture results so only the goal alignment metric is reported
        mock_evaluator_results["Test Agent"].metrics = {
            MetricCategory.GOAL_ALIGNMENT: EvaluationScore(
                score=5,
                feedback="Test feedback for goal alignment",
                raw_response="Test raw response for goal alignment"
            )
        }
        mock_evaluator.get_agent_evaluation.return_value = mock_evaluator_results
        mock_evaluator.reset_iterations_results = MagicMock()
        mock_create_evaluator.return_value = mock_evaluator

        runner = ExperimentRunner(dataset=dataset)
        results = runner.run(crew=mock_crew)

        result, = results.results
        assert result.identifier == "test-case-2"
        assert result.inputs == {"query": "Test query 2"}
        assert isinstance(result.expected_score, dict)
        assert "unknown_metric" in result.expected_score.keys()
        assert result.passed is False

View File

@@ -755,15 +755,3 @@ def test_multiple_routers_from_same_trigger():
    assert execution_order.index("anemia_analysis") > execution_order.index(
        "anemia_router"
    )


def test_flow_name():
    class MyFlow(Flow):
        name = "MyFlow"

        @start()
        def start(self):
            return "Hello, world!"

    flow = MyFlow()
    assert flow.name == "MyFlow"

Some files were not shown because too many files have changed in this diff.