Skip to content
This repository has been archived by the owner on Apr 23, 2019. It is now read-only.

Commit

Permalink
Add a SQL database backed data store
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Nov 26, 2018
1 parent c4fbd91 commit 375c700
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ plugins {
id 'io.spring.dependency-management' version '1.0.6.RELEASE'
id 'com.github.hierynomus.license' version '0.14.0'
id 'com.jfrog.bintray' version '1.8.1'
id 'org.jetbrains.kotlin.jvm' version '1.3.0-rc-198'
id 'org.jetbrains.kotlin.jvm' version '1.3.10'
id 'org.jetbrains.dokka' version '0.9.17'
}

Expand Down
3 changes: 3 additions & 0 deletions dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ dependencyManagement {
dependency('com.google.errorprone:error_prone_core:2.3.2')
dependency('com.google.errorprone:javac:9+181-r4173-1')
dependency('com.google.guava:guava:27.0-jre')
dependency('com.h2database:h2:1.4.197')
dependency('com.jolbox:bonecp:0.8.0.RELEASE')
dependency('com.squareup.okhttp3:okhttp:3.11.0')
dependency('com.winterbe:expekt:0.5.0')
dependency('io.lettuce:lettuce-core:5.1.1.RELEASE')
Expand Down Expand Up @@ -33,6 +35,7 @@ dependencyManagement {
}
dependencySet(group: 'org.jetbrains.kotlinx', version: '1.0.1') {
entry 'kotlinx-coroutines-core'
entry 'kotlinx-coroutines-guava'
entry 'kotlinx-coroutines-jdk8'
}
dependencySet(group: 'org.jetbrains.spek', version: '1.1.5') {
Expand Down
1 change: 1 addition & 0 deletions gradle/check-licenses.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ downloadLicenses {
'Apache License Version 2.0',
'Apache License, Version 2.0',
'Apache Software Licenses',
'Apache v2',
'ASL, Version 2',
'The Apache License, Version 2.0',
'The Apache Software License, Version 2.0',
Expand Down
5 changes: 5 additions & 0 deletions kv/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ dependencies {
compile project(':bytes')
compile project(':concurrent-coroutines')
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-guava'
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8'
compile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
compileOnly 'com.jolbox:bonecp'
compileOnly 'io.lettuce:lettuce-core'
compileOnly 'org.fusesource.leveldbjni:leveldbjni-all'
compileOnly 'org.mapdb:mapdb'

testCompile project(':concurrent')
testCompile project(':junit')
testCompile 'com.jolbox:bonecp'
testCompile 'com.github.kstyrc:embedded-redis'
testCompile 'com.h2database:h2'
testCompile 'com.winterbe:expekt'
testCompile 'io.lettuce:lettuce-core'
testCompile 'org.fusesource.leveldbjni:leveldbjni-all'
Expand Down
113 changes: 113 additions & 0 deletions kv/src/main/kotlin/net/consensys/cava/kv/SQLKeyValueStore.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package net.consensys.cava.kv

import com.jolbox.bonecp.BoneCP
import com.jolbox.bonecp.BoneCPConfig
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.guava.await
import kotlinx.coroutines.withContext
import net.consensys.cava.bytes.Bytes
import java.io.IOException

/**
* A key-value store backed by a relational database.
*
* @param jdbcurl The JDBC url to connect to the database.
* @param tableName the name of the table to use for storage.
* @param keyColumn the key column of the store.
* @param valueColumn the value column of the store.
* @param dispatcher The co-routine context for blocking tasks.
* @return A key-value store.
* @throws IOException If an I/O error occurs.
* @constructor Open a relational database backed key-value store.
*/
class SQLKeyValueStore
@Throws(IOException::class)
constructor(
jdbcurl: String,
val tableName: String = "store",
val keyColumn: String = "key",
val valueColumn: String = "value",
private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) : KeyValueStore {

companion object {
/**
* Open a relational database backed key-value store.
*
* @param jdbcUrl The JDBC url to connect to the database.
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
fun open(jdbcUrl: String) = SQLKeyValueStore(jdbcUrl)

/**
* Open a relational database backed key-value store.
*
* @param jdbcUrl The JDBC url to connect to the database.
* @param tableName the name of the table to use for storage.
* @param keyColumn the key column of the store.
* @param valueColumn the value column of the store.
* @return A key-value store.
* @throws IOException If an I/O error occurs.
*/
@JvmStatic
@Throws(IOException::class)
fun open(jdbcUrl: String, tableName: String, keyColumn: String, valueColumn: String) =
SQLKeyValueStore(jdbcUrl, tableName, keyColumn, valueColumn)
}

private val connectionPool: BoneCP

init {
val config = BoneCPConfig()
config.jdbcUrl = jdbcurl

connectionPool = BoneCP(config)
}

override suspend fun get(key: Bytes): Bytes? = withContext(dispatcher) {
connectionPool.asyncConnection.await().use {
val stmt = it.prepareStatement("SELECT $valueColumn FROM $tableName WHERE $keyColumn = ?")
stmt.setBytes(1, key.toArrayUnsafe())
stmt.execute()

val rs = stmt.resultSet

if (rs.next()) {
Bytes.wrap(rs.getBytes(1))
} else {
null
}
}
}

override suspend fun put(key: Bytes, value: Bytes) = withContext(dispatcher) {
connectionPool.asyncConnection.await().use {
val stmt = it.prepareStatement("INSERT INTO $tableName($keyColumn, $valueColumn) VALUES(?,?)")
stmt.setBytes(1, key.toArrayUnsafe())
stmt.setBytes(2, value.toArrayUnsafe())
stmt.execute()
Unit
}
}

/**
* Closes the underlying connection pool.
*/
override fun close() = connectionPool.shutdown()
}
56 changes: 56 additions & 0 deletions kv/src/test/kotlin/net/consensys/cava/kv/KeyValueStoreSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import java.nio.file.Files
import java.nio.file.Paths
import java.sql.DriverManager
import java.util.concurrent.RejectedExecutionException

object Vars {
val foo = Bytes.wrap("foo".toByteArray())!!
Expand Down Expand Up @@ -135,3 +138,56 @@ object LevelDBKeyValueStoreSpec : Spek({
}
}
})

object SQLKeyValueStoreSpec : Spek({
Files.deleteIfExists(Paths.get(System.getProperty("java.io.tmpdir"), "testdb.mv.db"))
Files.deleteIfExists(Paths.get(System.getProperty("java.io.tmpdir"), "testdb.trace.db"))
val jdbcUrl = "jdbc:h2:${System.getProperty("java.io.tmpdir")}/testdb"
DriverManager.getConnection(jdbcUrl).use {
val st = it.createStatement()
st.executeUpdate("create table store(key binary, value binary, primary key(key))")
st.executeUpdate("create table store2(id binary, val binary, primary key(id))")
}
val kv = SQLKeyValueStore(jdbcUrl)
val otherkv = SQLKeyValueStore.open(jdbcUrl, "store2", "id", "val")
afterGroup {
kv.close()
otherkv.close()
}
describe("a SQL-backed key value store") {

it("should allow to retrieve values") {
runBlocking {
kv.put(foobar, foo)
kv.get(foobar).should.equal(foo)
}
}

it("should allow to retrieve values when configured with a different table") {
runBlocking {
otherkv.put(foobar, foo)
otherkv.get(foobar).should.equal(foo)
}
}

it("should return an empty optional when no value is present") {
runBlocking {
kv.get(Bytes.wrap("foofoobar".toByteArray())).should.be.`null`
}
}

it("should not allow usage after the DB is closed") {
val kv2 = SQLKeyValueStore("jdbc:h2:mem:testdb")
kv2.close()
runBlocking {
var caught = false
try {
kv2.put(foobar, foo)
} catch (e: RejectedExecutionException) {
caught = true
}
caught.should.be.`true`
}
}
}
})

0 comments on commit 375c700

Please sign in to comment.