quartz example ejemplo java multithreading parallel-processing

example - ¿Cuál es la forma más fácil de paralelizar una tarea en java?



quartz java ejemplo (8)

Digamos que tengo una tarea como:

for(Object object: objects) { Result result = compute(objects); list.add(result); }

¿Cuál es la forma más fácil de paralelizar cada cálculo () (suponiendo que ya son paralelizables)?

No necesito una respuesta que coincida estrictamente con el código anterior, solo una respuesta general. Pero si necesita más información: mis tareas están vinculadas a IO y esto es para una aplicación web de Spring y las tareas se ejecutarán en una solicitud HTTP.


Aquí hay algo que uso en mis propios proyectos:

public class ParallelTasks { private final Collection<Runnable> tasks = new ArrayList<Runnable>(); public ParallelTasks() { } public void add(final Runnable task) { tasks.add(task); } public void go() throws InterruptedException { final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors()); try { final CountDownLatch latch = new CountDownLatch(tasks.size()); for (final Runnable task : tasks) threads.execute(new Runnable() { public void run() { try { task.run(); } finally { latch.countDown(); } } }); latch.await(); } finally { threads.shutdown(); } } } // ... public static void main(final String[] args) throws Exception { ParallelTasks tasks = new ParallelTasks(); final Runnable waitOneSecond = new Runnable() { public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { } } }; tasks.add(waitOneSecond); tasks.add(waitOneSecond); tasks.add(waitOneSecond); tasks.add(waitOneSecond); final long start = System.currentTimeMillis(); tasks.go(); System.err.println(System.currentTimeMillis() - start); }

Lo que imprime un poco más de 2000 en mi caja de doble núcleo.


Con Java8 y versiones posteriores, puede crear una secuencia y luego realizar el procesamiento en paralelo con parallelStream :

List<T> objects = ...; List<Result> result = objects.parallelStream().map(object -> { return compute(object); }).collect(Collectors.toList());

Nota: el orden de los resultados puede no coincidir con el orden de los objetos en la lista.

Los detalles sobre cómo configurar el número correcto de subprocesos están disponibles en esta pregunta sobre el flujo de apilamiento.




Recomendaría echar un vistazo a ExecutorService .

En particular, algo como esto:

ExecutorService EXEC = Executors.newCachedThreadPool(); List<Callable<Result>> tasks = new ArrayList<Callable<Result>>(); for (final Object object: objects) { Callable<Result> c = new Callable<Result>() { @Override public Result call() throws Exception { return compute(object); } }; tasks.add(c); } List<Future<Result>> results = EXEC.invokeAll(tasks);

Tenga en cuenta que usar newCachedThreadPool puede ser malo si los objects son una gran lista. Un grupo de subprocesos en caché podría crear un subproceso por tarea! Es posible que desee utilizar newFixedThreadPool(n) donde n es algo razonable (como el número de núcleos que tiene, suponiendo que compute() está enlazado a la CPU).

Aquí está el código completo que realmente se ejecuta:

import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorServiceExample { private static final Random PRNG = new Random(); private static class Result { private final int wait; public Result(int code) { this.wait = code; } } public static Result compute(Object obj) throws InterruptedException { int wait = PRNG.nextInt(3000); Thread.sleep(wait); return new Result(wait); } public static void main(String[] args) throws InterruptedException, ExecutionException { List<Object> objects = new ArrayList<Object>(); for (int i = 0; i < 100; i++) { objects.add(new Object()); } List<Callable<Result>> tasks = new ArrayList<Callable<Result>>(); for (final Object object : objects) { Callable<Result> c = new Callable<Result>() { @Override public Result call() throws Exception { return compute(object); } }; tasks.add(c); } ExecutorService exec = Executors.newCachedThreadPool(); // some other exectuors you could try to see the different behaviours // ExecutorService exec = Executors.newFixedThreadPool(3); // ExecutorService exec = Executors.newSingleThreadExecutor(); try { long start = System.currentTimeMillis(); List<Future<Result>> results = exec.invokeAll(tasks); int sum = 0; for (Future<Result> fr : results) { sum += fr.get().wait; System.out.println(String.format("Task waited %d ms", fr.get().wait)); } long elapsed = System.currentTimeMillis() - start; System.out.println(String.format("Elapsed time: %d ms", elapsed)); System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d))); } finally { exec.shutdown(); } } }


Uno puede simplemente crear algunos hilos y obtener el resultado.

Thread t = new Mythread(object); if (t.done()) { // get result // add result }

EDIT: Creo que otras soluciones son más frescas.


Yo iba a mencionar una clase ejecutora. Aquí hay un código de ejemplo que colocaría en la clase ejecutora.

private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4); private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>(); public void addCallable(Callable<Object> callable) { this.callableList.add(callable); } public void clearCallables(){ this.callableList.clear(); } public void executeThreads(){ try { threadLauncher.invokeAll(this.callableList); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } public Object[] getResult() { List<Future<Object>> resultList = null; Object[] resultArray = null; try { resultList = threadLauncher.invokeAll(this.callableList); resultArray = new Object[resultList.size()]; for (int i = 0; i < resultList.size(); i++) { resultArray[i] = resultList.get(i).get(); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return resultArray; }

Luego, para usarlo, harías llamadas a la clase ejecutora para poblarlo y ejecutarlo.

executor.addCallable( some implementation of callable) // do this once for each task Object[] results = executor.getResult();