Skip to content

Commit

Permalink
feat(meta): add create_type to Table (#12529)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Sep 26, 2023
1 parent 5eeef12 commit b7c8c9d
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 54 deletions.
10 changes: 10 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ enum StreamJobStatus {
CREATED = 2;
}

// How the stream job was created will determine
// whether they are persisted.
enum CreateType {
CREATE_TYPE_UNSPECIFIED = 0;
BACKGROUND = 1;
FOREGROUND = 2;
}

message StreamSourceInfo {
// deprecated
plan_common.RowFormatType row_format = 1;
Expand Down Expand Up @@ -262,6 +270,8 @@ message Table {
// Used to filter created / creating tables in meta.
StreamJobStatus stream_job_status = 31;

CreateType create_type = 32;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
TableVersion version = 100;
Expand Down
7 changes: 0 additions & 7 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,9 @@ message DropSinkResponse {
uint64 version = 2;
}

enum StreamJobExecutionMode {
STREAM_JOB_EXECUTION_MODE_UNSPECIFIED = 0;
BACKGROUND = 1;
FOREGROUND = 2;
}

message CreateMaterializedViewRequest {
catalog.Table materialized_view = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;
StreamJobExecutionMode stream_job_execution_mode = 3;
}

message CreateMaterializedViewResponse {
Expand Down
14 changes: 5 additions & 9 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::catalog::{
PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
};
use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
use risingwave_pb::ddl_service::{create_connection_request, StreamJobExecutionMode};
use risingwave_pb::ddl_service::create_connection_request;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_rpc_client::MetaClient;
use tokio::sync::watch::Receiver;
Expand Down Expand Up @@ -70,7 +70,6 @@ pub trait CatalogWriter: Send + Sync {
&self,
table: PbTable,
graph: StreamFragmentGraph,
stream_job_execution_mode: StreamJobExecutionMode,
) -> Result<()>;

async fn create_table(
Expand Down Expand Up @@ -191,16 +190,13 @@ impl CatalogWriter for CatalogWriterImpl {
&self,
table: PbTable,
graph: StreamFragmentGraph,
stream_job_execution_mode: StreamJobExecutionMode,
) -> Result<()> {
let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
let (_, version) = self
.meta_client
.create_materialized_view(table, graph, stream_job_execution_mode)
.create_materialized_view(table, graph)
.await?;
if matches!(
stream_job_execution_mode,
StreamJobExecutionMode::Foreground | StreamJobExecutionMode::Unspecified
) {
if matches!(create_type, PbCreateType::Foreground) {
self.wait_version(version).await?
}
Ok(())
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion};
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::DefaultColumnDesc;

Expand Down Expand Up @@ -402,6 +402,7 @@ impl TableCatalog {
created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
cleaned_by_watermark: self.cleaned_by_watermark,
stream_job_status: PbStreamJobStatus::Creating.into(),
create_type: PbCreateType::Foreground.into(),
}
}

Expand Down Expand Up @@ -607,6 +608,7 @@ mod tests {
created_at_epoch: None,
cleaned_by_watermark: false,
stream_job_status: PbStreamJobStatus::Creating.into(),
create_type: PbCreateType::Foreground.into(),
}
.into();

Expand Down
14 changes: 7 additions & 7 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::ddl_service::StreamJobExecutionMode;
use risingwave_pb::catalog::{CreateType, PbTable};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::user::grant_privilege::Action;
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
Expand Down Expand Up @@ -164,7 +163,7 @@ pub async fn handle_create_mv(
Ok(_) => {}
};

let (table, graph) = {
let (mut table, graph) = {
let context = OptimizerContext::from_handler_args(handler_args);

let has_order_by = !query.order_by.is_empty();
Expand Down Expand Up @@ -202,16 +201,17 @@ It only indicates the physical clustering of the data, which may improve the per
));

let run_in_background = session.config().get_background_ddl();
let stream_job_execution_mode = if run_in_background {
StreamJobExecutionMode::Background
let create_type = if run_in_background {
CreateType::Background
} else {
StreamJobExecutionMode::Foreground
CreateType::Foreground
};
table.create_type = create_type.into();

let session = session.clone();
let catalog_writer = session.catalog_writer()?;
catalog_writer
.create_materialized_view(table, graph, stream_job_execution_mode)
.create_materialized_view(table, graph)
.await?;

Ok(PgResponse::empty_result(
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, Table,
};
use risingwave_pb::ddl_service::{create_connection_request, DdlProgress, StreamJobExecutionMode};
use risingwave_pb::ddl_service::{create_connection_request, DdlProgress};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta,
Expand Down Expand Up @@ -235,7 +235,6 @@ impl CatalogWriter for MockCatalogWriter {
&self,
mut table: PbTable,
_graph: StreamFragmentGraph,
_stream_job_execution_mode: StreamJobExecutionMode,
) -> Result<()> {
table.id = self.gen_id();
self.catalog.write().create_table(&table);
Expand All @@ -261,8 +260,7 @@ impl CatalogWriter for MockCatalogWriter {
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
}
self.create_materialized_view(table, graph, StreamJobExecutionMode::Foreground)
.await?;
self.create_materialized_view(table, graph).await?;
Ok(())
}

Expand Down
26 changes: 9 additions & 17 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider;
use risingwave_pb::catalog::{
connection, Connection, Database, Function, Schema, Source, Table, View,
connection, Connection, CreateType, Database, Function, Schema, Source, Table, View,
};
use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
use risingwave_pb::ddl_service::{DdlProgress, StreamJobExecutionMode};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto;
use tokio::sync::Semaphore;
use tracing::log::warn;
Expand Down Expand Up @@ -93,11 +93,7 @@ pub enum DdlCommand {
DropFunction(FunctionId),
CreateView(View),
DropView(ViewId, DropMode),
CreateStreamingJob(
StreamingJob,
StreamFragmentGraphProto,
StreamJobExecutionMode,
),
CreateStreamingJob(StreamingJob, StreamFragmentGraphProto, CreateType),
DropStreamingJob(StreamingJobId, DropMode),
ReplaceTable(StreamingJob, StreamFragmentGraphProto, ColIndexMapping),
AlterRelationName(Relation, String),
Expand Down Expand Up @@ -240,12 +236,8 @@ impl DdlController {
DdlCommand::DropView(view_id, drop_mode) => {
ctrl.drop_view(view_id, drop_mode).await
}
DdlCommand::CreateStreamingJob(
stream_job,
fragment_graph,
stream_job_execution_mode,
) => {
ctrl.create_streaming_job(stream_job, fragment_graph, stream_job_execution_mode)
DdlCommand::CreateStreamingJob(stream_job, fragment_graph, create_type) => {
ctrl.create_streaming_job(stream_job, fragment_graph, create_type)
.await
}
DdlCommand::DropStreamingJob(job_id, drop_mode) => {
Expand Down Expand Up @@ -414,7 +406,7 @@ impl DdlController {
&self,
mut stream_job: StreamingJob,
fragment_graph: StreamFragmentGraphProto,
stream_job_execution_mode: StreamJobExecutionMode,
create_type: CreateType,
) -> MetaResult<NotificationVersion> {
let _permit = self
.creating_streaming_job_permits
Expand Down Expand Up @@ -462,12 +454,12 @@ impl DdlController {
}
};

match stream_job_execution_mode {
StreamJobExecutionMode::Foreground | StreamJobExecutionMode::Unspecified => {
match create_type {
CreateType::Foreground | CreateType::Unspecified => {
self.create_streaming_job_inner(stream_job, table_fragments, ctx, internal_tables)
.await
}
StreamJobExecutionMode::Background => {
CreateType::Background => {
let ctrl = self.clone();
let definition = stream_job.definition();
let fut = async move {
Expand Down
11 changes: 6 additions & 5 deletions src/meta/src/rpc/service/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_pb::catalog::connection::private_link_service::{
use risingwave_pb::catalog::connection::PbPrivateLinkService;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{connection, Connection, PbSource, PbTable};
use risingwave_pb::catalog::{connection, Connection, CreateType, PbSource, PbTable};
use risingwave_pb::ddl_service::ddl_service_server::DdlService;
use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
use risingwave_pb::ddl_service::*;
Expand Down Expand Up @@ -232,7 +232,7 @@ impl DdlService for DdlServiceImpl {
.run_command(DdlCommand::CreateStreamingJob(
stream_job,
fragment_graph,
StreamJobExecutionMode::Foreground,
CreateType::Foreground,
))
.await?;

Expand Down Expand Up @@ -276,6 +276,7 @@ impl DdlService for DdlServiceImpl {

let req = request.into_inner();
let mview = req.get_materialized_view()?.clone();
let create_type = mview.get_create_type().unwrap_or(CreateType::Foreground);
let fragment_graph = req.get_fragment_graph()?.clone();

let mut stream_job = StreamingJob::MaterializedView(mview);
Expand All @@ -287,7 +288,7 @@ impl DdlService for DdlServiceImpl {
.run_command(DdlCommand::CreateStreamingJob(
stream_job,
fragment_graph,
req.stream_job_execution_mode(),
create_type,
))
.await?;

Expand Down Expand Up @@ -342,7 +343,7 @@ impl DdlService for DdlServiceImpl {
.run_command(DdlCommand::CreateStreamingJob(
stream_job,
fragment_graph,
StreamJobExecutionMode::Foreground,
CreateType::Foreground,
))
.await?;

Expand Down Expand Up @@ -438,7 +439,7 @@ impl DdlService for DdlServiceImpl {
.run_command(DdlCommand::CreateStreamingJob(
stream_job,
fragment_graph,
StreamJobExecutionMode::Foreground,
CreateType::Foreground,
))
.await?;

Expand Down
2 changes: 0 additions & 2 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,10 @@ impl MetaClient {
&self,
table: PbTable,
graph: StreamFragmentGraph,
stream_job_execution_mode: StreamJobExecutionMode,
) -> Result<(TableId, CatalogVersion)> {
let request = CreateMaterializedViewRequest {
materialized_view: Some(table),
fragment_graph: Some(graph),
stream_job_execution_mode: stream_job_execution_mode as i32,
};
let resp = self.inner.create_materialized_view(request).await?;
// TODO: handle error in `resp.status` here
Expand Down
3 changes: 2 additions & 1 deletion src/storage/src/filter_key_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ mod tests {
use risingwave_common::util::sort_util::OrderType;
use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
use risingwave_pb::plan_common::PbColumnCatalog;

Expand Down Expand Up @@ -550,6 +550,7 @@ mod tests {
created_at_epoch: None,
cleaned_by_watermark: false,
stream_job_status: PbStreamJobStatus::Created.into(),
create_type: PbCreateType::Foreground.into(),
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/tests/compaction_test/src/delete_range_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use risingwave_meta::hummock::test_utils::setup_compute_env_with_config;
use risingwave_meta::hummock::MockHummockMetaClient;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo};
use risingwave_pb::meta::SystemParams;
use risingwave_rpc_client::HummockMetaClient;
Expand Down Expand Up @@ -152,6 +152,7 @@ async fn compaction_test(
created_at_epoch: None,
cleaned_by_watermark: false,
stream_job_status: PbStreamJobStatus::Created.into(),
create_type: PbCreateType::Foreground.into(),
};
let mut delete_range_table = delete_key_table.clone();
delete_range_table.id = 2;
Expand Down

0 comments on commit b7c8c9d

Please sign in to comment.