Last active
October 10, 2025 07:06
-
-
Save TimoPtr/876163eca43d4c89ae10772b19b8cee9 to your computer and use it in GitHub Desktop.
Flow operator - delayFirst (delays the first item by a given duration)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Variant of `delayFirst` that lets the caller supply the [clock] used to measure the delay
 * window, so tests can control the notion of "now".
 *
 * The window opens when a collection starts and closes [delayDuration] later, as measured by
 * [clock]. An item received from the buffered upstream while the window is still open is held
 * back with [delay] for the remaining time; an item received once the window has closed is
 * forwarded immediately.
 *
 * The end of the window is computed inside each collection. Every collector of the returned flow
 * therefore gets its own window, and collecting the flow several times (even concurrently) cannot
 * make one collector overwrite the deadline of another.
 *
 * @param delayDuration how long, counted from the start of collection, items are held back.
 * @param clock source of the current time; defaults to [Clock.System].
 * @return a flow emitting the same items as the receiver, in the same order.
 */
@OptIn(ExperimentalTime::class)
@VisibleForTesting
internal fun <T> Flow<T>.delayFirst(delayDuration: Duration, clock: Clock = Clock.System): Flow<T> {
    // Buffer so the upstream can keep producing while the downstream is suspended in delay().
    val bufferedUpstream = buffer()
    // Referenced by its fully qualified name so that no additional import is required.
    return kotlinx.coroutines.flow.flow {
        // Per-collection deadline: never shared between collectors of the same flow instance.
        val endDelay = clock.now() + delayDuration
        bufferedUpstream.collect { item ->
            val remaining = endDelay - clock.now()
            if (remaining.isPositive()) {
                delay(remaining)
            }
            emit(item)
        }
    }
}
/**
 * Holds back the downstream of this flow for [delayDuration] while letting the upstream start
 * (and keep emitting) right away.
 *
 * Items produced by the upstream during the initial delay are buffered and handed to the
 * downstream as soon as the delay is over; items produced afterwards are passed through without
 * any additional delay.
 *
 * This differs from `onStart { delay(duration) }`, which postpones the start of the whole flow,
 * upstream included. Here only the downstream waits.
 *
 * Example with a `delayDuration` of 500ms and an upstream emitting:
 * - `item0` at t = 0ms
 * - `item1` at t = 100ms
 * - `item2` at t = 550ms
 *
 * The downstream receives:
 * - `item0` and `item1` at t = 500ms (both were buffered during the initial delay)
 * - `item2` at t = 550ms (it arrived after the initial delay, so it is not delayed)
 *
 * Time is measured with [Clock.System].
 *
 * @param delayDuration how long the first item(s) are held back, counted from the start of collection.
 * @return a flow emitting the same items as the receiver, with the initial delay applied.
 */
@OptIn(ExperimentalTime::class)
fun <T> Flow<T>.delayFirst(delayDuration: Duration): Flow<T> =
    delayFirst(delayDuration, clock = Clock.System)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Tests for the `delayFirst` flow operator, executed under the virtual time of `runTest`.
 */
@OptIn(ExperimentalCoroutinesApi::class, ExperimentalTime::class)
class FlowUtilTest {

    @Test
    fun `Given event sent before delay when consuming a flow with delayFirst then should emits all after delay without delay`() = runTest {
        val delayed = flowOf(1, 2, 3).delayFirst(1.seconds)

        delayed.test {
            // Nothing may come through before the full delay has elapsed.
            expectNoEvents()
            advanceTimeBy(500.milliseconds)
            expectNoEvents()
            advanceTimeBy(500.milliseconds)

            // Once the delay is over, every item is delivered in order.
            for (expected in 1..3) {
                assertEquals(expected, awaitItem())
            }
            awaitComplete()
        }
    }

    @Test
    fun `Given event sent before delay and after delay when consuming a flow with delayFirst then should emits at the write time`() = runTest {
        val source = Channel<Int>(Channel.UNLIMITED)
        val clock = FakeClock()
        val delayed = source.consumeAsFlow().delayFirst(1.seconds, clock)

        delayed.test {
            expectNoEvents()

            // Sent inside the delay window: must be held back.
            source.trySend(1)
            expectNoEvents()

            advanceTimeBy(1.seconds)
            runCurrent()
            assertEquals(1, expectMostRecentItem())

            // The fake clock does not follow advanceTimeBy, so it is moved forward by hand.
            clock.currentInstant = clock.currentInstant.plus(1.seconds)

            // Sent after the delay window: must be delivered right away.
            source.trySend(2)
            assertEquals(2, expectMostRecentItem())

            source.close()
            awaitComplete()
        }
    }

    @Test
    fun `Given events before delay when consuming a flow with delayFirst then should not block upstream`() = runTest {
        val source = Channel<Int>(Channel.UNLIMITED)
        var lastSeenUpstream: Int? = null
        val delayed = source.consumeAsFlow()
            .onEach { lastSeenUpstream = it }
            .delayFirst(1.seconds)

        delayed.test {
            expectNoEvents()

            // The upstream keeps processing items even though the downstream is still waiting.
            source.trySend(1)
            assertEquals(1, lastSeenUpstream)
            source.trySend(2)
            assertEquals(2, lastSeenUpstream)
            expectNoEvents()

            advanceTimeBy(1.seconds)
            runCurrent()
            assertEquals(1, expectMostRecentItem())
            assertEquals(2, awaitItem())

            source.close()
            awaitComplete()
        }
    }

    @Test
    fun `Given events after delay when consuming a flow with delayFirst then no delay should apply`() = runTest {
        val source = Channel<Int>(Channel.UNLIMITED)
        val clock = FakeClock()
        val delayed = source.consumeAsFlow().delayFirst(90.seconds, clock)

        delayed.test {
            expectNoEvents()
            advanceTimeBy(90.seconds)
            runCurrent()

            // The fake clock does not follow advanceTimeBy, so it is moved forward by hand.
            clock.currentInstant = clock.currentInstant.plus(90.seconds)

            val virtualTimeAtSend = testScheduler.currentTime
            source.trySend(1)
            runCurrent()
            assertEquals(1, awaitItem())

            // No virtual time may have elapsed between sending and receiving the item.
            assertEquals(0, testScheduler.currentTime - virtualTimeAtSend)

            source.close()
            awaitComplete()
        }
    }

    @Test
    fun `Given no event when consuming a flow with delayFirst then should complete`() = runTest {
        val delayed = flowOf<Int>().delayFirst(1.seconds)

        delayed.test {
            awaitComplete()
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment