Skip to content

Commit

Permalink
Refactor scaling logic and enhance JobStatus enum
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Aug 6, 2024
1 parent d3451bf commit e3b1770
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub type HummockSstableObjectId = i64;
pub type FragmentId = i32;
pub type ActorId = i32;

#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "String(None)")]
pub enum JobStatus {
#[sea_orm(string_value = "INITIAL")]
Expand Down
26 changes: 22 additions & 4 deletions src/meta/src/controller/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ use risingwave_meta_model_migration::{
UnionType, WithClause, WithQuery,
};
use risingwave_meta_model_v2::actor_dispatcher::DispatcherType;
use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment};
use risingwave_meta_model_v2::{actor, actor_dispatcher, fragment, ActorId, FragmentId, ObjectId};
use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, StreamingJob};
use risingwave_meta_model_v2::{
actor, actor_dispatcher, fragment, streaming_job, ActorId, FragmentId, ObjectId,
};
use sea_orm::{
ColumnTrait, ConnectionTrait, DbErr, EntityTrait, JoinType, QueryFilter, QuerySelect,
QueryTrait, RelationTrait, Statement, TransactionTrait,
Expand Down Expand Up @@ -154,6 +156,8 @@ pub struct RescheduleWorkingSet {

pub fragment_downstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>>,
pub fragment_upstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>>,

pub related_jobs: HashMap<ObjectId, streaming_job::Model>,
}

async fn resolve_no_shuffle_query<C>(
Expand Down Expand Up @@ -298,7 +302,7 @@ impl CatalogController {
.await?;

let actor_and_dispatchers: Vec<(_, _)> = Actor::find()
.filter(actor::Column::FragmentId.is_in(all_fragment_ids))
.filter(actor::Column::FragmentId.is_in(all_fragment_ids.clone()))
.find_with_related(ActorDispatcher)
.all(txn)
.await?;
Expand All @@ -312,17 +316,31 @@ impl CatalogController {
actor_dispatchers.insert(actor_id, dispatchers);
}

let fragments = fragments
let fragments: HashMap<FragmentId, _> = fragments
.into_iter()
.map(|fragment| (fragment.fragment_id, fragment))
.collect();

let related_job_ids: HashSet<_> =
fragments.values().map(|fragment| fragment.job_id).collect();

let related_jobs = StreamingJob::find()
.filter(streaming_job::Column::JobId.is_in(related_job_ids))
.all(txn)
.await?;

let related_jobs = related_jobs
.into_iter()
.map(|job| (job.job_id, job))
.collect();

Ok(RescheduleWorkingSet {
fragments,
actors,
actor_dispatchers,
fragment_downstreams,
fragment_upstreams,
related_jobs,
})
}
}
Expand Down
13 changes: 9 additions & 4 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, VirtualNode};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_meta_model_v2::{actor, fragment, ObjectId, StreamingParallelism};
use risingwave_pb::common::{ActorInfo, Buffer, PbActorLocation, WorkerNode, WorkerType};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
Expand Down Expand Up @@ -566,6 +565,7 @@ impl ScaleController {
mut actor_dispatchers,
fragment_downstreams: _,
fragment_upstreams: _,
related_jobs,
} = mgr
.catalog_controller
.resolve_working_set_for_reschedule_fragments(fragment_ids)
Expand Down Expand Up @@ -661,7 +661,7 @@ impl ScaleController {
dispatcher,
upstream_actor_id,
vnode_bitmap,
mview_definition: "".to_string(),
mview_definition: "todo".to_string(),
expr_context: expr_contexts
.get(&actor_id)
.cloned()
Expand All @@ -674,8 +674,12 @@ impl ScaleController {

fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));

// todo
fragment_state.insert(fragment_id, table_fragments::State::Created);
let related_job = related_jobs.get(&job_id).expect("");

fragment_state.insert(
fragment_id,
table_fragments::PbState::from(related_job.job_status),
);
}
}
};
Expand Down Expand Up @@ -2096,6 +2100,7 @@ impl ScaleController {
actor_dispatchers: _,
fragment_downstreams,
fragment_upstreams: _,
related_jobs,
} = mgr
.catalog_controller
.resolve_working_set_for_reschedule_tables(table_ids)
Expand Down

0 comments on commit e3b1770

Please sign in to comment.