Skip to content

Instantly share code, notes, and snippets.

@nort3x
Created January 22, 2023 16:09
Show Gist options
  • Select an option

  • Save nort3x/9605058cfd171eadff7bd0090337a2dc to your computer and use it in GitHub Desktop.

Select an option

Save nort3x/9605058cfd171eadff7bd0090337a2dc to your computer and use it in GitHub Desktop.
Embedded Kotlin FanOut BroadCaster
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap
class BroadCaster {
private val broadCasters = ConcurrentHashMap<Any, MutableSharedFlow<Any?>>()
private val broadCasterScope = CoroutineScope(SupervisorJob())
private var stopped = false
fun <K : Any, V : Any> sendToChannel(key: K, payload: V) {
if (stopped) throw IllegalStateException("broadcaster stopped") else
broadCasters.computeIfPresent(key) { _, v ->
v.tryEmit(payload)
LoggerFactory.getLogger(javaClass).debug("[$key] emitted -> $payload")
v
}
}
fun dropAll() {
stopped = true
broadCasterScope.cancel("stopped")
broadCasters.clear()
}
@Suppress("UNCHECKED_CAST")
fun <K : Any, V : Any> receiveFromChannel(key: K): SharedFlow<V> =
if (stopped) throw IllegalStateException("broadcaster stopped") else
broadCasters.computeIfAbsent(key) {
val msf = MutableSharedFlow<V>(onBufferOverflow = BufferOverflow.DROP_OLDEST, extraBufferCapacity = 1)
msf.subscriptionCount
.drop(1)
.onEach {
LoggerFactory.getLogger(javaClass).debug("[$key] subscriber count changed: $it")
if (it == 0) {
broadCasters.remove(key)
LoggerFactory.getLogger(javaClass).debug("[$key] broadcaster deleted")
}
}.launchIn(broadCasterScope)
msf as MutableSharedFlow<Any?>
}.asSharedFlow() as SharedFlow<V>
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment