thread squad que method lock for example book black c# multithreading synchronization

c# - que - thread sync black squad



¿Cómo gestionaría una llamada a un hilo de fondo y lo ejecutaría dentro de su contexto en C#? (3)

Olvídate de dispatcher.invoke, olvida el hilo de la interfaz de usuario. Imagine que tengo 2 hilos de trabajo y quiero enviar mi evento a ambos hilos de trabajo; ¿Qué puedo usar?

Utilizaría dos planificadores de tareas para esto (como sugiere la respuesta de @ YuvalItzchakov ), uno para cada hilo. También usaría un contexto de sincronización personalizado para el hilo de trabajo, como sugiere la respuesta de @ TheMouthofaCow .

Es decir, para un hilo de interfaz de usuario, solo TaskScheduler.FromCurrentSynchronizationContext() y usaría TaskScheduler.FromCurrentSynchronizationContext() . Para el hilo de trabajo, iniciaría un hilo e instalaría un contexto de sincronización personalizado en él, luego usar FromCurrentSynchronizationContext también.

Algo como esto (no probado):

// UI thread var uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(); using (var worker = new ThreadWithPumpingSyncContext()) { // call the worker thread var result = await worker.Run(async () => { // worker thread await Task.Delay(1000); // call the UI thread await Task.Factory.StartNew(async () => { // UI thread await Task.Delay(2000); MessageBox.Show("UI Thread!"), // call the worker thread await worker.Run(() => { // worker thread Thread.Sleep(3000) }); // UI thread await Task.Delay(4000); }, uiTaskScheduler).Unwrap(); // worker thread await Task.Delay(5000); return Type.Missing; // or implement a non-generic version of Run }); } // ... // ThreadWithSerialSyncContext renamed to ThreadWithPumpingSyncContext class ThreadWithPumpingSyncContext : SynchronizationContext, IDisposable { public readonly TaskScheduler Scheduler; // can be used to run tasks on the pumping thread readonly Task _mainThreadTask; // wrap the pumping thread as Task readonly BlockingCollection<Action> _actions = new BlockingCollection<Action>(); // track async void methods readonly object _lock = new Object(); volatile int _pendingOps = 0; // the number of pending async void method calls volatile TaskCompletionSource<Empty> _pendingOpsTcs = null; // to wait for pending async void method calls public ThreadWithPumpingSyncContext() { var tcs = new TaskCompletionSource<TaskScheduler>(); _mainThreadTask = Task.Factory.StartNew(() => { try { SynchronizationContext.SetSynchronizationContext(this); tcs.SetResult(TaskScheduler.FromCurrentSynchronizationContext()); // pumping loop foreach (var action in _actions.GetConsumingEnumerable()) action(); } finally { SynchronizationContext.SetSynchronizationContext(null); } }, TaskCreationOptions.LongRunning); Scheduler = tcs.Task.Result; } // SynchronizationContext methods public override SynchronizationContext CreateCopy() { return this; } public override void OperationStarted() { lock (_lock) { if (_pendingOpsTcs != null && _pendingOpsTcs.Task.IsCompleted) throw new InvalidOperationException("OperationStarted"); // shutdown requested _pendingOps++; } } public override void OperationCompleted() { lock (_lock) { _pendingOps--; if (0 == _pendingOps && null != _pendingOpsTcs) _pendingOpsTcs.SetResult(Empty.Value); } } public override void Post(SendOrPostCallback d, object state) { _actions.Add(() => d(state)); } public override void Send(SendOrPostCallback d, object state) { throw new NotImplementedException("Send"); } // Task start helpers public Task Run(Action action, CancellationToken token = default(CancellationToken)) { return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler); } public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken)) { return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap(); } public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token = default(CancellationToken)) { return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap(); } // IDispose public void Dispose() { var disposingAlready = false; lock (_lock) { disposingAlready = null != _pendingOpsTcs; if (!disposingAlready) { // do not allow new async void method calls _pendingOpsTcs = new TaskCompletionSource<Empty>(); if (0 == _pendingOps) _pendingOpsTcs.TrySetResult(Empty.Value); } } // outside the lock if (!disposingAlready) { // wait for pending async void method calls _pendingOpsTcs.Task.Wait(); // request the end of the pumping loop _actions.CompleteAdding(); } _mainThreadTask.Wait(); } struct Empty { public static readonly Empty Value = default(Empty); } }

Esto le da algún tipo de ejecución asincrónica cooperativa entre dos hilos.

Digamos que tengo un subproceso de IU y un subproceso en segundo plano que se suscriben a una ObservableCollection segura para subprocesos personalizada que creé para que cada vez que la colección cambie ejecute la devolución de llamada dentro del contexto apropiado.

Ahora digamos que agrego algo a la colección (de cualquier hilo, no importa cuál) y ahora tiene que ordenar la devolución de llamada a ambos hilos. Para ejecutar la devolución de llamada dentro del contexto de la interfaz de usuario, simplemente puedo hacer un Dispatcher.Invoke (...) y ejecuta la devolución de llamada dentro del contexto de la interfaz de usuario; estupendo.

Ahora quiero ejecutar la devolución de llamada dentro del contexto del hilo de fondo (no me pregunten por qué, puede ser que lo que sea que esté accediendo esté affinitizado a ese hilo específico o tenga un almacenamiento local de subprocesos al que necesite acceder); ¿Como podría hacerlo?

Los subprocesos de fondo no tienen un mecanismo de envío / mensaje de bombeo así que no puedo usar un despachador o un SincronizaciónContexto, entonces ¿cómo se podría interrumpir un hilo de fondo y hacer que ejecute mi devolución de llamada dentro de su contexto?

EDITAR: sigo recibiendo respuestas que obviamente están mal, así que no debí haberme explicado correctamente. Olvídese de los subprocesadores de la interfaz de usuario y los operadores de la interfaz de usuario chicos, estaban destinados a llamar a la cadena de interfaz de usuario, eso es todo! Imagine dos hilos de trabajo A y B. Si A modifica mi colección, A se encarga de coordinar la devolución de llamada consigo mismo y con B. Ejecutar la devolución de llamada dentro del contexto de A es fácil, ya que A fue quien lo activó: simplemente llame al delegado en su lugar . Ahora A necesita ordenar la devolución de llamada a B ... ¿y ahora qué? Dispatcher y SynContext son inútiles en esta situación.


Tenemos un componente que siempre debe ejecutarse en el mismo hilo de fondo STA. Lo hemos logrado escribiendo nuestro propio SynchronizationContext . Este artículo es muy útil.

En resumen, no quiere interrumpir su hilo de trabajo, quiere que permanezca inactivo esperando la siguiente tarea que debe ejecutar. Agrega trabajos a una cola y procesa esos trabajos en orden. SynchronizationContext es una abstracción conveniente alrededor de esa idea. SynchronizationContext es el propietario del subproceso de trabajo, y el mundo exterior no interactúa directamente con el subproceso: los llamadores que desean ejecutar una tarea en el subproceso de trabajo realizan la solicitud al contexto que agrega el trabajo a la cola de trabajos. El trabajador está trabajando o sondeando la cola hasta que se agregue otro trabajo, momento en el que comienza a funcionar nuevamente.

Actualizar

Aquí hay un ejemplo:

using System.Collections.Concurrent; using System.Threading; class LoadBalancedContext : SynchronizationContext { readonly Thread thread1; readonly Thread thread2; readonly ConcurrentQueue<JobInfo> jobs = new ConcurrentQueue<JobInfo>(); public LoadBalancedContext() { this.thread1 = new Thread(this.Poll) { Name = "T1" }; this.thread2 = new Thread(this.Poll) { Name = "T2" }; this.thread1.Start(); this.thread2.Start(); } public override void Post(SendOrPostCallback d, object state) { this.jobs.Enqueue(new JobInfo { Callback = d, State = state }); } void Poll() { while (true) { JobInfo info; if (this.jobs.TryDequeue(out info)) { info.Callback(info.State); } Thread.Sleep(100); } } class JobInfo { public SendOrPostCallback Callback { get; set; } public object State { get; set; } } }

Uso:

var context = new LoadBalancedContext(); SendOrPostCallback callback = x => { Trace.WriteLine(Thread.CurrentThread.Name); Thread.Sleep(200); }; context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); Thread.Sleep(1000);

El caso Send es un poco más complicado ya que tendrá que escuchar un evento de reinicio. Esto no es calidad de producción, pero debe darle una idea de lo que debe hacer.

Espero que ayude.


Una buena idea también podría ser ampliar tu propio TaskScheduler, tendrás que implementar tres métodos:

QueueTask, TryExecuteTaskInline y GetScheduledTasks

puedes leer sobre esto aquí

De esta forma, cada vez que necesite ejecutar algo en su hilo dedicado, puede hacer lo siguiente:

Task.Factory.StartNew(() => { SomeAction }, SomeCancellationToken, TaskCreationOptions new MyTaskSchedular());

y haz que se ejecute en tu hilo.