Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

thoth-2 #183

Merged
merged 8 commits into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ jobs:
runs-on: ubuntu-latest

steps:
- name: Cache local Maven repository
uses: actions/cache@v3
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- uses: actions/checkout@v3
- uses: graalvm/setup-graalvm@v1
with:
Expand All @@ -23,4 +16,4 @@ jobs:
cache: 'maven'
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build with Maven
run: ./mvnw -batch-mode clean install -Ppre --file pom.xml
run: ./mvnw -T 1C -batch-mode clean install -Ppre --file pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,21 @@ private Join join(Digest v) {
log.error("Empty seeding response on: {}", node.getId());
return;
}
var view = Digest.from(r.getView());
log.info("Rebalancing to cardinality: {} (validate) for: {} context: {} on: {}", r.getCardinality(), view,
context.getId(), node.getId());
this.context.rebalance(r.getCardinality());
node.nextNote(view);

log.debug("Completing redirect to view: {} context: {} sample: {} on: {}", view, this.context.getId(),
r.getSampleCount(), node.getId());
if (timer != null) {
timer.close();
}
join(r, view, duration);
Thread.ofVirtual().start(Utils.wrapped(() -> {
var view = Digest.from(r.getView());
log.info("Rebalancing to cardinality: {} (validate) for: {} context: {} on: {}", r.getCardinality(),
view, context.getId(), node.getId());
this.context.rebalance(r.getCardinality());
node.nextNote(view);

log.debug("Completing redirect to view: {} context: {} sample: {} on: {}", view, this.context.getId(),
r.getSampleCount(), node.getId());
if (timer != null) {
timer.close();
}
join(r, view, duration);
}, log));
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,8 @@ void viewChange(Runnable r) {
try {
r.run();
// log.error("Exit view change on: {}", node.getId());
} catch (Throwable t) {
log.error("Error during view change on: {}", node.getId(), t);
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,43 +300,47 @@ void join(Join join, Digest from, StreamObserver<Gateway> responseObserver, Time
}

BiConsumer<? super Bound, ? super Throwable> join(Duration duration, Timer.Context timer) {
return (bound, t) -> view.viewChange(() -> {
final var hex = bound.view();
return (bound, t) -> {
if (t != null) {
log.error("Failed to join view on: {}", node.getId(), t);
view.stop();
return;
}

log.info("Rebalancing to cardinality: {} (join) for: {} context: {} on: {}", hex.getCardinality(),
hex.compact(), context.getId(), node.getId());
context.rebalance(hex.getCardinality());
context.activate(node);
diadem.set(hex);
currentView.set(hex.compact());

bound.successors().forEach(view::addToView);
bound.initialSeedSet().forEach(view::addToView);

view.reset();

context.allMembers().forEach(Participant::clearAccusations);

view.schedule(duration);

if (timer != null) {
timer.stop();
}

view.introduced();
log.info("Currently joining view: {} seeds: {} cardinality: {} count: {} on: {}", currentView.get(),
bound.successors().size(), cardinality(), context.totalCount(), node.getId());
if (context.totalCount() == cardinality()) {
join();
} else {
populate(new ArrayList<>(context.activeMembers()));
}
});
Thread.ofVirtual().start(Utils.wrapped(() -> {
view.viewChange(() -> {
final var hex = bound.view();

log.info("Rebalancing to cardinality: {} (join) for: {} context: {} on: {}", hex.getCardinality(),
hex.compact(), context.getId(), node.getId());
context.rebalance(hex.getCardinality());
context.activate(node);
diadem.set(hex);
currentView.set(hex.compact());

bound.successors().forEach(view::addToView);
bound.initialSeedSet().forEach(view::addToView);

view.reset();

context.allMembers().forEach(Participant::clearAccusations);

view.schedule(duration);

if (timer != null) {
timer.stop();
}

view.introduced();
log.info("Currently joining view: {} seeds: {} cardinality: {} count: {} on: {}", currentView.get(),
bound.successors().size(), cardinality(), context.totalCount(), node.getId());
if (context.totalCount() == cardinality()) {
join();
} else {
populate(new ArrayList<>(context.activeMembers()));
}
});
}, log));
};
}

void joinUpdatesFor(BloomFilter<Digest> joinBff, Builder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@
* @author hal.hildebrand
*/
public class ProcessDomain extends Domain {
private final static Logger log = LoggerFactory.getLogger(ProcessDomain.class);
protected final KerlDHT dht;
protected final View foundation;
private final EventValidation.DelegatedValidation validations;
private final Verifiers.DelegatedVerifiers verifiers;
private final ProcessDomainParameters parameters;
private final ViewLifecycleListener listener = listener();
private final static Logger log = LoggerFactory.getLogger(ProcessDomain.class);

protected final KerlDHT dht;
protected final View foundation;
private final EventValidation.DelegatedValidation validations;
private final Verifiers.DelegatedVerifiers verifiers;
private final ProcessDomainParameters parameters;
private final ViewLifecycleListener listener = listener();

public ProcessDomain(Digest group, ControlledIdentifierMember member, ProcessDomainParameters pdParams,
Builder builder, Parameters.RuntimeParameters.Builder runtime, InetSocketAddress endpoint,
Expand Down
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<forkCount>2</forkCount>
<reuseForks>true</reuseForks>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
Loading
Loading