Cómo utilizar la función de escritura asincrónica/por lotes con el controlador Datastax Java
cassandra datastax-java-driver (2)
Para el ejemplo proporcionado en la respuesta de Lyuben, establecer ciertos atributos de un lote como Type.COUNTER
(si necesita actualizar contadores) usando cadenas no funcionará. En su lugar, puede organizar sus declaraciones preparadas en lotes como las siguientes:
final String insertQuery = "INSERT INTO test.prepared (id, col_1) VALUES (?,?);";
final PreparedStatement prepared = session.prepare(insertQuery);
final BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
batch.add(prepared.bind(userId1, "something"));
batch.add(prepared.bind(userId2, "another"));
batch.add(prepared.bind(userId3, "thing"));
session.executeAsync(batch);
Estoy planeando utilizar el controlador Datastax Java para escribir en Cassandra. Me interesaron principalmente las funciones de Batch Writes
y Asycnhronous
del controlador Datastax java, pero no puedo obtener ningún tutorial que me explique cómo incorporar estas funciones en mi código a continuación. que utiliza el controlador Datastax Java ..
/**
* Performs an upsert of the specified attributes for the specified id.
*/
public void upsertAttributes(final String userId, final Map<String, String> attributes, final String columnFamily) {
try {
// make a sql here using the above input parameters.
String sql = sqlPart1.toString()+sqlPart2.toString();
DatastaxConnection.getInstance();
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(sql);
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);
BoundStatement query = prepStatement.bind(userId, attributes.values().toArray(new Object[attributes.size()]));
DatastaxConnection.getSession().execute(query);
} catch (InvalidQueryException e) {
LOG.error("Invalid Query Exception in DatastaxClient::upsertAttributes "+e);
} catch (Exception e) {
LOG.error("Exception in DatastaxClient::upsertAttributes "+e);
}
}
En el siguiente código, estoy creando una conexión a los nodos Cassandra utilizando el controlador Datastax Java.
/**
* Creating Cassandra connection using Datastax Java driver
*
*/
private DatastaxConnection() {
try{
builder = Cluster.builder();
builder.addContactPoint("some_nodes");
builder.poolingOptions().setCoreConnectionsPerHost(
HostDistance.LOCAL,
builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL));
cluster = builder
.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
.withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
.build();
StringBuilder s = new StringBuilder();
Set<Host> allHosts = cluster.getMetadata().getAllHosts();
for (Host h : allHosts) {
s.append("[");
s.append(h.getDatacenter());
s.append(h.getRack());
s.append(h.getAddress());
s.append("]");
}
System.out.println("Cassandra Cluster: " + s.toString());
session = cluster.connect("testdatastaxks");
} catch (NoHostAvailableException e) {
e.printStackTrace();
throw new RuntimeException(e);
} catch (Exception e) {
}
}
¿Alguien puede ayudarme sobre cómo agregar escrituras en lote o características asíncronas a mi código anterior? Gracias por la ayuda.
Estoy ejecutando Cassandra 1.2.9
Para asynch es tan simple como usar la función executeAsync
:
...
DatastaxConnection.getSession().executeAsync(query);
Para el lote, debe compilar la consulta (yo uso cadenas porque el compilador sabe cómo optimizar la concatenación de cadenas realmente bien):
String cql = "BEGIN BATCH "
cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
cql += "APPLY BATCH; "
DatastaxConnection.getInstance();
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(cql);
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);
// this is where you need to be careful
// bind expects a comma separated list of values for all the params (?) above
// so for the above batch we need to supply 4 params:
BoundStatement query = prepStatement.bind(userId, "col1_val", userId_2, "col1_val_2");
DatastaxConnection.getSession().execute(query);
En una nota lateral, creo que su enlace de la declaración puede parecerse a esto, suponiendo que cambie los atributos a una lista de mapas donde cada mapa representa una actualización / inserción dentro del lote:
BoundStatement query = prepStatement.bind(userId,
attributesList.get(0).values().toArray(new Object[attributes.size()]),
userId_2,
attributesList.get(1).values().toArray(new Object[attributes.size()]));