Skip to content

Instantly share code, notes, and snippets.

@TimoPtr
Last active October 10, 2025 07:06
Show Gist options
  • Select an option

  • Save TimoPtr/876163eca43d4c89ae10772b19b8cee9 to your computer and use it in GitHub Desktop.

Select an option

Save TimoPtr/876163eca43d4c89ae10772b19b8cee9 to your computer and use it in GitHub Desktop.
Flow operator - delayFirst (delays the first item by a given duration)
/**
 * Clock-injectable implementation of the `delayFirst` operator.
 *
 * Each time the returned flow is collected, a deadline of `clock.now() + delayDuration` is computed.
 * For every upstream item, [clock] is read again: if the deadline has not been reached yet, the
 * collector suspends with [delay] for the remaining time before forwarding the item; otherwise the
 * item is forwarded right away. The upstream is collected through [buffer], so it can keep emitting
 * while an item is being held back.
 *
 * The deadline is local to a single collection. Collecting the returned flow several times (even
 * concurrently) gives every collector its own delay window instead of sharing one mutable deadline.
 *
 * @param delayDuration how long, counted from the start of collection, items are held back.
 * @param clock source of the current time used to compute and check the deadline.
 * @return a flow emitting the same items as the receiver, with early items postponed.
 */
@OptIn(ExperimentalTime::class)
@VisibleForTesting
internal fun <T> Flow<T>.delayFirst(delayDuration: Duration, clock: Clock = Clock.System): Flow<T> {
    val upstream = buffer()
    // Fully qualified so the block does not depend on an extra import being present in the file.
    return kotlinx.coroutines.flow.flow {
        // Computed when collection starts, and kept per collection rather than per operator call.
        val endDelay = clock.now() + delayDuration
        upstream.collect { item ->
            val currentTime = clock.now()
            if (currentTime < endDelay) {
                delay(endDelay - currentTime)
            }
            emit(item)
        }
    }
}
/**
 * Holds back the items of this flow until [delayDuration] has elapsed since collection started,
 * measured with the system clock.
 *
 * The upstream flow is started immediately and is buffered, so it keeps producing while the
 * downstream is waiting. Items that arrive during the waiting period are released once it is
 * over; items that arrive afterwards are passed through without any additional delay.
 *
 * This differs from `onStart { delay(duration) }`, which postpones the start of the whole flow,
 * upstream included. Here only the downstream delivery is postponed.
 *
 * Example with a `delayDuration` of 500ms and an upstream emitting:
 * - `item0` at t = 0ms
 * - `item1` at t = 100ms
 * - `item2` at t = 550ms
 *
 * The downstream receives:
 * - `item0` and `item1` at t = 500ms (buffered, then released when the initial delay ends)
 * - `item2` at t = 550ms (the initial delay is already over, so it is delivered as it arrives)
 *
 * @param delayDuration how long, counted from the start of collection, items are held back.
 * @return a flow emitting the same items as the receiver, with early items postponed.
 */
@OptIn(ExperimentalTime::class)
fun <T> Flow<T>.delayFirst(delayDuration: Duration): Flow<T> =
    delayFirst(delayDuration, Clock.System)
/**
 * Tests for the `delayFirst` flow operator.
 *
 * Every test runs inside `runTest`, so coroutine delays are driven by virtual time
 * (`advanceTimeBy` / `runCurrent`). Tests that pass a `FakeClock` additionally move that clock
 * by hand, because it does not follow the virtual time of the test scheduler.
 *
 * NOTE(review): `test { }`, `expectNoEvents`, `awaitItem`, `expectMostRecentItem` and
 * `awaitComplete` presumably come from the Turbine library - confirm against the file imports.
 */
@OptIn(ExperimentalCoroutinesApi::class)
class FlowUtilTest {
/**
 * Upstream emits 1, 2 and 3 up front. Nothing may be observed before or after the first 500ms of
 * virtual time; after a further 500ms the three items are expected in order, then completion.
 *
 * NOTE(review): this uses the single-argument overload, i.e. the real system clock, while time
 * is advanced virtually - confirm this cannot make the test timing-sensitive.
 */
@Test
fun `Given event sent before delay when consuming a flow with delayFirst then should emits all after delay without delay`() = runTest {
flowOf(1, 2, 3)
.delayFirst(1.seconds)
.test {
expectNoEvents()
advanceTimeBy(500.milliseconds)
expectNoEvents()
advanceTimeBy(500.milliseconds)
assertEquals(1, awaitItem())
assertEquals(2, awaitItem())
assertEquals(3, awaitItem())
awaitComplete()
}
}
/**
 * Sends one item before the delay has elapsed and one after it. The first item must only be
 * observed once 1 second of virtual time has passed; the second one is expected right after it
 * is sent, once the fake clock has been moved past the deadline.
 *
 * NOTE(review): "write time" in the test name presumably means "right time".
 */
@OptIn(ExperimentalTime::class)
@Test
fun `Given event sent before delay and after delay when consuming a flow with delayFirst then should emits at the write time`() = runTest {
val channel = Channel<Int>(Channel.UNLIMITED)
val fakeClock = FakeClock()
channel.consumeAsFlow()
.delayFirst(1.seconds, fakeClock).test {
expectNoEvents()
channel.trySend(1)
// Still inside the delay window: the item must not be delivered yet.
expectNoEvents()
advanceTimeBy(1.seconds)
runCurrent()
assertEquals(1, expectMostRecentItem())
// We need to modify the clock manually since it is not advancing with advanceTimeBy
fakeClock.currentInstant = fakeClock.currentInstant.plus(1.seconds)
channel.trySend(2)
assertEquals(2, expectMostRecentItem())
channel.close()
awaitComplete()
}
}
/**
 * Checks that the upstream keeps running while the downstream is being delayed: an `onEach`
 * placed before `delayFirst` records each value it sees, and both sent values must have been
 * recorded while no event has reached the downstream yet.
 */
@Test
fun `Given events before delay when consuming a flow with delayFirst then should not block upstream`() = runTest {
val channel = Channel<Int>(Channel.UNLIMITED)
// Last value seen by the upstream `onEach`, i.e. before the delay operator.
var currentValueInUpstream: Int? = null
channel.consumeAsFlow()
.onEach { currentValueInUpstream = it }
.delayFirst(1.seconds).test {
expectNoEvents()
channel.trySend(1)
assertEquals(1, currentValueInUpstream)
channel.trySend(2)
assertEquals(2, currentValueInUpstream)
// Upstream has processed both values, yet nothing has been delivered downstream.
expectNoEvents()
advanceTimeBy(1.seconds)
runCurrent()
assertEquals(1, expectMostRecentItem())
assertEquals(2, awaitItem())
channel.close()
awaitComplete()
}
}
/**
 * Sends the first item only after the whole delay (90 seconds) has passed, on both the virtual
 * scheduler and the fake clock, and asserts that no virtual time elapses between sending the
 * item and receiving it.
 */
@OptIn(ExperimentalTime::class)
@Test
fun `Given events after delay when consuming a flow with delayFirst then no delay should apply`() = runTest {
val channel = Channel<Int>(Channel.UNLIMITED)
val fakeClock = FakeClock()
channel.consumeAsFlow()
.delayFirst(90.seconds, fakeClock).test {
expectNoEvents()
advanceTimeBy(90.seconds)
runCurrent()
// We need to modify the clock manually since it is not advancing with advanceTimeBy
fakeClock.currentInstant = fakeClock.currentInstant.plus(90.seconds)
// Virtual time reference used below to prove that delivery takes no time.
val timeBeforeSend = testScheduler.currentTime
channel.trySend(1)
runCurrent()
assertEquals(1, awaitItem())
// We want to assert that the item is sent right away
assertEquals(0, testScheduler.currentTime - timeBeforeSend)
channel.close()
awaitComplete()
}
}
/**
 * An empty upstream must simply complete: with no item to hold back, completion is the only
 * event awaited.
 */
@Test
fun `Given no event when consuming a flow with delayFirst then should complete`() = runTest {
flowOf<Int>()
.delayFirst(1.seconds)
.test {
awaitComplete()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment