java concurrency java-8 out-of-memory concurrenthashmap

java - Controlador de bloqueo para claves arbitrarias



concurrency java-8 (10)

Aquí hay una versión corta y dulce que aprovecha la versión weak de la clase Interner de Guava para hacer el gran trabajo de crear un objeto "canónico" para que cada clave se use como la cerradura, e implementar una semántica de referencia débil para que las entradas no utilizadas se limpien .

public class InternerHandler { private final Interner = Interners.newWeakInterner(); public void handle(Key key) throws InterruptedException { Key canonKey = Interner.intern(key); synchronized (canonKey) { someWorkExecutor.process(key); } } }

Básicamente, pedimos una canonKey canónica que es equal() a key , y luego canonKey esta canonKey . Todos estarán de acuerdo con la clave canónica y, por lo tanto, todas las personas que llaman que pasan claves iguales estarán de acuerdo con el objeto sobre el cual bloquear.

La naturaleza débil de Interner significa que cada vez que no se usa la clave canónica, se puede eliminar la entrada, para evitar la acumulación de entradas en el interner. Más tarde, si vuelve a aparecer una clave igual, se elige una nueva entrada canónica.

El código simple anterior se basa en el monitor incorporado para synchronize , pero si esto no funciona para usted (por ejemplo, ya se usa para otro propósito), puede incluir un objeto de bloqueo en la clase Key o crear un objeto de titular.

Tengo un código que implementa un "controlador de bloqueo" para claves arbitrarias. Dada una key , garantiza que solo un hilo a la vez pueda process esa clave (o igual a) (lo que aquí significa llamar a la llamada a externalSystem.process(key) ).

Hasta ahora, tengo un código como este:

public class MyHandler { private final SomeWorkExecutor someWorkExecutor; private final ConcurrentHashMap<Key, Lock> lockMap = new ConcurrentHashMap<>(); public void handle(Key key) { // This can lead to OOM as it creates locks without removing them Lock keyLock = lockMap.computeIfAbsent( key, (k) -> new ReentrantLock() ); keyLock.lock(); try { someWorkExecutor.process(key); } finally { keyLock.unlock(); } } }

Entiendo que este código puede llevar a OutOfMemoryError porque no hay un mapa claro.

Pienso en cómo hacer un mapa que acumule una cantidad limitada de elementos. Cuando se exceda el límite, deberíamos reemplazar el elemento de acceso más antiguo por nuevo (este código debe sincronizarse con el elemento más antiguo como monitor). Pero no sé cómo tener devolución de llamada que me dirá que el límite superó.

Por favor comparte tus pensamientos.

PD

Volví a leer la tarea y ahora veo que tengo una limitación de que el método de handle no puede invocarse más de 8 subprocesos. No sé cómo puede ayudarme, pero acabo de mencionarlo.

PS2

Por @Boris, la araña fue sugerida solución agradable y simple:

} finally { lockMap.remove(key); keyLock.unlock(); }

Pero después de que Boris se dio cuenta de que el código no está seguro para subprocesos porque rompe el comportamiento:
Permite investigar 3 hilos invocados con igual clave:

  1. El hilo # 1 adquiere el bloqueo y ahora antes de map.remove(key);
  2. El subproceso # 2 invoca con la tecla igual para que espere cuando el subproceso # 1 libera el bloqueo.
  3. luego el hilo # 1 ejecuta map.remove(key); . Después de este hilo # 3 invoca el handle método. Comprueba que el bloqueo para esta clave está ausente en el mapa, por lo que crea un nuevo bloqueo y lo adquiere.
  4. El subproceso # 1 libera el bloqueo y, por lo tanto, el subproceso # 2 lo adquiere.
    Por lo tanto, el hilo # 2 y el hilo # 3 se pueden invocar en paralelo para claves iguales. Pero no se debe permitir.

Para evitar esta situación, antes de borrar el mapa, debemos bloquear cualquier subproceso para adquirir el bloqueo, mientras que todos los subprocesos de waitset no son adquiridos y liberar el bloqueo. Parece que se necesita suficiente sincronización complicada y conducirá a un funcionamiento lento del algoritmo. Tal vez deberíamos borrar el mapa de vez en cuando cuando el tamaño del mapa excede un valor limitado.

Perdí mucho tiempo, pero desafortunadamente no tengo ideas de cómo lograrlo.


Crear y eliminar el objeto de bloqueo de una key cada vez es una operación costosa en términos de rendimiento. Cuando agrega / elimina bloqueos del mapa concurrente (por ejemplo, caché), debe asegurarse de que colocar / eliminar objetos del caché sea seguro para subprocesos. Así que esto no parece una buena idea, pero se puede implementar a través de ConcurrentHashMap

El enfoque de bloqueo de franjas (también utilizado por el mapa de hash simultáneo internamente) es un mejor enfoque. De los documentos de Google Guava se explica como

Cuando desea asociar un bloqueo con un objeto, la garantía clave que necesita es que si key1.equals (key2), el bloqueo asociado con key1 es el mismo que el bloqueo asociado con key2.

La forma más burda de hacer esto es asociar cada clave con la misma cerradura, lo que resulta en la sincronización más gruesa posible. Por otro lado, puede asociar cada clave distinta con un bloqueo diferente, pero esto requiere un consumo de memoria lineal y administración de concurrencia para el sistema de bloqueos en sí, a medida que se descubren nuevas claves.

Rayado permite al programador seleccionar una cantidad de bloqueos, que se distribuyen entre las teclas según su código hash. Esto le permite al programador seleccionar dinámicamente un compromiso entre la concurrencia y el consumo de memoria, mientras mantiene la clave invariante que si es clave1.equals (clave2), luego striped.get (key1) == striped.get (key2)

código:

//declare globally; e.g. class field level Striped<Lock> rwLockStripes = Striped.lock(16); Lock lock = rwLockStripes.get("key"); lock.lock(); try { // do you work here } finally { lock.unlock(); }

Seguir el código cortado puede ayudar a implementar la eliminación / eliminación de bloqueo.

private ConcurrentHashMap<String, ReentrantLock> caches = new ConcurrentHashMap<>(); public void processWithLock(String key) { ReentrantLock lock = findAndGetLock(key); lock.lock(); try { // do you work here } finally { unlockAndClear(key, lock); } } private void unlockAndClear(String key, ReentrantLock lock) { // *** Step 1: Release the lock. lock.unlock(); // *** Step 2: Attempt to remove the lock // This is done by calling compute method, if given lock is present in // cache. if current lock object in cache is same instance as ''lock'' // then remove it from cache. If not, some other thread is succeeded in // putting new lock object and hence we can leave the removal of lock object to that // thread. caches.computeIfPresent(key, (k, current) -> lock == current ? null : current); } private ReentrantLock findAndGetLock(String key) { // Merge method given us the access to the previously( if available) and // newer lock object together. return caches.merge(key, new ReentrantLock(), (older, newer) -> nonNull(older) ? older : newer); }


En lugar de escribir lo que es suyo, puede intentar algo como JKeyLockManager . De la descripción de los proyectos:

JKeyLockManager proporciona un bloqueo preciso con claves específicas de la aplicación.

Código de ejemplo dado en el sitio:

public class WeatherServiceProxy { private final KeyLockManager lockManager = KeyLockManagers.newManager(); public void updateWeatherData(String cityName, float temperature) { lockManager.executeLocked(cityName, () -> delegate.updateWeatherData(cityName, temperature)); }


Gracias Ben Mane
He encontrado esta variante.

public class MyHandler { private final int THREAD_COUNT = 8; private final int K = 100; private final Striped<Lock> striped = Striped.lazyWeakLock(THREAD_COUNT * K); private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor(); public void handle(Key key) throws InterruptedException { Lock keyLock = striped.get(key); keyLock.lock(); try { someWorkExecutor.process(key); } finally { keyLock.unlock(); } } }


No es necesario que intente limitar el tamaño a un valor arbitrario, ya que resulta que puede realizar este tipo de lenguaje de "manejador de bloqueos" mientras solo almacena exactamente el número de claves actualmente bloqueadas en el mapa.

La idea es usar una convención simple: agregar con éxito la asignación al mapa cuenta como la operación de "bloqueo", y eliminarlo cuenta como la operación de "desbloqueo". Esto evita perfectamente el problema de eliminar una asignación, mientras que un subproceso todavía lo tiene bloqueado y otras condiciones de carrera.

En este punto, el value de la asignación solo se utiliza para bloquear otros subprocesos que llegan con la misma clave y deben esperar hasta que se elimine la asignación.

Aquí hay un ejemplo 1 con CountDownLatch lugar de Lock como el valor del mapa:

public void handle(Key key) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); // try to acquire the lock by inserting our latch as a // mapping for key while(true) { CountDownLatch existing = lockMap.putIfAbsent(key, latch); if (existing != null) { // there is an existing key, wait on it existing.await(); } else { break; } } try { externalSystem.process(key); } finally { lockMap.remove(key); latch.countDown(); } }

Aquí, la vida útil de la asignación es solo mientras se mantenga el bloqueo. El mapa nunca tendrá más entradas que solicitudes simultáneas para claves diferentes.

La diferencia con su enfoque es que las asignaciones no se "reutilizan": cada llamada de handle creará un nuevo enclavamiento y asignación. Dado que ya está realizando operaciones atómicas costosas, no es probable que esto sea una desaceleración en la práctica. Otro inconveniente es que con muchos hilos en espera, todos se activan cuando el pestillo cuenta hacia atrás, pero solo uno logrará colocar un nuevo mapeo y, por lo tanto, adquirir el bloqueo; el resto volverá a dormir en el nuevo bloqueo.

Podría crear otra versión de esto que reutilice las asignaciones cuando se presenten subprocesos y esperar una asignación existente. Básicamente, el hilo de desbloqueo simplemente hace un "traspaso" a uno de los hilos en espera. Solo se utilizará una asignación para un conjunto completo de hilos que esperan en la misma clave: se entrega a cada uno en secuencia. El tamaño aún está limitado porque una vez que no hay más subprocesos esperando en una asignación determinada, aún se eliminan.

Para implementar eso, reemplaza CountDownLatch con un valor de mapa que puede contar el número de subprocesos en espera. Cuando un hilo realiza el desbloqueo, primero verifica si hay algún hilo en espera y, si es así, lo despierta para hacer el traspaso. Si no hay hilos en espera, "destruye" el objeto (es decir, establece un indicador de que el objeto ya no está en el mapeo) y lo elimina del mapa.

Debe realizar las manipulaciones anteriores bajo un bloqueo adecuado, y hay algunos detalles difíciles. En la práctica, encuentro que el ejemplo corto y dulce de arriba funciona muy bien.

1 Escrito sobre la marcha, no compilado ni probado, pero la idea funciona.


Puede confiar en el compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) del método compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) para sincronizar las llamadas a su process método para una tecla determinada, ni siquiera necesita usar Lock como Escriba los valores de su mapa, ya que no confía más en él.

La idea es confiar en el mecanismo de bloqueo interno de su ConcurrentHashMap para ejecutar su método, esto permitirá que los subprocesos ejecuten en paralelo el método de process para claves cuyos hashes correspondientes no son parte de la misma bandeja. Esto equivale al enfoque basado en bloqueos rayados, excepto que no necesita una biblioteca de terceros adicional.

El enfoque de los bloqueos rayados es interesante porque es muy ligero en términos de huella de memoria, ya que solo necesita una cantidad limitada de bloqueos para hacerlo, por lo que la huella de memoria necesaria para sus bloqueos es conocida y nunca cambia, lo que no es el caso de enfoques que utilizan un bloqueo para cada clave (como en su pregunta), de modo que generalmente es mejor / se recomienda utilizar enfoques basados ​​en bloqueos con franjas para tal necesidad.

Entonces tu código podría ser algo como esto:

// This will create a ConcurrentHashMap with an initial table size of 16 // bins by default, you may provide an initialCapacity and loadFactor // if too much or not enough to get the expected table size in order // increase or reduce the concurrency level of your map // NB: We don''t care much of the type of the value so I arbitrarily // used Void but it could be any type like simply Object private final ConcurrentMap<Key, Void> lockMap = new ConcurrentHashMap<>(); public void handle(Key lockKey) { // Execute the method process through the remapping Function lockMap.compute( lockKey, (key, value) -> { // Execute the process method under the protection of the // lock of the bin of hashes corresponding to the key someWorkExecutor.process(key); // Returns null to keep the Map empty return null; } ); }

NB 1: Como siempre devolvemos null el mapa siempre estará vacío, por lo que nunca se quedará sin memoria debido a este mapa.

NB 2: Como nunca afectamos un valor a una clave dada, tenga en cuenta que también podría hacerse usando el método computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction) :

public void handle(Key lockKey) { // Execute the method process through the remapping Function lockMap.computeIfAbsent( lockKey, key -> { // Execute the process method under the protection of the // lock of the segment of hashes corresponding to the key someWorkExecutor.process(key); // Returns null to keep the Map empty return null; } ); }

NB 3: asegúrese de que el process su método nunca llame al handle método para las claves, ya que terminaría con infinitos bucles (la misma clave) o puntos muertos (otras claves no ordenadas, por ejemplo: si un hilo llama al handle(key1) y luego process internamente las llamadas de handle(key2) y otro subproceso llama en handle(key2) paralelo handle(key2) y luego process internamente las llamadas de handle(key1) , obtendrá un interbloqueo independientemente del enfoque utilizado). Este comportamiento no es específico de este enfoque, ocurrirá con cualquier enfoque.


Puede usar un caché en proceso que almacene referencias de objetos, como cafeína, guayaba, EHCache o cache2k. Aquí hay un ejemplo de cómo construir un caché con cache2k :

final Cache<Key, Lock> locks = new Cache2kBuilder<Key, Lock>(){} .loader( new CacheLoader<Key, Lock>() { @Override public Lock load(Key o) { return new ReentrantLock(); } } ) .storeByReference(true) .entryCapacity(1000) .build();

El patrón de uso es el que tiene en la pregunta:

Lock keyLock = locks.get(key); keyLock.lock(); try { externalSystem.process(key); } finally { keyLock.unlock(); }

Como el caché está limitado a 1000 entradas, se realiza una limpieza automática de los bloqueos que ya no están en uso.

Existe la posibilidad de que un bloqueo en uso sea desalojado por la memoria caché, si la capacidad y la cantidad de subprocesos en la aplicación no coinciden. Esta solución funciona perfectamente durante años en nuestras aplicaciones. La memoria caché desaloja un bloqueo que está en uso, cuando hay una tarea de ejecución lo suficientemente larga Y se excede la capacidad. En una aplicación real, siempre controla el número de subprocesos de vida, por ejemplo, en un contenedor web limitaría el número de subprocesos de procesamiento a (ejemplo) 100. Así que sabe que nunca hay más de 100 bloqueos en uso. Si esto se tiene en cuenta, esta solución tiene una sobrecarga mínima.

Tenga en cuenta que el bloqueo solo funciona siempre que su aplicación se ejecute en una sola máquina virtual. Es posible que desee echar un vistazo a los gestores de bloqueo distribuidos (DLM). Ejemplos de productos que proporcionan bloqueos distribuidos: hazelcast, infinispan, teracotta, redis / redisson.


Se agregarán nuevos valores cuando llame

lockMap.computeIfAbsent()

Por lo tanto, solo puede verificar lockMap.size () para el recuento de elementos.

Pero, ¿cómo vas a encontrar el primer artículo añadido? sería mejor simplemente eliminar los elementos después de usarlos.


Un enfoque es prescindir completamente del mapa hash concurrente, y simplemente usar un HashMap regular con bloqueo para realizar la manipulación requerida del mapa y el estado de bloqueo de forma atómica.

A primera vista, esto parece reducir la concurrencia del sistema, pero si asumimos que la llamada al process(key) es relativamente larga en comparación con las manipulaciones de bloqueo muy rápidas, funciona bien porque las llamadas al process() aún se ejecutan al mismo tiempo. Solo una cantidad pequeña y fija de trabajo ocurre en la sección crítica exclusiva.

Aquí hay un bosquejo:

public class MyHandler { private static class LockHolder { ReentrantLock lock = new ReentrantLock(); int refcount = 0; void lock(){ lock.lock(); } } private final SomeWorkExecutor someWorkExecutor; private final Lock mapLock = new ReentrantLock(); private final HashMap<Key, LockHolder> lockMap = new HashMap<>(); public void handle(Key key) { // lock the map mapLock.lock(); LockHolder holder = lockMap.computeIfAbsent(key, k -> new LockHolder()); // the lock in holder is either unlocked (newly created by us), or an existing lock, let''s increment refcount holder.refcount++; mapLock.unlock(); holder.lock(); try { someWorkExecutor.process(key); } finally { mapLock.lock() keyLock.unlock(); if (--holder.refcount == 0) { // no more users, remove lock holder map.remove(key); } mapLock.unlock(); } } }

Usamos refcount , que solo se manipula bajo el mapLock compartido para realizar un seguimiento de cuántos usuarios del bloqueo hay. Cuando el refcount es cero, podemos deshacernos de la entrada cuando salgamos del controlador. Este enfoque es bueno porque es bastante fácil de razonar y funcionará bien si la llamada process() es relativamente costosa en comparación con la sobrecarga de bloqueo. Dado que la manipulación del mapa ocurre bajo un bloqueo compartido, también es sencillo agregar lógica adicional, por ejemplo, mantener algunos objetos Holder en el mapa, realizar un seguimiento de las estadísticas, etc.


class MyHandler { private final Map<Key, Lock> lockMap = Collections.synchronizedMap(new WeakHashMap<>()); private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor(); public void handle(Key key) throws InterruptedException { Lock keyLock = lockMap.computeIfAbsent(key, (k) -> new ReentrantLock()); keyLock.lock(); try { someWorkExecutor.process(key); } finally { keyLock.unlock(); } } }