From 375c7004f8e2f3c5ad35eac5674341c4ecb5c50b Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Mon, 26 Nov 2018 13:02:29 -0800 Subject: [PATCH] Add a SQL database backed data store --- build.gradle | 2 +- dependency-versions.gradle | 3 + gradle/check-licenses.gradle | 1 + kv/build.gradle | 5 + .../net/consensys/cava/kv/SQLKeyValueStore.kt | 113 ++++++++++++++++++ .../consensys/cava/kv/KeyValueStoreSpec.kt | 56 +++++++++ 6 files changed, 179 insertions(+), 1 deletion(-) create mode 100644 kv/src/main/kotlin/net/consensys/cava/kv/SQLKeyValueStore.kt diff --git a/build.gradle b/build.gradle index 04248e69..f9d18b09 100644 --- a/build.gradle +++ b/build.gradle @@ -13,7 +13,7 @@ plugins { id 'io.spring.dependency-management' version '1.0.6.RELEASE' id 'com.github.hierynomus.license' version '0.14.0' id 'com.jfrog.bintray' version '1.8.1' - id 'org.jetbrains.kotlin.jvm' version '1.3.0-rc-198' + id 'org.jetbrains.kotlin.jvm' version '1.3.10' id 'org.jetbrains.dokka' version '0.9.17' } diff --git a/dependency-versions.gradle b/dependency-versions.gradle index 034217ba..a43bbf3e 100644 --- a/dependency-versions.gradle +++ b/dependency-versions.gradle @@ -6,6 +6,8 @@ dependencyManagement { dependency('com.google.errorprone:error_prone_core:2.3.2') dependency('com.google.errorprone:javac:9+181-r4173-1') dependency('com.google.guava:guava:27.0-jre') + dependency('com.h2database:h2:1.4.197') + dependency('com.jolbox:bonecp:0.8.0.RELEASE') dependency('com.squareup.okhttp3:okhttp:3.11.0') dependency('com.winterbe:expekt:0.5.0') dependency('io.lettuce:lettuce-core:5.1.1.RELEASE') @@ -33,6 +35,7 @@ dependencyManagement { } dependencySet(group: 'org.jetbrains.kotlinx', version: '1.0.1') { entry 'kotlinx-coroutines-core' + entry 'kotlinx-coroutines-guava' entry 'kotlinx-coroutines-jdk8' } dependencySet(group: 'org.jetbrains.spek', version: '1.1.5') { diff --git a/gradle/check-licenses.gradle b/gradle/check-licenses.gradle index 756aac47..a2f3cde9 100644 --- a/gradle/check-licenses.gradle +++ b/gradle/check-licenses.gradle @@ -74,6 +74,7 @@ downloadLicenses { 'Apache License Version 2.0', 'Apache License, Version 2.0', 'Apache Software Licenses', + 'Apache v2', 'ASL, Version 2', 'The Apache License, Version 2.0', 'The Apache Software License, Version 2.0', diff --git a/kv/build.gradle b/kv/build.gradle index 4dd9d097..c7ec6701 100644 --- a/kv/build.gradle +++ b/kv/build.gradle @@ -4,14 +4,19 @@ dependencies { compile project(':bytes') compile project(':concurrent-coroutines') compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core' + compile 'org.jetbrains.kotlinx:kotlinx-coroutines-guava' compile 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8' + compile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8' + compileOnly 'com.jolbox:bonecp' compileOnly 'io.lettuce:lettuce-core' compileOnly 'org.fusesource.leveldbjni:leveldbjni-all' compileOnly 'org.mapdb:mapdb' testCompile project(':concurrent') testCompile project(':junit') + testCompile 'com.jolbox:bonecp' testCompile 'com.github.kstyrc:embedded-redis' + testCompile 'com.h2database:h2' testCompile 'com.winterbe:expekt' testCompile 'io.lettuce:lettuce-core' testCompile 'org.fusesource.leveldbjni:leveldbjni-all' diff --git a/kv/src/main/kotlin/net/consensys/cava/kv/SQLKeyValueStore.kt b/kv/src/main/kotlin/net/consensys/cava/kv/SQLKeyValueStore.kt new file mode 100644 index 00000000..f38f908e --- /dev/null +++ b/kv/src/main/kotlin/net/consensys/cava/kv/SQLKeyValueStore.kt @@ -0,0 +1,113 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.kv + +import com.jolbox.bonecp.BoneCP +import com.jolbox.bonecp.BoneCPConfig +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.guava.await +import kotlinx.coroutines.withContext +import net.consensys.cava.bytes.Bytes +import java.io.IOException + +/** + * A key-value store backed by a relational database. + * + * @param jdbcurl The JDBC url to connect to the database. + * @param tableName the name of the table to use for storage. + * @param keyColumn the key column of the store. + * @param valueColumn the value column of the store. + * @param dispatcher The co-routine context for blocking tasks. + * @return A key-value store. + * @throws IOException If an I/O error occurs. + * @constructor Open a relational database backed key-value store. + */ +class SQLKeyValueStore +@Throws(IOException::class) +constructor( + jdbcurl: String, + val tableName: String = "store", + val keyColumn: String = "key", + val valueColumn: String = "value", + private val dispatcher: CoroutineDispatcher = Dispatchers.IO +) : KeyValueStore { + + companion object { + /** + * Open a relational database backed key-value store. + * + * @param jdbcUrl The JDBC url to connect to the database. + * @return A key-value store. + * @throws IOException If an I/O error occurs. + */ + @JvmStatic + @Throws(IOException::class) + fun open(jdbcUrl: String) = SQLKeyValueStore(jdbcUrl) + + /** + * Open a relational database backed key-value store. + * + * @param jdbcUrl The JDBC url to connect to the database. + * @param tableName the name of the table to use for storage. + * @param keyColumn the key column of the store. + * @param valueColumn the value column of the store. + * @return A key-value store. + * @throws IOException If an I/O error occurs. + */ + @JvmStatic + @Throws(IOException::class) + fun open(jdbcUrl: String, tableName: String, keyColumn: String, valueColumn: String) = + SQLKeyValueStore(jdbcUrl, tableName, keyColumn, valueColumn) + } + + private val connectionPool: BoneCP + + init { + val config = BoneCPConfig() + config.jdbcUrl = jdbcurl + + connectionPool = BoneCP(config) + } + + override suspend fun get(key: Bytes): Bytes? = withContext(dispatcher) { + connectionPool.asyncConnection.await().use { + val stmt = it.prepareStatement("SELECT $valueColumn FROM $tableName WHERE $keyColumn = ?") + stmt.setBytes(1, key.toArrayUnsafe()) + stmt.execute() + + val rs = stmt.resultSet + + if (rs.next()) { + Bytes.wrap(rs.getBytes(1)) + } else { + null + } + } + } + + override suspend fun put(key: Bytes, value: Bytes) = withContext(dispatcher) { + connectionPool.asyncConnection.await().use { + val stmt = it.prepareStatement("INSERT INTO $tableName($keyColumn, $valueColumn) VALUES(?,?)") + stmt.setBytes(1, key.toArrayUnsafe()) + stmt.setBytes(2, value.toArrayUnsafe()) + stmt.execute() + Unit + } + } + + /** + * Closes the underlying connection pool. + */ + override fun close() = connectionPool.shutdown() +} diff --git a/kv/src/test/kotlin/net/consensys/cava/kv/KeyValueStoreSpec.kt b/kv/src/test/kotlin/net/consensys/cava/kv/KeyValueStoreSpec.kt index bc7bf717..b95c1676 100644 --- a/kv/src/test/kotlin/net/consensys/cava/kv/KeyValueStoreSpec.kt +++ b/kv/src/test/kotlin/net/consensys/cava/kv/KeyValueStoreSpec.kt @@ -24,6 +24,9 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import java.nio.file.Files +import java.nio.file.Paths +import java.sql.DriverManager +import java.util.concurrent.RejectedExecutionException object Vars { val foo = Bytes.wrap("foo".toByteArray())!! @@ -135,3 +138,56 @@ object LevelDBKeyValueStoreSpec : Spek({ } } }) + +object SQLKeyValueStoreSpec : Spek({ + Files.deleteIfExists(Paths.get(System.getProperty("java.io.tmpdir"), "testdb.mv.db")) + Files.deleteIfExists(Paths.get(System.getProperty("java.io.tmpdir"), "testdb.trace.db")) + val jdbcUrl = "jdbc:h2:${System.getProperty("java.io.tmpdir")}/testdb" + DriverManager.getConnection(jdbcUrl).use { + val st = it.createStatement() + st.executeUpdate("create table store(key binary, value binary, primary key(key))") + st.executeUpdate("create table store2(id binary, val binary, primary key(id))") + } + val kv = SQLKeyValueStore(jdbcUrl) + val otherkv = SQLKeyValueStore.open(jdbcUrl, "store2", "id", "val") + afterGroup { + kv.close() + otherkv.close() + } + describe("a SQL-backed key value store") { + + it("should allow to retrieve values") { + runBlocking { + kv.put(foobar, foo) + kv.get(foobar).should.equal(foo) + } + } + + it("should allow to retrieve values when configured with a different table") { + runBlocking { + otherkv.put(foobar, foo) + otherkv.get(foobar).should.equal(foo) + } + } + + it("should return an empty optional when no value is present") { + runBlocking { + kv.get(Bytes.wrap("foofoobar".toByteArray())).should.be.`null` + } + } + + it("should not allow usage after the DB is closed") { + val kv2 = SQLKeyValueStore("jdbc:h2:mem:testdb") + kv2.close() + runBlocking { + var caught = false + try { + kv2.put(foobar, foo) + } catch (e: RejectedExecutionException) { + caught = true + } + caught.should.be.`true` + } + } + } +})