python - saveastable - structtype pyspark
¿Cómo convertir un DataFrame a un RDD normal en pyspark? (3)
Necesito usar el
(rdd.)partitionBy(npartitions, custom_partitioner)
método que no está disponible en el DataFrame. Todos los métodos de DataFrame se refieren solo a los resultados del DataFrame. Entonces, ¿cómo crear un RDD a partir de los datos del DataFrame?
Nota: esto es un cambio (en 1.3.0) de 1.2.0.
Actualiza la respuesta de @dpangmao: el método es .rdd. Me interesaba comprender si (a) era público y (b) cuáles eran las implicaciones de rendimiento.
Bien (a) es sí y (b) - bueno, aquí se puede ver que hay implicaciones significativas en el rendimiento : se debe crear un nuevo RDD invocando mapPartitions :
En dataframe.py (tenga en cuenta que el nombre del archivo también cambió (era sql.py):
@property
def rdd(self):
"""
Return the content of the :class:`DataFrame` as an :class:`RDD`
of :class:`Row` s.
"""
if not hasattr(self, ''_lazy_rdd''):
jrdd = self._jdf.javaToPython()
rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
schema = self.schema
def applySchema(it):
cls = _create_cls(schema)
return itertools.imap(cls, it)
self._lazy_rdd = rdd.mapPartitions(applySchema)
return self._lazy_rdd
La respuesta dada por kennyut / Kistian funciona muy bien, pero para obtener un RDD exacto como salida cuando RDD consiste en una lista de atributos, por ejemplo [1,2,3,4] podemos usar el comando flatmap como se muestra a continuación,
rdd = df.rdd.flatMap(list)
or
rdd = df.rdd.flatmap(lambda x: list(x))
La respuesta de @dapangmao funciona, pero no da la chispa regular RDD, devuelve un objeto Row. Si quieres tener el formato RDD regular.
Prueba esto:
rdd = df.rdd.map(tuple)
o
rdd = df.rdd.map(list)
Utiliza el método .rdd
esta manera:
rdd = df.rdd