Skip to content

Commit

Permalink
misc-3 (#198)
Browse files Browse the repository at this point in the history
* TCP_NODELAY=true

* update H2, update jooq. explicit use of H2 dialect with jooq
  • Loading branch information
Hellblazer authored May 18, 2024
1 parent 0f53aa9 commit 36fcde6
Show file tree
Hide file tree
Showing 15 changed files with 59 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,40 @@
*/
package com.salesforce.apollo.delphinius;

import static com.salesforce.apollo.delphinius.schema.tables.Edge.EDGE;
import org.h2.api.Trigger;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;

import java.sql.Connection;
import java.sql.SQLException;

import org.h2.api.Trigger;
import org.jooq.impl.DSL;
import static com.salesforce.apollo.delphinius.schema.tables.Edge.EDGE;

/**
* @author hal.hildebrand
*
*/
public class DomainMaintenance implements Trigger {

private String type;

@Override
public void fire(Connection conn, Object[] oldRow, Object[] newRow) throws SQLException {
var dsl = DSL.using(conn);
var dsl = DSL.using(conn, SQLDialect.H2);
dsl.deleteFrom(EDGE)
.where(EDGE.TYPE.eq(type))
.and(EDGE.PARENT.eq((Long) oldRow[0]).or(EDGE.CHILD.eq((Long) oldRow[0])))
.execute();
}

@Override
public void init(Connection conn, String schemaName, String triggerName, String tableName, boolean before,
int type) throws SQLException {
public void init(Connection conn, String schemaName, String triggerName, String tableName, boolean before, int type)
throws SQLException {
assert !before && type == DELETE : "this is an after delete trigger";
this.type = switch (tableName.toLowerCase()) {
case "object" -> Oracle.OBJECT_TYPE;
case "relation" -> Oracle.RELATION_TYPE;
case "subject" -> Oracle.SUBJECT_TYPE;
default -> throw new IllegalArgumentException("Unexpected value: " + tableName.toLowerCase());
case "object" -> Oracle.OBJECT_TYPE;
case "relation" -> Oracle.RELATION_TYPE;
case "subject" -> Oracle.SUBJECT_TYPE;
default -> throw new IllegalArgumentException("Unexpected value: " + tableName.toLowerCase());
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.ServerDomainSocketChannel;
Expand Down Expand Up @@ -192,6 +193,7 @@ public void register(SubContext context) {
private ManagedChannel handler(DomainSocketAddress address) {
return NettyChannelBuilder.forAddress(address)
.executor(executor)
.withOption(ChannelOption.TCP_NODELAY, true)
.eventLoopGroup(eventLoopGroup)
.channelType(clientChannelType)
.keepAliveTime(1, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.grpc.netty.DomainSocketNegotiatorHandler.DomainSocketNegotiator;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import org.slf4j.Logger;
Expand Down Expand Up @@ -85,6 +86,7 @@ public RouterImpl router(ServerConnectionCache.Builder cacheBuilder, Supplier<Li
.executor(executor)
.protocolNegotiator(new DomainSocketNegotiator(IMPL))
.channelType(IMPL.getServerDomainSocketChannelClass())
.withChildOption(ChannelOption.TCP_NODELAY, true)
.workerEventLoopGroup(IMPL.getEventLoopGroup())
.bossEventLoopGroup(IMPL.getEventLoopGroup())
.intercept(new DomainSocketServerInterceptor())
Expand Down Expand Up @@ -129,6 +131,7 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
}
};
final var builder = NettyChannelBuilder.forAddress(bridge)
.withOption(ChannelOption.TCP_NODELAY, true)
.executor(executor)
.eventLoopGroup(eventLoopGroup)
.channelType(channelType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.ClientAuth;

import java.net.SocketAddress;
Expand All @@ -33,6 +34,7 @@ public MtlsClient(SocketAddress address, ClientAuth clientAuth, String alias, Cl
Limiter<GrpcClientRequestContext> limiter = new GrpcClientLimiterBuilder().blockOnLimit(false).build();
channel = NettyChannelBuilder.forAddress(address)
.executor(executor)
.withOption(ChannelOption.TCP_NODELAY, true)
.sslContext(supplier.forClient(clientAuth, alias, validator, MtlsServer.TL_SV1_3))
.intercept(new ConcurrencyLimitClientInterceptor(limiter,
() -> Status.RESOURCE_EXHAUSTED.withDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.grpc.netty.DomainSocketNegotiatorHandler.DomainSocketNegotiator;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;

Expand Down Expand Up @@ -86,6 +87,7 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
};
return NettyChannelBuilder.forAddress(address)
.executor(executor)
.withOption(ChannelOption.TCP_NODELAY, true)
.eventLoopGroup(eventLoopGroup)
.channelType(channelType)
.keepAliveTime(keepAlive.toNanos(), TimeUnit.NANOSECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -114,6 +115,7 @@ public void smokin() throws Exception {

private ManagedChannel handler(DomainSocketAddress address) {
return NettyChannelBuilder.forAddress(address)
.withOption(ChannelOption.TCP_NODELAY, true)
.executor(executor)
.eventLoopGroup(eventLoopGroup)
.channelType(channelType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import org.joou.ULong;
Expand Down Expand Up @@ -148,6 +149,7 @@ public void smokin() throws Exception {

private ManagedChannel handler(DomainSocketAddress address) {
return NettyChannelBuilder.forAddress(address)
.withOption(ChannelOption.TCP_NODELAY, true)
.executor(executor)
.eventLoopGroup(eventLoopGroup)
.channelType(channelType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.grpc.netty.DomainSocketNegotiatorHandler;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import org.joou.ULong;
Expand Down Expand Up @@ -84,6 +85,7 @@ public ProcessContainerDomain(Digest group, ControlledIdentifierMember member, P
.protocolNegotiator(
new DomainSocketNegotiatorHandler.DomainSocketNegotiator(
IMPL))
.withChildOption(ChannelOption.TCP_NODELAY, true)
.channelType(IMPL.getServerDomainSocketChannelClass())
.workerEventLoopGroup(portalEventLoopGroup)
.bossEventLoopGroup(portalEventLoopGroup)
Expand All @@ -94,6 +96,7 @@ public ProcessContainerDomain(Digest group, ControlledIdentifierMember member, P
outerContextService = NettyServerBuilder.forAddress(outerContextEndpoint)
.protocolNegotiator(
new DomainSocketNegotiatorHandler.DomainSocketNegotiator(IMPL))
.withChildOption(ChannelOption.TCP_NODELAY, true)
.channelType(IMPL.getServerDomainSocketChannelClass())
.addService(new DemesneKERLServer(dht, null))
.addService(outerContextService())
Expand Down Expand Up @@ -207,6 +210,7 @@ protected void stopServices() {

private ManagedChannel handler(DomainSocketAddress address) {
return NettyChannelBuilder.forAddress(address)
.withOption(ChannelOption.TCP_NODELAY, true)
.executor(executor)
.eventLoopGroup(clientEventLoopGroup)
.channelType(channelType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import org.slf4j.Logger;
Expand Down Expand Up @@ -189,6 +190,7 @@ private CachingKERL kerlFrom(File address) {
ManagedChannel channel = null;
try {
channel = NettyChannelBuilder.forAddress(serverAddress)
.withOption(ChannelOption.TCP_NODELAY, true)
.executor(executor)
.intercept(clientInterceptor(kerlContext))
.eventLoopGroup(eventLoopGroup)
Expand All @@ -210,6 +212,7 @@ private CachingKERL kerlFrom(File address) {

private OuterContextClient outerFrom(File address) {
return new OuterContextClient(NettyChannelBuilder.forAddress(new DomainSocketAddress(address))
.withOption(ChannelOption.TCP_NODELAY, true)
.executor(executor)
.intercept(clientInterceptor(context.getId()))
.eventLoopGroup(eventLoopGroup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.ServerDomainSocketChannel;
Expand Down Expand Up @@ -149,7 +150,9 @@ public void portal() throws Exception {
.bossEventLoopGroup(
IMPL.getEventLoopGroup())
.intercept(
new DomainSocketServerInterceptor()),
new DomainSocketServerInterceptor())
.withChildOption(
ChannelOption.TCP_NODELAY, true),
s -> handler(portalEndpoint), bridge, Duration.ofMillis(1), s -> routes.get(s));

final var endpoint1 = new DomainSocketAddress(Path.of("target").resolve(UUID.randomUUID().toString()).toFile());
Expand Down Expand Up @@ -280,6 +283,7 @@ public void register(SubContext context) {

private ManagedChannel handler(DomainSocketAddress address) {
return NettyChannelBuilder.forAddress(address)
.withOption(ChannelOption.TCP_NODELAY, true)
.executor(executor)
.eventLoopGroup(eventLoopGroup)
.channelType(clientChannelType)
Expand Down
9 changes: 7 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
<micrometer.version>1.9.5</micrometer.version>
<jackson.version>2.15.2</jackson.version>
<dropwizard.version>4.0.5</dropwizard.version>
<h2.version>2.2.220</h2.version>
<jooq.version>3.17.2</jooq.version>
<h2.version>2.2.224</h2.version>
<jooq.version>3.18.15</jooq.version>
<bc.version>1.78</bc.version>
<logback.version>1.4.12</logback.version>
<grpc.version>1.62.2</grpc.version>
Expand Down Expand Up @@ -553,6 +553,11 @@
<version>${graal.vm.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<version>4.0.2</version>
</dependency>

<!-- Test dependencies only below this line! -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException;
import org.jooq.DSLContext;
import org.jooq.Record1;
import org.jooq.SQLDialect;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.DSL;
import org.joou.ULong;
Expand Down Expand Up @@ -70,7 +71,7 @@ abstract public class UniKERL implements DigestKERL {

public UniKERL(Connection connection, DigestAlgorithm digestAlgorithm) {
this.digestAlgorithm = digestAlgorithm;
this.dsl = DSL.using(connection);
this.dsl = DSL.using(connection, SQLDialect.H2);
processor = new KeyEventProcessor(this);
}

Expand Down Expand Up @@ -243,7 +244,7 @@ public static void appendAttachments(Connection connection, List<byte[]> attachm
log.error("Error deserializing attachment event", e);
return;
}
append(DSL.using(connection), event);
append(DSL.using(connection, SQLDialect.H2), event);
});
}

Expand Down Expand Up @@ -363,7 +364,7 @@ public static void initialize(DSLContext dsl) {
}

public static void initializeKERL(Connection connection) {
initialize(DSL.using(connection));
initialize(DSL.using(connection, SQLDialect.H2));
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion thoth/src/main/java/com/salesforce/apollo/thoth/KerlDHT.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import liquibase.ui.UIService;
import org.h2.jdbcx.JdbcConnectionPool;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.joou.ULong;
import org.slf4j.Logger;
Expand Down Expand Up @@ -956,7 +957,7 @@ private void reconcile(ScheduledExecutorService scheduler, Duration duration) {

private void updateLocationHash(Identifier identifier) {
try (var connection = connectionPool.getConnection()) {
var dsl = DSL.using(connection);
var dsl = DSL.using(connection, SQLDialect.H2);
updateLocationHash(identifier, kerl.getDigestAlgorithm(), dsl);
} catch (SQLException e) {
log.error("Cannot update location hash for: {} on: {}", identifier, member.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.h2.jdbcx.JdbcConnectionPool;
import org.jooq.DSLContext;
import org.jooq.Record1;
import org.jooq.SQLDialect;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.DSL;
import org.joou.ULong;
Expand Down Expand Up @@ -216,7 +217,7 @@ public static void upsert(DSLContext dsl, Validations validations, Digest member
public Biff populate(long seed, CombinedIntervals intervals, double fpr) {
DigestBloomFilter bff = new DigestBloomFilter(seed, Math.max(cardinality(), 100), fpr);
try (var connection = connectionPool.getConnection()) {
var dsl = DSL.using(connection);
var dsl = DSL.using(connection, SQLDialect.H2);
eventDigestsIn(intervals, dsl).forEach(d -> {
log.trace("Adding reconcile digest: {} on: {}", d, member);
bff.add(d);
Expand All @@ -239,7 +240,7 @@ public Update.Builder reconcile(Intervals intervals, DigestKERL kerl) {
var biff = BloomFilter.from(intervals.getHave());
var update = Update.newBuilder();
try (var connection = connectionPool.getConnection()) {
var dsl = DSL.using(connection);
var dsl = DSL.using(connection, SQLDialect.H2);
intervals.getIntervalsList()
.stream()
.map(KeyInterval::new)
Expand Down Expand Up @@ -274,7 +275,7 @@ public void update(List<KeyEventWithAttachmentAndValidations_> events, KERL.Appe
final var digestAlgorithm = kerl.getDigestAlgorithm();

try (var connection = connectionPool.getConnection()) {
var dsl = DSL.using(connection);
var dsl = DSL.using(connection, SQLDialect.H2);
dsl.transaction(ctx -> {
var context = DSL.using(ctx);
for (var evente_ : events) {
Expand All @@ -298,7 +299,7 @@ public void update(List<KeyEventWithAttachmentAndValidations_> events, KERL.Appe
// the estimated cardinality of the number of key events
private int cardinality() {
try (var connection = connectionPool.getConnection()) {
var dsl = DSL.using(connection);
var dsl = DSL.using(connection, SQLDialect.H2);
return dsl.fetchCount(dsl.selectFrom(IDENTIFIER));
} catch (SQLException e) {
log.error("Unable to provide estimated cardinality, cannot acquire JDBC connection on: {}", member, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import liquibase.exception.LiquibaseException;
import liquibase.resource.ClassLoaderResourceAccessor;
import org.h2.jdbcx.JdbcConnectionPool;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -80,7 +81,8 @@ public void smokin() throws Exception {

var identifierA = stereotomyA.newIdentifier();
try (var connection = connectionPoolA.getConnection()) {
KerlDHT.updateLocationHash(identifierA.getIdentifier(), digestAlgorithm, DSL.using(connection));
KerlDHT.updateLocationHash(identifierA.getIdentifier(), digestAlgorithm,
DSL.using(connection, SQLDialect.H2));
}

identifierA.rotate();
Expand All @@ -94,7 +96,8 @@ public void smokin() throws Exception {
identifierB.rotate();
var digestB = identifierB.getLastEstablishingEvent().getCoordinates().getDigest();
try (var connection = connectionPoolB.getConnection()) {
KerlDHT.updateLocationHash(identifierB.getIdentifier(), digestAlgorithm, DSL.using(connection));
KerlDHT.updateLocationHash(identifierB.getIdentifier(), digestAlgorithm,
DSL.using(connection, SQLDialect.H2));
}
var biffB = spaceB.populate(0x1638, new CombinedIntervals(
new KeyInterval(digestAlgorithm.getOrigin(), digestAlgorithm.getLast())), 0.000125);
Expand Down

0 comments on commit 36fcde6

Please sign in to comment.