Cargando datos de filas grandes en Cassandra usando Java y CQLSSTableWriter
bulk (1)
He intentado cargar un conjunto de datos enormemente grande en Cassandra utilizando la API de SSTableLoader para Java, pero mi programa agota el tiempo de espera mientras lo hago.
Estoy tomando artículos y descomponiéndolos en ngrams de palabras (unigrams, bigrams, trigrams). Tengo un espacio de teclas con tres familias de columnas (unigrams, trigrams, bigrams). Dentro de estas familias de columnas, la clave de fila sería la ID del documento, luego, para cada ngram dentro de ese documento, se agregaría una columna que contiene ese ngrama.
Así que un artículo con la ID de ''artículo1'' y con el contenido "Esta es una oración de prueba", la fila se vería como ...
row id | col | col | col | col | col
----------------------------------------------------
article1 | This | is | a | test | sentence
Este es el archivo de Java que estoy usando para iniciar SSTableWriters y agregarles datos es:
package cassandrabulktest.cassandra;
import static NGramProperties.*;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
public class NGramLoader {
private static final String UNIGRAM_SCHEMA = "CREATE TABLE articles.unigrams (" +
"docid text, " +
"unigram text, " +
"PRIMARY KEY (unigram, docid))";
private static CQLSSTableWriter unigram_writer;
private static final String BIGRAM_SCHEMA = "CREATE TABLE articles.bigrams (" +
"docid text, " +
"bigram text, " +
"PRIMARY KEY (bigram, docid))";
private static CQLSSTableWriter bigram_writer;
private static final String TRIGRAM_SCHEMA = "CREATE TABLE articles.trigrams (" +
"docid text, " +
"trigram text, " +
"PRIMARY KEY (trigram, docid))";
private static CQLSSTableWriter trigram_writer;
public static void initDirectories(String startdate, int count) {
String[] grams = { "unigrams", "bigrams", "trigrams" };
for (String gram : grams) {
File f = new File(BASE_LOCATION + "/" + startdate + "/articles/" + gram + "/");
f.mkdirs();
}
unigram_writer = CQLSSTableWriter.builder()
.inDirectory(BASE_LOCATION + "/" + startdate + "/articles/unigrams/")
.forTable(UNIGRAM_SCHEMA)
.using("INSERT INTO articles.unigrams (docid, unigram) VALUES (?, ?)")
.build();
bigram_writer = CQLSSTableWriter.builder()
.inDirectory(BASE_LOCATION + "/" + startdate + "/articles/bigrams/")
.forTable(BIGRAM_SCHEMA)
.using("INSERT INTO articles.bigrams (docid, bigram) VALUES (?, ?)")
.build();
trigram_writer = CQLSSTableWriter.builder()
.inDirectory(BASE_LOCATION + "/" + startdate + "/articles/trigrams/")
.forTable(TRIGRAM_SCHEMA)
.using("INSERT INTO articles.trigrams (docid, trigram) VALUES (?, ?)")
.build();
}
public static void load(String articleId, ArrayList<String> unigrams, ArrayList<String> bigrams, ArrayList<String> trigrams) throws IOException, InvalidRequestException {
for (String unigram : unigrams) {
unigram_writer.addRow(unigram, articleId);
}
for (String bigram : bigrams) {
bigram_writer.addRow(bigram, articleId);
}
for (String trigram : trigrams) {
trigram_writer.addRow(trigram, articleId);
}
}
public static void closeWriter() throws IOException {
unigram_writer.close();
bigram_writer.close();
trigram_writer.close();
}
}
Llamo a ''carga'' para cada artículo que itero, y las listas de arreglos son solo listas de los ngramos que deben agregarse.
El programa comienza lo suficientemente rápido, pero después de unos 100.000 artículos, se vuelve increíblemente lento. Supongo que el escritor está fusionando datos en un único SSTable sobre la marcha, lo que está desacelerando enormemente a medida que el número de elementos crece demasiado.
¿Alguna idea sobre formas de evitar esto?
Entonces esto ayudó. Noté que cuantos más registros se crearon en el mismo directorio de entrada, más tiempo obtuvo la importación. He mirado alrededor y parece que esto es un problema debido al tamaño del índice y que Casandra debe reconstruirlo cada vez que escribe. No he verificado esto, pero tiene sentido por el resultado que vi. Aquí está mi solución para recrear el escritor de tablas y crear un nuevo directorio para que la construcción del índice sea mucho más rápida.
No es perfecto, pero es dramáticamente más rápido.
class CassandraLoader {
private static final Logger logger = Logger.getLogger(CassandraLoader.class
.getName());
// After half a million records we will rotate the directory for efficiency
private static final int MAX_RECORD_COUNT = 500000;
private CQLSSTableWriter tableWriter;
private final CsvIOFactory csvIOFactory = CsvIOFactory.createFactory(
createCsvConfig(), AdBlockLog.class);;
private final CsvDeserializer deSerializer;
private final String cqlKeySpace;
private final String cqlTable;
/**
* This is the total number of output directories we have processed.
*/
private int rolloverFileCount = 0;
/**
* Output directory name.
*/
private String outputDirectory;
/**
* Constructor that initializes the output cql keyspace and the cql table where
* the data needed to be stored.
*
* @param cqlKeySpace
* @param cqlTable
* @param outputDirectory
*/
protected CassandraLoader(final String cqlKeySpace, final String cqlTable,
final String outputDirectory) {
this.cqlKeySpace = cqlKeySpace;
this.cqlTable = cqlTable;
this.outputDirectory = outputDirectory;
// Create a new Desieralizer.
deSerializer = csvIOFactory.createDeserializer();
tableWriter = createTableWriter(outputDirectory, rolloverFileCount);
}
public int load(final String s3Bucket, final String s3Regex)
throws InvalidRequestException, IllegalArgumentException,
IllegalAccessException, IOException {
int totalRecordCount = 0;
int rolloverRecordCount = 0;
logger.info("Loading files from bucket " + s3Bucket + " with regex "
+ s3Regex);
final List<String> s3FileKeys = S3Util.listBucketToKeys(s3Bucket, s3Regex);
logger.info("Found " + s3FileKeys.size() + " total s3 files");
for (String s3fileKey : s3FileKeys) {
logger.info("Processing file " + s3fileKey);
int recordsProcessed = loadCsvFromS3Bulk(s3Bucket, s3fileKey);
totalRecordCount += recordsProcessed;
rolloverRecordCount += recordsProcessed;
logger.info("Total Record Count " + totalRecordCount);
logger.info("Rollover Record Count " + rolloverRecordCount);
if (rolloverRecordCount >= MAX_RECORD_COUNT) {
tableWriter.close();
tableWriter = createTableWriter(outputDirectory,
++rolloverFileCount);
rolloverRecordCount = 0;
}
}
return totalRecordCount;
}
private int loadCsvFromS3Bulk(final String bucketName, final String key)
throws IOException, InvalidRequestException,
IllegalArgumentException, IllegalAccessException {
// Have to close all of these
InputStream s3InputStream = null;
InputStream gzStream = null;
InputStreamReader bufReader = null;
int recordsProcessed = 0;
try {
s3InputStream = S3Util.getFileInputStream(bucketName, key);
gzStream = new GZIPInputStream(s3InputStream);
bufReader = new InputStreamReader(gzStream,
StandardCharsets.US_ASCII);
deSerializer.open(bufReader);
for (; deSerializer.hasNext(); recordsProcessed++) {
AdBlockLog abl = deSerializer.next();
tableWriter.addRow(getRowMap(abl));
}
} finally {
deSerializer.close(true);
bufReader.close();
gzStream.close();
s3InputStream.close();
}
MemoryUtils.printUsage(logger);
return recordsProcessed;
}
public void close() throws IOException {
tableWriter.close();
}
@VisibleForTesting
protected Map<String, Object> getRowMap(final CassandraEntity casEntity)
throws IllegalArgumentException, IllegalAccessException {
Map<String, Object> rowMap = new HashMap<String, Object>();
for (Field f : casEntity.getClass().getDeclaredFields()) {
if (f.isAnnotationPresent(Column.class)) {
Column columnAnnotation = f.getAnnotation(Column.class);
Class<?> clazz = f.getType();
f.setAccessible(true);
logger.finest("adding column with class " + clazz.getName());
if (clazz.isAssignableFrom(BigDecimal.class)) {
BigDecimal value = (BigDecimal) f.get(casEntity);
rowMap.put(columnAnnotation.name(), (value == null ? null
: value.doubleValue()));
continue;
}
// Anything other than BigDecimal we can just add.
if (clazz.isAssignableFrom(String.class)) {
String value = (String) f.get(casEntity);
// I think this should save space
rowMap.put(columnAnnotation.name(),
(value == null || value.isEmpty()) ? null : value);
}
rowMap.put(columnAnnotation.name(), f.get(casEntity));
}
}
return rowMap;
}
/**
* Create a new tableWriter. This is most important for doing rollover
* to a new directory to increase speed and efficiency.
*
* The output will be stored in the same directory where the application is
* being ran in the format of cqlKeySpace/outputDirectoryName_iterator
*
* ex.
* s3dump/dt=2015-02-01_1
*
* @param outputDirectoryName The directory name that you want to write the output to
* @param iteration The iteration that will be appended to the directory.
* @return A newly created {@link CQLSSTableWriter}
*/
private final CQLSSTableWriter createTableWriter(
String outputDirectoryName, int iteration) {
final String directoryName = String.format(cqlKeySpace + "/%s_%s",
outputDirectoryName, Integer.toString(iteration));
final File currentOutputDirectory = new File(directoryName);
if (!currentOutputDirectory.exists()) {
logger.info("Creating sstable director "
+ currentOutputDirectory.getName());
currentOutputDirectory.mkdirs();
}
String schema = String.format(AdBlockLog.AD_BLOCK_LOG_SCHEMA,
cqlKeySpace, cqlTable);
String insert = String.format(AdBlockLog.AD_BLOCK_LOG_INSERT_STMT,
cqlKeySpace, cqlTable);
return CQLSSTableWriter.builder()
.inDirectory(currentOutputDirectory.getAbsolutePath())
.withPartitioner(new Murmur3Partitioner())
.withBufferSizeInMB(128).forTable(schema).using(insert).build();
}
private static final CsvConfiguration createCsvConfig() {
CsvConfiguration config = new CsvConfiguration();
config.setFieldDelimiter('','');
return config;
}
}