diff --git a/app/models/classification_counts/hourly_workflow_classification_count.rb b/app/models/classification_counts/hourly_workflow_classification_count.rb new file mode 100644 index 0000000..04cd787 --- /dev/null +++ b/app/models/classification_counts/hourly_workflow_classification_count.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +module ClassificationCounts + class HourlyWorkflowClassificationCount < ApplicationRecord + self.table_name = 'hourly_classification_count_per_workflow' + attribute :classification_count, :integer + + def readonly? + true + end + end +end diff --git a/app/queries/count_classifications.rb b/app/queries/count_classifications.rb index 2abd588..d75c094 100644 --- a/app/queries/count_classifications.rb +++ b/app/queries/count_classifications.rb @@ -13,7 +13,25 @@ def call(params={}) scoped = @counts scoped = filter_by_workflow_id(scoped, params[:workflow_id]) scoped = filter_by_project_id(scoped, params[:project_id]) - filter_by_date_range(scoped, params[:start_date], params[:end_date]) + # Because of how the FE, calls out to this endpoint when querying for a project's workflow's classifications count + # And because of our use of Real Time Aggregates + # Querying the DailyClassificationCountByWorkflow becomes not as performant + # Because we are limited in resources, we do the following mitigaion for ONLY querying workflow classification counts: + # 1. Create a New HourlyClassificationCountByWorkflow which is RealTime and Create a Data Retention for this new aggregate (this should limit the amount of data the query planner has to sift through) + # 2. Turn off Real Time aggreation for the DailyClassificationCount + # 3. For workflow classification count queries that include the current date's counts, we query current date's counts via the HourlyClassificationCountByWorkflow and query the DailyClassificationCountByWorkflow for everything before the current date's + + if params[:workflow_id].present? + if end_date_includes_today?(params[:end_date]) + scoped_upto_yesterday = filter_by_date_range(scoped, params[:start_date], Date.yesterday.to_s) + scoped = include_today_to_scoped(scoped_upto_yesterday, params[:workflow_id], params[:period]) + else + scoped = filter_by_date_range(scoped, params[:start_date], params[:end_date]) + end + else + scoped = filter_by_date_range(scoped, params[:start_date], params[:end_date]) + end + scoped end private @@ -22,6 +40,72 @@ def initial_scope(relation, period) relation.select(select_and_time_bucket_by(period, 'classification')).group('period').order('period') end + def include_today_to_scoped(scoped_upto_yesterday, workflow_id, period) + period = 'year' if period.nil? + todays_classifications = current_date_workflow_classifications(workflow_id) + return scoped_upto_yesterday if todays_classifications.blank? + + if scoped_upto_yesterday.blank? + # append new entry where period is start of the period + todays_classifications[0].period = start_of_current_period(period).to_time.utc + return todays_classifications + end + + most_recent_date_from_scoped = scoped_upto_yesterday[-1].period.to_date + + # If period=week, month, or year, the current date could be part of that week, month or year; + # we check if the current date is part of the period + # if so, we add the count to the most recent period pulled from db + # if not, we append as a new entry for the current period + if today_part_of_recent_period?(most_recent_date_from_scoped, period) + add_todays_counts_to_recent_period_counts(scoped_upto_yesterday, todays_classifications) + else + todays_classifications[0].period = start_of_current_period(period).to_time.utc + append_today_to_scoped(scoped_upto_yesterday, todays_classifications) + end + end + + def start_of_current_period(period) + today = Date.today + case period + when 'day' + today + when 'week' + # Returns Monday of current week + today.at_beginning_of_week + when 'month' + today.at_beginning_of_month + when 'year' + today.at_beginning_of_year + end + end + + def today_part_of_recent_period?(most_recent_date, period) + most_recent_date == start_of_current_period(period) + end + + def append_today_to_scoped(count_records_up_to_yesterday, todays_count) + count_records_up_to_yesterday + todays_count + end + + def add_todays_counts_to_recent_period_counts(count_records_up_to_yesterday, todays_count) + current_period_counts = count_records_up_to_yesterday[-1].count + todays_count[0].count + count_records_up_to_yesterday[-1].count = current_period_counts + count_records_up_to_yesterday + end + + def current_date_workflow_classifications(workflow_id) + current_day_str = Date.today.to_s + current_hourly_classifications = ClassificationCounts::HourlyWorkflowClassificationCount.select("time_bucket('1 day', hour) AS period, SUM(classification_count)::integer AS count").group('period').order('period').where("hour >= '#{current_day_str}'") + filter_by_workflow_id(current_hourly_classifications, workflow_id) + end + + def end_date_includes_today?(end_date) + includes_today = true + includes_today = Date.parse(end_date) >= Date.today if end_date.present? + includes_today + end + def relation(params) if params[:workflow_id] ClassificationCounts::DailyWorkflowClassificationCount diff --git a/db/migrate/20240926225916_create_hourly_workflow_classification_count.rb b/db/migrate/20240926225916_create_hourly_workflow_classification_count.rb new file mode 100644 index 0000000..e4be895 --- /dev/null +++ b/db/migrate/20240926225916_create_hourly_workflow_classification_count.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true. + +class CreateHourlyWorkflowClassificationCount < ActiveRecord::Migration[7.0] + # we have to disable the migration transaction because creating materialized views within it is not allowed. + + # Due to how the front end pulls project stats (and workflow stats) all in one go, we hit performance issues; especially if a project has multiple workflows. + # We have discovered that having a non-realtime/materialized only continous aggregate for our daily workflow count cagg is more performant than real time. + # We plan to do the following: + # - Update the daily_classification_count_per_workflow to be materialized only (i.e. non-realtime) + # - Create a subsequent realtime cagg that buckets hourly that we will create data retention policies for. The plan is for up to 72 hours worth of hourly workflow classification counts of data. + # - Update workflow query to first query the daily counts first and the query the hourly counts for just the specific date of now. + disable_ddl_transaction! + def change + execute <<~SQL + create materialized view hourly_classification_count_per_workflow + with ( + timescaledb.continuous + ) as + select + time_bucket('1 hour', event_time) as hour, + workflow_id, + count(*) as classification_count + from classification_events where event_time > now() - INTERVAL '5 days' + group by hour, workflow_id; + SQL + end +end diff --git a/db/migrate/20240926231010_add_refresh_policy_for_hourly_workflow_count.rb b/db/migrate/20240926231010_add_refresh_policy_for_hourly_workflow_count.rb new file mode 100644 index 0000000..ec4364e --- /dev/null +++ b/db/migrate/20240926231010_add_refresh_policy_for_hourly_workflow_count.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +class AddRefreshPolicyForHourlyWorkflowCount < ActiveRecord::Migration[7.0] + disable_ddl_transaction! + def change + execute <<~SQL + SELECT add_continuous_aggregate_policy('hourly_classification_count_per_workflow',start_offset => INTERVAL '5 days', end_offset => INTERVAL '30 minutes', schedule_interval => INTERVAL '1 h'); + SQL + end +end diff --git a/db/migrate/20240926231325_create_data_retention_policy_for_hourly_workflow_count.rb b/db/migrate/20240926231325_create_data_retention_policy_for_hourly_workflow_count.rb new file mode 100644 index 0000000..23c7aca --- /dev/null +++ b/db/migrate/20240926231325_create_data_retention_policy_for_hourly_workflow_count.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +class CreateDataRetentionPolicyForHourlyWorkflowCount < ActiveRecord::Migration[7.0] + disable_ddl_transaction! + def change + execute <<~SQL + SELECT add_retention_policy('hourly_classification_count_per_workflow', drop_after => INTERVAL '3 days'); + SQL + end +end diff --git a/db/migrate/20240926233924_alter_daily_workflow_classification_count_to_materialized_only.rb b/db/migrate/20240926233924_alter_daily_workflow_classification_count_to_materialized_only.rb new file mode 100644 index 0000000..9418a27 --- /dev/null +++ b/db/migrate/20240926233924_alter_daily_workflow_classification_count_to_materialized_only.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +class AlterDailyWorkflowClassificationCountToMaterializedOnly < ActiveRecord::Migration[7.0] + disable_ddl_transaction! + def up + execute <<~SQL + ALTER MATERIALIZED VIEW daily_classification_count_per_workflow set (timescaledb.materialized_only = true); + SQL + end + + def down + execute <<~SQL + ALTER MATERIALIZED VIEW daily_classification_count_per_workflow set (timescaledb.materialized_only = false); + SQL + end +end diff --git a/db/schema.rb b/db/schema.rb index 612a347..4be333d 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.0].define(version: 2024_03_28_183306) do +ActiveRecord::Schema[7.0].define(version: 2024_09_26_233924) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" enable_extension "timescaledb" diff --git a/lib/tasks/db.rake b/lib/tasks/db.rake index 472a835..8b0de7d 100644 --- a/lib/tasks/db.rake +++ b/lib/tasks/db.rake @@ -183,6 +183,16 @@ namespace :db do FROM classification_user_groups WHERE user_group_id IS NOT NULL GROUP BY day, user_group_id, user_id, workflow_id; SQL + + ActiveRecord::Base.connection.execute <<-SQL + CREATE MATERIALIZED VIEW IF NOT EXISTS hourly_classification_count_per_workflow + WITH (timescaledb.continuous) AS + SELECT time_bucket('1 hour', event_time) AS hour, + workflow_id, + count(*) as classification_count + FROM classification_events + GROUP BY hour, workflow_id; + SQL end desc 'Drop Continuous Aggregates Views' @@ -203,6 +213,7 @@ namespace :db do DROP MATERIALIZED VIEW IF EXISTS daily_group_classification_count_and_time_per_user CASCADE; DROP MATERIALIZED VIEW IF EXISTS daily_group_classification_count_and_time_per_user_per_project CASCADE; DROP MATERIALIZED VIEW IF EXISTS daily_group_classification_count_and_time_per_user_per_workflow CASCADE; + DROP MATERIALIZED VIEW IF EXISTS hourly_classification_count_per_workflow CASCADE; SQL end diff --git a/spec/queries/count_classifications_spec.rb b/spec/queries/count_classifications_spec.rb index 14c8c1f..67d1e24 100644 --- a/spec/queries/count_classifications_spec.rb +++ b/spec/queries/count_classifications_spec.rb @@ -22,15 +22,14 @@ end describe 'select_and_time_bucket_by' do + let(:counts) { count_classifications.call(params) } it 'buckets counts by year by default' do - counts = count_classifications.call(params) expected_select_query = "SELECT time_bucket('1 year', day) AS period, SUM(classification_count)::integer AS count FROM \"daily_classification_count\" GROUP BY period ORDER BY period" expect(counts.to_sql).to eq(expected_select_query) end it 'buckets counts by given period' do params[:period] = 'week' - counts = count_classifications.call(params) expected_select_query = "SELECT time_bucket('1 week', day) AS period, SUM(classification_count)::integer AS count FROM \"daily_classification_count\" GROUP BY period ORDER BY period" expect(counts.to_sql).to eq(expected_select_query) end @@ -41,13 +40,13 @@ let!(:diff_workflow_event) { create(:classification_with_diff_workflow) } let!(:diff_project_event) { create(:classification_with_diff_project) } let!(:diff_time_event) { create(:classification_created_yesterday) } + let(:counts) { count_classifications.call(params) } it_behaves_like 'is filterable by workflow' it_behaves_like 'is filterable by project' it_behaves_like 'is filterable by date range' it 'returns counts of all events when no params given' do - counts = count_classifications.call(params) # because default is bucket by year and all data created in the same year, we expect counts to look something like # [] current_year = Date.today.year @@ -58,7 +57,6 @@ it 'returns counts bucketed by given period' do params[:period] = 'day' - counts = count_classifications.call(params) expect(counts.length).to eq(2) expect(counts[0].count).to eq(1) expect(counts[0].period).to eq((Date.today - 1).to_s) @@ -69,7 +67,6 @@ it 'returns counts of events with given workflow' do workflow_id = diff_workflow_event.workflow_id params[:workflow_id] = workflow_id.to_s - counts = count_classifications.call(params) expect(counts.length).to eq(1) expect(counts[0].count).to eq(1) end @@ -77,7 +74,6 @@ it 'returns counts of events with given project' do project_id = diff_project_event.project_id params[:project_id] = project_id.to_s - counts = count_classifications.call(params) expect(counts.length).to eq(1) expect(counts[0].count).to eq(1) end @@ -87,9 +83,110 @@ yesterday = Date.today - 1 params[:start_date] = last_week.to_s params[:end_date] = yesterday.to_s - counts = count_classifications.call(params) expect(counts.length).to eq(1) expect(counts[0].count).to eq(1) end + + context 'when params[:workflow_id] present' do + context 'when params[:end_date] is before current date' do + it 'returns counts from DailyWorkflowClassificationCount' do + yesterday = Date.today - 1 + params[:workflow_id] = diff_time_event.workflow_id.to_s + params[:end_date] = yesterday.to_s + expect(counts.model).to be(ClassificationCounts::DailyWorkflowClassificationCount) + expect(counts.length).to eq(1) + expect(counts[0].count).to eq(1) + end + end + + context 'when params[:end_date] includes current date' do + before do + params[:end_date] = Date.today.to_s + end + + context 'when 0 classifications up to previous day' do + context 'when 0 classifications for current day' do + it 'returns from DailyWorkflowClassificationCount' do + # Select a workflow id that has no classification + params[:workflow_id] = '100' + expect(counts.model).to be(ClassificationCounts::DailyWorkflowClassificationCount) + expect(counts.length).to eq(0) + end + end + + context 'when there are classifications for current day' do + before do + params[:workflow_id] = diff_workflow_event.workflow_id.to_s + end + + it "returns today's classifications from HourlyWorkflowClassificationCount" do + expect(counts.model).to be(ClassificationCounts::HourlyWorkflowClassificationCount) + expect(counts.length).to eq(1) + expect(counts[0].count).to eq(1) + end + + it 'returns current date when period is day' do + params[:period] = 'day' + expect(counts[0].period).to eq(Date.today.to_time.utc) + end + + it 'returns start of week when period is week' do + params[:period] = 'week' + expect(counts[0].period).to eq(Date.today.at_beginning_of_week.to_time.utc) + end + + it 'returns start of month when period is month' do + params[:period] = 'month' + expect(counts[0].period).to eq(Date.today.at_beginning_of_month.to_time.utc) + end + + it 'returns start of year when period is year' do + params[:period] = 'year' + expect(counts[0].period).to eq(Date.today.at_beginning_of_year.to_time.utc) + end + end + end + + context 'when there are classifications up to previous day' do + context 'when there are 0 classifications for current day' do + let!(:classification_created_yesterday_diff_workflow) { create(:classification_created_yesterday, workflow_id: 4, classification_id: 100) } + it 'returns from DailyWorkflowCount (scoped up to yesterday)' do + params[:workflow_id] = classification_created_yesterday_diff_workflow.workflow_id.to_s + expect(counts.model).to be(ClassificationCounts::DailyWorkflowClassificationCount) + expect(counts.length).to eq(1) + expect(counts[0].count).to eq(1) + end + end + + context 'when there are classifications for current day' do + before do + allow(Date).to receive(:today).and_return Date.new(2022, 10, 21) + params[:workflow_id] = diff_workflow_event.workflow_id.to_s + params[:period] = 'year' + end + + context 'when current day is part of the most recently pulled period' do + it 'adds the most recent period to the most recently pulled period counts' do + create(:classification_with_diff_workflow, classification_id: 1000, event_time: Date.new(2022, 1, 2)) + expect(counts.length).to eq(1) + expect(counts[0].count).to eq(2) + expect(counts[0].period).to eq(Date.today.at_beginning_of_year) + end + end + + context 'when current day is not part of the most recently pulled period' do + it 'appends a new entry to scoped from HourlyWorkflowCount query' do + create(:classification_with_diff_workflow, classification_id: 1000, event_time: Date.new(2021, 1, 2)) + expect(counts.length).to eq(2) + counts.each { |c| expect(c.count).to eq(1) } + expect(counts[0].class).to be(ClassificationCounts::DailyWorkflowClassificationCount) + expect(counts[1].class).to be(ClassificationCounts::HourlyWorkflowClassificationCount) + expect(counts.last.period).to eq(Date.today.at_beginning_of_year) + end + end + end + end + end + end end end