Skip to content

Commit

Permalink
feat(stream): backfill resuming for sink (#13665)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Nov 28, 2023
1 parent edfb9b9 commit c6e91cf
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 28 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/backfill-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ shift $((OPTIND -1))

git config --global --add safe.directory /risingwave

download_and_prepare_rw "$profile" common
download_and_prepare_rw "$profile" source

################ TESTS

Expand Down
57 changes: 54 additions & 3 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

# USAGE:
# ```sh
# cargo make ci-start ci-backfill
# ./ci/scripts/run-backfill-tests.sh
# ```
# Example progress:
Expand All @@ -23,7 +22,7 @@ TEST_DIR=$PWD/e2e_test
BACKGROUND_DDL_DIR=$TEST_DIR/background_ddl
COMMON_DIR=$BACKGROUND_DDL_DIR/common

CLUSTER_PROFILE='ci-1cn-1fe-with-recovery'
CLUSTER_PROFILE='ci-1cn-1fe-kafka-with-recovery'
export RUST_LOG="risingwave_meta=debug"

run_sql_file() {
Expand Down Expand Up @@ -126,7 +125,7 @@ test_backfill_tombstone() {
WITH (
connector = 'datagen',
fields.v1._.kind = 'sequence',
datagen.rows.per.second = '1000000'
datagen.rows.per.second = '2000000'
)
FORMAT PLAIN
ENCODE JSON;
Expand All @@ -150,10 +149,62 @@ test_backfill_tombstone() {
wait
}

# Test sink backfill recovery
test_sink_backfill_recovery() {
total_records=100000
echo "--- e2e, test_sink_backfill_recovery"
cargo make ci-start $CLUSTER_PROFILE
run_sql "create table t (v1 int);"
run_sql "insert into t select * from generate_series(1, $total_records);"
run_sql "flush;"
run_sql "SET STREAMING_RATE_LIMIT = 2000;"

run_sql "
create sink s as select x.v1 as v1
from t x join t y
on x.v1 = y.v1
with (
connector='kafka',
properties.bootstrap.server='localhost:29092',
topic='s_kafka',
primary_key='v1',
allow.auto.create.topics=true,
)
FORMAT DEBEZIUM ENCODE JSON;"

# Let backfill progress a little.
sleep 3

# Check progress
sqllogictest -p 4566 -d dev 'e2e_test/background_ddl/common/validate_one_job.slt'

# Restart
restart_cluster
sleep 3

# FIXME(kwannoel): Sink's backfill progress is not recovered yet.
# Check progress
# sqllogictest -p 4566 -d dev 'e2e_test/background_ddl/common/validate_one_job.slt'

# Sink back into rw
run_sql "CREATE TABLE table_kafka (v1 int primary key)
WITH (
connector = 'kafka',
topic = 's_kafka',
properties.bootstrap.server = 'localhost:29092',
) FORMAT DEBEZIUM ENCODE JSON;"

sleep 20

# Verify data matches upstream table.
sqllogictest -p 4566 -d dev 'e2e_test/backfill/sink/validate_sink.slt'
}

main() {
set -euo pipefail
test_snapshot_and_upstream_read
test_backfill_tombstone
test_sink_backfill_recovery
}

main
6 changes: 6 additions & 0 deletions e2e_test/backfill/sink/validate_sink.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
query B
SELECT *
FROM t
FULL OUTER JOIN table_kafka ON t.v1 = table_kafka.v1
WHERE t.v1 IS NULL OR table_kafka.v1 IS NULL;
----
15 changes: 15 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,21 @@ profile:
- use: frontend
- use: compactor

ci-1cn-1fe-kafka-with-recovery:
config-path: src/config/ci-recovery.toml
steps:
- use: minio
- use: etcd
- use: meta-node
- use: compute-node
enable-tiered-cache: true
- use: frontend
- use: compactor
- use: zookeeper
persist-data: true
- use: kafka
persist-data: true

ci-meta-backup-test:
config-path: src/config/ci-meta-backup-test.toml
steps:
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use super::info::BarrierActorInfo;
use super::trace::TracedEpoch;
use crate::barrier::CommandChanges;
use crate::hummock::HummockManagerRef;
use crate::manager::{CatalogManagerRef, FragmentManagerRef, WorkerId};
use crate::manager::{CatalogManagerRef, DdlType, FragmentManagerRef, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
use crate::stream::{
build_actor_connector_splits, ScaleControllerRef, SourceManagerRef, SplitAssignment,
Expand Down Expand Up @@ -117,6 +117,7 @@ pub enum Command {
dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
init_split_assignment: SplitAssignment,
definition: String,
ddl_type: DdlType,
},
/// `CancelStreamingJob` command generates a `Stop` barrier including the actors of the given
/// table fragment.
Expand Down
11 changes: 8 additions & 3 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1099,20 +1099,25 @@ impl GlobalBarrierManager {
let mut commands = vec![];
let version_stats = self.hummock_manager.get_version_stats().await;
let mut tracker = self.tracker.lock().await;
// Add the command to tracker.
if let Some(command) = tracker.add(
TrackingCommand {
context: node.command_ctx.clone(),
notifiers,
},
&version_stats,
) {
// Those with no actors to track can be finished immediately.
commands.push(command);
}
// Update the progress of all commands.
for progress in resps.iter().flat_map(|r| &r.create_mview_progress) {
tracing::trace!(?progress, "update progress");
if let Some(command) = tracker.update(progress, &version_stats) {
tracing::trace!(?progress, "update progress");
// Those with actors complete can be finished immediately.
if let Some(command) = tracker.update(progress, &version_stats) && !command.tracks_sink() {
tracing::trace!(?progress, "finish progress");
commands.push(command);
} else {
tracing::trace!(?progress, "update progress");
}
}
commands
Expand Down
57 changes: 50 additions & 7 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::barrier::{
Command, TableActorMap, TableDefinitionMap, TableFragmentMap, TableNotifierMap,
TableUpstreamMvCountMap,
};
use crate::manager::{FragmentManager, FragmentManagerRef};
use crate::manager::{DdlType, FragmentManager, FragmentManagerRef};
use crate::model::{ActorId, TableFragments};
use crate::MetaResult;

Expand Down Expand Up @@ -153,6 +153,8 @@ impl Progress {
/// On recovery, the stream manager will stop managing the job.
/// 2. `Recovered`. This refers to the "Recovered" type of tracking job.
/// On recovery, the barrier manager will recover and start managing the job.
/// 3. `Immediate`. Immediate jobs will immediately return,
/// but still register the lag between the job and its upstream.
pub enum TrackingJob {
New(TrackingCommand),
Recovered(RecoveredTrackingJob),
Expand Down Expand Up @@ -218,6 +220,13 @@ impl TrackingJob {
TrackingJob::Recovered(recovered) => Some(recovered.fragments.table_id()),
}
}

pub(crate) fn tracks_sink(&self) -> bool {
match self {
TrackingJob::New(command) => command.tracks_sink(),
TrackingJob::Recovered(_) => false,
}
}
}

pub struct RecoveredTrackingJob {
Expand All @@ -235,6 +244,15 @@ pub(super) struct TrackingCommand {
pub notifiers: Vec<Notifier>,
}

impl TrackingCommand {
pub fn tracks_sink(&self) -> bool {
match &self.context.command {
Command::CreateStreamingJob { ddl_type, .. } => *ddl_type == DdlType::Sink,
_ => false,
}
}
}

/// Track the progress of all creating mviews. When creation is done, `notify_finished` will be
/// called on registered notifiers.
///
Expand Down Expand Up @@ -368,12 +386,13 @@ impl CreateMviewProgressTracker {
return Some(TrackingJob::New(command));
}

let (creating_mv_id, upstream_mv_count, upstream_total_key_count, definition) =
let (creating_mv_id, upstream_mv_count, upstream_total_key_count, definition, ddl_type) =
if let Command::CreateStreamingJob {
table_fragments,
dispatchers,
upstream_mview_actors,
definition,
ddl_type,
..
} = &command.context.command
{
Expand Down Expand Up @@ -404,6 +423,7 @@ impl CreateMviewProgressTracker {
upstream_mv_count,
upstream_total_key_count,
definition.to_string(),
ddl_type,
)
} else {
unreachable!("Must be CreateStreamingJob.");
Expand All @@ -419,11 +439,34 @@ impl CreateMviewProgressTracker {
upstream_total_key_count,
definition,
);
let old = self
.progress_map
.insert(creating_mv_id, (progress, TrackingJob::New(command)));
assert!(old.is_none());
None
if *ddl_type == DdlType::Sink {
// First we duplicate a separate tracking job for sink.
// This does not need notifiers, it is solely used for
// tracking the backfill progress of sink.
// It will still be removed from progress map when
// backfill completes.
let tracking_job = TrackingJob::New(TrackingCommand {
context: command.context.clone(),
notifiers: vec![],
});
let old = self
.progress_map
.insert(creating_mv_id, (progress, tracking_job));
assert!(old.is_none());

// We return the original tracking job immediately.
// This is because sink can be decoupled with backfill progress.
// We don't need to wait for sink to finish backfill.
// This still contains the notifiers, so we can tell listeners
// that the sink job has been created.
Some(TrackingJob::New(command))
} else {
let old = self
.progress_map
.insert(creating_mv_id, (progress, TrackingJob::New(command)));
assert!(old.is_none());
None
}
}

/// Update the progress of `actor` according to the Pb struct.
Expand Down
31 changes: 31 additions & 0 deletions src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,37 @@ pub enum StreamingJob {
Source(PbSource),
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DdlType {
MaterializedView,
Sink,
Table,
Index,
Source,
}

#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for DdlType {
fn default() -> Self {
// This should not be used by mock services,
// so we can just pick an arbitrary default variant.
DdlType::Table
}
}

impl From<&StreamingJob> for DdlType {
fn from(job: &StreamingJob) -> Self {
match job {
StreamingJob::MaterializedView(_) => DdlType::MaterializedView,
StreamingJob::Sink(_) => DdlType::Sink,
StreamingJob::Table(_, _, _) => DdlType::Table,
StreamingJob::Index(_, _) => DdlType::Index,
StreamingJob::Source(_) => DdlType::Source,
}
}
}

impl StreamingJob {
pub fn mark_created(&mut self) {
let created_at_epoch = Some(Epoch::now().0);
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ impl DdlController {
definition: stream_job.definition(),
mv_table_id: stream_job.mv_table(),
create_type: stream_job.create_type(),
ddl_type: stream_job.into(),
};

// 4. Mark creating tables, including internal tables and the table of the stream job.
Expand Down
6 changes: 5 additions & 1 deletion src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use uuid::Uuid;
use super::{Locations, ScaleController, ScaleControllerRef};
use crate::barrier::{BarrierScheduler, Command};
use crate::hummock::HummockManagerRef;
use crate::manager::{ClusterManagerRef, FragmentManagerRef, MetaSrvEnv};
use crate::manager::{ClusterManagerRef, DdlType, FragmentManagerRef, MetaSrvEnv};
use crate::model::{ActorId, TableFragments};
use crate::stream::SourceManagerRef;
use crate::{MetaError, MetaResult};
Expand Down Expand Up @@ -69,6 +69,8 @@ pub struct CreateStreamingJobContext {
pub mv_table_id: Option<u32>,

pub create_type: CreateType,

pub ddl_type: DdlType,
}

impl CreateStreamingJobContext {
Expand Down Expand Up @@ -440,6 +442,7 @@ impl GlobalStreamManager {
mv_table_id,
internal_tables,
create_type,
ddl_type,
}: CreateStreamingJobContext,
) -> MetaResult<()> {
// Register to compaction group beforehand.
Expand Down Expand Up @@ -483,6 +486,7 @@ impl GlobalStreamManager {
dispatchers,
init_split_assignment,
definition: definition.to_string(),
ddl_type,
})
.await
{
Expand Down
Loading

0 comments on commit c6e91cf

Please sign in to comment.