google-bigquery google-cloud-dataflow apache-beam-io

google bigquery - Creación/escritura en la tabla BigQuery parititonada a través de Google Cloud Dataflow



google-bigquery google-cloud-dataflow (6)

Apache Beam versión 2.0 admite la división de tablas de salida de BigQuery fuera de la caja .

Quería aprovechar la nueva funcionalidad BigQuery de las tablas con particiones de tiempo, pero no estoy seguro de que esto sea posible actualmente en la versión 1.6 del SDK de Dataflow.

Mirando la API BigQuery JSON , para crear una tabla particionada de día, uno debe pasar

"timePartitioning": { "type": "DAY" }

opción, pero la interfaz com.google.cloud.dataflow.sdk.io.BigQueryIO solo permite especificar una TableReference.

Pensé que tal vez podría precrear la tabla y escabullirme en un decorador de partición a través de un lambda BigQueryIO.Write.toTableReference. ¿Alguien más está teniendo éxito al crear / escribir tablas particionadas a través de Dataflow?

Esto parece un problema similar a la configuración del tiempo de caducidad de la tabla que actualmente tampoco está disponible.


Como dice Pavan, definitivamente es posible escribir en tablas de partición con Dataflow. ¿Está utilizando DataflowPipelineRunner funciona en modo de transmisión o en modo por lotes?

La solución que propusiste debería funcionar. Específicamente, si crea previamente una tabla con la configuración de particionamiento de fecha, puede usar una lambda BigQueryIO.Write.toTableReference para escribir en una partición de fecha. Por ejemplo:

/** * A Joda-time formatter that prints a date in format like {@code "20160101"}. * Threadsafe. */ private static final DateTimeFormatter FORMATTER = DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC); // This code generates a valid BigQuery partition name: Instant instant = Instant.now(); // any Joda instant in a reasonable time range String baseTableName = "project:dataset.table"; // a valid BigQuery table name String partitionName = String.format("%s$%s", baseTableName, FORMATTER.print(instant));


Creo que debería ser posible usar el decorador de partición cuando no esté usando la transmisión. Estamos trabajando activamente para admitir decoradores de particiones a través de la transmisión. Háganos saber si ve algún error hoy con el modo sin transmisión.


El enfoque que tomé (también funciona en el modo de transmisión):

  • Definir una ventana personalizada para el registro entrante
  • Convierta la ventana en el nombre de la tabla / partición

    p.apply(PubsubIO.Read .subscription(subscription) .withCoder(TableRowJsonCoder.of()) ) .apply(Window.into(new TablePartitionWindowFn()) ) .apply(BigQueryIO.Write .to(new DayPartitionFunc(dataset, table)) .withSchema(schema) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) );

Al configurar la ventana en función de los datos entrantes, se puede ignorar End Instant, ya que el valor inicial se usa para configurar la partición:

public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> { private IntervalWindow assignWindow(AssignContext context) { TableRow source = (TableRow) context.element(); String dttm_str = (String) source.get("DTTM"); DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC(); Instant start_point = Instant.parse(dttm_str,formatter); Instant end_point = start_point.withDurationAdded(1000, 1); return new IntervalWindow(start_point, end_point); }; @Override public Coder<IntervalWindow> windowCoder() { return IntervalWindow.getCoder(); } @Override public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception { return Arrays.asList(assignWindow(c)); } @Override public boolean isCompatible(WindowFn<?, ?> other) { return false; } @Override public IntervalWindow getSideInputWindow(BoundedWindow window) { if (window instanceof GlobalWindow) { throw new IllegalArgumentException( "Attempted to get side input window for GlobalWindow from non-global WindowFn"); } return null; }

Configuración dinámica de la partición de la tabla:

public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> { String destination = ""; public DayPartitionFunc(String dataset, String table) { this.destination = dataset + "." + table+ "$"; } @Override public String apply(BoundedWindow boundedWindow) { // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows. String dayString = DateTimeFormat.forPattern("yyyyMMdd") .withZone(DateTimeZone.UTC) .print(((IntervalWindow) boundedWindow).start()); return destination + dayString; }}

¿Hay una mejor manera de lograr el mismo resultado?


He escrito datos en tablas particionadas bigquery a través del flujo de datos. Estos escritos son dinámicos como si los datos en esa partición ya existen, entonces puedo agregarlos o sobrescribirlos.

He escrito el código en Python. Es una operación de escritura en modo por lotes en bigquery.

client = bigquery.Client(project=projectName) dataset_ref = client.dataset(datasetName) table_ref = dataset_ref.table(bqTableName) job_config = bigquery.LoadJobConfig() job_config.skip_leading_rows = skipLeadingRows job_config.source_format = bigquery.SourceFormat.CSV if tableExists(client, table_ref): job_config.autodetect = autoDetect previous_rows = client.get_table(table_ref).num_rows #assert previous_rows > 0 if allowJaggedRows is True: job_config.allowJaggedRows = True if allowFieldAddition is True: job_config._properties[''load''][''schemaUpdateOptions''] = [''ALLOW_FIELD_ADDITION''] if isPartitioned is True: job_config._properties[''load''][''timePartitioning''] = {"type": "DAY"} if schemaList is not None: job_config.schema = schemaList job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE else: job_config.autodetect = autoDetect job_config._properties[''createDisposition''] = ''CREATE_IF_NEEDED'' job_config.schema = schemaList if isPartitioned is True: job_config._properties[''load''][''timePartitioning''] = {"type": "DAY"} if schemaList is not None: table = bigquery.Table(table_ref, schema=schemaList) load_job = client.load_table_from_uri(gcsFileName, table_ref, job_config=job_config) assert load_job.job_type == ''load'' load_job.result() assert load_job.state == ''DONE''

Funciona bien.