Clean_data
import pandas as pd
import numpy as np
import re
import os
from pathlib import Path


def handle_null_values(df, method='nan', columns=None):
    """
    Check for null values and replace them with a specified value.

    Parameters:
    -----------
    df : pandas.DataFrame
        The dataframe to check for null values
    method : str
        Method to handle null values: 'nan' or 'zero'
    columns : list, optional
        List of column names to check. If None, check all columns

    Returns:
    --------
    pandas.DataFrame
        The dataframe with null values handled
    dict
        A report of null values before and after handling
    """
    if columns is None:
        columns = df.columns

    # Create a copy to avoid modifying the original
    df_copy = df.copy()

    # Create the report before handling (cast counts to plain int so the
    # report stays JSON-serializable)
    null_report = {
        'before': {col: int(df_copy[col].isna().sum()) for col in columns},
        'replaced_values': {}
    }

    # Handle null values
    for col in columns:
        null_count = int(df_copy[col].isna().sum())
        if null_count > 0:
            if method == 'zero':
                df_copy[col] = df_copy[col].fillna(0)
                null_report['replaced_values'][col] = {'method': 'zero', 'count': null_count}
            else:  # Default: keep missing values as NaN
                df_copy[col] = df_copy[col].fillna(np.nan)
                null_report['replaced_values'][col] = {'method': 'nan', 'count': null_count}

    # Create the report after handling
    null_report['after'] = {col: int(df_copy[col].isna().sum()) for col in columns}

    return df_copy, null_report
def remove_duplicates(df, subset=None, keep='first', inplace=False):
    """
    Check for and remove duplicate rows.

    Parameters:
    -----------
    df : pandas.DataFrame
        The dataframe to check for duplicates
    subset : list, optional
        List of column names to consider for duplicates. If None, use all columns
    keep : {'first', 'last', False}, default 'first'
        Determines which duplicates to keep
    inplace : bool, default False
        Whether to drop duplicates in place or return a copy

    Returns:
    --------
    pandas.DataFrame
        The dataframe with duplicates removed
    dict
        A report about duplicates
    """
    # Create a copy if not inplace
    df_copy = df if inplace else df.copy()

    # Find duplicates
    if subset is None:
        duplicated_rows = df_copy.duplicated(keep=False)
    else:
        duplicated_rows = df_copy.duplicated(subset=subset, keep=False)

    duplicate_count = int(duplicated_rows.sum())

    # Create a detailed report (plain Python numbers keep it JSON-serializable)
    dup_report = {
        'total_rows': len(df_copy),
        'duplicate_rows': duplicate_count,
        'unique_rows': len(df_copy) - duplicate_count,
        'duplicate_percentage': round((duplicate_count / len(df_copy)) * 100, 2)
    }

    # Extract a sample of duplicates for detailed review
    if duplicate_count > 0:
        sample_size = min(5, duplicate_count)
        dup_examples = df_copy[duplicated_rows].head(sample_size)
        if subset is None:
            grouped = df_copy[duplicated_rows].groupby(list(df_copy.columns))
        else:
            grouped = df_copy[duplicated_rows].groupby(subset)

        # Count of duplicates by group
        dup_counts = grouped.size().reset_index(name='count')
        dup_counts = dup_counts.sort_values('count', ascending=False)
        dup_report['duplicate_groups'] = len(dup_counts)
        dup_report['top_duplicates'] = dup_counts.head(5).to_dict('records')
        dup_report['dup_examples'] = dup_examples.to_dict('records')

    # Remove duplicates
    if duplicate_count > 0:
        before_count = len(df_copy)
        df_copy.drop_duplicates(subset=subset, keep=keep, inplace=True)
        after_count = len(df_copy)
        dup_report['rows_removed'] = before_count - after_count

    return df_copy, dup_report
def fix_numeric_datatypes(df, columns=None):
    """
    Fix data types for numeric columns, handling various formats and cleaning.

    Parameters:
    -----------
    df : pandas.DataFrame
        The dataframe whose numeric data types should be fixed
    columns : list, optional
        List of column names to fix. If None, attempt to fix all columns

    Returns:
    --------
    pandas.DataFrame
        The dataframe with fixed numeric data types
    dict
        A report about the conversions
    """
    # Create a copy to avoid modifying the original
    df_copy = df.copy()

    if columns is None:
        # Try to identify numeric columns automatically.
        # Simple heuristic: keep columns where >50% of a sample of non-null
        # values can be converted to numeric.
        columns = []
        for col in df_copy.columns:
            sample = df_copy[col].dropna().head(100)
            if len(sample) == 0:
                continue
            # Try to convert the sample to numeric and see how many succeed
            success_count = pd.to_numeric(sample, errors='coerce').notna().sum()
            if success_count / len(sample) > 0.5:
                columns.append(col)

    conversion_report = {
        'successful_conversions': {},
        'failed_conversions': {},
        'original_dtypes': {col: str(df_copy[col].dtype) for col in columns},
        'final_dtypes': {}
    }

    for col in columns:
        # Save original data for reporting
        original_data = df_copy[col].copy()
        non_null_count = int(original_data.notna().sum())

        # Step 1: Handle text-based numbers like "Twenty-Eight".
        # This is a basic mapping - a production version would need a more
        # comprehensive number-word parser.
        number_word_map = {
            'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
            'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
            'ten': '10', 'twenty': '20', 'thirty': '30', 'forty': '40', 'fifty': '50',
            'sixty': '60', 'seventy': '70', 'eighty': '80', 'ninety': '90',
            'hundred': '100', 'thousand': '1000', 'million': '1000000'
        }

        if df_copy[col].dtype == 'object':
            # Convert to string first to handle various types
            df_copy[col] = df_copy[col].astype(str)

            # Handle hyphenated compound numbers like "twenty-eight" before the
            # single words, so "twenty-eight" becomes "28" rather than "20-8"
            tens_words = ['twenty', 'thirty', 'forty', 'fifty', 'sixty', 'seventy', 'eighty', 'ninety']
            for word in tens_words:
                for word2, number2 in number_word_map.items():
                    compound_pattern = r'\b' + word + r'-' + word2 + r'\b'
                    replacement = str(int(number_word_map[word]) + int(number2))
                    df_copy[col] = df_copy[col].str.replace(compound_pattern, replacement, case=False, regex=True)

            # Replace remaining single number words with digits
            for word, number in number_word_map.items():
                pattern = r'\b' + word + r'\b'
                df_copy[col] = df_copy[col].str.replace(pattern, number, case=False, regex=True)

            # Step 2: Clean out non-numeric characters but keep decimal points.
            # First collapse common patterns like '28y/o' -> '28'
            df_copy[col] = df_copy[col].str.replace(r'(\d+)y\/o', r'\1', regex=True)
            # Remove trailing periods ('40.' -> '40') without touching decimals like '40.5'
            df_copy[col] = df_copy[col].str.replace(r'(\d+)\.(?!\d)', r'\1', regex=True)

            # Extract the numeric part - prioritize digits with an optional decimal point
            def extract_numeric(value):
                if pd.isna(value):
                    return value
                # Try to match a floating point number pattern
                match = re.search(r'-?\d+(\.\d+)?', str(value))
                if match:
                    return match.group(0)
                return value

            df_copy[col] = df_copy[col].apply(extract_numeric)

        # Step 3: Convert to numeric
        df_copy[col] = pd.to_numeric(df_copy[col], errors='coerce')

        # Report results (cast to plain int so the report stays JSON-serializable)
        successful_conversions = int(df_copy[col].notna().sum())
        failed_conversions = non_null_count - successful_conversions
        conversion_report['successful_conversions'][col] = successful_conversions
        conversion_report['failed_conversions'][col] = failed_conversions
        conversion_report['final_dtypes'][col] = str(df_copy[col].dtype)

    return df_copy, conversion_report
def clean_dataframe(df, null_method='nan', fix_numeric=True, remove_dups=True, dup_subset=None):
    """
    Apply all cleaning functions in sequence.

    Parameters:
    -----------
    df : pandas.DataFrame
        The dataframe to clean
    null_method : str
        Method to handle null values: 'nan' or 'zero'
    fix_numeric : bool
        Whether to fix numeric data types
    remove_dups : bool
        Whether to remove duplicates
    dup_subset : list, optional
        Columns to consider for duplicates

    Returns:
    --------
    pandas.DataFrame
        The cleaned dataframe
    dict
        A report of all cleaning operations
    """
    report = {'original_shape': df.shape}

    # Handle null values
    df, null_report = handle_null_values(df, method=null_method)
    report['null_handling'] = null_report

    # Fix numeric data types
    if fix_numeric:
        df, dtype_report = fix_numeric_datatypes(df)
        report['datatype_handling'] = dtype_report

    # Remove duplicates
    if remove_dups:
        df, dup_report = remove_duplicates(df, subset=dup_subset)
        report['duplicate_handling'] = dup_report

    report['final_shape'] = df.shape
    report['shape_change'] = {
        'rows': report['final_shape'][0] - report['original_shape'][0],
        'columns': report['final_shape'][1] - report['original_shape'][1]
    }

    return df, report
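As a quick orientation before the API code, here is a minimal, illustrative sketch of driving these functions directly. The demo frame and its column names are invented for this example, and the import assumes the module above is saved as data_cleaning_functions.py, the name the Flask app below uses.

import pandas as pd
from data_cleaning_functions import clean_dataframe  # assumed module name (matches the Flask app's import)

# Tiny made-up frame with mixed text/numeric values and a missing entry
demo = pd.DataFrame({
    'age': ['28', '34y/o', 'thirty-five', None],
    'score': [10, 10, None, 7],
})

cleaned, report = clean_dataframe(demo, null_method='zero')
print(cleaned.dtypes)             # 'age' should be numeric after conversion
print(report['null_handling'])    # per-column null counts before/after
print(report['shape_change'])     # net change in rows/columns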
The Flask API below (a separate file in the same gist) exposes these cleaning functions as HTTP endpoints; it imports the module above as data_cleaning_functions.
from flask import Flask, request, jsonify, send_file
import pandas as pd
import os
import json
import tempfile
from pathlib import Path
import platform
import traceback
from werkzeug.utils import secure_filename

# Import our data cleaning functions
from data_cleaning_functions import (
    handle_null_values,
    remove_duplicates,
    fix_numeric_datatypes,
    clean_dataframe
)

app = Flask(__name__)

# Configure upload settings
UPLOAD_FOLDER = tempfile.mkdtemp()
ALLOWED_EXTENSIONS = {'csv', 'xlsx', 'xls'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16 MB max upload size


def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def resolve_path(path):
    """Resolve special path variables like %USERPROFILE% or %HOME%"""
    if platform.system() == 'Windows':
        if path.startswith('%USERPROFILE%'):
            return path.replace('%USERPROFILE%', os.environ.get('USERPROFILE', ''))
    else:  # Assume Linux/Mac
        if path.startswith('%HOME%'):
            return path.replace('%HOME%', os.environ.get('HOME', ''))
    return path


def read_dataframe(file_path):
    """Read a dataframe from a file path, detecting the file type"""
    ext = file_path.split('.')[-1].lower()
    if ext == 'csv':
        return pd.read_csv(file_path)
    elif ext in ['xlsx', 'xls']:
        return pd.read_excel(file_path)
    else:
        raise ValueError(f"Unsupported file format: {ext}")
@app.route('/api/clean', methods=['POST'])
def clean_data():
    """
    API endpoint for cleaning data.

    This endpoint accepts either:
    1. A file upload (multipart/form-data) with a 'file' field
    2. A JSON request with a 'file_path' field

    Optional parameters:
    - null_method: 'nan' or 'zero' (default: 'nan')
    - fix_numeric: 'true' or 'false' (default: 'true')
    - remove_duplicates: 'true' or 'false' (default: 'true')
    - output_format: 'csv' or 'json' (default: 'csv')
    - download: 'true' or 'false' (default: 'false')
    """
    try:
        # Get cleaning parameters
        null_method = request.form.get('null_method', 'nan')
        fix_numeric = request.form.get('fix_numeric', 'true').lower() == 'true'
        remove_dups = request.form.get('remove_duplicates', 'true').lower() == 'true'
        output_format = request.form.get('output_format', 'csv').lower()
        download = request.form.get('download', 'false').lower() == 'true'

        # Get the dataframe either from a file upload or a path
        df = None
        file_source = None

        if 'file' in request.files:
            # Handle file upload
            file = request.files['file']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                file.save(file_path)
                df = read_dataframe(file_path)
                file_source = f"Uploaded file: {filename}"
            else:
                return jsonify({
                    'status': 'error',
                    'message': f"Invalid file. Allowed formats: {', '.join(ALLOWED_EXTENSIONS)}"
                }), 400
        elif request.json and 'file_path' in request.json:
            # Handle file path
            raw_path = request.json['file_path']
            file_path = resolve_path(raw_path)
            if os.path.exists(file_path):
                df = read_dataframe(file_path)
                file_source = f"File path: {raw_path}"
            else:
                return jsonify({
                    'status': 'error',
                    'message': f"File not found: {raw_path}"
                }), 404
        else:
            return jsonify({
                'status': 'error',
                'message': "No file provided. Please upload a file or provide a file path."
            }), 400

        # Clean the dataframe
        cleaned_df, report = clean_dataframe(
            df,
            null_method=null_method,
            fix_numeric=fix_numeric,
            remove_dups=remove_dups
        )

        # Add file source to report
        report['file_source'] = file_source

        # Return results based on the requested format
        if download:
            # Create a temp file for download
            fd, temp_path = tempfile.mkstemp(
                suffix='.json' if output_format == 'json' else '.csv'
            )
            try:
                with os.fdopen(fd, 'w') as f:
                    if output_format == 'json':
                        f.write(cleaned_df.to_json(orient='records'))
                    else:  # csv or other formats
                        cleaned_df.to_csv(f, index=False)

                # Send the file
                return send_file(
                    temp_path,
                    as_attachment=True,
                    download_name=f"cleaned_data.{output_format}",
                    mimetype='text/csv' if output_format == 'csv' else 'application/json'
                )
            finally:
                # Clean up the temp file (send_file has already opened it by this point)
                if os.path.exists(temp_path):
                    os.unlink(temp_path)
        else:
            # Return a JSON response with the report and sample data
            sample_size = min(5, len(cleaned_df))
            return jsonify({
                'status': 'success',
                'report': report,
                'sample_data': cleaned_df.head(sample_size).to_dict('records'),
                'shape': cleaned_df.shape,
                'dtypes': {col: str(dtype) for col, dtype in cleaned_df.dtypes.items()}
            })

    except Exception as e:
        app.logger.error(f"Error: {str(e)}")
        app.logger.error(traceback.format_exc())
        return jsonify({
            'status': 'error',
            'message': str(e),
            'traceback': traceback.format_exc() if app.debug else None
        }), 500
@app.route('/api/nulls', methods=['POST'])
def handle_nulls():
    """API endpoint for handling null values only"""
    try:
        # Get parameters
        null_method = request.form.get('method', 'nan')
        columns = request.form.getlist('columns') or None

        # Get dataframe
        df = None
        if 'file' in request.files:
            file = request.files['file']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                file.save(file_path)
                df = read_dataframe(file_path)
            else:
                return jsonify({'status': 'error', 'message': 'Invalid file'}), 400
        elif request.json and 'file_path' in request.json:
            file_path = resolve_path(request.json['file_path'])
            if os.path.exists(file_path):
                df = read_dataframe(file_path)
            else:
                return jsonify({'status': 'error', 'message': 'File not found'}), 404
        else:
            return jsonify({'status': 'error', 'message': 'No file provided'}), 400

        # Handle nulls
        cleaned_df, report = handle_null_values(df, method=null_method, columns=columns)

        # Return response
        return jsonify({
            'status': 'success',
            'report': report,
            'sample_data': cleaned_df.head(5).to_dict('records')
        })
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/api/duplicates', methods=['POST'])
def handle_duplicates():
    """API endpoint for handling duplicates only"""
    try:
        # Get parameters
        subset = request.form.getlist('columns') or None
        keep = request.form.get('keep', 'first')

        # Get dataframe
        df = None
        if 'file' in request.files:
            file = request.files['file']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                file.save(file_path)
                df = read_dataframe(file_path)
            else:
                return jsonify({'status': 'error', 'message': 'Invalid file'}), 400
        elif request.json and 'file_path' in request.json:
            file_path = resolve_path(request.json['file_path'])
            if os.path.exists(file_path):
                df = read_dataframe(file_path)
            else:
                return jsonify({'status': 'error', 'message': 'File not found'}), 404
        else:
            return jsonify({'status': 'error', 'message': 'No file provided'}), 400

        # Handle duplicates
        cleaned_df, report = remove_duplicates(df, subset=subset, keep=keep)

        # Return response
        return jsonify({
            'status': 'success',
            'report': report,
            'sample_data': cleaned_df.head(5).to_dict('records')
        })
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/api/fix-datatypes', methods=['POST'])
def fix_datatypes():
    """API endpoint for fixing numeric datatypes only"""
    try:
        # Get parameters
        columns = request.form.getlist('columns') or None

        # Get dataframe
        df = None
        if 'file' in request.files:
            file = request.files['file']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                file.save(file_path)
                df = read_dataframe(file_path)
            else:
                return jsonify({'status': 'error', 'message': 'Invalid file'}), 400
        elif request.json and 'file_path' in request.json:
            file_path = resolve_path(request.json['file_path'])
            if os.path.exists(file_path):
                df = read_dataframe(file_path)
            else:
                return jsonify({'status': 'error', 'message': 'File not found'}), 404
        else:
            return jsonify({'status': 'error', 'message': 'No file provided'}), 400

        # Fix datatypes
        cleaned_df, report = fix_numeric_datatypes(df, columns=columns)

        # Return response
        return jsonify({
            'status': 'success',
            'report': report,
            'sample_data': cleaned_df.head(5).to_dict('records')
        })
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500


if __name__ == '__main__':
    app.run(debug=True)
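For completeness, a hedged sketch of how a client might exercise the /api/clean endpoint. It assumes the server is running locally at Flask's default development address (http://127.0.0.1:5000), that a sample.csv exists in the client's working directory, and that the requests library is installed; none of these appear in the gist itself.

import requests  # third-party HTTP client, assumed to be installed

BASE = "http://127.0.0.1:5000"  # Flask's default development host/port

# 1) Multipart upload, with cleaning options passed as form fields
with open("sample.csv", "rb") as fh:  # hypothetical input file
    resp = requests.post(
        f"{BASE}/api/clean",
        files={"file": fh},
        data={"null_method": "zero", "remove_duplicates": "true"},
    )
print(resp.status_code, resp.json().get("status"))

# 2) JSON body pointing at a file already on the server's machine;
#    %HOME% / %USERPROFILE% are expanded by resolve_path() on the server
resp = requests.post(f"{BASE}/api/clean", json={"file_path": "%HOME%/sample.csv"})
print(resp.status_code, resp.json().get("status"))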
Finally, a small demo script (another file in the gist) builds a deliberately messy sample dataset, runs the full cleaning pipeline on it, and saves the original and cleaned data to CSV for testing the API.
import pandas as pd
import numpy as np
import os
from data_cleaning_functions import clean_dataframe


def create_test_dataset():
    """Create a sample dataset with various issues to test the cleaning functions"""
    # Create a dataframe with various issues
    data = {
        'id': [1, 2, 3, 4, 5, 5, None, 7],
        'age': ['28', '34y/o', 'thirty-five', '40.', None, 'male', '22', '50'],
        'income': [50000, 60000, None, '75000', 'unknown', 55000, np.nan, '80000'],
        'category': ['A', 'B', 'A', 'C', 'B', 'B', 'D', 'A']
    }
    df = pd.DataFrame(data)
    return df


def read_csv_input(file_path):
    """Read a CSV file and return a DataFrame"""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"The file '{file_path}' does not exist.")
    try:
        df = pd.read_csv(file_path)
        print(f"Successfully loaded CSV file: {file_path}")
        return df
    except Exception as e:
        raise ValueError(f"Error reading the CSV file: {e}")


def main():
    # Ask the user whether to use the sample dataset or provide a CSV file
    choice = input("Do you want to use a sample dataset or provide a CSV file? (sample/csv): ").strip().lower()
    if choice == 'csv':
        file_path = input("Enter the path to the CSV file: ").strip()
        df = read_csv_input(file_path)
    else:
        # Create a test dataset
        df = create_test_dataset()

    # # Create a test dataset
    # df = create_test_dataset()
    # # df = 'gapminder-unclean.csv'

    print("Original dataset:")
    print(df)
    print("\nOriginal data types:")
    print(df.dtypes)
    print("\nOriginal null values:")
    print(df.isna().sum())

    # Clean the dataframe
    cleaned_df, report = clean_dataframe(
        df,
        null_method='zero',   # Replace nulls with 0
        fix_numeric=True,     # Fix numeric data types
        remove_dups=True      # Remove duplicates
    )

    print("\n\nCleaned dataset:")
    print(cleaned_df)
    print("\nCleaned data types:")
    print(cleaned_df.dtypes)
    print("\nCleaned null values:")
    print(cleaned_df.isna().sum())

    print("\n\nCleaning Report:")
    for section, details in report.items():
        print(f"\n{section.upper()}:")
        if isinstance(details, dict):
            for key, value in details.items():
                print(f"  {key}: {value}")
        else:
            print(f"  {details}")

    # Save to CSV for testing the API later
    cleaned_df.to_csv('cleaned_data.csv', index=False)
    df.to_csv('gapminder_unclean.csv', index=False)
    print("\nSaved original and cleaned data to CSV files for API testing")


if __name__ == "__main__":
    main()