How to use the window specification and join condition based on a column value?

Here is my DF1

OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction
4295858898|^|204|^|205|^|1|^|I|!|
4295858898|^|204|^|208|^|2|^|I|!|
4295858898|^|204|^|209|^|2|^|I|!|
4295858898|^|204|^|211|^|3|^|I|!|
4295858898|^|204|^|212|^|3|^|I|!|
4295858898|^|204|^|214|^|4|^|I|!|
4295858898|^|204|^|215|^|4|^|I|!|
4295858898|^|206|^|207|^|1|^|I|!|
4295858898|^|206|^|210|^|2|^|I|!|
4295858898|^|206|^|213|^|3|^|I|!|

Here is my DF2

DataPartition|^|PartitionYear|^|TimeStamp|^|OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
SelfSourcedPublic|^|2002|^|1511224917595|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917596|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917597|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917598|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917599|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917600|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917601|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917602|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917603|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917604|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917605|^|4295858941|^|1|^|2|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917606|^|4295858941|^|1|^|3|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917607|^|4295858941|^|5|^|6|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917608|^|4295858941|^|5|^|7|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917609|^|4295858941|^|12|^|10|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917610|^|4295858941|^|12|^|11|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917611|^|4295858941|^|1|^|13|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1511224917612|^|4295858941|^|12|^|14|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917613|^|4295858941|^|5|^|15|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917614|^|4295858941|^|5|^|16|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917615|^|4295858941|^|1|^|17|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917616|^|4295858941|^|1|^|18|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917617|^|4295858941|^|5|^|19|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917618|^|4295858941|^|5|^|20|^|2|^|O|!|
SelfSourcedPublic|^|2001|^|1511224917619|^|4295858941|^|5|^|21|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917620|^|4295858941|^|1|^|22|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1511224917621|^|4295858941|^|1|^|23|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1511224917622|^|4295858941|^|35|^|36|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1511224917642|^|4295858941|^|null|^|35|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1511224917643|^|4295858941|^|null|^|36|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1511224917644|^|4295858941|^|null|^|37|^|null|^|D|!|

I want to implement a join based on a column value.

This is what I am trying to achieve in Spark-Scala, but I don't know how to implement it.

If FFAction_1 = I in DF2, then use the condition below

(join and partitionBy on the three columns "OrganizationId", "AnnualPeriodId", "InterimPeriodId")

val windowSpec = Window.partitionBy("OrganizationId", "AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc) val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("OrganizationId","AnnualPeriodId","InterimPeriodId"), "outer") .select($"OrganizationId", $"AnnualPeriodId",$"InterimPeriodId", when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction")) .filter(!$"FFAction".contains("D"))

If FFAction_1 = O or D, then use the condition below

(join and partitionBy on the two columns "OrganizationId", "InterimPeriodId")

val windowSpec = Window.partitionBy("OrganizationId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc) val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("OrganizationId","AnnualPeriodId","InterimPeriodId"), "outer") .select($"OrganizationId", $"AnnualPeriodId",$"InterimPeriodId", when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction")) .filter(!$"FFAction".contains("D"))

Below is my complete code

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{SparkConf, SparkContext}
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

// UDFs that derive the partition values from the input file path
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val get_cus_YearPartition = spark.udf.register("get_cus_YearPartition", (filePath: String) => filePath.split("\\.")(4))

// Loading the main (parent) data set
val rdd = sc.textFile("s3://trfsmallfffile/Interim2Annual/MAIN")
val header = rdd.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

val df1resultFinal = data.withColumn("DataPartition", get_cus_val(input_file_name))
val df1resultFinalWithYear = df1resultFinal.withColumn("PartitionYear", get_cus_YearPartition(input_file_name))

//Loading Incremental
val rdd1 = sc.textFile("s3://trfsmallfffile/Interim2Annual/INCR")
val header1 = rdd1.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)

//------------------------------- filtering only the latest from incremental ------------------------------
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("OrganizationId", "AnnualPeriodId", "InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey1 = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank")

val windowSpec2 = Window.partitionBy("OrganizationId", "InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", first("FFAction|!|").over(windowSpec2))
  .filter($"tobefiltered" === "I|!|" || $"tobefiltered" === "O|!|" || ($"tobefiltered" === "D|!|" && $"FFAction|!|" === "D|!|"))
  .drop("tobefiltered", "TimeStamp")

//-----------------separating the incremental df for insert, deletion and overwrite----------------
//---------------insert rows are selected -------------------------------
//insert a row if I is detected and if O is found then first delete and then insert
val insertdf = latestForEachKey.filter($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|").select(df1resultFinalWithYear.schema.fieldNames.map(col): _*)

//------------------deleted rows with primary key "OrganizationId", "InterimPeriodId"------------------
// delete rows from parent if either D or O is found in incremental
val deletedf = latestForEachKey.filter($"FFAction|!|" === "D|!|" || $"FFAction|!|" === "O|!|").select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete"))

//join by two primary keys for deletion and delete from the parent dataframe
val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete")

val dfToSave = dfMainOutput.union(insertdf).withColumn("FFAction|!|", when($"FFAction|!|" === "O|!|" || $"FFAction|!|" === "I|!|", lit("I|!|")))

// concatenate all non-partition columns back into the |^|-delimited output format
val dfMainOutputFinal = dfToSave.na.fill("").select($"DataPartition", $"PartitionYear", concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq
val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)

dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition", "PartitionYear")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/Interim2Annual/output")

val FFRowCount = dfMainOutputFinalWithoutNull.groupBy("DataPartition", "PartitionYear").count

FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
  .option("rootTag", "FFFileType")
  .option("rowTag", "FFPhysicalFile")
  .save("s3://trfsmallfffile/Interim2Annual/Descr")


Since you have already selected the latest rows from your DF2 over the three primary keys "OrganizationId", "AnnualPeriodId", "InterimPeriodId" using the Window function, you already have the rows to be inserted for those three primary keys. All you need is to filter the rows with I and O in the FFAction_1 column.

And, as you said, you want to delete rows from DF1 when FFAction_1 = O or D, using the two primary keys "OrganizationId", "InterimPeriodId". For that you also need to filter the rows of DF2 with D or O in the FFAction_1 column, to be used for the deletion.

After separating DF2 with the logic above, the first step is to remove rows from DF1 with a join, and then insert the remaining filtered rows of DF2 with a union.

Here is the complete solution

//------------------------------- filtering only the latest from incremental ------------------------------
import org.apache.spark.sql.expressions._

val windowSpec = Window.partitionBy("OrganizationId", "AnnualPeriodId", "InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

//-----------------separating the incremental df for insert, deletion and overwrite----------------
//---------------insert rows are selected -------------------------------
//insert a row if I is detected and if O is found then first delete and then insert
val insertdf = latestForEachKey.filter($"FFAction" === "I|!|" || $"FFAction" === "O|!|")
  .select(df1resultFinalWithYear.schema.fieldNames.map(col): _*)

//------------------deleted rows with primary key "OrganizationId", "InterimPeriodId"------------------
// delete rows from parent if either D or O is found in incremental
val deletedf = latestForEachKey.filter($"FFAction" === "D|!|" || $"FFAction" === "O|!|")
  .select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete"))

//join by two primary keys for deletion and delete from the parent dataframe
val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left")
  .filter($"Delete".isNull)
  .drop("Delete")

//final required output
dfMainOutput.union(insertdf).show(false)
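As a side note on the design, the delete step above (a left join followed by an isNull filter) can also be expressed as a single left_anti join. A minimal sketch, assuming the same DataFrame and column names as the solution above; this variant is not part of the original answer:

// Hedged alternative: left_anti keeps only the DF1 rows whose (OrganizationId, InterimPeriodId)
// keys do NOT appear among the D/O rows of the incremental DF2, i.e. the delete in one step
val deleteKeys = latestForEachKey
  .filter($"FFAction" === "D|!|" || $"FFAction" === "O|!|")
  .select("OrganizationId", "InterimPeriodId")

val dfMainOutputAnti = df1resultFinalWithYear
  .join(deleteKeys, Seq("OrganizationId", "InterimPeriodId"), "left_anti")

// followed by the same union of the insert rows
dfMainOutputAnti.union(insertdf).show(false)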


DISCLAIMER Somehow this and the other question I have just answered look like duplicates, so one of them is going to be flagged as such soon, or we will find out the difference between them and the disclaimer will go away. Time will tell.

Given the requirement to select the final window specification and join condition based on the values of the FFAction_1 column, I would filter first and then decide which window aggregation and join to use.

val df1 = spark.
  read.
  option("header", true).
  option("sep", "|").
  csv("df1.csv").
  select("OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber", "FFAction")

scala> df1.show
+--------------+--------------+---------------+-------------+--------+
|OrganizationId|AnnualPeriodId|InterimPeriodId|InterimNumber|FFAction|
+--------------+--------------+---------------+-------------+--------+
|    4295858898|           204|            205|            1|       I|
|    4295858898|           204|            208|            2|       I|
|    4295858898|           204|            209|            2|       I|
|    4295858898|           204|            211|            3|       I|
|    4295858898|           204|            212|            3|       I|
|    4295858898|           204|            214|            4|       I|
|    4295858898|           204|            215|            4|       I|
|    4295858898|           206|            207|            1|       I|
|    4295858898|           206|            210|            2|       I|
|    4295858898|           206|            213|            3|       I|
+--------------+--------------+---------------+-------------+--------+

The right-hand side of the join is fairly similar in "shape".

val df2 = spark.
  read.
  option("header", true).
  option("sep", "|").
  csv("df2.csv").
  select("DataPartition_1", "PartitionYear_1", "TimeStamp", "OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber_1", "FFAction_1")

scala> df2.show
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+
|  DataPartition_1|PartitionYear_1|    TimeStamp|OrganizationId|AnnualPeriodId|InterimPeriodId|InterimNumber_1|FFAction_1|
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+
|SelfSourcedPublic|           2002|1510725106270|    4295858941|            24|             25|              4|         O|
|SelfSourcedPublic|           2002|1510725106271|    4295858941|            24|             25|              5|         O|
|SelfSourcedPublic|           2003|1510725106272|    4295858941|            30|             31|              2|         O|
|SelfSourcedPublic|           2003|1510725106273|    4295858941|            30|             31|              3|         O|
|SelfSourcedPublic|           2001|1510725106293|    4295858941|             5|             20|              2|         O|
|SelfSourcedPublic|           2001|1510725106294|    4295858941|             5|             21|              3|         O|
|SelfSourcedPublic|           2002|1510725106295|    4295858941|             1|             22|              4|         O|
|SelfSourcedPublic|           2002|1510725106296|    4295858941|             1|             23|              5|         O|
|SelfSourcedPublic|           2016|1510725106297|    4295858941|            35|             36|              1|         I|
|SelfSourcedPublic|           2016|1510725106297|    4295858941|            35|             36|              1|         D|
+-----------------+---------------+-------------+--------------+--------------+---------------+---------------+----------+

With the above datasets, I would filter to see whether there is at least one I in df2's FFAction_1 column, and select the correct window specification and join condition accordingly.

The trick is to use the join operator followed by the where (or filter) operator, so that you can decide which join condition to apply.

val noIs = df2.filter($"FFAction_1" === "I").take(1).isEmpty

val (windowSpec, joinCond) =
  if (noIs) (windowSpecForOs, joinForOs)
  else (windowSpecForIs, joinForIs)

val latestForEachKey = df2result.withColumn("rank", rank() over windowSpec)
val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey).where(joinCond)
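The snippet above references windowSpecForIs, windowSpecForOs, joinForIs and joinForOs without defining them. A minimal sketch of how they could be filled in, assuming the key columns from the question and the DataFrames df2, df2result and df1resultFinalWithYear; the names, and the Seq-of-keys join used at the end, are illustrative choices rather than part of the original answer:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

// Case "DF2 contains at least one I": window and join over the three primary keys
val windowSpecForIs = Window
  .partitionBy("OrganizationId", "AnnualPeriodId", "InterimPeriodId")
  .orderBy($"TimeStamp".cast(LongType).desc)
val joinKeysForIs = Seq("OrganizationId", "AnnualPeriodId", "InterimPeriodId")

// Case "DF2 only contains O/D": window and join over the two primary keys
val windowSpecForOs = Window
  .partitionBy("OrganizationId", "InterimPeriodId")
  .orderBy($"TimeStamp".cast(LongType).desc)
val joinKeysForOs = Seq("OrganizationId", "InterimPeriodId")

val noIs = df2.filter($"FFAction_1" === "I").take(1).isEmpty
val (windowSpec, joinKeys) =
  if (noIs) (windowSpecForOs, joinKeysForOs)
  else (windowSpecForIs, joinKeysForIs)

val latestForEachKey = df2result
  .withColumn("rank", rank() over windowSpec)
  .filter($"rank" === 1)
  .drop("rank")

// Joining on a Seq of key names (instead of join(...).where(joinCond)) avoids
// ambiguous references to the shared key columns after the join
val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, joinKeys, "outer")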