spark mllib example python scala apache-spark pyspark apache-spark-mllib

python - mllib - spark libraries



Llamar a la funciĆ³n Java/Scala desde una tarea (1)

La comunicación usando la puerta de enlace Py4J predeterminada simplemente no es posible. Para comprender por qué tenemos que echar un vistazo al siguiente diagrama del documento de PySpark Internals [1]:

Dado que la puerta de enlace Py4J se ejecuta en el controlador, no es accesible para los intérpretes de Python que se comunican con los trabajadores de JVM a través de sockets (consulte, por ejemplo, PythonRDD / rdd.py ).

Teóricamente, podría ser posible crear una puerta de enlace Py4J separada para cada trabajador, pero en la práctica es poco probable que sea útil. Ignorando problemas como la confiabilidad, Py4J simplemente no está diseñado para realizar tareas intensivas en datos.

¿Hay alguna solución?

  1. Uso de la API de fuentes de datos SQL de Spark para ajustar el código JVM.

    Pros : compatible, de alto nivel, no requiere acceso a la API interna de PySpark

    Contras : relativamente detallado y no muy bien documentado, limitado principalmente a los datos de entrada

  2. Operando en DataFrames usando Scala UDFs.

    Pros : Fácil de implementar (consulte Spark: ¿Cómo mapear Python con Scala o las funciones definidas por el usuario de Java? ), No hay conversión de datos entre Python y Scala si los datos ya están almacenados en un DataFrame, acceso mínimo a Py4J

    Contras : Requiere acceso a la puerta de enlace Py4J y métodos internos, limitado a Spark SQL, difícil de depurar, no compatible

  3. Crear una interfaz Scala de alto nivel de manera similar a como se hace en MLlib.

    Pros : Flexible, capacidad de ejecutar código complejo arbitrario. Se puede usar directamente en RDD (ver, por ejemplo , envoltorios de modelos MLlib ) o con DataFrames (ver Cómo usar una clase Scala dentro de Pyspark ). La última solución parece ser mucho más amigable ya que todos los detalles del servicio ya son manejados por la API existente.

    Contras : Bajo nivel, la conversión de datos requerida, igual que las UDF, requiere acceso a Py4J y API interna, no compatible

    Se pueden encontrar algunos ejemplos básicos en Transformar PySpark RDD con Scala

  4. Usar una herramienta de gestión de flujo de trabajo externo para cambiar entre trabajos de Python y Scala / Java y pasar datos a un DFS.

    Pros : fácil de implementar, cambios mínimos en el código en sí

    Contras : Costo de leer / escribir datos (¿ Alluxio ?)

  5. Uso de SQLContext compartido (ver, por ejemplo, Apache Zeppelin o Livy ) para pasar datos entre idiomas invitados usando tablas temporales registradas.

    Pros : muy adecuado para el análisis interactivo

    Contras : no tanto para trabajos por lotes (Zeppelin) o puede requerir una orquestación adicional (Livy)

  1. Joshua Rosen. (04 de agosto de 2014) PySpark Internals . Recuperado de https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

Antecedentes

Mi pregunta original aquí fue ¿Por qué el uso de DecisionTreeModel.predict dentro de la función de mapa genera una excepción? y está relacionado con ¿Cómo generar tuplas de (etiqueta original, etiqueta pronosticada) en Spark con MLlib?

Cuando usamos Scala API, una forma recomendada de obtener predicciones para RDD[LabeledPoint] usando DecisionTreeModel es simplemente mapear sobre RDD :

val labelAndPreds = testData.map { point => val prediction = model.predict(point.features) (point.label, prediction) }

Desafortunadamente, un enfoque similar en PySpark no funciona tan bien:

labelsAndPredictions = testData.map( lambda lp: (lp.label, model.predict(lp.features)) labelsAndPredictions.first()

Excepción: Parece que está intentando hacer referencia a SparkContext desde una variable de difusión, acción o transformación. SparkContext solo se puede usar en el controlador, no en el código que se ejecuta en los trabajadores. Para más información, ver SPARK-5063 .

En lugar de esa documentación oficial, se recomienda algo como esto:

predictions = model.predict(testData.map(lambda x: x.features)) labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

Entonces, ¿qué está pasando aquí? Aquí no hay ninguna variable de difusión y la API de Scala define la predict como sigue:

/** * Predict values for a single data point using the model trained. * * @param features array representing a single data point * @return Double prediction from the trained model */ def predict(features: Vector): Double = { topNode.predict(features) } /** * Predict values for the given data set using the model trained. * * @param features RDD representing data points to be predicted * @return RDD of predictions for each of the given data points */ def predict(features: RDD[Vector]): RDD[Double] = { features.map(x => predict(x)) }

así que al menos a primera vista, llamar desde la acción o la transformación no es un problema, ya que la predicción parece ser una operación local.

Explicación

Después de investigar un poco, descubrí que el origen del problema es un método JavaModelWrapper.call invocado desde DecisionTreeModel.predict . SparkContext access SparkContext que se requiere para llamar a la función Java:

callJavaFunc(self._sc, getattr(self._java_model, name), *a)

Pregunta

En el caso de DecisionTreeModel.predict existe una solución recomendada y todo el código requerido ya forma parte de la API de Scala, pero ¿hay alguna forma elegante de manejar un problema como este en general?

Solo las soluciones en las que puedo pensar en este momento son bastante pesadas:

  • empujar todo a JVM extendiendo las clases de Spark a través de conversiones implícitas o agregando algún tipo de envoltorios
  • usando la puerta de enlace Py4j directamente