From 6b0cfebffe6a01a1f7eb28dbbc2ac3c0e8e297c6 Mon Sep 17 00:00:00 2001 From: Xiaobing <61892277+klsince@users.noreply.github.com> Date: Thu, 22 Feb 2024 07:31:34 -0800 Subject: [PATCH] share the same table config object (#12463) --- .../manager/realtime/RealtimeTableDataManager.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index c3cb5c603a83..399a0de1db1e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -185,10 +185,7 @@ protected void doInit() { // Set up dedup/upsert metadata manager // NOTE: Dedup/upsert has to be set up when starting the server. Changing the table config without restarting the // server won't enable/disable them on the fly. - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); - Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType); - - DedupConfig dedupConfig = tableConfig.getDedupConfig(); + DedupConfig dedupConfig = _tableConfig.getDedupConfig(); boolean dedupEnabled = dedupConfig != null && dedupConfig.isDedupEnabled(); if (dedupEnabled) { Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); @@ -197,10 +194,10 @@ protected void doInit() { List primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns), "Primary key columns must be configured for dedup"); - _tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics); + _tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(_tableConfig, schema, this, _serverMetrics); } - UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); + UpsertConfig upsertConfig = _tableConfig.getUpsertConfig(); if (upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE) { Preconditions.checkState(!dedupEnabled, "Dedup and upsert cannot be both enabled for table: %s", _tableUpsertMetadataManager); @@ -209,8 +206,8 @@ protected void doInit() { // NOTE: Set _tableUpsertMetadataManager before initializing it because when preloading is enabled, we need to // load segments into it _tableUpsertMetadataManager = - TableUpsertMetadataManagerFactory.create(tableConfig, _instanceDataManagerConfig.getUpsertConfig()); - _tableUpsertMetadataManager.init(tableConfig, schema, this, _helixManager, _segmentPreloadExecutor); + TableUpsertMetadataManagerFactory.create(_tableConfig, _instanceDataManagerConfig.getUpsertConfig()); + _tableUpsertMetadataManager.init(_tableConfig, schema, this, _helixManager, _segmentPreloadExecutor); } // For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data