java coroutine

Implementando coroutines en java.



(6)

Esta pregunta está relacionada con mi pregunta sobre implementaciones de coroutine existentes en Java . Si, como sospecho, resulta que no hay una implementación completa de coroutines actualmente disponibles en Java, ¿qué se requeriría para implementarlas?

Como dije en esa pregunta, sé lo siguiente:

  1. Puede implementar "coroutines" como subprocesos / grupos de subprocesos detrás de las escenas.
  2. Puede hacer trucos y cosas con el código de bytes JVM detrás de escena para hacer posible las rutinas.
  3. La implementación de JVM llamada "Máquina Da Vinci" tiene primitivas que hacen que las rutinas sean factibles sin la manipulación del código de bytes.
  4. También hay varios enfoques basados ​​en JNI para coroutines posibles.

Abordaré las deficiencias de cada uno a su vez.

Coroutines a base de hilo.

Esta "solución" es patológica. El objetivo principal de coroutines es evitar la sobrecarga de subprocesos, bloqueos, programación del núcleo, etc. Se supone que los coroutines son ligeros y rápidos y se ejecutan solo en el espacio del usuario. Implementándolos en términos de subprocesos de inclinación completa con restricciones estrictas, se libran de todas las ventajas.

Manipulación de bytecode JVM

Esta solución es más práctica, aunque un poco difícil de lograr. Esto es más o menos lo mismo que saltar a un lenguaje ensamblador para bibliotecas coroutine en C (que es la forma en que funcionan) con la ventaja de que solo tiene una arquitectura de la cual preocuparse y hacer las cosas bien.

También lo relaciona con solo ejecutar su código en pilas JVM totalmente compatibles (lo que significa, por ejemplo, no en Android) a menos que pueda encontrar una manera de hacer lo mismo en la pila no compatible. Sin embargo, si encuentra una manera de hacerlo, ahora ha duplicado la complejidad de su sistema y las necesidades de prueba.

La maquina da vinci

La máquina Da Vinci es genial para la experimentación, pero como no es una JVM estándar, sus características no estarán disponibles en todas partes. De hecho, sospecho que la mayoría de los entornos de producción prohibirían específicamente el uso de la máquina Da Vinci. Por lo tanto, podría usar esto para hacer experimentos geniales, pero no para ningún código que espero lanzar al mundo real.

Esto también tiene un problema adicional similar al de la solución de manipulación de código de bytes JVM anterior: no funcionará en pilas alternativas (como la de Android).

Implementación JNI

Esta solución hace que el punto de hacer esto en Java sea discutible. Cada combinación de CPU y sistema operativo requiere pruebas independientes y cada una es un punto de falla sutil potencialmente frustrante. Alternativamente, por supuesto, podría atarme a una plataforma por completo, pero esto también hace que el punto de hacer las cosas en Java sea completamente discutible.

Asi que...

¿Hay alguna forma de implementar coroutines en Java sin usar una de estas cuatro técnicas? ¿O me veré obligado a usar uno de esos cuatro que huele menos (manipulación de JVM) en su lugar?

Editado para añadir:

Solo para asegurarnos de que la confusión está contenida, esta es una pregunta relacionada con la otra , pero no es la misma. Ese está buscando una implementación existente en un intento por evitar reinventar la rueda innecesariamente. Esta es una pregunta relacionada con la forma en que se implementaría una aplicación de corrutinas en Java si la otra no fuera respondible. La intención es mantener diferentes preguntas en diferentes hilos.


Acabo de encontrar esta pregunta y solo quiero mencionar que creo que podría ser posible implementar coroutines o generadores de manera similar a C #. Dicho esto, en realidad no uso Java, pero el CIL tiene limitaciones bastante similares a las de la JVM.

La declaración de rendimiento en C # es una función de lenguaje puro y no forma parte del código de bytes CIL. El compilador de C # simplemente crea una clase privada oculta para cada función del generador. Si usa la declaración de rendimiento en una función, debe devolver un IEnumerator o un IEnumerable. El compilador "empaqueta" su código en una clase similar a una máquina estadística.

El compilador de C # puede usar algunos "goto" en el código generado para facilitar la conversión a una máquina de estadísticas. No conozco las capacidades del código de bytes de Java y si hay algo así como un salto incondicional, pero a nivel de ensamblaje generalmente es posible.

Como ya se mencionó, esta característica debe ser implementada en el compilador. Debido a que tengo poco conocimiento sobre Java y su compilador, no puedo decir si es posible alterar / extender el compilador, tal vez con un "preprocesador" o algo así.

Personalmente me encantan las coroutines. Como desarrollador de juegos de Unity los uso bastante a menudo. Debido a que juego muchos Minecraft con ComputerCraft, tenía curiosidad por saber por qué los coroutines en Lua (LuaJ) se implementan con subprocesos.


Hay otra opción aquí para Java6 +.

Una implementación corthínica pitónica:

import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; class CorRunRAII { private final List<WeakReference<? extends CorRun>> resources = new ArrayList<>(); public CorRunRAII add(CorRun resource) { if (resource == null) { return this; } resources.add(new WeakReference<>(resource)); return this; } public CorRunRAII addAll(List<? extends CorRun> arrayList) { if (arrayList == null) { return this; } for (CorRun corRun : arrayList) { add(corRun); } return this; } @Override protected void finalize() throws Throwable { super.finalize(); for (WeakReference<? extends CorRun> corRunWeakReference : resources) { CorRun corRun = corRunWeakReference.get(); if (corRun != null) { corRun.stop(); } } } } class CorRunYieldReturn<ReceiveType, YieldReturnType> { public final AtomicReference<ReceiveType> receiveValue; public final LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue; CorRunYieldReturn(AtomicReference<ReceiveType> receiveValue, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) { this.receiveValue = receiveValue; this.yieldReturnValue = yieldReturnValue; } } interface CorRun<ReceiveType, YieldReturnType> extends Runnable, Callable<YieldReturnType> { boolean start(); void stop(); void stop(final Throwable throwable); boolean isStarted(); boolean isEnded(); Throwable getError(); ReceiveType getReceiveValue(); void setResultForOuter(YieldReturnType resultForOuter); YieldReturnType getResultForOuter(); YieldReturnType receive(ReceiveType value); ReceiveType yield(); ReceiveType yield(YieldReturnType value); <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another); <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value); } abstract class CorRunSync<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> { private ReceiveType receiveValue; public final List<WeakReference<CorRun>> potentialChildrenCoroutineList = new ArrayList<>(); // Outside private AtomicBoolean isStarted = new AtomicBoolean(false); private AtomicBoolean isEnded = new AtomicBoolean(false); private Throwable error; private YieldReturnType resultForOuter; @Override public boolean start() { boolean isStarted = this.isStarted.getAndSet(true); if ((! isStarted) && (! isEnded())) { receive(null); } return isStarted; } @Override public void stop() { stop(null); } @Override public void stop(Throwable throwable) { isEnded.set(true); if (throwable != null) { error = throwable; } for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) { CorRun child = weakReference.get(); if (child != null) { child.stop(); } } } @Override public boolean isStarted() { return isStarted.get(); } @Override public boolean isEnded() { return isEnded.get(); } @Override public Throwable getError() { return error; } @Override public ReceiveType getReceiveValue() { return receiveValue; } @Override public void setResultForOuter(YieldReturnType resultForOuter) { this.resultForOuter = resultForOuter; } @Override public YieldReturnType getResultForOuter() { return resultForOuter; } @Override public synchronized YieldReturnType receive(ReceiveType value) { receiveValue = value; run(); return getResultForOuter(); } @Override public ReceiveType yield() { return yield(null); } @Override public ReceiveType yield(YieldReturnType value) { resultForOuter = value; return receiveValue; } @Override public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another) { return yieldFrom(another, null); } @Override public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another, TargetReceiveType value) { if (another == null || another.isEnded()) { throw new RuntimeException("Call null or isEnded coroutine"); } potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another)); synchronized (another) { boolean isStarted = another.start(); boolean isJustStarting = ! isStarted; if (isJustStarting && another instanceof CorRunSync) { return another.getResultForOuter(); } return another.receive(value); } } @Override public void run() { try { this.call(); } catch (Exception e) { e.printStackTrace(); stop(e); return; } } } abstract class CorRunThread<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> { private final ExecutorService childExecutorService = newExecutorService(); private ExecutorService executingOnExecutorService; private static final CorRunYieldReturn DUMMY_COR_RUN_YIELD_RETURN = new CorRunYieldReturn(new AtomicReference<>(null), new LinkedBlockingDeque<AtomicReference>()); private final CorRun<ReceiveType, YieldReturnType> self; public final List<WeakReference<CorRun>> potentialChildrenCoroutineList; private CorRunYieldReturn<ReceiveType, YieldReturnType> lastCorRunYieldReturn; private final LinkedBlockingDeque<CorRunYieldReturn<ReceiveType, YieldReturnType>> receiveQueue; // Outside private AtomicBoolean isStarted = new AtomicBoolean(false); private AtomicBoolean isEnded = new AtomicBoolean(false); private Future<YieldReturnType> future; private Throwable error; private final AtomicReference<YieldReturnType> resultForOuter = new AtomicReference<>(); CorRunThread() { executingOnExecutorService = childExecutorService; receiveQueue = new LinkedBlockingDeque<>(); potentialChildrenCoroutineList = new ArrayList<>(); self = this; } @Override public void run() { try { self.call(); } catch (Exception e) { stop(e); return; } stop(); } @Override public abstract YieldReturnType call(); @Override public boolean start() { return start(childExecutorService); } protected boolean start(ExecutorService executorService) { boolean isStarted = this.isStarted.getAndSet(true); if (!isStarted) { executingOnExecutorService = executorService; future = (Future<YieldReturnType>) executingOnExecutorService.submit((Runnable) self); } return isStarted; } @Override public void stop() { stop(null); } @Override public void stop(final Throwable throwable) { if (throwable != null) { error = throwable; } isEnded.set(true); returnYieldValue(null); // Do this for making sure the coroutine has checked isEnd() after getting a dummy value receiveQueue.offer(DUMMY_COR_RUN_YIELD_RETURN); for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) { CorRun child = weakReference.get(); if (child != null) { if (child instanceof CorRunThread) { ((CorRunThread)child).tryStop(childExecutorService); } } } childExecutorService.shutdownNow(); } protected void tryStop(ExecutorService executorService) { if (this.executingOnExecutorService == executorService) { stop(); } } @Override public boolean isEnded() { return isEnded.get() || ( future != null && (future.isCancelled() || future.isDone()) ); } @Override public boolean isStarted() { return isStarted.get(); } public Future<YieldReturnType> getFuture() { return future; } @Override public Throwable getError() { return error; } @Override public void setResultForOuter(YieldReturnType resultForOuter) { this.resultForOuter.set(resultForOuter); } @Override public YieldReturnType getResultForOuter() { return this.resultForOuter.get(); } @Override public YieldReturnType receive(ReceiveType value) { LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue = new LinkedBlockingDeque<>(); offerReceiveValue(value, yieldReturnValue); try { AtomicReference<YieldReturnType> takeValue = yieldReturnValue.take(); return takeValue == null ? null : takeValue.get(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } @Override public ReceiveType yield() { return yield(null); } @Override public ReceiveType yield(final YieldReturnType value) { returnYieldValue(value); return getReceiveValue(); } @Override public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another) { return yieldFrom(another, null); } @Override public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value) { if (another == null || another.isEnded()) { throw new RuntimeException("Call null or isEnded coroutine"); } boolean isStarted = false; potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another)); synchronized (another) { if (another instanceof CorRunThread) { isStarted = ((CorRunThread)another).start(childExecutorService); } else { isStarted = another.start(); } boolean isJustStarting = ! isStarted; if (isJustStarting && another instanceof CorRunSync) { return another.getResultForOuter(); } TargetYieldReturnType send = another.receive(value); return send; } } @Override public ReceiveType getReceiveValue() { setLastCorRunYieldReturn(takeLastCorRunYieldReturn()); return lastCorRunYieldReturn.receiveValue.get(); } protected void returnYieldValue(final YieldReturnType value) { CorRunYieldReturn<ReceiveType, YieldReturnType> corRunYieldReturn = lastCorRunYieldReturn; if (corRunYieldReturn != null) { corRunYieldReturn.yieldReturnValue.offer(new AtomicReference<>(value)); } } protected void offerReceiveValue(final ReceiveType value, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) { receiveQueue.offer(new CorRunYieldReturn(new AtomicReference<>(value), yieldReturnValue)); } protected CorRunYieldReturn<ReceiveType, YieldReturnType> takeLastCorRunYieldReturn() { try { return receiveQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } protected void setLastCorRunYieldReturn(CorRunYieldReturn<ReceiveType,YieldReturnType> lastCorRunYieldReturn) { this.lastCorRunYieldReturn = lastCorRunYieldReturn; } protected ExecutorService newExecutorService() { return Executors.newCachedThreadPool(getThreadFactory()); } protected ThreadFactory getThreadFactory() { return new ThreadFactory() { @Override public Thread newThread(final Runnable runnable) { Thread thread = new Thread(runnable); thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread thread, Throwable throwable) { throwable.printStackTrace(); if (runnable instanceof CorRun) { CorRun self = (CorRun) runnable; self.stop(throwable); thread.interrupt(); } } }); return thread; } }; } }

Ahora puede usar coroutinas pitónicas de esta manera (por ejemplo, números de fibonacci)

Versión del hilo:

class Fib extends CorRunThread<Integer, Integer> { @Override public Integer call() { Integer times = getReceiveValue(); do { int a = 1, b = 1; for (int i = 0; times != null && i < times; i++) { int temp = a + b; a = b; b = temp; } // A pythonic "yield", i.e., it returns `a` to the caller and waits `times` value from the next caller times = yield(a); } while (! isEnded()); setResultForOuter(Integer.MAX_VALUE); return getResultForOuter(); } } class MainRun extends CorRunThread<String, String> { @Override public String call() { // The fib coroutine would be recycled by its parent // (no requirement to call its start() and stop() manually) // Otherwise, if you want to share its instance and start/stop it manually, // please start it before being called by yieldFrom() and stop it in the end. Fib fib = new Fib(); String result = ""; Integer current; int times = 10; for (int i = 0; i < times; i++) { // A pythonic "yield from", i.e., it calls fib with `i` parameter and waits for returned value as `current` current = yieldFrom(fib, i); if (fib.getError() != null) { throw new RuntimeException(fib.getError()); } if (current == null) { continue; } if (i > 0) { result += ","; } result += current; } setResultForOuter(result); return result; } }

Versión de sincronización (sin hilo):

class Fib extends CorRunSync<Integer, Integer> { @Override public Integer call() { Integer times = getReceiveValue(); int a = 1, b = 1; for (int i = 0; times != null && i < times; i++) { int temp = a + b; a = b; b = temp; } yield(a); return getResultForOuter(); } } class MainRun extends CorRunSync<String, String> { @Override public String call() { CorRun<Integer, Integer> fib = null; try { fib = new Fib(); } catch (Exception e) { e.printStackTrace(); } String result = ""; Integer current; int times = 10; for (int i = 0; i < times; i++) { current = yieldFrom(fib, i); if (fib.getError() != null) { throw new RuntimeException(fib.getError()); } if (current == null) { continue; } if (i > 0) { result += ","; } result += current; } stop(); setResultForOuter(result); if (Utils.isEmpty(result)) { throw new RuntimeException("Error"); } return result; } }

Ejecución (ambas versiones funcionarán):

// Run the entry coroutine MainRun mainRun = new MainRun(); mainRun.start(); // Wait for mainRun ending for 5 seconds long startTimestamp = System.currentTimeMillis(); while(!mainRun.isEnded()) { if (System.currentTimeMillis() - startTimestamp > TimeUnit.SECONDS.toMillis(5)) { throw new RuntimeException("Wait too much time"); } } // The result should be "1,1,2,3,5,8,13,21,34,55" System.out.println(mainRun.getResultForOuter());


Kotlin utiliza el siguiente enfoque para co-rutinas
(de https://kotlinlang.org/docs/reference/coroutines.html ):

Coroutines se implementa completamente a través de una técnica de compilación (no se requiere soporte del lado de VM o OS), y la suspensión funciona a través de la transformación del código. Básicamente, cada función de suspensión (las optimizaciones pueden aplicarse, pero no entraremos en esto aquí) se transforma en una máquina de estados donde los estados corresponden a la suspensión de llamadas. Justo antes de una suspensión, el siguiente estado se almacena en un campo de una clase generada por el compilador junto con las variables locales relevantes, etc. Al reanudar esa rutina, las variables locales se restauran y la máquina de estados procede del estado justo después de la suspensión.

Una coroutina suspendida se puede almacenar y transmitir como un objeto que mantiene su estado y locales suspendidos. El tipo de tales objetos es Continuación, y la transformación de código global descrita aquí corresponde al estilo clásico de paso de Continuación. En consecuencia, las funciones de suspensión toman un parámetro adicional de tipo Continuación bajo el capó.

Consulte el documento de diseño en https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md


Me gustaría echarle un vistazo a esto: http://www.chiark.greenend.org.uk/~sgtatham/coroutines.html , es bastante interesante y debería proporcionar un buen lugar para comenzar. Pero, por supuesto, estamos usando Java para que podamos hacerlo mejor (o quizás peor porque no hay macros :))

Desde mi entendimiento con coroutines, usualmente tiene un productor y un consumidor coroutine (o al menos este es el patrón más común) Pero semánticamente no quiere que el productor llame al consumidor o viceversa porque esto introduce una asimetría. Pero dada la forma en que funcionan los lenguajes basados ​​en la pila, necesitaremos que alguien haga la llamada.

Así que aquí hay una jerarquía de tipos muy simple:

public interface CoroutineProducer<T> { public T Produce(); public boolean isDone(); } public interface CoroutineConsumer<T> { public void Consume(T t); } public class CoroutineManager { public static Execute<T>(CoroutineProducer<T> prod, CoroutineConsumer<T> con) { while(!prod.IsDone()) // really simple { T d = prod.Produce(); con.Consume(d); } } }

Ahora, por supuesto, la parte difícil es implementar las interfaces, en particular, es difícil dividir un cálculo en pasos individuales. Para esto probablemente querrá un conjunto completo de estructuras de control persistentes . La idea básica es que queremos simular una transferencia de control no local (al final, es como si estuviéramos simulando un goto ). Básicamente, queremos dejar de usar la pila y el pc (programa-contador) manteniendo el estado de nuestras operaciones actuales en el montón en lugar de en la pila. Por lo tanto, vamos a necesitar un montón de clases de ayuda.

Por ejemplo:

Digamos que en un mundo ideal querías escribir un consumidor que se pareciera a esto (psuedocode):

boolean is_done; int other_state; while(!is_done) { //read input //parse input //yield input to coroutine //update is_done and other_state; }

necesitamos abstraer la variable local como is_done y other_state y necesitamos abstraer el propio bucle while porque nuestra operación de yield no va a usar la pila. Así que vamos a crear una abstracción de bucle while y clases asociadas:

enum WhileState {BREAK, CONTINUE, YIELD} abstract class WhileLoop<T> { private boolean is_done; public boolean isDone() { return is_done;} private T rval; public T getReturnValue() {return rval;} protected void setReturnValue(T val) { rval = val; } public T loop() { while(true) { WhileState state = execute(); if(state == WhileState.YIELD) return getReturnValue(); else if(state == WhileState.BREAK) { is_done = true; return null; } } } protected abstract WhileState execute(); }

El truco básico aquí es mover las variables locales para que sean variables de clase y convertir los bloques de alcance en clases, lo que nos da la posibilidad de ''reingresar'' nuestro ''bucle'' después de obtener nuestro valor de retorno.

Ahora a implementar nuestro productor.

public class SampleProducer : CoroutineProducer<Object> { private WhileLoop<Object> loop;//our control structures become state!! public SampleProducer() { loop = new WhileLoop() { private int other_state;//our local variables become state of the control structure protected WhileState execute() { //this implements a single iteration of the loop if(is_done) return WhileState.BREAK; //read input //parse input Object calcluated_value = ...; //update is_done, figure out if we want to continue setReturnValue(calculated_value); return WhileState.YIELD; } }; } public Object Produce() { Object val = loop.loop(); return val; } public boolean isDone() { //we are done when the loop has exited return loop.isDone(); } }

Se podrían hacer trucos similares para otras estructuras básicas de control de flujo. Lo ideal sería crear una biblioteca de estas clases auxiliares y luego usarlas para implementar estas interfaces simples que en última instancia le darían la semántica de las co-rutinas. Estoy seguro de que todo lo que he escrito aquí se puede generalizar y ampliar enormemente.


Tengo una clase de Coroutine que uso en Java. Se basa en subprocesos y el uso de subprocesos tiene la ventaja de permitir el funcionamiento en paralelo, lo que en máquinas de varios núcleos puede ser una ventaja. Por lo tanto, es posible que desee considerar un enfoque basado en hilos.


Yo sugeriría mirar Kotlin coroutines en JVM . Sin embargo, cae en una categoría diferente. No hay manipulación de byte-código involucrada y funciona en Android, también. Sin embargo, tendrás que escribir tus coroutines en Kotlin. La ventaja es que Kotlin está diseñado para la interoperabilidad con Java en mente, por lo que aún puede seguir usando todas sus bibliotecas de Java y combinar libremente Kotlin y el código de Java en el mismo proyecto, incluso colocándolos de lado a lado en los mismos directorios y paquetes

Esta Guía de kotlinx.coroutines proporciona muchos más ejemplos, mientras que el documento de diseño de las guías de navegación explica toda la motivación, los casos de uso y los detalles de la implementación.