threads parallel method example completionstage await async java parallel-processing

method - "Parallel.For" para Java?



java await (10)

Aquí está mi contribución a este tema https://github.com/pablormier/parallel-loops . El uso es muy simple:

Collection<String> upperCaseWords = Parallel.ForEach(words, new Parallel.F<String, String>() { public String apply(String s) { return s.toUpperCase(); } });

También es posible cambiar algunos aspectos de comportamiento, como el número de subprocesos (de forma predeterminada, utiliza un grupo de subprocesos en caché):

Collection<String> upperCaseWords = new Parallel.ForEach<String, String>(words) .withFixedThreads(4) .apply(new Parallel.F<String, String>() { public String apply(String s) { return s.toUpperCase(); } }).values();

Todo el código es autónomo en una clase Java y no tiene más dependencias que el JDK. También te animo a que compruebes la nueva forma de paralelizar de forma funcional con Java 8

Me preguntaba si existe un Parallel.For equivalente a la versión de .NET para Java?

Si hay alguien podría por favor dar un ejemplo? ¡Gracias!


Basado en la sugerencia legal, agregue CountDownLatch. Agrega chunksize para reducir submit ().

Cuando se prueba con una matriz de 4 millones de elementos, esta proporciona una velocidad 5 veces superior a la secuencial para () en mi CPU Core i7 2630QM.

public class Loop { public interface Each { void run(int i); } private static final int CPUs = Runtime.getRuntime().availableProcessors(); public static void withIndex(int start, int stop, final Each body) { int chunksize = (stop - start + CPUs - 1) / CPUs; int loops = (stop - start + chunksize - 1) / chunksize; ExecutorService executor = Executors.newFixedThreadPool(CPUs); final CountDownLatch latch = new CountDownLatch(loops); for (int i=start; i<stop;) { final int lo = i; i += chunksize; final int hi = (i<stop) ? i : stop; executor.submit(new Runnable() { public void run() { for (int i=lo; i<hi; i++) body.run(i); latch.countDown(); } }); } try { latch.await(); } catch (InterruptedException e) {} executor.shutdown(); } public static void main(String [] argv) { Loop.withIndex(0, 9, new Loop.Each() { public void run(int i) { System.out.println(i*10); } }); } }


Creo que lo más cercano sería:

ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS); try { for (final Object o : list) { exec.submit(new Runnable() { @Override public void run() { // do stuff with o. } }); } } finally { exec.shutdown(); }

En función de los comentarios de TheLQ, debe establecer SUM_NUM_THREADS en Runtime.getRuntime().availableProcessors(); AvailableProcessors Runtime.getRuntime().availableProcessors();

Editar: Decidió agregar una implementación básica "Paralela.Para"

public class Parallel { private static final int NUM_CORES = Runtime.getRuntime().availableProcessors(); private static final ExecutorService forPool = Executors.newFixedThreadPool(NUM_CORES * 2, new NamedThreadFactory("Parallel.For")); public static <T> void For(final Iterable<T> elements, final Operation<T> operation) { try { // invokeAll blocks for us until all submitted tasks in the call complete forPool.invokeAll(createCallables(elements, operation)); } catch (InterruptedException e) { e.printStackTrace(); } } public static <T> Collection<Callable<Void>> createCallables(final Iterable<T> elements, final Operation<T> operation) { List<Callable<Void>> callables = new LinkedList<Callable<Void>>(); for (final T elem : elements) { callables.add(new Callable<Void>() { @Override public Void call() { operation.perform(elem); return null; } }); } return callables; } public static interface Operation<T> { public void perform(T pParameter); } }

Ejemplo de uso de Parallel.For

// Collection of items to process in parallel Collection<Integer> elems = new LinkedList<Integer>(); for (int i = 0; i < 40; ++i) { elems.add(i); } Parallel.For(elems, // The operation to perform with each item new Parallel.Operation<Integer>() { public void perform(Integer param) { System.out.println(param); }; });

Supongo que esta implementación es más parecida a Parallel.ForEach

Editar . Puse esto en GitHub si alguien está interesado. Paralelo para en GitHub


Hay un equivalente para Paralelo. Está disponible como una extensión de Java. Se llama Ateji PX, tienen una versión gratuita con la que puedes jugar. http://www.ateji.com/px/index.html

Es el equivalente exacto de parallel.for y se ve similar a.

For ||

Más ejemplos y explicaciones en wikipedia: http://en.wikipedia.org/wiki/Ateji_PX

Cerrado en Java IMO


La sincronización a menudo mata la aceleración de los bucles for paralelos. Por lo tanto, los bucles for paralelos a menudo necesitan sus datos privados y un mecanismo de reducción para reducir todos los hilos de datos privados para que comprendan un único resultado.

Así que extendí el Paralelo.Para la versión de Weimin Xiao por un mecanismo de reducción.

public class Parallel { public static interface IntLoopBody { void run(int i); } public static interface LoopBody<T> { void run(T i); } public static interface RedDataCreator<T> { T run(); } public static interface RedLoopBody<T> { void run(int i, T data); } public static interface Reducer<T> { void run(T returnData, T addData); } private static class ReductionData<T> { Future<?> future; T data; } static final int nCPU = Runtime.getRuntime().availableProcessors(); public static <T> void ForEach(Iterable <T> parameters, final LoopBody<T> loopBody) { ExecutorService executor = Executors.newFixedThreadPool(nCPU); List<Future<?>> futures = new LinkedList<Future<?>>(); for (final T param : parameters) { futures.add(executor.submit(() -> loopBody.run(param) )); } for (Future<?> f : futures) { try { f.get(); } catch (InterruptedException | ExecutionException e) { System.out.println(e); } } executor.shutdown(); } public static void For(int start, int stop, final IntLoopBody loopBody) { final int chunkSize = (stop - start + nCPU - 1)/nCPU; final int loops = (stop - start + chunkSize - 1)/chunkSize; ExecutorService executor = Executors.newFixedThreadPool(loops); List<Future<?>> futures = new LinkedList<Future<?>>(); for (int i=start; i < stop; ) { final int iStart = i; i += chunkSize; final int iStop = (i < stop) ? i : stop; futures.add(executor.submit(() -> { for (int j = iStart; j < iStop; j++) loopBody.run(j); })); } for (Future<?> f : futures) { try { f.get(); } catch (InterruptedException | ExecutionException e) { System.out.println(e); } } executor.shutdown(); } public static <T> void For(int start, int stop, T result, final RedDataCreator<T> creator, final RedLoopBody<T> loopBody, final Reducer<T> reducer) { final int chunkSize = (stop - start + nCPU - 1)/nCPU; final int loops = (stop - start + chunkSize - 1)/chunkSize; ExecutorService executor = Executors.newFixedThreadPool(loops); List<ReductionData<T>> redData = new LinkedList<ReductionData<T>>(); for (int i = start; i < stop; ) { final int iStart = i; i += chunkSize; final int iStop = (i < stop) ? i : stop; final ReductionData<T> rd = new ReductionData<T>(); rd.data = creator.run(); rd.future = executor.submit(() -> { for (int j = iStart; j < iStop; j++) { loopBody.run(j, rd.data); } }); redData.add(rd); } for (ReductionData<T> rd : redData) { try { rd.future.get(); if (rd.data != null) { reducer.run(result, rd.data); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } executor.shutdown(); } }

Aquí hay un ejemplo de prueba simple: un contador de caracteres paralelos que utiliza un mapa no sincronizado.

import java.util.*; public class ParallelTest { static class Counter { int cnt; Counter() { cnt = 1; } } public static void main(String[] args) { String text = "More formally, if this map contains a mapping from a key k to a " + "value v such that key compares equal to k according to the map''s ordering, then " + "this method returns v; otherwise it returns null."; Map<Character, Counter> charCounter1 = new TreeMap<Character, Counter>(); Map<Character, Counter> charCounter2 = new TreeMap<Character, Counter>(); // first sequentially for(int i=0; i < text.length(); i++) { char c = text.charAt(i); Counter cnt = charCounter1.get(c); if (cnt == null) { charCounter1.put(c, new Counter()); } else { cnt.cnt++; } } for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) { System.out.println(entry.getKey() + ": " + entry.getValue().cnt); } // now parallel without synchronization Parallel.For(0, text.length(), charCounter2, // Creator () -> new TreeMap<Character, Counter>(), // Loop Body (i, map) -> { char c = text.charAt(i); Counter cnt = map.get(c); if (cnt == null) { map.put(c, new Counter()); } else { cnt.cnt++; } }, // Reducer (result, map) -> { for(Map.Entry<Character, Counter> entry: map.entrySet()) { Counter cntR = result.get(entry.getKey()); if (cntR == null) { result.put(entry.getKey(), entry.getValue()); } else { cntR.cnt += entry.getValue().cnt; } } } ); // compare results assert charCounter1.size() == charCounter2.size() : "wrong size: " + charCounter1.size() + ", " + charCounter2.size(); Iterator<Map.Entry<Character, Counter>> it2 = charCounter2.entrySet().iterator(); for(Map.Entry<Character, Counter> entry: charCounter1.entrySet()) { Map.Entry<Character, Counter> entry2 = it2.next(); assert entry.getKey() == entry2.getKey() && entry.getValue().cnt == entry2.getValue().cnt : "wrong content"; } System.out.println("Well done!"); } }


La solución de MLaw es un Parallel.ForEach muy práctico. Agregué un poco de modificación para hacer un Parallel.For.

public class Parallel { static final int iCPU = Runtime.getRuntime().availableProcessors(); public static <T> void ForEach(Iterable <T> parameters, final LoopBody<T> loopBody) { ExecutorService executor = Executors.newFixedThreadPool(iCPU); List<Future<?>> futures = new LinkedList<Future<?>>(); for (final T param : parameters) { Future<?> future = executor.submit(new Runnable() { public void run() { loopBody.run(param); } }); futures.add(future); } for (Future<?> f : futures) { try { f.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { } } executor.shutdown(); } public static void For(int start, int stop, final LoopBody<Integer> loopBody) { ExecutorService executor = Executors.newFixedThreadPool(iCPU); List<Future<?>> futures = new LinkedList<Future<?>>(); for (int i=start; i<stop; i++) { final Integer k = i; Future<?> future = executor.submit(new Runnable() { public void run() { loopBody.run(k); } }); futures.add(future); } for (Future<?> f : futures) { try { f.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { } } executor.shutdown(); } } public interface LoopBody <T> { void run(T i); } public class ParallelTest { int k; public ParallelTest() { k = 0; Parallel.For(0, 10, new LoopBody <Integer>() { public void run(Integer i) { k += i; System.out.println(i); } }); System.out.println("Sum = "+ k); } public static void main(String [] argv) { ParallelTest test = new ParallelTest(); } }


Tengo una clase Java Parallel actualizada que puede hacer Parallel.For, Parallel.ForEach, Parallel.Tasks y particiones de bucle paralelo. El código fuente es el siguiente:

Ejemplos de uso de esos bucles paralelos son los siguientes:

public static void main(String [] argv) { //sample data final ArrayList<String> ss = new ArrayList<String>(); String [] s = {"a", "b", "c", "d", "e", "f", "g"}; for (String z : s) ss.add(z); int m = ss.size(); //parallel-for loop System.out.println("Parallel.For loop:"); Parallel.For(0, m, new LoopBody<Integer>() { public void run(Integer i) { System.out.println(i +"/t"+ ss.get(i)); } }); //parallel for-each loop System.out.println("Parallel.ForEach loop:"); Parallel.ForEach(ss, new LoopBody<String>() { public void run(String p) { System.out.println(p); } }); //partitioned parallel loop System.out.println("Partitioned Parallel loop:"); Parallel.ForEach(Parallel.create(0, m), new LoopBody<Partition>() { public void run(Partition p) { for(int i=p.start; i<p.end; i++) System.out.println(i +"/t"+ ss.get(i)); } }); //parallel tasks System.out.println("Parallel Tasks:"); Parallel.Tasks(new Task [] { //task-1 new Task() {public void run() { for(int i=0; i<3; i++) System.out.println(i +"/t"+ ss.get(i)); }}, //task-2 new Task() {public void run() { for (int i=3; i<6; i++) System.out.println(i +"/t"+ ss.get(i)); }} }); }


Una opción más simple sería

// A thread pool which runs for the life of the application. private static final ExecutorService EXEC = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS); //later EXEC.invokeAll(tasks); // you can optionally specify a timeout.



Código fuente: https://github.com/crammeur/MyInvoices/blob/master/crammeurLibrairy/src/main/java/ca/qc/bergeron/marcantoine/crammeur/librairy/utils/Parallel.java

[ACTUALIZAR]

Clase paralela:

private static final int NUM_CORES = Runtime.getRuntime().availableProcessors(); private static final int MAX_THREAD = NUM_CORES*2; @NotNull public static <T> void For(final Collection<T> elements, final Operation<T> operation) { ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD); final Iterator<T> iterator = elements.iterator(); final Runnable runnable = new Runnable() { final Callable<Void> callable = new Callable<Void>() { @Override public Void call() throws Exception { T result; synchronized (iterator) { result = iterator.next(); } operation.perform(result); return null; } }; @Override public void run() { while (iterator.hasNext()) { try { synchronized (operation) { if (operation.follow()) { synchronized (callable) { callable.call(); } if (!operation.follow()) { break; } } else { break; } } } catch (NoSuchElementException e) { break; } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } } }; for (int threadIndex=0; threadIndex<MAX_THREAD; threadIndex++) { executor.execute(runnable); } executor.shutdown(); while (!executor.isTerminated()) { try { Thread.sleep(0,1); } catch (InterruptedException e) { e.printStackTrace(); throw new RuntimeException(e); } } } public interface Operation<T,R> { R perform(T pParameter); boolean follow(); }

java parallel multithreading