Skip to content

Instantly share code, notes, and snippets.

@Quelklef
Created December 8, 2025 05:32
Show Gist options
  • Select an option

  • Save Quelklef/bc5850e1dbb55494aef35c03f7c3aac7 to your computer and use it in GitHub Desktop.

Select an option

Save Quelklef/bc5850e1dbb55494aef35c03f7c3aac7 to your computer and use it in GitHub Desktop.
capability-polymorphic responsive implementation. also async-polymorphic!
/*
A responsive can admit any combination of the following
capabilities:
* WATCH: meaning it can be subscribed to
* POLL: meaning its current value can be queried
*/
exports.Responsive =
class Responsive {
  // Build a responsive from its capabilities.
  //
  // Both `watch` and `poll` are optional, and the options object itself
  // may be omitted (yielding a responsive with no capabilities).
  constructor({ watch, poll } = {}) {
    // (T -> void) -> (() -> void)
    //
    // Takes a callback and returns a canceller.
    // This MAY be async and MAY throw.
    // Same for the canceller
    this.watch = watch;

    // () -> T
    //
    // This MAY be async and MAY throw
    this.poll = poll;
  }

  // Create a POLL-only responsive that always yields `v`
  static fromValue(v) {
    return new Responsive({ poll: () => v });
  }

  // Create a POLL-only responsive from a getter function
  static fromGetter(poll) {
    return new Responsive({ poll });
  }

  // Map over a responsive (of any capabilities).
  //
  // The result has exactly the capabilities of the input. `f` may return
  // a promise, in which case watched callbacks fire once it resolves and
  // poll() returns a promise.
  map(f) {
    return new Responsive({
      watch: this.watch && (cb => this.watch(v => bindp(v, f, cb))),
      poll: this.poll && (() => bindp(this.poll(), f)),
    });
  }

  // Like map but for an effectful action. Does not observe the value
  // returned by `f` (so a promise returned by `f` is not waited on)
  perform(f) {
    return this.map(x => {
      f(x);
      return x;
    });
  }

  // Most WATCH responsives defer the actual generation of events until a
  // callback is registered, and generate a separate stream of events for
  // each callback registered
  //
  // Calling startNow() instructs the responsive to begin event generation
  // now, and causes callback registration to simply observe the existing
  // stream of events. Events emitted while no callback is registered are
  // dropped.
  //
  // Returns a 2-tuple [cancel, responsive]. The first element is whatever
  // this.watch() returned for the initialized stream (its canceller, or a
  // promise if watch is async), and the second is the generated
  // responsive, which is WATCH and is additionally POLL if the input was.
  //
  // Throws if this responsive is not WATCH.
  //
  // If you don't need the canceller, use startNow_() instead
  startNow() {
    if (!this.watch) throw new Error(`Can only startNow() a WATCH responsive`);
    const watchers = [];
    // Dispatch over a snapshot of the watcher list: a callback which
    // unsubscribes during dispatch splices `watchers`, and iterating the
    // live array would then skip one of the remaining watchers.
    const cancel = this.watch(val => mapp([...watchers], cb => cb(val)));
    return [
      cancel,
      new Responsive({
        poll: this.poll,
        watch: cb => {
          watchers.push(cb);
          return () => {
            const idx = watchers.indexOf(cb);
            if (idx !== -1) watchers.splice(idx, 1);
          };
        },
      }),
    ];
  }

  // Like startNow(), but only the generated responsive is returned, not
  // the canceller.
  startNow_() {
    const [, responsive] = this.startNow();
    return responsive;
  }

  // Replace emitted events and/or polled values with the results
  // of the provided poll function
  //
  // Accepts a responsive of any capabilities
  //
  // Produces a responsive which is always POLL, and is additionally
  // WATCH if the input was
  override(poll) {
    return new Responsive({
      watch: this.watch && (cb => this.watch(() => bindp(poll(), cb))),
      poll: () => poll(),
    });
  }

  // Filter a WATCH. The result is WATCH-only
  //
  // `pred` may return a promise; the callback is invoked only when the
  // (resolved) predicate result is truthy.
  //
  // Throws if this responsive is not WATCH.
  filter(pred) {
    if (!this.watch) throw new Error(`Can only filter() a WATCH responsive`);
    return new Responsive({
      watch: cb => this.watch(v => bindp(v, pred, bool => bool && cb(v))),
    });
  }

  // Filter a WATCH to certain values (compared with ===).
  // The result is WATCH-only
  filterEq(val) {
    return this.filter(v => v === val);
  }

  // Instruct a POLL to emit its value when a WATCH fires.
  // The result is both POLL and WATCH
  //
  // Throws if this responsive is not POLL or `other` is not WATCH.
  pollOn(other) {
    if (!this.poll) throw new Error(`Can only pollOn() a POLL responsive`);
    if (!other.watch) throw new Error(`Can only pollOn() with a WATCH responsive`);
    return new Responsive({
      poll: () => this.poll(),
      watch: cb => other.watch(() => bindp(this.poll(), cb)),
    });
  }

  // Instruct a POLL to emit its value every n seconds,
  // producing a responsive that is both POLL and WATCH
  //
  // Each registered callback gets its own interval timer, which its
  // canceller clears.
  //
  // Throws if this responsive is not POLL.
  pollEvery({ seconds }) {
    // Fail fast here rather than later inside the timer callback
    if (!this.poll) throw new Error(`Can only pollEvery() a POLL responsive`);
    return new Responsive({
      poll: () => this.poll(),
      watch: cb => {
        const tid = setInterval(() => bindp(this.poll(), cb), seconds * 1000);
        return () => clearInterval(tid);
      },
    });
  }

  // A responsive which emits `undefined` every n seconds
  // (and which polls as `undefined`)
  static every({ seconds }) {
    return Responsive.fromValue(undefined).pollEvery({ seconds });
  }

  // Union of WATCHes. The result is WATCH-only
  //
  // Registering a callback registers it with every input; the resulting
  // canceller invokes every input's canceller. If any registration is
  // async, watch() returns a promise of the canceller.
  //
  // Throws if any input is not WATCH.
  static union(...rs) {
    if (!rs.every(r => r.watch)) throw new Error(`Can only union() WATCH responsives`);
    return new Responsive({
      watch: cb => {
        const cancels = mapp(rs, r => r.watch(cb));
        return bindp(cancels, cs => () => mapp(cs, c => c()));
      },
    });
  }

  // Synonym for Responsive.union(this, ...others)
  union(...others) {
    return Responsive.union(this, ...others);
  }

  // Creates a POLL giving access to the current time, as a Date
  static currentTime() {
    return new Responsive({ poll: () => new Date() });
  }

  // Track the most recent value of a WATCH, producing a POLL
  //
  // Subscribes immediately. Until the first event arrives the POLL yields
  // `initial`. If this.watch() returns a promise, the result is a promise
  // of the POLL instead.
  //
  // NOTE: the canceller returned by this.watch() is discarded, so the
  // underlying subscription cannot be cancelled through the result.
  //
  // Throws if this responsive is not WATCH.
  trackLatest(initial = undefined) {
    if (!this.watch) throw new Error(`Can only trackLatest() on a WATCH responsive`);
    let latest = initial;
    return bindp(
      this.watch(val => latest = val),
      () => new Responsive({ poll: () => latest }),
    );
  }
}
// Thread a value through `fs`, left to right.
//
// Whenever the running value is a Promise, the next function is chained
// onto it with .then(); otherwise the function is applied directly. The
// result is therefore a promise if the initial value is a promise or if
// any function before the last returns one (and the last function's
// return value is returned as-is).
function bindp(v, ...fs) {
  return fs.reduce(
    (acc, step) => (acc instanceof Promise ? acc.then(x => step(x)) : step(acc)),
    v,
  );
}
// Promise-aware Array.map: applies `f` to each element of `xs` (passing
// only the element) and returns the array of results. If at least one
// result is a Promise, the results are combined with Promise.all and a
// promise of the array is returned instead.
function mapp(xs, f) {
  const results = xs.map(item => f(item));
  const anyAsync = results.some(r => r instanceof Promise);
  if (anyAsync) {
    return Promise.all(results);
  }
  return results;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment