apache-spark - to_date - spark sql to date
Convertir fecha de cadena a formato de fecha en marcos de datos (8)
Estoy tratando de convertir una columna que está en formato de cadena a formato de fecha usando la función
to_date
pero sus valores nulos devueltos.
df.createOrReplaceTempView("incidents")
spark.sql("select Date from incidents").show()
+----------+
| Date|
+----------+
|08/26/2016|
|08/26/2016|
|08/26/2016|
|06/14/2016|
spark.sql("select to_date(Date) from incidents").show()
+---------------------------+
|to_date(CAST(Date AS DATE))|
+---------------------------+
| null|
| null|
| null|
| null|
La columna Fecha está en formato de cadena:
|-- Date: string (nullable = true)
Como su objetivo principal era convertir el tipo de columna en un DataFrame de String a Timestamp, creo que este enfoque sería mejor.
import org.apache.spark.sql.functions.{to_date, to_timestamp}
val modifiedDF = DF.withColumn("Date", to_date($"Date", "MM/dd/yyyy"))
También puede usar
to_timestamp
(creo que está disponible en Spark 2.x) si necesita una marca de tiempo de grano fino.
La solución propuesta anteriormente por Sai Kiriti Badam funcionó para mí.
Estoy usando Azure Databricks para leer datos capturados de un EventHub. Contiene una columna de cadena llamada EnqueuedTimeUtc con el siguiente formato ...
12/7/2018 12:54:13 PM
Estoy usando una computadora portátil Python y usé lo siguiente ...
import pyspark.sql.functions as func
sports_messages = sports_df.withColumn("EnqueuedTimestamp", func.to_timestamp("EnqueuedTimeUtc", "MM/dd/yyyy hh:mm:ss aaa"))
... para crear una nueva columna EnqueuedTimestamp del tipo "marca de tiempo" con datos en el siguiente formato ...
2018-12-07 12:54:13
Personalmente, he encontrado algunos errores al usar conversaciones de fecha basadas en unix_timestamp del formato dd-MMM-aaaa a aaaa-mm-dd, usando spark 1.6, pero esto puede extenderse a versiones recientes. A continuación, explico una forma de resolver el problema usando java.time que debería funcionar en todas las versiones de spark:
He visto errores al hacer:
from_unixtime(unix_timestamp(StockMarketClosingDate, ''dd-MMM-yyyy''), ''yyyy-MM-dd'') as FormattedDate
A continuación hay un código para ilustrar el error y mi solución para solucionarlo. Primero leí en los datos del mercado de valores, en un formato de archivo estándar común:
import sys.process._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DateType}
import sqlContext.implicits._
val EODSchema = StructType(Array(
StructField("Symbol" , StringType, true), //$1
StructField("Date" , StringType, true), //$2
StructField("Open" , StringType, true), //$3
StructField("High" , StringType, true), //$4
StructField("Low" , StringType, true), //$5
StructField("Close" , StringType, true), //$6
StructField("Volume" , StringType, true) //$7
))
val textFileName = "/user/feeds/eoddata/INDEX/INDEX_19*.csv"
// below is code to read using later versions of spark
//val eoddata = spark.read.format("csv").option("sep", ",").schema(EODSchema).option("header", "true").load(textFileName)
// here is code to read using 1.6, via, "com.databricks:spark-csv_2.10:1.2.0"
val eoddata = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("delimiter", ",") //.option("dateFormat", "dd-MMM-yyyy") failed to work
.schema(EODSchema)
.load(textFileName)
eoddata.registerTempTable("eoddata")
Y aquí están las conversiones de fechas que tienen problemas:
%sql
-- notice there are errors around the turn of the year
Select
e.Date as StringDate
, cast(from_unixtime(unix_timestamp(e.Date, "dd-MMM-yyyy"), ''YYYY-MM-dd'') as Date) as ProperDate
, e.Close
from eoddata e
where e.Symbol = ''SPX.IDX''
order by cast(from_unixtime(unix_timestamp(e.Date, "dd-MMM-yyyy"), ''YYYY-MM-dd'') as Date)
limit 1000
Un gráfico hecho en zeppelin muestra picos, que son errores.
y aquí está la verificación que muestra los errores de conversión de fecha:
// shows the unix_timestamp conversion approach can create errors
val result = sqlContext.sql("""
Select errors.* from
(
Select
t.*
, substring(t.OriginalStringDate, 8, 11) as String_Year_yyyy
, substring(t.ConvertedCloseDate, 0, 4) as Converted_Date_Year_yyyy
from
( Select
Symbol
, cast(from_unixtime(unix_timestamp(e.Date, "dd-MMM-yyyy"), ''YYYY-MM-dd'') as Date) as ConvertedCloseDate
, e.Date as OriginalStringDate
, Close
from eoddata e
where e.Symbol = ''SPX.IDX''
) t
) errors
where String_Year_yyyy <> Converted_Date_Year_yyyy
""")
//df.withColumn("tx_date", to_date(unix_timestamp($"date", "M/dd/yyyy").cast("timestamp")))
result.registerTempTable("SPX")
result.cache()
result.show(100)
result: org.apache.spark.sql.DataFrame = [Symbol: string, ConvertedCloseDate: date, OriginalStringDate: string, Close: string, String_Year_yyyy: string, Converted_Date_Year_yyyy: string]
res53: result.type = [Symbol: string, ConvertedCloseDate: date, OriginalStringDate: string, Close: string, String_Year_yyyy: string, Converted_Date_Year_yyyy: string]
+-------+------------------+------------------+-------+----------------+------------------------+
| Symbol|ConvertedCloseDate|OriginalStringDate| Close|String_Year_yyyy|Converted_Date_Year_yyyy|
+-------+------------------+------------------+-------+----------------+------------------------+
|SPX.IDX| 1997-12-30| 30-Dec-1996| 753.85| 1996| 1997|
|SPX.IDX| 1997-12-31| 31-Dec-1996| 740.74| 1996| 1997|
|SPX.IDX| 1998-12-29| 29-Dec-1997| 953.36| 1997| 1998|
|SPX.IDX| 1998-12-30| 30-Dec-1997| 970.84| 1997| 1998|
|SPX.IDX| 1998-12-31| 31-Dec-1997| 970.43| 1997| 1998|
|SPX.IDX| 1998-01-01| 01-Jan-1999|1229.23| 1999| 1998|
+-------+------------------+------------------+-------+----------------+------------------------+
FINISHED
Después de este resultado, cambié a las conversiones java.time con un UDF como este, que funcionó para mí:
// now we will create a UDF that uses the very nice java.time library to properly convert the silly stockmarket dates
// start by importing the specific java.time libraries that superceded the joda.time ones
import java.time.LocalDate
import java.time.format.DateTimeFormatter
// now define a specific data conversion function we want
def fromEODDate (YourStringDate: String): String = {
val formatter = DateTimeFormatter.ofPattern("dd-MMM-yyyy")
var retDate = LocalDate.parse(YourStringDate, formatter)
// this should return a proper yyyy-MM-dd date from the silly dd-MMM-yyyy formats
// now we format this true local date with a formatter to the desired yyyy-MM-dd format
val retStringDate = retDate.format(DateTimeFormatter.ISO_LOCAL_DATE)
return(retStringDate)
}
Ahora lo registro como una función para usar en sql:
sqlContext.udf.register("fromEODDate", fromEODDate(_:String))
y verifique los resultados, y vuelva a ejecutar la prueba:
val results = sqlContext.sql("""
Select
e.Symbol as Symbol
, e.Date as OrigStringDate
, Cast(fromEODDate(e.Date) as Date) as ConvertedDate
, e.Open
, e.High
, e.Low
, e.Close
from eoddata e
order by Cast(fromEODDate(e.Date) as Date)
""")
results.printSchema()
results.cache()
results.registerTempTable("results")
results.show(10)
results: org.apache.spark.sql.DataFrame = [Symbol: string, OrigStringDate: string, ConvertedDate: date, Open: string, High: string, Low: string, Close: string]
root
|-- Symbol: string (nullable = true)
|-- OrigStringDate: string (nullable = true)
|-- ConvertedDate: date (nullable = true)
|-- Open: string (nullable = true)
|-- High: string (nullable = true)
|-- Low: string (nullable = true)
|-- Close: string (nullable = true)
res79: results.type = [Symbol: string, OrigStringDate: string, ConvertedDate: date, Open: string, High: string, Low: string, Close: string]
+--------+--------------+-------------+-------+-------+-------+-------+
| Symbol|OrigStringDate|ConvertedDate| Open| High| Low| Close|
+--------+--------------+-------------+-------+-------+-------+-------+
|ADVA.IDX| 01-Jan-1996| 1996-01-01| 364| 364| 364| 364|
|ADVN.IDX| 01-Jan-1996| 1996-01-01| 1527| 1527| 1527| 1527|
|ADVQ.IDX| 01-Jan-1996| 1996-01-01| 1283| 1283| 1283| 1283|
|BANK.IDX| 01-Jan-1996| 1996-01-01|1009.41|1009.41|1009.41|1009.41|
| BKX.IDX| 01-Jan-1996| 1996-01-01| 39.39| 39.39| 39.39| 39.39|
|COMP.IDX| 01-Jan-1996| 1996-01-01|1052.13|1052.13|1052.13|1052.13|
| CPR.IDX| 01-Jan-1996| 1996-01-01| 1.261| 1.261| 1.261| 1.261|
|DECA.IDX| 01-Jan-1996| 1996-01-01| 205| 205| 205| 205|
|DECN.IDX| 01-Jan-1996| 1996-01-01| 825| 825| 825| 825|
|DECQ.IDX| 01-Jan-1996| 1996-01-01| 754| 754| 754| 754|
+--------+--------------+-------------+-------+-------+-------+-------+
only showing top 10 rows
que se ve bien, y vuelvo a ejecutar mi gráfico, para ver si hay errores / picos:
Como puede ver, no más picos o errores. Ahora uso un UDF como he mostrado para aplicar mis transformaciones de formato de fecha a un formato estándar aaaa-MM-dd, y desde entonces no he tenido errores espurios. :-)
Resolví el mismo problema sin la tabla / vista temporal y con las funciones del marco de datos.
Por supuesto, descubrí que solo un formato funciona con esta solución y es
yyyy-MM-DD
.
Por ejemplo:
val df = sc.parallelize(Seq("2016-08-26")).toDF("Id")
val df2 = df.withColumn("Timestamp", (col("Id").cast("timestamp")))
val df3 = df2.withColumn("Date", (col("Id").cast("date")))
df3.printSchema
root
|-- Id: string (nullable = true)
|-- Timestamp: timestamp (nullable = true)
|-- Date: date (nullable = true)
df3.show
+----------+--------------------+----------+
| Id| Timestamp| Date|
+----------+--------------------+----------+
|2016-08-26|2016-08-26 00:00:...|2016-08-26|
+----------+--------------------+----------+
La marca de tiempo, por supuesto, tiene
00:00:00.0
como valor de tiempo.
También puedes pasar el formato de fecha
df.withColumn("Date",to_date(unix_timestamp(df.col("your_date_column"), "your_date_format").cast("timestamp")))
Por ejemplo
import org.apache.spark.sql.functions._
val df = sc.parallelize(Seq("06 Jul 2018")).toDF("dateCol")
df.withColumn("Date",to_date(unix_timestamp(df.col("dateCol"), "dd MMM yyyy").cast("timestamp")))
Use
to_date
con Java
SimpleDateFormat
.
TO_DATE(CAST(UNIX_TIMESTAMP(date, ''MM/dd/yyyy'') AS TIMESTAMP))
Ejemplo:
spark.sql("""
SELECT TO_DATE(CAST(UNIX_TIMESTAMP(''08/26/2016'', ''MM/dd/yyyy'') AS TIMESTAMP)) AS newdate"""
).show()
+----------+
| dt|
+----------+
|2016-08-26|
+----------+
dateID es una columna int que contiene la fecha en formato Int
spark.sql("SELECT from_unixtime(unix_timestamp(cast(dateid as varchar(10)), ''yyyymmdd''), ''yyyy-mm-dd'') from XYZ").show(50, false)