programas - que es computacion paralela y distribuida
Acelere el grupo data.table utilizando múltiples núcleos y programación paralela (2)
¿Se puede data.table
agregación con data.table
? Sí.
¿Vale la pena? No Este es un punto clave que la respuesta anterior no pudo resaltar.
Como explica Matt Dowle en la computación de tabla de datos y en paralelo , las copias ("trozos") deben realizarse antes de distribuirse cuando se ejecutan operaciones en paralelo. Esto ralentiza las cosas. En algunos casos, cuando no puede usar data.table
(por ejemplo, ejecutar muchas regresiones lineales), vale la pena dividir las tareas entre los núcleos. Pero no la agregación, al menos cuando se trata de data.table
.
En resumen (y hasta que se demuestre lo contrario), agregue utilizando data.table
y deje de preocuparse por los posibles aumentos de velocidad utilizando doMC
. data.table
ya está ardiendo rápidamente en comparación con cualquier otra cosa disponible cuando se trata de la agregación, ¡incluso si no es multinúcleo!
Aquí hay algunos puntos de referencia que puede ejecutar por sí mismo comparando datos. data.table
interna de la data.table
usando by
con foreach
y mclapply
. Los resultados se enumeran primero.
#-----------------------------------------------
# TL;DR FINAL RESULTS (Best to Worst)
# 3 replications, N = 10000:
# (1) 0.007 -- data.table using `by`
# (2) 3.548 -- mclapply with rbindlist
# (3) 5.557 -- foreach with rbindlist
# (4) 5.959 -- foreach with .combine = "rbind"
# (5) 14.029 -- lapply
# ----------------------------------------------
library(data.table)
## And used the following to create the dt
N <- 1e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
setkey(dt, "a")
# TEST AGGREGATION WITHOUT PARALLELIZATION ---------------------------
## using data.tables `by` to aggregate
round(rowMeans(replicate(3, system.time({
dt[,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1], by=a)]
}))), 3)
# [1] 0.007 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
## using `lapply`
round(rowMeans(replicate(3, system.time({
results <- lapply(unique(dt[["a"]]), function(x) {
dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1])]
})
rbindlist(results)
}))), 3)
# [1] 14.029 elapsed for N == 10,000
# USING `mclapply` FORKING ---------------------------------
## use mclapply
round(rowMeans(replicate(3, system.time({
results <- mclapply(unique(dt[["a"]]),
function(x) {
dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
}, mc.cores=4)
rbindlist(results)
}))), 3)
# [1] 3.548 elapsed for N == 10,000
# PARALLELIZATION USING `doMC` PACKAGE ---------------------------------
library(doMC)
mc = 4
registerDoMC(cores=mc)
getDoParWorkers()
# [1] 4
## (option a) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar%
dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
}))), 3)
# [1] 5.959 elapsed for N == 10,000
## (option b) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
results <-
foreach(x=unique(dt[["a"]])) %dopar%
dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
rbindlist(results)
}))), 3)
# [1] 5.557 elapsed for N == 10,000
registerDoSEQ()
getDoParWorkers()
# [1] 1
Tengo un código grande y el paso de agregación es el cuello de botella actual en términos de velocidad.
En mi código me gustaría acelerar el paso de agrupación de datos para ser más rápido. Un SNOTE (ejemplo simple no trivial) de mis datos se ve así:
library(data.table)
a = sample(1:10000000, 50000000, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e)
system.time(c.dt <- dt[,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1], by=a)])
user system elapsed
60.107 3.143 63.534
Esto es bastante rápido para un ejemplo de datos tan grande, pero en mi caso sigo buscando más velocidad. En mi caso, tengo varios núcleos, por lo que estoy casi seguro de que debe haber una forma de utilizar dicha capacidad de cálculo.
Estoy abierto a cambiar mi tipo de datos a un objeto data.frame, o idata.frame (en teoría, idata.frame es supuestamente más rápido que data.frames).
Investigué un poco y parece que el paquete plyr tiene algunas capacidades paralelas que podrían ser útiles, pero todavía estoy luchando sobre cómo hacerlo para la agrupación que estoy tratando de hacer. En otro post de SO discuten algunas de estas ideas . Todavía no estoy seguro de cuánto más lograría con esta paralelización, ya que utiliza la función foreach. En mi experiencia, la función foreach no es una buena idea para millones de operaciones rápidas porque el esfuerzo de comunicación entre los núcleos termina ralentizando el esfuerzo de paralelización.
Si tiene varios núcleos disponibles, ¿por qué no aprovechar el hecho de que puede filtrar y agrupar rápidamente filas en una tabla de datos utilizando su clave?
library(doMC)
registerDoMC(cores=4)
setkey(dt, "a")
finalRowOrderMatters = FALSE # FALSE can be faster
foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=finalRowOrderMatters) %dopar%
dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
Tenga en cuenta que si el número de grupos únicos (es decir, length(unique(a))
) es relativamente pequeño, será más rápido descartar el argumento .combine
, volver a obtener los resultados en una lista, luego llamar a rbindlist
en los resultados. En mis pruebas en dos núcleos y 8 GB de RAM, el umbral estaba en aproximadamente 9,000 valores únicos. Aquí está lo que usé para comparar:
# (otion a)
round(rowMeans(replicate(3, system.time({
# ------- #
foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar%
dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
# ------- #
}))), 3)
# [1] 1.243 elapsed for N == 1,000
# [1] 11.540 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 57.404 elapsed for N == 50,000
# (otion b)
round(rowMeans(replicate(3, system.time({
# ------- #
results <-
foreach(x=unique(dt[["a"]])) %dopar%
dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
rbindlist(results)
# ------- #
}))), 3)
# [1] 1.117 elapsed for N == 1,000
# [1] 10.567 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 76.613 elapsed for N == 50,000
## And used the following to create the dt
N <- 5e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")