programming parallel net library examples example asparallel c# multithreading parallel-processing task-parallel-library plinq

c# - parallel - Cómo paralelizar correctamente un trabajo que depende en gran medida de la E/S



task parallel library c# (5)

Estoy construyendo una aplicación de consola que tiene que procesar un montón de datos.

Básicamente, la aplicación toma referencias de una base de datos. Para cada referencia, analice el contenido del archivo y realice algunos cambios. Los archivos son archivos HTML, y el proceso está realizando un trabajo pesado con los reemplazos RegEx (encuentre referencias y transforme en enlaces). Los resultados luego se almacenan en el sistema de archivos y se envían a un sistema externo.

Si reanudo el proceso, de forma secuencial:

var refs = GetReferencesFromDB(); // ~5000 Datarow returned foreach(var ref in refs) { var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list var html = File.ReadAllText(filePath); // Read html locally, or from a network drive var convertedHtml = ParseHtml(html); File.WriteAllText(destinationFilePath); // Copy the result locally, or a network drive SendToWs(ref, convertedHtml); }

Mi programa está funcionando correctamente pero es bastante lento. Por eso quiero paralelizar el proceso.

Por ahora, hice una paralelización simple agregando AsParallel:

var refs = GetReferencesFromDB().AsParallel(); refs.ForAll(ref=> { var filePath = GetFilePath(ref); var html = File.ReadAllText(filePath); var convertedHtml = ParseHtml(html); File.WriteAllText(destinationFilePath); SendToWs(ref, convertedHtml); });

Este simple cambio disminuye la duración del proceso (25% menos de tiempo). Sin embargo, lo que entiendo con la paralelización es que no habrá muchos beneficios (o, peor aún, menos beneficios) si se paralizan los recursos que dependen de la E / S, porque la I / O no se duplicará mágicamente.

Por eso creo que debería cambiar mi enfoque no para paralelizar todo el proceso, sino para crear tareas en cola dependientes encadenadas.

IE, debería crear un flujo como:

Cola de lectura de archivo. Cuando termine, haga cola ParseHtml. Cuando haya terminado, la cola se envía a WS y se escribe localmente. Cuando haya terminado, registre el resultado.

Sin embargo, no sé cómo realizar tal pensamiento.

Siento que terminará en un conjunto de colas de consumidores / productores, pero no encontré una muestra correcta.

Y además, no estoy seguro de si habrá beneficios.

gracias por los consejos

[Editar] De hecho, soy el candidato perfecto para usar c # 4.5 ... si solo fuera rtm :)

[Editar 2] Otra cosa que me hace pensar que no está correctamente paralelizado, es que en el monitor de recursos, veo gráficos de CPU, E / S de red y E / S de disco no estables. cuando uno es alto, otros son bajos a medio


Creo que su enfoque para dividir la lista de archivos y procesar cada archivo en un lote es correcto. Mi sensación es que podrías obtener más ganancia de rendimiento si juegas con un grado de paralelismo. Consulte: var refs = GetReferencesFromDB().AsParallel().WithDegreeOfParallelism(16); Esto comenzaría a procesar 16 archivos al mismo tiempo. Actualmente está procesando probablemente 2 o 4 archivos, dependiendo de la cantidad de núcleos que tenga. Esto solo es eficaz cuando solo tiene cálculos sin IO. Para las tareas intensivas de IO, el ajuste puede traer increíbles mejoras de rendimiento que reducen el tiempo de inactividad del procesador.

Si va a dividirse y volver a unir las tareas usando la apariencia de productor-consumidor en esta muestra: Uso de Extensiones Linq Paralelas para unir dos secuencias, ¿cómo se pueden obtener los resultados más rápidos primero?


La buena noticia es que su lógica podría ser fácilmente dividida en pasos que van en una tubería de productor-consumidor.

  • Paso 1: Leer archivo
  • Paso 2: Parse file
  • Paso 3: Escribir archivo
  • Paso 4: SendToWs

Si está utilizando .NET 4.0, puede usar la estructura de datos de BlockingCollection como la red troncal para la cola productor-consumidor de cada paso. El hilo principal pondrá en cola cada elemento de trabajo en la cola del paso 1, donde se recogerá y procesará y luego se reenviará a la cola del paso 2 y así sucesivamente.

Si está dispuesto a pasar al CTP asíncrono , también puede aprovechar las nuevas estructuras de flujo de datos TPL para esto. Existe la estructura de datos BufferBlock<T> , entre otras, que se comporta de manera similar a BlockingCollection y se integra bien con las nuevas palabras clave async y await .

Debido a que su algoritmo está limitado por la IO, es posible que las estrategias productor / consumidor no le den el impulso de rendimiento que está buscando, pero al menos tendrá una solución muy elegante que se escalaría bien si pudiera aumentar el rendimiento de IO. Me temo que los pasos 1 y 3 serán los cuellos de botella y que la tubería no se equilibre bien, pero vale la pena experimentar.


No está aprovechando ninguna API de E / S asíncrona en ninguno de sus códigos. Todo lo que está haciendo está vinculado a la CPU y todas sus operaciones de E / S van a desperdiciar el bloqueo de los recursos de la CPU. AsParallel es para tareas de cómputo, si desea aprovechar las ventajas de la E / S asíncrona, necesita aprovechar las API basadas en el Modelo de programación asíncrono (APM) en <= v4.0. Esto se hace buscando los métodos BeginXXX/EndXXX en las clases basadas en E / S que está utilizando y aprovechándolos cuando estén disponibles.

Lea esta publicación para comenzar : TPL TaskFactory.FromAsync vs Tareas con métodos de bloqueo

A continuación, no desea utilizar AsParallel en este caso de todos modos. AsParallel habilita la transmisión, lo que resultará en una programación inmediata de una nueva tarea por elemento, pero no es necesario o desea eso aquí. Sería mucho mejor si particionara el trabajo utilizando Parallel::ForEach .

Veamos cómo puede utilizar este conocimiento para lograr la máxima concurrencia en su caso específico:

var refs = GetReferencesFromDB(); // Using Parallel::ForEach here will partition and process your data on separate worker threads Parallel.ForEach( refs, ref => { string filePath = GetFilePath(ref); byte[] fileDataBuffer = new byte[1048576]; // Need to use FileStream API directly so we can enable async I/O FileStream sourceFileStream = new FileStream( filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 8192, true); // Use FromAsync to read the data from the file Task<int> readSourceFileStreamTask = Task.Factory.FromAsync( sourceFileStream.BeginRead sourceFileStream.EndRead fileDataBuffer, fileDataBuffer.Length, null); // Add a continuation that will fire when the async read is completed readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent => { int soureFileStreamBytesRead; try { // Determine exactly how many bytes were read // NOTE: this will propagate any potential exception that may have occurred in EndRead sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result; } finally { // Always clean up the source stream sourceFileStream.Close(); sourceFileStream = null; } // This is here to make sure you don''t end up trying to read files larger than this sample code can handle if(sourceFileStreamBytesRead == fileDataBuffer.Length) { throw new NotSupportedException("You need to implement reading files larger than 1MB. :P"); } // Convert the file data to a string string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead); // Parse the HTML string convertedHtml = ParseHtml(html); // This is here to make sure you don''t end up trying to write files larger than this sample code can handle if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length) { throw new NotSupportedException("You need to implement writing files larger than 1MB. :P"); } // Convert the file data back to bytes for writing Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0); // Need to use FileStream API directly so we can enable async I/O FileStream destinationFileStream = new FileStream( destinationFilePath, FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, 8192, true); // Use FromAsync to read the data from the file Task destinationFileStreamWriteTask = Task.Factory.FromAsync( destinationFileStream.BeginWrite, destinationFileStream.EndWrite, fileDataBuffer, 0, fileDataBuffer.Length, null); // Add a continuation that will fire when the async write is completed destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent => { try { // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite destinationFileStreamWriteAntecedent.Wait(); } finally { // Always close the destination file stream destinationFileStream.Close(); destinationFileStream = null; } }, TaskContinuationOptions.AttachedToParent); // Send to external system **concurrent** to writing to destination file system above SendToWs(ref, convertedHtml); }, TaskContinuationOptions.AttachedToParent); });

Ahora, aquí hay algunas notas:

  1. Este es un código de muestra, así que estoy usando un búfer de 1MB para leer / escribir archivos. Esto es excesivo para los archivos HTML y un desperdicio de recursos del sistema. Puede reducirlo para adaptarlo a sus necesidades máximas o implementar lecturas / escrituras encadenadas en un StringBuilder que es un ejercicio que le dejo ya que estaría escribiendo ~ 500 líneas más de código para hacer lecturas / escrituras asíncronas encadenadas. :PAG
  2. Notará que en las continuaciones para las tareas de lectura / escritura tengo TaskContinuationOptions.AttachedToParent . Esto es muy importante ya que evitará que el subproceso de trabajo con el que Parallel::ForEach comience el trabajo se complete hasta que todas las llamadas asíncronas subyacentes se hayan completado. Si no estuviera aquí, iniciaría el trabajo para los 5000 elementos al mismo tiempo, lo que contaminaría el subsistema TPL con miles de tareas programadas y no se escalaría correctamente.
  3. Llamo a SendToWs al mismo tiempo para escribir el archivo en el recurso compartido de archivos aquí. No sé qué es lo que subyace en la implementación de SendToWs, pero también suena como un buen candidato para hacer async. En este momento se asume que es un trabajo de cálculo puro y, como tal, va a grabar un subproceso de la CPU mientras se ejecuta. Lo dejo como ejercicio para descubrir la mejor manera de aprovechar lo que le he mostrado para mejorar el rendimiento allí.
  4. Esto es todo forma libre y mi cerebro fue el único compilador aquí y el resaltado de sintaxis de SO es todo lo que usé para asegurarme de que la sintaxis fuera buena. Entonces, perdonen los errores de sintaxis y háganme saber si he cometido un error tan grave que no se puede poner cara de cara y seguiré.

Solo una sugerencia, pero ¿has examinado el patrón de Consumidor / Productor? Un cierto número de subprocesos leerían sus archivos en el disco y enviarían el contenido a una cola. Luego, otro conjunto de subprocesos, conocido como los consumidores, "consumiría" la cola a medida que se llena. http://zone.ni.com/devzone/cda/tut/p/id/3023


Su mejor apuesta en este tipo de escenario es definitivamente el modelo productor-consumidor. Un hilo para extraer los datos y un montón de trabajadores para procesarlos. No hay una manera fácil de evitar la E / S, por lo que también debería centrarse en optimizar el cálculo en sí.

Ahora intentaré dibujar un modelo:

// producer thread var refs = GetReferencesFromDB(); // ~5000 Datarow returned foreach(var ref in refs) { lock(queue) { queue.Enqueue(ref); event.Set(); } // if the queue is limited, test if the queue is full and wait. } // consumer threads while(true) { value = null; lock(queue) { if(queue.Count > 0) { value = queue.Dequeue(); } } if(value != null) // process value else event.WaitOne(); // event to signal that an item was placed in the queue. }

Puede encontrar más detalles sobre el productor / consumidor en la parte 4 de Threading en C #: http://www.albahari.com/threading/part4.aspx