diff --git a/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/ControlPlane.java b/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/ControlPlane.java index 38869c8..27151af 100644 --- a/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/ControlPlane.java +++ b/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/ControlPlane.java @@ -25,6 +25,8 @@ import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.PubSubSocketPair; import de.unistuttgart.isw.sfsc.commonjava.zmq.reactor.Reactor; import de.unistuttgart.isw.sfsc.commonjava.zmq.reactor.TransportProtocol; +import de.unistuttgart.isw.sfsc.framework.types.SfscId; +import de.unistuttgart.isw.sfsc.framework.types.Topic; import java.io.File; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -95,17 +97,17 @@ public ControlPlane(Reactor reactor, AdapterParameter parameter) Hello hello = Hello .newBuilder() - .setAdapterId(heartbeatParameter.getOutgoingId()) - .setHeartbeatTopic(heartbeatParameter.getExpectedIncomingTopic()) + .setAdapterId(SfscId.newBuilder().setId(heartbeatParameter.getOutgoingId()).build()) + .setHeartbeatTopic(Topic.newBuilder().setTopic(heartbeatParameter.getExpectedIncomingTopic()).build()) .build(); Welcome welcome = Handshaker.handshake(handshakerParameter, pubSubConnection, schedulerService, hello); heartbeatModule = HeartbeatModule.create(pubSubConnection, schedulerService, heartbeatParameter); ByteString heartbeatCoreTopic = ByteString.copyFromUtf8(parameter.getHeartbeatCoreTopic()); - heartbeatModule.startSession(welcome.getCoreId(), heartbeatCoreTopic, coreId -> coreLostEvent.fire()); + heartbeatModule.startSession(welcome.getCoreId().getId(), heartbeatCoreTopic, coreId -> coreLostEvent.fire()); registryModule = RegistryModule.create(registryParameter, pubSubConnection, schedulerService); - adapterInformation = new AdapterInformation(welcome.getCoreId(), + adapterInformation = new AdapterInformation(welcome.getCoreId().getId(), parameter.getAdapterId(), parameter.getTransportProtocol()); diff --git a/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/RegistryApi.java b/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/RegistryApi.java index 116a53b..3ef048b 100644 --- a/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/RegistryApi.java +++ b/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/RegistryApi.java @@ -4,18 +4,19 @@ import de.unistuttgart.isw.sfsc.clientserver.protocol.registry.command.CommandReply; import de.unistuttgart.isw.sfsc.commonjava.util.Handle; import de.unistuttgart.isw.sfsc.commonjava.util.StoreEvent; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor; import java.util.Set; import java.util.concurrent.Future; import java.util.function.Consumer; public interface RegistryApi { - Future create(ByteString entry); + Future create(SfscServiceDescriptor entry); - Future remove(ByteString entry); + Future remove(SfscServiceDescriptor entry); - Set getEntries(); + Set getEntries(); - Handle addListener(Consumer> listener); + Handle addListener(Consumer> listener); } diff --git a/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/registry/CommandClient.java b/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/registry/CommandClient.java index c2acc27..72fc9f4 100644 --- a/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/registry/CommandClient.java +++ b/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/registry/CommandClient.java @@ -6,6 +6,8 @@ import de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable; import de.unistuttgart.isw.sfsc.commonjava.util.scheduling.Scheduler; import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.PubSubConnection; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor; +import de.unistuttgart.isw.sfsc.framework.types.SfscId; import java.util.function.Consumer; final class CommandClient implements NotThrowingAutoCloseable { @@ -20,14 +22,14 @@ final class CommandClient implements NotThrowingAutoCloseable { this.timeoutMs = timeoutMs; } - void create(ByteString entry, String adapterId, Consumer consumer, Runnable timeoutRunnable) { - ByteString command = CommandRequest.newBuilder().setAdapterId(adapterId).setCreate(entry).build().toByteString(); + void create(SfscServiceDescriptor entry, String adapterId, Consumer consumer, Runnable timeoutRunnable) { + ByteString command = CommandRequest.newBuilder().setAdapterId(SfscId.newBuilder().setId(adapterId).build()).setCreateRequest(entry).build().toByteString(); simpleClient.send(serverTopic, command, consumer, timeoutMs, timeoutRunnable); } - void remove(ByteString entry, String adapterId, Consumer consumer, Runnable timeoutRunnable) { + void remove(SfscServiceDescriptor entry, String adapterId, Consumer consumer, Runnable timeoutRunnable) { //todo dont consume bytestring but commandreply - ByteString command = CommandRequest.newBuilder().setAdapterId(adapterId).setDelete(entry).build().toByteString(); + ByteString command = CommandRequest.newBuilder().setAdapterId(SfscId.newBuilder().setId(adapterId).build()).setDeleteRequest(entry).build().toByteString(); simpleClient.send(serverTopic, command, consumer, timeoutMs, timeoutRunnable); } diff --git a/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/registry/Registry.java b/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/registry/Registry.java index 1cfc961..8ed6a01 100644 --- a/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/registry/Registry.java +++ b/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/registry/Registry.java @@ -14,6 +14,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,11 +22,11 @@ final class Registry { private static final Logger logger = LoggerFactory.getLogger(Registry.class); - private final Listeners>> entryListeners = new Listeners<>(); + private final Listeners>> entryListeners = new Listeners<>(); private final Listeners notificationListeners = new Listeners<>(); private final AtomicLong idCounter = new AtomicLong(); - private final Set registry = ConcurrentHashMap.newKeySet(); + private final Set registry = ConcurrentHashMap.newKeySet(); private final Scheduler scheduler; @@ -64,13 +65,13 @@ void handleQueryReply(ByteString byteString) { void modifyRegistry(QueryReply queryReply) { switch (queryReply.getCreatedOrDeletedOrExpiredOrFutureCase()) { case CREATED: { - ByteString data = queryReply.getCreated(); + SfscServiceDescriptor data = queryReply.getCreated(); registry.add(data); onStoreEvent(StoreEventType.CREATE, data); break; } case DELETED: { - ByteString data = queryReply.getDeleted(); + SfscServiceDescriptor data = queryReply.getDeleted(); registry.remove(data); onStoreEvent(StoreEventType.DELETE, data); break; @@ -80,13 +81,13 @@ void modifyRegistry(QueryReply queryReply) { } - void onStoreEvent(StoreEventType type, ByteString data) { - StoreEvent storeEvent = new StoreEvent<>(type, data); + void onStoreEvent(StoreEventType type, SfscServiceDescriptor data) { + StoreEvent storeEvent = new StoreEvent<>(type, data); scheduler.execute(() -> entryListeners.forEach(consumer -> consumer.accept(storeEvent))); } - Handle addEntryListener(Consumer> listener) { - ReplayingListener replayingListener = new ReplayingListener<>(listener); + Handle addEntryListener(Consumer> listener) { + ReplayingListener replayingListener = new ReplayingListener<>(listener); Handle handle = entryListeners.add(replayingListener); replayingListener.prepend(getRegistry()); @@ -99,7 +100,7 @@ Handle addNotificationListener(Runnable listener) { return notificationListeners.add(listener); } - Set getRegistry() { + Set getRegistry() { return Collections.unmodifiableSet(registry); } diff --git a/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/registry/RegistryModule.java b/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/registry/RegistryModule.java index 05f2283..3efefc6 100644 --- a/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/registry/RegistryModule.java +++ b/adapter/src/main/java/de/unistuttgart/isw/sfsc/adapter/control/registry/RegistryModule.java @@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,26 +50,26 @@ public static RegistryModule create(RegistryParameter parameter, PubSubConnectio } @Override - public Future create(ByteString entry) { + public Future create(SfscServiceDescriptor entry) { FutureAdapter future = new FutureAdapter<>(CommandReply::parseFrom, () -> {throw new TimeoutException();}); commandClient.create(entry, params.getAdapterId(), future::handleInput, future::handleError); return future; } @Override - public Future remove(ByteString entry) { + public Future remove(SfscServiceDescriptor entry) { FutureAdapter future = new FutureAdapter<>(CommandReply::parseFrom, () -> {throw new TimeoutException();}); commandClient.remove(entry, params.getAdapterId(), future::handleInput, future::handleError); return future; } @Override - public Set getEntries() { + public Set getEntries() { return registry.getRegistry(); } @Override - public Handle addListener(Consumer> listener) { + public Handle addListener(Consumer> listener) { return registry.addEntryListener(listener); } diff --git a/common-java/src/main/java/de/unistuttgart/isw/sfsc/commonjava/heartbeating/HeartbeatManager.java b/common-java/src/main/java/de/unistuttgart/isw/sfsc/commonjava/heartbeating/HeartbeatManager.java index 1b90854..7139786 100644 --- a/common-java/src/main/java/de/unistuttgart/isw/sfsc/commonjava/heartbeating/HeartbeatManager.java +++ b/common-java/src/main/java/de/unistuttgart/isw/sfsc/commonjava/heartbeating/HeartbeatManager.java @@ -15,6 +15,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import de.unistuttgart.isw.sfsc.framework.types.SfscId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,7 @@ void startSession(String remoteId, ByteString remoteTopic, Consumer onDe void accept(ByteString byteString) { try { HeartbeatMessage heartbeat = HeartbeatMessage.parseFrom(byteString); - keepAlive(heartbeat.getId()); + keepAlive(heartbeat.getAdapterId().getId()); } catch (InvalidProtocolBufferException e) { logger.warn("received malformed message", e); } @@ -64,7 +65,7 @@ void keepAlive(String adapterId) { Handle startHeartbeat(ByteString remoteTopic) { final String heartbeatId = params.getOutgoingId(); Publisher publisher = new Publisher(pubSubConnection); - Message message = HeartbeatMessage.newBuilder().setId(heartbeatId).build(); + Message message = HeartbeatMessage.newBuilder().setAdapterId(SfscId.newBuilder().setId(heartbeatId).build()).build(); Future future = scheduler.scheduleAtFixedRate(() -> publisher.publish(remoteTopic, message), 0, params.getSendRateMs(), TimeUnit.MILLISECONDS); return () -> future.cancel(true); diff --git a/common-java/src/main/java/de/unistuttgart/isw/sfsc/commonjava/zmq/pubsubsocketpair/inputmanagement/subscription/SubscriptionTracker.java b/common-java/src/main/java/de/unistuttgart/isw/sfsc/commonjava/zmq/pubsubsocketpair/inputmanagement/subscription/SubscriptionTracker.java index 393bd9f..ad51761 100644 --- a/common-java/src/main/java/de/unistuttgart/isw/sfsc/commonjava/zmq/pubsubsocketpair/inputmanagement/subscription/SubscriptionTracker.java +++ b/common-java/src/main/java/de/unistuttgart/isw/sfsc/commonjava/zmq/pubsubsocketpair/inputmanagement/subscription/SubscriptionTracker.java @@ -19,11 +19,13 @@ public interface SubscriptionTracker { Handle addOneShotListener(Predicate> predicate, Runnable runnable); default Handle addOneShotSubscriptionListener(ByteString topic, Runnable runnable) { - return addOneShotListener(storeEvent -> topic.equals(storeEvent.getData()) && storeEvent.getStoreEventType() == StoreEventType.CREATE, runnable); + return addOneShotListener(storeEvent -> + topic.equals(storeEvent.getData()) && storeEvent.getStoreEventType() == StoreEventType.CREATE, runnable); } default Handle addOneShotUnsubscriptionListener(ByteString topic, Runnable runnable) { - return addOneShotListener(storeEvent -> topic.equals(storeEvent.getData()) && storeEvent.getStoreEventType() == StoreEventType.DELETE, runnable); + return addOneShotListener(storeEvent -> + topic.equals(storeEvent.getData()) && storeEvent.getStoreEventType() == StoreEventType.DELETE, runnable); } default Awaitable addOneShotSubscriptionListener(ByteString topic) { diff --git a/core/src/main/java/de/unistuttgart/isw/sfsc/core/control/session/SessionServer.java b/core/src/main/java/de/unistuttgart/isw/sfsc/core/control/session/SessionServer.java index a8b8521..758d443 100644 --- a/core/src/main/java/de/unistuttgart/isw/sfsc/core/control/session/SessionServer.java +++ b/core/src/main/java/de/unistuttgart/isw/sfsc/core/control/session/SessionServer.java @@ -11,6 +11,7 @@ import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.PubSubConnection; import java.util.function.Consumer; import java.util.function.Function; +import de.unistuttgart.isw.sfsc.framework.types.SfscId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,11 +45,11 @@ public SessionConsumer(Listeners> sessionListeners, Se public ByteString apply(ByteString byteString) { try { Hello hello = Hello.parseFrom(byteString); - String adapterId = hello.getAdapterId(); - ByteString adapterHeartbeatTopic = hello.getHeartbeatTopic(); + String adapterId = hello.getAdapterId().getId(); + ByteString adapterHeartbeatTopic = hello.getHeartbeatTopic().getTopic(); logger.info("new session request from {}", hello.getAdapterId()); sessionListeners.forEach(consumer -> consumer.accept(new NewSessionEvent(adapterId, adapterHeartbeatTopic))); - return Welcome.newBuilder().setCoreId(parameter.getCoreId()).build().toByteString(); + return Welcome.newBuilder().setCoreId(SfscId.newBuilder().setId(parameter.getCoreId()).build()).build().toByteString(); } catch (InvalidProtocolBufferException e) { logger.warn("received malformed message", e); return ByteString.EMPTY; diff --git a/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/Registry.java b/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/Registry.java index d3dbec5..6dadef9 100644 --- a/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/Registry.java +++ b/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/Registry.java @@ -9,6 +9,7 @@ import de.unistuttgart.isw.sfsc.commonjava.util.NotThrowingAutoCloseable; import de.unistuttgart.isw.sfsc.core.hazelcast.registry.log.RegistryEventLog; import de.unistuttgart.isw.sfsc.core.hazelcast.registry.replicatedregistry.ReplicatedRegistry; +import de.unistuttgart.isw.sfsc.framework.types.SfscId; import de.unistuttgart.isw.sfsc.serverserver.registry.RegistryEntry; import java.util.function.Consumer; import org.slf4j.Logger; @@ -36,14 +37,14 @@ public Handle addEventListener(Consumer listener) { } public CommandReply handleCommand(CommandRequest commandRequest) { - String adapterId = commandRequest.getAdapterId(); + String adapterId = commandRequest.getAdapterId().getId(); switch (commandRequest.getCreateOrDeleteCase()) { - case CREATE: { - replicatedRegistry.add(RegistryEntry.newBuilder().setAdapterId(adapterId).setCoreId(coreId).setData(commandRequest.getCreate()).build()); + case CREATE_REQUEST: { + replicatedRegistry.add(RegistryEntry.newBuilder().setAdapterId(SfscId.newBuilder().setId(adapterId).build()).setCoreId(SfscId.newBuilder().setId(coreId).build()).setData(commandRequest.getCreateRequest()).build()); break; } - case DELETE: { - replicatedRegistry.remove(RegistryEntry.newBuilder().setAdapterId(adapterId).setCoreId(coreId).setData(commandRequest.getDelete()).build()); + case DELETE_REQUEST: { + replicatedRegistry.remove(RegistryEntry.newBuilder().setAdapterId(SfscId.newBuilder().setId(adapterId).build()).setCoreId(SfscId.newBuilder().setId(coreId).build()).setData(commandRequest.getDeleteRequest()).build()); break; } default: { @@ -59,7 +60,7 @@ public QueryReply handleQuery(QueryRequest queryRequest) { } public void deleteEntries(String adapterId) { - replicatedRegistry.removeAll(entry -> entry.getAdapterId().equals(adapterId)); + replicatedRegistry.removeAll(entry -> entry.getAdapterId().getId().equals(adapterId)); } @Override diff --git a/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/log/RegistryEventLog.java b/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/log/RegistryEventLog.java index 979224f..7ee7190 100644 --- a/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/log/RegistryEventLog.java +++ b/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/log/RegistryEventLog.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Supplier; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,7 @@ public Handle addListener(Consumer listener) { return listeners.add(listener); } - public void onStoreEvent(StoreEvent storeEvent) { + public void onStoreEvent(StoreEvent storeEvent) { long id = idCounter.get(); schedulerService.execute(() -> { switch (storeEvent.getStoreEventType()) { @@ -64,13 +65,13 @@ public void onStoreEvent(StoreEvent storeEvent) { ); } - void onAdd(long id, ByteString byteString) { + void onAdd(long id, SfscServiceDescriptor byteString) { QueryReply queryReply = QueryReply.newBuilder().setCreated(byteString).setEventId(id).build(); staging.put(id, queryReply); processStaging(); } - void onRemove(long id, ByteString byteString) { + void onRemove(long id, SfscServiceDescriptor byteString) { QueryReply queryReply = QueryReply.newBuilder().setDeleted(byteString).setEventId(id).build(); staging.put(id, queryReply); processStaging(); @@ -94,9 +95,9 @@ void processStaging() { } } - void discardCreateEvent(ByteString byteString) { + void discardCreateEvent(SfscServiceDescriptor byteString) { eventLog.entrySet().stream() - .filter(entry -> byteString.equals(entry.getValue().getCreated())) + .filter(entry -> byteString.toByteString().equals(entry.getValue().getCreated().toByteString())) .map(Entry::getKey) .findAny() .ifPresentOrElse(eventLog::remove, () -> logger.warn("Could not discard, add event not found")); @@ -104,8 +105,9 @@ void discardCreateEvent(ByteString byteString) { public QueryReply handleQueryRequest(QueryRequest queryRequest) { long id = queryRequest.getEventId(); - if (logCounter.get() <= id) { - return QueryReply.newBuilder().setEventId(id).setFuture(Future.getDefaultInstance()).build(); + long currentLogCount = logCounter.get(); + if (currentLogCount <= id) { + return QueryReply.newBuilder().setEventId(id).setFuture(Future.newBuilder().setNewestValidEventId(currentLogCount)).build(); } else { return Optional.ofNullable(eventLog.get(id)) .orElseGet(() -> QueryReply.newBuilder().setEventId(id).setExpired(Expired.getDefaultInstance()).build()); diff --git a/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/replicatedregistry/EntryListenerAdapter.java b/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/replicatedregistry/EntryListenerAdapter.java index d163b3f..c6a5ee5 100644 --- a/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/replicatedregistry/EntryListenerAdapter.java +++ b/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/replicatedregistry/EntryListenerAdapter.java @@ -6,6 +6,7 @@ import com.hazelcast.map.MapEvent; import de.unistuttgart.isw.sfsc.commonjava.util.StoreEvent; import de.unistuttgart.isw.sfsc.commonjava.util.StoreEvent.StoreEventType; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor; import de.unistuttgart.isw.sfsc.serverserver.registry.RegistryEntry; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -13,9 +14,9 @@ class EntryListenerAdapter implements EntryListener { private final ReentrantLock lock = new ReentrantLock(true); - private final Consumer> registryEventHandler; + private final Consumer> registryEventHandler; - EntryListenerAdapter(Consumer> registryEventHandler) { + EntryListenerAdapter(Consumer> registryEventHandler) { this.registryEventHandler = registryEventHandler; } diff --git a/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/replicatedregistry/ReplicatedRegistry.java b/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/replicatedregistry/ReplicatedRegistry.java index 219d70e..1d2f085 100644 --- a/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/replicatedregistry/ReplicatedRegistry.java +++ b/core/src/main/java/de/unistuttgart/isw/sfsc/core/hazelcast/registry/replicatedregistry/ReplicatedRegistry.java @@ -5,6 +5,7 @@ import de.unistuttgart.isw.sfsc.commonjava.util.Handle; import de.unistuttgart.isw.sfsc.commonjava.util.ReplayingListener; import de.unistuttgart.isw.sfsc.commonjava.util.StoreEvent; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor; import de.unistuttgart.isw.sfsc.serverserver.registry.RegistryEntry; import java.util.HashSet; import java.util.Set; @@ -24,8 +25,8 @@ public ReplicatedRegistry(ReplicatedMap replicatedMap) { this.replicatedMap = replicatedMap; } - public Handle addListener(Consumer> listener) { - ReplayingListener replayingListener = new ReplayingListener<>(listener); + public Handle addListener(Consumer> listener) { + ReplayingListener replayingListener = new ReplayingListener<>(listener); UUID handle = replicatedMap.addEntryListener(new EntryListenerAdapter(replayingListener)); replayingListener.prepend(createStoreEventSnapshot()); @@ -50,7 +51,7 @@ public void removeAll(Predicate predicate) { copy.forEach(replicatedMap::remove); } - Set createStoreEventSnapshot() { + Set createStoreEventSnapshot() { return Set.copyOf(replicatedMap.keySet()).stream() .map(RegistryEntry::getData) .collect(Collectors.toUnmodifiableSet()); diff --git a/docker-core/Dockerfile b/docker-core/Dockerfile index 472451c..d2fd758 100644 --- a/docker-core/Dockerfile +++ b/docker-core/Dockerfile @@ -1,6 +1,7 @@ -FROM openjdk:14-alpine -WORKDIR /sfsc -RUN apk add --no-cache tini +FROM adoptopenjdk/openjdk12 +# --platform=linux/arm/v7 +WORKDIR /app + COPY target/docker-core.jar . -ENTRYPOINT ["/sbin/tini", "--"] + CMD ["java", "-jar", "docker-core.jar"] diff --git a/docker-core/docker-compose.yaml b/docker-core/docker-compose.yaml new file mode 100644 index 0000000..463e9c4 --- /dev/null +++ b/docker-core/docker-compose.yaml @@ -0,0 +1,23 @@ +version: "3" +services: + core1: + container_name: core1 + image: "nalim2/sfsc-core:linux-x86-experimental" + build: . + ports: + - "1250:1250" + - "1251:1251" + - "1252:1252" + - "1253:1253" + - "1254:1254" + - "5701:5701" + environment: + - HOST=127.0.0.1 + - BACKEND_PORT=1250 + - CONTROL_PUB_PORT=1251 + - CONTROL_SUB_PORT=1252 + - DATA_PUB_PORT=1253 + - DATA_SUB_PORT=1254 + + - BACKEND_HOST=core1 + - HAZELCAST_PORT=5701 diff --git a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/registry/ApiRegistryManager.java b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/registry/ApiRegistryManager.java index e91637e..916821a 100644 --- a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/registry/ApiRegistryManager.java +++ b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/registry/ApiRegistryManager.java @@ -59,9 +59,8 @@ Predicate getVarPathPredicate(Message message, Collection } public Handle registerService(SfscServiceDescriptor descriptor) { - ByteString descriptorBytes = descriptor.toByteString(); - registryApi.create(descriptor.toByteString()); //todo why returns future? - return () -> registryApi.remove(descriptorBytes); + registryApi.create(descriptor); //todo why returns future? + return () -> registryApi.remove(descriptor); } public Handle addStoreEventListener(Consumer> listener) { diff --git a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/registry/RegexFilter.java b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/registry/RegexFilter.java index 6bb54e6..ba80001 100644 --- a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/registry/RegexFilter.java +++ b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/registry/RegexFilter.java @@ -3,9 +3,9 @@ import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.Message; import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor; -import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServerTags.RegexDefinition; -import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServerTags.RegexDefinition.VarRegex.NumberRegex; -import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServerTags.RegexDefinition.VarRegex.StringRegex; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags.ServerTags.RegexDefinition; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags.ServerTags.RegexDefinition.VarRegex.NumberRegex; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags.ServerTags.RegexDefinition.VarRegex.StringRegex; import java.util.Objects; import java.util.function.Predicate; @@ -21,7 +21,7 @@ final class RegexFilter implements Predicate { @Override public boolean test(SfscServiceDescriptor descriptor) { - RegexDefinition regexDefinition = descriptor.getServerTags().getRegex(); + RegexDefinition regexDefinition = descriptor.getServiceTags().getServerTags().getRegex(); return test(regexDefinition); } diff --git a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/registry/StoreEventStreamConverter.java b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/registry/StoreEventStreamConverter.java index 5efe9aa..2ac3917 100644 --- a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/registry/StoreEventStreamConverter.java +++ b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/registry/StoreEventStreamConverter.java @@ -17,7 +17,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class StoreEventStreamConverter implements Consumer> { +class StoreEventStreamConverter implements Consumer> { private static final Logger logger = LoggerFactory.getLogger(StoreEventStreamConverter.class); @@ -28,9 +28,9 @@ class StoreEventStreamConverter implements Consumer> { StoreEventStreamConverter(Set services) {this.services = services;} @Override - public void accept(StoreEvent storeEvent) { - try { - SfscServiceDescriptor descriptor = SfscServiceDescriptor.parseFrom(storeEvent.getData()); + public void accept(StoreEvent storeEvent) { + + SfscServiceDescriptor descriptor = storeEvent.getData(); switch (storeEvent.getStoreEventType()) { case CREATE: { services.add(descriptor); @@ -49,9 +49,7 @@ public void accept(StoreEvent storeEvent) { break; } } - } catch (InvalidProtocolBufferException e) { - logger.warn("Registry contains malformed entries", e); - } + } Handle addListener(Consumer> listener) { diff --git a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/ServiceFactory.java b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/ServiceFactory.java index ac65eae..ecae06a 100644 --- a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/ServiceFactory.java +++ b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/ServiceFactory.java @@ -6,13 +6,15 @@ import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.PubSubConnection; import de.unistuttgart.isw.sfsc.framework.api.registry.ApiRegistryManager; import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor; +import de.unistuttgart.isw.sfsc.framework.types.MessageType; +import de.unistuttgart.isw.sfsc.framework.types.Topic; import java.util.Map; import java.util.UUID; import java.util.function.Supplier; public class ServiceFactory { - private static final ByteString defaultType = ByteString.EMPTY; + private static final MessageType defaultType = MessageType.newBuilder().setType(ByteString.EMPTY).build(); private static final Supplier defaultIdGenerator = () -> UUID.randomUUID().toString(); private static final Map defaultCustomTags = Map.of(); private final Scheduler scheduler; @@ -47,11 +49,11 @@ public String createServiceId() { return defaultIdGenerator.get(); } - public ByteString createTopic() { - return ByteString.copyFromUtf8(defaultIdGenerator.get()); + public Topic createTopic() { + return Topic.newBuilder().setTopic(ByteString.copyFromUtf8(defaultIdGenerator.get())).build(); } - public ByteString defaultType() { + public MessageType defaultType() { return defaultType; } diff --git a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/clientserver/SfscClientImplementation.java b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/clientserver/SfscClientImplementation.java index 3c463c5..380cc92 100644 --- a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/clientserver/SfscClientImplementation.java +++ b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/clientserver/SfscClientImplementation.java @@ -21,21 +21,21 @@ public SfscClientImplementation(SfscServiceApi sfscServiceApi, ServiceFactory se this.sfscServiceApi = sfscServiceApi; this.client = new AckClient( serviceFactory.pubSubConnection(), - serviceFactory.createTopic(), + serviceFactory.createTopic().getTopic(), serviceFactory.scheduler()); } @Override public void request(SfscServiceDescriptor serverDescriptor, Message payload, Consumer consumer, int timeoutMs, Runnable timeoutRunnable) { - ByteString serverTopic = serverDescriptor.getServerTags().getInputTopic(); + ByteString serverTopic = serverDescriptor.getServiceTags().getServerTags().getInputTopic().getTopic(); client.send(serverTopic, payload, consumer, timeoutMs, timeoutRunnable); } @Override public Future requestChannel(SfscServiceDescriptor channelFactoryDescriptor, ByteString payload, int timeoutMs, Consumer consumer) { - ByteString channelFactoryTopic = channelFactoryDescriptor.getServerTags().getInputTopic(); + ByteString channelFactoryTopic = channelFactoryDescriptor.getServiceTags().getServerTags().getInputTopic().getTopic(); ChannelFactoryClient channelFactoryClient = new ChannelFactoryClient(sfscServiceApi, consumer); FutureAdapter futureAdapter = new FutureAdapter<>( channelFactoryClient::process, diff --git a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/clientserver/SfscServerImplementation.java b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/clientserver/SfscServerImplementation.java index 1d88580..dbdca77 100644 --- a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/clientserver/SfscServerImplementation.java +++ b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/clientserver/SfscServerImplementation.java @@ -5,11 +5,15 @@ import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.PubSubConnection; import de.unistuttgart.isw.sfsc.framework.api.services.ServiceFactory; import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor; -import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServerTags; -import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServerTags.AckSettings; -import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServerTags.RegexDefinition; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags.ServerTags; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags.ServerTags.AckSettings; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags.ServerTags.RegexDefinition; import de.unistuttgart.isw.sfsc.framework.patterns.ackreqrep.AckServer; import de.unistuttgart.isw.sfsc.framework.patterns.ackreqrep.AckServerResult; +import de.unistuttgart.isw.sfsc.framework.types.MessageType; +import de.unistuttgart.isw.sfsc.framework.types.SfscId; +import de.unistuttgart.isw.sfsc.framework.types.Topic; import java.util.Optional; import java.util.function.Function; @@ -26,39 +30,39 @@ public SfscServerImplementation(SfscServerParameter parameter, ServiceFactory se Function serverFunction) { PubSubConnection pubSubConnection = serviceFactory.pubSubConnection(); String serviceId = serviceFactory.createServiceId(); + MessageType inputMessageType = parameter.getInputMessageType() == null ? serviceFactory.defaultType() : MessageType.newBuilder().setType(parameter.getInputMessageType()).build(); + MessageType outputMessageType = parameter.getOutputMessageType() == null ? serviceFactory.defaultType() : MessageType.newBuilder().setType(parameter.getOutputMessageType()).build(); + Topic topic = parameter.getInputTopic() == null ? serviceFactory.createTopic() : Topic.newBuilder().setTopic(parameter.getInputTopic()).build(); descriptor = SfscServiceDescriptor .newBuilder() - .setServiceId(serviceId) - .setAdapterId(serviceFactory.adapterId()) - .setCoreId(serviceFactory.coreId()) + .setServiceId(SfscId.newBuilder().setId(serviceId).build()) + .setAdapterId(SfscId.newBuilder().setId(serviceFactory.adapterId()).build()) + .setCoreId(SfscId.newBuilder().setId(serviceFactory.coreId()).build()) .setServiceName(Optional.ofNullable(parameter.getServiceName()).orElse(serviceId)) .putAllCustomTags(Optional.ofNullable(parameter.getCustomTags()).orElseGet(serviceFactory::defaultCustomTags)) - .setServerTags(ServerTags + .setServiceTags(ServiceTags.newBuilder().setServerTags(ServerTags .newBuilder() - .setInputTopic(Optional.ofNullable(parameter.getInputTopic()).orElseGet(serviceFactory::createTopic)) - .setInputMessageType(Optional - .ofNullable(parameter.getInputMessageType()) - .orElseGet(serviceFactory::defaultType)) - .setOutputMessageType(Optional - .ofNullable(parameter.getOutputMessageType()) - .orElseGet(serviceFactory::defaultType)) + .setInputTopic(topic) + .setInputMessageType(inputMessageType) + .setOutputMessageType(outputMessageType) .setRegex(Optional.ofNullable(parameter.getRegexDefinition()).orElse(defaultRegex)) .setAckSettings(AckSettings .newBuilder() .setTimeoutMs(Optional.ofNullable(parameter.getTimeoutMs()).orElse(defaultTimeout)) .setSendMaxTries(Optional.ofNullable(parameter.getSendMaxTries()).orElse(defaultSendMaxTries)) .build()) - .build()) + .build()).build() + ) .build(); AckServer server = new AckServer( pubSubConnection, serviceFactory.scheduler(), serverFunction, - descriptor.getServerTags().getInputTopic(), - descriptor.getServerTags().getAckSettings().getTimeoutMs(), - descriptor.getServerTags().getAckSettings().getTimeoutMs(), - descriptor.getServerTags().getAckSettings().getSendMaxTries() + descriptor.getServiceTags().getServerTags().getInputTopic().getTopic(), + descriptor.getServiceTags().getServerTags().getAckSettings().getTimeoutMs(), + descriptor.getServiceTags().getServerTags().getAckSettings().getTimeoutMs(), + descriptor.getServiceTags().getServerTags().getAckSettings().getSendMaxTries() ); Handle handle = serviceFactory.registerService(descriptor); diff --git a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/clientserver/SfscServerParameter.java b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/clientserver/SfscServerParameter.java index a09af1a..9746452 100644 --- a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/clientserver/SfscServerParameter.java +++ b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/clientserver/SfscServerParameter.java @@ -1,8 +1,8 @@ package de.unistuttgart.isw.sfsc.framework.api.services.clientserver; import com.google.protobuf.ByteString; -import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServerTags.RegexDefinition; -import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServerTags.RegexDefinition.VarRegex.RegexCase; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags.ServerTags.RegexDefinition; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags.ServerTags.RegexDefinition.VarRegex.RegexCase; import java.util.Map; import java.util.Objects; diff --git a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/pubsub/SfscPublisherImplementation.java b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/pubsub/SfscPublisherImplementation.java index a2b2bc9..61bd98f 100644 --- a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/pubsub/SfscPublisherImplementation.java +++ b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/pubsub/SfscPublisherImplementation.java @@ -10,7 +10,11 @@ import de.unistuttgart.isw.sfsc.commonjava.zmq.pubsubsocketpair.inputmanagement.subscription.SubscriptionTracker; import de.unistuttgart.isw.sfsc.framework.api.services.ServiceFactory; import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor; -import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.PublisherTags; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags.PublisherTags; +import de.unistuttgart.isw.sfsc.framework.types.MessageType; +import de.unistuttgart.isw.sfsc.framework.types.SfscId; +import de.unistuttgart.isw.sfsc.framework.types.Topic; import java.util.Optional; public final class SfscPublisherImplementation implements SfscPublisher { @@ -28,22 +32,25 @@ public final class SfscPublisherImplementation implements SfscPublisher { public SfscPublisherImplementation(SfscPublisherParameter parameter, ServiceFactory serviceFactory) { PubSubConnection pubSubConnection = serviceFactory.pubSubConnection(); String serviceId = serviceFactory.createServiceId(); + Topic outputTopic = parameter.getOutputTopic() == null ? serviceFactory.createTopic() : Topic.newBuilder().setTopic(parameter.getOutputTopic()).build(); + MessageType outputMessageType = parameter.getOutputMessageType() == null ? serviceFactory.defaultType() : MessageType.newBuilder().setType(parameter.getOutputMessageType()).build(); descriptor = SfscServiceDescriptor.newBuilder() - .setServiceId(serviceId) - .setAdapterId(serviceFactory.adapterId()) - .setCoreId(serviceFactory.coreId()) + .setServiceId(SfscId.newBuilder().setId(serviceId).build()) + .setAdapterId(SfscId.newBuilder().setId(serviceFactory.adapterId()).build()) + .setCoreId(SfscId.newBuilder().setId(serviceFactory.coreId()).build()) .setServiceName(Optional.ofNullable(parameter.getServiceName()).orElse(serviceId)) .putAllCustomTags(Optional.ofNullable(parameter.getCustomTags()).orElseGet(serviceFactory::defaultCustomTags)) - .setPublisherTags(PublisherTags.newBuilder() - .setOutputTopic(Optional.ofNullable(parameter.getOutputTopic()).orElseGet(serviceFactory::createTopic)) - .setOutputMessageType(Optional.ofNullable(parameter.getOutputMessageType()).orElseGet(serviceFactory::defaultType)) + .setServiceTags(ServiceTags.newBuilder().setPublisherTags(PublisherTags.newBuilder() + .setOutputTopic(outputTopic) + .setOutputMessageType(outputMessageType) .setUnregistered(Optional.ofNullable(parameter.isUnregistered()).orElse(defaultRegistrationFlag)) - .build()) + .build()).build() + ) .build(); - Handle handle = descriptor.getPublisherTags().getUnregistered() ? null : serviceFactory.registerService(descriptor); + Handle handle = descriptor.getServiceTags().getPublisherTags().getUnregistered() ? null : serviceFactory.registerService(descriptor); closeCallback = handle != null ? handle::close : null; - topic = descriptor.getPublisherTags().getOutputTopic(); + topic = descriptor.getServiceTags().getPublisherTags().getOutputTopic().getTopic(); topicCache = topic.toByteArray(); publisher = new Publisher(pubSubConnection); subscriptionTracker = pubSubConnection.subscriptionTracker(); diff --git a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/pubsub/SfscSubscriberImplementation.java b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/pubsub/SfscSubscriberImplementation.java index de4d959..d49ef9d 100644 --- a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/pubsub/SfscSubscriberImplementation.java +++ b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/api/services/pubsub/SfscSubscriberImplementation.java @@ -15,7 +15,7 @@ public SfscSubscriberImplementation(SfscServiceDescriptor publisherDescriptor, S Subscriber subscriber = new Subscriber( serviceFactory.pubSubConnection(), subscriberConsumer, - publisherDescriptor.getPublisherTags().getOutputTopic(), + publisherDescriptor.getServiceTags().getPublisherTags().getOutputTopic().getTopic(), serviceFactory.scheduler()); closeCallback = subscriber::close; } diff --git a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/patterns/ackreqrep/AckClient.java b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/patterns/ackreqrep/AckClient.java index c6b0fae..aa1d8a4 100644 --- a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/patterns/ackreqrep/AckClient.java +++ b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/patterns/ackreqrep/AckClient.java @@ -11,6 +11,7 @@ import de.unistuttgart.isw.sfsc.commonjava.zmq.util.SubscriptionAgent; import de.unistuttgart.isw.sfsc.framework.messagingpatterns.ackreqrep.RequestOrAcknowledge; import de.unistuttgart.isw.sfsc.framework.messagingpatterns.ackreqrep.RequestOrAcknowledge.Request; +import de.unistuttgart.isw.sfsc.framework.types.Topic; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; @@ -40,7 +41,7 @@ public void send(ByteString targetTopic, Message payload, Consumer c RequestOrAcknowledge wrapRequest(int id, Message payload) { Request request = Request.newBuilder() .setRequestPayload(payload.toByteString()) - .setReplyTopic(replyTopic) + .setReplyTopic(Topic.newBuilder().setTopic(replyTopic).build()) .setExpectedReplyId(id) .build(); diff --git a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/patterns/ackreqrep/AckClientConsumer.java b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/patterns/ackreqrep/AckClientConsumer.java index 989c235..cd493f4 100644 --- a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/patterns/ackreqrep/AckClientConsumer.java +++ b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/patterns/ackreqrep/AckClientConsumer.java @@ -30,7 +30,7 @@ public void accept(ByteString ignored, ByteString data) { int replyId = reply.getReplyId(); ByteString replyPayload = reply.getReplyPayload(); callbackRegistry.performCallback(replyId, replyPayload); - ByteString acknowledgeTopic = reply.getAcknowledgeTopic(); + ByteString acknowledgeTopic = reply.getAcknowledgeTopic().getTopic(); int acknowledgeId = reply.getExpectedAcknowledgeId(); RequestOrAcknowledge acknowledge = wrapAcknowledge(acknowledgeId); publisher.publish(acknowledgeTopic, acknowledge); diff --git a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/patterns/ackreqrep/AckServerConsumer.java b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/patterns/ackreqrep/AckServerConsumer.java index e79a4b3..3f35b01 100644 --- a/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/patterns/ackreqrep/AckServerConsumer.java +++ b/framework/src/main/java/de/unistuttgart/isw/sfsc/framework/patterns/ackreqrep/AckServerConsumer.java @@ -15,6 +15,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; +import de.unistuttgart.isw.sfsc.framework.types.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +54,7 @@ public void accept(ByteString ignored, ByteString data) { case REQUEST: { Request request = requestOrAcknowledge.getRequest(); int replyId = request.getExpectedReplyId(); - ByteString replyTopic = request.getReplyTopic(); + ByteString replyTopic = request.getReplyTopic().getTopic(); ByteString requestPayload = request.getRequestPayload(); AckServerResult ackServerResult = serverFunction.apply(requestPayload); int acknowledgeId = idGenerator.get(); @@ -97,7 +98,7 @@ public void accept(ByteString ignored, ByteString data) { Reply wrapReply(int acknowledgeId, ByteString acknowledgeTopic, int replyId, Message payload) { return Reply .newBuilder() - .setAcknowledgeTopic(acknowledgeTopic) + .setAcknowledgeTopic(Topic.newBuilder().setTopic(acknowledgeTopic).build()) .setExpectedAcknowledgeId(acknowledgeId) .setReplyId(replyId) .setReplyPayload(payload.toByteString()) diff --git a/hello-world/src/main/java/de/unistuttgart/isw/sfsc/benchmark/Benchmark.java b/hello-world/src/main/java/de/unistuttgart/isw/sfsc/benchmark/Benchmark.java index b24cfdf..58dd8ba 100644 --- a/hello-world/src/main/java/de/unistuttgart/isw/sfsc/benchmark/Benchmark.java +++ b/hello-world/src/main/java/de/unistuttgart/isw/sfsc/benchmark/Benchmark.java @@ -7,7 +7,7 @@ public class Benchmark { private static final AdapterConfiguration clientConfiguration = new AdapterConfiguration().setCorePubTcpPort(1251); - private static final AdapterConfiguration serverConfiguration = new AdapterConfiguration().setCorePubTcpPort(1261); + private static final AdapterConfiguration serverConfiguration = new AdapterConfiguration().setCorePubTcpPort(1251); public static void main(String[] args) throws Exception { diff --git a/hello-world/src/main/java/de/unistuttgart/isw/sfsc/plc4x/Plc4xDemo.java b/hello-world/src/main/java/de/unistuttgart/isw/sfsc/plc4x/Plc4xDemo.java index e34a7ee..90ded1a 100644 --- a/hello-world/src/main/java/de/unistuttgart/isw/sfsc/plc4x/Plc4xDemo.java +++ b/hello-world/src/main/java/de/unistuttgart/isw/sfsc/plc4x/Plc4xDemo.java @@ -19,9 +19,9 @@ import de.unistuttgart.isw.sfsc.framework.api.services.pubsub.SfscPublisherParameter; import de.unistuttgart.isw.sfsc.framework.api.services.pubsub.SfscSubscriber; import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor; -import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServerTags.RegexDefinition; -import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServerTags.RegexDefinition.VarRegex; -import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServerTags.RegexDefinition.VarRegex.StringRegex; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags.ServerTags.RegexDefinition; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags.ServerTags.RegexDefinition.VarRegex; +import de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor.ServiceTags.ServerTags.RegexDefinition.VarRegex.StringRegex; import de.unistuttgart.isw.sfsc.framework.patterns.ackreqrep.AckServerResult; import java.util.List; import java.util.Map; diff --git a/pom.xml b/pom.xml index 967273e..0e9457a 100644 --- a/pom.xml +++ b/pom.xml @@ -64,7 +64,7 @@ - 0.1.9 + 0.1.10 1.7.30 3.11.0 diff --git a/proto/src/main/proto/clientserver/protocol/registry/command/command.proto b/proto/src/main/proto/clientserver/protocol/registry/command/command.proto index 98956e4..f9f2505 100644 --- a/proto/src/main/proto/clientserver/protocol/registry/command/command.proto +++ b/proto/src/main/proto/clientserver/protocol/registry/command/command.proto @@ -1,14 +1,15 @@ syntax = "proto3"; package de.unistuttgart.isw.sfsc.clientserver.protocol.registry.command; - +import "framework/types/types.proto"; +import "framework/descriptor/service.proto"; option java_multiple_files = true; message CommandRequest { - string adapterId = 1; + de.unistuttgart.isw.sfsc.framework.types.SfscId adapterId = 1; oneof create_or_delete{ - bytes create = 2; - bytes delete = 3; + de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor create_request = 2; + de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor delete_request = 3; } } diff --git a/proto/src/main/proto/clientserver/protocol/registry/query/query.proto b/proto/src/main/proto/clientserver/protocol/registry/query/query.proto index 5e627bc..09ff730 100644 --- a/proto/src/main/proto/clientserver/protocol/registry/query/query.proto +++ b/proto/src/main/proto/clientserver/protocol/registry/query/query.proto @@ -1,7 +1,8 @@ syntax = "proto3"; package de.unistuttgart.isw.sfsc.clientserver.protocol.registry.query; - +import "framework/types/types.proto"; +import "framework/descriptor/service.proto"; option java_multiple_files = true; message QueryRequest { @@ -11,14 +12,16 @@ message QueryRequest { message QueryReply { int64 event_id = 1; oneof created_or_deleted_or_expired_or_future { - bytes created = 2; - bytes deleted = 3; + de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor created = 2; + de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor deleted = 3; Expired expired = 4; Future future = 5; } message Expired { + int64 oldest_valid_event_id = 1; } message Future { + int64 newest_valid_event_id = 1; } } diff --git a/proto/src/main/proto/clientserver/protocol/session/handshake/handshake.proto b/proto/src/main/proto/clientserver/protocol/session/handshake/handshake.proto index 92fe7ec..83bbfab 100644 --- a/proto/src/main/proto/clientserver/protocol/session/handshake/handshake.proto +++ b/proto/src/main/proto/clientserver/protocol/session/handshake/handshake.proto @@ -1,14 +1,14 @@ syntax = "proto3"; package de.unistuttgart.isw.sfsc.clientserver.protocol.session.handshake; - +import "framework/types/types.proto"; option java_multiple_files = true; message Hello { - string adapter_id = 1; - bytes heartbeat_topic = 2; + de.unistuttgart.isw.sfsc.framework.types.SfscId adapter_id = 1; + de.unistuttgart.isw.sfsc.framework.types.Topic heartbeat_topic = 2; } message Welcome { - string core_id = 1; + de.unistuttgart.isw.sfsc.framework.types.SfscId core_id = 1; } diff --git a/proto/src/main/proto/clientserver/protocol/session/heartbeat/heartbeat.proto b/proto/src/main/proto/clientserver/protocol/session/heartbeat/heartbeat.proto index 0a00b7e..2e15407 100644 --- a/proto/src/main/proto/clientserver/protocol/session/heartbeat/heartbeat.proto +++ b/proto/src/main/proto/clientserver/protocol/session/heartbeat/heartbeat.proto @@ -1,9 +1,9 @@ syntax = "proto3"; package de.unistuttgart.isw.sfsc.clientserver.protocol.session.heartbeat; - +import "framework/types/types.proto"; option java_multiple_files = true; message HeartbeatMessage { - string id = 1; + de.unistuttgart.isw.sfsc.framework.types.SfscId adapter_id = 1; } diff --git a/proto/src/main/proto/framework/descriptor/service.proto b/proto/src/main/proto/framework/descriptor/service.proto index fa1905f..30efb85 100644 --- a/proto/src/main/proto/framework/descriptor/service.proto +++ b/proto/src/main/proto/framework/descriptor/service.proto @@ -1,58 +1,62 @@ syntax = "proto3"; package de.unistuttgart.isw.sfsc.framework.descriptor; - +import "framework/types/types.proto"; option java_multiple_files = true; message SfscServiceDescriptor { - string service_id = 1; - string adapter_id = 2; - string core_id = 3; + de.unistuttgart.isw.sfsc.framework.types.SfscId service_id = 1; + de.unistuttgart.isw.sfsc.framework.types.SfscId adapter_id = 2; + de.unistuttgart.isw.sfsc.framework.types.SfscId core_id = 3; string service_name = 4; map custom_tags = 5; - oneof serviceTags { - PublisherTags publisher_tags = 6; - ServerTags server_tags = 7; - } - - message PublisherTags { - bytes output_topic = 1; - bytes output_message_type = 2; - bool unregistered = 3; - } - - message ServerTags { - bytes input_topic = 1; - bytes input_message_type = 2; - bytes output_message_type = 3; - RegexDefinition regex = 4; - AckSettings ack_settings = 5; - - message RegexDefinition { - repeated VarRegex regexes = 1; - - message VarRegex { - string varName = 1; - oneof regex { - StringRegex string_regex = 2; - NumberRegex number_regex = 3; - } - - message StringRegex { - string regex = 1; - } - - message NumberRegex { - sint64 lowerBound = 1; - sint64 upperBound = 2; - } - } - } - - message AckSettings { - int32 timeout_ms = 1; - int32 send_max_tries = 2; - } - } - + ServiceTags service_tags = 6; + + message ServiceTags { + + oneof serviceTags { + PublisherTags publisher_tags = 1; + ServerTags server_tags = 2; + } + + message PublisherTags { + de.unistuttgart.isw.sfsc.framework.types.Topic output_topic = 1; + de.unistuttgart.isw.sfsc.framework.types.MessageType output_message_type = 2; + bool unregistered = 3; + } + + message ServerTags { + de.unistuttgart.isw.sfsc.framework.types.Topic input_topic = 1; + de.unistuttgart.isw.sfsc.framework.types.MessageType input_message_type = 2; + de.unistuttgart.isw.sfsc.framework.types.MessageType output_message_type = 3; + RegexDefinition regex = 4; + AckSettings ack_settings = 5; + + message RegexDefinition { + repeated VarRegex regexes = 1; + + message VarRegex { + string varName = 1; + oneof regex { + StringRegex string_regex = 2; + NumberRegex number_regex = 3; + } + + message StringRegex { + string regex = 1; + } + + message NumberRegex { + sint64 lowerBound = 1; + sint64 upperBound = 2; + } + } + } + + message AckSettings { + int32 timeout_ms = 1; + int32 send_max_tries = 2; + } + } + } } diff --git a/proto/src/main/proto/framework/messagingpatterns/ackreqrep/ackreqrep.proto b/proto/src/main/proto/framework/messagingpatterns/ackreqrep/ackreqrep.proto index 5d85477..59478af 100644 --- a/proto/src/main/proto/framework/messagingpatterns/ackreqrep/ackreqrep.proto +++ b/proto/src/main/proto/framework/messagingpatterns/ackreqrep/ackreqrep.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package de.unistuttgart.isw.sfsc.framework.messagingpatterns.ackreqrep; - +import "framework/types/types.proto"; option java_multiple_files = true; message RequestOrAcknowledge { @@ -11,7 +11,7 @@ message RequestOrAcknowledge { } message Request { - bytes reply_topic = 1; + de.unistuttgart.isw.sfsc.framework.types.Topic reply_topic = 1; int32 expected_reply_id = 2; bytes request_payload = 3; } @@ -22,7 +22,7 @@ message RequestOrAcknowledge { } message Reply { - bytes acknowledge_topic = 1; + de.unistuttgart.isw.sfsc.framework.types.Topic acknowledge_topic = 1; int32 expected_acknowledge_id = 2; int32 reply_id = 3; bytes reply_payload = 4; diff --git a/proto/src/main/proto/framework/types/types.proto b/proto/src/main/proto/framework/types/types.proto new file mode 100644 index 0000000..b1f50b6 --- /dev/null +++ b/proto/src/main/proto/framework/types/types.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package de.unistuttgart.isw.sfsc.framework.types; + +option java_multiple_files = true; + +message SfscId { + string id = 1; +} + +message Topic { + bytes topic = 1; +} + +message MessageType { + bytes type = 1; +} \ No newline at end of file diff --git a/proto/src/main/proto/serverserver/registry/registryEntry.proto b/proto/src/main/proto/serverserver/registry/registryEntry.proto index d7ba072..8466ef7 100644 --- a/proto/src/main/proto/serverserver/registry/registryEntry.proto +++ b/proto/src/main/proto/serverserver/registry/registryEntry.proto @@ -1,11 +1,12 @@ syntax = "proto3"; package de.unistuttgart.isw.sfsc.serverserver.registry; - +import "framework/types/types.proto"; +import "framework/descriptor/service.proto"; option java_multiple_files = true; message RegistryEntry { - string adapterId = 1; - string coreId = 2; - bytes data = 3; + de.unistuttgart.isw.sfsc.framework.types.SfscId adapterId = 1; + de.unistuttgart.isw.sfsc.framework.types.SfscId coreId = 2; + de.unistuttgart.isw.sfsc.framework.descriptor.SfscServiceDescriptor data = 3; }