-
Notifications
You must be signed in to change notification settings - Fork 51
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8b30078
commit 1a6ad72
Showing
14 changed files
with
662 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
apply(plugin = "kotlin") | ||
|
||
dependencies { | ||
implementation(Dependencies.guava) | ||
implementation(Dependencies.moshiCore) | ||
implementation(Dependencies.moshiKotlin) | ||
implementation(Dependencies.wireRuntime) | ||
implementation(Dependencies.gcpSpanner) | ||
implementation(Dependencies.guice) | ||
implementation(Dependencies.okio) | ||
implementation(Dependencies.kotlinStdLib) | ||
implementation(Dependencies.loggingApi) | ||
implementation(Dependencies.wireMoshiAdapter) | ||
|
||
api(project(":client")) | ||
// We do not want to leak client-base implementation details to customers. | ||
implementation(project(":client-base")) | ||
|
||
implementation(Dependencies.misk) | ||
implementation(Dependencies.miskActions) | ||
implementation(Dependencies.miskCore) | ||
implementation(Dependencies.miskGcp) | ||
implementation(Dependencies.miskInject) | ||
|
||
testImplementation(Dependencies.assertj) | ||
testImplementation(Dependencies.miskTesting) | ||
testImplementation(Dependencies.miskGcpTesting) | ||
testImplementation(project(":client-misk")) | ||
testImplementation(Dependencies.kotlinTest) | ||
testImplementation(Dependencies.junitEngine) | ||
testImplementation(Dependencies.okHttp) | ||
|
||
testImplementation(project(":backfila-embedded")) | ||
testImplementation(project(":client-testing")) | ||
} | ||
|
||
val jar by tasks.getting(Jar::class) { | ||
baseName = "backfila-client-misk-spanner" | ||
} | ||
|
||
if (rootProject.file("hooks.gradle").exists()) { | ||
apply(from = rootProject.file("hooks.gradle")) | ||
} | ||
|
||
apply(from = "$rootDir/gradle-mvn-publish.gradle") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
POM_ARTIFACT_ID=client-misk-spanner | ||
POM_NAME=client-misk-spanner | ||
POM_DESCRIPTION=Misk spanner backfila client backend implementation library | ||
POM_PACKAGING=jar |
35 changes: 35 additions & 0 deletions
35
client-misk-spanner/src/main/kotlin/app/cash/backfila/client/misk/spanner/SpannerBackfill.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package app.cash.backfila.client.misk.spanner | ||
|
||
import app.cash.backfila.client.Backfill | ||
import app.cash.backfila.client.BackfillConfig | ||
import com.google.cloud.spanner.DatabaseClient | ||
import com.google.cloud.spanner.KeyRange | ||
|
||
abstract class SpannerBackfill<Param : Any> : Backfill { | ||
/** | ||
* A previously established connection to the DB that owns the table. | ||
*/ | ||
abstract val dbClient: DatabaseClient | ||
|
||
/** | ||
* The name of the table to be used as the source of the backfill. | ||
*/ | ||
abstract val tableName: String | ||
|
||
/** | ||
* A list of names of columns that make up the table's primary keys. | ||
* Only primary key columns that are strings are supported. | ||
*/ | ||
abstract val primaryKeyColumns: List<String> | ||
|
||
/** | ||
* Override this and throw an exception to prevent the backfill from being created. | ||
* This is also a good place to do any prep work before batches are run. | ||
*/ | ||
open fun validate(config: BackfillConfig<Param>) {} | ||
|
||
/** | ||
* Run a backfill operation based on the provided range of primary keys from `tableName`. | ||
*/ | ||
abstract fun runBatch(range: KeyRange, config: BackfillConfig<Param>) | ||
} |
70 changes: 70 additions & 0 deletions
70
...sk-spanner/src/main/kotlin/app/cash/backfila/client/misk/spanner/SpannerBackfillModule.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package app.cash.backfila.client.misk.spanner | ||
|
||
import app.cash.backfila.client.spi.BackfillBackend | ||
import app.cash.backfila.client.misk.spanner.internal.SpannerBackend | ||
import com.google.inject.Binder | ||
import com.google.inject.Provides | ||
import com.google.inject.Singleton | ||
import com.google.inject.TypeLiteral | ||
import com.google.inject.multibindings.MapBinder | ||
import com.google.inject.multibindings.Multibinder | ||
import com.squareup.moshi.Moshi | ||
import com.squareup.moshi.kotlin.reflect.KotlinJsonAdapterFactory | ||
import javax.inject.Qualifier | ||
import kotlin.reflect.KClass | ||
import misk.inject.KAbstractModule | ||
import kotlin.reflect.jvm.jvmName | ||
|
||
class SpannerBackfillModule<T : SpannerBackfill<*>> private constructor( | ||
private val backfillClass: KClass<T> | ||
) : KAbstractModule() { | ||
override fun configure() { | ||
install(SpannerBackfillBackendModule) | ||
// Ensures that the backfill class is injectable. If you are failing this check you probably | ||
// want to add an @Inject annotation to your class or check that all of your dependencies are provided. | ||
binder().getProvider(backfillClass.java) | ||
mapBinder(binder()).addBinding(backfillClass.jvmName).toInstance(backfillClass) | ||
} | ||
|
||
companion object { | ||
inline fun <reified T : SpannerBackfill<*>> create(): SpannerBackfillModule<T> = | ||
create(T::class) | ||
|
||
@JvmStatic | ||
fun <T : SpannerBackfill<*>> create(backfillClass: KClass<T>): SpannerBackfillModule<T> { | ||
return SpannerBackfillModule(backfillClass) | ||
} | ||
|
||
@JvmStatic | ||
fun <T : SpannerBackfill<*>> create(backfillClass: Class<T>): SpannerBackfillModule<T> { | ||
return SpannerBackfillModule(backfillClass.kotlin) | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* This is a kotlin object so these dependencies are only installed once. | ||
*/ | ||
private object SpannerBackfillBackendModule : KAbstractModule() { | ||
override fun configure() { | ||
Multibinder.newSetBinder(binder(), BackfillBackend::class.java).addBinding() | ||
.to(SpannerBackend::class.java) | ||
} | ||
|
||
@Provides @Singleton @ForSpannerBackend | ||
fun provideSpannerMoshi(): Moshi { | ||
return Moshi.Builder() | ||
.add(KotlinJsonAdapterFactory()) | ||
.build() | ||
} | ||
} | ||
|
||
private fun mapBinder(binder: Binder) = MapBinder.newMapBinder( | ||
binder, | ||
object : TypeLiteral<String>() {}, | ||
object : TypeLiteral<KClass<out SpannerBackfill<*>>>() {}, | ||
ForSpannerBackend::class.java | ||
) | ||
|
||
/** Annotation for specifying dependencies specifically for this Backend. */ | ||
@Qualifier annotation class ForSpannerBackend |
76 changes: 76 additions & 0 deletions
76
...-spanner/src/main/kotlin/app/cash/backfila/client/misk/spanner/internal/SpannerBackend.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
package app.cash.backfila.client.misk.spanner.internal | ||
|
||
import app.cash.backfila.client.DeleteBy | ||
import app.cash.backfila.client.Description | ||
import app.cash.backfila.client.misk.spanner.ForSpannerBackend | ||
import app.cash.backfila.client.misk.spanner.SpannerBackfill | ||
import app.cash.backfila.client.parseDeleteByDate | ||
import app.cash.backfila.client.spi.BackfillBackend | ||
import app.cash.backfila.client.spi.BackfillOperator | ||
import app.cash.backfila.client.spi.BackfilaParametersOperator | ||
import app.cash.backfila.client.spi.BackfillRegistration | ||
import com.google.cloud.spanner.Spanner | ||
import com.google.inject.Injector | ||
import com.google.inject.TypeLiteral | ||
import com.squareup.moshi.Moshi | ||
import com.squareup.moshi.Types | ||
import java.lang.reflect.ParameterizedType | ||
import javax.inject.Inject | ||
import javax.inject.Singleton | ||
import kotlin.reflect.KClass | ||
import kotlin.reflect.full.findAnnotation | ||
|
||
@Singleton | ||
internal class SpannerBackend @Inject constructor( | ||
private val injector: Injector, | ||
@ForSpannerBackend private val backfills: MutableMap<String, KClass<out SpannerBackfill<*>>>, | ||
@ForSpannerBackend internal val moshi: Moshi, | ||
internal val spanner: Spanner, | ||
) : BackfillBackend { | ||
|
||
private fun getBackfill(name: String): SpannerBackfill<*>? { | ||
val backfillClass = backfills[name] ?: return null | ||
return injector.getInstance(backfillClass.java) as SpannerBackfill<*> | ||
} | ||
|
||
private fun <Param : Any> createSpannerOperator( | ||
backfill: SpannerBackfill<Param> | ||
) = SpannerBackfillOperator( | ||
backfill, | ||
BackfilaParametersOperator(parametersClass(backfill::class)), | ||
this, | ||
) | ||
|
||
override fun create(backfillName: String, backfillId: String): BackfillOperator? { | ||
val backfill = getBackfill(backfillName) | ||
|
||
if (backfill != null) { | ||
@Suppress("UNCHECKED_CAST") // We don't know the types statically, so fake them. | ||
return createSpannerOperator(backfill as SpannerBackfill<Any>) | ||
} | ||
|
||
return null | ||
} | ||
|
||
override fun backfills(): Set<BackfillRegistration> { | ||
return backfills.map { | ||
BackfillRegistration( | ||
name = it.key, | ||
description = it.value.findAnnotation<Description>()?.text, | ||
parametersClass = parametersClass(it.value as KClass<SpannerBackfill<Any>>), | ||
deleteBy = it.value.findAnnotation<DeleteBy>()?.parseDeleteByDate(), | ||
) | ||
}.toSet() | ||
} | ||
|
||
private fun <T : Any> parametersClass(backfillClass: KClass<out SpannerBackfill<T>>): KClass<T> { | ||
// Like MyBackfill. | ||
val thisType = TypeLiteral.get(backfillClass.java) | ||
|
||
// Like Backfill<MyDataClass>. | ||
val supertype = thisType.getSupertype(SpannerBackfill::class.java).type as ParameterizedType | ||
|
||
// Like MyDataClass | ||
return (Types.getRawType(supertype.actualTypeArguments[0]) as Class<T>).kotlin | ||
} | ||
} |
117 changes: 117 additions & 0 deletions
117
...src/main/kotlin/app/cash/backfila/client/misk/spanner/internal/SpannerBackfillOperator.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
package app.cash.backfila.client.misk.spanner.internal | ||
|
||
import app.cash.backfila.client.misk.spanner.SpannerBackfill | ||
import app.cash.backfila.client.spi.BackfillOperator | ||
import app.cash.backfila.client.spi.BackfilaParametersOperator | ||
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest | ||
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse | ||
import app.cash.backfila.protos.clientservice.PrepareBackfillRequest | ||
import app.cash.backfila.protos.clientservice.PrepareBackfillResponse | ||
import app.cash.backfila.protos.clientservice.RunBatchRequest | ||
import app.cash.backfila.protos.clientservice.RunBatchResponse | ||
import com.google.cloud.spanner.Key | ||
import com.google.cloud.spanner.KeyRange | ||
import com.google.cloud.spanner.KeySet | ||
import com.squareup.moshi.Moshi | ||
import misk.moshi.adapter | ||
import okio.ByteString | ||
import okio.ByteString.Companion.encodeUtf8 | ||
|
||
class SpannerBackfillOperator<Param : Any> internal constructor( | ||
override val backfill: SpannerBackfill<Param>, | ||
private val parametersOperator: BackfilaParametersOperator<Param>, | ||
backend: SpannerBackend, | ||
) : BackfillOperator { | ||
private var moshi: Moshi = backend.moshi | ||
private val adapter = moshi.adapter<List<String>>() | ||
|
||
override fun name() = backfill.javaClass.toString() | ||
|
||
override fun prepareBackfill(request: PrepareBackfillRequest): PrepareBackfillResponse { | ||
val config = | ||
parametersOperator.constructBackfillConfig(request.parameters, request.dry_run) | ||
backfill.validate(config) | ||
|
||
val partitions = listOf( | ||
PrepareBackfillResponse.Partition.Builder() | ||
.backfill_range(request.range) | ||
.partition_name("partition") | ||
.build() | ||
) | ||
|
||
return PrepareBackfillResponse.Builder() | ||
.partitions(partitions) | ||
.build() | ||
} | ||
|
||
override fun getNextBatchRange(request: GetNextBatchRangeRequest): GetNextBatchRangeResponse { | ||
// Establish a range to scane - either we want to start at the first key, | ||
// or start from (and exclude) the last key that was scanned. | ||
val range = if (request.previous_end_key == null) { | ||
KeySet.all() | ||
} else { | ||
val previousEndKey = adapter.fromJson(request.previous_end_key.utf8())!! | ||
KeySet.range( | ||
KeyRange.openClosed( | ||
Key.of(*previousEndKey.toTypedArray()), | ||
Key.of(), | ||
) | ||
) | ||
} | ||
|
||
// Query the table with the desired range, only fetching the components of the primary key. | ||
val query = backfill.dbClient.singleUseReadOnlyTransaction() | ||
.read(backfill.tableName, range, backfill.primaryKeyColumns) | ||
|
||
val keys = mutableListOf<ByteString>() | ||
|
||
// For each result, until we reach the maximum scan size, create a key representation that | ||
// can be used to uniquely identify a result row. | ||
var numberToScan = request.scan_size | ||
while (numberToScan > 0 && query.next()) { | ||
val newKey = adapter.toJson( | ||
backfill.primaryKeyColumns.map { query.getString(it) } | ||
).encodeUtf8() | ||
keys.add(newKey) | ||
numberToScan -= 1 | ||
} | ||
query.close() | ||
|
||
// Return the starting and ending keys obtained from the scan. | ||
val batches = keys.chunked(request.batch_size.toInt()).map { | ||
GetNextBatchRangeResponse.Batch.Builder() | ||
.batch_range( | ||
app.cash.backfila.protos.clientservice.KeyRange.Builder() | ||
.start(it.first()) | ||
.end(it.last()) | ||
.build() | ||
) | ||
.scanned_record_count(it.size.toLong()) | ||
.matching_record_count(it.size.toLong()) | ||
.build() | ||
} | ||
|
||
return GetNextBatchRangeResponse.Builder() | ||
.batches(batches) | ||
.build() | ||
} | ||
|
||
override fun runBatch(request: RunBatchRequest): RunBatchResponse { | ||
val config = | ||
parametersOperator.constructBackfillConfig(request.parameters, request.dry_run) | ||
|
||
// Create a range that encompasses the batch's starting and ending keys. | ||
val startKey = adapter.fromJson(request.batch_range.start.utf8())!!.toTypedArray() | ||
val endKey = adapter.fromJson(request.batch_range.end.utf8())!!.toTypedArray() | ||
val keyRange = KeyRange.closedClosed( | ||
Key.of(*startKey), | ||
Key.of(*endKey), | ||
) | ||
|
||
// Let the backfill do whatever it wants for the given batch. | ||
backfill.runBatch(keyRange, config) | ||
|
||
return RunBatchResponse.Builder() | ||
.build() | ||
} | ||
} |
19 changes: 19 additions & 0 deletions
19
client-misk-spanner/src/test/kotlin/app/cash/backfila/client/misk/spanner/BackfillsModule.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package app.cash.backfila.client.misk.spanner | ||
|
||
import app.cash.backfila.client.BackfilaHttpClientConfig | ||
import app.cash.backfila.client.misk.MiskBackfillModule | ||
import misk.inject.KAbstractModule | ||
|
||
class BackfillsModule : KAbstractModule() { | ||
override fun configure() { | ||
install( | ||
MiskBackfillModule( | ||
BackfilaHttpClientConfig( | ||
url = "test.url", slack_channel = "#test" | ||
), | ||
dependsOn = listOf() | ||
) | ||
) | ||
install(SpannerBackfillModule.create<SpannerBackfillTest.MakeTracksExplicitBackfill>()) | ||
} | ||
} |
Oops, something went wrong.