Created
August 19, 2025 09:27
-
-
Save klinkby/5a7fd479cb76bfef388047f1359a5c76 to your computer and use it in GitHub Desktop.
Generate zip stream from async pipeline
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using Microsoft.Extensions.Logging; | |
| using Microsoft.Extensions.Logging.Abstractions; | |
| using System.IO.Compression; | |
| using System.IO.Pipelines; | |
| public static class StreamAdapters | |
| { | |
| /// <summary> | |
| /// Creates a readable stream that generates a ZIP archive on the fly from a list of source streams, | |
| /// without buffering the entire archive. | |
| /// </summary> | |
| /// <param name="zipPullEntries">Async enumerable that provides source files for the ZIP archive.</param> | |
| /// <param name="compressionLevel">The compression level to use.</param> | |
| /// <param name="leaveOpen">Whether to leave source streams open after processing.</param> | |
| /// <param name="logger">Log progress and status from the pump.</param> | |
| /// <param name="cancellation">Token to cancel the operation.</param> | |
| /// <returns>A readable Stream that generates ZIP content on-demand.</returns> | |
| /// <exception cref="ArgumentException">Thrown when the filename is invalid or the stream is not readable.</exception> | |
| /// <exception cref="ArgumentNullException">Thrown when sourceFiles or a stream is null.</exception> | |
| /// <exception cref="TaskCanceledException">Thrown when cancellation was signaled.</exception> | |
| public static Stream CreateZipPullStream( | |
| IAsyncEnumerable<ZipPullEntry> zipPullEntries, | |
| CompressionLevel compressionLevel = CompressionLevel.Optimal, | |
| bool leaveOpen = false, | |
| ILogger? logger = null, | |
| CancellationToken cancellation = default) | |
| { | |
| ArgumentNullException.ThrowIfNull(zipPullEntries); | |
| Pipe pipe = new(); | |
| // Start a background task to write to the pipe. | |
| // We don't await this task; it runs in the background. | |
| // The consumer reading from pipe.Reader will pull data from this task. | |
| _ = Task.Run(() => | |
| CreateZipArchiveAsync( | |
| pipe.Writer, | |
| zipPullEntries, | |
| compressionLevel, | |
| leaveOpen, | |
| logger ?? NullLogger.Instance, | |
| cancellation), | |
| cancellation); | |
| // Immediately return the reading end of the pipe as a Stream. | |
| return pipe.Reader.AsStream(); | |
| } | |
| async private static Task CreateZipArchiveAsync( | |
| PipeWriter writer, | |
| IAsyncEnumerable<ZipPullEntry> zipPullEntries, | |
| CompressionLevel compressionLevel, | |
| bool leaveOpen, | |
| ILogger logger, | |
| CancellationToken cancellation) | |
| { | |
| try | |
| { | |
| // Create a ZipArchive that writes to the pipe. | |
| logger.LogInformation("Start zip pull stream"); | |
| using (ZipArchive archive = new(writer.AsStream(), ZipArchiveMode.Create, true)) | |
| { | |
| await foreach (ZipPullEntry sourceFile in zipPullEntries.WithCancellation(cancellation)) | |
| { | |
| logger.LogInformation("Pull {Filename}", sourceFile.FileName); | |
| await sourceFile.CopyToZipAsync(archive, compressionLevel, leaveOpen, cancellation); | |
| } | |
| // Important to dispose of the ZipArchive as it flushes the buffer | |
| } | |
| // Signal to the reader that we are done writing. | |
| logger.LogInformation("Complete zip pull stream"); | |
| await writer.CompleteAsync(); | |
| } | |
| catch (OperationCanceledException ex) | |
| { | |
| logger.LogWarning(ex, "Zip pull stream was cancelled"); | |
| await writer.CompleteAsync(ex); | |
| } | |
| catch (Exception ex) | |
| { | |
| logger.LogWarning(ex, "Zip pull failed"); | |
| // Signal that an error occurred. This will throw an exception | |
| // on the reader's side when they try to read. | |
| await writer.CompleteAsync(ex); | |
| } | |
| } | |
| async private static Task CopyToZipAsync(this ZipPullEntry zipPullEntry, ZipArchive archive, | |
| CompressionLevel compressionLevel, bool leaveOpen, CancellationToken cancellation) | |
| { | |
| Stream dataSource = await zipPullEntry.DataSource; | |
| ArgumentNullException.ThrowIfNull(dataSource); | |
| try | |
| { | |
| cancellation.ThrowIfCancellationRequested(); | |
| if (!dataSource.CanRead) throw new ArgumentException("Data source must be readable", nameof(zipPullEntry)); | |
| ZipArchiveEntry zipEntry = archive.CreateEntry(zipPullEntry.FileName, compressionLevel); | |
| await using Stream entryStream = zipEntry.Open(); | |
| // As we copy, data is pushed into the pipe. | |
| await dataSource.CopyToAsync(entryStream, cancellation); | |
| } | |
| finally | |
| { | |
| if (!leaveOpen) | |
| { | |
| await dataSource.DisposeAsync(); | |
| } | |
| } | |
| } | |
| } | |
| public record struct ZipPullEntry(string FileName, ValueTask<Stream> DataSource) | |
| { | |
| public static ZipPullEntry FromBuffer(string fileName, byte[] dataSourceBuffer) | |
| => FromStream(fileName, new MemoryStream(dataSourceBuffer)); | |
| public static ZipPullEntry FromStream(string fileName, Stream dataSourceStream) | |
| => new(fileName, new ValueTask<Stream>(dataSourceStream)); | |
| public static ZipPullEntry FromTask(string fileName, Task<Stream> dataSourceStreamTask) | |
| => new(fileName, new ValueTask<Stream>(dataSourceStreamTask)); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment