Skip to content

Commit

Permalink
hallelujah! build runs green
Browse files Browse the repository at this point in the history
View change protocol now stable.
  • Loading branch information
Hellblazer committed Apr 28, 2024
1 parent b530290 commit 1782b3f
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ private void checkpointCompletion(int threshold, Initial mostRecent) {
.filter(cb -> cb.getBlock().hasReconfigure())
.map(cb -> new HashedCertifiedBlock(params.digestAlgorithm(), cb))
.forEach(reconfigure -> store.put(reconfigure));
scheduleViewChainCompletion(new AtomicReference<>(checkpointView.height()), ULong.valueOf(0));
var lastReconfig = ULong.valueOf(checkpointView.block.getHeader().getLastReconfig());
var zero = ULong.valueOf(0);
if (lastReconfig.equals(zero)) {
viewChainSynchronized.complete(true);
} else {
scheduleViewChainCompletion(new AtomicReference<>(lastReconfig), zero);
}
}

private boolean completeAnchor(Optional<Blocks> futureSailor, AtomicReference<ULong> start, ULong end,
Expand Down Expand Up @@ -214,7 +220,9 @@ private Blocks completeViewChain(Terminal link, AtomicReference<ULong> start, UL
long seed = Entropy.nextBitsStreamLong();
ULongBloomFilter blocksBff = new BloomFilter.ULongBloomFilter(seed, params.bootstrap().maxViewBlocks() * 2,
params.combine().falsePositiveRate());
start.set(store.lastViewChainFrom(start.get()));
var lastViewChain = store.lastViewChainFrom(start.get());
assert lastViewChain != null : "last view chain from: " + start.get() + " is null";
start.set(lastViewChain);
store.viewChainFrom(start.get(), end).forEachRemaining(h -> blocksBff.add(h));
BlockReplication replication = BlockReplication.newBuilder()
.setBlocksBff(blocksBff.toBff())
Expand Down Expand Up @@ -468,7 +476,7 @@ private void validateAnchor() {
try {
store.validate(anchor.height(), to);
anchorSynchronized.complete(true);
log.info("Anchor chain to checkpoint synchronized on: {}", params.member().getId());
log.info("Anchor chain to checkpoint synchronized to: {} on: {}", to, params.member().getId());
} catch (Throwable e) {
log.error("Anchor chain from: {} to: {} does not validate on: {}", anchor.height(), to,
params.member().getId(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public byte[] block(ULong height) {
public Iterator<ULong> blocksFrom(ULong from, ULong to, int max) {
return new Iterator<>() {
ULong next;
int remaining = max;
int remaining = max;

{
next = from == null ? ULong.valueOf(0) : from;
Expand Down Expand Up @@ -247,7 +247,7 @@ public ULong lastViewChainFrom(ULong height) {
}
next = viewChain.get(next);
}
return last;
return next == null ? last : next;
}

public void put(HashedCertifiedBlock cb) {
Expand Down Expand Up @@ -353,8 +353,8 @@ public Iterator<ULong> viewChainFrom(ULong from, ULong to) {
ULong next;

{
next = viewChain.get(from);
if (!viewChain.containsKey(next)) {
next = from;
if (!viewChain.containsKey(from)) {
next = null;
}
}
Expand Down Expand Up @@ -412,10 +412,12 @@ private void put(Digest hash, Block block) {
hashes.put(height, hash);
hashToHeight.put(hash, height);
if (block.hasReconfigure() || block.hasGenesis()) {
log.trace("insert view chain: {}:{}", height, hash);
viewChain.put(ULong.valueOf(block.getHeader().getHeight()),
ULong.valueOf(block.getHeader().getLastReconfig()));
} else {
log.trace("insert: {}:{}", height, hash);
}
log.trace("insert: {}:{}", height, hash);
}

private <T> T transactionally(Callable<T> action) throws ExecutionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
class Transactioneer {
private final static Random entropy = new Random();
private final static Logger log = LoggerFactory.getLogger(Transactioneer.class);
private final Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private final ScheduledExecutorService scheduler;
private final AtomicInteger completed = new AtomicInteger();
private final CountDownLatch countdown;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void smoke() throws Exception {
var context = new StaticContext<Member>(DigestAlgorithm.DEFAULT.getOrigin(), 0.2, members, 3);
TestChain testChain = new TestChain(bootstrapStore);
testChain.genesis()
.checkpoint()
.userBlocks(10)
.viewChange()
.userBlocks(10)
Expand Down
4 changes: 2 additions & 2 deletions choam/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.chiralbehaviors.tron" level="trace" additivity="false">
<logger name="com.chiralbehaviors.tron" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.choam" level="trace" additivity="false">
<logger name="com.salesforce.apollo.choam.CHOAM" level="info" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

Expand Down
8 changes: 8 additions & 0 deletions sql-state/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -81,9 +83,10 @@ abstract public class AbstractLifecycleTest {
protected SigningMember testSubject;
protected int toleranceLevel;
DynamicContextImpl<Member> context;
private File baseDir;
private File checkpointDirBase;
private List<Transactioneer> transactioneers;
private File baseDir;
private File checkpointDirBase;
private List<Transactioneer> transactioneers;
private ScheduledExecutorService scheduler;

{
var txns = MigrationTest.initializeBookSchema();
Expand Down Expand Up @@ -115,6 +118,10 @@ public void after() throws Exception {
choams.values().forEach(e -> e.stop());
choams = null;
}
if (scheduler != null) {
scheduler.shutdownNow();
scheduler = null;
}
updaters.values().forEach(up -> up.close());
updaters.clear();
parameters.clear();
Expand All @@ -125,6 +132,7 @@ public void after() throws Exception {

@BeforeEach
public void before() throws Exception {
scheduler = Executors.newScheduledThreadPool(10);
checkpointOccurred = new CountDownLatch(CARDINALITY);
checkpointDirBase = new File("target/ct-chkpoints-" + Entropy.nextBitsStreamLong());
Utils.clean(checkpointDirBase);
Expand Down Expand Up @@ -301,7 +309,8 @@ protected void pre() throws Exception {
.toList()));

var mutator = txneer.getMutator(choams.get(members.getLast().getId()).getSession());
transactioneers.add(new Transactioneer(() -> update(entropy, mutator), mutator, timeout, 1, countdown));
transactioneers.add(
new Transactioneer(scheduler, () -> update(entropy, mutator), mutator, timeout, 1, countdown));
System.out.println("Transaction member: " + members.getLast().getId());
System.out.println("Starting txns");
transactioneers.stream().forEach(e -> e.start());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class CHOAMTest {
private List<SigningMember> members;
private MetricRegistry registry;
private Map<Digest, Router> routers;
private ScheduledExecutorService scheduler;

private static Txn initialInsert() {
return Txn.newBuilder()
Expand All @@ -102,6 +103,10 @@ public void after() throws Exception {
choams.values().forEach(e -> e.stop());
choams = null;
}
if (scheduler != null) {
scheduler.shutdownNow();
scheduler = null;
}
updaters.values().forEach(up -> up.close());
updaters.clear();
members = null;
Expand All @@ -117,6 +122,7 @@ public void after() throws Exception {

@BeforeEach
public void before() throws Exception {
scheduler = Executors.newScheduledThreadPool(10);
registry = new MetricRegistry();
checkpointDirBase = new File("target/ct-chkpoints-" + Entropy.nextBitsStreamLong());
Utils.clean(checkpointDirBase);
Expand Down Expand Up @@ -184,7 +190,7 @@ public void submitMultiplTxn() throws Exception {
var mutator = e.getValue().getMutator(choams.get(e.getKey().getId()).getSession());
for (int i = 0; i < clientCount; i++) {
transactioneers.add(
new Transactioneer(() -> update(entropy, mutator), mutator, timeout, max, countdown));
new Transactioneer(scheduler, () -> update(entropy, mutator), mutator, timeout, max, countdown));
}
});
System.out.println("Starting txns");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@
import java.util.function.Supplier;

class Transactioneer {
private final static Random entropy = new Random();
private final static Logger log = LoggerFactory.getLogger(Transactioneer.class);
private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, Thread.ofVirtual()
.factory());
private final static Random entropy = new Random();
private final static Logger log = LoggerFactory.getLogger(Transactioneer.class);
private final static Executor executor = Executors.newVirtualThreadPerTaskExecutor();

private final Executor executor = Executors.newVirtualThreadPerTaskExecutor();
private final ScheduledExecutorService scheduler;
private final AtomicInteger completed = new AtomicInteger();
private final CountDownLatch countdown;
private final AtomicReference<CompletableFuture> inFlight = new AtomicReference<>();
Expand All @@ -36,7 +35,9 @@ class Transactioneer {
private final Supplier<Txn> update;
private final AtomicBoolean finished = new AtomicBoolean();

public Transactioneer(Supplier<Txn> update, Mutator mutator, Duration timeout, int max, CountDownLatch countdown) {
public Transactioneer(ScheduledExecutorService scheduler, Supplier<Txn> update, Mutator mutator, Duration timeout,
int max, CountDownLatch countdown) {
this.scheduler = scheduler;
this.update = update;
this.timeout = timeout;
this.max = max;
Expand Down
10 changes: 5 additions & 5 deletions sql-state/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.chiralbehaviors.tron" level="trace" additivity="false">
<logger name="com.chiralbehaviors.tron" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

Expand All @@ -36,15 +36,15 @@
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.state" level="trace" additivity="false">
<logger name="com.salesforce.apollo.state" level="info" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.choam" level="trace" additivity="false">
<logger name="com.salesforce.apollo.choam" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.choam.CHOAM" level="trace" additivity="false">
<logger name="com.salesforce.apollo.choam.CHOAM" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

Expand All @@ -60,7 +60,7 @@
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.chiralbehaviors.tron" level="trace" additivity="false">
<logger name="com.chiralbehaviors.tron" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

Expand Down

0 comments on commit 1782b3f

Please sign in to comment.