python multiprocessing deep-learning caffe gpgpu

Problemas de clasificación de imágenes en tiempo real de Python con redes neuronales



multiprocessing deep-learning (4)

Parece que la envoltura python de Caffe bloquea el bloqueo global de intérprete (GIL) . Por lo tanto, al llamar a cualquier comando caffe python, se bloquean TODOS los hilos pitón.

Una solución (a su propio riesgo) sería deshabilitar la GIL para funciones específicas de caffe. Por ejemplo, si desea poder forward sin bloqueo, puede editar $CAFFE_ROOT/python/caffe/_caffe.cpp . Añade esta función:

void Net_Forward(Net<Dtype>& net, int start, int end) { Py_BEGIN_ALLOW_THREADS; // <-- disable GIL net.ForwardFromTo(start, end); Py_END_ALLOW_THREADS; // <-- restore GIL }

Y reemplace .def("_forward", &Net<Dtype>::ForwardFromTo) con:

.def("_forward", &Net_Forward)

No te olvides de make pycaffe después del cambio.

Mira this para más detalles.

Estoy intentando usar caffe y python para hacer una clasificación de imágenes en tiempo real. Estoy usando OpenCV para transmitir desde mi cámara web en un proceso, y en un proceso separado, usando caffe para realizar la clasificación de imágenes en los cuadros extraídos de la cámara web. Luego, estoy pasando el resultado de la clasificación al hilo principal para subtitular el flujo de la cámara web.

El problema es que a pesar de que tengo una GPU NVIDIA y estoy realizando las predicciones de caffe en la GPU, el hilo principal se ralentiza. Normalmente, sin hacer ninguna predicción, la transmisión de mi cámara web se ejecuta a 30 fps; sin embargo, con las predicciones, la transmisión de mi cámara web alcanza, en el mejor de los casos, 15 fps.

He comprobado que caffe está utilizando la GPU al realizar las predicciones, y que mi GPU o la memoria de mi GPU no se está agotando. También he verificado que mis núcleos de CPU no se agotan en ningún momento durante el programa. Me pregunto si estoy haciendo algo mal o si no hay manera de mantener estos 2 procesos verdaderamente separados. Cualquier consejo es apreciado. Aquí está mi código de referencia

class Consumer(multiprocessing.Process): def __init__(self, task_queue, result_queue): multiprocessing.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue #other initialization stuff def run(self): caffe.set_mode_gpu() caffe.set_device(0) #Load caffe net -- code omitted while True: image = self.task_queue.get() #crop image -- code omitted text = net.predict(image) self.result_queue.put(text) return import cv2 import caffe import multiprocessing import Queue tasks = multiprocessing.Queue() results = multiprocessing.Queue() consumer = Consumer(tasks,results) consumer.start() #Creating window and starting video capturer from camera cv2.namedWindow("preview") vc = cv2.VideoCapture(0) #Try to get the first frame if vc.isOpened(): rval, frame = vc.read() else: rval = False frame_copy[:] = frame task_empty = True while rval: if task_empty: tasks.put(frame_copy) task_empty = False if not results.empty(): text = results.get() #Add text to frame cv2.putText(frame,text) task_empty = True #Showing the frame with all the applied modifications cv2.imshow("preview", frame) #Getting next frame from camera rval, frame = vc.read() frame_copy[:] = frame #Getting keyboard input key = cv2.waitKey(1) #exit on ESC if key == 27: break

Estoy bastante seguro de que es la predicción de caffe que ralentiza todo, porque cuando comento la predicción y paso el texto ficticio entre los procesos, obtengo 30 fps de nuevo.

class Consumer(multiprocessing.Process): def __init__(self, task_queue, result_queue): multiprocessing.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue #other initialization stuff def run(self): caffe.set_mode_gpu() caffe.set_device(0) #Load caffe net -- code omitted while True: image = self.task_queue.get() #crop image -- code omitted #text = net.predict(image) text = "dummy text" self.result_queue.put(text) return import cv2 import caffe import multiprocessing import Queue tasks = multiprocessing.Queue() results = multiprocessing.Queue() consumer = Consumer(tasks,results) consumer.start() #Creating window and starting video capturer from camera cv2.namedWindow("preview") vc = cv2.VideoCapture(0) #Try to get the first frame if vc.isOpened(): rval, frame = vc.read() else: rval = False frame_copy[:] = frame task_empty = True while rval: if task_empty: tasks.put(frame_copy) task_empty = False if not results.empty(): text = results.get() #Add text to frame cv2.putText(frame,text) task_empty = True #Showing the frame with all the applied modifications cv2.imshow("preview", frame) #Getting next frame from camera rval, frame = vc.read() frame_copy[:] = frame #Getting keyboard input key = cv2.waitKey(1) #exit on ESC if key == 27: break


Pruebe el enfoque de múltiples subprocesos en lugar del multiprocesamiento. Los procesos de desove son más lentos que el desove en los hilos. Una vez que se están ejecutando, no hay mucha diferencia. En su caso, creo que el enfoque de enhebrado se beneficiará ya que hay tantos datos de cuadros involucrados.


Se podría pensar en el código, es decir, funciona en modo gpu para la primera llamada y en llamadas posteriores calcula la clasificación en modo cpu, ya que es el modo predeterminado. En la versión anterior de caffe, el modo gpu establecido por una vez era suficiente, ahora la versión más reciente necesita establecer el modo cada vez. Puedes probar con el siguiente cambio:

def run(self): #Load caffe net -- code omitted while True: caffe.set_mode_gpu() caffe.set_device(0) image = self.task_queue.get() #crop image -- code omitted text = net.predict(image) self.result_queue.put(text) return

También, por favor, eche un vistazo a los tiempos gpu mientras se está ejecutando el hilo del consumidor. Puedes usar el siguiente comando para nvidia:

nvidia-smi

El comando anterior le mostrará la utilización de gpu en tiempo de ejecución.

Si no resuelve otra solución, haga que el código de extracción del marco de opencv esté debajo de un hilo. Dado que está relacionado con la E / S y el acceso al dispositivo, puede obtener beneficios ejecutándolo en un subproceso separado del subproceso de la GUI / subproceso principal. Ese hilo empujará los marcos en una cola y el hilo del consumidor actual lo predecirá. En ese caso maneja con cuidado la cola con bloque crítico.


Algunas explicaciones y algunas reflexiones:

  1. Intel Core i5-6300HQ @2.3GHz mi código a continuación en una computadora portátil con un Intel Core i5-6300HQ @2.3GHz , 8 GB RAM y NVIDIA GeForce GTX 960M gpu (memoria de 2GB), y el resultado fue:

    Si ejecuté el código con caffe corriendo o no (al comentar o no net_output = this->net_->Forward(net_input) y algunas cosas necesarias en el void Consumer::entry() ), siempre pude obtener alrededor de 30 fps en El hilo principal.

    El resultado similar se obtuvo en una PC con una Intel Core i5-4440 , 8 GB RAM , NVIDIA GeForce GT 630 gpu (1 GB de memoria).

  2. @user3543300 el código de @user3543300 en la pregunta en la misma computadora portátil, el resultado fue:

    Si caffe se estaba ejecutando (en gpu) o no, también podría obtener alrededor de 30 fps.

  3. Según los comentarios de @user3543300 , con las 2 versiones de código mencionadas anteriormente, @ user3543300 podría obtener solo alrededor de 15 fps, cuando ejecuta caffe (en una Nvidia GeForce 940MX GPU and Intel® Core™ i7-6500U CPU @ 2.50GHz × 4 ordenador portátil). Y también habrá una desaceleración de la velocidad de cuadros de la cámara web cuando se ejecuta caffe en gpu como un programa independiente.

Así que sigo pensando que el problema puede estar en las limitaciones de E / S del hardware, como el ancho de banda DMA (este hilo sobre DMA puede indicar) o el ancho de banda de la RAM. Hope @user3543300 puede verificar esto o descubrir el verdadero problema del que no me he dado cuenta.

Si el problema es realmente lo que pienso más arriba, un pensamiento sensato sería reducir la sobrecarga de E / S de memoria introducida por la red CNN. De hecho, para resolver el problema similar en sistemas integrados con recursos de hardware limitados, se han realizado algunas investigaciones sobre este tema, por ejemplo, Redes Neuronales Profundas Estructuralmente SqueezeNet , SqueezeNet , Deep-Compression . Con suerte, también ayudará a mejorar la velocidad de fotogramas de la cámara web en la pregunta mediante la aplicación de estas habilidades.

Respuesta Original:

Prueba esta solución c ++. Utiliza subprocesos para la sobrecarga de E / S en su tarea, lo probé usando bvlc_alexnet.caffemodel , deploy.prototxt para hacer la clasificación de imagen y no deploy.prototxt una desaceleración obvia del hilo principal (flujo de cámara web) cuando se ejecutaba caffe (en GPU )

#include <stdio.h> #include <iostream> #include <string> #include <boost/thread.hpp> #include <boost/shared_ptr.hpp> #include "caffe/caffe.hpp" #include "caffe/util/blocking_queue.hpp" #include "caffe/data_transformer.hpp" #include "opencv2/opencv.hpp" using namespace cv; //Queue pair for sharing image/results between webcam and caffe threads template<typename T> class QueuePair { public: explicit QueuePair(int size); ~QueuePair(); caffe::BlockingQueue<T*> free_; caffe::BlockingQueue<T*> full_; DISABLE_COPY_AND_ASSIGN(QueuePair); }; template<typename T> QueuePair<T>::QueuePair(int size) { // Initialize the free queue for (int i = 0; i < size; ++i) { free_.push(new T); } } template<typename T> QueuePair<T>::~QueuePair(){ T *data; while (free_.try_pop(&data)){ delete data; } while (full_.try_pop(&data)){ delete data; } } template class QueuePair<Mat>; template class QueuePair<std::string>; //Do image classification(caffe predict) using a subthread class Consumer{ public: Consumer(boost::shared_ptr<QueuePair<Mat>> task , boost::shared_ptr<QueuePair<std::string>> result); ~Consumer(); void Run(); void Stop(); void entry(boost::shared_ptr<QueuePair<Mat>> task , boost::shared_ptr<QueuePair<std::string>> result); private: bool must_stop(); boost::shared_ptr<QueuePair<Mat> > task_q_; boost::shared_ptr<QueuePair<std::string> > result_q_; //caffe::Blob<float> *net_input_blob_; boost::shared_ptr<caffe::DataTransformer<float> > data_transformer_; boost::shared_ptr<caffe::Net<float> > net_; std::vector<std::string> synset_words_; boost::shared_ptr<boost::thread> thread_; }; Consumer::Consumer(boost::shared_ptr<QueuePair<Mat>> task , boost::shared_ptr<QueuePair<std::string>> result) : task_q_(task), result_q_(result), thread_(){ //for data preprocess caffe::TransformationParameter trans_para; //set mean trans_para.set_mean_file("/path/to/imagenet_mean.binaryproto"); //set crop size, here is cropping 227x227 from 256x256 trans_para.set_crop_size(227); //instantiate a DataTransformer using trans_para for image preprocess data_transformer_.reset(new caffe::DataTransformer<float>(trans_para , caffe::TEST)); //initialize a caffe net net_.reset(new caffe::Net<float>(std::string("/path/to/deploy.prototxt") , caffe::TEST)); //net parameter net_->CopyTrainedLayersFrom(std::string("/path/to/bvlc_alexnet.caffemodel")); std::fstream synset_word("path/to/caffe/data/ilsvrc12/synset_words.txt"); std::string line; if (!synset_word.good()){ std::cerr << "synset words open failed!" << std::endl; } while (std::getline(synset_word, line)){ synset_words_.push_back(line.substr(line.find_first_of('' ''), line.length())); } //a container for net input, holds data converted from cv::Mat //net_input_blob_ = new caffe::Blob<float>(1, 3, 227, 227); } Consumer::~Consumer(){ Stop(); //delete net_input_blob_; } void Consumer::entry(boost::shared_ptr<QueuePair<Mat>> task , boost::shared_ptr<QueuePair<std::string>> result){ caffe::Caffe::set_mode(caffe::Caffe::GPU); caffe::Caffe::SetDevice(0); cv::Mat *frame; cv::Mat resized_image(256, 256, CV_8UC3); cv::Size re_size(resized_image.cols, resized_image.rows); //for caffe input and output const std::vector<caffe::Blob<float> *> net_input = this->net_->input_blobs(); std::vector<caffe::Blob<float> *> net_output; //net_input.push_back(net_input_blob_); std::string *res; int pre_num = 1; while (!must_stop()){ std::stringstream result_strm; frame = task->full_.pop(); cv::resize(*frame, resized_image, re_size, 0, 0, CV_INTER_LINEAR); this->data_transformer_->Transform(resized_image, *net_input[0]); net_output = this->net_->Forward(); task->free_.push(frame); res = result->free_.pop(); //Process results here for (int i = 0; i < pre_num; ++i){ result_strm << synset_words_[net_output[0]->cpu_data()[i]] << " " << net_output[0]->cpu_data()[i + pre_num] << "/n"; } *res = result_strm.str(); result->full_.push(res); } } void Consumer::Run(){ if (!thread_){ try{ thread_.reset(new boost::thread(&Consumer::entry, this, task_q_, result_q_)); } catch (std::exception& e) { std::cerr << "Thread exception: " << e.what() << std::endl; } } else std::cout << "Consumer thread may have been running!" << std::endl; }; void Consumer::Stop(){ if (thread_ && thread_->joinable()){ thread_->interrupt(); try { thread_->join(); } catch (boost::thread_interrupted&) { } catch (std::exception& e) { std::cerr << "Thread exception: " << e.what() << std::endl; } } } bool Consumer::must_stop(){ return thread_ && thread_->interruption_requested(); } int main(void) { int max_queue_size = 1000; boost::shared_ptr<QueuePair<Mat>> tasks(new QueuePair<Mat>(max_queue_size)); boost::shared_ptr<QueuePair<std::string>> results(new QueuePair<std::string>(max_queue_size)); char str[100], info_str[100] = " results: "; VideoCapture vc(0); if (!vc.isOpened()) return -1; Consumer consumer(tasks, results); consumer.Run(); Mat frame, *frame_copy; namedWindow("preview"); double t, fps; while (true){ t = (double)getTickCount(); vc.read(frame); if (waitKey(1) >= 0){ consuer.Stop(); break; } if (tasks->free_.try_peek(&frame_copy)){ frame_copy = tasks->free_.pop(); *frame_copy = frame.clone(); tasks->full_.push(frame_copy); } std::string *res; std::string frame_info(""); if (results->full_.try_peek(&res)){ res = results->full_.pop(); frame_info = frame_info + info_str; frame_info = frame_info + *res; results->free_.push(res); } t = ((double)getTickCount() - t) / getTickFrequency(); fps = 1.0 / t; sprintf(str, " fps: %.2f", fps); frame_info = frame_info + str; putText(frame, frame_info, Point(5, 20) , FONT_HERSHEY_SIMPLEX, 0.5, Scalar(0, 255, 0)); imshow("preview", frame); } }

Y en src/caffe/util/blocking_queue.cpp , haga un pequeño cambio a continuación y reconstruya caffe:

...//Other stuff template class BlockingQueue<Batch<float>*>; template class BlockingQueue<Batch<double>*>; template class BlockingQueue<Datum*>; template class BlockingQueue<shared_ptr<DataReader::QueuePair> >; template class BlockingQueue<P2PSync<float>*>; template class BlockingQueue<P2PSync<double>*>; //add these 2 lines below template class BlockingQueue<cv::Mat*>; template class BlockingQueue<std::string*>;