-
-
Save josh-sw-is-fun/be8d2883269bd102ef237fb932ee46c0 to your computer and use it in GitHub Desktop.
Convert DSView TXT dump of USB packet fields into valid PCAP file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # convert DSView TXT dump of USB packet fields into valid PCAP file | |
| import re | |
| import sys | |
| import struct | |
| import os | |
| from enum import IntEnum | |
| class PCAP_Header(): | |
| def __init__(self): | |
| self.magic = 0xa1b2c3d4 | |
| self.version_major = 2 | |
| self.version_minor = 4 | |
| self.thiszone = 0 | |
| self.sigfigs = 0 | |
| self.snaplen = 0x40000 | |
| self.network = 288 | |
| def __bytes__(self): | |
| return struct.pack("<IHHIIII", self.magic, self.version_major, self.version_minor, self.thiszone, self.sigfigs, self.snaplen, self.network) | |
| class PCAPrec_Header(): | |
| def __init__(self, length, nanos): | |
| sec = nanos / 1e9 | |
| self.ts_sec = int(sec) | |
| self.ts_usec = round((sec - int(sec)) * 1e6) | |
| self.incl_len = length | |
| self.orig_len = length | |
| def __bytes__(self): | |
| return struct.pack("<IIII", self.ts_sec, self.ts_usec, self.incl_len, self.orig_len) | |
| class PID(IntEnum): | |
| SETUP = 0x2d | |
| DATA1 = 0x4b | |
| NAK = 0x5a | |
| IN = 0x69 | |
| SOF = 0xa5 | |
| DATA0 = 0xc3 | |
| ACK = 0xd2 | |
| OUT = 0xe1 | |
| STALL = 0x1e | |
| class SOF: | |
| def __init__(self, stream): | |
| self.pid = PID.SOF | |
| line = stream.readline() | |
| if re.match(r'.*Frame.*', line): | |
| _, ns, field = line.strip().split(',') | |
| frame = field.split(':')[1].strip() | |
| self.frame = int(frame) | |
| else: | |
| raise Exception('Frame not found') | |
| line = stream.readline() | |
| if re.match(r'.*CRC5.*', line): | |
| crc5 = line.strip().split(',')[2].split(':')[1].strip() | |
| self.crc5 = int(crc5, base=16) | |
| else: | |
| raise Exception('CRC5 not found') | |
| self.ns = float(ns) | |
| def get_ns(self): | |
| return self.ns | |
| def __bytes__(self): | |
| return struct.pack("<BH", int(self.pid), self.frame | (self.crc5 << 11)) | |
| class SETUP: | |
| def __init__(self, stream): | |
| self.pid = PID.SETUP | |
| line = stream.readline() | |
| if re.match(r'.*Address.*', line): | |
| _, ns, field = line.strip().split(',') | |
| address = field.split(':')[1].strip() | |
| self.address = int(address) | |
| else: | |
| raise Exception('Address not found') | |
| line = stream.readline() | |
| if re.match(r'.*Endpoint.*', line): | |
| endpoint = line.strip().split(',')[2].split(':')[1].strip() | |
| self.endpoint = int(endpoint) | |
| else: | |
| raise Exception('Endpoint not found') | |
| line = stream.readline() | |
| if re.match(r'.*CRC5.*', line): | |
| crc5 = line.strip().split(',')[2].split(':')[1].strip() | |
| self.crc5 = int(crc5, base=16) | |
| else: | |
| raise Exception('CRC5 not found') | |
| self.ns = float(ns) | |
| def get_ns(self): | |
| return self.ns | |
| def __bytes__(self): | |
| return struct.pack("<BH", int(self.pid), self.address | (self.endpoint << 7) | (self.crc5 << 11)) | |
| class IN: | |
| def __init__(self, stream): | |
| self.pid = PID.IN | |
| line = stream.readline() | |
| if re.match(r'.*Address.*', line): | |
| _, ns, field = line.strip().split(',') | |
| address = field.split(':')[1].strip() | |
| self.address = int(address) | |
| else: | |
| raise Exception('Address not found') | |
| line = stream.readline() | |
| if re.match(r'.*Endpoint.*', line): | |
| _, ns, field = line.strip().split(',') | |
| endpoint = field.split(':')[1].strip() | |
| self.endpoint = int(endpoint) | |
| else: | |
| raise Exception('Endpoint not found') | |
| line = stream.readline() | |
| if re.match(r'.*CRC5.*', line): | |
| crc5 = line.strip().split(',')[2].split(':')[1].strip() | |
| self.crc5 = int(crc5, base=16) | |
| else: | |
| raise Exception('CRC5 not found') | |
| self.ns = float(ns) | |
| def get_ns(self): | |
| return self.ns | |
| def __bytes__(self): | |
| return struct.pack("<BH", int(self.pid), self.address | (self.endpoint << 7) | (self.crc5 << 11)) | |
| class OUT: | |
| def __init__(self, stream): | |
| self.pid = PID.OUT | |
| line = stream.readline() | |
| if re.match(r'.*Address.*', line): | |
| _, ns, field = line.strip().split(',') | |
| address = field.split(':')[1].strip() | |
| self.address = int(address) | |
| else: | |
| raise Exception('Address not found') | |
| line = stream.readline() | |
| if re.match(r'.*Endpoint.*', line): | |
| endpoint = line.strip().split(',')[2].split(':')[1].strip() | |
| self.endpoint = int(endpoint) | |
| else: | |
| raise Exception('Endpoint not found') | |
| line = stream.readline() | |
| if re.match(r'.*CRC5.*', line): | |
| crc5 = line.strip().split(',')[2].split(':')[1].strip() | |
| self.crc5 = int(crc5, base=16) | |
| else: | |
| raise Exception('CRC5 not found') | |
| self.ns = float(ns) | |
| def get_ns(self): | |
| return self.ns | |
| def __bytes__(self): | |
| return struct.pack("<BH", int(self.pid), self.address | (self.endpoint << 7) | (self.crc5 << 11)) | |
| class DATA0: | |
| def __init__(self, stream): | |
| self.pid = PID.DATA0 | |
| self.databytes = [] | |
| line = stream.readline() | |
| ns = line.strip().split(',')[1] | |
| while re.match(r'.*Databyte.*', line): | |
| databyte = line.strip().split(',')[2].split(':')[1].strip() | |
| self.databytes.append(int(databyte, base=16)) | |
| line = stream.readline() | |
| if re.match(r'.*CRC16.*', line): | |
| crc16 = line.strip().split(',')[2].split(':')[1].strip() | |
| self.crc16 = int(crc16, base=16) | |
| else: | |
| raise Exception('CRC16 not found') | |
| self.ns = float(ns) | |
| def get_ns(self): | |
| return self.ns | |
| def __bytes__(self): | |
| b = b'' | |
| b += struct.pack("<B", int(self.pid)) | |
| for db in self.databytes: | |
| b += struct.pack("<B", db) | |
| b += struct.pack("<H", self.crc16) | |
| return b | |
| class DATA1: | |
| def __init__(self, stream): | |
| self.pid = PID.DATA1 | |
| self.databytes = [] | |
| line = stream.readline() | |
| ns = line.strip().split(',')[1] | |
| while re.match(r'.*Databyte.*', line): | |
| databyte = line.strip().split(',')[2].split(':')[1].strip() | |
| self.databytes.append(int(databyte, base=16)) | |
| line = stream.readline() | |
| if re.match(r'.*CRC16.*', line): | |
| crc16 = line.strip().split(',')[2].split(':')[1].strip() | |
| self.crc16 = int(crc16, base=16) | |
| else: | |
| raise Exception('CRC16 not found') | |
| self.ns = float(ns) | |
| def get_ns(self): | |
| return self.ns | |
| def __bytes__(self): | |
| b = b'' | |
| b += struct.pack("<B", int(self.pid)) | |
| for db in self.databytes: | |
| b += struct.pack("<B", db) | |
| b += struct.pack("<H", self.crc16) | |
| return b | |
| class ACK: | |
| def __init__(self, stream, ns): | |
| self.pid = PID.ACK | |
| self.ns = ns | |
| def get_ns(self): | |
| return self.ns | |
| def __bytes__(self): | |
| return struct.pack("<B", int(self.pid)) | |
| class NAK: | |
| def __init__(self, stream, ns): | |
| self.pid = PID.NAK | |
| self.ns = ns | |
| def get_ns(self): | |
| return self.ns | |
| def __bytes__(self): | |
| return struct.pack("<B", int(self.pid)) | |
| ignore_sof = len(sys.argv) > 1 and sys.argv[1] == '--no-sof' | |
| hdr = PCAP_Header() | |
| os.write(1, bytes(hdr)) | |
| line = None | |
| ns_offset = 0 | |
| while line != '': | |
| line = sys.stdin.readline() | |
| if re.match(r'.*SYNC.*', line): | |
| sync = line.strip().split(',')[2] | |
| sync_value = int(sync.split(':')[1].strip()) | |
| sync_id = line.strip().split(',')[0] | |
| print(sync_id, file=sys.stderr) | |
| if sync_value != 1: | |
| continue | |
| line = sys.stdin.readline() | |
| if re.match(r'.*PID.*', line): | |
| _, ns, field = line.strip().split(',') | |
| pid = field.split(':')[1].strip() | |
| #try: | |
| if True: | |
| pid = PID[pid] | |
| if pid == PID.SOF and not ignore_sof: | |
| sof = SOF(sys.stdin) | |
| print(sof.pid, sof.frame, sof.crc5, file=sys.stderr) | |
| b = bytes(sof) | |
| hdr = PCAPrec_Header(len(b), sof.get_ns()) | |
| os.write(1, bytes(hdr) + b) | |
| #sys.exit(0) | |
| elif pid == PID.SETUP: | |
| setup = SETUP(sys.stdin) | |
| print(setup.pid, setup.address, setup.endpoint, setup.crc5, file=sys.stderr) | |
| b = bytes(setup) | |
| hdr = PCAPrec_Header(len(b), setup.get_ns()) | |
| os.write(1, bytes(hdr) + b) | |
| elif pid == PID.IN: | |
| inn = IN(sys.stdin) | |
| print(inn.pid, inn.address, inn.endpoint, inn.crc5, file=sys.stderr) | |
| b = bytes(inn) | |
| hdr = PCAPrec_Header(len(b), inn.get_ns()) | |
| os.write(1, bytes(hdr) + b) | |
| elif pid == PID.OUT: | |
| out = OUT(sys.stdin) | |
| print(out.pid, out.address, out.endpoint, out.crc5, file=sys.stderr) | |
| b = bytes(out) | |
| hdr = PCAPrec_Header(len(b), out.get_ns()) | |
| os.write(1, bytes(hdr) + b) | |
| elif pid == PID.DATA0: | |
| data0 = DATA0(sys.stdin) | |
| print(data0.pid, data0.databytes, data0.crc16, file=sys.stderr) | |
| b = bytes(data0) | |
| hdr = PCAPrec_Header(len(b), data0.get_ns()) | |
| os.write(1, bytes(hdr) + b) | |
| elif pid == PID.DATA1: | |
| data1 = DATA1(sys.stdin) | |
| print(data1.pid, data1.databytes, data1.crc16, file=sys.stderr) | |
| b = bytes(data1) | |
| hdr = PCAPrec_Header(len(b), data1.get_ns()) | |
| os.write(1, bytes(hdr) + b) | |
| elif pid == PID.ACK: | |
| ack = ACK(sys.stdin, float(ns)) | |
| print(ack.pid, file=sys.stderr) | |
| b = bytes(ack) | |
| hdr = PCAPrec_Header(len(b), ack.get_ns()) | |
| os.write(1, bytes(hdr) + b) | |
| elif pid == PID.NAK: | |
| nak = NAK(sys.stdin, float(ns)) | |
| print(nak.pid, file=sys.stderr) | |
| b = bytes(nak) | |
| hdr = PCAPrec_Header(len(b), nak.get_ns()) | |
| os.write(1, bytes(hdr) + b) | |
| else: | |
| print(f'Ignoring {str(pid)}', file=sys.stderr) | |
| #except Exception as e: | |
| # print(e) | |
| # continue | |
| else: | |
| continue | |
| else: | |
| pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment