Apache Storm: ejemplo de trabajo

Hemos revisado los detalles técnicos básicos de Apache Storm y ahora es el momento de codificar algunos escenarios simples.

Escenario: analizador de registro de llamadas móviles

La llamada móvil y su duración se proporcionarán como entrada a Apache Storm y Storm procesará y agrupará la llamada entre la misma persona que llama y el mismo receptor y su número total de llamadas.

Creación de pico

Spout es un componente que se utiliza para la generación de datos. Básicamente, un pico implementará una interfaz IRichSpout. La interfaz "IRichSpout" tiene los siguientes métodos importantes:

  • open- Proporciona al pico un entorno para ejecutar. Los ejecutores ejecutarán este método para inicializar el pico.

  • nextTuple - Emite los datos generados a través del recolector.

  • close - Este método se llama cuando se va a apagar un pico.

  • declareOutputFields - Declara el esquema de salida de la tupla.

  • ack - Reconoce que se procesa una tupla específica

  • fail - Especifica que una tupla específica no se procesa y no se reprocesará.

Abierto

La firma del open El método es el siguiente:

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - Proporciona configuración de tormenta para este pico.

  • context - Proporciona información completa sobre el lugar del pico dentro de la topología, su identificación de tarea, información de entrada y salida.

  • collector - Nos permite emitir la tupla que será procesada por los pernos.

nextTuple

La firma del nextTuple El método es el siguiente:

nextTuple()

nextTuple () se llama periódicamente desde el mismo ciclo que los métodos ack () y fail (). Debe liberar el control del hilo cuando no hay trabajo por hacer, para que los otros métodos tengan la oportunidad de ser llamados. Entonces, la primera línea de nextTuple verifica si el procesamiento ha finalizado. Si es así, debería dormir durante al menos un milisegundo para reducir la carga en el procesador antes de regresar.

cerrar

La firma del close El método es el siguiente:

close()

declareOutputFields

La firma del declareOutputFields El método es el siguiente:

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - Se utiliza para declarar ID de flujo de salida, campos de salida, etc.

Este método se utiliza para especificar el esquema de salida de la tupla.

ack

La firma del ack El método es el siguiente:

ack(Object msgId)

Este método reconoce que se ha procesado una tupla específica.

fallar

La firma del nextTuple El método es el siguiente:

ack(Object msgId)

Este método informa que una tupla específica no se ha procesado por completo. Storm reprocesará la tupla específica.

FakeCallLogReaderSpout

En nuestro escenario, necesitamos recopilar los detalles del registro de llamadas. La información del registro de llamadas contiene.

  • número de la persona que llama
  • número de receptor
  • duration

Dado que no tenemos información en tiempo real de los registros de llamadas, generaremos registros de llamadas falsos. La información falsa se creará usando la clase Random. El código completo del programa se proporciona a continuación.

Codificación - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Creación de pernos

Bolt es un componente que toma tuplas como entrada, procesa la tupla y produce nuevas tuplas como salida. Los pernos implementaránIRichBoltinterfaz. En este programa, dos clases de tornillosCallLogCreatorBolt y CallLogCounterBolt se utilizan para realizar las operaciones.

La interfaz IRichBolt tiene los siguientes métodos:

  • prepare- Proporciona al cerrojo un entorno para ejecutar. Los ejecutores ejecutarán este método para inicializar el pico.

  • execute - Procesar una sola tupla de entrada.

  • cleanup - Llamado cuando un cerrojo se va a apagar.

  • declareOutputFields - Declara el esquema de salida de la tupla.

Preparar

La firma del prepare El método es el siguiente:

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - Proporciona configuración Storm para este perno.

  • context - Proporciona información completa sobre el lugar del perno dentro de la topología, su identificación de tarea, información de entrada y salida, etc.

  • collector - Nos permite emitir la tupla procesada.

ejecutar

La firma del execute El método es el siguiente:

execute(Tuple tuple)

aquí tuple es la tupla de entrada que se procesará.

los executeEl método procesa una sola tupla a la vez. Se puede acceder a los datos de la tupla mediante el método getValue de la clase Tuple. No es necesario procesar la tupla de entrada inmediatamente. Se pueden procesar múltiples tuplas y generar como una única tupla de salida. La tupla procesada se puede emitir utilizando la clase OutputCollector.

limpiar

La firma del cleanup El método es el siguiente:

cleanup()

declareOutputFields

La firma del declareOutputFields El método es el siguiente:

declareOutputFields(OutputFieldsDeclarer declarer)

Aquí el parámetro declarer se utiliza para declarar ID de flujo de salida, campos de salida, etc.

Este método se utiliza para especificar el esquema de salida de la tupla.

Registro de llamadas Creator Bolt

El perno del creador del registro de llamadas recibe la tupla del registro de llamadas. La tupla del registro de llamadas tiene el número de la persona que llama, el número del receptor y la duración de la llamada. Este perno simplemente crea un nuevo valor combinando el número de la persona que llama y el número del receptor. El formato del nuevo valor es "Número de llamante - Número de receptor" y se denomina como nuevo campo, "llamada". El código completo se proporciona a continuación.

Codificación - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Perno contador de registro de llamadas

El perno del contador del registro de llamadas recibe la llamada y su duración como una tupla. Este perno inicializa un objeto de diccionario (mapa) en el método de preparación. Enexecute, comprueba la tupla y crea una nueva entrada en el objeto de diccionario para cada nuevo valor de "llamada" en la tupla y establece un valor 1 en el objeto de diccionario. Para la entrada ya disponible en el diccionario, simplemente incrementa su valor. En términos simples, este perno guarda la llamada y su recuento en el objeto de diccionario. En lugar de guardar la llamada y su recuento en el diccionario, también podemos guardarla en una fuente de datos. El código completo del programa es el siguiente:

Codificación - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Crear topología

La topología de Storm es básicamente una estructura de ahorro. La clase TopologyBuilder proporciona métodos sencillos y sencillos para crear topologías complejas. La clase TopologyBuilder tiene métodos para configurar spout(setSpout) y para fijar el perno (setBolt). Finalmente, TopologyBuilder tiene createTopology para crear topología. Utilice el siguiente fragmento de código para crear una topología:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping y fieldsGrouping Los métodos ayudan a configurar la agrupación de arroyos para picos y pernos.

Clúster local

Para propósitos de desarrollo, podemos crear un clúster local usando el objeto "LocalCluster" y luego enviar la topología usando el método "submitTopology" de la clase "LocalCluster". Uno de los argumentos para "submitTopology" es una instancia de la clase "Config". La clase "Config" se utiliza para establecer opciones de configuración antes de enviar la topología. Esta opción de configuración se fusionará con la configuración del clúster en tiempo de ejecución y se enviará a todas las tareas (boquilla y cerrojo) con el método de preparación. Una vez que la topología se envía al clúster, esperaremos 10 segundos para que el clúster calcule la topología enviada y luego apagaremos el clúster mediante el método de "apagado" de "LocalCluster". El código completo del programa es el siguiente:

Codificación - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Crear y ejecutar la aplicación

La aplicación completa tiene cuatro códigos Java. Ellos son -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

La aplicación se puede construir usando el siguiente comando:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

La aplicación se puede ejecutar usando el siguiente comando:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Salida

Una vez que se inicia la aplicación, se mostrarán los detalles completos sobre el proceso de inicio del clúster, el procesamiento de la salida y el perno y, finalmente, el proceso de cierre del clúster. En "CallLogCounterBolt", hemos impreso la llamada y sus detalles de recuento. Esta información se mostrará en la consola de la siguiente manera:

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Idiomas que no son JVM

Las topologías de tormenta se implementan mediante interfaces Thrift, lo que facilita el envío de topologías en cualquier idioma. Storm admite Ruby, Python y muchos otros lenguajes. Echemos un vistazo al enlace de Python.

Enlace de Python

Python es un lenguaje de programación de uso general interpretado, interactivo, orientado a objetos y de alto nivel. Storm admite Python para implementar su topología. Python admite operaciones de emisión, anclaje, seguimiento y registro.

Como sabe, los tornillos se pueden definir en cualquier idioma. Los pernos escritos en otro idioma se ejecutan como subprocesos y Storm se comunica con esos subprocesos con mensajes JSON a través de stdin / stdout. Primero, tome un WordCount de perno de muestra que admita el enlace de Python.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

Aqui la clase WordCount implementa el IRichBoltinterfaz y se ejecuta con la implementación de Python especificada como argumento del súper método "splitword.py". Ahora cree una implementación de Python llamada "splitword.py".

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

Esta es la implementación de muestra para Python que cuenta las palabras en una oración determinada. De manera similar, también puede vincularse con otros idiomas de soporte.