scala serialization apache-spark json4s

scala - Spark excepción no serializable al analizar JSON con json4s



serialization apache-spark (2)

Me he encontrado con un problema al intentar analizar a json en mi trabajo de chispa. Estoy usando la spark 1.1.0 , json4s y el Cassandra Spark Connector . La excepción lanzada es:

java.io.NotSerializableException: org.json4s.DefaultFormats

Al examinar el objeto complementario DefaultFormats, y con esta pregunta de la pila , está claro que los DefaultFormats no se pueden serializar. La pregunta ahora es qué hacer.

Puedo ver que este ticket aparentemente ha abordado este problema en la base del código de chispa, agregando la palabra clave transitoria, pero no estoy seguro exactamente cómo o dónde aplicarlo en mi caso. ¿La solución es solo instanciar la clase DefaultFormats en los ejecutores, para evitar la serialización todos juntos? ¿Hay otra biblioteca de análisis JSON para scala / spark que las personas estén usando? Inicialmente traté de usar jackson por sí mismo, pero encontré algunos errores con anotaciones que no pude resolver fácilmente, y json4s funcionó de la caja. Aquí está mi código:

import org.json4s._ import org.json4s.jackson.JsonMethods._ implicit val formats = DefaultFormats val count = rdd.map(r => checkUa(r._2, r._1)).reduce((x, y) => x + y)

Hago mi análisis json en la función checkUa. Traté de hacer que la cuenta fuera floja, con la esperanza de que retrasara la ejecución de alguna manera, pero no tuvo ningún efecto. ¿Quizás moviendo el val implícito dentro de la verificaciónUA? Cualquier consejo muy apreciado.


Esto ya fue respondido en un boleto abierto con json4s . La solución es poner la declaración implicit dentro de la función

val count = rdd .map(r => {implicit val formats = DefaultFormats; checkUa(r._2, r._1)}) .reduce((x, y) => x + y)


Tuve el mismo error cuando puse la forma implicit val formats = ... declaration dentro del método que contiene el análisis sintáctico, en lugar de declararlo en la clase (objeto).

Entonces esto arrojaría un error:

object Application { //... Lots of other code here, which eventually calls // setupStream(...) def setupStream(streamingContext: StreamingContext, brokers: String, topologyTopicName: String) = { implicit val formats = DefaultFormats _createDStream(streamingContext, brokers, topologyTopicName) // Remove the message key, which is always null in our case .map(_._2) .map((json: String) => parse(json).camelizeKeys .extract[Record[TopologyMetadata, Unused]]) .print() }

Pero esto estaría bien:

object Application { implicit val formats = DefaultFormats //... Lots of other code here, which eventually calls // setupStream(...) def setupStream(streamingContext: StreamingContext, brokers: String, topologyTopicName: String) = { _createDStream(streamingContext, brokers, topologyTopicName) // Remove the message key, which is always null in our case .map(_._2) .map((json: String) => parse(json).camelizeKeys .extract[Record[TopologyMetadata, Unused]]) .print() }