c# - thread - ¿Por qué Monitor.PulseAll da como resultado un patrón de latencia de “escalones” en los hilos señalizados?
multithreading c# (2)
Para empezar, esto no es una respuesta, simplemente mis notas de mirar el SSCLI para averiguar exactamente qué está pasando. La mayor parte de esto está muy por encima de mi cabeza, pero no obstante, es interesante.
El viaje por el agujero del conejo comienza con una llamada a Monitor.PulseAll
, que se implementa en C #:
clr/src/bcl/system/threading/monitor.cs
:
namespace System.Threading
{
public static class Monitor
{
// other methods omitted
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern void ObjPulseAll(Object obj);
public static void PulseAll(Object obj)
{
if (obj==null) {
throw new ArgumentNullException("obj");
}
ObjPulseAll(obj);
}
}
}
Los métodos InternalCall se enrutan en clr/src/vm/ecall.cpp
:
FCFuncStart(gMonitorFuncs)
FCFuncElement("Enter", JIT_MonEnter)
FCFuncElement("Exit", JIT_MonExit)
FCFuncElement("TryEnterTimeout", JIT_MonTryEnter)
FCFuncElement("ObjWait", ObjectNative::WaitTimeout)
FCFuncElement("ObjPulse", ObjectNative::Pulse)
FCFuncElement("ObjPulseAll", ObjectNative::PulseAll)
FCFuncElement("ReliableEnter", JIT_MonReliableEnter)
FCFuncEnd()
ObjectNative
vive en clr/src/vm/comobject.cpp
:
FCIMPL1(void, ObjectNative::PulseAll, Object* pThisUNSAFE)
{
CONTRACTL
{
MODE_COOPERATIVE;
DISABLED(GC_TRIGGERS); // can''t use this in an FCALL because we''re in forbid gc mode until we setup a H_M_F.
THROWS;
SO_TOLERANT;
}
CONTRACTL_END;
OBJECTREF pThis = (OBJECTREF) pThisUNSAFE;
HELPER_METHOD_FRAME_BEGIN_1(pThis);
//-[autocvtpro]-------------------------------------------------------
if (pThis == NULL)
COMPlusThrow(kNullReferenceException, L"NullReference_This");
pThis->PulseAll();
//-[autocvtepi]-------------------------------------------------------
HELPER_METHOD_FRAME_END();
}
FCIMPLEND
OBJECTREF
es un poco de magia esparcida sobre el Object
(el operador ->
está sobrecargado), por lo que OBJECTREF->PulseAll()
es en realidad Object->PulseAll()
que se implementa en clr/src/vm/object.h
y solo avanza llame a ObjHeader->PulseAll
:
class Object
{
// snip
public:
// snip
ObjHeader *GetHeader()
{
LEAF_CONTRACT;
return PTR_ObjHeader(PTR_HOST_TO_TADDR(this) - sizeof(ObjHeader));
}
// snip
void PulseAll()
{
WRAPPER_CONTRACT;
GetHeader()->PulseAll();
}
// snip
}
ObjHeader::PulseAll
recupera el SyncBlock
, que utiliza AwareLock
para AwareLock
y Exit
del bloqueo en el objeto. AwareLock
( clr/src/vm/syncblk.cpp
) usa un CLREvent
( clr/src/vm/synch.cpp
) creado como MonitorEvent
( CLREvent::CreateMonitorEvent(SIZE_T)
), que se llama UnsafeCreateEvent
( clr/src/inc/unsafe.h
) o los métodos de sincronización del entorno de alojamiento.
clr/src/vm/syncblk.cpp
:
void ObjHeader::PulseAll()
{
CONTRACTL
{
INSTANCE_CHECK;
THROWS;
GC_TRIGGERS;
MODE_ANY;
INJECT_FAULT(COMPlusThrowOM(););
}
CONTRACTL_END;
// The following code may cause GC, so we must fetch the sync block from
// the object now in case it moves.
SyncBlock *pSB = GetBaseObject()->GetSyncBlock();
// GetSyncBlock throws on failure
_ASSERTE(pSB != NULL);
// make sure we own the crst
if (!pSB->DoesCurrentThreadOwnMonitor())
COMPlusThrow(kSynchronizationLockException);
pSB->PulseAll();
}
void SyncBlock::PulseAll()
{
CONTRACTL
{
INSTANCE_CHECK;
NOTHROW;
GC_NOTRIGGER;
MODE_ANY;
}
CONTRACTL_END;
WaitEventLink *pWaitEventLink;
while ((pWaitEventLink = ThreadQueue::DequeueThread(this)) != NULL)
pWaitEventLink->m_EventWait->Set();
}
DequeueThread
utiliza un crst
( clr/src/vm/crst.cpp
), que es un contenedor para las secciones críticas. m_EventWait
es un CLREvent
manual.
Por lo tanto, todo esto está usando primitivas del sistema operativo a menos que el proveedor de alojamiento predeterminado esté anulando las cosas.
En una biblioteca que usa Monitor.PulseAll () para la sincronización de subprocesos, noté que la latencia desde el momento en que se llama a PulseAll (...) hasta el momento en que se despierta un subproceso parece seguir una distribución de "escalones", con extremadamente grandes pasos Los hilos despiertos casi no hacen trabajo; Y casi inmediatamente volver a esperar en el monitor. Por ejemplo, en una caja con 12 núcleos con 24 subprocesos esperando en un monitor (2x Xeon5680 / Gulftown; 6 núcleos físicos por procesador; HT desactivado), la latencia entre el impulso y el encendido del hilo es como tal:
Los primeros 12 subprocesos (nota que tenemos 12 núcleos) toman entre 30 y 60 microsegundos para responder. Entonces empezamos a conseguir saltos muy grandes; con las mesetas alrededor de 700, 1300, 1900 y 2600 microsegundos.
Pude recrear exitosamente este comportamiento independientemente de la biblioteca de terceros usando el código a continuación. Lo que hace este código es lanzar una gran cantidad de subprocesos (cambiar el parámetro numThreads) que simplemente esperan en un monitor, leen una marca de tiempo, lo registran en un ConcurrentSet, y luego regresan inmediatamente a la espera. Una vez que un segundo PulseAll () despierta todos los hilos. Lo hace 20 veces e informa las latencias de la décima iteración a la Consola.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;
namespace PulseAllTest
{
class Program
{
static long LastTimestamp;
static long Iteration;
static object SyncObj = new object();
static Stopwatch s = new Stopwatch();
static ConcurrentBag<Tuple<long, long>> IterationToTicks = new ConcurrentBag<Tuple<long, long>>();
static void Main(string[] args)
{
long numThreads = 32;
for (int i = 0; i < numThreads; ++i)
{
Task.Factory.StartNew(ReadLastTimestampAndPublish, TaskCreationOptions.LongRunning);
}
s.Start();
for (int i = 0; i < 20; ++i)
{
lock (SyncObj)
{
++Iteration;
LastTimestamp = s.Elapsed.Ticks;
Monitor.PulseAll(SyncObj);
}
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Console.WriteLine(String.Join("/n",
from n in IterationToTicks where n.Item1 == 10 orderby n.Item2
select ((decimal)n.Item2)/TimeSpan.TicksPerMillisecond));
Console.Read();
}
static void ReadLastTimestampAndPublish()
{
while(true)
{
lock(SyncObj)
{
Monitor.Wait(SyncObj);
}
IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
}
}
}
}
Utilizando el código anterior, aquí hay un ejemplo de latencias en una caja con 8 núcleos / w hyperthreading habilitado (es decir, 16 núcleos en el Administrador de tareas) y 32 subprocesos (* 2x Xeon5550 / Gainestown; 4 núcleos físicos por procesador; HT habilitado):
EDITAR: Para intentar sacar a NUMA de la ecuación, a continuación se muestra un gráfico que ejecuta el programa de ejemplo con 16 subprocesos en un Core i7-3770 (Ivy Bridge); 4 núcleos físicos; HT habilitado:
¿Alguien puede explicar por qué Monitor.PulseAll () se comporta de esta manera?
EDIT2:
Para probar y mostrar que este comportamiento no es inherente a despertar un montón de subprocesos al mismo tiempo, he replicado el comportamiento del programa de prueba usando Eventos; y en lugar de medir la latencia de PulseAll (), estoy midiendo la latencia de ManualResetEvent.Set (). El código está creando una serie de subprocesos de trabajo que esperan un evento ManualResetEvent.Set () en el mismo objeto ManualResetEvent. Cuando se desencadena el evento, toman una medición de latencia e inmediatamente esperan en su propio AutoResetEvent por hilo individual. Mucho antes de la siguiente iteración (500 ms antes), ManualResetEvent es Reset () y luego cada AutoResetEvent es Set () para que los hilos puedan volver a esperar en el ManualResetEvent compartido.
Dudé en publicar esto porque podría ser una audiencia roja gigante (no hago reclamos. Los Monitores y los Eventos se comportan de manera similar), además de usar prácticas absolutamente terribles para lograr que un Evento se comporte como un Monitor (me encantaría / odiaría ver cuál es mi lo harían los compañeros de trabajo si enviara esto a una revisión del código); Pero creo que los resultados son esclarecedores.
Esta prueba se realizó en la misma máquina que la prueba original; un 2xXeon5680 / Gulftown; 6 núcleos por procesador (12 núcleos en total); Hyperthreading deshabilitado.
Si no es obvio cuán radicalmente diferente es esto que Monitor.PulseAll; Aquí está el primer gráfico superpuesto sobre el último gráfico:
El código utilizado para generar estas medidas es el siguiente:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;
namespace MRETest
{
class Program
{
static long LastTimestamp;
static long Iteration;
static ManualResetEventSlim MRES = new ManualResetEventSlim(false);
static List<ReadLastTimestampAndPublish> Publishers =
new List<ReadLastTimestampAndPublish>();
static Stopwatch s = new Stopwatch();
static ConcurrentBag<Tuple<long, long>> IterationToTicks =
new ConcurrentBag<Tuple<long, long>>();
static void Main(string[] args)
{
long numThreads = 24;
s.Start();
for (int i = 0; i < numThreads; ++i)
{
AutoResetEvent ares = new AutoResetEvent(false);
ReadLastTimestampAndPublish spinner = new ReadLastTimestampAndPublish(
new AutoResetEvent(false));
Task.Factory.StartNew(spinner.Spin, TaskCreationOptions.LongRunning);
Publishers.Add(spinner);
}
for (int i = 0; i < 20; ++i)
{
++Iteration;
LastTimestamp = s.Elapsed.Ticks;
MRES.Set();
Thread.Sleep(500);
MRES.Reset();
foreach (ReadLastTimestampAndPublish publisher in Publishers)
{
publisher.ARES.Set();
}
Thread.Sleep(500);
}
Console.WriteLine(String.Join("/n",
from n in IterationToTicks where n.Item1 == 10 orderby n.Item2
select ((decimal)n.Item2) / TimeSpan.TicksPerMillisecond));
Console.Read();
}
class ReadLastTimestampAndPublish
{
public AutoResetEvent ARES { get; private set; }
public ReadLastTimestampAndPublish(AutoResetEvent ares)
{
this.ARES = ares;
}
public void Spin()
{
while (true)
{
MRES.Wait();
IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
ARES.WaitOne();
}
}
}
}
}
Una diferencia entre estas versiones es que, en el caso de PulseAll, los hilos repiten inmediatamente el bucle y bloquean el objeto nuevamente.
Tiene 12 núcleos, por lo que se están ejecutando 12 subprocesos, ejecute el bucle y vuelva a ingresar al bucle, bloquee el objeto (uno tras otro) y luego ingrese al estado de espera. Todo ese tiempo los otros hilos esperan. En el caso de ManualEvent, tiene dos eventos, por lo que los subprocesos no repiten inmediatamente el bucle, sino que se bloquean en los eventos ARES, lo que permite que otros subprocesos tomen la propiedad del bloqueo más rápido.
He simulado un comportamiento similar en PulseAll agregando la suspensión al final del bucle en ReadLastTimestampAndPublish. Esto permite que otros subprocesos bloqueen syncObj más rápido y parecen mejorar los números que estoy obteniendo del programa.
static void ReadLastTimestampAndPublish()
{
while(true)
{
lock(SyncObj)
{
Monitor.Wait(SyncObj);
}
IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
Thread.Sleep(TimeSpan.FromMilliseconds(100)); // <===
}
}