redes que neuronales libreria instalar funciones español entrenar ejemplos python multithreading latency tensorflow prefetch

python - que - tensorflow libreria



Cómo recuperar datos utilizando una función de python personalizada en tensorflow (2)

En otras palabras, un hilo realiza el preprocesamiento de datos y el otro realiza el entrenamiento. ¿Es esto posible en TensorFlow?

Sí lo es. La solución de mrry funciona, pero existe más simple.

Recuperacion de datos

tf.py_func ajusta una función python y la usa como un operador TensorFlow. Entonces, podemos cargar los datos en sess.run() cada vez. El problema con este enfoque es que los datos se cargan durante sess.run() través del hilo principal.

Un ejemplo mínimo:

def get_numpy_tensor(): return np.array([[1,2],[3,4]], dtype=np.float32) tensorflow_tensor = tf.py_func(get_numpy_tensor, [], tf.float32)

Un ejemplo más complejo:

def get_numpy_tensors(): # Load data from the disk into numpy arrays. input = np.array([[1,2],[3,4]], dtype=np.float32) target = np.int32(1) return input, target tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32]) tensorflow_input, tensorflow_target = 2*tensorflow_input, 2*tensorflow_target sess = tf.InteractiveSession() numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target]) assert np.all(numpy_input==np.array([[2,4],[6,8]])) and numpy_target==2

Obtener datos en otro hilo

Para sess.run() en cola nuestros datos en otro hilo (para que sess.run() no tenga que esperar los datos), podemos usar tf.train.batch() en nuestros operadores desde tf.py_func() .

Un ejemplo mínimo:

tensor_shape = get_numpy_tensor().shape tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32, shapes=[tensor_shape]) # Run `tf.train.start_queue_runners()` once session is created.

Podemos omitir las shapes argumento si tensorflow_tensor tiene su forma especificada:

tensor_shape = get_numpy_tensor().shape tensorflow_tensor.set_shape(tensor_shape) tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32) # Run `tf.train.start_queue_runners()` once session is created.

Un ejemplo más complejo:

input_shape, target_shape = (2, 2), () def get_numpy_tensors(): input = np.random.rand(*input_shape).astype(np.float32) target = np.random.randint(10, dtype=np.int32) print(''f'', end='''') return input, target tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32]) batch_size = 2 tensorflow_inputs, tensorflow_targets = tf.train.batch([tensorflow_input, tensorflow_target], batch_size, shapes=[input_shape, target_shape], capacity=2) # Internal queue will contain at most `capasity=2` times `batch_size=2` elements `[tensorflow_input, tensorflow_target]`. tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets sess = tf.InteractiveSession() tf.train.start_queue_runners() # Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it. for _ in range(10): numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets]) assert numpy_inputs.shape==(batch_size, *input_shape) and numpy_targets.shape==(batch_size, *target_shape) print(''r'', end='''') # Prints `fffffrrffrfrffrffrffrffrffrffrf`.

En caso de que get_numpy_tensor() devuelva un lote de tensores, entonces tf.train.batch(..., enqueue_many=True) ayudará.

Estoy tratando de captar previamente los datos de entrenamiento para ocultar la latencia de E / S. Me gustaría escribir un código Python personalizado que cargue datos del disco y preproceses los datos (por ejemplo, agregando una ventana de contexto). En otras palabras, un hilo realiza el preprocesamiento de datos y el otro realiza el entrenamiento. ¿Es esto posible en TensorFlow?

Actualización: tengo un ejemplo de trabajo basado en el ejemplo de @ mrry.

import numpy as np import tensorflow as tf import threading BATCH_SIZE = 5 TRAINING_ITERS = 4100 feature_input = tf.placeholder(tf.float32, shape=[128]) label_input = tf.placeholder(tf.float32, shape=[128]) q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]]) enqueue_op = q.enqueue([label_input, feature_input]) label_batch, feature_batch = q.dequeue_many(BATCH_SIZE) c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128]) sess = tf.Session() def load_and_enqueue(sess, enqueue_op, coord): with open(''dummy_data/features.bin'') as feature_file, open(''dummy_data/labels.bin'') as label_file: while not coord.should_stop(): feature_array = np.fromfile(feature_file, np.float32, 128) if feature_array.shape[0] == 0: print(''reach end of file, reset using seek(0,0)'') feature_file.seek(0,0) label_file.seek(0,0) continue label_value = np.fromfile(label_file, np.float32, 128) sess.run(enqueue_op, feed_dict={feature_input: feature_array, label_input: label_value}) coord = tf.train.Coordinator() t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord)) t.start() for i in range(TRAINING_ITERS): sum = sess.run(c) print(''train_iter=''+str(i)) print(sum) coord.request_stop() coord.join([t])


Este es un caso de uso común, y la mayoría de las implementaciones usan las colas de TensorFlow para desacoplar el código de preprocesamiento del código de capacitación. Hay un tutorial sobre cómo usar las colas , pero los pasos principales son los siguientes:

  1. Defina una cola, q , que almacenará en búfer los datos preprocesados. TensorFlow admite el tf.FIFOQueue simple que produce elementos en el orden en que se pusieron en cola, y el tf.RandomShuffleQueue más avanzado que produce elementos en un orden aleatorio. Un elemento de cola es una tupla de uno o más tensores (que pueden tener diferentes tipos y formas). Todas las colas soportan operaciones de un solo elemento ( dequeue , dequeue ) y de lotes ( enqueue_many , dequeue_many ), pero para usar las operaciones por lotes, debe especificar las formas de cada tensor en un elemento de cola al construir la cola.

  2. Cree un subgrafo que encola los elementos preprocesados ​​en la cola. Una forma de hacerlo sería definir algunas tf.placeholder() para tensores correspondientes a un solo ejemplo de entrada, y luego pasarlas a q.enqueue() . (Si su preprocesamiento produce un lote a la vez, debe usar q.enqueue_many() lugar.) También puede incluir operaciones de TensorFlow en este subgráfico.

  3. Construye un subgrafo que realiza entrenamiento. Esto se verá como un gráfico regular de TensorFlow, pero obtendrá su entrada llamando a q.dequeue_many(BATCH_SIZE) .

  4. Comience su sesión

  5. Cree uno o más hilos que ejecuten su lógica de preprocesamiento, luego ejecute el enqueue op, introduciendo los datos preprocesados. Puede encontrar las clases de utilidad tf.train.Coordinator y tf.train.QueueRunner útiles para esto.

  6. Ejecute su gráfico de entrenamiento (optimizador, etc.) como de costumbre.

EDITAR: Aquí hay una función simple load_and_enqueue() y un fragmento de código para comenzar:

# Features are length-100 vectors of floats feature_input = tf.placeholder(tf.float32, shape=[100]) # Labels are scalar integers. label_input = tf.placeholder(tf.int32, shape=[]) # Alternatively, could do: # feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100]) # label_batch_input = tf.placeholder(tf.int32, shape=[None]) q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []]) enqueue_op = q.enqueue([feature_input, label_input]) # For batch input, do: # enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input]) feature_batch, label_batch = q.dequeue_many(BATCH_SIZE) # Build rest of model taking label_batch, feature_batch as input. # [...] train_op = ... sess = tf.Session() def load_and_enqueue(): with open(...) as feature_file, open(...) as label_file: while True: feature_array = numpy.fromfile(feature_file, numpy.float32, 100) if not feature_array: return label_value = numpy.fromfile(feature_file, numpy.int32, 1)[0] sess.run(enqueue_op, feed_dict={feature_input: feature_array, label_input: label_value}) # Start a thread to enqueue data asynchronously, and hide I/O latency. t = threading.Thread(target=load_and_enqueue) t.start() for _ in range(TRAINING_EPOCHS): sess.run(train_op)