parallel-processing - paralelas - paralelismo de datos
Computación paralela en Julia con datos grandes (2)
Es posible que desee ver / cargar sus datos en matrices distribuidas
EDITAR: Probablemente algo como esto:
data = DataFrames.readtable("...")
dfiltered_data = distribute(data) #distributes data among processes automagically
filter_functions = [ fct1, fct2, fct3 ... ]
for fct in filter_functions
dfiltered_data = fct(dfiltered_data)::DataFrame
end
También puedes consultar las pruebas unitarias para ver más ejemplos
Primero mi pregunta:
- ¿Es posible evitar que Julia copie variables cada vez en un bucle for paralelo?
- si no, ¿cómo implementar operaciones de reducción paralela en Julia?
Ahora los detalles:
Tengo este programa:
data = DataFrames.readtable("...") # a big baby (~100MB)
filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
filtered_data = @parallel vcat for fct in filter_functions
fct(data)::DataFrame
end
Funciona bien con la funcionalidad, pero cada llamada paralela para fct (datos) en otro trabajador copia el marco de datos completo, haciendo que todo sea muy lento.
Idealmente, me gustaría cargar los datos una vez, y siempre usar cada uno en cada trabajador los datos precargados. Se me ocurrió este código para hacerlo:
@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
@everywhere for i in 1:length(filter_functions)
if (myid()-1) % nworkers()
fct = filter_functions[i]
filtered_data_temp = fct(data)
end
# How to vcat all the filtered_data_temp ?
end
Pero ahora tengo otro problema: no puedo entender cómo vcat () todo el filtrado_data_temp en una variable en el worker con myid () == 1.
Apreciaría mucho cualquier idea.
Nota: Soy consciente de que opera en paralelo en una gran estructura de datos constante en Julia . Sin embargo, no creo que se aplique a mi problema porque todas mis funciones de filtro operan en la matriz como un todo.
Después de todo, encontré allí la solución a mi pregunta: Julia: Cómo copiar datos a otro procesador en Julia .
Especialmente, introduce la siguiente primitiva para recuperar una variable de otro proceso:
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
Debajo está cómo lo estoy usando:
@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
# Executes the filter functions
@everywhere for i in 1:length(filter_functions)
local_results = ... # some type
if (myid()-1) % nworkers()
fct = filter_functions[i]
filtered_data_temp = fct(data)
local_results = vcat(local_results, filtered_data_temp)
end
# How to vcat all the filtered_data_temp ?
end
# Concatenate all the local results
all_results = ... # some type
for wid in 1:workers()
worker_local_results = getfrom(wid, :local_results)
all_results = vcat(all_results,worker_local_results)
end