c# - que - qué es un flujo en java
¿Cuál es un buen método para manejar flujos de E/S de red basados en línea? (2)
Esa es una pregunta bastante interesante. La solución para mí en el pasado ha sido utilizar un hilo separado con operaciones sincrónicas, como usted propone. (Logré solucionar la mayoría de los problemas con el bloqueo de sockets usando bloqueos y muchos manejadores de excepciones.) Aún así, el uso de las operaciones asíncronas incorporadas es generalmente aconsejable, ya que permite E / S asíncronas de verdadero nivel del sistema operativo, por lo que entiendo tu punto.
Bueno, me fui y escribí una clase para lograr lo que creo que necesitas (de una manera relativamente limpia, diría yo). Déjame saber lo que piensas.
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
public class AsyncStreamProcessor : IDisposable
{
protected StringBuilder _buffer; // Buffer for unprocessed data.
private bool _isDisposed = false; // True if object has been disposed
public AsyncStreamProcessor()
{
_buffer = null;
}
public IEnumerable<string> Process(byte[] newData)
{
// Note: replace the following encoding method with whatever you are reading.
// The trick here is to add an extra line break to the new data so that the algorithm recognises
// a single line break at the end of the new data.
using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine))
{
// Read all lines from new data, returning all but the last.
// The last line is guaranteed to be incomplete (or possibly complete except for the line break,
// which will be processed with the next packet of data).
string line, prevLine = null;
while ((line = newDataReader.ReadLine()) != null)
{
if (prevLine != null)
{
yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine;
_buffer = null;
}
prevLine = line;
}
// Store last incomplete line in buffer.
if (_buffer == null)
// Note: the (* 2) gives you the prediction of the length of the incomplete line,
// so that the buffer does not have to be expanded in most/all situations.
// Change it to whatever seems appropiate.
_buffer = new StringBuilder(prevLine, prevLine.Length * 2);
else
_buffer.Append(prevLine);
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
private void Dispose(bool disposing)
{
if (!_isDisposed)
{
if (disposing)
{
// Dispose managed resources.
_buffer = null;
GC.Collect();
}
// Dispose native resources.
// Remember that object has been disposed.
_isDisposed = true;
}
}
}
Se debe crear una instancia de esta clase para cada NetworkStream y se debe invocar la función de Proceso cada vez que se reciban datos nuevos (en el método de devolución de llamada para BeginRead, antes de llamar al próximo BeginRead que me imagino).
Nota: Solo he verificado este código con datos de prueba, no datos reales transmitidos a través de la red. Sin embargo, no anticiparía ninguna diferencia ...
También, una advertencia de que la clase, por supuesto, no es segura para subprocesos, pero mientras BeginRead no se ejecute nuevamente hasta después de que se hayan procesado los datos actuales (como supongo que está haciendo), no debería haber ningún problema.
Espero que esto funcione para usted. Avíseme si quedan problemas pendientes e intentaré modificar la solución para resolverlos. (¡Bien podría haber algo de sutil en la pregunta que extrañé, a pesar de leerla con cuidado!)
Nota: Permítanme pedir disculpas por la longitud de esta pregunta, tuve que poner mucha información en ella. Espero que eso no cause que mucha gente simplemente lo mire y haga suposiciones. Por favor, lea en su totalidad. Gracias.
Tengo un flujo de datos entrando por un socket. Esta información está orientada a la línea.
Estoy usando el APM (Método de programación Async) de .NET (BeginRead, etc.). Esto imposibilita el uso de E / S basadas en flujo porque Async I / O está basado en el buffer. Es posible volver a empaquetar los datos y enviarlos a una secuencia, como una secuencia de memoria, pero también hay problemas.
El problema es que mi flujo de entrada (que no tengo control) no me da ninguna información sobre cuánto tiempo dura la transmisión. Simplemente es una secuencia de líneas nuevas que se ven así:
COMMAND/n
...Unpredictable number of lines of data.../n
END COMMAND/n
....repeat....
Entonces, usando APM, y como no sé cuánto tiempo será un conjunto de datos dado, es probable que los bloques de datos crucen los límites del buffer que requieren múltiples lecturas, pero esas lecturas múltiples también abarcarán múltiples bloques de datos.
Ejemplo:
Byte buffer[1024] = ".................blah/nThis is another l"
[another read]
"ine/n.............................More Lines..."
Lo primero que pensé fue utilizar un StringBuilder y simplemente agregar las líneas de buffer a la SB. Esto funciona hasta cierto punto, pero me resultó difícil extraer bloques de datos. Intenté usar un StringReader para leer datos nuevos, pero no había forma de saber si obtenía una línea completa o no, ya que StringReader devuelve una línea parcial al final del último bloque agregado, seguido de devolver nulo posteriormente. No hay forma de saber si lo que se devolvió fue una línea de datos completa.
Ejemplo:
// Note: no newline at the end
StringBuilder sb = new StringBuilder("This is a line/nThis is incomp..");
StringReader sr = new StringReader(sb);
string s = sr.ReadLine(); // returns "This is a line"
s = sr.ReadLine(); // returns "This is incomp.."
Lo que es peor, es que si sigo añadiendo datos, los almacenamientos intermedios se hacen cada vez más grandes, y dado que esto podría funcionar durante semanas o meses a la vez, esa no es una buena solución.
Mi siguiente pensamiento fue eliminar bloques de datos de SB cuando los leí. Esto requirió escribir mi propia función ReadLine, pero luego me quedé atrapado bloqueando los datos durante las lecturas y escrituras. Además, los bloques de datos más grandes (que pueden consistir en cientos de lecturas y megabytes de datos) requieren escanear todo el búfer en busca de nuevas líneas. No es eficiente y bastante feo.
Estoy buscando algo que tenga la simplicidad de un StreamReader / Writer con la conveniencia de Async I / O.
Mi siguiente pensamiento fue utilizar un MemoryStream, y escribir los bloques de datos en una secuencia de memoria, luego adjuntar un StreamReader a la secuencia y usar ReadLine, pero nuevamente tengo problemas para saber si la última lectura en el buffer es una línea completa o no, además es aún más difícil eliminar los datos "obsoletos" de la transmisión.
También pensé en usar un hilo con lecturas sincrónicas. Esto tiene la ventaja de que al usar un StreamReader, siempre devolverá una línea completa desde una ReadLine (), excepto en situaciones de conexión interrumpidas. Sin embargo, esto tiene problemas para cancelar la conexión, y ciertos tipos de problemas de red pueden resultar en enchufes bloqueados durante un período prolongado. Estoy usando async IO porque no quiero atar un hilo durante la vida del programa que bloquea la recepción de datos.
La conexión es de larga duración. Y los datos continuarán fluyendo con el tiempo. Durante la conexión inicial, hay un gran flujo de datos, y una vez que se realiza el flujo, el socket permanece abierto esperando actualizaciones en tiempo real. No sé exactamente cuándo el flujo inicial ha "terminado", ya que la única forma de saber es que ya no se envían más datos de inmediato. Esto significa que no puedo esperar a que termine la carga de datos inicial antes del procesamiento, estoy bastante atascado procesando "en tiempo real" cuando entra.
Entonces, ¿alguien puede sugerir un buen método para manejar esta situación de una manera que no sea demasiado complicada? Realmente quiero que esto sea lo más simple y elegante posible, pero sigo encontrando soluciones cada vez más complicadas debido a todos los casos extremos. Supongo que lo que quiero es algún tipo de FIFO en el que pueda agregar más datos fácilmente y, al mismo tiempo, extraer datos que coincidan con ciertos criterios (es decir, cadenas terminadas en nueva línea).
Lo que estás explicando en tu pregunta, me recuerda mucho a las cadenas ASCIZ. ( enlace de texto ). Eso puede ser un comienzo útil.
Tuve que escribir algo similar a esto en la universidad para un proyecto en el que estaba trabajando. Desgraciadamente, tenía control sobre el socket de envío, así que inserté una longitud de campo de mensaje como parte del protocolo. Sin embargo, creo que un enfoque similar puede beneficiarlo.
Cómo me acerqué a mi solución fue que enviaría algo como 5HELLO, así que primero vería 5, y sabría que tenía una longitud de mensaje de 5, y por lo tanto el mensaje que necesitaba era de 5 caracteres. Sin embargo, si en mi lectura asíncrona, solo obtuve 5HE, vería que tengo longitud de mensaje 5, pero solo pude leer 3 bytes del cable (supongamos caracteres ASCII). Debido a esto, sabía que me faltaban algunos bytes, y almacenaba lo que tenía en el buffer de fragmentos. Tenía un buffer de fragmento por socket, por lo tanto, evité cualquier problema de sincronización. El proceso aproximado es
- Leer desde el socket en una matriz de bytes, registrar cuántos bytes se leyeron
- Escanee por byte por byte, hasta que encuentre un carácter de nueva línea (esto se vuelve muy complejo si no está recibiendo caracteres ascii, pero los caracteres pueden ser de varios bytes, usted es el único para eso)
- Convierta su buffer de fragmentación en una cadena, y anexe su buffer de lectura hasta la nueva línea. Coloque esta cadena como un mensaje completo en una cola o su propio delegado para ser procesado. (Puede optimizar estos búferes si realmente tiene su escritura de socket de lectura en la misma matriz de bytes como fragmento, pero eso es más difícil de explicar)
- Continúe repitiendo, cada vez que encontremos una nueva línea, cree una cadena desde el arreglo de byte desde una posición de inicio / final grabada y suéltelo en la cola / delegue para procesarlo.
- Una vez que llegamos al final de nuestro buffer de lectura, copia todo lo que quede en el buffer de frag.
- Llame a BeginRead en el zócalo, que saltará al paso 1. cuando haya datos disponibles en el zócalo.
Luego, usa otro hilo para leer que está en cola de mensajes de incommign, o simplemente deje que Threadpool lo maneje usando delegados. Y haz el procesamiento de datos que tengas que hacer. Alguien me va a corregir si estoy equivocado, pero hay muy pocos problemas de sincronización de hilos con esto, ya que solo puedes leer o esperar a leer desde el socket en cualquier momento, así que no te preocupes por los bloqueos (excepto si eres poblando una cola, utilicé delegados en mi implementación). Hay algunos detalles que deberá resolver por su cuenta, como qué tan grande de un buffer de fragmentación dejar, si recibe 0 nuevas líneas cuando lee, todo el mensaje debe ser anexado al buffer de fragmentos sin sobreescribir cualquier cosa. Creo que al final me dieron aproximadamente entre 700 y 800 líneas de código, pero eso incluía la configuración de la conexión, la negociación para el cifrado y algunas otras cosas más.
Esta configuración funcionó muy bien para mí; Pude realizar hasta 80Mbps en una LAN ethernet de 100Mbps usando esta implementación con un opteron de 1.8Ghz, incluido el procesamiento de encriptación. Y como está vinculado al socket, el servidor escalará ya que se pueden trabajar múltiples sockets al mismo tiempo. Si necesita artículos procesados en orden, deberá usar una cola, pero si el pedido no importa, los delegados le darán un rendimiento muy escalable fuera del grupo de temas.
Espero que esto ayude, no pretende ser una solución completa, sino una dirección para comenzar a buscar.
* Solo una nota, mi implementación fue puramente a nivel de bytes y cifrado compatible, utilicé caracteres para mi ejemplo para que sea más fácil de visualizar.