example - pandas python
AplicaciĆ³n eficiente de una funciĆ³n a un panda DataFrame agrupado en paralelo (1)
De los comentarios anteriores, parece que esto está planeado para los pandas
algún momento (también hay un proyecto rosetta
aspecto interesante que acabo de notar).
Sin embargo, hasta que cada funcionalidad paralela se incorpora a los pandas
, noté que es muy fácil escribir aumentos paralelos eficientes y sin copia de memoria directamente a pandas
usando cython
+ OpenMP y C ++.
Aquí hay un pequeño ejemplo de cómo escribir un groupby-sum paralelo, cuyo uso es algo como esto:
import pandas as pd
import para_group_demo
df = pd.DataFrame({''a'': [1, 2, 1, 2, 1, 1, 0], ''b'': range(7)})
print para_group_demo.sum(df.a, df.b)
y la salida es:
sum
key
0 6
1 11
2 4
Nota Sin duda, la funcionalidad de este simple ejemplo eventualmente formará parte de los pandas
. Algunas cosas, sin embargo, serán más naturales para paralelizar en C ++ durante algún tiempo, y es importante ser conscientes de lo fácil que es combinar esto en pandas
.
Para hacer esto, escribí una simple extensión de archivo de fuente única cuyo código sigue.
Comienza con algunas importaciones y definiciones de tipo
from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange
import pandas as pd
ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t
El tipo C ++ unordered_map
es para sumar por un solo hilo, y el vector
es para sumar por todos los hilos.
Ahora a la función sum
. Comienza con vistas de memoria tipadas para un acceso rápido:
def sum(crit, vals):
cdef int64_t[:] crit_view = crit.values
cdef int64_t[:] vals_view = vals.values
La función continúa dividiendo el semi-igual a los subprocesos (aquí codificados en 4), y haciendo que cada subproceso sume las entradas en su rango:
cdef uint64_t num_threads = 4
cdef uint64_t l = len(crit)
cdef uint64_t s = l / num_threads + 1
cdef uint64_t i, j, e
cdef counts_vec_t counts
counts = counts_vec_t(num_threads)
counts.resize(num_threads)
with cython.boundscheck(False):
for i in prange(num_threads, nogil=True):
j = i * s
e = j + s
if e > l:
e = l
while j < e:
counts[i][crit_view[j]] += vals_view[j]
inc(j)
Cuando los hilos se han completado, la función fusiona todos los resultados (de los diferentes rangos) en un solo unordered_map
:
cdef counts_t total
cdef counts_it_t it, e_it
for i in range(num_threads):
it = counts[i].begin()
e_it = counts[i].end()
while it != e_it:
total[deref(it).first] += deref(it).second
inc(it)
Todo lo que queda es crear un DataFrame
y devolver los resultados:
key, sum_ = [], []
it = total.begin()
e_it = total.end()
while it != e_it:
key.append(deref(it).first)
sum_.append(deref(it).second)
inc(it)
df = pd.DataFrame({''key'': key, ''sum'': sum_})
df.set_index(''key'', inplace=True)
return df
A menudo necesito aplicar una función a los grupos de un DataFrame
muy grande (de tipos de datos mixtos) y me gustaría aprovechar los múltiples núcleos.
Puedo crear un iterador a partir de los grupos y usar el módulo de multiprocesamiento, pero no es eficiente porque cada grupo y los resultados de la función deben ser escaneados para mensajes entre procesos.
¿Hay alguna forma de evitar el decapado o incluso evitar la copia del DataFrame
completo? Parece que las funciones de memoria compartida de los módulos de multiprocesamiento están limitadas a matrices numpy
. ¿Hay más opciones?