Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): add create_type to Table #12529

Merged
merged 2 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ enum StreamJobStatus {
CREATED = 2;
}

// How the stream job was created will determine
// whether they are persisted.
enum CreateType {
CREATE_TYPE_UNSPECIFIED = 0;
BACKGROUND = 1;
FOREGROUND = 2;
}

message StreamSourceInfo {
// deprecated
plan_common.RowFormatType row_format = 1;
Expand Down Expand Up @@ -262,6 +270,8 @@ message Table {
// Used to filter created / creating tables in meta.
StreamJobStatus stream_job_status = 31;

CreateType create_type = 32;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
TableVersion version = 100;
Expand Down
7 changes: 0 additions & 7 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,9 @@
uint64 version = 2;
}

enum StreamJobExecutionMode {
STREAM_JOB_EXECUTION_MODE_UNSPECIFIED = 0;
BACKGROUND = 1;
FOREGROUND = 2;
}

message CreateMaterializedViewRequest {

Check failure on line 100 in proto/ddl_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "3" with name "stream_job_execution_mode" on message "CreateMaterializedViewRequest" was deleted without reserving the number "3".
catalog.Table materialized_view = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;
StreamJobExecutionMode stream_job_execution_mode = 3;
}

message CreateMaterializedViewResponse {
Expand Down
14 changes: 5 additions & 9 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::catalog::{
PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
PbCreateType, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView,
};
use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
use risingwave_pb::ddl_service::{create_connection_request, StreamJobExecutionMode};
use risingwave_pb::ddl_service::create_connection_request;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_rpc_client::MetaClient;
use tokio::sync::watch::Receiver;
Expand Down Expand Up @@ -70,7 +70,6 @@ pub trait CatalogWriter: Send + Sync {
&self,
table: PbTable,
graph: StreamFragmentGraph,
stream_job_execution_mode: StreamJobExecutionMode,
) -> Result<()>;

async fn create_table(
Expand Down Expand Up @@ -191,16 +190,13 @@ impl CatalogWriter for CatalogWriterImpl {
&self,
table: PbTable,
graph: StreamFragmentGraph,
stream_job_execution_mode: StreamJobExecutionMode,
) -> Result<()> {
let create_type = table.get_create_type().unwrap_or(PbCreateType::Foreground);
let (_, version) = self
.meta_client
.create_materialized_view(table, graph, stream_job_execution_mode)
.create_materialized_view(table, graph)
.await?;
if matches!(
stream_job_execution_mode,
StreamJobExecutionMode::Foreground | StreamJobExecutionMode::Unspecified
) {
if matches!(create_type, PbCreateType::Foreground) {
self.wait_version(version).await?
}
Ok(())
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion};
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::DefaultColumnDesc;

Expand Down Expand Up @@ -402,6 +402,7 @@ impl TableCatalog {
created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
cleaned_by_watermark: self.cleaned_by_watermark,
stream_job_status: PbStreamJobStatus::Creating.into(),
create_type: PbCreateType::Foreground.into(),
}
}

Expand Down Expand Up @@ -607,6 +608,7 @@ mod tests {
created_at_epoch: None,
cleaned_by_watermark: false,
stream_job_status: PbStreamJobStatus::Creating.into(),
create_type: PbCreateType::Foreground.into(),
}
.into();

Expand Down
14 changes: 7 additions & 7 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::ddl_service::StreamJobExecutionMode;
use risingwave_pb::catalog::{CreateType, PbTable};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::user::grant_privilege::Action;
use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
Expand Down Expand Up @@ -164,7 +163,7 @@ pub async fn handle_create_mv(
Ok(_) => {}
};

let (table, graph) = {
let (mut table, graph) = {
let context = OptimizerContext::from_handler_args(handler_args);

let has_order_by = !query.order_by.is_empty();
Expand Down Expand Up @@ -202,16 +201,17 @@ It only indicates the physical clustering of the data, which may improve the per
));

let run_in_background = session.config().get_background_ddl();
let stream_job_execution_mode = if run_in_background {
StreamJobExecutionMode::Background
let create_type = if run_in_background {
CreateType::Background
} else {
StreamJobExecutionMode::Foreground
CreateType::Foreground
};
table.create_type = create_type.into();

let session = session.clone();
let catalog_writer = session.catalog_writer()?;
catalog_writer
.create_materialized_view(table, graph, stream_job_execution_mode)
.create_materialized_view(table, graph)
.await?;

Ok(PgResponse::empty_result(
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{
PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, Table,
};
use risingwave_pb::ddl_service::{create_connection_request, DdlProgress, StreamJobExecutionMode};
use risingwave_pb::ddl_service::{create_connection_request, DdlProgress};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta,
Expand Down Expand Up @@ -235,7 +235,6 @@ impl CatalogWriter for MockCatalogWriter {
&self,
mut table: PbTable,
_graph: StreamFragmentGraph,
_stream_job_execution_mode: StreamJobExecutionMode,
) -> Result<()> {
table.id = self.gen_id();
self.catalog.write().create_table(&table);
Expand All @@ -261,8 +260,7 @@ impl CatalogWriter for MockCatalogWriter {
table.optional_associated_source_id =
Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id));
}
self.create_materialized_view(table, graph, StreamJobExecutionMode::Foreground)
.await?;
self.create_materialized_view(table, graph).await?;
Ok(())
}

Expand Down
26 changes: 9 additions & 17 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider;
use risingwave_pb::catalog::{
connection, Connection, Database, Function, Schema, Source, Table, View,
connection, Connection, CreateType, Database, Function, Schema, Source, Table, View,
};
use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
use risingwave_pb::ddl_service::{DdlProgress, StreamJobExecutionMode};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto;
use tokio::sync::Semaphore;
use tracing::log::warn;
Expand Down Expand Up @@ -93,11 +93,7 @@ pub enum DdlCommand {
DropFunction(FunctionId),
CreateView(View),
DropView(ViewId, DropMode),
CreateStreamingJob(
StreamingJob,
StreamFragmentGraphProto,
StreamJobExecutionMode,
),
CreateStreamingJob(StreamingJob, StreamFragmentGraphProto, CreateType),
DropStreamingJob(StreamingJobId, DropMode),
ReplaceTable(StreamingJob, StreamFragmentGraphProto, ColIndexMapping),
AlterRelationName(Relation, String),
Expand Down Expand Up @@ -240,12 +236,8 @@ impl DdlController {
DdlCommand::DropView(view_id, drop_mode) => {
ctrl.drop_view(view_id, drop_mode).await
}
DdlCommand::CreateStreamingJob(
stream_job,
fragment_graph,
stream_job_execution_mode,
) => {
ctrl.create_streaming_job(stream_job, fragment_graph, stream_job_execution_mode)
DdlCommand::CreateStreamingJob(stream_job, fragment_graph, create_type) => {
ctrl.create_streaming_job(stream_job, fragment_graph, create_type)
.await
}
DdlCommand::DropStreamingJob(job_id, drop_mode) => {
Expand Down Expand Up @@ -414,7 +406,7 @@ impl DdlController {
&self,
mut stream_job: StreamingJob,
fragment_graph: StreamFragmentGraphProto,
stream_job_execution_mode: StreamJobExecutionMode,
create_type: CreateType,
) -> MetaResult<NotificationVersion> {
let _permit = self
.creating_streaming_job_permits
Expand Down Expand Up @@ -462,12 +454,12 @@ impl DdlController {
}
};

match stream_job_execution_mode {
StreamJobExecutionMode::Foreground | StreamJobExecutionMode::Unspecified => {
match create_type {
CreateType::Foreground | CreateType::Unspecified => {
self.create_streaming_job_inner(stream_job, table_fragments, ctx, internal_tables)
.await
}
StreamJobExecutionMode::Background => {
CreateType::Background => {
let ctrl = self.clone();
let definition = stream_job.definition();
let fut = async move {
Expand Down
11 changes: 6 additions & 5 deletions src/meta/src/rpc/service/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_pb::catalog::connection::private_link_service::{
use risingwave_pb::catalog::connection::PbPrivateLinkService;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{connection, Connection, PbSource, PbTable};
use risingwave_pb::catalog::{connection, Connection, CreateType, PbSource, PbTable};
use risingwave_pb::ddl_service::ddl_service_server::DdlService;
use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
use risingwave_pb::ddl_service::*;
Expand Down Expand Up @@ -232,7 +232,7 @@ impl DdlService for DdlServiceImpl {
.run_command(DdlCommand::CreateStreamingJob(
stream_job,
fragment_graph,
StreamJobExecutionMode::Foreground,
CreateType::Foreground,
))
.await?;

Expand Down Expand Up @@ -276,6 +276,7 @@ impl DdlService for DdlServiceImpl {

let req = request.into_inner();
let mview = req.get_materialized_view()?.clone();
let create_type = mview.get_create_type().unwrap_or(CreateType::Foreground);
let fragment_graph = req.get_fragment_graph()?.clone();

let mut stream_job = StreamingJob::MaterializedView(mview);
Expand All @@ -287,7 +288,7 @@ impl DdlService for DdlServiceImpl {
.run_command(DdlCommand::CreateStreamingJob(
stream_job,
fragment_graph,
req.stream_job_execution_mode(),
create_type,
))
.await?;

Expand Down Expand Up @@ -342,7 +343,7 @@ impl DdlService for DdlServiceImpl {
.run_command(DdlCommand::CreateStreamingJob(
stream_job,
fragment_graph,
StreamJobExecutionMode::Foreground,
CreateType::Foreground,
))
.await?;

Expand Down Expand Up @@ -438,7 +439,7 @@ impl DdlService for DdlServiceImpl {
.run_command(DdlCommand::CreateStreamingJob(
stream_job,
fragment_graph,
StreamJobExecutionMode::Foreground,
CreateType::Foreground,
))
.await?;

Expand Down
2 changes: 0 additions & 2 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,10 @@ impl MetaClient {
&self,
table: PbTable,
graph: StreamFragmentGraph,
stream_job_execution_mode: StreamJobExecutionMode,
) -> Result<(TableId, CatalogVersion)> {
let request = CreateMaterializedViewRequest {
materialized_view: Some(table),
fragment_graph: Some(graph),
stream_job_execution_mode: stream_job_execution_mode as i32,
};
let resp = self.inner.create_materialized_view(request).await?;
// TODO: handle error in `resp.status` here
Expand Down
3 changes: 2 additions & 1 deletion src/storage/src/filter_key_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ mod tests {
use risingwave_common::util::sort_util::OrderType;
use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
use risingwave_pb::plan_common::PbColumnCatalog;

Expand Down Expand Up @@ -550,6 +550,7 @@ mod tests {
created_at_epoch: None,
cleaned_by_watermark: false,
stream_job_status: PbStreamJobStatus::Created.into(),
create_type: PbCreateType::Foreground.into(),
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/tests/compaction_test/src/delete_range_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use risingwave_meta::hummock::test_utils::setup_compute_env_with_config;
use risingwave_meta::hummock::MockHummockMetaClient;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus, PbTable};
use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo};
use risingwave_pb::meta::SystemParams;
use risingwave_rpc_client::HummockMetaClient;
Expand Down Expand Up @@ -152,6 +152,7 @@ async fn compaction_test(
created_at_epoch: None,
cleaned_by_watermark: false,
stream_job_status: PbStreamJobStatus::Created.into(),
create_type: PbCreateType::Foreground.into(),
};
let mut delete_range_table = delete_key_table.clone();
delete_range_table.id = 2;
Expand Down
Loading