Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to LeadershipWatcher - refactor to make it pluggable + catch SessionExpired in getChildren() #13

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ts.segment.uploader.upload.timeout.ms=60000
ts.segment.uploader.upload.thread.count=3
ts.segment.uploader.upload.max.retries=10

ts.segment.uploader.zk.watcher.poll.interval.seconds=60
ts.segment.uploader.leadership.watcher.poll.interval.seconds=60

storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.ExampleS3StorageServiceEndpointProvider
metrics.reporter.class=com.pinterest.kafka.tieredstorage.common.metrics.NoOpMetricsReporter
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.pinterest.kafka.tieredstorage.common.metrics.MetricRegistryManager;
import com.pinterest.kafka.tieredstorage.uploader.leadership.KafkaLeadershipWatcher;
import com.pinterest.kafka.tieredstorage.uploader.leadership.LeadershipWatcher;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
Expand All @@ -11,17 +11,14 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -56,18 +53,16 @@ public class DirectoryTreeWatcher implements Runnable {
private static final Logger LOG = LogManager.getLogger(DirectoryTreeWatcher.class);
private static final String[] MONITORED_EXTENSIONS = {".timeindex", ".index", ".log"};
private static final Pattern MONITORED_FILE_PATTERN = Pattern.compile("^\\d+(" + String.join("|", MONITORED_EXTENSIONS) + ")$");
private final Path topLevelPath;
private final WatchService watchService;
private Thread thread;
private boolean cancelled = false;
private static Map<TopicPartition, String> activeSegment;
private static Map<TopicPartition, Set<String>> segmentsQueue;
private static LeadershipWatcher leadershipWatcher;
private final Path topLevelPath;
private final WatchService watchService;
private final ConcurrentLinkedQueue<UploadTask> uploadTasks = new ConcurrentLinkedQueue<>();
private final S3FileUploader s3FileUploader;
private final ThreadLocal<WatermarkFileHandler> tempFileGenerator = ThreadLocal.withInitial(WatermarkFileHandler::new);
private final ConcurrentHashMap<TopicPartition, String> latestUploadedOffset = new ConcurrentHashMap<>();
private final S3FileDownloader s3FileDownloader;
private static KafkaLeadershipWatcher kafkaLeadershipWatcher;
private final Pattern SKIP_TOPICS_PATTERN = Pattern.compile(
"^__consumer_offsets$|^__transaction_state$|.+\\.changlog$|.+\\.repartition$"
);
Expand All @@ -78,36 +73,30 @@ public class DirectoryTreeWatcher implements Runnable {
private final SegmentUploaderConfiguration config;
private final KafkaEnvironmentProvider environmentProvider;
private final Object watchKeyMapLock = new Object();
private Thread thread;
private boolean cancelled = false;

public static void setKafkaLeadershipWatcher(DirectoryTreeWatcher directoryTreeWatcher, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) {
if (kafkaLeadershipWatcher == null) {
try {
kafkaLeadershipWatcher = new KafkaLeadershipWatcher(directoryTreeWatcher, config, environmentProvider);
} catch (IOException | InterruptedException e) {
LOG.error("Could not launch Kafka Leadership Watcher; quitting ...");
throw new RuntimeException(e);
}
}
public static void setLeadershipWatcher(LeadershipWatcher suppliedLeadershipWatcher) {
if (leadershipWatcher == null)
leadershipWatcher = suppliedLeadershipWatcher;
}

@VisibleForTesting
protected static void unsetKafkaLeadershipWatcher() {
kafkaLeadershipWatcher = null;
protected static void unsetLeadershipWatcher() {
leadershipWatcher = null;
}

public DirectoryTreeWatcher(S3FileUploader s3FileUploader, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) throws IOException, InterruptedException, KeeperException {
public DirectoryTreeWatcher(S3FileUploader s3FileUploader, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) throws Exception {
this.environmentProvider = environmentProvider;
this.topLevelPath = Paths.get(environmentProvider.logDir());
this.watchService = FileSystems.getDefault().newWatchService();
setKafkaLeadershipWatcher(this, config, environmentProvider);
activeSegment = new HashMap<>();
segmentsQueue = new HashMap<>();
this.s3FileUploader = s3FileUploader;
this.s3UploadHandler = Executors.newSingleThreadExecutor();
this.s3FileDownloader = new S3FileDownloader(s3FileUploader.getStorageServiceEndpointProvider(), config);
heartbeat = new Heartbeat("watcher.logs", config, environmentProvider);
this.config = config;
initialize();
}

/**
Expand All @@ -116,7 +105,10 @@ public DirectoryTreeWatcher(S3FileUploader s3FileUploader, SegmentUploaderConfig
* @throws InterruptedException
* @throws KeeperException
*/
private void initialize() throws IOException, InterruptedException, KeeperException {
public void initialize() throws Exception {
if (leadershipWatcher == null) {
throw new IllegalStateException("LeadershipWatcher must be set before initializing DirectoryTreeWatcher");
}
s3UploadHandler.submit(() -> {
while (!cancelled) {
if (uploadTasks.isEmpty()) {
Expand Down Expand Up @@ -145,8 +137,8 @@ private void initialize() throws IOException, InterruptedException, KeeperExcept
s3FileUploader.uploadFile(task, this::handleUploadCallback);
}
});
LOG.info("Initializing KafkaLeadershipWatcher");
kafkaLeadershipWatcher.start();
LOG.info("Starting LeadershipWatcher: " + leadershipWatcher.getClass().getName());
leadershipWatcher.start();
LOG.info("Submitting s3UploadHandler loop");
}

Expand Down Expand Up @@ -702,7 +694,7 @@ public void stop() throws InterruptedException {
if (thread != null && thread.isAlive()) {
thread.interrupt();
}
kafkaLeadershipWatcher.stop();
leadershipWatcher.stop();
heartbeat.stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.pinterest.kafka.tieredstorage.common.discovery.StorageServiceEndpointProvider;
import com.pinterest.kafka.tieredstorage.uploader.leadership.LeadershipWatcher;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
Expand All @@ -21,22 +22,12 @@ public class KafkaSegmentUploader {
private final StorageServiceEndpointProvider endpointProvider;
private final SegmentUploaderConfiguration config;

public KafkaSegmentUploader(String configDirectory) throws IOException, InterruptedException, KeeperException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
Utils.acquireLock();
KafkaEnvironmentProvider environmentProvider = getEnvironmentProvider();
environmentProvider.load();

config = new SegmentUploaderConfiguration(configDirectory, environmentProvider.clusterId());

endpointProvider = getEndpointProviderFromConfigs(config);
endpointProvider.initialize(environmentProvider.clusterId());

multiThreadedS3FileUploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider);
directoryTreeWatcher = new DirectoryTreeWatcher(multiThreadedS3FileUploader, config, environmentProvider);
public KafkaSegmentUploader(String configDirectory) throws Exception {
this(configDirectory, getEnvironmentProvider());
}

@VisibleForTesting
protected KafkaSegmentUploader(String configDirectory, KafkaEnvironmentProvider environmentProvider) throws IOException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException, InterruptedException, KeeperException {
protected KafkaSegmentUploader(String configDirectory, KafkaEnvironmentProvider environmentProvider) throws Exception {
Utils.acquireLock();
environmentProvider.load();
config = new SegmentUploaderConfiguration(configDirectory, environmentProvider.clusterId());
Expand All @@ -46,6 +37,11 @@ protected KafkaSegmentUploader(String configDirectory, KafkaEnvironmentProvider

multiThreadedS3FileUploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider);
directoryTreeWatcher = new DirectoryTreeWatcher(multiThreadedS3FileUploader, config, environmentProvider);

LeadershipWatcher leadershipWatcher = getLeadershipWatcherFromConfigs(directoryTreeWatcher, config, environmentProvider);
DirectoryTreeWatcher.setLeadershipWatcher(leadershipWatcher);

directoryTreeWatcher.initialize();
jeffxiang marked this conversation as resolved.
Show resolved Hide resolved
}

public void start() {
Expand All @@ -58,7 +54,7 @@ public void stop() throws InterruptedException, IOException {
Utils.releaseLock();
}

private KafkaEnvironmentProvider getEnvironmentProvider() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
private static KafkaEnvironmentProvider getEnvironmentProvider() throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
String kafkaEnvironmentProviderClass = System.getProperty("kafkaEnvironmentProviderClass");
if (kafkaEnvironmentProviderClass == null) {
throw new RuntimeException("kafkaEnvironmentProviderClass must be set as a JVM argument");
Expand All @@ -69,6 +65,14 @@ private KafkaEnvironmentProvider getEnvironmentProvider() throws ClassNotFoundEx
return environmentProviderConstructor.newInstance();
}

private static LeadershipWatcher getLeadershipWatcherFromConfigs(DirectoryTreeWatcher directoryTreeWatcher, SegmentUploaderConfiguration config, KafkaEnvironmentProvider kafkaEnvironmentProvider) throws InvocationTargetException, InstantiationException, IllegalAccessException, ClassNotFoundException, NoSuchMethodException {
String leadershipWatcherClassName = config.getLeadershipWatcherClassName();
LOG.info(String.format("LeadershipWatcher: %s", leadershipWatcherClassName));
Constructor<? extends LeadershipWatcher> leadershipWatcherConstructor = Class.forName(leadershipWatcherClassName)
.asSubclass(LeadershipWatcher.class).getConstructor(DirectoryTreeWatcher.class, SegmentUploaderConfiguration.class, KafkaEnvironmentProvider.class);
return leadershipWatcherConstructor.newInstance(directoryTreeWatcher, config, kafkaEnvironmentProvider);
}

@VisibleForTesting
protected StorageServiceEndpointProvider getEndpointProvider() {
return endpointProvider;
Expand All @@ -79,13 +83,15 @@ protected SegmentUploaderConfiguration getSegmentUploaderConfiguration() {
return config;
}

private StorageServiceEndpointProvider getEndpointProviderFromConfigs(SegmentUploaderConfiguration config) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
Constructor<? extends StorageServiceEndpointProvider> endpointProviderConstructor = Class.forName(config.getStorageServiceEndpointProviderClassName())
private static StorageServiceEndpointProvider getEndpointProviderFromConfigs(SegmentUploaderConfiguration config) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
String endpointProviderClassName = config.getStorageServiceEndpointProviderClassName();
LOG.info(String.format("StorageServiceEndpointProvider: %s", endpointProviderClassName));
Constructor<? extends StorageServiceEndpointProvider> endpointProviderConstructor = Class.forName(endpointProviderClassName)
.asSubclass(StorageServiceEndpointProvider.class).getConstructor();
return endpointProviderConstructor.newInstance();
}

public static void main(String[] args) throws IOException, InterruptedException, KeeperException, ConfigurationException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
public static void main(String[] args) throws Exception {
if (args.length != 1) {
LOG.error("configDirectory is required as an argument");
System.exit(1);
Expand Down
Loading
Loading