tutorial programacion lenguaje ejemplos scala concurrency future

tutorial - scala lenguaje de programacion ejemplos



Futuro con tiempo de espera en Scala (5)

Aunque ya tiene algunas respuestas sobre cómo lograrlo con el bloqueo del subproceso adicional para manejar el tiempo de espera, le sugiero que intente una forma diferente, por la razón que Rex Kerr ya dio. No sé exactamente qué está haciendo en f() , pero si está enlazado a E / S, le sugiero que use una biblioteca de E / S asíncrona. Si es algún tipo de bucle, puede pasar el valor del tiempo de espera directamente a esa función y lanzar una TimeoutException allí, si supera el tiempo de espera. Ejemplo:

import scala.concurrent.duration._ import java.util.concurrent.TimeoutException def doSth(timeout: Deadline) = { for { i <- 0 to 10 } yield { Thread.sleep(1000) if (timeout.isOverdue) throw new TimeoutException("Operation timed out.") i } } scala> future { doSth(12.seconds.fromNow) } res3: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = scala.concurrent.impl.Promise$DefaultPromise@3d104456 scala> Await.result(res3, Duration.Inf) res6: scala.collection.immutable.IndexedSeq[Int] = Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> future { doSth(2.seconds.fromNow) } res7: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] = scala.concurrent.impl.Promise$DefaultPromise@f7dd680 scala> Await.result(res7, Duration.Inf) java.util.concurrent.TimeoutException: Operation timed out. at $anonfun$doSth$1.apply$mcII$sp(<console>:17) at $anonfun$doSth$1.apply(<console>:13) at $anonfun$doSth$1.apply(<console>:13) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ... scala> res7.value res10: Option[scala.util.Try[scala.collection.immutable.IndexedSeq[Int]]] = Some(Failure(java.util.concurrent.TimeoutException: Operation timed out.))

Esto solo utilizará 1 subproceso, que se terminará después del tiempo de espera + el tiempo de ejecución de un solo paso.

Supongamos que tengo una función que invoca una operación interrumpible de bloqueo. Me gustaría ejecutarlo de forma asíncrona con un tiempo de espera. Es decir, me gustaría interrumpir la función cuando caduque el tiempo de espera.

Así que estoy tratando de hacer algo así:

import scala.util.Try import scala.concurrent.Future def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = { val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() import ExecutionContext.Implicits.global Future {Thread.sleep(timeout); aref.get().interrupt} // 1 Future {aref.set(Thread.currentThread); Try(f())} // 2 }

El problema es que aref en (1) puede ser nulo porque (2) aún no lo ha establecido en el subproceso actual. En este caso me gustaría esperar hasta que se establezca aref . ¿Cuál es la mejor manera de hacer eso?


Puede ir para un enfoque un poco más fácil utilizando Await . El método Await.result toma la duración del tiempo de espera como un segundo parámetro y lanza una TimeoutException en el tiempo de espera.

try { import scala.concurrent.duration._ Await.result(aref, 10 seconds); } catch { case e: TimeoutException => // whatever you want to do. }


Si agrega un CountDownLatch puede lograr el comportamiento que desea. (Tenga en cuenta que el bloqueo (es decir, quedarse atascado a la await ) en muchos lotes de Future puede provocar el hambre de los grupos de subprocesos).

import scala.util.Try import scala.concurrent.Future def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = { val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() val cdl = new java.util.concurrent.CountDownLatch(1) import ExecutionContext.Implicits.global Future {Thread.sleep(timeout); cdl.await(); aref.get().interrupt} // 1 Future {aref.set(Thread.currentThread); cdl.countDown(); Try(f())} // 2 }


También puedes intentar usar un CountDownLatch como este:

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = { val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() import ExecutionContext.Implicits.global val latch = new CountDownLatch(1) Future { latch.await() aref.get().interrupt } Future { aref.set(Thread.currentThread) latch.countDown() Try(f()) } }

Ahora estoy esperando por siempre con mi llamada a latch.await() , pero ciertamente podrías cambiar eso a:

latch.await(1, TimeUnit.SECONDS)

y luego envuélvelo con un Try de manejar si / si se agota el tiempo.


Yo también necesitaba el mismo comportamiento, así que así es como lo resolví. Básicamente, creé un objeto que crea un temporizador y falla la promesa con una TimeoutException si el futuro no se ha completado en la duración especificada.

package mypackage import scala.concurrent.{Promise, Future} import scala.concurrent.duration.FiniteDuration import akka.actor.ActorSystem import scala.concurrent.ExecutionContext.Implicits.global object TimeoutFuture { val actorSystem = ActorSystem("myActorSystem") def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = { val promise = Promise[A]() actorSystem.scheduler.scheduleOnce(timeout) { promise tryFailure new java.util.concurrent.TimeoutException } Future { try { promise success block } catch { case e:Throwable => promise failure e } } promise.future } }