thread sincronizacion resueltos juegos hilos fuente example ejemplos ejemplo con codigo java thread-safety circular-buffer

resueltos - sincronizacion de hilos en java



BĂșfer circular seguro para subprocesos en Java (7)

Aquí hay una implementación de buffer de anillo sin bloqueo. Implementa un búfer de tamaño fijo, no hay funcionalidad FIFO. Le sugiero que almacene una Collection de solicitudes para cada servidor en su lugar. De esa manera, su informe puede hacer el filtrado en lugar de hacer que su estructura de datos se filtre.

/** * Container * --------- * * A lock-free container that offers a close-to O(1) add/remove performance. * */ public class Container<T> implements Iterable<T> { // The capacity of the container. final int capacity; // The list. AtomicReference<Node<T>> head = new AtomicReference<Node<T>>(); // TESTING { AtomicLong totalAdded = new AtomicLong(0); AtomicLong totalFreed = new AtomicLong(0); AtomicLong totalSkipped = new AtomicLong(0); private void resetStats() { totalAdded.set(0); totalFreed.set(0); totalSkipped.set(0); } // TESTING } // Constructor public Container(int capacity) { this.capacity = capacity; // Construct the list. Node<T> h = new Node<T>(); Node<T> it = h; // One created, now add (capacity - 1) more for (int i = 0; i < capacity - 1; i++) { // Add it. it.next = new Node<T>(); // Step on to it. it = it.next; } // Make it a ring. it.next = h; // Install it. head.set(h); } // Empty ... NOT thread safe. public void clear() { Node<T> it = head.get(); for (int i = 0; i < capacity; i++) { // Trash the element it.element = null; // Mark it free. it.free.set(true); it = it.next; } // Clear stats. resetStats(); } // Add a new one. public Node<T> add(T element) { // Get a free node and attach the element. totalAdded.incrementAndGet(); return getFree().attach(element); } // Find the next free element and mark it not free. private Node<T> getFree() { Node<T> freeNode = head.get(); int skipped = 0; // Stop when we hit the end of the list // ... or we successfully transit a node from free to not-free. while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) { skipped += 1; freeNode = freeNode.next; } // Keep count of skipped. totalSkipped.addAndGet(skipped); if (skipped < capacity) { // Put the head as next. // Doesn''t matter if it fails. That would just mean someone else was doing the same. head.set(freeNode.next); } else { // We hit the end! No more free nodes. throw new IllegalStateException("Capacity exhausted."); } return freeNode; } // Mark it free. public void remove(Node<T> it, T element) { totalFreed.incrementAndGet(); // Remove the element first. it.detach(element); // Mark it as free. if (!it.free.compareAndSet(false, true)) { throw new IllegalStateException("Freeing a freed node."); } } // The Node class. It is static so needs the <T> repeated. public static class Node<T> { // The element in the node. private T element; // Are we free? private AtomicBoolean free = new AtomicBoolean(true); // The next reference in whatever list I am in. private Node<T> next; // Construct a node of the list private Node() { // Start empty. element = null; } // Attach the element. public Node<T> attach(T element) { // Sanity check. if (this.element == null) { this.element = element; } else { throw new IllegalArgumentException("There is already an element attached."); } // Useful for chaining. return this; } // Detach the element. public Node<T> detach(T element) { // Sanity check. if (this.element == element) { this.element = null; } else { throw new IllegalArgumentException("Removal of wrong element."); } // Useful for chaining. return this; } public T get () { return element; } @Override public String toString() { return element != null ? element.toString() : "null"; } } // Provides an iterator across all items in the container. public Iterator<T> iterator() { return new UsedNodesIterator<T>(this); } // Iterates across used nodes. private static class UsedNodesIterator<T> implements Iterator<T> { // Where next to look for the next used node. Node<T> it; int limit = 0; T next = null; public UsedNodesIterator(Container<T> c) { // Snapshot the head node at this time. it = c.head.get(); limit = c.capacity; } public boolean hasNext() { // Made into a `while` loop to fix issue reported by @Nim in code review while (next == null && limit > 0) { // Scan to the next non-free node. while (limit > 0 && it.free.get() == true) { it = it.next; // Step down 1. limit -= 1; } if (limit != 0) { next = it.element; } } return next != null; } public T next() { T n = null; if ( hasNext () ) { // Give it to them. n = next; next = null; // Step forward. it = it.next; limit -= 1; } else { // Not there!! throw new NoSuchElementException (); } return n; } public void remove() { throw new UnsupportedOperationException("Not supported."); } } @Override public String toString() { StringBuilder s = new StringBuilder(); Separator comma = new Separator(","); // Keep counts too. int usedCount = 0; int freeCount = 0; // I will iterate the list myself as I want to count free nodes too. Node<T> it = head.get(); int count = 0; s.append("["); // Scan to the end. while (count < capacity) { // Is it in-use? if (it.free.get() == false) { // Grab its element. T e = it.element; // Is it null? if (e != null) { // Good element. s.append(comma.sep()).append(e.toString()); // Count them. usedCount += 1; } else { // Probably became free while I was traversing. // Because the element is detached before the entry is marked free. freeCount += 1; } } else { // Free one. freeCount += 1; } // Next it = it.next; count += 1; } // Decorate with counts "]used+free". s.append("]").append(usedCount).append("+").append(freeCount); if (usedCount + freeCount != capacity) { // Perhaps something was added/freed while we were iterating. s.append("?"); } return s.toString(); } }

Tenga en cuenta que esto está cerca de O1 put y get. Un Separator simplemente emite "" la primera vez y luego su parámetro a partir de entonces.

Edición: Métodos de prueba añadidos.

// ***** Following only needed for testing. ***** private static boolean Debug = false; private final static String logName = "Container.log"; private final static NamedFileOutput log = new NamedFileOutput("C://Junk//"); private static synchronized void log(boolean toStdoutToo, String s) { if (Debug) { if (toStdoutToo) { System.out.println(s); } log(s); } } private static synchronized void log(String s) { if (Debug) { try { log.writeLn(logName, s); } catch (IOException ex) { ex.printStackTrace(); } } } static volatile boolean testing = true; // Tester object to exercise the container. static class Tester<T> implements Runnable { // My name. T me; // The container I am testing. Container<T> c; public Tester(Container<T> container, T name) { c = container; me = name; } private void pause() { try { Thread.sleep(0); } catch (InterruptedException ex) { testing = false; } } public void run() { // Spin on add/remove until stopped. while (testing) { // Add it. Node<T> n = c.add(me); log("Added " + me + ": " + c.toString()); pause(); // Remove it. c.remove(n, me); log("Removed " + me + ": " + c.toString()); pause(); } } } static final String[] strings = { "One", "Two", "Three", "Four", "Five", "Six", "Seven", "Eight", "Nine", "Ten" }; static final int TEST_THREADS = Math.min(10, strings.length); public static void main(String[] args) throws InterruptedException { Debug = true; log.delete(logName); Container<String> c = new Container<String>(10); // Simple add/remove log(true, "Simple test"); Node<String> it = c.add(strings[0]); log("Added " + c.toString()); c.remove(it, strings[0]); log("Removed " + c.toString()); // Capacity test. log(true, "Capacity test"); ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length); // Fill it. for (int i = 0; i < strings.length; i++) { nodes.add(i, c.add(strings[i])); log("Added " + strings[i] + " " + c.toString()); } // Add one more. try { c.add("Wafer thin mint!"); } catch (IllegalStateException ise) { log("Full!"); } c.clear(); log("Empty: " + c.toString()); // Iterate test. log(true, "Iterator test"); for (int i = 0; i < strings.length; i++) { nodes.add(i, c.add(strings[i])); } StringBuilder all = new StringBuilder (); Separator sep = new Separator(","); for (String s : c) { all.append(sep.sep()).append(s); } log("All: "+all); for (int i = 0; i < strings.length; i++) { c.remove(nodes.get(i), strings[i]); } sep.reset(); all.setLength(0); for (String s : c) { all.append(sep.sep()).append(s); } log("None: " + all.toString()); // Multiple add/remove log(true, "Multi test"); for (int i = 0; i < strings.length; i++) { nodes.add(i, c.add(strings[i])); log("Added " + strings[i] + " " + c.toString()); } log("Filled " + c.toString()); for (int i = 0; i < strings.length - 1; i++) { c.remove(nodes.get(i), strings[i]); log("Removed " + strings[i] + " " + c.toString()); } c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]); log("Empty " + c.toString()); // Multi-threaded add/remove log(true, "Threads test"); c.clear(); for (int i = 0; i < TEST_THREADS; i++) { Thread t = new Thread(new Tester<String>(c, strings[i])); t.setName("Tester " + strings[i]); log("Starting " + t.getName()); t.start(); } // Wait for 10 seconds. long stop = System.currentTimeMillis() + 10 * 1000; while (System.currentTimeMillis() < stop) { Thread.sleep(100); } // Stop the testers. testing = false; // Wait some more. Thread.sleep(1 * 100); // Get stats. double added = c.totalAdded.doubleValue(); double skipped = c.totalSkipped.doubleValue(); //double freed = c.freed.doubleValue(); log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")"); }

Considere algunas instancias de servidor web que se ejecutan en paralelo. Cada servidor contiene una referencia a un único "Guardián de estado" compartido, cuya función es mantener las últimas N solicitudes de todos los servidores.

Por ejemplo ( N=3 ):

Server a: "Request id = ABCD" Status keeper=["ABCD"] Server b: "Request id = XYZZ" Status keeper=["ABCD", "XYZZ"] Server c: "Request id = 1234" Status keeper=["ABCD", "XYZZ", "1234"] Server b: "Request id = FOO" Status keeper=["XYZZ", "1234", "FOO"] Server a: "Request id = BAR" Status keeper=["1234", "FOO", "BAR"]

En cualquier momento, se puede llamar al "Conservador de estado" desde una aplicación de monitoreo que lee estas últimas N solicitudes de un informe de SLA.

¿Cuál es la mejor manera de implementar este escenario productor-consumidor en Java, dando a los servidores web una mayor prioridad que el informe de SLA?

CircularFifoBuffer parece ser la estructura de datos adecuada para mantener las solicitudes, pero no estoy seguro de cuál es la manera óptima de implementar una concurrencia eficiente.


Hazelcast''s Queue ofrece casi todo lo que pides, pero no admite circularidad. Pero por su descripción no estoy seguro de si realmente lo necesita.


Me gustaría ver ArrayDeque o, para una implementación más concurrente, echar un vistazo a la biblioteca Disruptor , que es uno de los búferes de anillo más sofisticados / complejos de Java.

Una alternativa es usar una cola ilimitada que sea más concurrente ya que el productor nunca necesita esperar al consumidor. Crónica de java

A menos que sus necesidades justifiquen la complejidad, una ArrayDeque puede ser todo lo que necesita.


Si fuera yo, usaría CircularFIFOBuffer como usted indicó, y sincronizaría alrededor del búfer al escribir (agregar). Cuando la aplicación de monitoreo desea leer el búfer, sincronícelo en el búfer, y luego cópielo o clónelo para usarlo en los informes.

Esta sugerencia se basa en el supuesto de que la latencia es mínima para copiar / clonar el búfer en un nuevo objeto. Si hay una gran cantidad de elementos y el tiempo de copia es lento, no es una buena idea.

Ejemplo de pseudocódigo:

public void writeRequest(String requestID) { synchronized(buffer) { buffer.add(requestID); } } public Collection<String> getRequests() { synchronized(buffer) { return buffer.clone(); } }


Tal vez usted quiera mirar Disruptor .

  • Encuentre un artículo que describa las alternativas, el diseño y también una comparación de rendimiento para java.util.concurrent.ArrayBlockingQueue aquí: pdf
  • Considera leer los primeros tres artículos de BlogsAndArticles

Si la biblioteca es demasiado, apégate a java.util.concurrent.ArrayBlockingQueue



Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());