Skip to content

Instantly share code, notes, and snippets.

@amxv
Created July 5, 2025 22:47
Show Gist options
  • Select an option

  • Save amxv/9c930675a935cc10ed7fe2fc3068705a to your computer and use it in GitHub Desktop.

Select an option

Save amxv/9c930675a935cc10ed7fe2fc3068705a to your computer and use it in GitHub Desktop.
clw-v5.py
#!/usr/bin/env python3
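"""
clw – Chunk-Line-Wrapper (robust stream version)

Reads a byte stream on stdin and re-emits it on stdout, inserting a wrap
marker plus newline into over-long lines. The goal is to keep every emitted
line ≤1550 bytes so Codex Cloud never triggers its 1600-byte PTY crash
guard, even when a program prints megabytes without a single newline
(pnpm, progress bars, binary hexdumps, etc.).

Environment:
    CLW_MAX_LINE_LENGTH  maximum line length in bytes (default 1550)
    CLW_WRAP_MARK        hex-encoded bytes used as the wrap marker
                         (default: "⏎")
"""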
import binascii, os, re, sys, signal
# Restore the default SIGPIPE action. Python starts with SIGPIPE ignored,
# which turns a closed downstream pipe (grep -q, head -n1, …) into a
# BrokenPipeError traceback; with SIG_DFL the process is simply terminated
# by the signal instead. SIGPIPE is not defined on every platform
# (e.g. Windows), so only touch it where it exists.
if hasattr(signal, "SIGPIPE"):
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
# Tunables ------------------------------------------------------------
# Upper bound, in bytes, for one emitted line: payload + wrap marker + "\n".
MAX_LINE = int(os.getenv("CLW_MAX_LINE_LENGTH", 1550))
# Marker written at every forced break. CLW_WRAP_MARK, when set, holds the
# marker bytes hex-encoded; the trailing newline is always appended.
WRAP_MARK = (binascii.unhexlify(os.getenv("CLW_WRAP_MARK"))
             if os.getenv("CLW_WRAP_MARK")
             else "⏎".encode()) + b"\n"
# Payload bytes allowed per wrapped line, leaving room for the marker.
CHUNK_SIZE = MAX_LINE - len(WRAP_MARK)
if CHUNK_SIZE < 1:
    # A non-positive chunk size would make the slicing loop in flush_piece
    # spin forever without consuming input, so reject it up front.
    raise SystemExit(
        f"clw: CLW_MAX_LINE_LENGTH={MAX_LINE} leaves no room for payload "
        f"next to the {len(WRAP_MARK)}-byte wrap marker"
    )
READ_BLOCK = 4096  # bytes per os.read
# Group 1 is an empty match at the LAST word boundary that still has at
# least one byte after it; its start offset is the preferred cut point.
WORD_RE = re.compile(rb"^.*(\b).+", re.DOTALL)
# ---------------------------------------------------------------------
# Bytes read from stdin that have not been emitted yet.
buf = bytearray()


def flush_piece(piece: bytes, out=None) -> None:
    """Write *piece* as chunks of at most CHUNK_SIZE bytes, then flush.

    Every chunk except the last is followed by WRAP_MARK. A chunk that has
    to be cut is shortened to its last word boundary when one exists past
    offset 0; otherwise it is cut hard at CHUNK_SIZE bytes. A piece that
    already fits in CHUNK_SIZE bytes is written unchanged.

    Args:
        piece: Bytes-like data to emit; it contains no line delimiter
            except when it is a lone delimiter byte.
        out: Binary file object to write to. Defaults to
            ``sys.stdout.buffer``.
    """
    if out is None:
        out = sys.stdout.buffer
    total = len(piece)
    start = 0
    while start < total:
        end = min(start + CHUNK_SIZE, total)
        if end < total:
            # More data follows, so this chunk will be wrapped: prefer a
            # neat break at a word boundary over a cut in mid-word.
            m = WORD_RE.match(piece[start:end])
            if m and m.start(1):
                end = start + m.start(1)
        out.write(piece[start:end])
        if end < total:  # wrapped ⇒ mark + EOL
            out.write(WRAP_MARK)
        start = end
    out.flush()
# Main loop: copy stdin to stdout one "logical line" at a time, where a
# logical line ends at "\n" or "\r". Reading the raw descriptor with
# os.read returns whatever is available instead of waiting for a full block.
fd = sys.stdin.fileno()
delims = (b'\n', b'\r')
while True:
    data = os.read(fd, READ_BLOCK)
    if not data:  # EOF
        if buf:
            flush_piece(bytes(buf))
        break
    buf.extend(data)

    # Emit every complete logical line currently in the buffer.
    while True:
        hits = [i for i in (buf.find(d) for d in delims) if i != -1]
        if not hits:
            break
        split = min(hits)  # earliest delimiter
        # Emit up to (but not including) the delimiter
        if split:
            flush_piece(bytes(buf[:split]))
        # Emit the delimiter itself so TTY tricks keep working
        flush_piece(bytes(buf[split:split + 1]))
        del buf[:split + 1]

    # No delimiter left. If more than one chunk's worth is pending, force
    # wraps until the remainder fits: this bounds both the buffer and the
    # length of every emitted line on newline-free streams. A remainder of
    # exactly CHUNK_SIZE bytes is kept, since the next read may bring its
    # delimiter and the line then needs no wrap at all.
    if len(buf) > CHUNK_SIZE:
        out = sys.stdout.buffer
        while len(buf) > CHUNK_SIZE:
            head = bytes(buf[:CHUNK_SIZE])
            m = WORD_RE.match(head)
            # Break at the last word boundary when there is one past
            # offset 0, otherwise cut hard at CHUNK_SIZE bytes.
            cut = m.start(1) if m and m.start(1) else CHUNK_SIZE
            out.write(head[:cut])
            out.write(WRAP_MARK)
            del buf[:cut]
        out.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment