Skip to content

Instantly share code, notes, and snippets.

@RobertWHurst
Created September 24, 2025 23:28
Show Gist options
  • Select an option

  • Save RobertWHurst/c918d9d257609c774e41faf915de7cd3 to your computer and use it in GitHub Desktop.

Select an option

Save RobertWHurst/c918d9d257609c774e41faf915de7cd3 to your computer and use it in GitHub Desktop.

File Pipe

A Go package for streaming multiple files through processing pipelines. Designed for scenarios where you need to process many files sequentially without loading them entirely into memory.

Table of Contents

Overview

File Pipe provides a streaming interface for processing multiple files through a single pipeline. It's useful for scenarios like log processing, archive manipulation, and data transformation where you want to avoid loading entire file sets into memory.

The package implements a producer-consumer pattern where files flow through a pipe with their metadata (headers) and content, enabling processing operations while streaming data rather than loading it all at once.

Features

  • Memory Efficient: Stream files without loading complete contents into memory
  • Pipeline Operations: Support for tee, transform, scan, and selective processing
  • Tar Integration: Convert between tar archives and file pipes
  • Concurrent Safe: Safe for single reader/writer pairs
  • Error Propagation: Error handling throughout pipeline stages
  • Flexible Processing: Skip files, transform content, or selectively process based on criteria

Installation

go get github.com/telemetrytv/Go-Shared/file-pipe

Quick Start

File Pipe uses a simple producer-consumer model where one goroutine writes files to the pipe while another reads and processes them.

package main

import (
    "fmt"
    "io"
    "log"

    "github.com/telemetrytv/Go-Shared/file-pipe"
)

func main() {
    // Create a new file pipe
    fp, fpw := filepipe.New()

    // Writer goroutine
    go func() {
        defer fpw.Close()

        // Write first file
        fpw.WriteHeader(&filepipe.FileHeader{
            Name: "hello.txt",
            Size: 13,
        })
        fpw.Write([]byte("Hello, World!"))

        // Write second file
        fpw.WriteHeader(&filepipe.FileHeader{
            Name: "data.txt",
            Size: 8,
        })
        fpw.Write([]byte("Some data"))
    }()

    // Process files
    for {
        header, err := fp.Next()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }

        fmt.Printf("Processing: %s\n", header.Name)
        content, _ := io.ReadAll(fp)
        fmt.Printf("Content: %s\n", string(content))
    }
}

Core Concepts

FilePipe

The main streaming interface that implements io.ReadCloser. FilePipe provides sequential access to files with methods to read headers and content. It supports advanced operations like tee, transform, and scan.

FilePipeWriter

The writing interface that implements io.WriteCloser. Used to add files to the pipe by writing headers followed by content. Each file must start with a header before any content can be written.

FileHeader

A structure containing file metadata including name, size, permissions, and modification time. Headers can be manually constructed or created from os.FileInfo using the NewFileHeader constructor.

ScanReader

A specialized reader returned by scan operations that provides both the matched file's header (via Header() method) and implements io.Reader for content access.

Basic Usage

Creating a Pipe

Create a new file pipe pair with separate reader and writer interfaces. The typical pattern uses goroutines for concurrent operation.

// Create a new file pipe
fp, fpw := filepipe.New()

// Use in separate goroutines for concurrent operation
go func() {
    defer fpw.Close()
    // Write files...
}()

// Read files in main goroutine
for {
    header, err := fp.Next()
    if err == io.EOF {
        break
    }
    // Process files...
}

Writing Files

Add files to the pipe by calling WriteHeader() followed by one or more Write() calls. Files are automatically completed when the next header is written or the writer is closed.

Important: Always close the writer when done writing files. Readers will block indefinitely waiting for more data if the writer is not closed.

// Write a single file
err := fpw.WriteHeader(&filepipe.FileHeader{
    Name: "example.txt",
    Size: 25,
})
if err != nil {
    return err
}
fpw.Write([]byte("This is the file content"))

// Write a file in multiple chunks
fpw.WriteHeader(&filepipe.FileHeader{Name: "large.txt"})
fpw.Write([]byte("First chunk"))
fpw.Write([]byte("Second chunk"))
fpw.Write([]byte("Third chunk"))

// File automatically completes when next header is written
fpw.WriteHeader(&filepipe.FileHeader{Name: "next.txt"})

Reading Files

Process files by calling Next() to get each file's header, then using standard io.Reader methods to read content. Files can be skipped by calling Next() again without reading content.

for {
    header, err := fp.Next()
    if err == io.EOF {
        break
    }
    if err != nil {
        return err
    }

    fmt.Printf("File: %s (%d bytes)\n", header.Name, header.Size)

    // Read entire file
    content, err := io.ReadAll(fp)
    if err != nil {
        return err
    }
    process(content)
}

// Skip files based on criteria
for {
    header, err := fp.Next()
    if err == io.EOF {
        break
    }

    // Skip non-text files
    if !strings.HasSuffix(header.Name, ".txt") {
        continue // Automatically skips to next file
    }

    // Process only text files
    content, _ := io.ReadAll(fp)
    processTextFile(content)
}

Advanced Operations

Tee - Splitting Streams

Split a single file stream into two identical streams that can be processed independently and concurrently. Both streams receive the same files with the same content.

// Split stream into two identical pipes
fpA, fpB := fp.Tee()

// Process stream A (e.g., save to disk)
go func() {
    for {
        header, err := fpA.Next()
        if err == io.EOF {
            break
        }

        file, _ := os.Create("backup/" + header.Name)
        io.Copy(file, fpA)
        file.Close()
    }
}()

// Process stream B (e.g., analyze content)
go func() {
    for {
        header, err := fpB.Next()
        if err == io.EOF {
            break
        }

        content, _ := io.ReadAll(fpB)
        analyzeFile(header.Name, content)
    }
}()

Transform - File Processing

Apply transformation functions to all files in the stream. Transform functions can produce zero, one, or multiple output files for each input, enabling filtering, modification, and expansion operations.

// Transform all files to uppercase
transformed := fp.Transform(func(header *filepipe.FileHeader, reader io.Reader, emit func(*filepipe.FileHeader, io.Reader)) error {
    // Read original content
    content, err := io.ReadAll(reader)
    if err != nil {
        return err
    }

    // Transform content
    upperContent := strings.ToUpper(string(content))

    // Create new header
    newHeader := &filepipe.FileHeader{
        Name:    "upper_" + header.Name,
        Size:    int64(len(upperContent)),
        Mode:    header.Mode,
        ModTime: time.Now(),
    }

    // Emit transformed file
    emit(newHeader, strings.NewReader(upperContent))
    return nil
})

// Filter files (only emit .txt files)
filtered := fp.Transform(func(header *filepipe.FileHeader, reader io.Reader, emit func(*filepipe.FileHeader, io.Reader)) error {
    if strings.HasSuffix(header.Name, ".txt") {
        emit(header, reader) // Pass through unchanged
    }
    // Other files are filtered out (not emitted)
    return nil
})

Scan - Finding Files

Search through the stream to find the first file matching a predicate function. Returns a ScanReader for the matched file or io.EOF if no matches are found.

// Find first log file
logReader, err := fp.Scan(func(header *filepipe.FileHeader) bool {
    return strings.HasSuffix(header.Name, ".log")
})

if err == io.EOF {
    fmt.Println("No log files found")
    return
}
if err != nil {
    return err
}

// Access matched file info
fmt.Printf("Found log: %s\n", logReader.Header().Name)

// Read the log content
logData, err := io.ReadAll(logReader)
if err != nil {
    return err
}

processLogFile(logData)

// Find file by size
largeFile, err := fp.Scan(func(header *filepipe.FileHeader) bool {
    return header.Size > 1024*1024 // Files larger than 1MB
})

ScanAndTransform - Selective Processing

Combine scanning and transformation to selectively process only files matching specific criteria while passing through other files unchanged.

// Process only .txt files, pass everything else through
processed := fp.ScanAndTransform(
    // Scan: which files to transform
    func(header *filepipe.FileHeader) bool {
        return strings.HasSuffix(header.Name, ".txt")
    },
    // Transform: how to process matching files
    func(header *filepipe.FileHeader, reader io.Reader, emit func(*filepipe.FileHeader, io.Reader)) error {
        content, err := io.ReadAll(reader)
        if err != nil {
            return err
        }

        // Add timestamp prefix to text files
        processed := fmt.Sprintf("[%s] %s", time.Now().Format("2006-01-02"), string(content))

        newHeader := &filepipe.FileHeader{
            Name:    header.Name,
            Size:    int64(len(processed)),
            Mode:    header.Mode,
            ModTime: time.Now(),
        }

        emit(newHeader, strings.NewReader(processed))
        return nil
    },
)

// All files (including non-.txt) are available in 'processed'
for {
    header, err := processed.Next()
    if err == io.EOF {
        break
    }
    // .txt files are transformed, others passed through unchanged
    content, _ := io.ReadAll(processed)
    saveFile(header.Name, content)
}

Integrations

Tar Archive Support

Seamlessly convert between tar archives and file pipes using FromTar() and IntoTar() functions. This enables processing tar contents as streams or creating tar archives from file pipe output.

// Read from tar archive
tarFile, err := os.Open("archive.tar")
if err != nil {
    return err
}
defer tarFile.Close()

fp := filepipe.FromTar(tarFile)

// Process files from tar
for {
    header, err := fp.Next()
    if err == io.EOF {
        break
    }
    if err != nil {
        return err
    }

    fmt.Printf("Extracting: %s\n", header.Name)
    content, _ := io.ReadAll(fp)
    processFile(header.Name, content)
}

// Create tar from file pipe
tarReader := fp.IntoTar()

// Write to new tar file
outputFile, _ := os.Create("output.tar")
defer outputFile.Close()

io.Copy(outputFile, tarReader)

// Chain operations: read tar, transform, write new tar
tarFile, _ := os.Open("input.tar")
fp := filepipe.FromTar(tarFile)

transformed := fp.Transform(func(header *filepipe.FileHeader, reader io.Reader, emit func(*filepipe.FileHeader, io.Reader)) error {
    // Transform logic...
    return nil
})

newTarReader := transformed.IntoTar()
outputFile, _ := os.Create("transformed.tar")
io.Copy(outputFile, newTarReader)

Error Handling

Basic Error Handling

Standard Go error patterns apply with io.EOF indicating end of stream. Most operations return errors that should be checked and handled appropriately.

for {
    header, err := fp.Next()
    if err == io.EOF {
        break // Normal end of stream
    }
    if err != nil {
        log.Printf("Error reading header: %v", err)
        return err
    }

    content, err := io.ReadAll(fp)
    if err != nil {
        log.Printf("Error reading content for %s: %v", header.Name, err)
        return err
    }

    if err := processFile(header.Name, content); err != nil {
        log.Printf("Error processing %s: %v", header.Name, err)
        // Continue with next file or return based on requirements
        continue
    }
}

Advanced Error Propagation

Errors can be propagated through pipeline stages using the error propagation system. Reader errors affect writers and vice versa, ensuring pipeline-wide error handling.

CloseWithError Pattern

Both reader and writer support CloseWithError() to close the pipe with a specific error. This error will be propagated to the other side of the pipe and through any connected pipeline operations.

// Writer-side error handling
go func() {
    defer fpw.Close()

    for _, filename := range files {
        file, err := os.Open(filename)
        if err != nil {
            // Propagate error to readers
            fpw.CloseWithError(fmt.Errorf("failed to open %s: %w", filename, err))
            return
        }

        fileInfo, _ := file.Stat()
        header := filepipe.NewFileHeader(fileInfo)

        if err := fpw.WriteHeader(header); err != nil {
            fpw.CloseWithError(fmt.Errorf("failed to write header for %s: %w", filename, err))
            return
        }

        if _, err := io.Copy(fpw, file); err != nil {
            fpw.CloseWithError(fmt.Errorf("failed to copy %s: %w", filename, err))
            return
        }
        file.Close()
    }
}()

// Reader-side error handling
for {
    header, err := fp.Next()
    if err == io.EOF {
        break
    }
    if err != nil {
        // Error was propagated from writer
        log.Printf("Pipeline error: %v", err)
        return err
    }

    if criticalError := validateFile(header); criticalError != nil {
        // Propagate error back to writer
        fp.CloseWithError(fmt.Errorf("validation failed for %s: %w", header.Name, criticalError))
        return criticalError
    }

    // Continue processing...
}

Best Practices

Performance Optimization

Use appropriate buffer sizes when copying large files. Consider the trade-off between memory usage and I/O efficiency based on your specific use case.

Memory Management

Files are streamed rather than loaded entirely into memory. The package handles partial reads automatically, so no manual buffering is required.

Concurrency Patterns

Pipeline operations coordinate automatically between stages. Tee operations create independent streams that can be safely processed concurrently without synchronization.

API Reference

Types

  • FilePipe: Main streaming pipe implementing io.ReadCloser
  • FilePipeWriter: Writer interface implementing io.WriteCloser
  • FileHeader: File metadata structure
  • ScanReader: Scan result with header access and io.Reader interface

Functions

  • New(): Create new pipe pair
  • FromTar(): Create pipe from tar stream
  • NewFileHeader(): Create header from os.FileInfo

Methods

  • Next(): Get next file header
  • WriteHeader(): Start writing new file
  • Tee(): Split into two streams
  • Transform(): Transform all files
  • Scan(): Find first matching file
  • ScanAndTransform(): Transform matching files
  • IntoTar(): Convert pipe to tar stream
  • CloseWithError(): Close with error propagation

Troubleshooting

Deadlocks: Ensure reader and writer are running in separate goroutines. Blocking operations require the other side to be actively processing.

Memory Issues: If memory usage is high, check that files are being fully read or properly skipped. Partial reads can cause data to accumulate.

Error Propagation: Errors may originate from any stage in the pipeline. Check the entire pipeline for error sources, not just the immediate operation.

Tar Compatibility: When working with tar files, ensure proper header initialization, especially for file sizes and types.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment