scala - Cómo realizar una operación en cada ejecutor una vez en chispa.
apache-spark weka (2)
Tengo un modelo weka almacenado en S3 que es de alrededor de 400 MB. Ahora, tengo un conjunto de registros en el que quiero ejecutar el modelo y realizar la predicción.
Para realizar la predicción, lo que he intentado es,
Descargue y cargue el modelo en el controlador como un objeto estático, transmítalo a todos los ejecutores. Realizar una operación de mapa en la predicción RDD. ----> No funciona, como en Weka para realizar la predicción, el objeto modelo debe modificarse y la transmisión requiere una copia de solo lectura.
Descargue y cargue el modelo en el controlador como un objeto estático y envíelo al ejecutor en cada operación de mapa. -----> Trabajo (no eficiente, como en cada operación de mapa, estoy pasando un objeto de 400MB)
Descargue el modelo en el controlador, cárguelo en cada ejecutor y guárdelo allí. (No sé cómo hacerlo)
¿Alguien tiene alguna idea de cómo puedo cargar el modelo en cada ejecutor una vez y guardarlo en la memoria caché para que para otros registros no lo vuelva a cargar?
Esto es lo que funcionó para mí incluso mejor que el inicializador perezoso. Creé un puntero de nivel de objeto inicializado a nulo y dejé que cada ejecutor lo inicializara. En el bloque de inicialización puede tener un código de una sola ejecución. Tenga en cuenta que cada lote de procesamiento restablecerá las variables locales pero no las de nivel de objeto.
object Thing1 {
var bigObject : BigObject = null
def main(args: Array[String]) : Unit = {
val sc = <spark/scala magic here>
sc.textFile(infile).map(line => {
if (bigObject == null) {
// this takes a minute but runs just once
bigObject = new BigObject(parameters)
}
bigObject.transform(line)
})
}
}
Este enfoque crea exactamente un objeto grande por ejecutor, en lugar del único objeto grande por partición de otros enfoques.
Si coloca la variable bigObject: BigObject = null dentro del espacio de nombres de la función principal, se comporta de manera diferente. En ese caso, ejecuta el constructor bigObject al comienzo de cada partición (es decir, por lotes). Si tiene una pérdida de memoria, esto eventualmente matará al ejecutor. Recolección de basura también tendría que hacer más trabajo.
Tienes dos opciones:
1. Cree un objeto singleton con un valor perezoso que represente los datos:
object WekaModel {
lazy val data = {
// initialize data here. This will only happen once per JVM process
}
}
Luego, puedes usar el valor perezoso en tu función de map
. La lazy val
asegura que cada trabajador JVM inicialice su propia instancia de los datos. No se realizarán serializaciones ni difusiones de data
.
elementsRDD.map { element =>
// use WekaModel.data here
}
Ventajas
- es más eficiente, ya que le permite inicializar sus datos una vez por instancia de JVM. Este enfoque es una buena opción cuando se necesita inicializar un grupo de conexión de base de datos, por ejemplo.
Desventajas
- Menos control sobre la inicialización. Por ejemplo, es más difícil inicializar su objeto si necesita parámetros de tiempo de ejecución.
- Realmente no puedes liberar o liberar el objeto si lo necesitas. Por lo general, eso es aceptable, ya que el sistema operativo liberará los recursos cuando finalice el proceso.
2. Use el mapPartition
(o foreachPartition
) en el RDD en lugar de solo map
.
Esto le permite inicializar lo que necesite para la partición completa.
elementsRDD.mapPartition { elements =>
val model = new WekaModel()
elements.map { element =>
// use model and element. there is a single instance of model per partition.
}
}
Ventajas :
- Proporciona más flexibilidad en la inicialización y desinicialización de objetos.
Desventajas
- Cada partición creará e inicializará una nueva instancia de su objeto. Dependiendo de cuántas particiones tenga por instancia de JVM, puede o no ser un problema.