Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SWITCHYARD-2009 #671

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 47 additions & 29 deletions sca/src/main/java/org/switchyard/component/sca/SCAInvoker.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
*/
package org.switchyard.component.sca;

import java.io.IOException;

import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.xml.namespace.QName;

import org.jboss.logging.Logger;
import org.jboss.jbossts.txbridge.outbound.OutboundBridge;
import org.jboss.jbossts.txbridge.outbound.OutboundBridgeManager;
import org.jboss.logging.Logger;
import org.oasis_open.docs.ws_tx.wscoor._2006._06.CoordinationContextType;
import org.switchyard.Context;
import org.switchyard.Exchange;
import org.switchyard.ExchangePattern;
Expand All @@ -33,37 +36,40 @@
import org.switchyard.config.model.composite.SCABindingModel;
import org.switchyard.deploy.BaseServiceHandler;
import org.switchyard.label.BehaviorLabel;
import org.switchyard.remote.RemoteEndpoint;
import org.switchyard.remote.RemoteInvoker;
import org.switchyard.remote.RemoteMessage;
import org.switchyard.remote.RemoteMessages;
import org.switchyard.remote.RemoteRegistry;
import org.switchyard.remote.cluster.ClusteredInvoker;
import org.switchyard.remote.cluster.LoadBalanceStrategy;
import org.switchyard.remote.cluster.RandomStrategy;
import org.switchyard.remote.cluster.RoundRobinStrategy;
import org.switchyard.remote.http.HttpInvoker;
import org.switchyard.remote.http.HttpInvokerLabel;
import org.switchyard.runtime.event.ExchangeCompletionEvent;

import com.arjuna.mw.wst11.TransactionManagerFactory;
import com.arjuna.mwlabs.wst11.at.context.TxContextImple;

import org.oasis_open.docs.ws_tx.wscoor._2006._06.CoordinationContextType;

/**
* Handles outbound communication to an SCA service endpoint.
*/
public class SCAInvoker extends BaseServiceHandler {
public class SCAInvoker extends BaseServiceHandler implements RemoteInvoker {

private static Logger _log = Logger.getLogger(SCAInvoker.class);

private final SCABindingModel _config;
private final String _bindingName;
private final String _referenceName;
private ClusteredInvoker _invoker;
private RemoteRegistry _registry;
private LoadBalanceStrategy _loadBalancer;
private TransactionContextSerializer _txSerializer = new TransactionContextSerializer();

/**
* Create a new SCAInvoker for invoking local endpoints.
* @param config binding configuration model
*/
// TODO: IMHO this constructor should not exist
public SCAInvoker(SCABindingModel config) {
_config = config;
_bindingName = config.getName();
Expand All @@ -77,12 +83,9 @@ public SCAInvoker(SCABindingModel config) {
*/
public SCAInvoker(SCABindingModel config, RemoteRegistry registry) {
this(config);
if (config.isLoadBalanced()) {
LoadBalanceStrategy loadBalancer = createLoadBalancer(config.getLoadBalance());
_invoker = new ClusteredInvoker(registry, loadBalancer);
} else {
_invoker = new ClusteredInvoker(registry);
}

_registry = registry;
_loadBalancer = createLoadBalancer(config.getLoadBalance());
}

@Override
Expand All @@ -95,25 +98,31 @@ public void handleMessage(Exchange exchange) throws HandlerException {
throw SCAMessages.MESSAGES.referenceBindingNotStarted(_referenceName, _bindingName);
}
try {
if (_config.isClustered()) {
invokeRemote(exchange);
} else {
invokeLocal(exchange);
// Figure out the QName for the service were invoking
QName serviceName = getTargetServiceName(exchange);

// Maintain old functionality
if (_config.isPreferLocal()){
if ((_registry != null)
&& (_registry.hasLocalEndpoint(serviceName)) && (_loadBalancer != null)) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this logic?
If you select preferLocal, no matter if clustered or not this executes (Fine)
then if there is a localEndpoint for the service and load balance strategy do a remote otherwise a local?
I think that it should be something like:

  • if there is localendpoint invokeLocal, otherwise invokeRemote.

Also, now that this is a RemoteInvoker, you could optimize that if the "LoadBalanceStrategy" gets you a local endpoint, invokeLocal.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are making a comment on code I didn't write. :-) This looks to be a comment on the code you initially submitted. At least that's what GitHub is showing me with your comment.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a fucker. Tiredness fooled me.
I'm cool with your changes, in fact, very cool as it will always make local calls if prefer local and service is covered, otherwise it will load balance.

invokeRemote(exchange, serviceName);
} else {
invokeLocal(exchange, serviceName);
}
}else{
if (_config.isClustered()){
invokeRemote(exchange, serviceName);
}else{
invokeLocal(exchange, serviceName);
}
}
} catch (SwitchYardException syEx) {
throw new HandlerException(syEx.getMessage());
}
}

// This method exists for test purposes and should not be used at runtime. Initialization
// of the invoker instance occurs in the constructor for SCAInvoker.
void setInvoker(ClusteredInvoker invoker) {
_invoker = invoker;
}

private void invokeLocal(Exchange exchange) throws HandlerException {
// Figure out the QName for the service were invoking
QName serviceName = getTargetServiceName(exchange);
private void invokeLocal(Exchange exchange, QName serviceName) throws HandlerException {

// Get a handle for the reference and use a copy of the exchange to invoke it
ServiceReference ref = exchange.getProvider().getDomain().getServiceReference(serviceName);
if (ref == null) {
Expand Down Expand Up @@ -144,9 +153,7 @@ private void invokeLocal(Exchange exchange) throws HandlerException {
}
}

private void invokeRemote(Exchange exchange) throws HandlerException {
// Figure out the QName for the service were invoking
QName serviceName = getTargetServiceName(exchange);
private void invokeRemote(Exchange exchange, QName serviceName) throws HandlerException {

RemoteMessage request = new RemoteMessage()
.setDomain(exchange.getProvider().getDomain().getName())
Expand All @@ -157,7 +164,7 @@ private void invokeRemote(Exchange exchange) throws HandlerException {
boolean transactionPropagated = bridgeOutgoingTransaction(request);

try {
RemoteMessage reply = _invoker.invoke(request);
RemoteMessage reply = this.invoke(request);
if (transactionPropagated) {
bridgeIncomingTransaction();
}
Expand Down Expand Up @@ -274,6 +281,8 @@ private HandlerException createHandlerException(Object content) {


LoadBalanceStrategy createLoadBalancer(String strategy) {
if (strategy==null) return null;

if (RoundRobinStrategy.class.getSimpleName().equals(strategy)) {
return new RoundRobinStrategy();
} else if (RandomStrategy.class.getSimpleName().equals(strategy)) {
Expand All @@ -290,4 +299,13 @@ LoadBalanceStrategy createLoadBalancer(String strategy) {
}
}
}

@Override
public RemoteMessage invoke(RemoteMessage request) throws IOException {
RemoteEndpoint ep = _loadBalancer.selectEndpoint(request.getService());
if (ep == null) {
throw RemoteMessages.MESSAGES.noRemoteEndpointFound(request.getService().toString());
}
return new HttpInvoker(ep.getEndpoint()).invoke(request);
}
}
35 changes: 31 additions & 4 deletions sca/src/test/java/org/switchyard/component/sca/SCAInvokerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import javax.xml.namespace.QName;

Expand Down Expand Up @@ -324,23 +325,49 @@ public boolean isClustered() {
public String getTarget() {
return "test-target";
}
@Override
public String getLoadBalance() {
return "RoundRobinStrategy";
}
public CompositeReferenceModel getReference() {
return new V1CompositeReferenceModel();
};
};

// Mock the invoker so that we don't need an actual remote endpoint
final LinkedList<RemoteMessage> msgs = new LinkedList<RemoteMessage>();
ClusteredInvoker clInovker = new ClusteredInvoker(null) {

SCAInvoker scaInvoker = new SCAInvoker(config, new RemoteRegistry() {
@Override
public void removeEndpoint(RemoteEndpoint endpoint) {
}

@Override
public boolean hasLocalEndpoint(QName serviceName) {
return false;
}

@Override
public RemoteEndpoint getLocalEndpoint(QName serviceName) {
return null;
}

@Override
public List<RemoteEndpoint> getEndpoints(QName serviceName) {
return null;
}

@Override
public void addEndpoint(RemoteEndpoint endpoint) {
}
}){
@Override
public RemoteMessage invoke(RemoteMessage request) throws IOException {
msgs.push(request);
return null;
}
};

SCAInvoker scaInvoker = new SCAInvoker(config);
scaInvoker.setInvoker(clInovker);
// scaInvoker.setInvoker(clInovker);
scaInvoker.start();

// Verify that exchange is mapped to remote message correctly
Expand Down