tutorial spark example español ejemplo java apache-spark hadoop apache-spark-sql hdfs

java - example - spark sql español



Analice CSV como DataFrame/DataSet con Apache Spark y Java (4)

El archivo CSV se puede analizar con el lector de CSV incorporado de Spark . Devolverá DataFrame / DataSet en la lectura exitosa del archivo. En la parte superior de DataFrame / DataSet, puede aplicar operaciones similares a SQL fácilmente.

Usando Spark 2.x (y superior) con Java

Crear SparkSession objeto también conocido como spark

import org.apache.spark.sql.SparkSession; SparkSession spark = SparkSession .builder() .appName("Java Spark SQL Example") .getOrCreate();

Crear esquema para fila con StructType

import org.apache.spark.sql.types.StructType; StructType schema = new StructType() .add("department", "string") .add("designation", "string") .add("ctc", "long") .add("state", "string");

Cree un marco de datos a partir de un archivo CSV y aplíquele un esquema

Dataset<Row> df = spark.read() .option("mode", "DROPMALFORMED") .schema(schema) .csv("hdfs://path/input.csv");

Más opciones en la lectura de datos de archivo CSV

Ahora podemos agregar datos de 2 maneras.

1. forma SQL

Registre una tabla en metastore de spark sql para realizar la operación de SQL

df.createOrReplaceTempView("employee");

Ejecutar consulta SQL en dataframe registrado

Dataset<Row> sqlResult = spark.sql( "SELECT department, designation, state, SUM(ctc), COUNT(department)" + " FROM employee GROUP BY department, designation, state"); sqlResult.show(); //for testing

Incluso podemos ejecutar SQL directamente en un archivo CSV sin crear una tabla con Spark SQL

2. Encadenamiento de objetos o programación o forma similar a Java.

Haga la importación necesaria para las funciones SQL.

import static org.apache.spark.sql.functions.count; import static org.apache.spark.sql.functions.sum;

Utilice groupBy y agg en dataframe / dataset para realizar el count y la sum de los datos

Dataset<Row> dfResult = df.groupBy("department", "designation", "state") .agg(sum("ctc"), count("department")); // After Spark 1.6 columns mentioned in group by will be added to result by default dfResult.show();//for testing

bibliotecas dependientes

"org.apache.spark" % "spark-core_2.11" % "2.0.0" "org.apache.spark" % "spark-sql_2.11" % "2.0.0"

Soy nuevo en Spark, y quiero usar group-by & reduce para encontrar lo siguiente de CSV (una línea por empleado):

Department, Designation, costToCompany, State Sales, Trainee, 12000, UP Sales, Lead, 32000, AP Sales, Lead, 32000, LA Sales, Lead, 32000, TN Sales, Lead, 32000, AP Sales, Lead, 32000, TN Sales, Lead, 32000, LA Sales, Lead, 32000, LA Marketing, Associate, 18000, TN Marketing, Associate, 18000, TN HR, Manager, 58000, TN

Me gustaría simplificar el CSV con grupo por departamento, designación, estado con columnas adicionales con sum (costToCompany) y TotalEmployeeCount

Debería obtener un resultado como:

Dept, Desg, state, empCount, totalCost Sales,Lead,AP,2,64000 Sales,Lead,LA,3,96000 Sales,Lead,TN,2,64000

¿Hay alguna manera de lograr esto usando transformaciones y acciones? ¿O deberíamos ir a las operaciones de RDD?


Procedimiento

  • Cree una Clase (Esquema) para encapsular su estructura (no es necesario para el enfoque B, pero haría que su código sea más fácil de leer si está utilizando Java)

    public class Record implements Serializable { String department; String designation; long costToCompany; String state; // constructor , getters and setters }

  • Cargando archivo CVS (JSON)

    JavaSparkContext sc; JavaRDD<String> data = sc.textFile("path/input.csv"); //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified JavaRDD<Record> rdd_records = sc.textFile(data).map( new Function<String, Record>() { public Record call(String line) throws Exception { // Here you can use JSON // Gson gson = new Gson(); // gson.fromJson(line, Record.class); String[] fields = line.split(","); Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]); return sd; } });

En este punto tienes 2 enfoques:

A. SparkSQL

  • Registre una tabla (usando la clase de esquema definida)

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class); table.registerAsTable("record_table"); table.printSchema();

  • Consulta la tabla con tu Query-group-by deseado

    JavaSchemaRDD res = sqlContext.sql(" select department,designation,state,sum(costToCompany),count(*) from record_table group by department,designation,state ");

  • Aquí también podría hacer cualquier otra consulta que desee, utilizando un enfoque de SQL

B. Chispa

  • Mapeo usando una clave compuesta: Department , Designation , State

    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD = rdd_records.mapToPair(new PairFunction<Record, String, Tuple2<Long, Integer>>(){ public Tuple2<String, Tuple2<Long, Integer>> call(Record record){ Tuple2<String, Tuple2<Long, Integer>> t2 = new Tuple2<String, Tuple2<Long,Integer>>( record.Department + record.Designation + record.State, new Tuple2<Long, Integer>(record.costToCompany,1) ); return t2; }

    });

  • reduceByKey utilizando la clave compuesta, sumando la columna costToCompany y acumulando el número de registros por clave

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() { public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1, Tuple2<Long, Integer> v2) throws Exception { return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2); } });


Es posible que lo siguiente no sea del todo correcto, pero debería darle una idea de cómo hacer malabarismos con los datos. No es bonito, debe ser reemplazado por clases de casos, etc., pero como un ejemplo rápido de cómo usar el api de chispa, espero que sea suficiente :)

val rawlines = sc.textfile("hdfs://.../*.csv") case class Employee(dep: String, des: String, cost: Double, state: String) val employees = rawlines .map(_.split(",") /*or use a proper CSV parser*/ .map( Employee(row(0), row(1), row(2), row(3) ) # the 1 is the amount of employees (which is obviously 1 per line) val keyVals = employees.map( em => (em.dep, em.des, em.state), (1 , em.cost)) val results = keyVals.reduceByKey{ a,b => (a._1 + b._1, b._1, b._2) # (a.count + b.count , a.cost + b.cost ) } #debug output results.take(100).foreach(println) results .map( keyval => someThingToFormatAsCsvStringOrWhatever ) .saveAsTextFile("hdfs://.../results")

O puedes usar SparkSQL:

val sqlContext = new SQLContext(sparkContext) # case classes can easily be registered as tables employees.registerAsTable("employees") val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*) from employees group by dep,des,state"""


Para JSON, si su archivo de texto contiene un objeto JSON por línea, puede usar sqlContext.jsonFile(path) para que Spark SQL lo cargue como SchemaRDD (el esquema se SchemaRDD automáticamente). Luego, puede registrarlo como una tabla y consultarlo con SQL. También puede cargar manualmente el archivo de texto como un RDD[String] contiene un objeto JSON por registro y usar sqlContext.jsonRDD(rdd) para convertirlo como un SchemaRDD . jsonRDD es útil cuando necesita preprocesar sus datos.