method - task parallel library c#
StaTaskScheduler y STA mensaje de bombeo de mensajes (2)
El tema del bombeo de hilo STA es grande, con muy pocos programadores que tengan un tiempo agradable para resolver el punto muerto. El documento seminal al respecto fue escrito por Chris Brumme, un chico inteligente principal que trabajó en .NET. Lo encontrarás en this . Lamentablemente, es bastante breve en detalles, no va más allá de señalar que el CLR hace un poco de bombeo, pero sin ningún detalle sobre las reglas exactas.
El código del que está hablando, agregado en .NET 2.0, está presente en una función CLR interna llamada MsgWaitHelper (). El código fuente de .NET 2.0 está disponible a través de la distribución SSCLI20. Muy completo, pero la fuente de MsgWaitHelper () no está incluida. Bastante inusual. Descompilar es más bien una causa perdida, es muy grande.
Lo único que se puede quitar de su publicación en el blog es el peligro de volver a ingresar . El bombeo en un subproceso de STA es peligroso por su capacidad de enviar mensajes de Windows y obtener código arbitrario para ejecutar cuando su programa no está en el estado correcto para permitir la ejecución de dicho código. Algo que la mayoría de los programadores de VB6 saben cuando usaba DoEvents () para obtener un bucle modal en su código para detener la congelación de la IU. Escribí una publicación sobre sus peligros más típicos. MsgWaitHelper () hace este tipo exacto de bombeo, sin embargo, es muy selectivo sobre exactamente qué tipo de código permite ejecutar.
Puede obtener una idea de lo que hace dentro de su programa de prueba ejecutando el programa sin un depurador adjunto y luego adjuntando un depurador no administrado. Lo verá bloqueando en NtWaitForMultipleObjects (). Lo llevé un paso más allá y establecí un punto de interrupción en PeekMessageW (), para obtener este rastro de pila:
user32.dll!PeekMessageW() Unknown
combase.dll!CCliModalLoop::MyPeekMessage(tagMSG * pMsg, HWND__ * hwnd, unsigned int min, unsigned int max, unsigned short wFlag) Line 2305 C++
combase.dll!CCliModalLoop::PeekRPCAndDDEMessage() Line 2008 C++
combase.dll!CCliModalLoop::FindMessage(unsigned long dwStatus) Line 2087 C++
combase.dll!CCliModalLoop::HandleWakeForMsg() Line 1707 C++
combase.dll!CCliModalLoop::BlockFn(void * * ahEvent, unsigned long cEvents, unsigned long * lpdwSignaled) Line 1645 C++
combase.dll!ClassicSTAThreadWaitForHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * pdwIndex) Line 46 C++
combase.dll!CoWaitForMultipleHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * lpdwindex) Line 120 C++
clr.dll!MsgWaitHelper(int,void * *,int,unsigned long,int) Unknown
clr.dll!Thread::DoAppropriateWaitWorker(int,void * *,int,unsigned long,enum WaitMode) Unknown
clr.dll!Thread::DoAppropriateWait(int,void * *,int,unsigned long,enum WaitMode,struct PendingSync *) Unknown
clr.dll!CLREventBase::WaitEx(unsigned long,enum WaitMode,struct PendingSync *) Unknown
clr.dll!CLREventBase::Wait(unsigned long,int,struct PendingSync *) Unknown
clr.dll!Thread::Block(int,struct PendingSync *) Unknown
clr.dll!SyncBlock::Wait(int,int) Unknown
clr.dll!ObjectNative::WaitTimeout(bool,int,class Object *) Unknown
Tenga en cuenta que grabé este seguimiento de pila en Windows 8.1, se verá bastante diferente en las versiones anteriores de Windows. El bucle modal COM ha sido muy modificado en Windows 8, también es un gran problema para los programas WinRT. No sé mucho al respecto, pero parece tener otro modelo de enhebrado STA llamado ASTA que realiza un tipo de bombeo más restrictivo, consagrado en CoWaitForMultipleObjects () agregado
ObjectNative :: WaitTimeout () es donde el SemaphoreSlim.Wait () dentro del método BlockingCollection.Take () comienza a ejecutar el código CLR. Lo ves avanzando a través de los niveles del código CLR interno para llegar a la mítica función MsgWaitHelper (), y luego cambia al infame loop de despachador modal COM.
El signo de la señal de bat que realiza el tipo de bombeo "incorrecto" en su programa es la llamada al método CliModalLoop :: PeekRPCAndDDEMessage (). En otras palabras, solo considera el tipo de mensajes de interoperabilidad que se publican en una ventana interna específica que distribuye las llamadas COM que cruzan un límite de apartamento. No inyectará los mensajes que están en la cola de mensajes para su propia ventana.
Este es un comportamiento comprensible, Windows solo puede estar absolutamente seguro de que la reentrada no matará a su programa cuando vea que su subproceso de interfaz de usuario está inactivo . Está inactivo cuando bombea el bucle de mensajes, una llamada a PeekMessage () o GetMessage () indica ese estado. El problema es que no te bombeas. Usted violó el contrato principal de un hilo STA, debe bombear el bucle de mensajes. Esperar que el bucle modal COM haga el bombeo para usted es esperanza ociosa.
Realmente puedes arreglar esto, aunque no te recomiendo que lo hagas. El CLR dejará que la propia aplicación realice la espera mediante un objeto SynchronizationContext.Current correctamente construido. Puede crear uno derivando su propia clase y anulando el método Wait (). Llame al método SetWaitNotificationRequired () para convencer al CLR de que debe dejarlo en sus manos. Una versión incompleta que demuestra el enfoque:
class MySynchronizationProvider : System.Threading.SynchronizationContext {
public MySynchronizationProvider() {
base.SetWaitNotificationRequired();
}
public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout) {
for (; ; ) {
int result = MsgWaitForMultipleObjects(waitHandles.Length, waitHandles, waitAll, millisecondsTimeout, 8);
if (result == waitHandles.Length) System.Windows.Forms.Application.DoEvents();
else return result;
}
}
[DllImport("user32.dll")]
private static extern int MsgWaitForMultipleObjects(int cnt, IntPtr[] waitHandles, bool waitAll,
int millisecondTimeout, int mask);
}
E instálalo al comienzo de tu hilo:
System.ComponentModel.AsyncOperationManager.SynchronizationContext =
new MySynchronizationProvider();
Ahora verá su mensaje WM_TEST siendo despachado. Es la llamada a Application.DoEvents () que lo envió. Pude haberlo tapado utilizando PeekMessage + DispatchMessage, pero eso ofuscaría el peligro de este código, es mejor no pegar DoEvents () debajo de la tabla. Realmente estás jugando un juego de reentrada muy peligroso aquí. No use este código
Para resumir, la única esperanza de usar StaThreadScheduler correctamente es cuando se usa en código que ya implementó el contrato STA y las bombas como lo haría un hilo STA. En realidad, se suponía que era una curita de código antiguo en el que no era necesario el lujo para controlar el estado del hilo. Como cualquier código que comenzó su vida en un programa VB6 o complemento de Office. Experimentando un poco con eso, no creo que realmente pueda funcionar. También es notable que la necesidad de hacerlo debería eliminarse por completo con la disponibilidad de asych / await.
TL; DR: un interbloqueo dentro de una tarea ejecutada por StaTaskScheduler
. Versión larga:
Estoy usando StaTaskScheduler
de ParallelExtensionsExtras por Parallel Team, para alojar algunos objetos heredados STA COM suministrados por un tercero. La descripción de los detalles de implementación de StaTaskScheduler
dice lo siguiente:
La buena noticia es que la implementación de TPL puede ejecutarse en subprocesos MTA o STA, y tiene en cuenta las diferencias relevantes alrededor de las API subyacentes, como WaitHandle.WaitAll (que solo admite subprocesos MTA cuando se proporciona el método múltiples identificadores de espera).
Pensé que eso significaría que las partes de bloqueo de TPL usarían una API de espera que bombea mensajes, como CoWaitForMultipleHandles
, para evitar situaciones de interbloqueo cuando se CoWaitForMultipleHandles
un hilo STA.
En mi situación, creo que está sucediendo lo siguiente: in-proc STA COM objeto A realiza una llamada al objeto B fuera de pro, luego espera una devolución de llamada desde B como parte de la llamada saliente.
En una forma simplificada:
var result = await Task.Factory.StartNew(() =>
{
// in-proc object A
var a = new A();
// out-of-proc object B
var b = new B();
// A calls B and B calls back A during the Method call
return a.Method(b);
}, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);
El problema es que a.Method(b)
nunca regresa. Por lo que puedo decir, esto sucede porque una espera de bloqueo en algún lugar dentro de BlockingCollection<Task>
no bombea mensajes, por lo que mi suposición sobre la declaración citada es probablemente incorrecta.
EDITADO El mismo código funciona cuando se ejecuta en el subproceso de interfaz de usuario de la aplicación WinForms de prueba (es decir, proporciona TaskScheduler.FromCurrentSynchronizationContext()
lugar de staTaskScheduler
a Task.Factory.StartNew
).
¿Cuál es la forma correcta de resolver esto? ¿Debería implementar un contexto de sincronización personalizado, que explícitamente CoWaitForMultipleHandles
mensajes con CoWaitForMultipleHandles
, e instalarlo en cada subproceso STA iniciado por StaTaskScheduler
?
Si es así, ¿la implementación subyacente de BlockingCollection
llamará a mi método SynchronizationContext.Wait
? ¿Puedo usar SynchronizationContext.WaitHelper
para implementar SynchronizationContext.Wait
?
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleTestApp
{
class Program
{
// start and run an STA thread
static void RunStaThread(bool pump)
{
// test a blocking wait with BlockingCollection.Take
var tasks = new BlockingCollection<Task>();
var thread = new Thread(() =>
{
// Create a simple Win32 window
var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP,
0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero);
// subclass it with a custom WndProc
IntPtr prevWndProc = IntPtr.Zero;
var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) =>
{
if (msg == NativeMethods.WM_TEST)
Console.WriteLine("WM_TEST processed");
return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam);
});
prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc);
if (prevWndProc == IntPtr.Zero)
throw new ApplicationException();
// post a test WM_TEST message to it
NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero);
// BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives
try { var task = tasks.Take(); }
catch (Exception e) { Console.WriteLine(e.Message); }
if (pump)
{
// NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps
Console.WriteLine("Now start pumping...");
NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0);
}
});
thread.SetApartmentState(ApartmentState.STA);
thread.Start();
Thread.Sleep(2000);
// this causes the STA thread to end
tasks.CompleteAdding();
thread.Join();
}
static void Main(string[] args)
{
Console.WriteLine("Testing without pumping...");
RunStaThread(false);
Console.WriteLine("/nTest with pumping...");
RunStaThread(true);
Console.WriteLine("Press Enter to exit");
Console.ReadLine();
}
}
// Interop
static class NativeMethods
{
[DllImport("user32")]
public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc);
[DllImport("user32")]
public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, int wParam, int lParam);
[DllImport("user32.dll")]
public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam);
[DllImport("user32.dll")]
public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam);
[DllImport("user32.dll")]
public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options);
public delegate IntPtr WndProc(IntPtr hwnd, int msg, int wParam, int lParam);
public const int GWL_WNDPROC = -4;
public const int WS_POPUP = unchecked((int)0x80000000);
public const int WM_USER = 0x0400;
public const int WM_TEST = WM_USER + 1;
}
}
Esto produce la salida:
Testing without pumping... The collection argument is empty and has been marked as complete with regards to additions. Test with pumping... The collection argument is empty and has been marked as complete with regards to additions. Now start pumping... WM_TEST processed Press Enter to exit
Mi comprensión de su problema: está utilizando StaTaskScheduler
solo para organizar el apartamento COM STA clásico para sus objetos COM heredados. No está ejecutando un bucle de mensaje central WinForms o WPF en el hilo STA de StaTaskScheduler
. Es decir, no está utilizando nada como Application.Run
, Application.DoEvents
o Dispatcher.PushFrame
dentro de ese hilo. Corrígeme si esta es una suposición incorrecta.
Por sí solo, StaTaskScheduler
no instala ningún contexto de sincronización en los hilos STA que crea. Por lo tanto, confía en el CLR para transmitir mensajes por usted. Solo he encontrado una confirmación implícita de que el CLR funciona con hilos STA, en this por Chris Brumme:
Sigo diciendo que el bloqueo administrado realizará "algún bombeo" cuando se llame a un hilo STA. ¿No sería genial saber exactamente qué se bombeará? Desafortunadamente, el bombeo es un arte negro que está más allá de la comprensión mortal. En Win2000 y versiones posteriores, simplemente delegamos en el servicio CoWaitForMultipleHandles de OLE32 .
Esto indica que CLR usa CoWaitForMultipleHandles
internamente para hilos de STA. Además, los documentos de MSDN para el indicador COWAIT_DISPATCH_WINDOW_MESSAGES
mencionan esto :
... en STA solo se envía un pequeño conjunto de mensajes con recuadro especial.
Hice algunas investigaciones al respecto , pero no pude obtener el WM_TEST
de su código de muestra con CoWaitForMultipleHandles
, lo discutimos en los comentarios a su pregunta. WM_TEST
entendido, el pequeño conjunto de mensajes con recuadro especial antes mencionado está realmente limitado a algunos mensajes COM Marshaller específicos, y no incluye ningún mensaje regular de propósito general como su WM_TEST
.
Por lo tanto, para responder a su pregunta:
... ¿Debería implementar un contexto de sincronización personalizado, que explícitamente bombearía mensajes con CoWaitForMultipleHandles, e instalarlo en cada subproceso STA iniciado por StaTaskScheduler?
Sí, creo que la creación de un contexto de sincronización personalizado y la superación de SynchronizationContext.Wait
es la solución correcta.
Sin embargo, debe evitar el uso de CoWaitForMultipleHandles
, y usar MsgWaitForMultipleObjectsEx
lugar . Si MsgWaitForMultipleObjectsEx
indica que hay un mensaje pendiente en la cola, debe bombearlo manualmente con PeekMessage(PM_REMOVE)
y DispatchMessage
. Luego debe continuar esperando los identificadores, todo dentro de la misma llamada SynchronizationContext.Wait
.
Tenga en cuenta que hay una diferencia sutil pero importante entre MsgWaitForMultipleObjectsEx
y MsgWaitForMultipleObjects
. Este último no regresa y sigue bloqueando, si ya hay un mensaje visto en la cola (por ejemplo, con PeekMessage(PM_NOREMOVE)
o GetQueueStatus
), pero no eliminado. Eso no es bueno para el bombeo, porque sus objetos COM podrían estar usando algo como PeekMessage
para inspeccionar la cola de mensajes. Eso podría causar más tarde que MsgWaitForMultipleObjects
bloquee cuando no se espera.
OTOH, MsgWaitForMultipleObjectsEx
con indicador MWMO_INPUTAVAILABLE
no tiene ese inconveniente, y volvería en este caso.
Hace un tiempo, creé una versión personalizada de StaTaskScheduler
( disponible aquí como ThreadAffinityTaskScheduler
) para tratar de resolver un problema diferente : mantener un conjunto de hilos con afinidad de hilos para posteriores ThreadAffinityTaskScheduler
en await
. La afinidad del hilo es vital si usa objetos STA COM en múltiples awaits
. El StaTaskScheduler
original muestra este comportamiento solo cuando su grupo está limitado a 1 subproceso.
Así que seguí adelante y experimenté un poco más con su caso WM_TEST
. Originalmente, instalé una instancia de la clase estándar SynchronizationContext
en el hilo STA. El mensaje WM_TEST
no se bombeó, lo que se esperaba.
Luego anulo SynchronizationContext.Wait
para reenviarlo a SynchronizationContext.WaitHelper
. Lo llamaron, pero aun así no bombeó.
Finalmente, implementé un bucle de bomba de mensajes con todas las funciones, aquí está la parte central de esto:
// the core loop
var msg = new NativeMethods.MSG();
while (true)
{
// MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
// even if there''s a message already seen but not removed in the message queue
nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
count, waitHandles,
(uint)remainingTimeout,
QS_MASK,
NativeMethods.MWMO_INPUTAVAILABLE);
if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
return managedResult;
// there is a message, pump and dispatch it
if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
{
NativeMethods.TranslateMessage(ref msg);
NativeMethods.DispatchMessage(ref msg);
}
if (hasTimedOut())
return WaitHandle.WaitTimeout;
}
Esto funciona, WM_TEST
se bombea. A continuación hay una versión adaptada de su prueba:
public static async Task RunAsync()
{
using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true))
{
Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId);
await staThread.Run(async () =>
{
Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId);
// create a simple Win32 window
IntPtr hwnd = CreateTestWindow();
// Post some WM_TEST messages
Console.WriteLine("Post some WM_TEST messages...");
NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero);
NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero);
NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero);
Console.WriteLine("Press Enter to continue...");
await ReadLineAsync();
Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId);
Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0));
Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId);
}, CancellationToken.None);
}
Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId);
}
El resultado :
Initial thread #9 On STA thread #10 Post some WM_TEST messages... Press Enter to continue... WM_TEST processed: 1 WM_TEST processed: 2 WM_TEST processed: 3 After await, thread #10 Pending messages in the queue: False Exiting STA thread #10 Current thread #12 Press any key to exit
Tenga en cuenta que esta implementación admite tanto la afinidad de subprocesos (permanece en el subproceso n. ° 10 después de await
) como el bombeo del mensaje. El código fuente completo contiene partes ThreadAffinityTaskScheduler
( ThreadAffinityTaskScheduler
y ThreadWithAffinityContext
) y está disponible aquí como una aplicación de consola autónoma . No ha sido probado exhaustivamente, por lo tanto, úselo bajo su propio riesgo.