Last active
September 22, 2025 10:11
-
-
Save NikolaiRuhe/de31b999b8c8c58c65e4c2e8724dda74 to your computer and use it in GitHub Desktop.
A property wrapper that creates an AsyncStream yielding values when the property is set.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
/// A property wrapper that exposes a stored property as an `AsyncStream`.
///
/// The behaviour resembles Combine's `CurrentValueSubject`: any number of
/// clients may observe the property, and every read of the projected value
/// builds a separate stream that starts with the value stored at that moment.
///
/// The wrapped value itself inherits isolation from the surrounding type.
/// The projected stream is declared `nonisolated`, so it can be requested
/// from any execution context.
///
/// Usage:
/// ```
/// struct Bike: Sendable { ... }
///
/// @MainActor class MyModel {
///     @Streaming
///     var bike: Bike
///
///     func someInternalProcessing() {
///         bike.speed = 42 // This will trigger an emission on $bike.
///     }
///
///     nonisolated func waitForBikeToHalt() async {
///         // self.$bike creates a new stream of values from self.bike
///         for await bike in $bike where bike.speed == 0 { break }
///     }
/// }
/// ```
@propertyWrapper
public struct Streaming<Wrapped: Sendable>: Sendable {

    // MARK: - Types

    public typealias Stream = AsyncStream<Wrapped>
    public typealias Continuation = Stream.Continuation
    public typealias BufferingPolicy = Continuation.BufferingPolicy
    fileprivate typealias ProtectedState = OSAllocatedUnfairLock<State>

    /// Bookkeeping for the streams that are currently registered. Only
    /// touched while holding the lock that wraps it.
    fileprivate struct State {
        /// Key handed to the next continuation that registers.
        var nextID: Int = 0
        /// Registered continuations, keyed by the ID assigned at registration.
        var continuations: [Int: Continuation] = [:]
    }

    /// Reference-typed companion of the wrapper. When an instance is
    /// deallocated, every continuation still registered in the shared state
    /// is finished.
    private final class Cleanup: Sendable {
        private let protectedState: ProtectedState

        init(_ protectedState: ProtectedState) {
            self.protectedState = protectedState
        }

        deinit {
            Streaming.finishAllStreams(with: protectedState)
        }
    }

    // MARK: - Storage

    private var value: Wrapped
    private let bufferingPolicy: BufferingPolicy
    private let protectedState: ProtectedState = .init(initialState: .init())
    private let cleanup: Cleanup

    // MARK: - Property wrapper API

    /// Creates the wrapper.
    ///
    /// - Parameters:
    ///   - wrappedValue: The initial value of the property.
    ///   - bufferingPolicy: Buffering policy handed to every stream created
    ///     through the projected value. Defaults to `.bufferingNewest(1)`.
    public init(wrappedValue: Wrapped, bufferingPolicy: BufferingPolicy = .bufferingNewest(1)) {
        value = wrappedValue
        self.bufferingPolicy = bufferingPolicy
        cleanup = Cleanup(protectedState)
    }

    /// The stored value. Every assignment is forwarded to all registered
    /// streams, whether or not the new value differs from the old one.
    public var wrappedValue: Wrapped {
        get { value }
        set {
            value = newValue
            // The lock is held only while the snapshot is taken; the yields
            // below run after it has been released.
            for listener in protectedState.withLock({ $0.continuations.values }) {
                listener.yield(newValue)
            }
        }
    }

    /// A fresh stream that first delivers the value currently stored and then
    /// every value assigned afterwards.
    ///
    /// NOTE(review): the continuation is registered before the initial value
    /// is yielded; if the wrapper is mutated concurrently, the initial value
    /// may be delivered after a newer one — confirm whether callers rely on
    /// the surrounding isolation to rule this out.
    nonisolated public var projectedValue: Stream {
        Stream(bufferingPolicy: bufferingPolicy) { continuation in
            // Register the continuation under a new, unique key.
            let token = protectedState.withLock { state in
                let token = state.nextID
                state.continuations[token] = continuation
                state.nextID = token + 1
                return token
            }

            // Drop the registration again once the stream terminates.
            continuation.onTermination = { [protectedState, token] _ in
                protectedState.withLock { state in
                    state.continuations[token] = nil
                }
            }

            continuation.yield(value)
        }
    }

    // MARK: - Finishing

    /// Finishes every stream that is currently registered with this wrapper.
    nonisolated public func finishAllStreams() {
        Self.finishAllStreams(with: protectedState)
    }

    /// Empties the registry held by `protectedState` and finishes each
    /// continuation that was in it.
    ///
    /// The continuations are finished after the lock has been released; their
    /// termination handlers acquire the same lock.
    nonisolated fileprivate static func finishAllStreams(
        with protectedState: ProtectedState
    ) {
        let drained = protectedState.withLock { state in
            let snapshot = state.continuations.values
            state.continuations = [:]
            return snapshot
        }
        for finished in drained {
            finished.finish()
        }
    }
}
extension Streaming where Wrapped: Equatable {
    /// Assigns `newValue` only when it is not equal to the value currently
    /// stored, so an unchanged value does not trigger another emission on
    /// the streams.
    ///
    /// - Parameter newValue: The candidate value for the property.
    public mutating func update(to newValue: Wrapped) {
        if wrappedValue != newValue {
            wrappedValue = newValue
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment