sorting - Spark get collection ordenados por valor
apache-spark word-count (10)
Estaba probando este tutorial http://spark.apache.org/docs/latest/quick-start.html Primero creé una colección de un archivo
textFile = sc.textFile("README.md")
Luego intenté un comando para rodear las palabras:
wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
Para imprimir la colección:
wordCounts.collect()
Encontré cómo ordenarlo por palabra usando el comando sortByKey. Me preguntaba cómo podría ser posible hacer lo mismo para ordenar por el valor, que en este caso en el número que aparece una palabra en el documento.
Creo que puede usar la transformación genérica sortBy
(no una acción, es decir, devuelve un RDD, no una matriz) documentada here .
Entonces en tu caso, podrías hacer
wordCounts.sortBy(lambda (word, count): count)
Haciéndolo de manera más pitonica.
# In descending order
'''''' The first parameter tells number of elements
to be present in output.
''''''
data.takeOrdered(10, key=lambda x: -x[1])
# In Ascending order
data.takeOrdered(10, key=lambda x: x[1])
La clasificación generalmente debe hacerse antes de llamar a collect () ya que devuelve el conjunto de datos al programa del controlador y también es la forma en que un trabajo hadoop map-reduce se programará en java para que el resultado final que desea esté escrito (típicamente) a HDFS. Con la API de chispa, este enfoque proporciona la flexibilidad de escribir la salida en forma "en bruto" donde lo desee, por ejemplo, en un archivo donde podría usarse como entrada para un procesamiento posterior.
Usar la clasificación API de scala de chispa antes de collect () puede hacerse siguiendo la sugerencia de eliasah y usando Tuple2.swap () dos veces, una antes de la clasificación y otra para producir una lista de tuplas ordenadas en orden creciente o decreciente de su segundo campo (que se llama _2) y contiene el recuento de palabras en su primer campo (llamado _1). A continuación se muestra un ejemplo de cómo esto está escrito en spark-shell:
// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _, 1) // 2nd arg configures one task (same as number of partitions)
.map(item => item.swap) // interchanges position of entries in each tuple
.sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task
.map(item => item.swap)
Para invertir el orden del género, utilice sortByKey (false, 1) ya que su primer arg es el valor booleano de ascendente. Su segundo argumento es el número de tareas (equivalente al número de particiones) que se establece en 1 para probar con un pequeño archivo de entrada donde solo se desea un archivo de datos de salida; reduceByKey también toma este argumento opcional.
Después de esto, el wordCounts RDD se puede guardar como archivos de texto en un directorio con saveAsTextFile (directory_pathname) en el que se depositarán uno o más archivos xxxxx (comenzando con part-00000) dependiendo de la cantidad de reductores configurados para el trabajo (1 archivo de datos de salida por reductor), un archivo _SUCCESS dependiendo de si el trabajo fue exitoso o no y archivos .crc.
El uso de pyspark una secuencia de comandos python muy similar a la secuencia de comandos scala que se muestra arriba produce resultados que son efectivamente los mismos. Aquí está la versión de pyspark demostrando ordenar una colección por valor:
file = sc.textFile("file:some_local_text_file_pathname")
wordCounts = file.flatMap(lambda line: line.strip().split(" ")) /
.map(lambda word: (word, 1)) /
.reduceByKey(lambda a, b: a + b, 1) / # last arg configures one reducer task
.map(lambda (a, b): (b, a)) /
.sortByKey(1, 1) / # 1st arg configures ascending sort, 2nd configures 1 task
.map(lambda (a, b): (b, a))
Para ordenar byKey en orden descendente, su primer arg debe ser 0. Dado que python captura el espacio en blanco inicial y final como datos, strip () se inserta antes de dividir cada línea en espacios, pero esto no es necesario usando spark-shell / scala.
La principal diferencia en la salida de la versión de spark y python de wordCount es que donde se generan las chispas (palabra, 3) salidas de pitón (u''word '', 3).
Para obtener más información sobre los métodos de chispa RDD, consulte http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html para python y https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD para scala.
En el spark-shell, ejecutar collect () en wordCounts lo transforma de un RDD a un Array [(String, Int)] = Array [Tuple2 (String, Int)] que se puede ordenar en el segundo campo de cada elemento Tuple2 utilizando:
Array.sortBy(_._2)
sortBy también toma una matemática implícita opcional. El argumento de ordenación como Romeo Kienzler mostró en una respuesta previa a esta pregunta. Array.sortBy (_._ 2) realizará una ordenación inversa de los elementos Array Tuple2 en sus campos _2 simplemente definiendo un orden inverso implícito antes de ejecutar el script map-reduce porque anula el orden preexistente de Int. El orden inverso de int. Ya definido por Romeo Kienzler es:
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}
Otra forma común de definir este orden inverso es invertir el orden de ayb y soltar el (-1) en el lado derecho de la definición de comparación:
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = b.compare(a)
}
La forma más simple de ordenar la salida por valores. Después de reduceByKey puede intercambiar la salida como clave como valor y valor como clave y luego puede aplicar el método sortByKey donde falsa ordena en la orden descendente. Por defecto, ordenará en orden ascendente.
val test=textFile.flatMap(line=> line.split(" ")).map(word=> (word, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(false)
La solución de @kef para python es perfecta ...
Lo siguiente debe ser cambiado:
.map(lambda (a, b): (b, a))
a
.map(lambda a: (a[1], a[0]))
Logré resolverlo usando Python. Entonces creo una lista de valores de par y la ordené por valor:
out = wordCounts.collect()
outSort = sorted(out, key=lambda word:word[1])
Para aquellos que buscan obtener N elementos principales ordenados por valor:
theRDD.takeOrdered(N, lambda (key, value): -1 * len(value))
si desea ordenar por longitud de cuerda.
Por otro lado, si los valores ya están en la forma adecuada para su orden deseada, entonces:
theRDD.takeOrdered(N, lambda (key, value): -1 * value)
bastaría.
Una mejor forma de hacer sortByValue usando SCALA es
val count = oozie.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).sortBy(x => x._2)
x._2 representa el segundo elemento de cualquier lista x.
Para hacer el orden en orden descendente "-x._2"
scala> val count = oozie.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).sortBy(x => -x._2)
count: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[26] at sortBy at <console>:25
scala> count.take(10)
res6: Array[(String, Int)] = Array((the,4603), (to,1707), (and,1595), (of,1337), (a,1319), (Oozie,1302), (in,1131), (.,994), (is,956), (for,753))
puedes hacerlo de esta manera
// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}
counts.collect.toSeq.sortBy(_._2)
Así que básicamente convierte su RDD a una secuencia y utiliza el método de clasificación para ordenarlo.
El bloque anterior globalmente cambia el comportamiento de clasificación para obtener un orden de clasificación descendente.
wordCounts.map(lambda (a,b) : (b,a)).sortByKey(ascending=False).map(lambda (a,b) : (b,a)).collect()
Esta solución funciona porque cada fila de wordCount rdd se ve así:
(EL RECUENTO DE PALABRAS)
el primer mapa produce un rdd con el orden invertido de las tuplas, es decir, ahora se ven así
(CUENTA, PALABRA)
Ahora cuando hacemos sortByKey el COUNT se toma como la clave que es lo que queremos. El segundo mapa luego asigna el segundo rdd ahora ordenado al formato original de
(EL RECUENTO DE PALABRAS)
para cada fila, pero ahora no, las filas están ordenadas por el recuento de palabras.
Una suposición implícita aquí es que la asignación no cambia el orden de las filas de RDD, de lo contrario, el segundo mapa podría interferir con la ordenación.