Skip to content

Commit

Permalink
Data transfers tracking + migrations (#209)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Nov 17, 2023
1 parent 93cb03c commit 7e84ad1
Show file tree
Hide file tree
Showing 71 changed files with 7,564 additions and 804 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Karafka Web changelog

## 0.8.0 (Unreleased)
- **[Feature]** Introduce states migrations for seamless upgrades.
- **[Feature]** Introduce "Data transfers" chart with data received and data sent to the cluster.
- **[Feature]** Introduce ability to download raw payloads.
- **[Feature]** Introduce ability to download deserialized message payload as JSON.
- [Enhancement] Report last poll time for each subscription group.
Expand Down
8 changes: 6 additions & 2 deletions lib/karafka/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ def enable!
end
end

require_relative 'web/inflector'

loader = Zeitwerk::Loader.new

# Make sure pro is not loaded unless Pro
loader.ignore(Karafka::Web.gem_root.join('lib/karafka/web/ui/pro'))

Expand All @@ -62,9 +65,10 @@ def enable!
loader = Zeitwerk::Loader.new
end

root = File.expand_path('..', __dir__)
loader.tag = 'karafka-web'
loader.inflector = Zeitwerk::GemInflector.new("#{root}/karafka/web.rb")
# Use our custom inflector to support migrations
root = File.expand_path('..', __dir__)
loader.inflector = Karafka::Web::Inflector.new("#{root}/karafka/web.rb")
loader.push_dir(root)

loader.setup
Expand Down
12 changes: 12 additions & 0 deletions lib/karafka/web/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ module Errors
# This should never happen and if you see this, please open an issue.
ContractError = Class.new(BaseError)

# Errors specific to management
module Management
# Similar to processing error with the same name, it is raised when a critical
# incompatibility is detected.
#
# This error is raised when there was an attempt to operate on aggregated Web UI states
# that are already in a newer version that the one in the current process. We prevent
# this from happening not to corrupt the data. Please upgrade all the Web UI consumers to
# the same version
IncompatibleSchemaError = Class.new(BaseError)
end

# Processing related errors namespace
module Processing
# Raised when we try to process reports but we do not have the current state bootstrapped
Expand Down
33 changes: 33 additions & 0 deletions lib/karafka/web/inflector.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

module Karafka
module Web
# Web UI Zeitwerk Inflector that allows us to have time prefixed files with migrations, similar
# to how Rails does that.
class Inflector < Zeitwerk::GemInflector
# Checks if given path is a migration one
MIGRATION_ABSPATH_REGEXP = /migrations\/[0-9]+_(.*)/

# Checks if it is a migration file
MIGRATION_BASENAME_REGEXP = /\A[0-9]+_(.*)/

private_constant :MIGRATION_ABSPATH_REGEXP, :MIGRATION_BASENAME_REGEXP

# @param [String] basename of the file to be loaded
# @param abspath [String] absolute path of the file to be loaded
# @return [String] Constant name to be used for given file
def camelize(basename, abspath)
# If not migration directory with proper migration files, use defaults
return super unless abspath.match?(MIGRATION_ABSPATH_REGEXP)
# If base name is not of a proper name in migrations, use defaults
return super unless basename.match?(MIGRATION_BASENAME_REGEXP)

super(
# Extract only the name without the timestamp
basename.match(MIGRATION_BASENAME_REGEXP).to_a.last,
abspath
)
end
end
end
end
31 changes: 20 additions & 11 deletions lib/karafka/web/installer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ def install(replication_factor: 1)
puts
puts 'Creating necessary topics and populating state data...'
puts
Management::CreateTopics.new.call(replication_factor)
Management::Actions::CreateTopics.new.call(replication_factor)
wait_for_topics
Management::CreateInitialStates.new.call
Management::Actions::CreateInitialStates.new.call
puts
Management::ExtendBootFile.new.call
puts 'Running data migrations...'
Management::Actions::MigrateStatesData.new.call
puts
Management::Actions::ExtendBootFile.new.call
puts
puts("Installation #{green('completed')}. Have fun!")
puts
Expand All @@ -35,9 +38,12 @@ def migrate(replication_factor: 1)
puts
puts 'Creating necessary topics and populating state data...'
puts
Management::CreateTopics.new.call(replication_factor)
Management::Actions::CreateTopics.new.call(replication_factor)
wait_for_topics
Management::CreateInitialStates.new.call
Management::Actions::CreateInitialStates.new.call
puts
puts 'Running data migrations...'
Management::Actions::MigrateStatesData.new.call
puts
puts("Migration #{green('completed')}. Have fun!")
puts
Expand All @@ -49,11 +55,14 @@ def reset(replication_factor: 1)
puts
puts 'Resetting Karafka Web UI...'
puts
Management::DeleteTopics.new.call
Management::Actions::DeleteTopics.new.call
puts
Management::CreateTopics.new.call(replication_factor)
Management::Actions::CreateTopics.new.call(replication_factor)
wait_for_topics
Management::CreateInitialStates.new.call
Management::Actions::CreateInitialStates.new.call
puts
puts 'Running data migrations...'
Management::Actions::MigrateStatesData.new.call
puts
puts("Resetting #{green('completed')}. Have fun!")
puts
Expand All @@ -64,16 +73,16 @@ def uninstall
puts
puts 'Uninstalling Karafka Web UI...'
puts
Management::DeleteTopics.new.call
Management::CleanBootFile.new.call
Management::Actions::DeleteTopics.new.call
Management::Actions::CleanBootFile.new.call
puts
puts("Uninstalling #{green('completed')}. Goodbye!")
puts
end

# Enables the Web-UI in the karafka app. Sets up needed routes and listeners.
def enable!
Management::Enable.new.call
Management::Actions::Enable.new.call
end

private
Expand Down
36 changes: 36 additions & 0 deletions lib/karafka/web/management/actions/base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# frozen_string_literal: true

module Karafka
module Web
module Management
# Namespace for all the commands related to management of the Web-UI in the context of
# Karafka. It includes things like installing, creating needed topics, etc.
module Actions
# Base class for all the commands that we use to manage
class Base
include ::Karafka::Helpers::Colorize

private

# @return [String] green colored word "successfully"
def successfully
green('successfully')
end

# @return [String] green colored word "already"
def already
green('already')
end

# @return [Array<String>] topics available in the cluster
def existing_topics_names
@existing_topics_names ||= ::Karafka::Admin
.cluster_info
.topics
.map { |topic| topic[:topic_name] }
end
end
end
end
end
end
33 changes: 33 additions & 0 deletions lib/karafka/web/management/actions/clean_boot_file.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

module Karafka
module Web
module Management
module Actions
# Cleans the boot file from Karafka Web-UI details.
class CleanBootFile < Base
# Web-UI enabled code
ENABLER_CODE = ExtendBootFile::ENABLER_CODE

private_constant :ENABLER_CODE

# Removes the Web-UI boot file data
def call
karafka_rb = File.readlines(Karafka.boot_file)

if karafka_rb.any? { |line| line.include?(ENABLER_CODE) }
puts 'Updating the Karafka boot file...'
karafka_rb.delete_if { |line| line.include?(ENABLER_CODE) }

File.write(Karafka.boot_file, karafka_rb.join)
puts "Karafka boot file #{successfully} updated."
puts 'Make sure to remove configuration and other customizations as well.'
else
puts 'Karafka Web UI components not found in the boot file.'
end
end
end
end
end
end
end
77 changes: 77 additions & 0 deletions lib/karafka/web/management/actions/create_initial_states.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# frozen_string_literal: true

module Karafka
module Web
module Management
module Actions
# Creates the records needed for the Web-UI to operate.
# It creates "almost" empty states because the rest is handled via migrations
class CreateInitialStates < Base
# Whole default empty state
# This will be further migrated by the migrator
DEFAULT_STATE = {
schema_version: '0.0.0'
}.freeze

# Default metrics state
DEFAULT_METRICS = {
schema_version: '0.0.0'
}.freeze

# Creates the initial states for the Web-UI if needed (if they don't exist)
def call
if exists?(Karafka::Web.config.topics.consumers.states)
exists('consumers state')
else
creating('consumers state')
::Karafka::Web.producer.produce_sync(
topic: Karafka::Web.config.topics.consumers.states,
key: Karafka::Web.config.topics.consumers.states,
payload: DEFAULT_STATE.to_json
)
created('consumers state')
end

if exists?(Karafka::Web.config.topics.consumers.metrics)
exists('consumers metrics')
else
creating('consumers metrics')
::Karafka::Web.producer.produce_sync(
topic: Karafka::Web.config.topics.consumers.metrics,
key: Karafka::Web.config.topics.consumers.metrics,
payload: DEFAULT_METRICS.to_json
)
created('consumers metrics')
end
end

private

# @param topic [String] topic name
# @return [Boolean] true if there is already an initial record in a given topic
def exists?(topic)
!::Karafka::Admin.read_topic(topic, 0, 5).last.nil?
end

# @param type [String] type of state
# @return [String] exists message
def exists(type)
puts "Initial #{type} #{already} exists."
end

# @param type [String] type of state
# @return [String] message that the state is being created
def creating(type)
puts "Creating #{type} initial record..."
end

# @param type [String] type of state
# @return [String] message that the state was created
def created(type)
puts "Initial #{type} record #{successfully} created."
end
end
end
end
end
end
Loading

0 comments on commit 7e84ad1

Please sign in to comment.