diff --git a/src/crewai_tools/aws/__init__.py b/src/crewai_tools/aws/__init__.py index dd01fd8fe..b2d279078 100644 --- a/src/crewai_tools/aws/__init__.py +++ b/src/crewai_tools/aws/__init__.py @@ -1,9 +1,16 @@ from .s3 import S3ReaderTool, S3WriterTool -from .bedrock import BedrockKBRetrieverTool, BedrockInvokeAgentTool +from .bedrock import ( + BedrockKBRetrieverTool, + BedrockInvokeAgentTool, + create_browser_toolkit, + create_code_interpreter_toolkit, +) __all__ = [ - 'S3ReaderTool', - 'S3WriterTool', - 'BedrockKBRetrieverTool', - 'BedrockInvokeAgentTool' -] \ No newline at end of file + "S3ReaderTool", + "S3WriterTool", + "BedrockKBRetrieverTool", + "BedrockInvokeAgentTool", + "create_browser_toolkit", + "create_code_interpreter_toolkit" +] diff --git a/src/crewai_tools/aws/bedrock/__init__.py b/src/crewai_tools/aws/bedrock/__init__.py index ded472062..58fc5bca9 100644 --- a/src/crewai_tools/aws/bedrock/__init__.py +++ b/src/crewai_tools/aws/bedrock/__init__.py @@ -1,4 +1,11 @@ from .knowledge_base.retriever_tool import BedrockKBRetrieverTool from .agents.invoke_agent_tool import BedrockInvokeAgentTool +from .browser import create_browser_toolkit +from .code_interpreter import create_code_interpreter_toolkit -__all__ = ["BedrockKBRetrieverTool", "BedrockInvokeAgentTool"] +__all__ = [ + "BedrockKBRetrieverTool", + "BedrockInvokeAgentTool", + "create_browser_toolkit", + "create_code_interpreter_toolkit" +] diff --git a/src/crewai_tools/aws/bedrock/browser/README.md b/src/crewai_tools/aws/bedrock/browser/README.md new file mode 100644 index 000000000..7f0188bbb --- /dev/null +++ b/src/crewai_tools/aws/bedrock/browser/README.md @@ -0,0 +1,158 @@ +# AWS Bedrock Browser Tools + +This toolkit provides a set of tools for interacting with web browsers through AWS Bedrock Browser. It enables your CrewAI agents to navigate websites, extract content, click elements, and more. + +## Features + +- Navigate to URLs and browse the web +- Extract text and hyperlinks from pages +- Click on elements using CSS selectors +- Navigate back through browser history +- Get information about the current webpage +- Multiple browser sessions with thread-based isolation + +## Installation + +Ensure you have the necessary dependencies: + +```bash +uv add crewai-tools bedrock-agentcore beautifulsoup4 playwright nest-asyncio +``` + +## Usage + +### Basic Usage + +```python +from crewai import Agent, Task, Crew, LLM +from crewai_tools.aws.bedrock.browser import create_browser_toolkit + +# Create the browser toolkit +toolkit, browser_tools = create_browser_toolkit(region="us-west-2") + +# Create the Bedrock LLM +llm = LLM( + model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0", + region_name="us-west-2", +) + +# Create a CrewAI agent that uses the browser tools +research_agent = Agent( + role="Web Researcher", + goal="Research and summarize web content", + backstory="You're an expert at finding information online.", + tools=browser_tools, + llm=llm +) + +# Create a task for the agent +research_task = Task( + description="Navigate to https://example.com and extract all text content. Summarize the main points.", + expected_output="A list of bullet points containing the most important information on https://example.com. 
Plus, a description of the tool calls used, and actions performed to get to the page.", + agent=research_agent +) + +# Create and run the crew +crew = Crew( + agents=[research_agent], + tasks=[research_task] +) +result = crew.kickoff() + +print(f"\n***Final result:***\n\n{result}") + +# Clean up browser resources when done +toolkit.sync_cleanup() +``` + +### Available Tools + +The toolkit provides the following tools: + +1. `navigate_browser` - Navigate to a URL +2. `click_element` - Click on an element using CSS selectors +3. `extract_text` - Extract all text from the current webpage +4. `extract_hyperlinks` - Extract all hyperlinks from the current webpage +5. `get_elements` - Get elements matching a CSS selector +6. `navigate_back` - Navigate to the previous page +7. `current_webpage` - Get information about the current webpage + +### Advanced Usage (with async) + +```python +import asyncio +from crewai import Agent, Task, Crew, LLM +from crewai_tools.aws.bedrock.browser import create_browser_toolkit + +async def main(): + + # Create the browser toolkit with specific AWS region + toolkit, browser_tools = create_browser_toolkit(region="us-west-2") + tools_by_name = toolkit.get_tools_by_name() + + # Create the Bedrock LLM + llm = LLM( + model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0", + region_name="us-west-2", + ) + + # Create agents with specific tools + navigator_agent = Agent( + role="Navigator", + goal="Find specific information across websites", + backstory="You navigate through websites to locate information.", + tools=[ + tools_by_name["navigate_browser"], + tools_by_name["click_element"], + tools_by_name["navigate_back"] + ], + llm=llm + ) + + content_agent = Agent( + role="Content Extractor", + goal="Extract and analyze webpage content", + backstory="You extract and analyze content from webpages.", + tools=[ + tools_by_name["extract_text"], + tools_by_name["extract_hyperlinks"], + tools_by_name["get_elements"] + ], + llm=llm + ) + + # Create tasks for the agents + navigation_task = Task( + description="Navigate to https://example.com, then click on the the 'More information...' 
link.", + expected_output="The status of the tool calls for this task.", + agent=navigator_agent, + ) + + extraction_task = Task( + description="Extract all text from the current page and summarize it.", + expected_output="The summary of the page, and a description of the tool calls used, and actions performed to get to the page.", + agent=content_agent, + ) + + # Create and run the crew + crew = Crew( + agents=[navigator_agent, content_agent], + tasks=[navigation_task, extraction_task] + ) + + result = await crew.kickoff_async() + + # Clean up browser resources when done + toolkit.sync_cleanup() + + return result + +if __name__ == "__main__": + result = asyncio.run(main()) + print(f"\n***Final result:***\n\n{result}") +``` + +## Requirements + +- AWS account with access to Bedrock AgentCore API +- Properly configured AWS credentials \ No newline at end of file diff --git a/src/crewai_tools/aws/bedrock/browser/__init__.py b/src/crewai_tools/aws/bedrock/browser/__init__.py new file mode 100644 index 000000000..e82666ebc --- /dev/null +++ b/src/crewai_tools/aws/bedrock/browser/__init__.py @@ -0,0 +1,3 @@ +from .browser_toolkit import BrowserToolkit, create_browser_toolkit + +__all__ = ["BrowserToolkit", "create_browser_toolkit"] \ No newline at end of file diff --git a/src/crewai_tools/aws/bedrock/browser/browser_session_manager.py b/src/crewai_tools/aws/bedrock/browser/browser_session_manager.py new file mode 100644 index 000000000..d4652c320 --- /dev/null +++ b/src/crewai_tools/aws/bedrock/browser/browser_session_manager.py @@ -0,0 +1,260 @@ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Dict, Tuple + +if TYPE_CHECKING: + from playwright.async_api import Browser as AsyncBrowser + from playwright.sync_api import Browser as SyncBrowser + from bedrock_agentcore.tools.browser_client import BrowserClient + +logger = logging.getLogger(__name__) + + +class BrowserSessionManager: + """ + Manages browser sessions for different threads. + + This class maintains separate browser sessions for different threads, + enabling concurrent usage of browsers in multi-threaded environments. + Browsers are created lazily only when needed by tools. + """ + + def __init__(self, region: str = "us-west-2"): + """ + Initialize the browser session manager. + + Args: + region: AWS region for browser client + """ + self.region = region + self._async_sessions: Dict[str, Tuple[BrowserClient, AsyncBrowser]] = {} + self._sync_sessions: Dict[str, Tuple[BrowserClient, SyncBrowser]] = {} + + async def get_async_browser(self, thread_id: str) -> AsyncBrowser: + """ + Get or create an async browser for the specified thread. + + Args: + thread_id: Unique identifier for the thread requesting the browser + + Returns: + An async browser instance specific to the thread + """ + if thread_id in self._async_sessions: + return self._async_sessions[thread_id][1] + + return await self._create_async_browser_session(thread_id) + + def get_sync_browser(self, thread_id: str) -> SyncBrowser: + """ + Get or create a sync browser for the specified thread. + + Args: + thread_id: Unique identifier for the thread requesting the browser + + Returns: + A sync browser instance specific to the thread + """ + if thread_id in self._sync_sessions: + return self._sync_sessions[thread_id][1] + + return self._create_sync_browser_session(thread_id) + + async def _create_async_browser_session(self, thread_id: str) -> AsyncBrowser: + """ + Create a new async browser session for the specified thread. 
+ + Args: + thread_id: Unique identifier for the thread + + Returns: + The newly created async browser instance + + Raises: + Exception: If browser session creation fails + """ + from bedrock_agentcore.tools.browser_client import BrowserClient + browser_client = BrowserClient(region=self.region) + + try: + # Start browser session + browser_client.start() + + # Get WebSocket connection info + ws_url, headers = browser_client.generate_ws_headers() + + logger.info( + f"Connecting to async WebSocket endpoint for thread {thread_id}: {ws_url}" + ) + + from playwright.async_api import async_playwright + + # Connect to browser using Playwright + playwright = await async_playwright().start() + browser = await playwright.chromium.connect_over_cdp( + endpoint_url=ws_url, headers=headers, timeout=30000 + ) + logger.info( + f"Successfully connected to async browser for thread {thread_id}" + ) + + # Store session resources + self._async_sessions[thread_id] = (browser_client, browser) + + return browser + + except Exception as e: + logger.error( + f"Failed to create async browser session for thread {thread_id}: {e}" + ) + + # Clean up resources if session creation fails + if browser_client: + try: + browser_client.stop() + except Exception as cleanup_error: + logger.warning(f"Error cleaning up browser client: {cleanup_error}") + + raise + + def _create_sync_browser_session(self, thread_id: str) -> SyncBrowser: + """ + Create a new sync browser session for the specified thread. + + Args: + thread_id: Unique identifier for the thread + + Returns: + The newly created sync browser instance + + Raises: + Exception: If browser session creation fails + """ + from bedrock_agentcore.tools.browser_client import BrowserClient + browser_client = BrowserClient(region=self.region) + + try: + # Start browser session + browser_client.start() + + # Get WebSocket connection info + ws_url, headers = browser_client.generate_ws_headers() + + logger.info( + f"Connecting to sync WebSocket endpoint for thread {thread_id}: {ws_url}" + ) + + from playwright.sync_api import sync_playwright + + # Connect to browser using Playwright + playwright = sync_playwright().start() + browser = playwright.chromium.connect_over_cdp( + endpoint_url=ws_url, headers=headers, timeout=30000 + ) + logger.info( + f"Successfully connected to sync browser for thread {thread_id}" + ) + + # Store session resources + self._sync_sessions[thread_id] = (browser_client, browser) + + return browser + + except Exception as e: + logger.error( + f"Failed to create sync browser session for thread {thread_id}: {e}" + ) + + # Clean up resources if session creation fails + if browser_client: + try: + browser_client.stop() + except Exception as cleanup_error: + logger.warning(f"Error cleaning up browser client: {cleanup_error}") + + raise + + async def close_async_browser(self, thread_id: str) -> None: + """ + Close the async browser session for the specified thread. 
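+ Closes the Playwright browser connection, stops the underlying BrowserClient, and removes the session from the internal registry; if no session exists for the thread, a warning is logged and the call is a no-op.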
+ + Args: + thread_id: Unique identifier for the thread + """ + if thread_id not in self._async_sessions: + logger.warning(f"No async browser session found for thread {thread_id}") + return + + browser_client, browser = self._async_sessions[thread_id] + + # Close browser + if browser: + try: + await browser.close() + except Exception as e: + logger.warning( + f"Error closing async browser for thread {thread_id}: {e}" + ) + + # Stop browser client + if browser_client: + try: + browser_client.stop() + except Exception as e: + logger.warning( + f"Error stopping browser client for thread {thread_id}: {e}" + ) + + # Remove session from dictionary + del self._async_sessions[thread_id] + logger.info(f"Async browser session cleaned up for thread {thread_id}") + + def close_sync_browser(self, thread_id: str) -> None: + """ + Close the sync browser session for the specified thread. + + Args: + thread_id: Unique identifier for the thread + """ + if thread_id not in self._sync_sessions: + logger.warning(f"No sync browser session found for thread {thread_id}") + return + + browser_client, browser = self._sync_sessions[thread_id] + + # Close browser + if browser: + try: + browser.close() + except Exception as e: + logger.warning( + f"Error closing sync browser for thread {thread_id}: {e}" + ) + + # Stop browser client + if browser_client: + try: + browser_client.stop() + except Exception as e: + logger.warning( + f"Error stopping browser client for thread {thread_id}: {e}" + ) + + # Remove session from dictionary + del self._sync_sessions[thread_id] + logger.info(f"Sync browser session cleaned up for thread {thread_id}") + + async def close_all_browsers(self) -> None: + """Close all browser sessions.""" + # Close all async browsers + async_thread_ids = list(self._async_sessions.keys()) + for thread_id in async_thread_ids: + await self.close_async_browser(thread_id) + + # Close all sync browsers + sync_thread_ids = list(self._sync_sessions.keys()) + for thread_id in sync_thread_ids: + self.close_sync_browser(thread_id) + + logger.info("All browser sessions closed") \ No newline at end of file diff --git a/src/crewai_tools/aws/bedrock/browser/browser_toolkit.py b/src/crewai_tools/aws/bedrock/browser/browser_toolkit.py new file mode 100644 index 000000000..2939bbb00 --- /dev/null +++ b/src/crewai_tools/aws/bedrock/browser/browser_toolkit.py @@ -0,0 +1,587 @@ +"""Toolkit for navigating web with AWS browser.""" + +import json +import logging +import asyncio +from typing import Dict, List, Tuple, Any, Type +from urllib.parse import urlparse + +from crewai.tools import BaseTool +from pydantic import BaseModel, Field + +from .browser_session_manager import BrowserSessionManager +from .utils import aget_current_page, get_current_page + +logger = logging.getLogger(__name__) + + +# Input schemas +class NavigateToolInput(BaseModel): + """Input for NavigateTool.""" + url: str = Field(description="URL to navigate to") + thread_id: str = Field(default="default", description="Thread ID for the browser session") + + +class ClickToolInput(BaseModel): + """Input for ClickTool.""" + selector: str = Field(description="CSS selector for the element to click on") + thread_id: str = Field(default="default", description="Thread ID for the browser session") + + +class GetElementsToolInput(BaseModel): + """Input for GetElementsTool.""" + selector: str = Field(description="CSS selector for elements to get") + thread_id: str = Field(default="default", description="Thread ID for the browser session") + + +class 
ExtractTextToolInput(BaseModel): + """Input for ExtractTextTool.""" + thread_id: str = Field(default="default", description="Thread ID for the browser session") + + +class ExtractHyperlinksToolInput(BaseModel): + """Input for ExtractHyperlinksTool.""" + thread_id: str = Field(default="default", description="Thread ID for the browser session") + + +class NavigateBackToolInput(BaseModel): + """Input for NavigateBackTool.""" + thread_id: str = Field(default="default", description="Thread ID for the browser session") + + +class CurrentWebPageToolInput(BaseModel): + """Input for CurrentWebPageTool.""" + thread_id: str = Field(default="default", description="Thread ID for the browser session") + + +# Base tool class +class BrowserBaseTool(BaseTool): + """Base class for browser tools.""" + + def __init__(self, session_manager: BrowserSessionManager): + """Initialize with a session manager.""" + super().__init__() + self._session_manager = session_manager + + if self._is_in_asyncio_loop() and hasattr(self, '_arun'): + self._original_run = self._run + # Override _run to use _arun when in an asyncio loop + def patched_run(*args, **kwargs): + try: + import nest_asyncio + loop = asyncio.get_event_loop() + nest_asyncio.apply(loop) + return asyncio.get_event_loop().run_until_complete( + self._arun(*args, **kwargs) + ) + except Exception as e: + return f"Error in patched _run: {str(e)}" + self._run = patched_run + + async def get_async_page(self, thread_id: str) -> Any: + """Get or create a page for the specified thread.""" + browser = await self._session_manager.get_async_browser(thread_id) + page = await aget_current_page(browser) + return page + + def get_sync_page(self, thread_id: str) -> Any: + """Get or create a page for the specified thread.""" + browser = self._session_manager.get_sync_browser(thread_id) + page = get_current_page(browser) + return page + + def _is_in_asyncio_loop(self) -> bool: + """Check if we're currently in an asyncio event loop.""" + try: + loop = asyncio.get_event_loop() + return loop.is_running() + except RuntimeError: + return False + + +# Tool classes +class NavigateTool(BrowserBaseTool): + """Tool for navigating a browser to a URL.""" + + name: str = "navigate_browser" + description: str = "Navigate a browser to the specified URL" + args_schema: Type[BaseModel] = NavigateToolInput + + def _run(self, url: str, thread_id: str = "default", **kwargs) -> str: + """Use the sync tool.""" + try: + # Get page for this thread + page = self.get_sync_page(thread_id) + + # Validate URL scheme + parsed_url = urlparse(url) + if parsed_url.scheme not in ("http", "https"): + raise ValueError("URL scheme must be 'http' or 'https'") + + # Navigate to URL + response = page.goto(url) + status = response.status if response else "unknown" + return f"Navigating to {url} returned status code {status}" + except Exception as e: + return f"Error navigating to {url}: {str(e)}" + + async def _arun(self, url: str, thread_id: str = "default", **kwargs) -> str: + """Use the async tool.""" + try: + # Get page for this thread + page = await self.get_async_page(thread_id) + + # Validate URL scheme + parsed_url = urlparse(url) + if parsed_url.scheme not in ("http", "https"): + raise ValueError("URL scheme must be 'http' or 'https'") + + # Navigate to URL + response = await page.goto(url) + status = response.status if response else "unknown" + return f"Navigating to {url} returned status code {status}" + except Exception as e: + return f"Error navigating to {url}: {str(e)}" + + +class 
ClickTool(BrowserBaseTool): + """Tool for clicking on an element with the given CSS selector.""" + + name: str = "click_element" + description: str = "Click on an element with the given CSS selector" + args_schema: Type[BaseModel] = ClickToolInput + + visible_only: bool = True + """Whether to consider only visible elements.""" + playwright_strict: bool = False + """Whether to employ Playwright's strict mode when clicking on elements.""" + playwright_timeout: float = 1_000 + """Timeout (in ms) for Playwright to wait for element to be ready.""" + + def _selector_effective(self, selector: str) -> str: + if not self.visible_only: + return selector + return f"{selector} >> visible=1" + + def _run(self, selector: str, thread_id: str = "default", **kwargs) -> str: + """Use the sync tool.""" + try: + # Get the current page + page = self.get_sync_page(thread_id) + + # Click on the element + selector_effective = self._selector_effective(selector=selector) + from playwright.sync_api import TimeoutError as PlaywrightTimeoutError + + try: + page.click( + selector_effective, + strict=self.playwright_strict, + timeout=self.playwright_timeout, + ) + except PlaywrightTimeoutError: + return f"Unable to click on element '{selector}'" + except Exception as click_error: + return f"Unable to click on element '{selector}': {str(click_error)}" + + return f"Clicked element '{selector}'" + except Exception as e: + return f"Error clicking on element: {str(e)}" + + async def _arun(self, selector: str, thread_id: str = "default", **kwargs) -> str: + """Use the async tool.""" + try: + # Get the current page + page = await self.get_async_page(thread_id) + + # Click on the element + selector_effective = self._selector_effective(selector=selector) + from playwright.async_api import TimeoutError as PlaywrightTimeoutError + + try: + await page.click( + selector_effective, + strict=self.playwright_strict, + timeout=self.playwright_timeout, + ) + except PlaywrightTimeoutError: + return f"Unable to click on element '{selector}'" + except Exception as click_error: + return f"Unable to click on element '{selector}': {str(click_error)}" + + return f"Clicked element '{selector}'" + except Exception as e: + return f"Error clicking on element: {str(e)}" + + +class NavigateBackTool(BrowserBaseTool): + """Tool for navigating back in browser history.""" + name: str = "navigate_back" + description: str = "Navigate back to the previous page" + args_schema: Type[BaseModel] = NavigateBackToolInput + + def _run(self, thread_id: str = "default", **kwargs) -> str: + """Use the sync tool.""" + try: + # Get the current page + page = self.get_sync_page(thread_id) + + # Navigate back + try: + page.go_back() + return "Navigated back to the previous page" + except Exception as nav_error: + return f"Unable to navigate back: {str(nav_error)}" + except Exception as e: + return f"Error navigating back: {str(e)}" + + async def _arun(self, thread_id: str = "default", **kwargs) -> str: + """Use the async tool.""" + try: + # Get the current page + page = await self.get_async_page(thread_id) + + # Navigate back + try: + await page.go_back() + return "Navigated back to the previous page" + except Exception as nav_error: + return f"Unable to navigate back: {str(nav_error)}" + except Exception as e: + return f"Error navigating back: {str(e)}" + + +class ExtractTextTool(BrowserBaseTool): + """Tool for extracting text from a webpage.""" + name: str = "extract_text" + description: str = "Extract all the text on the current webpage" + args_schema: Type[BaseModel] 
= ExtractTextToolInput + + def _run(self, thread_id: str = "default", **kwargs) -> str: + """Use the sync tool.""" + try: + # Import BeautifulSoup + try: + from bs4 import BeautifulSoup + except ImportError: + return ( + "The 'beautifulsoup4' package is required to use this tool." + " Please install it with 'pip install beautifulsoup4'." + ) + + # Get the current page + page = self.get_sync_page(thread_id) + + # Extract text + content = page.content() + soup = BeautifulSoup(content, "html.parser") + return soup.get_text(separator="\n").strip() + except Exception as e: + return f"Error extracting text: {str(e)}" + + async def _arun(self, thread_id: str = "default", **kwargs) -> str: + """Use the async tool.""" + try: + # Import BeautifulSoup + try: + from bs4 import BeautifulSoup + except ImportError: + return ( + "The 'beautifulsoup4' package is required to use this tool." + " Please install it with 'pip install beautifulsoup4'." + ) + + # Get the current page + page = await self.get_async_page(thread_id) + + # Extract text + content = await page.content() + soup = BeautifulSoup(content, "html.parser") + return soup.get_text(separator="\n").strip() + except Exception as e: + return f"Error extracting text: {str(e)}" + + +class ExtractHyperlinksTool(BrowserBaseTool): + """Tool for extracting hyperlinks from a webpage.""" + name: str = "extract_hyperlinks" + description: str = "Extract all hyperlinks on the current webpage" + args_schema: Type[BaseModel] = ExtractHyperlinksToolInput + + def _run(self, thread_id: str = "default", **kwargs) -> str: + """Use the sync tool.""" + try: + # Import BeautifulSoup + try: + from bs4 import BeautifulSoup + except ImportError: + return ( + "The 'beautifulsoup4' package is required to use this tool." + " Please install it with 'pip install beautifulsoup4'." + ) + + # Get the current page + page = self.get_sync_page(thread_id) + + # Extract hyperlinks + content = page.content() + soup = BeautifulSoup(content, "html.parser") + links = [] + for link in soup.find_all("a", href=True): + text = link.get_text().strip() + href = link["href"] + if href.startswith("http") or href.startswith("https"): + links.append({"text": text, "url": href}) + + if not links: + return "No hyperlinks found on the current page." + + return json.dumps(links, indent=2) + except Exception as e: + return f"Error extracting hyperlinks: {str(e)}" + + async def _arun(self, thread_id: str = "default", **kwargs) -> str: + """Use the async tool.""" + try: + # Import BeautifulSoup + try: + from bs4 import BeautifulSoup + except ImportError: + return ( + "The 'beautifulsoup4' package is required to use this tool." + " Please install it with 'pip install beautifulsoup4'." + ) + + # Get the current page + page = await self.get_async_page(thread_id) + + # Extract hyperlinks + content = await page.content() + soup = BeautifulSoup(content, "html.parser") + links = [] + for link in soup.find_all("a", href=True): + text = link.get_text().strip() + href = link["href"] + if href.startswith("http") or href.startswith("https"): + links.append({"text": text, "url": href}) + + if not links: + return "No hyperlinks found on the current page." 
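+ # Serialize the collected links as a JSON array of {text, url} objects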
+ + return json.dumps(links, indent=2) + except Exception as e: + return f"Error extracting hyperlinks: {str(e)}" + + +class GetElementsTool(BrowserBaseTool): + """Tool for getting elements from a webpage.""" + name: str = "get_elements" + description: str = "Get elements from the webpage using a CSS selector" + args_schema: Type[BaseModel] = GetElementsToolInput + + def _run(self, selector: str, thread_id: str = "default", **kwargs) -> str: + """Use the sync tool.""" + try: + # Get the current page + page = self.get_sync_page(thread_id) + + # Get elements + elements = page.query_selector_all(selector) + if not elements: + return f"No elements found with selector '{selector}'" + + elements_text = [] + for i, element in enumerate(elements): + text = element.text_content() + elements_text.append(f"Element {i+1}: {text.strip()}") + + return "\n".join(elements_text) + except Exception as e: + return f"Error getting elements: {str(e)}" + + async def _arun(self, selector: str, thread_id: str = "default", **kwargs) -> str: + """Use the async tool.""" + try: + # Get the current page + page = await self.get_async_page(thread_id) + + # Get elements + elements = await page.query_selector_all(selector) + if not elements: + return f"No elements found with selector '{selector}'" + + elements_text = [] + for i, element in enumerate(elements): + text = await element.text_content() + elements_text.append(f"Element {i+1}: {text.strip()}") + + return "\n".join(elements_text) + except Exception as e: + return f"Error getting elements: {str(e)}" + + +class CurrentWebPageTool(BrowserBaseTool): + """Tool for getting information about the current webpage.""" + name: str = "current_webpage" + description: str = "Get information about the current webpage" + args_schema: Type[BaseModel] = CurrentWebPageToolInput + + def _run(self, thread_id: str = "default", **kwargs) -> str: + """Use the sync tool.""" + try: + # Get the current page + page = self.get_sync_page(thread_id) + + # Get information + url = page.url + title = page.title() + return f"URL: {url}\nTitle: {title}" + except Exception as e: + return f"Error getting current webpage info: {str(e)}" + + async def _arun(self, thread_id: str = "default", **kwargs) -> str: + """Use the async tool.""" + try: + # Get the current page + page = await self.get_async_page(thread_id) + + # Get information + url = page.url + title = await page.title() + return f"URL: {url}\nTitle: {title}" + except Exception as e: + return f"Error getting current webpage info: {str(e)}" + + +class BrowserToolkit: + """Toolkit for navigating web with AWS Bedrock browser. + + This toolkit provides a set of tools for working with a remote browser + and supports multiple threads by maintaining separate browser sessions + for each thread ID. Browsers are created lazily only when needed. + + Example: + ```python + from crewai import Agent, Task, Crew + from crewai_tools.aws.bedrock.browser import create_browser_toolkit + + # Create the browser toolkit + toolkit, browser_tools = create_browser_toolkit(region="us-west-2") + + # Create a CrewAI agent that uses the browser tools + research_agent = Agent( + role="Web Researcher", + goal="Research and summarize web content", + backstory="You're an expert at finding information online.", + tools=browser_tools + ) + + # Create a task for the agent + research_task = Task( + description="Navigate to https://example.com and extract all text content. 
Summarize the main points.", + agent=research_agent + ) + + # Create and run the crew + crew = Crew( + agents=[research_agent], + tasks=[research_task] + ) + result = crew.kickoff() + + # Clean up browser resources when done + import asyncio + asyncio.run(toolkit.cleanup()) + ``` + """ + + def __init__(self, region: str = "us-west-2"): + """ + Initialize the toolkit + + Args: + region: AWS region for the browser client + """ + self.region = region + self.session_manager = BrowserSessionManager(region=region) + self.tools: List[BaseTool] = [] + self._nest_current_loop() + self._setup_tools() + + def _nest_current_loop(self): + """Apply nest_asyncio if we're in an asyncio loop.""" + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + try: + import nest_asyncio + nest_asyncio.apply(loop) + except Exception as e: + logger.warning(f"Failed to apply nest_asyncio: {str(e)}") + except RuntimeError: + pass + + def _setup_tools(self) -> None: + """Initialize tools without creating any browsers.""" + self.tools = [ + NavigateTool(session_manager=self.session_manager), + ClickTool(session_manager=self.session_manager), + NavigateBackTool(session_manager=self.session_manager), + ExtractTextTool(session_manager=self.session_manager), + ExtractHyperlinksTool(session_manager=self.session_manager), + GetElementsTool(session_manager=self.session_manager), + CurrentWebPageTool(session_manager=self.session_manager) + ] + + def get_tools(self) -> List[BaseTool]: + """ + Get the list of browser tools + + Returns: + List of CrewAI tools + """ + return self.tools + + def get_tools_by_name(self) -> Dict[str, BaseTool]: + """ + Get a dictionary of tools mapped by their names + + Returns: + Dictionary of {tool_name: tool} + """ + return {tool.name: tool for tool in self.tools} + + async def cleanup(self) -> None: + """Clean up all browser sessions asynchronously""" + await self.session_manager.close_all_browsers() + logger.info("All browser sessions cleaned up") + + def sync_cleanup(self) -> None: + """Clean up all browser sessions from synchronous code""" + import asyncio + + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.create_task(self.cleanup()) + else: + loop.run_until_complete(self.cleanup()) + except RuntimeError: + asyncio.run(self.cleanup()) + + +def create_browser_toolkit( + region: str = "us-west-2", +) -> Tuple[BrowserToolkit, List[BaseTool]]: + """ + Create a BrowserToolkit + + Args: + region: AWS region for browser client + + Returns: + Tuple of (toolkit, tools) + """ + toolkit = BrowserToolkit(region=region) + tools = toolkit.get_tools() + return toolkit, tools diff --git a/src/crewai_tools/aws/bedrock/browser/utils.py b/src/crewai_tools/aws/bedrock/browser/utils.py new file mode 100644 index 000000000..6e8b48e3a --- /dev/null +++ b/src/crewai_tools/aws/bedrock/browser/utils.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Union + +if TYPE_CHECKING: + from playwright.async_api import Browser as AsyncBrowser + from playwright.async_api import Page as AsyncPage + from playwright.sync_api import Browser as SyncBrowser + from playwright.sync_api import Page as SyncPage + + +async def aget_current_page(browser: Union[AsyncBrowser, Any]) -> AsyncPage: + """ + Asynchronously get the current page of the browser. + Args: + browser: The browser (AsyncBrowser) to get the current page from. + Returns: + AsyncPage: The current page. 
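+ If the browser has no contexts, a new context and page are created; otherwise the most recently opened page of the first context is returned (creating one if that context has no pages).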
+ """ + if not browser.contexts: + context = await browser.new_context() + return await context.new_page() + context = browser.contexts[0] + if not context.pages: + return await context.new_page() + return context.pages[-1] + + +def get_current_page(browser: Union[SyncBrowser, Any]) -> SyncPage: + """ + Get the current page of the browser. + Args: + browser: The browser to get the current page from. + Returns: + SyncPage: The current page. + """ + if not browser.contexts: + context = browser.new_context() + return context.new_page() + context = browser.contexts[0] + if not context.pages: + return context.new_page() + return context.pages[-1] \ No newline at end of file diff --git a/src/crewai_tools/aws/bedrock/code_interpreter/README.md b/src/crewai_tools/aws/bedrock/code_interpreter/README.md new file mode 100644 index 000000000..92e8ec5b2 --- /dev/null +++ b/src/crewai_tools/aws/bedrock/code_interpreter/README.md @@ -0,0 +1,217 @@ +# AWS Bedrock Code Interpreter Tools + +This toolkit provides a set of tools for interacting with the AWS Bedrock Code Interpreter environment. It enables your CrewAI agents to execute code, run shell commands, manage files, and perform computational tasks in a secure, isolated environment. + +## Features + +- Execute code in various languages (primarily Python) +- Run shell commands in the environment +- Read, write, list, and delete files +- Manage long-running tasks asynchronously +- Multiple code interpreter sessions with thread-based isolation + +## Installation + +Ensure you have the necessary dependencies: + +```bash +uv add crewai-tools bedrock-agentcore +``` + +## Usage + +### Basic Usage + +```python +from crewai import Agent, Task, Crew, LLM +from crewai_tools.aws import create_code_interpreter_toolkit + +# Create the code interpreter toolkit +toolkit, code_tools = create_code_interpreter_toolkit(region="us-west-2") + +# Create the Bedrock LLM +llm = LLM( + model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0", + region_name="us-west-2", +) + +# Create a CrewAI agent that uses the code interpreter tools +developer_agent = Agent( + role="Python Developer", + goal="Create and execute Python code to solve problems.", + backstory="You're a skilled Python developer with expertise in data analysis.", + tools=code_tools, + llm=llm +) + +# Create a task for the agent +coding_task = Task( + description="Write a Python function that calculates the factorial of a number and test it. Do not use any imports from outside the Python standard library.", + expected_output="The Python function created, and the test results.", + agent=developer_agent +) + +# Create and run the crew +crew = Crew( + agents=[developer_agent], + tasks=[coding_task] +) +result = crew.kickoff() + +print(f"\n***Final result:***\n\n{result}") + +# Clean up resources when done +import asyncio +asyncio.run(toolkit.cleanup()) +``` + +### Available Tools + +The toolkit provides the following tools: + +1. `execute_code` - Run code in various languages (primarily Python) +2. `execute_command` - Run shell commands in the environment +3. `read_files` - Read content of files in the environment +4. `list_files` - List files in directories +5. `delete_files` - Remove files from the environment +6. `write_files` - Create or update files +7. `start_command_execution` - Start long-running commands asynchronously +8. `get_task` - Check status of async tasks +9. 
`stop_task` - Stop running tasks + +### Advanced Usage + +```python +from crewai import Agent, Task, Crew, LLM +from crewai_tools.aws import create_code_interpreter_toolkit + +# Create the code interpreter toolkit +toolkit, code_tools = create_code_interpreter_toolkit(region="us-west-2") +tools_by_name = toolkit.get_tools_by_name() + +# Create the Bedrock LLM +llm = LLM( + model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0", + region_name="us-west-2", +) + +# Create agents with specific tools +code_agent = Agent( + role="Code Developer", + goal="Write and execute code", + backstory="You write and test code to solve complex problems.", + tools=[ + # Use specific tools by name + tools_by_name["execute_code"], + tools_by_name["execute_command"], + tools_by_name["read_files"], + tools_by_name["write_files"] + ], + llm=llm +) + +file_agent = Agent( + role="File Manager", + goal="Manage files in the environment", + backstory="You help organize and manage files in the code environment.", + tools=[ + # Use specific tools by name + tools_by_name["list_files"], + tools_by_name["read_files"], + tools_by_name["write_files"], + tools_by_name["delete_files"] + ], + llm=llm +) + +# Create tasks for the agents +coding_task = Task( + description="Write a Python script to analyze data from a CSV file. Do not use any imports from outside the Python standard library.", + expected_output="The Python function created.", + agent=code_agent +) + +file_task = Task( + description="Organize the created files into separate directories.", + agent=file_agent +) + +# Create and run the crew +crew = Crew( + agents=[code_agent, file_agent], + tasks=[coding_task, file_task] +) +result = crew.kickoff() + +print(f"\n***Final result:***\n\n{result}") + +# Clean up code interpreter resources when done +import asyncio +asyncio.run(toolkit.cleanup()) +``` + +### Example: Data Analysis with Python + +```python +from crewai import Agent, Task, Crew, LLM +from crewai_tools.aws import create_code_interpreter_toolkit + +# Create toolkit and tools +toolkit, code_tools = create_code_interpreter_toolkit(region="us-west-2") + +# Create the Bedrock LLM +llm = LLM( + model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0", + region_name="us-west-2", +) + +# Create a data analyst agent +analyst_agent = Agent( + role="Data Analyst", + goal="Analyze data using Python", + backstory="You're an expert data analyst who uses Python for data processing.", + tools=code_tools, + llm=llm +) + +# Create a task for the agent +analysis_task = Task( + description=""" + For all of the below, do not use any imports from outside the Python standard library. + 1. Create a sample dataset with random data + 2. Perform statistical analysis on the dataset + 3. Generate visualizations of the results + 4. 
Save the results and visualizations to files + """, + agent=analyst_agent +) + +# Create and run the crew +crew = Crew( + agents=[analyst_agent], + tasks=[analysis_task] +) +result = crew.kickoff() + +print(f"\n***Final result:***\n\n{result}") + +# Clean up resources +import asyncio +asyncio.run(toolkit.cleanup()) +``` + +## Resource Cleanup + +Always clean up code interpreter resources when done to prevent resource leaks: + +```python +import asyncio + +# Clean up all code interpreter sessions +asyncio.run(toolkit.cleanup()) +``` + +## Requirements + +- AWS account with access to Bedrock AgentCore API +- Properly configured AWS credentials \ No newline at end of file diff --git a/src/crewai_tools/aws/bedrock/code_interpreter/__init__.py b/src/crewai_tools/aws/bedrock/code_interpreter/__init__.py new file mode 100644 index 000000000..903c84e24 --- /dev/null +++ b/src/crewai_tools/aws/bedrock/code_interpreter/__init__.py @@ -0,0 +1,3 @@ +from .code_interpreter_toolkit import CodeInterpreterToolkit, create_code_interpreter_toolkit + +__all__ = ["CodeInterpreterToolkit", "create_code_interpreter_toolkit"] \ No newline at end of file diff --git a/src/crewai_tools/aws/bedrock/code_interpreter/code_interpreter_toolkit.py b/src/crewai_tools/aws/bedrock/code_interpreter/code_interpreter_toolkit.py new file mode 100644 index 000000000..4e697cafe --- /dev/null +++ b/src/crewai_tools/aws/bedrock/code_interpreter/code_interpreter_toolkit.py @@ -0,0 +1,543 @@ +"""Toolkit for working with AWS Bedrock Code Interpreter.""" +from __future__ import annotations + +import json +import logging +from typing import TYPE_CHECKING, Dict, List, Tuple, Optional, Type, Any + +from crewai.tools import BaseTool +from pydantic import BaseModel, Field + +if TYPE_CHECKING: + from bedrock_agentcore.tools.code_interpreter_client import CodeInterpreter + +logger = logging.getLogger(__name__) + + +def extract_output_from_stream(response): + """ + Extract output from code interpreter response stream + + Args: + response: Response from code interpreter execution + + Returns: + Extracted output as string + """ + output = [] + for event in response["stream"]: + if "result" in event: + result = event["result"] + for content_item in result["content"]: + if content_item["type"] == "text": + output.append(content_item["text"]) + if content_item["type"] == "resource": + resource = content_item["resource"] + if "text" in resource: + file_path = resource["uri"].replace("file://", "") + file_content = resource["text"] + output.append(f"==== File: {file_path} ====\n{file_content}\n") + else: + output.append(json.dumps(resource)) + + return "\n".join(output) + + +# Input schemas +class ExecuteCodeInput(BaseModel): + """Input for ExecuteCode.""" + code: str = Field(description="The code to execute") + language: str = Field(default="python", description="The programming language of the code") + clear_context: bool = Field(default=False, description="Whether to clear execution context") + thread_id: str = Field(default="default", description="Thread ID for the code interpreter session") + + +class ExecuteCommandInput(BaseModel): + """Input for ExecuteCommand.""" + command: str = Field(description="The command to execute") + thread_id: str = Field(default="default", description="Thread ID for the code interpreter session") + + +class ReadFilesInput(BaseModel): + """Input for ReadFiles.""" + paths: List[str] = Field(description="List of file paths to read") + thread_id: str = Field(default="default", description="Thread ID for the code 
interpreter session") + + +class ListFilesInput(BaseModel): + """Input for ListFiles.""" + directory_path: str = Field(default="", description="Path to the directory to list") + thread_id: str = Field(default="default", description="Thread ID for the code interpreter session") + + +class DeleteFilesInput(BaseModel): + """Input for DeleteFiles.""" + paths: List[str] = Field(description="List of file paths to delete") + thread_id: str = Field(default="default", description="Thread ID for the code interpreter session") + + +class WriteFilesInput(BaseModel): + """Input for WriteFiles.""" + files: List[Dict[str, str]] = Field(description="List of dictionaries with path and text fields") + thread_id: str = Field(default="default", description="Thread ID for the code interpreter session") + + +class StartCommandInput(BaseModel): + """Input for StartCommand.""" + command: str = Field(description="The command to execute asynchronously") + thread_id: str = Field(default="default", description="Thread ID for the code interpreter session") + + +class GetTaskInput(BaseModel): + """Input for GetTask.""" + task_id: str = Field(description="The ID of the task to check") + thread_id: str = Field(default="default", description="Thread ID for the code interpreter session") + + +class StopTaskInput(BaseModel): + """Input for StopTask.""" + task_id: str = Field(description="The ID of the task to stop") + thread_id: str = Field(default="default", description="Thread ID for the code interpreter session") + + +# Tool classes +class ExecuteCodeTool(BaseTool): + """Tool for executing code in various languages.""" + name: str = "execute_code" + description: str = "Execute code in various languages (primarily Python)" + args_schema: Type[BaseModel] = ExecuteCodeInput + toolkit: Any = Field(default=None, exclude=True) + + def __init__(self, toolkit): + super().__init__() + self.toolkit = toolkit + + def _run(self, code: str, language: str = "python", clear_context: bool = False, thread_id: str = "default") -> str: + try: + # Get or create code interpreter + code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id) + + # Execute code + response = code_interpreter.invoke( + method="executeCode", + params={"code": code, "language": language, "clearContext": clear_context}, + ) + + return extract_output_from_stream(response) + except Exception as e: + return f"Error executing code: {str(e)}" + + async def _arun(self, code: str, language: str = "python", clear_context: bool = False, thread_id: str = "default") -> str: + # Use _run as we're working with a synchronous API that's thread-safe + return self._run(code=code, language=language, clear_context=clear_context, thread_id=thread_id) + + +class ExecuteCommandTool(BaseTool): + """Tool for running shell commands in the code interpreter environment.""" + name: str = "execute_command" + description: str = "Run shell commands in the code interpreter environment" + args_schema: Type[BaseModel] = ExecuteCommandInput + toolkit: Any = Field(default=None, exclude=True) + + def __init__(self, toolkit): + super().__init__() + self.toolkit = toolkit + + def _run(self, command: str, thread_id: str = "default") -> str: + try: + # Get or create code interpreter + code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id) + + # Execute command + response = code_interpreter.invoke( + method="executeCommand", params={"command": command} + ) + + return extract_output_from_stream(response) + except Exception as e: + return f"Error executing command: 
{str(e)}" + + async def _arun(self, command: str, thread_id: str = "default") -> str: + # Use _run as we're working with a synchronous API that's thread-safe + return self._run(command=command, thread_id=thread_id) + + +class ReadFilesTool(BaseTool): + """Tool for reading content of files in the environment.""" + name: str = "read_files" + description: str = "Read content of files in the environment" + args_schema: Type[BaseModel] = ReadFilesInput + toolkit: Any = Field(default=None, exclude=True) + + def __init__(self, toolkit): + super().__init__() + self.toolkit = toolkit + + def _run(self, paths: List[str], thread_id: str = "default") -> str: + try: + # Get or create code interpreter + code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id) + + # Read files + response = code_interpreter.invoke(method="readFiles", params={"paths": paths}) + + return extract_output_from_stream(response) + except Exception as e: + return f"Error reading files: {str(e)}" + + async def _arun(self, paths: List[str], thread_id: str = "default") -> str: + # Use _run as we're working with a synchronous API that's thread-safe + return self._run(paths=paths, thread_id=thread_id) + + +class ListFilesTool(BaseTool): + """Tool for listing files in directories in the environment.""" + name: str = "list_files" + description: str = "List files in directories in the environment" + args_schema: Type[BaseModel] = ListFilesInput + toolkit: Any = Field(default=None, exclude=True) + + def __init__(self, toolkit): + super().__init__() + self.toolkit = toolkit + + def _run(self, directory_path: str = "", thread_id: str = "default") -> str: + try: + # Get or create code interpreter + code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id) + + # List files + response = code_interpreter.invoke( + method="listFiles", params={"directoryPath": directory_path} + ) + + return extract_output_from_stream(response) + except Exception as e: + return f"Error listing files: {str(e)}" + + async def _arun(self, directory_path: str = "", thread_id: str = "default") -> str: + # Use _run as we're working with a synchronous API that's thread-safe + return self._run(directory_path=directory_path, thread_id=thread_id) + + +class DeleteFilesTool(BaseTool): + """Tool for removing files from the environment.""" + name: str = "delete_files" + description: str = "Remove files from the environment" + args_schema: Type[BaseModel] = DeleteFilesInput + toolkit: Any = Field(default=None, exclude=True) + + def __init__(self, toolkit): + super().__init__() + self.toolkit = toolkit + + def _run(self, paths: List[str], thread_id: str = "default") -> str: + try: + # Get or create code interpreter + code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id) + + # Remove files + response = code_interpreter.invoke( + method="removeFiles", params={"paths": paths} + ) + + return extract_output_from_stream(response) + except Exception as e: + return f"Error deleting files: {str(e)}" + + async def _arun(self, paths: List[str], thread_id: str = "default") -> str: + # Use _run as we're working with a synchronous API that's thread-safe + return self._run(paths=paths, thread_id=thread_id) + + +class WriteFilesTool(BaseTool): + """Tool for creating or updating files in the environment.""" + name: str = "write_files" + description: str = "Create or update files in the environment" + args_schema: Type[BaseModel] = WriteFilesInput + toolkit: Any = Field(default=None, exclude=True) + + def __init__(self, 
toolkit): + super().__init__() + self.toolkit = toolkit + + def _run(self, files: List[Dict[str, str]], thread_id: str = "default") -> str: + try: + # Get or create code interpreter + code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id) + + # Write files + response = code_interpreter.invoke( + method="writeFiles", params={"content": files} + ) + + return extract_output_from_stream(response) + except Exception as e: + return f"Error writing files: {str(e)}" + + async def _arun(self, files: List[Dict[str, str]], thread_id: str = "default") -> str: + # Use _run as we're working with a synchronous API that's thread-safe + return self._run(files=files, thread_id=thread_id) + + +class StartCommandTool(BaseTool): + """Tool for starting long-running commands asynchronously.""" + name: str = "start_command_execution" + description: str = "Start long-running commands asynchronously" + args_schema: Type[BaseModel] = StartCommandInput + toolkit: Any = Field(default=None, exclude=True) + + def __init__(self, toolkit): + super().__init__() + self.toolkit = toolkit + + def _run(self, command: str, thread_id: str = "default") -> str: + try: + # Get or create code interpreter + code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id) + + # Start command execution + response = code_interpreter.invoke( + method="startCommandExecution", params={"command": command} + ) + + return extract_output_from_stream(response) + except Exception as e: + return f"Error starting command: {str(e)}" + + async def _arun(self, command: str, thread_id: str = "default") -> str: + # Use _run as we're working with a synchronous API that's thread-safe + return self._run(command=command, thread_id=thread_id) + + +class GetTaskTool(BaseTool): + """Tool for checking status of async tasks.""" + name: str = "get_task" + description: str = "Check status of async tasks" + args_schema: Type[BaseModel] = GetTaskInput + toolkit: Any = Field(default=None, exclude=True) + + def __init__(self, toolkit): + super().__init__() + self.toolkit = toolkit + + def _run(self, task_id: str, thread_id: str = "default") -> str: + try: + # Get or create code interpreter + code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id) + + # Get task status + response = code_interpreter.invoke(method="getTask", params={"taskId": task_id}) + + return extract_output_from_stream(response) + except Exception as e: + return f"Error getting task status: {str(e)}" + + async def _arun(self, task_id: str, thread_id: str = "default") -> str: + # Use _run as we're working with a synchronous API that's thread-safe + return self._run(task_id=task_id, thread_id=thread_id) + + +class StopTaskTool(BaseTool): + """Tool for stopping running tasks.""" + name: str = "stop_task" + description: str = "Stop running tasks" + args_schema: Type[BaseModel] = StopTaskInput + toolkit: Any = Field(default=None, exclude=True) + + def __init__(self, toolkit): + super().__init__() + self.toolkit = toolkit + + def _run(self, task_id: str, thread_id: str = "default") -> str: + try: + # Get or create code interpreter + code_interpreter = self.toolkit._get_or_create_interpreter(thread_id=thread_id) + + # Stop task + response = code_interpreter.invoke( + method="stopTask", params={"taskId": task_id} + ) + + return extract_output_from_stream(response) + except Exception as e: + return f"Error stopping task: {str(e)}" + + async def _arun(self, task_id: str, thread_id: str = "default") -> str: + # Use _run as we're working with a 
synchronous API that's thread-safe + return self._run(task_id=task_id, thread_id=thread_id) + + +class CodeInterpreterToolkit: + """Toolkit for working with AWS Bedrock code interpreter environment. + + This toolkit provides a set of tools for working with a remote code interpreter environment: + + * execute_code - Run code in various languages (primarily Python) + * execute_command - Run shell commands + * read_files - Read content of files in the environment + * list_files - List files in directories + * delete_files - Remove files from the environment + * write_files - Create or update files + * start_command_execution - Start long-running commands asynchronously + * get_task - Check status of async tasks + * stop_task - Stop running tasks + + The toolkit lazily initializes the code interpreter session on first use. + It supports multiple threads by maintaining separate code interpreter sessions for each thread ID. + + Example: + ```python + from crewai import Agent, Task, Crew + from crewai_tools.aws.bedrock.code_interpreter import create_code_interpreter_toolkit + + # Create the code interpreter toolkit + toolkit, code_tools = create_code_interpreter_toolkit(region="us-west-2") + + # Create a CrewAI agent that uses the code interpreter tools + developer_agent = Agent( + role="Python Developer", + goal="Create and execute Python code to solve problems", + backstory="You're a skilled Python developer with expertise in data analysis.", + tools=code_tools + ) + + # Create a task for the agent + coding_task = Task( + description="Write a Python function that calculates the factorial of a number and test it.", + agent=developer_agent + ) + + # Create and run the crew + crew = Crew( + agents=[developer_agent], + tasks=[coding_task] + ) + result = crew.kickoff() + + # Clean up resources when done + import asyncio + asyncio.run(toolkit.cleanup()) + ``` + """ + + def __init__(self, region: str = "us-west-2"): + """ + Initialize the toolkit + + Args: + region: AWS region for the code interpreter + """ + self.region = region + self._code_interpreters: Dict[str, CodeInterpreter] = {} + self.tools: List[BaseTool] = [] + self._setup_tools() + + def _setup_tools(self) -> None: + """Initialize tools without creating any code interpreter sessions.""" + self.tools = [ + ExecuteCodeTool(self), + ExecuteCommandTool(self), + ReadFilesTool(self), + ListFilesTool(self), + DeleteFilesTool(self), + WriteFilesTool(self), + StartCommandTool(self), + GetTaskTool(self), + StopTaskTool(self) + ] + + def _get_or_create_interpreter( + self, thread_id: str = "default" + ) -> CodeInterpreter: + """Get or create a code interpreter for the specified thread. 
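+ Sessions are created lazily: an interpreter already registered for the thread ID is reused; otherwise a new CodeInterpreter is started for the configured region and cached for subsequent calls.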
+ + Args: + thread_id: Thread ID for the code interpreter session + + Returns: + CodeInterpreter instance + """ + if thread_id in self._code_interpreters: + return self._code_interpreters[thread_id] + + # Create a new code interpreter for this thread + from bedrock_agentcore.tools.code_interpreter_client import CodeInterpreter + code_interpreter = CodeInterpreter(region=self.region) + code_interpreter.start() + logger.info( + f"Started code interpreter with session_id:{code_interpreter.session_id} for thread:{thread_id}" + ) + + # Store the interpreter + self._code_interpreters[thread_id] = code_interpreter + return code_interpreter + + + def get_tools(self) -> List[BaseTool]: + """ + Get the list of code interpreter tools + + Returns: + List of CrewAI tools + """ + return self.tools + + def get_tools_by_name(self) -> Dict[str, BaseTool]: + """ + Get a dictionary of tools mapped by their names + + Returns: + Dictionary of {tool_name: tool} + """ + return {tool.name: tool for tool in self.tools} + + async def cleanup(self, thread_id: Optional[str] = None) -> None: + """Clean up resources + + Args: + thread_id: Optional thread ID to clean up. If None, cleans up all sessions. + """ + if thread_id: + # Clean up a specific thread's session + if thread_id in self._code_interpreters: + try: + self._code_interpreters[thread_id].stop() + del self._code_interpreters[thread_id] + logger.info( + f"Code interpreter session for thread {thread_id} cleaned up" + ) + except Exception as e: + logger.warning( + f"Error stopping code interpreter for thread {thread_id}: {e}" + ) + else: + # Clean up all sessions + thread_ids = list(self._code_interpreters.keys()) + for tid in thread_ids: + try: + self._code_interpreters[tid].stop() + except Exception as e: + logger.warning( + f"Error stopping code interpreter for thread {tid}: {e}" + ) + + self._code_interpreters = {} + logger.info("All code interpreter sessions cleaned up") + + +def create_code_interpreter_toolkit( + region: str = "us-west-2", +) -> Tuple[CodeInterpreterToolkit, List[BaseTool]]: + """ + Create a CodeInterpreterToolkit + + Args: + region: AWS region for code interpreter + + Returns: + Tuple of (toolkit, tools) + """ + toolkit = CodeInterpreterToolkit(region=region) + tools = toolkit.get_tools() + return toolkit, tools \ No newline at end of file