Skip to content

Instantly share code, notes, and snippets.

@jeryini
Created April 3, 2015 07:40
Show Gist options
  • Select an option

  • Save jeryini/2ab888a0ae5a441e47d8 to your computer and use it in GitHub Desktop.

Select an option

Save jeryini/2ab888a0ae5a441e47d8 to your computer and use it in GitHub Desktop.
Using ThreadPoolExecutorDispatcher.
package com.jernejerin;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.ThreadPoolExecutorDispatcher;
import reactor.rx.broadcast.Broadcaster;
public class ReactorHelloWorld {
public static void main(String[] args) throws InterruptedException {
// If using Reactor’s static Environment instance, it must be
// initialized before calling Environment.get(). You could also do
// this in a static {} block.
Environment.initialize();
// A Broadcaster is a special kind of Stream that allows
// publishing of values. A Broadcaster is a subclass of Stream which
// exposes methods for publishing values into the pipeline. It is
// possible to publish discreet values typed to the generic type of
// the Stream as well as error conditions and the Reactive Streams
// "complete" signal via the onComplete() method.
Broadcaster<String> sink = Broadcaster.create(Environment.get());
// Default is RingBufferDispatcher, which extends a SingleThreadDispatcher.
Dispatcher dispatcher1 = Environment.cachedDispatcher();
// in the environment we can access the same dispatcher as we have assigned through the environment to the sink
Environment environment = Environment.get();
// Dispatch downstream tasks onto a load-balanced Dispatcher.
// Re-route incoming values into a dynamically created Stream for each unique key evaluated by the {param keyMapper}.
sink.groupBy(d -> d.hashCode() % 5)
.consume(grouped -> grouped.dispatchOn(new ThreadPoolExecutorDispatcher(5, 5, "mythread"))
// Transform input to upper-case and implicitly broadcast downstream.
.map(String::toUpperCase)
// Consume the transformed input and print to STDOUT.
.consume(s -> System.out.printf("t=%s,s=%s%n", Thread.currentThread(), s)));
// Publish a value into the Stream
sink.onNext("On thread 1");
sink.onNext("On thread 2");
sink.onNext("On thread 3");
sink.onNext("On thread 4");
sink.onNext("On thread 5");
// Block the main thread until work on other threads is complete,
// otherwise we won’t see any output.
Thread.sleep(1000);
}
}
@jbrisbin
Copy link

jbrisbin commented Apr 7, 2015

I don't think what you're doing in the .dispatchOn call is what you intend. You need to use a Dispatcher whose lifecycle is managed. You can create them manually, but you should only do that if you really, really know you need to do that. Otherwise you should only get them from the Environment.

Here's a variation that works like I would expect:

Broadcaster<Buffer> sink = Broadcaster.create(Environment.get());

int procs = Runtime.getRuntime().availableProcessors();
sink.groupBy(s -> s.hashCode() % procs)
    .consume(stream -> {
      stream.dispatchOn(Environment.cachedDispatcher())
            .consume(b -> LOG.info("{} from thread {}", b.readInt(), Thread.currentThread()));
    });

for (int i = 0; i < 10; i++) {
  sink.onNext(new Buffer().append(i).flip());
}
sink.onComplete();

Thread.sleep(500);

@jeryini
Copy link
Author

jeryini commented Apr 7, 2015

Thank you for example! This works as expected, but what if I want to explicitly manage number of threads (hence why I was using ThreadPoolExecutorDispatcher) used for each operation in pipeline? For example consider I have a I/O intensive operation, that is why I want to have lots of threads, so that I do not block the only 4 Runtime processors. Basically I want to have Thread Pool with e.g. 10 threads, where I do some operation (e.g. String::toUpperCase), then for the next operation in pipeline, I want to have another Thread Pool with only 4 threads. Is this doable and how exactly would I write this in Reactor streams?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment