feat: Add Bright Data tools (#314)

* Initial commit of BrightData tools

* Renamed the BrightData test files

* Refactored and improved the overall BrightData tools

* Add BrightData tools

* Add tools to init

* Added config class

* Fix test failures and add missing __init__.py files

- Remove problematic brightdata_dataset_tool_test.py that referenced non-existent classes
- Fix brightdata_serp_tool_test.py to expect string responses instead of dict
- Fix brightdata_webunlocker_tool_test.py to expect string responses instead of dict
- Add missing tests/tools/__init__.py for proper test imports

---------

Co-authored-by: Ranjan Dailata <ranjancse@gmail.com>
Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
meirk-brd authored on 2025-08-07 17:29:51 +03:00; committed by GitHub
parent d00c9764fc
commit 41ce4981ac
10 changed files with 1103 additions and 0 deletions

View File

@@ -12,6 +12,9 @@ from .tools import (
ApifyActorsTool,
ArxivPaperTool,
BraveSearchTool,
BrightDataDatasetTool,
BrightDataSearchTool,
BrightDataWebUnlockerTool,
BrowserbaseLoadTool,
CodeDocsSearchTool,
CodeInterpreterTool,

View File

@@ -102,4 +102,9 @@ from .youtube_channel_search_tool.youtube_channel_search_tool import (
YoutubeChannelSearchTool,
)
from .youtube_video_search_tool.youtube_video_search_tool import YoutubeVideoSearchTool
from .brightdata_tool import (
BrightDataDatasetTool,
BrightDataSearchTool,
BrightDataWebUnlockerTool
)
from .zapier_action_tool.zapier_action_tool import ZapierActionTools

View File

@@ -0,0 +1,79 @@
# BrightData Tools Documentation
## Description
A suite of CrewAI tools built on Bright Data's infrastructure for web scraping, data extraction, and search. They provide three distinct capabilities:
- **BrightDataDatasetTool**: Extract structured data from popular data feeds (Amazon, LinkedIn, Instagram, etc.) using pre-built datasets
- **BrightDataSearchTool**: Perform web searches across multiple search engines with geo-targeting and device simulation
- **BrightDataWebUnlockerTool**: Scrape any website content while bypassing bot protection mechanisms
## Installation
To incorporate these tools into your project, follow the installation instructions below:
```shell
pip install 'crewai[tools]' aiohttp requests
```
## Examples
### Dataset Tool - Extract Amazon Product Data
```python
from crewai_tools import BrightDataDatasetTool
# Initialize with specific dataset and URL
tool = BrightDataDatasetTool(
dataset_type="amazon_product",
url="https://www.amazon.com/dp/B08QB1QMJ5/"
)
result = tool.run()
```
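The same pattern works for any dataset id registered in `brightdata_dataset.py` (for example `amazon_product_reviews`, `walmart_product`, `linkedin_person_profile`, or `instagram_posts`). A minimal sketch using the LinkedIn person profile dataset; the profile URL below is a placeholder:
```python
from crewai_tools import BrightDataDatasetTool

# Hypothetical profile URL - substitute any public LinkedIn profile
tool = BrightDataDatasetTool(
    dataset_type="linkedin_person_profile",
    url="https://www.linkedin.com/in/some-profile/",
    format="json",
)
result = tool.run()
```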
### Search Tool - Perform Web Search
```python
from crewai_tools import BrightDataSearchTool
# Initialize with search query
tool = BrightDataSearchTool(
query="latest AI trends 2025",
search_engine="google",
country="us"
)
result = tool.run()
```
### Web Unlocker Tool - Scrape Website Content
```python
from crewai_tools import BrightDataWebUnlockerTool
# Initialize with target URL
tool = BrightDataWebUnlockerTool(
url="https://example.com",
data_format="markdown"
)
result = tool.run()
```
## Steps to Get Started
To effectively use the BrightData Tools, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` package is installed in your Python environment.
2. **API Key Acquisition**: Register for a Bright Data account at `https://brightdata.com/` and obtain your API credentials from your account settings.
3. **Environment Configuration**: Set up the required environment variables:
```bash
export BRIGHT_DATA_API_KEY="your_api_key_here"
export BRIGHT_DATA_ZONE="your_zone_here"
```
4. **Tool Selection**: Choose the appropriate tool based on your needs (see the agent sketch below):
- Use **DatasetTool** for structured data from supported platforms
- Use **SearchTool** for web search operations
- Use **WebUnlockerTool** for general website scraping
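As an end-to-end illustration, here is a minimal sketch of handing one of these tools to a CrewAI agent. It assumes the standard `crewai` classes (`Agent`, `Task`, `Crew`) and that the environment variables from step 3 are already set; the role and task wording are placeholders.
```python
import os
from crewai import Agent, Task, Crew
from crewai_tools import BrightDataSearchTool

# Requires BRIGHT_DATA_API_KEY and BRIGHT_DATA_ZONE (see step 3)
assert os.getenv("BRIGHT_DATA_API_KEY") and os.getenv("BRIGHT_DATA_ZONE")

# Give the agent the SERP tool so it can run live web searches
search_tool = BrightDataSearchTool()

researcher = Agent(
    role="Web Researcher",
    goal="Summarize the latest AI trends",
    backstory="You research topics using live web search results.",
    tools=[search_tool],
)

task = Task(
    description="Search for the latest AI trends in 2025 and summarize the top findings.",
    expected_output="A short bullet-point summary.",
    agent=researcher,
)

crew = Crew(agents=[researcher], tasks=[task])
print(crew.kickoff())
```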
## Conclusion
By integrating BrightData Tools into your CrewAI agents, you gain access to enterprise-grade web scraping and data extraction capabilities. These tools handle complex challenges like bot protection, geo-restrictions, and data parsing, allowing you to focus on building your applications rather than managing scraping infrastructure.

View File

@@ -0,0 +1,9 @@
from .brightdata_dataset import BrightDataDatasetTool
from .brightdata_serp import BrightDataSearchTool
from .brightdata_unlocker import BrightDataWebUnlockerTool
__all__ = [
"BrightDataDatasetTool",
"BrightDataSearchTool",
"BrightDataWebUnlockerTool"
]

View File

@@ -0,0 +1,566 @@
import asyncio
import os
from typing import Any, Dict, Optional, Type
import aiohttp
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
class BrightDataConfig(BaseSettings):
API_URL: str = "https://api.brightdata.com"
DEFAULT_TIMEOUT: int = 600
DEFAULT_POLLING_INTERVAL: int = 1
class Config:
env_prefix = "BRIGHTDATA_"
class BrightDataDatasetToolException(Exception):
"""Exception raised for custom error in the application."""
def __init__(self, message, error_code):
self.message = message
super().__init__(message)
self.error_code = error_code
def __str__(self):
return f"{self.message} (Error Code: {self.error_code})"
class BrightDataDatasetToolSchema(BaseModel):
"""
Schema for validating input parameters for the BrightDataDatasetTool.
Attributes:
dataset_type (str): Required Bright Data Dataset Type used to specify which dataset to access.
format (str): Response format (json by default). Supported formats: json, ndjson, jsonl, csv.
url (str): The URL from which structured data needs to be extracted.
zipcode (Optional[str]): An optional ZIP code to narrow down the data geographically.
additional_params (Optional[Dict]): Extra parameters for the Bright Data API call.
"""
dataset_type: str = Field(..., description="The Bright Data Dataset Type")
format: Optional[str] = Field(
default="json", description="Response format (json by default)"
)
url: str = Field(..., description="The URL to extract data from")
zipcode: Optional[str] = Field(default=None, description="Optional zipcode")
additional_params: Optional[Dict[str, Any]] = Field(
default=None, description="Additional params if any"
)
config = BrightDataConfig()
BRIGHTDATA_API_URL = config.API_URL
timeout = config.DEFAULT_TIMEOUT
datasets = [
{
"id": "amazon_product",
"dataset_id": "gd_l7q7dkf244hwjntr0",
"description": "\n".join(
[
"Quickly read structured amazon product data.",
"Requires a valid product URL with /dp/ in it.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "amazon_product_reviews",
"dataset_id": "gd_le8e811kzy4ggddlq",
"description": "\n".join(
[
"Quickly read structured amazon product review data.",
"Requires a valid product URL with /dp/ in it.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "amazon_product_search",
"dataset_id": "gd_lwdb4vjm1ehb499uxs",
"description": "\n".join(
[
"Quickly read structured amazon product search data.",
"Requires a valid search keyword and amazon domain URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["keyword", "url", "pages_to_search"],
"defaults": {"pages_to_search": "1"},
},
{
"id": "walmart_product",
"dataset_id": "gd_l95fol7l1ru6rlo116",
"description": "\n".join(
[
"Quickly read structured walmart product data.",
"Requires a valid product URL with /ip/ in it.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "walmart_seller",
"dataset_id": "gd_m7ke48w81ocyu4hhz0",
"description": "\n".join(
[
"Quickly read structured walmart seller data.",
"Requires a valid walmart seller URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "ebay_product",
"dataset_id": "gd_ltr9mjt81n0zzdk1fb",
"description": "\n".join(
[
"Quickly read structured ebay product data.",
"Requires a valid ebay product URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "homedepot_products",
"dataset_id": "gd_lmusivh019i7g97q2n",
"description": "\n".join(
[
"Quickly read structured homedepot product data.",
"Requires a valid homedepot product URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "zara_products",
"dataset_id": "gd_lct4vafw1tgx27d4o0",
"description": "\n".join(
[
"Quickly read structured zara product data.",
"Requires a valid zara product URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "etsy_products",
"dataset_id": "gd_ltppk0jdv1jqz25mz",
"description": "\n".join(
[
"Quickly read structured etsy product data.",
"Requires a valid etsy product URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "bestbuy_products",
"dataset_id": "gd_ltre1jqe1jfr7cccf",
"description": "\n".join(
[
"Quickly read structured bestbuy product data.",
"Requires a valid bestbuy product URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "linkedin_person_profile",
"dataset_id": "gd_l1viktl72bvl7bjuj0",
"description": "\n".join(
[
"Quickly read structured linkedin people profile data.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "linkedin_company_profile",
"dataset_id": "gd_l1vikfnt1wgvvqz95w",
"description": "\n".join(
[
"Quickly read structured linkedin company profile data",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "linkedin_job_listings",
"dataset_id": "gd_lpfll7v5hcqtkxl6l",
"description": "\n".join(
[
"Quickly read structured linkedin job listings data",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "linkedin_posts",
"dataset_id": "gd_lyy3tktm25m4avu764",
"description": "\n".join(
[
"Quickly read structured linkedin posts data",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "linkedin_people_search",
"dataset_id": "gd_m8d03he47z8nwb5xc",
"description": "\n".join(
[
"Quickly read structured linkedin people search data",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url", "first_name", "last_name"],
},
{
"id": "crunchbase_company",
"dataset_id": "gd_l1vijqt9jfj7olije",
"description": "\n".join(
[
"Quickly read structured crunchbase company data",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "zoominfo_company_profile",
"dataset_id": "gd_m0ci4a4ivx3j5l6nx",
"description": "\n".join(
[
"Quickly read structured ZoomInfo company profile data.",
"Requires a valid ZoomInfo company URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "instagram_profiles",
"dataset_id": "gd_l1vikfch901nx3by4",
"description": "\n".join(
[
"Quickly read structured Instagram profile data.",
"Requires a valid Instagram URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "instagram_posts",
"dataset_id": "gd_lk5ns7kz21pck8jpis",
"description": "\n".join(
[
"Quickly read structured Instagram post data.",
"Requires a valid Instagram URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "instagram_reels",
"dataset_id": "gd_lyclm20il4r5helnj",
"description": "\n".join(
[
"Quickly read structured Instagram reel data.",
"Requires a valid Instagram URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "instagram_comments",
"dataset_id": "gd_ltppn085pokosxh13",
"description": "\n".join(
[
"Quickly read structured Instagram comments data.",
"Requires a valid Instagram URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "facebook_posts",
"dataset_id": "gd_lyclm1571iy3mv57zw",
"description": "\n".join(
[
"Quickly read structured Facebook post data.",
"Requires a valid Facebook post URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "facebook_marketplace_listings",
"dataset_id": "gd_lvt9iwuh6fbcwmx1a",
"description": "\n".join(
[
"Quickly read structured Facebook marketplace listing data.",
"Requires a valid Facebook marketplace listing URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "facebook_company_reviews",
"dataset_id": "gd_m0dtqpiu1mbcyc2g86",
"description": "\n".join(
[
"Quickly read structured Facebook company reviews data.",
"Requires a valid Facebook company URL and number of reviews.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url", "num_of_reviews"],
},
{
"id": "facebook_events",
"dataset_id": "gd_m14sd0to1jz48ppm51",
"description": "\n".join(
[
"Quickly read structured Facebook events data.",
"Requires a valid Facebook event URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "tiktok_profiles",
"dataset_id": "gd_l1villgoiiidt09ci",
"description": "\n".join(
[
"Quickly read structured Tiktok profiles data.",
"Requires a valid Tiktok profile URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "tiktok_posts",
"dataset_id": "gd_lu702nij2f790tmv9h",
"description": "\n".join(
[
"Quickly read structured Tiktok post data.",
"Requires a valid Tiktok post URL.",
"This can be a cache lookup, so it can be more reliable than scraping",
]
),
"inputs": ["url"],
},
{
"id": "tiktok_shop",
"dataset_id": "gd_m45m1u911dsa4274pi",
"description": "\n".join(
[
"Quickly read structured Tiktok shop data.",
"Requires a valid Tiktok shop product URL.",
"This can be a cache lookup...",
]
),
"inputs": ["url"],
},
]
class BrightDataDatasetTool(BaseTool):
"""
CrewAI-compatible tool for scraping structured data using Bright Data Datasets.
Attributes:
name (str): Tool name displayed in the CrewAI environment.
description (str): Tool description shown to agents or users.
args_schema (Type[BaseModel]): Pydantic schema for validating input arguments.
"""
name: str = "Bright Data Dataset Tool"
description: str = "Scrapes structured data using Bright Data Dataset API from a URL and optional input parameters"
args_schema: Type[BaseModel] = BrightDataDatasetToolSchema
dataset_type: Optional[str] = None
url: Optional[str] = None
format: str = "json"
zipcode: Optional[str] = None
additional_params: Optional[Dict[str, Any]] = None
def __init__(self, dataset_type: str = None, url: str = None, format: str = "json", zipcode: str = None, additional_params: Dict[str, Any] = None):
super().__init__()
self.dataset_type = dataset_type
self.url = url
self.format = format
self.zipcode = zipcode
self.additional_params = additional_params
def filter_dataset_by_id(self, target_id):
return [dataset for dataset in datasets if dataset["id"] == target_id]
async def get_dataset_data_async(
self,
dataset_type: str,
output_format: str,
url: str,
zipcode: Optional[str] = None,
additional_params: Optional[Dict[str, Any]] = None,
polling_interval: int = 1,
) -> str:
"""
Asynchronously trigger and poll Bright Data dataset scraping.
Args:
dataset_type (str): Bright Data Dataset Type.
output_format (str): Response format ("json", "ndjson", "jsonl", or "csv").
url (str): Target URL to scrape.
zipcode (Optional[str]): Optional ZIP code for geo-specific data.
additional_params (Optional[Dict]): Extra API parameters.
polling_interval (int): Time interval in seconds between polling attempts.
Returns:
str: Snapshot payload returned by Bright Data in the requested output format.
Raises:
Exception: If any API step fails or the job fails.
TimeoutError: If polling times out before job completion.
"""
request_data = {"url": url}
if zipcode is not None:
request_data["zipcode"] = zipcode
# Set additional parameters dynamically depending upon the dataset that is being requested
if additional_params:
request_data.update(additional_params)
api_key = os.getenv("BRIGHT_DATA_API_KEY")
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
dataset_id = ""
dataset = self.filter_dataset_by_id(dataset_type)
if len(dataset) == 1:
dataset_id = dataset[0]["dataset_id"]
else:
raise ValueError(
f"Unable to find the dataset for {dataset_type}. Please make sure to pass a valid one"
)
async with aiohttp.ClientSession() as session:
# Step 1: Trigger job
async with session.post(
f"{BRIGHTDATA_API_URL}/datasets/v3/trigger",
params={"dataset_id": dataset_id, "include_errors": "true"},
json=[request_data],
headers=headers,
) as trigger_response:
if trigger_response.status != 200:
raise BrightDataDatasetToolException(
f"Trigger failed: {await trigger_response.text()}",
trigger_response.status,
)
trigger_data = await trigger_response.json()
print(trigger_data)
snapshot_id = trigger_data.get("snapshot_id")
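# The snapshot_id identifies this scraping job; it is used below to poll progress and fetch the final result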
# Step 2: Poll for completion
elapsed = 0
while elapsed < timeout:
await asyncio.sleep(polling_interval)
elapsed += polling_interval
async with session.get(
f"{BRIGHTDATA_API_URL}/datasets/v3/progress/{snapshot_id}",
headers=headers,
) as status_response:
if status_response.status != 200:
raise BrightDataDatasetToolException(
f"Status check failed: {await status_response.text()}",
status_response.status,
)
status_data = await status_response.json()
if status_data.get("status") == "ready":
print("Job is ready")
break
elif status_data.get("status") == "error":
raise BrightDataDatasetToolException(
f"Job failed: {status_data}", 0
)
else:
raise TimeoutError("Polling timed out before job completed.")
# Step 3: Retrieve result
async with session.get(
f"{BRIGHTDATA_API_URL}/datasets/v3/snapshot/{snapshot_id}",
params={"format": output_format},
headers=headers,
) as snapshot_response:
if snapshot_response.status != 200:
raise BrightDataDatasetToolException(
f"Result fetch failed: {await snapshot_response.text()}",
snapshot_response.status,
)
return await snapshot_response.text()
def _run(self, url: str = None, dataset_type: str = None, format: str = None, zipcode: str = None, additional_params: Dict[str, Any] = None, **kwargs: Any) -> Any:
dataset_type = dataset_type or self.dataset_type
output_format = format or self.format
url = url or self.url
zipcode = zipcode or self.zipcode
additional_params = additional_params or self.additional_params
if not dataset_type:
raise ValueError("dataset_type is required either in constructor or method call")
if not url:
raise ValueError("url is required either in constructor or method call")
valid_output_formats = {"json", "ndjson", "jsonl", "csv"}
if output_format not in valid_output_formats:
raise ValueError(
f"Unsupported output format: {output_format}. Must be one of {', '.join(valid_output_formats)}."
)
api_key = os.getenv("BRIGHT_DATA_API_KEY")
if not api_key:
raise ValueError("BRIGHT_DATA_API_KEY environment variable is required.")
try:
return asyncio.run(
self.get_dataset_data_async(
dataset_type=dataset_type,
output_format=output_format,
url=url,
zipcode=zipcode,
additional_params=additional_params,
)
)
except TimeoutError as e:
return f"Timeout Exception occured in method : get_dataset_data_async. Details - {str(e)}"
except BrightDataDatasetToolException as e:
return f"Exception occured in method : get_dataset_data_async. Details - {str(e)}"
except Exception as e:
return f"Bright Data API error: {str(e)}"

View File

@@ -0,0 +1,204 @@
import os
import urllib.parse
from typing import Any, Optional, Type
import requests
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
class BrightDataConfig(BaseSettings):
API_URL: str = "https://api.brightdata.com/request"
class Config:
env_prefix = "BRIGHTDATA_"
class BrightDataSearchToolSchema(BaseModel):
"""
Schema that defines the input arguments for the BrightDataSearchToolSchema.
Attributes:
query (str): The search query to be executed (e.g., "latest AI news").
search_engine (Optional[str]): The search engine to use ("google", "bing", "yandex"). Default is "google".
country (Optional[str]): Two-letter country code for geo-targeting (e.g., "us", "in"). Default is "us".
language (Optional[str]): Language code for search results (e.g., "en", "es"). Default is "en".
search_type (Optional[str]): Type of search, such as "isch" (images), "nws" (news), "jobs", etc.
device_type (Optional[str]): Device type to simulate ("desktop", "mobile", "ios", "android"). Default is "desktop".
parse_results (Optional[bool]): If True, results will be returned in structured JSON. If False, raw HTML. Default is True.
"""
query: str = Field(..., description="Search query to perform")
search_engine: Optional[str] = Field(
default="google",
description="Search engine domain (e.g., 'google', 'bing', 'yandex')",
)
country: Optional[str] = Field(
default="us",
description="Two-letter country code for geo-targeting (e.g., 'us', 'gb')",
)
language: Optional[str] = Field(
default="en",
description="Language code (e.g., 'en', 'es') used in the query URL",
)
search_type: Optional[str] = Field(
default=None,
description="Type of search (e.g., 'isch' for images, 'nws' for news)",
)
device_type: Optional[str] = Field(
default="desktop",
description="Device type to simulate (e.g., 'mobile', 'desktop', 'ios')",
)
parse_results: Optional[bool] = Field(
default=True,
description="Whether to parse and return JSON (True) or raw HTML/text (False)",
)
class BrightDataSearchTool(BaseTool):
"""
A web search tool that utilizes Bright Data's SERP API to perform queries and return either structured results
or raw page content from search engines like Google or Bing.
Attributes:
name (str): Tool name used by the agent.
description (str): A brief explanation of what the tool does.
args_schema (Type[BaseModel]): Schema class for validating tool arguments.
base_url (str): The Bright Data API endpoint used for making the POST request.
api_key (str): Bright Data API key loaded from the environment variable 'BRIGHT_DATA_API_KEY'.
zone (str): Zone identifier from Bright Data, loaded from the environment variable 'BRIGHT_DATA_ZONE'.
Raises:
ValueError: If API key or zone environment variables are not set.
"""
name: str = "Bright Data SERP Search"
description: str = "Tool to perform web search using Bright Data SERP API."
args_schema: Type[BaseModel] = BrightDataSearchToolSchema
_config = BrightDataConfig()
base_url: str = ""
api_key: str = ""
zone: str = ""
query: Optional[str] = None
search_engine: str = "google"
country: str = "us"
language: str = "en"
search_type: Optional[str] = None
device_type: str = "desktop"
parse_results: bool = True
def __init__(self, query: str = None, search_engine: str = "google", country: str = "us", language: str = "en", search_type: str = None, device_type: str = "desktop", parse_results: bool = True):
super().__init__()
self.base_url = self._config.API_URL
self.query = query
self.search_engine = search_engine
self.country = country
self.language = language
self.search_type = search_type
self.device_type = device_type
self.parse_results = parse_results
self.api_key = os.getenv("BRIGHT_DATA_API_KEY")
self.zone = os.getenv("BRIGHT_DATA_ZONE")
if not self.api_key:
raise ValueError("BRIGHT_DATA_API_KEY environment variable is required.")
if not self.zone:
raise ValueError("BRIGHT_DATA_ZONE environment variable is required.")
def get_search_url(self, engine: str, query: str):
if engine == "yandex":
return f"https://yandex.com/search/?text={query}"
elif engine == "bing":
return f"https://www.bing.com/search?q={query}"
return f"https://www.google.com/search?q={query}"
def _run(self, query: str = None, search_engine: str = None, country: str = None, language: str = None, search_type: str = None, device_type: str = None, parse_results: bool = None, **kwargs) -> Any:
"""
Executes a search query using Bright Data SERP API and returns results.
Args:
query (str): The search query string (URL encoded internally).
search_engine (str): The search engine to use (default: "google").
country (str): Country code for geotargeting (default: "us").
language (str): Language code for the query (default: "en").
search_type (str): Optional type of search such as "nws", "isch", "jobs".
device_type (str): Optional device type to simulate (e.g., "mobile", "ios", "desktop").
parse_results (bool): If True, returns structured data; else raw page (default: True).
results_count (str or int): Number of search results to fetch, passed via kwargs (default: "10").
Returns:
str: Response text from Bright Data (a JSON string when parse_results is True), or an error message.
"""
query = query or self.query
search_engine = search_engine or self.search_engine
country = country or self.country
language = language or self.language
search_type = search_type or self.search_type
device_type = device_type or self.device_type
parse_results = parse_results if parse_results is not None else self.parse_results
results_count = kwargs.get("results_count", "10")
# Validate required parameters
if not query:
raise ValueError("query is required either in constructor or method call")
# Build the search URL
query = urllib.parse.quote(query)
url = self.get_search_url(search_engine, query)
# Add parameters to the URL
params = []
if country:
params.append(f"gl={country}")
if language:
params.append(f"hl={language}")
if results_count:
params.append(f"num={results_count}")
if parse_results:
params.append(f"brd_json=1")
if search_type:
if search_type == "jobs":
params.append("ibp=htl;jobs")
else:
params.append(f"tbm={search_type}")
if device_type:
if device_type == "mobile":
params.append("brd_mobile=1")
elif device_type == "ios":
params.append("brd_mobile=ios")
elif device_type == "android":
params.append("brd_mobile=android")
# Combine parameters with the URL
if params:
url += "&" + "&".join(params)
# Set up the API request parameters
request_params = {"zone": self.zone, "url": url, "format": "raw"}
request_params = {k: v for k, v in request_params.items() if v is not None}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
try:
response = requests.post(
self.base_url, json=request_params, headers=headers
)
print(f"Status code: {response.status_code}")
response.raise_for_status()
return response.text
except requests.RequestException as e:
return f"Error performing BrightData search: {str(e)}"
except Exception as e:
return f"Error fetching results: {str(e)}"

View File

@@ -0,0 +1,119 @@
import os
from typing import Any, Optional, Type
import requests
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
class BrightDataConfig(BaseSettings):
API_URL: str = "https://api.brightdata.com/request"
class Config:
env_prefix = "BRIGHTDATA_"
class BrightDataUnlockerToolSchema(BaseModel):
"""
Pydantic schema for input parameters used by the BrightDataWebUnlockerTool.
This schema defines the structure and validation for parameters passed when performing
a web scraping request using Bright Data's Web Unlocker.
Attributes:
url (str): The target URL to scrape.
format (Optional[str]): Format of the response returned by Bright Data. Defaults to "raw".
data_format (Optional[str]): Response data format. Defaults to "markdown"; "html" is also supported.
"""
url: str = Field(..., description="URL to perform the web scraping")
format: Optional[str] = Field(
default="raw", description="Response format (raw is standard)"
)
data_format: Optional[str] = Field(
default="markdown", description="Response data format (markdown by default; 'html' also supported)"
)
class BrightDataWebUnlockerTool(BaseTool):
"""
A tool for performing web scraping using the Bright Data Web Unlocker API.
This tool allows automated and programmatic access to web pages by routing requests
through Bright Data's unlocking and proxy infrastructure, which can bypass bot
protection mechanisms like CAPTCHA, geo-restrictions, and anti-bot detection.
Attributes:
name (str): Name of the tool.
description (str): Description of what the tool does.
args_schema (Type[BaseModel]): Pydantic model schema for expected input arguments.
base_url (str): Base URL of the Bright Data Web Unlocker API.
api_key (str): Bright Data API key (must be set in the BRIGHT_DATA_API_KEY environment variable).
zone (str): Bright Data zone identifier (must be set in the BRIGHT_DATA_ZONE environment variable).
Methods:
_run(**kwargs: Any) -> Any:
Sends a scraping request to Bright Data's Web Unlocker API and returns the result.
"""
name: str = "Bright Data Web Unlocker Scraping"
description: str = "Tool to perform web scraping using Bright Data Web Unlocker"
args_schema: Type[BaseModel] = BrightDataUnlockerToolSchema
_config = BrightDataConfig()
base_url: str = ""
api_key: str = ""
zone: str = ""
url: Optional[str] = None
format: str = "raw"
data_format: str = "markdown"
def __init__(self, url: str = None, format: str = "raw", data_format: str = "markdown"):
super().__init__()
self.base_url = self._config.API_URL
self.url = url
self.format = format
self.data_format = data_format
self.api_key = os.getenv("BRIGHT_DATA_API_KEY")
self.zone = os.getenv("BRIGHT_DATA_ZONE")
if not self.api_key:
raise ValueError("BRIGHT_DATA_API_KEY environment variable is required.")
if not self.zone:
raise ValueError("BRIGHT_DATA_ZONE environment variable is required.")
def _run(self, url: str = None, format: str = None, data_format: str = None, **kwargs: Any) -> Any:
url = url or self.url
format = format or self.format
data_format = data_format or self.data_format
if not url:
raise ValueError("url is required either in constructor or method call")
payload = {
"url": url,
"zone": self.zone,
"format": format,
}
valid_data_formats = {"html", "markdown"}
if data_format not in valid_data_formats:
raise ValueError(
f"Unsupported data format: {data_format}. Must be one of {', '.join(valid_data_formats)}."
)
if data_format == "markdown":
payload["data_format"] = "markdown"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
try:
response = requests.post(self.base_url, json=payload, headers=headers)
print(f"Status Code: {response.status_code}")
response.raise_for_status()
return response.text
except requests.RequestException as e:
return f"HTTP Error performing BrightData Web Unlocker Scrape: {e}\nResponse: {getattr(e.response, 'text', '')}"
except Exception as e:
return f"Error fetching results: {str(e)}"

tests/tools/__init__.py (new, empty file)

View File

@@ -0,0 +1,54 @@
import unittest
from unittest.mock import MagicMock, patch
from crewai_tools.tools.brightdata_tool.brightdata_serp import BrightDataSearchTool
class TestBrightDataSearchTool(unittest.TestCase):
@patch.dict(
"os.environ",
{"BRIGHT_DATA_API_KEY": "test_api_key", "BRIGHT_DATA_ZONE": "test_zone"},
)
def setUp(self):
self.tool = BrightDataSearchTool()
@patch("requests.post")
def test_run_successful_search(self, mock_post):
# Sample mock JSON response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.text = "mock response text"
mock_post.return_value = mock_response
# Define search input
input_data = {
"query": "latest AI news",
"search_engine": "google",
"country": "us",
"language": "en",
"search_type": "nws",
"device_type": "desktop",
"parse_results": True,
"save_file": False,
}
result = self.tool._run(**input_data)
# Assertions
self.assertIsInstance(result, str)  # The tool returns response.text (a string)
mock_post.assert_called_once()
@patch("requests.post")
def test_run_with_request_exception(self, mock_post):
mock_post.side_effect = Exception("Timeout")
result = self.tool._run(query="AI", search_engine="google")
self.assertIn("Error", result)
def tearDown(self):
# Nothing to clean up; patch.dict restores the environment automatically
pass
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,64 @@
from unittest.mock import Mock, patch
import requests
from crewai_tools.tools.brightdata_tool.brightdata_unlocker import (
BrightDataWebUnlockerTool,
)
@patch.dict(
"os.environ",
{"BRIGHT_DATA_API_KEY": "test_api_key", "BRIGHT_DATA_ZONE": "test_zone"},
)
@patch("crewai_tools.tools.brightdata_tool.brightdata_unlocker.requests.post")
def test_run_success_html(mock_post):
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = "<html><body>Test</body></html>"
mock_response.raise_for_status = Mock()
mock_post.return_value = mock_response
tool = BrightDataWebUnlockerTool()
result = tool._run(url="https://example.com", format="html", save_file=False)
print(result)
@patch.dict(
"os.environ",
{"BRIGHT_DATA_API_KEY": "test_api_key", "BRIGHT_DATA_ZONE": "test_zone"},
)
@patch("crewai_tools.tools.brightdata_tool.brightdata_unlocker.requests.post")
def test_run_success_json(mock_post):
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = "mock response text"
mock_response.raise_for_status = Mock()
mock_post.return_value = mock_response
tool = BrightDataWebUnlockerTool()
result = tool._run(url="https://example.com", format="json")
assert isinstance(result, str)
@patch.dict(
"os.environ",
{"BRIGHT_DATA_API_KEY": "test_api_key", "BRIGHT_DATA_ZONE": "test_zone"},
)
@patch("crewai_tools.tools.brightdata_tool.brightdata_unlocker.requests.post")
def test_run_http_error(mock_post):
mock_response = Mock()
mock_response.status_code = 403
mock_response.text = "Forbidden"
mock_response.raise_for_status.side_effect = requests.HTTPError(
response=mock_response
)
mock_post.return_value = mock_response
tool = BrightDataWebUnlockerTool()
result = tool._run(url="https://example.com")
assert "HTTP Error" in result
assert "Forbidden" in result