Created July 5, 2025 22:47
clw-v5.py
#!/usr/bin/env python3
"""
clw – Chunk-Line-Wrapper (robust stream version)
Keeps *every* write to stdout/stderr ≤1550 bytes so Codex Cloud
never triggers its 1600-byte PTY crash guard, even when a program
prints megabytes without a single newline (pnpm, progress bars,
binary hexdumps, etc.).
"""
import binascii, os, re, sys, signal

# Quietly ignore downstream pipes closing early (grep -q, head -n1, …)
signal.signal(signal.SIGPIPE, signal.SIG_DFL)

# Tunables ------------------------------------------------------------
MAX_LINE = int(os.getenv("CLW_MAX_LINE_LENGTH", 1550))
WRAP_MARK = (binascii.unhexlify(os.getenv("CLW_WRAP_MARK"))
             if os.getenv("CLW_WRAP_MARK")
             else "⏎".encode()) + b"\n"
CHUNK_SIZE = MAX_LINE - len(WRAP_MARK)   # 1550 - len(mark)
READ_BLOCK = 4096                        # bytes per os.read
WORD_RE = re.compile(rb"^.*(\b).+", re.DOTALL)
# ---------------------------------------------------------------------

buf = bytearray()


def flush_piece(piece: bytes):
    """Slice *piece* into ≤CHUNK_SIZE chunks, trying to break on word boundaries."""
    start = 0
    while start < len(piece):
        end = start + CHUNK_SIZE
        chunk = piece[start:end]
        if len(chunk) == CHUNK_SIZE:     # try neat word wrap
            m = WORD_RE.match(chunk)
            if m and m.start(1):
                chunk = chunk[:m.start(1)]
                end = start + len(chunk)
        sys.stdout.buffer.write(chunk)
        if end < len(piece):             # wrapped ⇒ mark + EOL
            sys.stdout.buffer.write(WRAP_MARK)
        start = end
    sys.stdout.buffer.flush()


fd = sys.stdin.fileno()
delims = (b'\n', b'\r')

while True:
    data = os.read(fd, READ_BLOCK)
    if not data:                         # EOF
        if buf:
            flush_piece(bytes(buf))
        break
    buf.extend(data)

    # Process as many complete “logical lines” as we can find
    search_pos = 0
    while True:
        # Locate the earliest delimiter
        idxs = [buf.find(d, search_pos) for d in delims]
        idxs = [i for i in idxs if i != -1]
        split = min(idxs) if idxs else -1

        # If none found *and* the buffer is getting large, force-wrap
        if split == -1:
            if len(buf) >= CHUNK_SIZE:
                flush_piece(bytes(buf[:CHUNK_SIZE]))
                del buf[:CHUNK_SIZE]
            break

        # Emit up to (but not including) the delimiter
        if split:
            flush_piece(bytes(buf[:split]))
        # Emit the delimiter itself so TTY tricks keep working
        flush_piece(buf[split:split+1])
        del buf[:split+1]
        search_pos = 0                   # restart scan – buffer changed
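The word-wrap heuristic relies on WORD_RE backtracking to the right-most word boundary inside a full chunk, so a word that would otherwise be split mid-way is pushed to the start of the next chunk. A small illustration of that behaviour in isolation (the 20-byte chunk below is made up purely for the demo; with the defaults the real script cuts at CHUNK_SIZE, about 1546 bytes):

import re

WORD_RE = re.compile(rb"^.*(\b).+", re.DOTALL)

chunk = b"lorem ipsum dolor si"   # pretend this is one full chunk; "sit" was split
m = WORD_RE.match(chunk)
print(chunk[:m.start(1)])         # prints b'lorem ipsum dolor '; "si" joins the next chunk

The `if m and m.start(1)` guard in flush_piece covers the degenerate case where the only boundary found is at position 0 (one long unbroken token): the chunk is then emitted as-is rather than shortened to nothing.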
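A minimal end-to-end check of the whole pipe, assuming the file above is saved as clw-v5.py in the working directory; the file name and the 1550-byte default come from the gist, everything else in this snippet is illustrative:

import subprocess, sys

MAX_LINE = 1550                      # default CLW_MAX_LINE_LENGTH
payload = b"word " * 2000            # ~10 kB without a single newline

proc = subprocess.run(
    [sys.executable, "clw-v5.py"],   # assumed location of the script above
    input=payload,
    stdout=subprocess.PIPE,
    check=True,
)

# Every physical line clw emits (chunk + wrap marker + newline) must stay
# within the limit, so the downstream PTY guard never fires.
for line in proc.stdout.splitlines(keepends=True):
    assert len(line) <= MAX_LINE, f"{len(line)}-byte line exceeds {MAX_LINE}"
print(f"ok: all {len(proc.stdout.splitlines())} lines are ≤ {MAX_LINE} bytes")

The wrap marker itself can be swapped by exporting CLW_WRAP_MARK as a hex string, e.g. CLW_WRAP_MARK=5c makes the script append a plain backslash instead of “⏎”.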