From 85af7c8f4afc9ca55632715648d5e39ef459064c Mon Sep 17 00:00:00 2001
From: Campbell Allen <campbell.allen@gmail.com>
Date: Wed, 4 Aug 2021 18:17:56 +0100
Subject: [PATCH] Add SubjectSet completeness metrics (#3649)

* add a subject set workflow counter

to count the number of retired subjects for a workflow

* add a completeness json attribute to subject_set

* spec out a subject set completeness worker

* fix linter warnings

* correctly spell the spec file name

* add json field atomic update

* match perform method arg order

* ignore rspec change block syntax in linter

* add specs for clobbering and range clamping

* use the readreplica via feature flag

* add subject_set completeness to serializer

* add frozen string literal magic comment

* recalculate the set completeness on subject add / remove

hook into subject add / remove events to ensure we have an up-to-date count of completeness

* run the set completeness worker on each retirement event

* recalculate subject set completeness when unretiring
---
 .rubocop.yml                                  |  4 +
 .../api/v1/subject_sets_controller.rb         | 11 ++-
 app/counters/subject_set_workflow_counter.rb  | 23 +++++
 app/serializers/subject_set_serializer.rb     |  2 +-
 app/workers/retirement_worker.rb              |  5 ++
 .../subject_set_completeness_worker.rb        | 42 +++++++++
 app/workers/unretire_subject_worker.rb        | 35 +++++++-
 ...dd_workflow_completeness_to_subject_set.rb | 10 +++
 db/structure.sql                              |  7 +-
 .../api/v1/subject_sets_controller_spec.rb    | 18 ++++
 .../subject_set_workflow_counter_spec.rb      | 31 +++++++
 .../subject_set_serializer_spec.rb            | 14 +++
 spec/workers/retirement_worker_spec.rb        | 10 +++
 .../subject_set_completeness_worker_spec.rb   | 89 +++++++++++++++++++
 spec/workers/unretire_subject_worker_spec.rb  |  9 +-
 15 files changed, 302 insertions(+), 8 deletions(-)
 create mode 100644 app/counters/subject_set_workflow_counter.rb
 create mode 100644 app/workers/subject_set_completeness_worker.rb
 create mode 100644 db/migrate/20210729152047_add_workflow_completeness_to_subject_set.rb
 create mode 100644 spec/counters/subject_set_workflow_counter_spec.rb
 create mode 100644 spec/workers/subject_set_completeness_worker_spec.rb

diff --git a/.rubocop.yml b/.rubocop.yml
index d35b2288c..c3d36ceb0 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -19,6 +19,9 @@ Layout/HashAlignment:
 Layout/SpaceAroundOperators:
   Exclude:
     - 'spec/factories/workflows.rb'
+Lint/AmbiguousBlockAssociation:
+  Exclude:
+    - "spec/**/*"
 
 Metrics/BlockLength:
   Exclude:
@@ -32,6 +35,7 @@ Metrics/MethodLength:
 Rails/SkipsModelValidations:
   Whitelist:
   - update_all
+  - update_column
   - touch
 
 RSpec/MultipleMemoizedHelpers:
diff --git a/app/controllers/api/v1/subject_sets_controller.rb b/app/controllers/api/v1/subject_sets_controller.rb
index 5fd9fbfc1..0f9df6dc3 100644
--- a/app/controllers/api/v1/subject_sets_controller.rb
+++ b/app/controllers/api/v1/subject_sets_controller.rb
@@ -27,8 +27,10 @@ def update_links
       notify_subject_selector(subject_set)
       reset_subject_counts(subject_set.id)
 
-      subject_set.subject_sets_workflows.pluck(:workflow_id).each do |workflow_id|
+      subject_set.workflow_ids.each do |workflow_id|
         UnfinishWorkflowWorker.perform_async(workflow_id)
+        # recalculate the set's completeness values if we've added new subjects
+        SubjectSetCompletenessWorker.perform_async(subject_set.id, workflow_id) if params.key?(:subjects)
         duration = params[:subjects].length * 4 # Pad times to prevent backlogs
         params[:subjects].each do |subject_id|
           SubjectWorkflowStatusCreateWorker.perform_in(duration.seconds*rand, subject_id, workflow_id)
@@ -67,6 +69,13 @@ def destroy_links
       notify_subject_selector(subject_set)
       reset_subject_counts(subject_set.id)
       reset_workflow_retired_counts(subject_set.workflow_ids)
+
+      if params['link_relation'] == 'subjects'
+        # recalculate the set's completeness values if we're removing subjects
+        subject_set.workflow_ids.each do |workflow_id|
+          SubjectSetCompletenessWorker.perform_async(subject_set.id, workflow_id)
+        end
+      end
     end
   end
 
diff --git a/app/counters/subject_set_workflow_counter.rb b/app/counters/subject_set_workflow_counter.rb
new file mode 100644
index 000000000..5658583f8
--- /dev/null
+++ b/app/counters/subject_set_workflow_counter.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+
+# Counts subject states for one subject set in the context of one workflow.
+class SubjectSetWorkflowCounter
+  attr_reader :subject_set_id, :workflow_id
+
+  def initialize(subject_set_id, workflow_id)
+    @subject_set_id = subject_set_id
+    @workflow_id = workflow_id
+  end
+
+  # count the number of subjects in this subject set that have been retired
+  # for this workflow, joining via the status' subject (not its workflow) so
+  # retired subjects from the workflow's other linked sets are not counted
+  def retired_subjects
+    SubjectWorkflowStatus
+      .where(workflow_id: workflow_id)
+      .joins(subject: :subject_sets)
+      .where(subject_sets: { id: subject_set_id })
+      .retired
+      .count
+  end
+end
diff --git a/app/serializers/subject_set_serializer.rb b/app/serializers/subject_set_serializer.rb
index 9001ca84b..850d0ee55 100644
--- a/app/serializers/subject_set_serializer.rb
+++ b/app/serializers/subject_set_serializer.rb
@@ -4,7 +4,7 @@ class SubjectSetSerializer
   include CachedSerializer
 
   attributes :id, :display_name, :set_member_subjects_count, :metadata,
-    :created_at, :updated_at, :href
+             :created_at, :updated_at, :href, :completeness
 
   can_include :project, :workflows
   can_sort_by :display_name
diff --git a/app/workers/retirement_worker.rb b/app/workers/retirement_worker.rb
index 375804b2e..fc4412dca 100644
--- a/app/workers/retirement_worker.rb
+++ b/app/workers/retirement_worker.rb
@@ -14,6 +14,11 @@ def perform(status_id, force_retire=false, reason="classification_count")
       NotifySubjectSelectorOfRetirementWorker.perform_async(
         status.subject_id, status.workflow_id
       )
+      # recalculate the subject set completeness metric
+      # for the workflow and each linked subject's subject_set
+      status.subject.subject_set_ids.each do |subject_set_id|
+        SubjectSetCompletenessWorker.perform_async(subject_set_id, status.workflow_id)
+      end
     end
   rescue ActiveRecord::RecordNotFound
   end
diff --git a/app/workers/subject_set_completeness_worker.rb b/app/workers/subject_set_completeness_worker.rb
new file mode 100644
index 000000000..8898aaa41
--- /dev/null
+++ b/app/workers/subject_set_completeness_worker.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+# Stores a subject set's per workflow completeness (retired / total subjects) in its completeness json attribute.
+class SubjectSetCompletenessWorker
+  include Sidekiq::Worker
+  using Refinements::RangeClamping
+
+  sidekiq_options queue: :data_low
+  sidekiq_options congestion: {
+    interval: 30, # N jobs (below) in each 30s
+    max_in_interval: 1, # only 1 job every interval above
+    min_delay: 60, # next job can run 60s after the last one
+    reject_with: :reschedule, # reschedule rejected jobs to run later (avoid db pressure), they still run eventually so the stored metrics eventually align
+    key: ->(subject_set_id, workflow_id) { "subject_set_#{subject_set_id}_completeness_#{workflow_id}_worker" }
+  }
+  sidekiq_options lock: :until_executing
+
+  def perform(subject_set_id, workflow_id)
+    subject_set = SubjectSet.find(subject_set_id)
+    workflow = Workflow.find_without_json_attrs(workflow_id)
+
+    # find the count of all retired subjects, for a known subject set, in the context of a known workflow
+    # using the read replica if the feature flag is enabled
+    retired_subjects_completeness = 0.0
+    DatabaseReplica.read('subject_set_completeness_from_read_replica') do
+      retired_subjects_count = SubjectSetWorkflowCounter.new(subject_set.id, workflow.id).retired_subjects * 1.0
+      total_subjects_count = subject_set.set_member_subjects_count * 1.0
+      # calculate and clamp the completeness value between 0.0 and 1.0, i.e. 0 to 100%
+      # an empty set keeps 0.0: x / 0.0 is NaN or Infinity, not a completeness value (and NaN is not valid JSON)
+      retired_subjects_completeness = (0.0..1.0).clamp(retired_subjects_count / total_subjects_count) unless total_subjects_count.zero?
+    end
+
+    # store these per workflow completeness metric in a json object keyed by the workflow id
+    # use the atomic DB json operator to avoid clobbering data in the jsonb attribute by other updates
+    # https://www.postgresql.org/docs/11/functions-json.html (the found record's id, not the raw job arg, goes in the SQL)
+    SubjectSet.where(id: subject_set.id).update_all(
+      "completeness = jsonb_set(completeness, '{#{workflow.id}}', '#{retired_subjects_completeness}', true)"
+    )
+  rescue ActiveRecord::RecordNotFound
+    # avoid running sql count queries for subject sets and workflows we can't find
+  end
+end
diff --git a/app/workers/unretire_subject_worker.rb b/app/workers/unretire_subject_worker.rb
index e73c32fe4..60dae81e7 100644
--- a/app/workers/unretire_subject_worker.rb
+++ b/app/workers/unretire_subject_worker.rb
@@ -2,15 +2,46 @@
 
 class UnretireSubjectWorker
   include Sidekiq::Worker
+  attr_reader :workflow_id, :subject_ids
 
   sidekiq_options queue: :high
 
   def perform(workflow_id, subject_ids)
     return unless Workflow.exists?(id: workflow_id)
+    @workflow_id = workflow_id
+    @subject_ids = subject_ids
+
+    unretire_all_known_subjects
+
+    recalculate_subject_set_completion_metrics
 
-    SubjectWorkflowStatus.where.not(retired_at: nil).where(workflow_id: workflow_id, subject_id: subject_ids).update_all(retired_at: nil,
-                                                                                                                         retirement_reason: nil)
     RefreshWorkflowStatusWorker.perform_async(workflow_id)
     NotifySubjectSelectorOfChangeWorker.perform_async(workflow_id)
   end
+
+  private
+
+  def unretire_all_known_subjects
+    SubjectWorkflowStatus
+      .where.not(retired_at: nil)
+      .where(workflow_id: workflow_id, subject_id: subject_ids)
+      .update_all(retired_at: nil, retirement_reason: nil)
+  end
+
+  def recalculate_subject_set_completion_metrics
+    linked_subject_sets.each do |subject_set|
+      SubjectSetCompletenessWorker.perform_async(subject_set.id, workflow_id)
+    end
+  end
+
+  # find all subject sets for all subject_ids in this workflow
+  def linked_subject_sets
+    SubjectSet
+      .joins(:workflows)
+      .where(workflows: { id: workflow_id })
+      .joins(:set_member_subjects)
+      .where(set_member_subjects: { subject_id: subject_ids })
+      .select(:id)
+      .distinct
+  end
 end
diff --git a/db/migrate/20210729152047_add_workflow_completeness_to_subject_set.rb b/db/migrate/20210729152047_add_workflow_completeness_to_subject_set.rb
new file mode 100644
index 000000000..8ed1c21a6
--- /dev/null
+++ b/db/migrate/20210729152047_add_workflow_completeness_to_subject_set.rb
@@ -0,0 +1,10 @@
+# frozen_string_literal: true
+
+class AddWorkflowCompletenessToSubjectSet < ActiveRecord::Migration
+  def change
+    # jsonb store for per workflow completeness values keyed by workflow id; since PG v11+ we can add a new column and a default at the same time
+    # https://github.com/ankane/strong_migrations#bad-1
+    # https://www.2ndquadrant.com/en/blog/add-new-table-column-default-value-postgresql-11/
+    add_column :subject_sets, :completeness, :jsonb, default: {}
+  end
+end
diff --git a/db/structure.sql b/db/structure.sql
index bfd9b21b7..d19750e4c 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -3,7 +3,7 @@
 --
 
 -- Dumped from database version 11.11 (Debian 11.11-1.pgdg90+1)
--- Dumped by pg_dump version 11.12 (Debian 11.12-0+deb10u1)
+-- Dumped by pg_dump version 11.12
 
 SET statement_timeout = 0;
 SET lock_timeout = 0;
@@ -1280,7 +1280,8 @@ CREATE TABLE public.subject_sets (
     set_member_subjects_count integer DEFAULT 0 NOT NULL,
     metadata jsonb DEFAULT '{}'::jsonb,
     lock_version integer DEFAULT 0,
-    expert_set boolean
+    expert_set boolean,
+    completeness jsonb DEFAULT '{}'::jsonb
 );
 
 
@@ -4808,3 +4809,5 @@ INSERT INTO schema_migrations (version) VALUES ('20210226173243');
 
 INSERT INTO schema_migrations (version) VALUES ('20210602210437');
 
+INSERT INTO schema_migrations (version) VALUES ('20210729152047');
+
diff --git a/spec/controllers/api/v1/subject_sets_controller_spec.rb b/spec/controllers/api/v1/subject_sets_controller_spec.rb
index 03225c987..fef3c3bd4 100644
--- a/spec/controllers/api/v1/subject_sets_controller_spec.rb
+++ b/spec/controllers/api/v1/subject_sets_controller_spec.rb
@@ -154,6 +154,15 @@
         run_update_links
       end
 
+      it 'queues the subject_set completeness worker' do
+        allow(SubjectSetCompletenessWorker).to receive(:perform_async)
+        run_update_links
+        linked_workflow_id = resource.workflow_ids.first
+        expect(SubjectSetCompletenessWorker)
+          .to have_received(:perform_async)
+          .with(resource.id, linked_workflow_id)
+      end
+
       it "should queue the SMS metadata worker" do
         fake_sms_ids = %w[1318 1319 1320 1321]
         import_result_double = instance_double(
@@ -388,6 +397,15 @@
       end
 
       it_behaves_like "cleans up the linked set member subjects"
+
+      it 'queues the subject_set completeness worker' do
+        allow(SubjectSetCompletenessWorker).to receive(:perform_async)
+        delete_resources
+        linked_workflow_id = subject_set.workflow_ids.first
+        expect(SubjectSetCompletenessWorker)
+          .to have_received(:perform_async)
+          .with(subject_set.id, linked_workflow_id)
+      end
     end
   end
 end
diff --git a/spec/counters/subject_set_workflow_counter_spec.rb b/spec/counters/subject_set_workflow_counter_spec.rb
new file mode 100644
index 000000000..315d9a49c
--- /dev/null
+++ b/spec/counters/subject_set_workflow_counter_spec.rb
@@ -0,0 +1,31 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+describe SubjectSetWorkflowCounter do
+  let(:subject_set) { create(:subject_set_with_subjects, num_workflows: 1, num_subjects: 2) }
+  let(:workflow) { subject_set.workflows.first }
+  let(:counter) { described_class.new(subject_set.id, workflow.id) }
+
+  describe 'retired_subjects' do
+    it 'returns 0 if there are none' do
+      expect(counter.retired_subjects).to eq(0)
+    end
+
+    context 'with retired_subjects' do
+      let(:subject_to_retire) { subject_set.subjects.first }
+
+      before do
+        SubjectWorkflowStatus.create(
+          workflow_id: workflow.id,
+          subject_id: subject_to_retire.id,
+          retired_at: Time.now.utc
+        )
+      end
+
+      it 'returns 1' do
+        expect(counter.retired_subjects).to eq(1)
+      end
+    end
+  end
+end
diff --git a/spec/serializers/subject_set_serializer_spec.rb b/spec/serializers/subject_set_serializer_spec.rb
index 9a643d3dc..bb7ba08d9 100644
--- a/spec/serializers/subject_set_serializer_spec.rb
+++ b/spec/serializers/subject_set_serializer_spec.rb
@@ -16,4 +16,18 @@
       create(:subject_set, workflows: subject_set.workflows)
     end
   end
+
+  describe 'serialized attributes' do
+    let(:expected_attributes) do
+      %i[id display_name set_member_subjects_count metadata created_at updated_at href completeness]
+    end
+    let(:serialized_attributes_no_links) do
+      result = described_class.single({}, SubjectSet.where(id: subject_set.id), {})
+      result.except(:links).keys
+    end
+
+    it 'serializes the correct attributes' do
+      expect(serialized_attributes_no_links).to match_array(expected_attributes)
+    end
+  end
 end
diff --git a/spec/workers/retirement_worker_spec.rb b/spec/workers/retirement_worker_spec.rb
index e582a471a..fd84a2019 100644
--- a/spec/workers/retirement_worker_spec.rb
+++ b/spec/workers/retirement_worker_spec.rb
@@ -54,6 +54,16 @@
         worker.perform(status.id)
       end
 
+      it 'queues the subject_set completeness worker' do
+        allow(SubjectSetCompletenessWorker).to receive(:perform_async)
+        linked_subject_set_id = status.subject.subject_set_ids.first
+        linked_workflow_id = status.workflow.id
+        worker.perform(status.id)
+        expect(SubjectSetCompletenessWorker)
+          .to have_received(:perform_async)
+          .with(linked_subject_set_id, linked_workflow_id)
+      end
+
       it "should call the publish retire event worker" do
         expect(PublishRetirementEventWorker)
           .to receive(:perform_async)
diff --git a/spec/workers/subject_set_completeness_worker_spec.rb b/spec/workers/subject_set_completeness_worker_spec.rb
new file mode 100644
index 000000000..9dfc1dbec
--- /dev/null
+++ b/spec/workers/subject_set_completeness_worker_spec.rb
@@ -0,0 +1,89 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+
+RSpec.describe SubjectSetCompletenessWorker do
+  subject(:worker) { described_class.new }
+
+  let(:fake_wf_id) { '-1' }
+  let(:subject_set) do
+    create(:subject_set_with_subjects, num_workflows: 1, num_subjects: 2, completeness: { fake_wf_id => 0.8 })
+  end
+  let(:workflow) { subject_set.workflows.first }
+
+  describe '#perform' do
+    it 'ignores an unknown subject set' do
+      expect { worker.perform(-1, workflow.id) }.not_to raise_error
+    end
+
+    it 'ignores an unknown workflow' do
+      expect { worker.perform(subject_set.id, -1) }.not_to raise_error
+    end
+
+    context 'when there is no retired data for the workflow' do
+      it 'stores 0.0 in the subject_set#completeness json store' do
+        expect {
+          worker.perform(subject_set.id, workflow.id)
+        }.to change {
+          subject_set.reload.completeness[workflow.id.to_s]
+        }.from(nil).to(0.0)
+      end
+    end
+
+    context 'when half the set is retired for the workflow' do
+      let(:counter_double) { instance_double(SubjectSetWorkflowCounter, retired_subjects: 1) }
+
+      before do
+        allow(SubjectSetWorkflowCounter).to receive(:new).and_return(counter_double)
+      end
+
+      it 'stores 0.5 in the subject_set#completeness json store' do
+        expect {
+          worker.perform(subject_set.id, workflow.id)
+        }.to change {
+          subject_set.reload.completeness[workflow.id.to_s]
+        }.from(nil).to(0.5)
+      end
+
+      it 'does not clobber existing per workflow completeness data' do
+        expect {
+          worker.perform(subject_set.id, workflow.id)
+        }.not_to change {
+          subject_set.reload.completeness[fake_wf_id]
+        }
+      end
+    end
+
+    context 'with more than 100% complete' do
+      let(:counter_double) { instance_double(SubjectSetWorkflowCounter, retired_subjects: 10) }
+
+      before do
+        allow(SubjectSetWorkflowCounter).to receive(:new).and_return(counter_double)
+      end
+
+      it 'clamps the range of completeness to 1.0 (100%)' do
+        expect {
+          worker.perform(subject_set.id, workflow.id)
+        }.to change {
+          subject_set.reload.completeness[workflow.id.to_s]
+        }.to(1.0)
+      end
+    end
+
+    context 'with less than 0% complete' do
+      let(:counter_double) { instance_double(SubjectSetWorkflowCounter, retired_subjects: -1) }
+
+      before do
+        allow(SubjectSetWorkflowCounter).to receive(:new).and_return(counter_double)
+      end
+
+      it 'clamps the range of completeness to 0.0 (0%)' do
+        expect {
+          worker.perform(subject_set.id, workflow.id)
+        }.to change {
+          subject_set.reload.completeness[workflow.id.to_s]
+        }.to(0.0)
+      end
+    end
+  end
+end
diff --git a/spec/workers/unretire_subject_worker_spec.rb b/spec/workers/unretire_subject_worker_spec.rb
index 97bc1da91..564425bb8 100644
--- a/spec/workers/unretire_subject_worker_spec.rb
+++ b/spec/workers/unretire_subject_worker_spec.rb
@@ -6,14 +6,13 @@
   let(:worker) { described_class.new }
   let(:workflow) { create(:workflow_with_subjects, num_sets: 1) }
   let(:subject1) { workflow.subjects.first }
-  let(:sms) { subject.set_member_subject.first }
-  let(:set) { sms.subject_set }
   let(:status) { create(:subject_workflow_status, subject: subject1, workflow: workflow, retired_at: 1.day.ago, retirement_reason: 'other') }
 
   describe '#perform' do
     before do
       allow(RefreshWorkflowStatusWorker).to receive(:perform_async)
       allow(NotifySubjectSelectorOfChangeWorker).to receive(:perform_async)
+      allow(SubjectSetCompletenessWorker).to receive(:perform_async)
     end
 
     context 'when subjects are already retired' do
@@ -38,6 +37,12 @@
         worker.perform(workflow.id, [subject1.id])
         expect(NotifySubjectSelectorOfChangeWorker).to have_received(:perform_async).with(workflow.id)
       end
+
+      it 'queues the subject_set completeness worker' do
+        linked_subject_set_id = subject1.subject_set_ids.first
+        worker.perform(workflow.id, [subject1.id])
+        expect(SubjectSetCompletenessWorker).to have_received(:perform_async).with(linked_subject_set_id, workflow.id)
+      end
     end
 
     it 'handles unknown workflow' do