Skip to content

Commit

Permalink
fix: only processing streaming jobs that are in the 'created' state i…
Browse files Browse the repository at this point in the history
…n auto-scaling (#15000) (#15009)
  • Loading branch information
github-actions[bot] authored Feb 5, 2024
1 parent 5528bbf commit 94363df
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
4 changes: 3 additions & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::table_fragments::State;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
Expand Down Expand Up @@ -614,7 +615,7 @@ impl GlobalBarrierManagerContext {
let table_parallelisms: HashMap<_, _> = {
let streaming_parallelisms = mgr
.catalog_controller
.get_all_streaming_parallelisms()
.get_all_created_streaming_parallelisms()
.await?;

streaming_parallelisms
Expand Down Expand Up @@ -758,6 +759,7 @@ impl GlobalBarrierManagerContext {
guard
.table_fragments()
.iter()
.filter(|&(_, table)| matches!(table.state(), State::Created))
.map(|(table_id, table)| {
let target_parallelism =
derive_target_parallelism_for_custom(current_parallelism, table);
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2134,12 +2134,13 @@ impl CatalogController {
.collect())
}

pub async fn get_all_streaming_parallelisms(
pub async fn get_all_created_streaming_parallelisms(
&self,
) -> MetaResult<HashMap<ObjectId, StreamingParallelism>> {
let inner = self.inner.read().await;

let job_parallelisms = StreamingJob::find()
.filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
.select_only()
.columns([
streaming_job::Column::JobId,
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolic
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::{self, ActorStatus, Fragment};
use risingwave_pb::meta::table_fragments::{self, ActorStatus, Fragment, State};
use risingwave_pb::meta::FragmentParallelUnitMappings;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, StreamNode};
Expand Down Expand Up @@ -2545,6 +2545,7 @@ impl GlobalStreamManager {
guard
.table_fragments()
.iter()
.filter(|&(_, table)| matches!(table.state(), State::Created))
.map(|(table_id, table)| (table_id.table_id, table.assigned_parallelism))
.collect()
};
Expand Down Expand Up @@ -2591,7 +2592,7 @@ impl GlobalStreamManager {
let table_parallelisms: HashMap<_, _> = {
let streaming_parallelisms = mgr
.catalog_controller
.get_all_streaming_parallelisms()
.get_all_created_streaming_parallelisms()
.await?;

streaming_parallelisms
Expand Down

0 comments on commit 94363df

Please sign in to comment.