Skip to content

Commit

Permalink
feat(rise-ctl): add fix dirty upstream fragment ids for streaming job (
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Apr 11, 2024
1 parent bdde2a7 commit 8671339
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/ctl/src/cmd_impl/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod fix_table_fragments;
mod meta_store;

pub use fix_table_fragments::*;
pub use meta_store::*;
76 changes: 76 additions & 0 deletions src/ctl/src/cmd_impl/debug/fix_table_fragments.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use etcd_client::ConnectOptions;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_meta::model::{MetadataModel, TableFragments};
use risingwave_meta::storage::{EtcdMetaStore, WrappedEtcdClient};
use risingwave_pb::stream_plan::stream_node::NodeBody;

use crate::DebugCommon;

/// Removes references to dirty upstream fragments from the table fragments of a streaming job
/// stored in the etcd meta store.
///
/// For every fragment of `table_id`, this:
/// - drops each id listed in `dirty_fragment_ids` from the fragment's `upstream_fragment_ids`;
/// - for every actor, removes the `Merge` inputs of `Union` nodes whose `upstream_fragment_id`
///   is listed in `dirty_fragment_ids`.
///
/// The modified table fragments are then written back to the meta store.
///
/// # Errors
///
/// Returns an error if connecting to etcd fails, if reading or writing the table fragments
/// fails, if no table fragments exist for `table_id`, or if an actor carries no stream node.
/// Nothing is written back to the meta store when an error occurs.
pub async fn fix_table_fragments(
    common: DebugCommon,
    table_id: u32,
    dirty_fragment_ids: Vec<u32>,
) -> anyhow::Result<()> {
    let DebugCommon {
        etcd_endpoints,
        etcd_username,
        etcd_password,
        enable_etcd_auth,
        ..
    } = common;

    // The destructured fields are owned here, so they can be moved instead of cloned.
    let client = if enable_etcd_auth {
        let options = ConnectOptions::default().with_user(
            etcd_username.unwrap_or_default(),
            etcd_password.unwrap_or_default(),
        );
        WrappedEtcdClient::connect(etcd_endpoints, Some(options), true).await?
    } else {
        WrappedEtcdClient::connect(etcd_endpoints, None, false).await?
    };

    let meta_store = EtcdMetaStore::new(client);

    // `table_id` comes from the command line, so a missing entry is a user error to report,
    // not an invariant violation to panic on.
    let mut table_fragments = TableFragments::select(&meta_store, &table_id)
        .await?
        .ok_or_else(|| anyhow::anyhow!("table fragments not found for table id {table_id}"))?;

    // Build the lookup set once; it is consulted for every upstream id and every union input.
    let dirty_fragment_ids: std::collections::HashSet<u32> =
        dirty_fragment_ids.into_iter().collect();

    for fragment in table_fragments.fragments.values_mut() {
        fragment
            .upstream_fragment_ids
            .retain(|id| !dirty_fragment_ids.contains(id));

        for actor in &mut fragment.actors {
            let nodes = actor.nodes.as_mut().ok_or_else(|| {
                anyhow::anyhow!("an actor of table id {table_id} has no stream node")
            })?;

            visit_stream_node_cont(nodes, |node| {
                if let Some(NodeBody::Union(_)) = node.node_body {
                    // Keep every input except merge nodes that read from a dirty fragment.
                    node.input.retain(|input| {
                        !matches!(
                            &input.node_body,
                            Some(NodeBody::Merge(merge_node))
                                if dirty_fragment_ids.contains(&merge_node.upstream_fragment_id)
                        )
                    });
                }
                // NOTE(review): `true` presumably tells the visitor to keep traversing — confirm
                // against `visit_stream_node_cont`.
                true
            });
        }
    }

    table_fragments.insert(&meta_store).await?;
    Ok(())
}
17 changes: 17 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,18 @@ pub enum DebugCommands {
#[command(flatten)]
common: DebugCommon,
},
/// Fix table fragments by cleaning up references to non-existent upstream fragments, which can
/// remain when an upstream streaming job fails to be created and its fragments are not cleaned up due to some unidentified issue.
FixDirtyUpstreams {
#[command(flatten)]
common: DebugCommon,

#[clap(long)]
table_id: u32,

#[clap(long, value_delimiter = ',')]
dirty_fragment_ids: Vec<u32>,
},
}

#[derive(Subcommand)]
Expand Down Expand Up @@ -853,6 +865,11 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
.await?
}
Commands::Debug(DebugCommands::Dump { common }) => cmd_impl::debug::dump(common).await?,
Commands::Debug(DebugCommands::FixDirtyUpstreams {
common,
table_id,
dirty_fragment_ids,
}) => cmd_impl::debug::fix_table_fragments(common, table_id, dirty_fragment_ids).await?,
Commands::Throttle(ThrottleCommands::Source(args)) => {
apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Source, args).await?
}
Expand Down

0 comments on commit 8671339

Please sign in to comment.