@hdary85
Last active April 7, 2025 15:05
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
import matplotlib.pyplot as plt
import seaborn as sns
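# Note: df_merged (the client-level feature table, assumed to have one row per
# PARTY_KEY) is expected to have been built upstream; it is not created here.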
# === STEP 1: Data preparation ===
# Columns used for anomaly detection
features_cols = [
    'TV_COUNT', 'TV_TOTAL_AMOUNT', 'TV_AVG_AMOUNT', 'TV_RISQUE_SECTEUR',
    'TV_ORIGINATOR_COUNT', 'TV_BENEFICIARY_COUNT',
    'TV_ORIGINATOR_SUM', 'TV_BENEFICIARY_SUM',
    'ESPECES_NBR_TRX', 'ESPECES_TOTAL', 'RATIO_ESPECES_TO_TV'
]
one_hot_cols = [col for col in df_merged.columns if col.startswith('PTYPE_')]
X = pd.concat([df_merged[features_cols], df_merged[one_hot_cols]], axis=1)
# Standardize the data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
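
# --- Optional sketch (not part of the original gist) ---
# Standardizing the PTYPE_* one-hot dummies together with the continuous
# features is a modelling choice; it can inflate the weight of rare categories
# in a distance-based method like DBSCAN. Assuming the same X / features_cols /
# one_hot_cols as above, a ColumnTransformer would scale only the numeric block:
from sklearn.compose import ColumnTransformer
preproc = ColumnTransformer(
    [('num', StandardScaler(), features_cols)],
    remainder='passthrough'  # leave the PTYPE_* dummies untouched
)
X_scaled_alt = preproc.fit_transform(X)  # not used below; alternative input for DBSCAN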
# === STEP 2: DBSCAN ===
# DBSCAN (Density-Based Spatial Clustering of Applications with Noise)
# flags points that do not belong to any dense group; these are the anomalies
dbscan = DBSCAN(eps=0.5, min_samples=5)
dbscan_labels = dbscan.fit_predict(X_scaled)
# Label -1 = anomaly
df_merged['dbscan_label'] = dbscan_labels
df_merged['dbscan_anomaly'] = (dbscan_labels == -1).astype(int)
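
# --- Optional sanity check (not part of the original gist) ---
# eps=0.5 and min_samples=5 are the scikit-learn defaults; on standardized,
# fairly high-dimensional data the right eps is dataset-dependent. A common
# heuristic is the k-distance plot: sort each point's distance to its
# min_samples-th neighbour and look for the "elbow" where it jumps.
from sklearn.neighbors import NearestNeighbors
nn = NearestNeighbors(n_neighbors=5).fit(X_scaled)
knn_distances, _ = nn.kneighbors(X_scaled)   # the point itself counts as the 1st neighbour
k_dist = np.sort(knn_distances[:, -1])       # distance to the 5th-closest point, sorted
plt.figure(figsize=(8, 4))
plt.plot(k_dist)
plt.xlabel("Points sorted by 5-NN distance")
plt.ylabel("Distance to 5th nearest neighbour")
plt.title("k-distance plot (heuristic for choosing eps)")
plt.show()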
# === STEP 3: Identify the 2 most contributing variables per anomaly ===
standardized = (df_merged[features_cols] - df_merged[features_cols].mean()) / df_merged[features_cols].std()
suspects = df_merged[df_merged['dbscan_anomaly'] == 1].copy()
standardized_suspects = standardized.loc[suspects.index]

def get_top_2_vars(row_std, row_orig):
    # Return the 2 features with the largest absolute z-scores, along with their
    # original values and whether they are unusually high or low
    top_vars = row_std.abs().sort_values(ascending=False).head(2).index
    return [(var, row_orig[var], "high" if row_std[var] > 0 else "low") for var in top_vars]

top2_by_client = []
for idx in suspects.index:
    top2_by_client.append(get_top_2_vars(standardized_suspects.loc[idx], df_merged.loc[idx]))

df_merged.loc[suspects.index, 'top_var_1'] = [f"{v[0]} ({v[2]}, {v[1]:,.2f})" for v in [x[0] for x in top2_by_client]]
df_merged.loc[suspects.index, 'top_var_2'] = [f"{v[0]} ({v[2]}, {v[1]:,.2f})" for v in [x[1] for x in top2_by_client]]
# === STEP 4: Visualization (PCA) ===
# Project the data to 2D to see where the anomalies fall
pca = PCA(n_components=2)
pca_result = pca.fit_transform(X_scaled)
df_merged['pca_1'] = pca_result[:, 0]
df_merged['pca_2'] = pca_result[:, 1]
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df_merged, x='pca_1', y='pca_2',
                hue='dbscan_anomaly', palette={0: 'blue', 1: 'red'}, alpha=0.6)
plt.title("📊 DBSCAN - Detected anomalies (PCA projection)")
plt.xlabel("Principal component 1")
plt.ylabel("Principal component 2")
plt.legend(title='Anomaly')
plt.grid(True)
plt.show()
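
# Optional check (not part of the original gist): how much variance the 2D
# projection actually captures, to gauge how faithful the scatter plot is.
print(f"Variance explained by the 2 PCA components: {pca.explained_variance_ratio_.sum():.1%}")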
# === STEP 5: Final summary and export ===
final_alerts = df_merged[df_merged['dbscan_anomaly'] == 1][[
    'PARTY_KEY', 'top_var_1', 'top_var_2', 'dbscan_label'
]]
final_alerts.to_csv("clients_suspects_dbscan.csv", index=False)
print("✅ Export terminé : clients_suspects_dbscan.csv")
print(f"🔍 {len(final_alerts)} clients identifiés comme suspects par DBSCAN.")
# === PART 2: Global suspicion score and top 10 most suspicious clients ===
# Step 1: Recompute the z-scores for all features used
standardized = (df_merged[features_cols] - df_merged[features_cols].mean()) / df_merged[features_cols].std()
# Step 2: Keep only the anomalies detected by DBSCAN
suspects = df_merged[df_merged['dbscan_anomaly'] == 1].copy()
standardized_suspects = standardized.loc[suspects.index]
# Step 3: Compute the global suspicion score (mean of the absolute z-scores across all features)
suspicion_scores = standardized_suspects.abs().mean(axis=1)
df_merged.loc[suspects.index, 'suspicion_score'] = suspicion_scores
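
# Optional extension (not part of the original gist): the same score could be
# computed for every client, not only the DBSCAN anomalies, to rank borderline
# cases as well. 'suspicion_score_all' is a hypothetical column name.
df_merged['suspicion_score_all'] = standardized.abs().mean(axis=1)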
# Step 4: Identify the indices of the 10 most suspicious clients
top10_indices = suspicion_scores.sort_values(ascending=False).head(10).index
top_10_std = standardized.loc[top10_indices]
top_10_orig = df_merged.loc[top10_indices]
# Step 5: Identify the 2 most contributing variables for each client
top_vars_list = []
for idx in top10_indices:
    row_std = top_10_std.loc[idx]
    row_orig = top_10_orig.loc[idx]
    top_vars = row_std.abs().sort_values(ascending=False).head(2).index
    formatted = [
        f"{var} ({'high' if row_std[var] > 0 else 'low'}, {row_orig[var]:,.2f})"
        for var in top_vars
    ]
    top_vars_list.append(formatted)
# Step 6: Build the final DataFrame
df_final_top10 = top_10_orig.copy()
df_final_top10['suspicion_score'] = suspicion_scores.loc[top10_indices]
df_final_top10['top_var_1'] = [v[0] for v in top_vars_list]
df_final_top10['top_var_2'] = [v[1] for v in top_vars_list]
# Step 7: Print the final top 10 (when running locally)
print("🔝 Top 10 most suspicious clients (DBSCAN + global score + contributing variables):\n")
print(df_final_top10[['PARTY_KEY', 'suspicion_score', 'top_var_1', 'top_var_2']])