Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update queries for workflow counts #71

Merged
merged 23 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3e691d5
Adding Hourly Workflow Counts Realtime CAgg and change DailyWorkflowC…
yuenmichelle1 Sep 27, 2024
4f1e82a
initial go on using hourly classifications for workflows
yuenmichelle1 Oct 3, 2024
73f8249
remove print statement
yuenmichelle1 Oct 3, 2024
77cb262
Update count_classifications.rb
yuenmichelle1 Oct 3, 2024
2e9ec6c
Update count_classifications.rb
yuenmichelle1 Oct 3, 2024
435992a
remove unused var
yuenmichelle1 Oct 6, 2024
1485084
taking care of blank case/ no entry found case
yuenmichelle1 Oct 7, 2024
b4eb0b4
remove logs
yuenmichelle1 Oct 7, 2024
3d0831c
add frames for test
yuenmichelle1 Oct 7, 2024
4168da8
adding testing for cases when end_date is before and after current day
yuenmichelle1 Oct 7, 2024
77a14c3
add tests for testing period and change eriod format to match that of…
yuenmichelle1 Oct 8, 2024
b1d4a66
adding tests for the case when there are classifications from previou…
yuenmichelle1 Oct 8, 2024
d9d5e39
update comment on migration
yuenmichelle1 Oct 8, 2024
4ae1eee
update hound comments
yuenmichelle1 Oct 8, 2024
6ee4f68
update db.rake with new caggs
yuenmichelle1 Oct 8, 2024
789b555
Update hourly_workflow_classification_count.rb
yuenmichelle1 Oct 8, 2024
db35f9b
Update db.rake
yuenmichelle1 Oct 8, 2024
967974b
remove redundant returns
yuenmichelle1 Oct 8, 2024
7ba19d4
add frozen string literal true
yuenmichelle1 Oct 8, 2024
b751b38
rubocop fix hound
yuenmichelle1 Oct 8, 2024
d932fdf
adding comment on spec
yuenmichelle1 Oct 8, 2024
862dd9b
rename spec to note adding counts
yuenmichelle1 Oct 8, 2024
3bc6875
update migrations to be reversible
yuenmichelle1 Oct 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

module ClassificationCounts
class HourlyWorkflowClassificationCount < ApplicationRecord
self.table_name = 'hourly_classification_count_per_workflow'
attribute :classification_count, :integer

def readonly?
true
end
end
end
86 changes: 85 additions & 1 deletion app/queries/count_classifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,25 @@ def call(params={})
scoped = @counts
scoped = filter_by_workflow_id(scoped, params[:workflow_id])
scoped = filter_by_project_id(scoped, params[:project_id])
filter_by_date_range(scoped, params[:start_date], params[:end_date])
# Because of how the FE, calls out to this endpoint when querying for a project's workflow's classifications count
# And because of our use of Real Time Aggregates
# Querying the DailyClassificationCountByWorkflow becomes not as performant
# Because we are limited in resources, we do the following mitigaion for ONLY querying workflow classification counts:
# 1. Create a New HourlyClassificationCountByWorkflow which is RealTime and Create a Data Retention for this new aggregate (this should limit the amount of data the query planner has to sift through)
yuenmichelle1 marked this conversation as resolved.
Show resolved Hide resolved
# 2. Turn off Real Time aggreation for the DailyClassificationCount
# 3. For workflow classification count queries that include the current date's counts, we query current date's counts via the HourlyClassificationCountByWorkflow and query the DailyClassificationCountByWorkflow for everything before the current date's
yuenmichelle1 marked this conversation as resolved.
Show resolved Hide resolved

if params[:workflow_id].present?
if end_date_includes_today?(params[:end_date])
scoped_upto_yesterday = filter_by_date_range(scoped, params[:start_date], Date.yesterday.to_s)
scoped = include_today_to_scoped(scoped_upto_yesterday, params[:workflow_id], params[:period])
else
scoped = filter_by_date_range(scoped, params[:start_date], params[:end_date])
end
else
scoped = filter_by_date_range(scoped, params[:start_date], params[:end_date])
end
scoped
end

private
Expand All @@ -22,6 +40,72 @@ def initial_scope(relation, period)
relation.select(select_and_time_bucket_by(period, 'classification')).group('period').order('period')
end

def include_today_to_scoped(scoped_upto_yesterday, workflow_id, period)
yuenmichelle1 marked this conversation as resolved.
Show resolved Hide resolved
period = 'year' if period.nil?
todays_classifications = current_date_workflow_classifications(workflow_id)
return scoped_upto_yesterday if todays_classifications.blank?

if scoped_upto_yesterday.blank?
# append new entry where period is start of the period
todays_classifications[0].period = start_of_current_period(period).to_time.utc
return todays_classifications
end

most_recent_date_from_scoped = scoped_upto_yesterday[-1].period.to_date

# If period=week, month, or year, the current date could be part of that week, month or year;
# we check if the current date is part of the period
# if so, we add the count to the most recent period pulled from db
# if not, we append as a new entry for the current period
if today_part_of_recent_period?(most_recent_date_from_scoped, period)
add_todays_counts_to_recent_period_counts(scoped_upto_yesterday, todays_classifications)
else
todays_classifications[0].period = start_of_current_period(period).to_time.utc
append_today_to_scoped(scoped_upto_yesterday, todays_classifications)
end
end

def start_of_current_period(period)
today = Date.today
case period
when 'day'
today
when 'week'
# Returns Monday of current week
today.at_beginning_of_week
when 'month'
today.at_beginning_of_month
when 'year'
today.at_beginning_of_year
end
end

def today_part_of_recent_period?(most_recent_date, period)
most_recent_date == start_of_current_period(period)
end

def append_today_to_scoped(count_records_up_to_yesterday, todays_count)
count_records_up_to_yesterday + todays_count
end

def add_todays_counts_to_recent_period_counts(count_records_up_to_yesterday, todays_count)
current_period_counts = count_records_up_to_yesterday[-1].count + todays_count[0].count
count_records_up_to_yesterday[-1].count = current_period_counts
count_records_up_to_yesterday
end

def current_date_workflow_classifications(workflow_id)
current_day_str = Date.today.to_s
current_hourly_classifications = ClassificationCounts::HourlyWorkflowClassificationCount.select("time_bucket('1 day', hour) AS period, SUM(classification_count)::integer AS count").group('period').order('period').where("hour >= '#{current_day_str}'")
yuenmichelle1 marked this conversation as resolved.
Show resolved Hide resolved
filter_by_workflow_id(current_hourly_classifications, workflow_id)
end

def end_date_includes_today?(end_date)
includes_today = true
includes_today = Date.parse(end_date) >= Date.today if end_date.present?
includes_today
end

def relation(params)
if params[:workflow_id]
ClassificationCounts::DailyWorkflowClassificationCount
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true.
yuenmichelle1 marked this conversation as resolved.
Show resolved Hide resolved

class CreateHourlyWorkflowClassificationCount < ActiveRecord::Migration[7.0]
# we have to disable the migration transaction because creating materialized views within it is not allowed.

# Due to how the front end pulls project stats (and workflow stats) all in one go, we hit performance issues; especially if a project has multiple workflows.
# We have discovered that having a non-realtime/materialized only continous aggregate for our daily workflow count cagg is more performant than real time.
# We plan to do the following:
# - Update the daily_classification_count_per_workflow to be materialized only (i.e. non-realtime)
# - Create a subsequent realtime cagg that buckets hourly that we will create data retention policies for. The plan is for up to 72 hours worth of hourly workflow classification counts of data.
# - Update workflow query to first query the daily counts first and the query the hourly counts for just the specific date of now.
disable_ddl_transaction!
def change
execute <<~SQL
create materialized view hourly_classification_count_per_workflow
with (
timescaledb.continuous
) as
select
time_bucket('1 hour', event_time) as hour,
workflow_id,
count(*) as classification_count
from classification_events where event_time > now() - INTERVAL '5 days'
group by hour, workflow_id;
SQL
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# frozen_string_literal: true

class AddRefreshPolicyForHourlyWorkflowCount < ActiveRecord::Migration[7.0]
disable_ddl_transaction!
def change
execute <<~SQL
SELECT add_continuous_aggregate_policy('hourly_classification_count_per_workflow',start_offset => INTERVAL '5 days', end_offset => INTERVAL '30 minutes', schedule_interval => INTERVAL '1 h');
SQL
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# frozen_string_literal: true

class CreateDataRetentionPolicyForHourlyWorkflowCount < ActiveRecord::Migration[7.0]
disable_ddl_transaction!
def change
execute <<~SQL
SELECT add_retention_policy('hourly_classification_count_per_workflow', drop_after => INTERVAL '3 days');
SQL
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

class AlterDailyWorkflowClassificationCountToMaterializedOnly < ActiveRecord::Migration[7.0]
disable_ddl_transaction!
def up
execute <<~SQL
ALTER MATERIALIZED VIEW daily_classification_count_per_workflow set (timescaledb.materialized_only = true);
SQL
end

def down
execute <<~SQL
ALTER MATERIALIZED VIEW daily_classification_count_per_workflow set (timescaledb.materialized_only = false);
SQL
end
end
2 changes: 1 addition & 1 deletion db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.0].define(version: 2024_03_28_183306) do
ActiveRecord::Schema[7.0].define(version: 2024_09_26_233924) do
yuenmichelle1 marked this conversation as resolved.
Show resolved Hide resolved
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
enable_extension "timescaledb"
Expand Down
11 changes: 11 additions & 0 deletions lib/tasks/db.rake
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,16 @@ namespace :db do
FROM classification_user_groups WHERE user_group_id IS NOT NULL
GROUP BY day, user_group_id, user_id, workflow_id;
SQL

ActiveRecord::Base.connection.execute <<-SQL
CREATE MATERIALIZED VIEW IF NOT EXISTS hourly_classification_count_per_workflow
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', event_time) AS hour,
workflow_id,
count(*) as classification_count
FROM classification_events
GROUP BY hour, workflow_id;
SQL
end

desc 'Drop Continuous Aggregates Views'
Expand All @@ -203,6 +213,7 @@ namespace :db do
DROP MATERIALIZED VIEW IF EXISTS daily_group_classification_count_and_time_per_user CASCADE;
DROP MATERIALIZED VIEW IF EXISTS daily_group_classification_count_and_time_per_user_per_project CASCADE;
DROP MATERIALIZED VIEW IF EXISTS daily_group_classification_count_and_time_per_user_per_workflow CASCADE;
DROP MATERIALIZED VIEW IF EXISTS hourly_classification_count_per_workflow CASCADE;
SQL
end

Expand Down
111 changes: 104 additions & 7 deletions spec/queries/count_classifications_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@
end

describe 'select_and_time_bucket_by' do
let(:counts) { count_classifications.call(params) }
it 'buckets counts by year by default' do
counts = count_classifications.call(params)
expected_select_query = "SELECT time_bucket('1 year', day) AS period, SUM(classification_count)::integer AS count FROM \"daily_classification_count\" GROUP BY period ORDER BY period"
expect(counts.to_sql).to eq(expected_select_query)
end

it 'buckets counts by given period' do
params[:period] = 'week'
counts = count_classifications.call(params)
expected_select_query = "SELECT time_bucket('1 week', day) AS period, SUM(classification_count)::integer AS count FROM \"daily_classification_count\" GROUP BY period ORDER BY period"
expect(counts.to_sql).to eq(expected_select_query)
end
Expand All @@ -41,13 +40,13 @@
let!(:diff_workflow_event) { create(:classification_with_diff_workflow) }
let!(:diff_project_event) { create(:classification_with_diff_project) }
let!(:diff_time_event) { create(:classification_created_yesterday) }
let(:counts) { count_classifications.call(params) }

it_behaves_like 'is filterable by workflow'
it_behaves_like 'is filterable by project'
it_behaves_like 'is filterable by date range'

it 'returns counts of all events when no params given' do
counts = count_classifications.call(params)
# because default is bucket by year and all data created in the same year, we expect counts to look something like
# [<ClassificationCounts::DailyClassificationCount period: 01-01-2023, count: 4>]
current_year = Date.today.year
Expand All @@ -58,7 +57,6 @@

it 'returns counts bucketed by given period' do
params[:period] = 'day'
counts = count_classifications.call(params)
expect(counts.length).to eq(2)
expect(counts[0].count).to eq(1)
expect(counts[0].period).to eq((Date.today - 1).to_s)
Expand All @@ -69,15 +67,13 @@
it 'returns counts of events with given workflow' do
workflow_id = diff_workflow_event.workflow_id
params[:workflow_id] = workflow_id.to_s
counts = count_classifications.call(params)
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(1)
end

it 'returns counts of events with given project' do
project_id = diff_project_event.project_id
params[:project_id] = project_id.to_s
counts = count_classifications.call(params)
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(1)
end
Expand All @@ -87,9 +83,110 @@
yesterday = Date.today - 1
params[:start_date] = last_week.to_s
params[:end_date] = yesterday.to_s
counts = count_classifications.call(params)
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(1)
end

context 'when params[:workflow_id] present' do
context 'when params[:end_date] is before current date' do
it 'returns counts from DailyWorkflowClassificationCount' do
yesterday = Date.today - 1
params[:workflow_id] = diff_time_event.workflow_id.to_s
params[:end_date] = yesterday.to_s
expect(counts.model).to be(ClassificationCounts::DailyWorkflowClassificationCount)
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(1)
end
end

context 'when params[:end_date] includes current date' do
before do
params[:end_date] = Date.today.to_s
end

context 'when 0 classifications up to previous day' do
context 'when 0 classifications for current day' do
it 'returns from DailyWorkflowClassificationCount' do
# Select a workflow id that has no classification
params[:workflow_id] = '100'
expect(counts.model).to be(ClassificationCounts::DailyWorkflowClassificationCount)
expect(counts.length).to eq(0)
end
end

context 'when there are classifications for current day' do
before do
params[:workflow_id] = diff_workflow_event.workflow_id.to_s
end

it "returns today's classifications from HourlyWorkflowClassificationCount" do
expect(counts.model).to be(ClassificationCounts::HourlyWorkflowClassificationCount)
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(1)
end

it 'returns current date when period is day' do
params[:period] = 'day'
expect(counts[0].period).to eq(Date.today.to_time.utc)
end

it 'returns start of week when period is week' do
params[:period] = 'week'
expect(counts[0].period).to eq(Date.today.at_beginning_of_week.to_time.utc)
end

it 'returns start of month when period is month' do
params[:period] = 'month'
expect(counts[0].period).to eq(Date.today.at_beginning_of_month.to_time.utc)
end

it 'returns start of year when period is year' do
params[:period] = 'year'
expect(counts[0].period).to eq(Date.today.at_beginning_of_year.to_time.utc)
end
end
end

context 'when there are classifications up to previous day' do
context 'when there are 0 classifications for current day' do
let!(:classification_created_yesterday_diff_workflow) { create(:classification_created_yesterday, workflow_id: 4, classification_id: 100) }
it 'returns from DailyWorkflowCount (scoped up to yesterday)' do
params[:workflow_id] = classification_created_yesterday_diff_workflow.workflow_id.to_s
expect(counts.model).to be(ClassificationCounts::DailyWorkflowClassificationCount)
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(1)
end
end

context 'when there are classifications for current day' do
before do
allow(Date).to receive(:today).and_return Date.new(2022, 10, 21)
params[:workflow_id] = diff_workflow_event.workflow_id.to_s
params[:period] = 'year'
end

context 'when current day is part of the most recently pulled period' do
it 'adds the most recent period to the most recently pulled period counts' do
create(:classification_with_diff_workflow, classification_id: 1000, event_time: Date.new(2022, 1, 2))
expect(counts.length).to eq(1)
expect(counts[0].count).to eq(2)
expect(counts[0].period).to eq(Date.today.at_beginning_of_year)
end
end

context 'when current day is not part of the most recently pulled period' do
it 'appends a new entry to scoped from HourlyWorkflowCount query' do
create(:classification_with_diff_workflow, classification_id: 1000, event_time: Date.new(2021, 1, 2))
expect(counts.length).to eq(2)
counts.each { |c| expect(c.count).to eq(1) }
expect(counts[0].class).to be(ClassificationCounts::DailyWorkflowClassificationCount)
expect(counts[1].class).to be(ClassificationCounts::HourlyWorkflowClassificationCount)
expect(counts.last.period).to eq(Date.today.at_beginning_of_year)
end
end
end
end
end
end
end
end
Loading