java threadpool priority-queue executor futuretask

java - Cómo implementar PriorityBlockingQueue con ThreadPoolExecutor y tareas personalizadas



priority-queue futuretask (6)

Intentaré explicar este problema con un código completamente funcional. Pero antes de sumergirme en el código, me gustaría explicar acerca de PriorityBlockingQueue

PriorityBlockingQueue : PriorityBlockingQueue es una implementación de BlockingQueue. Acepta las tareas junto con su prioridad y envía la tarea con la prioridad más alta para la ejecución primero. Si cualquiera de las dos tareas tiene la misma prioridad, entonces necesitamos proporcionar alguna lógica personalizada para decidir qué tarea va primero.

Ahora vamos a entrar en el código de inmediato.

Clase de controlador : esta clase crea un ejecutor que acepta tareas y luego las envía para su ejecución. Aquí creamos dos tareas, una con prioridad BAJA y la otra con prioridad ALTA. Aquí le decimos al ejecutor que ejecute un MAX de 1 subprocesos y use el PriorityBlockingQueue

public static void main(String[] args) { /* Minimum number of threads that must be running : 0 Maximium number of threads that can be created : 1 If a thread is idle, then the minimum time to keep it alive : 1000 Which queue to use : PriorityBlockingQueue */ PriorityBlockingQueue queue = new PriorityBlockingQueue(); ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1, 1000, TimeUnit.MILLISECONDS,queue); MyTask task = new MyTask(Priority.LOW,"Low"); executor.execute(new MyFutureTask(task)); task = new MyTask(Priority.HIGH,"High"); executor.execute(new MyFutureTask(task)); }

Clase MyTask : MyTask implementa Runnable y acepta la prioridad como un argumento en el constructor. Cuando se ejecuta esta tarea, imprime un mensaje y luego pone el hilo en suspensión durante 1 segundo.

public class MyTask implements Runnable { public int getPriority() { return priority.getValue(); } private Priority priority; public String getName() { return name; } private String name; public MyTask(Priority priority,String name){ this.priority = priority; this.name = name; } @Override public void run() { System.out.println("The following Runnable is getting executed "+getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }

Clase MyFutureTask : Ya que estamos usando PriorityBlocingQueue para realizar nuestras tareas, nuestras tareas deben estar integradas dentro de FutureTask y nuestra implementación de FutureTask debe implementar una interfaz comparable. La interfaz Comparable compara la prioridad de 2 tareas diferentes y envía la tarea con la prioridad más alta para la ejecución.

public class MyFutureTask extends FutureTask<MyFutureTask> implements Comparable<MyFutureTask> { private MyTask task = null; public MyFutureTask(MyTask task){ super(task,null); this.task = task; } @Override public int compareTo(MyFutureTask another) { return task.getPriority() - another.task.getPriority(); } }

Clase de prioridad : Clase de prioridad autoexplicativa.

public enum Priority { HIGHEST(0), HIGH(1), MEDIUM(2), LOW(3), LOWEST(4); int value; Priority(int val) { this.value = val; } public int getValue(){ return value; } }

Ahora cuando ejecutamos este ejemplo, obtenemos la siguiente salida

The following Runnable is getting executed High The following Runnable is getting executed Low

Aunque enviamos la prioridad BAJA primero, pero luego la tarea ALTA prioridad, pero como estamos usando una PriorityBlockingQueue, una tarea con una prioridad más alta se ejecutará primero.

He buscado mucho pero no pude encontrar una solución a mi problema.

Tengo mi propia clase, BaseTask , que usa un ThreadPoolExecutor para manejar las tareas.
Si no quiero la priorización (es decir, usar un LinkedBlockingQueue ), esto funciona bien, pero cuando intento usar un PriorityBlockingQueue obtengo la ClassCastException porque el ThreadPoolExecutor ajusta mis tareas en un objeto FutureTask .
Obviamente, esto está bien porque FutureTask no implementa Comparable , pero ¿cómo podría resolver el problema de prioridad?
He leído que podría anular newTaskFor en ThreadPoolExecutor , pero parece que no puedo encontrar este método en absoluto ...

¡Cualquier sugerencia sería muy apreciada!

Algún código para ayudar:

En mi clase de BaseTask tengo

private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>(); private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor( 1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory); private final BaseFutureTask<Result> mFuture; public BaseTask(int priority) { mFuture = new BaseFutureTask<Result>(mWorker, priority); } public final BaseTask<Params, Progress, Result> execute(Params... params) { /* Some unimportant code here */ sExecutor.execute(mFuture); }

En la clase BaseFutureTask

@Override public int compareTo(BaseFutureTask another) { long diff = this.priority - another.priority; return Long.signum(diff); }

En la clase BaseThreadPoolExecutor , BaseThreadPoolExecutor los 3 métodos de submit ...
Se llama al constructor de esta clase, pero ninguno de los métodos de submit


Mi solución:

public class XThreadPoolExecutor extends ThreadPoolExecutor { public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new ComparableFutureTask<>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new ComparableFutureTask<>(callable); } protected class ComparableFutureTask<V> extends FutureTask<V> implements Comparable<ComparableFutureTask<V>> { private Object object; public ComparableFutureTask(Callable<V> callable) { super(callable); object = callable; } public ComparableFutureTask(Runnable runnable, V result) { super(runnable, result); object = runnable; } @Override @SuppressWarnings("unchecked") public int compareTo(ComparableFutureTask<V> o) { if (this == o) { return 0; } if (o == null) { return -1; // high priority } if (object != null && o.object != null) { if (object.getClass().equals(o.object.getClass())) { if (object instanceof Comparable) { return ((Comparable) object).compareTo(o.object); } } } return 0; } } }


Para responder a su pregunta: el método newTaskFor() se encuentra en la superclase de ThreadPoolExecutor , AbstractExecutorService . Sin embargo, puede anularlo simplemente en ThreadPoolExecutor .


Parece que dejaron eso de la armonía apache. Hay un registro de confirmación svn hace aproximadamente un año que newTaskFor la ausencia de newTaskFor . Es probable que simplemente pueda anular las funciones de submit en un ThreadPoolExecutor extendido para crear una FutureTask extendida que sea Comparable . No son muy largos .


Puedes usar estas clases de ayuda:

public class PriorityFuture<T> implements RunnableFuture<T> { private RunnableFuture<T> src; private int priority; public PriorityFuture(RunnableFuture<T> other, int priority) { this.src = other; this.priority = priority; } public int getPriority() { return priority; } public boolean cancel(boolean mayInterruptIfRunning) { return src.cancel(mayInterruptIfRunning); } public boolean isCancelled() { return src.isCancelled(); } public boolean isDone() { return src.isDone(); } public T get() throws InterruptedException, ExecutionException { return src.get(); } public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return src.get(); } public void run() { src.run(); } public static Comparator<Runnable> COMP = new Comparator<Runnable>() { public int compare(Runnable o1, Runnable o2) { if (o1 == null && o2 == null) return 0; else if (o1 == null) return -1; else if (o2 == null) return 1; else { int p1 = ((PriorityFuture<?>) o1).getPriority(); int p2 = ((PriorityFuture<?>) o2).getPriority(); return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1); } } }; }

Y

public interface PriorityCallable<T> extends Callable<T> { int getPriority(); }

Y este método de ayuda:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) { protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { RunnableFuture<T> newTaskFor = super.newTaskFor(callable); return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority()); } }; }

Y luego usarlo de esta manera:

class LenthyJob implements PriorityCallable<Long> { private int priority; public LenthyJob(int priority) { this.priority = priority; } public Long call() throws Exception { System.out.println("Executing: " + priority); long num = 1000000; for (int i = 0; i < 1000000; i++) { num *= Math.random() * 1000; num /= Math.random() * 1000; if (num == 0) num = 1000000; } return num; } public int getPriority() { return priority; } } public class TestPQ { public static void main(String[] args) throws InterruptedException, ExecutionException { ThreadPoolExecutor exec = getPriorityExecutor(2); for (int i = 0; i < 20; i++) { int priority = (int) (Math.random() * 100); System.out.println("Scheduling: " + priority); LenthyJob job = new LenthyJob(priority); exec.submit(job); } } }


public class ExecutorPriority { public static void main(String[] args) { PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new ComparePriority()); Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq); exe.execute(new RunWithPriority(2) { @Override public void run() { System.out.println(this.getPriority() + " started"); try { Thread.sleep(3000); } catch (InterruptedException ex) { Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex); } System.out.println(this.getPriority() + " finished"); } }); exe.execute(new RunWithPriority(10) { @Override public void run() { System.out.println(this.getPriority() + " started"); try { Thread.sleep(3000); } catch (InterruptedException ex) { Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex); } System.out.println(this.getPriority() + " finished"); } }); } private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> { @Override public int compare(T o1, T o2) { return o1.getPriority().compareTo(o2.getPriority()); } }

}

Como puede adivinar, RunWithPriority es una clase abstracta que se puede ejecutar y tiene un campo de prioridad de entero