Skip to content

Commit

Permalink
fix(cdc): ensure connector is inited after the CREATE TABLE is finish…
Browse files Browse the repository at this point in the history
…ed (#13130) (#13251)

Co-authored-by: StrikeW <[email protected]>
  • Loading branch information
github-actions[bot] and StrikeW authored Nov 4, 2023
1 parent 6510ff4 commit 5a6df32
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 75 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ echo "--- e2e, ci-1cn-1fe, mysql & postgres cdc"
mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cdc.sql

# import data to postgres
export PGHOST=db PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test
export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test
createdb
psql < ./e2e_test/source/cdc/postgres_cdc.sql

Expand Down
53 changes: 53 additions & 0 deletions e2e_test/source/cdc_inline/postgres_create_drop.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Create and drop CDC postgres tables back to back: the table is created, dropped right away,
# and then re-created against the same upstream table and the same replication slot name.
# The final query checks that the re-created table still receives the upstream row.
# The `${VAR:default}` placeholders below rely on substitution being switched on here.
control substitution on

# Prepare the upstream Postgres table with a single row (connection settings come from the PG* env vars).
system ok
psql -c "
DROP TABLE IF EXISTS tt1;
CREATE TABLE tt1 (v1 int primary key, v2 timestamp);
INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');"

# First creation of the CDC table; it is dropped immediately afterwards without being queried.
statement ok
create table tt1 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:8432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:123456}',
database.name = '${PGDATABASE:mydb}',
schema.name = 'public',
table.name = 'tt1',
slot.name = 'tt1_slot',
);

# Drop right after the create, with no wait in between.
statement ok
drop table tt1;

# Re-create the table with identical options, reusing the slot name `tt1_slot`.
statement ok
create table tt1 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:8432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:123456}',
database.name = '${PGDATABASE:mydb}',
schema.name = 'public',
table.name = 'tt1',
slot.name = 'tt1_slot',
);

# NOTE(review): presumably gives the connector time to deliver the upstream row before the
# query below; the 3s value looks empirical - confirm it is sufficient on slow CI hosts.
sleep 3s

# The upstream `timestamp` column is declared as `timestamptz` here, so the row is expected
# to come back with a +00:00 offset.
query IT
SELECT * FROM tt1;
----
1 2023-10-23 10:00:00+00:00

# Clean up so the test can be re-run.
statement ok
drop table tt1;
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import com.risingwave.connector.api.source.*;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -40,8 +40,7 @@ private DbzCdcEngineRunner(CdcEngine engine) {
}

public static CdcEngineRunner newCdcEngineRunner(
DbzConnectorConfig config,
StreamObserver<ConnectorServiceProto.GetEventStreamResponse> responseObserver) {
DbzConnectorConfig config, StreamObserver<GetEventStreamResponse> responseObserver) {
DbzCdcEngineRunner runner = null;
try {
var sourceId = config.getSourceId();
Expand Down Expand Up @@ -98,12 +97,21 @@ public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) {
}

/** Start to run the cdc engine */
public void start() {
public void start() throws InterruptedException {
if (isRunning()) {
LOG.info("engine#{} already started", engine.getId());
return;
}

// put a handshake message to notify the Source executor
var controlInfo =
GetEventStreamResponse.ControlInfo.newBuilder().setHandshakeOk(true).build();
engine.getOutputChannel()
.put(
GetEventStreamResponse.newBuilder()
.setSourceId(engine.getId())
.setControl(controlInfo)
.build());
executor.execute(engine);
running.set(true);
LOG.info("engine#{} started", engine.getId());
Expand Down
4 changes: 4 additions & 0 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,12 @@ message GetEventStreamRequest {
}

// One message of the CDC event stream sent from the connector to the source reader.
message GetEventStreamResponse {
// Control information carried alongside (or instead of) change events.
message ControlInfo {
// Set to true in the handshake message that the connector puts on its output channel
// when the engine is started. The reader treats a first response whose `control` is
// missing or whose `handshake_ok` is false as a failure to start the connector.
bool handshake_ok = 1;
}
// Id of the source this response belongs to (the connector fills it with its engine id).
uint64 source_id = 1;
// Change events in this response. The reader only counts and forwards this field.
repeated CdcMessage events = 2;
// Optional control information; populated on the handshake message.
ControlInfo control = 3;
}

message ValidateSourceRequest {
Expand Down
11 changes: 11 additions & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ impl<'a> From<&'a str> for CdcSourceType {
}
}

impl CdcSourceType {
pub fn as_str_name(&self) -> &str {
match self {
CdcSourceType::Mysql => "MySQL",
CdcSourceType::Postgres => "Postgres",
CdcSourceType::Citus => "Citus",
CdcSourceType::Unspecified => "Unspecified",
}
}
}

#[derive(Clone, Debug, Default)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
/// Properties specified in the WITH clause by user
Expand Down
128 changes: 72 additions & 56 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use jni::objects::JValue;
use prost::Message;
use risingwave_common::util::addr::HostAddr;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_jni_core::JniSenderType;
use risingwave_jni_core::{JniReceiverType, JniSenderType};
use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse};
use tokio::sync::mpsc;

Expand All @@ -46,6 +46,7 @@ pub struct CdcSplitReader<T: CdcSourceTypeTrait> {
snapshot_done: bool,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
rx: JniReceiverType<anyhow::Result<GetEventStreamResponse>>,
}

const DEFAULT_CHANNEL_SIZE: usize = 16;
Expand All @@ -66,50 +67,13 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
assert_eq!(splits.len(), 1);
let split = splits.into_iter().next().unwrap();
let split_id = split.id();
match T::source_type() {
CdcSourceType::Mysql | CdcSourceType::Postgres => Ok(Self {
source_id: split.split_id() as u64,
start_offset: split.start_offset().clone(),
server_addr: None,
conn_props,
split_id,
snapshot_done: split.snapshot_done(),
parser_config,
source_ctx,
}),
CdcSourceType::Citus => Ok(Self {
source_id: split.split_id() as u64,
start_offset: split.start_offset().clone(),
server_addr: split.server_addr().clone(),
conn_props,
split_id,
snapshot_done: split.snapshot_done(),
parser_config,
source_ctx,
}),
CdcSourceType::Unspecified => {
unreachable!();
}
}
}

fn into_stream(self) -> BoxSourceWithStateStream {
let parser_config = self.parser_config.clone();
let source_context = self.source_ctx.clone();
into_chunk_stream(self, parser_config, source_context)
}
}

impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn into_data_stream(self) {
// rewrite the hostname and port for the split
let mut properties = self.conn_props.props.clone();
let mut properties = conn_props.props.clone();

// For citus, we need to rewrite the table.name to capture sharding tables
if self.server_addr.is_some() {
let addr = self.server_addr.unwrap();
let host_addr = HostAddr::from_str(&addr)
// For citus, we need to rewrite the `table.name` to capture sharding tables
if matches!(T::source_type(), CdcSourceType::Citus) && let Some(server_addr) = split.server_addr() {
let host_addr = HostAddr::from_str(&server_addr)
.map_err(|err| anyhow!("invalid server address for cdc split. {}", err))?;
properties.insert("hostname".to_string(), host_addr.host);
properties.insert("port".to_string(), host_addr.port.to_string());
Expand All @@ -121,30 +85,27 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
properties.insert("table.name".into(), table_name);
}

let source_id = split.split_id() as u64;
let source_type = conn_props.get_source_type_pb();
let (mut tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

let jvm = JVM
.get_or_init()
.map_err(|e| anyhow!("jvm not initialized properly: {:?}", e))?;

let get_event_stream_request = GetEventStreamRequest {
source_id: self.source_id,
source_type: self.conn_props.get_source_type_pb() as _,
start_offset: self.start_offset.unwrap_or_default(),
source_id,
source_type: source_type as _,
start_offset: split.start_offset().clone().unwrap_or_default(),
properties,
snapshot_done: self.snapshot_done,
snapshot_done: split.snapshot_done(),
};

let source_id = get_event_stream_request.source_id.to_string();
let source_type = get_event_stream_request.source_type.to_string();

std::thread::spawn(move || {
let result: anyhow::Result<_> = try {
let env = jvm.attach_current_thread()?;

let get_event_stream_request_bytes =
env.byte_array_from_slice(&Message::encode_to_vec(&get_event_stream_request))?;

(env, get_event_stream_request_bytes)
};

Expand All @@ -171,21 +132,76 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {

match result {
Ok(_) => {
tracing::info!("end of jni call runJniDbzSourceThread");
tracing::info!(?source_id, "end of jni call runJniDbzSourceThread");
}
Err(e) => {
tracing::error!("jni call error: {:?}", e);
tracing::error!(?source_id, "jni call error: {:?}", e);
}
}
});

if let Some(res) = rx.recv().await {
let resp: GetEventStreamResponse = res?;
let inited = match resp.control {
Some(info) => info.handshake_ok,
None => false,
};
if !inited {
return Err(anyhow!("failed to start cdc connector"));
}
}
tracing::info!(?source_id, "cdc connector started");

match T::source_type() {
CdcSourceType::Mysql | CdcSourceType::Postgres => Ok(Self {
source_id: split.split_id() as u64,
start_offset: split.start_offset().clone(),
server_addr: None,
conn_props,
split_id,
snapshot_done: split.snapshot_done(),
parser_config,
source_ctx,
rx,
}),
CdcSourceType::Citus => Ok(Self {
source_id: split.split_id() as u64,
start_offset: split.start_offset().clone(),
server_addr: split.server_addr(),
conn_props,
split_id,
snapshot_done: split.snapshot_done(),
parser_config,
source_ctx,
rx,
}),
CdcSourceType::Unspecified => {
unreachable!();
}
}
}

fn into_stream(self) -> BoxSourceWithStateStream {
let parser_config = self.parser_config.clone();
let source_context = self.source_ctx.clone();
into_chunk_stream(self, parser_config, source_context)
}
}

impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn into_data_stream(self) {
let source_type = T::source_type();
let mut rx = self.rx;
let source_id = self.source_id.to_string();
let metrics = self.source_ctx.metrics.clone();

while let Some(result) = rx.recv().await {
let GetEventStreamResponse { events, .. } = result?;
tracing::trace!("receive events {:?}", events.len());
self.source_ctx
.metrics
metrics
.connector_source_rows_received
.with_label_values(&[&source_type, &source_id])
.with_label_values(&[source_type.as_str_name(), &source_id])
.inc_by(events.len() as u64);
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
Expand Down
7 changes: 4 additions & 3 deletions src/connector/src/source/cdc/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,11 @@ impl<T: CdcSourceTypeTrait> DebeziumCdcSplit<T> {
unreachable!("invalid debezium split")
}

pub fn server_addr(&self) -> &Option<String> {
pub fn server_addr(&self) -> Option<String> {
if let Some(split) = &self.pg_split {
return &split.server_addr;
split.server_addr.clone()
} else {
None
}
unreachable!("invalid debezium split")
}
}
9 changes: 8 additions & 1 deletion src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,14 @@ impl JavaVmWrapper {
.option("-Dis_embedded_connector=true")
.option(format!("-Djava.class.path={}", class_vec.join(":")))
.option("-Xms16m")
.option(format!("-Xmx{}", jvm_heap_size));
.option(format!("-Xmx{}", jvm_heap_size))
// Quoted from the debezium document:
// > Your application should always properly stop the engine to ensure graceful and complete
// > shutdown and that each source record is sent to the application exactly one time.
// In RisingWave we assume the upstream changelog may contain duplicate events and
// handle conflicts in the mview operator, thus we don't need to obey the above
// instructions. So we decrease the wait time here to reclaim jvm thread faster.
.option("-Ddebezium.embedded.shutdown.pause.before.interrupt.ms=1");

tracing::info!("JVM args: {:?}", args_builder);
let jvm_args = args_builder
Expand Down
11 changes: 1 addition & 10 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,7 @@ impl ConnectorSource {
// TODO: is this reader split across multiple threads...? Realistically, we want
// source_ctx to live in a single actor.
let source_ctx = source_ctx.clone();
async move {
create_split_reader(
*props,
splits,
parser_config,
source_ctx,
data_gen_columns,
)
.await
}
create_split_reader(*props, splits, parser_config, source_ctx, data_gen_columns)
}))
.await?
};
Expand Down

0 comments on commit 5a6df32

Please sign in to comment.