From 86713399ccd08f14d986391f40d32273268e4fb0 Mon Sep 17 00:00:00 2001 From: August Date: Thu, 11 Apr 2024 17:59:28 +0800 Subject: [PATCH] feat(rise-ctl): add fix dirty upstream fragment ids for streaming job (#16258) --- src/ctl/src/cmd_impl/debug.rs | 2 + .../src/cmd_impl/debug/fix_table_fragments.rs | 76 +++++++++++++++++++ src/ctl/src/lib.rs | 17 +++++ 3 files changed, 95 insertions(+) create mode 100644 src/ctl/src/cmd_impl/debug/fix_table_fragments.rs diff --git a/src/ctl/src/cmd_impl/debug.rs b/src/ctl/src/cmd_impl/debug.rs index fed9a564da71..1b7d7db5b2aa 100644 --- a/src/ctl/src/cmd_impl/debug.rs +++ b/src/ctl/src/cmd_impl/debug.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod fix_table_fragments; mod meta_store; +pub use fix_table_fragments::*; pub use meta_store::*; diff --git a/src/ctl/src/cmd_impl/debug/fix_table_fragments.rs b/src/ctl/src/cmd_impl/debug/fix_table_fragments.rs new file mode 100644 index 000000000000..627545ac178e --- /dev/null +++ b/src/ctl/src/cmd_impl/debug/fix_table_fragments.rs @@ -0,0 +1,76 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use etcd_client::ConnectOptions;
+use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
+use risingwave_meta::model::{MetadataModel, TableFragments};
+use risingwave_meta::storage::{EtcdMetaStore, WrappedEtcdClient};
+use risingwave_pb::stream_plan::stream_node::NodeBody;
+
+use crate::DebugCommon;
+
+/// Removes every reference to `dirty_fragment_ids` from the table fragments of `table_id` stored
+/// in etcd: the per-fragment `upstream_fragment_ids` and the `Merge` inputs of `Union` nodes.
+/// Errors if connecting to etcd fails, `table_id` has no stored fragments, or the write fails.
+pub async fn fix_table_fragments(
+    common: DebugCommon,
+    table_id: u32,
+    dirty_fragment_ids: Vec<u32>,
+) -> anyhow::Result<()> {
+    let DebugCommon {
+        etcd_endpoints,
+        etcd_username,
+        etcd_password,
+        enable_etcd_auth,
+        ..
+    } = common;
+
+    let client = if enable_etcd_auth {
+        let options = ConnectOptions::default().with_user(
+            etcd_username.unwrap_or_default(),
+            etcd_password.unwrap_or_default(),
+        );
+        WrappedEtcdClient::connect(etcd_endpoints, Some(options), true).await?
+    } else {
+        WrappedEtcdClient::connect(etcd_endpoints, None, false).await?
+    };
+    let meta_store = EtcdMetaStore::new(client);
+
+    // A missing table is an operator mistake: report an error rather than panic.
+    let mut table_fragments = TableFragments::select(&meta_store, &table_id)
+        .await?
+        .ok_or_else(|| anyhow::anyhow!("table fragments not found for table id {table_id}"))?;
+
+    for fragment in table_fragments.fragments.values_mut() {
+        fragment
+            .upstream_fragment_ids
+            .retain(|id| !dirty_fragment_ids.contains(id));
+        for actor in &mut fragment.actors {
+            visit_stream_node_cont(actor.nodes.as_mut().unwrap(), |node| {
+                if let Some(NodeBody::Union(_)) = node.node_body {
+                    node.input.retain(|input| match &input.node_body {
+                        Some(NodeBody::Merge(merge)) => {
+                            !dirty_fragment_ids.contains(&merge.upstream_fragment_id)
+                        }
+                        _ => true,
+                    });
+                }
+                true
+            })
+        }
+    }
+
+    table_fragments.insert(&meta_store).await?;
+    Ok(())
+}
diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs
index c41dec52aaad..a1aaa8f48c5f 100644
--- a/src/ctl/src/lib.rs
+++ b/src/ctl/src/lib.rs
@@ -141,6 +141,18 @@ pub enum DebugCommands {
         #[command(flatten)]
         common: DebugCommon,
     },
+    /// Fix table fragments by cleaning up references to non-existent fragments, which happens when an
+    /// upstream streaming job fails to be created and its fragments are not cleaned up due to some unidentified issue.
+    FixDirtyUpstreams {
+        #[command(flatten)]
+        common: DebugCommon,
+
+        #[clap(long)]
+        table_id: u32,
+
+        #[clap(long, value_delimiter = ',')]
+        dirty_fragment_ids: Vec<u32>,
+    },
 }
 
 #[derive(Subcommand)]
@@ -853,6 +865,11 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
                 .await?
         }
         Commands::Debug(DebugCommands::Dump { common }) => cmd_impl::debug::dump(common).await?,
+        Commands::Debug(DebugCommands::FixDirtyUpstreams {
+            common,
+            table_id,
+            dirty_fragment_ids,
+        }) => cmd_impl::debug::fix_table_fragments(common, table_id, dirty_fragment_ids).await?,
         Commands::Throttle(ThrottleCommands::Source(args)) => {
             apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
         }