java - procesos - ¿Cómo resolver al productor consumidor usando semáforos?
procesos productores y consumidores java (3)
Parece que estás usando un mutex, no un semáforo?
Al usar un mutex solo tiene sincronización binaria, bloqueando y desbloqueando un recurso. Sempahores tiene un valor que puede señalar o adquirir.
Está intentando bloquear / desbloquear todo el búfer, pero ese es el camino equivocado porque, como está viendo, el productor o el consumidor bloquea, y cuando el lector lo ha bloqueado, el productor no puede llenar el búfer (porque tiene que bloquear primero).
En su lugar, debe crear un Sempahore, luego, cuando el productor escribe un paquete o bloque de datos, puede señalar el semáforo. Los consumidores pueden entonces tratar de adquirir el semáforo, por lo que estarán esperando hasta que el productor haya indicado que se ha escrito un paquete. Al señalar un paquete escrito, uno de los consumidores se despertará y sabrá que puede leer un paquete. Puede leer un paquete, luego volver a tratar de adquirir en el semáforo. Si en ese momento el productor ha escrito otro paquete, lo habrá señalado nuevamente y cualquiera de los consumidores continuará leyendo otro paquete. Etc ...
Por ejemplo:
(Productor) - Escribe un paquete - Semaphore.release (1)
(Consumo xN) - Semaphore.acquire (1) - Leer un paquete
Si tiene múltiples consumidores, entonces los consumidores (no el productor) deben bloquear el búfer al leer el paquete (pero no al adquirir el semáforo) para evitar condiciones de carrera. En el ejemplo siguiente, el productor también bloquea la lista, ya que todo está en la misma JVM.
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
public class Semaphores {
static Object LOCK = new Object();
static LinkedList list = new LinkedList();
static Semaphore sem = new Semaphore(0);
static Semaphore mutex = new Semaphore(1);
static class Consumer extends Thread {
String name;
public Consumer(String name) {
this.name = name;
}
public void run() {
try {
while (true) {
sem.acquire(1);
mutex.acquire();
System.out.println("Consumer /""+name+"/" read: "+list.removeFirst());
mutex.release();
}
} catch (Exception x) {
x.printStackTrace();
}
}
}
static class Producer extends Thread {
public void run() {
try {
int N = 0;
while (true) {
mutex.acquire();
list.add(new Integer(N++));
mutex.release();
sem.release(1);
Thread.sleep(500);
}
} catch (Exception x) {
x.printStackTrace();
}
}
}
public static void main(String [] args) {
new Producer().start();
new Consumer("Alice").start();
new Consumer("Bob").start();
}
}
Necesito codificar un problema similar al productor-consumidor, que debe usar semáforos. Intenté un par de soluciones y ninguna funcionó. Primero probé una solución en Wikipedia y no funcionó. Mi código actual es algo así:
Método de ejecución del consumidor:
public void run() {
int i=0;
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
String s = new String();
while (1!=2){
Date datainicio = new Date();
String inicio=dateFormat.format(datainicio);
try {
Thread.sleep(1000);///10000
} catch (InterruptedException e) {
System.out.println("Excecao InterruptedException lancada.");
}
//this.encheBuffer.down();
this.mutex.down();
// RC
i=0;
while (i<buffer.length) {
if (buffer[i] == null) {
i++;
} else {
break;
}
}
if (i<buffer.length) {
QuantidadeBuffer.quantidade--;
Date datafim = new Date();
String fim=dateFormat.format(datafim);
int identificador;
identificador=buffer[i].getIdentificador()[0];
s="Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i;
//System.out.println("Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
buffer[i]= null;
}
// RC
this.mutex.up();
//this.esvaziaBuffer.up();
System.out.println(s);
// lock.up();
}
}
Ejecución del método del productor:
public void run() {
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
int i=0;
while (1!=2){
Date datainicio = new Date();
String inicio=dateFormat.format(datainicio);
// Produz Item
try {
Thread.sleep(500);//50000
} catch (InterruptedException e) {
System.out.println("Excecao InterruptedException lancada.");
}
//this.esvaziaBuffer.down();
this.mutex.down();
// RC
i=0;
while (i<buffer.length) {
if (buffer[i]!=null) {
i++;
} else {
break;
}
}
if (i<buffer.length) {
int identificador[]=new int[Pedido.getTamanho_identificador()];
identificador[0]=i;
buffer[i]=new Pedido();
Produtor.buffer[i].setIdentificador(identificador);
Produtor.buffer[i].setTexto("pacote de dados");
QuantidadeBuffer.quantidade++;
Date datafim = new Date();
String fim=dateFormat.format(datafim);
System.out.println("Produtor Thread: "+Thread.currentThread()+" Pedido: "+identificador[0]+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
i++;
}
// RC
this.mutex.up();
//this.encheBuffer.up();
}
//this.encheBuffer.up();
}
En el código anterior, sucedió que un hilo de consumidor leyó una posición y luego, otro hilo leyó la misma posición sin que un productor llenara esa posición, algo como esto:
Consumidor Thread: Thread[Thread-17,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1
Consumidor Thread: Thread[Thread-19,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/**
*
* @author sakshi
*/
public class SemaphoreDemo {
static Semaphore producer = new Semaphore(1);
static Semaphore consumer = new Semaphore(0);
static List<Integer> list = new ArrayList<Integer>();
static class Producer extends Thread {
List<Integer> list;
public Producer(List<Integer> list) {
this.list = list;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
producer.acquire();
} catch (InterruptedException ex) {
Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex);
}
System.out.println("produce=" + i);
list.add(i);
consumer.release();
}
}
}
static class Consumer extends Thread {
List<Integer> list;
public Consumer(List<Integer> list) {
this.list = list;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
consumer.acquire();
} catch (InterruptedException ex) {
Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex);
}
System.out.println("consume=" + list.get(i));
producer.release();
}
}
}
public static void main(String[] args) {
Producer produce = new Producer(list);
Consumer consume = new Consumer(list);
produce.start();
consume.start();
}
}
output:
produce=0
consume=0
produce=1
consume=1
produce=2
consume=2
produce=3
consume=3
produce=4
consume=4
produce=5
consume=5
produce=6
consume=6
produce=7
consume=7
produce=8
consume=8
produce=9
consume=9
import java.util.concurrent.Semaphore;
public class ConsumerProducer{
public static void main(String[] args) {
Semaphore semaphoreProducer=new Semaphore(1);
Semaphore semaphoreConsumer=new Semaphore(0);
System.out.println("semaphoreProducer permit=1 | semaphoreConsumer permit=0");
new Producer(semaphoreProducer,semaphoreConsumer).start();
new Consumer(semaphoreConsumer,semaphoreProducer).start();
}
/**
* Producer Class.
*/
static class Producer extends Thread{
Semaphore semaphoreProducer;
Semaphore semaphoreConsumer;
public Producer(Semaphore semaphoreProducer,Semaphore semaphoreConsumer) {
this.semaphoreProducer=semaphoreProducer;
this.semaphoreConsumer=semaphoreConsumer;
}
public void run() {
for(;;){
try {
semaphoreProducer.acquire();
System.out.println("Produced : "+Thread.currentThread().getName());
semaphoreConsumer.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* Consumer Class.
*/
static class Consumer extends Thread{
Semaphore semaphoreConsumer;
Semaphore semaphoreProducer;
public Consumer(Semaphore semaphoreConsumer,Semaphore semaphoreProducer) {
this.semaphoreConsumer=semaphoreConsumer;
this.semaphoreProducer=semaphoreProducer;
}
public void run() {
for(;;){
try {
semaphoreConsumer.acquire();
System.out.println("Consumed : "+Thread.currentThread().getName());
semaphoreProducer.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}