python - ¿Cómo se puede aprovechar el multiprocesamiento y multiproceso en el aprendizaje profundo con Keras?
multithreading deep-learning (1)
¡Es bueno que entrenar un modelo no use todo el 100% de su CPU! Ahora tenemos espacio para entrenar varios modelos en paralelo y acelerar sus tiempos de entrenamiento en general.
NB: Si solo desea acelerar este modelo, busque GPU o cambie los hiperparámetros como el tamaño de lote y la cantidad de neuronas (tamaño de capa).
A continuación, le
multiprocessing
cómo puede utilizar el
multiprocessing
para capacitar a varios modelos al mismo tiempo (utilizando procesos que se ejecutan en paralelo en cada núcleo de CPU de su máquina).
El
multiprocessing.Pool
La
multiprocessing.Pool
básicamente crea un conjunto de trabajos que deben realizarse.
Los procesos recogerán estos trabajos y los ejecutarán.
Cuando se termina un trabajo, el proceso recogerá otro trabajo del grupo.
import time
import signal
import multiprocessing
def init_worker():
'''''' Add KeyboardInterrupt exception to mutliprocessing workers ''''''
signal.signal(signal.SIGINT, signal.SIG_IGN)
def train_model(layer_size):
''''''
This code is parallelised and runs on each process
It trains a model with different layer sizes (hyperparameters)
It saves the model and returns the score (error)
''''''
import keras
from keras.models import Sequential
from keras.layers import Dense
print(f''Training a model with layer size {layer_size}'')
# build your model here
model_RNN = Sequential()
model_RNN.add(Dense(layer_size))
# fit the model (the bit that takes time!)
model_RNN.fit(...)
# lets demonstrate with a sleep timer
time.sleep(5)
# save trained model to a file
model_RNN.save(...)
# you can also return values eg. the eval score
return model_RNN.evaluate(...)
num_workers = 4
hyperparams = [800, 960, 1100]
pool = multiprocessing.Pool(num_workers, init_worker)
scores = pool.map(train_model, hyperparams)
print(scores)
Salida:
Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{''size'':960,''score'':1.0}, {''size'':800,''score'':1.2}, {''size'':1100,''score'':0.7}]
Esto se demuestra fácilmente con un
time.sleep
en el código.
Verá que los 3 procesos comienzan el trabajo de capacitación y luego todos terminan casi al mismo tiempo.
Si se procesó solo, tendría que esperar a que cada uno termine antes de comenzar el siguiente (¡bostezo!).
EDITAR OP también quería código completo. Esto es difícil en Stack Overflow porque no puedo realizar pruebas en su entorno y con su código. Me he tomado la libertad de copiar y pegar su código en mi plantilla de arriba. Es posible que deba agregar algunas importaciones, pero esto es lo más cerca que se puede llegar al código "ejecutable" y "completo".
import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
def init_worker():
'''''' Add KeyboardInterrupt exception to mutliprocessing workers ''''''
signal.signal(signal.SIGINT, signal.SIG_IGN)
def train_model(model_type):
''''''
This code is parallelised and runs on each process
It trains a model with different layer sizes (hyperparameters)
It saves the model and returns the score (error)
''''''
from keras.layers import LSTM, SimpleRNN, Dense, Activation
from keras.models import Sequential
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
from keras.layers.normalization import BatchNormalization
print(f''Training a model: {model_type}'')
callbacks = [
EarlyStopping(patience=10, verbose=1),
ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
]
model = Sequential()
if model_type == ''rnn'':
model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
elif model_type == ''lstm'':
model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model.add(Dense(480))
model.add(BatchNormalization())
model.add(Activation(''tanh''))
model.compile(loss=''mean_squared_error'', optimizer=''adam'')
model.fit(
trainX,
trainY,
epochs=50,
batch_size=20,
validation_data=(testX, testY),
verbose=1,
callbacks=callbacks,
)
# predict
Y_Train_pred = model.predict(trainX)
Y_Test_pred = model.predict(testX)
train_MSE = mean_squared_error(trainY, Y_Train_pred)
test_MSE = mean_squared_error(testY, Y_Test_pred)
# you can also return values eg. the eval score
return {''type'': model_type, ''train_MSE'': train_MSE, ''test_MSE'': test_MSE}
# Your code
# ---------
df = pd.read_csv("D:/Train.csv", header=None)
index = [i for i in list(range(1440)) if i % 3 == 2]
Y_train = df[index]
df = df.values
# making history by using look-back to prediction next
def create_dataset(dataset, data_train, look_back=1):
dataX, dataY = [], []
print("Len:", len(dataset) - look_back - 1)
for i in range(len(dataset) - look_back - 1):
a = dataset[i : (i + look_back), :]
dataX.append(a)
dataY.append(data_train[i + look_back, :])
return np.array(dataX), np.array(dataY)
Y_train = np.array(Y_train)
df = np.array(df)
look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)
# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
trainX, trainY, test_size=0.2, shuffle=False
)
# My Code
# -------
num_workers = 2
model_types = [''rnn'', ''lstm'']
pool = multiprocessing.Pool(num_workers, init_worker)
scores = pool.map(train_model, model_types)
print(scores)
Salida del programa:
[{''type'': ''rnn'', ''train_MSE'': 0.06648435491248038, ''test_MSE'': 0.062323388902691866},
{''type'': ''lstm'', ''train_MSE'': 0.10114341514420684, ''test_MSE'': 0.09998065769499974}]
Supongo que la mayoría de los marcos como keras / tensorflow / ... utilizan automáticamente todos los núcleos de la CPU, pero en la práctica parece que no lo son. Solo pude encontrar algunas fuentes que nos pueden llevar a utilizar toda la capacidad de la CPU durante el proceso de aprendizaje profundo. Encontré un article que está escrito sobre el uso de
from multiprocessing import Pool
import psutil
import ray
por otro lado, basado en esta answer para usar un modelo de keras en múltiples procesos, no hay un seguimiento de las bibliotecas mencionadas anteriormente. ¿Existe la forma más elegante de aprovechar Multiprocessing for Keras ya que es muy popular para la implementación?
-
Por ejemplo, ¿cómo se puede modificar después de la implementación simple de RNN para lograr al menos un 50% de capacidad de CPU durante el proceso de aprendizaje?
-
¿Debo usar el 2º modelo como multitarea como LSTM que comento a continuación? Quiero decir, ¿podemos gestionar simultáneamente la ejecución de varios modelos utilizando más capacidad de CPU?
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from keras.layers.normalization import BatchNormalization
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import LSTM,SimpleRNN
from keras.models import Sequential
from keras.optimizers import Adam, RMSprop
df = pd.read_csv("D:/Train.csv", header=None)
index = [i for i in list(range(1440)) if i%3==2]
Y_train= df[index]
df = df.values
#making history by using look-back to prediction next
def create_dataset(dataset,data_train,look_back=1):
dataX,dataY = [],[]
print("Len:",len(dataset)-look_back-1)
for i in range(len(dataset)-look_back-1):
a = dataset[i:(i+look_back), :]
dataX.append(a)
dataY.append(data_train[i + look_back, :])
return np.array(dataX), np.array(dataY)
Y_train=np.array(Y_train)
df=np.array(df)
look_back = 10
trainX,trainY = create_dataset(df,Y_train, look_back=look_back)
#Split data into train & test
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)
#Shape of train and test data
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)
print("train size: {}".format(trainX.shape))
print("train Label size: {}".format(trainY.shape))
print("test size: {}".format(testX.shape))
print("test Label size: {}".format(testY.shape))
#train size: (23, 10, 1440)
#train Label size: (23, 960)
#test size: (6, 10, 1440)
#test Label size: (6, 960)
model_RNN = Sequential()
model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model_RNN.add(Dense(960))
model_RNN.add(BatchNormalization())
model_RNN.add(Activation(''tanh''))
# Compile model
model_RNN.compile(loss=''mean_squared_error'', optimizer=''adam'')
callbacks = [
EarlyStopping(patience=10, verbose=1),
ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)]
# Fit the model
hist_RNN=model_RNN.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)
#predict
Y_train=np.array(trainY)
Y_test=np.array(testX)
Y_RNN_Train_pred=model_RNN.predict(trainX)
Y_RNN_Test_pred=model_RNN.predict(testX)
train_MSE=mean_squared_error(trainY, Y_RNN_Train_pred)
test_MSE=mean_squared_error(testY, Y_RNN_Test_pred)
# create and fit the Simple LSTM model as 2nd model for multi-tasking
#model_LSTM = Sequential()
#model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2])))
#model_LSTM.add(Dense(units = 960))
#model_LSTM.add(BatchNormalization())
#model_LSTM.add(Activation(''tanh''))
#model_LSTM.compile(loss=''mean_squared_error'', optimizer=''adam'')
#hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)
#Y_train=np.array(trainY)
#Y_test=np.array(testX)
#Y_LSTM_Train_pred=model_LSTM.predict(trainX)
#Y_LSTM_Test_pred=model_LSTM.predict(testX)
#train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred)
#test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred)
#plot losses for RNN + LSTM
f, ax = plt.subplots(figsize=(20, 15))
plt.subplot(1, 2, 1)
ax=plt.plot(hist_RNN.history[''loss''] ,label=''Train loss'')
ax=plt.plot(hist_RNN.history[''val_loss''],label=''Test/Validation/Prediction loss'')
plt.xlabel(''Training steps (Epochs = 50)'')
plt.ylabel(''Loss (MSE) for Sx-Sy & Sxy'')
plt.title('' RNN Loss on Train and Test data'')
plt.legend()
plt.subplot(1, 2, 2)
ax=plt.plot(hist_LSTM.history[''loss''] ,label=''Train loss'')
ax=plt.plot(hist_LSTM.history[''val_loss''],label=''Test/Validation/Prediction loss'')
plt.xlabel(''Training steps (Epochs = 50)'')
plt.ylabel(''Loss (MSE) for Sx-Sy & Sxy'')
plt.title(''LSTM Loss on Train and Test data'')
plt.legend()
plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28)
#plt.savefig(''All_Losses_history_.png'')
plt.show()
Nota : no accedo a CUDA, solo accedo a un servidor potente sin VGA. Mi objetivo es aprovechar el multiproceso y el subprocesamiento múltiple para utilizar la capacidad máxima de CPU en lugar del 30%. ¡Significa solo un núcleo mientras que tengo Quad-core! Cualquier consejo sería muy apreciado. He subido un conjunto de datos csv formato.
Actualización: mi configuración HW es la siguiente:
- CPU: AMD A8-7650K Radeon R7 10 Núcleos informáticos 4C + 6G 3.30 GHz
- RAM: 16GB
- OS: Win 7
- Python ver 3.6.6
- Tensorflow ver 1.8.0
- Keras ver 2.2.4