scala apache-spark rdd

scala - ¿Por qué el parámetro de partición de SparkContext.textFile no tiene efecto?



apache-spark rdd (2)

@ zero323 lo logró, pero pensé que agregaría un poco más de antecedentes (de bajo nivel) sobre cómo este parámetro de entrada minPartitions influye en el número de particiones.

tl; dr El parámetro de partición tiene un efecto en SparkContext.textFile como el número mínimo (¡no exacto!) de particiones.

En este caso particular de usar SparkContext.textFile , el número de particiones se calcula directamente por org.apache.hadoop.mapred.TextInputFormat.getSplits (jobConf, minPartitions) que utiliza textFile . TextInputFormat solo sabe cómo particionar (también conocido como dividir ) los datos distribuidos con Spark solo siguiendo los consejos.

Del javadoc FileInputFormat de Hadoop:

FileInputFormat es la clase base para todos los InputFormats basados ​​en archivos. Esto proporciona una implementación genérica de getSplits (JobConf, int). Las subclases de FileInputFormat también pueden anular el método isSplitable (FileSystem, Path) para garantizar que los archivos de entrada no se dividan y que Mappers los procese como un todo.

Es un muy buen ejemplo de cómo Spark aprovecha la API de Hadoop.

Por cierto, puede encontrar las fuentes esclarecedoras ;-)

scala> val p=sc.textFile("file:///c:/_home/so-posts.xml", 8) //i''ve 8 cores p: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at textFile at <console>:21 scala> p.partitions.size res33: Int = 729

Esperaba que se imprimiera 8 y veo 729 tareas en Spark UI

EDITAR:

Después de llamar a repartition() como lo sugiere @ zero323

scala> p1 = p.repartition(8) scala> p1.partitions.size res60: Int = 8 scala> p1.count

Todavía veo 729 tareas en la interfaz de usuario de Spark a pesar de que la chispa imprime 8.


Si echas un vistazo a la firma

textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

verá que el argumento que usa se llama minPartitions y esto describe más o menos su función. En algunos casos, incluso eso se ignora, pero es un asunto diferente. El formato de entrada que se usa detrás de escena todavía decide cómo calcular las divisiones.

En este caso particular, probablemente podría usar mapred.min.split.size para aumentar el tamaño de división (esto funcionará durante la carga) o simplemente repartition después de cargar (esto tendrá efecto después de cargar los datos), pero en general no debería ser necesario ese.