golang - libro de programacion en java netbeans pdf
Equivalente de canal de golang en Java (3)
Tengo un requisito en el que necesito leer de un conjunto de colas de bloqueo. Las colas de bloqueo son creadas por la Biblioteca que estoy usando. Mi código tiene que leer de las colas. No quiero crear un hilo lector para cada una de estas colas de bloqueo. En cambio, quiero sondearlos sobre la disponibilidad de los datos con un solo hilo (o probablemente con 2/3 hilos al máximo). Como es posible que algunas de las colas de bloqueo no tengan datos durante mucho tiempo, mientras que algunas de ellas pueden generar ráfagas de datos. Sondear las colas con un pequeño tiempo de espera funcionará, pero eso no es eficiente en absoluto, ya que todavía necesita seguir dando vueltas en todas las colas, incluso cuando algunas de ellas están sin datos durante mucho tiempo. Básicamente, estoy buscando un tipo de mecanismo select / epoll (usado en sockets) en las colas de bloqueo. Cualquier pista es realmente apreciada.
Sin embargo, hacerlo en Go es muy fácil. A continuación, el código simula lo mismo con canales y goroutines:
package main
import "fmt"
import "time"
import "math/rand"
func sendMessage(sc chan string) {
var i int
for {
i = rand.Intn(10)
for ; i >= 0 ; i-- {
sc <- fmt.Sprintf("Order number %d",rand.Intn(100))
}
i = 1000 + rand.Intn(32000);
time.Sleep(time.Duration(i) * time.Millisecond)
}
}
func sendNum(c chan int) {
var i int
for {
i = rand.Intn(16);
for ; i >= 0; i-- {
time.Sleep(20 * time.Millisecond)
c <- rand.Intn(65534)
}
i = 1000 + rand.Intn(24000);
time.Sleep(time.Duration(i) * time.Millisecond)
}
}
func main() {
msgchan := make(chan string, 32)
numchan := make(chan int, 32)
i := 0
for ; i < 8 ; i++ {
go sendNum(numchan)
go sendMessage(msgchan)
}
for {
select {
case msg := <- msgchan:
fmt.Printf("Worked on %s/n", msg)
case x := <- numchan:
fmt.Printf("I got %d /n", x)
}
}
}
La única forma es reemplazar las colas estándar con objetos de una clase más funcional, que notifica al consumidor (s) cuando se inserta un dato en una cola vacía. Esta clase todavía puede implementar la interfaz BlockingQueue, por lo que el otro lado (productor) no ve ninguna diferencia. El truco es que la operación de put
también debe levantar una bandera y notificar al consumidor. El consumidor, después de sondear todos los hilos, borra el indicador y llama a Object.wait()
.
Otra opción está aquí para Java6 +
Una clase de implementación BlockingDeque:
import java.lang.ref.WeakReference;
import java.util.WeakHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
class GoChannelPool {
private final static GoChannelPool defaultInstance = newPool();
private final AtomicLong serialNumber = new AtomicLong();
private final WeakHashMap<Long, WeakReference<GoChannel>> channelWeakHashMap = new WeakHashMap<>();
private final LinkedBlockingDeque<GoChannelObject> totalQueue = new LinkedBlockingDeque<>();
public <T> GoChannel<T> newChannel() {
GoChannel<T> channel = new GoChannel<>();
channelWeakHashMap.put(channel.getId(), new WeakReference<GoChannel>(channel));
return channel;
}
public void select(GoSelectConsumer consumer) throws InterruptedException {
consumer.accept(getTotalQueue().take());
}
public int size() {
return getTotalQueue().size();
}
public int getChannelCount() {
return channelWeakHashMap.values().size();
}
private LinkedBlockingDeque<GoChannelObject> getTotalQueue() {
return totalQueue;
}
public static GoChannelPool getDefaultInstance() {
return defaultInstance;
}
public static GoChannelPool newPool() {
return new GoChannelPool();
}
private GoChannelPool() {}
private long getSerialNumber() {
return serialNumber.getAndIncrement();
}
private synchronized void syncTakeAndDispatchObject() throws InterruptedException {
select(new GoSelectConsumer() {
@Override
void accept(GoChannelObject t) {
WeakReference<GoChannel> goChannelWeakReference = channelWeakHashMap.get(t.channel_id);
GoChannel channel = goChannelWeakReference != null ? goChannelWeakReference.get() : null;
if (channel != null) {
channel.offerBuffer(t);
}
}
});
}
class GoChannel<E> {
// Instance
private final long id;
private final LinkedBlockingDeque<GoChannelObject<E>> buffer = new LinkedBlockingDeque<>();
public GoChannel() {
this(getSerialNumber());
}
private GoChannel(long id) {
this.id = id;
}
public long getId() {
return id;
}
public E take() throws InterruptedException {
GoChannelObject object;
while((object = pollBuffer()) == null) {
syncTakeAndDispatchObject();
}
return (E) object.data;
}
public void offer(E object) {
GoChannelObject<E> e = new GoChannelObject();
e.channel_id = getId();
e.data = object;
getTotalQueue().offer(e);
}
protected void offerBuffer(GoChannelObject<E> data) {
buffer.offer(data);
}
protected GoChannelObject<E> pollBuffer() {
return buffer.poll();
}
public int size() {
return buffer.size();
}
@Override
protected void finalize() throws Throwable {
super.finalize();
channelWeakHashMap.remove(getId());
}
}
class GoChannelObject<E> {
long channel_id;
E data;
boolean belongsTo(GoChannel channel) {
return channel != null && channel_id == channel.id;
}
}
abstract static class GoSelectConsumer{
abstract void accept(GoChannelObject t);
}
}
entonces podemos usarlo de esta manera:
GoChannelPool pool = GoChannelPool.getDefaultInstance();
final GoChannelPool.GoChannel<Integer> numberCh = pool.newChannel();
final GoChannelPool.GoChannel<String> stringCh = pool.newChannel();
final GoChannelPool.GoChannel<String> otherCh = pool.newChannel();
ExecutorService executorService = Executors.newCachedThreadPool();
int times;
times = 2000;
final CountDownLatch countDownLatch = new CountDownLatch(times * 2);
final AtomicInteger numTimes = new AtomicInteger();
final AtomicInteger strTimes = new AtomicInteger();
final AtomicInteger defaultTimes = new AtomicInteger();
final int finalTimes = times;
executorService.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < finalTimes; i++) {
numberCh.offer(i);
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
executorService.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < finalTimes; i++) {
stringCh.offer("s"+i+"e");
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
int otherTimes = 3;
for (int i = 0; i < otherTimes; i++) {
otherCh.offer("a"+i);
}
for (int i = 0; i < times*2 + otherTimes; i++) {
pool.select(new GoChannelPool.GoSelectConsumer() {
@Override
void accept(GoChannelPool.GoChannelObject t) {
// The data order should be randomized.
System.out.println(t.data);
countDownLatch.countDown();
if (t.belongsTo(stringCh)) {
strTimes.incrementAndGet();
return;
}
else if (t.belongsTo(numberCh)) {
numTimes.incrementAndGet();
return;
}
defaultTimes.incrementAndGet();
}
});
}
countDownLatch.await(10, TimeUnit.SECONDS);
/**
The console output of data should be randomized.
numTimes.get() should be 2000
strTimes.get() should be 2000
defaultTimes.get() should be 3
*/
y tenga en cuenta que la selección funciona solo si los canales pertenecen al mismo GoChannelPool, o simplemente usa el GoChannelPool predeterminado (sin embargo, el rendimiento sería menor si demasiados canales comparten el mismo GoChannelPool)
Te sugiero que estudies el uso de la biblioteca JCSP . El equivalente de la select
de Go se llama Alternativa . Solo necesitaría un hilo consumidor, que no necesitaría sondear los canales entrantes si los conecta con Alternative
. Por lo tanto, esta sería una forma eficiente de multiplexar los datos fuente.
Ayudará mucho si puede reemplazar las BlockingQueues con canales JCSP. Los canales se comportan básicamente de la misma manera, pero proporcionan un mayor grado de flexibilidad con respecto a la distribución en abanico o al fan-in de compartir los extremos del canal, y en particular, el uso de canales con Alternative
.
Para un ejemplo de uso, aquí hay un multiplexor justo. Este ejemplo demuestra un proceso que multiplexa bastante el tráfico de su conjunto de canales de entrada a su único canal de salida. No se morirá de hambre a ningún canal de entrada, independientemente del entusiasmo de sus competidores.
import org.jcsp.lang.*;
public class FairPlex implements CSProcess {
private final AltingChannelInput[] in;
private final ChannelOutput out;
public FairPlex (final AltingChannelInput[] in, final ChannelOutput out) {
this.in = in;
this.out = out;
}
public void run () {
final Alternative alt = new Alternative (in);
while (true) {
final int index = alt.fairSelect ();
out.write (in[index].read ());
}
}
}
Tenga en cuenta que si se utilizara priSelect
anteriormente, los canales de mayor índice se verían privados de priSelect
si los canales de menor índice estuvieran exigiendo continuamente el servicio. O en lugar de fairSelect
, select
podría usarse, pero no es posible realizar análisis de inanición. El mecanismo de select
solo debe usarse cuando la inanición no es un problema.
Libertad del punto muerto
Al igual que con Go, un programa Java que use canales debe estar diseñado para no estancarse. La implementación de primitivas de concurrencia de bajo nivel en Java es muy difícil de corregir y necesita algo confiable. Afortunadamente, Alternative
ha sido validado por análisis formal, junto con los canales JCSP. Esto lo convierte en una opción sólida y confiable.
Para aclarar un ligero punto de confusión, la versión actual de JCSP es 1.1-rc5 en los repositorios Maven, no lo que dice el sitio web.