c# - Patrón ZeroMQ PUB/SUB con cancelación de Poller multihilo
c++ multithreading (2)
Tengo dos aplicaciones, un servidor C ++ y una interfaz de usuario WPF de C #. El código C ++ recibe solicitudes (desde cualquier lugar / cualquier persona) a través de un servicio [PUB / SUB] de mensajería de ZeroMQ. Utilizo mi código C # para realizar pruebas y crear "pruebas" y las ejecuto. Estas pruebas posteriores se pueden componer de muchas "pruebas unitarias" y cada una de ellas envía y recibe miles de mensajes del servidor C ++.
Actualmente, las pruebas inversas individuales funcionan bien y pueden enviar N pruebas de unidad, cada una con miles de solicitudes y capturas. Mi problema es la arquitectura; cuando envío otra prueba de respaldo (después de la primera), tengo un problema con la suscripción del evento que se realiza una segunda vez debido a que el hilo de sondeo no se cancela ni se elimina. Esto se traduce en una salida errónea. Esto puede parecer un problema trivial (quizás sea para algunos de ustedes), pero la cancelación de esta tarea de sondeo en mi configuración actual está resultando problemática. Algún código ...
Mi clase de intermediario de mensajes es simple y parece
public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
private Task pollingTask;
private NetMQContext context;
private PublisherSocket pubSocket;
private CancellationTokenSource source;
private CancellationToken token;
private ManualResetEvent pollerCancelled;
public MessageBroker()
{
this.source = new CancellationTokenSource();
this.token = source.Token;
StartPolling();
context = NetMQContext.Create();
pubSocket = context.CreatePublisherSocket();
pubSocket.Connect(PublisherAddress);
}
public void Dispatch(Taurus.FeedMux message)
{
pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
}
private void StartPolling()
{
pollerCancelled = new ManualResetEvent(false);
pollingTask = Task.Run(() =>
{
try
{
using (var context = NetMQContext.Create())
using (var subSocket = context.CreateSubscriberSocket())
{
byte[] buffer = null;
subSocket.Options.ReceiveHighWatermark = 1000;
subSocket.Connect(SubscriberAddress);
subSocket.Subscribe(String.Empty);
while (true)
{
buffer = subSocket.Receive();
MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
if (this.token.IsCancellationRequested)
this.token.ThrowIfCancellationRequested();
}
}
}
catch (OperationCanceledException)
{
pollerCancelled.Set();
}
}, this.token);
}
private void CancelPolling()
{
source.Cancel();
pollerCancelled.WaitOne();
pollerCancelled.Close();
}
public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }
private bool disposed = false;
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
if (this.pollingTask != null)
{
CancelPolling();
if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
this.pollingTask.Status == TaskStatus.Faulted ||
this.pollingTask.Status == TaskStatus.Canceled)
{
this.pollingTask.Dispose();
this.pollingTask = null;
}
}
if (this.context != null)
{
this.context.Dispose();
this.context = null;
}
if (this.pubSocket != null)
{
this.pubSocket.Dispose();
this.pubSocket = null;
}
if (this.source != null)
{
this.source.Dispose();
this.source = null;
}
}
disposed = true;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
~MessageBroker()
{
Dispose(false);
}
}
El "motor" de backtesting que se usa para ejecutar cada prueba posterior, primero construye un Dictionary
contiene cada Test
(unidad de prueba) y los mensajes para enviar a la aplicación de C ++ para cada prueba.
El método DispatchTests
, aquí está
private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
broker = new MessageBroker();
broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
testCompleted = new ManualResetEvent(false);
try
{
// Loop through the tests.
foreach (var kvp in feedMuxCollection)
{
testCompleted.Reset();
Test t = kvp.Key;
t.Bets = new List<Taurus.Bet>();
foreach (Taurus.FeedMux mux in kvp.Value)
{
token.ThrowIfCancellationRequested();
broker.Dispatch(mux);
}
broker.Dispatch(new Taurus.FeedMux()
{
type = Taurus.FeedMux.Type.PING,
ping = new Taurus.Ping() { event_id = t.EventID }
});
testCompleted.WaitOne(); // Wait until all messages are received for this test.
}
testCompleted.Close();
}
finally
{
broker.Dispose(); // Dispose the broker.
}
}
El mensaje PING
al final, le dice a C ++ que hemos terminado. Luego forzamos una espera, para que la siguiente [unidad] de prueba no se envíe antes de que se reciban todas las devoluciones del código C ++; lo hacemos utilizando un ManualResetEvent
.
Cuando C ++ recibe el mensaje PING, envía el mensaje directamente hacia atrás. Manejamos los mensajes recibidos a través de OnMessageRecieved
y el PING nos dice que configuremos ManualResetEvent.Set()
para que podamos continuar con la prueba de la unidad; "Siguiente por favor"...
private async void OnMessageRecieved(Taurus.FeedMux mux)
{
string errorMsg = String.Empty;
if (mux.type == Taurus.FeedMux.Type.MSG)
{
// Do stuff.
}
else if (mux.type == Taurus.FeedMux.Type.PING)
{
// Do stuff.
// We are finished reciving messages for this "unit test"
testCompleted.Set();
}
}
Mi problema es que, broker.Dispose()
en el broker.Dispose()
arriba nunca es alcanzado.Aprecio que finalmente los bloques que se ejecutan en subprocesos en segundo plano no están garantizados para ejecutarse .
El texto tachado arriba se debió a que me metí con el código; Estaba deteniendo un hilo padre antes de que el niño hubiera completado. Sin embargo, todavía hay problemas ...
Ahora se llama a broker.Dispose()
correctamente, y se llama a broker.Dispose()
, en este método intento cancelar la cadena del sondeador y eliminar la Task
correctamente para evitar suscripciones múltiples.
Para cancelar el hilo utilizo el método CancelPolling()
private void CancelPolling()
{
source.Cancel();
pollerCancelled.WaitOne(); <- Blocks here waiting for cancellation.
pollerCancelled.Close();
}
pero en el método StartPolling()
while (true)
{
buffer = subSocket.Receive();
MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
if (this.token.IsCancellationRequested)
this.token.ThrowIfCancellationRequested();
}
ThrowIfCancellationRequested()
nunca se llama y el subproceso nunca se cancela, por lo tanto, nunca se elimina correctamente. El subSocket.Receive subSocket.Receive()
está bloqueando el subSocket.Receive()
del subSocket.Receive()
.
Ahora, no me queda claro cómo lograr lo que quiero, necesito invocar al broker.Dispose()
/ PollerCancel()
en un subproceso distinto al utilizado para sondear los mensajes y cómo forzar la cancelación. Abortar el hilo no es lo que quiero obtener a cualquier costo.
Esencialmente, quiero deshacerme del broker
antes de ejecutar la siguiente prueba, ¿cómo manejo esto correctamente, divido el sondeo y lo ejecuto en un dominio de aplicación separado?
Lo he intentado, desechándolo dentro del controlador OnMessageRecived
, pero esto se ejecuta claramente en el mismo subproceso que el poller y no es la forma de hacerlo, sin invocar subprocesos adicionales, se bloquea.
¿Cuál es la mejor manera de lograr lo que quiero y hay un patrón para este tipo de caso que puedo seguir?
Gracias por tu tiempo.
Una vista de nivel superior sobre el tema.
Su enfoque y sus esfuerzos, dedicados a crear un marco de prueba, indican que su voluntad apunta a desarrollar un enfoque riguroso y de nivel profesional, lo que me ha hecho levantar mi sombrero en un saludo de admiración a una empresa tan valiente.
Si bien las pruebas son una actividad importante para proporcionar una evidencia cuantitativa razonable, que un Sistema bajo prueba cumple con las expectativas definidas, el éxito en esto depende de qué tan cerca el entorno de prueba cumpla con las condiciones de la implementación real.
Uno puede estar de acuerdo, que las pruebas en otras bases diferentes no prueban que la implementación real se ejecutará como se espera en un entorno, que es principalmente diferente de la (s) prueba (s).
Control de elementos o simplemente un control de estado, esa es la pregunta.
Sus esfuerzos (al menos en el momento en que se publicó el OP) se concentran en la arquitectura de código, que intenta mantener las instancias en el lugar e intenta restablecer el estado interno de una instancia de Poller antes de que comience la próxima prueba de batería.
En mi opinión, las pruebas deben seguir algunos principios, si se esfuerza por realizar pruebas profesionales:
Principio de la repetibilidad de una prueba (las repeticiones de las pruebas darán los mismos resultados, evitando así una cuasi prueba que proporciona solo un resultado: "lotería")
Principio de las pruebas de no intervención (las repeticiones de las pruebas no deben ser objeto de interferencia "externa", no están controladas por el escenario de prueba)
Habiendo dicho esto, permítame traer algunas notas inspiradas por Harry Markowitz, Premio Nobel otorgado por sus notables estudios cuantitativos de optimización de cartera.
Más bien, retroceda un paso para obtener el control sobre el ciclo de vida completo de los elementos.
CACI Simulations, Inc., (una de las compañías de Harry Markowitz) desarrolló a principios de los años 90 su marco de software insignia COMET III: un motor de simulación excepcionalmente potente para grandes prototipos de diseño complejo y simulaciones de rendimiento de procesos operados en computación a gran escala. Redes / redes de telecomunicaciones.
La mayor impresión de COMET III fue su capacidad para generar escenarios de prueba, incluida una carga previa (s) de precalentamiento configurable, que ha hecho que los elementos probados entren en un estado similar al que significa "fatiga" en mecánica. experimentos de prueba de tortura o lo que significa la fragilidad de difusión de hidrógeno para los metalúrgicos de centrales nucleares.
Sí, una vez que vaya a detalles de bajo nivel sobre cómo los algoritmos, las memorias intermedias de nodos, las asignaciones de memoria, las selecciones de arquitectura de línea de tubería / carga equilibrada / procesamiento de red, los gastos generales de resiliencia a fallos, las políticas de recolección de basura y los algoritmos limitados de uso compartido de recursos Trabajo e impacto (bajo los patrones de carga de trabajo de uso real "presión") rendimiento / latencias de extremo a extremo, esta función es simplemente indispensable.
Esto significa que un simple control de estado individual relacionado con la instancia no es suficiente, ya que no proporciona medios para la repetibilidad de la prueba y el comportamiento de la prueba del aislamiento / no intervenir. En pocas palabras, incluso si encuentra una manera de "restablecer" una instancia de Poller, esto no lo llevará a realizar pruebas realistas con la posibilidad de repetición de pruebas garantizada, con un posible calentamiento previo a la prueba.
Se necesita un paso atrás y una capa superior de controles de abstracción y escenarios de prueba.
¿Cómo se aplica esto al problema OP?
- En lugar de solo el control del estado
- Crear una arquitectura de varias capas / plano (s) de control / señalización separada
Una forma ZeroMQ de apoyar este objetivo.
- Crea superestructuras como patrones no triviales.
- Utilice controles de ciclo de vida completo de instancias utilizadas en escenarios de prueba
- Mantener ZeroMQ-maxims: Cero-intercambio, Cero-bloqueo, ...
- Benefíciese del contexto múltiple ()
Así es como finalmente resolví esto [¡aunque estoy abierto a una mejor solución!]
public class FeedMuxMessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
// Vars.
private NetMQContext context;
private PublisherSocket pubSocket;
private Poller poller;
private CancellationTokenSource source;
private CancellationToken token;
private ManualResetEvent pollerCancelled;
/// <summary>
/// Default ctor.
/// </summary>
public FeedMuxMessageBroker()
{
context = NetMQContext.Create();
pubSocket = context.CreatePublisherSocket();
pubSocket.Connect(PublisherAddress);
pollerCancelled = new ManualResetEvent(false);
source = new CancellationTokenSource();
token = source.Token;
StartPolling();
}
#region Methods.
/// <summary>
/// Send the mux message to listners.
/// </summary>
/// <param name="message">The message to dispatch.</param>
public void Dispatch(Taurus.FeedMux message)
{
pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
}
/// <summary>
/// Start polling for messages.
/// </summary>
private void StartPolling()
{
Task.Run(() =>
{
using (var subSocket = context.CreateSubscriberSocket())
{
byte[] buffer = null;
subSocket.Options.ReceiveHighWatermark = 1000;
subSocket.Connect(SubscriberAddress);
subSocket.Subscribe(String.Empty);
subSocket.ReceiveReady += (s, a) =>
{
buffer = subSocket.Receive();
if (MessageRecieved != null)
MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
};
// Poll.
poller = new Poller();
poller.AddSocket(subSocket);
poller.PollTillCancelled();
token.ThrowIfCancellationRequested();
}
}, token).ContinueWith(ant =>
{
pollerCancelled.Set();
}, TaskContinuationOptions.OnlyOnCanceled);
}
/// <summary>
/// Cancel polling to allow the broker to be disposed.
/// </summary>
private void CancelPolling()
{
source.Cancel();
poller.Cancel();
pollerCancelled.WaitOne();
pollerCancelled.Close();
}
#endregion // Methods.
#region Properties.
/// <summary>
/// Event that is raised when a message is recived.
/// </summary>
public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
/// <summary>
/// The address to use for the publisher socket.
/// </summary>
public string PublisherAddress { get { return "tcp://127.0.0.1:6500"; } }
/// <summary>
/// The address to use for the subscriber socket.
/// </summary>
public string SubscriberAddress { get { return "tcp://127.0.0.1:6501"; } }
#endregion // Properties.
#region IDisposable Members.
private bool disposed = false;
/// <summary>
/// Dispose managed resources.
/// </summary>
/// <param name="disposing">Is desposing.</param>
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
CancelPolling();
if (pubSocket != null)
{
pubSocket.Disconnect(PublisherAddress);
pubSocket.Dispose();
pubSocket = null;
}
if (poller != null)
{
poller.Dispose();
poller = null;
}
if (context != null)
{
context.Terminate();
context.Dispose();
context = null;
}
if (source != null)
{
source.Dispose();
source = null;
}
}
// Shared cleanup logic.
disposed = true;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Finalizer.
/// </summary>
~FeedMuxMessageBroker()
{
Dispose(false);
}
#endregion // IDisposable Members.
}
Entonces, hacemos la encuesta de la misma manera, pero usando la clase Poller
de NetMQ. En la continuación de la Tarea configuramos, por lo que estamos seguros de que tanto el Poller
como la Task
están cancelados. Entonces somos seguros para disponer ...