Skip to content

Instantly share code, notes, and snippets.

@iu2frl
Last active March 12, 2026 19:35
Show Gist options
  • Select an option

  • Save iu2frl/6285f157147edf028617cf692fd808d4 to your computer and use it in GitHub Desktop.

Select an option

Save iu2frl/6285f157147edf028617cf692fd808d4 to your computer and use it in GitHub Desktop.
ICOM RS-BA1 Python client
"""
Minimal script to connect to an ICOM radio through the RS-BA1 network connection and read its frequency.
"""
import struct
import socket
import time
import random
# --- Radio configuration (edit to match your station) ---
RADIO_IP = "192.168.0.230"
CONTROL_PORT = 50001
CIV_PORT = 50002

# Login fields are fixed width: the text is NUL-padded to 16 bytes.
# NOTE(review): credentials are hard-coded here; keep real ones out of
# version control.
USERNAME = b"admin".ljust(16, b"\x00")
PASSWORD = b"password123".ljust(16, b"\x00")
CLIENT_NAME = b"PythonClient".ljust(16, b"\x00")

# Identifier this client puts in the "sentid" header field; random per run.
CLIENT_ID = random.randint(0x1000000, 0x7FFFFFFF)

# --- Mutable session state shared by the helpers below ---
transport_seq = 1  # next value handed out by next_transport_seq()
auth_seq = 0x30  # next value handed out by next_auth_seq()
sent_packets = {}  # retransmit buffer: seq -> (packet bytes, destination port)

# Substitution table used by encode_passcode_field(). Entries 32..126 are a
# permutation of the printable ASCII range; the 32 entries on either side
# are zero.
PASSCODE_SEQUENCE = (
    bytes(32)
    + bytes([
        0x47, 0x5D, 0x4C, 0x42, 0x66, 0x20, 0x23, 0x46, 0x4E, 0x57, 0x45, 0x3D, 0x67, 0x76, 0x60, 0x41,
        0x62, 0x39, 0x59, 0x2D, 0x68, 0x7E, 0x7C, 0x65, 0x7D, 0x49, 0x29, 0x72, 0x73, 0x78, 0x21, 0x6E,
        0x5A, 0x5E, 0x4A, 0x3E, 0x71, 0x2C, 0x2A, 0x54, 0x3C, 0x3A, 0x63, 0x4F, 0x43, 0x75, 0x27, 0x79,
        0x5B, 0x35, 0x70, 0x48, 0x6B, 0x56, 0x6F, 0x34, 0x32, 0x6C, 0x30, 0x61, 0x6D, 0x7B, 0x2F, 0x4B,
        0x64, 0x38, 0x2B, 0x2E, 0x50, 0x40, 0x3F, 0x55, 0x33, 0x37, 0x25, 0x77, 0x24, 0x26, 0x74, 0x6A,
        0x28, 0x53, 0x4D, 0x69, 0x22, 0x5C, 0x44, 0x31, 0x36, 0x58, 0x3B, 0x7A, 0x51, 0x5F, 0x52,
    ])
    + bytes(32)
)
def next_transport_seq() -> int:
    """Hand out the current transport sequence number and advance the counter.

    The module-level ``transport_seq`` is post-incremented and wraps back to
    0 after 0xFFFF so it always fits a 16-bit field.
    """
    global transport_seq
    issued = transport_seq
    transport_seq = (issued + 1) % 0x10000
    return issued
def next_auth_seq() -> int:
    """Hand out the current auth ("inner") sequence number and advance it.

    The module-level ``auth_seq`` is post-incremented and wraps back to 0
    after 0xFFFF so it always fits a 16-bit field.
    """
    global auth_seq
    issued = auth_seq
    auth_seq = (issued + 1) % 0x10000
    return issued
def send_and_track(sock: socket.socket, data: bytes, port: int):
    """Transmit *data* to the radio and remember it for later retransmission.

    The packet is filed in ``sent_packets`` under the little-endian 16-bit
    sequence number found at byte offset 6 of the packet itself, together
    with the destination port, and is then sent to ``RADIO_IP``:*port*.
    """
    (seq_key,) = struct.unpack_from("<H", data, 6)
    sent_packets[seq_key] = (data, port)
    destination = (RADIO_IP, port)
    sock.sendto(data, destination)
def handle_retransmit(sock: socket.socket, requested_seq: int):
    """Re-send the tracked packet with sequence number *requested_seq*.

    Returns True when the packet was found in ``sent_packets`` and sent
    again to the port it originally went to, False when nothing is stored
    under that sequence number.
    """
    try:
        payload, dest_port = sent_packets[requested_seq]
    except KeyError:
        return False
    print(f"→ Resending seq={requested_seq}")
    sock.sendto(payload, (RADIO_IP, dest_port))
    return True
def encode_passcode_field(value: bytes) -> bytes:
    """Obfuscate a login field through the ``PASSCODE_SEQUENCE`` table.

    At most the first 16 bytes of *value* are processed and processing stops
    at the first NUL byte. Each byte is offset by its position, folded back
    into the table's range when the sum exceeds 126, and replaced by the
    table entry at that index. The result is always exactly 16 bytes; any
    positions not reached stay zero.
    """
    encoded = bytearray(16)
    for position, char_code in enumerate(value[:16]):
        if not char_code:
            break
        table_index = char_code + position
        if table_index >= 127:
            table_index = table_index % 127 + 32
        encoded[position] = PASSCODE_SEQUENCE[table_index]
    return bytes(encoded)
def bcd_to_hz(bcd: bytes) -> int:
    """Decode little-endian packed BCD (as used for the 5-byte frequency) to Hz.

    Every byte carries two decimal digits: the low nibble is the less
    significant digit and the high nibble the more significant one. Bytes
    are ordered least-significant first. Nibbles are not validated.
    """
    total = 0
    for position, octet in enumerate(bcd):
        tens, units = divmod(octet, 16)
        # Each byte contributes two decimal places, hence the base of 100.
        total += (units + 10 * tens) * 100 ** position
    return total
def get_local_ip() -> str:
    """Return the local IP address the OS would use to reach the radio.

    A throw-away UDP socket is connected to ``RADIO_IP``:``CONTROL_PORT``
    and its local address is read back; the socket is closed again before
    returning.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.connect((RADIO_IP, CONTROL_PORT))
        local_address = probe.getsockname()
        return local_address[0]
def build_id(local_ip: str, port: int) -> int:
    """Derive a 32-bit ID from a dotted IPv4 address and a port.

    Layout: third octet in bits 24-31, fourth octet in bits 16-23 and the
    port in the low 16 bits.
    """
    octets = tuple(map(int, local_ip.split(".")))
    third = octets[2] & 0xFF
    fourth = octets[3] & 0xFF
    return (third << 24) | (fourth << 16) | (port & 0xFFFF)
def send_control_untracked(sock: socket.socket, pkt_type: int, sentid: int, rcvdid: int, fixed_seq: int):
    """Send a bare 16-byte control packet with a caller-chosen sequence number.

    The packet goes to ``RADIO_IP``:``CONTROL_PORT`` and is not stored in the
    retransmit buffer. Header layout (little-endian): length 0x10, type,
    sequence, sender ID, receiver ID.
    """
    header_fields = (0x10, pkt_type, fixed_seq, sentid, rcvdid)
    packet = struct.pack("<IHHII", *header_fields)
    print(f"→ Control type={pkt_type:02x} seq={fixed_seq}")
    print(f" Bytes: {packet.hex()}")
    destination = (RADIO_IP, CONTROL_PORT)
    sock.sendto(packet, destination)
def send_control_tracked(sock: socket.socket, pkt_type: int, sentid: int, rcvdid: int):
    """Send a 16-byte control packet that can be retransmitted on request.

    The sequence number is drawn from ``next_transport_seq()`` and the
    packet is sent through ``send_and_track`` to ``CONTROL_PORT``, which
    also stores it in the retransmit buffer.
    """
    seq = next_transport_seq()
    packet = struct.pack("<IHHII", 0x10, pkt_type, seq, sentid, rcvdid)
    print(f"→ Control type={pkt_type:02x} seq={seq}")
    print(f" Bytes: {packet.hex()}")
    send_and_track(sock, packet, CONTROL_PORT)
def recv_packet(sock: socket.socket, timeout: float = 2.0) -> "tuple[bytes | None, tuple | None]":
    """Wait up to *timeout* seconds for one datagram on *sock*.

    Args:
        sock: UDP socket to read from. Its timeout is set to *timeout* and
            left that way afterwards.
        timeout: Maximum time to block, in seconds.

    Returns:
        ``(data, sender_address)`` for a received datagram of up to 1500
        bytes, or ``(None, None)`` when the wait timed out or the socket
        raised ``ConnectionResetError``.
    """
    sock.settimeout(timeout)
    try:
        return sock.recvfrom(1500)
    except (socket.timeout, ConnectionResetError):
        # Both cases mean "nothing usable arrived"; callers test for a falsy
        # payload instead of handling exceptions.
        return None, None
_HEADER_LEN = 16  # size of the "<IHHII" header: length, type, seq, sentid, rcvdid
_PING_LEN = 21  # total size of a ping packet (type 0x07)


def _packet_type(data: bytes) -> "int | None":
    """Return the 16-bit packet type at offset 4, or None for a runt datagram."""
    if len(data) < 6:
        return None
    return struct.unpack_from("<H", data, 4)[0]


def _echo_ping(sock: socket.socket, data: bytes, sentid: int, rcvdid: int, port: int) -> None:
    """Answer a ping by echoing it to the radio with our IDs and reply flag 0x01."""
    reply = bytearray(data)
    struct.pack_into("<I", reply, 8, sentid)
    struct.pack_into("<I", reply, 12, rcvdid)
    reply[16] = 0x01
    sock.sendto(bytes(reply), (RADIO_IP, port))


def _control_handshake(control_sock: socket.socket) -> "int | None":
    """Run steps 1-2 on the control port; return the radio's ID, or None on failure."""
    print("=== Step 1: Are you there? ===")
    send_control_untracked(control_sock, 0x03, CLIENT_ID, 0, 0x0000)
    data, _addr = recv_packet(control_sock)
    if not data:
        print("ERROR: No response to 'Are you there?'")
        return None
    if len(data) < _HEADER_LEN:
        print(f"ERROR: Response too short ({len(data)} bytes)")
        return None
    pkt = struct.unpack_from("<IHHII", data)
    print(f"← type={pkt[1]:02x} (I am here)")
    if pkt[1] != 0x04:
        print(f"ERROR: Expected type 0x04, got {pkt[1]:02x}")
        return None
    radio_id = pkt[3]  # sentid from radio
    print(f"Radio ID: 0x{radio_id:08x}\n")

    print("=== Step 2: Are you ready? ===")
    send_control_untracked(control_sock, 0x06, CLIENT_ID, radio_id, 0x0001)
    data, _addr = recv_packet(control_sock)
    if not data:
        print("ERROR: No response to 'Are you ready?'")
        return None
    if len(data) < _HEADER_LEN:
        print(f"ERROR: Response too short ({len(data)} bytes)")
        return None
    pkt = struct.unpack_from("<IHHII", data)
    print(f"← type={pkt[1]:02x} (I am ready)\n")
    if pkt[1] != 0x06:
        print(f"ERROR: Expected type 0x06, got {pkt[1]:02x}")
        return None
    return radio_id


def _login(control_sock: socket.socket, radio_id: int) -> "tuple[int, int] | None":
    """Run step 3 (login); return ``(token_request, token)``, or None on failure."""
    print("=== Step 3: Login ===")
    login_seq = next_transport_seq()
    inner_seq = next_auth_seq()
    token_request = random.randint(0, 0xFFFF)
    enc_user = encode_passcode_field(USERNAME)
    enc_pass = encode_passcode_field(PASSWORD)

    header = struct.pack("<IHHII", 0x80, 0x00, login_seq, CLIENT_ID, radio_id)
    payload_size = 0x70  # 0x80 - 0x10 header = 112 bytes
    # The payload mixes byte orders, so every field is packed on its own.
    payload = b"".join([
        struct.pack(">I", payload_size),  # payloadsize, big endian
        struct.pack("<BB", 0x01, 0x00),  # requestreply (must be 0x01), requesttype
        struct.pack(">H", inner_seq),  # innerseq, big endian
        bytes(2),  # padding
        struct.pack("<HI", token_request, 0x00000000),  # tokrequest, token
        bytes(32),  # reserved
        enc_user,
        enc_pass,
        CLIENT_NAME,
        bytes(16),  # reserved
    ])
    login_pkt = header + payload
    print(f"Sending login packet (len={len(login_pkt)})")
    print(f"Payload: size={payload_size} innerseq={inner_seq}")
    print("Login packet hex (first 48 bytes):")
    print(f" {login_pkt[:48].hex()}")
    send_and_track(control_sock, login_pkt, CONTROL_PORT)

    # Wait for the login response, servicing retransmit requests and pings.
    for _ in range(10):
        data, _addr = recv_packet(control_sock, timeout=3.0)
        if not data:
            print("ERROR: No login response")
            return None
        print(f"← Received {len(data)} bytes")
        print(f" Hex: {data.hex()}")
        if len(data) < _HEADER_LEN:
            continue  # too short to carry a header; ignore
        _length, pkt_type, pkt_seq, _sentid, _rcvdid = struct.unpack_from("<IHHII", data)
        if pkt_type == 0x01:
            print(f"← Retransmit request for seq={pkt_seq}")
            handle_retransmit(control_sock, pkt_seq)
        elif pkt_type == 0x06:
            print("← Duplicate 'I am ready' (ignoring)")
        elif pkt_type == 0x07:
            if len(data) >= _PING_LEN:
                print("← Ping, echoing back")
                _echo_ping(control_sock, data, CLIENT_ID, radio_id, CONTROL_PORT)
        elif pkt_type == 0x00:
            break  # this is the login response
    else:
        print("ERROR: No login response after retransmits")
        return None

    if len(data) < 32:
        print(f"ERROR: Login response too short ({len(data)} bytes)")
        return None
    print(f"← type={pkt_type:02x}")
    if len(data) < 80:
        print("ERROR: Can't parse login response")
        return None
    innerseq_resp = struct.unpack_from(">H", data, 22)[0]
    tokrequest_resp = struct.unpack_from("<H", data, 26)[0]
    token = struct.unpack_from("<I", data, 28)[0]
    error = struct.unpack_from("<I", data, 48)[0]
    connection_name = data[64:80].rstrip(b"\x00").decode("ascii", errors="ignore")
    print(
        f"Login response: innerseq={innerseq_resp} tokrequest=0x{tokrequest_resp:04x} "
        f"token=0x{token:08x} error=0x{error:08x} connection={connection_name}"
    )
    if error != 0x00000000:
        print(f"ERROR: Login failed with error 0x{error:08x}")
        return None
    print(f"✓ Login successful! Token: 0x{token:08x}\n")
    return token_request, token


def _fetch_capabilities(control_sock: socket.socket, radio_id: int, token_request: int, token: int) -> "dict | None":
    """Run step 4 (token exchange); return the radio's capability fields, or None.

    The returned dict has the keys ``token``, ``guid``, ``commoncap``,
    ``use_guid``, ``mac``, ``name`` and ``civ_address``.
    """
    print("=== Step 4: Token Exchange ===")
    token_pkt = bytearray(64)
    struct.pack_into("<IHHII", token_pkt, 0, 64, 0x00, next_transport_seq(), CLIENT_ID, radio_id)
    struct.pack_into(">I", token_pkt, 16, 0x30)
    token_pkt[20] = 0x01
    token_pkt[21] = 0x02
    struct.pack_into(">H", token_pkt, 22, next_auth_seq())
    struct.pack_into("<H", token_pkt, 26, token_request)
    struct.pack_into("<I", token_pkt, 28, token)
    struct.pack_into(">H", token_pkt, 36, 0x0798)
    struct.pack_into("<I", token_pkt, 48, 0)
    send_and_track(control_sock, bytes(token_pkt), CONTROL_PORT)
    print("Sent token request")

    for _ in range(30):
        data, _addr = recv_packet(control_sock, timeout=3.0)
        if not data:
            print("ERROR: Timeout waiting for capabilities")
            return None
        pkt_type = _packet_type(data)
        if pkt_type is None:
            continue
        pkt_len = len(data)
        print(f"< Control packet type=0x{pkt_type:02x} len={pkt_len}")
        if pkt_len == 16:
            pkt_seq = struct.unpack_from("<H", data, 6)[0]
            if pkt_type == 0x01:
                print(f"< Retransmit request for seq={pkt_seq}")
                handle_retransmit(control_sock, pkt_seq)
            elif pkt_type == 0x06:
                print("< Duplicate 'I am ready'")
            continue
        if pkt_len == _PING_LEN and pkt_type == 0x07:
            _echo_ping(control_sock, data, CLIENT_ID, radio_id, CONTROL_PORT)
            print("< Ping, echoed")
            continue
        if pkt_len == 64:
            # Token ACK - update token but don't gate on it
            request_type = data[21]
            response = struct.unpack_from("<I", data, 48)[0]
            new_token = struct.unpack_from("<I", data, 28)[0]
            print(f"< Token ack requesttype=0x{request_type:02x} response=0x{response:08x}")
            if response == 0x00000000:
                token = new_token
            continue
        if pkt_len >= 166:
            # CapabilitiesPacket header is 66 bytes (0x42), then RadioCapPacket entries
            num_radios = struct.unpack_from("<H", data, 64)[0]
            print(f"< Capabilities packet radios={num_radios}")
            radio = data[66:168]  # first RadioCapPacket (102 bytes)
            commoncap = struct.unpack_from("<H", radio, 7)[0]  # commoncap at +0x07
            if commoncap == 0x1080:
                commoncap = 0x8010
            caps = {
                "token": token,
                "guid": radio[0:16],  # GUID at +0x00
                "commoncap": commoncap,
                "use_guid": commoncap != 0x8010,
                "mac": radio[10:16],  # macaddress at +0x0A
                "name": radio[16:48].split(b"\x00", 1)[0].decode("ascii", errors="ignore"),  # name at +0x10
                "civ_address": radio[0x52],  # civ at +0x52
            }
            print(f"Radio name: {caps['name']}")
            print(f"CI-V address: 0x{caps['civ_address']:02x}")
            return caps
    print("ERROR: Capabilities not received")
    return None


def _request_stream(control_sock: socket.socket, radio_id: int, token_request: int, caps: dict, civ_local_port: int) -> "int | None":
    """Run step 5 (stream request); return the radio's CI-V port, or None on failure."""
    print("\n=== Step 5: Stream Request ===")
    # ConnInfoPacket is 0x90 = 144 bytes
    conn_pkt = bytearray(144)
    struct.pack_into("<IHHII", conn_pkt, 0, 144, 0x00, next_transport_seq(), CLIENT_ID, radio_id)
    struct.pack_into(">I", conn_pkt, 16, 128)  # payloadsize = 144 - 16 = 128
    conn_pkt[20] = 0x01  # requestreply
    conn_pkt[21] = 0x03  # requesttype
    struct.pack_into(">H", conn_pkt, 22, next_auth_seq())
    struct.pack_into("<H", conn_pkt, 26, token_request)
    struct.pack_into("<I", conn_pkt, 28, caps["token"])
    if caps["use_guid"]:
        conn_pkt[32:48] = caps["guid"]
    else:
        struct.pack_into("<H", conn_pkt, 39, caps["commoncap"])
        conn_pkt[42:48] = caps["mac"]
    conn_pkt[64:96] = caps["name"].encode("ascii", errors="ignore")[:31].ljust(32, b"\x00")
    conn_pkt[96:112] = encode_passcode_field(USERNAME)
    # rxenable=0, txenable=0, rxcodec=0, txcodec=0, rxsample=0, txsample=0
    conn_pkt[0x70] = 0x00  # rxenable
    conn_pkt[0x71] = 0x00  # txenable
    struct.pack_into(">I", conn_pkt, 0x7c, civ_local_port)  # civport (big-endian uint32)
    struct.pack_into(">I", conn_pkt, 0x80, civ_local_port + 1)  # audioport
    conn_pkt[0x88] = 0x01  # convert=1
    send_and_track(control_sock, bytes(conn_pkt), CONTROL_PORT)
    print(f"Requested stream with CIV local port {civ_local_port}")

    ping_count = 0
    for _ in range(200):
        data, _addr = recv_packet(control_sock, timeout=1.0)
        if not data:
            continue
        pkt_type = _packet_type(data)
        if pkt_type is None:
            continue
        pkt_len = len(data)
        if pkt_len == 16:
            if pkt_type == 0x01:
                handle_retransmit(control_sock, struct.unpack_from("<H", data, 6)[0])
            continue
        if pkt_len == _PING_LEN and pkt_type == 0x07:
            _echo_ping(control_sock, data, CLIENT_ID, radio_id, CONTROL_PORT)
            ping_count += 1
            if ping_count == 1:
                print(" Waiting for previous session to time out...")
            continue
        if pkt_len == 64:
            request_type = data[21]
            response = struct.unpack_from("<I", data, 48)[0]
            if request_type == 0x03 and response == 0xffffffff:
                print("ERROR: Stream rejected by radio (try rebooting radio)")
                return None
            # Busy/pending - keep waiting
            continue
        if pkt_len >= 80 and data[21] == 0x03:
            status_error = struct.unpack_from("<I", data, 48)[0]
            disc = data[0x40]
            civ_remote_port = struct.unpack_from(">H", data, 0x42)[0]
            if civ_remote_port == 0:
                civ_remote_port = CIV_PORT
            print(f"< Stream accepted civ_port={civ_remote_port}")
            if status_error == 0x00000000 and disc == 0:
                return civ_remote_port
            if disc == 0x01:
                print("ERROR: Radio reports disconnected")
                return None
            if status_error == 0xffffffff:
                print("ERROR: Stream rejected (try rebooting radio)")
                return None
    print("ERROR: Stream status was not received")
    return None


def _open_civ_channel(civ_sock: socket.socket, civ_client_id: int, civ_remote_port: int) -> "int | None":
    """Run step 6's handshake on the CI-V port; return the radio's CI-V ID, or None."""
    civ_remote_id = 0
    civ_are_you_there = struct.pack("<IHHII", 0x10, 0x03, 0x0000, civ_client_id, 0)
    civ_sock.sendto(civ_are_you_there, (RADIO_IP, civ_remote_port))
    print(f"→ CIV Are-you-there sent with client id 0x{civ_client_id:08x}")
    for _ in range(10):
        data, _addr = recv_packet(civ_sock, timeout=2.0)
        if not data:
            print("ERROR: Timeout during CIV handshake")
            return None
        pkt_type = _packet_type(data)
        if pkt_type is None:
            continue
        pkt_len = len(data)
        print(f"← CIV packet type=0x{pkt_type:02x} len={pkt_len}")
        if pkt_len == 16 and pkt_type == 0x04:
            civ_remote_id = struct.unpack_from("<I", data, 8)[0]
            ready = struct.pack("<IHHII", 0x10, 0x06, 0x0001, civ_client_id, civ_remote_id)
            civ_sock.sendto(ready, (RADIO_IP, civ_remote_port))
            print("→ CIV Are-you-ready sent")
            continue
        if pkt_len == 16 and pkt_type == 0x06:
            civ_remote_id = struct.unpack_from("<I", data, 8)[0]
            open_pkt = bytearray(22)
            struct.pack_into("<IHHII", open_pkt, 0, 22, 0x00, 1, civ_client_id, civ_remote_id)
            struct.pack_into("<H", open_pkt, 16, 0x01C0)
            open_pkt[18] = 0x00
            struct.pack_into(">H", open_pkt, 19, 0x0000)
            open_pkt[21] = 0x04
            civ_sock.sendto(bytes(open_pkt), (RADIO_IP, civ_remote_port))
            print("→ CIV Open packet sent")
            return civ_remote_id
        if pkt_len == _PING_LEN and pkt_type == 0x07:
            _echo_ping(civ_sock, data, civ_client_id, civ_remote_id, civ_remote_port)
            print("← CIV Ping, echoed")
    print("ERROR: CIV channel did not become ready")
    return None


def _read_frequency(civ_sock: socket.socket, civ_client_id: int, civ_remote_id: int, civ_remote_port: int, civ_address: int) -> "int | None":
    """Run step 7: send CI-V command 0x03 and return the frequency in Hz, or None."""
    print("\n=== Step 7: Reading frequency ===")
    civ_cmd = bytes([0xFE, 0xFE, civ_address, 0xE0, 0x03, 0xFD])
    civ_pkt = bytearray(21 + len(civ_cmd))
    struct.pack_into("<IHHII", civ_pkt, 0, len(civ_pkt), 0x00, 2, civ_client_id, civ_remote_id)
    civ_pkt[16] = 0xC1
    struct.pack_into("<H", civ_pkt, 17, len(civ_cmd))
    struct.pack_into(">H", civ_pkt, 19, 1)
    civ_pkt[21:] = civ_cmd
    civ_sock.sendto(bytes(civ_pkt), (RADIO_IP, civ_remote_port))
    print(f"→ CI-V poll sent: {civ_cmd.hex()}")
    for _ in range(20):
        data, _addr = recv_packet(civ_sock, timeout=3.0)
        if not data:
            print("ERROR: No frequency response")
            return None
        pkt_type = _packet_type(data)
        if pkt_type is None:
            continue
        pkt_len = len(data)
        print(f"← CIV packet type=0x{pkt_type:02x} len={pkt_len}")
        if pkt_len == _PING_LEN and pkt_type == 0x07:
            _echo_ping(civ_sock, data, civ_client_id, civ_remote_id, civ_remote_port)
            print("← CIV Ping, echoed")
            continue
        if pkt_len > 21:
            civ_data = data[21:]  # CI-V frame follows the 21-byte wrapper
            print(f"CI-V data: {civ_data.hex()}")
            if len(civ_data) >= 11 and civ_data[:2] == b"\xFE\xFE" and civ_data[4] in (0x00, 0x03):
                freq_hz = bcd_to_hz(civ_data[5:10])
                print(f"\nFrequency: {freq_hz} Hz ({freq_hz / 1_000_000:.6f} MHz)")
                return freq_hz
    print("ERROR: Frequency response format not received")
    return None


def main():
    """Connect to the radio, log in, open the CI-V stream and print the frequency.

    The session runs as seven steps: are-you-there, are-you-ready, login,
    token exchange / capabilities, stream request, CI-V channel open and a
    single frequency poll. Every failure prints an ``ERROR:`` line and ends
    the run. Both UDP sockets are closed on every exit path, including early
    returns and exceptions. Datagrams too short to carry a packet header are
    ignored instead of raising ``struct.error``.
    """
    local_ip = get_local_ip()
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as control_sock:
        control_sock.bind(("0.0.0.0", 0))
        local_port = control_sock.getsockname()[1]
        print(f"Connecting to {RADIO_IP}:{CONTROL_PORT}")
        print(f"Client ID: 0x{CLIENT_ID:08x}")
        print(f"Local IP: {local_ip}")
        print(f"Local port: {local_port}\n")

        radio_id = _control_handshake(control_sock)
        if radio_id is None:
            return

        login = _login(control_sock, radio_id)
        if login is None:
            return
        token_request, token = login

        caps = _fetch_capabilities(control_sock, radio_id, token_request, token)
        if caps is None:
            return

        # The CI-V port sits 20 above the control port and the audio port one
        # above that; both must still be valid UDP port numbers.
        civ_local_port = local_port + 20
        if civ_local_port + 1 > 0xFFFF:
            print(f"ERROR: Local port {local_port} leaves no room for the CIV/audio ports")
            return

        civ_remote_port = _request_stream(control_sock, radio_id, token_request, caps, civ_local_port)
        if civ_remote_port is None:
            return

        print("\n=== Step 6: Opening CIV channel ===")
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as civ_sock:
            civ_sock.bind(("0.0.0.0", civ_local_port))
            civ_client_id = build_id(local_ip, civ_local_port)
            civ_remote_id = _open_civ_channel(civ_sock, civ_client_id, civ_remote_port)
            if civ_remote_id is None:
                return
            freq_hz = _read_frequency(
                civ_sock, civ_client_id, civ_remote_id, civ_remote_port, caps["civ_address"]
            )
            if freq_hz is None:
                return
            print("\nConnection and frequency read succeeded.")
# Run the session only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment