Skip to content

Commit

Permalink
Merge pull request #461 from slovensko-digital/GO-75/jobs_priority
Browse files Browse the repository at this point in the history
GO-75 Set jobs priority
  • Loading branch information
luciajanikova authored Nov 16, 2024
2 parents 1e746ee + 3857463 commit 9172720
Show file tree
Hide file tree
Showing 29 changed files with 183 additions and 51 deletions.
43 changes: 43 additions & 0 deletions app/jobs/application_job.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,48 @@
class ApplicationJob < ActiveJob::Base
include Rollbar::ActiveJob

queue_as :default

attr_accessor :job_context

def set(options = {})
# :nodoc:
self.job_context = options[:job_context]
super
end

def serialize
super.merge(
job_context: job_context
)
end

def deserialize(job_data)
super
self.job_context = job_data["job_context"]
end

before_enqueue do |job|
job.job_context = Thread.current[:job_context] if Thread.current[:job_context].present?
job.queue_name = job.job_context if job.job_context.present?
end

around_perform do |job, block|
Thread.current[:job_context] = job.job_context if job.job_context.present?
block.call
ensure
Thread.current[:job_context] = nil
end

queue_with_priority do
case queue_name.to_sym
when :asap then -1000
when :default, :automation then 0
when :later then 1000
else
raise "Unable to assign default priority to a job on #{queue_name} queue"
end
end

retry_on StandardError, wait: :polynomially_longer, attempts: Float::INFINITY
end
2 changes: 0 additions & 2 deletions app/jobs/automation/message_created_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Automation
class MessageCreatedJob < ApplicationJob
queue_as :default

def perform(message)
Automation.run_rules_for(message, :message_created)
end
Expand Down
2 changes: 0 additions & 2 deletions app/jobs/automation/message_thread_created_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Automation
class MessageThreadCreatedJob < ApplicationJob
queue_as :default

def perform(message_thread)
Automation.run_rules_for(message_thread, :message_thread_created)
end
Expand Down
2 changes: 0 additions & 2 deletions app/jobs/govbox/check_messages_mapping_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Govbox
class CheckMessagesMappingJob < ApplicationJob
queue_as :default

def perform
unmapped_govbox_message_ids = ::Govbox::Message.joins(:folder).where.not(
::Message.select(1).joins(:thread)
Expand Down
2 changes: 0 additions & 2 deletions app/jobs/govbox/destroy_box_data_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Govbox
class DestroyBoxDataJob < ApplicationJob
queue_as :default

def perform(box_id)
Govbox::Folder.where(box_id: box_id).find_each { |govbox_folder| govbox_folder.messages.in_batches(of: 50).destroy_all }
Govbox::Folder.where(box_id: box_id).destroy_all
Expand Down
2 changes: 0 additions & 2 deletions app/jobs/govbox/download_message_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Govbox
class DownloadMessageJob < ApplicationJob
queue_as :default

def perform(govbox_folder, edesk_message_id, upvs_client: UpvsEnvironment.upvs_client)
edesk_api = upvs_client.api(govbox_folder.box).edesk
response_status, raw_message = edesk_api.fetch_message(edesk_message_id)
Expand Down
2 changes: 0 additions & 2 deletions app/jobs/govbox/process_message_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

module Govbox
class ProcessMessageJob < ApplicationJob
queue_as :default

retry_on ::ApplicationRecord::FailedToAcquireLockError, wait: :polynomially_longer, attempts: Float::INFINITY

def perform(govbox_message)
Expand Down
2 changes: 0 additions & 2 deletions app/jobs/govbox/sync_all_boxes_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Govbox
class SyncAllBoxesJob < ApplicationJob
queue_as :default

def perform
Upvs::Box.where(syncable: true).find_each do |box|
SyncBoxJob.perform_later(box)
Expand Down
3 changes: 1 addition & 2 deletions app/jobs/govbox/sync_box_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Govbox
class SyncBoxJob < ApplicationJob
queue_as :default

def perform(box, upvs_client: UpvsEnvironment.upvs_client)
raise unless box.is_a?(Upvs::Box)
return unless box.syncable?
Expand All @@ -14,6 +12,7 @@ def perform(box, upvs_client: UpvsEnvironment.upvs_client)
raw_folders = raw_folders.index_by {|f| f["id"]}
raw_folders.each_value do |folder_hash|
folder = find_or_create_folder_with_parent(folder_hash, raw_folders, box)

SyncFolderJob.perform_later(folder) unless folder.bin? || folder.drafts?
end
end
Expand Down
2 changes: 0 additions & 2 deletions app/jobs/govbox/sync_folder_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Govbox
class SyncFolderJob < ApplicationJob
queue_as :default

def perform(folder, upvs_client: UpvsEnvironment.upvs_client, batch_size: 1000)
edesk_api = upvs_client.api(folder.box).edesk
new_messages_ids = []
Expand Down
2 changes: 0 additions & 2 deletions app/jobs/notify_filter_subscription_job.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
class NotifyFilterSubscriptionJob < ApplicationJob
queue_as :default

include GoodJob::ActiveJobExtensions::Concurrency

good_job_control_concurrency_with(
Expand Down
2 changes: 0 additions & 2 deletions app/jobs/reindex_and_notify_filter_subscriptions_job.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
class ReindexAndNotifyFilterSubscriptionsJob < ApplicationJob
queue_as :default

include GoodJob::ActiveJobExtensions::Concurrency

good_job_control_concurrency_with(
Expand Down
2 changes: 0 additions & 2 deletions app/jobs/searchable/reindex_message_thread_job.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
class Searchable::ReindexMessageThreadJob < ApplicationJob
queue_as :default

include GoodJob::ActiveJobExtensions::Concurrency

good_job_control_concurrency_with(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
class Searchable::ReindexMessageThreadsWithTagIdJob < ApplicationJob
queue_as :default

def perform(tag_id)
::Searchable::MessageThread.reindex_with_tag_id(tag_id)
end
Expand Down
2 changes: 0 additions & 2 deletions app/jobs/upvs/create_upvs_forms_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Upvs
class CreateUpvsFormsJob < ApplicationJob
queue_as :default

def perform
MessageObject.find_each do |message_object|
message_object.find_or_create_form
Expand Down
2 changes: 0 additions & 2 deletions app/jobs/upvs/download_form_related_documents_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Upvs
class DownloadFormRelatedDocumentsJob < ApplicationJob
queue_as :default

def perform(upvs_form, downloader: ::Upvs::FormRelatedDocumentsDownloader)
upvs_form_downloader = downloader.new(upvs_form)

Expand Down
2 changes: 0 additions & 2 deletions app/jobs/upvs/fetch_form_related_documents_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Upvs
class FetchFormRelatedDocumentsJob < ApplicationJob
queue_as :default

def perform(download_job: DownloadFormRelatedDocumentsJob)
Upvs::Form.find_each do |upvs_form|
download_job.perform_later(upvs_form)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def self.run(message)
message.save!

Govbox::Message.remove_delivery_notification_tag(message)
Govbox::AuthorizeDeliveryNotificationJob.perform_later(message)
Govbox::AuthorizeDeliveryNotificationJob.set(job_context: :asap).perform_later(message)

EventBus.publish(:message_delivery_authorized, message)
end
Expand Down
2 changes: 1 addition & 1 deletion app/models/govbox/submit_message_draft_action.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ def self.run(message, jobs_batch: nil)
if jobs_batch
jobs_batch.add { Govbox::SubmitMessageDraftJob.perform_later(message, bulk_submit: true) }
else
Govbox::SubmitMessageDraftJob.perform_later(message)
Govbox::SubmitMessageDraftJob.set(job_context: :asap).perform_later(message)
end

message.being_submitted!
Expand Down
2 changes: 1 addition & 1 deletion app/models/upvs/box.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def self.create_with_api_connection!(params)
end

def sync
Govbox::SyncBoxJob.perform_later(self)
Govbox::SyncBoxJob.set(job_context: :asap).perform_later(self)
end

def single_recipient?
Expand Down
15 changes: 10 additions & 5 deletions config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ class Application < Rails::Application
config.active_record.encryption.key_derivation_salt = ENV['ACTIVE_RECORD_ENCRYPTION_KEY_DERIVATION_SALT']

config.active_job.queue_adapter = :good_job
config.active_job.default_queue_name = :medium_priority
config.action_mailer.deliver_later_queue_name = :high_priority
config.active_job.default_queue_name = :default
config.action_mailer.deliver_later_queue_name = :asap

config.good_job.enable_cron = true
config.good_job.smaller_number_is_higher_priority = true

if ENV['AUTO_SYNC_BOXES'] == "ON"
config.good_job.cron = {
sync_boxes: {
Expand All @@ -55,7 +57,8 @@ class Application < Rails::Application
config.good_job.cron['check_messages_mapping'] = {
cron: "30 7 * * *", # run every day at 7:30 am
class: "Govbox::CheckMessagesMappingJob",
description: "Regular job to check messages mapping"
description: "Regular job to check messages mapping",
set: { job_context: :later }
}

config.good_job.cron['check_archived_documents'] = {
Expand All @@ -68,15 +71,17 @@ class Application < Rails::Application
config.good_job.cron['fetch_fs_forms'] = {
cron: "0 */12 * * *", # run every 12 hours
class: "Fs::FetchFormsJob",
description: "Regular job to fetch Fs::Forms"
description: "Regular job to fetch Fs::Forms",
set: { job_context: :later }
}
end

if ENV['AUTO_SYNC_UPVS_FORMS'] == "ON"
config.good_job.cron['fetch_upvs_forms_related_documents'] = {
cron: "0 */12 * * *", # run every 12 hours
class: "Upvs::FetchFormRelatedDocumentsJob",
description: "Regular job to fetch Upvs::FormRelatedDocuments"
description: "Regular job to fetch Upvs::FormRelatedDocuments",
set: { job_context: :later }
}
end

Expand Down
6 changes: 3 additions & 3 deletions db/schema.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions test/fixtures/message_object_data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ two:
message_object: ssd_main_general_two_form
blob: MyText

main_draft:
message_object: ssd_main_draft_form
blob: <GeneralAgenda xmlns="http://schemas.gov.sk/form/App.GeneralAgenda/1.9" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><subject>predmet</subject><text>text</text></GeneralAgenda>

draft_two:
message_object: ssd_main_general_draft_two_form
blob: <GeneralAgenda xmlns="http://schemas.gov.sk/form/App.GeneralAgenda/1.9" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><subject>predmet</subject><text>text</text></GeneralAgenda>
Expand All @@ -24,6 +20,10 @@ empty_draft:
message_object: ssd_main_empty_draft_form
blob: <GeneralAgenda xmlns="http://schemas.gov.sk/form/App.GeneralAgenda/1.9" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><subject></subject><text></text></GeneralAgenda>

ssd_main_draft_form_data:
message_object: ssd_main_draft_form
blob: <GeneralAgenda xmlns="http://schemas.gov.sk/form/App.GeneralAgenda/1.9" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><subject>predmet</subject><text>text</text></GeneralAgenda>

fs_one:
message_object: ssd_main_fs_one_form
blob: <n0:ElectronicOfficialDocument xmlns:n0="http://schemas.gov.sk/form/42499500.FS_EUD_v1_0.sk/1.6"> <n0:Sender> <n0:PersonData> <n0:CorporateBody> <n0:FinancialAdministrationAuthority>Daňový úrad Bratislava</n0:FinancialAdministrationAuthority> </n0:CorporateBody> <n0:PhysicalAddress> <n0:AddressLine>Ševčenkova 32, 850 00 Bratislava</n0:AddressLine> </n0:PhysicalAddress> </n0:PersonData> </n0:Sender> <n0:Document> <n0:DocumentReference>000000000/2022</n0:DocumentReference> <n0:ContactPerson>Mgr. X Y</n0:ContactPerson> <n0:PhoneNumber>02/00000000</n0:PhoneNumber> <n0:EmailAddress>[email protected]</n0:EmailAddress> <n0:IssuePlace>Bratislava</n0:IssuePlace> <n0:IssueDate>2022-04-19</n0:IssueDate> <n0:Subject>Rozhodnutie</n0:Subject> </n0:Document></n0:ElectronicOfficialDocument>
6 changes: 6 additions & 0 deletions test/fixtures/message_objects.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,9 @@ ssd_main_empty_draft_form:
name: MyString
mimetype: MyString
object_type: FORM

ssd_main_draft_form:
message: ssd_main_draft
name: MyString
mimetype: application/x-eform-xml
object_type: FORM
6 changes: 5 additions & 1 deletion test/fixtures/messages.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ ssd_main_draft:
metadata:
status: created
correlation_id: <%= SecureRandom.uuid %>
recipient_uri: ico://sk/12345678
posp_id: App.GeneralAgenda
posp_version: 1.9
message_type: App.GeneralAgenda
author: basic

ssd_main_issue_one:
Expand Down Expand Up @@ -311,7 +315,7 @@ solver_main_delivery_notification_two:
delivered_at: <%= DateTime.current %>
metadata:
delivery_notification:
delivery_period_end_at: 2023-07-04T21:59:59.000Z,
delivery_period_end_at: 2030-07-04T21:59:59.000Z,
delivery_period: 15
consignment:
message_id: x7800b40-44b1-4012-ae78-774de6457cc2
Expand Down
Loading

0 comments on commit 9172720

Please sign in to comment.