workflow foundation 4 - Actividades de flujo de trabajo del bus de servicio
workflow-foundation-4 async-await (3)
Las entidades de cola proporcionan las siguientes capacidades: "La capacidad de especificar una hora en la que el mensaje se agregará a la cola".
¿Después de un tiempo de espera que no puede recibir debido a esta regla?
La resolución de mayo es:
Detección de duplicados de mensajes entrantes, lo que permite a los clientes enviar el mismo mensaje varias veces sin consecuencias adversas.
Me gustaría acceder a Colas de bus de servicio y Temas de Workflows con algunas actividades específicas.
No pude encontrar nada adecuado a este escenario ( este artículo de MSDN y este artículo de Roman Kiss ) son los más cercanos.
Me gustaría diseñar una actividad personalizada que use el QueueClient para recibir de forma asincrónica los mensajes intermediarios, utilizando el método BeginReceive implementado con el patrón async / await (vea mi pregunta al respecto).
En primer lugar, me gustaría preguntar si hay alguna razón por la que preferiría el enfoque sugerido (WCF adaptado) en lugar del método deseado (utilizando el QueueClient).
Entonces, agradecería que me ayudes a diseñarlo de una manera amigable para la persistencia.
Actualizar:
Esto es lo que intenté hasta ahora:
public class AsyncReceiveBrokeredMessage : AsyncCodeActivity<BrokeredMessage>
{
[RequiredArgument]
public InArgument<string> ConnectionString { get; set; }
[RequiredArgument]
public InArgument<string> Path { get; set; }
protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
{
var connectionString = this.ConnectionString.Get(context);
var path = this.Path.Get(context);
var queueClient = QueueClient.CreateFromConnectionString(connectionString, path);
var cts = new CancellationTokenSource();
context.UserState = new ReceiveState
{
CancellationTokenSource = cts,
QueueClient = queueClient
};
var task = ExecuteAsync(context, cts.Token);
var tcs = new TaskCompletionSource<BrokeredMessage>(state);
task.ContinueWith(
t =>
{
if (t.IsFaulted)
{
tcs.TrySetException(t.Exception.InnerExceptions);
}
else if (t.IsCanceled)
{
tcs.TrySetCanceled();
}
else
{
tcs.TrySetResult(t.Result);
}
if (callback != null)
{
callback(tcs.Task);
}
});
return tcs.Task;
}
protected sealed override BrokeredMessage EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
{
var task = (Task<BrokeredMessage>)result;
try
{
return task.Result;
}
catch (OperationCanceledException)
{
if (context.IsCancellationRequested)
{
context.MarkCanceled();
}
else
{
throw;
}
return null; // or throw?
}
catch (AggregateException exception)
{
if (exception.InnerException is OperationCanceledException)
{
if (context.IsCancellationRequested)
{
context.MarkCanceled();
}
else
{
throw;
}
return null; // or throw?
}
ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
throw;
}
}
protected override void Cancel(AsyncCodeActivityContext context)
{
var state = (ReceiveState)context.UserState;
state.CancellationTokenSource.Cancel();
}
private async Task<BrokeredMessage> ExecuteAsync(
AsyncCodeActivityContext context, CancellationToken cancellationToken)
{
var receiveState = context.UserState as ReceiveState;
var receiveTask = Task<BrokeredMessage>.Factory.FromAsync(
receiveState.QueueClient.BeginReceive, receiveState.QueueClient.EndReceive, null);
var completionTask = receiveTask.ContinueWith(
t =>
{
BrokeredMessage result;
if (t.IsCanceled)
{
context.MarkCanceled();
result = null;
}
else if (t.IsFaulted)
{
result = null;
}
else
{
t.Result.Complete();
result = t.Result;
}
receiveState.QueueClient.Close();
return result;
},
cancellationToken);
return await completionTask;
}
private class ReceiveState
{
public CancellationTokenSource CancellationTokenSource { get; set; }
public QueueClient QueueClient { get; set; }
}
}
Y probado de esta manera (usando el Bus de servicio local de Windows Server):
var connectionString = new Variable<string>
{
Default = connectionStringValue
};
var path = new Variable<string>
{
Default = pathValue
};
var test = new While
{
Body =
new Pick
{
Branches =
{
new PickBranch
{
Trigger =
new AsyncReceiveBrokeredMessage
{
ConnectionString = new InArgument<string>(connectionString),
Path = new InArgument<string>(path)
},
Action =
new WriteLine
{
Text =
"Received message"
}
},
new PickBranch
{
Trigger =
new Delay
{
Duration = TimeSpan.FromSeconds(10)
},
Action =
new WriteLine
{
Text =
"Timeout!"
}
}
}
},
Condition = true,
Variables = { connectionString, path }
};
WorkflowInvoker.Invoke(test);
Recibo mensajes como se esperaba si los envío continuamente. Los problemas vienen después del primer tiempo de espera, porque entonces ya no recibo ningún mensaje. Cualquier aclaración es apreciada.
Puede ser que el problema esté en DefaultMessageTimeToLive de las propiedades de TimeToLive.
NamespaceManager.CreateSubscription(
new SubscriptionDescription(TopicName, SubscriptionName)
{
LockDuration = TimeSpan.FromMinutes(5),
DefaultMessageTimeToLive = TimeSpan.FromDays(7),
EnableDeadLetteringOnMessageExpiration = true
});
En primer lugar, debe saber algunas cosas importantes: 1) Los flujos de trabajo son procesos de larga ejecución destinados a ser pausados y recuperables posteriormente. 2) La forma en que se despiertan y restauran los flujos de trabajo es Marcadores. 3) Por lo general, a las personas les gusta que sus flujos de trabajo sean persistentes mientras se pausan también. (Si no le importa la persistencia, ¿por qué está usando WF de todos modos, solo para las herramientas de diseño visual?)
Problema lógico:
Si todos sus flujos de trabajo y sus actividades se mantienen y se suspenden, entonces ni siquiera se carga su código de actividad, entonces, ¿quién está escuchando? Respuesta: algo más , no una actividad, tiene que ser lo que está escuchando en la cola ServiceBus y asumir la responsabilidad de reanudar marcadores para activar sus flujos de trabajo.
Ese algo es el flujo de trabajo ''Host'', o alguna extensión de él. Aquí hay un par de publicaciones en el blog sobre cómo puede personalizar un host para escuchar los mensajes [desde un botón de la GUI] y activar una actividad de flujo de trabajo.
http://blogs.msdn.com/b/tilovell/archive/2011/02/26/wf4-workflow-4-0-hosting-extensions-redux.aspx
http://blogs.msdn.com/b/tilovell/archive/2010/06/08/wf4-workflow-4-0-hosting-extensions.aspx
Lo que podría hacer es tomar este código y adaptarlo para escuchar en una cola ServiceBus en lugar de un botón GUI, y activar su propia actividad ReceiveFromServiceBus, que es análoga a PageActivity: tenga en cuenta que debe escribir una NativeActivity para poder trabajar con marcadores de página correctamente
Todo bastante engorroso ... pero creo que es la forma "correcta" de hacerlo con WF .