There are two problems with this piece of code:
- it blocks the current thread: Monix always defaults to processing things synchronously, and you might argue that the timeoutOnSlowDownstream operator violates the principle of least surprise, but if you want the timeout to work, you must not block the thread that executes onNext (consider that on top of JavaScript blocking is not even possible)
So it is better to do this, and the timeout will get triggered:

    // The sleep runs inside the Future, so the thread that calls onNext is not blocked
    def onNext(elem: Int) = Future {
      sum += elem
      Thread.sleep(20000)
      Continue
    }

- this operator is about protecting the source and not the downstream consumer: if you take a look at the DownstreamTimeoutObservable implementation, it does send the onError after 1 second
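To make the first point concrete, here is a minimal sketch that uses only the Scala standard library, not Monix. The names BlockingVsAsync, blockingOnNext, asyncOnNext and callDurationMillis are made up for illustration, and a plain String stands in for Monix's Ack. It measures how long the call itself holds on to the calling thread, which is the time during which nothing else (such as a timeout check) can run on that thread.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingVsAsync {
  import ExecutionContext.Implicits.global

  // Blocks the caller: control returns only after the sleep has finished.
  def blockingOnNext(elem: Int): Future[String] = {
    Thread.sleep(500)
    Future.successful("Continue")
  }

  // Returns right away: the sleep runs on a thread of the execution context.
  def asyncOnNext(elem: Int): Future[String] = Future {
    Thread.sleep(500)
    "Continue"
  }

  // Milliseconds spent inside the call itself, not waiting for its result.
  def callDurationMillis(f: Int => Future[String]): Long = {
    val start = System.nanoTime()
    val result = f(1)
    val elapsed = (System.nanoTime() - start) / 1000000L
    Await.result(result, 5.seconds) // let the work finish before returning
    elapsed
  }
}
```

With the blocking variant the caller is stuck for the whole sleep, while the Future-based variant hands control back almost at once, which is what the operator needs in order to notice that the downstream is slow.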
You can use the dump operator to observe when the onError is emitted:

    val task = source.take(5)
      .map(x => 1)
      .dump("Upstream")
      .timeoutOnSlowDownstream(1.second)
      .dump("Downstream")
      .consumeWith(sumConsumer)

After running it, you first get this output (immediately):

    0: Upstream-->1
    0: Downstream-->1
After 1 second the error is sent downstream and the upstream gets canceled:

    1: Downstream-->monix.reactive.exceptions.DownstreamTimeoutException: Downstream timeout after 1 second
    2: Upstream canceled

After another 9 seconds:

    Yay, got timeout!
In other words, the error may have been emitted as soon as the timeout expired, but that future only got completed after 10 seconds, because the onError had to wait for the result of the final onNext before it could finally be delivered.
You see, even though the protocol of Observer allows us to send an onError without back-pressuring the last onNext, the subscribers given by users (in this instance your Subscriber) are wrapped in a SafeSubscriber instance by the user-facing Observable.subscribe. This is because, otherwise, the logic users give in onNext can become concurrent with the logic in onComplete or onError, and that's error-prone. I'd hate to explain to users how concurrency works with regard to onComplete / onError not back-pressuring the final onNext, hence skipping that wrapper is for people who know what they are doing, and they can use Observable.unsafeSubscribeFn.
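The deferral described above can be modelled with the standard library alone. The following is a simplified sketch, not Monix's actual SafeSubscriber: Ack, SimpleObserver, DeferringObserver and DeferredErrorDemo are hypothetical stand-ins invented for this illustration, and the wrapper forwards the error whenever the last acknowledgement completes, regardless of its value.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical, simplified stand-ins for Monix's Ack and Observer types.
sealed trait Ack
case object Continue extends Ack
case object Stop extends Ack

trait SimpleObserver[A] {
  def onNext(elem: A): Future[Ack]
  def onError(ex: Throwable): Unit
}

// Simplified model of a "safe" wrapper: onError is held back until the
// acknowledgement of the last onNext has completed.
final class DeferringObserver[A](underlying: SimpleObserver[A])(implicit ec: ExecutionContext)
    extends SimpleObserver[A] {
  @volatile private var lastAck: Future[Ack] = Future.successful(Continue)

  def onNext(elem: A): Future[Ack] = {
    lastAck = underlying.onNext(elem)
    lastAck
  }

  def onError(ex: Throwable): Unit =
    lastAck.onComplete(_ => underlying.onError(ex))
}

object DeferredErrorDemo {
  // Returns the events in the order in which the slow observer saw them.
  def run(): List[String] = {
    import ExecutionContext.Implicits.global
    val events = ListBuffer.empty[String]
    def record(event: String): Unit = events.synchronized { events += event }
    val done = Promise[Unit]()

    val slow = new SimpleObserver[Int] {
      def onNext(elem: Int): Future[Ack] = Future {
        Thread.sleep(300) // a slow consumer that does not block the caller
        record("onNext finished")
        Continue
      }
      def onError(ex: Throwable): Unit = {
        record("onError received")
        done.success(())
      }
    }

    val safe = new DeferringObserver(slow)
    safe.onNext(1)
    safe.onError(new RuntimeException("timeout")) // signalled right away
    Await.result(done.future, 5.seconds)
    events.synchronized(events.toList)
  }
}
```

Even though onError is signalled immediately after onNext, the wrapped observer only sees it once its slow onNext has finished, so the two callbacks never run concurrently.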
onError can be called without back-pressuring onNext; however, SafeSubscriber wraps user-facing subscribers because otherwise it would lead to concurrency issues, which means that your subscriber will receive that message after 20 seconds (when the last onNext is complete)
- the timeoutOnSlowDownstream operator doesn't insert an asynchronous boundary: I'm not sure if this was a good decision, but it's consistent with Monix's design, because async boundaries are expensive and should be explicit
I would love to get your thoughts on it.