Begin to clean up "_temp" naming in tables.
Run grants inside the migration to ensure no downtime during rollouts.
GUI committed Nov 10, 2023
1 parent 2dce21b commit 6ff52a3
Showing 8 changed files with 136 additions and 115 deletions.
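The rollout-safety pattern this commit applies: promote each "_temp" object to its live name, leave a thin "_temp" compatibility view behind for processes that still reference the old name, and re-run the grants file in the same transaction so newly created objects are readable the instant the migration commits. A minimal sketch of the pattern, using a hypothetical `widgets` table and `app_reader` role (neither appears in this commit):

```sql
BEGIN;

-- Promote the staging table to the live name. Renaming preserves the
-- table's existing privileges, but any objects created below start
-- with none.
DROP TABLE widgets;
ALTER TABLE widgets_temp RENAME TO widgets;

-- Leave a compatibility alias so application processes that still
-- reference "widgets_temp" keep working until the rollout completes.
CREATE VIEW widgets_temp AS SELECT * FROM widgets;

-- Re-grant inside the same transaction: the new view has no
-- privileges yet, so deferring grants to a separate deploy step
-- would surface permission errors mid-rollout.
GRANT SELECT, INSERT, UPDATE, DELETE ON widgets TO app_reader;
GRANT SELECT, INSERT, UPDATE, DELETE ON widgets_temp TO app_reader;

COMMIT;
```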
121 changes: 28 additions & 93 deletions db/schema.sql
@@ -132,20 +132,6 @@ CREATE FUNCTION api_umbrella.distributed_rate_limit_counters_increment_version()
$$;


--
-- Name: distributed_rate_limit_counters_temp_increment_version(); Type: FUNCTION; Schema: api_umbrella; Owner: -
--

CREATE FUNCTION api_umbrella.distributed_rate_limit_counters_temp_increment_version() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
NEW.version := nextval('distributed_rate_limit_counters_temp_version_seq');
return NEW;
END;
$$;


--
-- Name: path_sort_order(text); Type: FUNCTION; Schema: api_umbrella; Owner: -
--
@@ -1064,7 +1050,6 @@ CREATE TABLE api_umbrella.rate_limits (
api_backend_settings_id uuid,
api_user_settings_id uuid,
duration bigint NOT NULL,
accuracy bigint,
limit_by character varying(7) NOT NULL,
limit_to bigint NOT NULL,
distributed boolean DEFAULT false NOT NULL,
@@ -1086,26 +1071,23 @@ CREATE TABLE api_umbrella.rate_limits (

CREATE VIEW api_umbrella.api_users_flattened AS
SELECT u.id,
u.version,
u.api_key_prefix,
u.api_key_hash,
u.api_key_encrypted,
u.api_key_encrypted_iv,
u.email,
u.email_verified,
u.registration_source,
u.throttle_by_ip,
date_part('epoch'::text, u.disabled_at) AS disabled_at,
date_part('epoch'::text, u.created_at) AS created_at,
json_build_object('allowed_ips', s.allowed_ips, 'allowed_referers', s.allowed_referers, 'rate_limit_mode', s.rate_limit_mode, 'rate_limits', ( SELECT json_agg(r2.*) AS json_agg
(date_part('epoch'::text, u.disabled_at))::integer AS disabled_at,
(date_part('epoch'::text, u.created_at))::integer AS created_at,
jsonb_build_object('allowed_ips', s.allowed_ips, 'allowed_referers', s.allowed_referers, 'rate_limit_mode', s.rate_limit_mode, 'rate_limits', ( SELECT jsonb_agg(r2.*) AS jsonb_agg
FROM ( SELECT r.duration,
r.accuracy,
r.limit_by,
r.limit_to,
r.distributed,
r.response_headers
FROM api_umbrella.rate_limits r
WHERE (r.api_user_settings_id = s.id)) r2)) AS settings,
ARRAY( SELECT ar.api_role_id
( SELECT jsonb_object_agg(ar.api_role_id, true) AS jsonb_object_agg
FROM api_umbrella.api_users_roles ar
WHERE (ar.api_user_id = u.id)) AS roles
FROM (api_umbrella.api_users u
Expand All @@ -1117,29 +1099,18 @@ CREATE VIEW api_umbrella.api_users_flattened AS
--

CREATE VIEW api_umbrella.api_users_flattened_temp AS
SELECT u.id,
u.api_key_prefix,
u.api_key_hash,
u.email,
u.email_verified,
u.registration_source,
u.throttle_by_ip,
(date_part('epoch'::text, u.disabled_at))::integer AS disabled_at,
(date_part('epoch'::text, u.created_at))::integer AS created_at,
jsonb_build_object('allowed_ips', s.allowed_ips, 'allowed_referers', s.allowed_referers, 'rate_limit_mode', s.rate_limit_mode, 'rate_limits', ( SELECT jsonb_agg(r2.*) AS jsonb_agg
FROM ( SELECT r.duration,
r.accuracy,
r.limit_by,
r.limit_to,
r.distributed,
r.response_headers
FROM api_umbrella.rate_limits r
WHERE (r.api_user_settings_id = s.id)) r2)) AS settings,
( SELECT jsonb_object_agg(ar.api_role_id, true) AS jsonb_object_agg
FROM api_umbrella.api_users_roles ar
WHERE (ar.api_user_id = u.id)) AS roles
FROM (api_umbrella.api_users u
LEFT JOIN api_umbrella.api_user_settings s ON ((u.id = s.api_user_id)));
SELECT api_users_flattened.id,
api_users_flattened.api_key_prefix,
api_users_flattened.api_key_hash,
api_users_flattened.email,
api_users_flattened.email_verified,
api_users_flattened.registration_source,
api_users_flattened.throttle_by_ip,
api_users_flattened.disabled_at,
api_users_flattened.created_at,
api_users_flattened.settings,
api_users_flattened.roles
FROM api_umbrella.api_users_flattened;
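Beyond the "_temp" passthrough above, the rebuilt `api_users_flattened` view switches roles from an `ARRAY(...)` subquery to `jsonb_object_agg(ar.api_role_id, true)`, so consumers get an object keyed by role ID and can test membership with a key lookup instead of scanning an array. A small standalone illustration (hypothetical role values, not part of this schema):

```sql
-- Aggregate rows into a jsonb object keyed by role ID.
WITH user_roles(api_role_id) AS (VALUES ('admin'), ('editor'))
SELECT jsonb_object_agg(api_role_id, true) FROM user_roles;
-- => {"admin": true, "editor": true}

-- Membership then becomes a key-existence test via the ? operator.
SELECT '{"admin": true, "editor": true}'::jsonb ? 'admin';
-- => true
```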


--
@@ -1180,28 +1151,27 @@ CREATE UNLOGGED TABLE api_umbrella.distributed_rate_limit_counters (


--
-- Name: distributed_rate_limit_counters_temp; Type: TABLE; Schema: api_umbrella; Owner: -
-- Name: distributed_rate_limit_counters_temp; Type: VIEW; Schema: api_umbrella; Owner: -
--

CREATE UNLOGGED TABLE api_umbrella.distributed_rate_limit_counters_temp (
id character varying(500) NOT NULL,
version bigint NOT NULL,
value bigint NOT NULL,
expires_at timestamp with time zone NOT NULL
);
CREATE VIEW api_umbrella.distributed_rate_limit_counters_temp AS
SELECT distributed_rate_limit_counters.id,
distributed_rate_limit_counters.version,
distributed_rate_limit_counters.value,
distributed_rate_limit_counters.expires_at
FROM api_umbrella.distributed_rate_limit_counters;


--
-- Name: distributed_rate_limit_counters_temp_version_seq; Type: SEQUENCE; Schema: api_umbrella; Owner: -
--

CREATE SEQUENCE api_umbrella.distributed_rate_limit_counters_temp_version_seq
START WITH -9223372036854775807
START WITH 1
INCREMENT BY 1
MINVALUE -9223372036854775807
NO MINVALUE
NO MAXVALUE
CACHE 1
CYCLE;
CACHE 1;


--
@@ -1800,14 +1770,6 @@ ALTER TABLE ONLY api_umbrella.distributed_rate_limit_counters
ADD CONSTRAINT distributed_rate_limit_counters_pkey PRIMARY KEY (id);


--
-- Name: distributed_rate_limit_counters_temp distributed_rate_limit_counters_temp_pkey; Type: CONSTRAINT; Schema: api_umbrella; Owner: -
--

ALTER TABLE ONLY api_umbrella.distributed_rate_limit_counters_temp
ADD CONSTRAINT distributed_rate_limit_counters_temp_pkey PRIMARY KEY (id);


--
-- Name: lapis_migrations lapis_migrations_pkey; Type: CONSTRAINT; Schema: api_umbrella; Owner: -
--
@@ -2005,27 +1967,6 @@ CREATE INDEX cache_expires_at_idx ON api_umbrella.cache USING btree (expires_at)
CREATE INDEX distributed_rate_limit_counters_expires_at_idx ON api_umbrella.distributed_rate_limit_counters USING btree (expires_at);


--
-- Name: distributed_rate_limit_counters_temp_expires_at_idx; Type: INDEX; Schema: api_umbrella; Owner: -
--

CREATE INDEX distributed_rate_limit_counters_temp_expires_at_idx ON api_umbrella.distributed_rate_limit_counters_temp USING btree (expires_at);


--
-- Name: distributed_rate_limit_counters_temp_version_expires_at_idx; Type: INDEX; Schema: api_umbrella; Owner: -
--

CREATE INDEX distributed_rate_limit_counters_temp_version_expires_at_idx ON api_umbrella.distributed_rate_limit_counters_temp USING btree (version, expires_at);


--
-- Name: distributed_rate_limit_counters_temp_version_idx; Type: INDEX; Schema: api_umbrella; Owner: -
--

CREATE UNIQUE INDEX distributed_rate_limit_counters_temp_version_idx ON api_umbrella.distributed_rate_limit_counters_temp USING btree (version);


--
-- Name: distributed_rate_limit_counters_version_expires_at_idx; Type: INDEX; Schema: api_umbrella; Owner: -
--
@@ -2579,13 +2520,6 @@ CREATE TRIGGER cache_stamp_record BEFORE UPDATE ON api_umbrella.cache FOR EACH R
CREATE TRIGGER distributed_rate_limit_counters_increment_version_trigger BEFORE INSERT OR UPDATE ON api_umbrella.distributed_rate_limit_counters FOR EACH ROW EXECUTE FUNCTION api_umbrella.distributed_rate_limit_counters_increment_version();


--
-- Name: distributed_rate_limit_counters_temp distributed_rate_limit_counters_temp_increment_version_trigger; Type: TRIGGER; Schema: api_umbrella; Owner: -
--

CREATE TRIGGER distributed_rate_limit_counters_temp_increment_version_trigger BEFORE INSERT OR UPDATE ON api_umbrella.distributed_rate_limit_counters_temp FOR EACH ROW EXECUTE FUNCTION api_umbrella.distributed_rate_limit_counters_temp_increment_version();


--
-- Name: published_config published_config_stamp_record; Type: TRIGGER; Schema: api_umbrella; Owner: -
--
@@ -2788,3 +2722,4 @@ INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1635022846');
INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1645733075');
INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1647916501');
INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1651280172');
INSERT INTO api_umbrella.lapis_migrations (name) VALUES ('1699559596');
3 changes: 1 addition & 2 deletions src/api-umbrella/proxy/jobs/db_expirations.lua
@@ -9,8 +9,7 @@ local function do_run()
local queries = {
"DELETE FROM analytics_cache WHERE expires_at IS NOT NULL AND expires_at < now()",
"DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < now()",
-- TODO: Remove "_temp" once done testing new rate limiting strategy in parallel.
"DELETE FROM distributed_rate_limit_counters_temp WHERE expires_at < now()",
"DELETE FROM distributed_rate_limit_counters WHERE expires_at < now()",
"DELETE FROM sessions WHERE expires_at < now()",
}
for _, query in ipairs(queries) do
5 changes: 1 addition & 4 deletions src/api-umbrella/proxy/stores/api_users_store.lua
@@ -54,12 +54,9 @@ local function fetch_user(api_key_prefix, api_key)
-- Allow for retries to deal with sporadic database connection issues.
-- Ideally this shouldn't be necessary, but enable while we're debugging some
-- sporadic connection issues.
--
-- TODO: Replace with `api_users_flattened` once we're done testing different
-- approaches in parallel.
local result, err
for i = 1, 5 do
result, err = query("SELECT * FROM api_users_flattened_temp WHERE api_key_prefix = :api_key_prefix", { api_key_prefix = api_key_prefix })
result, err = query("SELECT * FROM api_users_flattened WHERE api_key_prefix = :api_key_prefix", { api_key_prefix = api_key_prefix })
if not result then
ngx.log(ngx.ERR, "failed to fetch user from database, attempt " .. i .. ": ", err)
sleep(0.1)
6 changes: 2 additions & 4 deletions src/api-umbrella/proxy/stores/rate_limit_counters_store.lua
@@ -331,8 +331,7 @@ function _M.distributed_push()
local period_start_time = tonumber(key_parts[5])
local expires_at = ceil(period_start_time + duration * 2 + 1)

-- TODO: Remove "_temp" once done testing new rate limiting strategy in parallel.
local result, err = pg_utils_query("INSERT INTO distributed_rate_limit_counters_temp(id, value, expires_at) VALUES(:id, :value, to_timestamp(:expires_at)) ON CONFLICT (id) DO UPDATE SET value = distributed_rate_limit_counters_temp.value + EXCLUDED.value", {
local result, err = pg_utils_query("INSERT INTO distributed_rate_limit_counters(id, value, expires_at) VALUES(:id, :value, to_timestamp(:expires_at)) ON CONFLICT (id) DO UPDATE SET value = distributed_rate_limit_counters.value + EXCLUDED.value", {
id = key,
value = count,
expires_at = expires_at,
@@ -372,8 +371,7 @@ function _M.distributed_pull()
-- cycle and start over with negative values. Since the data in this table
-- expires, there shouldn't be any duplicate version numbers by the time the
-- sequence cycles.
-- TODO: Remove "_temp" once done testing new rate limiting strategy in parallel.
local results, err = pg_utils_query("SELECT id, version, value, extract(epoch FROM expires_at) AS expires_at FROM distributed_rate_limit_counters_temp WHERE version > LEAST(:version, (SELECT last_value - 1 FROM distributed_rate_limit_counters_temp_version_seq)) AND expires_at >= now() ORDER BY version DESC", { version = last_fetched_version }, { quiet = true })
local results, err = pg_utils_query("SELECT id, version, value, extract(epoch FROM expires_at) AS expires_at FROM distributed_rate_limit_counters WHERE version > LEAST(:version, (SELECT last_value - 1 FROM distributed_rate_limit_counters_version_seq)) AND expires_at >= now() ORDER BY version DESC", { version = last_fetched_version }, { quiet = true })
if not results then
ngx.log(ngx.ERR, "failed to fetch rate limits from database: ", err)
return nil
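Taken together, the push and pull queries above implement a simple change feed: the trigger stamps every insert or update with the next value from the version sequence, so a poller only needs to remember the highest version it has seen. A condensed sketch of the round trip against the renamed tables (the literal version 42 is a hypothetical last-seen value):

```sql
-- Push: each node folds its local delta into the shared counter. The
-- BEFORE INSERT OR UPDATE trigger stamps the row with the next
-- sequence value, so versions stay globally monotonic.
INSERT INTO api_umbrella.distributed_rate_limit_counters (id, value, expires_at)
VALUES ('api_key:abc:60', 5, now() + interval '2 minutes')
ON CONFLICT (id) DO UPDATE
  SET value = distributed_rate_limit_counters.value + EXCLUDED.value;

-- Pull: fetch only unexpired rows newer than the last-seen version.
-- The LEAST(...) guard re-syncs a poller whose remembered version has
-- somehow outrun the sequence.
SELECT id, version, value, extract(epoch FROM expires_at) AS expires_at
FROM api_umbrella.distributed_rate_limit_counters
WHERE version > LEAST(42, (SELECT last_value - 1
                           FROM api_umbrella.distributed_rate_limit_counters_version_seq))
  AND expires_at >= now()
ORDER BY version DESC;
```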
2 changes: 1 addition & 1 deletion src/api-umbrella/version.txt
@@ -1 +1 @@
0.17.0-pre15
0.17.0-pre16
102 changes: 99 additions & 3 deletions src/migrations.lua
@@ -3,6 +3,9 @@ local json_encode = require "api-umbrella.utils.json_encode"
local path_join = require "api-umbrella.utils.path_join"
local readfile = require("pl.utils").readfile

local grants_sql_path = path_join(os.getenv("API_UMBRELLA_SRC_ROOT"), "db/grants.sql")
local grants_sql = readfile(grants_sql_path, true)

return {
[1498350289] = function()
db.query("START TRANSACTION")
@@ -1090,7 +1093,7 @@ return {
end,

[1635022846] = function()
-- TODO: Drop and replace `api_users_flattened` view once we're done
-- Done (1699559596): Drop and replace `api_users_flattened` view once we're done
-- testing two different stacks in parallel. But for now, keep the old view
-- as-is so we can test this new one separately.
db.query([[
@@ -1180,15 +1183,15 @@ return {
[1651280172] = function()
db.query("BEGIN")

-- TODO: Drop column altogether and remove from api_users_flattened view
-- Done (1699559596): Drop column altogether and remove from api_users_flattened view
-- once we're not testing the two different rate limiting approaches in
-- parallel. But keep for now while some systems still use the accuracy
-- approach.
db.query("ALTER TABLE rate_limits ALTER COLUMN accuracy DROP NOT NULL")

db.query("ALTER TABLE distributed_rate_limit_counters SET UNLOGGED")

-- TODO: Drop this "temp" version of the table once we're done testing two
-- Done (1699559596): Drop this "temp" version of the table once we're done testing two
-- different rate limit approaches in parallel. But we're keeping a
-- separate table for testing the new rate limit implementation so there's
-- no mixup between the different key types.
@@ -1217,4 +1220,97 @@

db.query("COMMIT")
end,

[1699559596] = function()
db.query("BEGIN")

-- Make the temp version the live version, removing the unused "accuracy"
-- references. But still maintain the temp version for rollout purposes.
db.query([[
DROP VIEW api_users_flattened;
CREATE VIEW api_users_flattened AS
SELECT
u.id,
u.api_key_prefix,
u.api_key_hash,
u.email,
u.email_verified,
u.registration_source,
u.throttle_by_ip,
extract(epoch from u.disabled_at)::int AS disabled_at,
extract(epoch from u.created_at)::int AS created_at,
jsonb_build_object(
'allowed_ips', s.allowed_ips,
'allowed_referers', s.allowed_referers,
'rate_limit_mode', s.rate_limit_mode,
'rate_limits', (
SELECT jsonb_agg(r2.*)
FROM (
SELECT
r.duration,
r.limit_by,
r.limit_to,
r.distributed,
r.response_headers
FROM rate_limits AS r
WHERE r.api_user_settings_id = s.id
) AS r2
)
) AS settings,
(
SELECT jsonb_object_agg(ar.api_role_id, true)
FROM api_users_roles AS ar WHERE ar.api_user_id = u.id
) AS roles
FROM api_users AS u
LEFT JOIN api_user_settings AS s ON u.id = s.api_user_id;
DROP VIEW api_users_flattened_temp;
CREATE VIEW api_users_flattened_temp AS
SELECT * FROM api_users_flattened;
]])

-- Drop unused column (from 1651280172)
db.query("ALTER TABLE rate_limits DROP COLUMN accuracy")

-- Make "temp" versions the live versions (from 1651280172)
db.query("DROP TABLE distributed_rate_limit_counters")
db.query("ALTER TABLE distributed_rate_limit_counters_temp RENAME TO distributed_rate_limit_counters")
db.query("ALTER TABLE distributed_rate_limit_counters RENAME CONSTRAINT distributed_rate_limit_counters_temp_pkey TO distributed_rate_limit_counters_pkey")
db.query("ALTER INDEX distributed_rate_limit_counters_temp_expires_at_idx RENAME TO distributed_rate_limit_counters_expires_at_idx")
db.query("ALTER INDEX distributed_rate_limit_counters_temp_version_expires_at_idx RENAME TO distributed_rate_limit_counters_version_expires_at_idx")
db.query("ALTER INDEX distributed_rate_limit_counters_temp_version_idx RENAME TO distributed_rate_limit_counters_version_idx")
db.query("DROP SEQUENCE distributed_rate_limit_counters_version_seq")
db.query("ALTER SEQUENCE distributed_rate_limit_counters_temp_version_seq RENAME TO distributed_rate_limit_counters_version_seq")
db.query("CREATE SEQUENCE distributed_rate_limit_counters_temp_version_seq");
db.query([[
CREATE OR REPLACE FUNCTION distributed_rate_limit_counters_increment_version()
RETURNS TRIGGER AS $$
BEGIN
NEW.version := nextval('distributed_rate_limit_counters_version_seq');
return NEW;
END;
$$ LANGUAGE plpgsql;
]])
db.query("DROP TRIGGER distributed_rate_limit_counters_temp_increment_version_trigger ON distributed_rate_limit_counters")
db.query("DROP FUNCTION distributed_rate_limit_counters_temp_increment_version")
db.query("CREATE TRIGGER distributed_rate_limit_counters_increment_version_trigger BEFORE INSERT OR UPDATE ON distributed_rate_limit_counters FOR EACH ROW EXECUTE PROCEDURE distributed_rate_limit_counters_increment_version()")

-- Maintain a "_temp" version for compatibility with rollout.
db.query("CREATE VIEW distributed_rate_limit_counters_temp AS SELECT * FROM distributed_rate_limit_counters")

db.query(grants_sql)
db.query("COMMIT")
end,

-- TODO: Enable to finish _temp cleanup.
-- [1699559696] = function()
-- db.query("BEGIN")

-- db.query("DROP VIEW distributed_rate_limit_counters_temp")
-- db.query("DROP SEQUENCE distributed_rate_limit_counters_temp_version_seq")
-- db.query("DROP VIEW api_users_flattened_temp")

-- db.query(grants_sql)
-- db.query("COMMIT")
-- end,
}
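Because the grants file runs inside the migration transaction, a rollout can be sanity-checked immediately afterward. One possible spot check (the `api_umbrella_app_user` role name is an assumption; `has_table_privilege` is standard PostgreSQL):

```sql
-- Both the promoted table and its compatibility view should be
-- readable by the application role the moment the migration commits.
SELECT
  has_table_privilege('api_umbrella_app_user',
                      'api_umbrella.distributed_rate_limit_counters', 'SELECT') AS table_ok,
  has_table_privilege('api_umbrella_app_user',
                      'api_umbrella.distributed_rate_limit_counters_temp', 'SELECT') AS view_ok;
```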