import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
import matplotlib.pyplot as plt
import seaborn as sns

# === STEP 1: Data preparation ===
# Assumes df_merged (one row per client) has already been built upstream.
# Columns used for anomaly detection
features_cols = [
    'TV_COUNT', 'TV_TOTAL_AMOUNT', 'TV_AVG_AMOUNT', 'TV_RISQUE_SECTEUR',
    'TV_ORIGINATOR_COUNT', 'TV_BENEFICIARY_COUNT',
    'TV_ORIGINATOR_SUM', 'TV_BENEFICIARY_SUM',
    'ESPECES_NBR_TRX', 'ESPECES_TOTAL', 'RATIO_ESPECES_TO_TV'
]
one_hot_cols = [col for col in df_merged.columns if col.startswith('PTYPE_')]
X = pd.concat([df_merged[features_cols], df_merged[one_hot_cols]], axis=1)

# Standardize the features (zero mean, unit variance)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
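
# --- Optional sanity check (a sketch added for illustration, not part of the
# original pipeline). StandardScaler propagates NaNs, and the z-scoring in
# Step 3 divides by each column's std, so missing or constant features are
# worth catching early.
assert not X.isna().any().any(), "NaNs in features: impute or drop before scaling"
zero_var = X.columns[X.std() == 0].tolist()
if zero_var:
    print(f"Warning: zero-variance columns (their z-scores will be NaN): {zero_var}")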

# === STEP 2: DBSCAN ===
# DBSCAN (Density-Based Spatial Clustering of Applications with Noise)
# flags points that are not close to any dense cluster; those are the anomalies.
dbscan = DBSCAN(eps=0.5, min_samples=5)
dbscan_labels = dbscan.fit_predict(X_scaled)

# Label -1 = anomaly (noise point)
df_merged['dbscan_label'] = dbscan_labels
df_merged['dbscan_anomaly'] = (dbscan_labels == -1).astype(int)
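
# --- Optional: checking the DBSCAN parameters (a sketch, not the original
# tuning). eps=0.5 on standardized data with many one-hot columns can flag
# most points as noise, so it is worth inspecting the label counts and the
# usual k-distance elbow heuristic for eps.
from sklearn.neighbors import NearestNeighbors

print(pd.Series(dbscan_labels).value_counts())  # -1 = noise; other labels = dense clusters
# n_neighbors = min_samples + 1 because each point is its own nearest neighbour
nn = NearestNeighbors(n_neighbors=6).fit(X_scaled)
k_dist, _ = nn.kneighbors(X_scaled)
plt.plot(np.sort(k_dist[:, -1]))
plt.xlabel("Points sorted by k-distance")
plt.ylabel("Distance to 5th nearest neighbour")
plt.show()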

# === STEP 3: Identify the 2 most contributing variables per anomaly ===
# z-scores of each feature relative to the full population
standardized = (df_merged[features_cols] - df_merged[features_cols].mean()) / df_merged[features_cols].std()
suspects = df_merged[df_merged['dbscan_anomaly'] == 1].copy()
standardized_suspects = standardized.loc[suspects.index]

def get_top_2_vars(row_std, row_orig):
    # Rank features by |z-score|, keep the top 2 with direction and raw value
    top_vars = row_std.abs().sort_values(ascending=False).head(2).index
    return [(var, row_orig[var], "high" if row_std[var] > 0 else "low") for var in top_vars]

top2_by_client = []
for idx in suspects.index:
    top2_by_client.append(get_top_2_vars(standardized_suspects.loc[idx], df_merged.loc[idx]))

df_merged.loc[suspects.index, 'top_var_1'] = [f"{v[0]} ({v[2]}, {v[1]:,.2f})" for v, _ in top2_by_client]
df_merged.loc[suspects.index, 'top_var_2'] = [f"{v[0]} ({v[2]}, {v[1]:,.2f})" for _, v in top2_by_client]
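
# --- Optional robust variant (a sketch, not the original method). The means
# and stds above include the outliers themselves, so extreme values inflate
# the std and can mute their own z-scores; a median/MAD z-score (1.4826 * MAD
# approximates the std for normal data) ranks contributing variables more stably.
med = df_merged[features_cols].median()
mad = (df_merged[features_cols] - med).abs().median()
robust_z = (df_merged[features_cols] - med) / (1.4826 * mad)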

# === STEP 4: Visualization (PCA) ===
# Project the data into 2D to see where the anomalies sit
pca = PCA(n_components=2)
pca_result = pca.fit_transform(X_scaled)
df_merged['pca_1'] = pca_result[:, 0]
df_merged['pca_2'] = pca_result[:, 1]

plt.figure(figsize=(10, 6))
sns.scatterplot(data=df_merged, x='pca_1', y='pca_2',
                hue='dbscan_anomaly', palette={0: 'blue', 1: 'red'}, alpha=0.6)
plt.title("DBSCAN - Detected anomalies (PCA projection)")
plt.xlabel("Principal component 1")
plt.ylabel("Principal component 2")
plt.legend(title='Anomaly')
plt.grid(True)
plt.show()
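
# --- Optional: how faithful the 2-D picture is. explained_variance_ratio_ is
# a standard sklearn attribute; if the two components capture little variance,
# points that look buried in the crowd here may still be isolated in the full
# feature space.
print(f"Variance captured by the 2-D projection: {pca.explained_variance_ratio_.sum():.1%}")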

# === STEP 5: Final summary and export ===
final_alerts = df_merged[df_merged['dbscan_anomaly'] == 1][[
    'PARTY_KEY', 'top_var_1', 'top_var_2', 'dbscan_label'
]]
final_alerts.to_csv("clients_suspects_dbscan.csv", index=False)
print("✅ Export complete: clients_suspects_dbscan.csv")
print(f"🔍 {len(final_alerts)} clients flagged as suspicious by DBSCAN.")

# Step 1: Recompute the z-scores for all features used (same as Step 3 above)
standardized = (df_merged[features_cols] - df_merged[features_cols].mean()) / df_merged[features_cols].std()

# Step 2: Keep only the anomalies detected by DBSCAN
suspects = df_merged[df_merged['dbscan_anomaly'] == 1].copy()
standardized_suspects = standardized.loc[suspects.index]

# Step 3: Global suspicion score (mean of absolute z-scores across all features)
suspicion_scores = standardized_suspects.abs().mean(axis=1)
df_merged.loc[suspects.index, 'suspicion_score'] = suspicion_scores
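
# --- Optional: distribution of the suspicion score among flagged clients
# (a sketch to help pick a review threshold; the bin count is arbitrary).
df_merged.loc[suspects.index, 'suspicion_score'].hist(bins=30)
plt.xlabel("Mean |z-score| across features")
plt.ylabel("Number of flagged clients")
plt.show()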

# Step 4: Indices of the 10 most suspicious clients
top10_indices = suspicion_scores.sort_values(ascending=False).head(10).index
top_10_std = standardized.loc[top10_indices]
top_10_orig = df_merged.loc[top10_indices]

# Step 5: The 2 most contributing variables for each client
top_vars_list = []
for idx in top10_indices:
    row_std = top_10_std.loc[idx]
    row_orig = top_10_orig.loc[idx]
    top_vars = row_std.abs().sort_values(ascending=False).head(2).index
    formatted = [
        f"{var} ({'high' if row_std[var] > 0 else 'low'}, {row_orig[var]:,.2f})"
        for var in top_vars
    ]
    top_vars_list.append(formatted)

# Step 6: Build the final DataFrame
df_final_top10 = top_10_orig.copy()
df_final_top10['suspicion_score'] = suspicion_scores.loc[top10_indices]
df_final_top10['top_var_1'] = [v[0] for v in top_vars_list]
df_final_top10['top_var_2'] = [v[1] for v in top_vars_list]

# Step 7: Print the top 10 in a local environment
print("🔝 Top 10 most suspicious clients (DBSCAN + global score + contributing variables):\n")
print(df_final_top10[['PARTY_KEY', 'suspicion_score', 'top_var_1', 'top_var_2']])
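
# --- Optional: persist the top-10 table alongside the full alert export
# (the filename here is an assumption, mirroring the CSV written in Step 5).
df_final_top10[['PARTY_KEY', 'suspicion_score', 'top_var_1', 'top_var_2']].to_csv(
    "top10_clients_suspects_dbscan.csv", index=False)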