From c03f0f4d89338496876eeb6a5a4c959e7cafdb28 Mon Sep 17 00:00:00 2001
From: dennis deng <599166320@qq.com>
Date: Thu, 6 Jul 2023 21:56:49 +0800
Subject: [PATCH 1/3] support for federated clusters

---
 .../org/apache/druid/query/QueryContexts.java |  1 +
 .../apache/druid/client/BrokerServerView.java | 17 ++++++++++++
 .../druid/client/CachingClusteredClient.java  | 27 +++++++++++++++++++
 .../druid/client/TimelineServerView.java      |  7 +++++
 4 files changed, 52 insertions(+)

diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 203e63f23f68..6a3c5be53e14 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -83,6 +83,7 @@ public class QueryContexts
   public static final String SERIALIZE_DATE_TIME_AS_LONG_INNER_KEY = "serializeDateTimeAsLongInner";
   public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit";
   public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
+  public static final String FEDERATED_CLUSSTER_BROKERS = "federatedClusterBrokers";
 
   // SQL query context keys
   public static final String CTX_SQL_QUERY_ID = BaseQuery.SQL_QUERY_ID;
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index a10dd285390b..3f88abb4ba44 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -72,6 +72,7 @@ public class BrokerServerView implements TimelineServerView
 
   private final Object lock = new Object();
   private final ConcurrentMap<String, QueryableDruidServer> clients = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, QueryableDruidServer> federatedClients;
   private final Map<SegmentId, ServerSelector> selectors = new HashMap<>();
   private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines = new HashMap<>();
   private final ConcurrentMap<TimelineCallback, Executor> timelineCallbacks = new ConcurrentHashMap<>();
@@ -104,6 +105,7 @@ public BrokerServerView(
     this.queryWatcher = queryWatcher;
     this.smileMapper = smileMapper;
     this.httpClient = httpClient;
+    this.federatedClients = new ConcurrentHashMap<>();
     this.baseView = baseView;
     this.tierSelectorStrategy = tierSelectorStrategy;
     this.emitter = emitter;
@@ -241,6 +243,21 @@ private QueryableDruidServer addServer(DruidServer server)
     return retVal;
   }
 
+  @Override
+  public QueryableDruidServer getAndAddServer(String hostAndPort)
+  {
+    if (federatedClients.containsKey(hostAndPort)) {
+      return federatedClients.get(hostAndPort);
+    }
+    DruidServer server = new DruidServer(hostAndPort, hostAndPort, null, 0, ServerType.BROKER, hostAndPort, 0);
+    QueryableDruidServer retVal = new QueryableDruidServer<>(server, makeDirectClient(server));
+    QueryableDruidServer exists = federatedClients.put(server.getName(), retVal);
+    if (exists != null) {
+      log.warn("QueryRunner for server[%s] already exists!? Well it's getting replaced", server);
+    }
+    return retVal;
+  }
+
   private DirectDruidClient makeDirectClient(DruidServer server)
   {
     return new DirectDruidClient(
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 5c0e5efb7ea3..94e5ba4a4c37 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -376,6 +376,7 @@ ClusterQueryResult run(
         List<Sequence<T>> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
         addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
         addSequencesFromServer(sequencesByInterval, segmentsByServer);
+        addSequencesFromFederatedCluster(sequencesByInterval);
         return merge(sequencesByInterval);
       });
@@ -679,6 +680,32 @@ private void addSequencesFromServer(
     });
   }
 
+  private void addSequencesFromFederatedCluster(
+      final List<Sequence<T>> listOfSequences
+  )
+  {
+    if (query.context().containsKey(QueryContexts.FEDERATED_CLUSSTER_BROKERS)) {
+      String[] brokers = query.context().getString(QueryContexts.FEDERATED_CLUSSTER_BROKERS).split(",");
+      for (String hostName : brokers) {
+        if (hostName.length() > 0) {
+          final QueryRunner serverRunner = serverView.getAndAddServer(hostName).getQueryRunner();
+          // Divide user-provided maxQueuedBytes by the number of servers, and limit each server to that much.
+          final long maxQueuedBytes = query.context().getMaxQueuedBytes(httpClientConfig.getMaxQueuedBytes()) / brokers.length;
+          final Sequence<T> serverResults = serverRunner.run(
+              queryPlus.withQuery(queryPlus.getQuery()
+                                           .withOverriddenContext(ImmutableMap.of(
+                                               QueryContexts.FEDERATED_CLUSSTER_BROKERS,
+                                               ""
+                                           )))
+                       .withMaxQueuedBytes(maxQueuedBytes),
+              responseContext
+          );
+          listOfSequences.add(serverResults);
+        }
+      }
+    }
+  }
+
   @SuppressWarnings("unchecked")
   private Sequence<T> getBySegmentServerResults(
       final QueryRunner serverRunner,
diff --git a/server/src/main/java/org/apache/druid/client/TimelineServerView.java b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
index 477882342425..6ed82a08febc 100644
--- a/server/src/main/java/org/apache/druid/client/TimelineServerView.java
+++ b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.client;
 
+import org.apache.druid.client.selector.QueryableDruidServer;
 import org.apache.druid.client.selector.ServerSelector;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.planning.DataSourceAnalysis;
@@ -102,4 +103,10 @@ interface TimelineCallback
    */
   CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment);
   }
+
+  default QueryableDruidServer getAndAddServer(String hostAndPort)
+  {
+    throw new RuntimeException("getAndAddServer is not supported by this implementation.");
+  }
+
 }
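Patch 1 wires the fan-out path into the local Broker: when a query carries the new context key, CachingClusteredClient obtains a DirectDruidClient per listed broker via getAndAddServer() and merges the remote result sequences with the locally computed ones. The snippet below is a minimal sketch of how a caller might attach the key to a native query, using the same builder API the tests in patch 2 exercise; the data source name and broker endpoints are illustrative, not taken from the patch.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;

public class FederatedQueryExample
{
  public static TimeseriesQuery buildFederatedQuery()
  {
    return Druids.newTimeseriesQueryBuilder()
                 .dataSource("wikipedia")  // illustrative data source
                 .intervals("2011-01-05/2011-01-10")
                 .granularity(Granularities.ALL)
                 .aggregators(ImmutableList.of(new CountAggregatorFactory("rows")))
                 // One Broker (or Router) endpoint per remote cluster, comma-separated;
                 // these endpoints are placeholders.
                 .context(ImmutableMap.of("federatedClusterBrokers", "broker-b:8082,broker-c:8082"))
                 .build();
  }
}

Note that the forwarded copy of the query has the key overridden to the empty string (the withOverriddenContext call above), which is what keeps a remote broker from fanning the query out a second time.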
From 1f615dfdea549cda60907929bf530e2c30f5a5f2 Mon Sep 17 00:00:00 2001
From: dennis deng <599166320@qq.com>
Date: Wed, 18 Oct 2023 09:41:04 +0800
Subject: [PATCH 2/3] Add some test code

---
 .../druid/client/TimelineServerView.java      |  3 +-
 .../druid/client/BrokerServerViewTest.java    | 61 ++++++++++++++++
 .../client/CachingClusteredClientTest.java    | 72 +++++++++++++++++++
 3 files changed, 135 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/druid/client/TimelineServerView.java b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
index 6ed82a08febc..36d4cd00bf82 100644
--- a/server/src/main/java/org/apache/druid/client/TimelineServerView.java
+++ b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
@@ -21,6 +21,7 @@
 
 import org.apache.druid.client.selector.QueryableDruidServer;
 import org.apache.druid.client.selector.ServerSelector;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.server.coordination.DruidServerMetadata;
@@ -106,7 +107,7 @@ interface TimelineCallback
 
   default QueryableDruidServer getAndAddServer(String hostAndPort)
   {
-    throw new RuntimeException("getAndAddServer is not supported by this implementation.");
+    throw new ISE("getAndAddServer is not supported by this implementation.");
   }
 
 }
diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
index cef8328c8b15..aa87b914a461 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
@@ -29,6 +29,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
+import org.apache.druid.client.selector.QueryableDruidServer;
 import org.apache.druid.client.selector.RandomServerSelectorStrategy;
 import org.apache.druid.client.selector.ServerSelector;
 import org.apache.druid.curator.CuratorTestBase;
@@ -37,9 +38,11 @@
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.QueryWatcher;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
@@ -61,6 +64,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
@@ -526,6 +530,63 @@ public void testEmptyIgnoredTiersConfig() throws Exception
     setupViews(null, Collections.emptySet(), true);
   }
 
+  @Test
+  public void testGetAndAddServer() throws Exception
+  {
+    setupViews(null, null, false);
+    String hostAndPort = "127.0.0.1";
+    QueryableDruidServer queryableDruidServer = brokerServerView.getAndAddServer(hostAndPort);
+    Assert.assertEquals(hostAndPort, queryableDruidServer.getServer().getHostAndPort());
+    Assert.assertEquals(queryableDruidServer, brokerServerView.getAndAddServer(hostAndPort));
+  }
+
+  @Test(expected = ISE.class)
+  public void testGetAndAddServerException() throws Exception
+  {
+    setupViews(null, null, false);
+    new TimelineServerView()
+    {
+      @Override
+      public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
+      {
+        return Optional.empty();
+      }
+
+      @Override
+      public List<ImmutableDruidServer> getDruidServers()
+      {
+        return null;
+      }
+
+      @Override
+      public <T> QueryRunner<T> getQueryRunner(DruidServer server)
+      {
+        return null;
+      }
+
+      @Override
+      public void registerTimelineCallback(Executor exec, TimelineCallback callback)
+      {
+      }
+
+      @Override
+      public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
+      {
+      }
+
+      @Override
+      public void registerSegmentCallback(Executor exec, SegmentCallback callback)
+      {
+      }
+
+    }.getAndAddServer("localhost:8888");
+  }
+
   /**
    * Creates a DruidServer of type HISTORICAL and sets up a ZNode for it.
    */
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 7755ae2c122c..bdb0a3035600 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -2675,6 +2675,12 @@ public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback c
           {
           }
+
+          @Override
+          public QueryableDruidServer getAndAddServer(String hostAndPort)
+          {
+            return serverView.getAndAddServer(hostAndPort);
+          }
         },
         cache,
         JSON_MAPPER,
@@ -3139,6 +3145,72 @@ public void testEtagforDifferentQueryInterval()
     Assert.assertNotEquals(etag1, etag2);
   }
 
+  @Test
+  public void testAddSequencesFromFederatedCluster()
+  {
+    Map<String, Object> context = new HashMap<>();
+    context.put(QueryContexts.FEDERATED_CLUSSTER_BROKERS, "test1");
+    context.putAll(CONTEXT);
+
+    final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
+                                                        .dataSource(DATA_SOURCE)
+                                                        .granularity(GRANULARITY)
+                                                        .intervals(SEG_SPEC)
+                                                        .context(context)
+                                                        .intervals("2011-01-05/2011-01-10")
+                                                        .aggregators(RENAMED_AGGS)
+                                                        .postAggregators(RENAMED_POST_AGGS);
+
+    TimeseriesQuery query = builder.randomQueryId().build();
+
+    final Interval interval1 = Intervals.of("2011-01-06/2011-01-07");
+
+    QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest());
+
+    final DruidServer lastServer = servers[random.nextInt(servers.length)];
+    ServerSelector selector1 = makeMockSingleDimensionSelector(lastServer, "dim1", null, "b", 0);
+
+    timeline.add(interval1, "v", new NumberedPartitionChunk<>(0, 3, selector1));
+
+    final Capture<QueryPlus> capture = Capture.newInstance();
+    final Capture<ResponseContext> contextCap = Capture.newInstance();
+
+    QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class);
+    QueryRunner mockFederatedClusterRunner = EasyMock.createNiceMock(QueryRunner.class);
+    QueryableDruidServer queryableDruidServer = EasyMock.createNiceMock(QueryableDruidServer.class);
+
+    EasyMock.expect(mockRunner.run(EasyMock.capture(capture), EasyMock.capture(contextCap)))
+            .andReturn(Sequences.empty())
+            .anyTimes();
+    EasyMock.expect(serverView.getQueryRunner(lastServer))
+            .andReturn(mockRunner)
+            .anyTimes();
+
+    EasyMock.expect(serverView.getAndAddServer("test1"))
+            .andReturn(queryableDruidServer)
+            .anyTimes();
+
+    EasyMock.expect(queryableDruidServer.getQueryRunner())
+            .andReturn(mockFederatedClusterRunner)
+            .anyTimes();
+
+    EasyMock.expect(mockFederatedClusterRunner.run(
+                EasyMock.capture(Capture.newInstance()),
+                EasyMock.capture(Capture.newInstance())
+            ))
+            .andReturn(Sequences.empty())
+            .anyTimes();
+
+    EasyMock.replay(serverView);
+    EasyMock.replay(mockRunner);
+    EasyMock.replay(mockFederatedClusterRunner);
+    EasyMock.replay(queryableDruidServer);
+
+    runner.run(QueryPlus.wrap(query)).toList();
+  }
+
   @SuppressWarnings("unchecked")
   private QueryRunner getDefaultQueryRunner()
   {
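testGetAndAddServer in patch 2 pins down the intended contract of getAndAddServer(): repeated calls with the same hostAndPort must return the same cached QueryableDruidServer. The patch implements the cache with a containsKey/get/put sequence; as a self-contained sketch (the class and method names below are invented for illustration, not part of the patch), the same get-or-create contract can be expressed atomically with computeIfAbsent, which also avoids building a client twice when two threads race on the first call:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

public class ClientCache<K, V>
{
  private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();
  private final Function<K, V> factory;

  public ClientCache(Function<K, V> factory)
  {
    this.factory = factory;
  }

  // Atomic get-or-create: the factory runs at most once per key.
  public V getOrCreate(K key)
  {
    return cache.computeIfAbsent(key, factory);
  }

  public static void main(String[] args)
  {
    ClientCache<String, String> clients = new ClientCache<>(hp -> "client-for-" + hp);
    System.out.println(clients.getOrCreate("broker-b:8082")); // created
    System.out.println(clients.getOrCreate("broker-b:8082")); // cached, same value
  }
}

Either shape satisfies the test; the atomic form simply makes the "already exists!?" replacement case that patch 1 logs unreachable.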
From 16e0383caf21d52ef9454e3148d62e92426a9f29 Mon Sep 17 00:00:00 2001
From: dennis deng <599166320@qq.com>
Date: Thu, 19 Oct 2023 09:30:41 +0800
Subject: [PATCH 3/3] Add relevant documentation and modify variable names

---
 docs/querying/query-context.md                            | 1 +
 .../main/java/org/apache/druid/query/QueryContexts.java   | 2 +-
 .../org/apache/druid/client/CachingClusteredClient.java   | 8 +++++---
 .../apache/druid/client/CachingClusteredClientTest.java   | 2 +-
 4 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md
index 1ac3af1127c4..2299c98ab070 100644
--- a/docs/querying/query-context.md
+++ b/docs/querying/query-context.md
@@ -67,6 +67,7 @@ Unless otherwise noted, the following parameters apply to all query types.
 |`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
 |`maxNumericInFilters`|`-1`|Max limit for the amount of numeric values that can be compared for a string type dimension when the entire SQL WHERE clause of a query translates only to an [OR](../querying/filters.md#or) of [Bound filter](../querying/filters.md#bound-filter). By default, Druid does not restrict the amount of numeric Bound Filters on String columns, although this situation may block other queries from running. Set this parameter to a smaller value to prevent Druid from running queries that have prohibitively long segment processing times. The optimal limit requires some trial and error; we recommend starting with 100. Users who submit a query that exceeds the limit of `maxNumericInFilters` should instead rewrite their queries to use strings in the `WHERE` clause instead of numbers. For example, `WHERE someString IN (‘123’, ‘456’)`. This value cannot exceed the set system configuration `druid.sql.planner.maxNumericInFilters`. This value is ignored if `druid.sql.planner.maxNumericInFilters` is not set explicitly.|
 |`inSubQueryThreshold`|`2147483647`| Threshold for minimum number of values in an IN clause to convert the query to a JOIN operation on an inlined table rather than a predicate. A threshold of 0 forces usage of an inline table in all cases; a threshold of [Integer.MAX_VALUE] forces usage of OR in all cases. |
+|`federatedClusterBrokers`|`null`| Enables federated queries across multiple Druid clusters. Set this to a comma-separated list of Broker (or Router) endpoints, one per participating cluster; the local Broker forwards the query to each listed endpoint and merges the remote results with its own. |
 
 ## Druid SQL parameters
 
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index bf8003658c17..f108a3514065 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -85,7 +85,7 @@ public class QueryContexts
   public static final String SERIALIZE_DATE_TIME_AS_LONG_INNER_KEY = "serializeDateTimeAsLongInner";
   public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit";
   public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
-  public static final String FEDERATED_CLUSSTER_BROKERS = "federatedClusterBrokers";
+  public static final String FEDERATED_CLUSTER_BROKERS = "federatedClusterBrokers";
 
   // SQL query context keys
   public static final String CTX_SQL_QUERY_ID = BaseQuery.SQL_QUERY_ID;
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index aa776f8f3936..2629b4a6ce0f 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -24,6 +24,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
@@ -680,8 +681,9 @@ private void addSequencesFromFederatedCluster(
       final List<Sequence<T>> listOfSequences
   )
   {
-    if (query.context().containsKey(QueryContexts.FEDERATED_CLUSSTER_BROKERS)) {
-      String[] brokers = query.context().getString(QueryContexts.FEDERATED_CLUSSTER_BROKERS).split(",");
+    String federatedClusterBrokersStr = query.context().getString(QueryContexts.FEDERATED_CLUSTER_BROKERS);
+    if (!Strings.isNullOrEmpty(federatedClusterBrokersStr)) {
+      String[] brokers = federatedClusterBrokersStr.split(",");
       for (String hostName : brokers) {
         if (hostName.length() > 0) {
           final QueryRunner serverRunner = serverView.getAndAddServer(hostName).getQueryRunner();
@@ -690,7 +692,7 @@ private void addSequencesFromFederatedCluster(
           final Sequence<T> serverResults = serverRunner.run(
               queryPlus.withQuery(queryPlus.getQuery()
                                            .withOverriddenContext(ImmutableMap.of(
-                                               QueryContexts.FEDERATED_CLUSSTER_BROKERS,
+                                               QueryContexts.FEDERATED_CLUSTER_BROKERS,
                                                ""
                                            )))
                        .withMaxQueuedBytes(maxQueuedBytes),
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index bdb0a3035600..64078955748b 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -3150,7 +3150,7 @@ public void testAddSequencesFromFederatedCluster()
   {
     Map<String, Object> context = new HashMap<>();
-    context.put(QueryContexts.FEDERATED_CLUSSTER_BROKERS, "test1");
+    context.put(QueryContexts.FEDERATED_CLUSTER_BROKERS, "test1");
     context.putAll(CONTEXT);
 
     final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
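After the rename, the fan-out logic in its final form reads the comma-separated list, skips blank entries, divides the maxQueuedBytes budget evenly across the split entries, and blanks the context key on each forwarded query. Below is a dependency-free sketch of just that parsing and budgeting arithmetic, with illustrative endpoint strings and byte budget; the real method then issues the forwarded query through the per-broker QueryRunner.

import java.util.ArrayList;
import java.util.List;

public class FederatedFanOutSketch
{
  public static void main(String[] args)
  {
    String federatedClusterBrokers = "broker-b:8082,broker-c:8082"; // placeholder endpoints
    long maxQueuedBytes = 10_000_000L; // illustrative stand-in for the broker's configured budget

    String[] brokers = federatedClusterBrokers.split(",");
    // As in the patch, the per-broker budget divides by the raw split length,
    // so blank entries still count toward the divisor.
    long perBrokerQueuedBytes = maxQueuedBytes / brokers.length;

    List<String> forwarded = new ArrayList<>();
    for (String hostName : brokers) {
      if (hostName.length() > 0) { // blank entries are skipped, not queried
        forwarded.add(hostName);
      }
    }

    System.out.println(forwarded + " -> " + perBrokerQueuedBytes + " bytes per broker");
  }
}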