spark aws scala amazon-s3 apache-spark apache-spark-sql parquet

scala - aws - amazon emr



Usar Spark para escribir un archivo de parquet en s3 sobre s3a es muy lento (2)

Los valores predeterminados de chispa causan una gran cantidad de sobrecarga (probablemente) innecesaria durante las operaciones de E / S, especialmente al escribir en S3. Este artículo analiza esto más a fondo, pero hay dos configuraciones que querrá considerar cambiar.

  • Usando DirectParquetOutputCommitter. Por defecto, Spark guardará todos los datos en una carpeta temporal y luego moverá esos archivos. El uso del DirectParquetOutputCommitter ahorrará tiempo al escribir directamente en la ruta de salida S3

    • Ya no está disponible en Spark 2.0+
      • Como se indica en el boleto jira, la solución actual es
        1. Cambie su código a usar s3a y Hadoop 2.7.2+; es mejor todo, mejora en Hadoop 2.8 y es la base para s3guard
        2. Utilice Hadoop FileOutputCommitter y establezca mapreduce.fileoutputcommitter.algorithm.version en 2
  • Desactiva la combinación de esquemas. Si la combinación de esquemas está activada, el nodo del controlador analizará todos los archivos para garantizar un esquema coherente. Esto es especialmente costoso porque no es una operación distribuida. Asegúrate de que esto se apaga haciendo

    val file = sqx.read.option("mergeSchema", "false").parquet(path)

Estoy intentando escribir un archivo de parquet en Amazon S3 usando Spark 1.6.1. El pequeño parquet que estoy generando es ~ 2GB una vez escrito, así que no hay demasiados datos. Estoy tratando de demostrar que Spark es una plataforma que puedo usar.

Básicamente, lo que voy a hacer es configurar un esquema de estrella con marcos de datos, luego voy a escribir esas tablas en el parquet. Los datos provienen de archivos csv proporcionados por un proveedor y estoy usando Spark como una plataforma ETL. Actualmente tengo un clúster de 3 nodos en ec2 (r3.2xlarge) Así que 120 GB de memoria en los ejecutores y 16 núcleos en total.

Los archivos de entrada suman aproximadamente 22 GB y estoy extrayendo aproximadamente 2 GB de esos datos por ahora. Eventualmente, serán muchos terabytes cuando empiece a cargar el conjunto de datos completo.

Aquí está mi pseudocódigo de chispa / scala:

def loadStage(): Unit = { sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData") sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter") sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false") var sqlCtx = new SQLContext(sc) val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz") //Setup header table/df val header_rec = DataFile.map(_.split("//|")).filter(x=> x(0) == "1") val headerSchemaDef = "market_no,rel_date,field1, field2, field3....." val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false))) val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) )) val header = sqlCtx.createDataFrame(headerRecords, headerSchema) header.registerTempTable("header") sqlCtx.cacheTable("header") //Setup fact table/df val fact_recs = DataFile.map(_.split("//|")).filter(x=> x(0) == "2") val factSchemaDef = "market_no,rel_date,field1, field2, field3....." val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false))) val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10))) val df = sqlCtx.createDataFrame(records, factSchema) df.registerTempTable("fact") val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date") println(results.count()) results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet") }

El recuento demora aproximadamente 2 minutos en 465884512 filas. La escritura en el parquet dura 38 minutos

Entiendo que la unión hace una mezcla al conductor que hace la escritura ... pero la cantidad de tiempo que está tomando me hace pensar que estoy haciendo algo muy mal. Sin la fusión, esto todavía toma 15 minutos, que IMO es demasiado largo y me da una tonelada de pequeños archivos de parquet. Me gustaría tener un archivo grande por día de datos que tendré. Tengo un código para hacer la partición por un valor de campo también, y es tan lento. También traté de enviar esto a csv y eso lleva ~ 1 hora.

Cualquier idea / ayuda / sugerencias / sugerencias sería muy apreciada.

Además, realmente no estoy configurando accesorios de tiempo de ejecución cuando presento mi trabajo. Aquí están mis estadísticas de la consola:

Alive Workers: 2 Cores en uso: 16 Total, 16 Used Memory en uso: 117.5 GB Total, 107.5 GB Used Applications: 1 Running, 5 Drivers completados: 0 Running, 0 Completed Status: ALIVE

Eso es por un trabajo.


El regulador de salida directa se ha ido de la base de código de chispa; usted debe escribir el suyo / resucitar el código eliminado en su propio JAR. SI lo hace, desactive la especulación en su trabajo, y sepa que otras fallas también pueden causar problemas, donde el problema es "datos inválidos".

En una nota más brillante, Hadoop 2.8 va a agregar algunas aceleraciones S3A específicamente para leer formatos binarios optimizados (ORC, Parquet) fuera de S3; ver HADOOP-11694 para más detalles. Y algunas personas están trabajando en el uso de Amazon Dynamo para la tienda de metadatos consistente que debería ser capaz de hacer un compromiso O (1) robusto al final del trabajo.