multiple findany conditions java parallel-processing java-8 java-stream

findany - Visualización de la paralelización de Java Stream



map function java 8 (2)

La implementación actual de la API de Stream utiliza el combinador de recopiladores para combinar los resultados intermedios exactamente de la misma manera que se dividieron previamente. Además, la estrategia de división depende de la fuente y del nivel de paralelismo de la agrupación común, pero no depende de la operación de reducción exacta utilizada (lo mismo para reduce , collect , para forEach , count , etc.). Confiando en esto no es muy difícil crear el colector de visualización:

public static Collector<Object, ?, List<String>> parallelVisualize() { class Range { private String first, last; private Range left, right; void accept(Object obj) { if (first == null) first = obj.toString(); else last = obj.toString(); } Range combine(Range that) { Range p = new Range(); p.first = first == null ? that.first : first; p.last = Stream .of(that.last, that.first, this.last, this.first) .filter(Objects::nonNull).findFirst().orElse(null); p.left = this; p.right = that; return p; } String pad(String s, int left, int len) { if (len == s.length()) return s; char[] result = new char[len]; Arrays.fill(result, '' ''); s.getChars(0, s.length(), result, left); return new String(result); } public List<String> finish() { String cur = toString(); if (left == null) { return Collections.singletonList(cur); } List<String> l = left.finish(); List<String> r = right.finish(); int len1 = l.get(0).length(); int len2 = r.get(0).length(); int totalLen = len1 + len2 + 1; int leftAdd = 0; if (cur.length() < totalLen) { cur = pad(cur, (totalLen - cur.length()) / 2, totalLen); } else { leftAdd = (cur.length() - totalLen) / 2; totalLen = cur.length(); } List<String> result = new ArrayList<>(); result.add(cur); char[] dashes = new char[totalLen]; Arrays.fill(dashes, '' ''); Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1 + leftAdd, ''_''); int mid = totalLen / 2; dashes[mid] = ''/''; dashes[mid + 1] = ''//'; result.add(new String(dashes)); Arrays.fill(dashes, '' ''); dashes[len1 / 2 + leftAdd] = ''|''; dashes[len1 + len2 / 2 + 1 + leftAdd] = ''|''; result.add(new String(dashes)); int maxSize = Math.max(l.size(), r.size()); for (int i = 0; i < maxSize; i++) { String lstr = l.size() > i ? l.get(i) : String.format("%" + len1 + "s", ""); String rstr = r.size() > i ? r.get(i) : String.format("%" + len2 + "s", ""); result.add(pad(lstr + " " + rstr, leftAdd, totalLen)); } return result; } public String toString() { if (first == null) return "(empty)"; else if (last == null) return "[" + first + "]"; return "[" + first + ".." + last + "]"; } } return Collector.of(Range::new, Range::accept, Range::combine, Range::finish); }

Aquí hay algunos resultados interesantes obtenidos con este colector usando una máquina de 4 núcleos (los resultados diferirán en la máquina con un número diferente de Procesadores availableProcessors() ).

División de rango simple :

IntStream.range(0, 100) .boxed().parallel().collect(parallelVisualize()) .forEach(System.out::println);

Incluso dividido en 16 tareas:

[0..99] ___________________________________//________________________________ | | [0..49] [50..99] _________________//______________ _________________//________________ | | | | [0..24] [25..49] [50..74] [75..99] ________//_____ ________//_______ ________//_______ ________//_______ | | | | | | | | [0..11] [12..24] [25..36] [37..49] [50..61] [62..74] [75..86] [87..99] ___//_ ___//___ ___//___ ___//___ ___//___ ___//___ ___//___ ___//___ | | | | | | | | | | | | | | | | [0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

División de dos corrientes de concatenación :

IntStream .concat(IntStream.range(0, 10), IntStream.range(10, 100)) .boxed().parallel().collect(parallelVisualize()) .forEach(System.out::println);

Como puede ver, la primera división no concatena los flujos:

[0..99] _______________________________________________________________________//_____ | | [0..9] [10..99] __//__ ___________________________________//__________________________________ | | | | [0..4] [5..9] [10..54] [55..99] _________________//________________ _________________//________________ | | | | [10..31] [32..54] [55..76] [77..99] ________//_______ ________//_______ ________//_______ ________//_______ | | | | | | | | [10..20] [21..31] [32..42] [43..54] [55..65] [66..76] [77..87] [88..99] ___//___ ___//___ ___//___ ___//___ ___//___ ___//___ ___//___ ___//___ | | | | | | | | | | | | | | | | [10..14] [15..20] [21..25] [26..31] [32..36] [37..42] [43..48] [49..54] [55..59] [60..65] [66..70] [71..76] [77..81] [82..87] [88..93] [94..99]

División de dos concatenaciones de flujo donde se realizó la operación intermedia (en caja ()) antes de la concatenación :

Stream.concat(IntStream.range(0, 50).boxed().parallel(), IntStream.range(50, 100).boxed()) .collect(parallelVisualize()) .forEach(System.out::println);

Si uno de los flujos de entrada no se convirtió en modo paralelo antes de la concatenación, se niega a dividirse:

[0..99] ___//_________________________________ | | [0..49] [50..99] _________________//______________ | | [0..24] [25..49] ________//_____ ________//_______ | | | | [0..11] [12..24] [25..36] [37..49] ___//_ ___//___ ___//___ ___//___ | | | | | | | | [0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49]

División de flatmapping :

Stream.of(0, 50) .flatMap(start -> IntStream.range(start, start+50).boxed().parallel()) .parallel().collect(parallelVisualize()) .forEach(System.out::println);

Mapa plano nunca paraleliza dentro de arroyos anidados:

[0..99] ____//__ | | [0..49] [50..99]

Transmita desde un iterador de tamaño desconocido de 7000 elementos (consulte esta respuesta para ver el contexto):

StreamSupport .stream(Spliterators.spliteratorUnknownSize( IntStream.range(0, 7000).iterator(), Spliterator.ORDERED), true) .collect(parallelVisualize()).forEach(System.out::println);

La división es realmente mala, todos esperan la mayor parte [3072..6143]:

[0..6999] _______________________//___ | | [0..1023] [1024..6999] ________________//____ | | [1024..3071] [3072..6999] _________//_____ | | [3072..6143] [6144..6999] ___//____ | | [6144..6999] (empty)

Fuente de iterador con tamaño conocido :

StreamSupport .stream(Spliterators.spliterator(IntStream.range(0, 7000) .iterator(), 7000, Spliterator.ORDERED), true) .collect(parallelVisualize()).forEach(System.out::println);

Suministrar el tamaño hace que sea mucho mejor desbloquear la división adicional:

[0..6999] ______________________________________________________________________________________________//________ | | [0..1023] [1024..6999] _____//__ ____________________________________________________________________//________________________ | | | | [0..511] [512..1023] [1024..3071] [3072..6999] ____________//___________ ________________//__________________________________________________ | | | | [1024..2047] [2048..3071] [3072..6143] [6144..6999] _____//_____ _____//_____ _________________________//________________________ ___//___________ | | | | | | | | [1024..1535] [1536..2047] [2048..2559] [2560..3071] [3072..4607] [4608..6143] [6144..6999] (empty) ____________//___________ ____________//___________ _____//_____ | | | | | | [3072..3839] [3840..4607] [4608..5375] [5376..6143] [6144..6571] [6572..6999] _____//_____ _____//_____ _____//_____ _____//_____ | | | | | | | | [3072..3455] [3456..3839] [3840..4223] [4224..4607] [4608..4991] [4992..5375] [5376..5759] [5760..6143]

Mejoras adicionales de dicho colector son posibles para generar imágenes gráficas (como svg), rastrear los hilos donde se procesa cada nodo, mostrar el número de elementos por cada grupo y así sucesivamente. Úsalo si quieres.

A menudo no está muy claro cómo exactamente la secuencia paralela divide la entrada en fragmentos y en qué orden se unen. ¿Hay alguna forma de visualizar todo el procedimiento para cualquier fuente de transmisión para comprender mejor lo que está pasando? Supongamos que he creado una secuencia como esta:

Stream<Integer> stream = IntStream.range(0, 100).boxed().parallel();

Quiero ver alguna estructura en forma de árbol:

[0..99] _____/ /_____ | | [0..49] [50..99] __/ /__ __/ /__ | | | | [0..24] [25..49] [50..74] [75..99]

Lo que significa que todo el rango de entrada [0..99] se divide a los [0..49] y [50..99] que a su vez se dividen más. Por supuesto, dicho diagrama debe reflejar el trabajo real de Stream API, por lo que si realizo alguna operación real con dicha secuencia, la división se debe realizar de la misma manera.


Quiero aumentar la gran respuesta de Tagir con una solución para monitorear la división en el lado de origen o incluso en las operaciones intermedias (con algunas restricciones impuestas por la implementación actual de la API de la corriente):

public static <E> Stream<E> proxy(Stream<E> src) { Class<Stream<E>> sClass=(Class)Stream.class; Class<Spliterator<E>> spClass=(Class)Spliterator.class; return proxy(src, sClass, spClass, StreamSupport::stream); } public static IntStream proxy(IntStream src) { return proxy(src, IntStream.class, Spliterator.OfInt.class, StreamSupport::intStream); } public static LongStream proxy(LongStream src) { return proxy(src, LongStream.class, Spliterator.OfLong.class, StreamSupport::longStream); } public static DoubleStream proxy(DoubleStream src) { return proxy(src, DoubleStream.class, Spliterator.OfDouble.class, StreamSupport::doubleStream); } static final Object EMPTY=new StringBuilder("empty"); static <E,S extends BaseStream<E,S>, Sp extends Spliterator<E>> S proxy( S src, Class<S> sc, Class<Sp> spc, BiFunction<Sp,Boolean,S> f) { final class Node<T> implements InvocationHandler,Runnable, Consumer<Object>, IntConsumer, LongConsumer, DoubleConsumer { final Class<? extends Spliterator> type; Spliterator<T> src; Object first=EMPTY, last=EMPTY; Node<T> left, right; Object currConsumer; public Node(Spliterator<T> src, Class<? extends Spliterator> type) { this.src = src; this.type=type; } private void value(Object t) { if(first==EMPTY) first=t; last=t; } public void accept(Object t) { value(t); ((Consumer)currConsumer).accept(t); } public void accept(int t) { value(t); ((IntConsumer)currConsumer).accept(t); } public void accept(long t) { value(t); ((LongConsumer)currConsumer).accept(t); } public void accept(double t) { value(t); ((DoubleConsumer)currConsumer).accept(t); } public void run() { System.out.println(); finish().forEach(System.out::println); } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Node<T> curr=this; while(curr.right!=null) curr=curr.right; if(method.getName().equals("tryAdvance")||method.getName().equals("forEachRemaining")) { curr.currConsumer=args[0]; args[0]=curr; } if(method.getName().equals("trySplit")) { Spliterator s=curr.src.trySplit(); if(s==null) return null; Node<T> pfx=new Node<>(s, type); pfx.left=curr.left; curr.left=pfx; curr.right=new Node<>(curr.src, type); src=null; return pfx.create(); } return method.invoke(curr.src, args); } Object create() { return Proxy.newProxyInstance(null, new Class<?>[]{type}, this); } String pad(String s, int left, int len) { if (len == s.length()) return s; char[] result = new char[len]; Arrays.fill(result, '' ''); s.getChars(0, s.length(), result, left); return new String(result); } public List<String> finish() { String cur = toString(); if (left == null) { return Collections.singletonList(cur); } List<String> l = left.finish(); List<String> r = right.finish(); int len1 = l.get(0).length(); int len2 = r.get(0).length(); int totalLen = len1 + len2 + 1; int leftAdd = 0; if (cur.length() < totalLen) { cur = pad(cur, (totalLen - cur.length()) / 2, totalLen); } else { leftAdd = (cur.length() - totalLen) / 2; totalLen = cur.length(); } List<String> result = new ArrayList<>(); result.add(cur); char[] dashes = new char[totalLen]; Arrays.fill(dashes, '' ''); Arrays.fill(dashes, len1 / 2 + leftAdd + 1, len1 + len2 / 2 + 1 + leftAdd, ''_''); int mid = totalLen / 2; dashes[mid] = ''/''; dashes[mid + 1] = ''//'; result.add(new String(dashes)); Arrays.fill(dashes, '' ''); dashes[len1 / 2 + leftAdd] = ''|''; dashes[len1 + len2 / 2 + 1 + leftAdd] = ''|''; result.add(new String(dashes)); int maxSize = Math.max(l.size(), r.size()); for (int i = 0; i < maxSize; i++) { String lstr = l.size() > i ? l.get(i) : String.format("%" + len1 + "s", ""); String rstr = r.size() > i ? r.get(i) : String.format("%" + len2 + "s", ""); result.add(pad(lstr + " " + rstr, leftAdd, totalLen)); } return result; } private Object first() { if(left==null) return first; Object o=left.first(); if(o==EMPTY) o=right.first(); return o; } private Object last() { if(right==null) return last; Object o=right.last(); if(o==EMPTY) o=left.last(); return o; } public String toString() { Object o=first(), p=last(); return o==EMPTY? "(empty)": "["+o+(o!=p? ".."+p+'']'': "]"); } } Node<E> n=new Node<>(src.spliterator(), spc); Sp sp=(Sp)Proxy.newProxyInstance(null, new Class<?>[]{n.type}, n); return f.apply(sp, true).onClose(n); }

Permite envolver un separador con un proxy que controlará las operaciones de división y los objetos encontrados. La lógica del manejo de trozos es similar a la de Tagir, de hecho, copié su (s) rutina (s) de impresión de resultados.

Puede pasar la fuente de la secuencia o una secuencia con las mismas operaciones que ya se agregaron. (En este último caso, debe aplicar .parallel() lo antes posible a la transmisión). Como explicó Tagir, en la mayoría de los casos, el comportamiento de la división depende de la fuente y el paralelismo configurado, por lo tanto, en la mayoría de los casos, el monitoreo de estados intermedios puede cambiar los valores, pero no los fragmentos procesados:

try(IntStream is=proxy(IntStream.range(0, 100).parallel())) { is.filter(i -> i/20%2==0) .mapToObj(ix->"/""+ix+''"'') .forEach(s->{}); }

imprimirá

[0..99] ___________________________________//________________________________ | | [0..49] [50..99] _________________//______________ _________________//________________ | | | | [0..24] [25..49] [50..74] [75..99] ________//_____ ________//_______ ________//_______ ________//_______ | | | | | | | | [0..11] [12..24] [25..36] [37..49] [50..61] [62..74] [75..86] [87..99] ___//_ ___//___ ___//___ ___//___ ___//___ ___//___ ___//___ ___//___ | | | | | | | | | | | | | | | | [0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

mientras

try(Stream<String> s=proxy(IntStream.range(0, 100).parallel().filter(i -> i/20%2==0) .mapToObj(ix->"/""+ix+''"''))) { s.forEach(str->{}); }

imprimirá

["0".."99"] ___________________________________________//___________________________________________ | | ["0".."49"] ["50".."99"] ____________________//______________________ ______________________//___________________ | | | | ["0".."19"] ["40".."49"] ["50".."59"] ["80".."99"] ____________//_________ ____________//______ _______//___________ ____________//________ | | | | | | | | ["0".."11"] ["12".."19"] (empty) ["40".."49"] ["50".."59"] (empty) ["80".."86"] ["87".."99"] _____//___ _____//_____ ___//__ _____//_____ _____//_____ ___//__ _____//__ _____//_____ | | | | | | | | | | | | | | | | ["0".."5"] ["6".."11"] ["12".."17"] ["18".."19"] (empty) (empty) ["40".."42"] ["43".."49"] ["50".."55"] ["56".."59"] (empty) (empty) ["80"] ["81".."86"] ["87".."92"] ["93".."99"]

Como podemos ver aquí, estamos monitoreando el resultado de .filter(…).mapToObj(…) pero los fragmentos están claramente determinados por la fuente, lo que posiblemente produce fragmentos vacíos en el flujo descendente dependiendo de la condición del filtro.

Tenga en cuenta que podemos combinar la supervisión de origen con la supervisión del recopilador de Tagir:

try(IntStream s=proxy(IntStream.range(0, 100))) { s.parallel().filter(i -> i/20%2==0) .boxed().collect(parallelVisualize()) .forEach(System.out::println); }

Esto se imprimirá (tenga en cuenta que la salida de collect se imprime primero):

[0..99] ________________________________//_______________________________ | | [0..49] [50..99] ________________//______________ _______________//_______________ | | | | [0..19] [40..49] [50..59] [80..99] ________//_____ ________//______ _______//_______ ________//_____ | | | | | | | | [0..11] [12..19] (empty) [40..49] [50..59] (empty) [80..86] [87..99] ___//_ ___//___ ___//__ ___//___ ___//___ ___//__ ___//_ ___//___ | | | | | | | | | | | | | | | | [0..5] [6..11] [12..17] [18..19] (empty) (empty) [40..42] [43..49] [50..55] [56..59] (empty) (empty) [80] [81..86] [87..92] [93..99] [0..99] ___________________________________//________________________________ | | [0..49] [50..99] _________________//______________ _________________//________________ | | | | [0..24] [25..49] [50..74] [75..99] ________//_____ ________//_______ ________//_______ ________//_______ | | | | | | | | [0..11] [12..24] [25..36] [37..49] [50..61] [62..74] [75..86] [87..99] ___//_ ___//___ ___//___ ___//___ ___//___ ___//___ ___//___ ___//___ | | | | | | | | | | | | | | | | [0..5] [6..11] [12..17] [18..24] [25..30] [31..36] [37..42] [43..49] [50..55] [56..61] [62..67] [68..74] [75..80] [81..86] [87..92] [93..99]

Podemos ver claramente cómo los trozos del procesamiento coinciden, pero después del filtrado, algunos trozos tienen menos elementos, algunos de ellos están completamente vacíos.

Este es el lugar para demostrar, donde las dos formas de monitoreo pueden hacer una diferencia significativa:

try(DoubleStream is=proxy(DoubleStream.iterate(0, i->i+1)).parallel().limit(100)) { is.boxed() .collect(parallelVisualize()) .forEach(System.out::println); }

[0.0..99.0] ___________________________________________________//________________________________________________ | | [0.0..49.0] [50.0..99.0] _________________________//______________________ _________________________//________________________ | | | | [0.0..24.0] [25.0..49.0] [50.0..74.0] [75.0..99.0] ____________//_________ ____________//___________ ____________//___________ ____________//___________ | | | | | | | | [0.0..11.0] [12.0..24.0] [25.0..36.0] [37.0..49.0] [50.0..61.0] [62.0..74.0] [75.0..86.0] [87.0..99.0] _____//___ _____//_____ _____//_____ _____//_____ _____//_____ _____//_____ _____//_____ _____//_____ | | | | | | | | | | | | | | | | [0.0..5.0] [6.0..11.0] [12.0..17.0] [18.0..24.0] [25.0..30.0] [31.0..36.0] [37.0..42.0] [43.0..49.0] [50.0..55.0] [56.0..61.0] [62.0..67.0] [68.0..74.0] [75.0..80.0] [81.0..86.0] [87.0..92.0] [93.0..99.0] [0.0..10239.0] _____________________________//_____ | | [0.0..1023.0] [1024.0..10239.0] ____________________//_______ | | [1024.0..3071.0] [3072.0..10239.0] ____________//______ | | [3072.0..6143.0] [6144.0..10239.0] ___//_______ | | [6144.0..10239.0] (empty)

Esto demuestra lo que Tagir ya explicó , los flujos con un tamaño desconocido se dividen pobremente, e incluso el hecho de que el limit(…) ofrece la posibilidad de una buena estimación (de hecho, el límite infinito + es teóricamente predecible), la implementación no toma ninguna ventaja de eso

La fuente se divide en trozos utilizando un tamaño de lote de 1024 , aumentado en 1024 después de cada división, creando trozos fuera del rango impuesto por el limit . También podemos ver cómo un prefijo se divide cada vez.

Pero cuando observamos la salida de la división del terminal, podemos ver que entre estos trozos en exceso se han caído y ha ocurrido otra división del primer trozo. Debido a que este fragmento está integrado por una matriz intermedia que se ha llenado con la implementación predeterminada en la primera división, no lo notamos en la fuente, pero podemos ver en la acción del terminal que esta matriz se ha dividido (como es de esperar) bien equilibrada .

Así que necesitamos ambas formas de monitoreo para obtener la imagen completa aquí ...