From 36fcde63df82e22cf92786847eae3188c555beee Mon Sep 17 00:00:00 2001 From: Constantine Date: Sat, 18 May 2024 11:19:48 -0700 Subject: [PATCH] misc-3 (#198) * TCP_NODELAY=true * update H2, update jooq. explicit use of H2 dialect with jooq --- .../apollo/delphinius/DomainMaintenance.java | 22 +++++++++---------- .../apollo/demesnes/DemesneSmoke.java | 2 ++ .../apollo/archipelago/Enclave.java | 3 +++ .../apollo/archipelago/MtlsClient.java | 2 ++ .../salesforce/apollo/archipelago/Portal.java | 2 ++ .../apollo/archipelago/DemultiplexerTest.java | 2 ++ .../apollo/archipelago/EnclaveTest.java | 2 ++ .../apollo/model/ProcessContainerDomain.java | 4 ++++ .../apollo/model/demesnes/DemesneImpl.java | 3 +++ .../apollo/model/demesnes/DemesneTest.java | 6 ++++- pom.xml | 9 ++++++-- .../apollo/stereotomy/db/UniKERL.java | 7 +++--- .../com/salesforce/apollo/thoth/KerlDHT.java | 3 ++- .../salesforce/apollo/thoth/KerlSpace.java | 9 ++++---- .../apollo/thoth/KerlSpaceTest.java | 7 ++++-- 15 files changed, 59 insertions(+), 24 deletions(-) diff --git a/delphinius/src/main/java/com/salesforce/apollo/delphinius/DomainMaintenance.java b/delphinius/src/main/java/com/salesforce/apollo/delphinius/DomainMaintenance.java index d1e9776965..49e5bb2cce 100644 --- a/delphinius/src/main/java/com/salesforce/apollo/delphinius/DomainMaintenance.java +++ b/delphinius/src/main/java/com/salesforce/apollo/delphinius/DomainMaintenance.java @@ -6,17 +6,17 @@ */ package com.salesforce.apollo.delphinius; -import static com.salesforce.apollo.delphinius.schema.tables.Edge.EDGE; +import org.h2.api.Trigger; +import org.jooq.SQLDialect; +import org.jooq.impl.DSL; import java.sql.Connection; import java.sql.SQLException; -import org.h2.api.Trigger; -import org.jooq.impl.DSL; +import static com.salesforce.apollo.delphinius.schema.tables.Edge.EDGE; /** * @author hal.hildebrand - * */ public class DomainMaintenance implements Trigger { @@ -24,7 +24,7 @@ public class DomainMaintenance implements Trigger { @Override public void fire(Connection conn, Object[] oldRow, Object[] newRow) throws SQLException { - var dsl = DSL.using(conn); + var dsl = DSL.using(conn, SQLDialect.H2); dsl.deleteFrom(EDGE) .where(EDGE.TYPE.eq(type)) .and(EDGE.PARENT.eq((Long) oldRow[0]).or(EDGE.CHILD.eq((Long) oldRow[0]))) @@ -32,14 +32,14 @@ public void fire(Connection conn, Object[] oldRow, Object[] newRow) throws SQLEx } @Override - public void init(Connection conn, String schemaName, String triggerName, String tableName, boolean before, - int type) throws SQLException { + public void init(Connection conn, String schemaName, String triggerName, String tableName, boolean before, int type) + throws SQLException { assert !before && type == DELETE : "this is an after delete trigger"; this.type = switch (tableName.toLowerCase()) { - case "object" -> Oracle.OBJECT_TYPE; - case "relation" -> Oracle.RELATION_TYPE; - case "subject" -> Oracle.SUBJECT_TYPE; - default -> throw new IllegalArgumentException("Unexpected value: " + tableName.toLowerCase()); + case "object" -> Oracle.OBJECT_TYPE; + case "relation" -> Oracle.RELATION_TYPE; + case "subject" -> Oracle.SUBJECT_TYPE; + default -> throw new IllegalArgumentException("Unexpected value: " + tableName.toLowerCase()); }; } diff --git a/isolates/src/test/java/com/salesforce/apollo/demesnes/DemesneSmoke.java b/isolates/src/test/java/com/salesforce/apollo/demesnes/DemesneSmoke.java index 7662586301..4c13cd5456 100644 --- a/isolates/src/test/java/com/salesforce/apollo/demesnes/DemesneSmoke.java +++ b/isolates/src/test/java/com/salesforce/apollo/demesnes/DemesneSmoke.java @@ -43,6 +43,7 @@ import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import io.netty.channel.unix.ServerDomainSocketChannel; @@ -192,6 +193,7 @@ public void register(SubContext context) { private ManagedChannel handler(DomainSocketAddress address) { return NettyChannelBuilder.forAddress(address) .executor(executor) + .withOption(ChannelOption.TCP_NODELAY, true) .eventLoopGroup(eventLoopGroup) .channelType(clientChannelType) .keepAliveTime(1, TimeUnit.SECONDS) diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java index 20e65095bd..5ae364d6c9 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Enclave.java @@ -20,6 +20,7 @@ import io.grpc.netty.DomainSocketNegotiatorHandler.DomainSocketNegotiator; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import org.slf4j.Logger; @@ -85,6 +86,7 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier
  • responseListener, Metadata headers) { } }; final var builder = NettyChannelBuilder.forAddress(bridge) + .withOption(ChannelOption.TCP_NODELAY, true) .executor(executor) .eventLoopGroup(eventLoopGroup) .channelType(channelType) diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java index fa53429684..58db9ec8e7 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/MtlsClient.java @@ -15,6 +15,7 @@ import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.netty.NettyChannelBuilder; +import io.netty.channel.ChannelOption; import io.netty.handler.ssl.ClientAuth; import java.net.SocketAddress; @@ -33,6 +34,7 @@ public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, Cl Limiter limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build(); channel = NettyChannelBuilder.forAddress(address) .executor(executor) + .withOption(ChannelOption.TCP_NODELAY, true) .sslContext(supplier.forClient(clientAuth, alias, validator, MtlsServer.TL_SV1_3)) .intercept(new ConcurrencyLimitClientInterceptor(limiter, () -> Status.RESOURCE_EXHAUSTED.withDescription( diff --git a/memberships/src/main/java/com/salesforce/apollo/archipelago/Portal.java b/memberships/src/main/java/com/salesforce/apollo/archipelago/Portal.java index 74f0eccf8b..f0a04955ab 100644 --- a/memberships/src/main/java/com/salesforce/apollo/archipelago/Portal.java +++ b/memberships/src/main/java/com/salesforce/apollo/archipelago/Portal.java @@ -15,6 +15,7 @@ import io.grpc.netty.DomainSocketNegotiatorHandler.DomainSocketNegotiator; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; @@ -86,6 +87,7 @@ public void start(Listener responseListener, Metadata headers) { }; return NettyChannelBuilder.forAddress(address) .executor(executor) + .withOption(ChannelOption.TCP_NODELAY, true) .eventLoopGroup(eventLoopGroup) .channelType(channelType) .keepAliveTime(keepAlive.toNanos(), TimeUnit.NANOSECONDS) diff --git a/memberships/src/test/java/com/salesforce/apollo/archipelago/DemultiplexerTest.java b/memberships/src/test/java/com/salesforce/apollo/archipelago/DemultiplexerTest.java index c659a97777..34b5bc66dd 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipelago/DemultiplexerTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipelago/DemultiplexerTest.java @@ -25,6 +25,7 @@ import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import org.junit.jupiter.api.AfterEach; @@ -114,6 +115,7 @@ public void smokin() throws Exception { private ManagedChannel handler(DomainSocketAddress address) { return NettyChannelBuilder.forAddress(address) + .withOption(ChannelOption.TCP_NODELAY, true) .executor(executor) .eventLoopGroup(eventLoopGroup) .channelType(channelType) diff --git a/memberships/src/test/java/com/salesforce/apollo/archipelago/EnclaveTest.java b/memberships/src/test/java/com/salesforce/apollo/archipelago/EnclaveTest.java index 68e7dd9041..55e8c78f25 100644 --- a/memberships/src/test/java/com/salesforce/apollo/archipelago/EnclaveTest.java +++ b/memberships/src/test/java/com/salesforce/apollo/archipelago/EnclaveTest.java @@ -23,6 +23,7 @@ import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import org.joou.ULong; @@ -148,6 +149,7 @@ public void smokin() throws Exception { private ManagedChannel handler(DomainSocketAddress address) { return NettyChannelBuilder.forAddress(address) + .withOption(ChannelOption.TCP_NODELAY, true) .executor(executor) .eventLoopGroup(eventLoopGroup) .channelType(channelType) diff --git a/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java b/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java index 2052db4f5d..6a8f61c833 100644 --- a/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java +++ b/model/src/main/java/com/salesforce/apollo/model/ProcessContainerDomain.java @@ -30,6 +30,7 @@ import io.grpc.netty.DomainSocketNegotiatorHandler; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import org.joou.ULong; @@ -84,6 +85,7 @@ public ProcessContainerDomain(Digest group, ControlledIdentifierMember member, P .protocolNegotiator( new DomainSocketNegotiatorHandler.DomainSocketNegotiator( IMPL)) + .withChildOption(ChannelOption.TCP_NODELAY, true) .channelType(IMPL.getServerDomainSocketChannelClass()) .workerEventLoopGroup(portalEventLoopGroup) .bossEventLoopGroup(portalEventLoopGroup) @@ -94,6 +96,7 @@ public ProcessContainerDomain(Digest group, ControlledIdentifierMember member, P outerContextService = NettyServerBuilder.forAddress(outerContextEndpoint) .protocolNegotiator( new DomainSocketNegotiatorHandler.DomainSocketNegotiator(IMPL)) + .withChildOption(ChannelOption.TCP_NODELAY, true) .channelType(IMPL.getServerDomainSocketChannelClass()) .addService(new DemesneKERLServer(dht, null)) .addService(outerContextService()) @@ -207,6 +210,7 @@ protected void stopServices() { private ManagedChannel handler(DomainSocketAddress address) { return NettyChannelBuilder.forAddress(address) + .withOption(ChannelOption.TCP_NODELAY, true) .executor(executor) .eventLoopGroup(clientEventLoopGroup) .channelType(channelType) diff --git a/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java b/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java index ab13a60c5b..9e15d8ebe1 100644 --- a/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java +++ b/model/src/main/java/com/salesforce/apollo/model/demesnes/DemesneImpl.java @@ -38,6 +38,7 @@ import io.grpc.ManagedChannel; import io.grpc.netty.NettyChannelBuilder; import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import org.slf4j.Logger; @@ -189,6 +190,7 @@ private CachingKERL kerlFrom(File address) { ManagedChannel channel = null; try { channel = NettyChannelBuilder.forAddress(serverAddress) + .withOption(ChannelOption.TCP_NODELAY, true) .executor(executor) .intercept(clientInterceptor(kerlContext)) .eventLoopGroup(eventLoopGroup) @@ -210,6 +212,7 @@ private CachingKERL kerlFrom(File address) { private OuterContextClient outerFrom(File address) { return new OuterContextClient(NettyChannelBuilder.forAddress(new DomainSocketAddress(address)) + .withOption(ChannelOption.TCP_NODELAY, true) .executor(executor) .intercept(clientInterceptor(context.getId())) .eventLoopGroup(eventLoopGroup) diff --git a/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java b/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java index aa24c12b9a..bd31b9010a 100644 --- a/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java +++ b/model/src/test/java/com/salesforce/apollo/model/demesnes/DemesneTest.java @@ -45,6 +45,7 @@ import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; import io.netty.channel.unix.ServerDomainSocketChannel; @@ -149,7 +150,9 @@ public void portal() throws Exception { .bossEventLoopGroup( IMPL.getEventLoopGroup()) .intercept( - new DomainSocketServerInterceptor()), + new DomainSocketServerInterceptor()) + .withChildOption( + ChannelOption.TCP_NODELAY, true), s -> handler(portalEndpoint), bridge, Duration.ofMillis(1), s -> routes.get(s)); final var endpoint1 = new DomainSocketAddress(Path.of("target").resolve(UUID.randomUUID().toString()).toFile()); @@ -280,6 +283,7 @@ public void register(SubContext context) { private ManagedChannel handler(DomainSocketAddress address) { return NettyChannelBuilder.forAddress(address) + .withOption(ChannelOption.TCP_NODELAY, true) .executor(executor) .eventLoopGroup(eventLoopGroup) .channelType(clientChannelType) diff --git a/pom.xml b/pom.xml index fd128c7acf..25af6eb157 100644 --- a/pom.xml +++ b/pom.xml @@ -37,8 +37,8 @@ 1.9.5 2.15.2 4.0.5 - 2.2.220 - 3.17.2 + 2.2.224 + 3.18.15 1.78 1.4.12 1.62.2 @@ -553,6 +553,11 @@ ${graal.vm.version} provided + + jakarta.xml.bind + jakarta.xml.bind-api + 4.0.2 + diff --git a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/db/UniKERL.java b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/db/UniKERL.java index 3923cb8ce4..484ccd8dcc 100644 --- a/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/db/UniKERL.java +++ b/stereotomy/src/main/java/com/salesforce/apollo/stereotomy/db/UniKERL.java @@ -29,6 +29,7 @@ import org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException; import org.jooq.DSLContext; import org.jooq.Record1; +import org.jooq.SQLDialect; import org.jooq.exception.DataAccessException; import org.jooq.impl.DSL; import org.joou.ULong; @@ -70,7 +71,7 @@ abstract public class UniKERL implements DigestKERL { public UniKERL(Connection connection, DigestAlgorithm digestAlgorithm) { this.digestAlgorithm = digestAlgorithm; - this.dsl = DSL.using(connection); + this.dsl = DSL.using(connection, SQLDialect.H2); processor = new KeyEventProcessor(this); } @@ -243,7 +244,7 @@ public static void appendAttachments(Connection connection, List attachm log.error("Error deserializing attachment event", e); return; } - append(DSL.using(connection), event); + append(DSL.using(connection, SQLDialect.H2), event); }); } @@ -363,7 +364,7 @@ public static void initialize(DSLContext dsl) { } public static void initializeKERL(Connection connection) { - initialize(DSL.using(connection)); + initialize(DSL.using(connection, SQLDialect.H2)); } @Override diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java index 9a4d9e65d8..b029eee4a6 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java @@ -58,6 +58,7 @@ import liquibase.ui.UIService; import org.h2.jdbcx.JdbcConnectionPool; import org.jooq.DSLContext; +import org.jooq.SQLDialect; import org.jooq.impl.DSL; import org.joou.ULong; import org.slf4j.Logger; @@ -956,7 +957,7 @@ private void reconcile(ScheduledExecutorService scheduler, Duration duration) { private void updateLocationHash(Identifier identifier) { try (var connection = connectionPool.getConnection()) { - var dsl = DSL.using(connection); + var dsl = DSL.using(connection, SQLDialect.H2); updateLocationHash(identifier, kerl.getDigestAlgorithm(), dsl); } catch (SQLException e) { log.error("Cannot update location hash for: {} on: {}", identifier, member.getId()); diff --git a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlSpace.java b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlSpace.java index 88d7b8cfef..7efd785d57 100644 --- a/thoth/src/main/java/com/salesforce/apollo/thoth/KerlSpace.java +++ b/thoth/src/main/java/com/salesforce/apollo/thoth/KerlSpace.java @@ -28,6 +28,7 @@ import org.h2.jdbcx.JdbcConnectionPool; import org.jooq.DSLContext; import org.jooq.Record1; +import org.jooq.SQLDialect; import org.jooq.exception.DataAccessException; import org.jooq.impl.DSL; import org.joou.ULong; @@ -216,7 +217,7 @@ public static void upsert(DSLContext dsl, Validations validations, Digest member public Biff populate(long seed, CombinedIntervals intervals, double fpr) { DigestBloomFilter bff = new DigestBloomFilter(seed, Math.max(cardinality(), 100), fpr); try (var connection = connectionPool.getConnection()) { - var dsl = DSL.using(connection); + var dsl = DSL.using(connection, SQLDialect.H2); eventDigestsIn(intervals, dsl).forEach(d -> { log.trace("Adding reconcile digest: {} on: {}", d, member); bff.add(d); @@ -239,7 +240,7 @@ public Update.Builder reconcile(Intervals intervals, DigestKERL kerl) { var biff = BloomFilter.from(intervals.getHave()); var update = Update.newBuilder(); try (var connection = connectionPool.getConnection()) { - var dsl = DSL.using(connection); + var dsl = DSL.using(connection, SQLDialect.H2); intervals.getIntervalsList() .stream() .map(KeyInterval::new) @@ -274,7 +275,7 @@ public void update(List events, KERL.Appe final var digestAlgorithm = kerl.getDigestAlgorithm(); try (var connection = connectionPool.getConnection()) { - var dsl = DSL.using(connection); + var dsl = DSL.using(connection, SQLDialect.H2); dsl.transaction(ctx -> { var context = DSL.using(ctx); for (var evente_ : events) { @@ -298,7 +299,7 @@ public void update(List events, KERL.Appe // the estimated cardinality of the number of key events private int cardinality() { try (var connection = connectionPool.getConnection()) { - var dsl = DSL.using(connection); + var dsl = DSL.using(connection, SQLDialect.H2); return dsl.fetchCount(dsl.selectFrom(IDENTIFIER)); } catch (SQLException e) { log.error("Unable to provide estimated cardinality, cannot acquire JDBC connection on: {}", member, e); diff --git a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlSpaceTest.java b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlSpaceTest.java index 7c0cde364a..f6431f27f1 100644 --- a/thoth/src/test/java/com/salesforce/apollo/thoth/KerlSpaceTest.java +++ b/thoth/src/test/java/com/salesforce/apollo/thoth/KerlSpaceTest.java @@ -18,6 +18,7 @@ import liquibase.exception.LiquibaseException; import liquibase.resource.ClassLoaderResourceAccessor; import org.h2.jdbcx.JdbcConnectionPool; +import org.jooq.SQLDialect; import org.jooq.impl.DSL; import org.junit.jupiter.api.Test; @@ -80,7 +81,8 @@ public void smokin() throws Exception { var identifierA = stereotomyA.newIdentifier(); try (var connection = connectionPoolA.getConnection()) { - KerlDHT.updateLocationHash(identifierA.getIdentifier(), digestAlgorithm, DSL.using(connection)); + KerlDHT.updateLocationHash(identifierA.getIdentifier(), digestAlgorithm, + DSL.using(connection, SQLDialect.H2)); } identifierA.rotate(); @@ -94,7 +96,8 @@ public void smokin() throws Exception { identifierB.rotate(); var digestB = identifierB.getLastEstablishingEvent().getCoordinates().getDigest(); try (var connection = connectionPoolB.getConnection()) { - KerlDHT.updateLocationHash(identifierB.getIdentifier(), digestAlgorithm, DSL.using(connection)); + KerlDHT.updateLocationHash(identifierB.getIdentifier(), digestAlgorithm, + DSL.using(connection, SQLDialect.H2)); } var biffB = spaceB.populate(0x1638, new CombinedIntervals( new KeyInterval(digestAlgorithm.getOrigin(), digestAlgorithm.getLast())), 0.000125);