Skip to content

Commit

Permalink
use Client as committee if fail quorum check. better logging.
Browse files Browse the repository at this point in the history
If quorum too small for consensus, default to client.

2 threads, 4 forks
  • Loading branch information
Hellblazer committed Apr 8, 2024
1 parent eb2f42b commit 73dedb9
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ jobs:
cache: 'maven'
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build with Maven
run: ./mvnw -batch-mode clean install -Ppre --file pom.xml -Dforks=4
run: ./mvnw -batch-mode clean install -Ppre --file pom.xml -T 2 -Dforks=4
60 changes: 36 additions & 24 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,15 @@ public void onFailure() {
@Override
public Block produce(ULong height, Digest prev, Assemble assemble, HashedBlock checkpoint) {
final HashedCertifiedBlock v = view.get();
return Block.newBuilder()
.setHeader(
buildHeader(params.digestAlgorithm(), assemble, prev, height, checkpoint.height(),
checkpoint.hash, v.height(), v.hash))
.setAssemble(assemble)
.build();
var block = Block.newBuilder()
.setHeader(
buildHeader(params.digestAlgorithm(), assemble, prev, height, checkpoint.height(),
checkpoint.hash, v.height(), v.hash))
.setAssemble(assemble)
.build();
log.trace("Produced block: {} height: {} on: {}", block.getBodyCase(), block.getHeader().getHeight(),
params.member().getId());
return block;
}

@Override
Expand All @@ -538,9 +541,9 @@ public Block produce(ULong height, Digest prev, Executions executions, HashedBlo
}

@Override
public void publish(CertifiedBlock cb) {
public void publish(Digest hash, CertifiedBlock cb) {
log.info("Publishing: {} hash: {} height: {} certifications: {} on: {}", cb.getBlock().getBodyCase(),
cb.hashCode(), cb.getBlock().getHeader().getHeight(), cb.getCertificationsCount(),
hash, ULong.valueOf(cb.getBlock().getHeader().getHeight()), cb.getCertificationsCount(),
params.member().getId());
combine.publish(cb, true);
}
Expand All @@ -549,7 +552,11 @@ public void publish(CertifiedBlock cb) {
public Block reconfigure(Map<Member, Join> joining, Digest nextViewId, HashedBlock previous,
HashedBlock checkpoint) {
final HashedCertifiedBlock v = view.get();
return CHOAM.reconfigure(nextViewId, joining, previous, pendingView().get(), v, params, checkpoint);
var block = CHOAM.reconfigure(nextViewId, joining, previous, pendingView().get(), v, params,
checkpoint);
log.trace("Produced block: {} height: {} on: {}", block.getBodyCase(), block.getHeader().getHeight(),
params.member().getId());
return block;
}
};
}
Expand Down Expand Up @@ -648,10 +655,11 @@ private void nextView() {
log.error("Unable to generate and sign consensus key on: {}", params.member().getId());
return;
}
var committee = current.get();
log.trace("Generated next view consensus key: {} sig: {} committee: {} on: {}",
params.digestAlgorithm().digest(pubKey.getEncoded()),
params.digestAlgorithm().digest(signed.toSig().toByteString()), current.get(),
params.member().getId());
params.digestAlgorithm().digest(signed.toSig().toByteString()),
committee == null ? "<no formation>" : committee.getClass().getSimpleName(), params.member().getId());
next.set(new nextView(ViewMember.newBuilder()
.setId(params.member().getId().toDigeste())
.setConsensusKey(pubKey)
Expand Down Expand Up @@ -727,22 +735,26 @@ private void reconfigure(Reconfigure reconfigure) {
final HashedCertifiedBlock h = head.get();
view.set(h);
session.setView(h);
try {
if (validators.containsKey(params.member())) {
if (validators.containsKey(params.member())) {
try {
current.set(new Associate(h, validators, currentView));
} else {
} catch (IllegalArgumentException e) {
current.set(new Client(validators, getViewId()));
log.debug("unable to create consensus: {} defaulting to committee: {} on: {}", e.getMessage(),
current.get().getClass().getSimpleName(), params.member().getId());
}
} catch (IllegalArgumentException e) {
log.debug("unable to create consensus: {} committee: {} on: {}", e.getMessage(), current.get(),
params.member().getId());
} else {
current.set(new Client(validators, getViewId()));
}
log.info("Reconfigured to view: {} committee: {} validators: {} on: {}", new Digest(reconfigure.getId()),
current.get(), validators.entrySet()
.stream()
.map(e -> String.format("id: %s key: %s", e.getKey().getId(),
params.digestAlgorithm().digest(e.toString())))
.toList(), params.member().getId());
current.get().getClass().getSimpleName(), validators.entrySet()
.stream()
.map(e -> String.format("id: %s key: %s",
e.getKey().getId(),
params.digestAlgorithm()
.digest(
e.toString())))
.toList(), params.member().getId());
}

private void recover(HashedCertifiedBlock anchor) {
Expand Down Expand Up @@ -965,7 +977,7 @@ private void synchronizedProcess(CertifiedBlock certifiedBlock) {
}
if (!previousBlock.hash.equals(prev)) {
log.error(
"Protocol violation on: {}. New block does not refer to current block hash. Should be {} and next block's prev is {}, current height: {} next height: {} on: {}",
"Protocol violation on: {}. New block does not refer to current block hash. Should be: {} and next block's prev is: {}, current height: {} next height: {} on: {}",
params.member().getId(), previousBlock.hash, prev, prevHeight, hcb.height(), params.member().getId());
return;
}
Expand Down Expand Up @@ -1004,7 +1016,7 @@ public interface BlockProducer {

Block produce(ULong height, Digest prev, Executions executions, HashedBlock checkpoint);

void publish(CertifiedBlock cb);
void publish(Digest hash, CertifiedBlock cb);

Block reconfigure(Map<Member, Join> joining, Digest nextViewId, HashedBlock previous, HashedBlock checkpoint);
}
Expand Down
25 changes: 15 additions & 10 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.salesforce.apollo.choam.support.HashedCertifiedBlock;
import com.salesforce.apollo.choam.support.TxDataSource;
import com.salesforce.apollo.cryptography.Digest;
import com.salesforce.apollo.cryptography.DigestAlgorithm;
import com.salesforce.apollo.ethereal.Config;
import com.salesforce.apollo.ethereal.Config.Builder;
import com.salesforce.apollo.ethereal.Ethereal;
Expand Down Expand Up @@ -163,12 +164,15 @@ private void addReassemble(Reassemble r) {
}

private void create(List<ByteString> preblock, boolean last) {
log.debug("preblock produced, last: {} on: {}", last, params().member().getId());
if (log.isDebugEnabled()) {
log.debug("emit last: {} preblock: {} on: {}", last,
preblock.stream().map(DigestAlgorithm.DEFAULT::digest).toList(), params().member().getId());
}
var aggregate = preblock.stream().map(e -> {
try {
return UnitData.parseFrom(e);
} catch (InvalidProtocolBufferException ex) {
log.error("Error parsing unit data on: {}", params().member().getId());
log.error("Error parsing unit data on: {}", params().member().getId(), ex);
return null;
}
}).filter(Objects::nonNull).toList();
Expand Down Expand Up @@ -218,7 +222,7 @@ private void create(List<ByteString> preblock, boolean last) {
final var p = new PendingBlock(next, new HashMap<>(), new AtomicBoolean());
pending.put(next.hash, p);
p.witnesses.put(params().member(), validation);
log.debug("Created block: {} hash: {} height: {} prev: {} last: {} on: {}", next.block.getBodyCase(),
log.debug("Produced block: {} hash: {} height: {} prev: {} last: {} on: {}", next.block.getBodyCase(),
next.hash, next.height(), lb.hash, last, params().member().getId());
processPendingValidations(next, p);
}
Expand Down Expand Up @@ -268,12 +272,13 @@ private void produceAssemble() {
pending.put(assemble.hash, p);
p.witnesses.put(params().member(), validation);
ds.offer(validation);
log.debug("Produced view assembly: {} block: {} height: {} body: {} from: {} on: {}", nextViewId, assemble.hash,
assemble.height(), assemble.block.getBodyCase(), getViewId(), params().member().getId());
log.debug("Produced block: {} hash: {} height: {} from: {} on: {}", assemble.block.getBodyCase(), assemble.hash,
assemble.height(), getViewId(), params().member().getId());
processPendingValidations(assemble, p);
}

private void publish(PendingBlock p) {
assert p.witnesses.size() >= params().majority() : "Publishing non majority block";
log.debug("Published pending: {} hash: {} height: {} witnesses: {} on: {}", p.block.block.getBodyCase(),
p.block.hash, p.block.height(), p.witnesses.values().size(), params().member().getId());
p.published.set(true);
Expand Down Expand Up @@ -327,9 +332,9 @@ public void assembled() {
p.witnesses.put(params().member(), validation);
ds.offer(validation);
// controller.completeIt();
log.info("Reconfiguration block: {} height: {} slate: {} produced on: {}", reconfiguration.hash,
reconfiguration.height(), slate.keySet().stream().map(m -> m.getId()).sorted().toList(),
params().member().getId());
log.info("Produced: {} hash: {} height: {} slate: {} on: {}", reconfiguration.block.getBodyCase(),
reconfiguration.hash, reconfiguration.height(),
slate.keySet().stream().map(m -> m.getId()).sorted().toList(), params().member().getId());
processPendingValidations(reconfiguration, p);
}

Expand Down Expand Up @@ -370,8 +375,8 @@ public void checkpoint() {
final var p = new PendingBlock(next, new HashMap<>(), new AtomicBoolean());
pending.put(next.hash, p);
p.witnesses.put(params().member(), validation);
log.info("Produced checkpoint: {} height: {} for: {} on: {}", next.hash, next.height(), getViewId(),
params().member().getId());
log.info("Produced: {} hash: {} height: {} for: {} on: {}", next.block.getBodyCase(), next.hash,
next.height(), getViewId(), params().member().getId());
processPendingValidations(next, p);
transitions.checkpointed();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public Block produce(ULong l, Digest hash, Executions executions, HashedBlock ch
}

public void publish(HashedCertifiedBlock block) {
blockProducer.publish(block.certifiedBlock);
blockProducer.publish(block.hash, block.certifiedBlock);
}

public Block reconfigure(Map<Member, Join> aggregate, Digest nextViewId, HashedBlock lastBlock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public Block produce(ULong height, Digest prev, Executions executions, HashedBlo
}

@Override
public void publish(CertifiedBlock cb) {
public void publish(Digest hash, CertifiedBlock cb) {
complete.countDown();
}

Expand Down
8 changes: 0 additions & 8 deletions sql-state/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,6 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@
*/
package com.salesforce.apollo.state;

import ch.qos.logback.classic.Level;
import com.salesforce.apollo.choam.CHOAM;
import com.salesforce.apollo.context.DynamicContext;
import com.salesforce.apollo.utils.Utils;
import org.joou.ULong;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

Expand All @@ -25,7 +22,7 @@
public class CheckpointBootstrapTest extends AbstractLifecycleTest {

static {
((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(CHOAM.class)).setLevel(Level.INFO);
// ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(CHOAM.class)).setLevel(Level.INFO);
// ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(GenesisAssembly.class)).setLevel(Level.TRACE);
// ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(ViewAssembly.class)).setLevel(Level.TRACE);
// ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Producer.class)).setLevel(Level.TRACE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
*/
package com.salesforce.apollo.state;

import ch.qos.logback.classic.Level;
import com.salesforce.apollo.choam.CHOAM;
import com.salesforce.apollo.context.DynamicContext;
import com.salesforce.apollo.utils.Utils;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -21,7 +18,7 @@
public class GenesisBootstrapTest extends AbstractLifecycleTest {

static {
((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(CHOAM.class)).setLevel(Level.TRACE);
// ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(CHOAM.class)).setLevel(Level.TRACE);
// ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(GenesisAssembly.class)).setLevel(Level.TRACE);
// ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(ViewAssembly.class)).setLevel(Level.TRACE);
// ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Producer.class)).setLevel(Level.TRACE);
Expand Down
12 changes: 12 additions & 0 deletions sql-state/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.choam.support.BatchingQueue" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.choam.support.TxDataSource" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.choam.support.OneShot" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.chiralbehaviors.tron" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>
Expand Down

0 comments on commit 73dedb9

Please sign in to comment.