Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
8476fb2c64 Address PR review feedback: Add error handling, type validation, and edge case tests
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-11 01:49:08 +00:00
Devin AI
8f3162b8e8 Add support for multiple model configurations with litellm Router (#2808)
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-11 01:39:47 +00:00
11 changed files with 569 additions and 256 deletions

158
README.md
View File

@@ -4,7 +4,7 @@
# **CrewAI**
🤖 **CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
🤖 **CrewAI**: Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks.
<h3>
@@ -22,17 +22,13 @@
- [Why CrewAI?](#why-crewai)
- [Getting Started](#getting-started)
- [Key Features](#key-features)
- [Understanding Flows and Crews](#understanding-flows-and-crews)
- [CrewAI vs LangGraph](#how-crewai-compares)
- [Examples](#examples)
- [Quick Tutorial](#quick-tutorial)
- [Write Job Descriptions](#write-job-descriptions)
- [Trip Planner](#trip-planner)
- [Stock Analysis](#stock-analysis)
- [Using Crews and Flows Together](#using-crews-and-flows-together)
- [Connecting Your Crew to a Model](#connecting-your-crew-to-a-model)
- [How CrewAI Compares](#how-crewai-compares)
- [Frequently Asked Questions (FAQ)](#frequently-asked-questions-faq)
- [Contribution](#contribution)
- [Telemetry](#telemetry)
- [License](#license)
@@ -40,40 +36,10 @@
## Why CrewAI?
The power of AI collaboration has too much to offer.
CrewAI is a standalone framework, built from the ground up without dependencies on Langchain or other agent frameworks. It's designed to enable AI agents to assume roles, share goals, and operate in a cohesive unit - much like a well-oiled crew. Whether you're building a smart assistant platform, an automated customer service ensemble, or a multi-agent research team, CrewAI provides the backbone for sophisticated multi-agent interactions.
CrewAI is designed to enable AI agents to assume roles, share goals, and operate in a cohesive unit - much like a well-oiled crew. Whether you're building a smart assistant platform, an automated customer service ensemble, or a multi-agent research team, CrewAI provides the backbone for sophisticated multi-agent interactions.
## Getting Started
### Learning Resources
Learn CrewAI through our comprehensive courses:
- [Multi AI Agent Systems with CrewAI](https://www.deeplearning.ai/short-courses/multi-ai-agent-systems-with-crewai/) - Master the fundamentals of multi-agent systems
- [Practical Multi AI Agents and Advanced Use Cases](https://www.deeplearning.ai/short-courses/practical-multi-ai-agents-and-advanced-use-cases-with-crewai/) - Deep dive into advanced implementations
### Understanding Flows and Crews
CrewAI offers two powerful, complementary approaches that work seamlessly together to build sophisticated AI applications:
1. **Crews**: Teams of AI agents with true autonomy and agency, working together to accomplish complex tasks through role-based collaboration. Crews enable:
- Natural, autonomous decision-making between agents
- Dynamic task delegation and collaboration
- Specialized roles with defined goals and expertise
- Flexible problem-solving approaches
2. **Flows**: Production-ready, event-driven workflows that deliver precise control over complex automations. Flows provide:
- Fine-grained control over execution paths for real-world scenarios
- Secure, consistent state management between tasks
- Clean integration of AI agents with production Python code
- Conditional branching for complex business logic
The true power of CrewAI emerges when combining Crews and Flows. This synergy allows you to:
- Build complex, production-grade applications
- Balance autonomy with precise control
- Handle sophisticated real-world scenarios
- Maintain clean, maintainable code structure
### Getting Started with Installation
To get started with CrewAI, follow these simple steps:
### 1. Installation
@@ -298,16 +264,13 @@ In addition to the sequential process, you can use the hierarchical process, whi
## Key Features
**Note**: CrewAI is a standalone framework built from the ground up, without dependencies on Langchain or other agent frameworks.
- **Deep Customization**: Build sophisticated agents with full control over the system - from overriding inner prompts to accessing low-level APIs. Customize roles, goals, tools, and behaviors while maintaining clean abstractions.
- **Autonomous Inter-Agent Delegation**: Agents can autonomously delegate tasks and inquire amongst themselves, enabling complex problem-solving in real-world scenarios.
- **Flexible Task Management**: Define and customize tasks with granular control, from simple operations to complex multi-step processes.
- **Production-Grade Architecture**: Support for both high-level abstractions and low-level customization, with robust error handling and state management.
- **Predictable Results**: Ensure consistent, accurate outputs through programmatic guardrails, agent training capabilities, and flow-based execution control. See our [documentation on guardrails](https://docs.crewai.com/how-to/guardrails/) for implementation details.
- **Model Flexibility**: Run your crew using OpenAI or open source models with production-ready integrations. See [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-Connections/) for detailed configuration options.
- **Event-Driven Flows**: Build complex, real-world workflows with precise control over execution paths, state management, and conditional logic.
- **Process Orchestration**: Achieve any workflow pattern through flows - from simple sequential and hierarchical processes to complex, custom orchestration patterns with conditional branching and parallel execution.
- **Role-Based Agent Design**: Customize agents with specific roles, goals, and tools.
- **Autonomous Inter-Agent Delegation**: Agents can autonomously delegate tasks and inquire amongst themselves, enhancing problem-solving efficiency.
- **Flexible Task Management**: Define tasks with customizable tools and assign them to agents dynamically.
- **Processes Driven**: Currently only supports `sequential` task execution and `hierarchical` processes, but more complex processes like consensual and autonomous are being worked on.
- **Save output as file**: Save the output of individual tasks as a file, so you can use it later.
- **Parse output as Pydantic or Json**: Parse the output of individual tasks as a Pydantic model or as a Json if you want to.
- **Works with Open Source Models**: Run your crew using Open AI or open source models refer to the [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-Connections/) page for details on configuring your agents' connections to models, even ones running locally!
![CrewAI Mind Map](./docs/crewAI-mindmap.png "CrewAI Mind Map")
@@ -342,98 +305,6 @@ You can test different real life examples of AI crews in the [CrewAI-examples re
[![Stock Analysis](https://img.youtube.com/vi/e0Uj4yWdaAg/maxresdefault.jpg)](https://www.youtube.com/watch?v=e0Uj4yWdaAg "Stock Analysis")
### Using Crews and Flows Together
CrewAI's power truly shines when combining Crews with Flows to create sophisticated automation pipelines. Here's how you can orchestrate multiple Crews within a Flow:
```python
from crewai.flow.flow import Flow, listen, start, router
from crewai import Crew, Agent, Task
from pydantic import BaseModel
# Define structured state for precise control
class MarketState(BaseModel):
sentiment: str = "neutral"
confidence: float = 0.0
recommendations: list = []
class AdvancedAnalysisFlow(Flow[MarketState]):
@start()
def fetch_market_data(self):
# Demonstrate low-level control with structured state
self.state.sentiment = "analyzing"
return {"sector": "tech", "timeframe": "1W"} # These parameters match the task description template
@listen(fetch_market_data)
def analyze_with_crew(self, market_data):
# Show crew agency through specialized roles
analyst = Agent(
role="Senior Market Analyst",
goal="Conduct deep market analysis with expert insight",
backstory="You're a veteran analyst known for identifying subtle market patterns"
)
researcher = Agent(
role="Data Researcher",
goal="Gather and validate supporting market data",
backstory="You excel at finding and correlating multiple data sources"
)
analysis_task = Task(
description="Analyze {sector} sector data for the past {timeframe}",
expected_output="Detailed market analysis with confidence score",
agent=analyst
)
research_task = Task(
description="Find supporting data to validate the analysis",
expected_output="Corroborating evidence and potential contradictions",
agent=researcher
)
# Demonstrate crew autonomy
analysis_crew = Crew(
agents=[analyst, researcher],
tasks=[analysis_task, research_task],
process=Process.sequential,
verbose=True
)
return analysis_crew.kickoff(inputs=market_data) # Pass market_data as named inputs
@router(analyze_with_crew)
def determine_next_steps(self):
# Show flow control with conditional routing
if self.state.confidence > 0.8:
return "high_confidence"
elif self.state.confidence > 0.5:
return "medium_confidence"
return "low_confidence"
@listen("high_confidence")
def execute_strategy(self):
# Demonstrate complex decision making
strategy_crew = Crew(
agents=[
Agent(role="Strategy Expert",
goal="Develop optimal market strategy")
],
tasks=[
Task(description="Create detailed strategy based on analysis",
expected_output="Step-by-step action plan")
]
)
return strategy_crew.kickoff()
@listen("medium_confidence", "low_confidence")
def request_additional_analysis(self):
self.state.recommendations.append("Gather more data")
return "Additional analysis required"
```
This example demonstrates how to:
1. Use Python code for basic data operations
2. Create and execute Crews as steps in your workflow
3. Use Flow decorators to manage the sequence of operations
4. Implement conditional branching based on Crew results
## Connecting Your Crew to a Model
CrewAI supports using various LLMs through a variety of connection options. By default your agents will use the OpenAI API when querying the model. However, there are several other ways to allow your agents to connect to models. For example, you can configure your agents to use a local model via the Ollama tool.
@@ -442,13 +313,9 @@ Please refer to the [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-
## How CrewAI Compares
**CrewAI's Advantage**: CrewAI combines autonomous agent intelligence with precise workflow control through its unique Crews and Flows architecture. The framework excels at both high-level orchestration and low-level customization, enabling complex, production-grade systems with granular control.
**CrewAI's Advantage**: CrewAI is built with production in mind. It offers the flexibility of Autogen's conversational agents and the structured process approach of ChatDev, but without the rigidity. CrewAI's processes are designed to be dynamic and adaptable, fitting seamlessly into both development and production workflows.
- **LangGraph**: While LangGraph provides a foundation for building agent workflows, its approach requires significant boilerplate code and complex state management patterns. The framework's tight coupling with LangChain can limit flexibility when implementing custom agent behaviors or integrating with external systems.
*P.S. CrewAI demonstrates significant performance advantages over LangGraph, executing 5.76x faster in certain cases like this QA task example ([see comparison](https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks/CrewAI%20Flows%20%26%20Langgraph/QA%20Agent)) while achieving higher evaluation scores with faster completion times in certain coding tasks, like in this example ([detailed analysis](https://github.com/crewAIInc/crewAI-examples/blob/main/Notebooks/CrewAI%20Flows%20%26%20Langgraph/Coding%20Assistant/coding_assistant_eval.ipynb)).*
- **Autogen**: While Autogen excels at creating conversational agents capable of working together, it lacks an inherent concept of process. In Autogen, orchestrating agents' interactions requires additional programming, which can become complex and cumbersome as the scale of tasks grows.
- **Autogen**: While Autogen does good in creating conversational agents capable of working together, it lacks an inherent concept of process. In Autogen, orchestrating agents' interactions requires additional programming, which can become complex and cumbersome as the scale of tasks grows.
- **ChatDev**: ChatDev introduced the idea of processes into the realm of AI agents, but its implementation is quite rigid. Customizations in ChatDev are limited and not geared towards production environments, which can hinder scalability and flexibility in real-world applications.
@@ -573,8 +440,5 @@ A: CrewAI uses anonymous telemetry to collect usage data for improvement purpose
### Q: Where can I find examples of CrewAI in action?
A: You can find various real-life examples in the [CrewAI-examples repository](https://github.com/crewAIInc/crewAI-examples), including trip planners, stock analysis tools, and more.
### Q: What is the difference between Crews and Flows?
A: Crews and Flows serve different but complementary purposes in CrewAI. Crews are teams of AI agents working together to accomplish specific tasks through role-based collaboration, delivering accurate and predictable results. Flows, on the other hand, are event-driven workflows that can orchestrate both Crews and regular Python code, allowing you to build complex automation pipelines with secure state management and conditional execution paths.
### Q: How can I contribute to CrewAI?
A: Contributions are welcome! You can fork the repository, create a new branch for your feature, add your improvement, and send a pull request. Check the Contribution section in the README for more details.

View File

@@ -171,58 +171,6 @@ crewai reset-memories --knowledge
This is useful when you've updated your knowledge sources and want to ensure that the agents are using the most recent information.
## Agent-Specific Knowledge
While knowledge can be provided at the crew level using `crew.knowledge_sources`, individual agents can also have their own knowledge sources using the `knowledge_sources` parameter:
```python Code
from crewai import Agent, Task, Crew
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
# Create agent-specific knowledge about a product
product_specs = StringKnowledgeSource(
content="""The XPS 13 laptop features:
- 13.4-inch 4K display
- Intel Core i7 processor
- 16GB RAM
- 512GB SSD storage
- 12-hour battery life""",
metadata={"category": "product_specs"}
)
# Create a support agent with product knowledge
support_agent = Agent(
role="Technical Support Specialist",
goal="Provide accurate product information and support.",
backstory="You are an expert on our laptop products and specifications.",
knowledge_sources=[product_specs] # Agent-specific knowledge
)
# Create a task that requires product knowledge
support_task = Task(
description="Answer this customer question: {question}",
agent=support_agent
)
# Create and run the crew
crew = Crew(
agents=[support_agent],
tasks=[support_task]
)
# Get answer about the laptop's specifications
result = crew.kickoff(
inputs={"question": "What is the storage capacity of the XPS 13?"}
)
```
<Info>
Benefits of agent-specific knowledge:
- Give agents specialized information for their roles
- Maintain separation of concerns between agents
- Combine with crew-level knowledge for layered information access
</Info>
## Custom Knowledge Sources
CrewAI allows you to create custom knowledge sources for any type of data by extending the `BaseKnowledgeSource` class. Let's create a practical example that fetches and processes space news articles.

View File

@@ -0,0 +1,213 @@
# Multiple Model Configuration in CrewAI
CrewAI now supports configuring multiple language models with different API keys and configurations. This feature allows you to:
1. Load-balance across multiple model deployments
2. Set up fallback models in case of rate limits or errors
3. Configure different routing strategies for model selection
4. Maintain fine-grained control over model selection and usage
## Basic Usage
You can configure multiple models at the agent level:
```python
from crewai import Agent
# Define model configurations
model_list = [
{
"model_name": "gpt-4o-mini",
"litellm_params": {
"model": "gpt-4o-mini", # Required: model name must be specified here
"api_key": "your-openai-api-key-1"
}
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo", # Required: model name must be specified here
"api_key": "your-openai-api-key-2"
}
},
{
"model_name": "claude-3-sonnet-20240229",
"litellm_params": {
"model": "claude-3-sonnet-20240229", # Required: model name must be specified here
"api_key": "your-anthropic-api-key"
}
}
]
# Create an agent with multiple model configurations
agent = Agent(
role="Data Analyst",
goal="Analyze the data and provide insights",
backstory="You are an expert data analyst with years of experience.",
model_list=model_list,
routing_strategy="simple-shuffle" # Optional routing strategy
)
```
## Routing Strategies
CrewAI supports the following routing strategies for precise control over model selection:
- `simple-shuffle`: Randomly selects a model from the list
- `least-busy`: Routes to the model with the least number of ongoing requests
- `usage-based`: Routes based on token usage across models
- `latency-based`: Routes to the model with the lowest latency
- `cost-based`: Routes to the model with the lowest cost
Example with latency-based routing:
```python
agent = Agent(
role="Data Analyst",
goal="Analyze the data and provide insights",
backstory="You are an expert data analyst with years of experience.",
model_list=model_list,
routing_strategy="latency-based"
)
```
## Direct LLM Configuration
You can also configure multiple models directly with the LLM class for more flexibility:
```python
from crewai import LLM
llm = LLM(
model="gpt-4o-mini",
model_list=model_list,
routing_strategy="simple-shuffle"
)
```
## Advanced Configuration
For more advanced configurations, you can specify additional parameters for each model to handle complex use cases:
```python
model_list = [
{
"model_name": "gpt-4o-mini",
"litellm_params": {
"model": "gpt-4o-mini", # Required: model name must be specified here
"api_key": "your-openai-api-key-1",
"temperature": 0.7
},
"tpm": 100000, # Tokens per minute limit
"rpm": 1000 # Requests per minute limit
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo", # Required: model name must be specified here
"api_key": "your-openai-api-key-2",
"temperature": 0.5
}
}
]
```
## Error Handling and Troubleshooting
When working with multiple model configurations, you may encounter various issues. Here are some common problems and their solutions:
### Missing Required Parameters
**Problem**: Router initialization fails with an error about missing parameters.
**Solution**: Ensure each model configuration in `model_list` includes both `model_name` and `litellm_params` with the required `model` parameter:
```python
# Correct configuration
model_config = {
"model_name": "gpt-4o-mini", # Required
"litellm_params": {
"model": "gpt-4o-mini", # Required
"api_key": "your-api-key"
}
}
```
### Invalid Routing Strategy
**Problem**: Error when specifying an unsupported routing strategy.
**Solution**: Use only the supported routing strategies:
```python
# Valid routing strategies
valid_strategies = [
"simple-shuffle",
"least-busy",
"usage-based",
"latency-based",
"cost-based"
]
```
### API Key Authentication Errors
**Problem**: Authentication errors when making API calls.
**Solution**: Verify that all API keys are valid and have the necessary permissions:
```python
# Check environment variables first
import os
os.environ.get("OPENAI_API_KEY") # Should be set if using OpenAI models
# Or explicitly provide in the configuration
model_list = [{
"model_name": "gpt-4o-mini",
"litellm_params": {
"model": "gpt-4o-mini",
"api_key": "valid-api-key-here" # Ensure this is correct
}
}]
```
### Rate Limit Handling
**Problem**: Encountering rate limits with multiple models.
**Solution**: Configure rate limits and implement fallback mechanisms:
```python
model_list = [
{
"model_name": "primary-model",
"litellm_params": {"model": "primary-model", "api_key": "key1"},
"rpm": 100 # Requests per minute
},
{
"model_name": "fallback-model",
"litellm_params": {"model": "fallback-model", "api_key": "key2"}
}
]
# Configure with fallback
llm = LLM(
model="primary-model",
model_list=model_list,
routing_strategy="least-busy" # Will route to fallback when primary is busy
)
```
### Debugging Router Issues
If you're experiencing issues with the router, you can enable verbose logging to get more information:
```python
import litellm
litellm.set_verbose = True
# Then initialize your LLM
llm = LLM(model="gpt-4o-mini", model_list=model_list)
```
This feature leverages litellm's Router functionality under the hood, providing robust load balancing and fallback capabilities for your CrewAI agents. The implementation ensures predictability and consistency in model selection while maintaining security through proper API key management.

View File

@@ -1,9 +1,10 @@
import os
import shutil
import subprocess
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
from pydantic import Field, InstanceOf, PrivateAttr, model_validator, field_validator
from crewai.agents import CacheHandler
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -86,7 +87,20 @@ class Agent(BaseAgent):
description="Language model that will run the agent.", default=None
)
function_calling_llm: Optional[Any] = Field(
description="Language model that will run the agent.", default=None
description="Language model that will handle function calling for the agent.", default=None
)
class RoutingStrategy(str, Enum):
SIMPLE_SHUFFLE = "simple-shuffle"
LEAST_BUSY = "least-busy"
USAGE_BASED = "usage-based"
LATENCY_BASED = "latency-based"
COST_BASED = "cost-based"
model_list: Optional[List[Dict[str, Any]]] = Field(
default=None, description="List of model configurations for routing between multiple models."
)
routing_strategy: Optional[RoutingStrategy] = Field(
default=None, description="Strategy for routing between multiple models (e.g., 'simple-shuffle', 'least-busy', 'usage-based', 'latency-based', 'cost-based')."
)
system_template: Optional[str] = Field(
default=None, description="System format for the agent."
@@ -148,10 +162,17 @@ class Agent(BaseAgent):
# Handle different cases for self.llm
if isinstance(self.llm, str):
# If it's a string, create an LLM instance
self.llm = LLM(model=self.llm)
self.llm = LLM(
model=self.llm,
model_list=self.model_list,
routing_strategy=self.routing_strategy
)
elif isinstance(self.llm, LLM):
# If it's already an LLM instance, keep it as is
pass
if self.model_list and not getattr(self.llm, "model_list", None):
self.llm.model_list = self.model_list
self.llm.routing_strategy = self.routing_strategy
self.llm._initialize_router()
elif self.llm is None:
# Determine the model name from environment variables or use default
model_name = (
@@ -159,7 +180,11 @@ class Agent(BaseAgent):
or os.environ.get("MODEL")
or "gpt-4o-mini"
)
llm_params = {"model": model_name}
llm_params = {
"model": model_name,
"model_list": self.model_list,
"routing_strategy": self.routing_strategy
}
api_base = os.environ.get("OPENAI_API_BASE") or os.environ.get(
"OPENAI_BASE_URL"
@@ -207,6 +232,8 @@ class Agent(BaseAgent):
"api_key": getattr(self.llm, "api_key", None),
"base_url": getattr(self.llm, "base_url", None),
"organization": getattr(self.llm, "organization", None),
"model_list": self.model_list,
"routing_strategy": self.routing_strategy,
}
# Remove None values to avoid passing unnecessary parameters
llm_params = {k: v for k, v in llm_params.items() if v is not None}

View File

@@ -26,7 +26,7 @@ class CrewAgentExecutorMixin:
def _should_force_answer(self) -> bool:
"""Determine if a forced answer is required based on iteration count."""
return self.iterations >= self.max_iter
return (self.iterations >= self.max_iter) and not self.have_forced_answer
def _create_short_term_memory(self, output) -> None:
"""Create and save a short-term memory item if conditions are met."""

View File

@@ -14,13 +14,13 @@ class Knowledge(BaseModel):
Knowledge is a collection of sources and setup for the vector store to save and query relevant context.
Args:
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
storage: Optional[KnowledgeStorage] = Field(default=None)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
embedder_config: Optional[Dict[str, Any]] = None
"""
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: Optional[KnowledgeStorage] = Field(default=None)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
embedder_config: Optional[Dict[str, Any]] = None
collection_name: Optional[str] = None
@@ -49,13 +49,8 @@ class Knowledge(BaseModel):
"""
Query across all knowledge sources to find the most relevant information.
Returns the top_k most relevant chunks.
Raises:
ValueError: If storage is not initialized.
"""
if self.storage is None:
raise ValueError("Storage is not initialized.")
results = self.storage.search(
query,
limit,

View File

@@ -22,14 +22,13 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
default_factory=list, description="The path to the file"
)
content: Dict[Path, str] = Field(init=False, default_factory=dict)
storage: Optional[KnowledgeStorage] = Field(default=None)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
safe_file_paths: List[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
def validate_file_path(cls, v, info):
def validate_file_path(cls, v, values):
"""Validate that at least one of file_path or file_paths is provided."""
# Single check if both are None, O(1) instead of nested conditions
if v is None and info.data.get("file_path" if info.field_name == "file_paths" else "file_paths") is None:
if v is None and ("file_path" not in values or values.get("file_path") is None):
raise ValueError("Either file_path or file_paths must be provided")
return v
@@ -63,10 +62,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
def _save_documents(self):
"""Save the documents to the storage."""
if self.storage:
self.storage.save(self.chunks)
else:
raise ValueError("No storage found to save documents.")
self.storage.save(self.chunks)
def convert_to_path(self, path: Union[Path, str]) -> Path:
"""Convert a path to a Path object."""

View File

@@ -16,7 +16,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
chunk_embeddings: List[np.ndarray] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: Optional[KnowledgeStorage] = Field(default=None)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
metadata: Dict[str, Any] = Field(default_factory=dict) # Currently unused
collection_name: Optional[str] = Field(default=None)
@@ -46,7 +46,4 @@ class BaseKnowledgeSource(BaseModel, ABC):
Save the documents to the storage.
This method should be called after the chunks and embeddings are generated.
"""
if self.storage:
self.storage.save(self.chunks)
else:
raise ValueError("No storage found to save documents.")
self.storage.save(self.chunks)

View File

@@ -7,12 +7,17 @@ from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Union
import litellm
from litellm import Router as LiteLLMRouter
from litellm import get_supported_openai_params
from tenacity import retry, stop_after_attempt, wait_exponential
from crewai.utilities.logger import Logger
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
logger = Logger(verbose=True)
class FilteredStream:
def __init__(self, original_stream):
@@ -113,6 +118,8 @@ class LLM:
api_version: Optional[str] = None,
api_key: Optional[str] = None,
callbacks: List[Any] = [],
model_list: Optional[List[Dict[str, Any]]] = None,
routing_strategy: Optional[str] = None,
**kwargs,
):
self.model = model
@@ -136,11 +143,50 @@ class LLM:
self.callbacks = callbacks
self.context_window_size = 0
self.kwargs = kwargs
self.model_list = model_list
self.routing_strategy = routing_strategy
self.router = None
litellm.drop_params = True
litellm.set_verbose = False
self.set_callbacks(callbacks)
self.set_env_callbacks()
if self.model_list:
self._initialize_router()
def _initialize_router(self):
"""
Initialize the litellm Router with the provided model_list and routing_strategy.
"""
try:
router_kwargs = {}
if self.routing_strategy:
valid_strategies = ["simple-shuffle", "least-busy", "usage-based", "latency-based", "cost-based"]
if self.routing_strategy not in valid_strategies:
raise ValueError(f"Invalid routing strategy: {self.routing_strategy}. Valid options are: {', '.join(valid_strategies)}")
router_kwargs["routing_strategy"] = self.routing_strategy
self.router = LiteLLMRouter(
model_list=self.model_list,
**router_kwargs
)
except Exception as e:
logger.log("error", f"Failed to initialize router: {str(e)}")
raise RuntimeError(f"Router initialization failed: {str(e)}")
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def _execute_router_call(self, params):
"""
Execute a call to the router with retry logic for handling transient issues.
Args:
params: Parameters to pass to the router completion method
Returns:
The response from the router
"""
return self.router.completion(model=self.model, **params)
def call(self, messages: List[Dict[str, str]], callbacks: List[Any] = []) -> str:
with suppress_warnings():
@@ -149,7 +195,6 @@ class LLM:
try:
params = {
"model": self.model,
"messages": messages,
"timeout": self.timeout,
"temperature": self.temperature,
@@ -164,9 +209,6 @@ class LLM:
"seed": self.seed,
"logprobs": self.logprobs,
"top_logprobs": self.top_logprobs,
"api_base": self.base_url,
"api_version": self.api_version,
"api_key": self.api_key,
"stream": False,
**self.kwargs,
}
@@ -174,7 +216,17 @@ class LLM:
# Remove None values to avoid passing unnecessary parameters
params = {k: v for k, v in params.items() if v is not None}
response = litellm.completion(**params)
if self.router:
response = self._execute_router_call(params)
else:
params.update({
"model": self.model,
"api_base": self.base_url,
"api_version": self.api_version,
"api_key": self.api_key,
})
response = litellm.completion(**params)
return response["choices"][0]["message"]["content"]
except Exception as e:
if not LLMContextLengthExceededException(

View File

@@ -584,28 +584,3 @@ def test_docling_source_with_local_file():
docling_source = CrewDoclingSource(file_paths=[pdf_path])
assert docling_source.file_paths == [pdf_path]
assert docling_source.content is not None
def test_file_path_validation():
"""Test file path validation for knowledge sources."""
current_dir = Path(__file__).parent
pdf_path = current_dir / "crewai_quickstart.pdf"
# Test valid single file_path
source = PDFKnowledgeSource(file_path=pdf_path)
assert source.safe_file_paths == [pdf_path]
# Test valid file_paths list
source = PDFKnowledgeSource(file_paths=[pdf_path])
assert source.safe_file_paths == [pdf_path]
# Test both file_path and file_paths provided (should use file_paths)
source = PDFKnowledgeSource(file_path=pdf_path, file_paths=[pdf_path])
assert source.safe_file_paths == [pdf_path]
# Test neither file_path nor file_paths provided
with pytest.raises(
ValueError,
match="file_path/file_paths must be a Path, str, or a list of these types"
):
PDFKnowledgeSource()

View File

@@ -0,0 +1,246 @@
import pytest
from unittest.mock import patch, MagicMock
from crewai.llm import LLM
from crewai.agent import Agent
@pytest.mark.vcr(filter_headers=["authorization"])
@patch("litellm.Router")
@patch.object(LLM, '_initialize_router')
def test_llm_with_model_list(mock_initialize_router, mock_router):
"""Test that LLM can be initialized with a model_list for multiple model configurations."""
mock_initialize_router.return_value = None
mock_router_instance = MagicMock()
mock_router.return_value = mock_router_instance
model_list = [
{
"model_name": "gpt-4o-mini",
"litellm_params": {
"model": "gpt-4o-mini",
"api_key": "test-key-1"
}
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "test-key-2"
}
}
]
llm = LLM(model="gpt-4o-mini", model_list=model_list)
llm.router = mock_router_instance
assert llm.model == "gpt-4o-mini"
assert llm.model_list == model_list
assert llm.router is not None
@pytest.mark.vcr(filter_headers=["authorization"])
@patch("litellm.Router")
@patch.object(LLM, '_initialize_router')
def test_llm_with_routing_strategy(mock_initialize_router, mock_router):
"""Test that LLM can be initialized with a routing strategy."""
mock_initialize_router.return_value = None
mock_router_instance = MagicMock()
mock_router.return_value = mock_router_instance
model_list = [
{
"model_name": "gpt-4o-mini",
"litellm_params": {
"model": "gpt-4o-mini",
"api_key": "test-key-1"
}
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "test-key-2"
}
}
]
llm = LLM(
model="gpt-4o-mini",
model_list=model_list,
routing_strategy="simple-shuffle"
)
llm.router = mock_router_instance
assert llm.routing_strategy == "simple-shuffle"
assert llm.router is not None
@pytest.mark.vcr(filter_headers=["authorization"])
@patch("litellm.Router")
@patch.object(LLM, '_initialize_router')
def test_agent_with_model_list(mock_initialize_router, mock_router):
"""Test that Agent can be initialized with a model_list for multiple model configurations."""
mock_initialize_router.return_value = None
mock_router_instance = MagicMock()
mock_router.return_value = mock_router_instance
model_list = [
{
"model_name": "gpt-4o-mini",
"litellm_params": {
"model": "gpt-4o-mini",
"api_key": "test-key-1"
}
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "test-key-2"
}
}
]
with patch.object(Agent, 'post_init_setup', wraps=Agent.post_init_setup) as mock_post_init:
agent = Agent(
role="test",
goal="test",
backstory="test",
model_list=model_list
)
agent.llm.router = mock_router_instance
assert agent.model_list == model_list
assert agent.llm.model_list == model_list
assert agent.llm.router is not None
@pytest.mark.vcr(filter_headers=["authorization"])
@patch("litellm.Router")
@patch.object(LLM, '_initialize_router')
def test_llm_call_with_router(mock_initialize_router, mock_router):
"""Test that LLM.call uses the router when model_list is provided."""
mock_initialize_router.return_value = None
mock_router_instance = MagicMock()
mock_router.return_value = mock_router_instance
mock_response = {
"choices": [{"message": {"content": "Test response"}}]
}
mock_router_instance.completion.return_value = mock_response
model_list = [
{
"model_name": "gpt-4o-mini",
"litellm_params": {
"model": "gpt-4o-mini",
"api_key": "test-key-1"
}
}
]
# Create LLM with model_list
llm = LLM(model="gpt-4o-mini", model_list=model_list)
llm.router = mock_router_instance
messages = [{"role": "user", "content": "Hello"}]
response = llm.call(messages)
mock_router_instance.completion.assert_called_once()
assert response == "Test response"
@pytest.mark.vcr(filter_headers=["authorization"])
@patch("litellm.completion")
def test_llm_call_without_router(mock_completion):
"""Test that LLM.call uses litellm.completion when no model_list is provided."""
mock_response = {
"choices": [{"message": {"content": "Test response"}}]
}
mock_completion.return_value = mock_response
llm = LLM(model="gpt-4o-mini")
messages = [{"role": "user", "content": "Hello"}]
response = llm.call(messages)
mock_completion.assert_called_once()
assert response == "Test response"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_with_invalid_routing_strategy():
"""Test that LLM initialization raises an error with an invalid routing strategy."""
model_list = [
{
"model_name": "gpt-4o-mini",
"litellm_params": {
"model": "gpt-4o-mini",
"api_key": "test-key-1"
}
}
]
with pytest.raises(RuntimeError) as exc_info:
LLM(
model="gpt-4o-mini",
model_list=model_list,
routing_strategy="invalid-strategy"
)
assert "Invalid routing strategy" in str(exc_info.value)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_agent_with_invalid_routing_strategy():
"""Test that Agent initialization raises an error with an invalid routing strategy."""
model_list = [
{
"model_name": "gpt-4o-mini",
"litellm_params": {
"model": "gpt-4o-mini",
"api_key": "test-key-1"
}
}
]
with pytest.raises(Exception) as exc_info:
Agent(
role="test",
goal="test",
backstory="test",
model_list=model_list,
routing_strategy="invalid-strategy"
)
assert "Input should be" in str(exc_info.value)
assert "simple-shuffle" in str(exc_info.value)
assert "least-busy" in str(exc_info.value)
@pytest.mark.vcr(filter_headers=["authorization"])
@patch.object(LLM, '_initialize_router')
def test_llm_with_missing_model_in_litellm_params(mock_initialize_router):
"""Test that LLM initialization raises an error when model is missing in litellm_params."""
mock_initialize_router.side_effect = RuntimeError("Router initialization failed: Missing required 'model' in litellm_params")
model_list = [
{
"model_name": "gpt-4o-mini",
"litellm_params": {
"api_key": "test-key-1"
}
}
]
with pytest.raises(RuntimeError) as exc_info:
LLM(model="gpt-4o-mini", model_list=model_list)
assert "Router initialization failed" in str(exc_info.value)