scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

scala - Conjunto de datos Spark 2.0 vs DataFrame



apache-spark apache-spark-sql (2)

  1. La diferencia entre df.select("foo") y df.select($"foo") es la firma. El primero toma al menos una String , el último cero o más Columns . No hay diferencia práctica más allá de eso.
  2. myDataSet.map(foo.someVal) comprobaciones de tipo myDataSet.map(foo.someVal) , pero como cualquier operación de Dataset usa RDD de objetos, y en comparación con DataFrame operaciones de DataFrame , hay una sobrecarga significativa. Echemos un vistazo a un ejemplo simple:

    case class FooBar(foo: Int, bar: String) val ds = Seq(FooBar(1, "x")).toDS ds.map(_.foo).explain

    == Physical Plan == *SerializeFromObject [input[0, int, true] AS value#123] +- *MapElements <function1>, obj#122: int +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar +- LocalTableScan [foo#117, bar#118]

    Como puede ver, este plan de ejecución requiere acceso a todos los campos y tiene que DeserializeToObject .

  3. No. En general, otros métodos no son azúcar sintáctica y generan un plan de ejecución significativamente diferente. Por ejemplo:

    ds.select($"foo").explain

    == Physical Plan == LocalTableScan [foo#117]

    En comparación con el plan que se muestra antes de que pueda acceder a la columna directamente. No es tanto una limitación de la API sino un resultado de una diferencia en la semántica operativa.

  4. ¿Cómo podría df.select ("foo") type-safe sin una declaración de mapa?

    No hay tal opción. Si bien las columnas escritas le permiten transformar el Dataset estático en otro Dataset estáticamente escrito:

    ds.select($"bar".as[Int])

    No hay tipo seguro. Existen otros intentos de incluir operaciones optimizadas de tipo seguro, como agregaciones escritas , pero esta API experimental.

  5. ¿Por qué debería usar un UDF / UADF en lugar de un mapa?

    Depende completamente de ti. Cada estructura de datos distribuidos en Spark proporciona sus propias ventajas y desventajas (consulte, por ejemplo, Spark UDAF con ArrayType como problemas de rendimiento de bufferSchema ).

Personalmente, considero que el Dataset de Dataset tipo estático es el menos útil:

  • No proporcione el mismo rango de optimizaciones que Dataset[Row] (aunque comparten el formato de almacenamiento y algunas optimizaciones del plan de ejecución, no se beneficia completamente de la generación de código o el almacenamiento fuera del montón) ni el acceso a todas las capacidades analíticas del DataFrame .

  • Las transformaciones escritas son cajas negras y crean efectivamente una barrera de análisis para el optimizador. Por ejemplo, las selecciones (filtros) no se pueden empujar sobre la transformación escrita:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain

    == Physical Plan == *Filter (foo#133 = 1) +- *Filter <function1>.apply +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) +- Exchange hashpartitioning(foo#133, 200) +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) +- LocalTableScan [foo#133, bar#134]

    Comparado con:

    ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain

    == Physical Plan == *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) +- Exchange hashpartitioning(foo#133, 200) +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) +- *Filter (foo#133 = 1) +- LocalTableScan [foo#133, bar#134]

    Esto afecta características como el pushdown de predicados o el pushdown de proyección.

  • No son tan flexibles como los RDDs con solo un pequeño subconjunto de tipos admitidos de forma nativa.

  • La "seguridad de tipo" con los Encoders es discutible cuando el Dataset se convierte as método. Como la forma de los datos no se codifica con una firma, un compilador solo puede verificar la existencia de un Encoder .

Preguntas relacionadas:

  • Realizar una unión escrita en Scala con conjuntos de datos de Spark
  • Spark 2.0 DataSets groupByKey y división de operación y seguridad de tipo

comenzando con la chispa 2.0.1 Tengo algunas preguntas. Leí mucha documentación pero hasta ahora no pude encontrar suficientes respuestas:

  • Cuál es la diferencia entre
    • df.select("foo")
    • df.select($"foo")
  • entiendo correctamente que
    • myDataSet.map(foo.someVal) es seguro y no se convertirá en RDD pero permanecerá en la representación de DataSet / sin sobrecarga adicional (rendimiento inteligente para 2.0.0)
  • todos los otros comandos, por ejemplo, select, .. son solo azúcar sintáctico. No son de tipo seguro y en su lugar podría usarse un mapa. ¿Cómo podría df.select("foo") type-safe sin una declaración de mapa?
    • ¿por qué debería usar un UDF / UADF en lugar de un mapa (suponiendo que el mapa permanezca en la representación del conjunto de datos)?

Spark Dataset es mucho más poderoso que Spark Dataframe . Pequeño ejemplo: solo puede crear Dataframe de Row , Tuple o cualquier tipo de datos primitivo, pero Dataset le da el poder de crear Dataset de cualquier tipo no primitivo. es decir, literalmente puede crear un Dataset de Dataset de tipo de objeto.

Ex:

case class Employee(id:Int,name:String) Dataset[Employee] // is valid Dataframe[Employee] // is invalid