Skip to content

Commit

Permalink
Merge branch 'main' into xxh/add-sink-metr
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Dec 10, 2024
2 parents 53aad99 + 430fbb9 commit bc077d9
Show file tree
Hide file tree
Showing 40 changed files with 1,319 additions and 1,134 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/typo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ jobs:
uses: actions/checkout@v4

- name: Check spelling of the entire repository
uses: crate-ci/typos@v1.23.2
uses: crate-ci/typos@v1.28.2
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repos:
exclude: 'src/frontend/planner_test/tests/testdata/.*'
- id: trailing-whitespace
- repo: https://github.com/crate-ci/typos
rev: v1.23.1
rev: v1.28.2
hooks:
- id: typos
- repo: local
Expand Down
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ license = "Apache-2.0"
repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = { version = "0.13.0", features = ["tracing", "nightly", "prometheus"] }
foyer = { version = "0.13.1", features = ["tracing", "nightly", "prometheus"] }
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
Expand Down
2 changes: 1 addition & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,7 @@ fi
private = true
category = "RiseDev - Check"
description = "Run cargo typos-cli check"
install_crate = { min_version = "1.23.2", crate_name = "typos-cli", binary = "typos", test_arg = [
install_crate = { min_version = "1.28.2", crate_name = "typos-cli", binary = "typos", test_arg = [
"--help",
], install_command = "binstall" }
script = """
Expand Down
6 changes: 5 additions & 1 deletion e2e_test/source_inline/kafka/avro/ref.slt
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ select
(bar).b.y
from s;
----
3 4 5 6 6 5 4 3
3 4 5 6 NULL NULL NULL NULL

# Parsing of column `bar` fails even with ints because now `schema` is required.
# This will be fully supported in the next PR
# 3 4 5 6 6 5 4 3


statement ok
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,7 @@ def section_streaming_actors(outer_panels: Panels):
"it's very likely to be the performance bottleneck",
[
panels.target(
# Here we use `min` but actually no much difference. Any of the sampled epoches makes sense.
# Here we use `min` but actually no much difference. Any of the sampled epochs makes sense.
f"min({metric('stream_actor_current_epoch')} != 0) by (fragment_id)",
"fragment {{fragment_id}}",
),
Expand Down
8 changes: 8 additions & 0 deletions proto/compute.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ message ShowConfigResponse {
string stream_config = 2;
}

message ResizeCacheRequest {
uint64 meta_cache_capacity = 1;
uint64 data_cache_capacity = 2;
}

message ResizeCacheResponse {}

service ConfigService {
rpc ShowConfig(ShowConfigRequest) returns (ShowConfigResponse);
rpc ResizeCache(ResizeCacheRequest) returns (ResizeCacheResponse);
}
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ pub struct StorageConfig {
#[serde(default = "default::storage::mem_table_spill_threshold")]
pub mem_table_spill_threshold: usize,

/// The concurrent uploading number of `SSTables` of buidler
/// The concurrent uploading number of `SSTables` of builder
#[serde(default = "default::storage::compactor_concurrent_uploading_sst_count")]
pub compactor_concurrent_uploading_sst_count: Option<usize>,

Expand Down
52 changes: 50 additions & 2 deletions src/compute/src/rpc/service/config_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,24 @@
// limitations under the License.
use std::sync::Arc;

use foyer::HybridCache;
use risingwave_batch::task::BatchManager;
use risingwave_common::error::tonic::ToTonicStatus;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::compute::config_service_server::ConfigService;
use risingwave_pb::compute::{ShowConfigRequest, ShowConfigResponse};
use risingwave_pb::compute::{
ResizeCacheRequest, ResizeCacheResponse, ShowConfigRequest, ShowConfigResponse,
};
use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
use risingwave_stream::task::LocalStreamManager;
use thiserror_ext::AsReport;
use tonic::{Code, Request, Response, Status};

pub struct ConfigServiceImpl {
batch_mgr: Arc<BatchManager>,
stream_mgr: LocalStreamManager,
meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
}

#[async_trait::async_trait]
Expand All @@ -42,13 +50,53 @@ impl ConfigService for ConfigServiceImpl {
};
Ok(Response::new(show_config_response))
}

async fn resize_cache(
&self,
request: Request<ResizeCacheRequest>,
) -> Result<Response<ResizeCacheResponse>, Status> {
let req = request.into_inner();

if let Some(meta_cache) = &self.meta_cache
&& req.meta_cache_capacity > 0
{
match meta_cache.memory().resize(req.meta_cache_capacity as _) {
Ok(_) => tracing::info!(
"resize meta cache capacity to {:?}",
req.meta_cache_capacity
),
Err(e) => return Err(Status::internal(e.to_report_string())),
}
}

if let Some(block_cache) = &self.block_cache
&& req.data_cache_capacity > 0
{
match block_cache.memory().resize(req.data_cache_capacity as _) {
Ok(_) => tracing::info!(
"resize data cache capacity to {:?}",
req.data_cache_capacity
),
Err(e) => return Err(Status::internal(e.to_report_string())),
}
}

Ok(Response::new(ResizeCacheResponse {}))
}
}

impl ConfigServiceImpl {
pub fn new(batch_mgr: Arc<BatchManager>, stream_mgr: LocalStreamManager) -> Self {
pub fn new(
batch_mgr: Arc<BatchManager>,
stream_mgr: LocalStreamManager,
meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
) -> Self {
Self {
batch_mgr,
stream_mgr,
meta_cache,
block_cache,
}
}
}
6 changes: 3 additions & 3 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,10 +404,10 @@ pub async fn compute_node_serve(
let monitor_srv = MonitorServiceImpl::new(
stream_mgr.clone(),
config.server.clone(),
meta_cache,
block_cache,
meta_cache.clone(),
block_cache.clone(),
);
let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone());
let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone(), meta_cache, block_cache);
let health_srv = HealthServiceImpl::new();

let telemetry_manager = TelemetryManager::new(
Expand Down
2 changes: 1 addition & 1 deletion src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ This page is automatically generated by `./risedev generate-example-config`
| block_cache_capacity_mb | DEPRECATED: This config will be deprecated in the future version, use `storage.cache.block_cache_capacity_mb` instead. | |
| check_compaction_result | | false |
| compact_iter_recreate_timeout_ms | | 600000 |
| compactor_concurrent_uploading_sst_count | The concurrent uploading number of `SSTables` of buidler | |
| compactor_concurrent_uploading_sst_count | The concurrent uploading number of `SSTables` of builder | |
| compactor_fast_max_compact_delete_ratio | | 40 |
| compactor_fast_max_compact_task_size | | 2147483648 |
| compactor_iter_max_io_retry_times | | 8 |
Expand Down
Loading

0 comments on commit bc077d9

Please sign in to comment.