sparkcontext spark que org introduccion examples ejemplo datos batch apache-spark

que - ¿Cómo puedo ejecutar un shell Apache Spark de forma remota?



spark batch (3)

Tengo una configuración de clúster Spark con un maestro y 3 trabajadores. También tengo Spark instalado en una máquina virtual CentOS. Estoy intentando ejecutar un shell de Spark desde mi máquina virtual local que se conectaría al maestro y me permitiría ejecutar código Scala simple. Entonces, aquí está el comando que ejecuto en mi máquina virtual local:

bin/spark-shell --master spark://spark01:7077

El shell se ejecuta hasta el punto donde puedo ingresar el código Scala. Dice que a los ejecutores se les ha otorgado (x3 - uno por cada trabajador). Si miro la interfaz de usuario del Maestro, puedo ver una aplicación en ejecución, Spark shell . Todos los trabajadores están VIVOS, tienen 2/2 núcleos utilizados y han asignado 512 MB (de 5 GB) a la aplicación. Entonces, trato de ejecutar el siguiente código de Scala:

sc.parallelize(1 to 100).count

Desafortunadamente, el comando no funciona. La cáscara simplemente imprimirá la misma advertencia sin fin:

INFO SparkContext: Starting job: count at <console>:13 INFO DAGScheduler: Got job 0 (count at <console>:13) with 2 output partitions (allowLocal=false) INFO DAGScheduler: Final stage: Stage 0(count at <console>:13) with 2 output partitions (allowLocal=false) INFO DAGScheduler: Parents of final stage: List() INFO DAGScheduler: Missing parents: List() INFO DAGScheduler: Submitting Stage 0 (Parallel CollectionRDD[0] at parallelize at <console>:13), which has no missing parents INFO DAGScheduler: Submitting 2 missing tasts from Stage 0 (ParallelCollectionRDD[0] at parallelize at <console>:13) INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

Después de mi investigación sobre el problema, he confirmado que la URL maestra que estoy usando es idéntica a la de la interfaz de usuario web. Puedo hacer ping y ssh en ambos sentidos (agrupar a máquina virtual local, y viceversa). Además, he jugado con el parámetro de memoria de ejecutor (tanto el aumento como la disminución de la memoria) en vano. Finalmente, intenté deshabilitar el firewall (iptables) en ambos lados, pero sigo recibiendo el mismo error. Estoy usando Spark 1.0.2.

TL; DR ¿Es posible ejecutar un shell de Apache Spark de forma remota (y de forma inherente enviar aplicaciones de forma remota)? Si es así, ¿qué me estoy perdiendo?

EDITAR: Eché un vistazo a los registros de trabajadores y descubrí que los trabajadores tenían problemas para encontrar a Spark:

ERROR org.apache.spark.deploy.worker.ExecutorRunner: Error running executor java.io.IOException: Cannot run program "/usr/bin/spark-1.0.2/bin/compute-classpath.sh" (in directory "."): error=2, No such file or directory ...

Spark se instala en un directorio diferente en mi máquina virtual local que en el clúster. La ruta que el trabajador está tratando de encontrar es la de mi máquina virtual local. ¿Hay alguna manera de especificar este camino? ¿O deben ser idénticos en todas partes?

Por el momento, ajusté mis directorios para evitar este error. Ahora, mi Spark Shell falla antes de que tenga la oportunidad de ingresar el comando de conteo (el Master removed our application: FAILED ). Todos los trabajadores tienen el mismo error:

ERROR akka.remote.EndpointWriter: AssociationError [akka.tcp://sparkWorker@spark02:7078] -> [akka.tcp://sparkExecutor@spark02:53633]: Error [Association failed with [akka.tcp://sparkExecutor@spark02:53633]] [akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@spark02:53633] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$annon2: Connection refused: spark02/192.168.64.2:53633

Como se sospecha, estoy teniendo problemas de red. ¿Qué debo mirar ahora?


Este problema puede ser causado por la configuración de la red. Parece que el error TaskSchedulerImpl: Initial job has not accepted any resources puede tener varias causas (vea también esta respuesta ):

  • escasez real de recursos
  • Comunicación rota entre maestro y trabajadores.
  • Comunicación rota entre maestro / trabajadores y conductor.

La forma más sencilla de excluir las primeras posibilidades es ejecutar una prueba con un shell Spark ejecutándose directamente en el maestro. Si esto funciona, la comunicación del clúster dentro del propio clúster está bien y el problema se debe a la comunicación con el host del controlador. Para analizar más a fondo el problema, es útil examinar los registros de los trabajadores, que contienen entradas como

16/08/14 09:21:52 INFO ExecutorRunner: Launch command: "/usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java" ... "--driver-url" "spark://[email protected]:37752" ...

y pruebe si el trabajador puede establecer una conexión con el puerto / IP del conductor. Además de los problemas generales del firewall / reenvío de puertos, es posible que el controlador esté vinculado a la interfaz de red incorrecta. En este caso, puede exportar SPARK_LOCAL_IP en el controlador antes de iniciar el shell de Spark para enlazar a una interfaz diferente.

Algunas referencias adicionales:


Resuelvo este problema en mi cliente de chispa y grupo de chispas。

Compruebe su red, el cliente A puede hacer ping a clúster entre sí! Luego agregue la configuración de dos líneas en su spark-env.sh en el cliente A。

primero

export SPARK_MASTER_IP=172.100.102.156 export SPARK_JAR=/usr/spark-1.1.0-bin-hadoop2.4/lib/spark-assembly-1.1.0-hadoop2.4.0.jar

Segundo

Pon a prueba tu cáscara de chispa con el modo cluster


Yo sugeriría escribir un programa simple de Scala o Java haciendo un proyecto en su IDE.

Digamos que ha creado un proyecto llamado "simpleapp" que tiene una estructura de directorios como esta.

simpleapp - src/main/java - org.apache.spark.examples -SimpleApp.java - lib - dependent.jars (you can put all dependent jars inside lib directory) - target - simpleapp.jar (after compiling your source)

Cree objetos SparkConf y SparkContext en su "SimpleApp.java".

SparkConf conf = new SparkConf().setAppName(appName).setMaster("local[2]"); JavaSparkContext sc = new JavaSparkContext(conf);

Crea un archivo JAR usando el siguiente comando. Puede encontrar el archivo SimpleApp.class en la carpeta "target / classes". CD a este directorio.

jar cfve file.jar SimpleApp.class

Coloque este archivo JAR en su proyecto en el directorio de destino. Este archivo JAR contiene la dependencia de su clase SimpleApp al enviar su trabajo a Spark. Ahora ve a tu directorio de chispas. Estoy usando spark-1.4.0-bin-hadoop2.6. Tu cmd se ve así.

spark-1.4.0-bin-hadoop2.6>

Inicie el maestro y el trabajador utilizando los siguientes comandos.

spark-1.4.0-bin-hadoop2.6> ./sbin/start-all.sh

Si esto no funciona, inicie maestro y esclavos por separado.

spark-1.4.0-bin-hadoop2.6> ./sbin/start-master.sh spark-1.4.0-bin-hadoop2.6> ./sbin/start-slaves.sh

Envíe su programa de chispa utilizando Spark Submit. Si tiene una estructura como la que expliqué, pase este argumento en clase.

--class org.apache.spark.examples.SimpleApp

más

--class SimpleApp

Finalmente envíe su programa de chispa.

spark-1.4.0-bin-hadoop2.6>./bin/spark-submit --class SimpleApp --master local[2] /home/hadoopnod/Spark_Java/target/file.jar