mysql - tutorial - Inserción masiva de Slick 3.0 o actualización
scala h2 (3)
¿Cuál es la forma correcta de hacer un insertOrUpdate masivo en Slick 3.0?
Estoy usando MySQL donde la consulta apropiada sería
INSERT INTO table (a,b,c) VALUES (1,2,3),(4,5,6)
ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);
MySQL a granel INSERTAR o ACTUALIZAR
Aquí está mi código actual que es muy lento :-(
// FIXME -- this is slow but will stop repeats, an insertOrUpdate
// functions for a list would be much better
val rowsInserted = rows.map {
row => await(run(TableQuery[FooTable].insertOrUpdate(row)))
}.sum
Lo que busco es el equivalente de
def insertOrUpdate(values: Iterable[U]): DriverAction[MultiInsertResult, NoStream, Effect.Write]
Como puede ver en los ejemplos de Slick , puede usar la función ++=
para insertar utilizando la función de inserción por lotes JDBC. Por instancia:
val foos = TableQuery[FooTable]
val rows: Seq[Foo] = ...
foos ++= rows // here slick will use batch insert
También puede "dimensionar" por lotes "agrupando" la secuencia de filas:
val batchSize = 1000
rows.grouped(batchSize).foreach { group => foos ++= group }
Hay varias maneras de hacer que este código sea más rápido (cada uno debería ser más rápido que los anteriores, pero se vuelve cada vez menos idiomático):
Ejecute
insertOrUpdateAll
lugar deinsertOrUpdate
si está en slick-pg 0.16.1+await(run(TableQuery[FooTable].insertOrUpdateAll rows)).sum
Ejecute sus eventos DBIO a la vez, en lugar de esperar a que se confirme cada uno antes de ejecutar el siguiente:
val toBeInserted = rows.map { row => TableQuery[FooTable].insertOrUpdate(row) } val inOneGo = DBIO.sequence(toBeInserted) val dbioFuture = run(inOneGo) // Optionally, you can add a `.transactionally` // and / or `.withPinnedSession` here to pin all of these upserts // to the same transaction / connection // which *may* get you a little more speed: // val dbioFuture = run(inOneGo.transactionally) val rowsInserted = await(dbioFuture).sum
Desplácese hasta el nivel JDBC y ejecute su upsert todo de una vez ( idea a través de esta respuesta ):
val SQL = """INSERT INTO table (a,b,c) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);""" SimpleDBIO[List[Int]] { session => val statement = session.connection.prepareStatement(SQL) rows.map { row => statement.setInt(1, row.a) statement.setInt(2, row.b) statement.setInt(3, row.c) statement.addBatch() } statement.executeBatch() }
usar sqlu
este trabajo de demostración
case ("insertOnDuplicateKey",answers:List[Answer])=>{
def buildInsert(r: Answer): DBIO[Int] =
sqlu"insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time) values (${r.aid},${r.bid},${r.sbid},${r.qid},${r.ups},${r.author},${r.uid},${r.nick},${r.pub_time},${r.content},${r.good},${r.hot},${r.id},${r.reply},${r.pic},${r.spider_time}) ON DUPLICATE KEY UPDATE `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)"
val inserts: Seq[DBIO[Int]] = answers.map(buildInsert)
val combined: DBIO[Seq[Int]] = DBIO.sequence(inserts)
DEST_DB.run(combined).onComplete(data=>{
println("insertOnDuplicateKey data result",data.get.mkString)
if (data.isSuccess){
println(data.get)
val lastid=answers.last.id
Sync.lastActor !("upsert",tablename,lastid)
}else{
//retry
self !("insertOnDuplicateKey",answers)
}
})
}
y trato de usar sqlu en un solo sql pero el error tal vez sqlu no suministra la interpolación de cadenas
esta demostración no funciona
case ("insertOnDuplicateKeyError",answers:List[Answer])=>{
def buildSql(execpre:String,values: String,execafter:String): DBIO[Int] = sqlu"$execpre $values $execafter"
val execpre="insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time) values "
val execafter=" ON DUPLICATE KEY UPDATE `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)"
val valuesstr=answers.map(row=>("("+List(row.aid,row.bid,row.sbid,row.qid,row.ups,"''"+row.author+"''","''"+row.uid+"''","''"+row.nick+"''","''"+row.pub_time+"''","''"+row.content+"''",row.good,row.hot,row.id,row.reply,row.pic,"''"+row.spider_time+"''").mkString(",")+")")).mkString(",/n")
val insertOrUpdateAction=DBIO.seq(
buildSql(execpre,valuesstr,execafter)
)
DEST_DB.run(insertOrUpdateAction).onComplete(data=>{
if (data.isSuccess){
println("insertOnDuplicateKey data result",data)
//retry
val lastid=answers.last.id
Sync.lastActor !("upsert",tablename,lastid)
}else{
self !("insertOnDuplicateKey2",answers)
}
})
}
una herramienta de sincronización de mysql con scala slick https://github.com/cclient/ScalaMysqlSync