spark significado parallelize lazy scala apache-spark rdd

scala - significado - Spark: Cómo usar mapPartition y crear/cerrar conexión por partición



rdd spark (2)

Por lo tanto, quiero hacer ciertas operaciones en mi spark DataFrame, escribirlas en DB y crear otro DataFrame al final. Se parece a esto :

import sqlContext.implicits._ val newDF = myDF.mapPartitions( iterator => { val conn = new DbConnection iterator.map( row => { addRowToBatch(row) convertRowToObject(row) }) conn.writeTheBatchToDB() conn.close() }) .toDF()

Esto me da un error ya que mapPartitions espera el tipo de devolución de Iterator[NotInferedR] , pero aquí está Unit . Sé que esto es posible con forEachPartition, pero también me gustaría hacer el mapeo. Hacerlo por separado sería una sobrecarga (trabajo adicional de chispa). ¿Qué hacer?

¡Gracias!


La última expresión en la implementación de la función anónima debe ser el valor de retorno:

import sqlContext.implicits._ val newDF = myDF.mapPartitions( iterator => { val conn = new DbConnection // using toList to force eager computation - make it happen now when connection is open val result = iterator.map(/* the same... */).toList conn.writeTheBatchToDB() conn.close() result.iterator } ).toDF()


En la mayoría de los casos, consumir ansioso el iterador dará como resultado un error de ejecución si no se ralentiza el trabajo. Por lo tanto, lo que hice fue verificar si el iterador ya está vacío y luego hacer las rutinas de limpieza.

rdd.mapPartitions(itr => { val conn = new DbConnection itr.map(data => { val yourActualResult = // do something with your data and conn here if(itr.isEmpty) conn.close // close the connection yourActualResult }) })

Pensé que esto era un problema de chispa al principio, pero en realidad era un scala uno. http://www.scala-lang.org/api/2.12.0/scala/collection/Iterator.html#isEmpty:Boolean