python - ¿Implementación de red neuronal personalizada en MNIST usando Tensorflow 2.0?
python-3.x neural-network (3)
Intenté escribir una implementación personalizada de la red neuronal básica con dos capas ocultas en el conjunto de datos MNIST usando
*TensorFlow 2.0 beta*
pero no estoy seguro de qué salió mal aquí, pero mi
pérdida de entrenamiento
y
precisión
parecen quedarse en
1.5
y alrededor de
85
respectivamente.
Pero si construyo usando
Keras
,
obtenía
una pérdida de entrenamiento muy baja y una precisión superior al
95%
con solo
8-10
épocas.
¿Creo que tal vez no estoy actualizando mis pesas o algo así? Entonces, ¿debo asignar mis nuevos pesos que calculo en respaldos de funciones de backprop a sus respectivas variables de pesos / sesgos?
Realmente aprecio que alguien pueda ayudarme con esto y estas pocas preguntas más que he mencionado a continuación.
Pocas preguntas más :
1) ¿Cómo agregar una capa de abandono y normalización de lote en esta implementación personalizada? ( es decir, hacer que funcione tanto para el tren como para el tiempo de prueba)
2) ¿Cómo puedo usar las devoluciones de llamada en este código? es decir (haciendo uso de las devoluciones de llamada EarlyStopping y ModelCheckpoint)
3) ¿Hay algo más en mi código a continuación que pueda optimizar aún más en este código, como quizás hacer uso de tensorflow 2.x @ tf.function decorator, etc.)
4) También requeriría extraer los pesos finales que obtengo para trazar y verificar sus distribuciones. Para investigar problemas como la desaparición de gradiente o la explosión. (Por ejemplo: Quizás Tensorboard)
5) También quiero ayuda para escribir este código de una manera más generalizada para poder implementar fácilmente otras redes como ConvNets (es decir, Conv, MaxPool, etc.) basadas en este código fácilmente.
Aquí está mi código completo para una fácil reproducibilidad :
Nota: Sé que puedo usar API de alto nivel como Keras para construir el modelo mucho más fácilmente, pero ese no es mi objetivo aquí. Por favor entiende.
import numpy as np
import os
import logging
logging.getLogger(''tensorflow'').setLevel(logging.ERROR)
import tensorflow as tf
import tensorflow_datasets as tfds
(x_train, y_train), (x_test, y_test) = tfds.load(''mnist'', split=[''train'', ''test''],
batch_size=-1, as_supervised=True)
# reshaping
x_train = tf.reshape(x_train, shape=(x_train.shape[0], 784))
x_test = tf.reshape(x_test, shape=(x_test.shape[0], 784))
ds_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# rescaling
ds_train = ds_train.map(lambda x, y: (tf.cast(x, tf.float32)/255.0, y))
class Model(object):
def __init__(self, hidden1_size, hidden2_size, device=None):
# layer sizes along with input and output
self.input_size, self.output_size, self.device = 784, 10, device
self.hidden1_size, self.hidden2_size = hidden1_size, hidden2_size
self.lr_rate = 1e-03
# weights initializationg
self.glorot_init = tf.initializers.glorot_uniform(seed=42)
# weights b/w input to hidden1 --> 1
self.w_h1 = tf.Variable(self.glorot_init((self.input_size, self.hidden1_size)))
# weights b/w hidden1 to hidden2 ---> 2
self.w_h2 = tf.Variable(self.glorot_init((self.hidden1_size, self.hidden2_size)))
# weights b/w hidden2 to output ---> 3
self.w_out = tf.Variable(self.glorot_init((self.hidden2_size, self.output_size)))
# bias initialization
self.b1 = tf.Variable(self.glorot_init((self.hidden1_size,)))
self.b2 = tf.Variable(self.glorot_init((self.hidden2_size,)))
self.b_out = tf.Variable(self.glorot_init((self.output_size,)))
self.variables = [self.w_h1, self.b1, self.w_h2, self.b2, self.w_out, self.b_out]
def feed_forward(self, x):
if self.device is not None:
with tf.device(''gpu:0'' if self.device==''gpu'' else ''cpu''):
# layer1
self.layer1 = tf.nn.sigmoid(tf.add(tf.matmul(x, self.w_h1), self.b1))
# layer2
self.layer2 = tf.nn.sigmoid(tf.add(tf.matmul(self.layer1,
self.w_h2), self.b2))
# output layer
self.output = tf.nn.softmax(tf.add(tf.matmul(self.layer2,
self.w_out), self.b_out))
return self.output
def loss_fn(self, y_pred, y_true):
self.loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y_true,
logits=y_pred)
return tf.reduce_mean(self.loss)
def acc_fn(self, y_pred, y_true):
y_pred = tf.cast(tf.argmax(y_pred, axis=1), tf.int32)
y_true = tf.cast(y_true, tf.int32)
predictions = tf.cast(tf.equal(y_true, y_pred), tf.float32)
return tf.reduce_mean(predictions)
def backward_prop(self, batch_xs, batch_ys):
optimizer = tf.keras.optimizers.Adam(learning_rate=self.lr_rate)
with tf.GradientTape() as tape:
predicted = self.feed_forward(batch_xs)
step_loss = self.loss_fn(predicted, batch_ys)
grads = tape.gradient(step_loss, self.variables)
optimizer.apply_gradients(zip(grads, self.variables))
n_shape = x_train.shape[0]
epochs = 20
batch_size = 128
ds_train = ds_train.repeat().shuffle(n_shape).batch(batch_size).prefetch(batch_size)
neural_net = Model(512, 256, ''gpu'')
for epoch in range(epochs):
no_steps = n_shape//batch_size
avg_loss = 0.
avg_acc = 0.
for (batch_xs, batch_ys) in ds_train.take(no_steps):
preds = neural_net.feed_forward(batch_xs)
avg_loss += float(neural_net.loss_fn(preds, batch_ys)/no_steps)
avg_acc += float(neural_net.acc_fn(preds, batch_ys) /no_steps)
neural_net.backward_prop(batch_xs, batch_ys)
print(f''Epoch: {epoch}, Training Loss: {avg_loss}, Training ACC: {avg_acc}'')
# output for 10 epochs:
Epoch: 0, Training Loss: 1.7005115111824125, Training ACC: 0.7603832868262543
Epoch: 1, Training Loss: 1.6052448933478445, Training ACC: 0.8524806404020637
Epoch: 2, Training Loss: 1.5905528008006513, Training ACC: 0.8664196092868224
Epoch: 3, Training Loss: 1.584107405738905, Training ACC: 0.8727630912326276
Epoch: 4, Training Loss: 1.5792385798413306, Training ACC: 0.8773203844903037
Epoch: 5, Training Loss: 1.5759121985174716, Training ACC: 0.8804754322627559
Epoch: 6, Training Loss: 1.5739163148682564, Training ACC: 0.8826455712551251
Epoch: 7, Training Loss: 1.5722616605926305, Training ACC: 0.8840812018606812
Epoch: 8, Training Loss: 1.569699136307463, Training ACC: 0.8867688354803249
Epoch: 9, Training Loss: 1.5679460542742163, Training ACC: 0.8885049475356936
Además, si hay algo que podría mejorar en el código, házmelo saber también.
Adopta la API de alto nivel para algo como esto. Puede hacerlo en solo unas pocas líneas de código y es mucho más fácil depurar, leer y razonar sobre:
(x_train, y_train), (x_test, y_test) = tfds.load(''mnist'', split=[''train'', ''test''],
batch_size=-1, as_supervised=True)
x_train = tf.cast(tf.reshape(x_train, shape=(x_train.shape[0], 784)), tf.float32)
x_test = tf.cast(tf.reshape(x_test, shape=(x_test.shape[0], 784)), tf.float32)
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(512, activation=''sigmoid''),
tf.keras.layers.Dense(256, activation=''sigmoid''),
tf.keras.layers.Dense(10, activation=''softmax'')
])
model.fit(x_train, y_train, epochs=5)
model.evaluate(x_test, y_test)
Traté de escribir una implementación personalizada de la red neuronal básica con dos capas ocultas en el conjunto de datos MNIST usando tensorflow 2.0 beta, pero no estoy seguro de lo que salió mal aquí, pero mi pérdida de entrenamiento y precisión parecen quedarse en 1.5 y alrededor de 85 respectivamente.
¿Dónde está la parte de entrenamiento?
La capacitación de los modelos TF 2.0, ya sea la sintaxis de Keras o la
ejecución Eager
con
tf.GradientTape()
.
¿Puedes pegar el código con capas conv y densas, y cómo lo entrenaste?
Otras preguntas:
1) ¿Cómo agregar una capa de abandono en esta implementación personalizada? es decir (hacer que funcione tanto para el tren como para el tiempo de prueba)
Puede agregar una capa Dropout () con:
from tensorflow.keras.layers import Dropout
Y luego lo inserta en un modelo Sequential () solo con:
Dropout(dprob) # where dprob = dropout probability
2) ¿Cómo agregar Batch Normalization en este código?
Igual que antes, con:
from tensorflow.keras.layers import BatchNormalization
La elección de dónde poner batchnorm en el modelo, bueno, eso depende de usted. No hay una regla general, le sugiero que haga experimentos. Con ML siempre es un proceso de prueba y error.
3) ¿Cómo puedo usar las devoluciones de llamada en este código? es decir (haciendo uso de las devoluciones de llamada EarlyStopping y ModelCheckpoint)
Si está entrenando usando la sintaxis de Keras, simplemente puede usar eso. Consulte este tutorial muy completo sobre cómo usarlo. Solo toma unas pocas líneas de código. Si está ejecutando un modelo en ejecución Eager , debe implementar estas técnicas usted mismo, con su propio código. Es más complejo, pero también te da más libertad en la implementación.
4) ¿Hay algo más en el código que pueda optimizar aún más en este código? es decir (haciendo uso de tensorflow 2.x @ tf.function decorator, etc.)
Depende.
Si está utilizando la sintaxis de Keras, no creo que necesite agregarle más.
En caso de que esté entrenando el modelo en la ejecución Eager, le sugiero que use el decorador
@tf.function
en alguna función para acelerar un poco.
Puede ver un ejemplo práctico de TF 2.0 sobre cómo usar el decorador en
este cuaderno
.
Aparte de esto, te sugiero que juegues con técnicas de regularización como inicializaciones de pesas, pérdida de L1-L2, etc.
5) También necesito una forma de extraer todos mis pesos finales para todas las capas después del entrenamiento para poder trazarlos y verificar sus distribuciones. Para comprobar problemas como la desaparición de gradiente o la explosión.
Una vez que el modelo está entrenado, puede extraer sus pesos con:
weights = model.get_weights()
o:
weights = model.trainable_weights
Si quieres mantener solo entrenables.
6) También quiero ayuda para escribir este código de una manera más generalizada para poder implementar fácilmente otras redes como redes convolucionales (es decir, Conv, MaxPool, etc.) basadas en este código fácilmente.
Puede empaquetar todo su código en una función, entonces. Al final de este cuaderno hice algo como esto (es para un NN de retroalimentación, que es mucho más simple, pero eso es un comienzo y puede cambiar el código según sus necesidades).
---
ACTUALIZACIÓN :
Verifique mi implementación de TensorFlow 2.0 de un clasificador CNN . Esto podría ser una pista útil: está entrenado en el conjunto de datos Fashion MNIST , lo que lo hace muy similar a su tarea.
Me pregunté por dónde empezar con su multiquestion, y decidí hacerlo con una declaración:
Su código definitivamente no debería verse así y no está cerca de las mejores prácticas actuales de Tensorflow .
Lo sentimos, pero depurarlo paso a paso es una pérdida de tiempo para todos y no nos beneficiaría a ninguno de nosotros.
Ahora, pasando al tercer punto:
3) ¿Hay algo más en mi código a continuación que pueda optimizar aún más en este código, como quizás hacer uso de tensorflow 2.x @ tf.function decorator, etc.)
Sí, puede usar
tensorflow2.0
funcionalidades de
tensorflow2.0
y parece que está huyendo de ellas (el decorador de
tf.function
no sirve de nada aquí, déjelo por el momento).
Seguir nuevas pautas aliviará sus problemas con su quinto punto también, a saber:
5) También quiero ayuda para escribir este código de una manera más generalizada para poder implementar fácilmente otras redes como ConvNets (es decir, Conv, MaxPool, etc.) basadas en este código fácilmente.
ya que está diseñado específicamente para eso. Después de una pequeña introducción, intentaré presentarle esos conceptos en unos pocos pasos:
1. Divide tu programa en partes lógicas
Tensorflow hizo mucho daño cuando se trata de la legibilidad del código;
todo en
tf1.x
generalmente se
tf1.x
en un lugar, globales seguido por la definición de la función seguida por otros globales o tal vez cargando datos, todo en un lío.
Realmente no es culpa de los desarrolladores ya que el diseño del sistema alentó esas acciones.
Ahora, en
tf2.0
se alienta al programador a dividir su trabajo de manera similar a la estructura que se puede ver en
pytorch
,
chainer
y otros marcos más fáciles de usar.
1.1 Carga de datos
Estaba en buen camino con los conjuntos de datos de Tensorflow, pero rechazó sin razón aparente.
Aquí está su código con comentarios sobre lo que está sucediendo:
# You already have tf.data.Dataset objects after load
(x_train, y_train), (x_test, y_test) = tfds.load(''mnist'', split=[''train'', ''test''],
batch_size=-1, as_supervised=True)
# But you are reshaping them in a strange manner...
x_train = tf.reshape(x_train, shape=(x_train.shape[0], 784))
x_test = tf.reshape(x_test, shape=(x_test.shape[0], 784))
# And building from slices...
ds_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# Unreadable rescaling (there are built-ins for that)
Puede generalizar fácilmente esta idea
para cualquier conjunto de datos
, colocarlo en un módulo separado, por ejemplo,
datasets.py
:
import tensorflow as tf
import tensorflow_datasets as tfds
class ImageDatasetCreator:
@classmethod
# More portable and readable than dividing by 255
def _convert_image_dtype(cls, dataset):
return dataset.map(
lambda image, label: (
tf.image.convert_image_dtype(image, tf.float32),
label,
)
)
def __init__(self, name: str, batch: int, cache: bool = True, split=None):
# Load dataset, every dataset has default train, test split
dataset = tfds.load(name, as_supervised=True, split=split)
# Convert to float range
try:
self.train = ImageDatasetCreator._convert_image_dtype(dataset["train"])
self.test = ImageDatasetCreator._convert_image_dtype(dataset["test"])
except KeyError as exception:
raise ValueError(
f"Dataset {name} does not have train and test, write your own custom dataset handler."
) from exception
if cache:
self.train = self.train.cache() # speed things up considerably
self.test = self.test.cache()
self.batch: int = batch
def get_train(self):
return self.train.shuffle().batch(self.batch).repeat()
def get_test(self):
return self.test.batch(self.batch).repeat()
Entonces ahora puede cargar más que
mnist
usando un comando simple:
from datasets import ImageDatasetCreator
if __name__ == "__main__":
dataloader = ImageDatasetCreator("mnist", batch=64, cache = True)
train, test = dataloader.get_train(), dataloader.get_test()
Y a partir de ahora puede usar cualquier nombre que no sea
mnist
que desea cargar conjuntos de datos.
Por favor, deja de hacer que todo el aprendizaje profundo esté relacionado con scripts de transferencia, tú también eres un programador .
1.2 Creación de modelo
Desde
tf2.0
hay dos formas recomendadas de proceder según la complejidad de los modelos:
-
tensorflow.keras.models.Sequential
: esta manera fue mostrada por @Stewart_R , no es necesario reiterar sus puntos. Se usa para los modelos más simples (debe usar este con su feedforward). -
Heredar
tensorflow.keras.Model
y escribir modelo personalizado. Este debe usarse cuando tiene algún tipo de lógica dentro de su módulo o es más complicado (cosas como ResNets, redes de múltiples rutas, etc.). En definitiva, más legible y personalizable.
Su clase de
Model
intentó parecerse a algo así, pero se fue al sur nuevamente;
backprop
definitivamente no es parte del modelo en sí, tampoco lo es la
loss
o la
accuracy
,
sepárelos en otro módulo o función, ¡definitivamente no es un miembro!
Dicho esto, codifiquemos la red utilizando el segundo enfoque (debe colocar este código en
model.py
por brevedad).
Antes de eso, codificaré la capa de
YourDense
desde cero heredando de
tf.keras.Layers
(esta podría ir al módulo
layers.py
):
import tensorflow as tf
class YourDense(tf.keras.layers.Layer):
def __init__(self, units):
# It''s Python 3, you don''t have to specify super parents explicitly
super().__init__()
self.units = units
# Use build to create variables, as shape can be inferred from previous layers
# If you were to create layers in __init__, one would have to provide input_shape
# (same as it occurs in PyTorch for example)
def build(self, input_shape):
# You could use different initializers here as well
self.kernel = self.add_weight(
shape=(input_shape[-1], self.units),
initializer="random_normal",
trainable=True,
)
# You could define bias in __init__ as well as it''s not input dependent
self.bias = self.add_weight(shape=(self.units,), initializer="random_normal")
# Oh, trainable=True is default
def call(self, inputs):
# Use overloaded operators instead of tf.add, better readability
return tf.matmul(inputs, self.kernel) + self.bias
En cuanto a su
1) ¿Cómo agregar una capa de abandono y normalización de lote en esta implementación personalizada? (es decir, hacer que funcione tanto para el tren como para el tiempo de prueba)
Supongo que le gustaría crear una implementación personalizada de esas capas.
De lo contrario, puede importar
from tensorflow.keras.layers import Dropout
y usarlo en cualquier lugar que desee como señaló
@Leevo
.
Abandono invertido con diferente comportamiento durante el
train
y
test
continuación:
class CustomDropout(layers.Layer):
def __init__(self, rate, **kwargs):
super().__init__(**kwargs)
self.rate = rate
def call(self, inputs, training=None):
if training:
# You could simply create binary mask and multiply here
return tf.nn.dropout(inputs, rate=self.rate)
# You would need to multiply by dropout rate if you were to do that
return inputs
Capas tomadas de aquí y modificadas para adaptarse mejor al propósito de exhibición.
Ahora puede crear su modelo finalmente (doble feedforward simple):
import tensorflow as tf
from layers import YourDense
class Model(tf.keras.Model):
def __init__(self):
super().__init__()
# Use Sequential here for readability
self.network = tf.keras.Sequential(
[YourDense(100), tf.keras.layers.ReLU(), YourDense(10)]
)
def call(self, inputs):
# You can use non-parametric layers inside call as well
flattened = tf.keras.layers.Flatten()(inputs)
return self.network(flattened)
Por lo general, debe usar las incorporaciones tanto como sea posible en implementaciones generales.
Esta estructura es bastante extensible, por lo que se debe generalizar a redes convolucionales, senets, senets, lo que se debe hacer a través de este módulo . Puedes leer más sobre esto aquí .
Creo que cumple tu quinto punto:
5) También quiero ayuda para escribir este código de una manera más generalizada para poder implementar fácilmente otras redes como ConvNets (es decir, Conv, MaxPool, etc.) basadas en este código fácilmente.
Lo último, puede que tenga que usar
model.build(shape)
para construir el gráfico de su modelo.
model.build((None, 28, 28, 1))
Esto sería para la forma de entrada 28x28x1 de
28x28x1
, donde
None
representa lote.
1.3 Entrenamiento
Una vez más, la capacitación se puede realizar de dos maneras distintas:
-
Keras
model.fit(dataset)
estándar : útil en tareas simples como la clasificación -
tf.GradientTape
: esquemas de entrenamiento más complicados, el ejemplo más destacado sería Redes Adversarias Generativas , donde dos modelos optimizan los objetivos ortogonales jugando el juego minmax
Como señaló @Leevo una vez más, si va a usar la segunda forma, no podrá simplemente usar las devoluciones de llamada proporcionadas por Keras, por lo tanto, le aconsejo que se quede con la primera opción siempre que sea posible.
En teoría, podría llamar a las funciones de devolución de llamada manualmente como
on_batch_begin()
y otras cuando sea necesario, pero sería engorroso y no estoy seguro de cómo funcionaría.
Cuando se trata de la primera opción, puede usar objetos
tf.data.Dataset
directamente con fit.
Aquí se presenta dentro de otro módulo (preferiblemente
train.py
):
def train(
model: tf.keras.Model,
path: str,
train: tf.data.Dataset,
epochs: int,
steps_per_epoch: int,
validation: tf.data.Dataset,
steps_per_validation: int,
stopping_epochs: int,
optimizer=tf.optimizers.Adam(),
):
model.compile(
optimizer=optimizer,
# I used logits as output from the last layer, hence this
loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[tf.metrics.SparseCategoricalAccuracy()],
)
model.fit(
train,
epochs=epochs,
steps_per_epoch=steps_per_epoch,
validation_data=validation,
validation_steps=steps_per_validation,
callbacks=[
# Tensorboard logging
tf.keras.callbacks.TensorBoard(
pathlib.Path("logs")
/ pathlib.Path(datetime.datetime.now().strftime("%Y%m%d-%H%M%S")),
histogram_freq=1,
),
# Early stopping with best weights preserving
tf.keras.callbacks.EarlyStopping(
monitor="val_sparse_categorical_accuracy",
patience=stopping_epochs,
restore_best_weights=True,
),
],
)
model.save(path)
Un enfoque más complicado es muy similar (casi copiar y pegar) a los bucles de entrenamiento de
PyTorch
, por lo que si está familiarizado con ellos, no deberían representar un gran problema.
Puede encontrar ejemplos en
tf2.0
documentos de
tf2.0
, por ejemplo,
aquí
o
here
.
2. Otras cosas
2.1 Preguntas sin respuesta
4) ¿Hay algo más en el código que pueda optimizar aún más en este código? es decir (haciendo uso de tensorflow 2.x @ tf.function decorator, etc.)
Lo anterior ya transforma el modelo en gráficos, por lo tanto, no creo que se beneficie de llamarlo en este caso. Y la optimización prematura es la raíz de todo mal, recuerda medir tu código antes de hacerlo.
Ganaría mucho más con el almacenamiento en caché adecuado de los datos (como se describe al comienzo del n. ° 1.1) y una buena canalización en lugar de esos.
5) También necesito una forma de extraer todos mis pesos finales para todas las capas después del entrenamiento para poder trazarlos y verificar sus distribuciones. Para comprobar problemas como la desaparición de gradiente o la explosión.
Como señaló anteriormente @Leevo ,
weights = model.get_weights()
Te conseguiría los pesos.
Puede transformarlos en
np.array
y trazar usando
seaborn
,
matplotlib
, analizar, verificar o cualquier otra cosa que desee.
2.2 Poniéndolo por completo
Con todo, su
main.py
(o punto de entrada o algo similar) consistiría en esto (más o menos):
from dataset import ImageDatasetCreator
from model import Model
from train import train
# You could use argparse for things like batch, epochs etc.
if __name__ == "__main__":
dataloader = ImageDatasetCreator("mnist", batch=64, cache=True)
train, test = dataloader.get_train(), dataloader.get_test()
model = Model()
model.build((None, 28, 28, 1))
train(
model, train, path epochs, test, len(train) // batch, len(test) // batch, ...
) # provide necessary arguments appropriately
# Do whatever you want with those
weights = model.get_weights()
Ah, recuerde que las funciones anteriores no son para pegar copias y deben tratarse más como una guía. Contáctame si tienes alguna pregunta.
3. Preguntas de comentarios
3.1 Cómo inicializar capas personalizadas e integradas
3.1.1 TLDR lo que estás a punto de leer
- Función de inicialización de Poisson personalizada, pero requiere tres argumentos
-
tf.keras.initalization
API necesita dos argumentos (ver el último punto en sus documentos ), por lo tanto, uno se especifica a través delambda
de Python dentro de la capa personalizada que hemos escrito antes - Se agrega sesgo opcional para la capa, que se puede desactivar con boolean
¿Por qué es tan inútilmente complicado?
Para mostrar que en
tf2.0
finalmente puede usar la funcionalidad de Python
, no más problemas de gráficos,
if
lugar de
tf.cond
etc.
3.1.2 De TLDR a la implementación
Los inicializadores Keras se pueden encontrar aquí y el sabor de Tensorflow here .
Tenga en cuenta las inconsistencias de la API (letras mayúsculas como clases, letras pequeñas con subrayado como funciones), especialmente en
tf2.0
, pero eso no
tf2.0
al caso.
Puede usarlos pasando una cadena (como se hace en
YourDense
arriba) o durante la creación de objetos.
Para permitir la inicialización personalizada en sus capas personalizadas, simplemente puede agregar argumentos adicionales al constructor (la clase
tf.keras.Model
sigue siendo la clase Python y su
__init__
debe usarse igual que la de Python).
Antes de eso, te mostraré cómo crear una inicialización personalizada:
# Poisson custom initialization because why not.
def my_dumb_init(shape, lam, dtype=None):
return tf.squeeze(tf.random.poisson(shape, lam, dtype=dtype))
Tenga en cuenta que su firma toma tres argumentos, mientras que solo debe tomar
(shape, dtype)
.
Aún así, uno puede "arreglar" esto fácilmente mientras crea su propia capa, como la siguiente (
YourLinear
extendida):
import typing
import tensorflow as tf
class YourDense(tf.keras.layers.Layer):
# It''s still Python, use it as Python, that''s the point of tf.2.0
@classmethod
def register_initialization(cls, initializer):
# Set defaults if init not provided by user
if initializer is None:
# let''s make the signature proper for init in tf.keras
return lambda shape, dtype: my_dumb_init(shape, 1, dtype)
return initializer
def __init__(
self,
units: int,
bias: bool = True,
# can be string or callable, some typing info added as well...
kernel_initializer: typing.Union[str, typing.Callable] = None,
bias_initializer: typing.Union[str, typing.Callable] = None,
):
super().__init__()
self.units: int = units
self.kernel_initializer = YourDense.register_initialization(kernel_initializer)
if bias:
self.bias_initializer = YourDense.register_initialization(bias_initializer)
else:
self.bias_initializer = None
def build(self, input_shape):
# Simply pass your init here
self.kernel = self.add_weight(
shape=(input_shape[-1], self.units),
initializer=self.kernel_initializer,
trainable=True,
)
if self.bias_initializer is not None:
self.bias = self.add_weight(
shape=(self.units,), initializer=self.bias_initializer
)
else:
self.bias = None
def call(self, inputs):
weights = tf.matmul(inputs, self.kernel)
if self.bias is not None:
return weights + self.bias
He agregado
my_dumb_initialization
como predeterminado (si el usuario no proporciona uno) e hice el sesgo opcional con el argumento de
bias
.
Tenga en cuenta que puede usarlo libremente siempre que no dependa de los datos.
Si es (o depende de
tf.Tensor
alguna manera), uno tiene que usar el decorador
@tf.function
que cambia el flujo de Python a su contraparte de
tensorflow
(por ejemplo,
if
es
tf.cond
).
Vea here para más información sobre autógrafos, es muy fácil de seguir.
Si desea incorporar los cambios de inicializador anteriores en su modelo, debe crear el objeto apropiado y listo.
... # Previous of code Model here
self.network = tf.keras.Sequential(
[
YourDense(100, bias=False, kernel_initializer="lecun_uniform"),
tf.keras.layers.ReLU(),
YourDense(10, bias_initializer=tf.initializers.Ones()),
]
)
... # and the same afterwards
Con las capas
tf.keras.layers.Dense
, uno puede hacer lo mismo (los nombres de los argumentos difieren, pero la idea se mantiene).
3.2 Diferenciación automática usando
tf.GradientTape
3.2.1 Introducción
El punto de
tf.GradientTape
es permitir a los usuarios el flujo de control de Python normal y el cálculo de gradiente de variables con respecto a otra variable.
Ejemplo tomado de here pero dividido en piezas separadas:
def f(x, y):
output = 1.0
for i in range(y):
if i > 1 and i < 5:
output = tf.multiply(output, x)
return output
Función Python regular con declaraciones de control de flujo
for
y
if
def grad(x, y):
with tf.GradientTape() as t:
t.watch(x)
out = f(x, y)
return t.gradient(out, x)
Con la cinta de degradado puede grabar todas las operaciones en los
Tensors
(y también sus estados intermedios) y "reproducir" al revés (realizar una diferenciación automática hacia atrás utilizando la regla de chaing).
Cada
Tensor
dentro del
tf.GradientTape()
contexto
tf.GradientTape()
se graba automáticamente.
Si algún Tensor está fuera de alcance, use el método
watch()
como se puede ver arriba.
Finalmente, gradiente de
output
con respecto a
x
(se devuelve la entrada).
3.2.2 Conexión con el aprendizaje profundo
Lo que se describió anteriormente es el algoritmo de
backpropagation
.
Los gradientes wrt (con respecto a) las salidas se calculan para cada nodo en la red (o más bien para cada capa).
Esos gradientes luego son utilizados por varios optimizadores para hacer correcciones y así se repite.
Continuemos y supongamos que tiene su
tf.keras.Model
, instancia del optimizador,
tf.data.Dataset
y la función de pérdida ya configurada.
Uno puede definir una clase de
Trainer
que nos capacitará.
Lea los comentarios en el código si tiene dudas
:
class Trainer:
def __init__(self, model, optimizer, loss_function):
self.model = model
self.loss_function = loss_function
self.optimizer = optimizer
# You could pass custom metrics in constructor
# and adjust train_step and test_step accordingly
self.train_loss = tf.keras.metrics.Mean(name="train_loss")
self.test_loss = tf.keras.metrics.Mean(name="train_loss")
def train_step(self, x, y):
# Setup tape
with tf.GradientTape() as tape:
# Get current predictions of network
y_pred = self.model(x)
# Calculate loss generated by predictions
loss = self.loss_function(y, y_pred)
# Get gradients of loss w.r.t. EVERY trainable variable (iterable returned)
gradients = tape.gradient(loss, self.model.trainable_variables)
# Change trainable variable values according to gradient by applying optimizer policy
self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
# Record loss of current step
self.train_loss(loss)
def train(self, dataset):
# For N epochs iterate over dataset and perform train steps each time
for x, y in dataset:
self.train_step(x, y)
def test_step(self, x, y):
# Record test loss separately
self.test_loss(self.loss_function(y, self.model(x)))
def test(self, dataset):
# Iterate over whole dataset
for x, y in dataset:
self.test_step(x, y)
def __str__(self):
# You need Python 3.7 with f-string support
# Just return metrics
return f"Loss: {self.train_loss.result()}, Test Loss: {self.test_loss.result()}"
Ahora, podría usar esta clase en su código realmente así:
EPOCHS = 5
# model, optimizer, loss defined beforehand
trainer = Trainer(model, optimizer, loss)
for _ in range(EPOCHS):
trainer.train(train_dataset) # Same for training and test datasets
trainer.test(test_dataset)
print(f"Epoch {epoch}: {trainer})")
La impresión le indicaría la pérdida de entrenamiento y prueba para cada época. Puede combinar el entrenamiento y las pruebas de la forma que desee (por ejemplo, 5 épocas para el entrenamiento y 1 prueba), puede agregar diferentes métricas, etc.
Vea here si desea un enfoque no orientado a OOP (IMO menos legible, pero para cada uno es propio).