Only remove counters if instance state is current
Add a clause to the provisioning counter deletion query that checks (atomically
via sub-selection) that the instance of interest has not advanced past a
particular state generation number. This prevents a TOCTTOU bug that could cause
an instance's counters to be deleted while the instance is running:

- Sled agent tries to send an update stopping an instance; this gets stuck and
  the attempt times out
- Sled agent tries again and the attempt succeeds; this deletes the provisioning
  counters and allows the instance to start somewhere else
- The instance indeed starts somewhere else, taking new charges
- The original attempt finally makes progress again and proceeds to delete the
  newly-allocated charges
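
Conceptually, the new guard turns the deletion into a conditional delete keyed on the
instance's state generation. The SQL below is a hand-written sketch of roughly what the
new Diesel query produces; the exact statement differs, and the $-parameters are
illustrative only:

    -- Remove the instance's provisioning record only if the instance row still
    -- carries a state generation older than the one this update is applying.
    DELETE FROM virtual_provisioning_resource
    WHERE id = $instance_id
      AND id = (
        SELECT id
        FROM instance
        WHERE id = $instance_id
          AND state_generation < $max_instance_gen
      )
    RETURNING *;

If the sub-select finds no qualifying instance row, it yields NULL, the equality check
fails, and nothing is deleted, so a stale stop notification can no longer release the
charges taken by a newer incarnation of the instance.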

Also fix an ordering bug in the start saga: provisioning counters should only
be charged after the saga has passed through the "only one start attempt at a
time" interlock. The old ordering couldn't leak counters (parallel start sagas
that lose the race to set the instance's Propolis ID unwind, which releases the
counters), but it did allow several parallel start sagas to each take a charge
for the same instance.
gjcolombo committed Oct 13, 2023
1 parent 9dfaeff commit 16b7fe6
Showing 5 changed files with 115 additions and 66 deletions.
5 changes: 5 additions & 0 deletions nexus/db-model/src/schema.rs
@@ -812,6 +812,11 @@ table! {
}
}

allow_tables_to_appear_in_same_query! {
virtual_provisioning_resource,
instance
}

table! {
zpool (id) {
id -> Uuid,

@@ -272,18 +272,27 @@ impl DataStore {
Ok(provisions)
}

/// Transitively updates all CPU/RAM provisions from project -> fleet.
/// Transitively removes the CPU and memory charges for an instance from the
/// instance's project, silo, and fleet, provided that the instance's state
/// generation is less than `max_instance_gen`. This allows a caller who is
/// about to apply generation G to an instance to avoid deleting resources
/// if its update was superseded.
pub async fn virtual_provisioning_collection_delete_instance(
&self,
opctx: &OpContext,
id: Uuid,
project_id: Uuid,
cpus_diff: i64,
ram_diff: ByteCount,
max_instance_gen: i64,
) -> Result<Vec<VirtualProvisioningCollection>, Error> {
let provisions =
VirtualProvisioningCollectionUpdate::new_delete_instance(
id, cpus_diff, ram_diff, project_id,
id,
max_instance_gen,
cpus_diff,
ram_diff,
project_id,
)
.get_results_async(&*self.pool_connection_authorized(opctx).await?)
.await

@@ -368,20 +368,49 @@ impl VirtualProvisioningCollectionUpdate {

pub fn new_delete_instance(
id: uuid::Uuid,
max_instance_gen: i64,
cpus_diff: i64,
ram_diff: ByteCount,
project_id: uuid::Uuid,
) -> Self {
use crate::db::schema::instance::dsl as instance_dsl;
use virtual_provisioning_collection::dsl as collection_dsl;
use virtual_provisioning_resource::dsl as resource_dsl;

Self::apply_update(
// We should delete the record if it exists.
DoUpdate::new_for_delete(id),
// The query to actually delete the record.
//
// The filter condition here ensures that the provisioning record is
// only deleted if the corresponding instance has a generation
// number less than the supplied `max_instance_gen`. This allows a
// caller that is about to apply an instance update that will stop
// the instance and that bears generation G to avoid deleting
// resources if the instance generation was already advanced to or
// past G.
//
// If the relevant instance ID is not in the database, then some
// other operation must have ensured the instance was previously
// stopped (because that's the only way it could have been deleted),
// and that operation should have cleaned up the resources already,
// in which case there's nothing to do here.
//
// There is an additional "direct" filter on the target resource ID
// to avoid a full scan of the resource table.
UnreferenceableSubquery(
diesel::delete(resource_dsl::virtual_provisioning_resource)
.filter(resource_dsl::id.eq(id))
.filter(
resource_dsl::id.nullable().eq(instance_dsl::instance
.filter(instance_dsl::id.eq(id))
.filter(
instance_dsl::state_generation
.lt(max_instance_gen),
)
.select(instance_dsl::id)
.single_value()),
)
.returning(virtual_provisioning_resource::all_columns),
),
// Within this project, silo, fleet...

12 changes: 4 additions & 8 deletions nexus/src/app/instance.rs
@@ -1313,27 +1313,23 @@ impl super::Nexus {
)
.await?;

// If the supplied instance state is at least as new as what's currently
// in the database, and it indicates the instance has no active VMM, the
// instance has been stopped and should have its virtual provisioning
// charges released.
// If the supplied instance state indicates that the instance no longer
// has an active VMM, attempt to delete the virtual provisioning record.
//
// As with updating networking state, this must be done before
// committing the new runtime state to the database: once the DB is
// written, a new start saga can arrive and start the instance, which
// will try to create its own virtual provisioning charges, which will
// race with this operation.
if new_runtime_state.instance_state.propolis_id.is_none()
&& new_runtime_state.instance_state.gen
>= db_instance.runtime().gen.0
{
if new_runtime_state.instance_state.propolis_id.is_none() {
self.db_datastore
.virtual_provisioning_collection_delete_instance(
opctx,
*instance_id,
db_instance.project_id,
i64::from(db_instance.ncpus.0 .0),
db_instance.memory,
(&new_runtime_state.instance_state.gen).into(),
)
.await?;
}

122 changes: 66 additions & 56 deletions nexus/src/app/sagas/instance_start.rs
@@ -42,11 +42,6 @@ declare_saga_actions! {
+ sis_alloc_propolis_ip
}

ADD_VIRTUAL_RESOURCES -> "virtual_resources" {
+ sis_account_virtual_resources
- sis_account_virtual_resources_undo
}

CREATE_VMM_RECORD -> "vmm_record" {
+ sis_create_vmm_record
- sis_destroy_vmm_record
Expand All @@ -57,6 +52,11 @@ declare_saga_actions! {
- sis_move_to_starting_undo
}

ADD_VIRTUAL_RESOURCES -> "virtual_resources" {
+ sis_account_virtual_resources
- sis_account_virtual_resources_undo
}

// TODO(#3879) This can be replaced with an action that triggers the NAT RPW
// once such an RPW is available.
DPD_ENSURE -> "dpd_ensure" {
Expand Down Expand Up @@ -101,9 +101,9 @@ impl NexusSaga for SagaInstanceStart {

builder.append(alloc_server_action());
builder.append(alloc_propolis_ip_action());
builder.append(add_virtual_resources_action());
builder.append(create_vmm_record_action());
builder.append(mark_as_starting_action());
builder.append(add_virtual_resources_action());
builder.append(dpd_ensure_action());
builder.append(v2p_ensure_action());
builder.append(ensure_registered_action());

@@ -155,56 +155,6 @@ async fn sis_alloc_propolis_ip(
allocate_sled_ipv6(&opctx, sagactx.user_data().datastore(), sled_uuid).await
}

async fn sis_account_virtual_resources(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let osagactx = sagactx.user_data();
let params = sagactx.saga_params::<Params>()?;
let instance_id = params.db_instance.id();

let opctx = crate::context::op_context_for_saga_action(
&sagactx,
&params.serialized_authn,
);
osagactx
.datastore()
.virtual_provisioning_collection_insert_instance(
&opctx,
instance_id,
params.db_instance.project_id,
i64::from(params.db_instance.ncpus.0 .0),
nexus_db_model::ByteCount(*params.db_instance.memory),
)
.await
.map_err(ActionError::action_failed)?;
Ok(())
}

async fn sis_account_virtual_resources_undo(
sagactx: NexusActionContext,
) -> Result<(), anyhow::Error> {
let osagactx = sagactx.user_data();
let params = sagactx.saga_params::<Params>()?;
let instance_id = params.db_instance.id();

let opctx = crate::context::op_context_for_saga_action(
&sagactx,
&params.serialized_authn,
);
osagactx
.datastore()
.virtual_provisioning_collection_delete_instance(
&opctx,
instance_id,
params.db_instance.project_id,
i64::from(params.db_instance.ncpus.0 .0),
nexus_db_model::ByteCount(*params.db_instance.memory),
)
.await
.map_err(ActionError::action_failed)?;
Ok(())
}

async fn sis_create_vmm_record(
sagactx: NexusActionContext,
) -> Result<db::model::Vmm, ActionError> {

@@ -361,6 +311,66 @@ async fn sis_move_to_starting_undo(
Ok(())
}

async fn sis_account_virtual_resources(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
let osagactx = sagactx.user_data();
let params = sagactx.saga_params::<Params>()?;
let instance_id = params.db_instance.id();

let opctx = crate::context::op_context_for_saga_action(
&sagactx,
&params.serialized_authn,
);

osagactx
.datastore()
.virtual_provisioning_collection_insert_instance(
&opctx,
instance_id,
params.db_instance.project_id,
i64::from(params.db_instance.ncpus.0 .0),
nexus_db_model::ByteCount(*params.db_instance.memory),
)
.await
.map_err(ActionError::action_failed)?;
Ok(())
}

async fn sis_account_virtual_resources_undo(
sagactx: NexusActionContext,
) -> Result<(), anyhow::Error> {
let osagactx = sagactx.user_data();
let params = sagactx.saga_params::<Params>()?;
let instance_id = params.db_instance.id();

let opctx = crate::context::op_context_for_saga_action(
&sagactx,
&params.serialized_authn,
);

let started_record =
sagactx.lookup::<db::model::Instance>("started_record")?;

osagactx
.datastore()
.virtual_provisioning_collection_delete_instance(
&opctx,
instance_id,
params.db_instance.project_id,
i64::from(params.db_instance.ncpus.0 .0),
nexus_db_model::ByteCount(*params.db_instance.memory),
// Use the next instance generation number as the generation limit
// to ensure the provisioning counters are released. (The "mark as
// starting" undo step will "publish" this new state generation when
// it moves the instance back to Stopped.)
(&started_record.runtime().gen.next()).into(),
)
.await
.map_err(ActionError::action_failed)?;
Ok(())
}

async fn sis_dpd_ensure(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {