apache spark - spark - Obtener el número actual de particiones de un DataFrame
spark streaming (4)
convertir a RDD y luego obtener la longitud de las particiones
DF.rdd.partitions.length
¿Hay alguna forma de obtener el número actual de particiones de un DataFrame? Verifiqué el DataFrame javadoc (spark 1.6) y no encontré un método para eso, ¿o simplemente me lo perdí? (En el caso de JavaRDD, hay un método getNumPartitions ()).
dataframe.rdd.partitions.size
es otra alternativa aparte de
df.rdd.getNumPartitions()
o
df.rdd.length
.
déjame explicarte esto con un ejemplo completo ...
val x = (1 to 10).toList
val numberDF = x.toDF(“number”)
numberDF.rdd.partitions.size // => 4
Para probar la cantidad de particiones que obtuvimos arriba ... guarde ese marco de datos como csv
numberDF.write.csv(“/Users/Ram.Ghadiyaram/output/numbers”)
Así es como se separan los datos en las diferentes particiones.
Partition 00000: 1, 2
Partition 00001: 3, 4, 5
Partition 00002: 6, 7
Partition 00003: 8, 9, 10
Actualización:
@Hemanth hizo una buena pregunta en el comentario ... básicamente por qué el número de particiones es 4 en el caso anterior
Respuesta corta: depende de los casos en los que esté ejecutando. desde que utilicé local [4], obtuve 4 particiones.
Respuesta larga :
Estaba ejecutando el programa anterior en mi máquina local y usé master como local [4] en base a que estaba tomando 4 particiones.
DF.rdd.partitions.length
Si su caparazón en hilo maestro obtuve el número de particiones como 2
ejemplo:
spark-shell --master yarn
y volvió a escribir los mismos comandos
val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")
df.rdd.getNumPartitions
- aquí 2 es el paralelismo predeterminado de chispa
-
Basado en hashpartitioner spark decidirá cuántas particiones distribuir.
si está ejecutando en
--master local
y basado en suRuntime.getRuntime.availableProcessors()
es decir,local[Runtime.getRuntime.availableProcessors()]
intentará asignar ese número de particiones. si su número disponible de procesadores es 12 (es decir,local[Runtime.getRuntime.availableProcessors()])
y tiene una lista de 1 a 10, solo se crearán 10 particiones.
NOTA:
Si está en una computadora portátil de 12 núcleos donde estoy ejecutando el programa spark y, por defecto, la cantidad de particiones / tareas es la cantidad de todos los núcleos disponibles, es decir, 12. eso significa
local[*]
os"local[${Runtime.getRuntime.availableProcessors()}]")
pero en este caso solo hay 10 números, por lo que se limitará a 10
teniendo en cuenta todos estos indicadores, te sugiero que lo pruebes por tu cuenta
getNumPartitions()
llamar a
getNumPartitions()
en el RDD subyacente del DataFrame, por ejemplo,
df.rdd.getNumPartitions()
.
En el caso de Scala, este es un método sin parámetros:
df.rdd.getNumPartitions
.
val df = Seq( ("A", 1), ("B", 2), ("A", 3), ("C", 1) ).toDF("k", "v") df.rdd.getNumPartitions