parallel for r foreach parallel-processing r-bigmemory

parallel - Memoria compartida en paralelo foreach en R



foreach r (2)

Descripción del problema:

Tengo una gran matriz c , cargada en memoria RAM. Mi objetivo es a través del procesamiento paralelo tener acceso de solo lectura al mismo. Sin embargo, cuando creo las conexiones, ya sea que uso doSNOW , doMPI , big.matrix , etc., la cantidad de RAM utilizada aumenta dramáticamente.

¿Hay una manera de crear correctamente una memoria compartida, desde la cual se puedan leer todos los procesos, sin crear una copia local de todos los datos?

Ejemplo:

libs<-function(libraries){# Installs missing libraries and then load them for (lib in libraries){ if( !is.element(lib, .packages(all.available = TRUE)) ) { install.packages(lib) } library(lib,character.only = TRUE) } } libra<-list("foreach","parallel","doSNOW","bigmemory") libs(libra) #create a matrix of size 1GB aproximatelly c<-matrix(runif(10000^2),10000,10000) #convert it to bigmatrix x<-as.big.matrix(c) # get a description of the matrix mdesc <- describe(x) # Create the required connections cl <- makeCluster(detectCores ()) registerDoSNOW(cl) out<-foreach(linID = 1:10, .combine=c) %dopar% { #load bigmemory require(bigmemory) # attach the matrix via shared memory?? m <- attach.big.matrix(mdesc) #dummy expression to test data aquisition c<-m[1,1] } closeAllConnections()

RAM: en la imagen de arriba, puedes encontrar que la memoria aumenta mucho hasta que foreach termina y se libera.


Alternativamente, si está en Linux / Mac y quiere una memoria compartida de CoW, use las horquillas. Primero cargue todos sus datos en el hilo principal, y luego inicie los hilos de trabajo (bifurcaciones) con la función general mcparallel desde el paquete parallel .

Puede recopilar sus resultados con mccollect o con el uso de una memoria verdaderamente compartida utilizando la biblioteca Rdsm , como esto:

library(parallel) library(bigmemory) #for shared variables shared<-bigmemory::big.matrix(nrow = size, ncol = 1, type = ''double'') shared[1]<-1 #Init shared memory with some number job<-mcparallel({shared[1]<-23}) #...change it in another forked thread shared[1,1] #...and confirm that it gets changed # [1] 23

Puede confirmar que el valor realmente se actualiza en backgruound, si retrasa la escritura:

fn<-function() { Sys.sleep(1) #One second delay shared[1]<-11 } job<-mcparallel(fn()) shared[1] #Execute immediately after last command # [1] 23 aaa[1,1] #Execute after one second # [1] 11 mccollect() #To destroy all forked processes (and possibly collect their output)

Para controlar la concurrencia y evitar las condiciones de carrera, use candados:

library(synchronicity) #for locks m<-boost.mutex() #Lets create a mutex "m" bad.incr<-function() #This function doesn''t protect the shared resource with locks: { a<-shared[1] Sys.sleep(1) shared[1]<-a+1 } good.incr<-function() { lock(m) a<-shared[1] Sys.sleep(1) shared[1]<-a+1 unlock(m) } shared[1]<-1 for (i in 1:5) job<-mcparallel(bad.incr()) shared[1] #You can verify, that the value didn''t get increased 5 times due to race conditions mccollect() #To clear all threads, not to get the values shared[1]<-1 for (i in 1:5) job<-mcparallel(good.incr()) shared[1] #As expected, eventualy after 5 seconds of waiting you get the 6 #[1] 6 mccollect()

Editar:

Simplifiqué un poco las dependencias al intercambiar Rdsm::mgrmakevar en bigmemory::big.matrix . mgrmakevar llama internamente a big.matrix todos modos, y no necesitamos nada más.


Creo que la solución al problema puede verse en el post de Steve Weston, el autor del paquete foreach , here . Allí dice:

El paquete doParallel exportará automáticamente las variables a los trabajadores a los que se hace referencia en el bucle foreach.

Así que creo que el problema es que en su código se hace referencia a su gran matriz c en la asignación c<-m[1,1] . Simplemente intente xyz <- m[1,1] lugar y vea qué sucede.

Aquí hay un ejemplo con un big.matrix respaldado por big.matrix :

#create a matrix of size 1GB aproximatelly n <- 10000 m <- 10000 c <- matrix(runif(n*m),n,m) #convert it to bigmatrix x <- as.big.matrix(x = c, type = "double", separated = FALSE, backingfile = "example.bin", descriptorfile = "example.desc") # get a description of the matrix mdesc <- describe(x) # Create the required connections cl <- makeCluster(detectCores ()) registerDoSNOW(cl) ## 1) No referencing out <- foreach(linID = 1:4, .combine=c) %dopar% { t <- attach.big.matrix("example.desc") for (i in seq_len(30L)) { for (j in seq_len(m)) { y <- t[i,j] } } return(0L) }

## 2) Referencing out <- foreach(linID = 1:4, .combine=c) %dopar% { invisible(c) ## c is referenced and thus exported to workers t <- attach.big.matrix("example.desc") for (i in seq_len(30L)) { for (j in seq_len(m)) { y <- t[i,j] } } return(0L) } closeAllConnections()