parallel - plinq c#
Cómo se cancela correctamente después de una tarea larga se cancela (1)
La programación cuidadosa es lo único que lo va a cortar. Incluso si cancela la operación, es posible que tenga una operación pendiente que no se completa en un período de moda. Podría ser una operación de bloqueo en punto muerto. En este caso, su programa no terminará realmente.
Por ejemplo, si llamo a su método CleanUp varias veces o si no llamo a Start primero, tengo la sensación de que se va a bloquear.
Un tiempo de espera de 2 segundos durante la limpieza, se siente más arbitrario de lo planeado, e incluso llegaría a cerciorarme de que las cosas se apaguen correctamente o se cuelguen / cuelguen (nunca querrá dejar cosas concurrentes en un estado desconocido).
Además, IsRunning
está explícitamente establecido, no inferido del estado del objeto.
En busca de inspiración, me gustaría que miraras una clase similar que escribí recientemente, es un patrón productor / consumidor que hace su trabajo en un hilo de fondo. Puede encontrar ese código fuente en CodePlex . Sin embargo, esto fue diseñado para resolver un problema muy específico.
Aquí, la cancelación se resuelve ingresando un tipo específico que solo el hilo de trabajo reconoce y, por lo tanto, comienza a cerrarse. Esto también garantiza que nunca cancele el trabajo pendiente, solo se consideran unidades de trabajo completas.
Para mejorar esta situación un poco, puede tener un temporizador separado para el trabajo actual y cancelar o cancelar el trabajo incompleto si se cancela. Ahora, implementar una transacción como el comportamiento va a llevar un poco de prueba y error porque necesita ver cada posible caso de esquina y preguntarse, ¿qué sucede si el programa falla aquí? Idealmente, todas estas rutas de código conducen a un estado recuperable o conocido desde el que puede reanudar su trabajo. Pero como creo que ya has adivinado, eso requerirá una programación cuidadosa y muchas pruebas.
Creé una clase cuyo objetivo es abstraer el control del acceso concurrente a una cola.
La clase está diseñada para ser instanciada en un único hilo, escrita por varios hilos y luego leída de un solo hilo posterior.
Tengo una única tarea de ejecución larga generada dentro de la clase que realizará un ciclo de bloqueo y activará un evento si un elemento se elimina con éxito.
Mi pregunta es esta: ¿mi implementación de la cancelación de la tarea de larga ejecución Y el posterior uso correcto de limpieza / restablecimiento del objeto CancellationTokenSource
?
Idealmente, me gustaría que un objeto activo pueda detenerse y reiniciarse mientras se mantiene la disponibilidad para agregar a la cola.
Utilicé el artículo de Peter Bromberg como base: Producer / Consumer Queue y BlockingCollection in C # 4.0
Código a continuación:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Test
{
public delegate void DeliverNextQueuedItemHandler<T>(T item);
public sealed class SOQueueManagerT<T>
{
ConcurrentQueue<T> _multiQueue;
BlockingCollection<T> _queue;
CancellationTokenSource _canceller;
Task _listener = null;
public event DeliverNextQueuedItemHandler<T> OnNextItem;
public bool IsRunning { get; private set; }
public int QueueSize
{
get
{
if (_queue != null)
return _queue.Count;
return -1;
}
}
public CancellationTokenSource CancellationTokenSource
{
get
{
if (_canceller == null)
_canceller = new CancellationTokenSource();
return _canceller;
}
}
public SOQueueManagerT()
{
_multiQueue = new ConcurrentQueue<T>();
_queue = new BlockingCollection<T>(_multiQueue);
IsRunning = false;
}
public void Start()
{
if (_listener == null)
{
IsRunning = true;
_listener = Task.Factory.StartNew(() =>
{
while (!CancellationTokenSource.Token.IsCancellationRequested)
{
T item;
if (_queue.TryTake(out item, 100))
{
if (OnNextItem != null)
{
OnNextItem(item);
}
}
}
},
CancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
public void Stop()
{
if (_listener != null)
{
CancellationTokenSource.Cancel();
CleanUp();
}
}
public void Add(T item)
{
_queue.Add(item);
}
private void CleanUp()
{
_listener.Wait(2000);
if (_listener.IsCompleted)
{
IsRunning = false;
_listener = null;
_canceller = null;
}
}
}
}
ACTUALIZAR Aquí es lo que he ido al final. No es perfecto, pero hasta ahora está haciendo el trabajo.
public sealed class TaskQueueManager<T>
{
ConcurrentQueue<T> _multiQueue;
BlockingCollection<T> _queue;
CancellationTokenSource _canceller;
Task _listener = null;
public event DeliverNextQueuedItemHandler<T> OnNextItem;
public bool IsRunning
{
get
{
if (_listener == null)
return false;
else if (_listener.Status == TaskStatus.Running ||
_listener.Status == TaskStatus.Created ||
_listener.Status == TaskStatus.WaitingForActivation ||
_listener.Status == TaskStatus.WaitingToRun ||
_listener.IsCanceled)
return true;
else
return false;
}
}
public int QueueSize
{
get
{
if (_queue != null)
return _queue.Count;
return -1;
}
}
public TaskQueueManager()
{
_multiQueue = new ConcurrentQueue<T>();
_queue = new BlockingCollection<T>(_multiQueue);
}
public void Start()
{
if (_listener == null)
{
_canceller = new CancellationTokenSource();
_listener = Task.Factory.StartNew(() =>
{
while (!_canceller.Token.IsCancellationRequested)
{
T item;
if (_queue.TryTake(out item, 100))
{
if (OnNextItem != null)
{
try
{
OnNextItem(item);
}
catch (Exception e)
{
//log or call an event
}
}
}
}
},
_canceller.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
public void Stop()
{
if (_listener != null)
{
_canceller.Cancel();
if (_listener.IsCanceled && !_listener.IsCompleted)
_listener.Wait();
_listener = null;
_canceller = null;
}
}
public void Add(T item)
{
if (item != null)
{
_queue.Add(item);
}
else
{
throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null");
}
}
}