diff --git a/src/crewai_tools/tools/databricks_query_tool/databricks_query_tool.py b/src/crewai_tools/tools/databricks_query_tool/databricks_query_tool.py
index 24ed6e6a0..e6381c8c5 100644
--- a/src/crewai_tools/tools/databricks_query_tool/databricks_query_tool.py
+++ b/src/crewai_tools/tools/databricks_query_tool/databricks_query_tool.py
@@ -229,30 +229,22 @@ class DatabricksQueryTool(BaseTool):
         poll_count = 0
         previous_state = None  # Track previous state to detect changes

-        print(f"Starting to poll for statement ID: {statement_id}")
-
         while time.time() - start_time < timeout:
             poll_count += 1
             try:
                 # Get statement status
                 result = statement.get_statement(statement_id)

-                # Debug info
-                if poll_count % 5 == 0:  # Log every 5th poll
-                    print(f"Poll #{poll_count}: State={result.status.state if hasattr(result, 'status') else 'Unknown'}")
-
                 # Check if finished - be very explicit about state checking
                 if hasattr(result, 'status') and hasattr(result.status, 'state'):
                     state_value = str(result.status.state)  # Convert to string to handle both string and enum

                     # Track state changes for debugging
                     if previous_state != state_value:
-                        print(f"State changed from {previous_state} to {state_value}")
                         previous_state = state_value

                     # Check if state indicates completion
                     if "SUCCEEDED" in state_value:
-                        print(f"Query succeeded after {poll_count} polls")
                         break
                     elif "FAILED" in state_value:
                         # Extract error message with more robust handling
@@ -272,29 +264,12 @@ class DatabricksQueryTool(BaseTool):
                                 # If all else fails, try to get any info we can
                                 error_info = f"Error details unavailable: {str(err_extract_error)}"

-                        # Print error for debugging
-                        print(f"Query failed after {poll_count} polls: {error_info}")
-
-                        # Output full status object for debugging
-                        print(f"Full status object: {dir(result.status)}")
-                        if hasattr(result.status, 'error'):
-                            print(f"Error object details: {dir(result.status.error)}")
-
                         # Return immediately on first FAILED state detection
-                        print(f"Exiting polling loop after detecting FAILED state")
                         return f"Query execution failed: {error_info}"
                     elif "CANCELED" in state_value:
-                        print(f"Query was canceled after {poll_count} polls")
                         return "Query was canceled"

-                    else:
-                        # Print state for debugging if not recognized
-                        if poll_count % 5 == 0:
-                            print(f"Current state: {state_value}")
-                else:
-                    print(f"Warning: Result structure does not contain expected status attributes")
             except Exception as poll_error:
-                print(f"Error during polling (attempt #{poll_count}): {str(poll_error)}")
                 # Don't immediately fail - try again a few times
                 if poll_count > 3:
                     return f"Error checking query status: {str(poll_error)}"
@@ -317,13 +292,6 @@ class DatabricksQueryTool(BaseTool):
             # Get results - adapt this based on the actual structure of the result object
             chunk_results = []

-            # Debug info - print the result structure to help debug
-            print(f"Result structure: {dir(result)}")
-            if hasattr(result, 'manifest'):
-                print(f"Manifest structure: {dir(result.manifest)}")
-            if hasattr(result, 'result'):
-                print(f"Result data structure: {dir(result.result)}")
-
            # Check if we have results and a schema in a very defensive way
            has_schema = (hasattr(result, 'manifest') and result.manifest is not None and
                          hasattr(result.manifest, 'schema') and result.manifest.schema is not None)
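The three hunks above strip the per-poll logging out of the status loop. For reference, here is a minimal sketch of the polling pattern that remains, assuming only what the diff itself shows: a `statement` API object whose `get_statement(statement_id)` returns a result with a `status.state` that stringifies to SUCCEEDED/FAILED/CANCELED and an optional `status.error`. The names `poll_statement`, `timeout`, `poll_interval`, and the `error.message` attribute are illustrative, not the tool's confirmed API:

```python
import time

def poll_statement(statement, statement_id, timeout=120.0, poll_interval=1.0):
    """Poll a SQL statement until it reaches a terminal state (sketch only)."""
    start_time = time.time()
    while time.time() - start_time < timeout:
        result = statement.get_statement(statement_id)
        # Stringify the state so both plain strings and enums compare cleanly
        state = str(getattr(getattr(result, "status", None), "state", ""))
        if "SUCCEEDED" in state:
            return result
        if "FAILED" in state:
            # Error shape varies, so read it defensively, as the tool does
            error = getattr(result.status, "error", None)
            message = getattr(error, "message", None) or "Error details unavailable"
            raise RuntimeError(f"Query execution failed: {message}")
        if "CANCELED" in state:
            raise RuntimeError("Query was canceled")
        time.sleep(poll_interval)  # Not terminal yet; wait before the next poll
    raise TimeoutError(f"Statement {statement_id} did not finish within {timeout}s")
```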
{type(result.result)}") # Keep track of all dynamic columns we create all_columns = set(columns) # Dump the raw structure of result data to help troubleshoot if hasattr(result.result, 'data_array'): - print(f"data_array structure: {type(result.result.data_array)}") # Add defensive check for None data_array if result.result.data_array is None: print("data_array is None - likely an empty result set or DDL query") # Return empty result handling rather than trying to process null data return "Query executed successfully (no data returned)" - elif result.result.data_array and len(result.result.data_array) > 0: - print(f"First chunk type: {type(result.result.data_array[0])}") - if len(result.result.data_array[0]) > 0: - print(f"First row type: {type(result.result.data_array[0][0])}") - print(f"First row value: {result.result.data_array[0][0]}") # IMPROVED DETECTION LOGIC: Check if we're possibly dealing with rows where each item # contains a single value or character (which could indicate incorrect row structure) @@ -380,7 +339,6 @@ class DatabricksQueryTool(BaseTool): # If a significant portion of the first values are single characters or digits, # this likely indicates data is being incorrectly structured if total_items > 0 and (single_char_count + single_digit_count) / total_items > 0.5: - print(f"Detected potential incorrect row structure: {single_char_count} single chars, {single_digit_count} digits out of {total_items} total items") is_likely_incorrect_row_structure = True # Additional check: if many rows have just 1 item when we expect multiple columns @@ -389,7 +347,6 @@ class DatabricksQueryTool(BaseTool): sample_size_for_rows = min(sample_size, len(result.result.data_array[0])) if 'sample_size' in locals() else min(20, len(result.result.data_array[0])) rows_with_single_item = sum(1 for row in result.result.data_array[0][:sample_size_for_rows] if isinstance(row, list) and len(row) == 1) if rows_with_single_item > sample_size_for_rows * 0.5 and len(columns) > 1: - print(f"Many rows ({rows_with_single_item}/{sample_size_for_rows}) have only a single value when expecting {len(columns)} columns") is_likely_incorrect_row_structure = True # Check if we're getting primarily single characters or the data structure seems off, @@ -416,14 +373,8 @@ class DatabricksQueryTool(BaseTool): else: all_values.append(item) - # Print what we gathered - print(f"Collected {len(all_values)} total values") - if len(all_values) > 0: - print(f"Sample values: {all_values[:20]}") - # Get the expected column count from schema expected_column_count = len(columns) - print(f"Expected columns per row: {expected_column_count}") # Try to reconstruct rows using pattern recognition reconstructed_rows = [] @@ -445,10 +396,8 @@ class DatabricksQueryTool(BaseTool): # If following values look like they could be part of a title if any(isinstance(v, str) and len(v) > 1 for v in next_few_values): id_indices.append(i) - print(f"Found potential row start at index {i}: {val}") if id_indices: - print(f"Identified {len(id_indices)} potential row boundaries") # If we found potential row starts, use them to extract rows for i in range(len(id_indices)): @@ -461,7 +410,6 @@ class DatabricksQueryTool(BaseTool): # Special handling for Netflix title data # Titles might be split into individual characters if 'Title' in columns and len(row_values) > expected_column_count: - print(f"Row has {len(row_values)} values, likely contains split strings") # Try to reconstruct by looking for patterns # We know ID is first, then Title (which may be split) 
@@ -487,7 +435,6 @@ class DatabricksQueryTool(BaseTool):
                                         if all(isinstance(c, str) and len(c) == 1 for c in title_chars):
                                             title = ''.join(title_chars)
                                             row_dict['Title'] = title
-                                            print(f"Reconstructed title: {title}")

                                        # Assign remaining values to columns
                                        remaining_values = row_values[title_end_idx:]
@@ -514,49 +461,42 @@ class DatabricksQueryTool(BaseTool):
                                    reconstructed_rows.append(row_dict)

                        else:
-                            # If pattern recognition didn't work, try more sophisticated reconstruction
-                            print("Pattern recognition did not find row boundaries, trying alternative methods")
-
-                            # More intelligent chunking - try to detect where columns like Title might be split
-                            try:
-                                title_idx = columns.index('Title') if 'Title' in columns else -1
+                            title_idx = columns.index('Title') if 'Title' in columns else -1

-                                if title_idx >= 0:
-                                    print("Attempting title reconstruction method")
+                            if title_idx >= 0:
+                                print("Attempting title reconstruction method")

-                                    # Try to detect if title is split across multiple values
-                                    i = 0
-                                    while i < len(all_values):
-                                        # Check if this could be an ID (start of a row)
-                                        if isinstance(all_values[i], str) and id_pattern.match(all_values[i]):
-                                            row_dict = {columns[0]: all_values[i]}
+                                # Try to detect if title is split across multiple values
+                                i = 0
+                                while i < len(all_values):
+                                    # Check if this could be an ID (start of a row)
+                                    if isinstance(all_values[i], str) and id_pattern.match(all_values[i]):
+                                        row_dict = {columns[0]: all_values[i]}
+                                        i += 1
+
+                                        # Try to reconstruct title if it appears to be split
+                                        title_chars = []
+                                        while (i < len(all_values) and
+                                               isinstance(all_values[i], str) and
+                                               len(all_values[i]) <= 1 and
+                                               len(title_chars) < 100):  # Cap title length
+                                            title_chars.append(all_values[i])
                                            i += 1

-                                            # Try to reconstruct title if it appears to be split
-                                            title_chars = []
-                                            while (i < len(all_values) and
-                                                   isinstance(all_values[i], str) and
-                                                   len(all_values[i]) <= 1 and
-                                                   len(title_chars) < 100):  # Cap title length
-                                                title_chars.append(all_values[i])
+                                        if title_chars:
+                                            row_dict[columns[title_idx]] = ''.join(title_chars)
+
+                                        # Add remaining fields
+                                        for j in range(title_idx + 1, len(columns)):
+                                            if i < len(all_values):
+                                                row_dict[columns[j]] = all_values[i]
                                                i += 1
+                                            else:
+                                                row_dict[columns[j]] = None

-                                            if title_chars:
-                                                row_dict[columns[title_idx]] = ''.join(title_chars)
-                                                print(f"Reconstructed title by joining characters: {row_dict[columns[title_idx]]}")
-
-                                            # Add remaining fields
-                                            for j in range(title_idx + 1, len(columns)):
-                                                if i < len(all_values):
-                                                    row_dict[columns[j]] = all_values[i]
-                                                    i += 1
-                                                else:
-                                                    row_dict[columns[j]] = None
-
-                                            reconstructed_rows.append(row_dict)
-                                        else:
-                                            i += 1
-                            except Exception as e:
-                                print(f"Error during title reconstruction: {e}")
+                                        reconstructed_rows.append(row_dict)
+                                    else:
+                                        i += 1

                        # If we still don't have rows, use simple chunking as fallback
                        if not reconstructed_rows:
@@ -587,14 +527,11 @@ class DatabricksQueryTool(BaseTool):
                                if isinstance(row.get('Title'), str) and len(row.get('Title')) <= 1:
                                    # This is likely still a fragmented title - mark as potentially incomplete
                                    row['Title'] = f"[INCOMPLETE] {row.get('Title')}"
-                                    print(f"Found potentially incomplete title: {row.get('Title')}")

                        # Ensure we respect the row limit
                        if row_limit and len(reconstructed_rows) > row_limit:
                            reconstructed_rows = reconstructed_rows[:row_limit]
-                            print(f"Limited to {row_limit} rows as requested")

-                        print(f"Successfully reconstructed {len(reconstructed_rows)} rows")
                        chunk_results = reconstructed_rows
                    else:
                        # Process normal result structure as before
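Both reconstruction paths above share one idea: walk the flattened values, treat an ID-shaped string as a row boundary, re-join runs of single characters into the split Title, then fill the remaining columns in order. Condensed into one hypothetical helper, under the same assumptions the code makes (the `id_pattern` shown is illustrative, the tool builds its own; the schema is assumed to be ID first, Title second, as in the Netflix example the comments mention):

```python
import re

id_pattern = re.compile(r"^s\d+$")  # e.g. Netflix-style IDs "s1", "s42" (illustrative)

def rebuild_rows(all_values, columns):
    """Re-assemble dict-rows from a flat value list (sketch of the fallback path)."""
    rows, i = [], 0
    while i < len(all_values):
        value = all_values[i]
        if not (isinstance(value, str) and id_pattern.match(value)):
            i += 1  # Not a row boundary: skip stray value
            continue
        row = {columns[0]: value}
        i += 1

        # Re-join a run of single-character strings into the Title
        chars = []
        while (i < len(all_values) and isinstance(all_values[i], str)
               and len(all_values[i]) <= 1 and len(chars) < 100):  # Cap title length
            chars.append(all_values[i])
            i += 1
        if chars:
            row[columns[1]] = "".join(chars)

        # Remaining columns take the next values verbatim
        for col in columns[2:]:
            if i < len(all_values):
                row[col] = all_values[i]
                i += 1
            else:
                row[col] = None
        rows.append(row)
    return rows
```

Given `["s1", "D", "a", "r", "k", 2017, "TV-MA"]` and columns `["ID", "Title", "Year", "Rating"]`, this yields one row with `Title == "Dark"`.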
@@ -604,7 +541,6 @@ class DatabricksQueryTool(BaseTool):
                        if hasattr(result.result, 'data_array') and result.result.data_array:
                            # Check if data appears to be malformed within chunks
                            for chunk_idx, chunk in enumerate(result.result.data_array):
-                                print(f"Processing chunk {chunk_idx} with {len(chunk)} values")

                                # Check if chunk might actually contain individual columns of a single row
                                # This is another way data might be malformed - check the first few values
@@ -627,7 +563,6 @@ class DatabricksQueryTool(BaseTool):
                                        reconstructed_rows.append(row_dict)

                                if reconstructed_rows:
-                                    print(f"Reconstructed {len(reconstructed_rows)} rows from chunk")
                                    chunk_results.extend(reconstructed_rows)
                                    continue  # Skip normal processing for this chunk

@@ -635,7 +570,6 @@ class DatabricksQueryTool(BaseTool):
                                # This handles the case where instead of a list of rows, we just got all values in a flat list
                                if all(isinstance(val, (str, int, float)) and not isinstance(val, (list, dict)) for val in chunk):
                                    if len(chunk) == len(columns) or (len(chunk) > 0 and len(chunk) % len(columns) == 0):
-                                        print(f"Chunk appears to contain flat values - treating as rows with {len(columns)} columns each")

                                        # Process flat list of values as rows
                                        for i in range(0, len(chunk), len(columns)):
@@ -643,7 +577,6 @@ class DatabricksQueryTool(BaseTool):
                                            if len(row_values) == len(columns):  # Only process complete rows
                                                row_dict = {col: val for col, val in zip(columns, row_values)}
                                                chunk_results.append(row_dict)
-                                                print(f"Created row from flat values: {row_dict}")

                                        # Skip regular row processing for this chunk
                                        continue
@@ -652,16 +585,9 @@ class DatabricksQueryTool(BaseTool):
                                for row_idx, row in enumerate(chunk):
                                    # Ensure row is actually a collection of values
                                    if not isinstance(row, (list, tuple, dict)):
-                                        print(f"Row {row_idx} is not a collection: {row} ({type(row)})")
                                        # This might be a single value; skip it or handle specially
                                        continue

-                                    # Debug info for this row
-                                    if isinstance(row, (list, tuple)):
-                                        print(f"Row {row_idx} has {len(row)} values")
-                                    elif isinstance(row, dict):
-                                        print(f"Row {row_idx} already has column mapping: {list(row.keys())}")
-
                                    # Convert each row to a dictionary with column names as keys
                                    row_dict = {}

@@ -689,11 +615,9 @@ class DatabricksQueryTool(BaseTool):
                        elif hasattr(result.result, 'data') and result.result.data:
                            # Alternative data structure
-                            print(f"Processing data with {len(result.result.data)} rows")

                            for row_idx, row in enumerate(result.result.data):
                                # Debug info
-                                print(f"Row {row_idx} has {len(row)} values")

                                # Safely create dictionary matching column names to values
                                row_dict = {}

@@ -714,7 +638,6 @@ class DatabricksQueryTool(BaseTool):
                                chunk_results.append(row_dict)

                # After processing all rows, ensure all rows have all columns
-                print(f"All columns detected: {all_columns}")
                normalized_results = []
                for row in chunk_results:
                    # Create a new row with all columns, defaulting to None for missing ones
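Two patterns survive the logging cleanup in these hunks: splitting a flat value list into fixed-width rows, and normalizing every row onto the full column set. Both as minimal sketches (the function names are illustrative, not the tool's):

```python
def rows_from_flat(chunk, columns):
    """Split one flat list of values into dict-rows of len(columns) each."""
    width = len(columns)
    if width == 0 or len(chunk) % width != 0:
        return []  # Incomplete rows: caller falls back to normal handling
    return [dict(zip(columns, chunk[i:i + width])) for i in range(0, len(chunk), width)]

def normalize_rows(rows, all_columns):
    """Give every row the same keys, defaulting missing columns to None."""
    return [{col: row.get(col) for col in all_columns} for row in rows]
```

For example, `rows_from_flat(["s1", "A", "s2", "B"], ["ID", "Title"])` yields two complete rows; normalizing afterwards guarantees a uniform shape even when dynamic columns appeared in only some rows.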
@@ -724,16 +647,10 @@ class DatabricksQueryTool(BaseTool):
                # Replace the original results with normalized ones
                chunk_results = normalized_results

-                # Print the processed results for debugging
-                print(f"Processed {len(chunk_results)} rows")
-                for i, row in enumerate(chunk_results[:3]):  # Show only first 3 rows to avoid log spam
-                    print(f"Row {i}: {row}")
-
        except Exception as results_error:
            # Enhanced error message with more context
            import traceback
            error_details = traceback.format_exc()
-            print(f"Error processing results: {error_details}")
            return f"Error processing query results: {str(results_error)}\n\nDetails:\n{error_details}"

        # If we have no results but the query succeeded (e.g., for DDL statements)
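The final hunk keeps the traceback-based error report while dropping its debug print. The surviving pattern, isolated into a hypothetical wrapper (the name `report_processing_error` is an assumption for illustration):

```python
import traceback

def report_processing_error(process, result):
    """Run a processing step; on failure return a readable report string,
    as the tool's except-branch does, instead of letting the exception escape."""
    try:
        return process(result)
    except Exception as results_error:
        error_details = traceback.format_exc()
        return f"Error processing query results: {results_error}\n\nDetails:\n{error_details}"
```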