software - spark scala tutorial
Cómo imprimir los contenidos de RDD? (8)
En lugar de escribir cada vez, puede;
[1] Cree un método de impresión genérico dentro de Spark Shell.
def p(rdd: org.apache.spark.rdd.RDD[_]) = rdd.foreach(println)
[2] O mejor aún, usando implícitos, puedes agregar la función a la clase RDD para imprimir su contenido.
implicit class Printer(rdd: org.apache.spark.rdd.RDD[_]) {
def print = rdd.foreach(println)
}
Ejemplo de uso:
val rdd = sc.parallelize(List(1,2,3,4)).map(_*2)
p(rdd) // 1
rdd.print // 2
Salida:
2
6
4
8
PD. Esto solo tiene sentido si está trabajando en modo local y con una pequeña cantidad de conjunto de datos. De lo contrario, no podrá ver los resultados en el cliente o se quedará sin memoria debido al gran resultado del conjunto de datos.
Estoy intentando imprimir el contenido de una colección en la consola Spark.
Tengo un tipo:
linesWithSessionId: org.apache.spark.rdd.RDD[String] = FilteredRDD[3]
Y uso el comando:
scala> linesWithSessionId.map(line => println(line))
Pero esto está impreso:
res1: org.apache.spark.rdd.RDD [Unit] = MappedRDD [4] en el mapa en: 19
¿Cómo puedo escribir el RDD en la consola o guardarlo en el disco para que pueda ver su contenido?
En python
linesWithSessionIdCollect = linesWithSessionId.collect()
linesWithSessionIdCollect
Esto imprimirá todos los contenidos del RDD
La función de map
es una transformación , lo que significa que Spark no evaluará realmente su RDD hasta que ejecute una acción en él.
Para imprimirlo, puede usar foreach
(que es una acción):
linesWithSessionId.foreach(println)
Para escribirlo en el disco, puede usar una de las funciones saveAs...
(acciones fijas) de la API de RDD
Probablemente haya muchas diferencias arquitectónicas entre myRDD.foreach(println)
y myRDD.collect().foreach(println)
(no solo ''collect'', sino también otras acciones). Una de las diferencias que vi es que al hacer myRDD.foreach(println)
, la salida estará en orden aleatorio. Por ejemplo: si mi rdd proviene de un archivo de texto donde cada línea tiene un número, la salida tendrá un orden diferente. Pero cuando hice myRDD.collect().foreach(println)
, el orden sigue siendo igual que el archivo de texto.
Puede convertir su RDD
en un DataFrame
luego show()
.
// For implicit conversion from RDD to DataFrame
import spark.implicits._
fruits = sc.parallelize([("apple", 1), ("banana", 2), ("orange", 17)])
// convert to DF then show it
fruits.toDF().show()
Esto mostrará las 20 líneas principales de sus datos, por lo que el tamaño de sus datos no debería ser un problema.
+------+---+
| _1| _2|
+------+---+
| apple| 1|
|banana| 2|
|orange| 17|
+------+---+
Si desea ver el contenido de un RDD, una forma es usar collect()
:
myRDD.collect().foreach(println)
Sin embargo, esa no es una buena idea cuando el RDD tiene miles de millones de líneas. Use take()
para tomar solo unos pocos para imprimir:
myRDD.take(n).foreach(println)
Si está ejecutando esto en un clúster, println
no volverá a imprimir en su contexto. RDD
traer los datos del RDD
a su sesión. Para hacer esto, puede forzarlo a la matriz local y luego imprimirlo:
linesWithSessionId.toArray().foreach(line => println(line))
También puede guardar como un archivo: rdd.saveAsTextFile("alicia.txt")