Skip to content

Commit

Permalink
~PreBlock (#161)
Browse files Browse the repository at this point in the history
* Remove PreBlock

* fix isolates testing

* remove
  • Loading branch information
Hellblazer authored Nov 17, 2023
1 parent 9189379 commit 27f717e
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.salesforce.apollo.ethereal.Dag;
import com.salesforce.apollo.ethereal.DataSource;
import com.salesforce.apollo.ethereal.Ethereal;
import com.salesforce.apollo.ethereal.Ethereal.PreBlock;
import com.salesforce.apollo.ethereal.memberships.ChRbcGossip;
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.ContextImpl;
Expand Down Expand Up @@ -128,8 +127,8 @@ public void certify() {
}

@Override
public void certify(PreBlock preblock, boolean last) {
preblock.data().stream().map(bs -> {
public void certify(List<ByteString> preblock, boolean last) {
preblock.stream().map(bs -> {
try {
return Validate.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
Expand Down Expand Up @@ -157,8 +156,8 @@ public void gather() {
}

@Override
public void gather(PreBlock preblock, boolean last) {
preblock.data().stream().map(bs -> {
public void gather(List<ByteString> preblock, boolean last) {
preblock.stream().map(bs -> {
try {
return Join.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
Expand All @@ -180,9 +179,8 @@ public void nominate() {
}

@Override
public void nominations(PreBlock preblock, boolean last) {
preblock.data()
.stream()
public void nominations(List<ByteString> preblock, boolean last) {
preblock.stream()
.map(bs -> {
try {
return Validations.parseFrom(bs);
Expand Down
8 changes: 4 additions & 4 deletions choam/src/main/java/com/salesforce/apollo/choam/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -44,7 +45,6 @@
import com.salesforce.apollo.ethereal.Config;
import com.salesforce.apollo.ethereal.Config.Builder;
import com.salesforce.apollo.ethereal.Ethereal;
import com.salesforce.apollo.ethereal.Ethereal.PreBlock;
import com.salesforce.apollo.ethereal.memberships.ChRbcGossip;
import com.salesforce.apollo.membership.Member;

Expand Down Expand Up @@ -122,7 +122,7 @@ public void complete() {
}

@Override
public void create(PreBlock preblock, boolean last) {
public void create(List<ByteString> preblock, boolean last) {
Producer.this.create(preblock, last);
}

Expand Down Expand Up @@ -283,9 +283,9 @@ private void addReassemble(Reassemble r) {
ds.offer(r);
}

private void create(PreBlock preblock, boolean last) {
private void create(List<ByteString> preblock, boolean last) {
log.debug("preblock produced, last: {} on: {}", last, params().member().getId());
var aggregate = preblock.data().stream().map(e -> {
var aggregate = preblock.stream().map(e -> {
try {
return UnitData.parseFrom(e);
} catch (InvalidProtocolBufferException ex) {
Expand Down
72 changes: 33 additions & 39 deletions choam/src/main/java/com/salesforce/apollo/choam/fsm/Driven.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,41 @@
*/
package com.salesforce.apollo.choam.fsm;

import com.chiralbehaviors.tron.Entry;
import com.chiralbehaviors.tron.FsmExecutor;
import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.chiralbehaviors.tron.Entry;
import com.chiralbehaviors.tron.FsmExecutor;
import com.salesforce.apollo.ethereal.Ethereal.PreBlock;
import java.util.List;

/**
* Leaf action interface for the Producer FSM
*
* @author hal.hildebrand
*
* @author hal.hildebrand
*/
public interface Driven {
public static String PERIODIC_VALIDATIONS = "PERIODIC_VALIDATIONS";
public static String SYNC = "SYNC";

void assembled();

void checkAssembly();

void checkpoint();

void complete();

void create(List<ByteString> preblock, boolean last);

void fail();

void produceAssemble();

void reconfigure();

void startProduction();

enum Earner implements Driven.Transitions {
AWAIT_VIEW {
@Override
Expand All @@ -34,7 +55,7 @@ public void checkAssembly() {
}

@Override
public Transitions create(PreBlock preblock, boolean last) {
public Transitions create(List<ByteString> preblock, boolean last) {
context().checkAssembly();
return super.create(preblock, last);
}
Expand All @@ -49,9 +70,7 @@ public Transitions viewComplete() {
context().assembled();
return null;
}
},
CHECKPOINTING {

}, CHECKPOINTING {
@Entry
public void check() {
context().checkpoint();
Expand All @@ -61,10 +80,8 @@ public void check() {
public Transitions checkpointed() {
return SPICE;
}
},
COMPLETE {
},
INITIAL {
}, COMPLETE {
}, INITIAL {
@Override
public Transitions checkpoint() {
return CHECKPOINTING;
Expand All @@ -74,8 +91,7 @@ public Transitions checkpoint() {
public Transitions start() {
return SPICE;
}
},
PROTOCOL_FAILURE {
}, PROTOCOL_FAILURE {
@Override
public Transitions assembled() {
return null;
Expand Down Expand Up @@ -111,8 +127,7 @@ public void terminate() {
log.error("Protocol failure", new Exception("Protocol failure at: " + fsm().getPreviousState()));
context().fail();
}
},
SPICE {
}, SPICE {
@Override
public Transitions assembled() {
context().reconfigure();
Expand Down Expand Up @@ -160,7 +175,7 @@ default Transitions checkpointed() {
throw fsm().invalidTransitionOn();
}

default Transitions create(PreBlock preblock, boolean last) {
default Transitions create(List<ByteString> preblock, boolean last) {
context().create(preblock, last);
return null;
}
Expand Down Expand Up @@ -189,25 +204,4 @@ default Transitions viewComplete() {
throw fsm().invalidTransitionOn();
}
}

public static String PERIODIC_VALIDATIONS = "PERIODIC_VALIDATIONS";
public static String SYNC = "SYNC";

void assembled();

void checkAssembly();

void checkpoint();

void complete();

void create(PreBlock preblock, boolean last);

void fail();

void produceAssemble();

void reconfigure();

void startProduction();
}
18 changes: 10 additions & 8 deletions choam/src/main/java/com/salesforce/apollo/choam/fsm/Genesis.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

import com.chiralbehaviors.tron.Entry;
import com.chiralbehaviors.tron.FsmExecutor;
import com.salesforce.apollo.ethereal.Ethereal.PreBlock;
import com.google.protobuf.ByteString;

import java.util.List;

/**
* @author hal.hildebrand
Expand All @@ -24,7 +26,7 @@ public void certify() {
}

@Override
public Transitions process(PreBlock preblock, boolean last) {
public Transitions process(List<ByteString> preblock, boolean last) {
context().certify(preblock, last);
return last ? PUBLISH : null;
}
Expand All @@ -44,7 +46,7 @@ public Transitions nextEpoch(Integer epoch) {
}

@Override
public Transitions process(PreBlock preblock, boolean last) {
public Transitions process(List<ByteString> preblock, boolean last) {
context().gather(preblock, last);
return null;
}
Expand All @@ -61,7 +63,7 @@ public void nominate() {
}

@Override
public Transitions process(PreBlock preblock, boolean last) {
public Transitions process(List<ByteString> preblock, boolean last) {
context().nominations(preblock, last);
return null;
}
Expand All @@ -81,22 +83,22 @@ default Transitions nextEpoch(Integer epoch) {
throw fsm().invalidTransitionOn();
}

default Transitions process(PreBlock preblock, boolean last) {
default Transitions process(List<ByteString> preblock, boolean last) {
throw fsm().invalidTransitionOn();
}
}

public void certify();

public void certify(PreBlock preblock, boolean last);
public void certify(List<ByteString> preblock, boolean last);

public void gather();

public void gather(PreBlock preblock, boolean last);
public void gather(List<ByteString> preblock, boolean last);

public void nominate();

public void nominations(PreBlock preblock, boolean last);
public void nominations(List<ByteString> preblock, boolean last);

public void publish();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.salesforce.apollo.ethereal.Config;
import com.salesforce.apollo.ethereal.DataSource;
import com.salesforce.apollo.ethereal.Ethereal;
import com.salesforce.apollo.ethereal.Ethereal.PreBlock;
import com.salesforce.apollo.ethereal.memberships.ChRbcGossip;
import com.salesforce.apollo.membership.Context;
import com.salesforce.apollo.membership.ContextImpl;
Expand Down Expand Up @@ -189,7 +188,7 @@ private void initEthereals() {
final short pid = i;
final var member = members.get(i);
ViewAssembly assembly = assemblies.get(member);
BiConsumer<PreBlock, Boolean> blocker = (pb, last) -> {
BiConsumer<List<ByteString>, Boolean> blocker = (pb, last) -> {
assembly.inbound().accept(process(pb, last));
};
var controller = new Ethereal(builder.setSigner(members.get(i)).setPid(pid).build(), 1024 * 1024,
Expand All @@ -202,8 +201,8 @@ private void initEthereals() {
}
}

private List<Reassemble> process(PreBlock preblock, Boolean last) {
return preblock.data().stream().map(bs -> {
private List<Reassemble> process(List<ByteString> preblock, Boolean last) {
return preblock.stream().map(bs -> {
try {
return Reassemble.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
*/
public class Ethereal {

public record PreBlock(List<ByteString> data) {}

record epoch(int id, Dag dag, Adder adder, AtomicBoolean more) {

public void close() {
Expand Down Expand Up @@ -103,21 +101,21 @@ public static ThreadPoolExecutor consumer(String label) {
* that the timing unit is the last unit in the slice, and that random source
* data of the timing unit starts with random bytes from the previous level.
*/
public static PreBlock toPreBlock(List<Unit> round) {
public static List<ByteString> toList(List<Unit> round) {
var data = new ArrayList<ByteString>();
for (Unit u : round) {
if (!u.dealing()) {// data in dealing units doesn't come from users, these are new epoch proofs
data.add(u.data());
}
}
return data.isEmpty() ? null : new PreBlock(data);
return data.isEmpty() ? null : data;
}

private static Consumer<List<Unit>> blocker(BiConsumer<PreBlock, Boolean> blocker, Config config) {
private static Consumer<List<Unit>> blocker(BiConsumer<List<ByteString>, Boolean> blocker, Config config) {
return units -> {
var print = log.isTraceEnabled() ? units.stream().map(e -> e.shortString()).toList() : null;
log.trace("Make pre block: {} on: {}", print, config.logLabel());
PreBlock preBlock = toPreBlock(units);
List<ByteString> preBlock = toList(units);
var timingUnit = units.get(units.size() - 1);
var last = false;
if (timingUnit.level() == config.lastLevel() && timingUnit.epoch() == config.numberOfEpochs() - 1) {
Expand Down Expand Up @@ -149,7 +147,7 @@ private static Consumer<List<Unit>> blocker(BiConsumer<PreBlock, Boolean> blocke
private final AtomicBoolean started = new AtomicBoolean();
private final Consumer<List<Unit>> toPreblock;

public Ethereal(Config config, int maxSerializedSize, DataSource ds, BiConsumer<PreBlock, Boolean> blocker,
public Ethereal(Config config, int maxSerializedSize, DataSource ds, BiConsumer<List<ByteString>, Boolean> blocker,
Consumer<Integer> newEpochAction, ThreadPoolExecutor consumer) {
this(config, maxSerializedSize, ds, blocker(blocker, config), newEpochAction, consumer);
}
Expand Down
Loading

0 comments on commit 27f717e

Please sign in to comment.