Aplicación en tiempo real (Twitter)

Analicemos una aplicación en tiempo real para obtener los últimos feeds de Twitter y sus hashtags. Anteriormente, hemos visto la integración de Storm y Spark con Kafka. En ambos escenarios, creamos un productor de Kafka (usando cli) para enviar un mensaje al ecosistema de Kafka. Luego, la integración de tormenta y chispa lee los mensajes utilizando el consumidor de Kafka y lo inyecta en el ecosistema de tormenta y chispa, respectivamente. Entonces, prácticamente necesitamos crear un productor de Kafka, que debería:

  • Lea los feeds de Twitter utilizando la "API de transmisión de Twitter",
  • Procesar los feeds,
  • Extraiga los HashTags y
  • Envíelo a Kafka.

Una vez que Kafka recibe los HashTags , la integración Storm / Spark recibe la información y la envía al ecosistema Storm / Spark.

API de transmisión de Twitter

Se puede acceder a la "API de transmisión de Twitter" en cualquier lenguaje de programación. El “twitter4j” es una biblioteca Java no oficial de código abierto, que proporciona un módulo basado en Java para acceder fácilmente a la “API de transmisión de Twitter”. El “twitter4j” proporciona un marco basado en oyentes para acceder a los tweets. Para acceder a la "API de transmisión de Twitter", debemos iniciar sesión en la cuenta de desarrollador de Twitter y obtener lo siguienteOAuth detalles de autenticación.

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

Una vez creada la cuenta de desarrollador, descargue los archivos jar “twitter4j” y colóquelos en la ruta de clases de Java.

La codificación completa del productor de Twitter Kafka (KafkaTwitterProducer.java) se enumera a continuación:

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
      
      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }
      
      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
      StatusListener listener = new StatusListener() {
        
         @Override
         public void onStatus(Status status) {      
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName() 
               + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreen-Name());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }
         
         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
            // System.out.println("Got a status deletion notice id:" 
               + statusDeletionNotice.getStatusId());
         }
         
         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" + 
               num-berOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId + 
            "upToStatusId:" + upToStatusId);
         }      
         
         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }
         
         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);
      
      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);
      
      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;
      
      while(i < 10) {
         Status ret = queue.poll();
         
         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  top-icName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

Compilacion

Compile la aplicación usando el siguiente comando:

javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java

Ejecución

Abra dos consolas. Ejecute la aplicación compilada anterior como se muestra a continuación en una consola.

java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

Ejecute cualquiera de las aplicaciones Spark / Storm explicadas en el capítulo anterior en otra ventana. El punto principal a tener en cuenta es que el tema utilizado debe ser el mismo en ambos casos. Aquí, hemos utilizado "mi-primer-tema" como nombre del tema.

Salida

El resultado de esta aplicación dependerá de las palabras clave y el feed actual de Twitter. A continuación se especifica un resultado de muestra (integración de tormenta).

. . .
food : 1
foodie : 2
burger : 1
. . .