diff --git a/CHANGELOG.md b/CHANGELOG.md index c9d33a74..49fc0e0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Karafka Web changelog ## 0.7.5 (Unreleased) +- [Enhancement] Update order of topics creation for the setup of Web to support zero-downtime setup of Web in running Karafka projects. - [Fix] Cache assets for 1 year instead of 7 days. - [Fix] Remove source maps pointing to non-existing locations. - [Maintenance] Include license and copyrights notice for `timeago.js` that was missing in the JS min file. diff --git a/lib/karafka/web/management/create_topics.rb b/lib/karafka/web/management/create_topics.rb index d6436ff4..3e64a9f0 100644 --- a/lib/karafka/web/management/create_topics.rb +++ b/lib/karafka/web/management/create_topics.rb @@ -9,54 +9,36 @@ class CreateTopics < Base # Runs the creation process # # @param replication_factor [Integer] replication factor for Web-UI topics + # + # @note The order of creation of those topics is important. In order to support the + # zero-downtime bootstrap, we use the presence of the states topic and its initial state + # existence as an indicator that the setup went as expected. It the consumers states + # topic exists and contains needed data, it means all went as expected and that + # topics created before it also exist (as no error). def call(replication_factor) consumers_states_topic = ::Karafka::Web.config.topics.consumers.states consumers_metrics_topic = ::Karafka::Web.config.topics.consumers.metrics consumers_reports_topic = ::Karafka::Web.config.topics.consumers.reports errors_topic = ::Karafka::Web.config.topics.errors - # Create only if needed - if existing_topics_names.include?(consumers_states_topic) - exists(consumers_states_topic) - else - creating(consumers_states_topic) - # This topic needs to have one partition - ::Karafka::Admin.create_topic( - consumers_states_topic, - 1, - replication_factor, - # We care only about the most recent state, previous are irrelevant. So we can easily - # compact after one minute. We do not use this beyond the most recent collective - # state, hence it all can easily go away. We also limit the segment size to at most - # 100MB not to use more space ever. - { - 'cleanup.policy': 'compact', - 'retention.ms': 60 * 60 * 1_000, - 'segment.ms': 24 * 60 * 60 * 1_000, # 1 day - 'segment.bytes': 104_857_600 # 100MB - } - ) - created(consumers_states_topic) - end - - if existing_topics_names.include?(consumers_metrics_topic) - exists(consumers_metrics_topic) + if existing_topics_names.include?(errors_topic) + exists(errors_topic) else - creating(consumers_metrics_topic) - # This topic needs to have one partition - # Same as states - only most recent is relevant as it is a materialized state + creating(errors_topic) + # All the errors will be dispatched here + # This topic can have multiple partitions but we go with one by default. A single Ruby + # process should not crash that often and if there is an expectation of a higher volume + # of errors, this can be changed by the end user ::Karafka::Admin.create_topic( - consumers_metrics_topic, + errors_topic, 1, replication_factor, + # Remove really old errors (older than 3 months just to preserve space) { - 'cleanup.policy': 'compact', - 'retention.ms': 60 * 60 * 1_000, # 1h - 'segment.ms': 24 * 60 * 60 * 1_000, # 1 day - 'segment.bytes': 104_857_600 # 100MB + 'retention.ms': 3 * 31 * 24 * 60 * 60 * 1_000 # 3 months } ) - created(consumers_metrics_topic) + created(errors_topic) end if existing_topics_names.include?(consumers_reports_topic) @@ -81,24 +63,48 @@ def call(replication_factor) created(consumers_reports_topic) end - if existing_topics_names.include?(errors_topic) - exists(errors_topic) + if existing_topics_names.include?(consumers_metrics_topic) + exists(consumers_metrics_topic) else - creating(errors_topic) - # All the errors will be dispatched here - # This topic can have multiple partitions but we go with one by default. A single Ruby - # process should not crash that often and if there is an expectation of a higher volume - # of errors, this can be changed by the end user + creating(consumers_metrics_topic) + # This topic needs to have one partition + # Same as states - only most recent is relevant as it is a materialized state ::Karafka::Admin.create_topic( - errors_topic, + consumers_metrics_topic, 1, replication_factor, - # Remove really old errors (older than 3 months just to preserve space) { - 'retention.ms': 3 * 31 * 24 * 60 * 60 * 1_000 # 3 months + 'cleanup.policy': 'compact', + 'retention.ms': 60 * 60 * 1_000, # 1h + 'segment.ms': 24 * 60 * 60 * 1_000, # 1 day + 'segment.bytes': 104_857_600 # 100MB } ) - created(errors_topic) + created(consumers_metrics_topic) + end + + # Create only if needed + if existing_topics_names.include?(consumers_states_topic) + exists(consumers_states_topic) + else + creating(consumers_states_topic) + # This topic needs to have one partition + ::Karafka::Admin.create_topic( + consumers_states_topic, + 1, + replication_factor, + # We care only about the most recent state, previous are irrelevant. So we can easily + # compact after one minute. We do not use this beyond the most recent collective + # state, hence it all can easily go away. We also limit the segment size to at most + # 100MB not to use more space ever. + { + 'cleanup.policy': 'compact', + 'retention.ms': 60 * 60 * 1_000, + 'segment.ms': 24 * 60 * 60 * 1_000, # 1 day + 'segment.bytes': 104_857_600 # 100MB + } + ) + created(consumers_states_topic) end end