Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add mview_definition in scaled actors (#19784) #19851

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 78 additions & 2 deletions src/meta/src/controller/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub struct RescheduleWorkingSet {
pub fragment_downstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>>,
pub fragment_upstreams: HashMap<FragmentId, Vec<(FragmentId, DispatcherType)>>,

pub related_jobs: HashMap<ObjectId, streaming_job::Model>,
pub related_jobs: HashMap<ObjectId, (streaming_job::Model, String)>,
}

async fn resolve_no_shuffle_query<C>(
Expand All @@ -193,6 +193,67 @@ where
Ok(result)
}

async fn resolve_streaming_job_definition<C>(
txn: &C,
job_ids: &HashSet<ObjectId>,
) -> MetaResult<HashMap<ObjectId, String>>
where
C: ConnectionTrait,
{
let job_ids = job_ids.iter().cloned().collect_vec();

// including table, materialized view, index
let common_job_definitions: Vec<(ObjectId, String)> = Table::find()
.select_only()
.columns([
table::Column::TableId,
#[cfg(not(debug_assertions))]
table::Column::Name,
#[cfg(debug_assertions)]
table::Column::Definition,
])
.filter(table::Column::TableId.is_in(job_ids.clone()))
.into_tuple()
.all(txn)
.await?;

let sink_definitions: Vec<(ObjectId, String)> = Sink::find()
.select_only()
.columns([
sink::Column::SinkId,
#[cfg(not(debug_assertions))]
sink::Column::Name,
#[cfg(debug_assertions)]
sink::Column::Definition,
])
.filter(sink::Column::SinkId.is_in(job_ids.clone()))
.into_tuple()
.all(txn)
.await?;

let source_definitions: Vec<(ObjectId, String)> = Source::find()
.select_only()
.columns([
source::Column::SourceId,
#[cfg(not(debug_assertions))]
source::Column::Name,
#[cfg(debug_assertions)]
source::Column::Definition,
])
.filter(source::Column::SourceId.is_in(job_ids.clone()))
.into_tuple()
.all(txn)
.await?;

let definitions: HashMap<ObjectId, String> = common_job_definitions
.into_iter()
.chain(sink_definitions.into_iter())
.chain(source_definitions.into_iter())
.collect();

Ok(definitions)
}

impl CatalogController {
pub async fn resolve_working_set_for_reschedule_fragments(
&self,
Expand Down Expand Up @@ -340,14 +401,29 @@ impl CatalogController {
let related_job_ids: HashSet<_> =
fragments.values().map(|fragment| fragment.job_id).collect();

let related_job_definitions =
resolve_streaming_job_definition(txn, &related_job_ids).await?;

let related_jobs = StreamingJob::find()
.filter(streaming_job::Column::JobId.is_in(related_job_ids))
.all(txn)
.await?;

let related_jobs = related_jobs
.into_iter()
.map(|job| (job.job_id, job))
.map(|job| {
let job_id = job.job_id;
(
job_id,
(
job,
related_job_definitions
.get(&job_id)
.cloned()
.unwrap_or("".to_owned()),
),
)
})
.collect();

Ok(RescheduleWorkingSet {
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,9 @@ impl ScaleController {
vnode_bitmap,
} = actors.first().unwrap().clone();

let (related_job, job_definition) =
related_jobs.get(&job_id).expect("job not found");

let fragment = CustomFragmentInfo {
fragment_id: fragment_id as _,
fragment_type_mask: fragment_type_mask as _,
Expand All @@ -662,8 +665,7 @@ impl ScaleController {
dispatcher,
upstream_actor_id,
vnode_bitmap,
// todo, we need to fill this part
mview_definition: "".to_string(),
mview_definition: job_definition.to_owned(),
expr_context: expr_contexts
.get(&actor_id)
.cloned()
Expand All @@ -676,8 +678,6 @@ impl ScaleController {

fragment_to_table.insert(fragment_id as _, TableId::from(job_id as u32));

let related_job = related_jobs.get(&job_id).expect("job not found");

fragment_state.insert(
fragment_id,
table_fragments::PbState::from(related_job.job_status),
Expand Down
Loading