scala - iniciar - apache spark tutorial español
Marco de datos Spark Cambios UUID aleatorios después de cada transformación/acción (1)
Es un comportamiento esperado. Las funciones definidas por el usuario deben ser deterministas :
Las funciones definidas por el usuario deben ser deterministas. Debido a la optimización, las invocaciones duplicadas pueden eliminarse o incluso la función puede invocarse más veces de lo que está presente en la consulta.
Si desea incluir una función no determinista y preservar el resultado, debe escribir datos intermedios en un almacenamiento persistente y leerlos nuevamente. El punto de referencia o el almacenamiento en caché pueden funcionar en algunos casos simples, pero no será confiable en general.
Si el proceso ascendente es determinista (para los principiantes hay mezcla) puede intentar usar la función rand
con seed , convertir a byte array y pasar a UUID.nameUUIDFromBytes
.
Consulte también: Acerca de cómo agregar una nueva columna a un DataFrame existente con valores aleatorios en Scala
Nota : SPARK-20586 introdujo el indicador deterministic
, que puede desactivar cierta optimización, pero no está claro cómo se comporta cuando persisted
datos y se produce una pérdida de ejecutor.
Tengo un marco de datos Spark con una columna que incluye un UUID generado. Sin embargo, cada vez que realizo una acción o transformación en el marco de datos, cambia el UUID en cada etapa.
¿Cómo puedo generar el UUID solo una vez y hacer que el UUID permanezca estático a partir de entonces?
Algunos ejemplos de código para volver a producir mi problema están a continuación:
def process(spark: SparkSession): Unit = {
import spark.implicits._
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
sc.setLogLevel("OFF")
// create dataframe
val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")
df.createOrReplaceTempView("df")
df.show(false)
// register an UDF that creates a random UUID
val generateUUID = udf(() => UUID.randomUUID().toString)
// generate UUID for new column
val dfWithUuid = df.withColumn("new_uuid", generateUUID())
dfWithUuid.show(false)
dfWithUuid.show(false) // uuid is different
// new transformations also change the uuid
val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
dfWithUuidWithNewCol.show(false)
}
El resultado es:
+----+----+
|col1|col2|
+----+----+
|a |1 |
|b |2 |
|c |3 |
+----+----+
+----+----+------------------------------------+
|col1|col2|new_uuid |
+----+----+------------------------------------+
|a |1 |a414e73b-24b8-4f64-8d21-f0bc56d3d290|
|b |2 |f37935e5-0bfc-4863-b6dc-897662307e0a|
|c |3 |e3aaf655-5a48-45fb-8ab5-22f78cdeaf26|
+----+----+------------------------------------+
+----+----+------------------------------------+
|col1|col2|new_uuid |
+----+----+------------------------------------+
|a |1 |1c6597bf-f257-4e5f-be81-34a0efa0f6be|
|b |2 |6efe4453-29a8-4b7f-9fa1-7982d2670bd6|
|c |3 |2f7ddc1c-3e8c-4118-8e2c-8a6f526bee7e|
+----+----+------------------------------------+
+----+----+------------------------------------+----+
|col1|col2|new_uuid |col3|
+----+----+------------------------------------+----+
|a |1 |00b85af8-711e-4b59-82e1-8d8e59d4c512|2.0 |
|b |2 |94c3f2c6-9234-4fb3-b1c4-273a37171131|3.0 |
|c |3 |1059fff2-b8f9-4cec-907d-ea181d5003a2|4.0 |
+----+----+------------------------------------+----+
Tenga en cuenta que el UUID es diferente en cada paso.