diff --git a/lib/crewai-tools/src/crewai_tools/aws/bedrock/browser/browser_session_manager.py b/lib/crewai-tools/src/crewai_tools/aws/bedrock/browser/browser_session_manager.py index dc4f60528..697506cb4 100644 --- a/lib/crewai-tools/src/crewai_tools/aws/bedrock/browser/browser_session_manager.py +++ b/lib/crewai-tools/src/crewai_tools/aws/bedrock/browser/browser_session_manager.py @@ -19,6 +19,9 @@ class BrowserSessionManager: This class maintains separate browser sessions for different threads, enabling concurrent usage of browsers in multi-threaded environments. Browsers are created lazily only when needed by tools. + + Uses per-key events to serialize creation for the same thread_id without + blocking unrelated callers or wasting resources on duplicate sessions. """ def __init__(self, region: str = "us-west-2"): @@ -31,6 +34,7 @@ class BrowserSessionManager: self._lock = threading.Lock() self._async_sessions: dict[str, tuple[BrowserClient, AsyncBrowser]] = {} self._sync_sessions: dict[str, tuple[BrowserClient, SyncBrowser]] = {} + self._creating: dict[str, threading.Event] = {} async def get_async_browser(self, thread_id: str) -> AsyncBrowser: """Get or create an async browser for the specified thread. @@ -41,11 +45,28 @@ class BrowserSessionManager: Returns: An async browser instance specific to the thread """ - with self._lock: - if thread_id in self._async_sessions: - return self._async_sessions[thread_id][1] + while True: + with self._lock: + if thread_id in self._async_sessions: + return self._async_sessions[thread_id][1] + if thread_id not in self._creating: + self._creating[thread_id] = threading.Event() + break + event = self._creating[thread_id] + event.wait() - return await self._create_async_browser_session(thread_id) + try: + browser_client, browser = await self._create_async_browser_session( + thread_id + ) + with self._lock: + self._async_sessions[thread_id] = (browser_client, browser) + return browser + finally: + with self._lock: + event = self._creating.pop(thread_id, None) + if event is not None: + event.set() def get_sync_browser(self, thread_id: str) -> SyncBrowser: """Get or create a sync browser for the specified thread. @@ -56,20 +77,34 @@ class BrowserSessionManager: Returns: A sync browser instance specific to the thread """ - with self._lock: - if thread_id in self._sync_sessions: - return self._sync_sessions[thread_id][1] + while True: + with self._lock: + if thread_id in self._sync_sessions: + return self._sync_sessions[thread_id][1] + if thread_id not in self._creating: + self._creating[thread_id] = threading.Event() + break + event = self._creating[thread_id] + event.wait() - return self._create_sync_browser_session(thread_id) + try: + return self._create_sync_browser_session(thread_id) + finally: + with self._lock: + event = self._creating.pop(thread_id, None) + if event is not None: + event.set() - async def _create_async_browser_session(self, thread_id: str) -> AsyncBrowser: + async def _create_async_browser_session( + self, thread_id: str + ) -> tuple[BrowserClient, AsyncBrowser]: """Create a new async browser session for the specified thread. Args: thread_id: Unique identifier for the thread Returns: - The newly created async browser instance + Tuple of (BrowserClient, AsyncBrowser). Raises: Exception: If browser session creation fails @@ -79,10 +114,8 @@ class BrowserSessionManager: browser_client = BrowserClient(region=self.region) try: - # Start browser session browser_client.start() - # Get WebSocket connection info ws_url, headers = browser_client.generate_ws_headers() logger.info( @@ -91,7 +124,6 @@ class BrowserSessionManager: from playwright.async_api import async_playwright - # Connect to browser using Playwright playwright = await async_playwright().start() browser = await playwright.chromium.connect_over_cdp( endpoint_url=ws_url, headers=headers, timeout=30000 @@ -100,18 +132,13 @@ class BrowserSessionManager: f"Successfully connected to async browser for thread {thread_id}" ) - # Store session resources - with self._lock: - self._async_sessions[thread_id] = (browser_client, browser) - - return browser + return browser_client, browser except Exception as e: logger.error( f"Failed to create async browser session for thread {thread_id}: {e}" ) - # Clean up resources if session creation fails if browser_client: try: browser_client.stop() @@ -137,10 +164,8 @@ class BrowserSessionManager: browser_client = BrowserClient(region=self.region) try: - # Start browser session browser_client.start() - # Get WebSocket connection info ws_url, headers = browser_client.generate_ws_headers() logger.info( @@ -149,7 +174,6 @@ class BrowserSessionManager: from playwright.sync_api import sync_playwright - # Connect to browser using Playwright playwright = sync_playwright().start() browser = playwright.chromium.connect_over_cdp( endpoint_url=ws_url, headers=headers, timeout=30000 @@ -158,7 +182,6 @@ class BrowserSessionManager: f"Successfully connected to sync browser for thread {thread_id}" ) - # Store session resources with self._lock: self._sync_sessions[thread_id] = (browser_client, browser) @@ -169,7 +192,6 @@ class BrowserSessionManager: f"Failed to create sync browser session for thread {thread_id}: {e}" ) - # Clean up resources if session creation fails if browser_client: try: browser_client.stop() @@ -191,7 +213,6 @@ class BrowserSessionManager: browser_client, browser = self._async_sessions.pop(thread_id) - # Close browser if browser: try: await browser.close() @@ -200,7 +221,6 @@ class BrowserSessionManager: f"Error closing async browser for thread {thread_id}: {e}" ) - # Stop browser client if browser_client: try: browser_client.stop() @@ -224,7 +244,6 @@ class BrowserSessionManager: browser_client, browser = self._sync_sessions.pop(thread_id) - # Close browser if browser: try: browser.close() @@ -233,7 +252,6 @@ class BrowserSessionManager: f"Error closing sync browser for thread {thread_id}: {e}" ) - # Stop browser client if browser_client: try: browser_client.stop()