scala - proyectos - Cómo esperar varios Futuros
mpu6050 mercadolibre (8)
Aquí hay una solución sin usar actores.
import scala.util._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger
// Nondeterministic.
// If any failure, return it immediately, else return the final success.
def allSucceed[T](fs: Future[T]*): Future[T] = {
val remaining = new AtomicInteger(fs.length)
val p = promise[T]
fs foreach {
_ onComplete {
case s @ Success(_) => {
if (remaining.decrementAndGet() == 0) {
// Arbitrarily return the final success
p tryComplete s
}
}
case f @ Failure(_) => {
p tryComplete f
}
}
}
p.future
}
Supongamos que tengo varios futuros y necesito esperar hasta que cualquiera de ellos falle o todos tengan éxito.
Por ejemplo: Deje que haya 3 futuros: f1
, f2
, f3
.
Si
f1
tiene éxito yf2
falla, no espero af3
(y devuelvo el fallo al cliente).Si
f2
falla mientrasf1
yf3
continúan ejecutándose, no los espero (y no respondo )Si
f1
tiene éxito y luegof2
tiene éxito, sigo esperandof3
.
¿Cómo lo implementarías?
En su lugar, puede usar una comprensión forzada de la siguiente manera:
val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}
val aggFut = for{
f1Result <- fut1
f2Result <- fut2
f3Result <- fut3
} yield (f1Result, f2Result, f3Result)
En este ejemplo, los futuros 1, 2 y 3 se lanzan en paralelo. Luego, en la sección de comprensión, esperamos hasta que los resultados 1 y luego 2 y luego 3 estén disponibles. Si falla 1 o 2, ya no esperamos 3. Si los 3 tienen éxito, entonces el aggFut
val tendrá una tupla con 3 espacios, correspondiente a los resultados de los 3 futuros.
Ahora bien, si necesita el comportamiento en el que desea dejar de esperar si dice que fut2 falla primero, las cosas se ponen un poco más complicadas. En el ejemplo anterior, tendría que esperar a que fut1 se complete antes de darse cuenta de que fut2 falló. Para resolver eso, podrías intentar algo como esto:
val fut1 = Future{Thread.sleep(3000);1}
val fut2 = Promise.failed(new RuntimeException("boo")).future
val fut3 = Future{Thread.sleep(1000);3}
def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
val fut = if (futures.size == 1) futures.head._2
else Future.firstCompletedOf(futures.values)
fut onComplete{
case Success(value) if (futures.size == 1)=>
prom.success(value :: values)
case Success(value) =>
processFutures(futures - value, value :: values, prom)
case Failure(ex) => prom.failure(ex)
}
prom.future
}
val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
aggFut onComplete{
case value => println(value)
}
Ahora esto funciona correctamente, pero el problema proviene de saber qué Future
eliminar del Map
cuando se ha completado con éxito. Siempre que tenga alguna forma de correlacionar adecuadamente un resultado con el futuro que generó ese resultado, entonces algo como esto funciona. Simplemente continúa extrayendo Futuros completados del Mapa y luego llama a Future.firstCompletedOf
en los Futures
restantes hasta que no quede ninguno, recopilando los resultados a lo largo del camino. No es bonito, pero si realmente necesitas el comportamiento del que hablas, entonces esto o algo similar podría funcionar.
Es posible que desee consultar la futura API de Twitter. Notablemente el método Future.collect. Hace exactamente lo que quiere: https://twitter.github.io/scala_school/finagle.html
El código fuente Future.scala está disponible aquí: https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala
Esta pregunta ha sido respondida pero estoy publicando mi solución de clase de valor (las clases de valor se agregaron en 2.10) ya que no hay una aquí. Por favor, siéntete libre de criticar.
implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal {
def concurrently = ConcurrentFuture(self)
}
case class ConcurrentFuture[A](future: Future[A]) extends AnyVal {
def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future))
def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class
}
def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = {
val p = Promise[B]()
val inner = f(outer.future)
inner.future onFailure { case t => p.tryFailure(t) }
outer.future onFailure { case t => p.tryFailure(t) }
inner.future onSuccess { case b => p.trySuccess(b) }
ConcurrentFuture(p.future)
}
ConcurrentFuture es un contenedor de futuros sin límite que cambia el mapa futuro predeterminado / mapa plano de do-this-then-that a combine-all-and-fail-if-any-fail. Uso:
def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 }
def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" }
def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 }
val f : Future[(Int,String,Double)] = {
for {
f1 <- func1.concurrently
f2 <- func2.concurrently
f3 <- func3.concurrently
} yield for {
v1 <- f1
v2 <- f2
v3 <- f3
} yield (v1,v2,v3)
}.future
f.onFailure { case t => println("future failed $t") }
En el ejemplo anterior, f1, f2 y f3 se ejecutarán simultáneamente y si alguno falla en cualquier orden, el futuro de la tupla fallará inmediatamente.
Para este propósito usaría un actor Akka. A diferencia de la comprensión, falla tan pronto como falla el futuro, por lo que es un poco más eficiente en ese sentido.
class ResultCombiner(futs: Future[_]*) extends Actor {
var origSender: ActorRef = null
var futsRemaining: Set[Future[_]] = futs.toSet
override def receive = {
case () =>
origSender = sender
for(f <- futs)
f.onComplete(result => self ! if(result.isSuccess) f else false)
case false =>
origSender ! SomethingFailed
case f: Future[_] =>
futsRemaining -= f
if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
}
}
sealed trait Result
case object SomethingFailed extends Result
case object EverythingSucceeded extends Result
Luego, crea el actor, envíale un mensaje (para que sepa a dónde enviar su respuesta) y espera una respuesta.
val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
try {
val f4: Future[Result] = actor ? ()
implicit val timeout = new Timeout(30 seconds) // or whatever
Await.result(f4, timeout.duration).asInstanceOf[Result] match {
case SomethingFailed => println("Oh noes!")
case EverythingSucceeded => println("It all worked!")
}
} finally {
// Avoid memory leaks: destroy the actor
actor ! PoisonPill
}
Puede utilizar una promesa y enviarle ya sea la primera falla o el éxito total final completado:
def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
val p = Promise[M[A]]()
// the first Future to fail completes the promise
in.foreach(_.onFailure{case i => p.tryFailure(i)})
// if the whole sequence succeeds (i.e. no failures)
// then the promise is completed with the aggregated success
Future.sequence(in).foreach(p trySuccess _)
p.future
}
Entonces puedes Await
en ese Future
resultante si quieres bloquear, o simplemente map
a otra cosa.
La diferencia con respecto a la comprensión es que aquí se obtiene el error de que el primero falle, mientras que para la comprensión se obtiene el primer error en el orden transversal de la colección de entrada (incluso si otro falló primero). Por ejemplo:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }
Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
// this waits one second, then prints "java.lang.ArithmeticException: / by zero"
// the first to fail in traversal order
Y:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }
sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
// this immediately prints "java.util.NoSuchElementException: None.get"
// the ''actual'' first to fail (usually...)
// and it returns early (it does not wait 1 sec)
Puedes hacer esto solo con futuros. Aquí hay una implementación. Tenga en cuenta que no terminará la ejecución anticipadamente. En ese caso, debe hacer algo más sofisticado (y probablemente implementar la interrupción por su cuenta). Pero si simplemente no quiere seguir esperando algo que no va a funcionar, la clave es seguir esperando que termine lo primero y detenerse cuando ya no quede nada o llegue a una excepción:
import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global
@tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()):
Either[Throwable, Seq[A]] = {
val first = Future.firstCompletedOf(fs)
Await.ready(first, Duration.Inf).value match {
case None => awaitSuccess(fs, done) // Shouldn''t happen!
case Some(Failure(e)) => Left(e)
case Some(Success(_)) =>
val (complete, running) = fs.partition(_.isCompleted)
val answers = complete.flatMap(_.value)
answers.find(_.isFailure) match {
case Some(Failure(e)) => Left(e)
case _ =>
if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
else Right( answers.map(_.get) ++: done )
}
}
}
Aquí hay un ejemplo de esto en acción cuando todo funciona bien:
scala> awaitSuccess(Seq(Future{ println("Hi!") },
Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
Fancy meeting you here!
Bye!
res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
Pero cuando algo sale mal
scala> awaitSuccess(Seq(Future{ println("Hi!") },
Future{ Thread.sleep(1000); throw new Exception("boo"); () },
Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)
scala> Bye!
Puedes usar esto:
val l = List(1, 6, 8)
val f = l.map{
i => future {
println("future " +i)
Thread.sleep(i* 1000)
if (i == 12)
throw new Exception("6 is not legal.")
i
}
}
val f1 = Future.sequence(f)
f1 onSuccess{
case l => {
logInfo("onSuccess")
l.foreach(i => {
logInfo("h : " + i)
})
}
}
f1 onFailure{
case l => {
logInfo("onFailure")
}