Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(serverless backfill): proto changes #17010

Merged
merged 44 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
d34c24b
feat(serverless backfill): proto changes
CAJan93 May 29, 2024
d54a613
worker label
CAJan93 May 29, 2024
443126d
change SubscribeResponse
CAJan93 Jun 6, 2024
497339a
minor change
CAJan93 Jun 6, 2024
45c7fc8
change node label RPC
CAJan93 Jun 6, 2024
46702d9
typo
CAJan93 Jun 6, 2024
ea1e361
backfill flag
CAJan93 Jun 6, 2024
4c6a1af
add BackfillService rpc
CAJan93 Jun 25, 2024
7572600
typo
CAJan93 Jun 25, 2024
2a5de5a
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Jul 3, 2024
b8ab435
change interface
CAJan93 Jul 4, 2024
f0c676e
change comments
CAJan93 Jul 4, 2024
730a5d7
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Jul 10, 2024
ca018b3
typo
CAJan93 Jul 10, 2024
e30604b
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Jul 11, 2024
331e90f
delete unrelated
CAJan93 Jul 11, 2024
c00bd44
rm todo
CAJan93 Jul 11, 2024
41e635c
minor
CAJan93 Jul 11, 2024
9c7d8d6
rm label
CAJan93 Jul 11, 2024
5075a07
change number
CAJan93 Jul 15, 2024
39e75f0
rename var
CAJan93 Jul 15, 2024
003ac47
change comments
CAJan93 Jul 15, 2024
6a8bbc1
unspecified
CAJan93 Jul 15, 2024
31a2be5
minor change
CAJan93 Jul 15, 2024
90bf313
update only node_labels
CAJan93 Jul 15, 2024
c7ff25f
change scheduling
CAJan93 Jul 15, 2024
f0bddd6
GetStreamingJob
CAJan93 Jul 15, 2024
7b75d88
map
CAJan93 Jul 15, 2024
8cd9efb
nit
CAJan93 Jul 17, 2024
981e3a0
nit
CAJan93 Jul 17, 2024
f49e953
nit
CAJan93 Jul 17, 2024
6befbcc
nit
CAJan93 Jul 17, 2024
6c61679
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Jul 22, 2024
342ab25
move node_labels and flag
CAJan93 Jul 22, 2024
68c9a83
GetStreamingJobsStatus
CAJan93 Jul 22, 2024
bb0c0cd
repeat
CAJan93 Jul 22, 2024
57ff4ff
merge main + use string node_label
CAJan93 Jul 23, 2024
da66c46
rename
CAJan93 Jul 23, 2024
273b201
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Jul 24, 2024
759a2f7
GetServerlessStreamingJobsStatus
CAJan93 Jul 24, 2024
a79cd89
Merge branch 'main' into cajan93/proto-updates
shanicky Jul 24, 2024
0b694c2
adapt proto changes
yezizp2012 Jul 24, 2024
352bac7
remove trailing white spaces
CAJan93 Jul 24, 2024
e0aff77
Merge branch 'cajan93/proto-updates' of ssh://github.com/risingwavela…
CAJan93 Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ message WorkerNode {
optional uint64 started_at = 9;

uint32 parallelism = 10;

// Meta may assign labels to worker nodes to partition workload by label.
// This is used for serverless backfilling of materialized views.
string node_label = 11;
}

message Buffer {
Expand Down
10 changes: 10 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ message DropSubscriptionResponse {
message CreateMaterializedViewRequest {
catalog.Table materialized_view = 1;
stream_plan.StreamFragmentGraph fragment_graph = 2;

// If SERVERLESS, the materialized view should be created using serverless backfill
// For that the controller will create a new compute node, which does backfilling and then is deleted.
// May alleviate pressure on the cluster during backfill process.
enum BackfillType {
CAJan93 marked this conversation as resolved.
Show resolved Hide resolved
UNSPECIFIED = 0;
REGULAR = 1;
SERVERLESS = 2;
}
BackfillType backfill = 3;
}

message CreateMaterializedViewResponse {
Expand Down
38 changes: 38 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ message TableFragments {

stream_plan.StreamContext ctx = 6;
TableParallelism parallelism = 7;

// Actors of a materialize view, sink, or table can only be scheduled on nodes with matching node_label.
string node_label = 8;

// If this is a materialized view: True if backfill is done, else false.
// If this is a regular table: Always true.
bool backfill_done = 9;
}

/// Worker slot mapping with fragment id, used for notification.
Expand Down Expand Up @@ -466,7 +473,10 @@ message SubscribeResponse {
}
common.Status status = 1;
Operation operation = 2;

// Catalog version
uint64 version = 3;

oneof info {
catalog.Database database = 4;
catalog.Schema schema = 5;
Expand Down Expand Up @@ -548,9 +558,37 @@ message TableParallelism {
}
}

// Changes a streaming job in place by overwriting its node_label.
// This may cause the re-scheduling of the streaming job actors.
message UpdateStreamingJobNodeLabelsRequest {
// Id of the materialized view, table, or sink which we want to update
uint32 id = 1;

// replace the node_label of the streaming job with a given id with below value
string node_label = 2;
}

// We do not need to add an explicit status field here, we can just use the RPC status
message UpdateStreamingJobNodeLabelsResponse {}

message GetServerlessStreamingJobsStatusRequest {}

// Descriptions of MVs and sinks
message GetServerlessStreamingJobsStatusResponse {
message Status {
uint32 table_id = 1;
string node_label = 2;
bool backfill_done = 3;
}

repeated Status streaming_job_statuses = 1;
}

service ScaleService {
rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
rpc Reschedule(RescheduleRequest) returns (RescheduleResponse);
rpc UpdateStreamingJobNodeLabels(UpdateStreamingJobNodeLabelsRequest) returns (UpdateStreamingJobNodeLabelsResponse);
rpc GetServerlessStreamingJobsStatus(GetServerlessStreamingJobsStatusRequest) returns (GetServerlessStreamingJobsStatusResponse);
}

message MembersRequest {}
Expand Down
19 changes: 17 additions & 2 deletions src/meta/service/src/scale_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ use risingwave_meta_model_v2::FragmentId;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
GetClusterInfoRequest, GetClusterInfoResponse, PbWorkerReschedule, RescheduleRequest,
RescheduleResponse,
GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
GetServerlessStreamingJobsStatusResponse, PbWorkerReschedule, RescheduleRequest,
RescheduleResponse, UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use tonic::{Request, Response, Status};
Expand Down Expand Up @@ -219,4 +220,18 @@ impl ScaleService for ScaleServiceImpl {
revision: next_revision.into(),
}))
}

async fn update_streaming_job_node_labels(
&self,
_request: Request<UpdateStreamingJobNodeLabelsRequest>,
) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
todo!()
}

async fn get_serverless_streaming_jobs_status(
&self,
_request: Request<GetServerlessStreamingJobsStatusRequest>,
) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
todo!()
}
}
2 changes: 2 additions & 0 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl From<WorkerInfo> for PbWorkerNode {
transactional_id: info.0.transaction_id.map(|id| id as _),
resource: info.2.resource,
started_at: info.2.started_at,
node_label: "".to_string(),
}
}
}
Expand Down Expand Up @@ -465,6 +466,7 @@ fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
total_cpu_cores: total_cpu_available() as _,
}),
started_at,
node_label: "".to_string(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ impl CatalogController {
}
.into(),
),
node_label: "".to_string(),
backfill_done: true,
};

Ok(table_fragments)
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/manager/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl ClusterManager {
// resource doesn't need persist
resource: None,
started_at: None,
node_label: "".to_string(),
};

let mut worker = Worker::from_protobuf(worker_node.clone());
Expand Down Expand Up @@ -771,6 +772,7 @@ fn meta_node_info(host: &str, started_at: Option<u64>) -> WorkerNode {
}),
started_at,
parallelism: 0,
node_label: "".to_string(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ impl MetadataModel for TableFragments {
actor_splits: build_actor_connector_splits(&self.actor_splits),
ctx: Some(self.ctx.to_protobuf()),
parallelism: Some(self.assigned_parallelism.into()),
node_label: "".to_string(),
backfill_done: true,
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use risingwave_pb::cloud_service::*;
use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::create_materialized_view_request::PbBackfillType;
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::drop_table_request::SourceId;
use risingwave_pb::ddl_service::*;
Expand Down Expand Up @@ -353,6 +354,7 @@ impl MetaClient {
let request = CreateMaterializedViewRequest {
materialized_view: Some(table),
fragment_graph: Some(graph),
backfill: PbBackfillType::Regular as _,
};
let resp = self.inner.create_materialized_view(request).await?;
// TODO: handle error in `resp.status` here
Expand Down
Loading