Skip to content

Commit

Permalink
Add SubjectSet completeness metrics (#3649)
Browse files Browse the repository at this point in the history
* add a subject set workflow counter

to count the number of retired subjects for a workflow

* add a completeness json attribute to subject_set

* spec out a subject et completeness worker

* fix linter warnings

* correctly spell the spec file name

* add json field atomic update

* match perform method arg order

* ignore rspec change block syntax in linter

* add specs for clobbering and range clamping

* use the readreplica via feature flag

* add subejct_set completness to serializer

* add frozen string literal magic comment

* recalculate the set completeness on subject add / remove

hook into subject add / remove events to ensure we have an update count of completeness

* run the set completness worker on each retirement event

* recalculate subject set completeness when unretiring
  • Loading branch information
camallen authored Aug 4, 2021
1 parent 91fa533 commit 85af7c8
Show file tree
Hide file tree
Showing 15 changed files with 302 additions and 8 deletions.
4 changes: 4 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ Layout/HashAlignment:
Layout/SpaceAroundOperators:
Exclude:
- 'spec/factories/workflows.rb'
Lint/AmbiguousBlockAssociation:
Exclude:
- "spec/**/*"

Metrics/BlockLength:
Exclude:
Expand All @@ -32,6 +35,7 @@ Metrics/MethodLength:
Rails/SkipsModelValidations:
Whitelist:
- update_all
- update_column
- touch

RSpec/MultipleMemoizedHelpers:
Expand Down
11 changes: 10 additions & 1 deletion app/controllers/api/v1/subject_sets_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ def update_links
notify_subject_selector(subject_set)
reset_subject_counts(subject_set.id)

subject_set.subject_sets_workflows.pluck(:workflow_id).each do |workflow_id|
subject_set.workflow_ids.each do |workflow_id|
UnfinishWorkflowWorker.perform_async(workflow_id)
# recalculate the set's completeness values if we've added new subjects
SubjectSetCompletenessWorker.perform_async(subject_set.id, workflow_id) if params.key?(:subjects)
duration = params[:subjects].length * 4 # Pad times to prevent backlogs
params[:subjects].each do |subject_id|
SubjectWorkflowStatusCreateWorker.perform_in(duration.seconds*rand, subject_id, workflow_id)
Expand Down Expand Up @@ -67,6 +69,13 @@ def destroy_links
notify_subject_selector(subject_set)
reset_subject_counts(subject_set.id)
reset_workflow_retired_counts(subject_set.workflow_ids)

if params['link_relation'] == 'subjects'
# recalculate the set's completeness values if # we're removing subjects
subject_set.workflow_ids.each do |workflow_id|
SubjectSetCompletenessWorker.perform_async(subject_set.id, workflow_id)
end
end
end
end

Expand Down
23 changes: 23 additions & 0 deletions app/counters/subject_set_workflow_counter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

class SubjectSetWorkflowCounter
attr_reader :subject_set_id, :workflow_id

def initialize(subject_set_id, workflow_id)
@subject_set_id = subject_set_id
@workflow_id = workflow_id
end

# count the number of subjects in this subject set
# that have been retired for this workflow
def retired_subjects
scope =
SubjectWorkflowStatus
.where(workflow: workflow_id)
.joins(workflow: :subject_sets)
.where(subject_sets: { id: subject_set_id })
.retired

scope.count
end
end
2 changes: 1 addition & 1 deletion app/serializers/subject_set_serializer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class SubjectSetSerializer
include CachedSerializer

attributes :id, :display_name, :set_member_subjects_count, :metadata,
:created_at, :updated_at, :href
:created_at, :updated_at, :href, :completeness

can_include :project, :workflows
can_sort_by :display_name
Expand Down
5 changes: 5 additions & 0 deletions app/workers/retirement_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ def perform(status_id, force_retire=false, reason="classification_count")
NotifySubjectSelectorOfRetirementWorker.perform_async(
status.subject_id, status.workflow_id
)
# recalculate the subject set completeness metric
# for the workflow and each linked subject's subject_set
status.subject.subject_set_ids.each do |subject_set_id|
SubjectSetCompletenessWorker.perform_async(subject_set_id, status.workflow_id)
end
end
rescue ActiveRecord::RecordNotFound
end
Expand Down
42 changes: 42 additions & 0 deletions app/workers/subject_set_completeness_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

class SubjectSetCompletenessWorker
include Sidekiq::Worker
using Refinements::RangeClamping

sidekiq_options queue: :data_low

sidekiq_options congestion: {
interval: 30, # N jobs (below) in each 30s
max_in_interval: 1, # only 1 job every interval above
min_delay: 60, # next job can run 60s after the last one
reject_with: :reschedule, # reschedule the job to run later (avoid db pressure) so we don't eventually run all the jobs and the stored metrics eventually align
key: ->(subject_set_id, workflow_id) { "subject_set_#{subject_set_id}_completeness_#{workflow_id}_worker" }
}

sidekiq_options lock: :until_executing

def perform(subject_set_id, workflow_id)
subject_set = SubjectSet.find(subject_set_id)
workflow = Workflow.find_without_json_attrs(workflow_id)

# find the count of all retired subjects, for a known subject set, in the context of a known workflow
# using the read replica if the feature flag is enabled
retired_subjects_completeness = 0.0
DatabaseReplica.read('subject_set_completeness_from_read_replica') do
retired_subjects_count = SubjectSetWorkflowCounter.new(subject_set.id, workflow.id).retired_subjects * 1.0
total_subjects_count = subject_set.set_member_subjects_count * 1.0
# calculate and clamp the completeness value between 0.0 and 1.0, i.e. 0 to 100%
retired_subjects_completeness = (0.0..1.0).clamp(retired_subjects_count / total_subjects_count)
end

# store these per workflow completeness metric in a json object keyed by the workflow id
# use the atomic DB json operator to avoid clobbering data in the jsonb attribute by other updates
# https://www.postgresql.org/docs/11/functions-json.html
SubjectSet.where(id: subject_set.id).update_all(
"completeness = jsonb_set(completeness, '{#{workflow_id}}', '#{retired_subjects_completeness}', true)"
)
rescue ActiveRecord::RecordNotFound
# avoid running sql count queries for subject sets and workflows we can't find
end
end
35 changes: 33 additions & 2 deletions app/workers/unretire_subject_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,46 @@

class UnretireSubjectWorker
include Sidekiq::Worker
attr_reader :workflow_id, :subject_ids

sidekiq_options queue: :high

def perform(workflow_id, subject_ids)
return unless Workflow.exists?(id: workflow_id)
@workflow_id = workflow_id
@subject_ids = subject_ids

unretire_all_known_subjects

recalculate_subject_set_completion_metrics

SubjectWorkflowStatus.where.not(retired_at: nil).where(workflow_id: workflow_id, subject_id: subject_ids).update_all(retired_at: nil,
retirement_reason: nil)
RefreshWorkflowStatusWorker.perform_async(workflow_id)
NotifySubjectSelectorOfChangeWorker.perform_async(workflow_id)
end

private

def unretire_all_known_subjects
SubjectWorkflowStatus
.where.not(retired_at: nil)
.where(workflow_id: workflow_id, subject_id: subject_ids)
.update_all(retired_at: nil, retirement_reason: nil)
end

def recalculate_subject_set_completion_metrics
linked_subject_sets.each do |subject_set|
SubjectSetCompletenessWorker.perform_async(subject_set.id, workflow_id)
end
end

# find all subject sets for all subject_ids in this workflow
def linked_subject_sets
SubjectSet
.joins(:workflows)
.where(workflows: { id: workflow_id })
.joins(:set_member_subjects)
.where(set_member_subjects: { subject_id: subject_ids })
.select(:id)
.distinct
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# frozen_string_literal: true

class AddWorkflowCompletenessToSubjectSet < ActiveRecord::Migration
def change
# since PG v11+ we can add a new column and a default at the same time
# https://github.com/ankane/strong_migrations#bad-1
# https://www.2ndquadrant.com/en/blog/add-new-table-column-default-value-postgresql-11/
add_column :subject_sets, :completeness, :jsonb, default: {}
end
end
7 changes: 5 additions & 2 deletions db/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
--

-- Dumped from database version 11.11 (Debian 11.11-1.pgdg90+1)
-- Dumped by pg_dump version 11.12 (Debian 11.12-0+deb10u1)
-- Dumped by pg_dump version 11.12

SET statement_timeout = 0;
SET lock_timeout = 0;
Expand Down Expand Up @@ -1280,7 +1280,8 @@ CREATE TABLE public.subject_sets (
set_member_subjects_count integer DEFAULT 0 NOT NULL,
metadata jsonb DEFAULT '{}'::jsonb,
lock_version integer DEFAULT 0,
expert_set boolean
expert_set boolean,
completeness jsonb DEFAULT '{}'::jsonb
);


Expand Down Expand Up @@ -4808,3 +4809,5 @@ INSERT INTO schema_migrations (version) VALUES ('20210226173243');

INSERT INTO schema_migrations (version) VALUES ('20210602210437');

INSERT INTO schema_migrations (version) VALUES ('20210729152047');

18 changes: 18 additions & 0 deletions spec/controllers/api/v1/subject_sets_controller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,15 @@
run_update_links
end

it 'queues the subject_set completeness worker' do
allow(SubjectSetCompletenessWorker).to receive(:perform_async)
run_update_links
linked_workflow_id = resource.workflow_ids.first
expect(SubjectSetCompletenessWorker)
.to have_received(:perform_async)
.with(resource.id, linked_workflow_id)
end

it "should queue the SMS metadata worker" do
fake_sms_ids = %w[1318 1319 1320 1321]
import_result_double = instance_double(
Expand Down Expand Up @@ -388,6 +397,15 @@
end

it_behaves_like "cleans up the linked set member subjects"

it 'queues the subject_set completeness worker' do
allow(SubjectSetCompletenessWorker).to receive(:perform_async)
delete_resources
linked_workflow_id = subject_set.workflow_ids.first
expect(SubjectSetCompletenessWorker)
.to have_received(:perform_async)
.with(subject_set.id, linked_workflow_id)
end
end
end
end
31 changes: 31 additions & 0 deletions spec/counters/subject_set_workflow_counter_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

require 'spec_helper'

describe SubjectSetWorkflowCounter do
let(:subject_set) { create(:subject_set_with_subjects, num_workflows: 1, num_subjects: 2) }
let(:workflow) { subject_set.workflows.first }
let(:counter) { described_class.new(subject_set.id, workflow.id) }

describe 'retired_subjects' do
it 'returns 0 if there are none' do
expect(counter.retired_subjects).to eq(0)
end

context 'with retired_subjects' do
let(:subject_to_retire) { subject_set.subjects.first }

before do
SubjectWorkflowStatus.create(
workflow_id: workflow.id,
subject_id: subject_to_retire.id,
retired_at: Time.now.utc
)
end

it 'returns 1' do
expect(counter.retired_subjects).to eq(1)
end
end
end
end
14 changes: 14 additions & 0 deletions spec/serializers/subject_set_serializer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,18 @@
create(:subject_set, workflows: subject_set.workflows)
end
end

describe 'serialized attributes' do
let(:expected_attributes) do
%i[id display_name set_member_subjects_count metadata created_at updated_at href completeness]
end
let(:serialized_attributes_no_links) do
result = described_class.single({}, SubjectSet.where(id: subject_set.id), {})
result.except(:links).keys
end

it 'serializes the correct attributes' do
expect(serialized_attributes_no_links).to match_array(expected_attributes)
end
end
end
10 changes: 10 additions & 0 deletions spec/workers/retirement_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@
worker.perform(status.id)
end

it 'queues the subject_set completeness worker' do
allow(SubjectSetCompletenessWorker).to receive(:perform_async)
linked_subject_set_id = status.subject.subject_set_ids.first
linked_workflow_id = status.workflow.id
worker.perform(status.id)
expect(SubjectSetCompletenessWorker)
.to have_received(:perform_async)
.with(linked_subject_set_id, linked_workflow_id)
end

it "should call the publish retire event worker" do
expect(PublishRetirementEventWorker)
.to receive(:perform_async)
Expand Down
Loading

0 comments on commit 85af7c8

Please sign in to comment.