python - que - tensorflow libreria
Cómo recuperar datos utilizando una función de python personalizada en tensorflow (2)
En otras palabras, un hilo realiza el preprocesamiento de datos y el otro realiza el entrenamiento. ¿Es esto posible en TensorFlow?
Sí lo es. La solución de mrry funciona, pero existe más simple.
Recuperacion de datos
tf.py_func
ajusta una función python y la usa como un operador TensorFlow. Entonces, podemos cargar los datos en sess.run()
cada vez. El problema con este enfoque es que los datos se cargan durante sess.run()
través del hilo principal.
Un ejemplo mínimo:
def get_numpy_tensor():
return np.array([[1,2],[3,4]], dtype=np.float32)
tensorflow_tensor = tf.py_func(get_numpy_tensor, [], tf.float32)
Un ejemplo más complejo:
def get_numpy_tensors():
# Load data from the disk into numpy arrays.
input = np.array([[1,2],[3,4]], dtype=np.float32)
target = np.int32(1)
return input, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])
tensorflow_input, tensorflow_target = 2*tensorflow_input, 2*tensorflow_target
sess = tf.InteractiveSession()
numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target])
assert np.all(numpy_input==np.array([[2,4],[6,8]])) and numpy_target==2
Obtener datos en otro hilo
Para sess.run()
en cola nuestros datos en otro hilo (para que sess.run()
no tenga que esperar los datos), podemos usar tf.train.batch()
en nuestros operadores desde tf.py_func()
.
Un ejemplo mínimo:
tensor_shape = get_numpy_tensor().shape
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32, shapes=[tensor_shape])
# Run `tf.train.start_queue_runners()` once session is created.
Podemos omitir las shapes
argumento si tensorflow_tensor
tiene su forma especificada:
tensor_shape = get_numpy_tensor().shape
tensorflow_tensor.set_shape(tensor_shape)
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32)
# Run `tf.train.start_queue_runners()` once session is created.
Un ejemplo más complejo:
input_shape, target_shape = (2, 2), ()
def get_numpy_tensors():
input = np.random.rand(*input_shape).astype(np.float32)
target = np.random.randint(10, dtype=np.int32)
print(''f'', end='''')
return input, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])
batch_size = 2
tensorflow_inputs, tensorflow_targets = tf.train.batch([tensorflow_input, tensorflow_target], batch_size, shapes=[input_shape, target_shape], capacity=2)
# Internal queue will contain at most `capasity=2` times `batch_size=2` elements `[tensorflow_input, tensorflow_target]`.
tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets
sess = tf.InteractiveSession()
tf.train.start_queue_runners() # Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it.
for _ in range(10):
numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets])
assert numpy_inputs.shape==(batch_size, *input_shape) and numpy_targets.shape==(batch_size, *target_shape)
print(''r'', end='''')
# Prints `fffffrrffrfrffrffrffrffrffrffrf`.
En caso de que get_numpy_tensor()
devuelva un lote de tensores, entonces tf.train.batch(..., enqueue_many=True)
ayudará.
Estoy tratando de captar previamente los datos de entrenamiento para ocultar la latencia de E / S. Me gustaría escribir un código Python personalizado que cargue datos del disco y preproceses los datos (por ejemplo, agregando una ventana de contexto). En otras palabras, un hilo realiza el preprocesamiento de datos y el otro realiza el entrenamiento. ¿Es esto posible en TensorFlow?
Actualización: tengo un ejemplo de trabajo basado en el ejemplo de @ mrry.
import numpy as np
import tensorflow as tf
import threading
BATCH_SIZE = 5
TRAINING_ITERS = 4100
feature_input = tf.placeholder(tf.float32, shape=[128])
label_input = tf.placeholder(tf.float32, shape=[128])
q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]])
enqueue_op = q.enqueue([label_input, feature_input])
label_batch, feature_batch = q.dequeue_many(BATCH_SIZE)
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128])
sess = tf.Session()
def load_and_enqueue(sess, enqueue_op, coord):
with open(''dummy_data/features.bin'') as feature_file, open(''dummy_data/labels.bin'') as label_file:
while not coord.should_stop():
feature_array = np.fromfile(feature_file, np.float32, 128)
if feature_array.shape[0] == 0:
print(''reach end of file, reset using seek(0,0)'')
feature_file.seek(0,0)
label_file.seek(0,0)
continue
label_value = np.fromfile(label_file, np.float32, 128)
sess.run(enqueue_op, feed_dict={feature_input: feature_array,
label_input: label_value})
coord = tf.train.Coordinator()
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord))
t.start()
for i in range(TRAINING_ITERS):
sum = sess.run(c)
print(''train_iter=''+str(i))
print(sum)
coord.request_stop()
coord.join([t])
Este es un caso de uso común, y la mayoría de las implementaciones usan las colas de TensorFlow para desacoplar el código de preprocesamiento del código de capacitación. Hay un tutorial sobre cómo usar las colas , pero los pasos principales son los siguientes:
Defina una cola,
q
, que almacenará en búfer los datos preprocesados. TensorFlow admite eltf.FIFOQueue
simple que produce elementos en el orden en que se pusieron en cola, y eltf.RandomShuffleQueue
más avanzado que produce elementos en un orden aleatorio. Un elemento de cola es una tupla de uno o más tensores (que pueden tener diferentes tipos y formas). Todas las colas soportan operaciones de un solo elemento (dequeue
,dequeue
) y de lotes (enqueue_many
,dequeue_many
), pero para usar las operaciones por lotes, debe especificar las formas de cada tensor en un elemento de cola al construir la cola.Cree un subgrafo que encola los elementos preprocesados en la cola. Una forma de hacerlo sería definir algunas
tf.placeholder()
para tensores correspondientes a un solo ejemplo de entrada, y luego pasarlas aq.enqueue()
. (Si su preprocesamiento produce un lote a la vez, debe usarq.enqueue_many()
lugar.) También puede incluir operaciones de TensorFlow en este subgráfico.Construye un subgrafo que realiza entrenamiento. Esto se verá como un gráfico regular de TensorFlow, pero obtendrá su entrada llamando a
q.dequeue_many(BATCH_SIZE)
.Comience su sesión
Cree uno o más hilos que ejecuten su lógica de preprocesamiento, luego ejecute el enqueue op, introduciendo los datos preprocesados. Puede encontrar las clases de utilidad
tf.train.Coordinator
ytf.train.QueueRunner
útiles para esto.Ejecute su gráfico de entrenamiento (optimizador, etc.) como de costumbre.
EDITAR: Aquí hay una función simple load_and_enqueue()
y un fragmento de código para comenzar:
# Features are length-100 vectors of floats
feature_input = tf.placeholder(tf.float32, shape=[100])
# Labels are scalar integers.
label_input = tf.placeholder(tf.int32, shape=[])
# Alternatively, could do:
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100])
# label_batch_input = tf.placeholder(tf.int32, shape=[None])
q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []])
enqueue_op = q.enqueue([feature_input, label_input])
# For batch input, do:
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input])
feature_batch, label_batch = q.dequeue_many(BATCH_SIZE)
# Build rest of model taking label_batch, feature_batch as input.
# [...]
train_op = ...
sess = tf.Session()
def load_and_enqueue():
with open(...) as feature_file, open(...) as label_file:
while True:
feature_array = numpy.fromfile(feature_file, numpy.float32, 100)
if not feature_array:
return
label_value = numpy.fromfile(feature_file, numpy.int32, 1)[0]
sess.run(enqueue_op, feed_dict={feature_input: feature_array,
label_input: label_value})
# Start a thread to enqueue data asynchronously, and hide I/O latency.
t = threading.Thread(target=load_and_enqueue)
t.start()
for _ in range(TRAINING_EPOCHS):
sess.run(train_op)