Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update queries for workflow classification counts - attempt to optimize workflow classification count performance #70

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

module ClassificationCounts
class HourlyWorkflowClassificationCount < ApplicationRecord
self.table_name = 'hourly_classification_count_per_workflow'
attribute :classification_count, :integer

def readonly?
true
end
end
end
86 changes: 85 additions & 1 deletion app/queries/count_classifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,25 @@ def call(params={})
scoped = @counts
scoped = filter_by_workflow_id(scoped, params[:workflow_id])
scoped = filter_by_project_id(scoped, params[:project_id])
filter_by_date_range(scoped, params[:start_date], params[:end_date])
# Because of how the FE, calls out to this endpoint when querying for a project's workflow's classifications count
# And because of our use of Real Time Aggregates
# Querying the DailyClassificationCountByWorkflow becomes not as performant
# Because we are limited in resources, we do the following mitigaion for ONLY querying workflow classification counts:
# 1. Create a New HourlyClassificationCountByWorkflow which is RealTime and Create a Data Retention for this new aggregate (this should limit the amount of data the query planner has to sift through)
yuenmichelle1 marked this conversation as resolved.
Show resolved Hide resolved
# 2. Turn off Real Time aggreation for the DailyClassificationCount
# 3. For workflow classification count queries that include the current date's counts, we query current date's counts via the HourlyClassificationCountByWorkflow and query the DailyClassificationCountByWorkflow for everything before the current date's
yuenmichelle1 marked this conversation as resolved.
Show resolved Hide resolved

if params[:workflow_id].present?
if end_date_includes_today?(params[:end_date])
scoped_upto_yesterday = filter_by_date_range(scoped, params[:start_date], Date.yesterday.to_s)
scoped = include_today_to_scoped(scoped_upto_yesterday, params[:workflow_id], params[:period])
else
scoped = filter_by_date_range(scoped, params[:start_date], params[:end_date])
end
else
scoped = filter_by_date_range(scoped, params[:start_date], params[:end_date])
end
scoped
end

private
Expand All @@ -22,6 +40,72 @@ def initial_scope(relation, period)
relation.select(select_and_time_bucket_by(period, 'classification')).group('period').order('period')
end

def include_today_to_scoped(scoped_upto_yesterday, workflow_id, period)
yuenmichelle1 marked this conversation as resolved.
Show resolved Hide resolved
period = 'year' if period.nil?
todays_classifications = current_date_workflow_classifications(workflow_id)
return scoped_upto_yesterday if todays_classifications.blank?

if scoped_upto_yesterday.blank?
# append new entry where period is start of the period
todays_classifications[0].period = start_of_current_period(period).to_time.utc
return todays_classifications
end

most_recent_date_from_scoped = scoped_upto_yesterday[-1].period.to_date

# If period=week, month, or year, the current date could be part of that week, month or year;
# we check if the current date is part of the period
# if so, we add the count to the most recent period pulled from db
# if not, we append as a new entry for the current period
if today_part_of_recent_period?(most_recent_date_from_scoped, period)
add_todays_counts_to_recent_period_counts(scoped_upto_yesterday, todays_classifications)
else
todays_classifications[0].period = start_of_current_period(period).to_time.utc
append_today_to_scoped(scoped_upto_yesterday, todays_classifications)
end
end

def start_of_current_period(period)
today = Date.today
case period
when 'day'
today
when 'week'
# Returns Monday of current week
today.at_beginning_of_week
when 'month'
today.at_beginning_of_month
when 'year'
today.at_beginning_of_year
end
end

def today_part_of_recent_period?(most_recent_date, period)
most_recent_date == start_of_current_period(period)
end

def append_today_to_scoped(count_records_up_to_yesterday, todays_count)
count_records_up_to_yesterday + todays_count
end

def add_todays_counts_to_recent_period_counts(count_records_up_to_yesterday, todays_count)
current_period_counts = count_records_up_to_yesterday[-1].count + todays_count[0].count
count_records_up_to_yesterday[-1].count = current_period_counts
count_records_up_to_yesterday
end

def current_date_workflow_classifications(workflow_id)
current_day_str = Date.today.to_s
current_hourly_classifications = ClassificationCounts::HourlyWorkflowClassificationCount.select("time_bucket('1 day', hour) AS period, SUM(classification_count)::integer AS count").group('period').order('period').where("hour >= '#{current_day_str}'")
yuenmichelle1 marked this conversation as resolved.
Show resolved Hide resolved
filter_by_workflow_id(current_hourly_classifications, workflow_id)
end

def end_date_includes_today?(end_date)
includes_today = true
includes_today = Date.parse(end_date) >= Date.today if end_date.present?
includes_today
end

def relation(params)
if params[:workflow_id]
ClassificationCounts::DailyWorkflowClassificationCount
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style/FrozenStringLiteralComment: Missing magic comment # frozen_string_literal: true.


class CreateHourlyWorkflowClassificationCount < ActiveRecord::Migration[7.0]
# we have to disable the migration transaction because creating materialized views within it is not allowed.

# Due to how the front end pulls project stats (and workflow stats) all in one go, we hit performance issues; especially if a project has multiple workflows.
# We have discovered that having a non-realtime/materialized only continous aggregate for our daily workflow count cagg is more performant than real time.
# We plan to do the following:
# - Update the daily_classification_count_per_workflow to be materialized only (i.e. non-realtime)
# - Create a subsequent realtime cagg that buckets hourly that we will create data retention policies for. The plan is for up to 72 hours worth of hourly workflow classification counts of data.
# - Update workflow query to first query the daily counts first and the query the hourly counts for just the specific date of now.
disable_ddl_transaction!
def change
execute <<~SQL
create materialized view hourly_classification_count_per_workflow
with (
timescaledb.continuous
) as
select
time_bucket('1 hour', event_time) as hour,
workflow_id,
count(*) as classification_count
from classification_events where event_time > now() - INTERVAL '5 days'
group by hour, workflow_id;
SQL
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# frozen_string_literal: true

class AddRefreshPolicyForHourlyWorkflowCount < ActiveRecord::Migration[7.0]
disable_ddl_transaction!
def change
execute <<~SQL
SELECT add_continuous_aggregate_policy('hourly_classification_count_per_workflow',start_offset => INTERVAL '5 days', end_offset => INTERVAL '30 minutes', schedule_interval => INTERVAL '1 h');
SQL
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# frozen_string_literal: true

class CreateDataRetentionPolicyForHourlyWorkflowCount < ActiveRecord::Migration[7.0]
disable_ddl_transaction!
def change
execute <<~SQL
SELECT add_retention_policy('hourly_classification_count_per_workflow', drop_after => INTERVAL '3 days');
SQL
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

class AlterDailyWorkflowClassificationCountToMaterializedOnly < ActiveRecord::Migration[7.0]
disable_ddl_transaction!
def up
execute <<~SQL
ALTER MATERIALIZED VIEW daily_classification_count_per_workflow set (timescaledb.materialized_only = true);
SQL
end

def down
execute <<~SQL
ALTER MATERIALIZED VIEW daily_classification_count_per_workflow set (timescaledb.materialized_only = false);
SQL
end
end
2 changes: 1 addition & 1 deletion db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.0].define(version: 2024_03_28_183306) do
ActiveRecord::Schema[7.0].define(version: 2024_09_26_233924) do
yuenmichelle1 marked this conversation as resolved.
Show resolved Hide resolved
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
enable_extension "timescaledb"
Expand Down
11 changes: 11 additions & 0 deletions lib/tasks/db.rake
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,16 @@ namespace :db do
FROM classification_user_groups WHERE user_group_id IS NOT NULL
GROUP BY day, user_group_id, user_id, workflow_id;
SQL

ActiveRecord::Base.connection.execute <<-SQL
CREATE MATERIALIZED VIEW IF NOT EXISTS hourly_classification_count_per_workflow
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', event_time) AS hour,
workflow_id,
count(*) as classification_count
FROM classification_events
GROUP BY hour, workflow_id;
SQL
end

desc 'Drop Continuous Aggregates Views'
Expand All @@ -203,6 +213,7 @@ namespace :db do
DROP MATERIALIZED VIEW IF EXISTS daily_group_classification_count_and_time_per_user CASCADE;
DROP MATERIALIZED VIEW IF EXISTS daily_group_classification_count_and_time_per_user_per_project CASCADE;
DROP MATERIALIZED VIEW IF EXISTS daily_group_classification_count_and_time_per_user_per_workflow CASCADE;
DROP MATERIALIZED VIEW IF EXISTS hourly_classification_count_per_workflow CASCADE;
SQL
end

Expand Down
111 changes: 104 additions & 7 deletions spec/queries/count_classifications_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@
end

describe 'select_and_time_bucket_by' do
let(:counts) { count_classifications.call(params) }
it 'buckets counts by year by default' do
counts = count_classifications.call(params)
expected_select_query = "SELECT time_bucket('1 year', day) AS period, SUM(classification_count)::integer AS count FROM \"daily_classification_count\" GROUP BY period ORDER BY period"
expect(counts.to_sql).to eq(expected_select_query)
end

it 'buckets counts by given period' do
params[:period] = 'week'
counts = count_classifications.call(params)
expected_select_query = "SELECT time_bucket('1 week', day) AS period, SUM(classification_count)::integer AS count FROM \"daily_classification_count\" GROUP BY period ORDER BY period"
expect(counts.to_sql).to eq(expected_select_query)
end
Expand All @@ -41,13 +40,13 @@
let!(:diff_workflow_event) { create(:classification_with_diff_workflow) }
let!(:diff_project_event) { create(:classification_with_diff_project) }
let!(:diff_time_event) { create(:classification_created_yesterday) }
let(:counts) { count_classifications.call(params) }

it_behaves_like 'is filterable by workflow'
it_behaves_like 'is filterable by project'
it_behaves_like 'is filterable by date range'

it 'returns counts of all events when no params given' do
counts = count_classifications.call(params)
# because default is bucket by year and all data created in the same year, we expect counts to look something like
# [<ClassificationCounts::DailyClassificationCount period: 01-01-2023, count: 4>]
current_year = Date.today.year
Expand All @@ -58,7 +57,6 @@

it 'returns counts bucketed by given period' do
params[:period] = 'day'
counts = count_classifications.call(params)
expect(counts.length).to eq(2)
expect(counts[0].count).to eq(1)
expect(counts[0].period).to eq((Date.today - 1).to_s)
Expand All @@ -69,15 +67,13 @@
it 'returns counts of events with given workflow' do
workflow_id = diff_workflow_event.workflow_id
params[:workflow_id] = workflow_id.to_s
counts = count_classifications.call(params)
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(1)
end

it 'returns counts of events with given project' do
project_id = diff_project_event.project_id
params[:project_id] = project_id.to_s
counts = count_classifications.call(params)
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(1)
end
Expand All @@ -87,9 +83,110 @@
yesterday = Date.today - 1
params[:start_date] = last_week.to_s
params[:end_date] = yesterday.to_s
counts = count_classifications.call(params)
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(1)
end

context 'when params[:workflow_id] present' do
context 'when params[:end_date] is before current date' do
it 'returns counts from DailyWorkflowClassificationCount' do
yesterday = Date.today - 1
params[:workflow_id] = diff_time_event.workflow_id.to_s
params[:end_date] = yesterday.to_s
expect(counts.model).to be(ClassificationCounts::DailyWorkflowClassificationCount)
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(1)
end
end

context 'when params[:end_date] includes current date' do
before do
params[:end_date] = Date.today.to_s
end

context 'when 0 classifications up to previous day' do
context 'when 0 classifications for current day' do
it 'returns from DailyWorkflowClassificationCount' do
# Select a workflow id that has no classification
params[:workflow_id] = '100'
expect(counts.model).to be(ClassificationCounts::DailyWorkflowClassificationCount)
expect(counts.length).to eq(0)
end
end

context 'when there are classifications for current day' do
before do
params[:workflow_id] = diff_workflow_event.workflow_id.to_s
end

it "returns today's classifications from HourlyWorkflowClassificationCount" do
expect(counts.model).to be(ClassificationCounts::HourlyWorkflowClassificationCount)
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(1)
end

it 'returns current date when period is day' do
params[:period] = 'day'
expect(counts[0].period).to eq(Date.today.to_time.utc)
end

it 'returns start of week when period is week' do
params[:period] = 'week'
expect(counts[0].period).to eq(Date.today.at_beginning_of_week.to_time.utc)
end

it 'returns start of month when period is month' do
params[:period] = 'month'
expect(counts[0].period).to eq(Date.today.at_beginning_of_month.to_time.utc)
end

it 'returns start of year when period is year' do
params[:period] = 'year'
expect(counts[0].period).to eq(Date.today.at_beginning_of_year.to_time.utc)
end
end
end

context 'when there are classifications up to previous day' do
context 'when there are 0 classifications for current day' do
let!(:classification_created_yesterday_diff_workflow) { create(:classification_created_yesterday, workflow_id: 4, classification_id: 100) }
it 'returns from DailyWorkflowCount (scoped up to yesterday)' do
params[:workflow_id] = classification_created_yesterday_diff_workflow.workflow_id.to_s
expect(counts.model).to be(ClassificationCounts::DailyWorkflowClassificationCount)
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(1)
end
end

context 'when there are classifications for current day' do
before do
allow(Date).to receive(:today).and_return Date.new(2022, 10, 21)
params[:workflow_id] = diff_workflow_event.workflow_id.to_s
params[:period] = 'year'
end

context 'when current day is part of the most recently pulled period' do
it 'adds the most recent period to the most recently pulled period counts' do
create(:classification_with_diff_workflow, classification_id: 1000, event_time: Date.new(2022, 1, 2))
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(2)
expect(counts[0].period).to eq(Date.today.at_beginning_of_year)
end
end

context 'when current day is not part of the most recently pulled period' do
it 'appends a new entry to scoped from HourlyWorkflowCount query' do
create(:classification_with_diff_workflow, classification_id: 1000, event_time: Date.new(2021, 1, 2))
expect(counts.length).to eq(2)
counts.each { |c| expect(c.count).to eq(1) }
expect(counts[0].class).to be(ClassificationCounts::DailyWorkflowClassificationCount)
expect(counts[1].class).to be(ClassificationCounts::HourlyWorkflowClassificationCount)
expect(counts.last.period).to eq(Date.today.at_beginning_of_year)
end
end
end
end
end
end
end
end
Loading