Skip to content

Commit

Permalink
Add support for deploying range mirror (#61)
Browse files Browse the repository at this point in the history
Fixes #56
  • Loading branch information
raminqaf authored Sep 8, 2022
1 parent 95e0934 commit ac83d34
Show file tree
Hide file tree
Showing 38 changed files with 693 additions and 316 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ public class MirrorCreationData implements CreationData {
String tag;
@Nullable
Duration retentionTime;
boolean point;
@Nullable
String rangeField;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.bakdata.quick.common.api.model.TopicWriteType;
import com.bakdata.quick.common.api.model.manager.GatewaySchema;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.time.Duration;
import lombok.Value;

Expand All @@ -29,5 +30,9 @@ public class TopicCreationData implements CreationData {
TopicWriteType writeType;
GatewaySchema valueSchema;
GatewaySchema keySchema;
@Nullable
Duration retentionTime;
boolean point;
@Nullable
String rangeField;
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static List<String> convertArgs(final Map<String, String> args) {
/**
* Converts arbitrary key value pairs and a kafka config into cli args.
*
* @param args arguments to convert
* @param args arguments to convert
* @param kafkaConfig bootstrap server and schema registry url
* @return list containing strings formatted "--key=value" and "--brokers=XX" and "--schema-registry-url=XX"
*/
Expand Down
6 changes: 4 additions & 2 deletions docs/docs/developer/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ define the dependencies. The list of dependencies can be found under the

The manager API is specified in [the `openapi` directory](https://github.com/bakdata/quick/tree/master/openapi/spec).
We use the [OpenAPI generator](https://github.com/OpenAPITools/openapi-generator) (Version 4.3.1) to generate our
[python client](https://github.com/bakdata/quick-cli/tree/master/quick_client).
[python client](https://github.com/bakdata/quick-cli/tree/master/quick_client).
To download the OpenAPI JAR,
please visit the [OpenAPI installation documentation](https://openapi-generator.tech/docs/installation/#jar).

```shell
java -jar openapi-generator-cli.jar generate \
-i quick/openapi/spec/Quick-Manager-v1.yaml \
-g python \
-g python \
-o quick-cli/ \
-p=generateSourceCodeOnly=true,packageName=quick_client \
--global-property apiTests=false,modelTests=false,modelDocs=false
Expand Down
47 changes: 24 additions & 23 deletions docs/docs/user/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Kafka

| Environment Variable | Required | Description |
| ----------------------------------------- | ---------------- | ------------------------------------------------ |
|-------------------------------------------|------------------|--------------------------------------------------|
| `QUICK_KAFKA_BOOTSTRAP_SERVER` | :material-check: | Kafka address to connect to |
| `QUICK_KAFKA_SCHEMA_REGISTRY_URL` | :material-check: | Schema Registry URL to connect to |
| `QUICK_KAFKA_APPLICATION_ID` | :material-close: | Application id to use |
Expand All @@ -13,7 +13,7 @@
## Mirror

| Environment Variable | Required | Description |
| --------------------- | ---------------- | ------------------------------------------------------- |
|-----------------------|------------------|---------------------------------------------------------|
| `QUICK_MIRROR_PREFIX` | :material-close: | Prefix of Kubernetes deployments for mirror deployments |
| `QUICK_MIRROR_PATH` | :material-close: | REST subpath where the mirror's API is running |

Expand All @@ -29,7 +29,7 @@
## Topic Registry

| Environment Variable | Required | Description |
| ----------------------------------------- | ---------------- | ---------------------------------------------------------- |
|-------------------------------------------|------------------|------------------------------------------------------------|
| `QUICK_TOPIC_REGISTRY_TOPIC_NAME` | :material-check: | Topic backing the topic registry |
| `QUICK_TOPIC_REGISTRY_SERVICE_NAME` | :material-check: | Service name of the topic registry |
| `QUICK_TOPIC_REGISTRY_PARTITIONS` | :material-check: | Partition count of the topic backing the topic registry |
Expand All @@ -38,28 +38,29 @@

## Deployment

| Environment Variable | Required | Description |
| ------------------------------------- | ---------------- | ------------------------------------------------------------------------------------- |
| `QUICK_DOCKER_REGISTRY` | :material-check: | Docker registry for use Quick images |
| `QUICK_DEFAULT_IMAGE_TAG` | :material-check: | Default image tag of Quick images to deploy |
| `QUICK_DEFAULT_REPLICAS` | :material-check: | Default amount of replicas for Quick deployments |
| `QUICK_INGRESS_HOST` | :material-close: | Host for Kubernetes Ingress objects for gateways |
| `QUICK_INGRESS_SSL` | :material-check: | Flag indicating whether the ingress should use SSL |
| `QUICK_INGRESS_ENTRYPOINT` | :material-check: | Traefik's entrypoint for ingress |
| `QUICK_MANAGER_UPDATE_MANAGED_IMAGES` | :material-check: | Flag indicating whether the manager should ensure deployments have the same image tag |
| `QUICK_MANAGER_CREATE_TOPIC_REGISTRY` | :material-check: | Flag if manager should deploy a topic registry |

## Resources

| Environment Variable | Required | Description |
| -------------------------------------------- | ---------------- | ------------------------------ |
| `QUICK_APPLICATION_RESOURCES_MEMORY_LIMIT` | :material-close: | Memory limit for deployments |
| `QUICK_APPLICATION_RESOURCES_MEMORY_REQUEST` | :material-close: | Memory request for deployments |
| `QUICK_APPLICATION_RESOURCES_CPU_LIMIT` | :material-close: | Cpu limit for deployments |
| `QUICK_APPLICATION_RESOURCES_CPU_REQUEST` | :material-close: | Cpu requests for deployments |
| Environment Variable | Required | Description |
|---------------------------------------------|------------------|----------------------------------------------------------------------------------------------------------------|
| `QUICK_DOCKER_REGISTRY`                             | :material-check: | Docker registry to use for Quick images                                                                        |
| `QUICK_DEFAULT_IMAGE_TAG` | :material-check: | Default image tag of Quick images to deploy |
| `QUICK_DEFAULT_REPLICAS` | :material-check: | Default amount of replicas for Quick deployments |
| `QUICK_INGRESS_HOST` | :material-close: | Host for Kubernetes Ingress objects for gateways |
| `QUICK_INGRESS_SSL` | :material-check: | Flag indicating whether the ingress should use SSL |
| `QUICK_INGRESS_ENTRYPOINT` | :material-check: | Traefik's entrypoint for ingress |
| `QUICK_MANAGER_UPDATE_MANAGED_IMAGES` | :material-check: | Flag indicating whether the manager should ensure deployments have the same image tag |
| `QUICK_MANAGER_CREATE_TOPIC_REGISTRY`               | :material-check: | Flag indicating whether the manager should deploy a topic registry                                             |

## Applications Specification

| Environment Variable | Required | Description |
|----------------------------------------------------|------------------|--------------------------------------------------------------------------------------------------------------|
| `QUICK_APPLICATIONS_SPEC_IMAGE_PULL_POLICY`         | :material-close: | Image pull policy of the applications deployed by Quick. Valid values: Always (default), IfNotPresent, Never  |
| `QUICK_APPLICATIONS_SPEC_RESOURCES_MEMORY_LIMIT` | :material-close: | Memory limit for deployments |
| `QUICK_APPLICATIONS_SPEC_RESOURCES_MEMORY_REQUEST` | :material-close: | Memory request for deployments |
| `QUICK_APPLICATIONS_SPEC_RESOURCES_CPU_LIMIT`       | :material-close: | CPU limit for deployments                                                                                      |
| `QUICK_APPLICATIONS_SPEC_RESOURCES_CPU_REQUEST`     | :material-close: | CPU request for deployments                                                                                    |

## Gateway

| Environment Variable | Required | Description |
| -------------------- | ---------------- | ----------------------------------------- |
|----------------------|------------------|-------------------------------------------|
| `QUICK_SCHEMA_PATH` | :material-check: | The path where the schema file is located |
2 changes: 1 addition & 1 deletion docs/overrides/home.html
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
}

/* Moves the search results to the front */
.md-search {
.md-header {
z-index: 10;
}

Expand Down
2 changes: 1 addition & 1 deletion e2e/functional/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ There are three scenarios tested: <br/>
**NOTE:** Subscription tests are currently skipped by bats. (WIP)

## Prerequisite
You can specify the quick-cli version when you are building the image through the argument `QUICK_CLI_VERSION`. The default version is `0.4.0`
You can specify the quick-cli version when you are building the image through the argument `QUICK_CLI_VERSION`. The default version is `0.7.0`

```bash
docker build --build-arg QUICK_CLI_VERSION=<Version> -t quick-e2e-test-runner:<TAG> .
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
* Data fetcher for subscribing to multiple topics.
*
* <p>
* This data fetcher works by subscribing to multiple topics with a {@link KafkaSubscriptionProvider}.
* When a new query comes in, it subscribes to the Kafka Subscriber for all selected fields.
* Whenever a subscriber emits an event, the data for the other selected fields is fetched through the mirror.
* This data fetcher works by subscribing to multiple topics with a {@link KafkaSubscriptionProvider}. When a new query
* comes in, it subscribes to the Kafka Subscriber for all selected fields. Whenever a subscriber emits an event, the
* data for the other selected fields is fetched through the mirror.
*/
@Slf4j
public class MultiSubscriptionFetcher implements DataFetcher<Publisher<Map<String, Object>>> {
Expand All @@ -54,8 +54,8 @@ public class MultiSubscriptionFetcher implements DataFetcher<Publisher<Map<Strin
* A cache for all field values.
*
* <p>
* The subscriptions update the values, so that we only need to fetch the values from the mirror in case the
* kafka didn't yield a value for this field yet.
* The subscriptions update the values, so that we only need to fetch the values from the mirror in case the kafka
* didn't yield a value for this field yet.
*/
private final AsyncLoadingCache<FieldKey<?>, Object> fieldCache;
private final Map<String, SubscriptionProvider<?, ?>> fieldSubscriptionProviders;
Expand All @@ -64,25 +64,25 @@ public class MultiSubscriptionFetcher implements DataFetcher<Publisher<Map<Strin
/**
* Default constructor.
*
* @param fieldDataFetcherClients map of fields to their fetching clients
* @param fieldDataFetcherClients map of fields to their fetching clients
* @param fieldSubscriptionProviders map of field to their subscription providers
*/
public MultiSubscriptionFetcher(final Map<String, DataFetcherClient<?>> fieldDataFetcherClients,
final Map<String, SubscriptionProvider<?, ?>> fieldSubscriptionProviders) {
final Map<String, SubscriptionProvider<?, ?>> fieldSubscriptionProviders) {
this(fieldDataFetcherClients, fieldSubscriptionProviders, MultiSubscriptionFetcher::getSelectedFields);
}

/**
* Constructor with custom field selection for testing purposes.
*
* @param fieldDataFetcherClients map of fields to their fetching clients
* @param fieldDataFetcherClients map of fields to their fetching clients
* @param fieldSubscriptionProviders map of field to their subscription providers
* @param fieldSelector function extracting the selected fields of a GraphQL environment
* @param fieldSelector function extracting the selected fields of a GraphQL environment
*/
@VisibleForTesting
MultiSubscriptionFetcher(final Map<String, DataFetcherClient<?>> fieldDataFetcherClients,
final Map<String, SubscriptionProvider<?, ?>> fieldSubscriptionProviders,
final FieldSelector fieldSelector) {
final Map<String, SubscriptionProvider<?, ?>> fieldSubscriptionProviders,
final FieldSelector fieldSelector) {
this.fieldDataFetcherClients = fieldDataFetcherClients;
this.fieldSubscriptionProviders = fieldSubscriptionProviders;
this.fieldSelector = fieldSelector;
Expand All @@ -102,9 +102,9 @@ public Publisher<Map<String, Object>> get(final DataFetchingEnvironment environm
*
* <p>
* Nested types are qualified by their parent and then a /, i.e., parent/child (see
* {@link graphql.schema.DataFetchingFieldSelectionSet}).
* In this setting, we're only interested in the first level of fields. Deeper levels are expected to be part of
* the returned Kafka value itself. Therefore, child wouldn't be part of the returned list.
* {@link graphql.schema.DataFetchingFieldSelectionSet}). In this setting, we're only interested in the first level
* of fields. Deeper levels are expected to be part of the returned Kafka value itself. Therefore, child wouldn't be
* part of the returned list.
*
* @param environment environment of the current request
* @return list of root field names returned by this fetcher
Expand All @@ -128,18 +128,19 @@ private static List<String> getSelectedFields(final DataFetchingEnvironment envi
* We can also cache it since we get all updates.</il>
* </ol>
*
* @param namedRecord the record we got from Kafka
* @param namedRecord the record we got from Kafka
* @param selectedFields the fields selected by this query
* @return a map representing the selected object
*/
private Mono<Map<String, Object>> createComplexType(final NamedRecord<?, ?> namedRecord,
final List<String> selectedFields) {
final List<String> selectedFields) {
// map holding the data for current key
final Map<String, Object> complexType = new HashMap<>();
complexType.put(namedRecord.getFieldName(), namedRecord.getConsumerRecord().value());

final FieldKey<?> key = new FieldKey<>(namedRecord.getFieldName(), namedRecord.getConsumerRecord().key());
final CompletableFuture<?> recordValue = CompletableFuture.completedFuture(namedRecord.getConsumerRecord().value());
final CompletableFuture<?> recordValue =
CompletableFuture.completedFuture(namedRecord.getConsumerRecord().value());
log.info("Update {} with {}", key, namedRecord.getConsumerRecord().value());
this.fieldCache.put(key, recordValue);

Expand Down Expand Up @@ -171,7 +172,7 @@ private Object loadField(final FieldKey<?> fieldKey) {
}

private Flux<? extends NamedRecord<?, ?>> combineElementStreams(final List<String> selectedFields,
final DataFetchingEnvironment env) {
final DataFetchingEnvironment env) {
final List<Flux<? extends NamedRecord<?, ?>>> fluxes = selectedFields.stream()
.map(name -> {
final SubscriptionProvider<?, ?> kafkaSubscriber = this.fieldSubscriptionProviders.get(name);
Expand Down
Loading

0 comments on commit ac83d34

Please sign in to comment.