form docs spring reactive-programming

spring - docs - Primavera de webflux y lectura desde base de datos.



spring form tags submit button (4)

Basado en este blog , deberías reescribir tu fragmento de la siguiente manera.

@GetMapping(value = "/v1/measurements") public Flux<Measurement> getMeasurements() { return Flux.defer(() -> Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L)))) .subscribeOn(Schedulers.elastic()); }

Spring 5 introduce el estilo de programación reactiva para las API de descanso con webflux . Yo mismo soy bastante nuevo y me preguntaba si encerrar llamadas sincrónicas a una base de datos en Flux o Mono tiene sentido en cuanto a la preformación. Si es así, ¿es esta la manera de hacerlo?

@RestController public class HomeController { private MeasurementRepository repository; public HomeController(MeasurementRepository repository){ this.repository = repository; } @GetMapping(value = "/v1/measurements") public Flux<Measurement> getMeasurements() { return Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L))); } }

¿Hay algo así como un CrudRepository asíncrono? No pude encontrarlo


Los datos de Spring admiten la interfaz de repositorio reactivo para Mongo y Cassandra.

Datos de primavera Interfaz reactiva MongoDb

Spring Data MongoDB proporciona soporte de repositorio reactivo con los tipos reactivos Project Reactor y RxJava 1. La API reactiva admite la conversión de tipos reactivos entre tipos reactivos.

public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> { Flux<Person> findByLastname(String lastname); @Query("{ ''firstname'': ?0, ''lastname'': ?1}") Mono<Person> findByFirstnameAndLastname(String firstname, String lastname); // Accept parameter inside a reactive type for deferred execution Flux<Person> findByLastname(Mono<String> lastname); Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname); @InfiniteStream // Use a tailable cursor Flux<Person> findWithTailableCursorBy(); } public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> { Observable<Person> findByLastname(String lastname); @Query("{ ''firstname'': ?0, ''lastname'': ?1}") Single<Person> findByFirstnameAndLastname(String firstname, String lastname); // Accept parameter inside a reactive type for deferred execution Observable<Person> findByLastname(Single<String> lastname); Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname); @InfiniteStream // Use a tailable cursor Observable<Person> findWithTailableCursorBy(); }


Obtener un Flux o un Mono no significa necesariamente que se ejecutará en un Thread dedicado. En cambio, la mayoría de los operadores continúan trabajando en el subproceso en el que se ejecutó el operador anterior. A menos que se especifique, el operador superior (la fuente) se ejecuta en el subproceso en el que se realizó la llamada subscribe ().

Si tiene que bloquear las API de persistencia (JPA, JDBC) o las API de red para usar, Spring MVC es la mejor opción para las arquitecturas comunes, al menos. Es técnicamente factible, tanto con Reactor como con RxJava, realizar llamadas de bloqueo en un subproceso separado, pero no estaría aprovechando al máximo una pila web sin bloqueo.

Entonces ... ¿Cómo encierro una llamada de bloqueo síncrono?

Use Callable para diferir la ejecución. Y debe usar Schedulers.elastic porque crea un subproceso dedicado para esperar el recurso de bloqueo sin comprometer otro recurso.

  • Schedulers.immediate (): Tema actual.
  • Schedulers.single (): Un único hilo reutilizable.
  • Schedulers.newSingle (): Un subproceso dedicado por llamada.
  • Schedulers.elastic (): Un conjunto de hilos elásticos. Crea nuevos grupos de trabajadores según sea necesario y reutiliza los inactivos. Esta es una buena opción para el trabajo de bloqueo de E / S, por ejemplo.
  • Schedulers.parallel (): Un grupo fijo de trabajadores que se ajusta para el trabajo paralelo.

ejemplo:

Mono.fromCallable(() -> blockingRepository.save()) .subscribeOn(Schedulers.elastic());


Una opción sería utilizar clientes SQL alternativos que no sean totalmente bloqueados. Algunos ejemplos incluyen: https://github.com/mauricio/postgresql-async o https://github.com/finagle/roc . Por supuesto, ninguno de estos controladores es oficialmente compatible con los proveedores de bases de datos todavía. Además, la funcionalidad es mucho menos atractiva en comparación con las abstracciones basadas en JDBC maduras como Hibernate o jOOQ.

La idea alternativa me vino del mundo Scala. La idea es enviar llamadas de bloqueo en ThreadPool aislado para no mezclar llamadas de bloqueo y no bloqueo. Esto nos permitirá controlar el número total de subprocesos y permitirá que la CPU realice tareas no bloqueantes en el contexto de ejecución principal con algunas optimizaciones potenciales. Suponiendo que tenemos una implementación basada en JDBC, como Spring Data JPA, que de hecho está bloqueando, podemos hacer que su ejecución sea asíncrona y se envíe en el grupo de subprocesos dedicados.

@RestController public class HomeController { private final MeasurementRepository repository; private final Scheduler scheduler; public HomeController(MeasurementRepository repository, @Qualifier("jdbcScheduler") Scheduler scheduler) { this.repository = repository; this.scheduler = scheduler; } @GetMapping(value = "/v1/measurements") public Flux<Measurement> getMeasurements() { return Mono.fromCallable(() -> repository.findByFromDateGreaterThanEqual(new Date(1486980000L))).publishOn(scheduler); } }

Nuestro programador para JDBC debe configurarse utilizando un grupo de subprocesos dedicado con un recuento de tamaño igual al número de conexiones.

@Configuration public class SchedulerConfiguration { private final Integer connectionPoolSize; public SchedulerConfiguration(@Value("${spring.datasource.maximum-pool-size}") Integer connectionPoolSize) { this.connectionPoolSize = connectionPoolSize; } @Bean public Scheduler jdbcScheduler() { return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize)); } }

Sin embargo, hay dificultades con este enfoque. El principal es la gestión de transacciones. En JDBC, las transacciones solo son posibles dentro de una única java.sql.Connection. Para realizar varias operaciones en una transacción, tienen que compartir una conexión. Si queremos hacer algunos cálculos entre ellos, tenemos que mantener la conexión. Esto no es muy efectivo, ya que mantenemos un número limitado de conexiones inactivas mientras hacemos cálculos intermedios.

Esta idea de un contenedor JDBC asíncrono no es nueva y ya está implementada en la biblioteca Scala Slick 3. Finalmente, el JDBC sin bloqueo puede aparecer en la hoja de ruta de Java. Como se anunció en JavaOne en septiembre de 2016, es posible que lo veamos en Java 10.