varias superponer studio lineas graficos graficas scala apache-spark dataframe apache-spark-sql partitioning

scala - superponer - ¿Cómo definir la partición de DataFrame?



superponer graficas en r (5)

Chispa> = 2.3.0

SPARK-22614 expone la división de rango.

val partitionedByRange = df.repartitionByRange(42, $"k") partitionedByRange.explain // == Parsed Logical Plan == // ''RepartitionByExpression [''k ASC NULLS FIRST], 42 // +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6] // // == Analyzed Logical Plan == // k: string, v: int // RepartitionByExpression [k#5 ASC NULLS FIRST], 42 // +- Project [_1#2 AS k#5, _2#3 AS v#6] // +- LocalRelation [_1#2, _2#3] // // == Optimized Logical Plan == // RepartitionByExpression [k#5 ASC NULLS FIRST], 42 // +- LocalRelation [k#5, v#6] // // == Physical Plan == // Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42) // +- LocalTableScan [k#5, v#6]

SPARK-22389 expone el particionamiento de formato externo en la fuente de datos API v2 .

Chispa> = 1.6.0

En Spark> = 1.6 es posible usar particiones por columna para consultas y almacenamiento en caché. Ver: SPARK-11410 y SPARK-4849 usando el método de repartition :

val df = Seq( ("A", 1), ("B", 2), ("A", 3), ("C", 1) ).toDF("k", "v") val partitioned = df.repartition($"k") partitioned.explain // scala> df.repartition($"k").explain(true) // == Parsed Logical Plan == // ''RepartitionByExpression [''k], None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27 // // == Analyzed Logical Plan == // k: string, v: int // RepartitionByExpression [k#7], None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27 // // == Optimized Logical Plan == // RepartitionByExpression [k#7], None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27 // // == Physical Plan == // TungstenExchange hashpartitioning(k#7,200), None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- Scan PhysicalRDD[_1#5,_2#6]

A diferencia de los RDDs Spark Dataset (incluido Dataset[Row] también DataFrame como DataFrame ) no puede usar un particionador personalizado por ahora. Por lo general, puede abordar eso creando una columna de partición artificial, pero no le dará la misma flexibilidad.

Chispa <1.6.0:

Una cosa que puede hacer es particionar previamente los datos de entrada antes de crear un DataFrame

import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.HashPartitioner val schema = StructType(Seq( StructField("x", StringType, false), StructField("y", LongType, false), StructField("z", DoubleType, false) )) val rdd = sc.parallelize(Seq( Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0), Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99) )) val partitioner = new HashPartitioner(5) val partitioned = rdd.map(r => (r.getString(0), r)) .partitionBy(partitioner) .values val df = sqlContext.createDataFrame(partitioned, schema)

Dado que la creación de DataFrame partir de un RDD requiere solo una fase de mapa simple, se debe preservar el diseño de partición existente *:

assert(df.rdd.partitions == partitioned.partitions)

De la misma manera que puede DataFrame existente:

sqlContext.createDataFrame( df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values, df.schema )

Entonces parece que no es imposible. La pregunta sigue siendo si tiene sentido. Argumentaré que la mayoría de las veces no lo hace:

  1. Reparticionar es un proceso costoso. En un escenario típico, la mayoría de los datos deben ser serializados, barajados y deserializados. Por otro lado, el número de operaciones que pueden beneficiarse de los datos particionados previamente es relativamente pequeño y se limita aún más si la API interna no está diseñada para aprovechar esta propiedad.

    • se une en algunos escenarios, pero requeriría un soporte interno,
    • funciones de ventana llamadas con particionador coincidente. Igual que el anterior, limitado a una sola definición de ventana. Sin embargo, ya está particionado internamente, por lo que la partición previa puede ser redundante,
    • agregaciones simples con GROUP BY : es posible reducir la huella de memoria de los búferes temporales **, pero el costo general es mucho mayor. Más o menos equivalente a groupByKey.mapValues(_.reduce) (comportamiento actual) frente a reduceByKey (prepartición). Es poco probable que sea útil en la práctica.
    • compresión de datos con SqlContext.cacheTable . Dado que parece que está utilizando la codificación de longitud de ejecución, la aplicación de OrderedRDDFunctions.repartitionAndSortWithinPartitions podría mejorar la relación de compresión.
  2. El rendimiento depende en gran medida de una distribución de las claves. Si está sesgado, dará como resultado una utilización de recursos subóptima. En el peor de los casos, será imposible terminar el trabajo.

  3. Un punto de usar una API declarativa de alto nivel es aislarse de los detalles de implementación de bajo nivel. Como ya lo mencionaron @dwysakowicz y @RomiKuntsman optimización es un trabajo del Optimizador Catalyst . Es una bestia bastante sofisticada y realmente dudo que puedas mejorarla fácilmente sin sumergirte mucho más en su interior.

Conceptos relacionados

Particionamiento con fuentes JDBC :

Las fuentes de datos JDBC admiten argumentos predicates . Se puede usar de la siguiente manera:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

Crea una única partición JDBC por predicado. Tenga en cuenta que si los conjuntos creados con predicados individuales no son disjuntos, verá duplicados en la tabla resultante.

partitionBy método en DataFrameWriter :

Spark DataFrameWriter proporciona el método de la partitionBy que se puede utilizar para "particionar" los datos en la escritura. Separa los datos en escritura usando el conjunto de columnas provisto

val df = Seq( ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6) ).toDF("k", "v") df.write.partitionBy("k").json("/tmp/foo.json")

Esto permite que el predicado empuje hacia abajo en la lectura de consultas basadas en la clave:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json") df1.where($"k" === "bar")

pero no es equivalente a DataFrame.repartition . En particular agregaciones como:

val cnts = df1.groupBy($"k").sum()

aún requerirá TungstenExchange :

cnts.explain // == Physical Plan == // TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93]) // +- TungstenExchange hashpartitioning(k#90,200), None // +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99]) // +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

Método bucketBy en DataFrameWriter (Spark> = 2.0):

bucketBy tiene aplicaciones similares a la partitionBy pero está disponible solo para tablas ( saveAsTable ). La información de agrupación se puede utilizar para optimizar las uniones:

// Temporarily disable broadcast joins spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) df.write.bucketBy(42, "k").saveAsTable("df1") val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2") df2.write.bucketBy(42, "k").saveAsTable("df2") // == Physical Plan == // *Project [k#41, v#42, v2#47] // +- *SortMergeJoin [k#41], [k#46], Inner // :- *Sort [k#41 ASC NULLS FIRST], false, 0 // : +- *Project [k#41, v#42] // : +- *Filter isnotnull(k#41) // : +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int> // +- *Sort [k#46 ASC NULLS FIRST], false, 0 // +- *Project [k#46, v2#47] // +- *Filter isnotnull(k#46) // +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* Por diseño de partición me refiero solo a una distribución de datos. RDD partitioned ya no tiene un particionador. ** Suponiendo que no hay proyección temprana. Si la agregación cubre solo un pequeño subconjunto de columnas, probablemente no haya ganancia alguna.

Comencé a usar Spark SQL y DataFrames en Spark 1.4.0. Quiero definir un particionador personalizado en DataFrames, en Scala, pero no veo cómo hacerlo.

Una de las tablas de datos con las que estoy trabajando contiene una lista de transacciones, por cuenta, silimar para el siguiente ejemplo.

Account Date Type Amount 1001 2014-04-01 Purchase 100.00 1001 2014-04-01 Purchase 50.00 1001 2014-04-05 Purchase 70.00 1001 2014-04-01 Payment -150.00 1002 2014-04-01 Purchase 80.00 1002 2014-04-02 Purchase 22.00 1002 2014-04-04 Payment -120.00 1002 2014-04-04 Purchase 60.00 1003 2014-04-02 Purchase 210.00 1003 2014-04-03 Purchase 15.00

Al menos inicialmente, la mayoría de los cálculos ocurrirán entre las transacciones dentro de una cuenta. Por lo tanto, me gustaría tener los datos particionados para que todas las transacciones de una cuenta estén en la misma partición Spark.

Pero no veo una manera de definir esto. La clase DataFrame tiene un método llamado ''repartición (Int)'', donde puede especificar el número de particiones para crear. Pero no veo ningún método disponible para definir un particionador personalizado para un DataFrame, como puede especificarse para un RDD.

Los datos de origen se almacenan en Parquet. Vi que al escribir un DataFrame en Parquet, puede especificar una columna para particionar, por lo que presumiblemente podría decirle a Parquet que particione sus datos en la columna ''Cuenta''. Pero podría haber millones de cuentas, y si entiendo Parquet correctamente, crearía un directorio distinto para cada Cuenta, por lo que no parecía una solución razonable.

¿Hay alguna manera de hacer que Spark particione este DataFrame para que todos los datos de una Cuenta estén en la misma partición?


En Spark <1.6 Si crea un HiveContext , no el viejo SqlContext , puede usar HiveQL DISTRIBUTE BY colX... (garantiza que cada uno de los reductores N obtenga rangos no superpuestos de x) & CLUSTER BY colX... (acceso directo para Distribuir por y ordenar por) por ejemplo;

df.registerTempTable("partitionMe") hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

No estoy seguro de cómo encaja esto con la API de Spark DF. Estas palabras clave no son compatibles con el SqlContext normal (tenga en cuenta que no necesita tener una meta tienda de colmena para usar HiveContext)

EDITAR: Spark 1.6+ ahora tiene esto en la API nativa de DataFrame


Para comenzar con algún tipo de respuesta:) - No puedes

No soy un experto, pero hasta donde entiendo DataFrames, no son iguales a rdd y DataFrame no tiene tal cosa como Partitioner.

En general, la idea de DataFrame es proporcionar otro nivel de abstracción que maneje tales problemas por sí mismo. Las consultas en DataFrame se traducen en un plan lógico que se traduce aún más en operaciones en RDD. La partición que sugirió probablemente se aplicará automáticamente o al menos debería.

Si no confía en SparkSQL que proporcionará algún tipo de trabajo óptimo, siempre puede transformar DataFrame a RDD [Row] como se sugiere en los comentarios.


Pude hacer esto usando RDD. Pero no sé si esta es una solución aceptable para usted. Una vez que tenga el DF disponible como RDD, puede aplicar repartitionAndSortWithinPartitions para realizar un reparto de datos personalizado.

Aquí hay una muestra que utilicé:

class DatePartitioner(partitions: Int) extends Partitioner { override def getPartition(key: Any): Int = { val start_time: Long = key.asInstanceOf[Long] Objects.hash(Array(start_time)) % partitions } override def numPartitions: Int = partitions } myRDD .repartitionAndSortWithinPartitions(new DatePartitioner(24)) .map { v => v._2 } .toDF() .write.mode(SaveMode.Overwrite)


Use el DataFrame devuelto por:

yourDF.orderBy(account)

No hay una forma explícita de utilizar la partitionBy en un DataFrame, solo en un PairRDD, pero cuando ordena un DataFrame, lo usará en su LogicalPlan y eso ayudará cuando necesite hacer cálculos en cada Cuenta.

Me topé con el mismo problema exacto, con un marco de datos que quiero particionar por cuenta. Supongo que cuando dice "desea tener los datos particionados para que todas las transacciones de una cuenta estén en la misma partición Spark", lo desea por escala y rendimiento, pero su código no depende de ello (como usar mapPartitions() etc.), ¿verdad?