tutorial textfile spark iniciar español ejemplo scala apache-spark

textfile - ¿Cómo itero RDD''s en apache spark(scala)?



spark rdd take (5)

Intentaría hacer uso de una función de asignación de partición. El siguiente código muestra cómo se puede procesar un conjunto de datos RDD completo en un bucle para que cada entrada pase por la misma función. Me temo que no tengo conocimiento de Scala, por lo que todo lo que tengo para ofrecer es el código Java . Sin embargo, no debería ser muy difícil traducirlo a scala.

JavaRDD<String> res = file.mapPartitions(new FlatMapFunction <Iterator<String> ,String>(){ @Override public Iterable<String> call(Iterator <String> t) throws Exception { ArrayList<String[]> tmpRes = new ArrayList <>(); String[] fillData = new String[2]; fillData[0] = "filename"; fillData[1] = "content"; while(t.hasNext()){ tmpRes.add(fillData); } return Arrays.asList(tmpRes); } }).cache();

Utilizo el siguiente comando para llenar un RDD con un conjunto de arreglos que contienen 2 cadenas ["nombre de archivo", "contenido"].

Ahora quiero repetir cada una de esas ocurrencias para hacer algo con cada nombre de archivo y contenido.

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")

Sin embargo, no puedo encontrar ninguna documentación sobre cómo hacer esto.

Así que lo que quiero es esto:

foreach occurrence-in-the-rdd{ //do stuff with the array found on loccation n of the RDD }


Las operaciones fundamentales en Spark son map y filter .

val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }

El txtRDD ahora solo contendrá archivos que tengan la extensión ".txt"

Y si quieres contar palabras esos archivos puedes decir

//split the documents into words in one long list val words = txtRDD flatMap { case (id,text) => text.split("//s+") } // give each word a count of 1 val wordT = words map (x => (x,1)) //sum up the counts for each word val wordCount = wordsT reduceByKey((a, b) => a + b)

Desea usar mapPartitions cuando tiene una inicialización costosa que necesita realizar, por ejemplo, si desea hacer un Reconocimiento de Entidades Nombradas con una biblioteca como las herramientas de Stanford coreNLP.

flatMap map maestro, filter , flatMap map flatMap y reduce , y flatMap en el buen camino para dominar Spark.


Llama a varios métodos en el RDD que aceptan funciones como parámetros.

// set up an example -- an RDD of arrays val sparkConf = new SparkConf().setMaster("local").setAppName("Example") val sc = new SparkContext(sparkConf) val testData = Array(Array(1,2,3), Array(4,5,6,7,8)) val testRDD = sc.parallelize(testData, 2) // Print the RDD of arrays. testRDD.collect().foreach(a => println(a.size)) // Use map() to create an RDD with the array sizes. val countRDD = testRDD.map(a => a.size) // Print the elements of this new RDD. countRDD.collect().foreach(a => println(a)) // Use filter() to create an RDD with just the longer arrays. val bigRDD = testRDD.filter(a => a.size > 3) // Print each remaining array. bigRDD.collect().foreach(a => { a.foreach(e => print(e + " ")) println() }) }

Observe que las funciones que escribe aceptan un solo elemento RDD como entrada y devuelven datos de algún tipo uniforme, por lo que crea un RDD del último tipo. Por ejemplo, countRDD es un RDD[Int] , mientras que bigRDD sigue siendo un RDD[Array[Int]] .

Probablemente será tentador en algún momento escribir un foreach que modifique algunos otros datos, pero debe resistirse por las razones descritas en esta pregunta y respuesta .

Edición: no intente imprimir grandes RDD

Varios lectores han preguntado sobre el uso de collect() y println() para ver sus resultados, como en el ejemplo anterior. Por supuesto, esto solo funciona si está ejecutando en un modo interactivo como el Spark REPL (read-eval-print-print). Es mejor llamar a collect() en el RDD para obtener una matriz secuencial para una impresión ordenada. Pero collect() puede devolver demasiados datos y, en cualquier caso, puede que se impriman demasiados. Aquí hay algunas formas alternativas de obtener información sobre sus RDD si son grandes:

  1. RDD.take() : Esto le da un control preciso sobre la cantidad de elementos que obtiene, pero no de dónde provienen, definidos como los "primeros", que es un concepto que se trata en varias otras preguntas y respuestas aquí.

    // take() returns an Array so no need to collect() myHugeRDD.take(20).foreach(a => println(a))

  2. RDD.sample() : Esto le permite (aproximadamente) controlar la fracción de los resultados que obtiene, si el muestreo usa reemplazo, e incluso opcionalmente la semilla de números aleatorios.

    // sample() does return an RDD so you may still want to collect() myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))

  3. RDD.takeSample() : este es un híbrido: utiliza un muestreo aleatorio que puede controlar, pero ambos le permiten especificar el número exacto de resultados y devolver una Array .

    // takeSample() returns an Array so no need to collect() myHugeRDD.takeSample(true, 20).foreach(a => println(a))

  4. RDD.count() : a veces la mejor idea proviene de la cantidad de elementos con los que terminó: a menudo hago esto primero.

    println(myHugeRDD.count())


lo que wholeTextFiles el wholeTextFiles es un RDD de par:

def wholeTextFiles (ruta: String, minPartitions: Int): RDD [(String, String)]

Lea un directorio de archivos de texto de HDFS, un sistema de archivos local (disponible en todos los nodos) o cualquier URI de sistema de archivos compatible con Hadoop. Cada archivo se lee como un único registro y se devuelve en un par clave-valor, donde la clave es la ruta de acceso de cada archivo, el valor es el contenido de cada archivo.

Aquí hay un ejemplo de cómo leer los archivos en una ruta local y luego imprimir cada nombre de archivo y contenido.

val conf = new SparkConf().setAppName("scala-test").setMaster("local") val sc = new SparkContext(conf) sc.wholeTextFiles("file:///Users/leon/Documents/test/") .collect .foreach(t => println(t._1 + ":" + t._2));

el resultado:

file:/Users/leon/Documents/test/1.txt:{"name":"tom","age":12} file:/Users/leon/Documents/test/2.txt:{"name":"john","age":22} file:/Users/leon/Documents/test/3.txt:{"name":"leon","age":18}

o convirtiendo el par RDD a un RDD primero

sc.wholeTextFiles("file:///Users/leon/Documents/test/") .map(t => t._2) .collect .foreach { x => println(x)}

el resultado:

{"name":"tom","age":12} {"name":"john","age":22} {"name":"leon","age":18}

Y creo que wholeTextFiles es más compatible con archivos pequeños.


for (element <- YourRDD) { // do what you want with element in each iteration, and if you want the index of element, simply use a counter variable in this loop beginning from 0 println (element._1) // this will print all filenames }