DataFrame / Dataset groupBy behaviour/optimization

Suppose we have a DataFrame df consisting of the following columns:

name, surname, size, width, length, weight

Now we want to perform a couple of operations. For example, we want to create a couple of DataFrames containing aggregated data about size and width:

import org.apache.spark.sql.functions.sum

val df1 = df.groupBy("surname").agg(sum("size"))
val df2 = df.groupBy("surname").agg(sum("width"))

As you can see, the other columns, such as length, are not used anywhere. Is Spark smart enough to drop the redundant columns before the shuffle phase, or are they carried along? Will running:

val dfBasic = df.select("surname", "size", "width")

before the grouping affect performance in any way?


Yes, it is "smart enough". groupBy performed on a DataFrame is not the same operation as groupBy performed on a plain RDD. In the scenario you've described, there is no need to move the raw data at all. Let's create a small example to illustrate that:

// Assumes a spark-shell session, where sc and spark are predefined
import org.apache.spark.sql.functions.sum
import spark.implicits._ // toDF and $"..." syntax

val df = sc.parallelize(Seq(
  ("a", "foo", 1), ("a", "foo", 3),
  ("b", "bar", 5), ("b", "bar", 1)
)).toDF("x", "y", "z")

df.groupBy("x").agg(sum($"z")).explain
// == Physical Plan ==
// *HashAggregate(keys=[x#148], functions=[sum(cast(z#150 as bigint))])
// +- Exchange hashpartitioning(x#148, 200)
//    +- *HashAggregate(keys=[x#148], functions=[partial_sum(cast(z#150 as bigint))])
//       +- *Project [_1#144 AS x#148, _3#146 AS z#150]
//          +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#144, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#145, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#146]
//             +- Scan ExternalRDDScan[obj#143]

As you can see, the first phase is a projection (the *Project node) that keeps only the required columns; y is already gone at that point. Next, the data is aggregated locally (partial_sum). Finally, it is shuffled (Exchange) and aggregated globally. If you use Spark <= 1.4, the explain output will look slightly different, but the general structure should be exactly the same.
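You can check this on the question's own schema with a minimal sketch. It assumes Spark 2.x or later on the classpath and a local master. The sample rows and the object name PruningCheck are made up for illustration. The sketch builds the aggregation both with and without the explicit select and prints both physical plans. Based on the explanation above, we would expect the Project feeding the partial HashAggregate to keep only surname and size in both cases. Check the actual output of your Spark version rather than relying on that expectation.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.sum

object PruningCheck {
  // Returns (without explicit select, with explicit select) for the question's query.
  def variants(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._
    // Hypothetical sample rows. parallelize (instead of a local Seq) mirrors the RDD-backed example above.
    val df = spark.sparkContext.parallelize(Seq(
      ("Ann", "Smith", 1, 2, 3, 4),
      ("Bob", "Smith", 5, 6, 7, 8),
      ("Cid", "Jones", 2, 3, 4, 5)
    )).toDF("name", "surname", "size", "width", "length", "weight")

    val direct = df.groupBy("surname").agg(sum("size"))
    val preSelected = df.select("surname", "size", "width").groupBy("surname").agg(sum("size"))
    (direct, preSelected)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pruning-check").getOrCreate()
    try {
      val (direct, preSelected) = variants(spark)
      direct.explain()      // compare the Project lines of the two plans
      preSelected.explain()
      // Whatever the plans look like, both variants must return the same rows.
      assert(direct.collect().toSet == preSelected.collect().toSet)
    } finally {
      spark.stop()
    }
  }
}
```

If the two Project nodes match, the manual select buys nothing for this query. It may still help readability when the same narrowed DataFrame is reused many times.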

Finally, here is a DAG visualization showing that the description above matches the actual job:

Similarly, Dataset.groupByKey followed by reduceGroups has both a map side (ObjectHashAggregate with partial_reduceaggregator) and a reduce side (ObjectHashAggregate with reduceaggregator):

case class Foo(x: String, y: String, z: Int)
val ds = df.as[Foo]

ds.groupByKey(_.x).reduceGroups((x, y) => x.copy(z = x.z + y.z)).explain
// == Physical Plan ==
// ObjectHashAggregate(keys=[value#126], functions=[reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@54d90261, Some(newInstance(class $line40.$read$$iw$$iw$Foo)), Some(class $line40.$read$$iw$$iw$Foo), Some(StructType(StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false))), input[0, scala.Tuple2, true]._1 AS value#128, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null else named_struct(x, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).x, true, false) AS x#25, y, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).y, true, false) AS y#26, z, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).z AS z#27) AS _2#129, newInstance(class scala.Tuple2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).x, true, false) AS x#25, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).y, true, false) AS y#26, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).z AS z#27, StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false), true, 0, 0)])
// +- Exchange hashpartitioning(value#126, 200)
//    +- ObjectHashAggregate(keys=[value#126], functions=[partial_reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@54d90261, Some(newInstance(class $line40.$read$$iw$$iw$Foo)), Some(class $line40.$read$$iw$$iw$Foo), Some(StructType(StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false))), input[0, scala.Tuple2, true]._1 AS value#128, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null else named_struct(x, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).x, true, false) AS x#25, y, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).y, true, false) AS y#26, z, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).z AS z#27) AS _2#129, newInstance(class scala.Tuple2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).x, true, false) AS x#25, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).y, true, false) AS y#26, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).z AS z#27, StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false), true, 0, 0)])
//       +- AppendColumns <function1>, newInstance(class $line40.$read$$iw$$iw$Foo), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#126]
//          +- *Project [_1#4 AS x#8, _2#5 AS y#9, _3#6 AS z#10]
//             +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#5, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#6]
//                +- Scan ExternalRDDScan[obj#3]
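The split into a partial and a final ObjectHashAggregate can be imitated without Spark. The sketch below is a plain-Scala simulation, not Spark's implementation. Each inner Seq stands for one partition, which is a hypothetical layout, and TwoPhaseReduce is a made-up helper name. The reduce function is applied inside each partition first (map side), and then the per-partition results are merged by key (reduce side). Because the function runs in two stages, it has to be associative and commutative to match a single-pass reduction. The Spark API documentation for reduceGroups asks for the same property.

```scala
final case class Foo(x: String, y: String, z: Int)

object TwoPhaseReduce {
  // The same reduction the answer passes to reduceGroups.
  val combine: (Foo, Foo) => Foo = (a, b) => a.copy(z = a.z + b.z)

  // Map side: reduce within one partition, producing at most one value per key.
  def partial(partition: Seq[Foo]): Map[String, Foo] =
    partition.groupBy(_.x).map { case (k, rows) => k -> rows.reduce(combine) }

  // Reduce side: merge the per-partition results that would arrive after the shuffle.
  def merge(partials: Seq[Map[String, Foo]]): Map[String, Foo] =
    partials
      .flatMap(_.toSeq)
      .groupBy(_._1)
      .map { case (k, kvs) => k -> kvs.map(_._2).reduce(combine) }

  def run(partitions: Seq[Seq[Foo]]): Map[String, Foo] =
    merge(partitions.map(partial))
}
```

Take the answer's four rows split into two partitions: Seq(Foo("a", "foo", 1), Foo("a", "foo", 3), Foo("b", "bar", 5)) and Seq(Foo("b", "bar", 1)). For that input, run returns z = 4 for key "a" and z = 6 for key "b", the same result as reducing all rows at once.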

However, other methods of KeyValueGroupedDataset can behave much like RDD.groupByKey. For example, mapGroups (or flatMapGroups) does not use partial aggregation: every row is shuffled and sorted before the function is applied.

ds.groupByKey(_.x)
  .mapGroups((_, iter) => iter.reduce((x, y) => x.copy(z = x.z + y.z)))
  .explain
// == Physical Plan ==
// *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, $line15.$read$$iw$$iw$Foo, true]).x, true, false) AS x#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, $line15.$read$$iw$$iw$Foo, true]).y, true, false) AS y#38, assertnotnull(input[0, $line15.$read$$iw$$iw$Foo, true]).z AS z#39]
// +- MapGroups <function2>, value#32.toString, newInstance(class $line15.$read$$iw$$iw$Foo), [value#32], [x#8, y#9, z#10], obj#36: $line15.$read$$iw$$iw$Foo
//    +- *Sort [value#32 ASC NULLS FIRST], false, 0
//       +- Exchange hashpartitioning(value#32, 200)
//          +- AppendColumns <function1>, newInstance(class $line15.$read$$iw$$iw$Foo), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#32]
//             +- *Project [_1#4 AS x#8, _2#5 AS y#9, _3#6 AS z#10]
//                +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#5, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#6]
//                   +- Scan ExternalRDDScan[obj#3]
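One rough way to see what skipping partial aggregation costs is to count the records that would cross the Exchange. The plain-Scala sketch below uses hypothetical helper names, models partitions as inner Seqs, and involves no Spark. For the mapGroups plan it counts every row. For a plan with map-side aggregation, such as reduceGroups, it counts at most one record per key per partition. It ignores record size and serialization cost, so treat it as an illustration rather than a cost model.

```scala
final case class Foo(x: String, y: String, z: Int)

object ShuffleCount {
  // mapGroups-style plan: no map-side aggregation, so every input row is shuffled.
  def withoutPartial[A](partitions: Seq[Seq[A]]): Int =
    partitions.map(_.size).sum

  // Plan with map-side (partial) aggregation: each partition emits one record per distinct key.
  def withPartial[A, K](partitions: Seq[Seq[A]])(key: A => K): Int =
    partitions.map(_.map(key).distinct.size).sum
}

object ShuffleCountDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical data: 4 partitions of 1,000 rows each, spread over only two keys.
    val partitions: Seq[Seq[Foo]] = (0 until 4).map { _ =>
      (0 until 1000).map(i => Foo(if (i % 2 == 0) "a" else "b", "foo", i))
    }
    // Prints 4000: every row is shuffled.
    println(s"without partial aggregation: ${ShuffleCount.withoutPartial(partitions)} records")
    // Prints 8: 4 partitions x 2 keys.
    println(s"with partial aggregation: ${ShuffleCount.withPartial(partitions)(_.x)} records")
  }
}
```

The gap grows with the number of rows per key in each partition. With mostly unique keys, partial aggregation saves little.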