Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ Collate: AllGenerics.R DeveloperInterface.R prototype.R
SerialParam-class.R DoparParam-class.R SnowParam-utils.R
BatchJobsParam-class.R BatchtoolsParam-class.R
progress.R ipcmutex.R utilities.R rng.R bpinit.R reducer.R worker.R
bpoptions.R cpp11.R
bpoptions.R balancer.R
cpp11.R
LinkingTo: BH, cpp11
VignetteBuilder: knitr
RoxygenNote: 7.1.2
150 changes: 150 additions & 0 deletions R/balancer.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
## Find the balancer generator function given the name or the function
.find_balancer <-
function(type = c("lapply", "iterate"), balancer = NULL)
{
type <- match.arg(type)
if (is.null(balancer)) {
if (type == "lapply")
balancer <- "stepwise"
else
balancer <- "sequential"
}

if (is.character(balancer)) {
if (type == "lapply") {
if (balancer == "sequential")
return(.balancer_sequential_lapply)
if (balancer == "random")
return(.balancer_random_lapply)
if (balancer == "stepwise")
return(.balancer_stepwise_lapply)
} else {
if (balancer == "sequential")
return(.balancer_sequential_iterate)
}
}

if (is.function(balancer))
return(balancer)
stop("Unrecognized balancer")
}



##################
## bplapply balancer generator:
## Input:
## 1. n: The length of the vector `X`
## 2. BPPARAM: The parallel backend, you should respect the value
## bptasks(BPPARAM) as much as possible
## Output: a list with two functions
## 1. next_task(): The next Task, the return value is a list with
## - task_id: An arbitrary index used by the balancer to identify the task
## - index: The indices of a subset of X that will be evaluated as a task
## (The indices must be increasing!)
## 2. record(worker, task_id, time): record the task execution time in seconds
## argument:
## - worker: which worker is responsible for this task
## - task_id: The id generated by nextTask()
## - time: The execution time in seconds
##################

## A simple balancer to equally divide the vector X into tasks.
.balancer_sequential_lapply <-
function(n, BPPARAM)
{
## How many elements in a task?
ntask <- .ntask(n, bpnworkers(BPPARAM), bptasks(BPPARAM))
elements_per_task <- ceiling(n/ntask)
task_id <- 0L
start <- 1L
list(
record = function(node, task_id, time) {
# message("Node:", node, ",id:", task_id, ",time:", time)
},
next_task = function() {
upper <- min(n, start + elements_per_task - 1L)
index <- seq.int(start, upper)
start <<- start + length(index)
task_id <<- task_id + 1L
list(
task_id = task_id,
index = index
)
}
)
}

## Randomly sample the vector INDEX.
.balancer_random_lapply <-
function(n, BPPARAM)
{
.rng_internal_stream$set()
on.exit(.rng_internal_stream$reset())
## How many elements in a task?
ntask <- .ntask(n, bpnworkers(BPPARAM), bptasks(BPPARAM))
elements_per_task <- ceiling(n/ntask)
random_index <- sample.int(n)
task_id <- 0L
start <- 1L
list(
record = function(node, task_id, time) {
# message("Node:", node, ",id:", task_id, ",time:", time)
},
next_task = function() {
upper <- min(n, start + elements_per_task - 1L)
index <- sort(random_index[seq.int(start, upper)])
start <<- start + length(index)
task_id <<- task_id + 1L
list(
task_id = task_id,
index = index
)
}
)
}

## Randomly sample the vector INDEX.
.balancer_stepwise_lapply <-
function(n, BPPARAM)
{
ntask <- .ntask(n, bpnworkers(BPPARAM), bptasks(BPPARAM))
task_id <- 0L
list(
record = function(node, task_id, time) {
# message("Node:", node, ",id:", task_id, ",time:", time)
},
next_task = function() {
task_id <<- task_id + 1L
index <- seq.int(task_id, n, by = ntask)
list(
task_id = task_id,
index = index
)
}
)
}

.balancer_sequential_iterate <-
function(ITER, BPPARAM)
{
force(ITER)
task_id <- 0L
list(
record = function(node, task_id, time) {
# message("Node:", node, ",id:", task_id, ",time:", time)
},
next_task = function(){
task_id <<- task_id + 1L
## the task value must be a list when it is not empty
value <- ITER()
if (!is.null(value))
value <- list(value)
list(
task_id = task_id,
index = task_id,
value = value
)
}
)
}
Loading