Skip to content

Instantly share code, notes, and snippets.

@OhadC
Last active October 1, 2019 10:13
Show Gist options
  • Select an option

  • Save OhadC/e9126052b52e184108a164c9a1d7ce43 to your computer and use it in GitHub Desktop.

Select an option

Save OhadC/e9126052b52e184108a164c9a1d7ce43 to your computer and use it in GitHub Desktop.
import { Observable } from "rxjs";
// code based on https://rxjs-dev.firebaseapp.com/guide/v6/pipeable-operators
/**
 * Pipeable operator that unwraps `{ event, delay }` items and emits the bare
 * `event`, optionally after `delay` milliseconds, letting newer emissions
 * cancel older ones that are still waiting.
 *
 * Behaviour:
 * - An item whose `delay` is falsy (omitted or `0`) is emitted synchronously.
 * - An item with a truthy `delay` is scheduled with `setTimeout`.
 * - Whenever an item is emitted, every still-pending item that was received
 *   before it (lower arrival index) is cancelled and never emitted.
 * - When the source completes or errors, all pending delayed items are
 *   discarded before the notification is forwarded.
 * - Unsubscribing from the result unsubscribes from the source and cancels
 *   all pending timers.
 *
 * Usage: `source$.pipe(delayAndTakeLast())`.
 *
 * @returns An operator function mapping `Observable<wrappedWithDelay<T>>` to `Observable<T>`.
 */
export const delayAndTakeLast = () => <T>(source: Observable<wrappedWithDelay<T>>) =>
  new Observable<T>(observer => {
    // Arrival order of source items; used to decide which pending timers are stale.
    let nextIndex = 0;
    // Arrival index of the most recently emitted item.
    let prevFiredIndex = -1;
    // Timers for delayed items that have been neither emitted nor cancelled yet.
    let pendingTimers: { index: number; timeout: ReturnType<typeof setTimeout> }[] = [];

    /** Cancels every pending timer whose arrival index is <= `toIndex` (all of them by default). */
    function clearDelayedEvents(toIndex = Number.MAX_SAFE_INTEGER) {
      pendingTimers = pendingTimers.filter(pending => {
        if (pending.index > toIndex) {
          return true;
        }
        clearTimeout(pending.timeout);
        return false;
      });
    }

    /** Emits `value` downstream after cancelling everything that arrived at or before `index`. */
    function dispatch(value: T, index: number) {
      clearDelayedEvents(index);
      observer.next(value);
      prevFiredIndex = index;
    }

    const subscription = source.subscribe({
      next(value) {
        const index = nextIndex++;
        if (value.delay) {
          const timeout = setTimeout(() => {
            // Defensive guard: never emit an item older than the last one emitted.
            if (prevFiredIndex < index) {
              dispatch(value.event, index);
            }
          }, value.delay);
          pendingTimers.push({ index, timeout });
        } else {
          dispatch(value.event, index);
        }
      },
      error(err) {
        clearDelayedEvents();
        observer.error(err);
      },
      complete() {
        clearDelayedEvents();
        observer.complete();
      }
    });

    // Teardown: returning only the inner subscription would leave scheduled
    // timers running after an unsubscribe, so cancel them explicitly as well.
    return () => {
      subscription.unsubscribe();
      clearDelayedEvents();
    };
  });
/**
 * Envelope consumed by `delayAndTakeLast`: a payload together with an
 * optional emission delay.
 */
interface wrappedWithDelay<T> {
  /** The value that is forwarded to subscribers. */
  event: T;
  /**
   * Delay handed to `setTimeout` before `event` is emitted. When omitted or
   * `0`, the event is emitted immediately.
   */
  delay?: number;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment