Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Sep 15, 2023
1 parent f0064fe commit c86d034
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 25 deletions.
18 changes: 10 additions & 8 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ enum SchemaRegistryNameStrategy {
TOPIC_RECORD_NAME_STRATEGY = 2;
}

enum StreamJobStatus {
// Prefixed by `STREAM_JOB_STATUS` due to protobuf namespacing rules.
STREAM_JOB_STATUS_UNSPECIFIED = 0;
CREATING = 1;
CREATED = 2;
}

message StreamSourceInfo {
// deprecated
plan_common.RowFormatType row_format = 1;
Expand Down Expand Up @@ -116,6 +123,7 @@ message Sink {
optional uint64 created_at_epoch = 16;
string db_name = 17;
string sink_from_name = 18;
StreamJobStatus stream_job_status = 19;
}

message Connection {
Expand Down Expand Up @@ -157,6 +165,7 @@ message Index {

optional uint64 initialized_at_epoch = 10;
optional uint64 created_at_epoch = 11;
StreamJobStatus stream_job_status = 12;
}

message Function {
Expand Down Expand Up @@ -191,13 +200,6 @@ message Table {
INTERNAL = 4;
}

enum TableStatus {
// Prefixed by `TABLE_STATUS` due to protobuf namespacing rules.
TABLE_STATUS_UNSPECIFIED = 0;
CREATING = 1;
CREATED = 2;
}

message TableVersion {
// The version number, which will be 0 by default and be increased by 1 for
// each schema change in the frontend.
Expand Down Expand Up @@ -258,7 +260,7 @@ message Table {
bool cleaned_by_watermark = 30;

// Used to filter created / creating tables in meta.
TableStatus create_status = 31;
StreamJobStatus stream_job_status = 31;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::catalog::{
};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{PbSink, PbSinkType};
use risingwave_pb::catalog::{PbSink, PbSinkType, PbStreamJobStatus};

#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)]
pub struct SinkId {
Expand Down Expand Up @@ -191,6 +191,7 @@ impl SinkCatalog {
created_at_epoch: self.created_at_epoch.map(|e| e.0),
db_name: self.db_name.clone(),
sink_from_name: self.sink_from_name.clone(),
stream_job_status: PbStreamJobStatus::Creating.into(),
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/catalog/index_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use itertools::Itertools;
use risingwave_common::catalog::IndexId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::PbIndex;
use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus};

use super::ColumnId;
use crate::catalog::{DatabaseId, OwnedByUserCatalog, SchemaId, TableCatalog};
Expand Down Expand Up @@ -184,6 +184,7 @@ impl IndexCatalog {
original_columns: self.original_columns.iter().map(Into::into).collect_vec(),
initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
created_at_epoch: self.created_at_epoch.map(|e| e.0),
stream_job_status: PbStreamJobStatus::Creating.into(),
}
}

Expand Down
13 changes: 5 additions & 8 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ use risingwave_common::constants::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND;
use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::table::{
OptionalAssociatedSourceId, PbTableStatus, PbTableType, PbTableVersion,
};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion};
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::DefaultColumnDesc;

Expand Down Expand Up @@ -403,7 +401,7 @@ impl TableCatalog {
initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
cleaned_by_watermark: self.cleaned_by_watermark,
create_status: PbTableStatus::Creating.into(),
stream_job_status: PbStreamJobStatus::Creating.into(),
}
}

Expand Down Expand Up @@ -545,8 +543,7 @@ mod tests {
use risingwave_common::test_prelude::*;
use risingwave_common::types::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::catalog::table::PbTableStatus;
use risingwave_pb::catalog::PbTable;
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};

use super::*;
Expand Down Expand Up @@ -609,7 +606,7 @@ mod tests {
cardinality: None,
created_at_epoch: None,
cleaned_by_watermark: false,
create_status: PbTableStatus::Creating.into(),
stream_job_status: PbStreamJobStatus::Creating.into(),
}
.into();

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{IndexId, TableDesc, TableId};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::{PbIndex, PbTable};
use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus, PbTable};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::user::grant_privilege::{Action, Object};
use risingwave_sqlparser::ast;
Expand Down Expand Up @@ -242,6 +242,7 @@ pub(crate) fn gen_create_index_plan(
original_columns,
initialized_at_epoch: None,
created_at_epoch: None,
stream_job_status: PbStreamJobStatus::Creating.into(),
};

let plan: PlanRef = materialize.into();
Expand Down
6 changes: 3 additions & 3 deletions src/storage/src/filter_key_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,8 @@ mod tests {
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
use risingwave_pb::catalog::table::{PbTableStatus, TableType};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
use risingwave_pb::plan_common::PbColumnCatalog;

Expand Down Expand Up @@ -549,7 +549,7 @@ mod tests {
cardinality: None,
created_at_epoch: None,
cleaned_by_watermark: false,
create_status: PbTableStatus::Creating.into(),
stream_job_status: PbStreamJobStatus::Created.into(),
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/tests/compaction_test/src/delete_range_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ use risingwave_meta::hummock::test_utils::setup_compute_env_with_config;
use risingwave_meta::hummock::MockHummockMetaClient;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::catalog::table::PbTableStatus;
use risingwave_pb::catalog::PbTable;
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo};
use risingwave_pb::meta::SystemParams;
use risingwave_rpc_client::HummockMetaClient;
Expand Down Expand Up @@ -151,7 +150,7 @@ async fn compaction_test(
cardinality: None,
created_at_epoch: None,
cleaned_by_watermark: false,
create_status: PbTableStatus::Creating.into(),
stream_job_status: PbStreamJobStatus::Created.into(),
};
let mut delete_range_table = delete_key_table.clone();
delete_range_table.id = 2;
Expand Down

0 comments on commit c86d034

Please sign in to comment.