versiones guia español actualizar scala apache-spark dataframe apache-spark-sql spark-csv

scala - guia - Proporcione un esquema mientras lee el archivo csv como un marco de datos



qgis español (5)

Aquí le mostramos cómo puede trabajar con un esquema personalizado, una demostración completa:

$> código de shell,

echo " Slingo, iOS Slingo, Android " > game.csv

Código Scala:

import org.apache.spark.sql.types._ val customSchema = StructType(Array( StructField("game_id", StringType, true), StructField("os_id", StringType, true) )) val csv_df = spark.read.format("csv").schema(customSchema).load("game.csv") csv_df.show csv_df.orderBy(asc("game_id"), desc("os_id")).show csv_df.createOrReplaceTempView("game_view") val sort_df = sql("select * from game_view order by game_id, os_id desc") sort_df.show

Estoy tratando de leer un archivo csv en un marco de datos. Sé cuál debería ser el esquema de mi marco de datos ya que conozco mi archivo csv. También estoy usando el paquete spark csv para leer el archivo. Intento especificar el esquema como abajo.

val pagecount = sqlContext.read.format("csv") .option("delimiter"," ").option("quote","") .option("schema","project: string ,article: string ,requests: integer ,bytes_served: long") .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

Pero cuando verifico el esquema del marco de datos que creé, parece que tomó su propio esquema. Estoy haciendo algo mal ? ¿Cómo hacer chispa para recoger el esquema que mencioné?

> pagecount.printSchema root |-- _c0: string (nullable = true) |-- _c1: string (nullable = true) |-- _c2: string (nullable = true) |-- _c3: string (nullable = true)


Estoy usando la solución provista por Arunakiran Nulu en mi análisis (vea el código). A pesar de que es capaz de asignar los tipos correctos a las columnas, todos los valores devueltos son null . Anteriormente, probé la opción .option("inferSchema", "true") y devuelve los valores correctos en el marco de datos (aunque de tipo diferente).

val customSchema = StructType(Array( StructField("numicu", StringType, true), StructField("fecha_solicitud", TimestampType, true), StructField("codtecnica", StringType, true), StructField("tecnica", StringType, true), StructField("finexploracion", TimestampType, true), StructField("ultimavalidacioninforme", TimestampType, true), StructField("validador", StringType, true))) val df_explo = spark.read .format("csv") .option("header", "true") .option("delimiter", "/t") .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") .schema(customSchema) .load(filename)

Resultado

root |-- numicu: string (nullable = true) |-- fecha_solicitud: timestamp (nullable = true) |-- codtecnica: string (nullable = true) |-- tecnica: string (nullable = true) |-- finexploracion: timestamp (nullable = true) |-- ultimavalidacioninforme: timestamp (nullable = true) |-- validador: string (nullable = true)

y la mesa es:

|numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador| +------+---------------+----------+-------+--------------+-----------------------+---------+ | null| null| null| null| null| null| null| | null| null| null| null| null| null| null| | null| null| null| null| null| null| null| | null| null| null| null| null| null| null|


Gracias a la respuesta de @Nulu, funciona para pyspark con un mínimo de ajustes

from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType customSchema = StructType(Array( StructField("project", StringType, true), StructField("article", StringType, true), StructField("requests", IntegerType, true), StructField("bytes_served", DoubleType, true))) pagecount = sc.read.format("com.databricks.spark.csv") .option("delimiter"," ") .option("quote","") .option("header", "false") .schema(customSchema) .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")


Intente a continuación, no necesita especificar el esquema. cuando le das a inferSchema como verdadero, debería sacarlo de tu archivo csv.

val pagecount = sqlContext.read.format("csv") .option("delimiter"," ").option("quote","") .option("header", "true") .option("inferSchema", "true") .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

Si desea especificar manualmente el esquema, debe hacer lo siguiente:

import org.apache.spark.sql.types._ val customSchema = StructType(Array( StructField("project", StringType, true), StructField("article", StringType, true), StructField("requests", IntegerType, true), StructField("bytes_served", DoubleType, true))) val pagecount = sqlContext.read.format("csv") .option("delimiter"," ").option("quote","") .option("header", "true") .schema(customSchema) .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")


Para aquellos interesados ​​en hacer esto en Python, aquí hay una versión funcional.

customSchema = StructType([ StructField("IDGC", StringType(), True), StructField("SEARCHNAME", StringType(), True), StructField("PRICE", DoubleType(), True) ]) productDF = spark.read.load(''/home/ForTesting/testProduct.csv'', format="csv", header="true", sep=''|'', schema=customSchema) testProduct.csv ID|SEARCHNAME|PRICE 6607|EFKTON75LIN|890.88 6612|EFKTON100HEN|55.66

Espero que esto ayude.