Skip to content

Instantly share code, notes, and snippets.

@VVX7
Created October 17, 2025 20:14
Show Gist options
  • Select an option

  • Save VVX7/94a76d3307a6b8ec326e744dc8fcecd6 to your computer and use it in GitHub Desktop.

Select an option

Save VVX7/94a76d3307a6b8ec326e744dc8fcecd6 to your computer and use it in GitHub Desktop.
suspend okta account
"""
Automated Account Suspension for Compromised User Accounts
Triggered by:
- Impossible travel detection
- Credential stuffing attempts
- Anomalous behavior alerts
Actions performed:
1. Suspend Okta account
2. Revoke all active sessions
3. Reset MFA factors
Usage:
python automated_account_suspension.py
Environment Variables Required:
- OKTA_ORG_URL
- OKTA_API_TOKEN
- AWS_REGION
- SNS_ALERT_TOPIC (optional)
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Any, Dict

import boto3
from okta.client import Client as OktaClient
# Configure the root logger so records at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the code in this file.
logger = logging.getLogger(__name__)
class AutomatedAccountSuspension:
    """
    Automated response for compromised user accounts.

    Triggered by:
        - Impossible travel detection
        - Credential stuffing attempts
        - Anomalous behavior alerts

    Actions performed:
        1. Suspend Okta account
        2. Revoke all active sessions
        3. Reset MFA factors

    Every run is written to a DynamoDB audit table, and an SNS alert is
    published whenever the automation cannot fully contain the account.
    """

    # DynamoDB table that receives one audit item per suspension attempt.
    AUDIT_TABLE_NAME = 'security-response-audit-log'
    # Retention period for compliance: seven years, expressed in seconds.
    AUDIT_RETENTION_SECONDS = 365 * 7 * 24 * 60 * 60
    # Actions that must all succeed for the response to count as successful.
    # MFA reset is deliberately not in this list (best effort).
    CRITICAL_ACTIONS = ('account_suspended', 'sessions_revoked')

    def __init__(self):
        """
        Build the Okta, SNS and DynamoDB clients from environment variables.

        Raises:
            EnvironmentError: if OKTA_ORG_URL, OKTA_API_TOKEN or AWS_REGION
                is unset or empty.
        """
        required_env_vars = [
            'OKTA_ORG_URL',
            'OKTA_API_TOKEN',
            'AWS_REGION',
        ]
        missing_vars = [var for var in required_env_vars if not os.environ.get(var)]
        if missing_vars:
            raise EnvironmentError(f"Missing required env vars: {missing_vars}")

        self.okta_client = OktaClient({
            'orgUrl': os.environ['OKTA_ORG_URL'],
            'token': os.environ['OKTA_API_TOKEN'],
        })
        # AWS_REGION was validated above, so no fallback region is needed.
        aws_region = os.environ['AWS_REGION']
        self.sns = boto3.client('sns', region_name=aws_region)
        self.dynamodb = boto3.resource('dynamodb', region_name=aws_region)
        self.max_retries = 3

    async def suspend_compromised_account(
        self,
        user_id: str,
        alert: Dict
    ) -> Dict[str, Any]:
        """
        Suspend user account and revoke all active sessions.

        Args:
            user_id: Okta user ID or email
            alert: Alert details from detection system

        Returns:
            Dict with action results and metadata. ``success`` is True only
            when every action in ``CRITICAL_ACTIONS`` succeeded; otherwise
            ``errors`` describes what went wrong. This method does not raise:
            all failures are captured in the returned summary.
        """
        logger.info("Starting account suspension for: %s", user_id)
        response_summary = {
            'user_id': user_id,
            'alert_id': alert.get('alert_id'),
            'timestamp': datetime.utcnow().isoformat(),
            'actions': {},
            'success': False,
            'errors': []
        }
        try:
            user = await self._get_user(user_id)
            if not user:
                raise ValueError(f"User not found: {user_id}")
            response_summary['user_email'] = user.profile.email

            actions = response_summary['actions']
            actions['account_suspended'] = await self._suspend_okta_account(user)
            actions['sessions_revoked'] = await self._revoke_user_sessions(user_id)
            actions['mfa_reset'] = await self._reset_mfa_factors(user_id)

            failed = [
                name for name in self.CRITICAL_ACTIONS
                if not actions.get(name, False)
            ]
            if failed:
                # Route "returned False" failures through the same path as
                # exceptions so they are audited AND trigger the SNS alert;
                # a compromised account that was not contained needs a human.
                raise RuntimeError(f"Critical actions failed: {failed}")

            response_summary['success'] = True
            await self._log_suspension(response_summary)
            logger.info(
                "Account suspension completed for %s: %s",
                user_id, response_summary,
            )
            return response_summary
        except Exception as e:
            # Top-level boundary: never let the automation crash the caller.
            logger.error(
                "Account suspension failed for %s: %s", user_id, e, exc_info=True
            )
            response_summary['errors'].append(str(e))
            response_summary['success'] = False
            await self._log_suspension(response_summary)
            await self._alert_on_failure(user_id, alert, str(e))
            return response_summary

    async def _get_user(self, user_id: str):
        """
        Retrieve user from Okta with retry logic.

        Retries up to ``self.max_retries`` times, sleeping 1s, 2s, 4s, ...
        between attempts.

        Args:
            user_id: Okta user ID or email

        Returns:
            User object, or None once every attempt has failed.
        """
        for attempt in range(self.max_retries):
            try:
                user, _, err = await self.okta_client.get_user(user_id)
                if err:
                    raise Exception(f"Okta API error: {err}")
                return user
            except Exception as e:
                logger.warning(
                    "Attempt %s/%s failed: %s", attempt + 1, self.max_retries, e
                )
                if attempt < self.max_retries - 1:
                    # Exponential backoff before the next attempt.
                    await asyncio.sleep(2 ** attempt)
                else:
                    logger.error(
                        "Failed to retrieve user after %s attempts",
                        self.max_retries,
                    )
        return None

    async def _suspend_okta_account(self, user) -> bool:
        """
        Suspend user in Okta.

        Args:
            user: User object as returned by ``_get_user``.

        Returns:
            True if the suspend call reported no error, False otherwise.
        """
        try:
            # NOTE(review): assumes the user object exposes an awaitable
            # suspend() returning (response, error) - confirm against the
            # installed Okta SDK version.
            _, err = await user.suspend()
            if err:
                raise Exception(f"Okta suspend error: {err}")
            logger.info("Okta account suspended: %s", user.id)
            return True
        except Exception as e:
            logger.error("Failed to suspend Okta account: %s", e)
            return False

    async def _revoke_user_sessions(self, user_id: str) -> bool:
        """
        Revoke all active sessions for user.

        Returns:
            True if the user had no sessions or every session was deleted;
            False if listing failed or at least one session could not be
            deleted (a surviving session means the account is not contained).
        """
        try:
            # NOTE(review): assumes the client exposes list_user_sessions /
            # delete_user_session - confirm against the installed Okta SDK.
            sessions, _, err = await self.okta_client.list_user_sessions(user_id)
            if err:
                raise Exception(f"Failed to list sessions: {err}")
            if not sessions:
                logger.info("No active sessions found for %s", user_id)
                return True

            session_count = 0
            failures = 0
            for session in sessions:
                try:
                    _, err = await self.okta_client.delete_user_session(
                        user_id, session.id
                    )
                    if err:
                        failures += 1
                        logger.warning(
                            "Failed to delete session %s: %s", session.id, err
                        )
                    else:
                        session_count += 1
                except Exception as e:
                    failures += 1
                    logger.warning("Error deleting session %s: %s", session.id, e)
            logger.info("Revoked %s sessions for: %s", session_count, user_id)
            return failures == 0
        except Exception as e:
            logger.error("Failed to revoke sessions: %s", e)
            return False

    async def _reset_mfa_factors(self, user_id: str) -> bool:
        """
        Reset MFA factors (user must re-enroll).

        Returns:
            True if the user had no factors or every factor was deleted;
            False if listing failed or at least one factor could not be
            deleted.
        """
        try:
            factors, _, err = await self.okta_client.list_factors(user_id)
            if err:
                raise Exception(f"Failed to list factors: {err}")
            if not factors:
                logger.info("No MFA factors found for %s", user_id)
                return True

            factor_count = 0
            failures = 0
            for factor in factors:
                try:
                    _, err = await self.okta_client.delete_factor(user_id, factor.id)
                    if err:
                        failures += 1
                        logger.warning(
                            "Failed to delete factor %s: %s", factor.id, err
                        )
                    else:
                        factor_count += 1
                except Exception as e:
                    failures += 1
                    logger.warning("Error deleting factor %s: %s", factor.id, e)
            logger.info("Reset %s MFA factors for: %s", factor_count, user_id)
            return failures == 0
        except Exception as e:
            logger.error("Failed to reset MFA: %s", e)
            return False

    async def _log_suspension(self, response_summary: Dict) -> bool:
        """
        Log suspension in DynamoDB audit trail.

        Args:
            response_summary: Summary dict built by
                ``suspend_compromised_account``.

        Returns:
            True if the item was written, False if the write failed (the
            failure is logged, never raised).
        """
        try:
            table = self.dynamodb.Table(self.AUDIT_TABLE_NAME)
            # Expiry as integer epoch seconds. datetime.now().timestamp() is
            # the current epoch time; the unbound datetime.timestamp() call
            # used previously raised TypeError, so nothing was ever written.
            # NOTE(review): presumably 'ttl' is the table's TTL attribute -
            # confirm in the table configuration.
            ttl = int(datetime.now().timestamp() + self.AUDIT_RETENTION_SECONDS)
            item = {
                'timestamp': response_summary['timestamp'],
                'action_type': 'account_suspension',
                'user_id': response_summary['user_id'],
                'user_email': response_summary.get('user_email'),
                'alert_id': response_summary.get('alert_id'),
                'actions': response_summary.get('actions', {}),
                'success': response_summary['success'],
                'errors': response_summary.get('errors', []),
                'automated': True,
                'ttl': ttl
            }
            table.put_item(Item=item)
            logger.info("Logged to audit trail: %s", response_summary['user_id'])
            return True
        except Exception as e:
            logger.error("Failed to log to audit trail: %s", e)
            return False

    async def _alert_on_failure(self, user_id: str, alert: Dict, error: str):
        """
        Send SNS alert when automation fails.

        Does nothing (beyond a warning log) when SNS_ALERT_TOPIC is not set.
        Publishing errors are logged and swallowed so that alerting problems
        never mask the original failure.

        Args:
            user_id: Okta user ID or email the automation was handling.
            alert: Original alert that triggered the automation.
            error: Human-readable description of the failure.
        """
        try:
            sns_topic = os.environ.get('SNS_ALERT_TOPIC')
            if not sns_topic:
                logger.warning("SNS_ALERT_TOPIC not configured, skipping failure alert")
                return
            message = {
                'alert_type': 'automation_failure',
                'user_id': user_id,
                'original_alert': alert,
                'error': error,
                'timestamp': datetime.utcnow().isoformat(),
                'action_required': 'Manual intervention required - automated suspension failed'
            }
            self.sns.publish(
                TopicArn=sns_topic,
                Subject='Account Suspension Automation Failed',
                Message=json.dumps(message, indent=2)
            )
            logger.info("Failure alert sent via SNS for %s", user_id)
        except Exception as e:
            logger.error("Failed to send failure alert: %s", e)
async def main():
    """Run the suspension workflow once against a hard-coded sample alert.

    Builds an impossible-travel alert, hands it to
    ``AutomatedAccountSuspension.suspend_compromised_account`` and prints the
    returned summary as indented JSON.
    """
    responder = AutomatedAccountSuspension()
    sample_alert = dict(
        alert_id='ALT-2024-00123',
        title='Impossible Travel: Login from US then China within 1 hour',
        created_time='2024-10-16T10:30:00Z',
        description='User authenticated from New York, then Shanghai 45 minutes later',
        severity='CRITICAL',
        user_id='[email protected]',
    )
    target_user = sample_alert['user_id']
    outcome = await responder.suspend_compromised_account(
        user_id=target_user,
        alert=sample_alert,
    )
    rendered = json.dumps(outcome, indent=2)
    print(rendered)


# Only run the example when executed as a script, not when imported.
if __name__ == '__main__':
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment