Skip to content

Commit

Permalink
Throttling and loss simulation
Browse files Browse the repository at this point in the history
  • Loading branch information
onoprien committed Oct 30, 2024
1 parent 1ff6209 commit b2ce85b
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 3 deletions.
55 changes: 52 additions & 3 deletions src/org/jgroups/ccs/CCSProperty.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,27 @@ public class CCSProperty {
private final String value;

public CCSProperty(String name) {
String s = System.getProperty("ccs.jg.gate");
String s = System.getProperty(name);
value = (s == null || s.isBlank()) ? null : s.trim();
}

public CCSProperty(String name, String value) {
this.value = value.trim();
}

static public Level getMaxLevel(CCSProperty... properties) {
Level out = null;
for (CCSProperty p : properties) {
Level level = p.getLevel();
if (level != null) {
if (out == null || out.intValue() < level.intValue()) {
out = level;
}
}
}
return out;
}

public boolean isSet() {
return value != null;
}
Expand Down Expand Up @@ -46,6 +59,19 @@ public double getDouble(String key) {
}
}

public double getDouble() {
if (value == null) return Double.NaN;
for (String s : value.split("&")) {
if (!(s.isBlank() || s.contains("="))) {
try {
return Double.parseDouble(s.trim());
} catch (NumberFormatException x) {
}
}
}
return Double.NaN;
}

public int getInt(String key) {
String s = get(key);
if (s == null) return Integer.MIN_VALUE;
Expand All @@ -56,6 +82,19 @@ public int getInt(String key) {
}
}

public int getInt() {
if (value == null) return Integer.MIN_VALUE;
for (String s : value.split("&")) {
if (!(s.isBlank() || s.contains("="))) {
try {
return Integer.parseInt(s.trim());
} catch (NumberFormatException x) {
}
}
}
return Integer.MIN_VALUE;
}

public boolean getBoolean(String key) {
if (value == null) return false;
for (String s : value.split("&")) {
Expand Down Expand Up @@ -96,7 +135,17 @@ public Level getLevel() {

static public void main(String... s) {
CCSProperty p = new CCSProperty("", "441");
System.out.println(p.getBoolean("INFO"));
System.out.println(p.getLevel());
System.out.println(p.getBoolean("FINE")); // false
System.out.println(p.getLevel()); // INFO
System.out.println(p.getDouble()); // 441.
System.out.println(p.getDouble("x")); // NaN
System.out.println(p.getInt()); // 441
p = new CCSProperty("", "x=4&FINE & 441");
System.out.println(p.getBoolean("FINE")); // true
System.out.println(p.getLevel()); // FINE
System.out.println(p.getDouble()); // 441.
System.out.println(p.getDouble("x")); // 4
System.out.println(p.getInt()); // 441

}
}
109 changes: 109 additions & 0 deletions src/org/jgroups/ccs/MessageGate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package org.jgroups.ccs;

import java.util.Random;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jgroups.logging.Log;

/**
* Used to throttle messages before sending them over the network, or simulate message loss.
*
* @author onoprien
*/
public class MessageGate {

// -- Fields : -----------------------------------------------------------------

static final long MIN_DELAY = 1000; // ns
static private final long VETO_TIME = 60*1000000000; // 1 minute
static private final double VETO_FACTOR = 2;

private final Random rand = new Random();
private final Log log;
private final Level lev;

private double loss = -1.; // fraction of discarded messages [0,1]

private long maxSize = 0; // Bytes
private double maxRate; // Bytes/ns

private long prevDelay, prevTime;

private long accSize = 0;
private long accTime = System.nanoTime();

// -- Life cycle : -------------------------------------------------------------

public MessageGate(Log logger) {
this(logger, Level.FINE);
}

public MessageGate(Log logger, Level level) {
log = logger;
lev = level == null ? Level.ALL : level;
}


// -- Setters and getters : ----------------------------------------------------

public synchronized void setMessageLoss(double fraction) {
loss = fraction;
}

public synchronized void setRateLimit(int sizeMB, int rateMB) {
maxSize = sizeMB * 1000000L;
maxRate = rateMB / 1e3;
}

// -----------------------------------------------------------------------------

synchronized public boolean process(int size) {

// Simulate message loss:

boolean lost = loss > 0. && loss > rand.nextDouble();
if (lost) {
log.out(lev, "Losing a message of size "+ size +" bytes");
return false;
} else {
log.trace("Publishing a message of size "+ size +" bytes");
}

// Limit rate:

if (maxSize > 0) limitRate(size);
return true;
}

/**
* Processes the message, taking an appropriate action if rate exceeds threshold.
* @param size Message size in bytes.
*/
private void limitRate(int size) {
if (size <= 0) return;
long now = System.nanoTime();
accSize = Math.max(0L, Math.round(accSize - maxRate*(now-accTime)));
accSize += size;
accTime = now;
if (accSize > maxSize) {
long delay = Math.round((accSize - maxSize)/maxRate);
if (delay > MIN_DELAY) {
if (delay > prevDelay * VETO_FACTOR || now > prevTime + VETO_TIME) {
if (delay > 1000000000) {
log.out(lev, "Throttle : message of size {0} is held for {1} ms.", new Object[]{size, delay / 1000000});
} else {
log.out(lev, "Throttle : message of size {0} is held for {1} ms.", new Object[]{size, delay / 1000000});
}
prevDelay = delay;
prevTime = now;
}
LockSupport.parkNanos(delay);
now = System.nanoTime();
accSize = Math.max(0L, Math.round(accSize - maxRate*(now-accTime)));
accTime = now;
}
}
}

}
29 changes: 29 additions & 0 deletions src/org/jgroups/protocols/UDP.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.Formatter;
import java.util.List;
import java.util.Map;
import org.jgroups.ccs.CCSProperty;
import org.jgroups.ccs.MessageGate;


/**
Expand Down Expand Up @@ -137,6 +139,9 @@ public class UDP extends TP {

protected static final boolean is_android, is_mac;

// CCS begin
private MessageGate messageGate;
// CCS end

static {
is_android=Util.checkForAndroid();
Expand Down Expand Up @@ -257,6 +262,13 @@ public String getInfo() {

@Override
public void sendToAll(byte[] data, int offset, int length) throws Exception {
// CCS begin
if (messageGate != null) {
if (!messageGate.process(length)) {
return;
}
}
// CCS end
if(ip_mcast && mcast_addr != null) {
if(local_transport != null) {
try {
Expand Down Expand Up @@ -324,6 +336,23 @@ public Object down(Event evt) {

public void init() throws Exception {
super.init();

// CCS begin
double loss = ccs_prop_debug_loss.getDouble();
if ((!Double.isNaN(loss) && loss > 0. && loss < 1.) || ccs_prop_throttle.isSet()) {
messageGate = new MessageGate(log, CCSProperty.getMaxLevel(ccs_prop_debug_loss, ccs_prop_throttle));
messageGate.setMessageLoss(loss);
log.warn("Simulating losing "+ loss +" of multicast messages.");
int maxSize = ccs_prop_throttle.getInt("size");
int maxRate = ccs_prop_throttle.getInt("rate");
if (maxRate > 0) {
if (maxSize < 1) maxSize = maxRate;
messageGate.setRateLimit(maxSize, maxRate);
log.info("Throttle message publication at "+ maxSize +" MB, "+ maxRate +" MB/sec.");
}
}
// CCS end

if(bundler.getMaxSize() > Global.MAX_DATAGRAM_PACKET_SIZE)
throw new IllegalArgumentException("bundler.max_size (" + bundler.getMaxSize() + ") cannot exceed the max " +
"datagram packet size of " + Global.MAX_DATAGRAM_PACKET_SIZE);
Expand Down
2 changes: 2 additions & 0 deletions src/org/jgroups/stack/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public abstract class Protocol implements Lifecycle {
protected final Log log = new CCSLog(this);

static public final CCSProperty ccs_prop_retransmit = new CCSProperty("ccs.jg.retransmit");
static public final CCSProperty ccs_prop_throttle = new CCSProperty("ccs.jg.throttle");
static public final CCSProperty ccs_prop_debug_loss = new CCSProperty("ccs.jg.debug.loss");
// CCS end

protected List<Policy> policies;
Expand Down

0 comments on commit b2ce85b

Please sign in to comment.