Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/cargo/pbjson-0.6.0
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao authored Sep 19, 2023
2 parents ab21bda + 161b9b0 commit 475307d
Show file tree
Hide file tree
Showing 20 changed files with 495 additions and 255 deletions.
260 changes: 89 additions & 171 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ echo "--- e2e, $mode, batch"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cluster_start
sqllogictest -p 4566 -d dev './e2e_test/ddl/**/*.slt' --junit "batch-ddl-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/background_ddl/**/*.slt' --junit "batch-ddl-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/visibility_mode/*.slt' --junit "batch-${profile}"
sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt'
sqllogictest -p 4566 -d test './e2e_test/database/test.slt'
Expand Down
73 changes: 73 additions & 0 deletions e2e_test/background_ddl/basic.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
statement ok
SET BACKGROUND_DDL=true;

statement ok
ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 4;

statement ok
CREATE TABLE t (v1 int);

statement ok
INSERT INTO t select * from generate_series(1, 500000);

statement ok
FLUSH;

statement ok
CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;

statement ok
CREATE MATERIALIZED VIEW m2 as SELECT * FROM t;

statement ok
CREATE MATERIALIZED VIEW m3 as SELECT * FROM t;

sleep 3s

query I
select count(*) from rw_catalog.rw_ddl_progress;
----
3

statement error
SELECT * FROM m1;

# Meta should always reject duplicate mview.
statement error
CREATE MATERIALIZED VIEW m3 as SELECT * FROM t;

# Wait for background ddl to finish
sleep 30s

query I
select count(*) from m1;
----
500000

query I
select count(*) from m2;
----
500000

query I
select count(*) from m3;
----
500000

statement ok
DROP MATERIALIZED VIEW m1;

statement ok
DROP MATERIALIZED VIEW m2;

statement ok
DROP MATERIALIZED VIEW m3;

statement ok
DROP TABLE t;

statement ok
SET BACKGROUND_DDL=false;

statement ok
ALTER SYSTEM SET max_concurrent_creating_streaming_jobs TO 1;
7 changes: 7 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,16 @@ message DropSinkResponse {
uint64 version = 2;
}

enum StreamJobExecutionMode {
STREAM_JOB_EXECUTION_MODE_UNSPECIFIED = 0;
BACKGROUND = 1;
FOREGROUND = 2;
}

message CreateMaterializedViewRequest {
catalog.Table materialized_view = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;
StreamJobExecutionMode stream_job_execution_mode = 3;
}

message CreateMaterializedViewResponse {
Expand Down
20 changes: 19 additions & 1 deletion src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::util::epoch::Epoch;

// This is a hack, &'static str is not allowed as a const generics argument.
// TODO: refine this using the adt_const_params feature.
const CONFIG_KEYS: [&str; 37] = [
const CONFIG_KEYS: [&str; 38] = [
"RW_IMPLICIT_FLUSH",
"CREATE_COMPACTION_GROUP_FOR_MV",
"QUERY_MODE",
Expand Down Expand Up @@ -74,6 +74,7 @@ const CONFIG_KEYS: [&str; 37] = [
"STREAMING_RATE_LIMIT",
"CDC_BACKFILL",
"RW_STREAMING_OVER_WINDOW_CACHE_POLICY",
"BACKGROUND_DDL",
];

// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
Expand Down Expand Up @@ -115,6 +116,7 @@ const STANDARD_CONFORMING_STRINGS: usize = 33;
const STREAMING_RATE_LIMIT: usize = 34;
const CDC_BACKFILL: usize = 35;
const STREAMING_OVER_WINDOW_CACHE_POLICY: usize = 36;
const BACKGROUND_DDL: usize = 37;

trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
fn entry_name() -> &'static str;
Expand Down Expand Up @@ -339,6 +341,7 @@ type RowSecurity = ConfigBool<ROW_SECURITY, true>;
type StandardConformingStrings = ConfigString<STANDARD_CONFORMING_STRINGS>;
type StreamingRateLimit = ConfigU64<STREAMING_RATE_LIMIT, 0>;
type CdcBackfill = ConfigBool<CDC_BACKFILL, false>;
type BackgroundDdl = ConfigBool<BACKGROUND_DDL, false>;

/// Report status or notice to caller.
pub trait ConfigReporter {
Expand Down Expand Up @@ -486,6 +489,8 @@ pub struct ConfigMap {
/// Cache policy for partition cache in streaming over window.
/// Can be "full", "recent", "recent_first_n" or "recent_last_n".
streaming_over_window_cache_policy: OverWindowCachePolicy,

background_ddl: BackgroundDdl,
}

impl ConfigMap {
Expand Down Expand Up @@ -603,6 +608,8 @@ impl ConfigMap {
self.cdc_backfill = val.as_slice().try_into()?
} else if key.eq_ignore_ascii_case(OverWindowCachePolicy::entry_name()) {
self.streaming_over_window_cache_policy = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(BackgroundDdl::entry_name()) {
self.background_ddl = val.as_slice().try_into()?;
} else {
return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
}
Expand Down Expand Up @@ -690,6 +697,8 @@ impl ConfigMap {
Ok(self.cdc_backfill.to_string())
} else if key.eq_ignore_ascii_case(OverWindowCachePolicy::entry_name()) {
Ok(self.streaming_over_window_cache_policy.to_string())
} else if key.eq_ignore_ascii_case(BackgroundDdl::entry_name()) {
Ok(self.background_ddl.to_string())
} else {
Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
}
Expand Down Expand Up @@ -882,6 +891,11 @@ impl ConfigMap {
setting: self.streaming_over_window_cache_policy.to_string(),
description: String::from(r#"Cache policy for partition cache in streaming over window. Can be "full", "recent", "recent_first_n" or "recent_last_n"."#),
},
VariableInfo{
name: BackgroundDdl::entry_name().to_lowercase(),
setting: self.background_ddl.to_string(),
description: String::from("Run DDL statements in background"),
},
]
}

Expand Down Expand Up @@ -1020,4 +1034,8 @@ impl ConfigMap {
pub fn get_streaming_over_window_cache_policy(&self) -> OverWindowCachePolicy {
self.streaming_over_window_cache_policy
}

pub fn get_background_ddl(&self) -> bool {
self.background_ddl.0
}
}
3 changes: 3 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ mysql_common = { version = "0.30", default-features = false, features = [
"chrono",
] }
nexmark = { version = "0.2", features = ["serde"] }
nkeys = "0.3.2"
num-bigint = "0.4"
opendal = "0.39"
parking_lot = "0.12"
Expand Down Expand Up @@ -101,6 +102,7 @@ serde_with = { version = "3", features = ["json"] }
simd-json = "0.10.6"
tempfile = "3"
thiserror = "1"
time = "0.3.28"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
Expand All @@ -117,6 +119,7 @@ tonic = { workspace = true }
tracing = "0.1"
url = "2"
urlencoding = "2"

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

Expand Down
118 changes: 87 additions & 31 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ use risingwave_common::error::anyhow_error;
use serde_derive::{Deserialize, Serialize};
use serde_with::json::JsonString;
use serde_with::{serde_as, DisplayFromStr};
use time::OffsetDateTime;

use crate::aws_auth::AwsAuthProps;
use crate::deserialize_duration_from_string;
use crate::sink::SinkError;

use crate::source::nats::source::NatsOffset;
// The file describes the common abstractions for each connector and can be used in both source and
// sink.

Expand Down Expand Up @@ -342,37 +343,73 @@ pub struct UpsertMessage<'a> {
#[serde_as]
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct NatsCommon {
#[serde(rename = "nats.server_url")]
#[serde(rename = "server_url")]
pub server_url: String,
#[serde(rename = "nats.subject")]
#[serde(rename = "subject")]
pub subject: String,
#[serde(rename = "nats.user")]
#[serde(rename = "connect_mode")]
pub connect_mode: Option<String>,
#[serde(rename = "username")]
pub user: Option<String>,
#[serde(rename = "nats.password")]
#[serde(rename = "password")]
pub password: Option<String>,
#[serde(rename = "nats.max_bytes")]
#[serde(rename = "jwt")]
pub jwt: Option<String>,
#[serde(rename = "nkey")]
pub nkey: Option<String>,
#[serde(rename = "max_bytes")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub max_bytes: Option<i64>,
#[serde(rename = "nats.max_messages")]
#[serde(rename = "max_messages")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub max_messages: Option<i64>,
#[serde(rename = "nats.max_messages_per_subject")]
#[serde(rename = "max_messages_per_subject")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub max_messages_per_subject: Option<i64>,
#[serde(rename = "nats.max_consumers")]
#[serde(rename = "max_consumers")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub max_consumers: Option<i32>,
#[serde(rename = "nats.max_message_size")]
#[serde(rename = "max_message_size")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub max_message_size: Option<i32>,
}

impl NatsCommon {
pub(crate) async fn build_client(&self) -> anyhow::Result<async_nats::Client> {
let mut connect_options = async_nats::ConnectOptions::new();
if let (Some(v_user), Some(v_password)) = (self.user.as_ref(), self.password.as_ref()) {
connect_options = connect_options.user_and_password(v_user.into(), v_password.into());
}
match self.connect_mode.as_deref() {
Some("user_and_password") => {
if let (Some(v_user), Some(v_password)) =
(self.user.as_ref(), self.password.as_ref())
{
connect_options =
connect_options.user_and_password(v_user.into(), v_password.into())
} else {
return Err(anyhow_error!(
"nats connect mode is user_and_password, but user or password is empty"
));
}
}

Some("credential") => {
if let (Some(v_nkey), Some(v_jwt)) = (self.nkey.as_ref(), self.jwt.as_ref()) {
connect_options = connect_options
.credentials(&self.create_credential(v_nkey, v_jwt)?)
.expect("failed to parse static creds")
} else {
return Err(anyhow_error!(
"nats connect mode is credential, but nkey or jwt is empty"
));
}
}
Some("plain") => {}
_ => {
return Err(anyhow_error!(
"nats connect mode only accept user_and_password/credential/plain"
));
}
};

let servers = self.server_url.split(',').collect::<Vec<&str>>();
let client = connect_options
.connect(
Expand All @@ -394,8 +431,8 @@ impl NatsCommon {

pub(crate) async fn build_consumer(
&self,
split_id: i32,
start_sequence: Option<u64>,
split_id: String,
start_sequence: NatsOffset,
) -> anyhow::Result<
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
> {
Expand All @@ -406,23 +443,28 @@ impl NatsCommon {
ack_policy: jetstream::consumer::AckPolicy::None,
..Default::default()
};
match start_sequence {
Some(v) => {
let consumer = stream
.get_or_create_consumer(&name, {
config.deliver_policy = DeliverPolicy::ByStartSequence {
start_sequence: v + 1,
};
config
})
.await?;
Ok(consumer)
}
None => {
let consumer = stream.get_or_create_consumer(&name, config).await?;
Ok(consumer)

let deliver_policy = match start_sequence {
NatsOffset::Earliest => DeliverPolicy::All,
NatsOffset::Latest => DeliverPolicy::Last,
NatsOffset::SequenceNumber(v) => {
let parsed = v.parse::<u64>()?;
DeliverPolicy::ByStartSequence {
start_sequence: 1 + parsed,
}
}
}
NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime {
start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)?,
},
NatsOffset::None => DeliverPolicy::All,
};
let consumer = stream
.get_or_create_consumer(&name, {
config.deliver_policy = deliver_policy;
config
})
.await?;
Ok(consumer)
}

pub(crate) async fn build_or_get_stream(
Expand All @@ -432,6 +474,7 @@ impl NatsCommon {
let mut config = jetstream::stream::Config {
// the subject default use name value
name: self.subject.clone(),
max_bytes: 1000000,
..Default::default()
};
if let Some(v) = self.max_bytes {
Expand All @@ -452,4 +495,17 @@ impl NatsCommon {
let stream = jetstream.get_or_create_stream(config).await?;
Ok(stream)
}

pub(crate) fn create_credential(&self, seed: &str, jwt: &str) -> anyhow::Result<String> {
let creds = format!(
"-----BEGIN NATS USER JWT-----\n{}\n------END NATS USER JWT------\n\n\
************************* IMPORTANT *************************\n\
NKEY Seed printed below can be used to sign and prove identity.\n\
NKEYs are sensitive and should be treated as secrets.\n\n\
-----BEGIN USER NKEY SEED-----\n{}\n------END USER NKEY SEED------\n\n\
*************************************************************",
jwt, seed
);
Ok(creds)
}
}
Loading

0 comments on commit 475307d

Please sign in to comment.