sobre semaforos programacion productor problema monitores lectores hilos escritores consumidor concurrente java concurrency

productor - semaforos programacion concurrente java



concurrencia java: muchos escritores, un lector (9)

¿Has mirado en ScheduledThreadPoolExecutor ? Podría usar eso para programar a sus escritores, que podrían escribir en una colección concurrente, como el ConcurrentLinkedQueue mencionado por @Chris Dail. Puede tener un trabajo programado por separado para leer desde la cola según sea necesario, y el SDK de Java debe manejar casi todas sus inquietudes de concurrencia, sin necesidad de bloqueo manual.

Necesito reunir algunas estadísticas en mi software y estoy intentando que sea rápido y correcto, lo cual no es fácil (¡para mí!)

Primero mi código hasta ahora con dos clases, un StatsService y un StatsHarvester

public class StatsService { private Map<String, Long> stats = new HashMap<String, Long>(1000); public void notify ( String key ) { Long value = 1l; synchronized (stats) { if (stats.containsKey(key)) { value = stats.get(key) + 1; } stats.put(key, value); } } public Map<String, Long> getStats ( ) { Map<String, Long> copy; synchronized (stats) { copy = new HashMap<String, Long>(stats); stats.clear(); } return copy; } }

esta es mi segunda clase, un recolector que recopila las estadísticas de vez en cuando y las escribe en una base de datos.

public class StatsHarvester implements Runnable { private StatsService statsService; private Thread t; public void init ( ) { t = new Thread(this); t.start(); } public synchronized void run ( ) { while (true) { try { wait(5 * 60 * 1000); // 5 minutes collectAndSave(); } catch (InterruptedException e) { e.printStackTrace(); } } } private void collectAndSave ( ) { Map<String, Long> stats = statsService.getStats(); // do something like: // saveRecords(stats); } }

En el tiempo de ejecución, tendrá cerca de 30 subprocesos en ejecución simultáneos, cada notify(key) llamada notify(key) unas 100 veces. Sólo un StatsHarvester está llamando statsService.getStats()

Así que tengo muchos escritores y un solo lector. Sería bueno tener estadísticas precisas, pero no me importa si algunos registros se pierden en alta concurrencia.

El lector debe ejecutar cada 5 minutos o lo que sea razonable.

La escritura debe ser lo más rápida posible. La lectura debe ser rápida, pero si se bloquea durante unos 300 ms cada 5 minutos, está bien.

He leído muchos documentos (Java concurrencia en la práctica, java efectivo, etc.), pero tengo la fuerte sensación de que necesito tu consejo para hacerlo bien.

Espero haber expresado mi problema de manera clara y lo suficientemente breve como para obtener una ayuda valiosa.

EDITAR

Gracias a todos por sus respuestas detalladas y útiles. Como esperaba, hay más de una forma de hacerlo.

Probé la mayoría de sus propuestas (aquellas que entendí) y cargué un proyecto de prueba en Google Code para futuras referencias (proyecto de Maven)

http://code.google.com/p/javastats/

He probado diferentes implementaciones de mi StatsService

  • HashMapStatsService (HMSS)
  • Servicio de mapa de hash concurrente (CHMSS)
  • LinkedQueueStatsService (LQSS)
  • GoogleStatsService (GSS)
  • ExecutorConcurrentHashMapStatsService (ECHMSS)
  • ExecutorHashMapStatsService (EHMSS)

y los probé con x número de subprocesos en cada llamada, notificando y veces, los resultados están en ms

10,100 10,1000 10,5000 50,100 50,1000 50,5000 100,100 100,1000 100,5000 GSS 1 5 17 7 21 117 7 37 254 Summe: 466 ECHMSS 1 6 21 5 32 132 8 54 249 Summe: 508 HMSS 1 8 45 8 52 233 11 103 449 Summe: 910 EHMSS 1 5 24 7 31 113 8 67 235 Summe: 491 CHMSS 1 2 9 3 11 40 7 26 72 Summe: 171 LQSS 0 3 11 3 16 56 6 27 144 Summe: 266

En este momento, creo que usaré ConcurrentHashMap, ya que ofrece un buen rendimiento y es bastante fácil de entender.

¡Gracias por todas sus aportaciones! Janning


¿Por qué no usas java.util.concurrent.ConcurrentHashMap<K, V> ? Maneja todo internamente, evitando bloqueos inútiles en el mapa y ahorrándote mucho trabajo: no tendrás que preocuparte por las sincronizaciones al obtener y poner.

De la documentación:

Una tabla hash que admite la concurrencia total de las recuperaciones y la concurrencia esperada ajustable para las actualizaciones. Esta clase obedece a la misma especificación funcional que Hashtable e incluye versiones de los métodos correspondientes a cada método de Hashtable. Sin embargo, a pesar de que todas las operaciones son seguras para subprocesos, las operaciones de recuperación no implican el bloqueo , y no hay soporte para bloquear toda la tabla de una manera que impida todo acceso.

Puede especificar su nivel de concurrencia :

La concurrencia permitida entre las operaciones de actualización está guiada por el argumento opcional de constructor concurrencyLevel (valor predeterminado 16), que se usa como una sugerencia para el tamaño interno . La tabla está particionada internamente para intentar permitir el número indicado de actualizaciones simultáneas sin disputa. Debido a que la ubicación en las tablas hash es esencialmente aleatoria, la concurrencia real variará. Idealmente, debería elegir un valor para acomodar tantos subprocesos como siempre modificará la tabla al mismo tiempo . El uso de un valor significativamente más alto del que necesita puede desperdiciar espacio y tiempo, y un valor significativamente menor puede llevar a la contención de hilos. Pero las sobreestimaciones y subestimaciones en un orden de magnitud no suelen tener un impacto notable. Un valor de uno es apropiado cuando se sabe que solo un hilo se modificará y todos los demás solo leerán. Además, cambiar el tamaño de este o cualquier otro tipo de tabla hash es una operación relativamente lenta, por lo que, cuando sea posible, es una buena idea proporcionar estimaciones de los tamaños de tabla esperados en los constructores.

Como se sugiere en los comentarios, lea atentamente la documentación de ConcurrentHashMap , especialmente cuando se trata de operaciones atómicas o no atómicas.

Para tener la garantía de atomicidad, debe considerar qué operaciones son atómicas, desde ConcurrentMap interfaz ConcurrentMap sabrá que:

V putIfAbsent(K key, V value) V replace(K key, V value) boolean replace(K key,V oldValue, V newValue) boolean remove(Object key, Object value)

Se puede utilizar de forma segura.


Aquí se explica cómo hacerlo con un impacto mínimo en el rendimiento de los hilos que se miden. Esta es la solución más rápida posible en Java, sin tener que recurrir a registros de hardware especiales para el conteo de rendimiento.

Haga que cada hilo emita sus estadísticas independientemente de las otras, es decir, sin sincronización, a algún objeto de estadísticas. Haga que el campo que contiene el recuento sea volátil, de modo que esté vallado en memoria:

class Stats { public volatile long count; } class SomeRunnable implements Runnable { public void run() { doStuff(); stats.count++; } }

Haga que otro hilo, que contiene una referencia a todos los objetos Stats, vaya alrededor de todos ellos y agregue los conteos en todos los hilos:

public long accumulateStats() { long count = previousCount; for (Stats stat : allStats) { count += stat.count; } long resultDelta = count - previousCount; previousCount = count; return resultDelta; }

Este hilo del recolector también necesita que se le agregue un sleep () (o algún otro acelerador). Por ejemplo, puede generar periódicamente recuentos por segundo en la consola, para brindarle una vista "en vivo" de cómo se está ejecutando su aplicación.

Esto evita la sobrecarga de sincronización tanto como sea posible.

El otro truco a considerar es rellenar los objetos de Estadísticas a 128 (o 256 bytes en SandyBridge o posterior), para mantener los diferentes conteos de hilos en diferentes líneas de caché, o habrá una disputa de almacenamiento en caché en la CPU.

Cuando solo se lee un hilo y se escribe uno, no se necesitan bloqueos ni atómicos, basta con una volatilidad. Todavía habrá cierta contención de subprocesos, cuando el subproceso del lector de estadísticas interactúe con la línea de caché de la CPU del subproceso que se está midiendo. Esto no se puede evitar, pero es la forma de hacerlo con un impacto mínimo en el hilo en ejecución; lea las estadísticas tal vez una vez por segundo o menos.


Como Jack se estaba escapando a usted, puede usar la biblioteca java.util.concurrent que incluye un ConcurrentHashMap y AtomicLong. Puede poner AtomicLong en caso de que no esté presente, puede incrementar el valor. Dado que AtomicLong es seguro para subprocesos, podrá incrementar la variable sin tener que preocuparse por un problema de concurrencia.

public void notify(String key) { AtomicLong value = stats.get(key); if (value == null) { value = stats.putIfAbsent(key, new AtomicLong(1)); } if (value != null) { value.incrementAndGet(); } }

Esto debería ser rápido y seguro para la rosca

Edición: Refactored sligthly por lo que solo hay como máximo dos búsquedas.


La respuesta de Chris Dail parece ser un buen enfoque.

Otra alternativa sería usar un Multiset concurrente. Hay uno en la biblioteca de colecciones de Google . Puedes usar esto de la siguiente manera:

private Multiset<String> stats = ConcurrentHashMultiset.create(); public void notify ( String key ) { stats.add(key, 1); }

Mirando la source , esto se implementa usando un ConcurrentHashMap y usando putIfAbsent y la versión de replace de tres argumentos para detectar modificaciones y reintentos concurrentes.


Otra alternativa para implementar ambos métodos utilizando ReentranReadWriteLock . Esta implementación protege contra las condiciones de carrera en el método getStats, si necesita borrar los contadores. También elimina el AtomicLong mutable de los getStats y utiliza un Long inmutable.

public class StatsService { private final Map<String, AtomicLong> stats = new HashMap<String, AtomicLong>(1000); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public void notify(final String key) { r.lock(); AtomicLong count = stats.get(key); if (count == null) { r.unlock(); w.lock(); count = stats.get(key); if(count == null) { count = new AtomicLong(); stats.put(key, count); } r.lock(); w.unlock(); } count.incrementAndGet(); r.unlock(); } public Map<String, Long> getStats() { w.lock(); Map<String, Long> copy = new HashMap<String, Long>(); for(Entry<String,AtomicLong> entry : stats.entrySet() ){ copy.put(entry.getKey(), entry.getValue().longValue()); } stats.clear(); w.unlock(); return copy; } }

Espero que esto ayude, cualquier comentario es bienvenido!


Si ignoramos la parte de recolección y nos enfocamos en la escritura, el principal cuello de botella del programa es que las estadísticas están bloqueadas a un nivel de granularidad muy aproximado. Si dos hilos quieren actualizar claves diferentes, deben esperar.

Si conoce el conjunto de claves de antemano y puede preinicializar el mapa de modo que para cuando llegue un subproceso de actualización se garantice la existencia de la clave, podrá realizar el bloqueo en la variable del acumulador en lugar de todo el mapa, o usar un objeto acumulador de hilo seguro.

En lugar de implementar esto usted mismo, hay implementaciones de mapas que están diseñadas específicamente para la concurrencia y hacen este bloqueo más preciso para usted.

Una advertencia, sin embargo, son las estadísticas, ya que necesitaría obtener bloqueos en todos los acumuladores aproximadamente al mismo tiempo. Si usa un mapa compatible con la concurrencia existente, podría haber una construcción para obtener una instantánea.


Sugeriría echar un vistazo a la biblioteca util.concurrent de Java. Creo que puedes implementar esta solución mucho más limpia. No creo que necesites un mapa aquí. Recomendaría implementar esto usando el ConcurrentLinkedQueue . Cada ''productor'' puede escribir libremente en esta cola sin preocuparse por los demás. Puede poner un objeto en la cola con los datos de sus estadísticas.

El recolector puede consumir la cola continuamente extrayendo los datos y procesándolos. Entonces puede almacenarlo como necesite.


Un enfoque diferente del problema es explotar la seguridad (trivial) de los hilos mediante el confinamiento de hilos. Básicamente, cree un único hilo de fondo que se ocupe de la lectura y la escritura. Tiene unas características bastante buenas en términos de escalabilidad y simplicidad.

La idea es que en lugar de todos los subprocesos que intentan actualizar los datos directamente, producen una tarea de "actualización" para que el subproceso en segundo plano se procese. El mismo hilo también puede hacer la tarea de lectura, asumiendo que algunos retrasos en el procesamiento de actualizaciones son tolerables.

Este diseño es bastante bueno porque los subprocesos ya no tendrán que competir por un bloqueo para actualizar los datos, y como el mapa se limita a un solo subproceso, simplemente puede usar un HashMap simple para obtener / poner, etc. En términos de implementación , significaría crear un ejecutor de un solo hilo y enviar tareas de escritura que también puedan realizar la operación opcional "collectAndSave".

Un bosquejo de código puede parecerse a lo siguiente:

public class StatsService { private ExecutorService executor = Executors.newSingleThreadExecutor(); private final Map<String,Long> stats = new HashMap<String,Long>(); public void notify(final String key) { Runnable r = new Runnable() { public void run() { Long value = stats.get(key); if (value == null) { value = 1L; } else { value++; } stats.put(key, value); // do the optional collectAndSave periodically if (timeToDoCollectAndSave()) { collectAndSave(); } } }; executor.execute(r); } }

Existe un BlockingQueue asociado con un ejecutor, y cada hilo que produce una tarea para el StatsService usa el BlockingQueue. El punto clave es este: la duración del bloqueo para esta operación debe ser mucho más corta que la duración del bloqueo en el código original, por lo que la contención debe ser mucho menor. En general, debería dar como resultado un rendimiento y una latencia mucho mejores.

Otro beneficio es que dado que solo un hilo lee y escribe en el mapa, se puede usar HashMap simple y tipo primitivo largo (no se trata de HashMap Concurrente ni de tipos atómicos). Esto también simplifica el código que en realidad lo procesa mucho.

Espero eso ayude.