Skip to content

Commit

Permalink
Add a Spanner backend.
Browse files Browse the repository at this point in the history
  • Loading branch information
jdm-square committed Aug 10, 2022
1 parent 8b30078 commit aaaebc0
Show file tree
Hide file tree
Showing 14 changed files with 653 additions and 0 deletions.
3 changes: 3 additions & 0 deletions buildSrc/src/main/kotlin/Dependencies.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ object Dependencies {
val aws2DynamodbEnhanced = "software.amazon.awssdk:dynamodb-enhanced:2.16.99"
val dokkaGradlePlugin = "org.jetbrains.dokka:dokka-gradle-plugin:1.5.0"
val flywayGradleBuildscriptDep = "gradle.plugin.com.boxfuse.client:flyway-release:5.0.2"
val gcpSpanner = "com.google.cloud:google-cloud-spanner:6.13.0"
val guava = "com.google.guava:guava:31.0.1-jre"
val guice = "com.google.inject:guice:5.1.0"
val jCommander = "com.beust:jcommander:1.72"
Expand Down Expand Up @@ -41,6 +42,8 @@ object Dependencies {
val miskAws2Dynamodb = "com.squareup.misk:misk-aws2-dynamodb:${Versions.misk}"
val miskAws2DynamodbTesting = "com.squareup.misk:misk-aws2-dynamodb-testing:${Versions.misk}"
val miskCore = "com.squareup.misk:misk-core:${Versions.misk}"
val miskGcp = "com.squareup.misk:misk-gcp:${Versions.misk}"
val miskGcpTesting = "com.squareup.misk:misk-gcp-testing:${Versions.misk}"
val miskHibernate = "com.squareup.misk:misk-hibernate:${Versions.misk}"
val miskHibernateTesting = "com.squareup.misk:misk-hibernate-testing:${Versions.misk}"
val miskJdbc = "com.squareup.misk:misk-jdbc:${Versions.misk}"
Expand Down
45 changes: 45 additions & 0 deletions client-misk-spanner/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
apply(plugin = "kotlin")

dependencies {
implementation(Dependencies.guava)
implementation(Dependencies.moshiCore)
implementation(Dependencies.moshiKotlin)
implementation(Dependencies.wireRuntime)
implementation(Dependencies.gcpSpanner)
implementation(Dependencies.guice)
implementation(Dependencies.okio)
implementation(Dependencies.kotlinStdLib)
implementation(Dependencies.loggingApi)
implementation(Dependencies.wireMoshiAdapter)

api(project(":client"))
// We do not want to leak client-base implementation details to customers.
implementation(project(":client-base"))

implementation(Dependencies.misk)
implementation(Dependencies.miskActions)
implementation(Dependencies.miskCore)
implementation(Dependencies.miskGcp)
implementation(Dependencies.miskInject)

testImplementation(Dependencies.assertj)
testImplementation(Dependencies.miskTesting)
testImplementation(Dependencies.miskGcpTesting)
testImplementation(project(":client-misk"))
testImplementation(Dependencies.kotlinTest)
testImplementation(Dependencies.junitEngine)
testImplementation(Dependencies.okHttp)

testImplementation(project(":backfila-embedded"))
testImplementation(project(":client-testing"))
}

val jar by tasks.getting(Jar::class) {
baseName = "backfila-client-misk-spanner"
}

if (rootProject.file("hooks.gradle").exists()) {
apply(from = rootProject.file("hooks.gradle"))
}

apply(from = "$rootDir/gradle-mvn-publish.gradle")
4 changes: 4 additions & 0 deletions client-misk-spanner/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
POM_ARTIFACT_ID=client-misk-spanner
POM_NAME=client-misk-spanner
POM_DESCRIPTION=Misk spanner backfila client backend implementation library
POM_PACKAGING=jar
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package app.cash.backfila.client.misk.spanner

import app.cash.backfila.client.Backfill
import app.cash.backfila.client.BackfillConfig
import com.google.cloud.spanner.DatabaseClient
import com.google.cloud.spanner.KeyRange

abstract class SpannerBackfill<Param : Any> : Backfill {
/**
* A previously established connection to the DB that owns the table.
*/
abstract val dbClient: DatabaseClient

/**
* The name of the table to be used as the source of the backfill.
*/
abstract val tableName: String

/**
* A list of names of columns that make up the table's primary keys.
* Only primary key columns that are strings are supported.
*/
abstract val primaryKeyColumns: List<String>

/**
* Override this and throw an exception to prevent the backfill from being created.
* This is also a good place to do any prep work before batches are run.
*/
open fun validate(config: BackfillConfig<Param>) {}

/**
* Run a backfill operation based on the provided range of primary keys from `tableName`.
*/
abstract fun runBatch(range: KeyRange, config: BackfillConfig<Param>)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package app.cash.backfila.client.misk.spanner

import app.cash.backfila.client.spi.BackfillBackend
import app.cash.backfila.client.misk.spanner.internal.SpannerBackend
import com.google.inject.Binder
import com.google.inject.Provides
import com.google.inject.Singleton
import com.google.inject.TypeLiteral
import com.google.inject.multibindings.MapBinder
import com.google.inject.multibindings.Multibinder
import com.squareup.moshi.Moshi
import com.squareup.moshi.kotlin.reflect.KotlinJsonAdapterFactory
import javax.inject.Qualifier
import kotlin.reflect.KClass
import misk.inject.KAbstractModule
import kotlin.reflect.jvm.jvmName

class SpannerBackfillModule<T : SpannerBackfill<*>> private constructor(
private val backfillClass: KClass<T>
) : KAbstractModule() {
override fun configure() {
install(SpannerBackfillBackendModule)
// Ensures that the backfill class is injectable. If you are failing this check you probably
// want to add an @Inject annotation to your class or check that all of your dependencies are provided.
binder().getProvider(backfillClass.java)
mapBinder(binder()).addBinding(backfillClass.jvmName).toInstance(backfillClass)
}

companion object {
inline fun <reified T : SpannerBackfill<*>> create(): SpannerBackfillModule<T> =
create(T::class)

@JvmStatic
fun <T : SpannerBackfill<*>> create(backfillClass: KClass<T>): SpannerBackfillModule<T> {
return SpannerBackfillModule(backfillClass)
}

@JvmStatic
fun <T : SpannerBackfill<*>> create(backfillClass: Class<T>): SpannerBackfillModule<T> {
return SpannerBackfillModule(backfillClass.kotlin)
}
}
}

/**
* This is a kotlin object so these dependencies are only installed once.
*/
private object SpannerBackfillBackendModule : KAbstractModule() {
override fun configure() {
Multibinder.newSetBinder(binder(), BackfillBackend::class.java).addBinding()
.to(SpannerBackend::class.java)
}

@Provides @Singleton @ForSpannerBackend
fun provideSpannerMoshi(): Moshi {
return Moshi.Builder()
.add(KotlinJsonAdapterFactory())
.build()
}
}

private fun mapBinder(binder: Binder) = MapBinder.newMapBinder(
binder,
object : TypeLiteral<String>() {},
object : TypeLiteral<KClass<out SpannerBackfill<*>>>() {},
ForSpannerBackend::class.java
)

/** Annotation for specifying dependencies specifically for this Backend. */
@Qualifier annotation class ForSpannerBackend
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package app.cash.backfila.client.misk.spanner.internal

import app.cash.backfila.client.DeleteBy
import app.cash.backfila.client.Description
import app.cash.backfila.client.misk.spanner.ForSpannerBackend
import app.cash.backfila.client.misk.spanner.SpannerBackfill
import app.cash.backfila.client.parseDeleteByDate
import app.cash.backfila.client.spi.BackfillBackend
import app.cash.backfila.client.spi.BackfillOperator
import app.cash.backfila.client.spi.BackfilaParametersOperator
import app.cash.backfila.client.spi.BackfillRegistration
import com.google.cloud.spanner.Spanner
import com.google.inject.Injector
import com.google.inject.TypeLiteral
import com.squareup.moshi.Moshi
import com.squareup.moshi.Types
import java.lang.reflect.ParameterizedType
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.reflect.KClass
import kotlin.reflect.full.findAnnotation

@Singleton
internal class SpannerBackend @Inject constructor(
private val injector: Injector,
@ForSpannerBackend private val backfills: MutableMap<String, KClass<out SpannerBackfill<*>>>,
@ForSpannerBackend internal val moshi: Moshi,
internal val spanner: Spanner,
) : BackfillBackend {

private fun getBackfill(name: String): SpannerBackfill<*>? {
val backfillClass = backfills[name] ?: return null
return injector.getInstance(backfillClass.java) as SpannerBackfill<*>
}

private fun <Param : Any> createSpannerOperator(
backfill: SpannerBackfill<Param>
) = SpannerBackfillOperator(
backfill,
BackfilaParametersOperator(parametersClass(backfill::class)),
this,
)

override fun create(backfillName: String, backfillId: String): BackfillOperator? {
val backfill = getBackfill(backfillName)

if (backfill != null) {
@Suppress("UNCHECKED_CAST") // We don't know the types statically, so fake them.
return createSpannerOperator(backfill as SpannerBackfill<Any>)
}

return null
}

override fun backfills(): Set<BackfillRegistration> {
return backfills.map {
BackfillRegistration(
name = it.key,
description = it.value.findAnnotation<Description>()?.text,
parametersClass = parametersClass(it.value as KClass<SpannerBackfill<Any>>),
deleteBy = it.value.findAnnotation<DeleteBy>()?.parseDeleteByDate(),
)
}.toSet()
}

private fun <T : Any> parametersClass(backfillClass: KClass<out SpannerBackfill<T>>): KClass<T> {
// Like MyBackfill.
val thisType = TypeLiteral.get(backfillClass.java)

// Like Backfill<MyDataClass>.
val supertype = thisType.getSupertype(SpannerBackfill::class.java).type as ParameterizedType

// Like MyDataClass
return (Types.getRawType(supertype.actualTypeArguments[0]) as Class<T>).kotlin
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package app.cash.backfila.client.misk.spanner.internal

import app.cash.backfila.client.misk.spanner.SpannerBackfill
import app.cash.backfila.client.spi.BackfillOperator
import app.cash.backfila.client.spi.BackfilaParametersOperator
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse
import app.cash.backfila.protos.clientservice.PrepareBackfillRequest
import app.cash.backfila.protos.clientservice.PrepareBackfillResponse
import app.cash.backfila.protos.clientservice.RunBatchRequest
import app.cash.backfila.protos.clientservice.RunBatchResponse
import com.google.cloud.spanner.Key
import com.google.cloud.spanner.KeyRange
import com.google.cloud.spanner.KeySet
import com.squareup.moshi.Moshi
import misk.moshi.adapter
import okio.ByteString
import okio.ByteString.Companion.encodeUtf8

class SpannerBackfillOperator<Param : Any> internal constructor(
override val backfill: SpannerBackfill<Param>,
private val parametersOperator: BackfilaParametersOperator<Param>,
backend: SpannerBackend,
) : BackfillOperator {
private var moshi: Moshi = backend.moshi
private val adapter = moshi.adapter<List<String>>()

override fun name() = backfill.javaClass.toString()

override fun prepareBackfill(request: PrepareBackfillRequest): PrepareBackfillResponse {
val config =
parametersOperator.constructBackfillConfig(request.parameters, request.dry_run)
backfill.validate(config)

val partitions = listOf(
PrepareBackfillResponse.Partition.Builder()
.backfill_range(request.range)
.partition_name("partition")
.build()
)

return PrepareBackfillResponse.Builder()
.partitions(partitions)
.build()
}

override fun getNextBatchRange(request: GetNextBatchRangeRequest): GetNextBatchRangeResponse {
val range = if (request.previous_end_key == null) {
KeySet.all()
} else {
val previousEndKey = adapter.fromJson(request.previous_end_key.utf8())!!
KeySet.range(
KeyRange.openClosed(
Key.of(*previousEndKey.toTypedArray()),
Key.of(),
)
)
}

val query = backfill.dbClient.singleUseReadOnlyTransaction()
.read(backfill.tableName, range, backfill.primaryKeyColumns)

val keys = mutableListOf<ByteString>()
var numberToScan = request.scan_size
while (numberToScan > 0 && query.next()) {
val newKey = adapter.toJson(
backfill.primaryKeyColumns.map { query.getString(it) }
).encodeUtf8()
keys.add(newKey)
numberToScan -= 1
}
query.close()

val batches = keys.chunked(request.batch_size.toInt()).map {
GetNextBatchRangeResponse.Batch.Builder()
.batch_range(
app.cash.backfila.protos.clientservice.KeyRange.Builder()
.start(it.first())
.end(it.last())
.build()
)
.scanned_record_count(it.size.toLong())
.matching_record_count(it.size.toLong())
.build()
}

return GetNextBatchRangeResponse.Builder()
.batches(batches)
.build()
}

override fun runBatch(request: RunBatchRequest): RunBatchResponse {
val config =
parametersOperator.constructBackfillConfig(request.parameters, request.dry_run)

val startKey = adapter.fromJson(request.batch_range.start.utf8())!!.toTypedArray()
val endKey = adapter.fromJson(request.batch_range.end.utf8())!!.toTypedArray()
val keyRange = KeyRange.closedClosed(
Key.of(*startKey),
Key.of(*endKey),
)

backfill.runBatch(keyRange, config)

return RunBatchResponse.Builder()
.build()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package app.cash.backfila.client.misk.spanner

import app.cash.backfila.client.BackfilaHttpClientConfig
import app.cash.backfila.client.misk.MiskBackfillModule
import misk.inject.KAbstractModule

class BackfillsModule : KAbstractModule() {
override fun configure() {
install(
MiskBackfillModule(
BackfilaHttpClientConfig(
url = "test.url", slack_channel = "#test"
),
dependsOn = listOf()
)
)
install(SpannerBackfillModule.create<SpannerBackfillTest.MakeTracksExplicitBackfill>())
}
}
Loading

0 comments on commit aaaebc0

Please sign in to comment.