tablas - Python: uso de multiprocesamiento en un marco de datos de pandas
pandas python tutorial (2)
Que esta mal
Esta línea de su código:
pool.map(calc_dist, [''lat'',''lon''])
procesos de calc_dist(''lat'')
2: uno ejecuta calc_dist(''lat'')
y el otro ejecuta calc_dist(''lon'')
. Compara el primer ejemplo en doc . (Básicamente, pool.map(f, [1,2,3])
llama a f
tres veces con los argumentos dados en la lista que sigue: f(1)
, f(2)
y f(3)
.) Si I '' No me equivoque, su función calc_dist
solo se puede llamar calc_dist(''lat'', ''lon'')
. Y no permite el procesamiento paralelo.
Solución
Creo que desea dividir el trabajo entre procesos, probablemente enviando cada tupla (grp, lst)
a un proceso separado. El siguiente código hace exactamente eso.
Primero, preparémonos para dividir:
grp_lst_args = list(df.groupby(''co_nm'').groups.items())
print(grp_lst_args)
[(''aa'', [0, 1, 2]), (''cc'', [7, 8, 9]), (''bb'', [3, 4, 5, 6])]
Enviaremos cada una de estas tuplas (aquí, hay tres de ellas) como un argumento a una función en un proceso separado. Necesitamos reescribir la función, llamémosla calc_dist2
. Por conveniencia, su argumento es una tupla como en calc_dist2((''aa'',[0,1,2]))
def calc_dist2(arg):
grp, lst = arg
return pd.DataFrame(
[ [grp,
df.loc[c[0]].ser_no,
df.loc[c[1]].ser_no,
vincenty(df.loc[c[0], [''lat'',''lon'']],
df.loc[c[1], [''lat'',''lon'']])
]
for c in combinations(lst, 2)
],
columns=[''co_nm'',''machineA'',''machineB'',''distance''])
Y ahora viene el multiprocesamiento:
pool = mp.Pool(processes = (mp.cpu_count() - 1))
results = pool.map(calc_dist2, grp_lst_args)
pool.close()
pool.join()
results_df = pd.concat(results)
results
es una lista de resultados (aquí marcos de datos) de las llamadas calc_dist2((grp,lst))
para (grp,lst)
en grp_lst_args
. Los elementos de los results
se concatenan posteriormente en un marco de datos.
print(results_df)
co_nm machineA machineB distance
0 aa 1 2 156.876149391 km
1 aa 1 3 313.705445447 km
2 aa 2 3 156.829329105 km
0 cc 8 9 156.060165391 km
1 cc 8 0 311.910998169 km
2 cc 9 0 155.851498134 km
0 bb 4 5 156.665641837 km
1 bb 4 6 313.214333025 km
2 bb 4 7 469.622535339 km
3 bb 5 6 156.548897414 km
4 bb 5 7 312.957597466 km
5 bb 6 7 156.40899677 km
Por cierto, en Python 3 podríamos usar una with
construcción:
with mp.Pool() as pool:
results = pool.map(calc_dist2, grp_lst_args)
Actualizar
He probado este código sólo en Linux. En Linux, se puede acceder al marco de datos de solo lectura df
mediante procesos secundarios y no se copia en su espacio de memoria, pero no estoy seguro de cómo funciona exactamente en Windows. Puede considerar dividir df
en trozos (agrupados por co_nm
) y enviar estos trozos como argumentos a alguna otra versión de calc_dist
.
Quiero usar el multiprocessing
en un conjunto de datos grande para encontrar la distancia entre dos puntos gps. Construí un conjunto de prueba, pero no he podido hacer que el multiprocessing
funcione en este conjunto.
import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp
df = pd.DataFrame({''ser_no'': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
''co_nm'': [''aa'', ''aa'', ''aa'', ''bb'', ''bb'', ''bb'', ''bb'', ''cc'', ''cc'', ''cc''],
''lat'': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
''lon'': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})
def calc_dist(x):
return pd.DataFrame(
[ [grp,
df.loc[c[0]].ser_no,
df.loc[c[1]].ser_no,
vincenty(df.loc[c[0], x],
df.loc[c[1], x])
]
for grp,lst in df.groupby(''co_nm'').groups.items()
for c in combinations(lst, 2)
],
columns=[''co_nm'',''machineA'',''machineB'',''distance''])
if __name__ == ''__main__'':
pool = mp.Pool(processes = (mp.cpu_count() - 1))
pool.map(calc_dist, [''lat'',''lon''])
pool.close()
pool.join()
Estoy usando Python 2.7.11 e Ipython 4.1.2 con Anaconda 2.5.0 de 64 bits en Windows7 Professional cuando ocurre este error.
runfile (''C: /.../ Desktop / multiprocessing test.py'', wdir = ''C: /.../ Desktop'') Traceback (última llamada más reciente):
Archivo "", línea 1, en el archivo de ejecución (''C: /.../ Desktop / multiprocessing test.py'', wdir = ''C: /.../ Desktop'')
Archivo "C: ... / Local / Continuum / Anaconda2 / lib / site-packages / spyderlib / widgets / externalshell / sitecustomize.py", línea 699, en el archivo ejecutable execfile (nombre de archivo, espacio de nombres)
Archivo "C: ... / Local / Continuum / Anaconda2 / lib / site-packages / spyderlib / widgets / externalshell / sitecustomize.py", línea 74, en execfile exec (compile (scripttext, filename, ''exec''), glob , loc)
Archivo "C: /..../ multiprocessing test.py", línea 33, en pool.map (calc_dist, [''lat'', ''lon''])
Archivo "C: ... / AppData / Local / Continuum / Anaconda2 / lib / multiprocessing / pool.py", línea 251, en el retorno del mapa self.map_async (func, iterable, chunksize) .get ()
Archivo "C: ... / Local / Continuum / Anaconda2 / lib / multiprocessing / pool.py", línea 567, en get raise self._value
TypeError: Error al crear la instancia de Point desde 1.
def get(self, timeout=None):
self.wait(timeout)
if not self._ready:
raise TimeoutError
if self._success:
return self._value
else:
raise self._value
Extraño. Parece funcionar bajo python2 pero no python3.
Esta es una versión modificada mínima para imprimir la salida:
import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp
df = pd.DataFrame({''ser_no'': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
''co_nm'': [''aa'', ''aa'', ''aa'', ''bb'', ''bb'', ''bb'', ''bb'', ''cc'', ''cc'', ''cc''],
''lat'': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
''lon'': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})
def calc_dist(x):
ret = pd.DataFrame(
[ [grp,
df.loc[c[0]].ser_no,
df.loc[c[1]].ser_no,
vincenty(df.loc[c[0], x],
df.loc[c[1], x])
]
for grp,lst in df.groupby(''co_nm'').groups.items()
for c in combinations(lst, 2)
],
columns=[''co_nm'',''machineA'',''machineB'',''distance''])
print(ret)
return ret
if __name__ == ''__main__'':
pool = mp.Pool(processes = (mp.cpu_count() - 1))
pool.map(calc_dist, [''lat'',''lon''])
pool.close()
pool.join()
Y esta es la salida de python2.
0 aa 1 2 110.723608682 km
1 aa 1 3 221.460709525 km
2 aa 2 3 110.737100843 km
3 cc 8 9 110.827576495 km
4 cc 8 0 221.671650552 km
co_nm machineA machineB distance
5 cc 9 0 110.844074057 km
0 aa 1 2 110.575064814 km
1 aa 1 3 221.151481337 km
6 bb 4 5 110.765515243 km
2 aa 2 3 110.576416524 km
7 bb 4 6 221.5459187 km
3 cc 8 9 110.598565514 km
4 cc 8 0 221.203121352 km
8 bb 4 7 332.341640771 km
5 cc 9 0 110.604555838 km
6 bb 4 5 110.58113908 km
9 bb 5 6 110.780403457 km
7 bb 4 6 221.165643396 km
10 bb 5 7 221.576125528 km
8 bb 4 7 331.754177186 km
9 bb 5 6 110.584504316 km
10 bb 5 7 221.173038106 km
11 bb 6 7 110.795722071 km
11 bb 6 7 110.58853379 km
Y este es el rastro de pila de python3.
"""
Traceback (most recent call last):
File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 123, in __new__
seq = iter(arg)
TypeError: ''numpy.int64'' object is not iterable
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.4/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.4/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "gps.py", line 29, in calc_dist
for grp, lst in df.groupby(''co_nm'').groups.items()
File "gps.py", line 30, in <listcomp>
for c in combinations(lst, 2)
File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 322, in __init__
super(vincenty, self).__init__(*args, **kwargs)
File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 115, in __init__
kilometers += self.measure(a, b)
File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 342, in measure
a, b = Point(a), Point(b)
File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 126, in __new__
"Failed to create Point instance from %r." % (arg,)
TypeError: Failed to create Point instance from 8.
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "gps.py", line 38, in <module>
pool.map(calc_dist, [''lat'', ''lon''])
File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
raise self._value
TypeError: Failed to create Point instance from 8.
Sé que esta no es la respuesta, pero quizás ayude ...