Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(secret): alter secret in catalog #19495

Merged
merged 13 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 30
timeout_in_minutes: 32
retry: *auto-retry

# FIXME(kwannoel): Let the github PR labeller label it, if sqlsmith source files has changes.
Expand Down
20 changes: 20 additions & 0 deletions e2e_test/ddl/secret.slt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ create secret secret_1 with (
backend = 'meta'
) as 'demo_secret';

statement ok
alter secret secret_1 with (
backend = 'meta'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The backend option should not appear in the alter secret statement. To follow the convention of other alter commands, only the changed part should be provided. Here the backend option is obviously untouched, so users should not write it here.

But, furthermore, I think backend shouldn't be a per-secret option. In my mind it should be a global-wise config and can't be changed after cluster initialized.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, furthermore, I think backend shouldn't be a per-secret option. In my mind it should be a global-wise config and can't be changed after cluster initialized.

It's a bit off-topic. We can create a new issue

Copy link
Contributor Author

@yuhao-su yuhao-su Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the concern here. I was thinking the same. But there are 2 problems

  1. We are doing something unusual here. The backend info in with option is also encrypted. We need to decrypt the original secret to write the new one. I can do this.
  2. For hashvault, there is no value we can alter, we can only alter the with. For meta backend, user can alter the secret to using hashvault or just the value. So I try to ask user to write both WITH and AS so they know what they are doing.

So anyway, I think get rid of the WITH here is also a good choice. Will change this.

) as 'demo_secret_altered';

statement ok
alter secret secret_1 as 'demo_secret_altered_again';

statement error
alter secret secret_2 with (
backend = 'meta'
) as 'demo_secret_altered';
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Catalog error
2: secret not found: secret_2


# wait for support for hashicorp_vault backend
# statement ok
# create secret secret_2 with (
Expand Down
5 changes: 5 additions & 0 deletions e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
statement ok
CREATE SECRET iceberg_s3_access_key WITH (
backend = 'meta'
) as 'hummockadmin_wrong';

statement ok
ALTER SECRET iceberg_s3_access_key WITH (
backend = 'meta'
) as 'hummockadmin';

statement ok
Expand Down
57 changes: 55 additions & 2 deletions e2e_test/source_legacy/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,31 @@ mysql --protocol=tcp -u root mytest < e2e_test/source_legacy/cdc/mysql_init_data
statement ok
create secret mysql_pwd with (
backend = 'meta'
) as '${MYSQL_PWD:}';
) as 'incorrect_password';

# create a cdc source job, with incorrct password
statement error
create source mysql_mytest with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'rwcdc',
password = secret mysql_pwd,
database.name = 'mytest',
server.id = '5601'
);
# The detailed error message is commented out because the user IP in error message may vary in different environments.
# ----
# db error: ERROR: Failed to run the query
# Caused by these errors (recent errors listed first):
# 1: gRPC request to meta service failed: Internal error
# 2: failed to create source worker
# 3: failed to create SplitEnumerator
# 4: source cannot pass validation
# 5: Internal error: Access denied for user 'rwcdc'@'172.17.0.1' (using password: YES)

statement ok
alter secret mysql_pwd as '${MYSQL_PWD:}';

# create a cdc source job, which format fixed to `FORMAT PLAIN ENCODE JSON`
statement ok
Expand All @@ -36,7 +60,6 @@ create source mysql_mytest with (
server.id = '5601'
);


statement error Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source
create materialized view mv as select * from mysql_mytest;

Expand All @@ -62,6 +85,22 @@ from mysql_mytest table 'mytest.products';
# sleep to ensure (default,'Milk','Milk is a white liquid food') is consumed from Debezium message instead of backfill.
sleep 10s

### BEGIN test change secret in MySQL and ALTER SECRET in Risingwave
onlyif can-use-recover
statement ok
alter secret mysql_pwd with (
backend = 'meta'
) as 'new_password';

onlyif can-use-recover
system ok
mysql --protocol=tcp -u root -e "ALTER USER 'rwcdc'@'%' IDENTIFIED BY 'new_password';"

onlyif can-use-recover
statement ok
recover;
### END

statement error Permission denied
drop secret mysql_pwd;

Expand Down Expand Up @@ -621,3 +660,17 @@ query II
select * from test_pg_default_value;
----
1 noris Shanghai

### BEGIN reset the password to the original one
onlyif can-use-recover
statement ok
alter secret mysql_pwd as '${MYSQL_PWD:}';

onlyif can-use-recover
system ok
mysql --protocol=tcp -u root -e "ALTER USER 'rwcdc'@'%' IDENTIFIED BY '${MYSQL_PWD:}';"

onlyif can-use-recover
statement ok
recover;
### END
14 changes: 14 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,19 @@ message DropSecretResponse {
WaitVersion version = 1;
}

message AlterSecretRequest {
uint32 secret_id = 1;
string name = 2;
bytes value = 3;
uint32 database_id = 4;
uint32 schema_id = 5;
uint32 owner_id = 6;
}

message AlterSecretResponse {
WaitVersion version = 1;
}

message CreateConnectionRequest {
message PrivateLink {
catalog.Connection.PrivateLinkService.PrivateLinkProvider provider = 1;
Expand Down Expand Up @@ -515,6 +528,7 @@ service DdlService {
rpc CreateTable(CreateTableRequest) returns (CreateTableResponse);
rpc CreateSecret(CreateSecretRequest) returns (CreateSecretResponse);
rpc DropSecret(DropSecretRequest) returns (DropSecretResponse);
rpc AlterSecret(AlterSecretRequest) returns (AlterSecretResponse);
rpc AlterName(AlterNameRequest) returns (AlterNameResponse);
rpc AlterSource(AlterSourceRequest) returns (AlterSourceResponse);
rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse);
Expand Down
31 changes: 27 additions & 4 deletions src/common/secret/src/secret_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,23 @@ impl LocalSecretManager {

pub fn add_secret(&self, secret_id: SecretId, secret: Vec<u8>) {
let mut secret_guard = self.secrets.write();
secret_guard.insert(secret_id, secret);
if secret_guard.insert(secret_id, secret).is_some() {
tracing::error!(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I tend to make it a real error i.e. return Err(...) and reject the add_secret

The status quo is somehow weird to me - an error log is printed, but the action actually succeeded.

Copy link
Contributor Author

@yuhao-su yuhao-su Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually intentional or something we have to do.

  1. add_secret is called using the LocalSecretManager on each worker node asynchronizly by notification service, so we can't return an error just like any other notification serivces.
  2. alter secret result was already persisted to meta before we call add_secret. So returning an error here can confuse users unless we decide to roll back the meta commit. Besides, this secret_id cames from meta catalog, so we should trust the data from meta instead of LocalSecretManager

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is still a little weird to me. I understand the LocalSecretManager cannot reject adding a new secret but we can always do such check on the meta node.

secret_id = secret_id,
"adding a secret but it already exists, overwriting it"
);
};
}

pub fn update_secret(&self, secret_id: SecretId, secret: Vec<u8>) {
let mut secret_guard = self.secrets.write();
if secret_guard.insert(secret_id, secret).is_none() {
tracing::error!(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it trigger the existing actors to upgrade to the new secret?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it trigger the existing actors to upgrade to the new secret?

No. Already doced this limitation in PR.

secret_id = secret_id,
"updating a secret but it does not exist, adding it"
);
}
self.remove_secret_file_if_exist(&secret_id);
}

pub fn init_secrets(&self, secrets: Vec<PbSecret>) {
Expand Down Expand Up @@ -174,14 +190,21 @@ impl LocalSecretManager {
}

fn get_secret_value(pb_secret_bytes: &[u8]) -> SecretResult<Vec<u8>> {
let pb_secret = risingwave_pb::secret::Secret::decode(pb_secret_bytes)
.context("failed to decode secret")?;
let secret_value = match pb_secret.get_secret_backend().unwrap() {
let secret_value = match Self::get_pb_secret_backend(pb_secret_bytes)? {
risingwave_pb::secret::secret::SecretBackend::Meta(backend) => backend.value.clone(),
risingwave_pb::secret::secret::SecretBackend::HashicorpVault(_) => {
return Err(anyhow!("hashicorp_vault backend is not implemented yet").into())
}
};
Ok(secret_value)
}

/// Get the secret backend from the given decrypted secret bytes.
pub fn get_pb_secret_backend(
pb_secret_bytes: &[u8],
) -> SecretResult<risingwave_pb::secret::secret::SecretBackend> {
let pb_secret = risingwave_pb::secret::Secret::decode(pb_secret_bytes)
.context("failed to decode secret")?;
Ok(pb_secret.get_secret_backend().unwrap().clone())
}
}
3 changes: 3 additions & 0 deletions src/compute/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ impl ObserverState for ComputeObserverNode {
Operation::Delete => {
LocalSecretManager::global().remove_secret(s.id);
}
Operation::Update => {
LocalSecretManager::global().update_secret(s.id, s.value);
}
_ => {
panic!("error type notification");
}
Expand Down
33 changes: 33 additions & 0 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,16 @@ pub trait CatalogWriter: Send + Sync {

async fn drop_secret(&self, secret_id: SecretId) -> Result<()>;

async fn alter_secret(
&self,
secret_id: u32,
secret_name: String,
database_id: u32,
schema_id: u32,
owner_id: u32,
payload: Vec<u8>,
) -> Result<()>;

async fn alter_name(
&self,
object_id: alter_name_request::Object,
Expand Down Expand Up @@ -511,6 +521,29 @@ impl CatalogWriter for CatalogWriterImpl {
let version = self.meta_client.alter_swap_rename(object).await?;
self.wait_version(version).await
}

async fn alter_secret(
&self,
secret_id: u32,
secret_name: String,
database_id: u32,
schema_id: u32,
owner_id: u32,
payload: Vec<u8>,
) -> Result<()> {
let version = self
.meta_client
.alter_secret(
secret_id,
secret_name,
database_id,
schema_id,
owner_id,
payload,
)
.await?;
self.wait_version(version).await
}
}

impl CatalogWriterImpl {
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/catalog/secret_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@

use risingwave_pb::catalog::PbSecret;

use crate::catalog::{DatabaseId, OwnedByUserCatalog, SecretId};
use crate::catalog::{DatabaseId, OwnedByUserCatalog, SchemaId, SecretId};
use crate::user::UserId;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SecretCatalog {
pub id: SecretId,
pub name: String,
pub database_id: DatabaseId,
pub schema_id: SchemaId,
pub value: Vec<u8>,
pub owner: UserId,
}
Expand All @@ -34,6 +35,7 @@ impl From<&PbSecret> for SecretCatalog {
owner: value.owner,
name: value.name.clone(),
value: value.value.clone(),
schema_id: value.schema_id,
}
}
}
Expand Down
101 changes: 101 additions & 0 deletions src/frontend/src/handler/alter_secret.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use pgwire::pg_response::StatementType;
use prost::Message;
use risingwave_common::bail_not_implemented;
use risingwave_common::license::Feature;
use risingwave_common::secret::LocalSecretManager;
use risingwave_pb::secret::secret;
use risingwave_sqlparser::ast::{AlterSecretOperation, ObjectName, SqlOption};

use super::create_secret::{get_secret_payload, secret_to_str};
use super::drop_secret::fetch_secret_catalog_with_db_schema_id;
use crate::error::Result;
use crate::handler::{HandlerArgs, RwPgResponse};
use crate::WithOptions;

pub async fn handle_alter_secret(
handler_args: HandlerArgs,
secret_name: ObjectName,
sql_options: Vec<SqlOption>,
operation: AlterSecretOperation,
) -> Result<RwPgResponse> {
Feature::SecretManagement
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;

let session = handler_args.session;

if let Some((secret_catalog, _, _)) =
fetch_secret_catalog_with_db_schema_id(&session, &secret_name, false)?
{
let AlterSecretOperation::ChangeCredential { new_credential } = operation;

let secret_id = secret_catalog.id.secret_id();
let secret_payload = if sql_options.is_empty() {
let original_pb_secret_bytes = LocalSecretManager::global()
.get_secret(secret_id)
.ok_or(anyhow!(
"Failed to get secret in secret manager, secret_id: {}",
secret_id
))?;
let original_secret_backend =
LocalSecretManager::get_pb_secret_backend(&original_pb_secret_bytes)?;
match original_secret_backend {
secret::SecretBackend::Meta(_) => {
let new_secret_value_bytes =
secret_to_str(&new_credential)?.as_bytes().to_vec();
let secret_payload = risingwave_pb::secret::Secret {
secret_backend: Some(risingwave_pb::secret::secret::SecretBackend::Meta(
risingwave_pb::secret::SecretMetaBackend {
value: new_secret_value_bytes,
},
)),
};
secret_payload.encode_to_vec()
}
secret::SecretBackend::HashicorpVault(_) => {
bail_not_implemented!("hashicorp_vault backend is not implemented yet")
}
}
} else {
let with_options = WithOptions::try_from(sql_options.as_ref() as &[SqlOption])?;
get_secret_payload(new_credential, with_options)?
};

let catalog_writer = session.catalog_writer()?;

catalog_writer
.alter_secret(
secret_id,
secret_catalog.name.clone(),
secret_catalog.database_id,
secret_catalog.schema_id,
secret_catalog.owner,
secret_payload,
)
.await?;

Ok(RwPgResponse::empty_result(StatementType::ALTER_SECRET))
} else {
Ok(RwPgResponse::builder(StatementType::ALTER_SECRET)
.notice(format!(
"secret \"{}\" does not exist, skipping",
secret_name
))
.into())
}
}
Loading