Skip to content

Commit

Permalink
Add range to topic directive with range data fetcher (#62)
Browse files Browse the repository at this point in the history
Closes #57 and #59
  • Loading branch information
raminqaf authored Sep 15, 2022
1 parent c41d7df commit c8778ce
Show file tree
Hide file tree
Showing 56 changed files with 1,167 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,27 @@ public class DefaultMirrorClient<K, V> implements MirrorClient<K, V> {
/**
* Constructor for the client.
*
* @param topicName name of the topic the mirror is deployed
* @param client http client
* @param mirrorConfig configuration of the mirror host
* @param topicName name of the topic the mirror is deployed
* @param client http client
* @param mirrorConfig configuration of the mirror host
* @param valueResolver the value's {@link TypeResolver}
* @param requestManager a manager for sending requests to the mirror and processing responses
*/
public DefaultMirrorClient(final String topicName, final HttpClient client, final MirrorConfig mirrorConfig,
final TypeResolver<V> valueResolver, final MirrorRequestManager requestManager) {
final TypeResolver<V> valueResolver, final MirrorRequestManager requestManager) {
this(new MirrorHost(topicName, mirrorConfig), client, valueResolver, requestManager);
}

/**
* Constructor that can be used when the mirror client is based on an IP or other non-standard host.
*
* @param mirrorHost host to use
* @param client http client
* @param mirrorHost host to use
* @param client http client
* @param typeResolver the value's {@link TypeResolver}
* @param requestManager a manager for sending requests to the mirror and processing responses
*/
public DefaultMirrorClient(final MirrorHost mirrorHost, final HttpClient client, final TypeResolver<V> typeResolver,
final MirrorRequestManager requestManager) {
final MirrorRequestManager requestManager) {
this.host = mirrorHost;
this.parser = new MirrorValueParser<>(typeResolver, client.objectMapper());
this.mirrorRequestManager = requestManager;
Expand Down Expand Up @@ -91,6 +91,17 @@ public List<V> fetchValues(final List<K> keys) {
Collections.emptyList());
}

@Override
@Nullable
public List<V> fetchRange(final K key, final String from, final String rangeTo) {
final ResponseWrapper response = this.mirrorRequestManager.makeRequest(
this.host.forRange(key.toString(), from, rangeTo)
);
return Objects.requireNonNullElse(
this.mirrorRequestManager.processResponse(response, this.parser::deserializeList),
Collections.emptyList());
}

@Override
public boolean exists(final K key) {
return this.fetchValue(key) != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public interface MirrorClient<K, V> {

/**
* fetches the value of the given key from the mirror topic.
* Fetches the value of the given key from the mirror topic.
*
* @param key a key to be fetched
* @return a list of values. If the requested mirror responds with a NOT_FOUND code the function returns null.
Expand All @@ -38,14 +38,14 @@ public interface MirrorClient<K, V> {
V fetchValue(final K key);

/**
* fetches all the values of a mirror topic.
* Fetches all the values of a mirror topic.
*
* @return returns a list of all values in a topic. null.
*/
List<V> fetchAll();

/**
* fetches the values of a list of keys from the mirror topic.
* Fetches the values of a list of keys from the mirror topic.
*
* @param keys list of keys to be fetched
* @return a list of values. If the requested mirror responds with a NOT_FOUND code the function returns null.
Expand All @@ -54,7 +54,18 @@ public interface MirrorClient<K, V> {
List<V> fetchValues(final List<K> keys);

/**
* checks if a key exists in mirror topic.
* Fetches a range of a given key from the mirror topic.
*
* @param key a key to be fetched
* @param from lower bound of the range field
* @param to higher bound of the range field
* @return a list of values.
*/
@Nullable
List<V> fetchRange(final K key, final String from, final String to);

/**
* Checks if a key exists in mirror topic.
*
* @return True/False if key exists in mirror topic
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.apache.kafka.common.serialization.Serde;

/**
* MirrorClient that has access to information about partition-host mapping.
* This information enables it to efficiently route requests in the case when there is more than one mirror replica.
* MirrorClient that has access to information about partition-host mapping. This information enables it to efficiently
* route requests in the case when there is more than one mirror replica.
*
* @param <K> key type
* @param <V> value type
Expand All @@ -58,19 +58,18 @@ public class PartitionedMirrorClient<K, V> implements MirrorClient<K, V> {
private final List<MirrorHost> knownHosts;

/**
* Next to its default task of instantiation PartitionHost, it takes responsibility for
* creating several business objects and initializing the PartitionRouter with a mapping
* retrieved from StreamController.
* Next to its default task of instantiation PartitionHost, it takes responsibility for creating several business
* objects and initializing the PartitionRouter with a mapping retrieved from StreamController.
*
* @param mirrorHost mirror host to use
* @param client http client
* @param keySerde the serde for the key
* @param valueResolver the value's {@link TypeResolver}
* @param mirrorHost mirror host to use
* @param client http client
* @param keySerde the serde for the key
* @param valueResolver the value's {@link TypeResolver}
* @param partitionFinder strategy for finding partitions
*/
public PartitionedMirrorClient(final MirrorHost mirrorHost, final HttpClient client,
final Serde<K> keySerde, final TypeResolver<V> valueResolver,
final PartitionFinder partitionFinder) {
final Serde<K> keySerde, final TypeResolver<V> valueResolver,
final PartitionFinder partitionFinder) {
this.topicName = mirrorHost.getHost();
this.streamsStateHost = StreamsStateHost.fromMirrorHost(mirrorHost);
this.client = client;
Expand Down Expand Up @@ -121,14 +120,29 @@ public List<V> fetchValues(final List<K> keys) {
return keys.stream().map(this::fetchValue).collect(Collectors.toList());
}

@Override
@Nullable
public List<V> fetchRange(final K key, final String from, final String to) {
final MirrorHost currentKeyHost = Objects.requireNonNull(this.router.findHost(key),
String.format("Could not find the a Mirror host %s for key %s", this.topicName, key));
final ResponseWrapper response = this.requestManager
.makeRequest(Objects.requireNonNull(currentKeyHost).forRange(key.toString(), from, to));
if (response.isUpdateCacheHeaderSet()) {
log.debug("The update header has been set for host {} and key {}. Updating router info.",
this.topicName,
key);
this.updateRouterInfo();
}
return this.requestManager.processResponse(response, this.parser::deserializeList);
}

@Override
public boolean exists(final K key) {
return this.fetchValue(key) != null;
}

/**
* Responsible for fetching the information about the partition - host mapping from
* the mirror.
* Responsible for fetching the information about the partition - host mapping from the mirror.
*
* @return a mapping between a partition (a number) and a corresponding host
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class MirrorHost {
/**
* Default constructor.
*
* @param host the host of the mirror. This can be a service name or an IP.
* @param host the host of the mirror. This can be a service name or an IP.
* @param config mirror config to use. This can set the service prefix and REST path.
*/
public MirrorHost(final String host, final MirrorConfig config) {
Expand Down Expand Up @@ -70,6 +70,14 @@ public String forAll() {
return url;
}

/**
* Generates a URL for fetching a range of keys.
*/
public String forRange(final String key, final String from, final String to) {
return String.format("http://%s%s/%s/range/%s?from=%s&to=%s", this.config.getPrefix(), this.host,
this.config.getPath(), key, from, to);
}

/**
* Generates a URL without any keys.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2022 bakdata GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bakdata.quick.common.api.model.mirror;

import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.quick.common.config.MirrorConfig;
import java.util.List;
import org.junit.jupiter.api.Test;

class MirrorHostTest {

private static final String MIRROR_HOST_PREFIX = "quick-mirror";
private static final String MIRROR_HOST_PATH = "mirror";

@Test
void shouldConstructCorrectUrlForKeyRequest() {
final MirrorHost mirrorHost = new MirrorHost("test-for-key", new MirrorConfig());
final String actual = mirrorHost.forKey("give-me-key");
final String url = "http://%s-test-for-key/%s/give-me-key";
final String expected = String.format(url, MIRROR_HOST_PREFIX, MIRROR_HOST_PATH);
assertThat(actual).isEqualTo(expected);
}

@Test
void shouldConstructCorrectUrlForKeysRequest() {
final MirrorHost mirrorHost = new MirrorHost("test-for-keys", new MirrorConfig());
final String actual = mirrorHost.forKeys(List.of("test-1", "test-2", "test-3"));
final String url = "http://%s-test-for-keys/%s/keys?ids=test-1,test-2,test-3";
final String expected = String.format(url, MIRROR_HOST_PREFIX, MIRROR_HOST_PATH);
assertThat(actual).isEqualTo(expected);
}

@Test
void shouldConstructCorrectUrlForAllRequest() {
final MirrorHost mirrorHost = new MirrorHost("test-for-all", new MirrorConfig());
final String actual = mirrorHost.forAll();
final String url = "http://%s-test-for-all/%s";
final String expected = String.format(url, MIRROR_HOST_PREFIX, MIRROR_HOST_PATH);
assertThat(actual).isEqualTo(expected);
}

@Test
void shouldConstructCorrectUrlForRangeRequest() {
final MirrorHost mirrorHost = new MirrorHost("test-for-rage", new MirrorConfig());
final String actual = mirrorHost.forRange("test-key", "range-field-from", "range-field-to");
final String url = "http://%s-test-for-rage/%s/range/%s?from=%s&to=%s";
final String expected =
String.format(url, MIRROR_HOST_PREFIX, MIRROR_HOST_PATH, "test-key", "range-field-from", "range-field-to");
assertThat(actual).isEqualTo(expected);
}
}
8 changes: 8 additions & 0 deletions e2e/functional/range/query-range.gql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
query {
userRequests(userId: 123, timestampFrom:1, timestampTo:3) {
userId
serviceId
requests
success
}
}
20 changes: 20 additions & 0 deletions e2e/functional/range/result-range.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[
{
"userId": 123,
"serviceId": "abc",
"requests": 5,
"success": 2
},
{
"userId": 123,
"serviceId": "def",
"requests": 8,
"success": 4
},
{
"userId": 123,
"serviceId": "def",
"requests": 6,
"success": 5
}
]
19 changes: 19 additions & 0 deletions e2e/functional/range/schema.gql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
type Query {
userRequests(
userId: Int
timestampFrom: Int
timestampTo: Int
): [UserRequests]
@topic(name: "user-request-range",
keyArgument: "userId",
rangeFrom: "timestampFrom",
rangeTo: "timestampTo")
}

type UserRequests {
userId: Int
serviceId: Int
timestamp: Int
requests: Int
success: Int
}
66 changes: 66 additions & 0 deletions e2e/functional/range/tests.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env ./test/libs/bats/bin/bats
# shellcheck shell=bats

CONTENT_TYPE="content-type:application/json"
API_KEY="X-API-Key:${X_API_KEY}"
TOPIC="user-request-range"
TYPE="UserRequests"
GATEWAY="range-gateway-test"
INGEST_URL="${HOST}/ingest/${TOPIC}"
GRAPHQL_URL="${HOST}/gateway/${GATEWAY}/graphql"
GRAPHQL_CLI="gql-cli ${GRAPHQL_URL} -H ${API_KEY}"


setup() {
if [ "$BATS_TEST_NUMBER" -eq 1 ]; then
printf "creating context for %s\n" "$HOST"
printf "with API_KEY: %s\n" "${X_API_KEY}"
quick context create --host "${HOST}" --key "${X_API_KEY}"
fi
}

@test "should deploy product-gateway" {
run quick gateway create ${GATEWAY}
echo "$output"
sleep 30
[ "$status" -eq 0 ]
[ "$output" = "Create gateway ${GATEWAY} (this may take a few seconds)" ]
}

@test "should apply schema to range-gateway" {
run quick gateway apply ${GATEWAY} -f schema.gql
echo "$output"
[ "$status" -eq 0 ]
[ "$output" = "Applied schema to gateway ${GATEWAY}" ]
}

#TODO: Remove skip after implementing the Mirror and query service
@test "should create user-request-range topic with key integer and value schema" {
skip
run quick topic create ${TOPIC} --key-type integer --value-type schema --schema "${GATEWAY}.${TYPE}" --range-field timestamp --point
echo "$output"
[ "$status" -eq 0 ]
[ "$output" = "Created new topic ${TOPIC}" ]
}

@test "should ingest valid data in user-request-range" {
skip
curl --request POST --url "${INGEST_URL}" --header "${CONTENT_TYPE}" --header "${API_KEY}" --data "@./user-requests.json"
}

@test "should retrieve range of inserted items" {
skip
sleep 30
result="$(${GRAPHQL_CLI} < query-single.gql | jq -j .userRequests)"
expected="$(cat result-single.json)"
echo "$result"
[ "$result" = "$expected" ]
}

teardown() {
if [[ "${#BATS_TEST_NAMES[@]}" -eq "$BATS_TEST_NUMBER" ]]; then
quick gateway delete ${GATEWAY}
#TODO: Uncomment the topic deletion after the Range Mirror implementation is ready
# quick topic delete ${TOPIC}
fi
}
Loading

0 comments on commit c8778ce

Please sign in to comment.