python multithreading keras deep-learning multiprocessing

python - ¿Cómo se puede aprovechar el multiprocesamiento y multiproceso en el aprendizaje profundo con Keras?



multithreading deep-learning (1)

¡Es bueno que entrenar un modelo no use todo el 100% de su CPU! Ahora tenemos espacio para entrenar varios modelos en paralelo y acelerar sus tiempos de entrenamiento en general.

NB: Si solo desea acelerar este modelo, busque GPU o cambie los hiperparámetros como el tamaño de lote y la cantidad de neuronas (tamaño de capa).

A continuación, le multiprocessing cómo puede utilizar el multiprocessing para capacitar a varios modelos al mismo tiempo (utilizando procesos que se ejecutan en paralelo en cada núcleo de CPU de su máquina).

El multiprocessing.Pool La multiprocessing.Pool básicamente crea un conjunto de trabajos que deben realizarse. Los procesos recogerán estos trabajos y los ejecutarán. Cuando se termina un trabajo, el proceso recogerá otro trabajo del grupo.

import time import signal import multiprocessing def init_worker(): '''''' Add KeyboardInterrupt exception to mutliprocessing workers '''''' signal.signal(signal.SIGINT, signal.SIG_IGN) def train_model(layer_size): '''''' This code is parallelised and runs on each process It trains a model with different layer sizes (hyperparameters) It saves the model and returns the score (error) '''''' import keras from keras.models import Sequential from keras.layers import Dense print(f''Training a model with layer size {layer_size}'') # build your model here model_RNN = Sequential() model_RNN.add(Dense(layer_size)) # fit the model (the bit that takes time!) model_RNN.fit(...) # lets demonstrate with a sleep timer time.sleep(5) # save trained model to a file model_RNN.save(...) # you can also return values eg. the eval score return model_RNN.evaluate(...) num_workers = 4 hyperparams = [800, 960, 1100] pool = multiprocessing.Pool(num_workers, init_worker) scores = pool.map(train_model, hyperparams) print(scores)

Salida:

Training a model with layer size 800 Training a model with layer size 960 Training a model with layer size 1100 [{''size'':960,''score'':1.0}, {''size'':800,''score'':1.2}, {''size'':1100,''score'':0.7}]

Esto se demuestra fácilmente con un time.sleep en el código. Verá que los 3 procesos comienzan el trabajo de capacitación y luego todos terminan casi al mismo tiempo. Si se procesó solo, tendría que esperar a que cada uno termine antes de comenzar el siguiente (¡bostezo!).

EDITAR OP también quería código completo. Esto es difícil en Stack Overflow porque no puedo realizar pruebas en su entorno y con su código. Me he tomado la libertad de copiar y pegar su código en mi plantilla de arriba. Es posible que deba agregar algunas importaciones, pero esto es lo más cerca que se puede llegar al código "ejecutable" y "completo".

import time import signal import numpy as np import pandas as pd import multiprocessing from sklearn.preprocessing import MinMaxScaler from sklearn.model_selection import train_test_split from sklearn.metrics import mean_squared_error from sklearn.metrics import accuracy_score def init_worker(): '''''' Add KeyboardInterrupt exception to mutliprocessing workers '''''' signal.signal(signal.SIGINT, signal.SIG_IGN) def train_model(model_type): '''''' This code is parallelised and runs on each process It trains a model with different layer sizes (hyperparameters) It saves the model and returns the score (error) '''''' from keras.layers import LSTM, SimpleRNN, Dense, Activation from keras.models import Sequential from keras.callbacks import EarlyStopping, ReduceLROnPlateau from keras.layers.normalization import BatchNormalization print(f''Training a model: {model_type}'') callbacks = [ EarlyStopping(patience=10, verbose=1), ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1), ] model = Sequential() if model_type == ''rnn'': model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2]))) elif model_type == ''lstm'': model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2]))) model.add(Dense(480)) model.add(BatchNormalization()) model.add(Activation(''tanh'')) model.compile(loss=''mean_squared_error'', optimizer=''adam'') model.fit( trainX, trainY, epochs=50, batch_size=20, validation_data=(testX, testY), verbose=1, callbacks=callbacks, ) # predict Y_Train_pred = model.predict(trainX) Y_Test_pred = model.predict(testX) train_MSE = mean_squared_error(trainY, Y_Train_pred) test_MSE = mean_squared_error(testY, Y_Test_pred) # you can also return values eg. the eval score return {''type'': model_type, ''train_MSE'': train_MSE, ''test_MSE'': test_MSE} # Your code # --------- df = pd.read_csv("D:/Train.csv", header=None) index = [i for i in list(range(1440)) if i % 3 == 2] Y_train = df[index] df = df.values # making history by using look-back to prediction next def create_dataset(dataset, data_train, look_back=1): dataX, dataY = [], [] print("Len:", len(dataset) - look_back - 1) for i in range(len(dataset) - look_back - 1): a = dataset[i : (i + look_back), :] dataX.append(a) dataY.append(data_train[i + look_back, :]) return np.array(dataX), np.array(dataY) Y_train = np.array(Y_train) df = np.array(df) look_back = 10 trainX, trainY = create_dataset(df, Y_train, look_back=look_back) # Split data into train & test trainX, testX, trainY, testY = train_test_split( trainX, trainY, test_size=0.2, shuffle=False ) # My Code # ------- num_workers = 2 model_types = [''rnn'', ''lstm''] pool = multiprocessing.Pool(num_workers, init_worker) scores = pool.map(train_model, model_types) print(scores)

Salida del programa:

[{''type'': ''rnn'', ''train_MSE'': 0.06648435491248038, ''test_MSE'': 0.062323388902691866}, {''type'': ''lstm'', ''train_MSE'': 0.10114341514420684, ''test_MSE'': 0.09998065769499974}]

Supongo que la mayoría de los marcos como keras / tensorflow / ... utilizan automáticamente todos los núcleos de la CPU, pero en la práctica parece que no lo son. Solo pude encontrar algunas fuentes que nos pueden llevar a utilizar toda la capacidad de la CPU durante el proceso de aprendizaje profundo. Encontré un article que está escrito sobre el uso de

from multiprocessing import Pool import psutil import ray

por otro lado, basado en esta answer para usar un modelo de keras en múltiples procesos, no hay un seguimiento de las bibliotecas mencionadas anteriormente. ¿Existe la forma más elegante de aprovechar Multiprocessing for Keras ya que es muy popular para la implementación?

  • Por ejemplo, ¿cómo se puede modificar después de la implementación simple de RNN para lograr al menos un 50% de capacidad de CPU durante el proceso de aprendizaje?

  • ¿Debo usar el 2º modelo como multitarea como LSTM que comento a continuación? Quiero decir, ¿podemos gestionar simultáneamente la ejecución de varios modelos utilizando más capacidad de CPU?

import numpy as np import pandas as pd import matplotlib.pyplot as plt from keras.layers.normalization import BatchNormalization from sklearn.preprocessing import MinMaxScaler from sklearn.metrics import mean_squared_error from sklearn.metrics import accuracy_score from sklearn.model_selection import train_test_split from keras.layers import Dense from keras.layers import Dropout from keras.layers import LSTM,SimpleRNN from keras.models import Sequential from keras.optimizers import Adam, RMSprop df = pd.read_csv("D:/Train.csv", header=None) index = [i for i in list(range(1440)) if i%3==2] Y_train= df[index] df = df.values #making history by using look-back to prediction next def create_dataset(dataset,data_train,look_back=1): dataX,dataY = [],[] print("Len:",len(dataset)-look_back-1) for i in range(len(dataset)-look_back-1): a = dataset[i:(i+look_back), :] dataX.append(a) dataY.append(data_train[i + look_back, :]) return np.array(dataX), np.array(dataY) Y_train=np.array(Y_train) df=np.array(df) look_back = 10 trainX,trainY = create_dataset(df,Y_train, look_back=look_back) #Split data into train & test trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False) #Shape of train and test data trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False) print("train size: {}".format(trainX.shape)) print("train Label size: {}".format(trainY.shape)) print("test size: {}".format(testX.shape)) print("test Label size: {}".format(testY.shape)) #train size: (23, 10, 1440) #train Label size: (23, 960) #test size: (6, 10, 1440) #test Label size: (6, 960) model_RNN = Sequential() model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2]))) model_RNN.add(Dense(960)) model_RNN.add(BatchNormalization()) model_RNN.add(Activation(''tanh'')) # Compile model model_RNN.compile(loss=''mean_squared_error'', optimizer=''adam'') callbacks = [ EarlyStopping(patience=10, verbose=1), ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)] # Fit the model hist_RNN=model_RNN.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks) #predict Y_train=np.array(trainY) Y_test=np.array(testX) Y_RNN_Train_pred=model_RNN.predict(trainX) Y_RNN_Test_pred=model_RNN.predict(testX) train_MSE=mean_squared_error(trainY, Y_RNN_Train_pred) test_MSE=mean_squared_error(testY, Y_RNN_Test_pred) # create and fit the Simple LSTM model as 2nd model for multi-tasking #model_LSTM = Sequential() #model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2]))) #model_LSTM.add(Dense(units = 960)) #model_LSTM.add(BatchNormalization()) #model_LSTM.add(Activation(''tanh'')) #model_LSTM.compile(loss=''mean_squared_error'', optimizer=''adam'') #hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks) #Y_train=np.array(trainY) #Y_test=np.array(testX) #Y_LSTM_Train_pred=model_LSTM.predict(trainX) #Y_LSTM_Test_pred=model_LSTM.predict(testX) #train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred) #test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred) #plot losses for RNN + LSTM f, ax = plt.subplots(figsize=(20, 15)) plt.subplot(1, 2, 1) ax=plt.plot(hist_RNN.history[''loss''] ,label=''Train loss'') ax=plt.plot(hist_RNN.history[''val_loss''],label=''Test/Validation/Prediction loss'') plt.xlabel(''Training steps (Epochs = 50)'') plt.ylabel(''Loss (MSE) for Sx-Sy & Sxy'') plt.title('' RNN Loss on Train and Test data'') plt.legend() plt.subplot(1, 2, 2) ax=plt.plot(hist_LSTM.history[''loss''] ,label=''Train loss'') ax=plt.plot(hist_LSTM.history[''val_loss''],label=''Test/Validation/Prediction loss'') plt.xlabel(''Training steps (Epochs = 50)'') plt.ylabel(''Loss (MSE) for Sx-Sy & Sxy'') plt.title(''LSTM Loss on Train and Test data'') plt.legend() plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28) #plt.savefig(''All_Losses_history_.png'') plt.show()

Nota : no accedo a CUDA, solo accedo a un servidor potente sin VGA. Mi objetivo es aprovechar el multiproceso y el subprocesamiento múltiple para utilizar la capacidad máxima de CPU en lugar del 30%. ¡Significa solo un núcleo mientras que tengo Quad-core! Cualquier consejo sería muy apreciado. He subido un conjunto de datos csv formato.

Actualización: mi configuración HW es la siguiente:

  • CPU: AMD A8-7650K Radeon R7 10 Núcleos informáticos 4C + 6G 3.30 GHz
  • RAM: 16GB
  • OS: Win 7
  • Python ver 3.6.6
  • Tensorflow ver 1.8.0
  • Keras ver 2.2.4