Skip to content

Commit

Permalink
Add support for Mongoid
Browse files Browse the repository at this point in the history
  • Loading branch information
muhammadnawzad committed Jan 21, 2024
1 parent a7d02b8 commit 67c819e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
8 changes: 5 additions & 3 deletions lib/inboxable/configuration.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
module Inboxable
class Configuration
ALLOWED_ORMS = %i[activerecord].freeze
ALLOWED_ORMS = %i[activerecord mongoid].freeze

attr_accessor :orm

def initialize
raise Error, 'Sidekiq is not available. Unfortunately, sidekiq must be available for Inboxable to work' unless Object.const_defined?('Sidekiq')
raise Error, 'Inboxable Gem uses the sidekiq-cron Gem. Make sure you add it to your project' unless Object.const_defined?('Sidekiq::Cron')
raise Error, 'Inboxable Gem only supports Rails but you application does not seem to be a Rails app' unless Object.const_defined?('Rails')
raise Error, 'Inboxable Gem only support Rails version 7 and newer' if Rails::VERSION::MAJOR < 7

Sidekiq::Options[:cron_poll_interval] = 5
Sidekiq::Cron::Job.create(name: 'InboxablePollingReceiver', cron: '*/5 * * * * *', class: 'Inboxable::PollingReceiverWorker')
Sidekiq::Options[:cron_poll_interval] = ENV.fetch('INBOXABLE__CRON_POLL_INTERVAL', 5).to_i
Sidekiq::Cron::Job.create(name: 'InboxablePollingReceiver', cron: ENV.fetch('INBOXABLE__CRON', '*/5 * * * * *'), class: 'Inboxable::PollingReceiverWorker')
end

def orm=(orm)
Expand Down
19 changes: 17 additions & 2 deletions lib/inboxable/polling_receiver_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,31 @@ class PollingReceiverWorker
include Sidekiq::Job

def perform
perform_activerecord
Inboxable.configuration.orm == :activerecord ? perform_activerecord : perform_mongoid
end

def perform_activerecord
Inbox.pending.where(last_attempted_at: [..Time.zone.now, nil]).find_in_batches(batch_size: ENV.fetch('INBOXABLE__BATCH_SIZE', 100).to_i).each do |batch|
Inbox.pending
.where(last_attempted_at: [..Time.zone.now, nil])
.find_in_batches(batch_size: ENV.fetch('INBOXABLE__BATCH_SIZE', 100).to_i)
.each do |batch|
batch.each do |inbox|
inbox.processor_class_name.constantize.perform_async(inbox.id)
inbox.update(last_attempted_at: 1.minute.from_now, status: :processing, allow_publish: false)
end
end
end

def perform_mongoid
batch_size = ENV.fetch('INBOXABLE__BATCH_SIZE', 100).to_i
Inbox.pending
.any_of({ last_attempted_at: ..Time.zone.now }, { last_attempted_at: nil })
.each_slice(batch_size) do |batch|
batch.each do |inbox|
inbox.processor_class_name.constantize.perform_async(inbox.idempotency_key)
inbox.update(last_attempted_at: 1.minute.from_now, status: :processing, allow_publish: false)
end
end
end
end
end

0 comments on commit 67c819e

Please sign in to comment.