Created
October 25, 2025 13:50
-
-
Save faresbakhit/f1fd11487bdc7043cb687ee411e19c63 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import heapq | |
| import math | |
| import sys | |
| from io import BufferedReader, BufferedWriter | |
| Symbol = int | |
| class Node: | |
| def __init__(self, sym: Symbol, freq: int): | |
| self.sym = sym | |
| self.freq = freq | |
| def __lt__(self, other) -> bool: | |
| return self.freq < other.freq | |
| def combine_with(self, other): | |
| return Node(0xFF + self.sym + other.sym, self.freq + other.freq) | |
| @staticmethod | |
| def is_composite(sym: Symbol) -> bool: | |
| return sym >= 0xFF | |
| class Huffman: | |
| def compress(self, input_bytes: bytes, outfile: BufferedWriter): | |
| """Compress the `input_bytes` bytes into the file `outfile` with the Huffman algorithm""" | |
| code: dict[Symbol, bytes] = {} | |
| statistics = self.build_statistics(input_bytes) | |
| entropy = self.get_entropy(statistics, len(input_bytes)) | |
| print(f"Entropy:\t{entropy:.3f} bits/symbol", file=sys.stderr) | |
| nodes_heap = self.build_nodes_heap(statistics) | |
| self.build_huffman(code, nodes_heap) | |
| average_length = self.get_average_length(statistics, len(input_bytes), code) | |
| print(f"Average length:\t{average_length:.3f} bits/symbol", file=sys.stderr) | |
| print(f"Redundancy:\t{average_length - entropy:.3f} bits/symbol", file=sys.stderr) | |
| for sym, codeword in code.items(): | |
| if not Node.is_composite(sym): | |
| outfile.write(repr(sym.to_bytes(1))[2:-1].encode() + b"\t" + codeword + b"\n") | |
| outfile.write(b"\n") | |
| for sym in input_bytes: | |
| outfile.write(code[sym]) | |
| outfile.write(b"\n") | |
| def build_huffman(self, code: dict[Symbol, bytes], nodes_heap: list[Node]): | |
| """The Huffman Coding algorithm""" | |
| if not nodes_heap: | |
| return | |
| node1 = heapq.heappop(nodes_heap) | |
| if not nodes_heap: | |
| code[node1.sym] = b"0" | |
| return | |
| node2 = heapq.heappop(nodes_heap) | |
| if not nodes_heap: | |
| code[node1.sym] = b"0" | |
| code[node2.sym] = b"1" | |
| return | |
| node3 = node1.combine_with(node2) | |
| heapq.heappush(nodes_heap, node3) | |
| self.build_huffman(code, nodes_heap) | |
| code[node1.sym] = code[node3.sym] + b"0" | |
| code[node2.sym] = code[node3.sym] + b"1" | |
| def build_nodes_heap(self, statistics: dict[Symbol, int]) -> list[Node]: | |
| nodes_heap = [Node(sym, freq) for sym, freq in statistics.items()] | |
| heapq.heapify(nodes_heap) | |
| return nodes_heap | |
| def build_statistics(self, input_bytes: bytes) -> tuple[dict[Symbol, int], int]: | |
| """Collect the number of occurrences for each symbol""" | |
| statistics: dict[Symbol, int] = {} | |
| for sym in input_bytes: | |
| if statistics.get(sym) is None: | |
| statistics[sym] = 0 | |
| statistics[sym] += 1 | |
| return statistics | |
| def get_entropy(self, statistics: dict[Symbol, int], count: int) -> float: | |
| """Calculate entropy of source""" | |
| s = 0.0 | |
| for n in statistics.values(): | |
| p = n / count | |
| s -= p * math.log2(p) | |
| return s | |
| def get_average_length(self, statistics: dict[Symbol, int], count: int, code: dict[Symbol, bytes]) -> float: | |
| """Calculate average length of Huffman code""" | |
| s = 0.0 | |
| for sym, n in statistics.items(): | |
| p = n / count | |
| s += p * len(code[sym]) | |
| return s | |
| def decompress(self, input_bytes: bytes, outfile: BufferedWriter): | |
| """Decompress a Huffman-encoded file""" | |
| parts = input_bytes.split(b"\n\n", 1) | |
| if len(parts) != 2: | |
| raise NotACompressedHuffmanFileError() | |
| header, encoded_data = parts | |
| encoded_data = encoded_data | |
| decoding_table = {} | |
| entries = header.split(b"\n") | |
| for entry in entries: | |
| entry = entry.split(b"\t") | |
| if len(entry) != 2: | |
| raise NotACompressedHuffmanFileError() | |
| sym = entry[0] | |
| code = entry[1] | |
| decoding_table[code.decode()] = sym.decode("unicode_escape") | |
| buffer = "" | |
| for bit in encoded_data.decode(): | |
| buffer += bit | |
| if buffer in decoding_table: | |
| outfile.write(decoding_table[buffer].encode()) | |
| buffer = "" | |
| class NotACompressedHuffmanFileError(RuntimeError): | |
| def __init__(self): | |
| super().__init__("not a compressed Huffman file") | |
| import argparse | |
| import textwrap | |
| def main(argv: list[str] | None = None) -> int: | |
| if argv is None: | |
| argv = sys.argv[1:] | |
| parser = argparse.ArgumentParser( | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| description="Compress files with the Huffman Coding algorithm and decompress them back", | |
| epilog=textwrap.dedent( | |
| """ | |
| Authors: Fares Ahmed Bakhit <20230277> | |
| Momen Desouky Mahmoud <20230431> | |
| """ | |
| ), | |
| ) | |
| parser.add_argument("input_bytes", type=argparse.FileType("rb")) | |
| parser.add_argument("OUTFILE", type=argparse.FileType("wb")) | |
| parser.add_argument("-d", "--decompress", action="store_true", help="decompress input_bytes into OUTFILE") | |
| args = parser.parse_args(argv) | |
| huffman = Huffman() | |
| with args.input_bytes as input_bytes, args.OUTFILE as outfile: | |
| if args.decompress: | |
| try: | |
| huffman.decompress(input_bytes.read(), outfile) | |
| except NotACompressedHuffmanFileError as e: | |
| print(f"error: {args.input_bytes.name}: {e}", file=sys.stderr) | |
| else: | |
| huffman.compress(input_bytes.read(), outfile) | |
| return 0 | |
| if __name__ == "__main__": | |
| sys.exit(main()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment