Skip to content

Instantly share code, notes, and snippets.

@faresbakhit
Created October 25, 2025 13:50
Show Gist options
  • Select an option

  • Save faresbakhit/f1fd11487bdc7043cb687ee411e19c63 to your computer and use it in GitHub Desktop.

Select an option

Save faresbakhit/f1fd11487bdc7043cb687ee411e19c63 to your computer and use it in GitHub Desktop.
import heapq
import math
import sys
from io import BufferedReader, BufferedWriter
# Symbols are plain integers: values up to 0xFF are ordinary byte symbols,
# larger values identify composite nodes created while merging.
Symbol = int


class Node:
    """A weighted symbol used as a heap entry while building a Huffman code.

    Nodes are ordered by frequency only, so they can be stored directly in a
    `heapq` heap.  Nodes produced by `combine_with` receive a fresh symbol
    above the byte range, which `is_composite` recognises.
    """

    # Next unused composite symbol.  Starts just above the largest byte value
    # so that composite symbols never clash with real byte symbols.
    _next_composite_sym = 0xFF + 1

    def __init__(self, sym: Symbol, freq: int):
        self.sym = sym
        self.freq = freq

    def __lt__(self, other) -> bool:
        """Order nodes by ascending frequency (the symbol is not compared)."""
        return self.freq < other.freq

    def combine_with(self, other):
        """Return a new composite node whose frequency is the sum of both.

        Every call hands out a new, unique symbol.  Deriving the symbol from
        the children's symbols (e.g. by adding them) is not safe: different
        pairs can add up to the same value, and two composite nodes would
        then share one symbol.
        """
        sym = Node._next_composite_sym
        Node._next_composite_sym += 1
        return Node(sym, self.freq + other.freq)

    @staticmethod
    def is_composite(sym: Symbol) -> bool:
        """Return True if `sym` lies above the byte range 0x00..0xFF.

        0xFF itself is a valid byte value and therefore not composite.
        """
        return sym > 0xFF
class Huffman:
    """Huffman coder for byte strings using a simple text-based file layout.

    The layout written by `compress` and read by `decompress` is:

    * one header line per input symbol: the escaped byte (as in a Python
      bytes literal), a tab, and the codeword spelled with ASCII "0"/"1";
    * an empty line;
    * the payload: the codewords of all input bytes, followed by a newline.
    """

    def compress(self, input_bytes: bytes, outfile: BufferedWriter):
        """Compress the `input_bytes` bytes into the file `outfile` with the Huffman algorithm.

        Entropy, average codeword length and redundancy of the generated code
        are printed to stderr.  Empty input produces a file that consists of
        the empty-line separator and the trailing newline only.
        """
        code: dict[Symbol, bytes] = {}
        statistics = self.build_statistics(input_bytes)
        entropy = self.get_entropy(statistics, len(input_bytes))
        print(f"Entropy:\t{entropy:.3f} bits/symbol", file=sys.stderr)
        nodes_heap = self.build_nodes_heap(statistics)
        self.build_huffman(code, nodes_heap)
        average_length = self.get_average_length(statistics, len(input_bytes), code)
        print(f"Average length:\t{average_length:.3f} bits/symbol", file=sys.stderr)
        print(f"Redundancy:\t{average_length - entropy:.3f} bits/symbol", file=sys.stderr)
        for sym, codeword in code.items():
            # `code` also holds entries for the internal nodes created while
            # merging; only symbols that occur in the input go to the header.
            if sym in statistics:
                # repr() of a one-byte bytes object is b'<escaped>'; strip
                # the leading "b" + quote and the trailing quote.
                escaped = repr(bytes([sym]))[2:-1].encode()
                outfile.write(escaped + b"\t" + codeword + b"\n")
        outfile.write(b"\n")
        outfile.write(b"".join(code[sym] for sym in input_bytes))
        outfile.write(b"\n")

    def build_huffman(self, code: dict[Symbol, bytes], nodes_heap: list[Node]):
        """The Huffman Coding algorithm.

        Repeatedly merges the two lowest-frequency nodes and stores a
        codeword in `code` for every node symbol, including the merged ones.
        A lone node gets the codeword b"0".  `nodes_heap` must be a valid
        heap and is emptied in the process.
        """
        if not nodes_heap:
            return
        if len(nodes_heap) == 1:
            code[heapq.heappop(nodes_heap).sym] = b"0"
            return
        # Merge until only the two children of the root are left, recording
        # each merge so codewords can be handed down afterwards.
        merges = []
        while len(nodes_heap) > 2:
            node1 = heapq.heappop(nodes_heap)
            node2 = heapq.heappop(nodes_heap)
            parent = node1.combine_with(node2)
            heapq.heappush(nodes_heap, parent)
            merges.append((node1, node2, parent))
        code[heapq.heappop(nodes_heap).sym] = b"0"
        code[heapq.heappop(nodes_heap).sym] = b"1"
        # A parent's codeword is known before its children's only when the
        # merges are replayed from last to first.
        for node1, node2, parent in reversed(merges):
            prefix = code[parent.sym]
            code[node1.sym] = prefix + b"0"
            code[node2.sym] = prefix + b"1"

    def build_nodes_heap(self, statistics: dict[Symbol, int]) -> list[Node]:
        """Return a heap holding one `Node` per symbol in `statistics`."""
        nodes_heap = [Node(sym, freq) for sym, freq in statistics.items()]
        heapq.heapify(nodes_heap)
        return nodes_heap

    def build_statistics(self, input_bytes: bytes) -> dict[Symbol, int]:
        """Collect the number of occurrences for each symbol"""
        statistics: dict[Symbol, int] = {}
        for sym in input_bytes:
            statistics[sym] = statistics.get(sym, 0) + 1
        return statistics

    def get_entropy(self, statistics: dict[Symbol, int], count: int) -> float:
        """Calculate entropy of source

        `count` is the total number of symbols the occurrence counts in
        `statistics` are divided by.  Returns 0.0 for empty `statistics`.
        """
        s = 0.0
        for n in statistics.values():
            p = n / count
            s -= p * math.log2(p)
        return s

    def get_average_length(self, statistics: dict[Symbol, int], count: int, code: dict[Symbol, bytes]) -> float:
        """Calculate average length of Huffman code

        Each codeword length is weighted by its symbol's relative frequency
        (`n / count`).  Returns 0.0 for empty `statistics`.
        """
        s = 0.0
        for sym, n in statistics.items():
            p = n / count
            s += p * len(code[sym])
        return s

    def decompress(self, input_bytes: bytes, outfile: BufferedWriter):
        """Decompress a Huffman-encoded file

        `input_bytes` must follow the layout produced by `compress`.  The
        decoded bytes are written to `outfile` only after the whole payload
        has been decoded, so nothing is written when an error is raised.

        Raises:
            NotACompressedHuffmanFileError: if the header/payload separator
                is missing, a header entry is malformed, or the payload is
                not a sequence of complete codewords from the header.
        """
        header, separator, payload = input_bytes.partition(b"\n\n")
        if not separator:
            raise NotACompressedHuffmanFileError()

        decoding_table: dict[str, bytes] = {}
        # An empty header is valid: it is what compress() writes for empty input.
        for entry in header.split(b"\n") if header else ():
            fields = entry.split(b"\t")
            if len(fields) != 2:
                raise NotACompressedHuffmanFileError()
            escaped_sym, codeword = fields
            if not codeword or codeword.strip(b"01"):
                raise NotACompressedHuffmanFileError()
            try:
                # Undo the bytes-literal escaping.  latin-1 maps code points
                # 0..255 back to the single byte of the same value; UTF-8
                # would turn every byte >= 0x80 into two bytes.
                sym = escaped_sym.decode("unicode_escape").encode("latin-1")
            except UnicodeError:
                raise NotACompressedHuffmanFileError() from None
            if len(sym) != 1:
                raise NotACompressedHuffmanFileError()
            decoding_table[codeword.decode("ascii")] = sym

        bits = payload.rstrip()  # drop the newline that terminates the payload
        if bits.strip(b"01"):
            raise NotACompressedHuffmanFileError()

        longest = max(map(len, decoding_table), default=0)
        decoded = bytearray()
        pending = ""
        for bit in bits.decode("ascii"):
            pending += bit
            sym = decoding_table.get(pending)
            if sym is not None:
                decoded += sym
                pending = ""
            elif len(pending) >= longest:
                # No codeword is longer than this, so the bits cannot match.
                raise NotACompressedHuffmanFileError()
        if pending:
            # The payload stops in the middle of a codeword.
            raise NotACompressedHuffmanFileError()
        outfile.write(bytes(decoded))
class NotACompressedHuffmanFileError(RuntimeError):
    """Signals that some input is not a compressed Huffman file.

    The exception takes no arguments; its message is fixed.
    """

    def __init__(self):
        message = "not a compressed Huffman file"
        RuntimeError.__init__(self, message)
import argparse
import textwrap
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: compress or decompress one file.

    Args:
        argv: Command-line arguments without the program name; defaults to
            `sys.argv[1:]`.

    Returns:
        The process exit status: 0 on success, 1 if `--decompress` was
        requested and the input is not a compressed Huffman file.
    """
    if argv is None:
        argv = sys.argv[1:]
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Compress files with the Huffman Coding algorithm and decompress them back",
        epilog=textwrap.dedent(
            """
            Authors: Fares Ahmed Bakhit <20230277>
            Momen Desouky Mahmoud <20230431>
            """
        ),
    )
    parser.add_argument("input_bytes", type=argparse.FileType("rb"))
    parser.add_argument("OUTFILE", type=argparse.FileType("wb"))
    parser.add_argument("-d", "--decompress", action="store_true", help="decompress input_bytes into OUTFILE")
    args = parser.parse_args(argv)
    huffman = Huffman()
    with args.input_bytes as input_bytes, args.OUTFILE as outfile:
        if args.decompress:
            try:
                huffman.decompress(input_bytes.read(), outfile)
            except NotACompressedHuffmanFileError as e:
                print(f"error: {args.input_bytes.name}: {e}", file=sys.stderr)
                # Report the failure to the calling process as well; a zero
                # status would make the failed run look like a success.
                return 1
        else:
            huffman.compress(input_bytes.read(), outfile)
    return 0
# Run the command-line interface only when executed as a script, and hand
# main()'s return value to the interpreter as the exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment