java synchronization thread-safety concurrenthashmap

java - ConcurrentHashMap: evite la creación de objetos adicionales con "putIfAbsent"?



synchronization thread-safety (7)

Estoy agregando valores múltiples para claves en un entorno de subprocesos múltiples. Las llaves no se conocen de antemano. Pensé que haría algo como esto:

class Aggregator { protected ConcurrentHashMap<String, List<String>> entries = new ConcurrentHashMap<String, List<String>>(); public Aggregator() {} public void record(String key, String value) { List<String> newList = Collections.synchronizedList(new ArrayList<String>()); List<String> existingList = entries.putIfAbsent(key, newList); List<String> values = existingList == null ? newList : existingList; values.add(value); } }

El problema que veo es que cada vez que se ejecuta este método, necesito crear una nueva instancia de ArrayList , que luego elimino (en la mayoría de los casos). Esto parece un abuso injustificado del recolector de basura. ¿Hay alguna forma mejor y más segura de inicializar este tipo de estructura sin tener que synchronize el método de record ? Estoy algo sorprendido por la decisión de que el método putIfAbsent no devuelva el elemento recién creado, y por la falta de una forma de aplazar la instanciación a menos que se putIfAbsent (por así decirlo).


A partir de Java-8 puede crear Multi Mapas usando el siguiente patrón:

public void record(String key, String value) { entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<String>())) .add(value); }

La documentación de ConcurrentHashMap (no el contrato general) especifica que ArrayList solo se creará una vez para cada clave, por el pequeño costo inicial de demorar las actualizaciones mientras ArrayList se está creando para una nueva clave:

http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#computeIfAbsent-K-java.util.function.Function-


Al final, implementé una ligera modificación de la respuesta de @ Bohemian. Su solución propuesta sobrescribe la variable de values con la llamada putIfAbsent , que crea el mismo problema que tuve antes. El código que parece funcionar tiene el siguiente aspecto:

public void record(String key, String value) { List<String> values = entries.get(key); if (values == null) { values = Collections.synchronizedList(new ArrayList<String>()); List<String> values2 = entries.putIfAbsent(key, values); if (values2 != null) values = values2; } values.add(value); }

No es tan elegante como me gustaría, pero es mejor que el original que crea una nueva instancia de ArrayList en cada llamada.


Creó dos versiones basadas en la respuesta de Gene

public static <K,V> void putIfAbsetMultiValue(ConcurrentHashMap<K,List<V>> entries, K key, V value) { List<V> values = entries.get(key); if (values == null) { values = Collections.synchronizedList(new ArrayList<V>()); List<V> values2 = entries.putIfAbsent(key, values); if (values2 != null) values = values2; } values.add(value); } public static <K,V> void putIfAbsetMultiValueSet(ConcurrentMap<K,Set<V>> entries, K key, V value) { Set<V> values = entries.get(key); if (values == null) { values = Collections.synchronizedSet(new HashSet<V>()); Set<V> values2 = entries.putIfAbsent(key, values); if (values2 != null) values = values2; } values.add(value); }

Funciona bien


El enfoque con putIfAbsent tiene el tiempo de ejecución más rápido, es de 2 a 50 veces más rápido que el enfoque "lambda" en entornos con alta contención. El Lambda no es la razón detrás de este "powerloss", el problema es la sincronización obligatoria dentro del cómputo si está presente antes de las optimizaciones de Java-9.

el punto de referencia:

import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class ConcurrentHashMapTest { private final static int numberOfRuns = 1000000; private final static int numberOfThreads = Runtime.getRuntime().availableProcessors(); private final static int keysSize = 10; private final static String[] strings = new String[keysSize]; static { for (int n = 0; n < keysSize; n++) { strings[n] = "" + (char) (''A'' + n); } } public static void main(String[] args) throws InterruptedException { for (int n = 0; n < 20; n++) { testPutIfAbsent(); testComputeIfAbsentLamda(); } } private static void testPutIfAbsent() throws InterruptedException { final AtomicLong totalTime = new AtomicLong(); final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>(); final Random random = new Random(); ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executorService.execute(new Runnable() { @Override public void run() { long start, end; for (int n = 0; n < numberOfRuns; n++) { String s = strings[random.nextInt(strings.length)]; start = System.nanoTime(); AtomicInteger count = map.get(s); if (count == null) { count = new AtomicInteger(0); AtomicInteger prevCount = map.putIfAbsent(s, count); if (prevCount != null) { count = prevCount; } } count.incrementAndGet(); end = System.nanoTime(); totalTime.addAndGet(end - start); } } }); } executorService.shutdown(); executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName() + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns"); } private static void testComputeIfAbsentLamda() throws InterruptedException { final AtomicLong totalTime = new AtomicLong(); final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>(); final Random random = new Random(); ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { executorService.execute(new Runnable() { @Override public void run() { long start, end; for (int n = 0; n < numberOfRuns; n++) { String s = strings[random.nextInt(strings.length)]; start = System.nanoTime(); AtomicInteger count = map.computeIfAbsent(s, (k) -> new AtomicInteger(0)); count.incrementAndGet(); end = System.nanoTime(); totalTime.addAndGet(end - start); } } }); } executorService.shutdown(); executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName() + " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns"); } }

Los resultados:

Test testPutIfAbsent average time per run: 115.756501 ns Test testComputeIfAbsentLamda average time per run: 276.9667055 ns Test testPutIfAbsent average time per run: 134.2332435 ns Test testComputeIfAbsentLamda average time per run: 223.222063625 ns Test testPutIfAbsent average time per run: 119.968893625 ns Test testComputeIfAbsentLamda average time per run: 216.707419875 ns Test testPutIfAbsent average time per run: 116.173902375 ns Test testComputeIfAbsentLamda average time per run: 215.632467375 ns Test testPutIfAbsent average time per run: 112.21422775 ns Test testComputeIfAbsentLamda average time per run: 210.29563725 ns Test testPutIfAbsent average time per run: 120.50643475 ns Test testComputeIfAbsentLamda average time per run: 200.79536475 ns


Este es un problema. También busqué una respuesta. El método putIfAbsent no resuelve realmente el problema de creación de objetos adicionales, solo se asegura de que uno de esos objetos no reemplace a otro. Pero las condiciones de carrera entre subprocesos pueden provocar la creación de instancias de objetos múltiples. Pude encontrar 3 soluciones para este problema (y seguiría este orden de preferencia):

1- Si está en Java 8, la mejor manera de lograr esto es probablemente el nuevo método computeIfAbsent de ConcurrentMap . Solo necesita darle una función de cálculo que se ejecutará de forma síncrona (al menos para la implementación de ConcurrentHashMap ). Ejemplo:

private final ConcurrentMap<String, List<String>> entries = new ConcurrentHashMap<String, List<String>>(); public void method1(String key, String value) { entries.computeIfAbsent(key, s -> new ArrayList<String>()) .add(value); }

Esto es del javadoc de ConcurrentHashMap.computeIfAbsent :

Si la clave especificada ya no está asociada con un valor, intenta calcular su valor utilizando la función de mapeo dada y lo ingresa en este mapa a menos que sea nulo. La invocación completa del método se realiza atómicamente, por lo que la función se aplica como máximo una vez por clave. Algunas operaciones de actualización intentadas en este mapa por otros hilos pueden ser bloqueadas mientras el cálculo está en progreso, por lo que el cálculo debe ser breve y simple, y no debe intentar actualizar ninguna otra asignación de este mapa.

2- Si no puede usar Java 8, puede usar LoadingCache Guava , que es seguro para subprocesos. Usted define una función de carga (al igual que la función de compute anterior), y puede estar seguro de que se llamará sincrónicamente. Ejemplo:

private final LoadingCache<String, List<String>> entries = CacheBuilder.newBuilder() .build(new CacheLoader<String, List<String>>() { @Override public List<String> load(String s) throws Exception { return new ArrayList<String>(); } }); public void method2(String key, String value) { entries.getUnchecked(key).add(value); }

3- Si tampoco puede usar Guava, siempre puede sincronizar manualmente y hacer un doble bloqueo comprobado. Ejemplo:

private final ConcurrentMap<String, List<String>> entries = new ConcurrentHashMap<String, List<String>>(); public void method3(String key, String value) { List<String> existing = entries.get(key); if (existing != null) { existing.add(value); } else { synchronized (entries) { List<String> existingSynchronized = entries.get(key); if (existingSynchronized != null) { existingSynchronized.add(value); } else { List<String> newList = new ArrayList<>(); newList.add(value); entries.put(key, newList); } } } }

Hice una implementación de ejemplo de todos los 3 métodos y, además, el método no sincronizado, que causa la creación de objetos adicionales: http://pastebin.com/qZ4DUjTr


Java 8 introdujo una API para atender este problema exacto, haciendo una solución de 1 línea:

public void record(String key, String value) { entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<String>())).add(value); }

Para Java 7:

public void record(String key, String value) { List<String> values = entries.get(key); if (values == null) { entries.putIfAbsent(key, Collections.synchronizedList(new ArrayList<String>())); // At this point, there will definitely be a list for the key. // We don''t know or care which thread''s new object is in there, so: values = entries.get(key); } values.add(value); }

Este es el patrón de código estándar cuando se completa un ConcurrentHashMap .

El método especial putIfAbsent(K, V)) colocará su objeto de valor, o si otro hilo tiene delante de usted, entonces ignorará su objeto de valor. De cualquier manera, después de la llamada a putIfAbsent(K, V)) , se garantiza que get(key) es coherente entre hilos y, por lo tanto, el código anterior es threadsafe.

La única sobrecarga desperdiciada es si algún otro subproceso agrega una nueva entrada al mismo tiempo para la misma clave: puede terminar tirando el valor recién creado, pero eso solo ocurre si no hay una entrada y hay una carrera que su el hilo pierde, lo que normalmente sería raro.