Try to fix various analytics summary issues.
- Fix the daily summaries going to the end of the current month
  (`set_end_time` needed to be called again; see the sketch after this
  message).
- Eliminate the sort order on a query where it doesn't make sense and
  can cause warnings in the results.
- Fix the unique_user_ids aggregation erroring out if it returned
  `max_buckets` buckets; since this query also has other buckets, we need
  to make sure there's room for those.
- Remove nested aggregations. These were used in an earlier version, but
  now that we run separate queries for each time frame (e.g., daily or
  monthly), we no longer need them. Removing them may speed things up.
- Make sure the cache only caches items with a response (and not nil
  responses).

All of this is still a bit cumbersome and fragile (with the weird
caching stuff), so it could all be improved, but this tries to get
things working better within the existing approach.
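
For the first bullet, the underlying issue appears to be state reuse: the same search object serves both the monthly and the daily pass, so a value computed during one pass carries over unless it is re-applied. A minimal Lua sketch of that failure mode, with a hypothetical builder standing in for the real class (the month-snapping behavior is an assumption, not confirmed by the diff):

-- Hypothetical builder, not the real analytics search class. Assume the
-- monthly pass leaves the stored end time extended to the end of the
-- current month; the reused object then carries that into the daily pass
-- unless set_end_time is called again.
local search = {}

function search:set_end_time(t)
  self.end_time = t
end

function search:set_interval(interval)
  self.interval = interval
  if interval == "month" then
    -- stand-in for the real range-alignment logic
    self.end_time = "2024-07-31" -- end of the current month
  end
end

search:set_end_time("2024-07-19")
search:set_interval("month") -- monthly pass

search:set_interval("day")   -- daily pass on the same object
print(search.end_time)       --> 2024-07-31: stale, too far in the future
search:set_end_time("2024-07-19") -- the fix: re-apply the intended end time
print(search.end_time)       --> 2024-07-19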
GUI committed Jul 19, 2024
1 parent 3e13abe commit 0b4c498
Showing 4 changed files with 51 additions and 32 deletions.
15 changes: 8 additions & 7 deletions db/schema.sql
@@ -72,14 +72,14 @@ CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA public;
 CREATE FUNCTION api_umbrella.analytics_cache_extract_unique_user_ids() RETURNS trigger
     LANGUAGE plpgsql
     AS $$
-BEGIN
-  IF (jsonb_typeof(NEW.data->'aggregations'->'hits_over_time'->'buckets'->0->'unique_user_ids'->'buckets') = 'array') THEN
-    NEW.unique_user_ids := (SELECT array_agg(DISTINCT bucket->>'key')::uuid[] FROM jsonb_array_elements(NEW.data->'aggregations'->'hits_over_time'->'buckets'->0->'unique_user_ids'->'buckets') AS bucket);
-  END IF;
+BEGIN
+  IF (jsonb_typeof(NEW.data->'aggregations'->'unique_user_ids'->'buckets') = 'array') THEN
+    NEW.unique_user_ids := (SELECT array_agg(DISTINCT bucket->>'key')::uuid[] FROM jsonb_array_elements(NEW.data->'aggregations'->'unique_user_ids'->'buckets') AS bucket);
+  END IF;
 
-  RETURN NEW;
-END;
-$$;
+  RETURN NEW;
+END;
+$$;
 
 
 --
@@ -2824,3 +2824,4 @@ INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1699650325');
 INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1700281762');
 INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1700346585');
 INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1701483732');
+INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1721347955');
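
As a sanity check on the rewritten trigger above, its extraction expression can be run standalone against a hand-written value in the new, flattened aggregation shape (the bucket keys are made-up UUIDs):

-- Standalone version of the trigger's extraction expression, applied to a
-- literal stand-in for data->'aggregations'->'unique_user_ids'->'buckets'.
SELECT array_agg(DISTINCT bucket->>'key')::uuid[] AS unique_user_ids
FROM jsonb_array_elements('[
  {"key": "8d8f1a0a-0b6a-4b6e-9f6a-2a1c3d4e5f60", "doc_count": 12},
  {"key": "1b2c3d4e-5f60-4a7b-8c9d-0e1f2a3b4c5d", "doc_count": 3},
  {"key": "8d8f1a0a-0b6a-4b6e-9f6a-2a1c3d4e5f60", "doc_count": 7}
]'::jsonb) AS bucket;
-- Returns the two distinct UUIDs; DISTINCT collapses the repeated key.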
15 changes: 8 additions & 7 deletions src/api-umbrella/web-app/actions/v0/analytics.lua
@@ -28,9 +28,10 @@ local function generate_organization_summary(start_time, end_time, recent_start_
   search:set_start_time(start_time)
   search:set_end_time(end_time)
   search:set_interval("month")
+  search:unset_sort()
   search:filter_exclude_imported()
-  search:aggregate_by_interval_for_summary()
-  search:aggregate_by_cardinality("user_id", "user_id.hash")
+  search:aggregate_by_interval()
+  search:aggregate_by_unique_user_ids()
   search:aggregate_by_response_time_average()
   if config["web"]["analytics_v0_summary_filter"] then
     search:set_search_query_string(config["web"]["analytics_v0_summary_filter"])
@@ -69,12 +70,11 @@ local function generate_organization_summary(start_time, end_time, recent_start_
       ) AS unique_user_ids
     FROM (
       SELECT
-        substring(bucket->>'key_as_string' from 1 for :date_key_length) AS interval_date,
-        SUM((bucket->>'doc_count')::bigint) AS hit_count,
+        substring(data->'aggregations'->'hits_over_time'->'buckets'->0->>'key_as_string' from 1 for :date_key_length) AS interval_date,
+        SUM((data->'aggregations'->'hits_over_time'->'buckets'->0->>'doc_count')::bigint) AS hit_count,
         array_accum(unique_user_ids) AS user_ids,
-        ROUND(SUM(CASE WHEN bucket->'response_time_average'->>'value' IS NOT NULL AND bucket->>'doc_count' IS NOT NULL THEN (bucket->'response_time_average'->>'value')::numeric * (bucket->>'doc_count')::bigint END) / SUM(CASE WHEN bucket->'response_time_average'->>'value' IS NOT NULL AND bucket->>'doc_count' IS NOT NULL THEN (bucket->>'doc_count')::bigint END)) AS response_time_average
+        SUM(ROUND((data->'aggregations'->'response_time_average'->>'value')::numeric)) AS response_time_average
       FROM analytics_cache
-      CROSS JOIN LATERAL jsonb_array_elements(data->'aggregations'->'hits_over_time'->'buckets') AS bucket
       WHERE id IN :ids
       GROUP BY interval_date
       ORDER BY interval_date
@@ -96,8 +96,9 @@ local function generate_organization_summary(start_time, end_time, recent_start_
   }, { fatal = true })[1]["response"]
 
   search:set_start_time(recent_start_time)
+  search:set_end_time(end_time)
   search:set_interval("day")
-  search:aggregate_by_interval_for_summary()
+  search:aggregate_by_interval()
   expires_at = ngx.now() + 60 * 60 * 24 * 30 -- 30 days
   local recent_analytics_cache_ids = search:cache_interval_results(expires_at)
   local recent_response = pg_utils.query(aggregate_sql, {
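
To illustrate the "remove nested aggregations" bullet, a rough Lua sketch of the two request-body shapes (not the actual builder output; the request_at field name and calendar_interval value are assumptions):

-- Before: one query spanning time frames, with the summary aggregations
-- nested under each hits_over_time bucket.
local nested_body = {
  aggregations = {
    hits_over_time = {
      date_histogram = { field = "request_at", calendar_interval = "month" },
      aggregations = {
        unique_user_ids = { terms = { field = "user_id" } },
        response_time_average = { avg = { field = "response_time" } },
      },
    },
  },
}

-- After: a separate query per time frame, so the same aggregations can sit
-- at the top level, which may be cheaper for OpenSearch to compute.
local flat_body = {
  aggregations = {
    hits_over_time = {
      date_histogram = { field = "request_at", calendar_interval = "month" },
    },
    unique_user_ids = { terms = { field = "user_id" } },
    response_time_average = { avg = { field = "response_time" } },
  },
}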
33 changes: 15 additions & 18 deletions src/api-umbrella/web-app/models/analytics_search_opensearch.lua
@@ -258,6 +258,10 @@ function _M:set_sort(order_fields)
   end
 end
 
+function _M:unset_sort()
+  self.body["sort"] = nil
+end
+
 function _M:set_start_time(start_time)
   local ok = xpcall(date_tz.parse, xpcall_error_handler, date_tz, format_date, start_time)
   if not ok then
@@ -395,21 +399,12 @@ function _M:aggregate_by_interval()
   }
 end
 
-function _M:aggregate_by_interval_for_summary()
-  self:aggregate_by_interval()
-
-  self.body["aggregations"]["hits_over_time"]["aggregations"] = {
-    unique_user_ids = {
-      terms = {
-        field = "user_id",
-        size = config["opensearch"]["max_buckets"],
-        shard_size = config["opensearch"]["max_buckets"] * 4,
-      },
-    },
-    response_time_average = {
-      avg = {
-        field = "response_time",
-      },
+function _M:aggregate_by_unique_user_ids()
+  self.body["aggregations"]["unique_user_ids"] = {
+    terms = {
+      field = "user_id",
+      size = config["opensearch"]["max_buckets"] - 5,
+      shard_size = config["opensearch"]["max_buckets"] * 4,
     },
   }
 end
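
The `- 5` above appears to be the fix for the bucket-limit error called out in the commit message; a sketch of the arithmetic with an illustrative limit value:

-- Illustrative numbers only. OpenSearch rejects responses whose total
-- bucket count across all aggregations exceeds the search.max_buckets
-- limit, so if the terms aggregation alone could return max_buckets
-- buckets, the sibling aggregations (hits_over_time,
-- response_time_average) would push the total over the limit.
local max_buckets = 65536             -- e.g. config["opensearch"]["max_buckets"]
local terms_size = max_buckets - 5    -- leave headroom for sibling buckets
assert(terms_size + 5 <= max_buckets) -- total stays within the limit
print(terms_size)                     --> 65531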
@@ -861,16 +856,18 @@ local function cache_interval_results_process_batch(self, cache_ids, batch)
         override_header = id_data["header"],
         override_body = id_data["body"],
       })
-      AnalyticsCache:upsert(id_data, results, expires_at)
+      if results then
+        table.insert(cache_ids, exist["id"])
+        AnalyticsCache:upsert(id_data, results, expires_at)
+      end
     else
       if not update_expires_at_ids[expires_at] then
         update_expires_at_ids[expires_at] = {}
       end
 
+      table.insert(cache_ids, exist["id"])
       table.insert(update_expires_at_ids[expires_at], exist["id"])
     end
-
-    table.insert(cache_ids, exist["id"])
   end
 
   for expires_at, ids in pairs(update_expires_at_ids) do
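
The reshuffled inserts above implement the "only cache items with a response" bullet; in isolation the guard pattern looks roughly like this (the fetch and cache names are hypothetical stand-ins):

-- Hypothetical stand-ins for the real query and upsert calls. A nil result
-- (e.g. a failed fetch) must not be written to the cache, and its id must
-- not be reported as cached, or later reads would mistake the missing
-- entry for a valid cached answer.
local function cache_if_present(cache, cache_ids, id, fetch)
  local results = fetch(id)
  if results then
    table.insert(cache_ids, id) -- only report ids that were actually cached
    cache[id] = results
  end
  return results
end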
20 changes: 20 additions & 0 deletions src/migrations.lua
@@ -1504,4 +1504,24 @@ return {
     db.query(grants_sql)
     db.query("COMMIT")
   end,
+
+  [1721347955] = function()
+    db.query("BEGIN")
+
+    db.query([[
+      CREATE OR REPLACE FUNCTION analytics_cache_extract_unique_user_ids()
+      RETURNS TRIGGER AS $$
+      BEGIN
+        IF (jsonb_typeof(NEW.data->'aggregations'->'unique_user_ids'->'buckets') = 'array') THEN
+          NEW.unique_user_ids := (SELECT array_agg(DISTINCT bucket->>'key')::uuid[] FROM jsonb_array_elements(NEW.data->'aggregations'->'unique_user_ids'->'buckets') AS bucket);
+        END IF;
+        RETURN NEW;
+      END;
+      $$ LANGUAGE plpgsql;
+    ]])
+
+    db.query(grants_sql)
+    db.query("COMMIT")
+  end,
 }
