apache spark - ¿Cómo optimizar la partición cuando se migran datos desde la fuente JDBC?
apache-spark hive (3)
-
Determine cuántas particiones necesita, dada la cantidad de datos de entrada y sus recursos de clúster. Como regla general, es mejor mantener la entrada de partición a menos de 1 GB a menos que sea estrictamente necesario. y estrictamente más pequeño que el límite de tamaño de bloque.
Anteriormente, ha declarado que migra 1 TB de los valores de datos que utiliza en diferentes publicaciones (5 - 70) que probablemente sean demasiado bajos para garantizar un proceso sin problemas.
Trate de usar un valor que no requiera más
repartitioning
. -
Conozca sus datos.
Analice las columnas disponibles en el conjunto de datos para determinar si hay columnas con alta cardinalidad y distribución uniforme para distribuir entre el número deseado de particiones. Estos son buenos candidatos para un proceso de importación. Además, debe determinar un rango exacto de valores.
Las agregaciones con diferente centralidad y medida de sesgo, así como los histogramas y los conteos básicos por clave son buenas herramientas de exploración. Para esta parte, es mejor analizar los datos directamente en la base de datos, en lugar de buscarlos en Spark.
Dependiendo del RDBMS, es posible que pueda usar
width_bucket
(PostgreSQL, Oracle) o una función equivalente para obtener una idea decente de cómo se distribuirán los datos en Spark después de cargarlos conpartitionColumn
,lowerBound
,upperBound
,numPartitons
.s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*) FROM t GROUP BY bucket) as tmp)"""
-
Si no hay columnas que cumplan con los criterios anteriores, considere:
-
Creando uno personalizado y exponiéndolo vía.
una vista.
Hashes sobre múltiples columnas independientes son generalmente buenos candidatos.
Consulte el manual de su base de datos para determinar las funciones que se pueden usar aquí (
DBMS_CRYPTO
en Oracle,pgcrypto
en PostgreSQL) *. -
El uso de un conjunto de columnas independientes que, en conjunto, proporciona una cardinalidad suficientemente alta.
Opcionalmente, si va a escribir en una tabla de Hive particionada, debería considerar incluir las columnas de partición de Hive. Podría limitar la cantidad de archivos generados más tarde.
-
Creando uno personalizado y exponiéndolo vía.
una vista.
Hashes sobre múltiples columnas independientes son generalmente buenos candidatos.
Consulte el manual de su base de datos para determinar las funciones que se pueden usar aquí (
-
Preparar argumentos de particionamiento
-
Si la columna seleccionada o creada en los pasos anteriores es numérica ( o fecha / marca de tiempo en Spark> = 2.4 ),
lowerBound
directamente comopartitionColumn
y use los valores de rango determinados antes para completarlowerBound
yupperBound
.Si los valores enlazados no reflejan las propiedades de los datos (
min(col)
paralowerBound
,max(col)
paraupperBound
), puede dar lugar a una desviación significativa de los datos, así que enrosque con cuidado. En el peor de los casos, cuando los límites no cubren el rango de datos, todos los registros serán recuperados por una sola máquina, por lo que no es mejor que ninguna partición. -
Si la columna seleccionada en los pasos anteriores es categórica o si es un conjunto de columnas, se genera una lista de predicados mutuamente exclusivos que cubren completamente los datos, en una forma que se puede usar en una cláusula
SQL
donde.Por ejemplo, si tiene una columna
A
con valores {a1
,a2
,a3
} y una columnaB
con valores {b1
,b2
,b3
}:val predicates = for { a <- Seq("a1", "a2", "a3") b <- Seq("b1", "b2", "b3") } yield s"A = $a AND B = $b"
Verifique que las condiciones no se superpongan y que todas las combinaciones estén cubiertas. Si no se cumplen estas condiciones, terminará con duplicados o registros faltantes, respectivamente.
Pase los datos como argumento de
predicates
a la llamadajdbc
. Tenga en cuenta que el número de particiones será exactamente igual al número de predicados.
-
-
Ponga la base de datos en modo de solo lectura (cualquier escritura en curso puede causar inconsistencias en los datos. Si es posible, debe bloquear la base de datos antes de comenzar todo el proceso, pero si no es posible, en su organización).
-
Si el número de particiones coincide con los datos de carga de salida deseados sin
repartition
y volcado directamente al sumidero, si no puede intentar reparticionar siguiendo las mismas reglas que en el paso 1. -
Si aún experimenta algún problema, asegúrese de haber configurado correctamente las opciones de memoria Spark y GC.
-
Si nada de lo anterior funciona:
-
Considere descargar sus datos a una red / distribuya el almacenamiento usando herramientas como
COPY TO
y léalos directamente desde allí.Tenga en cuenta que, o las utilidades de base de datos estándar, normalmente necesitará un sistema de archivos compatible con POSIX, por lo que HDFS generalmente no funciona.
La ventaja de este enfoque es que no necesita preocuparse por las propiedades de la columna, y no es necesario poner los datos en modo de solo lectura, para garantizar la coherencia.
-
Usando herramientas dedicadas de transferencia masiva, como Apache Sqoop, y remodelando los datos después.
-
* No use pseudocolumns - Pseudocolumn en Spark JDBC .
Estoy tratando de mover datos de una tabla en la tabla PostgreSQL a una tabla Hive en HDFS. Para ello, se me ocurrió el siguiente código:
val conf = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
val colList = allColumns.split(",").toList
val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != '' '')))
val queryCols = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year=''2017'' and period_num=''12''"
val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
.option("user", devUserName).option("password", devPassword)
.option("partitionColumn","cast_id")
.option("lowerBound", 1).option("upperBound", 100000)
.option("numPartitions",70).load()
val totalCols:List[String] = splitColumns ++ textList
val cdt = new ChangeDataTypes(totalCols, dataMapper)
hiveDataTypes = cdt.gpDetails()
val fc = prepareHiveTableSchema(hiveDataTypes, partition_columns)
val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
val resultDF = yearDF.select(allCols:_*)
val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
val finalDF = stringColumns.foldLeft(resultDF) {
(tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[/r/n]+", " "), "[/t]+"," "))
}
finalDF
}
val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
val dataDFPart = dataDF.repartition(30)
dataDFPart.createOrReplaceTempView("preparedDF")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")
Los datos se insertan en la tabla hive dinámicamente particionada en base a
prtn_String_columns: source_system_name, period_year, period_num
Envío de chispa utilizado:
SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal [email protected] --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar
Los siguientes mensajes de error se generan en los registros del ejecutor:
Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
at java.util.jar.JarFile.getManifest(JarFile.java:180)
at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:745)
Veo en los registros que la lectura se está ejecutando correctamente con el número de particiones dado a continuación:
Scan JDBCRelation((select column_names from schema.tablename where period_year=''2017'' and period_num=''12'') as year2017) [numPartitions=50]
A continuación se muestra el estado de los ejecutores en etapas:
Los datos no se están particionando correctamente.
Una partición es más pequeña mientras que la otra se vuelve enorme.
Hay un problema de sesgo aquí.
Al insertar los datos en la tabla de Hive, el trabajo falla en la línea:
spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")
pero entiendo que esto está sucediendo debido a la desviación de los datos problema.
Intenté aumentar el número de ejecutores, aumentar la memoria del ejecutor, la memoria del controlador, intenté simplemente guardar un archivo csv en lugar de guardar el marco de datos en una tabla de Hive, pero nada afecta a la ejecución al dar la excepción:
java.lang.OutOfMemoryError: GC overhead limit exceeded
¿Hay algo en el código que necesito corregir? ¿Alguien podría decirme cómo puedo solucionar este problema?
En mi experiencia, hay 4 tipos de configuraciones de memoria que marcan la diferencia:
A) [1] Memoria para almacenar datos por razones de procesamiento VS [2] Espacio de almacenamiento para almacenar la pila de programas
B) [1] Memoria del ejecutor del controlador VS [2]
Hasta ahora, siempre pude hacer que mis trabajos de Spark se ejecutaran con éxito al aumentar el tipo de memoria apropiada:
A2-B1 sería por lo tanto la memoria disponible en el controlador para mantener la pila de programas. Etc.
Los nombres de las propiedades son los siguientes:
A1-B1)
executor-memory
A1-B2)
driver-memory
A2-B1)
spark.yarn.executor.memoryOverhead
A2-B2)
spark.yarn.driver.memoryOverhead
Tenga en cuenta que la suma de todos * -B1 debe ser menor que la memoria disponible en sus trabajadores y la suma de todos * -B2 debe ser menor que la memoria de su nodo de controlador.
Mi apuesta sería que el culpable es uno de los ajustes del montón marcados en negrita.
Hubo otra pregunta tuya enviada aquí como duplicado.
''How to avoid data skewing while reading huge datasets or tables into spark?
The data is not being partitioned properly. One partition is smaller while the
other one becomes huge on read.
I observed that one of the partition has nearly 2million rows and
while inserting there is a skew in partition. ''
si el problema es lidiar con los datos que se particionan en un marco de datos después de leerlos, ¿ha jugado por ahí aumentando el valor de "numPartitions"?
.option("numPartitions",50)
lowerBound, upperBound
forman pasos de partición para las expresiones generadas de la cláusula WHERE y numpartitions determina el número de división.
digamos, por ejemplo, sometable tiene columna - ID (elegimos eso como
partitionColumn
);
el rango de valores que vemos en la tabla para la columna-
ID
es de 1 a 1000 y queremos obtener todos los registros ejecutando
select * from sometable
, así que vamos con lowerbound = 1 & upperbound = 1000 y numpartition = 4
esto producirá un marco de datos de 4 particiones con el resultado de cada consulta al construir sql basado en nuestro feed
(lowerbound = 1 & upperbound = 1000 and numpartition = 4)
select * from sometable where ID < 250
select * from sometable where ID >= 250 and ID < 500
select * from sometable where ID >= 500 and ID < 750
select * from sometable where ID >= 750
¿Qué
ID(500,750)
si la mayoría de los registros en nuestra tabla están dentro del rango de
ID(500,750)
?
Esa es la situación en la que estás.
Cuando aumentamos la partición numérica, la división ocurre aún más y eso reduce el volumen de registros en la misma partición, pero esto no es un buen disparo.
En lugar de dividir por chispas la columna de
partitioncolumn
función de los límites que proporcionamos, si piensa alimentar la división por sí mismo, los datos se pueden dividir de manera uniforme.
debe cambiar a otro método JDBC donde, en lugar de
(lowerbound,upperbound & numpartition)
, podemos proporcionar predicados directamente.
def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame