Created
October 7, 2025 16:50
How to take an ordered Flux and run processor-intensive operations in parallel while maintaining order
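import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import org.springframework.core.task.AsyncTaskExecutor;
import reactor.core.publisher.Flux;

public class FluxUtils {

    // Assumption: chunk size is the JVM's processor count; the original snippet did not define this constant.
    private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

    /**
     * Map a flux of items in parallel using the provided executor, emitting results in source order.
     * Suitable for large streams: the flux is buffered into chunks to avoid overwhelming the executor.
     */
    public static <T, R> Flux<R> orderedAsyncMap(
            Flux<T> flux, AsyncTaskExecutor executor, Function<T, R> mapper) {
        return flux.map(item -> executor.submitCompletable(() -> mapper.apply(item)))
                .buffer(AVAILABLE_PROCESSORS)
                // concatMap (unlike flatMap) guarantees that chunks are emitted in source order
                .concatMap(futures -> Flux.fromIterable(futures).map(CompletableFuture::join));
    }
}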
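To illustrate why submitting a chunk and then joining its futures in submission order keeps the output ordered, here is a minimal sketch of the same idea using only JDK types instead of Reactor and Spring. The class `OrderedChunkedMap`, the method `orderedMap`, and the `chunkSize` parameter are hypothetical names invented for this sketch. It is not the gist's implementation, only an approximation of the chunk-then-join pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical helper: a JDK-only analogue of the chunked, ordered parallel map.
class OrderedChunkedMap {

    /** Maps items in parallel, chunkSize at a time, returning results in input order. */
    static <T, R> List<R> orderedMap(
            List<T> items, ExecutorService executor, int chunkSize, Function<T, R> mapper) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        List<R> results = new ArrayList<>(items.size());
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // Submit the whole chunk first so its tasks can run concurrently.
            List<CompletableFuture<R>> futures = new ArrayList<>(end - start);
            for (T item : items.subList(start, end)) {
                futures.add(CompletableFuture.supplyAsync(() -> mapper.apply(item), executor));
            }
            // Join in submission order, so completion order never affects output order.
            for (CompletableFuture<R> future : futures) {
                results.add(future.join());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Later items sleep less, so within a chunk they tend to finish first.
            List<Integer> squares = orderedMap(List.of(5, 4, 3, 2, 1), executor, 2, n -> {
                try {
                    Thread.sleep(n * 10L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return n * n;
            });
            System.out.println(squares); // prints [25, 16, 9, 4, 1]
        } finally {
            executor.shutdown();
        }
    }
}
```

At most `chunkSize` tasks are in flight at a time, which is the trade-off the chunking makes: a slow item holds up the rest of its chunk, but the executor's queue stays bounded.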
For example, when using ReactiveMongoOperations to map large documents, instead of letting Spring Data do the mapping, get a Flux of raw documents and then provide `document -> mongoConverter.read(YourEntity.class, document)` as the mapper.