Fix type-checker issues in SSE client and MCP connector

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-04-27 16:41:06 +00:00
parent f738e9ab62
commit 66d35df858
2 changed files with 9 additions and 5 deletions

View File

@@ -31,7 +31,7 @@ class MCPToolConnector:
def connect(self) -> None:
"""Connect to the MCP SSE server for tools."""
token = self.token_manager.get_access_token()
token = self.token_manager.get_token()
if not token:
self.logger.error("Authentication token not found. Please log in first.")
raise ValueError("Authentication token not found. Please log in first.")

View File

@@ -78,9 +78,9 @@ class SSEClient:
stream=True,
timeout=self.timeout
)
self._response.raise_for_status()
self._client = sseclient.SSEClient(self._response)
if self._response is not None:
self._response.raise_for_status()
self._client = sseclient.SSEClient(self._response)
except Exception as e:
self.logger.error(f"Error connecting to SSE server: {str(e)}")
@@ -93,7 +93,7 @@ class SSEClient:
)
raise
def listen(self, event_handlers: Dict[str, Callable[[Any], None]] = None) -> None:
def listen(self, event_handlers: Optional[Dict[str, Callable[[Any], None]]] = None) -> None:
"""Listen for SSE events and process them with registered handlers.
Args:
@@ -105,6 +105,10 @@ class SSEClient:
event_handlers = event_handlers or {}
try:
if self._client is None:
self.logger.error("SSE client is not initialized")
return
for event in self._client:
event_type = event.event or "message"
data = None