Skip to content

Commit

Permalink
Patch for physical address
Browse files Browse the repository at this point in the history
  • Loading branch information
onoprien committed Oct 31, 2024
1 parent b2ce85b commit a1f9620
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 4 deletions.
4 changes: 4 additions & 0 deletions src/org/jgroups/logging/Log.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public interface Log {

// CCS begin
default boolean isEnabled(Level level) {
if (level == null) return false;
return switch (level.getName()) {
case "OFF" -> isFatalEnabled();
case "SEVERE" -> isErrorEnabled();
Expand All @@ -58,6 +59,7 @@ default boolean isEnabled(Level level) {
}

default void out(Level level, String msg) {
if (level == null) return;
switch (level.getName()) {
case "OFF" -> fatal(msg);
case "SEVERE" -> error(msg);
Expand All @@ -71,6 +73,7 @@ default void out(Level level, String msg) {
}

default void out(Level level, String format, Object ... args) {
if (level == null) return;
switch (level.getName()) {
case "OFF" -> fatal(format, args);
case "SEVERE" -> error(format, args);
Expand All @@ -84,6 +87,7 @@ default void out(Level level, String format, Object ... args) {
}

default void out(Level level, String msg, Throwable throwable) {
if (level == null) return;
switch (level.getName()) {
case "OFF" -> fatal(msg, throwable);
case "SEVERE" -> error(msg, throwable);
Expand Down
38 changes: 38 additions & 0 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import org.jgroups.ccs.CCSUtil;

import static org.jgroups.conf.AttributeType.SCALAR;

Expand Down Expand Up @@ -1436,6 +1438,37 @@ protected void sendTo(final Address dest, byte[] buf, int offset, int length) th
responses.done();
}
}
// CCS begin
if (ccs_prop_physical.isSet()) {
if (logical_addr_cache != null && dest != null) {
int where = 0;
physical_dest = logical_addr_cache.get(dest);
if (physical_dest == null) {
for (Map.Entry<Address,PhysicalAddress> e : logical_addr_cache.contents(false).entrySet()) {
Address a = e.getKey();
if (dest.equals(a)) {
physical_dest = e.getValue();
where = 1;
break;
} else if (dest.toString().equals(a.toString())) {
log.warn("TP: stale cache, wanted "+ CCSUtil.toString(dest) +", found "+ CCSUtil.toString(a));
}
}
}
if (physical_dest == null) {
LockSupport.parkNanos(10000);
physical_dest = logical_addr_cache.get(dest);
where = 2;
}
if (physical_dest == null) {
log.warn("TP: failed sendTo "+ CCSUtil.toString(dest));
} else {
log.warn("TP: recovered sendTo "+ where +", logical "+ CCSUtil.toString(dest) +", physical "+ CCSUtil.toString(physical_dest));
sendUnicast(physical_dest, buf, offset, length);
}
}
}
// CCS end
}


Expand Down Expand Up @@ -1574,6 +1607,11 @@ public boolean addPhysicalAddressToCache(Address logical_addr, PhysicalAddress p
}

protected boolean addPhysicalAddressToCache(Address logical_addr, PhysicalAddress physical_addr, boolean overwrite) {
// CCS begin
if (ccs_prop_physical.isSet() && !Objects.equals(local_addr, logical_addr)) {
log.out(ccs_prop_physical.getLevel(), "TP: adding physical address to cache: Logical: "+ CCSUtil.toString(logical_addr) +", Physical: "+ CCSUtil.toString(physical_addr));
}
// CCS end
return logical_addr != null && physical_addr != null &&
overwrite? logical_addr_cache.add(logical_addr, physical_addr) : logical_addr_cache.addIfAbsent(logical_addr, physical_addr);
}
Expand Down
34 changes: 30 additions & 4 deletions src/org/jgroups/protocols/UDP.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.util.Formatter;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import org.jgroups.ccs.CCSProperty;
import org.jgroups.ccs.CCSUtil;
import org.jgroups.ccs.MessageGate;


Expand Down Expand Up @@ -515,14 +517,38 @@ protected PacketReceiver[] createReceivers(int num, DatagramSocket sock, String


protected IpAddress createLocalAddress() {
if(sock == null || sock.isClosed())

// CCS begin
Level level = ccs_prop_physical.getLevel();

if (sock == null || sock.isClosed()) {
log.out(level, "Trying to create physical address of null or closed socket %s", sock);
return null;
if(external_addr != null) {
if(external_port > 0)
}
if (external_addr != null) {
if (external_port > 0) {
return new IpAddress(external_addr, external_port);
}
return new IpAddress(external_addr, sock.getLocalPort());
}
return new IpAddress(sock.getLocalAddress(), sock.getLocalPort());
IpAddress out = new IpAddress(sock.getLocalAddress(), sock.getLocalPort());
if (log.isEnabled(level)) {
log.out(level, "Created physical address: "+ CCSUtil.toString(out));
}
return out;


// if(sock == null || sock.isClosed())
// return null;
// if(external_addr != null) {
// if(external_port > 0)
// return new IpAddress(external_addr, external_port);
// return new IpAddress(external_addr, sock.getLocalPort());
// }
// return new IpAddress(sock.getLocalAddress(), sock.getLocalPort());

// CCS end

}

protected <T extends UDP> T setTimeToLive(int ttl, MulticastSocket s) {
Expand Down
2 changes: 2 additions & 0 deletions src/org/jgroups/stack/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public abstract class Protocol implements Lifecycle {
// protected final Log log=LogFactory.getLog(this.getClass());
protected final Log log = new CCSLog(this);

static public final CCSProperty ccs_prop_physical = new CCSProperty("ccs.jg.physical");
static public final CCSProperty ccs_prop_connect = new CCSProperty("ccs.jg.connect");
static public final CCSProperty ccs_prop_retransmit = new CCSProperty("ccs.jg.retransmit");
static public final CCSProperty ccs_prop_throttle = new CCSProperty("ccs.jg.throttle");
static public final CCSProperty ccs_prop_debug_loss = new CCSProperty("ccs.jg.debug.loss");
Expand Down

0 comments on commit a1f9620

Please sign in to comment.