scala - org - ¿Por qué falla la aplicación Spark con la "ClassNotFoundException: no se pudo encontrar la fuente de datos: kafka" como uber-jar con el ensamblado sbt?
spark apache org download (7)
Esto es a la vista de la respuesta de Jacek Laskowski.
Aquellos de ustedes que construyen su proyecto en maven pueden probar esto. Agregue la línea mencionada a continuación a su plug-in de maven-shade.
META-INF / services / org.apache.spark.sql.sources.DataSourceRegister
He puesto el código del complemento para el archivo pom como ejemplo para mostrar dónde agregar la línea.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
</resource>
</transformer>
</transformers>
<finalName>${project.artifactId}-${project.version}-uber</finalName>
</configuration>
</execution>
</executions>
</plugin>
Por favor, disculpe mis habilidades de formateo.
Estoy intentando ejecutar un ejemplo como https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala . Comencé con la guía de programación de Spark Structured Streaming en http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html .
Mi codigo es
package io.boontadata.spark.job1
import org.apache.spark.sql.SparkSession
object DirectKafkaAggregateEvents {
val FIELD_MESSAGE_ID = 0
val FIELD_DEVICE_ID = 1
val FIELD_TIMESTAMP = 2
val FIELD_CATEGORY = 3
val FIELD_MEASURE1 = 4
val FIELD_MEASURE2 = 5
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(s"""
|Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
| <brokers> is a list of one or more Kafka brokers
| <subscribeType> sample value: subscribe
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
val Array(bootstrapServers, subscribeType, topics) = args
val spark = SparkSession
.builder
.appName("boontadata-spark-job1")
.getOrCreate()
import spark.implicits._
// Create DataSet representing the stream of input lines from kafka
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
// Generate running word count
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
}
Agregué los siguientes archivos sbt:
build.sbt:
name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"
// META-INF discarding
assemblyMergeStrategy in assembly := {
{
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
}
También agregué project / assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
Esto crea un tarro de Uber con los tarros no provided
.
Lo presento con la siguiente línea:
spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic
pero me sale este error de tiempo de ejecución:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook
¿Hay alguna manera de saber qué clase no se encuentra para poder buscar el repositorio de maven.org para esa clase?
El código fuente de lookupDataSource
parece estar en la línea 543 en https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala lookupDataSource
https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala pero no pude encontrar un enlace directo con la fuente de datos Kafka ...
El código fuente completo está aquí: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f
El problema es la siguiente sección en build.sbt
:
// META-INF discarding
assemblyMergeStrategy in assembly := {
{
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
}
Dice que todos META-INF
registros META-INF
deben descartarse, incluido el "código" que hace que los alias de origen de datos (por ejemplo, kafka
) funcionen.
Pero los archivos META-INF
son muy importantes para que kafka
(y otros alias de fuentes de datos de transmisión).
Para que el alias kafka
funcione, Spark SQL utiliza META-INF/services/org.apache.spark.sql.sources.DataSourceRegister con la siguiente entrada:
org.apache.spark.sql.kafka010.KafkaSourceProvider
KafkaSourceProvider
es responsable de registrar el alias de kafka
con la fuente de datos de transmisión adecuada, es decir, KafkaSource .
Solo para verificar que el código real esté realmente disponible, pero el "código" que hace que se registre el alias no lo está, puede usar la fuente de datos kafka
con el nombre completo (no el alias) de la siguiente manera:
spark.readStream.
format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
load
Verá otros problemas debido a las opciones que faltan, como kafka.bootstrap.servers
, pero ... estamos divagando .
Una solución es para MergeStrategy.concat
todos los META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
(que crearía un uber-jar con todas las fuentes de datos, incluida la fuente de datos kafka
).
case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
En mi caso, también obtuve este error al compilar con sbt, y la causa fue que el sbt assembly
no incluía el artefacto spark-sql-kafka-0-10_2.11
como parte del spark-sql-kafka-0-10_2.11
de grasa.
(Me gustaría recibir comentarios aquí. La dependencia no se especificó como un alcance, por lo que no se debe asumir que se "proporciona").
Así que cambié a implementar un jar normal (delgado) e --jars
las dependencias con los parámetros --jars
para enviar con chispa.
Para reunir todas las dependencias en un solo lugar, puede agregar retrieveManaged := true
a la configuración de su proyecto sbt, o puede, en la consola sbt, emitir:
> set retrieveManaged := true
> package
Eso debería traer todas las dependencias a la carpeta lib_managed
.
Luego puedes copiar todos esos archivos (con un comando bash puedes, por ejemplo, usar algo como esto
cd /path/to/your/project
JARLIST=$(find lib_managed -name ''*.jar''| paste -sd , -)
spark-submit [other-args] target/your-app-1.0-SNAPSHOT.jar --jars "$JARLIST"
Estoy usando gradle como herramienta de compilación y el complemento shadowJar para crear el uberJar. La solución fue simplemente agregar un archivo.
src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
al proyecto.
En este archivo debe colocar, línea por línea, los nombres de clase de los DataSources que usa, en este caso serían org.apache.spark.sql.kafka010.KafkaSourceProvider
(encuentre el nombre de la clase, por ejemplo, META-INF/services/org.apache.spark.sql.sources.DataSourceRegister )
La razón es que Spark utiliza el ServiceLoader Java en sus mecanismos internos de administración de dependencias.
Ejemplo completo here .
Estoy usando la chispa 2.1 y enfrentando el mismo problema, mi solución es
1) spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
2) cd ~/.ivy2/jars
aquí estás, todos los tarros necesarios están en esta carpeta ahora
3) copie todos los archivos en esta carpeta a todos los nodos (puede crear una carpeta específica que los contenga)
4) agregue el nombre de la carpeta a spark.driver.extraClassPath
y spark.driver.extraClassPath
, por ejemplo, spark.driver.extraClassPath=/opt/jars/*:your_other_jars
5 spark-submit --class ClassNm --Other-Options YourJar.jar
funciona bien ahora
Intenté así que a mí me funciona. Envíe de esta forma y avíseme cuando tenga algún problema.
./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar
Lo resolví descargando el archivo jar al sistema del controlador. A partir de ahí, proporcioné el frasco para activar el envío con la opción --jar.
También se debe tener en cuenta que estaba empaquetando todo el entorno de la chispa 2.1 en mi uber jar (ya que mi grupo todavía está en 1.6.1) Por alguna razón, no se ha detectado cuando se incluye en uber jar.
spark-submit --jar /ur/path/spark-sql-kafka-0-10_2.11:2.1.0 --class ClassNm --Other-Options YourJar.jar