Skip to content

Commit

Permalink
CTE is not necessary for the INSERT with two foreign keys
Browse files Browse the repository at this point in the history
  • Loading branch information
davepacheco committed Oct 30, 2023
1 parent 64ed053 commit 7b21243
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 217 deletions.
2 changes: 1 addition & 1 deletion nexus/db-model/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1237,7 +1237,7 @@ joinable!(ip_pool_range -> ip_pool (ip_pool_id));

allow_tables_to_appear_in_same_query!(inv_collection, inv_collection_error);
joinable!(inv_collection_error -> inv_collection (inv_collection_id));
allow_tables_to_appear_in_same_query!(sw_caboose, inv_caboose);
allow_tables_to_appear_in_same_query!(hw_baseboard_id, sw_caboose, inv_caboose);

allow_tables_to_appear_in_same_query!(
dataset,
Expand Down
351 changes: 135 additions & 216 deletions nexus/db-queries/src/db/datastore/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,17 @@ use anyhow::Context;
use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use async_bb8_diesel::AsyncSimpleConnection;
use chrono::DateTime;
use chrono::Utc;
use diesel::expression::SelectableHelper;
use diesel::sql_types;
use diesel::sql_types::Nullable;
use diesel::Column;
use diesel::BoolExpressionMethods;
use diesel::ExpressionMethods;
use diesel::IntoSql;
use diesel::JoinOnDsl;
use diesel::NullableExpressionMethods;
use diesel::QueryDsl;
use diesel::QuerySource;
use diesel::Table;
use futures::future::BoxFuture;
use futures::FutureExt;
use nexus_db_model::CabooseWhich;
use nexus_db_model::CabooseWhichEnum;
use nexus_db_model::HwBaseboardId;
use nexus_db_model::HwPowerState;
Expand All @@ -45,8 +41,6 @@ use nexus_db_model::InvServiceProcessor;
use nexus_db_model::SpType;
use nexus_db_model::SpTypeEnum;
use nexus_db_model::SwCaboose;
use nexus_types::inventory::BaseboardId;
use nexus_types::inventory::CabooseFound;
use nexus_types::inventory::Collection;
use omicron_common::api::external::Error;
use omicron_common::api::external::InternalContext;
Expand Down Expand Up @@ -336,19 +330,142 @@ impl DataStore {

// Insert rows for the cabooses that we found. Like service
// processors and roots of trust, we do this using INSERT INTO ...
// SELECT. But because there are two foreign keys, we need a more
// complicated `SELECT`, which requires using a CTE.
// SELECT. This one's a little more complicated because there are
// two foreign keys. Concretely, we have these three tables:
//
// - `hw_baseboard_id` with an "id" primary key and lookup columns
// "part_number" and "serial_number"
// - `sw_caboose` with an "id" primary key and lookup columns
// "board", "git_commit", "name", and "version"
// - `inv_caboose` with foreign keys "hw_baseboard_id",
// "sw_caboose_id", and various other columns
//
// We want to INSERT INTO `inv_caboose` a row with:
//
// - hw_baseboard_id (foreign key) the result of looking up an
// hw_baseboard_id row by a specific part number and serial number
//
// - sw_caboose_id (foreign key) the result of looking up a
// specific sw_caboose row by board, git_commit, name, and version
//
// - the other columns being literals
//
// To achieve this, we're going to generate something like:
//
// INSERT INTO
// inv_caboose (
// hw_baseboard_id,
// sw_caboose_id,
// inv_collection_id,
// time_collected,
// source,
// which,
// )
// SELECT (
// hw_baseboard_id.id,
// sw_caboose.id,
// ... /* literal collection id */
// ... /* literal time collected */
// ... /* literal source */
// ... /* literal 'which' */
// )
// FROM
// hw_baseboard_id
// INNER JOIN
// sw_caboose
// ON hw_baseboard_id.part_number = ...
// AND hw_baseboard_id.serial_number = ...
// AND sw_caboose.board = ...
// AND sw_caboose.git_commit = ...
// AND sw_caboose.name = ...
// AND sw_caboose.version = ...;
//
// Again, the whole point is to avoid back-and-forth between the
// client and the database. Those back-and-forth interactions can
// significantly increase latency and the probability of transaction
// conflicts. See RFD 192 for details. (Unfortunately, we still
// _are_ going back and forth here to issue each of these queries.
// But that's an artifact of the interface we currently have for
// sending queries. It should be possible to send all of these in
// one batch.)
for (which, tree) in &collection.cabooses_found {
let db_which = nexus_db_model::CabooseWhich::from(*which);
for (baseboard_id, found_caboose) in tree {
InvCabooseInsert::new(
collection_id,
baseboard_id,
found_caboose,
db_which,
)
.execute_async(&conn)
.await?;
use db::schema::hw_baseboard_id::dsl as dsl_baseboard_id;
use db::schema::inv_caboose::dsl as dsl_inv_caboose;
use db::schema::sw_caboose::dsl as dsl_sw_caboose;

let selection = db::schema::hw_baseboard_id::table
.inner_join(
db::schema::sw_caboose::table.on(
dsl_baseboard_id::part_number
.eq(baseboard_id.part_number.clone())
.and(
dsl_baseboard_id::serial_number.eq(
baseboard_id.serial_number.clone(),
),
)
.and(dsl_sw_caboose::board.eq(
found_caboose.caboose.board.clone(),
))
.and(
dsl_sw_caboose::git_commit.eq(
found_caboose
.caboose
.git_commit
.clone(),
),
)
.and(
dsl_sw_caboose::name.eq(found_caboose
.caboose
.name
.clone()),
)
.and(dsl_sw_caboose::version.eq(
found_caboose.caboose.version.clone(),
)),
),
)
.select((
dsl_baseboard_id::id,
dsl_sw_caboose::id,
collection_id.into_sql::<diesel::sql_types::Uuid>(),
found_caboose
.time_collected
.into_sql::<diesel::sql_types::Timestamptz>(),
found_caboose
.source
.clone()
.into_sql::<diesel::sql_types::Text>(),
db_which.into_sql::<CabooseWhichEnum>(),
));

let _ = diesel::insert_into(db::schema::inv_caboose::table)
.values(selection)
.into_columns((
dsl_inv_caboose::hw_baseboard_id,
dsl_inv_caboose::sw_caboose_id,
dsl_inv_caboose::inv_collection_id,
dsl_inv_caboose::time_collected,
dsl_inv_caboose::source,
dsl_inv_caboose::which,
))
.execute_async(&conn)
.await?;

// See the comments above. The same applies here. If you
// update the statement below because the schema for
// `inv_caboose` has changed, be sure to update the code
// above, too!
let (
_hw_baseboard_id,
_sw_caboose_id,
_inv_collection_id,
_time_collected,
_source,
_which,
) = dsl_inv_caboose::inv_caboose::all_columns();
}
}

Expand Down Expand Up @@ -732,204 +849,6 @@ impl DataStore {
/// database. Those back-and-forth interactions can significantly increase
/// latency and the probability of transaction conflicts. See RFD 192 for
/// details.
#[must_use = "Queries must be executed"]
struct InvCabooseInsert {
// fields used to look up baseboard id
// (compared against the `part_number` and `serial_number` columns of
// `hw_baseboard_id` in the WHERE clause emitted by `walk_ast()`)
baseboard_part_number: String,
baseboard_serial_number: String,

// fields used to look up caboose id
// (compared against the `board`, `git_commit`, `name`, and `version`
// columns of `sw_caboose` in the WHERE clause emitted by `walk_ast()`)
caboose_board: String,
caboose_git_commit: String,
caboose_name: String,
caboose_version: String,

// literal values for the rest of the inv_caboose columns
// (emitted as bind parameters in the SELECT list, after the two looked-up
// ids)
collection_id: Uuid,
time_collected: DateTime<Utc>,
source: String,
which: CabooseWhich,

// These are Diesel structures representing table names in the "from" or
// "into" parts of queries (e.g., "SELECT FROM tablename" or "INSERT INTO
// tablename"). We need this in `walk_ast()` below, but they must outlive
// `walk_ast()`, so they need to be created ahead of time.
//
// TODO-cleanup These Diesel-internal types are nasty. It's not clear how
// else to do this.
from_hw_baseboard_id:
diesel::internal::table_macro::StaticQueryFragmentInstance<
db::schema::hw_baseboard_id::table,
>,
from_sw_caboose: diesel::internal::table_macro::StaticQueryFragmentInstance<
db::schema::sw_caboose::table,
>,
// Table-name fragment used after "INSERT INTO" (obtained via
// `from_clause()` as well; see the constructor).
into_inv_caboose:
diesel::internal::table_macro::StaticQueryFragmentInstance<
db::schema::inv_caboose::table,
>,
}

impl InvCabooseInsert {
    /// Captures everything needed to emit the `inv_caboose` insert.
    ///
    /// Copies the baseboard lookup keys (part number, serial number), the
    /// caboose lookup keys (board, git commit, name, version), and the
    /// literal column values (collection id, time collected, source, which)
    /// into an owned value that can later be walked by Diesel.
    pub fn new(
        collection_id: Uuid,
        baseboard: &BaseboardId,
        found_caboose: &CabooseFound,
        which: CabooseWhich,
    ) -> InvCabooseInsert {
        let caboose = &found_caboose.caboose;

        // Table-name fragments for the two tables we select from.
        let from_hw_baseboard_id =
            db::schema::hw_baseboard_id::table.from_clause();
        let from_sw_caboose = db::schema::sw_caboose::table.from_clause();

        // Calling "from_clause()" for the target of an INSERT looks odd, but
        // all it yields is the table name as an identifier, which is the same
        // in "FROM" and "INSERT INTO" position. Diesel does the same thing
        // internally (see the type of `InsertStatement::into_clause`).
        let into_inv_caboose = db::schema::inv_caboose::table.from_clause();

        Self {
            // baseboard lookup keys
            baseboard_part_number: baseboard.part_number.clone(),
            baseboard_serial_number: baseboard.serial_number.clone(),
            // caboose lookup keys
            caboose_board: caboose.board.clone(),
            caboose_git_commit: caboose.git_commit.clone(),
            caboose_name: caboose.name.clone(),
            caboose_version: caboose.version.clone(),
            // literal column values
            collection_id,
            time_collected: found_caboose.time_collected,
            source: found_caboose.source.clone(),
            which,
            // table-name fragments
            from_hw_baseboard_id,
            from_sw_caboose,
            into_inv_caboose,
        }
    }
}

// Hand-written SQL generation for `InvCabooseInsert`.
//
// The statement pushed below has this overall shape (table names come from
// the stored `from_clause()` fragments; every `<bind>` is a bind parameter):
//
//   WITH my_new_row AS (
//     SELECT <hw_baseboard_id>.id, <sw_caboose>.id,
//            <bind collection_id>, <bind time_collected>,
//            <bind source>, <bind which>
//     FROM <hw_baseboard_id>, <sw_caboose>
//     WHERE <hw_baseboard_id>.part_number = <bind>
//       AND <hw_baseboard_id>.serial_number = <bind>
//       AND <sw_caboose>.board = <bind>
//       AND <sw_caboose>.git_commit = <bind>
//       AND <sw_caboose>.name = <bind>
//       AND <sw_caboose>.version = <bind>
//   )
//   INSERT INTO <inv_caboose> (hw_baseboard_id, sw_caboose_id,
//       inv_collection_id, time_collected, source, which)
//   SELECT * FROM my_new_row
impl diesel::query_builder::QueryFragment<diesel::pg::Pg> for InvCabooseInsert {
fn walk_ast<'b>(
&'b self,
mut pass: diesel::query_builder::AstPass<'_, 'b, diesel::pg::Pg>,
) -> diesel::QueryResult<()> {
use db::schema::hw_baseboard_id::dsl as dsl_baseboard_id;
use db::schema::inv_caboose::dsl as dsl_inv_caboose;
use db::schema::sw_caboose::dsl as dsl_sw_caboose;

// Mark this query as not safe to cache as a prepared statement (see
// Diesel's `AstPass::unsafe_to_cache_prepared`).
pass.unsafe_to_cache_prepared();
pass.push_sql("WITH my_new_row AS (");

pass.push_sql("SELECT ");

// Emit the values that we're going to insert into `inv_caboose`.
// First, emit the looked-up foreign keys.
self.from_hw_baseboard_id.walk_ast(pass.reborrow())?;
pass.push_sql(".");
pass.push_identifier(dsl_baseboard_id::id::NAME)?;
pass.push_sql(", ");
self.from_sw_caboose.walk_ast(pass.reborrow())?;
pass.push_sql(".");
pass.push_identifier(dsl_sw_caboose::id::NAME)?;
pass.push_sql(", ");
// Next, emit the literal values used for the rest of the columns.
pass.push_bind_param::<sql_types::Uuid, _>(&self.collection_id)?;
pass.push_sql(", ");
pass.push_bind_param::<sql_types::Timestamptz, _>(
&self.time_collected,
)?;
pass.push_sql(", ");
pass.push_bind_param::<sql_types::Text, _>(&self.source)?;
pass.push_sql(", ");
pass.push_bind_param::<CabooseWhichEnum, _>(&self.which)?;

// Finish the SELECT by adding the list of tables and the WHERE to pick
// out only the relevant row from each table.
pass.push_sql(" FROM ");

self.from_hw_baseboard_id.walk_ast(pass.reborrow())?;
pass.push_sql(", ");
self.from_sw_caboose.walk_ast(pass.reborrow())?;

// WHERE: two conditions on the baseboard table, then four on the caboose
// table, all joined with AND.
pass.push_sql(" WHERE ");
self.from_hw_baseboard_id.walk_ast(pass.reborrow())?;
pass.push_sql(".");
pass.push_identifier(dsl_baseboard_id::part_number::NAME)?;
pass.push_sql(" = ");
pass.push_bind_param::<sql_types::Text, _>(
&self.baseboard_part_number,
)?;
pass.push_sql(" AND ");
self.from_hw_baseboard_id.walk_ast(pass.reborrow())?;
pass.push_sql(".");
pass.push_identifier(dsl_baseboard_id::serial_number::NAME)?;
pass.push_sql(" = ");
pass.push_bind_param::<sql_types::Text, _>(
&self.baseboard_serial_number,
)?;
pass.push_sql(" AND ");
self.from_sw_caboose.walk_ast(pass.reborrow())?;
pass.push_sql(".");
pass.push_identifier(dsl_sw_caboose::board::NAME)?;
pass.push_sql(" = ");
pass.push_bind_param::<sql_types::Text, _>(&self.caboose_board)?;
pass.push_sql(" AND ");
self.from_sw_caboose.walk_ast(pass.reborrow())?;
pass.push_sql(".");
pass.push_identifier(dsl_sw_caboose::git_commit::NAME)?;
pass.push_sql(" = ");
pass.push_bind_param::<sql_types::Text, _>(&self.caboose_git_commit)?;
pass.push_sql(" AND ");
self.from_sw_caboose.walk_ast(pass.reborrow())?;
pass.push_sql(".");
pass.push_identifier(dsl_sw_caboose::name::NAME)?;
pass.push_sql(" = ");
pass.push_bind_param::<sql_types::Text, _>(&self.caboose_name)?;
pass.push_sql(" AND ");
self.from_sw_caboose.walk_ast(pass.reborrow())?;
pass.push_sql(".");
pass.push_identifier(dsl_sw_caboose::version::NAME)?;
pass.push_sql(" = ");
pass.push_bind_param::<sql_types::Text, _>(&self.caboose_version)?;

pass.push_sql(")\n"); // end of the SELECT query within the WITH

pass.push_sql("INSERT INTO ");
self.into_inv_caboose.walk_ast(pass.reborrow())?;

// Column list for the INSERT. The order here must match the order of the
// values emitted in the SELECT above, since the rows are copied over with
// "SELECT *".
pass.push_sql("(");
pass.push_identifier(dsl_inv_caboose::hw_baseboard_id::NAME)?;
pass.push_sql(", ");
pass.push_identifier(dsl_inv_caboose::sw_caboose_id::NAME)?;
pass.push_sql(", ");
pass.push_identifier(dsl_inv_caboose::inv_collection_id::NAME)?;
pass.push_sql(", ");
pass.push_identifier(dsl_inv_caboose::time_collected::NAME)?;
pass.push_sql(", ");
pass.push_identifier(dsl_inv_caboose::source::NAME)?;
pass.push_sql(", ");
pass.push_identifier(dsl_inv_caboose::which::NAME)?;
pass.push_sql(")\n");
pass.push_sql("SELECT * FROM my_new_row");

// See the comment in inventory_insert_collection() where we use
// `inv_service_processor::all_columns()`. The same applies here.
// If you update the statement below because the schema for
// `inv_caboose` has changed, be sure to update the code above, too!
//
// The values are unused; destructuring into exactly six bindings only
// serves to tie this code to the number of columns in `inv_caboose`.
let (
_hw_baseboard_id,
_sw_caboose_id,
_inv_collection_id,
_time_collected,
_source,
_which,
) = dsl_inv_caboose::inv_caboose::all_columns();

Ok(())
}
}

// This is required to be able to call `inv_caboose_insert.execute_async()`.
// The impl is intentionally empty: no trait items are overridden here.
impl diesel::RunQueryDsl<db::pool::DbConnection> for InvCabooseInsert {}

// This is required to be able to call `inv_caboose_insert.execute_async()`.
// We declare no static query id (`QueryId = ()`, `HAS_STATIC_QUERY_ID =
// false`) for this hand-built query.
impl diesel::query_builder::QueryId for InvCabooseInsert {
type QueryId = ();
const HAS_STATIC_QUERY_ID: bool = false;
}
/// Extra interfaces that are not intended (and potentially unsafe) for use in
/// Nexus, but useful for testing and `omdb`
Expand Down

0 comments on commit 7b21243

Please sign in to comment.