From 93c97904f2d6449341f8216e372a3f0e88c7de6d Mon Sep 17 00:00:00 2001
From: Aaron Elkiss
Date: Wed, 29 May 2024 10:58:28 -0400
Subject: [PATCH 1/2] DEV-1125: Run cleanup duplicate holdings under sidekiq

* Queues batches of clusters to clean up as jobs
* Move logic to lib; add phctl command
* Don't re-save the cluster's holdings if there are no changes (optimization)
---
 bin/cleanup_duplicate_holdings.rb       | 73 -----------------------
 lib/cleanup_duplicate_holdings.rb       | 78 +++++++++++++++++++++++++
 lib/phctl.rb                            |  5 ++
 lib/sidekiq_jobs.rb                     |  1 +
 spec/cleanup_duplicate_holdings_spec.rb | 65 ++++++++++++++++-----
 5 files changed, 136 insertions(+), 86 deletions(-)
 delete mode 100644 bin/cleanup_duplicate_holdings.rb
 create mode 100644 lib/cleanup_duplicate_holdings.rb

diff --git a/bin/cleanup_duplicate_holdings.rb b/bin/cleanup_duplicate_holdings.rb
deleted file mode 100644
index 03e4436d..00000000
--- a/bin/cleanup_duplicate_holdings.rb
+++ /dev/null
@@ -1,73 +0,0 @@
-# frozen_string_literal: true
-
-require 'services'
-require 'cluster'
-
-Services.mongo!
-
-# Iterates through clusters, removing any duplicate holdings and logging its progress.
-#
-# Usage: bundle exec ruby bin/cleanup_duplicate_holdings.
-
-class CleanupDuplicateHoldings
-  LOG_INTERVAL = 60
-
-  def initialize
-    @clusters_processed = 0
-    @old_holdings_processed = 0
-    @new_holdings_processed = 0
-    @last_log_time = Time.now
-    Services.logger.info("Starting cluster deduplication")
-  end
-
-  def run
-    Cluster.each do |cluster|
-      Services.logger.debug("Cleaning cluster #{cluster._id}: #{cluster.ocns}")
-      old_count = cluster.holdings.count
-      new_count = remove_duplicate_holdings(cluster)
-      update_progress(old_count, new_count)
-    end
-
-    Services.logger.info("Finished cleaning clusters")
-    log_progress
-  end
-
-  private
-
-  def update_progress(old_count, new_count)
-    @clusters_processed += 1
-    @old_holdings_processed += old_count
-    @new_holdings_processed += new_count
-
-    log_progress if hasnt_logged_recently?
-  end
-
-  def log_progress
-    Services.logger.info("Processed #{@clusters_processed} clusters")
-    Services.logger.info("Processed #{@old_holdings_processed} old holdings")
-    Services.logger.info("Kept #{@new_holdings_processed} holdings")
-    @last_log_time = Time.now
-  end
-
-  def hasnt_logged_recently?
-    !@last_log_time or (Time.now - @last_log_time > LOG_INTERVAL)
-  end
-
-  # Returns the count of deduped holdings
-  def remove_duplicate_holdings(cluster)
-    cluster.holdings = dedupe_holdings(cluster)
-    cluster.save
-    cluster.holdings.count
-  end
-
-  def dedupe_holdings(cluster)
-    cluster.holdings.group_by(&:update_key).map do |update_key,holdings_group|
-      latest_date = holdings_group.map(&:date_received).max
-      holdings_group.reject { |h| h.date_received != latest_date }
-    end.flatten
-  end
-end
-
-if __FILE__ == $PROGRAM_NAME
-  CleanupDuplicateHoldings.new.run
-end
diff --git a/lib/cleanup_duplicate_holdings.rb b/lib/cleanup_duplicate_holdings.rb
new file mode 100644
index 00000000..8fb616c5
--- /dev/null
+++ b/lib/cleanup_duplicate_holdings.rb
@@ -0,0 +1,78 @@
+# frozen_string_literal: true
+
+require "services"
+require "cluster"
+require "sidekiq"
+require "milemarker"
+
+class CleanupDuplicateHoldings
+  include Sidekiq::Job
+  JOB_CLUSTER_COUNT = 100
+
+  def self.queue_jobs(job_cluster_count: JOB_CLUSTER_COUNT)
+    milemarker = Milemarker.new(name: "Queue clusters", batch_size: 50_000)
+    milemarker.logger = Services.logger
+
+    # Iterate over batches of clusters of size job_cluster_count, and queue
+    # a job for each batch.
+    #
+    # Calling "each" without a block returns an Enumerator, so "each_slice"
+    # can consume the query lazily in batches rather than fetching all
+    # the results up front.
+    Cluster.only(:id).each.each_slice(job_cluster_count) do |batch|
+      cluster_ids = batch.map(&:_id).map(&:to_s)
+      # Queues a job of this class
+      perform_async(cluster_ids)
+
+      milemarker.incr(job_cluster_count).on_batch do
+        Services.logger.info milemarker.batch_line
+      end
+    end
+
+    milemarker.log_final_line
+  end
+
+  def initialize
+    @clusters_processed = 0
+    @old_holdings_processed = 0
+    @new_holdings_processed = 0
+    @last_log_time = Time.now
+  end
+
+  def perform(cluster_ids)
+    cluster_ids.each do |cluster_id|
+      cluster = Cluster.find(_id: cluster_id)
+      Services.logger.info("Cleaning cluster #{cluster._id}: #{cluster.ocns}")
+      old_count = cluster.holdings.count
+      remove_duplicate_holdings(cluster)
+      new_count = cluster.holdings.count
+      update_progress(old_count, new_count)
+      Thread.pass
+    end
+
+    Services.logger.info("Processed #{@clusters_processed} clusters, #{@old_holdings_processed} old holdings, kept #{@new_holdings_processed} holdings")
+  end
+
+  private
+
+  def update_progress(old_count, new_count)
+    @clusters_processed += 1
+    @old_holdings_processed += old_count
+    @new_holdings_processed += new_count
+  end
+
+  # Removes duplicate holdings, keeping only the most recently
+  # received copy in each update_key group
+  def remove_duplicate_holdings(cluster)
+    deduped_holdings = cluster.holdings.group_by(&:update_key).flat_map do |_update_key, holdings_group|
+      latest_date = holdings_group.map(&:date_received).max
+      holdings_group.reject { |h| h.date_received != latest_date }
+    end
+
+    # Only re-save the cluster if the dedupe actually removed something
+    if deduped_holdings.count < cluster.holdings.count
+      cluster.holdings = deduped_holdings
+      cluster.save
+    end
+  end
+end
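
Note: the dedupe rule above boils down to "holdings sharing an update_key are
duplicates, and within each duplicate group only the copies with the newest
date_received survive." A standalone sketch of the same logic in plain Ruby
(no Mongoid; Holding is a bare Struct, and the update_key strings are made up
for illustration):

    require "date"

    Holding = Struct.new(:update_key, :date_received)

    holdings = [
      Holding.new("umich:mono", Date.new(2023, 1, 1)),  # older duplicate: dropped
      Holding.new("umich:mono", Date.new(2024, 5, 1)),  # newest for its key: kept
      Holding.new("upenn:mono", Date.new(2022, 3, 1))   # only copy for its key: kept
    ]

    deduped = holdings.group_by(&:update_key).flat_map do |_key, group|
      latest = group.map(&:date_received).max
      group.reject { |h| h.date_received != latest }
    end

    p deduped.map(&:update_key) # => ["umich:mono", "upenn:mono"]
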
diff --git a/lib/phctl.rb b/lib/phctl.rb
index 84168e25..4e0152f7 100644
--- a/lib/phctl.rb
+++ b/lib/phctl.rb
@@ -57,6 +57,11 @@ class Cleanup < JobCommand
   def holdings(inst, date)
     run_job(Jobs::Cleanup::Holdings, inst, date)
   end
+
+  desc "duplicate_holdings", "Cleans duplicate holdings from all clusters"
+  def duplicate_holdings
+    CleanupDuplicateHoldings.queue_jobs
+  end
 end

 class Concordance < JobCommand
diff --git a/lib/sidekiq_jobs.rb b/lib/sidekiq_jobs.rb
index 29d84812..ce0d2d0a 100644
--- a/lib/sidekiq_jobs.rb
+++ b/lib/sidekiq_jobs.rb
@@ -1,5 +1,6 @@
 require "services"
 require "sidekiq"
+require "cleanup_duplicate_holdings"
 require "concordance_processing"
 require "loader/cluster_loader"
 require "loader/file_loader"
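
Note: with the phctl command wired up, the cleanup can also be kicked off from
a Ruby console. A sketch, assuming lib/ is on the load path and using the same
Services.mongo! setup the deleted bin script used; jobs queued this way are
drained by any worker that loads lib/sidekiq_jobs.rb, such as the
docker-compose service below (bundle exec sidekiq -c 1 -r ./lib/sidekiq_jobs.rb):

    require "services"
    require "cleanup_duplicate_holdings"

    Services.mongo!  # connect to MongoDB, as the old bin script did

    # Enqueue one CleanupDuplicateHoldings job per batch of 100 cluster ids
    CleanupDuplicateHoldings.queue_jobs

    # Or use smaller batches: more jobs, finer-grained retries on failure
    CleanupDuplicateHoldings.queue_jobs(job_cluster_count: 10)
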
diff --git a/spec/cleanup_duplicate_holdings_spec.rb b/spec/cleanup_duplicate_holdings_spec.rb
index 9c816962..827d212d 100644
--- a/spec/cleanup_duplicate_holdings_spec.rb
+++ b/spec/cleanup_duplicate_holdings_spec.rb
@@ -2,9 +2,9 @@

 require "spec_helper"

-require_relative "../bin/cleanup_duplicate_holdings"
+require "cleanup_duplicate_holdings"

-RSpec.describe CleanupDuplicateHoldings do
+RSpec.describe CleanupDuplicateHoldings, type: :sidekiq_fake do
   def set_blank_fields(holding, value)
     [:n_enum=, :n_chron=, :condition=, :issn=].each do |setter|
       holding.public_send(setter, value)
@@ -24,14 +24,39 @@ def nil_fields_dupe_holding(h)
     end
   end

+  def cluster_ids
+    Cluster.all.map(&:id).map(&:to_s)
+  end
+
   before(:each) { Cluster.each(&:delete) }

-  describe "run" do
+  describe "self.queue_jobs" do
+    it "queues a job of the expected size for each batch of clusters" do
+      10.times { create(:cluster) }
+
+      expect { described_class.queue_jobs(job_cluster_count: 5) }
+        .to change(described_class.jobs, :size).by(2)
+
+      expect(described_class.jobs[0]["args"][0].length).to eq(5)
+      expect(described_class.jobs[1]["args"][0].length).to eq(5)
+    end
+
+    it "queues jobs for all clusters" do
+      10.times { create(:cluster) }
+
+      described_class.queue_jobs(job_cluster_count: 5)
+
+      queued_ids = described_class.jobs.map { |job| job["args"][0] }.flatten
+      expect(queued_ids).to eq(cluster_ids)
+    end
+  end
+
+  describe "perform" do
     it "cleans up duplicate holdings" do
       holding = blank_fields_holding
       create(:cluster, holdings: [holding, nil_fields_dupe_holding(holding)])

-      described_class.new.run
+      described_class.new.perform(cluster_ids)

       expect(Cluster.first.holdings.count).to eq(1)
     end
@@ -45,7 +70,7 @@ def nil_fields_dupe_holding(h)
         another_holding
       ])

-      described_class.new.run
+      described_class.new.perform(cluster_ids)

       cluster_holdings = Cluster.first.holdings
       expect(cluster_holdings.length).to eq(2)
@@ -62,7 +87,7 @@ def nil_fields_dupe_holding(h)
         nil_fields_dupe_holding(upenn_holding)
       ])

-      described_class.new.run
+      described_class.new.perform(cluster_ids)

       expect(Cluster.first.holdings.count).to eq(2)
       expect(Cluster.first.holdings.map(&:organization).uniq).to contain_exactly("umich", "upenn")
@@ -76,7 +101,7 @@ def nil_fields_dupe_holding(h)
         nil_fields_dupe_holding(holding)
       ])

-      described_class.new.run
+      described_class.new.perform(cluster_ids)

       expect(Cluster.first.holdings.count).to eq(1)
     end
@@ -94,7 +119,7 @@ def nil_fields_dupe_holding(h)
         nil_fields_dupe_holding(holding2)
       ])

-      described_class.new.run
+      described_class.new.perform(cluster_ids)

       expect(Cluster.count).to eq(2)
       Cluster.each do |c|
@@ -108,24 +133,24 @@ def nil_fields_dupe_holding(h)
     holding = blank_fields_holding
     create(:cluster, holdings: [holding, nil_fields_dupe_holding(holding)])

-    described_class.new.run
+    described_class.new.perform(cluster_ids)

     expect(Cluster.first.holdings[0].date_received).to eq(Date.today)
   end

-  it "logs what it's working on at DEBUG level" do
+  it "logs what it's working on" do
     Services.register(:logger) { Logger.new($stdout, level: Logger::DEBUG) }

     create(:cluster)

-    expect { described_class.new.run }.to output(/#{Cluster.first.ocns.first}/).to_stdout
+    expect { described_class.new.perform(cluster_ids) }.to output(/#{Cluster.first.ocns.first}/).to_stdout
   end

   it "logs how many clusters it's worked on" do
     Services.register(:logger) { Logger.new($stdout, level: Logger::INFO) }

     create(:cluster)

-    expect { described_class.new.run }.to output(/Processed 1 cluster/).to_stdout
+    expect { described_class.new.perform(cluster_ids) }.to output(/Processed.* 1 cluster/).to_stdout
   end

   it "logs how many holdings it's worked on" do
@@ -134,7 +159,21 @@ def nil_fields_dupe_holding(h)
     Services.register(:logger) { Logger.new($stdout, level: Logger::INFO) }

     holding = blank_fields_holding
     create(:cluster, holdings: [holding, nil_fields_dupe_holding(holding)])

-    expect { described_class.new.run }.to output(/Processed 2 old holdings.*Kept 1 holding/m).to_stdout
+    expect { described_class.new.perform(cluster_ids) }.to output(/Processed.* 2 old holdings.* kept 1 holding/).to_stdout
+  end
+
+  it "doesn't save the cluster when there are no duplicate holdings" do
+    create(:cluster,
+      holdings: [
+        build(:holding),
+        build(:holding)
+      ])
+
+    orig_time = Cluster.first.last_modified
+
+    described_class.new.perform(cluster_ids)
+
+    expect(Cluster.first.last_modified).to eq(orig_time)
   end
 end
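
Note on the spec setup: the type: :sidekiq_fake metadata is not defined in
this patch; presumably a hook in spec_helper.rb uses it to put Sidekiq's
testing harness into fake mode, which is what makes described_class.jobs
inspectable above. The underlying Sidekiq testing API looks roughly like this
(the cluster id string here is made up):

    require "sidekiq/testing"

    # In fake mode, perform_async pushes a job hash onto an in-memory
    # array instead of enqueueing to Redis.
    Sidekiq::Testing.fake!

    CleanupDuplicateHoldings.perform_async(["6657a9aa2c18d8303f78e2a8"])

    CleanupDuplicateHoldings.jobs.size          # => 1
    CleanupDuplicateHoldings.jobs.first["args"] # => [["6657a9aa2c18d8303f78e2a8"]]

    # drain pops and runs every queued job, invoking #perform
    # (here that would need a live Mongo connection)
    CleanupDuplicateHoldings.drain
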
From 30789667d08d05bf010305c4ed1e1a2b5a0b3add Mon Sep 17 00:00:00 2001
From: Aaron Elkiss
Date: Wed, 29 May 2024 10:57:45 -0400
Subject: [PATCH 2/2] Fix sidekiq web

* add rack-session gem (needed by rack 3)
* update standalone code from https://github.com/sidekiq/sidekiq/wiki/Monitoring#standalone
* fix sidekiq_web service in docker-compose.yml
---
 Gemfile            |  1 +
 Gemfile.lock       |  3 +++
 bin/sidekiq_web.ru | 26 +++++++++++---------------
 docker-compose.yml |  2 ++
 4 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/Gemfile b/Gemfile
index 5c522d18..e1f9f3ec 100644
--- a/Gemfile
+++ b/Gemfile
@@ -20,6 +20,7 @@ gem "sequel"
 gem "thor"
 gem "zinzout"
 gem "puma"
+gem "rack-session"
 gem "sidekiq"
 gem "sidekiq-batch", git: "https://github.com/breamware/sidekiq-batch"
diff --git a/Gemfile.lock b/Gemfile.lock
index eb0825e7..34e9d826 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -101,6 +101,8 @@ GEM
       nio4r (~> 2.0)
     racc (1.8.0)
     rack (3.0.11)
+    rack-session (2.0.0)
+      rack (>= 3.0.0)
     rainbow (3.1.1)
     rdoc (6.6.3.1)
       psych (>= 4.0.0)
@@ -212,6 +214,7 @@ DEPENDENCIES
   pry
   puma
   push_metrics!
+  rack-session
   rgl
   rspec
   rspec-sidekiq
diff --git a/bin/sidekiq_web.ru b/bin/sidekiq_web.ru
index 6d43d2f7..a69c4d34 100644
--- a/bin/sidekiq_web.ru
+++ b/bin/sidekiq_web.ru
@@ -1,18 +1,14 @@
-require "sidekiq"
+# This is sidekiq_web.ru
+# Run with `bundle exec rackup sidekiq_web.ru` or similar.
+require "securerandom"
+require "rack/session"
 require "sidekiq/web"
-require_relative "../config/initializers/sidekiq"
-
-SESSION_KEY = ".session.key"
-
-if !File.exist?(SESSION_KEY)
-  require "securerandom"
-  File.open(SESSION_KEY, "w") do |f|
-    f.write(SecureRandom.hex(32))
-  end
-end
-
-Services.redis_config[:size] = 1
-
-use Rack::Session::Cookie, secret: File.read(SESSION_KEY), same_site: true, max_age: 86400
+
+# In a multi-process deployment, all Web UI instances should share
+# this secret key so they can all decode the encrypted browser cookies
+# and provide a working session.
+# Rails does this in /config/initializers/secret_token.rb
+secret_key = SecureRandom.hex(32)
+use Rack::Session::Cookie, secret: secret_key, same_site: true, max_age: 86400

 run Sidekiq::Web
diff --git a/docker-compose.yml b/docker-compose.yml
index 190d001d..655a288a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -49,6 +49,8 @@ services:
     <<: *holdings-container-defaults
     restart: always
    volumes:
+      - .:/usr/src/app
+      - gem_cache:/gems
       - ./example/datasets:/tmp/datasets
     command: bundle exec sidekiq -c 1 -r ./lib/sidekiq_jobs.rb
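
Note: as the comment in bin/sidekiq_web.ru says, each process generates a
fresh session secret at boot, so with more than one web UI process behind the
same hostname, cookies minted by one process won't decode in another and the
session (e.g. the CSRF checks behind the UI's buttons) breaks. A minimal
sketch of sharing the secret instead, where SIDEKIQ_WEB_SESSION_SECRET is a
hypothetical environment variable this repo does not define:

    require "securerandom"
    require "rack/session"
    require "sidekiq/web"

    # Prefer a secret shared by all web UI processes; fall back to a
    # per-process random key, which is fine for the single-process
    # docker-compose setup.
    secret_key = ENV.fetch("SIDEKIQ_WEB_SESSION_SECRET") { SecureRandom.hex(32) }
    use Rack::Session::Cookie, secret: secret_key, same_site: true, max_age: 86400

    run Sidekiq::Web
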