read - spark documentation java
Entendiendo los cierres de Spark y su serialización. (1)
Descargo de responsabilidad: acaba de empezar a jugar con Spark.
Tengo problemas para entender la famosa excepción "Tarea no serializable", pero mi pregunta es un poco diferente de las que veo en SO (o eso creo).
Tengo un pequeño RDD personalizado ( TestRDD
). Tiene un campo que almacena objetos cuya clase no implementa Serializable ( NonSerializable
). He configurado la opción de configuración "spark.serializer" para usar Kryo. Sin embargo, cuando intento count()
en mi RDD, obtengo lo siguiente:
Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: com.test.spark.NonSerializable@2901e052)
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)
Cuando miro dentro de DAGScheduler.submitMissingTasks
veo que usa su serializador de cierre en mi RDD, que es el serializador de Java, no el serializador de Kryo que yo esperaría. He leído que Kryo tiene problemas para serializar cierres y Spark siempre usa el serializador de Java para los cierres, pero no entiendo cómo los cierres entran en juego aquí. Todo lo que estoy haciendo aquí es esto:
SparkConf conf = new SparkConf()
.setAppName("ScanTest")
.setMaster("local")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaSparkContext sc = new JavaSparkContext(conf);
TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());
Es decir, no hay mapeadores ni nada que requiera la serialización de cierres. OTOH esto funciona:
sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()
El serializador Kryo se usa como se esperaba, el serializador de cierre no está involucrado. Si no estableciera la propiedad del serializador en Kryo, también obtendría una excepción aquí.
Aprecio cualquier puntero que explique de dónde proviene el cierre y cómo puedo asegurarme de que puedo usar Kryo para serializar RDD personalizados.
ACTUALIZACIÓN : aquí está TestRDD
con su mNS
campo no serializable:
class TestRDD extends RDD<String> {
private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);
NonSerializable mNS = new NonSerializable();
public TestRDD(final SparkContext _sc) {
super(_sc,
JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
STRING_TAG);
}
@Override
public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
"test_" + thePartition.index(),
"test_" + thePartition.index()).iterator()).asScala();
}
@Override
public Partition[] getPartitions() {
return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
}
static class TestPartition implements Partition {
final int mIndex;
public TestPartition(final int theIndex) {
mIndex = theIndex;
}
public int index() {
return mIndex;
}
}
}
Cuando miro dentro de
DAGScheduler.submitMissingTasks
veo que usa su serializador de cierre en mi RDD, que es el serializador de Java, no el serializador de Kryo que yo esperaría.
SparkEnv
admite dos serializadores, uno denominado serializer
que se usa para la serialización de sus datos, puntos de control, mensajes entre trabajadores, etc. y está disponible bajo el spark.serializer
configuración spark.serializer
. El otro se llama closureSerializer
bajo spark.closure.serializer
que se usa para verificar que su objeto sea de hecho serializable y configurable para Spark <= 1.6.2 (pero nada más que JavaSerializer
realmente funciona) y está codificado a partir de 2.0.0 y superior a JavaSerializer
.
El serializador de cierre Kryo tiene un error que lo hace inutilizable, puede ver ese error en SPARK-7708 (esto puede solucionarse con Kryo 3.0.0, pero Spark actualmente está arreglado con una versión específica de Chill que está corregida en Kryo 2.2). 1). Además, para Spark 2.0.x, el JavaSerializer ahora está arreglado en lugar de configurable (puede verlo en esta solicitud de extracción ). Esto significa que efectivamente estamos atrapados con el JavaSerializer
para la serialización del cierre.
¿Es extraño que estemos usando un serializador para enviar tareas y otro para serializar datos entre trabajadores y demás? Definitivamente, pero esto es lo que tenemos.
En resumen, si está configurando la configuración de spark.serializer
, o utilizando SparkContext.registerKryoClasses
, utilizará Kryo para la mayor parte de su serialización en Spark. Dicho esto, para verificar si una clase dada es serializable y la serialización de tareas para los trabajadores, Spark utilizará JavaSerializer
.