
@moriyoshi
Last active November 12, 2025 13:11
from __future__ import annotations
import asyncio
import concurrent.futures as cf
import dataclasses
import datetime
import logging
import os
import pathlib
import re
import types
import struct
import sys
import zlib
from collections import deque
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Coroutine, Iterator, Iterable, Sequence
from typing import Any, BinaryIO, Generic, Never, Protocol, TypeVar, TYPE_CHECKING
import aioboto3
import structlog
if TYPE_CHECKING:
    from types_aiobotocore_s3.client import S3Client as AsyncS3Client
logger = structlog.get_logger(__name__)
T = TypeVar("T")
def unwrap_optional(v: T | None) -> T:
    assert v is not None
    return v
class QueueShutdown(Exception):
    pass
class Signal:
    signaled: bool
    cond: asyncio.Condition

    async def __call__(self) -> None:
        if self.signaled:
            return
        async with self.cond:
            await self.cond.wait()

    async def signal(self) -> None:
        try:
            self.signaled = True
            async with self.cond:
                self.cond.notify_all()
        except Exception:
            import traceback
            traceback.print_exc()
            raise

    def __init__(self) -> None:
        self.signaled = False
        self.cond = asyncio.Condition()
class ShutdownableQueue(Generic[T]):
    queue: asyncio.Queue[T]
    signal: Signal

    async def put(self, item: T) -> None:
        if self.signal.signaled:
            raise QueueShutdown()
        await self.queue.put(item)

    async def get(self) -> T:
        try:
            return self.queue.get_nowait()
        except asyncio.QueueEmpty:
            pass
        if self.signal.signaled:
            raise QueueShutdown()
        queue_signal_task = asyncio.create_task(self.signal())
        dequeue_task = asyncio.create_task(self.queue.get())
        done, _ = await asyncio.wait([dequeue_task, queue_signal_task], return_when=asyncio.FIRST_COMPLETED)
        dequeued = dequeue_task in done
        try:
            if not dequeued:
                assert self.signal.signaled
                raise QueueShutdown()
            return dequeue_task.result()
        finally:
            if not dequeued:
                dequeue_task.cancel()
            if queue_signal_task not in done:
                queue_signal_task.cancel()

    def __init__(self, signal: Signal, size: int = 0) -> None:
        self.queue = asyncio.Queue(size)
        self.signal = signal
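# A minimal usage sketch of Signal + ShutdownableQueue (not part of the original
# gist and never called; the names below are hypothetical). The consumer keeps
# draining the queue and stops once the queue is empty *and* the signal fired.
async def _example_shutdownable_queue() -> None:
    signal = Signal()
    queue: ShutdownableQueue[int] = ShutdownableQueue(signal)

    async def consumer() -> None:
        while True:
            try:
                item = await queue.get()
            except QueueShutdown:
                break  # queue drained and shutdown requested
            print("consumed", item)

    async with asyncio.TaskGroup() as g:
        g.create_task(consumer())
        for i in range(3):
            await queue.put(i)
        # Items already queued are still delivered; only further get()s fail.
        await signal.signal()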
@dataclasses.dataclass
class ChunkDescriptor:
    i: int
    f: BinaryIO | None
    start: int
    end: int
    prev: ChunkDescriptor | None
    next: ChunkDescriptor | None

    def dispose(self) -> None:
        if self.f is not None:
            self.f.close()
            self.f = None
def _adjust_chunk_offset(chunk: ChunkDescriptor) -> None:
    # Move the boundary between this chunk and its predecessor forward to the
    # next newline, so that every chunk starts at the beginning of a line.
    if chunk.prev is None:
        return None
    prev = chunk.prev
    assert prev.f is not None
    prev.f.seek(chunk.start - 1)
    c = prev.f.read(1)
    if c != b'\n':
        prev.f.readline()
    chunk.start = prev.end = prev.f.tell()
    prev.f.seek(prev.start)
async def _read_file_chunks_inner(
    executor: cf.ThreadPoolExecutor,
    opener: Callable[[], BinaryIO],
    chunk_size: int,
) -> Sequence[ChunkDescriptor]:
    f = opener()
    try:
        if not f.seekable():
            raise ValueError("stream must be seekable")
        f.seek(0, os.SEEK_END)
        size = f.tell()
        f.seek(0, os.SEEK_SET)
    except Exception:
        f.close()
        raise
    chunks: list[ChunkDescriptor] = []
    if size == 0:
        return chunks
    prev: ChunkDescriptor | None = None
    offset = 0
    next_offset: int | None = None
    while True:
        next_offset = min(offset + chunk_size, size)
        chunk = ChunkDescriptor(
            i=len(chunks),
            f=f,
            start=offset,
            end=next_offset,
            prev=prev,
            next=None,
        )
        if prev is not None:
            prev.next = chunk
        chunks.append(chunk)
        prev = chunk
        if next_offset >= size:
            break
        f = opener()
        offset = next_offset
    await asyncio.wait([
        asyncio.wrap_future(executor.submit(_adjust_chunk_offset, chunk))
        for chunk in chunks
    ])
    for chunk in chunks:
        unwrap_optional(chunk.f).seek(chunk.start)
    return chunks
def read_lines_upto(f: BinaryIO, pos: int) -> Iterator[bytes]:
    while f.tell() < pos:
        try:
            l = next(f)
        except StopIteration:
            break
        yield l
async def read_file_chunks(
    executor: cf.ThreadPoolExecutor,
    opener: Callable[[], BinaryIO],
    chunk_size: int = 16777216,
) -> Sequence[asyncio.Future[tuple[int, Sequence[bytes]]]]:
    chunks = await _read_file_chunks_inner(executor, opener, chunk_size)
    logger.debug("chunks of the file list", chunks=len(chunks))
    return [
        asyncio.wrap_future(executor.submit(lambda chunk: (chunk.i, list(read_lines_upto(unwrap_optional(chunk.f), chunk.end))), chunk))
        for chunk in chunks
    ]
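# Illustrative sketch (hypothetical helper, never invoked): consuming the
# futures returned by read_file_chunks. Each future resolves to
# (chunk_index, lines); chunk boundaries were already aligned to line
# boundaries by _adjust_chunk_offset, so no line is split across chunks.
async def _example_read_file_chunks(path: pathlib.Path) -> None:
    with cf.ThreadPoolExecutor() as executor:
        for fu in await read_file_chunks(executor, lambda: path.open("rb")):
            i, lines = await fu
            print(f"chunk #{i}: {len(lines)} lines")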
class _ScanChunksInner(Generic[T]):
    inner: _ScanChunks[T]
    current_iter: Iterator[T] | None
    should_call_aexit: bool

    async def __anext__(self) -> T:
        if self.inner.task_group is None:
            self.should_call_aexit = True
            await self.inner.__aenter__()
        while True:
            if self.current_iter is None:
                try:
                    self.current_iter = iter(await self.inner.queue.get())
                except QueueShutdown:
                    if self.should_call_aexit:
                        await self.inner.__aexit__(None, None, None)
                    raise StopAsyncIteration
            try:
                return next(self.current_iter)
            except StopIteration:
                pass
            self.current_iter = None

    def __init__(self, inner: _ScanChunks[T]) -> None:
        self.inner = inner
        self.current_iter = None
        self.should_call_aexit = False
class _ScanChunks(Generic[T]):
    executor: cf.ThreadPoolExecutor
    path: pathlib.Path
    callback: Callable[[bytes], Coroutine[Any, Any, Iterable[T]]]
    signal: Signal
    queue: ShutdownableQueue[Iterable[T]]
    task_group: asyncio.TaskGroup | None

    async def _scan_chunk(
        self,
        fu: asyncio.Future[tuple[int, Sequence[bytes]]],
    ) -> None:
        assert self.task_group is not None
        i, lines = await fu
        logger.debug(f"reading file list chunk #{i}", lines=len(lines), first_line=(lines[0] if lines else None))
        try:
            for result in asyncio.as_completed([
                self.task_group.create_task(self.callback(line))
                for line in lines
            ]):
                await self.queue.put(await result)
        except Exception as e:
            import traceback
            traceback.print_exc()
            logger.error("error occurred in _scan_chunk", exc=e)

    def __aiter__(self) -> _ScanChunksInner:
        return _ScanChunksInner(self)

    async def __aenter__(self) -> _ScanChunks:
        chunks_fu = await read_file_chunks(self.executor, lambda: self.path.open("rb"))
        self.task_group = await asyncio.TaskGroup().__aenter__()
        scan_tasks = [self.task_group.create_task(self._scan_chunk(fu)) for fu in chunks_fu]

        async def _shutdown_when_done() -> None:
            # Once every chunk has been scanned, shut the queue down so that
            # iteration terminates instead of waiting forever on an empty queue.
            await asyncio.gather(*scan_tasks)
            await self.signal.signal()

        self.task_group.create_task(_shutdown_when_done())
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        tb: types.TracebackType | None,
    ) -> None:
        assert self.task_group is not None
        await self.signal.signal()
        await self.task_group.__aexit__(exc_type, exc_value, tb)

    def __init__(
        self,
        executor: cf.ThreadPoolExecutor,
        path: pathlib.Path,
        callback: Callable[[bytes], Coroutine[Any, Any, Iterable[T]]],
    ) -> None:
        self.executor = executor
        self.path = path
        self.callback = callback
        self.signal = Signal()
        self.queue = ShutdownableQueue(self.signal, 0)
        self.task_group = None
def scan_chunks(
    executor: cf.ThreadPoolExecutor,
    path: pathlib.Path,
    callback: Callable[[bytes], Coroutine[Any, Any, Iterable[T]]],
) -> _ScanChunks[T]:
    return _ScanChunks(executor, path, callback)
def _parse_line(line: str) -> tuple[datetime.datetime, int, str]:
    size, _, key = line[19:].strip().partition(" ")
    return (
        datetime.datetime.strptime(line[:19], "%Y-%m-%d %H:%M:%S"),
        int(size),
        key,
    )
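# _parse_line assumes "aws s3 ls"-style lines: a 19-character timestamp, the
# object size, then the key. A hypothetical example line
#   "2024-01-02 03:04:05    1234 AWSLogs/example/app.my-alb.log.gz"
# would parse to (datetime(2024, 1, 2, 3, 4, 5), 1234, "AWSLogs/example/app.my-alb.log.gz").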
class AsyncReaderStream(Protocol):
    async def read(self, n: int = -1) -> bytes:
        pass
class BufferedAsyncReaderStream:
    reader: AsyncReaderStream
    chunk_size: int
    buf: deque[bytes]
    buf_bytes: int
    eof: bool

    def _append_buf(self, b: bytes) -> None:
        self.buf.append(b)
        self.buf_bytes += len(b)

    def _pop_buf(self) -> bytes:
        chunk = self.buf.popleft()
        self.buf_bytes -= len(chunk)
        return chunk

    def unread(self, b: bytes) -> None:
        self.buf.appendleft(b)
        self.buf_bytes += len(b)

    async def read_upto(self, c: bytes) -> bytes | None:
        r = bytearray()
        while True:
            if not self.buf:
                if not await self._feed_one():
                    return None
            chunk = self._pop_buf()
            i = chunk.find(c)
            if i >= 0:
                i += 1
                if i < len(chunk):
                    self.unread(chunk[i:])
                r.extend(chunk[:i])
                break
            else:
                r.extend(chunk)
        return bytes(r)

    async def __anext__(self) -> bytes:
        l = await self.read_upto(b"\n")
        if l is None:
            raise StopAsyncIteration
        return l

    async def _feed_one(self) -> bool:
        if self.eof:
            return False
        chunk = await self.reader.read(self.chunk_size)
        if not chunk:
            self.eof = True
            return False
        self._append_buf(chunk)
        return True

    async def _feed_buf(self, n: int = -1) -> None:
        while n < 0 or self.buf_bytes < n:
            if not await self._feed_one():
                break

    async def read(self, n: int = -1) -> bytes:
        # n < 0 means "read everything up to EOF".
        await self._feed_buf(n)
        bn = 0
        chunks: list[bytes] = []
        while self.buf:
            chunk = self._pop_buf()
            bn += len(chunk)
            chunks.append(chunk)
            if n >= 0 and bn >= n:
                break
        b = bytearray(bn)
        o = 0
        for chunk in chunks:
            b[o:o + len(chunk)] = chunk
            o += len(chunk)
        if n >= 0 and bn > n:
            b = b[:n]
            self.unread(chunk[n - bn:])
        return bytes(b)

    def __aiter__(self) -> AsyncIterator[bytes]:
        return self

    def __init__(self, reader: AsyncReaderStream, chunk_size: int) -> None:
        self.reader = reader
        self.chunk_size = chunk_size
        self.buf = deque()
        self.buf_bytes = 0
        self.eof = False
class AsyncDecompressor(BufferedAsyncReaderStream):
    reader: BufferedAsyncReaderStream
    dec: zlib._Decompress | None
    header_read: bool

    async def _feed_one(self) -> bool:
        if self.eof:
            return False
        while True:
            if not self.header_read:
                # Parse the fixed 10-byte gzip member header by hand so the raw
                # deflate stream can be fed to a raw zlib decompressor.
                b = await self.reader.read(10)
                if not b:
                    self.eof = True
                    return False
                if len(b) < 10:
                    raise RuntimeError("unexpected EOF")
                (id_, cm, flg, mtime, xfl, os_) = struct.unpack("<HBBLBB", b)
                if flg & 4 != 0:
                    # FEXTRA
                    b = await self.reader.read(2)
                    if len(b) < 2:
                        raise RuntimeError("unexpected EOF")
                    (nb, ) = struct.unpack("<H", b)
                    b = await self.reader.read(nb)
                    if len(b) < nb:
                        raise RuntimeError("unexpected EOF")
                if flg & 8:
                    # FNAME
                    bb = await self.reader.read_upto(b"\0")
                    if bb is None:
                        raise RuntimeError("unexpected EOF")
                if flg & 16:
                    # FCOMMENT
                    bb = await self.reader.read_upto(b"\0")
                    if bb is None:
                        raise RuntimeError("unexpected EOF")
                if flg & 2:
                    # FHCRC
                    b = await self.reader.read(2)
                self.header_read = True
                self.dec = zlib.decompressobj(-zlib.MAX_WBITS)
            assert self.dec is not None
            if self.dec.eof:
                # End of a gzip member: skip the trailer and, if more data
                # follows (concatenated gzip members), start over with a fresh header.
                b = self.dec.flush()
                unused_data = self.dec.unused_data
                if len(unused_data) < 8:
                    # postamble (CRC32 + Size)
                    raise RuntimeError("unexpected EOF")
                unused_data = unused_data[8:]
                self.dec = None
                self.header_read = False
                if not unused_data:
                    self.eof = True
                    if b:
                        self._append_buf(b)
                        return True
                    else:
                        return False
                self.reader.unread(unused_data)
                if b:
                    self._append_buf(b)
                    return True
                else:
                    continue
            raw_chunk = await self.reader.read(self.chunk_size)
            if not raw_chunk:
                if self.dec is None:
                    return False
                b = self.dec.flush()
                self.eof = True
                if b:
                    self._append_buf(b)
                    return True
                else:
                    return False
            self._append_buf(self.dec.decompress(raw_chunk))
            return True

    def __init__(self, reader: BufferedAsyncReaderStream, chunk_size: int) -> None:
        super().__init__(reader, chunk_size)
        self.dec = None
        self.header_read = False
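# Illustrative sketch (not from the original gist, never invoked): driving
# AsyncDecompressor from an in-memory gzip blob. _BytesReader is a hypothetical
# minimal AsyncReaderStream used only for this example; in the real flow the
# reader is the streaming body of an S3 GetObject response.
class _BytesReader:
    def __init__(self, data: bytes) -> None:
        self._data = data
        self._pos = 0

    async def read(self, n: int = -1) -> bytes:
        if n < 0:
            n = len(self._data) - self._pos
        chunk = self._data[self._pos:self._pos + n]
        self._pos += len(chunk)
        return chunk


async def _example_decompress_lines(gzipped: bytes) -> list[bytes]:
    # gzipped may contain one or more concatenated gzip members.
    stream = AsyncDecompressor(BufferedAsyncReaderStream(_BytesReader(gzipped), 65536), 65536)
    return [line async for line in stream]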
ALB_LOG_REGEXP = re.compile(" ".join([
    r'(?P<type>[^ ]*)',
    r'(?P<time>[^ ]*)',
    r'(?P<elb>[^ ]*)',
    r'(?P<client_ip>[^ ]*):(?P<client_port>[0-9]*)',
    r'(?:(?P<target_ip>[^ ]*):(?P<target_port>[0-9]*)|-)',
    r'(?P<request_processing_time>[-.0-9]*)',
    r'(?P<target_processing_time>[-.0-9]*)',
    r'(?P<response_processing_time>[-.0-9]*)',
    r'(?P<elb_status_code>|[-0-9]*)',
    r'(?P<target_status_code>-|[-0-9]*)',
    r'(?P<received_bytes>[-0-9]*)',
    r'(?P<sent_bytes>[-0-9]*)',
    r'"(?P<request_verb>[^ ]*) (?P<request_url>.*) (?P<request_proto>- |[^ ]*)"',
    r'"(?P<user_agent>[^"]*)"',
    r'(?P<ssl_cipher>[A-Z0-9-_]+)',
    r'(?P<ssl_protocol>[A-Za-z0-9.-]*)',
    r'(?P<target_group_arn>[^ ]*)',
    r'"(?P<trace_id>[^"]*)"',
    r'"(?P<domain_name>[^"]*)"',
    r'"(?P<chosen_cert_arn>[^"]*)"',
    r'(?P<matched_rule_priority>[-.0-9]*)',
    r'(?P<request_creation_time>[^ ]*)',
    r'"(?P<actions_executed>[^"]*)"',
    r'"(?P<redirect_url>[^"]*)"',
    r'"(?P<lambda_error_reason>[^"]*)"',
    r'"(?P<target_port_list>[^"]*)"',
    r'"(?P<target_status_code_list>[^"]*)"',
    r'"(?P<classification>[^"]*)"',
    r'"(?P<classification_reason>[^"]*)"',
]) + r'(?: (?P<conn_trace_id>[^ ]*))?')
@dataclasses.dataclass
class ALBLogEntry:
    type: str
    time: str
    elb: str
    client_ip: str
    client_port: int
    target_ip: str | None
    target_port: int | None
    request_processing_time: float
    target_processing_time: float
    response_processing_time: float
    elb_status_code: int
    target_status_code: str
    received_bytes: int
    sent_bytes: int
    request_verb: str
    request_url: str
    request_proto: str
    user_agent: str
    ssl_cipher: str
    ssl_protocol: str
    target_group_arn: str
    trace_id: str
    domain_name: str
    chosen_cert_arn: str
    matched_rule_priority: str
    request_creation_time: str
    actions_executed: str
    redirect_url: str
    lambda_error_reason: str
    target_port_list: Sequence[str]
    target_status_code_list: Sequence[str]
    classification: str
    classification_reason: str
    conn_trace_id: str | None  # trailing field is optional in the regexp and may be absent
def parse_alb_log_line(line: str) -> ALBLogEntry:
    m = ALB_LOG_REGEXP.match(line)
    if m is None:
        raise ValueError(f"invalid log entry: {line}")

    def _str(k: str) -> str:
        v = m.group(k)
        if v is None:
            raise ValueError(f"missing value for field {k}")
        return v

    def _int(k: str) -> int:
        return int(_str(k))

    def _maybe_int(k: str) -> int | None:
        v = m.group(k)
        return int(v) if v is not None else None

    def _float(k: str) -> float:
        return float(_str(k))

    return ALBLogEntry(
        type=_str("type"),
        time=_str("time"),
        elb=_str("elb"),
        client_ip=_str("client_ip"),
        client_port=_int("client_port"),
        target_ip=m.group("target_ip"),
        target_port=_maybe_int("target_port"),
        request_processing_time=_float("request_processing_time"),
        target_processing_time=_float("target_processing_time"),
        response_processing_time=_float("response_processing_time"),
        elb_status_code=_int("elb_status_code"),
        target_status_code=_str("target_status_code"),
        received_bytes=_int("received_bytes"),
        sent_bytes=_int("sent_bytes"),
        request_verb=_str("request_verb"),
        request_url=_str("request_url"),
        request_proto=_str("request_proto"),
        user_agent=_str("user_agent"),
        ssl_cipher=_str("ssl_cipher"),
        ssl_protocol=_str("ssl_protocol"),
        target_group_arn=_str("target_group_arn"),
        trace_id=_str("trace_id"),
        domain_name=_str("domain_name"),
        chosen_cert_arn=_str("chosen_cert_arn"),
        matched_rule_priority=_str("matched_rule_priority"),
        request_creation_time=_str("request_creation_time"),
        actions_executed=_str("actions_executed"),
        redirect_url=_str("redirect_url"),
        lambda_error_reason=_str("lambda_error_reason"),
        target_port_list=_str("target_port_list").split(),
        target_status_code_list=_str("target_status_code_list").split(),
        classification=_str("classification"),
        classification_reason=_str("classification_reason"),
        conn_trace_id=m.group("conn_trace_id"),
    )
async def _read_alb_log(client: AsyncS3Client, bucket: str, _date: datetime.datetime, size: int, key: str) -> Sequence[ALBLogEntry]:
logger.info("extracting logs", key=key)
resp =await client.get_object(
Bucket=bucket,
Key=key,
)
entries = []
try:
async for line in AsyncDecompressor(BufferedAsyncReaderStream(resp["Body"], 1048576), 1048576):
entry = parse_alb_log_line(line.decode("utf-8"))
entries.append(entry)
except Exception:
import traceback
traceback.print_exc()
raise
logger.info("log extracted", key=key, items=len(entries))
return entries
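# Overall flow: main() scans the local file list (/tmp/filelist.txt, one
# "aws s3 ls"-style line per S3 object) in parallel chunks via scan_chunks();
# each line becomes a (future, (date, size, key)) work item on a bounded queue,
# ten _handler tasks fetch and parse the gzipped ALB logs from S3, and the
# async-for loop prints time and request URL for every parsed entry.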
async def main(executor: cf.ThreadPoolExecutor):
    session = aioboto3.Session()
    bucket = "*****"
    signal = Signal()
    queue = ShutdownableQueue[tuple[asyncio.Future[Iterable[ALBLogEntry]], tuple[datetime.datetime, int, str]]](signal, 1000)

    async def _enqueue(line: bytes) -> Iterable[ALBLogEntry]:
        fu = asyncio.Future[Iterable[ALBLogEntry]]()
        await queue.put((fu, _parse_line(line.decode("utf-8"))))
        return await fu

    async def _handler(client: AsyncS3Client, bucket: str) -> None:
        try:
            while True:
                try:
                    item = await queue.get()
                except QueueShutdown:
                    break
                (fu, (date, size, key)) = item
                fu.set_result(await _read_alb_log(client, bucket, date, size, key))
        except Exception as e:
            import traceback
            traceback.print_exc()
            logger.error("error occurred in _handler", exc=e)
            raise

    async with session.client("s3") as client:
        async with asyncio.TaskGroup() as g:
            for _ in range(10):
                g.create_task(_handler(client, bucket))
            try:
                async for entry in scan_chunks(
                    executor,
                    pathlib.Path("/tmp/filelist.txt"),
                    _enqueue,
                ):
                    print("\t".join([entry.time, entry.request_url]))
            finally:
                await signal.signal()
structlog.configure(
    logger_factory=structlog.PrintLoggerFactory(sys.stderr)
)
with cf.ThreadPoolExecutor() as executor:
    asyncio.run(main(executor))