tutorial - api de twitter en java
detener la transmisiĆ³n de Twitter y devolver la lista de estado con twitter4j (3)
Considere usar un BlockingQueue
como intermediario, usando el oyente para agregarle los objetos de Status
.
Una vez que la transmisión ha comenzado, puede comenzar a tomar Status
de la cola hasta que tenga los mil que necesita.
Como punto de partida, se vería algo como lo siguiente:
public class Stream {
private static final int TOTAL_TWEETS = 1000;
public List<Status> execute() throws TwitterException {
// skipped for brevity...
// TODO: You may have to tweak the capacity of the queue, depends on the filter query
final BlockingQueue<Status> statuses = new LinkedBlockingQueue<Status>(10000);
final StatusListener listener = new StatusListener() {
public void onStatus(Status status) {
statuses.offer(status); // Add received status to the queue
}
// etc...
};
final FilterQuery fq = new FilterQuery();
final String keywords[] = {"Keyword 1", "Keyword 2"};
fq.track(keywords);
twitterStream.addListener(listener);
twitterStream.filter(fq);
// Collect the 1000 statues
final List<Status> collected = new ArrayList<Status>(TOTAL_TWEETS);
while (collected.size() < TOTAL_TWEETS) {
// TODO: Handle InterruptedException
final Status status = statuses.poll(10, TimeUnit.SECONDS);
if (status == null) {
// TODO: Consider hitting this too often could indicate no further Tweets
continue;
}
collected.add(status);
}
twitterStream.shutdown();
return collected;
}
}
Utilizando el ejemplo de código provisto por Twitter4j, me gustaría detener la transmisión después de que se haya recopilado una lista de 1,000 estados y devolver esta lista. ¿Cómo puedo hacer eso?
public class Stream {
public List<Status> execute throws TwitterException {
List<Status> statuses = new ArrayList();
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
cb.setOAuthConsumerKey("bbb");
cb.setOAuthConsumerSecret("bbb");
cb.setOAuthAccessToken("bbb");
cb.setOAuthAccessTokenSecret("bbb");
TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
StatusListener listener = new StatusListener() {
public void onStatus(Status status) {
statuses.add(status);
if (statuses.size>1000){
//return statuses. Obviously that''s not the correct place for a return statement...
}
}
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
System.out.println("Got track limitation notice:" + numberOfLimitedStatuses);
}
public void onScrubGeo(long userId, long upToStatusId) {
System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId);
}
public void onException(Exception ex) {
ex.printStackTrace();
}
};
FilterQuery fq = new FilterQuery();
String keywords[] = {"Keyword 1", "Keyword 2"};
fq.track(keywords);
twitterStream.addListener(listener);
twitterStream.filter(fq);
}
Hay dos maneras:
Si solo necesita finalizar el hilo que consume la secuencia (este es el hilo que llama a su oyente) puede usar twitterStream.cleanUp();
. Esto cortará con gracia el hilo. Es posible que desee utilizar una variable boolean stopped
en su escucha de estado para ignorar cualquier llamada que reciba después de este evento.
También puede cerrar el hilo que consume la secuencia junto con su cadena de distribución (que es un hilo deamon) llamando a twitterStream.shutdown();
Sin embargo, este es un enfoque más brutal para terminar la comunicación con la API de Twitter. Aunque funciona si llamas esto desde dentro del oyente, preferiría el enfoque sugerido por @krishnakumarp
No es una buena idea forzar una pieza de código asincrónica en modo síncrono. Por favor, consulte https://.com/a/5934656/276263 Vea si puede volver a trabajar su lógica.
Sin embargo, el siguiente código funciona según su requisito.
public class Stream {
public static void main(String[] args) throws TwitterException {
Stream stream = new Stream();
stream.execute();
}
private final Object lock = new Object();
public List<Status> execute() throws TwitterException {
final List<Status> statuses = new ArrayList();
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
cb.setOAuthConsumerKey("bbb");
cb.setOAuthConsumerSecret("bbb");
cb.setOAuthAccessToken("bbb");
cb.setOAuthAccessTokenSecret("bbb");
TwitterStream twitterStream = new TwitterStreamFactory(cb.build())
.getInstance();
StatusListener listener = new StatusListener() {
public void onStatus(Status status) {
statuses.add(status);
System.out.println(statuses.size() + ":" + status.getText());
if (statuses.size() > 100) {
synchronized (lock) {
lock.notify();
}
System.out.println("unlocked");
}
}
public void onDeletionNotice(
StatusDeletionNotice statusDeletionNotice) {
System.out.println("Got a status deletion notice id:"
+ statusDeletionNotice.getStatusId());
}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
System.out.println("Got track limitation notice:"
+ numberOfLimitedStatuses);
}
public void onScrubGeo(long userId, long upToStatusId) {
System.out.println("Got scrub_geo event userId:" + userId
+ " upToStatusId:" + upToStatusId);
}
public void onException(Exception ex) {
ex.printStackTrace();
}
@Override
public void onStallWarning(StallWarning sw) {
System.out.println(sw.getMessage());
}
};
FilterQuery fq = new FilterQuery();
String keywords[] = { "federer", "nadal", "#Salute" };
fq.track(keywords);
twitterStream.addListener(listener);
twitterStream.filter(fq);
try {
synchronized (lock) {
lock.wait();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("returning statuses");
twitterStream.shutdown();
return statuses;
}
}