Merge branch 'main' into main

This commit is contained in:
Marco Vinciguerra
2024-12-28 09:12:50 +01:00
committed by GitHub
18 changed files with 877 additions and 197 deletions

View File

@@ -43,4 +43,6 @@ from .tools import (
YoutubeChannelSearchTool,
YoutubeVideoSearchTool,
WeaviateVectorSearchTool,
SerpApiGoogleSearchTool,
SerpApiGoogleShoppingTool,
)

View File

@@ -53,3 +53,5 @@ from .youtube_channel_search_tool.youtube_channel_search_tool import (
)
from .youtube_video_search_tool.youtube_video_search_tool import YoutubeVideoSearchTool
from .weaviate_tool.vector_search import WeaviateVectorSearchTool
from .serpapi_tool.serpapi_google_search_tool import SerpApiGoogleSearchTool
from .serpapi_tool.serpapi_google_shopping_tool import SerpApiGoogleShoppingTool

View File

@@ -38,3 +38,16 @@ Agent(
tools=[CodeInterpreterTool(user_dockerfile_path="<Dockerfile_path>")],
)
```
If it is difficult to connect to docker daemon automatically (especially for macOS users), you can do this to setup docker host manually
```python
from crewai_tools import CodeInterpreterTool
Agent(
...
tools=[CodeInterpreterTool(user_docker_base_url="<Docker Host Base Url>",
user_dockerfile_path="<Dockerfile_path>")],
)
```

View File

@@ -2,7 +2,9 @@ import importlib.util
import os
from typing import List, Optional, Type
import docker
from docker import from_env as docker_from_env
from docker.models.containers import Container
from docker.errors import ImageNotFound, NotFound
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
@@ -28,6 +30,7 @@ class CodeInterpreterTool(BaseTool):
default_image_tag: str = "code-interpreter:latest"
code: Optional[str] = None
user_dockerfile_path: Optional[str] = None
user_docker_base_url: Optional[str] = None
unsafe_mode: bool = False
@staticmethod
@@ -39,12 +42,13 @@ class CodeInterpreterTool(BaseTool):
"""
Verify if the Docker image is available. Optionally use a user-provided Dockerfile.
"""
client = docker.from_env()
client = docker_from_env() if self.user_docker_base_url == None else docker.DockerClient(base_url=self.user_docker_base_url)
try:
client.images.get(self.default_image_tag)
except docker.errors.ImageNotFound:
except ImageNotFound:
if self.user_dockerfile_path and os.path.exists(self.user_dockerfile_path):
dockerfile_path = self.user_dockerfile_path
else:
@@ -73,17 +77,17 @@ class CodeInterpreterTool(BaseTool):
return self.run_code_in_docker(code, libraries_used)
def _install_libraries(
self, container: docker.models.containers.Container, libraries: List[str]
self, container: Container, libraries: List[str]
) -> None:
"""
Install missing libraries in the Docker container
"""
for library in libraries:
container.exec_run(f"pip install {library}")
container.exec_run(["pip", "install", library])
def _init_docker_container(self) -> docker.models.containers.Container:
def _init_docker_container(self) -> Container:
container_name = "code-interpreter"
client = docker.from_env()
client = docker_from_env()
current_path = os.getcwd()
# Check if the container is already running
@@ -91,7 +95,7 @@ class CodeInterpreterTool(BaseTool):
existing_container = client.containers.get(container_name)
existing_container.stop()
existing_container.remove()
except docker.errors.NotFound:
except NotFound:
pass # Container does not exist, no need to remove
return client.containers.run(
@@ -108,8 +112,7 @@ class CodeInterpreterTool(BaseTool):
container = self._init_docker_container()
self._install_libraries(container, libraries_used)
cmd_to_run = f'python3 -c "{code}"'
exec_result = container.exec_run(cmd_to_run)
exec_result = container.exec_run(["python3", "-c", code])
container.stop()
container.remove()

View File

@@ -1,8 +1,10 @@
import os
from typing import TYPE_CHECKING, Any, Dict, Optional, Type
from crewai.tools import BaseTool
from pydantic import BaseModel, ConfigDict, Field
from crewai.tools import BaseTool
# Type checking import
if TYPE_CHECKING:
from firecrawl import FirecrawlApp
@@ -10,13 +12,6 @@ if TYPE_CHECKING:
class FirecrawlCrawlWebsiteToolSchema(BaseModel):
url: str = Field(description="Website URL")
crawler_options: Optional[Dict[str, Any]] = Field(
default=None, description="Options for crawling"
)
page_options: Optional[Dict[str, Any]] = Field(
default=None, description="Options for page"
)
class FirecrawlCrawlWebsiteTool(BaseTool):
model_config = ConfigDict(
@@ -25,10 +20,25 @@ class FirecrawlCrawlWebsiteTool(BaseTool):
name: str = "Firecrawl web crawl tool"
description: str = "Crawl webpages using Firecrawl and return the contents"
args_schema: Type[BaseModel] = FirecrawlCrawlWebsiteToolSchema
firecrawl_app: Optional["FirecrawlApp"] = None
api_key: Optional[str] = None
firecrawl: Optional["FirecrawlApp"] = None
url: Optional[str] = None
params: Optional[Dict[str, Any]] = None
poll_interval: Optional[int] = 2
idempotency_key: Optional[str] = None
def __init__(self, api_key: Optional[str] = None, **kwargs):
"""Initialize FirecrawlCrawlWebsiteTool.
Args:
api_key (Optional[str]): Firecrawl API key. If not provided, will check FIRECRAWL_API_KEY env var.
url (Optional[str]): Base URL to crawl. Can be overridden by the _run method.
firecrawl_app (Optional[FirecrawlApp]): Previously created FirecrawlApp instance.
params (Optional[Dict[str, Any]]): Additional parameters to pass to the FirecrawlApp.
poll_interval (Optional[int]): Poll interval for the FirecrawlApp.
idempotency_key (Optional[str]): Idempotency key for the FirecrawlApp.
**kwargs: Additional arguments passed to BaseTool.
"""
super().__init__(**kwargs)
try:
from firecrawl import FirecrawlApp # type: ignore
@@ -37,21 +47,28 @@ class FirecrawlCrawlWebsiteTool(BaseTool):
"`firecrawl` package not found, please run `pip install firecrawl-py`"
)
self.firecrawl = FirecrawlApp(api_key=api_key)
# Allows passing a previously created FirecrawlApp instance
# or builds a new one with the provided API key
if not self.firecrawl_app:
client_api_key = api_key or os.getenv("FIRECRAWL_API_KEY")
if not client_api_key:
raise ValueError(
"FIRECRAWL_API_KEY is not set. Please provide it either via the constructor "
"with the `api_key` argument or by setting the FIRECRAWL_API_KEY environment variable."
)
self.firecrawl_app = FirecrawlApp(api_key=client_api_key)
def _run(
self,
url: str,
crawler_options: Optional[Dict[str, Any]] = None,
page_options: Optional[Dict[str, Any]] = None,
):
if crawler_options is None:
crawler_options = {}
if page_options is None:
page_options = {}
def _run(self, url: str):
# Unless url has been previously set via constructor by the user,
# use the url argument provided by the agent at runtime.
base_url = self.url or url
options = {"crawlerOptions": crawler_options, "pageOptions": page_options}
return self.firecrawl.crawl_url(url, options)
return self.firecrawl_app.crawl_url(
base_url,
params=self.params,
poll_interval=self.poll_interval,
idempotency_key=self.idempotency_key
)
try:

View File

@@ -18,6 +18,10 @@ class LlamaIndexTool(BaseTool):
from llama_index.core.tools import BaseTool as LlamaBaseTool
tool = cast(LlamaBaseTool, self.llama_index_tool)
if self.result_as_answer:
return tool(*args, **kwargs).content
return tool(*args, **kwargs)
@classmethod

View File

@@ -24,6 +24,16 @@ tool = SeleniumScrapingTool(website_url='https://example.com', css_element='.mai
# Example 4: Scrape using optional parameters for customized scraping
tool = SeleniumScrapingTool(website_url='https://example.com', css_element='.main-content', cookie={'name': 'user', 'value': 'John Doe'})
# Example 5: Scrape content in HTML format
tool = SeleniumScrapingTool(website_url='https://example.com', return_html=True)
result = tool._run()
# Returns HTML content like: ['<div class="content">Hello World</div>', '<div class="footer">Copyright 2024</div>']
# Example 6: Scrape content in text format (default)
tool = SeleniumScrapingTool(website_url='https://example.com', return_html=False)
result = tool._run()
# Returns text content like: ['Hello World', 'Copyright 2024']
```
## Arguments
@@ -31,3 +41,4 @@ tool = SeleniumScrapingTool(website_url='https://example.com', css_element='.mai
- `css_element`: Mandatory. The CSS selector for a specific element to scrape from the website.
- `cookie`: Optional. A dictionary containing cookie information. This parameter allows the tool to simulate a session with cookie information, providing access to content that may be restricted to logged-in users.
- `wait_time`: Optional. The number of seconds the tool waits after loading the website and after setting a cookie, before scraping the content. This allows for dynamic content to load properly.
- `return_html`: Optional. If True, the tool returns HTML content. If False, the tool returns text content.

View File

@@ -1,8 +1,10 @@
import re
import time
from typing import Any, Optional, Type
from urllib.parse import urlparse
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, validator
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
@@ -11,18 +13,39 @@ from selenium.webdriver.common.by import By
class FixedSeleniumScrapingToolSchema(BaseModel):
"""Input for SeleniumScrapingTool."""
pass
class SeleniumScrapingToolSchema(FixedSeleniumScrapingToolSchema):
"""Input for SeleniumScrapingTool."""
website_url: str = Field(..., description="Mandatory website url to read the file")
website_url: str = Field(..., description="Mandatory website url to read the file. Must start with http:// or https://")
css_element: str = Field(
...,
description="Mandatory css reference for element to scrape from the website",
)
@validator('website_url')
def validate_website_url(cls, v):
if not v:
raise ValueError("Website URL cannot be empty")
if len(v) > 2048: # Common maximum URL length
raise ValueError("URL is too long (max 2048 characters)")
if not re.match(r'^https?://', v):
raise ValueError("URL must start with http:// or https://")
try:
result = urlparse(v)
if not all([result.scheme, result.netloc]):
raise ValueError("Invalid URL format")
except Exception as e:
raise ValueError(f"Invalid URL: {str(e)}")
if re.search(r'\s', v):
raise ValueError("URL cannot contain whitespace")
return v
class SeleniumScrapingTool(BaseTool):
name: str = "Read a website content"
@@ -33,6 +56,7 @@ class SeleniumScrapingTool(BaseTool):
cookie: Optional[dict] = None
wait_time: Optional[int] = 3
css_element: Optional[str] = None
return_html: Optional[bool] = False
def __init__(
self,
@@ -63,19 +87,54 @@ class SeleniumScrapingTool(BaseTool):
) -> Any:
website_url = kwargs.get("website_url", self.website_url)
css_element = kwargs.get("css_element", self.css_element)
return_html = kwargs.get("return_html", self.return_html)
driver = self._create_driver(website_url, self.cookie, self.wait_time)
content = []
if css_element is None or css_element.strip() == "":
body_text = driver.find_element(By.TAG_NAME, "body").text
content.append(body_text)
else:
for element in driver.find_elements(By.CSS_SELECTOR, css_element):
content.append(element.text)
content = self._get_content(driver, css_element, return_html)
driver.close()
return "\n".join(content)
def _get_content(self, driver, css_element, return_html):
content = []
if self._is_css_element_empty(css_element):
content.append(self._get_body_content(driver, return_html))
else:
content.extend(self._get_elements_content(driver, css_element, return_html))
return content
def _is_css_element_empty(self, css_element):
return css_element is None or css_element.strip() == ""
def _get_body_content(self, driver, return_html):
body_element = driver.find_element(By.TAG_NAME, "body")
return (
body_element.get_attribute("outerHTML")
if return_html
else body_element.text
)
def _get_elements_content(self, driver, css_element, return_html):
elements_content = []
for element in driver.find_elements(By.CSS_SELECTOR, css_element):
elements_content.append(
element.get_attribute("outerHTML") if return_html else element.text
)
return elements_content
def _create_driver(self, url, cookie, wait_time):
if not url:
raise ValueError("URL cannot be empty")
# Validate URL format
if not re.match(r'^https?://', url):
raise ValueError("URL must start with http:// or https://")
options = Options()
options.add_argument("--headless")
driver = self.driver(options=options)

View File

@@ -0,0 +1,32 @@
# SerpApi Tools
## Description
[SerpApi](https://serpapi.com/) tools are built for searching information in the internet. It currently supports:
- Google Search
- Google Shopping
To successfully make use of SerpApi tools, you have to have `SERPAPI_API_KEY` set in the environment. To get the API key, register a free account at [SerpApi](https://serpapi.com/).
## Installation
To start using the SerpApi Tools, you must first install the `crewai_tools` package. This can be easily done with the following command:
```shell
pip install 'crewai[tools]'
```
## Examples
The following example demonstrates how to initialize the tool
### Google Search
```python
from crewai_tools import SerpApiGoogleSearchTool
tool = SerpApiGoogleSearchTool()
```
### Google Shopping
```python
from crewai_tools import SerpApiGoogleShoppingTool
tool = SerpApiGoogleShoppingTool()
```

View File

@@ -0,0 +1,38 @@
import os
import re
from typing import Optional, Any, Union
from crewai.tools import BaseTool
class SerpApiBaseTool(BaseTool):
"""Base class for SerpApi functionality with shared capabilities."""
client: Optional[Any] = None
def __init__(self, **kwargs):
super().__init__(**kwargs)
try:
from serpapi import Client
except ImportError:
raise ImportError(
"`serpapi` package not found, please install with `pip install serpapi`"
)
api_key = os.getenv("SERPAPI_API_KEY")
if not api_key:
raise ValueError(
"Missing API key, you can get the key from https://serpapi.com/manage-api-key"
)
self.client = Client(api_key=api_key)
def _omit_fields(self, data: Union[dict, list], omit_patterns: list[str]) -> None:
if isinstance(data, dict):
for field in list(data.keys()):
if any(re.compile(p).match(field) for p in omit_patterns):
data.pop(field, None)
else:
if isinstance(data[field], (dict, list)):
self._omit_fields(data[field], omit_patterns)
elif isinstance(data, list):
for item in data:
self._omit_fields(item, omit_patterns)

View File

@@ -0,0 +1,40 @@
from typing import Any, Type, Optional
import re
from pydantic import BaseModel, Field
from .serpapi_base_tool import SerpApiBaseTool
from serpapi import HTTPError
class SerpApiGoogleSearchToolSchema(BaseModel):
"""Input for Google Search."""
search_query: str = Field(..., description="Mandatory search query you want to use to Google search.")
location: Optional[str] = Field(None, description="Location you want the search to be performed in.")
class SerpApiGoogleSearchTool(SerpApiBaseTool):
name: str = "Google Search"
description: str = (
"A tool to perform to perform a Google search with a search_query."
)
args_schema: Type[BaseModel] = SerpApiGoogleSearchToolSchema
def _run(
self,
**kwargs: Any,
) -> Any:
try:
results = self.client.search({
"q": kwargs.get("search_query"),
"location": kwargs.get("location"),
}).as_dict()
self._omit_fields(
results,
[r"search_metadata", r"search_parameters", r"serpapi_.+", r".+_token", r"displayed_link", r"pagination"]
)
return results
except HTTPError as e:
return f"An error occurred: {str(e)}. Some parameters may be invalid."

View File

@@ -0,0 +1,42 @@
from typing import Any, Type, Optional
import re
from pydantic import BaseModel, Field
from .serpapi_base_tool import SerpApiBaseTool
from serpapi import HTTPError
class SerpApiGoogleShoppingToolSchema(BaseModel):
"""Input for Google Shopping."""
search_query: str = Field(..., description="Mandatory search query you want to use to Google shopping.")
location: Optional[str] = Field(None, description="Location you want the search to be performed in.")
class SerpApiGoogleShoppingTool(SerpApiBaseTool):
name: str = "Google Shopping"
description: str = (
"A tool to perform search on Google shopping with a search_query."
)
args_schema: Type[BaseModel] = SerpApiGoogleShoppingToolSchema
def _run(
self,
**kwargs: Any,
) -> Any:
try:
results = self.client.search({
"engine": "google_shopping",
"q": kwargs.get("search_query"),
"location": kwargs.get("location")
}).as_dict()
self._omit_fields(
results,
[r"search_metadata", r"search_parameters", r"serpapi_.+", r"filters", r"pagination"]
)
return results
except HTTPError as e:
return f"An error occurred: {str(e)}. Some parameters may be invalid."

View File

@@ -1,30 +1,49 @@
# SerperDevTool Documentation
## Description
This tool is designed to perform a semantic search for a specified query from a text's content across the internet. It utilizes the `serper.dev` API to fetch and display the most relevant search results based on the query provided by the user.
The SerperDevTool is a powerful search tool that interfaces with the `serper.dev` API to perform internet searches. It supports multiple search types including general search and news search, with features like knowledge graph integration, organic results, "People Also Ask" questions, and related searches.
## Features
- Multiple search types: 'search' (default) and 'news'
- Knowledge graph integration for enhanced search context
- Organic search results with sitelinks
- "People Also Ask" questions and answers
- Related searches suggestions
- News search with date, source, and image information
- Configurable number of results
- Optional result saving to file
## Installation
To incorporate this tool into your project, follow the installation instructions below:
```shell
pip install 'crewai[tools]'
```
## Example
The following example demonstrates how to initialize the tool and execute a search with a given query:
## Usage
```python
from crewai_tools import SerperDevTool
# Initialize the tool for internet searching capabilities
tool = SerperDevTool()
# Initialize the tool
tool = SerperDevTool(
n_results=10, # Optional: Number of results to return (default: 10)
save_file=False, # Optional: Save results to file (default: False)
search_type="search" # Optional: Type of search - "search" or "news" (default: "search")
)
# Execute a search
results = tool._run(search_query="your search query")
```
## Steps to Get Started
To effectively use the `SerperDevTool`, follow these steps:
## Configuration
1. **API Key Setup**:
- Sign up for an account at `serper.dev`
- Obtain your API key
- Set the environment variable: `SERPER_API_KEY`
1. **Package Installation**: Confirm that the `crewai[tools]` package is installed in your Python environment.
2. **API Key Acquisition**: Acquire a `serper.dev` API key by registering for a free account at `serper.dev`.
3. **Environment Configuration**: Store your obtained API key in an environment variable named `SERPER_API_KEY` to facilitate its use by the tool.
## Conclusion
By integrating the `SerperDevTool` into Python projects, users gain the ability to conduct real-time, relevant searches across the internet directly from their applications. By adhering to the setup and usage guidelines provided, incorporating this tool into projects is streamlined and straightforward.
## Response Format
The tool returns structured data including:
- Search parameters
- Knowledge graph data (for general search)
- Organic search results
- "People Also Ask" questions
- Related searches
- News results (for news search type)

View File

@@ -1,19 +1,29 @@
import datetime
import json
import os
from typing import Any, Optional, Type
import logging
from typing import Any, Type
import requests
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def _save_results_to_file(content: str) -> None:
"""Saves the search results to a file."""
filename = f"search_results_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.txt"
with open(filename, "w") as file:
file.write(content)
print(f"Results saved to {filename}")
try:
filename = f"search_results_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.txt"
with open(filename, "w") as file:
file.write(content)
logger.info(f"Results saved to {filename}")
except IOError as e:
logger.error(f"Failed to save results to file: {e}")
raise
class SerperDevToolSchema(BaseModel):
@@ -27,67 +37,199 @@ class SerperDevToolSchema(BaseModel):
class SerperDevTool(BaseTool):
name: str = "Search the internet"
description: str = (
"A tool that can be used to search the internet with a search_query."
"A tool that can be used to search the internet with a search_query. "
"Supports different search types: 'search' (default), 'news'"
)
args_schema: Type[BaseModel] = SerperDevToolSchema
search_url: str = "https://google.serper.dev/search"
country: Optional[str] = ""
location: Optional[str] = ""
locale: Optional[str] = ""
base_url: str = "https://google.serper.dev"
n_results: int = 10
save_file: bool = False
search_type: str = "search"
def _run(
self,
**kwargs: Any,
) -> Any:
def _get_search_url(self, search_type: str) -> str:
"""Get the appropriate endpoint URL based on search type."""
search_type = search_type.lower()
allowed_search_types = ["search", "news"]
if search_type not in allowed_search_types:
raise ValueError(
f"Invalid search type: {search_type}. Must be one of: {', '.join(allowed_search_types)}"
)
return f"{self.base_url}/{search_type}"
search_query = kwargs.get("search_query") or kwargs.get("query")
save_file = kwargs.get("save_file", self.save_file)
n_results = kwargs.get("n_results", self.n_results)
def _process_knowledge_graph(self, kg: dict) -> dict:
"""Process knowledge graph data from search results."""
return {
"title": kg.get("title", ""),
"type": kg.get("type", ""),
"website": kg.get("website", ""),
"imageUrl": kg.get("imageUrl", ""),
"description": kg.get("description", ""),
"descriptionSource": kg.get("descriptionSource", ""),
"descriptionLink": kg.get("descriptionLink", ""),
"attributes": kg.get("attributes", {}),
}
payload = {"q": search_query, "num": n_results}
def _process_organic_results(self, organic_results: list) -> list:
"""Process organic search results."""
processed_results = []
for result in organic_results[: self.n_results]:
try:
result_data = {
"title": result["title"],
"link": result["link"],
"snippet": result.get("snippet", ""),
"position": result.get("position"),
}
if self.country != "":
payload["gl"] = self.country
if self.location != "":
payload["location"] = self.location
if self.locale != "":
payload["hl"] = self.locale
if "sitelinks" in result:
result_data["sitelinks"] = [
{
"title": sitelink.get("title", ""),
"link": sitelink.get("link", ""),
}
for sitelink in result["sitelinks"]
]
payload = json.dumps(payload)
processed_results.append(result_data)
except KeyError:
logger.warning(f"Skipping malformed organic result: {result}")
continue
return processed_results
def _process_people_also_ask(self, paa_results: list) -> list:
"""Process 'People Also Ask' results."""
processed_results = []
for result in paa_results[: self.n_results]:
try:
result_data = {
"question": result["question"],
"snippet": result.get("snippet", ""),
"title": result.get("title", ""),
"link": result.get("link", ""),
}
processed_results.append(result_data)
except KeyError:
logger.warning(f"Skipping malformed PAA result: {result}")
continue
return processed_results
def _process_related_searches(self, related_results: list) -> list:
"""Process related search results."""
processed_results = []
for result in related_results[: self.n_results]:
try:
processed_results.append({"query": result["query"]})
except KeyError:
logger.warning(f"Skipping malformed related search result: {result}")
continue
return processed_results
def _process_news_results(self, news_results: list) -> list:
"""Process news search results."""
processed_results = []
for result in news_results[: self.n_results]:
try:
result_data = {
"title": result["title"],
"link": result["link"],
"snippet": result.get("snippet", ""),
"date": result.get("date", ""),
"source": result.get("source", ""),
"imageUrl": result.get("imageUrl", ""),
}
processed_results.append(result_data)
except KeyError:
logger.warning(f"Skipping malformed news result: {result}")
continue
return processed_results
def _make_api_request(self, search_query: str, search_type: str) -> dict:
"""Make API request to Serper."""
search_url = self._get_search_url(search_type)
payload = json.dumps({"q": search_query, "num": self.n_results})
headers = {
"X-API-KEY": os.environ["SERPER_API_KEY"],
"content-type": "application/json",
}
response = requests.request(
"POST", self.search_url, headers=headers, data=payload
)
results = response.json()
if "organic" in results:
results = results["organic"][: self.n_results]
string = []
for result in results:
try:
string.append(
"\n".join(
[
f"Title: {result['title']}",
f"Link: {result['link']}",
f"Snippet: {result['snippet']}",
"---",
]
)
)
except KeyError:
continue
content = "\n".join(string)
if save_file:
_save_results_to_file(content)
return f"\nSearch results: {content}\n"
else:
response = None
try:
response = requests.post(
search_url, headers=headers, json=json.loads(payload), timeout=10
)
response.raise_for_status()
results = response.json()
if not results:
logger.error("Empty response from Serper API")
raise ValueError("Empty response from Serper API")
return results
except requests.exceptions.RequestException as e:
error_msg = f"Error making request to Serper API: {e}"
if response is not None and hasattr(response, "content"):
error_msg += f"\nResponse content: {response.content}"
logger.error(error_msg)
raise
except json.JSONDecodeError as e:
if response is not None and hasattr(response, "content"):
logger.error(f"Error decoding JSON response: {e}")
logger.error(f"Response content: {response.content}")
else:
logger.error(
f"Error decoding JSON response: {e} (No response content available)"
)
raise
def _process_search_results(self, results: dict, search_type: str) -> dict:
"""Process search results based on search type."""
formatted_results = {}
if search_type == "search":
if "knowledgeGraph" in results:
formatted_results["knowledgeGraph"] = self._process_knowledge_graph(
results["knowledgeGraph"]
)
if "organic" in results:
formatted_results["organic"] = self._process_organic_results(
results["organic"]
)
if "peopleAlsoAsk" in results:
formatted_results["peopleAlsoAsk"] = self._process_people_also_ask(
results["peopleAlsoAsk"]
)
if "relatedSearches" in results:
formatted_results["relatedSearches"] = self._process_related_searches(
results["relatedSearches"]
)
elif search_type == "news":
if "news" in results:
formatted_results["news"] = self._process_news_results(results["news"])
return formatted_results
def _run(self, **kwargs: Any) -> Any:
"""Execute the search operation."""
search_query = kwargs.get("search_query") or kwargs.get("query")
search_type = kwargs.get("search_type", self.search_type)
save_file = kwargs.get("save_file", self.save_file)
results = self._make_api_request(search_query, search_type)
formatted_results = {
"searchParameters": {
"q": search_query,
"type": search_type,
**results.get("searchParameters", {}),
}
}
formatted_results.update(self._process_search_results(results, search_type))
formatted_results["credits"] = results.get("credits", 1)
if save_file:
_save_results_to_file(json.dumps(formatted_results, indent=2))
return formatted_results

View File

@@ -1,81 +1,87 @@
# SpiderTool
## Description
[Spider](https://spider.cloud/?ref=crewai) is the [fastest](https://github.com/spider-rs/spider/blob/main/benches/BENCHMARKS.md#benchmark-results) open source scraper and crawler that returns LLM-ready data. It converts any website into pure HTML, markdown, metadata or text while enabling you to crawl with custom actions using AI.
[Spider](https://spider.cloud/?ref=crewai) is a high-performance web scraping and crawling tool that delivers optimized markdown for LLMs and AI agents. It intelligently switches between HTTP requests and JavaScript rendering based on page requirements. Perfect for both single-page scraping and website crawling—making it ideal for content extraction and data collection.
## Installation
To use the Spider API you need to download the [Spider SDK](https://pypi.org/project/spider-client/) and the crewai[tools] SDK too:
To use the Spider API you need to download the [Spider SDK](https://pypi.org/project/spider-client/) and the crewai[tools] SDK, too:
```python
pip install spider-client 'crewai[tools]'
```
## Example
This example shows you how you can use the Spider tool to enable your agent to scrape and crawl websites. The data returned from the Spider API is already LLM-ready, so no need to do any cleaning there.
This example shows you how you can use the Spider tool to enable your agent to scrape and crawl websites. The data returned from the Spider API is LLM-ready.
```python
from crewai_tools import SpiderTool
def main():
spider_tool = SpiderTool()
searcher = Agent(
role="Web Research Expert",
goal="Find related information from specific URL's",
backstory="An expert web researcher that uses the web extremely well",
tools=[spider_tool],
verbose=True,
)
# To enable scraping any website it finds during its execution
spider_tool = SpiderTool(api_key='YOUR_API_KEY')
return_metadata = Task(
description="Scrape https://spider.cloud with a limit of 1 and enable metadata",
expected_output="Metadata and 10 word summary of spider.cloud",
agent=searcher
)
# Initialize the tool with the website URL, so the agent can only scrape the content of the specified website
spider_tool = SpiderTool(website_url='https://spider.cloud')
crew = Crew(
agents=[searcher],
tasks=[
return_metadata,
],
verbose=2
)
crew.kickoff()
# Pass in custom parameters, see below for more details
spider_tool = SpiderTool(
website_url='https://spider.cloud',
custom_params={"depth": 2, "anti_bot": True, "proxy_enabled": True}
)
if __name__ == "__main__":
main()
# Advanced usage using css query selector to extract content
css_extraction_map = {
"/": [ # pass in path (main index in this case)
{
"name": "headers", # give it a name for this element
"selectors": [
"h1"
]
}
]
}
spider_tool = SpiderTool(
website_url='https://spider.cloud',
custom_params={"anti_bot": True, "proxy_enabled": True, "metadata": True, "css_extraction_map": css_extraction_map}
)
### Response (extracted text will be in the metadata)
"css_extracted": {
"headers": [
"The Web Crawler for AI Agents and LLMs!"
]
}
```
## Agent setup
```yaml
researcher:
role: >
You're a researcher that is tasked with researching a website and it's content (use crawl mode). The website is to crawl is: {website_url}.
```
## Arguments
- `api_key` (string, optional): Specifies Spider API key. If not specified, it looks for `SPIDER_API_KEY` in environment variables.
- `params` (object, optional): Optional parameters for the request. Defaults to `{"return_format": "markdown"}` to return the website's content in a format that fits LLMs better.
- `website_url` (string): The website URL. Will be used as a fallback if passed when the tool is initialized.
- `log_failures` (bool): Log scrape failures or fail silently. Defaults to `true`.
- `custom_params` (object, optional): Optional parameters for the request.
- `return_format` (string): The return format of the website's content. Defaults to `markdown`.
- `request` (string): The request type to perform. Possible values are `http`, `chrome`, and `smart`. Use `smart` to perform an HTTP request by default until JavaScript rendering is needed for the HTML.
- `limit` (int): The maximum number of pages allowed to crawl per website. Remove the value or set it to `0` to crawl all pages.
- `depth` (int): The crawl limit for maximum depth. If `0`, no limit will be applied.
- `cache` (bool): Use HTTP caching for the crawl to speed up repeated runs. Default is `true`.
- `budget` (object): Object that has paths with a counter for limiting the amount of pages example `{"*":1}` for only crawling the root page.
- `locale` (string): The locale to use for request, example `en-US`.
- `cookies` (string): Add HTTP cookies to use for request.
- `stealth` (bool): Use stealth mode for headless chrome request to help prevent being blocked. The default is `true` on chrome.
- `headers` (object): Forward HTTP headers to use for all request. The object is expected to be a map of key value pairs.
- `metadata` (bool): Boolean to store metadata about the pages and content found. This could help improve AI interopt. Defaults to `false` unless you have the website already stored with the configuration enabled.
- `viewport` (object): Configure the viewport for chrome. Defaults to `800x600`.
- `encoding` (string): The type of encoding to use like `UTF-8`, `SHIFT_JIS`, or etc.
- `metadata` (bool): Boolean to store metadata about the pages and content found. Defaults to `false`.
- `subdomains` (bool): Allow subdomains to be included. Default is `false`.
- `user_agent` (string): Add a custom HTTP user agent to the request. By default this is set to a random agent.
- `store_data` (bool): Boolean to determine if storage should be used. If set this takes precedence over `storageless`. Defaults to `false`.
- `gpt_config` (object): Use AI to generate actions to perform during the crawl. You can pass an array for the `"prompt"` to chain steps.
- `fingerprint` (bool): Use advanced fingerprint for chrome.
- `storageless` (bool): Boolean to prevent storing any type of data for the request including storage and AI vectors embedding. Defaults to `false` unless you have the website already stored.
- `readability` (bool): Use [readability](https://github.com/mozilla/readability) to pre-process the content for reading. This may drastically improve the content for LLM usage.
`return_format` (string): The format to return the data in. Possible values are `markdown`, `raw`, `text`, and `html2text`. Use `raw` to return the default format of the page like HTML etc.
- `proxy_enabled` (bool): Enable high performance premium proxies for the request to prevent being blocked at the network level.
- `query_selector` (string): The CSS query selector to use when extracting content from the markup.
- `full_resources` (bool): Crawl and download all the resources for a website.
- `css_extraction_map` (object): Use CSS or XPath selectors to scrape contents from the web page. Set the paths and the extraction object map to perform extractions per path or page.
- `request_timeout` (int): The timeout to use for request. Timeouts can be from `5-60`. The default is `30` seconds.
- `run_in_background` (bool): Run the request in the background. Useful if storing data and wanting to trigger crawls to the dashboard. This has no effect if storageless is set.
- `return_headers` (bool): Return the HTTP response headers with the results. Defaults to `false`.
- `filter_output_main_only` (bool): Filter the nav, aside, and footer from the output.
- `headers` (object): Forward HTTP headers to use for all request. The object is expected to be a map of key value pairs.
Learn other parameters that can be used: [https://spider.cloud/docs/api](https://spider.cloud/docs/api)

View File

@@ -1,60 +1,202 @@
import logging
from typing import Any, Dict, Literal, Optional, Type
from urllib.parse import unquote, urlparse
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
logger = logging.getLogger(__file__)
class SpiderToolSchema(BaseModel):
url: str = Field(description="Website URL")
params: Optional[Dict[str, Any]] = Field(
description="Set additional params. Options include:\n"
"- `limit`: Optional[int] - The maximum number of pages allowed to crawl per website. Remove the value or set it to `0` to crawl all pages.\n"
"- `depth`: Optional[int] - The crawl limit for maximum depth. If `0`, no limit will be applied.\n"
"- `metadata`: Optional[bool] - Boolean to include metadata or not. Defaults to `False` unless set to `True`. If the user wants metadata, include params.metadata = True.\n"
"- `query_selector`: Optional[str] - The CSS query selector to use when extracting content from the markup.\n"
"""Input schema for SpiderTool."""
website_url: str = Field(
..., description="Mandatory website URL to scrape or crawl"
)
mode: Literal["scrape", "crawl"] = Field(
default="scrape",
description="Mode, the only two allowed modes are `scrape` or `crawl`. Use `scrape` to scrape a single page and `crawl` to crawl the entire website following subpages. These modes are the only allowed values even when ANY params is set.",
description="The mode of the SpiderTool. The only two allowed modes are `scrape` or `crawl`. Crawl mode will follow up to 5 links and return their content in markdown format.",
)
class SpiderTool(BaseTool):
name: str = "Spider scrape & crawl tool"
description: str = "Scrape & Crawl any url and return LLM-ready data."
args_schema: Type[BaseModel] = SpiderToolSchema
api_key: Optional[str] = None
spider: Optional[Any] = None
class SpiderToolConfig(BaseModel):
"""Configuration settings for SpiderTool.
Contains all default values and constants used by SpiderTool.
Centralizes configuration management for easier maintenance.
"""
# Crawling settings
DEFAULT_CRAWL_LIMIT: int = 5
DEFAULT_RETURN_FORMAT: str = "markdown"
# Request parameters
DEFAULT_REQUEST_MODE: str = "smart"
FILTER_SVG: bool = True
class SpiderTool(BaseTool):
"""Tool for scraping and crawling websites.
This tool provides functionality to either scrape a single webpage or crawl multiple
pages, returning content in a format suitable for LLM processing.
"""
name: str = "SpiderTool"
description: str = (
"A tool to scrape or crawl a website and return LLM-ready content."
)
args_schema: Type[BaseModel] = SpiderToolSchema
custom_params: Optional[Dict[str, Any]] = None
website_url: Optional[str] = None
api_key: Optional[str] = None
spider: Any = None
log_failures: bool = True
config: SpiderToolConfig = SpiderToolConfig()
def __init__(
self,
api_key: Optional[str] = None,
website_url: Optional[str] = None,
custom_params: Optional[Dict[str, Any]] = None,
log_failures: bool = True,
**kwargs,
):
"""Initialize SpiderTool for web scraping and crawling.
Args:
api_key (Optional[str]): Spider API key for authentication. Required for production use.
website_url (Optional[str]): Default website URL to scrape/crawl. Can be overridden during execution.
custom_params (Optional[Dict[str, Any]]): Additional parameters to pass to Spider API.
These override any parameters set by the LLM.
log_failures (bool): If True, logs errors. Defaults to True.
**kwargs: Additional arguments passed to BaseTool.
Raises:
ImportError: If spider-client package is not installed.
RuntimeError: If Spider client initialization fails.
"""
def __init__(self, api_key: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
if website_url is not None:
self.website_url = website_url
self.log_failures = log_failures
self.custom_params = custom_params
try:
from spider import Spider # type: ignore
self.spider = Spider(api_key=api_key)
except ImportError:
raise ImportError(
"`spider-client` package not found, please run `pip install spider-client`"
)
except Exception as e:
raise RuntimeError(f"Failed to initialize Spider client: {str(e)}")
self.spider = Spider(api_key=api_key)
def _validate_url(self, url: str) -> bool:
"""Validate URL format and security constraints.
Args:
url (str): URL to validate. Must be a properly formatted HTTP(S) URL
Returns:
bool: True if URL is valid and meets security requirements, False otherwise.
"""
try:
url = url.strip()
decoded_url = unquote(url)
result = urlparse(decoded_url)
if not all([result.scheme, result.netloc]):
return False
if result.scheme not in ["http", "https"]:
return False
return True
except Exception:
return False
def _run(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
mode: Optional[Literal["scrape", "crawl"]] = "scrape",
):
if mode not in ["scrape", "crawl"]:
raise ValueError(
"Unknown mode in `mode` parameter, `scrape` or `crawl` are the allowed modes"
website_url: str,
mode: Literal["scrape", "crawl"] = "scrape",
) -> Optional[str]:
"""Execute the spider tool to scrape or crawl the specified website.
Args:
website_url (str): The URL to process. Must be a valid HTTP(S) URL.
mode (Literal["scrape", "crawl"]): Operation mode.
- "scrape": Extract content from single page
- "crawl": Follow links and extract content from multiple pages
Returns:
Optional[str]: Extracted content in markdown format, or None if extraction fails
and log_failures is True.
Raises:
ValueError: If URL is invalid or missing, or if mode is invalid.
ImportError: If spider-client package is not properly installed.
ConnectionError: If network connection fails while accessing the URL.
Exception: For other runtime errors.
"""
try:
params = {}
url = website_url or self.website_url
if not url:
raise ValueError(
"Website URL must be provided either during initialization or execution"
)
if not self._validate_url(url):
raise ValueError(f"Invalid URL format: {url}")
if mode not in ["scrape", "crawl"]:
raise ValueError(
f"Invalid mode: {mode}. Must be either 'scrape' or 'crawl'"
)
params = {
"request": self.config.DEFAULT_REQUEST_MODE,
"filter_output_svg": self.config.FILTER_SVG,
"return_format": self.config.DEFAULT_RETURN_FORMAT,
}
if mode == "crawl":
params["limit"] = self.config.DEFAULT_CRAWL_LIMIT
if self.custom_params:
params.update(self.custom_params)
action = (
self.spider.scrape_url if mode == "scrape" else self.spider.crawl_url
)
return action(url=url, params=params)
# Ensure 'return_format': 'markdown' is always included
if params:
params["return_format"] = "markdown"
else:
params = {"return_format": "markdown"}
except ValueError as ve:
if self.log_failures:
logger.error(f"Validation error for URL {url}: {str(ve)}")
return None
raise ve
action = self.spider.scrape_url if mode == "scrape" else self.spider.crawl_url
spider_docs = action(url=url, params=params)
except ImportError as ie:
logger.error(f"Spider client import error: {str(ie)}")
raise ie
return spider_docs
except ConnectionError as ce:
if self.log_failures:
logger.error(f"Connection error while accessing {url}: {str(ce)}")
return None
raise ce
except Exception as e:
if self.log_failures:
logger.error(
f"Unexpected error during {mode} operation on {url}: {str(e)}"
)
return None
raise e

View File

@@ -0,0 +1,93 @@
from unittest.mock import MagicMock, patch
from bs4 import BeautifulSoup
from crewai_tools.tools.selenium_scraping_tool.selenium_scraping_tool import (
SeleniumScrapingTool,
)
def mock_driver_with_html(html_content):
driver = MagicMock()
mock_element = MagicMock()
mock_element.get_attribute.return_value = html_content
bs = BeautifulSoup(html_content, "html.parser")
mock_element.text = bs.get_text()
driver.find_elements.return_value = [mock_element]
driver.find_element.return_value = mock_element
return driver
def initialize_tool_with(mock_driver):
tool = SeleniumScrapingTool()
tool.driver = MagicMock(return_value=mock_driver)
return tool
def test_tool_initialization():
tool = SeleniumScrapingTool()
assert tool.website_url is None
assert tool.css_element is None
assert tool.cookie is None
assert tool.wait_time == 3
assert tool.return_html is False
@patch("selenium.webdriver.Chrome")
def test_scrape_without_css_selector(_mocked_chrome_driver):
html_content = "<html><body><div>test content</div></body></html>"
mock_driver = mock_driver_with_html(html_content)
tool = initialize_tool_with(mock_driver)
result = tool._run(website_url="https://example.com")
assert "test content" in result
mock_driver.get.assert_called_once_with("https://example.com")
mock_driver.find_element.assert_called_with("tag name", "body")
mock_driver.close.assert_called_once()
@patch("selenium.webdriver.Chrome")
def test_scrape_with_css_selector(_mocked_chrome_driver):
html_content = "<html><body><div>test content</div><div class='test'>test content in a specific div</div></body></html>"
mock_driver = mock_driver_with_html(html_content)
tool = initialize_tool_with(mock_driver)
result = tool._run(website_url="https://example.com", css_element="div.test")
assert "test content in a specific div" in result
mock_driver.get.assert_called_once_with("https://example.com")
mock_driver.find_elements.assert_called_with("css selector", "div.test")
mock_driver.close.assert_called_once()
@patch("selenium.webdriver.Chrome")
def test_scrape_with_return_html_true(_mocked_chrome_driver):
html_content = "<html><body><div>HTML content</div></body></html>"
mock_driver = mock_driver_with_html(html_content)
tool = initialize_tool_with(mock_driver)
result = tool._run(website_url="https://example.com", return_html=True)
assert html_content in result
mock_driver.get.assert_called_once_with("https://example.com")
mock_driver.find_element.assert_called_with("tag name", "body")
mock_driver.close.assert_called_once()
@patch("selenium.webdriver.Chrome")
def test_scrape_with_return_html_false(_mocked_chrome_driver):
html_content = "<html><body><div>HTML content</div></body></html>"
mock_driver = mock_driver_with_html(html_content)
tool = initialize_tool_with(mock_driver)
result = tool._run(website_url="https://example.com", return_html=False)
assert "HTML content" in result
mock_driver.get.assert_called_once_with("https://example.com")
mock_driver.find_element.assert_called_with("tag name", "body")
mock_driver.close.assert_called_once()

View File

@@ -7,32 +7,47 @@ from crewai_tools.tools.code_interpreter_tool.code_interpreter_tool import (
class TestCodeInterpreterTool(unittest.TestCase):
@patch("crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.docker")
@patch("crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.docker_from_env")
def test_run_code_in_docker(self, docker_mock):
tool = CodeInterpreterTool()
code = "print('Hello, World!')"
libraries_used = "numpy,pandas"
libraries_used = ["numpy", "pandas"]
expected_output = "Hello, World!\n"
docker_mock.from_env().containers.run().exec_run().exit_code = 0
docker_mock.from_env().containers.run().exec_run().output = (
docker_mock().containers.run().exec_run().exit_code = 0
docker_mock().containers.run().exec_run().output = (
expected_output.encode()
)
result = tool.run_code_in_docker(code, libraries_used)
self.assertEqual(result, expected_output)
@patch("crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.docker")
@patch("crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.docker_from_env")
def test_run_code_in_docker_with_error(self, docker_mock):
tool = CodeInterpreterTool()
code = "print(1/0)"
libraries_used = "numpy,pandas"
libraries_used = ["numpy", "pandas"]
expected_output = "Something went wrong while running the code: \nZeroDivisionError: division by zero\n"
docker_mock.from_env().containers.run().exec_run().exit_code = 1
docker_mock.from_env().containers.run().exec_run().output = (
docker_mock().containers.run().exec_run().exit_code = 1
docker_mock().containers.run().exec_run().output = (
b"ZeroDivisionError: division by zero\n"
)
result = tool.run_code_in_docker(code, libraries_used)
self.assertEqual(result, expected_output)
@patch("crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.docker_from_env")
def test_run_code_in_docker_with_script(self, docker_mock):
tool = CodeInterpreterTool()
code = """print("This is line 1")
print("This is line 2")"""
libraries_used = [] # No additional libraries needed for this test
expected_output = "This is line 1\nThis is line 2\n"
# Mock Docker responses
docker_mock().containers.run().exec_run().exit_code = 0
docker_mock().containers.run().exec_run().output = expected_output.encode()
result = tool.run_code_in_docker(code, libraries_used)
self.assertEqual(result, expected_output)