c# - Convertir un IEnumerable<T> a IObservable<T>, con el máximo paralelismo
async-await system.reactive (1)
EDITAR
Lo siguiente debería funcionar Esta sobrecarga limita el número de suscripciones concurrentes.
var resultObservable = pages
.Select(p => Observable.FromAsync(() => GetPage(p)))
.Merge(maxConcurrent);
Explicación
Para entender por qué es necesario este cambio, necesitamos algunos antecedentes
FromAsync
devuelve un elemento observable que invocará elFunc
pasado cada vez que se suscriba . Esto implica que si el observable nunca está suscrito, nunca será invocado.Merge
lee con entusiasmo la secuencia fuente y solo se suscribe an
observables simultáneamente.
Con estas dos piezas podemos saber por qué la versión original ejecutará todo en paralelo: debido a (2), GetPage
se habrá invocado a GetPage
para todas las cadenas fuente cuando Merge
decida cuántos observables deben suscribirse.
Y también podemos ver por qué funciona la segunda versión: aunque la secuencia se haya iterado por completo, (1) significa que GetPage
no se invoca hasta que Merge
decide que necesita suscribirse a n
observables. Esto lleva al resultado deseado de solo n
tareas ejecutadas simultáneamente.
Tengo una secuencia de tareas asíncronas para hacer (por ejemplo, buscar N páginas web). Ahora lo que quiero es exponerlos a todos como un IObservable<T>
. Mi solución actual usa la respuesta de esta pregunta :
async Task<ResultObj> GetPage(string page) {
Console.WriteLine("Before");
var result = await FetchFromInternet(page);
Console.WriteLine("After");
return result;
}
// pages is an IEnumerable<string>
IObservable<ResultObj> resultObservable =pages.Select(GetPage).
Select(t => Observable.FromAsync(() => t)).Merge();
// Now consume the list
foreach(ResultObj obj in resultObservable.ToEnumerable()) {
Console.WriteLine(obj.ToString());
}
El problema es que no sé la cantidad de páginas que se van a buscar, y podría ser grande. No quiero hacer cientos de solicitudes simultáneas. Así que quiero una forma de limitar el número máximo de tareas que se ejecutarán en paralelo. ¿Hay alguna manera de limitar el número de invocaciones simultáneas de GetPage
?
Hay una sobrecarga Merge
que toma un parámetro maxConcurrent, pero no parece limitar realmente la concurrencia de la invocación de la función. La consola imprime todos los mensajes Antes de los mensajes Después.
Nota: Necesito convertir de nuevo a IEnumerable<T>
. Estoy escribiendo una fuente de datos para un sistema que me da descriptores de datos para recuperar, y necesito devolverle una lista de los datos descargados.