diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 92ee8295447a7..71142b8290897 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -23,9 +23,9 @@ use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::fragment::StreamNode; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; use risingwave_meta_model_v2::{ - actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ConnectorSplits, - ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, SinkId, SourceId, - StreamingParallelism, TableId, VnodeBitmap, WorkerId, + actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ActorUpstreamActors, + ConnectorSplits, ExprContext, FragmentId, FragmentVnodeMapping, I32Array, JobStatus, ObjectId, + SinkId, SourceId, StreamingParallelism, TableId, VnodeBitmap, WorkerId, }; use risingwave_pb::common::PbParallelUnit; use risingwave_pb::meta::subscribe_response::{ @@ -1094,9 +1094,9 @@ impl CatalogController { pub async fn get_running_actors_and_upstream_of_fragment( &self, fragment_id: FragmentId, - ) -> MetaResult)>> { + ) -> MetaResult> { let inner = self.inner.read().await; - let actors: Vec<(ActorId, Vec)> = Actor::find() + let actors: Vec<(ActorId, ActorUpstreamActors)> = Actor::find() .select_only() .column(actor::Column::ActorId) .column(actor::Column::UpstreamActorIds) @@ -1236,7 +1236,7 @@ impl CatalogController { &self, ) -> MetaResult>> { let inner = self.inner.read().await; - let mut fragments: Vec<(FragmentId, Vec, i32, StreamNode)> = Fragment::find() + let mut fragments: Vec<(FragmentId, I32Array, i32, StreamNode)> = Fragment::find() .select_only() .columns([ fragment::Column::FragmentId, @@ -1252,13 +1252,13 @@ impl CatalogController { let mut source_fragment_ids = HashMap::new(); for (fragment_id, upstream_fragment_id, _, stream_node) in fragments { if let Some(source_id) = stream_node.to_protobuf().find_source_backfill() { - if upstream_fragment_id.len() != 1 { - bail!("SourceBackfill should have only one upstream fragment, found {} for fragment {}", upstream_fragment_id.len(), fragment_id); + if upstream_fragment_id.inner_ref().len() != 1 { + bail!("SourceBackfill should have only one upstream fragment, found {} for fragment {}", upstream_fragment_id.inner_ref().len(), fragment_id); } source_fragment_ids .entry(source_id as SourceId) .or_insert_with(BTreeSet::new) - .insert((fragment_id, upstream_fragment_id[0])); + .insert((fragment_id, upstream_fragment_id.inner_ref()[0])); } } Ok(source_fragment_ids) diff --git a/src/meta/src/controller/user.rs b/src/meta/src/controller/user.rs index fe20c9cd5e18d..d42fc79721807 100644 --- a/src/meta/src/controller/user.rs +++ b/src/meta/src/controller/user.rs @@ -294,7 +294,8 @@ impl CatalogController { if *privilege.with_grant_option.as_ref() { on_conflict.update_columns([user_privilege::Column::WithGrantOption]); } else { - on_conflict.do_nothing(); + // Workaround to support MYSQL for `DO NOTHING`. + on_conflict.update_column(user_privilege::Column::UserId); } UserPrivilege::insert(privilege) diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index ebb03d461a2fb..876f5c3365d36 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -653,7 +653,11 @@ impl MetadataManager { .map(|(id, actors)| { ( id as ActorId, - actors.into_iter().map(|id| id as ActorId).collect(), + actors + .into_inner() + .into_iter() + .flat_map(|(_, ids)| ids.into_iter().map(|id| id as ActorId)) + .collect(), ) }) .collect())