tutorial proyectos mpu6050 mercadolibre mega giroscopio codigo ccs acelerometro scala concurrency future

scala - proyectos - Cómo esperar varios Futuros



mpu6050 mercadolibre (8)

Aquí hay una solución sin usar actores.

import scala.util._ import scala.concurrent._ import java.util.concurrent.atomic.AtomicInteger // Nondeterministic. // If any failure, return it immediately, else return the final success. def allSucceed[T](fs: Future[T]*): Future[T] = { val remaining = new AtomicInteger(fs.length) val p = promise[T] fs foreach { _ onComplete { case s @ Success(_) => { if (remaining.decrementAndGet() == 0) { // Arbitrarily return the final success p tryComplete s } } case f @ Failure(_) => { p tryComplete f } } } p.future }

Supongamos que tengo varios futuros y necesito esperar hasta que cualquiera de ellos falle o todos tengan éxito.

Por ejemplo: Deje que haya 3 futuros: f1 , f2 , f3 .

  • Si f1 tiene éxito y f2 falla, no espero a f3 (y devuelvo el fallo al cliente).

  • Si f2 falla mientras f1 y f3 continúan ejecutándose, no los espero (y no respondo )

  • Si f1 tiene éxito y luego f2 tiene éxito, sigo esperando f3 .

¿Cómo lo implementarías?


En su lugar, puede usar una comprensión forzada de la siguiente manera:

val fut1 = Future{...} val fut2 = Future{...} val fut3 = Future{...} val aggFut = for{ f1Result <- fut1 f2Result <- fut2 f3Result <- fut3 } yield (f1Result, f2Result, f3Result)

En este ejemplo, los futuros 1, 2 y 3 se lanzan en paralelo. Luego, en la sección de comprensión, esperamos hasta que los resultados 1 y luego 2 y luego 3 estén disponibles. Si falla 1 o 2, ya no esperamos 3. Si los 3 tienen éxito, entonces el aggFut val tendrá una tupla con 3 espacios, correspondiente a los resultados de los 3 futuros.

Ahora bien, si necesita el comportamiento en el que desea dejar de esperar si dice que fut2 falla primero, las cosas se ponen un poco más complicadas. En el ejemplo anterior, tendría que esperar a que fut1 se complete antes de darse cuenta de que fut2 falló. Para resolver eso, podrías intentar algo como esto:

val fut1 = Future{Thread.sleep(3000);1} val fut2 = Promise.failed(new RuntimeException("boo")).future val fut3 = Future{Thread.sleep(1000);3} def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = { val fut = if (futures.size == 1) futures.head._2 else Future.firstCompletedOf(futures.values) fut onComplete{ case Success(value) if (futures.size == 1)=> prom.success(value :: values) case Success(value) => processFutures(futures - value, value :: values, prom) case Failure(ex) => prom.failure(ex) } prom.future } val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]()) aggFut onComplete{ case value => println(value) }

Ahora esto funciona correctamente, pero el problema proviene de saber qué Future eliminar del Map cuando se ha completado con éxito. Siempre que tenga alguna forma de correlacionar adecuadamente un resultado con el futuro que generó ese resultado, entonces algo como esto funciona. Simplemente continúa extrayendo Futuros completados del Mapa y luego llama a Future.firstCompletedOf en los Futures restantes hasta que no quede ninguno, recopilando los resultados a lo largo del camino. No es bonito, pero si realmente necesitas el comportamiento del que hablas, entonces esto o algo similar podría funcionar.



Esta pregunta ha sido respondida pero estoy publicando mi solución de clase de valor (las clases de valor se agregaron en 2.10) ya que no hay una aquí. Por favor, siéntete libre de criticar.

implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal { def concurrently = ConcurrentFuture(self) } case class ConcurrentFuture[A](future: Future[A]) extends AnyVal { def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future)) def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class } def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = { val p = Promise[B]() val inner = f(outer.future) inner.future onFailure { case t => p.tryFailure(t) } outer.future onFailure { case t => p.tryFailure(t) } inner.future onSuccess { case b => p.trySuccess(b) } ConcurrentFuture(p.future) }

ConcurrentFuture es un contenedor de futuros sin límite que cambia el mapa futuro predeterminado / mapa plano de do-this-then-that a combine-all-and-fail-if-any-fail. Uso:

def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 } def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" } def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 } val f : Future[(Int,String,Double)] = { for { f1 <- func1.concurrently f2 <- func2.concurrently f3 <- func3.concurrently } yield for { v1 <- f1 v2 <- f2 v3 <- f3 } yield (v1,v2,v3) }.future f.onFailure { case t => println("future failed $t") }

En el ejemplo anterior, f1, f2 y f3 se ejecutarán simultáneamente y si alguno falla en cualquier orden, el futuro de la tupla fallará inmediatamente.


Para este propósito usaría un actor Akka. A diferencia de la comprensión, falla tan pronto como falla el futuro, por lo que es un poco más eficiente en ese sentido.

class ResultCombiner(futs: Future[_]*) extends Actor { var origSender: ActorRef = null var futsRemaining: Set[Future[_]] = futs.toSet override def receive = { case () => origSender = sender for(f <- futs) f.onComplete(result => self ! if(result.isSuccess) f else false) case false => origSender ! SomethingFailed case f: Future[_] => futsRemaining -= f if(futsRemaining.isEmpty) origSender ! EverythingSucceeded } } sealed trait Result case object SomethingFailed extends Result case object EverythingSucceeded extends Result

Luego, crea el actor, envíale un mensaje (para que sepa a dónde enviar su respuesta) y espera una respuesta.

val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3))) try { val f4: Future[Result] = actor ? () implicit val timeout = new Timeout(30 seconds) // or whatever Await.result(f4, timeout.duration).asInstanceOf[Result] match { case SomethingFailed => println("Oh noes!") case EverythingSucceeded => println("It all worked!") } } finally { // Avoid memory leaks: destroy the actor actor ! PoisonPill }


Puede utilizar una promesa y enviarle ya sea la primera falla o el éxito total final completado:

def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { val p = Promise[M[A]]() // the first Future to fail completes the promise in.foreach(_.onFailure{case i => p.tryFailure(i)}) // if the whole sequence succeeds (i.e. no failures) // then the promise is completed with the aggregated success Future.sequence(in).foreach(p trySuccess _) p.future }

Entonces puedes Await en ese Future resultante si quieres bloquear, o simplemente map a otra cosa.

La diferencia con respecto a la comprensión es que aquí se obtiene el error de que el primero falle, mientras que para la comprensión se obtiene el primer error en el orden transversal de la colección de entrada (incluso si otro falló primero). Por ejemplo:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)} // this waits one second, then prints "java.lang.ArithmeticException: / by zero" // the first to fail in traversal order

Y:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)} // this immediately prints "java.util.NoSuchElementException: None.get" // the ''actual'' first to fail (usually...) // and it returns early (it does not wait 1 sec)


Puedes hacer esto solo con futuros. Aquí hay una implementación. Tenga en cuenta que no terminará la ejecución anticipadamente. En ese caso, debe hacer algo más sofisticado (y probablemente implementar la interrupción por su cuenta). Pero si simplemente no quiere seguir esperando algo que no va a funcionar, la clave es seguir esperando que termine lo primero y detenerse cuando ya no quede nada o llegue a una excepción:

import scala.annotation.tailrec import scala.util.{Try, Success, Failure} import scala.concurrent._ import scala.concurrent.duration.Duration import ExecutionContext.Implicits.global @tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): Either[Throwable, Seq[A]] = { val first = Future.firstCompletedOf(fs) Await.ready(first, Duration.Inf).value match { case None => awaitSuccess(fs, done) // Shouldn''t happen! case Some(Failure(e)) => Left(e) case Some(Success(_)) => val (complete, running) = fs.partition(_.isCompleted) val answers = complete.flatMap(_.value) answers.find(_.isFailure) match { case Some(Failure(e)) => Left(e) case _ => if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done) else Right( answers.map(_.get) ++: done ) } } }

Aquí hay un ejemplo de esto en acción cuando todo funciona bien:

scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); println("Fancy meeting you here!") }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! Fancy meeting you here! Bye! res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))

Pero cuando algo sale mal

scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); throw new Exception("boo"); () }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo) scala> Bye!


Puedes usar esto:

val l = List(1, 6, 8) val f = l.map{ i => future { println("future " +i) Thread.sleep(i* 1000) if (i == 12) throw new Exception("6 is not legal.") i } } val f1 = Future.sequence(f) f1 onSuccess{ case l => { logInfo("onSuccess") l.foreach(i => { logInfo("h : " + i) }) } } f1 onFailure{ case l => { logInfo("onFailure") }