feat: support table replace for sql-backend
yezizp2012 committed Jan 9, 2024
1 parent 8aecb4b commit ede0956
Showing 9 changed files with 580 additions and 87 deletions.
26 changes: 22 additions & 4 deletions src/meta/model_v2/src/table.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use risingwave_pb::catalog::table::PbTableType;
+use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType};
use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable};
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
@@ -187,10 +187,28 @@ impl From<PbTable> for ActiveModel {
let table_type = pb_table.table_type();
let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();

+let fragment_id = if pb_table.fragment_id == u32::MAX - 1 {
+    NotSet
+} else {
+    Set(Some(pb_table.fragment_id as FragmentId))
+};
+let dml_fragment_id = pb_table
+    .dml_fragment_id
+    .map(|x| Set(Some(x as FragmentId)))
+    .unwrap_or_default();
+let optional_associated_source_id =
+    if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
+        pb_table.optional_associated_source_id
+    {
+        Set(Some(src_id as SourceId))
+    } else {
+        NotSet
+    };

Self {
table_id: Set(pb_table.id as _),
name: Set(pb_table.name),
-optional_associated_source_id: NotSet,
+optional_associated_source_id,
table_type: Set(table_type.into()),
belongs_to_job_id: Set(None),
columns: Set(pb_table.columns.into()),
@@ -199,7 +217,7 @@ impl From<PbTable> for ActiveModel {
stream_key: Set(pb_table.stream_key.into()),
append_only: Set(pb_table.append_only),
properties: Set(pb_table.properties.into()),
-fragment_id: NotSet,
+fragment_id,
vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
value_indices: Set(pb_table.value_indices.into()),
@@ -208,7 +226,7 @@ impl From<PbTable> for ActiveModel {
read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
watermark_indices: Set(pb_table.watermark_indices.into()),
dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
-dml_fragment_id: NotSet,
+dml_fragment_id,
cardinality: Set(pb_table.cardinality.map(|x| x.into())),
cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
description: Set(pb_table.description),
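
The conversion above now carries fragment_id, dml_fragment_id, and the optional associated source id over from PbTable instead of always leaving them NotSet, treating u32::MAX - 1 as the "fragment not assigned yet" sentinel. A minimal standalone sketch of that sentinel mapping; the constant and type alias below are illustrative stand-ins, not the crate's real definitions:

// Illustrative stand-ins; the real definitions live in the model_v2 crate.
type FragmentId = i32;

// Assumed sentinel: the check in the diff above treats u32::MAX - 1 as "not yet assigned".
const UNASSIGNED_FRAGMENT_ID: u32 = u32::MAX - 1;

// None means the column is left unset (NotSet) instead of storing the sentinel value.
fn fragment_id_from_pb(pb_fragment_id: u32) -> Option<FragmentId> {
    if pb_fragment_id == UNASSIGNED_FRAGMENT_ID {
        None
    } else {
        Some(pb_fragment_id as FragmentId)
    }
}

fn main() {
    assert_eq!(fragment_id_from_pb(u32::MAX - 1), None);
    assert_eq!(fragment_id_from_pb(42), Some(42));
}
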
9 changes: 6 additions & 3 deletions src/meta/node/src/lib.rs
@@ -214,9 +214,12 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
},
MetaBackend::Mem => MetaStoreBackend::Mem,
};
-let sql_backend = opts
-    .sql_endpoint
-    .map(|endpoint| MetaStoreSqlBackend { endpoint });
+// let sql_backend = opts
+//     .sql_endpoint
+//     .map(|endpoint| MetaStoreSqlBackend { endpoint });
+let sql_backend = Some(MetaStoreSqlBackend {
+    endpoint: "postgres://postgres:@localhost:5432/postgres".to_string(),
+});

validate_config(&config);
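
The added lines pin the SQL meta-store backend to a local Postgres endpoint and comment out the option-driven construction, which reads like a local-testing shortcut; the removed lines built the backend from the optional sql_endpoint field on MetaNodeOpts. A minimal standalone sketch of that option-based construction, with a simplified stand-in struct rather than the real meta node type:

// Simplified stand-in for the meta node's SQL backend config.
struct MetaStoreSqlBackend {
    endpoint: String,
}

// None keeps the etcd/memory backend; Some(endpoint) enables the SQL-backed meta store.
fn sql_backend_from_opts(sql_endpoint: Option<String>) -> Option<MetaStoreSqlBackend> {
    sql_endpoint.map(|endpoint| MetaStoreSqlBackend { endpoint })
}

fn main() {
    assert!(sql_backend_from_opts(None).is_none());
    let backend = sql_backend_from_opts(Some("postgres://localhost:5432/metadata".into()));
    assert_eq!(backend.unwrap().endpoint, "postgres://localhost:5432/metadata");
}
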

53 changes: 32 additions & 21 deletions src/meta/src/barrier/command.rs
@@ -777,11 +777,8 @@ impl CommandContext
Command::Resume(_) => {}

Command::SourceSplitAssignment(split_assignment) => {
-let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
-else {
-    unimplemented!("implement config change funcs in v2");
-};
-mgr.fragment_manager
+self.barrier_manager_context
+    .metadata_manager
.update_actor_splits_by_split_assignment(split_assignment)
.await?;
self.barrier_manager_context
@@ -981,26 +978,40 @@ impl CommandContext {
dispatchers,
init_split_assignment,
}) => {
-let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
-else {
-    unimplemented!("implement replace funcs in v2");
-};
let table_ids = HashSet::from_iter(std::iter::once(old_table_fragments.table_id()));

// Tell compute nodes to drop actors.
-let node_actors = mgr.fragment_manager.table_node_actors(&table_ids).await?;
+let node_actors = self
+    .barrier_manager_context
+    .metadata_manager
+    .get_worker_actor_ids(table_ids)
+    .await?;
self.clean_up(node_actors).await?;

-// Drop fragment info in meta store.
-mgr.fragment_manager
-    .post_replace_table(
-        old_table_fragments,
-        new_table_fragments,
-        merge_updates,
-        dispatchers,
-        init_split_assignment.clone(),
-    )
-    .await?;
+match &self.barrier_manager_context.metadata_manager {
+    MetadataManager::V1(mgr) => {
+        // Drop fragment info in meta store.
+        mgr.fragment_manager
+            .post_replace_table(
+                old_table_fragments,
+                new_table_fragments,
+                merge_updates,
+                dispatchers,
+                init_split_assignment.clone(),
+            )
+            .await?;
+    }
+    MetadataManager::V2(mgr) => {
+        // Update actors and actor_dispatchers for new table fragments.
+        mgr.catalog_controller
+            .post_collect_table_fragments(
+                new_table_fragments.table_id().table_id as _,
+                new_table_fragments.actor_ids(),
+                dispatchers.clone(),
+                init_split_assignment,
+            )
+            .await?;
+    }
+}
}
}
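
With this change the ReplaceTable barrier command no longer hits unimplemented! on the SQL backend: both metadata-manager variants fetch the worker-to-actor mapping, tell compute nodes to drop the old actors, and then persist the replacement through their own store (post_replace_table for v1, post_collect_table_fragments for v2). A minimal sketch of that two-variant dispatch, using simplified stand-in types rather than the real meta-service ones:

// Simplified stand-ins for the two metadata backends; not the real meta-service types.
struct FragmentManager; // etcd-backed (v1)
struct CatalogController; // SQL-backed (v2)

enum MetadataManager {
    V1(FragmentManager),
    V2(CatalogController),
}

impl MetadataManager {
    // Persist the outcome of a table replacement through whichever store backs the meta node.
    fn finish_replace_table(&self, table_id: u32) {
        match self {
            // v1: rewrite the fragment info kept in the etcd-backed store.
            MetadataManager::V1(_mgr) => println!("v1: post_replace_table for table {table_id}"),
            // v2: record the newly collected fragments in the SQL catalog.
            MetadataManager::V2(_mgr) => println!("v2: post_collect_table_fragments for table {table_id}"),
        }
    }
}

fn main() {
    MetadataManager::V1(FragmentManager).finish_replace_table(1);
    MetadataManager::V2(CatalogController).finish_replace_table(1);
}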

47 changes: 45 additions & 2 deletions src/meta/src/controller/fragment.rs
@@ -57,7 +57,7 @@ use crate::controller::utils::{
};
use crate::manager::{ActorInfos, LocalNotification};
use crate::stream::SplitAssignment;
-use crate::MetaResult;
+use crate::{MetaError, MetaResult};

impl CatalogControllerInner {
/// List all fragment vnode mapping info for all CREATED streaming jobs.
@@ -986,15 +986,58 @@ impl CatalogController {
Ok(node_actors)
}

+pub async fn get_worker_actor_ids(
+    &self,
+    job_ids: Vec<ObjectId>,
+) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
+    let inner = self.inner.read().await;
+    let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
+    let actor_pu: Vec<(ActorId, i32)> = Actor::find()
+        .select_only()
+        .columns([actor::Column::ActorId, actor::Column::ParallelUnitId])
+        .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
+        .filter(fragment::Column::JobId.is_in(job_ids))
+        .into_tuple()
+        .all(&inner.db)
+        .await?;
+
+    let mut worker_actors = BTreeMap::new();
+    for (actor_id, pu_id) in actor_pu {
+        let worker_id = parallel_units_map
+            .get(&(pu_id as _))
+            .unwrap()
+            .worker_node_id as WorkerId;
+        worker_actors
+            .entry(worker_id)
+            .or_insert_with(Vec::new)
+            .push(actor_id);
+    }
+
+    Ok(worker_actors)
+}
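
The new get_worker_actor_ids resolves each actor's parallel unit to the worker that hosts it and groups actor ids per worker. A standalone sketch of the grouping step; the id types and the parallel-unit map below are simplified stand-ins, while the real function reads the pairs from the database:

use std::collections::{BTreeMap, HashMap};

type ActorId = i32;
type WorkerId = i32;
type ParallelUnitId = i32;

// Group actors by the worker hosting their parallel unit, mirroring the loop above.
fn group_actors_by_worker(
    actor_pu: &[(ActorId, ParallelUnitId)],
    pu_to_worker: &HashMap<ParallelUnitId, WorkerId>,
) -> BTreeMap<WorkerId, Vec<ActorId>> {
    let mut worker_actors = BTreeMap::new();
    for (actor_id, pu_id) in actor_pu {
        // The real code unwraps here; a missing mapping would indicate broken metadata.
        let worker_id = pu_to_worker[pu_id];
        worker_actors
            .entry(worker_id)
            .or_insert_with(Vec::new)
            .push(*actor_id);
    }
    worker_actors
}

fn main() {
    let pu_to_worker = HashMap::from([(10, 1), (11, 1), (12, 2)]);
    let actors = [(100, 10), (101, 11), (102, 12)];
    let grouped = group_actors_by_worker(&actors, &pu_to_worker);
    assert_eq!(grouped[&1], vec![100, 101]);
    assert_eq!(grouped[&2], vec![102]);
}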

pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
let inner = self.inner.read().await;
let txn = inner.db.begin().await?;
for assignments in split_assignment.values() {
for (actor_id, splits) in assignments {
+let actor_splits: Option<ConnectorSplits> = Actor::find_by_id(*actor_id as ActorId)
+    .select_only()
+    .column(actor::Column::Splits)
+    .into_tuple()
+    .one(&txn)
+    .await?
+    .ok_or_else(|| MetaError::catalog_id_not_found("actor_id", actor_id))?;
+
+let mut actor_splits = actor_splits
+    .map(|splits| splits.0.splits)
+    .unwrap_or_default();
+actor_splits.extend(splits.iter().map(Into::into));

Actor::update(actor::ActiveModel {
actor_id: Set(*actor_id as _),
splits: Set(Some(ConnectorSplits(PbConnectorSplits {
-splits: splits.iter().map(Into::into).collect(),
+splits: actor_splits,
}))),
..Default::default()
})
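
update_actor_splits now merges the newly assigned splits into whatever the actor already has stored, instead of overwriting the column with only the new assignment. A standalone sketch of the merge, with a plain String standing in for a connector split:

// Merge newly assigned splits into an actor's existing split list, mirroring the
// extend-then-write pattern above. The real code stores protobuf ConnectorSplits;
// a plain String stands in for a split here.
fn merge_actor_splits(existing: Option<Vec<String>>, assigned: &[String]) -> Vec<String> {
    let mut splits = existing.unwrap_or_default();
    splits.extend(assigned.iter().cloned());
    splits
}

fn main() {
    let existing = Some(vec!["split-1".to_string()]);
    let assigned = vec!["split-2".to_string(), "split-3".to_string()];
    assert_eq!(
        merge_actor_splits(existing, &assigned),
        vec!["split-1", "split-2", "split-3"]
    );
    // An actor with no stored splits simply receives the new assignment.
    assert_eq!(merge_actor_splits(None, &assigned), vec!["split-2", "split-3"]);
}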