por - Analizar archivos CSV muy grandes con C++
leer archivo csv c++ (3)
Mi objetivo es analizar grandes archivos csv con C ++ en un proyecto de QT en el entorno OSX. (Cuando digo csv quiero decir tsv y otras variantes de 1GB ~ 5GB).
Parece una tarea simple, pero las cosas se complican cuando el tamaño de los archivos aumenta. No quiero escribir mi propio analizador debido a los muchos casos extremos relacionados con el análisis de archivos csv.
He encontrado varias librerías de procesamiento de csv para manejar este trabajo, pero analizar un archivo de 1GB toma aproximadamente 90 ~ 120 segundos en mi máquina, lo que no es aceptable. No estoy haciendo nada con los datos en este momento, solo proceso y descarto los datos para fines de prueba.
cccsvparser es una de las bibliotecas que he probado. Pero la única biblioteca lo suficientemente rápida fue quick-cpp-csv-parser, que da resultados aceptables: 15 segundos en mi máquina, pero funciona solo cuando se conoce la estructura del archivo.
Ejemplo de uso: fast-cpp-csv-parser
#include "csv.h"
int main(){
io::CSVReader<3> in("ram.csv");
in.read_header(io::ignore_extra_column, "vendor", "size", "speed");
std::string vendor; int size; double speed;
while(in.read_row(vendor, size, speed)){
// do stuff with the data
}
}
Como puede ver, no puedo cargar archivos arbitrarios y debo definir variables específicamente para que coincidan con mi estructura de archivos. No conozco ningún método que me permita crear esas variables dinámicamente en tiempo de ejecución.
El otro enfoque que he intentado es leer el archivo csv línea por línea con la clase LineReader fast-cpp-csv-parser que es realmente rápido (aproximadamente 7 segundos para leer todo el archivo), y luego analizar cada línea con cccsvparser lib que puede procesar cadenas . pero esto toma alrededor de 40 segundos hasta que esté listo, es una mejora en comparación con los primeros intentos, pero aún inaceptable.
He visto varias preguntas de stackoverflow relacionadas con el análisis de archivos csv. Ninguno de ellos toma el procesamiento de archivos grandes en la cuenta.
También pasé mucho tiempo buscando en Google para encontrar una solución a este problema, y realmente echo de menos la libertad que ofrecen los administradores de paquetes como npm o pip cuando buscan soluciones listas para usar .
Apreciaré cualquier sugerencia sobre cómo manejar este problema.
Editar:
Cuando se utiliza el enfoque de @fbucek , el tiempo de procesamiento se reduce a 25 segundos, lo cual es una gran mejora.
podemos optimizar esto aún más?
Supongo que estás usando solo un hilo.
El subprocesamiento múltiple puede acelerar su proceso.
El mejor logro hasta el momento es 40 seg . Mantengamos eso.
He supuesto que primero lees y luego procesas -> (alrededor de 7 segundos para leer todo el archivo)
7 segundos para leer 33 segundos para procesar
Antes que nada , puedes dividir tu archivo en fragmentos , digamos 50MB. Eso significa que puede comenzar a procesar después de leer 50MB de archivo. No necesita esperar hasta que termine el archivo completo. Eso es 0.35 segundos para leer (ahora es 0.35 + 33 segundos para procesamiento = cca 34 segundos)
Cuando utiliza el subprocesamiento múltiple, puede procesar múltiples trozos a la vez . Eso puede acelerar el proceso teóricamente hasta el número de sus núcleos. Digamos que tienes 4 núcleos. Eso es 33/4 = 8.25 seg.
Creo que puedes acelerar tu procesamiento con 4 núcleos hasta 9 s. en total.
Mire QThreadPool y QRunnable o QtConcurrent Preferiría QThreadPool
Divida la tarea en partes:
- Primero intente recorrer el archivo y dividirlo en fragmentos. Y no hagas nada con eso.
- Luego crea la clase "ChunkProcessor" que puede procesar ese fragmento
- Haga que "ChunkProcessor" sea una subclase de QRunnable y en la función reimplementada run () ejecute su proceso
- Cuando tienes trozos, tienes una clase que puede procesarlos y esa clase es compatible con QThreadPool, puedes pasarla a
Podría verse así
loopoverfile {
whenever chunk is ready {
ChunkProcessor *chunkprocessor = new ChunkProcessor(chunk);
QThreadPool::globalInstance()->start(chunkprocessor);
connect(chunkprocessor, SIGNAL(finished(std::shared_ptr<ProcessedData>)), this, SLOT(readingFinished(std::shared_ptr<ProcessedData>)));
}
}
Puede usar std :: share_ptr para pasar datos procesados con el fin de no usar QMutex u otra cosa y evitar problemas de serialización con el acceso de subprocesos múltiples a algún recurso.
Nota: para utilizar una señal personalizada, debe registrarla antes de su uso
qRegisterMetaType<std::shared_ptr<ProcessedData>>("std::shared_ptr<ProcessedData>");
Editar: (basado en la discusión, mi respuesta no fue clara al respecto) No importa qué disco use o cuán rápido sea. La lectura es una operación de hilo único. Esta solución solo se sugirió porque tardó 7 segundos en leerse y, una vez más, no importa qué disco sea. 7 segundos es lo que cuenta. Y el único propósito es comenzar a procesar tan pronto como sea posible y no esperar hasta que termine la lectura.
Puedes usar:
QByteArray data = file.readAll();
O puede usar la idea principal: (No sé por qué tardan 7 segundos en leer, qué hay detrás)
QFile file("in.txt");
if (!file.open(QIODevice::ReadOnly | QIODevice::Text))
return;
QByteArray* data = new QByteArray;
int count = 0;
while (!file.atEnd()) {
++count;
data->append(file.readLine());
if ( count > 10000 ) {
ChunkProcessor *chunkprocessor = new ChunkProcessor(data);
QThreadPool::globalInstance()->start(chunkprocessor);
connect(chunkprocessor, SIGNAL(finished(std::shared_ptr<ProcessedData>)), this, SLOT(readingFinished(std::shared_ptr<ProcessedData>)));
data = new QByteArray;
count = 0;
}
}
Un archivo, un hilo, leído casi tan rápido como leído por línea "sin" interrupción. Lo que haces con los datos es otro problema, pero no tiene nada que ver con E / S. Ya está en la memoria. Por lo tanto, la única preocupación sería un archivo de 5 GB y una cantidad insuficiente de RAM en la máquina.
Es una solución muy simple. Todo lo que necesita es la subclase QRunnable, la función de ejecutar de nuevo, emitir señal cuando está terminada, pasar datos procesados usando un puntero compartido y en el hilo principal unir esos datos en una estructura o lo que sea. Solución simple de seguridad de subprocesos.
Propondría una sugerencia multi-hilo con una pequeña variación, es que un hilo está dedicado a leer el archivo en un tamaño predefinido (configurable) de trozos y continúa alimentando datos a un conjunto de hilos (más de un núcleo de cpu basado). Digamos que la configuración se ve así:
tamaño del fragmento = 50 MB
Hilo de disco = 1
Hilos de proceso = 5
- Crea una clase para leer datos del archivo. En esta clase, contiene una estructura de datos que se utiliza para comunicarse con hilos de proceso. Por ejemplo, esta estructura contendría offset de inicio, desplazamiento final del buffer de lectura para cada hilo de proceso. Para leer datos de archivo, la clase de lector tiene 2 búferes de tamaño de fragmento (50 MB en este caso)
- Cree una clase de proceso que contenga punteros (compartidos) para los almacenamientos intermedios de lectura y compensa la estructura de datos.
- Ahora crea el controlador (probablemente el hilo principal), crea todos los hilos y espera su finalización y maneja las señales.
- El hilo del lector se invoca con la clase de lector, lee 50 MB de los datos y, en función del número de subprocesos, crea el objeto de estructura de datos de desplazamientos. En este caso, t1 maneja de 0 a 10 MB, t2 maneja de 10 a 20 MB y así sucesivamente. Una vez listo, notifica los hilos del procesador. A continuación, lee inmediatamente el siguiente fragmento del disco y espera a que el hilo del procesador llegue a la notificación de finalización desde los hilos del proceso.
- Procesador subprocesos en la notificación, lee datos del búfer y lo procesa. Una vez hecho esto, notifica al lector acerca de la finalización y espera el siguiente fragmento.
- Este proceso finaliza hasta que se leen y procesan todos los datos. A continuación, el hilo del lector notifica al hilo principal sobre la finalización que envía PROCESS_COMPLETION, en todos los hilos que salen. o el hilo principal elige procesar el siguiente archivo en la cola.
Tenga en cuenta que los desplazamientos se toman para facilitar la explicación, los desplazamientos a la asignación del delimitador de línea deben manejarse mediante programación.
Si el analizador que ha utilizado no se distribuye, obviamente, el enfoque no es escalable.
Yo votaría por una técnica como esta a continuación
- trocear el archivo en un tamaño que puede ser manejado por una restricción máquina / tiempo
- distribuya los trozos a un grupo de máquinas (1 .. *) que puedan cumplir con sus requisitos de tiempo / espacio
- Considere tratar en tamaños de bloque para un pedazo dado
- Evite los hilos en el mismo recurso (es decir, el bloque dado) para salvarse de todos los problemas relacionados con el hilo.
- Utilice subprocesos para lograr operaciones no competitivas (en un recurso), como leer en un subproceso y escribir en un subproceso diferente en un archivo diferente.
- haz tu análisis sintáctico (ahora para este pequeño fragmento puedes invocar tu analizador sintáctico).
- haz tus operaciones
- fusionar los resultados / si puede distribuirlos tal como están.
Ahora, habiendo dicho eso, ¿por qué no puedes usar Hadoop como frameworks?