@SamarthGarge
Created March 23, 2025 10:21
Clean_data
# data_cleaning_functions.py
import re

import numpy as np
import pandas as pd


def handle_null_values(df, method='nan', columns=None):
    """
    Check for null values and replace them with a specified value.

    Parameters:
    -----------
    df : pandas.DataFrame
        The dataframe to check for null values
    method : str
        Method to handle null values: 'nan' or 'zero'
    columns : list, optional
        List of column names to check. If None, check all columns

    Returns:
    --------
    pandas.DataFrame
        The dataframe with null values handled
    dict
        A report of null values before and after handling
    """
    if columns is None:
        columns = df.columns

    # Create a copy to avoid modifying the original
    df_copy = df.copy()

    # Create report before handling (cast counts to int so they are JSON-serializable)
    null_report = {
        'before': {col: int(df_copy[col].isna().sum()) for col in columns},
        'replaced_values': {}
    }

    # Handle null values
    for col in columns:
        null_count = int(df_copy[col].isna().sum())
        if null_count > 0:
            if method == 'zero':
                df_copy[col] = df_copy[col].fillna(0)
                null_report['replaced_values'][col] = {'method': 'zero', 'count': null_count}
            else:  # Default to NaN (fillna(np.nan) is a no-op; missing values stay missing)
                df_copy[col] = df_copy[col].fillna(np.nan)
                null_report['replaced_values'][col] = {'method': 'nan', 'count': null_count}

    # Create report after handling
    null_report['after'] = {col: int(df_copy[col].isna().sum()) for col in columns}
    return df_copy, null_report
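
# Usage sketch for handle_null_values (hypothetical values, kept in comments
# so the module still imports cleanly):
#
#   df = pd.DataFrame({'age': [28, None, 35]})
#   cleaned, rep = handle_null_values(df, method='zero')
#   # rep['before'] -> {'age': 1}, rep['after'] -> {'age': 0}
#   # cleaned['age'] -> [28.0, 0.0, 35.0]
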
def remove_duplicates(df, subset=None, keep='first', inplace=False):
    """
    Check for and remove duplicate rows.

    Parameters:
    -----------
    df : pandas.DataFrame
        The dataframe to check for duplicates
    subset : list, optional
        List of column names to consider for duplicates. If None, use all columns
    keep : {'first', 'last', False}, default 'first'
        Determines which duplicates to keep
    inplace : bool, default False
        Whether to drop duplicates in place or return a copy

    Returns:
    --------
    pandas.DataFrame
        The dataframe with duplicates removed
    dict
        A report about duplicates
    """
    # Create a copy if not inplace
    df_copy = df if inplace else df.copy()

    # Find duplicates (keep=False marks every member of a duplicate group)
    if subset is None:
        duplicated_rows = df_copy.duplicated(keep=False)
    else:
        duplicated_rows = df_copy.duplicated(subset=subset, keep=False)

    dup_count = int(duplicated_rows.sum())

    # Create detailed report (guard the percentage against an empty dataframe)
    dup_report = {
        'total_rows': len(df_copy),
        'duplicate_rows': dup_count,
        'unique_rows': len(df_copy) - dup_count,
        'duplicate_percentage': round((dup_count / len(df_copy)) * 100, 2) if len(df_copy) else 0.0
    }

    if dup_count > 0:
        # Extract a sample of duplicates for detailed review
        sample_size = min(5, dup_count)
        dup_examples = df_copy[duplicated_rows].head(sample_size)
        if subset is None:
            grouped = df_copy[duplicated_rows].groupby(list(df_copy.columns))
        else:
            grouped = df_copy[duplicated_rows].groupby(subset)

        # Count of duplicates by group
        dup_counts = grouped.size().reset_index(name='count')
        dup_counts = dup_counts.sort_values('count', ascending=False)
        dup_report['duplicate_groups'] = len(dup_counts)
        dup_report['top_duplicates'] = dup_counts.head(5).to_dict('records')
        dup_report['dup_examples'] = dup_examples.to_dict('records')

        # Remove duplicates
        before_count = len(df_copy)
        df_copy.drop_duplicates(subset=subset, keep=keep, inplace=True)
        dup_report['rows_removed'] = before_count - len(df_copy)

    return df_copy, dup_report
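
# Usage sketch for remove_duplicates (hypothetical values):
#
#   df = pd.DataFrame({'id': [1, 2, 2, 3], 'val': ['a', 'b', 'b', 'c']})
#   deduped, rep = remove_duplicates(df)
#   # rep['duplicate_rows'] -> 2 (both members of the duplicate pair),
#   # rep['duplicate_percentage'] -> 50.0, rep['rows_removed'] -> 1
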
def fix_numeric_datatypes(df, columns=None):
    """
    Fix data types for numeric columns, handling various formats and cleaning.

    Parameters:
    -----------
    df : pandas.DataFrame
        The dataframe whose numeric data types should be fixed
    columns : list, optional
        List of column names to fix. If None, attempt to fix all columns

    Returns:
    --------
    pandas.DataFrame
        The dataframe with fixed numeric data types
    dict
        A report about the conversions
    """
    # Create a copy to avoid modifying the original
    df_copy = df.copy()

    if columns is None:
        # Try to identify numeric columns automatically. Simple heuristic:
        # keep columns where >50% of a sample of non-null values convert to numeric.
        columns = []
        for col in df_copy.columns:
            sample = df_copy[col].dropna().head(100)
            if len(sample) == 0:
                continue
            success_count = pd.to_numeric(sample, errors='coerce').notna().sum()
            if success_count / len(sample) > 0.5:
                columns.append(col)

    conversion_report = {
        'successful_conversions': {},
        'failed_conversions': {},
        'original_dtypes': {col: str(df_copy[col].dtype) for col in columns},
        'final_dtypes': {}
    }

    # Basic word-to-digit mapping for text-based numbers like "Twenty-Eight".
    # A production version would need a more comprehensive parser.
    number_word_map = {
        'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
        'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
        'ten': '10', 'twenty': '20', 'thirty': '30', 'forty': '40', 'fifty': '50',
        'sixty': '60', 'seventy': '70', 'eighty': '80', 'ninety': '90',
        'hundred': '100', 'thousand': '1000', 'million': '1000000'
    }
    tens_words = ['twenty', 'thirty', 'forty', 'fifty', 'sixty', 'seventy', 'eighty', 'ninety']

    def extract_numeric(value):
        """Pull the first signed integer or decimal out of a string, if any."""
        if pd.isna(value):
            return value
        match = re.search(r'-?\d+(\.\d+)?', str(value))
        if match:
            return match.group(0)
        return value

    for col in columns:
        # Count non-null values before cleaning, for the report
        non_null_count = int(df_copy[col].notna().sum())

        if df_copy[col].dtype == 'object':
            # Convert to string first to handle mixed types
            df_copy[col] = df_copy[col].astype(str)

            # Step 1a: handle hyphenated compounds like "twenty-eight" BEFORE
            # standalone words, otherwise "twenty" -> "20" runs first and the
            # compound pattern can never match
            for word in tens_words:
                for word2, number2 in number_word_map.items():
                    compound_pattern = r'\b' + word + r'-' + word2 + r'\b'
                    replacement = str(int(number_word_map[word]) + int(number2))
                    df_copy[col] = df_copy[col].str.replace(
                        compound_pattern, replacement, case=False, regex=True)

            # Step 1b: replace standalone number words with digits
            for word, number in number_word_map.items():
                pattern = r'\b' + word + r'\b'
                df_copy[col] = df_copy[col].str.replace(pattern, number, case=False, regex=True)

            # Step 2: clean common patterns, e.g. '28y/o' -> '28', and strip a
            # trailing period ('40.' -> '40'; anchored with $ so decimals like
            # '40.5' survive)
            df_copy[col] = df_copy[col].str.replace(r'(\d+)y\/o', r'\1', regex=True)
            df_copy[col] = df_copy[col].str.replace(r'(\d+)\.$', r'\1', regex=True)

            # Extract the numeric part of each remaining string
            df_copy[col] = df_copy[col].apply(extract_numeric)

        # Step 3: convert to numeric; anything unparseable becomes NaN
        df_copy[col] = pd.to_numeric(df_copy[col], errors='coerce')

        # Report results
        successful_conversions = int(df_copy[col].notna().sum())
        conversion_report['successful_conversions'][col] = successful_conversions
        conversion_report['failed_conversions'][col] = non_null_count - successful_conversions
        conversion_report['final_dtypes'][col] = str(df_copy[col].dtype)

    return df_copy, conversion_report
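
# Usage sketch for fix_numeric_datatypes (hypothetical values; mirrors the
# messy 'age' column in the test script below). The column is named explicitly
# because fewer than half of these raw values parse as numeric, so the
# auto-detection heuristic would skip it:
#
#   df = pd.DataFrame({'age': ['28', '34y/o', 'thirty-five', '40.', 'male']})
#   fixed, rep = fix_numeric_datatypes(df, columns=['age'])
#   # fixed['age'] -> [28.0, 34.0, 35.0, 40.0, NaN]
#   # rep['failed_conversions'] -> {'age': 1}   ('male' cannot be parsed)
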
def clean_dataframe(df, null_method='nan', fix_numeric=True, remove_dups=True, dup_subset=None):
    """
    Apply all cleaning functions in sequence.

    Parameters:
    -----------
    df : pandas.DataFrame
        The dataframe to clean
    null_method : str
        Method to handle null values: 'nan' or 'zero'
    fix_numeric : bool
        Whether to fix numeric data types
    remove_dups : bool
        Whether to remove duplicates
    dup_subset : list, optional
        Columns to consider for duplicates

    Returns:
    --------
    pandas.DataFrame
        The cleaned dataframe
    dict
        A report of all cleaning operations
    """
    report = {'original_shape': df.shape}

    # Handle null values
    df, null_report = handle_null_values(df, method=null_method)
    report['null_handling'] = null_report

    # Fix numeric data types
    if fix_numeric:
        df, dtype_report = fix_numeric_datatypes(df)
        report['datatype_handling'] = dtype_report

    # Remove duplicates
    if remove_dups:
        df, dup_report = remove_duplicates(df, subset=dup_subset)
        report['duplicate_handling'] = dup_report

    report['final_shape'] = df.shape
    report['shape_change'] = {
        'rows': report['final_shape'][0] - report['original_shape'][0],
        'columns': report['final_shape'][1] - report['original_shape'][1]
    }
    return df, report
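
# Pipeline sketch (hypothetical values): the steps run in the order
# nulls -> dtypes -> duplicates, and the report records the shape delta:
#
#   df = pd.DataFrame({'x': ['1', '2', '2', None]})
#   cleaned, report = clean_dataframe(df, null_method='zero')
#   # report['original_shape'] -> (4, 1), report['final_shape'] -> (3, 1)
#   # report['shape_change']   -> {'rows': -1, 'columns': 0}
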
# Flask API wrapping the cleaning functions (file name not given in the gist)
import os
import platform
import tempfile
import traceback

import pandas as pd
from flask import Flask, request, jsonify, send_file
from werkzeug.utils import secure_filename

# Import our data cleaning functions
from data_cleaning_functions import (
    handle_null_values,
    remove_duplicates,
    fix_numeric_datatypes,
    clean_dataframe
)

app = Flask(__name__)

# Configure upload settings
UPLOAD_FOLDER = tempfile.mkdtemp()
ALLOWED_EXTENSIONS = {'csv', 'xlsx', 'xls'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16 MB max upload size


def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def resolve_path(path):
    """Resolve special path variables like %USERPROFILE% or %HOME%."""
    if platform.system() == 'Windows':
        if path.startswith('%USERPROFILE%'):
            return path.replace('%USERPROFILE%', os.environ.get('USERPROFILE', ''))
    else:  # Assume Linux/Mac
        if path.startswith('%HOME%'):
            return path.replace('%HOME%', os.environ.get('HOME', ''))
    return path


def read_dataframe(file_path):
    """Read a dataframe from a file path, detecting the file type."""
    ext = file_path.split('.')[-1].lower()
    if ext == 'csv':
        return pd.read_csv(file_path)
    elif ext in ['xlsx', 'xls']:
        return pd.read_excel(file_path)
    else:
        raise ValueError(f"Unsupported file format: {ext}")


@app.route('/api/clean', methods=['POST'])
def clean_data():
    """
    API endpoint for cleaning data.

    This endpoint accepts either:
    1. A file upload (multipart/form-data) with a 'file' field
    2. A JSON request with a 'file_path' field

    Optional parameters:
    - null_method: 'nan' or 'zero' (default: 'nan')
    - fix_numeric: 'true' or 'false' (default: 'true')
    - remove_duplicates: 'true' or 'false' (default: 'true')
    - output_format: 'csv' or 'json' (default: 'csv')
    - download: 'true' or 'false' (default: 'false')
    """
    try:
        # Get cleaning parameters
        null_method = request.form.get('null_method', 'nan')
        fix_numeric = request.form.get('fix_numeric', 'true').lower() == 'true'
        remove_dups = request.form.get('remove_duplicates', 'true').lower() == 'true'
        output_format = request.form.get('output_format', 'csv').lower()
        download = request.form.get('download', 'false').lower() == 'true'

        # Get dataframe either from file upload or path.
        # get_json(silent=True) returns None instead of raising when the
        # body is not JSON (e.g. a plain multipart upload).
        df = None
        file_source = None
        json_body = request.get_json(silent=True)

        if 'file' in request.files:
            # Handle file upload
            file = request.files['file']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                file.save(file_path)
                df = read_dataframe(file_path)
                file_source = f"Uploaded file: {filename}"
            else:
                return jsonify({
                    'status': 'error',
                    'message': f"Invalid file. Allowed formats: {', '.join(ALLOWED_EXTENSIONS)}"
                }), 400
        elif json_body and 'file_path' in json_body:
            # Handle file path
            raw_path = json_body['file_path']
            file_path = resolve_path(raw_path)
            if os.path.exists(file_path):
                df = read_dataframe(file_path)
                file_source = f"File path: {raw_path}"
            else:
                return jsonify({
                    'status': 'error',
                    'message': f"File not found: {raw_path}"
                }), 404
        else:
            return jsonify({
                'status': 'error',
                'message': "No file provided. Please upload a file or provide a file path."
            }), 400

        # Clean the dataframe
        cleaned_df, report = clean_dataframe(
            df,
            null_method=null_method,
            fix_numeric=fix_numeric,
            remove_dups=remove_dups
        )

        # Add file source to report
        report['file_source'] = file_source

        # Return results based on requested format
        if download:
            # Create temp file for download, with a suffix matching the format
            fd, temp_path = tempfile.mkstemp(
                suffix='.json' if output_format == 'json' else '.csv'
            )
            try:
                # Write through the open descriptor rather than reopening the path
                with os.fdopen(fd, 'w') as f:
                    if output_format == 'json':
                        f.write(cleaned_df.to_json(orient='records'))
                    else:  # csv or other formats
                        cleaned_df.to_csv(f, index=False)
                # Send the file
                return send_file(
                    temp_path,
                    as_attachment=True,
                    download_name=f"cleaned_data.{output_format}",
                    mimetype='application/json' if output_format == 'json' else 'text/csv'
                )
            finally:
                # Clean up the temp file. On POSIX the already-opened response
                # stream survives the unlink; on Windows this delete would fail
                # while the file is still being sent.
                if os.path.exists(temp_path):
                    os.unlink(temp_path)
        else:
            # Return JSON response with report and sample data
            sample_size = min(5, len(cleaned_df))
            return jsonify({
                'status': 'success',
                'report': report,
                'sample_data': cleaned_df.head(sample_size).to_dict('records'),
                'shape': cleaned_df.shape,
                'dtypes': {col: str(dtype) for col, dtype in cleaned_df.dtypes.items()}
            })
    except Exception as e:
        app.logger.error(f"Error: {str(e)}")
        app.logger.error(traceback.format_exc())
        return jsonify({
            'status': 'error',
            'message': str(e),
            'traceback': traceback.format_exc() if app.debug else None
        }), 500
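
# Client sketch for /api/clean (assumes the server is running locally on
# Flask's default port 5000 and uses the third-party 'requests' package;
# 'gapminder_unclean.csv' is the file written by the test script below):
#
#   import requests
#   with open('gapminder_unclean.csv', 'rb') as fh:
#       resp = requests.post(
#           'http://127.0.0.1:5000/api/clean',
#           files={'file': fh},
#           data={'null_method': 'zero', 'output_format': 'csv'},
#       )
#   print(resp.json()['report'])
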
@app.route('/api/nulls', methods=['POST'])
def handle_nulls():
    """API endpoint for handling null values only."""
    try:
        # Get parameters
        null_method = request.form.get('method', 'nan')
        columns = request.form.getlist('columns') or None

        # Get dataframe
        df = None
        json_body = request.get_json(silent=True)
        if 'file' in request.files:
            file = request.files['file']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                file.save(file_path)
                df = read_dataframe(file_path)
            else:
                return jsonify({'status': 'error', 'message': 'Invalid file'}), 400
        elif json_body and 'file_path' in json_body:
            file_path = resolve_path(json_body['file_path'])
            if os.path.exists(file_path):
                df = read_dataframe(file_path)
            else:
                return jsonify({'status': 'error', 'message': 'File not found'}), 404
        else:
            return jsonify({'status': 'error', 'message': 'No file provided'}), 400

        # Handle nulls
        cleaned_df, report = handle_null_values(df, method=null_method, columns=columns)

        # Return response
        return jsonify({
            'status': 'success',
            'report': report,
            'sample_data': cleaned_df.head(5).to_dict('records')
        })
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500


@app.route('/api/duplicates', methods=['POST'])
def handle_duplicates():
    """API endpoint for handling duplicates only."""
    try:
        # Get parameters
        subset = request.form.getlist('columns') or None
        keep = request.form.get('keep', 'first')

        # Get dataframe
        df = None
        json_body = request.get_json(silent=True)
        if 'file' in request.files:
            file = request.files['file']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                file.save(file_path)
                df = read_dataframe(file_path)
            else:
                return jsonify({'status': 'error', 'message': 'Invalid file'}), 400
        elif json_body and 'file_path' in json_body:
            file_path = resolve_path(json_body['file_path'])
            if os.path.exists(file_path):
                df = read_dataframe(file_path)
            else:
                return jsonify({'status': 'error', 'message': 'File not found'}), 404
        else:
            return jsonify({'status': 'error', 'message': 'No file provided'}), 400

        # Handle duplicates
        cleaned_df, report = remove_duplicates(df, subset=subset, keep=keep)

        # Return response
        return jsonify({
            'status': 'success',
            'report': report,
            'sample_data': cleaned_df.head(5).to_dict('records')
        })
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500


@app.route('/api/fix-datatypes', methods=['POST'])
def fix_datatypes():
    """API endpoint for fixing numeric datatypes only."""
    try:
        # Get parameters
        columns = request.form.getlist('columns') or None

        # Get dataframe
        df = None
        json_body = request.get_json(silent=True)
        if 'file' in request.files:
            file = request.files['file']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                file.save(file_path)
                df = read_dataframe(file_path)
            else:
                return jsonify({'status': 'error', 'message': 'Invalid file'}), 400
        elif json_body and 'file_path' in json_body:
            file_path = resolve_path(json_body['file_path'])
            if os.path.exists(file_path):
                df = read_dataframe(file_path)
            else:
                return jsonify({'status': 'error', 'message': 'File not found'}), 404
        else:
            return jsonify({'status': 'error', 'message': 'No file provided'}), 400

        # Fix datatypes
        cleaned_df, report = fix_numeric_datatypes(df, columns=columns)

        # Return response
        return jsonify({
            'status': 'success',
            'report': report,
            'sample_data': cleaned_df.head(5).to_dict('records')
        })
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
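
# Client sketch for the single-purpose endpoints (assumes a local server on
# port 5000; passing a list for 'columns' makes requests send the field
# repeatedly, which request.form.getlist('columns') picks up server-side):
#
#   import requests
#   with open('gapminder_unclean.csv', 'rb') as fh:
#       resp = requests.post(
#           'http://127.0.0.1:5000/api/nulls',
#           files={'file': fh},
#           data={'method': 'zero', 'columns': ['age', 'income']},
#       )
#   print(resp.json()['report'])
#   # /api/duplicates takes 'columns' and 'keep'; /api/fix-datatypes takes 'columns'
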
if __name__ == '__main__':
    app.run(debug=True)
# Test script (file name not given in the gist)
import os

import numpy as np
import pandas as pd

from data_cleaning_functions import clean_dataframe


def create_test_dataset():
    """Create a sample dataset with various issues to test the cleaning functions."""
    data = {
        'id': [1, 2, 3, 4, 5, 5, None, 7],
        'age': ['28', '34y/o', 'thirty-five', '40.', None, 'male', '22', '50'],
        'income': [50000, 60000, None, '75000', 'unknown', 55000, np.nan, '80000'],
        'category': ['A', 'B', 'A', 'C', 'B', 'B', 'D', 'A']
    }
    return pd.DataFrame(data)


def read_csv_input(file_path):
    """Read a CSV file and return a DataFrame."""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"The file '{file_path}' does not exist.")
    try:
        df = pd.read_csv(file_path)
        print(f"Successfully loaded CSV file: {file_path}")
        return df
    except Exception as e:
        raise ValueError(f"Error reading the CSV file: {e}")


def main():
    # Ask the user whether to use the sample dataset or provide a CSV file
    choice = input("Do you want to use a sample dataset or provide a CSV file? (sample/csv): ").strip().lower()
    if choice == 'csv':
        file_path = input("Enter the path to the CSV file: ").strip()
        df = read_csv_input(file_path)
    else:
        # Create a test dataset
        df = create_test_dataset()

    print("Original dataset:")
    print(df)
    print("\nOriginal data types:")
    print(df.dtypes)
    print("\nOriginal null values:")
    print(df.isna().sum())

    # Clean the dataframe
    cleaned_df, report = clean_dataframe(
        df,
        null_method='zero',   # Replace nulls with 0
        fix_numeric=True,     # Fix numeric data types
        remove_dups=True      # Remove duplicates
    )

    print("\n\nCleaned dataset:")
    print(cleaned_df)
    print("\nCleaned data types:")
    print(cleaned_df.dtypes)
    print("\nCleaned null values:")
    print(cleaned_df.isna().sum())

    print("\n\nCleaning Report:")
    for section, details in report.items():
        print(f"\n{section.upper()}:")
        if isinstance(details, dict):
            for key, value in details.items():
                print(f"  {key}: {value}")
        else:
            print(f"  {details}")

    # Save original and cleaned data to CSV for testing the API later
    cleaned_df.to_csv('cleaned_data.csv', index=False)
    df.to_csv('gapminder_unclean.csv', index=False)
    print("\nSaved original and cleaned data to CSV files for API testing")


if __name__ == "__main__":
    main()