Skip to content

Instantly share code, notes, and snippets.

@NikolaiRuhe
Last active September 22, 2025 10:11
Show Gist options
  • Select an option

  • Save NikolaiRuhe/de31b999b8c8c58c65e4c2e8724dda74 to your computer and use it in GitHub Desktop.

Select an option

Save NikolaiRuhe/de31b999b8c8c58c65e4c2e8724dda74 to your computer and use it in GitHub Desktop.
A property wrapper that creates an AsyncStream yielding values when the property is set.
import os
/// A property wrapper that turns a stored property into an async stream.
///
/// Semantics are similar to Combine's `CurrentValueSubject`. Multiple clients
/// are allowed. Each client gets an individual stream.
///
/// The wrapped value itself inherits isolation from the surrounding type.
/// The stream is nonisolated, so it can be accessed from any execution context.
///
/// Usage:
/// ```
/// struct Bike: Sendable { ... }
///
/// @MainActor class MyModel {
/// @Streaming
/// var bike: Bike
///
/// func someInternalProcessing() {
/// bike.speed = 42 // This will trigger an emission on $bike.
/// }
///
/// nonisolated func waitForBikeToHalt() async {
/// // self.$bike creates a new stream of values from self.bike
/// for await bike in $bike where bike.speed == 0 { break }
/// }
/// }
/// ```
@propertyWrapper
public struct Streaming<Wrapped: Sendable>: Sendable {
public typealias Stream = AsyncStream<Wrapped>
public typealias Continuation = Stream.Continuation
public typealias BufferingPolicy = Continuation.BufferingPolicy
fileprivate typealias ProtectedState = OSAllocatedUnfairLock<State>
private var value: Wrapped
private let bufferingPolicy: BufferingPolicy
private let protectedState: ProtectedState = .init(initialState: .init())
private let cleanup: Cleanup
public init(wrappedValue: Wrapped, bufferingPolicy: BufferingPolicy = .bufferingNewest(1)) {
self.value = wrappedValue
self.bufferingPolicy = bufferingPolicy
self.cleanup = Cleanup(protectedState)
}
public var wrappedValue: Wrapped {
get { value }
set {
value = newValue
let continuations = protectedState.withLock { $0.continuations.values }
for continuation in continuations {
continuation.yield(newValue)
}
}
}
nonisolated public var projectedValue: Stream {
AsyncStream(bufferingPolicy: bufferingPolicy) { continuation in
let id = protectedState.withLock { state in
let id = state.nextID
state.nextID += 1
state.continuations[id] = continuation
return id
}
continuation.onTermination = { [protectedState, id] _ in
protectedState.withLock { $0.continuations[id] = nil }
}
continuation.yield(value)
}
}
nonisolated public func finishAllStreams() {
Self.finishAllStreams(with: protectedState)
}
nonisolated fileprivate static func finishAllStreams(
with protectedState: ProtectedState
) {
let continuations = protectedState.withLock {
let continuations = $0.continuations.values
$0.continuations = [:]
return continuations
}
for continuation in continuations {
continuation.finish()
}
}
fileprivate struct State {
var nextID: Int = 0
var continuations: [Int: Continuation] = [:]
}
private final class Cleanup: Sendable {
private let protectedState: ProtectedState
init(_ protectedState: ProtectedState) {
self.protectedState = protectedState
}
deinit {
Streaming.finishAllStreams(with: protectedState)
}
}
}
extension Streaming where Wrapped: Equatable {
/// Apply a new value only if it differs from the current value. This
/// prevents duplicates in streams.
public mutating func update(to newValue: Wrapped) {
guard wrappedValue != newValue else { return }
wrappedValue = newValue
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment