Skip to content

Commit

Permalink
Update recovery with new method & created-state filter.
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Feb 5, 2024
1 parent 823382b commit e52d5c4
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 3 deletions.
6 changes: 5 additions & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::State;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::PbFieldNotFound;
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
Expand Down Expand Up @@ -607,7 +610,7 @@ impl GlobalBarrierManagerContext {
let table_parallelisms: HashMap<_, _> = {
let streaming_parallelisms = mgr
.catalog_controller
.get_all_streaming_parallelisms()
.get_all_created_streaming_parallelisms()
.await?;

streaming_parallelisms
Expand Down Expand Up @@ -751,6 +754,7 @@ impl GlobalBarrierManagerContext {
guard
.table_fragments()
.iter()
.filter(|&(_, table)| matches!(table.state(), State::Created))
.map(|(table_id, table)| {
let target_parallelism =
derive_target_parallelism_for_custom(current_parallelism, table);
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2134,12 +2134,13 @@ impl CatalogController {
.collect())
}

pub async fn get_all_streaming_parallelisms(
pub async fn get_all_created_streaming_parallelisms(
&self,
) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
let inner = self.inner.read().await;

let job_parallelisms = StreamingJob::find()
.filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
.select_only()
.columns([
streaming_job::Column::JobId,
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2574,7 +2574,7 @@ impl GlobalStreamManager {
let table_parallelisms: HashMap<_, _> = {
let streaming_parallelisms = mgr
.catalog_controller
.get_all_streaming_parallelisms()
.get_all_created_streaming_parallelisms()
.await?;

streaming_parallelisms
Expand Down

0 comments on commit e52d5c4

Please sign in to comment.