sql - cluster - amazon emr
La consulta SQL en tamaƱo Spark/scala excede a Integer.MAX_VALUE (1)
Ningún bloque de aleatorización de chispas puede ser más grande que 2 GB (bytes Integer.MAX_VALUE) por lo que necesita particiones más / más pequeñas.
Debes ajustar spark.default.parallelism y spark.sql.shuffle.partitions (por defecto 200) para que la cantidad de particiones pueda acomodar tus datos sin alcanzar el límite de 2GB (puedes intentar apuntar a 256MB / partición para que con 200GB obtengas 800 particiones). Miles de particiones son muy comunes, así que no tengas miedo de volver a particionar a 1000 como se sugiere.
Para su información, puede verificar el número de particiones para un RDD con algo como rdd.getNumPartitions (es decir, d2.rdd.getNumPartitions)
Hay una historia para rastrear el esfuerzo de abordar los distintos límites de 2 GB (ha estado abierto por un tiempo): https://issues.apache.org/jira/browse/SPARK-6235
Consulte http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/25 para obtener más información sobre este error.
Estoy tratando de crear una consulta SQL simple en eventos S3 usando Spark. Estoy cargando ~ 30GB de archivos JSON como sigue:
val d2 = spark.read.json("s3n://myData/2017/02/01/1234");
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK);
d2.registerTempTable("d2");
Entonces estoy tratando de escribir para archivar el resultado de mi consulta:
val users_count = sql("select count(distinct data.user_id) from d2");
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv");
Pero Spark está lanzando la siguiente excepción:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Tenga en cuenta que la misma consulta funciona para cantidades más pequeñas de datos. ¿Cuál es el problema aquí?