¿Hay manera de seguir el progreso en un mclapply?
progress-bar (5)
Aquí hay una función basada en la solución de @ fotNelton para aplicar donde quiera que normalmente use mcapply.
mcadply <- function(X, FUN, ...) {
# Runs multicore lapply with progress indicator and transformation to
# data.table output. Arguments mirror those passed to lapply.
#
# Args:
# X: Vector.
# FUN: Function to apply to each value of X. Note this is transformed to
# a data.frame return if necessary.
# ...: Other arguments passed to mclapply.
#
# Returns:
# data.table stack of each mclapply return value
#
# Progress bar code based on https://stackoverflow.com/a/10993589
require(multicore)
require(plyr)
require(data.table)
local({
f <- fifo(tempfile(), open="w+b", blocking=T)
if (inherits(fork(), "masterProcess")) {
# Child
progress <- 0
print.progress <- 0
while (progress < 1 && !isIncomplete(f)) {
msg <- readBin(f, "double")
progress <- progress + as.numeric(msg)
# Print every 1%
if(progress >= print.progress + 0.01) {
cat(sprintf("Progress: %.0f%%/n", progress * 100))
print.progress <- floor(progress * 100) / 100
}
}
exit()
}
newFun <- function(...) {
writeBin(1 / length(X), f)
return(as.data.frame(FUN(...)))
}
result <- as.data.table(rbind.fill(mclapply(X, newFun, ...)))
close(f)
cat("Done/n")
return(result)
})
}
Me encanta la configuración .progress = ''text''
en el plyr''s
llply
. Sin embargo, me causa mucha ansiedad no saber qué tan lejos está mclapply
(del paquete de multicore
), ya que los elementos de la lista se envían a varios núcleos y luego se recopilan al final.
He estado enviando mensajes como *currently in sim_id # ....*
pero eso no es muy útil porque no me da un indicador de qué porcentaje de elementos de la lista están completos (aunque es útil saber que mi script no está atascado y se mueve a lo largo).
¿Alguien puede sugerir otras ideas que me permitan ver mi archivo .Rout
y tener una idea del progreso? He pensado en agregar un contador manual, pero no veo cómo lo implementaría, ya que mclapply
debe terminar de procesar todos los elementos de la lista antes de poder emitir comentarios.
Basado en la respuesta de @fotNelson, usando una barra de progreso en lugar de imprimir línea por línea y llamando a una función externa con mclapply.
library(''utils'')
library(''multicore'')
prog.indic <- local({ #evaluates in local environment only
f <- fifo(tempfile(), open="w+b", blocking=T) # open fifo connection
assign(x=''f'',value=f,envir=.GlobalEnv)
pb <- txtProgressBar(min=1, max=MC,style=3)
if (inherits(fork(), "masterProcess")) { #progress tracker
# Child
progress <- 0.0
while (progress < MC && !isIncomplete(f)){
msg <- readBin(f, "double")
progress <- progress + as.numeric(msg)
# Updating the progress bar.
setTxtProgressBar(pb,progress)
}
exit()
}
MC <- 100
result <- mclapply(1:MC, .mcfunc)
cat(''/n'')
assign(x=''result'',value=result,envir=.GlobalEnv)
close(f)
})
.mcfunc<-function(i,...){
writeBin(1, f)
return(i)
}
Es necesario asignar la conexión fifo a .GlobalEnv para usarla desde una función fuera de la llamada mclapply. Gracias por las preguntas y las respuestas anteriores, me había estado preguntando cómo hacer esto por un tiempo.
Debido al hecho de que mclapply
genera múltiples procesos, uno podría querer usar quince, tuberías o incluso enchufes. Ahora considere el siguiente ejemplo:
library(multicore)
finalResult <- local({
f <- fifo(tempfile(), open="w+b", blocking=T)
if (inherits(fork(), "masterProcess")) {
# Child
progress <- 0.0
while (progress < 1 && !isIncomplete(f)) {
msg <- readBin(f, "double")
progress <- progress + as.numeric(msg)
cat(sprintf("Progress: %.2f%%/n", progress * 100))
}
exit()
}
numJobs <- 100
result <- mclapply(1:numJobs, function(...) {
# Dome something fancy here
# ...
# Send some progress update
writeBin(1/numJobs, f)
# Some arbitrary result
sample(1000, 1)
})
close(f)
result
})
cat("Done/n")
Aquí, un archivo temporal se usa como fifo, y el proceso principal incluye a un niño cuyo único deber es informar el progreso actual. El proceso principal continúa llamando a mclapply
donde la expresión (más precisamente, el bloque de expresión) que se va a evaluar escribe información de progreso parcial en el búfer de fifo mediante writeBin
.
Como este es solo un ejemplo simple, probablemente tendrá que adaptar todo el material de salida a sus necesidades. HTH!
El paquete pbapply
ha implementado esto para el caso general. Tanto pblapply
como pbsapply
tienen un argumento cl
. De la documentation :
El procesamiento paralelo se puede habilitar a través del argumento
cl
.parLapply
se llama cuandocl
es un objeto ''cluster
'',mclapply
se llama cuandocl
es un entero. Mostrar la barra de progreso aumenta la sobrecarga de comunicación entre el proceso principal y los procesos nodos / hijos en comparación con los equivalentes paralelos de las funciones sin la barra de progreso. Las funciones vuelven a sus equivalentes originales cuando la barra de progreso está desactivada (es decir,getOption("pboptions")$type == "none"
dopb()
esFALSE
). Este es el valor predeterminado cuandointeractive()
si esFALSE
(es decir, se llama desde el script de la línea de comandos R).
Si uno no proporciona cl
(o pasa NULL
), se utiliza el lapply
no paralelo lapply
, que también incluye una barra de progreso.
Esencialmente, agregando otra versión de la solución de @ fotNelson, pero con algunas modificaciones:
- Drop en reemplazo de mclapply (admite todas las funciones de mclapply)
- Atrapa ctrl-c llama y aborta con gracia
- utiliza la barra de progreso incorporada (txtProgressBar)
- opción para realizar un seguimiento del progreso o no y utilizar un estilo específico de la barra de progreso
- utiliza
parallel
lugar demulticore
que ahora se ha eliminado de CRAN - obliga a X a enumerar según mclapply (de modo que la longitud (X) da los resultados esperados)
- Documentación de estilo roxygen2 en la parte superior.
Espero que esto ayude a alguien...
library(parallel)
#-------------------------------------------------------------------------------
#'' Wrapper around mclapply to track progress
#''
#'' Based on http://.com/questions/10984556
#''
#'' @param X a vector (atomic or list) or an expressions vector. Other
#'' objects (including classed objects) will be coerced by
#'' ‘as.list’
#'' @param FUN the function to be applied to
#'' @param ... optional arguments to ‘FUN’
#'' @param mc.preschedule see mclapply
#'' @param mc.set.seed see mclapply
#'' @param mc.silent see mclapply
#'' @param mc.cores see mclapply
#'' @param mc.cleanup see mclapply
#'' @param mc.allow.recursive see mclapply
#'' @param mc.progress track progress?
#'' @param mc.style style of progress bar (see txtProgressBar)
#''
#'' @examples
#'' x <- mclapply2(1:1000, function(i, y) Sys.sleep(0.01))
#'' x <- mclapply2(1:3, function(i, y) Sys.sleep(1), mc.cores=1)
#''
#'' dat <- lapply(1:10, function(x) rnorm(100))
#'' func <- function(x, arg1) mean(x)/arg1
#'' mclapply2(dat, func, arg1=10, mc.cores=2)
#-------------------------------------------------------------------------------
mclapply2 <- function(X, FUN, ...,
mc.preschedule = TRUE, mc.set.seed = TRUE,
mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
mc.cleanup = TRUE, mc.allow.recursive = TRUE,
mc.progress=TRUE, mc.style=3)
{
if (!is.vector(X) || is.object(X)) X <- as.list(X)
if (mc.progress) {
f <- fifo(tempfile(), open="w+b", blocking=T)
p <- parallel:::mcfork()
pb <- txtProgressBar(0, length(X), style=mc.style)
setTxtProgressBar(pb, 0)
progress <- 0
if (inherits(p, "masterProcess")) {
while (progress < length(X)) {
readBin(f, "double")
progress <- progress + 1
setTxtProgressBar(pb, progress)
}
cat("/n")
parallel:::mcexit()
}
}
tryCatch({
result <- mclapply(X, ..., function(...) {
res <- FUN(...)
if (mc.progress) writeBin(1, f)
res
},
mc.preschedule = mc.preschedule, mc.set.seed = mc.set.seed,
mc.silent = mc.silent, mc.cores = mc.cores,
mc.cleanup = mc.cleanup, mc.allow.recursive = mc.allow.recursive
)
}, finally = {
if (mc.progress) close(f)
})
result
}