c# .net multithreading async-await concurrentdictionary

c# - Caché de operaciones asíncronas.



.net multithreading (6)

Estoy buscando una forma elegante de almacenar en caché los resultados de mis operaciones asíncronas.

Primero tuve un método síncrono como este:

public String GetStuff(String url) { WebRequest request = WebRequest.Create(url); using (var response = request.GetResponse()) using (var sr = new StreamReader(response.GetResponseStream())) return sr.ReadToEnd(); }

Entonces lo hice asíncrono:

public async Task<String> GetStuffAsync(String url) { WebRequest request = WebRequest.Create(url); using (var response = await request.GetResponseAsync()) using (var sr = new StreamReader(response.GetResponseStream())) return await sr.ReadToEndAsync(); }

Entonces decidí que debería almacenar en caché los resultados, por lo que no necesito consultar fuera de eso a menudo:

ConcurrentDictionary<String, String> _cache = new ConcurrentDictionary<String, String>(); public async Task<String> GetStuffAsync(String url) { return _cache.GetOrAdd(url, await GetStuffInternalAsync(url)); } private async Task<String> GetStuffInternalAsync(String url) { WebRequest request = WebRequest.Create(url); using (var response = await request.GetResponseAsync()) using (var sr = new StreamReader(response.GetResponseStream())) return await sr.ReadToEndAsync(); }

Luego leí un artículo (o vi un video) sobre cómo es mejor el almacenamiento en caché de la Task<T> , porque crearlos es costoso:

ConcurrentDictionary<String, Task<String>> _cache = new ConcurrentDictionary<String, Task<String>>(); public Task<String> GetStuffAsync(String url) { return _cache.GetOrAdd(url, GetStuffInternalAsync(url)); } private async Task<String> GetStuffInternalAsync(String url) { WebRequest request = WebRequest.Create(url); using (var response = await request.GetResponseAsync()) using (var sr = new StreamReader(response.GetResponseStream())) return await sr.ReadToEndAsync(); }

Y ahora el problema es que si la solicitud falla (por ejemplo, un HTTP 401), la memoria caché contendrá una Task<String> fallida y tendré que restablecer la aplicación porque será imposible reenviar la solicitud.

¿Existe una manera elegante de usar ConcurrentDictionary<T1,T2> para almacenar en caché solo las tareas exitosas y aún tener el comportamiento atómico?


Aquí hay una manera de almacenar en caché los resultados de las operaciones asíncronas que garantizan que no se pierda ninguna caché.

En la respuesta aceptada, si la misma url se solicita muchas veces en un bucle (según el SynchronizationContext) o desde varios subprocesos, la solicitud web continuará enviándose hasta que haya una respuesta que se almacena en caché, momento en el cual la caché comenzará a recibir usado.

El siguiente método crea un objeto SemaphoreSlim para cada clave única. Esto evitará que la operación asíncrona de larga ejecución se ejecute varias veces para la misma clave, mientras que permite que se ejecute simultáneamente para diferentes claves. Obviamente, existe la sobrecarga de mantener los objetos de SemaphoreSlim para evitar las fallas de caché, por lo que puede no valer la pena según el caso de uso. Pero si garantizar que no se pierda la caché es importante, esto lo logra.

private readonly ConcurrentDictionary<string, SemaphoreSlim> _keyLocks = new ConcurrentDictionary<string, SemaphoreSlim>(); private readonly ConcurrentDictionary<string, string> _cache = new ConcurrentDictionary<string, string>(); public async Task<string> GetSomethingAsync(string key) { string value; // get the semaphore specific to this key var keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1)); await keyLock.WaitAsync(); try { // try to get value from cache if (!_cache.TryGetValue(key, out value)) { // if value isn''t cached, get it the long way asynchronously value = await GetSomethingTheLongWayAsync(); // cache value _cache.TryAdd(key, value); } } finally { keyLock.Release(); } return value; }

Edición: Como se mencionó @mtkachenko en los comentarios, se podría realizar una verificación de caché adicional al comienzo de este método para omitir potencialmente el paso de adquisición de bloqueo.


En primer lugar, ambos enfoques son incorrectos, porque no le guardan ninguna solicitud (aunque el segundo le ahorra tiempo al menos).

Tu primer código (el que está a la await ) hace esto:

  1. Haga la solicitud.
  2. Espere a que se complete la solicitud.
  3. Si ya hubo un resultado en el caché, ignore el resultado de la solicitud.

Su segundo código elimina el paso 2, por lo que es más rápido, pero aún está haciendo muchas solicitudes innecesarias.

Lo que debes hacer en su lugar es usar la sobrecarga de GetOrAdd() que toma un delegado :

public Task<String> GetStuffAsync(String url) { return _cache.GetOrAdd(url, GetStuffInternalAsync); }

Esto no elimina completamente la posibilidad de que se ignoren las solicitudes, pero las hace mucho menos probables. (Para eso, podrías intentar cancelar las solicitudes que sabes que están siendo ignoradas, pero no creo que valga la pena el esfuerzo aquí).

Ahora a su pregunta real. Lo que creo que deberías hacer es usar el método AddOrUpdate() . Si el valor aún no está allí, agréguelo. Si está allí, reemplácelo si está defectuoso:

public Task<String> GetStuffAsync(String url) { return _cache.AddOrUpdate( url, GetStuffInternalAsync, (u, task) => { if (task.IsCanceled || task.IsFaulted) return GetStuffInternalAsync(u); return task; }); }


En realidad, es razonable (y dependiendo de su diseño y rendimiento, crucial) mantener esas tareas fallidas como caché negativo . De lo contrario, si una url siempre falla, usarla una y otra vez elimina el punto de usar una memoria caché por completo.

Lo que necesitas es una forma de borrar el caché de vez en cuando. La forma más sencilla es tener un temporizador que reemplace la instancia de ConcurrentDictionarry . La solución más robusta es construir tu propio LruDictionary o algo similar.


Este trabajo para mí:

ObjectCache _cache = MemoryCache.Default; static object _lockObject = new object(); public Task<T> GetAsync<T>(string cacheKey, Func<Task<T>> func, TimeSpan? cacheExpiration = null) where T : class { var task = (T)_cache[cacheKey]; if (task != null) return task; lock (_lockObject) { task = (T)_cache[cacheKey](cacheKey); if (task != null) return task; task = func(); Set(cacheKey, task, cacheExpiration); task.ContinueWith(t => { if (t.Status != TaskStatus.RanToCompletion) _cache.Remove(cacheKey); }); } return task; }


He creado un contenedor para MemoryCache que básicamente almacena en caché objetos Lazy<Task<T>> y funciona para que se resuelvan todos los problemas siguientes:

  • No se iniciarán operaciones paralelas o innecesarias para obtener un valor. Varios sitios de llamada o subprocesos podrían esperar el mismo valor de la memoria caché.
  • Las tareas fallidas no se almacenan en caché. (No cacheo negativo.)
  • Los usuarios de la memoria caché no pueden obtener resultados invalidados de la memoria caché, incluso si el valor se invalida durante una espera.

La solución se explica con más detalle en mi blog y el código de trabajo completo está disponible en GitHub .


Otra forma fácil de hacer esto es extender Lazy<T> para que sea AsyncLazy<T> , así:

public class AsyncLazy<T> : Lazy<Task<T>> { public AsyncLazy(Func<Task<T>> taskFactory, LazyThreadSafetyMode mode) : base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap(), mode) { } public TaskAwaiter<T> GetAwaiter() { return Value.GetAwaiter(); } }

Entonces puedes hacer esto:

private readonly ConcurrentDictionary<string, AsyncLazy<string>> _cache = new ConcurrentDictionary<string, AsyncLazy<string>>(); public async Task<string> GetStuffAsync(string url) { return await _cache.GetOrAdd(url, new AsyncLazy<string>( () => GetStuffInternalAsync(url), LazyThreadSafetyMode.ExecutionAndPublication)); }