Skip to content

Commit

Permalink
fix(streaming): correctly retrieve initial split assignments from com…
Browse files Browse the repository at this point in the history
…bined mutation for sink-into-table (#18356) (#18388)

Signed-off-by: Bugen Zhao <[email protected]>
Co-authored-by: Bugen Zhao <[email protected]>
  • Loading branch information
github-actions[bot] and BugenZhao authored Sep 4, 2024
1 parent 6d81fa8 commit 08d291a
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 47 deletions.
50 changes: 50 additions & 0 deletions e2e_test/source_inline/kafka/issue_18308.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Test case for issue 18308 (per the file name): data produced to a Kafka topic must reach
# a table that is fed through `CREATE SINK ... INTO` from a Kafka source.
control substitution on

# Create the Kafka topic used by this test.
system ok
rpk topic create test-topic-18308

# Source that reads JSON messages from the topic, starting from the earliest offset.
statement ok
CREATE SOURCE kafkasource (
id int,
name string,
)
WITH (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test-topic-18308',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

# Target table of the sink, keyed by `id`.
statement ok
CREATE TABLE compact_table (
id int,
name varchar,
PRIMARY KEY (id)
);

# Sink-into-table: forward every row of the source into `compact_table`.
statement ok
CREATE SINK table_sink INTO compact_table AS SELECT * FROM kafkasource;

# Produce a single message only after the sink has been created.
system ok
echo '{ "id": 1, "name": "xxchan" }' | rpk topic produce test-topic-18308

# Presumably gives the source time to consume the message before the check below.
sleep 5s

statement ok
flush;

# The produced row must be visible in the target table.
query IT
SELECT * FROM compact_table;
----
1 xxchan

# Clean up: drop the sink first, then the table and the source, and finally the topic.
statement ok
DROP SINK table_sink;

statement ok
DROP TABLE compact_table;

statement ok
DROP SOURCE kafkasource;

system ok
rpk topic delete test-topic-18308
54 changes: 50 additions & 4 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,15 @@ where
pub const INVALID_EPOCH: u64 = 0;

type UpstreamFragmentId = FragmentId;
type SplitAssignments = HashMap<ActorId, Vec<SplitImpl>>;

#[derive(Debug, Clone, PartialEq)]
pub struct UpdateMutation {
pub dispatchers: HashMap<ActorId, Vec<DispatcherUpdate>>,
pub merges: HashMap<(ActorId, UpstreamFragmentId), MergeUpdate>,
pub vnode_bitmaps: HashMap<ActorId, Arc<Bitmap>>,
pub dropped_actors: HashSet<ActorId>,
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
pub actor_splits: SplitAssignments,
pub actor_new_dispatchers: HashMap<ActorId, Vec<PbDispatcher>>,
}

Expand All @@ -280,7 +281,7 @@ pub struct AddMutation {
pub adds: HashMap<ActorId, Vec<PbDispatcher>>,
pub added_actors: HashSet<ActorId>,
// TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
pub splits: HashMap<ActorId, Vec<SplitImpl>>,
pub splits: SplitAssignments,
pub pause: bool,
/// (`upstream_mv_table_id`, `subscriber_id`)
pub subscriptions_to_add: Vec<(TableId, u32)>,
Expand All @@ -292,7 +293,7 @@ pub enum Mutation {
Stop(HashSet<ActorId>),
Update(UpdateMutation),
Add(AddMutation),
SourceChangeSplit(HashMap<ActorId, Vec<SplitImpl>>),
SourceChangeSplit(SplitAssignments),
Pause,
Resume,
Throttle(HashMap<ActorId, Option<u32>>),
Expand Down Expand Up @@ -382,6 +383,51 @@ impl Barrier {
.map_or(false, |actors| actors.contains(&actor_id))
}

/// Returns the splits initially assigned to the actor `actor_id` by this barrier, if any.
///
/// Only call this on the first barrier an executor receives. That barrier is expected to
/// carry one of the following mutations:
///
/// - `Add`: the executor belongs to a new streaming job, or is being recovered.
/// - `Update`: the executor was created for scaling.
/// - `AddAndUpdate`: the executor was created for sink-into-table.
///
/// `SourceChangeSplit` is deliberately **not** handled here, because it is only used for
/// changing the splits of executors that already exist.
///
/// Returns `None` when the barrier carries no mutation, or when the mutation assigns no
/// splits to `actor_id`. Any other mutation kind panics if debug assertions are enabled and
/// yields `None` otherwise.
pub fn initial_split_assignment(&self, actor_id: ActorId) -> Option<&[SplitImpl]> {
    let assigned = match self.mutation.as_deref()? {
        Mutation::Add(AddMutation { splits, .. }) => splits.get(&actor_id),
        Mutation::Update(UpdateMutation { actor_splits, .. }) => actor_splits.get(&actor_id),

        // `Add` and `Update` should apply to different fragments, so the two maps never
        // need to be merged: take the `Add` entry if present, else fall back to `Update`.
        Mutation::AddAndUpdate(add, update) => match add.splits.get(&actor_id) {
            found @ Some(_) => found,
            None => update.actor_splits.get(&actor_id),
        },

        _ => {
            // An unexpected initial mutation is a bug: fail loudly in debug builds, but
            // degrade to "no splits" in release builds.
            if cfg!(debug_assertions) {
                panic!(
                    "the initial mutation of the barrier should not be {:?}",
                    self.mutation
                );
            }
            None
        }
    };

    assigned.map(Vec::as_slice)
}

/// Get all actors to be stopped (dropped) by this barrier.
pub fn all_stop_actors(&self) -> Option<&HashSet<ActorId>> {
match self.mutation.as_deref() {
Expand Down Expand Up @@ -563,7 +609,7 @@ impl Mutation {
}

fn to_protobuf(&self) -> PbMutation {
let actor_splits_to_protobuf = |actor_splits: &HashMap<ActorId, Vec<SplitImpl>>| {
let actor_splits_to_protobuf = |actor_splits: &SplitAssignments| {
actor_splits
.iter()
.map(|(&actor_id, splits)| {
Expand Down
15 changes: 3 additions & 12 deletions src/stream/src/executor/source/fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use super::{
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::executor::{AddMutation, UpdateMutation};
use crate::executor::UpdateMutation;

/// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to
/// some latencies in network and cost in meta.
Expand Down Expand Up @@ -316,17 +316,8 @@ impl<S: StateStore> FsSourceExecutor<S> {
let start_with_paused = barrier.is_pause_on_startup();

let mut boot_state = Vec::default();
if let Some(
Mutation::Add(AddMutation { splits, .. })
| Mutation::Update(UpdateMutation {
actor_splits: splits,
..
}),
) = barrier.mutation.as_deref()
{
if let Some(splits) = splits.get(&self.actor_ctx.id) {
boot_state.clone_from(splits);
}
if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) {
boot_state = splits.to_vec();
}

self.stream_source_core
Expand Down
18 changes: 4 additions & 14 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use super::{apply_rate_limit, get_split_offset_col_idx};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
use crate::executor::{AddMutation, UpdateMutation};
use crate::executor::UpdateMutation;

#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum BackfillState {
Expand Down Expand Up @@ -245,20 +245,10 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
};

let mut owned_splits = Vec::default();
if let Some(mutation) = barrier.mutation.as_ref() {
match mutation.as_ref() {
Mutation::Add(AddMutation { splits, .. })
| Mutation::Update(UpdateMutation {
actor_splits: splits,
..
}) => {
if let Some(splits) = splits.get(&self.actor_ctx.id) {
owned_splits.clone_from(splits);
}
}
_ => {}
}
if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) {
owned_splits = splits.to_vec();
}

self.backfill_state_store.init_epoch(barrier.epoch);

let mut backfill_states: BackfillStates = HashMap::new();
Expand Down
22 changes: 5 additions & 17 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use super::{
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::executor::{AddMutation, UpdateMutation};
use crate::executor::UpdateMutation;

/// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to
/// some latencies in network and cost in meta.
Expand Down Expand Up @@ -445,22 +445,9 @@ impl<S: StateStore> SourceExecutor<S> {
};

let mut boot_state = Vec::default();
if let Some(
Mutation::Add(AddMutation { splits, .. })
| Mutation::Update(UpdateMutation {
actor_splits: splits,
..
}),
) = barrier.mutation.as_deref()
{
if let Some(splits) = splits.get(&self.actor_ctx.id) {
tracing::debug!(
"source exector: actor {:?} boot with splits: {:?}",
self.actor_ctx.id,
splits
);
boot_state.clone_from(splits);
}
if let Some(splits) = barrier.initial_split_assignment(self.actor_ctx.id) {
tracing::debug!(?splits, "boot with splits");
boot_state = splits.to_vec();
}

core.split_state_store.init_epoch(barrier.epoch);
Expand Down Expand Up @@ -889,6 +876,7 @@ mod tests {

use super::*;
use crate::executor::source::{default_source_internal_table, SourceStateTableHandler};
use crate::executor::AddMutation;

const MOCK_SOURCE_NAME: &str = "mock_source";

Expand Down

0 comments on commit 08d291a

Please sign in to comment.