apache-spark spark-cassandra-connector google-cloud-dataproc

apache spark - Guava version while using Spark




I'm trying to use the spark-cassandra-connector via spark-shell on Dataproc, but I can't connect to my cluster. There appears to be a version mismatch: the classpath picks up a much older Guava version from somewhere else, even when I specify the correct version at startup. I suspect this is caused by all the Hadoop dependencies that are put on the classpath by default.

Is there a way to have spark-shell use only the proper version of Guava, without getting rid of all the Hadoop-related jars that Dataproc includes?

Relevant data:

Starting spark-shell, showing that it resolves the correct version of Guava:

$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3

:: loading settings :: url = jar:file:/usr/lib/spark/lib/spark-assembly-1.5.2-hadoop2.7.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.datastax.spark#spark-cassandra-connector_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 in central
    found org.apache.cassandra#cassandra-clientutil;2.2.2 in central
    found com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 in central
    found io.netty#netty-handler;4.0.27.Final in central
    found io.netty#netty-buffer;4.0.27.Final in central
    found io.netty#netty-common;4.0.27.Final in central
    found io.netty#netty-transport;4.0.27.Final in central
    found io.netty#netty-codec;4.0.27.Final in central
    found com.codahale.metrics#metrics-core;3.0.2 in central
    found org.slf4j#slf4j-api;1.7.5 in central
    found org.apache.commons#commons-lang3;3.3.2 in central
    found com.google.guava#guava;16.0.1 in central
    found org.joda#joda-convert;1.2 in central
    found joda-time#joda-time;2.3 in central
    found com.twitter#jsr166e;1.1.0 in central
    found org.scala-lang#scala-reflect;2.10.5 in central
:: resolution report :: resolve 502ms :: artifacts dl 10ms
    :: modules in use:
    com.codahale.metrics#metrics-core;3.0.2 from central in [default]
    com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 from central in [default]
    com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 from central in [default]
    com.google.guava#guava;16.0.1 from central in [default]
    com.twitter#jsr166e;1.1.0 from central in [default]
    io.netty#netty-buffer;4.0.27.Final from central in [default]
    io.netty#netty-codec;4.0.27.Final from central in [default]
    io.netty#netty-common;4.0.27.Final from central in [default]
    io.netty#netty-handler;4.0.27.Final from central in [default]
    io.netty#netty-transport;4.0.27.Final from central in [default]
    joda-time#joda-time;2.3 from central in [default]
    org.apache.cassandra#cassandra-clientutil;2.2.2 from central in [default]
    org.apache.commons#commons-lang3;3.3.2 from central in [default]
    org.joda#joda-convert;1.2 from central in [default]
    org.scala-lang#scala-reflect;2.10.5 from central in [default]
    org.slf4j#slf4j-api;1.7.5 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   16  |   0   |   0   |   0   ||   16  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 16 already retrieved (0kB/12ms)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.8.0_66-internal)
Type in expressions to have them evaluated.
Type :help for more information.
15/12/10 17:38:46 WARN org.apache.spark.metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Spark context available as sc.
ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
15/12/10 17:38:54 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/12/10 17:38:54 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
SQL context available as sqlContext.

Stack trace when making the initial connection:

java.io.IOException: Failed to open native connection to Cassandra at {10.240.0.7}:9042
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:51)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:146)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1125)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
    at $iwC$$iwC$$iwC.<init>(<console>:53)
    at $iwC$$iwC.<init>(<console>:55)
    at $iwC.<init>(<console>:57)
    at <init>(<console>:59)
    at .<init>(<console>:63)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:825)
    at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
    at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
    at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
    at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
    at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:809)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback(Lcom/google/common/util/concurrent/ListenableFuture;Lcom/google/common/util/concurrent/FutureFallback;Ljava/util/concurrent/Executor;)Lcom/google/common/util/concurrent/ListenableFuture;
    at com.datastax.driver.core.Connection.initAsync(Connection.java:178)
    at com.datastax.driver.core.Connection$Factory.open(Connection.java:742)
    at com.datastax.driver.core.ControlConnection.tryConnect(ControlConnection.java:240)
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:187)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1393)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:402)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155)
    ... 70 more



Unfortunately, Hadoop's dependency on Guava 11 (which lacks the Futures.withFallback method) is a long-standing problem, and indeed Hadoop 2.7.1 still depends on Guava 11.
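One quick way to confirm which Guava the JVM actually resolves is to ask the class loader where Futures came from and whether the three-argument withFallback overload is visible. The following is a minimal sketch and not part of the original answer: the class ClasspathProbe and its helper names are hypothetical, and it assumes Java 8 or later and that it is run with the same classpath as the Spark driver.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

// Hypothetical diagnostic helper; the names here are illustrative only.
public class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or a note if it cannot be determined. */
    public static String locationOf(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source.
            return src == null ? "(bootstrap / unknown source)" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    /** True if the class exposes a public method with this name and parameter count. */
    public static boolean hasMethod(String className, String methodName, int paramCount) {
        try {
            for (Method m : Class.forName(className).getMethods()) {
                if (m.getName().equals(methodName) && m.getParameterCount() == paramCount) {
                    return true;
                }
            }
        } catch (ClassNotFoundException | LinkageError e) {
            // Class is missing or failed to link: treat the method as absent.
        }
        return false;
    }

    public static void main(String[] args) {
        String futures = "com.google.common.util.concurrent.Futures";
        System.out.println("Futures loaded from: " + locationOf(futures));
        System.out.println("withFallback(future, fallback, executor) present: "
                + hasMethod(futures, "withFallback", 3));
    }
}
```

If the first line points at a Hadoop-provided Guava 11 jar and the second reports false, that would be consistent with the NoSuchMethodError in the stack trace above.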

Spark core uses Guava 14, but it works around the conflict by shading Guava inside the Spark assembly:

$ jar tf /usr/lib/spark/lib/spark-assembly.jar | grep concurrent.Futures
org/spark-project/guava/util/concurrent/Futures$1.class
org/spark-project/guava/util/concurrent/Futures$2.class
org/spark-project/guava/util/concurrent/Futures$3.class
org/spark-project/guava/util/concurrent/Futures$4.class
org/spark-project/guava/util/concurrent/Futures$5.class
org/spark-project/guava/util/concurrent/Futures$6.class
org/spark-project/guava/util/concurrent/Futures$ChainingListenableFuture$1.class
org/spark-project/guava/util/concurrent/Futures$ChainingListenableFuture.class
org/spark-project/guava/util/concurrent/Futures$CombinedFuture$1.class
org/spark-project/guava/util/concurrent/Futures$CombinedFuture$2.class
org/spark-project/guava/util/concurrent/Futures$CombinedFuture.class
org/spark-project/guava/util/concurrent/Futures$FallbackFuture$1$1.class
org/spark-project/guava/util/concurrent/Futures$FallbackFuture$1.class
org/spark-project/guava/util/concurrent/Futures$FallbackFuture.class
org/spark-project/guava/util/concurrent/Futures$FutureCombiner.class
org/spark-project/guava/util/concurrent/Futures$ImmediateCancelledFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateFailedCheckedFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateFailedFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateSuccessfulCheckedFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateSuccessfulFuture.class
org/spark-project/guava/util/concurrent/Futures$MappingCheckedFuture.class
org/spark-project/guava/util/concurrent/Futures.class

$ javap -cp /usr/lib/spark/lib/spark-assembly.jar org.spark-project.guava.util.concurrent.Futures
Compiled from "Futures.java"
public final class org.spark-project.guava.util.concurrent.Futures {
  public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> makeChecked(org.spark-project.guava.util.concurrent.ListenableFuture<V>, com.google.common.base.Function<java.lang.Exception, X>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateFuture(V);
  public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> immediateCheckedFuture(V);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateFailedFuture(java.lang.Throwable);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateCancelledFuture();
  public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> immediateFailedCheckedFuture(X);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> withFallback(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>, org.spark-project.guava.util.concurrent.FutureFallback<? extends V>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> withFallback(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>, org.spark-project.guava.util.concurrent.FutureFallback<? extends V>, java.util.concurrent.Executor);
  public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, org.spark-project.guava.util.concurrent.AsyncFunction<? super I, ? extends O>);
  public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, org.spark-project.guava.util.concurrent.AsyncFunction<? super I, ? extends O>, java.util.concurrent.Executor);
  public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, com.google.common.base.Function<? super I, ? extends O>);
  public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, com.google.common.base.Function<? super I, ? extends O>, java.util.concurrent.Executor);
  public static <I, O> java.util.concurrent.Future<O> lazyTransform(java.util.concurrent.Future<I>, com.google.common.base.Function<? super I, ? extends O>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> dereference(org.spark-project.guava.util.concurrent.ListenableFuture<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> allAsList(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>...);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> allAsList(java.lang.Iterable<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> successfulAsList(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>...);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> successfulAsList(java.lang.Iterable<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
  public static <V> void addCallback(org.spark-project.guava.util.concurrent.ListenableFuture<V>, org.spark-project.guava.util.concurrent.FutureCallback<? super V>);
  public static <V> void addCallback(org.spark-project.guava.util.concurrent.ListenableFuture<V>, org.spark-project.guava.util.concurrent.FutureCallback<? super V>, java.util.concurrent.Executor);
  public static <V, X extends java.lang.Exception> V get(java.util.concurrent.Future<V>, java.lang.Class<X>) throws X;
  public static <V, X extends java.lang.Exception> V get(java.util.concurrent.Future<V>, long, java.util.concurrent.TimeUnit, java.lang.Class<X>) throws X;
  public static <V> V getUnchecked(java.util.concurrent.Future<V>);
  static {};
}

You can follow the instructions at http://arjon.es/2015/10/12/making-hadoop-2-dot-6-plus-spark-cassandra-driver-play-nice-together/ to apply the same shading at build time. With spark-shell you may be able to get away with some changes to spark.driver.extraClassPath, although collisions may still crop up at various points.


If you are using Gradle to build a fat jar, you can use relocate in your shadowJar:

shadowJar {
    zip64 true
    mergeServiceFiles()
    relocate 'com.google', 'hidden.google'
}