Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compaction): per table vnode on compactor side #19059

Merged
merged 14 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ pub async fn compute_node_serve(
compactor_context,
hummock_meta_client.clone(),
storage.sstable_object_id_manager().clone(),
storage.filter_key_extractor_manager().clone(),
storage.compaction_catalog_manager_ref().clone(),
);
sub_tasks.push((handle, shutdown_sender));
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ pub async fn setup_compute_env_with_metric(
compactor_streams_change_tx,
)
.await;

let fake_host_address = HostAddress {
host: "127.0.0.1".to_string(),
port,
Expand Down
28 changes: 22 additions & 6 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

Expand All @@ -32,6 +33,7 @@ use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::{InMemObjectStore, ObjectStore, ObjectStoreImpl};
use risingwave_pb::hummock::compact_task::PbTaskType;
use risingwave_pb::hummock::PbTableSchema;
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent;
use risingwave_storage::hummock::compactor::compactor_runner::compact_and_build_sst;
use risingwave_storage::hummock::compactor::{
ConcatSstableIterator, DummyCompactionFilter, TaskConfig, TaskProgress,
Expand Down Expand Up @@ -133,8 +135,13 @@ async fn build_table(
policy: CachePolicy::Fill(CacheContext::Default),
},
);
let mut builder =
SstableBuilder::<_, Xor16FilterBuilder>::for_test(sstable_object_id, writer, opt);
let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
);
let value = b"1234567890123456789";
let mut full_key = test_key_of(0, epoch, TableId::new(0));
let table_key_len = full_key.user_key.table_key.len();
Expand Down Expand Up @@ -177,8 +184,14 @@ async fn build_table_2(
policy: CachePolicy::Fill(CacheContext::Default),
},
);
let mut builder =
SstableBuilder::<_, Xor16FilterBuilder>::for_test(sstable_object_id, writer, opt);

let table_id_to_vnode = HashMap::from_iter(vec![(table_id, VirtualNode::COUNT_FOR_TEST)]);
let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
);
let mut full_key = test_key_of(0, epoch, TableId::new(table_id));
let table_key_len = full_key.user_key.table_key.len();

Expand Down Expand Up @@ -273,8 +286,11 @@ async fn compact<I: HummockIterator<Direction = Forward>>(
bloom_false_positive: 0.001,
..Default::default()
};
let mut builder =
CapacitySplitTableBuilder::for_test(LocalTableBuilderFactory::new(32, sstable_store, opt));
let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);
let mut builder = CapacitySplitTableBuilder::for_test(
LocalTableBuilderFactory::new(32, sstable_store, opt),
compaction_catalog_agent_ref,
);

let task_config = task_config.unwrap_or_else(|| TaskConfig {
key_range: KeyRange::inf(),
Expand Down
16 changes: 15 additions & 1 deletion src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::env;
use std::ops::Range;
use std::sync::atomic::AtomicU64;
Expand All @@ -24,11 +25,13 @@ use foyer::{Engine, HybridCacheBuilder};
use rand::random;
use risingwave_common::catalog::TableId;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_object_store::object::{
InMemObjectStore, ObjectStore, ObjectStoreImpl, S3ObjectStore,
};
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent;
use risingwave_storage::hummock::iterator::{ConcatIterator, ConcatIteratorInner, HummockIterator};
use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use risingwave_storage::hummock::value::HummockValue;
Expand Down Expand Up @@ -83,7 +86,11 @@ impl<F: SstableWriterFactory> TableBuilderFactory for LocalTableBuilderFactory<F
.create_sst_writer(id, writer_options)
.await
.unwrap();
let builder = SstableBuilder::for_test(id, writer, self.options.clone());
let table_id_to_vnode = HashMap::from_iter(vec![(
TableId::default().into(),
VirtualNode::COUNT_FOR_TEST,
)]);
let builder = SstableBuilder::for_test(id, writer, self.options.clone(), table_id_to_vnode);

Ok(builder)
}
Expand Down Expand Up @@ -192,6 +199,8 @@ fn bench_builder(

let sstable_store = runtime.block_on(async { generate_sstable_store(object_store).await });

let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);

let mut group = c.benchmark_group("bench_multi_builder");
group
.sample_size(SAMPLE_COUNT)
Expand All @@ -205,6 +214,7 @@ fn bench_builder(
StreamingSstableWriterFactory::new(sstable_store.clone()),
get_builder_options(capacity_mb),
),
compaction_catalog_agent_ref.clone(),
))
})
});
Expand All @@ -217,6 +227,7 @@ fn bench_builder(
BatchSstableWriterFactory::new(sstable_store.clone()),
get_builder_options(capacity_mb),
),
compaction_catalog_agent_ref.clone(),
))
})
});
Expand Down Expand Up @@ -249,13 +260,16 @@ fn bench_table_scan(c: &mut Criterion) {
let object_store = Arc::new(ObjectStoreImpl::InMem(store));
let sstable_store = runtime.block_on(async { generate_sstable_store(object_store).await });

let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);

let ssts = runtime.block_on(async {
build_tables(CapacitySplitTableBuilder::for_test(
LocalTableBuilderFactory::new(
1,
BatchSstableWriterFactory::new(sstable_store.clone()),
get_builder_options(capacity_mb),
),
compaction_catalog_agent_ref.clone(),
))
.await
});
Expand Down
27 changes: 9 additions & 18 deletions src/storage/compactor/src/compactor_observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
Expand All @@ -21,12 +20,10 @@ use risingwave_pb::catalog::Table;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::SubscribeResponse;
use risingwave_storage::filter_key_extractor::{
FilterKeyExtractorImpl, FilterKeyExtractorManagerRef,
};
use risingwave_storage::compaction_catalog_manager::CompactionCatalogManagerRef;

pub struct CompactorObserverNode {
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
compaction_catalog_manager: CompactionCatalogManagerRef,
system_params_manager: LocalSystemParamsManagerRef,
version: u64,
}
Expand Down Expand Up @@ -83,36 +80,30 @@ impl ObserverState for CompactorObserverNode {

impl CompactorObserverNode {
pub fn new(
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
compaction_catalog_manager: CompactionCatalogManagerRef,
system_params_manager: LocalSystemParamsManagerRef,
) -> Self {
Self {
filter_key_extractor_manager,
compaction_catalog_manager,
system_params_manager,
version: 0,
}
}

fn handle_catalog_snapshot(&mut self, tables: Vec<Table>) {
let all_filter_key_extractors: HashMap<u32, Arc<FilterKeyExtractorImpl>> = tables
.iter()
.map(|t| (t.id, Arc::new(FilterKeyExtractorImpl::from_table(t))))
.collect();
self.filter_key_extractor_manager
.sync(all_filter_key_extractors);
self.compaction_catalog_manager
.sync(tables.into_iter().map(|t| (t.id, Arc::new(t))).collect());
}

fn handle_catalog_notification(&mut self, operation: Operation, table_catalog: Table) {
match operation {
Operation::Add | Operation::Update => {
self.filter_key_extractor_manager.update(
table_catalog.id,
Arc::new(FilterKeyExtractorImpl::from_table(&table_catalog)),
);
self.compaction_catalog_manager
.update(table_catalog.id, Arc::new(table_catalog));
}

Operation::Delete => {
self.filter_key_extractor_manager.remove(table_catalog.id);
self.compaction_catalog_manager.remove(table_catalog.id);
}

_ => panic!("receive an unsupported notify {:?}", operation),
Expand Down
14 changes: 6 additions & 8 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ use risingwave_pb::common::WorkerType;
use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
use risingwave_storage::filter_key_extractor::{
FilterKeyExtractorManager, RemoteTableAccessor, RpcFilterKeyExtractorManager,
use risingwave_storage::compaction_catalog_manager::{
CompactionCatalogManager, RemoteTableAccessor,
};
use risingwave_storage::hummock::compactor::{
new_compaction_await_tree_reg_ref, CompactionAwaitTreeRegRef, CompactionExecutor,
Expand Down Expand Up @@ -212,12 +212,13 @@ pub async fn compactor_serve(
compactor_metrics,
) = prepare_start_parameters(config.clone(), system_params_reader.clone()).await;

let filter_key_extractor_manager = Arc::new(RpcFilterKeyExtractorManager::new(Box::new(
let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
RemoteTableAccessor::new(meta_client.clone()),
)));

let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
let compactor_observer_node = CompactorObserverNode::new(
filter_key_extractor_manager.clone(),
compaction_catalog_manager_ref.clone(),
system_params_manager.clone(),
);
let observer_manager =
Expand All @@ -234,9 +235,6 @@ pub async fn compactor_serve(
hummock_meta_client.clone(),
storage_opts.sstable_id_remote_fetch_number,
));
let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
filter_key_extractor_manager.clone(),
);

let compaction_executor = Arc::new(CompactionExecutor::new(
opts.compaction_worker_threads_number,
Expand All @@ -263,7 +261,7 @@ pub async fn compactor_serve(
compactor_context.clone(),
hummock_meta_client.clone(),
sstable_object_id_manager.clone(),
filter_key_extractor_manager.clone(),
compaction_catalog_manager_ref,
),
];

Expand Down
12 changes: 5 additions & 7 deletions src/storage/hummock_test/src/bin/replay/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use risingwave_hummock_trace::{
use risingwave_meta::hummock::test_utils::setup_compute_env;
use risingwave_meta::hummock::MockHummockMetaClient;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_storage::filter_key_extractor::{
FakeRemoteTableAccessor, RpcFilterKeyExtractorManager,
use risingwave_storage::compaction_catalog_manager::{
CompactionCatalogManager, FakeRemoteTableAccessor,
};
use risingwave_storage::hummock::{HummockStorage, SstableStore, SstableStoreConfig};
use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics, ObjectStoreMetrics};
Expand Down Expand Up @@ -166,16 +166,14 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalRepl
)
};

let key_filter_manager = Arc::new(RpcFilterKeyExtractorManager::new(Box::new(
FakeRemoteTableAccessor {},
)));

let storage = HummockStorage::new(
storage_opts,
sstable_store,
hummock_meta_client.clone(),
notification_client,
key_filter_manager,
Arc::new(CompactionCatalogManager::new(Box::new(
FakeRemoteTableAccessor {},
))),
state_store_metrics,
compactor_metrics,
None,
Expand Down
Loading
Loading