apache-spark jdbc hive apache-spark-sql partitioning

apache spark - ¿Cómo optimizar la partición cuando se migran datos desde la fuente JDBC?



apache-spark hive (3)

  1. Determine cuántas particiones necesita, dada la cantidad de datos de entrada y sus recursos de clúster. Como regla general, es mejor mantener la entrada de partición a menos de 1 GB a menos que sea estrictamente necesario. y estrictamente más pequeño que el límite de tamaño de bloque.

    Anteriormente, ha declarado que migra 1 TB de los valores de datos que utiliza en diferentes publicaciones (5 - 70) que probablemente sean demasiado bajos para garantizar un proceso sin problemas.

    Trate de usar un valor que no requiera más repartitioning .

  2. Conozca sus datos.

    Analice las columnas disponibles en el conjunto de datos para determinar si hay columnas con alta cardinalidad y distribución uniforme para distribuir entre el número deseado de particiones. Estos son buenos candidatos para un proceso de importación. Además, debe determinar un rango exacto de valores.

    Las agregaciones con diferente centralidad y medida de sesgo, así como los histogramas y los conteos básicos por clave son buenas herramientas de exploración. Para esta parte, es mejor analizar los datos directamente en la base de datos, en lugar de buscarlos en Spark.

    Dependiendo del RDBMS, es posible que pueda usar width_bucket (PostgreSQL, Oracle) o una función equivalente para obtener una idea decente de cómo se distribuirán los datos en Spark después de cargarlos con partitionColumn , lowerBound , upperBound , numPartitons .

    s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*) FROM t GROUP BY bucket) as tmp)"""

  3. Si no hay columnas que cumplan con los criterios anteriores, considere:

    • Creando uno personalizado y exponiéndolo vía. una vista. Hashes sobre múltiples columnas independientes son generalmente buenos candidatos. Consulte el manual de su base de datos para determinar las funciones que se pueden usar aquí ( DBMS_CRYPTO en Oracle, pgcrypto en PostgreSQL) *.
    • El uso de un conjunto de columnas independientes que, en conjunto, proporciona una cardinalidad suficientemente alta.

      Opcionalmente, si va a escribir en una tabla de Hive particionada, debería considerar incluir las columnas de partición de Hive. Podría limitar la cantidad de archivos generados más tarde.

  4. Preparar argumentos de particionamiento

    • Si la columna seleccionada o creada en los pasos anteriores es numérica ( o fecha / marca de tiempo en Spark> = 2.4 ), lowerBound directamente como partitionColumn y use los valores de rango determinados antes para completar lowerBound y upperBound .

      Si los valores enlazados no reflejan las propiedades de los datos ( min(col) para lowerBound , max(col) para upperBound ), puede dar lugar a una desviación significativa de los datos, así que enrosque con cuidado. En el peor de los casos, cuando los límites no cubren el rango de datos, todos los registros serán recuperados por una sola máquina, por lo que no es mejor que ninguna partición.

    • Si la columna seleccionada en los pasos anteriores es categórica o si es un conjunto de columnas, se genera una lista de predicados mutuamente exclusivos que cubren completamente los datos, en una forma que se puede usar en una cláusula SQL donde.

      Por ejemplo, si tiene una columna A con valores { a1 , a2 , a3 } y una columna B con valores { b1 , b2 , b3 }:

      val predicates = for { a <- Seq("a1", "a2", "a3") b <- Seq("b1", "b2", "b3") } yield s"A = $a AND B = $b"

      Verifique que las condiciones no se superpongan y que todas las combinaciones estén cubiertas. Si no se cumplen estas condiciones, terminará con duplicados o registros faltantes, respectivamente.

      Pase los datos como argumento de predicates a la llamada jdbc . Tenga en cuenta que el número de particiones será exactamente igual al número de predicados.

  5. Ponga la base de datos en modo de solo lectura (cualquier escritura en curso puede causar inconsistencias en los datos. Si es posible, debe bloquear la base de datos antes de comenzar todo el proceso, pero si no es posible, en su organización).

  6. Si el número de particiones coincide con los datos de carga de salida deseados sin repartition y volcado directamente al sumidero, si no puede intentar reparticionar siguiendo las mismas reglas que en el paso 1.

  7. Si aún experimenta algún problema, asegúrese de haber configurado correctamente las opciones de memoria Spark y GC.

  8. Si nada de lo anterior funciona:

    • Considere descargar sus datos a una red / distribuya el almacenamiento usando herramientas como COPY TO y léalos directamente desde allí.

      Tenga en cuenta que, o las utilidades de base de datos estándar, normalmente necesitará un sistema de archivos compatible con POSIX, por lo que HDFS generalmente no funciona.

      La ventaja de este enfoque es que no necesita preocuparse por las propiedades de la columna, y no es necesario poner los datos en modo de solo lectura, para garantizar la coherencia.

    • Usando herramientas dedicadas de transferencia masiva, como Apache Sqoop, y remodelando los datos después.

* No use pseudocolumns - Pseudocolumn en Spark JDBC .

Estoy tratando de mover datos de una tabla en la tabla PostgreSQL a una tabla Hive en HDFS. Para ello, se me ocurrió el siguiente código:

val conf = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true") val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate() def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = { val colList = allColumns.split(",").toList val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != '' ''))) val queryCols = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",") val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year=''2017'' and period_num=''12''" val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017") .option("user", devUserName).option("password", devPassword) .option("partitionColumn","cast_id") .option("lowerBound", 1).option("upperBound", 100000) .option("numPartitions",70).load() val totalCols:List[String] = splitColumns ++ textList val cdt = new ChangeDataTypes(totalCols, dataMapper) hiveDataTypes = cdt.gpDetails() val fc = prepareHiveTableSchema(hiveDataTypes, partition_columns) val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname)) val resultDF = yearDF.select(allCols:_*) val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name) val finalDF = stringColumns.foldLeft(resultDF) { (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[/r/n]+", " "), "[/t]+"," ")) } finalDF } val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark) val dataDFPart = dataDF.repartition(30) dataDFPart.createOrReplaceTempView("preparedDF") spark.sql("set hive.exec.dynamic.partition.mode=nonstrict") spark.sql("set hive.exec.dynamic.partition=true") spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

Los datos se insertan en la tabla hive dinámicamente particionada en base a prtn_String_columns: source_system_name, period_year, period_num

Envío de chispa utilizado:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal [email protected] --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

Los siguientes mensajes de error se generan en los registros del ejecutor:

Container exited with a non-zero exit code 143. Killed by external signal 18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system] java.lang.OutOfMemoryError: Java heap space at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88) at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393) at java.util.zip.ZipFile.getInputStream(ZipFile.java:374) at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199) at java.util.jar.JarFile.getManifest(JarFile.java:180) at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944) at java.net.URLClassLoader.defineClass(URLClassLoader.java:450) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99) at sun.misc.Signal$1.run(Signal.java:212) at java.lang.Thread.run(Thread.java:745)

Veo en los registros que la lectura se está ejecutando correctamente con el número de particiones dado a continuación:

Scan JDBCRelation((select column_names from schema.tablename where period_year=''2017'' and period_num=''12'') as year2017) [numPartitions=50]

A continuación se muestra el estado de los ejecutores en etapas:

Los datos no se están particionando correctamente. Una partición es más pequeña mientras que la otra se vuelve enorme. Hay un problema de sesgo aquí. Al insertar los datos en la tabla de Hive, el trabajo falla en la línea: spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF") pero entiendo que esto está sucediendo debido a la desviación de los datos problema.

Intenté aumentar el número de ejecutores, aumentar la memoria del ejecutor, la memoria del controlador, intenté simplemente guardar un archivo csv en lugar de guardar el marco de datos en una tabla de Hive, pero nada afecta a la ejecución al dar la excepción:

java.lang.OutOfMemoryError: GC overhead limit exceeded

¿Hay algo en el código que necesito corregir? ¿Alguien podría decirme cómo puedo solucionar este problema?


En mi experiencia, hay 4 tipos de configuraciones de memoria que marcan la diferencia:

A) [1] Memoria para almacenar datos por razones de procesamiento VS [2] Espacio de almacenamiento para almacenar la pila de programas

B) [1] Memoria del ejecutor del controlador VS [2]

Hasta ahora, siempre pude hacer que mis trabajos de Spark se ejecutaran con éxito al aumentar el tipo de memoria apropiada:

A2-B1 sería por lo tanto la memoria disponible en el controlador para mantener la pila de programas. Etc.

Los nombres de las propiedades son los siguientes:

A1-B1) executor-memory

A1-B2) driver-memory

A2-B1) spark.yarn.executor.memoryOverhead

A2-B2) spark.yarn.driver.memoryOverhead

Tenga en cuenta que la suma de todos * -B1 debe ser menor que la memoria disponible en sus trabajadores y la suma de todos * -B2 debe ser menor que la memoria de su nodo de controlador.

Mi apuesta sería que el culpable es uno de los ajustes del montón marcados en negrita.


Hubo otra pregunta tuya enviada aquí como duplicado.

''How to avoid data skewing while reading huge datasets or tables into spark? The data is not being partitioned properly. One partition is smaller while the other one becomes huge on read. I observed that one of the partition has nearly 2million rows and while inserting there is a skew in partition. ''

si el problema es lidiar con los datos que se particionan en un marco de datos después de leerlos, ¿ha jugado por ahí aumentando el valor de "numPartitions"?

.option("numPartitions",50)

lowerBound, upperBound forman pasos de partición para las expresiones generadas de la cláusula WHERE y numpartitions determina el número de división.

digamos, por ejemplo, sometable tiene columna - ID (elegimos eso como partitionColumn ); el rango de valores que vemos en la tabla para la columna- ID es de 1 a 1000 y queremos obtener todos los registros ejecutando select * from sometable , así que vamos con lowerbound = 1 & upperbound = 1000 y numpartition = 4

esto producirá un marco de datos de 4 particiones con el resultado de cada consulta al construir sql basado en nuestro feed (lowerbound = 1 & upperbound = 1000 and numpartition = 4)

select * from sometable where ID < 250 select * from sometable where ID >= 250 and ID < 500 select * from sometable where ID >= 500 and ID < 750 select * from sometable where ID >= 750

¿Qué ID(500,750) si la mayoría de los registros en nuestra tabla están dentro del rango de ID(500,750) ? Esa es la situación en la que estás.

Cuando aumentamos la partición numérica, la división ocurre aún más y eso reduce el volumen de registros en la misma partición, pero esto no es un buen disparo.

En lugar de dividir por chispas la columna de partitioncolumn función de los límites que proporcionamos, si piensa alimentar la división por sí mismo, los datos se pueden dividir de manera uniforme. debe cambiar a otro método JDBC donde, en lugar de (lowerbound,upperbound & numpartition) , podemos proporcionar predicados directamente.

def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame

Link