scala - Conjunto de datos Spark 2.0 vs DataFrame
apache-spark apache-spark-sql (2)
-
La diferencia entre
df.select("foo")
ydf.select($"foo")
es la firma. El primero toma al menos unaString
, el último cero o másColumns
. No hay diferencia práctica más allá de eso. -
myDataSet.map(foo.someVal)
comprobaciones de tipomyDataSet.map(foo.someVal)
, pero como cualquier operación deDataset
usaRDD
de objetos, y en comparación conDataFrame
operaciones deDataFrame
, hay una sobrecarga significativa. Echemos un vistazo a un ejemplo simple:case class FooBar(foo: Int, bar: String) val ds = Seq(FooBar(1, "x")).toDS ds.map(_.foo).explain
== Physical Plan == *SerializeFromObject [input[0, int, true] AS value#123] +- *MapElements <function1>, obj#122: int +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar +- LocalTableScan [foo#117, bar#118]
Como puede ver, este plan de ejecución requiere acceso a todos los campos y tiene que
DeserializeToObject
. -
No. En general, otros métodos no son azúcar sintáctica y generan un plan de ejecución significativamente diferente. Por ejemplo:
ds.select($"foo").explain
== Physical Plan == LocalTableScan [foo#117]
En comparación con el plan que se muestra antes de que pueda acceder a la columna directamente. No es tanto una limitación de la API sino un resultado de una diferencia en la semántica operativa.
-
¿Cómo podría df.select ("foo") type-safe sin una declaración de mapa?
No hay tal opción. Si bien las columnas escritas le permiten transformar el
Dataset
estático en otroDataset
estáticamente escrito:ds.select($"bar".as[Int])
No hay tipo seguro. Existen otros intentos de incluir operaciones optimizadas de tipo seguro, como agregaciones escritas , pero esta API experimental.
-
¿Por qué debería usar un UDF / UADF en lugar de un mapa?
Depende completamente de ti. Cada estructura de datos distribuidos en Spark proporciona sus propias ventajas y desventajas (consulte, por ejemplo, Spark UDAF con ArrayType como problemas de rendimiento de bufferSchema ).
Personalmente, considero que el
Dataset
de
Dataset
tipo estático es el menos útil:
-
No proporcione el mismo rango de optimizaciones que
Dataset[Row]
(aunque comparten el formato de almacenamiento y algunas optimizaciones del plan de ejecución, no se beneficia completamente de la generación de código o el almacenamiento fuera del montón) ni el acceso a todas las capacidades analíticas delDataFrame
. -
Las transformaciones escritas son cajas negras y crean efectivamente una barrera de análisis para el optimizador. Por ejemplo, las selecciones (filtros) no se pueden empujar sobre la transformación escrita:
ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
== Physical Plan == *Filter (foo#133 = 1) +- *Filter <function1>.apply +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) +- Exchange hashpartitioning(foo#133, 200) +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) +- LocalTableScan [foo#133, bar#134]
Comparado con:
ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
== Physical Plan == *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) +- Exchange hashpartitioning(foo#133, 200) +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) +- *Filter (foo#133 = 1) +- LocalTableScan [foo#133, bar#134]
Esto afecta características como el pushdown de predicados o el pushdown de proyección.
-
No son tan flexibles como los
RDDs
con solo un pequeño subconjunto de tipos admitidos de forma nativa. -
La "seguridad de tipo" con los
Encoders
es discutible cuando elDataset
se convierteas
método. Como la forma de los datos no se codifica con una firma, un compilador solo puede verificar la existencia de unEncoder
.
Preguntas relacionadas:
- Realizar una unión escrita en Scala con conjuntos de datos de Spark
- Spark 2.0 DataSets groupByKey y división de operación y seguridad de tipo
comenzando con la chispa 2.0.1 Tengo algunas preguntas. Leí mucha documentación pero hasta ahora no pude encontrar suficientes respuestas:
-
Cuál es la diferencia entre
-
df.select("foo")
-
df.select($"foo")
-
-
entiendo correctamente que
-
myDataSet.map(foo.someVal)
es seguro y no se convertirá enRDD
pero permanecerá en la representación de DataSet / sin sobrecarga adicional (rendimiento inteligente para 2.0.0)
-
-
todos los otros comandos, por ejemplo, select, .. son solo azúcar sintáctico.
No son de tipo seguro y en su lugar podría usarse un mapa.
¿Cómo podría
df.select("foo")
type-safe sin una declaración de mapa?- ¿por qué debería usar un UDF / UADF en lugar de un mapa (suponiendo que el mapa permanezca en la representación del conjunto de datos)?
Spark
Dataset
es mucho más poderoso que Spark
Dataframe
.
Pequeño ejemplo: solo puede crear
Dataframe
de
Row
,
Tuple
o cualquier tipo de datos primitivo, pero
Dataset
le da el poder de crear
Dataset
de cualquier tipo no primitivo.
es decir, literalmente puede crear un
Dataset
de
Dataset
de tipo de objeto.
Ex:
case class Employee(id:Int,name:String)
Dataset[Employee] // is valid
Dataframe[Employee] // is invalid