separadores separador plantillas plantilla para online office libros libro imprimir hacer cómo como biblia java java-8 java-stream spliterator

java - plantillas - plantilla separadores word



¿Cómo puedo crear un separador de paginación de propósito general? (2)

La razón principal, su separador no lo acerca a su objetivo, es que trata de dividir las páginas, en lugar del espacio del elemento fuente. Si conoce el número total de elementos y tiene una fuente que le permite obtener una página a través de desplazamiento y límite, la forma más natural de separador es encapsular un rango dentro de estos elementos, por ejemplo, mediante desplazamiento y límite o final. Luego, dividir significa simplemente dividir ese rango , adaptar el desplazamiento de su separador a la posición de división y crear un nuevo separador que represente el prefijo, desde el "desplazamiento anterior" a la posición de división.

Before splitting: this spliterator: offset=x, end=y After splitting: this spliterator: offset=z, end=y returned spliterator: offset=x, end=z x <= z <= y

Mientras que en el mejor de los casos, z está exactamente en el medio entre x y y , para producir divisiones equilibradas, pero en nuestro caso, lo adaptaremos ligeramente para producir conjuntos de trabajo que sean múltiplos del tamaño de la página.

Esta lógica funciona sin la necesidad de obtener páginas, por lo que si difiere la búsqueda de páginas hasta el momento, el marco desea iniciar el recorrido, es decir, después de la división, las operaciones de búsqueda pueden ejecutarse en paralelo. El mayor obstáculo es el hecho de que necesita buscar la primera página para obtener información sobre el número total de elementos. La solución a continuación separa esta primera búsqueda del resto, simplificando la implementación. Por supuesto, tiene que pasar el resultado de esta búsqueda de la primera página, que se consumirá en el primer recorrido (en el caso secuencial) o se devolverá como el primer prefijo de separación, aceptando una división no balanceada en este punto, pero no teniendo para lidiar con eso más tarde.

public class PagingSpliterator<T> implements Spliterator<T> { public interface PageFetcher<T> { List<T> fetchPage(long offset, long limit, LongConsumer totalSizeSink); } public static final long DEFAULT_PAGE_SIZE = 100; public static <T> Stream<T> paged(PageFetcher<T> pageAccessor) { return paged(pageAccessor, DEFAULT_PAGE_SIZE, false); } public static <T> Stream<T> paged(PageFetcher<T> pageAccessor, long pageSize, boolean parallel) { if(pageSize<=0) throw new IllegalArgumentException(); return StreamSupport.stream(() -> { PagingSpliterator<T> pgSp = new PagingSpliterator<>(pageAccessor, 0, 0, pageSize); pgSp.danglingFirstPage =spliterator(pageAccessor.fetchPage(0, pageSize, l -> pgSp.end=l)); return pgSp; }, CHARACTERISTICS, parallel); } private static final int CHARACTERISTICS = IMMUTABLE|ORDERED|SIZED|SUBSIZED; private final PageFetcher<T> supplier; long start, end, pageSize; Spliterator<T> currentPage, danglingFirstPage; PagingSpliterator(PageFetcher<T> supplier, long start, long end, long pageSize) { this.supplier = supplier; this.start = start; this.end = end; this.pageSize = pageSize; } public boolean tryAdvance(Consumer<? super T> action) { for(;;) { if(ensurePage().tryAdvance(action)) return true; if(start>=end) return false; currentPage=null; } } public void forEachRemaining(Consumer<? super T> action) { do { ensurePage().forEachRemaining(action); currentPage=null; } while(start<end); } public Spliterator<T> trySplit() { if(danglingFirstPage!=null) { Spliterator<T> fp=danglingFirstPage; danglingFirstPage=null; start=fp.getExactSizeIfKnown(); return fp; } if(currentPage!=null) return currentPage.trySplit(); if(end-start>pageSize) { long mid=(start+end)>>>1; mid=mid/pageSize*pageSize; if(mid==start) mid+=pageSize; return new PagingSpliterator<>(supplier, start, start=mid, pageSize); } return ensurePage().trySplit(); } /** * Fetch data immediately before traversing or sub-page splitting. */ private Spliterator<T> ensurePage() { if(danglingFirstPage!=null) { Spliterator<T> fp=danglingFirstPage; danglingFirstPage=null; currentPage=fp; start=fp.getExactSizeIfKnown(); return fp; } Spliterator<T> sp = currentPage; if(sp==null) { if(start>=end) return Spliterators.emptySpliterator(); sp = spliterator(supplier.fetchPage( start, Math.min(end-start, pageSize), l->{})); start += sp.getExactSizeIfKnown(); currentPage=sp; } return sp; } /** * Ensure that the sub-spliterator provided by the List is compatible with * ours, i.e. is {@code SIZED | SUBSIZED}. For standard List implementations, * the spliterators are, so the costs of dumping into an intermediate array * in the other case is irrelevant. */ private static <E> Spliterator<E> spliterator(List<E> list) { Spliterator<E> sp = list.spliterator(); if((sp.characteristics()&(SIZED|SUBSIZED))!=(SIZED|SUBSIZED)) sp=Spliterators.spliterator( StreamSupport.stream(sp, false).toArray(), IMMUTABLE | ORDERED); return sp; } public long estimateSize() { if(currentPage!=null) return currentPage.estimateSize(); return end-start; } public int characteristics() { return CHARACTERISTICS; } }

Utiliza una interfaz funcional especializada de PageFetcher que puede implementarse invocando el método de accept de la devolución de llamada con el tamaño total resultante y devolviendo una lista de elementos. El separador de paginación simplemente delegará en el separador de la lista para el recorrido y, en caso de que la concurrencia sea significativamente mayor que el número de páginas resultante, incluso puede beneficiarse de la división de estos separadores de páginas, lo que implica que las listas de acceso aleatorio, como ArrayList , son las preferidas. tipo de lista aquí.

Adaptar su código de ejemplo a

private static <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) { return PagingSpliterator.paged( (offset, limit, totalSizeSink) -> { totalSizeSink.accept(things.size()); if(offset>things.size()) return Collections.emptyList(); int beginIndex = (int)offset; assert beginIndex==offset; int endIndex = Math.min(beginIndex+(int)limit, things.size()); System.out.printf("Page %6d-%6d:/t%s%n", beginIndex, endIndex, Thread.currentThread()); // artificial slowdown LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)); return things.subList(beginIndex, endIndex); }, pageSize, true); }

puedes probarlo como

List<Integer> samples=IntStream.range(0, 555_000).boxed().collect(Collectors.toList()); List<Integer> result =asSlowPagedSource(10_000, samples) .collect(Collectors.toList()); if(!samples.equals(result)) throw new AssertionError();

Dados suficientes núcleos de CPU libres, demostrará cómo se recuperan las páginas al mismo tiempo, por lo tanto no ordenadas, mientras que el resultado estará correctamente en el orden de encuentro. También puede probar la concurrencia de subpáginas que se aplica cuando hay menos páginas:

Set<Thread> threads=ConcurrentHashMap.newKeySet(); List<Integer> samples=IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList()); List<Integer> result=asSlowPagedSource(500_000, samples) .peek(x -> threads.add(Thread.currentThread())) .collect(Collectors.toList()); if(!samples.equals(result)) throw new AssertionError(); System.out.println("Concurrency: "+threads.size());

Me gustaría poder procesar una lectura de flujo de Java desde una fuente a la que se debe acceder en las páginas. Como primer acercamiento, implementé un iterador de paginación que simplemente solicitaba páginas cuando la página actual se StreamSupport.stream(iterator, false) sin elementos y luego usaba StreamSupport.stream(iterator, false) para obtener un identificador de flujo sobre el iterador.

Como descubrí que mis páginas son bastante caras de obtener, me gustaría acceder a las páginas por medio de un flujo paralelo. En este punto, descubrí que el paralelismo provisto por mi enfoque ingenuo no existe debido a la implementación del separador que Java proporciona directamente desde un iterador. Como en realidad sé bastante sobre los elementos que me gustaría recorrer (sé el recuento total de resultados después de solicitar la primera página, y la fuente admite un desplazamiento y un límite), creo que debería ser posible implementar mi propio separador que logre concurrencia real (tanto en el trabajo realizado en los elementos de una página Y la consulta de una página).

He podido lograr la concurrencia de "trabajo realizado en elementos" con bastante facilidad, pero en mi implementación inicial, la consulta de una página solo la realiza el divisor de mayor nivel y, por lo tanto, no se beneficia de la división de trabajo ofrecido por la implementación fork-join.

¿Cómo puedo escribir un separador que logre ambos objetivos?

Para referencia, proporcionaré lo que he hecho hasta ahora (sé que no divide las consultas de manera adecuada).

public final class PagingSourceSpliterator<T> implements Spliterator<T> { public static final long DEFAULT_PAGE_SIZE = 100; private Page<T> result; private Iterator<T> results; private boolean needsReset = false; private final PageProducer<T> generator; private long offset = 0L; private long limit = DEFAULT_PAGE_SIZE; public PagingSourceSpliterator(PageProducer<T> generator) { this.generator = generator; } public PagingSourceSpliterator(long pageSize, PageProducer<T> generator) { this.generator = generator; this.limit = pageSize; } @Override public boolean tryAdvance(Consumer<? super T> action) { if (hasAnotherElement()) { if (!results.hasNext()) { loadPageAndPrepareNextPaging(); } if (results.hasNext()) { action.accept(results.next()); return true; } } return false; } @Override public Spliterator<T> trySplit() { // if we know there''s another page, go ahead and hand off whatever // remains of this spliterator as a new spliterator for other // threads to work on, and then mark that next time something is // requested from this spliterator it needs to be reset to the head // of the next page if (hasAnotherPage()) { Spliterator<T> other = result.getPage().spliterator(); needsReset = true; return other; } else { return null; } } @Override public long estimateSize() { if(limit == 0) { return 0; } ensureStateIsUpToDateEnoughToAnswerInquiries(); return result.getTotalResults(); } @Override public int characteristics() { return IMMUTABLE | ORDERED | DISTINCT | NONNULL | SIZED | SUBSIZED; } private boolean hasAnotherElement() { ensureStateIsUpToDateEnoughToAnswerInquiries(); return isBound() && (results.hasNext() || hasAnotherPage()); } private boolean hasAnotherPage() { ensureStateIsUpToDateEnoughToAnswerInquiries(); return isBound() && (result.getTotalResults() > offset); } private boolean isBound() { return Objects.nonNull(results) && Objects.nonNull(result); } private void ensureStateIsUpToDateEnoughToAnswerInquiries() { ensureBound(); ensureResetIfNecessary(); } private void ensureBound() { if (!isBound()) { loadPageAndPrepareNextPaging(); } } private void ensureResetIfNecessary() { if(needsReset) { loadPageAndPrepareNextPaging(); needsReset = false; } } private void loadPageAndPrepareNextPaging() { // keep track of the overall result so that we can reference the original list and total size this.result = generator.apply(offset, limit); // make sure that the iterator we use to traverse a single page removes // results from the underlying list as we go so that we can simply pass // off the list spliterator for the trySplit rather than constructing a // new kind of spliterator for what remains. this.results = new DelegatingIterator<T>(result.getPage().listIterator()) { @Override public T next() { T next = super.next(); this.remove(); return next; } }; // update the paging for the next request and inquiries prior to the next request // we use the page of the actual result set instead of the limit in case the limit // was not respected exactly. this.offset += result.getPage().size(); } public static class DelegatingIterator<T> implements Iterator<T> { private final Iterator<T> iterator; public DelegatingIterator(Iterator<T> iterator) { this.iterator = iterator; } @Override public boolean hasNext() { return iterator.hasNext(); } @Override public T next() { return iterator.next(); } @Override public void remove() { iterator.remove(); } @Override public void forEachRemaining(Consumer<? super T> action) { iterator.forEachRemaining(action); } } }

Y la fuente de mis páginas:

public interface PageProducer<T> extends BiFunction<Long, Long, Page<T>> { }

Y una página:

public final class Page<T> { private long totalResults; private final List<T> page = new ArrayList<>(); public long getTotalResults() { return totalResults; } public List<T> getPage() { return page; } public Page setTotalResults(long totalResults) { this.totalResults = totalResults; return this; } public Page setPage(List<T> results) { this.page.clear(); this.page.addAll(results); return this; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (!(o instanceof Page)) { return false; } Page<?> page1 = (Page<?>) o; return totalResults == page1.totalResults && Objects.equals(page, page1.page); } @Override public int hashCode() { return Objects.hash(totalResults, page); } }

Y una muestra de cómo obtener un flujo con la paginación "lenta" para las pruebas.

private <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) { PageProducer<T> producer = (offset, limit) -> { try { Thread.sleep(5000L); } catch (InterruptedException e) { throw new RuntimeException(e); } int beginIndex = offset.intValue(); int endIndex = Math.min(offset.intValue() + limit.intValue(), things.size()); return new Page<T>().setTotalResults(things.size()) .setPage(things.subList(beginIndex, endIndex)); }; return StreamSupport.stream(new PagingSourceSpliterator<>(pageSize, producer), true); }


https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html

Desde mi entendimiento, la velocidad de la separación proviene de la inmutabilidad. Cuanto más inmutable sea la fuente, más rápido será el procesamiento, ya que la inmutabilidad permite un procesamiento paralelo o una división.

La idea que parece es abordar los cambios, si los hay, a la fuente lo mejor posible antes de vincularlos como un todo (mejor) o en partes (generalmente el caso y, por lo tanto, el desafío tuyo y muchos otros) a los separadores.

En su caso, esto podría significar que primero se respetaron los tamaños de página en lugar de:

//.. in case the limit was not respected exactly. this.offset += result.getPage().size();

También puede significar que la alimentación de flujo debe estar preparada y no ser utilizada como fuente directa.

Hay un ejemplo al final del documento de "cómo un marco de cálculo paralelo, como el paquete java.util.stream, usaría Spliterator en un cálculo paralelo"

Tenga en cuenta que es la forma en que la secuencia usaría el separador y no como la fuente utiliza el divisor.

Hay un método interesante de "cálculo" al final del ejemplo.

PD: si alguna vez obtiene una clase de PageSpliterator eficiente genérica, asegúrese de informarnos a algunos de nosotros.

aclamaciones.