Skip to content

Instantly share code, notes, and snippets.

@nealeu
Created October 7, 2025 16:50
Show Gist options
  • Select an option

  • Save nealeu/4746e7a2a9312ae2749264c4e08a426e to your computer and use it in GitHub Desktop.

Select an option

Save nealeu/4746e7a2a9312ae2749264c4e08a426e to your computer and use it in GitHub Desktop.
How we can take an ordered Flux and do processor intensive operations in parallel while maintaining order
public class FluxUtils {
/**
* Map a flux of items in parallel using the provided executor. This method is suitable for large
* streams as it buffers the flux into chunks to avoid overwhelming the executor.
*/
public static <T, R> Flux<R> orderedAsyncMap(
Flux<T> flux, AsyncTaskExecutor executor, Function<T, R> mapper) {
return flux.map(item -> executor.submitCompletable(() -> mapper.apply(item)))
.buffer(AVAILABLE_PROCESSORS)
.flatMap(futures -> Flux.fromIterable(futures).map(CompletableFuture::join));
}
}
@nealeu
Copy link
Author

nealeu commented Oct 7, 2025

e.g. If using ReactiveMongoOperations to map large documents, instead of allowing Spring Data to do the mapping, get a Flux and then provide document -> mongoConverter.read(YourEntity.class, document) as the mapper

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment