Skip to content

Commit

Permalink
Add Broker Node Level Config: newSegmentExpirationTimeInSeconds (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 authored Jul 26, 2024
1 parent 9fd75c9 commit 24e01c1
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ public class BalancedInstanceSelector extends BaseInstanceSelector {

/**
 * Creates a balanced instance selector by forwarding every argument unchanged to the superclass constructor.
 *
 * @param adaptiveServerSelector optional adaptive server selector; may be {@code null}
 * @param newSegmentExpirationTimeInSeconds new-segment expiration window, in seconds, passed through to the
 *                                          superclass
 */
public BalancedInstanceSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock,
boolean useFixedReplica) {
super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica);
boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica,
newSegmentExpirationTimeInSeconds);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ abstract class BaseInstanceSelector implements InstanceSelector {
final AdaptiveServerSelector _adaptiveServerSelector;
final Clock _clock;
final boolean _useFixedReplica;
final long _newSegmentExpirationTimeInSeconds;
final int _tableNameHashForFixedReplicaRouting;

// These 3 variables are the cached states to help accelerate the change processing
Expand All @@ -104,13 +105,14 @@ abstract class BaseInstanceSelector implements InstanceSelector {

BaseInstanceSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock,
boolean useFixedReplica) {
boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
_tableNameWithType = tableNameWithType;
_propertyStore = propertyStore;
_brokerMetrics = brokerMetrics;
_adaptiveServerSelector = adaptiveServerSelector;
_clock = clock;
_useFixedReplica = useFixedReplica;
_newSegmentExpirationTimeInSeconds = newSegmentExpirationTimeInSeconds;
// Using raw table name to ensure queries spanning across REALTIME and OFFLINE tables are routed to the same
// instance
// Math.abs(Integer.MIN_VALUE) = Integer.MIN_VALUE, so we use & 0x7FFFFFFF to get a positive value
Expand Down Expand Up @@ -170,7 +172,7 @@ Map<String, Long> getNewSegmentCreationTimeMapFromZK(IdealState idealState, Exte
}
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(record);
long creationTimeMs = SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata);
if (InstanceSelector.isNewSegment(creationTimeMs, currentTimeMs)) {
if (InstanceSelector.isNewSegment(creationTimeMs, currentTimeMs, _newSegmentExpirationTimeInSeconds * 1000)) {
newSegmentCreationTimeMap.put(segmentZKMetadata.getSegmentName(), creationTimeMs);
}
}
Expand Down Expand Up @@ -400,7 +402,8 @@ Map<String, Long> getNewSegmentCreationTimeMapFromExistingStates(IdealState idea
long creationTimeMs = 0;
if (newSegmentState != null) {
// It was a new segment before, check the creation time and segment state to see if it is still a new segment
if (InstanceSelector.isNewSegment(newSegmentState.getCreationTimeMs(), currentTimeMs)) {
if (InstanceSelector.isNewSegment(newSegmentState.getCreationTimeMs(), currentTimeMs,
_newSegmentExpirationTimeInSeconds * 1000)) {
creationTimeMs = newSegmentState.getCreationTimeMs();
}
} else if (!_oldSegmentCandidatesMap.containsKey(segment)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ public interface InstanceSelector {
long NEW_SEGMENT_EXPIRATION_MILLIS = TimeUnit.MINUTES.toMillis(5);

/**
 * Returns whether a segment counts as "new" when judged against the default expiration window
 * {@code NEW_SEGMENT_EXPIRATION_MILLIS}.
 *
 * @param creationTimeMs segment creation time in milliseconds; non-positive values are never "new"
 * @param currentTimeMs current time in milliseconds
 * @return {@code true} if the creation time is positive and the elapsed time does not exceed the default window
 */
static boolean isNewSegment(long creationTimeMs, long currentTimeMs) {
return creationTimeMs > 0 && currentTimeMs - creationTimeMs <= NEW_SEGMENT_EXPIRATION_MILLIS;
return isNewSegment(creationTimeMs, currentTimeMs, NEW_SEGMENT_EXPIRATION_MILLIS);
}

/**
 * Returns whether a segment counts as "new" for the supplied expiration window.
 *
 * @param creationTimeMs segment creation time in milliseconds; non-positive values are never "new"
 * @param currentTimeMs current time in milliseconds
 * @param newSegmentExpirationMillis maximum elapsed time, in milliseconds, for a segment to still be "new"
 * @return {@code true} if the creation time is positive and the elapsed time is within the window (inclusive)
 */
static boolean isNewSegment(long creationTimeMs, long currentTimeMs, long newSegmentExpirationMillis) {
  // A non-positive creation time can never qualify as a new segment.
  if (creationTimeMs <= 0) {
    return false;
  }
  long elapsedSinceCreationMs = currentTimeMs - creationTimeMs;
  return elapsedSinceCreationMs <= newSegmentExpirationMillis;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
RoutingConfig routingConfig = tableConfig.getRoutingConfig();
boolean useFixedReplica = brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_USE_FIXED_REPLICA,
CommonConstants.Broker.DEFAULT_USE_FIXED_REPLICA);
long newSegmentExpirationTimeInSeconds =
brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS,
CommonConstants.Broker.DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS);
if (routingConfig != null) {
if (routingConfig.getUseFixedReplica() != null) {
// table config overrides broker config
Expand All @@ -74,22 +77,22 @@ public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
&& LEGACY_REPLICA_GROUP_REALTIME_ROUTING.equalsIgnoreCase(routingConfig.getRoutingTableBuilderName()))) {
LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}", tableNameWithType);
return new ReplicaGroupInstanceSelector(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector,
clock, useFixedReplica);
clock, useFixedReplica, newSegmentExpirationTimeInSeconds);
}
if (RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
routingConfig.getInstanceSelectorType())) {
LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}", tableNameWithType);
return new StrictReplicaGroupInstanceSelector(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica);
adaptiveServerSelector, clock, useFixedReplica, newSegmentExpirationTimeInSeconds);
}
if (RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE.equalsIgnoreCase(
routingConfig.getInstanceSelectorType())) {
LOGGER.info("Using {} for table: {}", routingConfig.getInstanceSelectorType(), tableNameWithType);
return new MultiStageReplicaGroupSelector(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica);
adaptiveServerSelector, clock, useFixedReplica, newSegmentExpirationTimeInSeconds);
}
}
return new BalancedInstanceSelector(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock,
useFixedReplica);
useFixedReplica, newSegmentExpirationTimeInSeconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {

/**
 * Creates a multi-stage replica-group selector by forwarding every argument unchanged to the superclass
 * constructor.
 *
 * @param adaptiveServerSelector optional adaptive server selector; may be {@code null}
 * @param newSegmentExpirationTimeInSeconds new-segment expiration window, in seconds, passed through to the
 *                                          superclass
 */
public MultiStageReplicaGroupSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock,
boolean useFixedReplica) {
super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica);
boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica,
newSegmentExpirationTimeInSeconds);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {

/**
 * Creates a replica-group instance selector by forwarding every argument unchanged to the superclass
 * constructor.
 *
 * @param adaptiveServerSelector optional adaptive server selector; may be {@code null}
 * @param newSegmentExpirationTimeInSeconds new-segment expiration window, in seconds, passed through to the
 *                                          superclass
 */
public ReplicaGroupInstanceSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock,
boolean useFixedReplica) {
super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica);
boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica,
newSegmentExpirationTimeInSeconds);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele

/**
 * Creates a strict replica-group instance selector by forwarding every argument unchanged to the superclass
 * constructor.
 *
 * @param adaptiveServerSelector optional adaptive server selector; may be {@code null}
 * @param newSegmentExpirationTimeInSeconds new-segment expiration window, in seconds, passed through to the
 *                                          superclass
 */
public StrictReplicaGroupInstanceSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock,
boolean useFixedReplica) {
super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica);
boolean useFixedReplica, long newSegmentExpirationTimeInSeconds) {
super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock, useFixedReplica,
newSegmentExpirationTimeInSeconds);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,14 @@ public void testInstanceSelector() {
ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
BalancedInstanceSelector balancedInstanceSelector =
new BalancedInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), false);
new BalancedInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(),
false, 300);
ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(),
false);
false, 300);
StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
new StrictReplicaGroupInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(),
false);
false, 300);

Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
Expand Down Expand Up @@ -761,7 +762,7 @@ public void testReplicaGroupInstanceSelectorNumReplicaGroupsToQuery() {

ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(),
false);
false, 300);

Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
Expand Down Expand Up @@ -844,7 +845,7 @@ public void testReplicaGroupInstanceSelectorNumReplicaGroupsToQueryGreaterThanRe

ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(),
false);
false, 300);

Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
Expand Down Expand Up @@ -927,7 +928,7 @@ public void testReplicaGroupInstanceSelectorNumReplicaGroupsNotSet() {

ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(),
false);
false, 300);

Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
Expand Down Expand Up @@ -1001,7 +1002,7 @@ public void testMultiStageStrictReplicaGroupSelector() {

MultiStageReplicaGroupSelector multiStageSelector =
new MultiStageReplicaGroupSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(),
false);
false, 300);
multiStageSelector = spy(multiStageSelector);
doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();

Expand Down Expand Up @@ -1096,11 +1097,12 @@ public void testUnavailableSegments() {
ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
BalancedInstanceSelector balancedInstanceSelector =
new BalancedInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(), false);
new BalancedInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(),
false, 300);
// ReplicaGroupInstanceSelector has the same behavior as BalancedInstanceSelector for the unavailable segments
StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
new StrictReplicaGroupInstanceSelector(offlineTableName, propertyStore, brokerMetrics, null, Clock.systemUTC(),
false);
false, 300);

Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.File;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.config.instance.InstanceType;


Expand Down Expand Up @@ -344,6 +345,9 @@ public static class Broker {
// precedence over "query.response.size" (i.e., "query.response.size" will be ignored).
public static final String CONFIG_OF_MAX_SERVER_RESPONSE_SIZE_BYTES = "pinot.broker.max.server.response.size.bytes";

// Broker-level config for how long, in seconds, after its creation time a segment is still treated as "new" by
// the broker's instance selectors. The default is 5 minutes, expressed in seconds.
public static final String CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS = "pinot.broker.new.segment.expiration.seconds";
public static final long DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS = TimeUnit.MINUTES.toSeconds(5);

public static class Request {
public static final String SQL = "sql";
public static final String TRACE = "trace";
Expand Down

0 comments on commit 24e01c1

Please sign in to comment.