Skip to content

Instantly share code, notes, and snippets.

@jeryini
Created April 14, 2015 12:07
Show Gist options
  • Select an option

  • Save jeryini/c6d159d5100e013623b4 to your computer and use it in GitHub Desktop.

Select an option

Save jeryini/c6d159d5100e013623b4 to your computer and use it in GitHub Desktop.
Multiple stages for forking and joining.
package com.jernejerin.reactor.samples;
import reactor.Environment;
import reactor.core.DispatcherSupplier;
import reactor.rx.Streams;
import java.util.Arrays;
import java.util.List;
/**
* Created by Jernej Jerin on 7.4.2015.
*/
public class SimpleForkJoinPool2 {
public static void main(String[] args) throws InterruptedException {
Environment.initialize();
List<String> ids = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");
DispatcherSupplier supplier1 = Environment.newCachedDispatchers(2, "pool1");
DispatcherSupplier supplier2 = Environment.newCachedDispatchers(5, "pool2");
Streams.from(ids)
.dispatchOn(Environment.sharedDispatcher())
.partition(2)
// here we receive multiple streams
.flatMap(stream -> stream
// we need to call dispatch on each stream
.dispatchOn(supplier1.get())
.map(s -> s + " " + Thread.currentThread().toString())
)
.map(t -> {
System.out.println(Thread.currentThread() + ", worker=" + t);
return t;
})
// Also tried to do another dispatch but with no success.
// .dispatchOn(Environment.sharedDispatcher())
.partition(5)
// here we receive multiple streams
.flatMap(stream -> stream
// we need to call dispatch on each stream
.dispatchOn(supplier2.get())
.map(s -> s + " " + Thread.currentThread().toString())
)
// worker threads should be funneled into the same, specific thread
// https://groups.google.com/forum/#!msg/reactor-framework/JO0hGftOaZs/20IhESjPQI0J
.consume(t -> System.out.println(Thread.currentThread() + ", worker=" + t));
Thread.sleep(500);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment