Skip to content

Commit

Permalink
Minor
Browse files Browse the repository at this point in the history
  • Loading branch information
onoprien committed Oct 28, 2024
1 parent 4eb3819 commit 1ff6209
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 1 deletion.
102 changes: 102 additions & 0 deletions src/org/jgroups/ccs/CCSProperty.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package org.jgroups.ccs;

import java.util.logging.Level;

/**
*
* @author onoprien
*/
public class CCSProperty {

private final String value;

public CCSProperty(String name) {
String s = System.getProperty("ccs.jg.gate");
value = (s == null || s.isBlank()) ? null : s.trim();
}

public CCSProperty(String name, String value) {
this.value = value.trim();
}

public boolean isSet() {
return value != null;
}

public String get(String key) {
if (value == null) return null;
for (String s : value.split("&")) {
if (!s.isBlank()) {
String[] ss = s.split("=");
if (ss.length == 2 && ss[0].trim().equals(key)) {
return ss[1].trim();
}
}
}
return null;
}

public double getDouble(String key) {
String s = get(key);
if (s == null) return Double.NaN;
try {
return Double.parseDouble(s);
} catch (NumberFormatException x) {
return Double.NaN;
}
}

public int getInt(String key) {
String s = get(key);
if (s == null) return Integer.MIN_VALUE;
try {
return Integer.parseInt(s);
} catch (NumberFormatException x) {
return Integer.MIN_VALUE;
}
}

public boolean getBoolean(String key) {
if (value == null) return false;
for (String s : value.split("&")) {
if (!s.isBlank()) {
String[] ss = s.split("=");
if (ss[0].trim().equals(key)) {
if (ss.length == 1) {
return true;
} else if (ss.length == 2) {
return !"false".equalsIgnoreCase(ss[1]);
}
}
}
}
return false;
}

public Level getLevel() {
if (value == null) return null;
for (String s : value.split("&")) {
if (!s.isBlank()) {
String[] ss = s.split("=");
String name = switch (ss.length) {
case 1 -> ss[0].trim();
case 2 -> "level".equals(ss[0].trim()) ? ss[1].trim() : null;
default -> null;
};
if (name != null && name.matches("\\D+")) {
try {
return Level.parse(name);
} catch (IllegalArgumentException x) {
}
}
}
}
return Level.INFO;
}

static public void main(String... s) {
CCSProperty p = new CCSProperty("", "441");
System.out.println(p.getBoolean("INFO"));
System.out.println(p.getLevel());
}
}
43 changes: 42 additions & 1 deletion src/org/jgroups/protocols/pbcast/NAKACK2.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.logging.Level;

import static org.jgroups.Message.Flag.NO_FC;
import static org.jgroups.Message.Flag.OOB;
import static org.jgroups.Message.TransientFlag.*;
import org.jgroups.ccs.CCSUtil;


/**
Expand Down Expand Up @@ -254,7 +256,12 @@ public void setResendLastSeqno(boolean flag) {

/** Log to suppress identical warnings for messages from non-members */
protected SuppressLog<Address> suppress_log_non_member;


// CCS begin
private final Level ccs_retransmit_level = ccs_prop_retransmit.getLevel();
private final boolean ccs_retransmit_suppress = ccs_prop_retransmit.getBoolean("suppress");
private final Map<String,Long> xmit_prev = new ConcurrentHashMap<>(); // SeqnoList.toString() -> millis time stamp
// CCS end

public long getXmitRequestsReceived() {return xmit_reqs_received.sum();}
public long getXmitRequestsSent() {return xmit_reqs_sent.sum();}
Expand Down Expand Up @@ -976,6 +983,28 @@ protected void removeAndDeliver(Table<Message> buf, Address sender, boolean loop
protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender) {
if(is_trace)
log.trace("%s <-- %s: XMIT(%s%s)", local_addr, xmit_requester, original_sender, missing_msgs);

// CCS begin
if (ccs_prop_retransmit.isSet()) {
if (original_sender.equals(local_addr)) {
if (ccs_retransmit_suppress && use_mcast_xmit) { // suppress if request for the same set of messages was processed less that xmit_interval/2 ago
String key = missing_msgs.toString();
long now = System.currentTimeMillis();
long t = xmit_prev.merge(key, now, (old, cur) -> (cur-old)>(xmit_interval/2) ? cur : old-1);
if (t == now) {
log.out(ccs_retransmit_level, "NAKACK2: retransmit request from "+ CCSUtil.toString(xmit_requester) +" for "+ missing_msgs);
} else {
log.out(ccs_retransmit_level, "NAKACK2: suppressed retransmit request from "+ CCSUtil.toString(xmit_requester) +" for "+ missing_msgs);
return;
}
} else {
log.out(ccs_retransmit_level, "NAKACK2: retransmit request from "+ CCSUtil.toString(xmit_requester) +" for "+ missing_msgs);
}
} else {
log.warn("NAKACK2: handling retransmit request from "+ CCSUtil.toString(xmit_requester) +" for "+ CCSUtil.toString(original_sender));
}
}
// CCS end

if(stats)
xmit_reqs_received.add(missing_msgs.size());
Expand Down Expand Up @@ -1572,6 +1601,18 @@ else if(!xmit_task_map.isEmpty())
}
if(resend_last_seqno && last_seqno_resender != null)
last_seqno_resender.execute(seqno.get());

// CCS begin
if (ccs_retransmit_suppress) {
long deadline = System.currentTimeMillis() - xmit_interval / 2;
Iterator<Map.Entry<String, Long>> it = xmit_prev.entrySet().iterator();
while (it.hasNext()) {
if (it.next().getValue() < deadline) {
it.remove();
}
}
}
// CCS end
}


Expand Down
3 changes: 3 additions & 0 deletions src/org/jgroups/stack/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.jgroups.ccs.CCSLog;
import org.jgroups.ccs.CCSProperty;


/**
Expand Down Expand Up @@ -68,6 +69,8 @@ public abstract class Protocol implements Lifecycle {
// CCS begin
// protected final Log log=LogFactory.getLog(this.getClass());
protected final Log log = new CCSLog(this);

static public final CCSProperty ccs_prop_retransmit = new CCSProperty("ccs.jg.retransmit");
// CCS end

protected List<Policy> policies;
Expand Down

0 comments on commit 1ff6209

Please sign in to comment.