java - spark - Serializar RDD
spark group by (1)
Tengo un RDD que intento serializar y luego reconstruir deserializando. Estoy tratando de ver si esto es posible en Apache Spark.
static JavaSparkContext sc = new JavaSparkContext(conf);
static SerializerInstance si = SparkEnv.get().closureSerializer().newInstance();
static ClassTag<JavaRDD<String>> tag = scala.reflect.ClassTag$.MODULE$.apply(JavaRDD.class);
..
..
JavaRDD<String> rdd = sc.textFile(logFile, 4);
System.out.println("Element 1 " + rdd.first());
ByteBuffer bb= si.serialize(rdd, tag);
JavaRDD<String> rdd2 = si.deserialize(bb, Thread.currentThread().getContextClassLoader(),tag);
System.out.println(rdd2.partitions().size());
System.out.println("Element 0 " + rdd2.first());
Obtengo una excepción en la última línea cuando realizo una acción en el RDD recién creado. La forma en que estoy serializando es similar a cómo se hace internamente en Spark.
Exception in thread "main" org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
at org.apache.spark.rdd.RDD.take(RDD.scala:1177)
at org.apache.spark.rdd.RDD.first(RDD.scala:1189)
at org.apache.spark.api.java.JavaRDDLike$class.first(JavaRDDLike.scala:477)
at org.apache.spark.api.java.JavaRDD.first(JavaRDD.scala:32)
at SimpleApp.sparkSend(SimpleApp.java:63)
at SimpleApp.main(SimpleApp.java:91)
El RDD se crea y se carga dentro del mismo proceso, por lo que no entiendo cómo ocurre este error.
Soy el autor de este mensaje de advertencia.
Spark no admite la realización de acciones y transformaciones en copias de RDD creadas mediante deserialización. Los RDD se pueden serializar para que se puedan invocar ciertos métodos en ellos en los ejecutores, pero los usuarios finales no deben tratar de realizar manualmente la serialización de RDD.
Cuando un RDD se serializa, pierde su referencia al SparkContext que lo creó, lo que impide que se inicien trabajos con él (consulte aquí ). En versiones anteriores de Spark, su código daría como resultado una NullPointerException cuando Spark intentó acceder al campo privado, nulo RDD.sc
Este mensaje de error se redactó de esta manera porque los usuarios se encontraban frecuentemente confundiendo NullPointerExceptions cuando intentaban hacer cosas como rdd1.map { _ => rdd2.count() }
, lo que hacía que las acciones se invocaran en RDD deserializados en máquinas ejecutoras. No anticipé que alguien intente serializar / deserializar manualmente sus RDD en el controlador, por lo que puedo ver cómo este mensaje de error podría ser un poco engañoso.