google bigquery - Creación/escritura en la tabla BigQuery parititonada a través de Google Cloud Dataflow
google-bigquery google-cloud-dataflow (6)
Apache Beam versión 2.0 admite la división de tablas de salida de BigQuery fuera de la caja .
Quería aprovechar la nueva funcionalidad BigQuery de las tablas con particiones de tiempo, pero no estoy seguro de que esto sea posible actualmente en la versión 1.6 del SDK de Dataflow.
Mirando la API BigQuery JSON , para crear una tabla particionada de día, uno debe pasar
"timePartitioning": { "type": "DAY" }
opción, pero la interfaz com.google.cloud.dataflow.sdk.io.BigQueryIO solo permite especificar una TableReference.
Pensé que tal vez podría precrear la tabla y escabullirme en un decorador de partición a través de un lambda BigQueryIO.Write.toTableReference. ¿Alguien más está teniendo éxito al crear / escribir tablas particionadas a través de Dataflow?
Esto parece un problema similar a la configuración del tiempo de caducidad de la tabla que actualmente tampoco está disponible.
Como dice Pavan, definitivamente es posible escribir en tablas de partición con Dataflow.
¿Está utilizando
DataflowPipelineRunner
funciona en modo de transmisión o en modo por lotes?
La solución que propusiste debería funcionar.
Específicamente, si crea previamente una tabla con la configuración de particionamiento de fecha, puede usar una lambda
BigQueryIO.Write.toTableReference
para escribir en una partición de fecha.
Por ejemplo:
/**
* A Joda-time formatter that prints a date in format like {@code "20160101"}.
* Threadsafe.
*/
private static final DateTimeFormatter FORMATTER =
DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);
// This code generates a valid BigQuery partition name:
Instant instant = Instant.now(); // any Joda instant in a reasonable time range
String baseTableName = "project:dataset.table"; // a valid BigQuery table name
String partitionName =
String.format("%s$%s", baseTableName, FORMATTER.print(instant));
Creo que debería ser posible usar el decorador de partición cuando no esté usando la transmisión. Estamos trabajando activamente para admitir decoradores de particiones a través de la transmisión. Háganos saber si ve algún error hoy con el modo sin transmisión.
El enfoque que tomé (también funciona en el modo de transmisión):
- Definir una ventana personalizada para el registro entrante
-
Convierta la ventana en el nombre de la tabla / partición
p.apply(PubsubIO.Read .subscription(subscription) .withCoder(TableRowJsonCoder.of()) ) .apply(Window.into(new TablePartitionWindowFn()) ) .apply(BigQueryIO.Write .to(new DayPartitionFunc(dataset, table)) .withSchema(schema) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) );
Al configurar la ventana en función de los datos entrantes, se puede ignorar End Instant, ya que el valor inicial se usa para configurar la partición:
public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {
private IntervalWindow assignWindow(AssignContext context) {
TableRow source = (TableRow) context.element();
String dttm_str = (String) source.get("DTTM");
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();
Instant start_point = Instant.parse(dttm_str,formatter);
Instant end_point = start_point.withDurationAdded(1000, 1);
return new IntervalWindow(start_point, end_point);
};
@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
}
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
return Arrays.asList(assignWindow(c));
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return false;
}
@Override
public IntervalWindow getSideInputWindow(BoundedWindow window) {
if (window instanceof GlobalWindow) {
throw new IllegalArgumentException(
"Attempted to get side input window for GlobalWindow from non-global WindowFn");
}
return null;
}
Configuración dinámica de la partición de la tabla:
public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> {
String destination = "";
public DayPartitionFunc(String dataset, String table) {
this.destination = dataset + "." + table+ "$";
}
@Override
public String apply(BoundedWindow boundedWindow) {
// The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
String dayString = DateTimeFormat.forPattern("yyyyMMdd")
.withZone(DateTimeZone.UTC)
.print(((IntervalWindow) boundedWindow).start());
return destination + dayString;
}}
¿Hay una mejor manera de lograr el mismo resultado?
He escrito datos en tablas particionadas bigquery a través del flujo de datos. Estos escritos son dinámicos como si los datos en esa partición ya existen, entonces puedo agregarlos o sobrescribirlos.
He escrito el código en Python. Es una operación de escritura en modo por lotes en bigquery.
client = bigquery.Client(project=projectName)
dataset_ref = client.dataset(datasetName)
table_ref = dataset_ref.table(bqTableName)
job_config = bigquery.LoadJobConfig()
job_config.skip_leading_rows = skipLeadingRows
job_config.source_format = bigquery.SourceFormat.CSV
if tableExists(client, table_ref):
job_config.autodetect = autoDetect
previous_rows = client.get_table(table_ref).num_rows
#assert previous_rows > 0
if allowJaggedRows is True:
job_config.allowJaggedRows = True
if allowFieldAddition is True:
job_config._properties[''load''][''schemaUpdateOptions''] = [''ALLOW_FIELD_ADDITION'']
if isPartitioned is True:
job_config._properties[''load''][''timePartitioning''] = {"type": "DAY"}
if schemaList is not None:
job_config.schema = schemaList
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
else:
job_config.autodetect = autoDetect
job_config._properties[''createDisposition''] = ''CREATE_IF_NEEDED''
job_config.schema = schemaList
if isPartitioned is True:
job_config._properties[''load''][''timePartitioning''] = {"type": "DAY"}
if schemaList is not None:
table = bigquery.Table(table_ref, schema=schemaList)
load_job = client.load_table_from_uri(gcsFileName, table_ref, job_config=job_config)
assert load_job.job_type == ''load''
load_job.result()
assert load_job.state == ''DONE''
Funciona bien.
Si pasa el nombre de la tabla en formato
table_name_YYYYMMDD
, BigQuery lo tratará como una tabla fragmentada, que puede simular las características de la tabla de particiones.
Consulte la documentación:
https://cloud.google.com/bigquery/docs/partitioned-tables