tutorial spark iniciar español ejemplo scala apache-spark dataframe uuid

scala - iniciar - apache spark tutorial español



Marco de datos Spark Cambios UUID aleatorios después de cada transformación/acción (1)

Es un comportamiento esperado. Las funciones definidas por el usuario deben ser deterministas :

Las funciones definidas por el usuario deben ser deterministas. Debido a la optimización, las invocaciones duplicadas pueden eliminarse o incluso la función puede invocarse más veces de lo que está presente en la consulta.

Si desea incluir una función no determinista y preservar el resultado, debe escribir datos intermedios en un almacenamiento persistente y leerlos nuevamente. El punto de referencia o el almacenamiento en caché pueden funcionar en algunos casos simples, pero no será confiable en general.

Si el proceso ascendente es determinista (para los principiantes hay mezcla) puede intentar usar la función rand con seed , convertir a byte array y pasar a UUID.nameUUIDFromBytes .

Consulte también: Acerca de cómo agregar una nueva columna a un DataFrame existente con valores aleatorios en Scala

Nota : SPARK-20586 introdujo el indicador deterministic , que puede desactivar cierta optimización, pero no está claro cómo se comporta cuando persisted datos y se produce una pérdida de ejecutor.

Tengo un marco de datos Spark con una columna que incluye un UUID generado. Sin embargo, cada vez que realizo una acción o transformación en el marco de datos, cambia el UUID en cada etapa.

¿Cómo puedo generar el UUID solo una vez y hacer que el UUID permanezca estático a partir de entonces?

Algunos ejemplos de código para volver a producir mi problema están a continuación:

def process(spark: SparkSession): Unit = { import spark.implicits._ val sc = spark.sparkContext val sqlContext = spark.sqlContext sc.setLogLevel("OFF") // create dataframe val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2") df.createOrReplaceTempView("df") df.show(false) // register an UDF that creates a random UUID val generateUUID = udf(() => UUID.randomUUID().toString) // generate UUID for new column val dfWithUuid = df.withColumn("new_uuid", generateUUID()) dfWithUuid.show(false) dfWithUuid.show(false) // uuid is different // new transformations also change the uuid val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1) dfWithUuidWithNewCol.show(false) }

El resultado es:

+----+----+ |col1|col2| +----+----+ |a |1 | |b |2 | |c |3 | +----+----+ +----+----+------------------------------------+ |col1|col2|new_uuid | +----+----+------------------------------------+ |a |1 |a414e73b-24b8-4f64-8d21-f0bc56d3d290| |b |2 |f37935e5-0bfc-4863-b6dc-897662307e0a| |c |3 |e3aaf655-5a48-45fb-8ab5-22f78cdeaf26| +----+----+------------------------------------+ +----+----+------------------------------------+ |col1|col2|new_uuid | +----+----+------------------------------------+ |a |1 |1c6597bf-f257-4e5f-be81-34a0efa0f6be| |b |2 |6efe4453-29a8-4b7f-9fa1-7982d2670bd6| |c |3 |2f7ddc1c-3e8c-4118-8e2c-8a6f526bee7e| +----+----+------------------------------------+ +----+----+------------------------------------+----+ |col1|col2|new_uuid |col3| +----+----+------------------------------------+----+ |a |1 |00b85af8-711e-4b59-82e1-8d8e59d4c512|2.0 | |b |2 |94c3f2c6-9234-4fb3-b1c4-273a37171131|3.0 | |c |3 |1059fff2-b8f9-4cec-907d-ea181d5003a2|4.0 | +----+----+------------------------------------+----+

Tenga en cuenta que el UUID es diferente en cada paso.