tutorial textfile spark parallelize example espaƱol ejemplo python apache-spark

python - parallelize - Spark Context Textfile: carga varios archivos



spark python example (4)

Necesito procesar múltiples archivos dispersos en varios directorios. Me gustaría cargar todo esto en un solo RDD y luego realizar un mapa / reducir en él. Veo que SparkContext puede cargar varios archivos desde un solo directorio usando comodines. No estoy seguro de cómo cargar archivos desde múltiples carpetas.

El siguiente fragmento de código falla:

for fileEntry in files: fileName = basePath + "/" + fileEntry lines = sc.textFile(fileName) if retval == None: retval = lines else: retval = sc.union(retval, lines)

Esto falla en el tercer ciclo con el siguiente mensaje de error:

retval = sc.union(retval, lines) TypeError: union() takes exactly 2 arguments (3 given)

Lo cual es extraño dado que proporciono solo 2 argumentos. Cualquier puntero apreciado.


¿Qué tal esta frase en su lugar?

sc.union([sc.textFile(basepath + "/" + f) for f in files])

En Scala SparkContext.union() tiene dos variantes, una que toma argumentos vararg y otra que toma una lista. Solo el segundo existe en Python (ya que Python no tiene polimorfismo).

ACTUALIZAR

Puede usar una sola textFile para leer múltiples archivos.

sc.textFile('',''.join(files))


Puede usar la siguiente función de SparkContext:

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

Lea un directorio de archivos de texto de HDFS, un sistema de archivos local (disponible en todos los nodos) o cualquier URI del sistema de archivos soportado por Hadoop. Cada archivo se lee como un registro único y se devuelve en un par clave-valor, donde la clave es la ruta de cada archivo, el valor es el contenido de cada archivo.

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext


Puedes usar esto

Primero, puede obtener un búfer / lista de rutas S3:

import scala.collection.JavaConverters._ import java.util.ArrayList import com.amazonaws.services.s3.AmazonS3Client import com.amazonaws.services.s3.model.ObjectListing import com.amazonaws.services.s3.model.S3ObjectSummary import com.amazonaws.services.s3.model.ListObjectsRequest def listFiles(s3_bucket:String, base_prefix : String) = { var files = new ArrayList[String] //S3 Client and List Object Request var s3Client = new AmazonS3Client(); var objectListing: ObjectListing = null; var listObjectsRequest = new ListObjectsRequest(); //Your S3 Bucket listObjectsRequest.setBucketName(s3_bucket) //Your Folder path or Prefix listObjectsRequest.setPrefix(base_prefix) //Adding s3:// to the paths and adding to a list do { objectListing = s3Client.listObjects(listObjectsRequest); for (objectSummary <- objectListing.getObjectSummaries().asScala) { files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); } listObjectsRequest.setMarker(objectListing.getNextMarker()); } while (objectListing.isTruncated()); //Removing Base Directory Name files.remove(0) //Creating a Scala List for same files.asScala }

Ahora pase este objeto List a la siguiente pieza de código, nota: sc es un objeto de SQLContext

var df: DataFrame = null; for (file <- files) { val fileDf= sc.textFile(file) if (df!= null) { df= df.unionAll(fileDf) } else { df= fileDf } }

Ahora tienes un RDD Unificado final, es decir, df

Opcional, y también puede reparticionarlo en un solo BigRDD

val files = sc.textFile(filename, 1).repartition(1)

Reparticionar siempre funciona: D


Resuelvo problemas similares usando comodines.

por ejemplo, encontré algunos rasgos en los archivos que quiero cargar en chispa,

dir

subdir1 / folder1 / x.txt

subdir2 / folder2 / y.txt

puedes usar la siguiente oración

sc.textFile("dir/*/*/*.txt")

para cargar todos los archivos relativos.

El comodín ''*'' solo funciona en el directorio de un solo nivel, que no es recursivo.