Java BlockingQueue latencia alta en Linux
multithreading latency (4)
Estoy usando BlockingQueue: s (probando ArrayBlockingQueue y LinkedBlockingQueue) para pasar objetos entre diferentes subprocesos en una aplicación en la que estoy trabajando actualmente. El rendimiento y la latencia son relativamente importantes en esta aplicación, por lo que tuve curiosidad por cuánto tiempo se tarda en pasar objetos entre dos subprocesos usando un BlockingQueue. Para medir esto, escribí un programa simple con dos subprocesos (un consumidor y un productor), donde permito al productor pasar una marca de tiempo (tomada con System.nanoTime ()) al consumidor, consulte el código a continuación.
Recuerdo haber leído en algún lugar de algún foro que tardó unos 10 microsegundos para alguien que probó esto (no sé en qué sistema operativo y hardware estaba encendido), así que no me sorprendí demasiado cuando me tomó unos 30 microsegundos en mi Windows 7 Box (Intel E7500 Core 2 Duo CPU, 2.93GHz), mientras ejecuta muchas otras aplicaciones en segundo plano. Sin embargo, me sorprendió bastante cuando hice la misma prueba en nuestro servidor Linux mucho más rápido (dos CPU Intel X5677 de 466 GHz de cuatro núcleos, ejecutando Debian 5 con el kernel 2.6.26-2-amd64). Esperaba que la latencia fuera más baja que en mi caja de ventanas, pero por el contrario, era mucho más alta: ¡~ 75 - 100 microsegundos! Ambas pruebas se realizaron con Sun''s Hotspot JVM versión 1.6.0-23.
¿Alguien más ha realizado pruebas similares con resultados similares en Linux? ¿O alguien sabe por qué es mucho más lento en Linux (con mejor hardware), podría ser que el cambio de hilo simplemente sea mucho más lento en Linux en comparación con Windows? Si ese es el caso, parece que Windows es en realidad mucho más adecuado para algún tipo de aplicaciones. Cualquier ayuda para ayudarme a entender las cifras relativamente altas es muy apreciada.
Editar:
Después de un comentario de DaveC, también hice una prueba en la que restringí la JVM (en la máquina Linux) a un solo núcleo (es decir, todos los subprocesos que se ejecutan en el mismo núcleo). Esto cambió los resultados dramáticamente: la latencia se redujo a menos de 20 microsegundos, es decir, mejor que los resultados en la máquina de Windows. También hice algunas pruebas en las que restringí el subproceso productor a un núcleo y el subproceso del consumidor a otro (intentando que ambos estuvieran en el mismo socket y en diferentes sockets), pero esto no parecía ayudar: la latencia seguía siendo de ~ 75 microsegundos Por cierto, esta aplicación de prueba es prácticamente todo lo que estoy ejecutando en la máquina mientras realizo la prueba.
¿Alguien sabe si estos resultados tienen sentido? ¿Realmente debería ser mucho más lento si el productor y el consumidor se ejecutan en diferentes núcleos? Cualquier entrada es realmente apreciada.
Editado de nuevo (6 de enero):
Experimenté con diferentes cambios en el código y el entorno de ejecución:
Actualicé el kernel de Linux a 2.6.36.2 (desde 2.6.26.2). Después de la actualización del kernel, el tiempo medido cambió a 60 microsegundos con variaciones muy pequeñas, de 75-100 antes de la actualización. La configuración de la afinidad de la CPU para los subprocesos del productor y del consumidor no tuvo ningún efecto, excepto al restringirlos al mismo núcleo. Cuando se ejecuta en el mismo núcleo, la latencia medida fue de 13 microsegundos.
En el código original, hice que el productor se fuera a dormir durante 1 segundo entre cada iteración, para dar al consumidor el tiempo suficiente para calcular el tiempo transcurrido e imprimirlo en la consola. Si elimino la llamada a Thread.sleep () y en lugar de eso, el productor y el consumidor llaman a barrier.await () en cada iteración (el consumidor la llama después de haber impreso el tiempo transcurrido en la consola), la latencia medida se reduce desde 60 microsegundos a menos de 10 microsegundos. Si ejecuta los subprocesos en el mismo núcleo, la latencia es inferior a 1 microsegundo. ¿Alguien puede explicar por qué esto redujo la latencia tan significativamente? Mi primera suposición fue que el cambio tuvo el efecto que el productor llamó queue.put () antes que el consumidor llamado queue.take (), por lo que el consumidor nunca tuvo que bloquear, pero después de jugar con una versión modificada de ArrayBlockingQueue, encontré esta suposición es falsa: el consumidor, de hecho, bloqueó. Si tiene alguna otra conjetura, por favor hágamelo saber. (Por cierto, si dejo que el productor llame a Thread.sleep () y barrier.await (), la latencia permanece en 60 microsegundos).
También probé otro enfoque: en lugar de llamar a queue.take (), llamé a queue.poll () con un tiempo de espera de 100 micros. Esto redujo la latencia promedio a menos de 10 microsegundos, pero, por supuesto, es mucho más intensivo en la CPU (pero, probablemente, menos intensivo en la CPU que ocupado).
Editado de nuevo (10 de enero) - Problema resuelto:
ninjalj sugirió que la latencia de ~ 60 microsegundos se debió a que la CPU tuvo que despertarse de estados de sueño más profundos, ¡y tenía toda la razón! Después de deshabilitar los estados C en BIOS, la latencia se redujo a <10 microsegundos. Esto explica por qué obtuve una latencia mucho mejor en el punto 2 anterior: cuando enviaba objetos con más frecuencia, la CPU se mantenía lo suficientemente ocupada como para no pasar a los estados de sueño más profundo. ¡Muchas gracias a todos los que se han tomado el tiempo de leer mi pregunta y compartir sus pensamientos aquí!
...
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;
public class QueueTest {
ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10);
Thread consumerThread;
CyclicBarrier barrier = new CyclicBarrier(2);
static final int RUNS = 500000;
volatile int sleep = 1000;
public void start() {
consumerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
barrier.await();
for(int i = 0; i < RUNS; i++) {
consume();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
consumerThread.start();
try {
barrier.await();
} catch (Exception e) { e.printStackTrace(); }
for(int i = 0; i < RUNS; i++) {
try {
if(sleep > 0)
Thread.sleep(sleep);
produce();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void produce() {
try {
queue.put(System.nanoTime());
} catch (InterruptedException e) {
}
}
public void consume() {
try {
long t = queue.take();
long now = System.nanoTime();
long time = (now - t) / 1000; // Divide by 1000 to get result in microseconds
if(sleep > 0) {
System.out.println("Time: " + time);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
QueueTest test = new QueueTest();
System.out.println("Starting...");
// Run first once, ignoring results
test.sleep = 0;
test.start();
// Run again, printing the results
System.out.println("Starting again...");
test.sleep = 1000;
test.start();
}
}
@Peter Lawrey
Ciertas operaciones utilizan llamadas del sistema operativo (como bloqueo / barreras cíclicas)
Esas no son llamadas del sistema operativo (kernel). Implementado a través de un CAS simple (que en x86 también viene con una valla de memoria libre)
Una más: no use ArrayBlockingQueue a menos que sepa por qué (lo usa).
@OP: Mire ThreadPoolExecutor, que ofrece un excelente marco de productor / consumidor.
Editar abajo :
para reducir la latencia (dejando la espera ocupada), cambie la cola a SynchronousQueue agregue lo siguiente como antes de iniciar el consumidor
...
consumerThread.setPriority(Thread.MAX_PRIORITY);
consumerThread.start();
Esto es lo mejor que puedes conseguir.
Edit2: Aquí w / sync. cola. Y no imprimiendo los resultados.
package t1;
import java.math.BigDecimal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
public class QueueTest {
static final int RUNS = 250000;
final SynchronousQueue<Long> queue = new SynchronousQueue<Long>();
int sleep = 1000;
long[] results = new long[0];
public void start(final int runs) throws Exception {
results = new long[runs];
final CountDownLatch barrier = new CountDownLatch(1);
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
barrier.countDown();
try {
for(int i = 0; i < runs; i++) {
results[i] = consume();
}
} catch (Exception e) {
return;
}
}
});
consumerThread.setPriority(Thread.MAX_PRIORITY);
consumerThread.start();
barrier.await();
final long sleep = this.sleep;
for(int i = 0; i < runs; i++) {
try {
doProduce(sleep);
} catch (Exception e) {
return;
}
}
}
private void doProduce(final long sleep) throws InterruptedException {
produce();
}
public void produce() throws InterruptedException {
queue.put(new Long(System.nanoTime()));//new Long() is faster than value of
}
public long consume() throws InterruptedException {
long t = queue.take();
long now = System.nanoTime();
return now-t;
}
public static void main(String[] args) throws Throwable {
QueueTest test = new QueueTest();
System.out.println("Starting + warming up...");
// Run first once, ignoring results
test.sleep = 0;
test.start(15000);//10k is the normal warm-up for -server hotspot
// Run again, printing the results
System.gc();
System.out.println("Starting again...");
test.sleep = 1000;//ignored now
Thread.yield();
test.start(RUNS);
long sum = 0;
for (long elapsed: test.results){
sum+=elapsed;
}
BigDecimal elapsed = BigDecimal.valueOf(sum, 3).divide(BigDecimal.valueOf(test.results.length), BigDecimal.ROUND_HALF_UP);
System.out.printf("Avg: %1.3f micros%n", elapsed);
}
}
Si la latencia es crítica y no requiere una semántica FIFO estricta, es posible que desee considerar LinkedTransferQueue de JSR-166. Permite la eliminación para que las operaciones opuestas puedan intercambiar valores en lugar de sincronizarse en la estructura de datos de la cola. Este enfoque ayuda a reducir la contención, permite intercambios paralelos y evita las penalizaciones de activación / desactivación de subprocesos.
Su prueba no es una buena medida de la latencia de transferencia de cola porque tiene un solo hilo leyendo de la cola que se escribe de forma síncrona en System.out
(haciendo una Cadena y una concatenación larga mientras está en eso) antes de que vuelva a comenzar. Para medir esto correctamente, necesita mover esta actividad fuera de este hilo y hacer el menor trabajo posible en el hilo de toma.
Estaría mejor haciendo el cálculo (entonces-ahora) en el tomador y agregando el resultado a alguna otra colección que es drenada periódicamente por otro hilo que genera los resultados. Tiendo a hacer esto agregando a una estructura respaldada por un arreglo apropiadamente preseado a la que se accede mediante una referencia atómica (por lo tanto, el subproceso de informes solo tiene que obtener y establecer esa referencia con otra instancia de esa estructura de almacenamiento para obtener el último lote de resultados; por ejemplo, make 2 listas, establezca una como activa, cada subproceso xsa se activa y cambia las activas y las pasivas). Luego puede informar sobre alguna distribución en lugar de cada resultado individual (por ejemplo, un rango de deciles), lo que significa que no genera vastos archivos de registro con cada ejecución y obtiene información útil impresa para usted.
FWIW Estoy de acuerdo con las veces que Peter Lawrey declaró y si la latencia es realmente crítica, entonces debe pensar en esperar ocupado con la afinidad de CPU adecuada (es decir, dedicar un núcleo a ese hilo)
EDITAR después del 6 de enero
Si elimino la llamada a Thread.sleep () y en lugar de eso, el productor y el consumidor llaman a barrier.await () en cada iteración (el consumidor la llama después de haber impreso el tiempo transcurrido en la consola), la latencia medida se reduce desde 60 microsegundos a menos de 10 microsegundos. Si ejecuta los subprocesos en el mismo núcleo, la latencia es inferior a 1 microsegundo. ¿Alguien puede explicar por qué esto redujo la latencia tan significativamente?
Estás viendo la diferencia entre java.util.concurrent.locks.LockSupport#park
(y correspondiente correspondiente) y Thread#sleep
. La mayoría de las cosas de juc se basan en LockSupport
(a menudo a través de un AbstractQueuedSynchronizer
que ReentrantLock
proporciona o directamente) y esto (en Hotspot) se resuelve en sun.misc.Unsafe#park
(y unpark
) y esto tiende a terminar en las manos del pthread (Posix hilos) lib. Por lo general, pthread_cond_broadcast
para despertarse y pthread_cond_wait
o pthread_cond_timedwait
para cosas como BlockingQueue#take
.
No puedo decir que haya visto cómo se implementa realmente Thread#sleep
(porque nunca he encontrado algo de baja latencia que no sea una espera basada en la condición) pero me imagino que hace que sea degradado por el horario de una manera más agresiva que el mecanismo de señalización pthread y eso es lo que explica la diferencia de latencia.
Yo usaría solo un ArrayBlockingQueue si puedes. Cuando lo he usado, la latencia fue de 8-18 microsegundos en Linux. Algún punto de nota.
- El costo es en gran parte el tiempo que toma despertar el hilo. Cuando despierte un hilo, sus datos / código no estarán en la memoria caché, por lo que encontrará que si pasa lo que pasa después de que un hilo se haya despertado, puede demorar de 2 a 5 veces más que si tuviera que ejecutar el mismo programa repetidamente.
- Ciertas operaciones utilizan llamadas del sistema operativo (como bloqueo / barreras cíclicas) que a menudo son más costosas en un escenario de baja latencia que en espera activa. Le sugiero que intente ocuparse de esperar a su productor en lugar de usar un CyclicBarrier. También podría ocuparse de esperar a su consumidor, pero esto podría ser excesivamente caro en un sistema real.