From cac3cecc95ba76592a3bee9366a2f8b0f7a996e3 Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Wed, 31 Jul 2024 16:02:48 -0700 Subject: [PATCH] Implement Helix ST handling logic and HelixGatewayParticipant (#2845) - Add ST handling logic to support multi top state state model definitions without intermediary states - Encapsulate the participant manager create, connect, disconnect logic in HelixGatewayParticipant - Add replica state tracking in HelixGatewayParticipant --- helix-gateway/pom.xml | 6 + .../service/HelixGatewayServiceProcessor.java | 17 +- .../helix/gateway/constant/MessageStatus.java | 24 -- .../helix/gateway/constant/MessageType.java | 24 -- .../HelixGatewayServiceGrpcService.java | 21 +- .../participant/HelixGatewayParticipant.java | 245 +++++++++++++ .../gateway/service/GatewayServiceEvent.java | 10 +- .../service/GatewayServiceManager.java | 88 +++-- .../service/GatewayServiceManagerFactory.java | 1 - .../gateway/service/HelixGatewayService.java | 110 ------ .../HelixGatewayMultiTopStateStateModel.java | 60 ++++ ...atewayMultiTopStateStateModelFactory.java} | 15 +- .../HelixGatewayOnlineOfflineStateModel.java | 83 ----- .../StateTransitionMessageTranslateUtil.java | 61 +++- .../src/main/proto/HelixGatewayService.proto | 3 +- .../TestHelixGatewayParticipant.java | 333 ++++++++++++++++++ .../service/TestGatewayServiceManager.java | 1 - ...stStateTransitionMessageTranslateUtil.java | 68 ++++ 18 files changed, 857 insertions(+), 313 deletions(-) rename helix-gateway/src/main/java/org/apache/helix/gateway/{ => api}/service/HelixGatewayServiceProcessor.java (60%) delete mode 100644 helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java delete mode 100644 helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java create mode 100644 helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java delete mode 100644 helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java create mode 100644 helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java rename helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/{HelixGatewayOnlineOfflineStateModelFactory.java => HelixGatewayMultiTopStateStateModelFactory.java} (63%) delete mode 100644 helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java create mode 100644 helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java create mode 100644 helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java diff --git a/helix-gateway/pom.xml b/helix-gateway/pom.xml index 8a7c50ed71..62e927d11f 100644 --- a/helix-gateway/pom.xml +++ b/helix-gateway/pom.xml @@ -86,6 +86,12 @@ org.apache.helix helix-core + + org.apache.helix + helix-core + test-jar + test + io.grpc grpc-services diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java similarity index 60% rename from helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java rename to helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java index 814cfb0d0f..fe57e69c92 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java @@ -1,4 +1,4 @@ -package org.apache.helix.gateway.service; +package org.apache.helix.gateway.api.service; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -19,11 +19,22 @@ * under the License. */ +import org.apache.helix.model.Message; + /** - * Translate from/to GRPC function call to Helix Gateway Service event. + * Helix Gateway Service Processor interface allows sending state transition messages to + * participants through service implementing this interface. */ public interface HelixGatewayServiceProcessor { - public boolean sendStateTransitionMessage( String instanceName); + /** + * Send a state transition message to a remote participant. + * + * @param instanceName the name of the participant + * @param currentState the current state of the shard + * @param message the message to send + */ + void sendStateTransitionMessage(String instanceName, String currentState, + Message message); } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java deleted file mode 100644 index 528b28e2fb..0000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.apache.helix.gateway.constant; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -public enum MessageStatus { - SUCCESS, FAILURE -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java deleted file mode 100644 index 49619dec81..0000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.apache.helix.gateway.constant; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -public enum MessageType { - ADD, REMOVE, CHANGE_ROLE -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java index 09fe3d07b6..018d6591e4 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java @@ -26,9 +26,10 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.gateway.service.GatewayServiceEvent; import org.apache.helix.gateway.service.GatewayServiceManager; -import org.apache.helix.gateway.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; import org.apache.helix.gateway.util.PerKeyLockRegistry; import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; +import org.apache.helix.model.Message; import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage; @@ -59,8 +60,9 @@ public HelixGatewayServiceGrpcService(GatewayServiceManager manager) { /** * Grpc service end pint. * Application instances Report the state of the shard or result of transition request to the gateway service. - * @param responseObserver - * @return + * + * @param responseObserver the observer to send the response to the client + * @return the observer to receive the state of the shard or result of transition request */ @Override public StreamObserver report( @@ -92,17 +94,20 @@ public void onCompleted() { /** * Send state transition message to the instance. * The instance must already have established a connection to the gateway service. - * @param instanceName - * @return + * + * @param instanceName the instance name to send the message to + * @param currentState the current state of shard + * @param message the message to convert to the transition message */ @Override - public boolean sendStateTransitionMessage(String instanceName) { + public void sendStateTransitionMessage(String instanceName, String currentState, + Message message) { StreamObserver observer; observer = _observerMap.get(instanceName); if (observer != null) { - observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage()); + observer.onNext( + StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(message)); } - return true; } private void updateObserver(String instanceName, String clusterName, diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java new file mode 100644 index 0000000000..6405529603 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java @@ -0,0 +1,245 @@ +package org.apache.helix.gateway.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateTransitionError; + +/** + * HelixGatewayParticipant encapsulates the Helix Participant Manager and handles tracking the state + * of a remote participant connected to the Helix Gateway Service. It processes state transitions + * for the participant and updates the state of the participant's shards upon successful state + * transitions signaled by remote participant. + */ +public class HelixGatewayParticipant { + public static final String UNASSIGNED_STATE = "UNASSIGNED"; + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; + private final HelixManager _participantManager; + private final Map> _shardStateMap; + private final Map> _stateTransitionResultMap; + + private HelixGatewayParticipant(HelixGatewayServiceProcessor gatewayServiceProcessor, + HelixManager participantManager, Map> initialShardStateMap) { + _gatewayServiceProcessor = gatewayServiceProcessor; + _participantManager = participantManager; + _shardStateMap = initialShardStateMap; + _stateTransitionResultMap = new ConcurrentHashMap<>(); + } + + public void processStateTransitionMessage(Message message) throws Exception { + String transitionId = message.getMsgId(); + String resourceId = message.getResourceName(); + String shardId = message.getPartitionName(); + String toState = message.getToState(); + + try { + if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { + return; + } + + CompletableFuture future = new CompletableFuture<>(); + _stateTransitionResultMap.put(transitionId, future); + _gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(), + getCurrentState(resourceId, shardId), message); + + if (!future.get()) { + throw new Exception("Failed to transition to state " + toState); + } + + updateState(resourceId, shardId, toState); + } finally { + _stateTransitionResultMap.remove(transitionId); + } + } + + public void handleStateTransitionError(Message message, StateTransitionError error) { + // Remove the stateTransitionResultMap future for the message + String transitionId = message.getMsgId(); + String resourceId = message.getResourceName(); + String shardId = message.getPartitionName(); + + // Remove the future from the stateTransitionResultMap since we are no longer able + // to process the state transition due to participant manager either timing out + // or failing to process the state transition + _stateTransitionResultMap.remove(transitionId); + + // Set the replica state to ERROR + updateState(resourceId, shardId, HelixDefinedState.ERROR.name()); + + // Notify the HelixGatewayParticipantClient that it is in ERROR state + // TODO: We need a better way than sending the state transition with a toState of ERROR + } + + /** + * Get the instance name of the participant. + * + * @return participant instance name + */ + public String getInstanceName() { + return _participantManager.getInstanceName(); + } + + /** + * Completes the state transition with the given transitionId. + * + * @param transitionId the transitionId to complete + * @param isSuccess whether the state transition was successful + */ + public void completeStateTransition(String transitionId, boolean isSuccess) { + CompletableFuture future = _stateTransitionResultMap.get(transitionId); + if (future != null) { + future.complete(isSuccess); + } + } + + private boolean isCurrentStateAlreadyTarget(String resourceId, String shardId, + String targetState) { + return getCurrentState(resourceId, shardId).equals(targetState); + } + + @VisibleForTesting + public Map> getShardStateMap() { + return _shardStateMap; + } + + /** + * Get the current state of the shard. + * + * @param resourceId the resource id + * @param shardId the shard id + * @return the current state of the shard or DROPPED if it does not exist + */ + public String getCurrentState(String resourceId, String shardId) { + return getShardStateMap().getOrDefault(resourceId, Collections.emptyMap()) + .getOrDefault(shardId, UNASSIGNED_STATE); + } + + private void updateState(String resourceId, String shardId, String state) { + if (state.equals(HelixDefinedState.DROPPED.name())) { + getShardStateMap().computeIfPresent(resourceId, (k, v) -> { + v.remove(shardId); + if (v.isEmpty()) { + return null; + } + return v; + }); + } else { + getShardStateMap().computeIfAbsent(resourceId, k -> new ConcurrentHashMap<>()) + .put(shardId, state); + } + } + + public void disconnect() { + _participantManager.disconnect(); + } + + public static class Builder { + private final HelixGatewayServiceProcessor _helixGatewayServiceProcessor; + private final String _instanceName; + private final String _clusterName; + private final String _zkAddress; + private final List _multiTopStateModelDefinitions; + private final Map> _initialShardStateMap; + + public Builder(HelixGatewayServiceProcessor helixGatewayServiceProcessor, String instanceName, + String clusterName, String zkAddress) { + _helixGatewayServiceProcessor = helixGatewayServiceProcessor; + _instanceName = instanceName; + _clusterName = clusterName; + _zkAddress = zkAddress; + _multiTopStateModelDefinitions = new ArrayList<>(); + _initialShardStateMap = new ConcurrentHashMap<>(); + } + + /** + * Add a multi-top state model definition to the participant to be registered in the + * participant's state machine engine. + * + * @param stateModelDefinitionName the state model definition name to add (should be multi-top + * state model) + * @return the builder + */ + public Builder addMultiTopStateStateModelDefinition(String stateModelDefinitionName) { + // TODO: Add validation that the state model definition is a multi-top state model + _multiTopStateModelDefinitions.add(stateModelDefinitionName); + return this; + } + + /** + * Add initial shard state to the participant. This is used to initialize the participant with + * the initial state of the shards in order to reduce unnecessary state transitions from being + * forwarded to the participant. + * + * @param initialShardStateMap the initial shard state map to add + * @return the Builder + */ + public Builder setInitialShardState(Map> initialShardStateMap) { + // TODO: Add handling for shard states that where never assigned to the participant since + // the participant was last online. + // deep copy into the initialShardStateMap into concurrent hash map + initialShardStateMap.forEach((resourceId, shardStateMap) -> { + _initialShardStateMap.put(resourceId, new ConcurrentHashMap<>(shardStateMap)); + }); + + return this; + } + + /** + * Build the HelixGatewayParticipant. This will create a HelixManager for the participant and + * connect to the Helix cluster. The participant will be registered with the multi-top state + * model definitions and initialized with the initial shard state map. + * + * @return the HelixGatewayParticipant + */ + public HelixGatewayParticipant build() { + HelixManager participantManager = + new ZKHelixManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, _zkAddress); + HelixGatewayParticipant participant = + new HelixGatewayParticipant(_helixGatewayServiceProcessor, participantManager, + _initialShardStateMap); + _multiTopStateModelDefinitions.forEach( + stateModelDefinition -> participantManager.getStateMachineEngine() + .registerStateModelFactory(stateModelDefinition, + new HelixGatewayMultiTopStateStateModelFactory(participant))); + try { + participantManager.connect(); + } catch (Exception e) { + // TODO: When API for gracefully triggering disconnect from remote participant + // is available, we should call it here instead of throwing exception. + throw new RuntimeException(e); + } + return participant; + } + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java index 5745dbf032..b919429b91 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java @@ -40,20 +40,20 @@ public class GatewayServiceEvent { public static class StateTransitionResult { private String stateTransitionId; - private String stateTransitionStatus; + private boolean isSuccess; private String shardState; - public StateTransitionResult(String stateTransitionId, String stateTransitionStatus, String shardState) { + public StateTransitionResult(String stateTransitionId, boolean isSuccess, String shardState) { this.stateTransitionId = stateTransitionId; - this.stateTransitionStatus = stateTransitionStatus; + this.isSuccess = isSuccess; this.shardState = shardState; } public String getStateTransitionId() { return stateTransitionId; } - public String getStateTransitionStatus() { - return stateTransitionStatus; + public boolean getIsSuccess() { + return isSuccess; } public String getShardState() { return shardState; diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index 830bb97c6b..85a274156b 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -19,14 +19,17 @@ * under the License. */ -import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.collect.ImmutableSet; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; import org.apache.helix.gateway.constant.GatewayServiceEventType; import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService; +import org.apache.helix.gateway.participant.HelixGatewayParticipant; import org.apache.helix.gateway.util.PerKeyBlockingExecutor; @@ -37,40 +40,35 @@ * 3. On init connect, create the participant manager * 4. For ST reply message, update the tracker */ - public class GatewayServiceManager { public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10; - private final Map _helixGatewayServiceMap; + public static final ImmutableSet SUPPORTED_MULTI_STATE_MODEL_TYPES = + ImmutableSet.of("OnlineOffline"); + private final Map> _helixGatewayParticipantMap; + private final String _zkAddress; // a single thread tp for event processing private final ExecutorService _participantStateTransitionResultUpdator; // link to grpc service - private final HelixGatewayServiceGrpcService _grpcService; + private final HelixGatewayServiceProcessor _gatewayServiceProcessor; // a per key executor for connection event. All event for the same instance will be executed in sequence. // It is used to ensure for each instance, the connect/disconnect event won't start until the previous one is done. private final PerKeyBlockingExecutor _connectionEventProcessor; public GatewayServiceManager() { - _helixGatewayServiceMap = new ConcurrentHashMap<>(); + _helixGatewayParticipantMap = new ConcurrentHashMap<>(); + _zkAddress = "foo"; _participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor(); - _grpcService = new HelixGatewayServiceGrpcService(this); + _gatewayServiceProcessor = new HelixGatewayServiceGrpcService(this); _connectionEventProcessor = new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable } /** - * send state transition message to application instance - * @return - */ - public AtomicBoolean sendTransitionRequestToApplicationInstance() { - // TODO: add param - return null; - } - - /** - * Process the event from Grpc service + * Process the event from Grpc service and dispatch to async executor for processing. + * * @param event */ public void newGatewayServiceEvent(GatewayServiceEvent event) { @@ -86,20 +84,24 @@ public void newGatewayServiceEvent(GatewayServiceEvent event) { */ class shardStateUpdator implements Runnable { - GatewayServiceEvent _event; + private final GatewayServiceEvent _event; - public shardStateUpdator(GatewayServiceEvent event) { + private shardStateUpdator(GatewayServiceEvent event) { _event = event; } @Override public void run() { - HelixGatewayService helixGatewayService = _helixGatewayServiceMap.get(_event.getClusterName()); - if (helixGatewayService == null) { + HelixGatewayParticipant participant = + getHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName()); + if (participant == null) { // TODO: return error code and throw exception. return; } - helixGatewayService.receiveSTResponse(); + _event.getStateTransitionResult().forEach(stateTransitionResult -> { + participant.completeStateTransition(stateTransitionResult.getStateTransitionId(), + stateTransitionResult.getIsSuccess()); + }); } } @@ -108,33 +110,47 @@ public void run() { * It includes waiting for ZK connection, and also wait for previous LiveInstance to expire. */ class participantConnectionProcessor implements Runnable { - GatewayServiceEvent _event; + private final GatewayServiceEvent _event; - public participantConnectionProcessor(GatewayServiceEvent event) { + private participantConnectionProcessor(GatewayServiceEvent event) { _event = event; } @Override public void run() { - HelixGatewayService helixGatewayService; - _helixGatewayServiceMap.computeIfAbsent(_event.getClusterName(), - k -> new HelixGatewayService(GatewayServiceManager.this, _event.getClusterName())); - helixGatewayService = _helixGatewayServiceMap.get(_event.getClusterName()); if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) { - helixGatewayService.registerParticipant(); + createHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName(), + _event.getShardStateMap()); } else { - helixGatewayService.deregisterParticipant(_event.getClusterName(), _event.getInstanceName()); + removeHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName()); } } } - @VisibleForTesting - HelixGatewayServiceGrpcService getGrpcService() { - return _grpcService; + private void createHelixGatewayParticipant(String clusterName, String instanceName, + Map> initialShardStateMap) { + // Create and add the participant to the participant map + HelixGatewayParticipant.Builder participantBuilder = + new HelixGatewayParticipant.Builder(_gatewayServiceProcessor, instanceName, clusterName, + _zkAddress).setInitialShardState(initialShardStateMap); + SUPPORTED_MULTI_STATE_MODEL_TYPES.forEach( + participantBuilder::addMultiTopStateStateModelDefinition); + _helixGatewayParticipantMap.computeIfAbsent(clusterName, k -> new ConcurrentHashMap<>()) + .put(instanceName, participantBuilder.build()); + } + + private void removeHelixGatewayParticipant(String clusterName, String instanceName) { + // Disconnect and remove the participant from the participant map + HelixGatewayParticipant participant = getHelixGatewayParticipant(clusterName, instanceName); + if (participant != null) { + participant.disconnect(); + _helixGatewayParticipantMap.get(clusterName).remove(instanceName); + } } - @VisibleForTesting - HelixGatewayService getHelixGatewayService(String clusterName) { - return _helixGatewayServiceMap.get(clusterName); + private HelixGatewayParticipant getHelixGatewayParticipant(String clusterName, + String instanceName) { + return _helixGatewayParticipantMap.getOrDefault(clusterName, Collections.emptyMap()) + .get(instanceName); } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java index dce5a44a09..aed7518b93 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java @@ -23,7 +23,6 @@ * Factory class to create GatewayServiceManager */ public class GatewayServiceManagerFactory { - public GatewayServiceManager createGatewayServiceManager() { return new GatewayServiceManager(); } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java deleted file mode 100644 index 2ef35820cb..0000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java +++ /dev/null @@ -1,110 +0,0 @@ -package org.apache.helix.gateway.service; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.helix.HelixManager; -import org.apache.helix.InstanceType; -import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModelFactory; -import org.apache.helix.manager.zk.ZKHelixManager; - - -/** - * A service object for each Helix cluster. - * This service object manages the Helix participants in the cluster. - */ -public class HelixGatewayService { - final private Map> _participantsMap; - - final private String _zkAddress; - private final GatewayServiceManager _gatewayServiceManager; - private Map> _flagMap; - public HelixGatewayService(GatewayServiceManager gatewayServiceManager, String zkAddress) { - _participantsMap = new ConcurrentHashMap<>(); - _zkAddress = zkAddress; - _gatewayServiceManager = gatewayServiceManager; - _flagMap = new ConcurrentHashMap<>(); - } - - public GatewayServiceManager getClusterManager() { - return _gatewayServiceManager; - } - - /** - * Register a participant to the Helix cluster. - * It creates a HelixParticipantManager and connects to the Helix controller. - */ - public void registerParticipant() { - // TODO: create participant manager and add to _participantsMap - HelixManager manager = new ZKHelixManager("clusterName", "instanceName", InstanceType.PARTICIPANT, _zkAddress); - manager.getStateMachineEngine() - .registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_gatewayServiceManager)); - try { - manager.connect(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - /** - * Deregister a participant from the Helix cluster when app instance is gracefully stopped or connection lost. - * @param clusterName - * @param participantName - */ - public void deregisterParticipant(String clusterName, String participantName) { - HelixManager manager = _participantsMap.get(clusterName).remove(participantName); - if (manager != null) { - manager.disconnect(); - removeChannel(participantName); - } - } - - public void addChannel() { - // _flagMap.computeIfAbsent(mockApplication.getInstanceName(), k -> new ConcurrentHashMap<>()); - } - - public void removeChannel(String instanceName) { - _flagMap.remove(instanceName); - } - - public AtomicBoolean sendMessage() { - AtomicBoolean flag = new AtomicBoolean(false); - return flag; - } - - /** - * Entry point for receive the state transition response from the participant. - * It will update in memory state accordingly. - */ - public void receiveSTResponse() { - // AtomicBoolean flag = _flagMap.get(instanceName).remove(response.getMessageId()); - } - - /** - * Stop the HelixGatewayService. - * It stops all participants in the cluster. - */ - public void stop() { - // TODO: stop all participants - System.out.println("Stopping Helix Gateway Service"); - } -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java new file mode 100644 index 0000000000..37de51b420 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java @@ -0,0 +1,60 @@ +package org.apache.helix.gateway.statemodel; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.NotificationContext; +import org.apache.helix.gateway.participant.HelixGatewayParticipant; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.StateTransitionError; +import org.apache.helix.participant.statemachine.Transition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@StateModelInfo(initialState = "OFFLINE", states = {}) +public class HelixGatewayMultiTopStateStateModel extends StateModel { + private static final Logger _logger = + LoggerFactory.getLogger(HelixGatewayMultiTopStateStateModel.class); + + private final HelixGatewayParticipant _helixGatewayParticipant; + + public HelixGatewayMultiTopStateStateModel( + HelixGatewayParticipant helixGatewayParticipant) { + _helixGatewayParticipant = helixGatewayParticipant; + } + + @Transition(to = "*", from = "*") + public void genericStateTransitionHandler(Message message, NotificationContext context) + throws Exception { + _helixGatewayParticipant.processStateTransitionMessage(message); + } + + @Override + public void reset() { + // no-op we don't want to start from init state again. + } + + @Override + public void rollbackOnError(Message message, NotificationContext context, + StateTransitionError error) { + _helixGatewayParticipant.handleStateTransitionError(message, error); + } +} \ No newline at end of file diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java similarity index 63% rename from helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java rename to helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java index 7550fef510..64662998e3 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java @@ -19,19 +19,20 @@ * under the License. */ -import org.apache.helix.gateway.service.GatewayServiceManager; +import org.apache.helix.gateway.participant.HelixGatewayParticipant; import org.apache.helix.participant.statemachine.StateModelFactory; -public class HelixGatewayOnlineOfflineStateModelFactory extends StateModelFactory { - private GatewayServiceManager _clusterManager; +public class HelixGatewayMultiTopStateStateModelFactory extends StateModelFactory { + private final HelixGatewayParticipant _helixGatewayParticipant; - public HelixGatewayOnlineOfflineStateModelFactory(GatewayServiceManager clusterManager) { - _clusterManager = clusterManager; + public HelixGatewayMultiTopStateStateModelFactory( + HelixGatewayParticipant helixGatewayParticipant) { + _helixGatewayParticipant = helixGatewayParticipant; } @Override - public HelixGatewayOnlineOfflineStateModel createNewStateModel(String resourceName, + public HelixGatewayMultiTopStateStateModel createNewStateModel(String resourceName, String partitionKey) { - return new HelixGatewayOnlineOfflineStateModel(resourceName, partitionKey, _clusterManager); + return new HelixGatewayMultiTopStateStateModel(_helixGatewayParticipant); } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java deleted file mode 100644 index 4585ea3780..0000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java +++ /dev/null @@ -1,83 +0,0 @@ -package org.apache.helix.gateway.statemodel; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.helix.NotificationContext; -import org.apache.helix.gateway.service.GatewayServiceManager; -import org.apache.helix.model.Message; -import org.apache.helix.participant.statemachine.StateModel; - -public class HelixGatewayOnlineOfflineStateModel extends StateModel { - private boolean _firstTime = true; - private GatewayServiceManager _gatewayServiceManager; - - private String _resourceName; - private String _partitionKey; - - private AtomicBoolean _completed; - - public HelixGatewayOnlineOfflineStateModel(String resourceName, String partitionKey, - GatewayServiceManager gatewayServiceManager) { - _resourceName = resourceName; - _partitionKey = partitionKey; - _gatewayServiceManager = gatewayServiceManager; - } - - public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { - if (_firstTime) { - wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance()); - System.out.println( - "Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with ADD for " - + message.getResourceName() + " processed"); - _firstTime = false; - } - wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance()); - System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName() - + " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() + " processed"); - } - - public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { - wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance()); - System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName() - + " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() + " processed"); - } - - public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { - wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance()); - System.out.println( - "Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with REMOVE for " - + message.getResourceName() + " processed"); - } - - private void wait(AtomicBoolean completed) { - _completed = completed; - while (true) { - try { - if (_completed.get()) { - break; - } - Thread.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java index 383f38b58d..ecc6c95683 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java @@ -23,8 +23,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.helix.HelixDefinedState; import org.apache.helix.gateway.constant.GatewayServiceEventType; +import org.apache.helix.gateway.participant.HelixGatewayParticipant; import org.apache.helix.gateway.service.GatewayServiceEvent; +import org.apache.helix.model.Message; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage; @@ -33,27 +37,63 @@ public final class StateTransitionMessageTranslateUtil { + /** + * Determine the transition type based on the current state and the target state. + * + * @param currentState current state + * @param toState target state + * @return TransitionType + */ + public static HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType translateStatesToTransitionType( + String currentState, String toState) { + boolean isUnassigned = HelixGatewayParticipant.UNASSIGNED_STATE.equals(currentState); + boolean isToStateDropped = HelixDefinedState.DROPPED.name().equals(toState); - public static TransitionMessage translateSTMsgToTransitionMessage() { - return null; + if (isToStateDropped && !isUnassigned) { + return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD; + } + if (!isToStateDropped && isUnassigned) { + return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD; + } + return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE; + } + + /** + * Translate from Helix ST Message to Helix Gateway Service TransitionMessage. + * + * @param message Message + * @return TransitionMessage + */ + public static TransitionMessage translateSTMsgToTransitionMessage(Message message) { + return TransitionMessage.newBuilder().addRequest( + HelixGatewayServiceOuterClass.SingleTransitionMessage.newBuilder() + .setTransitionID(message.getMsgId()).setTransitionType( + translateStatesToTransitionType(message.getFromState(), message.getToState())) + .setResourceID(message.getResourceName()).setShardID(message.getPartitionName()) + .setTargetState(message.getToState()).build()).build(); } /** * Translate from user sent ShardStateMessage message to Helix Gateway Service event. + * + * @param request ShardStateMessage message + * contains the state of each shard upon connection or result of state transition request. + * @return GatewayServiceEvent */ public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMessage request) { - GatewayServiceEvent.GateWayServiceEventBuilder builder; if (request.hasShardState()) { // init connection to gateway service ShardState shardState = request.getShardState(); - Map shardStateMap = new HashMap<>(); + Map> shardStateMap = new HashMap<>(); for (HelixGatewayServiceOuterClass.SingleResourceState resourceState : shardState.getResourceStateList()) { for (HelixGatewayServiceOuterClass.SingleShardState state : resourceState.getShardStatesList()) { - shardStateMap.put(resourceState.getResource() + "_" + state.getShardName(), state.getCurrentState()); + shardStateMap.computeIfAbsent(resourceState.getResource(), k -> new HashMap<>()) + .put(state.getShardName(), state.getCurrentState()); } } builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName( - shardState.getClusterName()).setParticipantName(shardState.getInstanceName()); + shardState.getClusterName()).setParticipantName(shardState.getInstanceName()) + .setShardStateMap(shardStateMap); } else { ShardTransitionStatus shardTransitionStatus = request.getShardTransitionStatus(); // this is status update for established connection @@ -63,7 +103,7 @@ public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMe for (HelixGatewayServiceOuterClass.SingleShardTransitionStatus shardTransition : status) { GatewayServiceEvent.StateTransitionResult result = new GatewayServiceEvent.StateTransitionResult(shardTransition.getTransitionID(), - shardTransition.getCurrentState(), shardTransition.getCurrentState()); + shardTransition.getIsSuccess(), shardTransition.getCurrentState()); stResult.add(result); } builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.UPDATE).setClusterName( @@ -75,9 +115,12 @@ public static GatewayServiceEvent translateShardStateMessageToEvent(ShardStateMe } /** - * Translate termination event to GatewayServiceEvent. + * Translate from client close to Helix Gateway Service event. + * + * @param instanceName the instance name to send the message to + * @param clusterName the cluster name + * @return GatewayServiceEvent */ - public static GatewayServiceEvent translateClientCloseToEvent(String instanceName, String clusterName) { GatewayServiceEvent.GateWayServiceEventBuilder builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.DISCONNECT).setClusterName( diff --git a/helix-gateway/src/main/proto/HelixGatewayService.proto b/helix-gateway/src/main/proto/HelixGatewayService.proto index d32423a34d..e7db5473e9 100644 --- a/helix-gateway/src/main/proto/HelixGatewayService.proto +++ b/helix-gateway/src/main/proto/HelixGatewayService.proto @@ -30,11 +30,10 @@ message SingleTransitionMessage { TransitionType transitionType = 2; // Transition type for shard operations string resourceID = 3; // Resource ID string shardID = 4; // Shard to perform operation - optional string startState = 5; // Shard start state string targetState = 6; // Shard target state. } -message TransitionMessage{ +message TransitionMessage { repeated SingleTransitionMessage request = 1; } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java new file mode 100644 index 0000000000..fda7fbb1be --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java @@ -0,0 +1,333 @@ +package org.apache.helix.gateway.participant; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.Message; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import org.testng.collections.Lists; + +public class TestHelixGatewayParticipant extends ZkTestBase { + private static final String CLUSTER_NAME = TestHelixGatewayParticipant.class.getSimpleName(); + private static final int START_NUM_NODE = 2; + private static final String TEST_DB = "TestDB"; + private static final String TEST_STATE_MODEL = "OnlineOffline"; + private static final String CONTROLLER_PREFIX = "controller"; + private static final String PARTICIPANT_PREFIX = "participant"; + + private ZkHelixClusterVerifier _clusterVerifier; + private ClusterControllerManager _controller; + private int _nextStartPort = 12000; + private final List _participants = Lists.newArrayList(); + private final Map _pendingMessageMap = new ConcurrentHashMap<>(); + + @BeforeClass + public void beforeClass() { + // Set up the Helix cluster + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + _gSetupTool.addCluster(CLUSTER_NAME, true); + + ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.getRecord().setSimpleField(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true"); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + + // Start initial participants + for (int i = 0; i < START_NUM_NODE; i++) { + addParticipant(); + } + + // Start the controller + String controllerName = CONTROLLER_PREFIX + '_' + CLUSTER_NAME; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // Enable best possible assignment persistence + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + } + + @AfterClass + public void afterClass() { + // Clean up by disconnecting the controller and participants + _controller.disconnect(); + for (HelixGatewayParticipant participant : _participants) { + participant.disconnect(); + } + } + + /** + * Add a participant with a specific initial state map. + */ + private HelixGatewayParticipant addParticipant(String participantName, + Map> initialShardMap) { + HelixGatewayParticipant participant = new HelixGatewayParticipant.Builder( + new MockHelixGatewayServiceProcessor(_pendingMessageMap), participantName, CLUSTER_NAME, + ZK_ADDR).addMultiTopStateStateModelDefinition(TEST_STATE_MODEL) + .setInitialShardState(initialShardMap).build(); + _participants.add(participant); + return participant; + } + + /** + * Add a participant with an empty initial state map. + */ + private HelixGatewayParticipant addParticipant() { + String participantName = PARTICIPANT_PREFIX + "_" + _nextStartPort++; + return addParticipant(participantName, Collections.emptyMap()); + } + + /** + * Remove a participant from the cluster. + */ + private void deleteParticipant(HelixGatewayParticipant participant) { + participant.disconnect(); + _participants.remove(participant); + } + + /** + * Add a participant to the IdealState's preference list. + */ + private void addToPreferenceList(HelixGatewayParticipant participant) { + IdealState idealState = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB); + idealState.getPreferenceLists().values() + .forEach(preferenceList -> preferenceList.add(participant.getInstanceName())); + idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas()) + 1)); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, TEST_DB, idealState); + } + + /** + * Remove a participant from the IdealState's preference list. + */ + private void removeFromPreferenceList(HelixGatewayParticipant participant) { + IdealState idealState = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB); + idealState.getPreferenceLists().values() + .forEach(preferenceList -> preferenceList.remove(participant.getInstanceName())); + idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas()) - 1)); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, TEST_DB, idealState); + } + + /** + * Create a test database in the cluster with a semi-automatic state model. + */ + private void createDB() { + createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, + _participants.stream().map(HelixGatewayParticipant::getInstanceName) + .collect(Collectors.toList()), TEST_STATE_MODEL, 1, _participants.size()); + + _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(new HashSet<>( + _gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME))) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + } + + /** + * Retrieve a pending message for a specific participant. + */ + private Message getPendingMessage(String instanceName) { + return _pendingMessageMap.get(instanceName); + } + + /** + * Process the pending message for a participant. + */ + private void processPendingMessage(HelixGatewayParticipant participant, boolean isSuccess) { + Message message = _pendingMessageMap.remove(participant.getInstanceName()); + participant.completeStateTransition(message.getMsgId(), isSuccess); + } + + /** + * Get the current state of a Helix shard. + */ + private String getHelixCurrentState(String instanceName, String resourceName, + String shardId) { + return _gSetupTool.getClusterManagementTool() + .getResourceExternalView(CLUSTER_NAME, resourceName).getStateMap(shardId) + .getOrDefault(instanceName, HelixGatewayParticipant.UNASSIGNED_STATE); + } + + /** + * Verify that all specified participants have pending messages. + */ + private void verifyPendingMessages(List participants) throws Exception { + Assert.assertTrue(TestHelper.verify(() -> participants.stream() + .allMatch(participant -> getPendingMessage(participant.getInstanceName()) != null), + TestHelper.WAIT_DURATION)); + } + + /** + * Verify that the gateway state matches the Helix state for all participants. + */ + private void verifyGatewayStateMatchesHelixState() throws Exception { + Assert.assertTrue(TestHelper.verify(() -> _participants.stream().allMatch(participant -> { + String instanceName = participant.getInstanceName(); + for (String resourceName : _gSetupTool.getClusterManagementTool() + .getResourcesInCluster(CLUSTER_NAME)) { + for (String shardId : _gSetupTool.getClusterManagementTool() + .getResourceIdealState(CLUSTER_NAME, resourceName).getPartitionSet()) { + String helixCurrentState = + getHelixCurrentState(instanceName, resourceName, shardId); + if (!participant.getCurrentState(resourceName, shardId).equals(helixCurrentState)) { + return false; + } + } + } + return true; + }), TestHelper.WAIT_DURATION)); + } + + /** + * Verify that all shards for a given instance are in a specific state. + */ + private void verifyHelixPartitionStates(String instanceName, String state) throws Exception { + Assert.assertTrue(TestHelper.verify(() -> { + for (String resourceName : _gSetupTool.getClusterManagementTool() + .getResourcesInCluster(CLUSTER_NAME)) { + for (String shardId : _gSetupTool.getClusterManagementTool() + .getResourceIdealState(CLUSTER_NAME, resourceName).getPartitionSet()) { + if (!getHelixCurrentState(instanceName, resourceName, shardId).equals(state)) { + return false; + } + } + } + return true; + }, TestHelper.WAIT_DURATION)); + } + + @Test + public void testProcessStateTransitionMessageSuccess() throws Exception { + createDB(); + verifyPendingMessages(_participants); + + // Verify that all pending messages have the toState "ONLINE" + for (HelixGatewayParticipant participant : _participants) { + Message message = getPendingMessage(participant.getInstanceName()); + Assert.assertNotNull(message); + Assert.assertEquals(message.getToState(), "ONLINE"); + } + + // Process all pending messages successfully + for (HelixGatewayParticipant participant : _participants) { + processPendingMessage(participant, true); + } + + // Verify that the cluster converges and all states are "ONLINE" + Assert.assertTrue(_clusterVerifier.verify()); + verifyGatewayStateMatchesHelixState(); + } + + @Test(dependsOnMethods = "testProcessStateTransitionMessageSuccess") + public void testProcessStateTransitionMessageFailure() throws Exception { + // Add a new participant and include it in the preference list + HelixGatewayParticipant participant = addParticipant(); + addToPreferenceList(participant); + verifyPendingMessages(List.of(participant)); + + // Verify the pending message has the toState "ONLINE" + Message message = getPendingMessage(participant.getInstanceName()); + Assert.assertNotNull(message); + Assert.assertEquals(message.getToState(), "ONLINE"); + + // Process the message with failure + processPendingMessage(participant, false); + + // Verify that the cluster converges and states reflect the failure (e.g., "OFFLINE") + Assert.assertTrue(_clusterVerifier.verify()); + verifyGatewayStateMatchesHelixState(); + + // Remove the participant from the preference list and delete it + removeFromPreferenceList(participant); + deleteParticipant(participant); + Assert.assertTrue(_clusterVerifier.verify()); + } + + @Test(dependsOnMethods = "testProcessStateTransitionMessageFailure") + public void testProcessStateTransitionAfterReconnect() throws Exception { + // Remove the first participant + HelixGatewayParticipant participant = _participants.get(0); + deleteParticipant(participant); + + // Verify the Helix state transitions to "UNASSIGNED_STATE" for the participant + verifyHelixPartitionStates(participant.getInstanceName(), + HelixGatewayParticipant.UNASSIGNED_STATE); + + // Re-add the participant with its initial state + addParticipant(participant.getInstanceName(), participant.getShardStateMap()); + Assert.assertTrue(_clusterVerifier.verify()); + + // Verify the Helix state is "ONLINE" + verifyHelixPartitionStates(participant.getInstanceName(), "ONLINE"); + } + + @Test(dependsOnMethods = "testProcessStateTransitionAfterReconnect") + public void testProcessStateTransitionAfterReconnectAfterDroppingPartition() throws Exception { + // Remove the first participant and verify state + HelixGatewayParticipant participant = _participants.get(0); + deleteParticipant(participant); + verifyHelixPartitionStates(participant.getInstanceName(), + HelixGatewayParticipant.UNASSIGNED_STATE); + + // Remove shard preference and re-add the participant + removeFromPreferenceList(participant); + HelixGatewayParticipant participantReplacement = + addParticipant(participant.getInstanceName(), participant.getShardStateMap()); + verifyPendingMessages(List.of(participantReplacement)); + + // Process the pending message successfully + processPendingMessage(participantReplacement, true); + + // Verify that the cluster converges and states are correctly updated to "ONLINE" + Assert.assertTrue(_clusterVerifier.verify()); + verifyGatewayStateMatchesHelixState(); + } + + public static class MockHelixGatewayServiceProcessor implements HelixGatewayServiceProcessor { + private final Map _pendingMessageMap; + + public MockHelixGatewayServiceProcessor(Map pendingMessageMap) { + _pendingMessageMap = pendingMessageMap; + } + + @Override + public void sendStateTransitionMessage(String instanceName, String currentState, + Message message) { + _pendingMessageMap.put(instanceName, message); + } + } +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java index 01b78593ca..a345f008e2 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java @@ -40,7 +40,6 @@ public void testConnectionAndDisconnectionEvents() { // Process disconnection event grpcService.report(null).onNext(disconnectionEvent); - HelixGatewayService gatewayService = manager.getHelixGatewayService("cluster1"); // Verify the events were processed in sequence verify(manager, times(2)).newGatewayServiceEvent(any()); } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java new file mode 100644 index 0000000000..34e23d0b0c --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java @@ -0,0 +1,68 @@ +package org.apache.helix.gateway.utils;/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixDefinedState; + +import org.apache.helix.gateway.participant.HelixGatewayParticipant; +import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; +import org.testng.Assert; +import org.testng.annotations.Test; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; + +public class TestStateTransitionMessageTranslateUtil { + + @Test + public void testTranslateStatesToTransitionType_DeleteShard() { + String currentState = "ONLINE"; + String toState = HelixDefinedState.DROPPED.name(); + + HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result = + StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState, toState); + + Assert.assertEquals(result, + HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD, + "Expected DELETE_SHARD when transitioning to DROPPED state from a non-DROPPED state."); + } + + @Test + public void testTranslateStatesToTransitionType_AddShard() { + String currentState = HelixGatewayParticipant.UNASSIGNED_STATE; + String toState = "ONLINE"; + + HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result = + StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState, toState); + + Assert.assertEquals(result, + HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD, + "Expected ADD_SHARD when transitioning from DROPPED state to a non-DROPPED state."); + } + + @Test + public void testTranslateStatesToTransitionType_ChangeRole() { + String currentState = "ONLINE"; + String toState = "OFFLINE"; + + HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result = + StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState, toState); + + Assert.assertEquals(result, + HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE, + "Expected CHANGE_ROLE when transitioning between non-DROPPED states."); + } +}