prep new version

João Moura
2025-03-17 18:58:45 -07:00
parent 568aace62e
commit 658c23547e


@@ -229,30 +229,22 @@ class DatabricksQueryTool(BaseTool):
poll_count = 0
previous_state = None # Track previous state to detect changes
print(f"Starting to poll for statement ID: {statement_id}")
while time.time() - start_time < timeout:
poll_count += 1
try:
# Get statement status
result = statement.get_statement(statement_id)
# Debug info
if poll_count % 5 == 0: # Log every 5th poll
print(f"Poll #{poll_count}: State={result.status.state if hasattr(result, 'status') else 'Unknown'}")
# Check if finished - be very explicit about state checking
if hasattr(result, 'status') and hasattr(result.status, 'state'):
state_value = str(result.status.state) # Convert to string to handle both string and enum
# Track state changes for debugging
if previous_state != state_value:
print(f"State changed from {previous_state} to {state_value}")
previous_state = state_value
# Check if state indicates completion
if "SUCCEEDED" in state_value:
print(f"Query succeeded after {poll_count} polls")
break
elif "FAILED" in state_value:
# Extract error message with more robust handling
@@ -272,29 +264,12 @@ class DatabricksQueryTool(BaseTool):
# If all else fails, try to get any info we can
error_info = f"Error details unavailable: {str(err_extract_error)}"
# Print error for debugging
print(f"Query failed after {poll_count} polls: {error_info}")
# Output full status object for debugging
print(f"Full status object: {dir(result.status)}")
if hasattr(result.status, 'error'):
print(f"Error object details: {dir(result.status.error)}")
# Return immediately on first FAILED state detection
print(f"Exiting polling loop after detecting FAILED state")
return f"Query execution failed: {error_info}"
elif "CANCELED" in state_value:
print(f"Query was canceled after {poll_count} polls")
return "Query was canceled"
else:
# Print state for debugging if not recognized
if poll_count % 5 == 0:
print(f"Current state: {state_value}")
else:
print(f"Warning: Result structure does not contain expected status attributes")
except Exception as poll_error:
print(f"Error during polling (attempt #{poll_count}): {str(poll_error)}")
# Don't immediately fail - try again a few times
if poll_count > 3:
return f"Error checking query status: {str(poll_error)}"
@@ -317,13 +292,6 @@ class DatabricksQueryTool(BaseTool):
# Get results - adapt this based on the actual structure of the result object
chunk_results = []
# Debug info - print the result structure to help debug
print(f"Result structure: {dir(result)}")
if hasattr(result, 'manifest'):
print(f"Manifest structure: {dir(result.manifest)}")
if hasattr(result, 'result'):
print(f"Result data structure: {dir(result.result)}")
# Check if we have results and a schema in a very defensive way
has_schema = (hasattr(result, 'manifest') and result.manifest is not None and
hasattr(result.manifest, 'schema') and result.manifest.schema is not None)
@@ -335,26 +303,17 @@ class DatabricksQueryTool(BaseTool):
columns = [col.name for col in result.manifest.schema.columns]
# Debug info for schema
print(f"Schema columns: {columns}")
print(f"Number of columns in schema: {len(columns)}")
print(f"Type of result.result: {type(result.result)}")
# Keep track of all dynamic columns we create
all_columns = set(columns)
# Dump the raw structure of result data to help troubleshoot
if hasattr(result.result, 'data_array'):
print(f"data_array structure: {type(result.result.data_array)}")
# Add defensive check for None data_array
if result.result.data_array is None:
print("data_array is None - likely an empty result set or DDL query")
# Return empty result handling rather than trying to process null data
return "Query executed successfully (no data returned)"
elif result.result.data_array and len(result.result.data_array) > 0:
print(f"First chunk type: {type(result.result.data_array[0])}")
if len(result.result.data_array[0]) > 0:
print(f"First row type: {type(result.result.data_array[0][0])}")
print(f"First row value: {result.result.data_array[0][0]}")
# IMPROVED DETECTION LOGIC: Check if we're possibly dealing with rows where each item
# contains a single value or character (which could indicate incorrect row structure)
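# Sketch (not part of the diff) of the fragmentation heuristic described above, using
# assumed inputs: sample the first value of each row and flag the chunk when most sampled
# values are single characters or single digits.
first_chunk = [["s"], ["1"], ["S"], ["t"], ["r"], ["a"]]  # assumed shape of a fragmented chunk
sample = [row[0] for row in first_chunk[:20] if isinstance(row, (list, tuple)) and len(row) > 0]
single_char_count = sum(1 for v in sample if isinstance(v, str) and len(v) == 1)
single_digit_count = sum(1 for v in sample if isinstance(v, int) and 0 <= v <= 9)
total_items = len(sample)
is_likely_incorrect_row_structure = (
    total_items > 0 and (single_char_count + single_digit_count) / total_items > 0.5
)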
@@ -380,7 +339,6 @@ class DatabricksQueryTool(BaseTool):
# If a significant portion of the first values are single characters or digits,
# this likely indicates data is being incorrectly structured
if total_items > 0 and (single_char_count + single_digit_count) / total_items > 0.5:
print(f"Detected potential incorrect row structure: {single_char_count} single chars, {single_digit_count} digits out of {total_items} total items")
is_likely_incorrect_row_structure = True
# Additional check: if many rows have just 1 item when we expect multiple columns
@@ -389,7 +347,6 @@ class DatabricksQueryTool(BaseTool):
sample_size_for_rows = min(sample_size, len(result.result.data_array[0])) if 'sample_size' in locals() else min(20, len(result.result.data_array[0]))
rows_with_single_item = sum(1 for row in result.result.data_array[0][:sample_size_for_rows] if isinstance(row, list) and len(row) == 1)
if rows_with_single_item > sample_size_for_rows * 0.5 and len(columns) > 1:
print(f"Many rows ({rows_with_single_item}/{sample_size_for_rows}) have only a single value when expecting {len(columns)} columns")
is_likely_incorrect_row_structure = True
# Check if we're getting primarily single characters or the data structure seems off,
@@ -416,14 +373,8 @@ class DatabricksQueryTool(BaseTool):
else:
all_values.append(item)
# Print what we gathered
print(f"Collected {len(all_values)} total values")
if len(all_values) > 0:
print(f"Sample values: {all_values[:20]}")
# Get the expected column count from schema
expected_column_count = len(columns)
print(f"Expected columns per row: {expected_column_count}")
# Try to reconstruct rows using pattern recognition
reconstructed_rows = []
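# Sketch (not from the diff) of the row-boundary detection idea used below: values that
# match an ID-like pattern are treated as the start of a new row. The regex and sample
# values are illustrative assumptions; the tool's real id_pattern is defined elsewhere.
import re

id_pattern = re.compile(r"^s\d+$")  # assumed Netflix-style IDs such as "s1", "s42"
all_values = ["s1", "S", "t", "r", "a", "n", "g", "e", "r", "2016", "s2", "D", "a", "r", "k", "2017"]
id_indices = [i for i, v in enumerate(all_values)
              if isinstance(v, str) and id_pattern.match(v)]
# id_indices -> [0, 10]; each index marks where one reconstructed row begins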
@@ -445,10 +396,8 @@ class DatabricksQueryTool(BaseTool):
# If following values look like they could be part of a title
if any(isinstance(v, str) and len(v) > 1 for v in next_few_values):
id_indices.append(i)
print(f"Found potential row start at index {i}: {val}")
if id_indices:
print(f"Identified {len(id_indices)} potential row boundaries")
# If we found potential row starts, use them to extract rows
for i in range(len(id_indices)):
@@ -461,7 +410,6 @@ class DatabricksQueryTool(BaseTool):
# Special handling for Netflix title data
# Titles might be split into individual characters
if 'Title' in columns and len(row_values) > expected_column_count:
print(f"Row has {len(row_values)} values, likely contains split strings")
# Try to reconstruct by looking for patterns
# We know ID is first, then Title (which may be split)
@@ -487,7 +435,6 @@ class DatabricksQueryTool(BaseTool):
if all(isinstance(c, str) and len(c) == 1 for c in title_chars):
title = ''.join(title_chars)
row_dict['Title'] = title
print(f"Reconstructed title: {title}")
# Assign remaining values to columns
remaining_values = row_values[title_end_idx:]
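# Toy illustration (not from the diff) of the character-joining step above: consecutive
# single-character values are merged back into one Title string.
title_chars = ['S', 't', 'r', 'a', 'n', 'g', 'e', 'r']
if all(isinstance(c, str) and len(c) == 1 for c in title_chars):
    title = ''.join(title_chars)  # -> "Stranger"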
@@ -514,49 +461,42 @@ class DatabricksQueryTool(BaseTool):
reconstructed_rows.append(row_dict)
else:
# If pattern recognition didn't work, try more sophisticated reconstruction
print("Pattern recognition did not find row boundaries, trying alternative methods")
# More intelligent chunking - try to detect where columns like Title might be split
try:
title_idx = columns.index('Title') if 'Title' in columns else -1
if title_idx >= 0:
print("Attempting title reconstruction method")
# Try to detect if title is split across multiple values
i = 0
while i < len(all_values):
# Check if this could be an ID (start of a row)
if isinstance(all_values[i], str) and id_pattern.match(all_values[i]):
row_dict = {columns[0]: all_values[i]}
i += 1
# Try to reconstruct title if it appears to be split
title_chars = []
while (i < len(all_values) and
isinstance(all_values[i], str) and
len(all_values[i]) <= 1 and
len(title_chars) < 100): # Cap title length
title_chars.append(all_values[i])
i += 1
if title_chars:
row_dict[columns[title_idx]] = ''.join(title_chars)
print(f"Reconstructed title by joining characters: {row_dict[columns[title_idx]]}")
# Add remaining fields
for j in range(title_idx + 1, len(columns)):
if i < len(all_values):
row_dict[columns[j]] = all_values[i]
i += 1
else:
row_dict[columns[j]] = None
reconstructed_rows.append(row_dict)
else:
i += 1
except Exception as e:
print(f"Error during title reconstruction: {e}")
# If we still don't have rows, use simple chunking as fallback
if not reconstructed_rows:
@@ -587,14 +527,11 @@ class DatabricksQueryTool(BaseTool):
if isinstance(row.get('Title'), str) and len(row.get('Title')) <= 1:
# This is likely still a fragmented title - mark as potentially incomplete
row['Title'] = f"[INCOMPLETE] {row.get('Title')}"
print(f"Found potentially incomplete title: {row.get('Title')}")
# Ensure we respect the row limit
if row_limit and len(reconstructed_rows) > row_limit:
reconstructed_rows = reconstructed_rows[:row_limit]
print(f"Limited to {row_limit} rows as requested")
print(f"Successfully reconstructed {len(reconstructed_rows)} rows")
chunk_results = reconstructed_rows
else:
# Process normal result structure as before
@@ -604,7 +541,6 @@ class DatabricksQueryTool(BaseTool):
if hasattr(result.result, 'data_array') and result.result.data_array:
# Check if data appears to be malformed within chunks
for chunk_idx, chunk in enumerate(result.result.data_array):
print(f"Processing chunk {chunk_idx} with {len(chunk)} values")
# Check if chunk might actually contain individual columns of a single row
# This is another way data might be malformed - check the first few values
@@ -627,7 +563,6 @@ class DatabricksQueryTool(BaseTool):
reconstructed_rows.append(row_dict)
if reconstructed_rows:
print(f"Reconstructed {len(reconstructed_rows)} rows from chunk")
chunk_results.extend(reconstructed_rows)
continue # Skip normal processing for this chunk
@@ -635,7 +570,6 @@ class DatabricksQueryTool(BaseTool):
# This handles the case where instead of a list of rows, we just got all values in a flat list
if all(isinstance(val, (str, int, float)) and not isinstance(val, (list, dict)) for val in chunk):
if len(chunk) == len(columns) or (len(chunk) > 0 and len(chunk) % len(columns) == 0):
print(f"Chunk appears to contain flat values - treating as rows with {len(columns)} columns each")
# Process flat list of values as rows
for i in range(0, len(chunk), len(columns)):
@@ -643,7 +577,6 @@ class DatabricksQueryTool(BaseTool):
if len(row_values) == len(columns): # Only process complete rows
row_dict = {col: val for col, val in zip(columns, row_values)}
chunk_results.append(row_dict)
print(f"Created row from flat values: {row_dict}")
# Skip regular row processing for this chunk
continue
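# Sketch (not part of the diff) of the flat-value regrouping used above: when a chunk is a
# single flat list whose length is a multiple of the column count, slice it into complete
# rows. The column names and values are assumed examples.
columns = ["show_id", "Title", "release_year"]
chunk = ["s1", "Movie A", 2020, "s2", "Movie B", 2021]
chunk_results = [
    dict(zip(columns, chunk[i:i + len(columns)]))
    for i in range(0, len(chunk), len(columns))
    if len(chunk[i:i + len(columns)]) == len(columns)
]
# chunk_results -> [{'show_id': 's1', 'Title': 'Movie A', 'release_year': 2020}, ...]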
@@ -652,16 +585,9 @@ class DatabricksQueryTool(BaseTool):
for row_idx, row in enumerate(chunk):
# Ensure row is actually a collection of values
if not isinstance(row, (list, tuple, dict)):
print(f"Row {row_idx} is not a collection: {row} ({type(row)})")
# This might be a single value; skip it or handle specially
continue
# Debug info for this row
if isinstance(row, (list, tuple)):
print(f"Row {row_idx} has {len(row)} values")
elif isinstance(row, dict):
print(f"Row {row_idx} already has column mapping: {list(row.keys())}")
# Convert each row to a dictionary with column names as keys
row_dict = {}
@@ -689,11 +615,9 @@ class DatabricksQueryTool(BaseTool):
elif hasattr(result.result, 'data') and result.result.data:
# Alternative data structure
print(f"Processing data with {len(result.result.data)} rows")
for row_idx, row in enumerate(result.result.data):
# Debug info
print(f"Row {row_idx} has {len(row)} values")
# Safely create dictionary matching column names to values
row_dict = {}
@@ -714,7 +638,6 @@ class DatabricksQueryTool(BaseTool):
chunk_results.append(row_dict)
# After processing all rows, ensure all rows have all columns
print(f"All columns detected: {all_columns}")
normalized_results = []
for row in chunk_results:
# Create a new row with all columns, defaulting to None for missing ones
@@ -724,16 +647,10 @@ class DatabricksQueryTool(BaseTool):
# Replace the original results with normalized ones
chunk_results = normalized_results
# Print the processed results for debugging
print(f"Processed {len(chunk_results)} rows")
for i, row in enumerate(chunk_results[:3]): # Show only first 3 rows to avoid log spam
print(f"Row {i}: {row}")
except Exception as results_error:
# Enhanced error message with more context
import traceback
error_details = traceback.format_exc()
print(f"Error processing results: {error_details}")
return f"Error processing query results: {str(results_error)}\n\nDetails:\n{error_details}"
# If we have no results but the query succeeded (e.g., for DDL statements)
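# Sketch (not from the diff) of the normalization step above: give every row the same key
# set, filling missing columns with None, so downstream formatting sees uniform dicts.
# Column names and sample rows are assumed for illustration.
all_columns = {"show_id", "Title", "release_year"}
chunk_results = [{"show_id": "s1", "Title": "Movie A"}, {"show_id": "s2", "release_year": 2021}]
normalized_results = [{col: row.get(col) for col in all_columns} for row in chunk_results]
# every dict now carries show_id, Title and release_year keys (missing ones default to None)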