diff --git a/e2e_test/source/cdc_inline/postgres_create_drop.slt b/e2e_test/source/cdc_inline/postgres_create_drop.slt
new file mode 100644
index 0000000000000..47ae5ccd542ad
--- /dev/null
+++ b/e2e_test/source/cdc_inline/postgres_create_drop.slt
@@ -0,0 +1,56 @@
+# create and drop CDC postgres tables concurrently
+
+control substitution on
+
+system ok
+psql -c "DROP DATABASE IF EXISTS testdb1;"
+
+system ok
+psql -c "CREATE DATABASE testdb1;"
+
+system ok
+psql -d testdb1 -c "
+  CREATE TABLE tt1 (v1 int primary key, v2 timestamp);
+  INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');"
+
+statement ok
+create table tt1 (v1 int,
+  v2 timestamptz,
+  PRIMARY KEY (v1)
+) with (
+  connector = 'postgres-cdc',
+  hostname = '${PGHOST:localhost}',
+  port = '${PGPORT:5432}',
+  username = '${PGUSER:$USER}',
+  password = '${PGPASSWORD:postgres}',
+  database.name = 'testdb1',
+  schema.name = 'public',
+  table.name = 'tt1',
+  slot.name = 'tt1_slot',
+);
+
+sleep 1s
+
+query IT
+SELECT * FROM tt1;
+----
+1 2023-10-23 10:00:00+00:00
+
+statement ok
+drop table tt1;
+
+statement ok
+create table tt1 (v1 int,
+  v2 timestamptz,
+  PRIMARY KEY (v1)
+) with (
+  connector = 'postgres-cdc',
+  hostname = '${PGHOST:localhost}',
+  port = '${PGPORT:5432}',
+  username = '${PGUSER:$USER}',
+  password = '${PGPASSWORD:postgres}',
+  database.name = 'testdb1',
+  schema.name = 'public',
+  table.name = 'tt1',
+  slot.name = 'tt1_slot',
+);
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
index ba9511b02303b..38454dc0e0a9b 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
@@ -98,12 +98,19 @@ public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) {
     }
 
     /** Start to run the cdc engine */
-    public void start() {
+    public void start() throws InterruptedException {
         if (isRunning()) {
             LOG.info("engine#{} already started", engine.getId());
             return;
         }
 
+        // put a handshake message to notify the Source executor
+        engine.getOutputChannel()
+                .put(
+                        ConnectorServiceProto.GetEventStreamResponse.newBuilder()
+                                .setSourceId(engine.getId())
+                                .setHandshake(true)
+                                .build());
         executor.execute(engine);
         running.set(true);
         LOG.info("engine#{} started", engine.getId());
diff --git a/proto/connector_service.proto b/proto/connector_service.proto
index 56ce3c252bd2f..5f2e01fb9b712 100644
--- a/proto/connector_service.proto
+++ b/proto/connector_service.proto
@@ -169,6 +169,7 @@ message GetEventStreamRequest {
 message GetEventStreamResponse {
   uint64 source_id = 1;
   repeated CdcMessage events = 2;
+  bool handshake = 3;
 }
 
 message ValidateSourceRequest {
diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs
index 8f5a2ee83b2ee..0ca48fa0217ed 100644
--- a/src/connector/src/source/cdc/mod.rs
+++ b/src/connector/src/source/cdc/mod.rs
@@ -56,6 +56,17 @@ impl<'a> From<&'a str> for CdcSourceType {
     }
 }
 
+impl CdcSourceType {
+    pub fn as_str_name(&self) -> &str {
+        match self {
+            CdcSourceType::Mysql => "MySQL",
+            CdcSourceType::Postgres => "Postgres",
+            CdcSourceType::Citus => "Citus",
+            CdcSourceType::Unspecified => "Unspecified",
+        }
+    }
+}
+
 #[derive(Clone, Debug, Default)]
 pub struct CdcProperties {
     /// Properties specified in the WITH clause by user
diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs
index 491ce18cfd493..9fcf6db218093 100644
--- a/src/connector/src/source/cdc/source/reader.rs
+++ b/src/connector/src/source/cdc/source/reader.rs
@@ -22,7 +22,7 @@ use jni::objects::JValue;
 use prost::Message;
 use risingwave_common::util::addr::HostAddr;
 use risingwave_jni_core::jvm_runtime::JVM;
-use risingwave_jni_core::JniSenderType;
+use risingwave_jni_core::{JniReceiverType, JniSenderType};
 use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse};
 use tokio::sync::mpsc;
 
@@ -46,6 +46,7 @@ pub struct CdcSplitReader<T: CdcSourceTypeTrait> {
     snapshot_done: bool,
     parser_config: ParserConfig,
     source_ctx: SourceContextRef,
+    rx: JniReceiverType<anyhow::Result<GetEventStreamResponse>>,
 }
 
 const DEFAULT_CHANNEL_SIZE: usize = 16;
@@ -66,49 +67,13 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
         assert_eq!(splits.len(), 1);
         let split = splits.into_iter().next().unwrap();
         let split_id = split.id();
-        match T::source_type() {
-            CdcSourceType::Mysql | CdcSourceType::Postgres => Ok(Self {
-                source_id: split.split_id() as u64,
-                start_offset: split.start_offset().clone(),
-                server_addr: None,
-                conn_props,
-                split_id,
-                snapshot_done: split.snapshot_done(),
-                parser_config,
-                source_ctx,
-            }),
-            CdcSourceType::Citus => Ok(Self {
-                source_id: split.split_id() as u64,
-                start_offset: split.start_offset().clone(),
-                server_addr: split.server_addr().clone(),
-                conn_props,
-                split_id,
-                snapshot_done: split.snapshot_done(),
-                parser_config,
-                source_ctx,
-            }),
-            CdcSourceType::Unspecified => {
-                unreachable!();
-            }
-        }
-    }
-
-    fn into_stream(self) -> BoxSourceWithStateStream {
-        let parser_config = self.parser_config.clone();
-        let source_context = self.source_ctx.clone();
-        into_chunk_stream(self, parser_config, source_context)
-    }
-}
 
-impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
-    #[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
-    async fn into_data_stream(self) {
         // rewrite the hostname and port for the split
-        let mut properties = self.conn_props.props.clone();
+        let mut properties = conn_props.props.clone();
 
         // For citus, we need to rewrite the table.name to capture sharding tables
-        if self.server_addr.is_some() {
-            let addr = self.server_addr.unwrap();
+        if split.server_addr().is_some() {
+            let addr = split.server_addr().clone().unwrap();
             let host_addr = HostAddr::from_str(&addr)
                 .map_err(|err| anyhow!("invalid server address for cdc split. {}", err))?;
{}", err))?; properties.insert("hostname".to_string(), host_addr.host); @@ -121,6 +86,8 @@ impl CommonSplitReader for CdcSplitReader { properties.insert("table.name".into(), table_name); } + let source_id = split.split_id() as u64; + let source_type = conn_props.get_source_type_pb(); let (mut tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let jvm = JVM @@ -128,23 +95,18 @@ impl CommonSplitReader for CdcSplitReader { .map_err(|e| anyhow!("jvm not initialized properly: {:?}", e))?; let get_event_stream_request = GetEventStreamRequest { - source_id: self.source_id, - source_type: self.conn_props.get_source_type_pb() as _, - start_offset: self.start_offset.unwrap_or_default(), + source_id, + source_type: source_type as _, + start_offset: split.start_offset().clone().unwrap_or_default(), properties, - snapshot_done: self.snapshot_done, + snapshot_done: split.snapshot_done(), }; - let source_id = get_event_stream_request.source_id.to_string(); - let source_type = get_event_stream_request.source_type.to_string(); - std::thread::spawn(move || { let result: anyhow::Result<_> = try { let env = jvm.attach_current_thread()?; - let get_event_stream_request_bytes = env.byte_array_from_slice(&Message::encode_to_vec(&get_event_stream_request))?; - (env, get_event_stream_request_bytes) }; @@ -171,21 +133,70 @@ impl CommonSplitReader for CdcSplitReader { match result { Ok(_) => { - tracing::info!("end of jni call runJniDbzSourceThread"); + tracing::info!(?source_id, "end of jni call runJniDbzSourceThread"); } Err(e) => { - tracing::error!("jni call error: {:?}", e); + tracing::error!(?source_id, "jni call error: {:?}", e); } } }); + if let Some(res) = rx.recv().await { + let resp: GetEventStreamResponse = res?; + assert_eq!(resp.handshake); + } + tracing::info!("cdc split reader thread started"); + + match T::source_type() { + CdcSourceType::Mysql | CdcSourceType::Postgres => Ok(Self { + source_id: split.split_id() as u64, + start_offset: split.start_offset().clone(), + server_addr: None, + conn_props, + split_id, + snapshot_done: split.snapshot_done(), + parser_config, + source_ctx, + rx, + }), + CdcSourceType::Citus => Ok(Self { + source_id: split.split_id() as u64, + start_offset: split.start_offset().clone(), + server_addr: split.server_addr().clone(), + conn_props, + split_id, + snapshot_done: split.snapshot_done(), + parser_config, + source_ctx, + rx, + }), + CdcSourceType::Unspecified => { + unreachable!(); + } + } + } + + fn into_stream(self) -> BoxSourceWithStateStream { + let parser_config = self.parser_config.clone(); + let source_context = self.source_ctx.clone(); + into_chunk_stream(self, parser_config, source_context) + } +} + +impl CommonSplitReader for CdcSplitReader { + #[try_stream(ok = Vec, error = anyhow::Error)] + async fn into_data_stream(self) { + let source_type = T::source_type(); + let mut rx = self.rx; + let source_id = self.source_id.to_string(); + let metrics = self.source_ctx.metrics.clone(); + while let Some(result) = rx.recv().await { let GetEventStreamResponse { events, .. 
-            tracing::trace!("receive events {:?}", events.len());
-            self.source_ctx
-                .metrics
+            tracing::debug!("receive events {:?}", events.len());
+            metrics
                 .connector_source_rows_received
-                .with_label_values(&[&source_type, &source_id])
+                .with_label_values(&[source_type.as_str_name(), &source_id])
                 .inc_by(events.len() as u64);
             let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
             yield msgs;
diff --git a/src/jni_core/src/jvm_runtime.rs b/src/jni_core/src/jvm_runtime.rs
index b51d16943e45c..13af32a48d584 100644
--- a/src/jni_core/src/jvm_runtime.rs
+++ b/src/jni_core/src/jvm_runtime.rs
@@ -100,7 +100,14 @@ impl JavaVmWrapper {
             .option("-Dis_embedded_connector=true")
             .option(format!("-Djava.class.path={}", class_vec.join(":")))
             .option("-Xms16m")
-            .option(format!("-Xmx{}", jvm_heap_size));
+            .option(format!("-Xmx{}", jvm_heap_size))
+            // Quoted from the Debezium documentation:
+            // > Your application should always properly stop the engine to ensure graceful and complete
+            // > shutdown and that each source record is sent to the application exactly one time.
+            // In RisingWave we assume the upstream changelog may contain duplicate events and
+            // handle conflicts in the mview operator, so we don't need to obey the above
+            // instruction. We therefore decrease the wait time here to reclaim the JVM thread faster.
+            .option("-Ddebezium.embedded.shutdown.pause.before.interrupt.ms=1");
 
         tracing::info!("JVM args: {:?}", args_builder);
         let jvm_args = args_builder
diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs
index 31ee763d2a0b9..e0a3ac62276d1 100644
--- a/src/source/src/connector_source.rs
+++ b/src/source/src/connector_source.rs
@@ -162,16 +162,7 @@ impl ConnectorSource {
                 // TODO: is this reader split across multiple threads...? Realistically, we want
                 // source_ctx to live in a single actor.
                 let source_ctx = source_ctx.clone();
-                async move {
-                    create_split_reader(
-                        *props,
-                        splits,
-                        parser_config,
-                        source_ctx,
-                        data_gen_columns,
-                    )
-                    .await
-                }
+                create_split_reader(*props, splits, parser_config, source_ctx, data_gen_columns)
             }))
             .await?
         };