Skip to content

Instantly share code, notes, and snippets.

@igabesz
Created October 29, 2021 00:24
Show Gist options
  • Select an option

  • Save igabesz/a6a36dc3403aac65c3622043d320c6f3 to your computer and use it in GitHub Desktop.

Select an option

Save igabesz/a6a36dc3403aac65c3622043d320c6f3 to your computer and use it in GitHub Desktop.
rxjs-challenge
import { interval, Observable, Subject } from 'rxjs';
import { filter, mapTo, pairwise, takeUntil } from 'rxjs/operators';
export class ProgressGenerator {
// I prefer explicit `destroyed$` over `untilDestroyed(this)` solutions
private readonly destroyed$ = new Subject<void>();
private readonly startEnded$: Observable<void>;
// This will be consumed externally.
readonly progress$ = new Subject<number>();
constructor(
private readonly state$: Observable<'start' | 'finish'>,
) {
this.state$.pipe(
filter(state => state === 'start'),
takeUntil(this.destroyed$),
).subscribe(() => this.onStart());
this.startEnded$ = state$.pipe(
pairwise(),
filter(([prev, current]) => prev === 'start' && current !== 'start'),
mapTo(undefined)); // For type conformity
}
destroy() {
this.destroyed$.next();
this.destroyed$.complete();
this.progress$.complete();
}
private onStart() {
interval(100).pipe(
// Some calculations based on the elapsed time
takeUntil(this.startEnded$),
takeUntil(this.destroyed$),
).subscribe(tick => this.onTick(tick));
}
private onTick(tick: number) {
// More calculations here
this.progress$.next(tick);
}
}
export class ProgressGenerator {
// <Omitted>
constructor(
private readonly state$: Observable<'start' | 'finish'>,
) {
this.state$.pipe(
filter(state => state === 'start'),
takeUntil(this.destroyed$),
).subscribe(() => this.onStart());
this.startEnded$ = state$.pipe(
pairwise(),
filter(([prev, current]) => prev === 'start' && current !== 'start'),
mapTo(undefined));
// We're debugging, so no proper cleanup here
this.startEnded$.subscribe(() => console.log('startEnded 1'))
}
private onStart() {
// We're debugging, so no proper cleanup here
this.startEnded$.subscribe(() => console.log('startEnded 2'))
interval(100).pipe(
takeUntil(this.startEnded$.pipe(tap(() => console.log('startEnded TAP')))),
takeUntil(this.destroyed$.pipe(tap(() => console.log('destroyed!')))),
).subscribe(tick => this.onTick(tick));
}
// <Omitted>
}
this.startEnded$ = state$.pipe(
pairwise(),
filter(([prev, current]) => prev === 'start' && current !== 'start'),
mapTo(undefined),
share()); // This fixes the stream for us! (Or does it?)
export class ProgressGenerator {
private readonly destroyed$ = new Subject<void>();
readonly progress$ = new Subject<number>();
constructor(
private readonly state$: Observable<'start' | 'finish'>,
) {
this.state$.pipe(
filter(state => state === 'start'),
takeUntil(this.destroyed$),
).subscribe(() => this.onStart());
// No startEnded$ here!
}
destroy() {
this.destroyed$.next();
this.destroyed$.complete();
this.progress$.complete();
}
private onStart() {
// Implementing again w/o the `pairwise` operator
const startEnded$ = this.state$.pipe(filter(state => state !== 'start'));
interval(100).pipe(
takeUntil(startEnded$),
takeUntil(this.destroyed$),
).subscribe(tick => this.onTick(tick));
}
private onTick(tick: number) {
this.progress$.next(tick);
}
}
interface ProgressGeneratorCtor {
new(state$: Observable<'start' | 'finish'>): {
progress$: Observable<number>,
destroy(): void;
},
}
export async function runExample(example: string): Promise<boolean> {
const state$ = new Subject<'start' | 'finish'>();
const ProgressGeneratorType = await loadProgressGenerator(example)
if (!ProgressGeneratorType) return false;
const progressGenerator = new ProgressGeneratorType(state$);
progressGenerator.progress$.subscribe(p => console.log('p', p));
console.log('Starting');
state$.next('start');
await new Promise(resolve => setTimeout(resolve, 500));
console.log('Finished -- no "p" logs expected below this');
state$.next('finish');
await new Promise(resolve => setTimeout(resolve, 500));
console.log('Teardown');
progressGenerator.destroy();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment