-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement support for Google Spanner #271
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
apply(plugin = "kotlin") | ||
|
||
dependencies { | ||
implementation(Dependencies.guava) | ||
implementation(Dependencies.moshiCore) | ||
implementation(Dependencies.moshiKotlin) | ||
implementation(Dependencies.wireRuntime) | ||
implementation(Dependencies.gcpSpanner) | ||
implementation(Dependencies.guice) | ||
implementation(Dependencies.okio) | ||
implementation(Dependencies.kotlinStdLib) | ||
implementation(Dependencies.loggingApi) | ||
implementation(Dependencies.wireMoshiAdapter) | ||
|
||
api(project(":client")) | ||
// We do not want to leak client-base implementation details to customers. | ||
implementation(project(":client-base")) | ||
|
||
implementation(Dependencies.misk) | ||
implementation(Dependencies.miskActions) | ||
implementation(Dependencies.miskCore) | ||
implementation(Dependencies.miskGcp) | ||
implementation(Dependencies.miskInject) | ||
|
||
testImplementation(Dependencies.assertj) | ||
testImplementation(Dependencies.miskTesting) | ||
testImplementation(Dependencies.miskGcpTesting) | ||
testImplementation(project(":client-misk")) | ||
testImplementation(Dependencies.kotlinTest) | ||
testImplementation(Dependencies.junitEngine) | ||
testImplementation(Dependencies.okHttp) | ||
|
||
testImplementation(project(":backfila-embedded")) | ||
testImplementation(project(":client-testing")) | ||
} | ||
|
||
val jar by tasks.getting(Jar::class) { | ||
baseName = "backfila-client-misk-spanner" | ||
} | ||
|
||
if (rootProject.file("hooks.gradle").exists()) { | ||
apply(from = rootProject.file("hooks.gradle")) | ||
} | ||
|
||
apply(from = "$rootDir/gradle-mvn-publish.gradle") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
POM_ARTIFACT_ID=client-misk-spanner | ||
POM_NAME=client-misk-spanner | ||
POM_DESCRIPTION=Misk spanner backfila client backend implementation library | ||
POM_PACKAGING=jar |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package app.cash.backfila.client.misk.spanner | ||
|
||
import app.cash.backfila.client.Backfill | ||
import app.cash.backfila.client.BackfillConfig | ||
import com.google.cloud.spanner.DatabaseClient | ||
import com.google.cloud.spanner.KeyRange | ||
|
||
abstract class SpannerBackfill<Param : Any> : Backfill { | ||
/** | ||
* A previously established connection to the DB that owns the table. | ||
*/ | ||
abstract val dbClient: DatabaseClient | ||
|
||
/** | ||
* The name of the table to be used as the source of the backfill. | ||
*/ | ||
abstract val tableName: String | ||
|
||
/** | ||
* A list of names of columns that make up the table's primary keys. | ||
* Only primary key columns that are strings are supported. | ||
*/ | ||
abstract val primaryKeyColumns: List<String> | ||
|
||
/** | ||
* Override this and throw an exception to prevent the backfill from being created. | ||
* This is also a good place to do any prep work before batches are run. | ||
*/ | ||
open fun validate(config: BackfillConfig<Param>) {} | ||
|
||
/** | ||
* Run a backfill operation based on the provided range of primary keys from `tableName`. | ||
*/ | ||
abstract fun runBatch(range: KeyRange, config: BackfillConfig<Param>) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package app.cash.backfila.client.misk.spanner | ||
|
||
import app.cash.backfila.client.spi.BackfillBackend | ||
import app.cash.backfila.client.misk.spanner.internal.SpannerBackend | ||
import com.google.inject.Binder | ||
import com.google.inject.Provides | ||
import com.google.inject.Singleton | ||
import com.google.inject.TypeLiteral | ||
import com.google.inject.multibindings.MapBinder | ||
import com.google.inject.multibindings.Multibinder | ||
import com.squareup.moshi.Moshi | ||
import com.squareup.moshi.kotlin.reflect.KotlinJsonAdapterFactory | ||
import javax.inject.Qualifier | ||
import kotlin.reflect.KClass | ||
import misk.inject.KAbstractModule | ||
import kotlin.reflect.jvm.jvmName | ||
|
||
class SpannerBackfillModule<T : SpannerBackfill<*>> private constructor( | ||
private val backfillClass: KClass<T> | ||
) : KAbstractModule() { | ||
override fun configure() { | ||
install(SpannerBackfillBackendModule) | ||
// Ensures that the backfill class is injectable. If you are failing this check you probably | ||
// want to add an @Inject annotation to your class or check that all of your dependencies are provided. | ||
binder().getProvider(backfillClass.java) | ||
mapBinder(binder()).addBinding(backfillClass.jvmName).toInstance(backfillClass) | ||
} | ||
|
||
companion object { | ||
inline fun <reified T : SpannerBackfill<*>> create(): SpannerBackfillModule<T> = | ||
create(T::class) | ||
|
||
@JvmStatic | ||
fun <T : SpannerBackfill<*>> create(backfillClass: KClass<T>): SpannerBackfillModule<T> { | ||
return SpannerBackfillModule(backfillClass) | ||
} | ||
|
||
@JvmStatic | ||
fun <T : SpannerBackfill<*>> create(backfillClass: Class<T>): SpannerBackfillModule<T> { | ||
return SpannerBackfillModule(backfillClass.kotlin) | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* This is a kotlin object so these dependencies are only installed once. | ||
*/ | ||
private object SpannerBackfillBackendModule : KAbstractModule() { | ||
override fun configure() { | ||
Multibinder.newSetBinder(binder(), BackfillBackend::class.java).addBinding() | ||
.to(SpannerBackend::class.java) | ||
} | ||
|
||
@Provides @Singleton @ForSpannerBackend | ||
fun provideSpannerMoshi(): Moshi { | ||
return Moshi.Builder() | ||
.add(KotlinJsonAdapterFactory()) | ||
.build() | ||
} | ||
} | ||
|
||
private fun mapBinder(binder: Binder) = MapBinder.newMapBinder( | ||
binder, | ||
object : TypeLiteral<String>() {}, | ||
object : TypeLiteral<KClass<out SpannerBackfill<*>>>() {}, | ||
ForSpannerBackend::class.java | ||
) | ||
|
||
/** Annotation for specifying dependencies specifically for this Backend. */ | ||
@Qualifier annotation class ForSpannerBackend |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
package app.cash.backfila.client.misk.spanner.internal | ||
|
||
import app.cash.backfila.client.DeleteBy | ||
import app.cash.backfila.client.Description | ||
import app.cash.backfila.client.misk.spanner.ForSpannerBackend | ||
import app.cash.backfila.client.misk.spanner.SpannerBackfill | ||
import app.cash.backfila.client.parseDeleteByDate | ||
import app.cash.backfila.client.spi.BackfillBackend | ||
import app.cash.backfila.client.spi.BackfillOperator | ||
import app.cash.backfila.client.spi.BackfilaParametersOperator | ||
import app.cash.backfila.client.spi.BackfillRegistration | ||
import com.google.cloud.spanner.Spanner | ||
import com.google.inject.Injector | ||
import com.google.inject.TypeLiteral | ||
import com.squareup.moshi.Moshi | ||
import com.squareup.moshi.Types | ||
import java.lang.reflect.ParameterizedType | ||
import javax.inject.Inject | ||
import javax.inject.Singleton | ||
import kotlin.reflect.KClass | ||
import kotlin.reflect.full.findAnnotation | ||
|
||
@Singleton | ||
internal class SpannerBackend @Inject constructor( | ||
jdm-square marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private val injector: Injector, | ||
@ForSpannerBackend private val backfills: MutableMap<String, KClass<out SpannerBackfill<*>>>, | ||
@ForSpannerBackend internal val moshi: Moshi, | ||
internal val spanner: Spanner, | ||
) : BackfillBackend { | ||
|
||
private fun getBackfill(name: String): SpannerBackfill<*>? { | ||
val backfillClass = backfills[name] ?: return null | ||
return injector.getInstance(backfillClass.java) as SpannerBackfill<*> | ||
} | ||
|
||
private fun <Param : Any> createSpannerOperator( | ||
backfill: SpannerBackfill<Param> | ||
) = SpannerBackfillOperator( | ||
backfill, | ||
BackfilaParametersOperator(parametersClass(backfill::class)), | ||
this, | ||
) | ||
|
||
override fun create(backfillName: String, backfillId: String): BackfillOperator? { | ||
val backfill = getBackfill(backfillName) | ||
|
||
if (backfill != null) { | ||
@Suppress("UNCHECKED_CAST") // We don't know the types statically, so fake them. | ||
return createSpannerOperator(backfill as SpannerBackfill<Any>) | ||
} | ||
|
||
return null | ||
} | ||
|
||
override fun backfills(): Set<BackfillRegistration> { | ||
return backfills.map { | ||
BackfillRegistration( | ||
name = it.key, | ||
description = it.value.findAnnotation<Description>()?.text, | ||
parametersClass = parametersClass(it.value as KClass<SpannerBackfill<Any>>), | ||
deleteBy = it.value.findAnnotation<DeleteBy>()?.parseDeleteByDate(), | ||
) | ||
}.toSet() | ||
} | ||
|
||
private fun <T : Any> parametersClass(backfillClass: KClass<out SpannerBackfill<T>>): KClass<T> { | ||
// Like MyBackfill. | ||
val thisType = TypeLiteral.get(backfillClass.java) | ||
|
||
// Like Backfill<MyDataClass>. | ||
val supertype = thisType.getSupertype(SpannerBackfill::class.java).type as ParameterizedType | ||
|
||
// Like MyDataClass | ||
return (Types.getRawType(supertype.actualTypeArguments[0]) as Class<T>).kotlin | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
package app.cash.backfila.client.misk.spanner.internal | ||
|
||
import app.cash.backfila.client.misk.spanner.SpannerBackfill | ||
import app.cash.backfila.client.spi.BackfillOperator | ||
import app.cash.backfila.client.spi.BackfilaParametersOperator | ||
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest | ||
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse | ||
import app.cash.backfila.protos.clientservice.PrepareBackfillRequest | ||
import app.cash.backfila.protos.clientservice.PrepareBackfillResponse | ||
import app.cash.backfila.protos.clientservice.RunBatchRequest | ||
import app.cash.backfila.protos.clientservice.RunBatchResponse | ||
import com.google.cloud.spanner.Key | ||
import com.google.cloud.spanner.KeyRange | ||
import com.google.cloud.spanner.KeySet | ||
import com.squareup.moshi.Moshi | ||
import misk.moshi.adapter | ||
import okio.ByteString | ||
import okio.ByteString.Companion.encodeUtf8 | ||
|
||
class SpannerBackfillOperator<Param : Any> internal constructor( | ||
override val backfill: SpannerBackfill<Param>, | ||
private val parametersOperator: BackfilaParametersOperator<Param>, | ||
backend: SpannerBackend, | ||
) : BackfillOperator { | ||
private var moshi: Moshi = backend.moshi | ||
private val adapter = moshi.adapter<List<String>>() | ||
|
||
override fun name() = backfill.javaClass.toString() | ||
|
||
override fun prepareBackfill(request: PrepareBackfillRequest): PrepareBackfillResponse { | ||
val config = | ||
parametersOperator.constructBackfillConfig(request.parameters, request.dry_run) | ||
backfill.validate(config) | ||
|
||
val partitions = listOf( | ||
PrepareBackfillResponse.Partition.Builder() | ||
.backfill_range(request.range) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are you requiring range to be passed in? In other implementations we compute the ranges if you don't pass it in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The range is actually completely ignored. Spanner is unlike many other DBs, where for optimal performance primary keys really can't be in anything like a monotonic increasing range. I don't know how to compute a range without doing a full table scan, which seems... suboptimal. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can't ask for min/max primary key value? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Primary keys are often random values like UUIDs and unordered for optimal performance. Min/max aren't valid concepts, as far as I can tell. Source: https://cloud.google.com/spanner/docs/schema-design#primary-key-prevent-hotspots There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Backfila requires ordered key values to operate. I'm curious how you would use it if that's not the case. I haven't used spanner but my understanding was its ordered, you just want to avoid sequential writes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And to answer the original question - we don’t require a range to be passed in. That’s optional. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I'm well aware how primary key design works in spanner, and you can have items added within the range. That's true even in auto increment, technically. It doesn't matter since the expectation is you are inserting new items that don't need backfilling. It sounds like you are able to just ask spanner for records and it will give in some order, that should be fine I guess There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't this work like dynamo backfills? Dynamo is somewhat different, but has a scan mechanism we use, and I believe we don't do ranges on it either? You could check that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this mean that you will essentially run your backfill single threaded? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So there must be some distributed way to process the whole data set in bulk? In Dynamo it is this idea of segments. |
||
.partition_name("partition") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer something like |
||
.build() | ||
) | ||
|
||
return PrepareBackfillResponse.Builder() | ||
.partitions(partitions) | ||
.build() | ||
} | ||
|
||
override fun getNextBatchRange(request: GetNextBatchRangeRequest): GetNextBatchRangeResponse { | ||
// Establish a range to scan - either we want to start at the first key, | ||
// or start from (and exclude) the last key that was scanned. | ||
val range = if (request.previous_end_key == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess we're not using the backfill_range at all, that's what would be passed in by the user (or I missed it somewhere) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. If I'm not mistaken, the DynamoDB backend also ignores it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. DynamoDb is pretty limited because of dynamo itself, the hibernate one is pretty good to copy from. Obviously, build whatever features you want, I won't be using it :P There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You need some guarantees around the end key otherwise you may be missing items, no? This was tricky with DynamoDb as well. We figured out some optimizations but since they weren't really documented we didn't add those to the client. In Dynamo we split up by segment but then don't complete the "batch" until the range is completed. Maybe Google has some better guarantees? |
||
KeySet.all() | ||
} else { | ||
val previousEndKey = adapter.fromJson(request.previous_end_key.utf8())!! | ||
KeySet.range( | ||
KeyRange.openClosed( | ||
Key.of(*previousEndKey.toTypedArray()), | ||
Key.of(), | ||
) | ||
) | ||
} | ||
|
||
// Query the table with the desired range, only fetching the components of the primary key. | ||
val query = backfill.dbClient.singleUseReadOnlyTransaction() | ||
.read(backfill.tableName, range, backfill.primaryKeyColumns) | ||
jdm-square marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
val keys = mutableListOf<ByteString>() | ||
|
||
// For each result, until we reach the maximum scan size, create a key representation that | ||
// can be used to uniquely identify a result row. | ||
var numberToScan = request.scan_size | ||
while (numberToScan > 0 && query.next()) { | ||
val newKey = adapter.toJson( | ||
backfill.primaryKeyColumns.map { query.getString(it) } | ||
).encodeUtf8() | ||
keys.add(newKey) | ||
numberToScan -= 1 | ||
} | ||
query.close() | ||
|
||
// Return the starting and ending keys obtained from the scan. | ||
val batches = keys.chunked(request.batch_size.toInt()).map { | ||
GetNextBatchRangeResponse.Batch.Builder() | ||
.batch_range( | ||
app.cash.backfila.protos.clientservice.KeyRange.Builder() | ||
.start(it.first()) | ||
.end(it.last()) | ||
.build() | ||
) | ||
.scanned_record_count(it.size.toLong()) | ||
.matching_record_count(it.size.toLong()) | ||
.build() | ||
} | ||
|
||
return GetNextBatchRangeResponse.Builder() | ||
.batches(batches) | ||
.build() | ||
} | ||
|
||
override fun runBatch(request: RunBatchRequest): RunBatchResponse { | ||
val config = | ||
parametersOperator.constructBackfillConfig(request.parameters, request.dry_run) | ||
|
||
// Create a range that encompasses the batch's starting and ending keys. | ||
val startKey = adapter.fromJson(request.batch_range.start.utf8())!!.toTypedArray() | ||
val endKey = adapter.fromJson(request.batch_range.end.utf8())!!.toTypedArray() | ||
val keyRange = KeyRange.closedClosed( | ||
Key.of(*startKey), | ||
Key.of(*endKey), | ||
) | ||
|
||
// Let the backfill do whatever it wants for the given batch. | ||
backfill.runBatch(keyRange, config) | ||
|
||
return RunBatchResponse.Builder() | ||
.build() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package app.cash.backfila.client.misk.spanner | ||
|
||
import app.cash.backfila.client.BackfilaHttpClientConfig | ||
import app.cash.backfila.client.misk.MiskBackfillModule | ||
import misk.inject.KAbstractModule | ||
|
||
class BackfillsModule : KAbstractModule() { | ||
override fun configure() { | ||
install( | ||
MiskBackfillModule( | ||
BackfilaHttpClientConfig( | ||
url = "test.url", slack_channel = "#test" | ||
), | ||
dependsOn = listOf() | ||
) | ||
) | ||
install(SpannerBackfillModule.create<SpannerBackfillTest.MakeTracksExplicitBackfill>()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we limit our use of misk at least in non-test? Do we really need it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking through your code I think these only need to be testImplementation dependencies. Let's move those dependencies to test, rename the module, and add a comment so they don't leak to the main implementation.