explain-excel.py
| """ | |
| Excel Large Data Handler Tool for AGNO with Azure OpenAI | |
| Handles Excel files up to 20MB by chunking and intelligent data sampling | |
| """ | |
| from typing import Any, Dict, List, Optional, Union | |
| import pandas as pd | |
| import openpyxl | |
| from pathlib import Path | |
| import json | |
| class ExcelLargeDataTool: | |
| """ | |
| Tool for reading and documenting large Excel spreadsheets with context window management. | |
| Implements chunking, summarization, and query-based retrieval strategies. | |
| """ | |
| def __init__(self, max_rows_per_chunk: int = 100, max_sample_rows: int = 50): | |
| """ | |
| Initialize the Excel tool. | |
| Args: | |
| max_rows_per_chunk: Maximum rows to process in each chunk | |
| max_sample_rows: Number of sample rows to show for initial analysis | |
| """ | |
| self.max_rows_per_chunk = max_rows_per_chunk | |
| self.max_sample_rows = max_sample_rows | |
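
    # NOTE: pandas.read_excel (unlike read_csv) has no chunksize argument, so the
    # chunked methods below (query_data, get_pivot_summary, get_column_summary)
    # stream rows through this helper instead. This is a minimal sketch of that
    # helper; it assumes the first worksheet row is a header, as the rest of the
    # class already does.
    def _iter_chunks(self, file_path: str, sheet_name: str,
                     usecols: Optional[List[str]] = None):
        """
        Yield DataFrames of at most max_rows_per_chunk rows from a sheet,
        using openpyxl's read-only mode so the whole file is never held in memory.
        """
        wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True)
        try:
            rows = wb[sheet_name].iter_rows(values_only=True)
            header = list(next(rows))
            buffer = []
            for row in rows:
                buffer.append(row)
                if len(buffer) >= self.max_rows_per_chunk:
                    chunk = pd.DataFrame(buffer, columns=header)
                    yield chunk[usecols] if usecols else chunk
                    buffer = []
            if buffer:
                chunk = pd.DataFrame(buffer, columns=header)
                yield chunk[usecols] if usecols else chunk
        finally:
            wb.close()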
    def get_excel_schema(self, file_path: str, sheet_name: Optional[str] = None) -> Dict[str, Any]:
        """
        Get the schema and metadata of the Excel file without loading all data.
        This is the first step - always understand structure before loading data.

        Args:
            file_path: Path to the Excel file
            sheet_name: Specific sheet name (if None, analyzes all sheets)

        Returns:
            Dictionary containing schema information
        """
        try:
            # Get basic file info
            file_size = Path(file_path).stat().st_size / (1024 * 1024)  # MB

            # Load workbook metadata only
            wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True)
            sheets_info = {}

            for sheet in wb.sheetnames:
                if sheet_name and sheet != sheet_name:
                    continue
                ws = wb[sheet]

                # Get dimensions without loading all data
                max_row = ws.max_row
                max_col = ws.max_column

                # Read the header row plus up to 10 sample rows via iter_rows,
                # which is supported in read-only mode (row indexing is not)
                rows = ws.iter_rows(min_row=1, max_row=min(11, max_row), values_only=True)
                header_row = list(next(rows))
                sample_data = [list(row) for row in rows]

                # Infer column types from the sample
                df_sample = pd.DataFrame(sample_data, columns=header_row)
                column_types = {col: str(dtype) for col, dtype in df_sample.dtypes.items()}

                sheets_info[sheet] = {
                    'total_rows': max_row - 1,  # Exclude header
                    'total_columns': max_col,
                    'columns': header_row,
                    'column_types': column_types,
                    'estimated_chunks': (max_row - 1) // self.max_rows_per_chunk + 1
                }

            wb.close()

            return {
                'file_path': file_path,
                'file_size_mb': round(file_size, 2),
                'sheets': sheets_info,
                'chunking_strategy': f'Data will be processed in chunks of {self.max_rows_per_chunk} rows'
            }
        except Exception as e:
            return {'error': f'Failed to read Excel schema: {str(e)}'}
    def get_data_sample(self, file_path: str, sheet_name: str,
                        sample_size: Optional[int] = None) -> Dict[str, Any]:
        """
        Get a representative sample of the data for initial analysis.

        Args:
            file_path: Path to the Excel file
            sheet_name: Sheet name to sample from
            sample_size: Number of rows to sample (default: max_sample_rows)

        Returns:
            Dictionary containing sample data and statistics
        """
        try:
            sample_size = sample_size or self.max_sample_rows

            # Read only the sample
            df = pd.read_excel(file_path, sheet_name=sheet_name, nrows=sample_size)

            # Get basic statistics
            stats = {
                'numeric_columns': {},
                'categorical_columns': {}
            }
            for col in df.columns:
                if pd.api.types.is_numeric_dtype(df[col]):
                    stats['numeric_columns'][col] = {
                        'min': float(df[col].min()) if not pd.isna(df[col].min()) else None,
                        'max': float(df[col].max()) if not pd.isna(df[col].max()) else None,
                        'mean': float(df[col].mean()) if not pd.isna(df[col].mean()) else None,
                        'null_count': int(df[col].isna().sum())
                    }
                else:
                    unique_vals = df[col].dropna().unique()
                    stats['categorical_columns'][col] = {
                        'unique_count': len(unique_vals),
                        'top_values': list(unique_vals[:10]),
                        'null_count': int(df[col].isna().sum())
                    }

            return {
                'sheet_name': sheet_name,
                'sample_size': len(df),
                'sample_data': df.head(20).to_dict('records'),  # First 20 rows as preview
                'statistics': stats,
                'message': f'Showing sample of {len(df)} rows. Use get_data_chunk() to retrieve specific row ranges.'
            }
        except Exception as e:
            return {'error': f'Failed to get data sample: {str(e)}'}
    def get_data_chunk(self, file_path: str, sheet_name: str,
                       start_row: int, end_row: Optional[int] = None,
                       columns: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Retrieve a specific chunk of data by row range.

        Args:
            file_path: Path to the Excel file
            sheet_name: Sheet name to read from
            start_row: Starting row number (0-indexed, excludes header)
            end_row: Ending row number (if None, reads max_rows_per_chunk)
            columns: Specific columns to retrieve (if None, retrieves all)

        Returns:
            Dictionary containing the requested data chunk
        """
        try:
            if end_row is None:
                end_row = start_row + self.max_rows_per_chunk

            # File row 0 is the header, so data row N lives in file row N + 1.
            # Skip file rows 1..start_row to keep the header while dropping earlier data rows.
            skiprows = range(1, start_row + 1) if start_row > 0 else None
            nrows = end_row - start_row

            # Read the specific chunk
            df = pd.read_excel(
                file_path,
                sheet_name=sheet_name,
                skiprows=skiprows,
                nrows=nrows,
                usecols=columns
            )

            return {
                'sheet_name': sheet_name,
                'start_row': start_row,
                'end_row': min(end_row, start_row + len(df)),
                'rows_returned': len(df),
                'columns': list(df.columns),
                'data': df.to_dict('records')
            }
        except Exception as e:
            return {'error': f'Failed to get data chunk: {str(e)}'}
    def query_data(self, file_path: str, sheet_name: str,
                   filters: Optional[Dict[str, Any]] = None,
                   columns: Optional[List[str]] = None,
                   max_results: int = 200) -> Dict[str, Any]:
        """
        Query data with filters to retrieve only relevant rows.
        Reads data in chunks and filters on-the-fly to manage memory.

        Args:
            file_path: Path to the Excel file
            sheet_name: Sheet name to query
            filters: Dictionary of column:value pairs for filtering
                Example: {'Status': 'Active', 'Amount': lambda x: x > 1000}
            columns: Specific columns to retrieve
            max_results: Maximum number of results to return

        Returns:
            Dictionary containing filtered data
        """
        try:
            results = []
            chunks_processed = 0

            # Read and filter in chunks (pandas.read_excel has no chunksize
            # argument, so chunks are streamed via the _iter_chunks helper)
            for chunk in self._iter_chunks(file_path, sheet_name, usecols=columns):
                chunks_processed += 1

                # Apply filters
                filtered_chunk = chunk
                if filters:
                    for col, condition in filters.items():
                        if col in filtered_chunk.columns:
                            if callable(condition):
                                filtered_chunk = filtered_chunk[filtered_chunk[col].apply(condition)]
                            else:
                                filtered_chunk = filtered_chunk[filtered_chunk[col] == condition]

                results.extend(filtered_chunk.to_dict('records'))

                # Stop if we have enough results
                if len(results) >= max_results:
                    results = results[:max_results]
                    break

            return {
                'sheet_name': sheet_name,
                'filters_applied': str(filters) if filters else 'None',
                'chunks_processed': chunks_processed,
                'results_found': len(results),
                'data': results
            }
        except Exception as e:
            return {'error': f'Failed to query data: {str(e)}'}
    def get_pivot_summary(self, file_path: str, sheet_name: str,
                          index_col: str, value_col: str,
                          agg_func: str = 'sum') -> Dict[str, Any]:
        """
        Create a pivot table summary by processing data in chunks.

        Args:
            file_path: Path to the Excel file
            sheet_name: Sheet name
            index_col: Column to use as index/grouping
            value_col: Column to aggregate
            agg_func: Aggregation function ('sum', 'mean', 'count', 'min', 'max')

        Returns:
            Dictionary containing pivot summary
        """
        try:
            partial_values = {}   # per-group partial aggregates from each chunk
            group_counts = {}     # per-group row counts, needed for 'mean' and 'count'

            # Process in chunks and aggregate. For 'mean' we accumulate per-chunk
            # sums and row counts so the final result is weighted correctly,
            # rather than averaging the chunk means.
            chunk_agg = 'sum' if agg_func in ('sum', 'mean') else agg_func
            for chunk in self._iter_chunks(file_path, sheet_name,
                                           usecols=[index_col, value_col]):
                grouped = chunk.groupby(index_col)[value_col].agg(chunk_agg)
                counts = chunk.groupby(index_col)[value_col].count()
                for idx, val in grouped.items():
                    partial_values.setdefault(idx, []).append(val)
                    group_counts[idx] = group_counts.get(idx, 0) + int(counts[idx])

            # Final aggregation across chunks
            final_result = {}
            for idx, values in partial_values.items():
                if agg_func == 'sum':
                    final_result[idx] = sum(values)
                elif agg_func == 'mean':
                    final_result[idx] = sum(values) / group_counts[idx] if group_counts[idx] else None
                elif agg_func == 'count':
                    final_result[idx] = group_counts[idx]
                elif agg_func == 'min':
                    final_result[idx] = min(values)
                elif agg_func == 'max':
                    final_result[idx] = max(values)

            return {
                'sheet_name': sheet_name,
                'pivot_by': index_col,
                'aggregated_column': value_col,
                'aggregation_function': agg_func,
                'summary': final_result,
                'total_groups': len(final_result)
            }
        except Exception as e:
            return {'error': f'Failed to create pivot summary: {str(e)}'}
    def get_column_summary(self, file_path: str, sheet_name: str,
                           column: str) -> Dict[str, Any]:
        """
        Get a comprehensive summary of a specific column across all data.
        Processes in chunks to handle large files.

        Args:
            file_path: Path to the Excel file
            sheet_name: Sheet name
            column: Column name to summarize

        Returns:
            Dictionary containing column summary
        """
        try:
            values = []
            total_rows = 0
            null_count = 0

            # Collect values in chunks
            for chunk in self._iter_chunks(file_path, sheet_name, usecols=[column]):
                total_rows += len(chunk)
                null_count += int(chunk[column].isna().sum())
                values.extend(chunk[column].dropna().tolist())

            # Generate summary based on data type
            summary = {
                'column': column,
                'total_rows': total_rows,
                'null_count': null_count,
                'non_null_count': len(values)
            }

            if values and pd.api.types.is_numeric_dtype(pd.Series(values)):
                summary.update({
                    'type': 'numeric',
                    'min': min(values),
                    'max': max(values),
                    'mean': sum(values) / len(values),
                    'median': sorted(values)[len(values) // 2]  # upper median for even counts
                })
            else:
                unique_values = list(set(values))
                summary.update({
                    'type': 'categorical',
                    'unique_values': len(unique_values),
                    'top_10_values': unique_values[:10]
                })

            return summary
        except Exception as e:
            return {'error': f'Failed to summarize column: {str(e)}'}


# Example usage for AGNO integration
def create_agno_excel_tools():
    """
    Create AGNO-compatible tool definitions for the Excel handler.
    """
    tool = ExcelLargeDataTool()

    tools = [
        {
            'name': 'get_excel_schema',
            'description': 'Get the structure and metadata of an Excel file (sheets, columns, row counts) without loading all data. Always call this first.',
            'function': tool.get_excel_schema
        },
        {
            'name': 'get_data_sample',
            'description': 'Get a representative sample of rows from a sheet for initial analysis.',
            'function': tool.get_data_sample
        },
        {
            'name': 'get_data_chunk',
            'description': 'Retrieve a specific range of rows from a sheet.',
            'function': tool.get_data_chunk
        },
        {
            'name': 'query_data',
            'description': 'Query data with filters to retrieve only relevant rows across the entire dataset.',
            'function': tool.query_data
        },
        {
            'name': 'get_pivot_summary',
            'description': 'Create a pivot table summary by processing large data in chunks.',
            'function': tool.get_pivot_summary
        },
        {
            'name': 'get_column_summary',
            'description': 'Get comprehensive statistics for a specific column across all data.',
            'function': tool.get_column_summary
        }
    ]

    return tools
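

# A minimal sketch of wiring these tools into an AGNO agent. The import paths,
# the Agent/AzureOpenAI signatures, and the model id below are assumptions based
# on AGNO's documented patterns, not verified against a specific AGNO version;
# AGNO also accepts plain Python callables as tools, so passing the bound methods
# directly (without the dict wrappers above) may be all that is needed.
#
#   from agno.agent import Agent
#   from agno.models.azure import AzureOpenAI
#
#   excel_tool = ExcelLargeDataTool()
#   agent = Agent(
#       model=AzureOpenAI(id="gpt-4o"),  # hypothetical deployment/model id
#       tools=[
#           excel_tool.get_excel_schema,
#           excel_tool.get_data_sample,
#           excel_tool.get_data_chunk,
#           excel_tool.query_data,
#           excel_tool.get_pivot_summary,
#           excel_tool.get_column_summary,
#       ],
#   )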


if __name__ == '__main__':
    # Example usage
    tool = ExcelLargeDataTool(max_rows_per_chunk=100, max_sample_rows=50)

    # Step 1: Always get schema first
    schema = tool.get_excel_schema('large_file.xlsx')
    print("Schema:", json.dumps(schema, indent=2, default=str))

    # Step 2: Get a sample for initial understanding
    # (default=str keeps json.dumps from failing on dates and other non-JSON types)
    sample = tool.get_data_sample('large_file.xlsx', 'Sheet1')
    print("\nSample:", json.dumps(sample, indent=2, default=str))

    # Step 3: Query specific data as needed
    filtered = tool.query_data(
        'large_file.xlsx',
        'Sheet1',
        filters={'Status': 'Active'},
        max_results=100
    )
    print("\nFiltered:", json.dumps(filtered, indent=2, default=str))