package main

import (
	"bufio"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)
// ALBLogEntry represents a parsed ALB log entry
type ALBLogEntry struct {
	Type                   string
	Time                   string
	ELB                    string
	ClientIP               string
	ClientPort             int
	TargetIP               *string
	TargetPort             *int
	RequestProcessingTime  float64
	TargetProcessingTime   float64
	ResponseProcessingTime float64
	ELBStatusCode          int
	TargetStatusCode       string
	ReceivedBytes          int
	SentBytes              int
	RequestVerb            string
	RequestURL             string
	RequestProto           string
	UserAgent              string
	SSLCipher              string
	SSLProtocol            string
	TargetGroupARN         string
	TraceID                string
	DomainName             string
	ChosenCertARN          string
	MatchedRulePriority    string
	RequestCreationTime    string
	ActionsExecuted        string
	RedirectURL            string
	LambdaErrorReason      string
	TargetPortList         []string
	TargetStatusCodeList   []string
	Classification         string
	ClassificationReason   string
	ConnTraceID            string
}
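
// Note: TargetIP and TargetPort are pointers rather than plain values because
// the ALB writes "-" for requests that never reached a target (for example
// fixed-response or redirect actions); nil here means "absent in the log".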
// ChunkDescriptor describes a chunk of the file to process
type ChunkDescriptor struct {
	i     int
	start int64
	end   int64
}

// S3Object represents metadata about an S3 object from the file list
type S3Object struct {
	Date time.Time
	Size int
	Key  string
}
// ALB log regex pattern
var albLogRegexp = regexp.MustCompile(strings.Join([]string{
	`(?P<type>[^ ]*)`,
	`(?P<time>[^ ]*)`,
	`(?P<elb>[^ ]*)`,
	`(?P<client_ip>[^ ]*):(?P<client_port>[0-9]*)`,
	`(?:(?P<target_ip>[^ ]*):(?P<target_port>[0-9]*)|-)`,
	`(?P<request_processing_time>[-.0-9]*)`,
	`(?P<target_processing_time>[-.0-9]*)`,
	`(?P<response_processing_time>[-.0-9]*)`,
	`(?P<elb_status_code>|[-0-9]*)`,
	`(?P<target_status_code>-|[-0-9]*)`,
	`(?P<received_bytes>[-0-9]*)`,
	`(?P<sent_bytes>[-0-9]*)`,
	`"(?P<request_verb>[^ ]*) (?P<request_url>.*) (?P<request_proto>- |[^ ]*)"`,
	`"(?P<user_agent>[^"]*)"`,
	`(?P<ssl_cipher>[A-Z0-9-_]+)`,
	`(?P<ssl_protocol>[A-Za-z0-9.-]*)`,
	`(?P<target_group_arn>[^ ]*)`,
	`"(?P<trace_id>[^"]*)"`,
	`"(?P<domain_name>[^"]*)"`,
	`"(?P<chosen_cert_arn>[^"]*)"`,
	`(?P<matched_rule_priority>[-.0-9]*)`,
	`(?P<request_creation_time>[^ ]*)`,
	`"(?P<actions_executed>[^"]*)"`,
	`"(?P<redirect_url>[^"]*)"`,
	`"(?P<lambda_error_reason>[^"]*)"`,
	`"(?P<target_port_list>[^"]*)"`,
	`"(?P<target_status_code_list>[^"]*)"`,
	`"(?P<classification>[^"]*)"`,
	`"(?P<classification_reason>[^"]*)"`,
}, " ") + `(?: (?P<conn_trace_id>[^ ]*))?`)
// parseLine parses a line from the file list (timestamp, size, key)
func parseLine(line string) (*S3Object, error) {
	if len(line) < 19 {
		return nil, fmt.Errorf("line too short: %s", line)
	}
	dateStr := line[:19]
	rest := strings.TrimSpace(line[19:])
	parts := strings.SplitN(rest, " ", 2)
	if len(parts) < 2 {
		return nil, fmt.Errorf("invalid line format: %s", line)
	}
	date, err := time.Parse("2006-01-02 15:04:05", dateStr)
	if err != nil {
		return nil, fmt.Errorf("invalid date: %w", err)
	}
	size, err := strconv.Atoi(parts[0])
	if err != nil {
		return nil, fmt.Errorf("invalid size: %w", err)
	}
	return &S3Object{
		Date: date,
		Size: size,
		Key:  parts[1],
	}, nil
}
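
// parseLine assumes the file list is shaped like `aws s3 ls --recursive`
// output — a fixed 19-character timestamp, the object size, then the key
// (an assumption inferred from the fixed-offset parsing above). A
// hypothetical example line:
//
//	2025-11-12 04:29:00       1234 AWSLogs/123456789012/elasticloadbalancing/ap-northeast-1/2025/11/12/example.log.gz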
// parseALBLogLine parses a single ALB log line
func parseALBLogLine(line string) (*ALBLogEntry, error) {
	matches := albLogRegexp.FindStringSubmatch(line)
	if matches == nil {
		return nil, fmt.Errorf("invalid log entry: %s", line)
	}
	names := albLogRegexp.SubexpNames()
	result := make(map[string]string)
	for i, name := range names {
		if i != 0 && name != "" {
			result[name] = matches[i]
		}
	}
	getStr := func(key string) string {
		return result[key]
	}
	getInt := func(key string) (int, error) {
		v := result[key]
		if v == "" || v == "-" {
			return 0, nil
		}
		return strconv.Atoi(v)
	}
	getMaybeInt := func(key string) *int {
		v := result[key]
		if v == "" || v == "-" {
			return nil
		}
		i, err := strconv.Atoi(v)
		if err != nil {
			return nil
		}
		return &i
	}
	getFloat := func(key string) (float64, error) {
		v := result[key]
		if v == "" || v == "-" {
			return 0, nil
		}
		return strconv.ParseFloat(v, 64)
	}
	getMaybeStr := func(key string) *string {
		v := result[key]
		if v == "" || v == "-" {
			return nil
		}
		return &v
	}
	clientPort, err := getInt("client_port")
	if err != nil {
		return nil, err
	}
	reqProcTime, err := getFloat("request_processing_time")
	if err != nil {
		return nil, err
	}
	targetProcTime, err := getFloat("target_processing_time")
	if err != nil {
		return nil, err
	}
	respProcTime, err := getFloat("response_processing_time")
	if err != nil {
		return nil, err
	}
	elbStatusCode, err := getInt("elb_status_code")
	if err != nil {
		return nil, err
	}
	receivedBytes, err := getInt("received_bytes")
	if err != nil {
		return nil, err
	}
	sentBytes, err := getInt("sent_bytes")
	if err != nil {
		return nil, err
	}
	targetPortList := strings.Fields(getStr("target_port_list"))
	targetStatusCodeList := strings.Fields(getStr("target_status_code_list"))
	return &ALBLogEntry{
		Type:                   getStr("type"),
		Time:                   getStr("time"),
		ELB:                    getStr("elb"),
		ClientIP:               getStr("client_ip"),
		ClientPort:             clientPort,
		TargetIP:               getMaybeStr("target_ip"),
		TargetPort:             getMaybeInt("target_port"),
		RequestProcessingTime:  reqProcTime,
		TargetProcessingTime:   targetProcTime,
		ResponseProcessingTime: respProcTime,
		ELBStatusCode:          elbStatusCode,
		TargetStatusCode:       getStr("target_status_code"),
		ReceivedBytes:          receivedBytes,
		SentBytes:              sentBytes,
		RequestVerb:            getStr("request_verb"),
		RequestURL:             getStr("request_url"),
		RequestProto:           getStr("request_proto"),
		UserAgent:              getStr("user_agent"),
		SSLCipher:              getStr("ssl_cipher"),
		SSLProtocol:            getStr("ssl_protocol"),
		TargetGroupARN:         getStr("target_group_arn"),
		TraceID:                getStr("trace_id"),
		DomainName:             getStr("domain_name"),
		ChosenCertARN:          getStr("chosen_cert_arn"),
		MatchedRulePriority:    getStr("matched_rule_priority"),
		RequestCreationTime:    getStr("request_creation_time"),
		ActionsExecuted:        getStr("actions_executed"),
		RedirectURL:            getStr("redirect_url"),
		LambdaErrorReason:      getStr("lambda_error_reason"),
		TargetPortList:         targetPortList,
		TargetStatusCodeList:   targetStatusCodeList,
		Classification:         getStr("classification"),
		ClassificationReason:   getStr("classification_reason"),
		ConnTraceID:            getStr("conn_trace_id"),
	}, nil
}
// readFileChunks divides a file into chunks for parallel processing
func readFileChunks(path string, chunkSize int64) ([]ChunkDescriptor, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer file.Close()
	stat, err := file.Stat()
	if err != nil {
		return nil, err
	}
	size := stat.Size()
	if size == 0 {
		return []ChunkDescriptor{}, nil
	}
	chunks := []ChunkDescriptor{}
	offset := int64(0)
	i := 0
	for offset < size {
		end := offset + chunkSize
		if end > size {
			end = size
		}
		chunks = append(chunks, ChunkDescriptor{
			i:     i,
			start: offset,
			end:   end,
		})
		offset = end
		i++
	}
	// Adjust chunk boundaries to align with line breaks
	for i := 1; i < len(chunks); i++ {
		if _, err := file.Seek(chunks[i].start-1, io.SeekStart); err != nil {
			return nil, err
		}
		reader := bufio.NewReader(file)
		c, err := reader.ReadByte()
		if err != nil {
			return nil, err
		}
		if c != '\n' {
			// The boundary falls mid-line: read to the end of that line
			rest, err := reader.ReadBytes('\n')
			if err != nil && err != io.EOF {
				return nil, err
			}
			// Compute the new boundary from the bytes consumed. Asking the
			// *os.File for its offset (file.Seek(0, io.SeekCurrent)) would be
			// wrong here, because the bufio.Reader reads ahead of what has
			// actually been consumed.
			newPos := chunks[i].start + int64(len(rest))
			chunks[i-1].end = newPos
			chunks[i].start = newPos
		}
	}
	return chunks, nil
}
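
// For example, a 40 MiB file with 16 MiB chunks starts out as
// [0, 16M), [16M, 32M), [32M, 40M); the adjustment pass then moves each
// interior boundary forward to the next '\n', so every chunk contains only
// whole lines and no line is ever split across two chunks.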
// processChunk reads a chunk of the file and sends S3 objects to the queue
func processChunk(path string, chunk ChunkDescriptor, queue chan<- *S3Object) error {
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()
	if _, err := file.Seek(chunk.start, io.SeekStart); err != nil {
		return err
	}
	reader := bufio.NewReader(io.LimitReader(file, chunk.end-chunk.start))
	log.Printf("Processing chunk #%d (start=%d, end=%d)", chunk.i, chunk.start, chunk.end)
	lineCount := 0
	for {
		line, err := reader.ReadString('\n')
		if err != nil && err != io.EOF {
			return err
		}
		if line != "" {
			line = strings.TrimSpace(line)
			if line != "" {
				obj, parseErr := parseLine(line)
				if parseErr != nil {
					log.Printf("Error parsing line: %v", parseErr)
					continue
				}
				queue <- obj
				lineCount++
			}
		}
		if err == io.EOF {
			break
		}
	}
	log.Printf("Chunk #%d processed %d lines", chunk.i, lineCount)
	return nil
}
// downloadAndProcessLog downloads a log file from S3, decompresses it, and parses entries
func downloadAndProcessLog(ctx context.Context, client *s3.Client, bucket string, obj *S3Object) ([]*ALBLogEntry, error) {
	log.Printf("Downloading log: %s", obj.Key)
	result, err := client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: &bucket,
		Key:    &obj.Key,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to get object: %w", err)
	}
	defer result.Body.Close()
	// Decompress gzip
	gzipReader, err := gzip.NewReader(result.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to create gzip reader: %w", err)
	}
	defer gzipReader.Close()
	entries := []*ALBLogEntry{}
	scanner := bufio.NewScanner(gzipReader)
	// Increase buffer size for long lines
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)
	for scanner.Scan() {
		line := scanner.Text()
		entry, err := parseALBLogLine(line)
		if err != nil {
			log.Printf("Error parsing ALB log line: %v", err)
			continue
		}
		entries = append(entries, entry)
	}
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("error reading log: %w", err)
	}
	log.Printf("Log extracted: %s, entries: %d", obj.Key, len(entries))
	return entries, nil
}
// worker processes S3 objects from the queue
func worker(ctx context.Context, id int, client *s3.Client, bucket string, queue <-chan *S3Object, results chan<- *ALBLogEntry, wg *sync.WaitGroup) {
	defer wg.Done()
	for obj := range queue {
		entries, err := downloadAndProcessLog(ctx, client, bucket, obj)
		if err != nil {
			log.Printf("Worker %d: error processing %s: %v", id, obj.Key, err)
			continue
		}
		for _, entry := range entries {
			results <- entry
		}
	}
}
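
// The pipeline wired up in main is:
//
//	chunk readers (one goroutine per chunk)
//	  -> objectQueue -> workers (download, gunzip, parse)
//	  -> resultQueue -> a single printer goroutine
//
// Each channel is closed only after its upstream WaitGroup has drained, which
// is what makes the shutdown sequence at the end of main race-free.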
func main() {
	ctx := context.Background()
	// Load AWS configuration
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatalf("Failed to load AWS config: %v", err)
	}
	client := s3.NewFromConfig(cfg)
	bucket := "*****"
	filePath := "/tmp/filelist.txt"
	chunkSize := int64(16 * 1024 * 1024) // 16MB chunks
	numWorkers := 10
	// Read file in chunks
	chunks, err := readFileChunks(filePath, chunkSize)
	if err != nil {
		log.Fatalf("Failed to read file chunks: %v", err)
	}
	log.Printf("File divided into %d chunks", len(chunks))
	// Create channels
	objectQueue := make(chan *S3Object, 1000)
	resultQueue := make(chan *ALBLogEntry, 1000)
	// Start workers
	var workerWg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		workerWg.Add(1)
		go worker(ctx, i, client, bucket, objectQueue, resultQueue, &workerWg)
	}
	// Start result printer
	var printerWg sync.WaitGroup
	printerWg.Add(1)
	go func() {
		defer printerWg.Done()
		for entry := range resultQueue {
			fmt.Printf("%s\t%s\n", entry.Time, entry.RequestURL)
		}
	}()
	// Process chunks in parallel
	var chunkWg sync.WaitGroup
	for _, chunk := range chunks {
		chunkWg.Add(1)
		go func(c ChunkDescriptor) {
			defer chunkWg.Done()
			if err := processChunk(filePath, c, objectQueue); err != nil {
				log.Printf("Error processing chunk %d: %v", c.i, err)
			}
		}(chunk)
	}
	// Wait for all chunks to be processed
	chunkWg.Wait()
	close(objectQueue)
	// Wait for all workers to finish
	workerWg.Wait()
	close(resultQueue)
	// Wait for printer to finish
	printerWg.Wait()
	log.Println("Done")
}
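
// A plausible way to drive this (assumed workflow: the bucket literal above
// is redacted in the gist, and the file name main.go is hypothetical):
//
//	aws s3 ls --recursive s3://<alb-log-bucket>/AWSLogs/ > /tmp/filelist.txt
//	go run main.go > entries.tsv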