scala - tiempo - sincronizacion chevrolet spark
Mejor manera de convertir un campo de cadena en marca de tiempo en Spark (6)
Me gustaría mover el método getTimeStamp
escrito por usted a mapPartitions de rdd y reutilizar GenericMutableRow entre filas en un iterador:
val strRdd = sc.textFile("hdfs://path/to/cvs-file")
val rowRdd: RDD[Row] = strRdd.map(_.split(''/t'')).mapPartitions { iter =>
new Iterator[Row] {
val row = new GenericMutableRow(4)
var current: Array[String] = _
def hasNext = iter.hasNext
def next() = {
current = iter.next()
row(0) = current(0)
row(1) = current(1)
row(2) = current(2)
val ts = getTimestamp(current(3))
if(ts != null) {
row.update(3, ts)
} else {
row.setNullAt(3)
}
row
}
}
}
Y todavía debes usar el esquema para generar un DataFrame
val df = sqlContext.createDataFrame(rowRdd, tableSchema)
El uso de GenericMutableRow dentro de una implementación de iterador se puede encontrar en Aggregate Operator , InMemoryColumnarTableScan , ParquetTableOperations , etc.
Tengo un CSV en el que un campo es datetime en un formato específico. No puedo importarlo directamente en mi Dataframe porque necesita ser una marca de tiempo. Así que lo importo como una cadena y lo convierto en una Timestamp
como esta
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row
def getTimestamp(x:Any) : Timestamp = {
val format = new SimpleDateFormat("MM/dd/yyyy'' ''HH:mm:ss")
if (x.toString() == "")
return null
else {
val d = format.parse(x.toString());
val t = new Timestamp(d.getTime());
return t
}
}
def convert(row : Row) : Row = {
val d1 = getTimestamp(row(3))
return Row(row(0),row(1),row(2),d1)
}
¿Hay una forma mejor y más concisa de hacer esto, con la API Dataframe o spark-sql? El método anterior requiere la creación de un RDD y volver a proporcionar el esquema para el Dataframe.
No he jugado con Spark SQL todavía, pero creo que esto sería un scala más idiomático (el uso nulo no se considera una buena práctica):
def getTimestamp(s: String) : Option[Timestamp] = s match {
case "" => None
case _ => {
val format = new SimpleDateFormat("MM/dd/yyyy'' ''HH:mm:ss")
Try(new Timestamp(format.parse(s).getTime)) match {
case Success(t) => Some(t)
case Failure(_) => None
}
}
}
Tenga en cuenta que asumo que conoce los tipos de elementos de la Row
antemano (si lo lee desde un archivo csv, todos ellos son String
), por eso utilizo un tipo adecuado como String
y not Any
(todo es subtipo de Any
).
También depende de cómo desea manejar las excepciones de análisis. En este caso, si se produce una excepción de análisis, simplemente se devuelve una None
.
Podrías usarlo más adelante con:
rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3))
Tengo una marca de tiempo ISO8601 en mi conjunto de datos y tuve que convertirla al formato "aaaa-MM-dd". Esto es lo que hice:
import org.joda.time.{DateTime, DateTimeZone}
object DateUtils extends Serializable {
def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)
}
sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))
Y simplemente puede utilizar el UDF en su consulta de SQL de chispa.
Tuve algunos problemas con to_timestamp donde estaba devolviendo una cadena vacía. Después de una gran cantidad de pruebas y errores, pude sortearlo como marca de tiempo y luego como cadena. Espero que esto ayude a cualquier otra persona con el mismo problema:
df.columns.intersect(cols).foldLeft(df)((newDf, col) => {
val conversionFunc = to_timestamp(newDf(col).cast("timestamp"), "MM/dd/yyyy HH:mm:ss").cast("string")
newDf.withColumn(col, conversionFunc)
})
Yo usaría https://github.com/databricks/spark-csv
Esto inferirá marcas de tiempo para usted.
import com.databricks.spark.csv._
val rdd: RDD[String] = sc.textFile("csvfile.csv")
val df : DataFrame = new CsvParser().withDelimiter(''|'')
.withInferSchema(true)
.withParseMode("DROPMALFORMED")
.csvRdd(sqlContext, rdd)
Chispa> = 2.2
Desde tu 2.2 puedes proporcionar la cadena de formato directamente:
import org.apache.spark.sql.functions.to_timestamp
val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")
df.withColumn("ts", ts).show(2, false)
// +---+-------------------+-------------------+
// |id |dts |ts |
// +---+-------------------+-------------------+
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2 |#$@#@# |null |
// +---+-------------------+-------------------+
Chispa> = 1.6, <2.2
Puede usar las funciones de procesamiento de fecha que se han introducido en Spark 1.5. Suponiendo que tiene los siguientes datos:
val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")
Puedes usar unix_timestamp
para analizar cadenas y convertirlas en la marca de tiempo
import org.apache.spark.sql.functions.unix_timestamp
val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")
df.withColumn("ts", ts).show(2, false)
// +---+-------------------+---------------------+
// |id |dts |ts |
// +---+-------------------+---------------------+
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2 |#$@#@# |null |
// +---+-------------------+---------------------+
Como puede ver, cubre tanto el análisis como el manejo de errores. La cadena de formato debe ser compatible con Java SimpleDateFormat
.
Chispa> = 1.5, <1.6
Tendrás que usar algo como esto:
unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")
o
(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")
Debido a SPARK-11724 .
Chispa <1.5
debería poder usarlos con expr
y HiveContext
.