guide - Tensorflow: carga de datos personalizada+cálculo asincrónico
tensorflow playground (0)
Así es como creo que se echa de menos de los ejemplos de TF.
Tarea:
- las muestras para cada clase se dan en un directorio separado y, por lo tanto, las etiquetas son indirectas (es decir, por dir)
- carga desacoplada y cálculos en TF
Cada bit separado podría encontrarse, sin embargo, creo que tenerlos a todos juntos en un solo lugar ayudará a ahorrar mucho tiempo para los principiantes de TF (como yo).
Vamos a abordar 1. en mi caso se trata de dos conjuntos de imágenes:
# all filenames for .jpg in dir
# - list of fnames
# - list of labels
def path_fnames(f_path, label, ext = [''.jpg'', ''.jpeg'']):
f_n = [f_path+''/''+f for f in sorted(os.listdir(f_path)) if os.path.splitext(f)[1].lower() in ext]
f_l = [label] * len(f_n)
return f_n, f_l
#
def dense_to_one_hot(labels_dense, num_classes=10, dtype=np.float32):
"""Convert class labels from scalars to one-hot vectors."""
num_labels = labels_dense.shape[0]
index_offset = np.arange(num_labels) * num_classes
labels_one_hot = np.zeros((num_labels, num_classes),dtype=dtype)
labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
return labels_one_hot
data_dir = ''/mnt/dataset/''
dir_1 = ''/class_1''
dir_2 = ''/class_2''
# --- get filenames for data ---
dpath = [data_dir+dir_1, data_dir+dir_2]
f_n1, f_l1 = path_fnames(dpath[0], 0)
f_n2, f_l2 = path_fnames(dpath[1], 1)
# --- create one-hot labels ---
ohl = dense_to_one_hot(np.asarray(f_l1+f_l2), num_classes=2, dtype = np.float32)
fnames = f_n1+f_n2; # one-hot labels created in this sequence
Ahora tenemos precargados todos los nombres de archivos y etiquetas únicas.
Vamos a pasar al 2.
Se basa en Cómo recapturar datos utilizando una función de pitón personalizada en tensorflow . En resumen, tiene:
- lector de imágenes personalizado (reemplace con el suyo)
- queue fnl_q con [label de nombre de archivo] que el lector utiliza para alimentar
- queue proc_q con [sample label] que se usa para alimentar el procesamiento some_op
- thread que realiza read_op para obtener [sample label] y enqueue_op para poner pair en proc_q . El hilo está controlado por tf.Coordinator
- some_op que primero obtiene datos de proc_q por dequeue_many () y el resto del cálculo (también se puede poner en un hilo separado).
Notas:
- feature_read_op y label_read_op son dos operaciones independientes.
- Uso sleep () para disminuir la velocidad y controlar op - solo para fines de prueba
- he separado partes de "alimentación" y "cálculo"; en el caso real, simplemente ejecútelas en paralelo
print ''TF version:'', tf.__version__
# --- params ----
im_s = [30, 30, 1] # target image size
BATCH_SIZE = 16
# image reader
# - fnl_queue: queue with [fn l] pairs
# Notes
# - to resize: image_tensor = tf.image.resize_image_with_crop_or_pad(image_tensor, HEIGHT, WIDTH)
# - how about image preprocessing?
def img_reader_jpg(fnl_queue, ch = 3, keep = False):
fn, label = fnl_queue.dequeue()
if keep:
fnl_queue.enqueue([fn, label])
img_bytes = tf.read_file(fn)
img_u8 = tf.image.decode_jpeg(img_bytes, channels=ch)
img_f32 = tf.cast(img_u8, tf.float32)/256.0
#img_4 = tf.expand_dims(img_f32,0)
return img_f32, label
# load [feature, label] and enqueue to processing queue
# - sess: tf session
# - sess: tf Coordinator
# - [fr_op, lr_op ]: feature_read_op label_read_op
# - enqueue_op: [f l] pairs enqueue op
def load_and_enqueue(sess, coord, feature_read_op, label_read_op , enqueue_op):
i = 0
while not coord.should_stop():
# for testing purpose
time.sleep(0.1)
#print ''load_and_enqueue i='',i
#i = i +1
feature, label = sess.run([feature_read_op, label_read_op ])
feed_dict = {feature_input: feature,
label_input : label}
sess.run(enqueue_op, feed_dict=feed_dict)
# --- TF part ---
# filenames and labels are pre-loaded
fv = tf.constant(fnames)
lv = tf.constant(ohl)
#fnl_q = tf.FIFOQueue(len(fnames), [tf.string, tf.float32])
fnl_q = tf.RandomShuffleQueue(len(fnames), 0, [tf.string, tf.float32])
do_enq = fnl_q.enqueue_many([fv, lv])
# reading_op: feature_read_op label_read_op
feature_read_op, label_read_op = img_reader_jpg(fnl_q, ch = im_s[2])
# samples queue
f_s = im_s
l_s = 2
feature_input = tf.placeholder(tf.float32, shape=f_s, name=''feature_input'')
label_input = tf.placeholder(tf.float32, shape=l_s, name=''label_input'')
#proc_q = tf.RandomShuffleQueue(len(fnames), 0, [tf.float32, tf.float32], shapes=[f_s, l_s])
proc_q = tf.FIFOQueue(len(fnames), [tf.float32, tf.float32], shapes=[f_s, l_s])
enqueue_op = proc_q.enqueue([feature_input, label_input])
# test:
# - some op
img_batch, lab_batch = proc_q.dequeue_many(BATCH_SIZE)
some_op = [img_batch, lab_batch]
# service ops
init_op = tf.initialize_all_variables()
# let run stuff
with tf.Session() as sess:
sess.run(init_op)
sess.run(do_enq)
print "fnl_q.size:", fnl_q.size().eval()
print "proc_q.size:", proc_q.size().eval()
# --- test thread stuff ---
# - fill proc_q
coord = tf.train.Coordinator()
t = threading.Thread(target=load_and_enqueue, args = (sess, coord, feature_read_op, label_read_op , enqueue_op))
t.start()
time.sleep(2.1)
coord.request_stop()
coord.join([t])
print "fnl_q.size:", fnl_q.size().eval()
print "proc_q.size:", proc_q.size().eval()
# - process a bit
ss = sess.run(some_op)
print ''ss[0].shape'', ss[0].shape
print '' ss[1]:/n'', ss[1]
print "fnl_q.size:", fnl_q.size().eval()
print "proc_q.size:", proc_q.size().eval()
print ''ok''
Salida típica:
TF version: 0.6.0
fnl_q.size: 1225
proc_q.size: 0
fnl_q.size: 1204
proc_q.size: 21
ss[0].shape (16, 30, 30, 1)
ss[1]:
[[ 0. 1.]
[ 1. 0.]
[ 1. 0.]
[ 0. 1.]
[ 0. 1.]
[ 1. 0.]
[ 1. 0.]
[ 0. 1.]
[ 1. 0.]
[ 0. 1.]
[ 0. 1.]
[ 1. 0.]
[ 1. 0.]
[ 0. 1.]
[ 1. 0.]
[ 0. 1.]]
fnl_q.size: 1204
proc_q.size: 5
ok
Todo como se esperaba
- lote de pares [etiqueta de muestra] se crean
- los pares se barajan
Lo único que queda es aplicar TF, ya que está destinado a ser utilizado reemplazando some_op :)
Y una pregunta: un problema de problema observado - en caso de que use tf.FIFOQueue
para nombres de archivo y tf.RandomShuffleQueue
para muestras - no se produzca un cambio de orden. Sin embargo, de otra manera (como se describe arriba) se mezcla perfectamente.
Cualquier problema con barajar para
tf.RandomShuffleQueue(len(fnames), 0, [tf.float32, tf.float32], shapes=[f_s, l_s])
?
ADD: la versión con dos hilos:
- uno para rellenar / actualizar / cambiar la cola de nombre de archivo
- segundo para llenar las muestras para procesar la cola.
También se agregó la forma correcta de detener los hilos.
def load_and_enqueue(sess, coord, feature_read_op, label_read_op , enqueue_op):
try:
while not coord.should_stop():
feature, label = sess.run([feature_read_op, label_read_op ])
feed_dict = {feature_input: feature,
label_input : label}
sess.run(enqueue_op, feed_dict=feed_dict)
except Exception as e:
return
# periodically check the state of fnl queue and if needed refill it
# - enqueue_op: ''refill'' file-name_label queue
def enqueue_fnl(sess, coord, fnl_q, enqueue_op):
try:
while not coord.should_stop():
time.sleep(0.5)
s = sess.run(fnl_q.size())
if s < (9*BATCH_SIZE) :
sess.run(enqueue_op)
except Exception as e:
return
# -- ops for feed part --
# filenames and labels are pre-loaded
fv = tf.constant(fnames)
lv = tf.constant(ohl)
# read op
fnl_q = tf.RandomShuffleQueue(len(fnames)*2, 0, [tf.string, tf.float32], name = ''fnl_q'') # add some margin for re-fill to fit
do_fnl_enq = fnl_q.enqueue_many([fv, lv])
feature_read_op, label_read_op = img_reader_jpg(fnl_q, ch = IMG_SIZE[2])
# samples queue
feature_input = tf.placeholder(tf.float32, shape=IMG_SIZE, name=''feature_input'')
label_input = tf.placeholder(tf.float32, shape=LAB_SIZE, name=''label_input'')
proc_q = tf.FIFOQueue(len(fnames)*3, [tf.float32, tf.float32], shapes=[IMG_SIZE, LAB_SIZE], name = ''fe_la_q'')
enqueue_op = proc_q.enqueue([feature_input, label_input])
# -- ops for trainind end eval
img_batch, lab_batch = proc_q.dequeue_many(BATCH_SIZE)
... here is your model
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits, lab_ph))
optimizer = tf.train.AdamOptimizer(1e-4).minimize(loss)
with tf.Session() as sess:
coord = tf.train.Coordinator()
t_le = threading.Thread(target=load_and_enqueue, args = (sess, coord, feature_read_op, label_read_op , enqueue_op) , name = ''load_and_enqueue'')
t_re = threading.Thread(target=enqueue_fnl, args = (sess, coord, fnl_q, do_fnl_enq), name = ''enqueue_fnl'') # re-enq thread i.e. refiling filename queue
t_le.start()
t_re.start()
try:
# training
for step in xrange(823):
# some proc
img_v, lab_v = sess.run([img_batch, lab_batch])
feed_dict = { img_ph : img_v,
lab_ph : lab_v,
keep_prob: 0.7}
_, loss_v = sess.run([optimizer, loss], feed_dict = feed_dict)
except Exception as e:
print ''Training: Exception:'', e
# stop threads
coord.request_stop() # ask to stop
sess.run(fnl_q.close(cancel_pending_enqueues=True)) # tell proc_q don''t wait for enque anymore
sess.run(proc_q.close(cancel_pending_enqueues=True)) # tell proc_q don''t wait for enque anymore
coord.join([t_le, t_re], stop_grace_period_secs=8)