c# async-await system.reactive

c# - Convertir un IEnumerable<T> a IObservable<T>, con el máximo paralelismo



async-await system.reactive (1)

EDITAR

Lo siguiente debería funcionar Esta sobrecarga limita el número de suscripciones concurrentes.

var resultObservable = pages .Select(p => Observable.FromAsync(() => GetPage(p))) .Merge(maxConcurrent);

Explicación

Para entender por qué es necesario este cambio, necesitamos algunos antecedentes

  1. FromAsync devuelve un elemento observable que invocará el Func pasado cada vez que se suscriba . Esto implica que si el observable nunca está suscrito, nunca será invocado.

  2. Merge lee con entusiasmo la secuencia fuente y solo se suscribe a n observables simultáneamente.

Con estas dos piezas podemos saber por qué la versión original ejecutará todo en paralelo: debido a (2), GetPage se habrá invocado a GetPage para todas las cadenas fuente cuando Merge decida cuántos observables deben suscribirse.

Y también podemos ver por qué funciona la segunda versión: aunque la secuencia se haya iterado por completo, (1) significa que GetPage no se invoca hasta que Merge decide que necesita suscribirse a n observables. Esto lleva al resultado deseado de solo n tareas ejecutadas simultáneamente.

Tengo una secuencia de tareas asíncronas para hacer (por ejemplo, buscar N páginas web). Ahora lo que quiero es exponerlos a todos como un IObservable<T> . Mi solución actual usa la respuesta de esta pregunta :

async Task<ResultObj> GetPage(string page) { Console.WriteLine("Before"); var result = await FetchFromInternet(page); Console.WriteLine("After"); return result; } // pages is an IEnumerable<string> IObservable<ResultObj> resultObservable =pages.Select(GetPage). Select(t => Observable.FromAsync(() => t)).Merge(); // Now consume the list foreach(ResultObj obj in resultObservable.ToEnumerable()) { Console.WriteLine(obj.ToString()); }

El problema es que no sé la cantidad de páginas que se van a buscar, y podría ser grande. No quiero hacer cientos de solicitudes simultáneas. Así que quiero una forma de limitar el número máximo de tareas que se ejecutarán en paralelo. ¿Hay alguna manera de limitar el número de invocaciones simultáneas de GetPage ?

Hay una sobrecarga Merge que toma un parámetro maxConcurrent, pero no parece limitar realmente la concurrencia de la invocación de la función. La consola imprime todos los mensajes Antes de los mensajes Después.

Nota: Necesito convertir de nuevo a IEnumerable<T> . Estoy escribiendo una fuente de datos para un sistema que me da descriptores de datos para recuperar, y necesito devolverle una lista de los datos descargados.