performance - secretos - spark gt
¿Por qué Spark funciona peor cuando se usa la serialización de Kryo? (2)
Permití la serialización de Kryo para mi trabajo Spark, habilité la configuración para solicitar el registro y me aseguré de que todos mis tipos estuvieran registrados.
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)
El rendimiento en el tiempo de Wallclock del trabajo empeoró en aproximadamente un 20% y la cantidad de bytes mezclados aumentó en casi un 400%.
Esto me parece realmente sorprendente, dada la sugerencia de la documentación de Spark de que Kryo debería ser mejor.
Kryo es significativamente más rápido y más compacto que la serialización de Java (a menudo tanto como 10 veces)
Invoqué manualmente el método de serialize
en instancias de Spark''s org.apache.spark.serializer.KryoSerializer
y org.apache.spark.serializer.JavaSerializer
con un ejemplo de mis datos. Los resultados fueron consistentes con las sugerencias en la documentación de Spark: Kryo produjo 98 bytes; Java produjo 993 bytes. Eso realmente es una mejora de 10x.
Un posible factor de confusión es que los objetos que se serializan y barajan implementan la interfaz Avro GenericRecord
. Intenté registrar los esquemas de Avro en SparkConf
, pero eso no mostró ninguna mejora.
Traté de hacer nuevas clases para barajar los datos, que eran simples case class
Scala, sin incluir ninguna de las máquinas de Avro. No mejoró el rendimiento de mezcla o el número de bytes intercambiados.
El código de Spark termina hirviendo a lo siguiente:
case class A(
f1: Long,
f2: Option[Long],
f3: Int,
f4: Int,
f5: Option[String],
f6: Option[Int],
f7: Option[String],
f8: Option[Int],
f9: Option[Int],
f10: Option[Int],
f11: Option[Int],
f12: String,
f13: Option[Double],
f14: Option[Int],
f15: Option[Double],
f16: Option[Double],
f17: List[String],
f18: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(f: Int) : AnyRef = ???
def put(f: Int, value: Any) : Unit = ???
def getSchema(): org.apache.avro.Schema = A.SCHEMA$
}
object A extends AnyRef with Serializable {
val SCHEMA$: org.apache.avro.Schema = ???
}
case class B(
f1: Long
f2: Long
f3: String
f4: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(field$ : Int) : AnyRef = ???
def getSchema() : org.apache.avro.Schema = B.SCHEMA$
def put(field$ : Int, value : Any) : Unit = ???
}
object B extends AnyRef with Serializable {
val SCHEMA$ : org.apache.avro.Schema = ???
}
def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = {
val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b)
joined.map { case (_, asAndBs) => asAndBs }
}
¿Tiene alguna idea de lo que podría estar pasando o cómo podría obtener el mejor rendimiento que debería estar disponible desde Kryo?
Como tiene RDD de alta cardinalidad, la combinación de hash de transmisión / emisión parecería estar fuera de los límites por desgracia.
Lo mejor que puede hacer es fusionar () sus RDD antes de unirse. ¿Estás viendo una gran inclinación en tus tiempos de mezcla? Si es así, es posible que desee fusionarse con shuffle = true.
Por último, si tiene RDD de estructuras anidadas (por ejemplo, JSON), eso a veces le permitirá omitir las mezclas. Vea las diapositivas y / o videos aquí para una explicación más detallada.
Si su tamaño de registro individual es demasiado pequeño y tener una gran cantidad de registros puede hacer que su trabajo sea lento. Trate de aumentar el tamaño del búfer y ver si mejora.
Prueba el siguiente uno si no lo has hecho ya ...
val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Now it''s 24 Mb of buffer by default instead of 0.064 Mb
.set("spark.kryoserializer.buffer.mb","24")
Ref: https://ogirardot.wordpress.com/2015/01/09/changing-sparks-default-java-serialization-to-kryo/