c# .net task-parallel-library async-await .net-4.6

c# - TaskContinuationOptions.RunContinuationAsynchronously y Stack Dives



.net task-parallel-library (2)

En esta publicación del blog , Stephan Toub describe una nueva característica que se incluirá en .NET 4.6 que agrega otro valor a las enumeraciones de Tareas de Creación y Tareas de RunContinuationsAsynchronously Llamadas, que se llama RunContinuationsAsynchronously .

El explica:

"Hablé sobre una ramificación de los métodos {Try} Set * de llamada en TaskCompletionSource, que cualquier continuación sincrónica fuera de TaskCompletionSource''s Task podría ejecutarse sincrónicamente como parte de la llamada. Si tuviéramos que invocar SetResult aquí mientras manteníamos el bloqueo, entonces continuamos sincronizándonos. fuera de esa Tarea se ejecutaría mientras se mantiene el bloqueo, y eso podría dar lugar a problemas muy reales. Por lo tanto, mientras mantenemos el bloqueo, tomamos el TaskCompletionSource para completar, pero no lo completamos todavía, demorando hasta que el bloqueo ha sido liberado"

Y da el siguiente ejemplo para demostrar:

private SemaphoreSlim _gate = new SemaphoreSlim(1, 1); private async Task WorkAsync() { await _gate.WaitAsync().ConfigureAwait(false); try { // work here } finally { _gate.Release(); } }

Ahora imagine que tiene muchas llamadas a WorkAsync:

await Task.WhenAll(from i in Enumerable.Range(0, 10000) select WorkAsync());

Acabamos de crear 10,000 llamadas a WorkAsync que serán serializadas apropiadamente en el semáforo. Una de las tareas ingresará a la región crítica y las otras formarán una cola en la llamada de WaitAsync, dentro de SemaphoreSlim, encolando efectivamente la tarea que se completará cuando alguien llame Liberar. Si la Liberación completó esa Tarea de forma síncrona, entonces cuando la primera tarea llame a Liberación, comenzará a ejecutar la segunda tarea de manera síncrona, y cuando llame a Liberación, comenzará a ejecutar la tercera tarea de manera síncrona, y así sucesivamente. Si la sección "// trabaja aquí" del código anterior no incluyó ninguna espera que haya dado lugar, entonces potencialmente vamos a bucear aquí y, finalmente, potencialmente a volar la pila.

Me está costando entender la parte en la que él habla acerca de cómo ejecutar la continuación de forma síncrona.

Pregunta

¿Cómo podría esto causar una caída de pila? Más aún, ¿y qué es lo que RunContinuationsAsynchronously va a hacer de forma RunContinuationsAsynchronously eficaz para resolver ese problema?


¿Cómo podría esto causar una caída de pila? Más aún, ¿y qué es lo que RunContinuations va a hacer de forma asíncrona y eficaz para resolver ese problema?

i3arnon provides una muy buena explicación de las razones detrás de la introducción de RunContinuationsAsynchronously . Mi respuesta es más bien ortogonal a la suya; de hecho, también estoy escribiendo esto para mi propia referencia (yo mismo no recordaré ninguna sutileza de esto dentro de medio año a partir de ahora :)

En primer lugar, veamos cómo la opción RunContinuationsAsynchronously es diferente de Task.Run(() => tcs.SetResult(result)) o los "me gusta". Probemos una aplicación de consola simple:

using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplications { class Program { static void Main(string[] args) { ThreadPool.SetMinThreads(100, 100); Console.WriteLine("start, " + new { System.Environment.CurrentManagedThreadId }); var tcs = new TaskCompletionSource<bool>(); // test ContinueWith-style continuations (TaskContinuationOptions.ExecuteSynchronously) ContinueWith(1, tcs.Task); ContinueWith(2, tcs.Task); ContinueWith(3, tcs.Task); // test await-style continuations ContinueAsync(4, tcs.Task); ContinueAsync(5, tcs.Task); ContinueAsync(6, tcs.Task); Task.Run(() => { Console.WriteLine("before SetResult, " + new { System.Environment.CurrentManagedThreadId }); tcs.TrySetResult(true); Thread.Sleep(10000); }); Console.ReadLine(); } // log static void Continuation(int id) { Console.WriteLine(new { continuation = id, System.Environment.CurrentManagedThreadId }); Thread.Sleep(1000); } // await-style continuation static async Task ContinueAsync(int id, Task task) { await task.ConfigureAwait(false); Continuation(id); } // ContinueWith-style continuation static Task ContinueWith(int id, Task task) { return task.ContinueWith( t => Continuation(id), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } } }

Observe cómo todas las continuaciones se ejecutan de manera síncrona en el mismo hilo donde se ha llamado a TrySetResult :

start, { CurrentManagedThreadId = 1 } before SetResult, { CurrentManagedThreadId = 3 } { continuation = 1, CurrentManagedThreadId = 3 } { continuation = 2, CurrentManagedThreadId = 3 } { continuation = 3, CurrentManagedThreadId = 3 } { continuation = 4, CurrentManagedThreadId = 3 } { continuation = 5, CurrentManagedThreadId = 3 } { continuation = 6, CurrentManagedThreadId = 3 }

Ahora, ¿qué sucede si no queremos que esto suceda y queremos que cada continuación se ejecute de forma asíncrona (es decir, en paralelo con otras continuaciones y posiblemente en otro subproceso, en ausencia de cualquier contexto de sincronización)?

Hay un truco que podría hacerlo para las continuaciones de estilo de await , instalando un contexto de sincronización temporal falso (más detalles here ):

public static class TaskExt { class SimpleSynchronizationContext : SynchronizationContext { internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext(); }; public static void TrySetResult<TResult>(this TaskCompletionSource<TResult> @this, TResult result, bool asyncAwaitContinuations) { if (!asyncAwaitContinuations) { @this.TrySetResult(result); return; } var sc = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance); try { @this.TrySetResult(result); } finally { SynchronizationContext.SetSynchronizationContext(sc); } } }

Ahora, usando tcs.TrySetResult(true, asyncAwaitContinuations: true) en nuestro código de prueba:

start, { CurrentManagedThreadId = 1 } before SetResult, { CurrentManagedThreadId = 3 } { continuation = 1, CurrentManagedThreadId = 3 } { continuation = 2, CurrentManagedThreadId = 3 } { continuation = 3, CurrentManagedThreadId = 3 } { continuation = 4, CurrentManagedThreadId = 4 } { continuation = 5, CurrentManagedThreadId = 5 } { continuation = 6, CurrentManagedThreadId = 6 }

Observe cómo las continuaciones de await ahora se ejecutan en paralelo (aunque, aún después de todas las ContinueWith sincrónicas de ContinueWith ).

Este asyncAwaitContinuations: true lógica es un truco y funciona solo para await continuaciones. El nuevo RunContinuationsAsynchronously hace funcionar de manera consistente para cualquier tipo de continuación, adjunto a TaskCompletionSource.Task .

Otro aspecto RunContinuationsAsynchronously de RunContinuationsAsynchronously forma RunContinuationsAsynchronously es que cualquier continuación de estilo de await programada para reanudarse en un contexto de sincronización específico se ejecutará en ese contexto de forma asincrónica (utilizando SynchronizationContext.Post , incluso si TCS.Task completa en el mismo contexto (a diferencia del comportamiento actual de TCS.SetResult ). ContinueWith - TCS.SetResult continuations también se ejecutará de forma asincrónica por sus programadores de tareas correspondientes (la mayoría de las veces, TaskScheduler.Default o TaskScheduler.FromCurrentSynchronizationContext ). No se TaskScheduler.TryExecuteTaskInline través de TaskScheduler.TryExecuteTaskInline . Los comentarios a su blog , y también se puede ver aquí en Task.cs de CoreCLR .

¿Por qué deberíamos preocuparnos por imponer la asincronía en todas las continuaciones?

Normalmente lo necesito cuando trato con métodos async que se ejecutan de forma cooperativa (co-rutinas).

Un ejemplo simple es un procesamiento asíncrono que se puede pausar: un proceso asíncrono detiene / reanuda la ejecución de otro. Su flujo de trabajo de ejecución se sincroniza en ciertos puntos de await , y TaskCompletionSource se utiliza para este tipo de sincronización, directa o indirectamente.

A continuación se muestra un código de ejemplo listo para jugar que utiliza una adaptación de PauseTokenSource de Stephen Toub. Aquí, un método async StartAndControlWorkAsync inicia y hace una pausa / reanuda periódicamente otro método async , DoWorkAsync . Intente cambiar asyncAwaitContinuations: true a asyncAwaitContinuations: false y vea que la lógica está completamente rota:

using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp { class Program { static void Main() { StartAndControlWorkAsync(CancellationToken.None).Wait(); } // Do some work which can be paused/resumed public static async Task DoWorkAsync(PauseToken pause, CancellationToken token) { try { var step = 0; while (true) { token.ThrowIfCancellationRequested(); Console.WriteLine("Working, step: " + step++); await Task.Delay(1000).ConfigureAwait(false); Console.WriteLine("Before await pause.WaitForResumeAsync()"); await pause.WaitForResumeAsync(); Console.WriteLine("After await pause.WaitForResumeAsync()"); } } catch (Exception e) { Console.WriteLine("Exception: {0}", e); throw; } } // Start DoWorkAsync and pause/resume it static async Task StartAndControlWorkAsync(CancellationToken token) { var pts = new PauseTokenSource(); var task = DoWorkAsync(pts.Token, token); while (true) { token.ThrowIfCancellationRequested(); Console.WriteLine("Press enter to pause..."); Console.ReadLine(); Console.WriteLine("Before pause requested"); await pts.PauseAsync(); Console.WriteLine("After pause requested, paused: " + pts.IsPaused); Console.WriteLine("Press enter to resume..."); Console.ReadLine(); Console.WriteLine("Before resume"); pts.Resume(); Console.WriteLine("After resume"); } } // Based on Stephen Toub''s PauseTokenSource // http://blogs.msdn.com/b/pfxteam/archive/2013/01/13/cooperatively-pausing-async-methods.aspx // the main difference is to make sure that when the consumer-side code - which requested the pause - continues, // the producer-side code has already reached the paused (awaiting) state. // E.g. a media player "Pause" button is clicked, gets disabled, playback stops, // and only then "Resume" button gets enabled public class PauseTokenSource { internal static readonly Task s_completedTask = Task.Delay(0); readonly object _lock = new Object(); bool _paused = false; TaskCompletionSource<bool> _pauseResponseTcs; TaskCompletionSource<bool> _resumeRequestTcs; public PauseToken Token { get { return new PauseToken(this); } } public bool IsPaused { get { lock (_lock) return _paused; } } // request a resume public void Resume() { TaskCompletionSource<bool> resumeRequestTcs = null; lock (_lock) { resumeRequestTcs = _resumeRequestTcs; _resumeRequestTcs = null; if (!_paused) return; _paused = false; } if (resumeRequestTcs != null) resumeRequestTcs.TrySetResult(true, asyncAwaitContinuations: true); } // request a pause (completes when paused state confirmed) public Task PauseAsync() { Task responseTask = null; lock (_lock) { if (_paused) return _pauseResponseTcs.Task; _paused = true; _pauseResponseTcs = new TaskCompletionSource<bool>(); responseTask = _pauseResponseTcs.Task; _resumeRequestTcs = null; } return responseTask; } // wait for resume request internal Task WaitForResumeAsync() { Task resumeTask = s_completedTask; TaskCompletionSource<bool> pauseResponseTcs = null; lock (_lock) { if (!_paused) return s_completedTask; _resumeRequestTcs = new TaskCompletionSource<bool>(); resumeTask = _resumeRequestTcs.Task; pauseResponseTcs = _pauseResponseTcs; _pauseResponseTcs = null; } if (pauseResponseTcs != null) pauseResponseTcs.TrySetResult(true, asyncAwaitContinuations: true); return resumeTask; } } // consumer side public struct PauseToken { readonly PauseTokenSource _source; public PauseToken(PauseTokenSource source) { _source = source; } public bool IsPaused { get { return _source != null && _source.IsPaused; } } public Task WaitForResumeAsync() { return IsPaused ? _source.WaitForResumeAsync() : PauseTokenSource.s_completedTask; } } } public static class TaskExt { class SimpleSynchronizationContext : SynchronizationContext { internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext(); }; public static void TrySetResult<TResult>(this TaskCompletionSource<TResult> @this, TResult result, bool asyncAwaitContinuations) { if (!asyncAwaitContinuations) { @this.TrySetResult(result); return; } var sc = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance); try { @this.TrySetResult(result); } finally { SynchronizationContext.SetSynchronizationContext(sc); } } } }

No quería usar Task.Run(() => tcs.SetResult(result)) porque sería redundante enviar continuaciones a ThreadPool cuando ya están programadas para ejecutarse de forma asíncrona en un subproceso de interfaz de usuario con una adecuada contexto de sincronización. Al mismo tiempo, si StartAndControlWorkAsync y DoWorkAsync ejecutan en el mismo contexto de sincronización de UI, también tendríamos una inmersión de pila (si tcs.SetResult(result) se usa sin Task.Run o SynchronizationContext.Post envoltorio).

Ahora, RunContinuationsAsynchronously es probablemente la mejor solución para este problema.


El concepto clave aquí es que la continuación de una tarea puede ejecutarse sincrónicamente en el mismo hilo que completó la tarea antecedente.

Imaginemos que esta es la implementación de SemaphoreSlim.Release (en realidad es AsyncSemphore de Toub):

public void Release() { TaskCompletionSource<bool> toRelease = null; lock (m_waiters) { if (m_waiters.Count > 0) toRelease = m_waiters.Dequeue(); else ++m_currentCount; } if (toRelease != null) toRelease.SetResult(true); }

Podemos ver que completa de forma sincrónica una tarea (utilizando TaskCompletionSource ). En este caso, si WorkAsync no tiene otros puntos asíncronos (es decir, no hay ninguna await en absoluto, o todas las await están en una tarea ya completada) y llamar a _gate.Release() puede completar una llamada pendiente a _gate.WaitAsync() sincronizada En el mismo hilo, puede alcanzar un estado en el que un solo hilo libera secuencialmente el semáforo, completa la siguiente llamada pendiente, ejecuta // work here y luego libera el semáforo nuevamente, etc. etc.

Esto significa que el mismo hilo va más y más profundo en la pila, por lo tanto, bucear en la pila.

RunContinuationsAsynchronously se asegura de que la continuación no se ejecute de forma síncrona, por lo que el hilo que libera el semáforo se mueve y la continuación se programa para otro hilo (el cual depende de los otros parámetros de continuación, por ejemplo, TaskScheduler )

Esto se asemeja lógicamente a la publicación de la finalización en ThreadPool :

public void Release() { TaskCompletionSource<bool> toRelease = null; lock (m_waiters) { if (m_waiters.Count > 0) toRelease = m_waiters.Dequeue(); else ++m_currentCount; } if (toRelease != null) Task.Run(() => toRelease.SetResult(true)); }