python - parallelize - Spark Context Textfile: carga varios archivos
spark python example (4)
Necesito procesar múltiples archivos dispersos en varios directorios. Me gustaría cargar todo esto en un solo RDD y luego realizar un mapa / reducir en él. Veo que SparkContext puede cargar varios archivos desde un solo directorio usando comodines. No estoy seguro de cómo cargar archivos desde múltiples carpetas.
El siguiente fragmento de código falla:
for fileEntry in files:
fileName = basePath + "/" + fileEntry
lines = sc.textFile(fileName)
if retval == None:
retval = lines
else:
retval = sc.union(retval, lines)
Esto falla en el tercer ciclo con el siguiente mensaje de error:
retval = sc.union(retval, lines)
TypeError: union() takes exactly 2 arguments (3 given)
Lo cual es extraño dado que proporciono solo 2 argumentos. Cualquier puntero apreciado.
¿Qué tal esta frase en su lugar?
sc.union([sc.textFile(basepath + "/" + f) for f in files])
En Scala SparkContext.union()
tiene dos variantes, una que toma argumentos vararg y otra que toma una lista. Solo el segundo existe en Python (ya que Python no tiene polimorfismo).
ACTUALIZAR
Puede usar una sola textFile
para leer múltiples archivos.
sc.textFile('',''.join(files))
Puede usar la siguiente función de SparkContext:
wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]
Lea un directorio de archivos de texto de HDFS, un sistema de archivos local (disponible en todos los nodos) o cualquier URI del sistema de archivos soportado por Hadoop. Cada archivo se lee como un registro único y se devuelve en un par clave-valor, donde la clave es la ruta de cada archivo, el valor es el contenido de cada archivo.
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext
Puedes usar esto
Primero, puede obtener un búfer / lista de rutas S3:
import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest
def listFiles(s3_bucket:String, base_prefix : String) = {
var files = new ArrayList[String]
//S3 Client and List Object Request
var s3Client = new AmazonS3Client();
var objectListing: ObjectListing = null;
var listObjectsRequest = new ListObjectsRequest();
//Your S3 Bucket
listObjectsRequest.setBucketName(s3_bucket)
//Your Folder path or Prefix
listObjectsRequest.setPrefix(base_prefix)
//Adding s3:// to the paths and adding to a list
do {
objectListing = s3Client.listObjects(listObjectsRequest);
for (objectSummary <- objectListing.getObjectSummaries().asScala) {
files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
}
listObjectsRequest.setMarker(objectListing.getNextMarker());
} while (objectListing.isTruncated());
//Removing Base Directory Name
files.remove(0)
//Creating a Scala List for same
files.asScala
}
Ahora pase este objeto List a la siguiente pieza de código, nota: sc es un objeto de SQLContext
var df: DataFrame = null;
for (file <- files) {
val fileDf= sc.textFile(file)
if (df!= null) {
df= df.unionAll(fileDf)
} else {
df= fileDf
}
}
Ahora tienes un RDD Unificado final, es decir, df
Opcional, y también puede reparticionarlo en un solo BigRDD
val files = sc.textFile(filename, 1).repartition(1)
Reparticionar siempre funciona: D
Resuelvo problemas similares usando comodines.
por ejemplo, encontré algunos rasgos en los archivos que quiero cargar en chispa,
dir
subdir1 / folder1 / x.txt
subdir2 / folder2 / y.txt
puedes usar la siguiente oración
sc.textFile("dir/*/*/*.txt")
para cargar todos los archivos relativos.
El comodín ''*'' solo funciona en el directorio de un solo nivel, que no es recursivo.