scala apache-spark random apache-spark-sql user-defined-functions

Acerca de cómo agregar una nueva columna a un DataFrame existente con valores aleatorios en Scala



apache-spark random (2)

Puede utilizar monotonically_increasing_id para generar valores aleatorios.

Luego, puede definir un UDF para agregarle una cadena después de convertirlo en String ya que monotonically_increasing_id devuelve Long de forma predeterminada.

scala> var df = Seq(("Ron"), ("John"), ("Steve"), ("Brawn"), ("Rock"), ("Rick")).toDF("names") +-----+ |names| +-----+ | Ron| | John| |Steve| |Brawn| | Rock| | Rick| +-----+ scala> val appendD = spark.sqlContext.udf.register("appendD", (s: String) => s.concat("D")) scala> df = df.withColumn("ID",monotonically_increasing_id).selectExpr("names","cast(ID as String) ID").withColumn("ID",appendD($"ID")) +-----+---+ |names| ID| +-----+---+ | Ron| 0D| | John| 1D| |Steve| 2D| |Brawn| 3D| | Rock| 4D| | Rick| 5D| +-----+---+

Tengo un marco de datos con un archivo de parquet y tengo que agregar una nueva columna con algunos datos aleatorios, pero necesito que esos datos aleatorios sean diferentes entre sí. Este es mi código real y la versión actual de spark es 1.5.1-cdh-5.5.2:

val mydf = sqlContext.read.parquet("some.parquet") // mydf.count() // 63385686 mydf.cache val r = scala.util.Random import org.apache.spark.sql.functions.udf def myNextPositiveNumber :String = { (r.nextInt(Integer.MAX_VALUE) + 1 ).toString.concat("D")} val myFunction = udf(myNextPositiveNumber _) val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))

con este código, tengo estos datos:

scala> myNewDF.select("myNewColumn").show(10,false) +-----------+ |myNewColumn| +-----------+ |889488717D | |889488717D | |889488717D | |889488717D | |889488717D | |889488717D | |889488717D | |889488717D | |889488717D | |889488717D | +-----------+

Parece que el udf myNextPositiveNumber se invoca solo una vez, ¿no?

actualización confirmada, solo hay un valor distinto:

scala> myNewDF.select("myNewColumn").distinct.show(50,false) 17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl ... +-----------+ |myNewColumn| +-----------+ |889488717D | +-----------+

¿Qué estoy haciendo mal?

Actualización 2: finalmente, con la ayuda de @ user6910411 tengo este código:

val mydf = sqlContext.read.parquet("some.parquet") // mydf.count() // 63385686 mydf.cache val r = scala.util.Random import org.apache.spark.sql.functions.udf val accum = sc.accumulator(1) def myNextPositiveNumber():String = { accum+=1 accum.value.toString.concat("D") } val myFunction = udf(myNextPositiveNumber _) val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber)) myNewDF.select("myNewColumn").count // 63385686

actualización 3

El código real genera datos como este:

scala> mydf.select("myNewColumn").show(5,false) 17/02/22 11:01:57 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +-----------+ |myNewColumn| +-----------+ |2D | |2D | |2D | |2D | |2D | +-----------+ only showing top 5 rows

Parece que la función udf se invoca solo una vez, ¿no? Necesito un nuevo elemento aleatorio en esa columna.

actualizar 4 @ user6910411

Tengo este código real que aumenta la identificación, pero no concatena el carácter final, es extraño. Este es mi código:

import org.apache.spark.sql.functions.udf val mydf = sqlContext.read.parquet("some.parquet") mydf.cache def myNextPositiveNumber():String = monotonically_increasing_id().toString().concat("D") val myFunction = udf(myNextPositiveNumber _) val myNewDF = mydf.withColumn("myNewColumn",expr(myNextPositiveNumber)) scala> myNewDF.select("myNewColumn").show(5,false) 17/02/22 12:00:02 WARN Executor: 1 block locks were not released by TID = 1: [rdd_4_0] +-----------+ |myNewColumn| +-----------+ |0 | |1 | |2 | |3 | |4 | +-----------+

Necesito algo como:

+-----------+ |myNewColumn| +-----------+ |1D | |2D | |3D | |4D | +-----------+


Chispa> = 2.3

Es posible deshabilitar algunas optimizaciones utilizando el método asNondeterministic :

import org.apache.spark.sql.expressions.UserDefinedFunction val f: UserDefinedFunction = ??? val fNonDeterministic: UserDefinedFunction = f.asNondeterministic

Asegúrese de comprender las garantías antes de usar esta opción.

Chispa <2.3

La función que se pasa a udf debe ser determinista (con la posible excepción de SPARK-20586 ) y las llamadas a funciones nulares se pueden reemplazar por constantes. Si desea generar números aleatorios, use las funciones integradas:

  • rand : generar una columna aleatoria con muestras independientes e idénticamente distribuidas (iid) de U [0.0, 1.0].
  • randn : genera una columna con muestras independientes e idénticamente distribuidas (iid) a partir de la distribución normal estándar.

y transformar la salida para obtener la distribución requerida, por ejemplo:

(rand * Integer.MAX_VALUE).cast("bigint").cast("string")