json amazon-web-services amazon-s3 parquet amazon-kinesis-firehose

json - Escribir parquet de AWS Kinesis firehose a AWS S3



amazon-web-services amazon-s3 (3)

Amazon Kinesis Firehose recibe registros de transmisión y puede almacenarlos en Amazon S3 (o Amazon Redshift o Amazon Elasticsearch Service).

Cada registro puede ser de hasta 1000KB.

Sin embargo, los registros se agregan juntos en un archivo de texto, con lotes basados ​​en el tiempo o el tamaño. Tradicionalmente, los registros son formato JSON.

No podrá enviar un archivo de parquet porque no se ajustará a este formato de archivo.

Es posible activar una función de transformación de datos Lambda, pero esto tampoco será capaz de generar un archivo de parquet.

De hecho, dada la naturaleza de los archivos de parquet, es poco probable que pueda construirlos un registro a la vez . Siendo un formato de almacenamiento en columnas, sospecho que realmente deben crearse en un lote en lugar de tener datos agregados por registro.

En pocas palabras : no .

Me gustaría ingerir datos en s3 de kinesis firehose formateado como parquet. Hasta ahora solo he encontrado una solución que implica crear un EMR, pero estoy buscando algo más barato y más rápido como almacenar el json recibido como parquet directamente desde Firehose o usar una función Lambda.

Muchas gracias, Javi.


Buenas noticias, esta característica fue lanzada hoy!

Amazon Kinesis Data Firehose puede convertir el formato de sus datos de entrada de JSON a Apache Parquet o Apache ORC antes de almacenar los datos en Amazon S3. Parquet y ORC son formatos de datos en columnas que ahorran espacio y permiten consultas más rápidas

Para habilitarlo, vaya a su secuencia de Firehose y haga clic en Editar . Debería ver la sección Conversión de formato de grabación como en la captura de pantalla siguiente:

Consulte la documentación para obtener más información: https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html


Después de tratar con el servicio de soporte de AWS y cientos de implementaciones diferentes, me gustaría explicar lo que he logrado.

Finalmente, he creado una función Lambda que procesa todos los archivos generados por Kinesis Firehose, clasifica mis eventos según la carga útil y almacena el resultado en archivos Parquet en S3.

Hacer eso no es muy fácil:

  1. En primer lugar, debe crear un env virtual de Python, incluidas todas las bibliotecas requeridas (en mi caso Pandas, NumPy, Fastparquet, etc.). Como el archivo resultante (que incluye todas las bibliotecas y mi función de Lambda es pesado, es necesario lanzar una instancia de EC2, he usado el que se incluye en el nivel gratuito). Para crear el env virtual siga estos pasos:

    • Iniciar sesión en EC2
    • Crea una carpeta llamada lambda (o cualquier otro nombre)
    • Sudo yum -y actualización
    • Sudo yum -y actualización
    • sudo yum -y groupinstall "Herramientas de desarrollo"
    • sudo yum -y instalar blas
    • sudo yum -y instalar lapack
    • sudo yum -y instalar atlas-sse3-devel
    • sudo yum instalar python27-devel python27-pip gcc
    • Virtualenv env
    • fuente env / bin / activar
    • pip instalar boto3
    • pip instalar fastparquet
    • pip instalar pandas
    • pip instalar thriftpy
    • pip instalar s3fs
    • pip instalar (cualquier otra biblioteca requerida)
    • encuentra ~ / lambda / env / lib * / python2.7 / site-packages / -name "* .so" | tira de xargs
    • pushd env / lib / python2.7 / site-packages /
    • zip -r -9 -q ~ / lambda.zip *
    • Popd
    • pushd env / lib64 / python2.7 / site-packages /
    • zip -r -9 -q ~ / lambda.zip *
    • Popd
  2. Crea la función lambda apropiadamente:

    import json import boto3 import datetime as dt import urllib import zlib import s3fs from fastparquet import write import pandas as pd import numpy as np import time def _send_to_s3_parquet(df): s3_fs = s3fs.S3FileSystem() s3_fs_open = s3_fs.open # FIXME add something else to the key or it will overwrite the file key = ''mybeautifullfile.parquet.gzip'' # Include partitions! key1 and key2 write( ''ExampleS3Bucket''+ ''/key1=value/key2=othervalue/'' + key, df, compression=''GZIP'',open_with=s3_fs_open) def lambda_handler(event, context): # Get the object from the event and show its content type bucket = event[''Records''][0][''s3''][''bucket''][''name''] key = urllib.unquote_plus(event[''Records''][0][''s3''][''object''][''key'']) try: s3 = boto3.client(''s3'') response = s3.get_object(Bucket=bucket, Key=key) data = response[''Body''].read() decoded = data.decode(''utf-8'') lines = decoded.split(''/n'') # Do anything you like with the dataframe (Here what I do is to classify them # and write to different folders in S3 according to the values of # the columns that I want df = pd.DataFrame(lines) _send_to_s3_parquet(df) except Exception as e: print(''Error getting object {} from bucket {}.''.format(key, bucket)) raise e

  3. Copie la función lambda en lambda.zip y despliegue la función lambda:

    • Vuelva a su instancia de EC2 y agregue la función lambda deseada a zip: zip -9 lambda.zip lambda_function.py (lambda_function.py es el archivo generado en el paso 2)
    • Copie el archivo zip generado en S3, ya que es muy pesado para ser implementado con suficiente hacerlo a través de S3. aws s3 cp lambda.zip s3: // support-bucket / lambda_packages /
    • Despliegue la función lambda: aws lambda update-function-code --function-name --s3-bucket support-bucket --s3-key lambda_packages / lambda.zip
  4. Desencadene el que se ejecutará cuando lo desee, por ejemplo, cada vez que se crea un nuevo archivo en S3, o incluso puede asociar la función lambda a Firehose. (No elegí esta opción porque los límites ''lambda'' son más bajos que los límites de Firehose, puede configurar Firehose para escribir un archivo cada 128 Mb o 15 minutos, pero si asocia esta función lambda a Firehose, se ejecutará la función lambda cada 3 minutos o 5 MB, en mi caso tuve el problema de generar muchos pequeños archivos de parquet, ya que cada vez que se lanza la función lambda genero al menos 10 archivos).