diff --git a/proto/common.proto b/proto/common.proto
index b34fcce42b47e..d6c596ec4c497 100644
--- a/proto/common.proto
+++ b/proto/common.proto
@@ -82,6 +82,10 @@ message WorkerNode {
   optional uint64 started_at = 9;
 
   uint32 parallelism = 10;
+
+  // Meta may assign labels to worker nodes to partition workload by label.
+  // This is used for serverless backfilling of materialized views.
+  string node_label = 11;
 }
 
 message Buffer {
diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto
index 7c6a078ce9ce4..1b4f4e423949e 100644
--- a/proto/ddl_service.proto
+++ b/proto/ddl_service.proto
@@ -120,6 +120,16 @@ message DropSubscriptionResponse {
 message CreateMaterializedViewRequest {
   catalog.Table materialized_view = 1;
   stream_plan.StreamFragmentGraph fragment_graph = 2;
+
+  // If SERVERLESS, the materialized view should be created using serverless backfill.
+  // For this, the controller creates a new compute node, which performs the backfill and is then deleted.
+  // This may alleviate pressure on the cluster during the backfill process.
+  enum BackfillType {
+    UNSPECIFIED = 0;
+    REGULAR = 1;
+    SERVERLESS = 2;
+  }
+  BackfillType backfill = 3;
 }
 
 message CreateMaterializedViewResponse {
diff --git a/proto/meta.proto b/proto/meta.proto
index 0192b18ec690c..2023ad432a438 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -103,6 +103,13 @@ message TableFragments {
   stream_plan.StreamContext ctx = 6;
 
   TableParallelism parallelism = 7;
+
+  // Actors of a materialized view, sink, or table can only be scheduled on nodes with a matching node_label.
+  string node_label = 8;
+
+  // If this is a materialized view: true if backfill is done, else false.
+  // If this is a regular table: always true.
+  bool backfill_done = 9;
 }
 
 /// Worker slot mapping with fragment id, used for notification.
@@ -466,7 +473,10 @@ message SubscribeResponse {
   }
   common.Status status = 1;
   Operation operation = 2;
+
+  // Catalog version
   uint64 version = 3;
+
   oneof info {
     catalog.Database database = 4;
     catalog.Schema schema = 5;
@@ -548,9 +558,37 @@ message TableParallelism {
   }
 }
 
+// Changes a streaming job in place by overwriting its node_label.
+// This may cause re-scheduling of the streaming job's actors.
+message UpdateStreamingJobNodeLabelsRequest {
+  // Id of the materialized view, table, or sink whose node_label we want to update.
+  uint32 id = 1;
+
+  // Replace the node_label of the streaming job identified by `id` with the value below.
+  string node_label = 2;
+}
+
+// We do not need to add an explicit status field here; we can just use the RPC status.
+message UpdateStreamingJobNodeLabelsResponse {}
+
+message GetServerlessStreamingJobsStatusRequest {}
+
+// Descriptions of MVs and sinks.
+message GetServerlessStreamingJobsStatusResponse {
+  message Status {
+    uint32 table_id = 1;
+    string node_label = 2;
+    bool backfill_done = 3;
+  }
+
+  repeated Status streaming_job_statuses = 1;
+}
+
 service ScaleService {
   rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
   rpc Reschedule(RescheduleRequest) returns (RescheduleResponse);
+  rpc UpdateStreamingJobNodeLabels(UpdateStreamingJobNodeLabelsRequest) returns (UpdateStreamingJobNodeLabelsResponse);
+  rpc GetServerlessStreamingJobsStatus(GetServerlessStreamingJobsStatusRequest) returns (GetServerlessStreamingJobsStatusResponse);
 }
 
 message MembersRequest {}
diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs
index 72e718f94ea00..56040bb5c973f 100644
--- a/src/meta/service/src/scale_service.rs
+++ b/src/meta/service/src/scale_service.rs
@@ -24,8 +24,9 @@ use risingwave_meta_model_v2::FragmentId;
 use risingwave_pb::common::WorkerType;
 use risingwave_pb::meta::scale_service_server::ScaleService;
 use risingwave_pb::meta::{
-    GetClusterInfoRequest, GetClusterInfoResponse, PbWorkerReschedule, RescheduleRequest,
-    RescheduleResponse,
+    GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
+    GetServerlessStreamingJobsStatusResponse, PbWorkerReschedule, RescheduleRequest,
+    RescheduleResponse, UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
 };
 use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
 use tonic::{Request, Response, Status};
@@ -219,4 +220,18 @@ impl ScaleService for ScaleServiceImpl {
             revision: next_revision.into(),
         }))
     }
+
+    async fn update_streaming_job_node_labels(
+        &self,
+        _request: Request<UpdateStreamingJobNodeLabelsRequest>,
+    ) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
+        todo!()
+    }
+
+    async fn get_serverless_streaming_jobs_status(
+        &self,
+        _request: Request<GetServerlessStreamingJobsStatusRequest>,
+    ) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
+        todo!()
+    }
 }
diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs
index b7a287e83df34..cd51e1c9f71ef 100644
--- a/src/meta/src/controller/cluster.rs
+++ b/src/meta/src/controller/cluster.rs
@@ -88,6 +88,7 @@ impl From<WorkerInfo> for PbWorkerNode {
             transactional_id: info.0.transaction_id.map(|id| id as _),
             resource: info.2.resource,
             started_at: info.2.started_at,
+            node_label: "".to_string(),
         }
     }
 }
@@ -465,6 +466,7 @@ fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
             total_cpu_cores: total_cpu_available() as _,
         }),
         started_at,
+        node_label: "".to_string(),
     }
 }
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index 78b3da81c1aee..4b2d0bd162dcf 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -340,6 +340,8 @@ impl CatalogController {
                 }
                 .into(),
             ),
+            node_label: "".to_string(),
+            backfill_done: true,
         };
 
         Ok(table_fragments)
diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs
index 41e1ae02a21d0..df02b2bd3fb9c 100644
--- a/src/meta/src/manager/cluster.rs
+++ b/src/meta/src/manager/cluster.rs
@@ -215,6 +215,7 @@ impl ClusterManager {
             // resource doesn't need persist
             resource: None,
             started_at: None,
+            node_label: "".to_string(),
         };
 
         let mut worker = Worker::from_protobuf(worker_node.clone());
@@ -771,6 +772,7 @@ fn meta_node_info(host: &str, started_at: Option<u64>) -> WorkerNode {
         }),
         started_at,
         parallelism: 0,
+        node_label: "".to_string(),
     }
 }
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index 3ef81dcda7a68..f9aae787d6e2e 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -164,6 +164,8 @@ impl MetadataModel for TableFragments {
             actor_splits: build_actor_connector_splits(&self.actor_splits),
             ctx: Some(self.ctx.to_protobuf()),
             parallelism: Some(self.assigned_parallelism.into()),
+            node_label: "".to_string(),
+            backfill_done: true,
         }
     }
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index 3701e8482f650..87410d0b9bfbd 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -51,6 +51,7 @@ use risingwave_pb::cloud_service::*;
 use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
 use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
 use risingwave_pb::ddl_service::alter_owner_request::Object;
+use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
 use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
 use risingwave_pb::ddl_service::drop_table_request::SourceId;
 use risingwave_pb::ddl_service::*;
@@ -353,6 +354,7 @@ impl MetaClient {
         let request = CreateMaterializedViewRequest {
             materialized_view: Some(table),
             fragment_graph: Some(graph),
+            backfill: PbBackfillType::Regular as _,
         };
         let resp = self.inner.create_materialized_view(request).await?;
         // TODO: handle error in `resp.status` here