diff --git a/buildSrc/src/main/kotlin/Dependencies.kt b/buildSrc/src/main/kotlin/Dependencies.kt index 9ab518e33..d0b95f884 100644 --- a/buildSrc/src/main/kotlin/Dependencies.kt +++ b/buildSrc/src/main/kotlin/Dependencies.kt @@ -1,6 +1,6 @@ object Dependencies { val apacheCommonsLang3 = "org.apache.commons:commons-lang3:3.11" - val assertj = "org.assertj:assertj-core:3.16.1" + val assertj = "org.assertj:assertj-core:3.20.2" val awsDynamodb = "com.amazonaws:aws-java-sdk-dynamodb:1.11.774" val flywayGradleBuildscriptDep = "gradle.plugin.com.boxfuse.client:flyway-release:5.0.2" val guava = "com.google.guava:guava:30.0-jre" diff --git a/service/src/main/kotlin/app/cash/backfila/BackfillCreator.kt b/service/src/main/kotlin/app/cash/backfila/BackfillCreator.kt index 18134eaac..9dd4c0e23 100644 --- a/service/src/main/kotlin/app/cash/backfila/BackfillCreator.kt +++ b/service/src/main/kotlin/app/cash/backfila/BackfillCreator.kt @@ -6,6 +6,7 @@ import app.cash.backfila.protos.clientservice.PrepareBackfillRequest import app.cash.backfila.protos.clientservice.PrepareBackfillResponse import app.cash.backfila.protos.service.CreateBackfillRequest import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.service.persistence.BackfillPartitionState import app.cash.backfila.service.persistence.BackfillState import app.cash.backfila.service.persistence.DbBackfillRun import app.cash.backfila.service.persistence.DbRegisteredBackfill @@ -84,12 +85,13 @@ class BackfillCreator @Inject constructor( ) session.save(backfillRun) + check(backfillRun.state == BackfillState.PAUSED) for (partition in partitions) { val dbRunPartition = DbRunPartition( backfillRun.id, partition.partition_name, partition.backfill_range ?: KeyRange.Builder().build(), - backfillRun.state, + BackfillPartitionState.PAUSED, partition.estimated_record_count ) session.save(dbRunPartition) @@ -125,21 +127,7 @@ class BackfillCreator @Inject constructor( logger.info(e) { "PrepareBackfill on `$service` failed" } throw BadRequestException("PrepareBackfill on `$service` failed: ${e.message}", e) } - - val partitions = prepareBackfillResponse.partitions - if (partitions.isEmpty()) { - throw BadRequestException("PrepareBackfill returned no partitions") - } - if (partitions.any { it.partition_name == null }) { - throw BadRequestException("PrepareBackfill returned unnamed partitions") - } - if (partitions.distinctBy { it.partition_name }.size != partitions.size) { - throw BadRequestException( - "PrepareBackfill did not return distinct partition names:" + - " ${partitions.map { it.partition_name }}" - ) - } - + prepareBackfillResponse.validate() return prepareBackfillResponse } @@ -181,5 +169,21 @@ class BackfillCreator @Inject constructor( companion object { private val logger = getLogger() + + fun PrepareBackfillResponse.validate() { + val partitions = this.partitions + if (partitions.isEmpty()) { + throw BadRequestException("PrepareBackfill returned no partitions") + } + if (partitions.any { it.partition_name == null }) { + throw BadRequestException("PrepareBackfill returned unnamed partitions") + } + if (partitions.distinctBy { it.partition_name }.size != partitions.size) { + throw BadRequestException( + "PrepareBackfill did not return distinct partition names:" + + " ${partitions.map { it.partition_name }}" + ) + } + } } } diff --git a/service/src/main/kotlin/app/cash/backfila/dashboard/BackfillStateToggler.kt b/service/src/main/kotlin/app/cash/backfila/dashboard/BackfillStateToggler.kt index a6b510fbf..34fb98360 100644 --- a/service/src/main/kotlin/app/cash/backfila/dashboard/BackfillStateToggler.kt +++ b/service/src/main/kotlin/app/cash/backfila/dashboard/BackfillStateToggler.kt @@ -3,6 +3,7 @@ package app.cash.backfila.dashboard import app.cash.backfila.service.SlackHelper import app.cash.backfila.service.persistence.BackfilaDb import app.cash.backfila.service.persistence.BackfillState +import app.cash.backfila.service.persistence.BackfillState.CANCELLED import app.cash.backfila.service.persistence.BackfillState.PAUSED import app.cash.backfila.service.persistence.BackfillState.RUNNING import app.cash.backfila.service.persistence.DbBackfillRun @@ -25,7 +26,10 @@ class BackfillStateToggler @Inject constructor( val requiredCurrentState = when (desiredState) { PAUSED -> RUNNING RUNNING -> PAUSED - else -> throw IllegalArgumentException("can only toggle to RUNNING or PAUSED") + CANCELLED -> PAUSED + else -> throw IllegalArgumentException( + "can only toggle between RUNNING and PAUSED or cancel a PAUSED run. " + ) } transacter.transaction { session -> @@ -46,22 +50,27 @@ class BackfillStateToggler @Inject constructor( } run.setState(session, queryFactory, desiredState) - val startedOrStopped = if (desiredState == RUNNING) "started" else "stopped" + val action = when (desiredState) { + PAUSED -> "stopped" + RUNNING -> "started" + CANCELLED -> "cancelled" + else -> desiredState.name + } session.save( DbEventLog( run.id, partition_id = null, user = caller.principal, type = DbEventLog.Type.STATE_CHANGE, - message = "backfill $startedOrStopped" + message = "backfill $action" ) ) } - if (desiredState == RUNNING) { - slackHelper.runStarted(Id(id), caller.principal) - } else { - slackHelper.runPaused(Id(id), caller.principal) + when (desiredState) { + RUNNING -> slackHelper.runStarted(Id(id), caller.principal) + PAUSED -> slackHelper.runPaused(Id(id), caller.principal) + CANCELLED -> slackHelper.runCancelled(Id(id), caller.principal) } } diff --git a/service/src/main/kotlin/app/cash/backfila/dashboard/CancelBackfillAction.kt b/service/src/main/kotlin/app/cash/backfila/dashboard/CancelBackfillAction.kt new file mode 100644 index 000000000..ba46aac6c --- /dev/null +++ b/service/src/main/kotlin/app/cash/backfila/dashboard/CancelBackfillAction.kt @@ -0,0 +1,43 @@ +package app.cash.backfila.dashboard + +import app.cash.backfila.service.persistence.BackfillState +import javax.inject.Inject +import misk.MiskCaller +import misk.scope.ActionScoped +import misk.security.authz.Authenticated +import misk.web.PathParam +import misk.web.Post +import misk.web.RequestBody +import misk.web.RequestContentType +import misk.web.ResponseContentType +import misk.web.actions.WebAction +import misk.web.mediatype.MediaTypes +import wisp.logging.getLogger + +class CancelBackfillRequest +class CancelBackfillResponse + +class CancelBackfillAction @Inject constructor( + private val caller: @JvmSuppressWildcards ActionScoped, + private val backfillStateToggler: BackfillStateToggler +) : WebAction { + + @Post("/backfills/{id}/cancel") + @RequestContentType(MediaTypes.APPLICATION_JSON) + @ResponseContentType(MediaTypes.APPLICATION_JSON) + // TODO allow any user + @Authenticated(capabilities = ["users"]) + fun cancel( + @PathParam id: Long, + @RequestBody request: CancelBackfillRequest + ): CancelBackfillResponse { + // TODO check user has permissions for this service with access api + logger.info { "Canceling backfill $id by ${caller.get()?.user}" } + backfillStateToggler.toggleRunningState(id, caller.get()!!, BackfillState.CANCELLED) + return CancelBackfillResponse() + } + + companion object { + private val logger = getLogger() + } +} diff --git a/service/src/main/kotlin/app/cash/backfila/dashboard/CloneBackfillAction.kt b/service/src/main/kotlin/app/cash/backfila/dashboard/CloneBackfillAction.kt index eef6e288f..eeaac4d69 100644 --- a/service/src/main/kotlin/app/cash/backfila/dashboard/CloneBackfillAction.kt +++ b/service/src/main/kotlin/app/cash/backfila/dashboard/CloneBackfillAction.kt @@ -1,11 +1,13 @@ package app.cash.backfila.dashboard +import app.cash.backfila.BackfillCreator.Companion.validate import app.cash.backfila.client.ConnectorProvider import app.cash.backfila.protos.clientservice.KeyRange import app.cash.backfila.protos.clientservice.PrepareBackfillRequest import app.cash.backfila.protos.clientservice.PrepareBackfillResponse import app.cash.backfila.service.persistence.BackfilaDb import app.cash.backfila.service.persistence.BackfillState +import app.cash.backfila.service.persistence.BackfillState.Companion.getPartitionState import app.cash.backfila.service.persistence.DbBackfillRun import app.cash.backfila.service.persistence.DbRegisteredBackfill import app.cash.backfila.service.persistence.DbRunPartition @@ -138,13 +140,14 @@ class CloneBackfillAction @Inject constructor( ) session.save(backfillRun) + check(backfillRun.state == BackfillState.PAUSED) if (request.range_clone_type == RangeCloneType.NEW) { for (partition in partitions) { val dbRunPartition = DbRunPartition( backfillRun.id, partition.partition_name, partition.backfill_range ?: KeyRange.Builder().build(), - backfillRun.state, + backfillRun.state.getPartitionState(), partition.estimated_record_count ) session.save(dbRunPartition) @@ -161,12 +164,13 @@ class CloneBackfillAction @Inject constructor( ) } + check(backfillRun.state == BackfillState.PAUSED) for (sourcePartition in sourcePartitions) { val dbRunPartition = DbRunPartition( backfillRun.id, sourcePartition.partition_name, sourcePartition.backfillRange(), - backfillRun.state, + backfillRun.state.getPartitionState(), sourcePartition.estimated_record_count ) // Copy the cursor if continuing, otherwise just leave blank to start from beginning. @@ -203,19 +207,7 @@ class CloneBackfillAction @Inject constructor( logger.info(e) { "PrepareBackfill on `${dbData.serviceName}` failed" } throw BadRequestException("PrepareBackfill on `${dbData.serviceName}` failed: ${e.message}", e) } - val partitions = prepareBackfillResponse.partitions - if (partitions.isEmpty()) { - throw BadRequestException("PrepareBackfill returned no partitions") - } - if (partitions.any { it.partition_name == null }) { - throw BadRequestException("PrepareBackfill returned unnamed partitions") - } - if (partitions.distinctBy { it.partition_name }.size != partitions.size) { - throw BadRequestException( - "PrepareBackfill did not return distinct partition names:" + - " ${partitions.map { it.partition_name }}" - ) - } + prepareBackfillResponse.validate() return prepareBackfillResponse } diff --git a/service/src/main/kotlin/app/cash/backfila/dashboard/GetBackfillStatusAction.kt b/service/src/main/kotlin/app/cash/backfila/dashboard/GetBackfillStatusAction.kt index 20855ff2f..a672900d2 100644 --- a/service/src/main/kotlin/app/cash/backfila/dashboard/GetBackfillStatusAction.kt +++ b/service/src/main/kotlin/app/cash/backfila/dashboard/GetBackfillStatusAction.kt @@ -1,6 +1,7 @@ package app.cash.backfila.dashboard import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.service.persistence.BackfillPartitionState import app.cash.backfila.service.persistence.BackfillState import app.cash.backfila.service.persistence.DbBackfillRun import app.cash.backfila.service.persistence.DbEventLog @@ -26,7 +27,7 @@ import misk.web.mediatype.MediaTypes data class UiPartition( val id: Long, val name: String, - val state: BackfillState, + val state: BackfillPartitionState, val pkey_cursor: String?, val pkey_start: String?, val pkey_end: String?, @@ -105,7 +106,7 @@ class GetBackfillStatusAction @Inject constructor( UiPartition( partition.id.id, partition.partition_name, - partition.run_state, + partition.partition_state, partition.pkey_cursor?.utf8(), partition.pkey_range_start?.utf8(), partition.pkey_range_end?.utf8(), diff --git a/service/src/main/kotlin/app/cash/backfila/service/SlackHelper.kt b/service/src/main/kotlin/app/cash/backfila/service/SlackHelper.kt index 1aa90965a..0fc8c41ca 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/SlackHelper.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/SlackHelper.kt @@ -33,6 +33,15 @@ class SlackHelper @Inject constructor( slackClient.postMessage("Backfila", ":backfila:", message, channel) } + fun runCancelled(id: Id, user: String) { + val (message, channel) = transacter.transaction { session -> + val run = session.load(id) + val message = ":backfila_cancel:${dryRunEmoji(run)} ${nameAndId(run)} cancelled by @$user" + message to run.service.slack_channel + } + slackClient.postMessage("Backfila", ":backfila:", message, channel) + } + fun runErrored(id: Id) { val (message, channel) = transacter.transaction { session -> val run = session.load(id) diff --git a/service/src/main/kotlin/app/cash/backfila/service/persistence/BackfillPartitionState.kt b/service/src/main/kotlin/app/cash/backfila/service/persistence/BackfillPartitionState.kt new file mode 100644 index 000000000..768e6d780 --- /dev/null +++ b/service/src/main/kotlin/app/cash/backfila/service/persistence/BackfillPartitionState.kt @@ -0,0 +1,13 @@ +package app.cash.backfila.service.persistence + +enum class BackfillPartitionState { + PAUSED, // A resumable backfill partition that is not currently meant to be running + RUNNING, // A backfill partition that is allowed to run. + COMPLETE, // A completed partition, this is a final non-resumable state. + STALE, // A partition that is no longer relevant, this is a final non-resumable state (possibly a split partition) + CANCELLED; // A partition that has been manually cancelled, this is a final non-resumable state. + + companion object { + val FINAL_STATES = setOf(STALE, CANCELLED, COMPLETE) + } +} diff --git a/service/src/main/kotlin/app/cash/backfila/service/persistence/BackfillState.kt b/service/src/main/kotlin/app/cash/backfila/service/persistence/BackfillState.kt index 369c7cf6f..33a38202e 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/persistence/BackfillState.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/persistence/BackfillState.kt @@ -1,7 +1,22 @@ package app.cash.backfila.service.persistence enum class BackfillState { - PAUSED, - RUNNING, - COMPLETE + PAUSED, // A resumable backfill that is not currently meant to be running + RUNNING, // A backfill that is allowed to run. + COMPLETE, // A completed backfill, this is a final non-resumable state. + CANCELLED; // A backfill that has been manually cancelled, this is a final non-resumable state. + + companion object { + val FINAL_STATES = setOf(CANCELLED, COMPLETE) + + /** + * When the Backfill state changes modify the underlying partitions to these corresponding states. + */ + fun BackfillState.getPartitionState() = when (this) { + PAUSED -> BackfillPartitionState.PAUSED + RUNNING -> BackfillPartitionState.RUNNING + CANCELLED -> BackfillPartitionState.CANCELLED + COMPLETE -> BackfillPartitionState.COMPLETE + } + } } diff --git a/service/src/main/kotlin/app/cash/backfila/service/persistence/DbBackfillRun.kt b/service/src/main/kotlin/app/cash/backfila/service/persistence/DbBackfillRun.kt index 6734d275d..03ca52a60 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/persistence/DbBackfillRun.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/persistence/DbBackfillRun.kt @@ -1,5 +1,6 @@ package app.cash.backfila.service.persistence +import app.cash.backfila.service.persistence.BackfillState.Companion.getPartitionState import com.google.common.base.Preconditions.checkState import java.time.Instant import javax.persistence.Column @@ -130,18 +131,18 @@ class DbBackfillRun() : DbUnsharded, DbTimestampedEntity { .list(session) fun setState(session: Session, queryFactory: Query.Factory, state: BackfillState) { - // State can't be changed after being completed. - checkState(this.state != BackfillState.COMPLETE) + // Backfills in final states cannot change. + checkState(!BackfillState.FINAL_STATES.contains(this.state)) this.state = state // Set the state of all the partitions that are not complete val query = session.hibernateSession.createQuery( "update DbRunPartition " + - "set run_state = :newState, version = version + 1 " + - "where backfill_run_id = :runId and run_state <> :completed" + "set partition_state = :newPartitionState, version = version + 1 " + + "where backfill_run_id = :runId and partition_state not in ( :finalPartitionStates )" ) query.setParameter("runId", id) - query.setParameter("newState", state) - query.setParameter("completed", BackfillState.COMPLETE) + query.setParameter("newPartitionState", state.getPartitionState()) + query.setParameterList("finalPartitionStates", BackfillPartitionState.FINAL_STATES) query.executeUpdate() } diff --git a/service/src/main/kotlin/app/cash/backfila/service/persistence/DbRunPartition.kt b/service/src/main/kotlin/app/cash/backfila/service/persistence/DbRunPartition.kt index e183d2ad5..ce0a88e78 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/persistence/DbRunPartition.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/persistence/DbRunPartition.kt @@ -55,7 +55,7 @@ class DbRunPartition() : DbUnsharded, DbTimestampedEntity { */ @Column(nullable = false) @Enumerated(EnumType.STRING) - lateinit var run_state: BackfillState + lateinit var partition_state: BackfillPartitionState @Column var lease_token: String? = null @@ -120,14 +120,14 @@ class DbRunPartition() : DbUnsharded, DbTimestampedEntity { backfill_run_id: Id, partition_name: String, backfill_range: KeyRange, - run_state: BackfillState, + run_state: BackfillPartitionState, estimated_record_count: Long? ) : this() { this.backfill_run_id = backfill_run_id this.partition_name = partition_name this.pkey_range_start = backfill_range.start this.pkey_range_end = backfill_range.end - this.run_state = run_state + this.partition_state = run_state this.lease_expires_at = Instant.ofEpochSecond(1L) this.estimated_record_count = estimated_record_count } diff --git a/service/src/main/kotlin/app/cash/backfila/service/persistence/RunPartitionQuery.kt b/service/src/main/kotlin/app/cash/backfila/service/persistence/RunPartitionQuery.kt index 29c14fb6a..a93699616 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/persistence/RunPartitionQuery.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/persistence/RunPartitionQuery.kt @@ -14,8 +14,8 @@ interface RunPartitionQuery : Query { @Constraint("backfill_run_id", Operator.IN) fun backfillRunIdIn(backfillRunIds: Collection>): RunPartitionQuery - @Constraint("run_state") - fun runState(runState: BackfillState): RunPartitionQuery + @Constraint("partition_state") + fun partitionState(runState: BackfillPartitionState): RunPartitionQuery @Constraint("lease_expires_at", Operator.LT) fun leaseExpiresAtBefore(time: Instant): RunPartitionQuery diff --git a/service/src/main/kotlin/app/cash/backfila/service/runner/BackfillRunner.kt b/service/src/main/kotlin/app/cash/backfila/service/runner/BackfillRunner.kt index e99b4b705..0df5e29df 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/runner/BackfillRunner.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/runner/BackfillRunner.kt @@ -9,6 +9,7 @@ import app.cash.backfila.protos.clientservice.RunBatchResponse import app.cash.backfila.service.BackfilaMetrics import app.cash.backfila.service.SlackHelper import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.service.persistence.BackfillPartitionState import app.cash.backfila.service.persistence.BackfillState import app.cash.backfila.service.persistence.DbBackfillRun import app.cash.backfila.service.persistence.DbEventLog @@ -169,7 +170,7 @@ class BackfillRunner private constructor( batchAwaiter.updateProgress(dbRunPartition) // Now that state is stored, check if we should exit. - if (dbRunPartition.run_state != BackfillState.RUNNING) { + if (dbRunPartition.partition_state != BackfillPartitionState.RUNNING) { logger.info { "Backfill is no longer in RUNNING state, stopping runner ${logLabel()}" } running = false return@transaction diff --git a/service/src/main/kotlin/app/cash/backfila/service/runner/statemachine/BatchAwaiter.kt b/service/src/main/kotlin/app/cash/backfila/service/runner/statemachine/BatchAwaiter.kt index f6fe4c698..f52c4a71d 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/runner/statemachine/BatchAwaiter.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/runner/statemachine/BatchAwaiter.kt @@ -2,7 +2,7 @@ package app.cash.backfila.service.runner.statemachine import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse import app.cash.backfila.protos.clientservice.RunBatchResponse -import app.cash.backfila.service.persistence.BackfillState +import app.cash.backfila.service.persistence.BackfillPartitionState import app.cash.backfila.service.persistence.DbEventLog import app.cash.backfila.service.persistence.DbRunPartition import app.cash.backfila.service.runner.BackfillRunner @@ -171,7 +171,7 @@ class BatchAwaiter( private fun completePartition() { val runComplete = backfillRunner.factory.transacter.transaction { session -> val dbRunPartition = session.load(backfillRunner.partitionId) - dbRunPartition.run_state = BackfillState.COMPLETE + dbRunPartition.partition_state = BackfillPartitionState.COMPLETE updateProgress(dbRunPartition) session.save( @@ -190,7 +190,7 @@ class BatchAwaiter( session, backfillRunner.factory.queryFactory ) - if (partitions.all { it.run_state == BackfillState.COMPLETE }) { + if (partitions.all { it.partition_state == BackfillPartitionState.COMPLETE }) { dbRunPartition.backfill_run.complete() logger.info { "Backfill ${backfillRunner.backfillName} completed" } diff --git a/service/src/main/kotlin/app/cash/backfila/service/scheduler/LeaseHunter.kt b/service/src/main/kotlin/app/cash/backfila/service/scheduler/LeaseHunter.kt index 860836b78..313262b35 100644 --- a/service/src/main/kotlin/app/cash/backfila/service/scheduler/LeaseHunter.kt +++ b/service/src/main/kotlin/app/cash/backfila/service/scheduler/LeaseHunter.kt @@ -1,7 +1,7 @@ package app.cash.backfila.service.scheduler import app.cash.backfila.service.persistence.BackfilaDb -import app.cash.backfila.service.persistence.BackfillState +import app.cash.backfila.service.persistence.BackfillPartitionState import app.cash.backfila.service.persistence.RunPartitionQuery import app.cash.backfila.service.runner.BackfillRunner import java.time.Clock @@ -26,7 +26,7 @@ class LeaseHunter @Inject constructor( // will win the lease. return transacter.transaction { session -> val unleasedPartitions = queryFactory.newQuery() - .runState(BackfillState.RUNNING) + .partitionState(BackfillPartitionState.RUNNING) .leaseExpiresAtBefore(clock.instant()) .list(session) diff --git a/service/src/main/resources/migrations/v14__backfila.sql b/service/src/main/resources/migrations/v14__backfila.sql new file mode 100644 index 000000000..991339453 --- /dev/null +++ b/service/src/main/resources/migrations/v14__backfila.sql @@ -0,0 +1,4 @@ +ALTER TABLE run_partitions + CHANGE `run_state` `partition_state` enum('PAUSED','RUNNING','COMPLETE','STALE','CANCELLED') NOT NULL, + DROP KEY idx_run_state_lease_expires_at, + ADD KEY `idx_partition_state_lease_expires_at` (`partition_state`,`lease_expires_at`); \ No newline at end of file diff --git a/service/src/main/resources/migrations/v15__backfila.sql b/service/src/main/resources/migrations/v15__backfila.sql new file mode 100644 index 000000000..bc0e3be3e --- /dev/null +++ b/service/src/main/resources/migrations/v15__backfila.sql @@ -0,0 +1,2 @@ +ALTER TABLE backfill_runs + MODIFY COLUMN `state` enum('PAUSED','RUNNING','COMPLETE','CANCELLED') NOT NULL; \ No newline at end of file diff --git a/service/src/test/kotlin/app/cash/backfila/TestTools.kt b/service/src/test/kotlin/app/cash/backfila/TestTools.kt index ae3648c2c..7a2403751 100644 --- a/service/src/test/kotlin/app/cash/backfila/TestTools.kt +++ b/service/src/test/kotlin/app/cash/backfila/TestTools.kt @@ -1,8 +1,13 @@ package app.cash.backfila +import app.cash.backfila.dashboard.UiEventLog +import app.cash.backfila.service.persistence.DbEventLog import misk.MiskCaller import misk.inject.keyOf import misk.scope.ActionScope +import org.assertj.core.api.Assertions +import org.assertj.core.api.Condition +import org.assertj.core.api.SoftAssertions fun ActionScope.fakeCaller( service: String? = null, @@ -12,3 +17,24 @@ fun ActionScope.fakeCaller( return enter(mapOf(keyOf() to MiskCaller(service = service, user = user))) .use { function() } } + +// For useful soft assertions without forgetting to call assertAll() +fun softAssert(assertions: SoftAssertions.() -> Unit) { + with(SoftAssertions()) { + assertions() + assertAll() + } +} + +// Makes asserting on UiEventLog entries easy. +fun uiEventLogWith( + type: DbEventLog.Type, + user: String?, + message: String +): Condition { + return Assertions.allOf( + Condition({ it.type == type }, "uiEventLog.type"), + Condition({ it.user == user }, "uiEventLog.user"), + Condition({ it.message == message }, "uiEventLog.message") + ) +} diff --git a/service/src/test/kotlin/app/cash/backfila/actions/CancelBackfillActionTest.kt b/service/src/test/kotlin/app/cash/backfila/actions/CancelBackfillActionTest.kt new file mode 100644 index 000000000..6352f17f2 --- /dev/null +++ b/service/src/test/kotlin/app/cash/backfila/actions/CancelBackfillActionTest.kt @@ -0,0 +1,182 @@ +package app.cash.backfila.actions + +import app.cash.backfila.BackfilaTestingModule +import app.cash.backfila.api.ConfigureServiceAction +import app.cash.backfila.client.Connectors +import app.cash.backfila.client.FakeBackfilaClientServiceClient +import app.cash.backfila.dashboard.CancelBackfillAction +import app.cash.backfila.dashboard.CancelBackfillRequest +import app.cash.backfila.dashboard.CreateBackfillAction +import app.cash.backfila.dashboard.GetBackfillStatusAction +import app.cash.backfila.dashboard.StartBackfillAction +import app.cash.backfila.dashboard.StartBackfillRequest +import app.cash.backfila.dashboard.StopBackfillAction +import app.cash.backfila.fakeCaller +import app.cash.backfila.protos.service.ConfigureServiceRequest +import app.cash.backfila.protos.service.CreateBackfillRequest +import app.cash.backfila.protos.service.CreateBackfillResponse +import app.cash.backfila.protos.service.Parameter +import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.service.persistence.BackfillPartitionState +import app.cash.backfila.service.persistence.BackfillRunQuery +import app.cash.backfila.service.persistence.BackfillState +import app.cash.backfila.service.persistence.RunPartitionQuery +import app.cash.backfila.service.scheduler.LeaseHunter +import com.google.inject.Module +import misk.exceptions.BadRequestException +import javax.inject.Inject +import misk.hibernate.Query +import misk.hibernate.Transacter +import misk.hibernate.newQuery +import misk.scope.ActionScope +import misk.testing.MiskTest +import misk.testing.MiskTestModule +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import kotlin.test.assertNotNull + +@MiskTest(startService = true) +class CancelBackfillActionTest { + @Suppress("unused") + @MiskTestModule val module: Module = BackfilaTestingModule() + + @Inject lateinit var configureServiceAction: ConfigureServiceAction + @Inject lateinit var createBackfillAction: CreateBackfillAction + @Inject lateinit var startBackfillAction: StartBackfillAction + @Inject lateinit var stopBackfillAction: StopBackfillAction + @Inject lateinit var getBackfillStatusAction: GetBackfillStatusAction + @Inject lateinit var cancelBackfillAction: CancelBackfillAction + + @Inject lateinit var scope: ActionScope + @Inject lateinit var leaseHunter: LeaseHunter + + @Inject @BackfilaDb lateinit var transacter: Transacter + @Inject lateinit var queryFactory: Query.Factory + + @Inject lateinit var fakeBackfilaClientServiceClient: FakeBackfilaClientServiceClient + + @BeforeEach + fun setup() { + scope.fakeCaller(service = "deep-fryer") { + configureServiceAction.configureService( + ConfigureServiceRequest.Builder() + .backfills( + listOf( + ConfigureServiceRequest.BackfillData.Builder() + .name("ChickenSandwich") + .parameters( + listOf( + Parameter.Builder() + .name("param1") + .build() + ) + ) + .build() + ) + ) + .connector_type(Connectors.ENVOY) + .build() + ) + } + } + + @Test fun `cancel newly created backfill`() { + val createdBackfill = createBackfillRun(false) + + scope.fakeCaller(user = "molly") { + cancelBackfillAction.cancel( + createdBackfill.backfill_run_id, + CancelBackfillRequest() + ) + } + + transacter.transaction { session -> + val run = queryFactory.newQuery().uniqueResult(session) + assertNotNull(run) + assertThat(run.state).isEqualTo(BackfillState.CANCELLED) + assertThat(run.created_by_user).isEqualTo("molly") + assertThat(createdBackfill.backfill_run_id).isEqualTo(run.id.id) + + val partitions = queryFactory.newQuery() + .backfillRunId(run.id) + .orderByName() + .list(session) + assertThat(partitions).hasSize(2) + assertThat(partitions).allSatisfy { + assertThat(it.partition_state).isEqualTo(BackfillPartitionState.CANCELLED) + assertThat(it.lease_token).isNull() + } + } + } + + @Test fun `cannot cancel running backfill`() { + val createdBackfill = createBackfillRun(true) + + val exception = assertThrows { + scope.fakeCaller(user = "molly") { + cancelBackfillAction.cancel( + createdBackfill.backfill_run_id, + CancelBackfillRequest() + ) + } + } + assertThat(exception).hasMessageEndingWith("isn't PAUSED, can't move to state CANCELLED") + } + + @Test fun `cancel stopped in progress backfill`() { + val createdBackfill = createBackfillRun(true) + + val exception = assertThrows { + scope.fakeCaller(user = "molly") { + cancelBackfillAction.cancel( + createdBackfill.backfill_run_id, + CancelBackfillRequest() + ) + } + } + assertThat(exception).hasMessageEndingWith("isn't PAUSED, can't move to state CANCELLED") + } + + private fun createBackfillRun(startBackfill: Boolean): CreateBackfillResponse { + scope.fakeCaller(service = "deep-fryer") { + configureServiceAction.configureService( + ConfigureServiceRequest.Builder() + .backfills( + listOf( + ConfigureServiceRequest.BackfillData( + "ChickenSandwich", "Description", listOf(), null, + null, false + ) + ) + ) + .connector_type(Connectors.ENVOY) + .build() + ) + } + val createdBackfill = scope.fakeCaller(user = "molly") { + createBackfillAction.create( + "deep-fryer", + CreateBackfillRequest.Builder() + .backfill_name("ChickenSandwich") + .build() + ) + } + + if (startBackfill) { + scope.fakeCaller(user = "molly") { + startBackfillAction.start(createdBackfill.backfill_run_id, StartBackfillRequest()) + } + leaseHunter.hunt().single() + } + return createdBackfill + } +/* + BREAKING CHANGE + TO tell us to add more tests. + 1 - progress has been made but still running both get cancelled + 2 - double cancelling ? what does it do? + 3 - one partition is complete. It should stay complete. + 4 - Add a stale test that will turn into a split partition test later that shows that stale does not change either. */ +} diff --git a/service/src/test/kotlin/app/cash/backfila/actions/CreateBackfillActionTest.kt b/service/src/test/kotlin/app/cash/backfila/actions/CreateBackfillActionTest.kt index 4dc61d900..b18aefc9d 100644 --- a/service/src/test/kotlin/app/cash/backfila/actions/CreateBackfillActionTest.kt +++ b/service/src/test/kotlin/app/cash/backfila/actions/CreateBackfillActionTest.kt @@ -11,6 +11,7 @@ import app.cash.backfila.protos.clientservice.PrepareBackfillResponse import app.cash.backfila.protos.service.ConfigureServiceRequest import app.cash.backfila.protos.service.CreateBackfillRequest import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.service.persistence.BackfillPartitionState import app.cash.backfila.service.persistence.BackfillRunQuery import app.cash.backfila.service.persistence.BackfillState import app.cash.backfila.service.persistence.RunPartitionQuery @@ -132,10 +133,10 @@ class CreateBackfillActionTest { assertThat(partitions).hasSize(2) assertThat(partitions[0].partition_name).isEqualTo("-80") assertThat(partitions[0].lease_token).isNull() - assertThat(partitions[0].run_state).isEqualTo(BackfillState.PAUSED) + assertThat(partitions[0].partition_state).isEqualTo(BackfillPartitionState.PAUSED) assertThat(partitions[1].partition_name).isEqualTo("80-") assertThat(partitions[1].lease_token).isNull() - assertThat(partitions[1].run_state).isEqualTo(BackfillState.PAUSED) + assertThat(partitions[1].partition_state).isEqualTo(BackfillPartitionState.PAUSED) } } } diff --git a/service/src/test/kotlin/app/cash/backfila/actions/StartStopBackfillActionTest.kt b/service/src/test/kotlin/app/cash/backfila/actions/StartStopBackfillActionTest.kt index 42e0201ea..416b7dab7 100644 --- a/service/src/test/kotlin/app/cash/backfila/actions/StartStopBackfillActionTest.kt +++ b/service/src/test/kotlin/app/cash/backfila/actions/StartStopBackfillActionTest.kt @@ -3,6 +3,7 @@ package app.cash.backfila.actions import app.cash.backfila.BackfilaTestingModule import app.cash.backfila.api.ConfigureServiceAction import app.cash.backfila.client.Connectors +import app.cash.backfila.client.FakeBackfilaClientServiceClient import app.cash.backfila.dashboard.CreateBackfillAction import app.cash.backfila.dashboard.GetBackfillRunsAction import app.cash.backfila.dashboard.GetBackfillStatusAction @@ -14,11 +15,13 @@ import app.cash.backfila.fakeCaller import app.cash.backfila.protos.service.ConfigureServiceRequest import app.cash.backfila.protos.service.CreateBackfillRequest import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.service.persistence.BackfillPartitionState import app.cash.backfila.service.persistence.BackfillState import app.cash.backfila.service.persistence.DbBackfillRun +import app.cash.backfila.service.persistence.DbEventLog +import app.cash.backfila.softAssert +import app.cash.backfila.uiEventLogWith import com.google.inject.Module -import javax.inject.Inject -import kotlin.test.assertNotNull import misk.exceptions.BadRequestException import misk.hibernate.Id import misk.hibernate.Query @@ -27,128 +30,162 @@ import misk.hibernate.load import misk.scope.ActionScope import misk.testing.MiskTest import misk.testing.MiskTestModule +import org.assertj.core.api.Assertions import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test +import javax.inject.Inject +import kotlin.test.assertNotNull @MiskTest(startService = true) class StartStopBackfillActionTest { @Suppress("unused") - @MiskTestModule - val module: Module = BackfilaTestingModule() + @MiskTestModule val module: Module = BackfilaTestingModule() - @Inject - lateinit var configureServiceAction: ConfigureServiceAction + @Inject lateinit var configureServiceAction: ConfigureServiceAction - @Inject - lateinit var createBackfillAction: CreateBackfillAction + @Inject lateinit var createBackfillAction: CreateBackfillAction - @Inject - lateinit var startBackfillAction: StartBackfillAction + @Inject lateinit var startBackfillAction: StartBackfillAction - @Inject - lateinit var stopBackfillAction: StopBackfillAction + @Inject lateinit var stopBackfillAction: StopBackfillAction - @Inject - lateinit var getBackfillRunsAction: GetBackfillRunsAction + @Inject lateinit var getBackfillRunsAction: GetBackfillRunsAction - @Inject - lateinit var getBackfillStatusAction: GetBackfillStatusAction + @Inject lateinit var getBackfillStatusAction: GetBackfillStatusAction - @Inject - lateinit var queryFactory: Query.Factory + @Inject lateinit var queryFactory: Query.Factory - @Inject - lateinit var scope: ActionScope + @Inject lateinit var scope: ActionScope - @Inject - @BackfilaDb - lateinit var transacter: Transacter + @Inject @BackfilaDb lateinit var transacter: Transacter - @Test - fun startAndStop() { + @Inject lateinit var fakeBackfilaClientServiceClient: FakeBackfilaClientServiceClient + + @BeforeEach + fun setup() { + // Always configure the backfills before any test scope.fakeCaller(service = "deep-fryer") { configureServiceAction.configureService( ConfigureServiceRequest.Builder() .backfills( listOf( - ConfigureServiceRequest.BackfillData( - "ChickenSandwich", "Description", listOf(), null, - null, false - ) + ConfigureServiceRequest.BackfillData.Builder() + .name("ChickenSandwich") + .description("Description") + .build(), + ConfigureServiceRequest.BackfillData.Builder() + .name("BeefSandwich") + .description("Description") + .build() ) ) .connector_type(Connectors.ENVOY) .build() ) } + } + + @Test fun `Backfila starts with no backfills`() { scope.fakeCaller(user = "molly") { var backfillRuns = getBackfillRunsAction.backfillRuns("deep-fryer") - assertThat(backfillRuns.paused_backfills).hasSize(0) - assertThat(backfillRuns.running_backfills).hasSize(0) + softAssert { + assertThat(backfillRuns.paused_backfills).hasSize(0) + assertThat(backfillRuns.running_backfills).hasSize(0) + } + } + } - val response = createBackfillAction.create( - "deep-fryer", - CreateBackfillRequest.Builder() - .backfill_name("ChickenSandwich") - .build() - ) + @Nested inner class `After a backfill is created` { + var createdBackfillRunId: Long = 0 // lateinit won't work on primitives - backfillRuns = getBackfillRunsAction.backfillRuns("deep-fryer") - assertThat(backfillRuns.paused_backfills).hasSize(1) - assertThat(backfillRuns.running_backfills).hasSize(0) + @BeforeEach fun createBackfill() { + scope.fakeCaller(user = "molly") { + val response = createBackfillAction.create( + "deep-fryer", + CreateBackfillRequest.Builder() + .backfill_name("ChickenSandwich") + .build() + ) + createdBackfillRunId = response.backfill_run_id + } + } - val id = response.backfill_run_id - assertThat(backfillRuns.paused_backfills[0].id).isEqualTo(id.toString()) - startBackfillAction.start(id, StartBackfillRequest()) + @Test fun `The backfill starts out paused`() { + scope.fakeCaller(user = "molly") { + var backfillRuns = getBackfillRunsAction.backfillRuns("deep-fryer") + softAssert { + assertThat(backfillRuns.paused_backfills).hasSize(1) + assertThat(backfillRuns.running_backfills).hasSize(0) + assertThat(backfillRuns.paused_backfills).singleElement().extracting { it.id } + .isEqualTo(createdBackfillRunId.toString()) + } + } + } + + @Nested inner class `And then the backfill is started` { + @BeforeEach fun startBackfill() { + scope.fakeCaller(user = "molly") { + startBackfillAction.start(createdBackfillRunId, StartBackfillRequest()) + } + } + + @Test fun `the Backfill is running`() { + softAssert { + var status = getBackfillStatusAction.status(createdBackfillRunId) + assertThat(status.state).isEqualTo(BackfillState.RUNNING) + assertThat(status.partitions).extracting { it.state } + .containsOnly(BackfillPartitionState.RUNNING) + + assertThat(status.event_logs).first().`is`( + uiEventLogWith( + type = DbEventLog.Type.STATE_CHANGE, + user = "molly", + message = "backfill started" + ) + ) + + val backfillRuns = getBackfillRunsAction.backfillRuns("deep-fryer") + assertThat(backfillRuns.paused_backfills).hasSize(0) + assertThat(backfillRuns.running_backfills).hasSize(1) + } + } + + @Nested inner class `And then the backfill is stopped` { + @BeforeEach fun stopBackfill() { + scope.fakeCaller(user = "molly") { + stopBackfillAction.stop(createdBackfillRunId, StopBackfillRequest()) + } + } - var status = getBackfillStatusAction.status(id) - assertThat(status.state).isEqualTo(BackfillState.RUNNING) - assertThat(status.partitions.map { it.state }) - .containsOnly(BackfillState.RUNNING) - assertThat(status.event_logs[0].message).isEqualTo("backfill started") - assertThat(status.event_logs[0].user).isEqualTo("molly") - - backfillRuns = getBackfillRunsAction.backfillRuns("deep-fryer") - assertThat(backfillRuns.paused_backfills).hasSize(0) - assertThat(backfillRuns.running_backfills).hasSize(1) - - stopBackfillAction.stop(id, StopBackfillRequest()) - - status = getBackfillStatusAction.status(id) - assertThat(status.state).isEqualTo(BackfillState.PAUSED) - assertThat(status.partitions.map { it.state }) - .containsOnly(BackfillState.PAUSED) - assertThat(status.event_logs[0].message).isEqualTo("backfill stopped") - assertThat(status.event_logs[0].user).isEqualTo("molly") - - backfillRuns = getBackfillRunsAction.backfillRuns("deep-fryer") - assertThat(backfillRuns.paused_backfills).hasSize(1) - assertThat(backfillRuns.running_backfills).hasSize(0) + @Test fun `the Backfill is stopped`() { + softAssert { + val status = getBackfillStatusAction.status(createdBackfillRunId) + assertThat(status.state).describedAs("state").isEqualTo(BackfillState.PAUSED) + assertThat(status.partitions.map { it.state }) + .containsOnly(BackfillPartitionState.PAUSED) + + Assertions.assertThat(status.event_logs).first().`is`( + uiEventLogWith( + type = DbEventLog.Type.STATE_CHANGE, + user = "molly", + message = "backfill stopped" + ) + ) + + val backfillRuns = getBackfillRunsAction.backfillRuns("deep-fryer") + assertThat(backfillRuns.paused_backfills).hasSize(1) + assertThat(backfillRuns.running_backfills).hasSize(0) + } + } + } } } @Test fun pagination() { - scope.fakeCaller(service = "deep-fryer") { - configureServiceAction.configureService( - ConfigureServiceRequest.Builder() - .backfills( - listOf( - ConfigureServiceRequest.BackfillData.Builder() - .name("ChickenSandwich") - .description("Description") - .build(), - ConfigureServiceRequest.BackfillData.Builder() - .name("BeefSandwich") - .description("Description") - .build() - ) - ) - .connector_type(Connectors.ENVOY) - .build() - ) - } scope.fakeCaller(user = "molly") { repeat(15) { createBackfillAction.create( @@ -164,34 +201,19 @@ class StartStopBackfillActionTest { .build() ) } - val backfillRuns = getBackfillRunsAction.backfillRuns("deep-fryer") - assertThat(backfillRuns.paused_backfills).hasSize(20) + val backfillRunsPage1 = getBackfillRunsAction.backfillRuns("deep-fryer") + assertThat(backfillRunsPage1.paused_backfills).hasSize(20) val backfillRunsPage2 = getBackfillRunsAction.backfillRuns( "deep-fryer", - pagination_token = backfillRuns.next_pagination_token + pagination_token = backfillRunsPage1.next_pagination_token ) assertThat(backfillRunsPage2.paused_backfills).hasSize(10) } } @Test - fun backfillDoesntExist() { - scope.fakeCaller(service = "deep-fryer") { - configureServiceAction.configureService( - ConfigureServiceRequest.Builder() - .backfills( - listOf( - ConfigureServiceRequest.BackfillData( - "ChickenSandwich", "Description", listOf(), null, - null, false - ) - ) - ) - .connector_type(Connectors.ENVOY) - .build() - ) - } + fun `an incorrect backfill does not exist`() { scope.fakeCaller(user = "molly") { val response = createBackfillAction.create( "deep-fryer", @@ -199,31 +221,16 @@ class StartStopBackfillActionTest { .backfill_name("ChickenSandwich") .build() ) - val id = response.backfill_run_id + val incorrectId = response.backfill_run_id + 1 assertThatThrownBy { - startBackfillAction.start(id + 1, StartBackfillRequest()) + startBackfillAction.start(incorrectId, StartBackfillRequest()) }.isInstanceOf(BadRequestException::class.java) } } @Test fun cantStartRunningBackfill() { - scope.fakeCaller(service = "deep-fryer") { - configureServiceAction.configureService( - ConfigureServiceRequest.Builder() - .backfills( - listOf( - ConfigureServiceRequest.BackfillData( - "ChickenSandwich", "Description", listOf(), null, - null, false - ) - ) - ) - .connector_type(Connectors.ENVOY) - .build() - ) - } scope.fakeCaller(user = "molly") { val response = createBackfillAction.create( "deep-fryer", @@ -248,21 +255,6 @@ class StartStopBackfillActionTest { @Test fun cantStopPausedBackfill() { - scope.fakeCaller(service = "deep-fryer") { - configureServiceAction.configureService( - ConfigureServiceRequest.Builder() - .backfills( - listOf( - ConfigureServiceRequest.BackfillData( - "ChickenSandwich", "Description", listOf(), null, - null, false - ) - ) - ) - .connector_type(Connectors.ENVOY) - .build() - ) - } scope.fakeCaller(user = "molly") { val response = createBackfillAction.create( "deep-fryer", @@ -279,21 +271,6 @@ class StartStopBackfillActionTest { @Test fun cantToggleCompletedBackfill() { - scope.fakeCaller(service = "deep-fryer") { - configureServiceAction.configureService( - ConfigureServiceRequest.Builder() - .backfills( - listOf( - ConfigureServiceRequest.BackfillData( - "ChickenSandwich", "Description", listOf(), null, - null, false - ) - ) - ) - .connector_type(Connectors.ENVOY) - .build() - ) - } scope.fakeCaller(user = "molly") { val response = createBackfillAction.create( "deep-fryer", diff --git a/service/src/test/kotlin/app/cash/backfila/api/CreateAndStartBackfillActionTest.kt b/service/src/test/kotlin/app/cash/backfila/api/CreateAndStartBackfillActionTest.kt index 01169c096..5f43a50d3 100644 --- a/service/src/test/kotlin/app/cash/backfila/api/CreateAndStartBackfillActionTest.kt +++ b/service/src/test/kotlin/app/cash/backfila/api/CreateAndStartBackfillActionTest.kt @@ -8,6 +8,7 @@ import app.cash.backfila.protos.service.ConfigureServiceRequest import app.cash.backfila.protos.service.CreateAndStartBackfillRequest import app.cash.backfila.protos.service.CreateBackfillRequest import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.service.persistence.BackfillPartitionState import app.cash.backfila.service.persistence.BackfillState import com.google.inject.Module import javax.inject.Inject @@ -76,9 +77,9 @@ class CreateAndStartBackfillActionTest { assertThat(status.state).isEqualTo(BackfillState.RUNNING) assertThat(status.created_by_user).isEqualTo("deep-fryer") assertThat(status.partitions[0].name).isEqualTo("-80") - assertThat(status.partitions[0].state).isEqualTo(BackfillState.RUNNING) + assertThat(status.partitions[0].state).isEqualTo(BackfillPartitionState.RUNNING) assertThat(status.partitions[1].name).isEqualTo("80-") - assertThat(status.partitions[1].state).isEqualTo(BackfillState.RUNNING) + assertThat(status.partitions[1].state).isEqualTo(BackfillPartitionState.RUNNING) } } } diff --git a/service/src/test/kotlin/app/cash/backfila/client/FakeBackfilaClientServiceClient.kt b/service/src/test/kotlin/app/cash/backfila/client/FakeBackfilaClientServiceClient.kt index c3de99509..807f82dab 100644 --- a/service/src/test/kotlin/app/cash/backfila/client/FakeBackfilaClientServiceClient.kt +++ b/service/src/test/kotlin/app/cash/backfila/client/FakeBackfilaClientServiceClient.kt @@ -1,5 +1,8 @@ package app.cash.backfila.client +import app.cash.backfila.client.FakeBackfilaClientServiceClient.Companion.ResponseBehaviour.FAILURE +import app.cash.backfila.client.FakeBackfilaClientServiceClient.Companion.ResponseBehaviour.SUCCESS +import app.cash.backfila.client.FakeBackfilaClientServiceClient.Companion.ResponseBehaviour.TO_CHANNEL import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse import app.cash.backfila.protos.clientservice.KeyRange @@ -11,6 +14,7 @@ import java.util.LinkedList import javax.inject.Inject import javax.inject.Singleton import kotlinx.coroutines.channels.Channel +import misk.exceptions.BadRequestException import okio.ByteString.Companion.encodeUtf8 @Singleton @@ -25,12 +29,30 @@ class FakeBackfilaClientServiceClient @Inject constructor() : BackfilaClientServ /** Send responses or exceptions here to return them to the runner. */ val runBatchResponses = Channel>() + // Storing our response behavior for our two partitions and the default. This allows one partition + // to make progress ahead of the other. + val eightyDashBehaviour = PartitionBehaviour(TO_CHANNEL, TO_CHANNEL) + val dashEightyBehaviour = PartitionBehaviour(TO_CHANNEL, TO_CHANNEL) + val defaultBehaviour = PartitionBehaviour(TO_CHANNEL, TO_CHANNEL) + + private fun getPartitionBehaviour(partitionName: String): PartitionBehaviour { + return when (partitionName) { + DASH_EIGHTY -> dashEightyBehaviour + EIGHTY_DASH -> eightyDashBehaviour + else -> defaultBehaviour + } + } + fun dontBlockGetNextBatch() { - getNextBatchRangeRequests.close() + eightyDashBehaviour.nextBatch = SUCCESS + dashEightyBehaviour.nextBatch = SUCCESS + defaultBehaviour.nextBatch = SUCCESS } fun dontBlockRunBatch() { - runBatchRequests.close() + eightyDashBehaviour.runBatch = SUCCESS + dashEightyBehaviour.runBatch = SUCCESS + defaultBehaviour.runBatch = SUCCESS } override fun prepareBackfill(request: PrepareBackfillRequest): PrepareBackfillResponse { @@ -41,12 +63,12 @@ class FakeBackfilaClientServiceClient @Inject constructor() : BackfilaClientServ .partitions( listOf( PrepareBackfillResponse.Partition( - "-80", + DASH_EIGHTY, KeyRange("0".encodeUtf8(), "1000".encodeUtf8()), 1_000_000L ), PrepareBackfillResponse.Partition( - "80-", + EIGHTY_DASH, KeyRange("0".encodeUtf8(), "1000".encodeUtf8()), null ) @@ -57,38 +79,69 @@ class FakeBackfilaClientServiceClient @Inject constructor() : BackfilaClientServ override suspend fun getNextBatchRange(request: GetNextBatchRangeRequest): GetNextBatchRangeResponse { - if (!getNextBatchRangeRequests.isClosedForSend) { - getNextBatchRangeRequests.send(request) - return getNextBatchRangeResponses.receive().getOrThrow() + when (getPartitionBehaviour(request.partition_name).nextBatch) { + TO_CHANNEL -> { + getNextBatchRangeRequests.send(request) + return getNextBatchRangeResponses.receive().getOrThrow() + } + FAILURE -> { + throw BadRequestException("Forced Test GetNextBatchRange Failure") + } + SUCCESS -> { + val nextStart = if (request.previous_end_key != null) { + request.previous_end_key.utf8().toLong() + 1 + } else { + request.backfill_range.start.utf8().toLong() + } + var nextEnd = nextStart + request.batch_size - 1 + if (nextEnd > request.backfill_range.end.utf8().toLong()) { + nextEnd = request.backfill_range.end.utf8().toLong() + } + if (nextStart > request.backfill_range.end.utf8().toLong()) { + return GetNextBatchRangeResponse(listOf()) + } + return GetNextBatchRangeResponse( + listOf( + GetNextBatchRangeResponse.Batch( + KeyRange(nextStart.toString().encodeUtf8(), nextEnd.toString().encodeUtf8()), + nextEnd - nextStart + 1, + nextEnd - nextStart + 1 + ) + ) + ) + } } - val nextStart = if (request.previous_end_key != null) { - request.previous_end_key.utf8().toLong() + 1 - } else { - request.backfill_range.start.utf8().toLong() + } + + override suspend fun runBatch(request: RunBatchRequest): RunBatchResponse { + when (getPartitionBehaviour(request.partition_name).runBatch) { + TO_CHANNEL -> { + runBatchRequests.send(request) + return runBatchResponses.receive().getOrThrow() } - var nextEnd = nextStart + request.batch_size - 1 - if (nextEnd > request.backfill_range.end.utf8().toLong()) { - nextEnd = request.backfill_range.end.utf8().toLong() + FAILURE -> { + throw BadRequestException("Forced Test RunBatch Failure") } - if (nextStart > request.backfill_range.end.utf8().toLong()) { - return GetNextBatchRangeResponse(listOf()) + SUCCESS -> { + return RunBatchResponse.Builder().build() } - return GetNextBatchRangeResponse( - listOf( - GetNextBatchRangeResponse.Batch( - KeyRange(nextStart.toString().encodeUtf8(), nextEnd.toString().encodeUtf8()), - nextEnd - nextStart + 1, - nextEnd - nextStart + 1 - ) - ) - ) } + } - override suspend fun runBatch(request: RunBatchRequest): RunBatchResponse { - if (runBatchRequests.isClosedForSend) { - return RunBatchResponse.Builder().build() + companion object { + // Partition constants + val DASH_EIGHTY = "-80" + val EIGHTY_DASH = "80-" + + data class PartitionBehaviour( + var nextBatch: ResponseBehaviour, + var runBatch: ResponseBehaviour + ) + + enum class ResponseBehaviour { + TO_CHANNEL, + SUCCESS, + FAILURE } - runBatchRequests.send(request) - return runBatchResponses.receive().getOrThrow() } } diff --git a/service/src/test/kotlin/app/cash/backfila/service/BackfillRunnerTest.kt b/service/src/test/kotlin/app/cash/backfila/service/BackfillRunnerTest.kt index d18a65ab2..4c4a5c8ea 100644 --- a/service/src/test/kotlin/app/cash/backfila/service/BackfillRunnerTest.kt +++ b/service/src/test/kotlin/app/cash/backfila/service/BackfillRunnerTest.kt @@ -20,6 +20,7 @@ import app.cash.backfila.protos.service.ConfigureServiceRequest import app.cash.backfila.protos.service.CreateBackfillRequest import app.cash.backfila.protos.service.Parameter import app.cash.backfila.service.persistence.BackfilaDb +import app.cash.backfila.service.persistence.BackfillPartitionState import app.cash.backfila.service.persistence.BackfillState import app.cash.backfila.service.runner.BackfillRunner import app.cash.backfila.service.runner.EXTEND_LEASE_PERIOD @@ -66,7 +67,7 @@ class BackfillRunnerTest { transacter.transaction { session -> val partition = session.load(runner.partitionId) assertThat(partition.pkey_cursor).isNull() - assertThat(partition.run_state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.partition_state).isEqualTo(BackfillPartitionState.RUNNING) } runBlockingTestCancellable { @@ -76,7 +77,7 @@ class BackfillRunnerTest { var status = getBackfillStatusAction.status(runner.backfillRunId.id) var partition = status.partitions.find { it.id == runner.partitionId.id }!! assertThat(partition.pkey_cursor).isEqualTo("1000") - assertThat(partition.state).isEqualTo(BackfillState.COMPLETE) + assertThat(partition.state).isEqualTo(BackfillPartitionState.COMPLETE) // Not all partitions complete. assertThat(status.state).isEqualTo(BackfillState.RUNNING) @@ -84,7 +85,7 @@ class BackfillRunnerTest { partition = status.partitions.find { it.id == runner.partitionId.id }!! assertThat(partition.pkey_cursor).isNull() - assertThat(partition.state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.state).isEqualTo(BackfillPartitionState.RUNNING) runBlockingTestCancellable { runner.start(this) @@ -94,7 +95,7 @@ class BackfillRunnerTest { // All partitions complete. assertThat(status.state).isEqualTo(BackfillState.COMPLETE) partition = status.partitions.find { it.id == runner.partitionId.id }!! - assertThat(partition.state).isEqualTo(BackfillState.COMPLETE) + assertThat(partition.state).isEqualTo(BackfillPartitionState.COMPLETE) assertThat(partition.pkey_cursor).isEqualTo("1000") assertThat(partition.precomputing_pkey_cursor).isEqualTo("1000") assertThat(partition.precomputing_done).isEqualTo(true) @@ -186,7 +187,7 @@ class BackfillRunnerTest { transacter.transaction { session -> val partition = session.load(runner.partitionId) assertThat(partition.pkey_cursor).isEqualTo("99".encodeUtf8()) - assertThat(partition.run_state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.partition_state).isEqualTo(BackfillPartitionState.RUNNING) } } @@ -227,7 +228,7 @@ class BackfillRunnerTest { transacter.transaction { session -> val partition = session.load(runner.partitionId) assertThat(partition.pkey_cursor).isEqualTo("99".encodeUtf8()) - assertThat(partition.run_state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.partition_state).isEqualTo(BackfillPartitionState.RUNNING) } } @@ -298,7 +299,7 @@ class BackfillRunnerTest { transacter.transaction { session -> val partition = session.load(runner.partitionId) assertThat(partition.pkey_cursor).isEqualTo("99".encodeUtf8()) - assertThat(partition.run_state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.partition_state).isEqualTo(BackfillPartitionState.RUNNING) } // After the RunBatch completed, a getNextBatch request is buffered. @@ -341,7 +342,7 @@ class BackfillRunnerTest { transacter.transaction { session -> val partition = session.load(runner.partitionId) assertThat(partition.pkey_cursor).isNull() - assertThat(partition.run_state).isEqualTo(BackfillState.PAUSED) + assertThat(partition.partition_state).isEqualTo(BackfillPartitionState.PAUSED) } } @@ -374,7 +375,7 @@ class BackfillRunnerTest { transacter.transaction { session -> val partition = session.load(runner.partitionId) assertThat(partition.pkey_cursor).isNull() - assertThat(partition.run_state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.partition_state).isEqualTo(BackfillPartitionState.RUNNING) } fakeBackfilaClientServiceClient.runBatchResponses.send( @@ -386,7 +387,7 @@ class BackfillRunnerTest { transacter.transaction { session -> val partition = session.load(runner.partitionId) assertThat(partition.pkey_cursor).isEqualTo("199".encodeUtf8()) - assertThat(partition.run_state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.partition_state).isEqualTo(BackfillPartitionState.RUNNING) } } @@ -421,7 +422,7 @@ class BackfillRunnerTest { val partition = status.partitions.find { it.id == runner.partitionId.id }!! // Cursor not updated, backfill paused assertThat(partition.pkey_cursor).isNull() - assertThat(partition.state).isEqualTo(BackfillState.PAUSED) + assertThat(partition.state).isEqualTo(BackfillPartitionState.PAUSED) assertThat(status.event_logs[0].message).isEqualTo( "error running batch [0, 99], RPC error after 0ms. paused backfill due to 2 consecutive errors" @@ -463,7 +464,7 @@ class BackfillRunnerTest { transacter.transaction { session -> val partition = session.load(runner.partitionId) assertThat(partition.pkey_cursor).isNull() - assertThat(partition.run_state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.partition_state).isEqualTo(BackfillPartitionState.RUNNING) } fakeBackfilaClientServiceClient.runBatchResponses.send( @@ -476,7 +477,7 @@ class BackfillRunnerTest { val partition = status.partitions.find { it.id == runner.partitionId.id }!! // Cursor updated assertThat(partition.pkey_cursor).isEqualTo("199") - assertThat(partition.state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.state).isEqualTo(BackfillPartitionState.RUNNING) assertThat(status.event_logs[0].message).isEqualTo( "error running batch [0, 99], client exception after 0ms. backing off for 1000ms" @@ -511,7 +512,7 @@ class BackfillRunnerTest { transacter.transaction { session -> val partition = session.load(runner.partitionId) assertThat(partition.pkey_cursor).isEqualTo("99".encodeUtf8()) - assertThat(partition.run_state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.partition_state).isEqualTo(BackfillPartitionState.RUNNING) } } @@ -544,7 +545,7 @@ class BackfillRunnerTest { transacter.transaction { session -> val partition = session.load(runner.partitionId) assertThat(partition.pkey_cursor).isEqualTo("199".encodeUtf8()) - assertThat(partition.run_state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.partition_state).isEqualTo(BackfillPartitionState.RUNNING) } } @@ -574,7 +575,7 @@ class BackfillRunnerTest { transacter.transaction { session -> val partition = session.load(runner.partitionId) assertThat(partition.pkey_cursor).isEqualTo("199".encodeUtf8()) - assertThat(partition.run_state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.partition_state).isEqualTo(BackfillPartitionState.RUNNING) } } @@ -617,7 +618,7 @@ class BackfillRunnerTest { transacter.transaction { session -> val partition = session.load(runner.partitionId) assertThat(partition.pkey_cursor).isEqualTo("199".encodeUtf8()) - assertThat(partition.run_state).isEqualTo(BackfillState.RUNNING) + assertThat(partition.partition_state).isEqualTo(BackfillPartitionState.RUNNING) } }