From 1ff620966d905776d1fee82aca1d12bb8443808d Mon Sep 17 00:00:00 2001 From: onoprien Date: Mon, 28 Oct 2024 14:05:25 -0700 Subject: [PATCH] Minor --- src/org/jgroups/ccs/CCSProperty.java | 102 ++++++++++++++++++ src/org/jgroups/protocols/pbcast/NAKACK2.java | 43 +++++++- src/org/jgroups/stack/Protocol.java | 3 + 3 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 src/org/jgroups/ccs/CCSProperty.java diff --git a/src/org/jgroups/ccs/CCSProperty.java b/src/org/jgroups/ccs/CCSProperty.java new file mode 100644 index 0000000000..5e2dadeb60 --- /dev/null +++ b/src/org/jgroups/ccs/CCSProperty.java @@ -0,0 +1,102 @@ +package org.jgroups.ccs; + +import java.util.logging.Level; + +/** + * + * @author onoprien + */ +public class CCSProperty { + + private final String value; + + public CCSProperty(String name) { + String s = System.getProperty("ccs.jg.gate"); + value = (s == null || s.isBlank()) ? null : s.trim(); + } + + public CCSProperty(String name, String value) { + this.value = value.trim(); + } + + public boolean isSet() { + return value != null; + } + + public String get(String key) { + if (value == null) return null; + for (String s : value.split("&")) { + if (!s.isBlank()) { + String[] ss = s.split("="); + if (ss.length == 2 && ss[0].trim().equals(key)) { + return ss[1].trim(); + } + } + } + return null; + } + + public double getDouble(String key) { + String s = get(key); + if (s == null) return Double.NaN; + try { + return Double.parseDouble(s); + } catch (NumberFormatException x) { + return Double.NaN; + } + } + + public int getInt(String key) { + String s = get(key); + if (s == null) return Integer.MIN_VALUE; + try { + return Integer.parseInt(s); + } catch (NumberFormatException x) { + return Integer.MIN_VALUE; + } + } + + public boolean getBoolean(String key) { + if (value == null) return false; + for (String s : value.split("&")) { + if (!s.isBlank()) { + String[] ss = s.split("="); + if (ss[0].trim().equals(key)) { + if (ss.length == 1) { + return true; + } else if (ss.length == 2) { + return !"false".equalsIgnoreCase(ss[1]); + } + } + } + } + return false; + } + + public Level getLevel() { + if (value == null) return null; + for (String s : value.split("&")) { + if (!s.isBlank()) { + String[] ss = s.split("="); + String name = switch (ss.length) { + case 1 -> ss[0].trim(); + case 2 -> "level".equals(ss[0].trim()) ? ss[1].trim() : null; + default -> null; + }; + if (name != null && name.matches("\\D+")) { + try { + return Level.parse(name); + } catch (IllegalArgumentException x) { + } + } + } + } + return Level.INFO; + } + + static public void main(String... s) { + CCSProperty p = new CCSProperty("", "441"); + System.out.println(p.getBoolean("INFO")); + System.out.println(p.getLevel()); + } +} diff --git a/src/org/jgroups/protocols/pbcast/NAKACK2.java b/src/org/jgroups/protocols/pbcast/NAKACK2.java index acf94cb606..4f645fe88d 100644 --- a/src/org/jgroups/protocols/pbcast/NAKACK2.java +++ b/src/org/jgroups/protocols/pbcast/NAKACK2.java @@ -23,10 +23,12 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.logging.Level; import static org.jgroups.Message.Flag.NO_FC; import static org.jgroups.Message.Flag.OOB; import static org.jgroups.Message.TransientFlag.*; +import org.jgroups.ccs.CCSUtil; /** @@ -254,7 +256,12 @@ public void setResendLastSeqno(boolean flag) { /** Log to suppress identical warnings for messages from non-members */ protected SuppressLog
suppress_log_non_member; - + + // CCS begin + private final Level ccs_retransmit_level = ccs_prop_retransmit.getLevel(); + private final boolean ccs_retransmit_suppress = ccs_prop_retransmit.getBoolean("suppress"); + private final Map xmit_prev = new ConcurrentHashMap<>(); // SeqnoList.toString() -> millis time stamp + // CCS end public long getXmitRequestsReceived() {return xmit_reqs_received.sum();} public long getXmitRequestsSent() {return xmit_reqs_sent.sum();} @@ -976,6 +983,28 @@ protected void removeAndDeliver(Table buf, Address sender, boolean loop protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender) { if(is_trace) log.trace("%s <-- %s: XMIT(%s%s)", local_addr, xmit_requester, original_sender, missing_msgs); + + // CCS begin + if (ccs_prop_retransmit.isSet()) { + if (original_sender.equals(local_addr)) { + if (ccs_retransmit_suppress && use_mcast_xmit) { // suppress if request for the same set of messages was processed less that xmit_interval/2 ago + String key = missing_msgs.toString(); + long now = System.currentTimeMillis(); + long t = xmit_prev.merge(key, now, (old, cur) -> (cur-old)>(xmit_interval/2) ? cur : old-1); + if (t == now) { + log.out(ccs_retransmit_level, "NAKACK2: retransmit request from "+ CCSUtil.toString(xmit_requester) +" for "+ missing_msgs); + } else { + log.out(ccs_retransmit_level, "NAKACK2: suppressed retransmit request from "+ CCSUtil.toString(xmit_requester) +" for "+ missing_msgs); + return; + } + } else { + log.out(ccs_retransmit_level, "NAKACK2: retransmit request from "+ CCSUtil.toString(xmit_requester) +" for "+ missing_msgs); + } + } else { + log.warn("NAKACK2: handling retransmit request from "+ CCSUtil.toString(xmit_requester) +" for "+ CCSUtil.toString(original_sender)); + } + } + // CCS end if(stats) xmit_reqs_received.add(missing_msgs.size()); @@ -1572,6 +1601,18 @@ else if(!xmit_task_map.isEmpty()) } if(resend_last_seqno && last_seqno_resender != null) last_seqno_resender.execute(seqno.get()); + + // CCS begin + if (ccs_retransmit_suppress) { + long deadline = System.currentTimeMillis() - xmit_interval / 2; + Iterator> it = xmit_prev.entrySet().iterator(); + while (it.hasNext()) { + if (it.next().getValue() < deadline) { + it.remove(); + } + } + } + // CCS end } diff --git a/src/org/jgroups/stack/Protocol.java b/src/org/jgroups/stack/Protocol.java index 697e51c164..67ffcacfde 100644 --- a/src/org/jgroups/stack/Protocol.java +++ b/src/org/jgroups/stack/Protocol.java @@ -24,6 +24,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import org.jgroups.ccs.CCSLog; +import org.jgroups.ccs.CCSProperty; /** @@ -68,6 +69,8 @@ public abstract class Protocol implements Lifecycle { // CCS begin // protected final Log log=LogFactory.getLog(this.getClass()); protected final Log log = new CCSLog(this); + + static public final CCSProperty ccs_prop_retransmit = new CCSProperty("ccs.jg.retransmit"); // CCS end protected List policies;