Problemas de clasificación de imágenes en tiempo real de Python con redes neuronales
multiprocessing deep-learning (4)
Parece que la envoltura python de Caffe bloquea el bloqueo global de intérprete (GIL) . Por lo tanto, al llamar a cualquier comando caffe python, se bloquean TODOS los hilos pitón.
Una solución (a su propio riesgo) sería deshabilitar la GIL para funciones específicas de caffe. Por ejemplo, si desea poder forward
sin bloqueo, puede editar $CAFFE_ROOT/python/caffe/_caffe.cpp
. Añade esta función:
void Net_Forward(Net<Dtype>& net, int start, int end) {
Py_BEGIN_ALLOW_THREADS; // <-- disable GIL
net.ForwardFromTo(start, end);
Py_END_ALLOW_THREADS; // <-- restore GIL
}
Y reemplace .def("_forward", &Net<Dtype>::ForwardFromTo)
con:
.def("_forward", &Net_Forward)
No te olvides de make pycaffe
después del cambio.
Mira this para más detalles.
Estoy intentando usar caffe y python para hacer una clasificación de imágenes en tiempo real. Estoy usando OpenCV para transmitir desde mi cámara web en un proceso, y en un proceso separado, usando caffe para realizar la clasificación de imágenes en los cuadros extraídos de la cámara web. Luego, estoy pasando el resultado de la clasificación al hilo principal para subtitular el flujo de la cámara web.
El problema es que a pesar de que tengo una GPU NVIDIA y estoy realizando las predicciones de caffe en la GPU, el hilo principal se ralentiza. Normalmente, sin hacer ninguna predicción, la transmisión de mi cámara web se ejecuta a 30 fps; sin embargo, con las predicciones, la transmisión de mi cámara web alcanza, en el mejor de los casos, 15 fps.
He comprobado que caffe está utilizando la GPU al realizar las predicciones, y que mi GPU o la memoria de mi GPU no se está agotando. También he verificado que mis núcleos de CPU no se agotan en ningún momento durante el programa. Me pregunto si estoy haciendo algo mal o si no hay manera de mantener estos 2 procesos verdaderamente separados. Cualquier consejo es apreciado. Aquí está mi código de referencia
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
#other initialization stuff
def run(self):
caffe.set_mode_gpu()
caffe.set_device(0)
#Load caffe net -- code omitted
while True:
image = self.task_queue.get()
#crop image -- code omitted
text = net.predict(image)
self.result_queue.put(text)
return
import cv2
import caffe
import multiprocessing
import Queue
tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()
#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
rval, frame = vc.read()
else:
rval = False
frame_copy[:] = frame
task_empty = True
while rval:
if task_empty:
tasks.put(frame_copy)
task_empty = False
if not results.empty():
text = results.get()
#Add text to frame
cv2.putText(frame,text)
task_empty = True
#Showing the frame with all the applied modifications
cv2.imshow("preview", frame)
#Getting next frame from camera
rval, frame = vc.read()
frame_copy[:] = frame
#Getting keyboard input
key = cv2.waitKey(1)
#exit on ESC
if key == 27:
break
Estoy bastante seguro de que es la predicción de caffe que ralentiza todo, porque cuando comento la predicción y paso el texto ficticio entre los procesos, obtengo 30 fps de nuevo.
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
#other initialization stuff
def run(self):
caffe.set_mode_gpu()
caffe.set_device(0)
#Load caffe net -- code omitted
while True:
image = self.task_queue.get()
#crop image -- code omitted
#text = net.predict(image)
text = "dummy text"
self.result_queue.put(text)
return
import cv2
import caffe
import multiprocessing
import Queue
tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()
#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
rval, frame = vc.read()
else:
rval = False
frame_copy[:] = frame
task_empty = True
while rval:
if task_empty:
tasks.put(frame_copy)
task_empty = False
if not results.empty():
text = results.get()
#Add text to frame
cv2.putText(frame,text)
task_empty = True
#Showing the frame with all the applied modifications
cv2.imshow("preview", frame)
#Getting next frame from camera
rval, frame = vc.read()
frame_copy[:] = frame
#Getting keyboard input
key = cv2.waitKey(1)
#exit on ESC
if key == 27:
break
Pruebe el enfoque de múltiples subprocesos en lugar del multiprocesamiento. Los procesos de desove son más lentos que el desove en los hilos. Una vez que se están ejecutando, no hay mucha diferencia. En su caso, creo que el enfoque de enhebrado se beneficiará ya que hay tantos datos de cuadros involucrados.
Se podría pensar en el código, es decir, funciona en modo gpu para la primera llamada y en llamadas posteriores calcula la clasificación en modo cpu, ya que es el modo predeterminado. En la versión anterior de caffe, el modo gpu establecido por una vez era suficiente, ahora la versión más reciente necesita establecer el modo cada vez. Puedes probar con el siguiente cambio:
def run(self):
#Load caffe net -- code omitted
while True:
caffe.set_mode_gpu()
caffe.set_device(0)
image = self.task_queue.get()
#crop image -- code omitted
text = net.predict(image)
self.result_queue.put(text)
return
También, por favor, eche un vistazo a los tiempos gpu mientras se está ejecutando el hilo del consumidor. Puedes usar el siguiente comando para nvidia:
nvidia-smi
El comando anterior le mostrará la utilización de gpu en tiempo de ejecución.
Si no resuelve otra solución, haga que el código de extracción del marco de opencv esté debajo de un hilo. Dado que está relacionado con la E / S y el acceso al dispositivo, puede obtener beneficios ejecutándolo en un subproceso separado del subproceso de la GUI / subproceso principal. Ese hilo empujará los marcos en una cola y el hilo del consumidor actual lo predecirá. En ese caso maneja con cuidado la cola con bloque crítico.
Algunas explicaciones y algunas reflexiones:
Intel Core i5-6300HQ @2.3GHz
mi código a continuación en una computadora portátil con unIntel Core i5-6300HQ @2.3GHz
,8 GB RAM
yNVIDIA GeForce GTX 960M
gpu (memoria de 2GB), y el resultado fue:Si ejecuté el código con caffe corriendo o no (al comentar o no
net_output = this->net_->Forward(net_input)
y algunas cosas necesarias en elvoid Consumer::entry()
), siempre pude obtener alrededor de 30 fps en El hilo principal.El resultado similar se obtuvo en una PC con una
Intel Core i5-4440
,8 GB RAM
,NVIDIA GeForce GT 630
gpu (1 GB de memoria).@user3543300 el código de @user3543300 en la pregunta en la misma computadora portátil, el resultado fue:
Si caffe se estaba ejecutando (en gpu) o no, también podría obtener alrededor de 30 fps.
Según los comentarios de @user3543300 , con las 2 versiones de código mencionadas anteriormente, @ user3543300 podría obtener solo alrededor de 15 fps, cuando ejecuta caffe (en una
Nvidia GeForce 940MX GPU and Intel® Core™ i7-6500U CPU @ 2.50GHz × 4
ordenador portátil). Y también habrá una desaceleración de la velocidad de cuadros de la cámara web cuando se ejecuta caffe en gpu como un programa independiente.
Así que sigo pensando que el problema puede estar en las limitaciones de E / S del hardware, como el ancho de banda DMA (este hilo sobre DMA puede indicar) o el ancho de banda de la RAM. Hope @user3543300 puede verificar esto o descubrir el verdadero problema del que no me he dado cuenta.
Si el problema es realmente lo que pienso más arriba, un pensamiento sensato sería reducir la sobrecarga de E / S de memoria introducida por la red CNN. De hecho, para resolver el problema similar en sistemas integrados con recursos de hardware limitados, se han realizado algunas investigaciones sobre este tema, por ejemplo, Redes Neuronales Profundas Estructuralmente SqueezeNet , SqueezeNet , Deep-Compression . Con suerte, también ayudará a mejorar la velocidad de fotogramas de la cámara web en la pregunta mediante la aplicación de estas habilidades.
Respuesta Original:
Prueba esta solución c ++. Utiliza subprocesos para la sobrecarga de E / S en su tarea, lo probé usando bvlc_alexnet.caffemodel
, deploy.prototxt para hacer la clasificación de imagen y no deploy.prototxt una desaceleración obvia del hilo principal (flujo de cámara web) cuando se ejecutaba caffe (en GPU )
#include <stdio.h>
#include <iostream>
#include <string>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include "caffe/caffe.hpp"
#include "caffe/util/blocking_queue.hpp"
#include "caffe/data_transformer.hpp"
#include "opencv2/opencv.hpp"
using namespace cv;
//Queue pair for sharing image/results between webcam and caffe threads
template<typename T>
class QueuePair {
public:
explicit QueuePair(int size);
~QueuePair();
caffe::BlockingQueue<T*> free_;
caffe::BlockingQueue<T*> full_;
DISABLE_COPY_AND_ASSIGN(QueuePair);
};
template<typename T>
QueuePair<T>::QueuePair(int size) {
// Initialize the free queue
for (int i = 0; i < size; ++i) {
free_.push(new T);
}
}
template<typename T>
QueuePair<T>::~QueuePair(){
T *data;
while (free_.try_pop(&data)){
delete data;
}
while (full_.try_pop(&data)){
delete data;
}
}
template class QueuePair<Mat>;
template class QueuePair<std::string>;
//Do image classification(caffe predict) using a subthread
class Consumer{
public:
Consumer(boost::shared_ptr<QueuePair<Mat>> task
, boost::shared_ptr<QueuePair<std::string>> result);
~Consumer();
void Run();
void Stop();
void entry(boost::shared_ptr<QueuePair<Mat>> task
, boost::shared_ptr<QueuePair<std::string>> result);
private:
bool must_stop();
boost::shared_ptr<QueuePair<Mat> > task_q_;
boost::shared_ptr<QueuePair<std::string> > result_q_;
//caffe::Blob<float> *net_input_blob_;
boost::shared_ptr<caffe::DataTransformer<float> > data_transformer_;
boost::shared_ptr<caffe::Net<float> > net_;
std::vector<std::string> synset_words_;
boost::shared_ptr<boost::thread> thread_;
};
Consumer::Consumer(boost::shared_ptr<QueuePair<Mat>> task
, boost::shared_ptr<QueuePair<std::string>> result) :
task_q_(task), result_q_(result), thread_(){
//for data preprocess
caffe::TransformationParameter trans_para;
//set mean
trans_para.set_mean_file("/path/to/imagenet_mean.binaryproto");
//set crop size, here is cropping 227x227 from 256x256
trans_para.set_crop_size(227);
//instantiate a DataTransformer using trans_para for image preprocess
data_transformer_.reset(new caffe::DataTransformer<float>(trans_para
, caffe::TEST));
//initialize a caffe net
net_.reset(new caffe::Net<float>(std::string("/path/to/deploy.prototxt")
, caffe::TEST));
//net parameter
net_->CopyTrainedLayersFrom(std::string("/path/to/bvlc_alexnet.caffemodel"));
std::fstream synset_word("path/to/caffe/data/ilsvrc12/synset_words.txt");
std::string line;
if (!synset_word.good()){
std::cerr << "synset words open failed!" << std::endl;
}
while (std::getline(synset_word, line)){
synset_words_.push_back(line.substr(line.find_first_of('' ''), line.length()));
}
//a container for net input, holds data converted from cv::Mat
//net_input_blob_ = new caffe::Blob<float>(1, 3, 227, 227);
}
Consumer::~Consumer(){
Stop();
//delete net_input_blob_;
}
void Consumer::entry(boost::shared_ptr<QueuePair<Mat>> task
, boost::shared_ptr<QueuePair<std::string>> result){
caffe::Caffe::set_mode(caffe::Caffe::GPU);
caffe::Caffe::SetDevice(0);
cv::Mat *frame;
cv::Mat resized_image(256, 256, CV_8UC3);
cv::Size re_size(resized_image.cols, resized_image.rows);
//for caffe input and output
const std::vector<caffe::Blob<float> *> net_input = this->net_->input_blobs();
std::vector<caffe::Blob<float> *> net_output;
//net_input.push_back(net_input_blob_);
std::string *res;
int pre_num = 1;
while (!must_stop()){
std::stringstream result_strm;
frame = task->full_.pop();
cv::resize(*frame, resized_image, re_size, 0, 0, CV_INTER_LINEAR);
this->data_transformer_->Transform(resized_image, *net_input[0]);
net_output = this->net_->Forward();
task->free_.push(frame);
res = result->free_.pop();
//Process results here
for (int i = 0; i < pre_num; ++i){
result_strm << synset_words_[net_output[0]->cpu_data()[i]] << " "
<< net_output[0]->cpu_data()[i + pre_num] << "/n";
}
*res = result_strm.str();
result->full_.push(res);
}
}
void Consumer::Run(){
if (!thread_){
try{
thread_.reset(new boost::thread(&Consumer::entry, this, task_q_, result_q_));
}
catch (std::exception& e) {
std::cerr << "Thread exception: " << e.what() << std::endl;
}
}
else
std::cout << "Consumer thread may have been running!" << std::endl;
};
void Consumer::Stop(){
if (thread_ && thread_->joinable()){
thread_->interrupt();
try {
thread_->join();
}
catch (boost::thread_interrupted&) {
}
catch (std::exception& e) {
std::cerr << "Thread exception: " << e.what() << std::endl;
}
}
}
bool Consumer::must_stop(){
return thread_ && thread_->interruption_requested();
}
int main(void)
{
int max_queue_size = 1000;
boost::shared_ptr<QueuePair<Mat>> tasks(new QueuePair<Mat>(max_queue_size));
boost::shared_ptr<QueuePair<std::string>> results(new QueuePair<std::string>(max_queue_size));
char str[100], info_str[100] = " results: ";
VideoCapture vc(0);
if (!vc.isOpened())
return -1;
Consumer consumer(tasks, results);
consumer.Run();
Mat frame, *frame_copy;
namedWindow("preview");
double t, fps;
while (true){
t = (double)getTickCount();
vc.read(frame);
if (waitKey(1) >= 0){
consuer.Stop();
break;
}
if (tasks->free_.try_peek(&frame_copy)){
frame_copy = tasks->free_.pop();
*frame_copy = frame.clone();
tasks->full_.push(frame_copy);
}
std::string *res;
std::string frame_info("");
if (results->full_.try_peek(&res)){
res = results->full_.pop();
frame_info = frame_info + info_str;
frame_info = frame_info + *res;
results->free_.push(res);
}
t = ((double)getTickCount() - t) / getTickFrequency();
fps = 1.0 / t;
sprintf(str, " fps: %.2f", fps);
frame_info = frame_info + str;
putText(frame, frame_info, Point(5, 20)
, FONT_HERSHEY_SIMPLEX, 0.5, Scalar(0, 255, 0));
imshow("preview", frame);
}
}
Y en src/caffe/util/blocking_queue.cpp , haga un pequeño cambio a continuación y reconstruya caffe:
...//Other stuff
template class BlockingQueue<Batch<float>*>;
template class BlockingQueue<Batch<double>*>;
template class BlockingQueue<Datum*>;
template class BlockingQueue<shared_ptr<DataReader::QueuePair> >;
template class BlockingQueue<P2PSync<float>*>;
template class BlockingQueue<P2PSync<double>*>;
//add these 2 lines below
template class BlockingQueue<cv::Mat*>;
template class BlockingQueue<std::string*>;