java - serializar - Spark-Tarea no serializable: ¿Cómo trabajar con cierres complejos de mapas que llaman a clases/objetos externos?
serialización en netbeans (2)
¡Descubrí cómo hacerlo yo mismo!
Simplemente necesita serializar los objetos antes de pasar por el cierre y deserializarlos después. Este enfoque simplemente funciona, incluso si tus clases no son serializables, porque usa Kryo detrás de escena. Todo lo que necesitas es un poco de curry. ;)
Aquí hay un ejemplo de cómo lo hice:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
(foo: Foo) : Bar = {
kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()
object Blah(abc: ABC) extends (Foo => Bar) {
def apply(foo: Foo) : Bar = { //This is the real function }
}
Siéntase libre de hacer Blah tan complicado como desee, clase, objeto complementario, clases anidadas, referencias a múltiples libs de terceros.
KryoSerializationWrapper hace referencia a: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
Eche un vistazo a esta pregunta: Scala + Spark - Tarea no serializable: java.io.NotSerializableExceptionon. Al llamar a la función fuera del cierre solo en clases, no en objetos .
Problema:
Supongamos que mis mapeadores pueden ser funciones (def) que llaman internamente a otras clases y crean objetos y hacen cosas diferentes dentro. (O incluso pueden ser clases que amplíen (Foo) => Bar y realicen el procesamiento en su método de aplicación, pero ignoremos este caso por el momento)
Spark solo admite la serialización Java para cierres. ¿Hay ALGUNA manera de salir de esto? ¿Podemos usar algo en lugar de cierres para hacer lo que quiero hacer? Podemos hacer fácilmente este tipo de cosas con Hadoop. Esta única cosa hace que Spark casi no se pueda usar para mí. ¡No se puede esperar que todas las bibliotecas de terceros tengan todas las clases ampliables!
Soluciones probables:
¿Algo como esto parece ser de alguna utilidad ?: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
Ciertamente parece que una envoltura es la respuesta, pero no puedo ver exactamente cómo.
En caso de utilizar la API de Java, debe evitar la clase anónima al pasar al cierre de la función de mapeo. En lugar de hacer un mapa (nueva Función) necesitas una clase que amplíe tu función y la pase al mapa (..) Ver: https://yanago.wordpress.com/2015/03/21/apache-spark/