válido una tabla pasar número línea los importar exportar desde datos como columnas actualizar java mysql apache-spark jdbc amazon-s3

java - una - La conversión de la tabla mysql al conjunto de datos de chispa es muy lenta en comparación con el mismo archivo csv



importar datos de excel a una tabla de mysql (2)

Tengo un archivo csv en Amazon s3 con un tamaño de 62mb (114 000 filas). Lo estoy convirtiendo en un conjunto de datos de chispa y tomando las primeras 500 filas de él. El código es el siguiente;

DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true); Dataset<Row> set=df.load("s3n://"+this.accessId.replace("/"", "")+":"+this.accessToken.replace("/"", "")+"@"+this.bucketName.replace("/"", "")+"/"+this.filePath.replace("/"", "")+""); set.take(500)

Toda la operación tarda de 20 a 30 segundos.

Ahora estoy intentando lo mismo, pero en lugar de usar csv, estoy usando la tabla mySQL con 119 000 filas. El servidor MySQL está en amazon ec2. El código es el siguiente;

String url ="jdbc:mysql://"+this.hostName+":3306/"+this.dataBaseName+"?user="+this.userName+"&password="+this.password; SparkSession spark=StartSpark.getSparkSession(); SQLContext sc = spark.sqlContext(); DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true); Dataset<Row> set = sc .read() .option("url", url) .option("dbtable", this.tableName) .option("driver","com.mysql.jdbc.Driver") .format("jdbc") .load(); set.take(500);

Esto demora de 5 a 10 minutos. Estoy ejecutando chispa dentro de jvm. Utilizando la misma configuración en ambos casos.

Puedo usar partitionColumn, numParttition, etc., pero no tengo ninguna columna numérica y un problema más es que el esquema de la tabla es desconocido para mí.

Mi problema no es cómo disminuir el tiempo requerido, ya que sé que en el caso ideal la chispa se ejecutará en el clúster, pero lo que no puedo entender es ¿por qué esta gran diferencia horaria en los dos casos anteriores?


Por favor, siga los pasos a continuación

1.descargar una copia del conector JDBC para mysql. Creo que ya tienes uno.

wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar

2.crear un archivo db-properties.flat en el siguiente formato

jdbcUrl=jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase} user=<username> password=<password>

3.cree primero una tabla vacía donde desea cargar los datos.

invocar chispa shell con la clase de controlador

spark-shell --driver-class-path <your path to mysql jar>

luego importe todo el paquete requerido

import java.io.{File, FileInputStream} import java.util.Properties import org.apache.spark.sql.SaveMode import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{SparkConf, SparkContext}

iniciar un contexto de colmena o un contexto de sql

val sQLContext = new HiveContext(sc) import sQLContext.implicits._ import sQLContext.sql

establecer algunas de las propiedades

sQLContext.setConf("hive.exec.dynamic.partition", "true") sQLContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

Cargar propiedades mysql db del archivo

val dbProperties = new Properties() dbProperties.load(new FileInputStream(new File("your_path_to/db- properties.flat"))) val jdbcurl = dbProperties.getProperty("jdbcUrl")

crea una consulta para leer los datos de tu tabla y pásalos al método de lectura de #sqlcontext. aquí es donde puedes administrar tu cláusula where

val df1 = "(SELECT * FROM your_table_name) as s1"

pase el jdbcurl, seleccione consulta y propiedades de db para leer el método

val df2 = sQLContext.read.jdbc(jdbcurl, df1, dbProperties)

escríbelo en tu mesa

df2.write.format("orc").partitionBy("your_partition_column_name").mode(SaveMode.Append).saveAsTable("your_target_table_name")


Este problema ha sido cubierto varias veces en :

  • ¿Cómo mejorar el rendimiento de los trabajos Spark lentos utilizando DataFrame y la conexión JDBC?
  • spark jdbc df limit ... ¿qué está haciendo?
  • ¿Cómo usar la fuente JDBC para escribir y leer datos en (Py) Spark?

y en fuentes externas:

solo para reiterar: de forma predeterminada, DataFrameReader.jdbc no distribuye datos ni lee. Utiliza un solo hilo, un solo ejecutante.

Para distribuir lecturas:

  • usar rangos con lowerBound / upperBound :

    Properties properties; Lower Dataset<Row> set = sc .read() .option("partitionColumn", "foo") .option("numPartitions", "3") .option("lowerBound", 0) .option("upperBound", 30) .option("url", url) .option("dbtable", this.tableName) .option("driver","com.mysql.jdbc.Driver") .format("jdbc") .load();

  • predicates

    Properties properties; Dataset<Row> set = sc .read() .jdbc( url, this.tableName, {"foo < 10", "foo BETWWEN 10 and 20", "foo > 20"}, properties )