Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cdc): ensure connector is inited after the CREATE TABLE is finished #13130

Merged
merged 4 commits into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ echo "--- e2e, ci-1cn-1fe, mysql & postgres cdc"
mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cdc.sql

# import data to postgres
export PGHOST=db PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test
export PGHOST=db PGPORT=5432 PGUSER=postgres PGPASSWORD=postgres PGDATABASE=cdc_test
createdb
psql < ./e2e_test/source/cdc/postgres_cdc.sql

Expand Down
53 changes: 53 additions & 0 deletions e2e_test/source/cdc_inline/postgres_create_drop.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# create and drop CDC postgres tables concurrently
control substitution on

system ok
psql -c "
DROP TABLE IF EXISTS tt1;
CREATE TABLE tt1 (v1 int primary key, v2 timestamp);
INSERT INTO tt1 VALUES (1, '2023-10-23 10:00:00');"

statement ok
create table tt1 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:8432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:123456}',
database.name = '${PGDATABASE:mydb}',
schema.name = 'public',
table.name = 'tt1',
slot.name = 'tt1_slot',
);

statement ok
drop table tt1;

statement ok
create table tt1 (v1 int,
v2 timestamptz,
PRIMARY KEY (v1)
) with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:8432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:123456}',
database.name = '${PGDATABASE:mydb}',
schema.name = 'public',
table.name = 'tt1',
slot.name = 'tt1_slot',
);

sleep 3s

query IT
SELECT * FROM tt1;
----
1 2023-10-23 10:00:00+00:00

statement ok
drop table tt1;
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import com.risingwave.connector.api.source.*;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -40,8 +40,7 @@ private DbzCdcEngineRunner(CdcEngine engine) {
}

public static CdcEngineRunner newCdcEngineRunner(
DbzConnectorConfig config,
StreamObserver<ConnectorServiceProto.GetEventStreamResponse> responseObserver) {
DbzConnectorConfig config, StreamObserver<GetEventStreamResponse> responseObserver) {
DbzCdcEngineRunner runner = null;
try {
var sourceId = config.getSourceId();
Expand Down Expand Up @@ -98,12 +97,21 @@ public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) {
}

/** Start to run the cdc engine */
public void start() {
public void start() throws InterruptedException {
if (isRunning()) {
LOG.info("engine#{} already started", engine.getId());
return;
}

// put a handshake message to notify the Source executor
var controlInfo =
GetEventStreamResponse.ControlInfo.newBuilder().setHandshakeOk(true).build();
engine.getOutputChannel()
.put(
GetEventStreamResponse.newBuilder()
.setSourceId(engine.getId())
.setControl(controlInfo)
.build());
executor.execute(engine);
running.set(true);
LOG.info("engine#{} started", engine.getId());
Expand Down
4 changes: 4 additions & 0 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,12 @@ message GetEventStreamRequest {
}

message GetEventStreamResponse {
message ControlInfo {
bool handshake_ok = 1;
}
uint64 source_id = 1;
repeated CdcMessage events = 2;
ControlInfo control = 3;
}

message ValidateSourceRequest {
Expand Down
11 changes: 11 additions & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ impl<'a> From<&'a str> for CdcSourceType {
}
}

impl CdcSourceType {
pub fn as_str_name(&self) -> &str {
match self {
CdcSourceType::Mysql => "MySQL",
CdcSourceType::Postgres => "Postgres",
CdcSourceType::Citus => "Citus",
CdcSourceType::Unspecified => "Unspecified",
}
}
}

#[derive(Clone, Debug, Default)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
/// Properties specified in the WITH clause by user
Expand Down
128 changes: 72 additions & 56 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use jni::objects::JValue;
use prost::Message;
use risingwave_common::util::addr::HostAddr;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_jni_core::JniSenderType;
use risingwave_jni_core::{JniReceiverType, JniSenderType};
use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse};
use tokio::sync::mpsc;

Expand All @@ -46,6 +46,7 @@ pub struct CdcSplitReader<T: CdcSourceTypeTrait> {
snapshot_done: bool,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
rx: JniReceiverType<anyhow::Result<GetEventStreamResponse>>,
}

const DEFAULT_CHANNEL_SIZE: usize = 16;
Expand All @@ -66,50 +67,13 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
assert_eq!(splits.len(), 1);
let split = splits.into_iter().next().unwrap();
let split_id = split.id();
match T::source_type() {
CdcSourceType::Mysql | CdcSourceType::Postgres => Ok(Self {
source_id: split.split_id() as u64,
start_offset: split.start_offset().clone(),
server_addr: None,
conn_props,
split_id,
snapshot_done: split.snapshot_done(),
parser_config,
source_ctx,
}),
CdcSourceType::Citus => Ok(Self {
source_id: split.split_id() as u64,
start_offset: split.start_offset().clone(),
server_addr: split.server_addr().clone(),
conn_props,
split_id,
snapshot_done: split.snapshot_done(),
parser_config,
source_ctx,
}),
CdcSourceType::Unspecified => {
unreachable!();
}
}
}

fn into_stream(self) -> BoxSourceWithStateStream {
let parser_config = self.parser_config.clone();
let source_context = self.source_ctx.clone();
into_chunk_stream(self, parser_config, source_context)
}
}

impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn into_data_stream(self) {
// rewrite the hostname and port for the split
let mut properties = self.conn_props.props.clone();
let mut properties = conn_props.props.clone();

// For citus, we need to rewrite the table.name to capture sharding tables
if self.server_addr.is_some() {
let addr = self.server_addr.unwrap();
let host_addr = HostAddr::from_str(&addr)
// For citus, we need to rewrite the `table.name` to capture sharding tables
if matches!(T::source_type(), CdcSourceType::Citus) && let Some(server_addr) = split.server_addr() {
let host_addr = HostAddr::from_str(&server_addr)
.map_err(|err| anyhow!("invalid server address for cdc split. {}", err))?;
properties.insert("hostname".to_string(), host_addr.host);
properties.insert("port".to_string(), host_addr.port.to_string());
Expand All @@ -121,30 +85,27 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
properties.insert("table.name".into(), table_name);
}

let source_id = split.split_id() as u64;
let source_type = conn_props.get_source_type_pb();
let (mut tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

let jvm = JVM
.get_or_init()
.map_err(|e| anyhow!("jvm not initialized properly: {:?}", e))?;

let get_event_stream_request = GetEventStreamRequest {
source_id: self.source_id,
source_type: self.conn_props.get_source_type_pb() as _,
start_offset: self.start_offset.unwrap_or_default(),
source_id,
source_type: source_type as _,
start_offset: split.start_offset().clone().unwrap_or_default(),
properties,
snapshot_done: self.snapshot_done,
snapshot_done: split.snapshot_done(),
};

let source_id = get_event_stream_request.source_id.to_string();
let source_type = get_event_stream_request.source_type.to_string();

std::thread::spawn(move || {
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
let result: anyhow::Result<_> = try {
let env = jvm.attach_current_thread()?;

let get_event_stream_request_bytes =
env.byte_array_from_slice(&Message::encode_to_vec(&get_event_stream_request))?;

(env, get_event_stream_request_bytes)
};

Expand All @@ -171,21 +132,76 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {

match result {
Ok(_) => {
tracing::info!("end of jni call runJniDbzSourceThread");
tracing::info!(?source_id, "end of jni call runJniDbzSourceThread");
}
Err(e) => {
tracing::error!("jni call error: {:?}", e);
tracing::error!(?source_id, "jni call error: {:?}", e);
}
}
});

if let Some(res) = rx.recv().await {
let resp: GetEventStreamResponse = res?;
let inited = match resp.control {
Some(info) => info.handshake_ok,
None => false,
};
if !inited {
return Err(anyhow!("failed to start cdc connector"));
}
}
tracing::info!(?source_id, "cdc connector started");

match T::source_type() {
CdcSourceType::Mysql | CdcSourceType::Postgres => Ok(Self {
source_id: split.split_id() as u64,
start_offset: split.start_offset().clone(),
server_addr: None,
conn_props,
split_id,
snapshot_done: split.snapshot_done(),
parser_config,
source_ctx,
rx,
}),
CdcSourceType::Citus => Ok(Self {
source_id: split.split_id() as u64,
start_offset: split.start_offset().clone(),
server_addr: split.server_addr(),
conn_props,
split_id,
snapshot_done: split.snapshot_done(),
parser_config,
source_ctx,
rx,
}),
CdcSourceType::Unspecified => {
unreachable!();
}
}
}

fn into_stream(self) -> BoxSourceWithStateStream {
let parser_config = self.parser_config.clone();
let source_context = self.source_ctx.clone();
into_chunk_stream(self, parser_config, source_context)
}
}

impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
#[try_stream(ok = Vec<SourceMessage>, error = anyhow::Error)]
async fn into_data_stream(self) {
let source_type = T::source_type();
let mut rx = self.rx;
let source_id = self.source_id.to_string();
let metrics = self.source_ctx.metrics.clone();

while let Some(result) = rx.recv().await {
let GetEventStreamResponse { events, .. } = result?;
tracing::trace!("receive events {:?}", events.len());
self.source_ctx
.metrics
metrics
.connector_source_rows_received
.with_label_values(&[&source_type, &source_id])
.with_label_values(&[source_type.as_str_name(), &source_id])
.inc_by(events.len() as u64);
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
Expand Down
7 changes: 4 additions & 3 deletions src/connector/src/source/cdc/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,11 @@ impl<T: CdcSourceTypeTrait> DebeziumCdcSplit<T> {
unreachable!("invalid debezium split")
}

pub fn server_addr(&self) -> &Option<String> {
pub fn server_addr(&self) -> Option<String> {
if let Some(split) = &self.pg_split {
return &split.server_addr;
split.server_addr.clone()
} else {
None
}
unreachable!("invalid debezium split")
}
}
9 changes: 8 additions & 1 deletion src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,14 @@ impl JavaVmWrapper {
.option("-Dis_embedded_connector=true")
.option(format!("-Djava.class.path={}", class_vec.join(":")))
.option("-Xms16m")
.option(format!("-Xmx{}", jvm_heap_size));
.option(format!("-Xmx{}", jvm_heap_size))
// Quoted from the debezium document:
// > Your application should always properly stop the engine to ensure graceful and complete
// > shutdown and that each source record is sent to the application exactly one time.
// In RisingWave we assume the upstream changelog may contain duplicate events and
// handle conflicts in the mview operator, thus we don't need to obey the above
// instructions. So we decrease the wait time here to reclaim jvm thread faster.
.option("-Ddebezium.embedded.shutdown.pause.before.interrupt.ms=1");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw can you share more about the param?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


tracing::info!("JVM args: {:?}", args_builder);
let jvm_args = args_builder
Expand Down
11 changes: 1 addition & 10 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,7 @@ impl ConnectorSource {
// TODO: is this reader split across multiple threads...? Realistically, we want
// source_ctx to live in a single actor.
let source_ctx = source_ctx.clone();
async move {
create_split_reader(
*props,
splits,
parser_config,
source_ctx,
data_gen_columns,
)
.await
}
create_split_reader(*props, splits, parser_config, source_ctx, data_gen_columns)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no await here? I thought it is about to wait till recv handshake resp from embedded dbz

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We has await on the outer try_join_all

}))
.await?
};
Expand Down
Loading