Last active
December 10, 2025 19:40
-
-
Save mtmorgan/8bd7746eb48a3164c4c7ee6fb84f6f4c to your computer and use it in GitHub Desktop.
parallel progress with the future package
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ## install.packages("BiocManager") | |
| ## BiocManager::install("SharedObject") | |
| library(future) | |
| library(SharedObject) | |
| #' @description Record progress of several `future()` calls, each with | |
| #' a number of iterations that is only known when the future | |
| #' starts. | |
| #' | |
| ## | |
| ## Create a light-weight S3 class for iteration progress. | |
| ## | |
#' @details `future_iteration_progress()` creates the object that will
#'   be used to track progress. The argument `n_tasks` is the number
#'   of tasks (`future()` calls) to be tracked. Every task starts with
#'   an unknown (`NA`) number of iterations and 0 completed
#'   iterations, i.e., the same state that `reset_progress()` produces.
#'
#' @param n_tasks The number of tasks (`future()` calls) to be tracked;
#'   a single, non-negative number.
#'
#' @return An S3 object of class `future_iteration_progress`: a list
#'   holding the shared integer vectors `n_iterations` and
#'   `ith_iteration`, each of length `n_tasks`.
#'
#' @export
future_iteration_progress <- function(n_tasks) {
  stopifnot(
    is.numeric(n_tasks), length(n_tasks) == 1L,
    !is.na(n_tasks), n_tasks >= 0
  )
  ## Totals start as NA so that progress() keeps waiting until every task
  ## has reported its iteration count. Starting them at 0 made a freshly
  ## created object look already finished (no NA, 0 == 0 iterations).
  ## copyOnWrite = FALSE is what lets the element assignments made by
  ## n_iterations() / iterate() write into the shared memory (see the
  ## SharedObject documentation).
  structure(
    list(
      n_iterations = share(rep(NA_integer_, n_tasks), copyOnWrite = FALSE),
      ith_iteration = share(integer(n_tasks), copyOnWrite = FALSE)
    ),
    class = "future_iteration_progress"
  )
}
#' @details Call `reset_progress()` at the start of the futures
#'   calculation for which progress is being tracked. This resets
#'   the total number of iterations of each task to `NA`, and the
#'   number of iterations performed to 0, so the same progress object
#'   can be re-used for several calculations.
#'
#' @param p An object created by `future_iteration_progress()`.
#'
#' @return `p`, invisibly, consistent with `n_iterations()` and
#'   `iterate()`, which also return the progress object.
#'
#' @export
reset_progress <- function(p) {
  stopifnot(inherits(p, "future_iteration_progress"))
  ## Elements are assigned one at a time, as in the original code; with
  ## `copyOnWrite = FALSE` each `[<-` is meant to write straight into the
  ## shared memory. NOTE(review): confirm a vectorised `[<-` behaves the
  ## same with SharedObject before replacing this loop.
  for (i in seq_along(p$n_iterations)) {
    p$n_iterations[i] <- NA_integer_
    p$ith_iteration[i] <- 0L
  }
  invisible(p)
}
#' @details `format.future_iteration_progress()` is a method on the
#'   `format()` S3 generic. It creates a text string describing
#'   progress. The string consists of numbered tasks `1:n_tasks`,
#'   the number of completed iterations in the task, and the total
#'   number of iterations expected. For example, `2: 3/11` indicates
#'   that task 2 has completed 3 of 11 iterations. The line
#'   concludes with a summary in square brackets of the number of
#'   tasks for which evaluation has started, the number of
#'   iterations performed, and the total number of iterations
#'   expected so far. E.g., `[3/7: 13/42]` indicates that 3 of 7
#'   processes have started their iteration, and that 13 of the 42
#'   iterations to be performed by the three processes have been
#'   completed. Note that the total number of iterations will
#'   increase as more tasks are initiated.
#'
#' @param x An object created by `future_iteration_progress()`.
#'
#' @param ... Ignored.
#'
#' @return A single character string.
#'
#' @export
format.future_iteration_progress <- function(x, ...) {
  ## Take one private snapshot of each shared vector. Workers may update
  ## them while this function runs; reading each vector once keeps the
  ## per-task fields and the bracketed summary consistent with each other.
  ## unshare() returns an ordinary copy, so the NA -> 0 rewrite below does
  ## not touch the shared memory.
  n <- unshare(x$n_iterations)
  ith <- unshare(x$ith_iteration)
  n_started <- sum(!is.na(n))
  n[is.na(n)] <- 0L
  task_status <- sprintf("%s: %d/%d", seq_along(n), ith, n)
  totals <- sprintf(
    "[%d/%d: %d/%d]",
    n_started, length(n), sum(ith), sum(n)
  )
  paste(c(task_status, totals), collapse = " ")
}
#' @details `print.future_iteration_progress()` writes the
#'   `format()`ted progress line to standard output.
#'
#' @param x An object created by `future_iteration_progress()`.
#'
#' @param ... Ignored; present for consistency with the `print()`
#'   generic.
#'
#' @return `x`, invisibly, as is conventional for `print()` methods.
#'
#' @export
print.future_iteration_progress <- function(x, ...) {
  cat(format(x), "\n")
  invisible(x)
}
#' @details `n_iterations()` sets the number of iterations `n`
#'   expected for the `i`th task. This can be determined inside the
#'   `future` in which iterations are occurring. A task with nothing
#'   to do should report `n = 0`, so that `progress()` can still
#'   complete.
#'
#' @param p An object created by `future_iteration_progress()`.
#'
#' @param i The `i`th task being performed.
#'
#' @param n The number of iterations to be performed in the `i`th
#'   task; non-negative and not `NA`.
#'
#' @return `p`.
#'
#' @export
n_iterations <- function(p, i, n) {
  stopifnot(
    inherits(p, "future_iteration_progress"),
    is.numeric(i), i > 0, i <= length(p$n_iterations),
    ## Zero is accepted: a task that finds nothing to do must still be
    ## able to report it, otherwise its total stays NA and progress()
    ## waits for it forever.
    is.numeric(n), !anyNA(n), n >= 0
  )
  p$n_iterations[i] <- as.integer(n)
  p
}
#' @details `iterate()` is called each time an iteration is
#'   completed. The argument `p` is the object created by
#'   `future_iteration_progress()` and `i` the index of the task for
#'   which the iteration has been completed. `n_iterations()` must
#'   have been called for task `i` first.
#'
#' @param p An object created by `future_iteration_progress()`.
#'
#' @param i The index of the task that completed an iteration.
#'
#' @return `p`.
#'
#' @export
iterate <- function(p, i) {
  stopifnot(
    inherits(p, "future_iteration_progress"),
    is.numeric(i), i > 0, i <= length(p$n_iterations),
    ## Without this check an unset (NA) total makes the next comparison
    ## NA and reports the misleading "iterating more than" error.
    `n_iterations() must be called before iterate()` =
      !anyNA(p$n_iterations[i]),
    `iterating more than n_iterations` =
      p$ith_iteration[i] < p$n_iterations[i]
  )
  p$ith_iteration[i] <- p$ith_iteration[i] + 1L
  p
}
## Emit one progress line and report whether work remains (internal).
##
## The line starts with "\r" so that, on terminals honouring carriage
## returns, successive calls redraw the same line; a newline is appended
## only once the calculation is complete.
##
## Returns TRUE while some task has not yet called n_iterations() (its
## total is NA) or the completed and expected iteration totals differ;
## FALSE otherwise.
progress_once <- function(p) {
  unfinished <- if (anyNA(p$n_iterations)) {
    TRUE
  } else {
    sum(p$n_iterations) != sum(p$ith_iteration)
  }
  message("\r", format(p), appendLF = !unfinished)
  unfinished
}
#' @details Call `progress()` after all futures have been
#'   launched. `progress()` blocks until all tasks have launched and
#'   all iterations completed, redrawing the progress line every
#'   `interval` seconds. Errors that prematurely stop iterations leave
#'   the calculation incomplete; with the default `timeout = Inf`,
#'   `progress()` then blocks indefinitely, so supply a finite
#'   `timeout` when tasks may fail.
#'
#' @param p An object created by `future_iteration_progress()`.
#'
#' @param interval Seconds to sleep between progress updates.
#'
#' @param timeout Maximum number of seconds to wait. When exceeded, a
#'   warning is issued and `progress()` returns `FALSE`.
#'
#' @return `TRUE` (invisibly) when all iterations completed, `FALSE`
#'   (invisibly) when `timeout` was reached first.
#'
#' @export
progress <- function(p, interval = 1, timeout = Inf) {
  stopifnot(
    is.numeric(interval), length(interval) == 1L, !is.na(interval),
    interval >= 0,
    is.numeric(timeout), length(timeout) == 1L, !is.na(timeout),
    timeout >= 0
  )
  started <- Sys.time()
  while (progress_once(p)) {
    waited <- as.numeric(difftime(Sys.time(), started, units = "secs"))
    if (waited >= timeout) {
      ## progress_once() left its status line without a newline; end it
      ## so the warning starts on a fresh line.
      message()
      warning(
        "progress() timed out after ", format(timeout), " seconds; ",
        "some tasks may have failed or not started",
        call. = FALSE
      )
      return(invisible(FALSE))
    }
    Sys.sleep(interval)
  }
  invisible(TRUE)
}
| ## | |
| ## Basic illustration of functionality | |
| ## | |
| #' @examples | |
| #' p <- future_iteration_progress(5) | |
| #' p | |
| #' n_iterations(p, 3, 5) # task 3 has 5 total iterations | |
| #' iterate(p, 3) # iterate task 3 | |
| #' p # ... 3: 1/5 ... | |
| #' reset_progress(p) | |
| ## | |
| ## A more extensive example. | |
| ## | |
| #' @examples | |
| #' ## Set up a dispatcher to allow fewer workers than queries without | |
| #' ## blocking. | |
| #' | |
| #' plan(multisession, workers = 2) | |
| #' | |
| #' ## Set the number of queries and workers. | |
| #' | |
| #' n_queries <- 7 | |
| #' n_workers <- 3 | |
| #' | |
| #' ## Create the iterator. | |
| #' | |
| #' p <- future_iteration_progress(n_queries) | |
| #' | |
| #' ## Perform the calculation asynchronously. | |
| #' | |
| #' results <- future({ | |
| #' ## Set up the dispatcher, including the plan for parallel evaluation. | |
| #' old_options <- options( | |
| #' ## Allow up to n_workers without a warning. | |
| #' parallelly.maxWorkers.localhost = c(n_workers, n_workers) | |
| #' ) | |
| #' plan(multisession, workers = n_workers) | |
| #' reset_progress(p) | |
| #' | |
| #' ## Launch one future per query; each reports its own progress. | |
| #' result <- vector("list", n_queries) | |
| #' for (i in seq_len(n_queries)) { | |
| #' result[[i]] <- future({ | |
| #' n_pages <- as.integer(runif(1, 5, 15)) | |
| #' n_iterations(p, i, n_pages) | |
| #' for (j in seq_len(n_pages)) { | |
| #' Sys.sleep(runif(1)) | |
| #' iterate(p, i) | |
| #' } | |
| #' j | |
| #' }, seed = TRUE) | |
| #' } | |
| #' | |
| #' ## Clean up. | |
| #' options(old_options) | |
| #' v <- value(result) # blocking | |
| #' plan(sequential) # close connections | |
| #' v | |
| #' }, seed = TRUE) | |
| #' | |
| #' ## Track progress -- blocking. | |
| #' | |
| #' progress(p) | |
| #' | |
| #' ## Final value. | |
| #' | |
| #' unlist(value(results)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment