scala stream resultset

Tratar un conjunto de resultados SQL como un flujo de Scala



stream resultset (6)

Cuando pregunto en una base de datos y recibo un conjunto de resultados (solo lectura, solo lectura).

Estoy tratando de encontrar una manera de tratar este ResultSet como un Scala Stream . Esto permitirá operaciones tales como filter , map , etc., mientras no consuma grandes cantidades de RAM.

Implementé un método recursivo de cola para extraer los elementos individuales, pero esto requiere que todos los elementos estén en la memoria al mismo tiempo, un problema si el ResultSet es muy grande:

// Iterate through the result set and gather all of the String values into a list // then return that list @tailrec def loop(resultSet: ResultSet, accumulator: List[String] = List()): List[String] = { if (!resultSet.next) accumulator.reverse else { val value = resultSet.getString(1) loop(resultSet, value +: accumulator) } }


Debido a que ResultSet es solo un objeto mutable que se navega por el siguiente, debemos definir nuestro propio concepto de una fila siguiente. Podemos hacerlo con una función de entrada de la siguiente manera:

class ResultSetIterator[T](rs: ResultSet, nextRowFunc: ResultSet => T) extends Iterator[T] { private var nextVal: Option[T] = None override def hasNext: Boolean = { val ret = rs.next() if(ret) { nextVal = Some(nextRowFunc(rs)) } else { nextVal = None } ret } override def next(): T = nextVal.getOrElse { hasNext nextVal.getOrElse( throw new ResultSetIteratorOutOfBoundsException )} class ResultSetIteratorOutOfBoundsException extends Exception("ResultSetIterator reached end of list and next can no longer be called. hasNext should return false.") }

EDITAR: Traducir a la transmisión o algo más según lo anterior.


Esta implementación, aunque más larga y torpe, está en mejor correspondencia con el contrato ResultSet. El efecto secundario se ha eliminado de hasNext (...) y se ha movido a next ().

new Iterator[String] { private var available = resultSet.next() override def hasNext: Boolean = available override def next(): String = { val string = resultSet.getString(1) available = resultSet.next() string } }


Esto suena como una gran oportunidad para una clase implícita. Primero define la clase implícita en algún lugar:

import java.sql.ResultSet object Implicits { implicit class ResultSetStream(resultSet: ResultSet) { def toStream: Stream[ResultSet] = { new Iterator[ResultSet] { def hasNext = resultSet.next() def next() = resultSet }.toStream } } }

A continuación, simplemente importe esta clase implícita donde haya ejecutado su consulta y definido el objeto ResultSet:

import com.company.Implicits._

Finalmente saque los datos usando el método toStream. Por ejemplo, obtenga todos los identificadores como se muestra a continuación:

val allIds = resultSet.toStream.map(result => result.getInt("id"))


Función de utilidad para la respuesta de @ elbowich:

def results[T](resultSet: ResultSet)(f: ResultSet => T) = { new Iterator[T] { def hasNext = resultSet.next() def next() = f(resultSet) } }

Le permite utilizar la inferencia de tipos. P.ej:

stmt.execute("SELECT mystr, myint FROM mytable") // Example 1: val it = results(stmt.resultSet) { case rs => rs.getString(1) -> 100 * rs.getInt(2) } val m = it.toMap // Map[String, Int] // Example 2: val it = results(stmt.resultSet)(_.getString(1))


Necesitaba algo similar. Aprovechando la muy buena respuesta de elbowich, la envolví un poco y, en lugar de la cadena, devuelvo el resultado (para que pueda obtener cualquier columna)

def resultSetItr(resultSet: ResultSet): Stream[ResultSet] = { new Iterator[ResultSet] { def hasNext = resultSet.next() def next() = resultSet }.toStream }

Necesitaba acceder a los metadatos de la tabla, pero esto funcionará para las filas de la tabla (podría hacer un stmt.executeQuery (sql) en lugar de md.getColumns):

val md = connection.getMetaData() val columnItr = resultSetItr( md.getColumns(null, null, "MyTable", null)) val columns = columnItr.map(col => { val columnType = col.getString("TYPE_NAME") val columnName = col.getString("COLUMN_NAME") val columnSize = col.getString("COLUMN_SIZE") new Column(columnName, columnType, columnSize.toInt, false) })


No lo probé, pero ¿por qué no funcionaría?

new Iterator[String] { def hasNext = resultSet.next() def next() = resultSet.getString(1) }.toStream