From 67c819e980db4b923973768f283440f77c0d90a3 Mon Sep 17 00:00:00 2001 From: Muhammad Nawzad Date: Sun, 21 Jan 2024 15:18:05 +0300 Subject: [PATCH] Add support for Mongoid --- lib/inboxable/configuration.rb | 8 +++++--- lib/inboxable/polling_receiver_worker.rb | 19 +++++++++++++++++-- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/lib/inboxable/configuration.rb b/lib/inboxable/configuration.rb index 2cba715..14c130a 100644 --- a/lib/inboxable/configuration.rb +++ b/lib/inboxable/configuration.rb @@ -1,15 +1,17 @@ module Inboxable class Configuration - ALLOWED_ORMS = %i[activerecord].freeze + ALLOWED_ORMS = %i[activerecord mongoid].freeze attr_accessor :orm def initialize + raise Error, 'Sidekiq is not available. Unfortunately, sidekiq must be available for Inboxable to work' unless Object.const_defined?('Sidekiq') + raise Error, 'Inboxable Gem uses the sidekiq-cron Gem. Make sure you add it to your project' unless Object.const_defined?('Sidekiq::Cron') raise Error, 'Inboxable Gem only supports Rails but you application does not seem to be a Rails app' unless Object.const_defined?('Rails') raise Error, 'Inboxable Gem only support Rails version 7 and newer' if Rails::VERSION::MAJOR < 7 - Sidekiq::Options[:cron_poll_interval] = 5 - Sidekiq::Cron::Job.create(name: 'InboxablePollingReceiver', cron: '*/5 * * * * *', class: 'Inboxable::PollingReceiverWorker') + Sidekiq::Options[:cron_poll_interval] = ENV.fetch('INBOXABLE__CRON_POLL_INTERVAL', 5).to_i + Sidekiq::Cron::Job.create(name: 'InboxablePollingReceiver', cron: ENV.fetch('INBOXABLE__CRON', '*/5 * * * * *'), class: 'Inboxable::PollingReceiverWorker') end def orm=(orm) diff --git a/lib/inboxable/polling_receiver_worker.rb b/lib/inboxable/polling_receiver_worker.rb index 6bc9a0e..a37a441 100644 --- a/lib/inboxable/polling_receiver_worker.rb +++ b/lib/inboxable/polling_receiver_worker.rb @@ -3,16 +3,31 @@ class PollingReceiverWorker include Sidekiq::Job def perform - perform_activerecord + Inboxable.configuration.orm == :activerecord ? perform_activerecord : perform_mongoid end def perform_activerecord - Inbox.pending.where(last_attempted_at: [..Time.zone.now, nil]).find_in_batches(batch_size: ENV.fetch('INBOXABLE__BATCH_SIZE', 100).to_i).each do |batch| + Inbox.pending + .where(last_attempted_at: [..Time.zone.now, nil]) + .find_in_batches(batch_size: ENV.fetch('INBOXABLE__BATCH_SIZE', 100).to_i) + .each do |batch| batch.each do |inbox| inbox.processor_class_name.constantize.perform_async(inbox.id) inbox.update(last_attempted_at: 1.minute.from_now, status: :processing, allow_publish: false) end end end + + def perform_mongoid + batch_size = ENV.fetch('INBOXABLE__BATCH_SIZE', 100).to_i + Inbox.pending + .any_of({ last_attempted_at: ..Time.zone.now }, { last_attempted_at: nil }) + .each_slice(batch_size) do |batch| + batch.each do |inbox| + inbox.processor_class_name.constantize.perform_async(inbox.idempotency_key) + inbox.update(last_attempted_at: 1.minute.from_now, status: :processing, allow_publish: false) + end + end + end end end