networking - síndrome - tesis relacionadas con sindrome de down
Un conducto de procesamiento, 2 fuentes de IO del mismo tipo (1)
No sé si es de alguna ayuda, pero traté de implementar la sugerencia de Iain e hice una variante de mergeSources''
que se detiene tan pronto como cualquiera de los canales lo hace:
mergeSources'' :: (MonadIO m, MonadBaseControl IO m)
=> [Source (ResourceT m) a] -- ^ The sources to merge.
-> Int -- ^ The bound of the intermediate channel.
-> ResourceT m (Source (ResourceT m) a)
mergeSources'' sx bound = do
c <- liftSTM $ newTBMChan bound
mapM_ (/s -> resourceForkIO $
s $$ chanSink c writeTBMChan closeTBMChan) sx
return $ sourceTBMChan c
(Esta simple adición está disponible here ).
Algunos comentarios a su versión de mergeSources
( mergeSources
con un grano de sal, puede ser que no entendí bien algo):
- Usar
...TMChan
lugar de...TBMChan
parece peligroso. Si los escritores son más rápidos que el lector, su montón explotará. Al mirar su diagrama, parece que esto puede suceder fácilmente, si su par TCP no lee los datos lo suficientemente rápido. Así que definitivamente usaría...TBMChan
quizás con un límite grande pero limitado. No necesita la restricción
MonadSTM m
. Todas las cosas de STM se envuelven enIO
conliftSTM = liftIO . atomically
Tal vez esto te ayude un poco cuando usas
mergeSources''
enserverApp
.Solo un problema cosmético, encontré
liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
muy difícil de leer debido a su uso de
liftA2
en la(->) r
mónada. Yo diría quedo c <- liftSTM newTMChan fsrc sx c retn c
sería más largo, pero mucho más fácil de leer.
¿Podría quizás crear un proyecto independiente donde sea posible jugar con serverApp
?
En mi aplicación GHC Haskell
que utiliza stm, conducto de red y conducto, tengo un cable para cada socket que se bifurca automáticamente utilizando runTCPServer
. Los filamentos pueden comunicarse con otros filamentos a través del uso de un TChan de difusión.
Esto muestra cómo me gustaría configurar la "cadena" de conductos:
Entonces, lo que tenemos aquí son dos fuentes (cada una ligada a conductos auxiliares) que producen un objeto Packet
que el encoder
aceptará y convertirá en ByteString
, luego enviará el socket. He tenido una gran cantidad de dificultades con la fusión eficiente (el rendimiento es una preocupación) de las dos entradas.
Agradecería si alguien pudiera señalarme en la dirección correcta.
Dado que sería grosero de mi parte publicar esta pregunta sin hacer un intento, pondré lo que intenté anteriormente aquí;
He escrito / seleccionado una función que (bloqueo) produce una Fuente desde un TMChan (canal de cierre);
-- | Takes a generic type of STM chan and, given read and close functionality,
-- returns a conduit ''Source'' which consumes the elements of the channel.
chanSource
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> STM (Maybe b)) -- ^ The read function
-> (a -> STM ()) -- ^ The close/finalizer function
-> Source m b
chanSource ch readCh closeCh = ConduitM pull
where close = liftSTM $ closeCh ch
pull = PipeM $ liftSTM $ readCh ch >>= translate
translate = return . maybe (Done ()) (HaveOutput pull close)
Del mismo modo, una función para transformar un Chan en un sumidero;
-- | Takes a stream and, given write and close functionality, returns a sink
-- which wil consume elements and broadcast them into the channel
chanSink
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> b -> STM()) -- ^ The write function
-> (a -> STM()) -- ^ The close/finalizer function
-> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
where close = const . liftSTM $ closeCh ch
sink = NeedInput push close
write = liftSTM . writeCh ch
push x = PipeM $ write x >> return sink
Entonces mergeSources es sencillo; tenedor 2 hilos (que realmente no quiero hacer, pero qué diablos) que puede poner sus nuevos elementos en la lista de la que luego produzco una fuente;
-- | Merges a list of ''Source'' objects, sinking them into a ''TMChan'' and returns
-- a source which consumes the elements of the channel.
mergeSources
:: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
=> [Source (ResourceT m) a] -- ^ The list of sources
-> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
where push c s = s $$ chanSink c writeTMChan closeTMChan
fsrc x c = mapM_ (/s -> resourceForkIO $ push c s) x
retn c = return $ chanSource c readTMChan closeTMChan
Si bien tuve éxito en hacer estas funciones de comprobación de tipo, no tuve éxito en obtener ninguna utilización de estas funciones para comprobar el tipo;
-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
-- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
mergsrc $$ protocol $= encoder =$ appSink appdata
where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
mergsrc = mergeSources [appSource appdata $= decoder, chansrc]
-- | Structure which holds mutable information for clients
data SessionState = SessionState
{ _ssBroadcast :: TMChan Packet -- ^ Outbound packet broadcast channel
}
makeLenses ''''SessionState
-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)
-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO
Veo que este método está defectuoso de todos modos: hay muchas listas intermedias y conversiones. Esto no puede ser bueno para el rendimiento. Buscando guía.
PD. Por lo que puedo entender, este no es un duplicado de; Fusionando conductos con múltiples entradas , como en mi situación, ambas fuentes producen el mismo tipo y no me importa de qué fuente se produce el objeto Packet
, siempre que no espere en una mientras que otra tenga objetos listos para ser consumidos.
PPS. Me disculpo por el uso (y, por lo tanto, el requisito de conocimiento) de la lente en el código de ejemplo.