From fde94b50bb007fda48dd01c5837ac395ac6a0f78 Mon Sep 17 00:00:00 2001 From: Jiefei Wang Date: Tue, 31 May 2022 16:37:27 -0400 Subject: [PATCH] Add balancer for evenly distributing the tasks across workers 1. The default balancer is "stepwise" for `bplapply` and "sequential" for `bpiterate` 2. add two options `lapplyBalancer` and 'iterateBalancer' to `bpoptions` 3. use snake case --- DESCRIPTION | 3 +- R/balancer.R | 150 +++++++++++++++ R/bploop.R | 311 +++++++++++++++++++------------ R/bpoptions.R | 2 + R/reducer.R | 126 ++++--------- R/rng.R | 48 +++++ R/utilities.R | 65 +++++-- R/worker.R | 17 +- inst/unitTests/test_DoparParam.R | 14 +- inst/unitTests/test_balancer.R | 173 +++++++++++++++++ inst/unitTests/test_bploop.R | 160 +++++++--------- inst/unitTests/test_rng.R | 55 +++++- man/bpoptions.Rd | 7 +- 13 files changed, 805 insertions(+), 326 deletions(-) create mode 100644 R/balancer.R create mode 100644 inst/unitTests/test_balancer.R diff --git a/DESCRIPTION b/DESCRIPTION index 3ade439f..6396ef66 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -40,7 +40,8 @@ Collate: AllGenerics.R DeveloperInterface.R prototype.R SerialParam-class.R DoparParam-class.R SnowParam-utils.R BatchJobsParam-class.R BatchtoolsParam-class.R progress.R ipcmutex.R utilities.R rng.R bpinit.R reducer.R worker.R - bpoptions.R cpp11.R + bpoptions.R balancer.R + cpp11.R LinkingTo: BH, cpp11 VignetteBuilder: knitr RoxygenNote: 7.1.2 diff --git a/R/balancer.R b/R/balancer.R new file mode 100644 index 00000000..fa0724e8 --- /dev/null +++ b/R/balancer.R @@ -0,0 +1,150 @@ +## Find the balancer generator function given the name or the function +.find_balancer <- + function(type = c("lapply", "iterate"), balancer = NULL) +{ + type <- match.arg(type) + if (is.null(balancer)) { + if (type == "lapply") + balancer <- "stepwise" + else + balancer <- "sequential" + } + + if (is.character(balancer)) { + if (type == "lapply") { + if (balancer == "sequential") + return(.balancer_sequential_lapply) + if 
(balancer == "random") + return(.balancer_random_lapply) + if (balancer == "stepwise") + return(.balancer_stepwise_lapply) + } else { + if (balancer == "sequential") + return(.balancer_sequential_iterate) + } + } + + if (is.function(balancer)) + return(balancer) + stop("Unrecognized balancer") +} + + + +################## +## bplapply balancer generator: +## Input: +## 1. n: The length of the vector `X` +## 2. BPPARAM: The parallel backend, you should respect the value +## bptasks(BPPARAM) as much as possible +## Output: a list with two functions +## 1. next_task(): The next Task, the return value is a list with +## - task_id: An arbitrary index used by the balancer to identify the task +## - index: The indices of a subset of X that will be evaluated as a task +## (The indices must be increasing!) +## 2. record(worker, task_id, time): record the task execution time in seconds +## argument: +## - worker: which worker is responsible for this task +## - task_id: The id generated by nextTask() +## - time: The execution time in seconds +################## + +## A simple balancer to equally divide the vector X into tasks. +.balancer_sequential_lapply <- + function(n, BPPARAM) +{ + ## How many elements in a task? + ntask <- .ntask(n, bpnworkers(BPPARAM), bptasks(BPPARAM)) + elements_per_task <- ceiling(n/ntask) + task_id <- 0L + start <- 1L + list( + record = function(node, task_id, time) { + # message("Node:", node, ",id:", task_id, ",time:", time) + }, + next_task = function() { + upper <- min(n, start + elements_per_task - 1L) + index <- seq.int(start, upper) + start <<- start + length(index) + task_id <<- task_id + 1L + list( + task_id = task_id, + index = index + ) + } + ) +} + +## Randomly sample the vector INDEX. +.balancer_random_lapply <- + function(n, BPPARAM) +{ + .rng_internal_stream$set() + on.exit(.rng_internal_stream$reset()) + ## How many elements in a task? 
+ ntask <- .ntask(n, bpnworkers(BPPARAM), bptasks(BPPARAM)) + elements_per_task <- ceiling(n/ntask) + random_index <- sample.int(n) + task_id <- 0L + start <- 1L + list( + record = function(node, task_id, time) { + # message("Node:", node, ",id:", task_id, ",time:", time) + }, + next_task = function() { + upper <- min(n, start + elements_per_task - 1L) + index <- sort(random_index[seq.int(start, upper)]) + start <<- start + length(index) + task_id <<- task_id + 1L + list( + task_id = task_id, + index = index + ) + } + ) +} + +## Randomly sample the vector INDEX. +.balancer_stepwise_lapply <- + function(n, BPPARAM) +{ + ntask <- .ntask(n, bpnworkers(BPPARAM), bptasks(BPPARAM)) + task_id <- 0L + list( + record = function(node, task_id, time) { + # message("Node:", node, ",id:", task_id, ",time:", time) + }, + next_task = function() { + task_id <<- task_id + 1L + index <- seq.int(task_id, n, by = ntask) + list( + task_id = task_id, + index = index + ) + } + ) +} + +.balancer_sequential_iterate <- + function(ITER, BPPARAM) +{ + force(ITER) + task_id <- 0L + list( + record = function(node, task_id, time) { + # message("Node:", node, ",id:", task_id, ",time:", time) + }, + next_task = function(){ + task_id <<- task_id + 1L + ## the task value must be a list when it is not empty + value <- ITER() + if (!is.null(value)) + value <- list(value) + list( + task_id = task_id, + index = task_id, + value = value + ) + } + ) +} diff --git a/R/bploop.R b/R/bploop.R index 49b9558b..60df5558 100644 --- a/R/bploop.R +++ b/R/bploop.R @@ -3,7 +3,7 @@ ## collect the results from the workers .collect_result <- - function(manager, reducer, progress, BPPARAM) + function(manager, task_iterator, reducer, tasks_index, progress, BPPARAM) { data_list <- .manager_recv(manager) success <- rep(TRUE, length(data_list)) @@ -12,19 +12,30 @@ ## in `.send` and possibly other elements used by the backend d <- data_list[[i]] - value <- d$value$value - njob <- d$value$tag + node <- d$node + value <- 
d$value + + result <- value$value + task_id <- value$tag + time <- value$time + + ## obtain the indices of `X` for a task + task_index <- tasks_index[[as.character(task_id)]] + rm(list = as.character(task_id), envir = tasks_index) + + ## report time to the balancer + task_iterator$record(node, task_id, time[["elapsed"]]) ## reduce - .reducer_add(reducer, njob, value) - .manager_log(BPPARAM, njob, d) - .manager_result_save(BPPARAM, njob, reducer$value()) + .reducer_add(reducer, task_index, result) + .manager_log(BPPARAM, task_id, d) + .manager_result_save(BPPARAM, task_id, reducer$value()) ## progress - progress$step(length(value)) + progress$step(length(result)) ## whether the result is ok, or to treat the failure as success - success[i] <- !bpstopOnError(BPPARAM) || d$value$success + success[i] <- !bpstopOnError(BPPARAM) || value$success } success } @@ -33,13 +44,14 @@ ## run on the master. Both enable logging, writing logs/results to ## files and 'stop on error'. .clear_cluster <- - function(manager, running, reducer, progress, BPPARAM) + function(manager, running, task_iterator, reducer, tasks_index, progress, BPPARAM) { tryCatch({ setTimeLimit(30, 30, TRUE) on.exit(setTimeLimit(Inf, Inf, FALSE)) while (running) { - success <- .collect_result(manager, reducer, progress, BPPARAM) + success <- .collect_result(manager, task_iterator, reducer, tasks_index, + progress, BPPARAM) running <- running - length(success) } }, error=function(e) { @@ -49,12 +61,12 @@ } .manager_log <- - function(BPPARAM, njob, d) + function(BPPARAM, task_id, d) { if (bplog(BPPARAM)) { con <- NULL if (!is.na(bplogdir(BPPARAM))) { - fname <- paste0(bpjobname(BPPARAM), ".task", njob, ".log") + fname <- paste0(bpjobname(BPPARAM), ".task", task_id, ".log") lfile <- file.path(bplogdir(BPPARAM), fname) con <- file(lfile, open="a") on.exit(close(con)) @@ -66,86 +78,96 @@ } .manager_result_save <- - function(BPPARAM, njob, value) + function(BPPARAM, task_id, value) { if (is.na(bpresultdir(BPPARAM))) 
return(NULL) - fname <- paste0(bpjobname(BPPARAM), ".task", njob, ".Rda") + fname <- paste0(bpjobname(BPPARAM), ".task", task_id, ".Rda") rfile <- file.path(bpresultdir(BPPARAM), fname) save(value, file=rfile) } - -## A dummy iterator for bploop.lapply -.bploop_lapply_iter <- - function(X, redo_index, elements_per_task) +## A task generator for bploop.lapply +## nextTask: returns a list containing `index` and `seed`, +## if no next task, return seed only +## record: record the execution time of a task +.bploop_lapply_task_iterator <- + function(X, balancer, init_seed, redo_index, ntotal) { - redo_n <- length(redo_index) - redo_i <- 1L - x_n <- length(X) - x_i <- 1L - function() { - if (redo_i <= redo_n && x_i <= x_n) { - redo <- redo_index[redo_i] == x_i - if (redo) { - ## Maximize `len` such that - ## - 1. all elements in X[x_i:(x_i + len)] should be redone - ## - 2. the number of elements in the task must be - ## limited by `elements_per_task` - len <- 1L - while (redo_i + len <= redo_n && - redo_index[redo_i + len] == x_i + len && - len < elements_per_task) { - len <- len + 1L - } - redo_i <<- redo_i + len - value <- X[seq.int(x_i, length.out = len)] + seed_generator <- .seed_generator(init_seed) + n <- length(redo_index) + list( + next_task = function() { + if (n > 0) { + task <- balancer$next_task() + n <<- n - length(task$index) + task$index <- redo_index[task$index] + task$seed <- seed_generator(task$index) + if (!is.null(task$index)) + task$value <- X[task$index] + task } else { - len <- redo_index[redo_i] - x_i - value <- .bploop_rng_iter(len) + list(seed = seed_generator(ntotal + 1L)) } - x_i <<- x_i + len - ## Do not return the last seed iterator - ## if no more redo element - if (x_i > x_n && !redo) { - list(NULL) - } else { - value - } - } else { - list(NULL) - } - } + }, + record = function(node, task_id, time) + balancer$record(node, task_id, time), + seed = function() seed_generator(ntotal + 1L) + ) } -## An iterator for bpiterate to handle BPREDO 
-.bploop_iterate_iter <- - function(ITER, reducer) +## An iterator for bpiterate to ignore the finished task in BPREDO +.bploop_iterate_with_redo <- + function(ITER, redo_index) { - errors <- sort(.redo_index_iterate(reducer)) - len <- reducer$total - if(is.null(len)) len <- 0L + redo_index_diff <- diff(c(0, redo_index)) i <- 0L function(){ - if (i < len) { + if (i < length(redo_index)) { i <<- i + 1L - value <- ITER() - if (i%in%errors) - list(value) - else - .bploop_rng_iter(1L) + for (j in seq_len(redo_index_diff[i])) + value <- ITER() + value } else { - list(ITER()) + ITER() } } } - -## This class object can force bploop.iterator to iterate -## the seed stream n times -.bploop_rng_iter <- function(n) { - structure(as.integer(n), class = c(".bploop_rng_iter")) +.bploop_iterate_task_iterator <- function(balancer, redo_ITER, + init_seed, redo_index){ + seed_generator <- .seed_generator(init_seed) + task_original_index <- out_of_range_vector(redo_index) + EOF <- FALSE + ## The maximum observed seed index + next_job_seed_index <- tail(redo_index, 1) + list( + next_task = function() { + if (!EOF) { + task <- balancer$next_task() + if (length(task$value) != 0) { + ## The index can be out of range + if (tail(task$index, 1L) <= length(redo_index)) + task$index <- redo_index[task$index] + else + task$index <- vapply(task$index ,task_original_index, numeric(1)) + task$seed <- seed_generator(task$index) + next_job_seed_index <<- + max(next_job_seed_index, tail(task$index, 1L) + 1L) + } + ## Reach the end of the iterator + if (is.null(task$value)) + EOF <<- TRUE + task + } else { + list(seed = seed_generator(next_job_seed_index)) + } + }, + record = function(node, task_id, time) + balancer$record(node, task_id, time), + seed = function() seed_generator(next_job_seed_index) + ) } ## Accessor for the elements in the BPREDO argument @@ -191,14 +213,21 @@ ## The core bploop implementation ## Arguments -## - ITER: Return a list where each list element will be passed to FUN -## 
1. if nothing to proceed, it should return list(NULL) -## 2. if the task is to iterate the seed stream only, it should return -## an object from .bploop_rng_iter() +## - ITER: A list with two functions `nextTask` and `record` +## nextTask(seedOnly): Return a list with the following elements +## 1. task_id: the id of the current task +## 2. index: The original position of the value(must be increasing) +## 3. value: a list of elements that need to be evaluated by FUN +## 4. seed: The initial seed used by the evaluation +## If nothing to proceed, ITER should return a list +## with the element seed only +## record(task_id, time): report the task evaluation time +## ## - FUN: A function that will be evaluated in the worker ## - ARGS: the arguments to FUN .bploop_impl <- - function(ITER, FUN, ARGS, BPPARAM, BPREDO, BPOPTIONS, reducer, progress.length) + function(task_iterator, FUN, ARGS, BPPARAM, BPREDO, BPOPTIONS, + reducer, progress.length) { manager <- .manager(BPPARAM) on.exit(.manager_cleanup(manager), add = TRUE) @@ -213,15 +242,9 @@ force.GC = bpforceGC(BPPARAM) ) - ## prepare the seed stream for the worker - init_seed <- .redo_seed(BPREDO) - if (is.null(init_seed)) { - seed <- .RNGstream(BPPARAM) - on.exit(.RNGstream(BPPARAM) <- seed, add = TRUE) - init_seed <- seed - } else { - seed <- init_seed - } + ## set the last seed stream back to BPPARAM + if (identical(BPREDO, list())) + on.exit(.RNGstream(BPPARAM) <- task_iterator$seed(), add = TRUE) ## Progress bar progress <- .progress( @@ -242,43 +265,45 @@ names(globalVars) <- globalVarNames ## The data that will be sent to the worker - ARGFUN <- function(X, seed) + ARGFUN <- function(X, seed, index) list( X=X , FUN=FUN , ARGS = ARGS, OPTIONS = OPTIONS, BPRNGSEED = seed, + INDEX = index, GLOBALS = globalVars, PACKAGES = packages ) - static.args <- c("FUN", "ARGS", "OPTIONS", "GLOBALS") + static.args <- c("FUN", "ARGS", "OPTIONS", "GLOBALS", "PACKAGES") + + ## Used as a map to store the indices of the vector `X` or 
ITER + ## for a task + tasks_index <- new.env(parent = emptyenv()) total <- 0L running <- 0L - value <- NULL - ## keep the loop when there exists more ITER value or running tasks - while (!identical(value, list(NULL)) || running) { + task <- list(value = 0L) ## an arbitrary initial + ## keep the loop when there exists more tasks or running tasks + while (!is.null(task$value) || running) { ## send tasks to the workers while (running < .manager_capacity(manager)) { - value <- ITER() - ## If the value is of the class .bploop_rng_iter, we merely iterate - ## the seed stream `value` times and obtain the next value. - if (inherits(value, ".bploop_rng_iter")) { - seed <- .rng_iterate_substream(seed, value) - next - } - if (identical(value, list(NULL))) { + task <- task_iterator$next_task() + if (is.null(task$value)) { if (total == 0L) warning("first invocation of 'ITER()' returned NULL") break } - args <- ARGFUN(value, seed) - task <- .EXEC( - total + 1L, .workerLapply, + ## record the task index + tasks_index[[as.character(task$task_id)]] <- task$index + + args <- ARGFUN(task$value, task$seed, task$index) + exec <- .EXEC( + tag = task$task_id, + fun = .workerLapply, args = args, static.fun = TRUE, static.args = static.args ) - .manager_send(manager, task) - seed <- .rng_iterate_substream(seed, length(value)) + .manager_send(manager, exec) total <- total + 1L running <- running + 1L } @@ -289,13 +314,24 @@ next ## collect results from the workers - success <- .collect_result(manager, reducer, progress, BPPARAM) + success <- .collect_result(manager = manager, + task_iterator = task_iterator, + reducer = reducer, + tasks_index = tasks_index, + progress = progress, + BPPARAM = BPPARAM) running <- running - length(success) ## stop on error; Let running jobs finish and break if (!all(success)) { reducer <- .clear_cluster( - manager, running, reducer, progress, BPPARAM + manager = manager, + running = running, + task_iterator = task_iterator, + reducer = reducer, + tasks_index 
= tasks_index, + progress = progress, + BPPARAM = BPPARAM ) break } @@ -310,12 +346,13 @@ if(!.reducer_ok(reducer) || !.reducer_complete(reducer)) { .redo_env(res) <- new.env(parent = emptyenv()) .redo_reducer(res) <- reducer - .redo_seed(res) <- init_seed + .redo_seed(res) <- .RNGstream(BPPARAM) } res } + ## ## bploop.lapply(): derived from snow::dynamicClusterApply. ## @@ -333,17 +370,31 @@ bploop.lapply <- { ## which need to be redone? redo_index <- .redo_index(X, BPREDO) - - ## How many elements in a task? - ntask <- .ntask(X, bpnworkers(BPPARAM), bptasks(BPPARAM)) - elements_per_task <- ceiling(length(redo_index)/ntask) - ITER <- .bploop_lapply_iter(X, redo_index, elements_per_task) - + if (!identical(BPREDO, list())) { + init_seed <- .redo_seed(BPREDO) + } else { + init_seed <- .RNGstream(BPPARAM) + } ntotal <- length(X) - reducer <- .lapplyReducer(ntotal, reducer = .redo_reducer(BPREDO)) + nredo <- length(redo_index) + + balancer_generator <- .find_balancer("lapply", BPOPTIONS$lapplyBalancer) + ## The balancer only need to give the redo index, so we pass + ## nredo instead of ntotal + balancer <- balancer_generator(nredo, BPPARAM) + + ## The iterator for .bploop_core + task_iterator <- .bploop_lapply_task_iterator( + X = X, + balancer = balancer, + init_seed = init_seed, + redo_index = redo_index, + ntotal = ntotal + ) + reducer <- .lapply_reducer(ntotal, reducer = .redo_reducer(BPREDO)) res <- .bploop_impl( - ITER = ITER, + task_iterator = task_iterator, FUN = FUN, ARGS = ARGS, BPPARAM = BPPARAM, @@ -373,11 +424,35 @@ bploop.iterate <- init, reduce.in.order, ... 
) { - ITER_ <- .bploop_iterate_iter(ITER, reducer = .redo_reducer(BPREDO)) - reducer <- .iterateReducer(REDUCE, init, reduce.in.order, - reducer = .redo_reducer(BPREDO)) + redo_index <- .redo_index(ITER, BPREDO) + if (!identical(BPREDO, list())) { + init_seed <- .redo_seed(BPREDO) + } else { + init_seed <- .RNGstream(BPPARAM) + } + + ## Ignore the task which do not need to be redone + redo_ITER <- .bploop_iterate_with_redo(ITER, redo_index) + + ## Create balancer + balancer_generator <- .find_balancer("iterate", BPOPTIONS$iterateBalancer) + balancer <- balancer_generator(redo_ITER, BPPARAM) + + ## The iterator for .bploop_core + task_iterator <- .bploop_iterate_task_iterator( + balancer = balancer, + redo_ITER = redo_ITER, + init_seed = init_seed, + redo_index = redo_index + ) + reducer <- .iterate_reducer( + REDUCE = REDUCE, + init = init, + reduce.in.order = reduce.in.order, + reducer = .redo_reducer(BPREDO)) + .bploop_impl( - ITER = ITER_, + task_iterator = task_iterator, FUN = FUN, ARGS = ARGS, BPPARAM = BPPARAM, @@ -393,7 +468,7 @@ bploop.iterate_batchtools <- ## get number of workers workers <- bpnworkers(BPPARAM) ## reduce in order - reducer <- .iterateReducer(REDUCE, init, reduce.in.order, + reducer <- .iterate_reducer(REDUCE, init, reduce.in.order, NULL) ## progress bar. diff --git a/R/bpoptions.R b/R/bpoptions.R index 5434d362..547ff3be 100644 --- a/R/bpoptions.R +++ b/R/bpoptions.R @@ -111,6 +111,8 @@ bpoptions <- RNGseed, force.GC, fallback, exports, packages, + lapplyBalancer, + iterateBalancer, ...) { dotsArgs <- list(...) 
diff --git a/R/reducer.R b/R/reducer.R index 5a893148..d2a479e6 100644 --- a/R/reducer.R +++ b/R/reducer.R @@ -4,27 +4,24 @@ result = "ANY", total = "numeric", reduced.num = "numeric", - reduced.index = "numeric", - value.cache = "environment", - redo.index = "numeric" + exists.error = "logical" ) ) .LapplyReducer <- setRefClass( "LapplyReducer", - fields = list( - exists.error = "logical" - ), + fields = list(), contains = "Reducer" ) .IterateReducer <- setRefClass( "IterateReducer", fields = list( - REDUCE = "ANY", + last.reduced.index = "numeric", + value.cache = "environment", errors = "environment", + REDUCE = "ANY", reduce.in.order = "logical", - appending.offset = "numeric", init.missing = "logical", REDUCE.missing = "logical" ), @@ -68,30 +65,25 @@ setMethod(".reducer_complete", signature = "Reducer", setMethod(".reducer_ok", signature = "Reducer", function(reducer) { - length(reducer$errors) == 0L + !reducer$exists.error }) ######################### ## LapplyReducer ######################### -.lapplyReducer <- +.lapply_reducer <- function(ntotal, reducer = NULL) { if (is.null(reducer)) { result <- rep(list(.error_unevaluated()), ntotal) - redo.index <- seq_len(ntotal) } else { result <- reducer$result - redo.index <- which(!bpok(result)) - ntotal <- length(redo.index) + ntotal <- sum(!bpok(result)) } .LapplyReducer( result = result, total = ntotal, - reduced.index = 1L, reduced.num = 0L, - value.cache = new.env(parent = emptyenv()), - redo.index = redo.index, exists.error = FALSE ) } @@ -99,61 +91,34 @@ setMethod(".reducer_ok", signature = "Reducer", setMethod(".reducer_add", signature = "LapplyReducer", function(reducer, idx, values) { - reducer$value.cache[[as.character(idx)]] <- values - - while (.reducer_reduce(reducer, reducer$reduced.index)) {} + reducer$result[idx] <- values + reducer$reduced.num <- reducer$reduced.num + length(idx) if(!all(bpok(values))) reducer$exists.error <- TRUE - reducer }) -setMethod(".reducer_reduce", signature = 
"LapplyReducer", - function(reducer, idx) -{ - ## obtain the cached value - idx <- as.character(idx) - if (!exists(idx, envir = reducer$value.cache)) - return(FALSE) - values <- reducer$value.cache[[idx]] - rm(list = idx, envir = reducer$value.cache) - - ## Find the true index of the reduced value in the result - idx <- reducer$redo.index[reducer$reduced.num + 1L] - reducer$result[idx - 1L + seq_along(values)] <- values - - reducer$reduced.index <- reducer$reduced.index + 1L - reducer$reduced.num <- reducer$reduced.num + length(values) - TRUE -}) - setMethod(".reducer_value", signature = "LapplyReducer", function(reducer) { reducer$result }) -setMethod(".reducer_ok", signature = "LapplyReducer", - function(reducer) -{ - !reducer$exists.error -}) - ######################### -## IterateReducer +## iterate_reducer ######################### -.redo_index_iterate <- +.iterate_error_index <- function(reducer) { if (is.null(reducer)) return(integer()) - finished_idx <- as.integer(names(reducer$value.cache)) + finished_idx <- as.numeric(names(reducer$value.cache)) missing_idx <- setdiff(seq_len(reducer$total), finished_idx) - c(missing_idx, as.integer(names(reducer$errors))) + sort(c(missing_idx, as.numeric(names(reducer$errors)))) } -.iterateReducer <- +.iterate_reducer <- function(REDUCE, init, reduce.in.order=FALSE, reducer = NULL) { if (is.null(reducer)) { @@ -174,20 +139,17 @@ setMethod(".reducer_ok", signature = "LapplyReducer", result = result, total = 0L, reduced.num = 0L, - reduced.index = 1L, + exists.error = FALSE, + last.reduced.index = 0L, value.cache = new.env(parent = emptyenv()), - redo.index = integer(), - REDUCE = REDUCE, errors = new.env(parent = emptyenv()), + REDUCE = REDUCE, reduce.in.order = reduce.in.order, - appending.offset = 0L, init.missing = init.missing, REDUCE.missing = REDUCE.missing ) } else { reducer <- reducer$copy() - reducer$appending.offset <- reducer$total - reducer$redo.index <- .redo_index_iterate(reducer) reducer$value.cache <- 
as.environment( as.list(reducer$value.cache, all.names=TRUE) ) @@ -198,35 +160,27 @@ setMethod(".reducer_ok", signature = "LapplyReducer", } } -setMethod(".map_index", signature = "IterateReducer", - function(reducer, idx) -{ - redo.index <- reducer$redo.index - if (idx <= length(redo.index)) - idx <- redo.index[idx] - else - idx <- idx - length(redo.index) + reducer$appending.offset - idx -}) - setMethod(".reducer_add", signature = "IterateReducer", function(reducer, idx, values) { reduce.in.order <- reducer$reduce.in.order - idx <- as.character(.map_index(reducer, idx)) + idx <- as.character(idx) value <- values[[1]] if (.bpeltok(value)) { - if (exists(idx, envir = reducer$errors)) + if (exists(idx, envir = reducer$errors)){ rm(list = idx, envir = reducer$errors) + reducer$exists.error <- (length(reducer$errors) > 0L) + } } else { - reducer$errors[[idx]] <- idx + reducer$errors[[idx]] <- value + reducer$exists.error <- TRUE } reducer$value.cache[[idx]] <- value reducer$total <- max(reducer$total, as.numeric(idx)) if (reduce.in.order) - while (.reducer_reduce(reducer, reducer$reduced.index)) {} + while (.reducer_reduce(reducer, reducer$last.reduced.index + 1L)) {} else .reducer_reduce(reducer, idx) @@ -241,10 +195,10 @@ setMethod(".reducer_reduce", signature = "IterateReducer", return(FALSE) } - ## stop reducing when reduce.in.order == TRUE - ## and we have a pending error + ## Do not reduce when there is an error and reduce.in.order == TRUE if (!.reducer_ok(reducer) && reducer$reduce.in.order) return(FALSE) + ## The cached value is a list of length 1 value <- reducer$value.cache[[idx]] ## Do not reduce the erroneous result if (!.bpeltok(value)) @@ -255,12 +209,12 @@ setMethod(".reducer_reduce", signature = "IterateReducer", } else { reducer$result <- reducer$REDUCE(reducer$result, value) } - ## DO NOT REMOVE, only set to NULL to keep track - ## of the finished results + ## DO NOT REMOVE the cache, only set it to NULL + ## this is used to keep track of the 
finished results reducer$value.cache[[idx]] <- NULL } reducer$reduced.num <- reducer$reduced.num + 1L - reducer$reduced.index <- reducer$reduced.index + 1L + reducer$last.reduced.index <- as.numeric(idx) TRUE }) @@ -271,9 +225,7 @@ setMethod(".reducer_value", signature = "IterateReducer", if (!reducer$REDUCE.missing) { res <- reducer$result } else { - ## remove the index of the meta elements and errors - idx <- names(value.cache) - idx <- setdiff(idx, names(reducer$errors)) + idx <- setdiff(names(value.cache), names(reducer$errors)) res <- rep(list(NULL), reducer$total) for (i in idx) res[[as.integer(i)]] <- value.cache[[i]] @@ -282,14 +234,14 @@ setMethod(".reducer_value", signature = "IterateReducer", if (!.reducer_ok(reducer) || !.reducer_complete(reducer)) { ## cannot attach attribute to NULL if (is.null(res)) res <- list() - idx <- .redo_index_iterate(reducer) - errors <- rep(list(.error_unevaluated()), length(idx)) - names(errors) <- as.character(idx) - for (i in names(reducer$errors)) - errors[[i]] <- value.cache[[i]] - attr(res, "errors") <- errors + + error_index <- .iterate_error_index(reducer) + all_errors <- rep(list(.error_unevaluated()), length(error_index)) + names(all_errors) <- as.character(error_index) + + non_missing_errors <- as.list(reducer$errors) + all_errors[names(non_missing_errors)] <- non_missing_errors + attr(res, ".bperrors") <- all_errors } res }) - - diff --git a/R/rng.R b/R/rng.R index a4333805..bae2de7a 100644 --- a/R/rng.R +++ b/R/rng.R @@ -104,3 +104,51 @@ .rng_reset_generator(state$kind, state$seed) }) }) + +## A seed generator which accept a vector 'index' as input +## return the seed stream corresponding to index[1] +## For example, if index = {8, 9}, +## The generator should iterate the initial seed 7 times and +## return the 8th seed stream +.seed_generator <- function(init_seed){ + seed <- init_seed + seed_index <- 1L + + ## Cache a few seeds for random seed index + seed_space <- new.env(parent = emptyenv()) + cache_range <- 
1000 + seed_space[["0"]] <- init_seed + + ## input: an increasing index of seed + ## if the index is NULL, return the last seed iterated by 1 + ## output: the seed corresponding to the first index + function(index){ + first_index <- index[[1]] + if (first_index < seed_index) { + cache_id <- (first_index - 1L) %/% cache_range + iter_start_index <- cache_id * cache_range + 1L + iter_seed <- seed_space[[as.character(cache_id)]] + } else { + iter_start_index <- seed_index + iter_seed <- seed + } + + ## iterate the seed + for (i in seq_len(first_index - iter_start_index)) { + iter_seed <- .rng_next_substream(iter_seed) + iter_index <- iter_start_index + i + + ## Cache the seed value if it hits the cache point + if ((iter_index - 1L)%%cache_range == 0) + seed_space[[as.character((iter_index - 1L)/cache_range)]] <<- iter_seed + } + + ## the variable seed always holds the latest seed + ## and the largest length + if (first_index >= seed_index) { + seed_index <<- first_index + seed <<- iter_seed + } + return(iter_seed) + } +} diff --git a/R/utilities.R b/R/utilities.R index acca0e97..ca8cb8a4 100644 --- a/R/utilities.R +++ b/R/utilities.R @@ -66,21 +66,21 @@ } .ntask <- - function(X, workers, tasks) + function(n, workers, tasks) { if (is.na(tasks)) { - length(X) + n } else if (tasks == 0L) { workers } else { - min(length(X), tasks) + min(n, tasks) } } .splitX <- function(X, workers, tasks) { - tasks <- .ntask(X, workers, tasks) + tasks <- .ntask(length(X), workers, tasks) idx <- .splitIndices(length(X), tasks) relist(X, idx) } @@ -88,15 +88,42 @@ .redo_index <- function(X, BPREDO) { - if (length(BPREDO)) { - if (length(BPREDO) != length(X)) - stop("'length(BPREDO)' must equal 'length(X)'") - idx <- which(!bpok(BPREDO)) - if (!length(idx)) - stop("no previous error in 'BPREDO'") - idx + if (!is.function(X)) { + ## bplapply + if (length(BPREDO)) { + if (length(BPREDO) != length(X)) + stop("'length(BPREDO)' must equal 'length(X)'") + idx <- which(!bpok(BPREDO)) + if 
(!length(idx)) + stop("no previous error in 'BPREDO'") + idx + } else { + seq_along(X) + } } else { - seq_along(X) + ## bpiterate + if (length(BPREDO)) { + ## The total number of the task in the last run + len <- .redo_reducer(BPREDO)$total + + ## move to the next element + ## we do not know if the last run exhausted the iterator + if(is.null(len)) + len <- 1L + else + len <- len + 1L + + ## Find which one needs to be redone + errors <- attr(BPREDO, ".bperrors", exact = TRUE) + idx <- as.numeric(names(errors)) + if (!length(idx)) + stop("no previous error in 'BPREDO'") + + idx <- c(idx, len) + idx + } else { + 1L + } } } @@ -203,3 +230,17 @@ timeout <- Inf timeout } + +## Input: an increasing integer sequence +## output: a function accepting the index as argument +## If the index is out of range, extend x using the last element of x +## plus 1,2,3... +out_of_range_vector <- function(x){ + force(x) + function(i) { + if (i < length(x)) + x[i] + else + i - length(x) + tail(x, 1) + } +} diff --git a/R/worker.R b/R/worker.R index 7fb2fd0a..8b43e7a2 100644 --- a/R/worker.R +++ b/R/worker.R @@ -154,7 +154,7 @@ } .composeTry <- - function(FUN, OPTIONS, SEED) + function(FUN, OPTIONS, SEED, INDEX = NULL) { FUN <- match.fun(FUN) ERROR_OCCURRED <- FALSE @@ -185,6 +185,9 @@ if (!is.null(SEED)) SEED <- .rng_reset_generator("L'Ecuyer-CMRG", SEED)$seed + SEED_INDEX <- 1L + SEED_GAPS <- diff(INDEX) + function(...) { if (!identical(timeout, WORKER_TIMEOUT)) { @@ -212,8 +215,12 @@ ## worker trying to fill said heap, causing R to exhaust memory). 
if (force.GC) gc(verbose=FALSE, full=FALSE) - - SEED <<- .rng_next_substream(SEED) + if (SEED_INDEX <= length(SEED_GAPS)) { + SEED <<- .rng_iterate_substream(SEED, SEED_GAPS[SEED_INDEX]) + SEED_INDEX <<- SEED_INDEX + 1L + } else { + SEED <<- .rng_iterate_substream(SEED, 1L) + } output } @@ -222,7 +229,7 @@ .workerLapply_impl <- function(X, FUN, ARGS, OPTIONS, BPRNGSEED, - GLOBALS = NULL, PACKAGES = NULL) + INDEX = NULL, GLOBALS = NULL, PACKAGES = NULL) { state <- .rng_get_generator() on.exit(.rng_reset_generator(state$kind, state$seed)) @@ -250,7 +257,7 @@ add = TRUE) } - composeFunc <- .composeTry(FUN, OPTIONS, BPRNGSEED) + composeFunc <- .composeTry(FUN, OPTIONS, BPRNGSEED, INDEX) args <- c(list(X = X, FUN = composeFunc), ARGS) do.call(lapply, args) } diff --git a/inst/unitTests/test_DoparParam.R b/inst/unitTests/test_DoparParam.R index 37052226..4040e3f4 100644 --- a/inst/unitTests/test_DoparParam.R +++ b/inst/unitTests/test_DoparParam.R @@ -66,15 +66,15 @@ test_DoparParam_stop_on_error <- function() { doParallel::registerDoParallel(cl) fun <- function(x) { - if (x == 2) stop() + if (x == 3) stop() x } - res1 <- bptry(bplapply(1:4, fun, BPPARAM = DoparParam(stop.on.error = F))) - checkEquals(res1[c(1,3,4)], as.list(c(1,3,4))) - checkTrue(is(res1[[2]], "error")) + res1 <- bptry(bplapply(1:6, fun, BPPARAM = DoparParam(stop.on.error = F))) + checkEquals(res1[c(1,2,4:6)], as.list(c(1,2,4:6))) + checkTrue(is(res1[[3]], "error")) res2 <- bptry(bplapply(1:6, fun, BPPARAM = DoparParam(stop.on.error = T))) - checkEquals(res2[c(1,4:6)], as.list(c(1,4:6))) - checkTrue(is(res2[[2]], "error")) - checkTrue(is(res2[[3]], "error")) + checkEquals(res2[c(1,2,4,6)], as.list(c(1,2,4,6))) + checkTrue(is(res2[[3]], "remote_error")) + checkTrue(is(res2[[5]], "unevaluated_error")) } diff --git a/inst/unitTests/test_balancer.R b/inst/unitTests/test_balancer.R new file mode 100644 index 00000000..2f4b926f --- /dev/null +++ b/inst/unitTests/test_balancer.R @@ -0,0 +1,173 @@ +.lazyCount <- 
function(count) { + i <- 0L + function() { + if (i >= count) + return(NULL) + i <<- i + 1L + i + } +} + +test_balancer_lapply <- + function() +{ + params <- list( + SerialParam(), + SnowParam(2) + ) + for (p in params) { + bptasks(p) <- 10 + opts <- bpoptions(lapplyBalancer = "sequential") + res1 <- bplapply(1:10, function(x) x, BPPARAM = p, + BPOPTIONS = opts) + + opts <- bpoptions(lapplyBalancer = "random") + res2 <- bplapply(1:10, function(x) x, BPPARAM = p, + BPOPTIONS = opts) + checkIdentical(res1, res2) + + opts <- bpoptions(lapplyBalancer = "stepwise") + res3 <- bplapply(1:10, function(x) x, BPPARAM = p, + BPOPTIONS = opts) + checkIdentical(res1, res3) + } +} + +test_balancer_lapply_rng <- + function() +{ + params <- list( + SerialParam(), + SnowParam(2) + ) + ## initialize .Random.seed + set.seed(123) + for (p in params) { + bptasks(p) <- 10 + + bpRNGseed(p) <- 123 + seed1 <- .Random.seed + opts <- bpoptions(lapplyBalancer = "sequential") + res1 <- bplapply(1:10, function(x)runif(1), BPPARAM = p, + BPOPTIONS = opts) + checkIdentical(seed1, .Random.seed) + + bpRNGseed(p) <- 123 + seed2 <- .Random.seed + opts <- bpoptions(lapplyBalancer = "random") + res2 <- bplapply(1:10, function(x)runif(1), BPPARAM = p, + BPOPTIONS = opts) + checkIdentical(res1, res2) + checkIdentical(seed2, .Random.seed) + + bpRNGseed(p) <- 123 + seed3 <- .Random.seed + opts <- bpoptions(lapplyBalancer = "stepwise") + res3 <- bplapply(1:10, function(x) runif(1), BPPARAM = p, + BPOPTIONS = opts) + checkIdentical(res1, res3) + checkIdentical(seed3, .Random.seed) + } +} + +test_balancer_lapply_rng_redo <- + function() +{ + foo1 <- function(x) runif(1) + errorIdx <- c(2,3,6) + foo2 <- function(x){ + stopifnot(!x%in%errorIdx) + runif(1) + } + answer <- bplapply(1:10, foo1, BPPARAM = SerialParam(RNGseed = 123)) + params <- list( + SerialParam(), + SnowParam(2) + ) + balancerTypes <- c("sequential", "random", "stepwise") + taskNums <- c(1, 2, 5, 10) + for (p in params) { + for(balancerType in 
balancerTypes){ + for(taskNum in taskNums){ + opts <- bpoptions(lapplyBalancer = balancerType, + tasks = taskNum, + RNGseed = 123, + stop.on.error = FALSE) + res1 <- bptry(bplapply(1:10, foo2, BPPARAM = p, + BPOPTIONS = opts)) + checkIdentical(answer[-errorIdx], res1[-errorIdx]) + checkIdentical(bpok(res1)[errorIdx], + rep(FALSE, length(errorIdx))) + + res2 <- bplapply(1:10, foo1, BPPARAM = p, + BPREDO = res1, + BPOPTIONS = opts) + checkIdentical(answer, res2) + } + } + } +} + +test_balancer_iterate <- + function() +{ + params <- list( + SerialParam(), + SnowParam(2) + ) + for (p in params) { + bptasks(p) <- 10 + res1 <- bpiterate(.lazyCount(10), function(x) x, BPPARAM = p) + + opts <- bpoptions(iterateBalancer = "sequential") + res2 <- bplapply(1:10, function(x) x, BPPARAM = p) + checkIdentical(res1, res2) + } +} + +test_balancer_iterate_rng <- + function() +{ + params <- list( + SerialParam(), + SnowParam(2) + ) + for (p in params) { + bptasks(p) <- 10 + bpRNGseed(p) <- 123 + res1 <- bpiterate(.lazyCount(10), function(x)runif(1), BPPARAM = p) + + bpRNGseed(p) <- 123 + res2 <- bplapply(1:10, function(x)runif(1), BPPARAM = p) + checkIdentical(res1, res2) + } +} + +test_balancer_iterate_rng_redo <- + function() +{ + foo1 <- function(x) runif(1) + errorIdx <- c(2,3,6) + foo2 <- function(x){ + stopifnot(!x%in%errorIdx) + runif(1) + } + answer <- bplapply(1:10, foo1, BPPARAM = SerialParam(RNGseed = 123)) + params <- list( + SerialParam(), + SnowParam(2) + ) + for (p in params) { + opts <- bpoptions(RNGseed = 123, + stop.on.error = FALSE) + res1 <- bptry(bpiterate(.lazyCount(10), foo2, BPPARAM = p, + BPOPTIONS = opts)) + checkIdentical(answer[-errorIdx], res1[-errorIdx]) + checkTrue(sum(sapply(res1[errorIdx], is.null)) == 3) + + res2 <- bpiterate(.lazyCount(10), foo1, BPPARAM = p, + BPREDO = res1, + BPOPTIONS = opts) + checkIdentical(answer, res2) + } +} diff --git a/inst/unitTests/test_bploop.R b/inst/unitTests/test_bploop.R index 1e93348c..8705d650 100644 ---
a/inst/unitTests/test_bploop.R +++ b/inst/unitTests/test_bploop.R @@ -1,7 +1,7 @@ message("Testing bploop") -.lapplyReducer <- BiocParallel:::.lapplyReducer -.iterateReducer <- BiocParallel:::.iterateReducer +.lapply_reducer <- BiocParallel:::.lapply_reducer +.iterate_reducer <- BiocParallel:::.iterate_reducer .reducer_value <- BiocParallel:::.reducer_value .reducer_add <- BiocParallel:::.reducer_add @@ -13,21 +13,22 @@ notAvailable <- BiocParallel:::.error_not_available("HI") ## Normal reduce process test_reducer_lapply_1 <- function() { - r <- .lapplyReducer(10, NULL) + r <- .lapply_reducer(10, NULL) result <- rep(list(unevaluated), 10) checkIdentical(result, .reducer_value(r)) - checkIdentical(result, { .reducer_add(r, 2, list(3,4)); .reducer_value(r) }) - result[1:4] <- list(1,2,3,4) - checkIdentical(result, { .reducer_add(r, 1, list(1,2)); .reducer_value(r) }) + result[3:4] <- list(3,4) + checkIdentical(result, { .reducer_add(r, 3:4, list(3,4)); .reducer_value(r) }) + result[1:2] <- list(1,2) + checkIdentical(result, { .reducer_add(r, 1:2, list(1,2)); .reducer_value(r) }) result[5:6] <- list(5,6) - checkIdentical(result, { .reducer_add(r, 3, list(5,6)); .reducer_value(r) }) + checkIdentical(result, { .reducer_add(r, 5:6, list(5,6)); .reducer_value(r) }) checkTrue(.reducer_ok(r)) checkTrue(!.reducer_complete(r)) result[7:10] <- list(7,8,9,10) - checkIdentical(result, { .reducer_add(r, 4, list(7,8,9,10)); .reducer_value(r) }) + checkIdentical(result, { .reducer_add(r, 7:10, list(7,8,9,10)); .reducer_value(r) }) checkTrue(.reducer_ok(r)) checkTrue(.reducer_complete(r)) @@ -35,11 +36,11 @@ test_reducer_lapply_1 <- function() { ## REDO test_reducer_lapply_2 <- function() { - r <- .lapplyReducer(10, NULL) + r <- .lapply_reducer(10, NULL) result <- rep(list(unevaluated), 10) checkIdentical(result, .reducer_value(r)) result[1:4] <- list(1,2,3,4) - checkIdentical(result, { .reducer_add(r, 1, list(1,2,3,4)); .reducer_value(r) }) + checkIdentical(result, { 
.reducer_add(r, 1:4, list(1,2,3,4)); .reducer_value(r) }) checkTrue(.reducer_ok(r)) checkTrue(!.reducer_complete(r)) @@ -47,38 +48,39 @@ test_reducer_lapply_2 <- function() { values <- list(notAvailable,notAvailable,notAvailable,8, notAvailable,notAvailable) result[5:10] <- values - checkIdentical(result, { .reducer_add(r, 2, values); .reducer_value(r) }) + checkIdentical(result, { .reducer_add(r, 5:10, values); .reducer_value(r) }) checkTrue(!.reducer_ok(r)) checkTrue(.reducer_complete(r)) ## REDO - r2 <- .lapplyReducer(10, r) - checkIdentical(c(5:7,9:10), r2$redo.index) + r2 <- .lapply_reducer(10, r) + ## 5 errors/unfinished values + checkIdentical(5L, r2$total) checkTrue(.reducer_ok(r2)) checkTrue(!.reducer_complete(r2)) - checkIdentical(result, { .reducer_add(r2, 2, list(7)); .reducer_value(r2) }) - checkIdentical(result, { .reducer_add(r2, 3, list(9,10)); .reducer_value(r2) }) + result[9:10] <- list(9,10) + checkIdentical(result, { .reducer_add(r2, 9:10, list(9,10)); .reducer_value(r2) }) - result[c(5:7,9:10)] <- list(5,6,7,9,10) - checkIdentical(result, { .reducer_add(r2, 1, list(5,6)); .reducer_value(r2) }) + result[5:7] <- list(5,6,7) + checkIdentical(result, { .reducer_add(r2, 5:7, list(5,6,7)); .reducer_value(r2) }) checkTrue(.reducer_ok(r2)) checkTrue(.reducer_complete(r2)) ## REDO with new error - r3 <- .lapplyReducer(10, r) + r3 <- .lapply_reducer(10, r) result[5:7] <- list(5,6,notAvailable) - .reducer_add(r3, 1, list(5,6,notAvailable)) - .reducer_add(r3, 2, list(9,10)) + .reducer_add(r3, 5:7, list(5,6,notAvailable)) + .reducer_add(r3, 9:10, list(9,10)) checkIdentical(result, .reducer_value(r3)) } ## default reducer and reduce in order test_reducer_iterate_1 <- function() { - r <- .iterateReducer(reduce.in.order=TRUE, + r <- .iterate_reducer(reduce.in.order=TRUE, reducer = NULL) checkTrue(.reducer_ok(r)) @@ -88,7 +90,7 @@ test_reducer_iterate_1 <- function() { checkIdentical(list(), .reducer_value(r)) .reducer_add(r, 2, list(2)) - expect <- 
structure(list(NULL,2), errors = list('1'=unevaluated)) + expect <- structure(list(NULL,2), .bperrors = list('1'=unevaluated)) checkIdentical(expect, .reducer_value(r)) checkTrue(.reducer_ok(r)) @@ -106,7 +108,7 @@ test_reducer_iterate_1 <- function() { .reducer_add(r, 5, list(notAvailable)) expect <- structure( list(1,2,3,NULL,NULL), - errors=list('4'=unevaluated,'5'=notAvailable) + .bperrors=list('4'=unevaluated,'5'=notAvailable) ) checkIdentical(expect, .reducer_value(r)) @@ -114,40 +116,40 @@ test_reducer_iterate_1 <- function() { checkTrue(!.reducer_complete(r)) ## BPREDO - r2 <- .iterateReducer(reducer = r) - checkIdentical(4:5, r2$redo.index) + r2 <- .iterate_reducer(reducer = r) + checkIdentical("5", names(r2$errors)) checkTrue(!.reducer_ok(r2)) checkTrue(!.reducer_complete(r2)) - .reducer_add(r2, 2, list(5)) + .reducer_add(r2, 5, list(5)) expect <- structure( list(1,2,3,NULL,5), - errors=list('4'=unevaluated) + .bperrors=list('4'=unevaluated) ) checkIdentical(expect, .reducer_value(r2)) checkTrue(.reducer_ok(r2)) checkTrue(!.reducer_complete(r2)) - .reducer_add(r2, 1, list(4)) + .reducer_add(r2, 4, list(4)) expect <- list(1,2,3,4,5) checkIdentical(expect, .reducer_value(r2)) checkTrue(.reducer_ok(r2)) checkTrue(.reducer_complete(r2)) - .reducer_add(r2, 3, list(6)) + .reducer_add(r2, 6, list(6)) expect <- list(1,2,3,4,5,6) checkIdentical(expect, .reducer_value(r2)) checkTrue(.reducer_ok(r2)) checkTrue(.reducer_complete(r2)) - .reducer_add(r2, 4, list(notAvailable)) + .reducer_add(r2, 7, list(notAvailable)) expect <- structure( list(1,2,3,4,5,6,NULL), - errors=list('7'=notAvailable) + .bperrors=list('7'=notAvailable) ) checkIdentical(expect, .reducer_value(r2)) @@ -156,10 +158,10 @@ test_reducer_iterate_1 <- function() { ## BPREDO 2 - r3 <- .iterateReducer(reducer = r2) - checkIdentical(7L, r3$redo.index) + r3 <- .iterate_reducer(reducer = r2) + checkIdentical("7", names(r3$errors)) - .reducer_add(r3, 1, list(7)) + .reducer_add(r3, 7, list(7)) expect <- 
list(1,2,3,4,5,6,7) checkIdentical(expect, .reducer_value(r3)) @@ -169,7 +171,7 @@ test_reducer_iterate_1 <- function() { ## customized reducer and reduce in order test_reducer_iterate_2 <- function() { - r <- .iterateReducer(`+`, init=0, reduce.in.order=TRUE, + r <- .iterate_reducer(`+`, init=0, reduce.in.order=TRUE, reducer = NULL) checkIdentical(0, .reducer_value(r)) @@ -178,7 +180,7 @@ test_reducer_iterate_2 <- function() { checkIdentical(expect, .reducer_value(r)) .reducer_add(r, 3, list(3)) - expect <- structure(1, errors = list('2' = unevaluated)) + expect <- structure(1, .bperrors = list('2' = unevaluated)) checkIdentical(expect, .reducer_value(r)) checkTrue(.reducer_ok(r)) @@ -189,43 +191,43 @@ test_reducer_iterate_2 <- function() { checkIdentical(expect, .reducer_value(r)) .reducer_add(r, 5, list(notAvailable)) - expect <- structure(6, errors = list('4' = unevaluated, '5' = notAvailable)) + expect <- structure(6, .bperrors = list('4' = unevaluated, '5' = notAvailable)) checkIdentical(expect, .reducer_value(r)) checkTrue(!.reducer_ok(r)) checkTrue(!.reducer_complete(r)) ## BPREDO round1 - r2 <- .iterateReducer(reducer = r) - checkIdentical(4:5, r2$redo.index) + r2 <- .iterate_reducer(reducer = r) + checkIdentical("5", names(r2$errors)) - .reducer_add(r2, 2, list(5)) - expect <- structure(6, errors = list('4' = unevaluated)) + .reducer_add(r2, 5, list(5)) + expect <- structure(6, .bperrors = list('4' = unevaluated)) checkIdentical(expect, .reducer_value(r2)) - .reducer_add(r2, 1, list(4)) + .reducer_add(r2, 4, list(4)) expect <- 15 checkIdentical(expect, .reducer_value(r2)) checkTrue(.reducer_ok(r2)) checkTrue(.reducer_complete(r2)) - .reducer_add(r2, 3, list(notAvailable)) - expect <- structure(15, errors = list('6' = notAvailable)) + .reducer_add(r2, 6, list(notAvailable)) + expect <- structure(15, .bperrors = list('6' = notAvailable)) checkIdentical(expect, .reducer_value(r2)) checkTrue(!.reducer_ok(r2)) checkTrue(!.reducer_complete(r2)) ## BPREDO round2 
- r3 <- .iterateReducer(reducer = r2) - checkIdentical(6L, r3$redo.index) + r3 <- .iterate_reducer(reducer = r2) + checkIdentical("6", names(r3$errors)) - .reducer_add(r3, 1, list(6)) + .reducer_add(r3, 6, list(6)) expect <- 21 checkIdentical(expect, .reducer_value(r3)) - .reducer_add(r3, 2, list(7)) + .reducer_add(r3, 7, list(7)) expect <- 28 checkIdentical(expect, .reducer_value(r3)) @@ -236,7 +238,7 @@ test_reducer_iterate_2 <- function() { ## customized reducer and reduce not in order test_reducer_iterate_3 <- function() { - r <- .iterateReducer(`+`, init=0, reduce.in.order=FALSE, + r <- .iterate_reducer(`+`, init=0, reduce.in.order=FALSE, reducer = NULL) checkIdentical(0, .reducer_value(r)) @@ -245,7 +247,7 @@ test_reducer_iterate_3 <- function() { checkIdentical(expect, .reducer_value(r)) .reducer_add(r, 3, list(3)) - expect <- structure(4, errors = list('2' = unevaluated)) + expect <- structure(4, .bperrors = list('2' = unevaluated)) checkIdentical(expect, .reducer_value(r)) checkTrue(.reducer_ok(r)) @@ -256,43 +258,43 @@ test_reducer_iterate_3 <- function() { checkIdentical(expect, .reducer_value(r)) .reducer_add(r, 5, list(notAvailable)) - expect <- structure(6, errors = list('4' = unevaluated, '5' = notAvailable)) + expect <- structure(6, .bperrors = list('4' = unevaluated, '5' = notAvailable)) checkIdentical(expect, .reducer_value(r)) checkTrue(!.reducer_ok(r)) checkTrue(!.reducer_complete(r)) ## BPREDO round1 - r2 <- .iterateReducer(reducer = r) - checkIdentical(4:5, r2$redo.index) + r2 <- .iterate_reducer(reducer = r) + checkIdentical("5", names(r2$errors)) - .reducer_add(r2, 2, list(5)) - expect <- structure(11, errors = list('4' = unevaluated)) + .reducer_add(r2, 5, list(5)) + expect <- structure(11, .bperrors = list('4' = unevaluated)) checkIdentical(expect, .reducer_value(r2)) - .reducer_add(r2, 1, list(4)) + .reducer_add(r2, 4, list(4)) expect <- 15 checkIdentical(expect, .reducer_value(r2)) checkTrue(.reducer_ok(r2)) 
checkTrue(.reducer_complete(r2)) - .reducer_add(r2, 3, list(notAvailable)) - expect <- structure(15, errors = list('6' = notAvailable)) + .reducer_add(r2, 6, list(notAvailable)) + expect <- structure(15, .bperrors = list('6' = notAvailable)) checkIdentical(expect, .reducer_value(r2)) checkTrue(!.reducer_ok(r2)) checkTrue(!.reducer_complete(r2)) ## BPREDO round2 - r3 <- .iterateReducer(reducer = r2) - checkIdentical(6L, r3$redo.index) + r3 <- .iterate_reducer(reducer = r2) + checkIdentical("6", names(r3$errors)) - .reducer_add(r3, 1, list(6)) + .reducer_add(r3, 6, list(6)) expect <- 21 checkIdentical(expect, .reducer_value(r3)) - .reducer_add(r3, 2, list(7)) + .reducer_add(r3, 7, list(7)) expect <- 28 checkIdentical(expect, .reducer_value(r3)) @@ -304,7 +306,7 @@ test_reducer_iterate_3 <- function() { ## Test for a marginal case where the result is NULL ## and contains error test_reducer_iterate_4 <- function() { - r <- .iterateReducer(function(x,y)NULL, init=NULL, + r <- .iterate_reducer(function(x,y)NULL, init=NULL, reduce.in.order=FALSE, reducer = NULL) @@ -315,37 +317,7 @@ test_reducer_iterate_4 <- function() { checkIdentical(expect, .reducer_value(r)) .reducer_add(r, 2, list(notAvailable)) - expect <- structure(list(),errors=list('2'=notAvailable)) + expect <- structure(list(),.bperrors=list('2'=notAvailable)) checkIdentical(expect, .reducer_value(r)) } -test_iterator_lapply <- function() { - .bploop_lapply_iter <- BiocParallel:::.bploop_lapply_iter - .bploop_rng_iter <- BiocParallel:::.bploop_rng_iter - - X <- 1:10 - redo_index <- c(1:2,6:8) - iter <- .bploop_lapply_iter(X, redo_index, 10) - checkIdentical(iter(), 1:2) - checkIdentical(iter(), .bploop_rng_iter(3L)) - checkIdentical(iter(), 6:8) - checkIdentical(iter(), list(NULL)) - checkIdentical(iter(), list(NULL)) - - iter <- .bploop_lapply_iter(X, redo_index, 2) - checkIdentical(iter(), 1:2) - checkIdentical(iter(), .bploop_rng_iter(3L)) - checkIdentical(iter(), 6:7) - checkIdentical(iter(), 8L) - 
checkIdentical(iter(), list(NULL)) - checkIdentical(iter(), list(NULL)) - - redo_index <- 6:8 - iter <- .bploop_lapply_iter(X, redo_index, 1) - checkIdentical(iter(), .bploop_rng_iter(5L)) - checkIdentical(iter(), 6L) - checkIdentical(iter(), 7L) - checkIdentical(iter(), 8L) - checkIdentical(iter(), list(NULL)) - checkIdentical(iter(), list(NULL)) -} diff --git a/inst/unitTests/test_rng.R b/inst/unitTests/test_rng.R index ed7c20ba..88f56af8 100644 --- a/inst/unitTests/test_rng.R +++ b/inst/unitTests/test_rng.R @@ -327,7 +327,7 @@ test_rng_lapply_bpredo <- function() result <- bptry(bpiterate(iter_factory(11), FUN0, BPPARAM = param)) checkIdentical(unlist(result[-7]), target[-7]) checkTrue(is.null(result[[7]])) - checkTrue(inherits(attr(result,"errors")[[1]], "remote_error")) + checkTrue(inherits(attr(result,".bperrors")[["7"]], "remote_error")) FUN1 <- function(i) { if (identical(i, 7L)) { @@ -446,3 +446,56 @@ test_rng_reset_seed <- function() BPOPTIONS = opts) checkIdentical(res1, res2) } + +test_seed_generator <- + function() +{ + n <- 10000 + init_seed <- BiocParallel:::.rng_init_stream(NULL) + + ## Generate all seeds + seed <- init_seed + seeds <- list(seed) + for(i in seq_len(n-1)){ + seed <- BiocParallel:::.rng_next_substream(seed) + seeds <- c(seeds, list(seed)) + } + + generator <- BiocParallel:::.seed_generator(init_seed) + + ## sequentially ask for the seed + for(index in 1:(n - 1)){ + current <- generator(index) + checkIdentical(seeds[[index]], current) + } + + ## check the cache value + env <- environment(generator) + checkTrue(length(env$seed_space) == 10L) + + ## Randomly ask for the seed + redo_index <- sort(sample(1:n, 1000)) + for(index in redo_index){ + current <- generator(index) + checkIdentical(seeds[[index]], current) + } + + generator <- BiocParallel:::.seed_generator(init_seed) + ## Randomly ask for the seed + redo_index <- sort(sample(1:(n-1), 1000)) + for(index in redo_index){ + current <- generator(index) + checkIdentical(seeds[[index]], 
current) + } + + ## check the cache value + env <- environment(generator) + checkTrue(length(env$seed_space) > 0L) + + ## Randomly ask for the seed again + redo_index <- sort(sample(1:(n-1), 1000)) + for(index in redo_index){ + current <- generator(index) + checkIdentical(seeds[[index]], current) + } +} diff --git a/man/bpoptions.Rd b/man/bpoptions.Rd index a51128dd..f3bf5d36 100644 --- a/man/bpoptions.Rd +++ b/man/bpoptions.Rd @@ -16,7 +16,8 @@ bpoptions( workers, tasks, jobname, log, logdir, threshold, resultdir, stop.on.error, timeout, exportglobals, exportvariables, progressbar, - RNGseed, force.GC, fallback, exports, packages, ... + RNGseed, force.GC, fallback, exports, packages, lapplyBalancer, + iterateBalancer, ... ) } @@ -75,6 +76,10 @@ bpoptions( the worker prior to the evaluation of the task. This option works independently of the option \code{exportvariables}.} + \item{lapplyBalancer}{character(1) or function, the load balancer for \code{bplapply}. The built-in balancers are \code{"sequential"}, \code{"stepwise"} (default), and \code{"random"}. They can be selected by their name (a character string). A custom balancer function is also possible; see the vignette for details.} + + \item{iterateBalancer}{character(1) or function, the load balancer for \code{bpiterate}. The built-in balancer is \code{"sequential"} (default). It can be selected by its name (a character string). A custom balancer function is also possible; see the vignette for details.} + \item{...}{ Additional arguments which may(or may not) work for some specific type of \code{BPPARAM}.