Created
June 18, 2016 21:13
-
-
Save ruudud/b1b11459981d7615f6efd86562b60429 to your computer and use it in GitHub Desktop.
Working with streams, futures, threads and executor service in Java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.github.ruudud.exectest; | |
| import com.google.common.base.Stopwatch; | |
| import com.google.common.collect.ImmutableList; | |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; | |
| import java.util.List; | |
| import java.util.concurrent.*; | |
| import java.util.stream.Collectors; | |
| public class ExecTest { | |
| public static void main(String[] args) throws InterruptedException { | |
| final int threads = 4; | |
| final ExecutorService executorService = new ThreadPoolExecutor( | |
| threads, threads, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), | |
| new ThreadFactoryBuilder().setNameFormat("foo-%d").build()); | |
| final String logTemplate = "%s method, Actual time spent: %d ms, Total time slept in threads: %d ms"; | |
| final List<String> objects = ImmutableList.of("1-Foo", "2-Bar", "3-Baz", "4-Bif", "5-Bof", "6-Baf"); | |
| final Stopwatch stopwatch = Stopwatch.createStarted(); | |
| final Long totalSleepFirst = submitAndGet(executorService, objects); | |
| System.out.println(String.format(logTemplate, "Submit get", stopwatch.elapsed(TimeUnit.MILLISECONDS), totalSleepFirst)); | |
| stopwatch.reset().start(); | |
| final List<Callable<Long>> callables = objects.stream() | |
| .map(DoAThing::new) | |
| .collect(Collectors.toList()); | |
| final Long totalSleepSnd = invokeAllCallables(executorService, callables); | |
| System.out.println(String.format(logTemplate, "Callables invokeAll", stopwatch.elapsed(TimeUnit.MILLISECONDS), totalSleepSnd)); | |
| } | |
| private static Long invokeAllCallables(ExecutorService executorService, List<Callable<Long>> callables) throws InterruptedException { | |
| return executorService.invokeAll(callables).stream() | |
| .map(f -> { | |
| try { | |
| return f.get(8L, TimeUnit.SECONDS); | |
| } catch (Exception e) { | |
| System.out.println("Timed out after 8 seconds."); | |
| return 1L; | |
| } | |
| }).collect(Collectors.summingLong(Long::longValue)); | |
| } | |
| private static Long submitAndGet(ExecutorService executorService, List<String> customers) { | |
| return customers.stream() | |
| .map(c -> executorService.submit(() -> { | |
| final long elapse = (long) (250 + Math.floor(5000 * Math.random())); | |
| System.out.println(c + ": Doing in " + elapse + "ms..."); | |
| Thread.sleep(elapse); | |
| System.out.println(c + ": Done sleeping for " + elapse + "ms..."); | |
| return elapse; | |
| })) | |
| .map(f -> { | |
| try { | |
| return f.get(8L, TimeUnit.SECONDS); | |
| } catch (Exception e) { | |
| System.out.println("Timed out after 8 seconds."); | |
| return 1L; | |
| } | |
| }).collect(Collectors.summingLong(Long::longValue)); | |
| } | |
| private static class DoAThing implements Callable<Long> { | |
| private final String name; | |
| public DoAThing(final String name) { | |
| this.name = name; | |
| } | |
| @Override | |
| public Long call() throws Exception { | |
| final long elapse = (long) (250 + Math.floor(5000 * Math.random())); | |
| System.out.println(this.name + ": Doing in " + elapse + "ms..."); | |
| Thread.sleep(elapse); | |
| System.out.println(this.name + ": Done sleeping for " + elapse + "ms..."); | |
| return elapse; | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I ended up here while searching for why my stream + executor service submit method was running so slow.
The fix was to introduce a stream terminal operation to ensure all operations are
submitted first and then their futures aregetted.More information on the execution order of streams:
https://stackoverflow.com/questions/29915591/java-8-stream-operations-execution-order