parallelize - Spark: ¿Cómo mapear Python con Scala o Java User Defined Functions?
spark pi example (1)
Digamos, por ejemplo, que mi equipo eligió Python como el lenguaje de referencia para desarrollar con Spark. Pero más tarde por motivos de rendimiento, nos gustaría desarrollar librairies específicos de Scala o Java para mapearlos con nuestro código Python (algo similar a los stubs de Python con esqueletos de Scala o Java).
¿No crees que es posible interconectar nuevos métodos personalizados de Python con algunas funciones definidas por el usuario de Scala o Java?
Spark 2.1+
Puede usar SQLContext.registerJavaFunction
:
Registre una UDF java para que pueda ser utilizada en declaraciones SQL.
que requiere un name
, un name
completamente calificado de la clase Java y un tipo de devolución opcional. Lamentablemente, por ahora solo se puede usar en sentencias SQL (o con expr
/ selectExpr
) y requiere una Java org.apache.spark.sql.api.java.UDF*
:
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "2.1.0"
)
package com.example.spark.udfs
import org.apache.spark.sql.api.java.UDF1
class addOne extends UDF1[Integer, Integer] {
def call(x: Integer) = x + 1
}
sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne")
sqlContext.sql("SELECT add_one(1)").show()
## +------+
## |UDF(1)|
## +------+
## | 2|
## +------+
Versión independiente :
No iría tan lejos como para decir que es compatible, pero ciertamente es posible. Todas las funciones SQL disponibles actualmente en PySpark son simplemente un envoltorio alrededor de la API de Scala.
Supongamos que quiero reutilizar el GroupConcat
GroupConcat que he creado como respuesta al reemplazo de SPARK SQL para la función agregada GROUP_CONCAT de mysql y está ubicado en un paquete com.example.udaf
:
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql import Row
row = Row("k", "v")
df = sc.parallelize([
row(1, "foo1"), row(1, "foo2"), row(2, "bar1"), row(2, "bar2")]).toDF()
def groupConcat(col):
"""Group and concatenate values for a given column
>>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
>>> df.select(groupConcat("v").alias("vs"))
[Row(vs=u''foo,bar'')]
"""
sc = SparkContext._active_spark_context
# It is possible to use java_import to avoid full package path
_groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply
# Converting to Seq to match apply(exprs: Column*)
return Column(_groupConcat(_to_seq(sc, [col], _to_java_column)))
df.groupBy("k").agg(groupConcat("v").alias("vs")).show()
## +---+---------+
## | k| vs|
## +---+---------+
## | 1|foo1,foo2|
## | 2|bar1,bar2|
## +---+---------+
Hay demasiados guiones bajos para mi gusto, pero como pueden ver, se puede hacer.
Relacionado con:
- Llamar a la función Java / Scala desde una tarea
- Cómo usar una clase Scala dentro de Pyspark
- Transformando PySpark RDD con Scala