python - ordenar - ¿Cómo*realmente*leer datos CSV en TensorFlow?
manipular archivos csv en python (4)
Soy relativamente nuevo en el mundo de TensorFlow, y estoy bastante perplejo por la forma en que realmente leerías datos CSV en un ejemplo utilizable / tensores de etiquetas en TensorFlow. El ejemplo del tutorial de TensorFlow sobre la lectura de datos CSV está bastante fragmentado y solo le ayuda a entrenar en datos CSV.
Aquí está mi código que he armado, basado en ese tutorial CSV:
from __future__ import print_function
import tensorflow as tf
def file_len(fname):
with open(fname) as f:
for i, l in enumerate(f):
pass
return i + 1
filename = "csv_test_data.csv"
# setup text reader
file_length = file_len(filename)
filename_queue = tf.train.string_input_producer([filename])
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)
# setup CSV decoding
record_defaults = [[0],[0],[0],[0],[0]]
col1,col2,col3,col4,col5 = tf.decode_csv(csv_row, record_defaults=record_defaults)
# turn features back into a tensor
features = tf.stack([col1,col2,col3,col4])
print("loading, " + str(file_length) + " line(s)/n")
with tf.Session() as sess:
tf.initialize_all_variables().run()
# start populating filename queue
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for i in range(file_length):
# retrieve a single instance
example, label = sess.run([features, col5])
print(example, label)
coord.request_stop()
coord.join(threads)
print("/ndone loading")
Y aquí hay un breve ejemplo del archivo CSV que estoy cargando: datos bastante básicos: 4 columnas de características y 1 columna de etiqueta:
0,0,0,0,0
0,15,0,0,0
0,30,0,0,0
0,45,0,0,0
Todo lo que el código anterior hace es imprimir cada ejemplo del archivo CSV, uno por uno , que, aunque agradable, es bastante inútil para el entrenamiento.
Con lo que estoy luchando aquí es cómo convertiría esos ejemplos individuales, cargados uno por uno, en un conjunto de datos de entrenamiento. Por ejemplo, aquí hay un cuaderno en el que estaba trabajando en el curso Udacity Deep Learning. Básicamente quiero tomar los datos CSV que estoy cargando y colocarlos en algo como train_dataset y train_labels :
def reformat(dataset, labels):
dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32)
# Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...]
labels = (np.arange(num_labels) == labels[:,None]).astype(np.float32)
return dataset, labels
train_dataset, train_labels = reformat(train_dataset, train_labels)
valid_dataset, valid_labels = reformat(valid_dataset, valid_labels)
test_dataset, test_labels = reformat(test_dataset, test_labels)
print(''Training set'', train_dataset.shape, train_labels.shape)
print(''Validation set'', valid_dataset.shape, valid_labels.shape)
print(''Test set'', test_dataset.shape, test_labels.shape)
He intentado usar
tf.train.shuffle_batch
, como este, pero simplemente se cuelga inexplicablemente:
for i in range(file_length):
# retrieve a single instance
example, label = sess.run([features, colRelevant])
example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=file_length, capacity=file_length, min_after_dequeue=10000)
print(example, label)
Para resumir, aquí están mis preguntas:
-
¿Qué me estoy perdiendo de este proceso?
- Parece que hay una intuición clave que me falta sobre cómo construir correctamente una tubería de entrada.
-
¿Hay alguna manera de evitar tener que conocer la longitud del archivo CSV?
-
Se siente bastante poco elegante tener que saber la cantidad de líneas que desea procesar (la línea de código
for i in range(file_length)
arriba)
-
Se siente bastante poco elegante tener que saber la cantidad de líneas que desea procesar (la línea de código
Editar: Tan pronto como Yaroslav señaló que probablemente estaba mezclando piezas imperativas y de construcción gráfica, comenzó a aclararse. Pude reunir el siguiente código, que creo que está más cerca de lo que normalmente se haría al entrenar un modelo de CSV (excluyendo cualquier código de entrenamiento de modelo):
from __future__ import print_function
import numpy as np
import tensorflow as tf
import math as math
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(''dataset'')
args = parser.parse_args()
def file_len(fname):
with open(fname) as f:
for i, l in enumerate(f):
pass
return i + 1
def read_from_csv(filename_queue):
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)
record_defaults = [[0],[0],[0],[0],[0]]
colHour,colQuarter,colAction,colUser,colLabel = tf.decode_csv(csv_row, record_defaults=record_defaults)
features = tf.stack([colHour,colQuarter,colAction,colUser])
label = tf.stack([colLabel])
return features, label
def input_pipeline(batch_size, num_epochs=None):
filename_queue = tf.train.string_input_producer([args.dataset], num_epochs=num_epochs, shuffle=True)
example, label = read_from_csv(filename_queue)
min_after_dequeue = 10000
capacity = min_after_dequeue + 3 * batch_size
example_batch, label_batch = tf.train.shuffle_batch(
[example, label], batch_size=batch_size, capacity=capacity,
min_after_dequeue=min_after_dequeue)
return example_batch, label_batch
file_length = file_len(args.dataset) - 1
examples, labels = input_pipeline(file_length, 1)
with tf.Session() as sess:
tf.initialize_all_variables().run()
# start populating filename queue
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
try:
while not coord.should_stop():
example_batch, label_batch = sess.run([examples, labels])
print(example_batch)
except tf.errors.OutOfRangeError:
print(''Done training, epoch reached'')
finally:
coord.request_stop()
coord.join(threads)
Creo que estás mezclando piezas imperativas y de construcción gráfica aquí.
La operación
tf.train.shuffle_batch
crea un nuevo nodo de cola y se puede usar un solo nodo para procesar todo el conjunto de datos.
Así que creo que estás colgando porque creaste un montón de colas
shuffle_batch
en tu bucle for y no comenzaste los corredores de cola para ellos.
El uso normal de la tubería de entrada se ve así:
-
Agregue nodos como
shuffle_batch
a la tubería de entrada - (opcional, para evitar modificaciones involuntarias del gráfico) finalizar gráfico
--- fin de la construcción del gráfico, comienzo de la programación imperativa -
-
tf.start_queue_runners
-
while(True): session.run()
Para ser más escalable (para evitar Python GIL), puede generar todos sus datos utilizando la tubería TensorFlow.
Sin embargo, si el rendimiento no es crítico, puede conectar una matriz numpy a una tubería de entrada utilizando
slice_input_producer.
Aquí hay un ejemplo con algunos nodos de
Print
para ver qué está pasando (los mensajes en
Print
van a stdout cuando se ejecuta el nodo)
tf.reset_default_graph()
num_examples = 5
num_features = 2
data = np.reshape(np.arange(num_examples*num_features), (num_examples, num_features))
print data
(data_node,) = tf.slice_input_producer([tf.constant(data)], num_epochs=1, shuffle=False)
data_node_debug = tf.Print(data_node, [data_node], "Dequeueing from data_node ")
data_batch = tf.batch([data_node_debug], batch_size=2)
data_batch_debug = tf.Print(data_batch, [data_batch], "Dequeueing from data_batch ")
sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.get_default_graph().finalize()
tf.start_queue_runners()
try:
while True:
print sess.run(data_batch_debug)
except tf.errors.OutOfRangeError as e:
print "No more inputs."
Debería ver algo como esto
[[0 1]
[2 3]
[4 5]
[6 7]
[8 9]]
[[0 1]
[2 3]]
[[4 5]
[6 7]]
No more inputs.
Los números "8, 9" no llenaron el lote completo, por lo que no se produjeron.
También
tf.Print
se imprime en sys.stdout, por lo que se muestran por separado en la Terminal para mí.
PD: un mínimo de
batch
de conexión a una cola inicializada manualmente está en el
problema 2193 de github
Además, para fines de depuración, es posible que desee establecer un
timeout
de
timeout
en su sesión para que su computadora portátil IPython no se cuelgue en las colas de espera vacías.
Yo uso esta función auxiliar para mis sesiones
def create_session():
config = tf.ConfigProto(log_device_placement=True)
config.gpu_options.per_process_gpu_memory_fraction=0.3 # don''t hog all vRAM
config.operation_timeout_in_ms=60000 # terminate on long hangs
# create interactive session to register a default session
sess = tf.InteractiveSession("", config=config)
return sess
Notas de escalabilidad:
-
tf.constant
. copia entf.constant
constante de sus datos en el gráfico. Hay un límite fundamental de 2 GB en el tamaño de la definición de gráfico, por lo que es un límite superior en el tamaño de los datos -
Puede superar ese límite utilizando
v=tf.Variable
y guardando los datos allí ejecutandov.assign_op
con untf.placeholder
en el lado derecho y alimentando una matriz numpy al marcador de posición (feed_dict
) -
Eso todavía crea dos copias de datos, por lo que para ahorrar memoria puede hacer su propia versión de
slice_input_producer
que opera en matrices numpy y carga filas una a la vez usandofeed_dict
O podría intentar esto, el código carga el conjunto de datos de Iris en el flujo de tensor usando pandas y numpy y se imprime una salida simple de una neurona en la sesión. Espero que ayude para una comprensión básica ... [No he agregado la forma de una etiqueta de decodificación en caliente].
import tensorflow as tf
import numpy
import pandas as pd
df=pd.read_csv(''/home/nagarjun/Desktop/Iris.csv'',usecols = [0,1,2,3,4],skiprows = [0],header=None)
d = df.values
l = pd.read_csv(''/home/nagarjun/Desktop/Iris.csv'',usecols = [5] ,header=None)
labels = l.values
data = numpy.float32(d)
labels = numpy.array(l,''str'')
#print data, labels
#tensorflow
x = tf.placeholder(tf.float32,shape=(150,5))
x = data
w = tf.random_normal([100,150],mean=0.0, stddev=1.0, dtype=tf.float32)
y = tf.nn.softmax(tf.matmul(w,x))
with tf.Session() as sess:
print sess.run(y)
Puede usar la última API tf.data:
dataset = tf.contrib.data.make_csv_dataset(filepath)
iterator = dataset.make_initializable_iterator()
columns = iterator.get_next()
with tf.Session() as sess:
sess.run([iteator.initializer])
Si alguien vino aquí buscando una manera simple de leer archivos CSV absolutamente grandes y fragmentados en tf.estimator API, por favor, vea debajo de mi código
CSV_COLUMNS = [''ID'',''text'',''class'']
LABEL_COLUMN = ''class''
DEFAULTS = [[''x''],[''no''],[0]] #Default values
def read_dataset(filename, mode, batch_size = 512):
def _input_fn(v_test=False):
# def decode_csv(value_column):
# columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
# features = dict(zip(CSV_COLUMNS, columns))
# label = features.pop(LABEL_COLUMN)
# return add_engineered(features), label
# Create list of files that match pattern
file_list = tf.gfile.Glob(filename)
# Create dataset from file list
#dataset = tf.data.TextLineDataset(file_list).map(decode_csv)
dataset = tf.contrib.data.make_csv_dataset(file_list,
batch_size=batch_size,
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
label_name=LABEL_COLUMN)
if mode == tf.estimator.ModeKeys.TRAIN:
num_epochs = None # indefinitely
dataset = dataset.shuffle(buffer_size = 10 * batch_size)
else:
num_epochs = 1 # end-of-input after this
batch_features, batch_labels = dataset.make_one_shot_iterator().get_next()
#Begins - Uncomment for testing only -----------------------------------------------------<
if v_test == True:
with tf.Session() as sess:
print(sess.run(batch_features))
#End - Uncomment for testing only -----------------------------------------------------<
return add_engineered(batch_features), batch_labels
return _input_fn
Ejemplo de uso en TF.estimator:
train_spec = tf.estimator.TrainSpec(input_fn = read_dataset(
filename = train_file,
mode = tf.estimator.ModeKeys.TRAIN,
batch_size = 128),
max_steps = num_train_steps)