org - spark python example
Parece que no puedo--py archivos en Spark para trabajar (4)
Tengo un problema con el uso de Python en Spark. Mi aplicación tiene algunas dependencias, como numpy, pandas, astropy, etc. No puedo usar virtualenv para crear un entorno con todas las dependencias, ya que los nodos en el clúster no tienen ningún punto de montaje o sistema de archivos común, además de HDFS. Por lo tanto, estoy atascado con el uso de spark-submit --py-files
. Empaquetaré el contenido de los paquetes de sitio en un archivo ZIP y --py-files=dependencies.zip
el trabajo como con la --py-files=dependencies.zip
(como se sugiere en la forma más fácil de instalar las dependencias de Python en los nodos ejecutores de Spark ). Sin embargo, los nodos en el clúster aún no parecen ver los módulos dentro y arrojan ImportError
como este al importar numpy.
File "/path/anonymized/module.py", line 6, in <module>
import numpy
File "/tmp/pip-build-4fjFLQ/numpy/numpy/__init__.py", line 180, in <module>
File "/tmp/pip-build-4fjFLQ/numpy/numpy/add_newdocs.py", line 13, in <module>
File "/tmp/pip-build-4fjFLQ/numpy/numpy/lib/__init__.py", line 8, in <module>
#
File "/tmp/pip-build-4fjFLQ/numpy/numpy/lib/type_check.py", line 11, in <module>
File "/tmp/pip-build-4fjFLQ/numpy/numpy/core/__init__.py", line 14, in <module>
ImportError: cannot import name multiarray
Cuando cambio al virtualenv y uso el shell local de pyspark, todo funciona bien, por lo que las dependencias están todas allí. ¿Alguien sabe qué podría causar este problema y cómo solucionarlo?
¡Gracias!
En primer lugar, supondré que sus dependencias están enumeradas en requirements.txt
. Para empaquetar y comprimir las dependencias, ejecute lo siguiente en la línea de comando:
pip install -t dependencies -r requirements.txt
cd dependencies
zip -r ../dependencies.zip .
Arriba, el comando de cd dependencies
es crucial para garantizar que los módulos estén en el nivel superior del archivo zip. Gracias a la publicación de Dan Corin para el heads-up.
A continuación, envíe el trabajo a través de:
spark-submit --py-files dependencies.zip spark_job.py
La directiva --py-files
envía el archivo zip a los trabajadores de Spark pero no lo agrega a PYTHONPATH
(fuente de confusión para mí). Para agregar las dependencias a PYTHONPATH
para corregir ImportError
, agregue la siguiente línea al trabajo Spark, spark_job.py
:
sc.addPyFile("dependencies.zip")
Una advertencia de esta publicación de Cloudera :
Una suposición de que cualquier persona que haga computación distribuida con hardware básico debe suponer que el hardware subyacente es potencialmente heterogéneo. Un huevo de Python creado en una máquina cliente será específico para la arquitectura de la CPU del cliente debido a la compilación C requerida. Distribuir un huevo para un paquete complejo y compilado como NumPy, SciPy o pandas es una solución frágil que probablemente falle en la mayoría de los clusters, al menos eventualmente.
Aunque la solución anterior no crea un huevo, se aplica la misma directriz.
Primero necesita pasar sus archivos a través de --py archivos o archivos
- Cuando pase sus archivos / zip con los indicadores anteriores, básicamente sus recursos se transferirán al directorio temporal creado en HDFS solo durante el tiempo de vida de esa aplicación.
Ahora en su código, agregue esos archivos zip usando el siguiente comando
sc.addPyFile("your zip/file")
- lo que hace lo anterior es que carga los archivos en el entorno de ejecución, como JVM.
Ahora importe su archivo zip en su código con un alias como el siguiente para comenzar a hacer referencia a él
import zip/file as your-alias
Nota: No necesita usar la extensión de archivo durante la importación, como .py al final
Espero que esto sea útil.
Puede ubicar todos los .pys que necesita y agregarlos relativamente. mira aquí para esta explicación:
import os, sys, inspect
# realpath() will make your script run, even if you symlink it :)
cmd_folder = os.path.realpath(os.path.abspath(os.path.split(inspect.getfile( inspect.currentframe() ))[0]))
if cmd_folder not in sys.path:
sys.path.insert(0, cmd_folder)
# use this if you want to include modules from a subfolder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"subfolder")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
# Info:
# cmd_folder = os.path.dirname(os.path.abspath(__file__)) # DO NOT USE __file__ !!!
# __file__ fails if script is called in different ways on Windows
# __file__ fails if someone does os.chdir() before
# sys.argv[0] also fails because it doesn''t not always contains the path
Spark también suspenderá silenciosamente la carga de un archivo zip que se crea con el módulo python zipfile
. Los archivos zip se deben crear usando una utilidad de compresión.