scala - ¿Por qué el parámetro de partición de SparkContext.textFile no tiene efecto?
apache-spark rdd (2)
@ zero323 lo logró, pero pensé que agregaría un poco más de antecedentes (de bajo nivel) sobre cómo este parámetro de entrada
minPartitions
influye en el número de particiones.
tl; dr
El parámetro de partición tiene un efecto en
SparkContext.textFile
como el número
mínimo
(¡no exacto!) de particiones.
En este caso particular de usar
SparkContext.textFile
, el número de particiones se calcula directamente por
org.apache.hadoop.mapred.TextInputFormat.getSplits (jobConf, minPartitions)
que utiliza
textFile
.
TextInputFormat
solo
sabe cómo particionar (también conocido como
dividir
) los datos distribuidos con Spark solo siguiendo los consejos.
Del javadoc FileInputFormat de Hadoop:
FileInputFormat es la clase base para todos los InputFormats basados en archivos. Esto proporciona una implementación genérica de getSplits (JobConf, int). Las subclases de FileInputFormat también pueden anular el método isSplitable (FileSystem, Path) para garantizar que los archivos de entrada no se dividan y que Mappers los procese como un todo.
Es un muy buen ejemplo de cómo Spark aprovecha la API de Hadoop.
Por cierto, puede encontrar las fuentes esclarecedoras ;-)
scala> val p=sc.textFile("file:///c:/_home/so-posts.xml", 8) //i''ve 8 cores
p: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at textFile at <console>:21
scala> p.partitions.size
res33: Int = 729
Esperaba que se imprimiera 8 y veo 729 tareas en Spark UI
EDITAR:
Después de llamar a
repartition()
como lo sugiere @ zero323
scala> p1 = p.repartition(8)
scala> p1.partitions.size
res60: Int = 8
scala> p1.count
Todavía veo 729 tareas en la interfaz de usuario de Spark a pesar de que la chispa imprime 8.
Si echas un vistazo a la firma
textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
verá que el argumento que usa se llama
minPartitions
y esto describe más o menos su función.
En algunos casos, incluso eso se ignora, pero es un asunto diferente.
El formato de entrada que se usa detrás de escena todavía decide cómo calcular las divisiones.
En este caso particular, probablemente podría usar
mapred.min.split.size
para aumentar el tamaño de división (esto funcionará durante la carga) o simplemente
repartition
después de cargar (esto tendrá efecto después de cargar los datos), pero en general no debería ser necesario ese.