python python-2.7 hadoop intellij-idea apache-spark

python - ¿Cómo se puede llamar a PySpark en modo de depuración?



python-2.7 hadoop (1)

Tengo IntelliJ IDEA configurado con Apache Spark 1.4.

Quiero poder agregar puntos de depuración a mis scripts de Spark Python para poder depurarlos fácilmente.

Actualmente estoy ejecutando este bit de Python para inicializar el proceso de chispa

proc = subprocess.Popen([SPARK_SUBMIT_PATH, scriptFile, inputFile], shell=SHELL_OUTPUT, stdout=subprocess.PIPE) if VERBOSE: print proc.stdout.read() print proc.stderr.read()

Cuando spark-submit finalmente llama a myFirstSparkScript.py , el modo de depuración no se myFirstSparkScript.py y se ejecuta de manera normal. Desafortunadamente, editar el código fuente de Apache Spark y ejecutar una copia personalizada no es una solución aceptable.

¿Alguien sabe si es posible hacer que spark-submit llame al script Apache Spark en modo de depuración? ¿Si es así, cómo?


Hasta donde entiendo tus intenciones, lo que quieres no es directamente posible dada la arquitectura Spark. Incluso sin una llamada de subprocess la única parte de su programa que es accesible directamente en un controlador es un SparkContext . Del resto, está efectivamente aislado por diferentes capas de comunicación, incluida al menos una instancia JVM (en el modo local). Para ilustrar eso, usemos un diagrama de la documentación de PySpark Internals .

Lo que está en el cuadro de la izquierda es la parte a la que se puede acceder localmente y que se puede usar para adjuntar un depurador. Dado que está más limitado a las llamadas JVM, realmente no hay nada que pueda interesarle, a menos que esté modificando PySpark.

Lo que está a la derecha sucede de forma remota y, dependiendo del administrador de clúster que utilice, es prácticamente un recuadro negro desde la perspectiva del usuario. Además, hay muchas situaciones en las que el código Python de la derecha no hace más que llamar a la API JVM.

Esta fue la parte mala. Lo bueno es que la mayoría de las veces no debería haber necesidad de depuración remota. Excluyendo el acceso a objetos como TaskContext , que se pueden burlar fácilmente, cada parte de su código debe ser fácilmente ejecutable / comprobable localmente sin usar ninguna instancia de Spark.

Las funciones que pasa a las acciones / transformaciones toman objetos Python estándar y predecibles y se espera que también devuelvan objetos Python estándar. Lo que también es importante es que estos deben estar libres de efectos secundarios.

Entonces, al final del día, tiene que dividir partes de su programa: una capa delgada a la que se puede acceder de forma interactiva y probada basándose únicamente en entradas / salidas y "núcleo computacional" que no requiere Spark para pruebas / depuración.

Otras opciones

Dicho esto, no estás completamente sin opciones aquí.

Modo local

( adjunte pasivamente el depurador a un intérprete en ejecución )

Tanto el depurador de GDB como el de PySpark pueden conectarse a un proceso en ejecución. Esto solo se puede hacer una vez que se hayan iniciado los procesos de PySpark daemon y / o de trabajo. En modo local, puede forzarlo ejecutando una acción ficticia, por ejemplo:

sc.parallelize([], n).count()

donde n es un número de "núcleos" disponibles en el modo local[n] ( local[n] ). Procedimiento de ejemplo paso a paso en sistemas tipo Unix:

  • Inicie el shell PySpark:

    $SPARK_HOME/bin/pyspark

  • Use pgrep para verificar que no se esté ejecutando ningún proceso de daemon:

    ➜ spark-2.1.0-bin-hadoop2.7$ pgrep -f pyspark.daemon ➜ spark-2.1.0-bin-hadoop2.7$

  • Lo mismo se puede determinar en PyCharm por:

    alt + shift + a y eligiendo Adjuntar al proceso local :

    o Ejecutar -> Adjuntar a proceso local .

    En este punto, debería ver solo el shell PySpark (y posiblemente algunos procesos no relacionados).

  • Ejecutar acción ficticia:

    sc.parallelize ([], 1) .count ()

  • Ahora debería ver tanto el daemon como el worker (aquí solo uno):

    ➜ spark-2.1.0-bin-hadoop2.7$ pgrep -f pyspark.daemon 13990 14046 ➜ spark-2.1.0-bin-hadoop2.7$

    y

    El proceso con un pid inferior es un demonio, el que tiene un pid superior es (posiblemente) un trabajador efímero.

  • En este punto, puede adjuntar el depurador a un proceso de interés:

    • En PyCharm eligiendo el proceso para conectarse.
    • Con GDB simple llamando a:

      gdb python <pid of running process>

La mayor desventaja de este enfoque es que ha encontrado el intérprete correcto en el momento correcto.

Modo distribuido

( Uso del componente activo que se conecta al servidor del depurador )

Con PyCharm

PyCharm proporciona Python Debug Server que se puede usar con trabajos de PySpark.

En primer lugar, debe agregar una configuración para el depurador remoto:

  • alt + shift + a y elija Editar configuraciones o Ejecutar -> Editar configuraciones .
  • Haga clic en Agregar nueva configuración (más verde) y elija Depuración remota de Python .
  • Configure el host y el puerto de acuerdo con su propia configuración (asegúrese de que ese puerto y sea alcanzado desde una máquina remota)

  • Inicie el servidor de depuración:

    shift + F9

    Debería ver la consola del depurador:

  • Asegúrese de que pyddev sea ​​accesible en los nodos de trabajo, ya sea instalándolo o distribuyendo el archivo egg .

  • pydevd usa un componente activo que debe incluirse en su código:

    import pydevd pydevd.settrace(<host name>, port=<port number>)

    La parte difícil es encontrar el lugar correcto para incluirlo y, a menos que depure las operaciones por lotes (como las funciones pasadas a mapPartitions ), puede requerir el parcheo de la fuente PySpark, por ejemplo, pyspark.daemon.worker o métodos RDD como RDD.mapPartitions . Digamos que estamos interesados ​​en depurar el comportamiento de los trabajadores. El posible parche puede verse así:

    diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index 7f06d4288c..6cff353795 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -44,6 +44,9 @@ def worker(sock): """ Called by a worker process after the fork(). """ + import pydevd + pydevd.settrace(''foobar'', port=9999, stdoutToServer=True, stderrToServer=True) + signal.signal(SIGHUP, SIG_DFL) signal.signal(SIGCHLD, SIG_DFL) signal.signal(SIGTERM, SIG_DFL)

    Si decide parchear la fuente de Spark, asegúrese de usar la versión parcheada de la fuente no empaquetada que se encuentra en $SPARK_HOME/python/lib .

  • Ejecute el código PySpark. Regrese a la consola del depurador y diviértase:

Otras herramientas

Hay una serie de herramientas, incluyendo python-manhole o pyrasite que se pueden usar, con cierto esfuerzo, para trabajar con PySpark.

Nota :

Por supuesto, puede utilizar métodos "remotos" (activos) con modo local y, hasta cierto punto, métodos "locales" con modo distribuido (puede conectarse al nodo de trabajo y seguir los mismos pasos que en el modo local).