Created
October 17, 2025 20:14
-
-
Save VVX7/94a76d3307a6b8ec326e744dc8fcecd6 to your computer and use it in GitHub Desktop.
suspend okta account
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Automated Account Suspension for Compromised User Accounts | |
| Triggered by: | |
| - Impossible travel detection | |
| - Credential stuffing attempts | |
| - Anomalous behavior alerts | |
| Actions performed: | |
| 1. Suspend Okta account | |
| 2. Revoke all active sessions | |
| 3. Reset MFA factors | |
| Usage: | |
| python automated_account_suspension.py | |
| Environment Variables Required: | |
| - OKTA_ORG_URL | |
| - OKTA_API_TOKEN | |
| - AWS_REGION | |
| - SNS_ALERT_TOPIC (optional) | |
| """ | |
| import os | |
| import boto3 | |
| import json | |
| from datetime import datetime | |
| from typing import Dict | |
| from okta.client import Client as OktaClient | |
| import asyncio | |
| import logging | |
# Module-wide logging: INFO and above through the root logger's default handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AutomatedAccountSuspension:
    """
    Automated response for compromised user accounts.

    Triggered by:
    - Impossible travel detection
    - Credential stuffing attempts
    - Anomalous behavior alerts

    Actions performed:
    1. Suspend Okta account
    2. Revoke all active sessions
    3. Reset MFA factors

    Every attempt is written to a DynamoDB audit table, and any failed
    attempt is additionally published to SNS when SNS_ALERT_TOPIC is set.
    """

    # DynamoDB table that receives one audit item per suspension attempt.
    AUDIT_TABLE_NAME = 'security-response-audit-log'
    # Offset added to "now" for the audit item's `ttl` attribute (7 x 365 days).
    AUDIT_RETENTION_SECONDS = 365 * 7 * 24 * 60 * 60
    # Actions that must all report True for the overall response to succeed.
    CRITICAL_ACTIONS = ('account_suspended', 'sessions_revoked')

    def __init__(self):
        """
        Build the Okta, SNS and DynamoDB clients from environment variables.

        Raises:
            EnvironmentError: if OKTA_ORG_URL, OKTA_API_TOKEN or AWS_REGION
                is unset or empty.
        """
        required_env_vars = [
            'OKTA_ORG_URL',
            'OKTA_API_TOKEN',
            'AWS_REGION',
        ]
        missing_vars = [var for var in required_env_vars if not os.environ.get(var)]
        if missing_vars:
            raise EnvironmentError(f"Missing required env vars: {missing_vars}")

        self.okta_client = OktaClient({
            'orgUrl': os.environ['OKTA_ORG_URL'],
            'token': os.environ['OKTA_API_TOKEN'],
        })

        # AWS_REGION was validated above, so no fallback region is needed.
        aws_region = os.environ['AWS_REGION']
        self.sns = boto3.client('sns', region_name=aws_region)
        self.dynamodb = boto3.resource('dynamodb', region_name=aws_region)

        # Number of attempts _get_user makes before giving up.
        self.max_retries = 3

    async def suspend_compromised_account(
        self,
        user_id: str,
        alert: Dict
    ) -> Dict[str, object]:
        """
        Suspend user account and revoke all active sessions.

        The three actions are attempted in order even if an earlier one
        reports failure, so a failed suspension does not prevent session
        revocation. The run is always written to the audit trail; a failure
        (an exception, or any critical action returning False) also triggers
        the SNS failure alert.

        Args:
            user_id: Okta user ID or email
            alert: Alert details from detection system

        Returns:
            Dict with action results and metadata. `success` is True only
            when every action in CRITICAL_ACTIONS succeeded; `errors` lists
            the reasons otherwise.
        """
        logger.info(f"Starting account suspension for: {user_id}")

        response_summary = {
            'user_id': user_id,
            'alert_id': alert.get('alert_id'),
            'timestamp': datetime.utcnow().isoformat(),
            'actions': {},
            'success': False,
            'errors': []
        }
        actions = response_summary['actions']
        failure_reason = None

        try:
            user = await self._get_user(user_id)
            if not user:
                raise ValueError(f"User not found: {user_id}")
            response_summary['user_email'] = user.profile.email

            actions['account_suspended'] = await self._suspend_okta_account(user)
            actions['sessions_revoked'] = await self._revoke_user_sessions(user_id)
            actions['mfa_reset'] = await self._reset_mfa_factors(user_id)

            failed_actions = [
                action for action in self.CRITICAL_ACTIONS
                if not actions.get(action, False)
            ]
            response_summary['success'] = not failed_actions
            if failed_actions:
                # The helpers report failure by returning False instead of
                # raising, so this case must be surfaced explicitly or it
                # would never reach the failure alert.
                failure_reason = f"Critical actions failed: {', '.join(failed_actions)}"
        except Exception as e:
            logger.error(f"Account suspension failed for {user_id}: {e}", exc_info=True)
            response_summary['success'] = False
            failure_reason = str(e)

        if failure_reason is not None:
            response_summary['errors'].append(failure_reason)

        await self._log_suspension(response_summary)

        if failure_reason is not None:
            await self._alert_on_failure(user_id, alert, failure_reason)
        else:
            logger.info(f"Account suspension completed for {user_id}: {response_summary}")
        return response_summary

    async def _get_user(self, user_id: str):
        """
        Retrieve user from Okta with retry logic.

        Makes up to `self.max_retries` attempts, sleeping 2**attempt seconds
        between them (1s, 2s, ...).

        Args:
            user_id: Okta user ID or email

        Returns:
            User object, or None once every attempt has failed
        """
        for attempt in range(self.max_retries):
            try:
                user, _, err = await self.okta_client.get_user(user_id)
                if err:
                    raise Exception(f"Okta API error: {err}")
                return user
            except Exception as e:
                logger.warning(f"Attempt {attempt + 1}/{self.max_retries} failed: {e}")
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
                else:
                    logger.error(f"Failed to retrieve user after {self.max_retries} attempts")
        return None

    async def _suspend_okta_account(self, user) -> bool:
        """
        Suspend user in Okta.

        Args:
            user: User object as returned by _get_user

        Returns:
            True if the suspend call reported no error, False otherwise
            (errors are logged, never raised).
        """
        try:
            # NOTE(review): confirm the installed Okta SDK exposes `suspend()`
            # on the user model; if it only offers a client-level call such as
            # `okta_client.suspend_user(user.id)`, this raises AttributeError
            # and the method always returns False.
            _, err = await user.suspend()
            if err:
                raise Exception(f"Okta suspend error: {err}")
            logger.info(f"Okta account suspended: {user.id}")
            return True
        except Exception as e:
            logger.error(f"Failed to suspend Okta account: {e}")
            return False

    async def _delete_each(self, user_id: str, items, delete_call, noun: str) -> int:
        """
        Delete every item via `delete_call(user_id, item.id)`.

        A failure on one item is logged as a warning and does not stop the
        remaining deletions.

        Args:
            user_id: Okta user ID or email passed through to `delete_call`
            items: objects exposing an `id` attribute
            delete_call: coroutine function returning a `(response, err)` pair
            noun: singular item name used in log messages

        Returns:
            Number of items deleted without error
        """
        deleted = 0
        for item in items:
            try:
                _, err = await delete_call(user_id, item.id)
                if err:
                    logger.warning(f"Failed to delete {noun} {item.id}: {err}")
                else:
                    deleted += 1
            except Exception as e:
                logger.warning(f"Error deleting {noun} {item.id}: {e}")
        return deleted

    async def _revoke_user_sessions(self, user_id: str) -> bool:
        """
        Revoke all active sessions for user.

        Returns:
            True if there were no sessions or every session was deleted;
            False if listing failed or any single deletion failed.
        """
        try:
            # NOTE(review): verify `list_user_sessions` / `delete_user_session`
            # exist in the installed Okta SDK; if they do not, this method
            # always returns False and the overall response never succeeds.
            sessions, _, err = await self.okta_client.list_user_sessions(user_id)
            if err:
                raise Exception(f"Failed to list sessions: {err}")
            if not sessions:
                logger.info(f"No active sessions found for {user_id}")
                return True

            sessions = list(sessions)
            session_count = await self._delete_each(
                user_id, sessions, self.okta_client.delete_user_session, 'session'
            )
            logger.info(f"Revoked {session_count} sessions for: {user_id}")
            # A surviving session leaves the attacker logged in, so partial
            # revocation must be reported as failure.
            return session_count == len(sessions)
        except Exception as e:
            logger.error(f"Failed to revoke sessions: {e}")
            return False

    async def _reset_mfa_factors(self, user_id: str) -> bool:
        """
        Reset MFA factors (user must re-enroll).

        Returns:
            True if there were no factors or every factor was deleted;
            False if listing failed or any single deletion failed.
        """
        try:
            factors, _, err = await self.okta_client.list_factors(user_id)
            if err:
                raise Exception(f"Failed to list factors: {err}")
            if not factors:
                logger.info(f"No MFA factors found for {user_id}")
                return True

            factors = list(factors)
            factor_count = await self._delete_each(
                user_id, factors, self.okta_client.delete_factor, 'factor'
            )
            logger.info(f"Reset {factor_count} MFA factors for: {user_id}")
            # Any factor left enrolled may be attacker-controlled.
            return factor_count == len(factors)
        except Exception as e:
            logger.error(f"Failed to reset MFA: {e}")
            return False

    async def _log_suspension(self, response_summary: Dict) -> bool:
        """
        Log suspension in DynamoDB audit trail.

        Args:
            response_summary: summary dict built by suspend_compromised_account;
                must contain `timestamp`, `user_id` and `success`.

        Returns:
            True if the item was written, False otherwise (errors are logged,
            never raised).
        """
        try:
            table = self.dynamodb.Table(self.AUDIT_TABLE_NAME)

            # Retention period for compliance, as an epoch-seconds expiry.
            # `timestamp()` must be called on an instance; calling it on the
            # `datetime` class raises TypeError.
            ttl = int(datetime.now().timestamp() + self.AUDIT_RETENTION_SECONDS)

            item = {
                'timestamp': response_summary['timestamp'],
                'action_type': 'account_suspension',
                'user_id': response_summary['user_id'],
                'user_email': response_summary.get('user_email'),
                'alert_id': response_summary.get('alert_id'),
                'actions': response_summary.get('actions', {}),
                'success': response_summary['success'],
                'errors': response_summary.get('errors', []),
                'automated': True,
                'ttl': ttl
            }
            table.put_item(Item=item)
            logger.info(f"Logged to audit trail: {response_summary['user_id']}")
            return True
        except Exception as e:
            logger.error(f"Failed to log to audit trail: {e}")
            return False

    async def _alert_on_failure(self, user_id: str, alert: Dict, error: str):
        """
        Send SNS alert when automation fails.

        Does nothing (beyond a warning log) when SNS_ALERT_TOPIC is not set.
        Publishing errors are logged and never raised.

        Args:
            user_id: Okta user ID or email the automation was run for
            alert: original alert, embedded verbatim in the message
            error: description of what failed
        """
        try:
            sns_topic = os.environ.get('SNS_ALERT_TOPIC')
            if not sns_topic:
                logger.warning("SNS_ALERT_TOPIC not configured, skipping failure alert")
                return

            message = {
                'alert_type': 'automation_failure',
                'user_id': user_id,
                'original_alert': alert,
                'error': error,
                'timestamp': datetime.utcnow().isoformat(),
                'action_required': 'Manual intervention required - automated suspension failed'
            }
            self.sns.publish(
                TopicArn=sns_topic,
                Subject='Account Suspension Automation Failed',
                # default=str: the alert comes from an external detection
                # system; a non-JSON value in it must not suppress the page.
                Message=json.dumps(message, indent=2, default=str)
            )
            logger.info(f"Failure alert sent via SNS for {user_id}")
        except Exception as e:
            logger.error(f"Failed to send failure alert: {e}")
async def main():
    """
    Example usage of automated account suspension.

    Builds the automation from environment variables, runs it against a
    hard-coded sample alert, and prints the resulting summary as JSON.
    """
    automation = AutomatedAccountSuspension()

    alert = {
        'alert_id': 'ALT-2024-00123',
        'title': 'Impossible Travel: Login from US then China within 1 hour',
        'created_time': '2024-10-16T10:30:00Z',
        'description': 'User authenticated from New York, then Shanghai 45 minutes later',
        'severity': 'CRITICAL',
        # Placeholder login: the sample previously held the literal text
        # '[email protected]', which is not a usable Okta user ID or email.
        'user_id': 'compromised.user@example.com'
    }

    result = await automation.suspend_compromised_account(
        user_id=alert['user_id'],
        alert=alert
    )
    print(json.dumps(result, indent=2))


if __name__ == '__main__':
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment