java comet servlet-3.0

java - Servlet-3 Async Context, ¿cómo hacer escrituras asíncronas?



comet servlet-3.0 (6)

Descripción del problema

Servlet-3.0 API permite separar un contexto de solicitud / respuesta y responder a él más tarde.

Sin embargo, si intento escribir una gran cantidad de datos, algo como:

AsyncContext ac = getWaitingContext() ; ServletOutputStream out = ac.getResponse().getOutputStream(); out.print(some_big_data); out.flush()

En realidad puede bloquear, y se bloquea en casos de prueba triviales, tanto para Tomcat 7 como para Jetty 8. Los tutoriales recomiendan crear un grupo de subprocesos que manejaría dicha configuración, que generalmente es contrapropuesta a una arquitectura tradicional de 10K.

Sin embargo, si tengo 10,000 conexiones abiertas y un grupo de subprocesos de, digamos, 10 subprocesos, es suficiente incluso para el 1% de clientes que tienen conexiones de baja velocidad o simplemente bloquean la conexión para bloquear el grupo de subprocesos y bloquear completamente la respuesta del cometa o ralentizarla significativamente.

La práctica esperada es obtener una notificación de "escritura preparada" o una notificación de finalización de E / S y continuar presionando los datos.

¿Cómo se puede hacer esto usando Servlet-3.0 API, es decir, cómo puedo obtener:

  • Notificación de finalización asincrónica en la operación de E / S.
  • Obtenga E / S sin bloqueo con notificación de escritura preparada.

Si esto no es compatible con la API Servlet-3.0, ¿hay alguna API específica de servidor web (como Jetty Continuation o Tomcat CometEvent) que permita manejar dichos eventos de manera realmente asincrónica sin falsificar la E / S asincrónica utilizando el grupo de subprocesos.

¿Alguien sabe?

Y si esto no es posible, ¿puede confirmarlo con una referencia a la documentación?

Demostración de problemas en un código de muestra

He adjuntado el siguiente código que emula la secuencia de eventos.

Notas:

  • usa ServletOutputStream que arroja IOException para detectar clientes desconectados
  • envía mensajes de keep-alive para asegurarse de que los clientes estén todavía allí
  • Creé un grupo de subprocesos para "emular" las operaciones asincrónicas.

En ese ejemplo, definí explícitamente el grupo de subprocesos del tamaño 1 para mostrar el problema:

  • Comience una aplicación
  • Ejecutar desde dos terminales curl http://localhost:8080/path/to/app (dos veces)
  • Ahora envíe los datos con curd -dm=message http://localhost:8080/path/to/app
  • Ambos clientes recibieron los datos
  • Ahora suspenda uno de los clientes (Ctrl + Z) y envíe el mensaje una vez más curd -dm=message http://localhost:8080/path/to/app
  • Observe que otro cliente no suspendido no recibió nada o, una vez que se transfirió el mensaje, dejó de recibir solicitudes de mantener activo debido a que otro hilo está bloqueado.

Quiero resolver ese problema sin usar el grupo de subprocesos, porque con 1000-5000 conexiones abiertas puedo agotar el grupo de subprocesos muy rápido.

El código de muestra a continuación.

import java.io.IOException; import java.util.HashSet; import java.util.Iterator; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; import javax.servlet.AsyncContext; import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.ServletOutputStream; @WebServlet(urlPatterns = "", asyncSupported = true) public class HugeStreamWithThreads extends HttpServlet { private long id = 0; private String message = ""; private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); // it is explicitly small for demonstration purpose private final Thread timer = new Thread(new Runnable() { public void run() { try { while(true) { Thread.sleep(1000); sendKeepAlive(); } } catch(InterruptedException e) { // exit } } }); class RunJob implements Runnable { volatile long lastUpdate = System.nanoTime(); long id = 0; AsyncContext ac; RunJob(AsyncContext ac) { this.ac = ac; } public void keepAlive() { if(System.nanoTime() - lastUpdate > 1000000000L) pool.submit(this); } String formatMessage(String msg) { StringBuilder sb = new StringBuilder(); sb.append("id"); sb.append(id); for(int i=0;i<100000;i++) { sb.append("data:"); sb.append(msg); sb.append("/n"); } sb.append("/n"); return sb.toString(); } public void run() { String message = null; synchronized(HugeStreamWithThreads.this) { if(this.id != HugeStreamWithThreads.this.id) { this.id = HugeStreamWithThreads.this.id; message = HugeStreamWithThreads.this.message; } } if(message == null) message = ":keep-alive/n/n"; else message = formatMessage(message); if(!sendMessage(message)) return; boolean once_again = false; synchronized(HugeStreamWithThreads.this) { if(this.id != HugeStreamWithThreads.this.id) once_again = true; } if(once_again) pool.submit(this); } boolean sendMessage(String message) { try { ServletOutputStream out = ac.getResponse().getOutputStream(); out.print(message); out.flush(); lastUpdate = System.nanoTime(); return true; } catch(IOException e) { ac.complete(); removeContext(this); return false; } } }; private HashSet<RunJob> asyncContexts = new HashSet<RunJob>(); @Override public void init(ServletConfig config) throws ServletException { super.init(config); timer.start(); } @Override public void destroy() { for(;;){ try { timer.interrupt(); timer.join(); break; } catch(InterruptedException e) { continue; } } pool.shutdown(); super.destroy(); } protected synchronized void removeContext(RunJob ac) { asyncContexts.remove(ac); } // GET method is used to establish a stream connection @Override protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // Content-Type header response.setContentType("text/event-stream"); response.setCharacterEncoding("utf-8"); // Access-Control-Allow-Origin header response.setHeader("Access-Control-Allow-Origin", "*"); final AsyncContext ac = request.startAsync(); ac.setTimeout(0); RunJob job = new RunJob(ac); asyncContexts.add(job); if(id!=0) { pool.submit(job); } } private synchronized void sendKeepAlive() { for(RunJob job : asyncContexts) { job.keepAlive(); } } // POST method is used to communicate with the server @Override protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { request.setCharacterEncoding("utf-8"); id++; message = request.getParameter("m"); for(RunJob job : asyncContexts) { pool.submit(job); } } }

El ejemplo anterior usa subprocesos para evitar el bloqueo ... Sin embargo, si el número de clientes de bloqueo es mayor que el tamaño del grupo de subprocesos, se bloquearía.

¿Cómo podría implementarse sin bloquear?




Encontré que la Servlet 3.0 Asynchronous Servlet 3.0 difícil de implementar correctamente y la documentación útil es escasa. Después de muchos intentos de prueba y error y probando muchos enfoques diferentes, pude encontrar una solución robusta con la que estoy muy feliz. Cuando miro mi código y lo comparo con el suyo, noto una gran diferencia que puede ayudarlo con su problema en particular. Uso una ServletResponse para escribir los datos y no un ServletOutputStream .

Aquí mi clase Go-to asíncrona del servlet se adaptó levemente para su caso some_big_data :

import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.ServletResponse; import javax.servlet.annotation.WebInitParam; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; import org.apache.log4j.Logger; @javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") }) public class AsyncServlet extends HttpServlet { private static final Logger logger = Logger.getLogger(AsyncServlet.class); public static final int CALLBACK_TIMEOUT = 10000; // ms /** executor service */ private ExecutorService exec; @Override public void init(ServletConfig config) throws ServletException { super.init(config); int size = Integer.parseInt(getInitParameter("threadpoolsize")); exec = Executors.newFixedThreadPool(size); } @Override public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { final AsyncContext ctx = req.startAsync(); final HttpSession session = req.getSession(); // set the timeout ctx.setTimeout(CALLBACK_TIMEOUT); // attach listener to respond to lifecycle events of this AsyncContext ctx.addListener(new AsyncListener() { @Override public void onComplete(AsyncEvent event) throws IOException { logger.info("onComplete called"); } @Override public void onTimeout(AsyncEvent event) throws IOException { logger.info("onTimeout called"); } @Override public void onError(AsyncEvent event) throws IOException { logger.info("onError called: " + event.toString()); } @Override public void onStartAsync(AsyncEvent event) throws IOException { logger.info("onStartAsync called"); } }); enqueLongRunningTask(ctx, session); } /** * if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact) * <p/> * if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked). */ private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) { exec.execute(new Runnable() { @Override public void run() { String some_big_data = getSomeBigData(); try { ServletResponse response = ctx.getResponse(); if (response != null) { response.getWriter().write(some_big_data); ctx.complete(); } else { throw new IllegalStateException(); // this is caught below } } catch (IllegalStateException ex) { logger.error("Request object from context is null! (nothing to worry about.)"); // just means the context was already timeout, timeout listener already called. } catch (Exception e) { logger.error("ERROR IN AsyncServlet", e); } } }); } /** destroy the executor */ @Override public void destroy() { exec.shutdown(); } }


He echado un vistazo rápido a su lista, por lo que puede haber perdido algunos puntos. La ventaja de un hilo de grupo es compartir recursos de hilo entre varias tareas a lo largo del tiempo. Tal vez pueda resolver su problema espaciando las respuestas keepAlive de diferentes conexiones http, en lugar de agruparlas todas al mismo tiempo.


No podemos hacer que las escrituras sean asincrónicas. Realmente tenemos que vivir con la limitación de que cuando escribimos algo a un cliente, esperamos poder hacerlo rápidamente y podemos tratarlo como un error si no lo hacemos. Es decir, si nuestro objetivo es transmitir datos al cliente lo más rápido posible y usar el estado de bloqueo / no bloqueo del canal como forma de controlar el flujo, no tendremos suerte. Pero, si estamos enviando datos a un ritmo bajo que un cliente debería poder manejar, podemos, al menos, desconectar de inmediato los clientes que no leen con la suficiente rapidez.

Por ejemplo, en su aplicación, enviamos los keepalives a un ritmo lento (cada pocos segundos) y esperamos que los clientes puedan mantenerse al día con todos los eventos que se envían. Derrochamos los datos al cliente, y si no puede mantener el ritmo, podemos desconectarlo de manera rápida y limpia. Eso es un poco más limitado que la verdadera E / S asíncrona, pero debería satisfacer su necesidad (y, por cierto, la mía).

El truco es que todos los métodos para escribir resultados que solo arrojan IOExceptions realmente hacen un poco más que eso: en la implementación, todas las llamadas a cosas que pueden ser interrumpidas () ed se envolverán con algo como esto (tomado de Jetty 9):

catch (InterruptedException x) throw (IOException)new InterruptedIOException().initCause(x);

(También observo que esto no ocurre en Jetty 8, donde se registra una interruptedException y el ciclo de bloqueo se vuelve a intentar inmediatamente. Presumiblemente, debe asegurarse de que su contenedor de servlets se comporte correctamente para usar este truco).

Es decir, cuando un cliente lento hace que se bloquee un hilo de escritura, simplemente obligamos a que se genere la escritura como una IOException llamando a interrupt () en el hilo. Piénselo: el código sin bloqueo consumiría una unidad de tiempo en uno de nuestros subprocesos de procesamiento para ejecutarse de todos modos, por lo que el uso de escrituras de bloqueo que simplemente se cancelan (digamos un milisegundo) es realmente idéntico en principio. Todavía estamos masticando un corto período de tiempo en el hilo, solo un poco menos eficientemente.

Modifiqué su código para que el hilo del temporizador principal ejecute un trabajo para enlazar el tiempo en cada escritura justo antes de que comencemos la escritura, y el trabajo se cancela si la escritura se completa rápidamente, lo que debería hacer.

Una nota final: en un contenedor de servlets bien implementado, hacer que la E / S se descarte debería ser seguro. Sería bueno si pudiéramos atrapar la InterruptedIOException e intentar escribir de nuevo más tarde. Tal vez nos gustaría dar a los clientes lentos un subconjunto de los eventos si no pueden mantenerse al día con la secuencia completa. Por lo que puedo decir, en Jetty esto no es completamente seguro. Si se produce una escritura, el estado interno del objeto HttpResponse puede no ser lo suficientemente constante como para manejar el reingreso a la escritura de forma segura más adelante. Espero que no sea prudente tratar de empujar un contenedor de servlets de esta manera, a menos que haya documentos específicos que me he olvidado de ofrecer esta garantía. Creo que la idea es que una conexión esté diseñada para cerrarse si ocurre una IOException.

Aquí está el código, con una versión modificada de RunJob :: run () usando una simple ilustración grotesca (en realidad, nos gustaría usar el hilo del temporizador principal aquí en lugar de girar uno por escritura, que es tonto).

public void run() { String message = null; synchronized(HugeStreamWithThreads.this) { if(this.id != HugeStreamWithThreads.this.id) { this.id = HugeStreamWithThreads.this.id; message = HugeStreamWithThreads.this.message; } } if(message == null) message = ":keep-alive/n/n"; else message = formatMessage(message); final Thread curr = Thread.currentThread(); Thread canceller = new Thread(new Runnable() { public void run() { try { Thread.sleep(2000); curr.interrupt(); } catch(InterruptedException e) { // exit } } }); canceller.start(); try { if(!sendMessage(message)) return; } finally { canceller.interrupt(); while (true) { try { canceller.join(); break; } catch (InterruptedException e) { } } } boolean once_again = false; synchronized(HugeStreamWithThreads.this) { if(this.id != HugeStreamWithThreads.this.id) once_again = true; } if(once_again) pool.submit(this); }