Got the generic processor

This commit is contained in:
Julien Lengrand-Lambert
2020-03-09 20:50:38 +01:00
parent cc1c8da149
commit 2e372db8df
6 changed files with 126 additions and 48 deletions

View File

@@ -17,7 +17,9 @@ See [LICENSE](/LICENSE)
* `Database.connect("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1", driver = "org.h2.Driver", user = "root", password = "")`
* Use `?useSSL=false` to avoid SSL exceptions (for dev only!) on MySQL.
* `Database.connect("jdbc:mysql://localhost:3308/imdb?useSSL=false", driver = "com.mysql.jdbc.Driver", user = "root", password = "aRootPassword")`
* Use `rewriteBatchedStatements=true` when inserting large volumes of data to have your driver rewrite your query
* `.map` keeps stack of memory while `for` loop doesn't? I get a OME when running with map
## Author
* [Julien Lengrand-Lambert](https://github.com/jlengrand/)
* [Julien Lengrand-Lambert](https://github.com/jlengrand/)

View File

@@ -12,12 +12,4 @@ object TitleRatings : Table(){
val numVotes : Column<Int?> = integer("numVotes").nullable()
override val primaryKey = PrimaryKey(tconst, name = "tconst")
fun insertFromListString(values : List<String>){
TitleRatings.insert {
it[tconst] = values[0]
it[averageRating] = if (values[1] != NO_DATA) values[1].toFloat() else null
it[numVotes] = if (values[2] != NO_DATA) values[2].toInt() else null
}
}
}

View File

@@ -1,15 +1,49 @@
package loader
import dsl.TitleRatings
import dsl.Titles
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.statements.BatchInsertStatement
import org.jetbrains.exposed.sql.transactions.transaction
import tsv.Reader
import kotlin.system.measureTimeMillis
fun main(){
fun main() {
val time = measureTimeMillis() {
// var db = Database.connect("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1", driver = "org.h2.Driver", user = "root", password = "")
var db = Database.connect("jdbc:mysql://localhost:3308/imdb?useSSL=false&allowPublicKeyRetrieval=true", driver = "com.mysql.jdbc.Driver", user = "root", password = "aRootPassword")
println("Running loader")
// var db = Database.connect("jdbc:mysql://localhost:3308?useSSL=false&allowPublicKeyRetrieval=true&rewriteBatchedStatements=true", driver = "com.mysql.jdbc.Driver", user = "root", password = "aRootPassword")
var db = Database.connect(
"jdbc:mysql://localhost:3306?useSSL=false&allowPublicKeyRetrieval=true&rewriteBatchedStatements=true",
driver = "com.mysql.jdbc.Driver",
user = "root",
password = ""
)
val titleRatings = TitleRatingsLoader(db)
titleRatings.createTable()
titleRatings.loadData()
titleRatings.showSome()
transaction(db) { SchemaUtils.createDatabase("imdb") }
db = Database.connect(
"jdbc:mysql://localhost:3306/imdb?useSSL=false&allowPublicKeyRetrieval=true&rewriteBatchedStatements=true",
driver = "com.mysql.jdbc.Driver",
user = "root",
password = ""
)
// db = Database.connect("jdbc:mysql://localhost:3308/imdb?useSSL=false&allowPublicKeyRetrieval=true&rewriteBatchedStatements=true", driver = "com.mysql.jdbc.Driver", user = "root", password = "aRootPassword")
TableLoader.process(db, TitleRatings, "./datasets/title.ratings.tsv", 5, load())
}
println("Time was : ${time / 1000 / 60 } minutes ${time / 1000 % 60 } seconds")
}
fun load(): BatchInsertStatement.(String) -> Unit {
return {
val items = it.split("\t")
this[TitleRatings.tconst] = items[0]
this[TitleRatings.averageRating] = if (items[1] != Reader.NO_DATA) items[1].toFloat() else null
this[TitleRatings.numVotes] = if (items[2] != Reader.NO_DATA) items[2].toInt() else null
}
}

View File

@@ -0,0 +1,49 @@
package loader
import dsl.TitleRatings
import dsl.Titles
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.statements.BatchInsertStatement
import org.jetbrains.exposed.sql.transactions.transaction
import java.io.File
import tsv.Reader
fun BatchInsertStatement.loadItem(item: String){
val items = item.split("\t")
this[TitleRatings.tconst] = items[0]
this[TitleRatings.averageRating] = if (items[1] != Reader.NO_DATA) items[1].toFloat() else null
this[TitleRatings.numVotes] = if (items[2] != Reader.NO_DATA) items[2].toInt() else null
}
object TableLoader{
fun process(db : Database, table: Table, fileName: String, partitions: Int, operation: BatchInsertStatement.(String) -> Unit){
createTable(db, table)
loadData(db, table, fileName, partitions, operation)
}
private fun createTable(db : Database, table: Table){
transaction(db) { SchemaUtils.create (table) }
}
private fun loadData(db : Database, table: Table, fileName: String, partitions: Int, operation: BatchInsertStatement.(String) -> Unit){
val reader = File(fileName).bufferedReader()
reader.readLine() // pass headers
val lines = reader.readLines()
val partitionSize = lines.size / partitions
val listOfLists = lines.chunked(partitionSize)
var increment = 0
for(listOfList in listOfLists){
increment += 1
println("-------- $increment")
transaction(db) {
table.batchInsert(listOfList, body = operation)
}
}
}
}

View File

@@ -3,8 +3,10 @@ package loader
import dsl.Titles
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.batchInsert
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.transactions.transaction
import tsv.Reader
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
@@ -22,12 +24,35 @@ class TitleBasicsLoader(private val db: Database) {
println("Title.Basics loaded")
val loader = AtomicInteger()
transaction {
nameBasicsReader.lines().forEach {
Titles.insertFromListString(it.split("\t"))
if (loader.incrementAndGet() % 100 == 0) println(it)
}
val lines = nameBasicsReader.readLines()
val partitions = lines.size / 50
println(partitions)
val listOfLists = lines.chunked(partitions)
for(listOfList in listOfLists){
println("Chugging list")
println("--------")
transaction(db) {
Titles.batchInsert(listOfList){
val items = it.split("\t")
this[Titles.tconst] = items[0]
this[Titles.titleType] = items[1]
this[Titles.primaryTitle] = items[2]
this[Titles.originalTitle] = items[3]
this[Titles.isAdult] = items[4].toBoolean()
this[Titles.startYear] = if (items[5] != Reader.NO_DATA) items[5].toInt() else null
this[Titles.endYear] = if (items[6] != Reader.NO_DATA) items[6].toInt() else null
this[Titles.runtimeMinutes] = if (items[7] != Reader.NO_DATA) items[7].toLong() else null
this[Titles.genres] = items[8]
if (loader.incrementAndGet() % 25000 == 0) println(items[0])
}
}
}
println("Done loading title basics!")

View File

@@ -1,10 +1,7 @@
package loader
import dsl.TitleRatings
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.batchInsert
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.transaction
import tsv.Reader
import java.io.File
@@ -17,41 +14,20 @@ class TitleRatingsLoader(private val db: Database) {
transaction(db) { SchemaUtils.create (TitleRatings) }
}
fun loadDataMultiInsert(){
val nameBasicsReader = File("./datasets/title.ratings.tsv").bufferedReader()
nameBasicsReader.readLine()
println("Title.Ratings loaded")
val loader = AtomicInteger()
transaction {
nameBasicsReader.lines().forEach {
TitleRatings.insertFromListString(it.split("\t"))
if (loader.incrementAndGet() % 10000 == 0) println(it)
}
}
println("Done loading title ratings!")
}
fun loadData(){
val nameBasicsReader = File("./datasets/title.ratings.tsv").bufferedReader()
nameBasicsReader.readLine()
println("Title.Ratings loaded")
val lines = nameBasicsReader.readLines()
transaction(db) {
val loader = AtomicInteger()
transaction {
val lines = nameBasicsReader.readLines()
TitleRatings.batchInsert(lines){
val items = it.split("\t")
this[TitleRatings.tconst] = items[0]
this[TitleRatings.averageRating] = if (items[1] != Reader.NO_DATA) items[1].toFloat() else null
this[TitleRatings.numVotes] = if (items[2] != Reader.NO_DATA) items[2].toInt() else null
if (loader.incrementAndGet() % 10000 == 0) println(it)
}
}
println("Done loading title ratings!")