refactor: use high watermark to finish backfill faster #18342

Merged
95 changes: 59 additions & 36 deletions e2e_test/source_inline/kafka/shared_source.slt
@@ -6,6 +6,29 @@ SET rw_enable_shared_source TO true;
system ok
rpk topic create shared_source -p 4

# Test create source before producing data.
statement ok
create source s_before_produce (v1 int, v2 varchar) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'shared_source',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
create materialized view mv_before_produce as select * from s_before_produce;

sleep 2s

# All partitions start with backfill_info: NoDataToBackfill, so backfill finishes immediately.
system ok
internal_table.mjs --name mv_before_produce --type sourcebackfill
----
0,"""Finished"""
1,"""Finished"""
2,"""Finished"""
3,"""Finished"""


system ok
cat << EOF | rpk topic produce shared_source -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a"}
@@ -21,7 +44,7 @@ create source s0 (v1 int, v2 varchar) with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

query I
query ?
Contributor:

When do we need to use ? instead of I?

Member (Author):

Currently sqllogictest --override will produce ?. Actually the character doesn't have any meaning now; any character will pass the test.

select count(*) from rw_internal_tables where name like '%s0%';
----
1
@@ -41,21 +64,24 @@ create materialized view mv_1 as select * from s0;
# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 2s

# SourceExecutor's ingestion started, but it only starts from latest.
# SourceExecutor's ingestion started, but it only starts from latest (offset 1).
system ok
internal_table.mjs --name s0 --type source
----
(empty)


# offset 0 must be backfilled, not from upstream.
# SourceBackfill starts from offset 0, with backfill_info: HasDataToBackfill { latest_offset: "0" } (decided by kafka high watermark).
Contributor:

In this case, what will the high watermark value be?

Member (Author):

1

# (meaning upstream already consumed offset 0, so we only need to backfill to offset 0)
# After backfilling offset 0, it enters SourceCachingUp state. Now the backfill is finished.
# We wait for SourceExecutor to produce offset > 0.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"{""Backfilling"": ""0""}"
1,"{""Backfilling"": ""0""}"
2,"{""Backfilling"": ""0""}"
3,"{""Backfilling"": ""0""}"
0,"{""SourceCachingUp"": ""0""}"
1,"{""SourceCachingUp"": ""0""}"
2,"{""SourceCachingUp"": ""0""}"
3,"{""SourceCachingUp"": ""0""}"


# This does not affect the behavior for CREATE MATERIALIZED VIEW below. It also uses the shared source, and creates SourceBackfillExecutor.
@@ -67,23 +93,23 @@ create materialized view mv_2 as select * from s0;

sleep 2s

query IT rowsort
query ?? rowsort
select v1, v2 from s0;
----
1 a
2 b
3 c
4 d

query IT rowsort
query ?? rowsort
select v1, v2 from mv_1;
----
1 a
2 b
3 c
4 d

query IT rowsort
query ?? rowsort
select v1, v2 from mv_2;
----
1 a
@@ -111,7 +137,7 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


query IT rowsort
query ?? rowsort
select v1, v2 from s0;
----
1 a
@@ -123,7 +149,7 @@ select v1, v2 from s0;
4 d
4 dd

query IT rowsort
query ?? rowsort
select v1, v2 from mv_1;
----
1 a
@@ -146,18 +172,14 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


# The result is non-deterministic:
# If the upstream row comes before the backfill row, it will be ignored, and the result state is "{""Backfilling"": ""1""}".
# If the upstream row comes after the backfill row, the result state is Finished.
# Uncomment below and run manually to see the result.

# system ok
# internal_table.mjs --name mv_1 --type sourcebackfill
# ----
# 0,"{""Finished""}"
# 1,"{""Finished""}"
# 2,"{""Finished""}"
# 3,"{""Finished""}"
# Transition from SourceCachingUp to Finished after consuming one upstream message.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"""Finished"""
1,"""Finished"""
2,"""Finished"""
3,"""Finished"""


system ok
@@ -173,22 +195,30 @@ done

sleep 3s

query IT rowsort
query ?? rowsort
select v1, count(*) from s0 group by v1;
----
1 12
2 12
3 12
4 12

query IT rowsort
query ?? rowsort
select v1, count(*) from mv_1 group by v1;
----
1 12
2 12
3 12
4 12

query ?? rowsort
select v1, count(*) from mv_before_produce group by v1;
----
1 12
2 12
3 12
4 12


# start_offset changed to 11
system ok
@@ -200,15 +230,8 @@ internal_table.mjs --name s0 --type source
3,"{""split_info"": {""partition"": 3, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


# Now it is highly probable that all partitions have finished.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"""Finished"""
1,"""Finished"""
2,"""Finished"""
3,"""Finished"""


statement ok
drop source s0 cascade;

statement ok
drop source s_before_produce cascade;
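
For readers following the test above, here is a condensed, illustrative sketch (not part of the diff) of the per-split lifecycle it exercises. The state names mirror `BackfillState` in source_backfill_executor.rs further down; offsets are simplified to integers, and `on_reader_built`, `on_backfilled`, and `on_upstream` are hypothetical helpers.

// Illustrative sketch only: the per-split backfill lifecycle exercised above.
#[derive(Debug, PartialEq)]
enum State {
    /// Reading historical data up to `target` (inclusive).
    Backfilling { target: u64 },
    /// Backfill stopped at `stop`; waiting for SourceExecutor to move past it.
    SourceCachingUp { stop: u64 },
    Finished,
}

// Decision made when the backfill reader is built, from the Kafka watermarks
// (low inclusive, high exclusive): an empty split finishes immediately,
// otherwise the backfill target is `high - 1`.
fn on_reader_built(low: u64, high: u64) -> State {
    if low == high {
        State::Finished // NoDataToBackfill
    } else {
        State::Backfilling { target: high - 1 } // HasDataToBackfill
    }
}

// Reaching the target offset during backfill moves the split to SourceCachingUp.
fn on_backfilled(state: State, offset: u64) -> State {
    match state {
        State::Backfilling { target } if offset >= target => State::SourceCachingUp { stop: offset },
        other => other,
    }
}

// An upstream message past the stop offset finishes the split.
fn on_upstream(state: State, offset: u64) -> State {
    match state {
        State::SourceCachingUp { stop } if offset > stop => State::Finished,
        other => other,
    }
}

fn main() {
    // One message in the partition, as for mv_1 above: low = 0, high = 1.
    let mut s = on_reader_built(0, 1);
    s = on_backfilled(s, 0); // backfill offset 0 -> SourceCachingUp
    s = on_upstream(s, 1);   // upstream produces offset 1 -> Finished
    assert_eq!(s, State::Finished);

    // Empty partition, as for mv_before_produce above: low == high.
    assert_eq!(on_reader_built(0, 0), State::Finished);
}
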
27 changes: 27 additions & 0 deletions src/connector/src/source/base.rs
@@ -370,6 +370,33 @@ pub trait SplitReader: Sized + Send {
) -> crate::error::ConnectorResult<Self>;

fn into_stream(self) -> BoxChunkSourceStream;

fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
HashMap::new()
}
}

/// Information used to determine whether we should start and finish source backfill.
///
/// XXX: if a connector cannot provide the latest offsets (but we want to make it shareable),
/// perhaps we should ban blocking DDL for it.
#[derive(Debug, Clone)]
pub enum BackfillInfo {
HasDataToBackfill {
/// The last available offsets for each split (**inclusive**).
///
/// This will be used to determine whether source backfill is finished when
/// there are no _new_ messages coming from upstream `SourceExecutor`. Otherwise,
/// blocking DDL cannot finish until new messages come.
///
/// When there are upstream messages, we will use the latest offsets from the upstream.
latest_offset: String,
},
/// If there are no messages in the split at all, we don't need to start backfill.
/// In this case, there will be no message from the backfill stream either.
/// If we started backfill, we could not finish it until new messages came.
/// So we mark this as a special case for optimization.
NoDataToBackfill,
}

for_all_sources!(impl_connector_properties);
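
As a complement to the enum above, a minimal, hypothetical sketch of how a connector might implement the new `backfill_info()` hook. The real Kafka implementation appears in the next file; `MySplitReader` and `latest_offsets` are illustrative names, and the types are mirrored locally so the sketch stands alone.

use std::collections::HashMap;
use std::sync::Arc;

// Local mirrors of the types defined above (base.rs), for a self-contained sketch.
type SplitId = Arc<str>;
enum BackfillInfo {
    HasDataToBackfill { latest_offset: String },
    NoDataToBackfill,
}

// Hypothetical reader that can cheaply learn the end of each split when it is built
// (e.g. from a watermark-style API).
struct MySplitReader {
    latest_offsets: HashMap<SplitId, Option<u64>>,
}

impl MySplitReader {
    // Shape of the trait hook: the default impl returns an empty map ("unknown"),
    // while reporting concrete info here lets backfill finish without waiting for
    // fresh upstream data.
    fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
        self.latest_offsets
            .iter()
            .map(|(split, latest)| {
                let info = match latest {
                    Some(offset) => BackfillInfo::HasDataToBackfill {
                        latest_offset: offset.to_string(),
                    },
                    // No data at all in this split: skip backfill entirely.
                    None => BackfillInfo::NoDataToBackfill,
                };
                (split.clone(), info)
            })
            .collect()
    }
}
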
1 change: 1 addition & 0 deletions src/connector/src/source/kafka/enumerator/client.rs
@@ -170,6 +170,7 @@ impl KafkaSplitEnumerator {
self.report_high_watermark(*partition, high);
map.insert(*partition, (low, high));
}
tracing::debug!("fetch kafka watermarks: {map:?}");
Ok(map)
}

34 changes: 31 additions & 3 deletions src/connector/src/source/kafka/source/reader.rs
@@ -34,13 +34,14 @@ use crate::source::kafka::{
KafkaContextCommon, KafkaProperties, KafkaSplit, RwConsumerContext, KAFKA_ISOLATION_LEVEL,
};
use crate::source::{
into_chunk_stream, BoxChunkSourceStream, Column, SourceContextRef, SplitId, SplitMetaData,
SplitReader,
into_chunk_stream, BackfillInfo, BoxChunkSourceStream, Column, SourceContextRef, SplitId,
SplitMetaData, SplitReader,
};

pub struct KafkaSplitReader {
consumer: StreamConsumer<RwConsumerContext>,
offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
backfill_info: HashMap<SplitId, BackfillInfo>,
bytes_per_second: usize,
max_num_messages: usize,
parser_config: ParserConfig,
@@ -106,7 +107,7 @@ impl SplitReader for KafkaSplitReader {
let mut tpl = TopicPartitionList::with_capacity(splits.len());

let mut offsets = HashMap::new();

let mut backfill_info = HashMap::new();
for split in splits {
offsets.insert(split.id(), (split.start_offset, split.stop_offset));

@@ -121,7 +122,29 @@ impl SplitReader for KafkaSplitReader {
} else {
tpl.add_partition(split.topic.as_str(), split.partition);
}

let (low, high) = consumer
.fetch_watermarks(
split.topic.as_str(),
split.partition,
properties.common.sync_call_timeout,
)
.await?;
tracing::debug!("fetch kafka watermarks: low: {low}, high: {high}, split: {split:?}");
// note: low is inclusive, high is exclusive
if low == high {
backfill_info.insert(split.id(), BackfillInfo::NoDataToBackfill);
} else {
debug_assert!(high > 0);
backfill_info.insert(
split.id(),
BackfillInfo::HasDataToBackfill {
latest_offset: (high - 1).to_string(),
},
);
}
}
tracing::debug!("backfill_info: {:?}", backfill_info);

consumer.assign(&tpl)?;

@@ -143,6 +166,7 @@ impl SplitReader for KafkaSplitReader {
Ok(Self {
consumer,
offsets,
backfill_info,
bytes_per_second,
max_num_messages,
parser_config,
@@ -155,6 +179,10 @@ impl SplitReader for KafkaSplitReader {
let source_context = self.source_ctx.clone();
into_chunk_stream(self.into_data_stream(), parser_config, source_context)
}

fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
self.backfill_info.clone()
}
}

impl KafkaSplitReader {
72 changes: 70 additions & 2 deletions src/connector/src/source/reader/reader.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context;
@@ -34,8 +35,9 @@ use crate::source::filesystem::opendal_source::{
};
use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
use crate::source::{
create_split_reader, BoxChunkSourceStream, BoxTryStream, Column, ConnectorProperties,
ConnectorState, SourceColumnDesc, SourceContext, SplitReader, WaitCheckpointTask,
create_split_reader, BackfillInfo, BoxChunkSourceStream, BoxTryStream, Column,
ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitReader,
WaitCheckpointTask,
};
use crate::{dispatch_source_prop, WithOptionsSecResolved};

@@ -129,6 +131,72 @@ impl SourceReader {
})
}

pub async fn build_stream_for_backfill(
&self,
state: ConnectorState,
column_ids: Vec<ColumnId>,
source_ctx: Arc<SourceContext>,
) -> ConnectorResult<(BoxChunkSourceStream, HashMap<SplitId, BackfillInfo>)> {
let Some(splits) = state else {
return Ok((pending().boxed(), HashMap::new()));
};
let config = self.config.clone();
let columns = self.get_target_columns(column_ids)?;

let data_gen_columns = Some(
columns
.iter()
.map(|col| Column {
name: col.name.clone(),
data_type: col.data_type.clone(),
is_visible: col.is_visible(),
})
.collect_vec(),
);

let parser_config = ParserConfig {
specific: self.parser_config.clone(),
common: CommonParserConfig {
rw_columns: columns,
},
};

let support_multiple_splits = config.support_multiple_splits();
dispatch_source_prop!(config, prop, {
let readers = if support_multiple_splits {
tracing::debug!(
"spawning connector split reader for multiple splits {:?}",
splits
);
let reader =
create_split_reader(*prop, splits, parser_config, source_ctx, data_gen_columns)
.await?;

vec![reader]
} else {
let to_reader_splits = splits.into_iter().map(|split| vec![split]);
try_join_all(to_reader_splits.into_iter().map(|splits| {
tracing::debug!(?splits, "spawning connector split reader");
let props = prop.clone();
let data_gen_columns = data_gen_columns.clone();
let parser_config = parser_config.clone();
// TODO: is this reader split across multiple threads...? Realistically, we want
// source_ctx to live in a single actor.
let source_ctx = source_ctx.clone();
create_split_reader(*props, splits, parser_config, source_ctx, data_gen_columns)
}))
.await?
};

let backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();

Ok((
select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
backfill_info,
))
})
}

/// Build `SplitReader`s and then `BoxChunkSourceStream` from the given `ConnectorState` (`SplitImpl`s).
pub async fn build_stream(
&self,
55 changes: 45 additions & 10 deletions src/stream/src/executor/source/source_backfill_executor.rs
@@ -27,7 +27,8 @@ use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::JsonbVal;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::{
BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
BackfillInfo, BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl,
SplitMetaData,
};
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;
@@ -43,6 +44,7 @@ use crate::executor::UpdateMutation;
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
/// `None` means not started yet. It's the initial state.
/// XXX: perhaps we can also set this to the low watermark instead of `None`
Backfilling(Option<String>),
/// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset.
SourceCachingUp(String),
@@ -90,6 +92,8 @@ pub struct SourceBackfillExecutorInner<S: StateStore> {

/// Local variables used in the backfill stage.
///
/// See <https://github.com/risingwavelabs/risingwave/issues/18299> for a state diagram about how it works.
///
/// Note: all of the fields should contain all available splits, and we can `unwrap()` safely when `get()`.
#[derive(Debug)]
struct BackfillStage {
@@ -99,8 +103,8 @@ struct BackfillStage {
/// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`).
splits: Vec<SplitImpl>,
/// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
/// TODO: initialize this with high watermark so that we can finish backfilling even when upstream
/// doesn't emit any data.
/// This is initialized with the latest available offset in the connector (if the connector provides the ability to fetch it)
/// so that we can finish backfilling even when upstream doesn't emit any data.
target_offsets: HashMap<SplitId, Option<String>>,
}

@@ -259,7 +263,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
&self,
source_desc: &SourceDesc,
splits: Vec<SplitImpl>,
) -> StreamExecutorResult<BoxChunkSourceStream> {
) -> StreamExecutorResult<(BoxChunkSourceStream, HashMap<SplitId, BackfillInfo>)> {
let column_ids = source_desc
.columns
.iter()
@@ -278,12 +282,22 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
source_desc.source.config.clone(),
None,
);
let stream = source_desc

// We will check watermark to decide whether we need to backfill.
// e.g., when there's a Kafka topic-partition without any data,
// we don't need to backfill at all. But if we do not check here,
// the executor can only know it's finished when data comes in.
// For blocking DDL, this would be annoying.

let (stream, backfill_info) = source_desc
.source
.build_stream(Some(splits), column_ids, Arc::new(source_ctx))
.build_stream_for_backfill(Some(splits), column_ids, Arc::new(source_ctx))
.await
.map_err(StreamExecutorError::connector_error)?;
Ok(apply_rate_limit(stream, self.rate_limit_rps).boxed())
Ok((
apply_rate_limit(stream, self.rate_limit_rps).boxed(),
backfill_info,
))
}

#[try_stream(ok = Message, error = StreamExecutorError)]
@@ -337,13 +351,25 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
// Return the ownership of `stream_source_core` to the source executor.
self.stream_source_core = core;

let source_chunk_reader = self
let (source_chunk_reader, backfill_info) = self
.build_stream_source_reader(
&source_desc,
backfill_stage.get_latest_unfinished_splits()?,
)
.instrument_await("source_build_reader")
.await?;
for (split_id, info) in &backfill_info {
match info {
BackfillInfo::NoDataToBackfill => {
*backfill_stage.states.get_mut(split_id).unwrap() = BackfillState::Finished;
}
BackfillInfo::HasDataToBackfill { latest_offset } => {
// Note: later we will override it with the offset from the source message, and it may become smaller than this value.
*backfill_stage.target_offsets.get_mut(split_id).unwrap() =
Some(latest_offset.clone());
}
}
}

fn select_strategy(_: &mut ()) -> PollNext {
futures::stream::PollNext::Left
@@ -422,7 +448,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
self.actor_ctx.fragment_id.to_string(),
]);

let reader = self
let (reader, _backfill_info) = self
.build_stream_source_reader(
&source_desc,
backfill_stage.get_latest_unfinished_splits()?,
@@ -504,7 +530,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
);

// Replace the source reader with a new one of the new state.
let reader = self
let (reader, _backfill_info) = self
.build_stream_source_reader(
&source_desc,
latest_unfinished_splits,
@@ -602,6 +628,15 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
}

let mut splits: HashSet<SplitId> = backfill_stage.states.keys().cloned().collect();
// Make sure `Finished` state is persisted.
self.backfill_state_store
.set_states(
splits
.iter()
.map(|s| (s.clone(), BackfillState::Finished))
.collect(),
)
.await?;

// All splits finished backfilling. Now we only forward the source data.
#[for_await]