@jeryini
Created April 3, 2015 07:40
Using ThreadPoolExecutorDispatcher.
package com.jernejerin;

import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.ThreadPoolExecutorDispatcher;
import reactor.rx.broadcast.Broadcaster;

public class ReactorHelloWorld {
    public static void main(String[] args) throws InterruptedException {
        // If using Reactor’s static Environment instance, it must be
        // initialized before calling Environment.get(). You could also do
        // this in a static {} block.
        Environment.initialize();

        // A Broadcaster is a subclass of Stream that exposes methods for
        // publishing values into the pipeline. It is possible to publish
        // discrete values typed to the generic type of the Stream, as well
        // as error conditions and the Reactive Streams "complete" signal
        // via the onComplete() method.
        Broadcaster<String> sink = Broadcaster.create(Environment.get());

        // The default is a RingBufferDispatcher, which extends SingleThreadDispatcher.
        Dispatcher dispatcher1 = Environment.cachedDispatcher();

        // The Environment gives access to the same dispatcher that was
        // assigned to the sink through it.
        Environment environment = Environment.get();

        // Re-route incoming values into a dynamically created Stream for each
        // unique key returned by the key mapper, then dispatch each group's
        // downstream work onto its own thread-pool Dispatcher.
        sink.groupBy(d -> d.hashCode() % 5)
            .consume(grouped -> grouped.dispatchOn(new ThreadPoolExecutorDispatcher(5, 5, "mythread"))
                // Transform input to upper-case and implicitly broadcast downstream.
                .map(String::toUpperCase)
                // Consume the transformed input and print to STDOUT.
                .consume(s -> System.out.printf("t=%s,s=%s%n", Thread.currentThread(), s)));

        // Publish values into the Stream.
        sink.onNext("On thread 1");
        sink.onNext("On thread 2");
        sink.onNext("On thread 3");
        sink.onNext("On thread 4");
        sink.onNext("On thread 5");

        // Block the main thread until work on other threads is complete,
        // otherwise we won’t see any output.
        Thread.sleep(1000);
    }
}
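
A note on the grouping key used above: Java's % operator keeps the sign of the dividend, so d.hashCode() % 5 can be negative, and the stream may split into up to nine groups (keys -4 through 4) rather than five. The following is a minimal sketch, assuming exactly five buckets are intended; the class GroupKeys and the method bucket are illustrative names that are not part of the gist or of Reactor.

```java
final class GroupKeys {

    // Maps a value to a bucket in the range [0, buckets), even when
    // hashCode() is negative. Math.floorMod is a standard JDK method (Java 8+).
    static int bucket(String value, int buckets) {
        return Math.floorMod(value.hashCode(), buckets);
    }

    public static void main(String[] args) {
        String[] samples = {"On thread 1", "On thread 2", "On thread 3"};
        for (String s : samples) {
            System.out.println(s + " -> bucket " + bucket(s, 5));
        }
    }
}
```

With a helper like this, the key mapper could be written as d -> GroupKeys.bucket(d, 5).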

jeryini commented Apr 7, 2015

Thank you for the example! This works as expected, but what if I want to explicitly manage the number of threads used for each operation in the pipeline (which is why I was using ThreadPoolExecutorDispatcher)? For example, suppose I have an I/O-intensive operation and want many threads for it, so that I do not block the only 4 processors available to the runtime. Basically, I want a thread pool with, e.g., 10 threads for one operation (e.g. String::toUpperCase), and then another thread pool with only 4 threads for the next operation in the pipeline. Is this doable, and how exactly would I write it with Reactor streams?
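
For comparison, the thread layout described in this comment can be sketched with plain java.util.concurrent instead of Reactor. This is not the Reactor API and does not answer how to do it with Reactor streams; it only illustrates the intended shape, one larger pool for the first stage and a smaller pool for the second. The class TwoPoolPipeline, the helper pool, and the thread-name prefixes are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

final class TwoPoolPipeline {

    // Creates a fixed-size pool whose threads are named "<name>-<n>".
    static ExecutorService pool(int size, String name) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(
            size, r -> new Thread(r, name + "-" + counter.incrementAndGet()));
    }

    // Runs every input through two stages, each stage on its own pool.
    static List<String> run(List<String> inputs, int stage1Threads, int stage2Threads) {
        ExecutorService stage1 = pool(stage1Threads, "stage1");
        ExecutorService stage2 = pool(stage2Threads, "stage2");
        try {
            List<CompletableFuture<String>> futures = inputs.stream()
                .map(s -> CompletableFuture
                    // Stage 1: stand-in for the I/O-intensive step, on the larger pool.
                    .supplyAsync(() -> s.toUpperCase(), stage1)
                    // Stage 2: the next operation, on the smaller pool.
                    .thenApplyAsync(u -> u + " on " + Thread.currentThread().getName(), stage2))
                .collect(Collectors.toList());
            // Wait for all results, preserving input order.
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            stage1.shutdown();
            stage2.shutdown();
        }
    }

    public static void main(String[] args) {
        run(Arrays.asList("a", "b", "c"), 10, 4).forEach(System.out::println);
    }
}
```

Each printed line shows the upper-cased value followed by the name of the stage2 thread that handled the second step, which makes it easy to confirm that the two stages run on different pools.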
