python - textfile - sparkcontext text file
Cómo usar una clase Scala dentro de Pyspark (1)
Sí, es posible, aunque puede estar lejos de ser trivial. Por lo general, desea una envoltura Java (amigable) para que no tenga que lidiar con las funciones de Scala que no se pueden expresar fácilmente con Java simple y, como resultado, no funcionan bien con la puerta de enlace Py4J.
Suponiendo que su clase es int en el paquete com.example y tiene Python DataFrame llamado df
df = ... # Python DataFrame
tendrás que:
Construye un jar usando tu herramienta de compilación favorita .
--driver-class-pathen el classpath del controlador, por ejemplo, utilizando el--driver-class-pathpara PySpark shell /spark-submit. Dependiendo del código exacto, puede que tenga que pasarlo usando--jarstambiénExtraiga la instancia de JVM de una instancia de Python
SparkContext:jvm = sc._jvmExtraiga Scala
SQLContextde una instancia deSQLContext:ssqlContext = sqlContext._ssql_ctxExtraiga Java
DataFramede ladf:jdf = df._jdfCrear una nueva instancia de
SimpleClass:simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")Llame al método
exeyDataFrameel resultado usando PythonDataFrame:from pyspark.sql import DataFrame DataFrame(simpleObject.exe(), ssqlContext)
El resultado debe ser un marco de DataFrame PySpark DataFrame . Por supuesto, puede combinar todos los pasos en una sola llamada.
Importante : este enfoque solo es posible si el código Python se ejecuta únicamente en el controlador. No se puede usar dentro de la acción o transformación de Python. Consulte ¿Cómo utilizar la función Java / Scala desde una acción o una transformación? para detalles.
He estado buscando por un tiempo si hay alguna forma de usar una clase de Scala en Pyspark , y no he encontrado ninguna documentación ni guía sobre este tema.
Digamos que creo una clase simple en Scala que usa algunas bibliotecas de apache-spark , algo así como:
class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
def exe(): DataFrame = {
import sqlContext.implicits._
df.select(col(column))
}
}
- ¿Hay alguna manera posible de usar esta clase en
Pyspark? - ¿Es demasiado duro?
- ¿Tengo que crear un archivo
.py? - ¿Hay alguna guía que muestre cómo hacerlo?
Por cierto, también miré el código de spark y me sentí un poco perdido, y fui incapaz de replicar su funcionalidad para mi propio propósito.