Last active
March 12, 2026 19:35
-
-
Save iu2frl/6285f157147edf028617cf692fd808d4 to your computer and use it in GitHub Desktop.
ICOM RS-BA1 Python client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Minimal script to connect to ICOM radio trough network connection of RS-BA1 and read frequency. | |
| """ | |
| import struct | |
| import socket | |
| import time | |
| import random | |
| # Radio config | |
| RADIO_IP = "192.168.0.230" | |
| CONTROL_PORT = 50001 | |
| CIV_PORT = 50002 | |
| USERNAME = b"admin" + b"\x00" * 11 # 16 bytes total | |
| PASSWORD = b"password123" + b"\x00" * 5 # 16 bytes total | |
| CLIENT_NAME = b"PythonClient" + b"\x00" * 4 # 16 bytes total | |
| CLIENT_ID = random.randint(0x1000000, 0x7FFFFFFF) | |
| transport_seq = 1 | |
| auth_seq = 0x30 | |
| sent_packets = {} # retransmit buffer: seq -> bytes | |
| PASSCODE_SEQUENCE = bytes([ | |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, | |
| 0x47, 0x5D, 0x4C, 0x42, 0x66, 0x20, 0x23, 0x46, 0x4E, 0x57, 0x45, 0x3D, 0x67, 0x76, 0x60, 0x41, | |
| 0x62, 0x39, 0x59, 0x2D, 0x68, 0x7E, 0x7C, 0x65, 0x7D, 0x49, 0x29, 0x72, 0x73, 0x78, 0x21, 0x6E, | |
| 0x5A, 0x5E, 0x4A, 0x3E, 0x71, 0x2C, 0x2A, 0x54, 0x3C, 0x3A, 0x63, 0x4F, 0x43, 0x75, 0x27, 0x79, | |
| 0x5B, 0x35, 0x70, 0x48, 0x6B, 0x56, 0x6F, 0x34, 0x32, 0x6C, 0x30, 0x61, 0x6D, 0x7B, 0x2F, 0x4B, | |
| 0x64, 0x38, 0x2B, 0x2E, 0x50, 0x40, 0x3F, 0x55, 0x33, 0x37, 0x25, 0x77, 0x24, 0x26, 0x74, 0x6A, | |
| 0x28, 0x53, 0x4D, 0x69, 0x22, 0x5C, 0x44, 0x31, 0x36, 0x58, 0x3B, 0x7A, 0x51, 0x5F, 0x52, | |
| 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, | |
| ]) | |
| def next_transport_seq() -> int: | |
| global transport_seq | |
| current = transport_seq | |
| transport_seq = (transport_seq + 1) & 0xFFFF | |
| return current | |
| def next_auth_seq() -> int: | |
| global auth_seq | |
| current = auth_seq | |
| auth_seq = (auth_seq + 1) & 0xFFFF | |
| return current | |
| def send_and_track(sock: socket.socket, data: bytes, port: int): | |
| """Send a packet and store it for potential retransmit.""" | |
| # Extract seq from packet | |
| target_seq = struct.unpack("<H", data[6:8])[0] | |
| sent_packets[target_seq] = (data, port) | |
| sock.sendto(data, (RADIO_IP, port)) | |
| def handle_retransmit(sock: socket.socket, requested_seq: int): | |
| """Resend a packet if we have it.""" | |
| if requested_seq in sent_packets: | |
| data, port = sent_packets[requested_seq] | |
| print(f"→ Resending seq={requested_seq}") | |
| sock.sendto(data, (RADIO_IP, port)) | |
| return True | |
| return False | |
| def encode_passcode_field(value: bytes) -> bytes: | |
| result = bytearray(16) | |
| for index, raw in enumerate(value[:16]): | |
| if raw == 0: | |
| break | |
| mapped = raw + index | |
| if mapped > 126: | |
| mapped = 32 + (mapped % 127) | |
| result[index] = PASSCODE_SEQUENCE[mapped] | |
| return bytes(result) | |
| def bcd_to_hz(bcd: bytes) -> int: | |
| """Convert 5-byte little-endian BCD to Hz.""" | |
| hz = 0 | |
| mult = 1 | |
| for byte in bcd: | |
| low = byte & 0x0F | |
| high = (byte >> 4) & 0x0F | |
| hz += low * mult | |
| mult *= 10 | |
| hz += high * mult | |
| mult *= 10 | |
| return hz | |
| def get_local_ip() -> str: | |
| probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| try: | |
| probe.connect((RADIO_IP, CONTROL_PORT)) | |
| return probe.getsockname()[0] | |
| finally: | |
| probe.close() | |
| def build_id(local_ip: str, port: int) -> int: | |
| octets = [int(part) for part in local_ip.split(".")] | |
| return ((octets[2] & 0xFF) << 24) | ((octets[3] & 0xFF) << 16) | (port & 0xFFFF) | |
| def send_control_untracked(sock: socket.socket, pkt_type: int, sentid: int, rcvdid: int, fixed_seq: int): | |
| """Send an untracked control packet.""" | |
| data = struct.pack("<IHHII", 0x10, pkt_type, fixed_seq, sentid, rcvdid) | |
| print(f"→ Control type={pkt_type:02x} seq={fixed_seq}") | |
| print(f" Bytes: {data.hex()}") | |
| sock.sendto(data, (RADIO_IP, CONTROL_PORT)) | |
| def send_control_tracked(sock: socket.socket, pkt_type: int, sentid: int, rcvdid: int): | |
| """Send a tracked control packet.""" | |
| s = next_transport_seq() | |
| data = struct.pack("<IHHII", 0x10, pkt_type, s, sentid, rcvdid) | |
| print(f"→ Control type={pkt_type:02x} seq={s}") | |
| print(f" Bytes: {data.hex()}") | |
| send_and_track(sock, data, CONTROL_PORT) | |
| def recv_packet(sock: socket.socket, timeout=2.0) -> tuple[bytes, tuple]: | |
| """Receive a packet with timeout.""" | |
| sock.settimeout(timeout) | |
| try: | |
| data, addr = sock.recvfrom(1500) | |
| return data, addr | |
| except socket.timeout: | |
| return None, None | |
| except ConnectionResetError: | |
| return None, None | |
| def main(): | |
| # Create control socket | |
| local_ip = get_local_ip() | |
| control_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| control_sock.bind(("0.0.0.0", 0)) | |
| local_port = control_sock.getsockname()[1] | |
| print(f"Connecting to {RADIO_IP}:{CONTROL_PORT}") | |
| print(f"Client ID: 0x{CLIENT_ID:08x}") | |
| print(f"Local IP: {local_ip}") | |
| print(f"Local port: {local_port}\n") | |
| # Step 1: Are you there? | |
| print("=== Step 1: Are you there? ===") | |
| send_control_untracked(control_sock, 0x03, CLIENT_ID, 0, 0x0000) | |
| data, addr = recv_packet(control_sock) | |
| if not data: | |
| print("ERROR: No response to 'Are you there?'") | |
| return | |
| pkt = struct.unpack("<IHHII", data[:16]) | |
| print(f"← type={pkt[1]:02x} (I am here)") | |
| if pkt[1] != 0x04: | |
| print(f"ERROR: Expected type 0x04, got {pkt[1]:02x}") | |
| return | |
| RADIO_ID = pkt[3] # sentid from radio | |
| print(f"Radio ID: 0x{RADIO_ID:08x}\n") | |
| # Step 2: Are you ready? | |
| print("=== Step 2: Are you ready? ===") | |
| send_control_untracked(control_sock, 0x06, CLIENT_ID, RADIO_ID, 0x0001) | |
| data, addr = recv_packet(control_sock) | |
| if not data: | |
| print("ERROR: No response to 'Are you ready?'") | |
| return | |
| pkt = struct.unpack("<IHHII", data[:16]) | |
| print(f"← type={pkt[1]:02x} (I am ready)\n") | |
| if pkt[1] != 0x06: | |
| print(f"ERROR: Expected type 0x06, got {pkt[1]:02x}") | |
| return | |
| # Step 3: Login | |
| print("=== Step 3: Login ===") | |
| login_seq = next_transport_seq() | |
| login_inner_seq = next_auth_seq() | |
| token_request = random.randint(0, 0xFFFF) | |
| enc_user = encode_passcode_field(USERNAME) | |
| enc_pass = encode_passcode_field(PASSWORD) | |
| # Build login packet (manually to ensure correct endianness) | |
| header = struct.pack("<IHHII", 0x80, 0x00, login_seq, CLIENT_ID, RADIO_ID) | |
| # Payload with mixed endianness | |
| payloadsize = 0x70 # 0x80 - 0x10 header = 112 bytes | |
| requestreply = 0x01 # CRITICAL! Must be 0x01, not 0x00 | |
| requesttype = 0x00 | |
| innerseq = login_inner_seq | |
| tokrequest = token_request | |
| token = 0x00000000 | |
| payload = struct.pack(">I", payloadsize) # BIG endian | |
| payload += struct.pack("<BB", requestreply, requesttype) # little | |
| payload += struct.pack(">H", innerseq) # BIG endian | |
| payload += bytes(2) # padding | |
| payload += struct.pack("<HI", tokrequest, token) # little | |
| payload += bytes(32) # reserved | |
| payload += enc_user | |
| payload += enc_pass | |
| payload += CLIENT_NAME | |
| payload += bytes(16) # reserved | |
| login_pkt = header + payload | |
| print(f"Sending login packet (len={len(login_pkt)})") | |
| print(f"Payload: size={payloadsize} innerseq={innerseq}") | |
| print(f"Login packet hex (first 48 bytes):") | |
| print(f" {login_pkt[:48].hex()}") | |
| send_and_track(control_sock, login_pkt, CONTROL_PORT) | |
| # Wait for login response with token (handle retransmit requests) | |
| for attempt in range(10): | |
| data, addr = recv_packet(control_sock, timeout=3.0) | |
| if not data: | |
| print("ERROR: No login response") | |
| return | |
| print(f"← Received {len(data)} bytes") | |
| print(f" Hex: {data.hex()}") | |
| pkt = struct.unpack("<IHHII", data[:16]) | |
| pkt_type = pkt[1] | |
| pkt_seq = pkt[2] | |
| if pkt_type == 0x01: | |
| print(f"← Retransmit request for seq={pkt_seq}") | |
| handle_retransmit(control_sock, pkt_seq) | |
| continue | |
| elif pkt_type == 0x06: | |
| print(f"← Duplicate 'I am ready' (ignoring)") | |
| continue | |
| elif pkt_type == 0x07: | |
| # Ping - echo it back | |
| if len(data) >= 21: | |
| ping_pkt = bytearray(data) | |
| # Set reply=0x01 | |
| struct.pack_into("<I", ping_pkt, 8, CLIENT_ID) # sentid | |
| struct.pack_into("<I", ping_pkt, 12, RADIO_ID) # rcvdid | |
| ping_pkt[16] = 0x01 # reply=0x01 | |
| print(f"← Ping, echoing back") | |
| control_sock.sendto(bytes(ping_pkt), (RADIO_IP, CONTROL_PORT)) | |
| continue | |
| elif pkt_type == 0x00: | |
| # This is the login response we want | |
| break | |
| else: | |
| print("ERROR: No login response after retransmits") | |
| return | |
| if len(data) < 32: | |
| print(f"ERROR: Login response too short ({len(data)} bytes)") | |
| return | |
| pkt = struct.unpack("<IHHII", data[:16]) | |
| print(f"← type={pkt[1]:02x}") | |
| if pkt[1] == 0x00: | |
| # Parse login response | |
| if len(data) >= 80: | |
| innerseq_resp = struct.unpack(">H", data[22:24])[0] | |
| tokrequest_resp = struct.unpack("<H", data[26:28])[0] | |
| token = struct.unpack("<I", data[28:32])[0] | |
| error = struct.unpack("<I", data[48:52])[0] | |
| connection_name = data[64:80].rstrip(b"\x00").decode("ascii", errors="ignore") | |
| print( | |
| f"Login response: innerseq={innerseq_resp} tokrequest=0x{tokrequest_resp:04x} " | |
| f"token=0x{token:08x} error=0x{error:08x} connection={connection_name}" | |
| ) | |
| if error == 0x00000000: | |
| print(f"✓ Login successful! Token: 0x{token:08x}\n") | |
| else: | |
| print(f"ERROR: Login failed with error 0x{error:08x}") | |
| return | |
| else: | |
| print("ERROR: Can't parse login response") | |
| return | |
| else: | |
| print(f"ERROR: Unexpected response type {pkt[1]:02x}") | |
| return | |
| print("=== Step 4: Token Exchange ===") | |
| token_pkt = bytearray(64) | |
| struct.pack_into("<IHHII", token_pkt, 0, 64, 0x00, next_transport_seq(), CLIENT_ID, RADIO_ID) | |
| struct.pack_into(">I", token_pkt, 16, 0x30) | |
| token_pkt[20] = 0x01 | |
| token_pkt[21] = 0x02 | |
| struct.pack_into(">H", token_pkt, 22, next_auth_seq()) | |
| struct.pack_into("<H", token_pkt, 26, token_request) | |
| struct.pack_into("<I", token_pkt, 28, token) | |
| struct.pack_into(">H", token_pkt, 36, 0x0798) | |
| struct.pack_into("<I", token_pkt, 48, 0) | |
| send_and_track(control_sock, bytes(token_pkt), CONTROL_PORT) | |
| print("Sent token request") | |
| radio_name = "" | |
| radio_guid = bytes(16) | |
| radio_mac = bytes(6) | |
| radio_commoncap = 0 | |
| radio_use_guid = False | |
| civ_address = 0x94 | |
| for _ in range(30): | |
| data, addr = recv_packet(control_sock, timeout=3.0) | |
| if not data: | |
| print("ERROR: Timeout waiting for capabilities") | |
| return | |
| pkt_len = len(data) | |
| pkt_type = struct.unpack("<H", data[4:6])[0] | |
| print(f"< Control packet type=0x{pkt_type:02x} len={pkt_len}") | |
| if pkt_len == 16: | |
| pkt_seq = struct.unpack("<H", data[6:8])[0] | |
| if pkt_type == 0x01: | |
| print(f"< Retransmit request for seq={pkt_seq}") | |
| handle_retransmit(control_sock, pkt_seq) | |
| elif pkt_type == 0x06: | |
| print("< Duplicate 'I am ready'") | |
| continue | |
| if pkt_len == 21 and pkt_type == 0x07: | |
| ping_pkt = bytearray(data) | |
| struct.pack_into("<I", ping_pkt, 8, CLIENT_ID) | |
| struct.pack_into("<I", ping_pkt, 12, RADIO_ID) | |
| ping_pkt[16] = 0x01 | |
| control_sock.sendto(bytes(ping_pkt), (RADIO_IP, CONTROL_PORT)) | |
| print("< Ping, echoed") | |
| continue | |
| if pkt_len == 64: | |
| # Token ACK - update token but don't gate on it | |
| request_type = data[21] | |
| response = struct.unpack("<I", data[48:52])[0] | |
| new_token = struct.unpack("<I", data[28:32])[0] | |
| print(f"< Token ack requesttype=0x{request_type:02x} response=0x{response:08x}") | |
| if response == 0x00000000: | |
| token = new_token | |
| continue | |
| if pkt_len >= 166: | |
| # CapabilitiesPacket header is 66 bytes (0x42), then RadioCapPacket entries | |
| num_radios = struct.unpack("<H", data[64:66])[0] | |
| print(f"< Capabilities packet radios={num_radios}") | |
| radio = data[66:168] # first RadioCapPacket (102 bytes) | |
| radio_guid = radio[0:16] # GUID at +0x00 | |
| radio_commoncap = struct.unpack("<H", radio[7:9])[0] # commoncap at +0x07 | |
| if radio_commoncap == 0x1080: | |
| radio_commoncap = 0x8010 | |
| radio_use_guid = radio_commoncap != 0x8010 | |
| radio_mac = radio[10:16] # macaddress at +0x0A | |
| radio_name = radio[16:48].split(b"\x00", 1)[0].decode("ascii", errors="ignore") # name at +0x10 | |
| civ_address = radio[0x52] # civ at +0x52 | |
| print(f"Radio name: {radio_name}") | |
| print(f"CI-V address: 0x{civ_address:02x}") | |
| # Send stream request immediately on receiving capabilities | |
| break | |
| else: | |
| print("ERROR: Capabilities not received") | |
| return | |
| print("\n=== Step 5: Stream Request ===") | |
| civ_local_port = local_port + 20 | |
| # ConnInfoPacket is 0x90 = 144 bytes | |
| conn_pkt = bytearray(144) | |
| struct.pack_into("<IHHII", conn_pkt, 0, 144, 0x00, next_transport_seq(), CLIENT_ID, RADIO_ID) | |
| struct.pack_into(">I", conn_pkt, 16, 128) # payloadsize = 144 - 16 = 128 | |
| conn_pkt[20] = 0x01 # requestreply | |
| conn_pkt[21] = 0x03 # requesttype | |
| struct.pack_into(">H", conn_pkt, 22, next_auth_seq()) | |
| struct.pack_into("<H", conn_pkt, 26, token_request) | |
| struct.pack_into("<I", conn_pkt, 28, token) | |
| if radio_use_guid: | |
| conn_pkt[32:48] = radio_guid | |
| else: | |
| struct.pack_into("<H", conn_pkt, 39, radio_commoncap) | |
| conn_pkt[42:48] = radio_mac | |
| conn_pkt[64:96] = radio_name.encode("ascii", errors="ignore")[:31].ljust(32, b"\x00") | |
| conn_pkt[96:112] = encode_passcode_field(USERNAME) | |
| # rxenable=0, txenable=0, rxcodec=0, txcodec=0, rxsample=0, txsample=0 | |
| conn_pkt[0x70] = 0x00 # rxenable | |
| conn_pkt[0x71] = 0x00 # txenable | |
| # civport at 0x7c (big-endian uint32) | |
| struct.pack_into(">I", conn_pkt, 0x7c, civ_local_port) | |
| # audioport at 0x80 | |
| struct.pack_into(">I", conn_pkt, 0x80, civ_local_port + 1) | |
| conn_pkt[0x88] = 0x01 # convert=1 | |
| send_and_track(control_sock, bytes(conn_pkt), CONTROL_PORT) | |
| print(f"Requested stream with CIV local port {civ_local_port}") | |
| civ_remote_port = CIV_PORT | |
| ping_count = 0 | |
| for _ in range(200): | |
| data, addr = recv_packet(control_sock, timeout=1.0) | |
| if not data: | |
| continue | |
| pkt_len = len(data) | |
| pkt_type = struct.unpack("<H", data[4:6])[0] | |
| if pkt_len == 16: | |
| pkt_seq = struct.unpack("<H", data[6:8])[0] | |
| if pkt_type == 0x01: | |
| handle_retransmit(control_sock, pkt_seq) | |
| continue | |
| if pkt_len == 21 and pkt_type == 0x07: | |
| ping_pkt = bytearray(data) | |
| struct.pack_into("<I", ping_pkt, 8, CLIENT_ID) | |
| struct.pack_into("<I", ping_pkt, 12, RADIO_ID) | |
| ping_pkt[16] = 0x01 | |
| control_sock.sendto(bytes(ping_pkt), (RADIO_IP, CONTROL_PORT)) | |
| ping_count += 1 | |
| if ping_count == 1: | |
| print(" Waiting for previous session to time out...") | |
| continue | |
| if pkt_len == 64: | |
| request_type = data[21] | |
| response = struct.unpack("<I", data[48:52])[0] | |
| if request_type == 0x03 and response == 0xffffffff: | |
| print("ERROR: Stream rejected by radio (try rebooting radio)") | |
| return | |
| # Busy/pending - keep waiting | |
| continue | |
| if pkt_len >= 80: | |
| request_type = data[21] | |
| if request_type == 0x03: | |
| status_error = struct.unpack("<I", data[48:52])[0] | |
| disc = data[0x40] if len(data) > 0x40 else 0 | |
| civ_remote_port = struct.unpack(">H", data[0x42:0x44])[0] | |
| if civ_remote_port == 0: | |
| civ_remote_port = CIV_PORT | |
| print(f"< Stream accepted civ_port={civ_remote_port}") | |
| if status_error == 0x00000000 and disc == 0: | |
| break | |
| elif disc == 0x01: | |
| print("ERROR: Radio reports disconnected") | |
| return | |
| elif status_error == 0xffffffff: | |
| print("ERROR: Stream rejected (try rebooting radio)") | |
| return | |
| continue | |
| else: | |
| print("ERROR: Stream status was not received") | |
| return | |
| print("\n=== Step 6: Opening CIV channel ===") | |
| civ_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| civ_sock.bind(("0.0.0.0", civ_local_port)) | |
| civ_client_id = build_id(local_ip, civ_local_port) | |
| civ_remote_id = 0 | |
| civ_are_you_there = struct.pack("<IHHII", 0x10, 0x03, 0x0000, civ_client_id, 0) | |
| civ_sock.sendto(civ_are_you_there, (RADIO_IP, civ_remote_port)) | |
| print(f"→ CIV Are-you-there sent with client id 0x{civ_client_id:08x}") | |
| for _ in range(10): | |
| data, addr = recv_packet(civ_sock, timeout=2.0) | |
| if not data: | |
| print("ERROR: Timeout during CIV handshake") | |
| return | |
| pkt_len = len(data) | |
| pkt_type = struct.unpack("<H", data[4:6])[0] | |
| print(f"← CIV packet type=0x{pkt_type:02x} len={pkt_len}") | |
| if pkt_len == 16 and pkt_type == 0x04: | |
| civ_remote_id = struct.unpack("<I", data[8:12])[0] | |
| ready = struct.pack("<IHHII", 0x10, 0x06, 0x0001, civ_client_id, civ_remote_id) | |
| civ_sock.sendto(ready, (RADIO_IP, civ_remote_port)) | |
| print("→ CIV Are-you-ready sent") | |
| continue | |
| if pkt_len == 16 and pkt_type == 0x06: | |
| civ_remote_id = struct.unpack("<I", data[8:12])[0] | |
| open_pkt = bytearray(22) | |
| struct.pack_into("<IHHII", open_pkt, 0, 22, 0x00, 1, civ_client_id, civ_remote_id) | |
| struct.pack_into("<H", open_pkt, 16, 0x01C0) | |
| open_pkt[18] = 0x00 | |
| struct.pack_into(">H", open_pkt, 19, 0x0000) | |
| open_pkt[21] = 0x04 | |
| civ_sock.sendto(bytes(open_pkt), (RADIO_IP, civ_remote_port)) | |
| print("→ CIV Open packet sent") | |
| break | |
| if pkt_len == 21 and pkt_type == 0x07: | |
| ping_pkt = bytearray(data) | |
| struct.pack_into("<I", ping_pkt, 8, civ_client_id) | |
| struct.pack_into("<I", ping_pkt, 12, civ_remote_id) | |
| ping_pkt[16] = 0x01 | |
| civ_sock.sendto(bytes(ping_pkt), (RADIO_IP, civ_remote_port)) | |
| print("← CIV Ping, echoed") | |
| else: | |
| print("ERROR: CIV channel did not become ready") | |
| return | |
| print("\n=== Step 7: Reading frequency ===") | |
| civ_cmd = bytes([0xFE, 0xFE, civ_address, 0xE0, 0x03, 0xFD]) | |
| civ_pkt = bytearray(21 + len(civ_cmd)) | |
| struct.pack_into("<IHHII", civ_pkt, 0, len(civ_pkt), 0x00, 2, civ_client_id, civ_remote_id) | |
| civ_pkt[16] = 0xC1 | |
| struct.pack_into("<H", civ_pkt, 17, len(civ_cmd)) | |
| struct.pack_into(">H", civ_pkt, 19, 1) | |
| civ_pkt[21:] = civ_cmd | |
| civ_sock.sendto(bytes(civ_pkt), (RADIO_IP, civ_remote_port)) | |
| print(f"→ CI-V poll sent: {civ_cmd.hex()}") | |
| for _ in range(20): | |
| data, addr = recv_packet(civ_sock, timeout=3.0) | |
| if not data: | |
| print("ERROR: No frequency response") | |
| return | |
| pkt_len = len(data) | |
| pkt_type = struct.unpack("<H", data[4:6])[0] | |
| print(f"← CIV packet type=0x{pkt_type:02x} len={pkt_len}") | |
| if pkt_len == 21 and pkt_type == 0x07: | |
| ping_pkt = bytearray(data) | |
| struct.pack_into("<I", ping_pkt, 8, civ_client_id) | |
| struct.pack_into("<I", ping_pkt, 12, civ_remote_id) | |
| ping_pkt[16] = 0x01 | |
| civ_sock.sendto(bytes(ping_pkt), (RADIO_IP, civ_remote_port)) | |
| print("← CIV Ping, echoed") | |
| continue | |
| if pkt_len > 21: | |
| civ_data = data[21:] | |
| print(f"CI-V data: {civ_data.hex()}") | |
| if len(civ_data) >= 11 and civ_data[:2] == b"\xFE\xFE" and civ_data[4] in (0x00, 0x03): | |
| bcd_freq = civ_data[5:10] | |
| freq_hz = bcd_to_hz(bcd_freq) | |
| print(f"\nFrequency: {freq_hz} Hz ({freq_hz / 1_000_000:.6f} MHz)") | |
| break | |
| else: | |
| print("ERROR: Frequency response format not received") | |
| return | |
| print("\nConnection and frequency read succeeded.") | |
| control_sock.close() | |
| civ_sock.close() | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment