scala apache-spark intellij-idea

scala - Running a Spark word count in IntelliJ




In IntelliJ IDEA, click File -> New -> Project -> Scala -> SBT -> (choose a location and name for the project) -> Finish.

Write in build.sbt:

scalaVersion := "2.11.11"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"
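If you prefer not to spell out the _2.11 suffix yourself, sbt can append the Scala binary version for you with %%; a minimal equivalent, assuming the same Scala and Spark versions:

scalaVersion := "2.11.11"

// %% tells sbt to append the Scala binary version (_2.11) to the artifact name
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"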

Run sbt update on the command line (from inside the project's root folder), or press the Refresh button in the SBT tool window inside IntelliJ IDEA.

Write your code in src/main/scala/WordCount.scala:

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Word Count")
      .setSparkHome("src/main/resources")
    val sc = new SparkContext(conf)
    val input = sc.textFile("src/main/resources/input.txt")
    val count = input.flatMap(line ⇒ line.split(" "))
      .map(word ⇒ (word, 1))
      .reduceByKey(_ + _)
    count.saveAsTextFile("src/main/resources/outfile")
    println("OK")
  }
}

Put your input file at src/main/resources/input.txt.

Run your code: Ctrl + Shift + F10 or sbt run.

A new outfile subfolder containing several files should appear in the src/main/resources folder.
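If you want to double-check the counts without opening the part files by hand, you can read the output directory back with the same SparkContext at the end of the program; a minimal sketch, assuming the outfile path used above:

// outfile is a Hadoop-style output directory: a _SUCCESS marker plus part-00000, part-00001, ...
// textFile on the directory reads every part file
val saved = sc.textFile("src/main/resources/outfile")
saved.collect().foreach(println) // prints one "(word,count)" line per entry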

Console output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/09/02 14:57:08 INFO SparkContext: Running Spark version 2.2.0
17/09/02 14:57:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/02 14:57:09 WARN Utils: Your hostname, dmitin-HP-Pavilion-Notebook resolves to a loopback address: 127.0.1.1; using 192.168.1.104 instead (on interface wlan0)
17/09/02 14:57:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/09/02 14:57:09 INFO SparkContext: Submitted application: Word Count
17/09/02 14:57:09 INFO SecurityManager: Changing view acls to: dmitin
17/09/02 14:57:09 INFO SecurityManager: Changing modify acls to: dmitin
17/09/02 14:57:09 INFO SecurityManager: Changing view acls groups to:
17/09/02 14:57:09 INFO SecurityManager: Changing modify acls groups to:
17/09/02 14:57:09 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dmitin); groups with view permissions: Set(); users with modify permissions: Set(dmitin); groups with modify permissions: Set()
17/09/02 14:57:10 INFO Utils: Successfully started service 'sparkDriver' on port 38186.
17/09/02 14:57:10 INFO SparkEnv: Registering MapOutputTracker
17/09/02 14:57:10 INFO SparkEnv: Registering BlockManagerMaster
17/09/02 14:57:10 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/09/02 14:57:10 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/09/02 14:57:10 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-d90a4735-6a2b-42b2-85ea-55b0ed9b1dfd
17/09/02 14:57:10 INFO MemoryStore: MemoryStore started with capacity 1950.3 MB
17/09/02 14:57:10 INFO SparkEnv: Registering OutputCommitCoordinator
17/09/02 14:57:10 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/09/02 14:57:11 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.104:4040
17/09/02 14:57:11 INFO Executor: Starting executor ID driver on host localhost
17/09/02 14:57:11 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 46432.
17/09/02 14:57:11 INFO NettyBlockTransferService: Server created on 192.168.1.104:46432
17/09/02 14:57:11 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/09/02 14:57:11 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.104, 46432, None)
17/09/02 14:57:11 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.104:46432 with 1950.3 MB RAM, BlockManagerId(driver, 192.168.1.104, 46432, None)
17/09/02 14:57:11 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.104, 46432, None)
17/09/02 14:57:11 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.104, 46432, None)
17/09/02 14:57:12 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 214.5 KB, free 1950.1 MB)
17/09/02 14:57:12 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.4 KB, free 1950.1 MB)
17/09/02 14:57:12 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.104:46432 (size: 20.4 KB, free: 1950.3 MB)
17/09/02 14:57:12 INFO SparkContext: Created broadcast 0 from textFile at WordCount.scala:16
17/09/02 14:57:12 INFO FileInputFormat: Total input paths to process : 1
17/09/02 14:57:12 INFO SparkContext: Starting job: saveAsTextFile at WordCount.scala:20
17/09/02 14:57:12 INFO DAGScheduler: Registering RDD 3 (map at WordCount.scala:18)
17/09/02 14:57:12 INFO DAGScheduler: Got job 0 (saveAsTextFile at WordCount.scala:20) with 1 output partitions
17/09/02 14:57:12 INFO DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at WordCount.scala:20)
17/09/02 14:57:12 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
17/09/02 14:57:12 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
17/09/02 14:57:12 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:18), which has no missing parents
17/09/02 14:57:13 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.7 KB, free 1950.1 MB)
17/09/02 14:57:13 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 1950.1 MB)
17/09/02 14:57:13 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.104:46432 (size: 2.7 KB, free: 1950.3 MB)
17/09/02 14:57:13 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
17/09/02 14:57:13 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:18) (first 15 tasks are for partitions Vector(0))
17/09/02 14:57:13 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/09/02 14:57:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 4873 bytes)
17/09/02 14:57:13 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/09/02 14:57:13 INFO HadoopRDD: Input split: file:/home/dmitin/Projects/sparkdemo/src/main/resources/input.txt:0+11
17/09/02 14:57:13 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1154 bytes result sent to driver
17/09/02 14:57:13 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 289 ms on localhost (executor driver) (1/1)
17/09/02 14:57:13 INFO DAGScheduler: ShuffleMapStage 0 (map at WordCount.scala:18) finished in 0,321 s
17/09/02 14:57:13 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/09/02 14:57:13 INFO DAGScheduler: looking for newly runnable stages
17/09/02 14:57:13 INFO DAGScheduler: running: Set()
17/09/02 14:57:13 INFO DAGScheduler: waiting: Set(ResultStage 1)
17/09/02 14:57:13 INFO DAGScheduler: failed: Set()
17/09/02 14:57:13 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at WordCount.scala:20), which has no missing parents
17/09/02 14:57:13 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 65.3 KB, free 1950.0 MB)
17/09/02 14:57:13 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 23.3 KB, free 1950.0 MB)
17/09/02 14:57:13 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.104:46432 (size: 23.3 KB, free: 1950.3 MB)
17/09/02 14:57:13 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
17/09/02 14:57:13 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at WordCount.scala:20) (first 15 tasks are for partitions Vector(0))
17/09/02 14:57:13 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
17/09/02 14:57:13 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, ANY, 4621 bytes)
17/09/02 14:57:13 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
17/09/02 14:57:13 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
17/09/02 14:57:13 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 10 ms
17/09/02 14:57:13 INFO FileOutputCommitter: Saved output of task 'attempt_20170902145712_0001_m_000000_1' to file:/home/dmitin/Projects/sparkdemo/src/main/resources/outfile/_temporary/0/task_20170902145712_0001_m_000000
17/09/02 14:57:13 INFO SparkHadoopMapRedUtil: attempt_20170902145712_0001_m_000000_1: Committed
17/09/02 14:57:13 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1224 bytes result sent to driver
17/09/02 14:57:13 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 221 ms on localhost (executor driver) (1/1)
17/09/02 14:57:13 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/09/02 14:57:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at WordCount.scala:20) finished in 0,223 s
17/09/02 14:57:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at WordCount.scala:20, took 1,222133 s
OK
17/09/02 14:57:13 INFO SparkContext: Invoking stop() from shutdown hook
17/09/02 14:57:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.104:4040
17/09/02 14:57:13 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/09/02 14:57:13 INFO MemoryStore: MemoryStore cleared
17/09/02 14:57:13 INFO BlockManager: BlockManager stopped
17/09/02 14:57:13 INFO BlockManagerMaster: BlockManagerMaster stopped
17/09/02 14:57:13 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/09/02 14:57:13 INFO SparkContext: Successfully stopped SparkContext
17/09/02 14:57:13 INFO ShutdownHookManager: Shutdown hook called
17/09/02 14:57:13 INFO ShutdownHookManager: Deleting directory /tmp/spark-663047b2-415a-45b5-bcad-20bd18270baa

Process finished with exit code 0

I spent hours going through YouTube videos and tutorials trying to understand how to run a word count program for Spark in Scala and turn it into a jar file. I'm completely confused by now.

I have Hello World running, and I have learned about going into the libraries to add Apache.spark.spark-core, but now I am getting

Error: Could not find or load main class WordCount

Also, I am completely baffled as to why these two tutorials, which I thought were teaching the same thing, seem to differ so much: tutorial1 tutorial2

The second one seems to be twice as long as the first and throws in things the first never mentioned. Should I trust either of these to get a simple word count program up and running?

P.S. My code currently looks like this. I copied it from somewhere:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object WordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
    val input = sc.textFile("../Data/input.txt")
    val count = input.flatMap(line ⇒ line.split(" "))
      .map(word ⇒ (word, 1))
      .reduceByKey(_ + _)
    count.saveAsTextFile("outfile")
    System.out.println("OK");
  }
}


You can always make WordCount extend App instead, and that should work. I think the issue is the way you have structured your project.

Read more about the App trait here:

http://www.scala-lang.org/api/2.12.1/scala/App.html
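For example, a minimal sketch of the same word count written as an object extending App, assuming the build.sbt and file paths from the other answer:

import org.apache.spark.{SparkConf, SparkContext}

// With App, the object body itself becomes the program entry point,
// so no explicit main method is needed.
object WordCount extends App {
  val conf = new SparkConf().setMaster("local").setAppName("Word Count")
  val sc = new SparkContext(conf)

  val counts = sc.textFile("src/main/resources/input.txt")
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)

  counts.saveAsTextFile("src/main/resources/outfile")
  sc.stop()
}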

In any case, make sure your directory layout looks like this:

./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/WordCount.scala


Check the sample code I wrote below:

package com.spark.app

import org.scalatra._
import org.apache.spark.{ SparkContext, SparkConf }

class MySparkAppServlet extends MySparkAppStack {

  get("/wc") {
    val inputFile = "/home/limitless/Documents/projects/test/my-spark-app/README.md"
    val outputFile = "/home/limitless/Documents/projects/test/my-spark-app/README.txt"
    val conf = new SparkConf().setAppName("wordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val input = sc.textFile(inputFile)
    val words = input.flatMap(line => line.split(" "))
    val counts = words.map(word => (word, 1)).reduceByKey { case (x, y) => x + y }
    counts.saveAsTextFile(outputFile)
  }

}
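In case it helps: a Scalatra servlet still has to be mounted before GET /wc will reach it. A minimal ScalatraBootstrap sketch, assuming a standard Scalatra sbt project (the MySparkAppStack trait comes from the project template, not from Spark), placed in src/main/scala/ScalatraBootstrap.scala:

import javax.servlet.ServletContext
import org.scalatra._
import com.spark.app.MySparkAppServlet

class ScalatraBootstrap extends LifeCycle {
  // Mount the servlet at the root so GET /wc triggers the word count job.
  override def init(context: ServletContext) {
    context.mount(new MySparkAppServlet, "/*")
  }
}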