-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improvements to LeadershipWatcher - refactor to make it pluggable + catch SessionExpired in getChildren() #13
Conversation
...-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/DirectoryTreeWatcher.java
Outdated
Show resolved
Hide resolved
...-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/KafkaSegmentUploader.java
Show resolved
Hide resolved
public int getLeadershipWatcherPollIntervalSeconds() { | ||
if (properties.containsKey(LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS)) { | ||
return Integer.parseInt(properties.getProperty(LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS)); | ||
} | ||
return Defaults.DEFAULT_ZK_WATCHER_POLL_INTERVAL_SECONDS; | ||
return Defaults.DEFAULT_LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this become internal to leadership watchers? What if some implementations won't be poll based?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for now, all LeadershipWatchers that extend the abstract class need to be poll based, as the polling logic happens in the abstract class itself. If we implement another type of LeadershipWatcher that is push-based, it would require a separate abstract class implementation.
The current implementation of the Uploader requires a poll-based LeadershipWatcher (there is no alternative right now) - IMO we should keep this a top-level config until we have a push-based alternative. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good; then can we update the interface name to something like PollingLeadershipWatcher
or a better name? And highlight it in the javadoc?
We can leave the more generic interface to a future work, if it becomes necessary.
public int getLeadershipWatcherPollIntervalSeconds() { | ||
if (properties.containsKey(LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS)) { | ||
return Integer.parseInt(properties.getProperty(LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS)); | ||
} | ||
return Defaults.DEFAULT_ZK_WATCHER_POLL_INTERVAL_SECONDS; | ||
return Defaults.DEFAULT_LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good; then can we update the interface name to something like PollingLeadershipWatcher
or a better name? And highlight it in the javadoc?
We can leave the more generic interface to a future work, if it becomes necessary.
This PR includes improvements to the logic for leadership detection in the Uploader.
Specifically:
LeadershipWatcher
which can be extended to accommodate for different ways of detecting leadership. The corresponding configts.segment.uploader.leadership.watcher.class
will tell the uploader which class to use for leadership detection. This abstract class generalizes the logic for leadership detection, and can be implemented for later versions of Kafka such asKRaftLeadershipWatcher
.Implementations of this interface need to define the following methods:
void initialize()
: initialization logicSet<TopicPartition> queryCurrentLeadingPartitions()
: returns the set of currently leading partitions for this brokerKafkaLeadershipWatcher
was renamed toZookeeperLeadershipWatcher
which extends the newly-introduced abstract classLeadershipWatcher
.SessionExpiredException
thrown by the ZooKeeper client by catching the exception appropriately and performing a reconstruction of the ZooKeeper client instance.LeadershipWatcher
frameworkTesting was performed in dev Kafka clusters to validate that the uploaders accurately detect leadership changes, partition movements, etc.