scaleout mvc backplane c# signalr failover booksleeve

c# - backplane - signalr mvc



Uso de SignalR con conmutaciĆ³n por error de Redis MessageSleeve con ConnectionUtils.Connect() (1)

Estoy intentando crear un escenario de conmutación por error de bus de mensajes Redis con una aplicación SignalR.

Al principio, probamos un failover de carga-equilibrador de hardware simple, que simplemente monitoreaba dos servidores Redis. La aplicación SignalR apuntaba al punto final HLB singular. Luego fallé en un servidor, pero no pude enviar mensajes correctamente en el segundo servidor Redis sin reciclar el grupo de aplicaciones SignalR. Presumiblemente esto se debe a que necesita emitir los comandos de configuración al nuevo bus de mensajes Redis.

A partir de SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBus usa RedisConnection de RedisConnection() para conectarse a un solo Redis para pub / sub.

RedisMessageBusCluster() una clase nueva, RedisMessageBusCluster() que usa ConnectionUtils.Connect() Booksleeve para conectarse a una en un clúster de servidores Redis.

using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using BookSleeve; using Microsoft.AspNet.SignalR.Infrastructure; namespace Microsoft.AspNet.SignalR.Redis { /// <summary> /// WIP: Getting scaleout for Redis working /// </summary> public class RedisMessageBusCluster : ScaleoutMessageBus { private readonly int _db; private readonly string[] _keys; private RedisConnection _connection; private RedisSubscriberConnection _channel; private Task _connectTask; private readonly TaskQueue _publishQueue = new TaskQueue(); public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver) : base(resolver) { _db = db; _keys = keys.ToArray(); // uses a list of connections _connection = ConnectionUtils.Connect(serverList); //_connection = new RedisConnection(host: server, port: port, password: password); _connection.Closed += OnConnectionClosed; _connection.Error += OnConnectionError; // Start the connection - TODO: can remove this Open as the connection is already opened, but there''s the _connectTask is used later on _connectTask = _connection.Open().Then(() => { // Create a subscription channel in redis _channel = _connection.GetOpenSubscriberChannel(); // Subscribe to the registered connections _channel.Subscribe(_keys, OnMessage); // Dirty hack but it seems like subscribe returns before the actual // subscription is properly setup in some cases while (_channel.SubscriptionCount == 0) { Thread.Sleep(500); } }); } protected override Task Send(Message[] messages) { return _connectTask.Then(msgs => { var taskCompletionSource = new TaskCompletionSource<object>(); // Group messages by source (connection id) var messagesBySource = msgs.GroupBy(m => m.Source); SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource); return taskCompletionSource.Task; }, messages); } private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource) { if (!enumerator.MoveNext()) { taskCompletionSource.TrySetResult(null); } else { IGrouping<string, Message> group = enumerator.Current; // Get the channel index we''re going to use for this message int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length; string key = _keys[index]; // Increment the channel number _connection.Strings.Increment(_db, key) .Then((id, k) => { var message = new RedisMessage(id, group.ToArray()); return _connection.Publish(k, message.GetBytes()); }, key) .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource) .ContinueWithNotComplete(taskCompletionSource); } } private void OnConnectionClosed(object sender, EventArgs e) { // Should we auto reconnect? if (true) { ; } } private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e) { // How do we bubble errors? if (true) { ; } } private void OnMessage(string key, byte[] data) { // The key is the stream id (channel) var message = RedisMessage.Deserialize(data); _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages)); } protected override void Dispose(bool disposing) { if (disposing) { if (_channel != null) { _channel.Unsubscribe(_keys); _channel.Close(abort: true); } if (_connection != null) { _connection.Close(abort: true); } } base.Dispose(disposing); } } }

Booksleeve tiene su propio mecanismo para determinar un maestro, y se conmutará automáticamente a otro servidor, y ahora estoy probando esto con SignalR.Chat .

En web.config , configuré la lista de servidores disponibles:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/>

Luego en Application_Start() :

// Redis cluster server list string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"]; List<string> eventKeys = new List<string>(); eventKeys.Add("SignalR.Redis.FailoverTest"); GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys);

Agregué dos métodos adicionales a Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions :

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys) { return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys); } public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys) { var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver)); resolver.Register(typeof(IMessageBus), () => bus.Value); return resolver; }

Ahora el problema es que cuando tengo varios puntos de interrupción habilitados, hasta después de agregar un nombre de usuario, deshabilita todos los puntos de interrupción, la aplicación funciona como se esperaba. Sin embargo, con los puntos de interrupción desactivados desde el principio, parece haber alguna condición de carrera que puede estar fallando durante el proceso de conexión.

Por lo tanto, en RedisMessageCluster() :

// Start the connection _connectTask = _connection.Open().Then(() => { // Create a subscription channel in redis _channel = _connection.GetOpenSubscriberChannel(); // Subscribe to the registered connections _channel.Subscribe(_keys, OnMessage); // Dirty hack but it seems like subscribe returns before the actual // subscription is properly setup in some cases while (_channel.SubscriptionCount == 0) { Thread.Sleep(500); } });

Traté de agregar tanto una Task.Wait , e incluso un Sleep() adicional Sleep() (que no se muestra arriba) - que estaban esperando / etc, pero sigue recibiendo errores.

El error recurrente parece estar en Booksleeve.MessageQueue.cs ~ ln 71:

A first chance exception of type ''System.InvalidOperationException'' occurred in BookSleeve.dll iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:/Projects/Frameworks/BookSleeve-1.2.0.5/BookSleeve/MessageQueue.cs:line 71 at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:/Projects/Frameworks/BookSleeve-1.2.0.5/BookSleeve/RedisConnectionBase.cs:line 910 at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:/Projects/Frameworks/BookSleeve-1.2.0.5/BookSleeve/RedisConnectionBase.cs:line 826 at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:/Projects/Frameworks/BookSleeve-1.2.0.5/BookSleeve/IStringCommands.cs:line 277 at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:/Projects/Frameworks/BookSleeve-1.2.0.5/BookSleeve/IStringCommands.cs:line 270 at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:/Projects/Frameworks/SignalR/SignalR.1.0RC1/SignalR/src/Microsoft.AspNet.SignalR.Redis/RedisMessageBusCluster.cs:line 90 at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:/Projects/Frameworks/SignalR/SignalR.1.0RC1/SignalR/src/Microsoft.AspNet.SignalR.Redis/RedisMessageBusCluster.cs:line 67 at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:/Projects/Frameworks/SignalR/SignalR.1.0RC1/SignalR/src/Microsoft.AspNet.SignalR.Core/TaskAsyncHelper.cs:line 893 at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:/Projects/Frameworks/SignalR/SignalR.1.0RC1/SignalR/src/Microsoft.AspNet.SignalR.Core/TaskAsyncHelper.cs:line 821 --- End of inner exception stack trace --- ---> (Inner Exception #0) System.InvalidOperationException: The queue is closed at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:/Projects/Frameworks/BookSleeve-1.2.0.5/BookSleeve/MessageQueue.cs:line 71 at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:/Projects/Frameworks/BookSleeve-1.2.0.5/BookSleeve/RedisConnectionBase.cs:line 910 at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:/Projects/Frameworks/BookSleeve-1.2.0.5/BookSleeve/RedisConnectionBase.cs:line 826 at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:/Projects/Frameworks/BookSleeve-1.2.0.5/BookSleeve/IStringCommands.cs:line 277 at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:/Projects/Frameworks/BookSleeve-1.2.0.5/BookSleeve/IStringCommands.cs:line 270 at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:/Projects/Frameworks/SignalR/SignalR.1.0RC1/SignalR/src/Microsoft.AspNet.SignalR.Redis/RedisMessageBusCluster.cs:line 90 at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:/Projects/Frameworks/SignalR/SignalR.1.0RC1/SignalR/src/Microsoft.AspNet.SignalR.Redis/RedisMessageBusCluster.cs:line 67 at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:/Projects/Frameworks/SignalR/SignalR.1.0RC1/SignalR/src/Microsoft.AspNet.SignalR.Core/TaskAsyncHelper.cs:line 893 at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:/Projects/Frameworks/SignalR/SignalR.1.0RC1/SignalR/src/Microsoft.AspNet.SignalR.Core/TaskAsyncHelper.cs:line 821<--- public void Enqueue(RedisMessage item, bool highPri) { lock (stdPriority) { if (closed) { throw new InvalidOperationException("The queue is closed"); }

Donde se lanza una excepción de cola cerrada.

Preveo otro problema: como la conexión de Redis se hace en Application_Start() , puede haber algunos problemas para "reconectarse" a otro servidor. Sin embargo, creo que esto es válido cuando se utiliza el singular RedisConnection() , donde solo hay una conexión para elegir. Sin embargo, con la introducción de ConnectionUtils.Connect() me gustaría escuchar a @dfowler u otros chicos de SignalR sobre cómo se maneja este escenario en SignalR.


El equipo SignalR ahora ha implementado soporte para una fábrica de conexiones personalizadas con StackExchange.Redis , el sucesor de BookSleeve, que admite conexiones Redis redundantes a través de ConnectionMultiplexer.

El problema inicial encontrado fue que, a pesar de crear mis propios métodos de extensión en BookSleeve para aceptar una colección de servidores, el fail-over no fue posible.

Ahora, con la evolución de BookSleeve a StackExchange.Redis, ahora podemos configure colección de servidores / puertos directamente en la inicialización de Connect .

La nueva implementación es mucho más simple que la ruta que estaba bajando, al crear un método UseRedisCluster , y el pluming de back-end ahora es compatible con el verdadero fail-over:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true");

StackExchange.Redis también permite la configuración manual adicional como se describe en la sección Automatic and Manual Configuration de la documentación:

ConfigurationOptions config = new ConfigurationOptions { EndPoints = { { "redis0", 6379 }, { "redis1", 6380 } }, CommandMap = CommandMap.Create(new HashSet<string> { // EXCLUDE a few commands "INFO", "CONFIG", "CLUSTER", "PING", "ECHO", "CLIENT" }, available: false), KeepAlive = 180, DefaultVersion = new Version(2, 8, 8), Password = "changeme" };

En esencia, la capacidad de inicializar nuestro entorno de escalabilidad SignalR con una colección de servidores ahora resuelve el problema inicial.