tutorial - scala lenguaje de programacion ejemplos
Futuro con tiempo de espera en Scala (5)
Aunque ya tiene algunas respuestas sobre cómo lograrlo con el bloqueo del subproceso adicional para manejar el tiempo de espera, le sugiero que intente una forma diferente, por la razón que Rex Kerr ya dio. No sé exactamente qué está haciendo en f()
, pero si está enlazado a E / S, le sugiero que use una biblioteca de E / S asíncrona. Si es algún tipo de bucle, puede pasar el valor del tiempo de espera directamente a esa función y lanzar una TimeoutException
allí, si supera el tiempo de espera. Ejemplo:
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException
def doSth(timeout: Deadline) = {
for {
i <- 0 to 10
} yield {
Thread.sleep(1000)
if (timeout.isOverdue)
throw new TimeoutException("Operation timed out.")
i
}
}
scala> future { doSth(12.seconds.fromNow) }
res3: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] =
scala.concurrent.impl.Promise$DefaultPromise@3d104456
scala> Await.result(res3, Duration.Inf)
res6: scala.collection.immutable.IndexedSeq[Int] =
Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> future { doSth(2.seconds.fromNow) }
res7: scala.concurrent.Future[scala.collection.immutable.IndexedSeq[Int]] =
scala.concurrent.impl.Promise$DefaultPromise@f7dd680
scala> Await.result(res7, Duration.Inf)
java.util.concurrent.TimeoutException: Operation timed out.
at $anonfun$doSth$1.apply$mcII$sp(<console>:17)
at $anonfun$doSth$1.apply(<console>:13)
at $anonfun$doSth$1.apply(<console>:13)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
...
scala> res7.value
res10: Option[scala.util.Try[scala.collection.immutable.IndexedSeq[Int]]] =
Some(Failure(java.util.concurrent.TimeoutException: Operation timed out.))
Esto solo utilizará 1 subproceso, que se terminará después del tiempo de espera + el tiempo de ejecución de un solo paso.
Supongamos que tengo una función que invoca una operación interrumpible de bloqueo. Me gustaría ejecutarlo de forma asíncrona con un tiempo de espera. Es decir, me gustaría interrumpir la función cuando caduque el tiempo de espera.
Así que estoy tratando de hacer algo así:
import scala.util.Try import scala.concurrent.Future def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = { val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() import ExecutionContext.Implicits.global Future {Thread.sleep(timeout); aref.get().interrupt} // 1 Future {aref.set(Thread.currentThread); Try(f())} // 2 }
El problema es que aref
en (1) puede ser nulo porque (2) aún no lo ha establecido en el subproceso actual. En este caso me gustaría esperar hasta que se establezca aref
. ¿Cuál es la mejor manera de hacer eso?
Puede ir para un enfoque un poco más fácil utilizando Await . El método Await.result
toma la duración del tiempo de espera como un segundo parámetro y lanza una TimeoutException
en el tiempo de espera.
try {
import scala.concurrent.duration._
Await.result(aref, 10 seconds);
} catch {
case e: TimeoutException => // whatever you want to do.
}
Si agrega un CountDownLatch
puede lograr el comportamiento que desea. (Tenga en cuenta que el bloqueo (es decir, quedarse atascado a la await
) en muchos lotes de Future
puede provocar el hambre de los grupos de subprocesos).
import scala.util.Try
import scala.concurrent.Future
def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {
val aref = new java.util.concurrent.atomic.AtomicReference[Thread]()
val cdl = new java.util.concurrent.CountDownLatch(1)
import ExecutionContext.Implicits.global
Future {Thread.sleep(timeout); cdl.await(); aref.get().interrupt} // 1
Future {aref.set(Thread.currentThread); cdl.countDown(); Try(f())} // 2
}
También puedes intentar usar un CountDownLatch
como este:
def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {
val aref = new java.util.concurrent.atomic.AtomicReference[Thread]()
import ExecutionContext.Implicits.global
val latch = new CountDownLatch(1)
Future {
latch.await()
aref.get().interrupt
}
Future {
aref.set(Thread.currentThread)
latch.countDown()
Try(f())
}
}
Ahora estoy esperando por siempre con mi llamada a latch.await()
, pero ciertamente podrías cambiar eso a:
latch.await(1, TimeUnit.SECONDS)
y luego envuélvelo con un Try
de manejar si / si se agota el tiempo.
Yo también necesitaba el mismo comportamiento, así que así es como lo resolví. Básicamente, creé un objeto que crea un temporizador y falla la promesa con una TimeoutException si el futuro no se ha completado en la duración especificada.
package mypackage
import scala.concurrent.{Promise, Future}
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext.Implicits.global
object TimeoutFuture {
val actorSystem = ActorSystem("myActorSystem")
def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
val promise = Promise[A]()
actorSystem.scheduler.scheduleOnce(timeout) {
promise tryFailure new java.util.concurrent.TimeoutException
}
Future {
try {
promise success block
}
catch {
case e:Throwable => promise failure e
}
}
promise.future
}
}