hive - read - spark sql java
¿Cómo guardar DataFrame directamente en Hive? (7)
¿Es posible guardar
DataFrame
en chispa directamente en Hive?
He intentado convertir
DataFrame
a
Rdd
y luego guardarlo como archivo de texto y luego cargarlo en la colmena.
Pero me pregunto si puedo guardar directamente el
dataframe
de
dataframe
para colmena
Aquí está la versión de PySpark para crear la tabla Hive a partir del archivo de parquet. Es posible que haya generado archivos de Parquet utilizando un esquema inferido y ahora desea insertar la definición en Hive metastore. También puede enviar definiciones al sistema como AWS Glue o AWS Athena y no solo a Hive metastore. Aquí estoy usando spark.sql para empujar / crear una tabla permanente.
# Location where my parquet files are present.
df = spark.read.parquet("s3://my-location/data/")
cols = df.dtypes
buf = []
buf.append(''CREATE EXTERNAL TABLE test123 ('')
keyanddatatypes = df.dtypes
sizeof = len(df.dtypes)
print ("size----------",sizeof)
count=1;
for eachvalue in keyanddatatypes:
print count,sizeof,eachvalue
if count == sizeof:
total = str(eachvalue[0])+str('' '')+str(eachvalue[1])
else:
total = str(eachvalue[0]) + str('' '') + str(eachvalue[1]) + str('','')
buf.append(total)
count = count + 1
buf.append('' )'')
buf.append('' STORED as parquet '')
buf.append("LOCATION")
buf.append("''")
buf.append(''s3://my-location/data/'')
buf.append("''")
buf.append("''")
##partition by pt
tabledef = ''''.join(buf)
print "---------print definition ---------"
print tabledef
## create a table using spark.sql. Assuming you are using spark 2.1+
spark.sql(tabledef);
Guardar en Hive es solo una cuestión de usar el método
write()
de su SQLContext:
df.write.saveAsTable(tableName)
Desde Spark 2.2: use DataSet en lugar de DataFrame.
No veo
df.write.saveAsTable(...)
en desuso en la documentación de Spark 2.0.
Nos ha funcionado en Amazon EMR.
Pudimos leer perfectamente los datos de S3 en un marco de datos, procesarlos, crear una tabla a partir del resultado y leerlos con MicroStrategy.
La respuesta de Vinays también ha funcionado.
Para las tablas externas de Hive, uso esta función en PySpark:
def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
print("Saving result in {}.{}".format(database, table_name))
output_schema = "," /
.join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) /
.replace("StringType", "STRING") /
.replace("IntegerType", "INT") /
.replace("DateType", "DATE") /
.replace("LongType", "INT") /
.replace("TimestampType", "INT") /
.replace("BooleanType", "BOOLEAN") /
.replace("FloatType", "FLOAT")/
.replace("DoubleType","FLOAT")
output_schema = re.sub(r''DecimalType[(][0-9]+,[0-9]+[)]'', ''FLOAT'', output_schema)
sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))
query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION ''/user/hive/{}/{}''" /
.format(database, table_name, output_schema, save_format, database, table_name)
sparkSession.sql(query)
dataframe.write.insertInto(''{}.{}''.format(database, table_name),overwrite = True)
Puede crear una tabla temporal en memoria y almacenarla en la tabla de colmena usando sqlContext.
Digamos que su marco de datos es myDf. Puede crear una tabla temporal usando,
myDf.createOrReplaceTempView("mytempTable")
Luego puede usar una declaración de colmena simple para crear una tabla y volcar los datos de su tabla temporal.
sqlContext.sql("create table mytable as select * from mytempTable");
Utilice
DataFrameWriter.saveAsTable
.
(
df.write.saveAsTable(...)
) Consulte la
Guía de Spark SQL y DataFrame
.
necesitas tener / crear un HiveContext
import org.apache.spark.sql.hive.HiveContext;
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
Luego, guarde directamente el marco de datos o seleccione las columnas para almacenar como tabla de colmena
df es dataframe
df.write().mode("overwrite").saveAsTable("schemaName.tableName");
o
df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");
o
df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");
SaveModes are Append / Ignore / Overwrite / ErrorIfExists
Agregué aquí la definición de HiveContext de Spark Documentation,
Además del SQLContext básico, también puede crear un HiveContext, que proporciona un superconjunto de la funcionalidad proporcionada por el SQLContext básico. Las características adicionales incluyen la capacidad de escribir consultas utilizando el analizador HiveQL más completo, el acceso a los UDF de Hive y la capacidad de leer datos de las tablas de Hive. Para usar un HiveContext, no necesita tener una configuración de Hive existente, y todas las fuentes de datos disponibles para un SQLContext aún están disponibles. HiveContext solo se empaqueta por separado para evitar incluir todas las dependencias de Hive en la compilación predeterminada de Spark.
en Spark versión 1.6.2, el uso de "dbName.tableName" da este error:
org.apache.spark.sql.AnalysisException: no se permite especificar el nombre de la base de datos u otros calificadores para las tablas temporales. Si el nombre de la tabla tiene puntos (.), Indique el nombre de la tabla con comillas invertidas ().