Bedrock AgentCore browser and code interpreter toolkits (#385)

* Added browser and code tools

* Added dependencies, moved imports inside class

* Added instructions in README

* Updated imports

* Updated imports

* Updated dependencies

* Fix 'get_current_page' utilities for Browser tools

* Support browser session cleanup from synchronous code

* Update browser tool examples for new changes

* Manually override _run->_arun and set nested loop when in crew event loop

* Browser async example

* update examples with uv

* Fix toolkit fields for code interpreter

* Update code interpreter examples

* update uv.lock

* Move nest_asyncio import

---------

Co-authored-by: Michael Chin <mchin188@yahoo.com>
This commit is contained in:
Piyush Jain
2025-07-18 14:33:39 -07:00
committed by GitHub
parent 9f6002a9dd
commit 2c38d1d448
10 changed files with 1835 additions and 7 deletions

View File

@@ -1,9 +1,16 @@
from .s3 import S3ReaderTool, S3WriterTool
from .bedrock import BedrockKBRetrieverTool, BedrockInvokeAgentTool
from .bedrock import (
BedrockKBRetrieverTool,
BedrockInvokeAgentTool,
create_browser_toolkit,
create_code_interpreter_toolkit,
)
__all__ = [
'S3ReaderTool',
'S3WriterTool',
'BedrockKBRetrieverTool',
'BedrockInvokeAgentTool'
]
"S3ReaderTool",
"S3WriterTool",
"BedrockKBRetrieverTool",
"BedrockInvokeAgentTool",
"create_browser_toolkit",
"create_code_interpreter_toolkit"
]

View File

@@ -1,4 +1,11 @@
from .knowledge_base.retriever_tool import BedrockKBRetrieverTool
from .agents.invoke_agent_tool import BedrockInvokeAgentTool
from .browser import create_browser_toolkit
from .code_interpreter import create_code_interpreter_toolkit
__all__ = ["BedrockKBRetrieverTool", "BedrockInvokeAgentTool"]
__all__ = [
"BedrockKBRetrieverTool",
"BedrockInvokeAgentTool",
"create_browser_toolkit",
"create_code_interpreter_toolkit"
]

View File

@@ -0,0 +1,158 @@
# AWS Bedrock Browser Tools
This toolkit provides a set of tools for interacting with web browsers through AWS Bedrock Browser. It enables your CrewAI agents to navigate websites, extract content, click elements, and more.
## Features
- Navigate to URLs and browse the web
- Extract text and hyperlinks from pages
- Click on elements using CSS selectors
- Navigate back through browser history
- Get information about the current webpage
- Multiple browser sessions with thread-based isolation
## Installation
Ensure you have the necessary dependencies:
```bash
uv add crewai-tools bedrock-agentcore beautifulsoup4 playwright nest-asyncio
```
## Usage
### Basic Usage
```python
from crewai import Agent, Task, Crew, LLM
from crewai_tools.aws.bedrock.browser import create_browser_toolkit
# Create the browser toolkit
toolkit, browser_tools = create_browser_toolkit(region="us-west-2")
# Create the Bedrock LLM
llm = LLM(
model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
region_name="us-west-2",
)
# Create a CrewAI agent that uses the browser tools
research_agent = Agent(
role="Web Researcher",
goal="Research and summarize web content",
backstory="You're an expert at finding information online.",
tools=browser_tools,
llm=llm
)
# Create a task for the agent
research_task = Task(
description="Navigate to https://example.com and extract all text content. Summarize the main points.",
expected_output="A list of bullet points containing the most important information on https://example.com. Plus, a description of the tool calls used, and actions performed to get to the page.",
agent=research_agent
)
# Create and run the crew
crew = Crew(
agents=[research_agent],
tasks=[research_task]
)
result = crew.kickoff()
print(f"\n***Final result:***\n\n{result}")
# Clean up browser resources when done
toolkit.sync_cleanup()
```
### Available Tools
The toolkit provides the following tools:
1. `navigate_browser` - Navigate to a URL
2. `click_element` - Click on an element using CSS selectors
3. `extract_text` - Extract all text from the current webpage
4. `extract_hyperlinks` - Extract all hyperlinks from the current webpage
5. `get_elements` - Get elements matching a CSS selector
6. `navigate_back` - Navigate to the previous page
7. `current_webpage` - Get information about the current webpage
### Advanced Usage (with async)
```python
import asyncio
from crewai import Agent, Task, Crew, LLM
from crewai_tools.aws.bedrock.browser import create_browser_toolkit
async def main():
# Create the browser toolkit with specific AWS region
toolkit, browser_tools = create_browser_toolkit(region="us-west-2")
tools_by_name = toolkit.get_tools_by_name()
# Create the Bedrock LLM
llm = LLM(
model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
region_name="us-west-2",
)
# Create agents with specific tools
navigator_agent = Agent(
role="Navigator",
goal="Find specific information across websites",
backstory="You navigate through websites to locate information.",
tools=[
tools_by_name["navigate_browser"],
tools_by_name["click_element"],
tools_by_name["navigate_back"]
],
llm=llm
)
content_agent = Agent(
role="Content Extractor",
goal="Extract and analyze webpage content",
backstory="You extract and analyze content from webpages.",
tools=[
tools_by_name["extract_text"],
tools_by_name["extract_hyperlinks"],
tools_by_name["get_elements"]
],
llm=llm
)
# Create tasks for the agents
navigation_task = Task(
description="Navigate to https://example.com, then click on the the 'More information...' link.",
expected_output="The status of the tool calls for this task.",
agent=navigator_agent,
)
extraction_task = Task(
description="Extract all text from the current page and summarize it.",
expected_output="The summary of the page, and a description of the tool calls used, and actions performed to get to the page.",
agent=content_agent,
)
# Create and run the crew
crew = Crew(
agents=[navigator_agent, content_agent],
tasks=[navigation_task, extraction_task]
)
result = await crew.kickoff_async()
# Clean up browser resources when done
toolkit.sync_cleanup()
return result
if __name__ == "__main__":
result = asyncio.run(main())
print(f"\n***Final result:***\n\n{result}")
```
## Requirements
- AWS account with access to Bedrock AgentCore API
- Properly configured AWS credentials

View File

@@ -0,0 +1,3 @@
from .browser_toolkit import BrowserToolkit, create_browser_toolkit
__all__ = ["BrowserToolkit", "create_browser_toolkit"]

View File

@@ -0,0 +1,260 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Dict, Tuple
if TYPE_CHECKING:
from playwright.async_api import Browser as AsyncBrowser
from playwright.sync_api import Browser as SyncBrowser
from bedrock_agentcore.tools.browser_client import BrowserClient
logger = logging.getLogger(__name__)
class BrowserSessionManager:
"""
Manages browser sessions for different threads.
This class maintains separate browser sessions for different threads,
enabling concurrent usage of browsers in multi-threaded environments.
Browsers are created lazily only when needed by tools.
"""
def __init__(self, region: str = "us-west-2"):
"""
Initialize the browser session manager.
Args:
region: AWS region for browser client
"""
self.region = region
self._async_sessions: Dict[str, Tuple[BrowserClient, AsyncBrowser]] = {}
self._sync_sessions: Dict[str, Tuple[BrowserClient, SyncBrowser]] = {}
async def get_async_browser(self, thread_id: str) -> AsyncBrowser:
"""
Get or create an async browser for the specified thread.
Args:
thread_id: Unique identifier for the thread requesting the browser
Returns:
An async browser instance specific to the thread
"""
if thread_id in self._async_sessions:
return self._async_sessions[thread_id][1]
return await self._create_async_browser_session(thread_id)
def get_sync_browser(self, thread_id: str) -> SyncBrowser:
"""
Get or create a sync browser for the specified thread.
Args:
thread_id: Unique identifier for the thread requesting the browser
Returns:
A sync browser instance specific to the thread
"""
if thread_id in self._sync_sessions:
return self._sync_sessions[thread_id][1]
return self._create_sync_browser_session(thread_id)
async def _create_async_browser_session(self, thread_id: str) -> AsyncBrowser:
"""
Create a new async browser session for the specified thread.
Args:
thread_id: Unique identifier for the thread
Returns:
The newly created async browser instance
Raises:
Exception: If browser session creation fails
"""
from bedrock_agentcore.tools.browser_client import BrowserClient
browser_client = BrowserClient(region=self.region)
try:
# Start browser session
browser_client.start()
# Get WebSocket connection info
ws_url, headers = browser_client.generate_ws_headers()
logger.info(
f"Connecting to async WebSocket endpoint for thread {thread_id}: {ws_url}"
)
from playwright.async_api import async_playwright
# Connect to browser using Playwright
playwright = await async_playwright().start()
browser = await playwright.chromium.connect_over_cdp(
endpoint_url=ws_url, headers=headers, timeout=30000
)
logger.info(
f"Successfully connected to async browser for thread {thread_id}"
)
# Store session resources
self._async_sessions[thread_id] = (browser_client, browser)
return browser
except Exception as e:
logger.error(
f"Failed to create async browser session for thread {thread_id}: {e}"
)
# Clean up resources if session creation fails
if browser_client:
try:
browser_client.stop()
except Exception as cleanup_error:
logger.warning(f"Error cleaning up browser client: {cleanup_error}")
raise
def _create_sync_browser_session(self, thread_id: str) -> SyncBrowser:
"""
Create a new sync browser session for the specified thread.
Args:
thread_id: Unique identifier for the thread
Returns:
The newly created sync browser instance
Raises:
Exception: If browser session creation fails
"""
from bedrock_agentcore.tools.browser_client import BrowserClient
browser_client = BrowserClient(region=self.region)
try:
# Start browser session
browser_client.start()
# Get WebSocket connection info
ws_url, headers = browser_client.generate_ws_headers()
logger.info(
f"Connecting to sync WebSocket endpoint for thread {thread_id}: {ws_url}"
)
from playwright.sync_api import sync_playwright
# Connect to browser using Playwright
playwright = sync_playwright().start()
browser = playwright.chromium.connect_over_cdp(
endpoint_url=ws_url, headers=headers, timeout=30000
)
logger.info(
f"Successfully connected to sync browser for thread {thread_id}"
)
# Store session resources
self._sync_sessions[thread_id] = (browser_client, browser)
return browser
except Exception as e:
logger.error(
f"Failed to create sync browser session for thread {thread_id}: {e}"
)
# Clean up resources if session creation fails
if browser_client:
try:
browser_client.stop()
except Exception as cleanup_error:
logger.warning(f"Error cleaning up browser client: {cleanup_error}")
raise
async def close_async_browser(self, thread_id: str) -> None:
"""
Close the async browser session for the specified thread.
Args:
thread_id: Unique identifier for the thread
"""
if thread_id not in self._async_sessions:
logger.warning(f"No async browser session found for thread {thread_id}")
return
browser_client, browser = self._async_sessions[thread_id]
# Close browser
if browser:
try:
await browser.close()
except Exception as e:
logger.warning(
f"Error closing async browser for thread {thread_id}: {e}"
)
# Stop browser client
if browser_client:
try:
browser_client.stop()
except Exception as e:
logger.warning(
f"Error stopping browser client for thread {thread_id}: {e}"
)
# Remove session from dictionary
del self._async_sessions[thread_id]
logger.info(f"Async browser session cleaned up for thread {thread_id}")
def close_sync_browser(self, thread_id: str) -> None:
"""
Close the sync browser session for the specified thread.
Args:
thread_id: Unique identifier for the thread
"""
if thread_id not in self._sync_sessions:
logger.warning(f"No sync browser session found for thread {thread_id}")
return
browser_client, browser = self._sync_sessions[thread_id]
# Close browser
if browser:
try:
browser.close()
except Exception as e:
logger.warning(
f"Error closing sync browser for thread {thread_id}: {e}"
)
# Stop browser client
if browser_client:
try:
browser_client.stop()
except Exception as e:
logger.warning(
f"Error stopping browser client for thread {thread_id}: {e}"
)
# Remove session from dictionary
del self._sync_sessions[thread_id]
logger.info(f"Sync browser session cleaned up for thread {thread_id}")
async def close_all_browsers(self) -> None:
"""Close all browser sessions."""
# Close all async browsers
async_thread_ids = list(self._async_sessions.keys())
for thread_id in async_thread_ids:
await self.close_async_browser(thread_id)
# Close all sync browsers
sync_thread_ids = list(self._sync_sessions.keys())
for thread_id in sync_thread_ids:
self.close_sync_browser(thread_id)
logger.info("All browser sessions closed")

View File

@@ -0,0 +1,587 @@
"""Toolkit for navigating web with AWS browser."""
import json
import logging
import asyncio
from typing import Dict, List, Tuple, Any, Type
from urllib.parse import urlparse
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from .browser_session_manager import BrowserSessionManager
from .utils import aget_current_page, get_current_page
logger = logging.getLogger(__name__)
# Input schemas
class NavigateToolInput(BaseModel):
"""Input for NavigateTool."""
url: str = Field(description="URL to navigate to")
thread_id: str = Field(default="default", description="Thread ID for the browser session")
class ClickToolInput(BaseModel):
"""Input for ClickTool."""
selector: str = Field(description="CSS selector for the element to click on")
thread_id: str = Field(default="default", description="Thread ID for the browser session")
class GetElementsToolInput(BaseModel):
"""Input for GetElementsTool."""
selector: str = Field(description="CSS selector for elements to get")
thread_id: str = Field(default="default", description="Thread ID for the browser session")
class ExtractTextToolInput(BaseModel):
"""Input for ExtractTextTool."""
thread_id: str = Field(default="default", description="Thread ID for the browser session")
class ExtractHyperlinksToolInput(BaseModel):
"""Input for ExtractHyperlinksTool."""
thread_id: str = Field(default="default", description="Thread ID for the browser session")
class NavigateBackToolInput(BaseModel):
"""Input for NavigateBackTool."""
thread_id: str = Field(default="default", description="Thread ID for the browser session")
class CurrentWebPageToolInput(BaseModel):
"""Input for CurrentWebPageTool."""
thread_id: str = Field(default="default", description="Thread ID for the browser session")
# Base tool class
class BrowserBaseTool(BaseTool):
"""Base class for browser tools."""
def __init__(self, session_manager: BrowserSessionManager):
"""Initialize with a session manager."""
super().__init__()
self._session_manager = session_manager
if self._is_in_asyncio_loop() and hasattr(self, '_arun'):
self._original_run = self._run
# Override _run to use _arun when in an asyncio loop
def patched_run(*args, **kwargs):
try:
import nest_asyncio
loop = asyncio.get_event_loop()
nest_asyncio.apply(loop)
return asyncio.get_event_loop().run_until_complete(
self._arun(*args, **kwargs)
)
except Exception as e:
return f"Error in patched _run: {str(e)}"
self._run = patched_run
async def get_async_page(self, thread_id: str) -> Any:
"""Get or create a page for the specified thread."""
browser = await self._session_manager.get_async_browser(thread_id)
page = await aget_current_page(browser)
return page
def get_sync_page(self, thread_id: str) -> Any:
"""Get or create a page for the specified thread."""
browser = self._session_manager.get_sync_browser(thread_id)
page = get_current_page(browser)
return page
def _is_in_asyncio_loop(self) -> bool:
"""Check if we're currently in an asyncio event loop."""
try:
loop = asyncio.get_event_loop()
return loop.is_running()
except RuntimeError:
return False
# Tool classes
class NavigateTool(BrowserBaseTool):
"""Tool for navigating a browser to a URL."""
name: str = "navigate_browser"
description: str = "Navigate a browser to the specified URL"
args_schema: Type[BaseModel] = NavigateToolInput
def _run(self, url: str, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Get page for this thread
page = self.get_sync_page(thread_id)
# Validate URL scheme
parsed_url = urlparse(url)
if parsed_url.scheme not in ("http", "https"):
raise ValueError("URL scheme must be 'http' or 'https'")
# Navigate to URL
response = page.goto(url)
status = response.status if response else "unknown"
return f"Navigating to {url} returned status code {status}"
except Exception as e:
return f"Error navigating to {url}: {str(e)}"
async def _arun(self, url: str, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Get page for this thread
page = await self.get_async_page(thread_id)
# Validate URL scheme
parsed_url = urlparse(url)
if parsed_url.scheme not in ("http", "https"):
raise ValueError("URL scheme must be 'http' or 'https'")
# Navigate to URL
response = await page.goto(url)
status = response.status if response else "unknown"
return f"Navigating to {url} returned status code {status}"
except Exception as e:
return f"Error navigating to {url}: {str(e)}"
class ClickTool(BrowserBaseTool):
"""Tool for clicking on an element with the given CSS selector."""
name: str = "click_element"
description: str = "Click on an element with the given CSS selector"
args_schema: Type[BaseModel] = ClickToolInput
visible_only: bool = True
"""Whether to consider only visible elements."""
playwright_strict: bool = False
"""Whether to employ Playwright's strict mode when clicking on elements."""
playwright_timeout: float = 1_000
"""Timeout (in ms) for Playwright to wait for element to be ready."""
def _selector_effective(self, selector: str) -> str:
if not self.visible_only:
return selector
return f"{selector} >> visible=1"
def _run(self, selector: str, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Get the current page
page = self.get_sync_page(thread_id)
# Click on the element
selector_effective = self._selector_effective(selector=selector)
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
try:
page.click(
selector_effective,
strict=self.playwright_strict,
timeout=self.playwright_timeout,
)
except PlaywrightTimeoutError:
return f"Unable to click on element '{selector}'"
except Exception as click_error:
return f"Unable to click on element '{selector}': {str(click_error)}"
return f"Clicked element '{selector}'"
except Exception as e:
return f"Error clicking on element: {str(e)}"
async def _arun(self, selector: str, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Get the current page
page = await self.get_async_page(thread_id)
# Click on the element
selector_effective = self._selector_effective(selector=selector)
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
try:
await page.click(
selector_effective,
strict=self.playwright_strict,
timeout=self.playwright_timeout,
)
except PlaywrightTimeoutError:
return f"Unable to click on element '{selector}'"
except Exception as click_error:
return f"Unable to click on element '{selector}': {str(click_error)}"
return f"Clicked element '{selector}'"
except Exception as e:
return f"Error clicking on element: {str(e)}"
class NavigateBackTool(BrowserBaseTool):
"""Tool for navigating back in browser history."""
name: str = "navigate_back"
description: str = "Navigate back to the previous page"
args_schema: Type[BaseModel] = NavigateBackToolInput
def _run(self, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Get the current page
page = self.get_sync_page(thread_id)
# Navigate back
try:
page.go_back()
return "Navigated back to the previous page"
except Exception as nav_error:
return f"Unable to navigate back: {str(nav_error)}"
except Exception as e:
return f"Error navigating back: {str(e)}"
async def _arun(self, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Get the current page
page = await self.get_async_page(thread_id)
# Navigate back
try:
await page.go_back()
return "Navigated back to the previous page"
except Exception as nav_error:
return f"Unable to navigate back: {str(nav_error)}"
except Exception as e:
return f"Error navigating back: {str(e)}"
class ExtractTextTool(BrowserBaseTool):
"""Tool for extracting text from a webpage."""
name: str = "extract_text"
description: str = "Extract all the text on the current webpage"
args_schema: Type[BaseModel] = ExtractTextToolInput
def _run(self, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Import BeautifulSoup
try:
from bs4 import BeautifulSoup
except ImportError:
return (
"The 'beautifulsoup4' package is required to use this tool."
" Please install it with 'pip install beautifulsoup4'."
)
# Get the current page
page = self.get_sync_page(thread_id)
# Extract text
content = page.content()
soup = BeautifulSoup(content, "html.parser")
return soup.get_text(separator="\n").strip()
except Exception as e:
return f"Error extracting text: {str(e)}"
async def _arun(self, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Import BeautifulSoup
try:
from bs4 import BeautifulSoup
except ImportError:
return (
"The 'beautifulsoup4' package is required to use this tool."
" Please install it with 'pip install beautifulsoup4'."
)
# Get the current page
page = await self.get_async_page(thread_id)
# Extract text
content = await page.content()
soup = BeautifulSoup(content, "html.parser")
return soup.get_text(separator="\n").strip()
except Exception as e:
return f"Error extracting text: {str(e)}"
class ExtractHyperlinksTool(BrowserBaseTool):
"""Tool for extracting hyperlinks from a webpage."""
name: str = "extract_hyperlinks"
description: str = "Extract all hyperlinks on the current webpage"
args_schema: Type[BaseModel] = ExtractHyperlinksToolInput
def _run(self, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Import BeautifulSoup
try:
from bs4 import BeautifulSoup
except ImportError:
return (
"The 'beautifulsoup4' package is required to use this tool."
" Please install it with 'pip install beautifulsoup4'."
)
# Get the current page
page = self.get_sync_page(thread_id)
# Extract hyperlinks
content = page.content()
soup = BeautifulSoup(content, "html.parser")
links = []
for link in soup.find_all("a", href=True):
text = link.get_text().strip()
href = link["href"]
if href.startswith("http") or href.startswith("https"):
links.append({"text": text, "url": href})
if not links:
return "No hyperlinks found on the current page."
return json.dumps(links, indent=2)
except Exception as e:
return f"Error extracting hyperlinks: {str(e)}"
async def _arun(self, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Import BeautifulSoup
try:
from bs4 import BeautifulSoup
except ImportError:
return (
"The 'beautifulsoup4' package is required to use this tool."
" Please install it with 'pip install beautifulsoup4'."
)
# Get the current page
page = await self.get_async_page(thread_id)
# Extract hyperlinks
content = await page.content()
soup = BeautifulSoup(content, "html.parser")
links = []
for link in soup.find_all("a", href=True):
text = link.get_text().strip()
href = link["href"]
if href.startswith("http") or href.startswith("https"):
links.append({"text": text, "url": href})
if not links:
return "No hyperlinks found on the current page."
return json.dumps(links, indent=2)
except Exception as e:
return f"Error extracting hyperlinks: {str(e)}"
class GetElementsTool(BrowserBaseTool):
"""Tool for getting elements from a webpage."""
name: str = "get_elements"
description: str = "Get elements from the webpage using a CSS selector"
args_schema: Type[BaseModel] = GetElementsToolInput
def _run(self, selector: str, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Get the current page
page = self.get_sync_page(thread_id)
# Get elements
elements = page.query_selector_all(selector)
if not elements:
return f"No elements found with selector '{selector}'"
elements_text = []
for i, element in enumerate(elements):
text = element.text_content()
elements_text.append(f"Element {i+1}: {text.strip()}")
return "\n".join(elements_text)
except Exception as e:
return f"Error getting elements: {str(e)}"
async def _arun(self, selector: str, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Get the current page
page = await self.get_async_page(thread_id)
# Get elements
elements = await page.query_selector_all(selector)
if not elements:
return f"No elements found with selector '{selector}'"
elements_text = []
for i, element in enumerate(elements):
text = await element.text_content()
elements_text.append(f"Element {i+1}: {text.strip()}")
return "\n".join(elements_text)
except Exception as e:
return f"Error getting elements: {str(e)}"
class CurrentWebPageTool(BrowserBaseTool):
"""Tool for getting information about the current webpage."""
name: str = "current_webpage"
description: str = "Get information about the current webpage"
args_schema: Type[BaseModel] = CurrentWebPageToolInput
def _run(self, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Get the current page
page = self.get_sync_page(thread_id)
# Get information
url = page.url
title = page.title()
return f"URL: {url}\nTitle: {title}"
except Exception as e:
return f"Error getting current webpage info: {str(e)}"
async def _arun(self, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Get the current page
page = await self.get_async_page(thread_id)
# Get information
url = page.url
title = await page.title()
return f"URL: {url}\nTitle: {title}"
except Exception as e:
return f"Error getting current webpage info: {str(e)}"
class BrowserToolkit:
"""Toolkit for navigating web with AWS Bedrock browser.
This toolkit provides a set of tools for working with a remote browser
and supports multiple threads by maintaining separate browser sessions
for each thread ID. Browsers are created lazily only when needed.
Example:
```python
from crewai import Agent, Task, Crew
from crewai_tools.aws.bedrock.browser import create_browser_toolkit
# Create the browser toolkit
toolkit, browser_tools = create_browser_toolkit(region="us-west-2")
# Create a CrewAI agent that uses the browser tools
research_agent = Agent(
role="Web Researcher",
goal="Research and summarize web content",
backstory="You're an expert at finding information online.",
tools=browser_tools
)
# Create a task for the agent
research_task = Task(
description="Navigate to https://example.com and extract all text content. Summarize the main points.",
agent=research_agent
)
# Create and run the crew
crew = Crew(
agents=[research_agent],
tasks=[research_task]
)
result = crew.kickoff()
# Clean up browser resources when done
import asyncio
asyncio.run(toolkit.cleanup())
```
"""
def __init__(self, region: str = "us-west-2"):
"""
Initialize the toolkit
Args:
region: AWS region for the browser client
"""
self.region = region
self.session_manager = BrowserSessionManager(region=region)
self.tools: List[BaseTool] = []
self._nest_current_loop()
self._setup_tools()
def _nest_current_loop(self):
"""Apply nest_asyncio if we're in an asyncio loop."""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
try:
import nest_asyncio
nest_asyncio.apply(loop)
except Exception as e:
logger.warning(f"Failed to apply nest_asyncio: {str(e)}")
except RuntimeError:
pass
def _setup_tools(self) -> None:
"""Initialize tools without creating any browsers."""
self.tools = [
NavigateTool(session_manager=self.session_manager),
ClickTool(session_manager=self.session_manager),
NavigateBackTool(session_manager=self.session_manager),
ExtractTextTool(session_manager=self.session_manager),
ExtractHyperlinksTool(session_manager=self.session_manager),
GetElementsTool(session_manager=self.session_manager),
CurrentWebPageTool(session_manager=self.session_manager)
]
def get_tools(self) -> List[BaseTool]:
"""
Get the list of browser tools
Returns:
List of CrewAI tools
"""
return self.tools
def get_tools_by_name(self) -> Dict[str, BaseTool]:
"""
Get a dictionary of tools mapped by their names
Returns:
Dictionary of {tool_name: tool}
"""
return {tool.name: tool for tool in self.tools}
async def cleanup(self) -> None:
"""Clean up all browser sessions asynchronously"""
await self.session_manager.close_all_browsers()
logger.info("All browser sessions cleaned up")
def sync_cleanup(self) -> None:
"""Clean up all browser sessions from synchronous code"""
import asyncio
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.create_task(self.cleanup())
else:
loop.run_until_complete(self.cleanup())
except RuntimeError:
asyncio.run(self.cleanup())
def create_browser_toolkit(
region: str = "us-west-2",
) -> Tuple[BrowserToolkit, List[BaseTool]]:
"""
Create a BrowserToolkit
Args:
region: AWS region for browser client
Returns:
Tuple of (toolkit, tools)
"""
toolkit = BrowserToolkit(region=region)
tools = toolkit.get_tools()
return toolkit, tools

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Union
if TYPE_CHECKING:
from playwright.async_api import Browser as AsyncBrowser
from playwright.async_api import Page as AsyncPage
from playwright.sync_api import Browser as SyncBrowser
from playwright.sync_api import Page as SyncPage
async def aget_current_page(browser: Union[AsyncBrowser, Any]) -> AsyncPage:
"""
Asynchronously get the current page of the browser.
Args:
browser: The browser (AsyncBrowser) to get the current page from.
Returns:
AsyncPage: The current page.
"""
if not browser.contexts:
context = await browser.new_context()
return await context.new_page()
context = browser.contexts[0]
if not context.pages:
return await context.new_page()
return context.pages[-1]
def get_current_page(browser: Union[SyncBrowser, Any]) -> SyncPage:
"""
Get the current page of the browser.
Args:
browser: The browser to get the current page from.
Returns:
SyncPage: The current page.
"""
if not browser.contexts:
context = browser.new_context()
return context.new_page()
context = browser.contexts[0]
if not context.pages:
return context.new_page()
return context.pages[-1]

View File

@@ -0,0 +1,217 @@
# AWS Bedrock Code Interpreter Tools
This toolkit provides a set of tools for interacting with the AWS Bedrock Code Interpreter environment. It enables your CrewAI agents to execute code, run shell commands, manage files, and perform computational tasks in a secure, isolated environment.
## Features
- Execute code in various languages (primarily Python)
- Run shell commands in the environment
- Read, write, list, and delete files
- Manage long-running tasks asynchronously
- Multiple code interpreter sessions with thread-based isolation
## Installation
Ensure you have the necessary dependencies:
```bash
uv add crewai-tools bedrock-agentcore
```
## Usage
### Basic Usage
```python
from crewai import Agent, Task, Crew, LLM
from crewai_tools.aws import create_code_interpreter_toolkit
# Create the code interpreter toolkit
toolkit, code_tools = create_code_interpreter_toolkit(region="us-west-2")
# Create the Bedrock LLM
llm = LLM(
model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
region_name="us-west-2",
)
# Create a CrewAI agent that uses the code interpreter tools
developer_agent = Agent(
role="Python Developer",
goal="Create and execute Python code to solve problems.",
backstory="You're a skilled Python developer with expertise in data analysis.",
tools=code_tools,
llm=llm
)
# Create a task for the agent
coding_task = Task(
description="Write a Python function that calculates the factorial of a number and test it. Do not use any imports from outside the Python standard library.",
expected_output="The Python function created, and the test results.",
agent=developer_agent
)
# Create and run the crew
crew = Crew(
agents=[developer_agent],
tasks=[coding_task]
)
result = crew.kickoff()
print(f"\n***Final result:***\n\n{result}")
# Clean up resources when done
import asyncio
asyncio.run(toolkit.cleanup())
```
### Available Tools
The toolkit provides the following tools:
1. `execute_code` - Run code in various languages (primarily Python)
2. `execute_command` - Run shell commands in the environment
3. `read_files` - Read content of files in the environment
4. `list_files` - List files in directories
5. `delete_files` - Remove files from the environment
6. `write_files` - Create or update files
7. `start_command_execution` - Start long-running commands asynchronously
8. `get_task` - Check status of async tasks
9. `stop_task` - Stop running tasks
### Advanced Usage
```python
from crewai import Agent, Task, Crew, LLM
from crewai_tools.aws import create_code_interpreter_toolkit
# Create the code interpreter toolkit
toolkit, code_tools = create_code_interpreter_toolkit(region="us-west-2")
tools_by_name = toolkit.get_tools_by_name()
# Create the Bedrock LLM
llm = LLM(
model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
region_name="us-west-2",
)
# Create agents with specific tools
code_agent = Agent(
role="Code Developer",
goal="Write and execute code",
backstory="You write and test code to solve complex problems.",
tools=[
# Use specific tools by name
tools_by_name["execute_code"],
tools_by_name["execute_command"],
tools_by_name["read_files"],
tools_by_name["write_files"]
],
llm=llm
)
file_agent = Agent(
role="File Manager",
goal="Manage files in the environment",
backstory="You help organize and manage files in the code environment.",
tools=[
# Use specific tools by name
tools_by_name["list_files"],
tools_by_name["read_files"],
tools_by_name["write_files"],
tools_by_name["delete_files"]
],
llm=llm
)
# Create tasks for the agents
coding_task = Task(
description="Write a Python script to analyze data from a CSV file. Do not use any imports from outside the Python standard library.",
expected_output="The Python function created.",
agent=code_agent
)
file_task = Task(
description="Organize the created files into separate directories.",
agent=file_agent
)
# Create and run the crew
crew = Crew(
agents=[code_agent, file_agent],
tasks=[coding_task, file_task]
)
result = crew.kickoff()
print(f"\n***Final result:***\n\n{result}")
# Clean up code interpreter resources when done
import asyncio
asyncio.run(toolkit.cleanup())
```
### Example: Data Analysis with Python
```python
from crewai import Agent, Task, Crew, LLM
from crewai_tools.aws import create_code_interpreter_toolkit
# Create toolkit and tools
toolkit, code_tools = create_code_interpreter_toolkit(region="us-west-2")
# Create the Bedrock LLM
llm = LLM(
model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
region_name="us-west-2",
)
# Create a data analyst agent
analyst_agent = Agent(
role="Data Analyst",
goal="Analyze data using Python",
backstory="You're an expert data analyst who uses Python for data processing.",
tools=code_tools,
llm=llm
)
# Create a task for the agent
analysis_task = Task(
description="""
For all of the below, do not use any imports from outside the Python standard library.
1. Create a sample dataset with random data
2. Perform statistical analysis on the dataset
3. Generate visualizations of the results
4. Save the results and visualizations to files
""",
agent=analyst_agent
)
# Create and run the crew
crew = Crew(
agents=[analyst_agent],
tasks=[analysis_task]
)
result = crew.kickoff()
print(f"\n***Final result:***\n\n{result}")
# Clean up resources
import asyncio
asyncio.run(toolkit.cleanup())
```
## Resource Cleanup
Always clean up code interpreter resources when done to prevent resource leaks:
```python
import asyncio
# Clean up all code interpreter sessions
asyncio.run(toolkit.cleanup())
```
## Requirements
- AWS account with access to Bedrock AgentCore API
- Properly configured AWS credentials

View File

@@ -0,0 +1,3 @@
from .code_interpreter_toolkit import CodeInterpreterToolkit, create_code_interpreter_toolkit
__all__ = ["CodeInterpreterToolkit", "create_code_interpreter_toolkit"]

View File

@@ -0,0 +1,543 @@
"""Toolkit for working with AWS Bedrock Code Interpreter."""
from __future__ import annotations
import json
import logging
from typing import TYPE_CHECKING, Dict, List, Tuple, Optional, Type, Any
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
if TYPE_CHECKING:
from bedrock_agentcore.tools.code_interpreter_client import CodeInterpreter
logger = logging.getLogger(__name__)
def extract_output_from_stream(response):
"""
Extract output from code interpreter response stream
Args:
response: Response from code interpreter execution
Returns:
Extracted output as string
"""
output = []
for event in response["stream"]:
if "result" in event:
result = event["result"]
for content_item in result["content"]:
if content_item["type"] == "text":
output.append(content_item["text"])
if content_item["type"] == "resource":
resource = content_item["resource"]
if "text" in resource:
file_path = resource["uri"].replace("file://", "")
file_content = resource["text"]
output.append(f"==== File: {file_path} ====\n{file_content}\n")
else:
output.append(json.dumps(resource))
return "\n".join(output)
# Input schemas
class ExecuteCodeInput(BaseModel):
"""Input for ExecuteCode."""
code: str = Field(description="The code to execute")
language: str = Field(default="python", description="The programming language of the code")
clear_context: bool = Field(default=False, description="Whether to clear execution context")
thread_id: str = Field(default="default", description="Thread ID for the code interpreter session")
class ExecuteCommandInput(BaseModel):
"""Input for ExecuteCommand."""
command: str = Field(description="The command to execute")
thread_id: str = Field(default="default", description="Thread ID for the code interpreter session")
class ReadFilesInput(BaseModel):
"""Input for ReadFiles."""
paths: List[str] = Field(description="List of file paths to read")
thread_id: str = Field(default="default", description="Thread ID for the code interpreter session")
class ListFilesInput(BaseModel):
"""Input for ListFiles."""
directory_path: str = Field(default="", description="Path to the directory to list")
thread_id: str = Field(default="default", description="Thread ID for the code interpreter session")
class DeleteFilesInput(BaseModel):
"""Input for DeleteFiles."""
paths: List[str] = Field(description="List of file paths to delete")
thread_id: str = Field(default="default", description="Thread ID for the code interpreter session")
class WriteFilesInput(BaseModel):
"""Input for WriteFiles."""
files: List[Dict[str, str]] = Field(description="List of dictionaries with path and text fields")
thread_id: str = Field(default="default", description="Thread ID for the code interpreter session")
class StartCommandInput(BaseModel):
"""Input for StartCommand."""
command: str = Field(description="The command to execute asynchronously")
thread_id: str = Field(default="default", description="Thread ID for the code interpreter session")
class GetTaskInput(BaseModel):
"""Input for GetTask."""
task_id: str = Field(description="The ID of the task to check")
thread_id: str = Field(default="default", description="Thread ID for the code interpreter session")
class StopTaskInput(BaseModel):
"""Input for StopTask."""
task_id: str = Field(description="The ID of the task to stop")
thread_id: str = Field(default="default", description="Thread ID for the code interpreter session")
# Tool classes
class ExecuteCodeTool(BaseTool):
"""Tool for executing code in various languages."""
name: str = "execute_code"
description: str = "Execute code in various languages (primarily Python)"
args_schema: Type[BaseModel] = ExecuteCodeInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, code: str, language: str = "python", clear_context: bool = False, thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id)
# Execute code
response = code_interpreter.invoke(
method="executeCode",
params={"code": code, "language": language, "clearContext": clear_context},
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error executing code: {str(e)}"
async def _arun(self, code: str, language: str = "python", clear_context: bool = False, thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(code=code, language=language, clear_context=clear_context, thread_id=thread_id)
class ExecuteCommandTool(BaseTool):
"""Tool for running shell commands in the code interpreter environment."""
name: str = "execute_command"
description: str = "Run shell commands in the code interpreter environment"
args_schema: Type[BaseModel] = ExecuteCommandInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, command: str, thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id)
# Execute command
response = code_interpreter.invoke(
method="executeCommand", params={"command": command}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error executing command: {str(e)}"
async def _arun(self, command: str, thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(command=command, thread_id=thread_id)
class ReadFilesTool(BaseTool):
"""Tool for reading content of files in the environment."""
name: str = "read_files"
description: str = "Read content of files in the environment"
args_schema: Type[BaseModel] = ReadFilesInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, paths: List[str], thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id)
# Read files
response = code_interpreter.invoke(method="readFiles", params={"paths": paths})
return extract_output_from_stream(response)
except Exception as e:
return f"Error reading files: {str(e)}"
async def _arun(self, paths: List[str], thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(paths=paths, thread_id=thread_id)
class ListFilesTool(BaseTool):
"""Tool for listing files in directories in the environment."""
name: str = "list_files"
description: str = "List files in directories in the environment"
args_schema: Type[BaseModel] = ListFilesInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, directory_path: str = "", thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id)
# List files
response = code_interpreter.invoke(
method="listFiles", params={"directoryPath": directory_path}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error listing files: {str(e)}"
async def _arun(self, directory_path: str = "", thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(directory_path=directory_path, thread_id=thread_id)
class DeleteFilesTool(BaseTool):
"""Tool for removing files from the environment."""
name: str = "delete_files"
description: str = "Remove files from the environment"
args_schema: Type[BaseModel] = DeleteFilesInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, paths: List[str], thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id)
# Remove files
response = code_interpreter.invoke(
method="removeFiles", params={"paths": paths}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error deleting files: {str(e)}"
async def _arun(self, paths: List[str], thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(paths=paths, thread_id=thread_id)
class WriteFilesTool(BaseTool):
"""Tool for creating or updating files in the environment."""
name: str = "write_files"
description: str = "Create or update files in the environment"
args_schema: Type[BaseModel] = WriteFilesInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, files: List[Dict[str, str]], thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id)
# Write files
response = code_interpreter.invoke(
method="writeFiles", params={"content": files}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error writing files: {str(e)}"
async def _arun(self, files: List[Dict[str, str]], thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(files=files, thread_id=thread_id)
class StartCommandTool(BaseTool):
"""Tool for starting long-running commands asynchronously."""
name: str = "start_command_execution"
description: str = "Start long-running commands asynchronously"
args_schema: Type[BaseModel] = StartCommandInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, command: str, thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id)
# Start command execution
response = code_interpreter.invoke(
method="startCommandExecution", params={"command": command}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error starting command: {str(e)}"
async def _arun(self, command: str, thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(command=command, thread_id=thread_id)
class GetTaskTool(BaseTool):
"""Tool for checking status of async tasks."""
name: str = "get_task"
description: str = "Check status of async tasks"
args_schema: Type[BaseModel] = GetTaskInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, task_id: str, thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id)
# Get task status
response = code_interpreter.invoke(method="getTask", params={"taskId": task_id})
return extract_output_from_stream(response)
except Exception as e:
return f"Error getting task status: {str(e)}"
async def _arun(self, task_id: str, thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(task_id=task_id, thread_id=thread_id)
class StopTaskTool(BaseTool):
"""Tool for stopping running tasks."""
name: str = "stop_task"
description: str = "Stop running tasks"
args_schema: Type[BaseModel] = StopTaskInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, task_id: str, thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id)
# Stop task
response = code_interpreter.invoke(
method="stopTask", params={"taskId": task_id}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error stopping task: {str(e)}"
async def _arun(self, task_id: str, thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(task_id=task_id, thread_id=thread_id)
class CodeInterpreterToolkit:
"""Toolkit for working with AWS Bedrock code interpreter environment.
This toolkit provides a set of tools for working with a remote code interpreter environment:
* execute_code - Run code in various languages (primarily Python)
* execute_command - Run shell commands
* read_files - Read content of files in the environment
* list_files - List files in directories
* delete_files - Remove files from the environment
* write_files - Create or update files
* start_command_execution - Start long-running commands asynchronously
* get_task - Check status of async tasks
* stop_task - Stop running tasks
The toolkit lazily initializes the code interpreter session on first use.
It supports multiple threads by maintaining separate code interpreter sessions for each thread ID.
Example:
```python
from crewai import Agent, Task, Crew
from crewai_tools.aws.bedrock.code_interpreter import create_code_interpreter_toolkit
# Create the code interpreter toolkit
toolkit, code_tools = create_code_interpreter_toolkit(region="us-west-2")
# Create a CrewAI agent that uses the code interpreter tools
developer_agent = Agent(
role="Python Developer",
goal="Create and execute Python code to solve problems",
backstory="You're a skilled Python developer with expertise in data analysis.",
tools=code_tools
)
# Create a task for the agent
coding_task = Task(
description="Write a Python function that calculates the factorial of a number and test it.",
agent=developer_agent
)
# Create and run the crew
crew = Crew(
agents=[developer_agent],
tasks=[coding_task]
)
result = crew.kickoff()
# Clean up resources when done
import asyncio
asyncio.run(toolkit.cleanup())
```
"""
def __init__(self, region: str = "us-west-2"):
"""
Initialize the toolkit
Args:
region: AWS region for the code interpreter
"""
self.region = region
self._code_interpreters: Dict[str, CodeInterpreter] = {}
self.tools: List[BaseTool] = []
self._setup_tools()
def _setup_tools(self) -> None:
"""Initialize tools without creating any code interpreter sessions."""
self.tools = [
ExecuteCodeTool(self),
ExecuteCommandTool(self),
ReadFilesTool(self),
ListFilesTool(self),
DeleteFilesTool(self),
WriteFilesTool(self),
StartCommandTool(self),
GetTaskTool(self),
StopTaskTool(self)
]
def _get_or_create_interpreter(
self, thread_id: str = "default"
) -> CodeInterpreter:
"""Get or create a code interpreter for the specified thread.
Args:
thread_id: Thread ID for the code interpreter session
Returns:
CodeInterpreter instance
"""
if thread_id in self._code_interpreters:
return self._code_interpreters[thread_id]
# Create a new code interpreter for this thread
from bedrock_agentcore.tools.code_interpreter_client import CodeInterpreter
code_interpreter = CodeInterpreter(region=self.region)
code_interpreter.start()
logger.info(
f"Started code interpreter with session_id:{code_interpreter.session_id} for thread:{thread_id}"
)
# Store the interpreter
self._code_interpreters[thread_id] = code_interpreter
return code_interpreter
def get_tools(self) -> List[BaseTool]:
"""
Get the list of code interpreter tools
Returns:
List of CrewAI tools
"""
return self.tools
def get_tools_by_name(self) -> Dict[str, BaseTool]:
"""
Get a dictionary of tools mapped by their names
Returns:
Dictionary of {tool_name: tool}
"""
return {tool.name: tool for tool in self.tools}
async def cleanup(self, thread_id: Optional[str] = None) -> None:
"""Clean up resources
Args:
thread_id: Optional thread ID to clean up. If None, cleans up all sessions.
"""
if thread_id:
# Clean up a specific thread's session
if thread_id in self._code_interpreters:
try:
self._code_interpreters[thread_id].stop()
del self._code_interpreters[thread_id]
logger.info(
f"Code interpreter session for thread {thread_id} cleaned up"
)
except Exception as e:
logger.warning(
f"Error stopping code interpreter for thread {thread_id}: {e}"
)
else:
# Clean up all sessions
thread_ids = list(self._code_interpreters.keys())
for tid in thread_ids:
try:
self._code_interpreters[tid].stop()
except Exception as e:
logger.warning(
f"Error stopping code interpreter for thread {tid}: {e}"
)
self._code_interpreters = {}
logger.info("All code interpreter sessions cleaned up")
def create_code_interpreter_toolkit(
region: str = "us-west-2",
) -> Tuple[CodeInterpreterToolkit, List[BaseTool]]:
"""
Create a CodeInterpreterToolkit
Args:
region: AWS region for code interpreter
Returns:
Tuple of (toolkit, tools)
"""
toolkit = CodeInterpreterToolkit(region=region)
tools = toolkit.get_tools()
return toolkit, tools