Skip to content

Commit

Permalink
feat(compaction): support serverless compaction (#11904)
Browse files Browse the repository at this point in the history
Co-authored-by: wcy-fdu <[email protected]>
  • Loading branch information
wcy-fdu and wcy-fdu authored Sep 22, 2023
1 parent 5238860 commit 4fb087c
Show file tree
Hide file tree
Showing 23 changed files with 805 additions and 203 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions proto/compactor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,38 @@ syntax = "proto3";

package compactor;

import "catalog.proto";
import "common.proto";
import "hummock.proto";

option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message EchoRequest {}

message EchoResponse {}

message DispatchCompactionTaskRequest {
// DispatchCompactionTaskRequest is used to pass compaction task level parameters.
oneof task {
hummock.CompactTask compact_task = 1;
hummock.VacuumTask vacuum_task = 2;
hummock.FullScanTask full_scan_task = 3;
hummock.ValidationTask validation_task = 4;
hummock.CancelCompactTask cancel_compact_task = 5;
}
// Used to build filter_key_extract.
repeated catalog.Table tables = 6;
// We can generate output object ids for each compactor, which equal to the number of input SSTs.
// If all the output object ids are used up, this compaction task fails, and the next allocation will be twice the previous amount of output object ids.
repeated uint64 output_object_ids = 7;
}

message DispatchCompactionTaskResponse {
common.Status status = 1;
}

service CompactorService {
rpc Echo(EchoRequest) returns (EchoResponse);
rpc DispatchCompactionTask(DispatchCompactionTaskRequest) returns (DispatchCompactionTaskResponse);
}
21 changes: 21 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,26 @@ message SubscribeCompactionEventResponse {
uint64 create_at = 7;
}

message ReportCompactionTaskRequest {
// ReportTask provides the compact task to report to the meta.
message ReportTask {
CompactTask compact_task = 2;
map<uint32, TableStats> table_stats_change = 3;
}
// HeartBeat provides the progress status of all tasks on the Compactor.
message HeartBeat {
repeated CompactTaskProgress progress = 2;
}
oneof event {
ReportTask report_task = 1;
HeartBeat heart_beat = 2;
}
}

message ReportCompactionTaskResponse {
common.Status status = 1;
}

message ValidationTask {
repeated SstableInfo sst_infos = 1;
map<uint64, uint32> sst_id_to_worker_id = 2;
Expand Down Expand Up @@ -685,6 +705,7 @@ service HummockManagerService {
rpc SplitCompactionGroup(SplitCompactionGroupRequest) returns (SplitCompactionGroupResponse);
rpc RiseCtlListCompactionStatus(RiseCtlListCompactionStatusRequest) returns (RiseCtlListCompactionStatusResponse);
rpc SubscribeCompactionEvent(stream SubscribeCompactionEventRequest) returns (stream SubscribeCompactionEventResponse);
rpc ReportCompactionTask(ReportCompactionTaskRequest) returns (ReportCompactionTaskResponse);
rpc ListBranchedObject(ListBranchedObjectRequest) returns (ListBranchedObjectResponse);
rpc ListActiveWriteLimit(ListActiveWriteLimitRequest) returns (ListActiveWriteLimitResponse);
rpc ListHummockMetaConfig(ListHummockMetaConfigRequest) returns (ListHummockMetaConfigResponse);
Expand Down
10 changes: 10 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,16 @@ impl AsyncStackTraceOption {
}
}

#[derive(Debug, Default, Clone, Copy, ValueEnum)]
pub enum CompactorMode {
#[default]
#[clap(alias = "dedicated")]
Dedicated,

#[clap(alias = "shared")]
Shared,
}

#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct HeapProfilingConfig {
/// Enable to auto dump heap profile when memory usage is high
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ pub async fn compute_node_serve(
compactor_metrics: compactor_metrics.clone(),
is_share_buffer_compact: false,
compaction_executor: Arc::new(CompactionExecutor::new(Some(1))),
filter_key_extractor_manager: storage.filter_key_extractor_manager().clone(),
memory_limiter,

task_progress_manager: Default::default(),
Expand All @@ -234,6 +233,7 @@ pub async fn compute_node_serve(
compactor_context,
hummock_meta_client.clone(),
storage.sstable_object_id_manager().clone(),
storage.filter_key_extractor_manager().clone(),
);
sub_tasks.push((handle, shutdown_sender));
}
Expand Down
7 changes: 7 additions & 0 deletions src/meta/src/rpc/service/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,13 @@ impl HummockManagerService for HummockServiceImpl {
Ok(Response::new(RwReceiverStream::new(rx)))
}

async fn report_compaction_task(
&self,
_request: Request<ReportCompactionTaskRequest>,
) -> Result<Response<ReportCompactionTaskResponse>, Status> {
unreachable!()
}

async fn list_branched_object(
&self,
_request: Request<ListBranchedObjectRequest>,
Expand Down
89 changes: 89 additions & 0 deletions src/rpc_client/src/compactor_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use risingwave_common::util::addr::HostAddr;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::{
GetNewSstIdsRequest, GetNewSstIdsResponse, ReportCompactionTaskRequest,
ReportCompactionTaskResponse, ReportFullScanTaskRequest, ReportFullScanTaskResponse,
ReportVacuumTaskRequest, ReportVacuumTaskResponse,
};
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::{GetSystemParamsRequest, GetSystemParamsResponse};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use tokio::sync::RwLock;
use tonic::transport::{Channel, Endpoint};

use crate::error::Result;
Expand Down Expand Up @@ -46,3 +56,82 @@ impl CompactorClient {
.into_inner())
}
}

#[derive(Debug, Clone)]
pub struct GrpcCompactorProxyClientCore {
hummock_client: HummockManagerServiceClient<Channel>,
system_params_client: SystemParamsServiceClient<Channel>,
}

impl GrpcCompactorProxyClientCore {
pub(crate) fn new(channel: Channel) -> Self {
let hummock_client =
HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
let system_params_client = SystemParamsServiceClient::new(channel);

Self {
hummock_client,
system_params_client,
}
}
}

/// Client to proxy server. Cloning the instance is lightweight.
///
/// Todo(wcy-fdu): add refresh client interface.
#[derive(Debug, Clone)]
pub struct GrpcCompactorProxyClient {
pub core: Arc<RwLock<GrpcCompactorProxyClientCore>>,
}

impl GrpcCompactorProxyClient {
pub fn new(channel: Channel) -> Self {
let core = Arc::new(RwLock::new(GrpcCompactorProxyClientCore::new(channel)));
Self { core }
}

fn _recreate_core(&self, _channel: Channel) {
todo!()
}

pub async fn get_new_sst_ids(
&self,
request: GetNewSstIdsRequest,
) -> std::result::Result<tonic::Response<GetNewSstIdsResponse>, tonic::Status> {
let mut hummock_client = self.core.read().await.hummock_client.clone();
hummock_client.get_new_sst_ids(request).await
}

pub async fn report_compaction_task(
&self,
request: ReportCompactionTaskRequest,
) -> std::result::Result<tonic::Response<ReportCompactionTaskResponse>, tonic::Status> {
let mut hummock_client = self.core.read().await.hummock_client.clone();
hummock_client.report_compaction_task(request).await
}

pub async fn report_full_scan_task(
&self,
request: ReportFullScanTaskRequest,
) -> std::result::Result<tonic::Response<ReportFullScanTaskResponse>, tonic::Status> {
let mut hummock_client = self.core.read().await.hummock_client.clone();
hummock_client.report_full_scan_task(request).await
}

pub async fn report_vacuum_task(
&self,
request: ReportVacuumTaskRequest,
) -> std::result::Result<tonic::Response<ReportVacuumTaskResponse>, tonic::Status> {
let mut hummock_client = self.core.read().await.hummock_client.clone();
hummock_client.report_vacuum_task(request).await
}

pub async fn get_system_params(
&self,
) -> std::result::Result<tonic::Response<GetSystemParamsResponse>, tonic::Status> {
let mut system_params_client = self.core.read().await.system_params_client.clone();
system_params_client
.get_system_params(GetSystemParamsRequest {})
.await
}
}
2 changes: 1 addition & 1 deletion src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ mod sink_coordinate_client;
mod stream_client;
mod tracing;

pub use compactor_client::CompactorClient;
pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient};
Expand Down
1 change: 1 addition & 0 deletions src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"signal",
] }
tokio-retry = "0.3"
tonic = { workspace = true }
tracing = "0.1"
tracing-futures = { version = "0.2", features = ["futures-03"] }
xorf = "0.8.1"
Expand Down
74 changes: 47 additions & 27 deletions src/storage/compactor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

mod compactor_observer;
mod rpc;
mod server;
pub mod server;
mod telemetry;

use clap::Parser;
use risingwave_common::config::{AsyncStackTraceOption, MetricLevel, OverrideConfig};
use risingwave_common::config::{
AsyncStackTraceOption, CompactorMode, MetricLevel, OverrideConfig,
};

use crate::server::compactor_serve;
use crate::server::{compactor_serve, shared_compactor_serve};

/// Command-line arguments for compactor-node.
#[derive(Parser, Clone, Debug, OverrideConfig)]
Expand Down Expand Up @@ -87,6 +89,12 @@ pub struct CompactorOpts {
#[clap(long, env = "RW_OBJECT_STORE_READ_TIMEOUT_MS", value_enum)]
#[override_opts(path = storage.object_store_read_timeout_ms)]
pub object_store_read_timeout_ms: Option<u64>,

#[clap(long, env = "RW_COMPACTOR_MODE", value_enum)]
pub compactor_mode: Option<CompactorMode>,

#[clap(long, env = "RW_PROXY_RPC_ENDPOINT", default_value = "")]
pub proxy_rpc_endpoint: String,
}

use std::future::Future;
Expand All @@ -95,28 +103,40 @@ use std::pin::Pin;
pub fn start(opts: CompactorOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
// WARNING: don't change the function signature. Making it `async fn` will cause
// slow compile in release mode.
Box::pin(async move {
tracing::info!("Compactor node options: {:?}", opts);
tracing::info!("meta address: {}", opts.meta_address.clone());

let listen_addr = opts.listen_addr.parse().unwrap();
tracing::info!("Server Listening at {}", listen_addr);

let advertise_addr = opts
.advertise_addr
.as_ref()
.unwrap_or_else(|| {
tracing::warn!("advertise addr is not specified, defaulting to listen address");
&opts.listen_addr
})
.parse()
.unwrap();
tracing::info!(" address is {}", advertise_addr);

let (join_handle, observer_join_handle, _shutdown_sender) =
compactor_serve(listen_addr, advertise_addr, opts).await;

join_handle.await.unwrap();
observer_join_handle.abort();
})
match opts.compactor_mode {
Some(CompactorMode::Shared) => Box::pin(async move {
tracing::info!("Shared compactor pod options: {:?}", opts);
tracing::info!("Proxy rpc endpoint: {}", opts.proxy_rpc_endpoint.clone());

let listen_addr = opts.listen_addr.parse().unwrap();
tracing::info!("Server Listening at {}", listen_addr);

let (join_handle, _shutdown_sender) = shared_compactor_serve(listen_addr, opts).await;

join_handle.await.unwrap();
}),
None | Some(CompactorMode::Dedicated) => Box::pin(async move {
tracing::info!("Compactor node options: {:?}", opts);
tracing::info!("meta address: {}", opts.meta_address.clone());

let listen_addr = opts.listen_addr.parse().unwrap();
tracing::info!("Server Listening at {}", listen_addr);

let advertise_addr = opts
.advertise_addr
.as_ref()
.unwrap_or_else(|| {
tracing::warn!("advertise addr is not specified, defaulting to listen address");
&opts.listen_addr
})
.parse()
.unwrap();
tracing::info!(" address is {}", advertise_addr);
let (join_handle, observer_join_handle, _shutdown_sender) =
compactor_serve(listen_addr, advertise_addr, opts).await;

join_handle.await.unwrap();
observer_join_handle.abort();
}),
}
}
Loading

0 comments on commit 4fb087c

Please sign in to comment.