diff --git a/common/src/testFixtures/avro/ClickStats.avsc b/common/src/testFixtures/avro/ClickStats.avsc new file mode 100644 index 00000000..00e427d5 --- /dev/null +++ b/common/src/testFixtures/avro/ClickStats.avsc @@ -0,0 +1,15 @@ +{ + "namespace": "com.bakdata.quick.avro", + "name": "ClickStatsAvro", + "type": "record", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "amount", + "type": "long" + } + ] +} diff --git a/common/src/testFixtures/avro/PurchaseStats.avsc b/common/src/testFixtures/avro/PurchaseStats.avsc new file mode 100644 index 00000000..61f06581 --- /dev/null +++ b/common/src/testFixtures/avro/PurchaseStats.avsc @@ -0,0 +1,15 @@ +{ + "namespace": "com.bakdata.quick.avro", + "name": "PurchaseStatsAvro", + "type": "record", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "amount", + "type": "long" + } + ] +} diff --git a/common/src/testFixtures/proto/test.proto b/common/src/testFixtures/proto/test.proto index bf9d4720..1ce6732f 100644 --- a/common/src/testFixtures/proto/test.proto +++ b/common/src/testFixtures/proto/test.proto @@ -15,6 +15,16 @@ message ComplexProtoTestRecord { ProtoTestRecord protoTestRecord = 2; } +message ClickStatsProto { + string id = 1; + int64 amount = 2; +} + +message PurchaseStatsProto { + string id = 1; + int64 amount = 2; +} + message ProtoRangeQueryTest { int64 userId = 1; int32 timestamp = 2; diff --git a/docs/docs/developer/multi-subscription-details.md b/docs/docs/developer/multi-subscription-details.md new file mode 100644 index 00000000..c30c05e2 --- /dev/null +++ b/docs/docs/developer/multi-subscription-details.md @@ -0,0 +1,90 @@ +# Multi-subscriptions details + +The introduction to multi-subscriptions is +[here](../user/getting-started/working-with-quick/multi-subscriptions.md). + +We now discuss implementation details of multi-subscriptions. +As a reminder, we followed these steps: + +1. Define the multi-subscription. +2. Apply the new schema. +3. Run the multi-subscription. +4. Ingest data. + + +## Apply the new schema + +When applied, +a [`MultiSubscriptionFetcher`](https://github.com/bakdata/quick/blob/0.7.0/gateway/src/main/java/com/bakdata/quick/gateway/fetcher/subscription/MultiSubscriptionFetcher.java) +is created. +During this process, +Quick constructs a [`DataFetcherClient`](https://github.com/bakdata/quick/blob/0.7.0/gateway/src/main/java/com/bakdata/quick/gateway/fetcher/DataFetcherClient.java) +(for REST calls) +and a [`SubscriptionProvider`](https://github.com/bakdata/quick/blob/0.7.0/gateway/src/main/java/com/bakdata/quick/gateway/fetcher/subscription/SubscriptionProvider.java) +(for consuming from a topic) +for each field marked with the `@topic` directive. + +## Run the multi-subscription + +When a query is executed, +the created Kafka Consumers poll the corresponding topics for events. +When a new event is emitted, +it is sent via a WebSocket to the user. +To get the missing part of complex objects, the `MultiSubscriptionFetcher` +uses either the REST service of the corresponding mirror +or an internal cache to fetch missing data. +This choice depends on the current scenario. +In our [example](../user/getting-started/working-with-quick/multi-subscriptions.md), +there are three scenarios for building complex objects via multi-subscriptions: + +1. Purchase event arrives; there were some click events, + but they have not been seen by the `MultiSubscriptionFetcher` yet + (for example, because the subscription started after the events were produced). +2. Purchase event arrives, and we have already seen the click event for the corresponding id. +3. Purchase event arrives, and there has been no click event. + In the above considerations, the first event is `Purchase`. + However, the choice is interchangeable. + The system behaves similarly + if the order of arrival changes, + i.e., `Click` first, then `Purchase`. + +## Ingest data + +Say you first ingest a single `Purchase`. +Thus, a `Click` is missing. +`MultiSubscriptionFetcher` first checks the internal cache to see +whether there is a `Click` that refers to the particular id +(the same the `Purchase` refers to). +If successful, `MultiSubscriptionFetcher` retrieves the value from the cache, +creates a complex object, +and returns it to the user (Scenario 2). +If there is a cache miss, +`MultiSubscriptionFetcher` uses the REST interface of the corresponding client-mirror +and sends a request for the desired key. +If there were previous click events (Scenario +1), it retrieves `Click` data, +creates the complex object +and inserts the information into the cache for later use. +If it receives nothing from the REST endpoint, +the result depends on the user's decision concerning nullable values. +If a user allows nullable values, +an incomplete object will be returned, e.g.: +```json +{ + "data": { + "userStatistics": { + "purchase": { + "purchaseId": "abc" + }, + "click": null + } + } + } +``` + +If nullable values are not allowed, +a user will receive a subscription error: +```console +Error: The field at path '/userStatistics/click' was declared as a non null type, +but the code involved in retrieving data has wrongly returned a null value [...]. +``` diff --git a/docs/docs/user/getting-started/working-with-quick/multi-subscriptions.md b/docs/docs/user/getting-started/working-with-quick/multi-subscriptions.md new file mode 100644 index 00000000..8246cd25 --- /dev/null +++ b/docs/docs/user/getting-started/working-with-quick/multi-subscriptions.md @@ -0,0 +1,210 @@ +# Multi-subscriptions + +In [Subscriptions](subscriptions.md), you learned +how to get real-time updates using the `Subscription` type. +The example discussed receiving updates from a single topic. + +Quick also allows you to create a so-called multi-subscription. +A multi-subscription enables you to +retrieve complex objects where elements +come from more than one Kafka topic. +To integrate a multi-subscription into your application, +we take the following steps: + +1. Define the multi-subscription. +2. Apply the new schema. +3. Run the multi-subscription. +4. Ingest data. + +--- + +For the purpose of this tutorial, +consider the following scenario: +You want live statistics +of users' clicks and purchases. +Information about clicks and purchases is stored +in separate Kafka topics. +To retrieve such combined users' statistics, +you create a multi-subscription. +Before you define the multi-subscription, +add the following type to your schema `schema.gql`: +```graphql +type Click { + userId: Int! + timestamp: Int! +} +``` +Next, create a topic that holds `Click` entries: +```shell +quick topic create click --key-type string + --value-type schema --schema example.Click +``` + + +## Modify your schema with and define the multi-subscription + +As a first step, you extend the schema +(from earlier sections of this documentation) +with the definition of a multi-subscription as follows: +```graphql +type Subscription { + userStatistics: UserStatistics +} + +type UserStatistics { + purchases: Purchase @topic(name: "purchase") + clicks: Click @topic(name: "click") +} +``` + +Note that multi-subscriptions need different modelling than single subscriptions. +In a [single subscription](subscriptions.md), you add a topic directive +(which references a Kafka topic) to the field that describes the entities +you want to receive updates for, i.e.: +```graphql title="schema.gql" +type Subscription { + purchases: Purchase @topic(name: "purchase") +} +``` + +In a multi-subscription, the field `userStatistics` in `Subscription` +is defined through another type, i.e., `UserStatistics`. +This `UserStatistics` comprises the desired fields, each followed by the `@topic` directive. + +## Apply the new schema + +You can now apply the modified schema to your gateway: +```console +quick gateway apply example -f schema.gql +``` + +## Run the multi-subscription + +To demo the multi-subscription, +we use a GraphQL client, e.g., [Altair](https://altair.sirmuel.design/). +The setup is described [here](subscriptions.md). + +Subscribe to user statistics with the following query: +```graphql title="subscription.gql" +subscription { + userStatistics { + purchase { + purchaseId + } + click { + userId + timestamp + } + } +} +``` + +In Altair, you would press the `Run subscription` button +to start the subscription. + +## Ingest data + +You can now ingest data to the different topics +and see how the multi-subscription behaves. + +Start with adding a single purchase: +```json title="subscription-purchase.json" +{ + "key": "abc", + "value": { + "purchaseId": "abc", + "productId": 123, + "userId": 2, + "amount": 2, + "price": { + "total": 29.99, + "currency": "DOLLAR" + } + } +} +``` +```shell + curl --request POST --url "$QUICK_URL/ingest/purchase" \ + --header "content-type:application/json" \ + --header "X-API-Key:$QUICK_API_KEY"\ + --data "@./subscription-purchase.json" +``` + +As a result, you see the following JSON response in your GraphQL client: +```json +{ + "data": { + "userStatistics": { + "purchase": { + "purchaseId": "abc" + }, + "click": null + } + } +} +``` + +There hasn't been any click event yet, +so the response contains only purchase data. + +Now, send the following click event via the ingest service. +!!! Note + The key of the click event equals the id of the purchase event. +```json title="click1.json" +{ + "key": "abc", + "value": { + "userId": 2, + "timestamp": 123456 + } +} +``` +```shell + curl --request POST --url "$QUICK_URL/ingest/click" \ + --header "content-type:application/json" \ + --header "X-API-Key:$QUICK_API_KEY"\ + --data "@./click1.json" +``` + +You should see the following subscription result in your GraphQL client: +```json +{ + "data": { + "userStatistics": { + "purchase": { + "purchaseId": "abc" + }, + "click": { + "userId": 2, + "timestamp": 123456 + } + } + } +} +``` + +Now consider another click with the same id `abc` but with a different timestamp. +This causes a new response with the latest timestamp. +```json title="click2.json" +{ + "key": "abc", + "value": { + "userId": 2, + "timestamp": 234567 + } +} +``` + +As you can see, when a new event of one type, say `Click`, arrives, +you immediately receive the latest state of the other type, here `Purchase`. + +Thus, Quick automatically creates an up-to-date response +with elements of the different types stored in different topics. +This mechanism can be generalized to multi-subscriptions +that comprise more than two types (topics). +For example, in a type that consists of three elements +a new event causes a fetch of the corresponding other types +to create a response immediately. + +Please see the [developer section on multi-subscriptions](../../../developer/multi-subscription-details.md), +where we discuss the subtleties of that process. diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 17d394ce..acbefc2c 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -65,6 +65,7 @@ nav: - user/getting-started/working-with-quick/ingest-data.md - user/getting-started/working-with-quick/query-data.md - user/getting-started/working-with-quick/subscriptions.md + - user/getting-started/working-with-quick/multi-subscriptions.md - user/getting-started/working-with-quick/range-query.md - user/getting-started/teardown-resources.md - Examples: @@ -87,6 +88,7 @@ nav: - developer/operations.md - developer/cli.md - developer/notice.md + - developer/multi-subscription-details.md - developer/range-query-details.md - Changelog: changelog.md - Roadmap: roadmap.md diff --git a/gateway/src/main/java/com/bakdata/quick/gateway/fetcher/subscription/MultiSubscriptionFetcher.java b/gateway/src/main/java/com/bakdata/quick/gateway/fetcher/subscription/MultiSubscriptionFetcher.java index bb3f4e23..ce2a9625 100644 --- a/gateway/src/main/java/com/bakdata/quick/gateway/fetcher/subscription/MultiSubscriptionFetcher.java +++ b/gateway/src/main/java/com/bakdata/quick/gateway/fetcher/subscription/MultiSubscriptionFetcher.java @@ -134,29 +134,19 @@ private static List getSelectedFields(final DataFetchingEnvironment envi */ private Mono> createComplexType(final NamedRecord namedRecord, final List selectedFields) { + + final ConsumerRecord record = namedRecord.getConsumerRecord(); // map holding the data for current key final Map complexType = new HashMap<>(); - complexType.put(namedRecord.getFieldName(), namedRecord.getConsumerRecord().value()); + complexType.put(namedRecord.getFieldName(), record.value()); - final FieldKey key = new FieldKey<>(namedRecord.getFieldName(), namedRecord.getConsumerRecord().key()); - final CompletableFuture recordValue = - CompletableFuture.completedFuture(namedRecord.getConsumerRecord().value()); - log.info("Update {} with {}", key, namedRecord.getConsumerRecord().value()); - this.fieldCache.put(key, recordValue); + this.updateFieldCache(namedRecord.getFieldName(), record); final Flux> fieldKeysToPopulate = Flux.fromIterable(selectedFields) .filter(fieldName -> !fieldName.equals(namedRecord.getFieldName())) - .map(fieldName -> new FieldKey<>(fieldName, namedRecord.getConsumerRecord().key())); - - final Flux> fieldValues = fieldKeysToPopulate.flatMap(fieldKey -> { - log.info("Get key {}", fieldKey); - return Mono.fromFuture(this.fieldCache.get(fieldKey)) - .map(value -> { - log.info("Set key {} to {}", fieldKey.getFieldName(), value); - return new FieldValue<>(fieldKey.getFieldName(), value); - }); - } - ); + .map(fieldName -> new FieldKey<>(fieldName, record.key())); + + final Flux> fieldValues = fieldKeysToPopulate.flatMap(this::getValueForKey); return fieldValues.reduce(complexType, (map, fieldValue) -> { map.put(fieldValue.getFieldName(), fieldValue.getValue()); @@ -164,6 +154,23 @@ private Mono> createComplexType(final NamedRecord name }); } + private void updateFieldCache(final String fieldName, final ConsumerRecord record) { + final FieldKey key = new FieldKey<>(fieldName, record.key()); + final CompletableFuture recordValue = + CompletableFuture.completedFuture(record.value()); + + log.info("Update field cache with polled record: Key {} has value {}", key, record.value()); + this.fieldCache.put(key, recordValue); + } + + /** + * Get the latest value from the {@link MultiSubscriptionFetcher#fieldCache}. + */ + private Mono> getValueForKey(final FieldKey fieldKey) { + return Mono.fromFuture(this.fieldCache.get(fieldKey)) + .map(value -> new FieldValue<>(fieldKey.getFieldName(), value)); + } + @Nullable private Object loadField(final FieldKey fieldKey) { final DataFetcherClient client = this.fieldDataFetcherClients.get(fieldKey.getFieldName()); @@ -171,18 +178,32 @@ private Object loadField(final FieldKey fieldKey) { return client.fetchResult(fieldKey.getKey()); } + /** + * Create one Flux that streams the elements of ALL topics selected by the user query. + */ private Flux> combineElementStreams(final List selectedFields, final DataFetchingEnvironment env) { final List>> fluxes = selectedFields.stream() - .map(name -> { - final SubscriptionProvider kafkaSubscriber = this.fieldSubscriptionProviders.get(name); - Objects.requireNonNull(kafkaSubscriber); - final Flux> elementStream = kafkaSubscriber.getElementStream(env); - return elementStream.map(val -> new NamedRecord<>(name, val)); - }).collect(Collectors.toList()); + .map(name -> this.createSubscriptionFlux(env, name)).collect(Collectors.toList()); return Flux.merge(fluxes); } + /** + * Creates a {@link Flux} of {@link NamedRecord} for the given field name. + * + *

+ * The field name points to a topics for which a {@link SubscriptionProvider} emits the consumed records. + */ + private Flux> createSubscriptionFlux(final DataFetchingEnvironment env, + final String name) { + final SubscriptionProvider kafkaSubscriber = Objects.requireNonNull( + this.fieldSubscriptionProviders.get(name), + () -> "No subscription provider found for field " + name + ); + final Flux> elementStream = kafkaSubscriber.getElementStream(env); + return elementStream.map(consumerRecord -> new NamedRecord<>(name, consumerRecord)); + } + /** * Function extracting the selected fields of a GraphQL environment. */ diff --git a/gateway/src/main/java/com/bakdata/quick/gateway/fetcher/subscription/SubscriptionFetcher.java b/gateway/src/main/java/com/bakdata/quick/gateway/fetcher/subscription/SubscriptionFetcher.java index 1720b27f..fa09a7ab 100644 --- a/gateway/src/main/java/com/bakdata/quick/gateway/fetcher/subscription/SubscriptionFetcher.java +++ b/gateway/src/main/java/com/bakdata/quick/gateway/fetcher/subscription/SubscriptionFetcher.java @@ -21,10 +21,10 @@ import com.bakdata.quick.common.config.KafkaConfig; import com.bakdata.quick.common.type.QuickTopicData; import com.bakdata.quick.common.util.Lazy; +import edu.umd.cs.findbugs.annotations.Nullable; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; /** @@ -38,7 +38,7 @@ * @see io.micronaut.configuration.graphql.ws.GraphQLWsSender */ public class SubscriptionFetcher implements DataFetcher> { - final SubscriptionProvider subscriptionProvider; + private final SubscriptionProvider subscriptionProvider; /** * Creates a new SubscriptionFetcher. diff --git a/gateway/src/main/java/com/bakdata/quick/gateway/transformer/MultiSubscriptionTransformer.java b/gateway/src/main/java/com/bakdata/quick/gateway/transformer/MultiSubscriptionTransformer.java index be78ddc8..23abcc09 100644 --- a/gateway/src/main/java/com/bakdata/quick/gateway/transformer/MultiSubscriptionTransformer.java +++ b/gateway/src/main/java/com/bakdata/quick/gateway/transformer/MultiSubscriptionTransformer.java @@ -40,6 +40,25 @@ /** * Post-processing for creating subscriptions of multiple topics. + * + *

+ * This is implemented as {@link SchemaGeneratorPostProcessing} because the fields itself doesn't have a directive + * attached to it. + * + *

+ *

Example:

+ *
{@code
+ * type Subscription {
+ *     getStatistics: Statistics # <- Multi-Subscription Fetcher
+ * }
+ *
+ * type Statistics {
+ *     clickStatistics: ClickStatistics @topic(name: "user-statistics")
+ *     purchaseStatistics: PurchaseStatistics @topic(name: "purchase-statistics")
+ * }
+ * }
+ * + * @see com.bakdata.quick.gateway.directives.topic.rule.fetcher.SubscriptionRule */ @Slf4j @Singleton @@ -51,29 +70,37 @@ public MultiSubscriptionTransformer(final FetcherFactory fetcherFactory) { } @Override - public GraphQLSchema process(final GraphQLSchema originalSchema) { - if (originalSchema.getSubscriptionType() == null) { - return originalSchema; + public GraphQLSchema process(final GraphQLSchema schema) { + if (schema.getSubscriptionType() == null) { + return schema; } - final List fetchers = - originalSchema.getSubscriptionType().getFieldDefinitions().stream() - .filter(field -> !field.getDirectivesByName().containsKey("topic") - && field.getType() instanceof GraphQLObjectType) - .map(this::buildDataFetcher) - .collect(Collectors.toList()); + final List fetchers = schema.getSubscriptionType().getFieldDefinitions().stream() + // Skip fields that are handled by the SubscriptionRule + .filter(field -> !field.getDirectivesByName().containsKey("topic") + && field.getType() instanceof GraphQLObjectType) + .map(this::buildDataFetcher) + .collect(Collectors.toList()); - final GraphQLCodeRegistry codeRegistry = originalSchema.getCodeRegistry().transform(builder -> + final GraphQLCodeRegistry codeRegistry = schema.getCodeRegistry().transform(builder -> fetchers.forEach(spec -> builder.dataFetcher(spec.getCoordinates(), spec.getDataFetcher())) ); - return originalSchema.transform(builder -> builder.codeRegistry(codeRegistry)); + return schema.transform(builder -> builder.codeRegistry(codeRegistry)); } + /** + * Build a {@link MultiSubscriptionFetcher} for each field of the complex type that a subscription field returns. + * + * @param fieldDefinition field of the subscription type + * @return a data fetcher with its field coordinates + */ private DataFetcherSpecification buildDataFetcher(final GraphQLFieldDefinition fieldDefinition) { final GraphQLObjectType objectType = (GraphQLObjectType) fieldDefinition.getType(); + final Map> dataFetchers = new HashMap<>(); final Map> subscriptionProviders = new HashMap<>(); + for (final GraphQLFieldDefinition field : objectType.getFieldDefinitions()) { if (!field.getDirectivesByName().containsKey(TopicDirective.DIRECTIVE_NAME)) { log.warn("Skip field {} of {} in subscription: No topic directive found", field.getName(), @@ -81,9 +108,9 @@ private DataFetcherSpecification buildDataFetcher(final GraphQLFieldDefiniti continue; } - final List arguments = - field.getDirective(TopicDirective.DIRECTIVE_NAME).getArguments(); + final List arguments = field.getDirective(TopicDirective.DIRECTIVE_NAME).getArguments(); final TopicDirective topicDirective = TopicDirective.fromArguments(arguments); + final DataFetcherClient dataFetcherClient = this.fetcherFactory.dataFetcherClient(topicDirective.getTopicName()); final SubscriptionProvider subscriptionProvider = @@ -93,10 +120,13 @@ private DataFetcherSpecification buildDataFetcher(final GraphQLFieldDefiniti dataFetchers.put(field.getName(), dataFetcherClient); subscriptionProviders.put(field.getName(), subscriptionProvider); } + final DataFetcher multiSubscriptionFetcher = new MultiSubscriptionFetcher<>(dataFetchers, subscriptionProviders); final FieldCoordinates coordinates = FieldCoordinates.coordinates(GraphQLUtils.SUBSCRIPTION_TYPE, fieldDefinition.getName()); + return DataFetcherSpecification.of(coordinates, multiSubscriptionFetcher); } + } diff --git a/gateway/src/test/java/com/bakdata/quick/gateway/fetcher/subscription/MultiSubscriptionFetcherTest.java b/gateway/src/test/java/com/bakdata/quick/gateway/fetcher/subscription/MultiSubscriptionFetcherTest.java index 0bc6be17..8c9b9fa6 100644 --- a/gateway/src/test/java/com/bakdata/quick/gateway/fetcher/subscription/MultiSubscriptionFetcherTest.java +++ b/gateway/src/test/java/com/bakdata/quick/gateway/fetcher/subscription/MultiSubscriptionFetcherTest.java @@ -17,9 +17,17 @@ package com.bakdata.quick.gateway.fetcher.subscription; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import com.bakdata.quick.avro.ClickStatsAvro; +import com.bakdata.quick.avro.PurchaseStatsAvro; import com.bakdata.quick.gateway.fetcher.DataFetcherClient; +import com.bakdata.quick.testutil.ClickStatsProto; +import com.bakdata.quick.testutil.PurchaseStatsProto; import graphql.schema.DataFetchingEnvironmentImpl; import io.reactivex.subscribers.TestSubscriber; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -31,14 +39,13 @@ import reactor.core.publisher.Flux; class MultiSubscriptionFetcherTest { - @Test @DisabledIfEnvironmentVariable(named = "CI", matches = "true", disabledReason = "Flaky on CI") void shouldReturnComplexType() { - final DataFetcherClient field1Client = Mockito.mock(DataFetcherClient.class); - final DataFetcherClient field2Client = Mockito.mock(DataFetcherClient.class); - Mockito.doReturn("field1Key2").when(field1Client).fetchResult("key2"); - Mockito.doReturn("field2Key1").when(field2Client).fetchResult("key1"); + final DataFetcherClient field1Client = mock(DataFetcherClient.class); + final DataFetcherClient field2Client = mock(DataFetcherClient.class); + doReturn("field1Key2").when(field1Client).fetchResult("key2"); + doReturn("field2Key1").when(field2Client).fetchResult("key1"); final SubscriptionProvider field1Subscriber = env -> Flux.just(new ConsumerRecord<>("topic1", 0, 0, "key1", "field1Key1")); @@ -67,8 +74,8 @@ void shouldReturnComplexType() { @Test void shouldReturnSingleFieldOfComplexType() { - final DataFetcherClient field1Client = Mockito.mock(DataFetcherClient.class); - final DataFetcherClient field2Client = Mockito.mock(DataFetcherClient.class); + final DataFetcherClient field1Client = mock(DataFetcherClient.class); + final DataFetcherClient field2Client = mock(DataFetcherClient.class); final SubscriptionProvider field1Subscriber = env -> Flux.just(new ConsumerRecord<>("topic1", 0, 0, "key1", "field1Key1")); @@ -93,4 +100,118 @@ void shouldReturnSingleFieldOfComplexType() { testSubscriber.assertValueAt(0, Map.of("field1", "field1Key1")); Mockito.verifyNoInteractions(field1Client, field2Client); } + + @Test + @DisabledIfEnvironmentVariable(named = "CI", matches = "true", disabledReason = "Flaky on CI") + void shouldFetchValuesForStringKeyAndAvroValue() { + final ClickStatsAvro key1clickStats = newClickStatsAvro("key1", 1); + final PurchaseStatsAvro key1purchaseStats = newPurchaseStatsAvro("key1", 2); + final ClickStatsAvro key2clickStats = newClickStatsAvro("key2", 3); + final PurchaseStatsAvro key2purchaseStats = newPurchaseStatsAvro("key2", 4); + + final DataFetcherClient clickStatsClient = mock(DataFetcherClient.class); + final DataFetcherClient purchaseStatsClient = mock(DataFetcherClient.class); + doReturn(key1purchaseStats).when(purchaseStatsClient).fetchResult("key1"); + doReturn(key2clickStats).when(clickStatsClient).fetchResult("key2"); + + final SubscriptionProvider clickStatsProvider = + env -> Flux.just(new ConsumerRecord<>("topic1", 0, 0, "key1", key1clickStats)); + final SubscriptionProvider purchaseStatsProvider = + env -> Flux.just(new ConsumerRecord<>("topic2", 0, 0, "key2", key2purchaseStats)); + + final Map> fieldClients = Map.of( + "clicks", clickStatsClient, + "purchases", purchaseStatsClient + ); + final Map> fieldSubscribers = Map.of( + "clicks", clickStatsProvider, + "purchases", purchaseStatsProvider + ); + + final List selectedFields = List.of("clicks", "purchases"); + final MultiSubscriptionFetcher fetcher = + new MultiSubscriptionFetcher<>(fieldClients, fieldSubscribers, env -> selectedFields); + + final Publisher> mapPublisher = fetcher.get( + DataFetchingEnvironmentImpl.newDataFetchingEnvironment().build() + ); + + final TestSubscriber> testSubscriber = TestSubscriber.create(); + mapPublisher.subscribe(testSubscriber); + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); + testSubscriber.assertComplete(); + + final Map expected = new HashMap<>(); + expected.put("clicks", newClickStatsAvro("key1", 1)); + expected.put("purchases", newPurchaseStatsAvro("key1", 2)); + testSubscriber.assertValueAt(0, expected); + + final Map expected2 = new HashMap<>(); + expected2.put("clicks", newClickStatsAvro("key2", 3)); + expected2.put("purchases", newPurchaseStatsAvro("key2", 4)); + testSubscriber.assertValueAt(1, expected2); + } + + @Test + @DisabledIfEnvironmentVariable(named = "CI", matches = "true", disabledReason = "Flaky on CI") + void shouldFetchValuesForDoubleKeyAndProtoValue() { + final ClickStatsProto key1clickStats = newClickStatsProto("key1", 1); + final PurchaseStatsProto key1purchaseStats = newPurchaseStatsProto("key1", 2); + final ClickStatsProto key2clickStats = newClickStatsProto("key2", 3); + final PurchaseStatsProto key2purchaseStats = newPurchaseStatsProto("key2", 4); + + final DataFetcherClient clickStatsClient = mock(DataFetcherClient.class); + final DataFetcherClient purchaseStatsClient = mock(DataFetcherClient.class); + doReturn(key1purchaseStats).when(purchaseStatsClient).fetchResult(1); + doReturn(key2clickStats).when(clickStatsClient).fetchResult(2); + + final SubscriptionProvider clickStatsProvider = + env -> Flux.just(new ConsumerRecord<>("topic1", 0, 0, 1, key1clickStats)); + final SubscriptionProvider purchaseStatsProvider = + env -> Flux.just(new ConsumerRecord<>("topic2", 0, 0, 2, key2purchaseStats)); + + final Map> fieldClients = Map.of( + "clicks", clickStatsClient, + "purchases", purchaseStatsClient + ); + final Map> fieldSubscribers = Map.of( + "clicks", clickStatsProvider, + "purchases", purchaseStatsProvider + ); + + final List selectedFields = List.of("clicks", "purchases"); + final MultiSubscriptionFetcher fetcher = + new MultiSubscriptionFetcher<>(fieldClients, fieldSubscribers, env -> selectedFields); + + final Publisher> mapPublisher = fetcher.get( + DataFetchingEnvironmentImpl.newDataFetchingEnvironment().build() + ); + + final TestSubscriber testSubscriber = TestSubscriber.create(); + mapPublisher.subscribe(testSubscriber); + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); + testSubscriber.assertComplete(); + testSubscriber.assertValueAt(0, Map.of( + "clicks", newClickStatsProto("key1", 1), + "purchases", newPurchaseStatsProto("key1", 2))); + testSubscriber.assertValueAt(1, Map.of( + "purchases", newPurchaseStatsProto("key2", 4), + "clicks", newClickStatsProto("key2", 3))); + } + + private static ClickStatsAvro newClickStatsAvro(final String id, final long amount) { + return ClickStatsAvro.newBuilder().setId(id).setAmount(amount).build(); + } + + private static PurchaseStatsAvro newPurchaseStatsAvro(final String id, final long amount) { + return PurchaseStatsAvro.newBuilder().setId(id).setAmount(amount).build(); + } + + private static ClickStatsProto newClickStatsProto(final String id, final long amount) { + return ClickStatsProto.newBuilder().setId(id).setAmount(amount).build(); + } + + private static PurchaseStatsProto newPurchaseStatsProto(final String id, final long amount) { + return PurchaseStatsProto.newBuilder().setId(id).setAmount(amount).build(); + } }