tutorial mysql sql scala slick typesafe

mysql - tutorial - Inserción masiva de Slick 3.0 o actualización



scala h2 (3)

¿Cuál es la forma correcta de hacer un insertOrUpdate masivo en Slick 3.0?

Estoy usando MySQL donde la consulta apropiada sería

INSERT INTO table (a,b,c) VALUES (1,2,3),(4,5,6) ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);

MySQL a granel INSERTAR o ACTUALIZAR

Aquí está mi código actual que es muy lento :-(

// FIXME -- this is slow but will stop repeats, an insertOrUpdate // functions for a list would be much better val rowsInserted = rows.map { row => await(run(TableQuery[FooTable].insertOrUpdate(row))) }.sum

Lo que busco es el equivalente de

def insertOrUpdate(values: Iterable[U]): DriverAction[MultiInsertResult, NoStream, Effect.Write]


Como puede ver en los ejemplos de Slick , puede usar la función ++= para insertar utilizando la función de inserción por lotes JDBC. Por instancia:

val foos = TableQuery[FooTable] val rows: Seq[Foo] = ... foos ++= rows // here slick will use batch insert

También puede "dimensionar" por lotes "agrupando" la secuencia de filas:

val batchSize = 1000 rows.grouped(batchSize).foreach { group => foos ++= group }


Hay varias maneras de hacer que este código sea más rápido (cada uno debería ser más rápido que los anteriores, pero se vuelve cada vez menos idiomático):

  • Ejecute insertOrUpdateAll lugar de insertOrUpdate si está en slick-pg 0.16.1+

    await(run(TableQuery[FooTable].insertOrUpdateAll rows)).sum

  • Ejecute sus eventos DBIO a la vez, en lugar de esperar a que se confirme cada uno antes de ejecutar el siguiente:

    val toBeInserted = rows.map { row => TableQuery[FooTable].insertOrUpdate(row) } val inOneGo = DBIO.sequence(toBeInserted) val dbioFuture = run(inOneGo) // Optionally, you can add a `.transactionally` // and / or `.withPinnedSession` here to pin all of these upserts // to the same transaction / connection // which *may* get you a little more speed: // val dbioFuture = run(inOneGo.transactionally) val rowsInserted = await(dbioFuture).sum

  • Desplácese hasta el nivel JDBC y ejecute su upsert todo de una vez ( idea a través de esta respuesta ):

    val SQL = """INSERT INTO table (a,b,c) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);""" SimpleDBIO[List[Int]] { session => val statement = session.connection.prepareStatement(SQL) rows.map { row => statement.setInt(1, row.a) statement.setInt(2, row.b) statement.setInt(3, row.c) statement.addBatch() } statement.executeBatch() }


usar sqlu

este trabajo de demostración

case ("insertOnDuplicateKey",answers:List[Answer])=>{ def buildInsert(r: Answer): DBIO[Int] = sqlu"insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time) values (${r.aid},${r.bid},${r.sbid},${r.qid},${r.ups},${r.author},${r.uid},${r.nick},${r.pub_time},${r.content},${r.good},${r.hot},${r.id},${r.reply},${r.pic},${r.spider_time}) ON DUPLICATE KEY UPDATE `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)" val inserts: Seq[DBIO[Int]] = answers.map(buildInsert) val combined: DBIO[Seq[Int]] = DBIO.sequence(inserts) DEST_DB.run(combined).onComplete(data=>{ println("insertOnDuplicateKey data result",data.get.mkString) if (data.isSuccess){ println(data.get) val lastid=answers.last.id Sync.lastActor !("upsert",tablename,lastid) }else{ //retry self !("insertOnDuplicateKey",answers) } }) }

y trato de usar sqlu en un solo sql pero el error tal vez sqlu no suministra la interpolación de cadenas

esta demostración no funciona

case ("insertOnDuplicateKeyError",answers:List[Answer])=>{ def buildSql(execpre:String,values: String,execafter:String): DBIO[Int] = sqlu"$execpre $values $execafter" val execpre="insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time) values " val execafter=" ON DUPLICATE KEY UPDATE `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)" val valuesstr=answers.map(row=>("("+List(row.aid,row.bid,row.sbid,row.qid,row.ups,"''"+row.author+"''","''"+row.uid+"''","''"+row.nick+"''","''"+row.pub_time+"''","''"+row.content+"''",row.good,row.hot,row.id,row.reply,row.pic,"''"+row.spider_time+"''").mkString(",")+")")).mkString(",/n") val insertOrUpdateAction=DBIO.seq( buildSql(execpre,valuesstr,execafter) ) DEST_DB.run(insertOrUpdateAction).onComplete(data=>{ if (data.isSuccess){ println("insertOnDuplicateKey data result",data) //retry val lastid=answers.last.id Sync.lastActor !("upsert",tablename,lastid) }else{ self !("insertOnDuplicateKey2",answers) } }) }

una herramienta de sincronización de mysql con scala slick https://github.com/cclient/ScalaMysqlSync