.net asynchronous stream task-parallel-library system.reactive

.net - Multidifusión de secuencias: lea una secuencia una vez pero procese de diferentes maneras, con un almacenamiento en búfer mínimo



asynchronous stream (3)

Creo que lo que quieres hacer es tener un flujo de entrada, múltiples flujos de salida y luego copiar la entrada a todos los resultados, algo así como:

Stream input; IList<Stream> outputs; byte[] buffer = new byte[BufferSize]; int read; while ((read = input.Read(buffer, 0, buffer.Length)) != 0) { foreach (var output in outputs) { output.Write(buffer, 0, read); } }

Esos flujos de salida no tienen que ser transmisiones normales, podrían ser flujos especiales que, por ejemplo, solo calculan la longitud. Tendrían que sobrescribir únicamente el método Write() , por lo que una transmisión base personalizada podría ser útil:

public class OutputStreamBase : Stream { private int length; public override void Flush() {} public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } public override void SetLength(long value) { throw new NotSupportedException(); } public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); } public override void Write(byte[] buffer, int offset, int count) { length += count; } public override bool CanRead { get { return false; } } public override bool CanSeek { get { return false; } } public override bool CanWrite { get { return true; } } public override long Length { get { return length; } } public override long Position { get { return length; } set { throw new NotSupportedException(); } } }

Esta secuencia se puede usar directamente como la secuencia de recuento o para implementar fácilmente la secuencia de búsqueda MIME.

Para la escalabilidad y para conservar los recursos, es mejor evitar leer toda una secuencia de entrada en la memoria, sino tratar de procesarla como una secuencia, leyendo pequeños fragmentos a la vez. Esto es fácil de lograr en .NET cuando tiene una cosa que desea hacer con los datos, como leerla desde una solicitud web y guardarla en un archivo. Ejemplo fácil:

input.CopyTo(output); // reads chunks of 4096 bytes and writes them to `output`

Pero cuando quiero hacer varias cosas con esa información, es un poco más complicado. Por ejemplo, quiero:

  • Cuente la longitud de una secuencia que no admite la propiedad Length . Podemos hacer esto con un DummyStream personalizado que no hace nada con los datos escritos, excepto el seguimiento de la longitud.
  • Calcule el MD5 de la secuencia .
  • Calcule el tipo de MIME de los datos. FindMimeFromData solo necesita los primeros 256 bytes de la transmisión.
  • Guarde toda la secuencia en la base de datos .

... pero solo lo hace con un solo pase sobre el flujo de entrada, con un uso mínimo de almacenamiento en búfer.

Estoy seguro de que esto es posible. Probablemente podría coordinar múltiples hilos, con un hilo haciendo la lectura real del flujo de entrada y otros hilos para cada una de las tareas de "procesamiento" que quiero realizar. Pero eso podría volverse bastante complicado y frágil si no se hace correctamente.

Mi pregunta es esta:

  • ¿Tengo que usar múltiples hilos? Me encantaría algún tipo de solución de co-rutina donde todo el procesamiento se intercala en un hilo.
  • ¿Hay formas en que pueda aprovechar las características async C # o las Extensiones reactivas para simplificar esta solución?

Estoy teniendo problemas para envolver mi cerebro en este. Estoy buscando orientación sobre la mejor (limpia, sostenible, uso eficiente de los recursos informáticos) para lograr esto, especialmente a la luz de las tecnologías más recientes como TPL, async y RX.

Este es un ejemplo de la sintaxis que estoy visualizando:

public static void Multicast(this Stream input, params Action<Stream>[] processingActions) { // TODO: ??? complicated stream multicasting logic goes here. ??? throw new NotImplementedException(); }

Y lo usarías así:

long length; byte[] md5; string mimeType; int uploadId; input.Multicast( s => length = GetLength(s), s => md5 = CalculateMd5(s), s => mimeType = DetermineMimeType(s, filename, mimeTypeAsReportedByClient) s => uploadId = SaveToDatabase(s) );

Y aquí hay un ejemplo de una de las acciones de procesamiento:

private static byte[] CalculateMd5(Stream input) { return MD5.Create().ComputeHash(input); }


Decidí probar una implementación de Rx. Esto es lo que obtuve hasta ahora. No realiza ninguna escritura de base de datos, pero calcula la longitud, el hash MD5 y el tipo de letra mimet con solo una pasada sobre el archivo y un almacenamiento en búfer mínimo.

using System; using System.Diagnostics; using System.IO; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Runtime.InteropServices; using System.Security.Cryptography; namespace RxTest { internal static class Program { private static void Main() { var expectedValues = ReadExpectedValuesDirectly("demo.txt"); new FileInfo("demo.txt") .ReadAsObserveable(4096) .ToFileData() .Subscribe(observed => Compare(expectedValues, observed)); } private static void Compare(FileData expected, FileData observed) { Console.WriteLine(); WriteLine("expected", expected); WriteLine("observed", observed); Console.WriteLine(); Debug.Assert(observed.Length == expected.Length); Debug.Assert(BitConverter.ToString(observed.Hash) == BitConverter.ToString(expected.Hash)); Debug.Assert(observed.MimeType == expected.MimeType); } private static void WriteLine(string prefix, FileData observed) { Console.WriteLine("{0}: {1:N0} {2} {3}", prefix, observed.Length, observed.MimeType, BitConverter.ToString(observed.Hash).Replace("-", "")); } private static FileData ReadExpectedValuesDirectly(string fileName) { return new FileData { Length = new FileInfo(fileName).Length, Hash = MD5.Create().ComputeHash(File.ReadAllBytes(fileName)), MimeType = FileDataExtensions.FindMimeType(GetFirst256Bytes(fileName)) }; } private static byte[] GetFirst256Bytes(string path) { using (var stream = File.OpenRead(path)) { var buffer = new byte[256]; if (stream.Length >= 256) stream.Read(buffer, 0, 256); else stream.Read(buffer, 0, (int) stream.Length); return buffer; } } } public class FileData { public long Length { get; set; } public string MimeType { get; set; } public byte[] Hash { get; set; } } public static class FileDataExtensions { public static IObservable<byte[]> ReadAsObserveable(this FileInfo file, int bufferSize) { return Observable.Create<byte[]>(observer => { using (var stream = file.OpenRead()) { return stream.ReadAsObservable(bufferSize).Subscribe(observer); } }); } public static IObservable<byte[]> ReadAsObservable(this Stream stream, int bufferSize) { // TODO: Add scheduling/canceling return Observable.Create<byte[]>(observer => { var block = new byte[bufferSize]; int bytesRead; while ((bytesRead = stream.Read(block, 0, bufferSize)) > 0) { if (bytesRead == bufferSize) observer.OnNext(block); else { var lastBlock = new byte[bytesRead]; Array.Copy(block, lastBlock, bytesRead); observer.OnNext(lastBlock); observer.OnCompleted(); } } return Disposable.Empty; }); } public static IObservable<FileData> ToFileData(this IObservable<byte[]> file) { return Observable.Create<FileData>(observer => { var counter = 0; var connectable = file .Do(_ => Console.WriteLine()) .Do(_ => Console.Write(++counter)) .Publish(); var combineSub = Observable.CombineLatest( connectable.TotalLength(), connectable.ComputeHash(MD5.Create()), connectable.FindMimeType(), (length, hash, mimeType) => new FileData { Hash = hash, Length = length, MimeType = mimeType }) .Subscribe(observer); var connectSub = connectable.Connect(); return new CompositeDisposable(combineSub, connectSub); }); } public static IObservable<long> TotalLength(this IObservable<byte[]> file) { return file .Do(block => Console.Write("/tLength()")) .Select(block => block.LongLength) .Sum(); } public static IObservable<byte[]> ComputeHash(this IObservable<byte[]> file, HashAlgorithm algorithm) { return Observable.Create<byte[]>(observer => file .Do(block => Console.Write("/tComputeHash()")) .Subscribe( block => algorithm.TransformBlock(block, 0, block.Length, null, 0), () => { algorithm.TransformFinalBlock(new byte[0], 0, 0); observer.OnNext(algorithm.Hash); observer.OnCompleted(); })); } public static IObservable<string> FindMimeType(this IObservable<byte[]> file) { // this doesn''t handle cases where the file is less than 256 bytes in length. return file .Do(block => Console.Write("/tFindMimeType()")) .Take(1) .Select(block => { var first256 = new byte[256]; Array.Copy(block, first256, 256); return FindMimeType(first256); }); } public static string FindMimeType(byte[] first256) { try { UInt32 unMimeType; FindMimeFromData(0, null, first256, 256, null, 0, out unMimeType, 0); var pMimeType = new IntPtr(unMimeType); var sMimeTypeFromFile = Marshal.PtrToStringUni(pMimeType); Marshal.FreeCoTaskMem(pMimeType); return sMimeTypeFromFile; } catch (Exception ex) { // not exactly robust exeption handling Console.WriteLine(ex.ToString()); return null; } } [DllImport(@"urlmon.dll", CharSet = CharSet.Auto)] private static extern UInt32 FindMimeFromData( UInt32 pBC, [MarshalAs(UnmanagedType.LPStr)] String pwzUrl, [MarshalAs(UnmanagedType.LPArray)] byte[] pBuffer, UInt32 cbSize, [MarshalAs(UnmanagedType.LPStr)] String pwzMimeProposed, UInt32 dwMimeFlags, out UInt32 ppwzMimeOut, UInt32 dwReserverd ); } }


Usando Rx podría usar Observable.Create para crear un elemento observable que lea la secuencia, luego use Publish para permitir múltiples suscripciones al vapor sin iniciarlo todavía, y luego llame a Connect en el flujo Published para que todo funcione y se ejecute. Puede usar ObserveOn y SubscribeOn para cada una de las diferentes "rutas" que toman los datos de la secuencia para determinar cuándo, dónde y cómo se ejecuta cada porción del código, lo que significa que puede almacenar toda la secuencia y enviarla a la base de datos de una sola vez , haga lo mismo con el MD5, cuente el flujo usando Scan o Aggregate , pero también podría tener una "ruta" que determine el tipo de mimo y anule la suscripción anticipada. Además, si necesita sincronizar estos elementos nuevamente, podría usar CombineLatest .

Esta pregunta es muy interesante para mí, y me gustaría tener tiempo ahora para publicar algunos ejemplos de código real. Des afortunadamente yo no. Con suerte, esto le da una idea de qué operadores se podrían utilizar en qué configuraciones para lograr lo que está buscando.

Aquí hay un código de psuedo para las partes de lectura que no son de transmisión ...

var connectable = ReadStreamAsObservable(stream).Publish(); var mimeType = connectable.ReadMimeTypeAsObservable(); var md5 = connectable.ReadMD5AsObservable(); var record = connectable.SubmitToDatabaseAsObservable(myDbConnection); var length = connectable.Aggregate(0, (acc, x) => acc + x.Bytes.Length); var parts = Observable.CombineLatest(mimeType, md5, length, record, (mimeType, md5, length, record) => new { MimeType = mimeType, MD5 = md5, Length = length, Record = record }); var subscription = new CompositeDisposable( parts.Subscribe((x) => Console.WriteLine(x)), connectable.Connect() );