Tormenta Apache - Tridente

Trident es una extensión de Storm. Al igual que Storm, Trident también fue desarrollado por Twitter. La razón principal detrás del desarrollo de Trident es proporcionar una abstracción de alto nivel sobre Storm junto con procesamiento de flujo con estado y consultas distribuidas de baja latencia.

Trident usa pico y cerrojo, pero estos componentes de bajo nivel son generados automáticamente por Trident antes de la ejecución. Trident tiene funciones, filtros, uniones, agrupación y agregación.

Trident procesa los flujos como una serie de lotes que se denominan transacciones. Generalmente, el tamaño de esos pequeños lotes será del orden de miles o millones de tuplas, según el flujo de entrada. De esta manera, Trident es diferente de Storm, que realiza el procesamiento tupla por tupla.

El concepto de procesamiento por lotes es muy similar al de las transacciones de bases de datos. A cada transacción se le asigna un ID de transacción. La transacción se considera exitosa, una vez que se completa todo su procesamiento. Sin embargo, una falla en el procesamiento de una de las tuplas de la transacción hará que se retransmita toda la transacción. Para cada lote, Trident llamará a beginCommit al comienzo de la transacción y se comprometerá al final de la misma.

Topología tridente

Trident API expone una opción fácil para crear topología Trident usando la clase "TridentTopology". Básicamente, la topología Trident recibe el flujo de entrada del canal y realiza una secuencia ordenada de operación (filtro, agregación, agrupación, etc.) en el flujo. Storm Tuple se reemplaza por Trident Tuple y Bolts se reemplazan por operaciones. Se puede crear una topología Trident simple de la siguiente manera:

TridentTopology topology = new TridentTopology();

Tuplas tridentes

La tupla tridente es una lista de valores con nombre. La interfaz TridentTuple es el modelo de datos de una topología Trident. La interfaz TridentTuple es la unidad básica de datos que puede procesar una topología Trident.

Pico tridente

El pico Trident es similar al pico Storm, con opciones adicionales para usar las funciones de Trident. De hecho, todavía podemos usar el IRichSpout, que hemos usado en la topología de Storm, pero será de naturaleza no transaccional y no podremos usar las ventajas proporcionadas por Trident.

El pico básico que tiene todas las funciones para utilizar las funciones de Trident es "ITridentSpout". Es compatible con semántica transaccional transaccional y opaca. Los otros surtidores son IBatchSpout, IPartitionedTridentSpout e IOpaquePartitionedTridentSpout.

Además de estos picos genéricos, Trident tiene muchos ejemplos de implementación del pico tridente. Uno de ellos es el pico FeederBatchSpout, que podemos usar para enviar listas nombradas de tuplas tridentes fácilmente sin preocuparnos por el procesamiento por lotes, el paralelismo, etc.

La creación de FeederBatchSpout y la alimentación de datos se pueden realizar como se muestra a continuación:

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Operaciones de tridente

Trident se basa en la "Operación Trident" para procesar el flujo de entrada de tuplas trident. La API Trident tiene una serie de operaciones integradas para manejar el procesamiento de flujos de simples a complejos. Estas operaciones van desde la validación simple hasta la agrupación compleja y agregación de tuplas de tridentes. Repasemos las operaciones más importantes y de uso frecuente.

Filtrar

El filtro es un objeto utilizado para realizar la tarea de validación de entrada. Un filtro Trident obtiene un subconjunto de campos de tupla trident como entrada y devuelve verdadero o falso dependiendo de si se cumplen ciertas condiciones o no. Si se devuelve true, la tupla se mantiene en el flujo de salida; de lo contrario, la tupla se elimina de la secuencia. El filtro básicamente heredará delBaseFilter clase e implementar el isKeepmétodo. Aquí hay una implementación de muestra del funcionamiento del filtro:

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

La función de filtro se puede llamar en la topología utilizando el método "cada". La clase "Campos" se puede utilizar para especificar la entrada (subconjunto de la tupla tridente). El código de muestra es el siguiente:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Función

Functiones un objeto que se utiliza para realizar una operación simple en una tupla de un solo tridente. Toma un subconjunto de campos de tupla tridente y emite cero o más campos de tupla tridente nuevos.

Function básicamente hereda de la BaseFunction clase e implementa el executemétodo. A continuación se ofrece una implementación de muestra:

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Al igual que la operación de filtro, la operación de función se puede llamar en una topología utilizando el eachmétodo. El código de muestra es el siguiente:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Agregación

La agregación es un objeto que se utiliza para realizar operaciones de agregación en un lote de entrada, una partición o un flujo. Trident tiene tres tipos de agregación. Son los siguientes:

  • aggregate- Agrega cada lote de tupla tridente de forma aislada. Durante el proceso de agregación, las tuplas se reparten inicialmente utilizando la agrupación global para combinar todas las particiones del mismo lote en una sola partición.

  • partitionAggregate- Agrega cada partición en lugar de todo el lote de tupla tridente. La salida del agregado de la partición reemplaza completamente la tupla de entrada. La salida del agregado de partición contiene una única tupla de campo.

  • persistentaggregate - Agrega todas las tuplas de tridentes en todos los lotes y almacena el resultado en la memoria o en la base de datos.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

La operación de agregación se puede crear utilizando CombinerAggregator, ReducerAggregator o la interfaz de agregación genérica. El agregador "count" utilizado en el ejemplo anterior es uno de los agregadores incorporados. Se implementa mediante "CombinerAggregator". La implementación es la siguiente:

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Agrupamiento

La operación de agrupamiento es una operación incorporada y puede ser invocada por el groupBymétodo. El método groupBy reparte la secuencia haciendo un partitionBy en los campos especificados, y luego dentro de cada partición, agrupa las tuplas cuyos campos de grupo son iguales. Normalmente, usamos "groupBy" junto con "persistentAggregate" para obtener la agregación agrupada. El código de muestra es el siguiente:

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Fusión y unión

La fusión y la unión se pueden realizar mediante el método "fusionar" y "unión", respectivamente. La fusión combina una o más corrientes. La unión es similar a la fusión, excepto por el hecho de que la unión utiliza un campo de tupla tridente de ambos lados para verificar y unir dos corrientes. Además, la unión solo funcionará a nivel de lote. El código de muestra es el siguiente:

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Mantenimiento del estado

Trident proporciona un mecanismo para el mantenimiento del estado. La información de estado se puede almacenar en la topología misma; de lo contrario, también puede almacenarla en una base de datos separada. El motivo es mantener un estado en el que si alguna tupla falla durante el procesamiento, se reintentará la tupla fallida. Esto crea un problema al actualizar el estado porque no está seguro de si el estado de esta tupla se ha actualizado previamente o no. Si la tupla ha fallado antes de actualizar el estado, volver a intentar la tupla hará que el estado sea estable. Sin embargo, si la tupla ha fallado después de actualizar el estado, volver a intentar la misma tupla aumentará nuevamente el recuento en la base de datos y hará que el estado sea inestable. Es necesario realizar los siguientes pasos para garantizar que un mensaje se procese solo una vez:

  • Procese las tuplas en pequeños lotes.

  • Asigne una identificación única a cada lote. Si se vuelve a intentar el lote, se le asigna el mismo ID único.

  • Las actualizaciones de estado se ordenan entre lotes. Por ejemplo, la actualización del estado del segundo lote no será posible hasta que se complete la actualización del estado del primer lote.

RPC distribuido

La RPC distribuida se utiliza para consultar y recuperar el resultado de la topología Trident. Storm tiene un servidor RPC distribuido incorporado. El servidor RPC distribuido recibe la solicitud RPC del cliente y la pasa a la topología. La topología procesa la solicitud y envía el resultado al servidor RPC distribuido, que es redirigido por el servidor RPC distribuido al cliente. La consulta RPC distribuida de Trident se ejecuta como una consulta RPC normal, excepto por el hecho de que estas consultas se ejecutan en paralelo.

¿Cuándo usar Trident?

Como en muchos casos de uso, si el requisito es procesar una consulta solo una vez, podemos lograrlo escribiendo una topología en Trident. Por otro lado, será difícil lograr exactamente una vez procesado en el caso de Storm. Por lo tanto, Trident será útil para aquellos casos de uso en los que requiera exactamente un procesamiento. Trident no es para todos los casos de uso, especialmente los casos de uso de alto rendimiento porque agrega complejidad a Storm y administra el estado.

Ejemplo de trabajo de Trident

Vamos a convertir nuestra aplicación analizadora de registro de llamadas desarrollada en la sección anterior al marco Trident. La aplicación Trident será relativamente fácil en comparación con la simple tormenta, gracias a su API de alto nivel. Se requerirá básicamente que Storm realice cualquiera de las operaciones de Función, Filtro, Agregado, GroupBy, Join y Merge en Trident. Finalmente iniciaremos el servidor DRPC usando elLocalDRPC class y busque alguna palabra clave usando la execute método de la clase LocalDRPC.

Formatear la información de la llamada

El propósito de la clase FormatCall es formatear la información de la llamada que comprende "Número de llamante" y "Número de receptor". El código completo del programa es el siguiente:

Codificación: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

El propósito de la clase CSVSplit es dividir la cadena de entrada basándose en “coma (,)” y emitir cada palabra en la cadena. Esta función se utiliza para analizar el argumento de entrada de la consulta distribuida. El código completo es el siguiente:

Codificación: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Analizador de registros

Esta es la aplicación principal. Inicialmente, la aplicación inicializará TridentTopology y alimentará la información de la persona que llama usandoFeederBatchSpout. El flujo de topología tridente se puede crear utilizando elnewStreammétodo de la clase TridentTopology. De manera similar, el flujo DRPC de topología Trident se puede crear utilizando elnewDRCPStreammétodo de la clase TridentTopology. Se puede crear un servidor DRCP simple usando la clase LocalDRPC.LocalDRPCtiene un método de ejecución para buscar alguna palabra clave. El código completo se proporciona a continuación.

Codificación: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Crear y ejecutar la aplicación

La aplicación completa tiene tres códigos Java. Son los siguientes:

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

La aplicación se puede construir usando el siguiente comando:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

La aplicación se puede ejecutar usando el siguiente comando:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Salida

Una vez que se inicia la aplicación, la aplicación generará los detalles completos sobre el proceso de inicio del clúster, el procesamiento de operaciones, el servidor DRPC y la información del cliente y, finalmente, el proceso de cierre del clúster. Esta salida se mostrará en la consola como se muestra a continuación.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends