scala apache-spark dataframe apache-spark-sql

scala - Spark Dataframe: cómo agregar un índice Columna: Índice de datos distribuidos de Aka



apache-spark apache-spark-sql (5)

Cómo obtener una columna de identificación secuencial :

from pyspark.sql.functions import desc, row_number, monotonically_increasing_id df_with_seq_id = df.withColumn(''index_column_name'', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)

Tenga en cuenta que row_number () comienza en 1, por lo tanto, reste 1 si desea una columna indexada en 0

Leí datos de un archivo csv, pero no tengo índice.

Quiero agregar una columna del 1 al número de la fila.

¿Qué debo hacer, gracias (scala)


Como dijo Ram, zippedwithindex es mejor que una identificación que aumenta monotónicamente, la identificación necesita números de fila consecutivos. Prueba esto (entorno PySpark):

from pyspark.sql import Row from pyspark.sql.types import StructType, StructField, LongType new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)]) zipped_rdd = **original_dataframe**.rdd.zipWithIndex() indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))

donde original_dataframe es el marco de datos en el que tiene que agregar índice y row_with_index es el nuevo esquema con el índice de columna que puede escribir como

row_with_index = Row( "calendar_date" ,"year_week_number" ,"year_period_number" ,"realization" ,"index" )

Aquí, calendar_date , year_week_number , year_period_number y realización fueron las columnas de mi marco de datos original. Puede reemplazar los nombres con los nombres de sus columnas. index es el nuevo nombre de columna que tuvo que agregar para los números de fila.


Con Scala puedes usar:

import org.apache.spark.sql.functions._ df.withColumn("id",monotonicallyIncreasingId)

Puede consultar este exemple y docs scala.

Con Pyspark puedes usar:

from pyspark.sql.functions import monotonically_increasing_id df_index = df.select("*").withColumn("id", monotonically_increasing_id())


monotonically_increasing_id : se garantiza que la ID generada será monotónicamente creciente y única, pero no consecutiva.

"Quiero agregar una columna del 1 al número de la fila".

Digamos que tenemos el siguiente DF

+--------+-------------+-------+ | userId | productCode | count | +--------+-------------+-------+ | 25 | 6001 | 2 | | 11 | 5001 | 8 | | 23 | 123 | 5 | +--------+-------------+-------+

Para generar las ID a partir de 1

val w = Window.orderBy("count") val result = df.withColumn("index", row_number().over(w))

Esto agregaría una columna de índice ordenada al aumentar el valor de la cuenta.

+--------+-------------+-------+-------+ | userId | productCode | count | index | +--------+-------------+-------+-------+ | 25 | 6001 | 2 | 1 | | 23 | 123 | 5 | 2 | | 11 | 5001 | 8 | 3 | +--------+-------------+-------+-------+


NOTA : Los enfoques anteriores no dan un número de secuencia, pero sí dan una identificación creciente.

Una forma sencilla de hacerlo y garantizar que el orden de los índices sea el siguiente: zipWithIndex .

Data de muestra.

+-------------------+ | Name| +-------------------+ | Ram Ghadiyaram| | Ravichandra| | ilker| | nick| | Naveed| | Gobinathan SP| |Sreenivas Venigalla| | Jackela Kowski| | Arindam Sengupta| | Liangpi| | Omar14| | anshu kumar| +-------------------+

package com.example import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{LongType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row} /** * DistributedDataIndex : Program to index an RDD with */ object DistributedDataIndex extends App with Logging { val spark = builder .master("local[*]") .appName(this.getClass.getName) .getOrCreate() import spark.implicits._ val df = spark.sparkContext.parallelize( Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick" , "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski", "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar" )).toDF("Name") df.show logInfo("addColumnIndex here") // Add index now... val df1WithIndex = addColumnIndex(df) .withColumn("monotonically_increasing_id", monotonically_increasing_id) df1WithIndex.show(false) /** * Add Column Index to dataframe */ def addColumnIndex(df: DataFrame) = { spark.sqlContext.createDataFrame( df.rdd.zipWithIndex.map { case (row, index) => Row.fromSeq(row.toSeq :+ index) }, // Create schema for index column StructType(df.schema.fields :+ StructField("index", LongType, false))) } }

Resultado:

+-------------------+-----+---------------------------+ |Name |index|monotonically_increasing_id| +-------------------+-----+---------------------------+ |Ram Ghadiyaram |0 |0 | |Ravichandra |1 |8589934592 | |ilker |2 |8589934593 | |nick |3 |17179869184 | |Naveed |4 |25769803776 | |Gobinathan SP |5 |25769803777 | |Sreenivas Venigalla|6 |34359738368 | |Jackela Kowski |7 |42949672960 | |Arindam Sengupta |8 |42949672961 | |Liangpi |9 |51539607552 | |Omar14 |10 |60129542144 | |anshu kumar |11 |60129542145 | +-------------------+-----+---------------------------+