preparedstatement java jdbc lambda java-stream jooq

preparedstatement - resultset java 8



java.util.stream con ResultSet (4)

Tengo pocas tablas con gran cantidad de datos (alrededor de 100 millones de registros). Por lo tanto, no puedo almacenar estos datos en la memoria, pero me gustaría transmitir este conjunto de resultados usando la clase java.util.stream y pasar esta transmisión a otra clase. Leí sobre los operadores Stream.of y Stream.Builder pero son flujos almacenados en la memoria intermedia. Entonces, ¿hay alguna manera de resolver esta pregunta? Gracias por adelantado.

ACTUALIZACIÓN # 1

Bien, busqué en Google y encontré la biblioteca jooq . No estoy seguro, pero parece que podría ser aplicable a mi caso de prueba. Para resumir, tengo pocas tablas con gran cantidad de datos. Me gustaría transmitir mi conjunto de resultados y transferir esta transmisión a otro método. Algo como esto:

// why return Stream<String>? Because my result set has String type private Stream<Record> writeTableToStream(DataSource dataSource, String table) { Stream<Record> record = null; try (Connection connection = dataSource.getConnection()) { String sql = "select * from " + table; try (PreparedStatement pSt = connection.prepareStatement(sql)) { connection.setAutoCommit(false); pSt.setFetchSize(5000); ResultSet resultSet = pSt.executeQuery(); // record = DSL.using(connection) .fetch(resultSet).stream(); } } catch (SQLException sqlEx) { logger.error(sqlEx); } return record; }

¿Podría alguien aconsejarme, estoy en el camino correcto? Gracias.

ACTUALIZACIÓN # 2

Hice un experimento con jooq y podría decir ahora que la decisión anterior no es adecuada para mí. Este record = DSL.using(connection).fetch(resultSet).stream(); código record = DSL.using(connection).fetch(resultSet).stream(); toma mucho tiempo


jOOQ

Voy a responder la parte de jOOQ de tu pregunta. A partir de jOOQ 3.8, ahora ha habido bastantes características adicionales relacionadas con la combinación de jOOQ con Stream. Otros usos también están documentados en esta página de jOOQ .

Su uso sugerido:

Intentaste esto:

Stream<Record> stream = DSL.using(connection).fetch(resultSet).stream();

De hecho, esto no funciona bien para grandes conjuntos de resultados porque fetch(ResultSet) todo el conjunto de resultados en la memoria y luego llama a Collection.stream() en él.

Mejor uso (perezoso):

En cambio, podrías escribir esto:

try (Stream<Record> stream = DSL.using(connection).fetchStream(resultSet)) { ... }

... lo cual es esencialmente conveniente para esto:

try (Cursor<Record> cursor = DSL.using(connection).fetchLazy(resultSet)) { Stream<Record> stream = cursor.stream(); ... }

Consulte también DSLContext.fetchStream(ResultSet)

Por supuesto, también puedes dejar que jOOQ ejecute tu cadena SQL, en lugar de luchar con JDBC:

try (Stream<Record> stream = DSL.using(dataSource) .resultQuery("select * from {0}", DSL.name(table)) // Prevent SQL injection .fetchSize(5000) .fetchStream()) { ... }

Sobre el uso de prueba con recursos

Tenga en cuenta que una Stream producida por jOOQ es "ingeniosa", es decir, contiene una referencia a un ResultSet (y PreparedStatement ) abierto. Por lo tanto, si realmente desea devolver esa transmisión fuera de su método, asegúrese de que esté cerrada correctamente.


Aquí está la muestra más simple de AbacusUtil .

final DataSource ds = JdbcUtil.createDataSource(url, user, password); final SQLExecutor sqlExecutor = new SQLExecutor(ds); sqlExecutor.stream(sql, parameters);

Divulgación: Soy el desarrollador de AbacusUtil.


Lo primero que debes entender es que ese código

try (Connection connection = dataSource.getConnection()) { … try (PreparedStatement pSt = connection.prepareStatement(sql)) { … return stream; } }

no funciona ya que cuando abandonas los bloques de try , los recursos están cerrados mientras el procesamiento de Stream ni siquiera ha comenzado.

La construcción de gestión de recursos "probar con recursos" funciona para los recursos utilizados dentro de un ámbito de bloque dentro de un método, pero está creando un método de fábrica que devuelve un recurso. Por lo tanto, debe asegurarse de que el cierre de la secuencia devuelta cerrará los recursos y la persona que llama es responsable de cerrar la Stream .

Además, necesita una función que produzca un elemento a partir de una sola línea desde ResultSet . Supongamos que tienes un método como

Record createRecord(ResultSet rs) { … }

puedes crear un Stream<Record> básicamente como

Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>( Long.MAX_VALUE,Spliterator.ORDERED) { @Override public boolean tryAdvance(Consumer<? super Record> action) { if(!resultSet.next()) return false; action.accept(createRecord(resultSet)); return true; } }, false);

Pero para hacerlo correctamente, debe incorporar el manejo de excepciones y el cierre de recursos. Puede usar Stream.onClose para registrar una acción que se realizará cuando se cierre Stream , pero tiene que ser un Runnable que no pueda lanzar excepciones marcadas. Del mismo modo, el método tryAdvance no puede lanzar excepciones marcadas. Y dado que no podemos simplemente anidar los bloques try(…) aquí, la lógica del programa de excepciones de supresión lanzada de close , cuando ya hay una excepción pendiente, no es gratis.

Para ayudarnos aquí, presentamos un nuevo tipo que puede ajustar las operaciones de cierre que pueden arrojar excepciones marcadas y entregarlas envueltas en una excepción no verificada. Al implementar AutoCloseable , puede utilizar la construcción try(…) para encadenar operaciones de cierre de forma segura:

interface UncheckedCloseable extends Runnable, AutoCloseable { default void run() { try { close(); } catch(Exception ex) { throw new RuntimeException(ex); } } static UncheckedCloseable wrap(AutoCloseable c) { return c::close; } default UncheckedCloseable nest(AutoCloseable c) { return ()->{ try(UncheckedCloseable c1=this) { c.close(); } }; } }

Con esto, toda la operación se convierte en:

private Stream<Record> tableAsStream(DataSource dataSource, String table) throws SQLException { UncheckedCloseable close=null; try { Connection connection = dataSource.getConnection(); close=UncheckedCloseable.wrap(connection); String sql = "select * from " + table; PreparedStatement pSt = connection.prepareStatement(sql); close=close.nest(pSt); connection.setAutoCommit(false); pSt.setFetchSize(5000); ResultSet resultSet = pSt.executeQuery(); close=close.nest(resultSet); return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>( Long.MAX_VALUE,Spliterator.ORDERED) { @Override public boolean tryAdvance(Consumer<? super Record> action) { try { if(!resultSet.next()) return false; action.accept(createRecord(resultSet)); return true; } catch(SQLException ex) { throw new RuntimeException(ex); } } }, false).onClose(close); } catch(SQLException sqlEx) { if(close!=null) try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); } throw sqlEx; } }

Este método envuelve la operación de cierre necesaria para todos los recursos, Connection , Statement y ResultSet dentro de una instancia de la clase de utilidad descrita anteriormente. Si ocurre una excepción durante la inicialización, la operación de cierre se realiza de inmediato y la excepción se entrega a la persona que llama. Si la construcción de la transmisión tiene éxito, la operación de cierre se registra a través de onClose .

Por lo tanto, la persona que llama debe garantizar un cierre adecuado como

try(Stream<Record> s=tableAsStream(dataSource, table)) { // stream operation }

Tenga en cuenta que también se ha agregado la entrega de una SQLException través de RuntimeException al método tryAdvance . Por lo tanto, ahora puede agregar throws SQLException al método createRecord sin problemas.


No conozco ninguna biblioteca conocida que lo haga por usted.

Dicho esto, este artículo muestra cómo envolver el conjunto de resultados con un Iterator (ResultSetIterator) y pasarlo como el primer parámetro a Spliterators.spliteratorUnknownSize() para crear un Spliterator .

StreamSupport puede utilizar el StreamSupport para crear un Stream encima de él.

Su implementación sugerida de la clase ResultSetIterator :

public class ResultSetIterator implements Iterator { private ResultSet rs; private PreparedStatement ps; private Connection connection; private String sql; public ResultSetIterator(Connection connection, String sql) { assert connection != null; assert sql != null; this.connection = connection; this.sql = sql; } public void init() { try { ps = connection.prepareStatement(sql); rs = ps.executeQuery(); } catch (SQLException e) { close(); throw new DataAccessException(e); } } @Override public boolean hasNext() { if (ps == null) { init(); } try { boolean hasMore = rs.next(); if (!hasMore) { close(); } return hasMore; } catch (SQLException e) { close(); throw new DataAccessException(e); } } private void close() { try { rs.close(); try { ps.close(); } catch (SQLException e) { //nothing we can do here } } catch (SQLException e) { //nothing we can do here } } @Override public Tuple next() { try { return SQL.rowAsTuple(sql, rs); } catch (DataAccessException e) { close(); throw e; } } }

y entonces:

public static Stream stream(final Connection connection, final String sql, final Object... parms) { return StreamSupport .stream(Spliterators.spliteratorUnknownSize( new ResultSetIterator(connection, sql), 0), false); }