redis stackexchange stackexchange.redis

StackExchange.Redis-Uso de LockTake/LockRelease



(2)

Estoy usando Redis con StackExchange.Redis. Tengo varios subprocesos que, en algún momento, accederán y editarán el valor de la misma clave, por lo que debo sincronizar la manipulación de los datos.

En cuanto a las funciones disponibles, veo que hay dos funciones, TakeLock y ReleaseLock. Sin embargo, estas funciones toman una clave y un parámetro de valor en lugar de la clave única esperada para bloquearse. La documentación y la fuente de intellisene en GitHub no explican cómo usar las funciones LockTake y LockRelease o qué se debe pasar para los parámetros clave y de valor.

P: ¿Cuál es el uso correcto de LockTake y LockRelease en StackExchange.Redis?

Ejemplo de pseudocódigo de lo que pretendo hacer:

//Add Items Before Parallel Execution redis.StringSet("myJSONKey", myJSON); //Parallel Execution Parallel.For(0, 100, i => { //Some work here //.... //Lock redis.LockTake("myJSONKey"); //Manipulate var myJSONObject = redis.StringGet("myJSONKey"); myJSONObject.Total++; Console.WriteLine(myJSONObject.Total); redis.StringSet("myJSONKey", myNewJSON); //Unlock redis.LockRelease("myJSONKey"); //More work here //... });


Hay 3 partes de una cerradura:

  • la clave (el nombre único del bloqueo en la base de datos)
  • el valor (un token definido por el llamador que se puede usar para indicar quién "posee" el bloqueo y para comprobar que la liberación y la extensión del bloqueo se están haciendo correctamente)
  • la duración (un bloqueo intencionalmente es una cosa de duración finita)

Si no se le ocurre ningún otro valor, una guía puede crear un "valor" adecuado. Tendemos a usar el nombre de la máquina (o una versión combinada del nombre de la máquina si varios procesos podrían competir en la misma máquina).

Además, tenga en cuenta que tomar un bloqueo es especulativo , no bloqueante . Es totalmente posible que no logres obtener el bloqueo y, por lo tanto, es posible que tengas que probar esto y tal vez agregar alguna lógica de reintento.

Un ejemplo típico podría ser:

RedisValue token = Environment.MachineName; if(db.LockTake(key, token, duration)) { try { // you have the lock do work } finally { db.LockRelease(key, token); } }

Tenga en cuenta que si el trabajo es largo (un bucle, en particular), es posible que desee agregar algunas llamadas ocasionales de LockExtend en el medio - nuevamente, recuerde verificar el éxito (en caso de que se agote el tiempo).

Tenga en cuenta también que todos los comandos redis individuales son atómicos, por lo que no necesita preocuparse por dos operaciones discretas que compitan. Para unidades de operación múltiple más complejas, las transacciones y los scripts son opciones.


Tengo mi parte del código para bloquear-> obtener-> modificar (si es necesario) -> desbloquear acciones con comentarios.

public static T GetCachedAndModifyWithLock<T>(string key, Func<T> retrieveDataFunc, TimeSpan timeExpiration, Func<T, bool> modifyEntityFunc, TimeSpan? lockTimeout = null, bool isSlidingExpiration=false) where T : class { int lockCounter = 0;//for logging in case when too many locks per key Exception logException = null; var cache = Connection.GetDatabase(); var lockToken = Guid.NewGuid().ToString(); //unique token for current part of code var lockName = key + "_lock"; //unique lock name. key-relative. T tResult = null; while ( lockCounter < 20) { //check for access to cache object, trying to lock it if (!cache.LockTake(lockName, lockToken, lockTimeout ?? TimeSpan.FromSeconds(10))) { lockCounter++; Thread.Sleep(100); //sleep for 100 milliseconds for next lock try. you can play with that continue; } try { RedisValue result = RedisValue.Null; if (isSlidingExpiration) { //in case of sliding expiration - get object with expiry time var exp = cache.StringGetWithExpiry(key); //check ttl. if (exp.Expiry.HasValue && exp.Expiry.Value.TotalSeconds >= 0) { //get only if not expired result = exp.Value; } } else //in absolute expiration case simply get { result = cache.StringGet(key); } //"REDIS_NULL" is for cases when our retrieveDataFunc function returning null (we cannot store null in redis, but can store pre-defined string :) ) if (result.HasValue && result == "REDIS_NULL") return null; //in case when cache is epmty if (!result.HasValue) { //retrieving data from caller function (from db from example) tResult = retrieveDataFunc(); if (tResult != null) { //trying to modify that entity. if caller modifyEntityFunc returns true, it means that caller wants to resave modified entity. if (modifyEntityFunc(tResult)) { //json serialization var json = JsonConvert.SerializeObject(tResult); cache.StringSet(key, json, timeExpiration); } } else { //save pre-defined string in case if source-value is null. cache.StringSet(key, "REDIS_NULL", timeExpiration); } } else { //retrieve from cache and serialize to required object tResult = JsonConvert.DeserializeObject<T>(result); //trying to modify if (modifyEntityFunc(tResult)) { //and save if required var json = JsonConvert.SerializeObject(tResult); cache.StringSet(key, json, timeExpiration); } } //refresh exiration in case of sliding expiration flag if(isSlidingExpiration) cache.KeyExpire(key, timeExpiration); } catch (Exception ex) { logException = ex; } finally { cache.LockRelease(lockName, lockToken); } break; } if (lockCounter >= 20 || logException!=null) { //log it } return tResult; }

y uso:

public class User { public int ViewCount { get; set; } } var cachedAndModifiedItem = GetCachedAndModifyWithLock<User>( "MyAwesomeKey", () => { //return from db or kind of that return new User() { ViewCount = 0 }; }, TimeSpan.FromMinutes(10), user=> { if (user.ViewCount< 3) { user.ViewCount++; return true; //save it to cache } return false; //do not update it in cache }, TimeSpan.FromSeconds(10),true);

Ese código puede mejorarse (por ejemplo, puede agregar transacciones para un menor número de llamadas a caché y etc.), pero me alegro de que sea útil para usted.