Skip to content

Commit

Permalink
fix(meta): fix column binding for fragment ids (#16133)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Apr 7, 2024
1 parent cf2dfd0 commit e811ad7
Showing 3 changed files with 16 additions and 11 deletions.
18 changes: 9 additions & 9 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
@@ -23,9 +23,9 @@ use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::fragment::StreamNode;
use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob};
use risingwave_meta_model_v2::{
actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits,
ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId,
StreamingParallelism, TableId, VnodeBitmap, WorkerId,
actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ActorUpstreamActors,
ConnectorSplits, ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId,
SinkId, SourceId, StreamingParallelism, TableId, VnodeBitmap, WorkerId,
};
use risingwave_pb::common::PbParallelUnit;
use risingwave_pb::meta::subscribe_response::{
@@ -1094,9 +1094,9 @@ impl CatalogController {
pub async fn get_running_actors_and_upstream_of_fragment(
&self,
fragment_id: FragmentId,
) -> MetaResult<Vec<(ActorId, Vec<ActorId>)>> {
) -> MetaResult<Vec<(ActorId, ActorUpstreamActors)>> {
let inner = self.inner.read().await;
let actors: Vec<(ActorId, Vec<ActorId>)> = Actor::find()
let actors: Vec<(ActorId, ActorUpstreamActors)> = Actor::find()
.select_only()
.column(actor::Column::ActorId)
.column(actor::Column::UpstreamActorIds)
@@ -1236,7 +1236,7 @@ impl CatalogController {
&self,
) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
let inner = self.inner.read().await;
let mut fragments: Vec<(FragmentId, Vec<FragmentId>, i32, StreamNode)> = Fragment::find()
let mut fragments: Vec<(FragmentId, I32Array, i32, StreamNode)> = Fragment::find()
.select_only()
.columns([
fragment::Column::FragmentId,
@@ -1252,13 +1252,13 @@ impl CatalogController {
let mut source_fragment_ids = HashMap::new();
for (fragment_id, upstream_fragment_id, _, stream_node) in fragments {
if let Some(source_id) = stream_node.to_protobuf().find_source_backfill() {
if upstream_fragment_id.len() != 1 {
bail!("SourceBackfill should have only one upstream fragment, found {} for fragment {}", upstream_fragment_id.len(), fragment_id);
if upstream_fragment_id.inner_ref().len() != 1 {
bail!("SourceBackfill should have only one upstream fragment, found {} for fragment {}", upstream_fragment_id.inner_ref().len(), fragment_id);
}
source_fragment_ids
.entry(source_id as SourceId)
.or_insert_with(BTreeSet::new)
.insert((fragment_id, upstream_fragment_id[0]));
.insert((fragment_id, upstream_fragment_id.inner_ref()[0]));
}
}
Ok(source_fragment_ids)
3 changes: 2 additions & 1 deletion src/meta/src/controller/user.rs
Original file line number Diff line number Diff line change
@@ -294,7 +294,8 @@ impl CatalogController {
if *privilege.with_grant_option.as_ref() {
on_conflict.update_columns([user_privilege::Column::WithGrantOption]);
} else {
on_conflict.do_nothing();
// Workaround to support MYSQL for `DO NOTHING`.
on_conflict.update_column(user_privilege::Column::UserId);
}

UserPrivilege::insert(privilege)
6 changes: 5 additions & 1 deletion src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
@@ -653,7 +653,11 @@ impl MetadataManager {
.map(|(id, actors)| {
(
id as ActorId,
actors.into_iter().map(|id| id as ActorId).collect(),
actors
.into_inner()
.into_iter()
.flat_map(|(_, ids)| ids.into_iter().map(|id| id as ActorId))
.collect(),
)
})
.collect())

0 comments on commit e811ad7

Please sign in to comment.