Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick refactor: secret ref columns in protobuf #17335

Merged
merged 4 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import "common.proto";
import "data.proto";
import "expr.proto";
import "plan_common.proto";
import "secret.proto";

option java_package = "com.risingwave.proto";
option optimize_for = SPEED;
Expand Down Expand Up @@ -83,8 +84,8 @@ message StreamSourceInfo {
// Options specified by user in the FORMAT ENCODE clause.
map<string, string> format_encode_options = 14;

// Handle the source relies on any sceret. The key is the propertity name and the value is the secret id.
map<string, uint32> secret_ref = 16;
// Handle the source relies on any sceret. The key is the propertity name and the value is the secret id and type.
map<string, secret.SecretRef> secret_ref = 16;
}

message Source {
Expand Down Expand Up @@ -180,8 +181,8 @@ message Sink {
// Whether it should use background ddl or block until backfill finishes.
CreateType create_type = 24;

// Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id.
map<string, uint32> secret_ref = 25;
// Handle the sink relies on any sceret. The key is the propertity name and the value is the secret id and type.
map<string, secret.SecretRef> secret_ref = 25;
}

message Subscription {
Expand Down
11 changes: 11 additions & 0 deletions proto/secret.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,14 @@ message Secret {
SecretHashicropValutBackend hashicorp_vault = 2;
}
}

message SecretRef {
enum RefAsType {
UNSPECIFIED = 0;
TEXT = 1;
// AS FILE
FILE = 2;
}
uint32 secret_id = 1;
RefAsType ref_as = 2;
}
3 changes: 2 additions & 1 deletion src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, CreateType, DatabaseId, SchemaId, TableId, UserId,
};
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::PbSinkDesc;

use super::{SinkCatalog, SinkFormatDesc, SinkId, SinkType};
Expand Down Expand Up @@ -83,7 +84,7 @@ impl SinkDesc {
owner: UserId,
connection_id: Option<ConnectionId>,
dependent_relations: Vec<TableId>,
secret_ref: BTreeMap<String, u32>,
secret_ref: BTreeMap<String, PbSecretRef>,
) -> SinkCatalog {
SinkCatalog {
id: self.id,
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{
PbCreateType, PbSink, PbSinkFormatDesc, PbSinkType, PbStreamJobStatus,
};
use risingwave_pb::secret::PbSecretRef;

use super::{
SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION,
Expand Down Expand Up @@ -339,7 +340,7 @@ pub struct SinkCatalog {
pub create_type: CreateType,

/// The secret reference for the sink, mapping from property name to secret id.
pub secret_ref: BTreeMap<String, u32>,
pub secret_ref: BTreeMap<String, PbSecretRef>,
}

impl SinkCatalog {
Expand Down
4 changes: 2 additions & 2 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,12 +555,12 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
*id = *connection_rewrite.get(id).unwrap();
}
for secret_id in s.secret_ref.values_mut() {
*secret_id = *secret_rewrite.get(secret_id).unwrap();
secret_id.secret_id = *secret_rewrite.get(&secret_id.secret_id).unwrap();
}
object_dependencies.extend(s.secret_ref.values().map(|id| {
object_dependency::ActiveModel {
id: NotSet,
oid: Set(*id as _),
oid: Set(id.secret_id as _),
used_by: Set(s.id as _),
}
}));
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_connector::source::kafka::private_link::{
insert_privatelink_broker_rewrite_map, CONNECTION_NAME_KEY, PRIVATELINK_ENDPOINT_KEY,
};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::secret::PbSecretRef;
use risingwave_sqlparser::ast::{
CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement,
CreateSubscriptionStatement, SqlOption, Statement, Value,
Expand Down Expand Up @@ -119,7 +120,7 @@ impl WithOptions {
pub(crate) fn resolve_secret_in_with_options(
_with_options: &mut WithOptions,
_session: &SessionImpl,
) -> RwResult<BTreeMap<String, u32>> {
) -> RwResult<BTreeMap<String, PbSecretRef>> {
// todo: implement the function and take `resolve_privatelink_in_with_option` as reference

Ok(BTreeMap::new())
Expand Down
28 changes: 26 additions & 2 deletions src/meta/model_v2/migration/src/m20240525_090457_secret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,22 @@ impl MigrationTrait for Migration {
)
.await?;

// Add a new column to the table
// Add a new column to the `sink` table
manager
.alter_table(
MigrationTable::alter()
.table(Sink::Table)
.add_column(ColumnDef::new(Sink::SecretRef).json_binary())
.add_column(ColumnDef::new(Sink::SecretRef).binary())
.to_owned(),
)
.await?;

// Add a new column to the `source` table
manager
.alter_table(
MigrationTable::alter()
.table(Source::Table)
.add_column(ColumnDef::new(Source::SecretRef).binary())
.to_owned(),
)
.await?;
Expand All @@ -60,6 +70,14 @@ impl MigrationTrait for Migration {
.to_owned(),
)
.await?;
manager
.alter_table(
MigrationTable::alter()
.table(Source::Table)
.drop_column(Source::SecretRef)
.to_owned(),
)
.await?;
Ok(())
}
}
Expand All @@ -77,3 +95,9 @@ enum Sink {
Table,
SecretRef,
}

#[derive(DeriveIden)]
enum Source {
Table,
SecretRef,
}
52 changes: 51 additions & 1 deletion src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::BTreeMap;

use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::PbStreamNode;
use sea_orm::entity::prelude::*;
use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
Expand Down Expand Up @@ -258,6 +259,55 @@ macro_rules! derive_array_from_blob {
};
}

macro_rules! derive_btreemap_from_blob {
($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
#[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
pub struct $struct_name(#[sea_orm] Vec<u8>);

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct $field_type {
#[prost(btree_map = "string, message")]
inner: BTreeMap<$key_type, $value_type>,
}
impl Eq for $field_type {}

impl $struct_name {
pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
data.inner
}

fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
}
}

impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
Self::from_protobuf(value)
}
}

impl std::fmt::Debug for $struct_name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.to_protobuf().fmt(f)
}
}

impl Default for $struct_name {
fn default() -> Self {
Self(vec![])
}
}

impl sea_orm::sea_query::Nullable for $struct_name {
fn null() -> Value {
Value::Bytes(None)
}
}
};
}

pub(crate) use {derive_array_from_blob, derive_from_blob};

derive_from_json_struct!(I32Array, Vec<i32>);
Expand Down Expand Up @@ -286,7 +336,7 @@ impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
}
}

derive_from_json_struct!(SecretRef, BTreeMap<String, u32>);
derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);

derive_from_blob!(StreamNode, PbStreamNode);
derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub struct Model {
pub sink_from_name: String,
pub sink_format_desc: Option<SinkFormatDesc>,
pub target_table: Option<TableId>,
// `secret_ref` stores a json string, mapping from property name to secret id.
// `secret_ref` stores the mapping info mapping from property name to secret id and type.
pub secret_ref: Option<SecretRef>,
}

Expand Down
7 changes: 5 additions & 2 deletions src/meta/model_v2/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::{
ColumnCatalogArray, ConnectionId, I32Array, Property, SourceId, StreamSourceInfo, TableId,
WatermarkDescArray,
ColumnCatalogArray, ConnectionId, I32Array, Property, SecretRef, SourceId, StreamSourceInfo,
TableId, WatermarkDescArray,
};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
Expand All @@ -39,6 +39,8 @@ pub struct Model {
pub optional_associated_table_id: Option<TableId>,
pub connection_id: Option<ConnectionId>,
pub version: i64,
// `secret_ref` stores the mapping info mapping from property name to secret id and type.
pub secret_ref: Option<SecretRef>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -101,6 +103,7 @@ impl From<PbSource> for ActiveModel {
optional_associated_table_id: Set(optional_associated_table_id),
connection_id: Set(source.connection_id.map(|id| id as _)),
version: Set(source.version as _),
secret_ref: Set(None),
}
}
}
4 changes: 2 additions & 2 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ impl From<ObjectModel<source::Model>> for PbSource {

impl From<ObjectModel<sink::Model>> for PbSink {
fn from(value: ObjectModel<sink::Model>) -> Self {
let mut secret_ref_map: BTreeMap<String, u32> = BTreeMap::new();
let mut secret_ref_map = BTreeMap::new();
if let Some(secret_ref) = value.0.secret_ref {
secret_ref_map = secret_ref.into_inner();
secret_ref_map = secret_ref.to_protobuf();
}
Self {
id: value.0.sink_id as _,
Expand Down
3 changes: 3 additions & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
".plan_common.ExternalTableDesc",
".hummock.CompactTask",
".catalog.StreamSourceInfo",
".catalog.SecretRef",
".catalog.Source",
".catalog.Sink",
".catalog.View",
Expand Down Expand Up @@ -111,6 +112,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr
.type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]")
.type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]")
.type_attribute("secret.SecretRef", "#[derive(Eq, Hash)]")
.type_attribute("catalog.IndexColumnProperties", "#[derive(Eq, Hash)]")
.type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]")
.type_attribute("data.DataType", "#[derive(Eq, Hash)]")
.type_attribute("expr.ExprNode.rex_node", "#[derive(Eq, Hash)]")
Expand Down
Loading