import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.cluster import DBSCAN
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt
import seaborn as sns
# --- 1. Dataset preparation ---
# Assumes df_merged is already loaded with the following columns:
features_cols = [
    'TV_COUNT', 'TV_TOTAL_AMOUNT', 'TV_AVG_AMOUNT', 'TV_RISQUE_SECTEUR',
    'TV_ORIGINATOR_COUNT', 'TV_BENEFICIARY_COUNT', 'TV_ORIGINATOR_SUM', 'TV_BENEFICIARY_SUM',
    'ESPECES_NBR_TRX', 'ESPECES_TOTAL', 'RATIO_ESPECES_TO_TV'
]
one_hot_cols = [col for col in df_merged.columns if col.startswith('PTYPE_')]
features = pd.concat([df_merged[features_cols], df_merged[one_hot_cols]], axis=1)
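# Optional sanity check (added; not in the original gist): StandardScaler raises on NaN,
# so it is worth confirming the merged feature matrix is complete before scaling.
# How to impute is a domain decision; fillna(0) below is only a placeholder assumption.
if features.isna().any().any():
    features = features.fillna(0)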
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)
# --- 2. Isolation Forest ---
# Anomaly detection based on how easily a point is isolated in a forest of random trees
iso_forest = IsolationForest(contamination=0.05, random_state=42)
df_merged['iforest_label'] = iso_forest.fit_predict(features_scaled)
df_merged['iforest_score'] = iso_forest.decision_function(features_scaled)
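# Quick inspection (illustrative addition): with contamination=0.05, roughly 5% of rows
# should be labeled -1; lower decision_function scores mean more anomalous points.
print(df_merged['iforest_label'].value_counts())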
# --- 3. Autoencoder ---
# Neural network trained to reconstruct the data; anomalous rows reconstruct poorly
X_train, X_val = train_test_split(features_scaled, test_size=0.2, random_state=42)
input_dim = features_scaled.shape[1]
encoding_dim = max(int(input_dim / 2), 1)
input_layer = Input(shape=(input_dim,))
encoded = Dense(encoding_dim, activation='relu')(input_layer)
encoded = Dense(max(int(encoding_dim / 2), 1), activation='relu')(encoded)
decoded = Dense(encoding_dim, activation='relu')(encoded)
decoded = Dense(input_dim, activation='linear')(decoded)
autoencoder = Model(inputs=input_layer, outputs=decoded)
autoencoder.compile(optimizer='adam', loss='mse')
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
autoencoder.fit(X_train, X_train,
                epochs=100, batch_size=32, shuffle=True,
                validation_data=(X_val, X_val), callbacks=[early_stop], verbose=0)
reconstructions = autoencoder.predict(features_scaled)
reconstruction_errors = np.mean(np.square(features_scaled - reconstructions), axis=1)
df_merged['ae_reconstruction_error'] = reconstruction_errors
ae_threshold = np.percentile(reconstruction_errors, 95)
df_merged['ae_label'] = (df_merged['ae_reconstruction_error'] > ae_threshold).astype(int)
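# Illustrative check (added): the 95th-percentile cutoff mirrors the 5% contamination rate
# used for Isolation Forest. A histogram of reconstruction errors makes the cutoff visible.
plt.figure(figsize=(8, 4))
plt.hist(reconstruction_errors, bins=50)
plt.axvline(ae_threshold, linestyle='--', color='black', label='95th-percentile threshold')
plt.xlabel("Reconstruction error")
plt.ylabel("Count")
plt.legend()
plt.show()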
# --- 4. DBSCAN ---
# Density-based clustering; points in low-density regions are labeled -1 (noise)
dbscan = DBSCAN(eps=0.5, min_samples=5)
df_merged['dbscan_label'] = dbscan.fit_predict(features_scaled)
df_merged['dbscan_anomaly'] = (df_merged['dbscan_label'] == -1).astype(int)
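# Note (added): eps=0.5 and min_samples=5 are sklearn's defaults, not tuned values. A common
# heuristic is the k-distance plot: sort each point's distance to its k-th nearest neighbor
# and look for the elbow. A minimal sketch using sklearn's NearestNeighbors:
from sklearn.neighbors import NearestNeighbors
knn = NearestNeighbors(n_neighbors=5).fit(features_scaled)
distances, _ = knn.kneighbors(features_scaled)
plt.figure(figsize=(8, 4))
plt.plot(np.sort(distances[:, -1]))
plt.xlabel("Points sorted by distance")
plt.ylabel("Distance to 5th-nearest neighbor")
plt.title("k-distance plot for choosing DBSCAN eps")
plt.show()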
# --- 5. Combining the results ---
# A client is flagged if any of the three detectors marks it as anomalous
df_merged['combined_anomaly'] = (
    (df_merged['iforest_label'] == -1) |
    (df_merged['ae_label'] == 1) |
    (df_merged['dbscan_anomaly'] == 1)
).astype(int)
def determine_alert_reason(row):
    reasons = []
    if row['iforest_label'] == -1:
        reasons.append("IsolationForest")
    if row['ae_label'] == 1:
        reasons.append("Autoencoder")
    if row['dbscan_anomaly'] == 1:
        reasons.append("DBSCAN")
    return ("Anomaly detected by " + ", ".join(reasons)) if reasons else ""
df_merged['alert_reason'] = df_merged.apply(determine_alert_reason, axis=1)
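# Quick breakdown (illustrative addition): how often each detector, and each combination
# of detectors, fires. Heavy overlap suggests redundant signals; none suggests noisy flags.
print(df_merged.loc[df_merged['combined_anomaly'] == 1, 'alert_reason'].value_counts())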
# --- 6. Contributing-variable analysis ---
# For each flagged client, find the features that deviate most from the population mean
standardized = (df_merged[features_cols] - df_merged[features_cols].mean()) / df_merged[features_cols].std()
suspects = df_merged[df_merged['combined_anomaly'] == 1].copy()
standardized_suspects = standardized.loc[suspects.index]
def get_top_2_vars(row_std, row_orig):
    top_vars = row_std.abs().sort_values(ascending=False).head(2).index
    return [(var, row_orig[var], "high" if row_std[var] > 0 else "low") for var in top_vars]
top2_by_client = [
    get_top_2_vars(standardized_suspects.loc[idx], df_merged.loc[idx])
    for idx in suspects.index
]
df_merged.loc[suspects.index, 'top_var_1'] = [
    f"{var} ({level}, {value:,.2f})" for var, value, level in (t[0] for t in top2_by_client)
]
df_merged.loc[suspects.index, 'top_var_2'] = [
    f"{var} ({level}, {value:,.2f})" for var, value, level in (t[1] for t in top2_by_client)
]
final_alerts = df_merged[df_merged['combined_anomaly'] == 1][[
    'PARTY_KEY', 'alert_reason', 'top_var_1', 'top_var_2',
    'iforest_score', 'ae_reconstruction_error', 'dbscan_label'
]]
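# Optional preview (added): the most anomalous clients first; lower Isolation Forest
# scores indicate points that were easier to isolate.
print(final_alerts.sort_values('iforest_score').head(10))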
# --- 7. Explanatory visualizations ---
# 📊 Isolation Forest: score vs. total wire-transfer amount
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df_merged, x='TV_TOTAL_AMOUNT', y='iforest_score',
                hue=(df_merged['iforest_label'] == -1).map({True: 'Suspect', False: 'Normal'}),
                palette={'Suspect': 'red', 'Normal': 'blue'}, alpha=0.6)
plt.axhline(0, linestyle='--', color='black')
plt.title("📊 Isolation Forest: score vs. total wire-transfer amount")
plt.xlabel("Total wire-transfer amount")
plt.ylabel("Isolation score")
plt.legend(title='Anomaly')
plt.grid(True)
plt.show()
# 📊 Autoencoder: reconstruction error vs. cash-to-wire-transfer ratio
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df_merged, x='RATIO_ESPECES_TO_TV', y='ae_reconstruction_error',
                hue=(df_merged['ae_label'] == 1).map({True: 'Suspect', False: 'Normal'}),
                palette={'Suspect': 'orange', 'Normal': 'green'}, alpha=0.6)
plt.axhline(ae_threshold, linestyle='--', color='black', label='Threshold')
plt.title("📊 Autoencoder: reconstruction error vs. cash-to-wire-transfer ratio")
plt.xlabel("Cash-to-wire-transfer ratio")
plt.ylabel("Reconstruction error")
plt.legend(title='Anomaly')
plt.grid(True)
plt.show()
# 📊 DBSCAN: clusters in a 2-D PCA projection
pca = PCA(n_components=2)
pca_result = pca.fit_transform(features_scaled)
df_merged['pca_1'] = pca_result[:, 0]
df_merged['pca_2'] = pca_result[:, 1]
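# Caveat (added): the 2-D view is only as faithful as the variance it retains; the
# explained-variance ratio shows how much structure the projection can actually display.
print("PCA explained variance ratio:", pca.explained_variance_ratio_)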
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df_merged, x='pca_1', y='pca_2',
                hue='dbscan_label', palette='Set1', alpha=0.6)
plt.title("📊 DBSCAN: detected clusters (PCA projection)")
plt.xlabel("Principal component 1")
plt.ylabel("Principal component 2")
plt.legend(title='Cluster')
plt.grid(True)
plt.show()
# --- 8. Final export, if needed ---
final_alerts.to_csv("clients_suspects_anomalies.csv", index=False)
print("Export terminé : clients_suspects_anomalies.csv")