Created
March 26, 2025 10:58
-
-
Save SamarthGarge/3ffa5212987570c4084e545d24b5c793 to your computer and use it in GitHub Desktop.
Outliers feature: it can be used in two ways — 1) directly through the function in outliers.py, or 2) integrated into the cleaning pipeline via the clean_dataframe function.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def clean_dataframe(df, null_method='nan', fix_numeric=True, remove_dups=True, dup_subset=None,
                    detect_outliers_flag=False, outlier_columns=None, outlier_method='zscore',
                    outlier_threshold=3, outlier_processing='remove', outlier_cap_values=None):
    """
    Run every enabled cleaning step on ``df`` in a fixed order.

    Null handling always runs; numeric-dtype fixing, duplicate removal and
    outlier processing each run only when their flag is set.

    Parameters
    ----------
    df : pandas.DataFrame
        The dataframe to clean.
    null_method : str
        How to handle null values: 'nan' or 'zero'.
    fix_numeric : bool
        Whether to fix numeric data types.
    remove_dups : bool
        Whether to remove duplicate rows.
    dup_subset : list, optional
        Columns considered when identifying duplicates.
    detect_outliers_flag : bool, default False
        Whether to detect and process outliers.
    outlier_columns : list, optional
        Columns to check for outliers; all numeric columns when None.
    outlier_method : str, default 'zscore'
        Outlier detection method: 'zscore', 'iqr', 'modified_zscore'.
    outlier_threshold : float, default 3
        Threshold used by the detection method.
    outlier_processing : str, default 'remove'
        Outlier treatment: 'remove', 'cap', 'transform', 'impute', 'separate'.
    outlier_cap_values : dict, optional
        Per-column (lower, upper) capping bounds, e.g. {'col1': (lo, hi)}.

    Returns
    -------
    pandas.DataFrame
        The cleaned dataframe.
    dict
        A report describing every cleaning operation that ran.
    """
    report = {'original_shape': df.shape}

    # Assemble the pipeline as (report key, callable) pairs in execution order.
    pipeline = [('null_handling', lambda frame: handle_null_values(frame, method=null_method))]
    if fix_numeric:
        pipeline.append(('datatype_handling', fix_numeric_datatypes))
    if remove_dups:
        pipeline.append(('duplicate_handling', lambda frame: remove_duplicates(frame, subset=dup_subset)))
    if detect_outliers_flag:
        pipeline.append(('outlier_handling', lambda frame: detect_outliers(
            frame,
            columns=outlier_columns,
            method=outlier_method,
            threshold=outlier_threshold,
            processing=outlier_processing,
            cap_values=outlier_cap_values,
        )))

    # Each step returns (new_frame, step_report); thread the frame through.
    for key, step in pipeline:
        df, step_report = step(df)
        report[key] = step_report

    report['final_shape'] = df.shape
    report['shape_change'] = {
        'rows': report['final_shape'][0] - report['original_shape'][0],
        'columns': report['final_shape'][1] - report['original_shape'][1],
    }
    return df, report
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def detect_outliers(df, columns=None, method='zscore', threshold=3, processing='remove', cap_values=None):
    """
    Detect and process outliers in the dataset.

    Parameters
    ----------
    df : pandas.DataFrame
        The dataframe to process.
    columns : list, optional
        Column names to check for outliers. If None, all numeric columns are used.
    method : str, default 'zscore'
        Detection method: 'zscore', 'iqr', or 'modified_zscore'.
    threshold : float, default 3
        Detection threshold (|z| > threshold, or the IQR multiplier).
    processing : str, default 'remove'
        How to process outliers: 'remove', 'cap', 'transform', 'impute', 'separate'.
    cap_values : dict, optional
        Per-column (lower, upper) bounds for capping, e.g. {'col1': (lo, hi)}.
        Columns without an entry are capped at their 1st/99th percentiles.

    Returns
    -------
    pandas.DataFrame
        The dataframe with outliers processed.
    dict
        A report of outliers detected and processed, keyed per column.

    Raises
    ------
    ValueError
        If `method` or `processing` is not one of the supported options.
    """
    if columns is None:
        # Default to every numeric column.
        columns = df.select_dtypes(include=[np.number]).columns.tolist()

    df_copy = df.copy()
    outlier_report = {
        'method': method,
        'threshold': threshold,
        'processing': processing,
        'columns': {},
    }

    for col in columns:
        values = df_copy[col]
        if method == 'zscore':
            # Z-score method. A zero/undefined std (constant or near-empty
            # column) would otherwise produce a 0/0 NaN division with runtime
            # warnings; with no spread, nothing is an outlier.
            std = values.std()
            if std == 0 or np.isnan(std):
                outliers = np.zeros(len(values), dtype=bool)
            else:
                z_scores = (values - values.mean()) / std
                outliers = z_scores.abs() > threshold
        elif method == 'iqr':
            # IQR method: `threshold` acts as the IQR multiplier
            # (1.5 is the conventional choice).
            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - (threshold * iqr)
            upper_bound = q3 + (threshold * iqr)
            outliers = (values < lower_bound) | (values > upper_bound)
        elif method == 'modified_zscore':
            # Modified Z-score (median/MAD based, robust to extreme values).
            median = values.median()
            mad = (values - median).abs().median()
            if np.isnan(mad):
                # Empty/all-null column: no score is defined, flag nothing.
                outliers = np.zeros(len(values), dtype=bool)
            elif mad == 0:
                # MAD of 0 makes the score +/-inf for any value off the
                # median; flag exactly those (non-null) values without the
                # divide-by-zero warnings the raw formula would emit.
                outliers = (values != median) & values.notna()
            else:
                modified_z_scores = 0.6745 * (values - median) / mad
                outliers = modified_z_scores.abs() > threshold
        else:
            raise ValueError(f"Unsupported method: {method}")

        outlier_count = int(np.asarray(outliers).sum())
        total_count = len(values)
        outlier_report['columns'][col] = {
            'outlier_count': outlier_count,
            'total_count': total_count,
            # Guard against an empty column (ZeroDivisionError in original).
            'outlier_percentage': round((outlier_count / total_count) * 100, 2) if total_count else 0.0,
        }

        if processing == 'remove':
            # Drop outlier rows. Removal is per column, so later columns are
            # evaluated on the already-shrunk frame.
            df_copy = df_copy[~outliers]
        elif processing == 'cap':
            # Clamp to explicit bounds if provided, else 1st/99th percentiles.
            if cap_values and col in cap_values:
                lower, upper = cap_values[col]
            else:
                lower, upper = values.quantile(0.01), values.quantile(0.99)
            df_copy[col] = np.where(df_copy[col] < lower, lower, df_copy[col])
            df_copy[col] = np.where(df_copy[col] > upper, upper, df_copy[col])
        elif processing == 'transform':
            # Compress the whole column with log1p (negatives clipped to 0).
            df_copy[col] = np.log1p(df_copy[col].clip(lower=0))
        elif processing == 'impute':
            # Replace flagged values with the column median.
            df_copy[col] = np.where(outliers, df_copy[col].median(), df_copy[col])
        elif processing == 'separate':
            # Leave the data untouched; record the outlier values in the report.
            outlier_report['columns'][col]['outlier_values'] = df_copy.loc[outliers, col].tolist()
        else:
            raise ValueError(f"Unsupported processing option: {processing}")

    return df_copy, outlier_report
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@app.route('/api/clean', methods=['POST'])
def clean_data():
    """
    POST /api/clean — clean a dataset and return a JSON cleaning report.

    The dataset comes either from a multipart upload ('file') or from a JSON
    body containing 'file_path'. Cleaning options are read from form fields;
    boolean fields are the strings 'true'/'false'.

    Returns a JSON payload with the cleaning report, a small sample of the
    cleaned data, its shape and dtypes — or a JSON error with an appropriate
    HTTP status (400/404/500) on failure.
    """
    try:
        # Cleaning parameters (form-encoded, all optional).
        null_method = request.form.get('null_method', 'nan')
        fix_numeric = request.form.get('fix_numeric', 'true').lower() == 'true'
        remove_dups = request.form.get('remove_duplicates', 'true').lower() == 'true'
        detect_outliers_flag = request.form.get('detect_outliers', 'false').lower() == 'true'
        outlier_method = request.form.get('outlier_method', 'zscore')
        outlier_threshold = float(request.form.get('outlier_threshold', 3))
        outlier_processing = request.form.get('outlier_processing', 'remove')

        # Get the dataframe either from a file upload or from a path.
        df = None
        file_source = None
        if 'file' in request.files:
            file = request.files['file']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                file.save(file_path)
                df = read_dataframe(file_path)
                # Fix: the original f-string had no placeholder, so the
                # uploaded filename never made it into the report.
                file_source = f"Uploaded file: {filename}"
            else:
                return jsonify({'status': 'error', 'message': 'Invalid file'}), 400
        else:
            # get_json(silent=True) returns None instead of raising when the
            # body is missing or not JSON (bare `request.json` can raise 415
            # on non-JSON content types in recent Flask versions).
            payload = request.get_json(silent=True)
            if payload and 'file_path' in payload:
                raw_path = payload['file_path']
                file_path = resolve_path(raw_path)
                if os.path.exists(file_path):
                    df = read_dataframe(file_path)
                    file_source = f"File path: {raw_path}"
                else:
                    return jsonify({'status': 'error', 'message': f"File not found: {raw_path}"}), 404
            else:
                return jsonify({'status': 'error', 'message': 'No file provided'}), 400

        # Clean the dataframe.
        cleaned_df, report = clean_dataframe(
            df,
            null_method=null_method,
            fix_numeric=fix_numeric,
            remove_dups=remove_dups,
            detect_outliers_flag=detect_outliers_flag,
            outlier_method=outlier_method,
            outlier_threshold=outlier_threshold,
            outlier_processing=outlier_processing
        )
        # Add file source to report.
        report['file_source'] = file_source

        # Respond with the report plus a small preview of the cleaned data.
        sample_size = min(5, len(cleaned_df))
        return jsonify({
            'status': 'success',
            'report': report,
            'sample_data': cleaned_df.head(sample_size).to_dict('records'),
            'shape': cleaned_df.shape,
            'dtypes': {col: str(dtype) for col, dtype in cleaned_df.dtypes.items()}
        })
    except Exception as e:
        # Top-level API boundary: log and convert to a 500 JSON error.
        app.logger.error(f"Error: {str(e)}")
        return jsonify({'status': 'error', 'message': str(e)}), 500
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def main():
    """
    Interactive demo: clean either the built-in sample dataset or a
    user-supplied CSV, print before/after summaries plus the cleaning
    report, and save the result to 'cleaned_data.csv'.
    """
    # Let the user pick the data source; anything but 'csv' uses the sample.
    choice = input("Do you want to use a sample dataset or provide a CSV file? (sample/csv): ").strip().lower()
    if choice == 'csv':
        df = read_csv_input(input("Enter the path to the CSV file: ").strip())
    else:
        df = create_test_dataset()

    # Summary of the data before cleaning.
    for header, payload in (
        ("Original dataset:", df),
        ("\nOriginal data types:", df.dtypes),
        ("\nOriginal null values:", df.isna().sum()),
    ):
        print(header)
        print(payload)

    # Run the full cleaning pipeline with outlier removal enabled.
    cleaned_df, report = clean_dataframe(
        df,
        null_method='zero',          # Replace nulls with 0
        fix_numeric=True,            # Fix numeric data types
        remove_dups=True,            # Remove duplicates
        detect_outliers_flag=True,   # Enable outlier detection
        outlier_columns=None,        # Detect outliers in all numeric columns
        outlier_method='zscore',     # Use Z-score method
        outlier_threshold=3,         # Threshold for Z-score
        outlier_processing='remove'  # Remove outliers
    )

    # Summary of the data after cleaning.
    for header, payload in (
        ("\n\nCleaned dataset:", cleaned_df),
        ("\nCleaned data types:", cleaned_df.dtypes),
        ("\nCleaned null values:", cleaned_df.isna().sum()),
    ):
        print(header)
        print(payload)

    # Pretty-print the per-step cleaning report.
    print("\n\nCleaning Report:")
    for section, details in report.items():
        print(f"\n{section.upper()}:")
        if not isinstance(details, dict):
            print(f"  {details}")
            continue
        for key, value in details.items():
            print(f"  {key}: {value}")

    # Persist the result so the API can be tested against it later.
    cleaned_df.to_csv('cleaned_data.csv', index=False)
    print("\nSaved cleaned data to 'cleaned_data.csv'")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment