[BugFix] Fix a bug where background refresh for the hive connector does not work when the catalog is a unified catalog #55215

Merged (2 commits) on Jan 21, 2025
DeltaLakeConnector.java

@@ -19,6 +19,7 @@
 import com.starrocks.connector.ConnectorContext;
 import com.starrocks.connector.ConnectorMetadata;
 import com.starrocks.connector.HdfsEnvironment;
+import com.starrocks.connector.hive.CatalogNameType;
 import com.starrocks.credential.CloudConfiguration;
 import com.starrocks.credential.CloudConfigurationFactory;
 import com.starrocks.server.GlobalStateMgr;
@@ -36,12 +37,14 @@ public class DeltaLakeConnector implements Connector {
     private final Map<String, String> properties;
     private final CloudConfiguration cloudConfiguration;
     private final String catalogName;
+    private final CatalogNameType catalogNameType;
     private final DeltaLakeInternalMgr internalMgr;
     private final DeltaLakeMetadataFactory metadataFactory;
     private IDeltaLakeMetastore metastore;

     public DeltaLakeConnector(ConnectorContext context) {
         this.catalogName = context.getCatalogName();
+        this.catalogNameType = new CatalogNameType(catalogName, "delta_lake");
         this.properties = context.getProperties();
         this.cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties);
         HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration);
@@ -75,13 +78,13 @@ public CloudConfiguration getCloudConfiguration() {
     public void shutdown() {
         internalMgr.shutdown();
         metadataFactory.metastoreCacheInvalidateCache();
-        GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogName);
+        GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogNameType);
     }

     public void onCreate() {
         Optional<DeltaLakeCacheUpdateProcessor> updateProcessor = metadataFactory.getCacheUpdateProcessor();
         updateProcessor.ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
-                .registerCacheUpdateProcessor(catalogName, updateProcessor.get()));
+                .registerCacheUpdateProcessor(catalogNameType, updateProcessor.get()));
     }

     @Override
CatalogNameType.java (new file)

@@ -0,0 +1,41 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.connector.hive;
+
+/**
+ * This class is used to register background refresh in `ConnectorTableMetadataProcessor`.
+ * Since the unified catalog feature was implemented, we cannot use the catalog name alone as
+ * the key of the `cacheUpdateProcessors` map in `ConnectorTableMetadataProcessor`, so we
+ * introduce this class and use it as the key for that map.
+ */
+public class CatalogNameType {
+
+    private final String catalogName;
+    private final String catalogType;
+
+    public CatalogNameType(String catalogName, String catalogType) {
+        this.catalogName = catalogName;
+        this.catalogType = catalogType;
+    }
+
+    public String getCatalogName() {
+        return this.catalogName;
+    }
+
+    public String getCatalogType() {
+        return this.catalogType;
+    }
+
+}
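To make the Javadoc above concrete, here is a minimal, hypothetical sketch of the failure this PR fixes; it is not part of the change itself. A unified catalog creates one sub-connector per table format under a single catalog name, so with a name-keyed map the second registration overwrites the first and background refresh silently stops for the overwritten connector. The class name CatalogKeyDemo, the catalog name "my_unified", and the String stand-ins for the real CacheUpdateProcessor values are illustrative assumptions; only CatalogNameType comes from this diff.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.starrocks.connector.hive.CatalogNameType;

public class CatalogKeyDemo {
    public static void main(String[] args) {
        // Before this PR: the map is keyed by catalog name alone, so the
        // delta_lake registration replaces the hive one for a unified catalog.
        Map<String, String> byName = new ConcurrentHashMap<>();
        byName.put("my_unified", "hive cache update processor");
        byName.put("my_unified", "delta_lake cache update processor");
        System.out.println(byName.size()); // prints 1 -- the hive entry is lost

        // After this PR: the map is keyed by (catalog name, catalog type),
        // so both sub-connectors keep their background refresh entries.
        Map<CatalogNameType, String> byNameType = new ConcurrentHashMap<>();
        byNameType.put(new CatalogNameType("my_unified", "hive"), "hive cache update processor");
        byNameType.put(new CatalogNameType("my_unified", "delta_lake"), "delta_lake cache update processor");
        System.out.println(byNameType.size()); // prints 2
    }
}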
ConnectorTableMetadataProcessor.java

@@ -48,7 +48,8 @@ public class ConnectorTableMetadataProcessor extends FrontendDaemon {

     private final Set<BaseTableInfo> registeredTableInfos = Sets.newConcurrentHashSet();

-    private final Map<String, CacheUpdateProcessor> cacheUpdateProcessors = new ConcurrentHashMap<>();
+    private final Map<CatalogNameType, CacheUpdateProcessor> cacheUpdateProcessors =
+            new ConcurrentHashMap<>();

     private final ExecutorService refreshRemoteFileExecutor;
     private final Map<String, IcebergCatalog> cachingIcebergCatalogs = new ConcurrentHashMap<>();
@@ -57,14 +58,16 @@ public void registerTableInfo(BaseTableInfo tableInfo) {
         registeredTableInfos.add(tableInfo);
     }

-    public void registerCacheUpdateProcessor(String catalogName, CacheUpdateProcessor cache) {
-        LOG.info("register to update {} metadata cache in the ConnectorTableMetadataProcessor", catalogName);
-        cacheUpdateProcessors.put(catalogName, cache);
+    public void registerCacheUpdateProcessor(CatalogNameType catalogNameType, CacheUpdateProcessor cache) {
+        LOG.info("register to update {}:{} metadata cache in the ConnectorTableMetadataProcessor",
+                catalogNameType.getCatalogName(), catalogNameType.getCatalogType());
+        cacheUpdateProcessors.put(catalogNameType, cache);
     }

-    public void unRegisterCacheUpdateProcessor(String catalogName) {
-        LOG.info("unregister to update {} metadata cache in the ConnectorTableMetadataProcessor", catalogName);
-        cacheUpdateProcessors.remove(catalogName);
+    public void unRegisterCacheUpdateProcessor(CatalogNameType catalogNameType) {
+        LOG.info("unregister to update {}:{} metadata cache in the ConnectorTableMetadataProcessor",
+                catalogNameType.getCatalogName(), catalogNameType.getCatalogType());
+        cacheUpdateProcessors.remove(catalogNameType);
     }

     public void registerCachingIcebergCatalog(String catalogName, IcebergCatalog icebergCatalog) {
@@ -99,9 +102,11 @@ protected void runAfterCatalogReady() {

     private void refreshCatalogTable() {
         MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr();
-        List<String> catalogNames = Lists.newArrayList(cacheUpdateProcessors.keySet());
-        for (String catalogName : catalogNames) {
-            CacheUpdateProcessor updateProcessor = cacheUpdateProcessors.get(catalogName);
+        List<CatalogNameType> catalogNameTypes = Lists.newArrayList(cacheUpdateProcessors.keySet());
+        for (CatalogNameType catalogNameType : catalogNameTypes) {
+            String catalogName = catalogNameType.getCatalogName();
+            LOG.info("Starting to refresh tables from {}:{} metadata cache", catalogName, catalogNameType.getCatalogType());
+            CacheUpdateProcessor updateProcessor = cacheUpdateProcessors.get(catalogNameType);
             if (updateProcessor == null) {
                 LOG.error("Failed to get cacheUpdateProcessor by catalog {}.", catalogName);
                 continue;
HiveConnector.java

@@ -34,12 +34,14 @@ public class HiveConnector implements Connector {
     public static final String HIVE_METASTORE_CONNECTION_POOL_SIZE = "hive.metastore.connection.pool.size";
     private final Map<String, String> properties;
     private final String catalogName;
+    private final CatalogNameType catalogNameType;
     private final HiveConnectorInternalMgr internalMgr;
     private final HiveMetadataFactory metadataFactory;

     public HiveConnector(ConnectorContext context) {
         this.properties = context.getProperties();
         this.catalogName = context.getCatalogName();
+        this.catalogNameType = new CatalogNameType(catalogName, "hive");
         CloudConfiguration cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties);
         HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration);
         this.internalMgr = new HiveConnectorInternalMgr(catalogName, properties, hdfsEnvironment);
@@ -83,7 +85,7 @@ public void onCreate() {
                 internalMgr.isEnableBackgroundRefreshHiveMetadata()) {
             updateProcessor
                     .ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
-                            .registerCacheUpdateProcessor(catalogName, updateProcessor.get()));
+                            .registerCacheUpdateProcessor(catalogNameType, updateProcessor.get()));
         }
     }
 }
@@ -93,6 +95,6 @@ public void shutdown() {
         internalMgr.shutdown();
         metadataFactory.getCacheUpdateProcessor().ifPresent(HiveCacheUpdateProcessor::invalidateAll);
         GlobalStateMgr.getCurrentState().getMetastoreEventsProcessor().unRegisterCacheUpdateProcessor(catalogName);
-        GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogName);
+        GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogNameType);
     }
 }
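One subtlety worth noting: CatalogNameType does not override equals() or hashCode(), so the ConcurrentHashMap compares keys by reference identity. Registration and unregistration still pair up because each connector builds its CatalogNameType once in its constructor and reuses that same instance for both calls. The hedged sketch below illustrates this; the processors map and the Runnable payload are simplified stand-ins for the real ConnectorTableMetadataProcessor API, not part of the PR.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.starrocks.connector.hive.CatalogNameType;

public class LifecycleSketch {
    // Built once and reused, exactly as HiveConnector stores catalogNameType;
    // a fresh new CatalogNameType("my_catalog", "hive") would NOT match this key.
    private final CatalogNameType key = new CatalogNameType("my_catalog", "hive");
    private final Map<CatalogNameType, Runnable> processors = new ConcurrentHashMap<>();

    void onCreate() {
        processors.put(key, () -> System.out.println("refresh hive metadata"));
    }

    void shutdown() {
        processors.remove(key); // same instance, so the identity-based lookup succeeds
    }
}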
HudiConnector.java

@@ -20,6 +20,7 @@
 import com.starrocks.connector.ConnectorMetadata;
 import com.starrocks.connector.HdfsEnvironment;
 import com.starrocks.connector.RemoteFileIO;
+import com.starrocks.connector.hive.CatalogNameType;
 import com.starrocks.connector.hive.IHiveMetastore;
 import com.starrocks.credential.CloudConfiguration;
 import com.starrocks.credential.CloudConfigurationFactory;
@@ -33,6 +34,7 @@ public class HudiConnector implements Connector {
     public static final List<String> SUPPORTED_METASTORE_TYPE = Lists.newArrayList("hive", "glue", "dlf");
     private final Map<String, String> properties;
     private final String catalogName;
+    private final CatalogNameType catalogNameType;
     private final HudiConnectorInternalMgr internalMgr;
     private final HudiMetadataFactory metadataFactory;

@@ -41,6 +43,7 @@ public HudiConnector(ConnectorContext context) {
         CloudConfiguration cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties);
         HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration);
         this.catalogName = context.getCatalogName();
+        this.catalogNameType = new CatalogNameType(catalogName, "hudi");
         this.internalMgr = new HudiConnectorInternalMgr(catalogName, properties, hdfsEnvironment);
         this.metadataFactory = createMetadataFactory(hdfsEnvironment);
         onCreate();
@@ -74,6 +77,6 @@ public void onCreate() {
     @Override
     public void shutdown() {
         internalMgr.shutdown();
-        GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogName);
+        GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogNameType);
     }
 }