parallel - Memoria compartida en paralelo foreach en R
foreach r (2)
Descripción del problema:
Tengo una gran matriz c
, cargada en memoria RAM. Mi objetivo es a través del procesamiento paralelo tener acceso de solo lectura al mismo. Sin embargo, cuando creo las conexiones, ya sea que uso doSNOW
, doMPI
, big.matrix
, etc., la cantidad de RAM utilizada aumenta dramáticamente.
¿Hay una manera de crear correctamente una memoria compartida, desde la cual se puedan leer todos los procesos, sin crear una copia local de todos los datos?
Ejemplo:
libs<-function(libraries){# Installs missing libraries and then load them
for (lib in libraries){
if( !is.element(lib, .packages(all.available = TRUE)) ) {
install.packages(lib)
}
library(lib,character.only = TRUE)
}
}
libra<-list("foreach","parallel","doSNOW","bigmemory")
libs(libra)
#create a matrix of size 1GB aproximatelly
c<-matrix(runif(10000^2),10000,10000)
#convert it to bigmatrix
x<-as.big.matrix(c)
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
out<-foreach(linID = 1:10, .combine=c) %dopar% {
#load bigmemory
require(bigmemory)
# attach the matrix via shared memory??
m <- attach.big.matrix(mdesc)
#dummy expression to test data aquisition
c<-m[1,1]
}
closeAllConnections()
RAM: en la imagen de arriba, puedes encontrar que la memoria aumenta mucho hasta que foreach
termina y se libera.
Alternativamente, si está en Linux / Mac y quiere una memoria compartida de CoW, use las horquillas. Primero cargue todos sus datos en el hilo principal, y luego inicie los hilos de trabajo (bifurcaciones) con la función general mcparallel
desde el paquete parallel
.
Puede recopilar sus resultados con mccollect
o con el uso de una memoria verdaderamente compartida utilizando la biblioteca Rdsm
, como esto:
library(parallel)
library(bigmemory) #for shared variables
shared<-bigmemory::big.matrix(nrow = size, ncol = 1, type = ''double'')
shared[1]<-1 #Init shared memory with some number
job<-mcparallel({shared[1]<-23}) #...change it in another forked thread
shared[1,1] #...and confirm that it gets changed
# [1] 23
Puede confirmar que el valor realmente se actualiza en backgruound, si retrasa la escritura:
fn<-function()
{
Sys.sleep(1) #One second delay
shared[1]<-11
}
job<-mcparallel(fn())
shared[1] #Execute immediately after last command
# [1] 23
aaa[1,1] #Execute after one second
# [1] 11
mccollect() #To destroy all forked processes (and possibly collect their output)
Para controlar la concurrencia y evitar las condiciones de carrera, use candados:
library(synchronicity) #for locks
m<-boost.mutex() #Lets create a mutex "m"
bad.incr<-function() #This function doesn''t protect the shared resource with locks:
{
a<-shared[1]
Sys.sleep(1)
shared[1]<-a+1
}
good.incr<-function()
{
lock(m)
a<-shared[1]
Sys.sleep(1)
shared[1]<-a+1
unlock(m)
}
shared[1]<-1
for (i in 1:5) job<-mcparallel(bad.incr())
shared[1] #You can verify, that the value didn''t get increased 5 times due to race conditions
mccollect() #To clear all threads, not to get the values
shared[1]<-1
for (i in 1:5) job<-mcparallel(good.incr())
shared[1] #As expected, eventualy after 5 seconds of waiting you get the 6
#[1] 6
mccollect()
Editar:
Simplifiqué un poco las dependencias al intercambiar Rdsm::mgrmakevar
en bigmemory::big.matrix
. mgrmakevar
llama internamente a big.matrix
todos modos, y no necesitamos nada más.
Creo que la solución al problema puede verse en el post de Steve Weston, el autor del paquete foreach
, here . Allí dice:
El paquete doParallel exportará automáticamente las variables a los trabajadores a los que se hace referencia en el bucle foreach.
Así que creo que el problema es que en su código se hace referencia a su gran matriz c
en la asignación c<-m[1,1]
. Simplemente intente xyz <- m[1,1]
lugar y vea qué sucede.
Aquí hay un ejemplo con un big.matrix
respaldado por big.matrix
:
#create a matrix of size 1GB aproximatelly
n <- 10000
m <- 10000
c <- matrix(runif(n*m),n,m)
#convert it to bigmatrix
x <- as.big.matrix(x = c, type = "double",
separated = FALSE,
backingfile = "example.bin",
descriptorfile = "example.desc")
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
## 1) No referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
t <- attach.big.matrix("example.desc")
for (i in seq_len(30L)) {
for (j in seq_len(m)) {
y <- t[i,j]
}
}
return(0L)
}
## 2) Referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
invisible(c) ## c is referenced and thus exported to workers
t <- attach.big.matrix("example.desc")
for (i in seq_len(30L)) {
for (j in seq_len(m)) {
y <- t[i,j]
}
}
return(0L)
}
closeAllConnections()