Skip to content

Instantly share code, notes, and snippets.

@mtmorgan
Last active December 10, 2025 19:40
Show Gist options
  • Select an option

  • Save mtmorgan/8bd7746eb48a3164c4c7ee6fb84f6f4c to your computer and use it in GitHub Desktop.

Select an option

Save mtmorgan/8bd7746eb48a3164c4c7ee6fb84f6f4c to your computer and use it in GitHub Desktop.
parallel progress with the future package
## install.packages("BiocManager")
## BiocManager::install("SharedObject")
library(future)
library(SharedObject)
#' @description Record progress of several `future()` calls, each with
#' a number of iterations that is only known when the future
#' starts.
#'
##
## Create a light-weight S3 class for iteration progress.
##
#' @details `future_iteration_progress()` creates the object that will
#' be used to track progress. Every task starts with an unknown
#' (`NA`) total number of iterations and 0 completed iterations, the
#' same state produced by `reset_progress()`. Hence `progress()` does
#' not report completion before tasks have called `n_iterations()`.
#'
#' @param n_tasks The number of tasks (`future()` calls) to be
#' tracked; a single non-negative whole number.
#'
#' @return An S3 object of class `future_iteration_progress`: a list
#' with integer vectors `n_iterations` and `ith_iteration`, each of
#' length `n_tasks`. Both are created with `share(copyOnWrite = FALSE)`
#' so that in-place updates made by `n_iterations()` / `iterate()`
#' inside futures are seen by `progress()` in the calling process.
#'
#' @export
future_iteration_progress <- function(n_tasks) {
    stopifnot(
        is.numeric(n_tasks), length(n_tasks) == 1L, !is.na(n_tasks),
        n_tasks >= 0, n_tasks == trunc(n_tasks)
    )
    n_tasks <- as.integer(n_tasks)
    structure(list(
        ## NA means 'total not yet known'. Starting at 0 would make a
        ## fresh object look finished (0 of 0) to progress_once().
        n_iterations = share(rep(NA_integer_, n_tasks), copyOnWrite = FALSE),
        ith_iteration = share(integer(n_tasks), copyOnWrite = FALSE)
    ), class = "future_iteration_progress")
}
#' @details Call `reset_progress()` at the start of the futures
#' calculation for which progress is being tracked, e.g., to re-use
#' an existing progress object. This resets the total number of
#' iterations of each task to `NA` (not yet known), and the number
#' of iterations performed to 0.
#'
#' @param p An object created by `future_iteration_progress()`.
#'
#' @return `p`, invisibly. The vectors in `p` are created with
#' `copyOnWrite = FALSE`, so the reset is also seen through the
#' caller's object and the return value need not be captured.
#'
#' @export
reset_progress <- function(p) {
    stopifnot(inherits(p, "future_iteration_progress"))
    ## Whole-vector sub-assignment; writes into the same shared
    ## vectors that n_iterations() and iterate() update element-wise.
    p$n_iterations[] <- NA_integer_
    p$ith_iteration[] <- 0L
    invisible(p)
}
#' @details `format.future_iteration_progress()` is a method on the
#' `format()` S3 generic. It creates a text string describing
#' progress. The string consists of numbered tasks `1:n_tasks`,
#' the number of completed iterations in the task, and the total
#' number of iterations expected (shown as 0 while a task's total is
#' still `NA`). For example, `2: 3/11` indicates that task 2 has
#' completed 3 of 11 iterations. The line concludes with a summary
#' in square brackets of the number of tasks for which evaluation
#' has started, the total number of tasks, the number of iterations
#' performed, and the total number of iterations expected so far.
#' E.g., `[3/7: 13/42]` indicates that 3 of 7 tasks have started
#' their iteration, and that 13 of the 42 iterations to be performed
#' by those three tasks have been completed. Note that the total
#' number of iterations will increase as more tasks are initiated.
#'
#' @param x An object created by `future_iteration_progress()`.
#'
#' @param ... Ignored.
#'
#' @return A single character string.
#'
#' @export
format.future_iteration_progress <- function(x, ...) {
    ## Snapshot each shared vector once. Workers may update them while
    ## we format, and reading `ith_iteration` separately for the
    ## per-task fields and for the summary could make the two disagree.
    n <- unshare(x$n_iterations)
    done <- unshare(x$ith_iteration)

    n_started <- sum(!is.na(n))
    n[is.na(n)] <- 0L

    per_task <- sprintf("%s: %d/%d", seq_along(n), done, n)
    totals <- sprintf(
        "[%d/%d: %d/%d]",
        n_started, length(n), sum(done), sum(n)
    )
    paste0(c(per_task, totals), collapse = " ")
}
#' @details `print.future_iteration_progress()` writes the string
#' produced by `format()` to the console, followed by a newline.
#'
#' @param x An object created by `future_iteration_progress()`.
#'
#' @param ... Ignored; accepted for consistency with the `print()`
#' generic.
#'
#' @return `x`, invisibly, as is conventional for `print()` methods.
#'
#' @export
print.future_iteration_progress <- function(x, ...) {
    cat(format(x), "\n")
    invisible(x)
}
#' @details `n_iterations()` sets the number of iterations `n`
#' expected for the `i`th task. This can be determined inside the
#' `future` in which iterations are occurring. A task with nothing
#' to do should call `n_iterations(p, i, 0)`; otherwise its total
#' stays `NA` and `progress()` keeps waiting for it.
#'
#' @param p An object created by `future_iteration_progress()`.
#'
#' @param i The `i`th task being performed; a single whole number
#' between 1 and the number of tasks.
#'
#' @param n The number of iterations to be performed in the `i`th
#' task; a single non-negative number (coerced to integer).
#'
#' @return `p`, with element `i` of `p$n_iterations` set to `n`.
#'
#' @export
n_iterations <- function(p, i, n) {
    stopifnot(
        inherits(p, "future_iteration_progress"),
        is.numeric(i), length(i) == 1L, !is.na(i), i == trunc(i),
        i > 0, i <= length(p$n_iterations),
        ## n == 0 is allowed so that an empty task can be recorded as
        ## finished; progress_once() waits while any total is NA.
        is.numeric(n), length(n) == 1L, !is.na(n), n >= 0
    )
    p$n_iterations[i] <- as.integer(n)
    p
}
#' @details `iterate()` is called each time an iteration is
#' completed. The argument `p` is the object created by
#' `future_iteration_progress()` and `i` the index of the task for
#' which the iteration has been completed. `n_iterations()` must
#' have been called for task `i` first.
#'
#' @param p An object created by `future_iteration_progress()`.
#'
#' @param i The task whose iteration has completed; a single whole
#' number between 1 and the number of tasks.
#'
#' @return `p`, with element `i` of `p$ith_iteration` incremented by 1.
#'
#' @export
iterate <- function(p, i) {
    stopifnot(
        inherits(p, "future_iteration_progress"),
        is.numeric(i), length(i) == 1L, !is.na(i), i == trunc(i),
        i > 0, i <= length(p$n_iterations),
        ## Checked separately: with an NA total the comparison below
        ## evaluates to NA and would report a misleading message.
        `n_iterations() must be called before iterate()` =
            !is.na(p$n_iterations[i]),
        `iterating more than n_iterations` =
            p$ith_iteration[i] < p$n_iterations[i]
    )
    p$ith_iteration[i] <- p$ith_iteration[i] + 1L
    p
}
## Internal helper for progress(): emit one status line via message()
## and report whether work remains. Work remains while any task's
## total is still NA, or while the completed iterations do not yet add
## up to the expected total. The line starts with "\r"; a trailing
## newline is added only when no work remains.
progress_once <- function(p) {
    totals <- p$n_iterations
    unknown_total <- anyNA(totals)
    still_running <- unknown_total ||
        sum(totals) != sum(p$ith_iteration)
    message("\r", format(p), appendLF = !still_running)
    still_running
}
#' @details Call `progress()` after all futures have been
#' launched. `progress()` polls `p` every `interval` seconds,
#' emitting a status line (see `format()`) each time. It blocks until
#' all tasks have reported their number of iterations and all
#' iterations are completed. Errors that prematurely stop a task
#' leave its iterations incomplete; set `timeout` to avoid blocking
#' indefinitely in that case.
#'
#' @param p An object created by `future_iteration_progress()`.
#'
#' @param interval Number of seconds to sleep between status updates.
#'
#' @param timeout Maximum number of seconds to wait. When exceeded, a
#' warning is signalled and `progress()` returns without waiting for
#' the remaining iterations. The default, `Inf`, waits indefinitely.
#'
#' @return Invisibly, `TRUE` if all iterations completed, or `FALSE`
#' if `timeout` was reached first.
#'
#' @export
progress <- function(p, interval = 1, timeout = Inf) {
    stopifnot(
        inherits(p, "future_iteration_progress"),
        is.numeric(interval), length(interval) == 1L, !is.na(interval),
        interval >= 0,
        is.numeric(timeout), length(timeout) == 1L, !is.na(timeout),
        timeout >= 0
    )
    start <- Sys.time()
    while (progress_once(p)) {
        elapsed <- as.numeric(difftime(Sys.time(), start, units = "secs"))
        if (elapsed >= timeout) {
            ## progress_once() left the status line without a newline;
            ## terminate it so the warning starts on a fresh line.
            message()
            warning(
                "progress() timed out after ", timeout, " seconds; ",
                "some tasks have not completed",
                call. = FALSE
            )
            return(invisible(FALSE))
        }
        Sys.sleep(interval)
    }
    invisible(TRUE)
}
##
## Basic illustration of functionality
##
#' @examples
#' p <- future_iteration_progress(5)
#' p
#' n_iterations(p, 3, 5) # task 3 has 5 total iterations
#' iterate(p, 3) # iterate task 3
#' p                             # ... 3: 1/5 ...
#' reset_progress(p)
##
## A more extensive example.
##
#' @examples
#' ## Set up a dispatcher to allow fewer workers than queries without
#' ## blocking.
#'
#' plan(multisession, workers = 2)
#'
#' ## Set the number of queries and workers.
#'
#' n_queries <- 7
#' n_workers <- 3
#'
#' ## Create the iterator.
#'
#' p <- future_iteration_progress(n_queries)
#'
#' ## Perform the calculation asynchronously.
#'
#' results <- future({
#' ## Setup the dispatcher, including the plan for parallel evaluation.
#' old_options <- options(
#' ## Allow up to n_workers without a warning.
#' parallelly.maxWorkers.localhost = c(n_workers, n_workers)
#' )
#' plan(multisession, workers = n_workers)
#' reset_progress(p)
#'
#' ## One future per query; allocate one result slot per query.
#' result <- vector("list", n_queries)
#' for (i in seq_len(n_queries)) {
#' result[[i]] <- future({
#' n_pages <- as.integer(runif(1, 5, 15))
#' n_iterations(p, i, n_pages)
#' for (j in seq_len(n_pages)) {
#' Sys.sleep(runif(1))
#' iterate(p, i)
#' }
#' j
#' }, seed = TRUE)
#' }
#'
#' ## Clean up.
#' options(old_options)
#' v <- value(result) # blocking
#' plan(sequential) # close connections
#' v
#' }, seed = TRUE)
#'
#' ## Track progress -- blocking.
#'
#' progress(p)
#'
#' ## Final value.
#'
#' unlist(value(results))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment