Skip to content

Commit

Permalink
Update parent (#376)
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v authored Oct 1, 2022
1 parent 3965890 commit afbc74c
Show file tree
Hide file tree
Showing 15 changed files with 39 additions and 253 deletions.
16 changes: 12 additions & 4 deletions cluster-api/src/main/java/io/scalecube/cluster/Member.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.ObjectOutput;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.UUID;

/**
* Cluster member which represents node in the cluster and contains its id and address. This class
Expand Down Expand Up @@ -73,8 +74,6 @@ public String namespace() {
* from other cluster members.
*
* @see io.scalecube.cluster.transport.api.TransportConfig#port(int)
* @see ClusterConfig#containerHost(String)
* @see ClusterConfig#containerPort(Integer)
* @return member address
*/
public Address address() {
Expand Down Expand Up @@ -131,13 +130,22 @@ public void readExternal(ObjectInput in) throws IOException {
this.namespace = in.readUTF();
}

private static String stringifyId(String id) {
try {
final UUID uuid = UUID.fromString(id);
return Long.toHexString(uuid.getMostSignificantBits() & Long.MAX_VALUE);
} catch (Exception ex) {
return id;
}
}

@Override
public String toString() {
StringJoiner stringJoiner = new StringJoiner(":");
if (alias == null) {
return stringJoiner.add(namespace).add(id + "@" + address).toString();
return stringJoiner.add(namespace).add(stringifyId(id) + "@" + address).toString();
} else {
return stringJoiner.add(namespace).add(alias).add(id + "@" + address).toString();
return stringJoiner.add(namespace).add(alias).add(stringifyId(id) + "@" + address).toString();
}
}
}
8 changes: 2 additions & 6 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ public final class ClusterImpl implements Cluster {
private MembershipProtocolImpl membership;
private MetadataStore metadataStore;
private Scheduler scheduler;
private CorrelationIdGenerator cidGenerator;
private ClusterMonitorModel.Builder monitorModelBuilder;

public ClusterImpl() {
Expand Down Expand Up @@ -255,7 +254,6 @@ private Mono<Cluster> doStart0() {
localMember = createLocalMember(boundTransport.address());
transport = new SenderAwareTransport(boundTransport, localMember.address());

cidGenerator = new CorrelationIdGenerator(localMember.id());
scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true);
monitorModelBuilder = new ClusterMonitorModel.Builder();

Expand All @@ -265,8 +263,7 @@ private Mono<Cluster> doStart0() {
transport,
membershipSink.asFlux().onBackpressureBuffer(),
config.failureDetectorConfig(),
scheduler,
cidGenerator);
scheduler);

gossip =
new GossipProtocolImpl(
Expand All @@ -278,7 +275,7 @@ private Mono<Cluster> doStart0() {

metadataStore =
new MetadataStoreImpl(
localMember, transport, config.metadata(), config, scheduler, cidGenerator);
localMember, transport, config.metadata(), config, scheduler);

membership =
new MembershipProtocolImpl(
Expand All @@ -289,7 +286,6 @@ private Mono<Cluster> doStart0() {
metadataStore,
config,
scheduler,
cidGenerator,
monitorModelBuilder);

actionsDisposables.add(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;

import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.PingData.AckType;
import io.scalecube.cluster.membership.MemberStatus;
Expand All @@ -16,6 +15,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
Expand All @@ -41,7 +41,6 @@ public final class FailureDetectorImpl implements FailureDetector {
private final Member localMember;
private final Transport transport;
private final FailureDetectorConfig config;
private final CorrelationIdGenerator cidGenerator;

// State

Expand Down Expand Up @@ -69,21 +68,18 @@ public final class FailureDetectorImpl implements FailureDetector {
* @param membershipProcessor membership event processor
* @param config failure detector settings
* @param scheduler scheduler
* @param cidGenerator correlationId generator
*/
public FailureDetectorImpl(
Member localMember,
Transport transport,
Flux<MembershipEvent> membershipProcessor,
FailureDetectorConfig config,
Scheduler scheduler,
CorrelationIdGenerator cidGenerator) {
Scheduler scheduler) {

this.localMember = Objects.requireNonNull(localMember);
this.transport = Objects.requireNonNull(transport);
this.config = Objects.requireNonNull(config);
this.scheduler = Objects.requireNonNull(scheduler);
this.cidGenerator = Objects.requireNonNull(cidGenerator);

// Subscribe
actionsDisposables.addAll(
Expand Down Expand Up @@ -144,7 +140,7 @@ private void doPing() {
}

// Send ping
String cid = cidGenerator.nextCid();
String cid = UUID.randomUUID().toString();
PingData pingData = new PingData(localMember, pingMember);
Message pingMsg = Message.withData(pingData).qualifier(PING).correlationId(cid).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.FailureDetector;
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
Expand Down Expand Up @@ -35,6 +34,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -80,7 +80,6 @@ private enum MembershipUpdateReason {
private final FailureDetector failureDetector;
private final GossipProtocol gossipProtocol;
private final MetadataStore metadataStore;
private final CorrelationIdGenerator cidGenerator;
private final ClusterMonitorModel.Builder monitorModelBuilder;

// State
Expand Down Expand Up @@ -112,7 +111,6 @@ private enum MembershipUpdateReason {
* @param metadataStore metadata store
* @param config cluster config parameters
* @param scheduler scheduler
* @param cidGenerator correlation id generator
* @param monitorModelBuilder monitor model builder
*/
public MembershipProtocolImpl(
Expand All @@ -123,7 +121,6 @@ public MembershipProtocolImpl(
MetadataStore metadataStore,
ClusterConfig config,
Scheduler scheduler,
CorrelationIdGenerator cidGenerator,
ClusterMonitorModel.Builder monitorModelBuilder) {

this.transport = Objects.requireNonNull(transport);
Expand All @@ -132,7 +129,6 @@ public MembershipProtocolImpl(
this.metadataStore = Objects.requireNonNull(metadataStore);
this.localMember = Objects.requireNonNull(localMember);
this.scheduler = Objects.requireNonNull(scheduler);
this.cidGenerator = Objects.requireNonNull(cidGenerator);
this.monitorModelBuilder = Objects.requireNonNull(monitorModelBuilder);
this.membershipConfig = Objects.requireNonNull(config).membershipConfig();
this.failureDetectorConfig = Objects.requireNonNull(config).failureDetectorConfig();
Expand Down Expand Up @@ -277,7 +273,8 @@ private void start0(MonoSink<Object> sink) {
.map(
address ->
transport
.requestResponse(address, prepareSyncDataMsg(SYNC, cidGenerator.nextCid()))
.requestResponse(
address, prepareSyncDataMsg(SYNC, UUID.randomUUID().toString()))
.doOnError(
ex ->
LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.scalecube.cluster.metadata;

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
Expand All @@ -12,6 +11,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
Expand All @@ -36,7 +36,6 @@ public class MetadataStoreImpl implements MetadataStore {
private final Member localMember;
private final Transport transport;
private final ClusterConfig config;
private final CorrelationIdGenerator cidGenerator;

// State

Expand All @@ -58,20 +57,17 @@ public class MetadataStoreImpl implements MetadataStore {
* @param localMetadata local metadata (optional)
* @param config config
* @param scheduler scheduler
* @param cidGenerator correlationId generator
*/
public MetadataStoreImpl(
Member localMember,
Transport transport,
Object localMetadata,
ClusterConfig config,
Scheduler scheduler,
CorrelationIdGenerator cidGenerator) {
Scheduler scheduler) {
this.localMember = Objects.requireNonNull(localMember);
this.transport = Objects.requireNonNull(transport);
this.config = Objects.requireNonNull(config);
this.scheduler = Objects.requireNonNull(scheduler);
this.cidGenerator = Objects.requireNonNull(cidGenerator);
this.localMetadata = localMetadata; // optional
}

Expand Down Expand Up @@ -151,7 +147,7 @@ public ByteBuffer removeMetadata(Member member) {
public Mono<ByteBuffer> fetchMetadata(Member member) {
return Mono.defer(
() -> {
final String cid = cidGenerator.nextCid();
final String cid = UUID.randomUUID().toString();
final Address targetAddress = member.address();

LOGGER.debug("[{}][{}] Getting metadata for member {}", localMember, cid, member);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.scalecube.cluster.BaseTest;
import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MemberStatus;
import io.scalecube.cluster.membership.MembershipEvent;
Expand Down Expand Up @@ -419,10 +418,7 @@ private FailureDetectorImpl createFd(
.map(address -> new Member("member-" + address.port(), null, address, NAMESPACE))
.map(member -> MembershipEvent.createAdded(member, null, 0));

CorrelationIdGenerator cidGenerator = new CorrelationIdGenerator(localMember.id());

return new FailureDetectorImpl(
localMember, transport, membershipFlux, config, scheduler, cidGenerator);
return new FailureDetectorImpl(localMember, transport, membershipFlux, config, scheduler);
}

private void start(List<FailureDetectorImpl> fdetectors) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import io.scalecube.cluster.BaseTest;
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.fdetector.FailureDetectorImpl;
import io.scalecube.cluster.gossip.GossipProtocolImpl;
Expand Down Expand Up @@ -1129,16 +1128,13 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf

Sinks.Many<MembershipEvent> membershipProcessor = Sinks.many().multicast().directBestEffort();

CorrelationIdGenerator cidGenerator = new CorrelationIdGenerator(localMember.id());

FailureDetectorImpl failureDetector =
new FailureDetectorImpl(
localMember,
transport,
membershipProcessor.asFlux().onBackpressureBuffer(),
config.failureDetectorConfig(),
scheduler,
cidGenerator);
scheduler);

GossipProtocolImpl gossipProtocol =
new GossipProtocolImpl(
Expand All @@ -1149,7 +1145,7 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf
scheduler);

MetadataStoreImpl metadataStore =
new MetadataStoreImpl(localMember, transport, null, config, scheduler, cidGenerator);
new MetadataStoreImpl(localMember, transport, null, config, scheduler);

MembershipProtocolImpl membership =
new MembershipProtocolImpl(
Expand All @@ -1160,7 +1156,6 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf
metadataStore,
config,
scheduler,
cidGenerator,
new ClusterMonitorModel.Builder());

membership
Expand Down
19 changes: 0 additions & 19 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,4 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
8 changes: 0 additions & 8 deletions examples/scripts/issues/187/README

This file was deleted.

26 changes: 0 additions & 26 deletions examples/scripts/issues/187/node-i-th.sh

This file was deleted.

Loading

0 comments on commit afbc74c

Please sign in to comment.