@SamarthGarge
Created March 26, 2025 10:58
Outliers feature: it can be used in two ways: 1) by calling the standalone function in outliers.py directly, or 2) through its integration into the clean_dataframe function. Usage sketches for both paths appear after the corresponding functions below.
import os

import numpy as np
import pandas as pd
from flask import request, jsonify
from werkzeug.utils import secure_filename

# Note: the helper functions used below (handle_null_values, fix_numeric_datatypes,
# remove_duplicates, read_dataframe, allowed_file, resolve_path, read_csv_input,
# create_test_dataset) and the Flask `app` object are defined elsewhere in the project.


def clean_dataframe(df, null_method='nan', fix_numeric=True, remove_dups=True, dup_subset=None,
                    detect_outliers_flag=False, outlier_columns=None, outlier_method='zscore',
                    outlier_threshold=3, outlier_processing='remove', outlier_cap_values=None):
    """
    Apply all cleaning functions in sequence.

    Parameters:
    -----------
    df : pandas.DataFrame
        The dataframe to clean
    null_method : str
        Method to handle null values: 'nan' or 'zero'
    fix_numeric : bool
        Whether to fix numeric data types
    remove_dups : bool
        Whether to remove duplicates
    dup_subset : list, optional
        Columns to consider for duplicates
    detect_outliers_flag : bool, default False
        Whether to detect and process outliers
    outlier_columns : list, optional
        Columns to check for outliers. If None, check all numeric columns
    outlier_method : str, default 'zscore'
        Method to detect outliers: 'zscore', 'iqr', 'modified_zscore'
    outlier_threshold : float, default 3
        Threshold for detecting outliers
    outlier_processing : str, default 'remove'
        How to process outliers: 'remove', 'cap', 'transform', 'impute', 'separate'
    outlier_cap_values : dict, optional
        Dictionary specifying upper and lower bounds for capping (e.g., {'col1': (lower, upper)})

    Returns:
    --------
    pandas.DataFrame
        The cleaned dataframe
    dict
        A report of all cleaning operations
    """
    report = {'original_shape': df.shape}

    # Handle null values
    df, null_report = handle_null_values(df, method=null_method)
    report['null_handling'] = null_report

    # Fix numeric data types
    if fix_numeric:
        df, dtype_report = fix_numeric_datatypes(df)
        report['datatype_handling'] = dtype_report

    # Remove duplicates
    if remove_dups:
        df, dup_report = remove_duplicates(df, subset=dup_subset)
        report['duplicate_handling'] = dup_report

    # Detect and process outliers
    if detect_outliers_flag:
        df, outlier_report = detect_outliers(
            df,
            columns=outlier_columns,
            method=outlier_method,
            threshold=outlier_threshold,
            processing=outlier_processing,
            cap_values=outlier_cap_values
        )
        report['outlier_handling'] = outlier_report

    report['final_shape'] = df.shape
    report['shape_change'] = {
        'rows': report['final_shape'][0] - report['original_shape'][0],
        'columns': report['final_shape'][1] - report['original_shape'][1]
    }
    return df, report
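
The second usage path goes through clean_dataframe, which also exposes the capping bounds. A minimal sketch, assuming a dataframe df with a numeric column named price (both the column name and the bounds are illustrative, not part of the gist):

# Hypothetical example: 'price' and the (lower, upper) bounds are illustrative.
df_capped, cap_report = clean_dataframe(
    df,
    detect_outliers_flag=True,
    outlier_columns=['price'],
    outlier_processing='cap',
    outlier_cap_values={'price': (0.0, 1000.0)}  # values outside this range are clipped to it
)
print(cap_report['outlier_handling']['columns']['price'])
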
def detect_outliers(df, columns=None, method='zscore', threshold=3, processing='remove', cap_values=None):
    """
    Detect and process outliers in the dataset.

    Parameters:
    -----------
    df : pandas.DataFrame
        The dataframe to process
    columns : list, optional
        List of column names to check for outliers. If None, check all numeric columns
    method : str, default 'zscore'
        Method to detect outliers: 'zscore', 'iqr', 'modified_zscore'
    threshold : float, default 3
        Threshold for detecting outliers (e.g., Z-score > 3)
    processing : str, default 'remove'
        How to process outliers: 'remove', 'cap', 'transform', 'impute', 'separate'
    cap_values : dict, optional
        Dictionary specifying upper and lower bounds for capping (e.g., {'col1': (lower, upper)})

    Returns:
    --------
    pandas.DataFrame
        The dataframe with outliers processed
    dict
        A report of outliers detected and processed
    """
    if columns is None:
        # Automatically select numeric columns
        columns = df.select_dtypes(include=[np.number]).columns.tolist()

    df_copy = df.copy()
    outlier_report = {
        'method': method,
        'threshold': threshold,
        'processing': processing,
        'columns': {},
    }

    for col in columns:
        if method == 'zscore':
            # Z-score method
            z_scores = (df_copy[col] - df_copy[col].mean()) / df_copy[col].std()
            outliers = z_scores.abs() > threshold
        elif method == 'iqr':
            # IQR method
            Q1 = df_copy[col].quantile(0.25)
            Q3 = df_copy[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - (threshold * IQR)
            upper_bound = Q3 + (threshold * IQR)
            outliers = (df_copy[col] < lower_bound) | (df_copy[col] > upper_bound)
        elif method == 'modified_zscore':
            # Modified Z-score method
            median = df_copy[col].median()
            mad = (df_copy[col] - median).abs().median()
            modified_z_scores = 0.6745 * (df_copy[col] - median) / mad
            outliers = modified_z_scores.abs() > threshold
        else:
            raise ValueError(f"Unsupported method: {method}")

        outlier_count = outliers.sum()
        outlier_report['columns'][col] = {
            'outlier_count': int(outlier_count),
            'total_count': len(df_copy[col]),
            'outlier_percentage': round((outlier_count / len(df_copy[col])) * 100, 2),
        }

        if processing == 'remove':
            # Remove outliers
            df_copy = df_copy[~outliers]
        elif processing == 'cap':
            # Cap outliers
            if cap_values and col in cap_values:
                lower, upper = cap_values[col]
            else:
                lower, upper = df_copy[col].quantile(0.01), df_copy[col].quantile(0.99)
            df_copy[col] = np.where(df_copy[col] < lower, lower, df_copy[col])
            df_copy[col] = np.where(df_copy[col] > upper, upper, df_copy[col])
        elif processing == 'transform':
            # Apply logarithmic transformation
            df_copy[col] = np.log1p(df_copy[col].clip(lower=0))
        elif processing == 'impute':
            # Replace outliers with median
            median = df_copy[col].median()
            df_copy[col] = np.where(outliers, median, df_copy[col])
        elif processing == 'separate':
            # Separate outliers for analysis
            outlier_report['columns'][col]['outlier_values'] = df_copy.loc[outliers, col].tolist()
        else:
            raise ValueError(f"Unsupported processing option: {processing}")

    return df_copy, outlier_report
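
As a quick illustration of the first usage path (calling detect_outliers directly), here is a small sketch using the classic Tukey fence, i.e. the IQR method with threshold 1.5; the toy data is invented for demonstration:

toy = pd.DataFrame({'value': [10, 12, 11, 13, 12, 500]})  # 500 is an obvious outlier
cleaned, rpt = detect_outliers(toy, columns=['value'], method='iqr',
                               threshold=1.5, processing='remove')
print(rpt['columns']['value'])    # {'outlier_count': 1, 'total_count': 6, 'outlier_percentage': 16.67}
print(cleaned['value'].tolist())  # [10, 12, 11, 13, 12]
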
@app.route('/api/clean', methods=['POST'])
def clean_data():
    try:
        # Get cleaning parameters
        null_method = request.form.get('null_method', 'nan')
        fix_numeric = request.form.get('fix_numeric', 'true').lower() == 'true'
        remove_dups = request.form.get('remove_duplicates', 'true').lower() == 'true'
        detect_outliers_flag = request.form.get('detect_outliers', 'false').lower() == 'true'
        outlier_method = request.form.get('outlier_method', 'zscore')
        outlier_threshold = float(request.form.get('outlier_threshold', 3))
        outlier_processing = request.form.get('outlier_processing', 'remove')

        # Get dataframe either from file upload or path
        df = None
        file_source = None
        json_body = request.get_json(silent=True)  # None for non-JSON requests instead of raising

        if 'file' in request.files:
            file = request.files['file']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                file.save(file_path)
                df = read_dataframe(file_path)
                file_source = f"Uploaded file: {filename}"
            else:
                return jsonify({'status': 'error', 'message': 'Invalid file'}), 400
        elif json_body and 'file_path' in json_body:
            raw_path = json_body['file_path']
            file_path = resolve_path(raw_path)
            if os.path.exists(file_path):
                df = read_dataframe(file_path)
                file_source = f"File path: {raw_path}"
            else:
                return jsonify({'status': 'error', 'message': f"File not found: {raw_path}"}), 404
        else:
            return jsonify({'status': 'error', 'message': 'No file provided'}), 400

        # Clean the dataframe
        cleaned_df, report = clean_dataframe(
            df,
            null_method=null_method,
            fix_numeric=fix_numeric,
            remove_dups=remove_dups,
            detect_outliers_flag=detect_outliers_flag,
            outlier_method=outlier_method,
            outlier_threshold=outlier_threshold,
            outlier_processing=outlier_processing
        )

        # Add file source to report
        report['file_source'] = file_source

        # Return JSON response with report and sample data
        sample_size = min(5, len(cleaned_df))
        return jsonify({
            'status': 'success',
            'report': report,
            'sample_data': cleaned_df.head(sample_size).to_dict('records'),
            'shape': cleaned_df.shape,
            'dtypes': {col: str(dtype) for col, dtype in cleaned_df.dtypes.items()}
        })
    except Exception as e:
        app.logger.error(f"Error: {str(e)}")
        return jsonify({'status': 'error', 'message': str(e)}), 500
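
A minimal client sketch for exercising the endpoint, assuming the Flask app is running locally on port 5000 (the host, port, and sample file name are assumptions, and the requests package must be installed):

import requests

url = 'http://localhost:5000/api/clean'  # assumed local dev server; adjust to your setup
with open('cleaned_data.csv', 'rb') as f:  # any file accepted by allowed_file()
    resp = requests.post(url, files={'file': f},
                         data={'detect_outliers': 'true',
                               'outlier_method': 'iqr',
                               'outlier_threshold': '1.5',
                               'outlier_processing': 'cap'})
print(resp.json()['report'])
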
def main():
    # Ask the user if they want to use a sample dataset or provide a CSV file
    choice = input("Do you want to use a sample dataset or provide a CSV file? (sample/csv): ").strip().lower()
    if choice == 'csv':
        file_path = input("Enter the path to the CSV file: ").strip()
        df = read_csv_input(file_path)
    else:
        # Create a test dataset
        df = create_test_dataset()

    print("Original dataset:")
    print(df)
    print("\nOriginal data types:")
    print(df.dtypes)
    print("\nOriginal null values:")
    print(df.isna().sum())

    # Clean the dataframe
    cleaned_df, report = clean_dataframe(
        df,
        null_method='zero',          # Replace nulls with 0
        fix_numeric=True,            # Fix numeric data types
        remove_dups=True,            # Remove duplicates
        detect_outliers_flag=True,   # Enable outlier detection
        outlier_columns=None,        # Detect outliers in all numeric columns
        outlier_method='zscore',     # Use Z-score method
        outlier_threshold=3,         # Threshold for Z-score
        outlier_processing='remove'  # Remove outliers
    )

    print("\n\nCleaned dataset:")
    print(cleaned_df)
    print("\nCleaned data types:")
    print(cleaned_df.dtypes)
    print("\nCleaned null values:")
    print(cleaned_df.isna().sum())

    print("\n\nCleaning Report:")
    for section, details in report.items():
        print(f"\n{section.upper()}:")
        if isinstance(details, dict):
            for key, value in details.items():
                print(f"  {key}: {value}")
        else:
            print(f"  {details}")

    # Save to CSV for testing the API later
    cleaned_df.to_csv('cleaned_data.csv', index=False)
    print("\nSaved cleaned data to 'cleaned_data.csv'")