Skip to content

Commit

Permalink
create secret with backend
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion committed May 2, 2024
1 parent 8e4637f commit 157f156
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 26 deletions.
20 changes: 20 additions & 0 deletions proto/secret.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
syntax = "proto3";

package secret;

message SecretMetaBackend {
bytes value = 1;
}

message SecretHashicropValutBackend {
string host = 1;
string vault_token = 2;
}

message Secret {
// the message is stored in meta as encrypted bytes and is interpreted as bytes by catalog
oneof secret_backend {
SecretMetaBackend meta = 1;
SecretHashicropValutBackend hashicorp_vault = 2;
}
}
39 changes: 34 additions & 5 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ pub struct MetaConfig {
/// Whether compactor should rewrite row to remove dropped column.
#[serde(default = "default::meta::enable_dropped_column_reclaim")]
pub enable_dropped_column_reclaim: bool,

#[serde(default = "default::meta::secret_store_private_key")]
pub secret_store_private_key: Vec<u8>,
}

#[derive(Copy, Clone, Debug, Default)]
Expand Down Expand Up @@ -1210,9 +1213,14 @@ pub mod default {
pub fn parallelism_control_trigger_first_delay_sec() -> u64 {
30
}

pub fn enable_dropped_column_reclaim() -> bool {
false
}

pub fn secret_store_private_key() -> Vec<u8> {
"demo-secret-private-key".as_bytes().to_vec()
}
}

pub mod server {
Expand Down Expand Up @@ -1368,6 +1376,7 @@ pub mod default {
pub fn max_preload_io_retry_times() -> usize {
3
}

pub fn mem_table_spill_threshold() -> usize {
4 << 20
}
Expand Down Expand Up @@ -1408,7 +1417,6 @@ pub mod default {
}

pub mod file_cache {

pub fn dir() -> String {
"".to_string()
}
Expand Down Expand Up @@ -1582,12 +1590,15 @@ pub mod default {
pub fn memory_controller_threshold_aggressive() -> f64 {
0.9
}

pub fn memory_controller_threshold_graceful() -> f64 {
0.8
}

pub fn memory_controller_threshold_stable() -> f64 {
0.7
}

pub fn stream_enable_arrangement_backfill() -> bool {
true
}
Expand Down Expand Up @@ -1615,16 +1626,20 @@ pub mod default {
}

pub mod compaction_config {
const DEFAULT_MAX_COMPACTION_BYTES: u64 = 2 * 1024 * 1024 * 1024; // 2GB
const DEFAULT_MIN_COMPACTION_BYTES: u64 = 128 * 1024 * 1024; // 128MB
const DEFAULT_MAX_COMPACTION_BYTES: u64 = 2 * 1024 * 1024 * 1024;
// 2GB
const DEFAULT_MIN_COMPACTION_BYTES: u64 = 128 * 1024 * 1024;
// 128MB
const DEFAULT_MAX_BYTES_FOR_LEVEL_BASE: u64 = 512 * 1024 * 1024; // 512MB

// decrease this configure when the generation of checkpoint barrier is not frequent.
const DEFAULT_TIER_COMPACT_TRIGGER_NUMBER: u64 = 12;
const DEFAULT_TARGET_FILE_SIZE_BASE: u64 = 32 * 1024 * 1024; // 32MB
const DEFAULT_TARGET_FILE_SIZE_BASE: u64 = 32 * 1024 * 1024;
// 32MB
const DEFAULT_MAX_SUB_COMPACTION: u32 = 4;
const DEFAULT_LEVEL_MULTIPLIER: u64 = 5;
const DEFAULT_MAX_SPACE_RECLAIM_BYTES: u64 = 512 * 1024 * 1024; // 512MB;
const DEFAULT_MAX_SPACE_RECLAIM_BYTES: u64 = 512 * 1024 * 1024;
// 512MB;
const DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER: u64 = 300;
const DEFAULT_MAX_COMPACTION_FILE_COUNT: u64 = 100;
const DEFAULT_MIN_SUB_LEVEL_COMPACT_LEVEL_COUNT: u32 = 3;
Expand All @@ -1637,42 +1652,55 @@ pub mod default {
pub fn max_bytes_for_level_base() -> u64 {
DEFAULT_MAX_BYTES_FOR_LEVEL_BASE
}

pub fn max_bytes_for_level_multiplier() -> u64 {
DEFAULT_LEVEL_MULTIPLIER
}

pub fn max_compaction_bytes() -> u64 {
DEFAULT_MAX_COMPACTION_BYTES
}

pub fn sub_level_max_compaction_bytes() -> u64 {
DEFAULT_MIN_COMPACTION_BYTES
}

pub fn level0_tier_compact_file_number() -> u64 {
DEFAULT_TIER_COMPACT_TRIGGER_NUMBER
}

pub fn target_file_size_base() -> u64 {
DEFAULT_TARGET_FILE_SIZE_BASE
}

pub fn compaction_filter_mask() -> u32 {
(CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL).into()
}

pub fn max_sub_compaction() -> u32 {
DEFAULT_MAX_SUB_COMPACTION
}

pub fn level0_stop_write_threshold_sub_level_number() -> u64 {
DEFAULT_LEVEL0_STOP_WRITE_THRESHOLD_SUB_LEVEL_NUMBER
}

pub fn level0_sub_level_compact_level_count() -> u32 {
DEFAULT_MIN_SUB_LEVEL_COMPACT_LEVEL_COUNT
}

pub fn level0_overlapping_sub_level_compact_level_count() -> u32 {
DEFAULT_MIN_OVERLAPPING_SUB_LEVEL_COMPACT_LEVEL_COUNT
}

pub fn max_space_reclaim_bytes() -> u64 {
DEFAULT_MAX_SPACE_RECLAIM_BYTES
}

pub fn level0_max_compact_file_number() -> u64 {
DEFAULT_MAX_COMPACTION_FILE_COUNT
}

pub fn tombstone_reclaim_ratio() -> u32 {
DEFAULT_TOMBSTONE_RATIO_PERCENT
}
Expand Down Expand Up @@ -1746,6 +1774,7 @@ pub mod default {

pub mod developer {
use crate::util::env_var::env_var_is_true_or;

const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3";

pub fn object_store_retry_unknown_service_error() -> bool {
Expand Down
66 changes: 57 additions & 9 deletions src/frontend/src/handler/create_secret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_sqlparser::ast::{CreateSecretStatement, Value};
use prost::Message;
use risingwave_sqlparser::ast::{CreateSecretStatement, SqlOption, Value};

use crate::error::{ErrorCode, Result};
use crate::handler::{HandlerArgs, RwPgResponse};
use crate::Binder;
use crate::{Binder, WithOptions};

const SECRET_BACKEND_KEY: &str = "backend";

const SECRET_BACKEND_META: &str = "meta";
const SECRET_BACKEND_HASHICORP_VAULT: &str = "hashicorp_vault";

pub async fn handle_create_secret(
handler_args: HandlerArgs,
Expand All @@ -37,17 +43,59 @@ pub async fn handle_create_secret(
Err(e)
};
}
let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
let secret_payload = match &stmt.credential {
Value::SingleQuotedString(ref s) => s.as_bytes().to_vec(),
_ => {
return Err(ErrorCode::InvalidParameterValue(
"secret payload must be a string".to_string(),
)

// check if the secret backend is supported
let with_props = WithOptions::try_from(stmt.with_properties.0.as_ref() as &[SqlOption])?;
let secret_payload: Vec<u8> = {
if let Some(backend) = with_props.inner().get(SECRET_BACKEND_KEY) {
match backend.to_lowercase().as_ref() {
SECRET_BACKEND_META => {
let backend = risingwave_pb::secret::Secret {
secret_backend: Some(risingwave_pb::secret::secret::SecretBackend::Meta(
risingwave_pb::secret::SecretMetaBackend { value: vec![] },
)),
};
backend.encode_to_vec()
}
SECRET_BACKEND_HASHICORP_VAULT => {
if stmt.credential != Value::Null {
return Err(ErrorCode::InvalidParameterValue(
"credential must be null for hashicorp_vault backend".to_string(),
)
.into());
}
todo!()
}
_ => {
return Err(ErrorCode::InvalidParameterValue(format!(
"secret backend \"{}\" is not supported. Supported backends are: {}",
backend,
[SECRET_BACKEND_META, SECRET_BACKEND_HASHICORP_VAULT].join(",")
))
.into());
}
}
} else {
return Err(ErrorCode::InvalidParameterValue(format!(
"secret backend is not specified in with clause. Supported backends are: {}",
[SECRET_BACKEND_META, SECRET_BACKEND_HASHICORP_VAULT].join(",")
))
.into());
}
};

let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
// let secret_payload = match &stmt.credential {
// Value::SingleQuotedString(ref s) => s.as_bytes().to_vec(),
// Value::Null => Vec::new(),
// _ => {
// return Err(ErrorCode::InvalidParameterValue(
// "secret payload must be a string".to_string(),
// )
// .into());
// }
// };

let catalog_writer = session.catalog_writer()?;
catalog_writer
.create_secret(
Expand Down
1 change: 1 addition & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
.developer
.max_trivial_move_task_count_per_loop,
max_get_task_probe_times: config.meta.developer.max_get_task_probe_times,
secret_store_private_key: config.meta.secret_store_private_key,
},
config.system.into_init_system_params(),
Default::default(),
Expand Down
7 changes: 3 additions & 4 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ use crate::controller::utils::{
check_connection_name_duplicate, check_database_name_duplicate,
check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate,
check_secret_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty,
ensure_user_id, get_fragment_mappings, get_fragment_mappings_by_jobs,
get_referring_objects, get_referring_objects_cascade,
get_user_privilege, list_user_info_by_ids, resolve_source_register_info_for_jobs,
PartialObject,
ensure_user_id, get_fragment_mappings, get_fragment_mappings_by_jobs, get_referring_objects,
get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids,
resolve_source_register_info_for_jobs, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION};
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::*;
use risingwave_meta_model_v2::{
actor, actor_dispatcher, connection, database, fragment, function, index, object,
object_dependency, schema, secret, sink, source, table, user, user_privilege, view,
ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping,
I32Array, ObjectId, PrivilegeId, SchemaId, SourceId, StreamNode, UserId,
object_dependency, schema, sink, source, table, user, user_privilege, view, ActorId,
DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, PrivilegeId,
SchemaId, SourceId, StreamNode, UserId,
};
use risingwave_pb::catalog::{PbConnection, PbFunction, PbSecret};
use risingwave_pb::meta::PbFragmentParallelUnitMapping;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,8 @@ impl DdlController {
// here, we need to encrypt it before storing it in the catalog.

let encrypted_payload = simplestcrypt::encrypt_and_serialize(
self.env.opts.secret_store_private_key.as_bytes(),
secret.get_value().as_bytes(),
self.env.opts.secret_store_private_key.as_slice(),
secret.get_value().as_slice(),
)
.map_err(|e| {
MetaError::from(MetaErrorInner::InvalidParameter(format!(
Expand Down
1 change: 1 addition & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"task_service",
"telemetry",
"user",
"secret",
];
let protos: Vec<String> = proto_files
.iter()
Expand Down
10 changes: 9 additions & 1 deletion src/prost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub mod batch_plan;
#[cfg_attr(madsim, path = "sim/task_service.rs")]
pub mod task_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path="sim/connector_service.rs")]
#[cfg_attr(madsim, path = "sim/connector_service.rs")]
pub mod connector_service;
#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/stream_plan.rs")]
Expand Down Expand Up @@ -91,6 +91,10 @@ pub mod health;
#[rustfmt::skip]
#[path = "sim/telemetry.rs"]
pub mod telemetry;

#[rustfmt::skip]
#[path = "sim/secret.rs"]
pub mod secret;
#[rustfmt::skip]
#[path = "connector_service.serde.rs"]
pub mod connector_service_serde;
Expand Down Expand Up @@ -158,6 +162,10 @@ pub mod java_binding_serde;
#[path = "telemetry.serde.rs"]
pub mod telemetry_serde;

#[rustfmt::skip]
#[path = "secret.serde.rs"]
pub mod secret_serde;

#[derive(Clone, PartialEq, Eq, Debug, Error)]
#[error("field `{0}` not found")]
pub struct PbFieldNotFound(pub &'static str);
Expand Down
12 changes: 10 additions & 2 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ impl fmt::Display for DeclareCursor {
v.iter().join(" ").fmt(f)
}
}

// sql_grammar!(DeclareCursorStatement {
// cursor_name: Ident,
// [Keyword::SUBSCRIPTION]
Expand Down Expand Up @@ -706,6 +707,7 @@ impl ParseTo for DeclareCursorStatement {
})
}
}

impl fmt::Display for DeclareCursorStatement {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut v: Vec<String> = vec![];
Expand Down Expand Up @@ -777,6 +779,7 @@ impl ParseTo for CloseCursorStatement {
Ok(Self { cursor_name })
}
}

impl fmt::Display for CloseCursorStatement {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut v: Vec<String> = vec![];
Expand Down Expand Up @@ -837,19 +840,24 @@ pub struct CreateSecretStatement {
pub if_not_exists: bool,
pub secret_name: ObjectName,
pub credential: Value,
pub with_properties: WithProperties,
}

impl ParseTo for CreateSecretStatement {
fn parse_to(parser: &mut Parser) -> Result<Self, ParserError> {
impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser);
impl_parse_to!(secret_name: ObjectName, parser);
parser.expect_keyword(Keyword::AS)?;
impl_parse_to!(with_properties: WithProperties, parser);
let mut credential = Value::Null;
if parser.parse_keyword(Keyword::AS) {
credential = parser.parse_value()?;
}
// impl_parse_to!(credential: Value, parser);
let credential = parser.parse_value()?;
Ok(Self {
if_not_exists,
secret_name,
credential,
with_properties,
})
}
}
Expand Down

0 comments on commit 157f156

Please sign in to comment.