java - una - La conversión de la tabla mysql al conjunto de datos de chispa es muy lenta en comparación con el mismo archivo csv
importar datos de excel a una tabla de mysql (2)
Tengo un archivo csv en Amazon s3 con un tamaño de 62mb (114 000 filas). Lo estoy convirtiendo en un conjunto de datos de chispa y tomando las primeras 500 filas de él. El código es el siguiente;
DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set=df.load("s3n://"+this.accessId.replace("/"", "")+":"+this.accessToken.replace("/"", "")+"@"+this.bucketName.replace("/"", "")+"/"+this.filePath.replace("/"", "")+"");
set.take(500)
Toda la operación tarda de 20 a 30 segundos.
Ahora estoy intentando lo mismo, pero en lugar de usar csv, estoy usando la tabla mySQL con 119 000 filas. El servidor MySQL está en amazon ec2. El código es el siguiente;
String url ="jdbc:mysql://"+this.hostName+":3306/"+this.dataBaseName+"?user="+this.userName+"&password="+this.password;
SparkSession spark=StartSpark.getSparkSession();
SQLContext sc = spark.sqlContext();
DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set = sc
.read()
.option("url", url)
.option("dbtable", this.tableName)
.option("driver","com.mysql.jdbc.Driver")
.format("jdbc")
.load();
set.take(500);
Esto demora de 5 a 10 minutos. Estoy ejecutando chispa dentro de jvm. Utilizando la misma configuración en ambos casos.
Puedo usar partitionColumn, numParttition, etc., pero no tengo ninguna columna numérica y un problema más es que el esquema de la tabla es desconocido para mí.
Mi problema no es cómo disminuir el tiempo requerido, ya que sé que en el caso ideal la chispa se ejecutará en el clúster, pero lo que no puedo entender es ¿por qué esta gran diferencia horaria en los dos casos anteriores?
Por favor, siga los pasos a continuación
1.descargar una copia del conector JDBC para mysql. Creo que ya tienes uno.
wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar
2.crear un archivo db-properties.flat en el siguiente formato
jdbcUrl=jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}
user=<username>
password=<password>
3.cree primero una tabla vacía donde desea cargar los datos.
invocar chispa shell con la clase de controlador
spark-shell --driver-class-path <your path to mysql jar>
luego importe todo el paquete requerido
import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
iniciar un contexto de colmena o un contexto de sql
val sQLContext = new HiveContext(sc)
import sQLContext.implicits._
import sQLContext.sql
establecer algunas de las propiedades
sQLContext.setConf("hive.exec.dynamic.partition", "true")
sQLContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
Cargar propiedades mysql db del archivo
val dbProperties = new Properties()
dbProperties.load(new FileInputStream(new File("your_path_to/db- properties.flat")))
val jdbcurl = dbProperties.getProperty("jdbcUrl")
crea una consulta para leer los datos de tu tabla y pásalos al método de lectura de #sqlcontext. aquí es donde puedes administrar tu cláusula where
val df1 = "(SELECT * FROM your_table_name) as s1"
pase el jdbcurl, seleccione consulta y propiedades de db para leer el método
val df2 = sQLContext.read.jdbc(jdbcurl, df1, dbProperties)
escríbelo en tu mesa
df2.write.format("orc").partitionBy("your_partition_column_name").mode(SaveMode.Append).saveAsTable("your_target_table_name")
Este problema ha sido cubierto varias veces en :
- ¿Cómo mejorar el rendimiento de los trabajos Spark lentos utilizando DataFrame y la conexión JDBC?
- spark jdbc df limit ... ¿qué está haciendo?
- ¿Cómo usar la fuente JDBC para escribir y leer datos en (Py) Spark?
y en fuentes externas:
solo para reiterar: de forma predeterminada, DataFrameReader.jdbc
no distribuye datos ni lee. Utiliza un solo hilo, un solo ejecutante.
Para distribuir lecturas:
usar rangos con
lowerBound
/upperBound
:Properties properties; Lower Dataset<Row> set = sc .read() .option("partitionColumn", "foo") .option("numPartitions", "3") .option("lowerBound", 0) .option("upperBound", 30) .option("url", url) .option("dbtable", this.tableName) .option("driver","com.mysql.jdbc.Driver") .format("jdbc") .load();
predicates
Properties properties; Dataset<Row> set = sc .read() .jdbc( url, this.tableName, {"foo < 10", "foo BETWWEN 10 and 20", "foo > 20"}, properties )