Last active
October 1, 2019 10:13
-
-
Save OhadC/e9126052b52e184108a164c9a1d7ce43 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { Observable } from "rxjs"; | |
| // code based on https://rxjs-dev.firebaseapp.com/guide/v6/pipeable-operators | |
/**
 * Pipeable operator that unwraps `{ event, delay? }` values and re-emits
 * `event`, optionally after `delay` milliseconds.
 *
 * Every incoming value gets an increasing arrival index. Whenever a value is
 * emitted downstream (immediately, or because its timer fired), every pending
 * delayed value that arrived at or before it is cancelled; delayed values that
 * arrived later stay scheduled.
 *
 * When the source errors or completes, all pending delayed values are
 * discarded and the notification is forwarded right away. When the resulting
 * observable is unsubscribed, the source subscription is released and all
 * pending timers are cancelled.
 *
 * @returns A function mapping an observable of wrapped values to an
 *   observable of the unwrapped `event` payloads.
 */
export const delayAndTakeLast = () => <T>(source: Observable<wrappedWithDelay<T>>) =>
  new Observable<T>(observer => {
    let nextIndex = 0;
    let prevFiredIndex = -1;
    let delayedEvents: { index: number; timeout: ReturnType<typeof setTimeout> }[] = [];

    /** Cancels every pending timer whose arrival index is `<= toIndex` (all of them by default). */
    function clearDelayedEvents(toIndex = Number.MAX_SAFE_INTEGER) {
      delayedEvents = delayedEvents.filter(delayedEvent => {
        if (delayedEvent.index > toIndex) {
          return true;
        }
        clearTimeout(delayedEvent.timeout);
        return false;
      });
    }

    /** Emits `value` downstream after dropping pending values that arrived no later than `index`. */
    function dispatch(value: T, index: number) {
      clearDelayedEvents(index);
      observer.next(value);
      prevFiredIndex = index;
    }

    const subscription = source.subscribe({
      next(value) {
        const index = nextIndex++;
        // Truthiness is intentional: a missing or zero delay means "emit now".
        if (value.delay) {
          const timeout = setTimeout(() => {
            // Never emit behind a value that has already been dispatched.
            if (prevFiredIndex < index) {
              dispatch(value.event, index);
            }
          }, value.delay);
          delayedEvents.push({ index, timeout });
        } else {
          dispatch(value.event, index);
        }
      },
      error(err) {
        clearDelayedEvents();
        observer.error(err);
      },
      complete() {
        clearDelayedEvents();
        observer.complete();
      }
    });

    // Teardown: without this, timers scheduled before an unsubscribe would
    // stay alive until they fire.
    return () => {
      subscription.unsubscribe();
      clearDelayedEvents();
    };
  });
/**
 * Input shape consumed by `delayAndTakeLast`: a payload plus an optional
 * delay. The operator only reads these fields, so they are declared readonly.
 */
type wrappedWithDelay<T> = {
  /** Payload that is forwarded downstream. */
  readonly event: T;
  /**
   * Milliseconds to wait before forwarding `event`. When omitted (or `0`)
   * the event is forwarded immediately.
   */
  readonly delay?: number;
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment