threads sistemas resueltos programas operativos multihilos hilos fuente ejemplos creacion control codigo multithreading design-patterns concurrency threadpool

multithreading - sistemas - Garantizar el orden de ejecución de la tarea en el hilo de trabajo



hilos sistemas operativos (16)

¿Cómo se aseguraría de que esas tareas estén ordenadas?

push task1 push task2 push task346 push task5

En respuesta a la edición:

push task1 push task27 ** push task3468 * push task5 push task9

He estado leyendo sobre el patrón de grupo de hilos y no puedo encontrar la solución habitual para el siguiente problema.

A veces quiero que las tareas se ejecuten en serie. Por ejemplo, leo trozos de texto de un archivo y, por alguna razón, necesito que los fragmentos se procesen en ese orden. Entonces, básicamente, quiero eliminar la concurrencia para algunas de las tareas .

Considere este escenario en el que las tareas con * deben procesarse en el orden en que se insertaron. Las otras tareas se pueden procesar en cualquier orden.

push task1 push task2 push task3 * push task4 * push task5 push task6 * .... and so on

En el contexto de un grupo de subprocesos, sin esta restricción, una única cola de tareas pendientes funciona bien pero claramente aquí no.

Pensé en hacer que algunos de los hilos funcionen en una cola específica para hilos y los demás en la cola "global". Luego, para ejecutar algunas de las tareas en serie, simplemente tengo que empujarlas a una cola donde se ve un solo hilo. Suena un poco torpe.

Entonces, la verdadera pregunta en esta larga historia: ¿cómo resolverías esto? ¿Cómo se aseguraría de que esas tareas estén ordenadas ?

EDITAR

Como problema más general, supongamos que el escenario anterior se convierte en

push task1 push task2 ** push task3 * push task4 * push task5 push task6 * push task7 ** push task8 * push task9 .... and so on

Lo que quiero decir es que las tareas dentro de un grupo deberían ejecutarse secuencialmente, pero los grupos mismos pueden mezclarse. Entonces puedes tener 3-2-5-4-7 por ejemplo.

Otra cosa a tener en cuenta es que no tengo acceso a todas las tareas en un grupo por adelantado (y no puedo esperar a que lleguen todas antes de comenzar el grupo).

Gracias por tu tiempo.


Opción 1 - La compleja

Debido a que tiene trabajos secuenciales, puede reunir esos trabajos en una cadena y dejar que los trabajos se vuelvan a enviar al grupo de subprocesos una vez que finalicen. Supongamos que tenemos una lista de trabajos:

[Task1, ..., Task6]

como en tu ejemplo. Tenemos una dependencia secuencial, tal que [Task3, Task4, Task6] es una cadena de dependencia. Ahora hacemos un trabajo (pseudocódigo de Erlang):

Task4Job = fun() -> Task4(), % Exec the Task4 job push_job(Task6Job) end. Task3Job = fun() -> Task3(), % Execute the Task3 Job push_job(Task4Job) end. push_job(Task3Job).

Es decir, modificamos el trabajo Task3 envolviéndolo en un trabajo que, como continuación, empuja el siguiente trabajo de la cola al grupo de subprocesos. Existen fuertes similitudes con un estilo de paso de continuación general que también se ve en sistemas como Node.js o Pythons Twisted framework.

Al generalizarse, se crea un sistema en el que se pueden definir cadenas de trabajo que pueden defer el trabajo adicional y volver a enviar el trabajo posterior.

Opción 2 - La simple

¿Por qué nos molestamos en dividir los trabajos? Es decir, dado que son dependientes de forma secuencial, ejecutarlos todos en el mismo subproceso no será más rápido o más lento que tomar esa cadena y distribuirla en varios subprocesos. Asumiendo una carga de trabajo "suficiente", cualquier hilo siempre tendrá trabajo de todos modos, por lo que agrupar los trabajos es probablemente más fácil:

Task = fun() -> Task3(), Task4(), Task6() % Just build a new job, executing them in the order desired end, push_job(Task).

Es bastante fácil hacer cosas como esta si tienes funciones como ciudadanos de primera clase para que puedas construirlos en tu idioma a tu antojo, como lo haces en, por ejemplo, cualquier lenguaje de programación funcional, Python, bloques de Ruby, etc. .

No me gusta particularmente la idea de crear una cola, o una pila de continuación, como en la "Opción 1", y definitivamente iría con la segunda opción. En Erlang, incluso tenemos programas llamados jobs escritos por Erlang Solutions y lanzados como de código abierto. jobs está diseñado para ejecutar y cargar las ejecuciones de trabajo de regulación de este tipo. Probablemente combinaría la opción 2 con trabajos si tuviera que resolver este problema.


Algo como lo siguiente permitirá que las tareas seriales y paralelas se pongan en cola, donde las tareas seriales se ejecutarán una después de la otra, y las tareas paralelas se ejecutarán en cualquier orden, pero en paralelo. Esto le da la capacidad de serializar tareas cuando sea necesario, también tiene tareas paralelas, pero haga esto cuando se reciban tareas, es decir, no necesita saber toda la secuencia por adelantado, el orden de ejecución se mantiene dinámicamente.

internal class TaskQueue { private readonly object _syncObj = new object(); private readonly Queue<QTask> _tasks = new Queue<QTask>(); private int _runningTaskCount; public void Queue(bool isParallel, Action task) { lock (_syncObj) { _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task }); } ProcessTaskQueue(); } public int Count { get{lock (_syncObj){return _tasks.Count;}} } private void ProcessTaskQueue() { lock (_syncObj) { if (_runningTaskCount != 0) return; while (_tasks.Count > 0 && _tasks.Peek().IsParallel) { QTask parallelTask = _tasks.Dequeue(); QueueUserWorkItem(parallelTask); } if (_tasks.Count > 0 && _runningTaskCount == 0) { QTask serialTask = _tasks.Dequeue(); QueueUserWorkItem(serialTask); } } } private void QueueUserWorkItem(QTask qTask) { Action completionTask = () => { qTask.Task(); OnTaskCompleted(); }; _runningTaskCount++; ThreadPool.QueueUserWorkItem(_ => completionTask()); } private void OnTaskCompleted() { lock (_syncObj) { if (--_runningTaskCount == 0) { ProcessTaskQueue(); } } } private class QTask { public Action Task { get; set; } public bool IsParallel { get; set; } } }

Actualizar

Para manejar grupos de tareas con mezclas de tareas en serie y paralelas, un GroupedTaskQueue puede administrar un TaskQueue para cada grupo. Nuevamente, no es necesario que conozca los grupos por adelantado, todo se gestiona de forma dinámica a medida que se reciben las tareas.

internal class GroupedTaskQueue { private readonly object _syncObj = new object(); private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>(); private readonly string _defaultGroup = Guid.NewGuid().ToString(); public void Queue(bool isParallel, Action task) { Queue(_defaultGroup, isParallel, task); } public void Queue(string group, bool isParallel, Action task) { TaskQueue queue; lock (_syncObj) { if (!_queues.TryGetValue(group, out queue)) { queue = new TaskQueue(); _queues.Add(group, queue); } } Action completionTask = () => { task(); OnTaskCompleted(group, queue); }; queue.Queue(isParallel, completionTask); } private void OnTaskCompleted(string group, TaskQueue queue) { lock (_syncObj) { if (queue.Count == 0) { _queues.Remove(group); } } } }


Básicamente, hay una serie de tareas pendientes. Algunas de las tareas solo se pueden realizar cuando una o más tareas pendientes han terminado de ejecutarse.

Las tareas pendientes se pueden modelar en un gráfico de dependencia:

  • "tarea 1 -> tarea2" significa que "la tarea 2 se puede ejecutar solo después de que finalice la tarea 1". las flechas apuntan en la dirección del orden de ejecución.
  • El grado de una tarea (el número de tareas que lo señalan) determina si la tarea está lista para su ejecución. Si el grado de indegrado es 0, se puede ejecutar.
  • a veces, una tarea debe esperar a que finalicen múltiples tareas; el grado de indegree es entonces> 1.
  • si una tarea no tiene que esperar a que otras tareas terminen (su grado es cero), puede enviarse al grupo de subprocesos con subprocesos de trabajo, o la cola con tareas que esperan ser recogidas por un subproceso de trabajo. Usted sabe que la tarea enviada no causará un punto muerto, porque la tarea no está esperando nada. Como optimización, puede usar una cola de prioridad, por ejemplo, en qué tareas de las que dependen más tareas del gráfico de dependencia se ejecutarán primero. Esto tampoco puede provocar un punto muerto, porque todas las tareas en el grupo de subprocesos se pueden ejecutar. Sin embargo, puede provocar el hambre.
  • Si una tarea finaliza la ejecución, puede eliminarse del gráfico de dependencia, posiblemente reduciendo el grado de otras tareas, que a su vez puede enviarse al grupo de hilos de trabajo.

Por lo tanto, hay (al menos) un subproceso utilizado para agregar / eliminar tareas pendientes, y hay un grupo de subprocesos de subprocesos de trabajo.

Cuando se agrega una tarea al gráfico de dependencia, debe verificar:

  • cómo se conecta la tarea en el gráfico de dependencia: ¿qué tareas debe esperar para finalizar y qué tareas deben esperar para finalizar? Dibuja las conexiones desde y hacia la nueva tarea en consecuencia.
  • una vez que se establecen las conexiones: ¿las nuevas conexiones causaron algún ciclo en el gráfico de dependencia? Si es así, hay una situación de punto muerto.

Rendimiento :

  • este patrón es más lento que la ejecución secuencial si la ejecución paralela de hecho no es posible en la mayoría de los casos, porque de todos modos se necesita una administración adicional para hacer todo casi secuencialmente.
  • este patrón es rápido si se pueden realizar muchas tareas simultáneamente en la práctica.

Suposiciones

Como puede haber leído entre líneas, debe diseñar las tareas para que no interfieran con otras tareas. Además, debe haber una forma de determinar la prioridad de las tareas. La prioridad de la tarea debe incluir los datos manejados por cada tarea. Dos tareas pueden no alterar el mismo objeto simultáneamente; una de las tareas debe tener prioridad sobre la otra en su lugar, o las operaciones realizadas en el objeto deben ser seguras para subprocesos.


Como solo necesita esperar a que se complete una sola tarea antes de comenzar la tarea dependiente, puede hacerlo fácilmente si puede programar la tarea dependiente en la primera tarea. Entonces en su segundo ejemplo: al final de la tarea 2, programe la tarea 7 y al final de la tarea 3, programe la tarea 4 y así sucesivamente para 4-> 6 y 6-> 8.

Al principio, solo programe tareas 1,2,5,9 ... y el resto debería seguir.

Un problema aún más general es cuando tiene que esperar varias tareas antes de que pueda comenzar una tarea dependiente. Manejar eso eficientemente es un ejercicio no trivial.


Creo que el grupo de subprocesos se puede usar de manera efectiva en esta situación. La idea es usar un objeto separado para cada grupo de tareas dependientes. Agrega tareas a su cola con o sin objeto. Utiliza el mismo objeto de strand con tareas dependientes. Su programador verifica si la siguiente tarea tiene un strand y si este strand está bloqueado. Si no, bloquea este strand y ejecuta esta tarea. Si strand ya está bloqueada, mantenga esta tarea en la cola hasta el próximo evento de programación. Cuando la tarea está hecha, desbloquee su strand .

Como resultado, necesita una cola única, no necesita ningún hilo adicional, no hay grupos complicados, etc. El objeto strand puede ser muy simple con dos métodos de lock y unlock .

A menudo encuentro el mismo problema de diseño, por ejemplo, para un servidor de red asíncrono que maneja múltiples sesiones simultáneas. Las sesiones son independientes (esto las asigna a sus tareas independientes y grupos de tareas dependientes) cuando las tareas dentro de las sesiones son dependientes (esto asigna tareas internas de la sesión a sus tareas dependientes dentro de un grupo). Usando el enfoque descrito evito la sincronización explícita dentro de la sesión por completo. Cada sesión tiene un objeto propio.

Y, lo que es más, uso la implementación existente (excelente) de esta idea: Biblioteca Boost Asio (C ++). Acabo de usar su strand términos. La implementación es elegante: envuelvo mis tareas asíncronas en el objeto correspondiente antes de programarlas.


Creo que estás mezclando conceptos. Threadpool está bien cuando quieres distribuir algo de trabajo entre hilos, pero si comienzas a mezclar dependencias entre hilos, entonces no es una buena idea.

Mi consejo, simplemente no uses el threadpool para esas tareas. Simplemente cree un hilo dedicado y mantenga una cola simple de elementos secuenciales que debe procesar solo ese hilo. Luego puede seguir impulsando tareas al grupo de subprocesos cuando no tiene un requisito secuencial y usar el subproceso dedicado cuando lo necesita.

Una aclaración: utilizando el sentido común, una cola de tareas seriales se ejecutará con un único hilo procesando cada tarea una tras otra :)


Esto es posible, bueno, por lo que entiendo su escenario. Básicamente, lo que necesita es hacer algo inteligente para coordinar sus tareas en el hilo principal. La API de Java es su necesidad ExecutorCompletionService y se puede Callable

Primero, implementa tu tarea invocable:

public interface MyAsyncTask extends Callable<MyAsyncTask> { // tells if I am a normal or dependent task private boolean isDependent; public MyAsyncTask call() { // do your job here. return this; } }

Luego, en su hilo principal, use CompletionService para coordinar la ejecución de la tarea dependiente (es decir, un mecanismo de espera):

ExecutorCompletionService<MyAsyncTask> completionExecutor = new ExecutorCompletionService<MyAsyncTask>(Executors.newFixedThreadPool(5)); Future<MyAsyncTask> dependentFutureTask = null; for (MyAsyncTask task : tasks) { if (task.isNormal()) { // if it is a normal task, submit it immediately. completionExecutor.submit(task); } else { if (dependentFutureTask == null) { // submit the first dependent task, get a reference // of this dependent task for later use. dependentFutureTask = completionExecutor.submit(task); } else { // wait for last one completed, before submit a new one. dependentFutureTask.get(); dependentFutureTask = completionExecutor.submit(task); } } }

Al hacer esto, usa un único ejecutor (tamaño de subproceso 5) ejecuta tareas normales y dependientes, la tarea normal se ejecuta inmediatamente tan pronto como se envía, las tareas dependientes se ejecutan una por una (se realizan espera en el hilo principal al llamar a get () en Futuro antes de enviar una nueva tarea dependiente), por lo que en cualquier punto del tiempo, siempre tendrá varias tareas normales y una sola tarea dependiente (si existe) ejecutándose en un solo grupo de subprocesos.

Esto es solo una ventaja, al usar ExecutorCompletionService, FutureTask y Semaphore, puede implementar un escenario de coordinación de subprocesos más complejo.


Ha habido muchas respuestas, y obviamente una ha sido aceptada. Pero, ¿por qué no usar las continuaciones?

Si tiene una condición de "serie" conocida, cuando encola la primera tarea con esta condición, mantenga presionada la Tarea; y para tareas adicionales invoque Task.ContinueWith ().

public class PoolsTasks { private readonly object syncLock = new object(); private Task serialTask = Task.CompletedTask; private bool isSerialTask(Action task) { // However you determine what is serial ... return true; } public void RunMyTask(Action myTask) { if (isSerialTask(myTask)) { lock (syncLock) serialTask = serialTask.ContinueWith(_ => myTask()); } else Task.Run(myTask); } }


Hay un marco de Java específicamente para este propósito llamado dexecutor (descargo de responsabilidad: yo soy el propietario)

DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor(); executor.addDependency("task1", "task2"); executor.addDependency("task4", "task6"); executor.addDependency("task6", "task8"); executor.addIndependent("task3"); executor.addIndependent("task5"); executor.addIndependent("task7"); executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);

task1, task3, task5, task7 se ejecuta en paralelo (dependiendo del tamaño del grupo de subprocesos), una vez que task1 finaliza, task2 se ejecuta, una vez que task2 termina task4 se ejecuta, una vez que task4 termina task6 ejecuta y finalmente una vez que task6 termina task8 se ejecuta.


Las respuestas que sugieren no usar un grupo de hilos son como codificar el conocimiento de las dependencias de tareas / orden de ejecución. En cambio, crearía una CompositeTask que gestiona la dependencia de inicio / final entre dos tareas. Al encapsular la dependencia detrás de la interfaz de la tarea, todas las tareas se pueden tratar uniformemente y agregar al grupo. Esto oculta los detalles de ejecución y permite que las dependencias de tareas cambien sin afectar si se usa o no un grupo de subprocesos.

La pregunta no especifica un idioma: usaré Java, que espero sea legible para la mayoría.

class CompositeTask implements Task { Task firstTask; Task secondTask; public void run() { firstTask.run(); secondTask.run(); } }

Esto ejecuta las tareas secuencialmente y en el mismo hilo. Puede encadenar muchas CompositeTask juntas para crear una secuencia de tantas tareas secuenciales como sea necesario.

La desventaja aquí es que esto ata el hilo por la duración de todas las tareas ejecutándose secuencialmente. Es posible que tenga otras tareas que preferiría ejecutar entre la primera y la segunda tarea. Por lo tanto, en lugar de ejecutar la segunda tarea directamente, tenga la ejecución del cronograma de tareas compuestas de la segunda tarea:

class CompositeTask implements Runnable { Task firstTask; Task secondTask; ExecutorService executor; public void run() { firstTask.run(); executor.submit(secondTask); } }

Esto garantiza que la segunda tarea no se ejecute hasta después de que se complete la primera tarea y también permite que el grupo ejecute otras tareas (posiblemente más urgentes). Tenga en cuenta que la primera y la segunda tarea pueden ejecutarse en subprocesos separados, por lo que, aunque no se ejecutan simultáneamente, cualquier información compartida utilizada por las tareas debe hacerse visible para otros subprocesos (por ejemplo, haciendo que las variables sean volatile ).

Este es un enfoque simple, pero potente y flexible, y permite a las tareas mismas definir las restricciones de ejecución, en lugar de hacerlo mediante el uso de diferentes grupos de subprocesos.


Los grupos de subprocesos son buenos para casos en los que el orden relativo de las tareas no es importante, siempre que se completen. En particular, debe estar bien que todos se hagan en paralelo.

Si sus tareas deben realizarse en un orden específico, entonces no son adecuadas para el paralelismo, por lo que un grupo de subprocesos no es apropiado.

Si desea mover estas tareas en serie fuera del hilo principal, entonces una sola cadena de fondo con una cola de tareas sería apropiada para esas tareas. Puede continuar utilizando un grupo de subprocesos para las tareas restantes que son adecuadas para el paralelismo.

Sí, significa que debe decidir dónde enviar la tarea dependiendo de si es una tarea en orden o una tarea "puede ser paralelizada", pero esto no es gran cosa.

Si tiene grupos que deben ser serializados, pero que pueden ejecutarse en paralelo con otras tareas, entonces tiene múltiples opciones:

  1. Cree una única tarea para cada grupo, que haga las tareas de grupo relevantes en orden y publique esta tarea en el grupo de subprocesos.
  2. Haga que cada tarea de un grupo espere explícitamente la tarea anterior en el grupo y publíquelas en el grupo de subprocesos. Esto requiere que su grupo de subprocesos pueda manejar el caso donde un subproceso está esperando una tarea aún no programada sin interbloqueo.
  3. Tenga un hilo dedicado para cada grupo y publique las tareas del grupo en la cola de mensajes adecuada.

Para hacer lo que desea hacer con un threadpool, es posible que deba crear algún tipo de planificador.

Algo como eso:

TaskQueue -> Programador -> Cola -> ThreadPool

Scheduler se ejecuta en su propio hilo, manteniendo un registro de las dependencias entre los trabajos. Cuando un trabajo está listo para completarse, el planificador simplemente lo coloca en la cola del grupo de subprocesos.

Es posible que ThreadPool tenga que enviar señales al Programador para indicar cuándo se realiza un trabajo, de modo que el planificador pueda colocar trabajos en función de ese trabajo en la cola.

En su caso, las dependencias probablemente podrían almacenarse en una lista vinculada.

Supongamos que tiene las siguientes dependencias: 3 -> 4 -> 6 -> 8

Job 3 se está ejecutando en el grupo de subprocesos, aún no tiene ideas de que el trabajo 8 existe.

Job 3 termina. Quita el 3 de la lista vinculada, coloca el trabajo 4 en la cola del threadpool.

Job 8 llega. Lo pones al final de la lista enlazada.

Las únicas construcciones que tienen que estar totalmente sincronizadas son las colas antes y después del planificador.


Si entiendo el problema correctamente, los ejecutores de jdk no tienen esta capacidad, pero es fácil de implementar. Básicamente necesitas

  • un grupo de subprocesos de trabajo, cada uno de los cuales tiene una cola dedicada
  • algo de abstracción sobre las colas a las que ofreces trabajo (cf el ExecutorService )
  • Algún algoritmo que deterministically selecciona una cola específica para cada trabajo
  • cada pieza de trabajo recibe ofertas en la fila correcta y, por lo tanto, se procesa en el orden correcto

La diferencia para los ejecutores jdk es que tienen 1 cola con n subprocesos pero quieres n colas ym subprocesos (donde n puede o no ser igual a m)

* edite después de leer que cada tarea tiene una clave *

En un poco más de detalle

  • escriba un código que transforma una clave en un índice (un int) en un rango determinado (0-n donde n es el número de subprocesos que desea), esto podría ser tan simple como key.hashCode() % n o podría ser Algún mapeo estático de valores clave conocidos a hilos o lo que quieras
  • Al inicio
    • cree n colas, colóquelas en una estructura indexada (matriz, lista lo que sea)
    • inicie n subprocesos, cada subproceso solo hace una toma de bloqueo de la cola
    • cuando recibe algún trabajo, sabe cómo ejecutar un trabajo específico para esa tarea / evento (obviamente, puede tener una asignación de tareas a acciones si tiene eventos heterogéneos)
  • almacenar esto detrás de una fachada que acepta los elementos de trabajo
  • cuando llega una tarea, entrégala a la fachada
    • la fachada encuentra la cola correcta para la tarea basada en la clave, la ofrece a esa cola

es más fácil agregar hilos de reinicio automáticos a este esquema, solo necesita que el hilo de trabajo se registre con algún administrador para indicar "Tengo esta cola" y luego algo de limpieza alrededor de eso + detección de errores en el hilo (lo que significa que anula el registro de la propiedad de esa cola devolviendo la cola a un grupo libre de colas que es un activador para iniciar un nuevo subproceso)


Tienes dos tipos diferentes de tareas. Mezclarlos en una sola cola es bastante extraño. En lugar de tener una cola, tiene dos. En aras de la simplicidad, incluso podría usar un ThreadPoolExecutor para ambos. Para las tareas en serie solo dale un tamaño fijo de 1, para las tareas que se pueden ejecutar al mismo tiempo dale más. No veo por qué eso sería torpe en absoluto. Mantenlo simple y estúpido. Usted tiene dos tareas diferentes, así que trátelas en consecuencia.


Use dos objetos activos . En dos palabras: el patrón de objeto activo consta de la cola de prioridad y 1 o muchos hilos de trabajo que pueden obtener tareas de la cola y procesarla.

Por lo tanto, utilice un objeto activo con un hilo de trabajo: todas las tareas que serían lugares para cola se procesarían secuencialmente. Use el segundo objeto activo con el número de hilo activo más que 1. En este caso, los hilos de trabajo obtendrán y procesarán las tareas de la cola en cualquier orden.

Suerte.