apache-spark hive apache-spark-sql partitioning

apache spark - Cómo controlar el tamaño de partición en Spark SQL



apache-spark hive (3)

Si su SQL realiza una combinación aleatoria (por ejemplo, tiene una unión, o algún tipo de grupo por), puede establecer el número de particiones configurando la propiedad ''spark.sql.shuffle.partitions''

sqlContext.setConf( "spark.sql.shuffle.partitions", 64)

Siguiendo con lo que sugiere Fokko, podría usar una variable aleatoria para agrupar.

val result = sqlContext.sql(""" select * from ( select *,random(64) as rand_part from bt_st_ent ) cluster by rand_part""")

Tengo un requisito para cargar datos de una tabla de Hive usando Spark SQL HiveContext y cargar en HDFS. Por defecto, el DataFrame de la salida SQL tiene 2 particiones. Para obtener más paralelismo, necesito más particiones fuera del SQL. No hay un método sobrecargado en HiveContex t para tomar el número de parámetros de particiones.

Reparticionar el RDD causa barajado y resulta en más tiempo de procesamiento.

>

val result = sqlContext.sql("select * from bt_st_ent")

Tiene la salida de registro de:

Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0,NODE_LOCAL, 2203 bytes) Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1,NODE_LOCAL, 2204 bytes)

Me gustaría saber si hay alguna forma de aumentar el tamaño de las particiones de la salida de SQL.


Un problema muy común y doloroso. Debe buscar una clave que distribuya los datos en particiones uniformes. Puede usar los operadores DISTRIBUTE BY y CLUSTER BY para indicarle a spark que agrupe las filas en una partición. Esto incurrirá en una sobrecarga en la consulta misma. Pero dará como resultado particiones de tamaño uniforme. Deepsense tiene un muy buen tutorial sobre esto.


Chispa <2.0 :

Puede usar las opciones de configuración de Hadoop:

  • mapred.min.split.size .
  • mapred.max.split.size

así como el tamaño de bloque HDFS para controlar el tamaño de partición para formatos basados ​​en el sistema de archivos *.

val minSplit: Int = ??? val maxSplit: Int = ??? sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit) sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit)

Spark 2.0+ :

Puede usar la configuración de spark.sql.files.maxPartitionBytes :

spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit)

En ambos casos, estos valores pueden no estar en uso por una API de fuente de datos específica, por lo que siempre debe verificar los detalles de la documentación / implementación del formato que utiliza.

* Otros formatos de entrada pueden usar diferentes configuraciones. Ver por ejemplo

  • Particionar en chispa mientras lee desde RDBMS a través de JDBC
  • Diferencia entre mapreduce split y spark paritition

Además, los Datasets creados a partir de RDDs heredarán el diseño de partición de sus padres.

Del mismo modo, las tablas agrupadas utilizarán el diseño del depósito definido en el metastore con relación 1: 1 entre el depósito y la partición del Dataset .