Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a way to delete backfill runs #207

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/Dependencies.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
object Dependencies {
val apacheCommonsLang3 = "org.apache.commons:commons-lang3:3.11"
val assertj = "org.assertj:assertj-core:3.16.1"
val assertj = "org.assertj:assertj-core:3.20.2"
val awsDynamodb = "com.amazonaws:aws-java-sdk-dynamodb:1.11.774"
val flywayGradleBuildscriptDep = "gradle.plugin.com.boxfuse.client:flyway-release:5.0.2"
val guava = "com.google.guava:guava:30.0-jre"
Expand Down
36 changes: 20 additions & 16 deletions service/src/main/kotlin/app/cash/backfila/BackfillCreator.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import app.cash.backfila.protos.clientservice.PrepareBackfillRequest
import app.cash.backfila.protos.clientservice.PrepareBackfillResponse
import app.cash.backfila.protos.service.CreateBackfillRequest
import app.cash.backfila.service.persistence.BackfilaDb
import app.cash.backfila.service.persistence.BackfillPartitionState
import app.cash.backfila.service.persistence.BackfillState
import app.cash.backfila.service.persistence.DbBackfillRun
import app.cash.backfila.service.persistence.DbRegisteredBackfill
Expand Down Expand Up @@ -84,12 +85,13 @@ class BackfillCreator @Inject constructor(
)
session.save(backfillRun)

check(backfillRun.state == BackfillState.PAUSED)
for (partition in partitions) {
val dbRunPartition = DbRunPartition(
backfillRun.id,
partition.partition_name,
partition.backfill_range ?: KeyRange.Builder().build(),
backfillRun.state,
BackfillPartitionState.PAUSED,
partition.estimated_record_count
)
session.save(dbRunPartition)
Expand Down Expand Up @@ -125,21 +127,7 @@ class BackfillCreator @Inject constructor(
logger.info(e) { "PrepareBackfill on `$service` failed" }
throw BadRequestException("PrepareBackfill on `$service` failed: ${e.message}", e)
}

val partitions = prepareBackfillResponse.partitions
if (partitions.isEmpty()) {
throw BadRequestException("PrepareBackfill returned no partitions")
}
if (partitions.any { it.partition_name == null }) {
throw BadRequestException("PrepareBackfill returned unnamed partitions")
}
if (partitions.distinctBy { it.partition_name }.size != partitions.size) {
throw BadRequestException(
"PrepareBackfill did not return distinct partition names:" +
" ${partitions.map { it.partition_name }}"
)
}

prepareBackfillResponse.validate()
return prepareBackfillResponse
}

Expand Down Expand Up @@ -181,5 +169,21 @@ class BackfillCreator @Inject constructor(

companion object {
private val logger = getLogger<BackfillCreator>()

fun PrepareBackfillResponse.validate() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why extension instead of a method that takes it in?

val partitions = this.partitions
if (partitions.isEmpty()) {
throw BadRequestException("PrepareBackfill returned no partitions")
}
if (partitions.any { it.partition_name == null }) {
throw BadRequestException("PrepareBackfill returned unnamed partitions")
}
if (partitions.distinctBy { it.partition_name }.size != partitions.size) {
throw BadRequestException(
"PrepareBackfill did not return distinct partition names:" +
" ${partitions.map { it.partition_name }}"
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app.cash.backfila.dashboard
import app.cash.backfila.service.SlackHelper
import app.cash.backfila.service.persistence.BackfilaDb
import app.cash.backfila.service.persistence.BackfillState
import app.cash.backfila.service.persistence.BackfillState.CANCELLED
import app.cash.backfila.service.persistence.BackfillState.PAUSED
import app.cash.backfila.service.persistence.BackfillState.RUNNING
import app.cash.backfila.service.persistence.DbBackfillRun
Expand All @@ -25,7 +26,10 @@ class BackfillStateToggler @Inject constructor(
val requiredCurrentState = when (desiredState) {
PAUSED -> RUNNING
RUNNING -> PAUSED
else -> throw IllegalArgumentException("can only toggle to RUNNING or PAUSED")
CANCELLED -> PAUSED
else -> throw IllegalArgumentException(
"can only toggle between RUNNING and PAUSED or cancel a PAUSED run. "
)
}

transacter.transaction { session ->
Expand All @@ -46,22 +50,27 @@ class BackfillStateToggler @Inject constructor(
}
run.setState(session, queryFactory, desiredState)

val startedOrStopped = if (desiredState == RUNNING) "started" else "stopped"
val action = when (desiredState) {
PAUSED -> "stopped"
RUNNING -> "started"
CANCELLED -> "cancelled"
else -> desiredState.name
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not reachable right? throw exception instead?

}
session.save(
DbEventLog(
run.id,
partition_id = null,
user = caller.principal,
type = DbEventLog.Type.STATE_CHANGE,
message = "backfill $startedOrStopped"
message = "backfill $action"
)
)
}

if (desiredState == RUNNING) {
slackHelper.runStarted(Id(id), caller.principal)
} else {
slackHelper.runPaused(Id(id), caller.principal)
when (desiredState) {
RUNNING -> slackHelper.runStarted(Id(id), caller.principal)
PAUSED -> slackHelper.runPaused(Id(id), caller.principal)
CANCELLED -> slackHelper.runCancelled(Id(id), caller.principal)
Copy link
Collaborator

@shellderp shellderp Aug 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you really want to notify? I wouldn't think anyone cares

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package app.cash.backfila.dashboard

import app.cash.backfila.service.persistence.BackfillState
import javax.inject.Inject
import misk.MiskCaller
import misk.scope.ActionScoped
import misk.security.authz.Authenticated
import misk.web.PathParam
import misk.web.Post
import misk.web.RequestBody
import misk.web.RequestContentType
import misk.web.ResponseContentType
import misk.web.actions.WebAction
import misk.web.mediatype.MediaTypes
import wisp.logging.getLogger

class CancelBackfillRequest
class CancelBackfillResponse

class CancelBackfillAction @Inject constructor(
private val caller: @JvmSuppressWildcards ActionScoped<MiskCaller?>,
private val backfillStateToggler: BackfillStateToggler
) : WebAction {

@Post("/backfills/{id}/cancel")
@RequestContentType(MediaTypes.APPLICATION_JSON)
@ResponseContentType(MediaTypes.APPLICATION_JSON)
// TODO allow any user
@Authenticated(capabilities = ["users"])
fun cancel(
@PathParam id: Long,
@RequestBody request: CancelBackfillRequest
): CancelBackfillResponse {
// TODO check user has permissions for this service with access api
logger.info { "Canceling backfill $id by ${caller.get()?.user}" }
backfillStateToggler.toggleRunningState(id, caller.get()!!, BackfillState.CANCELLED)
return CancelBackfillResponse()
}

companion object {
private val logger = getLogger<CancelBackfillAction>()
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package app.cash.backfila.dashboard

import app.cash.backfila.BackfillCreator.Companion.validate
import app.cash.backfila.client.ConnectorProvider
import app.cash.backfila.protos.clientservice.KeyRange
import app.cash.backfila.protos.clientservice.PrepareBackfillRequest
import app.cash.backfila.protos.clientservice.PrepareBackfillResponse
import app.cash.backfila.service.persistence.BackfilaDb
import app.cash.backfila.service.persistence.BackfillState
import app.cash.backfila.service.persistence.BackfillState.Companion.getPartitionState
import app.cash.backfila.service.persistence.DbBackfillRun
import app.cash.backfila.service.persistence.DbRegisteredBackfill
import app.cash.backfila.service.persistence.DbRunPartition
Expand Down Expand Up @@ -138,13 +140,14 @@ class CloneBackfillAction @Inject constructor(
)
session.save(backfillRun)

check(backfillRun.state == BackfillState.PAUSED)
if (request.range_clone_type == RangeCloneType.NEW) {
for (partition in partitions) {
val dbRunPartition = DbRunPartition(
backfillRun.id,
partition.partition_name,
partition.backfill_range ?: KeyRange.Builder().build(),
backfillRun.state,
backfillRun.state.getPartitionState(),
partition.estimated_record_count
)
session.save(dbRunPartition)
Expand All @@ -161,12 +164,13 @@ class CloneBackfillAction @Inject constructor(
)
}

check(backfillRun.state == BackfillState.PAUSED)
for (sourcePartition in sourcePartitions) {
val dbRunPartition = DbRunPartition(
backfillRun.id,
sourcePartition.partition_name,
sourcePartition.backfillRange(),
backfillRun.state,
backfillRun.state.getPartitionState(),
sourcePartition.estimated_record_count
)
// Copy the cursor if continuing, otherwise just leave blank to start from beginning.
Expand Down Expand Up @@ -203,19 +207,7 @@ class CloneBackfillAction @Inject constructor(
logger.info(e) { "PrepareBackfill on `${dbData.serviceName}` failed" }
throw BadRequestException("PrepareBackfill on `${dbData.serviceName}` failed: ${e.message}", e)
}
val partitions = prepareBackfillResponse.partitions
if (partitions.isEmpty()) {
throw BadRequestException("PrepareBackfill returned no partitions")
}
if (partitions.any { it.partition_name == null }) {
throw BadRequestException("PrepareBackfill returned unnamed partitions")
}
if (partitions.distinctBy { it.partition_name }.size != partitions.size) {
throw BadRequestException(
"PrepareBackfill did not return distinct partition names:" +
" ${partitions.map { it.partition_name }}"
)
}
prepareBackfillResponse.validate()
return prepareBackfillResponse
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app.cash.backfila.dashboard

import app.cash.backfila.service.persistence.BackfilaDb
import app.cash.backfila.service.persistence.BackfillPartitionState
import app.cash.backfila.service.persistence.BackfillState
import app.cash.backfila.service.persistence.DbBackfillRun
import app.cash.backfila.service.persistence.DbEventLog
Expand All @@ -26,7 +27,7 @@ import misk.web.mediatype.MediaTypes
data class UiPartition(
val id: Long,
val name: String,
val state: BackfillState,
val state: BackfillPartitionState,
val pkey_cursor: String?,
val pkey_start: String?,
val pkey_end: String?,
Expand Down Expand Up @@ -105,7 +106,7 @@ class GetBackfillStatusAction @Inject constructor(
UiPartition(
partition.id.id,
partition.partition_name,
partition.run_state,
partition.partition_state,
partition.pkey_cursor?.utf8(),
partition.pkey_range_start?.utf8(),
partition.pkey_range_end?.utf8(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ class SlackHelper @Inject constructor(
slackClient.postMessage("Backfila", ":backfila:", message, channel)
}

fun runCancelled(id: Id<DbBackfillRun>, user: String) {
val (message, channel) = transacter.transaction { session ->
val run = session.load(id)
val message = ":backfila_cancel:${dryRunEmoji(run)} ${nameAndId(run)} cancelled by @$user"
message to run.service.slack_channel
}
slackClient.postMessage("Backfila", ":backfila:", message, channel)
}

fun runErrored(id: Id<DbBackfillRun>) {
val (message, channel) = transacter.transaction { session ->
val run = session.load(id)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package app.cash.backfila.service.persistence

enum class BackfillPartitionState {
PAUSED, // A resumable backfill partition that is not currently meant to be running
RUNNING, // A backfill partition that is allowed to run.
COMPLETE, // A completed partition, this is a final non-resumable state.
STALE, // A partition that is no longer relevant, this is a final non-resumable state (possibly a split partition)
CANCELLED; // A partition that has been manually cancelled, this is a final non-resumable state.

companion object {
val FINAL_STATES = setOf(STALE, CANCELLED, COMPLETE)
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
package app.cash.backfila.service.persistence

enum class BackfillState {
PAUSED,
RUNNING,
COMPLETE
PAUSED, // A resumable backfill that is not currently meant to be running
RUNNING, // A backfill that is allowed to run.
COMPLETE, // A completed backfill, this is a final non-resumable state.
CANCELLED; // A backfill that has been manually cancelled, this is a final non-resumable state.

companion object {
val FINAL_STATES = setOf(CANCELLED, COMPLETE)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TERMINAL might be a better word

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

however, either is fine


/**
* When the Backfill state changes modify the underlying partitions to these corresponding states.
*/
fun BackfillState.getPartitionState() = when (this) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toPartitionState

PAUSED -> BackfillPartitionState.PAUSED
RUNNING -> BackfillPartitionState.RUNNING
CANCELLED -> BackfillPartitionState.CANCELLED
COMPLETE -> BackfillPartitionState.COMPLETE
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package app.cash.backfila.service.persistence

import app.cash.backfila.service.persistence.BackfillState.Companion.getPartitionState
import com.google.common.base.Preconditions.checkState
import java.time.Instant
import javax.persistence.Column
Expand Down Expand Up @@ -130,18 +131,18 @@ class DbBackfillRun() : DbUnsharded<DbBackfillRun>, DbTimestampedEntity {
.list(session)

fun setState(session: Session, queryFactory: Query.Factory, state: BackfillState) {
// State can't be changed after being completed.
checkState(this.state != BackfillState.COMPLETE)
// Backfills in final states cannot change.
checkState(!BackfillState.FINAL_STATES.contains(this.state))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could instead do

this.state !in BackfillState.FINAL_STATES

this.state = state
// Set the state of all the partitions that are not complete
val query = session.hibernateSession.createQuery(
"update DbRunPartition " +
"set run_state = :newState, version = version + 1 " +
"where backfill_run_id = :runId and run_state <> :completed"
"set partition_state = :newPartitionState, version = version + 1 " +
"where backfill_run_id = :runId and partition_state not in ( :finalPartitionStates )"
)
query.setParameter("runId", id)
query.setParameter("newState", state)
query.setParameter("completed", BackfillState.COMPLETE)
query.setParameter("newPartitionState", state.getPartitionState())
query.setParameterList("finalPartitionStates", BackfillPartitionState.FINAL_STATES)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool this works well. I was worried it would harder to read

query.executeUpdate()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class DbRunPartition() : DbUnsharded<DbRunPartition>, DbTimestampedEntity {
*/
@Column(nullable = false)
@Enumerated(EnumType.STRING)
lateinit var run_state: BackfillState
lateinit var partition_state: BackfillPartitionState

@Column
var lease_token: String? = null
Expand Down Expand Up @@ -120,14 +120,14 @@ class DbRunPartition() : DbUnsharded<DbRunPartition>, DbTimestampedEntity {
backfill_run_id: Id<DbBackfillRun>,
partition_name: String,
backfill_range: KeyRange,
run_state: BackfillState,
run_state: BackfillPartitionState,
estimated_record_count: Long?
) : this() {
this.backfill_run_id = backfill_run_id
this.partition_name = partition_name
this.pkey_range_start = backfill_range.start
this.pkey_range_end = backfill_range.end
this.run_state = run_state
this.partition_state = run_state
this.lease_expires_at = Instant.ofEpochSecond(1L)
this.estimated_record_count = estimated_record_count
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ interface RunPartitionQuery : Query<DbRunPartition> {
@Constraint("backfill_run_id", Operator.IN)
fun backfillRunIdIn(backfillRunIds: Collection<Id<DbBackfillRun>>): RunPartitionQuery

@Constraint("run_state")
fun runState(runState: BackfillState): RunPartitionQuery
@Constraint("partition_state")
fun partitionState(runState: BackfillPartitionState): RunPartitionQuery

@Constraint("lease_expires_at", Operator.LT)
fun leaseExpiresAtBefore(time: Instant): RunPartitionQuery
Expand Down
Loading