explain-excel.py
"""
Excel Large Data Handler Tool for AGNO with Azure OpenAI
Handles Excel files up to 20MB by chunking and intelligent data sampling
"""
from typing import Any, Dict, List, Optional, Union
import pandas as pd
import openpyxl
from pathlib import Path
import json
class ExcelLargeDataTool:
"""
Tool for reading and documenting large Excel spreadsheets with context window management.
Implements chunking, summarization, and query-based retrieval strategies.
"""
def __init__(self, max_rows_per_chunk: int = 100, max_sample_rows: int = 50):
"""
Initialize the Excel tool.
Args:
max_rows_per_chunk: Maximum rows to process in each chunk
max_sample_rows: Number of sample rows to show for initial analysis
"""
self.max_rows_per_chunk = max_rows_per_chunk
self.max_sample_rows = max_sample_rows
    def get_excel_schema(self, file_path: str, sheet_name: Optional[str] = None) -> Dict[str, Any]:
        """
        Get the schema and metadata of the Excel file without loading all data.
        This is the first step - always understand structure before loading data.

        Args:
            file_path: Path to the Excel file
            sheet_name: Specific sheet name (if None, analyzes all sheets)

        Returns:
            Dictionary containing schema information
        """
        try:
            # Get basic file info
            file_size = Path(file_path).stat().st_size / (1024 * 1024)  # MB

            # Load workbook metadata only
            wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True)
            sheets_info = {}

            for sheet in wb.sheetnames:
                if sheet_name and sheet != sheet_name:
                    continue
                ws = wb[sheet]

                # Get dimensions without loading all data
                max_row = ws.max_row
                max_col = ws.max_column

                # Read only the header row (read-only worksheets do not support
                # ws[1]-style indexing, so use iter_rows instead)
                header_row = list(next(ws.iter_rows(min_row=1, max_row=1, values_only=True), []))

                # Sample a few rows to determine data types
                sample_rows = min(10, max_row)
                sample_data = [
                    list(row)
                    for row in ws.iter_rows(min_row=2, max_row=sample_rows, values_only=True)
                ]

                # Infer column types from the sample
                df_sample = pd.DataFrame(sample_data, columns=header_row)
                column_types = {col: str(dtype) for col, dtype in df_sample.dtypes.items()}

                sheets_info[sheet] = {
                    'total_rows': max_row - 1,  # Exclude header
                    'total_columns': max_col,
                    'columns': header_row,
                    'column_types': column_types,
                    'estimated_chunks': (max_row - 1) // self.max_rows_per_chunk + 1
                }

            wb.close()
            return {
                'file_path': file_path,
                'file_size_mb': round(file_size, 2),
                'sheets': sheets_info,
                'chunking_strategy': f'Data will be processed in chunks of {self.max_rows_per_chunk} rows'
            }
        except Exception as e:
            return {'error': f'Failed to read Excel schema: {str(e)}'}
    def get_data_sample(self, file_path: str, sheet_name: str,
                        sample_size: Optional[int] = None) -> Dict[str, Any]:
        """
        Get a representative sample of the data for initial analysis.

        Args:
            file_path: Path to the Excel file
            sheet_name: Sheet name to sample from
            sample_size: Number of rows to sample (default: max_sample_rows)

        Returns:
            Dictionary containing sample data and statistics
        """
        try:
            sample_size = sample_size or self.max_sample_rows

            # Read only the sample
            df = pd.read_excel(file_path, sheet_name=sheet_name, nrows=sample_size)

            # Get basic statistics
            stats = {
                'numeric_columns': {},
                'categorical_columns': {}
            }
            for col in df.columns:
                if pd.api.types.is_numeric_dtype(df[col]):
                    stats['numeric_columns'][col] = {
                        'min': float(df[col].min()) if not pd.isna(df[col].min()) else None,
                        'max': float(df[col].max()) if not pd.isna(df[col].max()) else None,
                        'mean': float(df[col].mean()) if not pd.isna(df[col].mean()) else None,
                        'null_count': int(df[col].isna().sum())
                    }
                else:
                    unique_vals = df[col].dropna().unique()
                    stats['categorical_columns'][col] = {
                        'unique_count': len(unique_vals),
                        'top_values': list(unique_vals[:10]),
                        'null_count': int(df[col].isna().sum())
                    }

            return {
                'sheet_name': sheet_name,
                'sample_size': len(df),
                'sample_data': df.head(20).to_dict('records'),  # First 20 rows as preview
                'statistics': stats,
                'message': f'Showing sample of {len(df)} rows. Use get_data_chunk() to retrieve specific row ranges.'
            }
        except Exception as e:
            return {'error': f'Failed to get data sample: {str(e)}'}
    def get_data_chunk(self, file_path: str, sheet_name: str,
                       start_row: int, end_row: Optional[int] = None,
                       columns: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Retrieve a specific chunk of data by row range.

        Args:
            file_path: Path to the Excel file
            sheet_name: Sheet name to read from
            start_row: Starting row number (0-indexed, excludes header)
            end_row: Ending row number (if None, reads max_rows_per_chunk)
            columns: Specific columns to retrieve (if None, retrieves all)

        Returns:
            Dictionary containing the requested data chunk
        """
        try:
            if end_row is None:
                end_row = start_row + self.max_rows_per_chunk
            nrows = end_row - start_row

            # Skip the first `start_row` data rows while keeping the header (file row 0)
            df = pd.read_excel(
                file_path,
                sheet_name=sheet_name,
                skiprows=range(1, start_row + 1) if start_row > 0 else None,
                nrows=nrows,
                usecols=columns
            )

            return {
                'sheet_name': sheet_name,
                'start_row': start_row,
                'end_row': min(end_row, start_row + len(df)),
                'rows_returned': len(df),
                'columns': list(df.columns),
                'data': df.to_dict('records')
            }
        except Exception as e:
            return {'error': f'Failed to get data chunk: {str(e)}'}
    def query_data(self, file_path: str, sheet_name: str,
                   filters: Optional[Dict[str, Any]] = None,
                   columns: Optional[List[str]] = None,
                   max_results: int = 200) -> Dict[str, Any]:
        """
        Query data with filters to retrieve only relevant rows.
        Reads data in chunks and filters on-the-fly to manage memory.

        Args:
            file_path: Path to the Excel file
            sheet_name: Sheet name to query
            filters: Dictionary of column:value pairs for filtering
                Example: {'Status': 'Active', 'Amount': lambda x: x > 1000}
            columns: Specific columns to retrieve
            max_results: Maximum number of results to return

        Returns:
            Dictionary containing filtered data
        """
        try:
            results = []
            chunks_processed = 0
            offset = 0

            # pd.read_excel has no chunksize argument, so emulate chunked reading
            # with skiprows/nrows until an empty chunk comes back
            while True:
                chunk = pd.read_excel(
                    file_path,
                    sheet_name=sheet_name,
                    skiprows=range(1, offset + 1) if offset else None,
                    nrows=self.max_rows_per_chunk,
                    usecols=columns
                )
                if chunk.empty:
                    break
                chunks_processed += 1
                offset += self.max_rows_per_chunk

                # Apply filters
                filtered_chunk = chunk
                if filters:
                    for col, condition in filters.items():
                        if col in filtered_chunk.columns:
                            if callable(condition):
                                filtered_chunk = filtered_chunk[filtered_chunk[col].apply(condition)]
                            else:
                                filtered_chunk = filtered_chunk[filtered_chunk[col] == condition]

                results.extend(filtered_chunk.to_dict('records'))

                # Stop if we have enough results
                if len(results) >= max_results:
                    results = results[:max_results]
                    break

            return {
                'sheet_name': sheet_name,
                'filters_applied': str(filters) if filters else 'None',
                'chunks_processed': chunks_processed,
                'results_found': len(results),
                'data': results
            }
        except Exception as e:
            return {'error': f'Failed to query data: {str(e)}'}
    def get_pivot_summary(self, file_path: str, sheet_name: str,
                          index_col: str, value_col: str,
                          agg_func: str = 'sum') -> Dict[str, Any]:
        """
        Create a pivot table summary by processing data in chunks.

        Args:
            file_path: Path to the Excel file
            sheet_name: Sheet name
            index_col: Column to use as index/grouping
            value_col: Column to aggregate
            agg_func: Aggregation function ('sum', 'mean', 'count', 'min', 'max')

        Returns:
            Dictionary containing pivot summary
        """
        try:
            # Accumulate per-group partial statistics so the final result is exact
            # even when a group is split across chunks (a mean of per-chunk means
            # or a count of chunks would be wrong)
            partials: Dict[Any, Dict[str, Any]] = {}
            offset = 0

            # pd.read_excel has no chunksize argument, so emulate chunked reading
            while True:
                chunk = pd.read_excel(
                    file_path,
                    sheet_name=sheet_name,
                    skiprows=range(1, offset + 1) if offset else None,
                    nrows=self.max_rows_per_chunk,
                    usecols=[index_col, value_col]
                )
                if chunk.empty:
                    break
                offset += self.max_rows_per_chunk

                grouped = chunk.groupby(index_col)[value_col].agg(['sum', 'count', 'min', 'max'])
                for idx, row in grouped.iterrows():
                    p = partials.setdefault(idx, {'sum': 0, 'count': 0, 'min': None, 'max': None})
                    p['sum'] += row['sum']
                    p['count'] += int(row['count'])
                    p['min'] = row['min'] if p['min'] is None else min(p['min'], row['min'])
                    p['max'] = row['max'] if p['max'] is None else max(p['max'], row['max'])

            # Final aggregation
            final_result = {}
            for idx, p in partials.items():
                if agg_func == 'sum':
                    final_result[idx] = p['sum']
                elif agg_func == 'mean':
                    final_result[idx] = p['sum'] / p['count'] if p['count'] else None
                elif agg_func == 'count':
                    final_result[idx] = p['count']
                elif agg_func == 'min':
                    final_result[idx] = p['min']
                elif agg_func == 'max':
                    final_result[idx] = p['max']

            return {
                'sheet_name': sheet_name,
                'pivot_by': index_col,
                'aggregated_column': value_col,
                'aggregation_function': agg_func,
                'summary': final_result,
                'total_groups': len(final_result)
            }
        except Exception as e:
            return {'error': f'Failed to create pivot summary: {str(e)}'}
    def get_column_summary(self, file_path: str, sheet_name: str,
                           column: str) -> Dict[str, Any]:
        """
        Get a comprehensive summary of a specific column across all data.
        Processes in chunks to handle large files.

        Args:
            file_path: Path to the Excel file
            sheet_name: Sheet name
            column: Column name to summarize

        Returns:
            Dictionary containing column summary
        """
        try:
            values = []
            total_rows = 0
            null_count = 0
            offset = 0

            # Collect values chunk by chunk (pd.read_excel has no chunksize argument)
            while True:
                chunk = pd.read_excel(
                    file_path,
                    sheet_name=sheet_name,
                    skiprows=range(1, offset + 1) if offset else None,
                    nrows=self.max_rows_per_chunk,
                    usecols=[column]
                )
                if chunk.empty:
                    break
                offset += self.max_rows_per_chunk

                total_rows += len(chunk)
                null_count += int(chunk[column].isna().sum())
                values.extend(chunk[column].dropna().tolist())

            # Generate summary based on data type
            summary = {
                'column': column,
                'total_rows': total_rows,
                'null_count': null_count,
                'non_null_count': len(values)
            }
            if values and pd.api.types.is_numeric_dtype(pd.Series(values)):
                summary.update({
                    'type': 'numeric',
                    'min': min(values),
                    'max': max(values),
                    'mean': sum(values) / len(values),
                    'median': sorted(values)[len(values) // 2]
                })
            else:
                unique_values = list(set(values))
                summary.update({
                    'type': 'categorical',
                    'unique_values': len(unique_values),
                    'top_10_values': unique_values[:10]
                })
            return summary
        except Exception as e:
            return {'error': f'Failed to summarize column: {str(e)}'}

# Example usage for AGNO integration
def create_agno_excel_tools():
    """
    Create AGNO-compatible tool definitions for the Excel handler.
    """
    tool = ExcelLargeDataTool()
    tools = [
        {
            'name': 'get_excel_schema',
            'description': 'Get the structure and metadata of an Excel file (sheets, columns, row counts) without loading all data. Always call this first.',
            'function': tool.get_excel_schema
        },
        {
            'name': 'get_data_sample',
            'description': 'Get a representative sample of rows from a sheet for initial analysis.',
            'function': tool.get_data_sample
        },
        {
            'name': 'get_data_chunk',
            'description': 'Retrieve a specific range of rows from a sheet.',
            'function': tool.get_data_chunk
        },
        {
            'name': 'query_data',
            'description': 'Query data with filters to retrieve only relevant rows across the entire dataset.',
            'function': tool.query_data
        },
        {
            'name': 'get_pivot_summary',
            'description': 'Create a pivot table summary by processing large data in chunks.',
            'function': tool.get_pivot_summary
        },
        {
            'name': 'get_column_summary',
            'description': 'Get comprehensive statistics for a specific column across all data.',
            'function': tool.get_column_summary
        }
    ]
    return tools
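

# Minimal dispatch sketch (not part of AGNO's API): one way an agent runtime could
# route a model's tool call by name against the definitions returned above.
# `call_excel_tool` and its keyword arguments are hypothetical illustrations.
def call_excel_tool(tools: List[Dict[str, Any]], name: str, **kwargs) -> Dict[str, Any]:
    """Look up a tool definition by name and invoke it with the supplied arguments."""
    for t in tools:
        if t['name'] == name:
            return t['function'](**kwargs)
    return {'error': f'Unknown tool: {name}'}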


if __name__ == '__main__':
    # Example usage
    tool = ExcelLargeDataTool(max_rows_per_chunk=100, max_sample_rows=50)

    # Step 1: Always get schema first
    schema = tool.get_excel_schema('large_file.xlsx')
    print("Schema:", json.dumps(schema, indent=2))

    # Step 2: Get a sample for initial understanding
    # (default=str keeps timestamps and numpy values JSON-serializable)
    sample = tool.get_data_sample('large_file.xlsx', 'Sheet1')
    print("\nSample:", json.dumps(sample, indent=2, default=str))

    # Step 3: Query specific data as needed
    filtered = tool.query_data(
        'large_file.xlsx',
        'Sheet1',
        filters={'Status': 'Active'},
        max_results=100
    )
    print("\nFiltered:", json.dumps(filtered, indent=2, default=str))