java bigdata apache-flink flink-streaming

java - Flink Streaming: ¿Cómo emitir un flujo de datos a diferentes salidas dependiendo de los datos?



bigdata apache-flink (1)

En Apache Flink tengo una corriente de tuplas. Asumamos un Tuple1<String> realmente simple. La tupla puede tener un valor arbitrario en su campo de valor (por ejemplo, ''P1'', ''P2'', etc.). El conjunto de valores posibles es finito, pero no conozco el conjunto completo de antemano (por lo que podría haber un ''P362''). Quiero escribir esa tupla en una determinada ubicación de salida dependiendo del valor dentro de la tupla. Así que, por ejemplo, me gustaría tener la siguiente estructura de archivos:

  • /output/P1
  • /output/P2

En la documentación, solo encontré posibilidades para escribir en ubicaciones que conozco de antemano (por ejemplo, stream.writeCsv("/output/somewhere") ), pero no hay forma de dejar que el contenido de los datos decida dónde terminan los datos.

Leí sobre la división de salida en la documentación, pero esto no parece proporcionar una manera de redirigir la salida a diferentes destinos de la forma en que me gustaría tenerla (o simplemente no entiendo cómo funcionaría esto).

¿Se puede hacer esto con la API de Flink? Si es así, ¿cómo? Si no es así, ¿hay tal vez una biblioteca de terceros que pueda hacerlo o tendría que construir una cosa por mi cuenta?

Actualizar

Siguiendo la sugerencia de Matthias, se me ocurrió una función de sumidero de tamizado que determina la ruta de salida y luego escribe la tupla en el archivo respectivo después de serializarla. Lo puse aquí como referencia, tal vez sea útil para otra persona:

public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> { private final OutputSelector<IT> outputSelector; private final MapFunction<IT, String> serializationFunction; private final String basePath; Map<String, TextOutputFormat<String>> formats = new HashMap<>(); /** * @param outputSelector the selector which determines into which output(s) a record is written. * @param serializationFunction a function which serializes the record to a string. * @param basePath the base path for writing the records. It will be appended with the output selector. */ public SiftingSinkFunction(OutputSelector<IT> outputSelector, MapFunction<IT, String> serializationFunction, String basePath) { this.outputSelector = outputSelector; this.serializationFunction = serializationFunction; this.basePath = basePath; } @Override public void invoke(IT value) throws Exception { // find out where to write. Iterable<String> selection = outputSelector.select(value); for (String s : selection) { // ensure we have a format for this. TextOutputFormat<String> destination = ensureDestinationExists(s); // then serialize and write. destination.writeRecord(serializationFunction.map(value)); } } private TextOutputFormat<String> ensureDestinationExists(String selection) throws IOException { // if we know the destination, we just return the format. if (formats.containsKey(selection)) { return formats.get(selection); } // create a new output format and initialize it from the context. TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection)); StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); format.configure(context.getTaskStubParameters()); format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks()); // put it into our map. formats.put(selection, format); return format; } @Override public void close() throws IOException { Exception lastException = null; try { for (TextOutputFormat<String> format : formats.values()) { try { format.close(); } catch (Exception e) { lastException = e; format.tryCleanupOnError(); } } } finally { formats.clear(); } if (lastException != null) { throw new IOException("Close failed.", lastException); } } }


Puede implementar un sumidero personalizado. Heredar de uno de los dos

  • org.apache.flink.streaming.api.functions.sink.SinkFunction
  • org.apache.flink.streaming.api.functions.sink.RichSinkFunction

En su programa de uso:

stream.addSink(SinkFunction<T> sinkFunction);

en lugar de stream.writeCsv("/output/somewhere") .