Skip to content

Commit

Permalink
ensure cdc connector is inited before emit init barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Nov 2, 2023
1 parent b7195f8 commit eff2edd
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 67 deletions.
51 changes: 51 additions & 0 deletions e2e_test/source/cdc_inline/postgres_create_drop.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# create and drop CDC postgres tables concurrently
control substitution on

# Prepare the upstream PostgreSQL database.
# DROP DATABASE / CREATE DATABASE cannot run inside a transaction block, and
# psql executes a multi-statement `-c` string as a single implicit transaction,
# so each statement is passed through its own `-c` option.
system ok
psql -c "DROP DATABASE IF EXISTS testdb1;" -c "CREATE DATABASE testdb1;"

# PostgreSQL has no `USE` statement; connect to the new database with `-d`.
system ok
psql -d testdb1 -c "
CREATE TABLE tt1 (v1 int primary key, v2 timestamp);
INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');"

# First creation of the CDC table backed by upstream table public.tt1.
statement ok
create table tt1 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:postgres}',
database.name = 'testdb1',
schema.name = 'public',
table.name = 'tt1',
slot.name = 'tt1_slot',
);

# Presumably gives the CDC source time to ingest the upstream row before querying.
sleep 1s

query IT
SELECT * FROM tt1;
----
1 2023-10-23 10:00:00+00:00

statement ok
drop table tt1;

# Re-create the table right after dropping it, reusing the same replication
# slot name, to check that create-after-drop succeeds.
statement ok
create table tt1 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:postgres}',
database.name = 'testdb1',
schema.name = 'public',
table.name = 'tt1',
slot.name = 'tt1_slot',
);
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,19 @@ public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) {
}

/** Start to run the cdc engine */
public void start() {
public void start() throws InterruptedException {
if (isRunning()) {
LOG.info("engine#{} already started", engine.getId());
return;
}

// put a handshake message to notify the Source executor
engine.getOutputChannel()
.put(
ConnectorServiceProto.GetEventStreamResponse.newBuilder()
.setSourceId(engine.getId())
.setHandshake(true)
.build());
executor.execute(engine);
running.set(true);
LOG.info("engine#{} started", engine.getId());
Expand Down
1 change: 1 addition & 0 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ message GetEventStreamRequest {
// Message sent by the connector to the CDC source reader: either a batch of
// change events or the initial handshake.
message GetEventStreamResponse {
// Id of the source (the CDC engine id) that produced this message.
uint64 source_id = 1;
// Change events carried by this message.
repeated CdcMessage events = 2;
// True on the handshake message that the connector emits right before it
// starts running the CDC engine; the reader waits for it to know the
// connector side has been set up.
bool handshake = 3;
}

message ValidateSourceRequest {
Expand Down
11 changes: 11 additions & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ impl<'a> From<&'a str> for CdcSourceType {
}
}

impl CdcSourceType {
    /// Returns the display name of this CDC source type.
    ///
    /// Every name is a string literal, so the result is `'static` and does not
    /// borrow from `self`; callers may keep it after the value is dropped
    /// (for example as a metrics label).
    pub fn as_str_name(&self) -> &'static str {
        match self {
            CdcSourceType::Mysql => "MySQL",
            CdcSourceType::Postgres => "Postgres",
            CdcSourceType::Citus => "Citus",
            CdcSourceType::Unspecified => "Unspecified",
        }
    }
}

#[derive(Clone, Debug, Default)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
/// Properties specified in the WITH clause by user
Expand Down
121 changes: 66 additions & 55 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use jni::objects::JValue;
use prost::Message;
use risingwave_common::util::addr::HostAddr;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_jni_core::JniSenderType;
use risingwave_jni_core::{JniReceiverType, JniSenderType};
use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse};
use tokio::sync::mpsc;

Expand All @@ -46,6 +46,7 @@ pub struct CdcSplitReader<T: CdcSourceTypeTrait> {
snapshot_done: bool,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
rx: JniReceiverType<anyhow::Result<GetEventStreamResponse>>,
}

const DEFAULT_CHANNEL_SIZE: usize = 16;
Expand All @@ -66,49 +67,13 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
assert_eq!(splits.len(), 1);
let split = splits.into_iter().next().unwrap();
let split_id = split.id();
match T::source_type() {
CdcSourceType::Mysql | CdcSourceType::Postgres => Ok(Self {
source_id: split.split_id() as u64,
start_offset: split.start_offset().clone(),
server_addr: None,
conn_props,
split_id,
snapshot_done: split.snapshot_done(),
parser_config,
source_ctx,
}),
CdcSourceType::Citus => Ok(Self {
source_id: split.split_id() as u64,
start_offset: split.start_offset().clone(),
server_addr: split.server_addr().clone(),
conn_props,
split_id,
snapshot_done: split.snapshot_done(),
parser_config,
source_ctx,
}),
CdcSourceType::Unspecified => {
unreachable!();
}
}
}

fn into_stream(self) -> BoxSourceWithStateStream {
let parser_config = self.parser_config.clone();
let source_context = self.source_ctx.clone();
into_chunk_stream(self, parser_config, source_context)
}
}

impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn into_data_stream(self) {
// rewrite the hostname and port for the split
let mut properties = self.conn_props.props.clone();
let mut properties = conn_props.props.clone();

// For citus, we need to rewrite the table.name to capture sharding tables
if self.server_addr.is_some() {
let addr = self.server_addr.unwrap();
if split.server_addr().is_some() {
let addr = split.server_addr().clone().unwrap();
let host_addr = HostAddr::from_str(&addr)
.map_err(|err| anyhow!("invalid server address for cdc split. {}", err))?;
properties.insert("hostname".to_string(), host_addr.host);
Expand All @@ -121,30 +86,27 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
properties.insert("table.name".into(), table_name);
}

let source_id = split.split_id() as u64;
let source_type = conn_props.get_source_type_pb();
let (mut tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

let jvm = JVM
.get_or_init()
.map_err(|e| anyhow!("jvm not initialized properly: {:?}", e))?;

let get_event_stream_request = GetEventStreamRequest {
source_id: self.source_id,
source_type: self.conn_props.get_source_type_pb() as _,
start_offset: self.start_offset.unwrap_or_default(),
source_id,
source_type: source_type as _,
start_offset: split.start_offset().clone().unwrap_or_default(),
properties,
snapshot_done: self.snapshot_done,
snapshot_done: split.snapshot_done(),
};

let source_id = get_event_stream_request.source_id.to_string();
let source_type = get_event_stream_request.source_type.to_string();

std::thread::spawn(move || {
let result: anyhow::Result<_> = try {
let env = jvm.attach_current_thread()?;

let get_event_stream_request_bytes =
env.byte_array_from_slice(&Message::encode_to_vec(&get_event_stream_request))?;

(env, get_event_stream_request_bytes)
};

Expand All @@ -171,21 +133,70 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {

match result {
Ok(_) => {
tracing::info!("end of jni call runJniDbzSourceThread");
tracing::info!(?source_id, "end of jni call runJniDbzSourceThread");
}
Err(e) => {
tracing::error!("jni call error: {:?}", e);
tracing::error!(?source_id, "jni call error: {:?}", e);
}
}
});

if let Some(res) = rx.recv().await {
let resp: GetEventStreamResponse = res?;
assert_eq!(resp.handshake);
}
tracing::info!("cdc split reader thread started");

match T::source_type() {
CdcSourceType::Mysql | CdcSourceType::Postgres => Ok(Self {
source_id: split.split_id() as u64,
start_offset: split.start_offset().clone(),
server_addr: None,
conn_props,
split_id,
snapshot_done: split.snapshot_done(),
parser_config,
source_ctx,
rx,
}),
CdcSourceType::Citus => Ok(Self {
source_id: split.split_id() as u64,
start_offset: split.start_offset().clone(),
server_addr: split.server_addr().clone(),
conn_props,
split_id,
snapshot_done: split.snapshot_done(),
parser_config,
source_ctx,
rx,
}),
CdcSourceType::Unspecified => {
unreachable!();
}
}
}

fn into_stream(self) -> BoxSourceWithStateStream {
let parser_config = self.parser_config.clone();
let source_context = self.source_ctx.clone();
into_chunk_stream(self, parser_config, source_context)
}
}

impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn into_data_stream(self) {
let source_type = T::source_type();
let mut rx = self.rx;
let source_id = self.source_id.to_string();
let metrics = self.source_ctx.metrics.clone();

while let Some(result) = rx.recv().await {
let GetEventStreamResponse { events, .. } = result?;
tracing::trace!("receive events {:?}", events.len());
self.source_ctx
.metrics
tracing::debug!("receive events {:?}", events.len());
metrics
.connector_source_rows_received
.with_label_values(&[&source_type, &source_id])
.with_label_values(&[source_type.as_str_name(), &source_id])
.inc_by(events.len() as u64);
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
Expand Down
9 changes: 8 additions & 1 deletion src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,14 @@ impl JavaVmWrapper {
.option("-Dis_embedded_connector=true")
.option(format!("-Djava.class.path={}", class_vec.join(":")))
.option("-Xms16m")
.option(format!("-Xmx{}", jvm_heap_size));
.option(format!("-Xmx{}", jvm_heap_size))
// Quoted from the debezium document:
// > Your application should always properly stop the engine to ensure graceful and complete
// > shutdown and that each source record is sent to the application exactly one time.
// In RisingWave we assume the upstream changelog may contain duplicate events and
// handle conflicts in the mview operator, thus we don't need to obey the above
// instructions. So we decrease the wait time here to reclaim jvm thread faster.
.option("-Ddebezium.embedded.shutdown.pause.before.interrupt.ms=1");

tracing::info!("JVM args: {:?}", args_builder);
let jvm_args = args_builder
Expand Down
11 changes: 1 addition & 10 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,7 @@ impl ConnectorSource {
// TODO: is this reader split across multiple threads...? Realistically, we want
// source_ctx to live in a single actor.
let source_ctx = source_ctx.clone();
async move {
create_split_reader(
*props,
splits,
parser_config,
source_ctx,
data_gen_columns,
)
.await
}
create_split_reader(*props, splits, parser_config, source_ctx, data_gen_columns)
}))
.await?
};
Expand Down

0 comments on commit eff2edd

Please sign in to comment.