threads - tap c#
Lectura/escritura de flujo asÃncrono.NET (6)
A pesar de que va contra corriente ayudar a las personas con sus tareas, dado que esto tiene más de un año, esta es la manera correcta de lograr esto. Todo lo que necesita para superponer sus operaciones de lectura / escritura - no se necesita engendrar hilos adicionales, ni nada más.
public static class StreamExtensions
{
private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // +32767
public static void CopyTo( this Stream input , Stream output )
{
input.CopyTo( output , DEFAULT_BUFFER_SIZE ) ;
return ;
}
public static void CopyTo( this Stream input , Stream output , int bufferSize )
{
if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );
byte[][] buf = { new byte[bufferSize] , new byte[bufferSize] } ;
int[] bufl = { 0 , 0 } ;
int bufno = 0 ;
IAsyncResult read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
IAsyncResult write = null ;
while ( true )
{
// wait for the read operation to complete
read.AsyncWaitHandle.WaitOne() ;
bufl[bufno] = input.EndRead(read) ;
// if zero bytes read, the copy is complete
if ( bufl[bufno] == 0 )
{
break ;
}
// wait for the in-flight write operation, if one exists, to complete
// the only time one won''t exist is after the very first read operation completes
if ( write != null )
{
write.AsyncWaitHandle.WaitOne() ;
output.EndWrite(write) ;
}
// start the new write operation
write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ;
// toggle the current, in-use buffer
// and start the read operation on the new buffer.
//
// Changed to use XOR to toggle between 0 and 1.
// A little speedier than using a ternary expression.
bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ;
read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
}
// wait for the final in-flight write operation, if one exists, to complete
// the only time one won''t exist is if the input stream is empty.
if ( write != null )
{
write.AsyncWaitHandle.WaitOne() ;
output.EndWrite(write) ;
}
output.Flush() ;
// return to the caller ;
return ;
}
public static async Task CopyToAsync( this Stream input , Stream output )
{
await input.CopyToAsync( output , DEFAULT_BUFFER_SIZE ) ;
return;
}
public static async Task CopyToAsync( this Stream input , Stream output , int bufferSize )
{
if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );
byte[][] buf = { new byte[bufferSize] , new byte[bufferSize] } ;
int[] bufl = { 0 , 0 } ;
int bufno = 0 ;
Task<int> read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ) ;
Task write = null ;
while ( true )
{
await read ;
bufl[bufno] = read.Result ;
// if zero bytes read, the copy is complete
if ( bufl[bufno] == 0 )
{
break;
}
// wait for the in-flight write operation, if one exists, to complete
// the only time one won''t exist is after the very first read operation completes
if ( write != null )
{
await write ;
}
// start the new write operation
write = output.WriteAsync( buf[bufno] , 0 , bufl[bufno] ) ;
// toggle the current, in-use buffer
// and start the read operation on the new buffer.
//
// Changed to use XOR to toggle between 0 and 1.
// A little speedier than using a ternary expression.
bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ;
read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length );
}
// wait for the final in-flight write operation, if one exists, to complete
// the only time one won''t exist is if the input stream is empty.
if ( write != null )
{
await write;
}
output.Flush();
// return to the caller ;
return;
}
}
Aclamaciones.
He estado tratando de resolver este ejercicio de examen de "Simultaneidad Concurrente" (en C #):
Sabiendo que la clase
Stream
contiene los métodosint Read(byte[] buffer, int offset, int size)
yvoid Write(byte[] buffer, int offset, int size)
, implementa en C # el métodoNetToFile
que copia todos los datos recibidos deNetworkStream net
instancia a la instancia delFileStream file
. Para realizar la transferencia, utilice lecturas asíncronas y escrituras síncronas, evitando que se bloquee un hilo durante las operaciones de lectura. La transferencia finaliza cuando la operación de lectura de lanet
devuelve el valor 0. Para simplificar, no es necesario admitir la cancelación controlada de la operación.
void NetToFile(NetworkStream net, FileStream file);
He estado tratando de resolver este ejercicio, pero estoy luchando con una pregunta relacionada con la pregunta en sí. Pero primero, aquí está mi código:
public static void NetToFile(NetworkStream net, FileStream file) {
byte[] buffer = new byte[4096]; // buffer with 4 kB dimension
int offset = 0; // read/write offset
int nBytesRead = 0; // number of bytes read on each cycle
IAsyncResult ar;
do {
// read partial content of net (asynchronously)
ar = net.BeginRead(buffer,offset,buffer.Length,null,null);
// wait until read is completed
ar.AsyncWaitHandle.WaitOne();
// get number of bytes read on each cycle
nBytesRead = net.EndRead(ar);
// write partial content to file (synchronously)
fs.Write(buffer,offset,nBytesRead);
// update offset
offset += nBytesRead;
}
while( nBytesRead > 0);
}
La pregunta que tengo es que, en la declaración de pregunta, se dice:
Para realizar la transferencia, utilice lecturas asíncronas y escrituras síncronas, evitando que se bloquee un hilo durante las operaciones de lectura
No estoy seguro de si mi solución logra lo que se quiere en este ejercicio, porque estoy usando AsyncWaitHandle.WaitOne()
para esperar hasta que se complete la lectura asincrónica.
Por otro lado, no estoy realmente averiguando qué se supone que es una solución "sin bloqueo" en este escenario, ya que la escritura FileStream
está hecha de forma sincrónica ... y para hacerlo, tengo que esperar hasta que se complete la lectura de NetworkStream
para continuar con la escritura de FileStream
, ¿no es así?
¿Puedes ayudarme con esto?
[EDITAR 1] Usar la solución de devolución de llamada
De acuerdo, si entendí lo que Mitchel Sellers y willvv respondieron, se me aconsejó utilizar un método de devolución de llamada para convertir esto en una solución "sin bloqueo". Aquí está mi código, entonces:
byte[] buffer; // buffer
public static void NetToFile(NetworkStream net, FileStream file) {
// buffer with same dimension as file stream data
buffer = new byte[file.Length];
//start asynchronous read
net.BeginRead(buffer,0,buffer.Length,OnEndRead,net);
}
//asynchronous callback
static void OnEndRead(IAsyncResult ar) {
//NetworkStream retrieve
NetworkStream net = (NetworkStream) ar.IAsyncState;
//get number of bytes read
int nBytesRead = net.EndRead(ar);
//write content to file
//... and now, how do I write to FileStream instance without
//having its reference??
//fs.Write(buffer,0,nBytesRead);
}
Como habrás notado, estoy atascado en el método de devolución de llamada, ya que no tengo una referencia a la instancia de FileStream
en la que deseo invocar el método "Write (...)".
Además, esta no es una solución segura para subprocesos, ya que el campo byte[]
está expuesto y puede compartirse entre invocaciones de NetToFile
simultáneas. No sé cómo resolver este problema sin exponer este campo byte[]
en el alcance externo ... y estoy casi seguro de que no se expondrá de esta manera.
No quiero utilizar una solución de método lambda o anónima, porque eso no está en el plan de estudios del curso de "Programación concurrente".
Dudo que este sea el código más rápido (hay una sobrecarga de la abstracción de tareas de .NET) pero creo que es un enfoque más limpio para todo lo de la copia asincrónica.
Necesitaba un CopyTransformAsync
en el que podía pasar a un delegado para hacer algo a medida que se pasaban trozos a través de la operación de copia. por ejemplo, calcular un resumen del mensaje mientras se copia. Es por eso que me interesé en rodar mi propia opción.
Recomendaciones:
- CopyToAsync bufferSize es sensible (se requiere un gran buffer)
- FileOptions.Asynchronous -> lo hace tremendamente lento (no estoy seguro de por qué es eso)
- El bufferSize de los objetos FileStream puede ser más pequeño (no es tan importante)
- La prueba
Serial
es claramente la más rápida y la más intensiva en recursos
Esto es lo que encontré y el código fuente completo del programa que usé para probar esto. En mi máquina, estas pruebas se ejecutaron en un disco SSD y es el equivalente de una copia de archivo. Normalmente, no querrás usar esto solo para copiar archivos; en cambio, cuando tengas una transmisión en red (que es mi caso de uso), es cuando querrás usar algo como esto.
4K buffer
Serial... in 0.474s
CopyToAsync... timed out
CopyToAsync (Asynchronous)... timed out
CopyTransformAsync... timed out
CopyTransformAsync (Asynchronous)... timed out
8K buffer
Serial... in 0.344s
CopyToAsync... timed out
CopyToAsync (Asynchronous)... timed out
CopyTransformAsync... in 1.116s
CopyTransformAsync (Asynchronous)... timed out
40K buffer
Serial... in 0.195s
CopyToAsync... in 0.624s
CopyToAsync (Asynchronous)... timed out
CopyTransformAsync... in 0.378s
CopyTransformAsync (Asynchronous)... timed out
80K buffer
Serial... in 0.190s
CopyToAsync... in 0.355s
CopyToAsync (Asynchronous)... in 1.196s
CopyTransformAsync... in 0.300s
CopyTransformAsync (Asynchronous)... in 0.886s
160K buffer
Serial... in 0.432s
CopyToAsync... in 0.252s
CopyToAsync (Asynchronous)... in 0.454s
CopyTransformAsync... in 0.447s
CopyTransformAsync (Asynchronous)... in 0.555s
Aquí puede ver el Explorador de procesos, gráfico de rendimiento a medida que se ejecuta la prueba. Básicamente, cada parte superior (en el más bajo de los tres gráficos) es el comienzo de la prueba en serie. Puede ver claramente cómo el rendimiento aumenta dramáticamente a medida que crece el tamaño del búfer. Parecería que planea en algún lugar alrededor de 80K, que es lo que el método .NET framework CopyToAsync
usa internamente.
Lo bueno aquí es que la implementación final no fue tan complicada:
static Task CompletedTask = ((Task)Task.FromResult(0));
static async Task CopyTransformAsync(Stream inputStream
, Stream outputStream
, Func<ArraySegment<byte>, ArraySegment<byte>> transform = null
)
{
var temp = new byte[bufferSize];
var temp2 = new byte[bufferSize];
int i = 0;
var readTask = inputStream
.ReadAsync(temp, 0, bufferSize)
.ConfigureAwait(false);
var writeTask = CompletedTask.ConfigureAwait(false);
for (; ; )
{
// synchronize read
int read = await readTask;
if (read == 0)
{
break;
}
if (i++ > 0)
{
// synchronize write
await writeTask;
}
var chunk = new ArraySegment<byte>(temp, 0, read);
// do transform (if any)
if (!(transform == null))
{
chunk = transform(chunk);
}
// queue write
writeTask = outputStream
.WriteAsync(chunk.Array, chunk.Offset, chunk.Count)
.ConfigureAwait(false);
// queue read
readTask = inputStream
.ReadAsync(temp2, 0, bufferSize)
.ConfigureAwait(false);
// swap buffer
var temp3 = temp;
temp = temp2;
temp2 = temp3;
}
await writeTask; // complete any lingering write task
}
Este método de entrelazado de lectura / escritura a pesar de los enormes búferes es en algún momento entre 18% más rápido que BCL CopyToAsync
.
Por curiosidad, modifiqué las llamadas asincrónicas a las típicas llamadas de patrón asíncronas de inicio / finalización y eso no mejoró un poco la situación, empeoró las cosas. Por todo lo que me gusta bash sobre la tara de abstracción de tareas, hacen algunas cosas ingeniosas cuando escribes tu código con las palabras clave async / await y es mucho más agradable leer ese código.
Es extraño que nadie haya mencionado a TPL.
Here es una muy buena publicación del equipo de PFX (Stephen Toub) sobre cómo implementar una copia de transmisión asincrónica simultánea. La publicación contiene refenrece obsoleta a las muestras así que aquí está el primer:
Obtenga Extras de Extensiones Paralelas de code.msdn luego
var task = sourceStream.CopyStreamToStreamAsync(destinationStream);
// do what you want with the task, for example wait when it finishes:
task.Wait();
También considere usar AsyncEnumerator de AsyncEnumerator .
Necesitará usar la devolución de llamada desde la lectura de NetStream para manejar esto. Y, francamente, podría ser más fácil ajustar la lógica de copia en su propia clase para que pueda mantener la instancia de las secuencias activas.
Así es como me acercaría (no probado):
public class Assignment1
{
public static void NetToFile(NetworkStream net, FileStream file)
{
var copier = new AsyncStreamCopier(net, file);
copier.Start();
}
public static void NetToFile_Option2(NetworkStream net, FileStream file)
{
var completedEvent = new ManualResetEvent(false);
// copy as usual but listen for completion
var copier = new AsyncStreamCopier(net, file);
copier.Completed += (s, e) => completedEvent.Set();
copier.Start();
completedEvent.WaitOne();
}
/// <summary>
/// The Async Copier class reads the input Stream Async and writes Synchronously
/// </summary>
public class AsyncStreamCopier
{
public event EventHandler Completed;
private readonly Stream input;
private readonly Stream output;
private byte[] buffer = new byte[4096];
public AsyncStreamCopier(Stream input, Stream output)
{
this.input = input;
this.output = output;
}
public void Start()
{
GetNextChunk();
}
private void GetNextChunk()
{
input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null);
}
private void InputReadComplete(IAsyncResult ar)
{
// input read asynchronously completed
int bytesRead = input.EndRead(ar);
if (bytesRead == 0)
{
RaiseCompleted();
return;
}
// write synchronously
output.Write(buffer, 0, bytesRead);
// get next
GetNextChunk();
}
private void RaiseCompleted()
{
if (Completed != null)
{
Completed(this, EventArgs.Empty);
}
}
}
}
Tienes razón, lo que estás haciendo es básicamente lectura síncrona, porque usas el método WaitOne () y simplemente detiene la ejecución hasta que los datos estén listos, eso es básicamente lo mismo que hacerlo usando Read () en lugar de BeginRead ( ) y EndRead ().
Lo que tienes que hacer es usar el argumento de devolución de llamada en el método BeginRead (), con él definirás un método de devolución de llamada (o una expresión lambda), este método se invocará cuando la información haya sido leída (en el método de devolución de llamada que tiene que verificar el final del flujo y escribir en el flujo de salida), de esta manera no estará bloqueando el hilo principal (no necesitará el WaitOne () ni el EndRead ().
Espero que esto ayude.
Wow, ¡todos son muy complejos! Aquí está mi solución asíncrona, y es solo una función. Read () y BeginWrite () se ejecutan al mismo tiempo.
/// <summary>
/// Copies a stream.
/// </summary>
/// <param name="source">The stream containing the source data.</param>
/// <param name="target">The stream that will receive the source data.</param>
/// <remarks>
/// This function copies until no more can be read from the stream
/// and does not close the stream when done.<br/>
/// Read and write are performed simultaneously to improve throughput.<br/>
/// If no data can be read for 60 seconds, the copy will time-out.
/// </remarks>
public static void CopyStream(Stream source, Stream target)
{
// This stream copy supports a source-read happening at the same time
// as target-write. A simpler implementation would be to use just
// Write() instead of BeginWrite(), at the cost of speed.
byte[] readbuffer = new byte[4096];
byte[] writebuffer = new byte[4096];
IAsyncResult asyncResult = null;
for (; ; )
{
// Read data into the readbuffer. The previous call to BeginWrite, if any,
// is executing in the background..
int read = source.Read(readbuffer, 0, readbuffer.Length);
// Ok, we have read some data and we''re ready to write it, so wait here
// to make sure that the previous write is done before we write again.
if (asyncResult != null)
{
// This should work down to ~0.01kb/sec
asyncResult.AsyncWaitHandle.WaitOne(60000);
target.EndWrite(asyncResult); // Last step to the ''write''.
if (!asyncResult.IsCompleted) // Make sure the write really completed.
throw new IOException("Stream write failed.");
}
if (read <= 0)
return; // source stream says we''re done - nothing else to read.
// Swap the read and write buffers so we can write what we read, and we can
// use the then use the other buffer for our next read.
byte[] tbuf = writebuffer;
writebuffer = readbuffer;
readbuffer = tbuf;
// Asynchronously write the data, asyncResult.AsyncWaitHandle will
// be set when done.
asyncResult = target.BeginWrite(writebuffer, 0, read, null, null);
}
}