Skip to content

Instantly share code, notes, and snippets.

@jdanthinne
Created November 17, 2025 15:44
Show Gist options
  • Select an option

  • Save jdanthinne/f45eb390102b03e0244a6519e9b4382c to your computer and use it in GitHub Desktop.

Select an option

Save jdanthinne/f45eb390102b03e0244a6519e9b4382c to your computer and use it in GitHub Desktop.
Extension for AsyncSequence to launch an operation when the stream starts
import Foundation
package extension AsyncSequence where Element: Sendable, Self: Sendable {
    /// Republishes this sequence as an `AsyncThrowingStream` while running `perform`
    /// concurrently with the iteration.
    ///
    /// The iteration of `self` and the `perform` operation are started as sibling child
    /// tasks of one task group, so there is no ordering guarantee between the first call
    /// into the upstream iterator and the start of `perform`. Note that the work is kicked
    /// off as soon as this method is called (the stream's build closure runs eagerly), not
    /// when the returned stream is first iterated.
    ///
    /// Termination:
    /// - The stream finishes normally once the upstream sequence is exhausted **and**
    ///   `perform` has returned.
    /// - If either the upstream sequence or `perform` throws, the sibling task is cancelled
    ///   and the stream finishes with that error.
    /// - When the returned stream terminates (consumer cancelled or stream finished), the
    ///   underlying task is cancelled.
    ///
    /// - Parameter perform: Operation to run alongside the iteration of this sequence.
    /// - Returns: A stream that yields every element of this sequence and fails with the
    ///   first error thrown by the sequence or by `perform`.
    func onSubscribed(perform: @Sendable @escaping () async throws -> Void) -> AsyncThrowingStream<Element, Error> {
        .init { continuation in
            let task = Task {
                do {
                    try await withThrowingTaskGroup(of: Void.self) { group in
                        // Child 1: forward every upstream element into the stream.
                        group.addTask {
                            for try await element in self {
                                // Stop forwarding once the consumer is gone or a sibling failed.
                                guard !Task.isCancelled else { return }
                                continuation.yield(element)
                            }
                        }
                        // Child 2: the caller-supplied operation.
                        group.addTask {
                            try await perform()
                        }
                        // Drain with `next()` instead of `waitForAll()`: `waitForAll()` holds
                        // the first error back until *every* child has completed, so a failing
                        // `perform` would go unnoticed for as long as the upstream keeps
                        // running. `next()` rethrows the first failure immediately, and leaving
                        // the group body by throwing cancels the remaining child.
                        while let _ = try await group.next() {}
                    }
                    continuation.finish()
                } catch {
                    continuation.finish(throwing: error)
                }
            }
            // Tear the worker task down when the stream is finished or its consumer goes away.
            continuation.onTermination = { _ in task.cancel() }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment