Skip to content

Commit

Permalink
Update queries for workflow counts (#71)
Browse files Browse the repository at this point in the history
* Adding Hourly Workflow Counts Realtime CAgg and change DailyWorkflowCount to Materialized Only View

* initial go on using hourly classifications for workflows

* remove print statement

* Update count_classifications.rb

* Update count_classifications.rb

* remove unused var

* taking care of blank case/ no entry found case

* remove logs

* add frames for test

* adding testing for cases when end_date is before and after current day

* add tests for testing period and change eriod format to match that of data pull

* adding tests for the case when there are classifications from previous day

* update comment on migration

* update hound comments

* update db.rake with new caggs

* Update hourly_workflow_classification_count.rb

* Update db.rake

* remove redundant returns

* add frozen string literal true

* rubocop fix hound

* adding comment on spec

* rename spec to note adding counts

* update migrations to be reversible
  • Loading branch information
yuenmichelle1 authored Oct 11, 2024
1 parent 26ecb53 commit cef6509
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

module ClassificationCounts
class HourlyWorkflowClassificationCount < ApplicationRecord
self.table_name = 'hourly_classification_count_per_workflow'
attribute :classification_count, :integer

def readonly?
true
end
end
end
86 changes: 85 additions & 1 deletion app/queries/count_classifications.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,25 @@ def call(params={})
scoped = @counts
scoped = filter_by_workflow_id(scoped, params[:workflow_id])
scoped = filter_by_project_id(scoped, params[:project_id])
filter_by_date_range(scoped, params[:start_date], params[:end_date])
# Because of how the FE, calls out to this endpoint when querying for a project's workflow's classifications count
# And because of our use of Real Time Aggregates
# Querying the DailyClassificationCountByWorkflow becomes not as performant
# Because we are limited in resources, we do the following mitigaion for ONLY querying workflow classification counts:
# 1. Create a New HourlyClassificationCountByWorkflow which is RealTime and Create a Data Retention for this new aggregate (this should limit the amount of data the query planner has to sift through)
# 2. Turn off Real Time aggreation for the DailyClassificationCount
# 3. For workflow classification count queries that include the current date's counts, we query current date's counts via the HourlyClassificationCountByWorkflow and query the DailyClassificationCountByWorkflow for everything before the current date's

if params[:workflow_id].present?
if end_date_includes_today?(params[:end_date])
scoped_upto_yesterday = filter_by_date_range(scoped, params[:start_date], Date.yesterday.to_s)
scoped = include_today_to_scoped(scoped_upto_yesterday, params[:workflow_id], params[:period])
else
scoped = filter_by_date_range(scoped, params[:start_date], params[:end_date])
end
else
scoped = filter_by_date_range(scoped, params[:start_date], params[:end_date])
end
scoped
end

private
Expand All @@ -22,6 +40,72 @@ def initial_scope(relation, period)
relation.select(select_and_time_bucket_by(period, 'classification')).group('period').order('period')
end

def include_today_to_scoped(scoped_upto_yesterday, workflow_id, period)
period = 'year' if period.nil?
todays_classifications = current_date_workflow_classifications(workflow_id)
return scoped_upto_yesterday if todays_classifications.blank?

if scoped_upto_yesterday.blank?
# append new entry where period is start of the period
todays_classifications[0].period = start_of_current_period(period).to_time.utc
return todays_classifications
end

most_recent_date_from_scoped = scoped_upto_yesterday[-1].period.to_date

# If period=week, month, or year, the current date could be part of that week, month or year;
# we check if the current date is part of the period
# if so, we add the count to the most recent period pulled from db
# if not, we append as a new entry for the current period
if today_part_of_recent_period?(most_recent_date_from_scoped, period)
add_todays_counts_to_recent_period_counts(scoped_upto_yesterday, todays_classifications)
else
todays_classifications[0].period = start_of_current_period(period).to_time.utc
append_today_to_scoped(scoped_upto_yesterday, todays_classifications)
end
end

def start_of_current_period(period)
today = Date.today
case period
when 'day'
today
when 'week'
# Returns Monday of current week
today.at_beginning_of_week
when 'month'
today.at_beginning_of_month
when 'year'
today.at_beginning_of_year
end
end

def today_part_of_recent_period?(most_recent_date, period)
most_recent_date == start_of_current_period(period)
end

def append_today_to_scoped(count_records_up_to_yesterday, todays_count)
count_records_up_to_yesterday + todays_count
end

def add_todays_counts_to_recent_period_counts(count_records_up_to_yesterday, todays_count)
current_period_counts = count_records_up_to_yesterday[-1].count + todays_count[0].count
count_records_up_to_yesterday[-1].count = current_period_counts
count_records_up_to_yesterday
end

def current_date_workflow_classifications(workflow_id)
current_day_str = Date.today.to_s
current_hourly_classifications = ClassificationCounts::HourlyWorkflowClassificationCount.select("time_bucket('1 day', hour) AS period, SUM(classification_count)::integer AS count").group('period').order('period').where("hour >= '#{current_day_str}'")
filter_by_workflow_id(current_hourly_classifications, workflow_id)
end

def end_date_includes_today?(end_date)
includes_today = true
includes_today = Date.parse(end_date) >= Date.today if end_date.present?
includes_today
end

def relation(params)
if params[:workflow_id]
ClassificationCounts::DailyWorkflowClassificationCount
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true.

class CreateHourlyWorkflowClassificationCount < ActiveRecord::Migration[7.0]
# we have to disable the migration transaction because creating materialized views within it is not allowed.

# Due to how the front end pulls project stats (and workflow stats) all in one go, we hit performance issues; especially if a project has multiple workflows.
# We have discovered that having a non-realtime/materialized only continous aggregate for our daily workflow count cagg is more performant than real time.
# We plan to do the following:
# - Update the daily_classification_count_per_workflow to be materialized only (i.e. non-realtime)
# - Create a subsequent realtime cagg that buckets hourly that we will create data retention policies for. The plan is for up to 72 hours worth of hourly workflow classification counts of data.
# - Update workflow query to first query the daily counts first and the query the hourly counts for just the specific date of now.
disable_ddl_transaction!
def up
execute <<~SQL
create materialized view hourly_classification_count_per_workflow
with (
timescaledb.continuous
) as
select
time_bucket('1 hour', event_time) as hour,
workflow_id,
count(*) as classification_count
from classification_events where event_time > now() - INTERVAL '5 days'
group by hour, workflow_id;
SQL
end

def down
execute <<~SQL
DROP materialized view hourly_classification_count_per_workflow;
SQL
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

class AddRefreshPolicyForHourlyWorkflowCount < ActiveRecord::Migration[7.0]
disable_ddl_transaction!
def up
execute <<~SQL
SELECT add_continuous_aggregate_policy('hourly_classification_count_per_workflow',start_offset => INTERVAL '5 days', end_offset => INTERVAL '30 minutes', schedule_interval => INTERVAL '1 h');
SQL
end

def down
execute <<~SQL
SELECT remove_continuous_aggregate_policy('hourly_classification_count_per_workflow');
SQL
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

class CreateDataRetentionPolicyForHourlyWorkflowCount < ActiveRecord::Migration[7.0]
disable_ddl_transaction!
def up
execute <<~SQL
SELECT add_retention_policy('hourly_classification_count_per_workflow', drop_after => INTERVAL '3 days');
SQL
end

def down
execute <<~SQL
SELECT remove_retention_policy('hourly_classification_count_per_workflow');
SQL
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

class AlterDailyWorkflowClassificationCountToMaterializedOnly < ActiveRecord::Migration[7.0]
disable_ddl_transaction!
def up
execute <<~SQL
ALTER MATERIALIZED VIEW daily_classification_count_per_workflow set (timescaledb.materialized_only = true);
SQL
end

def down
execute <<~SQL
ALTER MATERIALIZED VIEW daily_classification_count_per_workflow set (timescaledb.materialized_only = false);
SQL
end
end
2 changes: 1 addition & 1 deletion db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.0].define(version: 2024_03_28_183306) do
ActiveRecord::Schema[7.0].define(version: 2024_09_26_233924) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
enable_extension "timescaledb"
Expand Down
11 changes: 11 additions & 0 deletions lib/tasks/db.rake
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,16 @@ namespace :db do
FROM classification_user_groups WHERE user_group_id IS NOT NULL
GROUP BY day, user_group_id, user_id, workflow_id;
SQL

ActiveRecord::Base.connection.execute <<-SQL
CREATE MATERIALIZED VIEW IF NOT EXISTS hourly_classification_count_per_workflow
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', event_time) AS hour,
workflow_id,
count(*) as classification_count
FROM classification_events
GROUP BY hour, workflow_id;
SQL
end

desc 'Drop Continuous Aggregates Views'
Expand All @@ -203,6 +213,7 @@ namespace :db do
DROP MATERIALIZED VIEW IF EXISTS daily_group_classification_count_and_time_per_user CASCADE;
DROP MATERIALIZED VIEW IF EXISTS daily_group_classification_count_and_time_per_user_per_project CASCADE;
DROP MATERIALIZED VIEW IF EXISTS daily_group_classification_count_and_time_per_user_per_workflow CASCADE;
DROP MATERIALIZED VIEW IF EXISTS hourly_classification_count_per_workflow CASCADE;
SQL
end

Expand Down
Loading

0 comments on commit cef6509

Please sign in to comment.