Skip to content

Instantly share code, notes, and snippets.

@klinkby
Created August 19, 2025 09:27
Show Gist options
  • Select an option

  • Save klinkby/5a7fd479cb76bfef388047f1359a5c76 to your computer and use it in GitHub Desktop.

Select an option

Save klinkby/5a7fd479cb76bfef388047f1359a5c76 to your computer and use it in GitHub Desktop.
Generate zip stream from async pipeline
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System.IO.Compression;
using System.IO.Pipelines;
public static class StreamAdapters
{
    /// <summary>
    /// Creates a readable stream that generates a ZIP archive on the fly from a sequence of source
    /// streams, without buffering the entire archive.
    /// </summary>
    /// <remarks>
    /// The archive is produced by a background pump that writes into a <see cref="Pipe"/>; the returned
    /// stream is the reading end of that pipe. Apart from the null check on
    /// <paramref name="zipPullEntries"/>, failures are not thrown by this method itself: when the pump
    /// fails (for example a null or unreadable data source) or <paramref name="cancellation"/> is
    /// signaled, the pump completes the pipe writer with that exception so the error is reported to
    /// whoever reads the returned stream.
    /// </remarks>
    /// <param name="zipPullEntries">Async enumerable that provides source files for the ZIP archive.</param>
    /// <param name="compressionLevel">The compression level to use.</param>
    /// <param name="leaveOpen">Whether to leave source streams open after processing.</param>
    /// <param name="logger">Log progress and status from the pump. Defaults to a no-op logger.</param>
    /// <param name="cancellation">Token to cancel the operation.</param>
    /// <returns>A readable Stream that generates ZIP content on-demand.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="zipPullEntries"/> is null.</exception>
    public static Stream CreateZipPullStream(
        IAsyncEnumerable<ZipPullEntry> zipPullEntries,
        CompressionLevel compressionLevel = CompressionLevel.Optimal,
        bool leaveOpen = false,
        ILogger? logger = null,
        CancellationToken cancellation = default)
    {
        ArgumentNullException.ThrowIfNull(zipPullEntries);

        Pipe pipe = new();

        // Start a background task to write to the pipe.
        // We don't await this task; it runs in the background.
        // The consumer reading from pipe.Reader will pull data from this task.
        //
        // The token is deliberately NOT handed to Task.Run: Task.Run would skip the delegate
        // entirely if the token were already signaled, the writer would then never be completed
        // and the reader would wait forever. The pump observes the token itself and always
        // completes the writer.
        _ = Task.Run(() =>
            CreateZipArchiveAsync(
                pipe.Writer,
                zipPullEntries,
                compressionLevel,
                leaveOpen,
                logger ?? NullLogger.Instance,
                cancellation));

        // Immediately return the reading end of the pipe as a Stream.
        return pipe.Reader.AsStream();
    }

    /// <summary>
    /// Pump that pulls every entry from <paramref name="zipPullEntries"/>, writes it into a
    /// <see cref="ZipArchive"/> layered over <paramref name="writer"/>, and finally completes the writer.
    /// </summary>
    /// <remarks>
    /// Every exit path completes the writer: without an exception on success, otherwise with the
    /// exception that stopped the pump. Exceptions are logged and not rethrown.
    /// </remarks>
    /// <param name="writer">Writing end of the pipe that receives the archive bytes.</param>
    /// <param name="zipPullEntries">Entries to add to the archive, in enumeration order.</param>
    /// <param name="compressionLevel">Compression level applied to every entry.</param>
    /// <param name="leaveOpen">Whether source streams are left open after they have been copied.</param>
    /// <param name="logger">Receives progress and failure messages.</param>
    /// <param name="cancellation">Token that stops the pump.</param>
    private static async Task CreateZipArchiveAsync(
        PipeWriter writer,
        IAsyncEnumerable<ZipPullEntry> zipPullEntries,
        CompressionLevel compressionLevel,
        bool leaveOpen,
        ILogger logger,
        CancellationToken cancellation)
    {
        try
        {
            logger.LogInformation("Start zip pull stream");

            // A token that was signaled before the pump started must still reach the reader,
            // and there is no point in pulling the first entry in that case.
            cancellation.ThrowIfCancellationRequested();

            // Create a ZipArchive that writes to the pipe. leaveOpen is true so that disposing
            // the archive does not dispose the writer's stream; the writer is completed below.
            using (ZipArchive archive = new(writer.AsStream(), ZipArchiveMode.Create, true))
            {
                await foreach (ZipPullEntry sourceFile in zipPullEntries.WithCancellation(cancellation))
                {
                    logger.LogInformation("Pull {Filename}", sourceFile.FileName);
                    await sourceFile.CopyToZipAsync(archive, compressionLevel, leaveOpen, cancellation);
                }
                // Important to dispose of the ZipArchive as it flushes the buffer
            }

            // Signal to the reader that we are done writing.
            logger.LogInformation("Complete zip pull stream");
            await writer.CompleteAsync();
        }
        catch (OperationCanceledException ex)
        {
            logger.LogWarning(ex, "Zip pull stream was cancelled");
            await writer.CompleteAsync(ex);
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "Zip pull failed");
            // Signal that an error occurred. This will throw an exception
            // on the reader's side when they try to read.
            await writer.CompleteAsync(ex);
        }
    }

    /// <summary>
    /// Awaits the entry's data source and copies it into a new entry of <paramref name="archive"/>
    /// named after <see cref="ZipPullEntry.FileName"/>.
    /// </summary>
    /// <param name="zipPullEntry">Entry whose data source is copied.</param>
    /// <param name="archive">Archive that receives the new entry.</param>
    /// <param name="compressionLevel">Compression level for the new entry.</param>
    /// <param name="leaveOpen">When false, the data source is disposed once copying ends or fails.</param>
    /// <param name="cancellation">Token checked before the entry is created and passed to the copy.</param>
    /// <exception cref="ArgumentNullException">The awaited data source is null.</exception>
    /// <exception cref="ArgumentException">The data source is not readable.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellation"/> was signaled.</exception>
    private static async Task CopyToZipAsync(this ZipPullEntry zipPullEntry, ZipArchive archive,
        CompressionLevel compressionLevel, bool leaveOpen, CancellationToken cancellation)
    {
        Stream dataSource = await zipPullEntry.DataSource;
        ArgumentNullException.ThrowIfNull(dataSource);
        try
        {
            cancellation.ThrowIfCancellationRequested();
            if (!dataSource.CanRead)
            {
                throw new ArgumentException("Data source must be readable", nameof(zipPullEntry));
            }

            ZipArchiveEntry zipEntry = archive.CreateEntry(zipPullEntry.FileName, compressionLevel);
            await using Stream entryStream = zipEntry.Open();
            // As we copy, data is pushed into the pipe.
            await dataSource.CopyToAsync(entryStream, cancellation);
        }
        finally
        {
            // The source is released even when validation or copying failed.
            if (!leaveOpen)
            {
                await dataSource.DisposeAsync();
            }
        }
    }
}
/// <summary>
/// One file to be placed in a pulled ZIP archive: the name of the entry and the stream
/// (possibly still pending) that supplies its content.
/// </summary>
/// <param name="FileName">Name used for the entry inside the archive.</param>
/// <param name="DataSource">Stream, or pending stream, that holds the entry's content.</param>
public record struct ZipPullEntry(string FileName, ValueTask<Stream> DataSource)
{
    /// <summary>Creates an entry whose content is read from an in-memory byte array.</summary>
    /// <param name="fileName">Name used for the entry inside the archive.</param>
    /// <param name="dataSourceBuffer">Bytes that make up the entry's content.</param>
    /// <returns>An entry backed by a <see cref="MemoryStream"/> over the buffer.</returns>
    public static ZipPullEntry FromBuffer(string fileName, byte[] dataSourceBuffer)
    {
        MemoryStream bufferStream = new(dataSourceBuffer);
        return FromStream(fileName, bufferStream);
    }

    /// <summary>Creates an entry whose content is read from an already available stream.</summary>
    /// <param name="fileName">Name used for the entry inside the archive.</param>
    /// <param name="dataSourceStream">Stream that supplies the entry's content.</param>
    /// <returns>An entry whose data source is already completed with the stream.</returns>
    public static ZipPullEntry FromStream(string fileName, Stream dataSourceStream)
    {
        ValueTask<Stream> readySource = new(dataSourceStream);
        return new ZipPullEntry(fileName, readySource);
    }

    /// <summary>Creates an entry whose content stream is produced by a task.</summary>
    /// <param name="fileName">Name used for the entry inside the archive.</param>
    /// <param name="dataSourceStreamTask">Task that yields the stream supplying the entry's content.</param>
    /// <returns>An entry whose data source wraps the task.</returns>
    public static ZipPullEntry FromTask(string fileName, Task<Stream> dataSourceStreamTask)
    {
        ValueTask<Stream> pendingSource = new(dataSourceStreamTask);
        return new ZipPullEntry(fileName, pendingSource);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment