studio paquetes org mexico library language apache-spark distinct

apache spark - paquetes - ¿Cómo funciona la función Distinct() en Spark?



r packages list (5)

Soy un novato en Apache Spark y estaba aprendiendo funcionalidades básicas. Tenía una pequeña duda. Supongamos que tengo un RDD de tuplas (clave, valor) y quería obtener algunas únicas de ellas. Yo uso la función distinta (). Me pregunto sobre qué base considera la función que las tuplas son tan dispares ... ¿Se basa en las claves, los valores, o ambos?


.distinct () está definitivamente haciendo un shuffle entre particiones. Para ver más de lo que está sucediendo, ejecute un .toDebugString en su RDD.

val hashPart = new HashPartitioner(<number of partitions>) val myRDDPreStep = <load some RDD> val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER) myRDD.checkpoint println(myRDD.toDebugString)

que para un ejemplo de RDD que tengo (myRDDPreStep ya tiene partición hash por clave, persistido por StorageLevel.MEMORY_AND_DISK_SER y marcado), devuelve:

(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated] +-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] | ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] | myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated] | CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B | myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]

Tenga en cuenta que puede haber formas más eficientes de obtener un distintivo que implique menos aleaciones, ESPECIALMENTE si su RDD ya está particionado de manera inteligente y las particiones no están demasiado inclinadas.

Consulte ¿Hay alguna forma de volver a escribir Spark RDD distintas para usar mapPartitions en lugar de distintas? y Apache Spark: ¿Cuál es la implementación equivalente de RDD.groupByKey () usando RDD.aggregateByKey ()?


Justin Pihony tiene razón. Distintivo usa el código de hash y el método de los objetos para esta determinación. Se devuelven los distintos elementos (objeto).

val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))

Distinto

rdd.distinct.collect().foreach(println) (2,22) (1,20) (3,22) (2,20) (1,21) (3,21)

Si desea aplicar distinto en clave. En ese caso reducir por es mejor opción.

Reducir por

val reduceRDD= rdd.map(tup => (tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2) reduceRDD.collect().foreach(println)

Salida:-

(2,20) (1,20) (3,21)


Los documentos API para RDD.distinct() solo proporcionan una descripción de una oración:

"Devolver un nuevo RDD que contiene los elementos distintos en este RDD".

Por experiencia reciente, puedo decirles que en una tupla-RDD se considera la tupla como un todo.

Si desea claves distintas o valores distintos, entonces, dependiendo de lo que quiera lograr, puede:

A. llame a groupByKey() para transformar {(k1,v11),(k1,v12),(k2,v21),(k2,v22)} a {(k1,[v11,v12]), (k2,[v21,v22])} ; o

B. elimine las claves o los valores llamando a las keys() o los values() seguidos de distinct()

A partir de este escrito (junio de 2015), UC Berkeley + EdX está ejecutando un curso en línea gratuito Introducción a Big Data y Apache Spark que brindaría práctica práctica con estas funciones.


Parece que los distinct se desharán de los duplicados (clave, valor).

En el ejemplo siguiente (1,20) y (2,20) se repiten dos veces en myRDD , pero después de un distinct() , se eliminan los duplicados.

scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22))) myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22 scala> myRDD.collect().foreach(println _) (1,20) (1,21) (1,20) (2,20) (2,22) (2,20) (3,21) (3,22) scala> myRDD.distinct.collect().foreach(println _) (2,22) (1,20) (3,22) (2,20) (1,21) (3,21)


distinct utiliza el hashCode y el método equals de los objetos para esta determinación. Las tuplas vienen integradas con los mecanismos de igualdad que delegan en la igualdad y la posición de cada objeto. Por lo tanto, distinct trabajará contra todo el objeto Tuple2 . Como señaló Paul, puedes llamar keys o values y luego distinct . O puede escribir sus propios valores distintos mediante aggregateByKey , lo que mantendría el emparejamiento de claves. O si quieres las claves distintas, entonces podrías usar un aggregate regular