significado salud medicina amazon-web-services apache-spark amazon-s3 amazon-emr

amazon web services - salud - Tiempos de escritura S3 extremadamente lentos desde EMR/Spark



emr salud (6)

Estoy escribiendo para ver si alguien sabe cómo acelerar los tiempos de escritura S3 de Spark en ejecución en EMR?

Mi tarea de Spark tarda más de 4 horas en completarse, sin embargo, el clúster solo está bajo carga durante las primeras 1,5 horas.

Tenía curiosidad por lo que Spark estaba haciendo todo este tiempo. Miré los registros y encontré muchos comandos s3 mv , uno para cada archivo. Luego, mirando directamente a S3, veo que todos mis archivos están en un directorio _temporary .

En segundo lugar, me preocupa el costo de mi clúster, parece que necesito comprar 2 horas de cómputo para esta tarea específica. Sin embargo, termino comprando hasta 5 horas. Tengo curiosidad si EMR AutoScaling puede ayudar con el costo en esta situación.

Algunos artículos discuten el cambio del algoritmo de salida de archivos, pero he tenido poco éxito con eso.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

Escribir en el HDFS local es rápido. Tengo curiosidad por si emitir un comando hadoop para copiar los datos a S3 sería más rápido?


¿Qué tan grande es el archivo (s) que está escribiendo también? Tener un núcleo escrito en un archivo muy grande va a ser mucho más lento que dividir el archivo y tener múltiples trabajadores que escriban archivos más pequeños.


¿Qué ves en la salida de chispa? Si ve muchas operaciones de cambio de nombre, lea this


El interlocutor directo fue sacado de la chispa, ya que no era resistente a las fallas. Recomiendo encarecidamente no usarlo.

Hay trabajo en curso en Hadoop, s3guard, para agregar confirmadores de cambio de nombre 0, que serán O (1) y tolerantes a fallas; mantener un ojo en HADOOP-13786 .

Ignorando "el comensal de Magic" por ahora, el comisionado basado en Netflix se enviará primero (¿hadoop 2.9? 3.0?)

  1. Esto escribe el trabajo al FS local, en la tarea de cometer
  2. emite operaciones de envío de partes múltiples no comprometidas para escribir los datos, pero no materializarlos.
  3. Guarda la información necesaria para enviar el PUT a HDFS, utilizando el comando de salida de archivo original "algoritmo 1"
  4. Implementa una confirmación de trabajo que utiliza la confirmación de salida de archivo de HDFS para decidir qué PUT completar y cuáles cancelar.

Resultado: la confirmación de la tarea toma datos / ancho de banda en segundos, pero la confirmación de la tarea no toma más tiempo que el de hacer 1-4 GET en la carpeta de destino y un POST para cada archivo pendiente, este último en paralelo.

Puede seleccionar el committer en el que se basa este trabajo, de netflix , y probablemente utilizarlo en chispa hoy. Configure el algoritmo de confirmación de archivo = 1 (debería ser el predeterminado) o no escribirá los datos.


Experimentamos lo mismo en Azure usando Spark en WASB. Finalmente decidimos no utilizar el almacenamiento distribuido directamente con chispa. Hicimos spark.write en un destino real de hdfs: // y desarrollamos una herramienta específica que hace: hadoop copyFromLocal hdfs: // wasb: // El HDFS es nuestro búfer temporal antes de archivar en WASB (o S3).


Lo que está viendo es un problema con el outputcommitter y s3. el trabajo de confirmación aplica fs.rename en la carpeta _temporary y, dado que S3 no admite el cambio de nombre, significa que una sola solicitud ahora está copiando y eliminando todos los archivos de _temporary a su destino final.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2") solo funciona con la versión hadoop> 2.7. lo que hace es copiar cada archivo de _temporary en la tarea de confirmación y no confirmar la tarea, por lo que se distribuye y funciona bastante rápido.

Si usa una versión anterior de hadoop, usaría Spark 1.6 y usaría:

sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

* tenga en cuenta que no funciona con la especificación activada o escribiendo en modo de adición

** también tenga en cuenta que está en desuso en Spark 2.0 (reemplazado por algorithm.version = 2)

Por cierto, en mi equipo escribimos con Spark a HDFS y utilizamos trabajos DISTCP (específicamente s3-dist-cp) en producción para copiar los archivos a S3, pero esto se hace por varias otras razones (coherencia, tolerancia a fallos), por lo que no es necesario .. puedes escribir a S3 bastante rápido usando lo que sugerí.


Tuve un caso de uso similar en el que usé spark para escribir en s3 y tuve un problema de rendimiento. La razón principal fue que spark estaba creando una gran cantidad de archivos de pieza de cero bytes y reemplazando los archivos temporales por el nombre real del archivo estaba ralentizando el proceso de escritura. Trató a continuación el enfoque como trabajo alrededor

  1. Escriba la salida de spark en HDFS y use Hive para escribir en s3. El rendimiento fue mucho mejor ya que Hive estaba creando menos cantidad de archivos de pieza. El problema que tuve fue (también tuve el mismo problema al usar chispa), la acción de eliminación en la Política no se proporcionó en prod env por razones de seguridad. El cubo de S3 fue kms cifrado en mi caso.

  2. Escriba la salida de chispa en HDFS y los archivos hdfs copiados en local y use aws s3 copy para enviar datos a s3. Tuvo segundos mejores resultados con este enfoque. Creé un ticket con Amazon y me propusieron ir con éste.

  3. Use s3 dist cp para copiar archivos de HDFS a S3. Esto fue trabajar sin problemas, pero no ejecutante