run - c#.net 4.5 async/multithread?
create async method c# (4)
Creo que no necesitas async
y await
cosas aquí. Pueden ayudar en la aplicación de escritorio donde necesita mover su trabajo a un hilo que no sea de GUI. En mi opinión, será mejor usar el método Parallel.ForEach
en su caso. Algo como esto:
DataSet alldata;
var bag = new ConcurrentBag<DataSet>();
Parallel.ForEach(the8000urls, url =>
{
// ScrapeData downloads the html from the url with WebClient.DownloadString
// and scrapes the data into several datatables which it returns as a dataset.
DataSet dataForOnePage = ScrapeData(url);
// Add data for one page to temp bag
bag.Add(dataForOnePage);
});
//merge each table in dataForOnePage into allData from bag
PushAllDataToSql(alldata);
Estoy escribiendo una aplicación de consola C # que raspa datos de páginas web.
Esta aplicación irá a aproximadamente 8000 páginas web y eliminará datos (el mismo formato de datos en cada página).
Lo tengo funcionando ahora mismo sin métodos asíncronos y sin subprocesos múltiples.
Sin embargo, necesito que sea más rápido. Solo usa alrededor del 3% -6% de la CPU, creo que porque se demora en descargar el HTML (WebClient.DownloadString (url))
Este es el flujo básico de mi programa.
DataSet alldata;
foreach(var url in the8000urls)
{
// ScrapeData downloads the html from the url with WebClient.DownloadString
// and scrapes the data into several datatables which it returns as a dataset.
DataSet dataForOnePage = ScrapeData(url);
//merge each table in dataForOnePage into allData
}
// PushAllDataToSql(alldata);
He estado tratando de multiprocesar esto, pero no estoy seguro de cómo comenzar correctamente. Estoy usando .net 4.5 y mi entendimiento es asíncrono y espero que en 4.5 se haga para que esto sea mucho más fácil de programar pero todavía estoy un poco perdido.
Mi idea era seguir creando nuevos hilos que son asíncronos para esta línea.
DataSet dataForOnePage = ScrapeData(url);
y luego como cada uno termina, corre
//merge each table in dataForOnePage into allData
¿Alguien puede indicarme la dirección correcta sobre cómo hacer que esa línea sea asíncrona en .net 4.5 c # y luego hacer que mi método de combinación se ejecute?
Gracias.
Edición: Aquí está mi método ScrapeData:
public static DataSet GetProperyData(CookieAwareWebClient webClient, string pageid)
{
var dsPageData = new DataSet();
// DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT
string url = @"https://domain.com?&id=" + pageid + @"restofurl";
string html = webClient.DownloadString(url);
var doc = new HtmlDocument();
doc.LoadHtml(html );
// A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData
return dsPageData ;
}
Recomiendo leer mi introducción razonablemente completa a async
/ await
.
Primero, haga que todo sea asíncrono, comenzando en el nivel inferior:
public static async Task<DataSet> ScrapeDataAsync(string pageid)
{
CookieAwareWebClient webClient = ...;
var dsPageData = new DataSet();
// DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT
string url = @"https://domain.com?&id=" + pageid + @"restofurl";
string html = await webClient.DownloadStringTaskAsync(url).ConfigureAwait(false);
var doc = new HtmlDocument();
doc.LoadHtml(html);
// A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData
return dsPageData;
}
Luego puede consumirlo de la siguiente manera (utilizando async
con LINQ):
DataSet alldata;
var tasks = the8000urls.Select(async url =>
{
var dataForOnePage = await ScrapeDataAsync(url);
//merge each table in dataForOnePage into allData
});
await Task.WhenAll(tasks);
PushAllDataToSql(alldata);
Y use AsyncContext.RunTask de mi biblioteca AsyncEx ya que esta es una aplicación de consola :
class Program
{
static int Main(string[] args)
{
try
{
return AsyncContext.Run(() => MainAsync(args));
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
return -1;
}
}
static async Task<int> MainAsync(string[] args)
{
...
}
}
Eso es. No hay necesidad de bloqueo ni continuaciones ni nada de eso.
Si desea usar el async
y await
palabras clave (aunque no tiene que hacerlo, pero hacen las cosas más fáciles en .NET 4.5), primero desearía cambiar su método ScrapeData
para devolver una instancia de Task<T>
usando la palabra clave async
, como así:
async Task<DataSet> ScrapeDataAsync(Uri url)
{
// Create the HttpClientHandler which will handle cookies.
var handler = new HttpClientHandler();
// Set cookies on handler.
// Await on an async call to fetch here, convert to a data
// set and return.
var client = new HttpClient(handler);
// Wait for the HttpResponseMessage.
HttpResponseMessage response = await client.GetAsync(url);
// Get the content, await on the string content.
string content = await response.Content.ReadAsStringAsync();
// Process content variable here into a data set and return.
DataSet ds = ...;
// Return the DataSet, it will return Task<DataSet>.
return ds;
}
Tenga en cuenta que probablemente querrá alejarse de la clase de WebClient
, ya que no es compatible con la Task<T>
inherentemente en sus operaciones asíncronas. Una mejor opción en .NET 4.5 es la clase HttpClient
. He elegido usar HttpClient
arriba. Además, eche un vistazo a la clase HttpClientHandler
, específicamente la propiedad CookieContainer
, que utilizará para enviar cookies con cada solicitud.
Sin embargo, esto significa que es muy probable que tenga que utilizar la palabra clave await
para esperar otra operación asíncrona, que en este caso, probablemente sea la descarga de la página. Tendrá que adaptar las llamadas que descargan datos para usar las versiones asíncronas y await
.
Una vez que se completa, normalmente llamará a await
en eso, pero no puede hacer eso en este escenario porque await
en una variable. En este escenario, está ejecutando un bucle, por lo que la variable se restablecería con cada iteración. En este caso, es mejor simplemente almacenar la Task<T>
en una matriz como la siguiente:
DataSet alldata = ...;
var tasks = new List<Task<DataSet>>();
foreach(var url in the8000urls)
{
// ScrapeData downloads the html from the url with
// WebClient.DownloadString
// and scrapes the data into several datatables which
// it returns as a dataset.
tasks.Add(ScrapeDataAsync(url));
}
Existe la cuestión de fusionar los datos en allData
. Para ese fin, desea llamar al método ContinueWith
en la instancia de Task<T>
devuelta y realizar la tarea de agregar los datos a allData
:
DataSet alldata = ...;
var tasks = new List<Task<DataSet>>();
foreach(var url in the8000urls)
{
// ScrapeData downloads the html from the url with
// WebClient.DownloadString
// and scrapes the data into several datatables which
// it returns as a dataset.
tasks.Add(ScrapeDataAsync(url).ContinueWith(t => {
// Lock access to the data set, since this is
// async now.
lock (allData)
{
// Add the data.
}
});
}
Luego, puede esperar en todas las tareas utilizando el método WhenAll
en la clase de Task
y await
en eso:
// After your loop.
await Task.WhenAll(tasks);
// Process allData
Sin embargo, tenga en cuenta que tiene un foreach
, y WhenAll
toma una IEnumerable<T>
. Este es un buen indicador de que es adecuado para usar LINQ, que es:
DataSet alldata;
var tasks =
from url in the8000Urls
select ScrapeDataAsync(url).ContinueWith(t => {
// Lock access to the data set, since this is
// async now.
lock (allData)
{
// Add the data.
}
});
await Task.WhenAll(tasks);
// Process allData
También puede optar por no utilizar la sintaxis de consulta si lo desea, no importa en este caso.
Tenga en cuenta que si el método que contiene no está marcado como async
(debido a que está en una aplicación de consola y tiene que esperar los resultados antes de que la aplicación termine), simplemente puede llamar al método Wait
en la Task
devuelta cuando llame a WhenAll
: WhenAll
:
// This will block, waiting for all tasks to complete, all
// tasks will run asynchronously and when all are done, then the
// code will continue to execute.
Task.WhenAll(tasks).Wait();
// Process allData.
A saber, el punto es que desea recopilar sus instancias de Task
en una secuencia y luego esperar en toda la secuencia antes de procesar allData
.
Sin embargo, sugeriría intentar procesar los datos antes de fusionarlos en allData
si es posible; a menos que el procesamiento de datos requiera el conjunto completo de datos , obtendrá aún más ganancias de rendimiento al procesar la mayor cantidad de datos que recupera cuando los recupera, en lugar de esperar a que todos regresen.
También puede usar TPL Dataflow , que es un buen ajuste para este tipo de problema.
En este caso, crea una "malla de flujo de datos" y luego los datos fluyen a través de ella.
Este es en realidad más como una tubería que una "malla". Estoy poniendo en tres pasos: Descargar los datos (cadena) de la URL; Analice los datos (de cadena) en HTML y luego en un DataSet
; y fusionar el conjunto de datos en el conjunto de datos maestro.
Primero, creamos los bloques que irán en la malla:
DataSet allData;
var downloadData = new TransformBlock<string, string>(
async pageid =>
{
System.Net.WebClient webClient = null;
var url = "https://domain.com?&id=" + pageid + "restofurl";
return await webClient.DownloadStringTaskAsync(url);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
});
var parseHtml = new TransformBlock<string, DataSet>(
html =>
{
var dsPageData = new DataSet();
var doc = new HtmlDocument();
doc.LoadHtml(html);
// HTML Agility parsing
return dsPageData;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
});
var merge = new ActionBlock<DataSet>(
dataForOnePage =>
{
// merge dataForOnePage into allData
});
Luego unimos los tres bloques para crear la malla:
downloadData.LinkTo(parseHtml);
parseHtml.LinkTo(merge);
A continuación, comenzamos a bombear datos en la malla:
foreach (var pageid in the8000urls)
downloadData.Post(pageid);
Y finalmente, esperamos que se complete cada paso en la malla (esto también propagará cualquier error):
downloadData.Complete();
await downloadData.Completion;
parseHtml.Complete();
await parseHtml.Completion;
merge.Complete();
await merge.Completion;
Lo bueno de TPL Dataflow es que puede controlar fácilmente qué tan paralelas son cada parte. Por ahora, he establecido que tanto los bloques de descarga como los de análisis sean Sin Unbounded
, pero es posible que desee restringirlos. El bloque de combinación utiliza el paralelismo máximo predeterminado de 1, por lo que no es necesario ningún bloqueo al fusionar.