diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java
index b515ce8bd79b6..61d1f6284a67f 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java
@@ -17,6 +17,7 @@
 import static io.debezium.config.CommonConnectorConfig.TOPIC_PREFIX;
 import static io.debezium.schema.AbstractTopicNamingStrategy.*;
 
+import com.risingwave.connector.api.source.CdcEngine;
 import com.risingwave.proto.ConnectorServiceProto;
 import io.debezium.embedded.Connect;
 import io.debezium.engine.DebeziumEngine;
@@ -24,11 +25,11 @@
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
-public class DbzCdcEngine implements Runnable {
+public class DbzCdcEngine implements CdcEngine {
     static final int DEFAULT_QUEUE_CAPACITY = 16;
 
     private final DebeziumEngine engine;
-    private final DbzChangeEventConsumer changeEventConsumer;
+    private final DbzCdcEventConsumer consumer;
     private final long id;
 
     /** If config is not valid will throw exceptions */
@@ -40,7 +41,7 @@ public DbzCdcEngine(
         var topicPrefix = config.getProperty(TOPIC_PREFIX.name());
         var transactionTopic = String.format("%s.%s", topicPrefix, DEFAULT_TRANSACTION_TOPIC);
         var consumer =
-                new DbzChangeEventConsumer(
+                new DbzCdcEventConsumer(
                         sourceId,
                         heartbeatTopicPrefix,
                         transactionTopic,
@@ -48,7 +49,7 @@ public DbzCdcEngine(
 
         // Builds a debezium engine but not start it
         this.id = sourceId;
-        this.changeEventConsumer = consumer;
+        this.consumer = consumer;
         this.engine =
                 DebeziumEngine.create(Connect.class)
                         .using(config)
@@ -63,6 +64,7 @@ public void run() {
         engine.run();
     }
 
+    @Override
     public long getId() {
         return id;
     }
@@ -71,11 +73,8 @@ public void stop() throws Exception {
         engine.close();
     }
 
+    @Override
     public BlockingQueue getOutputChannel() {
-        return changeEventConsumer.getOutputChannel();
-    }
-
-    public DbzChangeEventConsumer getChangeEventConsumer() {
-        return changeEventConsumer;
+        return consumer.getOutputChannel();
     }
 }
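For context, the `CdcEngine` abstraction that `DbzCdcEngine` now implements is imported from `com.risingwave.connector.api.source` but is not itself part of this diff. A minimal sketch of what that interface presumably looks like, inferred only from the `@Override` methods and call sites above (the `stop()` member in particular is an assumption):

```java
// Hypothetical reconstruction of com.risingwave.connector.api.source.CdcEngine,
// inferred from this diff rather than copied from the repository.
package com.risingwave.connector.api.source;

import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import java.util.concurrent.BlockingQueue;

public interface CdcEngine extends Runnable {
    // Identifier of the CDC source this engine serves.
    long getId();

    // Channel on which the event consumer publishes batches of CDC events.
    BlockingQueue<GetEventStreamResponse> getOutputChannel();

    // Assumed member: shut down the underlying Debezium engine.
    void stop() throws Exception;
}
```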
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
index b223d0dfba142..2f2a9408e7e06 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
@@ -28,15 +28,15 @@
 import org.slf4j.LoggerFactory;
 
 /** Single-thread engine runner */
-public class DbzCdcEngineRunner {
+public class DbzCdcEngineRunner implements CdcEngineRunner {
     static final Logger LOG = LoggerFactory.getLogger(DbzCdcEngineRunner.class);
 
     private final ExecutorService executor;
     private final AtomicBoolean running = new AtomicBoolean(false);
-    private DbzCdcEngine engine;
+    private CdcEngine engine;
     private final DbzConnectorConfig config;
 
-    public static DbzCdcEngineRunner newCdcEngineRunner(
+    public static CdcEngineRunner newCdcEngineRunner(
             DbzConnectorConfig config, StreamObserver responseObserver) {
         DbzCdcEngineRunner runner = null;
         try {
@@ -69,7 +69,7 @@ public static DbzCdcEngineRunner newCdcEngineRunner(
         return runner;
     }
 
-    public static DbzCdcEngineRunner create(DbzConnectorConfig config, long channelPtr) {
+    public static CdcEngineRunner create(DbzConnectorConfig config, long channelPtr) {
         DbzCdcEngineRunner runner = new DbzCdcEngineRunner(config);
         try {
             var sourceId = config.getSourceId();
@@ -123,7 +123,7 @@ private DbzCdcEngineRunner(DbzConnectorConfig config) {
         this.config = config;
     }
 
-    private void withEngine(DbzCdcEngine engine) {
+    private void withEngine(CdcEngine engine) {
         this.engine = engine;
     }
 
@@ -160,18 +160,16 @@ public void stop() throws Exception {
         }
     }
 
-    public DbzCdcEngine getEngine() {
+    @Override
+    public CdcEngine getEngine() {
         return engine;
     }
 
+    @Override
     public boolean isRunning() {
         return running.get();
     }
 
-    public DbzChangeEventConsumer getChangeEventConsumer() {
-        return engine.getChangeEventConsumer();
-    }
-
     private void cleanUp() {
         running.set(false);
         // interrupt the runner thread if it is still running
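Likewise, `CdcEngineRunner` replaces the concrete `DbzCdcEngineRunner` type at call sites below. A sketch of the assumed interface, again inferred from the overrides here and the `start()`/`stop()` calls in `JniDbzSourceHandler`, not taken from the actual `com.risingwave.connector.api.source` package:

```java
// Hypothetical reconstruction of com.risingwave.connector.api.source.CdcEngineRunner;
// the exact method set of the real interface may differ.
package com.risingwave.connector.api.source;

public interface CdcEngineRunner {
    // The engine instance this runner drives.
    CdcEngine getEngine();

    // True once the engine has been started and until it is stopped.
    boolean isRunning();

    // Assumed members: start the engine on the runner's executor / shut it down.
    void start() throws Exception;

    void stop() throws Exception;
}
```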
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java
similarity index 76%
rename from java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java
rename to java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java
index b6d030537c105..bcc532038c9c6 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java
@@ -18,17 +18,13 @@
 import com.risingwave.connector.cdc.debezium.internal.DebeziumOffsetSerializer;
 import com.risingwave.proto.ConnectorServiceProto.CdcMessage;
 import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
-import io.debezium.connector.postgresql.PostgresOffsetContext;
-import io.debezium.embedded.EmbeddedEngineChangeEventProxy;
 import io.debezium.engine.ChangeEvent;
 import io.debezium.engine.DebeziumEngine;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
@@ -44,9 +40,9 @@ enum EventType {
     DATA,
 }
 
-public class DbzChangeEventConsumer
+public class DbzCdcEventConsumer
         implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
-    static final Logger LOG = LoggerFactory.getLogger(DbzChangeEventConsumer.class);
+    static final Logger LOG = LoggerFactory.getLogger(DbzCdcEventConsumer.class);
 
     private final BlockingQueue outputChannel;
     private final long sourceId;
@@ -55,10 +51,7 @@ public class DbzChangeEventConsumer
     private final String heartbeatTopicPrefix;
     private final String transactionTopic;
 
-    private volatile DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>>
-            currentRecordCommitter;
-
-    DbzChangeEventConsumer(
+    DbzCdcEventConsumer(
             long sourceId,
             String heartbeatTopicPrefix,
             String transactionTopic,
@@ -115,7 +108,6 @@ public void handleBatch(
             DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
             throws InterruptedException {
         var respBuilder = GetEventStreamResponse.newBuilder();
-        currentRecordCommitter = committer;
         for (ChangeEvent<SourceRecord, SourceRecord> event : events) {
             var record = event.value();
             EventType eventType = getEventType(record);
@@ -207,6 +199,9 @@ var record = event.value();
                 default:
                     break;
             }
+
+            // mark the event as processed
+            committer.markProcessed(event);
         }
 
         LOG.debug("recv {} events", respBuilder.getEventsCount());
@@ -216,61 +211,16 @@ var record = event.value();
             var response = respBuilder.build();
             outputChannel.put(response);
         }
-    }
 
-    public BlockingQueue getOutputChannel() {
-        return this.outputChannel;
+        committer.markBatchFinished();
     }
 
-    /**
-     * Commit the offset to the Debezium engine. NOTES: The input offset is passed from the source
-     * executor to here
-     *
-     * @param offset persisted offset in the Source state table
-     */
-    @SuppressWarnings("unchecked")
-    public void commitOffset(DebeziumOffset offset) throws InterruptedException {
-        // Although the committer is read/write by multi-thread, the committer will be not changed
-        // frequently.
-        if (currentRecordCommitter == null) {
-            LOG.info(
-                    "commitOffset() called on Debezium change consumer which doesn't receive records yet.");
-            return;
-        }
-
-        // only the offset is used
-        SourceRecord recordWrapper =
-                new SourceRecord(
-                        offset.sourcePartition,
-                        adjustSourceOffset((Map<String, Object>) offset.sourceOffset),
-                        "DUMMY",
-                        Schema.BOOLEAN_SCHEMA,
-                        true);
-        ChangeEvent<SourceRecord, SourceRecord> changeEvent =
-                EmbeddedEngineChangeEventProxy.create(null, recordWrapper, recordWrapper);
-        currentRecordCommitter.markProcessed(changeEvent);
-        currentRecordCommitter.markBatchFinished();
+    @Override
+    public boolean supportsTombstoneEvents() {
+        return DebeziumEngine.ChangeConsumer.super.supportsTombstoneEvents();
     }
 
-    /**
-     * We have to adjust type of LSN values to Long, because it might be Integer after
-     * deserialization, however {@link
-     * io.debezium.connector.postgresql.PostgresStreamingChangeEventSource#commitOffset(Map, Map)}
-     * requires Long.
-     */
-    private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset) {
-        if (sourceOffset.containsKey(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)) {
-            String value =
-                    sourceOffset
-                            .get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)
-                            .toString();
-            sourceOffset.put(
-                    PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.parseLong(value));
-        }
-        if (sourceOffset.containsKey(PostgresOffsetContext.LAST_COMMIT_LSN_KEY)) {
-            String value = sourceOffset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY).toString();
-            sourceOffset.put(PostgresOffsetContext.LAST_COMMIT_LSN_KEY, Long.parseLong(value));
-        }
-        return sourceOffset;
+    public BlockingQueue getOutputChannel() {
+        return this.outputChannel;
     }
 }
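The core behavioral change is in `handleBatch`: instead of stashing the `RecordCommitter` in a field and later committing through a synthetic `EmbeddedEngineChangeEvent`, the consumer now acknowledges every event inline. A self-contained sketch of that commit pattern against the real Debezium embedded-engine API (`DebeziumEngine.ChangeConsumer`, `RecordCommitter#markProcessed`, `RecordCommitter#markBatchFinished`); the `String` payloads and queue size are simplified placeholders, not RisingWave code:

```java
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified stand-in for DbzCdcEventConsumer: same commit protocol,
// but with String payloads instead of SourceRecord-based events.
public class InlineCommitConsumer
        implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
    private final BlockingQueue<String> outputChannel = new ArrayBlockingQueue<>(16);

    @Override
    public void handleBatch(
            List<ChangeEvent<String, String>> events,
            DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer)
            throws InterruptedException {
        for (ChangeEvent<String, String> event : events) {
            if (event.value() != null) {
                outputChannel.put(event.value()); // hand the payload downstream
            }
            // Acknowledge each event so its offset becomes eligible for commit.
            committer.markProcessed(event);
        }
        // Mark the whole batch done; Debezium flushes offsets on its own cycle.
        committer.markBatchFinished();
    }
}
```

Letting Debezium drive the offset flush this way is what allows the PR to delete `commitOffset`, `adjustSourceOffset`, the `EmbeddedEngineChangeEventProxy` shim, and the JNI registry below.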
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzSourceHandler.java
index 76aff296bf359..e5da5a3680a2b 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzSourceHandler.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzSourceHandler.java
@@ -14,6 +14,7 @@
 
 package com.risingwave.connector.source.core;
 
+import com.risingwave.connector.api.source.CdcEngineRunner;
 import com.risingwave.connector.api.source.SourceHandler;
 import com.risingwave.connector.source.common.DbzConnectorConfig;
 import com.risingwave.metrics.ConnectorNodeMetrics;
@@ -24,7 +25,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** handler for starting a debezium source connectors */
+/** * handler for starting a debezium source connectors */
 public class DbzSourceHandler implements SourceHandler {
     static final Logger LOG = LoggerFactory.getLogger(DbzSourceHandler.class);
 
@@ -35,11 +36,11 @@ public DbzSourceHandler(DbzConnectorConfig config) {
     }
 
     class OnReadyHandler implements Runnable {
-        private final DbzCdcEngineRunner runner;
+        private final CdcEngineRunner runner;
         private final ServerCallStreamObserver responseObserver;
 
         public OnReadyHandler(
-                DbzCdcEngineRunner runner,
+                CdcEngineRunner runner,
                 ServerCallStreamObserver responseObserver) {
             this.runner = runner;
             this.responseObserver = responseObserver;
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
index 30092195f40f2..949ccd403edc9 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
@@ -16,40 +16,28 @@
 
 import static com.risingwave.proto.ConnectorServiceProto.SourceType.POSTGRES;
 
+import com.risingwave.connector.api.source.CdcEngineRunner;
 import com.risingwave.connector.api.source.SourceTypeE;
-import com.risingwave.connector.cdc.debezium.internal.DebeziumOffset;
-import com.risingwave.connector.cdc.debezium.internal.DebeziumOffsetSerializer;
-import com.risingwave.connector.source.common.CdcConnectorException;
 import com.risingwave.connector.source.common.DbzConnectorConfig;
 import com.risingwave.connector.source.common.DbzSourceUtils;
 import com.risingwave.java.binding.Binding;
 import com.risingwave.metrics.ConnectorNodeMetrics;
 import com.risingwave.proto.ConnectorServiceProto;
 import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** handler for starting a debezium source connectors for jni */
-
 /** handler for starting a debezium source connectors for jni */
 public class JniDbzSourceHandler {
     static final Logger LOG = LoggerFactory.getLogger(JniDbzSourceHandler.class);
 
     private final DbzConnectorConfig config;
-    private final DbzCdcEngineRunner runner;
 
-    public JniDbzSourceHandler(DbzConnectorConfig config, long channelPtr) {
+    public JniDbzSourceHandler(DbzConnectorConfig config) {
         this.config = config;
-        this.runner = DbzCdcEngineRunner.create(config, channelPtr);
-
-        if (runner == null) {
-            throw new CdcConnectorException("Failed to create engine runner");
-        }
     }
 
     public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long channelPtr)
@@ -78,31 +66,15 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long
                         mutableUserProps,
                         request.getSnapshotDone(),
                         isCdcSourceJob);
-        JniDbzSourceHandler handler = new JniDbzSourceHandler(config, channelPtr);
-        // register handler to the registry
-        JniDbzSourceRegistry.register(config.getSourceId(), handler);
+        JniDbzSourceHandler handler = new JniDbzSourceHandler(config);
         handler.start(channelPtr);
     }
 
-    public void commitOffset(String encodedOffset) throws InterruptedException {
-        try {
-            DebeziumOffset offset =
-                    DebeziumOffsetSerializer.INSTANCE.deserialize(
-                            encodedOffset.getBytes(StandardCharsets.UTF_8));
-            var changeEventConsumer = runner.getChangeEventConsumer();
-            if (changeEventConsumer != null) {
-                changeEventConsumer.commitOffset(offset);
-                LOG.info("Engine#{}: committed offset {}", config.getSourceId(), offset);
-            } else {
-                LOG.warn("Engine#{}: changeEventConsumer is null", config.getSourceId());
-            }
-        } catch (IOException err) {
-            LOG.error("Engine#{}: fail to commit offset.", config.getSourceId(), err);
-            throw new CdcConnectorException(err.getMessage());
-        }
-    }
-
     public void start(long channelPtr) {
+        var runner = DbzCdcEngineRunner.create(config, channelPtr);
+        if (runner == null) {
+            return;
+        }
         try {
             // Start the engine
@@ -111,8 +83,6 @@ public void start(long channelPtr) {
                 LOG.error(
                         "Failed to send handshake message to channel. sourceId={}",
                         config.getSourceId());
-                // remove the handler from registry
-                JniDbzSourceRegistry.unregister(config.getSourceId());
                 return;
             }
 
@@ -152,13 +122,10 @@ public void start(long channelPtr) {
                 LOG.warn("Failed to stop Engine#{}", config.getSourceId(), e);
             }
         }
-
-        // remove the handler from registry
-        JniDbzSourceRegistry.unregister(config.getSourceId());
     }
 
-    private boolean sendHandshakeMessage(
-            DbzCdcEngineRunner runner, long channelPtr, boolean startOk) throws Exception {
+    private boolean sendHandshakeMessage(CdcEngineRunner runner, long channelPtr, boolean startOk)
+            throws Exception {
         // send a handshake message to notify the Source executor
         // if the handshake is not ok, the split reader will return error to source actor
         var controlInfo =
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceRegistry.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceRegistry.java
deleted file mode 100644
index 97eca6003b72c..0000000000000
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceRegistry.java
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.risingwave.connector.source.core;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/** Global registry for all JNI Debezium source handlers. */
-public class JniDbzSourceRegistry {
-    private static final ConcurrentHashMap<Long, JniDbzSourceHandler> sourceHandlers =
-            new ConcurrentHashMap<>();
-
-    public static void register(long sourceId, JniDbzSourceHandler handler) {
-        sourceHandlers.put(sourceId, handler);
-    }
-
-    public static JniDbzSourceHandler getSourceHandler(long sourceId) {
-        return sourceHandlers.get(sourceId);
-    }
-
-    public static void unregister(long sourceId) {
-        sourceHandlers.remove(sourceId);
-    }
-}
diff --git a/java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEventProxy.java b/java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEventProxy.java
deleted file mode 100644
index d76075a91e7e4..0000000000000
--- a/java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEventProxy.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package io.debezium.embedded;
-
-import io.debezium.engine.ChangeEvent;
-import java.util.Collections;
-import org.apache.kafka.connect.source.SourceRecord;
-
-/** Act as a proxy to the package-private class EmbeddedEngineChangeEvent */
-public class EmbeddedEngineChangeEventProxy {
-    public static ChangeEvent<SourceRecord, SourceRecord> create(
-            SourceRecord key, SourceRecord value, SourceRecord sourceRecord) {
-        return new EmbeddedEngineChangeEvent<>(key, value, Collections.emptyList(), sourceRecord);
-    }
-}
diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs
index bd201785a8f15..fb38f2db00c4f 100644
--- a/src/connector/src/lib.rs
+++ b/src/connector/src/lib.rs
@@ -56,7 +56,6 @@ pub mod common;
 pub mod mqtt_common;
 
 pub use paste::paste;
-pub use risingwave_jni_core::{call_method, call_static_method, jvm_runtime};
 
 mod with_options;
 pub use with_options::WithPropertiesExt;
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index a95db3541616e..52724b1707660 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -47,7 +47,6 @@
 use crate::parser::ParserConfig;
 pub(crate) use crate::source::common::CommonSplitReader;
 use crate::source::filesystem::FsPageItem;
 use crate::source::monitor::EnumeratorMetrics;
-use crate::source::SplitImpl::{CitusCdc, MongodbCdc, MysqlCdc, PostgresCdc};
 use crate::with_options::WithOptions;
 use crate::{
     dispatch_source_prop, dispatch_split_impl, for_all_sources, impl_connector_properties,
@@ -445,24 +444,6 @@ impl SplitImpl {
             |other| bail!("connector '{}' is not supported", other)
         )
     }
-
-    pub fn is_cdc_split(&self) -> bool {
-        matches!(
-            self,
-            MysqlCdc(_) | PostgresCdc(_) | MongodbCdc(_) | CitusCdc(_)
-        )
-    }
-
-    /// Get the current split offset.
-    pub fn get_cdc_split_offset(&self) -> String {
-        match self {
-            MysqlCdc(split) => split.start_offset().clone().unwrap_or_default(),
-            PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
-            MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
-            CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
-            _ => unreachable!("get_cdc_split_offset() is only for cdc split"),
-        }
-    }
 }
 
 impl SplitMetaData for SplitImpl {
@@ -577,7 +558,6 @@ pub trait SplitMetaData: Sized {
         Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
     }
 
-    /// Encode the whole split metadata to a JSON object
     fn encode_to_json(&self) -> JsonbVal;
     fn restore_from_json(value: JsonbVal) -> Result<Self>;
     fn update_with_offset(&mut self, start_offset: String) -> crate::error::ConnectorResult<()>;
diff --git a/src/connector/src/source/cdc/jni_source.rs b/src/connector/src/source/cdc/jni_source.rs
deleted file mode 100644
index 406756d0f84fe..0000000000000
--- a/src/connector/src/source/cdc/jni_source.rs
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use anyhow::Context;
-use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, JVM};
-use risingwave_jni_core::{call_method, call_static_method};
-
-pub fn commit_cdc_offset(source_id: u64, encoded_offset: String) -> anyhow::Result<()> {
-    let jvm = JVM.get_or_init()?;
-    execute_with_jni_env(jvm, |env| {
-        // get source handler by source id
-        let handler = call_static_method!(
-            env,
-            {com.risingwave.connector.source.core.JniDbzSourceRegistry},
-            {com.risingwave.connector.source.core.JniDbzSourceHandler getSourceHandler(long sourceId)},
-            source_id
-        )?;
-
-        let offset_str = env.new_string(&encoded_offset).with_context(|| {
-            format!("Failed to create jni string from source offset: {encoded_offset}.")
-        })?;
-        // commit offset to upstream
-        call_method!(env, handler, {void commitOffset(String)}, &offset_str).with_context(
-            || format!("Failed to commit offset to upstream for source: {source_id}."),
-        )?;
-        Ok(())
-    })
-}
diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs
index 6ecdf19a78333..02e94dd337bd6 100644
--- a/src/connector/src/source/cdc/mod.rs
+++ b/src/connector/src/source/cdc/mod.rs
@@ -14,10 +14,8 @@
 
 pub mod enumerator;
 pub mod external;
-pub mod jni_source;
 pub mod source;
 pub mod split;
-
 use std::collections::HashMap;
 use std::marker::PhantomData;
diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs
index cf2b5c3d17e00..029be2a6e30ea 100644
--- a/src/connector/src/source/cdc/source/reader.rs
+++ b/src/connector/src/source/cdc/source/reader.rs
@@ -97,6 +97,7 @@ impl SplitReader for CdcSplitReader {
         let (mut tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
 
         let jvm = JVM.get_or_init()?;
+
         let get_event_stream_request = GetEventStreamRequest {
             source_id,
             source_type: source_type as _,
@@ -157,8 +158,8 @@ impl SplitReader for CdcSplitReader {
         }
 
         tracing::info!(?source_id, "cdc connector started");
-        let instance = match T::source_type() {
-            CdcSourceType::Mysql | CdcSourceType::Postgres | CdcSourceType::Mongodb => Self {
+        match T::source_type() {
+            CdcSourceType::Mysql | CdcSourceType::Postgres | CdcSourceType::Mongodb => Ok(Self {
                 source_id: split.split_id() as u64,
                 start_offset: split.start_offset().clone(),
                 server_addr: None,
@@ -168,8 +169,8 @@ impl SplitReader for CdcSplitReader {
                 parser_config,
                 source_ctx,
                 rx,
-            },
-            CdcSourceType::Citus => Self {
+            }),
+            CdcSourceType::Citus => Ok(Self {
                 source_id: split.split_id() as u64,
                 start_offset: split.start_offset().clone(),
                 server_addr: citus_server_addr,
@@ -179,12 +180,11 @@ impl SplitReader for CdcSplitReader {
                 parser_config,
                 source_ctx,
                 rx,
-            },
+            }),
             CdcSourceType::Unspecified => {
                 unreachable!();
             }
-        };
-        Ok(instance)
+        }
     }
 
     fn into_stream(self) -> BoxChunkSourceStream {
diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs
index e8c20f66dedc4..5cfd10835998a 100644
--- a/src/connector/src/source/reader/reader.rs
+++ b/src/connector/src/source/reader/reader.rs
@@ -105,17 +105,6 @@ impl SourceReader {
         }
     }
 
-    /// Postgres and Oracle connectors need to commit the offset to upstream.
-    pub fn need_commit_offset_to_upstream(&self) -> bool {
-        matches!(
-            &self.config,
-            ConnectorProperties::PostgresCdc(_)
-                | ConnectorProperties::MysqlCdc(_)
-                | ConnectorProperties::MongodbCdc(_)
-                | ConnectorProperties::CitusCdc(_)
-        )
-    }
-
     pub async fn to_stream(
         &self,
         state: ConnectorState,
diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs
index a43b527b0ee8a..730e94c5345da 100644
--- a/src/stream/src/common/table/state_table.rs
+++ b/src/stream/src/common/table/state_table.rs
@@ -95,8 +95,6 @@ pub struct StateTableInner<
     /// Id for this table.
     table_id: TableId,
 
-    store: S,
-
     /// State store backend.
     local_store: S::Local,
@@ -200,10 +198,6 @@ where
             .expect("non-replicated state store should start immediately.")
             .expect("non-replicated state store should not wait_for_epoch, and fail because of it.")
     }
-
-    pub fn state_store(&self) -> &S {
-        &self.store
-    }
 }
 
 fn consistent_old_value_op(row_serde: impl ValueRowSerde) -> OpConsistencyLevel {
@@ -427,7 +421,6 @@
 
         Self {
             table_id,
-            store,
             local_store: local_state_store,
             pk_serde,
             row_serde,
@@ -609,7 +602,6 @@
         };
         Self {
             table_id,
-            store,
             local_store: local_state_store,
             pk_serde,
             row_serde,
diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs
index 9b57d1c1ee680..ce571c47f5403 100644
--- a/src/stream/src/executor/source/fetch_executor.rs
+++ b/src/stream/src/executor/source/fetch_executor.rs
@@ -97,9 +97,9 @@ impl FsFetchExecutor {
         stream: &mut StreamReaderWithPause,
     ) -> StreamExecutorResult<()> {
         let mut batch = Vec::with_capacity(SPLIT_BATCH_SIZE);
-        'vnodes: for vnode in state_store_handler.state_table.vnodes().iter_vnodes() {
+        'vnodes: for vnode in state_store_handler.state_store.vnodes().iter_vnodes() {
             let table_iter = state_store_handler
-                .state_table
+                .state_store
                 .iter_with_vnode(
                     vnode,
                     &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
@@ -220,7 +220,7 @@
                 // Hence we try building a reader first.
                 Self::replace_with_new_batch_reader(
                     &mut splits_on_fetch,
-                    &state_store_handler, // move into the function
+                    &state_store_handler,
                     core.column_ids.clone(),
                     self.build_source_ctx(&source_desc, core.source_id, &core.source_name),
                     &source_desc,
@@ -251,7 +251,7 @@
                         }
 
                         state_store_handler
-                            .state_table
+                            .state_store
                            .commit(barrier.epoch)
                            .await?;
@@ -261,7 +261,7 @@
                        // if _cache_may_stale, we must rebuild the stream to adjust vnode mappings
                        let (_prev_vnode_bitmap, cache_may_stale) = state_store_handler
-                            .state_table
+                            .state_store
                            .update_vnode_bitmap(vnode_bitmap);
 
                        if cache_may_stale {
@@ -305,7 +305,7 @@
                            })
                            .collect();
                        state_store_handler.set_states(file_assignment).await?;
-                        state_store_handler.state_table.try_flush().await?;
+                        state_store_handler.state_store.try_flush().await?;
                    }
                    _ => unreachable!(),
                }
diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs
index 848c03f4feb9b..94576d6a4c459 100644
--- a/src/stream/src/executor/source/fs_source_executor.rs
+++ b/src/stream/src/executor/source/fs_source_executor.rs
@@ -244,7 +244,7 @@ impl FsSourceExecutor {
             core.split_state_store.set_all_complete(completed).await?
         }
         // commit anyway, even if no message saved
-        core.split_state_store.state_table.commit(epoch).await?;
+        core.split_state_store.state_store.commit(epoch).await?;
 
         core.updated_splits_in_epoch.clear();
         Ok(())
@@ -253,7 +253,7 @@ impl FsSourceExecutor {
     async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
         let core = &mut self.stream_source_core;
-        core.split_state_store.state_table.try_flush().await?;
+        core.split_state_store.state_store.try_flush().await?;
         Ok(())
     }
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 0fbc3d2b84b17..36358bdcd372e 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::fmt::Formatter;
-use std::str::FromStr;
 use std::time::Duration;
 
 use anyhow::anyhow;
@@ -23,17 +22,14 @@
 use futures_async_stream::try_stream;
 use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
 use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
 use risingwave_common::system_param::reader::SystemParamsRead;
-use risingwave_connector::source::cdc::jni_source;
 use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
 use risingwave_connector::source::{
     BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitMetaData,
 };
 use risingwave_connector::ConnectorParams;
-use risingwave_hummock_sdk::HummockReadEpoch;
 use risingwave_storage::StateStore;
 use thiserror_ext::AsReport;
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+use tokio::sync::mpsc::UnboundedReceiver;
 use tokio::time::Instant;
 
 use super::executor_core::StreamSourceCore;
@@ -89,18 +85,6 @@ impl SourceExecutor {
         }
     }
 
-    pub fn spawn_wait_epoch_worker(
-        core: &StreamSourceCore<S>,
-    ) -> UnboundedSender<(Epoch, HashMap<SplitId, String>)> {
-        let (wait_epoch_tx, wait_epoch_rx) = mpsc::unbounded_channel();
-        let wait_epoch_worker = WaitEpochWorker {
-            wait_epoch_rx,
-            state_store: core.split_state_store.state_table.state_store().clone(),
-        };
-        tokio::spawn(wait_epoch_worker.run());
-        wait_epoch_tx
-    }
-
     pub async fn build_stream_source_reader(
         &self,
         source_desc: &SourceDesc,
@@ -321,7 +305,7 @@ impl SourceExecutor {
     async fn persist_state_and_clear_cache(
         &mut self,
         epoch: EpochPair,
-    ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
+    ) -> StreamExecutorResult<()> {
         let core = self.stream_source_core.as_mut().unwrap();
 
         let cache = core
@@ -336,19 +320,16 @@ impl SourceExecutor {
         }
 
         // commit anyway, even if no message saved
-        core.split_state_store.state_table.commit(epoch).await?;
-
-        let updated_splits = core.updated_splits_in_epoch.clone();
-
+        core.split_state_store.state_store.commit(epoch).await?;
         core.updated_splits_in_epoch.clear();
 
-        Ok(updated_splits)
+        Ok(())
     }
 
     /// try mem table spill
     async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
         let core = self.stream_source_core.as_mut().unwrap();
-        core.split_state_store.state_table.try_flush().await?;
+        core.split_state_store.state_store.try_flush().await?;
         Ok(())
     }
 
@@ -380,12 +361,6 @@ impl SourceExecutor {
             .build()
             .map_err(StreamExecutorError::connector_error)?;
 
-        let wait_epoch_tx = if source_desc.source.need_commit_offset_to_upstream() {
-            Some(Self::spawn_wait_epoch_worker(&core))
-        } else {
-            None
-        };
-
         let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
         else {
             unreachable!("Partition and offset columns must be set.");
@@ -512,24 +487,7 @@ impl SourceExecutor {
                         }
                     }
 
-                    let updated_splits = self.persist_state_and_clear_cache(epoch).await?;
-
-                    // when handle a checkpoint barrier, spawn a task to wait for epoch commit notification
-                    if barrier.kind.is_checkpoint()
-                        && !updated_splits.is_empty()
-                        && let Some(ref tx) = wait_epoch_tx
-                    {
-                        let mut updated_offsets = HashMap::new();
-                        for (split_id, split_impl) in updated_splits {
-                            if split_impl.is_cdc_split() {
-                                updated_offsets.insert(split_id, split_impl.get_cdc_split_offset());
-                            }
-                        }
-
-                        tracing::debug!("epoch to wait {:?}", epoch);
-                        tx.send((Epoch(epoch.prev), updated_offsets))
-                            .expect("wait_epoch_tx send success");
-                    }
+                    self.persist_state_and_clear_cache(epoch).await?;
 
                     yield Message::Barrier(barrier);
                 }
@@ -660,59 +618,6 @@ impl Debug for SourceExecutor {
     }
 }
 
-struct WaitEpochWorker<S: StateStore> {
-    wait_epoch_rx: UnboundedReceiver<(Epoch, HashMap<SplitId, String>)>,
-    state_store: S,
-}
-
-impl<S: StateStore> WaitEpochWorker<S> {
-    pub async fn run(mut self) {
-        tracing::debug!("wait epoch worker start success");
-        loop {
-            // poll the rx and wait for the epoch commit
-            match self.wait_epoch_rx.recv().await {
-                Some((epoch, updated_offsets)) => {
-                    tracing::debug!("start to wait epoch {}", epoch.0);
-                    let ret = self
-                        .state_store
-                        .try_wait_epoch(HummockReadEpoch::Committed(epoch.0))
-                        .await;
-
-                    match ret {
-                        Ok(()) => {
-                            tracing::debug!(epoch = epoch.0, "wait epoch success");
-                            // cdc source only has one split
-                            assert_eq!(1, updated_offsets.len());
-                            let (split_id, offset) = updated_offsets.into_iter().next().unwrap();
-                            let source_id: u64 = u64::from_str(split_id.as_ref()).unwrap();
-                            // notify cdc connector to commit offset
-                            match jni_source::commit_cdc_offset(source_id, offset.clone()) {
-                                Ok(_) => {}
-                                Err(e) => {
-                                    tracing::error!(
-                                        error = %e.as_report(),
-                                        "source#{source_id}: failed to commit cdc offset: {offset}.",
-                                    )
-                                }
-                            }
-                        }
-                        Err(e) => {
-                            tracing::error!(
-                                error = %e.as_report(),
-                                "wait epoch {} failed", epoch.0
-                            );
-                        }
-                    }
-                }
-                None => {
-                    tracing::error!("wait epoch rx closed");
-                    break;
-                }
-            }
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::time::Duration;
diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs
index fad1f2f8f25d6..c9d967ca56c8f 100644
--- a/src/stream/src/executor/source/state_table_handler.rs
+++ b/src/stream/src/executor/source/state_table_handler.rs
@@ -47,13 +47,13 @@
 use crate::executor::StreamExecutorResult;
 
 const COMPLETE_SPLIT_PREFIX: &str = "SsGLdzRDqBuKzMf9bDap";
 
 pub struct SourceStateTableHandler<S: StateStore> {
-    pub state_table: StateTable<S>,
+    pub state_store: StateTable<S>,
 }
 
 impl<S: StateStore> SourceStateTableHandler<S> {
     pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
         Self {
-            state_table: StateTable::from_table_catalog(table_catalog, store, None).await,
+            state_store: StateTable::from_table_catalog(table_catalog, store, None).await,
         }
     }
@@ -63,12 +63,12 @@
         vnodes: Option<Arc<Bitmap>>,
     ) -> Self {
         Self {
-            state_table: StateTable::from_table_catalog(table_catalog, store, vnodes).await,
+            state_store: StateTable::from_table_catalog(table_catalog, store, vnodes).await,
         }
     }
 
     pub fn init_epoch(&mut self, epoch: EpochPair) {
-        self.state_table.init_epoch(epoch);
+        self.state_store.init_epoch(epoch);
     }
 
     fn string_to_scalar(rhs: impl Into<String>) -> ScalarImpl {
@@ -76,7 +76,7 @@
     }
 
     pub(crate) async fn get(&self, key: SplitId) -> StreamExecutorResult<Option<OwnedRow>> {
-        self.state_table
+        self.state_store
             .get_row(row::once(Some(Self::string_to_scalar(key.deref()))))
             .await
             .map_err(StreamExecutorError::from)
@@ -94,7 +94,7 @@
         // all source executor has vnode id zero
         let iter = self
-            .state_table
+            .state_store
             .iter_with_vnode(VirtualNode::ZERO, &(start, end), PrefetchOptions::default())
             .await?;
@@ -126,9 +126,9 @@
             Some(ScalarImpl::Jsonb(value)),
         ];
         if let Some(prev_row) = self.get(key).await? {
-            self.state_table.delete(prev_row);
+            self.state_store.delete(prev_row);
         }
-        self.state_table.insert(row);
+        self.state_store.insert(row);
         Ok(())
     }
@@ -157,10 +157,10 @@
         ];
         match self.get(key).await? {
             Some(prev_row) => {
-                self.state_table.update(prev_row, row);
+                self.state_store.update(prev_row, row);
             }
             None => {
-                self.state_table.insert(row);
+                self.state_store.insert(row);
             }
         }
         Ok(())
     }
 
@@ -168,7 +168,7 @@
     pub async fn delete(&mut self, key: SplitId) -> StreamExecutorResult<()> {
         if let Some(prev_row) = self.get(key).await? {
-            self.state_table.delete(prev_row);
+            self.state_store.delete(prev_row);
         }
 
         Ok(())
@@ -310,9 +310,9 @@ pub(crate) mod tests {
         state_table_handler
             .set_states(vec![split_impl.clone()])
             .await?;
-        state_table_handler.state_table.commit(epoch_2).await?;
+        state_table_handler.state_store.commit(epoch_2).await?;
 
-        state_table_handler.state_table.commit(epoch_3).await?;
+        state_table_handler.state_store.commit(epoch_3).await?;
 
         match state_table_handler
             .try_recover_from_state_store(&split_impl)