Skip to content

Commit

Permalink
align bootstrap order (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Sep 24, 2023
1 parent d7a081a commit 1d8bb94
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Karafka Web changelog

## 0.7.5 (Unreleased)
- [Enhancement] Update order of topics creation for the setup of Web to support zero-downtime setup of Web in running Karafka projects.
- [Fix] Cache assets for 1 year instead of 7 days.
- [Fix] Remove source maps pointing to non-existing locations.
- [Maintenance] Include license and copyrights notice for `timeago.js` that was missing in the JS min file.
Expand Down
98 changes: 52 additions & 46 deletions lib/karafka/web/management/create_topics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,54 +9,36 @@ class CreateTopics < Base
# Runs the creation process
#
# @param replication_factor [Integer] replication factor for Web-UI topics
#
# @note The order of creation of those topics is important. In order to support the
# zero-downtime bootstrap, we use the presence of the states topic and its initial state
# existence as an indicator that the setup went as expected. It the consumers states
# topic exists and contains needed data, it means all went as expected and that
# topics created before it also exist (as no error).
def call(replication_factor)
consumers_states_topic = ::Karafka::Web.config.topics.consumers.states
consumers_metrics_topic = ::Karafka::Web.config.topics.consumers.metrics
consumers_reports_topic = ::Karafka::Web.config.topics.consumers.reports
errors_topic = ::Karafka::Web.config.topics.errors

# Create only if needed
if existing_topics_names.include?(consumers_states_topic)
exists(consumers_states_topic)
else
creating(consumers_states_topic)
# This topic needs to have one partition
::Karafka::Admin.create_topic(
consumers_states_topic,
1,
replication_factor,
# We care only about the most recent state, previous are irrelevant. So we can easily
# compact after one minute. We do not use this beyond the most recent collective
# state, hence it all can easily go away. We also limit the segment size to at most
# 100MB not to use more space ever.
{
'cleanup.policy': 'compact',
'retention.ms': 60 * 60 * 1_000,
'segment.ms': 24 * 60 * 60 * 1_000, # 1 day
'segment.bytes': 104_857_600 # 100MB
}
)
created(consumers_states_topic)
end

if existing_topics_names.include?(consumers_metrics_topic)
exists(consumers_metrics_topic)
if existing_topics_names.include?(errors_topic)
exists(errors_topic)
else
creating(consumers_metrics_topic)
# This topic needs to have one partition
# Same as states - only most recent is relevant as it is a materialized state
creating(errors_topic)
# All the errors will be dispatched here
# This topic can have multiple partitions but we go with one by default. A single Ruby
# process should not crash that often and if there is an expectation of a higher volume
# of errors, this can be changed by the end user
::Karafka::Admin.create_topic(
consumers_metrics_topic,
errors_topic,
1,
replication_factor,
# Remove really old errors (older than 3 months just to preserve space)
{
'cleanup.policy': 'compact',
'retention.ms': 60 * 60 * 1_000, # 1h
'segment.ms': 24 * 60 * 60 * 1_000, # 1 day
'segment.bytes': 104_857_600 # 100MB
'retention.ms': 3 * 31 * 24 * 60 * 60 * 1_000 # 3 months
}
)
created(consumers_metrics_topic)
created(errors_topic)
end

if existing_topics_names.include?(consumers_reports_topic)
Expand All @@ -81,24 +63,48 @@ def call(replication_factor)
created(consumers_reports_topic)
end

if existing_topics_names.include?(errors_topic)
exists(errors_topic)
if existing_topics_names.include?(consumers_metrics_topic)
exists(consumers_metrics_topic)
else
creating(errors_topic)
# All the errors will be dispatched here
# This topic can have multiple partitions but we go with one by default. A single Ruby
# process should not crash that often and if there is an expectation of a higher volume
# of errors, this can be changed by the end user
creating(consumers_metrics_topic)
# This topic needs to have one partition
# Same as states - only most recent is relevant as it is a materialized state
::Karafka::Admin.create_topic(
errors_topic,
consumers_metrics_topic,
1,
replication_factor,
# Remove really old errors (older than 3 months just to preserve space)
{
'retention.ms': 3 * 31 * 24 * 60 * 60 * 1_000 # 3 months
'cleanup.policy': 'compact',
'retention.ms': 60 * 60 * 1_000, # 1h
'segment.ms': 24 * 60 * 60 * 1_000, # 1 day
'segment.bytes': 104_857_600 # 100MB
}
)
created(errors_topic)
created(consumers_metrics_topic)
end

# Create only if needed
if existing_topics_names.include?(consumers_states_topic)
exists(consumers_states_topic)
else
creating(consumers_states_topic)
# This topic needs to have one partition
::Karafka::Admin.create_topic(
consumers_states_topic,
1,
replication_factor,
# We care only about the most recent state, previous are irrelevant. So we can easily
# compact after one minute. We do not use this beyond the most recent collective
# state, hence it all can easily go away. We also limit the segment size to at most
# 100MB not to use more space ever.
{
'cleanup.policy': 'compact',
'retention.ms': 60 * 60 * 1_000,
'segment.ms': 24 * 60 * 60 * 1_000, # 1 day
'segment.bytes': 104_857_600 # 100MB
}
)
created(consumers_states_topic)
end
end

Expand Down

0 comments on commit 1d8bb94

Please sign in to comment.