finish basic development
wenym1 committed Jul 26, 2024
1 parent 62ff350 commit a4e1d97
Showing 8 changed files with 154 additions and 42 deletions.
6 changes: 4 additions & 2 deletions proto/stream_plan.proto
@@ -28,6 +28,8 @@ message AddMutation {
// We may embed a pause mutation here.
// TODO: we may allow multiple mutations in a single barrier.
bool pause = 4;
// upstream_mv_table_id -> subscription_id
map<uint32, uint32> subscriptions_to_add = 5;
}

message StopMutation {
@@ -101,8 +103,8 @@ message CreateSubscriptionMutation {
}

message DropSubscriptionMutation {
uint32 subscription_id = 1;
uint32 upstream_mv_table_id = 2;
// subscriber_id -> upstream_mv_table_id
map<uint32, uint32> subscriptions_to_drop = 1;
}

message BarrierMutation {
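The two new maps point in opposite directions: `subscriptions_to_add` is keyed by the upstream materialized view's table id, while `subscriptions_to_drop` is keyed by the subscriber id (here, the table id of the snapshot-backfilling job). A minimal sketch of that encoding, using plain `HashMap`s as hypothetical stand-ins for the prost-generated message fields:

use std::collections::HashMap;

// Hypothetical stand-ins for the prost-generated mutation messages.
struct AddMutationSketch {
    // upstream_mv_table_id -> subscription_id (the backfilling job's table id)
    subscriptions_to_add: HashMap<u32, u32>,
}

struct DropSubscriptionMutationSketch {
    // subscriber_id -> upstream_mv_table_id
    subscriptions_to_drop: HashMap<u32, u32>,
}

fn main() {
    let backfill_job_id = 1001u32; // table id of the creating streaming job
    let upstream_mv_table_id = 11u32; // table id of the upstream MV it reads from

    // At creation time the job is registered as a subscriber of its upstream MV...
    let add = AddMutationSketch {
        subscriptions_to_add: HashMap::from([(upstream_mv_table_id, backfill_job_id)]),
    };
    // ...and when the backfill finishes, the registration is dropped again.
    let drop = DropSubscriptionMutationSketch {
        subscriptions_to_drop: HashMap::from([(backfill_job_id, upstream_mv_table_id)]),
    };

    assert_eq!(add.subscriptions_to_add[&upstream_mv_table_id], backfill_job_id);
    assert_eq!(drop.subscriptions_to_drop[&backfill_job_id], upstream_mv_table_id);
}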
24 changes: 24 additions & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_version.rs
@@ -229,3 +229,27 @@ async fn read_hummock_snapshot_groups(
})
.collect())
}

#[derive(Fields)]
struct RwHummockTableChangeLog {
#[primary_key]
table_id: i32,
change_log: JsonbVal,
}

#[system_catalog(table, "rw_catalog.rw_hummock_table_change_log")]
async fn read_hummock_table_change_log(
reader: &SysCatalogReaderImpl,
) -> Result<Vec<RwHummockTableChangeLog>> {
let version = reader.meta_client.get_hummock_current_version().await?;
Ok(version
.table_change_log
.iter()
.map(|(table_id, change_log)| RwHummockTableChangeLog {
table_id: table_id.table_id as i32,
change_log: json!(change_log.to_protobuf()).into(),
})
.collect())
}
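The new `rw_catalog.rw_hummock_table_change_log` system table exposes the current hummock version's per-table change log, one row per state table, with the log serialized to JSONB through its protobuf form. A rough sketch of the row-building step, assuming `serde_json` and using hypothetical stand-ins for the hummock and catalog types (the real code serializes `change_log.to_protobuf()`):

use serde_json::json;
use std::collections::BTreeMap;

// Hypothetical stand-in for the catalog row; `serde_json::Value` plays the role of JsonbVal.
struct RwHummockTableChangeLogRow {
    table_id: i32,
    change_log: serde_json::Value,
}

// Hypothetical input shape: table id -> epochs that carry change-log entries.
fn build_rows(table_change_log: &BTreeMap<u32, Vec<u64>>) -> Vec<RwHummockTableChangeLogRow> {
    table_change_log
        .iter()
        .map(|(table_id, epochs)| RwHummockTableChangeLogRow {
            table_id: *table_id as i32,
            // In the real code this is `json!(change_log.to_protobuf()).into()`.
            change_log: json!({ "epochs_with_change_log": epochs }),
        })
        .collect()
}

fn main() {
    let mut logs = BTreeMap::new();
    logs.insert(42u32, vec![100u64, 101]);
    let rows = build_rows(&logs);
    assert_eq!(rows[0].table_id, 42);
    println!("{}", rows[0].change_log); // {"epochs_with_change_log":[100,101]}
}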
31 changes: 29 additions & 2 deletions src/meta/src/barrier/command.rs
@@ -250,6 +250,7 @@ pub enum Command {
info: CreateStreamingJobCommandInfo,
snapshot_backfill_info: SnapshotBackfillInfo,
},
FinishCreateSnapshotBackfillStreamingJobs(HashMap<TableId, SnapshotBackfillInfo>),
/// `CancelStreamingJob` command generates a `Stop` barrier including the actors of the given
/// table fragment.
///
@@ -384,6 +385,7 @@ impl Command {
.collect(),
}),
Command::ReplaceTable(plan) => Some(plan.actor_changes()),
Command::FinishCreateSnapshotBackfillStreamingJobs(_) => None,
Command::SourceSplitAssignment(_) => None,
Command::Throttle(_) => None,
Command::CreateSubscription { .. } => None,
@@ -591,6 +593,7 @@ impl CommandContext {
actor_splits,
// If the cluster is already paused, the new actors should be paused too.
pause: self.current_paused_reason.is_some(),
subscriptions_to_add: Default::default(),
}));

if let Some(ReplaceTablePlan {
@@ -647,12 +650,33 @@
.values()
.flat_map(build_actor_connector_splits)
.collect();
let subscriptions_to_add = snapshot_backfill_info
.upstream_mv_table_ids
.iter()
.map(|table_id| (table_id.table_id, table_fragments.table_id().table_id))
.collect();

Some(Mutation::Add(AddMutation {
actor_dispatchers,
added_actors,
actor_splits,
// If the cluster is already paused, the new actors should be paused too.
pause: self.current_paused_reason.is_some(),
subscriptions_to_add,
}))
}
Command::FinishCreateSnapshotBackfillStreamingJobs(jobs_to_finish) => {
Some(Mutation::DropSubscription(DropSubscriptionMutation {
subscriptions_to_drop: jobs_to_finish
.iter()
.flat_map(|(table_id, backfill_info)| {
backfill_info.upstream_mv_table_ids.iter().map(
move |upstream_table_id| {
(table_id.table_id, upstream_table_id.table_id)
},
)
})
.collect(),
}))
}

Expand Down Expand Up @@ -828,8 +852,10 @@ impl CommandContext {
upstream_mv_table_id,
subscription_id,
} => Some(Mutation::DropSubscription(DropSubscriptionMutation {
upstream_mv_table_id: upstream_mv_table_id.table_id,
subscription_id: *subscription_id,
subscriptions_to_drop: HashMap::from_iter([(
*subscription_id,
upstream_mv_table_id.table_id,
)]),
})),
};

@@ -1284,6 +1310,7 @@ impl CommandContext {
}
},
Command::DropSubscription { .. } => {}
Command::FinishCreateSnapshotBackfillStreamingJobs(_) => {}
}

Ok(())
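The new `FinishCreateSnapshotBackfillStreamingJobs` command adds no actors of its own; it only translates into a `DropSubscriptionMutation` that pairs each finished job's table id with every upstream MV it backfilled from, undoing the `subscriptions_to_add` registered when the job was created. A trimmed-down sketch of that translation, with hypothetical simplifications of the meta-side structs:

use std::collections::{HashMap, HashSet};

type TableId = u32;

// Hypothetical, trimmed-down stand-ins for the meta-side types.
struct SnapshotBackfillInfo {
    upstream_mv_table_ids: HashSet<TableId>,
}

struct DropSubscriptionMutation {
    // subscriber_id -> upstream_mv_table_id
    subscriptions_to_drop: HashMap<u32, u32>,
}

fn finish_jobs_to_mutation(
    jobs_to_finish: &HashMap<TableId, SnapshotBackfillInfo>,
) -> DropSubscriptionMutation {
    DropSubscriptionMutation {
        // One entry per (finished job, upstream MV) pair, mirroring the flat_map
        // in the new match arm.
        subscriptions_to_drop: jobs_to_finish
            .iter()
            .flat_map(|(job_table_id, info)| {
                info.upstream_mv_table_ids
                    .iter()
                    .map(move |upstream| (*job_table_id, *upstream))
            })
            .collect(),
    }
}

fn main() {
    let jobs = HashMap::from([(
        1001u32,
        SnapshotBackfillInfo {
            upstream_mv_table_ids: HashSet::from([11u32]),
        },
    )]);
    let mutation = finish_jobs_to_mutation(&jobs);
    assert_eq!(mutation.subscriptions_to_drop[&1001], 11);
}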
34 changes: 27 additions & 7 deletions src/meta/src/barrier/mod.rs
@@ -201,6 +201,8 @@ pub struct GlobalBarrierManager {
active_streaming_nodes: ActiveStreamingWorkerNodes,

control_stream_manager: ControlStreamManager,

finished_table_ids: HashMap<TableId, SnapshotBackfillInfo>,
}

#[derive(Debug)]
@@ -699,7 +701,7 @@ enum CompletingCommand {
None,
Completing {
command_ctx: Arc<CommandContext>,
table_ids_to_finish: HashSet<TableId>,
table_ids_to_finish: HashMap<TableId, SnapshotBackfillInfo>,

// The join handle of a spawned task that completes the barrier.
// The return value indicates whether there is some create streaming job command
@@ -764,6 +766,7 @@ impl GlobalBarrierManager {
pending_non_checkpoint_barriers: Vec::new(),
active_streaming_nodes,
control_stream_manager,
finished_table_ids: Default::default(),
}
}

@@ -974,7 +977,7 @@
assert_matches!(output.command_ctx.kind, BarrierKind::Barrier);
self.scheduled_barriers.force_checkpoint_in_next_barrier();
}
self.control_stream_manager.remove_partial_graph(output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect());
self.finished_table_ids.extend(output.table_ids_to_finish);
}
Err(e) => {
self.failure_recovery(e).await;
@@ -997,7 +1000,7 @@
/// Handle the new barrier from the scheduled queue and inject it.
fn handle_new_barrier(&mut self, scheduled: Scheduled) -> MetaResult<()> {
let Scheduled {
command,
mut command,
mut notifiers,
send_latency_timer,
checkpoint,
@@ -1018,6 +1021,19 @@

debug!(prev_epoch = prev_epoch.value().0, "inject barrier");

if let (BarrierKind::Checkpoint(_), Command::Plain(None)) = (&kind, &command)
&& !self.finished_table_ids.is_empty()
{
let finished_table_ids = take(&mut self.finished_table_ids);
self.control_stream_manager.remove_partial_graph(
finished_table_ids
.keys()
.map(|table_id| table_id.table_id)
.collect(),
);
command = Command::FinishCreateSnapshotBackfillStreamingJobs(finished_table_ids);
}

// Tracing related stuff
prev_epoch.span().in_scope(|| {
tracing::info!(target: "rw_tracing", epoch = curr_epoch.value().0, "new barrier enqueued");
@@ -1323,15 +1339,14 @@ impl GlobalBarrierManagerContext {
struct BarrierCompleteOutput {
command_ctx: Arc<CommandContext>,
require_next_checkpoint: bool,
table_ids_to_finish: HashSet<TableId>,
table_ids_to_finish: HashMap<TableId, SnapshotBackfillInfo>,
}

impl CheckpointControl {
fn collect_backfill_progress(&self) -> HashMap<TableId, (u64, HashSet<TableId>)> {
let temp = 0;
// TODO: filter out creating jobs that have been merged into the global graph and no longer need the log store
self.creating_streaming_job_controls
.iter()
.filter(|(_, job)| job.status.is_creating() || !job.inflight_barrier_queue.is_empty())
.map(|(table_id, creating_job)| {
(
*table_id,
@@ -1356,7 +1371,12 @@ impl CheckpointControl {
{
let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty");
assert!(node.state.finishing_table_ids.is_empty());
let table_ids_to_finish = node.state.finished_table_ids.keys().cloned().collect();
let table_ids_to_finish = node
.state
.finished_table_ids
.iter()
.map(|(table_id, job)| (*table_id, job.snapshot_backfill_info.clone()))
.collect();
let command_ctx = node.command_ctx.clone();
let join_handle = tokio::spawn(
self.context
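Rather than tearing down a finished job's partial graph as soon as its barrier completes, the manager now parks finished jobs in `finished_table_ids` and flushes them the next time an otherwise-empty checkpoint barrier is injected, rewriting that barrier's command into `FinishCreateSnapshotBackfillStreamingJobs`. A self-contained sketch of the deferral, with hypothetical simplified types (the real code also removes the jobs' partial graphs from the control stream manager at the same point):

use std::collections::HashMap;
use std::mem::take;

type TableId = u32;

#[derive(Debug)]
struct SnapshotBackfillInfo {
    upstream_mv_table_ids: Vec<TableId>,
}

enum BarrierKind {
    Barrier,
    Checkpoint,
}

#[derive(Debug)]
enum Command {
    Plain(Option<()>),
    FinishCreateSnapshotBackfillStreamingJobs(HashMap<TableId, SnapshotBackfillInfo>),
}

#[derive(Default)]
struct BarrierManagerSketch {
    // Jobs whose snapshot backfill has completed but whose subscriptions still exist.
    finished_table_ids: HashMap<TableId, SnapshotBackfillInfo>,
}

impl BarrierManagerSketch {
    fn handle_new_barrier(&mut self, kind: &BarrierKind, command: &mut Command) {
        // Piggyback only on a checkpoint barrier that carries no other command.
        if matches!(kind, BarrierKind::Checkpoint)
            && matches!(command, Command::Plain(None))
            && !self.finished_table_ids.is_empty()
        {
            let finished = take(&mut self.finished_table_ids);
            // The real code removes the finished jobs' partial graphs here,
            // right before rewriting the command.
            *command = Command::FinishCreateSnapshotBackfillStreamingJobs(finished);
        }
    }
}

fn main() {
    let mut mgr = BarrierManagerSketch::default();
    mgr.finished_table_ids.insert(
        1001,
        SnapshotBackfillInfo {
            upstream_mv_table_ids: vec![11],
        },
    );

    // A non-checkpoint barrier leaves a plain command untouched.
    let mut cmd = Command::Plain(None);
    mgr.handle_new_barrier(&BarrierKind::Barrier, &mut cmd);
    assert!(matches!(cmd, Command::Plain(None)));

    // The next empty checkpoint barrier drains the buffer into a finish command.
    mgr.handle_new_barrier(&BarrierKind::Checkpoint, &mut cmd);
    println!("{cmd:?}");
}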
1 change: 1 addition & 0 deletions src/meta/src/barrier/recovery.rs
@@ -360,6 +360,7 @@ impl GlobalBarrierManager {
added_actors: Default::default(),
actor_splits: build_actor_connector_splits(&source_split_assignments),
pause: paused_reason.is_some(),
subscriptions_to_add: Default::default(),
})));

// Use a different `curr_epoch` for each recovery attempt.
7 changes: 7 additions & 0 deletions src/stream/src/common/table/state_table.rs
@@ -1149,6 +1149,13 @@ where
op_consistency_level: StateTableOpConsistencyLevel,
) -> StreamExecutorResult<()> {
if self.op_consistency_level != op_consistency_level {
info!(
?new_epoch,
prev_op_consistency_level = ?self.op_consistency_level,
?op_consistency_level,
table_id = self.table_id.table_id,
"switch to new op consistency level"
);
self.commit_inner(new_epoch, Some(op_consistency_level))
.await
} else {
46 changes: 34 additions & 12 deletions src/stream/src/executor/mod.rs
@@ -275,6 +275,8 @@ pub struct AddMutation {
// TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
pub splits: HashMap<ActorId, Vec<SplitImpl>>,
pub pause: bool,
/// `upstream_mv_table_id` -> subscriber_id
pub subscriptions_to_add: HashMap<TableId, u32>,
}

/// See [`PbMutation`] for the semantics of each mutation.
@@ -292,9 +294,9 @@ pub enum Mutation {
subscription_id: u32,
upstream_mv_table_id: TableId,
},
DropSubscription {
subscription_id: u32,
upstream_mv_table_id: TableId,
DropSubscriptions {
/// `subscription_id` -> `upstream_mv_table_id`
subscriptions_to_drop: HashMap<u32, TableId>,
},
}

@@ -448,7 +450,7 @@ impl Barrier {
| Mutation::SourceChangeSplit(_)
| Mutation::Throttle(_)
| Mutation::CreateSubscription { .. }
| Mutation::DropSubscription { .. } => false,
| Mutation::DropSubscriptions { .. } => false,
}
}

@@ -581,6 +583,7 @@ impl Mutation {
added_actors,
splits,
pause,
subscriptions_to_add,
}) => PbMutation::Add(PbAddMutation {
actor_dispatchers: adds
.iter()
@@ -596,6 +599,10 @@
added_actors: added_actors.iter().copied().collect(),
actor_splits: actor_splits_to_protobuf(splits),
pause: *pause,
subscriptions_to_add: subscriptions_to_add
.iter()
.map(|(table_id, subscription_id)| (table_id.table_id, *subscription_id))
.collect(),
}),
Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
actor_splits: changes
@@ -636,12 +643,15 @@ impl Mutation {
upstream_mv_table_id: upstream_mv_table_id.table_id,
subscription_id: *subscription_id,
}),
Mutation::DropSubscription {
upstream_mv_table_id,
subscription_id,
Mutation::DropSubscriptions {
subscriptions_to_drop,
} => PbMutation::DropSubscription(DropSubscriptionMutation {
upstream_mv_table_id: upstream_mv_table_id.table_id,
subscription_id: *subscription_id,
subscriptions_to_drop: subscriptions_to_drop
.iter()
.map(|(subscription_id, upstream_mv_table_id)| {
(*subscription_id, upstream_mv_table_id.table_id)
})
.collect(),
}),
}
}
@@ -712,6 +722,13 @@ impl Mutation {
})
.collect(),
pause: add.pause,
subscriptions_to_add: add
.subscriptions_to_add
.iter()
.map(|(upstream_mv_table_id, subscription_id)| {
(TableId::new(*upstream_mv_table_id), *subscription_id)
})
.collect(),
}),

PbMutation::Splits(s) => {
Expand Down Expand Up @@ -744,9 +761,14 @@ impl Mutation {
upstream_mv_table_id: TableId::new(create.upstream_mv_table_id),
subscription_id: create.subscription_id,
},
PbMutation::DropSubscription(drop) => Mutation::DropSubscription {
upstream_mv_table_id: TableId::new(drop.upstream_mv_table_id),
subscription_id: drop.subscription_id,
PbMutation::DropSubscription(drop) => Mutation::DropSubscriptions {
subscriptions_to_drop: drop
.subscriptions_to_drop
.iter()
.map(|(subscription_id, upstream_mv_table_id)| {
(*subscription_id, TableId::new(*upstream_mv_table_id))
})
.collect(),
},
PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
[BarrierMutation {
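On the stream side, the single-subscription `DropSubscription` variant becomes `DropSubscriptions`, carrying a `subscription_id -> upstream_mv_table_id` map, and the protobuf conversion only wraps and unwraps the `TableId` on each value. A small round-trip sketch with hypothetical stand-ins for the stream-side and protobuf-side types:

use std::collections::HashMap;

// Hypothetical, minimal stand-ins for the stream-side and protobuf-side types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct TableId(u32);

#[derive(Debug, PartialEq)]
struct DropSubscriptions {
    // subscription_id -> upstream_mv_table_id
    subscriptions_to_drop: HashMap<u32, TableId>,
}

#[derive(Debug, PartialEq)]
struct PbDropSubscriptionMutation {
    // subscription_id -> upstream_mv_table_id (raw u32s on the wire)
    subscriptions_to_drop: HashMap<u32, u32>,
}

fn to_protobuf(m: &DropSubscriptions) -> PbDropSubscriptionMutation {
    PbDropSubscriptionMutation {
        subscriptions_to_drop: m
            .subscriptions_to_drop
            .iter()
            .map(|(sub_id, upstream)| (*sub_id, upstream.0))
            .collect(),
    }
}

fn from_protobuf(pb: &PbDropSubscriptionMutation) -> DropSubscriptions {
    DropSubscriptions {
        subscriptions_to_drop: pb
            .subscriptions_to_drop
            .iter()
            .map(|(sub_id, upstream)| (*sub_id, TableId(*upstream)))
            .collect(),
    }
}

fn main() {
    let original = DropSubscriptions {
        subscriptions_to_drop: HashMap::from([(7, TableId(11)), (8, TableId(12))]),
    };
    // The conversion only wraps/unwraps the table id, so it round-trips losslessly.
    assert_eq!(from_protobuf(&to_protobuf(&original)), original);
}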