scala future

Scala a la espera de la secuencia de futuros.



future (4)

Esperaba que el código como el siguiente esperara ambos futuros, pero no lo hace.

object Fiddle { val f1 = Future { throw new Throwable("baaa") // emulating a future that bumped into an exception } val f2 = Future { Thread.sleep(3000L) // emulating a future that takes a bit longer to complete 2 } val lf = List(f1, f2) // in the general case, this would be a dynamically sized list val seq = Future.sequence(lf) seq.onComplete { _ => lf.foreach(f => println(f.isCompleted)) } } val a = FuturesSequence

Asumí que seq.onComplete esperaría a que todos se completen antes de completarse, pero no es así; en resultado de:

true false

.sequence fue un poco difícil de seguir en la fuente de scala.concurrent.Future, me pregunto cómo implementaría un paralelo que espera todos los futuros originales de una secuencia (de tamaño dinámico), o cuál podría ser el problema aquí.

Edición: una pregunta relacionada: https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future :)


Este es un ejemplo que apoya la respuesta anterior. Hay una manera fácil de hacer esto usando solo las API de Scala estándar.

En el ejemplo, estoy creando 3 futuros. Estos se completarán a los 5, 7 y 9 segundos respectivamente. La llamada a Await.result se bloqueará hasta que todos los futuros se hayan resuelto. Una vez que se hayan completado los 3 futuros, a se establecerá a List(5,7,9) y la ejecución continuará.

Además, si se lanza una excepción en alguno de los futuros, Await.result desbloqueará y lanzará la excepción inmediatamente. Descomente la línea Exception(...) para ver esto en acción.

try { val a = Await.result(Future.sequence(Seq( Future({ blocking { Thread.sleep(5000) } System.err.println("A") 5 }), Future({ blocking { Thread.sleep(7000) } System.err.println("B") 7 //throw new Exception("Ha!") }), Future({ blocking { Thread.sleep(9000) } System.err.println("C") 9 }))), Duration("100 sec")) System.err.println(a) } catch { case e: Exception ⇒ e.printStackTrace() }


Podemos enriquecer Seq[Future[T]] con su propio método onComplete través de una clase implícita:

def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] = f map { Success(_) } recover { case e => Failure(e) } def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] = fs map { lift(_) } implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal { def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = { Future.sequence(lift(fs)) onComplete { case Success(s) => f(s) case Failure(e) => throw e // will never happen, because of the Try lifting } } }

Luego, en tu MWE particular, puedes hacer:

val f1 = Future { throw new Throwable("baaa") // emulating a future that bumped into an exception } val f2 = Future { Thread.sleep(3000L) // emulating a future that takes a bit longer to complete 2 } val lf = List(f1, f2) lf onComplete { _ map { case Success(v) => ??? case Failure(e) => ??? }}

Esta solución tiene la ventaja de permitirle llamar a un onComplete en una secuencia de futuros como lo haría en un solo futuro.


Un Future producido por Future.sequence completa cuando:

  • todos los futuros se han completado con éxito, o
  • uno de los futuros ha fracasado

El segundo punto es lo que está sucediendo en su caso, y tiene sentido completarlo tan pronto como uno de los Future envueltos haya fallado, porque el Future envolvente solo puede contener un solo Throwable en el caso de falla. No tiene sentido esperar a los otros futuros porque el resultado será el mismo fracaso.


Un enfoque común para esperar todos los resultados (fallidos o no) es "elevar" las fallas a una nueva representación en el futuro, de modo que todos los futuros se completen con algún resultado (aunque pueden completarse con un resultado que represente una falla). Una forma natural de conseguirlo es levantarlo para Try .

La implementación de futuros de Twitter proporciona un método liftToTry que hace que esto sea trivial, pero puedes hacer algo similar con la implementación de la biblioteca estándar:

import scala.util.{ Failure, Success, Try } val lifted: List[Future[Try[Int]]] = List(f1, f2).map( _.map(Success(_)).recover { case t => Failure(t) } )

Ahora Future.sequence(lifted) se completará cuando se complete cada futuro, y representará los éxitos y los fracasos usando Try .

Y así, una solución genérica para esperar en todos los futuros originales de una secuencia de futuros puede verse como sigue, asumiendo que un contexto de ejecución está, por supuesto, implícitamente disponible.

import scala.util.{ Failure, Success, Try } private def lift[T](futures: Seq[Future[T]]) = futures.map(_.map { Success(_) }.recover { case t => Failure(t) }) def waitAll[T](futures: Seq[Future[T]]) = Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used waitAll(SeqOfFutures).map { // do whatever with the completed futures }