clojure - online - Productor consumidor con calificaciones.
clojure web framework (1)
Aquí está mi opinión sobre ello. Me aseguré de usar solo las estructuras de datos de Clojure para ver cómo funcionaría. Tenga en cuenta que habría sido perfectamente habitual e idiomático tomar una cola de bloqueo de la caja de herramientas de Java y usarla aquí; El código sería fácil de adaptar, creo. Actualización: en realidad lo java.util.concurrent.LinkedBlockingQueue
a java.util.concurrent.LinkedBlockingQueue
, ver más abajo.
clojure.lang.PersistentQueue
Llame (pro-con)
para iniciar una ejecución de prueba; luego observe el contenido de la output
para ver si sucedió algo y las queue-lengths
para ver si se mantuvieron dentro del límite dado.
Actualización: para explicar por qué sentí la necesidad de usar la ensure
continuación (se me preguntó acerca de esto en el IRC), esto es para evitar el sesgo de escritura (consulte el artículo de Wikipedia sobre aislamiento de instantáneas para una definición) Si sustituyera @queue
por (ensure queue)
, sería posible que dos o más productores verifiquen la longitud de la cola, descubran que es menos de 4, luego coloque elementos adicionales en la cola y posiblemente traiga la longitud total de la cola por encima de 4, rompiendo la restricción. De manera similar, dos consumidores que hacen @queue
podrían aceptar el mismo artículo para procesar, y luego sacar dos artículos de la cola. ensure
evita que cualquiera de estos escenarios suceda.
(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def *max-queue-length* 4)
(defn overseer
([] (overseer 20000))
([timeout]
(Thread/sleep timeout)
(swap! go-on? not)))
(defn queue-length-watch [_ _ _ new-queue-state]
(dosync (alter queue-lengths conj (count new-queue-state))))
(add-watch queue :queue-length-watch queue-length-watch)
(defn producer [tag]
(future
(while @go-on?
(if (dosync (let [l (count (ensure queue))]
(when (< l *max-queue-length*)
(alter queue conj tag)
true)))
(Thread/sleep (rand-int 2000))))))
(defn consumer []
(future
(while @go-on?
(Thread/sleep 100) ; don''t look at the queue too often
(when-let [item (dosync (let [item (first (ensure queue))]
(alter queue pop)
item))]
(Thread/sleep (rand-int 500)) ; do stuff
(dosync (alter output conj item)))))) ; and let us know
(defn pro-con []
(reset! go-on? true)
(dorun (map #(%1 %2)
(repeat 5 producer)
(iterate inc 0)))
(dorun (repeatedly 2 consumer))
(overseer))
java.util.concurrent.LinkedBlockingQueue
Una versión de lo anterior escrita usando LinkedBlockingQueue
. Observe cómo el esquema general del código es básicamente el mismo, con algunos detalles en realidad ligeramente más claros. LBQ
queue-lengths
de esta versión, ya que LBQ
se encarga de esa restricción para nosotros.
(def go-on? (atom true))
(def *max-queue-length* 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))
(defn overseer
([] (overseer 20000))
([timeout]
(Thread/sleep timeout)
(swap! go-on? not)))
(defn producer [tag]
(future
(while @go-on?
(.put queue tag)
(Thread/sleep (rand-int 2000)))))
(defn consumer []
(future
(while @go-on?
;; I''m using .poll on the next line so as not to block
;; indefinitely if we''re done; note that this has the
;; side effect that nulls = nils on the queue will not
;; be handled; there''s a number of other ways to go about
;; this if this is a problem, see docs on LinkedBlockingQueue
(when-let [item (.poll queue)]
(Thread/sleep (rand-int 500)) ; do stuff
(dosync (alter output conj item)))))) ; and let us know
(defn pro-con []
(reset! go-on? true)
(dorun (map #(%1 %2)
(repeat 5 producer)
(iterate inc 0)))
(dorun (repeatedly 2 consumer))
(overseer))
Soy nuevo en clojure y estoy tratando de entender cómo usar correctamente sus funciones de concurrencia, por lo que cualquier crítica / sugerencia es apreciada. Así que estoy tratando de escribir un pequeño programa de prueba en clojure que funciona de la siguiente manera:
- Hay 5 productores y 2 consumidores.
- un productor espera un tiempo aleatorio y luego inserta un número en una cola compartida.
- un consumidor debe sacar un número de la cola tan pronto como la cola no esté vacía y luego suspender durante un breve periodo de tiempo para simular un trabajo
- Los consumidores deben bloquear cuando la cola está vacía.
- los productores deben bloquear cuando la cola tiene más de 4 elementos para evitar que crezca mucho
Aquí está mi plan para cada paso anterior:
- los productores y consumidores serán agentes que realmente no se preocupan por su estado (solo valores nulos o algo así); Yo solo uso los agentes para enviar una función de "consumidor" o "productor" para hacer en algún momento. Entonces la cola compartida será (defueue (ref [])). Tal vez esto debería ser un átomo sin embargo?
- en la función de agente "productor", simplemente (Hilo / reposo (rand-int 1000)) y luego (dosync (alter colas conj (rand-int 100))) para ingresar a la cola.
- Estoy pensando en hacer que los agentes del consumidor vigilen la cola para cambios con add-watcher. Sin embargo, no estoy seguro de esto ... despertará a los consumidores ante cualquier cambio, incluso si el cambio provino de un consumidor que está quitando algo (posiblemente haciéndolo vacío). Quizás comprobar esto en la función de observador es suficiente. Otro problema que veo es que si todos los consumidores están ocupados, ¿qué sucede cuando un productor agrega algo nuevo a la cola? ¿El evento observado se pone en cola en algún agente del consumidor o desaparece?
- véase más arriba
- Realmente no sé cómo hacer esto. Escuché que la secuela de clojure puede ser útil, pero no pude encontrar suficiente documento sobre cómo usarlo y mis pruebas iniciales no parecieron funcionar (lo siento, ya no tengo el código)