Paralelizacion en R¶
Clusters¶
Para paralelizar, primero hay que "crear un cluster".
no_cores <- parallel::detectCores() - 1
cl <- parallel::makeCluster(no_cores,
setup_strategy = "sequential", # necesarry in my machine https://github.com/rstudio/rstudio/issues/6692
outfile = "/tmp/dopar.txt") # The outfile is where print messages will be written, can be "NULL" for console output
Si en esta parte se cuelga R, y no devuelve el prompt un rato, intentar agregando setup_strategy = "sequential"
a los argumentos de makeCluster
.
Paquetes¶
En todos los casos, los paquetes y funciones deben cargarse explicitamente en el cluster antes de la ejecución, eso se hace más abajo con los argumentos .packages
y .export
en foreach, o con parallel::clusterExport
y parallel::clusterEvalQ
en parLapply.
Ejemplos¶
foreach¶
Un loop en paralelo con foreach
.
custom_function <- function(stuff){
# A custom function used in the parallel execution
}
library(foreach)
library(parallel)
library(doPparallel)
no_cores <- parallel::detectCores() - 1
cl <- parallel::makeCluster(no_cores,
outfile = "/tmp/dopar.txt") # The outfile is where print messages will be written
doParallel::registerDoParallel(cl)
iterable <- 1:something # your iterable for the loop
result_list <- foreach(i=iterable,
# .packages = c("stats"), # use when the task needs functions from a package
.export=c("custom_function") # use when the task needs functions defined in your environment
) %dopar% {
# Your code goes here...
result <- custom_function(i)
result # the last expression is returned
}
A a un foreach loop se le puede agregar una funcion final, que se aplica sobre el output. Es útil, por ejemplo, cuando quieremos conservar los nombres en el input. En ese caso agregamos el argumento:
.final = function(x) setNames(x, names(iterable)),
parLapply¶
Un loop en paralelo con parallel::parLapply
:
custom_function <- function(stuff){
# A custom function used in the parallel execution
}
library(parallel)
no_cores <- parallel::detectCores() - 1
cl <- parallel::makeCluster(no_cores, outfile = "/tmp/dopar.txt")
parallel::clusterExport(cl, "custom_function")
parallel::clusterEvalQ(cl, {
# Load here the libraries and other stuff that each core will use
library(a_useful_library)
})
split_data <- split(data, data$an_id_column) # your list iterable for the loop
result_list <- parallel::parLapply(cl,
split_data,
fun = function(df){
# Your code goes here...
result <- custom_function("... do the stuff ...")
return(result)
}
Barrita de progreso¶
Con progressbar (y doSNOW): https://stackoverflow.com/a/26519566
# Setup cluster
no_cores <- parallel::detectCores() - 1
cl <- parallel::makeCluster(no_cores, outfile = "/tmp/dopar.txt", setup_strategy = "sequential")
doSNOW::registerDoSNOW(cl)
# Progress bar stuff
iterations <- length(unique(cell.boundaries$id.cell))
pb <- txtProgressBar(max = iterations, style = 3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress = progress)
result <- foreach(i = 1:iterations, .combine = rbind) %dopar%
{
s <- summary(rnorm(1e6))[3]
setTxtProgressBar(pb, i)
return(s)
}
close(pb)
stopCluster(cl)
Chunking¶
Cuando son muchas cuentitas, pero cada una es muy pequeña, puede ser que procesar cada item por separado sea "lento".
Por ejemplo, podemos ver que tarda mucho, y que los cores del CPU no están siendo usados al máximo.
Esto tiene que ver con que la ejecucion paralela de cada cachito tiene un poco de overhead. Y se nota especialmente cuando la ejecucion de esos cachitos es muy rápida.
La solucion es partir el input en varios cachos grandes, que van a repartirse en paralelo a cada core. Pero, en cada core, cada cacho grande se va a procesar secuencialmente (o sea, de la manera en que lo hariamos sin usar paralelizacion).
Eso elimina el overhead de ejecutar miles de cachitos por separado, a la vez que nos permite paralelizar, y aprovechar todo el CPU disponible.
La implementacion que sigue es un ejemplo de como podria hacerse:
# Split the data
data.list <- split(your_data, ~split_variable)
# Setup cluster
no_cores <- parallel::detectCores() - 1
cl <- parallel::makeCluster(no_cores, outfile = "/tmp/dopar.txt", setup_strategy = "sequential")
doSNOW::registerDoSNOW(cl)
# Define some arbitrary number of chunks to make
n_chunks <- min(no_cores*4,
length(data.list))
# Split the data into n_chunks
data.list.chunked <- split(data.list,
rep_len(1:n_chunks, length.out = length(data.list)))
# Progress bar stuff
iterations <- n_chunks
pb <- txtProgressBar(max = iterations, style = 3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress = progress)
output.list <- foreach(data.list.chunk = data.list.chunked,
.options.snow = opts,
.combine = c, # important for the lapply in the loop
.packages = c("dplyr")
) %dopar% {
lapply(data.list.chunk, function(data){
# TU CODIGO SARASA
})
}
close(pb)
parallel::stopCluster(cl)