python - textfile - sparkcontext text file
Cómo usar una clase Scala dentro de Pyspark (1)
Sí, es posible, aunque puede estar lejos de ser trivial. Por lo general, desea una envoltura Java (amigable) para que no tenga que lidiar con las funciones de Scala que no se pueden expresar fácilmente con Java simple y, como resultado, no funcionan bien con la puerta de enlace Py4J.
Suponiendo que su clase es int en el paquete com.example
y tiene Python DataFrame
llamado df
df = ... # Python DataFrame
tendrás que:
Construye un jar usando tu herramienta de compilación favorita .
--driver-class-path
en el classpath del controlador, por ejemplo, utilizando el--driver-class-path
para PySpark shell /spark-submit
. Dependiendo del código exacto, puede que tenga que pasarlo usando--jars
tambiénExtraiga la instancia de JVM de una instancia de Python
SparkContext
:jvm = sc._jvm
Extraiga Scala
SQLContext
de una instancia deSQLContext
:ssqlContext = sqlContext._ssql_ctx
Extraiga Java
DataFrame
de ladf
:jdf = df._jdf
Crear una nueva instancia de
SimpleClass
:simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
Llame al método
exe
yDataFrame
el resultado usando PythonDataFrame
:from pyspark.sql import DataFrame DataFrame(simpleObject.exe(), ssqlContext)
El resultado debe ser un marco de DataFrame
PySpark DataFrame
. Por supuesto, puede combinar todos los pasos en una sola llamada.
Importante : este enfoque solo es posible si el código Python se ejecuta únicamente en el controlador. No se puede usar dentro de la acción o transformación de Python. Consulte ¿Cómo utilizar la función Java / Scala desde una acción o una transformación? para detalles.
He estado buscando por un tiempo si hay alguna forma de usar una clase de Scala
en Pyspark
, y no he encontrado ninguna documentación ni guía sobre este tema.
Digamos que creo una clase simple en Scala
que usa algunas bibliotecas de apache-spark
, algo así como:
class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
def exe(): DataFrame = {
import sqlContext.implicits._
df.select(col(column))
}
}
- ¿Hay alguna manera posible de usar esta clase en
Pyspark
? - ¿Es demasiado duro?
- ¿Tengo que crear un archivo
.py
? - ¿Hay alguna guía que muestre cómo hacerlo?
Por cierto, también miré el código de spark
y me sentí un poco perdido, y fui incapaz de replicar su funcionalidad para mi propio propósito.