una repetidos repetido para mostrar iguales encontrar elemento duplicados detectar datos cómo comparar como combinar columnas columna checar buscar scala apache-spark apache-spark-sql

scala - repetidos - excel buscar datos iguales en una columna



¿Cómo evitar columnas duplicadas después de unir? (7)

Tengo dos marcos de datos con las siguientes columnas:

df1.columns // Array(ts, id, X1, X2)

y

df2.columns // Array(ts, id, Y1, Y2)

Despues de hacer

val df_combined = df1.join(df2, Seq(ts,id))

Termino con las siguientes columnas: Array(ts, id, X1, X2, ts, id, Y1, Y2) . Podría esperar que las columnas comunes se eliminen. ¿Hay algo adicional que deba hacerse?


Este es un comportamiento esperado. DataFrame.join método DataFrame.join es equivalente a una unión SQL como esta

SELECT * FROM a JOIN b ON joinExprs

Si desea ignorar las columnas duplicadas, simplemente suéltelas o seleccione columnas de interés después. Si desea DataFrames la DataFrames puede usar el acceso a estos utilizando DataFrames padre:

val a: DataFrame = ??? val b: DataFrame = ??? val joinExprs: Column = ??? a.join(b, joinExprs).select(a("id"), b("foo")) // drop equivalent a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo"))

o use alias:

// As for now aliases don''t work with drop a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo")

Para equi-une existe una sintaxis de acceso directo especial que toma una secuencia de cadenas :

val usingColumns: Seq[String] = ??? a.join(b, usingColumns)

o como una sola cadena

val usingColumn: String = ??? a.join(b, usingColumn)

que mantienen solo una copia de las columnas utilizadas en una condición de unión.


Este es un comportamiento normal de SQL, lo que estoy haciendo para esto:

  • Soltar o renombrar columnas de origen
  • Haz la unión
  • Suelte la columna renombrada si hay alguna

Aquí estoy reemplazando la columna "nombre completo":

Algún código en Java:

this .sqlContext .read() .parquet(String.format("hdfs:///user/blablacar/data/year=%d/month=%d/day=%d", year, month, day)) .drop("fullname") .registerTempTable("data_original"); this .sqlContext .read() .parquet(String.format("hdfs:///user/blablacar/data_v2/year=%d/month=%d/day=%d", year, month, day)) .registerTempTable("data_v2"); this .sqlContext .sql(etlQuery) .repartition(1) .write() .mode(SaveMode.Overwrite) .parquet(outputPath);

Donde está la consulta:

SELECT d.*, concat_ws(''_'', product_name, product_module, name) AS fullname FROM {table_source} d LEFT OUTER JOIN {table_updates} u ON u.id = d.id

Esto es algo que solo puede hacer con Spark, creo (suelte la columna de la lista), ¡muy, muy útil!


He estado atrapado con esto por un tiempo, y solo recientemente se me ocurrió una solución que es bastante fácil.

Decir que es

scala> val a = Seq(("a", 1), ("b", 2)).toDF("key", "vala") a: org.apache.spark.sql.DataFrame = [key: string, vala: int] scala> a.show +---+----+ |key|vala| +---+----+ | a| 1| | b| 2| +---+----+ and scala> val b = Seq(("a", 1)).toDF("key", "valb") b: org.apache.spark.sql.DataFrame = [key: string, valb: int] scala> b.show +---+----+ |key|valb| +---+----+ | a| 1| +---+----+

y puedo hacer esto para seleccionar solo el valor en el marco de datos a:

scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show +---+----+ |key|vala| +---+----+ | a| 1| | b| 2| +---+----+


La mejor práctica es hacer que el nombre de la columna sea diferente en ambos DF antes de unirse a ellos y soltarlos en consecuencia.

df1.columns = [id, edad, ingresos] df2.column = [id, age_group]

df1.join (df2, on = df1.id == df2.id, how = ''inner''). write.saveAsTable (''table_name'')

// devolverá el error mientras que el error para columnas duplicadas

// prueba esto

df1.join (df2.withColumnRenamed (''id'', ''id_2''), on = df1.id == df2.id_2, how = ''inner''). drop (''id_2'')


La respuesta simple (de las Preguntas frecuentes de Databricks sobre este asunto ) es realizar la unión donde las columnas unidas se expresan como una matriz de cadenas (o una cadena) en lugar de un predicado.

A continuación se muestra un ejemplo adaptado de las Preguntas frecuentes de Databricks, pero con dos columnas de unión para responder a la pregunta del póster original.

Aquí está el marco de datos izquierdo :

val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10)) val left = llist.toDF("firstname","lastname","date","duration") left.show() /* +---------+--------+----------+--------+ |firstname|lastname| date|duration| +---------+--------+----------+--------+ | bob| b|2015-01-13| 4| | alice| a|2015-04-23| 10| +---------+--------+----------+--------+ */

Aquí está el marco de datos correcto :

val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload") right.show() /* +---------+--------+------+ |firstname|lastname|upload| +---------+--------+------+ | alice| a| 100| | bob| b| 23| +---------+--------+------+ */

Aquí hay una solución incorrecta , donde las columnas de unión se definen como el predicado left("firstname")===right("firstname") && left("lastname")===right("lastname") .

El resultado incorrecto es que las columnas de nombre y lastname están duplicadas en el marco de datos unidos:

left.join(right, left("firstname")===right("firstname") && left("lastname")===right("lastname")).show /* +---------+--------+----------+--------+---------+--------+------+ |firstname|lastname| date|duration|firstname|lastname|upload| +---------+--------+----------+--------+---------+--------+------+ | bob| b|2015-01-13| 4| bob| b| 23| | alice| a|2015-04-23| 10| alice| a| 100| +---------+--------+----------+--------+---------+--------+------+ */

La solución correcta es definir las columnas de unión como una matriz de cadenas Seq("firstname", "lastname") . El marco de datos de salida no tiene columnas duplicadas:

left.join(right, Seq("firstname", "lastname")).show /* +---------+--------+----------+--------+------+ |firstname|lastname| date|duration|upload| +---------+--------+----------+--------+------+ | bob| b|2015-01-13| 4| 23| | alice| a|2015-04-23| 10| 100| +---------+--------+----------+--------+------+ */


Simplemente puedes usar esto

df1.join(df2, Seq("ts","id"),"TYPE-OF-JOIN")

Aquí puede ser TYPE-OF-JOIN

  • izquierda
  • derecho
  • interior
  • fullouter

Por ejemplo, tengo dos marcos de datos como este:

// df1 word count1 w1 10 w2 15 w3 20 // df2 word count2 w1 100 w2 150 w5 200

Si te unes a fullouter, entonces el resultado es así

df1.join(df2, Seq("word"),"fullouter").show() word count1 count2 w1 10 100 w2 15 150 w3 20 null w5 null 200


prueba esto,

val df_combined = df1.join(df2, df1("ts") === df2("ts") && df1("id") === df2("id")).drop(df2("ts")).drop(df2("id"))