java - ConcurrentHashMap: evite la creación de objetos adicionales con "putIfAbsent"?
synchronization thread-safety (7)
Estoy agregando valores múltiples para claves en un entorno de subprocesos múltiples. Las llaves no se conocen de antemano. Pensé que haría algo como esto:
class Aggregator {
protected ConcurrentHashMap<String, List<String>> entries =
new ConcurrentHashMap<String, List<String>>();
public Aggregator() {}
public void record(String key, String value) {
List<String> newList =
Collections.synchronizedList(new ArrayList<String>());
List<String> existingList = entries.putIfAbsent(key, newList);
List<String> values = existingList == null ? newList : existingList;
values.add(value);
}
}
El problema que veo es que cada vez que se ejecuta este método, necesito crear una nueva instancia de ArrayList
, que luego elimino (en la mayoría de los casos). Esto parece un abuso injustificado del recolector de basura. ¿Hay alguna forma mejor y más segura de inicializar este tipo de estructura sin tener que synchronize
el método de record
? Estoy algo sorprendido por la decisión de que el método putIfAbsent
no devuelva el elemento recién creado, y por la falta de una forma de aplazar la instanciación a menos que se putIfAbsent
(por así decirlo).
A partir de Java-8 puede crear Multi Mapas usando el siguiente patrón:
public void record(String key, String value) { entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<String>())) .add(value); }
La documentación de ConcurrentHashMap (no el contrato general) especifica que ArrayList solo se creará una vez para cada clave, por el pequeño costo inicial de demorar las actualizaciones mientras ArrayList se está creando para una nueva clave:
Al final, implementé una ligera modificación de la respuesta de @ Bohemian. Su solución propuesta sobrescribe la variable de values
con la llamada putIfAbsent
, que crea el mismo problema que tuve antes. El código que parece funcionar tiene el siguiente aspecto:
public void record(String key, String value) {
List<String> values = entries.get(key);
if (values == null) {
values = Collections.synchronizedList(new ArrayList<String>());
List<String> values2 = entries.putIfAbsent(key, values);
if (values2 != null)
values = values2;
}
values.add(value);
}
No es tan elegante como me gustaría, pero es mejor que el original que crea una nueva instancia de ArrayList
en cada llamada.
Creó dos versiones basadas en la respuesta de Gene
public static <K,V> void putIfAbsetMultiValue(ConcurrentHashMap<K,List<V>> entries, K key, V value) {
List<V> values = entries.get(key);
if (values == null) {
values = Collections.synchronizedList(new ArrayList<V>());
List<V> values2 = entries.putIfAbsent(key, values);
if (values2 != null)
values = values2;
}
values.add(value);
}
public static <K,V> void putIfAbsetMultiValueSet(ConcurrentMap<K,Set<V>> entries, K key, V value) {
Set<V> values = entries.get(key);
if (values == null) {
values = Collections.synchronizedSet(new HashSet<V>());
Set<V> values2 = entries.putIfAbsent(key, values);
if (values2 != null)
values = values2;
}
values.add(value);
}
Funciona bien
El enfoque con putIfAbsent
tiene el tiempo de ejecución más rápido, es de 2 a 50 veces más rápido que el enfoque "lambda" en entornos con alta contención. El Lambda no es la razón detrás de este "powerloss", el problema es la sincronización obligatoria dentro del cómputo si está presente antes de las optimizaciones de Java-9.
el punto de referencia:
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class ConcurrentHashMapTest {
private final static int numberOfRuns = 1000000;
private final static int numberOfThreads = Runtime.getRuntime().availableProcessors();
private final static int keysSize = 10;
private final static String[] strings = new String[keysSize];
static {
for (int n = 0; n < keysSize; n++) {
strings[n] = "" + (char) (''A'' + n);
}
}
public static void main(String[] args) throws InterruptedException {
for (int n = 0; n < 20; n++) {
testPutIfAbsent();
testComputeIfAbsentLamda();
}
}
private static void testPutIfAbsent() throws InterruptedException {
final AtomicLong totalTime = new AtomicLong();
final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
final Random random = new Random();
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
long start, end;
for (int n = 0; n < numberOfRuns; n++) {
String s = strings[random.nextInt(strings.length)];
start = System.nanoTime();
AtomicInteger count = map.get(s);
if (count == null) {
count = new AtomicInteger(0);
AtomicInteger prevCount = map.putIfAbsent(s, count);
if (prevCount != null) {
count = prevCount;
}
}
count.incrementAndGet();
end = System.nanoTime();
totalTime.addAndGet(end - start);
}
}
});
}
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
+ " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
}
private static void testComputeIfAbsentLamda() throws InterruptedException {
final AtomicLong totalTime = new AtomicLong();
final ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<String, AtomicInteger>();
final Random random = new Random();
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
long start, end;
for (int n = 0; n < numberOfRuns; n++) {
String s = strings[random.nextInt(strings.length)];
start = System.nanoTime();
AtomicInteger count = map.computeIfAbsent(s, (k) -> new AtomicInteger(0));
count.incrementAndGet();
end = System.nanoTime();
totalTime.addAndGet(end - start);
}
}
});
}
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
System.out.println("Test " + Thread.currentThread().getStackTrace()[1].getMethodName()
+ " average time per run: " + (double) totalTime.get() / numberOfThreads / numberOfRuns + " ns");
}
}
Los resultados:
Test testPutIfAbsent average time per run: 115.756501 ns
Test testComputeIfAbsentLamda average time per run: 276.9667055 ns
Test testPutIfAbsent average time per run: 134.2332435 ns
Test testComputeIfAbsentLamda average time per run: 223.222063625 ns
Test testPutIfAbsent average time per run: 119.968893625 ns
Test testComputeIfAbsentLamda average time per run: 216.707419875 ns
Test testPutIfAbsent average time per run: 116.173902375 ns
Test testComputeIfAbsentLamda average time per run: 215.632467375 ns
Test testPutIfAbsent average time per run: 112.21422775 ns
Test testComputeIfAbsentLamda average time per run: 210.29563725 ns
Test testPutIfAbsent average time per run: 120.50643475 ns
Test testComputeIfAbsentLamda average time per run: 200.79536475 ns
Este es un problema. También busqué una respuesta. El método putIfAbsent
no resuelve realmente el problema de creación de objetos adicionales, solo se asegura de que uno de esos objetos no reemplace a otro. Pero las condiciones de carrera entre subprocesos pueden provocar la creación de instancias de objetos múltiples. Pude encontrar 3 soluciones para este problema (y seguiría este orden de preferencia):
1- Si está en Java 8, la mejor manera de lograr esto es probablemente el nuevo método computeIfAbsent
de ConcurrentMap
. Solo necesita darle una función de cálculo que se ejecutará de forma síncrona (al menos para la implementación de ConcurrentHashMap
). Ejemplo:
private final ConcurrentMap<String, List<String>> entries =
new ConcurrentHashMap<String, List<String>>();
public void method1(String key, String value) {
entries.computeIfAbsent(key, s -> new ArrayList<String>())
.add(value);
}
Esto es del javadoc de ConcurrentHashMap.computeIfAbsent
:
Si la clave especificada ya no está asociada con un valor, intenta calcular su valor utilizando la función de mapeo dada y lo ingresa en este mapa a menos que sea nulo. La invocación completa del método se realiza atómicamente, por lo que la función se aplica como máximo una vez por clave. Algunas operaciones de actualización intentadas en este mapa por otros hilos pueden ser bloqueadas mientras el cálculo está en progreso, por lo que el cálculo debe ser breve y simple, y no debe intentar actualizar ninguna otra asignación de este mapa.
2- Si no puede usar Java 8, puede usar LoadingCache
Guava
, que es seguro para subprocesos. Usted define una función de carga (al igual que la función de compute
anterior), y puede estar seguro de que se llamará sincrónicamente. Ejemplo:
private final LoadingCache<String, List<String>> entries = CacheBuilder.newBuilder()
.build(new CacheLoader<String, List<String>>() {
@Override
public List<String> load(String s) throws Exception {
return new ArrayList<String>();
}
});
public void method2(String key, String value) {
entries.getUnchecked(key).add(value);
}
3- Si tampoco puede usar Guava, siempre puede sincronizar manualmente y hacer un doble bloqueo comprobado. Ejemplo:
private final ConcurrentMap<String, List<String>> entries =
new ConcurrentHashMap<String, List<String>>();
public void method3(String key, String value) {
List<String> existing = entries.get(key);
if (existing != null) {
existing.add(value);
} else {
synchronized (entries) {
List<String> existingSynchronized = entries.get(key);
if (existingSynchronized != null) {
existingSynchronized.add(value);
} else {
List<String> newList = new ArrayList<>();
newList.add(value);
entries.put(key, newList);
}
}
}
}
Hice una implementación de ejemplo de todos los 3 métodos y, además, el método no sincronizado, que causa la creación de objetos adicionales: http://pastebin.com/qZ4DUjTr
Java 8 introdujo una API para atender este problema exacto, haciendo una solución de 1 línea:
public void record(String key, String value) {
entries.computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<String>())).add(value);
}
Para Java 7:
public void record(String key, String value) {
List<String> values = entries.get(key);
if (values == null) {
entries.putIfAbsent(key, Collections.synchronizedList(new ArrayList<String>()));
// At this point, there will definitely be a list for the key.
// We don''t know or care which thread''s new object is in there, so:
values = entries.get(key);
}
values.add(value);
}
Este es el patrón de código estándar cuando se completa un ConcurrentHashMap
.
El método especial putIfAbsent(K, V))
colocará su objeto de valor, o si otro hilo tiene delante de usted, entonces ignorará su objeto de valor. De cualquier manera, después de la llamada a putIfAbsent(K, V))
, se garantiza que get(key)
es coherente entre hilos y, por lo tanto, el código anterior es threadsafe.
La única sobrecarga desperdiciada es si algún otro subproceso agrega una nueva entrada al mismo tiempo para la misma clave: puede terminar tirando el valor recién creado, pero eso solo ocurre si no hay una entrada y hay una carrera que su el hilo pierde, lo que normalmente sería raro.
Perdida de memoria (también GC, etc.) que el problema de creación de la lista Empty Array se maneja con Java 1.7.40. No te preocupes por crear una lista de arrays vacía. Referencia: http://javarevisited.blogspot.com.tr/2014/07/java-optimization-empty-arraylist-and-Hashmap-cost-less-memory-jdk-17040-update.html