Created
January 22, 2023 16:09
-
-
Save nort3x/9605058cfd171eadff7bd0090337a2dc to your computer and use it in GitHub Desktop.
Embedded Kotlin FanOut BroadCaster
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import kotlinx.coroutines.CoroutineScope | |
| import kotlinx.coroutines.SupervisorJob | |
| import kotlinx.coroutines.cancel | |
| import kotlinx.coroutines.channels.BufferOverflow | |
| import kotlinx.coroutines.flow.* | |
| import org.slf4j.LoggerFactory | |
| import org.springframework.stereotype.Component | |
| import java.util.concurrent.ConcurrentHashMap | |
/**
 * Keyed fan-out broadcaster built on [MutableSharedFlow].
 *
 * Each key maps to at most one shared flow. The flow is created lazily by
 * [receiveFromChannel] and is removed from the registry again once its
 * subscription count is observed to drop back to zero. Payloads sent with
 * [sendToChannel] to a key that currently has no registered flow are ignored.
 *
 * After [dropAll] has been called, [sendToChannel] and [receiveFromChannel]
 * throw [IllegalStateException].
 */
class BroadCaster {
    // Resolved once instead of on every emit / subscription-count change.
    private val logger = LoggerFactory.getLogger(BroadCaster::class.java)

    private val broadCasters = ConcurrentHashMap<Any, MutableSharedFlow<Any?>>()

    // Scope that hosts the per-key subscription-count watchers.
    private val broadCasterScope = CoroutineScope(SupervisorJob())

    // Written by dropAll() and read by the send/receive paths, possibly on different
    // threads; @Volatile makes the write visible to those readers.
    @Volatile
    private var stopped = false

    /**
     * Emits [payload] to the flow registered under [key], if there is one.
     *
     * When no flow is registered for [key] the payload is dropped and nothing is logged.
     *
     * @throws IllegalStateException if [dropAll] has already been called.
     */
    fun <K : Any, V : Any> sendToChannel(key: K, payload: V) {
        check(!stopped) { "broadcaster stopped" }
        // A plain lookup is enough here: emitting inside computeIfPresent would keep the
        // map's bin locked while the flow emits and the logger runs.
        val flow = broadCasters[key] ?: return
        flow.tryEmit(payload)
        logger.debug("[$key] emitted -> $payload")
    }

    /**
     * Marks the broadcaster as stopped, cancels all subscription-count watchers
     * and forgets every registered flow.
     */
    fun dropAll() {
        stopped = true
        broadCasterScope.cancel("stopped")
        broadCasters.clear()
    }

    /**
     * Returns the shared flow registered under [key], creating and registering it first
     * when the key is not present.
     *
     * A newly created flow has one extra buffer slot and drops the oldest value on overflow.
     * The element type [V] is not checked at runtime; the cast is unchecked.
     *
     * @throws IllegalStateException if [dropAll] has already been called.
     */
    @Suppress("UNCHECKED_CAST")
    fun <K : Any, V : Any> receiveFromChannel(key: K): SharedFlow<V> {
        check(!stopped) { "broadcaster stopped" }
        val flow = broadCasters.computeIfAbsent(key) {
            MutableSharedFlow<Any?>(
                extraBufferCapacity = 1,
                onBufferOverflow = BufferOverflow.DROP_OLDEST
            ).also { created -> watchSubscribers(key, created) }
        }
        return flow.asSharedFlow() as SharedFlow<V>
    }

    /**
     * Launches a watcher that logs subscription-count changes of [flow] and unregisters
     * it once the count is observed at zero.
     */
    private fun watchSubscribers(key: Any, flow: MutableSharedFlow<Any?>) {
        flow.subscriptionCount
            .drop(1) // skip the first value the watcher observes
            .onEach { count ->
                logger.debug("[$key] subscriber count changed: $count")
                // Conditional removal: only evict the entry if it is still this exact flow,
                // so a stale watcher can never delete a newer flow registered under the key.
                if (count == 0 && broadCasters.remove(key, flow)) {
                    logger.debug("[$key] broadcaster deleted")
                }
            }
            .launchIn(broadCasterScope)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment