Skip to content

Commit

Permalink
property
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Oct 12, 2023
1 parent e90f4a5 commit 0e064dd
Show file tree
Hide file tree
Showing 10 changed files with 377 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.apache.helix.rest.common.dataprovider;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

/** Init, register listener and manager callback handler for different
* clusters manage the providers lifecycle
*
*/

import java.util.List;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;


public class HelixRestDataProviderManager {

protected RealmAwareZkClient _zkclient;

private HelixAdmin _helixAdmin;

private RestServiceDataProvider _restServiceDataProvider;
// list of callback handlers

//TODO: create own zk client
public HelixRestDataProviderManager(RealmAwareZkClient zkclient, HelixAdmin helixAdmin) {
_zkclient = zkclient;
_helixAdmin = helixAdmin;
_restServiceDataProvider = new RestServiceDataProvider();
init();
}

public RestServiceDataProvider getRestServiceDataProvider() {
return _restServiceDataProvider;
}

private void init() {
List<String> clusters = _helixAdmin.getClusters();
for (String cluster : clusters) {
PerClusterDataProvider clusterDataProvider =
new PerClusterDataProvider(cluster, _zkclient, new ZkBaseDataAccessor(_zkclient));
clusterDataProvider.initCache();
// register callback handler for each provider
_restServiceDataProvider.addClusterDataProvider(cluster, clusterDataProvider);
}
}

public void close() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.apache.helix.rest.common.dataprovider;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.Map;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.common.caches.PropertyCache;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;


/**
* Dara cache for each Helix cluster. Configs, ideal stats and current states are read from ZK and updated
* using event changes. External view are consolidated using current state.
*/
public class PerClusterDataProvider {

private HelixDataAccessor _accessor;

private RealmAwareZkClient _zkclient;

private final String _clusterName;

// Simple caches
private final RestPropertyCache<InstanceConfig> _instanceConfigCache;
private final RestPropertyCache<ClusterConfig> _clusterConfigCache;
private final RestPropertyCache<ResourceConfig> _resourceConfigCache;
private final RestPropertyCache<LiveInstance> _liveInstanceCache;
private final RestPropertyCache<IdealState> _idealStateCache;
private final RestPropertyCache<StateModelDefinition> _stateModelDefinitionCache;

// special caches
private final RestCurrentStateCache _currentStateCache;

// TODO: add external view caches

public PerClusterDataProvider(String clusterName, RealmAwareZkClient zkClient, BaseDataAccessor baseDataAccessor) {
_clusterName = clusterName;
_accessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);

_zkclient = zkClient;
_instanceConfigCache = null;
_clusterConfigCache = null;
_resourceConfigCache = null;
_liveInstanceCache = null;
_idealStateCache = null;
_stateModelDefinitionCache = null;
_currentStateCache = null;
}
// TODO: consolidate EV from CSs
public Map<String, ExternalView> consolidateExternalViews() {
return null;
}

// Used for dummy cache. Remove later
public void initCache(final HelixDataAccessor accessor) {

}

public void initCache() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.apache.helix.rest.common.dataprovider;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.model.CurrentState;


/**
* Special cache for instances current states.
*
*/
public class RestCurrentStateCache {

//Map<instanceName, Map<ResourceName, CurrentState>>
private ConcurrentHashMap<String, ConcurrentHashMap<String, CurrentState>> _objCache;

public RestCurrentStateCache() {
_objCache = new ConcurrentHashMap<>();
}

public void init(final HelixDataAccessor accessor) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.apache.helix.rest.common.dataprovider;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.common.caches.PropertyCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Class for caching simple HelixProperty objects.
* @param <T>
*/
public class RestPropertyCache<T extends HelixProperty> {
private static final Logger LOG = LoggerFactory.getLogger(RestPropertyCache.class);

private ConcurrentHashMap<String, T> _objCache;
private final String _propertyDescription;

private final RestPropertyCache.PropertyCacheKeyFuncs<T> _keyFuncs;

public interface PropertyCacheKeyFuncs<O extends HelixProperty> {
/**
* Get PropertyKey for the root of this type of object, used for LIST all objects
* @return property key to object root
*/
PropertyKey getRootKey(HelixDataAccessor accessor);

/**
* Get PropertyKey for a single object of this type, used for GET single instance of the type
* @param objName object name
* @return property key to the object instance
*/
PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName);

/**
* Get the string to identify the object when we actually use them. It's not necessarily the
* "id" field of HelixProperty, but could have more semantic meanings of that object type
* @param obj object instance
* @return object identifier
*/
String getObjName(O obj);
}

public RestPropertyCache(String propertyDescription, RestPropertyCache.PropertyCacheKeyFuncs<T> keyFuncs) {
_keyFuncs = keyFuncs;
_propertyDescription = propertyDescription;
}

public void init(final HelixDataAccessor accessor) {
_objCache = new ConcurrentHashMap<>(accessor.getChildValuesMap(_keyFuncs.getRootKey(accessor), true));
LOG.info("Init RestPropertyCache for {}. ", _propertyDescription);
}

public Map<String, T> getPropertyMap() {
return Collections.unmodifiableMap(_objCache);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.apache.helix.rest.common.dataprovider;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.HashMap;
import java.util.Map;


public class RestServiceDataProvider {
protected Map<String, PerClusterDataProvider> _clusterDataProviders ;


public RestServiceDataProvider() {
_clusterDataProviders = new HashMap<>();
}

public PerClusterDataProvider getClusterData(String clusterName) {
return _clusterDataProviders.get(clusterName);
}

public void addClusterDataProvider(String clusterName, PerClusterDataProvider clusterData) {
_clusterDataProviders.put(clusterName, clusterData);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.rest.common.dataprovider.HelixRestDataProviderManager;
import org.apache.helix.rest.common.dataprovider.RestServiceDataProvider;
import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.tools.ClusterSetup;
Expand Down Expand Up @@ -80,6 +82,8 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
// Create ZkBucketDataAccessor for ReadOnlyWagedRebalancer.
private volatile ZkBucketDataAccessor _zkBucketDataAccessor;

private volatile HelixRestDataProviderManager _helixRestDataProviderManager;

/**
* Multi-ZK support
*/
Expand Down Expand Up @@ -292,6 +296,17 @@ public ZkBucketDataAccessor getZkBucketDataAccessor() {
return _zkBucketDataAccessor;
}

public HelixRestDataProviderManager getHelixRestDataProviderManager() {
if (_helixRestDataProviderManager == null) {
synchronized (this) {
if (_helixRestDataProviderManager == null) {
_helixRestDataProviderManager = new HelixRestDataProviderManager(getRealmAwareZkClient(), getHelixAdmin());
}
}
}
return _helixRestDataProviderManager;
}

public void close() {
if (_zkClient != null) {
_zkClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
Expand Down Expand Up @@ -164,6 +165,18 @@ void subscribeStateChanges(
void unsubscribeStateChanges(
org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener);

default void subscribePersistRecursiveListener(String path,
RecursivePersistListener recursivePersistListener) {
throw new UnsupportedOperationException(
"subscribePersistRecursiveListener() is not supported!");
}

default void unsubscribePersistRecursiveListener(String path,
RecursivePersistListener recursivePersistListener) {
throw new UnsupportedOperationException(
"subscribePersistRecursiveListener() is not supported!");
}

void unsubscribeAll();

// data access
Expand Down
Loading

0 comments on commit 0e064dd

Please sign in to comment.