A Go package for streaming multiple files through processing pipelines. Designed for scenarios where you need to process many files sequentially without loading them entirely into memory.
- Overview
- Features
- Installation
- Quick Start
- Core Concepts
- Basic Usage
- Advanced Operations
- Integrations
- Error Handling
- Best Practices
- API Reference
- Troubleshooting
File Pipe provides a streaming interface for processing multiple files through a single pipeline. It's useful for scenarios like log processing, archive manipulation, and data transformation where you want to avoid loading entire file sets into memory.
The package implements a producer-consumer pattern where files flow through a pipe with their metadata (headers) and content, enabling processing operations while streaming data rather than loading it all at once.
- Memory Efficient: Stream files without loading complete contents into memory
- Pipeline Operations: Support for tee, transform, scan, and selective processing
- Tar Integration: Convert between tar archives and file pipes
- Concurrency Safe: Safe for use by a single reader/writer pair
- Error Propagation: Errors propagate through all pipeline stages
- Flexible Processing: Skip files, transform content, or selectively process based on criteria
go get github.com/telemetrytv/Go-Shared/file-pipe

File Pipe uses a simple producer-consumer model where one goroutine writes files to the pipe while another reads and processes them.
package main
import (
"fmt"
"io"
"log"
"github.com/telemetrytv/Go-Shared/file-pipe"
)
func main() {
// Create a new file pipe
fp, fpw := filepipe.New()
// Writer goroutine
go func() {
defer fpw.Close()
// Write first file
fpw.WriteHeader(&filepipe.FileHeader{
Name: "hello.txt",
Size: 13,
})
fpw.Write([]byte("Hello, World!"))
// Write second file
fpw.WriteHeader(&filepipe.FileHeader{
Name: "data.txt",
Size: 9,
})
fpw.Write([]byte("Some data"))
}()
// Process files
for {
header, err := fp.Next()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Printf("Processing: %s\n", header.Name)
content, _ := io.ReadAll(fp)
fmt.Printf("Content: %s\n", string(content))
}
}

The main streaming interface that implements io.ReadCloser. FilePipe provides sequential access to files with methods to read headers and content. It supports advanced operations like tee, transform, and scan.
The writing interface that implements io.WriteCloser. Used to add files to the pipe by writing headers followed by content. Each file must start with a header before any content can be written.
A structure containing file metadata including name, size, permissions, and modification time. Headers can be manually constructed or created from os.FileInfo using the NewFileHeader constructor.
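For illustration only, here is a minimal sketch of both approaches; it assumes an os.Stat call on a hypothetical report.csv and uses only the fields (Name, Size, Mode, ModTime) that appear in the examples below.

info, err := os.Stat("report.csv") // placeholder path
if err != nil {
    return err
}
// Derived from os.FileInfo
fromInfo := filepipe.NewFileHeader(info)
// Built by hand with the same metadata
manual := &filepipe.FileHeader{
    Name:    info.Name(),
    Size:    info.Size(),
    Mode:    info.Mode(),
    ModTime: info.ModTime(),
}
// Either header can then be passed to a writer's WriteHeader.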
A specialized reader returned by scan operations that provides both the matched file's header (via Header() method) and implements io.Reader for content access.
Create a new file pipe pair with separate reader and writer interfaces. The typical pattern uses goroutines for concurrent operation.
// Create a new file pipe
fp, fpw := filepipe.New()
// Use in separate goroutines for concurrent operation
go func() {
defer fpw.Close()
// Write files...
}()
// Read files in main goroutine
for {
header, err := fp.Next()
if err == io.EOF {
break
}
// Process files...
}

Add files to the pipe by calling WriteHeader() followed by one or more Write() calls. Files are automatically completed when the next header is written or the writer is closed.
Important: Always close the writer when done writing files. Readers will block indefinitely waiting for more data if the writer is not closed.
// Write a single file
err := fpw.WriteHeader(&filepipe.FileHeader{
Name: "example.txt",
Size: 24,
})
if err != nil {
return err
}
fpw.Write([]byte("This is the file content"))
// Write a file in multiple chunks
fpw.WriteHeader(&filepipe.FileHeader{Name: "large.txt"})
fpw.Write([]byte("First chunk"))
fpw.Write([]byte("Second chunk"))
fpw.Write([]byte("Third chunk"))
// File automatically completes when next header is written
fpw.WriteHeader(&filepipe.FileHeader{Name: "next.txt"})Process files by calling Next() to get each file's header, then using standard io.Reader methods to read content. Files can be skipped by calling Next() again without reading content.
for {
header, err := fp.Next()
if err == io.EOF {
break
}
if err != nil {
return err
}
fmt.Printf("File: %s (%d bytes)\n", header.Name, header.Size)
// Read entire file
content, err := io.ReadAll(fp)
if err != nil {
return err
}
process(content)
}
// Skip files based on criteria
for {
header, err := fp.Next()
if err == io.EOF {
break
}
// Skip non-text files
if !strings.HasSuffix(header.Name, ".txt") {
continue // Automatically skips to next file
}
// Process only text files
content, _ := io.ReadAll(fp)
processTextFile(content)
}

Split a single file stream into two identical streams that can be processed independently and concurrently. Both streams receive the same files with the same content.
// Split stream into two identical pipes
fpA, fpB := fp.Tee()
// Process stream A (e.g., save to disk)
go func() {
for {
header, err := fpA.Next()
if err == io.EOF {
break
}
file, _ := os.Create("backup/" + header.Name)
io.Copy(file, fpA)
file.Close()
}
}()
// Process stream B (e.g., analyze content)
go func() {
for {
header, err := fpB.Next()
if err == io.EOF {
break
}
content, _ := io.ReadAll(fpB)
analyzeFile(header.Name, content)
}
}()

Apply transformation functions to all files in the stream. Transform functions can produce zero, one, or multiple output files for each input, enabling filtering, modification, and expansion operations.
// Transform all files to uppercase
transformed := fp.Transform(func(header *filepipe.FileHeader, reader io.Reader, emit func(*filepipe.FileHeader, io.Reader)) error {
// Read original content
content, err := io.ReadAll(reader)
if err != nil {
return err
}
// Transform content
upperContent := strings.ToUpper(string(content))
// Create new header
newHeader := &filepipe.FileHeader{
Name: "upper_" + header.Name,
Size: int64(len(upperContent)),
Mode: header.Mode,
ModTime: time.Now(),
}
// Emit transformed file
emit(newHeader, strings.NewReader(upperContent))
return nil
})
// Filter files (only emit .txt files)
filtered := fp.Transform(func(header *filepipe.FileHeader, reader io.Reader, emit func(*filepipe.FileHeader, io.Reader)) error {
if strings.HasSuffix(header.Name, ".txt") {
emit(header, reader) // Pass through unchanged
}
// Other files are filtered out (not emitted)
return nil
})

Search through the stream to find the first file matching a predicate function. Returns a ScanReader for the matched file or io.EOF if no matches are found.
// Find first log file
logReader, err := fp.Scan(func(header *filepipe.FileHeader) bool {
return strings.HasSuffix(header.Name, ".log")
})
if err == io.EOF {
fmt.Println("No log files found")
return
}
if err != nil {
return err
}
// Access matched file info
fmt.Printf("Found log: %s\n", logReader.Header().Name)
// Read the log content
logData, err := io.ReadAll(logReader)
if err != nil {
return err
}
processLogFile(logData)
// Find file by size
largeFile, err := fp.Scan(func(header *filepipe.FileHeader) bool {
return header.Size > 1024*1024 // Files larger than 1MB
})

Combine scanning and transformation to selectively process only files matching specific criteria while passing through other files unchanged.
// Process only .txt files, pass everything else through
processed := fp.ScanAndTransform(
// Scan: which files to transform
func(header *filepipe.FileHeader) bool {
return strings.HasSuffix(header.Name, ".txt")
},
// Transform: how to process matching files
func(header *filepipe.FileHeader, reader io.Reader, emit func(*filepipe.FileHeader, io.Reader)) error {
content, err := io.ReadAll(reader)
if err != nil {
return err
}
// Add timestamp prefix to text files
stamped := fmt.Sprintf("[%s] %s", time.Now().Format("2006-01-02"), string(content))
newHeader := &filepipe.FileHeader{
Name: header.Name,
Size: int64(len(stamped)),
Mode: header.Mode,
ModTime: time.Now(),
}
emit(newHeader, strings.NewReader(stamped))
return nil
},
)
// All files (including non-.txt) are available in 'processed'
for {
header, err := processed.Next()
if err == io.EOF {
break
}
// .txt files are transformed, others passed through unchanged
content, _ := io.ReadAll(processed)
saveFile(header.Name, content)
}

Seamlessly convert between tar archives and file pipes using FromTar() and IntoTar() functions. This enables processing tar contents as streams or creating tar archives from file pipe output.
// Read from tar archive
tarFile, err := os.Open("archive.tar")
if err != nil {
return err
}
defer tarFile.Close()
fp := filepipe.FromTar(tarFile)
// Process files from tar
for {
header, err := fp.Next()
if err == io.EOF {
break
}
if err != nil {
return err
}
fmt.Printf("Extracting: %s\n", header.Name)
content, _ := io.ReadAll(fp)
processFile(header.Name, content)
}
// Create tar from file pipe
tarReader := fp.IntoTar()
// Write to new tar file
outputFile, _ := os.Create("output.tar")
defer outputFile.Close()
io.Copy(outputFile, tarReader)
// Chain operations: read tar, transform, write new tar
tarFile, _ := os.Open("input.tar")
fp := filepipe.FromTar(tarFile)
transformed := fp.Transform(func(header *filepipe.FileHeader, reader io.Reader, emit func(*filepipe.FileHeader, io.Reader)) error {
// Transform logic...
return nil
})
newTarReader := transformed.IntoTar()
outputFile, _ := os.Create("transformed.tar")
io.Copy(outputFile, newTarReader)

Standard Go error patterns apply, with io.EOF indicating end of stream. Most operations return errors that should be checked and handled appropriately.
for {
header, err := fp.Next()
if err == io.EOF {
break // Normal end of stream
}
if err != nil {
log.Printf("Error reading header: %v", err)
return err
}
content, err := io.ReadAll(fp)
if err != nil {
log.Printf("Error reading content for %s: %v", header.Name, err)
return err
}
if err := processFile(header.Name, content); err != nil {
log.Printf("Error processing %s: %v", header.Name, err)
// Continue with next file or return based on requirements
continue
}
}

Errors propagate between pipeline stages: reader errors affect writers and vice versa, enabling pipeline-wide error handling.
Both reader and writer support CloseWithError() to close the pipe with a specific error. This error will be propagated to the other side of the pipe and through any connected pipeline operations.
// Writer-side error handling
go func() {
defer fpw.Close()
for _, filename := range files {
file, err := os.Open(filename)
if err != nil {
// Propagate error to readers
fpw.CloseWithError(fmt.Errorf("failed to open %s: %w", filename, err))
return
}
fileInfo, _ := file.Stat()
header := filepipe.NewFileHeader(fileInfo)
if err := fpw.WriteHeader(header); err != nil {
fpw.CloseWithError(fmt.Errorf("failed to write header for %s: %w", filename, err))
return
}
if _, err := io.Copy(fpw, file); err != nil {
fpw.CloseWithError(fmt.Errorf("failed to copy %s: %w", filename, err))
return
}
file.Close()
}
}()
// Reader-side error handling
for {
header, err := fp.Next()
if err == io.EOF {
break
}
if err != nil {
// Error was propagated from writer
log.Printf("Pipeline error: %v", err)
return err
}
if criticalError := validateFile(header); criticalError != nil {
// Propagate error back to writer
fp.CloseWithError(fmt.Errorf("validation failed for %s: %w", header.Name, criticalError))
return criticalError
}
// Continue processing...
}

Use appropriate buffer sizes when copying large files. Consider the trade-off between memory usage and I/O efficiency based on your specific use case.
Files are streamed rather than loaded entirely into memory. The package handles partial reads automatically, so no manual buffering is required.
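As a hedged sketch of both points, the loop below streams each file to disk through a single reusable buffer via io.CopyBuffer; the 64 KiB buffer size and the "out" directory are illustrative choices, not package requirements (filepath.Join comes from the standard path/filepath package).

buf := make([]byte, 64*1024) // reused across files; tune for your workload
for {
    header, err := fp.Next()
    if err == io.EOF {
        break
    }
    if err != nil {
        return err
    }
    out, err := os.Create(filepath.Join("out", header.Name))
    if err != nil {
        return err
    }
    if _, err := io.CopyBuffer(out, fp, buf); err != nil {
        out.Close()
        return err
    }
    out.Close()
}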
Pipeline operations coordinate automatically between stages. Tee operations create independent streams that can be safely processed concurrently without synchronization.
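The two branches of a Tee never need to coordinate with each other, but the surrounding program usually still has to wait for both goroutines to finish. As a sketch, a sync.WaitGroup handles that; saveFile and analyzeFile are the placeholder helpers used in earlier examples.

fpA, fpB := fp.Tee()
var wg sync.WaitGroup
wg.Add(2)
go func() {
    defer wg.Done()
    for {
        header, err := fpA.Next()
        if err != nil {
            if err != io.EOF {
                log.Printf("branch A: %v", err)
            }
            return
        }
        content, _ := io.ReadAll(fpA)
        saveFile(header.Name, content)
    }
}()
go func() {
    defer wg.Done()
    for {
        header, err := fpB.Next()
        if err != nil {
            if err != io.EOF {
                log.Printf("branch B: %v", err)
            }
            return
        }
        content, _ := io.ReadAll(fpB)
        analyzeFile(header.Name, content)
    }
}()
wg.Wait() // both branches have drained their copy of the stream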
- FilePipe: Main streaming pipe implementing io.ReadCloser
- FilePipeWriter: Writer interface implementing io.WriteCloser
- FileHeader: File metadata structure
- ScanReader: Scan result with header access and io.Reader interface
- New(): Create new pipe pair
- FromTar(): Create pipe from tar stream
- NewFileHeader(): Create header from os.FileInfo
- Next(): Get next file header
- WriteHeader(): Start writing new file
- Tee(): Split into two streams
- Transform(): Transform all files
- Scan(): Find first matching file
- ScanAndTransform(): Transform matching files
- IntoTar(): Convert pipe to tar stream
- CloseWithError(): Close with error propagation
Deadlocks: Ensure reader and writer are running in separate goroutines. Blocking operations require the other side to be actively processing.
Memory Issues: If memory usage is high, check that files are being fully read or properly skipped. Partial reads can cause data to accumulate.
Error Propagation: Errors may originate from any stage in the pipeline. Check the entire pipeline for error sources, not just the immediate operation.
Tar Compatibility: When working with tar files, ensure proper header initialization, especially for file sizes and types.
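As a hedged sketch of what that initialization can look like when the pipe will later be converted with IntoTar(), the snippet below reads the content first so the header's Size matches the bytes actually written (a standard tar writer rejects mismatched sizes); the file name and mode are illustrative.

data, err := os.ReadFile("config.json") // placeholder path; small enough to read up front
if err != nil {
    return err
}
header := &filepipe.FileHeader{
    Name:    "config.json",
    Size:    int64(len(data)), // must match the bytes written below
    Mode:    0o644,
    ModTime: time.Now(),
}
if err := fpw.WriteHeader(header); err != nil {
    return err
}
if _, err := fpw.Write(data); err != nil {
    return err
}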