Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to LeadershipWatcher - refactor to make it pluggable + catch SessionExpired in getChildren() #13

Merged
merged 5 commits into from
Nov 15, 2024

Conversation

jeffxiang
Copy link
Contributor

@jeffxiang jeffxiang commented Nov 12, 2024

This PR includes improvements to the logic for leadership detection in the Uploader.

Specifically:

  1. We introduce a new abstract class LeadershipWatcher which can be extended to accommodate for different ways of detecting leadership. The corresponding config ts.segment.uploader.leadership.watcher.class will tell the uploader which class to use for leadership detection. This abstract class generalizes the logic for leadership detection, and can be implemented for later versions of Kafka such as KRaftLeadershipWatcher.
    Implementations of this interface need to define the following methods:
  • void initialize(): initialization logic
  • Set<TopicPartition> queryCurrentLeadingPartitions(): returns the set of currently leading partitions for this broker
  1. KafkaLeadershipWatcher was renamed to ZookeeperLeadershipWatcher which extends the newly-introduced abstract class LeadershipWatcher.
  2. This PR also attempts to fix the SessionExpiredException thrown by the ZooKeeper client by catching the exception appropriately and performing a reconstruction of the ZooKeeper client instance.
  3. Several unit tests to validate the new LeadershipWatcher framework
  4. Other minor fixes / improvements

Testing was performed in dev Kafka clusters to validate that the uploaders accurately detect leadership changes, partition movements, etc.

@jeffxiang jeffxiang requested a review from a team as a code owner November 12, 2024 16:06
@jeffxiang jeffxiang changed the title Update error handling logic in KafkaLeadershipWatcher Improvements to LeadershipWatcher - refactor to make it pluggable + catch SessionExpired in getChildren() Nov 13, 2024
Comment on lines +187 to 192
public int getLeadershipWatcherPollIntervalSeconds() {
if (properties.containsKey(LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS)) {
return Integer.parseInt(properties.getProperty(LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS));
}
return Defaults.DEFAULT_ZK_WATCHER_POLL_INTERVAL_SECONDS;
return Defaults.DEFAULT_LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this become internal to leadership watchers? What if some implementations won't be poll based?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for now, all LeadershipWatchers that extend the abstract class need to be poll based, as the polling logic happens in the abstract class itself. If we implement another type of LeadershipWatcher that is push-based, it would require a separate abstract class implementation.

The current implementation of the Uploader requires a poll-based LeadershipWatcher (there is no alternative right now) - IMO we should keep this a top-level config until we have a push-based alternative. WDYT?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good; then can we update the interface name to something like PollingLeadershipWatcher or a better name? And highlight it in the javadoc?

We can leave the more generic interface to a future work, if it becomes necessary.

Comment on lines +187 to 192
public int getLeadershipWatcherPollIntervalSeconds() {
if (properties.containsKey(LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS)) {
return Integer.parseInt(properties.getProperty(LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS));
}
return Defaults.DEFAULT_ZK_WATCHER_POLL_INTERVAL_SECONDS;
return Defaults.DEFAULT_LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good; then can we update the interface name to something like PollingLeadershipWatcher or a better name? And highlight it in the javadoc?

We can leave the more generic interface to a future work, if it becomes necessary.

@jeffxiang jeffxiang merged commit cfe54d5 into main Nov 15, 2024
1 check passed
@jeffxiang jeffxiang deleted the update_error_handling_logic branch November 15, 2024 17:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants