-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix incompatibility with Postgres 14.x (#117)
Postgres v14.0 to v14.5 had an issue with functions taking a `RECORD` typed argument when within a single transaction this type was instantiated with different actual types (tables). In combination with my refactorings in Carbonite v0.12.0, extracting two smaller procs from the main change tracking procedure. This patch reverts these refactorings and restores compatibility with the affected Postgres versions. Refactorings weren't exactly pretty, either. But the issues with the 150 line procedure I keep copying over to new versions remain. Fixes #112
- Loading branch information
Showing
7 changed files
with
193 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
defmodule Carbonite.Migrations.V11 do | ||
@moduledoc false | ||
|
||
use Ecto.Migration | ||
use Carbonite.Migrations.Version | ||
alias Carbonite.Migrations.{V10, V8} | ||
|
||
@type prefix :: binary() | ||
|
||
defp create_capture_changes_procedure(prefix) do | ||
""" | ||
CREATE OR REPLACE FUNCTION #{prefix}.capture_changes() RETURNS TRIGGER AS | ||
$body$ | ||
DECLARE | ||
trigger_row RECORD; | ||
change_row #{prefix}.changes; | ||
pk_source RECORD; | ||
pk_col VARCHAR; | ||
pk_col_val VARCHAR; | ||
BEGIN | ||
/* load trigger config */ | ||
WITH settings AS (SELECT NULLIF(current_setting('#{prefix}.override_mode', TRUE), '')::TEXT AS override_mode) | ||
SELECT | ||
primary_key_columns, | ||
excluded_columns, | ||
filtered_columns, | ||
CASE | ||
WHEN settings.override_mode = 'override' AND mode = 'ignore' THEN 'capture' | ||
WHEN settings.override_mode = 'override' AND mode = 'capture' THEN 'ignore' | ||
ELSE COALESCE(settings.override_mode, mode::text) | ||
END AS mode, | ||
store_changed_from | ||
INTO trigger_row | ||
FROM #{prefix}.triggers | ||
JOIN settings ON TRUE | ||
WHERE table_prefix = TG_TABLE_SCHEMA AND table_name = TG_TABLE_NAME; | ||
/* skip if ignored */ | ||
IF (trigger_row.mode = 'ignore') THEN | ||
RETURN NULL; | ||
END IF; | ||
/* instantiate change row */ | ||
change_row := ROW( | ||
NEXTVAL('#{prefix}.changes_id_seq'), | ||
pg_current_xact_id(), | ||
LOWER(TG_OP::TEXT), | ||
TG_TABLE_SCHEMA::TEXT, | ||
TG_TABLE_NAME::TEXT, | ||
NULL, | ||
NULL, | ||
'{}', | ||
NULL, | ||
NULL | ||
); | ||
/* collect table pk */ | ||
IF trigger_row.primary_key_columns != '{}' THEN | ||
IF (TG_OP IN ('INSERT', 'UPDATE')) THEN | ||
pk_source := NEW; | ||
ELSIF (TG_OP = 'DELETE') THEN | ||
pk_source := OLD; | ||
END IF; | ||
change_row.table_pk = '{}'; | ||
FOREACH pk_col IN ARRAY trigger_row.primary_key_columns LOOP | ||
EXECUTE 'SELECT $1.' || quote_ident(pk_col) || '::TEXT' USING pk_source INTO pk_col_val; | ||
change_row.table_pk := change_row.table_pk || pk_col_val; | ||
END LOOP; | ||
END IF; | ||
/* collect version data */ | ||
IF (TG_OP IN ('INSERT', 'UPDATE')) THEN | ||
SELECT to_jsonb(NEW.*) - trigger_row.excluded_columns | ||
INTO change_row.data; | ||
ELSIF (TG_OP = 'DELETE') THEN | ||
SELECT to_jsonb(OLD.*) - trigger_row.excluded_columns | ||
INTO change_row.data; | ||
END IF; | ||
/* change tracking for UPDATEs */ | ||
IF (TG_OP = 'UPDATE') THEN | ||
change_row.changed_from = '{}'::JSONB; | ||
SELECT jsonb_object_agg(before.key, before.value) | ||
FROM jsonb_each(to_jsonb(OLD.*) - trigger_row.excluded_columns) AS before | ||
WHERE (change_row.data->before.key)::JSONB != before.value | ||
INTO change_row.changed_from; | ||
SELECT ARRAY(SELECT jsonb_object_keys(change_row.changed_from)) | ||
INTO change_row.changed; | ||
/* skip persisting this update if nothing has changed */ | ||
IF change_row.changed = '{}' THEN | ||
RETURN NULL; | ||
END IF; | ||
/* persisting the old data is opt-in, discard if not configured. */ | ||
IF trigger_row.store_changed_from IS FALSE THEN | ||
change_row.changed_from := NULL; | ||
END IF; | ||
END IF; | ||
/* filtered columns */ | ||
SELECT #{prefix}.jsonb_redact_keys(change_row.data, trigger_row.filtered_columns) | ||
INTO change_row.data; | ||
IF change_row.changed_from IS NOT NULL THEN | ||
SELECT #{prefix}.jsonb_redact_keys(change_row.changed_from, trigger_row.filtered_columns) | ||
INTO change_row.changed_from; | ||
END IF; | ||
/* insert, fail gracefully unless transaction record present or NEXTVAL has never been called */ | ||
BEGIN | ||
change_row.transaction_id = CURRVAL('#{prefix}.transactions_id_seq'); | ||
/* verify that xact_id matches */ | ||
IF NOT | ||
EXISTS( | ||
SELECT 1 FROM #{prefix}.transactions | ||
WHERE id = change_row.transaction_id AND xact_id = change_row.transaction_xact_id | ||
) | ||
THEN | ||
RAISE USING ERRCODE = 'foreign_key_violation'; | ||
END IF; | ||
INSERT INTO #{prefix}.changes VALUES (change_row.*); | ||
EXCEPTION WHEN foreign_key_violation OR object_not_in_prerequisite_state THEN | ||
RAISE '(carbonite) % on table %.% without prior INSERT into #{prefix}.transactions', | ||
TG_OP, TG_TABLE_SCHEMA, TG_TABLE_NAME USING ERRCODE = 'foreign_key_violation'; | ||
END; | ||
RETURN NULL; | ||
END; | ||
$body$ | ||
LANGUAGE plpgsql; | ||
""" | ||
|> squish_and_execute() | ||
|
||
:ok | ||
end | ||
|
||
@type up_option :: {:carbonite_prefix, prefix()} | ||
|
||
@impl true | ||
@spec up([up_option()]) :: :ok | ||
def up(opts) do | ||
prefix = Keyword.get(opts, :carbonite_prefix, default_prefix()) | ||
|
||
create_capture_changes_procedure(prefix) | ||
execute("DROP FUNCTION #{prefix}.record_dynamic_varchar_agg;") | ||
execute("DROP FUNCTION #{prefix}.record_dynamic_varchar;") | ||
|
||
:ok | ||
end | ||
|
||
@type down_option :: {:carbonite_prefix, prefix()} | ||
|
||
@impl true | ||
@spec down([down_option()]) :: :ok | ||
def down(opts) do | ||
prefix = Keyword.get(opts, :carbonite_prefix, default_prefix()) | ||
|
||
V8.create_record_dynamic_varchar_procedure(prefix) | ||
V8.create_record_dynamic_varchar_agg_procedure(prefix) | ||
V10.create_capture_changes_procedure(prefix) | ||
|
||
:ok | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters