Hello Henrik and others,
I am trying to implement an iterative algorithm using a combination of the parallel and future packages. I think my current implementation is suboptimal, and I would appreciate some feedback on improving it.
Problem description:
Consider a problem where I have 10 data subsets distributed on a cluster. This can be set up for a simple linear regression example as follows:
## X is the design matrix, b is the regression coefficient, and y is the response
X <- matrix(rnorm(1000 * 2), 1000, 2)
b <- 1:2
y <- drop(X %*% b) + rnorm(1000, sd = 0.01)
## split index
splitIdx <- rep(1:10, each = 100)
subIdx <- split(1:1000, splitIdx)
## data subsets to be distributed on a cluster
dataList <- list()
for (ii in 1:10) {
  dataList[[ii]] <- list(X = X[subIdx[[ii]], ],
                         y = y[subIdx[[ii]]])
}
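As a quick sanity check (purely illustrative, not part of the algorithm), each subset should now hold 100 rows of the design matrix and the matching 100 responses:
stopifnot(all(sapply(dataList, function(d) nrow(d$X) == 100 && length(d$y) == 100)))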
## distribute them on a cluster
library(future)
library(parallel)
nsubs <- 10
numCores <- availableCores()
if (numCores >= nsubs) {
  cat("**** working with", numCores, "cores, but we only need", nsubs, "of them ****\n")
  cl <- makeCluster(nsubs)
} else {
  ## stop early: 'cl' would be undefined in the code below otherwise
  stop("**** PROBLEM: working with ", numCores, " cores, but need ", nsubs, " cores ****")
}
## Assign the variables appropriately in the workers' GlobalEnv. The 'recvd'
## variable is a consistency check to ensure we have succeeded.
sendData <- clusterApply(cl, dataList, function(vars) {
  x <<- vars$X
  y <<- vars$y
  recvd <<- TRUE
})
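The 'recvd' flags can then be verified from the master, e.g.:
stopifnot(all(unlist(clusterEvalQ(cl, exists("recvd") && recvd))))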
I would like to do Markov chain Monte Carlo sampling in parallel for inference on the regression coefficient b, but with a crucial twist. Instead of using all the data on the 10 subsets, I am only going to use the results from the 2 workers that finish their computations first (of course this won't make a difference here, but there is a motivation for doing so in massive-data settings). This is how I do it currently using the future and parallel packages.
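For reference, the draws inside the loop below are the usual Gibbs updates for Bayesian linear regression under a flat prior, applied here to the averaged subset statistics:
sig^2 | y      ~  RSS / chisq(n - p)
b | sig^2, y   ~  N( (X'X)^{-1} X'y , sig^2 (X'X)^{-1} )
With 2 subsets of 100 observations each and p = 2 coefficients, n - p = 198, which is where the rchisq(1, 198) below comes from.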
## the function for getting the sufficient statistics from the subsets
suffStat <- function(xmat, yvec, bhat) {
  xtx <- crossprod(xmat)
  xty <- drop(crossprod(xmat, yvec))
  rss <- sum((drop(yvec - xmat %*% bhat))^2)
  list(xtx = xtx, xty = xty, rss = rss)
}
## export to the workers
clusterExport(cl, "suffStat")
## load the future package on all the workers
clusterEvalQ(cl, library(future))
## initialize the b parameter to 0 and compute the least squares estimate
betas <- rep(0, 2)
bhat <- drop(solve(crossprod(X)) %*% crossprod(X, y))
## export bhat to all the workers
clusterExport(cl, "bhat")
## initialize a few variables before running our algorithm
niter <- 2000 ## number of MCMC iterations
betasSamp <- matrix(0.0, niter, 2) ## 'b' draws are saved here
## start the MCMC iterations
startTime <- proc.time()
for (its in 1:niter) {
  if (its %% 100 == 0) cat("iter: ", its, "\n")
  ## re-create the future on every worker and assign it in the worker's
  ## global env; only 'bhat' changes across iterations
  clusterEvalQ(cl, f <<- future({suffStat(x, y, bhat)},
                                globals = list(bhat = bhat),
                                assign.env = .GlobalEnv
                                ))
  ## poll the workers until results are available from at least 2 of them
  done <- unlist(clusterEvalQ(cl, resolved(f)))
  while (sum(done) < 2) {
    done <- unlist(clusterEvalQ(cl, resolved(f)))
  }
  ## collect the results; value(f) blocks until each worker's future resolves
  suffStatList <- clusterEvalQ(cl, value(f))
  ## we want to use ONLY 2 worker results; this is a small operation,
  ## so we randomly pick 2 of the resolved ones
  waitIdx <- sort(sample(which(done), 2))
  xtx <- Reduce("+", lapply(suffStatList[waitIdx], function(x) x$xtx)) / length(waitIdx)
  xty <- Reduce("+", lapply(suffStatList[waitIdx], function(x) x$xty)) / length(waitIdx)
  ## draw the error variance; 2 * 100 observations - 2 parameters = 198 df
  sig <- Reduce("+", lapply(suffStatList[waitIdx], function(x) x$rss)) / rchisq(1, 198)
  ## draw b: the posterior precision is xtx / sig, so the covariance is sig * xtx^{-1}
  covBetasInv <- xtx
  covBetas <- sig * chol2inv(chol(covBetasInv))
  meanBetas <- drop(covBetas %*% xty) / sig
  betas <- meanBetas + drop(crossprod(chol(covBetas), rnorm(2)))
  ## update the books!
  betasSamp[its, ] <- betas
}
endTime <- proc.time()
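The elapsed wall-clock time for the full chain is then available as:
endTime - startTime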
I know that this is a suboptimal implementation because I haven't used plan and have instead relied on the parallel package for more control. More importantly, I have the following question:
How can I re-evaluate a future across the iterations, where only one argument of the future changes across iterations?
Currently, I do this by resending the future using clusterEvalQ, but there should be a better solution for handling the iterative nature of the computations.
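For instance, I imagine the per-iteration resubmission could be factored into helpers along these lines (a sketch of the same clusterEvalQ pattern, with hypothetical names submitSuffStat and waitForAtLeast, and a short sleep instead of spinning at full speed while polling):
## resubmit the future on every worker; only 'bhat' is re-exported
submitSuffStat <- function(cl, bhat) {
  clusterExport(cl, "bhat", envir = environment())
  clusterEvalQ(cl, f <<- future({suffStat(x, y, bhat)}))
  invisible(NULL)
}
## poll until at least 'k' of the workers' futures are resolved
waitForAtLeast <- function(cl, k, interval = 0.01) {
  repeat {
    done <- unlist(clusterEvalQ(cl, resolved(f)))
    if (sum(done) >= k) return(done)
    Sys.sleep(interval)
  }
}
## inside the loop: submitSuffStat(cl, bhat); done <- waitForAtLeast(cl, 2)
But this still resends the whole future expression every iteration, which is what I am hoping to avoid.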
Thank you for your help!
Best,
Sanvesh