scala apache-spark dataframe apache-spark-sql

scala - Causado por: java.lang.NullPointerException en org.apache.spark.sql.Dataset



apache-spark dataframe (2)

El problema es que intenta acceder a prodRows desde prodRows.foreach . No puede usar un marco de datos dentro de una transformación, los marcos de datos solo existen en el controlador.

A continuación proporciono mi código. I iterar sobre los marcos de datos prodRows y para cada product_PK encuentro alguna sublista coincidente de product_PKs de prodRows .

numRecProducts = 10 var listOfProducts: Map[Long,Array[(Long, Int)]] = Map() prodRows.foreach{ row : Row => val product_PK = row.get(row.fieldIndex("product_PK")).toString.toLong val gender = row.get(row.fieldIndex("gender_PK")).toString val selection = prodRows.filter($"gender_PK" === gender || $"gender_PK" === "UNISEX").limit(numRecProducts).select($"product_PK") var productList: Array[(Long, Int)] = Array() if (!selection.rdd.isEmpty()) { productList = selection.rdd.map(x => (x(0).toString.toLong,1)).collect() } listOfProducts = listOfProducts + (product_PK -> productList) }

Pero cuando lo ejecuto, me da el siguiente error. Parece que la selection está vacía en algunas iteraciones. Sin embargo, no entiendo cómo puedo manejar este error:

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.foreach(RDD.scala:916) at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2325) at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325) at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2325) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823) at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324) at org.test.ComputeNumSim.run(ComputeNumSim.scala:69) at org.test.ComputeNumSimRunner$.main(ComputeNumSimRunner.scala:19) at org.test.ComputeNumSimRunner.main(ComputeNumSimRunner.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635) Caused by: java.lang.NullPointerException at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170) at org.apache.spark.sql.Dataset$.apply(Dataset.scala:61) at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2877) at org.apache.spark.sql.Dataset.filter(Dataset.scala:1304) at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:74) at org.test.ComputeNumSim$$anonfun$run$1.apply(ComputeNumSim.scala:69) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

¿Qué significa y cómo puedo manejarlo?


No puede acceder a ninguna de las abstracciones del "lado del conductor" de Spark (RDD, DataFrames, Datasets, SparkSession ...) desde una función que se transfiere a una de las transformaciones de DataFrame / RDD de Spark. Tampoco puede actualizar objetos mutables del lado del controlador desde estas funciones.

En su caso, está tratando de usar prodRows y selection (ambos son DataFrames) dentro de una función pasada a DataFrame.foreach . También está tratando de actualizar listOfProducts (una variable local del lado del controlador) desde esa misma función.

¿Por qué?

  • DataFrames, RDDs y SparkSession solo existen en su aplicación Driver. Sirven como un "identificador" para acceder a los datos distribuidos en el clúster de máquinas de trabajo.
  • Las funciones pasadas a las transformaciones RDD / DataFrame se serializan y envían a ese clúster, para ejecutarse en las particiones de datos en cada una de las máquinas de trabajo. Cuando los DataFrames / RDD serializados se deserializan en esas máquinas, son inútiles, todavía no pueden representar los datos en el clúster ya que son solo copias huecas de las creadas en la aplicación del controlador, que en realidad mantiene una conexión con el clúster maquinas
  • Por la misma razón, el intento de actualizar las variables del lado del conductor fallará: las variables (comenzando como vacías, en la mayoría de los casos) se serializarán, deserializarán en cada uno de los trabajadores, se actualizarán localmente en los trabajadores y permanecerán allí. la variable original del lado del conductor permanecerá sin cambios

¿Cómo puedes resolver esto? Cuando trabaje con Spark, especialmente con DataFrames, debe intentar evitar la "iteración" sobre los datos, y utilizar en su lugar las operaciones declarativas de DataFrame. En la mayoría de los casos, cuando desee hacer referencia a datos de otro DataFrame para cada registro en su DataFrame, querrá usar join para crear un nuevo DataFrame con registros que combinen datos de los dos DataFrames.

En este caso específico, aquí hay una solución más o menos equivalente que hace lo que estás tratando de hacer, si logré concluirla correctamente. Intente usar esto y lea la documentación de DataFrame para descubrir los detalles:

import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import spark.implicits._ val numRecProducts = 10 val result = prodRows.as("left") // self-join by gender: .join(prodRows.as("right"), $"left.gender_PK" === $"right.gender_PK" || $"right.gender_PK" === "UNISEX") // limit to 10 results per record: .withColumn("rn", row_number().over(Window.partitionBy($"left.product_PK").orderBy($"right.product_PK"))) .filter($"rn" <= numRecProducts).drop($"rn") // group and collect_list to create products column: .groupBy($"left.product_PK" as "product_PK") .agg(collect_list(struct($"right.product_PK", lit(1))) as "products")