from __future__ import annotations

import asyncio
import concurrent.futures as cf
import dataclasses
import datetime
import os
import pathlib
import re
import types
import struct
import sys
import zlib
from collections import deque
from collections.abc import AsyncIterator, Callable, Coroutine, Iterator, Iterable, Sequence
from typing import Any, BinaryIO, Generic, Protocol, TypeVar, TYPE_CHECKING

import aioboto3
import structlog

if TYPE_CHECKING:
    from types_aiobotocore_s3.client import S3Client as AsyncS3Client

logger = structlog.get_logger(__name__)
T = TypeVar("T")


def unwrap_optional(v: T | None) -> T:
    assert v is not None
    return v


class QueueShutdown(Exception):
    pass


class Signal:
    """A one-shot, level-triggered event that coroutines can wait on."""

    signaled: bool
    cond: asyncio.Condition

    async def __call__(self) -> None:
        # Wait until signal() has been called; returns immediately if it already has.
        if self.signaled:
            return
        async with self.cond:
            await self.cond.wait_for(lambda: self.signaled)

    async def signal(self) -> None:
        async with self.cond:
            self.signaled = True
            self.cond.notify_all()

    def __init__(self) -> None:
        self.signaled = False
        self.cond = asyncio.Condition()
class ShutdownableQueue(Generic[T]):
    """An asyncio.Queue whose consumers are released when the shared Signal fires."""

    queue: asyncio.Queue[T]
    signal: Signal

    async def put(self, item: T) -> None:
        if self.signal.signaled:
            raise QueueShutdown()
        await self.queue.put(item)

    async def get(self) -> T:
        # Fast path: drain anything already queued before honoring shutdown.
        try:
            return self.queue.get_nowait()
        except asyncio.QueueEmpty:
            pass
        if self.signal.signaled:
            raise QueueShutdown()
        # Race the dequeue against the shutdown signal.
        queue_signal_task = asyncio.create_task(self.signal())
        dequeue_task = asyncio.create_task(self.queue.get())
        done, _ = await asyncio.wait([dequeue_task, queue_signal_task], return_when=asyncio.FIRST_COMPLETED)
        dequeued = dequeue_task in done
        try:
            if not dequeued:
                assert self.signal.signaled
                raise QueueShutdown()
            return dequeue_task.result()
        finally:
            if not dequeued:
                dequeue_task.cancel()
            if queue_signal_task not in done:
                queue_signal_task.cancel()

    def __init__(self, signal: Signal, size: int = 0) -> None:
        self.queue = asyncio.Queue(size)
        self.signal = signal
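
# --- Usage sketch (illustrative only) ----------------------------------------
# A minimal, self-contained sketch of how Signal and ShutdownableQueue are meant
# to cooperate: consumers block in get() until an item arrives or the signal
# fires, at which point QueueShutdown is raised.  All names below are
# hypothetical and exist only for this demo.
async def _demo_shutdownable_queue() -> None:
    demo_signal = Signal()
    demo_queue: ShutdownableQueue[int] = ShutdownableQueue(demo_signal)

    async def consumer() -> None:
        while True:
            try:
                item = await demo_queue.get()
            except QueueShutdown:
                break  # released by demo_signal.signal()
            print("got", item)

    consumer_task = asyncio.create_task(consumer())
    for i in range(3):
        await demo_queue.put(i)
    await demo_signal.signal()  # queued items are drained before shutdown is observed
    await consumer_task
# To try it out: asyncio.run(_demo_shutdownable_queue())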
@dataclasses.dataclass
class ChunkDescriptor:
    i: int
    f: BinaryIO | None
    start: int
    end: int
    prev: ChunkDescriptor | None
    next: ChunkDescriptor | None

    def dispose(self) -> None:
        if self.f is not None:
            self.f.close()
            self.f = None


def _adjust_chunk_offset(chunk: ChunkDescriptor) -> None:
    # Move the chunk boundary forward to the next newline so no line is split
    # across two chunks; the previous chunk's end is extended to match.
    if chunk.prev is None:
        return None
    prev = chunk.prev
    assert prev.f is not None
    prev.f.seek(chunk.start - 1)
    c = prev.f.read(1)
    if c != b'\n':
        prev.f.readline()
    chunk.start = prev.end = prev.f.tell()
    prev.f.seek(prev.start)
async def _read_file_chunks_inner(
    executor: cf.ThreadPoolExecutor,
    opener: Callable[[], BinaryIO],
    chunk_size: int,
) -> Sequence[ChunkDescriptor]:
    f = opener()
    try:
        if not f.seekable():
            raise ValueError("stream must be seekable")
        f.seek(0, os.SEEK_END)
        size = f.tell()
        f.seek(0, os.SEEK_SET)
    except Exception:
        f.close()
        raise
    chunks: list[ChunkDescriptor] = []
    if size == 0:
        f.close()
        return chunks
    prev: ChunkDescriptor | None = None
    offset = 0
    next_offset: int | None = None
    while True:
        next_offset = min(offset + chunk_size, size)
        chunk = ChunkDescriptor(
            i=len(chunks),
            f=f,
            start=offset,
            end=next_offset,
            prev=prev,
            next=None,
        )
        if prev is not None:
            prev.next = chunk
        chunks.append(chunk)
        prev = chunk
        if next_offset >= size:
            break
        # Each chunk gets its own file handle so boundary adjustment and the
        # subsequent reads can proceed independently in the thread pool.
        f = opener()
        offset = next_offset
    await asyncio.wait([
        asyncio.wrap_future(executor.submit(_adjust_chunk_offset, chunk))
        for chunk in chunks
    ])
    for chunk in chunks:
        unwrap_optional(chunk.f).seek(chunk.start)
    return chunks
def read_lines_upto(f: BinaryIO, pos: int) -> Iterator[bytes]:
    while f.tell() < pos:
        try:
            l = next(f)
        except StopIteration:
            break
        yield l


async def read_file_chunks(
    executor: cf.ThreadPoolExecutor,
    opener: Callable[[], BinaryIO],
    chunk_size: int = 16777216,
) -> Sequence[asyncio.Future[tuple[int, Sequence[bytes]]]]:
    chunks = await _read_file_chunks_inner(executor, opener, chunk_size)
    logger.debug("chunks of the file list", chunks=len(chunks))
    return [
        asyncio.wrap_future(executor.submit(lambda chunk: (chunk.i, list(read_lines_upto(unwrap_optional(chunk.f), chunk.end))), chunk))
        for chunk in chunks
    ]
class _ScanChunksInner(Generic[T]):
    inner: _ScanChunks[T]
    current_iter: Iterator[T] | None
    should_call_aexit: bool

    async def __anext__(self) -> T:
        if self.inner.task_group is None:
            self.should_call_aexit = True
            await self.inner.__aenter__()
        while True:
            if self.current_iter is None:
                try:
                    self.current_iter = iter(await self.inner.queue.get())
                except QueueShutdown:
                    if self.should_call_aexit:
                        await self.inner.__aexit__(None, None, None)
                    raise StopAsyncIteration
            try:
                return next(self.current_iter)
            except StopIteration:
                pass
            self.current_iter = None

    def __init__(self, inner: _ScanChunks[T]) -> None:
        self.inner = inner
        self.current_iter = None
        self.should_call_aexit = False
class _ScanChunks(Generic[T]):
    executor: cf.ThreadPoolExecutor
    path: pathlib.Path
    callback: Callable[[bytes], Coroutine[Any, Any, Iterable[T]]]
    signal: Signal
    queue: ShutdownableQueue[Iterable[T]]
    task_group: asyncio.TaskGroup | None

    async def _scan_chunk(
        self,
        fu: asyncio.Future[tuple[int, Sequence[bytes]]],
    ) -> None:
        assert self.task_group is not None
        i, lines = await fu
        logger.debug(f"reading file list chunk #{i}", lines=len(lines), first_line=(lines[0] if lines else None))
        try:
            # Results are queued in completion order, not in original line order.
            for result in asyncio.as_completed([
                self.task_group.create_task(self.callback(line))
                for line in lines
            ]):
                await self.queue.put(await result)
        except Exception as e:
            import traceback
            traceback.print_exc()
            logger.error("error occurred in _scan_chunk", exc=e)

    def __aiter__(self) -> _ScanChunksInner:
        return _ScanChunksInner(self)

    async def __aenter__(self) -> _ScanChunks:
        chunks_fu = await read_file_chunks(self.executor, lambda: self.path.open("rb"))
        self.task_group = await asyncio.TaskGroup().__aenter__()
        for fu in chunks_fu:
            self.task_group.create_task(self._scan_chunk(fu))
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        tb: types.TracebackType | None,
    ) -> None:
        assert self.task_group is not None
        await self.signal.signal()
        await self.task_group.__aexit__(exc_type, exc_value, tb)

    def __init__(
        self,
        executor: cf.ThreadPoolExecutor,
        path: pathlib.Path,
        callback: Callable[[bytes], Coroutine[Any, Any, Iterable[T]]],
    ) -> None:
        self.executor = executor
        self.path = path
        self.callback = callback
        self.signal = Signal()
        self.queue = ShutdownableQueue(self.signal, 0)
        self.task_group = None


def scan_chunks(
    executor: cf.ThreadPoolExecutor,
    path: pathlib.Path,
    callback: Callable[[bytes], Coroutine[Any, Any, Iterable[T]]],
) -> _ScanChunks[T]:
    return _ScanChunks(executor, path, callback)
def _parse_line(line: str) -> tuple[datetime.datetime, int, str]:
    # A file-list line is assumed to look like "YYYY-MM-DD HH:MM:SS <size> <key>".
    size, _, key = line[19:].strip().partition(" ")
    return (
        datetime.datetime.strptime(line[:19], "%Y-%m-%d %H:%M:%S"),
        int(size),
        key,
    )
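
# --- Input format note (assumption) -------------------------------------------
# _parse_line slices a 19-character "YYYY-MM-DD HH:MM:SS" timestamp and then
# splits the remainder into size and key, which matches an "aws s3 ls"-style
# listing.  The line below is a fabricated example of that assumed format.
def _demo_parse_line() -> None:
    date, size, key = _parse_line(
        "2025-01-02 03:04:05 1234 AWSLogs/example/elasticloadbalancing/log.gz"
    )
    assert date.year == 2025
    assert size == 1234
    assert key == "AWSLogs/example/elasticloadbalancing/log.gz"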
class AsyncReaderStream(Protocol):
    async def read(self, n: int = -1) -> bytes:
        ...


class BufferedAsyncReaderStream:
    """Wraps an async reader with a byte buffer that supports unread() and line iteration."""

    reader: AsyncReaderStream
    chunk_size: int
    buf: deque[bytes]
    buf_bytes: int
    eof: bool

    def _append_buf(self, b: bytes) -> None:
        self.buf.append(b)
        self.buf_bytes += len(b)

    def _pop_buf(self) -> bytes:
        chunk = self.buf.popleft()
        self.buf_bytes -= len(chunk)
        return chunk

    def unread(self, b: bytes) -> None:
        self.buf.appendleft(b)
        self.buf_bytes += len(b)

    async def read_upto(self, c: bytes) -> bytes | None:
        # Read until (and including) the delimiter c.  Returns any trailing
        # partial data at EOF, or None if nothing was read.
        r = bytearray()
        while True:
            if not self.buf:
                if not await self._feed_one():
                    return bytes(r) if r else None
            chunk = self._pop_buf()
            i = chunk.find(c)
            if i >= 0:
                i += 1
                if i < len(chunk):
                    self.unread(chunk[i:])
                r.extend(chunk[:i])
                break
            else:
                r.extend(chunk)
        return bytes(r)

    async def __anext__(self) -> bytes:
        l = await self.read_upto(b"\n")
        if l is None:
            raise StopAsyncIteration
        return l

    async def _feed_one(self) -> bool:
        if self.eof:
            return False
        chunk = await self.reader.read(self.chunk_size)
        if not chunk:
            self.eof = True
            return False
        self._append_buf(chunk)
        return True

    async def _feed_buf(self, n: int = -1) -> None:
        while n < 0 or self.buf_bytes < n:
            if not await self._feed_one():
                break

    async def read(self, n: int = -1) -> bytes:
        # n < 0 means "read everything until EOF".
        await self._feed_buf(n)
        bn = 0
        chunks: list[bytes] = []
        while self.buf:
            chunk = self._pop_buf()
            bn += len(chunk)
            chunks.append(chunk)
            if n >= 0 and bn >= n:
                break
        b = bytearray(bn)
        o = 0
        for chunk in chunks:
            b[o:o + len(chunk)] = chunk
            o += len(chunk)
        if n >= 0 and bn > n:
            b = b[:n]
            self.unread(chunk[n - bn:])
        return bytes(b)

    def __aiter__(self) -> AsyncIterator[bytes]:
        return self

    def __init__(self, reader: AsyncReaderStream, chunk_size: int) -> None:
        self.reader = reader
        self.chunk_size = chunk_size
        self.buf = deque()
        self.buf_bytes = 0
        self.eof = False
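
# --- Usage sketch (illustrative only) -----------------------------------------
# Drives BufferedAsyncReaderStream with a tiny in-memory reader to show the
# buffered line iteration; _BytesReader is hypothetical and exists only for
# this demo.
class _BytesReader:
    def __init__(self, data: bytes) -> None:
        self._data = data
        self._pos = 0

    async def read(self, n: int = -1) -> bytes:
        if n < 0:
            n = len(self._data) - self._pos
        b = self._data[self._pos:self._pos + n]
        self._pos += len(b)
        return b


async def _demo_buffered_reader() -> None:
    stream = BufferedAsyncReaderStream(_BytesReader(b"alpha\nbeta\ngamma\n"), chunk_size=4)
    async for line in stream:
        print(line)  # b"alpha\n", b"beta\n", b"gamma\n"
# To try it out: asyncio.run(_demo_buffered_reader())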
class AsyncDecompressor(BufferedAsyncReaderStream):
    """Streaming gzip decompressor that also handles concatenated (multi-member) gzip files."""

    reader: BufferedAsyncReaderStream
    dec: zlib._Decompress | None
    header_read: bool

    async def _feed_one(self) -> bool:
        if self.eof:
            return False
        while True:
            if not self.header_read:
                # Parse the fixed 10-byte gzip member header plus optional fields.
                b = await self.reader.read(10)
                if not b:
                    self.eof = True
                    return False
                if len(b) < 10:
                    raise RuntimeError("unexpected EOF")
                (id_, cm, flg, mtime, xfl, os_) = struct.unpack("<HBBLBB", b)
                if flg & 4 != 0:
                    # FEXTRA
                    b = await self.reader.read(2)
                    if len(b) < 2:
                        raise RuntimeError("unexpected EOF")
                    (nb, ) = struct.unpack("<H", b)
                    b = await self.reader.read(nb)
                    if len(b) < nb:
                        raise RuntimeError("unexpected EOF")
                if flg & 8:
                    # FNAME
                    bb = await self.reader.read_upto(b"\0")
                    if bb is None:
                        raise RuntimeError("unexpected EOF")
                if flg & 16:
                    # FCOMMENT
                    bb = await self.reader.read_upto(b"\0")
                    if bb is None:
                        raise RuntimeError("unexpected EOF")
                if flg & 2:
                    # FHCRC
                    b = await self.reader.read(2)
                self.header_read = True
                self.dec = zlib.decompressobj(-zlib.MAX_WBITS)
            assert self.dec is not None
            if self.dec.eof:
                # Current member finished: skip its trailer and, if more data
                # remains, start over on the next member.
                b = self.dec.flush()
                unused_data = self.dec.unused_data
                if len(unused_data) < 8:
                    # postamble (CRC32 + ISIZE)
                    raise RuntimeError("unexpected EOF")
                unused_data = unused_data[8:]
                self.dec = None
                if not unused_data:
                    if b:
                        self._append_buf(b)
                        return True
                    else:
                        return False
                self.reader.unread(unused_data)
                self.header_read = False
                if b:
                    self._append_buf(b)
                    return True
                else:
                    continue
            raw_chunk = await self.reader.read(self.chunk_size)
            if not raw_chunk:
                if self.dec is None:
                    return False
                b = self.dec.flush()
                self.eof = True
                if b:
                    self._append_buf(b)
                    return True
                else:
                    return False
            self._append_buf(self.dec.decompress(raw_chunk))
            return True

    def __init__(self, reader: BufferedAsyncReaderStream, chunk_size: int) -> None:
        super().__init__(reader, chunk_size)
        self.dec = None
        self.header_read = False
ALB_LOG_REGEXP = re.compile(" ".join([
    r'(?P<type>[^ ]*)',
    r'(?P<time>[^ ]*)',
    r'(?P<elb>[^ ]*)',
    r'(?P<client_ip>[^ ]*):(?P<client_port>[0-9]*)',
    r'(?:(?P<target_ip>[^ ]*):(?P<target_port>[0-9]*)|-)',
    r'(?P<request_processing_time>[-.0-9]*)',
    r'(?P<target_processing_time>[-.0-9]*)',
    r'(?P<response_processing_time>[-.0-9]*)',
    r'(?P<elb_status_code>|[-0-9]*)',
    r'(?P<target_status_code>-|[-0-9]*)',
    r'(?P<received_bytes>[-0-9]*)',
    r'(?P<sent_bytes>[-0-9]*)',
    r'"(?P<request_verb>[^ ]*) (?P<request_url>.*) (?P<request_proto>- |[^ ]*)"',
    r'"(?P<user_agent>[^"]*)"',
    r'(?P<ssl_cipher>[A-Z0-9-_]+)',
    r'(?P<ssl_protocol>[A-Za-z0-9.-]*)',
    r'(?P<target_group_arn>[^ ]*)',
    r'"(?P<trace_id>[^"]*)"',
    r'"(?P<domain_name>[^"]*)"',
    r'"(?P<chosen_cert_arn>[^"]*)"',
    r'(?P<matched_rule_priority>[-.0-9]*)',
    r'(?P<request_creation_time>[^ ]*)',
    r'"(?P<actions_executed>[^"]*)"',
    r'"(?P<redirect_url>[^"]*)"',
    r'"(?P<lambda_error_reason>[^"]*)"',
    r'"(?P<target_port_list>[^"]*)"',
    r'"(?P<target_status_code_list>[^"]*)"',
    r'"(?P<classification>[^"]*)"',
    r'"(?P<classification_reason>[^"]*)"',
]) + r'(?: (?P<conn_trace_id>[^ ]*))?')
@dataclasses.dataclass
class ALBLogEntry:
    type: str
    time: str
    elb: str
    client_ip: str
    client_port: int
    target_ip: str | None
    target_port: int | None
    request_processing_time: float
    target_processing_time: float
    response_processing_time: float
    elb_status_code: int
    target_status_code: str
    received_bytes: int
    sent_bytes: int
    request_verb: str
    request_url: str
    request_proto: str
    user_agent: str
    ssl_cipher: str
    ssl_protocol: str
    target_group_arn: str
    trace_id: str
    domain_name: str
    chosen_cert_arn: str
    matched_rule_priority: str
    request_creation_time: str
    actions_executed: str
    redirect_url: str
    lambda_error_reason: str
    target_port_list: Sequence[str]
    target_status_code_list: Sequence[str]
    classification: str
    classification_reason: str
    conn_trace_id: str | None
def parse_alb_log_line(line: str) -> ALBLogEntry:
    m = ALB_LOG_REGEXP.match(line)
    if m is None:
        raise ValueError(f"invalid log entry: {line}")

    def _str(k: str) -> str:
        v = m.group(k)
        if v is None:
            raise ValueError(f"missing value for field {k}")
        return v

    def _int(k: str) -> int:
        return int(_str(k))

    def _maybe_int(k: str) -> int | None:
        v = m.group(k)
        return int(v) if v is not None else None

    def _float(k: str) -> float:
        return float(_str(k))

    return ALBLogEntry(
        type=_str("type"),
        time=_str("time"),
        elb=_str("elb"),
        client_ip=_str("client_ip"),
        client_port=_int("client_port"),
        target_ip=m.group("target_ip"),
        target_port=_maybe_int("target_port"),
        request_processing_time=_float("request_processing_time"),
        target_processing_time=_float("target_processing_time"),
        response_processing_time=_float("response_processing_time"),
        elb_status_code=_int("elb_status_code"),
        target_status_code=_str("target_status_code"),
        received_bytes=_int("received_bytes"),
        sent_bytes=_int("sent_bytes"),
        request_verb=_str("request_verb"),
        request_url=_str("request_url"),
        request_proto=_str("request_proto"),
        user_agent=_str("user_agent"),
        ssl_cipher=_str("ssl_cipher"),
        ssl_protocol=_str("ssl_protocol"),
        target_group_arn=_str("target_group_arn"),
        trace_id=_str("trace_id"),
        domain_name=_str("domain_name"),
        chosen_cert_arn=_str("chosen_cert_arn"),
        matched_rule_priority=_str("matched_rule_priority"),
        request_creation_time=_str("request_creation_time"),
        actions_executed=_str("actions_executed"),
        redirect_url=_str("redirect_url"),
        lambda_error_reason=_str("lambda_error_reason"),
        target_port_list=_str("target_port_list").split(),
        target_status_code_list=_str("target_status_code_list").split(),
        classification=_str("classification"),
        classification_reason=_str("classification_reason"),
        conn_trace_id=m.group("conn_trace_id"),
    )
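
# --- Usage sketch (illustrative only) -----------------------------------------
# Runs parse_alb_log_line on a synthetic line modeled on the ALB access-log
# layout.  Every value (addresses, ARNs, IDs, domain) below is fabricated for
# illustration and is not taken from real logs.
def _demo_parse_alb_log_line() -> ALBLogEntry:
    sample = (
        'https 2025-01-02T03:04:05.123456Z app/my-alb/0123456789abcdef '
        '203.0.113.10:54321 10.0.1.5:8080 0.001 0.005 0.000 200 200 123 456 '
        '"GET https://example.com:443/ HTTP/1.1" "curl/8.0" '
        'ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 '
        'arn:aws:elasticloadbalancing:us-east-1:123456789012:targetgroup/my-tg/0123456789abcdef '
        '"Root=1-00000000-000000000000000000000000" "example.com" "-" 0 '
        '2025-01-02T03:04:05.120000Z "forward" "-" "-" "10.0.1.5:8080" "200" "-" "-" '
        'TID_0123456789abcdef'
    )
    entry = parse_alb_log_line(sample)
    # e.g. entry.elb_status_code == 200, entry.request_url == "https://example.com:443/"
    return entry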
async def _read_alb_log(client: AsyncS3Client, bucket: str, _date: datetime.datetime, size: int, key: str) -> Sequence[ALBLogEntry]:
    logger.info("extracting logs", key=key)
    resp = await client.get_object(
        Bucket=bucket,
        Key=key,
    )
    entries = []
    try:
        async for line in AsyncDecompressor(BufferedAsyncReaderStream(resp["Body"], 1048576), 1048576):
            entry = parse_alb_log_line(line.decode("utf-8"))
            entries.append(entry)
    except Exception:
        import traceback
        traceback.print_exc()
        raise
    logger.info("log extracted", key=key, items=len(entries))
    return entries
async def main(executor: cf.ThreadPoolExecutor) -> None:
    session = aioboto3.Session()
    bucket = "*****"
    signal = Signal()
    queue = ShutdownableQueue[tuple[asyncio.Future[Iterable[ALBLogEntry]], tuple[datetime.datetime, int, str]]](signal, 1000)

    async def _enqueue(line: bytes) -> Iterable[ALBLogEntry]:
        # Hand the parsed file-list line to one of the S3 handler tasks and
        # wait for the fetched log entries.
        fu = asyncio.Future[Iterable[ALBLogEntry]]()
        await queue.put((fu, _parse_line(line.decode("utf-8"))))
        return await fu

    async def _handler(client: AsyncS3Client, bucket: str) -> None:
        try:
            while True:
                try:
                    item = await queue.get()
                except QueueShutdown:
                    break
                (fu, (date, size, key)) = item
                fu.set_result(await _read_alb_log(client, bucket, date, size, key))
        except Exception as e:
            import traceback
            traceback.print_exc()
            logger.error("error occurred in _handler", exc=e)
            raise

    async with session.client("s3") as client:
        async with asyncio.TaskGroup() as g:
            for _ in range(10):
                g.create_task(_handler(client, bucket))
            try:
                async for entry in scan_chunks(
                    executor,
                    pathlib.Path("/tmp/filelist.txt"),
                    _enqueue,
                ):
                    print("\t".join([entry.time, entry.request_url]))
            finally:
                # Release the handler tasks so the TaskGroup can exit.
                await signal.signal()


# Log to stderr so the tab-separated results stay clean on stdout.
structlog.configure(
    logger_factory=structlog.PrintLoggerFactory(sys.stderr)
)
with cf.ThreadPoolExecutor() as executor:
    asyncio.run(main(executor))