diff --git a/src/org/jgroups/logging/Log.java b/src/org/jgroups/logging/Log.java index 54ab90f0d7..4aa77ce76c 100644 --- a/src/org/jgroups/logging/Log.java +++ b/src/org/jgroups/logging/Log.java @@ -45,6 +45,7 @@ public interface Log { // CCS begin default boolean isEnabled(Level level) { + if (level == null) return false; return switch (level.getName()) { case "OFF" -> isFatalEnabled(); case "SEVERE" -> isErrorEnabled(); @@ -58,6 +59,7 @@ default boolean isEnabled(Level level) { } default void out(Level level, String msg) { + if (level == null) return; switch (level.getName()) { case "OFF" -> fatal(msg); case "SEVERE" -> error(msg); @@ -71,6 +73,7 @@ default void out(Level level, String msg) { } default void out(Level level, String format, Object ... args) { + if (level == null) return; switch (level.getName()) { case "OFF" -> fatal(format, args); case "SEVERE" -> error(format, args); @@ -84,6 +87,7 @@ default void out(Level level, String format, Object ... args) { } default void out(Level level, String msg, Throwable throwable) { + if (level == null) return; switch (level.getName()) { case "OFF" -> fatal(msg, throwable); case "SEVERE" -> error(msg, throwable); diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index 409d715194..48d12d817b 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -26,7 +26,9 @@ import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; +import org.jgroups.ccs.CCSUtil; import static org.jgroups.conf.AttributeType.SCALAR; @@ -1436,6 +1438,37 @@ protected void sendTo(final Address dest, byte[] buf, int offset, int length) th responses.done(); } } + // CCS begin + if (ccs_prop_physical.isSet()) { + if (logical_addr_cache != null && dest != null) { + int where = 0; + physical_dest = logical_addr_cache.get(dest); + if (physical_dest == null) { + for (Map.Entry
e : logical_addr_cache.contents(false).entrySet()) { + Address a = e.getKey(); + if (dest.equals(a)) { + physical_dest = e.getValue(); + where = 1; + break; + } else if (dest.toString().equals(a.toString())) { + log.warn("TP: stale cache, wanted "+ CCSUtil.toString(dest) +", found "+ CCSUtil.toString(a)); + } + } + } + if (physical_dest == null) { + LockSupport.parkNanos(10000); + physical_dest = logical_addr_cache.get(dest); + where = 2; + } + if (physical_dest == null) { + log.warn("TP: failed sendTo "+ CCSUtil.toString(dest)); + } else { + log.warn("TP: recovered sendTo "+ where +", logical "+ CCSUtil.toString(dest) +", physical "+ CCSUtil.toString(physical_dest)); + sendUnicast(physical_dest, buf, offset, length); + } + } + } + // CCS end } @@ -1574,6 +1607,11 @@ public boolean addPhysicalAddressToCache(Address logical_addr, PhysicalAddress p } protected boolean addPhysicalAddressToCache(Address logical_addr, PhysicalAddress physical_addr, boolean overwrite) { + // CCS begin + if (ccs_prop_physical.isSet() && !Objects.equals(local_addr, logical_addr)) { + log.out(ccs_prop_physical.getLevel(), "TP: adding physical address to cache: Logical: "+ CCSUtil.toString(logical_addr) +", Physical: "+ CCSUtil.toString(physical_addr)); + } + // CCS end return logical_addr != null && physical_addr != null && overwrite? logical_addr_cache.add(logical_addr, physical_addr) : logical_addr_cache.addIfAbsent(logical_addr, physical_addr); } diff --git a/src/org/jgroups/protocols/UDP.java b/src/org/jgroups/protocols/UDP.java index 5982c5fe08..04aab55696 100644 --- a/src/org/jgroups/protocols/UDP.java +++ b/src/org/jgroups/protocols/UDP.java @@ -19,7 +19,9 @@ import java.util.Formatter; import java.util.List; import java.util.Map; +import java.util.logging.Level; import org.jgroups.ccs.CCSProperty; +import org.jgroups.ccs.CCSUtil; import org.jgroups.ccs.MessageGate; @@ -515,14 +517,38 @@ protected PacketReceiver[] createReceivers(int num, DatagramSocket sock, String protected IpAddress createLocalAddress() { - if(sock == null || sock.isClosed()) + + // CCS begin + Level level = ccs_prop_physical.getLevel(); + + if (sock == null || sock.isClosed()) { + log.out(level, "Trying to create physical address of null or closed socket %s", sock); return null; - if(external_addr != null) { - if(external_port > 0) + } + if (external_addr != null) { + if (external_port > 0) { return new IpAddress(external_addr, external_port); + } return new IpAddress(external_addr, sock.getLocalPort()); } - return new IpAddress(sock.getLocalAddress(), sock.getLocalPort()); + IpAddress out = new IpAddress(sock.getLocalAddress(), sock.getLocalPort()); + if (log.isEnabled(level)) { + log.out(level, "Created physical address: "+ CCSUtil.toString(out)); + } + return out; + + +// if(sock == null || sock.isClosed()) +// return null; +// if(external_addr != null) { +// if(external_port > 0) +// return new IpAddress(external_addr, external_port); +// return new IpAddress(external_addr, sock.getLocalPort()); +// } +// return new IpAddress(sock.getLocalAddress(), sock.getLocalPort()); + + // CCS end + } protected