tutorial spark org examples example espaƱol python apache-spark pyspark

org - spark python example



Parece que no puedo--py archivos en Spark para trabajar (4)

Tengo un problema con el uso de Python en Spark. Mi aplicación tiene algunas dependencias, como numpy, pandas, astropy, etc. No puedo usar virtualenv para crear un entorno con todas las dependencias, ya que los nodos en el clúster no tienen ningún punto de montaje o sistema de archivos común, además de HDFS. Por lo tanto, estoy atascado con el uso de spark-submit --py-files . Empaquetaré el contenido de los paquetes de sitio en un archivo ZIP y --py-files=dependencies.zip el trabajo como con la --py-files=dependencies.zip (como se sugiere en la forma más fácil de instalar las dependencias de Python en los nodos ejecutores de Spark ). Sin embargo, los nodos en el clúster aún no parecen ver los módulos dentro y arrojan ImportError como este al importar numpy.

File "/path/anonymized/module.py", line 6, in <module> import numpy File "/tmp/pip-build-4fjFLQ/numpy/numpy/__init__.py", line 180, in <module> File "/tmp/pip-build-4fjFLQ/numpy/numpy/add_newdocs.py", line 13, in <module> File "/tmp/pip-build-4fjFLQ/numpy/numpy/lib/__init__.py", line 8, in <module> # File "/tmp/pip-build-4fjFLQ/numpy/numpy/lib/type_check.py", line 11, in <module> File "/tmp/pip-build-4fjFLQ/numpy/numpy/core/__init__.py", line 14, in <module> ImportError: cannot import name multiarray

Cuando cambio al virtualenv y uso el shell local de pyspark, todo funciona bien, por lo que las dependencias están todas allí. ¿Alguien sabe qué podría causar este problema y cómo solucionarlo?

¡Gracias!


En primer lugar, supondré que sus dependencias están enumeradas en requirements.txt . Para empaquetar y comprimir las dependencias, ejecute lo siguiente en la línea de comando:

pip install -t dependencies -r requirements.txt cd dependencies zip -r ../dependencies.zip .

Arriba, el comando de cd dependencies es crucial para garantizar que los módulos estén en el nivel superior del archivo zip. Gracias a la publicación de Dan Corin para el heads-up.

A continuación, envíe el trabajo a través de:

spark-submit --py-files dependencies.zip spark_job.py

La directiva --py-files envía el archivo zip a los trabajadores de Spark pero no lo agrega a PYTHONPATH (fuente de confusión para mí). Para agregar las dependencias a PYTHONPATH para corregir ImportError , agregue la siguiente línea al trabajo Spark, spark_job.py :

sc.addPyFile("dependencies.zip")

Una advertencia de esta publicación de Cloudera :

Una suposición de que cualquier persona que haga computación distribuida con hardware básico debe suponer que el hardware subyacente es potencialmente heterogéneo. Un huevo de Python creado en una máquina cliente será específico para la arquitectura de la CPU del cliente debido a la compilación C requerida. Distribuir un huevo para un paquete complejo y compilado como NumPy, SciPy o pandas es una solución frágil que probablemente falle en la mayoría de los clusters, al menos eventualmente.

Aunque la solución anterior no crea un huevo, se aplica la misma directriz.


  • Primero necesita pasar sus archivos a través de --py archivos o archivos

    • Cuando pase sus archivos / zip con los indicadores anteriores, básicamente sus recursos se transferirán al directorio temporal creado en HDFS solo durante el tiempo de vida de esa aplicación.
  • Ahora en su código, agregue esos archivos zip usando el siguiente comando

    sc.addPyFile("your zip/file")

    • lo que hace lo anterior es que carga los archivos en el entorno de ejecución, como JVM.
  • Ahora importe su archivo zip en su código con un alias como el siguiente para comenzar a hacer referencia a él

    import zip/file as your-alias

    Nota: No necesita usar la extensión de archivo durante la importación, como .py al final

Espero que esto sea útil.


Puede ubicar todos los .pys que necesita y agregarlos relativamente. mira aquí para esta explicación:

import os, sys, inspect # realpath() will make your script run, even if you symlink it :) cmd_folder = os.path.realpath(os.path.abspath(os.path.split(inspect.getfile( inspect.currentframe() ))[0])) if cmd_folder not in sys.path: sys.path.insert(0, cmd_folder) # use this if you want to include modules from a subfolder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"subfolder"))) if cmd_subfolder not in sys.path: sys.path.insert(0, cmd_subfolder) # Info: # cmd_folder = os.path.dirname(os.path.abspath(__file__)) # DO NOT USE __file__ !!! # __file__ fails if script is called in different ways on Windows # __file__ fails if someone does os.chdir() before # sys.argv[0] also fails because it doesn''t not always contains the path


Spark también suspenderá silenciosamente la carga de un archivo zip que se crea con el módulo python zipfile . Los archivos zip se deben crear usando una utilidad de compresión.