Skip to content

Commit

Permalink
Messages reproduce and surrounding link (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Sep 5, 2023
1 parent 3f11022 commit 95b7b01
Show file tree
Hide file tree
Showing 17 changed files with 336 additions and 10 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
- **[Feature]** Introduce `bundle exec karafka-web migrate` that can be used to bootstrap the proper topics and initial data in environments where Karafka Web-UI should be used but is missing the initial setup.
- **[Feature]** Replace `decrypt` with a pluggable API for deciding which topics data to display.
- **[Feature]** Make sure, that the karafka server process that is materializing UI states is not processing any data having unsupported (newer) schemas. This state will be also visible in the status page.
- **[Feature]** Provide ability to reproduce a given message to the same topic partition with all the details from the per message explorer view.
- **[Feature]** Provide "surrounding" navigation link that allows to view the given message in the context of its surrounding. Useful for debugging of failures where the batch context may be relevant.
- [Improvement] Support pattern subscriptions details in the routing view both by displaying the pattern as well as expanded routing details.
- [Improvement] Collect total number of threads per process for the process details view.
- [Improvement] Normalize naming of metrics to better reflect what they do (in reports and in the Web UI).
Expand Down Expand Up @@ -46,6 +48,7 @@
- [Improvement] Limit segment size for Web topics to ensure, that Web-UI does not drain resources.
- [Improvement] Introduce cookie based sessions management for future usage.
- [Improvement] Introduce config validation.
- [Improvement] Provide flash messages support.
- [Fix] Return 402 status instead of 500 on Pro features that are not available in OSS.
- [Fix] Fix a case where errors would not be visible without Rails due to the `String#first` usage.
- [Fix] Fix a case where live-poll would be disabled but would still update data.
Expand Down
2 changes: 1 addition & 1 deletion lib/karafka/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def enable!
# Inject correct settings for the Web-UI sessions plugin based on the user configuration
# We cannot configure this automatically like other Roda plugins because it requires safe
# custom values provided by our user
Ui::Base.plugin(:sessions, **config.ui.sessions.to_h)
App.engine.plugin(:sessions, **config.ui.sessions.to_h)
end
end
end
Expand Down
8 changes: 6 additions & 2 deletions lib/karafka/web/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ class << self
# @param env [Hash] Rack env
# @param block [Proc] Rack block
def call(env, &block)
handler = Karafka.pro? ? Ui::Pro::App : Ui::App
handler.call(env, &block)
engine.call(env, &block)
end

# @return [Class] regular or pro Web engine
def engine
::Karafka.pro? ? Ui::Pro::App : Ui::App
end
end
end
Expand Down
24 changes: 19 additions & 5 deletions lib/karafka/web/ui/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ class Base < Roda
)
plugin :render_each
plugin :partials
# The secret here will be reconfigured after Web UI configuration setup
# This is why we assign here a random value as it will have to be changed by the end
# user to make the Web UI work.
plugin :sessions, key: '_karafka_session', secret: SecureRandom.hex(64)
plugin :route_csrf
end

plugin :render, escape: true, engine: 'erb'
plugin :run_append_slash
plugin :error_handler
plugin :not_found
plugin :hooks
plugin :flash
plugin :path
# The secret here will be reconfigured after Web UI configuration setup
# This is why we assign here a random value as it will have to be changed by the end
# user to make the Web UI work.
plugin :sessions, key: '_karafka_session', secret: SecureRandom.hex(64)
plugin :route_csrf

# Based on
# https://github.com/sidekiq/sidekiq/blob/ae6ca119/lib/sidekiq/web/application.rb#L8
Expand All @@ -57,6 +59,14 @@ class Base < Roda
render_response(result)
end

# Redirect either to referer back or to the desired path
handle_block_result Controllers::Responses::Redirect do |result|
# Map redirect flashes (if any) to Roda flash messages
result.flashes.each { |key, value| flash[key] = value }

response.redirect result.back? ? request.referer : root_path(result.path)
end

# Display appropriate error specific to a given error type
plugin :error_handler, classes: [
::Rdkafka::RdkafkaError,
Expand All @@ -80,6 +90,10 @@ class Base < Roda
view 'shared/exceptions/not_found'
end

before do
check_csrf!
end

# Allows us to build current path with additional params
# @param query_data [Hash] query params we want to add to the current path
path :current do |query_data = {}|
Expand Down
9 changes: 9 additions & 0 deletions lib/karafka/web/ui/controllers/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ def respond
)
end

# Builds a redirect data object with assigned flashes (if any)
# @param path [String, Symbol] relative (without root path) path where we want to be
# redirected or `:back` to use referer back
# @param flashes [Hash] hash where key is the flash type and value is the message
# @return [Responses::Redirect] redirect result
def redirect(path = :back, flashes = {})
Responses::Redirect.new(path, flashes)
end

# Initializes the expected pagination engine and assigns expected arguments
# @param args Any arguments accepted by the selected pagination engine
def paginate(*args)
Expand Down
29 changes: 29 additions & 0 deletions lib/karafka/web/ui/controllers/responses/redirect.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

module Karafka
module Web
module Ui
module Controllers
module Responses
# Representation of a redirect response with optional flash messages
class Redirect
attr_reader :path, :flashes

# @param path [String, Symbol] relative (without root path) path where we want to be
# redirected or `:back` to use referer back
# @param flashes [Hash] hash where key is the flash type and value is the message
def initialize(path = :back, flashes = {})
@path = path
@flashes = flashes
end

# @return [Boolean] are we going back via referer and not explicit path
def back?
@path == :back
end
end
end
end
end
end
end
5 changes: 3 additions & 2 deletions lib/karafka/web/ui/helpers/paths_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ def asset_path(local_path)
# if we want to go to the topic root
# @param offset [Integer, nil] offset of particular message or nil of we want to just go
# to the partition root
# @param action [String, nil] specific routed action or nil
# @return [String] path to the expected location
def explorer_path(topic_name = nil, partition_id = nil, offset = nil)
root_path(*['explorer', topic_name, partition_id, offset].compact)
def explorer_path(topic_name = nil, partition_id = nil, offset = nil, action = nil)
root_path(*['explorer', topic_name, partition_id, offset, action].compact)
end
end
end
Expand Down
12 changes: 12 additions & 0 deletions lib/karafka/web/ui/pro/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class App < Ui::Base
controller.recent(topic_id, partition_id)
end

r.get String, Integer, Integer, 'surrounding' do |topic_id, partition_id, offset|
controller.surrounding(topic_id, partition_id, offset)
end

r.get String, 'recent' do |topic_id|
controller.recent(topic_id, nil)
end
Expand Down Expand Up @@ -118,6 +122,14 @@ class App < Ui::Base
end
end

r.on 'messages' do
controller = Controllers::Messages.new(params)

r.post String, Integer, Integer, 'republish' do |topic_id, partition_id, offset|
controller.republish(topic_id, partition_id, offset)
end
end

r.get 'health' do
controller = Controllers::Health.new(params)
controller.index
Expand Down
34 changes: 34 additions & 0 deletions lib/karafka/web/ui/pro/controllers/explorer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,40 @@ def recent(topic_id, partition_id)
show(topic_id, recent.partition, recent.offset, paginate: false)
end

# Computes a page on which the given offset is in the middle of the page (if possible)
# Useful often when debugging to be able to quickly jump to the historical location
# of message and its surrounding to understand failure
#
# @param topic_id [String]
# @param partition_id [Integer]
# @param offset [Integer] offset of the message we want to display
def surrounding(topic_id, partition_id, offset)
watermark_offsets = Ui::Models::WatermarkOffsets.find(topic_id, partition_id)

raise ::Karafka::Web::Errors::Ui::NotFoundError if offset < watermark_offsets.low
raise ::Karafka::Web::Errors::Ui::NotFoundError if offset >= watermark_offsets.high

# Assume we start from this offset
shift = 0
elements = 0

# Position the offset as close to the middle of offset based page as possible
::Karafka::Web.config.ui.per_page.times do
break if elements >= ::Karafka::Web.config.ui.per_page

elements += 1 if offset + shift < watermark_offsets.high

if offset - shift > watermark_offsets.low
shift += 1
elements += 1
end
end

target = offset - shift

redirect("explorer/#{topic_id}/#{partition_id}?offset=#{target}")
end

private

# Fetches current page data
Expand Down
62 changes: 62 additions & 0 deletions lib/karafka/web/ui/pro/controllers/messages.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true

# This Karafka component is a Pro component under a commercial license.
# This Karafka component is NOT licensed under LGPL.
#
# All of the commercial components are present in the lib/karafka/pro directory of this
# repository and their usage requires commercial license agreement.
#
# Karafka has also commercial-friendly license, commercial support and commercial components.
#
# By sending a pull request to the pro components, you are agreeing to transfer the copyright of
# your code to Maciej Mensfeld.

module Karafka
module Web
module Ui
module Pro
module Controllers
# Controller for working with messages
# While part of messages operations is done via explorer (exploring), this controller
# handles other cases not related to viewing data
class Messages < Ui::Controllers::Base
# Takes a requested message content and republishes it again
#
# @param topic_id [String]
# @param partition_id [Integer]
# @param offset [Integer] offset of the message we want to republish
def republish(topic_id, partition_id, offset)
message = Ui::Models::Message.find(topic_id, partition_id, offset)

delivery = ::Karafka.producer.produce_sync(
topic: topic_id,
partition: partition_id,
payload: message.raw_payload,
headers: message.headers,
key: message.key
)

redirect(
:back,
success: reproduced(message, delivery)
)
end

private

# @param message [Karafka::Messages::Message]
# @param delivery [Rdkafka::Producer::DeliveryReport]
# @return [String] flash message about message reproducing
def reproduced(message, delivery)
<<~MSG
Message with offset #{message.offset}
has been sent again to #{message.topic}##{message.partition}
and received offset #{delivery.offset}.
MSG
end
end
end
end
end
end
end
19 changes: 19 additions & 0 deletions lib/karafka/web/ui/pro/views/explorer/show.erb
Original file line number Diff line number Diff line change
@@ -1,4 +1,23 @@
<%
republish_path = root_path('messages', @message.topic, @message.partition, @message.offset, 'republish')
surrounding_path = explorer_path(@message.topic, @message.partition, @message.offset, 'surrounding')
%>

<div class="container">
<div class="row mb-0">
<div class="col-sm-12 text-end">
<a href="<%= surrounding_path %>" class="btn btn-secondary btn-sm float-end ms-1">
&#8651;
Surrounding
</a>

<form action="<%= republish_path %>" method="post" class="confirm-action float-end">
<%== csrf_tag(republish_path) %>
<input type="submit" value="&#10227; Republish" class="btn btn-primary btn-sm"/>
</form>
</div>
</div>

<div class="row mb-4">
<div class="col-sm-12">
<h5 class="mb-2">
Expand Down
22 changes: 22 additions & 0 deletions lib/karafka/web/ui/public/javascripts/application.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,27 @@ function redirectToPartition() {
});
}

// Binds to links and forms to make sure action is what user wants
function bindActionsConfirmations() {
var elements = document.getElementsByClassName('confirm-action')
var confirmation = 'Are you sure?'

for (var i = 0; i < elements.length; i++) {
let element = elements[i]
let action = 'click'

if (element.nodeName === 'FORM') {
action = 'submit'
}

element.addEventListener(action, function(event) {
if (!window.confirm(confirmation)) {
event.preventDefault();
}
})
}
}

function addListeners() {
bindPollingButtonClick();
setLivePollButton();
Expand All @@ -37,6 +58,7 @@ function addListeners() {
redirectToPartition();
manageTabs();
manageCharts();
bindActionsConfirmations();
displayUi();
}

Expand Down
2 changes: 2 additions & 0 deletions lib/karafka/web/ui/views/layout.erb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
<main>
<%== partial 'shared/become_pro' %>

<%== partial 'shared/flashes' %>

<%== partial 'shared/content', locals: { content: yield } %>
</main>

Expand Down
9 changes: 9 additions & 0 deletions lib/karafka/web/ui/views/shared/_flashes.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<% flash.each do |name, message| %>
<div class="mb-4">
<div class="container">
<p class="alert alert-<%= name %>">
<%= message %>
</p>
</div>
</div>
<% end %>
Loading

0 comments on commit 95b7b01

Please sign in to comment.