hilos - Patrón de concurrencia de Java para recursos compartidos externos(tarjetas inteligentes)
threadpoolexecutor java 8 (5)
¿Has considerado usar Apache Commons Pool ?
Debe mantener un grupo de objetos de SmartcardWrapper donde cada SmartcardWrapper representará una tarjeta inteligente física. Cada vez que necesite realizar un nuevo cálculo, tome prestado el objeto de la agrupación, realice el cálculo y devuelva el objeto en la agrupación para que el siguiente hilo pueda reutilizarlo.
La agrupación en sí es segura para subprocesos y se bloquea cuando no hay objetos disponibles. Todo lo que necesita hacer es implementar una api para agregar / eliminar objetos de SmartcardWrapper al grupo.
Tengo un servicio de servidor web donde los clientes solicitan un cálculo de tarjeta inteligente y obtienen sus resultados. El número de tarjeta inteligente disponible puede disminuir o aumentar durante el tiempo de actividad del servidor, por ejemplo, puedo agregar o eliminar físicamente la tarjeta inteligente del lector (o muchos otros eventos ... como excepción, etc.).
Un cálculo de tarjeta inteligente puede tardar un tiempo, por lo que tengo que optimizar estos trabajos para usar todas las tarjetas inteligentes disponibles si hay solicitudes simultáneas al servidor web.
Pensé trabajar con un grupo de hilos de tarjetas inteligentes. Lo inusual, al menos para mí, es que la agrupación debe cambiar su tamaño, no dependiendo de las solicitudes del cliente, sino solo de la disponibilidad de la tarjeta inteligente.
Estudié muchos ejemplos de:
- BlockingQueue : Se ve bien almacenar la solicitud y detener el hilo esperando algo que hacer.
- Tarea futura : Puedo usar esta clase para dejar que el cliente espere su respuesta, pero ¿qué tipo de ejecutor debe hacer la tarea?
- ThreadPoolExecutor : parece lo que necesito, pero con esto no puedo cambiar el tamaño del grupo, además, cada hilo debe estar vinculado a una única ranura para tarjeta inteligente. Esto puede ser una solución si pudiera cambiar el tamaño del grupo (agregar un hilo cuando se inserta una tarjeta inteligente y quitar un hilo cuando se retira una tarjeta inteligente) y si puedo asignar una tarjeta inteligente específica a cada hilo.
Este es el control de la tarjeta inteligente, tengo una tarjeta inteligente para cada tarjeta inteligente, cada tarjeta inteligente tiene su propio número de ranura.
public class SmartcardWrapper{
private int slot;
public SmartcardWrapper(int slot) {
this.slot=slot;
}
public byte[] compute(byte[] input) {
byte[] out=new byte[];
SmartcardApi.computerInput(slot,input,out); //Native method
return out;
}
}
Intenté crear un grupo de hilos con un hilo por tarjeta inteligente:
private class SmartcardThread extends Thread{
protected SmartcardWrapper sw;
public SmartcardThread(SmartcardWrapper sw){
this.sw=sw;
}
@Override
public void run() {
while(true){
byte[] input=queue.take();
byte output=sw.compute(input);
// I have to return back the output to the client
}
}
}
Todos esperando algo en la misma cola de entrada:
BlockingQueue<byte[]> queue=new BlockingQueue<byte[]>();
Pero, ¿cómo devolver la salida de un subproceso de tarjeta inteligente al servidor web-cliente? Esto me deja pensar que BlockingQueue no es mi solución.
¿Cómo abordar este problema? ¿Qué patrón de concurrencia debo seguir? ¿es correcto asignar un hilo por tarjeta inteligente o debo simplemente usar semáforos?
Al observar los requisitos, la mejor arquitectura sería desacoplar el cálculo de la tarjeta inteligente de sus servicios web.
Confiar en los servicios web para esperar en las tareas intensivas del procesador resultará en tiempos de espera.
La mejor solución es pre-computar la tarjeta inteligente usando un trabajo periódico y almacenando esas ranuras, pares de cómputo en un servidor de caché como Redis.
El trabajo del sincronizador de tarjetas inteligentes es una aplicación independiente independiente J2SE que verifica periódicamente qué tarjeta inteligente está disponible y activa (sin errores) y actualiza el caché de Redis con ranura y cálculo como un par clave / valor. Si hay una tarjeta inteligente no disponible, se eliminará de la memoria caché.
El servicio web solo revisará la memoria caché Redis para una clave de ranura en particular y si encuentra un valor lo devolverá o, de lo contrario, devolverá un no encontrado para esa ranura (no disponible o error)
Este diseño es escalable tanto en el extremo de la tarjeta inteligente como en el final de las solicitudes del cliente.
En respuesta a su pregunta sobre cómo devolver el resultado a la persona que llama:
Todos esperando algo en la misma cola de entrada:
BlockingQueue queue = new BlockingQueue ();
Pero, ¿cómo devolver la salida de un subproceso de tarjeta inteligente al servidor web-cliente? Esto me deja pensar que BlockingQueue no es mi solución.
La idea de la cola de envío está muy bien, pero también necesita una cola por hilo para devolver el resultado al remitente del trabajo ...
Cambia tu cola de envío a:
BlockingQueue<JobSubmitRec> queue=new BlockingQueue<JobSubmitRec>();
y JobSubmitRec tendrá el byte [] y una cola de un solo uso para devolver el resultado:
class JobSubmitRec
{
byte[] data;
BlockingQueue<JobSubmitResult> result=new LinkedBlockingQueue<JobSubmitResult>();
}
y su hilo de ejecución de trabajador se verá algo como:
public void run() {
while(true){
JobSubmitRec submitrec = queue.take();
byte[] input = submitrec.data;
byte output = sw.compute(input);
submitrec.result.put( new JobSubmitResult(output) );
}
}
y el cliente que envíe el trabajo se verá así:
JobSubmitRec jsr = new JobSubmitRec( data );
queue.put( jsr );
JobSubmitResult result = jsr.result.take();
// use result here
Podría haber encontrado una solución simple razonable basada en los siguientes supuestos:
- un proceso separado administra las notificaciones (eventos del sistema) para las tarjetas inteligentes que están disponibles o se eliminan.
- a un cliente no le importa qué tarjeta inteligente use, siempre que pueda usar una sin interferencia.
Estas dos suposiciones en realidad facilitan la creación de una solución de agrupación (recursos compartidos), ya que generalmente es la propia agrupación la responsable de crear y eliminar recursos cuando sea apropiado. Sin esta funcionalidad, una solución de agrupación se vuelve más simple. Supongo que el cliente que obtiene una tarjeta inteligente del grupo para usar, puede ejecutar las funciones de tarjeta inteligente requeridas dentro de su propio hilo de ejecución (similar a cómo se usa una conexión de base de datos desde un grupo de conexiones de base de datos para consultar datos de una base de datos).
Solo he realizado algunas pruebas mínimas para las dos clases que se muestran a continuación, y me temo que la mayor parte del trabajo consiste en pruebas escritas (de unidad) que prueban que la agrupación funciona correctamente con solicitudes de clientes concurrentes combinadas con la adición y eliminación de recursos de tarjetas inteligentes. Si no desea hacer eso, entonces la respuesta del usuario769771 es probablemente una mejor solución. Pero si lo haces, pruébalo, mira si encaja. La idea es que solo todos los clientes creen y utilicen una instancia de pool de recursos y la actualicen mediante un proceso independiente que gestiona la disponibilidad de tarjetas inteligentes.
import java.util.*;
import java.util.concurrent.*;
/**
* A resource pool that expects shared resources
* to be added and removed from the pool by an external process
* (i.e. not done by the pool itself, see {@link #add(Object)} and {@link #remove(Object)}.
* <br>A {@link ResourcePoolValidator} can optionally be used.
* @param <T> resource type handed out by the pool.
*/
public class ResourcePool<T> {
private final Set<T> registered = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());
/* Use a linked list as FIFO queue for resources to lease. */
private final List<T> available = Collections.synchronizedList(new LinkedList<T>());
private final Semaphore availableLock = new Semaphore(0, true);
private final ResourcePoolValidator<T> validator;
public ResourcePool() {
this(null);
}
public ResourcePool(ResourcePoolValidator<T> validator) {
super();
this.validator = validator;
}
/**
* Add a resource to the pool.
* @return true if resource is not already in the pool.
*/
public synchronized boolean add(T resource) {
boolean added = false;
if (!registered.contains(resource)) {
registered.add(resource);
available.add(resource);
availableLock.release();
added = true;
}
return added;
}
/**
* Removes a resource from the pool.
* The resource might be in use (see {@link #isLeased(Object)})
* in which case {@link ResourcePoolValidator#abandoned(Object)} will be called
* when the resource is no longer used (i.e. released).
* @return true if resource was part of the pool and removed from the pool.
*/
public synchronized boolean remove(T resource) {
// method is synchronized to prevent multiple threads calling add and remove at the same time
// which could in turn bring the pool in an invalid state.
return registered.remove(resource);
}
/**
* If the given resource is (or was, see also {@link #remove(Object)} part of the pool,
* a returned value true indicates the resource is in use / checked out.
* <br>This is a relative expensive method, do not call it frequently.
*/
public boolean isLeased(T resource) {
return !available.contains(resource);
}
/**
* Try to get a shared resource for usage.
* If a resource is acquired, it must be {@link #release(Object)}d in a finally-block.
* @return A resource that can be exclusively used by the caller.
* @throws InterruptedException When acquiring a resource is interrupted.
* @throws TimeoutException When a resource is not available within the given timeout period.
*/
public T tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException, TimeoutException {
T resource = null;
long timeRemaining = tunit.toMillis(timeout);
final long tend = System.currentTimeMillis() + timeRemaining;
do {
if (availableLock.tryAcquire(timeRemaining, TimeUnit.MILLISECONDS)) {
resource = available.remove(0);
if (registered.contains(resource)) {
boolean valid = false;
try {
valid = (validator == null ? true : validator.isValid(resource));
} catch (Exception e) {
// TODO: log exception
e.printStackTrace();
}
if (valid) {
break; // return the "checked out" resource
} else {
// remove invalid resource from pool
registered.remove(resource);
if (validator != null) {
validator.abandoned(resource);
}
}
}
// resource was removed from pool, try acquire again
// note that this implicitly lowers the maximum available resources
// (an acquired permit from availableLock goes unused).
// TODO: retry puts us at the back of availableLock queue but should put us at the front of the queue
resource = null;
}
timeRemaining = tend - System.currentTimeMillis();
} while (timeRemaining > 0L);
if (resource == null) {
throw new TimeoutException("Unable to acquire a resource within " + tunit.toMillis(timeout) + " ms.");
}
return resource;
}
/**
* This method must be called by the caller / client whenever {@link #tryAcquire(long, TimeUnit)}
* has returned a resource. If the caller has determined the resource is no longer valid,
* the caller should call {@link #remove(Object)} before calling this method.
* @param resource no longer used.
*/
public void release(T resource) {
if (resource == null) {
return;
}
if (registered.contains(resource)) {
available.add(resource);
availableLock.release();
} else {
if (validator != null) {
validator.abandoned(resource);
}
}
}
/** An array (copy) of all resources registered in the pool. */
@SuppressWarnings("unchecked")
public T[] getRegisteredResources() {
return (T[]) registered.toArray(new Object[registered.size()]);
}
}
Y una clase separada con funciones relacionadas con el proceso separado que administra la disponibilidad de smarcard.
import java.util.concurrent.TimeUnit;
/**
* Used by a {@link ResourcePool} to validate a resource before handing it out for lease
* (see {@link #isValid(Object)} and signal a resource is no longer used (see {@link #abandoned(Object)}).
*/
public class ResourcePoolValidator<T> {
/**
* Overload this method (this method does nothing by default)
* to validate a resource before handing it out for lease.
* If this method returns false or throws an exception (which it preferably should not do),
* the resource is removed from the pool.
* @return true if the resource is valid for leasing
*/
public boolean isValid(T resource) {
return true;
}
/**
* Called by the {@link ResourcePool#release(Object)} method when a resource is released by a caller
* but the resource was previously removed from the pool and in use.
* <br>Called by {@link ResourcePool#tryAcquire(long, TimeUnit)} if a resource if not valid
* (see {@link #isValid(Object)}.
* <br>Overload this method (this method does nothing by default) to create a notification of an unused resource,
* do NOT do any long period of processing as this method is called from a caller (client) thread.
*/
public void abandoned(T resource) {
// NO-OP
}
}
Su suposición:
ThreadPoolExecutor: parece lo que necesito, pero con esto no puedo cambiar el tamaño del grupo, además, cada hilo debe estar vinculado a una única ranura para tarjeta inteligente.
no es correcto.
You can set thread pool size dynamically.
Eche un vistazo a las siguientes API de ThreadPoolExecutor
public void setMaximumPoolSize(int maximumPoolSize)
Establece el número máximo permitido de hilos. Esto anula cualquier valor establecido en el constructor. Si el nuevo valor es más pequeño que el valor actual, el exceso de subprocesos existentes se terminará cuando estén inactivos.
public void setCorePoolSize(int corePoolSize)
Establece el número de núcleo de hilos. Esto anula cualquier valor establecido en el constructor. Si el nuevo valor es más pequeño que el valor actual, el exceso de subprocesos existentes se terminará cuando estén inactivos. Si son más grandes, los nuevos subprocesos, si es necesario, se iniciarán para ejecutar las tareas en cola.
Core and maximum pool sizes:
Un ThreadPoolExecutor
ajustará automáticamente el tamaño del grupo de acuerdo con los límites establecidos por corePoolSize
y maximumPoolSize
.
Cuando se envía una nueva tarea en el método execute(java.lang.Runnable)
, y se ejecutan menos hilos de corePoolSize
, se crea un nuevo hilo para manejar la solicitud, incluso si otros hilos de trabajo están inactivos.
Si hay más de corePoolSize
pero menos de maximumPoolSize
ejecutando, se maximumPoolSize
un nuevo thread solo si la cola está llena.
Al establecer maximumPoolSize
en un valor esencialmente ilimitado, como Integer.MAX_VALUE
, permite que la agrupación acomode un número arbitrario de tareas simultáneas . Pero no recomendaría tener tantos números de hilos. Establezca este valor con precaución.
Normalmente, los tamaños de agrupación máxima y máxima se establecen solo en la construcción, pero también pueden cambiarse dinámicamente usando setCorePoolSize(int
) y setMaximumPoolSize(int)
.
EDITAR:
Para una mejor utilización del conjunto de subprocesos, si sabe que el número máximo de tarjetas es 6, puede usar
ExecutorService executor = Executors.newFixedThreadPool(6);
O