spark row_number registros obtener numero numerar generar fila ejemplos correlativo contador consecutivo sql apache-spark row-number rdd

row_number - ¿Cómo obtengo un número de fila SQL equivalente para un Spark RDD?



spark sql (3)

Este es un problema interesante que estás planteando. Lo responderé en Python, pero estoy seguro de que podrás traducir sin problemas a Scala.

Así es como lo abordaría:

1- Simplifica tus datos:

temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))

temp2 es ahora un par clave-valor "real". Se ve así:

[ ((3, 4), (5, 5, 5)), ((3, 4), (5, 5, 9)), ((3, 4), (7, 5, 5)), ((1, 2), (1, 2, 3)), ((1, 2), (1, 4, 7)), ((1, 2), (2, 2, 3))

]

2- Luego, usa la función group-by para reproducir el efecto de la PARTICIÓN POR:

temp3 = temp2.groupByKey()

temp3 ahora es un RDD con 2 filas:

[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>), ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]

3- Ahora, debe aplicar una función de clasificación para cada valor del RDD. En Python, usaría la función ordenada simple (la enumeración creará su columna de número de fila):

temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)

Tenga en cuenta que para implementar su orden en particular, necesitaría alimentar el argumento "clave" correcto (en python, solo crearía una función lambda como esas:

lambda tuple : (tuple[0],-tuple[1],tuple[2])

Al final (sin la función de argumento clave, se ve así):

[ ((1, 2), ((1, 2, 3), 0)), ((1, 2), ((1, 4, 7), 1)), ((1, 2), ((2, 2, 3), 2)), ((3, 4), ((5, 5, 5), 0)), ((3, 4), ((5, 5, 9), 1)), ((3, 4), ((7, 5, 5), 2))

]

¡Espero que ayude!

Buena suerte.

Necesito generar una lista completa de números de fila para una tabla de datos con muchas columnas.

En SQL, esto se vería así:

select key_value, col1, col2, col3, row_number() over (partition by key_value order by col1, col2 desc, col3) from temp ;

Ahora, digamos que en Spark tengo un RDD de la forma (K, V), donde V = (col1, col2, col3), por lo que mis entradas son como

(key1, (1,2,3)) (key1, (1,4,7)) (key1, (2,2,3)) (key2, (5,5,5)) (key2, (5,5,9)) (key2, (7,5,5)) etc.

Quiero ordenar estos usando comandos como sortBy (), sortWith (), sortByKey (), zipWithIndex, etc. y tener un nuevo RDD con el número de fila correcto

(key1, (1,2,3), 2) (key1, (1,4,7), 1) (key1, (2,2,3), 3) (key2, (5,5,5), 1) (key2, (5,5,9), 2) (key2, (7,5,5), 3) etc.

(No me interesan los paréntesis, por lo que la forma también puede ser (K, (col1, col2, col3, rownum))

¿Cómo hago esto?

Aquí está mi primer intento:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3)) val temp1 = sc.parallelize(sample_data) temp1.collect().foreach(println) // ((3,4),5,5,5) // ((3,4),5,5,9) // ((3,4),7,5,5) // ((1,2),1,2,3) // ((1,2),1,4,7) // ((1,2),2,2,3) temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println) // ((((1,2),1,2,3),1),0) // ((((1,2),1,4,7),1),1) // ((((1,2),2,2,3),1),2) // ((((3,4),5,5,5),1),3) // ((((3,4),5,5,9),1),4) // ((((3,4),7,5,5),1),5) // note that this isn''t ordering with a partition on key value K! val temp2 = temp1.???

También tenga en cuenta que la función sortBy no se puede aplicar directamente a un RDD, pero primero se debe ejecutar collect (), y luego la salida tampoco es un RDD, sino una matriz

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println) // ((1,2),1,4,7) // ((1,2),1,2,3) // ((1,2),2,2,3) // ((3,4),5,5,5) // ((3,4),5,5,9) // ((3,4),7,5,5)

Aquí hay un poco más de progreso, pero aún no particionado:

val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1)) temp2.collect().foreach(println) // ((1,2),1,4,7,1) // ((1,2),1,2,3,2) // ((1,2),2,2,3,3) // ((3,4),5,5,5,4) // ((3,4),5,5,9,5) // ((3,4),7,5,5,6)


La funcionalidad row_number() over (partition by ... order by ...) se agregó a Spark 1.4. Esta respuesta utiliza PySpark / DataFrames.

Crear un DataFrame de prueba:

from pyspark.sql import Row, functions as F testDF = sc.parallelize( (Row(k="key1", v=(1,2,3)), Row(k="key1", v=(1,4,7)), Row(k="key1", v=(2,2,3)), Row(k="key2", v=(5,5,5)), Row(k="key2", v=(5,5,9)), Row(k="key2", v=(7,5,5)) ) ).toDF()

Agregue el número de fila particionada:

from pyspark.sql.window import Window (testDF .select("k", "v", F.rowNumber() .over(Window .partitionBy("k") .orderBy("k") ) .alias("rowNum") ) .show() ) +----+-------+------+ | k| v|rowNum| +----+-------+------+ |key1|[1,2,3]| 1| |key1|[1,4,7]| 2| |key1|[2,2,3]| 3| |key2|[5,5,5]| 1| |key2|[5,5,9]| 2| |key2|[7,5,5]| 3| +----+-------+------+


val test = Seq(("key1", (1,2,3)),("key1",(4,5,6)), ("key2", (7,8,9)), ("key2", (0,1,2)))

prueba: Seq [(String, (Int, Int, Int))] = List ((key1, (1,2,3)), (key1, (4,5,6)), (key2, (7,8 , 9)), (clave2, (0,1,2)))

test.foreach(println)

(clave1, (1,2,3))

(clave1, (4,5,6))

(clave2, (7,8,9))

(clave2, (0,1,2))

val rdd = sc.parallelize(test, 2)

rdd: org.apache.spark.rdd.RDD [(String, (Int, Int, Int))] = ParallelCollectionRDD [41] en paralelización en: 26

val rdd1 = rdd.groupByKey.map(x => (x._1,x._2.toArray)).map(x => (x._1, x._2.sortBy(x => x._1).zipWithIndex))

rdd1: org.apache.spark.rdd.RDD [(String, Array [((Int, Int, Int), Int)))] = MapPartitionsRDD [44] en el mapa en: 25

val rdd2 = rdd1.flatMap{ elem => val key = elem._1 elem._2.map(row => (key, row._1, row._2)) }

rdd2: org.apache.spark.rdd.RDD [(String, (Int, Int, Int), Int)] = MapPartitionsRDD [45] en flatMap en: 25

rdd2.collect.foreach(println)

(clave1, (1,2,3), 0)

(clave1, (4,5,6), 1)

(clave2, (0,1,2), 0)

(clave2, (7,8,9), 1)