mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 16:48:30 +00:00
fix: enhance PlusAPI and TraceBatchManager with timeout handling and graceful failure logging (#3416)
* Added timeout parameters to PlusAPI trace event methods for improved reliability. * Updated TraceBatchManager to handle None responses gracefully, logging warnings instead of errors. * Improved logging messages to provide clearer context during trace batch initialization and event sending failures.
This commit is contained in:
@@ -119,12 +119,17 @@ class PlusAPI:
|
||||
|
||||
def initialize_trace_batch(self, payload) -> requests.Response:
|
||||
return self._make_request(
|
||||
"POST", f"{self.TRACING_RESOURCE}/batches", json=payload
|
||||
"POST",
|
||||
f"{self.TRACING_RESOURCE}/batches",
|
||||
json=payload,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
def initialize_ephemeral_trace_batch(self, payload) -> requests.Response:
|
||||
return self._make_request(
|
||||
"POST", f"{self.EPHEMERAL_TRACING_RESOURCE}/batches", json=payload
|
||||
"POST",
|
||||
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches",
|
||||
json=payload,
|
||||
)
|
||||
|
||||
def send_trace_events(self, trace_batch_id: str, payload) -> requests.Response:
|
||||
@@ -150,6 +155,7 @@ class PlusAPI:
|
||||
"PATCH",
|
||||
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
|
||||
json=payload,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
def finalize_ephemeral_trace_batch(
|
||||
@@ -159,4 +165,5 @@ class PlusAPI:
|
||||
"PATCH",
|
||||
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
|
||||
json=payload,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
@@ -56,6 +56,8 @@ class TokenManager:
|
||||
:return: The access token if valid and not expired, otherwise None.
|
||||
"""
|
||||
encrypted_data = self.read_secure_file(self.file_path)
|
||||
if encrypted_data is None:
|
||||
return None
|
||||
|
||||
decrypted_data = self.fernet.decrypt(encrypted_data) # type: ignore
|
||||
data = json.loads(decrypted_data)
|
||||
|
||||
@@ -50,7 +50,9 @@ class TraceBatchManager:
|
||||
|
||||
def __init__(self):
|
||||
try:
|
||||
self.plus_api = PlusAPI(api_key=get_auth_token())
|
||||
self.plus_api = PlusAPI(
|
||||
api_key=get_auth_token(),
|
||||
)
|
||||
except AuthError:
|
||||
self.plus_api = PlusAPI(api_key="")
|
||||
|
||||
@@ -114,7 +116,13 @@ class TraceBatchManager:
|
||||
else self.plus_api.initialize_trace_batch(payload)
|
||||
)
|
||||
|
||||
if response.status_code == 201 or response.status_code == 200:
|
||||
if response is None:
|
||||
logger.warning(
|
||||
"Trace batch initialization failed gracefully. Continuing without tracing."
|
||||
)
|
||||
return
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
response_data = response.json()
|
||||
self.trace_batch_id = (
|
||||
response_data["trace_id"]
|
||||
@@ -129,19 +137,21 @@ class TraceBatchManager:
|
||||
)
|
||||
console.print(panel)
|
||||
else:
|
||||
logger.error(
|
||||
f"❌ Failed to initialize trace batch: {response.status_code} - {response.text}"
|
||||
logger.warning(
|
||||
f"Trace batch initialization returned status {response.status_code}. Continuing without tracing."
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error initializing trace batch: {str(e)}")
|
||||
logger.warning(
|
||||
f"Error initializing trace batch: {str(e)}. Continuing without tracing."
|
||||
)
|
||||
|
||||
def add_event(self, trace_event: TraceEvent):
|
||||
"""Add event to buffer"""
|
||||
self.event_buffer.append(trace_event)
|
||||
|
||||
def _send_events_to_backend(self):
|
||||
"""Send buffered events to backend"""
|
||||
"""Send buffered events to backend with graceful failure handling"""
|
||||
if not self.plus_api or not self.trace_batch_id or not self.event_buffer:
|
||||
return
|
||||
|
||||
@@ -155,24 +165,27 @@ class TraceBatchManager:
|
||||
},
|
||||
}
|
||||
|
||||
if not self.trace_batch_id:
|
||||
raise Exception("❌ Trace batch ID not found")
|
||||
|
||||
response = (
|
||||
self.plus_api.send_ephemeral_trace_events(self.trace_batch_id, payload)
|
||||
if self.is_current_batch_ephemeral
|
||||
else self.plus_api.send_trace_events(self.trace_batch_id, payload)
|
||||
)
|
||||
|
||||
if response.status_code == 200 or response.status_code == 201:
|
||||
if response is None:
|
||||
logger.warning("Failed to send trace events. Events will be lost.")
|
||||
return
|
||||
|
||||
if response.status_code in [200, 201]:
|
||||
self.event_buffer.clear()
|
||||
else:
|
||||
logger.error(
|
||||
f"❌ Failed to send events: {response.status_code} - {response.text}"
|
||||
logger.warning(
|
||||
f"Failed to send events: {response.status_code}. Events will be lost."
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error sending events to backend: {str(e)}")
|
||||
logger.warning(
|
||||
f"Error sending events to backend: {str(e)}. Events will be lost."
|
||||
)
|
||||
|
||||
def finalize_batch(self) -> Optional[TraceBatch]:
|
||||
"""Finalize batch and return it for sending"""
|
||||
|
||||
Reference in New Issue
Block a user