Skip to content

Commit

Permalink
revert:feat(stream): create cdc table reader and source data stream … (
Browse files Browse the repository at this point in the history
  • Loading branch information
lmatz authored Dec 5, 2024
1 parent 1add86d commit 2a6aa41
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 346 deletions.
6 changes: 1 addition & 5 deletions e2e_test/source_inline/kafka/issue_19563.slt.serial
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,7 @@ cat <<EOF | rpk topic produce test-topic-19563
{"v1": "0001-01-01 21:00:00"}
EOF


sleep 6s

statement ok
flush;
sleep 3s

# Below lower bound and above upper bound are not shown
query I
Expand Down
22 changes: 17 additions & 5 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field,
use risingwave_common::types::{Datum, JsonbVal};
use risingwave_common::util::epoch::{test_epoch, EpochExt};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::cdc::external::mock_external_table::MockExternalTableReader;
use risingwave_connector::source::cdc::external::mysql::MySqlOffset;
use risingwave_connector::source::cdc::external::{
CdcTableType, DebeziumOffset, DebeziumSourceOffset, ExternalTableConfig, SchemaTableName,
DebeziumOffset, DebeziumSourceOffset, ExternalTableReaderImpl, SchemaTableName,
};
use risingwave_connector::source::cdc::DebeziumCdcSplit;
use risingwave_connector::source::SplitImpl;
Expand Down Expand Up @@ -158,6 +160,19 @@ async fn test_cdc_backfill() -> StreamResult<()> {
MockOffsetGenExecutor::new(source).boxed(),
);

let binlog_file = String::from("1.binlog");
// mock binlog watermarks for backfill
// initial low watermark: 1.binlog, pos=2 and expected behaviors:
// - ignore events before (1.binlog, pos=2);
// - apply events in the range of (1.binlog, pos=2, 1.binlog, pos=4) to the snapshot
let binlog_watermarks = vec![
MySqlOffset::new(binlog_file.clone(), 2), // binlog low watermark
MySqlOffset::new(binlog_file.clone(), 4),
MySqlOffset::new(binlog_file.clone(), 6),
MySqlOffset::new(binlog_file.clone(), 8),
MySqlOffset::new(binlog_file.clone(), 10),
];

let table_name = SchemaTableName {
schema_name: "public".to_string(),
table_name: "mock_table".to_string(),
Expand All @@ -168,14 +183,11 @@ async fn test_cdc_backfill() -> StreamResult<()> {
]);
let table_pk_indices = vec![0];
let table_pk_order_types = vec![OrderType::ascending()];
let config = ExternalTableConfig::default();

let external_table = ExternalStorageTable::new(
TableId::new(1234),
table_name,
"mydb".to_string(),
config,
CdcTableType::Mock,
ExternalTableReaderImpl::Mock(MockExternalTableReader::new(binlog_watermarks)),
table_schema.clone(),
table_pk_order_types,
table_pk_indices.clone(),
Expand Down
14 changes: 1 addition & 13 deletions src/connector/src/source/cdc/external/mock_external_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,7 @@ pub struct MockExternalTableReader {
}

impl MockExternalTableReader {
pub fn new() -> Self {
let binlog_file = String::from("1.binlog");
// mock binlog watermarks for backfill
// initial low watermark: 1.binlog, pos=2 and expected behaviors:
// - ignore events before (1.binlog, pos=2);
// - apply events in the range of (1.binlog, pos=2, 1.binlog, pos=4) to the snapshot
let binlog_watermarks = vec![
MySqlOffset::new(binlog_file.clone(), 2), // binlog low watermark
MySqlOffset::new(binlog_file.clone(), 4),
MySqlOffset::new(binlog_file.clone(), 6),
MySqlOffset::new(binlog_file.clone(), 8),
MySqlOffset::new(binlog_file.clone(), 10),
];
pub fn new(binlog_watermarks: Vec<MySqlOffset>) -> Self {
Self {
binlog_watermarks,
snapshot_cnt: AtomicUsize::new(0),
Expand Down
6 changes: 2 additions & 4 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ use crate::source::cdc::external::sql_server::{
use crate::source::cdc::CdcSourceType;
use crate::WithPropertiesExt;

#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum CdcTableType {
Undefined,
Mock,
MySql,
Postgres,
SqlServer,
Expand Down Expand Up @@ -102,7 +101,6 @@ impl CdcTableType {
Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer(
SqlServerExternalTableReader::new(config, schema, pk_indices).await?,
)),
Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())),
_ => bail!("invalid external table type: {:?}", *self),
}
}
Expand Down Expand Up @@ -220,7 +218,7 @@ pub enum ExternalTableReaderImpl {
Mock(MockExternalTableReader),
}

#[derive(Debug, Default, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct ExternalTableConfig {
pub connector: String,

Expand Down
111 changes: 16 additions & 95 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::pin::Pin;

use either::Either;
Expand All @@ -28,11 +27,9 @@ use risingwave_connector::parser::{
ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties,
ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig,
};
use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl};
use risingwave_connector::source::cdc::external::CdcOffset;
use risingwave_connector::source::{SourceColumnDesc, SourceContext};
use rw_futures_util::pausable;
use thiserror_ext::AsReport;
use tracing::Instrument;

use crate::executor::backfill::cdc::state::CdcBackfillState;
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
Expand All @@ -45,7 +42,6 @@ use crate::executor::backfill::utils::{
use crate::executor::backfill::CdcScanOptions;
use crate::executor::monitor::CdcBackfillMetrics;
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::executor::UpdateMutation;
use crate::task::CreateMviewProgressReporter;

Expand Down Expand Up @@ -144,6 +140,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let upstream_table_name = self.external_table.qualified_table_name();
let schema_table_name = self.external_table.schema_table_name().clone();
let external_database_name = self.external_table.database_name().to_owned();
let upstream_table_reader = UpstreamTableReader::new(self.external_table);

let additional_columns = self
.output_columns
Expand All @@ -162,94 +159,38 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let first_barrier = expect_first_barrier(&mut upstream).await?;

let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
let first_barrier_epoch = first_barrier.epoch;
// The first barrier message should be propagated.
yield Message::Barrier(first_barrier);
let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);

// Check whether this parallelism has been assigned splits,
// if not, we should bypass the backfill directly.
let mut state_impl = self.state_impl;

state_impl.init_epoch(first_barrier_epoch);
let mut upstream = transform_upstream(upstream, &self.output_columns)
.boxed()
.peekable();

state_impl.init_epoch(first_barrier.epoch);

// restore backfill state
let state = state_impl.restore_state().await?;
current_pk_pos = state.current_pk_pos.clone();

let need_backfill = !self.options.disable_backfill && !state.is_finished;
let to_backfill = !self.options.disable_backfill && !state.is_finished;

// The first barrier message should be propagated.
yield Message::Barrier(first_barrier);

// Keep track of rows from the snapshot.
let mut total_snapshot_row_count = state.row_count as u64;

// After init the state table and forward the initial barrier to downstream,
// we now try to create the table reader with retry.
// If backfill hasn't finished, we can ignore upstream cdc events before we create the table reader;
// If backfill is finished, we should forward the upstream cdc events to downstream.
let mut table_reader: Option<ExternalTableReaderImpl> = None;
let external_table = self.external_table.clone();
let mut future = Box::pin(async move {
let backoff = get_infinite_backoff_strategy();
tokio_retry::Retry::spawn(backoff, || async {
match external_table.create_table_reader().await {
Ok(reader) => Ok(reader),
Err(e) => {
tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
Err(e)
}
}
})
.instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
.await
.expect("Retry create cdc table reader until success.")
});
loop {
if let Some(msg) =
build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
.await?
{
match msg {
Message::Barrier(barrier) => {
// commit state to bump the epoch of state table
state_impl.commit_state(barrier.epoch).await?;
yield Message::Barrier(barrier);
}
Message::Chunk(chunk) => {
if need_backfill {
// ignore chunk if we need backfill, since we can read the data from the snapshot
} else {
// forward the chunk to downstream
yield Message::Chunk(chunk);
}
}
Message::Watermark(_) => {
// ignore watermark
}
}
} else {
assert!(table_reader.is_some(), "table reader must created");
tracing::info!(
table_id,
upstream_table_name,
"table reader created successfully"
);
break;
}
}

let upstream_table_reader = UpstreamTableReader::new(
self.external_table.clone(),
table_reader.expect("table reader must created"),
);

let mut upstream = transform_upstream(upstream, &self.output_columns)
.boxed()
.peekable();
let mut last_binlog_offset: Option<CdcOffset> = state
.last_cdc_offset
.map_or(upstream_table_reader.current_cdc_offset().await?, Some);

let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
let offset_parse_func = upstream_table_reader
.inner()
.table_reader()
.get_cdc_offset_parser();
let mut consumed_binlog_offset: Option<CdcOffset> = None;

tracing::info!(
Expand Down Expand Up @@ -286,7 +227,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
// finished.
//
// Once the backfill loop ends, we forward the upstream directly to the downstream.
if need_backfill {
if to_backfill {
// drive the upstream changelog first to ensure we can receive timely changelog event,
// otherwise the upstream changelog may be blocked by the snapshot read stream
let _ = Pin::new(&mut upstream).peek().await;
Expand Down Expand Up @@ -761,26 +702,6 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
}
}

async fn build_reader_and_poll_upstream(
upstream: &mut BoxedMessageStream,
table_reader: &mut Option<ExternalTableReaderImpl>,
future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
) -> StreamExecutorResult<Option<Message>> {
if table_reader.is_some() {
return Ok(None);
}
tokio::select! {
biased;
reader = &mut *future => {
*table_reader = Some(reader);
Ok(None)
}
msg = upstream.next() => {
msg.transpose()
}
}
}

#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) {
let props = SpecificParserConfig {
Expand Down
36 changes: 6 additions & 30 deletions src/stream/src/executor/backfill/cdc/upstream_table/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@

use risingwave_common::catalog::{Schema, TableId};
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::cdc::external::{
CdcOffset, CdcTableType, ExternalTableConfig, ExternalTableReader, ExternalTableReaderImpl,
SchemaTableName,
};
use risingwave_connector::source::cdc::external::{ExternalTableReaderImpl, SchemaTableName};

/// This struct represents an external table to be read during backfill
#[derive(Debug, Clone)]
pub struct ExternalStorageTable {
/// Id for this table.
table_id: TableId,
Expand All @@ -33,9 +28,7 @@ pub struct ExternalStorageTable {

database_name: String,

config: ExternalTableConfig,

table_type: CdcTableType,
table_reader: ExternalTableReaderImpl,

/// The schema of the output columns, i.e., this table VIEWED BY some executor like
/// `RowSeqScanExecutor`.
Expand All @@ -50,16 +43,14 @@ pub struct ExternalStorageTable {
}

impl ExternalStorageTable {
#[allow(clippy::too_many_arguments)]
pub fn new(
table_id: TableId,
SchemaTableName {
table_name,
schema_name,
}: SchemaTableName,
database_name: String,
config: ExternalTableConfig,
table_type: CdcTableType,
table_reader: ExternalTableReaderImpl,
schema: Schema,
pk_order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
Expand All @@ -69,8 +60,7 @@ impl ExternalStorageTable {
table_name,
schema_name,
database_name,
config,
table_type,
table_reader,
schema,
pk_order_types,
pk_indices,
Expand Down Expand Up @@ -100,14 +90,8 @@ impl ExternalStorageTable {
}
}

pub async fn create_table_reader(&self) -> ConnectorResult<ExternalTableReaderImpl> {
self.table_type
.create_table_reader(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
)
.await
pub fn table_reader(&self) -> &ExternalTableReaderImpl {
&self.table_reader
}

pub fn qualified_table_name(&self) -> String {
Expand All @@ -117,12 +101,4 @@ impl ExternalStorageTable {
pub fn database_name(&self) -> &str {
self.database_name.as_str()
}

pub async fn current_cdc_offset(
&self,
table_reader: &ExternalTableReaderImpl,
) -> ConnectorResult<Option<CdcOffset>> {
let binlog = table_reader.current_cdc_offset().await?;
Ok(Some(binlog))
}
}
Loading

0 comments on commit 2a6aa41

Please sign in to comment.