Created
February 1, 2026 09:10
-
-
Save kawai-neko-meow/b47b79ad0d916c501c3f2210cb0be16e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import typing | |
| import dataclasses | |
| import os | |
| import binascii | |
| import uuid | |
| import enum | |
| import datetime | |
| class BlockDevice: | |
| fp: typing.BinaryIO | |
| def __init__(self, fp, block_size): | |
| self.fp = fp | |
| self.block_size = block_size | |
| self.bytes_size = os.fstat(fp.fileno()).st_size | |
| def read_bytes(self, count): | |
| return self.fp.read(count) | |
| def read(self, count): | |
| return self.fp.read(self.block_size * count) | |
| def read_at(self, pos, count): | |
| self.seek(pos) | |
| return self.read(count) | |
| def seek(self, pos): | |
| self.fp.seek(self.block_size * pos) | |
| def tell(self): | |
| return self.fp.tell() | |
| @property | |
| def size(self): | |
| return self.bytes_size // self.block_size | |
| class CHS(typing.NamedTuple): | |
| head: int | |
| sector: int | |
| cylinder: int | |
| @dataclasses.dataclass(frozen=True) | |
| class MbrPte: | |
| device: BlockDevice | |
| valid: bool | |
| flags: int | |
| type: int | |
| first_sector: CHS | |
| last_sector: CHS | |
| offset: int | |
| size: int | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| assert len(data) == 16 | |
| type = data[4] | |
| flags = data[0] | |
| if type == 0: | |
| return cls(device, True, 0, 0, CHS(0, 0, 0), CHS(0, 0, 0), 0, 0) | |
| offset = int.from_bytes(data[8:12], 'little') | |
| size = int.from_bytes(data[12:16], 'little') | |
| valid = offset != 0 and size != 0 and offset + size <= device.size and flags | 0x7F == 0x7F | |
| return cls( | |
| device, valid, | |
| flags, type, | |
| cls._read_chs(data[1:4]), cls._read_chs(data[4:7]), | |
| offset, size, | |
| ) | |
| @staticmethod | |
| def _read_chs(data: bytes): | |
| assert len(data) == 3 | |
| return CHS(data[0], data[1] & 0x3f, data[2] | ((data[1] & 0xC0) >> 6)) | |
| @dataclasses.dataclass(frozen=True) | |
| class MBR: | |
| device: BlockDevice | |
| valid: bool | |
| boot: bytes | |
| partitons: list[MbrPte] | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| parts = [MbrPte.from_bytes(device, data[446 + 16 * i : 446 + 16 * (i + 1)]) for i in range(4)] | |
| # TODO validate part positions | |
| valid = data[510:512] == b'\x55\xAA' and all((part.valid for part in parts)) | |
| return cls( | |
| device, valid, data[0:446], parts | |
| ) | |
| @classmethod | |
| def read(cls, device: BlockDevice): | |
| sect = device.read_at(0, 1) | |
| return cls.from_bytes(device, sect) | |
| def iter_partitions(self) -> typing.Iterable[MbrPte]: | |
| for pte in self.partitons: | |
| if pte.type != 0: | |
| yield pte | |
| def print(self): | |
| print(f"{'' if self.valid else 'Invalid '}MBR Table") | |
| print("# I A TT start size end") | |
| for i, p in enumerate(self.partitons): | |
| if p.type == 0: | |
| continue | |
| inv = ' ' if p.valid else 'I' | |
| act = 'A' if p.flags & 0x7F else ' ' | |
| print(f"{i} {inv} {act} {p.type:02X} {p.offset: 10d} {p.size: 10d} {p.offset+p.size: 10d}") | |
| class GPTPartitionFlag(enum.IntFlag): | |
| PLATFORM_REQUIRED = 1 << 0 | |
| FIRMWARE_IGNORE = 1 << 1 | |
| LEGACY_BOOTABLE = 1 << 2 | |
| RESERVED = 0xfffffffffff8 | |
| VENDOR_DEFINED = 0xffff000000000000 | |
| @dataclasses.dataclass(frozen=True) | |
| class GPTPartition: | |
| device: BlockDevice | |
| type: uuid.UUID | |
| guid: uuid.UUID | |
| flags: GPTPartitionFlag | |
| name: str | |
| start_pos: int | |
| end_pos: int | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| assert len(data) >= 128 | |
| type = uuid.UUID(bytes=data[0:16]) | |
| guid = uuid.UUID(bytes=data[16:32]) | |
| first_lba = int.from_bytes(data[32:40], 'little') | |
| last_lba = int.from_bytes(data[40:48], 'little') | |
| flags = int.from_bytes(data[48:56], 'little') | |
| name = data[56:128].decode('UTF-16LE').rstrip('\0') | |
| data = data[128:] | |
| return cls( | |
| device, | |
| type, guid, flags, name, | |
| first_lba, last_lba, | |
| ) | |
| @dataclasses.dataclass(frozen=True) | |
| class GPTTable: | |
| device: BlockDevice | |
| valid: bool | |
| revision: int | |
| backup_header_pos: int | |
| first_usable_pos: int | |
| last_usable_pos: int | |
| guid: uuid.UUID | |
| partitions: list[GPTPartition] | |
| @classmethod | |
| def read(cls, device: BlockDevice, is_backup = False): | |
| hdr_pos = device.size - 1 if is_backup else 1 | |
| data = device.read_at(hdr_pos, 1) | |
| hdr_valid = data[0:8] == b'EFI PART' | |
| revision = int.from_bytes(data[8:12], 'little') | |
| hdr_size = int.from_bytes(data[12:16], 'little') | |
| crc = int.from_bytes(data[16:20], 'little') | |
| res_valid = data[20:24] == b'\0\0\0\0' | |
| curr_pos = int.from_bytes(data[24:32], 'little') | |
| backup_header_pos = int.from_bytes(data[32:40], 'little') | |
| first_usable_pos = int.from_bytes(data[40:48], 'little') | |
| last_usable_pos = int.from_bytes(data[48:56], 'little') | |
| guid = uuid.UUID(bytes=data[56:72]) | |
| pt_lba = int.from_bytes(data[72:80], 'little') | |
| pt_ent_count = int.from_bytes(data[80:84], 'little') | |
| pt_ent_size = int.from_bytes(data[84:88], 'little') | |
| pt_crc = int.from_bytes(data[88:92], 'little') | |
| crc_valid = cls._crc(data) == crc | |
| valid = hdr_valid and res_valid and crc_valid and hdr_size == 92 and \ | |
| pt_ent_size >= 128 and curr_pos == hdr_pos | |
| device.seek(pt_lba) | |
| pt_data = device.read_bytes(pt_ent_count * pt_ent_size) | |
| pt_crc_valid = binascii.crc32(pt_data) == pt_crc | |
| valid = valid and pt_crc_valid | |
| parts = [] | |
| for i in range(pt_ent_count): | |
| part = GPTPartition.from_bytes(device, pt_data[i * pt_ent_size:(i+1) * pt_ent_size]) | |
| parts.append(part) | |
| return cls( | |
| device, valid, | |
| revision, backup_header_pos, | |
| first_usable_pos, last_usable_pos, | |
| guid, parts | |
| ) | |
| @staticmethod | |
| def _crc(data): | |
| data = bytearray(data) | |
| data[16:20] = b'\0\0\0\0' | |
| return binascii.crc32(data[0:0x5C]) | |
| def iter_partitions(self): | |
| for pte in self.partitions: | |
| if pte.type.bytes != b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0': | |
| yield pte | |
| def print(self): | |
| print(f"{'' if self.valid else 'Invalid '}GPT Table") | |
| print("### GUID Type GUID Flags Start pos End pos Name") | |
| for i, p in enumerate(self.partitions): | |
| if p.type.bytes == b'\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0': | |
| continue | |
| print(f"{i: 3d} {p.guid} {p.type} {p.flags: 18X} {p.start_pos: 10d} {p.end_pos: 10d} {p.name}") | |
| @dataclasses.dataclass(frozen=True) | |
| class IsoVolumeDescriptor: | |
| device: BlockDevice | |
| valid: bool | |
| type: int | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| assert len(data) == 2048 | |
| valid = data[1:7] == b'CD001\x01' | |
| type = data[0] | |
| data = data[7:] | |
| if not valid: | |
| return cls(device, False, type) | |
| if type == 0: | |
| return IsoBootVolume.from_bytes(device, data) | |
| if type == 1: | |
| return IsoPrimaryVolume.from_bytes(device, data) | |
| if type == 2: | |
| return IsoSupplementaryVolume.from_bytes(device, data) | |
| if type == 3: | |
| return IsoPartitionDescriptor.from_bytes(device, data) | |
| if type == 255: | |
| return IsoTerminator(device, True, 255) | |
| return cls(device, False, type) | |
| def to_display(self): | |
| return '' | |
| def print(self): | |
| pass | |
| @dataclasses.dataclass(frozen=True) | |
| class IsoBootVolume(IsoVolumeDescriptor): | |
| TYPE = 'BOOT' | |
| system_id: str | |
| boot_id: str | |
| data: bytes | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| assert len(data) == 2041 | |
| sys_id = data[0:32].decode('ascii').rstrip('\0') | |
| boot_id = data[32:64].decode('ascii').rstrip('\0') | |
| data = data[64:] | |
| if sys_id == 'EL TORITO SPECIFICATION': | |
| data = int.from_bytes(data[:4], 'little') | |
| return cls(device, True, 0, sys_id, boot_id, data) | |
| def to_display(self): | |
| if self.system_id == 'EL TORITO SPECIFICATION': | |
| return f'El Torito at 0x{self.data:x}' | |
| return self.system_id | |
| def et_read_entries(self): | |
| if self.system_id != 'EL TORITO SPECIFICATION': | |
| raise ValueError() | |
| entries = [] | |
| i = 0 | |
| while True: | |
| sect = self.device.read_at(self.data + i, 1) | |
| i += 1 | |
| for j in range(0, 2048, 32): | |
| data = sect[j:j+32] | |
| if data == b'\0' * 32: | |
| return entries | |
| entry = ETEntry.from_bytes(self.device, data) | |
| entries.append(entry) | |
| def print(self): | |
| for i in self.et_read_entries(): | |
| print(i) | |
| @dataclasses.dataclass(frozen=True) | |
| class ETEntry: | |
| device: BlockDevice | |
| valid: bool | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| assert len(data) == 32 | |
| code = data[0] | |
| if code == 1: | |
| return ValidationEntry.from_bytes(device, data) | |
| if code == 0 or code == 0x88: | |
| return SectionEntry.from_bytes(device, data) | |
| if code == 0x90 or code == 0x91: | |
| return SectionHeaderEntry.from_bytes(device, data) | |
| if code == 0x44: | |
| return ExtensionEntry.from_bytes(device, data) | |
| return cls(device, False) | |
| @dataclasses.dataclass(frozen=True) | |
| class ValidationEntry(ETEntry): | |
| platform_id: int | |
| id_string: str | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| assert len(data) == 32 | |
| platform_id = data[1] | |
| res_valid = data[2:4] == b'\0\0' | |
| id_string = data[4:28].decode('ascii').rstrip('\0') | |
| magic_valid = data[30:32] == b'\x55\xAA' | |
| checksum_valid = binascii.crc32(data) == 0 | |
| valid = res_valid and magic_valid and checksum_valid | |
| return cls(device, valid, platform_id, id_string) | |
| @dataclasses.dataclass(frozen=True) | |
| class SectionEntry(ETEntry): | |
| bootable: bool | |
| media_type: int | |
| load_segment: int | |
| system_type: int | |
| sect_count: int | |
| load_rba: int | |
| c_type: int | |
| c_data: bytes | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| assert len(data) == 32 | |
| bootable = data[0] != 0 | |
| media_type = data[1] | |
| load_segment = int.from_bytes(data[2:4], 'little') | |
| system_type = data[4] | |
| valid_res = data[5] == 0 | |
| sect_count = int.from_bytes(data[6:8], 'little') | |
| load_rba = int.from_bytes(data[8:12], 'little') | |
| c_type = data[12] | |
| c_data = data[13:32] if c_type else None | |
| valid = valid_res | |
| return cls(device, valid, bootable, media_type, load_segment, system_type, sect_count, load_rba, c_type, c_data) | |
| @dataclasses.dataclass(frozen=True) | |
| class SectionHeaderEntry(ETEntry): | |
| is_final: bool | |
| platform_id: int | |
| ent_count: int | |
| id_string: str | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| assert len(data) == 32 | |
| is_final = data[0] == 0x91 | |
| platform_id = data[1] | |
| ent_count = int.from_bytes(data[2:4], 'little') | |
| id_string = data[4:32].decode('ascii').rstrip('\0') | |
| return cls(device, True, is_final, platform_id, ent_count, id_string) | |
| @dataclasses.dataclass(frozen=True) | |
| class ExtensionEntry(ETEntry): | |
| is_final: bool | |
| data: bytes | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| assert len(data) == 32 | |
| is_final = data[0] & (1 << 4) == 0 | |
| return cls(device, True, is_final, data[2:32]) | |
| class IsoFileFlag(enum.IntFlag): | |
| HIDDEN = 1 << 0 | |
| DIRECTORY = 1 << 1 | |
| ASSOCIATED = 1 << 2 | |
| EXT_FORMAT = 1 << 3 | |
| EXT_PERMISSIONS = 1 << 4 | |
| HAS_NEXT = 1 << 7 | |
| @dataclasses.dataclass(frozen=True) | |
| class IsoDirectory: | |
| device: BlockDevice | |
| valid: bool | |
| data_offset: int | |
| data_length: int | |
| date: datetime.datetime | |
| flags: IsoFileFlag | |
| unit_size: int | |
| gap_size: int | |
| volume_seq: int | |
| name: str | |
| app_data: bytes | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| length = data[0] | |
| if length == 0: | |
| return None, 0 | |
| ext_length = data[1] | |
| extent_loc = _lbef(data[2:10]) | |
| data_length = _lbef(data[10:18]) | |
| date = data[18:25] | |
| flags = IsoFileFlag(data[25]) | |
| unit_size = data[26] | |
| gap_size = data[27] | |
| vol_seq = _lbef(data[28:32]) | |
| name_len = data[32] | |
| name = data[33:33+name_len] | |
| app_data = data[name_len+33:length] | |
| valid = True | |
| return cls(device, valid, extent_loc, data_length, date, flags, unit_size, gap_size, vol_seq, name, app_data), length | |
| @property | |
| def is_directory(self): | |
| return bool(self.flags & IsoFileFlag.DIRECTORY) | |
| def __repr__(self): | |
| parts = [] | |
| if not self.valid: | |
| parts.append("Invalid ") | |
| if self.is_directory: | |
| parts.append("directory ") | |
| else: | |
| parts.append("file ") | |
| parts.append(repr(self.name)) | |
| if not self.is_directory: | |
| parts.append(f" ({self.data_length} bytes)") | |
| parts.append(f" {self.flags&~IsoFileFlag.DIRECTORY} {self.date}") | |
| return ''.join(parts) | |
| def print(self, offset=0): | |
| print(' ' * offset, self) | |
| if self.is_directory: | |
| for ch in self.read_children(): | |
| if ch.name in {b'\x00', b'\x01'}: | |
| continue | |
| ch.print(offset+1) | |
| else: | |
| self.device.seek(self.data_offset) | |
| data = self.device.read_bytes(self.data_length) | |
| # print(' ' * (offset+1), data) | |
| def read_data(self): | |
| if self.is_directory: | |
| raise ValueError("Is a directory") | |
| self.device.seek(self.data_offset) | |
| return self.device.read_bytes(self.data_length) | |
| def read_children(self): | |
| if not self.is_directory: | |
| raise ValueError("Not a directory") | |
| children = [] | |
| d = self.data_length // self.device.block_size | |
| for i in range(d): | |
| data = self.device.read_at(self.data_offset + i, 1) | |
| while data: | |
| dir, length = IsoDirectory.from_bytes(self.device, data) | |
| if length == 0: | |
| break | |
| data = data[length:] | |
| children.append(dir) | |
| return children | |
| @dataclasses.dataclass(frozen=True) | |
| class IsoPrimaryVolume(IsoVolumeDescriptor): | |
| TYPE = 'PRIM' | |
| system_id: str | |
| volume_id: str | |
| space_size: int | |
| set_size: int | |
| seq_number: int | |
| block_size: int | |
| path_table_size: int | |
| path_table_lba: int | |
| opt_path_table_lba: int | |
| root_dir: IsoDirectory | |
| set_id: str | |
| pub_id: str | |
| dpr_id: str | |
| app_id: str | |
| copyright_file: str | |
| abstract_file: str | |
| bibliographic_file: str | |
| create_date: bytes | |
| modify_date: bytes | |
| expire_date: bytes | |
| effective_date: bytes | |
| app_data: bytes | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| assert len(data) == 2041 | |
| sys_id = data[1:33].decode('ascii').rstrip(' ') | |
| vol_id = data[33:65].decode('ascii').rstrip(' ') | |
| if data[65:73] != b'\0\0\0\0\0\0\0\0': | |
| raise RuntimeError() | |
| space_size = _lbef(data[73:81]) | |
| if data[81:113] != b'\0' * 32: | |
| raise RuntimeError() | |
| set_size = _lbef(data[113:117]) | |
| seq_number = _lbef(data[117:121]) | |
| block_size = _lbef(data[121:125]) | |
| path_table_size = _lbef(data[125:133]) | |
| path_table_lba = int.from_bytes(data[133:137], 'little') | |
| opt_path_table_lba = int.from_bytes(data[137:141], 'little') | |
| root_dir, root_length = IsoDirectory.from_bytes(device, data[149:183]) | |
| if root_length != 34: | |
| raise RuntimeError() | |
| set_id = data[183:311].decode('ascii').rstrip(' ') | |
| pub_id = data[311:439].decode('ascii').rstrip(' ') | |
| dpr_id = data[439:567].decode('ascii').rstrip(' ') | |
| app_id = data[567:695].decode('ascii').rstrip(' ') | |
| copyright_file = data[695:732].decode('ascii').rstrip(' ') | |
| abstract_file = data[732:769].decode('ascii').rstrip(' ') | |
| bibliographic_file = data[769:806].decode('ascii').rstrip(' ') | |
| create_date = data[806:823] | |
| modify_date = data[823:840] | |
| expire_date = data[840:857] | |
| effective_date = data[857:874] | |
| if data[874:876] != b'\x01\x00': | |
| raise ValueError() | |
| app_data = data[876:1388] | |
| return cls(device, True, 1, sys_id, vol_id, space_size, set_size, seq_number, block_size, path_table_size, path_table_lba, opt_path_table_lba, | |
| root_dir, set_id, pub_id, dpr_id, app_id, copyright_file, abstract_file, bibliographic_file, create_date, modify_date, | |
| expire_date, effective_date, app_data) | |
| def to_display(self): | |
| return f"{self.system_id} {self.volume_id!r} {self.space_size}" | |
| def print(self): | |
| print(f"Disk label: {self.volume_id}") | |
| print(f"Volume set: {self.set_id} ({self.seq_number} / {self.set_size})") | |
| print(f"Publisher: {self.pub_id}") | |
| print(f"Data preparer: {self.dpr_id}") | |
| print(f"Application: {self.app_id}") | |
| print() | |
| print(f"Copyright file: {self.copyright_file}") | |
| print(f"Abstract file: {self.abstract_file}") | |
| print(f"Bibliographic file: {self.bibliographic_file}") | |
| print() | |
| print(f"Creation date: {self.create_date}") | |
| print(f"Modification date: {self.modify_date}") | |
| print(f"Expire date: {self.expire_date}") | |
| print(f"Effective date: {self.effective_date}") | |
| print() | |
| print(f"Block size: {self.block_size:x}") | |
| print(f"Application data: {self.app_data}") | |
| self.device.seek(self.path_table_lba) | |
| pt = self.device.read_bytes(self.path_table_size) | |
| print(pt) | |
| # self.root_dir.print() | |
| def _lbef(data: bytes): | |
| assert len(data) % 2 == 0 | |
| # assert data[:len(data) //2] == reversed(data[len(data) //2:]) | |
| return int.from_bytes(data[:len(data) //2], 'little') | |
| @dataclasses.dataclass(frozen=True) | |
| class IsoSupplementaryVolume(IsoVolumeDescriptor): | |
| TYPE = 'SUPP' | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| assert len(data) == 2041 | |
| return cls(device, True, 2, ) | |
| @dataclasses.dataclass(frozen=True) | |
| class IsoPartitionDescriptor(IsoVolumeDescriptor): | |
| TYPE = 'PART' | |
| @classmethod | |
| def from_bytes(cls, device: BlockDevice, data: bytes): | |
| assert len(data) == 2041 | |
| return cls(device, True, 3, ) | |
| @dataclasses.dataclass(frozen=True) | |
| class IsoTerminator(IsoVolumeDescriptor): | |
| TYPE = 'TERM' | |
| @dataclasses.dataclass(frozen=True) | |
| class IsoFS: | |
| device: BlockDevice | |
| valid: bool | |
| volumes: list[IsoVolumeDescriptor] | |
| data_offset: int | |
| @classmethod | |
| def read(cls, device: BlockDevice): | |
| vols = [] | |
| device.seek(16) | |
| offset = 16 | |
| valid = True | |
| while True: | |
| sect = device.read(1) | |
| offset += 1 | |
| vol = IsoVolumeDescriptor.from_bytes(device, sect) | |
| if isinstance(vol, IsoTerminator): | |
| break | |
| valid = valid and vol.valid | |
| vols.append(vol) | |
| return cls(device, valid, vols, offset) | |
| def print(self): | |
| print('ISO filesystem') | |
| print("## Type") | |
| for i, v in enumerate(self.volumes): | |
| print(f"{i: 2d} {v.TYPE} {v.to_display()}") | |
| v.print() | |
| def main(): | |
| with open('C:\\alt.workstation.img', 'rb') as f: | |
| dev = BlockDevice(f, 512) | |
| mbr = MBR.read(dev) | |
| mbr.print() | |
| gpt = GPTTable.read(dev) | |
| gpt.print() | |
| # gpt2 = GPTTable.read(dev, is_backup=True) | |
| # gpt2.print() | |
| dev.block_size = 2048 | |
| iso = IsoFS.read(dev) | |
| iso.print() | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment