thread start acquire python multithreading testing timer race-condition

start - ¿Cómo puedo reproducir las condiciones de carrera en este código python de manera confiable?



start() python (4)

En general, esta no es una solución viable. Puede reproducir esta condición de carrera utilizando depurador (establezca puntos de interrupción en algunas ubicaciones del código, que, cuando llegue a uno de los puntos de interrupción, congele el subproceso y ejecute el código hasta que llegue a otro punto de interrupción, luego congele este subproceso y descongele el primero hilo, puede intercalar la ejecución de hilos de cualquier manera usando esta técnica).

El problema es que mientras más hilos y código tenga, más formas de intercalar efectos secundarios tendrán. En realidad, crecerá exponencialmente. No hay una solución viable para probarlo en general. Es posible solo en algunos casos simples.

La solución a este problema es bien conocida. Escriba código que tenga en cuenta sus efectos secundarios, controle los efectos secundarios con primitivas de sincronización como bloqueos, semáforos o colas, o use datos inmutables si es posible.

Tal vez de manera más práctica es usar controles de tiempo de ejecución para forzar el orden correcto de las llamadas. Por ejemplo (pseudocódigo):

class RacyObject: def __init__(self): self.__cnt = 0 ... def isReadyAndLocked(self): acquire_object_lock if self.__cnt % 2 != 0: # another thread is ready to start the Job return False if self.__is_ready: self.__cnt += 1 return True # Job is in progress or doesn''t ready yet return False release_object_lock def doJobAndRelease(self): acquire_object_lock if self.__cnt % 2 != 1: raise RaceConditionDetected("Incorrect order") self.__cnt += 1 do_job() release_object_lock

Este código arrojará una excepción si no comprueba isReadyAndLock antes de llamar a doJobAndRelease . Esto se puede probar fácilmente usando solo un hilo.

obj = RacyObject() ... # correct usage if obj.isReadyAndLocked() obj.doJobAndRelease()

Contexto

Recientemente publiqué una clase de temporizador para revisar en Code Review . Tenía la intuición de que había errores de concurrencia, ya que una vez había visto que fallaba una prueba de una unidad, pero no pude reproducir la falla. De ahí mi revisión de publicación a código.

Obtuve algunos buenos comentarios destacando varias condiciones de carrera en el código. (Pensé) Entendí el problema y la solución, pero antes de hacer ninguna corrección, quería exponer los errores con una prueba unitaria. Cuando lo intenté, me di cuenta de que era difícil. Varias respuestas de intercambio de pila sugirieron que tendría que controlar la ejecución de los hilos para exponer el error (s) y cualquier sincronización artificial no sería necesariamente portátil a una máquina diferente. Esto parecía una gran cantidad de complejidad accidental más allá del problema que estaba tratando de resolver.

En cambio, traté de usar la mejor herramienta de análisis estático (SA) para python , PyLint, para ver si seleccionaba alguno de los errores, pero no podía. ¿Por qué un humano podría encontrar los errores a través de la revisión del código (esencialmente SA), pero una herramienta de SA no podría?

Temeroso de intentar que Valgrind trabaje con Python (que sonaba como el afeitado de yak), decidí hacer un buen trabajo para arreglar los errores sin reproducirlos primero. Ahora estoy en un aprieto.

Aquí está el código ahora.

from threading import Timer, Lock from time import time class NotRunningError(Exception): pass class AlreadyRunningError(Exception): pass class KitchenTimer(object): '''''' Loosely models a clockwork kitchen timer with the following differences: You can start the timer with arbitrary duration (e.g. 1.2 seconds). The timer calls back a given function when time''s up. Querying the time remaining has 0.1 second accuracy. '''''' PRECISION_NUM_DECIMAL_PLACES = 1 RUNNING = "RUNNING" STOPPED = "STOPPED" TIMEUP = "TIMEUP" def __init__(self): self._stateLock = Lock() with self._stateLock: self._state = self.STOPPED self._timeRemaining = 0 def start(self, duration=1, whenTimeup=None): '''''' Starts the timer to count down from the given duration and call whenTimeup when time''s up. '''''' with self._stateLock: if self.isRunning(): raise AlreadyRunningError else: self._state = self.RUNNING self.duration = duration self._userWhenTimeup = whenTimeup self._startTime = time() self._timer = Timer(duration, self._whenTimeup) self._timer.start() def stop(self): '''''' Stops the timer, preventing whenTimeup callback. '''''' with self._stateLock: if self.isRunning(): self._timer.cancel() self._state = self.STOPPED self._timeRemaining = self.duration - self._elapsedTime() else: raise NotRunningError() def isRunning(self): return self._state == self.RUNNING def isStopped(self): return self._state == self.STOPPED def isTimeup(self): return self._state == self.TIMEUP @property def timeRemaining(self): if self.isRunning(): self._timeRemaining = self.duration - self._elapsedTime() return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES) def _whenTimeup(self): with self._stateLock: self._state = self.TIMEUP self._timeRemaining = 0 if callable(self._userWhenTimeup): self._userWhenTimeup() def _elapsedTime(self): return time() - self._startTime

Pregunta

En el contexto de este ejemplo de código, ¿cómo puedo exponer las condiciones de la carrera, corregirlas y demostrar que están corregidas?

Puntos extra

puntos extra para un marco de prueba adecuado para otras implementaciones y problemas en lugar de específicamente para este código.

Para llevar

Mi conclusión es que la solución técnica para reproducir las condiciones de carrera identificadas es controlar el sincronismo de dos hilos para garantizar que se ejecutan en el orden que expondrá un error. El punto importante aquí es que ya están identificadas como condiciones de carrera. La mejor manera que he encontrado para identificar las condiciones de carrera es poner su código para la revisión del código y alentar a más personas expertas a analizarlo.


La solución más común para probar el código de subproceso (no) seguro es comenzar una gran cantidad de subprocesos y esperar lo mejor. El problema que yo, y puedo imaginar otros, con esto es que depende del azar y hace que las pruebas sean "pesadas".

Cuando me encontré con esto hace un tiempo, quería buscar la precisión en lugar de la fuerza bruta. El resultado es una pieza de código de prueba para causar condiciones de carrera al permitir que los hilos corran en la misma dirección.

Ejemplo de código de raza

spam = [] def set_spam(): spam[:] = foo() use(spam)

Si se llama a set_spam desde varios hilos, existe una condición de carrera entre la modificación y el uso de spam . Tratemos de reproducirlo consistentemente.

Cómo causar condiciones de carrera

class TriggeredThread(threading.Thread): def __init__(self, sequence=None, *args, **kwargs): self.sequence = sequence self.lock = threading.Condition() self.event = threading.Event() threading.Thread.__init__(self, *args, **kwargs) def __enter__(self): self.lock.acquire() while not self.event.is_set(): self.lock.wait() self.event.clear() def __exit__(self, *args): self.lock.release() if self.sequence: next(self.sequence).trigger() def trigger(self): with self.lock: self.event.set() self.lock.notify()

Entonces para demostrar el uso de este hilo:

spam = [] # Use a list to share values across threads. results = [] # Register the results. def set_spam(): thread = threading.current_thread() with thread: # Acquires the lock. # Set ''spam'' to thread name spam[:] = [thread.name] # Thread ''releases'' the lock upon exiting the context. # The next thread is triggered and this thread waits for a trigger. with thread: # Since each thread overwrites the content of the ''spam'' # list, this should only result in True for the last thread. results.append(spam == [thread.name]) threads = [ TriggeredThread(name=''a'', target=set_spam), TriggeredThread(name=''b'', target=set_spam), TriggeredThread(name=''c'', target=set_spam)] # Create a shifted sequence of threads and share it among the threads. thread_sequence = itertools.cycle(threads[1:] + threads[:1]) for thread in threads: thread.sequence = thread_sequence # Start each thread [thread.start() for thread in threads] # Trigger first thread. # That thread will trigger the next thread, and so on. threads[0].trigger() # Wait for each thread to finish. [thread.join() for thread in threads] # The last thread ''has won the race'' overwriting the value # for ''spam'', thus [False, False, True]. # If set_spam were thread-safe, all results would be true. assert results == [False, False, True], "race condition triggered" assert results == [True, True, True], "code is thread-safe"

Creo que ya expliqué lo suficiente sobre esta construcción para que pueda implementarla para su propia situación. Creo que esto encaja muy bien en la sección de ''puntos extra'':

puntos extra para un marco de prueba adecuado para otras implementaciones y problemas en lugar de específicamente para este código.

Resolviendo condiciones de carrera

Variables compartidas

Cada problema de enhebrado se resuelve de forma específica. En el ejemplo anterior, causé una condición de carrera al compartir un valor entre subprocesos. Problemas similares pueden ocurrir cuando se usan variables globales, como un atributo de módulo. La clave para resolver estos problemas puede ser utilizar un almacenamiento local de subprocesos:

# The thread local storage is a global. # This may seem weird at first, but it isn''t actually shared among threads. data = threading.local() data.spam = [] # This list only exists in this thread. results = [] # Results *are* shared though. def set_spam(): thread = threading.current_thread() # ''get'' or set the ''spam'' list. This actually creates a new list. # If the list was shared among threads this would cause a race-condition. data.spam = getattr(data, ''spam'', []) with thread: data.spam[:] = [thread.name] with thread: results.append(data.spam == [thread.name]) # Start the threads as in the example above. assert all(results) # All results should be True.

Lecturas concurrentes

Un problema común de enhebrado es el problema de múltiples hilos que leen y / o escriben a un titular de datos al mismo tiempo. Este problema se resuelve mediante la implementación de un bloqueo de lectura y escritura. La implementación real de un bloqueo de lectura / escritura puede diferir. Puede elegir un bloqueo de lectura previa, un bloqueo de escritura previa o simplemente al azar.

Estoy seguro de que hay ejemplos que describen tales técnicas de bloqueo. Puedo escribir un ejemplo más adelante ya que esta es una respuesta bastante larga. ;-)

Notas

Eche un vistazo a la documentación del módulo de subprocesamiento y experimente un poco. Como cada problema de subprocesamiento es diferente, se aplican diferentes soluciones.

Mientras que en el tema de enhebrar, eche un vistazo a Python GIL (Global Interpreter Lock). Es importante tener en cuenta que el enhebrado puede no ser el mejor enfoque para optimizar el rendimiento (pero este no es su objetivo). Encontré esta presentación bastante buena: https://www.youtube.com/watch?v=zEaosS1U5qY


Puedes probarlo usando muchos hilos:

import sys, random, thread def timeup(): sys.stdout.write("Timer:: Up %f" % time()) def trdfunc(kt, tid): while True : sleep(1) if not kt.isRunning(): if kt.start(1, timeup): sys.stdout.write("[%d]: started/n" % tid) else: if random.random() < 0.1: kt.stop() sys.stdout.write("[%d]: stopped/n" % tid) sys.stdout.write("[%d] remains %f/n" % ( tid, kt.timeRemaining)) kt = KitchenTimer() kt.start(1, timeup) for i in range(1, 100): thread.start_new_thread ( trdfunc, (kt, i) ) trdfunc(kt, 0)

Un par de problemas que veo:

  • Cuando un hilo ve el temporizador como no ejecutado e intenta iniciarlo, el código generalmente genera una excepción debido al cambio de contexto entre la prueba y el inicio. Creo que elevar una excepción es demasiado. O puede tener una función testAndStart atómica

  • Un problema similar ocurre con parada. Puede implementar una función testAndStop.

  • Incluso este código de la función timeRemaining :

    if self.isRunning(): self._timeRemaining = self.duration - self._elapsedTime()

    Necesita algún tipo de atomicidad, quizás necesites agarrar un candado antes de probar.

Si planea compartir esta clase entre hilos, debe abordar estos problemas.


Tradicionalmente, forzar las condiciones de carrera en código multiproceso se hace con semáforos, por lo que puede forzar a un subproceso a esperar hasta que otro subproceso haya alcanzado alguna condición de borde antes de continuar.

Por ejemplo, su objeto tiene algún código para verificar que no se invoca el start si el objeto ya se está ejecutando. Podría forzar esta condición para asegurarse de que se comporte como se esperaba haciendo algo como esto:

  • comenzando un KitchenTimer
  • tener el bloque de temporizador en un semáforo mientras está en estado de ejecución
  • iniciar el mismo temporizador en otro hilo
  • atrapando AlreadyRunningError

Para hacer algo de esto, puede necesitar extender la clase KitchenTimer. Las pruebas de unidades formales a menudo usarán objetos simulados que se definen para bloquear en momentos críticos. Los objetos falsos son un tema más grande de lo que puedo abordar aquí, pero buscar en Google "objeto simulado de Python" mostrará una gran cantidad de documentación y muchas implementaciones para elegir.

Aquí hay una manera en que puedes forzar tu código para lanzar AlreadyRunningError :

import threading class TestKitchenTimer(KitchenTimer): _runningLock = threading.Condition() def start(self, duration=1, whenTimeUp=None): KitchenTimer.start(self, duration, whenTimeUp) with self._runningLock: print "waiting on _runningLock" self._runningLock.wait() def resume(self): with self._runningLock: self._runningLock.notify() timer = TestKitchenTimer() # Start the timer in a subthread. This thread will block as soon as # it is started. thread_1 = threading.Thread(target = timer.start, args = (10, None)) thread_1.start() # Attempt to start the timer in a second thread, causing it to throw # an AlreadyRunningError. try: thread_2 = threading.Thread(target = timer.start, args = (10, None)) thread_2.start() except AlreadyRunningError: print "AlreadyRunningError" timer.resume() timer.stop()

Leyendo el código, identifique algunas de las condiciones de contorno que desea probar, luego piense dónde debería pausar el temporizador para forzar esa condición y agregue condiciones, semáforos, eventos, etc. para hacerlo realidad. por ejemplo, ¿qué sucede si, justo cuando el temporizador ejecuta la devolución de llamada whenTimeUp, otro hilo intenta detenerlo? Puede forzar esa condición haciendo que el temporizador espere tan pronto como se ingrese _cuando Time Up:

import threading class TestKitchenTimer(KitchenTimer): _runningLock = threading.Condition() def _whenTimeup(self): with self._runningLock: self._runningLock.wait() KitchenTimer._whenTimeup(self) def resume(self): with self._runningLock: self._runningLock.notify() def TimeupCallback(): print "TimeupCallback was called" timer = TestKitchenTimer() # The timer thread will block when the timer expires, but before the callback # is invoked. thread_1 = threading.Thread(target = timer.start, args = (1, TimeupCallback)) thread_1.start() sleep(2) # The timer is now blocked. In the parent thread, we stop it. timer.stop() print "timer is stopped: %r" % timer.isStopped() # Now allow the countdown thread to resume. timer.resume()

Subclase de la clase que desea probar no es una manera increíble de instrumentarlo para la prueba: tendrá que anular básicamente todos los métodos para probar las condiciones de carrera en cada uno, y en ese punto hay un buen argumento para ser hecho de que no está realmente probando el código original. En su lugar, puede encontrarlo más limpio para colocar los semáforos en el objeto KitchenTimer pero inicializado en None por defecto, y hacer que sus métodos comprueben if testRunningLock is not None: antes de adquirir o esperar el bloqueo. Entonces puedes forzar carreras con el código real que estás enviando.

Algunas lecturas en Python simulan marcos que pueden ser útiles. De hecho, no estoy seguro de que los simulacros sean útiles para probar este código: es casi completamente autónomo y no depende de muchos objetos externos. Pero los tutoriales falsos a veces abordan problemas como estos. No he usado ninguno de estos, pero la documentación sobre estos es un buen lugar para comenzar: