tutorial spark software org examples scala hadoop apache-spark

spark - scala apache



Spark Scala lista de carpetas en el directorio (7)

Azure Blog Storage se asigna a una ubicación HDFS, por lo que todas las operaciones de Hadoop

En el Portal de Azure , vaya a Cuenta de almacenamiento, encontrará los siguientes detalles:

  • Cuenta de almacenamiento

  • Llave -

  • Envase -

  • Patrón de ruta - / users / accountsdata /

  • Formato de fecha - aaaa-mm-dd

  • Formato de serialización de eventos - json

  • Formato - línea separada

El patrón de ruta aquí es la ruta HDFS, puede iniciar sesión / putty en el nodo Hadoop Edge y hacer:

hadoop fs -ls /users/accountsdata

El comando de arriba listará todos los archivos. En Scala puedes usar

import scala.sys.process._ val lsResult = Seq("hadoop","fs","-ls","/users/accountsdata/").!!

Quiero enumerar todas las carpetas dentro de un directorio hdfs usando Scala / Spark. En Hadoop puedo hacer esto usando el comando: hadoop fs -ls hdfs://sandbox.hortonworks.com/demo/

Lo probé con:

val conf = new Configuration() val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com/"), conf) val path = new Path("hdfs://sandbox.hortonworks.com/demo/") val files = fs.listFiles(path, false)

Pero no parece que busque en el directorio de Hadoop ya que no puedo encontrar mis carpetas / archivos.

También probé con:

FileSystem.get(sc.hadoopConfiguration).listFiles(new Path("hdfs://sandbox.hortonworks.com/demo/"), true)

Pero esto tampoco ayuda.

¿Tiene usted alguna otra idea?

PD: también verifiqué este tema: Spark iterate HDFS directory, pero no funciona para mí, ya que parece que no busca en el directorio hdfs, sino en el sistema de archivos local con el archivo de esquema //.


Debido a que está utilizando Scala, también puede interesarle lo siguiente:

import scala.sys.process._ val lsResult = Seq("hadoop","fs","-ls","hdfs://sandbox.hortonworks.com/demo/").!!

Desafortunadamente, esto devolverá la salida completa del comando como una cadena, por lo que el análisis de solo los nombres de archivo requiere un poco de esfuerzo. (Utilice fs.listStatus en fs.listStatus lugar). Pero si necesita ejecutar otros comandos donde pueda hacerlo fácilmente en la línea de comandos y no está seguro de cómo hacerlo en Scala, solo use la línea de comandos a través de scala.sys.process._ . (¡Utilice una sola ! Si desea obtener el código de retorno).


Estaba buscando lo mismo, sin embargo, en lugar de HDFS , para S3 .

Resolví creando el FileSystem con mi ruta S3 como se muestra a continuación:

def getSubFolders(path: String)(implicit sparkContext: SparkContext): Seq[String] = { val hadoopConf = sparkContext.hadoopConfiguration val uri = new URI(path) FileSystem.get(uri, hadoopConf).listStatus(new Path(path)).map { _.getPath.toString } }

Sé que esta pregunta estaba relacionada con HDFS, pero quizás otros como yo vendrán aquí buscando la solución S3. Dado que sin especificar el URI en FileSystem, buscará los HDFS.

java.lang.IllegalArgumentException: Wrong FS: s3://<bucket>/dummy_path expected: hdfs://<ip-machine>.eu-west-1.compute.internal:8020


Estamos usando hadoop 1.4 y no tiene el método listFiles, así que usamos listStatus para obtener directorios. No tiene opción recursiva pero es fácil de administrar la búsqueda recursiva.

val fs = FileSystem.get(new Configuration()) val status = fs.listStatus(new Path(YOUR_HDFS_PATH)) status.foreach(x=> println(x.getPath))


object HDFSProgram extends App { val uri = new URI("hdfs://HOSTNAME:PORT") val fs = FileSystem.get(uri,new Configuration()) val filePath = new Path("/user/hive/") val status = fs.listStatus(filePath) status.map(sts => sts.getPath).foreach(println) }

Este es un código de ejemplo para obtener una lista de los archivos o carpetas hdf presentes en / user / hive /


val spark = SparkSession.builder().appName("Demo").getOrCreate() val path = new Path("enter your directory path") val fs:FileSystem = projects.getFileSystem(spark.sparkContext.hadoopConfiguration) val it = fs.listLocatedStatus(path)

Esto creará un iterador sobre org.apache.hadoop.fs.LocatedFileStatus que es su subdirectorio


val listStatus = org.apache.hadoop.fs.FileSystem.get(new URI(url), sc.hadoopConfiguration) .globStatus(new org.apache.hadoop.fs.Path(url)) for (urlStatus <- listStatus) { println("urlStatus get Path:" + urlStatus.getPath())

}