scala - Manejo de excepciones en una biblioteca iteratee sin un estado de error
haskell scalaz (2)
Estoy intentando escribir un enumerador para leer archivos línea por línea de un java.io.BufferedReader
utilizando la biblioteca iteratee de Scalaz 7, que actualmente solo proporciona un enumerador (extremadamente lento) para java.io.Reader
.
Los problemas con los que me estoy enfrentando están relacionados con el hecho de que todas las otras bibliotecas iteratee que he usado (p. Ej. Play 2.0''s y el enumerator
John Millikin para Haskell) han tenido un estado de error como uno de los constructores de su tipo Step
, y Scalaz 7 no.
Mi implementación actual
Esto es lo que tengo actualmente. Primero para algunas importaciones y envoltorios IO
:
import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I, _ }
def openFile(f: File) = IO(new BufferedReader(new FileReader(f)))
def readLine(r: BufferedReader) = IO(Option(r.readLine))
def closeReader(r: BufferedReader) = IO(r.close())
Y un alias tipo para limpiar un poco las cosas:
type ErrorOr[A] = Either[Throwable, A]
Y ahora es un asistente de tryIO
, modelado (de manera vaga y probablemente errónea) en el enumerator
:
def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, ErrorOr[B]](
action.catchLeft.map(
r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
)
)
Un enumerador para el BufferedReader
sí mismo:
def enumBuffered(r: => BufferedReader) = new EnumeratorT[ErrorOr[String], IO] {
lazy val reader = r
def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
tryIO(readLine(reader)) flatMap {
case Right(None) => s.pointI
case Right(Some(line)) => k(I.elInput(Right(line))) >>== apply[A]
case Left(e) => k(I.elInput(Left(e)))
}
)
}
Y finalmente un enumerador que es responsable de abrir y cerrar al lector:
def enumFile(f: File) = new EnumeratorT[ErrorOr[String], IO] {
def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
tryIO(openFile(f)) flatMap {
case Right(reader) => I.iterateeT(
enumBuffered(reader).apply(s).value.ensuring(closeReader(reader))
)
case Left(e) => k(I.elInput(Left(e)))
}
)
}
Ahora supongamos, por ejemplo, que quiero recopilar todas las líneas de un archivo que contiene al menos veinticinco ''0''
caracteres en una lista. Puedo escribir:
val action: IO[ErrorOr[List[String]]] = (
I.consume[ErrorOr[String], IO, List] %=
I.filter(_.fold(_ => true, _.count(_ == ''0'') >= 25)) &=
enumFile(new File("big.txt"))
).run.map(_.sequence)
En muchos sentidos, esto parece funcionar de maravilla: puedo patear la acción con unsafePerformIO
y se unsafePerformIO
en decenas de millones de líneas y gigabytes de datos en un par de minutos, en la memoria constante y sin soplar la pila, y luego cerrar el lector cuando está hecho. Si le doy el nombre de un archivo que no existe, obedientemente me devolverá la excepción envuelta en una Left
, y enumBuffered
al menos parece comportarse apropiadamente si golpea una excepción durante la lectura.
Problemas potenciales
Sin embargo, tengo algunas preocupaciones sobre mi implementación, particularmente sobre tryIO
. Por ejemplo, supongamos que trato de componer algunas iteraciones:
val it = for {
_ <- tryIO[Unit, Unit](IO(println("a")))
_ <- tryIO[Unit, Unit](IO(throw new Exception("!")))
r <- tryIO[Unit, Unit](IO(println("b")))
} yield r
Si ejecuto esto, obtengo lo siguiente:
scala> it.run.unsafePerformIO()
a
b
res11: ErrorOr[Unit] = Right(())
Si intento lo mismo con el enumerator
en GHCi, el resultado es más parecido a lo que esperaba:
...> run $ tryIO (putStrLn "a") >> tryIO (error "!") >> tryIO (putStrLn "b")
a
Left !
Simplemente no veo una manera de obtener este comportamiento sin un estado de error en la biblioteca de iteratee.
Mis preguntas
No pretendo ser ningún tipo de experto en iteraciones, pero he utilizado varias implementaciones de Haskell en algunos proyectos, siento que más o menos entiendo los conceptos fundamentales, y tomé un café con Oleg una vez. Sin embargo, estoy perdido aquí. ¿Es esta una forma razonable de manejar excepciones en ausencia de un estado de error? ¿Hay alguna manera de implementar tryIO
que se comporte más como la versión del enumerator
? ¿Hay algún tipo de bomba de tiempo esperándome en el hecho de que mi implementación se comporta de manera diferente?
EDITAR aquí es la verdadera solución. Me fui en la publicación original porque creo que vale la pena ver el patrón. Lo que funciona para Klesli funciona para IterateeT
import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }
object IterateeIOExample {
type ErrorOr[+A] = EitherT[IO, Throwable, A]
def openFile(f: File) = IO(new BufferedReader(new FileReader(f)))
def readLine(r: BufferedReader) = IO(Option(r.readLine))
def closeReader(r: BufferedReader) = IO(r.close())
def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B] {
EitherT.fromEither(action.catchLeft).map(r => I.sdone(r, I.emptyInput))
}
def enumBuffered(r: => BufferedReader) = new EnumeratorT[String, ErrorOr] {
lazy val reader = r
def apply[A] = (s: StepT[String, ErrorOr, A]) => s.mapCont(k =>
tryIO(readLine(reader)) flatMap {
case None => s.pointI
case Some(line) => k(I.elInput(line)) >>== apply[A]
})
}
def enumFile(f: File) = new EnumeratorT[String, ErrorOr] {
def apply[A] = (s: StepT[String, ErrorOr, A]) =>
tryIO(openFile(f)).flatMap(reader => I.iterateeT[String, ErrorOr, A](
EitherT(
enumBuffered(reader).apply(s).value.run.ensuring(closeReader(reader)))))
}
def main(args: Array[String]) {
val action = (
I.consume[String, ErrorOr, List] %=
I.filter(a => a.count(_ == ''0'') >= 25) &=
enumFile(new File(args(0)))).run.run
println(action.unsafePerformIO().map(_.size))
}
}
===== Mensaje original =====
Siento que necesitas un EitherT en la mezcla. Sin EitherT usted acaba de terminar con 3 izquierdas o derechos. Con EitherT sería apropiado para la izquierda.
Creo que lo que realmente quieres es
type ErrorOr[+A] = EitherT[IO, Throwable, A]
I.iterateeT[A, ErrorOr, B]
El siguiente código imita cómo estás componiendo cosas en este momento. Debido a que IterateeT no tiene ningún concepto de izquierda y derecha, cuando lo compones, terminas con un montón de IO / Id.
scala> Kleisli((a:Int) => 4.right[String].point[Id])
res11: scalaz.Kleisli[scalaz.Scalaz.Id,Int,scalaz.//[String,Int]] = scalaz.KleisliFunctions$$anon$18@73e771ca
scala> Kleisli((a:Int) => "aa".left[Int].point[Id])
res12: scalaz.Kleisli[scalaz.Scalaz.Id,Int,scalaz.//[String,Int]] = scalaz.KleisliFunctions$$anon$18@be41b41
scala> for { a <- res11; b <- res12 } yield (a,b)
res15: scalaz.Kleisli[scalaz.Scalaz.Id,Int,(scalaz.//[String,Int], scalaz.//[String,Int])] = scalaz.KleisliFunctions$$anon$18@42fd1445
scala> res15.run(1)
res16: (scalaz.//[String,Int], scalaz.//[String,Int]) = (//-(4),-//(aa))
En el siguiente código, en lugar de usar Id, usamos un EitherT. Como EitherT tiene el mismo comportamiento de vinculación que cualquiera, terminamos con lo que queremos.
scala> type ErrorOr[+A] = EitherT[Id, String, A]
defined type alias ErrorOr
scala> Kleisli[ErrorOr, Int, Int]((a:Int) => EitherT(4.right[String].point[Id]))
res22: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@58b547a0
scala> Kleisli[ErrorOr, Int, Int]((a:Int) => EitherT("aa".left[Int].point[Id]))
res24: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@342f2ceb
scala> for { a <- res22; b <- res24 } yield 2
res25: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@204eab31
scala> res25.run(2).run
res26: scalaz.Scalaz.Id[scalaz.//[String,Int]] = -//(aa)
Puede reemplazar Keisli con IterateeT e Id con IO para obtener lo que necesita.
La forma en que lo hacen las pipes
es componer la clase de clase usando la clase de tipo de Channel
:
class Channel p where
{-| ''idT'' acts like a /'T/'ransparent proxy, passing all requests further
upstream, and passing all responses further downstream. -}
idT :: (Monad m) => a'' -> p a'' a a'' a m r
{-| Compose two proxies, satisfying all requests from downstream with
responses from upstream. -}
(>->) :: (Monad m)
=> (b'' -> p a'' a b'' b m r)
-> (c'' -> p b'' b c'' c m r)
-> (c'' -> p a'' a c'' c m r)
p1 >-> p2 = p2 <-< p1
... y derivó una composición levantada sobre EitherT
partir de la composición de base. Este es un caso especial del principio de los transformadores proxy, introducido en pipes-2.4
, que permite elevar la composición sobre extensiones arbitrarias.
Este levantamiento requiere definir un EitherT
especializado en la forma del tipo Proxy
en Control.Proxy.Trans.Either
:
newtype EitherP e p a'' a b'' b (m :: * -> *) r
= EitherP { runEitherP :: p a'' a b'' b m (Either e r) }
Esta especialización a la forma de Proxy
es necesaria para poder definir una instancia bien tipada de la clase Channel
. Scala podría ser más flexible en este sentido que Haskell.
Luego, simplemente redefino la instancia de Monad
(y otras instancias) junto con todas las operaciones ordinarias de EitherT
para este tipo especializado:
throw :: (Monad (p a'' a b'' b m)) => e -> EitherP e p a'' a b'' b m r
throw = EitherP . return . Left
catch
:: (Monad (p a'' a b'' b m))
=> EitherP e p a'' a b'' b m r -- ^ Original computation
-> (e -> EitherP f p a'' a b'' b m r) -- ^ Handler
-> EitherP f p a'' a b'' b m r -- ^ Handled computation
catch m f = EitherP $ do
e <- runEitherP m
runEitherP $ case e of
Left l -> f l
Right r -> right r
Con esto en mano, puedo definir la siguiente instancia de composición elevada:
-- Given that ''p'' is composable, so is ''EitherP e p''
instance (Channel p) => Channel (EitherP e p) where
idT = EitherP . idT
p1 >-> p2 = (EitherP .) $ runEitherP . p1 >-> runEitherP . p2
Para entender qué está pasando allí, simplemente siga los tipos:
p1 :: b'' -> EitherP e p a'' a b'' b m r
p2 :: c'' -> EitherP e p b'' b c'' c m r
runEitherP . p1 :: b'' -> p a'' a b'' b m (Either e r)
runEitherP . p2 :: c'' -> p b'' b c'' c m (Either e r)
-- Use the base composition for ''p''
runEitherP . p1 >-> runEitherP . p2
:: c'' -> p a'' a c'' c m (Either e r)
-- Rewrap in EitherP
(EitherP . ) $ runEitherP . p1 >-> runEitherP . p2
:: c'' -> EitherP e p a'' a c'' c m r
Esto le permite lanzar y atrapar errores dentro de una etapa particular sin interrumpir otras etapas. Aquí hay un ejemplo que he copiado y pegado de mi publicación de anuncio de pipes-2.4
:
import Control.Monad (forever)
import Control.Monad.Trans (lift)
import Control.Proxy
import Control.Proxy.Trans.Either as E
import Safe (readMay)
promptInts :: () -> EitherP String Proxy C () () Int IO r
promptInts () = recover $ forever $ do
str <- lift getLine
case readMay str of
Nothing -> E.throw "Could not parse an integer"
Just n -> liftP $ respond n
recover p =
p `E.catch` (/str -> lift (putStrLn str) >> recover p)
main = runProxy $ runEitherK $ mapP printD <-< promptInts
Este es el resultado:
>>> main
1<Enter>
1
Test<Enter>
Could not parse an integer
Apple<Enter>
Could not parse an integer
5<Enter>
5
La respuesta al enfoque iteratee es similar. Debe tomar su forma actual de componer iteraciones y levantarla sobre EitherT
. Si usted usa clases de tipos o simplemente define un nuevo operador de composición, depende de usted.
Algunos otros enlaces útiles:
-
pipes-2.4
anuncios depipes-2.4
-
Control.Proxy.Class
,Control.Proxy.Trans
yControl.Proxy.Trans.Either
- Una pregunta de desbordamiento de pila muy similar sobre el mismo tema (a excepción de `pipes)