scala - parallelize - Caché Spark: RDD Solo 8% en caché
spark rdd take (1)
Para mi fragmento de código como a continuación:
val levelsFile = sc.textFile(levelsFilePath)
val levelsSplitedFile = levelsFile.map(line => line.split(fileDelimiter, -1))
val levelPairRddtemp = levelsSplitedFile
.filter(linearr => ( linearr(pogIndex).length!=0))
.map(linearr => (linearr(pogIndex).toLong, levelsIndexes.map(x => linearr(x))
.filter(value => (!value.equalsIgnoreCase("") && !value.equalsIgnoreCase(" ") && !value.equalsIgnoreCase("null")))))
.mapValues(value => value.mkString(","))
.partitionBy(new HashPartitioner(24))
.persist(StorageLevel.MEMORY_ONLY_SER)
levelPairRddtemp.count // just to trigger rdd creation
Información
- El tamaño del archivo es ~ 4G
- Estoy usando 2
executors
(5G cada uno) y 12 núcleos. - Versión
Spark
: 1.5.2
Problema
Cuando miro SparkUI
en la Storage tab
, lo que veo es:
Al mirar dentro del RDD
, parece que solo 2 de las 24 partitions
están en la memoria caché.
Cualquier explicación a este comportamiento, y cómo solucionarlo.
EDIT 1 : Acabo de probar con 60 particiones para HashPartitioner
como:
..
.partitionBy(new HashPartitioner(60))
..
Y funcionó . Ahora estoy almacenando el RDD
completo en caché. ¿Adivina qué pudo haber pasado aquí? ¿Puede la asimetría de los datos causar este comportamiento?
Edit-2 : Registros que contienen BlockManagerInfo
cuando corrí de nuevo con 24 partitions
. Esta vez 3/24 partitions
fueron almacenadas en caché:
16/03/17 14:15:28 INFO BlockManagerInfo: Added rdd_294_14 in memory on ip-10-1-34-66.ec2.internal:47526 (size: 107.3 MB, free: 2.6 GB)
16/03/17 14:15:30 INFO BlockManagerInfo: Added rdd_294_17 in memory on ip-10-1-34-65.ec2.internal:57300 (size: 107.3 MB, free: 2.6 GB)
16/03/17 14:15:30 INFO BlockManagerInfo: Added rdd_294_21 in memory on ip-10-1-34-65.ec2.internal:57300 (size: 107.4 MB, free: 2.5 GB)
Creo que esto sucede porque se alcanzan los límites de memoria, o incluso más en el punto, las opciones de memoria que utiliza no permiten que su trabajo utilice todos los recursos.
Aumentar las #particiones significa disminuir el tamaño de cada tarea, lo que podría explicar el comportamiento.