sirve serializacion que para deserializacion scala serialization apache-spark typesafe

scala - serializacion - Tarea no serializable: java.io.NotSerializableException cuando se llama a la funciĆ³n fuera del cierre solo en clases no en objetos



serializacion en java netbeans (6)

Obtener un comportamiento extraño al llamar a una función fuera de un cierre:

  • cuando la función está en un objeto, todo está funcionando
  • cuando la función está en una clase, obtén:

Tarea no serializable: java.io.NotSerializableException: testing

El problema es que necesito mi código en una clase y no en un objeto. ¿Alguna idea de por qué está pasando esto? ¿Se serializa un objeto de Scala (¿predeterminado?)?

Este es un ejemplo de código de trabajo:

object working extends App { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) //calling function outside closure val after = rddList.map(someFunc(_)) def someFunc(a:Int) = a+1 after.collect().map(println(_)) }

Este es el ejemplo que no funciona:

object NOTworking extends App { new testing().doIT } //adding extends Serializable wont help class testing { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) def doIT = { //again calling the fucntion someFunc val after = rddList.map(someFunc(_)) //this will crash (spark lazy) after.collect().map(println(_)) } def someFunc(a:Int) = a+1 }


Charla completa explicando completamente el problema, que propone un gran cambio de paradigma para evitar estos problemas de serialización: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md

La respuesta más votado básicamente es sugerir tirar una característica de idioma completa, que ya no usa métodos y solo usa funciones. De hecho, en los métodos de programación funcional en las clases debe evitarse, pero convertirlos en funciones no es resolver el problema de diseño aquí (ver enlace anterior).

Como una solución rápida en esta situación particular, puede usar la anotación @transient para indicarle que no intente serializar el valor ofensivo (aquí, Spark.ctx es una clase personalizada, no Spark, que sigue a la denominación de OP):

@transient val rddList = Spark.ctx.parallelize(list)

También puede reestructurar el código para que rddList viva en otro lugar, pero eso también es desagradable.

El futuro es probablemente esporas

En el futuro, Scala incluirá estas cosas llamadas "esporas" que nos deberían permitir controlar el grano fino, lo que no se consigue con un cierre. Además, esto debería convertir todos los errores de la extracción accidental de tipos no serializables (o cualquier valor no deseado) en errores de compilación en lugar de ahora, que son excepciones / fugas de memoria horribles en el tiempo de ejecución.

http://docs.scala-lang.org/sips/pending/spores.html

Un consejo sobre la serialización de Kryo

Cuando use kyro, haga que el registro sea necesario, esto significará que obtendrá errores en lugar de fugas de memoria:

"Finalmente, sé que kryo tiene kryo.setRegistrationOptional (verdadero) pero estoy teniendo un momento muy difícil tratando de averiguar cómo usarlo. Cuando esta opción está activada, kryo todavía parece arrojar excepciones si no me he registrado. clases ".

Estrategia para registrar clases con kryo

Por supuesto, esto solo le otorga control de nivel de tipo, no control de nivel de valor.

... más ideas por venir.


Enfrenté un problema similar, y lo que entiendo de la respuesta de Grega es

object NOTworking extends App { new testing().doIT } //adding extends Serializable wont help class testing { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) def doIT = { //again calling the fucntion someFunc val after = rddList.map(someFunc(_)) //this will crash (spark lazy) after.collect().map(println(_)) } def someFunc(a:Int) = a+1 }

su método doIT está intentando serializar algún método de Func (_) , pero como el método no es serializable, intenta serializar las pruebas de clase que de nuevo no son serializables.

Así que haga que su código funcione, debe definir algúnFunc dentro del método doIT . Por ejemplo:

def doIT = { def someFunc(a:Int) = a+1 //function definition } val after = rddList.map(someFunc(_)) after.collect().map(println(_)) }

Y si hay múltiples funciones que entran en la imagen, entonces todas esas funciones deberían estar disponibles para el contexto principal.


No creo que la otra respuesta sea del todo correcta. Los RDD son de hecho serializables , así que esto no es lo que está causando que su tarea falle.

Spark es un motor de computación distribuida y su abstracción principal es un conjunto de datos distribuidos ( RDD ) resiliente, que se puede ver como una colección distribuida. Básicamente, los elementos de RDD se dividen en particiones en los nodos del clúster, pero Spark los abstrae del usuario, lo que permite al usuario interactuar con el RDD (colección) como si fuera local.

Para no entrar en demasiados detalles, pero cuando ejecuta diferentes transformaciones en un RDD ( map , map flatMap , filter y otros), su código de transformación (cierre) es:

  1. serializado en el nodo del controlador,
  2. enviado a los nodos apropiados en el clúster,
  3. deserializado,
  4. y finalmente ejecutado en los nodos

Por supuesto, puede ejecutarlo localmente (como en su ejemplo), pero todas esas fases (además del envío a través de la red) aún ocurren. [Esto le permite detectar cualquier error incluso antes de implementar en la producción]

Lo que ocurre en su segundo caso es que está llamando a un método, definido en la testing clase desde dentro de la función de mapa. Spark ve eso y dado que los métodos no se pueden serializar por sí solos, Spark intenta serializar toda la clase de testing , de modo que el código seguirá funcionando cuando se ejecute en otra JVM. Tienes dos posibilidades:

O haces que las pruebas de clase sean serializables, por lo que toda la clase puede ser serializada por Spark:

import org.apache.spark.{SparkContext,SparkConf} object Spark { val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]")) } object NOTworking extends App { new Test().doIT } class Test extends java.io.Serializable { val rddList = Spark.ctx.parallelize(List(1,2,3)) def doIT() = { val after = rddList.map(someFunc) after.collect().foreach(println) } def someFunc(a: Int) = a + 1 }

o realiza someFunc función de someFunc lugar de un método (las funciones son objetos en Scala), de modo que Spark pueda serializarlo:

import org.apache.spark.{SparkContext,SparkConf} object Spark { val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]")) } object NOTworking extends App { new Test().doIT } class Test { val rddList = Spark.ctx.parallelize(List(1,2,3)) def doIT() = { val after = rddList.map(someFunc) after.collect().foreach(println) } val someFunc = (a: Int) => a + 1 }

Similar, pero no el mismo problema con la serialización de clases, puede ser de su interés y puede leerlo en esta presentación de Spark Summit 2013 .

Como nota al margen, puede reescribir rddList.map(someFunc(_)) a rddList.map(someFunc) , son exactamente lo mismo. Por lo general, se prefiere el segundo ya que es menos detallado y más limpio de leer.

EDITAR (2015-03-15): SPARK-5307 introdujo SerializationDebugger y Spark 1.3.0 es la primera versión en usarlo. Agrega ruta de serialización a una NotSerializableException . Cuando se encuentra una NotSerializableException, el depurador visita el gráfico de objetos para encontrar la ruta hacia el objeto que no se puede serializar, y construye información para ayudar al usuario a encontrar el objeto.

En el caso de OP, esto es lo que se imprime en stdout:

Serialization stack: - object not serializable (class: testing, value: testing@2dfe2f00) - field (class: testing$$anonfun$1, name: $outer, type: class testing) - object (class testing$$anonfun$1, <function1>)


No estoy del todo seguro de que esto se aplique a Scala pero, en Java, resolví la NotSerializableException al NotSerializableException mi código para que el cierre no NotSerializableException acceso a un campo final no serializable.


Resolví este problema usando un enfoque diferente. Simplemente necesita serializar los objetos antes de pasar por el cierre y deserializarlos después. Este enfoque simplemente funciona, incluso si tus clases no son serializables, porque usa Kryo detrás de escena. Todo lo que necesitas es un poco de curry. ;)

Aquí hay un ejemplo de cómo lo hice:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)]) (foo: Foo) : Bar = { kryoWrapper.value.apply(foo) } val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _ rdd.flatMap(mapper).collectAsMap() object Blah(abc: ABC) extends (Foo => Bar) { def apply(foo: Foo) : Bar = { //This is the real function } }

Siéntase libre de hacer Blah tan complicado como desee, clase, objeto complementario, clases anidadas, referencias a múltiples libs de terceros.

KryoSerializationWrapper se refiere a: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala


La respuesta de Grega es excelente para explicar por qué el código original no funciona y dos formas de solucionar el problema. Sin embargo, esta solución no es muy flexible; considere el caso en que su cierre incluye una llamada a un método en una clase no Serializable la que no tiene control. No puede agregar la etiqueta Serializable a esta clase ni cambiar la implementación subyacente para cambiar el método en una función.

Nilesh presenta una gran solución para esto, pero la solución puede ser más concisa y general:

def genMapper[A, B](f: A => B): A => B = { val locker = com.twitter.chill.MeatLocker(f) x => locker.get.apply(x) }

Este serializador de funciones se puede usar para envolver automáticamente cierres y llamadas a métodos:

rdd map genMapper(someFunc)

Esta técnica también tiene el beneficio de no requerir las dependencias adicionales de Shark para acceder a KryoSerializationWrapper , ya que Twitter''s Chill ya está KryoSerializationWrapper por core Spark