java cassandra datastax-java-driver

El rendimiento de Cassandra Bulk-Write con Java Driver es atroz en comparación con MongoDB



datastax-java-driver (2)

Construí un importador para MongoDB y Cassandra. Básicamente, todas las operaciones del importador son las mismas, a excepción de la última parte en la que se forman los datos para que coincida con el esquema necesario de la tabla de cassandra y la estructura del documento mongodb deseada. El rendimiento de escritura de Cassandra es realmente malo en comparación con MongoDB y creo que estoy haciendo algo mal.

Básicamente, mi clase de importador abstracto carga los datos, lee todos los datos y los pasa a la clase que extiende MongoDBImporter o CassandraImporter para enviar datos a las bases de datos. Una base de datos está dirigida a la vez: no hay inserciones "duales" para C * y MongoDB al mismo tiempo. El importador se ejecuta en la misma máquina contra el mismo número de nodos (6).

El problema:

La importación de MongoDB finalizó después de 57 minutos. Ingerí 10.000.000 de documentos y espero la misma cantidad de filas para Cassandra. Mi importador de Cassandra ahora está funcionando desde 2,5 horas y solo está en 5.000.000 filas insertadas. Esperaré a que el importador termine y edite el tiempo real de finalización aquí.

Cómo importo con Cassandra:

Preparo dos declaraciones una vez antes de ingerir datos. Ambas declaraciones son consultas de ACTUALIZACIÓN porque a veces tengo que anexar datos a una lista existente. Mi tabla se borra por completo antes de comenzar la importación. Las declaraciones preparadas se usan una y otra vez.

PreparedStatement statementA = session.prepare(queryA); PreparedStatement statementB = session.prepare(queryB);

Para cada fila, creo un BoundStatement y paso esa declaración a mi método de procesamiento por lotes "personalizado":

BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B bs = bs.bind(); //add data... with several bs.setXXX(..) calls cassandraConnection.executeBatch(bs);

Con MongoDB, puedo insertar 1000 documentos (el máximo) a la vez sin problemas. Para Cassandra, el importador se bloquea con com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large para solo 10 de mis declaraciones en algún momento. Estoy usando este código para construir los lotes. Por cierto, comencé con 1000, 500, 300, 200, 100, 50, 20 lotes antes pero obviamente no funcionan. Luego lo puse a 10 y arrojó la excepción nuevamente. Ahora me he quedado sin ideas de por qué se está rompiendo.

private static final int MAX_BATCH_SIZE = 10; private Session session; private BatchStatement currentBatch; ... @Override public ResultSet executeBatch(Statement statement) { if (session == null) { throw new IllegalStateException(CONNECTION_STATE_EXCEPTION); } if (currentBatch == null) { currentBatch = new BatchStatement(Type.UNLOGGED); } currentBatch.add(statement); if (currentBatch.size() == MAX_BATCH_SIZE) { ResultSet result = session.execute(currentBatch); currentBatch = new BatchStatement(Type.UNLOGGED); return result; } return null; }

Mi esquema C * se ve así

CREATE TYPE stream.event ( data_dbl frozen<map<text, double>>, data_str frozen<map<text, text>>, data_bool frozen<map<text, boolean>>, ); CREATE TABLE stream.data ( log_creator text, date text, //date of the timestamp ts timestamp, log_id text, //some id hour int, //just the hour of the timestmap x double, y double, events list<frozen<event>>, PRIMARY KEY ((log_creator, date, hour), ts, log_id) ) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)

A veces necesito agregar nuevos eventos a una fila existente. Es por eso que necesito una lista de UDT. Mi UDT contiene tres mapas porque los creadores del evento producen datos diferentes (pares clave / valor de tipo cadena / doble / booleano). Soy consciente del hecho de que los UDT están congelados y no puedo tocar los mapas de eventos ya ingeridos. Eso está bien para mí, solo necesito agregar nuevos eventos que tengan la misma marca de tiempo a veces. Participo en el creador de los registros (algunos nombres de sensores), así como la fecha del registro (es decir, "22-09-2016") y la hora de la marca de tiempo (para distribuir los datos más mientras se mantienen los datos relacionados muy juntos en una partición).

Estoy usando Cassandra 3.0.8 con Datastax Java Driver, versión 3.1.0 en mi pom. De acuerdo con ¿Cuál es el límite de lote en Cassandra? , No debería aumentar el tamaño del lote ajustando batch_size_fail_threshold_in_kb en mi cassandra.yaml . Entonces ... ¿qué hace o qué está mal con mi importación?

ACTUALIZACIÓN Así que he ajustado mi código para ejecutar consultas asíncronas y almacenar las inserciones actualmente en ejecución en una lista. Cada vez que finaliza una inserción asíncrona, se eliminará de la lista. Cuando el tamaño de la lista excede un umbral y se produce un error en una inserción anterior, el método esperará 500 ms hasta que las inserciones estén por debajo del umbral. Mi código ahora aumenta automáticamente el umbral cuando no se produjo ninguna inserción.

Pero después de transmitir 3.300.000 filas, se procesaron 280.000 inserciones, pero no se produjo ningún error. Parece que el número de insertos actualmente procesados ​​parece demasiado alto. Los 6 nodos de cassandra se ejecutan en hardware básico, que tiene 2 años.

¿Es este el alto número (280,000 para 6 nodos) de insertos simultáneos un problema? ¿Debo agregar una variable como MAX_CONCURRENT_INSERT_LIMIT ?

private List<ResultSetFuture> runningInsertList; private static int concurrentInsertLimit = 1000; private static int concurrentInsertSleepTime = 500; ... @Override public void executeBatch(Statement statement) throws InterruptedException { if (this.runningInsertList == null) { this.runningInsertList = new ArrayList<>(); } //Sleep while the currently processing number of inserts is too high while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) { Thread.sleep(concurrentInsertSleepTime); } ResultSetFuture future = this.executeAsync(statement); this.runningInsertList.add(future); Futures.addCallback(future, new FutureCallback<ResultSet>() { @Override public void onSuccess(ResultSet result) { runningInsertList.remove(future); } @Override public void onFailure(Throwable t) { concurrentInsertErrorOccured = true; } }, MoreExecutors.sameThreadExecutor()); if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) { concurrentInsertLimit += 2000; LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit)); } return; }


Cuando ejecuta un lote en Cassandra, elige un solo nodo para actuar como coordinador. Este nodo se convierte en responsable de asegurarse de que las escrituras por lotes encuentren sus nodos apropiados. Así que (por ejemplo) agrupando 10000 escrituras juntas, ahora le ha encomendado a un nodo el trabajo de coordinar 10000 escrituras, la mayoría de las cuales serán para diferentes nodos. Es muy fácil volcar un nodo o matar latencia para un clúster completo haciendo esto. Por lo tanto, la razón para el límite en los tamaños de lote.

El problema es que Cassandra CQL BATCH es un nombre inapropiado, y no hace lo que tú o los demás piensan que es. No debe usarse para aumentar el rendimiento. Paralelamente, las escrituras asincrónicas siempre serán más rápidas que ejecutar el mismo número de declaraciones con BATCH juntas.

Sé que podría agrupar fácilmente 10.000 filas juntas porque irán a la misma partición. ... ¿Seguiría utilizando inserciones de fila única (asíncronas) en lugar de lotes?

Eso depende de si el rendimiento de escritura es o no su verdadero objetivo. Si es así, aún me quedaría con escrituras paralelas y asíncronas.

Para obtener más información acerca de esto, consulte estas dos publicaciones de blog de Ryan Svihla de DataStax:

Cassandra: carga por lotes sin la palabra clave Batch

Cassandra: Carga por lotes sin lotes - The Nuanced Edition


Después de usar C * por un momento, estoy convencido de que realmente debería usar lotes solo para mantener múltiples tablas sincronizadas. Si no necesita esa característica , entonces no use lotes en absoluto porque incurrirá en penalizaciones de rendimiento.

La forma correcta de cargar datos en C * es con escrituras asíncronas, con contrapresión opcional si su clúster no puede mantener el ritmo de ingestión. Debe reemplazar su método de procesamiento por lotes "personalizado" por algo que:

  • realiza escrituras asíncronas
  • mantener bajo control cuantas escrituras en vuelo tiene
  • realizar algunos intentos cuando un tiempo de espera de escritura.

Para realizar escrituras asíncronas, use el método .executeAsync , que le devolverá un objeto ResultSetFuture .

Para mantener bajo control cuántas consultas en vuelo solo recopilan el objeto ResultSetFuture recuperado del método .executeAsync en una lista, y si la lista obtiene (valores de parque aquí) digamos elementos 1k, espere a que todos terminen antes de emitir más escrituras. O puede esperar a que el primero termine antes de emitir una escritura más, solo para mantener la lista completa.

Y, por último, puede verificar las fallas de escritura cuando espera que se complete una operación. En ese caso, podrías:

  1. escribir de nuevo con el mismo valor de tiempo de espera
  2. escribir de nuevo con un mayor valor de tiempo de espera
  3. espere un poco de tiempo y luego vuelva a escribir con el mismo valor de tiempo de espera
  4. espere un poco de tiempo y luego vuelva a escribir con un mayor valor de tiempo de espera

De 1 a 4, tiene una mayor resistencia a la contrapresión. Elija el que mejor se ajuste a su caso.

EDITAR después de la actualización de la pregunta

Tu lógica de inserción me parece un poco rota:

  1. No veo ninguna lógica de reintento
  2. No elimina el elemento en la lista si falla
  3. Su while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) es incorrecto, porque solo dormirá cuando el número de consultas emitidas sea> concurrentInsertLimit , y debido a 2. su hilo simplemente se estacionará allí.
  4. Nunca configuraste en falso concurrentInsertErrorOccured

Normalmente guardo una lista de consultas (fallidas) con el fin de volver a intentarlas en otro momento. Eso me da un control poderoso sobre las consultas, y cuando las consultas fallidas comienzan a acumularse, duermo por unos momentos, y luego sigo intentándolo nuevamente (hasta X veces, luego fallamos ...).

Esta lista debe ser muy dinámica, por ejemplo, debe agregar elementos allí cuando las consultas fallan y eliminar los elementos cuando realice un reintento. Ahora puede comprender los límites de su clúster y ajustar su concurrentInsertLimit según, por ejemplo, el promedio de consultas fallidas en el último segundo, o seguir con el enfoque más simple " pause si tenemos un elemento en la lista de reintentos ", etc.

EDITAR 2 después de los comentarios

Como no quieres ninguna lógica de reintento, cambiaría tu código de esta manera:

private List<ResultSetFuture> runningInsertList; private static int concurrentInsertLimit = 1000; private static int concurrentInsertSleepTime = 500; ... @Override public void executeBatch(Statement statement) throws InterruptedException { if (this.runningInsertList == null) { this.runningInsertList = new ArrayList<>(); } ResultSetFuture future = this.executeAsync(statement); this.runningInsertList.add(future); Futures.addCallback(future, new FutureCallback<ResultSet>() { @Override public void onSuccess(ResultSet result) { runningInsertList.remove(future); } @Override public void onFailure(Throwable t) { runningInsertList.remove(future); concurrentInsertErrorOccured = true; } }, MoreExecutors.sameThreadExecutor()); //Sleep while the currently processing number of inserts is too high while (runningInsertList.size() >= concurrentInsertLimit) { Thread.sleep(concurrentInsertSleepTime); } if (!concurrentInsertErrorOccured) { // Increase your ingestion rate if no query failed so far concurrentInsertLimit += 10; } else { // Decrease your ingestion rate because at least one query failed concurrentInsertErrorOccured = false; concurrentInsertLimit = Max(1, concurrentInsertLimit - 50); while (runningInsertList.size() >= concurrentInsertLimit) { Thread.sleep(concurrentInsertSleepTime); } } return; }

También puede optimizar un poco el procedimiento al reemplazar su List<ResultSetFuture> con un contador.

Espero que ayude.