Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add range to topic directive with range data fetcher #62

Merged
merged 26 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6ff5221
Add range to topic directive with range data fetcher
raminqaf Aug 22, 2022
be5247d
small changes
raminqaf Aug 23, 2022
d5de0ec
Add javadocs
raminqaf Aug 23, 2022
c27f78e
add execution test
raminqaf Aug 23, 2022
6d4e23e
add test for mirror host
raminqaf Aug 24, 2022
e5a46fd
Add validation rules
raminqaf Aug 25, 2022
fe6f8f3
Merge branch 'master' into feature/gateway/add-range-data-fetcher
raminqaf Sep 1, 2022
4bbde70
Merge branch 'master' into feature/gateway/add-range-data-fetcher
raminqaf Sep 5, 2022
71e5723
fix mirror host test
raminqaf Sep 6, 2022
9a6ba8a
Add E2E test for gateway
raminqaf Sep 6, 2022
b18da6c
Add range to field
raminqaf Sep 6, 2022
6a862c6
refactor test
raminqaf Sep 6, 2022
4844dc4
Merge branch 'master' of github.com:bakdata/quick into feature/gatewa…
raminqaf Sep 6, 2022
4a57a99
fix reange query test
raminqaf Sep 7, 2022
0bd64ab
revert range on field
raminqaf Sep 7, 2022
650df67
Update shouldNotCovertIfRangeIsDefinedOnField.graphql
raminqaf Sep 7, 2022
82a9e3c
Merge branch 'master' into feature/gateway/add-range-data-fetcher
raminqaf Sep 8, 2022
8c9d485
Merge branch 'master' into feature/gateway/add-range-data-fetcher
raminqaf Sep 8, 2022
0ea8b0a
Add reviews
raminqaf Sep 8, 2022
6409d27
Merge branch 'feature/gateway/add-range-data-fetcher' of github.com:b…
raminqaf Sep 8, 2022
6951ab8
Add checkstyle
raminqaf Sep 8, 2022
3d6ebc5
Update files
raminqaf Sep 8, 2022
ae02c43
Update user-requests.json
raminqaf Sep 8, 2022
3b7bb75
Add reviews
raminqaf Sep 9, 2022
9db48bf
Update range URL
raminqaf Sep 12, 2022
0d146b9
Update gradle.properties
raminqaf Sep 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,27 @@ public class DefaultMirrorClient<K, V> implements MirrorClient<K, V> {
/**
* Constructor for the client.
*
* @param topicName name of the topic the mirror is deployed
* @param client http client
* @param mirrorConfig configuration of the mirror host
* @param topicName name of the topic the mirror is deployed
* @param client http client
* @param mirrorConfig configuration of the mirror host
* @param valueResolver the value's {@link TypeResolver}
* @param requestManager a manager for sending requests to the mirror and processing responses
*/
public DefaultMirrorClient(final String topicName, final HttpClient client, final MirrorConfig mirrorConfig,
final TypeResolver<V> valueResolver, final MirrorRequestManager requestManager) {
final TypeResolver<V> valueResolver, final MirrorRequestManager requestManager) {
this(new MirrorHost(topicName, mirrorConfig), client, valueResolver, requestManager);
}

/**
* Constructor that can be used when the mirror client is based on an IP or other non-standard host.
*
* @param mirrorHost host to use
* @param client http client
* @param mirrorHost host to use
* @param client http client
* @param typeResolver the value's {@link TypeResolver}
* @param requestManager a manager for sending requests to the mirror and processing responses
*/
public DefaultMirrorClient(final MirrorHost mirrorHost, final HttpClient client, final TypeResolver<V> typeResolver,
final MirrorRequestManager requestManager) {
final MirrorRequestManager requestManager) {
this.host = mirrorHost;
this.parser = new MirrorValueParser<>(typeResolver, client.objectMapper());
this.mirrorRequestManager = requestManager;
Expand Down Expand Up @@ -91,6 +91,17 @@ public List<V> fetchValues(final List<K> keys) {
Collections.emptyList());
}

@Override
@Nullable
public List<V> fetchRange(final K key, final String from, final String rangeTo) {
final ResponseWrapper response = this.mirrorRequestManager.makeRequest(
this.host.forRange(key.toString(), from, rangeTo)
);
return Objects.requireNonNullElse(
this.mirrorRequestManager.processResponse(response, this.parser::deserializeList),
Collections.emptyList());
}

@Override
public boolean exists(final K key) {
return this.fetchValue(key) != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public interface MirrorClient<K, V> {

/**
* fetches the value of the given key from the mirror topic.
* Fetches the value of the given key from the mirror topic.
*
* @param key a key to be fetched
* @return a list of values. If the requested mirror responds with a NOT_FOUND code the function returns null.
Expand All @@ -38,14 +38,14 @@ public interface MirrorClient<K, V> {
V fetchValue(final K key);

/**
* fetches all the values of a mirror topic.
* Fetches all the values of a mirror topic.
*
* @return returns a list of all values in a topic. null.
*/
List<V> fetchAll();

/**
* fetches the values of a list of keys from the mirror topic.
* Fetches the values of a list of keys from the mirror topic.
*
* @param keys list of keys to be fetched
* @return a list of values. If the requested mirror responds with a NOT_FOUND code the function returns null.
Expand All @@ -54,7 +54,18 @@ public interface MirrorClient<K, V> {
List<V> fetchValues(final List<K> keys);

/**
* checks if a key exists in mirror topic.
* Fetches a range of a given key from the mirror topic.
*
* @param key a key to be fetched
* @param from lower bound of the range field
* @param to higher bound of the range field
* @return a list of values.
*/
@Nullable
List<V> fetchRange(final K key, final String from, final String to);

/**
* Checks if a key exists in mirror topic.
*
* @return True/False if key exists in mirror topic
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.apache.kafka.common.serialization.Serde;

/**
* MirrorClient that has access to information about partition-host mapping.
* This information enables it to efficiently route requests in the case when there is more than one mirror replica.
* MirrorClient that has access to information about partition-host mapping. This information enables it to efficiently
* route requests in the case when there is more than one mirror replica.
*
* @param <K> key type
* @param <V> value type
Expand All @@ -58,19 +58,18 @@ public class PartitionedMirrorClient<K, V> implements MirrorClient<K, V> {
private final List<MirrorHost> knownHosts;

/**
* Next to its default task of instantiation PartitionHost, it takes responsibility for
* creating several business objects and initializing the PartitionRouter with a mapping
* retrieved from StreamController.
* Next to its default task of instantiation PartitionHost, it takes responsibility for creating several business
* objects and initializing the PartitionRouter with a mapping retrieved from StreamController.
*
* @param mirrorHost mirror host to use
* @param client http client
* @param keySerde the serde for the key
* @param valueResolver the value's {@link TypeResolver}
* @param mirrorHost mirror host to use
* @param client http client
* @param keySerde the serde for the key
* @param valueResolver the value's {@link TypeResolver}
* @param partitionFinder strategy for finding partitions
*/
public PartitionedMirrorClient(final MirrorHost mirrorHost, final HttpClient client,
final Serde<K> keySerde, final TypeResolver<V> valueResolver,
final PartitionFinder partitionFinder) {
final Serde<K> keySerde, final TypeResolver<V> valueResolver,
final PartitionFinder partitionFinder) {
this.topicName = mirrorHost.getHost();
this.streamsStateHost = StreamsStateHost.fromMirrorHost(mirrorHost);
this.client = client;
Expand Down Expand Up @@ -121,14 +120,27 @@ public List<V> fetchValues(final List<K> keys) {
return keys.stream().map(this::fetchValue).collect(Collectors.toList());
}

@Override
@Nullable
public List<V> fetchRange(final K key, final String from, final String to) {
final MirrorHost currentKeyHost = Objects.requireNonNull(this.router.findHost(key),
String.format("Could not find the a Mirror host for key %s", key));
final ResponseWrapper response = this.requestManager
.makeRequest(Objects.requireNonNull(currentKeyHost).forRange(key.toString(), from, to));
if (response.isUpdateCacheHeaderSet()) {
log.debug("The update header has been set. Updating router info.");
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
this.updateRouterInfo();
}
return this.requestManager.processResponse(response, this.parser::deserializeList);
}

@Override
public boolean exists(final K key) {
return this.fetchValue(key) != null;
}

/**
* Responsible for fetching the information about the partition - host mapping from
* the mirror.
* Responsible for fetching the information about the partition - host mapping from the mirror.
*
* @return a mapping between a partition (a number) and a corresponding host
*/
Expand All @@ -141,11 +153,9 @@ private Map<Integer, String> makeRequestForPartitionHostMapping() {
}
final Map<Integer, String> partitionHostMappingResponse = this.client.objectMapper()
.readValue(responseBody.byteStream(), MAP_TYPE_REFERENCE);
if (log.isInfoEnabled()) {
log.info("Collected information about the partitions and hosts."
+ " There are {} partitions and {} distinct hosts", partitionHostMappingResponse.size(),
(int) partitionHostMappingResponse.values().stream().distinct().count());
}
log.info("Collected information about the partitions and hosts."
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
+ " There are {} partitions and {} distinct hosts", partitionHostMappingResponse.size(),
(int) partitionHostMappingResponse.values().stream().distinct().count());
return partitionHostMappingResponse;
} catch (final IOException exception) {
throw new MirrorException("There was a problem handling the response: ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class MirrorHost {
/**
* Default constructor.
*
* @param host the host of the mirror. This can be a service name or an IP.
* @param host the host of the mirror. This can be a service name or an IP.
* @param config mirror config to use. This can set the service prefix and REST path.
*/
public MirrorHost(final String host, final MirrorConfig config) {
Expand Down Expand Up @@ -70,6 +70,14 @@ public String forAll() {
return url;
}

/**
* Generates a URL for fetching a range of keys
*/
public String forRange(final String key, final String from, final String to) {
return String.format("http://%s%s/%s/%s?from=%s&to=%s", this.config.getPrefix(), this.host,
this.config.getPath(), key, from, to);
}

/**
* Generates a URL without any keys.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2022 bakdata GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bakdata.quick.common.api.model.mirror;

import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.quick.common.config.MirrorConfig;
import java.util.List;
import org.junit.jupiter.api.Test;

class MirrorHostTest {

private static final String MIRROR_HOST_PREFIX = "quick-mirror";
private static final String MIRROR_HOST_PATH = "mirror";

@Test
void shouldConstructCorrectUrlForKeyRequest() {
final MirrorHost mirrorHost = new MirrorHost("test-for-key", new MirrorConfig());
final String actual = mirrorHost.forKey("give-me-key");
final String url = "http://%s-test-for-key/%s/give-me-key";
assertThat(actual).isEqualTo(String.format(url, MIRROR_HOST_PREFIX, MIRROR_HOST_PATH));
}

@Test
void shouldConstructCorrectUrlForKeysRequest() {
final MirrorHost mirrorHost = new MirrorHost("test-for-keys", new MirrorConfig());
final String actual = mirrorHost.forKeys(List.of("test-1", "test-2", "test-3"));
final String url = "http://%s-test-for-keys/%s/keys?ids=test-1,test-2,test-3";
assertThat(actual).isEqualTo(String.format(url, MIRROR_HOST_PREFIX, MIRROR_HOST_PATH));
}

@Test
void shouldConstructCorrectUrlForAllRequest() {
final MirrorHost mirrorHost = new MirrorHost("test-for-all", new MirrorConfig());
final String actual = mirrorHost.forAll();
final String url = "http://%s-test-for-all/%s";
assertThat(actual).isEqualTo(String.format(url, MIRROR_HOST_PREFIX, MIRROR_HOST_PATH));
}

@Test
void shouldConstructCorrectUrlForRangeRequest() {
final MirrorHost mirrorHost = new MirrorHost("test-for-rage", new MirrorConfig());
final String actual = mirrorHost.forRange("test-key", "range-field-from", "range-field-to");
final String url = "http://%s-test-for-rage/%s/%s?from=%s&to=%s";
assertThat(actual).isEqualTo(
String.format(url, MIRROR_HOST_PREFIX, MIRROR_HOST_PATH, "test-key", "range-field-from", "range-field-to"));
}
}
8 changes: 8 additions & 0 deletions e2e/functional/range/query-range.gql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
query {
userRequests(userId: 123, timestampFrom:1, timestampTo:3) {
userId
serviceId
requests
success
}
}
20 changes: 20 additions & 0 deletions e2e/functional/range/result-range.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[
{
"userId": 123,
"serviceId": "abc",
"requests": 5,
"success": 2
},
{
"userId": 123,
"serviceId": "def",
"requests": 8,
"success": 4
},
{
"userId": 123,
"serviceId": "def",
"requests": 6,
"success": 5
}
]
19 changes: 19 additions & 0 deletions e2e/functional/range/schema.gql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
type Query {
userRequests(
userId: Int
timestampFrom: Int
timestampTo: Int
): [UserRequests]
@topic(name: "user-request-range",
keyArgument: "userId",
rangeFrom: "timestampFrom",
rangeTo: "timestampTo")
}

type UserRequests {
userId: Int
serviceId: Int
timestamp: Int
requests: Int
success: Int
}
66 changes: 66 additions & 0 deletions e2e/functional/range/tests.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env ./test/libs/bats/bin/bats
# shellcheck shell=bats

CONTENT_TYPE="content-type:application/json"
API_KEY="X-API-Key:${X_API_KEY}"
TOPIC="user-request-range"
TYPE="UserRequests"
GATEWAY="range-gateway-test"
INGEST_URL="${HOST}/ingest/${TOPIC}"
GRAPHQL_URL="${HOST}/gateway/${GATEWAY}/graphql"
GRAPHQL_CLI="gql-cli ${GRAPHQL_URL} -H ${API_KEY}"


setup() {
if [ "$BATS_TEST_NUMBER" -eq 1 ]; then
printf "creating context for %s\n" "$HOST"
printf "with API_KEY: %s\n" "${X_API_KEY}"
quick context create --host "${HOST}" --key "${X_API_KEY}"
fi
}

@test "should deploy product-gateway" {
run quick gateway create ${GATEWAY}
echo "$output"
sleep 30
[ "$status" -eq 0 ]
[ "$output" = "Create gateway ${GATEWAY} (this may take a few seconds)" ]
}

@test "should apply schema to range-gateway" {
run quick gateway apply ${GATEWAY} -f schema.gql
echo "$output"
[ "$status" -eq 0 ]
[ "$output" = "Applied schema to gateway ${GATEWAY}" ]
}

#TODO: Remove skip after implementing the Mirror and query service
@test "should create user-request-range topic with key integer and value schema" {
skip
run quick topic create ${TOPIC} --key-type integer --value-type schema --schema "${GATEWAY}.${TYPE}" --range-field timestamp --point
echo "$output"
[ "$status" -eq 0 ]
[ "$output" = "Created new topic ${TOPIC}" ]
}

@test "should ingest valid data in user-request-range" {
skip
curl --request POST --url "${INGEST_URL}" --header "${CONTENT_TYPE}" --header "${API_KEY}" --data "@./user-requests.json"
}

@test "should retrieve range of inserted items" {
skip
sleep 30
result="$(${GRAPHQL_CLI} < query-single.gql | jq -j .userRequests)"
expected="$(cat result-single.json)"
echo "$result"
[ "$result" = "$expected" ]
}

teardown() {
if [[ "${#BATS_TEST_NAMES[@]}" -eq "$BATS_TEST_NUMBER" ]]; then
quick gateway delete ${GATEWAY}
#TODO: Uncomment the topic deletion after the Range Mirror implementation is ready
# quick topic delete ${TOPIC}
fi
}
Loading