From fec603734ad223a589c780fa2dd29afe6185fc2c Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 15 Apr 2024 13:49:03 +0800 Subject: [PATCH] cherry pick #16058 --- .../connector/source/core/DbzCdcEngine.java | 17 +-- .../source/core/DbzCdcEngineRunner.java | 18 +-- ...sumer.java => DbzChangeEventConsumer.java} | 74 ++++++++++-- .../source/core/DbzSourceHandler.java | 7 +- .../source/core/JniDbzSourceHandler.java | 51 +++++++-- .../source/core/JniDbzSourceRegistry.java | 35 ++++++ .../EmbeddedEngineChangeEventProxy.java | 27 +++++ src/connector/src/lib.rs | 1 + src/connector/src/source/base.rs | 20 ++++ src/connector/src/source/cdc/jni_source.rs | 39 +++++++ src/connector/src/source/cdc/mod.rs | 2 + src/connector/src/source/cdc/source/reader.rs | 14 +-- src/connector/src/source/reader/reader.rs | 11 ++ src/stream/src/common/table/state_table.rs | 8 ++ .../src/executor/source/fetch_executor.rs | 12 +- .../src/executor/source/fs_source_executor.rs | 4 +- .../src/executor/source/source_executor.rs | 107 +++++++++++++++++- .../executor/source/state_table_handler.rs | 26 ++--- 18 files changed, 398 insertions(+), 75 deletions(-) rename java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/{DbzCdcEventConsumer.java => DbzChangeEventConsumer.java} (76%) create mode 100644 java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceRegistry.java create mode 100644 java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEventProxy.java create mode 100644 src/connector/src/source/cdc/jni_source.rs diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java index 61d1f6284a67f..b515ce8bd79b6 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java @@ -17,7 +17,6 @@ import static io.debezium.config.CommonConnectorConfig.TOPIC_PREFIX; import static io.debezium.schema.AbstractTopicNamingStrategy.*; -import com.risingwave.connector.api.source.CdcEngine; import com.risingwave.proto.ConnectorServiceProto; import io.debezium.embedded.Connect; import io.debezium.engine.DebeziumEngine; @@ -25,11 +24,11 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -public class DbzCdcEngine implements CdcEngine { +public class DbzCdcEngine implements Runnable { static final int DEFAULT_QUEUE_CAPACITY = 16; private final DebeziumEngine engine; - private final DbzCdcEventConsumer consumer; + private final DbzChangeEventConsumer changeEventConsumer; private final long id; /** If config is not valid will throw exceptions */ @@ -41,7 +40,7 @@ public DbzCdcEngine( var topicPrefix = config.getProperty(TOPIC_PREFIX.name()); var transactionTopic = String.format("%s.%s", topicPrefix, DEFAULT_TRANSACTION_TOPIC); var consumer = - new DbzCdcEventConsumer( + new DbzChangeEventConsumer( sourceId, heartbeatTopicPrefix, transactionTopic, @@ -49,7 +48,7 @@ public DbzCdcEngine( // Builds a debezium engine but not start it this.id = sourceId; - this.consumer = consumer; + this.changeEventConsumer = consumer; this.engine = DebeziumEngine.create(Connect.class) .using(config) @@ -64,7 +63,6 @@ public void run() { engine.run(); } - @Override public long getId() { return id; } @@ -73,8 +71,11 @@ public void stop() throws Exception { engine.close(); } - @Override public BlockingQueue getOutputChannel() { - return consumer.getOutputChannel(); + return changeEventConsumer.getOutputChannel(); + } + + public DbzChangeEventConsumer getChangeEventConsumer() { + return changeEventConsumer; } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java index 2f2a9408e7e06..b223d0dfba142 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java @@ -28,15 +28,15 @@ import org.slf4j.LoggerFactory; /** Single-thread engine runner */ -public class DbzCdcEngineRunner implements CdcEngineRunner { +public class DbzCdcEngineRunner { static final Logger LOG = LoggerFactory.getLogger(DbzCdcEngineRunner.class); private final ExecutorService executor; private final AtomicBoolean running = new AtomicBoolean(false); - private CdcEngine engine; + private DbzCdcEngine engine; private final DbzConnectorConfig config; - public static CdcEngineRunner newCdcEngineRunner( + public static DbzCdcEngineRunner newCdcEngineRunner( DbzConnectorConfig config, StreamObserver responseObserver) { DbzCdcEngineRunner runner = null; try { @@ -69,7 +69,7 @@ public static CdcEngineRunner newCdcEngineRunner( return runner; } - public static CdcEngineRunner create(DbzConnectorConfig config, long channelPtr) { + public static DbzCdcEngineRunner create(DbzConnectorConfig config, long channelPtr) { DbzCdcEngineRunner runner = new DbzCdcEngineRunner(config); try { var sourceId = config.getSourceId(); @@ -123,7 +123,7 @@ private DbzCdcEngineRunner(DbzConnectorConfig config) { this.config = config; } - private void withEngine(CdcEngine engine) { + private void withEngine(DbzCdcEngine engine) { this.engine = engine; } @@ -160,16 +160,18 @@ public void stop() throws Exception { } } - @Override - public CdcEngine getEngine() { + public DbzCdcEngine getEngine() { return engine; } - @Override public boolean isRunning() { return running.get(); } + public DbzChangeEventConsumer getChangeEventConsumer() { + return engine.getChangeEventConsumer(); + } + private void cleanUp() { running.set(false); // interrupt the runner thread if it is still running diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java similarity index 76% rename from java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java rename to java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java index bcc532038c9c6..b6d030537c105 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java @@ -18,13 +18,17 @@ import com.risingwave.connector.cdc.debezium.internal.DebeziumOffsetSerializer; import com.risingwave.proto.ConnectorServiceProto.CdcMessage; import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse; +import io.debezium.connector.postgresql.PostgresOffsetContext; +import io.debezium.embedded.EmbeddedEngineChangeEventProxy; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.json.JsonConverterConfig; @@ -40,9 +44,9 @@ enum EventType { DATA, } -public class DbzCdcEventConsumer +public class DbzChangeEventConsumer implements DebeziumEngine.ChangeConsumer> { - static final Logger LOG = LoggerFactory.getLogger(DbzCdcEventConsumer.class); + static final Logger LOG = LoggerFactory.getLogger(DbzChangeEventConsumer.class); private final BlockingQueue outputChannel; private final long sourceId; @@ -51,7 +55,10 @@ public class DbzCdcEventConsumer private final String heartbeatTopicPrefix; private final String transactionTopic; - DbzCdcEventConsumer( + private volatile DebeziumEngine.RecordCommitter> + currentRecordCommitter; + + DbzChangeEventConsumer( long sourceId, String heartbeatTopicPrefix, String transactionTopic, @@ -108,6 +115,7 @@ public void handleBatch( DebeziumEngine.RecordCommitter> committer) throws InterruptedException { var respBuilder = GetEventStreamResponse.newBuilder(); + currentRecordCommitter = committer; for (ChangeEvent event : events) { var record = event.value(); EventType eventType = getEventType(record); @@ -199,9 +207,6 @@ var record = event.value(); default: break; } - - // mark the event as processed - committer.markProcessed(event); } LOG.debug("recv {} events", respBuilder.getEventsCount()); @@ -211,16 +216,61 @@ var record = event.value(); var response = respBuilder.build(); outputChannel.put(response); } + } - committer.markBatchFinished(); + public BlockingQueue getOutputChannel() { + return this.outputChannel; } - @Override - public boolean supportsTombstoneEvents() { - return DebeziumEngine.ChangeConsumer.super.supportsTombstoneEvents(); + /** + * Commit the offset to the Debezium engine. NOTES: The input offset is passed from the source + * executor to here + * + * @param offset persisted offset in the Source state table + */ + @SuppressWarnings("unchecked") + public void commitOffset(DebeziumOffset offset) throws InterruptedException { + // Although the committer is read/write by multi-thread, the committer will be not changed + // frequently. + if (currentRecordCommitter == null) { + LOG.info( + "commitOffset() called on Debezium change consumer which doesn't receive records yet."); + return; + } + + // only the offset is used + SourceRecord recordWrapper = + new SourceRecord( + offset.sourcePartition, + adjustSourceOffset((Map) offset.sourceOffset), + "DUMMY", + Schema.BOOLEAN_SCHEMA, + true); + ChangeEvent changeEvent = + EmbeddedEngineChangeEventProxy.create(null, recordWrapper, recordWrapper); + currentRecordCommitter.markProcessed(changeEvent); + currentRecordCommitter.markBatchFinished(); } - public BlockingQueue getOutputChannel() { - return this.outputChannel; + /** + * We have to adjust type of LSN values to Long, because it might be Integer after + * deserialization, however {@link + * io.debezium.connector.postgresql.PostgresStreamingChangeEventSource#commitOffset(Map, Map)} + * requires Long. + */ + private Map adjustSourceOffset(Map sourceOffset) { + if (sourceOffset.containsKey(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)) { + String value = + sourceOffset + .get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY) + .toString(); + sourceOffset.put( + PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.parseLong(value)); + } + if (sourceOffset.containsKey(PostgresOffsetContext.LAST_COMMIT_LSN_KEY)) { + String value = sourceOffset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY).toString(); + sourceOffset.put(PostgresOffsetContext.LAST_COMMIT_LSN_KEY, Long.parseLong(value)); + } + return sourceOffset; } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzSourceHandler.java index e5da5a3680a2b..76aff296bf359 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzSourceHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzSourceHandler.java @@ -14,7 +14,6 @@ package com.risingwave.connector.source.core; -import com.risingwave.connector.api.source.CdcEngineRunner; import com.risingwave.connector.api.source.SourceHandler; import com.risingwave.connector.source.common.DbzConnectorConfig; import com.risingwave.metrics.ConnectorNodeMetrics; @@ -25,7 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** * handler for starting a debezium source connectors */ +/** handler for starting a debezium source connectors */ public class DbzSourceHandler implements SourceHandler { static final Logger LOG = LoggerFactory.getLogger(DbzSourceHandler.class); @@ -36,11 +35,11 @@ public DbzSourceHandler(DbzConnectorConfig config) { } class OnReadyHandler implements Runnable { - private final CdcEngineRunner runner; + private final DbzCdcEngineRunner runner; private final ServerCallStreamObserver responseObserver; public OnReadyHandler( - CdcEngineRunner runner, + DbzCdcEngineRunner runner, ServerCallStreamObserver responseObserver) { this.runner = runner; this.responseObserver = responseObserver; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java index 949ccd403edc9..30092195f40f2 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java @@ -16,28 +16,40 @@ import static com.risingwave.proto.ConnectorServiceProto.SourceType.POSTGRES; -import com.risingwave.connector.api.source.CdcEngineRunner; import com.risingwave.connector.api.source.SourceTypeE; +import com.risingwave.connector.cdc.debezium.internal.DebeziumOffset; +import com.risingwave.connector.cdc.debezium.internal.DebeziumOffsetSerializer; +import com.risingwave.connector.source.common.CdcConnectorException; import com.risingwave.connector.source.common.DbzConnectorConfig; import com.risingwave.connector.source.common.DbzSourceUtils; import com.risingwave.java.binding.Binding; import com.risingwave.metrics.ConnectorNodeMetrics; import com.risingwave.proto.ConnectorServiceProto; import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** handler for starting a debezium source connectors for jni */ + /** handler for starting a debezium source connectors for jni */ public class JniDbzSourceHandler { static final Logger LOG = LoggerFactory.getLogger(JniDbzSourceHandler.class); private final DbzConnectorConfig config; + private final DbzCdcEngineRunner runner; - public JniDbzSourceHandler(DbzConnectorConfig config) { + public JniDbzSourceHandler(DbzConnectorConfig config, long channelPtr) { this.config = config; + this.runner = DbzCdcEngineRunner.create(config, channelPtr); + + if (runner == null) { + throw new CdcConnectorException("Failed to create engine runner"); + } } public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long channelPtr) @@ -66,15 +78,31 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long mutableUserProps, request.getSnapshotDone(), isCdcSourceJob); - JniDbzSourceHandler handler = new JniDbzSourceHandler(config); + JniDbzSourceHandler handler = new JniDbzSourceHandler(config, channelPtr); + // register handler to the registry + JniDbzSourceRegistry.register(config.getSourceId(), handler); handler.start(channelPtr); } - public void start(long channelPtr) { - var runner = DbzCdcEngineRunner.create(config, channelPtr); - if (runner == null) { - return; + public void commitOffset(String encodedOffset) throws InterruptedException { + try { + DebeziumOffset offset = + DebeziumOffsetSerializer.INSTANCE.deserialize( + encodedOffset.getBytes(StandardCharsets.UTF_8)); + var changeEventConsumer = runner.getChangeEventConsumer(); + if (changeEventConsumer != null) { + changeEventConsumer.commitOffset(offset); + LOG.info("Engine#{}: committed offset {}", config.getSourceId(), offset); + } else { + LOG.warn("Engine#{}: changeEventConsumer is null", config.getSourceId()); + } + } catch (IOException err) { + LOG.error("Engine#{}: fail to commit offset.", config.getSourceId(), err); + throw new CdcConnectorException(err.getMessage()); } + } + + public void start(long channelPtr) { try { // Start the engine @@ -83,6 +111,8 @@ public void start(long channelPtr) { LOG.error( "Failed to send handshake message to channel. sourceId={}", config.getSourceId()); + // remove the handler from registry + JniDbzSourceRegistry.unregister(config.getSourceId()); return; } @@ -122,10 +152,13 @@ public void start(long channelPtr) { LOG.warn("Failed to stop Engine#{}", config.getSourceId(), e); } } + + // remove the handler from registry + JniDbzSourceRegistry.unregister(config.getSourceId()); } - private boolean sendHandshakeMessage(CdcEngineRunner runner, long channelPtr, boolean startOk) - throws Exception { + private boolean sendHandshakeMessage( + DbzCdcEngineRunner runner, long channelPtr, boolean startOk) throws Exception { // send a handshake message to notify the Source executor // if the handshake is not ok, the split reader will return error to source actor var controlInfo = diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceRegistry.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceRegistry.java new file mode 100644 index 0000000000000..97eca6003b72c --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceRegistry.java @@ -0,0 +1,35 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.risingwave.connector.source.core; + +import java.util.concurrent.ConcurrentHashMap; + +/** Global registry for all JNI Debezium source handlers. */ +public class JniDbzSourceRegistry { + private static final ConcurrentHashMap sourceHandlers = + new ConcurrentHashMap<>(); + + public static void register(long sourceId, JniDbzSourceHandler handler) { + sourceHandlers.put(sourceId, handler); + } + + public static JniDbzSourceHandler getSourceHandler(long sourceId) { + return sourceHandlers.get(sourceId); + } + + public static void unregister(long sourceId) { + sourceHandlers.remove(sourceId); + } +} diff --git a/java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEventProxy.java b/java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEventProxy.java new file mode 100644 index 0000000000000..d76075a91e7e4 --- /dev/null +++ b/java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEventProxy.java @@ -0,0 +1,27 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.debezium.embedded; + +import io.debezium.engine.ChangeEvent; +import java.util.Collections; +import org.apache.kafka.connect.source.SourceRecord; + +/** Act as a proxy to the package-private class EmbeddedEngineChangeEvent */ +public class EmbeddedEngineChangeEventProxy { + public static ChangeEvent create( + SourceRecord key, SourceRecord value, SourceRecord sourceRecord) { + return new EmbeddedEngineChangeEvent<>(key, value, Collections.emptyList(), sourceRecord); + } +} diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index fb38f2db00c4f..bd201785a8f15 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -56,6 +56,7 @@ pub mod common; pub mod mqtt_common; pub use paste::paste; +pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime}; mod with_options; pub use with_options::WithPropertiesExt; diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 52724b1707660..a95db3541616e 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -47,6 +47,7 @@ use crate::parser::ParserConfig; pub(crate) use crate::source::common::CommonSplitReader; use crate::source::filesystem::FsPageItem; use crate::source::monitor::EnumeratorMetrics; +use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc}; use crate::with_options::WithOptions; use crate::{ dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties, @@ -444,6 +445,24 @@ impl SplitImpl { |other| bail!("connector '{}' is not supported", other) ) } + + pub fn is_cdc_split(&self) -> bool { + matches!( + self, + MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_) + ) + } + + /// Get the current split offset. + pub fn get_cdc_split_offset(&self) -> String { + match self { + MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(), + PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(), + MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(), + CitusCdc(split) => split.start_offset().clone().unwrap_or_default(), + _ => unreachable!("get_cdc_split_offset() is only for cdc split"), + } + } } impl SplitMetaData for SplitImpl { @@ -558,6 +577,7 @@ pub trait SplitMetaData: Sized { Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap()) } + /// Encode the whole split metadata to a JSON object fn encode_to_json(&self) -> JsonbVal; fn restore_from_json(value: JsonbVal) -> Result; fn update_with_offset(&mut self, start_offset: String) -> crate::error::ConnectorResult<()>; diff --git a/src/connector/src/source/cdc/jni_source.rs b/src/connector/src/source/cdc/jni_source.rs new file mode 100644 index 0000000000000..406756d0f84fe --- /dev/null +++ b/src/connector/src/source/cdc/jni_source.rs @@ -0,0 +1,39 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Context; +use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, JVM}; +use risingwave_jni_core::{call_method, call_static_method}; + +pub fn commit_cdc_offset(source_id: u64, encoded_offset: String) -> anyhow::Result<()> { + let jvm = JVM.get_or_init()?; + execute_with_jni_env(jvm, |env| { + // get source handler by source id + let handler = call_static_method!( + env, + {com.risingwave.connector.source.core.JniDbzSourceRegistry}, + {com.risingwave.connector.source.core.JniDbzSourceHandler getSourceHandler(long sourceId)}, + source_id + )?; + + let offset_str = env.new_string(&encoded_offset).with_context(|| { + format!("Failed to create jni string from source offset: {encoded_offset}.") + })?; + // commit offset to upstream + call_method!(env, handler, {void commitOffset(String)}, &offset_str).with_context( + || format!("Failed to commit offset to upstream for source: {source_id}."), + )?; + Ok(()) + }) +} diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index 02e94dd337bd6..6ecdf19a78333 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -14,8 +14,10 @@ pub mod enumerator; pub mod external; +pub mod jni_source; pub mod source; pub mod split; + use std::collections::HashMap; use std::marker::PhantomData; diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 029be2a6e30ea..cf2b5c3d17e00 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -97,7 +97,6 @@ impl SplitReader for CdcSplitReader { let (mut tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let jvm = JVM.get_or_init()?; - let get_event_stream_request = GetEventStreamRequest { source_id, source_type: source_type as _, @@ -158,8 +157,8 @@ impl SplitReader for CdcSplitReader { } tracing::info!(?source_id, "cdc connector started"); - match T::source_type() { - CdcSourceType::Mysql | CdcSourceType::Postgres | CdcSourceType::Mongodb => Ok(Self { + let instance = match T::source_type() { + CdcSourceType::Mysql | CdcSourceType::Postgres | CdcSourceType::Mongodb => Self { source_id: split.split_id() as u64, start_offset: split.start_offset().clone(), server_addr: None, @@ -169,8 +168,8 @@ impl SplitReader for CdcSplitReader { parser_config, source_ctx, rx, - }), - CdcSourceType::Citus => Ok(Self { + }, + CdcSourceType::Citus => Self { source_id: split.split_id() as u64, start_offset: split.start_offset().clone(), server_addr: citus_server_addr, @@ -180,11 +179,12 @@ impl SplitReader for CdcSplitReader { parser_config, source_ctx, rx, - }), + }, CdcSourceType::Unspecified => { unreachable!(); } - } + }; + Ok(instance) } fn into_stream(self) -> BoxChunkSourceStream { diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index 5cfd10835998a..e8c20f66dedc4 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -105,6 +105,17 @@ impl SourceReader { } } + /// Postgres and Oracle connectors need to commit the offset to upstream. + pub fn need_commit_offset_to_upstream(&self) -> bool { + matches!( + &self.config, + ConnectorProperties::PostgresCdc(_) + | ConnectorProperties::MysqlCdc(_) + | ConnectorProperties::MongodbCdc(_) + | ConnectorProperties::CitusCdc(_) + ) + } + pub async fn to_stream( &self, state: ConnectorState, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 730e94c5345da..a43b527b0ee8a 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -95,6 +95,8 @@ pub struct StateTableInner< /// Id for this table. table_id: TableId, + store: S, + /// State store backend. local_store: S::Local, @@ -198,6 +200,10 @@ where .expect("non-replicated state store should start immediately.") .expect("non-replicated state store should not wait_for_epoch, and fail because of it.") } + + pub fn state_store(&self) -> &S { + &self.store + } } fn consistent_old_value_op(row_serde: impl ValueRowSerde) -> OpConsistencyLevel { @@ -421,6 +427,7 @@ where Self { table_id, + store, local_store: local_state_store, pk_serde, row_serde, @@ -602,6 +609,7 @@ where }; Self { table_id, + store, local_store: local_state_store, pk_serde, row_serde, diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index ce571c47f5403..9b57d1c1ee680 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -97,9 +97,9 @@ impl FsFetchExecutor { stream: &mut StreamReaderWithPause, ) -> StreamExecutorResult<()> { let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE); - 'vnodes: for vnode in state_store_handler.state_store.vnodes().iter_vnodes() { + 'vnodes: for vnode in state_store_handler.state_table.vnodes().iter_vnodes() { let table_iter = state_store_handler - .state_store + .state_table .iter_with_vnode( vnode, &(Bound::::Unbounded, Bound::::Unbounded), @@ -220,7 +220,7 @@ impl FsFetchExecutor { // Hence we try building a reader first. Self::replace_with_new_batch_reader( &mut splits_on_fetch, - &state_store_handler, + &state_store_handler, // move into the function core.column_ids.clone(), self.build_source_ctx(&source_desc, core.source_id, &core.source_name), &source_desc, @@ -251,7 +251,7 @@ impl FsFetchExecutor { } state_store_handler - .state_store + .state_table .commit(barrier.epoch) .await?; @@ -261,7 +261,7 @@ impl FsFetchExecutor { // if _cache_may_stale, we must rebuild the stream to adjust vnode mappings let (_prev_vnode_bitmap, cache_may_stale) = state_store_handler - .state_store + .state_table .update_vnode_bitmap(vnode_bitmap); if cache_may_stale { @@ -305,7 +305,7 @@ impl FsFetchExecutor { }) .collect(); state_store_handler.set_states(file_assignment).await?; - state_store_handler.state_store.try_flush().await?; + state_store_handler.state_table.try_flush().await?; } _ => unreachable!(), } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 94576d6a4c459..848c03f4feb9b 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -244,7 +244,7 @@ impl FsSourceExecutor { core.split_state_store.set_all_complete(completed).await? } // commit anyway, even if no message saved - core.split_state_store.state_store.commit(epoch).await?; + core.split_state_store.state_table.commit(epoch).await?; core.updated_splits_in_epoch.clear(); Ok(()) @@ -253,7 +253,7 @@ impl FsSourceExecutor { async fn try_flush_data(&mut self) -> StreamExecutorResult<()> { let core = &mut self.stream_source_core; - core.split_state_store.state_store.try_flush().await?; + core.split_state_store.state_table.try_flush().await?; Ok(()) } diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 36358bdcd372e..0fbc3d2b84b17 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::fmt::Formatter; +use std::str::FromStr; use std::time::Duration; use anyhow::anyhow; @@ -22,14 +23,17 @@ use futures_async_stream::try_stream; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; +use risingwave_connector::source::cdc::jni_source; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::{ BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitMetaData, }; use risingwave_connector::ConnectorParams; +use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_storage::StateStore; use thiserror_ext::AsReport; -use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::time::Instant; use super::executor_core::StreamSourceCore; @@ -85,6 +89,18 @@ impl SourceExecutor { } } + pub fn spawn_wait_epoch_worker( + core: &StreamSourceCore, + ) -> UnboundedSender<(Epoch, HashMap)> { + let (wait_epoch_tx, wait_epoch_rx) = mpsc::unbounded_channel(); + let wait_epoch_worker = WaitEpochWorker { + wait_epoch_rx, + state_store: core.split_state_store.state_table.state_store().clone(), + }; + tokio::spawn(wait_epoch_worker.run()); + wait_epoch_tx + } + pub async fn build_stream_source_reader( &self, source_desc: &SourceDesc, @@ -305,7 +321,7 @@ impl SourceExecutor { async fn persist_state_and_clear_cache( &mut self, epoch: EpochPair, - ) -> StreamExecutorResult<()> { + ) -> StreamExecutorResult> { let core = self.stream_source_core.as_mut().unwrap(); let cache = core @@ -320,16 +336,19 @@ impl SourceExecutor { } // commit anyway, even if no message saved - core.split_state_store.state_store.commit(epoch).await?; + core.split_state_store.state_table.commit(epoch).await?; + + let updated_splits = core.updated_splits_in_epoch.clone(); + core.updated_splits_in_epoch.clear(); - Ok(()) + Ok(updated_splits) } /// try mem table spill async fn try_flush_data(&mut self) -> StreamExecutorResult<()> { let core = self.stream_source_core.as_mut().unwrap(); - core.split_state_store.state_store.try_flush().await?; + core.split_state_store.state_table.try_flush().await?; Ok(()) } @@ -361,6 +380,12 @@ impl SourceExecutor { .build() .map_err(StreamExecutorError::connector_error)?; + let wait_epoch_tx = if source_desc.source.need_commit_offset_to_upstream() { + Some(Self::spawn_wait_epoch_worker(&core)) + } else { + None + }; + let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns) else { unreachable!("Partition and offset columns must be set."); @@ -487,7 +512,24 @@ impl SourceExecutor { } } - self.persist_state_and_clear_cache(epoch).await?; + let updated_splits = self.persist_state_and_clear_cache(epoch).await?; + + // when handle a checkpoint barrier, spawn a task to wait for epoch commit notification + if barrier.kind.is_checkpoint() + && !updated_splits.is_empty() + && let Some(ref tx) = wait_epoch_tx + { + let mut updated_offsets = HashMap::new(); + for (split_id, split_impl) in updated_splits { + if split_impl.is_cdc_split() { + updated_offsets.insert(split_id, split_impl.get_cdc_split_offset()); + } + } + + tracing::debug!("epoch to wait {:?}", epoch); + tx.send((Epoch(epoch.prev), updated_offsets)) + .expect("wait_epoch_tx send success"); + } yield Message::Barrier(barrier); } @@ -618,6 +660,59 @@ impl Debug for SourceExecutor { } } +struct WaitEpochWorker { + wait_epoch_rx: UnboundedReceiver<(Epoch, HashMap)>, + state_store: S, +} + +impl WaitEpochWorker { + pub async fn run(mut self) { + tracing::debug!("wait epoch worker start success"); + loop { + // poll the rx and wait for the epoch commit + match self.wait_epoch_rx.recv().await { + Some((epoch, updated_offsets)) => { + tracing::debug!("start to wait epoch {}", epoch.0); + let ret = self + .state_store + .try_wait_epoch(HummockReadEpoch::Committed(epoch.0)) + .await; + + match ret { + Ok(()) => { + tracing::debug!(epoch = epoch.0, "wait epoch success"); + // cdc source only has one split + assert_eq!(1, updated_offsets.len()); + let (split_id, offset) = updated_offsets.into_iter().next().unwrap(); + let source_id: u64 = u64::from_str(split_id.as_ref()).unwrap(); + // notify cdc connector to commit offset + match jni_source::commit_cdc_offset(source_id, offset.clone()) { + Ok(_) => {} + Err(e) => { + tracing::error!( + error = %e.as_report(), + "source#{source_id}: failed to commit cdc offset: {offset}.", + ) + } + } + } + Err(e) => { + tracing::error!( + error = %e.as_report(), + "wait epoch {} failed", epoch.0 + ); + } + } + } + None => { + tracing::error!("wait epoch rx closed"); + break; + } + } + } + } +} + #[cfg(test)] mod tests { use std::time::Duration; diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index c9d967ca56c8f..fad1f2f8f25d6 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -47,13 +47,13 @@ use crate::executor::StreamExecutorResult; const COMPLETE_SPLIT_PREFIX: &str = "SsGLdzRDqBuKzMf9bDap"; pub struct SourceStateTableHandler { - pub state_store: StateTable, + pub state_table: StateTable, } impl SourceStateTableHandler { pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self { Self { - state_store: StateTable::from_table_catalog(table_catalog, store, None).await, + state_table: StateTable::from_table_catalog(table_catalog, store, None).await, } } @@ -63,12 +63,12 @@ impl SourceStateTableHandler { vnodes: Option>, ) -> Self { Self { - state_store: StateTable::from_table_catalog(table_catalog, store, vnodes).await, + state_table: StateTable::from_table_catalog(table_catalog, store, vnodes).await, } } pub fn init_epoch(&mut self, epoch: EpochPair) { - self.state_store.init_epoch(epoch); + self.state_table.init_epoch(epoch); } fn string_to_scalar(rhs: impl Into) -> ScalarImpl { @@ -76,7 +76,7 @@ impl SourceStateTableHandler { } pub(crate) async fn get(&self, key: SplitId) -> StreamExecutorResult> { - self.state_store + self.state_table .get_row(row::once(Some(Self::string_to_scalar(key.deref())))) .await .map_err(StreamExecutorError::from) @@ -94,7 +94,7 @@ impl SourceStateTableHandler { // all source executor has vnode id zero let iter = self - .state_store + .state_table .iter_with_vnode(VirtualNode::ZERO, &(start, end), PrefetchOptions::default()) .await?; @@ -126,9 +126,9 @@ impl SourceStateTableHandler { Some(ScalarImpl::Jsonb(value)), ]; if let Some(prev_row) = self.get(key).await? { - self.state_store.delete(prev_row); + self.state_table.delete(prev_row); } - self.state_store.insert(row); + self.state_table.insert(row); Ok(()) } @@ -157,10 +157,10 @@ impl SourceStateTableHandler { ]; match self.get(key).await? { Some(prev_row) => { - self.state_store.update(prev_row, row); + self.state_table.update(prev_row, row); } None => { - self.state_store.insert(row); + self.state_table.insert(row); } } Ok(()) @@ -168,7 +168,7 @@ impl SourceStateTableHandler { pub async fn delete(&mut self, key: SplitId) -> StreamExecutorResult<()> { if let Some(prev_row) = self.get(key).await? { - self.state_store.delete(prev_row); + self.state_table.delete(prev_row); } Ok(()) @@ -310,9 +310,9 @@ pub(crate) mod tests { state_table_handler .set_states(vec![split_impl.clone()]) .await?; - state_table_handler.state_store.commit(epoch_2).await?; + state_table_handler.state_table.commit(epoch_2).await?; - state_table_handler.state_store.commit(epoch_3).await?; + state_table_handler.state_table.commit(epoch_3).await?; match state_table_handler .try_recover_from_state_store(&split_impl)