Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create secret catalog #16288

Merged
merged 50 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
6906021
init
tabVersion Apr 12, 2024
bc25a9e
Merge remote-tracking branch 'origin' into tab/create-secret
tabVersion Apr 15, 2024
d2a171a
stage
tabVersion Apr 16, 2024
612eb4a
Merge remote-tracking branch 'origin' into tab/create-secret
tabVersion Apr 22, 2024
1e3961e
stash
tabVersion Apr 24, 2024
49eefc0
stash
tabVersion Apr 24, 2024
e3e7795
stash
tabVersion Apr 24, 2024
1dfe129
stash
tabVersion Apr 26, 2024
94f0993
stash
tabVersion Apr 26, 2024
ec8f375
support create secret.
tabVersion Apr 28, 2024
53d7d6c
support drop secret
tabVersion Apr 28, 2024
961fde4
Merge remote-tracking branch 'origin' into tab/create-secret
tabVersion Apr 28, 2024
b2de4da
fix
tabVersion Apr 28, 2024
b64288b
fix meta snapshot
tabVersion Apr 28, 2024
15bfd02
fix license
tabVersion Apr 28, 2024
bf11ee2
encrypt
tabVersion Apr 28, 2024
8e4637f
Merge branch 'main' into tab/create-secret
tabVersion May 2, 2024
157f156
create secret with backend
tabVersion May 2, 2024
1a75e35
Merge branch 'main' into tab/create-secret
tabVersion May 6, 2024
fa15399
Merge remote-tracking branch 'origin' into tab/create-secret
tabVersion May 14, 2024
8240a0e
fix
tabVersion May 14, 2024
dd5e7e6
minor
tabVersion May 14, 2024
384fb85
minor
tabVersion May 14, 2024
c6ddbab
minor
tabVersion May 14, 2024
800b919
handle update secret
tabVersion May 14, 2024
c52ef86
add support for show secret
tabVersion May 14, 2024
cfe4f97
fix comment style
tabVersion May 15, 2024
72cf752
fix
tabVersion May 15, 2024
3620456
make sink work again
tabVersion May 16, 2024
7f4c9bc
resolve
tabVersion May 21, 2024
5ffce3a
Merge remote-tracking branch 'origin' into tab/create-secret
tabVersion May 21, 2024
06f6718
merge main
tabVersion May 21, 2024
e1ef232
update
tabVersion May 21, 2024
02f3941
proto format
tabVersion May 21, 2024
eca94ab
fix config test
tabVersion May 23, 2024
1a4dc62
Merge remote-tracking branch 'origin' into tab/create-secret
tabVersion May 25, 2024
12d4182
resolve
tabVersion May 25, 2024
1180cac
rename migration
tabVersion May 25, 2024
7720f33
remove comment
tabVersion May 25, 2024
2015899
fix
tabVersion May 25, 2024
78027b1
add sql logic test
tabVersion May 25, 2024
a1280fb
fix meta-backend
tabVersion May 25, 2024
0c00675
fix: use dedicated `secret_ref_count` instead of `relation_ref_count`…
tabVersion May 27, 2024
62eba9c
Update lib.rs
tabVersion May 27, 2024
6e04612
Merge branch 'main' into tab/create-secret
tabVersion May 28, 2024
f475dbc
fix
tabVersion May 28, 2024
af892fa
impl aes-siv
tabVersion May 28, 2024
cef7115
fix dylint
tabVersion May 28, 2024
bb9a4c9
Update src/meta/Cargo.toml
tabVersion May 28, 2024
9ac8c60
fix cargo lock
tabVersion May 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 42 additions & 19 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ message StreamSourceInfo {
SchemaRegistryNameStrategy name_strategy = 10;
optional string key_message_name = 11;
plan_common.ExternalTableDesc external_table = 12;
// **This field should now be called `is_shared`.** Not renamed for backwards compatibility.
// **This field should now be called `is_shared`.** Not renamed for backwards
// compatibility.
//
// Whether the stream source is a shared source (it has a streaming job).
// This is related with [RFC: Reusable Source Executor](https://github.com/risingwavelabs/rfcs/pull/72).
// This is related with [RFC: Reusable Source
// Executor](https://github.com/risingwavelabs/rfcs/pull/72).
//
// Currently, the following sources can be shared:
//
Expand All @@ -80,6 +82,9 @@ message StreamSourceInfo {
bool is_distributed = 15;
// Options specified by user in the FORMAT ENCODE clause.
map<string, string> format_encode_options = 14;

// Handle the source relies on any sceret. The key is the propertity name and the value is the secret id.
map<string, uint32> secret_ref = 16;
}

message Source {
Expand Down Expand Up @@ -173,6 +178,9 @@ message Sink {

// Whether it should use background ddl or block until backfill finishes.
CreateType create_type = 24;

// Handle the source relies on any sceret. The key is the propertity name and the value is the secret id.
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
map<string, uint32> secret_ref = 25;
}

message Subscription {
Expand Down Expand Up @@ -238,7 +246,8 @@ message Index {
optional uint64 created_at_epoch = 11;
StreamJobStatus stream_job_status = 12;

// Use to record the prefix len of the index_item to reconstruct index columns provided by users.
// Use to record the prefix len of the index_item to reconstruct index columns
// provided by users.
uint32 index_columns_len = 13;
// Cluster version (tracked by git commit) when initialized/created
optional string initialized_at_cluster_version = 14;
Expand Down Expand Up @@ -299,9 +308,7 @@ message Table {
repeated plan_common.ColumnCatalog columns = 5;
repeated common.ColumnOrder pk = 6;
repeated uint32 dependent_relations = 8;
oneof optional_associated_source_id {
uint32 associated_source_id = 9;
}
oneof optional_associated_source_id { uint32 associated_source_id = 9; }
TableType table_type = 10;
repeated int32 distribution_key = 12;
// pk_indices of the corresponding materialize operator's output.
Expand All @@ -314,8 +321,8 @@ message Table {
// an optional column index which is the vnode of each row computed by the
// table's consistent hash distribution
optional uint32 vnode_col_index = 18;
// An optional column index of row id. If the primary key is specified by users,
// this will be `None`.
// An optional column index of row id. If the primary key is specified by
// users, this will be `None`.
optional uint32 row_id_index = 19;
// The column indices which are stored in the state store's value with
// row-encoding. Currently is not supported yet and expected to be
Expand All @@ -324,23 +331,26 @@ message Table {
string definition = 21;
// Used to control whether handling pk conflict for incoming data.
HandleConflictBehavior handle_pk_conflict_behavior = 22;
// Anticipated read prefix pattern (number of fields) for the table, which can be utilized
// for implementing the table's bloom filter or other storage optimization techniques.
// Anticipated read prefix pattern (number of fields) for the table, which can
// be utilized for implementing the table's bloom filter or other storage
// optimization techniques.
uint32 read_prefix_len_hint = 23;
repeated int32 watermark_indices = 24;
repeated int32 dist_key_in_pk = 25;
// A dml fragment id corresponds to the table, used to decide where the dml statement is executed.
// A dml fragment id corresponds to the table, used to decide where the dml
// statement is executed.
optional uint32 dml_fragment_id = 26;
// The range of row count of the table.
// This field is not always present due to backward compatibility. Use `Cardinality::unknown` in this case.
// This field is not always present due to backward compatibility. Use
// `Cardinality::unknown` in this case.
plan_common.Cardinality cardinality = 27;

optional uint64 initialized_at_epoch = 28;
optional uint64 created_at_epoch = 29;

// This field is introduced in v1.2.0. It is used to indicate whether the table should use
// watermark_cache to avoid state cleaning as a performance optimization.
// In older versions we can just initialize without it.
// This field is introduced in v1.2.0. It is used to indicate whether the
// table should use watermark_cache to avoid state cleaning as a performance
// optimization. In older versions we can just initialize without it.
bool cleaned_by_watermark = 30;

// Used to filter created / creating tables in meta.
Expand All @@ -358,14 +368,18 @@ message Table {
optional string initialized_at_cluster_version = 35;
optional string created_at_cluster_version = 36;

// TTL of the record in the table, to ensure the consistency with other tables in the streaming plan, it only applies to append-only tables.
// TTL of the record in the table, to ensure the consistency with other tables
// in the streaming plan, it only applies to append-only tables.
optional uint32 retention_seconds = 37;

// This field specifies the index of the column set in the "with version column" within all the columns. It is used for filtering during "on conflict" operations.
// This field specifies the index of the column set in the "with version
// column" within all the columns. It is used for filtering during "on
// conflict" operations.
optional uint32 version_column_index = 38;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
// notification service.
TableVersion version = 100;
}

Expand Down Expand Up @@ -410,3 +424,12 @@ message Comment {
optional uint32 column_index = 4;
optional string description = 5;
}

message Secret {
uint32 id = 1;
string name = 2;
uint32 database_id = 3;
bytes value = 4;
uint32 owner = 5;

}
35 changes: 35 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,41 @@ impl From<ConnectionId> for u32 {
}
}

#[derive(Clone, Copy, Debug, Display, Default, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub struct SecretId(pub u32);

impl SecretId {
pub const fn new(id: u32) -> Self {
SecretId(id)
}

pub const fn placeholder() -> Self {
SecretId(OBJECT_ID_PLACEHOLDER)
}

pub fn secret_id(&self) -> u32 {
self.0
}
}

impl From<u32> for SecretId {
fn from(id: u32) -> Self {
Self::new(id)
}
}

impl From<&u32> for SecretId {
fn from(id: &u32) -> Self {
Self::new(*id)
}
}

impl From<SecretId> for u32 {
fn from(id: SecretId) -> Self {
id.0
}
}

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConflictBehavior {
#[default]
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};

use itertools::Itertools;
use risingwave_common::catalog::{
Expand Down Expand Up @@ -83,6 +83,7 @@ impl SinkDesc {
owner: UserId,
connection_id: Option<ConnectionId>,
dependent_relations: Vec<TableId>,
secret_ref: HashMap<String, u32>,
) -> SinkCatalog {
SinkCatalog {
id: self.id,
Expand All @@ -108,6 +109,7 @@ impl SinkDesc {
created_at_cluster_version: None,
initialized_at_cluster_version: None,
create_type: self.create_type,
secret_ref,
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ pub struct SinkCatalog {
pub created_at_cluster_version: Option<String>,
pub initialized_at_cluster_version: Option<String>,
pub create_type: CreateType,

/// The secret reference for the sink, mapping from property name to secret id.
pub secret_ref: HashMap<String, u32>,
}

impl SinkCatalog {
Expand Down Expand Up @@ -351,6 +354,7 @@ impl SinkCatalog {
created_at_cluster_version: self.created_at_cluster_version.clone(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
create_type: self.create_type.to_proto() as i32,
secret_ref: self.secret_ref.clone(),
}
}

Expand Down Expand Up @@ -444,6 +448,7 @@ impl From<PbSink> for SinkCatalog {
initialized_at_cluster_version: pb.initialized_at_cluster_version,
created_at_cluster_version: pb.created_at_cluster_version,
create_type: CreateType::from_proto(create_type),
secret_ref: pb.secret_ref,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub(crate) type SchemaId = u32;
pub(crate) type TableId = risingwave_common::catalog::TableId;
pub(crate) type ColumnId = risingwave_common::catalog::ColumnId;
pub(crate) type FragmentId = u32;
pub(crate) type SecretId = risingwave_common::catalog::SecretId;

/// Check if the column name does not conflict with the internally reserved column name.
pub fn check_valid_column_name(column_name: &str) -> Result<()> {
Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::system_catalog::SystemTableCatalog;
use crate::catalog::table_catalog::TableCatalog;
use crate::catalog::view_catalog::ViewCatalog;
use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SinkId, SourceId, ViewId};
use crate::catalog::{ConnectionId, DatabaseId, SchemaId, SecretId, SinkId, SourceId, ViewId};
use crate::expr::{infer_type_name, infer_type_with_sigmap, Expr, ExprImpl};
use crate::user::UserId;

Expand Down Expand Up @@ -483,6 +483,10 @@ impl SchemaCatalog {
.expect("connection not found by name");
}

pub fn create_secret(&mut self, secret_id: SecretId) {
todo!()
}

pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
self.table_by_name.values()
}
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationC
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::stream_fragmenter::build_graph;
use crate::utils::resolve_privatelink_in_with_option;
use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_in_with_options};
use crate::{Explain, Planner, TableCatalog, WithOptions};

pub fn gen_sink_subscription_query_from_name(from_name: ObjectName) -> Result<Query> {
Expand Down Expand Up @@ -166,6 +166,7 @@ pub fn gen_sink_plan(
resolve_privatelink_in_with_option(&mut with_options, &sink_schema_name, session)?;
conn_id.map(ConnectionId)
};
let secret_ref = resolve_secret_in_with_options(&mut with_options)?;

let emit_on_window_close = stmt.emit_mode == Some(EmitMode::OnWindowClose);
if emit_on_window_close {
Expand Down Expand Up @@ -256,6 +257,7 @@ pub fn gen_sink_plan(
UserId::new(session.user_id()),
connection_id,
dependent_relations.into_iter().collect_vec(),
secret_ref,
);

if let Some(table_catalog) = &target_table_catalog {
Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ impl WithOptions {
}
}

pub(crate) fn resolve_secret_in_with_options(
with_options: &mut WithOptions,
) -> RwResult<HashMap<String, u32>> {
todo!()
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
}

pub(crate) fn resolve_privatelink_in_with_option(
with_options: &mut WithOptions,
schema_name: &Option<String>,
Expand Down
1 change: 1 addition & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub type IndexId = ObjectId;
pub type ViewId = ObjectId;
pub type FunctionId = ObjectId;
pub type ConnectionId = ObjectId;
pub type SecretId = ObjectId;
pub type UserId = i32;
pub type PrivilegeId = i32;

Expand Down
60 changes: 60 additions & 0 deletions src/meta/model_v2/src/secret.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "secret")]
pub struct Model {
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
pub secret_id: u32,
pub name: String,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::object::Entity",
from = "Column::SecretId",
to = "super::object::Column::Oid",
on_update = "NoAction",
on_delete = "Cascade"
)]
Object,
// #[sea_orm(
// belongs_to = "super"
// )]
// Sink,
// #[sea_orm(has_many = "super::source::Entity")]
// Source,
}

impl Related<super::object::Entity> for Entity {
fn to() -> RelationDef {
Relation::Object.def()
}
}

// impl Related<super::sink::Entity> for Entity {
// fn to() -> RelationDef {
// Relation::Sink.def()
// }
// }

// impl Related<super::source::Entity> for Entity {
// fn to() -> RelationDef {
// Relation::Source.def()
// }
// }

Loading
Loading