apache spark - structtype - Eliminar duplicados de filas basadas en columnas específicas en un RDD/Spark DataFrame
spark sql tutorial (7)
Digamos que tengo un conjunto de datos bastante grande en la siguiente forma:
data = sc.parallelize([(''Foo'',41,''US'',3),
(''Foo'',39,''UK'',1),
(''Bar'',57,''CA'',2),
(''Bar'',72,''CA'',2),
(''Baz'',22,''US'',6),
(''Baz'',36,''US'',6)])
Lo que me gustaría hacer es eliminar filas duplicadas basadas en los valores de la primera, tercera y cuarta columnas solamente.
Eliminar filas completamente duplicadas es sencillo:
data = data.distinct()
y la fila 5 o la fila 6 se eliminarán
¿Pero cómo solo elimino las filas duplicadas basadas en las columnas 1, 3 y 4 solamente? es decir, eliminar cualquiera de estos:
(''Baz'',22,''US'',6)
(''Baz'',36,''US'',6)
En Python, esto se puede hacer especificando columnas con .drop_duplicates()
. ¿Cómo puedo lograr lo mismo en Spark / Pyspark?
De acuerdo con David Para agregar, puede que no sea el caso que deseemos agrupar por todas las columnas que no sean la (s) columna (s) en función agregada, es decir, si queremos eliminar duplicados puramente basados en un subconjunto de columnas y conservar todas las columnas en el marco de datos original . Entonces, la mejor forma de hacerlo es usando dropDuplicates Dataframe api disponible en Spark 1.4.0
Para referencia, consulte: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame
De su pregunta, no está claro como qué columnas desea usar para determinar los duplicados. La idea general detrás de la solución es crear una clave basada en los valores de las columnas que identifican duplicados. Luego, puede usar reducirByKey o reducir operaciones para eliminar duplicados.
Aquí hay un código para comenzar:
def get_key(x):
return "{0}{1}{2}".format(x[0],x[2],x[3])
m = data.map(lambda x: (get_key(x),x))
Ahora, tiene un RDD
clave-valor que está codificado por las columnas 1, 3 y 4. El siguiente paso sería un reduceByKey
o groupByKey
y el filter
. Esto eliminaría duplicados.
r = m.reduceByKey(lambda x,y: (x))
Este es mi Df contiene 4 se repite dos veces por lo que aquí eliminará los valores repetidos.
scala> df.show
+-----+
|value|
+-----+
| 1|
| 4|
| 3|
| 5|
| 4|
| 18|
+-----+
scala> val newdf=df.dropDuplicates
scala> newdf.show
+-----+
|value|
+-----+
| 1|
| 3|
| 5|
| 4|
| 18|
+-----+
Pyspark incluye un método dropDuplicates()
. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>> from pyspark.sql import Row
>>> df = sc.parallelize([ /
... Row(name=''Alice'', age=5, height=80), /
... Row(name=''Alice'', age=5, height=80), /
... Row(name=''Alice'', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
| 10| 80|Alice|
+---+------+-----+
>>> df.dropDuplicates([''name'', ''height'']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
+---+------+-----+
¿Tal vez fue presentado en una versión posterior a lo que usaba @Jason (OP)?
editar: sí, se introdujo en 1.4
Sé que ya has aceptado la otra respuesta, pero si quieres hacer esto como un DataFrame, solo usa groupBy y agg. Suponiendo que tenía un DF ya creado (con columnas llamadas "col1", "col2", etc.) podría hacer:
myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")
Tenga en cuenta que en este caso, elegí Max de col2, pero podría hacer avg, min, etc.
Usé la función incorporada dropDuplicates (). Código de Scala dado a continuación
val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")
data.dropDuplicates(Array("x","count")).show()
Salida:
+---+---+---+-----+
| x| y| z|count|
+---+---+---+-----+
|Baz| 22| US| 6|
|Foo| 39| UK| 1|
|Foo| 41| US| 3|
|Bar| 57| CA| 2|
+---+---+---+-----+
El siguiente programa lo ayudará a eliminar duplicados en su totalidad, o si desea eliminar duplicados según ciertas columnas, incluso puede hacerlo:
import org.apache.spark.sql.SparkSession
object DropDuplicates {
def main(args: Array[String]) {
val spark =
SparkSession.builder()
.appName("DataFrame-DropDuplicates")
.master("local[4]")
.getOrCreate()
import spark.implicits._
// create an RDD of tuples with some data
val custs = Seq(
(1, "Widget Co", 120000.00, 0.00, "AZ"),
(2, "Acme Widgets", 410500.00, 500.00, "CA"),
(3, "Widgetry", 410500.00, 200.00, "CA"),
(4, "Widgets R Us", 410500.00, 0.0, "CA"),
(3, "Widgetry", 410500.00, 200.00, "CA"),
(5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
(6, "Widget Co", 12000.00, 10.00, "AZ")
)
val customerRows = spark.sparkContext.parallelize(custs, 4)
// convert RDD of tuples to DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")
println("*** Here''s the whole DataFrame with duplicates")
customerDF.printSchema()
customerDF.show()
// drop fully identical rows
val withoutDuplicates = customerDF.dropDuplicates()
println("*** Now without duplicates")
withoutDuplicates.show()
// drop fully identical rows
val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))
println("*** Now without partial duplicates too")
withoutPartials.show()
}
}