scala apache-spark amazon-s3 apache-spark-sql aws-sdk

scala - Leyendo múltiples archivos desde S3 en Spark por período de fecha



apache-spark amazon-s3 (1)

Descripción

Tengo una aplicación, que envía datos a AWS Kinesis Firehose y esto escribe los datos en mi cubo S3. Firehose usa el formato "aaaa / MM / dd / HH" para escribir los archivos.

Como en este ejemplo de ruta S3:

s3://mybucket/2016/07/29/12

Ahora tengo una aplicación Spark escrita en Scala, donde necesito leer datos de un período de tiempo específico. Tengo fechas de inicio y finalización. Los datos están en formato JSON y es por eso que uso sqlContext.read.json() no sc.textFile() .

¿Cómo puedo leer los datos de forma rápida y eficiente?

¿Qué he intentado?

  1. Comodines : puedo seleccionar los datos de todas las horas de una fecha específica o todas las fechas de un mes específico, por ejemplo:

    val df = sqlContext.read.json("s3://mybucket/2016/07/29/*") val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")

    Pero si tengo que leer datos del período de fecha de unos pocos días, por ejemplo, 2016-07-29 - 2016-07-30 no puedo usar el método de comodín de la misma manera.

    Lo que me lleva a mi siguiente punto...

  2. Utilizando múltiples rutas o un CSV de directorios como lo presenta samthebest en this solución. Parece que separar directorios con comas solo funciona con sc.textFile() y no con sqlContext.read.json() .
  3. Unión : una segunda solución del enlace anterior de cloud sugiere leer cada directorio por separado y luego unirlos. Aunque él sugiere la unión de RDD-s, hay una opción para unir DataFrames también. Si genero las cadenas de fecha a partir del período de fecha dado manualmente, entonces puedo crear una ruta que no existe y en lugar de ignorarla, la lectura completa falla. En su lugar, podría usar AWS SDK y usar la función listObjects de AmazonS3Client para obtener todas las claves como en la solución de iMKanchwala del enlace anterior.

    El único problema es que mis datos están cambiando constantemente. Si la función read.json() obtiene todos los datos como un solo parámetro, lee todos los datos necesarios y es lo suficientemente inteligente como para inferir el esquema json a partir de los datos. Si leo 2 directorios por separado y sus esquemas no coinciden, entonces creo que la unión de estos dos marcos de datos se convierte en un problema.

  4. Sintaxis de Glob (?) : This solución de nhahtdh es un poco mejor que las opciones 1 y 2 porque ofrecen la opción de especificar fechas y directorios con más detalle y como una sola "ruta", por lo que también funciona con read.json() .

    Pero una vez más, se produce un problema familiar sobre los directorios que faltan. Digamos que quiero todos los datos del 20.07 al 30.07, puedo declararlos así:

    val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*")

    Pero si me faltan datos del, digamos, el 25 de julio, entonces la ruta ... ..16/07/25/ no existe y la función falla.

Y, obviamente, se vuelve más difícil cuando el período solicitado es, por ejemplo, 25.11.2015-12.02.2016, entonces necesitaría programáticamente (en mi script de Scala) crear una ruta de cadena como esta:

"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*"

Y al crearlo, me gustaría estar seguro de que, de alguna manera, estoy seguro de que estos intervalos 25-30 y 01-12 tienen rutas correspondientes, si falta alguno, vuelve a fallar. (Asterisco, afortunadamente, se ocupa de los directorios que faltan, ya que lee todo lo que existe)

¿Cómo puedo leer todos los datos necesarios de una sola ruta de directorio de una sola vez sin la posibilidad de fallar debido a la falta de un directorio entre un intervalo de fecha?


Hay una solución mucho más simple. Si observa la API DataFrameReader , notará que hay un .json(paths: String*) . Simplemente cree una colección de las rutas que desee, con globos de no, como prefiera, y luego llame al método, por ejemplo,

val paths: Seq[String] = ... val df = sqlContext.read.json(paths: _*)