Compare commits


3 Commits

Author SHA1 Message Date
Brandon Hancock (bhancock_ai)
3d148090d9 Merge branch 'main' into bugfix/fix-type-error-in-token-counter 2025-02-26 13:25:02 -05:00
Lorenze Jay
82cfd353b3 Merge branch 'main' into bugfix/fix-type-error-in-token-counter 2025-02-26 10:01:19 -08:00
Brandon Hancock
0903bbeca2 Fix type issue 2025-02-25 12:18:52 -05:00
44 changed files with 579 additions and 5561 deletions

View File

@@ -136,21 +136,17 @@ crewai test -n 5 -m gpt-3.5-turbo
### 8. Run
Run the crew or flow.
```shell Terminal
crewai run
```
<Note>
Starting from version 0.103.0, the `crewai run` command can be used to run both standard crews and flows. For flows, it automatically detects the type from pyproject.toml and runs the appropriate command. This is now the recommended way to run both crews and flows.
</Note>
<Note>
Make sure to run these commands from the directory where your CrewAI project is set up.
Some commands may require additional configuration or setup within your project structure.
</Note>
### 9. Chat
Starting in version `0.98.0`, when you run the `crewai chat` command, you start an interactive session with your crew. The AI assistant will guide you by asking for necessary inputs to execute the crew. Once all inputs are provided, the crew will execute its tasks.
@@ -179,6 +175,7 @@ def crew(self) -> Crew:
```
</Note>
### 10. API Keys
When running the `crewai create crew` command, the CLI will first show you the top 5 most common LLM providers and ask you to select one.

View File

@@ -1,349 +0,0 @@
---
title: 'Event Listeners'
description: 'Tap into CrewAI events to build custom integrations and monitoring'
---
# Event Listeners
CrewAI provides a powerful event system that allows you to listen for and react to various events that occur during the execution of your Crew. This feature enables you to build custom integrations, monitoring solutions, logging systems, or any other functionality that needs to be triggered based on CrewAI's internal events.
## How It Works
CrewAI uses an event bus architecture to emit events throughout the execution lifecycle. The event system is built on the following components:
1. **CrewAIEventsBus**: A singleton event bus that manages event registration and emission
2. **CrewEvent**: Base class for all events in the system
3. **BaseEventListener**: Abstract base class for creating custom event listeners
When specific actions occur in CrewAI (like a Crew starting execution, an Agent completing a task, or a tool being used), the system emits corresponding events. You can register handlers for these events to execute custom code when they occur.
## Creating a Custom Event Listener
To create a custom event listener, you need to:
1. Create a class that inherits from `BaseEventListener`
2. Implement the `setup_listeners` method
3. Register handlers for the events you're interested in
4. Create an instance of your listener in the appropriate file
Here's a simple example of a custom event listener class:
```python
from crewai.utilities.events import (
    CrewKickoffStartedEvent,
    CrewKickoffCompletedEvent,
    AgentExecutionCompletedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener

class MyCustomListener(BaseEventListener):
    def __init__(self):
        super().__init__()

    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(CrewKickoffStartedEvent)
        def on_crew_started(source, event):
            print(f"Crew '{event.crew_name}' has started execution!")

        @crewai_event_bus.on(CrewKickoffCompletedEvent)
        def on_crew_completed(source, event):
            print(f"Crew '{event.crew_name}' has completed execution!")
            print(f"Output: {event.output}")

        @crewai_event_bus.on(AgentExecutionCompletedEvent)
        def on_agent_execution_completed(source, event):
            print(f"Agent '{event.agent.role}' completed task")
            print(f"Output: {event.output}")
```
## Properly Registering Your Listener
Simply defining your listener class isn't enough. You need to create an instance of it and ensure it's imported in your application. This ensures that:
1. The event handlers are registered with the event bus
2. The listener instance remains in memory (not garbage collected)
3. The listener is active when events are emitted
### Option 1: Import and Instantiate in Your Crew or Flow Implementation
The most important thing is to create an instance of your listener in the file where your Crew or Flow is defined and executed:
#### For Crew-based Applications
Create and import your listener at the top of your Crew implementation file:
```python
# In your crew.py file
from crewai import Agent, Crew, Task
from my_listeners import MyCustomListener

# Create an instance of your listener
my_listener = MyCustomListener()

class MyCustomCrew:
    # Your crew implementation...

    def crew(self):
        return Crew(
            agents=[...],
            tasks=[...],
            # ...
        )
```
#### For Flow-based Applications
Create and import your listener at the top of your Flow implementation file:
```python
# In your main.py or flow.py file
from crewai.flow import Flow, listen, start
from my_listeners import MyCustomListener

# Create an instance of your listener
my_listener = MyCustomListener()

class MyCustomFlow(Flow):
    # Your flow implementation...

    @start()
    def first_step(self):
        # ...
```
This ensures that your listener is loaded and active when your Crew or Flow is executed.
### Option 2: Create a Package for Your Listeners
For a more structured approach, especially if you have multiple listeners:
1. Create a package for your listeners:
```
my_project/
├── listeners/
│ ├── __init__.py
│ ├── my_custom_listener.py
│ └── another_listener.py
```
2. In `my_custom_listener.py`, define your listener class and create an instance:
```python
# my_custom_listener.py
from crewai.utilities.events.base_event_listener import BaseEventListener
# ... import events ...

class MyCustomListener(BaseEventListener):
    # ... implementation ...

# Create an instance of your listener
my_custom_listener = MyCustomListener()
```
3. In `__init__.py`, import the listener instances to ensure they're loaded:
```python
# __init__.py
from .my_custom_listener import my_custom_listener
from .another_listener import another_listener
# Optionally export them if you need to access them elsewhere
__all__ = ['my_custom_listener', 'another_listener']
```
4. Import your listeners package in your Crew or Flow file:
```python
# In your crew.py or flow.py file
import my_project.listeners  # This loads all your listeners

class MyCustomCrew:
    # Your crew implementation...
```
This is exactly how CrewAI's built-in `agentops_listener` is registered. In the CrewAI codebase, you'll find:
```python
# src/crewai/utilities/events/third_party/__init__.py
from .agentops_listener import agentops_listener
```
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
## Available Event Types
CrewAI provides a wide range of events that you can listen for:
### Crew Events
- **CrewKickoffStartedEvent**: Emitted when a Crew starts execution
- **CrewKickoffCompletedEvent**: Emitted when a Crew completes execution
- **CrewKickoffFailedEvent**: Emitted when a Crew fails to complete execution
- **CrewTestStartedEvent**: Emitted when a Crew starts testing
- **CrewTestCompletedEvent**: Emitted when a Crew completes testing
- **CrewTestFailedEvent**: Emitted when a Crew fails to complete testing
- **CrewTrainStartedEvent**: Emitted when a Crew starts training
- **CrewTrainCompletedEvent**: Emitted when a Crew completes training
- **CrewTrainFailedEvent**: Emitted when a Crew fails to complete training
### Agent Events
- **AgentExecutionStartedEvent**: Emitted when an Agent starts executing a task
- **AgentExecutionCompletedEvent**: Emitted when an Agent completes executing a task
- **AgentExecutionErrorEvent**: Emitted when an Agent encounters an error during execution
### Task Events
- **TaskStartedEvent**: Emitted when a Task starts execution
- **TaskCompletedEvent**: Emitted when a Task completes execution
- **TaskFailedEvent**: Emitted when a Task fails to complete execution
- **TaskEvaluationEvent**: Emitted when a Task is evaluated
### Tool Usage Events
- **ToolUsageStartedEvent**: Emitted when a tool execution is started
- **ToolUsageFinishedEvent**: Emitted when a tool execution is completed
- **ToolUsageErrorEvent**: Emitted when a tool execution encounters an error
- **ToolValidateInputErrorEvent**: Emitted when a tool input validation encounters an error
- **ToolExecutionErrorEvent**: Emitted when a tool execution encounters an error
- **ToolSelectionErrorEvent**: Emitted when there's an error selecting a tool
### Flow Events
- **FlowCreatedEvent**: Emitted when a Flow is created
- **FlowStartedEvent**: Emitted when a Flow starts execution
- **FlowFinishedEvent**: Emitted when a Flow completes execution
- **FlowPlotEvent**: Emitted when a Flow is plotted
- **MethodExecutionStartedEvent**: Emitted when a Flow method starts execution
- **MethodExecutionFinishedEvent**: Emitted when a Flow method completes execution
- **MethodExecutionFailedEvent**: Emitted when a Flow method fails to complete execution
### LLM Events
- **LLMCallStartedEvent**: Emitted when an LLM call starts
- **LLMCallCompletedEvent**: Emitted when an LLM call completes
- **LLMCallFailedEvent**: Emitted when an LLM call fails
## Event Handler Structure
Each event handler receives two parameters:
1. **source**: The object that emitted the event
2. **event**: The event instance, containing event-specific data
The structure of the event object depends on the event type, but all events inherit from `CrewEvent` and include:
- **timestamp**: The time when the event was emitted
- **type**: A string identifier for the event type
Additional fields vary by event type. For example, `CrewKickoffCompletedEvent` includes `crew_name` and `output` fields.
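For instance, a handler can log the shared base fields alongside the event-specific ones (a minimal sketch; it assumes the field names listed above):
```python
from crewai.utilities.events import crewai_event_bus, CrewKickoffCompletedEvent

@crewai_event_bus.on(CrewKickoffCompletedEvent)
def log_kickoff_completed(source, event):
    # Base fields shared by all events
    print(f"[{event.timestamp}] {event.type}")
    # Fields specific to CrewKickoffCompletedEvent
    print(f"Crew: {event.crew_name}, output: {event.output}")
```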
## Real-World Example: Integration with AgentOps
CrewAI includes an example of a third-party integration with [AgentOps](https://github.com/AgentOps-AI/agentops), a monitoring and observability platform for AI agents. Here's how it's implemented:
```python
from typing import Optional

from crewai.utilities.events import (
    CrewKickoffCompletedEvent,
    ToolUsageErrorEvent,
    ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent

try:
    import agentops
    AGENTOPS_INSTALLED = True
except ImportError:
    AGENTOPS_INSTALLED = False

class AgentOpsListener(BaseEventListener):
    tool_event: Optional["agentops.ToolEvent"] = None
    session: Optional["agentops.Session"] = None

    def __init__(self):
        super().__init__()

    def setup_listeners(self, crewai_event_bus):
        if not AGENTOPS_INSTALLED:
            return

        @crewai_event_bus.on(CrewKickoffStartedEvent)
        def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
            self.session = agentops.init()
            for agent in source.agents:
                if self.session:
                    self.session.create_agent(
                        name=agent.role,
                        agent_id=str(agent.id),
                    )

        @crewai_event_bus.on(CrewKickoffCompletedEvent)
        def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
            if self.session:
                self.session.end_session(
                    end_state="Success",
                    end_state_reason="Finished Execution",
                )

        @crewai_event_bus.on(ToolUsageStartedEvent)
        def on_tool_usage_started(source, event: ToolUsageStartedEvent):
            self.tool_event = agentops.ToolEvent(name=event.tool_name)
            if self.session:
                self.session.record(self.tool_event)

        @crewai_event_bus.on(ToolUsageErrorEvent)
        def on_tool_usage_error(source, event: ToolUsageErrorEvent):
            agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
```
This listener initializes an AgentOps session when a Crew starts, registers agents with AgentOps, tracks tool usage, and ends the session when the Crew completes.
The AgentOps listener is registered in CrewAI's event system through the import in `src/crewai/utilities/events/third_party/__init__.py`:
```python
from .agentops_listener import agentops_listener
```
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
## Advanced Usage: Scoped Handlers
For temporary event handling (useful for testing or specific operations), you can use the `scoped_handlers` context manager:
```python
from crewai.utilities.events import crewai_event_bus, CrewKickoffStartedEvent

with crewai_event_bus.scoped_handlers():
    @crewai_event_bus.on(CrewKickoffStartedEvent)
    def temp_handler(source, event):
        print("This handler only exists within this context")

    # Do something that emits events

# Outside the context, the temporary handler is removed
```
## Use Cases
Event listeners can be used for a variety of purposes:
1. **Logging and Monitoring**: Track the execution of your Crew and log important events
2. **Analytics**: Collect data about your Crew's performance and behavior
3. **Debugging**: Set up temporary listeners to debug specific issues
4. **Integration**: Connect CrewAI with external systems like monitoring platforms, databases, or notification services
5. **Custom Behavior**: Trigger custom actions based on specific events
## Best Practices
1. **Keep Handlers Light**: Event handlers should be lightweight and avoid blocking operations
2. **Error Handling**: Include proper error handling in your event handlers to prevent exceptions from affecting the main execution (see the sketch after this list)
3. **Cleanup**: If your listener allocates resources, ensure they're properly cleaned up
4. **Selective Listening**: Only listen for events you actually need to handle
5. **Testing**: Test your event listeners in isolation to ensure they behave as expected
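As a concrete illustration of the first two points, a defensive handler hands data off quickly and contains its own failures (a minimal sketch; `TaskCompletedEvent` is one of the event types listed above):
```python
import logging

from crewai.utilities.events import crewai_event_bus, TaskCompletedEvent

logger = logging.getLogger("my_listeners")

@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event):
    try:
        # Keep the handler light: log and return rather than doing heavy work inline
        logger.info("Task completed with output: %s", event.output)
    except Exception:
        # Contain errors so a broken handler never disrupts the crew's execution
        logger.exception("Task-completion handler failed")
```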
By leveraging CrewAI's event system, you can extend its functionality and integrate it seamlessly with your existing infrastructure.

View File

@@ -150,12 +150,12 @@ final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
```
```text Output
---- Final Output ----
Second method received: Output from first_method
```
</CodeGroup>
@@ -738,34 +738,3 @@ Also, check out our YouTube video on how to use flows in CrewAI below!
referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen
></iframe>
## Running Flows
There are two ways to run a flow:
### Using the Flow API
You can run a flow programmatically by creating an instance of your flow class and calling the `kickoff()` method:
```python
flow = ExampleFlow()
result = flow.kickoff()
```
### Using the CLI
Starting from version 0.103.0, you can run flows using the `crewai run` command:
```shell
crewai run
```
This command automatically detects if your project is a flow (based on the `type = "flow"` setting in your pyproject.toml) and runs it accordingly. This is the recommended way to run flows from the command line.
For backward compatibility, you can also use:
```shell
crewai flow kickoff
```
However, the `crewai run` command is now the preferred method as it works for both crews and flows.

View File

@@ -876,19 +876,6 @@ save_output_task = Task(
#...
```
Check out the video below to see how to use structured outputs in CrewAI:
<iframe
width="560"
height="315"
src="https://www.youtube.com/embed/dNpKQk5uxHw"
title="YouTube video player"
frameborder="0"
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen
></iframe>
## Conclusion
Tasks are the driving force behind the actions of agents in CrewAI.

View File

@@ -1,681 +0,0 @@
# Custom LLM Implementations
CrewAI supports custom LLM implementations through the `LLM` base class. This allows you to create your own LLM implementations that don't rely on litellm's authentication mechanism.
## Using Custom LLM Implementations
To create a custom LLM implementation, you need to:
1. Inherit from the `LLM` base class
2. Implement the required methods:
- `call()`: The main method to call the LLM with messages
- `supports_function_calling()`: Whether the LLM supports function calling
- `supports_stop_words()`: Whether the LLM supports stop words
- `get_context_window_size()`: The context window size of the LLM
## Using the Default LLM Implementation
If you don't need a custom LLM implementation, you can use the default implementation provided by CrewAI:
```python
from crewai import LLM

# Create a default LLM instance
llm = LLM.create(model="gpt-4")

# Or with more parameters
llm = LLM.create(
    model="gpt-4",
    temperature=0.7,
    max_tokens=1000,
    api_key="your-api-key"
)
```
## Example: Basic Custom LLM
```python
from crewai import LLM
from typing import Any, Dict, List, Optional, Union

class CustomLLM(LLM):
    def __init__(self, api_key: str, endpoint: str):
        super().__init__()  # Initialize the base class to set default attributes
        if not api_key or not isinstance(api_key, str):
            raise ValueError("Invalid API key: must be a non-empty string")
        if not endpoint or not isinstance(endpoint, str):
            raise ValueError("Invalid endpoint URL: must be a non-empty string")
        self.api_key = api_key
        self.endpoint = endpoint
        self.stop = []  # You can customize stop words if needed

    def call(
        self,
        messages: Union[str, List[Dict[str, str]]],
        tools: Optional[List[dict]] = None,
        callbacks: Optional[List[Any]] = None,
        available_functions: Optional[Dict[str, Any]] = None,
    ) -> Union[str, Any]:
        """Call the LLM with the given messages.

        Args:
            messages: Input messages for the LLM.
            tools: Optional list of tool schemas for function calling.
            callbacks: Optional list of callback functions.
            available_functions: Optional dict mapping function names to callables.

        Returns:
            Either a text response from the LLM or the result of a tool function call.

        Raises:
            TimeoutError: If the LLM request times out.
            RuntimeError: If the LLM request fails for other reasons.
            ValueError: If the response format is invalid.
        """
        # Implement your own logic to call the LLM
        # For example, using requests:
        import requests

        try:
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }
            # Convert string message to proper format if needed
            if isinstance(messages, str):
                messages = [{"role": "user", "content": messages}]
            data = {
                "messages": messages,
                "tools": tools
            }
            response = requests.post(
                self.endpoint,
                headers=headers,
                json=data,
                timeout=30  # Set a reasonable timeout
            )
            response.raise_for_status()  # Raise an exception for HTTP errors
            return response.json()["choices"][0]["message"]["content"]
        except requests.Timeout:
            raise TimeoutError("LLM request timed out")
        except requests.RequestException as e:
            raise RuntimeError(f"LLM request failed: {str(e)}")
        except (KeyError, IndexError, ValueError) as e:
            raise ValueError(f"Invalid response format: {str(e)}")

    def supports_function_calling(self) -> bool:
        """Check if the LLM supports function calling.

        Returns:
            True if the LLM supports function calling, False otherwise.
        """
        # Return True if your LLM supports function calling
        return True

    def supports_stop_words(self) -> bool:
        """Check if the LLM supports stop words.

        Returns:
            True if the LLM supports stop words, False otherwise.
        """
        # Return True if your LLM supports stop words
        return True

    def get_context_window_size(self) -> int:
        """Get the context window size of the LLM.

        Returns:
            The context window size as an integer.
        """
        # Return the context window size of your LLM
        return 8192
```
## Error Handling Best Practices
When implementing custom LLMs, it's important to handle errors properly to ensure robustness and reliability. Here are some best practices:
### 1. Implement Try-Except Blocks for API Calls
Always wrap API calls in try-except blocks to handle different types of errors:
```python
def call(
    self,
    messages: Union[str, List[Dict[str, str]]],
    tools: Optional[List[dict]] = None,
    callbacks: Optional[List[Any]] = None,
    available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
    try:
        # API call implementation
        response = requests.post(
            self.endpoint,
            headers=self.headers,
            json=self.prepare_payload(messages),
            timeout=30  # Set a reasonable timeout
        )
        response.raise_for_status()  # Raise an exception for HTTP errors
        return response.json()["choices"][0]["message"]["content"]
    except requests.Timeout:
        raise TimeoutError("LLM request timed out")
    except requests.RequestException as e:
        raise RuntimeError(f"LLM request failed: {str(e)}")
    except (KeyError, IndexError, ValueError) as e:
        raise ValueError(f"Invalid response format: {str(e)}")
```
### 2. Implement Retry Logic for Transient Failures
For transient failures like network issues or rate limiting, implement retry logic with exponential backoff:
```python
def call(
    self,
    messages: Union[str, List[Dict[str, str]]],
    tools: Optional[List[dict]] = None,
    callbacks: Optional[List[Any]] = None,
    available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
    import time

    max_retries = 3
    retry_delay = 1  # seconds

    for attempt in range(max_retries):
        try:
            response = requests.post(
                self.endpoint,
                headers=self.headers,
                json=self.prepare_payload(messages),
                timeout=30
            )
            response.raise_for_status()
            return response.json()["choices"][0]["message"]["content"]
        except (requests.Timeout, requests.ConnectionError) as e:
            if attempt < max_retries - 1:
                time.sleep(retry_delay * (2 ** attempt))  # Exponential backoff
                continue
            raise TimeoutError(f"LLM request failed after {max_retries} attempts: {str(e)}")
        except requests.RequestException as e:
            raise RuntimeError(f"LLM request failed: {str(e)}")
```
### 3. Validate Input Parameters
Always validate input parameters to prevent runtime errors:
```python
def __init__(self, api_key: str, endpoint: str):
    super().__init__()
    if not api_key or not isinstance(api_key, str):
        raise ValueError("Invalid API key: must be a non-empty string")
    if not endpoint or not isinstance(endpoint, str):
        raise ValueError("Invalid endpoint URL: must be a non-empty string")
    self.api_key = api_key
    self.endpoint = endpoint
```
### 4. Handle Authentication Errors Gracefully
Provide clear error messages for authentication failures:
```python
def call(
    self,
    messages: Union[str, List[Dict[str, str]]],
    tools: Optional[List[dict]] = None,
    callbacks: Optional[List[Any]] = None,
    available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
    try:
        response = requests.post(self.endpoint, headers=self.headers, json=data)
        if response.status_code == 401:
            raise ValueError("Authentication failed: Invalid API key or token")
        elif response.status_code == 403:
            raise ValueError("Authorization failed: Insufficient permissions")
        response.raise_for_status()
        # Process response
    except Exception as e:
        # Handle error
        raise
```
## Example: JWT-based Authentication
For services that use JWT-based authentication instead of API keys, you can implement a custom LLM like this:
```python
from crewai import LLM, Agent, Task
from typing import Any, Dict, List, Optional, Union

class JWTAuthLLM(LLM):
    def __init__(self, jwt_token: str, endpoint: str):
        super().__init__()  # Initialize the base class to set default attributes
        if not jwt_token or not isinstance(jwt_token, str):
            raise ValueError("Invalid JWT token: must be a non-empty string")
        if not endpoint or not isinstance(endpoint, str):
            raise ValueError("Invalid endpoint URL: must be a non-empty string")
        self.jwt_token = jwt_token
        self.endpoint = endpoint
        self.stop = []  # You can customize stop words if needed

    def call(
        self,
        messages: Union[str, List[Dict[str, str]]],
        tools: Optional[List[dict]] = None,
        callbacks: Optional[List[Any]] = None,
        available_functions: Optional[Dict[str, Any]] = None,
    ) -> Union[str, Any]:
        """Call the LLM with JWT authentication.

        Args:
            messages: Input messages for the LLM.
            tools: Optional list of tool schemas for function calling.
            callbacks: Optional list of callback functions.
            available_functions: Optional dict mapping function names to callables.

        Returns:
            Either a text response from the LLM or the result of a tool function call.

        Raises:
            TimeoutError: If the LLM request times out.
            RuntimeError: If the LLM request fails for other reasons.
            ValueError: If the response format is invalid.
        """
        # Implement your own logic to call the LLM with JWT authentication
        import requests

        try:
            headers = {
                "Authorization": f"Bearer {self.jwt_token}",
                "Content-Type": "application/json"
            }
            # Convert string message to proper format if needed
            if isinstance(messages, str):
                messages = [{"role": "user", "content": messages}]
            data = {
                "messages": messages,
                "tools": tools
            }
            response = requests.post(
                self.endpoint,
                headers=headers,
                json=data,
                timeout=30  # Set a reasonable timeout
            )
            if response.status_code == 401:
                raise ValueError("Authentication failed: Invalid JWT token")
            elif response.status_code == 403:
                raise ValueError("Authorization failed: Insufficient permissions")
            response.raise_for_status()  # Raise an exception for HTTP errors
            return response.json()["choices"][0]["message"]["content"]
        except requests.Timeout:
            raise TimeoutError("LLM request timed out")
        except requests.RequestException as e:
            raise RuntimeError(f"LLM request failed: {str(e)}")
        except (KeyError, IndexError, ValueError) as e:
            raise ValueError(f"Invalid response format: {str(e)}")

    def supports_function_calling(self) -> bool:
        """Check if the LLM supports function calling.

        Returns:
            True if the LLM supports function calling, False otherwise.
        """
        return True

    def supports_stop_words(self) -> bool:
        """Check if the LLM supports stop words.

        Returns:
            True if the LLM supports stop words, False otherwise.
        """
        return True

    def get_context_window_size(self) -> int:
        """Get the context window size of the LLM.

        Returns:
            The context window size as an integer.
        """
        return 8192
```
## Troubleshooting
Here are some common issues you might encounter when implementing custom LLMs and how to resolve them:
### 1. Authentication Failures
**Symptoms**: 401 Unauthorized or 403 Forbidden errors
**Solutions**:
- Verify that your API key or JWT token is valid and not expired
- Check that you're using the correct authentication header format
- Ensure that your token has the necessary permissions
### 2. Timeout Issues
**Symptoms**: Requests taking too long or timing out
**Solutions**:
- Implement timeout handling as shown in the examples
- Use retry logic with exponential backoff
- Consider using a more reliable network connection
### 3. Response Parsing Errors
**Symptoms**: KeyError, IndexError, or ValueError when processing responses
**Solutions**:
- Validate the response format before accessing nested fields (see the sketch after this list)
- Implement proper error handling for malformed responses
- Check the API documentation for the expected response format
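For the first point, a small guard around the OpenAI-style `choices` response shape used in the examples above might look like this (a sketch; adjust the keys to your provider's actual format):
```python
def extract_content(response_data: dict) -> str:
    """Safely pull the message content out of a chat-completion style response."""
    choices = response_data.get("choices")
    if not choices or "message" not in choices[0]:
        raise ValueError(f"Unexpected response format: {response_data!r}")
    content = choices[0]["message"].get("content")
    if content is None:
        raise ValueError("Response contained no message content")
    return content
```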
### 4. Rate Limiting
**Symptoms**: 429 Too Many Requests errors
**Solutions**:
- Implement rate limiting in your custom LLM
- Add exponential backoff for retries
- Consider using a token bucket algorithm for more precise rate control (sketched after this list)
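A token bucket can be as small as this (a hypothetical standalone helper, not part of CrewAI; the refill `rate` and burst `capacity` are knobs you would tune to your provider's limits):
```python
import time

class TokenBucket:
    """Refills `rate` tokens per second up to `capacity`; each request consumes one token."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()

    def acquire(self) -> None:
        """Block until a token is available, then consume it."""
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            time.sleep((1 - self.tokens) / self.rate)  # Sleep until the next token accrues
```
Calling `bucket.acquire()` at the top of `call()` then caps sustained throughput at `rate` requests per second while still allowing short bursts up to `capacity`.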
## Advanced Features
### Logging
Adding logging to your custom LLM can help with debugging and monitoring:
```python
import logging
from typing import Any, Dict, List, Optional, Union

class LoggingLLM(BaseLLM):
    def __init__(self, api_key: str, endpoint: str):
        super().__init__()
        self.api_key = api_key
        self.endpoint = endpoint
        self.logger = logging.getLogger("crewai.llm.custom")

    def call(
        self,
        messages: Union[str, List[Dict[str, str]]],
        tools: Optional[List[dict]] = None,
        callbacks: Optional[List[Any]] = None,
        available_functions: Optional[Dict[str, Any]] = None,
    ) -> Union[str, Any]:
        self.logger.info(f"Calling LLM with {len(messages) if isinstance(messages, list) else 1} messages")
        try:
            # API call implementation
            response = self._make_api_call(messages, tools)
            self.logger.debug(f"LLM response received: {response[:100]}...")
            return response
        except Exception as e:
            self.logger.error(f"LLM call failed: {str(e)}")
            raise
```
### Rate Limiting
Implementing rate limiting can help avoid overwhelming the LLM API:
```python
import time
from typing import Any, Dict, List, Optional, Union

class RateLimitedLLM(BaseLLM):
    def __init__(
        self,
        api_key: str,
        endpoint: str,
        requests_per_minute: int = 60
    ):
        super().__init__()
        self.api_key = api_key
        self.endpoint = endpoint
        self.requests_per_minute = requests_per_minute
        self.request_times: List[float] = []

    def call(
        self,
        messages: Union[str, List[Dict[str, str]]],
        tools: Optional[List[dict]] = None,
        callbacks: Optional[List[Any]] = None,
        available_functions: Optional[Dict[str, Any]] = None,
    ) -> Union[str, Any]:
        self._enforce_rate_limit()

        # Record this request time
        self.request_times.append(time.time())

        # Make the actual API call
        return self._make_api_call(messages, tools)

    def _enforce_rate_limit(self) -> None:
        """Enforce the rate limit by waiting if necessary."""
        now = time.time()

        # Remove request times older than 1 minute
        self.request_times = [t for t in self.request_times if now - t < 60]

        if len(self.request_times) >= self.requests_per_minute:
            # Calculate how long to wait
            oldest_request = min(self.request_times)
            wait_time = 60 - (now - oldest_request)
            if wait_time > 0:
                time.sleep(wait_time)
```
### Metrics Collection
Collecting metrics can help you monitor your LLM usage:
```python
import time
from typing import Any, Dict, List, Optional, Union

class MetricsCollectingLLM(BaseLLM):
    def __init__(self, api_key: str, endpoint: str):
        super().__init__()
        self.api_key = api_key
        self.endpoint = endpoint
        self.metrics: Dict[str, Any] = {
            "total_calls": 0,
            "total_tokens": 0,
            "errors": 0,
            "latency": []
        }

    def call(
        self,
        messages: Union[str, List[Dict[str, str]]],
        tools: Optional[List[dict]] = None,
        callbacks: Optional[List[Any]] = None,
        available_functions: Optional[Dict[str, Any]] = None,
    ) -> Union[str, Any]:
        start_time = time.time()
        self.metrics["total_calls"] += 1

        try:
            response = self._make_api_call(messages, tools)

            # Estimate tokens (simplified)
            if isinstance(messages, str):
                token_estimate = len(messages) // 4
            else:
                token_estimate = sum(len(m.get("content", "")) // 4 for m in messages)
            self.metrics["total_tokens"] += token_estimate

            return response
        except Exception as e:
            self.metrics["errors"] += 1
            raise
        finally:
            latency = time.time() - start_time
            self.metrics["latency"].append(latency)

    def get_metrics(self) -> Dict[str, Any]:
        """Return the collected metrics."""
        avg_latency = sum(self.metrics["latency"]) / len(self.metrics["latency"]) if self.metrics["latency"] else 0
        return {
            **self.metrics,
            "avg_latency": avg_latency
        }
```
## Advanced Usage: Function Calling
If your LLM supports function calling, you can implement the function calling logic in your custom LLM:
```python
import json
from typing import Any, Dict, List, Optional, Union

def call(
    self,
    messages: Union[str, List[Dict[str, str]]],
    tools: Optional[List[dict]] = None,
    callbacks: Optional[List[Any]] = None,
    available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
    import requests

    try:
        headers = {
            "Authorization": f"Bearer {self.jwt_token}",
            "Content-Type": "application/json"
        }
        # Convert string message to proper format if needed
        if isinstance(messages, str):
            messages = [{"role": "user", "content": messages}]
        data = {
            "messages": messages,
            "tools": tools
        }
        response = requests.post(
            self.endpoint,
            headers=headers,
            json=data,
            timeout=30
        )
        response.raise_for_status()
        response_data = response.json()

        # Check if the LLM wants to call a function
        if response_data["choices"][0]["message"].get("tool_calls"):
            tool_calls = response_data["choices"][0]["message"]["tool_calls"]

            # Process each tool call
            for tool_call in tool_calls:
                function_name = tool_call["function"]["name"]
                function_args = json.loads(tool_call["function"]["arguments"])

                if available_functions and function_name in available_functions:
                    function_to_call = available_functions[function_name]
                    function_response = function_to_call(**function_args)

                    # Add the function response to the messages
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call["id"],
                        "name": function_name,
                        "content": str(function_response)
                    })

            # Call the LLM again with the updated messages
            return self.call(messages, tools, callbacks, available_functions)

        # Return the text response if no function call
        return response_data["choices"][0]["message"]["content"]
    except requests.Timeout:
        raise TimeoutError("LLM request timed out")
    except requests.RequestException as e:
        raise RuntimeError(f"LLM request failed: {str(e)}")
    except (KeyError, IndexError, ValueError) as e:
        raise ValueError(f"Invalid response format: {str(e)}")
```
## Using Your Custom LLM with CrewAI
Once you've implemented your custom LLM, you can use it with CrewAI agents and crews:
```python
from crewai import Agent, Task, Crew
from typing import Dict, Any

# Create your custom LLM instance
jwt_llm = JWTAuthLLM(
    jwt_token="your.jwt.token",
    endpoint="https://your-llm-endpoint.com/v1/chat/completions"
)

# Use it with an agent
agent = Agent(
    role="Research Assistant",
    goal="Find information on a topic",
    backstory="You are a research assistant tasked with finding information.",
    llm=jwt_llm,
)

# Create a task for the agent
task = Task(
    description="Research the benefits of exercise",
    agent=agent,
    expected_output="A summary of the benefits of exercise",
)

# Execute the task
result = agent.execute_task(task)
print(result)

# Or use it with a crew
crew = Crew(
    agents=[agent],
    tasks=[task],
    manager_llm=jwt_llm,  # Use your custom LLM for the manager
)

# Run the crew
result = crew.kickoff()
print(result)
```
## Implementing Your Own Authentication Mechanism
The `LLM` class allows you to implement any authentication mechanism you need, not just JWT or API keys. You can use:
- OAuth tokens
- Client certificates
- Custom headers
- Session-based authentication
- Any other authentication method required by your LLM provider
Simply implement the appropriate authentication logic in your custom LLM class.
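For instance, a provider that authenticates with custom headers could be handled the same way as the JWT example above (a minimal sketch; the header names here are invented for illustration):
```python
from crewai import LLM

class CustomHeaderLLM(LLM):
    def __init__(self, client_id: str, client_secret: str, endpoint: str):
        super().__init__()
        self.endpoint = endpoint
        # Hypothetical header names -- substitute whatever your provider actually requires
        self.headers = {
            "X-Client-Id": client_id,
            "X-Client-Secret": client_secret,
            "Content-Type": "application/json",
        }
        # call(), supports_function_calling(), etc. follow the same pattern as JWTAuthLLM
```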
## Migrating from BaseLLM to LLM
If you were previously using `BaseLLM`, you can simply replace it with `LLM`:
```python
# Old code
from crewai import BaseLLM

class CustomLLM(BaseLLM):
    # ...

# New code
from crewai import LLM

class CustomLLM(LLM):
    # ...
```
The `BaseLLM` class is still available for backward compatibility but will be removed in a future release. It now inherits from `LLM` and emits a deprecation warning when instantiated.

View File

@@ -48,6 +48,7 @@ Define a crew with a designated manager and establish a clear chain of command.
</Tip>
```python Code
from langchain_openai import ChatOpenAI
from crewai import Crew, Process, Agent
# Agents are defined with attributes for backstory, cache, and verbose mode
@@ -55,51 +56,38 @@ researcher = Agent(
    role='Researcher',
    goal='Conduct in-depth analysis',
    backstory='Experienced data analyst with a knack for uncovering hidden trends.',
    cache=True,
    verbose=False,
    # tools=[]  # This can be optionally specified; defaults to an empty list
    use_system_prompt=True,  # Enable or disable system prompts for this agent
    max_rpm=30,  # Limit on the number of requests per minute
    max_iter=5  # Maximum number of iterations for a final answer
)

writer = Agent(
    role='Writer',
    goal='Create engaging content',
    backstory='Creative writer passionate about storytelling in technical domains.',
    cache=True,
    verbose=False,
    # tools=[]  # Optionally specify tools; defaults to an empty list
    use_system_prompt=True,  # Enable or disable system prompts for this agent
    max_rpm=30,  # Limit on the number of requests per minute
    max_iter=5  # Maximum number of iterations for a final answer
)
# Establishing the crew with a hierarchical process and additional configurations
project_crew = Crew(
    tasks=[...],  # Tasks to be delegated and executed under the manager's supervision
    agents=[researcher, writer],
    manager_llm=ChatOpenAI(temperature=0, model="gpt-4"),  # Mandatory if manager_agent is not set
    process=Process.hierarchical,  # Specifies the hierarchical management approach
    respect_context_window=True,  # Enable respect of the context window for tasks
    memory=True,  # Enable memory usage for enhanced task execution
    manager_agent=None,  # Optional: explicitly set a specific agent as manager instead of the manager_llm
    planning=True,  # Enable planning feature for pre-execution strategy
)
```
### Using a Custom Manager Agent
Alternatively, you can create a custom manager agent with specific attributes tailored to your project's management needs. This gives you more control over the manager's behavior and capabilities.
```python
# Define a custom manager agent
manager = Agent(
    role="Project Manager",
    goal="Efficiently manage the crew and ensure high-quality task completion",
    backstory="You're an experienced project manager, skilled in overseeing complex projects and guiding teams to success.",
    allow_delegation=True,
)

# Use the custom manager in your crew
project_crew = Crew(
    tasks=[...],
    agents=[researcher, writer],
    manager_agent=manager,  # Use your custom manager agent
    process=Process.hierarchical,
    planning=True,
)
```
<Tip>
For more details on creating and customizing a manager agent, check out the [Custom Manager Agent documentation](https://docs.crewai.com/how-to/custom-manager-agent#custom-manager-agent).
</Tip>
### Workflow in Action
1. **Task Assignment**: The manager assigns tasks strategically, considering each agent's capabilities and available tools.
@@ -109,4 +97,4 @@ project_crew = Crew(
## Conclusion
Adopting the hierarchical process in CrewAI, with the correct configurations and understanding of the system's capabilities, facilitates an organized and efficient approach to project management.
Utilize the advanced features and customizations to tailor the workflow to your specific needs, ensuring optimal task execution and project success.

View File

@@ -15,124 +15,162 @@ icon: wrench
If you need to update Python, visit [python.org/downloads](https://python.org/downloads)
</Note>
CrewAI uses `uv` as its dependency management and package handling tool. It simplifies project setup and execution, offering a seamless experience.
# Setting Up Your Environment
If you haven't installed `uv` yet, follow **step 1** to quickly get it set up on your system; otherwise, skip ahead to **step 2**.
<Steps>
<Step title="Install uv">
- **On macOS/Linux:**
Use `curl` to download the script and execute it with `sh`:
```shell
curl -LsSf https://astral.sh/uv/install.sh | sh
```
If your system doesn't have `curl`, you can use `wget`:
```shell
wget -qO- https://astral.sh/uv/install.sh | sh
```
- **On Windows:**
Use `irm` to download the script and `iex` to execute it:
```shell
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
```
If you run into any issues, refer to [UV's installation guide](https://docs.astral.sh/uv/getting-started/installation/) for more information.
</Step>
<Step title="Install CrewAI 🚀">
- Run the following command to install the `crewai` CLI:
```shell
uv tool install crewai
```
<Warning>
If you encounter a `PATH` warning, run this command to update your shell:
```shell
uv tool update-shell
```
</Warning>
- To verify that `crewai` is installed, run:
```shell
uv tool list
```
- You should see something like:
```markdown
crewai v0.102.0
- crewai
```
<Check>Installation successful! You're ready to create your first crew! 🎉</Check>
</Step>
</Steps>
# Creating a CrewAI Project
We recommend using the `YAML` template scaffolding for a structured approach to defining agents and tasks. Here's how to get started:
<Steps>
<Step title="Generate Project Scaffolding">
- Run the `crewai` CLI command:
```shell
crewai create crew <your_project_name>
```
- This creates a new project with the following structure:
<Frame>
```
my_project/
├── .gitignore
├── knowledge/
├── pyproject.toml
├── README.md
├── .env
└── src/
    └── my_project/
        ├── __init__.py
        ├── main.py
        ├── crew.py
        ├── tools/
        │   ├── custom_tool.py
        │   └── __init__.py
        └── config/
            ├── agents.yaml
            └── tasks.yaml
```
</Frame>
</Step>
<Step title="Customize Your Project">
- Your project will contain these essential files:

| File | Purpose |
| --- | --- |
| `agents.yaml` | Define your AI agents and their roles |
| `tasks.yaml` | Set up agent tasks and workflows |
| `.env` | Store API keys and environment variables |
| `main.py` | Project entry point and execution flow |
| `crew.py` | Crew orchestration and coordination |
| `tools/` | Directory for custom agent tools |
| `knowledge/` | Directory for knowledge base |

- Start by editing `agents.yaml` and `tasks.yaml` to define your crew's behavior.
- Keep sensitive information like API keys in `.env`.
</Step>
<Step title="Run your Crew">
- Before you run your crew, make sure to run:
```bash
crewai install
```
- If you need to install additional packages, use:
```shell
uv add <package-name>
```
- To run your crew, execute the following command in the root of your project:
```bash
crewai run
```
</Step>
</Steps>

View File

@@ -116,8 +116,6 @@
{
"group": "Tools",
"pages": [
"tools/aimindtool",
"tools/bravesearchtool",
"tools/browserbaseloadtool",
"tools/codedocssearchtool",
"tools/codeinterpretertool",
@@ -134,32 +132,18 @@
"tools/firecrawlscrapewebsitetool",
"tools/firecrawlsearchtool",
"tools/githubsearchtool",
"tools/hyperbrowserloadtool",
"tools/linkupsearchtool",
"tools/llamaindextool",
"tools/serperdevtool",
"tools/s3readertool",
"tools/s3writertool",
"tools/scrapegraphscrapetool",
"tools/scrapeelementfromwebsitetool",
"tools/jsonsearchtool",
"tools/mdxsearchtool",
"tools/mysqltool",
"tools/multiontool",
"tools/nl2sqltool",
"tools/patronustools",
"tools/pdfsearchtool",
"tools/pgsearchtool",
"tools/qdrantvectorsearchtool",
"tools/ragtool",
"tools/scrapewebsitetool",
"tools/scrapflyscrapetool",
"tools/seleniumscrapingtool",
"tools/snowflakesearchtool",
"tools/spidertool",
"tools/txtsearchtool",
"tools/visiontool",
"tools/weaviatevectorsearchtool",
"tools/websitesearchtool",
"tools/xmlsearchtool",
"tools/youtubechannelsearchtool",

View File

@@ -8,10 +8,10 @@ icon: rocket
Let's create a simple crew that will help us `research` and `report` on the `latest AI developments` for a given topic or subject.
Before we proceed, make sure you have `crewai` and `crewai-tools` installed.
If you haven't installed them yet, you can do so by following the [installation guide](/installation).
Follow the steps below to get crewing! 🚣‍♂️
<Steps>
<Step title="Create your crew">
@@ -23,13 +23,6 @@ Follow the steps below to get Crewing! 🚣‍♂️
```
</CodeGroup>
</Step>
<Step title="Navigate to your new crew project">
<CodeGroup>
```shell Terminal
cd latest-ai-development
```
</CodeGroup>
</Step>
<Step title="Modify your `agents.yaml` file">
<Tip>
You can also modify the agents as needed to fit your use case or copy and paste as is to your project.
@@ -179,26 +172,21 @@ Follow the steps below to get Crewing! 🚣‍♂️
- A [Serper.dev](https://serper.dev/) API key: `SERPER_API_KEY=YOUR_KEY_HERE`
</Step>
<Step title="Lock and install the dependencies">
First, navigate to your project directory, then lock and install the dependencies with the CrewAI CLI:
<CodeGroup>
```shell Terminal
cd latest-ai-development
crewai install
```
</CodeGroup>
If you have additional packages that you want to install, you can do so by running:
<CodeGroup>
```shell Terminal
uv add <package-name>
```
</CodeGroup>
</Step>
<Step title="Run your crew">
To run your crew, execute the following command in the root of your project:
<CodeGroup>
```bash Terminal
crewai run
```
</CodeGroup>
</Step>
<Step title="View your final report">
You should see the output in the console and the `report.md` file should be created in the root of your project with the final report.
@@ -270,12 +258,6 @@ Follow the steps below to get Crewing! 🚣‍♂️
</Step>
</Steps>
<Check>
Congratulations!
You have successfully set up your crew project and are ready to start building your own agentic workflows!
</Check>
### Note on Consistency in Naming
The names you use in your YAML files (`agents.yaml` and `tasks.yaml`) should match the method names in your Python code.
@@ -315,9 +297,194 @@ email_summarizer_task:
- research_task
```
Use the annotations to properly reference the agent and task in the `crew.py` file.
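For instance, with the `email_summarizer` names above, the corresponding `crew.py` methods would line up like this (a sketch assuming the standard `@CrewBase` project layout):
```python
@agent
def email_summarizer(self) -> Agent:
    # Method name matches the `email_summarizer` key in agents.yaml
    return Agent(config=self.agents_config["email_summarizer"])

@task
def email_summarizer_task(self) -> Task:
    # Method name matches the `email_summarizer_task` key in tasks.yaml
    return Task(config=self.tasks_config["email_summarizer_task"])
```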
### Annotations include:
Here are examples of how to use each annotation in your CrewAI project, and when you should use them:
#### @agent
Used to define an agent in your crew. Use this when:
- You need to create a specialized AI agent with a specific role
- You want the agent to be automatically collected and managed by the crew
- You need to reuse the same agent configuration across multiple tasks
```python
@agent
def research_agent(self) -> Agent:
    return Agent(
        role="Research Analyst",
        goal="Conduct thorough research on given topics",
        backstory="Expert researcher with years of experience in data analysis",
        tools=[SerperDevTool()],
        verbose=True
    )
```
#### @task
Used to define a task that can be executed by agents. Use this when:
- You need to define a specific piece of work for an agent
- You want tasks to be automatically sequenced and managed
- You need to establish dependencies between different tasks
```python
@task
def research_task(self) -> Task:
    return Task(
        description="Research the latest developments in AI technology",
        expected_output="A comprehensive report on AI advancements",
        agent=self.research_agent(),
        output_file="output/research.md"
    )
```
#### @crew
Used to define your crew configuration. Use this when:
- You want to automatically collect all @agent and @task definitions
- You need to specify how tasks should be processed (sequential or hierarchical)
- You want to set up crew-wide configurations
```python
@crew
def research_crew(self) -> Crew:
    return Crew(
        agents=self.agents,  # Automatically collected from @agent methods
        tasks=self.tasks,    # Automatically collected from @task methods
        process=Process.sequential,
        verbose=True
    )
```
#### @tool
Used to create custom tools for your agents. Use this when:
- You need to give agents specific capabilities (like web search, data analysis)
- You want to encapsulate external API calls or complex operations
- You need to share functionality across multiple agents
```python
@tool
def web_search_tool(query: str, max_results: int = 5) -> list[str]:
    """
    Search the web for information.

    Args:
        query: The search query
        max_results: Maximum number of results to return

    Returns:
        List of search results
    """
    # Implement your search logic here
    return [f"Result {i} for: {query}" for i in range(max_results)]
```
#### @before_kickoff
Used to execute logic before the crew starts. Use this when:
- You need to validate or preprocess input data
- You want to set up resources or configurations before execution
- You need to perform any initialization logic
```python
@before_kickoff
def validate_inputs(self, inputs: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Validate and preprocess inputs before the crew starts."""
    if inputs is None:
        return None

    if 'topic' not in inputs:
        raise ValueError("Topic is required")

    # Add additional context
    inputs['timestamp'] = datetime.now().isoformat()
    inputs['topic'] = inputs['topic'].strip().lower()
    return inputs
```
#### @after_kickoff
Used to process results after the crew completes. Use this when:
- You need to format or transform the final output
- You want to perform cleanup operations
- You need to save or log the results in a specific way
```python
@after_kickoff
def process_results(self, result: CrewOutput) -> CrewOutput:
    """Process and format the results after the crew completes."""
    result.raw = result.raw.strip()
    result.raw = f"""
    # Research Results
    Generated on: {datetime.now().isoformat()}

    {result.raw}
    """
    return result
```
#### @callback
Used to handle events during crew execution. Use this when:
- You need to monitor task progress
- You want to log intermediate results
- You need to implement custom progress tracking or metrics
```python
@callback
def log_task_completion(self, task: Task, output: str):
    """Log task completion details for monitoring."""
    print(f"Task '{task.description}' completed")
    print(f"Output length: {len(output)} characters")
    print(f"Agent used: {task.agent.role}")
    print("-" * 50)
```
#### @cache_handler
Used to implement custom caching for task results. Use this when:
- You want to avoid redundant expensive operations
- You need to implement custom cache storage or expiration logic
- You want to persist results between runs
```python
@cache_handler
def custom_cache(self, key: str) -> Optional[str]:
    """Custom cache implementation for storing task results."""
    cache_file = f"cache/{key}.json"
    if os.path.exists(cache_file):
        with open(cache_file, 'r') as f:
            data = json.load(f)
            # Check if the cache is still valid (e.g., not expired)
            if datetime.fromisoformat(data['timestamp']) > datetime.now() - timedelta(days=1):
                return data['result']
    return None
```
<Note>
These decorators are part of the CrewAI framework and help organize your crew's structure by automatically collecting agents, tasks, and handling various lifecycle events.
They should be used within a class decorated with `@CrewBase`.
</Note>
### Replay Tasks from Latest Crew Kickoff
CrewAI now includes a replay feature that allows you to list the tasks from the last run and replay from a specific one. To use this feature, run:
```shell
crewai replay <task_id>
```
Replace `<task_id>` with the ID of the task you want to replay.
### Reset Crew Memory
If you need to reset the memory of your crew before running it again, you can do so by calling the reset memory feature:
```shell
crewai reset-memories --all
```
This will clear the crew's memory, allowing for a fresh start.
## Deploying Your Project
The easiest way to deploy your crew is through [CrewAI Enterprise](http://app.crewai.com), where you can deploy your crew in a few clicks.
<CardGroup cols={2}>
<Card

View File

@@ -1,118 +0,0 @@
---
title: AI Mind Tool
description: The `AIMindTool` is designed to query data sources in natural language.
icon: brain
---
# `AIMindTool`
## Description
The `AIMindTool` is a wrapper around [AI-Minds](https://mindsdb.com/minds) provided by [MindsDB](https://mindsdb.com/). It allows you to query data sources in natural language by simply configuring their connection parameters. This tool is useful when you need answers to questions from your data stored in various data sources including PostgreSQL, MySQL, MariaDB, ClickHouse, Snowflake, and Google BigQuery.
Minds are AI systems that work similarly to large language models (LLMs) but go beyond by answering any question from any data. This is accomplished by:
- Selecting the most relevant data for an answer using parametric search
- Understanding the meaning and providing responses within the correct context through semantic search
- Delivering precise answers by analyzing data and using machine learning (ML) models
## Installation
To incorporate this tool into your project, you need to install the Minds SDK:
```shell
uv add minds-sdk
```
## Steps to Get Started
To effectively use the `AIMindTool`, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` and `minds-sdk` packages are installed in your Python environment.
2. **API Key Acquisition**: Sign up for a Minds account [here](https://mdb.ai/register), and obtain an API key.
3. **Environment Configuration**: Store your obtained API key in an environment variable named `MINDS_API_KEY` to facilitate its use by the tool.
## Example
The following example demonstrates how to initialize the tool and execute a query:
```python Code
from crewai_tools import AIMindTool

# Initialize the AIMindTool
aimind_tool = AIMindTool(
    datasources=[
        {
            "description": "house sales data",
            "engine": "postgres",
            "connection_data": {
                "user": "demo_user",
                "password": "demo_password",
                "host": "samples.mindsdb.com",
                "port": 5432,
                "database": "demo",
                "schema": "demo_data"
            },
            "tables": ["house_sales"]
        }
    ]
)

# Run a natural language query
result = aimind_tool.run("How many 3 bedroom houses were sold in 2008?")
print(result)
```
## Parameters
The `AIMindTool` accepts the following parameters:
- **api_key**: Optional. Your Minds API key. If not provided, it will be read from the `MINDS_API_KEY` environment variable.
- **datasources**: A list of dictionaries, each containing the following keys:
- **description**: A description of the data contained in the datasource.
- **engine**: The engine (or type) of the datasource.
- **connection_data**: A dictionary containing the connection parameters for the datasource.
- **tables**: A list of tables that the data source will use. This is optional and can be omitted if all tables in the data source are to be used.
A list of supported data sources and their connection parameters can be found [here](https://docs.mdb.ai/docs/data_sources).
## Agent Integration Example
Here's how to integrate the `AIMindTool` with a CrewAI agent:
```python Code
from crewai import Agent
from crewai.project import agent
from crewai_tools import AIMindTool

# Initialize the tool
aimind_tool = AIMindTool(
    datasources=[
        {
            "description": "sales data",
            "engine": "postgres",
            "connection_data": {
                "user": "your_user",
                "password": "your_password",
                "host": "your_host",
                "port": 5432,
                "database": "your_db",
                "schema": "your_schema"
            },
            "tables": ["sales"]
        }
    ]
)

# Define an agent with the AIMindTool
@agent
def data_analyst(self) -> Agent:
    return Agent(
        config=self.agents_config["data_analyst"],
        allow_delegation=False,
        tools=[aimind_tool]
    )
```
## Conclusion
The `AIMindTool` provides a powerful way to query your data sources using natural language, making it easier to extract insights without writing complex SQL queries. By connecting to various data sources and leveraging AI-Minds technology, this tool enables agents to access and analyze data efficiently.

View File

@@ -1,96 +0,0 @@
---
title: Brave Search
description: The `BraveSearchTool` is designed to search the internet using the Brave Search API.
icon: searchengin
---
# `BraveSearchTool`
## Description
This tool is designed to perform web searches using the Brave Search API. It allows you to search the internet with a specified query and retrieve relevant results. The tool supports customizable result counts and country-specific searches.
## Installation
To incorporate this tool into your project, follow the installation instructions below:
```shell
pip install 'crewai[tools]'
```
## Steps to Get Started
To effectively use the `BraveSearchTool`, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` package is installed in your Python environment.
2. **API Key Acquisition**: Acquire a Brave Search API key by registering at [Brave Search API](https://api.search.brave.com/app/keys).
3. **Environment Configuration**: Store your obtained API key in an environment variable named `BRAVE_API_KEY` to facilitate its use by the tool.
## Example
The following example demonstrates how to initialize the tool and execute a search with a given query:
```python Code
from crewai_tools import BraveSearchTool
# Initialize the tool for internet searching capabilities
tool = BraveSearchTool()
# Execute a search
results = tool.run(search_query="CrewAI agent framework")
print(results)
```
## Parameters
The `BraveSearchTool` accepts the following parameters:
- **search_query**: Mandatory. The search query you want to use to search the internet.
- **country**: Optional. Specify the country for the search results. Default is an empty string.
- **n_results**: Optional. Number of search results to return. Default is `10`.
- **save_file**: Optional. Whether to save the search results to a file. Default is `False`.
## Example with Parameters
Here is an example demonstrating how to use the tool with additional parameters:
```python Code
from crewai_tools import BraveSearchTool
# Initialize the tool with custom parameters
tool = BraveSearchTool(
country="US",
n_results=5,
save_file=True
)
# Execute a search
results = tool.run(search_query="Latest AI developments")
print(results)
```
## Agent Integration Example
Here's how to integrate the `BraveSearchTool` with a CrewAI agent:
```python Code
from crewai import Agent
from crewai.project import agent
from crewai_tools import BraveSearchTool
# Initialize the tool
brave_search_tool = BraveSearchTool()
# Define an agent with the BraveSearchTool
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config["researcher"],
allow_delegation=False,
tools=[brave_search_tool]
)
```
## Conclusion
By integrating the `BraveSearchTool` into Python projects, users gain the ability to conduct real-time, relevant searches across the internet directly from their applications. The tool provides a simple interface to the powerful Brave Search API, making it easy to retrieve and process search results programmatically. Following the setup and usage guidelines above makes incorporating this tool into a project straightforward.

View File

@@ -8,15 +8,18 @@ icon: code-simple
## Description
The `CodeInterpreterTool` enables CrewAI agents to execute Python 3 code that they generate autonomously. The code is run in a secure, isolated Docker container, ensuring safety regardless of the content. This functionality is particularly valuable as it allows agents to create code, execute it, obtain the results, and utilize that information to inform subsequent decisions and actions.
## Requirements
- Docker must be installed and running on your system. If you don't have it, you can install it from [here](https://docs.docker.com/get-docker/).
## Installation
To use this tool, you need to install the CrewAI tools package:
```shell
pip install 'crewai[tools]'
@@ -24,153 +27,27 @@ pip install 'crewai[tools]'
## Example
The following example demonstrates how to use the `CodeInterpreterTool` with a CrewAI agent. Note that the code to run must be generated by the agent itself and must be valid Python 3; the first run can take some time because the Docker image needs to be built.
```python Code
from crewai import Agent, Task, Crew, Process
from crewai_tools import CodeInterpreterTool
# Initialize the tool
code_interpreter = CodeInterpreterTool()
# Define an agent that uses the tool
programmer_agent = Agent(
role="Python Programmer",
goal="Write and execute Python code to solve problems",
backstory="An expert Python programmer who can write efficient code to solve complex problems.",
tools=[code_interpreter],
verbose=True,
)
# Example task to generate and execute code
coding_task = Task(
description="Write a Python function to calculate the Fibonacci sequence up to the 10th number and print the result.",
expected_output="The Fibonacci sequence up to the 10th number.",
agent=programmer_agent,
)
# Create and run the crew
crew = Crew(
agents=[programmer_agent],
tasks=[coding_task],
verbose=True,
process=Process.sequential,
)
result = crew.kickoff()
```
You can also enable code execution directly when creating an agent:
```python Code
from crewai import Agent
# Create an agent with code execution enabled
programmer_agent = Agent(
role="Python Programmer",
goal="Write and execute Python code to solve problems",
backstory="An expert Python programmer who can write efficient code to solve complex problems.",
allow_code_execution=True, # This automatically adds the CodeInterpreterTool
verbose=True,
)
```
## Parameters
The `CodeInterpreterTool` accepts the following parameters during initialization:
- **user_dockerfile_path**: Optional. Path to a custom Dockerfile to use for the code interpreter container.
- **user_docker_base_url**: Optional. URL to the Docker daemon to use for running the container.
- **unsafe_mode**: Optional. Whether to run code directly on the host machine instead of in a Docker container. Default is `False`. Use with caution!
When using the tool with an agent, the agent will need to provide:
- **code**: Required. The Python 3 code to execute.
- **libraries_used**: Required. A list of libraries used in the code that need to be installed.
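Putting these options together, the following is a minimal sketch of customizing the interpreter. The Dockerfile path and Docker daemon URL shown are illustrative placeholders, not defaults:
```python Code
from crewai_tools import CodeInterpreterTool

# Default: code runs inside an isolated Docker container
code_interpreter = CodeInterpreterTool()

# Customized sandbox (illustrative values)
custom_interpreter = CodeInterpreterTool(
    user_dockerfile_path="./Dockerfile",                # build the container from your own Dockerfile
    user_docker_base_url="unix://var/run/docker.sock",  # point at a specific Docker daemon
    unsafe_mode=False,                                  # keep execution inside the container
)
```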
## Agent Integration Example
Here's a more detailed example of how to integrate the `CodeInterpreterTool` with a CrewAI agent:
```python Code
from crewai import Agent, Task, Crew, Process
from crewai_tools import CodeInterpreterTool
# Initialize the tool
code_interpreter = CodeInterpreterTool()
# Define an agent that uses the tool
data_analyst = Agent(
role="Data Analyst",
goal="Analyze data using Python code",
backstory="""You are an expert data analyst who specializes in using Python
to analyze and visualize data. You can write efficient code to process
large datasets and extract meaningful insights.""",
tools=[code_interpreter],
verbose=True,
)
# Create a task for the agent
analysis_task = Task(
description="""
Write Python code to:
1. Generate a random dataset of 100 points with x and y coordinates
2. Calculate the correlation coefficient between x and y
3. Create a scatter plot of the data
4. Print the correlation coefficient and save the plot as 'scatter.png'
Make sure to handle any necessary imports and print the results.
""",
expected_output="The correlation coefficient and confirmation that the scatter plot has been saved.",
agent=data_analyst,
)
# Run the task
crew = Crew(
agents=[data_analyst],
tasks=[analysis_task],
verbose=True,
process=Process.sequential,
)
result = crew.kickoff()
```
## Implementation Details
The `CodeInterpreterTool` uses Docker to create a secure environment for code execution:
```python Code
class CodeInterpreterTool(BaseTool):
name: str = "Code Interpreter"
description: str = "Interprets Python3 code strings with a final print statement."
args_schema: Type[BaseModel] = CodeInterpreterSchema
default_image_tag: str = "code-interpreter:latest"
def _run(self, **kwargs) -> str:
code = kwargs.get("code", self.code)
libraries_used = kwargs.get("libraries_used", [])
if self.unsafe_mode:
return self.run_code_unsafe(code, libraries_used)
else:
return self.run_code_in_docker(code, libraries_used)
```
The tool performs the following steps:
1. Verifies that the Docker image exists or builds it if necessary
2. Creates a Docker container with the current working directory mounted
3. Installs any required libraries specified by the agent
4. Executes the Python code in the container
5. Returns the output of the code execution
6. Cleans up by stopping and removing the container
## Security Considerations
By default, the `CodeInterpreterTool` runs code in an isolated Docker container, which provides a layer of security. However, there are still some security considerations to keep in mind:
1. The Docker container has access to the current working directory, so sensitive files could potentially be accessed.
2. The `unsafe_mode` parameter allows code to be executed directly on the host machine, which should only be used in trusted environments.
3. Be cautious when allowing agents to install arbitrary libraries, as they could potentially include malicious code.
## Conclusion
The `CodeInterpreterTool` provides a powerful way for CrewAI agents to execute Python code in a relatively secure environment. By enabling agents to write and run code, it significantly expands their problem-solving capabilities, especially for tasks involving data analysis, calculations, or other computational work. This tool is particularly useful for agents that need to perform complex operations that are more efficiently expressed in code than in natural language.

View File

@@ -1,86 +0,0 @@
---
title: Hyperbrowser Load Tool
description: The `HyperbrowserLoadTool` enables web scraping and crawling using Hyperbrowser.
icon: globe
---
# `HyperbrowserLoadTool`
## Description
The `HyperbrowserLoadTool` enables web scraping and crawling using [Hyperbrowser](https://hyperbrowser.ai), a platform for running and scaling headless browsers. This tool allows you to scrape a single page or crawl an entire site, returning the content in properly formatted markdown or HTML.
Key Features:
- Instant Scalability - Spin up hundreds of browser sessions in seconds without infrastructure headaches
- Simple Integration - Works seamlessly with popular tools like Puppeteer and Playwright
- Powerful APIs - Easy to use APIs for scraping/crawling any site
- Bypass Anti-Bot Measures - Built-in stealth mode, ad blocking, automatic CAPTCHA solving, and rotating proxies
## Installation
To use this tool, you need to install the Hyperbrowser SDK:
```shell
uv add hyperbrowser
```
## Steps to Get Started
To effectively use the `HyperbrowserLoadTool`, follow these steps:
1. **Sign Up**: Head to [Hyperbrowser](https://app.hyperbrowser.ai/) to sign up and generate an API key.
2. **API Key**: Set the `HYPERBROWSER_API_KEY` environment variable or pass it directly to the tool constructor.
3. **Install SDK**: Install the Hyperbrowser SDK using the command above.
## Example
The following example demonstrates how to initialize the tool and use it to scrape a website:
```python Code
from crewai_tools import HyperbrowserLoadTool
from crewai import Agent
from crewai.project import agent
# Initialize the tool with your API key
tool = HyperbrowserLoadTool(api_key="your_api_key") # Or use environment variable
# Define an agent that uses the tool
@agent
def web_researcher(self) -> Agent:
'''
This agent uses the HyperbrowserLoadTool to scrape websites
and extract information.
'''
return Agent(
config=self.agents_config["web_researcher"],
tools=[tool]
)
```
## Parameters
The `HyperbrowserLoadTool` accepts the following parameters:
### Constructor Parameters
- **api_key**: Optional. Your Hyperbrowser API key. If not provided, it will be read from the `HYPERBROWSER_API_KEY` environment variable.
### Run Parameters
- **url**: Required. The website URL to scrape or crawl.
- **operation**: Optional. The operation to perform on the website. Either 'scrape' or 'crawl'. Default is 'scrape'.
- **params**: Optional. Additional parameters for the scrape or crawl operation.
## Supported Parameters
For detailed information on all supported parameters, visit:
- [Scrape Parameters](https://docs.hyperbrowser.ai/reference/sdks/python/scrape#start-scrape-job-and-wait)
- [Crawl Parameters](https://docs.hyperbrowser.ai/reference/sdks/python/crawl#start-crawl-job-and-wait)
## Return Format
The tool returns content in the following format:
- For **scrape** operations: The content of the page in markdown or HTML format.
- For **crawl** operations: The content of each page separated by dividers, including the URL of each page.
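As a minimal sketch of these two operations (assuming `HYPERBROWSER_API_KEY` is set; `max_pages` below is an illustrative option, so check the parameter docs linked above for the exact names Hyperbrowser supports):
```python Code
from crewai_tools import HyperbrowserLoadTool

tool = HyperbrowserLoadTool()

# Scrape a single page (the default operation); returns the page content
page_content = tool.run(url="https://example.com")

# Crawl a site; `params` is forwarded to the crawl operation
site_content = tool.run(
    url="https://example.com",
    operation="crawl",
    params={"max_pages": 5},  # illustrative option
)
```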
## Conclusion
The `HyperbrowserLoadTool` provides a powerful way to scrape and crawl websites, handling complex scenarios like anti-bot measures, CAPTCHAs, and more. By leveraging Hyperbrowser's platform, this tool enables agents to access and extract web content efficiently.

View File

@@ -1,112 +0,0 @@
---
title: Linkup Search Tool
description: The `LinkupSearchTool` enables querying the Linkup API for contextual information.
icon: link
---
# `LinkupSearchTool`
## Description
The `LinkupSearchTool` provides the ability to query the Linkup API for contextual information and retrieve structured results. This tool is ideal for enriching workflows with up-to-date and reliable information from Linkup, allowing agents to access relevant data during their tasks.
## Installation
To use this tool, you need to install the Linkup SDK:
```shell
uv add linkup-sdk
```
## Steps to Get Started
To effectively use the `LinkupSearchTool`, follow these steps:
1. **API Key**: Obtain a Linkup API key.
2. **Environment Setup**: Set up your environment with the API key.
3. **Install SDK**: Install the Linkup SDK using the command above.
## Example
The following example demonstrates how to initialize the tool and use it in an agent:
```python Code
from crewai_tools import LinkupSearchTool
from crewai import Agent
from crewai.project import agent
import os
# Initialize the tool with your API key
linkup_tool = LinkupSearchTool(api_key=os.getenv("LINKUP_API_KEY"))
# Define an agent that uses the tool
@agent
def researcher(self) -> Agent:
'''
This agent uses the LinkupSearchTool to retrieve contextual information
from the Linkup API.
'''
return Agent(
config=self.agents_config["researcher"],
tools=[linkup_tool]
)
```
## Parameters
The `LinkupSearchTool` accepts the following parameters:
### Constructor Parameters
- **api_key**: Required. Your Linkup API key.
### Run Parameters
- **query**: Required. The search term or phrase.
- **depth**: Optional. The search depth. Default is "standard".
- **output_type**: Optional. The type of output. Default is "searchResults".
## Advanced Usage
You can customize the search parameters for more specific results:
```python Code
# Perform a search with custom parameters
results = linkup_tool.run(
query="Women Nobel Prize Physics",
depth="deep",
output_type="searchResults"
)
```
## Return Format
The tool returns results in the following format:
```json
{
"success": true,
"results": [
{
"name": "Result Title",
"url": "https://example.com/result",
"content": "Content of the result..."
},
// Additional results...
]
}
```
If an error occurs, the response will be:
```json
{
"success": false,
"error": "Error message"
}
```
## Error Handling
The tool gracefully handles API errors and provides structured feedback. If the API request fails, the tool will return a dictionary with `success: false` and an error message.
## Conclusion
The `LinkupSearchTool` provides a seamless way to integrate Linkup's contextual information retrieval capabilities into your CrewAI agents. By leveraging this tool, agents can access relevant and up-to-date information to enhance their decision-making and task execution.

View File

@@ -1,146 +0,0 @@
---
title: LlamaIndex Tool
description: The `LlamaIndexTool` is a wrapper for LlamaIndex tools and query engines.
icon: address-book
---
# `LlamaIndexTool`
## Description
The `LlamaIndexTool` is designed to be a general wrapper around LlamaIndex tools and query engines, enabling you to leverage LlamaIndex resources in terms of RAG/agentic pipelines as tools to plug into CrewAI agents. This tool allows you to seamlessly integrate LlamaIndex's powerful data processing and retrieval capabilities into your CrewAI workflows.
## Installation
To use this tool, you need to install LlamaIndex:
```shell
uv add llama-index
```
## Steps to Get Started
To effectively use the `LlamaIndexTool`, follow these steps:
1. **Install LlamaIndex**: Install the LlamaIndex package using the command above.
2. **Set Up LlamaIndex**: Follow the [LlamaIndex documentation](https://docs.llamaindex.ai/) to set up a RAG/agent pipeline.
3. **Create a Tool or Query Engine**: Create a LlamaIndex tool or query engine that you want to use with CrewAI.
## Example
The following examples demonstrate how to initialize the tool from different LlamaIndex components:
### From a LlamaIndex Tool
```python Code
from crewai_tools import LlamaIndexTool
from crewai import Agent
from crewai.project import agent
from llama_index.core.tools import FunctionTool
# Example 1: Initialize from FunctionTool
def search_data(query: str) -> str:
"""Search for information in the data."""
# Your implementation here
return f"Results for: {query}"
# Create a LlamaIndex FunctionTool
og_tool = FunctionTool.from_defaults(
search_data,
name="DataSearchTool",
description="Search for information in the data"
)
# Wrap it with LlamaIndexTool
tool = LlamaIndexTool.from_tool(og_tool)
# Define an agent that uses the tool
@agent
def researcher(self) -> Agent:
'''
This agent uses the LlamaIndexTool to search for information.
'''
return Agent(
config=self.agents_config["researcher"],
tools=[tool]
)
```
### From LlamaHub Tools
```python Code
from crewai_tools import LlamaIndexTool
from llama_index.tools.wolfram_alpha import WolframAlphaToolSpec
# Initialize from LlamaHub Tools
wolfram_spec = WolframAlphaToolSpec(app_id="your_app_id")
wolfram_tools = wolfram_spec.to_tool_list()
tools = [LlamaIndexTool.from_tool(t) for t in wolfram_tools]
```
### From a LlamaIndex Query Engine
```python Code
from crewai_tools import LlamaIndexTool
from llama_index.core import VectorStoreIndex
from llama_index.core.readers import SimpleDirectoryReader
# Load documents
documents = SimpleDirectoryReader("./data").load_data()
# Create an index
index = VectorStoreIndex.from_documents(documents)
# Create a query engine
query_engine = index.as_query_engine()
# Create a LlamaIndexTool from the query engine
query_tool = LlamaIndexTool.from_query_engine(
query_engine,
name="Company Data Query Tool",
description="Use this tool to lookup information in company documents"
)
```
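Once wrapped, the tool can be attached to an agent like any other CrewAI tool, or invoked directly. A minimal sketch, assuming the wrapper forwards a plain query string to the underlying query engine:
```python Code
# Direct invocation (the question is illustrative)
answer = query_tool.run("What were last quarter's revenue highlights?")
print(answer)
```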
## Class Methods
The `LlamaIndexTool` provides two main class methods for creating instances:
### from_tool
Creates a `LlamaIndexTool` from a LlamaIndex tool.
```python Code
@classmethod
def from_tool(cls, tool: Any, **kwargs: Any) -> "LlamaIndexTool":
# Implementation details
```
### from_query_engine
Creates a `LlamaIndexTool` from a LlamaIndex query engine.
```python Code
@classmethod
def from_query_engine(
cls,
query_engine: Any,
name: Optional[str] = None,
description: Optional[str] = None,
return_direct: bool = False,
**kwargs: Any,
) -> "LlamaIndexTool":
# Implementation details
```
## Parameters
The `from_query_engine` method accepts the following parameters:
- **query_engine**: Required. The LlamaIndex query engine to wrap.
- **name**: Optional. The name of the tool.
- **description**: Optional. The description of the tool.
- **return_direct**: Optional. Whether to return the response directly. Default is `False`.
## Conclusion
The `LlamaIndexTool` provides a powerful way to integrate LlamaIndex's capabilities into CrewAI agents. By wrapping LlamaIndex tools and query engines, it enables agents to leverage sophisticated data retrieval and processing functionalities, enhancing their ability to work with complex information sources.

View File

@@ -1,128 +0,0 @@
---
title: MultiOn Tool
description: The `MultiOnTool` empowers CrewAI agents with the capability to navigate and interact with the web through natural language instructions.
icon: globe
---
# `MultiOnTool`
## Description
The `MultiOnTool` is designed to wrap [MultiOn's](https://docs.multion.ai/welcome) web browsing capabilities, enabling CrewAI agents to control web browsers using natural language instructions. This tool facilitates seamless web browsing, making it an essential asset for projects requiring dynamic web data interaction and automation of web-based tasks.
## Installation
To use this tool, you need to install the MultiOn package:
```shell
uv add multion
```
You'll also need to install the MultiOn browser extension and enable API usage.
## Steps to Get Started
To effectively use the `MultiOnTool`, follow these steps:
1. **Install CrewAI**: Ensure that the `crewai[tools]` package is installed in your Python environment.
2. **Install and use MultiOn**: Follow [MultiOn documentation](https://docs.multion.ai/learn/browser-extension) for installing the MultiOn Browser Extension.
3. **Enable API Usage**: Click on the MultiOn extension in the extensions folder of your browser (not the hovering MultiOn icon on the web page) to open the extension configurations. Click the API Enabled toggle to enable the API.
## Example
The following example demonstrates how to initialize the tool and execute a web browsing task:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools import MultiOnTool
# Initialize the tool
multion_tool = MultiOnTool(api_key="YOUR_MULTION_API_KEY", local=False)
# Define an agent that uses the tool
browser_agent = Agent(
role="Browser Agent",
goal="Control web browsers using natural language",
backstory="An expert browsing agent.",
tools=[multion_tool],
verbose=True,
)
# Example task to search and summarize news
browse_task = Task(
description="Summarize the top 3 trending AI News headlines",
expected_output="A summary of the top 3 trending AI News headlines",
agent=browser_agent,
)
# Create and run the crew
crew = Crew(agents=[browser_agent], tasks=[browse_task])
result = crew.kickoff()
```
## Parameters
The `MultiOnTool` accepts the following parameters during initialization:
- **api_key**: Optional. Specifies the MultiOn API key. If not provided, it will look for the `MULTION_API_KEY` environment variable.
- **local**: Optional. Set to `True` to run the agent locally on your browser. Make sure the MultiOn browser extension is installed and API Enabled is checked. Default is `False`.
- **max_steps**: Optional. Sets the maximum number of steps the MultiOn agent can take for a command. Default is `3`.
## Usage
When using the `MultiOnTool`, the agent will provide natural language instructions that the tool translates into web browsing actions. The tool returns the results of the browsing session along with a status.
```python Code
# Example of using the tool with an agent
browser_agent = Agent(
role="Web Browser Agent",
goal="Search for and summarize information from the web",
backstory="An expert at finding and extracting information from websites.",
tools=[multion_tool],
verbose=True,
)
# Create a task for the agent
search_task = Task(
description="Search for the latest AI news on TechCrunch and summarize the top 3 headlines",
expected_output="A summary of the top 3 AI news headlines from TechCrunch",
agent=browser_agent,
)
# Run the task
crew = Crew(agents=[browser_agent], tasks=[search_task])
result = crew.kickoff()
```
If the status returned is `CONTINUE`, the agent should be instructed to reissue the same instruction to continue execution.
## Implementation Details
The `MultiOnTool` is implemented as a subclass of `BaseTool` from CrewAI. It wraps the MultiOn client to provide web browsing capabilities:
```python Code
class MultiOnTool(BaseTool):
"""Tool to wrap MultiOn Browse Capabilities."""
name: str = "Multion Browse Tool"
description: str = """Multion gives the ability for LLMs to control web browsers using natural language instructions.
If the status is 'CONTINUE', reissue the same instruction to continue execution
"""
# Implementation details...
def _run(self, cmd: str, *args: Any, **kwargs: Any) -> str:
"""
Run the Multion client with the given command.
Args:
cmd (str): The detailed and specific natural language instruction for web browsing
*args (Any): Additional arguments to pass to the Multion client
**kwargs (Any): Additional keyword arguments to pass to the Multion client
"""
# Implementation details...
```
## Conclusion
The `MultiOnTool` provides a powerful way to integrate web browsing capabilities into CrewAI agents. By enabling agents to interact with websites through natural language instructions, it opens up a wide range of possibilities for web-based tasks, from data collection and research to automated interactions with web services.

View File

@@ -1,195 +0,0 @@
---
title: Patronus Evaluation Tools
description: The Patronus evaluation tools enable CrewAI agents to evaluate and score model inputs and outputs using the Patronus AI platform.
icon: check
---
# `Patronus Evaluation Tools`
## Description
The [Patronus evaluation tools](https://patronus.ai) are designed to enable CrewAI agents to evaluate and score model inputs and outputs using the Patronus AI platform. These tools provide different levels of control over the evaluation process, from allowing agents to select the most appropriate evaluator and criteria to using predefined criteria or custom local evaluators.
There are three main Patronus evaluation tools:
1. **PatronusEvalTool**: Allows agents to select the most appropriate evaluator and criteria for the evaluation task.
2. **PatronusPredefinedCriteriaEvalTool**: Uses predefined evaluator and criteria specified by the user.
3. **PatronusLocalEvaluatorTool**: Uses custom function evaluators defined by the user.
## Installation
To use these tools, you need to install the Patronus package:
```shell
uv add patronus
```
You'll also need to set up your Patronus API key as an environment variable:
```shell
export PATRONUS_API_KEY="your_patronus_api_key"
```
## Steps to Get Started
To effectively use the Patronus evaluation tools, follow these steps:
1. **Install Patronus**: Install the Patronus package using the command above.
2. **Set Up API Key**: Set your Patronus API key as an environment variable.
3. **Choose the Right Tool**: Select the appropriate Patronus evaluation tool based on your needs.
4. **Configure the Tool**: Configure the tool with the necessary parameters.
## Examples
### Using PatronusEvalTool
The following example demonstrates how to use the `PatronusEvalTool`, which allows agents to select the most appropriate evaluator and criteria:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools import PatronusEvalTool
# Initialize the tool
patronus_eval_tool = PatronusEvalTool()
# Define an agent that uses the tool
coding_agent = Agent(
role="Coding Agent",
goal="Generate high quality code and verify that the output is code",
backstory="An experienced coder who can generate high quality python code.",
tools=[patronus_eval_tool],
verbose=True,
)
# Example task to generate and evaluate code
generate_code_task = Task(
description="Create a simple program to generate the first N numbers in the Fibonacci sequence. Select the most appropriate evaluator and criteria for evaluating your output.",
expected_output="Program that generates the first N numbers in the Fibonacci sequence.",
agent=coding_agent,
)
# Create and run the crew
crew = Crew(agents=[coding_agent], tasks=[generate_code_task])
result = crew.kickoff()
```
### Using PatronusPredefinedCriteriaEvalTool
The following example demonstrates how to use the `PatronusPredefinedCriteriaEvalTool`, which uses predefined evaluator and criteria:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools import PatronusPredefinedCriteriaEvalTool
# Initialize the tool with predefined criteria
patronus_eval_tool = PatronusPredefinedCriteriaEvalTool(
evaluators=[{"evaluator": "judge", "criteria": "contains-code"}]
)
# Define an agent that uses the tool
coding_agent = Agent(
role="Coding Agent",
goal="Generate high quality code",
backstory="An experienced coder who can generate high quality python code.",
tools=[patronus_eval_tool],
verbose=True,
)
# Example task to generate code
generate_code_task = Task(
description="Create a simple program to generate the first N numbers in the Fibonacci sequence.",
expected_output="Program that generates the first N numbers in the Fibonacci sequence.",
agent=coding_agent,
)
# Create and run the crew
crew = Crew(agents=[coding_agent], tasks=[generate_code_task])
result = crew.kickoff()
```
### Using PatronusLocalEvaluatorTool
The following example demonstrates how to use the `PatronusLocalEvaluatorTool`, which uses custom function evaluators:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools import PatronusLocalEvaluatorTool
from patronus import Client, EvaluationResult
import random
# Initialize the Patronus client
client = Client()
# Register a custom evaluator
@client.register_local_evaluator("random_evaluator")
def random_evaluator(**kwargs):
score = random.random()
return EvaluationResult(
score_raw=score,
pass_=score >= 0.5,
explanation="example explanation",
)
# Initialize the tool with the custom evaluator
patronus_eval_tool = PatronusLocalEvaluatorTool(
patronus_client=client,
evaluator="random_evaluator",
evaluated_model_gold_answer="example label",
)
# Define an agent that uses the tool
coding_agent = Agent(
role="Coding Agent",
goal="Generate high quality code",
backstory="An experienced coder who can generate high quality python code.",
tools=[patronus_eval_tool],
verbose=True,
)
# Example task to generate code
generate_code_task = Task(
description="Create a simple program to generate the first N numbers in the Fibonacci sequence.",
expected_output="Program that generates the first N numbers in the Fibonacci sequence.",
agent=coding_agent,
)
# Create and run the crew
crew = Crew(agents=[coding_agent], tasks=[generate_code_task])
result = crew.kickoff()
```
## Parameters
### PatronusEvalTool
The `PatronusEvalTool` does not require any parameters during initialization. It automatically fetches available evaluators and criteria from the Patronus API.
### PatronusPredefinedCriteriaEvalTool
The `PatronusPredefinedCriteriaEvalTool` accepts the following parameters during initialization:
- **evaluators**: Required. A list of dictionaries containing the evaluator and criteria to use. For example: `[{"evaluator": "judge", "criteria": "contains-code"}]`.
### PatronusLocalEvaluatorTool
The `PatronusLocalEvaluatorTool` accepts the following parameters during initialization:
- **patronus_client**: Required. The Patronus client instance.
- **evaluator**: Optional. The name of the registered local evaluator to use. Default is an empty string.
- **evaluated_model_gold_answer**: Optional. The gold answer to use for evaluation. Default is an empty string.
## Usage
When using the Patronus evaluation tools, you provide the model input, output, and context, and the tool returns the evaluation results from the Patronus API.
For the `PatronusEvalTool` and `PatronusPredefinedCriteriaEvalTool`, the following parameters are required when calling the tool:
- **evaluated_model_input**: The agent's task description in simple text.
- **evaluated_model_output**: The agent's output of the task.
- **evaluated_model_retrieved_context**: The agent's context.
For the `PatronusLocalEvaluatorTool`, the same parameters are required, but the evaluator and gold answer are specified during initialization.
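As a minimal sketch of a direct call with these parameters (the input, output, and context strings are illustrative):
```python Code
evaluation = patronus_eval_tool.run(
    evaluated_model_input="Create a program that generates the Fibonacci sequence.",
    evaluated_model_output="def fib(n): ...",
    evaluated_model_retrieved_context="",
)
print(evaluation)
```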
## Conclusion
The Patronus evaluation tools provide a powerful way to evaluate and score model inputs and outputs using the Patronus AI platform. By enabling agents to evaluate their own outputs or the outputs of other agents, these tools can help improve the quality and reliability of CrewAI workflows.

View File

@@ -1,271 +0,0 @@
---
title: 'Qdrant Vector Search Tool'
description: 'Semantic search capabilities for CrewAI agents using Qdrant vector database'
icon: magnifying-glass-plus
---
# `QdrantVectorSearchTool`
The Qdrant Vector Search Tool enables semantic search capabilities in your CrewAI agents by leveraging [Qdrant](https://qdrant.tech/), a vector similarity search engine. This tool allows your agents to search through documents stored in a Qdrant collection using semantic similarity.
## Installation
Install the required packages:
```bash
uv add qdrant-client
```
## Basic Usage
Here's a minimal example of how to use the tool:
```python
from crewai import Agent
from crewai_tools import QdrantVectorSearchTool
# Initialize the tool
qdrant_tool = QdrantVectorSearchTool(
qdrant_url="your_qdrant_url",
qdrant_api_key="your_qdrant_api_key",
collection_name="your_collection"
)
# Create an agent that uses the tool
agent = Agent(
role="Research Assistant",
goal="Find relevant information in documents",
tools=[qdrant_tool]
)
# The tool will automatically use OpenAI embeddings
# and return the 3 most relevant results with scores > 0.35
```
## Complete Working Example
Here's a complete example showing how to:
1. Extract text from a PDF
2. Generate embeddings using OpenAI
3. Store in Qdrant
4. Create a CrewAI agentic RAG workflow for semantic search
```python
import os
import uuid
import pdfplumber
from openai import OpenAI
from dotenv import load_dotenv
from crewai import Agent, Task, Crew, Process, LLM
from crewai_tools import QdrantVectorSearchTool
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, Distance, VectorParams
# Load environment variables
load_dotenv()
# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Extract text from PDF
def extract_text_from_pdf(pdf_path):
text = []
with pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
page_text = page.extract_text()
if page_text:
text.append(page_text.strip())
return text
# Generate OpenAI embeddings
def get_openai_embedding(text):
response = client.embeddings.create(
input=text,
model="text-embedding-3-small"
)
return response.data[0].embedding
# Store text and embeddings in Qdrant
def load_pdf_to_qdrant(pdf_path, qdrant, collection_name):
# Extract text from PDF
text_chunks = extract_text_from_pdf(pdf_path)
# Create Qdrant collection
if qdrant.collection_exists(collection_name):
qdrant.delete_collection(collection_name)
qdrant.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
)
# Store embeddings
points = []
for chunk in text_chunks:
embedding = get_openai_embedding(chunk)
points.append(PointStruct(
id=str(uuid.uuid4()),
vector=embedding,
payload={"text": chunk}
))
qdrant.upsert(collection_name=collection_name, points=points)
# Initialize Qdrant client and load data
qdrant = QdrantClient(
url=os.getenv("QDRANT_URL"),
api_key=os.getenv("QDRANT_API_KEY")
)
collection_name = "example_collection"
pdf_path = "path/to/your/document.pdf"
load_pdf_to_qdrant(pdf_path, qdrant, collection_name)
# Initialize Qdrant search tool
qdrant_tool = QdrantVectorSearchTool(
qdrant_url=os.getenv("QDRANT_URL"),
qdrant_api_key=os.getenv("QDRANT_API_KEY"),
collection_name=collection_name,
limit=3,
score_threshold=0.35
)
# Create CrewAI agents
search_agent = Agent(
role="Senior Semantic Search Agent",
goal="Find and analyze documents based on semantic search",
backstory="""You are an expert research assistant who can find relevant
information using semantic search in a Qdrant database.""",
tools=[qdrant_tool],
verbose=True
)
answer_agent = Agent(
role="Senior Answer Assistant",
goal="Generate answers to questions based on the context provided",
backstory="""You are an expert answer assistant who can generate
answers to questions based on the context provided.""",
tools=[qdrant_tool],
verbose=True
)
# Define tasks
search_task = Task(
description="""Search for relevant documents about the {query}.
Your final answer should include:
- The relevant information found
- The similarity scores of the results
- The metadata of the relevant documents""",
    expected_output="The relevant information found, along with similarity scores and document metadata.",
    agent=search_agent
)
answer_task = Task(
description="""Given the context and metadata of relevant documents,
generate a final answer based on the context.""",
    expected_output="A final answer to the question, grounded in the retrieved context.",
    agent=answer_agent
)
# Run CrewAI workflow
crew = Crew(
agents=[search_agent, answer_agent],
tasks=[search_task, answer_task],
process=Process.sequential,
verbose=True
)
result = crew.kickoff(
inputs={"query": "What is the role of X in the document?"}
)
print(result)
```
## Tool Parameters
### Required Parameters
- `qdrant_url` (str): The URL of your Qdrant server
- `qdrant_api_key` (str): API key for authentication with Qdrant
- `collection_name` (str): Name of the Qdrant collection to search
### Optional Parameters
- `limit` (int): Maximum number of results to return (default: 3)
- `score_threshold` (float): Minimum similarity score threshold (default: 0.35)
- `custom_embedding_fn` (Callable[[str], list[float]]): Custom function for text vectorization
## Search Parameters
The tool accepts these parameters in its schema:
- `query` (str): The search query to find similar documents
- `filter_by` (str, optional): Metadata field to filter on
- `filter_value` (str, optional): Value to filter by
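A minimal sketch of a filtered search using these parameters; the metadata field `source` and its value are illustrative placeholders that depend on what you stored in the collection:
```python
results = qdrant_tool.run(
    query="database connection settings",
    filter_by="source",        # metadata field to filter on
    filter_value="app-config"  # value that field must match
)
print(results)
```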
## Return Format
The tool returns results in JSON format:
```json
[
{
"metadata": {
// Any metadata stored with the document
},
"context": "The actual text content of the document",
"distance": 0.95 // Similarity score
}
]
```
## Default Embedding
By default, the tool uses OpenAI's `text-embedding-3-small` model for vectorization. This requires:
- OpenAI API key set in environment: `OPENAI_API_KEY`
## Custom Embeddings
Instead of using the default embedding model, you might want to use your own embedding function in cases where you:
1. Want to use a different embedding model (e.g., Cohere, HuggingFace, Ollama models)
2. Need to reduce costs by using open-source embedding models
3. Have specific requirements for vector dimensions or embedding quality
4. Want to use domain-specific embeddings (e.g., for medical or legal text)
Here's an example using a HuggingFace model:
```python
from transformers import AutoTokenizer, AutoModel
import torch
# Load model and tokenizer
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
model = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
def custom_embeddings(text: str) -> list[float]:
# Tokenize and get model outputs
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
outputs = model(**inputs)
# Use mean pooling to get text embedding
embeddings = outputs.last_hidden_state.mean(dim=1)
# Convert to list of floats and return
return embeddings[0].tolist()
# Use custom embeddings with the tool
tool = QdrantVectorSearchTool(
qdrant_url="your_url",
qdrant_api_key="your_key",
collection_name="your_collection",
custom_embedding_fn=custom_embeddings # Pass your custom function
)
```
## Error Handling
The tool handles these specific errors:
- Raises ImportError if `qdrant-client` is not installed (with option to auto-install)
- Raises ValueError if `QDRANT_URL` is not set
- Prompts to install `qdrant-client` if missing using `uv add qdrant-client`
## Environment Variables
Required environment variables:
```bash
export QDRANT_URL="your_qdrant_url" # If not provided in constructor
export QDRANT_API_KEY="your_api_key" # If not provided in constructor
export OPENAI_API_KEY="your_openai_key" # If using default embeddings
```

View File

@@ -1,154 +0,0 @@
---
title: RAG Tool
description: The `RagTool` is a dynamic knowledge base tool for answering questions using Retrieval-Augmented Generation.
icon: vector-square
---
# `RagTool`
## Description
The `RagTool` is designed to answer questions by leveraging the power of Retrieval-Augmented Generation (RAG) through EmbedChain.
It provides a dynamic knowledge base that can be queried to retrieve relevant information from various data sources.
This tool is particularly useful for applications that require access to a vast array of information and need to provide contextually relevant answers.
## Example
The following example demonstrates how to initialize the tool and use it with different data sources:
```python Code
from crewai import Agent
from crewai.project import agent
from crewai_tools import RagTool
# Create a RAG tool with default settings
rag_tool = RagTool()
# Add content from a file
rag_tool.add(data_type="file", path="path/to/your/document.pdf")
# Add content from a web page
rag_tool.add(data_type="web_page", url="https://example.com")
# Define an agent with the RagTool
@agent
def knowledge_expert(self) -> Agent:
'''
This agent uses the RagTool to answer questions about the knowledge base.
'''
return Agent(
config=self.agents_config["knowledge_expert"],
allow_delegation=False,
tools=[rag_tool]
)
```
## Supported Data Sources
The `RagTool` can be used with a wide variety of data sources, including:
- 📰 PDF files
- 📊 CSV files
- 📃 JSON files
- 📝 Text
- 📁 Directories/Folders
- 🌐 HTML Web pages
- 📽️ YouTube Channels
- 📺 YouTube Videos
- 📚 Documentation websites
- 📝 MDX files
- 📄 DOCX files
- 🧾 XML files
- 📬 Gmail
- 📝 GitHub repositories
- 🐘 PostgreSQL databases
- 🐬 MySQL databases
- 🤖 Slack conversations
- 💬 Discord messages
- 🗨️ Discourse forums
- 📝 Substack newsletters
- 🐝 Beehiiv content
- 💾 Dropbox files
- 🖼️ Images
- ⚙️ Custom data sources
## Parameters
The `RagTool` accepts the following parameters:
- **summarize**: Optional. Whether to summarize the retrieved content. Default is `False`.
- **adapter**: Optional. A custom adapter for the knowledge base. If not provided, an EmbedchainAdapter will be used.
- **config**: Optional. Configuration for the underlying EmbedChain App.
## Adding Content
You can add content to the knowledge base using the `add` method:
```python Code
# Add a PDF file
rag_tool.add(data_type="file", path="path/to/your/document.pdf")
# Add a web page
rag_tool.add(data_type="web_page", url="https://example.com")
# Add a YouTube video
rag_tool.add(data_type="youtube_video", url="https://www.youtube.com/watch?v=VIDEO_ID")
# Add a directory of files
rag_tool.add(data_type="directory", path="path/to/your/directory")
```
## Agent Integration Example
Here's how to integrate the `RagTool` with a CrewAI agent:
```python Code
from crewai import Agent
from crewai.project import agent
from crewai_tools import RagTool
# Initialize the tool and add content
rag_tool = RagTool()
rag_tool.add(data_type="web_page", url="https://docs.crewai.com")
rag_tool.add(data_type="file", path="company_data.pdf")
# Define an agent with the RagTool
@agent
def knowledge_expert(self) -> Agent:
return Agent(
config=self.agents_config["knowledge_expert"],
allow_delegation=False,
tools=[rag_tool]
)
```
## Advanced Configuration
You can customize the behavior of the `RagTool` by providing a configuration dictionary:
```python Code
from crewai_tools import RagTool
# Create a RAG tool with custom configuration
config = {
"app": {
"name": "custom_app",
},
"llm": {
"provider": "openai",
"config": {
"model": "gpt-4",
}
},
"embedder": {
"provider": "openai",
"config": {
"model": "text-embedding-ada-002"
}
}
}
rag_tool = RagTool(config=config, summarize=True)
```
## Conclusion
The `RagTool` provides a powerful way to create and query knowledge bases from various data sources. By leveraging Retrieval-Augmented Generation, it enables agents to access and retrieve relevant information efficiently, enhancing their ability to provide accurate and contextually appropriate responses.

View File

@@ -1,144 +0,0 @@
---
title: S3 Reader Tool
description: The `S3ReaderTool` enables CrewAI agents to read files from Amazon S3 buckets.
icon: aws
---
# `S3ReaderTool`
## Description
The `S3ReaderTool` is designed to read files from Amazon S3 buckets. This tool allows CrewAI agents to access and retrieve content stored in S3, making it ideal for workflows that require reading data, configuration files, or any other content stored in AWS S3 storage.
## Installation
To use this tool, you need to install the required dependencies:
```shell
uv add boto3
```
## Steps to Get Started
To effectively use the `S3ReaderTool`, follow these steps:
1. **Install Dependencies**: Install the required packages using the command above.
2. **Configure AWS Credentials**: Set up your AWS credentials as environment variables.
3. **Initialize the Tool**: Create an instance of the tool.
4. **Specify S3 Path**: Provide the S3 path to the file you want to read.
## Example
The following example demonstrates how to use the `S3ReaderTool` to read a file from an S3 bucket:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools.aws.s3 import S3ReaderTool
# Initialize the tool
s3_reader_tool = S3ReaderTool()
# Define an agent that uses the tool
file_reader_agent = Agent(
role="File Reader",
goal="Read files from S3 buckets",
backstory="An expert in retrieving and processing files from cloud storage.",
tools=[s3_reader_tool],
verbose=True,
)
# Example task to read a configuration file
read_task = Task(
description="Read the configuration file from {my_bucket} and summarize its contents.",
expected_output="A summary of the configuration file contents.",
agent=file_reader_agent,
)
# Create and run the crew
crew = Crew(agents=[file_reader_agent], tasks=[read_task])
result = crew.kickoff(inputs={"my_bucket": "s3://my-bucket/config/app-config.json"})
```
## Parameters
The `S3ReaderTool` accepts the following parameter when used by an agent:
- **file_path**: Required. The S3 file path in the format `s3://bucket-name/file-name`.
## AWS Credentials
The tool requires AWS credentials to access S3 buckets. You can configure these credentials using environment variables:
- **CREW_AWS_REGION**: The AWS region where your S3 bucket is located. Default is `us-east-1`.
- **CREW_AWS_ACCESS_KEY_ID**: Your AWS access key ID.
- **CREW_AWS_SEC_ACCESS_KEY**: Your AWS secret access key.
## Usage
When using the `S3ReaderTool` with an agent, the agent will need to provide the S3 file path:
```python Code
# Example of using the tool with an agent
file_reader_agent = Agent(
role="File Reader",
goal="Read files from S3 buckets",
backstory="An expert in retrieving and processing files from cloud storage.",
tools=[s3_reader_tool],
verbose=True,
)
# Create a task for the agent to read a specific file
read_config_task = Task(
description="Read the application configuration file from {my_bucket} and extract the database connection settings.",
expected_output="The database connection settings from the configuration file.",
agent=file_reader_agent,
)
# Run the task
crew = Crew(agents=[file_reader_agent], tasks=[read_config_task])
result = crew.kickoff(inputs={"my_bucket": "s3://my-bucket/config/app-config.json"})
```
## Error Handling
The `S3ReaderTool` includes error handling for common S3 issues:
- Invalid S3 path format
- Missing or inaccessible files
- Permission issues
- AWS credential problems
When an error occurs, the tool will return an error message that includes details about the issue.
## Implementation Details
The `S3ReaderTool` uses the AWS SDK for Python (boto3) to interact with S3:
```python Code
class S3ReaderTool(BaseTool):
name: str = "S3 Reader Tool"
description: str = "Reads a file from Amazon S3 given an S3 file path"
def _run(self, file_path: str) -> str:
try:
bucket_name, object_key = self._parse_s3_path(file_path)
s3 = boto3.client(
's3',
region_name=os.getenv('CREW_AWS_REGION', 'us-east-1'),
aws_access_key_id=os.getenv('CREW_AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('CREW_AWS_SEC_ACCESS_KEY')
)
# Read file content from S3
response = s3.get_object(Bucket=bucket_name, Key=object_key)
file_content = response['Body'].read().decode('utf-8')
return file_content
except ClientError as e:
return f"Error reading file from S3: {str(e)}"
```
## Conclusion
The `S3ReaderTool` provides a straightforward way to read files from Amazon S3 buckets. By enabling agents to access content stored in S3, it facilitates workflows that require cloud-based file access. This tool is particularly useful for data processing, configuration management, and any task that involves retrieving information from AWS S3 storage.

View File

@@ -1,150 +0,0 @@
---
title: S3 Writer Tool
description: The `S3WriterTool` enables CrewAI agents to write content to files in Amazon S3 buckets.
icon: aws
---
# `S3WriterTool`
## Description
The `S3WriterTool` is designed to write content to files in Amazon S3 buckets. This tool allows CrewAI agents to create or update files in S3, making it ideal for workflows that require storing data, saving configuration files, or persisting any other content to AWS S3 storage.
## Installation
To use this tool, you need to install the required dependencies:
```shell
uv add boto3
```
## Steps to Get Started
To effectively use the `S3WriterTool`, follow these steps:
1. **Install Dependencies**: Install the required packages using the command above.
2. **Configure AWS Credentials**: Set up your AWS credentials as environment variables.
3. **Initialize the Tool**: Create an instance of the tool.
4. **Specify S3 Path and Content**: Provide the S3 path where you want to write the file and the content to be written.
## Example
The following example demonstrates how to use the `S3WriterTool` to write content to a file in an S3 bucket:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools.aws.s3 import S3WriterTool
# Initialize the tool
s3_writer_tool = S3WriterTool()
# Define an agent that uses the tool
file_writer_agent = Agent(
role="File Writer",
goal="Write content to files in S3 buckets",
backstory="An expert in storing and managing files in cloud storage.",
tools=[s3_writer_tool],
verbose=True,
)
# Example task to write a report
write_task = Task(
description="Generate a summary report of the quarterly sales data and save it to {my_bucket}.",
expected_output="Confirmation that the report was successfully saved to S3.",
agent=file_writer_agent,
)
# Create and run the crew
crew = Crew(agents=[file_writer_agent], tasks=[write_task])
result = crew.kickoff(inputs={"my_bucket": "s3://my-bucket/reports/quarterly-summary.txt"})
```
## Parameters
The `S3WriterTool` accepts the following parameters when used by an agent:
- **file_path**: Required. The S3 file path in the format `s3://bucket-name/file-name`.
- **content**: Required. The content to write to the file.
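A minimal sketch of calling the tool directly with these two parameters (the bucket path and content are illustrative):
```python Code
from crewai_tools.aws.s3 import S3WriterTool

s3_writer_tool = S3WriterTool()

status = s3_writer_tool.run(
    file_path="s3://my-bucket/reports/summary.txt",
    content="Quarterly sales were up 12% year over year.",
)
print(status)  # e.g. "Successfully wrote content to s3://my-bucket/reports/summary.txt"
```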
## AWS Credentials
The tool requires AWS credentials to access S3 buckets. You can configure these credentials using environment variables:
- **CREW_AWS_REGION**: The AWS region where your S3 bucket is located. Default is `us-east-1`.
- **CREW_AWS_ACCESS_KEY_ID**: Your AWS access key ID.
- **CREW_AWS_SEC_ACCESS_KEY**: Your AWS secret access key.
## Usage
When using the `S3WriterTool` with an agent, the agent will need to provide both the S3 file path and the content to write:
```python Code
# Example of using the tool with an agent
file_writer_agent = Agent(
role="File Writer",
goal="Write content to files in S3 buckets",
backstory="An expert in storing and managing files in cloud storage.",
tools=[s3_writer_tool],
verbose=True,
)
# Create a task for the agent to write a specific file
write_config_task = Task(
description="""
Create a configuration file with the following database settings:
- host: db.example.com
- port: 5432
- username: app_user
- password: secure_password
Save this configuration as JSON to {my_bucket}.
""",
expected_output="Confirmation that the configuration file was successfully saved to S3.",
agent=file_writer_agent,
)
# Run the task
crew = Crew(agents=[file_writer_agent], tasks=[write_config_task])
result = crew.kickoff(inputs={"my_bucket": "s3://my-bucket/config/db-config.json"})
```
## Error Handling
The `S3WriterTool` includes error handling for common S3 issues:
- Invalid S3 path format
- Permission issues (e.g., no write access to the bucket)
- AWS credential problems
- Bucket does not exist
When an error occurs, the tool will return an error message that includes details about the issue.
## Implementation Details
The `S3WriterTool` uses the AWS SDK for Python (boto3) to interact with S3:
```python Code
class S3WriterTool(BaseTool):
name: str = "S3 Writer Tool"
description: str = "Writes content to a file in Amazon S3 given an S3 file path"
def _run(self, file_path: str, content: str) -> str:
try:
bucket_name, object_key = self._parse_s3_path(file_path)
s3 = boto3.client(
's3',
region_name=os.getenv('CREW_AWS_REGION', 'us-east-1'),
aws_access_key_id=os.getenv('CREW_AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('CREW_AWS_SEC_ACCESS_KEY')
)
s3.put_object(Bucket=bucket_name, Key=object_key, Body=content.encode('utf-8'))
return f"Successfully wrote content to {file_path}"
except ClientError as e:
return f"Error writing file to S3: {str(e)}"
```
## Conclusion
The `S3WriterTool` provides a straightforward way to write content to files in Amazon S3 buckets. By enabling agents to create and update files in S3, it facilitates workflows that require cloud-based file storage. This tool is particularly useful for data persistence, configuration management, report generation, and any task that involves storing information in AWS S3 storage.

View File

@@ -1,139 +0,0 @@
---
title: Scrape Element From Website Tool
description: The `ScrapeElementFromWebsiteTool` enables CrewAI agents to extract specific elements from websites using CSS selectors.
icon: code
---
# `ScrapeElementFromWebsiteTool`
## Description
The `ScrapeElementFromWebsiteTool` is designed to extract specific elements from websites using CSS selectors. This tool allows CrewAI agents to scrape targeted content from web pages, making it useful for data extraction tasks where only specific parts of a webpage are needed.
## Installation
To use this tool, you need to install the required dependencies:
```shell
uv add requests beautifulsoup4
```
## Steps to Get Started
To effectively use the `ScrapeElementFromWebsiteTool`, follow these steps:
1. **Install Dependencies**: Install the required packages using the command above.
2. **Identify CSS Selectors**: Determine the CSS selectors for the elements you want to extract from the website.
3. **Initialize the Tool**: Create an instance of the tool with the necessary parameters.
## Example
The following example demonstrates how to use the `ScrapeElementFromWebsiteTool` to extract specific elements from a website:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools import ScrapeElementFromWebsiteTool
# Initialize the tool
scrape_tool = ScrapeElementFromWebsiteTool()
# Define an agent that uses the tool
web_scraper_agent = Agent(
role="Web Scraper",
goal="Extract specific information from websites",
backstory="An expert in web scraping who can extract targeted content from web pages.",
tools=[scrape_tool],
verbose=True,
)
# Example task to extract headlines from a news website
scrape_task = Task(
description="Extract the main headlines from the CNN homepage. Use the CSS selector '.headline' to target the headline elements.",
expected_output="A list of the main headlines from CNN.",
agent=web_scraper_agent,
)
# Create and run the crew
crew = Crew(agents=[web_scraper_agent], tasks=[scrape_task])
result = crew.kickoff()
```
You can also initialize the tool with predefined parameters:
```python Code
# Initialize the tool with predefined parameters
scrape_tool = ScrapeElementFromWebsiteTool(
website_url="https://www.example.com",
css_element=".main-content"
)
```
## Parameters
The `ScrapeElementFromWebsiteTool` accepts the following parameters during initialization:
- **website_url**: Optional. The URL of the website to scrape. If provided during initialization, the agent won't need to specify it when using the tool.
- **css_element**: Optional. The CSS selector for the elements to extract. If provided during initialization, the agent won't need to specify it when using the tool.
- **cookies**: Optional. A dictionary containing cookies to be sent with the request. This can be useful for websites that require authentication; see the sketch below.
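For instance, a hypothetical authenticated-scraping setup might pass a session cookie at initialization (the URL, selector, and cookie values here are placeholders):

```python Code
# Initialize the tool with cookies for a site that requires a session
scrape_tool = ScrapeElementFromWebsiteTool(
    website_url="https://www.example.com/account",
    css_element=".order-history",
    cookies={"sessionid": "your-session-cookie"},
)
```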
## Usage
When using the `ScrapeElementFromWebsiteTool` with an agent, the agent will need to provide the following parameters (unless they were specified during initialization):
- **website_url**: The URL of the website to scrape.
- **css_element**: The CSS selector for the elements to extract.
The tool will return the text content of all elements matching the CSS selector, joined by newlines.
```python Code
# Example of using the tool with an agent
web_scraper_agent = Agent(
role="Web Scraper",
goal="Extract specific elements from websites",
backstory="An expert in web scraping who can extract targeted content using CSS selectors.",
tools=[scrape_tool],
verbose=True,
)
# Create a task for the agent to extract specific elements
extract_task = Task(
description="""
Extract all product titles from the featured products section on example.com.
Use the CSS selector '.product-title' to target the title elements.
""",
expected_output="A list of product titles from the website",
agent=web_scraper_agent,
)
# Run the task through a crew
crew = Crew(agents=[web_scraper_agent], tasks=[extract_task])
result = crew.kickoff()
```
## Implementation Details
The `ScrapeElementFromWebsiteTool` uses the `requests` library to fetch the web page and `BeautifulSoup` to parse the HTML and extract the specified elements:
```python Code
from typing import Any

import requests
from bs4 import BeautifulSoup
from crewai.tools import BaseTool

class ScrapeElementFromWebsiteTool(BaseTool):
name: str = "Read a website content"
description: str = "A tool that can be used to read a website content."
# Implementation details...
def _run(self, **kwargs: Any) -> Any:
website_url = kwargs.get("website_url", self.website_url)
css_element = kwargs.get("css_element", self.css_element)
page = requests.get(
website_url,
headers=self.headers,
cookies=self.cookies if self.cookies else {},
)
parsed = BeautifulSoup(page.content, "html.parser")
elements = parsed.select(css_element)
return "\n".join([element.get_text() for element in elements])
```
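To see the same extraction logic outside of CrewAI, here is a minimal standalone sketch using only `requests` and `BeautifulSoup` (the URL and selector are placeholders):

```python Code
import requests
from bs4 import BeautifulSoup

page = requests.get("https://example.com")
parsed = BeautifulSoup(page.content, "html.parser")
# Select elements by CSS selector and join their text, as the tool does
print("\n".join(element.get_text() for element in parsed.select("h1")))
```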
## Conclusion
The `ScrapeElementFromWebsiteTool` provides a powerful way to extract specific elements from websites using CSS selectors. By enabling agents to target only the content they need, it makes web scraping tasks more efficient and focused. This tool is particularly useful for data extraction, content monitoring, and research tasks where specific information needs to be extracted from web pages.

View File

@@ -1,196 +0,0 @@
---
title: Scrapegraph Scrape Tool
description: The `ScrapegraphScrapeTool` leverages Scrapegraph AI's SmartScraper API to intelligently extract content from websites.
icon: chart-area
---
# `ScrapegraphScrapeTool`
## Description
The `ScrapegraphScrapeTool` is designed to leverage Scrapegraph AI's SmartScraper API to intelligently extract content from websites. This tool provides advanced web scraping capabilities with AI-powered content extraction, making it ideal for targeted data collection and content analysis tasks. Unlike traditional web scrapers, it can understand the context and structure of web pages to extract the most relevant information based on natural language prompts.
## Installation
To use this tool, you need to install the Scrapegraph Python client:
```shell
uv add scrapegraph-py
```
You'll also need to set up your Scrapegraph API key as an environment variable:
```shell
export SCRAPEGRAPH_API_KEY="your_api_key"
```
You can obtain an API key from [Scrapegraph AI](https://scrapegraphai.com).
## Steps to Get Started
To effectively use the `ScrapegraphScrapeTool`, follow these steps:
1. **Install Dependencies**: Install the required package using the command above.
2. **Set Up API Key**: Set your Scrapegraph API key as an environment variable or provide it during initialization.
3. **Initialize the Tool**: Create an instance of the tool with the necessary parameters.
4. **Define Extraction Prompts**: Create natural language prompts to guide the extraction of specific content.
## Example
The following example demonstrates how to use the `ScrapegraphScrapeTool` to extract content from a website:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools import ScrapegraphScrapeTool
# Initialize the tool
scrape_tool = ScrapegraphScrapeTool(api_key="your_api_key")
# Define an agent that uses the tool
web_scraper_agent = Agent(
role="Web Scraper",
goal="Extract specific information from websites",
backstory="An expert in web scraping who can extract targeted content from web pages.",
tools=[scrape_tool],
verbose=True,
)
# Example task to extract product information from an e-commerce site
scrape_task = Task(
description="Extract product names, prices, and descriptions from the featured products section of example.com.",
expected_output="A structured list of product information including names, prices, and descriptions.",
agent=web_scraper_agent,
)
# Create and run the crew
crew = Crew(agents=[web_scraper_agent], tasks=[scrape_task])
result = crew.kickoff()
```
You can also initialize the tool with predefined parameters:
```python Code
# Initialize the tool with predefined parameters
scrape_tool = ScrapegraphScrapeTool(
website_url="https://www.example.com",
user_prompt="Extract all product prices and descriptions",
api_key="your_api_key"
)
```
## Parameters
The `ScrapegraphScrapeTool` accepts the following parameters during initialization:
- **api_key**: Optional. Your Scrapegraph API key. If not provided, it will look for the `SCRAPEGRAPH_API_KEY` environment variable.
- **website_url**: Optional. The URL of the website to scrape. If provided during initialization, the agent won't need to specify it when using the tool.
- **user_prompt**: Optional. Custom instructions for content extraction. If provided during initialization, the agent won't need to specify it when using the tool.
- **enable_logging**: Optional. Whether to enable logging for the Scrapegraph client. Default is `False`.
## Usage
When using the `ScrapegraphScrapeTool` with an agent, the agent will need to provide the following parameters (unless they were specified during initialization):
- **website_url**: The URL of the website to scrape.
- **user_prompt**: Optional. Custom instructions for content extraction. Default is "Extract the main content of the webpage".
The tool will return the extracted content based on the provided prompt.
```python Code
# Example of using the tool with an agent
web_scraper_agent = Agent(
role="Web Scraper",
goal="Extract specific information from websites",
backstory="An expert in web scraping who can extract targeted content from web pages.",
tools=[scrape_tool],
verbose=True,
)
# Create a task for the agent to extract specific content
extract_task = Task(
description="Extract the main heading and summary from example.com",
expected_output="The main heading and summary from the website",
agent=web_scraper_agent,
)
# Run the task
crew = Crew(agents=[web_scraper_agent], tasks=[extract_task])
result = crew.kickoff()
```
## Error Handling
The `ScrapegraphScrapeTool` may raise the following exceptions:
- **ValueError**: When API key is missing or URL format is invalid.
- **RateLimitError**: When API rate limits are exceeded.
- **RuntimeError**: When scraping operation fails (network issues, API errors).
It's recommended to instruct agents to handle potential errors gracefully:
```python Code
# Create a task that includes error handling instructions
robust_extract_task = Task(
description="""
Extract the main heading from example.com.
Be aware that you might encounter errors such as:
- Invalid URL format
- Missing API key
- Rate limit exceeded
- Network or API errors
If you encounter any errors, provide a clear explanation of what went wrong
and suggest possible solutions.
""",
expected_output="Either the extracted heading or a clear error explanation",
agent=web_scraper_agent,
)
```
## Rate Limiting
The Scrapegraph API has rate limits that vary based on your subscription plan. Consider the following best practices:
- Implement appropriate delays between requests when processing multiple URLs (see the sketch after this list).
- Handle rate limit errors gracefully in your application.
- Check your API plan limits on the Scrapegraph dashboard.
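A minimal sketch of such a delay, in plain Python; the URL list and one-second delay are placeholders, and it assumes the `scrape_tool` instance from the examples above can be invoked directly via `run` with the same arguments agents use:

```python Code
import time

urls = ["https://example.com/a", "https://example.com/b"]
for url in urls:
    result = scrape_tool.run(website_url=url, user_prompt="Extract the main content")
    print(result)
    time.sleep(1)  # simple fixed delay to stay under the rate limit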
## Implementation Details
The `ScrapegraphScrapeTool` uses the Scrapegraph Python client to interact with the SmartScraper API:
```python Code
class ScrapegraphScrapeTool(BaseTool):
"""
A tool that uses Scrapegraph AI to intelligently scrape website content.
"""
# Implementation details...
def _run(self, **kwargs: Any) -> Any:
website_url = kwargs.get("website_url", self.website_url)
user_prompt = (
kwargs.get("user_prompt", self.user_prompt)
or "Extract the main content of the webpage"
)
if not website_url:
raise ValueError("website_url is required")
# Validate URL format
self._validate_url(website_url)
try:
# Make the SmartScraper request
response = self._client.smartscraper(
website_url=website_url,
user_prompt=user_prompt,
)
return response
# Error handling...
```
## Conclusion
The `ScrapegraphScrapeTool` provides a powerful way to extract content from websites using AI-powered understanding of web page structure. By enabling agents to target specific information using natural language prompts, it makes web scraping tasks more efficient and focused. This tool is particularly useful for data extraction, content monitoring, and research tasks where specific information needs to be extracted from web pages.

View File

@@ -1,220 +0,0 @@
---
title: Scrapfly Scrape Website Tool
description: The `ScrapflyScrapeWebsiteTool` leverages Scrapfly's web scraping API to extract content from websites in various formats.
icon: spider
---
# `ScrapflyScrapeWebsiteTool`
## Description
The `ScrapflyScrapeWebsiteTool` is designed to leverage [Scrapfly](https://scrapfly.io/)'s web scraping API to extract content from websites. This tool provides advanced web scraping capabilities with headless browser support, proxies, and anti-bot bypass features. It allows for extracting web page data in various formats, including raw HTML, markdown, and plain text, making it ideal for a wide range of web scraping tasks.
## Installation
To use this tool, you need to install the Scrapfly SDK:
```shell
uv add scrapfly-sdk
```
You'll also need to obtain a Scrapfly API key by registering at [scrapfly.io/register](https://www.scrapfly.io/register/).
## Steps to Get Started
To effectively use the `ScrapflyScrapeWebsiteTool`, follow these steps:
1. **Install Dependencies**: Install the Scrapfly SDK using the command above.
2. **Obtain API Key**: Register at Scrapfly to get your API key.
3. **Initialize the Tool**: Create an instance of the tool with your API key.
4. **Configure Scraping Parameters**: Customize the scraping parameters based on your needs.
## Example
The following example demonstrates how to use the `ScrapflyScrapeWebsiteTool` to extract content from a website:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools import ScrapflyScrapeWebsiteTool
# Initialize the tool
scrape_tool = ScrapflyScrapeWebsiteTool(api_key="your_scrapfly_api_key")
# Define an agent that uses the tool
web_scraper_agent = Agent(
role="Web Scraper",
goal="Extract information from websites",
backstory="An expert in web scraping who can extract content from any website.",
tools=[scrape_tool],
verbose=True,
)
# Example task to extract content from a website
scrape_task = Task(
description="Extract the main content from the product page at https://web-scraping.dev/products and summarize the available products.",
expected_output="A summary of the products available on the website.",
agent=web_scraper_agent,
)
# Create and run the crew
crew = Crew(agents=[web_scraper_agent], tasks=[scrape_task])
result = crew.kickoff()
```
You can also customize the scraping parameters:
```python Code
# Example with custom scraping parameters
web_scraper_agent = Agent(
role="Web Scraper",
goal="Extract information from websites with custom parameters",
backstory="An expert in web scraping who can extract content from any website.",
tools=[scrape_tool],
verbose=True,
)
# The agent will use the tool with parameters like:
# url="https://web-scraping.dev/products"
# scrape_format="markdown"
# ignore_scrape_failures=True
# scrape_config={
# "asp": True, # Bypass scraping blocking solutions, like Cloudflare
# "render_js": True, # Enable JavaScript rendering with a cloud headless browser
# "proxy_pool": "public_residential_pool", # Select a proxy pool
# "country": "us", # Select a proxy location
# "auto_scroll": True, # Auto scroll the page
# }
scrape_task = Task(
description="Extract the main content from the product page at https://web-scraping.dev/products using advanced scraping options including JavaScript rendering and proxy settings.",
expected_output="A detailed summary of the products with all available information.",
agent=web_scraper_agent,
)
```
## Parameters
The `ScrapflyScrapeWebsiteTool` accepts the following parameters:
### Initialization Parameters
- **api_key**: Required. Your Scrapfly API key.
### Run Parameters
- **url**: Required. The URL of the website to scrape.
- **scrape_format**: Optional. The format in which to extract the web page content. Options are "raw" (HTML), "markdown", or "text". Default is "markdown".
- **scrape_config**: Optional. A dictionary containing additional Scrapfly scraping configuration options.
- **ignore_scrape_failures**: Optional. Whether to ignore failures during scraping. If set to `True`, the tool will return `None` instead of raising an exception when scraping fails.
## Scrapfly Configuration Options
The `scrape_config` parameter allows you to customize the scraping behavior with the following options:
- **asp**: Enable anti-scraping protection bypass.
- **render_js**: Enable JavaScript rendering with a cloud headless browser.
- **proxy_pool**: Select a proxy pool (e.g., "public_residential_pool", "datacenter").
- **country**: Select a proxy location (e.g., "us", "uk").
- **auto_scroll**: Automatically scroll the page to load lazy-loaded content.
- **js**: Execute custom JavaScript code by the headless browser.
For a complete list of configuration options, refer to the [Scrapfly API documentation](https://scrapfly.io/docs/scrape-api/getting-started).
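Outside of an agent, the same options can be passed when invoking the tool directly; a sketch, assuming the `scrape_tool` instance from the examples above and that `run` forwards these arguments to `_run`:

```python Code
content = scrape_tool.run(
    url="https://web-scraping.dev/products",
    scrape_format="markdown",
    ignore_scrape_failures=True,
    scrape_config={
        "asp": True,        # bypass anti-scraping protection
        "render_js": True,  # render JavaScript in a headless browser
        "country": "us",    # proxy location
    },
)
print(content)
```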
## Usage
When using the `ScrapflyScrapeWebsiteTool` with an agent, the agent will need to provide the URL of the website to scrape and can optionally specify the format and additional configuration options:
```python Code
# Example of using the tool with an agent
web_scraper_agent = Agent(
role="Web Scraper",
goal="Extract information from websites",
backstory="An expert in web scraping who can extract content from any website.",
tools=[scrape_tool],
verbose=True,
)
# Create a task for the agent
scrape_task = Task(
description="Extract the main content from example.com in markdown format.",
expected_output="The main content of example.com in markdown format.",
agent=web_scraper_agent,
)
# Run the task
crew = Crew(agents=[web_scraper_agent], tasks=[scrape_task])
result = crew.kickoff()
```
For more advanced usage with custom configuration:
```python Code
# Create a task with more specific instructions
advanced_scrape_task = Task(
description="""
Extract content from example.com with the following requirements:
- Convert the content to plain text format
- Enable JavaScript rendering
- Use a US-based proxy
- Handle any scraping failures gracefully
""",
expected_output="The extracted content from example.com",
agent=web_scraper_agent,
)
```
## Error Handling
By default, the `ScrapflyScrapeWebsiteTool` will raise an exception if scraping fails. Agents can be instructed to handle failures gracefully by specifying the `ignore_scrape_failures` parameter:
```python Code
# Create a task that instructs the agent to handle errors
error_handling_task = Task(
description="""
Extract content from a potentially problematic website and make sure to handle any
scraping failures gracefully by setting ignore_scrape_failures to True.
""",
expected_output="Either the extracted content or a graceful error message",
agent=web_scraper_agent,
)
```
## Implementation Details
The `ScrapflyScrapeWebsiteTool` uses the Scrapfly SDK to interact with the Scrapfly API:
```python Code
class ScrapflyScrapeWebsiteTool(BaseTool):
name: str = "Scrapfly web scraping API tool"
description: str = (
"Scrape a webpage url using Scrapfly and return its content as markdown or text"
)
# Implementation details...
def _run(
self,
url: str,
scrape_format: str = "markdown",
scrape_config: Optional[Dict[str, Any]] = None,
ignore_scrape_failures: Optional[bool] = None,
):
from scrapfly import ScrapeApiResponse, ScrapeConfig
scrape_config = scrape_config if scrape_config is not None else {}
try:
response: ScrapeApiResponse = self.scrapfly.scrape(
ScrapeConfig(url, format=scrape_format, **scrape_config)
)
return response.scrape_result["content"]
except Exception as e:
if ignore_scrape_failures:
logger.error(f"Error fetching data from {url}, exception: {e}")
return None
else:
raise e
```
## Conclusion
The `ScrapflyScrapeWebsiteTool` provides a powerful way to extract content from websites using Scrapfly's advanced web scraping capabilities. With features like headless browser support, proxies, and anti-bot bypass, it can handle complex websites and extract content in various formats. This tool is particularly useful for data extraction, content monitoring, and research tasks where reliable web scraping is required.

View File

@@ -13,183 +13,64 @@ icon: clipboard-user
## Description
The `SeleniumScrapingTool` is crafted for high-efficiency web scraping tasks.
It allows for precise extraction of content from web pages by using CSS selectors to target specific elements.
Its design caters to a wide range of scraping needs, offering flexibility to work with any provided website URL.
## Installation
To use this tool, you need to install the CrewAI tools package along with Selenium and webdriver-manager:
```shell
pip install 'crewai[tools]'
uv add selenium webdriver-manager
```
You'll also need to have Chrome installed on your system, as the tool uses Chrome WebDriver for browser automation.
## Example
The following example demonstrates how to use the `SeleniumScrapingTool` with a CrewAI agent:
```python Code
from crewai import Agent, Task, Crew, Process
from crewai_tools import SeleniumScrapingTool
# Initialize the tool
selenium_tool = SeleniumScrapingTool()
# Define an agent that uses the tool
web_scraper_agent = Agent(
    role="Web Scraper",
    goal="Extract information from websites using Selenium",
    backstory="An expert web scraper who can extract content from dynamic websites.",
    tools=[selenium_tool],
    verbose=True,
)
# Example task to scrape content from a website
scrape_task = Task(
    description="Extract the main content from the homepage of example.com. Use the CSS selector 'main' to target the main content area.",
    expected_output="The main content from example.com's homepage.",
    agent=web_scraper_agent,
)
# Create and run the crew
crew = Crew(
    agents=[web_scraper_agent],
    tasks=[scrape_task],
    verbose=True,
    process=Process.sequential,
)
result = crew.kickoff()
```
You can also initialize the tool with predefined parameters:
```python Code
# Initialize the tool with predefined parameters
selenium_tool = SeleniumScrapingTool(
    website_url='https://example.com',
    css_element='.main-content',
    cookie={'name': 'user', 'value': 'John Doe'},
    wait_time=10
)
```
## Parameters
The `SeleniumScrapingTool` accepts the following parameters during initialization:
- **website_url**: Optional. The URL of the website to scrape. If provided during initialization, the agent won't need to specify it when using the tool.
- **css_element**: Optional. The CSS selector for the elements to extract. If provided during initialization, the agent won't need to specify it when using the tool.
- **cookie**: Optional. A dictionary containing cookie information, useful for simulating a logged-in session to access restricted content.
- **wait_time**: Optional. Specifies the delay (in seconds) before scraping, allowing the website and any dynamic content to fully load. Default is `3` seconds.
- **return_html**: Optional. Whether to return the HTML content instead of just the text. Default is `False`.
When using the tool with an agent, the agent will need to provide the following parameters (unless they were specified during initialization):
- **website_url**: Required. The URL of the website to scrape.
- **css_element**: Required. The CSS selector for the elements to extract.
## Agent Integration Example
Here's a more detailed example of how to integrate the `SeleniumScrapingTool` with a CrewAI agent:
```python Code
from crewai import Agent, Task, Crew, Process
from crewai_tools import SeleniumScrapingTool
# Initialize the tool
selenium_tool = SeleniumScrapingTool()
# Define an agent that uses the tool
web_scraper_agent = Agent(
role="Web Scraper",
goal="Extract and analyze information from dynamic websites",
backstory="""You are an expert web scraper who specializes in extracting
content from dynamic websites that require browser automation. You have
extensive knowledge of CSS selectors and can identify the right selectors
to target specific content on any website.""",
tools=[selenium_tool],
verbose=True,
)
# Create a task for the agent
scrape_task = Task(
description="""
Extract the following information from the news website at {website_url}:
1. The headlines of all featured articles (CSS selector: '.headline')
2. The publication dates of these articles (CSS selector: '.pub-date')
3. The author names where available (CSS selector: '.author')
Compile this information into a structured format with each article's details grouped together.
""",
expected_output="A structured list of articles with their headlines, publication dates, and authors.",
agent=web_scraper_agent,
)
# Run the task
crew = Crew(
agents=[web_scraper_agent],
tasks=[scrape_task],
verbose=True,
process=Process.sequential,
)
result = crew.kickoff(inputs={"website_url": "https://news-example.com"})
```
## Implementation Details
The `SeleniumScrapingTool` uses Selenium WebDriver to automate browser interactions:
```python Code
class SeleniumScrapingTool(BaseTool):
name: str = "Read a website content"
description: str = "A tool that can be used to read a website content."
args_schema: Type[BaseModel] = SeleniumScrapingToolSchema
def _run(self, **kwargs: Any) -> Any:
website_url = kwargs.get("website_url", self.website_url)
css_element = kwargs.get("css_element", self.css_element)
return_html = kwargs.get("return_html", self.return_html)
driver = self._create_driver(website_url, self.cookie, self.wait_time)
content = self._get_content(driver, css_element, return_html)
driver.close()
return "\n".join(content)
```
The tool performs the following steps (a standalone sketch follows the list):
1. Creates a headless Chrome browser instance
2. Navigates to the specified URL
3. Waits for the specified time to allow the page to load
4. Adds any cookies if provided
5. Extracts content based on the CSS selector
6. Returns the extracted content as text or HTML
7. Closes the browser instance
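Here is a condensed, standalone sketch of that workflow using plain Selenium (assumes Chrome is installed and Selenium 4.6+, which manages the driver automatically; cookie handling, step 4, is omitted for brevity):

```python Code
import time
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By

options = Options()
options.add_argument("--headless")                      # 1. headless Chrome instance
driver = webdriver.Chrome(options=options)
driver.get("https://example.com")                       # 2. navigate to the URL
time.sleep(3)                                           # 3. wait for the page to load
elements = driver.find_elements(By.CSS_SELECTOR, "h1")  # 5. extract by CSS selector
print("\n".join(element.text for element in elements))  # 6. return the text
driver.quit()                                           # 7. close the browser
```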
## Handling Dynamic Content
The `SeleniumScrapingTool` is particularly useful for scraping websites with dynamic content that is loaded via JavaScript. By using a real browser instance, it can:
1. Execute JavaScript on the page
2. Wait for dynamic content to load
3. Interact with elements if needed
4. Extract content that would not be available with simple HTTP requests
You can adjust the `wait_time` parameter to ensure that all dynamic content has loaded before extraction.
## Conclusion
The `SeleniumScrapingTool` provides a powerful way to extract content from websites using browser automation. By enabling agents to interact with websites as a real user would, it facilitates scraping of dynamic content that would be difficult or impossible to extract using simpler methods. This tool is particularly useful for research, data collection, and monitoring tasks that involve modern web applications with JavaScript-rendered content.
<Warning>
Since the `SeleniumScrapingTool` is under active development, the parameters and functionality may evolve over time.
Users are encouraged to keep the tool updated and report any issues or suggestions for enhancements.
</Warning>

View File

@@ -1,202 +0,0 @@
---
title: Snowflake Search Tool
description: The `SnowflakeSearchTool` enables CrewAI agents to execute SQL queries and perform semantic search on Snowflake data warehouses.
icon: snowflake
---
# `SnowflakeSearchTool`
## Description
The `SnowflakeSearchTool` is designed to connect to Snowflake data warehouses and execute SQL queries with advanced features like connection pooling, retry logic, and asynchronous execution. This tool allows CrewAI agents to interact with Snowflake databases, making it ideal for data analysis, reporting, and business intelligence tasks that require access to enterprise data stored in Snowflake.
## Installation
To use this tool, you need to install the required dependencies:
```shell
uv add cryptography snowflake-connector-python snowflake-sqlalchemy
```
Or alternatively:
```shell
uv sync --extra snowflake
```
## Steps to Get Started
To effectively use the `SnowflakeSearchTool`, follow these steps:
1. **Install Dependencies**: Install the required packages using one of the commands above.
2. **Configure Snowflake Connection**: Create a `SnowflakeConfig` object with your Snowflake credentials.
3. **Initialize the Tool**: Create an instance of the tool with the necessary configuration.
4. **Execute Queries**: Use the tool to run SQL queries against your Snowflake database.
## Example
The following example demonstrates how to use the `SnowflakeSearchTool` to query data from a Snowflake database:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools import SnowflakeSearchTool, SnowflakeConfig
# Create Snowflake configuration
config = SnowflakeConfig(
account="your_account",
user="your_username",
password="your_password",
warehouse="COMPUTE_WH",
database="your_database",
snowflake_schema="your_schema"
)
# Initialize the tool
snowflake_tool = SnowflakeSearchTool(config=config)
# Define an agent that uses the tool
data_analyst_agent = Agent(
role="Data Analyst",
goal="Analyze data from Snowflake database",
backstory="An expert data analyst who can extract insights from enterprise data.",
tools=[snowflake_tool],
verbose=True,
)
# Example task to query sales data
query_task = Task(
description="Query the sales data for the last quarter and summarize the top 5 products by revenue.",
expected_output="A summary of the top 5 products by revenue for the last quarter.",
agent=data_analyst_agent,
)
# Create and run the crew
crew = Crew(agents=[data_analyst_agent],
tasks=[query_task])
result = crew.kickoff()
```
You can also customize the tool with additional parameters:
```python Code
# Initialize the tool with custom parameters
snowflake_tool = SnowflakeSearchTool(
config=config,
pool_size=10,
max_retries=5,
retry_delay=2.0,
enable_caching=True
)
```
## Parameters
### SnowflakeConfig Parameters
The `SnowflakeConfig` class accepts the following parameters:
- **account**: Required. Snowflake account identifier.
- **user**: Required. Snowflake username.
- **password**: Optional*. Snowflake password.
- **private_key_path**: Optional*. Path to private key file (alternative to password).
- **warehouse**: Required. Snowflake warehouse name.
- **database**: Required. Default database.
- **snowflake_schema**: Required. Default schema.
- **role**: Optional. Snowflake role.
- **session_parameters**: Optional. Custom session parameters as a dictionary.
*Either `password` or `private_key_path` must be provided.
### SnowflakeSearchTool Parameters
The `SnowflakeSearchTool` accepts the following parameters during initialization:
- **config**: Required. A `SnowflakeConfig` object containing connection details.
- **pool_size**: Optional. Number of connections in the pool. Default is 5.
- **max_retries**: Optional. Maximum retry attempts for failed queries. Default is 3.
- **retry_delay**: Optional. Delay between retries in seconds. Default is 1.0.
- **enable_caching**: Optional. Whether to enable query result caching. Default is True.
## Usage
When using the `SnowflakeSearchTool`, you need to provide the following parameters:
- **query**: Required. The SQL query to execute.
- **database**: Optional. Override the default database specified in the config.
- **snowflake_schema**: Optional. Override the default schema specified in the config.
- **timeout**: Optional. Query timeout in seconds. Default is 300.
The tool will return the query results as a list of dictionaries, where each dictionary represents a row with column names as keys.
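For illustration, a result set might look like this (the column names and values below are made up):

```python Code
rows = [
    {"PRODUCT_NAME": "Widget", "TOTAL_REVENUE": 120000.0},
    {"PRODUCT_NAME": "Gadget", "TOTAL_REVENUE": 95500.0},
]
for row in rows:
    print(f"{row['PRODUCT_NAME']}: ${row['TOTAL_REVENUE']:,.2f}")
```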
```python Code
# Example of using the tool with an agent
data_analyst = Agent(
role="Data Analyst",
goal="Analyze sales data from Snowflake",
backstory="An expert data analyst with experience in SQL and data visualization.",
tools=[snowflake_tool],
verbose=True
)
# The agent will use the tool with parameters like:
# query="SELECT product_name, SUM(revenue) as total_revenue FROM sales GROUP BY product_name ORDER BY total_revenue DESC LIMIT 5"
# timeout=600
# Create a task for the agent
analysis_task = Task(
description="Query the sales database and identify the top 5 products by revenue for the last quarter.",
expected_output="A detailed analysis of the top 5 products by revenue.",
agent=data_analyst
)
# Run the task
crew = Crew(
agents=[data_analyst],
tasks=[analysis_task]
)
result = crew.kickoff()
```
## Advanced Features
### Connection Pooling
The `SnowflakeSearchTool` implements connection pooling to improve performance by reusing database connections. You can control the pool size with the `pool_size` parameter.
### Automatic Retries
The tool automatically retries failed queries with exponential backoff. You can configure the retry behavior with the `max_retries` and `retry_delay` parameters.
### Query Result Caching
To improve performance for repeated queries, the tool can cache query results. This feature is enabled by default but can be disabled by setting `enable_caching=False`.
### Key-Pair Authentication
In addition to password authentication, the tool supports key-pair authentication for enhanced security:
```python Code
config = SnowflakeConfig(
account="your_account",
user="your_username",
private_key_path="/path/to/your/private/key.p8",
warehouse="COMPUTE_WH",
database="your_database",
snowflake_schema="your_schema"
)
```
## Error Handling
The `SnowflakeSearchTool` includes comprehensive error handling for common Snowflake issues:
- Connection failures
- Query timeouts
- Authentication errors
- Database and schema errors
When an error occurs, the tool will attempt to retry the operation (if configured) and provide detailed error information.
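Conceptually, retry-with-exponential-backoff works like the following sketch (a generic illustration, not the tool's actual internals):

```python Code
import time

def run_with_retries(execute_query, max_retries=3, retry_delay=1.0):
    """Retry a callable, doubling the delay after each failed attempt."""
    for attempt in range(max_retries + 1):
        try:
            return execute_query()
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep(retry_delay * (2 ** attempt))
```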
## Conclusion
The `SnowflakeSearchTool` provides a powerful way to integrate Snowflake data warehouses with CrewAI agents. With features like connection pooling, automatic retries, and query caching, it enables efficient and reliable access to enterprise data. This tool is particularly useful for data analysis, reporting, and business intelligence tasks that require access to structured data stored in Snowflake.

View File

@@ -1,164 +0,0 @@
---
title: Weaviate Vector Search
description: The `WeaviateVectorSearchTool` is designed to search a Weaviate vector database for semantically similar documents.
icon: database
---
# `WeaviateVectorSearchTool`
## Description
The `WeaviateVectorSearchTool` is specifically crafted for conducting semantic searches within documents stored in a Weaviate vector database. This tool allows you to find semantically similar documents to a given query, leveraging the power of vector embeddings for more accurate and contextually relevant search results.
[Weaviate](https://weaviate.io/) is a vector database that stores and queries vector embeddings, enabling semantic search capabilities.
## Installation
To incorporate this tool into your project, you need to install the Weaviate client:
```shell
uv add weaviate-client
```
## Steps to Get Started
To effectively use the `WeaviateVectorSearchTool`, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` and `weaviate-client` packages are installed in your Python environment.
2. **Weaviate Setup**: Set up a Weaviate cluster. You can follow the [Weaviate documentation](https://weaviate.io/developers/wcs/connect) for instructions.
3. **API Keys**: Obtain your Weaviate cluster URL and API key.
4. **OpenAI API Key**: Ensure you have an OpenAI API key set in your environment variables as `OPENAI_API_KEY`.
## Example
The following example demonstrates how to initialize the tool and execute a search:
```python Code
from crewai_tools import WeaviateVectorSearchTool
# Initialize the tool
tool = WeaviateVectorSearchTool(
collection_name='example_collections',
limit=3,
weaviate_cluster_url="https://your-weaviate-cluster-url.com",
weaviate_api_key="your-weaviate-api-key",
)
# Inside a @CrewBase class, expose an agent that uses the tool:
@agent
def search_agent(self) -> Agent:
'''
This agent uses the WeaviateVectorSearchTool to search for
semantically similar documents in a Weaviate vector database.
'''
return Agent(
config=self.agents_config["search_agent"],
tools=[tool]
)
```
## Parameters
The `WeaviateVectorSearchTool` accepts the following parameters:
- **collection_name**: Required. The name of the collection to search within.
- **weaviate_cluster_url**: Required. The URL of the Weaviate cluster.
- **weaviate_api_key**: Required. The API key for the Weaviate cluster.
- **limit**: Optional. The number of results to return. Default is `3`.
- **vectorizer**: Optional. The vectorizer to use. If not provided, it will use `text2vec_openai` with the `nomic-embed-text` model.
- **generative_model**: Optional. The generative model to use. If not provided, it will use OpenAI's `gpt-4o`.
## Advanced Configuration
You can customize the vectorizer and generative model used by the tool:
```python Code
from crewai_tools import WeaviateVectorSearchTool
from weaviate.classes.config import Configure
# Setup custom model for vectorizer and generative model
tool = WeaviateVectorSearchTool(
collection_name='example_collections',
limit=3,
vectorizer=Configure.Vectorizer.text2vec_openai(model="nomic-embed-text"),
generative_model=Configure.Generative.openai(model="gpt-4o-mini"),
weaviate_cluster_url="https://your-weaviate-cluster-url.com",
weaviate_api_key="your-weaviate-api-key",
)
```
## Preloading Documents
You can preload your Weaviate database with documents before using the tool:
```python Code
import os
from crewai_tools import WeaviateVectorSearchTool
import weaviate
from weaviate.classes.init import Auth
from weaviate.classes.config import Configure
# Connect to Weaviate
client = weaviate.connect_to_weaviate_cloud(
cluster_url="https://your-weaviate-cluster-url.com",
auth_credentials=Auth.api_key("your-weaviate-api-key"),
headers={"X-OpenAI-Api-Key": "your-openai-api-key"}
)
# Get the collection if it exists, otherwise create it
if client.collections.exists("example_collections"):
    test_docs = client.collections.get("example_collections")
else:
test_docs = client.collections.create(
name="example_collections",
vectorizer_config=Configure.Vectorizer.text2vec_openai(model="nomic-embed-text"),
generative_config=Configure.Generative.openai(model="gpt-4o"),
)
# Load documents
docs_to_load = os.listdir("knowledge")
with test_docs.batch.dynamic() as batch:
for d in docs_to_load:
with open(os.path.join("knowledge", d), "r") as f:
content = f.read()
batch.add_object(
{
"content": content,
"year": d.split("_")[0],
}
)
# Initialize the tool
tool = WeaviateVectorSearchTool(
collection_name='example_collections',
limit=3,
weaviate_cluster_url="https://your-weaviate-cluster-url.com",
weaviate_api_key="your-weaviate-api-key",
)
```
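Once loading is finished, remember to release the connection; assuming the v4 client used above:

```python Code
# Close the Weaviate client connection when you're done loading
client.close()
```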
## Agent Integration Example
Here's how to integrate the `WeaviateVectorSearchTool` with a CrewAI agent:
```python Code
from crewai import Agent
from crewai_tools import WeaviateVectorSearchTool
# Initialize the tool
weaviate_tool = WeaviateVectorSearchTool(
collection_name='example_collections',
limit=3,
weaviate_cluster_url="https://your-weaviate-cluster-url.com",
weaviate_api_key="your-weaviate-api-key",
)
# Create an agent with the tool
rag_agent = Agent(
name="rag_agent",
role="You are a helpful assistant that can answer questions with the help of the WeaviateVectorSearchTool.",
llm="gpt-4o-mini",
tools=[weaviate_tool],
)
```
## Conclusion
The `WeaviateVectorSearchTool` provides a powerful way to search for semantically similar documents in a Weaviate vector database. By leveraging vector embeddings, it enables more accurate and contextually relevant search results compared to traditional keyword-based searches. This tool is particularly useful for applications that require finding information based on meaning rather than exact matches.

View File

@@ -27,73 +27,31 @@ pip install 'crewai[tools]'
## Example
The following example demonstrates how to use the `YoutubeChannelSearchTool` with a CrewAI agent:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools import YoutubeChannelSearchTool
# Initialize the tool for general YouTube channel searches
youtube_channel_tool = YoutubeChannelSearchTool()
# Define an agent that uses the tool
channel_researcher = Agent(
    role="Channel Researcher",
    goal="Extract relevant information from YouTube channels",
    backstory="An expert researcher who specializes in analyzing YouTube channel content.",
    tools=[youtube_channel_tool],
    verbose=True,
)
# Example task to search for information in a specific channel
research_task = Task(
    description="Search for information about machine learning tutorials in the YouTube channel {youtube_channel_handle}",
    expected_output="A summary of the key machine learning tutorials available on the channel.",
    agent=channel_researcher,
)
# Create and run the crew
crew = Crew(agents=[channel_researcher], tasks=[research_task])
result = crew.kickoff(inputs={"youtube_channel_handle": "@exampleChannel"})
```
You can also initialize the tool with a specific YouTube channel handle:
```python Code
# Initialize the tool with a specific YouTube channel handle
youtube_channel_tool = YoutubeChannelSearchTool(
    youtube_channel_handle='@exampleChannel'
)
# Define an agent that uses the tool
channel_researcher = Agent(
    role="Channel Researcher",
    goal="Extract relevant information from a specific YouTube channel",
    backstory="An expert researcher who specializes in analyzing YouTube channel content.",
    tools=[youtube_channel_tool],
    verbose=True,
)
```
## Parameters
The `YoutubeChannelSearchTool` accepts the following parameters:
- **youtube_channel_handle**: Optional. The handle of the YouTube channel to search within. If provided during initialization, the agent won't need to specify it when using the tool. If the handle doesn't start with '@', it will be automatically added (see the illustration below the list).
- **config**: Optional. Configuration for the underlying RAG system, including LLM and embedder settings.
- **summarize**: Optional. Whether to summarize the retrieved content. Default is `False`.
When using the tool with an agent, the agent will need to provide:
- **search_query**: Required. The search query to find relevant information in the channel content.
- **youtube_channel_handle**: Required only if not provided during initialization. The handle of the YouTube channel to search within.
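The automatic `@` normalization mentioned above mirrors what the tool's `add()` method does internally (shown later under Implementation Details); as a quick standalone illustration:

```python Code
handle = "exampleChannel"
if not handle.startswith("@"):
    handle = f"@{handle}"
print(handle)  # -> "@exampleChannel"
```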
## Custom Model and Embeddings
By default, the tool uses OpenAI for both embeddings and summarization. To customize the model, you can use a config dictionary as follows:
```python Code
youtube_channel_tool = YoutubeChannelSearchTool(
config=dict(
llm=dict(
provider="ollama", # or google, openai, anthropic, llama2, ...
@@ -114,81 +72,4 @@ youtube_channel_tool = YoutubeChannelSearchTool(
),
)
)
```
## Agent Integration Example
Here's a more detailed example of how to integrate the `YoutubeChannelSearchTool` with a CrewAI agent:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools import YoutubeChannelSearchTool
# Initialize the tool
youtube_channel_tool = YoutubeChannelSearchTool()
# Define an agent that uses the tool
channel_researcher = Agent(
role="Channel Researcher",
goal="Extract and analyze information from YouTube channels",
backstory="""You are an expert channel researcher who specializes in extracting
and analyzing information from YouTube channels. You have a keen eye for detail
and can quickly identify key points and insights from video content across an entire channel.""",
tools=[youtube_channel_tool],
verbose=True,
)
# Create a task for the agent
research_task = Task(
description="""
Search for information about data science projects and tutorials
in the YouTube channel {youtube_channel_handle}.
Focus on:
1. Key data science techniques covered
2. Popular tutorial series
3. Most viewed or recommended videos
Provide a comprehensive summary of these points.
""",
expected_output="A detailed summary of data science content available on the channel.",
agent=channel_researcher,
)
# Run the task
crew = Crew(agents=[channel_researcher], tasks=[research_task])
result = crew.kickoff(inputs={"youtube_channel_handle": "@exampleDataScienceChannel"})
```
## Implementation Details
The `YoutubeChannelSearchTool` is implemented as a subclass of `RagTool`, which provides the base functionality for Retrieval-Augmented Generation:
```python Code
class YoutubeChannelSearchTool(RagTool):
name: str = "Search a Youtube Channels content"
description: str = "A tool that can be used to semantic search a query from a Youtube Channels content."
args_schema: Type[BaseModel] = YoutubeChannelSearchToolSchema
def __init__(self, youtube_channel_handle: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
if youtube_channel_handle is not None:
kwargs["data_type"] = DataType.YOUTUBE_CHANNEL
self.add(youtube_channel_handle)
self.description = f"A tool that can be used to semantic search a query the {youtube_channel_handle} Youtube Channels content."
self.args_schema = FixedYoutubeChannelSearchToolSchema
self._generate_description()
def add(
self,
youtube_channel_handle: str,
**kwargs: Any,
) -> None:
if not youtube_channel_handle.startswith("@"):
youtube_channel_handle = f"@{youtube_channel_handle}"
super().add(youtube_channel_handle, **kwargs)
```
## Conclusion
The `YoutubeChannelSearchTool` provides a powerful way to search and extract information from YouTube channel content using RAG techniques. By enabling agents to search across an entire channel's videos, it facilitates information extraction and analysis tasks that would otherwise be difficult to perform. This tool is particularly useful for research, content analysis, and knowledge extraction from YouTube channels.

View File

@@ -29,73 +29,35 @@ pip install 'crewai[tools]'
## Example
The following example demonstrates how to use the `YoutubeVideoSearchTool` with a CrewAI agent:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools import YoutubeVideoSearchTool
# Initialize the tool for general YouTube video searches
youtube_search_tool = YoutubeVideoSearchTool()
# Define an agent that uses the tool
video_researcher = Agent(
    role="Video Researcher",
    goal="Extract relevant information from YouTube videos",
    backstory="An expert researcher who specializes in analyzing video content.",
    tools=[youtube_search_tool],
    verbose=True,
)
# Example task to search for information in a specific video
research_task = Task(
    description="Search for information about machine learning frameworks in the YouTube video at {youtube_video_url}",
    expected_output="A summary of the key machine learning frameworks mentioned in the video.",
    agent=video_researcher,
)
# Create and run the crew
crew = Crew(agents=[video_researcher], tasks=[research_task])
result = crew.kickoff(inputs={"youtube_video_url": "https://youtube.com/watch?v=example"})
```
You can also initialize the tool with a specific YouTube video URL:
```python Code
# Initialize the tool with a specific YouTube video URL
youtube_search_tool = YoutubeVideoSearchTool(
    youtube_video_url='https://youtube.com/watch?v=example'
)
# Define an agent that uses the tool
video_researcher = Agent(
    role="Video Researcher",
    goal="Extract relevant information from a specific YouTube video",
    backstory="An expert researcher who specializes in analyzing video content.",
    tools=[youtube_search_tool],
    verbose=True,
)
```
## Parameters
The `YoutubeVideoSearchTool` accepts the following parameters:
- **youtube_video_url**: Optional. The URL of the YouTube video to search within. If provided during initialization, the agent won't need to specify it when using the tool.
- **config**: Optional. Configuration for the underlying RAG system, including LLM and embedder settings.
- **summarize**: Optional. Whether to summarize the retrieved content. Default is `False`.
When using the tool with an agent, the agent will need to provide:
- **search_query**: Required. The search query to find relevant information in the video content.
- **youtube_video_url**: Required only if not provided during initialization. The URL of the YouTube video to search within.
## Custom Model and Embeddings
By default, the tool uses OpenAI for both embeddings and summarization. To customize the model, you can use a config dictionary as follows:
```python Code
youtube_search_tool = YoutubeVideoSearchTool(
config=dict(
llm=dict(
provider="ollama", # or google, openai, anthropic, llama2, ...
@@ -116,72 +78,4 @@ youtube_search_tool = YoutubeVideoSearchTool(
),
)
)
```
## Agent Integration Example
Here's a more detailed example of how to integrate the `YoutubeVideoSearchTool` with a CrewAI agent:
```python Code
from crewai import Agent, Task, Crew
from crewai_tools import YoutubeVideoSearchTool
# Initialize the tool
youtube_search_tool = YoutubeVideoSearchTool()
# Define an agent that uses the tool
video_researcher = Agent(
role="Video Researcher",
goal="Extract and analyze information from YouTube videos",
backstory="""You are an expert video researcher who specializes in extracting
and analyzing information from YouTube videos. You have a keen eye for detail
and can quickly identify key points and insights from video content.""",
tools=[youtube_search_tool],
verbose=True,
)
# Create a task for the agent
research_task = Task(
description="""
Search for information about recent advancements in artificial intelligence
in the YouTube video at {youtube_video_url}.
Focus on:
1. Key AI technologies mentioned
2. Real-world applications discussed
3. Future predictions made by the speaker
Provide a comprehensive summary of these points.
""",
expected_output="A detailed summary of AI advancements, applications, and future predictions from the video.",
agent=video_researcher,
)
# Run the task
crew = Crew(agents=[video_researcher], tasks=[research_task])
result = crew.kickoff(inputs={"youtube_video_url": "https://youtube.com/watch?v=example"})
```
## Implementation Details
The `YoutubeVideoSearchTool` is implemented as a subclass of `RagTool`, which provides the base functionality for Retrieval-Augmented Generation:
```python Code
class YoutubeVideoSearchTool(RagTool):
name: str = "Search a Youtube Video content"
description: str = "A tool that can be used to semantic search a query from a Youtube Video content."
args_schema: Type[BaseModel] = YoutubeVideoSearchToolSchema
def __init__(self, youtube_video_url: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
if youtube_video_url is not None:
kwargs["data_type"] = DataType.YOUTUBE_VIDEO
self.add(youtube_video_url)
self.description = f"A tool that can be used to semantic search a query the {youtube_video_url} Youtube Video content."
self.args_schema = FixedYoutubeVideoSearchToolSchema
self._generate_description()
```
## Conclusion
The `YoutubeVideoSearchTool` provides a powerful way to search and extract information from YouTube video content using RAG techniques. By enabling agents to search within video content, it facilitates information extraction and analysis tasks that would otherwise be difficult to perform. This tool is particularly useful for research, content analysis, and knowledge extraction from video sources.

View File

@@ -4,7 +4,7 @@ from crewai.agent import Agent
from crewai.crew import Crew
from crewai.flow.flow import Flow
from crewai.knowledge.knowledge import Knowledge
from crewai.llm import LLM, BaseLLM, DefaultLLM
from crewai.llm import LLM
from crewai.process import Process
from crewai.task import Task
@@ -21,8 +21,6 @@ __all__ = [
"Process",
"Task",
"LLM",
"BaseLLM",
"DefaultLLM",
"Flow",
"Knowledge",
]

View File

@@ -11,7 +11,7 @@ from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
from crewai.llm import LLM, BaseLLM
from crewai.llm import LLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.task import Task
from crewai.tools import BaseTool
@@ -70,10 +70,10 @@ class Agent(BaseAgent):
default=True,
description="Use system prompt for the agent.",
)
llm: Union[str, InstanceOf[BaseLLM], Any] = Field(
llm: Union[str, InstanceOf[LLM], Any] = Field(
description="Language model that will run the agent.", default=None
)
function_calling_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
function_calling_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
description="Language model that will run the agent.", default=None
)
system_template: Optional[str] = Field(
@@ -116,16 +116,9 @@ class Agent(BaseAgent):
def post_init_setup(self):
self.agent_ops_agent_name = self.role
try:
self.llm = create_llm(self.llm)
except Exception as e:
raise RuntimeError(f"Failed to initialize LLM for agent '{self.role}': {str(e)}")
if self.function_calling_llm and not isinstance(self.function_calling_llm, BaseLLM):
try:
self.function_calling_llm = create_llm(self.function_calling_llm)
except Exception as e:
raise RuntimeError(f"Failed to initialize function calling LLM for agent '{self.role}': {str(e)}")
self.llm = create_llm(self.llm)
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = create_llm(self.function_calling_llm)
if not self.agent_executor:
self._setup_agent_executor()

View File

@@ -124,15 +124,14 @@ class CrewAgentParser:
)
def _extract_thought(self, text: str) -> str:
thought_index = text.find("\n\nAction")
if thought_index == -1:
thought_index = text.find("\n\nFinal Answer")
if thought_index == -1:
return ""
thought = text[:thought_index].strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
regex = r"(.*?)(?:\n\nAction|\n\nFinal Answer)"
thought_match = re.search(regex, text, re.DOTALL)
if thought_match:
thought = thought_match.group(1).strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
return ""
def _clean_action(self, text: str) -> str:
"""Clean action string by removing non-essential formatting characters."""

View File

@@ -203,6 +203,7 @@ def install(context):
@crewai.command()
def run():
"""Run the Crew."""
click.echo("Running the Crew")
run_crew()

View File

@@ -14,7 +14,7 @@ from packaging import version
from crewai.cli.utils import read_toml
from crewai.cli.version import get_crewai_version
from crewai.crew import Crew
from crewai.llm import LLM, BaseLLM
from crewai.llm import LLM
from crewai.types.crew_chat import ChatInputField, ChatInputs
from crewai.utilities.llm_utils import create_llm
@@ -116,7 +116,7 @@ def show_loading(event: threading.Event):
print()
def initialize_chat_llm(crew: Crew) -> Optional[BaseLLM]:
def initialize_chat_llm(crew: Crew) -> Optional[LLM]:
"""Initializes the chat LLM and handles exceptions."""
try:
return create_llm(crew.chat_llm)
@@ -220,7 +220,7 @@ def get_user_input() -> str:
def handle_user_input(
user_input: str,
chat_llm: BaseLLM,
chat_llm: LLM,
messages: List[Dict[str, str]],
crew_tool_schema: Dict[str, Any],
available_functions: Dict[str, Any],

View File

@@ -1,6 +1,4 @@
import subprocess
from enum import Enum
from typing import List, Optional
import click
from packaging import version
@@ -9,24 +7,16 @@ from crewai.cli.utils import read_toml
from crewai.cli.version import get_crewai_version
class CrewType(Enum):
STANDARD = "standard"
FLOW = "flow"
def run_crew() -> None:
"""
Run the crew or flow by running a command in the UV environment.
Starting from version 0.103.0, this command can be used to run both
standard crews and flows. For flows, it detects the type from pyproject.toml
and automatically runs the appropriate command.
Run the crew by running a command in the UV environment.
"""
command = ["uv", "run", "run_crew"]
crewai_version = get_crewai_version()
min_required_version = "0.71.0"
pyproject_data = read_toml()
# Check for legacy poetry configuration
if pyproject_data.get("tool", {}).get("poetry") and (
version.parse(crewai_version) < version.parse(min_required_version)
):
@@ -36,54 +26,18 @@ def run_crew() -> None:
fg="red",
)
# Determine crew type
is_flow = pyproject_data.get("tool", {}).get("crewai", {}).get("type") == "flow"
crew_type = CrewType.FLOW if is_flow else CrewType.STANDARD
# Display appropriate message
click.echo(f"Running the {'Flow' if is_flow else 'Crew'}")
# Execute the appropriate command
execute_command(crew_type)
def execute_command(crew_type: CrewType) -> None:
"""
Execute the appropriate command based on crew type.
Args:
crew_type: The type of crew to run
"""
command = ["uv", "run", "kickoff" if crew_type == CrewType.FLOW else "run_crew"]
try:
subprocess.run(command, capture_output=False, text=True, check=True)
except subprocess.CalledProcessError as e:
handle_error(e, crew_type)
click.echo(f"An error occurred while running the crew: {e}", err=True)
click.echo(e.output, err=True, nl=True)
if pyproject_data.get("tool", {}).get("poetry"):
click.secho(
"It's possible that you are using an old version of crewAI that uses poetry, please run `crewai update` to update your pyproject.toml to use uv.",
fg="yellow",
)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
def handle_error(error: subprocess.CalledProcessError, crew_type: CrewType) -> None:
"""
Handle subprocess errors with appropriate messaging.
Args:
error: The subprocess error that occurred
crew_type: The type of crew that was being run
"""
entity_type = "flow" if crew_type == CrewType.FLOW else "crew"
click.echo(f"An error occurred while running the {entity_type}: {error}", err=True)
if error.output:
click.echo(error.output, err=True, nl=True)
pyproject_data = read_toml()
if pyproject_data.get("tool", {}).get("poetry"):
click.secho(
"It's possible that you are using an old version of crewAI that uses poetry, "
"please run `crewai update` to update your pyproject.toml to use uv.",
fg="yellow",
)
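A minimal sketch of the detection step described above, assuming Python 3.11+ for the stdlib `tomllib` (the real CLI goes through its own `read_toml` helper):

```python
import tomllib  # stdlib in Python 3.11+; the real CLI uses read_toml instead

# Sketch of the flow/crew detection described above: the crew type is
# read from [tool.crewai] in pyproject.toml and picks the run command.
with open("pyproject.toml", "rb") as f:
    pyproject_data = tomllib.load(f)

is_flow = pyproject_data.get("tool", {}).get("crewai", {}).get("type") == "flow"
command = ["uv", "run", "kickoff" if is_flow else "run_crew"]
print(f"Running the {'Flow' if is_flow else 'Crew'}: {command}")
```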

View File

@@ -1,62 +1,62 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
# If you want to run a snippet of code before or after the crew starts,
# If you want to run a snippet of code before or after the crew starts,
# you can use the @before_kickoff and @after_kickoff decorators
# https://docs.crewai.com/concepts/crews#example-crew-class-with-decorators
@CrewBase
class {{crew_name}}():
"""{{crew_name}} crew"""
"""{{crew_name}} crew"""
# Learn more about YAML configuration files here:
# Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
# Tasks: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'
# Learn more about YAML configuration files here:
# Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
# Tasks: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'
# If you would like to add tools to your agents, you can learn more about it here:
# https://docs.crewai.com/concepts/agents#agent-tools
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
verbose=True
)
# If you would like to add tools to your agents, you can learn more about it here:
# https://docs.crewai.com/concepts/agents#agent-tools
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
verbose=True
)
@agent
def reporting_analyst(self) -> Agent:
return Agent(
config=self.agents_config['reporting_analyst'],
verbose=True
)
@agent
def reporting_analyst(self) -> Agent:
return Agent(
config=self.agents_config['reporting_analyst'],
verbose=True
)
# To learn more about structured task outputs,
# task dependencies, and task callbacks, check out the documentation:
# https://docs.crewai.com/concepts/tasks#overview-of-a-task
@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
)
# To learn more about structured task outputs,
# task dependencies, and task callbacks, check out the documentation:
# https://docs.crewai.com/concepts/tasks#overview-of-a-task
@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
)
@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
output_file='report.md'
)
@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
output_file='report.md'
)
@crew
def crew(self) -> Crew:
"""Creates the {{crew_name}} crew"""
# To learn how to add knowledge sources to your crew, check out the documentation:
# https://docs.crewai.com/concepts/knowledge#what-is-knowledge
@crew
def crew(self) -> Crew:
"""Creates the {{crew_name}} crew"""
# To learn how to add knowledge sources to your crew, check out the documentation:
# https://docs.crewai.com/concepts/knowledge#what-is-knowledge
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
)
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
)
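For context, the generated project's `main.py` typically kicks this class off along these lines; the module and class names below are hypothetical stand-ins for the rendered `{{crew_name}}` placeholder:

```python
# Hypothetical names standing in for the rendered {{crew_name}} placeholder.
from my_project.crew import MyProject

# The @crew-decorated method builds the Crew; kickoff runs it with inputs.
result = MyProject().crew().kickoff(inputs={"topic": "AI LLMs"})
print(result.raw)  # CrewOutput exposes the raw final answer
```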

View File

@@ -30,13 +30,13 @@ crewai install
## Running the Project
To kickstart your flow and begin execution, run this from the root folder of your project:
To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project:
```bash
crewai run
```
This command initializes the {{name}} Flow as defined in your configuration.
This command initializes the {{name}} Crew, assembling the agents and assigning them tasks as defined in your configuration.
This example, unmodified, will create a `report.md` file in the root folder with the output of research on LLMs.

View File

@@ -6,9 +6,8 @@ import warnings
from concurrent.futures import Future
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar, Union, cast
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from langchain_core.tools import BaseTool as LangchainBaseTool
from pydantic import (
UUID4,
BaseModel,
@@ -27,7 +26,7 @@ from crewai.agents.cache import CacheHandler
from crewai.crews.crew_output import CrewOutput
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.llm import LLM, BaseLLM
from crewai.llm import LLM
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
@@ -37,7 +36,7 @@ from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool, Tool
from crewai.tools.base_tool import Tool
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -151,14 +150,14 @@ class Crew(BaseModel):
default=None,
description="Metrics for the LLM usage during all tasks execution.",
)
manager_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
manager_llm: Optional[Any] = Field(
description="Language model that will run the agent.", default=None
)
manager_agent: Optional[BaseAgent] = Field(
description="Custom agent that will be used as manager.", default=None
)
function_calling_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
description="Language model that will be used for function calling.", default=None
description="Language model that will run the agent.", default=None
)
config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
@@ -185,7 +184,7 @@ class Crew(BaseModel):
default=None,
description="Maximum number of requests per minute for the crew execution to be respected.",
)
prompt_file: Optional[str] = Field(
prompt_file: str = Field(
default=None,
description="Path to the prompt json file to be used for the crew.",
)
@@ -197,7 +196,7 @@ class Crew(BaseModel):
default=False,
description="Plan the crew execution and add the plan to the crew.",
)
planning_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
planning_llm: Optional[Any] = Field(
default=None,
description="Language model that will run the AgentPlanner if planning is True.",
)
@@ -213,7 +212,7 @@ class Crew(BaseModel):
default=None,
description="Knowledge sources for the crew. Add knowledge sources to the knowledge object.",
)
chat_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
chat_llm: Optional[Any] = Field(
default=None,
description="LLM used to handle chatting with the crew.",
)
@@ -799,8 +798,7 @@ class Crew(BaseModel):
# Determine which tools to use - task tools take precedence over agent tools
tools_for_task = task.tools or agent_to_use.tools or []
# Prepare tools and ensure they're compatible with task execution
tools_for_task = self._prepare_tools(agent_to_use, task, cast(Union[List[Tool], List[BaseTool]], tools_for_task))
tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
self._log_task_start(task, agent_to_use.role)
@@ -819,7 +817,7 @@ class Crew(BaseModel):
future = task.execute_async(
agent=agent_to_use,
context=context,
tools=cast(List[BaseTool], tools_for_task),
tools=tools_for_task,
)
futures.append((task, future, task_index))
else:
@@ -831,7 +829,7 @@ class Crew(BaseModel):
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
tools=cast(List[BaseTool], tools_for_task),
tools=tools_for_task,
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
@@ -869,10 +867,10 @@ class Crew(BaseModel):
return None
def _prepare_tools(
self, agent: BaseAgent, task: Task, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
self, agent: BaseAgent, task: Task, tools: List[Tool]
) -> List[Tool]:
# Add delegation tools if agent allows delegation
if hasattr(agent, "allow_delegation") and getattr(agent, "allow_delegation", False):
if agent.allow_delegation:
if self.process == Process.hierarchical:
if self.manager_agent:
tools = self._update_manager_tools(task, tools)
@@ -881,18 +879,17 @@ class Crew(BaseModel):
"Manager agent is required for hierarchical process."
)
elif agent:
elif agent and agent.allow_delegation:
tools = self._add_delegation_tools(task, tools)
# Add code execution tools if agent allows code execution
if hasattr(agent, "allow_code_execution") and getattr(agent, "allow_code_execution", False):
if agent.allow_code_execution:
tools = self._add_code_execution_tools(agent, tools)
if agent and hasattr(agent, "multimodal") and getattr(agent, "multimodal", False):
if agent and agent.multimodal:
tools = self._add_multimodal_tools(agent, tools)
# Return a List[BaseTool] which is compatible with both Task.execute_sync and Task.execute_async
return cast(List[BaseTool], tools)
return tools
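In practice, the injection above means delegation tools show up without any explicit wiring; a sketch, with invented agents and task:

```python
from crewai import Agent, Crew, Process, Task

# Invented agents/task to illustrate the injection above: because the
# writer allows delegation and a coworker exists, _prepare_tools adds
# delegation tools (e.g. "Delegate work to coworker") at execution time.
writer = Agent(role="Writer", goal="Draft posts", backstory="Writes.", allow_delegation=True)
editor = Agent(role="Editor", goal="Edit posts", backstory="Edits.")
draft = Task(description="Draft a post", expected_output="A post", agent=writer)
crew = Crew(agents=[writer, editor], tasks=[draft], process=Process.sequential)
```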
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
if self.process == Process.hierarchical:
@@ -900,11 +897,11 @@ class Crew(BaseModel):
return task.agent
def _merge_tools(
self, existing_tools: Union[List[Tool], List[BaseTool]], new_tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
self, existing_tools: List[Tool], new_tools: List[Tool]
) -> List[Tool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name."""
if not new_tools:
return cast(List[BaseTool], existing_tools)
return existing_tools
# Create mapping of tool names to new tools
new_tool_map = {tool.name: tool for tool in new_tools}
@@ -915,32 +912,23 @@ class Crew(BaseModel):
# Add all new tools
tools.extend(new_tools)
return cast(List[BaseTool], tools)
return tools
def _inject_delegation_tools(
self, tools: Union[List[Tool], List[BaseTool]], task_agent: BaseAgent, agents: List[BaseAgent]
) -> List[BaseTool]:
if hasattr(task_agent, "get_delegation_tools"):
delegation_tools = task_agent.get_delegation_tools(agents)
# Cast delegation_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], delegation_tools))
return cast(List[BaseTool], tools)
self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]
):
delegation_tools = task_agent.get_delegation_tools(agents)
return self._merge_tools(tools, delegation_tools)
def _add_multimodal_tools(self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]]) -> List[BaseTool]:
if hasattr(agent, "get_multimodal_tools"):
multimodal_tools = agent.get_multimodal_tools()
# Cast multimodal_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], multimodal_tools))
return cast(List[BaseTool], tools)
def _add_multimodal_tools(self, agent: BaseAgent, tools: List[Tool]):
multimodal_tools = agent.get_multimodal_tools()
return self._merge_tools(tools, multimodal_tools)
def _add_code_execution_tools(self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]]) -> List[BaseTool]:
if hasattr(agent, "get_code_execution_tools"):
code_tools = agent.get_code_execution_tools()
# Cast code_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], code_tools))
return cast(List[BaseTool], tools)
def _add_code_execution_tools(self, agent: BaseAgent, tools: List[Tool]):
code_tools = agent.get_code_execution_tools()
return self._merge_tools(tools, code_tools)
def _add_delegation_tools(self, task: Task, tools: Union[List[Tool], List[BaseTool]]) -> List[BaseTool]:
def _add_delegation_tools(self, task: Task, tools: List[Tool]):
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
if not tools:
@@ -948,7 +936,7 @@ class Crew(BaseModel):
tools = self._inject_delegation_tools(
tools, task.agent, agents_for_delegation
)
return cast(List[BaseTool], tools)
return tools
def _log_task_start(self, task: Task, role: str = "None"):
if self.output_log_file:
@@ -956,7 +944,7 @@ class Crew(BaseModel):
task_name=task.name, task=task.description, agent=role, status="started"
)
def _update_manager_tools(self, task: Task, tools: Union[List[Tool], List[BaseTool]]) -> List[BaseTool]:
def _update_manager_tools(self, task: Task, tools: List[Tool]):
if self.manager_agent:
if task.agent:
tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
@@ -964,7 +952,7 @@ class Crew(BaseModel):
tools = self._inject_delegation_tools(
tools, self.manager_agent, self.agents
)
return cast(List[BaseTool], tools)
return tools
def _get_context(self, task: Task, task_outputs: List[TaskOutput]):
context = (
@@ -1210,27 +1198,21 @@ class Crew(BaseModel):
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
try:
# Create LLM instance and ensure it's of type LLM for CrewEvaluator
llm_instance = create_llm(eval_llm)
if not llm_instance:
eval_llm = create_llm(eval_llm)
if not eval_llm:
raise ValueError("Failed to create LLM instance.")
# Ensure we have an LLM instance (not just BaseLLM) for CrewEvaluator
from crewai.llm import LLM
if not isinstance(llm_instance, LLM):
raise TypeError("CrewEvaluator requires an LLM instance, not a BaseLLM instance.")
crewai_event_bus.emit(
self,
CrewTestStartedEvent(
crew_name=self.name or "crew",
n_iterations=n_iterations,
eval_llm=llm_instance,
eval_llm=eval_llm,
inputs=inputs,
),
)
test_crew = self.copy()
evaluator = CrewEvaluator(test_crew, llm_instance)
evaluator = CrewEvaluator(test_crew, eval_llm) # type: ignore[arg-type]
for i in range(1, n_iterations + 1):
evaluator.set_iteration(i)

View File

@@ -894,45 +894,35 @@ class Flow(Generic[T], metaclass=FlowMeta):
Notes
-----
- Routers are executed sequentially to maintain flow control
- Each router's result becomes a new trigger_method
- Each router's result becomes the new trigger_method
- Normal listeners are executed in parallel for efficiency
- Listeners can receive the trigger method's result as a parameter
"""
# First, handle routers repeatedly until no router triggers anymore
router_results = []
current_trigger = trigger_method
while True:
routers_triggered = self._find_triggered_methods(
current_trigger, router_only=True
trigger_method, router_only=True
)
if not routers_triggered:
break
for router_name in routers_triggered:
await self._execute_single_listener(router_name, result)
# After executing router, the router's result is the path
router_result = self._method_outputs[-1]
if router_result: # Only add non-None results
router_results.append(router_result)
current_trigger = (
router_result # Update for next iteration of router chain
)
# The last router executed sets the trigger_method
# The router result is the last element in self._method_outputs
trigger_method = self._method_outputs[-1]
# Now execute normal listeners for all router results and the original trigger
all_triggers = [trigger_method] + router_results
for current_trigger in all_triggers:
if current_trigger: # Skip None results
listeners_triggered = self._find_triggered_methods(
current_trigger, router_only=False
)
if listeners_triggered:
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
# Now that no more routers are triggered by current trigger_method,
# execute normal listeners
listeners_triggered = self._find_triggered_methods(
trigger_method, router_only=False
)
if listeners_triggered:
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
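A toy flow that exercises these semantics (a sketch; the method and trigger names are invented):

```python
from crewai.flow.flow import Flow, listen, router, start

# Toy flow exercising the semantics above: the router's return value
# becomes the trigger that activates the matching listener.
class TinyFlow(Flow):
    @start()
    def begin(self):
        return "begun"

    @router(begin)
    def pick(self):
        return "path_a"  # this string is the next trigger_method

    @listen("path_a")
    def on_a(self, result):
        print(f"took path A after: {result}")

TinyFlow().kickoff()
```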
def _find_triggered_methods(
self, trigger_method: str, router_only: bool

View File

@@ -4,7 +4,6 @@ import os
import sys
import threading
import warnings
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Any, Dict, List, Literal, Optional, Type, Union, cast
@@ -35,223 +34,6 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
load_dotenv()
class LLM(ABC):
"""Base class for LLM implementations.
This class defines the interface that all LLM implementations must follow.
Users can extend this class to create custom LLM implementations that don't
rely on litellm's authentication mechanism.
Custom LLM implementations should handle error cases gracefully, including
timeouts, authentication failures, and malformed responses. They should also
implement proper validation for input parameters and provide clear error
messages when things go wrong.
Attributes:
stop (list): A list of stop sequences that the LLM should use to stop generation.
This is used by the CrewAgentExecutor and other components.
"""
def __new__(cls, *args, **kwargs):
"""Create a new LLM instance.
This method handles backward compatibility by creating a DefaultLLM instance
when the LLM class is instantiated directly with parameters.
Args:
*args: Positional arguments.
**kwargs: Keyword arguments.
Returns:
Either a new LLM instance or a DefaultLLM instance for backward compatibility.
"""
if cls is LLM and (args or kwargs.get('model') is not None):
# Import locally to avoid circular imports
# This is safe because DefaultLLM is defined later in this file
DefaultLLM = globals().get('DefaultLLM')
if DefaultLLM is None:
# If DefaultLLM is not yet defined, return a placeholder
# that will be replaced with a real DefaultLLM instance later
return object.__new__(cls)
return DefaultLLM(*args, **kwargs)
return super().__new__(cls)
def __init__(self):
"""Initialize the LLM with default attributes.
This constructor sets default values for attributes that are expected
by the CrewAgentExecutor and other components.
All custom LLM implementations should call super().__init__() to ensure
that these default attributes are properly initialized.
"""
self.stop = []
@classmethod
def create(
cls,
model: str,
timeout: Optional[Union[float, int]] = None,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
n: Optional[int] = None,
stop: Optional[Union[str, List[str]]] = None,
max_completion_tokens: Optional[int] = None,
max_tokens: Optional[int] = None,
presence_penalty: Optional[float] = None,
frequency_penalty: Optional[float] = None,
logit_bias: Optional[Dict[int, float]] = None,
response_format: Optional[Type[BaseModel]] = None,
seed: Optional[int] = None,
logprobs: Optional[int] = None,
top_logprobs: Optional[int] = None,
base_url: Optional[str] = None,
api_base: Optional[str] = None,
api_version: Optional[str] = None,
api_key: Optional[str] = None,
callbacks: List[Any] = [],
reasoning_effort: Optional[Literal["none", "low", "medium", "high"]] = None,
**kwargs,
) -> 'DefaultLLM':
"""Create a default LLM instance using litellm.
This factory method creates a default LLM instance using litellm as the backend.
It's the recommended way to create LLM instances for most users.
Args:
model: The model name (e.g., "gpt-4").
timeout: Optional timeout for the LLM call.
temperature: Optional temperature for the LLM call.
top_p: Optional top_p for the LLM call.
n: Optional n for the LLM call.
stop: Optional stop sequences for the LLM call.
max_completion_tokens: Optional max_completion_tokens for the LLM call.
max_tokens: Optional max_tokens for the LLM call.
presence_penalty: Optional presence_penalty for the LLM call.
frequency_penalty: Optional frequency_penalty for the LLM call.
logit_bias: Optional logit_bias for the LLM call.
response_format: Optional response_format for the LLM call.
seed: Optional seed for the LLM call.
logprobs: Optional logprobs for the LLM call.
top_logprobs: Optional top_logprobs for the LLM call.
base_url: Optional base_url for the LLM call.
api_base: Optional api_base for the LLM call.
api_version: Optional api_version for the LLM call.
api_key: Optional api_key for the LLM call.
callbacks: Optional callbacks for the LLM call.
reasoning_effort: Optional reasoning_effort for the LLM call.
**kwargs: Additional keyword arguments for the LLM call.
Returns:
A DefaultLLM instance configured with the provided parameters.
"""
from crewai.llm import DefaultLLM
return DefaultLLM(
model=model,
timeout=timeout,
temperature=temperature,
top_p=top_p,
n=n,
stop=stop,
max_completion_tokens=max_completion_tokens,
max_tokens=max_tokens,
presence_penalty=presence_penalty,
frequency_penalty=frequency_penalty,
logit_bias=logit_bias,
response_format=response_format,
seed=seed,
logprobs=logprobs,
top_logprobs=top_logprobs,
base_url=base_url,
api_base=api_base,
api_version=api_version,
api_key=api_key,
callbacks=callbacks,
reasoning_effort=reasoning_effort,
**kwargs,
)
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
"""Call the LLM with the given messages.
Args:
messages: Input messages for the LLM.
Can be a string or list of message dictionaries.
If string, it will be converted to a single user message.
If list, each dict must have 'role' and 'content' keys.
tools: Optional list of tool schemas for function calling.
Each tool should define its name, description, and parameters.
callbacks: Optional list of callback functions to be executed
during and after the LLM call.
available_functions: Optional dict mapping function names to callables
that can be invoked by the LLM.
Returns:
Either a text response from the LLM (str) or
the result of a tool function call (Any).
Raises:
ValueError: If the messages format is invalid.
TimeoutError: If the LLM request times out.
RuntimeError: If the LLM request fails for other reasons.
NotImplementedError: If this method is not implemented by a subclass.
"""
raise NotImplementedError("Subclasses must implement call()")
def supports_function_calling(self) -> bool:
"""Check if the LLM supports function calling.
This method should return True if the LLM implementation supports
function calling (tools), and False otherwise. If this method returns
True, the LLM should be able to handle the 'tools' parameter in the
call() method.
Returns:
True if the LLM supports function calling, False otherwise.
Raises:
NotImplementedError: If this method is not implemented by a subclass.
"""
raise NotImplementedError("Subclasses must implement supports_function_calling()")
def supports_stop_words(self) -> bool:
"""Check if the LLM supports stop words.
This method should return True if the LLM implementation supports
stop words, and False otherwise. If this method returns True, the
LLM should respect the 'stop' attribute when generating responses.
Returns:
True if the LLM supports stop words, False otherwise.
Raises:
NotImplementedError: If this method is not implemented by a subclass.
"""
raise NotImplementedError("Subclasses must implement supports_stop_words()")
def get_context_window_size(self) -> int:
"""Get the context window size of the LLM.
This method should return the maximum number of tokens that the LLM
can process in a single request. This is used by CrewAI to ensure
that messages don't exceed the LLM's context window.
Returns:
The context window size as an integer.
Raises:
NotImplementedError: If this method is not implemented by a subclass.
"""
raise NotImplementedError("Subclasses must implement get_context_window_size()")
class FilteredStream:
def __init__(self, original_stream):
self._original_stream = original_stream
@@ -344,14 +126,7 @@ def suppress_warnings():
sys.stderr = old_stderr
class DefaultLLM(LLM):
"""Default LLM implementation using litellm.
This class provides a concrete implementation of the LLM interface
using litellm as the backend. It's the default implementation used
by CrewAI when no custom LLM is provided.
"""
class LLM:
def __init__(
self,
model: str,
@@ -377,8 +152,6 @@ class DefaultLLM(LLM):
reasoning_effort: Optional[Literal["none", "low", "medium", "high"]] = None,
**kwargs,
):
super().__init__() # Initialize the base class
self.model = model
self.timeout = timeout
self.temperature = temperature
@@ -407,7 +180,7 @@ class DefaultLLM(LLM):
# Normalize self.stop to always be a List[str]
if stop is None:
self.stop = [] # Already initialized in base class
self.stop: List[str] = []
elif isinstance(stop, str):
self.stop = [stop]
else:
@@ -791,27 +564,3 @@ class DefaultLLM(LLM):
litellm.success_callback = success_callbacks
litellm.failure_callback = failure_callbacks
class BaseLLM(LLM):
"""Deprecated: Use LLM instead.
This class is kept for backward compatibility and will be removed in a future release.
It inherits from LLM and provides the same interface, but emits a deprecation warning
when instantiated.
"""
def __init__(self):
"""Initialize the BaseLLM with a deprecation warning.
This constructor emits a deprecation warning and then calls the parent class's
constructor to initialize the LLM.
"""
import warnings
warnings.warn(
"BaseLLM is deprecated and will be removed in a future release. "
"Use LLM instead for custom implementations.",
DeprecationWarning,
stacklevel=2
)
super().__init__()

View File

@@ -2,7 +2,7 @@ import os
from typing import Any, Dict, List, Optional, Union
from crewai.cli.constants import DEFAULT_LLM_MODEL, ENV_VARS, LITELLM_PARAMS
from crewai.llm import LLM, BaseLLM
from crewai.llm import LLM
def create_llm(
@@ -19,17 +19,17 @@ def create_llm(
- None: Use environment-based or fallback default model.
Returns:
A LLM instance if successful, or None if something fails.
An LLM instance if successful, or None if something fails.
"""
# 1) If llm_value is already a LLM object, return it directly
# 1) If llm_value is already an LLM object, return it directly
if isinstance(llm_value, LLM):
return llm_value
# 2) If llm_value is a string (model name)
if isinstance(llm_value, str):
try:
created_llm = LLM.create(model=llm_value)
created_llm = LLM(model=llm_value)
return created_llm
except Exception as e:
print(f"Failed to instantiate LLM with model='{llm_value}': {e}")
@@ -56,7 +56,7 @@ def create_llm(
base_url: Optional[str] = getattr(llm_value, "base_url", None)
api_base: Optional[str] = getattr(llm_value, "api_base", None)
created_llm = LLM.create(
created_llm = LLM(
model=model,
temperature=temperature,
max_tokens=max_tokens,
@@ -175,7 +175,7 @@ def _llm_via_environment_or_fallback() -> Optional[LLM]:
# Try creating the LLM
try:
new_llm = LLM.create(**llm_params)
new_llm = LLM(**llm_params)
return new_llm
except Exception as e:
print(

View File

@@ -1,89 +0,0 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are test role. test backstory\nYour
personal goal is: test goal\nTo give my best complete final answer to the task
respond using the exact following format:\n\nThought: I now can give a great
answer\nFinal Answer: Your final answer must be the great and the most complete
as possible, it must be outcome described.\n\nI MUST use these formats, my job
depends on it!"}, {"role": "user", "content": "\nCurrent Task: Test task\n\nThis
is the expected criteria for your final answer: Test output\nyou MUST return
the actual complete content as the final answer, not a summary.\n\nBegin! This
is VERY important to you, use the tools available and give your best Final Answer,
your job depends on it!\n\nThought:"}], "model": "gpt-4", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '805'
content-type:
- application/json
cookie:
- _cfuvid=xecEkmr_qTiKn7EKC7aeGN5bpsbPM9ofyIsipL4VCYM-1734033219265-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-proj-********************************************************************************************************************************************************sLcA.
You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 9201beec18a0762e-SEA
Connection:
- keep-alive
Content-Length:
- '414'
Content-Type:
- application/json; charset=utf-8
Date:
- Fri, 14 Mar 2025 06:34:31 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=wF6OyTyATDK7A9tGqAdaSB3QZfmd34JWPicYlDC1hug-1741934071-1.0.1.1-nZThPWX_7A9FsU7Z14PyrVhl6mCD99iuk9ujCFkNCCdepMHEwK9EXoDrP4IBBCXxkXmKjrVTSaQ63zpcociXuMHR8JKhth2fRUV2H4hMldY;
path=/; expires=Fri, 14-Mar-25 07:04:31 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=rn5IWZdYMRmbyCa2_84MkWO46MIaP6soWc8npaLc9iQ-1741934071787-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_f55471c8eb5755daaef3d63eab5a95de
http_version: HTTP/1.1
status_code: 401
version: 1

View File

@@ -1,570 +0,0 @@
from collections import deque
from typing import Any, Dict, List, Optional, Union
import time
import jwt
import pytest
from crewai.llm import LLM
from crewai.utilities.llm_utils import create_llm
class CustomLLM(LLM):
"""Custom LLM implementation for testing.
This is a simple implementation of the LLM abstract base class
that returns a predefined response for testing purposes.
"""
def __init__(self, response: str = "Custom LLM response"):
"""Initialize the CustomLLM with a predefined response.
Args:
response: The predefined response to return from call().
"""
super().__init__()
self.response = response
self.calls = []
self.stop = []
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
"""Record the call and return the predefined response.
Args:
messages: Input messages for the LLM.
tools: Optional list of tool schemas for function calling.
callbacks: Optional list of callback functions.
available_functions: Optional dict mapping function names to callables.
Returns:
The predefined response string.
"""
self.calls.append({
"messages": messages,
"tools": tools,
"callbacks": callbacks,
"available_functions": available_functions
})
return self.response
def supports_function_calling(self) -> bool:
"""Return True to indicate that function calling is supported.
Returns:
True, indicating that this LLM supports function calling.
"""
return True
def supports_stop_words(self) -> bool:
"""Return True to indicate that stop words are supported.
Returns:
True, indicating that this LLM supports stop words.
"""
return True
def get_context_window_size(self) -> int:
"""Return a default context window size.
Returns:
8192, a typical context window size for modern LLMs.
"""
return 8192
def test_custom_llm_implementation():
"""Test that a custom LLM implementation works with create_llm."""
custom_llm = CustomLLM(response="The answer is 42")
# Test that create_llm returns the custom LLM instance directly
result_llm = create_llm(custom_llm)
assert result_llm is custom_llm
# Test calling the custom LLM
response = result_llm.call("What is the answer to life, the universe, and everything?")
# Verify that the custom LLM was called
assert len(custom_llm.calls) > 0
# Verify that the response from the custom LLM was used
assert response == "The answer is 42"
class JWTAuthLLM(LLM):
"""Custom LLM implementation with JWT authentication.
This class demonstrates how to implement a custom LLM that uses JWT
authentication instead of API key-based authentication. It validates
the JWT token before each call and checks for token expiration.
"""
def __init__(self, jwt_token: str, expiration_buffer: int = 60):
"""Initialize the JWTAuthLLM with a JWT token.
Args:
jwt_token: The JWT token to use for authentication.
expiration_buffer: Buffer time in seconds to warn about token expiration.
Default is 60 seconds.
Raises:
ValueError: If the JWT token is invalid or missing.
"""
super().__init__()
if not jwt_token or not isinstance(jwt_token, str):
raise ValueError("Invalid JWT token")
self.jwt_token = jwt_token
self.expiration_buffer = expiration_buffer
self.calls = []
self.stop = []
# Validate the token immediately
self._validate_token()
def _validate_token(self) -> None:
"""Validate the JWT token.
Checks if the token is valid and not expired. Also warns if the token
is about to expire within the expiration_buffer time.
Raises:
ValueError: If the token is invalid, expired, or malformed.
"""
try:
# Decode without verification to check expiration
# In a real implementation, you would verify the signature
decoded = jwt.decode(self.jwt_token, options={"verify_signature": False})
# Check if token is expired or about to expire
if 'exp' in decoded:
expiration_time = decoded['exp']
current_time = time.time()
if expiration_time < current_time:
raise ValueError("JWT token has expired")
if expiration_time < current_time + self.expiration_buffer:
# Token will expire soon, log a warning
import logging
logging.warning(f"JWT token will expire in {expiration_time - current_time} seconds")
except jwt.PyJWTError as e:
raise ValueError(f"Invalid JWT token format: {str(e)}")
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
"""Call the LLM with JWT authentication.
Validates the JWT token before making the call to ensure it's still valid.
Args:
messages: Input messages for the LLM.
tools: Optional list of tool schemas for function calling.
callbacks: Optional list of callback functions.
available_functions: Optional dict mapping function names to callables.
Returns:
The LLM response.
Raises:
ValueError: If the JWT token is invalid or expired.
TimeoutError: If the request times out.
ConnectionError: If there's a connection issue.
"""
# Validate token before making the call
self._validate_token()
self.calls.append({
"messages": messages,
"tools": tools,
"callbacks": callbacks,
"available_functions": available_functions
})
# In a real implementation, this would use the JWT token to authenticate
# with an external service
return "Response from JWT-authenticated LLM"
def supports_function_calling(self) -> bool:
"""Return True to indicate that function calling is supported."""
return True
def supports_stop_words(self) -> bool:
"""Return True to indicate that stop words are supported."""
return True
def get_context_window_size(self) -> int:
"""Return a default context window size."""
return 8192
def test_custom_llm_with_jwt_auth():
"""Test a custom LLM implementation with JWT authentication."""
# Create a valid JWT token that expires 1 hour from now
valid_token = jwt.encode(
{"exp": int(time.time()) + 3600},
"secret",
algorithm="HS256"
)
jwt_llm = JWTAuthLLM(jwt_token=valid_token)
# Test that create_llm returns the JWT-authenticated LLM instance directly
result_llm = create_llm(jwt_llm)
assert result_llm is jwt_llm
# Test calling the JWT-authenticated LLM
response = result_llm.call("Test message")
# Verify that the JWT-authenticated LLM was called
assert len(jwt_llm.calls) > 0
# Verify that the response from the JWT-authenticated LLM was used
assert response == "Response from JWT-authenticated LLM"
def test_jwt_auth_llm_validation():
"""Test that JWT token validation works correctly."""
# Test with invalid JWT token (empty string)
with pytest.raises(ValueError, match="Invalid JWT token"):
JWTAuthLLM(jwt_token="")
# Test with invalid JWT token (non-string)
with pytest.raises(ValueError, match="Invalid JWT token"):
JWTAuthLLM(jwt_token=None)
# Test with expired token
# Create a token that expired 1 hour ago
expired_token = jwt.encode(
{"exp": int(time.time()) - 3600},
"secret",
algorithm="HS256"
)
with pytest.raises(ValueError, match="JWT token has expired"):
JWTAuthLLM(jwt_token=expired_token)
# Test with malformed token
with pytest.raises(ValueError, match="Invalid JWT token format"):
JWTAuthLLM(jwt_token="not.a.valid.jwt.token")
# Test with valid token
# Create a token that expires 1 hour from now
valid_token = jwt.encode(
{"exp": int(time.time()) + 3600},
"secret",
algorithm="HS256"
)
# This should not raise an exception
jwt_llm = JWTAuthLLM(jwt_token=valid_token)
assert jwt_llm.jwt_token == valid_token
class TimeoutHandlingLLM(LLM):
"""Custom LLM implementation with timeout handling and retry logic."""
def __init__(self, max_retries: int = 3, timeout: int = 30):
"""Initialize the TimeoutHandlingLLM with retry and timeout settings.
Args:
max_retries: Maximum number of retry attempts.
timeout: Timeout in seconds for each API call.
"""
super().__init__()
self.max_retries = max_retries
self.timeout = timeout
self.calls = []
self.stop = []
self.fail_count = 0 # Number of times to simulate failure
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
"""Simulate API calls with timeout handling and retry logic.
Args:
messages: Input messages for the LLM.
tools: Optional list of tool schemas for function calling.
callbacks: Optional list of callback functions.
available_functions: Optional dict mapping function names to callables.
Returns:
A response string based on whether this is the first attempt or a retry.
Raises:
TimeoutError: If all retry attempts fail.
"""
# Record the initial call
self.calls.append({
"messages": messages,
"tools": tools,
"callbacks": callbacks,
"available_functions": available_functions,
"attempt": 0
})
# Simulate retry logic
for attempt in range(self.max_retries):
# Skip the first attempt recording since we already did that above
if attempt == 0:
# Simulate a failure if fail_count > 0
if self.fail_count > 0:
self.fail_count -= 1
# If we've used all retries, raise an error
if attempt == self.max_retries - 1:
raise TimeoutError(f"LLM request failed after {self.max_retries} attempts")
# Otherwise, continue to the next attempt (simulating backoff)
continue
else:
# Success on first attempt
return "First attempt response"
else:
# This is a retry attempt (attempt > 0)
# Always record retry attempts
self.calls.append({
"retry_attempt": attempt,
"messages": messages,
"tools": tools,
"callbacks": callbacks,
"available_functions": available_functions
})
# Simulate a failure if fail_count > 0
if self.fail_count > 0:
self.fail_count -= 1
# If we've used all retries, raise an error
if attempt == self.max_retries - 1:
raise TimeoutError(f"LLM request failed after {self.max_retries} attempts")
# Otherwise, continue to the next attempt (simulating backoff)
continue
else:
# Success on retry
return "Response after retry"
def supports_function_calling(self) -> bool:
"""Return True to indicate that function calling is supported.
Returns:
True, indicating that this LLM supports function calling.
"""
return True
def supports_stop_words(self) -> bool:
"""Return True to indicate that stop words are supported.
Returns:
True, indicating that this LLM supports stop words.
"""
return True
def get_context_window_size(self) -> int:
"""Return a default context window size.
Returns:
8192, a typical context window size for modern LLMs.
"""
return 8192
def test_timeout_handling_llm():
"""Test a custom LLM implementation with timeout handling and retry logic."""
# Test successful first attempt
llm = TimeoutHandlingLLM()
response = llm.call("Test message")
assert response == "First attempt response"
assert len(llm.calls) == 1
# Test successful retry
llm = TimeoutHandlingLLM()
llm.fail_count = 1 # Fail once, then succeed
response = llm.call("Test message")
assert response == "Response after retry"
assert len(llm.calls) == 2 # Initial call + successful retry call
# Test failure after all retries
llm = TimeoutHandlingLLM(max_retries=2)
llm.fail_count = 2 # Fail twice, which is all retries
with pytest.raises(TimeoutError, match="LLM request failed after 2 attempts"):
llm.call("Test message")
assert len(llm.calls) == 2 # Initial call + failed retry attempt
def test_rate_limited_llm():
"""Test that rate limiting works correctly."""
# Create a rate limited LLM with a very low limit (2 requests per minute)
llm = RateLimitedLLM(requests_per_minute=2)
# First request should succeed
response1 = llm.call("Test message 1")
assert response1 == "Rate limited response"
assert len(llm.calls) == 1
# Second request should succeed
response2 = llm.call("Test message 2")
assert response2 == "Rate limited response"
assert len(llm.calls) == 2
# Third request should fail due to rate limiting
with pytest.raises(ValueError, match="Rate limit exceeded"):
llm.call("Test message 3")
# Test with invalid requests_per_minute
with pytest.raises(ValueError, match="requests_per_minute must be a positive integer"):
RateLimitedLLM(requests_per_minute=0)
with pytest.raises(ValueError, match="requests_per_minute must be a positive integer"):
RateLimitedLLM(requests_per_minute=-1)
def test_rate_limit_reset():
"""Test that rate limits reset after the time window passes."""
# Create a rate limited LLM with a very low limit (1 request per minute)
# and a short time window for testing (1 second instead of 60 seconds)
time_window = 1 # 1 second instead of 60 seconds
llm = RateLimitedLLM(requests_per_minute=1, time_window=time_window)
# First request should succeed
response1 = llm.call("Test message 1")
assert response1 == "Rate limited response"
# Second request should fail due to rate limiting
with pytest.raises(ValueError, match="Rate limit exceeded"):
llm.call("Test message 2")
# Wait for the rate limit to reset
import time
time.sleep(time_window + 0.1) # Add a small buffer
# After waiting, we should be able to make another request
response3 = llm.call("Test message 3")
assert response3 == "Rate limited response"
assert len(llm.calls) == 2 # First and third requests
class RateLimitedLLM(LLM):
"""Custom LLM implementation with rate limiting.
This class demonstrates how to implement a custom LLM with rate limiting
capabilities. It uses a sliding window algorithm to ensure that no more
than a specified number of requests are made within a given time period.
"""
def __init__(self, requests_per_minute: int = 60, base_response: str = "Rate limited response", time_window: int = 60):
"""Initialize the RateLimitedLLM with rate limiting parameters.
Args:
requests_per_minute: Maximum number of requests allowed per minute.
base_response: Default response to return.
time_window: Time window in seconds for rate limiting (default: 60).
This is configurable for testing purposes.
Raises:
ValueError: If requests_per_minute is not a positive integer.
"""
super().__init__()
if not isinstance(requests_per_minute, int) or requests_per_minute <= 0:
raise ValueError("requests_per_minute must be a positive integer")
self.requests_per_minute = requests_per_minute
self.base_response = base_response
self.time_window = time_window
self.request_times = deque()
self.calls = []
self.stop = []
def _check_rate_limit(self) -> None:
"""Check if the current request exceeds the rate limit.
This method implements a sliding window rate limiting algorithm.
It keeps track of request timestamps and ensures that no more than
`requests_per_minute` requests are made within the configured time window.
Raises:
ValueError: If the rate limit is exceeded.
"""
current_time = time.time()
# Remove requests older than the time window
while self.request_times and current_time - self.request_times[0] > self.time_window:
self.request_times.popleft()
# Check if we've exceeded the rate limit
if len(self.request_times) >= self.requests_per_minute:
wait_time = self.time_window - (current_time - self.request_times[0])
raise ValueError(
f"Rate limit exceeded. Maximum {self.requests_per_minute} "
f"requests per {self.time_window} seconds. Try again in {wait_time:.2f} seconds."
)
# Record this request
self.request_times.append(current_time)
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
"""Call the LLM with rate limiting.
Args:
messages: Input messages for the LLM.
tools: Optional list of tool schemas for function calling.
callbacks: Optional list of callback functions.
available_functions: Optional dict mapping function names to callables.
Returns:
The LLM response.
Raises:
ValueError: If the rate limit is exceeded.
"""
# Check rate limit before making the call
self._check_rate_limit()
self.calls.append({
"messages": messages,
"tools": tools,
"callbacks": callbacks,
"available_functions": available_functions
})
return self.base_response
def supports_function_calling(self) -> bool:
"""Return True to indicate that function calling is supported.
Returns:
True, indicating that this LLM supports function calling.
"""
return True
def supports_stop_words(self) -> bool:
"""Return True to indicate that stop words are supported.
Returns:
True, indicating that this LLM supports stop words.
"""
return True
def get_context_window_size(self) -> int:
"""Return a default context window size.
Returns:
8192, a typical context window size for modern LLMs.
"""
return 8192

View File

@@ -654,104 +654,3 @@ def test_flow_plotting():
assert isinstance(received_events[0], FlowPlotEvent)
assert received_events[0].flow_name == "StatelessFlow"
assert isinstance(received_events[0].timestamp, datetime)
def test_multiple_routers_from_same_trigger():
"""Test that multiple routers triggered by the same method all activate their listeners."""
execution_order = []
class MultiRouterFlow(Flow):
def __init__(self):
super().__init__()
# Set diagnosed conditions to trigger all routers
self.state["diagnosed_conditions"] = "DHA" # Contains D, H, and A
@start()
def scan_medical(self):
execution_order.append("scan_medical")
return "scan_complete"
@router(scan_medical)
def diagnose_conditions(self):
execution_order.append("diagnose_conditions")
return "diagnosis_complete"
@router(diagnose_conditions)
def diabetes_router(self):
execution_order.append("diabetes_router")
if "D" in self.state["diagnosed_conditions"]:
return "diabetes"
return None
@listen("diabetes")
def diabetes_analysis(self):
execution_order.append("diabetes_analysis")
return "diabetes_analysis_complete"
@router(diagnose_conditions)
def hypertension_router(self):
execution_order.append("hypertension_router")
if "H" in self.state["diagnosed_conditions"]:
return "hypertension"
return None
@listen("hypertension")
def hypertension_analysis(self):
execution_order.append("hypertension_analysis")
return "hypertension_analysis_complete"
@router(diagnose_conditions)
def anemia_router(self):
execution_order.append("anemia_router")
if "A" in self.state["diagnosed_conditions"]:
return "anemia"
return None
@listen("anemia")
def anemia_analysis(self):
execution_order.append("anemia_analysis")
return "anemia_analysis_complete"
flow = MultiRouterFlow()
flow.kickoff()
# Verify all methods were called
assert "scan_medical" in execution_order
assert "diagnose_conditions" in execution_order
# Verify all routers were called
assert "diabetes_router" in execution_order
assert "hypertension_router" in execution_order
assert "anemia_router" in execution_order
# Verify all listeners were called - this is the key test for the fix
assert "diabetes_analysis" in execution_order
assert "hypertension_analysis" in execution_order
assert "anemia_analysis" in execution_order
# Verify execution order constraints
assert execution_order.index("diagnose_conditions") > execution_order.index(
"scan_medical"
)
# All routers should execute after diagnose_conditions
assert execution_order.index("diabetes_router") > execution_order.index(
"diagnose_conditions"
)
assert execution_order.index("hypertension_router") > execution_order.index(
"diagnose_conditions"
)
assert execution_order.index("anemia_router") > execution_order.index(
"diagnose_conditions"
)
# All analyses should execute after their respective routers
assert execution_order.index("diabetes_analysis") > execution_order.index(
"diabetes_router"
)
assert execution_order.index("hypertension_analysis") > execution_order.index(
"hypertension_router"
)
assert execution_order.index("anemia_analysis") > execution_order.index(
"anemia_router"
)