Skip to content

Commit

Permalink
fix(frontend): require primary key for system table (#15126)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Feb 23, 2024
1 parent ea0b012 commit b95d9a9
Show file tree
Hide file tree
Showing 14 changed files with 132 additions and 23 deletions.
8 changes: 8 additions & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ query TT
SELECT * FROM pg_catalog.pg_settings where name='dummy';
----

# https://github.com/risingwavelabs/risingwave/issues/15125
query TT
SELECT min(name) name, context FROM pg_catalog.pg_settings GROUP BY context;
----
application_name user
backup_storage_directory postmaster
block_size_kb internal

# Tab-completion of `SET` command
query T
SELECT name
Expand Down
29 changes: 29 additions & 0 deletions src/common/fields-derive/src/gen/test_empty_pk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
impl ::risingwave_common::types::Fields for Data {
const PRIMARY_KEY: Option<&'static [usize]> = Some(&[]);
fn fields() -> Vec<(&'static str, ::risingwave_common::types::DataType)> {
vec![
("v1", < i16 as ::risingwave_common::types::WithDataType >
::default_data_type()), ("v2", < String as
::risingwave_common::types::WithDataType > ::default_data_type())
]
}
fn into_owned_row(self) -> ::risingwave_common::row::OwnedRow {
::risingwave_common::row::OwnedRow::new(
vec![
::risingwave_common::types::ToOwnedDatum::to_owned_datum(self.v1),
::risingwave_common::types::ToOwnedDatum::to_owned_datum(self.v2)
],
)
}
}
impl From<Data> for ::risingwave_common::types::ScalarImpl {
fn from(v: Data) -> Self {
::risingwave_common::types::StructValue::new(
vec![
::risingwave_common::types::ToOwnedDatum::to_owned_datum(v.v1),
::risingwave_common::types::ToOwnedDatum::to_owned_datum(v.v2)
],
)
.into()
}
}
29 changes: 29 additions & 0 deletions src/common/fields-derive/src/gen/test_no_pk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
impl ::risingwave_common::types::Fields for Data {
const PRIMARY_KEY: Option<&'static [usize]> = None;
fn fields() -> Vec<(&'static str, ::risingwave_common::types::DataType)> {
vec![
("v1", < i16 as ::risingwave_common::types::WithDataType >
::default_data_type()), ("v2", < String as
::risingwave_common::types::WithDataType > ::default_data_type())
]
}
fn into_owned_row(self) -> ::risingwave_common::row::OwnedRow {
::risingwave_common::row::OwnedRow::new(
vec![
::risingwave_common::types::ToOwnedDatum::to_owned_datum(self.v1),
::risingwave_common::types::ToOwnedDatum::to_owned_datum(self.v2)
],
)
}
}
impl From<Data> for ::risingwave_common::types::ScalarImpl {
fn from(v: Data) -> Self {
::risingwave_common::types::StructValue::new(
vec![
::risingwave_common::types::ToOwnedDatum::to_owned_datum(v.v1),
::risingwave_common::types::ToOwnedDatum::to_owned_datum(v.v2)
],
)
.into()
}
}
4 changes: 1 addition & 3 deletions src/common/fields-derive/src/gen/test_output.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
impl ::risingwave_common::types::Fields for Data {
const PRIMARY_KEY: Option<&'static [usize]> = Some(&[1usize, 0usize]);
fn fields() -> Vec<(&'static str, ::risingwave_common::types::DataType)> {
vec![
("v1", < i16 as ::risingwave_common::types::WithDataType >
Expand All @@ -21,9 +22,6 @@ impl ::risingwave_common::types::Fields for Data {
],
)
}
fn primary_key() -> &'static [usize] {
&[1usize, 0usize]
}
}
impl From<Data> for ::risingwave_common::types::ScalarImpl {
fn from(v: Data) -> Self {
Expand Down
61 changes: 47 additions & 14 deletions src/common/fields-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,17 @@ fn gen(tokens: TokenStream) -> Result<TokenStream> {
.iter()
.map(|field| field.ident.as_ref().expect("field no name"))
.collect::<Vec<_>>();
let primary_key = get_primary_key(&input).map(|indices| {
quote! {
fn primary_key() -> &'static [usize] {
&[#(#indices),*]
}
}
});
let primary_key = get_primary_key(&input).map_or_else(
|| quote! { None },
|indices| {
quote! { Some(&[#(#indices),*]) }
},
);

Ok(quote! {
impl ::risingwave_common::types::Fields for #ident {
const PRIMARY_KEY: Option<&'static [usize]> = #primary_key;

fn fields() -> Vec<(&'static str, ::risingwave_common::types::DataType)> {
vec![#(#fields_rw),*]
}
Expand All @@ -100,7 +101,6 @@ fn gen(tokens: TokenStream) -> Result<TokenStream> {
::risingwave_common::types::ToOwnedDatum::to_owned_datum(self.#names)
),*])
}
#primary_key
}
impl From<#ident> for ::risingwave_common::types::ScalarImpl {
fn from(v: #ident) -> Self {
Expand Down Expand Up @@ -133,7 +133,9 @@ fn get_primary_key(input: &syn::DeriveInput) -> Option<Vec<usize>> {
return Some(
keys.to_string()
.split(',')
.map(|s| index(s.trim()))
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.map(index)
.collect(),
);
}
Expand Down Expand Up @@ -199,6 +201,18 @@ mod tests {
prettyplease::unparse(&output)
}

fn do_test(code: &str, expected_path: &str) {
let input: TokenStream = str::parse(code).unwrap();

let output = super::gen(input).unwrap();

let output = pretty_print(output);

let expected = expect_test::expect_file![expected_path];

expected.assert_eq(&output);
}

#[test]
fn test_gen() {
let code = indoc! {r#"
Expand All @@ -213,14 +227,33 @@ mod tests {
}
"#};

let input: TokenStream = str::parse(code).unwrap();
do_test(code, "gen/test_output.rs");
}

let output = super::gen(input).unwrap();
#[test]
fn test_no_pk() {
let code = indoc! {r#"
#[derive(Fields)]
struct Data {
v1: i16,
v2: String,
}
"#};

let output = pretty_print(output);
do_test(code, "gen/test_no_pk.rs");
}

let expected = expect_test::expect_file!["gen/test_output.rs"];
#[test]
fn test_empty_pk() {
let code = indoc! {r#"
#[derive(Fields)]
#[primary_key()]
struct Data {
v1: i16,
v2: String,
}
"#};

expected.assert_eq(&output);
do_test(code, "gen/test_empty_pk.rs");
}
}
11 changes: 6 additions & 5 deletions src/common/src/types/fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,18 @@ use crate::util::chunk_coalesce::DataChunkBuilder;
/// }
/// ```
pub trait Fields {
/// The primary key of the table.
///
/// - `None` if the primary key is not applicable.
/// - `Some(&[])` if the primary key is empty, i.e., there'll be at most one row in the table.
const PRIMARY_KEY: Option<&'static [usize]>;

/// Return the schema of the struct.
fn fields() -> Vec<(&'static str, DataType)>;

/// Convert the struct to an `OwnedRow`.
fn into_owned_row(self) -> OwnedRow;

/// The primary key of the table.
fn primary_key() -> &'static [usize] {
&[]
}

/// Create a [`DataChunkBuilder`](crate::util::chunk_coalesce::DataChunkBuilder) with the schema of the struct.
fn data_chunk_builder(capacity: usize) -> DataChunkBuilder {
DataChunkBuilder::new(
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,15 @@ fn gen_sys_table(attr: Attr, item_fn: ItemFn) -> Result<TokenStream2> {
#[linkme::distributed_slice(crate::catalog::system_catalog::SYS_CATALOGS_SLICE)]
#[no_mangle] // to prevent duplicate schema.table name
fn #gen_fn_name() -> crate::catalog::system_catalog::BuiltinCatalog {
const _: () = {
assert!(#struct_type::PRIMARY_KEY.is_some(), "primary key is required for system table");
};

crate::catalog::system_catalog::BuiltinCatalog::Table(crate::catalog::system_catalog::BuiltinTable {
name: #table_name,
schema: #schema_name,
columns: #struct_type::fields(),
pk: #struct_type::primary_key(),
pk: #struct_type::PRIMARY_KEY.unwrap(),
function: |reader| std::boxed::Box::pin(async {
let rows = #user_fn_name(reader) #_await #handle_error;
let mut builder = #struct_type::data_chunk_builder(rows.len() + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::expr::cast_map_array;
/// Ref: [`https://www.postgresql.org/docs/current/catalog-pg-cast.html`]
#[derive(Fields)]
struct PgCast {
#[primary_key]
oid: i32,
castsource: i32,
casttarget: i32,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::catalog::system_catalog::SysCatalogReaderImpl;
/// The catalog `pg_settings` stores settings.
/// Ref: [`https://www.postgresql.org/docs/current/view-pg-settings.html`]
#[derive(Fields)]
#[primary_key(name, context)]
struct PgSetting {
name: String,
setting: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;

#[derive(Fields)]
#[primary_key(object_id, sst_id)] // TODO: is this correct?
struct RwHummockBranchedObject {
object_id: i64,
sst_id: i64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::error::Result;

#[derive(Fields)]
struct RwHummockPinnedSnapshot {
#[primary_key]
worker_node_id: i32,
min_pinned_snapshot_id: i64,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::error::Result;

#[derive(Fields)]
struct RwHummockPinnedVersion {
#[primary_key]
worker_node_id: i32,
min_pinned_version_id: i64,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::error::Result;

#[derive(Fields)]
struct RwHummockVersion {
#[primary_key]
version_id: i64,
max_committed_epoch: i64,
safe_epoch: i64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::error::Result;

#[derive(Fields)]
struct RwMetaSnapshot {
#[primary_key]
meta_snapshot_id: i64,
hummock_version_id: i64,
// the smallest epoch this meta snapshot includes
Expand Down

0 comments on commit b95d9a9

Please sign in to comment.