Skip to content

Commit

Permalink
remove unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Sep 30, 2024
1 parent 78843c8 commit 4e0dd77
Show file tree
Hide file tree
Showing 31 changed files with 41 additions and 568 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

10 changes: 0 additions & 10 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3299,16 +3299,6 @@ def section_hummock_manager(outer_panels):
),
],
),
panels.timeseries_count(
"Full GC Last Watermark",
"the object id watermark used in last full GC",
[
panels.target(
f"{metric('storage_full_gc_last_object_id_watermark')}",
"full_gc_last_object_id_watermark",
),
],
),
panels.timeseries_latency_ms(
"Compaction Event Loop Time",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

7 changes: 0 additions & 7 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,7 @@ service TelemetryInfoService {
}

message HeartbeatRequest {

Check failure on line 27 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "2" with name "info" on message "HeartbeatRequest" was deleted without reserving the name "info".

Check failure on line 27 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "2" with name "info" on message "HeartbeatRequest" was deleted without reserving the number "2".
message ExtraInfo {
oneof info {
uint64 hummock_gc_watermark = 1;
}
}
uint32 node_id = 1;
// Lightweight info piggybacked by heartbeat request.
repeated ExtraInfo info = 2;
}

message HeartbeatResponse {
Expand Down
8 changes: 0 additions & 8 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,6 @@ pub struct MetaConfig {
#[serde(default = "default::meta::full_gc_object_limit")]
pub full_gc_object_limit: u64,

/// The spin interval when collecting global GC watermark in hummock.
#[serde(default = "default::meta::collect_gc_watermark_spin_interval_sec")]
pub collect_gc_watermark_spin_interval_sec: u64,

/// Schedule compaction for all compaction groups with this interval.
#[serde(default = "default::meta::periodic_compaction_interval_sec")]
pub periodic_compaction_interval_sec: u64,
Expand Down Expand Up @@ -1358,10 +1354,6 @@ pub mod default {
100_000
}

pub fn collect_gc_watermark_spin_interval_sec() -> u64 {
5
}

pub fn periodic_compaction_interval_sec() -> u64 {
60
}
Expand Down
5 changes: 1 addition & 4 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer
use risingwave_pb::stream_service::stream_service_server::StreamServiceServer;
use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer;
use risingwave_pb::task_service::task_service_server::TaskServiceServer;
use risingwave_rpc_client::{ComputeClientPool, ExtraInfoSourceRef, MetaClient};
use risingwave_rpc_client::{ComputeClientPool, MetaClient};
use risingwave_storage::hummock::compactor::{
new_compaction_await_tree_reg_ref, start_compactor, CompactionExecutor, CompactorContext,
};
Expand Down Expand Up @@ -228,9 +228,7 @@ pub async fn compute_node_serve(
ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
observer_manager.start().await;

let mut extra_info_sources: Vec<ExtraInfoSourceRef> = vec![];
if let Some(storage) = state_store.as_hummock() {
extra_info_sources.push(storage.sstable_object_id_manager().clone());
if embedded_compactor_enabled {
tracing::info!("start embedded compactor");
let memory_limiter = Arc::new(MemoryLimiter::new(
Expand Down Expand Up @@ -279,7 +277,6 @@ pub async fn compute_node_serve(
sub_tasks.push(MetaClient::start_heartbeat_loop(
meta_client.clone(),
Duration::from_millis(config.server.heartbeat_interval_ms as u64),
extra_info_sources,
));

// Initialize the managers.
Expand Down
1 change: 0 additions & 1 deletion src/config/ci-meta-backup-test.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
[meta]
min_sst_retention_time_sec = 0
collect_gc_watermark_spin_interval_sec = 1
vacuum_interval_sec = 10

[system]
Expand Down
1 change: 0 additions & 1 deletion src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ This page is automatically generated by `./risedev generate-example-config`
| Config | Description | Default |
|--------|-------------|---------|
| backend | | "Mem" |
| collect_gc_watermark_spin_interval_sec | The spin interval when collecting global GC watermark in hummock. | 5 |
| compact_task_table_size_partition_threshold_high | The threshold of table size in one compact task to decide whether to partition one table into `partition_vnode_count` parts, which belongs to default group and materialized view group. Set it max value of 64-bit number to disable this feature. | 536870912 |
| compact_task_table_size_partition_threshold_low | The threshold of table size in one compact task to decide whether to partition one table into `hybrid_partition_vnode_count` parts, which belongs to default group and materialized view group. Set it max value of 64-bit number to disable this feature. | 134217728 |
| compaction_task_max_heartbeat_interval_secs | | 30 |
Expand Down
1 change: 0 additions & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ dir = "./"
min_sst_retention_time_sec = 86400
full_gc_interval_sec = 86400
full_gc_object_limit = 100000
collect_gc_watermark_spin_interval_sec = 5
periodic_compaction_interval_sec = 60
vacuum_interval_sec = 30
vacuum_spin_interval_ms = 200
Expand Down
1 change: 0 additions & 1 deletion src/config/full-iceberg-bench.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ threshold = 0.8999999761581421
[meta]
min_sst_retention_time_sec = 86400
full_gc_interval_sec = 86400
collect_gc_watermark_spin_interval_sec = 5
periodic_compaction_interval_sec = 60
vacuum_interval_sec = 30
vacuum_spin_interval_ms = 10
Expand Down
7 changes: 2 additions & 5 deletions src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,8 @@ impl HummockServiceOpts {
&mut self,
meta_client: &MetaClient,
) -> Result<(MonitoredStateStore<HummockStorage>, Metrics)> {
let (heartbeat_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
meta_client.clone(),
Duration::from_millis(1000),
vec![],
);
let (heartbeat_handle, heartbeat_shutdown_sender) =
MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
self.heartbeat_handle = Some(heartbeat_handle);
self.heartbeat_shutdown_sender = Some(heartbeat_shutdown_sender);

Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ impl FrontendEnv {
let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
meta_client.clone(),
Duration::from_millis(config.server.heartbeat_interval_ms as u64),
vec![],
);
let mut join_handles = vec![heartbeat_join_handle];
let mut shutdown_senders = vec![heartbeat_shutdown_sender];
Expand Down
3 changes: 0 additions & 3 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,6 @@ pub fn start(
min_sst_retention_time_sec: config.meta.min_sst_retention_time_sec,
full_gc_interval_sec: config.meta.full_gc_interval_sec,
full_gc_object_limit: config.meta.full_gc_object_limit,
collect_gc_watermark_spin_interval_sec: config
.meta
.collect_gc_watermark_spin_interval_sec,
enable_committed_sst_sanity_check: config.meta.enable_committed_sst_sanity_check,
periodic_compaction_interval_sec: config.meta.periodic_compaction_interval_sec,
node_num_monitor_interval_sec: config.meta.node_num_monitor_interval_sec,
Expand Down
14 changes: 2 additions & 12 deletions src/meta/service/src/heartbeat_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_meta::manager::MetadataManager;
use risingwave_pb::meta::heartbeat_service_server::HeartbeatService;
use risingwave_pb::meta::{HeartbeatRequest, HeartbeatResponse};
Expand All @@ -38,18 +37,9 @@ impl HeartbeatService for HeartbeatServiceImpl {
request: Request<HeartbeatRequest>,
) -> Result<Response<HeartbeatResponse>, Status> {
let req = request.into_inner();
let info = req
.info
.into_iter()
.filter_map(|node_info| node_info.info)
.collect_vec();
let result = match &self.metadata_manager {
MetadataManager::V1(mgr) => mgr.cluster_manager.heartbeat(req.node_id, info).await,
MetadataManager::V2(mgr) => {
mgr.cluster_controller
.heartbeat(req.node_id as _, info)
.await
}
MetadataManager::V1(mgr) => mgr.cluster_manager.heartbeat(req.node_id).await,
MetadataManager::V2(mgr) => mgr.cluster_controller.heartbeat(req.node_id as _).await,
};

match result {
Expand Down
1 change: 0 additions & 1 deletion src/meta/service/src/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,6 @@ impl HummockManagerService for HummockServiceImpl {
min_delta_log_num_for_hummock_version_checkpoint,
min_sst_retention_time_sec,
full_gc_interval_sec,
collect_gc_watermark_spin_interval_sec,
periodic_compaction_interval_sec,
periodic_space_reclaim_compaction_interval_sec,
periodic_ttl_reclaim_compaction_interval_sec,
Expand Down
35 changes: 3 additions & 32 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::RW_VERSION;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_license::LicenseManager;
use risingwave_meta_model_v2::prelude::{Worker, WorkerProperty};
use risingwave_meta_model_v2::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model_v2::{worker, worker_property, TransactionId, WorkerId};
use risingwave_pb::common::worker_node::{PbProperty, PbResource, PbState};
use risingwave_pb::common::{HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode};
use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::heartbeat_request;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use sea_orm::prelude::Expr;
Expand Down Expand Up @@ -198,16 +196,12 @@ impl ClusterController {
}

/// Invoked when it receives a heartbeat from a worker node.
pub async fn heartbeat(
&self,
worker_id: WorkerId,
info: Vec<heartbeat_request::extra_info::Info>,
) -> MetaResult<()> {
pub async fn heartbeat(&self, worker_id: WorkerId) -> MetaResult<()> {
tracing::trace!(target: "events::meta::server_heartbeat", worker_id = worker_id, "receive heartbeat");
self.inner
.write()
.await
.heartbeat(worker_id, self.max_heartbeat_interval, info)
.heartbeat(worker_id, self.max_heartbeat_interval)
}

pub fn start_heartbeat_checker(
Expand Down Expand Up @@ -394,10 +388,6 @@ pub struct WorkerExtraInfo {
// Unix timestamp that the worker will expire at.
expire_at: Option<u64>,
started_at: Option<u64>,
// Monotonic increasing id since meta node bootstrap.
pub(crate) info_version_id: u64,
// GC watermark.
pub(crate) hummock_gc_watermark: Option<HummockSstableObjectId>,
resource: Option<PbResource>,
}

Expand All @@ -417,17 +407,6 @@ impl WorkerExtraInfo {
fn update_started_at(&mut self) {
self.started_at = Some(timestamp_now_sec());
}

fn update_hummock_info(&mut self, info: Vec<heartbeat_request::extra_info::Info>) {
self.info_version_id += 1;
for i in info {
match i {
heartbeat_request::extra_info::Info::HummockGcWatermark(watermark) => {
self.hummock_gc_watermark = Some(watermark);
}
}
}
}
}

// TODO: remove this when we deprecate model v1.
Expand All @@ -436,8 +415,6 @@ impl From<crate::model::Worker> for WorkerExtraInfo {
Self {
expire_at: Some(worker.expire_at),
started_at: worker.started_at,
info_version_id: worker.info_version_id,
hummock_gc_watermark: worker.hummock_gc_watermark,
resource: worker.resource,
}
}
Expand Down Expand Up @@ -821,15 +798,9 @@ impl ClusterControllerInner {
Ok(WorkerInfo(worker, property, extra_info).into())
}

pub fn heartbeat(
&mut self,
worker_id: WorkerId,
ttl: Duration,
info: Vec<heartbeat_request::extra_info::Info>,
) -> MetaResult<()> {
pub fn heartbeat(&mut self, worker_id: WorkerId, ttl: Duration) -> MetaResult<()> {
if let Some(worker_info) = self.worker_extra_info.get_mut(&worker_id) {
worker_info.update_ttl(ttl);
worker_info.update_hummock_info(info);
Ok(())
} else {
Err(MetaError::invalid_worker(
Expand Down
Loading

0 comments on commit 4e0dd77

Please sign in to comment.