Skip to content

Commit

Permalink
fix(frontend): add stream_job_status to TableCatalog (#16398)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Apr 19, 2024
1 parent accec49 commit be57d0a
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 6 deletions.
24 changes: 24 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use parse_display::Display;
pub use physical_table::*;
use risingwave_pb::catalog::{
CreateType as PbCreateType, HandleConflictBehavior as PbHandleConflictBehavior,
StreamJobStatus as PbStreamJobStatus,
};
use risingwave_pb::plan_common::ColumnDescVersion;
pub use schema::{test_utils as schema_test_utils, Field, FieldDisplay, Schema};
Expand Down Expand Up @@ -478,6 +479,29 @@ impl ConflictBehavior {
}
}

#[derive(Clone, Copy, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub enum StreamJobStatus {
#[default]
Creating,
Created,
}

impl StreamJobStatus {
pub fn from_proto(stream_job_status: PbStreamJobStatus) -> Self {
match stream_job_status {
PbStreamJobStatus::Creating => StreamJobStatus::Creating,
PbStreamJobStatus::Created | PbStreamJobStatus::Unspecified => StreamJobStatus::Created,
}
}

pub fn to_proto(self) -> PbStreamJobStatus {
match self {
StreamJobStatus::Creating => PbStreamJobStatus::Creating,
StreamJobStatus::Created => PbStreamJobStatus::Created,
}
}
}

#[derive(Clone, Copy, Debug, Display, Hash, PartialOrd, PartialEq, Eq, Ord)]
pub enum CreateType {
Foreground,
Expand Down
16 changes: 13 additions & 3 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use std::collections::{HashMap, HashSet};
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, TableDesc, TableId, TableVersionId,
ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, StreamJobStatus, TableDesc,
TableId, TableVersionId,
};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
Expand Down Expand Up @@ -154,6 +155,10 @@ pub struct TableCatalog {
/// Indicate whether to create table in background or foreground.
pub create_type: CreateType,

/// Indicate the stream job status, whether it is created or creating.
/// If it is creating, we should hide it.
pub stream_job_status: StreamJobStatus,

/// description of table, set by `comment on`.
pub description: Option<String>,

Expand Down Expand Up @@ -413,7 +418,7 @@ impl TableCatalog {
initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
cleaned_by_watermark: self.cleaned_by_watermark,
stream_job_status: PbStreamJobStatus::Creating.into(),
stream_job_status: self.stream_job_status.to_proto().into(),
create_type: self.create_type.to_proto().into(),
description: self.description.clone(),
incoming_sinks: self.incoming_sinks.clone(),
Expand Down Expand Up @@ -481,6 +486,9 @@ impl From<PbTable> for TableCatalog {
let id = tb.id;
let tb_conflict_behavior = tb.handle_pk_conflict_behavior();
let table_type = tb.get_table_type().unwrap();
let stream_job_status = tb
.get_stream_job_status()
.unwrap_or(PbStreamJobStatus::Created);
let create_type = tb.get_create_type().unwrap_or(PbCreateType::Foreground);
let associated_source_id = tb.optional_associated_source_id.map(|id| match id {
OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
Expand Down Expand Up @@ -543,6 +551,7 @@ impl From<PbTable> for TableCatalog {
initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from),
cleaned_by_watermark: tb.cleaned_by_watermark,
create_type: CreateType::from_proto(create_type),
stream_job_status: StreamJobStatus::from_proto(stream_job_status),
description: tb.description,
incoming_sinks: tb.incoming_sinks.clone(),
created_at_cluster_version: tb.created_at_cluster_version.clone(),
Expand Down Expand Up @@ -639,7 +648,7 @@ mod tests {
cardinality: None,
created_at_epoch: None,
cleaned_by_watermark: false,
stream_job_status: PbStreamJobStatus::Creating.into(),
stream_job_status: PbStreamJobStatus::Created.into(),
create_type: PbCreateType::Foreground.into(),
description: Some("description".to_string()),
incoming_sinks: vec![],
Expand Down Expand Up @@ -700,6 +709,7 @@ mod tests {
created_at_epoch: None,
initialized_at_epoch: None,
cleaned_by_watermark: false,
stream_job_status: StreamJobStatus::Created,
create_type: CreateType::Foreground,
description: Some("description".to_string()),
incoming_sinks: vec![],
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, CreateType, TableId, OBJECT_ID_PLACEHOLDER,
ColumnCatalog, ConflictBehavior, CreateType, StreamJobStatus, TableId, OBJECT_ID_PLACEHOLDER,
};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
Expand Down Expand Up @@ -262,6 +262,7 @@ impl StreamMaterialize {
initialized_at_epoch: None,
cleaned_by_watermark: false,
create_type: CreateType::Foreground, // Will be updated in the handler itself.
stream_job_status: StreamJobStatus::Creating,
description: None,
incoming_sinks: vec![],
initialized_at_cluster_version: None,
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use itertools::Itertools;
use pretty_xmlish::{Pretty, Str, StrAssocArr, XmlNode};
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, Field, FieldDisplay, Schema,
OBJECT_ID_PLACEHOLDER,
StreamJobStatus, OBJECT_ID_PLACEHOLDER,
};
use risingwave_common::constants::log_store::v2::{
KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
Expand Down Expand Up @@ -176,6 +176,7 @@ impl TableCatalogBuilder {
// NOTE(kwannoel): This may not match the create type of the materialized table.
// It should be ignored for internal tables.
create_type: CreateType::Foreground,
stream_job_status: StreamJobStatus::Creating,
description: None,
incoming_sinks: vec![],
initialized_at_cluster_version: None,
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,8 @@ pub(crate) mod tests {
WorkerNodeManager, WorkerNodeSelector,
};
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, DEFAULT_SUPER_USER_ID,
ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus,
DEFAULT_SUPER_USER_ID,
};
use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::types::DataType;
Expand Down Expand Up @@ -590,6 +591,7 @@ pub(crate) mod tests {
cleaned_by_watermark: false,
created_at_epoch: None,
initialized_at_epoch: None,
stream_job_status: StreamJobStatus::Creating,
create_type: CreateType::Foreground,
description: None,
incoming_sinks: vec![],
Expand Down

0 comments on commit be57d0a

Please sign in to comment.