package main

import (
	"bufio"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)
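
// This program reads a local file list of S3 keys, scans it in parallel
// chunks, and fans the keys out to a pool of workers that download, gunzip,
// and parse ALB access logs, printing one line per parsed entry.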

// ALBLogEntry represents a parsed ALB log entry.
type ALBLogEntry struct {
	Type                   string
	Time                   string
	ELB                    string
	ClientIP               string
	ClientPort             int
	TargetIP               *string
	TargetPort             *int
	RequestProcessingTime  float64
	TargetProcessingTime   float64
	ResponseProcessingTime float64
	ELBStatusCode          int
	TargetStatusCode       string
	ReceivedBytes          int
	SentBytes              int
	RequestVerb            string
	RequestURL             string
	RequestProto           string
	UserAgent              string
	SSLCipher              string
	SSLProtocol            string
	TargetGroupARN         string
	TraceID                string
	DomainName             string
	ChosenCertARN          string
	MatchedRulePriority    string
	RequestCreationTime    string
	ActionsExecuted        string
	RedirectURL            string
	LambdaErrorReason      string
	TargetPortList         []string
	TargetStatusCodeList   []string
	Classification         string
	ClassificationReason   string
	ConnTraceID            string
}
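
// In ALBLogEntry, TargetIP and TargetPort are pointers because ALB records
// "-" when a request never reached a target; nil represents that case here.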

// ChunkDescriptor describes a chunk of the file to process.
type ChunkDescriptor struct {
	i     int
	start int64
	end   int64
}

// S3Object represents metadata about an S3 object from the file list.
type S3Object struct {
	Date time.Time
	Size int
	Key  string
}

// albLogRegexp matches the space-separated fields of an ALB access log entry.
var albLogRegexp = regexp.MustCompile(strings.Join([]string{
	`(?P<type>[^ ]*)`,
	`(?P<time>[^ ]*)`,
	`(?P<elb>[^ ]*)`,
	`(?P<client_ip>[^ ]*):(?P<client_port>[0-9]*)`,
	`(?:(?P<target_ip>[^ ]*):(?P<target_port>[0-9]*)|-)`,
	`(?P<request_processing_time>[-.0-9]*)`,
	`(?P<target_processing_time>[-.0-9]*)`,
	`(?P<response_processing_time>[-.0-9]*)`,
	`(?P<elb_status_code>|[-0-9]*)`,
	`(?P<target_status_code>-|[-0-9]*)`,
	`(?P<received_bytes>[-0-9]*)`,
	`(?P<sent_bytes>[-0-9]*)`,
	`"(?P<request_verb>[^ ]*) (?P<request_url>.*) (?P<request_proto>- |[^ ]*)"`,
	`"(?P<user_agent>[^"]*)"`,
	`(?P<ssl_cipher>[A-Z0-9-_]+)`,
	`(?P<ssl_protocol>[A-Za-z0-9.-]*)`,
	`(?P<target_group_arn>[^ ]*)`,
	`"(?P<trace_id>[^"]*)"`,
	`"(?P<domain_name>[^"]*)"`,
	`"(?P<chosen_cert_arn>[^"]*)"`,
	`(?P<matched_rule_priority>[-.0-9]*)`,
	`(?P<request_creation_time>[^ ]*)`,
	`"(?P<actions_executed>[^"]*)"`,
	`"(?P<redirect_url>[^"]*)"`,
	`"(?P<lambda_error_reason>[^"]*)"`,
	`"(?P<target_port_list>[^"]*)"`,
	`"(?P<target_status_code_list>[^"]*)"`,
	`"(?P<classification>[^"]*)"`,
	`"(?P<classification_reason>[^"]*)"`,
}, " ") + `(?: (?P<conn_trace_id>[^ ]*))?`)
// parseLine parses a line from the file list (timestamp, size, key)
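// Expected input is shaped like `aws s3 ls --recursive` output, e.g.
// (hypothetical key):
//
//	2025-11-12 13:29:00      12345 AWSLogs/123456789012/elasticloadbalancing/ap-northeast-1/2025/11/12/example.log.gz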
func parseLine(line string) (*S3Object, error) {
	if len(line) < 19 {
		return nil, fmt.Errorf("line too short: %s", line)
	}
	dateStr := line[:19]
	rest := strings.TrimSpace(line[19:])
	parts := strings.SplitN(rest, " ", 2)
	if len(parts) < 2 {
		return nil, fmt.Errorf("invalid line format: %s", line)
	}
	date, err := time.Parse("2006-01-02 15:04:05", dateStr)
	if err != nil {
		return nil, fmt.Errorf("invalid date: %w", err)
	}
	size, err := strconv.Atoi(parts[0])
	if err != nil {
		return nil, fmt.Errorf("invalid size: %w", err)
	}
	return &S3Object{
		Date: date,
		Size: size,
		Key:  parts[1],
	}, nil
}

// parseALBLogLine parses a single ALB log line.
func parseALBLogLine(line string) (*ALBLogEntry, error) {
	matches := albLogRegexp.FindStringSubmatch(line)
	if matches == nil {
		return nil, fmt.Errorf("invalid log entry: %s", line)
	}
	names := albLogRegexp.SubexpNames()
	result := make(map[string]string)
	for i, name := range names {
		if i != 0 && name != "" {
			result[name] = matches[i]
		}
	}
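	// Typed accessors over the named captures; ALB writes "-" for absent
	// fields, which these helpers map to zero values or nil.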
	getStr := func(key string) string {
		return result[key]
	}
	getInt := func(key string) (int, error) {
		v := result[key]
		if v == "" || v == "-" {
			return 0, nil
		}
		return strconv.Atoi(v)
	}
	getMaybeInt := func(key string) *int {
		v := result[key]
		if v == "" || v == "-" {
			return nil
		}
		i, err := strconv.Atoi(v)
		if err != nil {
			return nil
		}
		return &i
	}
	getFloat := func(key string) (float64, error) {
		v := result[key]
		if v == "" || v == "-" {
			return 0, nil
		}
		return strconv.ParseFloat(v, 64)
	}
	getMaybeStr := func(key string) *string {
		v := result[key]
		if v == "" || v == "-" {
			return nil
		}
		return &v
	}
	clientPort, err := getInt("client_port")
	if err != nil {
		return nil, err
	}
	reqProcTime, err := getFloat("request_processing_time")
	if err != nil {
		return nil, err
	}
	targetProcTime, err := getFloat("target_processing_time")
	if err != nil {
		return nil, err
	}
	respProcTime, err := getFloat("response_processing_time")
	if err != nil {
		return nil, err
	}
	elbStatusCode, err := getInt("elb_status_code")
	if err != nil {
		return nil, err
	}
	receivedBytes, err := getInt("received_bytes")
	if err != nil {
		return nil, err
	}
	sentBytes, err := getInt("sent_bytes")
	if err != nil {
		return nil, err
	}
	targetPortList := strings.Fields(getStr("target_port_list"))
	targetStatusCodeList := strings.Fields(getStr("target_status_code_list"))
	return &ALBLogEntry{
		Type:                   getStr("type"),
		Time:                   getStr("time"),
		ELB:                    getStr("elb"),
		ClientIP:               getStr("client_ip"),
		ClientPort:             clientPort,
		TargetIP:               getMaybeStr("target_ip"),
		TargetPort:             getMaybeInt("target_port"),
		RequestProcessingTime:  reqProcTime,
		TargetProcessingTime:   targetProcTime,
		ResponseProcessingTime: respProcTime,
		ELBStatusCode:          elbStatusCode,
		TargetStatusCode:       getStr("target_status_code"),
		ReceivedBytes:          receivedBytes,
		SentBytes:              sentBytes,
		RequestVerb:            getStr("request_verb"),
		RequestURL:             getStr("request_url"),
		RequestProto:           getStr("request_proto"),
		UserAgent:              getStr("user_agent"),
		SSLCipher:              getStr("ssl_cipher"),
		SSLProtocol:            getStr("ssl_protocol"),
		TargetGroupARN:         getStr("target_group_arn"),
		TraceID:                getStr("trace_id"),
		DomainName:             getStr("domain_name"),
		ChosenCertARN:          getStr("chosen_cert_arn"),
		MatchedRulePriority:    getStr("matched_rule_priority"),
		RequestCreationTime:    getStr("request_creation_time"),
		ActionsExecuted:        getStr("actions_executed"),
		RedirectURL:            getStr("redirect_url"),
		LambdaErrorReason:      getStr("lambda_error_reason"),
		TargetPortList:         targetPortList,
		TargetStatusCodeList:   targetStatusCodeList,
		Classification:         getStr("classification"),
		ClassificationReason:   getStr("classification_reason"),
		ConnTraceID:            getStr("conn_trace_id"),
	}, nil
}
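
// A minimal usage sketch for parseALBLogLine (hypothetical, not wired into
// main):
//
//	if entry, err := parseALBLogLine(rawLine); err == nil {
//		fmt.Println(entry.Time, entry.ELBStatusCode, entry.RequestURL)
//	}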

// readFileChunks divides a file into chunks for parallel processing.
func readFileChunks(path string, chunkSize int64) ([]ChunkDescriptor, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer file.Close()
	stat, err := file.Stat()
	if err != nil {
		return nil, err
	}
	size := stat.Size()
	if size == 0 {
		return []ChunkDescriptor{}, nil
	}
	chunks := []ChunkDescriptor{}
	offset := int64(0)
	i := 0
	for offset < size {
		end := offset + chunkSize
		if end > size {
			end = size
		}
		chunks = append(chunks, ChunkDescriptor{
			i:     i,
			start: offset,
			end:   end,
		})
		offset = end
		i++
	}
	// Adjust chunk boundaries to align with line breaks, so that no line
	// straddles two chunks.
	for i := 1; i < len(chunks); i++ {
		if _, err := file.Seek(chunks[i].start-1, io.SeekStart); err != nil {
			return nil, err
		}
		reader := bufio.NewReader(file)
		c, err := reader.ReadByte()
		if err != nil {
			return nil, err
		}
		if c != '\n' {
			// The boundary falls mid-line: read to the end of the line and
			// move the boundary there.
			rest, err := reader.ReadBytes('\n')
			if err != nil && err != io.EOF {
				return nil, err
			}
			// file.Seek(0, io.SeekCurrent) would report the position after
			// bufio's read-ahead, not what was consumed, so compute the
			// logical position from the bytes actually read instead.
			newPos := chunks[i].start + int64(len(rest))
			chunks[i-1].end = newPos
			chunks[i].start = newPos
		}
	}
	return chunks, nil
}
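
// After alignment, every chunk starts at the beginning of a line and ends
// just past a newline (or at EOF), so chunks can be scanned independently.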

// processChunk reads a chunk of the file and sends S3 objects to the queue.
func processChunk(path string, chunk ChunkDescriptor, queue chan<- *S3Object) error {
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()
	if _, err := file.Seek(chunk.start, io.SeekStart); err != nil {
		return err
	}
	reader := bufio.NewReader(io.LimitReader(file, chunk.end-chunk.start))
	log.Printf("Processing chunk #%d (start=%d, end=%d)", chunk.i, chunk.start, chunk.end)
	lineCount := 0
	for {
		line, err := reader.ReadString('\n')
		if err != nil && err != io.EOF {
			return err
		}
		if line != "" {
			line = strings.TrimSpace(line)
			if line != "" {
				obj, err := parseLine(line)
				if err != nil {
					log.Printf("Error parsing line: %v", err)
					continue
				}
				queue <- obj
				lineCount++
			}
		}
		if err == io.EOF {
			break
		}
	}
	log.Printf("Chunk #%d processed %d lines", chunk.i, lineCount)
	return nil
}

// downloadAndProcessLog downloads a log file from S3, decompresses it, and
// parses its entries.
func downloadAndProcessLog(ctx context.Context, client *s3.Client, bucket string, obj *S3Object) ([]*ALBLogEntry, error) {
	log.Printf("Downloading log: %s", obj.Key)
	result, err := client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: &bucket,
		Key:    &obj.Key,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to get object: %w", err)
	}
	defer result.Body.Close()
	// Decompress gzip
	gzipReader, err := gzip.NewReader(result.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to create gzip reader: %w", err)
	}
	defer gzipReader.Close()
	entries := []*ALBLogEntry{}
	scanner := bufio.NewScanner(gzipReader)
	// Increase the scanner's buffer size to accommodate long lines.
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)
	for scanner.Scan() {
		line := scanner.Text()
		entry, err := parseALBLogLine(line)
		if err != nil {
			log.Printf("Error parsing ALB log line: %v", err)
			continue
		}
		entries = append(entries, entry)
	}
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("error reading log: %w", err)
	}
	log.Printf("Log extracted: %s, entries: %d", obj.Key, len(entries))
	return entries, nil
}

// worker processes S3 objects from the queue.
func worker(ctx context.Context, id int, client *s3.Client, bucket string, queue <-chan *S3Object, results chan<- *ALBLogEntry, wg *sync.WaitGroup) {
	defer wg.Done()
	for obj := range queue {
		entries, err := downloadAndProcessLog(ctx, client, bucket, obj)
		if err != nil {
			log.Printf("Worker %d: error processing %s: %v", id, obj.Key, err)
			continue
		}
		for _, entry := range entries {
			results <- entry
		}
	}
}
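
// Shutdown ordering in main matters: objectQueue is closed only after every
// chunk goroutine has finished, and resultQueue only after every worker has
// exited, so nothing ever sends on a closed channel.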
func main() {
	ctx := context.Background()
	// Load AWS configuration
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatalf("Failed to load AWS config: %v", err)
	}
	client := s3.NewFromConfig(cfg)
	bucket := "*****"
	filePath := "/tmp/filelist.txt"
	chunkSize := int64(16 * 1024 * 1024) // 16MB chunks
	numWorkers := 10
	// Read file in chunks
	chunks, err := readFileChunks(filePath, chunkSize)
	if err != nil {
		log.Fatalf("Failed to read file chunks: %v", err)
	}
	log.Printf("File divided into %d chunks", len(chunks))
	// Create channels
	objectQueue := make(chan *S3Object, 1000)
	resultQueue := make(chan *ALBLogEntry, 1000)
	// Start workers
	var workerWg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		workerWg.Add(1)
		go worker(ctx, i, client, bucket, objectQueue, resultQueue, &workerWg)
	}
	// Start result printer
	var printerWg sync.WaitGroup
	printerWg.Add(1)
	go func() {
		defer printerWg.Done()
		for entry := range resultQueue {
			fmt.Printf("%s\t%s\n", entry.Time, entry.RequestURL)
		}
	}()
	// Process chunks in parallel
	var chunkWg sync.WaitGroup
	for _, chunk := range chunks {
		chunkWg.Add(1)
		go func(c ChunkDescriptor) {
			defer chunkWg.Done()
			if err := processChunk(filePath, c, objectQueue); err != nil {
				log.Printf("Error processing chunk %d: %v", c.i, err)
			}
		}(chunk)
	}
	// Wait for all chunks to be processed
	chunkWg.Wait()
	close(objectQueue)
	// Wait for all workers to finish
	workerWg.Wait()
	close(resultQueue)
	// Wait for printer to finish
	printerWg.Wait()
	log.Println("Done")
}
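
// One way to produce /tmp/filelist.txt (hypothetical bucket and prefix):
//
//	aws s3 ls --recursive s3://my-alb-logs/AWSLogs/ > /tmp/filelist.txt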