java - sparksession - spark core last version
Resolver problemas de dependencia en Apache Spark (7)
Además de la muy extensa respuesta ya dada por el usuario 7337271, si el problema resulta de la falta de dependencias externas, puede construir un jar con sus dependencias con, por ejemplo , el complemento de ensamblaje maven
En ese caso, asegúrese de marcar todas las dependencias centrales de chispa como "provistas" en su sistema de compilación y, como ya se señaló, asegúrese de que se correlacionen con su versión de chispa de tiempo de ejecución.
Los problemas comunes al construir e implementar aplicaciones Spark son:
-
java.lang.ClassNotFoundException
. -
object x is not a member of package y
errores de compilaciónobject x is not a member of package y
. -
java.lang.NoSuchMethodError
¿Cómo se pueden resolver?
Agregue todos los archivos jar de spark-2.4.0-bin-hadoop2.7 / spark-2.4.0-bin-hadoop2.7 / jars en el proyecto. El spark-2.4.0-bin-hadoop2.7 se puede descargar desde https://spark.apache.org/downloads.html
Al compilar e implementar aplicaciones de Spark, todas las dependencias requieren versiones compatibles.
-
Versión Scala . Todos los paquetes deben usar la misma versión Scala principal (2.10, 2.11, 2.12).
Considere seguir (incorrecto)
build.sbt
:name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" )
Utilizamos
spark-streaming
porspark-streaming
para Scala 2.10, mientras que los paquetes restantes son para Scala 2.11. Un archivo válido podría sername := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.11" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" )
pero es mejor especificar la versión globalmente y usar
%%
:name := "Simple Project" version := "1.0" scalaVersion := "2.11.7" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.0.1", "org.apache.spark" %% "spark-streaming" % "2.0.1", "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1" )
Del mismo modo en Maven:
<project> <groupId>com.example</groupId> <artifactId>simple-project</artifactId> <modelVersion>4.0.0</modelVersion> <name>Simple Project</name> <packaging>jar</packaging> <version>1.0</version> <properties> <spark.version>2.0.1</spark.version> </properties> <dependencies> <dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.bahir</groupId> <artifactId>spark-streaming-twitter_2.11</artifactId> <version>${spark.version}</version> </dependency> </dependencies> </project>
-
Versión de Spark Todos los paquetes deben usar la misma versión principal de Spark (1.6, 2.0, 2.1, ...).
Considere seguir (incorrecto) build.sbt:
name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "1.6.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" )
Usamos
spark-core
1.6 mientras que los componentes restantes están en Spark 2.0. Un archivo válido podría sername := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" )
pero es mejor usar una variable:
name := "Simple Project" version := "1.0" val sparkVersion = "2.0.1" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % sparkVersion, "org.apache.spark" % "spark-streaming_2.10" % sparkVersion, "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion )
Del mismo modo en Maven:
<project> <groupId>com.example</groupId> <artifactId>simple-project</artifactId> <modelVersion>4.0.0</modelVersion> <name>Simple Project</name> <packaging>jar</packaging> <version>1.0</version> <properties> <spark.version>2.0.1</spark.version> <scala.version>2.11</scala.version> </properties> <dependencies> <dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.bahir</groupId> <artifactId>spark-streaming-twitter_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> </dependencies> </project>
-
La versión de Spark utilizada en las dependencias de Spark debe coincidir con la versión de Spark de la instalación de Spark. Por ejemplo, si usa 1.6.1 en el clúster, debe usar 1.6.1 para construir jarras. Las versiones menores no coinciden no siempre se aceptan.
-
La versión de Scala utilizada para construir jar tiene que coincidir con la versión de Scala utilizada para construir Spark implementado. Por defecto (binarios descargables y compilaciones predeterminadas):
- Spark 1.x -> Scala 2.10
- Spark 2.x -> Scala 2.11
-
Los paquetes adicionales deben estar accesibles en los nodos de trabajo si se incluyen en el frasco gordo. Hay varias opciones que incluyen:
-
--jars
argumento de--jars
paraspark-submit
- para distribuir archivosjar
locales. -
--packages
argumento de--packages
paraspark-submit
- para buscar dependencias del repositorio de Maven.
Al enviar en el nodo del clúster, debe incluir el
jar
aplicación en--jars
. -
Creo que este problema debe resolver un complemento de ensamblaje. Necesitas construir un tarro gordo. Por ejemplo en sbt:
-
agregue el archivo
$PROJECT_ROOT/project/assembly.sbt
con el códigoaddSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
-
to build.sbt
added some libraries
libraryDependencies ++ = Seq ("com.some.company" %% "some-lib"% "1.0.0") ` - en la consola sbt, ingrese "ensamblaje" e implemente el jar de ensamblaje
Si necesita más información, vaya a https://github.com/sbt/sbt-assembly
El classpath de Apache Spark está construido dinámicamente (para acomodar el código de usuario por aplicación) lo que lo hace vulnerable a tales problemas. La respuesta de es correcta, pero hay algunas preocupaciones más, dependiendo del administrador de clúster ("maestro") que esté utilizando.
Primero, una aplicación Spark consta de estos componentes (cada uno es una JVM separada, por lo tanto, potencialmente contiene diferentes clases en su classpath):
-
Controlador
: esa es
su
aplicación que crea una
SparkSession
(oSparkContext
) y se conecta a un administrador de clúster para realizar el trabajo real - Administrador de clústeres : sirve como un "punto de entrada" al clúster, a cargo de asignar ejecutores para cada aplicación. Hay varios tipos diferentes admitidos en Spark: independiente, YARN y Mesos, que describiremos a continuación.
- Ejecutores : estos son los procesos en los nodos del clúster, que realizan el trabajo real (ejecución de tareas de Spark)
La relación entre estos se describe en este diagrama de la descripción general del modo de clúster de Apache Spark:
Ahora, ¿ qué clases deberían residir en cada uno de estos componentes?
Esto se puede responder con el siguiente diagrama:
Analicemos eso lentamente:
-
Spark Code son las bibliotecas de Spark. Deben existir en los tres componentes, ya que incluyen el pegamento que permite a Spark realizar la comunicación entre ellos. Por cierto, los autores de Spark tomaron la decisión de diseño de incluir código para TODOS los componentes en TODOS los componentes (por ejemplo, incluir código que solo debería ejecutarse en Executor en el controlador también) para simplificar esto, por lo que el "tarro gordo" de Spark (en versiones hasta 1.6 ) o "archivo" (en 2.0, detalles a continuación) contienen el código necesario para todos los componentes y deben estar disponibles en todos ellos.
-
Código de solo controlador: este es un código de usuario que no incluye nada que deba usarse en los Ejecutores, es decir, código que no se usa en ninguna transformación en el RDD / DataFrame / Dataset. Esto no necesariamente tiene que estar separado del código de usuario distribuido, pero puede serlo.
-
Código distribuido: este es un código de usuario que se compila con el código del controlador, pero que también debe ejecutarse en los Ejecutores; todo lo que usan las transformaciones reales debe incluirse en este (s) jar (s).
Ahora que lo entendimos bien, ¿cómo hacemos que las clases se carguen correctamente en cada componente y qué reglas deben seguir?
-
Código de Spark : como indican las respuestas anteriores, debe usar las mismas versiones de Scala y Spark en todos los componentes.
1.1 En el modo independiente , hay una instalación de Spark "preexistente" a la que se pueden conectar las aplicaciones (controladores). Eso significa que todos los controladores deben usar la misma versión de Spark que se ejecuta en el maestro y los ejecutores.
1.2 En YARN / Mesos , cada aplicación puede usar una versión diferente de Spark, pero todos los componentes de la misma aplicación deben usar la misma. Eso significa que si usó la versión X para compilar y empaquetar su aplicación de controlador, debe proporcionar la misma versión al iniciar SparkSession (por ejemplo, a través de los parámetros
spark.yarn.archive
ospark.yarn.jars
cuando usa YARN). Los jars / archive que proporcione deben incluir todas las dependencias de Spark ( incluidas las dependencias transitivas ), y el administrador del clúster lo enviará a cada ejecutor cuando se inicie la aplicación. -
Código del controlador : eso depende totalmente: el código del controlador se puede enviar como un montón de frascos o un "tarro gordo", siempre que incluya todas las dependencias de Spark + todo el código de usuario
-
Código distribuido : además de estar presente en el controlador, este código debe enviarse a los ejecutores (nuevamente, junto con todas sus dependencias transitivas). Esto se hace usando el parámetro
spark.jars
.
Para resumir , aquí hay un enfoque sugerido para construir e implementar una aplicación Spark (en este caso, usando YARN):
- Cree una biblioteca con su código distribuido, empaquételo como un jar "regular" (con un archivo .pom que describa sus dependencias) y como un "jar gordo" (con todas sus dependencias transitivas incluidas).
- Cree una aplicación de controlador, con dependencias de compilación en su biblioteca de códigos distribuidos y en Apache Spark (con una versión específica)
- Empaquete la aplicación del controlador en un tarro gordo para implementarlo en el controlador
-
Pase la versión correcta de su código distribuido como el valor del parámetro
spark.jars
al iniciarSparkSession
-
Pase la ubicación de un archivo de almacenamiento (por ejemplo, gzip) que contenga todos los archivos
spark.yarn.archive
carpetalib/
de los archivos binarios de Spark descargados como el valor despark.yarn.archive
Las clases de dependencia de su aplicación se especificarán en la opción de aplicación-jar de su comando de inicio.
Se pueden encontrar más detalles en la documentación de Spark
Tomado de la documentación:
application-jar: ruta a un jar incluido que incluye su aplicación y todas las dependencias. La URL debe ser visible globalmente dentro de su clúster, por ejemplo, una ruta hdfs: // o una ruta file: // que esté presente en todos los nodos
Tengo el siguiente build.sbt
lazy val root = (project in file(".")).
settings(
name := "spark-samples",
version := "1.0",
scalaVersion := "2.11.12",
mainClass in Compile := Some("StreamingExample")
)
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.0",
"org.apache.spark" %% "spark-streaming" % "2.4.0",
"org.apache.spark" %% "spark-sql" % "2.4.0",
"com.couchbase.client" %% "spark-connector" % "2.2.0"
)
// META-INF discarding
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
He creado un tarro gordo de mi aplicación usando el complemento de ensamblaje sbt, pero cuando se ejecuta usando spark-submit falla con el error:
java.lang.NoClassDefFoundError: rx/Completable$OnSubscribe
at com.couchbase.spark.connection.CouchbaseConnection.streamClient(CouchbaseConnection.scala:154)
Puedo ver que la clase existe en mi tarro gordo:
jar tf target/scala-2.11/spark-samples-assembly-1.0.jar | grep ''Completable$OnSubscribe''
rx/Completable$OnSubscribe.class
No estoy seguro de lo que me estoy perdiendo aquí, ¿alguna pista?