Combine has a `.values` property that exposes a publisher as an `AsyncSequence`. Unfortunately, it has a race-condition bug that can cause values to be dropped.
The following implementation uses `AsyncStream` and `AsyncThrowingStream` as a replacement.
| import Foundation | |
| import Combine | |
| // Declares AnyCancellable as Sendable; @unchecked means the compiler does not verify it. | |
| // The stream properties in this file capture a cancellable inside their onTermination | |
| // handlers; presumably this conformance is what permits that capture — TODO confirm. | |
| extension AnyCancellable: @retroactive @unchecked Sendable {} | |
| // Same unchecked Sendable declaration for CurrentValueSubject. Nothing visible in this | |
| // file relies on it — NOTE(review): confirm which caller needs it. | |
| extension CurrentValueSubject: @retroactive @unchecked Sendable {} | |
public extension Publisher where Failure == Never, Output: Sendable {
    /// Bridges this non-failing publisher into an `AsyncStream`.
    ///
    /// The publisher is subscribed to with `sink` inside the stream's build
    /// closure. Every received value is yielded to the stream, and the stream
    /// is finished when the publisher sends its completion. When the stream
    /// terminates, the subscription is cancelled.
    var stream: AsyncStream<Output> {
        AsyncStream<Output> { continuation in
            let subscription = self.sink(
                receiveCompletion: { _ in continuation.finish() },
                receiveValue: { element in continuation.yield(element) }
            )
            // The termination reason is not inspected; any termination cancels the subscription.
            continuation.onTermination = { _ in subscription.cancel() }
        }
    }
}
public extension Publisher where Failure: Error, Output: Sendable {
    /// Bridges this publisher into an `AsyncThrowingStream`.
    ///
    /// The publisher is subscribed to with `sink` inside the stream's build
    /// closure. Every received value is yielded to the stream. A `.finished`
    /// completion finishes the stream normally, while a `.failure` completion
    /// finishes it by throwing the publisher's error. When the stream
    /// terminates, the subscription is cancelled.
    var stream: AsyncThrowingStream<Output, Error> {
        AsyncThrowingStream<Output, Error> { continuation in
            let subscription = self.sink(
                receiveCompletion: { outcome in
                    guard case .failure(let failure) = outcome else {
                        // Normal completion: end the stream without an error.
                        continuation.finish()
                        return
                    }
                    continuation.finish(throwing: failure)
                },
                receiveValue: { element in continuation.yield(element) }
            )
            // The termination reason is not inspected; any termination cancels the subscription.
            continuation.onTermination = { _ in subscription.cancel() }
        }
    }
}