tutorial spark examples example apache-spark

apache spark - spark - ¿Cómo leer múltiples archivos de texto en un solo RDD?



apache spark wikipedia (9)

Quiero leer un montón de archivos de texto de una ubicación hdfs y realizar un mapeo en él en una iteración usando chispa.

JavaRDD<String> records = ctx.textFile(args[1], 1); es capaz de leer solo un archivo a la vez.

Quiero leer más de un archivo y procesarlos como un solo RDD. ¿Cómo?


En PySpark, he encontrado una forma adicional útil de analizar archivos. Tal vez haya un equivalente en Scala, pero no me siento lo suficientemente cómodo para hacer una traducción funcional. Es, en efecto, una llamada de archivo de texto con la adición de etiquetas (en el siguiente ejemplo la clave = nombre de archivo, valor = 1 línea del archivo).

Texto "etiquetado"

entrada:

import glob from pyspark import SparkContext SparkContext.stop(sc) sc = SparkContext("local","example") # if running locally sqlContext = SQLContext(sc) for filename in glob.glob(Data_File + "/*"): Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)

output: array con cada entrada que contiene una tupla usando filename-as-key y con value = cada línea de archivo. (Técnicamente, con este método también puede usar una clave diferente además del nombre de ruta de archivo real, tal vez una representación de hash para guardar en la memoria). es decir.

[(''/home/folder_with_text_files/file1.txt'', ''file1_contents_line1''), (''/home/folder_with_text_files/file1.txt'', ''file1_contents_line2''), (''/home/folder_with_text_files/file1.txt'', ''file1_contents_line3''), (''/home/folder_with_text_files/file2.txt'', ''file2_contents_line1''), ...]

También puede recombinarse como una lista de líneas:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

[(''/home/folder_with_text_files/file1.txt'', [''file1_contents_line1'', ''file1_contents_line2'',''file1_contents_line3'']), (''/home/folder_with_text_files/file2.txt'', [''file2_contents_line1''])]

O vuelva a combinar los archivos completos en cadenas únicas (en este ejemplo, el resultado es el mismo que obtiene de wholeTextFiles, pero con la cadena "file:" eliminada del archivo).

Spark_Full.groupByKey().map(lambda x: (x[0], '' ''.join(list(x[1])))).collect()


Hay una solución limpia directa disponible. Use el método wholeTextFiles (). Esto tomará un directorio y formará un par de valores clave. El RDD devuelto será un par RDD. Encuentra debajo la descripción de Spark docs :

SparkContext.wholeTextFiles le permite leer un directorio que contiene múltiples archivos de texto pequeños, y devuelve cada uno de ellos como pares (nombre de archivo, contenido). Esto está en contraste con textFile, que devolvería un registro por línea en cada archivo


Puede especificar directorios completos, usar comodines e incluso CSV de directorios y comodines. P.ej:

sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")

Como Nick Chammas señala, esta es una exposición de FileInputFormat de Hadoop y, por lo tanto, también funciona con Hadoop (y Scalding).


Puede usar una sola invocación de archivo de texto para leer múltiples archivos. Scala:

sc.textFile('',''.join(files))


Puedes usar esto

Primero, puede obtener un búfer / lista de rutas S3:

import scala.collection.JavaConverters._ import java.util.ArrayList import com.amazonaws.services.s3.AmazonS3Client import com.amazonaws.services.s3.model.ObjectListing import com.amazonaws.services.s3.model.S3ObjectSummary import com.amazonaws.services.s3.model.ListObjectsRequest def listFiles(s3_bucket:String, base_prefix : String) = { var files = new ArrayList[String] //S3 Client and List Object Request var s3Client = new AmazonS3Client(); var objectListing: ObjectListing = null; var listObjectsRequest = new ListObjectsRequest(); //Your S3 Bucket listObjectsRequest.setBucketName(s3_bucket) //Your Folder path or Prefix listObjectsRequest.setPrefix(base_prefix) //Adding s3:// to the paths and adding to a list do { objectListing = s3Client.listObjects(listObjectsRequest); for (objectSummary <- objectListing.getObjectSummaries().asScala) { files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); } listObjectsRequest.setMarker(objectListing.getNextMarker()); } while (objectListing.isTruncated()); //Removing Base Directory Name files.remove(0) //Creating a Scala List for same files.asScala }

Ahora pase este objeto List a la siguiente pieza de código, nota: sc es un objeto de SQLContext

var df: DataFrame = null; for (file <- files) { val fileDf= sc.textFile(file) if (df!= null) { df= df.unionAll(fileDf) } else { df= fileDf } }

Ahora tienes un RDD Unificado final, es decir, df

Opcional, y también puede reparticionarlo en un solo BigRDD

val files = sc.textFile(filename, 1).repartition(1)

Reparticionar siempre funciona: D


Todas las respuestas son correctas con sc.textFile

Me preguntaba por qué no wholeTextFiles en este caso ...

sc.wholeTextFiles(yourfileListFromFolder.mkString(",")) .flatMap{case (path, text) ...

Una limitación es que tenemos que cargar archivos pequeños, de lo contrario el rendimiento será malo y puede llevar a OOM.

Nota :

  • El archivo completo debe caber en la memoria
  • Bueno para formatos de archivo que NO son divisibles por línea ... como archivos XML

Referencia adicional para visit


Use la union siguiente manera:

val sc = new SparkContext(...) val r1 = sc.textFile("xxx1") val r2 = sc.textFile("xxx2") ... val rdds = Seq(r1, r2, ...) val bigRdd = sc.union(rdds)

Entonces el bigRdd es el RDD con todos los archivos.


puede usar - JavaRDD records = sc.wholeTextFiles ("ruta de su directorio") aquí obtendrá la ruta de su archivo y el contenido de ese archivo. para que pueda realizar cualquier acción de un archivo completo a la vez que ahorra la sobrecarga


rdd = textFile(''/data/{1.txt,2.txt}'')