Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sql-backend): support table replace for sql-backend #14415

Merged
merged 37 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
21b220b
feat: introduce metadata manager and adapt it in all RPC services, da…
yezizp2012 Dec 22, 2023
c974027
clippy
yezizp2012 Dec 25, 2023
893e8a1
fix ut
yezizp2012 Dec 25, 2023
01798ae
fmt
yezizp2012 Dec 26, 2023
c481cef
fix
yezizp2012 Dec 26, 2023
46ce96d
fix
yezizp2012 Dec 26, 2023
d83e7b5
Merge branch 'main' into feat/metadata-mgr
yezizp2012 Dec 26, 2023
79501a6
Merge branch 'main' into feat/metadata-mgr
yezizp2012 Dec 28, 2023
7b5da11
Merge branch 'main' into feat/metadata-mgr
yezizp2012 Dec 28, 2023
b74284e
fix
yezizp2012 Dec 28, 2023
ba50fc4
fix
yezizp2012 Dec 29, 2023
f4c6c9c
Merge branch 'main' into feat/metadata-mgr
yezizp2012 Dec 29, 2023
870b3b9
feat: add more functions for sql-based controllers and some bug fix
yezizp2012 Dec 22, 2023
82fd7d7
feat: support clean, background mv recover, migration in recovery
yezizp2012 Dec 22, 2023
1aa21ec
fix migration
yezizp2012 Dec 26, 2023
693cf91
fmt
yezizp2012 Dec 29, 2023
8eeb0da
Merge branch 'main' into feat/support-v2-recovery
yezizp2012 Jan 2, 2024
7b3ee45
feat: support create streaming job in sql backend
yezizp2012 Dec 22, 2023
aff989b
fix source streaming job
yezizp2012 Dec 27, 2023
b0438bc
some fix and enable backgroud ddl
yezizp2012 Dec 27, 2023
e85c9db
fix clean and recoverable streaming jobs
yezizp2012 Dec 28, 2023
fa6f05e
resolve copyright
yezizp2012 Jan 2, 2024
8ad877d
update cluster version in object
yezizp2012 Jan 2, 2024
a1f3836
fix
yezizp2012 Jan 8, 2024
bc452ad
Merge branch 'main' into feat/support-v2-create-streaming-job
yezizp2012 Jan 8, 2024
56f23c9
fix ut
yezizp2012 Jan 8, 2024
b19ccf8
clippy
yezizp2012 Jan 9, 2024
f889fc4
Merge branch 'main' into feat/support-v2-create-streaming-job
yezizp2012 Jan 9, 2024
0f117be
revert debug
yezizp2012 Jan 9, 2024
8aecb4b
feat: support throttle rate limit for sql-backend
yezizp2012 Jan 2, 2024
ede0956
feat: support table replace for sql-backend
yezizp2012 Jan 8, 2024
0dc7ea1
fix apply
yezizp2012 Jan 9, 2024
2e008a8
Merge branch 'main' into feat/support-v2-replace
yezizp2012 Jan 11, 2024
bb61837
revert debug
yezizp2012 Jan 11, 2024
eb32bac
resolve comments
yezizp2012 Jan 16, 2024
5b75f2e
Merge branch 'main' into feat/support-v2-replace
yezizp2012 Jan 16, 2024
e523f7e
fmt
yezizp2012 Jan 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions src/meta/model_v2/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType};
use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable};
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
Expand Down Expand Up @@ -187,10 +187,28 @@ impl From<PbTable> for ActiveModel {
let table_type = pb_table.table_type();
let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();

let fragment_id = if pb_table.fragment_id == u32::MAX - 1 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are workarounds to determine whether it's newly created or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, this is the placeholder ID. Can we extract a constant as it's not intuitive?

NotSet
} else {
Set(Some(pb_table.fragment_id as FragmentId))
};
let dml_fragment_id = pb_table
.dml_fragment_id
.map(|x| Set(Some(x as FragmentId)))
.unwrap_or_default();
let optional_associated_source_id =
if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
pb_table.optional_associated_source_id
{
Set(Some(src_id as SourceId))
} else {
NotSet
};

Self {
table_id: Set(pb_table.id as _),
name: Set(pb_table.name),
optional_associated_source_id: NotSet,
optional_associated_source_id,
table_type: Set(table_type.into()),
belongs_to_job_id: Set(None),
columns: Set(pb_table.columns.into()),
Expand All @@ -199,7 +217,7 @@ impl From<PbTable> for ActiveModel {
stream_key: Set(pb_table.stream_key.into()),
append_only: Set(pb_table.append_only),
properties: Set(pb_table.properties.into()),
fragment_id: NotSet,
fragment_id,
vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
value_indices: Set(pb_table.value_indices.into()),
Expand All @@ -208,7 +226,7 @@ impl From<PbTable> for ActiveModel {
read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
watermark_indices: Set(pb_table.watermark_indices.into()),
dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
dml_fragment_id: NotSet,
dml_fragment_id,
cardinality: Set(pb_table.cardinality.map(|x| x.into())),
cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
description: Set(pb_table.description),
Expand Down
53 changes: 32 additions & 21 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,11 +777,8 @@ impl CommandContext {
Command::Resume(_) => {}

Command::SourceSplitAssignment(split_assignment) => {
let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
else {
unimplemented!("implement config change funcs in v2");
};
mgr.fragment_manager
self.barrier_manager_context
.metadata_manager
.update_actor_splits_by_split_assignment(split_assignment)
.await?;
self.barrier_manager_context
Expand Down Expand Up @@ -981,26 +978,40 @@ impl CommandContext {
dispatchers,
init_split_assignment,
}) => {
let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
else {
unimplemented!("implement replace funcs in v2");
};
let table_ids = HashSet::from_iter(std::iter::once(old_table_fragments.table_id()));

// Tell compute nodes to drop actors.
let node_actors = mgr.fragment_manager.table_node_actors(&table_ids).await?;
let node_actors = self
.barrier_manager_context
.metadata_manager
.get_worker_actor_ids(table_ids)
.await?;
self.clean_up(node_actors).await?;

// Drop fragment info in meta store.
mgr.fragment_manager
.post_replace_table(
old_table_fragments,
new_table_fragments,
merge_updates,
dispatchers,
init_split_assignment.clone(),
)
.await?;
match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
// Drop fragment info in meta store.
mgr.fragment_manager
.post_replace_table(
old_table_fragments,
new_table_fragments,
merge_updates,
dispatchers,
init_split_assignment.clone(),
)
.await?;
}
MetadataManager::V2(mgr) => {
// Update actors and actor_dispatchers for new table fragments.
mgr.catalog_controller
.post_collect_table_fragments(
new_table_fragments.table_id().table_id as _,
new_table_fragments.actor_ids(),
dispatchers.clone(),
init_split_assignment,
)
.await?;
}
}

// Apply the split changes in source manager.
self.barrier_manager_context
Expand Down
47 changes: 45 additions & 2 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use crate::controller::utils::{
};
use crate::manager::{ActorInfos, LocalNotification};
use crate::stream::SplitAssignment;
use crate::MetaResult;
use crate::{MetaError, MetaResult};

impl CatalogControllerInner {
/// List all fragment vnode mapping info for all CREATED streaming jobs.
Expand Down Expand Up @@ -986,15 +986,58 @@ impl CatalogController {
Ok(node_actors)
}

pub async fn get_worker_actor_ids(
&self,
job_ids: Vec<ObjectId>,
) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
let inner = self.inner.read().await;
let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
let actor_pu: Vec<(ActorId, i32)> = Actor::find()
.select_only()
.columns([actor::Column::ActorId, actor::Column::ParallelUnitId])
.join(JoinType::InnerJoin, actor::Relation::Fragment.def())
.filter(fragment::Column::JobId.is_in(job_ids))
.into_tuple()
.all(&inner.db)
.await?;

let mut worker_actors = BTreeMap::new();
for (actor_id, pu_id) in actor_pu {
let worker_id = parallel_units_map
.get(&(pu_id as _))
.unwrap()
.worker_node_id as WorkerId;
worker_actors
.entry(worker_id)
.or_insert_with(Vec::new)
.push(actor_id);
}

Ok(worker_actors)
}

pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
let inner = self.inner.read().await;
let txn = inner.db.begin().await?;
for assignments in split_assignment.values() {
for (actor_id, splits) in assignments {
let actor_splits: Option<ConnectorSplits> = Actor::find_by_id(*actor_id as ActorId)
.select_only()
.column(actor::Column::Splits)
.into_tuple()
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("actor_id", actor_id))?;

let mut actor_splits = actor_splits
.map(|splits| splits.0.splits)
.unwrap_or_default();
actor_splits.extend(splits.iter().map(Into::into));

Actor::update(actor::ActiveModel {
actor_id: Set(*actor_id as _),
splits: Set(Some(ConnectorSplits(PbConnectorSplits {
splits: splits.iter().map(Into::into).collect(),
splits: actor_splits,
}))),
..Default::default()
})
Expand Down
Loading
Loading