Skip to content

Commit

Permalink
Gateway service - service structure dummy class (#2840)
Browse files Browse the repository at this point in the history
Gateway service - service structure dummy class
  • Loading branch information
xyuanlu authored Jul 23, 2024
1 parent 71b4a9a commit cf6a202
Show file tree
Hide file tree
Showing 13 changed files with 216 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.helix.gateway;

/**
* Main class for Helix Gateway.
* It starts the Helix Gateway grpc service.
*/
public final class HelixGatewayMain {

private HelixGatewayMain() {
}

public static void main(String[] args) throws InterruptedException {

}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
package org.apache.helix.gateway.grpcservice;

import io.grpc.stub.StreamObserver;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.gateway.service.GatewayServiceManager;
import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
import proto.org.apache.helix.gateway.*;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;

public class HelixGatewayServiceService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase {
import java.util.Map;


/**
* Helix Gateway Service GRPC UI implementation.
*/
public class HelixGatewayServiceService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
implements HelixGatewayServiceProcessor {

Map<String, StreamObserver<TransitionMessage>> _observerMap =
new ConcurrentHashMap<String, StreamObserver<TransitionMessage>>();

@Override
public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage> report(
Expand All @@ -16,6 +29,12 @@ public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterCla
public void onNext(ShardStateMessage request) {
// called when a client sends a message
//....
String instanceName = request.getInstanceName();
if (!_observerMap.containsValue(instanceName)) {
// update state map
updateObserver(instanceName, responseObserver);
}
// process the message
}

@Override
Expand All @@ -31,4 +50,18 @@ public void onCompleted() {
}
};
}

@Override
public boolean sendStateTransitionMessage(String instanceName) {
return false;
}

@Override
public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent event) {

}

public void updateObserver(String instanceName, StreamObserver<TransitionMessage> streamObserver) {
_observerMap.put(instanceName, streamObserver);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.apache.helix.gateway.service;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.gateway.grpcservice.HelixGatewayServiceService;


/**
* A top layer class that send/receive messages from Grpc end point, and dispatch them to corrsponding gateway services.
* 1. get event from Grpc service
* 2. Maintain a gateway service registry, one gateway service maps to one Helix cluster
* 3. On init connect, create the participant manager
* 4. For ST reply message, update the tracker
*/

public class GatewayServiceManager {

HelixGatewayServiceService _helixGatewayServiceService;

HelixGatewayServiceProcessor _helixGatewayServiceProcessor;

Map<String, HelixGatewayService> _helixGatewayServiceMap;

// TODO: add thread pool for init
// single thread tp for update

public enum EventType {
CONNECT, // init connection to gateway service
UPDATE, // update state transition result
DISCONNECT // shutdown connection to gateway service.
}

public class GateWayServiceEvent {
// event type
EventType eventType;
// event data
String clusterName;
String participantName;

// todo: add more fields
}

public GatewayServiceManager() {
_helixGatewayServiceMap = new ConcurrentHashMap<>();
}

public AtomicBoolean sendTransitionRequestToApplicationInstance() {

return null;
}

public void updateShardState() {

}

public void newParticipantConnecting() {

}

public void participantDisconnected() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.helix.gateway.service;

/**
* Factory class to create GatewayServiceManager
*/
public class GatewayServiceManagerFactory {

public GatewayServiceManager createGatewayServiceManager() {
return new GatewayServiceManager();
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
package org.apache.helix.gateway.service;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModelFactory;
import org.apache.helix.manager.zk.ZKHelixManager;


/**
* A service object for each Helix cluster.
* This service object manages the Helix participants in the cluster.
*/
public class HelixGatewayService {
final private Map<String, Map<String, HelixManager>> _participantsMap;

final private String _zkAddress;
private final ClusterManager _clusterManager;

public HelixGatewayService(String zkAddress) {
private final GatewayServiceManager _gatewayServiceManager;
private Map<String, Map<String, AtomicBoolean>> _flagMap;
public HelixGatewayService(GatewayServiceManager gatewayServiceManager, String zkAddress) {
_participantsMap = new ConcurrentHashMap<>();
_zkAddress = zkAddress;
_clusterManager = new ClusterManager();
_gatewayServiceManager = gatewayServiceManager;
_flagMap = new ConcurrentHashMap<>();
}

public ClusterManager getClusterManager() {
return _clusterManager;
public GatewayServiceManager getClusterManager() {
return _gatewayServiceManager;
}

public void start() {
Expand All @@ -32,9 +40,8 @@ public void registerParticipant() {
// TODO: create participant manager and add to _participantsMap
HelixManager manager = new ZKHelixManager("clusterName", "instanceName", InstanceType.PARTICIPANT, _zkAddress);
manager.getStateMachineEngine()
.registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_clusterManager));
.registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_gatewayServiceManager));
try {
_clusterManager.addChannel();
manager.connect();
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -45,11 +52,36 @@ public void deregisterParticipant(String clusterName, String participantName) {
HelixManager manager = _participantsMap.get(clusterName).remove(participantName);
if (manager != null) {
manager.disconnect();
_clusterManager.removeChannel(participantName);
removeChannel(participantName);
}
}

public void addChannel() {
// _flagMap.computeIfAbsent(mockApplication.getInstanceName(), k -> new ConcurrentHashMap<>());
}

public void removeChannel(String instanceName) {
_flagMap.remove(instanceName);
}

public AtomicBoolean sendMessage() {
AtomicBoolean flag = new AtomicBoolean(false);
return flag;
}

public void receiveSTResponse() {
// AtomicBoolean flag = _flagMap.get(instanceName).remove(response.getMessageId());
}

public void newParticipantConnecting(){

}

public void participantDisconnected(){

}

public void stop() {
System.out.println("Stoping Helix Gateway Service");
System.out.println("Stopping Helix Gateway Service");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.helix.gateway.service;

/**
* Translate from/to GRPC function call to Helix Gateway Service event.
*/
public interface HelixGatewayServiceProcessor {

public boolean sendStateTransitionMessage(String instanceName);

public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public class ReplicaStateTracker {

boolean compareTargetState(){
boolean compareTargetState() {
return true;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,47 @@

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.NotificationContext;
import org.apache.helix.gateway.service.ClusterManager;
import org.apache.helix.gateway.service.GatewayServiceManager;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;

public class HelixGatewayOnlineOfflineStateModel extends StateModel {
private boolean _firstTime = true;
private ClusterManager _clusterManager;
private GatewayServiceManager _gatewayServiceManager;

private String _resourceName;
private String _partitionKey;

private AtomicBoolean _completed;

public HelixGatewayOnlineOfflineStateModel(String resourceName, String partitionKey,
ClusterManager clusterManager) {
GatewayServiceManager gatewayServiceManager) {
_resourceName = resourceName;
_partitionKey = partitionKey;
_clusterManager = clusterManager;
_gatewayServiceManager = gatewayServiceManager;
}

public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
if (_firstTime) {
wait(_clusterManager.sendMessage());
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
System.out.println(
"Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with ADD for "
+ message.getResourceName() + " processed");
_firstTime = false;
}
wait(_clusterManager.sendMessage());
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName()
+ " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() + " processed");
}

public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
wait(_clusterManager.sendMessage());
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName()
+ " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() + " processed");
}

public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
wait(_clusterManager.sendMessage());
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
System.out.println(
"Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with REMOVE for "
+ message.getResourceName() + " processed");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package org.apache.helix.gateway.statemodel;

import org.apache.helix.gateway.service.ClusterManager;
import org.apache.helix.gateway.service.GatewayServiceManager;
import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;

public class HelixGatewayOnlineOfflineStateModelFactory extends StateModelFactory<HelixGatewayOnlineOfflineStateModel> {
private ClusterManager _clusterManager;
private GatewayServiceManager _clusterManager;

public HelixGatewayOnlineOfflineStateModelFactory(ClusterManager clusterManager) {
public HelixGatewayOnlineOfflineStateModelFactory(GatewayServiceManager clusterManager) {
_clusterManager = clusterManager;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.helix.gateway.util;

import org.apache.helix.gateway.service.GatewayServiceManager;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;


public final class StateTransitionMessageTranslateUtil {

public static TransitionMessage translateSTMsgToProto() {
return null;
}

public static GatewayServiceManager.GateWayServiceEvent translateProtoToSTMsg(ShardStateMessage message) {
return null;
}
}
Loading

0 comments on commit cf6a202

Please sign in to comment.