Last active
October 25, 2016 10:41
-
-
Save Piasy/613ded2cee703b853464acd1645c7687 to your computer and use it in GitHub Desktop.
Demonstrate ValueRequestOperator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.Random; | |
| import java.util.concurrent.TimeUnit; | |
| import java.util.concurrent.atomic.AtomicInteger; | |
| import java.util.concurrent.atomic.AtomicReference; | |
| import org.junit.Test; | |
| import rx.Emitter; | |
| import rx.Observable; | |
| import rx.Producer; | |
| import rx.Subscriber; | |
| import rx.Subscription; | |
| import rx.schedulers.Schedulers; | |
| public class ProducerTest { | |
| private static Observable<Long> source() { | |
| return Observable.fromEmitter(emitter -> { | |
| new Thread(() -> { | |
| while (true) { | |
| long value = System.currentTimeMillis(); | |
| emitter.onNext(value); | |
| sleep(10); | |
| } | |
| }, "source thread").start(); | |
| }, Emitter.BackpressureMode.DROP); | |
| } | |
| private static void sleep(long millis) { | |
| try { | |
| Thread.sleep(millis); | |
| } catch (InterruptedException e) { | |
| // ignore | |
| } | |
| } | |
| private static void log(String message) { | |
| System.out.println(message | |
| + " @ " | |
| + Thread.currentThread().getName() | |
| + ", " | |
| + System.currentTimeMillis()); | |
| } | |
| @Test | |
| public void test() { | |
| final Random random = new Random(System.currentTimeMillis()); | |
| Subscription subscription = source() | |
| .observeOn(Schedulers.computation(), 1) | |
| .map(value -> { | |
| sleep(50 + random.nextInt(50)); // simulate heavy job | |
| if (value % 2 == 0) { | |
| return String.valueOf(value); | |
| } else { | |
| return ""; | |
| } | |
| }) | |
| .filter(str -> str.length() > 0) | |
| .observeOn(Schedulers.io(), 1) | |
| .lift(new ValueRequestOperator<>()) | |
| .subscribe(new Subscriber<String>() { | |
| @Override | |
| public void onStart() { | |
| request(1); | |
| } | |
| @Override | |
| public void onCompleted() { | |
| } | |
| @Override | |
| public void onError(Throwable e) { | |
| } | |
| @Override | |
| public void onNext(String s) { | |
| log("final subscriber got " + s); | |
| Observable.timer(1, TimeUnit.SECONDS) | |
| .subscribe(l -> { | |
| log("request another"); | |
| request(1); | |
| }); | |
| } | |
| }); | |
| sleep(10_000); | |
| subscription.unsubscribe(); | |
| } | |
| static class ValueRequestOperator<T> implements Observable.Operator<T, T> { | |
| @Override | |
| public Subscriber<? super T> call(Subscriber<? super T> child) { | |
| ValueRequestProducer<T> producer = new ValueRequestProducer<>(child); | |
| Subscriber<T> subscriber = new Subscriber<T>() { | |
| @Override | |
| public void onCompleted() { | |
| child.onCompleted(); | |
| } | |
| @Override | |
| public void onError(Throwable e) { | |
| child.onError(e); | |
| } | |
| @Override | |
| public void onNext(T s) { | |
| producer.setValue(s); | |
| } | |
| }; | |
| child.setProducer(producer); | |
| return subscriber; | |
| } | |
| } | |
| static class ValueRequestProducer<T> implements Producer { | |
| private static final int NO_REQ_NO_VALUE = 1; | |
| private static final int NO_REQ_HAS_VALUE = 2; | |
| private static final int HAS_REQ_NO_VALUE = 3; | |
| private final AtomicInteger mState = new AtomicInteger(NO_REQ_NO_VALUE); | |
| private final AtomicReference<T> mValue = new AtomicReference<>(); | |
| private final Subscriber<? super T> mChild; | |
| ValueRequestProducer(Subscriber<? super T> child) { | |
| mChild = child; | |
| } | |
| public void setValue(T value) { | |
| while (true) { | |
| int state = mState.get(); | |
| if (state == NO_REQ_NO_VALUE) { | |
| mValue.set(value); | |
| if (!mState.compareAndSet(NO_REQ_NO_VALUE, NO_REQ_HAS_VALUE)) { | |
| continue; | |
| } | |
| } else if (state == HAS_REQ_NO_VALUE) { | |
| if (mState.compareAndSet(HAS_REQ_NO_VALUE, NO_REQ_NO_VALUE)) { | |
| if (!mChild.isUnsubscribed()) { | |
| mChild.onNext(value); | |
| mValue.set(null); | |
| } | |
| } | |
| } else if (state == NO_REQ_HAS_VALUE) { | |
| mValue.set(value); | |
| } | |
| return; | |
| } | |
| } | |
| @Override | |
| public void request(long n) { | |
| if (n == 0) { | |
| return; | |
| } | |
| if (n < 0) { | |
| throw new IllegalStateException("Request can't be negative! " + n); | |
| } | |
| while (true) { | |
| int state = mState.get(); | |
| if (state == NO_REQ_NO_VALUE) { | |
| if (!mState.compareAndSet(NO_REQ_NO_VALUE, HAS_REQ_NO_VALUE)) { | |
| continue; | |
| } | |
| } else if (state == NO_REQ_HAS_VALUE) { | |
| if (mState.compareAndSet(NO_REQ_HAS_VALUE, NO_REQ_NO_VALUE)) { | |
| if (!mChild.isUnsubscribed()) { | |
| T value = mValue.getAndSet(null); | |
| if (value != null && !mChild.isUnsubscribed()) { | |
| mChild.onNext(value); | |
| } | |
| } | |
| } | |
| } | |
| return; | |
| } | |
| } | |
| } | |
| } |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The problem:
We have a data source, which will keep emitting values in a separate thread, then we need to process the emitted value, which will take some time, during the process, we may or may not send a value to downstream. At the subscriber, we need to show a dialog to the user, when the dialog is showing, no event should be sent to the subscriber.
The code above simulate this process, we use the
source()function to generate values, and use amap()operator to simulate to process, then we usefilter()to filter out illegal values, at last welift()theValueRequestOperatoron it. Besides, we may need to switch executing thread of different steps, doing the process incomputationthread, and responding to the value inmainthread (here I useioin order to run the test in JVM). At last, we use a "1 second" later request to simulate user's action on the dialog.The
ValueRequestOperatorworks like theSingleDelayedProducerin http://akarnokd.blogspot.com/2015/05/operator-concurrency-primitives_86.html, except its state machine.So I have two questions:
ValueRequestOperatorcorrect?