java - thread - Procesamiento secuencial y paralelo.
synchronized java (10)
Los tokens con el mismo valor deben procesarse secuencialmente.
La forma de asegurar que ocurran dos cosas en secuencia es hacerlas en el mismo hilo.
Tendría una colección de muchos hilos de trabajadores, y tendría un Mapa. Cada vez que recibo un token que no he visto antes, escogeré un hilo al azar e ingresaré el token y el hilo en el mapa. A partir de entonces, usaré ese mismo hilo para ejecutar tareas asociadas con ese token.
Crear nuevos Runnables sería muy costoso
Runnable
es una interfaz. Crear nuevos objetos que implementen Runnable
no será significativamente más costoso que crear cualquier otro tipo de objeto.
Tengo un productor y muchos consumidores.
- El productor es rápido y genera muchos resultados.
- Los tokens con el mismo valor deben procesarse secuencialmente.
- Los tokens con diferentes valores deben procesarse en paralelo.
- crear nuevos Runnables sería muy costoso y también el código de producción podría funcionar con 100k de Tokens (para crear un Runnable tengo que pasarle al constructor algo complejo para construir objetos)
¿Puedo lograr los mismos resultados con un algoritmo más simple? Anidar un bloque de sincronización con un bloqueo de reentrada parece un poco antinatural. ¿Hay alguna condición de raza que puedas notar?
Actualización: una segunda solución que encontré estaba trabajando con 3 colecciones. Uno para cachear los resultados del productor, segundo una cola de bloqueo y tercero usando una lista para rastrear en las tareas en progreso. De nuevo un poco complicado.
Mi version de codigo
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
public class Main1 {
static class Token {
private int order;
private String value;
Token() {
}
Token(int o, String v) {
order = o;
value = v;
}
int getOrder() {
return order;
}
String getValue() {
return value;
}
}
private final static BlockingQueue<Token> queue = new ArrayBlockingQueue<Token>(10);
private final static ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
private final static ReentrantLock reentrantLock = new ReentrantLock();
private final static Token STOP_TOKEN = new Token();
private final static List<String> lockList = Collections.synchronizedList(new ArrayList<String>());
public static void main(String[] args) {
ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
producerExecutor.submit(new Runnable() {
public void run() {
Random random = new Random();
try {
for (int i = 1; i <= 100; i++) {
Token token = new Token(i, String.valueOf(random.nextInt(1)));
queue.put(token);
}
queue.put(STOP_TOKEN);
}catch(InterruptedException e){
e.printStackTrace();
}
}
});
ExecutorService consumerExecutor = Executors.newFixedThreadPool(10);
for(int i=1; i<=10;i++) {
// creating to many runnable would be inefficient because of this complex not thread safe object
final Object dependecy = new Object(); //new ComplexDependecy()
consumerExecutor.submit(new Runnable() {
public void run() {
while(true) {
try {
//not in order
Token token = queue.take();
if (token == STOP_TOKEN) {
queue.add(STOP_TOKEN);
return;
}
System.out.println("Task start" + Thread.currentThread().getId() + " order " + token.getOrder());
Random random = new Random();
Thread.sleep(random.nextInt(200)); //doLongRunningTask(dependecy)
lockList.remove(token.getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}});
}
}}
¿Es todo lo que necesita para asegurarse de que los tokens con el mismo valor no se procesen simultáneamente? Su código es demasiado complicado para entender lo que quiere decir (no se compila y tiene muchas variables, bloqueos y mapas no utilizados, que se crean pero nunca se usan). Parece que estás pensando demasiado en esto. Todo lo que necesita es una cola y un mapa. Algo así me imagino:
class Consumer implements Runnable {
ConcurrentHashMap<String, Token> inProcess;
BlockingQueue<Token> queue;
public void run() {
Token token = null;
while ((token = queue.take()) != null) {
if(inProcess.putIfAbsent(token.getValue(), token) != null) {
queue.put(token);
continue;
}
processToken(token);
inProcess.remove(token.getValue());
}
}
}
Creo que hay un problema de diseño más fundamental oculto detrás de esta tarea, pero está bien. No puedo deducir de usted la descripción del problema si desea una ejecución en orden o si solo desea que las operaciones en las tareas descritas por tokens individuales sean atómicas / transaccionales. Lo que propongo a continuación es más una "solución rápida" a este problema que una solución real.
Para el caso real de "ejecución ordenada", propongo una solución basada en proxies de cola que ordenan la salida:
Defina una implementación de la cola que proporcione un método de fábrica que genere las colas de proxy que están representadas en el lado del productor por este único objeto de cola; El método de fábrica también debe registrar estos objetos de cola proxy. agregar un elemento a la cola de entrada debe agregarlo directamente a una de las colas de salida si coincide con uno de los elementos en una de las colas de salida. De lo contrario, agréguelo a cualquier cola de salida (la más corta). (implementar el cheque para esto de manera eficiente). Alternativamente (ligeramente mejor): no haga esto cuando se agregue el elemento, sino cuando alguna de las colas de salida se vacíe.
Proporcione a cada uno de sus consumidores ejecutables un campo que almacene una interfaz de cola individual (en lugar de acceder a un solo objeto). Inicialice este campo con el método de fábrica definido anteriormente.
Para el caso de la transacción, creo que es más fácil abarcar más hilos que los núcleos (use las estadísticas para calcular esto) e implementar el mecanismo de bloqueo en un nivel inferior (objeto).
La siguiente solución solo utilizará un Mapa único que utilizan el productor y los consumidores para procesar los pedidos en orden secuencial para cada número de pedido mientras procesan diferentes números de pedido en paralelo. Aquí está el código:
public class Main {
private static final int NUMBER_OF_CONSUMER_THREADS = 10;
private static volatile int sync = 0;
public static void main(String[] args) {
final ConcurrentHashMap<String,Controller> queues = new ConcurrentHashMap<String, Controller>();
final CountDownLatch latch = new CountDownLatch(NUMBER_OF_CONSUMER_THREADS);
final AtomicBoolean done = new AtomicBoolean(false);
// Create a Producer
new Thread() {
{
this.setDaemon(true);
this.setName("Producer");
this.start();
}
public void run() {
Random rand = new Random();
for(int i =0 ; i < 1000 ; i++) {
int order = rand.nextInt(20);
String key = String.valueOf(order);
String value = String.valueOf(rand.nextInt());
Controller controller = queues.get(key);
if (controller == null) {
controller = new Controller();
queues.put(key, controller);
}
controller.add(new Token(order, value));
Main.sync++;
}
done.set(true);
}
};
while (queues.size() < 10) {
try {
// Allow the producer to generate several entries that need to
// be processed.
Thread.sleep(5000);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
// System.out.println(queues);
// Create the Consumers
ExecutorService consumers = Executors.newFixedThreadPool(NUMBER_OF_CONSUMER_THREADS);
for(int i = 0 ; i < NUMBER_OF_CONSUMER_THREADS ; i++) {
consumers.submit(new Runnable() {
private Random rand = new Random();
public void run() {
String name = Thread.currentThread().getName();
try {
boolean one_last_time = false;
while (true) {
for (Map.Entry<String, Controller> entry : queues.entrySet()) {
Controller controller = entry.getValue();
if (controller.lock(this)) {
ConcurrentLinkedQueue<Token> list = controller.getList();
Token token;
while ((token = list.poll()) != null) {
try {
System.out.println(name + " processing order: " + token.getOrder()
+ " value: " + token.getValue());
Thread.sleep(rand.nextInt(200));
} catch (InterruptedException e) {
}
}
int last = Main.sync;
queues.remove(entry.getKey());
while(done.get() == false && last == Main.sync) {
// yield until the producer has added at least another entry
Thread.yield();
}
// Purge any new entries added
while ((token = list.poll()) != null) {
try {
System.out.println(name + " processing order: " + token.getOrder()
+ " value: " + token.getValue());
Thread.sleep(200);
} catch (InterruptedException e) {
}
}
controller.unlock(this);
}
}
if (one_last_time) {
return;
}
if (done.get()) {
one_last_time = true;
}
}
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
consumers.shutdown();
System.out.println("Exiting.. remaining number of entries: " + queues.size());
}
}
Tenga en cuenta que la clase Main contiene una instancia de colas que es un mapa. La clave del mapa es la identificación del pedido que desea que los consumidores procesen secuencialmente. El valor es una clase de Controlador que contendrá todos los pedidos asociados con ese ID de pedido.
El productor generará los pedidos y agregará el pedido (Token) a su Controlador asociado. Los consumidores repetirán los valores del mapa de colas y llamarán al método de bloqueo del controlador para determinar si puede procesar pedidos para esa identificación de pedido en particular. Si el bloqueo devuelve falso, comprobará la próxima instancia del Controlador. Si el bloqueo es verdadero, procesará todos los pedidos y luego verificará el siguiente Controlador.
actualizado Se agregó el número entero de sincronización que se usa para garantizar que cuando se elimina una instancia del Controlador del mapa de colas. Todas sus entradas serán consumidas. Hubo un error lógico en el código del consumidor donde se llamó al método de desbloqueo pronto.
La clase Token es similar a la que has publicado aquí.
class Token {
private int order;
private String value;
Token(int order, String value) {
this.order = order;
this.value = value;
}
int getOrder() {
return order;
}
String getValue() {
return value;
}
@Override
public String toString() {
return "Token [order=" + order + ", value=" + value + "]/n";
}
}
La clase de Controlador que sigue se usa para asegurar que solo un solo hilo dentro del grupo de hilos procesará las órdenes. Los métodos de bloqueo / desbloqueo se utilizan para determinar cuáles de los subprocesos podrán procesar los pedidos.
class Controller {
private ConcurrentLinkedQueue<Token> tokens = new ConcurrentLinkedQueue<Token>();
private ReentrantLock lock = new ReentrantLock();
private Runnable current = null;
void add(Token token) {
tokens.add(token);
}
public ConcurrentLinkedQueue<Token> getList() {
return tokens;
}
public void unlock(Runnable runnable) {
lock.lock();
try {
if (current == runnable) {
current = null;
}
} finally {
lock.unlock();
}
}
public boolean lock(Runnable runnable) {
lock.lock();
try {
if (current == null) {
current = runnable;
}
} finally {
lock.unlock();
}
return current == runnable;
}
@Override
public String toString() {
return "Controller [tokens=" + tokens + "]";
}
}
Información adicional sobre la implementación. Utiliza un CountDownLatch para asegurar que todas las órdenes producidas se procesarán antes de salir del proceso. La variable realizada es igual que su variable STOP_TOKEN.
La implementación contiene un problema que debe resolver. Existe el problema de que no purga el controlador para un ID de pedido cuando se han procesado todos los pedidos. Esto provocará instancias en las que un subproceso en el grupo de subprocesos se asigna a un controlador que no contiene órdenes. Lo que desperdiciará ciclos de cpu que podrían usarse para realizar otras tareas.
No estoy del todo seguro de haber entendido la pregunta, pero intentaré un algoritmo.
Los actores son:
- Una
queue
de tareas - Un
pool
deexecutors
libres. - Un
set
de tokensin-process
actualmente en proceso - Un
controller
Entonces,
Inicialmente todos los
executors
están disponibles y elset
está vacío.controller
elige unexecutor
disponible y recorre laqueue
busca de unatask
con un token que no está en el conjuntoin-process set
y cuando lo encuentra- agrega el token al conjunto
in-process
- asigna el
executor
para procesar latask
y - vuelve al principio de la cola
- agrega el token al conjunto
el
executor
elimina el token delset
cuando termina de procesarse y se agrega a la agrupación
Por la naturaleza de su código, la única manera de garantizar que los tokens con el mismo valor se procesen en serie es esperar a que llegue STOP_TOKEN.
Necesitará una configuración de un solo productor y un solo consumidor, con el consumidor recolectando y clasificando los tokens por su valor (en el Multimap, digamos).
Solo entonces sabrá qué tokens se pueden procesar en serie y cuáles se pueden procesar en paralelo.
De todos modos, te aconsejo que mires el interruptor LMAX , que es una forma muy efectiva de compartir datos entre hilos.
No sufre de sobrecarga de sincronización como Ejecutores, ya que está libre de bloqueos (lo que puede brindarle un buen beneficio de rendimiento, dependiendo de la naturaleza de su procesamiento de datos).
La solución utilizando dos disruptores.
// single thread for processing as there will be only on consumer
Disruptor<InEvent> inboundDisruptor = new Disruptor<>(InEvent::new, 32, Executors.newSingleThreadExecutor());
// outbound disruptor that uses 3 threads for event processing
Disruptor<OutEvent> outboundDisruptor = new Disruptor<>(OutEvent::new, 32, Executors.newFixedThreadPool(3));
inboundDisruptor.handleEventsWith(new InEventHandler(outboundDisruptor));
// setup 3 event handlers, doing round robin consuming, effectively processing OutEvents in 3 threads
outboundDisruptor.handleEventsWith(new OutEventHandler(0, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(1, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(2, 3, new Object()));
inboundDisruptor.start();
outboundDisruptor.start();
// publisher code
for (int i = 0; i < 10; i++) {
inboundDisruptor.publishEvent(InEventTranslator.INSTANCE, new Token());
}
El controlador de eventos en el disruptor de entrada simplemente recopila tokens entrantes. Cuando se recibe el token de STOP, publica la serie de tokens al disruptor de salida para su posterior procesamiento:
public class InEventHandler implements EventHandler<InEvent> {
private ListMultimap<String, Token> tokensByValue = ArrayListMultimap.create();
private Disruptor<OutEvent> outboundDisruptor;
public InEventHandler(Disruptor<OutEvent> outboundDisruptor) {
this.outboundDisruptor = outboundDisruptor;
}
@Override
public void onEvent(InEvent event, long sequence, boolean endOfBatch) throws Exception {
if (event.token == STOP_TOKEN) {
// publish indexed tokens to outbound disruptor for parallel processing
tokensByValue.asMap().entrySet().stream().forEach(entry -> outboundDisruptor.publishEvent(OutEventTranslator.INSTANCE, entry.getValue()));
} else {
tokensByValue.put(event.token.value, event.token);
}
}
}
El controlador de eventos de salida procesa tokens del mismo valor de forma secuencial:
public class OutEventHandler implements EventHandler<OutEvent> {
private final long order;
private final long allHandlersCount;
private Object yourComplexDependency;
public OutEventHandler(long order, long allHandlersCount, Object yourComplexDependency) {
this.order = order;
this.allHandlersCount = allHandlersCount;
this.yourComplexDependency = yourComplexDependency;
}
@Override
public void onEvent(OutEvent event, long sequence, boolean endOfBatch) throws Exception {
if (sequence % allHandlersCount != order ) {
// round robin, do not consume every event to allow parallel processing
return;
}
for (Token token : event.tokensToProcessSerially) {
// do procesing of the token using your complex class
}
}
}
El resto de la infraestructura requerida (propósito descrito en los documentos del disruptor):
public class InEventTranslator implements EventTranslatorOneArg<InEvent, Token> {
public static final InEventTranslator INSTANCE = new InEventTranslator();
@Override
public void translateTo(InEvent event, long sequence, Token arg0) {
event.token = arg0;
}
}
public class OutEventTranslator implements EventTranslatorOneArg<OutEvent, Collection<Token>> {
public static final OutEventTranslator INSTANCE = new OutEventTranslator();
@Override
public void translateTo(OutEvent event, long sequence, Collection<Token> tokens) {
event.tokensToProcessSerially = tokens;
}
}
public class InEvent {
// Note that no synchronization is used here,
// even though the field is used among multiple threads.
// Memory barrier used by Disruptor guarantee changes are visible.
public Token token;
}
public class OutEvent {
// ... again, no locks.
public Collection<Token> tokensToProcessSerially;
}
public class Token {
String value;
}
Puede crear previamente un conjunto de Runnables
que recogerá las tareas entrantes (tokens) y las colocará en colas de acuerdo con su valor de orden.
Como se señaló en los comentarios, no está garantizado que los tokens con valores diferentes siempre se ejecuten en paralelo (en general, está limitado, al menos, por el número de núcleos físicos en su caja). Sin embargo, se garantiza que las fichas con la misma orden se ejecutarán en el orden de llegada.
Código de muestra:
/**
* Executor which ensures incoming tasks are executed in queues according to provided key (see {@link Task#getOrder()}).
*/
public class TasksOrderingExecutor {
public interface Task extends Runnable {
/**
* @return ordering value which will be used to sequence tasks with the same value.<br>
* Tasks with different ordering values <i>may</i> be executed in parallel, but not guaranteed to.
*/
String getOrder();
}
private static class Worker implements Runnable {
private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
private volatile boolean stopped;
void schedule(Task task) {
tasks.add(task);
}
void stop() {
stopped = true;
}
@Override
public void run() {
while (!stopped) {
try {
Task task = tasks.take();
task.run();
} catch (InterruptedException ie) {
// perhaps, handle somehow
}
}
}
}
private final Worker[] workers;
private final ExecutorService executorService;
/**
* @param queuesNr nr of concurrent task queues
*/
public TasksOrderingExecutor(int queuesNr) {
Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
workers = new Worker[queuesNr];
for (int i = 0; i < queuesNr; i++) {
Worker worker = new Worker();
executorService.submit(worker);
workers[i] = worker;
}
}
public void submit(Task task) {
Worker worker = getWorker(task);
worker.schedule(task);
}
public void stop() {
for (Worker w : workers) w.stop();
executorService.shutdown();
}
private Worker getWorker(Task task) {
return workers[task.getOrder().hashCode() % workers.length];
}
}
Si tiene muchos tokens diferentes, entonces la solución más simple es crear un número de ejecutores de un solo hilo (aproximadamente 2 veces su número de núcleos), y luego distribuir cada tarea a un ejecutor determinado por el hash de su token.
De esa forma, todas las tareas con el mismo token irán al mismo ejecutor y se ejecutarán de forma secuencial, ya que cada ejecutor solo tiene un subproceso.
Si tiene algunos requisitos no declarados sobre la imparcialidad de la programación, entonces es bastante fácil evitar cualquier desequilibrio significativo haciendo que el subproceso del productor ponga en cola sus solicitudes (o bloque) antes de distribuirlas, hasta que haya, por ejemplo, menos de 10 solicitudes por ejecutor pendientes. .
Tal vez estoy malinterpretando algo. Pero parece que sería más fácil filtrar los Tokens con el mismo valor de los que tienen valores diferentes en dos colas diferentes inicialmente.
Y luego use Stream con mapa o foreach para la secuencia. Y simplemente use la versión de flujo paralelo para el resto.
Si sus tokens en el entorno de producción se generan de forma perezosa y solo obtiene uno a la vez, simplemente hace algún tipo de filtro que los distribuye a las dos colas diferentes.
Si puede implementarlo con Streams, lo mejor es hacerlo, ya que son simples, fáciles de usar y RÁPIDOS.
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
Hice un breve ejemplo de lo que quiero decir. En este caso, los números de Tokens están construidos artificialmente, pero eso no viene al caso. También las secuencias se inician en el subproceso principal, lo que probablemente tampoco sería ideal.
public static void main(String args[]) {
ArrayList<Token> sameValues = new ArrayList<Token>();
ArrayList<Token> distinctValues = new ArrayList<Token>();
Random random = new Random();
for (int i = 0; i < 100; i++) {
int next = random.nextInt(100);
Token n = new Token(i, String.valueOf(next));
if (next == i) {
sameValues.add(n);
} else {
distinctValues.add(n);
}
}
distinctValues.stream().parallel().forEach(token -> System.out.println("Distinct: " + token.value));
sameValues.stream().forEach(token -> System.out.println("Same: " + token.value));
}
Una forma de hacer esto es tener un ejecutor para el procesamiento de secuencias y otro para el procesamiento paralelo. También necesitamos un servicio de administrador de un solo hilo que decidirá qué token de servicio debe enviarse para su procesamiento. // Cola para ser compartida por ambos hilos. Contiene los tokens producidos por el productor.
BlockingQueue tokenList = new ArrayBlockingQueue (10);
private void startProcess() {
ExecutorService producer = Executors.newSingleThreadExecutor();
final ExecutorService consumerForSequence = Executors
.newSingleThreadExecutor();
final ExecutorService consumerForParallel = Executors.newFixedThreadPool(10);
ExecutorService manager = Executors.newSingleThreadExecutor();
producer.submit(new Producer(tokenList));
manager.submit(new Runnable() {
public void run() {
try {
while (true) {
Token t = tokenList.take();
System.out.println("consumed- " + t.orderid
+ " element");
if (t.orderid % 7 == 0) { // any condition to check for sequence processing
consumerForSequence.submit(new ConsumerForSequenceProcess(t));
} else {
ConsumerForParallel.submit(new ConsumerForParallelProcess(t));
}
}
}
catch (InterruptedException e) { // TODO Auto-generated catch
// block
e.printStackTrace();
}
}
});
}