Skip to content

Commit

Permalink
refactor(meta): readability improvements on source manager
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 23, 2023
1 parent 4f354b2 commit 1ffde49
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 184 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub use self::with_data_type::WithDataType;
/// A 32-bit floating point type with total order.
pub type F32 = ordered_float::OrderedFloat<f32>;


/// A 64-bit floating point type with total order.
pub type F64 = ordered_float::OrderedFloat<f64>;

Expand Down
1 change: 1 addition & 0 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ workspace-hack = { path = "../workspace-hack" }

[dev-dependencies]
assert_matches = "1"
expect-test = "1.4"
maplit = "1.0.2"
rand = "0.8"
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
Expand Down
2 changes: 1 addition & 1 deletion src/meta/service/src/scale_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl ScaleService for ScaleServiceImpl {

let actor_splits = self
.source_manager
.get_actor_splits()
.list_assignments()
.await
.into_iter()
.map(|(actor_id, splits)| {
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ pub struct ReplaceTablePlan {
pub init_split_assignment: SplitAssignment,
}

/// [`Command`] is the action of [`crate::barrier::GlobalBarrierManager`]. For different commands,
/// we'll build different barriers to send, and may do different stuffs after the barrier is
/// [`Command`] is the input of [`crate::barrier::GlobalBarrierManager`]. For different commands,
/// it will build different barriers to send, and may do different stuffs after the barrier is
/// collected.
#[derive(Debug, Clone, strum::Display)]
pub enum Command {
Expand Down Expand Up @@ -156,8 +156,8 @@ pub enum Command {
/// of the Merge executors are changed additionally.
ReplaceTable(ReplaceTablePlan),

/// `SourceSplitAssignment` generates Plain(Mutation::Splits) for pushing initialized splits or
/// newly added splits.
/// `SourceSplitAssignment` generates a `Splits` barrier for pushing initialized splits or
/// changed splits.
SourceSplitAssignment(SplitAssignment),

/// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl GlobalBarrierManager {

// clean up source connector dirty changes.
self.source_manager
.drop_source_change(&to_drop_table_fragments)
.drop_source_fragments(&to_drop_table_fragments)
.await;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ impl ScaleController {

let actor_splits = self
.source_manager
.reallocate_splits(*fragment_id, &prev_actor_ids, &curr_actor_ids)
.migrate_splits(*fragment_id, &prev_actor_ids, &curr_actor_ids)
.await?;

fragment_stream_source_actor_splits.insert(*fragment_id, actor_splits);
Expand Down
Loading

0 comments on commit 1ffde49

Please sign in to comment.