hadoop - example - spark sql español
Guarde el marco de datos de Spark como tabla particionada dinámica en Hive (5)
Tengo una aplicación de ejemplo trabajando para leer desde archivos csv a un marco de datos. El marco de datos se puede almacenar en una tabla Hive en formato de parquet utilizando el método df.saveAsTable(tablename,mode)
.
El código anterior funciona bien, pero tengo tantos datos para cada día que quiero particionar dinámicamente la tabla hive en función de la fecha de creación (columna de la tabla).
¿Hay alguna forma de particionar dinámicamente el marco de datos y almacenarlo en el almacén de la colmena? Desee abstenerse de codificar la instrucción de inserción mediante hivesqlcontext.sql(insert into table partittioin by(date)....)
.
La pregunta puede considerarse como una extensión de: ¿Cómo guardar DataFrame directamente en Hive?
Cualquier ayuda es muy apreciada.
Creo que funciona algo como esto:
df
es un marco de datos con año, mes y otras columnas
df.write.partitionBy(''year'', ''month'').saveAsTable(...)
o
df.write.partitionBy(''year'', ''month'').insertInto(...)
Esto es lo que funciona para mí. Establecí estos ajustes y luego puse los datos en tablas particionadas.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode",
"nonstrict")
Esto me funcionó usando python y spark 2.1.0.
No estoy seguro si es la mejor manera de hacer esto pero funciona ...
# WRITE DATA INTO A HIVE TABLE
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession /
.builder /
.master("local[*]") /
.config("hive.exec.dynamic.partition", "true") /
.config("hive.exec.dynamic.partition.mode", "nonstrict") /
.enableHiveSupport() /
.getOrCreate()
### CREATE HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
USING HIVE OPTIONS(fileFormat ''PARQUET'')
PARTITIONED BY (partition_bin)
LOCATION ''hive_df''
""")
spark.sql("""
INSERT INTO hive_df PARTITION (partition_bin = 0)
VALUES (0, ''init_record'')
""")
###
### CREATE NON HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
USING PARQUET
PARTITIONED BY (partition_bin)
LOCATION ''non_hive_df''
""")
spark.sql("""
INSERT INTO non_hive_df PARTITION (partition_bin = 0)
VALUES (0, ''init_record'')
""")
###
### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
spark.sql("""
INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
VALUES (0, ''new_record'', 1)
""")
spark.sql("""
INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
VALUES (0, ''new_record'', 1)
""")
spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite
Pude escribir en la tabla de la sección dividida usando df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")
Tuve que habilitar las siguientes propiedades para que funcionara.
hiveContext.setConf("hive.exec.dynamic.partition", "true") hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
También me enfrenté a lo mismo pero usando los siguientes trucos que resolví.
Cuando hacemos cualquier tabla como particionada, la columna particionada se hace sensible a mayúsculas y minúsculas
La columna particionada debe estar presente en DataFrame con el mismo nombre (distingue entre mayúsculas y minúsculas). Código:
var dbName="your database name" var finaltable="your table name" // First check if table is available or not.. if (sparkSession.sql("show tables in " + dbName).filter("tableName=''" +finaltable + "''").collect().length == 0) { //If table is not available then it will create for you.. println("Table Not Present /n Creating table " + finaltable) sparkSession.sql("use Database_Name") sparkSession.sql("SET hive.exec.dynamic.partition = true") sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ") sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400") sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID string,EMP_Name string,EMP_Address string,EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)") //Table is created now insert the DataFrame in append Mode df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable) }