Skip to content

Commit

Permalink
MessageGate
Browse files Browse the repository at this point in the history
  • Loading branch information
onoprien committed Oct 22, 2024
1 parent 5e3d067 commit 96df0ac
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 3 deletions.
108 changes: 108 additions & 0 deletions src/org/jgroups/ccs/MessageGate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.jgroups.ccs;

import java.util.Random;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jgroups.logging.Log;

/**
* Used to throttle messages before sending them over the network, or simulate message loss.
*
* @author onoprien
*/
public class MessageGate {

// -- Fields : -----------------------------------------------------------------

static final long MIN_DELAY = 1000; // ns
static private final long VETO_TIME = 60*1000000000; // 1 minute
static private final double VETO_FACTOR = 2;

private final Random rand = new Random();
private final Log log;

private double loss = -1.; // fraction of discarded messages [0,1]

private long maxSize = 0; // Bytes
private double maxRate; // Bytes/ns

private long prevDelay, prevTime;

private long accSize = 0;
private long accTime = System.nanoTime();

// -- Life cycle : -------------------------------------------------------------

/**
* Constructs an instance.
*
* @param logger Logger.
*/
public MessageGate(Log logger) {
log = logger;
}


// -- Setters and getters : ----------------------------------------------------

public synchronized void setMessageLoss(double fraction) {
loss = fraction;
}

public synchronized void setRateLimit(int sizeMB, int rateMB) {
maxSize = sizeMB * 1000000L;
maxRate = rateMB / 1e3;
}

// -----------------------------------------------------------------------------

synchronized public boolean process(int size) {

// Simulate message loss:

boolean lost = loss > 0. && loss > rand.nextDouble();
if (lost) {
log.debug("Losing a message of size "+ size +" bytes");
return false;
} else {
log.debug("Publishing a message of size "+ size +" bytes");
}

// Limit rate:

if (maxSize > 0) limitRate(size);
return true;
}

/**
* Processes the message, taking an appropriate action if rate exceeds threshold.
* @param size Message size in bytes.
*/
private void limitRate(int size) {
if (size <= 0) return;
long now = System.nanoTime();
accSize = Math.max(0L, Math.round(accSize - maxRate*(now-accTime)));
accSize += size;
accTime = now;
if (accSize > maxSize) {
long delay = Math.round((accSize - maxSize)/maxRate);
if (delay > MIN_DELAY) {
if (delay > prevDelay * VETO_FACTOR || now > prevTime + VETO_TIME) {
if (delay > 1000000000) {
log.warn("Throttle : message of size {0} is held for {1} ms.", new Object[]{size, delay / 1000000});
} else {
log.debug("Throttle : message of size {0} is held for {1} ms.", new Object[]{size, delay / 1000000});
}
prevDelay = delay;
prevTime = now;
}
LockSupport.parkNanos(delay);
now = System.nanoTime();
accSize = Math.max(0L, Math.round(accSize - maxRate*(now-accTime)));
accTime = now;
}
}
}

}
48 changes: 46 additions & 2 deletions src/org/jgroups/protocols/UDP.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import java.lang.reflect.Method;
import java.net.*;
import java.util.Formatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.jgroups.ccs.CCSUtil;
import org.jgroups.ccs.MessageGate;


/**
Expand Down Expand Up @@ -136,6 +138,10 @@ public class UDP extends TP {
protected SuppressLog<InetAddress> suppress_log_out_of_buffer_space;

protected static final boolean is_android, is_mac;

// CCS begin
private MessageGate messageGate;
// CCS end


static {
Expand Down Expand Up @@ -231,10 +237,20 @@ public String getInfo() {
}

public void sendMulticast(byte[] data, int offset, int length) throws Exception {
if(ip_mcast && mcast_addr != null)
if(ip_mcast && mcast_addr != null) {

// CCS begin
if (messageGate != null) {
if (!messageGate.process(length)) {
return;
}
}
// CCS end

_send(mcast_addr.getIpAddress(), mcast_addr.getPort(), data, offset, length);
else
} else {
sendToMembers(members, data, offset, length);
}
}

public void sendUnicast(PhysicalAddress dest, byte[] data, int offset, int length) throws Exception {
Expand Down Expand Up @@ -280,6 +296,34 @@ public Object down(Event evt) {

public void init() throws Exception {
super.init();

// CCS begin
String s = System.getProperty("ccs.jg.gate");
if (s != null) {
messageGate = new MessageGate(log);
Map<String,String> m = new HashMap<>();
for (String token : s.split(";")) {
String[] ss = token.split("=");
if (ss.length == 2) {
m.put(ss[0], ss[1]);
}
}
try {
double loss = Double.parseDouble(m.get("loss"));
messageGate.setMessageLoss(loss);
log.info("Simulate losing "+ loss +" of messages.");
} catch (NullPointerException | NumberFormatException x) {
}
try {
int maxSize = Integer.parseInt(m.get("max_size"));
int maxRate = Integer.parseInt(m.get("max_rate"));
messageGate.setRateLimit(maxSize, maxRate);
log.info("Throttle message publication at "+ maxSize +" MB, "+ maxRate +" MB/sec.");
} catch (NullPointerException | NumberFormatException x) {
}
}
// CCS end

if(max_bundle_size > Global.MAX_DATAGRAM_PACKET_SIZE)
throw new IllegalArgumentException("max_bundle_size (" + max_bundle_size + ") cannot exceed the max datagram " +
"packet size of " + Global.MAX_DATAGRAM_PACKET_SIZE);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/pbcast/NAKACK2.java
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Add
if (ccs_retransmit_act && use_mcast_xmit) { // suppress if request for the same set of messages was processed less that xmit_interval/2 ago
String key = missing_msgs.toString();
long now = System.currentTimeMillis();
long t = xmit_prev.merge(key, now, (old, cur) -> (cur-old)>(xmit_interval/2) ? cur : old);
long t = xmit_prev.merge(key, now, (old, cur) -> (cur-old)>(xmit_interval/2) ? cur : old-1);
if (t == now) {
log.info("NAKACK2: retransmit request from "+ CCSUtil.toString(xmit_requester) +" for "+ missing_msgs);
} else {
Expand Down

0 comments on commit 96df0ac

Please sign in to comment.