
scala - Why does this Spark code throw a NullPointerException?




You cannot use a DataFrame inside a UDF. You need to join productInformation and dict, and apply the UDF logic after the join.
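A minimal sketch of what such a join could look like. To keep it runnable locally, it uses the sample data from the question's example rather than the real tables. The names `w1`, `w2`, `row_id` and `matchCondition`, the `local[*]` master, and the `broadcast` hint (which assumes the dictionary is small) are illustrative assumptions, not part of the original code.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, count, expr, monotonically_increasing_id}

// Assumption: a local session, only so that the sketch runs on its own
val spark = SparkSession.builder().master("local[*]").appName("join-instead-of-udf").getOrCreate()
import spark.implicits._

// Sample data copied from the question's example (stand-ins for the real tables)
val targets = Seq(
  (Seq("Spark", "Wrong", "Something"), Seq("Java", "Grape", "Banana")),
  (Seq("Java", "Scala"), Seq("Scala", "Banana")),
  (Seq(""), Seq("Grape", "Banana")),
  (Seq(""), Seq(""))
).toDF("wordListOne", "wordListTwo")

val dict = Seq(
  ("Spark", "Java", 0.8), ("Scala", "Spark", 0.9), ("Java", "Scala", 0.9),
  ("Apple", "Grape", 0.66), ("Scala", "Apple", -0.1), ("Gine", "Spark", 0.1)
).toDF("first", "second", "similarity")

// Keep only the sufficiently similar pairs; w1/w2 are hypothetical names
// chosen to avoid clashing with SQL function names inside expr(...)
val similar = dict.filter($"similarity" > 0.7).select($"first".as("w1"), $"second".as("w2"))

// Tag every row so that duplicate rows stay distinct after grouping
val tagged = targets.withColumn("row_id", monotonically_increasing_id())

// Same predicate as the UDF in the question, expressed as a join condition
val matchCondition = expr(
  "(array_contains(wordListOne, w1) AND array_contains(wordListTwo, w2)) OR " +
  "(array_contains(wordListTwo, w1) AND array_contains(wordListOne, w2))")

val result = tagged
  .join(broadcast(similar), matchCondition, "left_outer")
  .groupBy($"row_id", $"wordListOne", $"wordListTwo")
  .agg(count($"w1").as("positive_count")) // rows without a match count as 0
  .orderBy($"row_id")

result.show(false)
```

Because the condition is not an equality, Spark cannot use a hash join here, so this only scales while one side (here the filtered dictionary) stays small.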

I have a problem running a Spark application.

Source code:

    // Read tables from HDFS
    val productInformation = spark.table("temp.temp_table1")
    val dict = spark.table("temp.temp_table2")

    // Custom UDF
    val countPositiveSimilarity = udf[Long, Seq[String], Seq[String]]((a, b) =>
      dict.filter(
        (($"first".isin(a: _*) && $"second".isin(b: _*)) ||
         ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7
      ).count
    )

    val result = productInformation.withColumn("positive_count", countPositiveSimilarity($"title", $"internal_category"))

    // Error occurs!
    result.show

Error message:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 54.0 failed 4 times, most recent failure: Lost task 0.3 in stage 54.0 (TID 5887, ip-10-211-220-33.ap-northeast-2.compute.internal, executor 150): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.NullPointerException
      at $anonfun$1.apply(<console>:45)
      at $anonfun$1.apply(<console>:43)
      ... 16 more

    Driver stacktrace:
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
      at scala.Option.foreach(Option.scala:257)
      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
      at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
      at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
      at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
      at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
      at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
      at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
      at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
      at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
      at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
      at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
      at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
      at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
      at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
      at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
      at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
      at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
      ... 48 elided
    Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
      at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      ... 3 more
    Caused by: java.lang.NullPointerException
      at $anonfun$1.apply(<console>:45)
      at $anonfun$1.apply(<console>:43)
      ... 16 more

I have checked whether productInformation and dict contain null values in their columns, but there are none.

Can anyone help me? I am attaching example code to give more detail:

    case class Target(wordListOne: Seq[String], WordListTwo: Seq[String])

    val targetData = Seq(
      Target(Seq("Spark", "Wrong", "Something"), Seq("Java", "Grape", "Banana")),
      Target(Seq("Java", "Scala"), Seq("Scala", "Banana")),
      Target(Seq(""), Seq("Grape", "Banana")),
      Target(Seq(""), Seq("")))
    val targets = spark.createDataset(targetData)

    case class WordSimilarity(first: String, second: String, similarity: Double)

    val similarityData = Seq(
      WordSimilarity("Spark", "Java", 0.8),
      WordSimilarity("Scala", "Spark", 0.9),
      WordSimilarity("Java", "Scala", 0.9),
      WordSimilarity("Apple", "Grape", 0.66),
      WordSimilarity("Scala", "Apple", -0.1),
      WordSimilarity("Gine", "Spark", 0.1))
    val dict = spark.createDataset(similarityData)

    val countPositiveSimilarity = udf[Long, Seq[String], Seq[String]]((a, b) =>
      dict.filter(
        (($"first".isin(a: _*) && $"second".isin(b: _*)) ||
         ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7
      ).count
    )

    val countDF = targets.withColumn("positive_count", countPositiveSimilarity($"wordListOne", $"wordListTwo"))

This is example code, and it is similar to my original code. The example code works fine. What should I check in the original code and data?


Very interesting question. I had to do a bit of searching, and here are my thoughts. I hope this helps a little.

When you create a Dataset through createDataset, Spark assigns this dataset the LocalRelation logical query plan.

    def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
      val enc = encoderFor[T]
      val attributes = enc.schema.toAttributes
      val encoded = data.map(d => enc.toRow(d).copy())
      val plan = new LocalRelation(attributes, encoded)
      Dataset[T](self, plan)
    }

From the linked reference: "LocalRelation is a leaf logical plan that allows functions like collect or take to be executed locally, i.e. without using Spark executors."

And this is true, as the isLocal method indicates:

    /**
     * Returns true if the `collect` and `take` methods can be run locally
     * (without any Spark executors).
     *
     * @group basic
     * @since 1.6.0
     */
    def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]

You can easily verify that your two datasets are local.
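A small sketch of such a check, assuming a local session. The names `localDict` and `notLocal` and the use of `spark.range` as a non-local counterexample are illustrative choices, not part of the original code.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session, only so that the sketch runs on its own
val spark = SparkSession.builder().master("local[*]").appName("is-local-check").getOrCreate()
import spark.implicits._

// Built with createDataset, so its logical plan is a LocalRelation
val localDict = spark.createDataset(Seq(("Spark", "Java", 0.8), ("Scala", "Spark", 0.9)))

// Built from a range, so its logical plan is not a LocalRelation
val notLocal = spark.range(3)

println(s"createDataset: isLocal = ${localDict.isLocal}")
println(s"range:         isLocal = ${notLocal.isLocal}")
```

Given the definition of isLocal quoted above, the first check should report true and the second false. A dataset read with spark.table, as in the original code, would be expected to behave like the second case.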

And the show method actually calls take internally.

    private[sql] def showString(_numRows: Int, truncate: Int = 20): String = {
      val numRows = _numRows.max(0)
      val takeResult = toDF().take(numRows + 1)
      val hasMoreData = takeResult.length > numRows
      val data = takeResult.take(numRows)

So, given that evidence, I think that when countDF.show is executed it behaves much as if you called count on the dict dataset from the driver, once for each record in targets. And, of course, the dict dataset does not need to be local for show on countDF to work.
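If the UDF has to keep working when it runs on executors, one option is to make sure it only touches plain Scala values rather than a Dataset. The following is a sketch under the assumption that the filtered dictionary is small enough to collect to the driver. The names `pairs` and `pairsBc`, the `local[*]` master, and the use of a broadcast variable are illustrative choices, and the data is the sample from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Assumption: a local session, only so that the sketch runs on its own
val spark = SparkSession.builder().master("local[*]").appName("udf-with-broadcast").getOrCreate()
import spark.implicits._

// Sample data copied from the question's example
val targets = Seq(
  (Seq("Spark", "Wrong", "Something"), Seq("Java", "Grape", "Banana")),
  (Seq("Java", "Scala"), Seq("Scala", "Banana")),
  (Seq(""), Seq("Grape", "Banana")),
  (Seq(""), Seq(""))
).toDF("wordListOne", "wordListTwo")

val dict = Seq(
  ("Spark", "Java", 0.8), ("Scala", "Spark", 0.9), ("Java", "Scala", 0.9),
  ("Apple", "Grape", 0.66), ("Scala", "Apple", -0.1), ("Gine", "Spark", 0.1)
).toDF("first", "second", "similarity")

// Collect the qualifying pairs to the driver as plain tuples (assumes they fit in memory)
val pairs: Array[(String, String)] =
  dict.filter($"similarity" > 0.7).select($"first", $"second").as[(String, String)].collect()

// Ship the plain array to the executors once
val pairsBc = spark.sparkContext.broadcast(pairs)

// The UDF now only uses ordinary Scala collections, never a Dataset
val countPositiveSimilarity = udf { (a: Seq[String], b: Seq[String]) =>
  pairsBc.value.count { case (f, s) =>
    (a.contains(f) && b.contains(s)) || (b.contains(f) && a.contains(s))
  }.toLong
}

val countDF = targets.withColumn("positive_count", countPositiveSimilarity($"wordListOne", $"wordListTwo"))
countDF.show(false)
```

This keeps the per-row logic of the original UDF, but the lookup happens in memory instead of launching a Spark query per row. It assumes the array columns contain no nulls, as stated in the question.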

You can try saving countDF; it will give you the same exception as in the first case: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)