Skip to content

Commit

Permalink
fix: add mview_definition in scaled actors (#19784)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Dec 27, 2024
1 parent 40e5401 commit fa02a55
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 6 deletions.
80 changes: 78 additions & 2 deletions src/meta/src/controller/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub struct RescheduleWorkingSet {
pub fragment_downstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>>,
pub fragment_upstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>>,

pub related_jobs: HashMap<ObjectId, streaming_job::Model>,
pub related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
}

async fn resolve_no_shuffle_query<C>(
Expand All @@ -193,6 +193,67 @@ where
Ok(result)
}

async fn resolve_streaming_job_definition<C>(
txn: &C,
job_ids: &HashSet<ObjectId>,
) -> MetaResult<HashMap<ObjectId, String>>
where
C: ConnectionTrait,
{
let job_ids = job_ids.iter().cloned().collect_vec();

// including table, materialized view, index
let common_job_definitions: Vec<(ObjectId, String)> = Table::find()
.select_only()
.columns([
table::Column::TableId,
#[cfg(not(debug_assertions))]
table::Column::Name,
#[cfg(debug_assertions)]
table::Column::Definition,
])
.filter(table::Column::TableId.is_in(job_ids.clone()))
.into_tuple()
.all(txn)
.await?;

let sink_definitions: Vec<(ObjectId, String)> = Sink::find()
.select_only()
.columns([
sink::Column::SinkId,
#[cfg(not(debug_assertions))]
sink::Column::Name,
#[cfg(debug_assertions)]
sink::Column::Definition,
])
.filter(sink::Column::SinkId.is_in(job_ids.clone()))
.into_tuple()
.all(txn)
.await?;

let source_definitions: Vec<(ObjectId, String)> = Source::find()
.select_only()
.columns([
source::Column::SourceId,
#[cfg(not(debug_assertions))]
source::Column::Name,
#[cfg(debug_assertions)]
source::Column::Definition,
])
.filter(source::Column::SourceId.is_in(job_ids.clone()))
.into_tuple()
.all(txn)
.await?;

let definitions: HashMap<ObjectId, String> = common_job_definitions
.into_iter()
.chain(sink_definitions.into_iter())
.chain(source_definitions.into_iter())
.collect();

Ok(definitions)
}

impl CatalogController {
pub async fn resolve_working_set_for_reschedule_fragments(
&self,
Expand Down Expand Up @@ -340,14 +401,29 @@ impl CatalogController {
let related_job_ids: HashSet<_> =
fragments.values().map(|fragment| fragment.job_id).collect();

let related_job_definitions =
resolve_streaming_job_definition(txn, &related_job_ids).await?;

let related_jobs = StreamingJob::find()
.filter(streaming_job::Column::JobId.is_in(related_job_ids))
.all(txn)
.await?;

let related_jobs = related_jobs
.into_iter()
.map(|job| (job.job_id, job))
.map(|job| {
let job_id = job.job_id;
(
job_id,
(
job,
related_job_definitions
.get(&job_id)
.cloned()
.unwrap_or("".to_owned()),
),
)
})
.collect();

Ok(RescheduleWorkingSet {
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,9 @@ impl ScaleController {
vnode_bitmap,
} = actors.first().unwrap().clone();

let (related_job, job_definition) =
related_jobs.get(&job_id).expect("job not found");

let fragment = CustomFragmentInfo {
fragment_id: fragment_id as _,
fragment_type_mask: fragment_type_mask as _,
Expand All @@ -662,8 +665,7 @@ impl ScaleController {
dispatcher,
upstream_actor_id,
vnode_bitmap,
// todo, we need to fill this part
mview_definition: "".to_string(),
mview_definition: job_definition.to_owned(),
expr_context: expr_contexts
.get(&actor_id)
.cloned()
Expand All @@ -676,8 +678,6 @@ impl ScaleController {

fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));

let related_job = related_jobs.get(&job_id).expect("job not found");

fragment_state.insert(
fragment_id,
table_fragments::PbState::from(related_job.job_status),
Expand Down

0 comments on commit fa02a55

Please sign in to comment.