tgz spark org scala apache-spark sbt sbt-assembly spark-structured-streaming

scala - org - ¿Por qué falla la aplicación Spark con la "ClassNotFoundException: no se pudo encontrar la fuente de datos: kafka" como uber-jar con el ensamblado sbt?



spark apache org download (7)

Esto es a la vista de la respuesta de Jacek Laskowski.

Aquellos de ustedes que construyen su proyecto en maven pueden probar esto. Agregue la línea mencionada a continuación a su plug-in de maven-shade.

META-INF / services / org.apache.spark.sql.sources.DataSourceRegister

He puesto el código del complemento para el archivo pom como ejemplo para mostrar dónde agregar la línea.

<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource> META-INF/services/org.apache.spark.sql.sources.DataSourceRegister </resource> </transformer> </transformers> <finalName>${project.artifactId}-${project.version}-uber</finalName> </configuration> </execution> </executions> </plugin>

Por favor, disculpe mis habilidades de formateo.

Estoy intentando ejecutar un ejemplo como https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala . Comencé con la guía de programación de Spark Structured Streaming en http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html .

Mi codigo es

package io.boontadata.spark.job1 import org.apache.spark.sql.SparkSession object DirectKafkaAggregateEvents { val FIELD_MESSAGE_ID = 0 val FIELD_DEVICE_ID = 1 val FIELD_TIMESTAMP = 2 val FIELD_CATEGORY = 3 val FIELD_MEASURE1 = 4 val FIELD_MEASURE2 = 5 def main(args: Array[String]) { if (args.length < 3) { System.err.println(s""" |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics> | <brokers> is a list of one or more Kafka brokers | <subscribeType> sample value: subscribe | <topics> is a list of one or more kafka topics to consume from | """.stripMargin) System.exit(1) } val Array(bootstrapServers, subscribeType, topics) = args val spark = SparkSession .builder .appName("boontadata-spark-job1") .getOrCreate() import spark.implicits._ // Create DataSet representing the stream of input lines from kafka val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option(subscribeType, topics) .load() .selectExpr("CAST(value AS STRING)") .as[String] // Generate running word count val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count() // Start running the query that prints the running counts to the console val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination() } }

Agregué los siguientes archivos sbt:

build.sbt:

name := "boontadata-spark-job1" version := "0.1" scalaVersion := "2.11.7" libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided" libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided" libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided" libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2" libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2" libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1" libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1" // META-INF discarding assemblyMergeStrategy in assembly := { { case PathList("META-INF", xs @ _*) => MergeStrategy.discard case x => MergeStrategy.first } }

También agregué project / assembly.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Esto crea un tarro de Uber con los tarros no provided .

Lo presento con la siguiente línea:

spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic

pero me sale este error de tiempo de ejecución:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148) at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79) at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79) at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80) at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124) at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41) at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132) at scala.util.Try.orElse(Try.scala:84) at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132) ... 18 more 16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook

¿Hay alguna manera de saber qué clase no se encuentra para poder buscar el repositorio de maven.org para esa clase?

El código fuente de lookupDataSource parece estar en la línea 543 en https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala lookupDataSource https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala pero no pude encontrar un enlace directo con la fuente de datos Kafka ...

El código fuente completo está aquí: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f


El problema es la siguiente sección en build.sbt :

// META-INF discarding assemblyMergeStrategy in assembly := { { case PathList("META-INF", xs @ _*) => MergeStrategy.discard case x => MergeStrategy.first } }

Dice que todos META-INF registros META-INF deben descartarse, incluido el "código" que hace que los alias de origen de datos (por ejemplo, kafka ) funcionen.

Pero los archivos META-INF son muy importantes para que kafka (y otros alias de fuentes de datos de transmisión).

Para que el alias kafka funcione, Spark SQL utiliza META-INF/services/org.apache.spark.sql.sources.DataSourceRegister con la siguiente entrada:

org.apache.spark.sql.kafka010.KafkaSourceProvider

KafkaSourceProvider es responsable de registrar el alias de kafka con la fuente de datos de transmisión adecuada, es decir, KafkaSource .

Solo para verificar que el código real esté realmente disponible, pero el "código" que hace que se registre el alias no lo está, puede usar la fuente de datos kafka con el nombre completo (no el alias) de la siguiente manera:

spark.readStream. format("org.apache.spark.sql.kafka010.KafkaSourceProvider"). load

Verá otros problemas debido a las opciones que faltan, como kafka.bootstrap.servers , pero ... estamos divagando .

Una solución es para MergeStrategy.concat todos los META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (que crearía un uber-jar con todas las fuentes de datos, incluida la fuente de datos kafka ).

case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat


En mi caso, también obtuve este error al compilar con sbt, y la causa fue que el sbt assembly no incluía el artefacto spark-sql-kafka-0-10_2.11 como parte del spark-sql-kafka-0-10_2.11 de grasa.

(Me gustaría recibir comentarios aquí. La dependencia no se especificó como un alcance, por lo que no se debe asumir que se "proporciona").

Así que cambié a implementar un jar normal (delgado) e --jars las dependencias con los parámetros --jars para enviar con chispa.

Para reunir todas las dependencias en un solo lugar, puede agregar retrieveManaged := true a la configuración de su proyecto sbt, o puede, en la consola sbt, emitir:

> set retrieveManaged := true > package

Eso debería traer todas las dependencias a la carpeta lib_managed .

Luego puedes copiar todos esos archivos (con un comando bash puedes, por ejemplo, usar algo como esto

cd /path/to/your/project JARLIST=$(find lib_managed -name ''*.jar''| paste -sd , -) spark-submit [other-args] target/your-app-1.0-SNAPSHOT.jar --jars "$JARLIST"


Estoy usando gradle como herramienta de compilación y el complemento shadowJar para crear el uberJar. La solución fue simplemente agregar un archivo.

src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

al proyecto.

En este archivo debe colocar, línea por línea, los nombres de clase de los DataSources que usa, en este caso serían org.apache.spark.sql.kafka010.KafkaSourceProvider (encuentre el nombre de la clase, por ejemplo, META-INF/services/org.apache.spark.sql.sources.DataSourceRegister )

La razón es que Spark utiliza el ServiceLoader Java en sus mecanismos internos de administración de dependencias.

Ejemplo completo here .


Estoy usando la chispa 2.1 y enfrentando el mismo problema, mi solución es

1) spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

2) cd ~/.ivy2/jars aquí estás, todos los tarros necesarios están en esta carpeta ahora

3) copie todos los archivos en esta carpeta a todos los nodos (puede crear una carpeta específica que los contenga)

4) agregue el nombre de la carpeta a spark.driver.extraClassPath y spark.driver.extraClassPath , por ejemplo, spark.driver.extraClassPath=/opt/jars/*:your_other_jars

5 spark-submit --class ClassNm --Other-Options YourJar.jar funciona bien ahora


Intenté así que a mí me funciona. Envíe de esta forma y avíseme cuando tenga algún problema.

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar


Lo resolví descargando el archivo jar al sistema del controlador. A partir de ahí, proporcioné el frasco para activar el envío con la opción --jar.

También se debe tener en cuenta que estaba empaquetando todo el entorno de la chispa 2.1 en mi uber jar (ya que mi grupo todavía está en 1.6.1) Por alguna razón, no se ha detectado cuando se incluye en uber jar.

spark-submit --jar /ur/path/spark-sql-kafka-0-10_2.11:2.1.0 --class ClassNm --Other-Options YourJar.jar