sqlcontext spark read functions example apache-spark spark-dataframe dataframe apache-spark-sql

apache spark - spark - Obtener el número actual de particiones de un DataFrame



spark streaming (4)

convertir a RDD y luego obtener la longitud de las particiones

DF.rdd.partitions.length

¿Hay alguna forma de obtener el número actual de particiones de un DataFrame? Verifiqué el DataFrame javadoc (spark 1.6) y no encontré un método para eso, ¿o simplemente me lo perdí? (En el caso de JavaRDD, hay un método getNumPartitions ()).


dataframe.rdd.partitions.size es otra alternativa aparte de df.rdd.getNumPartitions() o df.rdd.length .

déjame explicarte esto con un ejemplo completo ...

val x = (1 to 10).toList val numberDF = x.toDF(“number”) numberDF.rdd.partitions.size // => 4

Para probar la cantidad de particiones que obtuvimos arriba ... guarde ese marco de datos como csv

numberDF.write.csv(“/Users/Ram.Ghadiyaram/output/numbers”)

Así es como se separan los datos en las diferentes particiones.

Partition 00000: 1, 2 Partition 00001: 3, 4, 5 Partition 00002: 6, 7 Partition 00003: 8, 9, 10

Actualización:

@Hemanth hizo una buena pregunta en el comentario ... básicamente por qué el número de particiones es 4 en el caso anterior

Respuesta corta: depende de los casos en los que esté ejecutando. desde que utilicé local [4], obtuve 4 particiones.

Respuesta larga :

Estaba ejecutando el programa anterior en mi máquina local y usé master como local [4] en base a que estaba tomando 4 particiones.

DF.rdd.partitions.length

Si su caparazón en hilo maestro obtuve el número de particiones como 2

ejemplo: spark-shell --master yarn y volvió a escribir los mismos comandos

val df = Seq( ("A", 1), ("B", 2), ("A", 3), ("C", 1) ).toDF("k", "v") df.rdd.getNumPartitions

  • aquí 2 es el paralelismo predeterminado de chispa
  • Basado en hashpartitioner spark decidirá cuántas particiones distribuir. si está ejecutando en --master local y basado en su Runtime.getRuntime.availableProcessors() es decir, local[Runtime.getRuntime.availableProcessors()] intentará asignar ese número de particiones. si su número disponible de procesadores es 12 (es decir, local[Runtime.getRuntime.availableProcessors()]) y tiene una lista de 1 a 10, solo se crearán 10 particiones.

NOTA:

Si está en una computadora portátil de 12 núcleos donde estoy ejecutando el programa spark y, por defecto, la cantidad de particiones / tareas es la cantidad de todos los núcleos disponibles, es decir, 12. eso significa local[*] o s"local[${Runtime.getRuntime.availableProcessors()}]") pero en este caso solo hay 10 números, por lo que se limitará a 10

teniendo en cuenta todos estos indicadores, te sugiero que lo pruebes por tu cuenta


getNumPartitions() llamar a getNumPartitions() en el RDD subyacente del DataFrame, por ejemplo, df.rdd.getNumPartitions() . En el caso de Scala, este es un método sin parámetros: df.rdd.getNumPartitions .


val df = Seq( ("A", 1), ("B", 2), ("A", 3), ("C", 1) ).toDF("k", "v") df.rdd.getNumPartitions