Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Type fields in proto messages-PR-51+ #54

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.PubSubSocketPair;
import de.unistuttgart.isw.sfsc.commonjava.zmq.reactor.Reactor;
import de.unistuttgart.isw.sfsc.commonjava.zmq.reactor.TransportProtocol;
import de.unistuttgart.isw.sfsc.framework.types.SfscId;
import de.unistuttgart.isw.sfsc.framework.types.Topic;
import java.io.File;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -95,17 +97,17 @@ public ControlPlane(Reactor reactor, AdapterParameter parameter)

Hello hello = Hello
.newBuilder()
.setAdapterId(heartbeatParameter.getOutgoingId())
.setHeartbeatTopic(heartbeatParameter.getExpectedIncomingTopic())
.setAdapterId(SfscId.newBuilder().setId(heartbeatParameter.getOutgoingId()).build())
.setHeartbeatTopic(Topic.newBuilder().setTopic(heartbeatParameter.getExpectedIncomingTopic()).build())
.build();
Welcome welcome = Handshaker.handshake(handshakerParameter, pubSubConnection, schedulerService, hello);

heartbeatModule = HeartbeatModule.create(pubSubConnection, schedulerService, heartbeatParameter);
ByteString heartbeatCoreTopic = ByteString.copyFromUtf8(parameter.getHeartbeatCoreTopic());
heartbeatModule.startSession(welcome.getCoreId(), heartbeatCoreTopic, coreId -> coreLostEvent.fire());
heartbeatModule.startSession(welcome.getCoreId().getId(), heartbeatCoreTopic, coreId -> coreLostEvent.fire());
registryModule = RegistryModule.create(registryParameter, pubSubConnection, schedulerService);

adapterInformation = new AdapterInformation(welcome.getCoreId(),
adapterInformation = new AdapterInformation(welcome.getCoreId().getId(),
parameter.getAdapterId(),
parameter.getTransportProtocol());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@
import de.unistuttgart.isw.sfsc.clientserver.protocol.registry.command.CommandReply;
import de.unistuttgart.isw.sfsc.commonjava.util.Handle;
import de.unistuttgart.isw.sfsc.commonjava.util.StoreEvent;
import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public interface RegistryApi {

Future<CommandReply> create(ByteString entry);
Future<CommandReply> create(SfscServiceDescriptor entry);

Future<CommandReply> remove(ByteString entry);
Future<CommandReply> remove(SfscServiceDescriptor entry);

Set<ByteString> getEntries();
Set<SfscServiceDescriptor> getEntries();

Handle addListener(Consumer<StoreEvent<ByteString>> listener);
Handle addListener(Consumer<StoreEvent<SfscServiceDescriptor>> listener);

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable;
import de.unistuttgart.isw.sfsc.commonjava.util.scheduling.Scheduler;
import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.PubSubConnection;
import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor;
import de.unistuttgart.isw.sfsc.framework.types.SfscId;
import java.util.function.Consumer;

final class CommandClient implements NotThrowingAutoCloseable {
Expand All @@ -20,14 +22,14 @@ final class CommandClient implements NotThrowingAutoCloseable {
this.timeoutMs = timeoutMs;
}

void create(ByteString entry, String adapterId, Consumer<ByteString> consumer, Runnable timeoutRunnable) {
ByteString command = CommandRequest.newBuilder().setAdapterId(adapterId).setCreate(entry).build().toByteString();
void create(SfscServiceDescriptor entry, String adapterId, Consumer<ByteString> consumer, Runnable timeoutRunnable) {
ByteString command = CommandRequest.newBuilder().setAdapterId(SfscId.newBuilder().setId(adapterId).build()).setCreateRequest(entry).build().toByteString();
simpleClient.send(serverTopic, command, consumer, timeoutMs, timeoutRunnable);
}

void remove(ByteString entry, String adapterId, Consumer<ByteString> consumer, Runnable timeoutRunnable) {
void remove(SfscServiceDescriptor entry, String adapterId, Consumer<ByteString> consumer, Runnable timeoutRunnable) {
//todo dont consume bytestring but commandreply
ByteString command = CommandRequest.newBuilder().setAdapterId(adapterId).setDelete(entry).build().toByteString();
ByteString command = CommandRequest.newBuilder().setAdapterId(SfscId.newBuilder().setId(adapterId).build()).setDeleteRequest(entry).build().toByteString();
simpleClient.send(serverTopic, command, consumer, timeoutMs, timeoutRunnable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class Registry {

private static final Logger logger = LoggerFactory.getLogger(Registry.class);

private final Listeners<Consumer<StoreEvent<ByteString>>> entryListeners = new Listeners<>();
private final Listeners<Consumer<StoreEvent<SfscServiceDescriptor>>> entryListeners = new Listeners<>();
private final Listeners<Runnable> notificationListeners = new Listeners<>();

private final AtomicLong idCounter = new AtomicLong();
private final Set<ByteString> registry = ConcurrentHashMap.newKeySet();
private final Set<SfscServiceDescriptor> registry = ConcurrentHashMap.newKeySet();

private final Scheduler scheduler;

Expand Down Expand Up @@ -64,13 +65,13 @@ void handleQueryReply(ByteString byteString) {
void modifyRegistry(QueryReply queryReply) {
switch (queryReply.getCreatedOrDeletedOrExpiredOrFutureCase()) {
case CREATED: {
ByteString data = queryReply.getCreated();
SfscServiceDescriptor data = queryReply.getCreated();
registry.add(data);
onStoreEvent(StoreEventType.CREATE, data);
break;
}
case DELETED: {
ByteString data = queryReply.getDeleted();
SfscServiceDescriptor data = queryReply.getDeleted();
registry.remove(data);
onStoreEvent(StoreEventType.DELETE, data);
break;
Expand All @@ -80,13 +81,13 @@ void modifyRegistry(QueryReply queryReply) {

}

void onStoreEvent(StoreEventType type, ByteString data) {
StoreEvent<ByteString> storeEvent = new StoreEvent<>(type, data);
void onStoreEvent(StoreEventType type, SfscServiceDescriptor data) {
StoreEvent<SfscServiceDescriptor> storeEvent = new StoreEvent<>(type, data);
scheduler.execute(() -> entryListeners.forEach(consumer -> consumer.accept(storeEvent)));
}

Handle addEntryListener(Consumer<StoreEvent<ByteString>> listener) {
ReplayingListener<ByteString> replayingListener = new ReplayingListener<>(listener);
Handle addEntryListener(Consumer<StoreEvent<SfscServiceDescriptor>> listener) {
ReplayingListener<SfscServiceDescriptor> replayingListener = new ReplayingListener<>(listener);
Handle handle = entryListeners.add(replayingListener);

replayingListener.prepend(getRegistry());
Expand All @@ -99,7 +100,7 @@ Handle addNotificationListener(Runnable listener) {
return notificationListeners.add(listener);
}

Set<ByteString> getRegistry() {
Set<SfscServiceDescriptor> getRegistry() {
return Collections.unmodifiableSet(registry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -49,26 +50,26 @@ public static RegistryModule create(RegistryParameter parameter, PubSubConnectio
}

@Override
public Future<CommandReply> create(ByteString entry) {
public Future<CommandReply> create(SfscServiceDescriptor entry) {
FutureAdapter<ByteString, CommandReply> future = new FutureAdapter<>(CommandReply::parseFrom, () -> {throw new TimeoutException();});
commandClient.create(entry, params.getAdapterId(), future::handleInput, future::handleError);
return future;
}

@Override
public Future<CommandReply> remove(ByteString entry) {
public Future<CommandReply> remove(SfscServiceDescriptor entry) {
FutureAdapter<ByteString, CommandReply> future = new FutureAdapter<>(CommandReply::parseFrom, () -> {throw new TimeoutException();});
commandClient.remove(entry, params.getAdapterId(), future::handleInput, future::handleError);
return future;
}

@Override
public Set<ByteString> getEntries() {
public Set<SfscServiceDescriptor> getEntries() {
return registry.getRegistry();
}

@Override
public Handle addListener(Consumer<StoreEvent<ByteString>> listener) {
public Handle addListener(Consumer<StoreEvent<SfscServiceDescriptor>> listener) {
return registry.addEntryListener(listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import de.unistuttgart.isw.sfsc.framework.types.SfscId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -48,7 +49,7 @@ void startSession(String remoteId, ByteString remoteTopic, Consumer<String> onDe
void accept(ByteString byteString) {
try {
HeartbeatMessage heartbeat = HeartbeatMessage.parseFrom(byteString);
keepAlive(heartbeat.getId());
keepAlive(heartbeat.getAdapterId().getId());
} catch (InvalidProtocolBufferException e) {
logger.warn("received malformed message", e);
}
Expand All @@ -64,7 +65,7 @@ void keepAlive(String adapterId) {
Handle startHeartbeat(ByteString remoteTopic) {
final String heartbeatId = params.getOutgoingId();
Publisher publisher = new Publisher(pubSubConnection);
Message message = HeartbeatMessage.newBuilder().setId(heartbeatId).build();
Message message = HeartbeatMessage.newBuilder().setAdapterId(SfscId.newBuilder().setId(heartbeatId).build()).build();
Future<?> future = scheduler.scheduleAtFixedRate(() ->
publisher.publish(remoteTopic, message), 0, params.getSendRateMs(), TimeUnit.MILLISECONDS);
return () -> future.cancel(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ public interface SubscriptionTracker {
Handle addOneShotListener(Predicate<StoreEvent<ByteString>> predicate, Runnable runnable);

default Handle addOneShotSubscriptionListener(ByteString topic, Runnable runnable) {
return addOneShotListener(storeEvent -> topic.equals(storeEvent.getData()) && storeEvent.getStoreEventType() == StoreEventType.CREATE, runnable);
return addOneShotListener(storeEvent ->
topic.equals(storeEvent.getData()) && storeEvent.getStoreEventType() == StoreEventType.CREATE, runnable);
}

default Handle addOneShotUnsubscriptionListener(ByteString topic, Runnable runnable) {
return addOneShotListener(storeEvent -> topic.equals(storeEvent.getData()) && storeEvent.getStoreEventType() == StoreEventType.DELETE, runnable);
return addOneShotListener(storeEvent ->
topic.equals(storeEvent.getData()) && storeEvent.getStoreEventType() == StoreEventType.DELETE, runnable);
}

default Awaitable addOneShotSubscriptionListener(ByteString topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.PubSubConnection;
import java.util.function.Consumer;
import java.util.function.Function;
import de.unistuttgart.isw.sfsc.framework.types.SfscId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -44,11 +45,11 @@ public SessionConsumer(Listeners<Consumer<NewSessionEvent>> sessionListeners, Se
public ByteString apply(ByteString byteString) {
try {
Hello hello = Hello.parseFrom(byteString);
String adapterId = hello.getAdapterId();
ByteString adapterHeartbeatTopic = hello.getHeartbeatTopic();
String adapterId = hello.getAdapterId().getId();
ByteString adapterHeartbeatTopic = hello.getHeartbeatTopic().getTopic();
logger.info("new session request from {}", hello.getAdapterId());
sessionListeners.forEach(consumer -> consumer.accept(new NewSessionEvent(adapterId, adapterHeartbeatTopic)));
return Welcome.newBuilder().setCoreId(parameter.getCoreId()).build().toByteString();
return Welcome.newBuilder().setCoreId(SfscId.newBuilder().setId(parameter.getCoreId()).build()).build().toByteString();
} catch (InvalidProtocolBufferException e) {
logger.warn("received malformed message", e);
return ByteString.EMPTY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable;
import de.unistuttgart.isw.sfsc.core.hazelcast.registry.log.RegistryEventLog;
import de.unistuttgart.isw.sfsc.core.hazelcast.registry.replicatedregistry.ReplicatedRegistry;
import de.unistuttgart.isw.sfsc.framework.types.SfscId;
import de.unistuttgart.isw.sfsc.serverserver.registry.RegistryEntry;
import java.util.function.Consumer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -36,14 +37,14 @@ public Handle addEventListener(Consumer<QueryReply> listener) {
}

public CommandReply handleCommand(CommandRequest commandRequest) {
String adapterId = commandRequest.getAdapterId();
String adapterId = commandRequest.getAdapterId().getId();
switch (commandRequest.getCreateOrDeleteCase()) {
case CREATE: {
replicatedRegistry.add(RegistryEntry.newBuilder().setAdapterId(adapterId).setCoreId(coreId).setData(commandRequest.getCreate()).build());
case CREATE_REQUEST: {
replicatedRegistry.add(RegistryEntry.newBuilder().setAdapterId(SfscId.newBuilder().setId(adapterId).build()).setCoreId(SfscId.newBuilder().setId(coreId).build()).setData(commandRequest.getCreateRequest()).build());
break;
}
case DELETE: {
replicatedRegistry.remove(RegistryEntry.newBuilder().setAdapterId(adapterId).setCoreId(coreId).setData(commandRequest.getDelete()).build());
case DELETE_REQUEST: {
replicatedRegistry.remove(RegistryEntry.newBuilder().setAdapterId(SfscId.newBuilder().setId(adapterId).build()).setCoreId(SfscId.newBuilder().setId(coreId).build()).setData(commandRequest.getDeleteRequest()).build());
break;
}
default: {
Expand All @@ -59,7 +60,7 @@ public QueryReply handleQuery(QueryRequest queryRequest) {
}

public void deleteEntries(String adapterId) {
replicatedRegistry.removeAll(entry -> entry.getAdapterId().equals(adapterId));
replicatedRegistry.removeAll(entry -> entry.getAdapterId().getId().equals(adapterId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,7 +45,7 @@ public Handle addListener(Consumer<QueryReply> listener) {
return listeners.add(listener);
}

public void onStoreEvent(StoreEvent<ByteString> storeEvent) {
public void onStoreEvent(StoreEvent<SfscServiceDescriptor> storeEvent) {
long id = idCounter.get();
schedulerService.execute(() -> {
switch (storeEvent.getStoreEventType()) {
Expand All @@ -64,13 +65,13 @@ public void onStoreEvent(StoreEvent<ByteString> storeEvent) {
);
}

void onAdd(long id, ByteString byteString) {
void onAdd(long id, SfscServiceDescriptor byteString) {
QueryReply queryReply = QueryReply.newBuilder().setCreated(byteString).setEventId(id).build();
staging.put(id, queryReply);
processStaging();
}

void onRemove(long id, ByteString byteString) {
void onRemove(long id, SfscServiceDescriptor byteString) {
QueryReply queryReply = QueryReply.newBuilder().setDeleted(byteString).setEventId(id).build();
staging.put(id, queryReply);
processStaging();
Expand All @@ -94,18 +95,19 @@ void processStaging() {
}
}

void discardCreateEvent(ByteString byteString) {
void discardCreateEvent(SfscServiceDescriptor byteString) {
eventLog.entrySet().stream()
.filter(entry -> byteString.equals(entry.getValue().getCreated()))
.filter(entry -> byteString.toByteString().equals(entry.getValue().getCreated().toByteString()))
.map(Entry::getKey)
.findAny()
.ifPresentOrElse(eventLog::remove, () -> logger.warn("Could not discard, add event not found"));
}

public QueryReply handleQueryRequest(QueryRequest queryRequest) {
long id = queryRequest.getEventId();
if (logCounter.get() <= id) {
return QueryReply.newBuilder().setEventId(id).setFuture(Future.getDefaultInstance()).build();
long currentLogCount = logCounter.get();
if (currentLogCount <= id) {
return QueryReply.newBuilder().setEventId(id).setFuture(Future.newBuilder().setNewestValidEventId(currentLogCount)).build();
} else {
return Optional.ofNullable(eventLog.get(id))
.orElseGet(() -> QueryReply.newBuilder().setEventId(id).setExpired(Expired.getDefaultInstance()).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@
import com.hazelcast.map.MapEvent;
import de.unistuttgart.isw.sfsc.commonjava.util.StoreEvent;
import de.unistuttgart.isw.sfsc.commonjava.util.StoreEvent.StoreEventType;
import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor;
import de.unistuttgart.isw.sfsc.serverserver.registry.RegistryEntry;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

class EntryListenerAdapter implements EntryListener<RegistryEntry, Boolean> {

private final ReentrantLock lock = new ReentrantLock(true);
private final Consumer<StoreEvent<ByteString>> registryEventHandler;
private final Consumer<StoreEvent<SfscServiceDescriptor>> registryEventHandler;

EntryListenerAdapter(Consumer<StoreEvent<ByteString>> registryEventHandler) {
EntryListenerAdapter(Consumer<StoreEvent<SfscServiceDescriptor>> registryEventHandler) {
this.registryEventHandler = registryEventHandler;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import de.unistuttgart.isw.sfsc.commonjava.util.Handle;
import de.unistuttgart.isw.sfsc.commonjava.util.ReplayingListener;
import de.unistuttgart.isw.sfsc.commonjava.util.StoreEvent;
import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor;
import de.unistuttgart.isw.sfsc.serverserver.registry.RegistryEntry;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -24,8 +25,8 @@ public ReplicatedRegistry(ReplicatedMap<RegistryEntry, Boolean> replicatedMap) {
this.replicatedMap = replicatedMap;
}

public Handle addListener(Consumer<StoreEvent<ByteString>> listener) {
ReplayingListener<ByteString> replayingListener = new ReplayingListener<>(listener);
public Handle addListener(Consumer<StoreEvent<SfscServiceDescriptor>> listener) {
ReplayingListener<SfscServiceDescriptor> replayingListener = new ReplayingListener<>(listener);
UUID handle = replicatedMap.addEntryListener(new EntryListenerAdapter(replayingListener));

replayingListener.prepend(createStoreEventSnapshot());
Expand All @@ -50,7 +51,7 @@ public void removeAll(Predicate<RegistryEntry> predicate) {
copy.forEach(replicatedMap::remove);
}

Set<ByteString> createStoreEventSnapshot() {
Set<SfscServiceDescriptor> createStoreEventSnapshot() {
return Set.copyOf(replicatedMap.keySet()).stream()
.map(RegistryEntry::getData)
.collect(Collectors.toUnmodifiableSet());
Expand Down
Loading