preparedstatement - resultset java 8
java.util.stream con ResultSet (4)
Tengo pocas tablas con gran cantidad de datos (alrededor de 100 millones de registros).
Por lo tanto, no puedo almacenar estos datos en la memoria, pero me gustaría transmitir este
conjunto de resultados
usando la clase
java.util.stream
y pasar esta transmisión a otra clase.
Leí sobre los operadores
Stream.of
y
Stream.Builder
pero son flujos almacenados en la memoria intermedia.
Entonces, ¿hay alguna manera de resolver esta pregunta?
Gracias por adelantado.
ACTUALIZACIÓN # 1
Bien, busqué en Google y encontré la biblioteca jooq . No estoy seguro, pero parece que podría ser aplicable a mi caso de prueba. Para resumir, tengo pocas tablas con gran cantidad de datos. Me gustaría transmitir mi conjunto de resultados y transferir esta transmisión a otro método. Algo como esto:
// why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {
Stream<Record> record = null;
try (Connection connection = dataSource.getConnection()) {
String sql = "select * from " + table;
try (PreparedStatement pSt = connection.prepareStatement(sql)) {
connection.setAutoCommit(false);
pSt.setFetchSize(5000);
ResultSet resultSet = pSt.executeQuery();
//
record = DSL.using(connection)
.fetch(resultSet).stream();
}
} catch (SQLException sqlEx) {
logger.error(sqlEx);
}
return record;
}
¿Podría alguien aconsejarme, estoy en el camino correcto? Gracias.
ACTUALIZACIÓN # 2
Hice un experimento con
jooq
y podría decir ahora que la decisión anterior no es adecuada para mí.
Este
record = DSL.using(connection).fetch(resultSet).stream();
código
record = DSL.using(connection).fetch(resultSet).stream();
toma mucho tiempo
jOOQ
Voy a responder la parte de jOOQ de tu pregunta. A partir de jOOQ 3.8, ahora ha habido bastantes características adicionales relacionadas con la combinación de jOOQ con Stream. Otros usos también están documentados en esta página de jOOQ .
Su uso sugerido:
Intentaste esto:
Stream<Record> stream = DSL.using(connection).fetch(resultSet).stream();
De hecho, esto no funciona bien para grandes conjuntos de resultados porque
fetch(ResultSet)
todo el conjunto de resultados en la memoria y luego llama a
Collection.stream()
en él.
Mejor uso (perezoso):
En cambio, podrías escribir esto:
try (Stream<Record> stream = DSL.using(connection).fetchStream(resultSet)) {
...
}
... lo cual es esencialmente conveniente para esto:
try (Cursor<Record> cursor = DSL.using(connection).fetchLazy(resultSet)) {
Stream<Record> stream = cursor.stream();
...
}
Consulte también
DSLContext.fetchStream(ResultSet)
Por supuesto, también puedes dejar que jOOQ ejecute tu cadena SQL, en lugar de luchar con JDBC:
try (Stream<Record> stream =
DSL.using(dataSource)
.resultQuery("select * from {0}", DSL.name(table)) // Prevent SQL injection
.fetchSize(5000)
.fetchStream()) {
...
}
Sobre el uso de prueba con recursos
Tenga en cuenta que una
Stream
producida por jOOQ es "ingeniosa", es decir, contiene una referencia a un
ResultSet
(y
PreparedStatement
) abierto.
Por lo tanto, si realmente desea devolver esa transmisión fuera de su método, asegúrese de que esté cerrada correctamente.
Aquí está la muestra más simple de AbacusUtil .
final DataSource ds = JdbcUtil.createDataSource(url, user, password);
final SQLExecutor sqlExecutor = new SQLExecutor(ds);
sqlExecutor.stream(sql, parameters);
Divulgación: Soy el desarrollador de AbacusUtil.
Lo primero que debes entender es que ese código
try (Connection connection = dataSource.getConnection()) {
…
try (PreparedStatement pSt = connection.prepareStatement(sql)) {
…
return stream;
}
}
no funciona ya que cuando abandonas los bloques de
try
, los recursos están cerrados mientras el procesamiento de
Stream
ni siquiera ha comenzado.
La construcción de gestión de recursos "probar con recursos" funciona para los recursos utilizados dentro de un ámbito de bloque dentro de un método, pero está creando un método de fábrica que devuelve un recurso.
Por lo tanto, debe asegurarse de que el cierre de la secuencia devuelta cerrará los recursos y la persona que llama es responsable de cerrar la
Stream
.
Además, necesita una función que produzca un elemento a partir de una sola línea desde
ResultSet
.
Supongamos que tienes un método como
Record createRecord(ResultSet rs) {
…
}
puedes crear un
Stream<Record>
básicamente como
Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
Long.MAX_VALUE,Spliterator.ORDERED) {
@Override
public boolean tryAdvance(Consumer<? super Record> action) {
if(!resultSet.next()) return false;
action.accept(createRecord(resultSet));
return true;
}
}, false);
Pero para hacerlo correctamente, debe incorporar el manejo de excepciones y el cierre de recursos.
Puede usar
Stream.onClose
para registrar una acción que se realizará cuando se cierre
Stream
, pero tiene que ser un
Runnable
que no pueda lanzar excepciones marcadas.
Del mismo modo, el método
tryAdvance
no puede lanzar excepciones marcadas.
Y dado que no podemos simplemente anidar los bloques
try(…)
aquí, la lógica del programa de excepciones de supresión lanzada de
close
, cuando ya hay una excepción pendiente, no es gratis.
Para ayudarnos aquí, presentamos un nuevo tipo que puede ajustar las operaciones de cierre que pueden arrojar excepciones marcadas y entregarlas envueltas en una excepción no verificada.
Al implementar
AutoCloseable
, puede utilizar la construcción
try(…)
para encadenar operaciones de cierre de forma segura:
interface UncheckedCloseable extends Runnable, AutoCloseable {
default void run() {
try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
}
static UncheckedCloseable wrap(AutoCloseable c) {
return c::close;
}
default UncheckedCloseable nest(AutoCloseable c) {
return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
}
}
Con esto, toda la operación se convierte en:
private Stream<Record> tableAsStream(DataSource dataSource, String table)
throws SQLException {
UncheckedCloseable close=null;
try {
Connection connection = dataSource.getConnection();
close=UncheckedCloseable.wrap(connection);
String sql = "select * from " + table;
PreparedStatement pSt = connection.prepareStatement(sql);
close=close.nest(pSt);
connection.setAutoCommit(false);
pSt.setFetchSize(5000);
ResultSet resultSet = pSt.executeQuery();
close=close.nest(resultSet);
return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
Long.MAX_VALUE,Spliterator.ORDERED) {
@Override
public boolean tryAdvance(Consumer<? super Record> action) {
try {
if(!resultSet.next()) return false;
action.accept(createRecord(resultSet));
return true;
} catch(SQLException ex) {
throw new RuntimeException(ex);
}
}
}, false).onClose(close);
} catch(SQLException sqlEx) {
if(close!=null)
try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
throw sqlEx;
}
}
Este método envuelve la operación de cierre necesaria para todos los recursos,
Connection
,
Statement
y
ResultSet
dentro de una instancia de la clase de utilidad descrita anteriormente.
Si ocurre una excepción durante la inicialización, la operación de cierre se realiza de inmediato y la excepción se entrega a la persona que llama.
Si la construcción de la transmisión tiene éxito, la operación de cierre se registra a través de
onClose
.
Por lo tanto, la persona que llama debe garantizar un cierre adecuado como
try(Stream<Record> s=tableAsStream(dataSource, table)) {
// stream operation
}
Tenga en cuenta que también se ha agregado la entrega de una
SQLException
través de
RuntimeException
al método
tryAdvance
.
Por lo tanto, ahora puede agregar
throws SQLException
al método
createRecord
sin problemas.
No conozco ninguna biblioteca conocida que lo haga por usted.
Dicho esto,
este artículo
muestra cómo envolver el conjunto de resultados con un Iterator (ResultSetIterator) y pasarlo como el primer parámetro a
Spliterators.spliteratorUnknownSize()
para crear un
Spliterator
.
StreamSupport puede utilizar el
StreamSupport
para crear un Stream encima de él.
Su implementación sugerida de la clase
ResultSetIterator
:
public class ResultSetIterator implements Iterator {
private ResultSet rs;
private PreparedStatement ps;
private Connection connection;
private String sql;
public ResultSetIterator(Connection connection, String sql) {
assert connection != null;
assert sql != null;
this.connection = connection;
this.sql = sql;
}
public void init() {
try {
ps = connection.prepareStatement(sql);
rs = ps.executeQuery();
} catch (SQLException e) {
close();
throw new DataAccessException(e);
}
}
@Override
public boolean hasNext() {
if (ps == null) {
init();
}
try {
boolean hasMore = rs.next();
if (!hasMore) {
close();
}
return hasMore;
} catch (SQLException e) {
close();
throw new DataAccessException(e);
}
}
private void close() {
try {
rs.close();
try {
ps.close();
} catch (SQLException e) {
//nothing we can do here
}
} catch (SQLException e) {
//nothing we can do here
}
}
@Override
public Tuple next() {
try {
return SQL.rowAsTuple(sql, rs);
} catch (DataAccessException e) {
close();
throw e;
}
}
}
y entonces:
public static Stream stream(final Connection connection,
final String sql,
final Object... parms) {
return StreamSupport
.stream(Spliterators.spliteratorUnknownSize(
new ResultSetIterator(connection, sql), 0), false);
}