python - framework - tutorial django
¿Cómo leer la cámara web en un proceso separado en OSX? (2)
Estoy leyendo una cámara web en OSX, que funciona bien con este simple script:
import cv2
camera = cv2.VideoCapture(0)
while True:
try:
(grabbed, frame) = camera.read() # grab the current frame
frame = cv2.resize(frame, (640, 480)) # resize the frame
cv2.imshow("Frame", frame) # show the frame to our screen
cv2.waitKey(1) # Display it at least one ms before going to the next frame
except KeyboardInterrupt:
# cleanup the camera and close any open windows
camera.release()
cv2.destroyAllWindows()
print "/n/nBye bye/n"
break
Ahora quiero leer el video en un proceso separado para el cual tengo un script que es mucho más largo y que lee correctamente el video en un proceso separado en Linux:
import numpy as np
import time
import ctypes
import argparse
from multiprocessing import Array, Value, Process
import cv2
class VideoCapture:
"""
Class that handles video capture from device or video file
"""
def __init__(self, device=0, delay=0.):
"""
:param device: device index or video filename
:param delay: delay between frame captures in seconds(floating point is allowed)
"""
self._cap = cv2.VideoCapture(device)
self._delay = delay
def _proper_frame(self, delay=None):
"""
:param delay: delay between frames capture(in seconds)
:param finished: synchronized wrapper for int(see multiprocessing.Value)
:return: frame
"""
snapshot = None
correct_img = False
fail_counter = -1
while not correct_img:
# Capture the frame
correct_img, snapshot = self._cap.read()
fail_counter += 1
# Raise exception if there''s no output from the device
if fail_counter > 10:
raise Exception("Capture: exceeded number of tries to capture the frame.")
# Delay before we get a new frame
time.sleep(delay)
return snapshot
def get_size(self):
"""
:return: size of the captured image
"""
return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)
def get_stream_function(self):
"""
Returns stream_function object function
"""
def stream_function(image, finished):
"""
Function keeps capturing frames until finished = 1
:param image: shared numpy array for multiprocessing(see multiprocessing.Array)
:param finished: synchronized wrapper for int(see multiprocessing.Value)
:return: nothing
"""
# Incorrect input array
if image.shape != self.get_size():
raise Exception("Capture: improper size of the input image")
print("Capture: start streaming")
# Capture frame until we get finished flag set to True
while not finished.value:
image[:, :, :] = self._proper_frame(self._delay)
# Release the device
self.release()
return stream_function
def release(self):
self._cap.release()
def main():
# Add program arguments
parser = argparse.ArgumentParser(description=''Captures the video from the webcamera and /nwrites it into the output file with predefined fps.'', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(''-output'', dest="output", default="output.avi", help=''name of the output video file'')
parser.add_argument(''-log'', dest="log", default="frames.log", help=''name of the log file'')
parser.add_argument(''-fps'', dest="fps", default=25., help=''frames per second value'')
# Read the arguments if any
result = parser.parse_args()
fps = float(result.fps)
output = result.output
log = result.log
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()
# Define shared variables(which are synchronised so race condition is excluded)
shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
frame = np.ctypeslib.as_array(shared_array_base.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
finished = Value(''i'', 0)
# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start() # Launch capture process
# Sleep for some time to allow videocapture start working first
time.sleep(2)
# Termination function
def terminate():
print("Main: termination")
finished.value = True
# Wait for all processes to finish
time.sleep(1)
# Terminate working processes
video_process.terminate()
# The capturing works until keyboard interrupt is pressed.
while True:
try:
# Display the resulting frame
cv2.imshow(''frame'', frame)
cv2.waitKey(1) # Display it at least one ms before going to the next frame
time.sleep(0.1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
terminate()
break
if __name__ == ''__main__'':
main()
Esto funciona bien en Linux, pero en OSX tengo problemas porque parece que no puede hacer un .read()
en el objeto creado cv2.VideoCapture(device)
(almacenado en el var self._cap
).
Después de buscar, encontré esta respuesta SO , que sugiere usar Billiard , un reemplazo para el multiprocesamiento de pitones que supuestamente tiene algunas mejoras muy útiles. Así que en la parte superior del archivo simplemente agregué la importación después de mi importación previa de multiprocesamiento (anulando de manera efectiva el multiprocessing.Process
):
from billiard import Process, forking_enable
y justo antes de la instanciación de la variable forking_enable
utilizo forking_enable
siguiente manera:
forking_enable(0) # Supposedly this is all I need for billiard to do it''s magic
video_process = Process(target=stream, args=(frame, finished))
Entonces en esta versión ( aquí en pastebin ) volví a ejecutar el archivo, lo que me da este error:
pickle.PicklingError: Can not pickle: no se encuentra como main .stream_function
Una búsqueda de ese error me llevó a una pregunta de SO con una larga lista de respuestas, de las cuales una me sugirió utilizar la serialización de eneldo lib para resolver esto. Sin embargo, esa lib debe usarse con la horquilla de multiprocesamiento de Pathos . Así que simplemente intenté cambiar mi línea de importación multiproceso
from multiprocessing import Array, Value, Process
a
from pathos.multiprocessing import Array, Value, Process
Pero ninguno de Array
, Value
and Process
parece existir en el paquete pathos.multiprocessing
.
Y desde este punto estoy totalmente perdido. Estoy buscando cosas de las que apenas tengo conocimiento suficiente, y ni siquiera sé en qué dirección necesito buscar o depurar más.
Entonces, ¿puede alguien más brillante que yo ayudarme a capturar video en un proceso separado? ¡Todos los consejos son bienvenidos!
El principal desafío con multiprocessing
es comprender el modelo de memoria en el caso de espacios de direcciones de memoria separados.
Python hace que las cosas sean aún más confusas ya que abstrae muchos de estos aspectos que ocultan varios mecanismos con pocas API de aspecto inocente.
Cuando escribes esta lógica:
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()
...
# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start() # Launch capture process
Está pasando a la función Process
stream_function
que hace referencia a componentes internos de la clase self.get_size
( self.get_size
) pero, lo que es más importante, que no está disponible como función de nivel superior .
El proceso hijo no podrá volver a construir el objeto requerido, ya que lo que recibe es solo el nombre de una función. Intenta buscarlo en el módulo principal, de ahí el mensaje:
pickle.PicklingError: Can not pickle: no se encuentra como main.stream_function
El proceso secundario intenta resolver la función en el módulo principal como main.stream_function
y falla la búsqueda.
Mi primera sugerencia sería cambiar tu lógica para que pases al proceso hijo el método que devuelve la función stream_function
.
video_process = Process(target=cap.get_stream_function, args=(...))
Sin embargo, es posible que aún encuentre problemas ya que está compartiendo el estado entre los dos procesos.
Lo que generalmente sugiero a las personas cuando se acercan a los paradigmas de multiprocesamiento en Python es pensar en los procesos como si se ejecutaran en máquinas separadas. En estos casos, sería definitivamente obvio que su arquitectura es problemática.
Le recomendaría que separe las responsabilidades de los dos procesos, asegurándose de que un proceso (el niño) se ocupe de la captura completa del video y el otro (el padre o un tercer proceso) se ocupe del procesamiento de los marcos.
Este paradigma se conoce como el Productor y el Problema del consumidor y es muy adecuado para su sistema. El proceso de captura de video sería el productor y el otro el consumidor. Un simple multiprocessing.Pipe
o multiprocessing.Queue
aseguraría que los marcos se transfieran del productor al consumidor tan pronto como estén listos.
Agregando un ejemplo en pseudo-código ya que no conozco las API de captura de video. El punto es lidiar con toda la lógica de captura de video en el proceso del productor, abstrayéndolo del consumidor. Lo único que el consumidor necesita saber es que recibe un objeto de marco a través de una tubería.
def capture_video(writer):
"""This runs in the producer process."""
# The VideoCapture class wraps the video acquisition logic
cap = VideoCapture()
while True:
frame = cap.get_next_frame() # the method returns the next frame
writer.send(frame) # send the new frame to the consumer process
def main():
reader, writer = multiprocessing.Pipe(False)
# producer process
video_process = Process(target=capture_video, args=[writer])
video_process.start() # Launch capture process
while True:
try:
frame = reader.recv() # receive next frame from the producer
process_frame(frame)
except KeyboardInterrupt:
video_process.terminate()
break
Tenga en cuenta que no hay un estado compartido entre los procesos (no es necesario compartir ninguna matriz). La comunicación pasa por Pipes y es unidireccional haciendo que la lógica sea muy simple. Como dije antes, esta lógica funcionaría también en diferentes máquinas. Solo necesitarías reemplazar la tubería con un enchufe.
Es posible que desee un enfoque de terminación más limpio para el proceso del productor. Le sugiero que use un multiprocessing.Event
. multiprocessing.Event
. Simplemente while not event.is_set()
desde el elemento primario en el bloque KeyboardInterrupt
y verifique su estado en el elemento secundario en cada iteración ( while not event.is_set()
).
Su primer problema fue que no podía acceder a la cámara web en un proceso forked
. Surgen varios problemas cuando las bibliotecas externas se usan con fork
ya que la operación fork no limpia todos los descriptores de archivos abiertos por el proceso principal, lo que genera un comportamiento extraño. La biblioteca a menudo es más robusta a este tipo de problema en Linux, pero no es una buena idea compartir un objeto IO como cv2.VideoCapture
entre los 2 procesos.
Cuando usa billard.forking_enabled
y lo establece en False
, le pide a la biblioteca que no use fork
para generar nuevos procesos, sino métodos spawn
o forkserver
, que son más limpios ya que cierran todos los descriptores de archivos pero también son más lentos para comenzar. Esto no debería ser un problema en tu caso. Si está usando python3.4+
, puede hacerlo usando multiprocessing.set_start_method(''forkserver'')
por ejemplo.
Cuando utiliza uno de estos métodos, la función de destino y los argumentos deben ser serializados para pasarlos al proceso hijo. La serialización se realiza de forma predeterminada utilizando pickle
, que tiene varios flujos, como el que mencionó, sin poder serializar objetos definidos localmente y también cv2.VideoCapture
. Pero puede simplificar su programa para que todos los argumentos de su Process
elegibles. Aquí hay un intento para resolver su problema:
import numpy as np
import time
import ctypes
from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value
import cv2
class VideoCapture:
"""
Class that handles video capture from device or video file
"""
def __init__(self, device=0, delay=0.):
"""
:param device: device index or video filename
:param delay: delay between frame captures in seconds(float allowed)
"""
self._delay = delay
self._device = device
self._cap = cv2.VideoCapture(device)
assert self._cap.isOpened()
def __getstate__(self):
self._cap.release()
return (self._delay, self._device)
def __setstate__(self, state):
self._delay, self._device = state
self._cap = cv2.VideoCapture(self._device)
assert self._cap.grab(), "The child could not grab the video capture"
def _proper_frame(self, delay=None):
"""
:param delay: delay between frames capture(in seconds)
:param finished: synchronized wrapper for int
:return: frame
"""
snapshot = None
correct_img = False
fail_counter = -1
while not correct_img:
# Capture the frame
correct_img, snapshot = self._cap.read()
fail_counter += 1
# Raise exception if there''s no output from the device
if fail_counter > 10:
raise Exception("Capture: exceeded number of tries to capture "
"the frame.")
# Delay before we get a new frame
time.sleep(delay)
return snapshot
def get_size(self):
"""
:return: size of the captured image
"""
return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)
def release(self):
self._cap.release()
def stream(capturer, image, finished):
"""
Function keeps capturing frames until finished = 1
:param image: shared numpy array for multiprocessing
:param finished: synchronized wrapper for int
:return: nothing
"""
shape = capturer.get_size()
# Define shared variables
frame = np.ctypeslib.as_array(image.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
# Incorrect input array
if frame.shape != capturer.get_size():
raise Exception("Capture: improper size of the input image")
print("Capture: start streaming")
# Capture frame until we get finished flag set to True
while not finished.value:
frame[:, :, :] = capturer._proper_frame(capturer._delay)
# Release the device
capturer.release()
def main():
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
# Define shared variables
shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
frame = np.ctypeslib.as_array(shared_array_base.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
finished = Value(''i'', 0)
# Start processes which run in parallel
video_process = Process(target=stream,
args=(cap, shared_array_base, finished))
video_process.start() # Launch capture process
# Sleep for some time to allow videocapture start working first
time.sleep(2)
# Termination function
def terminate():
print("Main: termination")
finished.value = True
# Wait for all processes to finish
time.sleep(1)
# Terminate working processes
video_process.join()
# The capturing works until keyboard interrupt is pressed.
while True:
try:
# Display the resulting frame
cv2.imshow(''frame'', frame)
# Display it at least one ms before going to the next frame
time.sleep(0.1)
cv2.waitKey(1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
terminate()
break
if __name__ == ''__main__'':
set_start_method("spawn")
main()
No pude probarlo en mac en este momento, por lo que podría no funcionar de la caja, pero no debería haber errores relacionados con multiprocessing
. Algunas notas:
- I instancia el objeto
cv2.VideoCapture
en el nuevo elemento secundario y agarro la cámara ya que solo un proceso debe leer desde la cámara. - Tal vez el problema en su primer programa con
fork
solo se deba alcv2.VideoCapture
compartido.cv2.VideoCapture
ycv2.VideoCapture
en la funciónstream
resuelva su problema. - No se puede pasar la envoltura numpy al niño, ya que no compartirá el búfer
mp.Array
(esto es realmente extraño y me tomó un tiempo darme cuenta). Necesita pasar explícitamente laArray
y volver a crear un contenedor. Tal vez el problema en su primer programa con
fork
solo se deba alcv2.VideoCapture
compartido.cv2.VideoCapture
ycv2.VideoCapture
en la funciónstream
resuelva su problema.Supuse que estaba ejecutando su código en
python3.4+
así que no usébillard
pero usarforking_enabled(False)
lugar deset_start_method
debería ser similar.
¡Avíseme si esto funciona!