tutorial sqlcontext spark functions example espaƱol scala apache-spark dataframe apache-spark-sql rdd

scala - sqlcontext - Igualdad de DataFrame en Apache Spark



spark sql tutorial (5)

Supongamos que df1 y df2 son dos DataFrame en Apache Spark, calculados utilizando dos mecanismos diferentes, por ejemplo, Spark SQL frente a la API de Scala / Java / Python.

¿Existe una forma idiomática de determinar si los dos marcos de datos son equivalentes (iguales, isomórficos), donde la equivalencia está determinada por los datos (nombres de columna y valores de columna para cada fila) siendo idénticos salvo para ordenar filas y columnas?

La motivación para la pregunta es que a menudo hay muchas formas de calcular algunos resultados de big data, cada uno con sus propios intercambios. A medida que se exploran estas concesiones, es importante mantener la corrección y, por lo tanto, la necesidad de verificar la equivalencia / igualdad en un conjunto de datos de prueba significativo.


Existen algunas formas estándar en las suites de prueba Apache Spark, sin embargo, la mayoría de ellas implican la recopilación local de datos y, si desea realizar pruebas de igualdad en marcos de datos grandes, probablemente no sea una solución adecuada.

Verificando el esquema primero y luego podrías hacer una intersección con df3 y verificar que el recuento de df1, df2 y df3 son todos iguales (sin embargo, esto solo funciona si no hay filas duplicadas, si hay diferentes filas duplicadas, este método aún podría devolver verdadero).

Otra opción sería obtener los RDDs subyacentes de ambos DataFrames, mapearlos a (Fila, 1), hacer un reducedByKey para contar el número de cada Fila, y luego agrupar los dos RDDs resultantes y luego hacer un agregado regular y devolver falso si cualquiera de los iteradores no son iguales.


No sé nada de idiomático, pero creo que se puede obtener una forma robusta de comparar DataFrames como se describe a continuación. (Estoy usando PySpark para ilustración, pero el enfoque se transmite a través de los idiomas).

a = spark.range(5) b = spark.range(5) a_prime = a.groupBy(sorted(a.columns)).count() b_prime = b.groupBy(sorted(b.columns)).count() assert a_prime.subtract(b_prime).count() == b_prime.subtract(a_prime).count() == 0

Este enfoque maneja correctamente los casos donde los DataFrames pueden tener filas duplicadas, filas en diferentes órdenes y / o columnas en diferentes órdenes.

Por ejemplo:

a = spark.createDataFrame([(''nick'', 30), (''bob'', 40)], [''name'', ''age'']) b = spark.createDataFrame([(40, ''bob''), (30, ''nick'')], [''age'', ''name'']) c = spark.createDataFrame([(''nick'', 30), (''bob'', 40), (''nick'', 30)], [''name'', ''age'']) a_prime = a.groupBy(sorted(a.columns)).count() b_prime = b.groupBy(sorted(b.columns)).count() c_prime = c.groupBy(sorted(c.columns)).count() assert a_prime.subtract(b_prime).count() == b_prime.subtract(a_prime).count() == 0 assert a_prime.subtract(c_prime).count() != 0

Este enfoque es bastante caro, pero la mayor parte del gasto es inevitable dada la necesidad de realizar una diferencia completa. Y esto debería escalarse bien ya que no requiere recolectar nada localmente. Si relaja la restricción de que la comparación debe tener en cuenta las filas duplicadas, puede descartar el groupBy() y simplemente hacer el subtract() , lo que probablemente acelerará las cosas notablemente.


La biblioteca de pruebas de chispa rápida tiene dos métodos para hacer comparaciones de DataFrame (yo soy el creador de la biblioteca):

El método assertSmallDataFrameEquality recopila DataFrames en el nodo del controlador y hace la comparación

def assertSmallDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = { if (!actualDF.schema.equals(expectedDF.schema)) { throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF)) } if (!actualDF.collect().sameElements(expectedDF.collect())) { throw new DataFrameContentMismatch(contentMismatchMessage(actualDF, expectedDF)) } }

El método assertLargeDataFrameEquality compara DataFrames spread en varias máquinas (el código se copia básicamente de spark-testing-base )

def assertLargeDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = { if (!actualDF.schema.equals(expectedDF.schema)) { throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF)) } try { actualDF.rdd.cache expectedDF.rdd.cache val actualCount = actualDF.rdd.count val expectedCount = expectedDF.rdd.count if (actualCount != expectedCount) { throw new DataFrameContentMismatch(countMismatchMessage(actualCount, expectedCount)) } val expectedIndexValue = zipWithIndex(actualDF.rdd) val resultIndexValue = zipWithIndex(expectedDF.rdd) val unequalRDD = expectedIndexValue .join(resultIndexValue) .filter { case (idx, (r1, r2)) => !(r1.equals(r2) || RowComparer.areRowsEqual(r1, r2, 0.0)) } val maxUnequalRowsToShow = 10 assertEmpty(unequalRDD.take(maxUnequalRowsToShow)) } finally { actualDF.rdd.unpersist() expectedDF.rdd.unpersist() } }

assertSmallDataFrameEquality es más rápido para pequeñas comparaciones de DataFrame y lo he encontrado suficiente para mis suites de prueba.


Puede hacerlo utilizando un poco de deduplicación en combinación con una combinación externa completa. La ventaja de este enfoque es que no requiere que recopile resultados para el controlador, y que evita ejecutar múltiples trabajos.

import org.apache.spark.sql._ import org.apache.spark.sql.functions._ // Generate some random data. def random(n: Int, s: Long) = { spark.range(n).select( (rand(s) * 10000).cast("int").as("a"), (rand(s + 5) * 1000).cast("int").as("b")) } val df1 = random(10000000, 34) val df2 = random(10000000, 17) // Move all the keys into a struct (to make handling nulls easy), deduplicate the given dataset // and count the rows per key. def dedup(df: Dataset[Row]): Dataset[Row] = { df.select(struct(df.columns.map(col): _*).as("key")) .groupBy($"key") .agg(count(lit(1)).as("row_count")) } // Deduplicate the inputs and join them using a full outer join. The result can contain // the following things: // 1. Both keys are not null (and thus equal), and the row counts are the same. The dataset // is the same for the given key. // 2. Both keys are not null (and thus equal), and the row counts are not the same. The dataset // contains the same keys. // 3. Only the right key is not null. // 4. Only the left key is not null. val joined = dedup(df1).as("l").join(dedup(df2).as("r"), $"l.key" === $"r.key", "full") // Summarize the differences. val summary = joined.select( count(when($"l.key".isNotNull && $"r.key".isNotNull && $"r.row_count" === $"l.row_count", 1)).as("left_right_same_rc"), count(when($"l.key".isNotNull && $"r.key".isNotNull && $"r.row_count" =!= $"l.row_count", 1)).as("left_right_different_rc"), count(when($"l.key".isNotNull && $"r.key".isNull, 1)).as("left_only"), count(when($"l.key".isNull && $"r.key".isNotNull, 1)).as("right_only")) summary.show()


Java:

assert resultDs.union(answerDs).distinct().count() == resultDs.intersect(answerDs).count();