tutorial rxjava2 rxjava reactiva programacion example español ejemplos ejemplo curso java multithreading thread-safety rx-java rx-android

reactiva - rxjava2 android español



Casos de uso para planificadores RxJava (3)

En RxJava hay 5 programadores diferentes para elegir:

  1. inmediata () : crea y devuelve un Programador que ejecuta el trabajo inmediatamente en el hilo actual.

  2. trampoline () : crea y devuelve un Programador que pone en cola el trabajo en el hilo actual para que se ejecute después de que se complete el trabajo actual.

  3. newThread () : crea y devuelve un programador que crea un nuevo subproceso para cada unidad de trabajo.

  4. computation () : crea y devuelve un programador destinado al trabajo computacional. Esto se puede usar para bucles de eventos, procesamiento de devoluciones de llamada y otros trabajos computacionales. No realice trabajos vinculados a IO en este planificador. Usar programadores. io () en su lugar.

  5. io () : crea y devuelve un planificador destinado a trabajos vinculados a IO. La implementación está respaldada por un grupo de subprocesos Ejecutor que crecerá según sea necesario. Esto se puede usar para realizar asincrónicamente el bloqueo de E / S. No realice trabajos computacionales en este planificador. Usar programadores. computation () en su lugar.

Preguntas:

Los primeros 3 programadores se explican por sí mismos; Sin embargo, estoy un poco confundido acerca de la computación y io .

  1. ¿Qué es exactamente el "trabajo vinculado a IO"? ¿Se utiliza para tratar secuencias ( java.io ) y archivos ( java.nio.files )? ¿Se utiliza para consultas de bases de datos? ¿Se utiliza para descargar archivos o acceder a las API REST?
  2. ¿Cómo es la computación () diferente de newThread () ? ¿Es que todas las llamadas de cómputo () están en un solo hilo (de fondo) en lugar de un nuevo hilo (de fondo) cada vez?
  3. ¿Por qué es malo llamar a computation () cuando hago trabajo IO?
  4. ¿Por qué es malo llamar a io () cuando se realiza un trabajo computacional?

El punto más importante es que tanto la programación Schedulers.io como Schedulers.com están respaldadas por grupos de subprocesos ilimitados en lugar de los otros mencionados en la pregunta. Esta característica solo es compartida por Schedulers.from (Ejecutor) en el caso de que el Ejecutor se cree con newCachedThreadPool (sin límites con un grupo de subprocesos de recuperación automática).

Como se ha explicado abundantemente en las respuestas anteriores y en varios artículos en la web, la publicación Schedulers.io y Schedulers.com se utilizará con cuidado, ya que están optimizados para el tipo de trabajo en su nombre. Pero, desde mi punto de vista, su función más importante es proporcionar concurrencia real a las corrientes reactivas .

Contrariamente a la creencia de los recién llegados, las corrientes reactivas no son inherentemente concurrentes sino inherentemente asíncronas y secuenciales. Por esta misma razón, Schedulers.io se usará solo cuando la operación de E / S esté bloqueando (por ejemplo: usando un comando de bloqueo como Apache IOUtils FileUtils.readFileAsString (...) ) así congelaría el hilo de llamada hasta que la operación esté hecho.

El uso de un método asincrónico como Java AsynchronousFileChannel (...) no bloquearía el hilo de llamada durante la operación, por lo que no tiene sentido usar un hilo separado. De hecho, los subprocesos de Schedulers.io no son realmente una buena opción para operaciones asincrónicas, ya que no ejecutan un bucle de eventos y la devolución de llamada nunca ... se llamará.

La misma lógica se aplica para el acceso a la base de datos o las llamadas API remotas. No use Schedulers.io si puede usar una API asíncrona o reactiva para realizar la llamada.

De vuelta a la concurrencia. Es posible que no tenga acceso a una API asíncrona o reactiva para realizar operaciones de E / S de forma asincrónica o simultánea, por lo que su única alternativa es enviar múltiples llamadas en un hilo separado. Por desgracia, las secuencias reactivas son secuenciales en sus extremos, pero la buena noticia es que el operador flatMap () puede introducir la concurrencia en su núcleo .

La simultaneidad debe construirse en la construcción de flujo, generalmente utilizando el operador flatMap () . Este potente operador se puede configurar para proporcionar internamente un contexto de subprocesos múltiples a su función incorporada flatMap () <T, R>. Ese contexto lo proporciona un programador de subprocesos múltiples, como Scheduler.io o Scheduler.computation .

Encuentre más detalles en los artículos sobre RxJava2 Schedulers and Concurrency donde encontrará ejemplos de código y explicaciones detalladas sobre cómo usar Schedulers de forma secuencial y simultánea.

Espero que esto ayude,

Softjake


Grandes preguntas, creo que la documentación podría hacer con más detalles.

  1. io() está respaldado por un grupo de subprocesos ilimitado y es el tipo de cosa que usaría para tareas que no requieren mucha computación, es decir, cosas que no ponen mucha carga en la CPU. Entonces, la interacción con el sistema de archivos, la interacción con bases de datos o servicios en un host diferente son buenos ejemplos.
  2. computation() está respaldado por un grupo de subprocesos limitado con un tamaño igual al número de procesadores disponibles. Si trató de programar el trabajo intensivo de la CPU en paralelo en más de los procesadores disponibles (por ejemplo, usando newThread() ), entonces está preparado para la sobrecarga de creación de subprocesos y la sobrecarga de cambio de contexto a medida que los subprocesos compiten por un procesador y es potencialmente un gran impacto en el rendimiento.
  3. Es mejor dejar el computation() para el trabajo intensivo de la CPU, de lo contrario no obtendrá una buena utilización de la CPU.
  4. Es malo llamar a io() para el trabajo computacional por la razón discutida en 2. io() no tiene límites y si programa mil tareas computacionales en io() en paralelo, entonces cada una de esas mil tareas tendrá su propio hilo y será compitiendo por CPU incurriendo en costos de cambio de contexto.

Esta publicación de blog ofrece una excelente respuesta

De la publicación del blog:

Schedulers.io () está respaldado por un grupo de subprocesos ilimitado. Se utiliza para trabajos de tipo de E / S que no requieren mucha CPU, incluida la interacción con el sistema de archivos, la realización de llamadas de red, interacciones de bases de datos, etc.

Schedulers.computation () está respaldado por un grupo de subprocesos limitado con un tamaño de hasta el número de procesadores disponibles. Se utiliza para el trabajo computacional o intensivo de la CPU, como cambiar el tamaño de las imágenes, procesar grandes conjuntos de datos, etc. Tenga cuidado: cuando asigna más subprocesos de cómputo que los núcleos disponibles, el rendimiento se degradará debido al cambio de contexto y la sobrecarga de la creación de subprocesos a medida que los subprocesos compiten por tiempo de procesadores.

Schedulers.newThread () crea un nuevo hilo para cada unidad de trabajo programada. Este programador es costoso ya que cada vez se genera un nuevo subproceso y no se reutiliza.

Schedulers.from (Ejecutor ejecutor) crea y devuelve un programador personalizado respaldado por el ejecutor especificado. Para limitar el número de subprocesos simultáneos en el grupo de subprocesos, use Scheduler.from (Executors.newFixedThreadPool (n)). Esto garantiza que si una tarea se programa cuando todos los hilos están ocupados, se pondrá en cola. Los subprocesos en el grupo existirán hasta que se cierre explícitamente.

El subproceso principal o AndroidSchedulers.mainThread () es proporcionado por la biblioteca de extensiones RxAndroid para RxJava. El subproceso principal (también conocido como subproceso de interfaz de usuario) es donde ocurre la interacción del usuario. Se debe tener cuidado de no sobrecargar este hilo para evitar una interfaz de usuario que no responda de manera irregular o, peor aún, un cuadro de diálogo de Aplicación que no responde (ANR).

Schedulers.single () es nuevo en RxJava 2. Este planificador está respaldado por un solo hilo que ejecuta tareas secuencialmente en el orden solicitado.

Schedulers.trampoline () ejecuta tareas de manera FIFO (Primero en entrar, Primero en salir) por uno de los hilos de trabajo participantes. A menudo se usa al implementar la recursión para evitar aumentar la pila de llamadas.