diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java new file mode 100644 index 0000000000..c08eb618db --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java @@ -0,0 +1,15 @@ +package org.apache.helix.gateway; + +/** + * Main class for Helix Gateway. + * It starts the Helix Gateway grpc service. + */ +public final class HelixGatewayMain { + + private HelixGatewayMain() { + } + + public static void main(String[] args) throws InterruptedException { + + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java index 286cf20010..237f1f2721 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java @@ -1,10 +1,23 @@ package org.apache.helix.gateway.grpcservice; import io.grpc.stub.StreamObserver; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.helix.gateway.service.GatewayServiceManager; +import org.apache.helix.gateway.service.HelixGatewayServiceProcessor; import proto.org.apache.helix.gateway.*; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*; -public class HelixGatewayServiceService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase { +import java.util.Map; + + +/** + * Helix Gateway Service GRPC UI implementation. + */ +public class HelixGatewayServiceService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase + implements HelixGatewayServiceProcessor { + + Map> _observerMap = + new ConcurrentHashMap>(); @Override public StreamObserver report( @@ -16,6 +29,12 @@ public StreamObserver streamObserver) { + _observerMap.put(instanceName, streamObserver); + } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java deleted file mode 100644 index b96694f8b7..0000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.apache.helix.gateway.service; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - - - -public class ClusterManager { - private Map> _flagMap; - private Lock _lock = new ReentrantLock(); - - // event queue - // state tracker, call tracker.update - - public ClusterManager() { - _flagMap = new ConcurrentHashMap<>(); - } - - public void addChannel() { - } - - public void removeChannel(String instanceName) { - _flagMap.remove(instanceName); - } - - public AtomicBoolean sendMessage() { - AtomicBoolean flag = new AtomicBoolean(false); - return flag; - } -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java new file mode 100644 index 0000000000..ee803b9cc0 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -0,0 +1,65 @@ +package org.apache.helix.gateway.service; + +import java.util.Map; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.helix.gateway.grpcservice.HelixGatewayServiceService; + + +/** + * A top layer class that send/receive messages from Grpc end point, and dispatch them to corrsponding gateway services. + * 1. get event from Grpc service + * 2. Maintain a gateway service registry, one gateway service maps to one Helix cluster + * 3. On init connect, create the participant manager + * 4. For ST reply message, update the tracker + */ + +public class GatewayServiceManager { + + HelixGatewayServiceService _helixGatewayServiceService; + + HelixGatewayServiceProcessor _helixGatewayServiceProcessor; + + Map _helixGatewayServiceMap; + + // TODO: add thread pool for init + // single thread tp for update + + public enum EventType { + CONNECT, // init connection to gateway service + UPDATE, // update state transition result + DISCONNECT // shutdown connection to gateway service. + } + + public class GateWayServiceEvent { + // event type + EventType eventType; + // event data + String clusterName; + String participantName; + + // todo: add more fields + } + + public GatewayServiceManager() { + _helixGatewayServiceMap = new ConcurrentHashMap<>(); + } + + public AtomicBoolean sendTransitionRequestToApplicationInstance() { + + return null; + } + + public void updateShardState() { + + } + + public void newParticipantConnecting() { + + } + + public void participantDisconnected() { + + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java new file mode 100644 index 0000000000..4bab8f0b7f --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java @@ -0,0 +1,11 @@ +package org.apache.helix.gateway.service; + +/** + * Factory class to create GatewayServiceManager + */ +public class GatewayServiceManagerFactory { + + public GatewayServiceManager createGatewayServiceManager() { + return new GatewayServiceManager(); + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java index 12810f80d6..49f1bbf2cf 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java @@ -1,27 +1,35 @@ package org.apache.helix.gateway.service; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; + +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModelFactory; import org.apache.helix.manager.zk.ZKHelixManager; +/** + * A service object for each Helix cluster. + * This service object manages the Helix participants in the cluster. + */ public class HelixGatewayService { final private Map> _participantsMap; final private String _zkAddress; - private final ClusterManager _clusterManager; - - public HelixGatewayService(String zkAddress) { + private final GatewayServiceManager _gatewayServiceManager; + private Map> _flagMap; + public HelixGatewayService(GatewayServiceManager gatewayServiceManager, String zkAddress) { _participantsMap = new ConcurrentHashMap<>(); _zkAddress = zkAddress; - _clusterManager = new ClusterManager(); + _gatewayServiceManager = gatewayServiceManager; + _flagMap = new ConcurrentHashMap<>(); } - public ClusterManager getClusterManager() { - return _clusterManager; + public GatewayServiceManager getClusterManager() { + return _gatewayServiceManager; } public void start() { @@ -32,9 +40,8 @@ public void registerParticipant() { // TODO: create participant manager and add to _participantsMap HelixManager manager = new ZKHelixManager("clusterName", "instanceName", InstanceType.PARTICIPANT, _zkAddress); manager.getStateMachineEngine() - .registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_clusterManager)); + .registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_gatewayServiceManager)); try { - _clusterManager.addChannel(); manager.connect(); } catch (Exception e) { throw new RuntimeException(e); @@ -45,11 +52,36 @@ public void deregisterParticipant(String clusterName, String participantName) { HelixManager manager = _participantsMap.get(clusterName).remove(participantName); if (manager != null) { manager.disconnect(); - _clusterManager.removeChannel(participantName); + removeChannel(participantName); } } + public void addChannel() { + // _flagMap.computeIfAbsent(mockApplication.getInstanceName(), k -> new ConcurrentHashMap<>()); + } + + public void removeChannel(String instanceName) { + _flagMap.remove(instanceName); + } + + public AtomicBoolean sendMessage() { + AtomicBoolean flag = new AtomicBoolean(false); + return flag; + } + + public void receiveSTResponse() { + // AtomicBoolean flag = _flagMap.get(instanceName).remove(response.getMessageId()); + } + + public void newParticipantConnecting(){ + + } + + public void participantDisconnected(){ + + } + public void stop() { - System.out.println("Stoping Helix Gateway Service"); + System.out.println("Stopping Helix Gateway Service"); } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java new file mode 100644 index 0000000000..d419a71b57 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java @@ -0,0 +1,11 @@ +package org.apache.helix.gateway.service; + +/** + * Translate from/to GRPC function call to Helix Gateway Service event. + */ +public interface HelixGatewayServiceProcessor { + + public boolean sendStateTransitionMessage(String instanceName); + + public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent event); +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java index 1e7c16e388..3df3b00dbd 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java @@ -2,7 +2,7 @@ public class ReplicaStateTracker { - boolean compareTargetState(){ + boolean compareTargetState() { return true; } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/StateTransitionMessageTranslator.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/StateTransitionMessageTranslator.java deleted file mode 100644 index ae2dbfe947..0000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/StateTransitionMessageTranslator.java +++ /dev/null @@ -1,4 +0,0 @@ -package org.apache.helix.gateway.service; - -public class StateTransitionMessageTranslator { -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java index 5c95feb38a..1f2846c6c3 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java @@ -2,13 +2,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.helix.NotificationContext; -import org.apache.helix.gateway.service.ClusterManager; +import org.apache.helix.gateway.service.GatewayServiceManager; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.StateModel; public class HelixGatewayOnlineOfflineStateModel extends StateModel { private boolean _firstTime = true; - private ClusterManager _clusterManager; + private GatewayServiceManager _gatewayServiceManager; private String _resourceName; private String _partitionKey; @@ -16,33 +16,33 @@ public class HelixGatewayOnlineOfflineStateModel extends StateModel { private AtomicBoolean _completed; public HelixGatewayOnlineOfflineStateModel(String resourceName, String partitionKey, - ClusterManager clusterManager) { + GatewayServiceManager gatewayServiceManager) { _resourceName = resourceName; _partitionKey = partitionKey; - _clusterManager = clusterManager; + _gatewayServiceManager = gatewayServiceManager; } public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { if (_firstTime) { - wait(_clusterManager.sendMessage()); + wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance()); System.out.println( "Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with ADD for " + message.getResourceName() + " processed"); _firstTime = false; } - wait(_clusterManager.sendMessage()); + wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance()); System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() + " processed"); } public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { - wait(_clusterManager.sendMessage()); + wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance()); System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() + " processed"); } public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { - wait(_clusterManager.sendMessage()); + wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance()); System.out.println( "Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with REMOVE for " + message.getResourceName() + " processed"); diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java index 5db789112a..b7e06051ea 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java @@ -1,12 +1,13 @@ package org.apache.helix.gateway.statemodel; -import org.apache.helix.gateway.service.ClusterManager; +import org.apache.helix.gateway.service.GatewayServiceManager; +import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModel; import org.apache.helix.participant.statemachine.StateModelFactory; public class HelixGatewayOnlineOfflineStateModelFactory extends StateModelFactory { - private ClusterManager _clusterManager; + private GatewayServiceManager _clusterManager; - public HelixGatewayOnlineOfflineStateModelFactory(ClusterManager clusterManager) { + public HelixGatewayOnlineOfflineStateModelFactory(GatewayServiceManager clusterManager) { _clusterManager = clusterManager; } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java new file mode 100644 index 0000000000..7f0c592c27 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java @@ -0,0 +1,17 @@ +package org.apache.helix.gateway.util; + +import org.apache.helix.gateway.service.GatewayServiceManager; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage; + + +public final class StateTransitionMessageTranslateUtil { + + public static TransitionMessage translateSTMsgToProto() { + return null; + } + + public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) { + return null; + } +} diff --git a/helix-gateway/src/main/proto/HelixGatewayService.proto b/helix-gateway/src/main/proto/HelixGatewayService.proto index c82431123f..dbf488b5af 100644 --- a/helix-gateway/src/main/proto/HelixGatewayService.proto +++ b/helix-gateway/src/main/proto/HelixGatewayService.proto @@ -26,26 +26,25 @@ message SingleShardTransitionStatus { optional string currentState = 3; // If it failed, what is the current state it should reported as. } -// resource has list of replica - message SingleResourceState { - string resource = 1; - repeated SingleShardState SingleReplicaState= 2; + string resource = 1; // name of the resource + repeated SingleShardState SingleShardState = 2; // State of each shard } message SingleShardState { - string shardaName = 1; - string currentState = 2; + string shardName = 1; // Name of the shard + string currentState = 2; // Current state of the shard } // message ShardStateMessage{ - repeated SingleShardTransitionStatus replicaStateST = 1; - repeated SingleShardState shardState = 2; + string instanceName = 1; // Name of the application instance + string clusterName = 2; // Name of the cluster to connect to + repeated SingleShardTransitionStatus shardStatus = 3; // state transition result for a shard + repeated SingleShardState shardState = 4; // State of each shard, only reported upon init connection } - service HelixGatewayService { rpc report(stream ShardStateMessage) returns (stream TransitionMessage) {} }