apache spark - Cómo controlar el tamaño de partición en Spark SQL
apache-spark hive (3)
Si su SQL realiza una combinación aleatoria (por ejemplo, tiene una unión, o algún tipo de grupo por), puede establecer el número de particiones configurando la propiedad ''spark.sql.shuffle.partitions''
sqlContext.setConf( "spark.sql.shuffle.partitions", 64)
Siguiendo con lo que sugiere Fokko, podría usar una variable aleatoria para agrupar.
val result = sqlContext.sql("""
select * from (
select *,random(64) as rand_part from bt_st_ent
) cluster by rand_part""")
Tengo un requisito para cargar datos de una tabla de Hive usando Spark SQL
HiveContext
y cargar en HDFS.
Por defecto, el
DataFrame
de la salida SQL tiene 2 particiones.
Para obtener más paralelismo, necesito más particiones fuera del SQL.
No hay un método sobrecargado en
HiveContex
t para tomar el número de parámetros de particiones.
Reparticionar el RDD causa barajado y resulta en más tiempo de procesamiento.
>
val result = sqlContext.sql("select * from bt_st_ent")
Tiene la salida de registro de:
Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0,NODE_LOCAL, 2203 bytes)
Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1,NODE_LOCAL, 2204 bytes)
Me gustaría saber si hay alguna forma de aumentar el tamaño de las particiones de la salida de SQL.
Un problema muy común y doloroso.
Debe buscar una clave que distribuya los datos en particiones uniformes.
Puede usar los operadores
DISTRIBUTE BY
y
CLUSTER BY
para indicarle a spark que agrupe las filas en una partición.
Esto incurrirá en una sobrecarga en la consulta misma.
Pero dará como resultado particiones de tamaño uniforme.
Deepsense
tiene un muy buen tutorial sobre esto.
Chispa <2.0 :
Puede usar las opciones de configuración de Hadoop:
-
mapred.min.split.size
. -
mapred.max.split.size
así como el tamaño de bloque HDFS para controlar el tamaño de partición para formatos basados en el sistema de archivos *.
val minSplit: Int = ???
val maxSplit: Int = ???
sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit)
sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit)
Spark 2.0+ :
Puede usar la configuración de
spark.sql.files.maxPartitionBytes
:
spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit)
En ambos casos, estos valores pueden no estar en uso por una API de fuente de datos específica, por lo que siempre debe verificar los detalles de la documentación / implementación del formato que utiliza.
* Otros formatos de entrada pueden usar diferentes configuraciones. Ver por ejemplo
- Particionar en chispa mientras lee desde RDBMS a través de JDBC
- Diferencia entre mapreduce split y spark paritition
Además, los
Datasets
creados a partir de
RDDs
heredarán el diseño de partición de sus padres.
Del mismo modo, las tablas agrupadas utilizarán el diseño del depósito definido en el metastore con relación 1: 1 entre el depósito y la partición del
Dataset
.