diff --git a/db/schema.sql b/db/schema.sql
index 616321f3f..e98efe8bf 100644
--- a/db/schema.sql
+++ b/db/schema.sql
@@ -132,20 +132,6 @@ CREATE FUNCTION api_umbrella.distributed_rate_limit_counters_increment_version()
 $$;
 
 
---
--- Name: distributed_rate_limit_counters_temp_increment_version(); Type: FUNCTION; Schema: api_umbrella; Owner: -
---
-
-CREATE FUNCTION api_umbrella.distributed_rate_limit_counters_temp_increment_version() RETURNS trigger
-    LANGUAGE plpgsql
-    AS $$
-      BEGIN
-        NEW.version := nextval('distributed_rate_limit_counters_temp_version_seq');
-        return NEW;
-      END;
-      $$;
-
-
 --
 -- Name: path_sort_order(text); Type: FUNCTION; Schema: api_umbrella; Owner: -
 --
@@ -1064,7 +1050,6 @@ CREATE TABLE api_umbrella.rate_limits (
     api_backend_settings_id uuid,
     api_user_settings_id uuid,
     duration bigint NOT NULL,
-    accuracy bigint,
     limit_by character varying(7) NOT NULL,
     limit_to bigint NOT NULL,
     distributed boolean DEFAULT false NOT NULL,
@@ -1086,26 +1071,23 @@ CREATE TABLE api_umbrella.rate_limits (
 
 CREATE VIEW api_umbrella.api_users_flattened AS
  SELECT u.id,
-    u.version,
+    u.api_key_prefix,
     u.api_key_hash,
-    u.api_key_encrypted,
-    u.api_key_encrypted_iv,
     u.email,
     u.email_verified,
     u.registration_source,
     u.throttle_by_ip,
-    date_part('epoch'::text, u.disabled_at) AS disabled_at,
-    date_part('epoch'::text, u.created_at) AS created_at,
-    json_build_object('allowed_ips', s.allowed_ips, 'allowed_referers', s.allowed_referers, 'rate_limit_mode', s.rate_limit_mode, 'rate_limits', ( SELECT json_agg(r2.*) AS json_agg
+    (date_part('epoch'::text, u.disabled_at))::integer AS disabled_at,
+    (date_part('epoch'::text, u.created_at))::integer AS created_at,
+    jsonb_build_object('allowed_ips', s.allowed_ips, 'allowed_referers', s.allowed_referers, 'rate_limit_mode', s.rate_limit_mode, 'rate_limits', ( SELECT jsonb_agg(r2.*) AS jsonb_agg
           FROM ( SELECT r.duration,
-            r.accuracy,
             r.limit_by,
             r.limit_to,
             r.distributed,
             r.response_headers
            FROM api_umbrella.rate_limits r
           WHERE (r.api_user_settings_id = s.id)) r2)) AS settings,
-    ARRAY( SELECT ar.api_role_id
+    ( SELECT jsonb_object_agg(ar.api_role_id, true) AS jsonb_object_agg
            FROM api_umbrella.api_users_roles ar
           WHERE (ar.api_user_id = u.id)) AS roles
    FROM (api_umbrella.api_users u
@@ -1117,29 +1099,18 @@ CREATE VIEW api_umbrella.api_users_flattened AS
 --
 
 CREATE VIEW api_umbrella.api_users_flattened_temp AS
- SELECT u.id,
-    u.api_key_prefix,
-    u.api_key_hash,
-    u.email,
-    u.email_verified,
-    u.registration_source,
-    u.throttle_by_ip,
-    (date_part('epoch'::text, u.disabled_at))::integer AS disabled_at,
-    (date_part('epoch'::text, u.created_at))::integer AS created_at,
-    jsonb_build_object('allowed_ips', s.allowed_ips, 'allowed_referers', s.allowed_referers, 'rate_limit_mode', s.rate_limit_mode, 'rate_limits', ( SELECT jsonb_agg(r2.*) AS jsonb_agg
-          FROM ( SELECT r.duration,
-            r.accuracy,
-            r.limit_by,
-            r.limit_to,
-            r.distributed,
-            r.response_headers
-           FROM api_umbrella.rate_limits r
-          WHERE (r.api_user_settings_id = s.id)) r2)) AS settings,
-    ( SELECT jsonb_object_agg(ar.api_role_id, true) AS jsonb_object_agg
-           FROM api_umbrella.api_users_roles ar
-          WHERE (ar.api_user_id = u.id)) AS roles
-   FROM (api_umbrella.api_users u
-     LEFT JOIN api_umbrella.api_user_settings s ON ((u.id = s.api_user_id)));
+ SELECT api_users_flattened.id,
+    api_users_flattened.api_key_prefix,
+    api_users_flattened.api_key_hash,
+    api_users_flattened.email,
+    api_users_flattened.email_verified,
+    api_users_flattened.registration_source,
+    api_users_flattened.throttle_by_ip,
+    api_users_flattened.disabled_at,
+    api_users_flattened.created_at,
+    api_users_flattened.settings,
+    api_users_flattened.roles
+   FROM api_umbrella.api_users_flattened;
 
 
 --
@@ -1180,15 +1151,15 @@ CREATE UNLOGGED TABLE api_umbrella.distributed_rate_limit_counters (
 
 
 --
--- Name: distributed_rate_limit_counters_temp; Type: TABLE; Schema: api_umbrella; Owner: -
+-- Name: distributed_rate_limit_counters_temp; Type: VIEW; Schema: api_umbrella; Owner: -
 --
 
-CREATE UNLOGGED TABLE api_umbrella.distributed_rate_limit_counters_temp (
-    id character varying(500) NOT NULL,
-    version bigint NOT NULL,
-    value bigint NOT NULL,
-    expires_at timestamp with time zone NOT NULL
-);
+CREATE VIEW api_umbrella.distributed_rate_limit_counters_temp AS
+ SELECT distributed_rate_limit_counters.id,
+    distributed_rate_limit_counters.version,
+    distributed_rate_limit_counters.value,
+    distributed_rate_limit_counters.expires_at
+   FROM api_umbrella.distributed_rate_limit_counters;
 
 
 --
@@ -1196,12 +1167,11 @@ CREATE UNLOGGED TABLE api_umbrella.distributed_rate_limit_counters_temp (
 --
 
 CREATE SEQUENCE api_umbrella.distributed_rate_limit_counters_temp_version_seq
-    START WITH -9223372036854775807
+    START WITH 1
     INCREMENT BY 1
-    MINVALUE -9223372036854775807
+    NO MINVALUE
     NO MAXVALUE
-    CACHE 1
-    CYCLE;
+    CACHE 1;
 
 
 --
@@ -1800,14 +1770,6 @@ ALTER TABLE ONLY api_umbrella.distributed_rate_limit_counters
     ADD CONSTRAINT distributed_rate_limit_counters_pkey PRIMARY KEY (id);
 
 
---
--- Name: distributed_rate_limit_counters_temp distributed_rate_limit_counters_temp_pkey; Type: CONSTRAINT; Schema: api_umbrella; Owner: -
---
-
-ALTER TABLE ONLY api_umbrella.distributed_rate_limit_counters_temp
-    ADD CONSTRAINT distributed_rate_limit_counters_temp_pkey PRIMARY KEY (id);
-
-
 --
 -- Name: lapis_migrations lapis_migrations_pkey; Type: CONSTRAINT; Schema: api_umbrella; Owner: -
 --
@@ -2005,27 +1967,6 @@ CREATE INDEX cache_expires_at_idx ON api_umbrella.cache USING btree (expires_at)
 CREATE INDEX distributed_rate_limit_counters_expires_at_idx ON api_umbrella.distributed_rate_limit_counters USING btree (expires_at);
 
 
---
--- Name: distributed_rate_limit_counters_temp_expires_at_idx; Type: INDEX; Schema: api_umbrella; Owner: -
---
-
-CREATE INDEX distributed_rate_limit_counters_temp_expires_at_idx ON api_umbrella.distributed_rate_limit_counters_temp USING btree (expires_at);
-
-
---
--- Name: distributed_rate_limit_counters_temp_version_expires_at_idx; Type: INDEX; Schema: api_umbrella; Owner: -
---
-
-CREATE INDEX distributed_rate_limit_counters_temp_version_expires_at_idx ON api_umbrella.distributed_rate_limit_counters_temp USING btree (version, expires_at);
-
-
---
--- Name: distributed_rate_limit_counters_temp_version_idx; Type: INDEX; Schema: api_umbrella; Owner: -
---
-
-CREATE UNIQUE INDEX distributed_rate_limit_counters_temp_version_idx ON api_umbrella.distributed_rate_limit_counters_temp USING btree (version);
-
-
 --
 -- Name: distributed_rate_limit_counters_version_expires_at_idx; Type: INDEX; Schema: api_umbrella; Owner: -
 --
@@ -2579,13 +2520,6 @@ CREATE TRIGGER cache_stamp_record BEFORE UPDATE ON api_umbrella.cache FOR EACH R
 CREATE TRIGGER distributed_rate_limit_counters_increment_version_trigger BEFORE INSERT OR UPDATE ON api_umbrella.distributed_rate_limit_counters FOR EACH ROW EXECUTE FUNCTION api_umbrella.distributed_rate_limit_counters_increment_version();
 
 
---
--- Name: distributed_rate_limit_counters_temp distributed_rate_limit_counters_temp_increment_version_trigger; Type: TRIGGER; Schema: api_umbrella; Owner: -
---
-
-CREATE TRIGGER distributed_rate_limit_counters_temp_increment_version_trigger BEFORE INSERT OR UPDATE ON api_umbrella.distributed_rate_limit_counters_temp FOR EACH ROW EXECUTE FUNCTION api_umbrella.distributed_rate_limit_counters_temp_increment_version();
-
-
 --
 -- Name: published_config published_config_stamp_record; Type: TRIGGER; Schema: api_umbrella; Owner: -
 --
@@ -2788,3 +2722,4 @@ INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1635022846');
 INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1645733075');
 INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1647916501');
 INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1651280172');
+INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1699559596');
diff --git a/src/api-umbrella/proxy/jobs/db_expirations.lua b/src/api-umbrella/proxy/jobs/db_expirations.lua
index e07ab9cf4..20d747e7e 100644
--- a/src/api-umbrella/proxy/jobs/db_expirations.lua
+++ b/src/api-umbrella/proxy/jobs/db_expirations.lua
@@ -9,8 +9,7 @@ local function do_run()
   local queries = {
     "DELETE FROM analytics_cache WHERE expires_at IS NOT NULL AND expires_at < now()",
     "DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < now()",
-    -- TODO: Remove "_temp" once done testing new rate limiting strategy in parallel.
-    "DELETE FROM distributed_rate_limit_counters_temp WHERE expires_at < now()",
+    "DELETE FROM distributed_rate_limit_counters WHERE expires_at < now()",
     "DELETE FROM sessions WHERE expires_at < now()",
   }
   for _, query in ipairs(queries) do
diff --git a/src/api-umbrella/proxy/stores/api_users_store.lua b/src/api-umbrella/proxy/stores/api_users_store.lua
index 5535ca9c9..286ecd952 100644
--- a/src/api-umbrella/proxy/stores/api_users_store.lua
+++ b/src/api-umbrella/proxy/stores/api_users_store.lua
@@ -54,12 +54,9 @@ local function fetch_user(api_key_prefix, api_key)
   -- Allow for retries to deal with sporadic database connection issues.
   -- Ideally this shouldn't be necessary, but enable while we're debugging some
   -- sporadic connection issues.
-  --
-  -- TODO: Replace with `api_users_flattened` once we're done testing different
-  -- approaches in parallel.
   local result, err
   for i = 1, 5 do
-    result, err = query("SELECT * FROM api_users_flattened_temp WHERE api_key_prefix = :api_key_prefix", { api_key_prefix = api_key_prefix })
+    result, err = query("SELECT * FROM api_users_flattened WHERE api_key_prefix = :api_key_prefix", { api_key_prefix = api_key_prefix })
     if not result then
       ngx.log(ngx.ERR, "failed to fetch user from database, attempt " .. i .. ": ", err)
       sleep(0.1)
diff --git a/src/api-umbrella/proxy/stores/rate_limit_counters_store.lua b/src/api-umbrella/proxy/stores/rate_limit_counters_store.lua
index a6b95f48a..b28be00f0 100644
--- a/src/api-umbrella/proxy/stores/rate_limit_counters_store.lua
+++ b/src/api-umbrella/proxy/stores/rate_limit_counters_store.lua
@@ -331,8 +331,7 @@ function _M.distributed_push()
       local period_start_time = tonumber(key_parts[5])
       local expires_at = ceil(period_start_time + duration * 2 + 1)
 
-      -- TODO: Remove "_temp" once done testing new rate limiting strategy in parallel.
-      local result, err = pg_utils_query("INSERT INTO distributed_rate_limit_counters_temp(id, value, expires_at) VALUES(:id, :value, to_timestamp(:expires_at)) ON CONFLICT (id) DO UPDATE SET value = distributed_rate_limit_counters_temp.value + EXCLUDED.value", {
+      local result, err = pg_utils_query("INSERT INTO distributed_rate_limit_counters(id, value, expires_at) VALUES(:id, :value, to_timestamp(:expires_at)) ON CONFLICT (id) DO UPDATE SET value = distributed_rate_limit_counters.value + EXCLUDED.value", {
        id = key,
        value = count,
        expires_at = expires_at,
@@ -372,8 +371,7 @@ function _M.distributed_pull()
   -- cycle and start over with negative values. Since the data in this table
   -- expires, there shouldn't be any duplicate version numbers by the time the
   -- sequence cycles.
-  -- TODO: Remove "_temp" once done testing new rate limiting strategy in parallel.
-  local results, err = pg_utils_query("SELECT id, version, value, extract(epoch FROM expires_at) AS expires_at FROM distributed_rate_limit_counters_temp WHERE version > LEAST(:version, (SELECT last_value - 1 FROM distributed_rate_limit_counters_temp_version_seq)) AND expires_at >= now() ORDER BY version DESC", { version = last_fetched_version }, { quiet = true })
+  local results, err = pg_utils_query("SELECT id, version, value, extract(epoch FROM expires_at) AS expires_at FROM distributed_rate_limit_counters WHERE version > LEAST(:version, (SELECT last_value - 1 FROM distributed_rate_limit_counters_version_seq)) AND expires_at >= now() ORDER BY version DESC", { version = last_fetched_version }, { quiet = true })
   if not results then
     ngx.log(ngx.ERR, "failed to fetch rate limits from database: ", err)
     return nil
diff --git a/src/api-umbrella/version.txt b/src/api-umbrella/version.txt
index a32b1dc9a..3fbcbfd8a 100644
--- a/src/api-umbrella/version.txt
+++ b/src/api-umbrella/version.txt
@@ -1 +1 @@
-0.17.0-pre15
+0.17.0-pre16
diff --git a/src/migrations.lua b/src/migrations.lua
index 4f955dcc1..ca570c337 100644
--- a/src/migrations.lua
+++ b/src/migrations.lua
@@ -3,6 +3,9 @@ local json_encode = require "api-umbrella.utils.json_encode"
 local path_join = require "api-umbrella.utils.path_join"
 local readfile = require("pl.utils").readfile
 
+local grants_sql_path = path_join(os.getenv("API_UMBRELLA_SRC_ROOT"), "db/grants.sql")
+local grants_sql = readfile(grants_sql_path, true)
+
 return {
   [1498350289] = function()
     db.query("START TRANSACTION")
@@ -1090,7 +1093,7 @@ return {
   end,
 
   [1635022846] = function()
-    -- TODO: Drop and replace `api_users_flattened` view once we're done
+    -- Done (1699559596): Drop and replace `api_users_flattened` view once we're done
     -- testing two different stacks in parallel. But for now, keep the old view
    -- as-is so we can test this new one separately.
     db.query([[
@@ -1180,7 +1183,7 @@ return {
   [1651280172] = function()
     db.query("BEGIN")
 
-    -- TODO: Drop column altogether and remove from api_users_flattened view
+    -- Done (1699559596): Drop column altogether and remove from api_users_flattened view
     -- once we're not testing the two different rate limiting approaches in
     -- parallel. But keep for now while some systems still use the accuracy
     -- approach.
@@ -1188,7 +1191,7 @@ return {
 
     db.query("ALTER TABLE distributed_rate_limit_counters SET UNLOGGED")
 
-    -- TODO: Drop this "temp" version of the table once we're done testing two
+    -- Done (1699559596): Drop this "temp" version of the table once we're done testing two
     -- different rate limit approaches in parallel. But we're keeping a
     -- separate table for testing the new rate limit implementation so there's
     -- not mixup between the different key types.
@@ -1217,4 +1220,97 @@ return {
 
     db.query("COMMIT")
   end,
+
+  [1699559596] = function()
+    db.query("BEGIN")
+
+    -- Make the temp version the live version, removing the unused "accuracy"
+    -- references. But still maintain the temp version for rollout purposes.
+    db.query([[
+      DROP VIEW api_users_flattened;
+      CREATE VIEW api_users_flattened AS
+        SELECT
+          u.id,
+          u.api_key_prefix,
+          u.api_key_hash,
+          u.email,
+          u.email_verified,
+          u.registration_source,
+          u.throttle_by_ip,
+          extract(epoch from u.disabled_at)::int AS disabled_at,
+          extract(epoch from u.created_at)::int AS created_at,
+          jsonb_build_object(
+            'allowed_ips', s.allowed_ips,
+            'allowed_referers', s.allowed_referers,
+            'rate_limit_mode', s.rate_limit_mode,
+            'rate_limits', (
+              SELECT jsonb_agg(r2.*)
+              FROM (
+                SELECT
+                  r.duration,
+                  r.limit_by,
+                  r.limit_to,
+                  r.distributed,
+                  r.response_headers
+                FROM rate_limits AS r
+                WHERE r.api_user_settings_id = s.id
+              ) AS r2
+            )
+          ) AS settings,
+          (
+            SELECT jsonb_object_agg(ar.api_role_id, true)
+            FROM api_users_roles AS ar WHERE ar.api_user_id = u.id
+          ) AS roles
+        FROM api_users AS u
+          LEFT JOIN api_user_settings AS s ON u.id = s.api_user_id;
+
+      DROP VIEW api_users_flattened_temp;
+      CREATE VIEW api_users_flattened_temp AS
+        SELECT * FROM api_users_flattened;
+    ]])
+
+    -- Drop unused column (from 1651280172)
+    db.query("ALTER TABLE rate_limits DROP COLUMN accuracy")
+
+    -- Make "temp" versions the live versions (from 1651280172)
+    db.query("DROP TABLE distributed_rate_limit_counters")
+    db.query("ALTER TABLE distributed_rate_limit_counters_temp RENAME TO distributed_rate_limit_counters")
+    db.query("ALTER TABLE distributed_rate_limit_counters RENAME CONSTRAINT distributed_rate_limit_counters_temp_pkey TO distributed_rate_limit_counters_pkey")
+    db.query("ALTER INDEX distributed_rate_limit_counters_temp_expires_at_idx RENAME TO distributed_rate_limit_counters_expires_at_idx")
+    db.query("ALTER INDEX distributed_rate_limit_counters_temp_version_expires_at_idx RENAME TO distributed_rate_limit_counters_version_expires_at_idx")
+    db.query("ALTER INDEX distributed_rate_limit_counters_temp_version_idx RENAME TO distributed_rate_limit_counters_version_idx")
+    db.query("DROP SEQUENCE distributed_rate_limit_counters_version_seq")
+    db.query("ALTER SEQUENCE distributed_rate_limit_counters_temp_version_seq RENAME TO distributed_rate_limit_counters_version_seq")
+    db.query("CREATE SEQUENCE distributed_rate_limit_counters_temp_version_seq")
+    db.query([[
+      CREATE OR REPLACE FUNCTION distributed_rate_limit_counters_increment_version()
+      RETURNS TRIGGER AS $$
+      BEGIN
+        NEW.version := nextval('distributed_rate_limit_counters_version_seq');
+        return NEW;
+      END;
+      $$ LANGUAGE plpgsql;
+    ]])
+    db.query("DROP TRIGGER distributed_rate_limit_counters_temp_increment_version_trigger ON distributed_rate_limit_counters")
+    db.query("DROP FUNCTION distributed_rate_limit_counters_temp_increment_version")
+    db.query("CREATE TRIGGER distributed_rate_limit_counters_increment_version_trigger BEFORE INSERT OR UPDATE ON distributed_rate_limit_counters FOR EACH ROW EXECUTE PROCEDURE distributed_rate_limit_counters_increment_version()")
+
+    -- Maintain a "_temp" version for compatibility with rollout.
+    db.query("CREATE VIEW distributed_rate_limit_counters_temp AS SELECT * FROM distributed_rate_limit_counters")
+
+    db.query(grants_sql)
+    db.query("COMMIT")
+  end,
+
+  -- TODO: Enable to finish _temp cleanup.
+  -- [1699559696] = function()
+  --   db.query("BEGIN")
+
+  --   db.query("DROP VIEW distributed_rate_limit_counters_temp")
+  --   db.query("DROP SEQUENCE distributed_rate_limit_counters_temp_version_seq")
+  --   db.query("DROP VIEW api_users_flattened_temp")
+
+  --   db.query(grants_sql)
+  --   db.query("COMMIT")
+  -- end,
 }
diff --git a/test/proxy/rate_limits/test_distributed_rate_limits.rb b/test/proxy/rate_limits/test_distributed_rate_limits.rb
index 5f049202e..64bb205af 100644
--- a/test/proxy/rate_limits/test_distributed_rate_limits.rb
+++ b/test/proxy/rate_limits/test_distributed_rate_limits.rb
@@ -337,8 +337,7 @@ def test_polls_for_distributed_changes
     begin
       # Alter the sequence so that the next value is near the boundary for
       # bigints.
-      # TODO: Remove "_temp" once done testing new rate limiting strategy in parallel.
-      DistributedRateLimitCounter.connection.execute("ALTER SEQUENCE distributed_rate_limit_counters_temp_version_seq RESTART WITH #{sequence_start_val}")
+      DistributedRateLimitCounter.connection.execute("ALTER SEQUENCE distributed_rate_limit_counters_version_seq RESTART WITH #{sequence_start_val}")
 
       # Manually set the distributed count to insert a single record.
       set_distributed_count(20, options)
@@ -393,8 +392,7 @@ def test_polls_for_distributed_changes
       assert_equal(99, counter.value)
     ensure
       # Restore default sequence settings.
-      # TODO: Remove "_temp" once done testing new rate limiting strategy in parallel.
-      DistributedRateLimitCounter.connection.execute("ALTER SEQUENCE distributed_rate_limit_counters_temp_version_seq RESTART WITH -9223372036854775807")
+      DistributedRateLimitCounter.connection.execute("ALTER SEQUENCE distributed_rate_limit_counters_version_seq RESTART WITH -9223372036854775807")
     end
   end
 end
@@ -435,8 +433,7 @@ def set_distributed_count(count, options = {})
     key = "k|#{format("%g", duration_sec)}|#{host}|#{options.fetch(:api_user).api_key_prefix}|#{period_start_time}"
     expires_at = Time.at((period_start_time + (duration_sec * 2) + 60).ceil).utc
 
-    # TODO: Remove "_temp" once done testing new rate limiting strategy in parallel.
-    DistributedRateLimitCounter.connection.execute("INSERT INTO distributed_rate_limit_counters_temp(id, value, expires_at) VALUES(#{DistributedRateLimitCounter.connection.quote(key)}, #{DistributedRateLimitCounter.connection.quote(count)}, #{DistributedRateLimitCounter.connection.quote(expires_at)}) ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value")
+    DistributedRateLimitCounter.connection.execute("INSERT INTO distributed_rate_limit_counters(id, value, expires_at) VALUES(#{DistributedRateLimitCounter.connection.quote(key)}, #{DistributedRateLimitCounter.connection.quote(count)}, #{DistributedRateLimitCounter.connection.quote(expires_at)}) ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value")
   end
 
   def assert_distributed_count(expected_count, options = {})
diff --git a/test/support/models/distributed_rate_limit_counter.rb b/test/support/models/distributed_rate_limit_counter.rb
index c3bd286f5..4ef7efc75 100644
--- a/test/support/models/distributed_rate_limit_counter.rb
+++ b/test/support/models/distributed_rate_limit_counter.rb
@@ -1,4 +1,3 @@
 class DistributedRateLimitCounter < ApplicationRecord
-  # TODO: Remove "_temp" once done testing new rate limiting strategy in parallel.
-  self.table_name = "distributed_rate_limit_counters_temp"
+  self.table_name = "distributed_rate_limit_counters"
 end
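
A note on the version-stamping scheme this patch promotes out of its "_temp" staging names: a trigger stamps every insert or update to the counters table with the next value of a cycling bigint sequence, and each poller in distributed_pull fetches only rows stamped after the highest version it has already processed. The LEAST(:version, last_value - 1) guard drops the poller's bookmark back to just behind the sequence's current position, so the query stays correct even after the sequence wraps around. A minimal self-contained sketch of the scheme — the demo_* names and the bookmark value are hypothetical stand-ins, not the api_umbrella objects:

-- Sketch only: demo_* names are hypothetical, not the real schema.
CREATE SEQUENCE demo_version_seq
    START WITH -9223372036854775807
    MINVALUE -9223372036854775807
    NO MAXVALUE
    CYCLE;

CREATE UNLOGGED TABLE demo_counters (
    id character varying(500) PRIMARY KEY,
    version bigint NOT NULL,
    value bigint NOT NULL,
    expires_at timestamp with time zone NOT NULL
);

CREATE FUNCTION demo_increment_version() RETURNS trigger
    LANGUAGE plpgsql
    AS $$
    BEGIN
      -- Stamp every write so pollers can ask "what changed since version X?"
      -- instead of rescanning the whole table.
      NEW.version := nextval('demo_version_seq');
      RETURN NEW;
    END;
    $$;

CREATE TRIGGER demo_increment_version_trigger
    BEFORE INSERT OR UPDATE ON demo_counters
    FOR EACH ROW EXECUTE FUNCTION demo_increment_version();

-- Poll for changes since a remembered bookmark (the value here is arbitrary).
-- LEAST(...) falls back to just behind the sequence's current position, which
-- keeps the query correct if the sequence has cycled past the bookmark.
SELECT id, version, value
  FROM demo_counters
 WHERE version > LEAST(-9000000000000000000, (SELECT last_value - 1 FROM demo_version_seq))
   AND expires_at >= now()
 ORDER BY version DESC;

The UNLOGGED table presumably fits for the same reason as the real one: the counters are short-lived, expiring data, so skipping WAL is a reasonable trade for faster writes.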
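The 1699559596 migration promotes the "_temp" table by renaming it and its constraint, indexes, and sequence rather than copying data, then leaves "_temp"-named views and a spare sequence behind so processes still running the previous release keep resolving the old names until the (commented-out) follow-up migration removes them. A condensed sketch of that rename-then-shim pattern, separate from the sketch above and again with hypothetical demo_* names (it assumes a staging table demo_counters_temp exists and the live name is free):

BEGIN;
-- Promote the staging table to the live name; renames are metadata-only,
-- so no row data is copied.
ALTER TABLE demo_counters_temp RENAME TO demo_counters;
ALTER INDEX demo_counters_temp_expires_at_idx RENAME TO demo_counters_expires_at_idx;
ALTER SEQUENCE demo_counters_temp_version_seq RENAME TO demo_counters_version_seq;

-- Leave a shim behind under the old name so readers on the previous
-- release keep working during the rolling deploy.
CREATE VIEW demo_counters_temp AS SELECT * FROM demo_counters;
COMMIT;

-- Follow-up migration, once every process runs the new release:
-- DROP VIEW demo_counters_temp;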
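In the reworked api_users_flattened view, roles switch from an array (ARRAY(SELECT ar.api_role_id ...)) to a jsonb object keyed by role id (jsonb_object_agg(ar.api_role_id, true)), so a consumer can test membership with a key lookup instead of scanning a list. A self-contained illustration with made-up role names:

-- Aggregate a set of role ids into a jsonb object of {"role": true} pairs.
SELECT jsonb_object_agg(role_id, true) AS roles
  FROM (VALUES ('admin'), ('analytics')) AS r(role_id);
--  => {"admin": true, "analytics": true}

-- Membership is then a key-existence test rather than an array scan.
SELECT '{"admin": true, "analytics": true}'::jsonb ? 'admin';
--  => true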