Skip to content

Commit

Permalink
Replace timer with scheduled executor service in IngestionDelayTracker (
Browse files Browse the repository at this point in the history
  • Loading branch information
cypherean authored Nov 16, 2023
1 parent 1ab9e62 commit 71e9c2c
Showing 1 changed file with 38 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
Expand All @@ -50,8 +53,8 @@
* 7-When a segment goes from CONSUMING to ONLINE, we start a timeout for the corresponding partition.
* If no consumption is noticed after the timeout, we then read ideal state to confirm the server still hosts the
* partition. If not, we stop tracking the respective partition.
* 8-A timer thread is started by this object to track timeouts of partitions and drive the reading of their ideal
* state.
* 8-A scheduled executor thread is started by this object to track timeouts of partitions and drive the reading
* of their ideal state.
*
* The following diagram illustrates the object interactions with main external APIs
*
Expand Down Expand Up @@ -85,12 +88,12 @@ private static class IngestionTimestamps {
private final long _ingestionTimeMs;
private final long _firstStreamIngestionTimeMs;
}
// Sleep interval for timer thread that triggers read of ideal state
private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts
// Sleep interval for scheduled executor service thread that triggers read of ideal state
private static final int SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts
// Once a partition is marked for verification, we wait 10 minutes to pull its ideal state.
private static final int PARTITION_TIMEOUT_MS = 600000; // 10 minutes timeouts
// Delay Timer thread for this amount of time after starting timer
private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
// Delay scheduled executor service for this amount of time after starting service
private static final int INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS = 100;
private static final Logger _logger = LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());

// HashMap used to store ingestion time measures for all partitions active for the current table.
Expand All @@ -100,9 +103,10 @@ private static class IngestionTimestamps {
// ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons.
private final Map<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>();

final int _timerThreadTickIntervalMs;
// Timer task to check partitions that are inactive against ideal state.
private final Timer _timer;
final int _scheduledExecutorThreadTickIntervalMs;
// TODO: Make thread pool a server/cluster level config
// ScheduledExecutorService to check partitions that are inactive against ideal state.
private final ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(2);

private final ServerMetrics _serverMetrics;
private final String _tableNameWithType;
Expand All @@ -114,7 +118,7 @@ private static class IngestionTimestamps {
private Clock _clock;

public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
RealtimeTableDataManager realtimeTableDataManager, int timerThreadTickIntervalMs,
RealtimeTableDataManager realtimeTableDataManager, int scheduledExecutorThreadTickIntervalMs,
Supplier<Boolean> isServerReadyToServeQueries)
throws RuntimeException {
_serverMetrics = serverMetrics;
Expand All @@ -124,23 +128,32 @@ public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithTy
_clock = Clock.systemUTC();
_isServerReadyToServeQueries = isServerReadyToServeQueries;
// Handle negative timer values
if (timerThreadTickIntervalMs <= 0) {
if (scheduledExecutorThreadTickIntervalMs <= 0) {
throw new RuntimeException(String.format("Illegal timer timeout argument, expected > 0, got=%d for table=%s",
timerThreadTickIntervalMs, _tableNameWithType));
scheduledExecutorThreadTickIntervalMs, _tableNameWithType));
}
_timerThreadTickIntervalMs = timerThreadTickIntervalMs;
_timer = new Timer("IngestionDelayTimerThread-" + TableNameBuilder.extractRawTableName(tableNameWithType));
_timer.schedule(new TimerTask() {
@Override
public void run() {
timeoutInactivePartitions();
}
}, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs);
_scheduledExecutorThreadTickIntervalMs = scheduledExecutorThreadTickIntervalMs;

// ThreadFactory to set the thread's name
ThreadFactory threadFactory = new ThreadFactory() {
private final ThreadFactory _defaultFactory = Executors.defaultThreadFactory();

@Override
public Thread newThread(Runnable r) {
Thread thread = _defaultFactory.newThread(r);
thread.setName("IngestionDelayTimerThread-" + TableNameBuilder.extractRawTableName(tableNameWithType));
return thread;
}
};
((ScheduledThreadPoolExecutor) _scheduledExecutor).setThreadFactory(threadFactory);

_scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions,
INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, _scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS);
}

public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
RealtimeTableDataManager tableDataManager, Supplier<Boolean> isServerReadyToServeQueries) {
this(serverMetrics, tableNameWithType, tableDataManager, TIMER_THREAD_TICK_INTERVAL_MS,
this(serverMetrics, tableNameWithType, tableDataManager, SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS,
isServerReadyToServeQueries);
}

Expand Down Expand Up @@ -255,7 +268,7 @@ public void stopTrackingPartitionIngestionDelay(int partitionGroupId) {
* This method is used for timing out inactive partitions, so we don't display their metrics on current server.
* When the inactive time exceeds some threshold, we read from ideal state to confirm we still host the partition,
* if not we remove the partition from being tracked locally.
* This call is to be invoked by a timer thread that will periodically wake up and invoke this function.
* This call is to be invoked by a scheduled executor thread that will periodically wake up and invoke this function.
*/
public void timeoutInactivePartitions() {
if (!_isServerReadyToServeQueries.get()) {
Expand Down Expand Up @@ -337,7 +350,7 @@ public long getPartitionEndToEndIngestionDelayMs(int partitionGroupId) {
*/
public void shutdown() {
// Now that segments can't report metric, destroy metric for this table
_timer.cancel(); // Timer is installed in constructor so must always be cancelled
_scheduledExecutor.shutdown(); // ScheduledExecutor is installed in constructor so must always be cancelled
if (!_isServerReadyToServeQueries.get()) {
// Do not update the tracker state during server startup period
return;
Expand Down

0 comments on commit 71e9c2c

Please sign in to comment.