Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Helix ST handling logic and HelixGatewayParticipant #2845

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions helix-gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.helix.gateway.constant;
package org.apache.helix.gateway.api.participant;

/*
* Licensed to the Apache Software Foundation (ASF) under one
Expand All @@ -19,6 +19,26 @@
* under the License.
*/

public enum MessageStatus {
SUCCESS, FAILURE
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateTransitionError;

/**
* Process state transition message.
*/
public interface HelixStateTransitionProcessor {
zpinto marked this conversation as resolved.
Show resolved Hide resolved
/**
* Process state transition message.
* @param message state transition message
* @throws Exception if failed to process the message
*/
void processStateTransitionMessage(Message message) throws Exception;

/**
* Handle state transition error. This results from state transition handler throwing an exception or
* timing out.
*
* @param message state transition message
* @param error state transition error
*/
void handleStateTransitionError(Message message, StateTransitionError error);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.helix.gateway.service;
package org.apache.helix.gateway.api.service;

/*
* Licensed to the Apache Software Foundation (ASF) under one
Expand All @@ -19,11 +19,22 @@
* under the License.
*/

import org.apache.helix.model.Message;

/**
* Translate from/to GRPC function call to Helix Gateway Service event.
* Helix Gateway Service Processor interface allows sending state transition messages to
* participants through service implementing this interface.
*/
public interface HelixGatewayServiceProcessor {

public boolean sendStateTransitionMessage( String instanceName);
/**
* Send a state transition message to a remote participant.
*
* @param instanceName the name of the participant
* @param currentState the current state of the participant
zpinto marked this conversation as resolved.
Show resolved Hide resolved
* @param message the message to send
*/
void sendStateTransitionMessage(String instanceName, String currentState,
Message message);

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.gateway.service.GatewayServiceEvent;
import org.apache.helix.gateway.service.GatewayServiceManager;
import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
import org.apache.helix.gateway.util.PerKeyLockRegistry;
import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
import org.apache.helix.model.Message;
import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
Expand Down Expand Up @@ -59,8 +60,9 @@ public HelixGatewayServiceGrpcService(GatewayServiceManager manager) {
/**
* Grpc service end pint.
* Application instances Report the state of the shard or result of transition request to the gateway service.
* @param responseObserver
* @return
*
* @param responseObserver the observer to send the response to the client
* @return the observer to receive the state of the shard or result of transition request
*/
@Override
public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage> report(
Expand Down Expand Up @@ -92,17 +94,21 @@ public void onCompleted() {
/**
* Send state transition message to the instance.
* The instance must already have established a connection to the gateway service.
* @param instanceName
* @return
*
* @param instanceName the instance name to send the message to
* @param currentState the current state of shard
* @param message the message to convert to the transition message
*/
@Override
public boolean sendStateTransitionMessage(String instanceName) {
public void sendStateTransitionMessage(String instanceName, String currentState,
xyuanlu marked this conversation as resolved.
Show resolved Hide resolved
Message message) {
StreamObserver<TransitionMessage> observer;
observer = _observerMap.get(instanceName);
if (observer != null) {
observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage());
observer.onNext(
StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(currentState,
message));
}
return true;
}

private void updateObserver(String instanceName, String clusterName,
Expand Down
Loading