From 96df0ac70b8c818f2b2136792340c9f5d1a5eab3 Mon Sep 17 00:00:00 2001 From: onoprien Date: Tue, 22 Oct 2024 16:18:56 -0700 Subject: [PATCH] MessageGate --- src/org/jgroups/ccs/MessageGate.java | 108 ++++++++++++++++++ src/org/jgroups/protocols/UDP.java | 48 +++++++- src/org/jgroups/protocols/pbcast/NAKACK2.java | 2 +- 3 files changed, 155 insertions(+), 3 deletions(-) create mode 100644 src/org/jgroups/ccs/MessageGate.java diff --git a/src/org/jgroups/ccs/MessageGate.java b/src/org/jgroups/ccs/MessageGate.java new file mode 100644 index 0000000000..818855efc9 --- /dev/null +++ b/src/org/jgroups/ccs/MessageGate.java @@ -0,0 +1,108 @@ +package org.jgroups.ccs; + +import java.util.Random; +import java.util.concurrent.locks.LockSupport; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.jgroups.logging.Log; + +/** + * Used to throttle messages before sending them over the network, or simulate message loss. + * + * @author onoprien + */ +public class MessageGate { + +// -- Fields : ----------------------------------------------------------------- + + static final long MIN_DELAY = 1000; // ns + static private final long VETO_TIME = 60*1000000000; // 1 minute + static private final double VETO_FACTOR = 2; + + private final Random rand = new Random(); + private final Log log; + + private double loss = -1.; // fraction of discarded messages [0,1] + + private long maxSize = 0; // Bytes + private double maxRate; // Bytes/ns + + private long prevDelay, prevTime; + + private long accSize = 0; + private long accTime = System.nanoTime(); + +// -- Life cycle : ------------------------------------------------------------- + + /** + * Constructs an instance. + * + * @param logger Logger. + */ + public MessageGate(Log logger) { + log = logger; + } + + +// -- Setters and getters : ---------------------------------------------------- + + public synchronized void setMessageLoss(double fraction) { + loss = fraction; + } + + public synchronized void setRateLimit(int sizeMB, int rateMB) { + maxSize = sizeMB * 1000000L; + maxRate = rateMB / 1e3; + } + +// ----------------------------------------------------------------------------- + + synchronized public boolean process(int size) { + + // Simulate message loss: + + boolean lost = loss > 0. && loss > rand.nextDouble(); + if (lost) { + log.debug("Losing a message of size "+ size +" bytes"); + return false; + } else { + log.debug("Publishing a message of size "+ size +" bytes"); + } + + // Limit rate: + + if (maxSize > 0) limitRate(size); + return true; + } + + /** + * Processes the message, taking an appropriate action if rate exceeds threshold. + * @param size Message size in bytes. + */ + private void limitRate(int size) { + if (size <= 0) return; + long now = System.nanoTime(); + accSize = Math.max(0L, Math.round(accSize - maxRate*(now-accTime))); + accSize += size; + accTime = now; + if (accSize > maxSize) { + long delay = Math.round((accSize - maxSize)/maxRate); + if (delay > MIN_DELAY) { + if (delay > prevDelay * VETO_FACTOR || now > prevTime + VETO_TIME) { + if (delay > 1000000000) { + log.warn("Throttle : message of size {0} is held for {1} ms.", new Object[]{size, delay / 1000000}); + } else { + log.debug("Throttle : message of size {0} is held for {1} ms.", new Object[]{size, delay / 1000000}); + } + prevDelay = delay; + prevTime = now; + } + LockSupport.parkNanos(delay); + now = System.nanoTime(); + accSize = Math.max(0L, Math.round(accSize - maxRate*(now-accTime))); + accTime = now; + } + } + } + +} diff --git a/src/org/jgroups/protocols/UDP.java b/src/org/jgroups/protocols/UDP.java index bf731a39d9..337ce1702d 100644 --- a/src/org/jgroups/protocols/UDP.java +++ b/src/org/jgroups/protocols/UDP.java @@ -16,9 +16,11 @@ import java.lang.reflect.Method; import java.net.*; import java.util.Formatter; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.jgroups.ccs.CCSUtil; +import org.jgroups.ccs.MessageGate; /** @@ -136,6 +138,10 @@ public class UDP extends TP { protected SuppressLog suppress_log_out_of_buffer_space; protected static final boolean is_android, is_mac; + + // CCS begin + private MessageGate messageGate; + // CCS end static { @@ -231,10 +237,20 @@ public String getInfo() { } public void sendMulticast(byte[] data, int offset, int length) throws Exception { - if(ip_mcast && mcast_addr != null) + if(ip_mcast && mcast_addr != null) { + + // CCS begin + if (messageGate != null) { + if (!messageGate.process(length)) { + return; + } + } + // CCS end + _send(mcast_addr.getIpAddress(), mcast_addr.getPort(), data, offset, length); - else + } else { sendToMembers(members, data, offset, length); + } } public void sendUnicast(PhysicalAddress dest, byte[] data, int offset, int length) throws Exception { @@ -280,6 +296,34 @@ public Object down(Event evt) { public void init() throws Exception { super.init(); + + // CCS begin + String s = System.getProperty("ccs.jg.gate"); + if (s != null) { + messageGate = new MessageGate(log); + Map m = new HashMap<>(); + for (String token : s.split(";")) { + String[] ss = token.split("="); + if (ss.length == 2) { + m.put(ss[0], ss[1]); + } + } + try { + double loss = Double.parseDouble(m.get("loss")); + messageGate.setMessageLoss(loss); + log.info("Simulate losing "+ loss +" of messages."); + } catch (NullPointerException | NumberFormatException x) { + } + try { + int maxSize = Integer.parseInt(m.get("max_size")); + int maxRate = Integer.parseInt(m.get("max_rate")); + messageGate.setRateLimit(maxSize, maxRate); + log.info("Throttle message publication at "+ maxSize +" MB, "+ maxRate +" MB/sec."); + } catch (NullPointerException | NumberFormatException x) { + } + } + // CCS end + if(max_bundle_size > Global.MAX_DATAGRAM_PACKET_SIZE) throw new IllegalArgumentException("max_bundle_size (" + max_bundle_size + ") cannot exceed the max datagram " + "packet size of " + Global.MAX_DATAGRAM_PACKET_SIZE); diff --git a/src/org/jgroups/protocols/pbcast/NAKACK2.java b/src/org/jgroups/protocols/pbcast/NAKACK2.java index 806cf994eb..313b993706 100644 --- a/src/org/jgroups/protocols/pbcast/NAKACK2.java +++ b/src/org/jgroups/protocols/pbcast/NAKACK2.java @@ -914,7 +914,7 @@ protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Add if (ccs_retransmit_act && use_mcast_xmit) { // suppress if request for the same set of messages was processed less that xmit_interval/2 ago String key = missing_msgs.toString(); long now = System.currentTimeMillis(); - long t = xmit_prev.merge(key, now, (old, cur) -> (cur-old)>(xmit_interval/2) ? cur : old); + long t = xmit_prev.merge(key, now, (old, cur) -> (cur-old)>(xmit_interval/2) ? cur : old-1); if (t == now) { log.info("NAKACK2: retransmit request from "+ CCSUtil.toString(xmit_requester) +" for "+ missing_msgs); } else {