poll offer method library example concurrentlinkedqueue concurrent java collections concurrency queue set

java - offer - Concurrent Set Queue



queue java (7)

¿Qué quiere decir con una cola concurrente con Set semántica? Si te refieres a una estructura verdaderamente concurrente (en oposición a una estructura segura para subprocesos) entonces yo diría que estás pidiendo un poni.

¿Qué ocurre, por ejemplo, si llamas a put(element) y detectan que algo ya está allí y que se elimina inmediatamente? Por ejemplo, ¿qué significa en tu caso si offer(element) || queue.contains(element) offer(element) || queue.contains(element) devuelve false ?

Este tipo de cosas a menudo deben pensarse de forma ligeramente diferente en un mundo concurrente ya que a menudo nada es lo que parece a menos que detenga el mundo (lo bloquea). De lo contrario, generalmente estás mirando algo en el pasado. Entonces, ¿qué estás tratando de hacer?

Tal vez esta es una pregunta tonta, pero parece que no puedo encontrar una respuesta obvia.

Necesito una cola FIFO simultánea que contenga solo valores únicos. Intentar agregar un valor que ya existe en la cola simplemente ignora ese valor. Lo cual, si no fuera por la seguridad del hilo, sería trivial. ¿Hay una estructura de datos en Java o tal vez un snipit de código en las interwebs que exhibe este comportamiento?


No hay una colección integrada que haga esto. Existen algunas implementaciones de Set concurrentes que se podrían usar junto con una Queue simultánea.

Por ejemplo, un elemento se agrega a la cola solo después de que se haya agregado correctamente al conjunto, y cada elemento eliminado de la cola se elimina del conjunto. En este caso, el contenido de la cola, lógicamente, es realmente lo que haya en el conjunto, y la cola solo se utiliza para rastrear el pedido y proporcionar operaciones eficientes take() y poll() que solo se encuentran en un BlockingQueue .


Si desea una mejor concurrencia que la sincronización completa, hay una manera que conozco de hacerlo, utilizando un ConcurrentHashMap como mapa de respaldo. El siguiente es un boceto solamente.

public final class ConcurrentHashSet<E> extends ForwardingSet<E> implements Set<E>, Queue<E> { private enum Dummy { VALUE } private final ConcurrentMap<E, Dummy> map; ConcurrentHashSet(ConcurrentMap<E, Dummy> map) { super(map.keySet()); this.map = Preconditions.checkNotNull(map); } @Override public boolean add(E element) { return map.put(element, Dummy.VALUE) == null; } @Override public boolean addAll(Collection<? extends E> newElements) { // just the standard implementation boolean modified = false; for (E element : newElements) { modified |= add(element); } return modified; } @Override public boolean offer(E element) { return add(element); } @Override public E remove() { E polled = poll(); if (polled == null) { throw new NoSuchElementException(); } return polled; } @Override public E poll() { for (E element : this) { // Not convinced that removing via iterator is viable (check this?) if (map.remove(element) != null) { return element; } } return null; } @Override public E element() { return iterator().next(); } @Override public E peek() { Iterator<E> iterator = iterator(); return iterator.hasNext() ? iterator.next() : null; } }

No todo es sol con este enfoque. No tenemos una forma decente de seleccionar un elemento de encabezado que no sea el uso del entrySet().iterator().next() de entrada entrySet().iterator().next() , el resultado es que el mapa se desequilibra cada vez más a medida que pasa el tiempo. Este desequilibrio es un problema tanto debido a mayores colisiones de cubeta como a una mayor contención de segmentos.

Nota: este código usa guayaba en algunos lugares.


Tal vez extienda ArrayBlockingQueue . Para poder acceder al bloqueo (de acceso del paquete), tuve que poner mi subclase dentro del mismo paquete. Advertencia: no he probado esto.

package java.util.concurrent; import java.util.Collection; import java.util.concurrent.locks.ReentrantLock; public class DeDupingBlockingQueue<E> extends ArrayBlockingQueue<E> { public DeDupingBlockingQueue(int capacity) { super(capacity); } public DeDupingBlockingQueue(int capacity, boolean fair) { super(capacity, fair); } public DeDupingBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { super(capacity, fair, c); } @Override public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { if (contains(e)) return false; return super.add(e); } finally { lock.unlock(); } } @Override public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { if (contains(e)) return true; return super.offer(e); } finally { lock.unlock(); } } @Override public void put(E e) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //Should this be lock.lock() instead? try { if (contains(e)) return; super.put(e); //if it blocks, it does so without holding the lock. } finally { lock.unlock(); } } @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { if (contains(e)) return true; return super.offer(e, timeout, unit); //if it blocks, it does so without holding the lock. } finally { lock.unlock(); } } }


Un java.util.concurrent.ConcurrentLinkedQueue te lleva la mayor parte del camino hasta allí.

Envuelva el ConcurrentLinkedQueue con su propia clase que verifica la exclusividad de un complemento. Tu código debe ser seguro para subprocesos.


Una respuesta simple para una cola de objetos únicos puede ser la siguiente:

import java.util.concurrent.ConcurrentLinkedQueue; public class FinalQueue { class Bin { private int a; private int b; public Bin(int a, int b) { this.a = a; this.b = b; } @Override public int hashCode() { return a * b; } public String toString() { return a + ":" + b; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; Bin other = (Bin) obj; if ((a != other.a) || (b != other.b)) return false; return true; } } private ConcurrentLinkedQueue<Bin> queue; public FinalQueue() { queue = new ConcurrentLinkedQueue<Bin>(); } public synchronized void enqueue(Bin ipAddress) { if (!queue.contains(ipAddress)) queue.add(ipAddress); } public Bin dequeue() { return queue.poll(); } public String toString() { return "" + queue; } /** * @param args */ public static void main(String[] args) { FinalQueue queue = new FinalQueue(); Bin a = queue.new Bin(2,6); queue.enqueue(a); queue.enqueue(queue.new Bin(13, 3)); queue.enqueue(queue.new Bin(13, 3)); queue.enqueue(queue.new Bin(14, 3)); queue.enqueue(queue.new Bin(13, 9)); queue.enqueue(queue.new Bin(18, 3)); queue.enqueue(queue.new Bin(14, 7)); Bin x= queue.dequeue(); System.out.println(x.a); System.out.println(queue.toString()); System.out.println("Dequeue..." + queue.dequeue()); System.out.println("Dequeue..." + queue.dequeue()); System.out.println(queue.toString()); } }


Usaría un LinkedHashSet sincronizado hasta que hubiera suficiente justificación para considerar alternativas. El principal beneficio que una solución más concurrente podría ofrecer es la división de bloqueo.

El enfoque concurrente más simple sería aa ConcurrentHashMap (que actúa como un conjunto) y ConcurrentLinkedQueue. El orden de las operaciones proporcionaría la restricción deseada. Una oferta () primero realizaría un CHM # putIfAbsent () y si se inserta satisfactoriamente en el CLQ. Una encuesta () tomaría de la CLQ y luego la eliminaría del CHM. Esto significa que consideramos una entrada en nuestra cola si está en el mapa y la CLQ proporciona el orden. El rendimiento podría ajustarse aumentando el nivel de concurrencia del mapa. Si eres tolerante con la habilidad adicional, entonces un CHM # get () barato podría actuar como una precondición razonable (pero puede sufrir por ser una vista un poco obsoleta).