Skip to content

Commit

Permalink
add create_type
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Sep 26, 2023
1 parent 31aa925 commit b476cbc
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 3 deletions.
10 changes: 10 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ enum StreamJobStatus {
CREATED = 2;
}

// How the stream job was created will determine
// whether they are persisted.
enum CreateType {
CREATE_STATUS_UNSPECIFIED = 0;
BACKGROUND = 1;
FOREGROUND = 2;
}

message StreamSourceInfo {
// deprecated
plan_common.RowFormatType row_format = 1;
Expand Down Expand Up @@ -262,6 +270,8 @@ message Table {
// Used to filter created / creating tables in meta.
StreamJobStatus stream_job_status = 31;

CreateType create_type = 32;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
TableVersion version = 100;
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion};
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::DefaultColumnDesc;

Expand Down Expand Up @@ -402,6 +402,7 @@ impl TableCatalog {
created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
cleaned_by_watermark: self.cleaned_by_watermark,
stream_job_status: PbStreamJobStatus::Creating.into(),
create_type: PbCreateType::Foreground.into(),
}
}

Expand Down Expand Up @@ -607,6 +608,7 @@ mod tests {
created_at_epoch: None,
cleaned_by_watermark: false,
stream_job_status: PbStreamJobStatus::Creating.into(),
create_type: PbCreateType::Foreground.into(),
}
.into();

Expand Down
3 changes: 2 additions & 1 deletion src/storage/src/filter_key_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ mod tests {
use risingwave_common::util::sort_util::OrderType;
use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
use risingwave_pb::plan_common::PbColumnCatalog;

Expand Down Expand Up @@ -550,6 +550,7 @@ mod tests {
created_at_epoch: None,
cleaned_by_watermark: false,
stream_job_status: PbStreamJobStatus::Created.into(),
create_type: PbCreateType::Foreground.into(),
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/tests/compaction_test/src/delete_range_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use risingwave_meta::hummock::test_utils::setup_compute_env_with_config;
use risingwave_meta::hummock::MockHummockMetaClient;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo};
use risingwave_pb::meta::SystemParams;
use risingwave_rpc_client::HummockMetaClient;
Expand Down Expand Up @@ -152,6 +152,7 @@ async fn compaction_test(
created_at_epoch: None,
cleaned_by_watermark: false,
stream_job_status: PbStreamJobStatus::Created.into(),
create_type: PbCreateType::Foreground.into(),
};
let mut delete_range_table = delete_key_table.clone();
delete_range_table.id = 2;
Expand Down

0 comments on commit b476cbc

Please sign in to comment.