Skip to content

Commit

Permalink
subtle
Browse files Browse the repository at this point in the history
  • Loading branch information
Hellblazer committed Jul 4, 2024
1 parent bd8393a commit 867de1a
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 173 deletions.
5 changes: 1 addition & 4 deletions choam/src/main/java/com/salesforce/apollo/choam/CHOAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ public String logState() {
public void rotateViewKeys(ViewChange viewChange) {
var context = viewChange.context();
var diadem = viewChange.diadem();
log.trace("Setting RBC Context to: {} on: {}", context, params.member().getId());
((DelegatedContext<Member>) combine.getContext()).setContext(context);
var c = current.get();
if (c != null) {
Expand All @@ -331,10 +332,6 @@ public void rotateViewKeys(ViewChange viewChange) {
pendingViews.clear();
pendingViews.add(diadem, context);
}

log.info("Pushing pending view of: {}, diadem: {} size: {} on: {}", context.getId(), diadem, context.size(),
params.member().getId());
pendingViews.add(diadem, context);
}

public void start() {
Expand Down
17 changes: 11 additions & 6 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -57,6 +58,8 @@ public class Producer {
private final ViewAssembly assembly;
private final int maxEpoch;
private final AtomicBoolean assembled = new AtomicBoolean(false);
private final AtomicInteger epoch = new AtomicInteger(-1);
private final AtomicInteger preblocks = new AtomicInteger();

public Producer(Digest nextViewId, ViewContext view, HashedBlock lastBlock, HashedBlock checkpoint, String label,
ScheduledExecutorService scheduler) {
Expand Down Expand Up @@ -203,9 +206,10 @@ private List<UnitData> aggregate(List<ByteString> preblock) {
}

private void create(List<ByteString> preblock, boolean last) {
var count = preblocks.incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("emit last: {} preblock: {} on: {}", last,
preblock.stream().map(DigestAlgorithm.DEFAULT::digest).toList(), params().member().getId());
log.debug("emit #{} epoch: {} hashes: {} last: {} on: {}", count, epoch,
preblock.stream().map(DigestAlgorithm.DEFAULT::digest).toList(), last, params().member().getId());
}
var aggregate = aggregate(preblock);
processAssemblies(aggregate);
Expand All @@ -220,18 +224,19 @@ private Digest getViewId() {
return view.context().getId();
}

private void newEpoch(Integer epoch) {
private void newEpoch(Integer e) {
serialize.execute(Utils.wrapped(() -> {
log.trace("new epoch: {} on: {}", epoch, params().member().getId());
this.epoch.set(e);
log.trace("new epoch: {} preblocks: {} on: {}", e, preblocks.get(), params().member().getId());
assembly.newEpoch();
var last = epoch >= maxEpoch && assembled.get();
var last = e >= maxEpoch && assembled.get();
if (last) {
controller.completeIt();
Producer.this.transitions.viewComplete();
} else {
ds.reset();
}
transitions.newEpoch(epoch, last);
transitions.newEpoch(e, last);
}, log));
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -36,14 +35,13 @@ public class TxDataSource implements DataSource {

private final static Logger log = LoggerFactory.getLogger(TxDataSource.class);

private final Duration batchInterval;
private final AtomicBoolean draining = new AtomicBoolean();
private final Member member;
private final ChoamMetrics metrics;
private final BatchingQueue<Transaction> processing;
private final BlockingQueue<Assemblies> assemblies = new LinkedBlockingQueue<>();
private final BlockingQueue<Validate> validations = new LinkedBlockingQueue<>();
private volatile Thread blockingThread;
private final Duration batchInterval;
private final AtomicBoolean draining = new AtomicBoolean();
private final Member member;
private final ChoamMetrics metrics;
private final BatchingQueue<Transaction> processing;
private final BlockingQueue<Assemblies> assemblies = new LinkedBlockingQueue<>();
private final BlockingQueue<Validate> validations = new LinkedBlockingQueue<>();

public TxDataSource(Member member, int maxElements, ChoamMetrics metrics, int maxBatchByteSize,
Duration batchInterval, int maxBatchCount) {
Expand All @@ -55,11 +53,6 @@ public TxDataSource(Member member, int maxElements, ChoamMetrics metrics, int ma
}

public void close() {
final var current = blockingThread;
if (current != null) {
current.interrupt();
}
blockingThread = null;
if (metrics != null) {
metrics.dropped(processing.size(), validations.size(), assemblies.size());
}
Expand All @@ -76,38 +69,15 @@ public void drain() {
@Override
public ByteString getData() {
var builder = UnitData.newBuilder();
log.trace("Requesting unit data on: {}", member.getId());
blockingThread = Thread.currentThread();
try {
var r = new ArrayList<Assemblies>();
var v = new ArrayList<Validate>();

if (draining.get()) {
var target = Instant.now().plus(batchInterval);
while (target.isAfter(Instant.now()) && builder.getAssembliesCount() == 0
&& builder.getValidationsCount() == 0) {
// rinse and repeat
r = new ArrayList<>();
assemblies.drainTo(r);
builder.addAllAssemblies(r);

v = new ArrayList<Validate>();
validations.drainTo(v);
builder.addAllValidations(v);

if (builder.getAssembliesCount() != 0 || builder.getValidationsCount() != 0) {
break;
}
var r = new ArrayList<Assemblies>();
assemblies.drainTo(r);
builder.addAllAssemblies(r);

// sleep waiting for input
try {
Thread.sleep(batchInterval.dividedBy(2).toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return ByteString.EMPTY;
}
}
} else {
var v = new ArrayList<Validate>();
validations.drainTo(v);
builder.addAllValidations(v);
if (!draining.get()) {
if (processing.size() > 0 || (validations.isEmpty() || assemblies.isEmpty())) {
try {
var batch = processing.take(batchInterval);
if (batch != null) {
Expand All @@ -118,28 +88,17 @@ public ByteString getData() {
return ByteString.EMPTY;
}
}
}

// One more time into ye breech
r = new ArrayList<>();
assemblies.drainTo(r);
builder.addAllAssemblies(r);

v = new ArrayList<Validate>();
validations.drainTo(v);
builder.addAllValidations(v);

ByteString bs = builder.build().toByteString();
if (metrics != null) {
metrics.publishedBatch(builder.getTransactionsCount(), bs.size(), builder.getValidationsCount(),
builder.getAssembliesCount());
}
log.trace("Unit data: {} txns, {} validations, {} assemblies totalling: {} bytes on: {}",
builder.getTransactionsCount(), builder.getValidationsCount(), builder.getAssembliesCount(),
bs.size(), member.getId());
return bs;
} finally {
blockingThread = null;
ByteString bs = builder.build().toByteString();
if (metrics != null) {
metrics.publishedBatch(builder.getTransactionsCount(), bs.size(), builder.getValidationsCount(),
builder.getAssembliesCount());
}
log.trace("Unit data: {} txns, {} validations, {} assemblies totalling: {} bytes on: {}",
builder.getTransactionsCount(), builder.getValidationsCount(), builder.getAssembliesCount(),
bs.size(), member.getId());
return bs;
}

public int getRemainingReassemblies() {
Expand All @@ -166,8 +125,8 @@ public boolean offer(Transaction txn) {
}
}

public void offer(Validate generateValidation) {
validations.offer(generateValidation);
public boolean offer(Validate generateValidation) {
return validations.offer(generateValidation);
}

public void reset() {
Expand Down
4 changes: 0 additions & 4 deletions choam/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@
<appender-ref ref="STDOUT"/>
</logger>

<logger name="com.salesforce.apollo.choam.support.OneShot" level="warn" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>

<root level="warn">
<appender-ref ref="STDOUT"/>
</root>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private void insert(Unit unit) {
var ep = retrieveEpoch(unit);
if (ep != null) {
ep.adder().produce(unit);
log.debug("Produced: {} on: {}", unit, config.logLabel());
log.debug("Produced: {} {}", unit, config.logLabel());
} else {
log.trace("Unable to retrieve epic for Unit creator: {} epoch: {} height: {} level: {} on: {}",
unit.creator(), unit.epoch(), unit.height(), unit.level(), config.logLabel());
Expand Down
Loading

0 comments on commit 867de1a

Please sign in to comment.