Skip to content

Commit

Permalink
feat: improve the performance of the incremental clean queries
Browse files Browse the repository at this point in the history
  • Loading branch information
bethesque committed Jan 12, 2023
1 parent 7e53a8c commit c3a07c7
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 62 deletions.
126 changes: 67 additions & 59 deletions lib/pact_broker/db/clean_incremental.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,36 @@ def initialize database_connection, options = {}
@options = options
end

def call
require "pact_broker/db/models"

if dry_run?
dry_run_results
else
execute_clean
end
end

private

attr_reader :db, :options

def execute_clean
db.transaction do
before_counts = current_counts
PactBroker::Domain::Version.where(id: versions_to_delete.from_self.select_map(:id)).delete
delete_orphan_pact_versions
after_counts = current_counts

TABLES.each_with_object({}) do | table_name, comparison_counts |
comparison_counts[table_name.to_s] = {
"deleted" => before_counts[table_name] - after_counts[table_name],
"kept" => after_counts[table_name]
}
end
end
end

def logger
options[:logger] || PactBroker.logger
end
Expand All @@ -42,47 +72,20 @@ def limit
options[:limit] || 1000
end

def resolve_ids(query, column_name = :id)
query.collect { |h| h[column_name] }
end

def version_ids_to_delete
db[:versions].where(id: version_ids_to_keep).invert.limit(limit).order(Sequel.asc(:id))
def versions_to_delete(columns = [:id])
fully_qualified_columns = columns.collect { |col| Sequel[:versions][col] }
PactBroker::Domain::Version
.select(*fully_qualified_columns)
.left_outer_join(version_ids_to_keep, { Sequel[:versions][:id] => Sequel[:keep_versions][:id] }, table_alias: :keep_versions)
.where(Sequel[:keep_versions][:id] => nil)
.order(Sequel.asc( Sequel[:versions][:id]))
.limit(limit)
end

def version_ids_to_keep
@version_ids_to_keep ||= selected_versions_to_keep.reduce(&:union)
@version_ids_to_keep ||= keep.collect { |selector| PactBroker::Domain::Version.select(:id).for_selector(selector) }.reduce(&:union)
end

def selected_versions_to_keep
keep.collect do | selector |
PactBroker::Domain::Version.select(:id).for_selector(selector)
end
end

def call
require "pact_broker/db/models"

if dry_run?
dry_run_results
else
db.transaction do
before_counts = current_counts
PactBroker::Domain::Version.where(id: resolve_ids(version_ids_to_delete)).delete
delete_orphan_pact_versions
after_counts = current_counts

TABLES.each_with_object({}) do | table_name, comparison_counts |
comparison_counts[table_name.to_s] = { "deleted" => before_counts[table_name] - after_counts[table_name], "kept" => after_counts[table_name] }
end
end
end
end

private

attr_reader :db, :options

def current_counts
TABLES.each_with_object({}) do | table_name, counts |
counts[table_name] = db[table_name].count
Expand Down Expand Up @@ -122,8 +125,8 @@ def version_info(version)
end

def dry_run_results
to_delete = dry_run_to_delete
to_keep = dry_run_to_keep
to_delete = dry_run_to_delete_by_pacticipant
to_keep = dry_run_to_keep_by_pacticipant

kept_per_selector = keep.collect do | selector |
{
Expand All @@ -141,7 +144,7 @@ def dry_run_results

total_versions_count = PactBroker::Domain::Version.count
versions_to_keep_count = version_ids_to_keep.count
versions_to_delete_count = version_ids_to_delete.count
versions_to_delete_count = versions_to_delete.count

{
"counts" => {
Expand All @@ -155,9 +158,15 @@ def dry_run_results
}
end

def dry_run_latest_versions_to_keep
latest_undeleted_versions_by_order = PactBroker::Domain::Version.where(id: version_ids_to_delete.select(:id))
.invert
def expected_remaining_versions
PactBroker::Domain::Version
.left_outer_join(versions_to_delete, { Sequel[:versions][:id] => Sequel[:delete_versions][:id] }, table_alias: :delete_versions )
.where(Sequel[:delete_versions][:id] => nil)
end

# Returns the latest version that will be kept for each pacticipant
def dry_run_latest_versions_to_keep_by_pacticipant
latest_undeleted_versions_by_order = expected_remaining_versions
.select_group(:pacticipant_id)
.select_append{ max(order).as(latest_order) }

Expand All @@ -171,9 +180,9 @@ def dry_run_latest_versions_to_keep
.join(latest_undeleted_versions_by_order, lv_versions_join, { table_alias: :lv })
end

def dry_run_earliest_versions_to_keep
earliest_undeleted_versions_by_order = PactBroker::Domain::Version.where(id: version_ids_to_delete.select(:id))
.invert
# Returns the earliest version that will be kept for each pacticipant
def dry_run_earliest_versions_to_keep_by_pacticipant
earliest_undeleted_versions_by_order = expected_remaining_versions
.select_group(:pacticipant_id)
.select_append{ min(order).as(first_order) }

Expand All @@ -187,13 +196,16 @@ def dry_run_earliest_versions_to_keep
.join(earliest_undeleted_versions_by_order, ev_versions_join, { table_alias: :lv })
end

def dry_run_to_delete
PactBroker::Domain::Version
.where(id: version_ids_to_delete.select(:id))
# Returns Hash of pacticipant name => Hash, where the Hash value contains the count, fromVersion and toVersion
# that will be deleted.
# @return Hash
def dry_run_to_delete_by_pacticipant
versions_to_delete
.select(Sequel[:versions].*)
.all
.group_by{ | v | v.pacticipant_id }
.each_with_object({}) do | (_pacticipant_id, versions), thing |
thing[versions.first.pacticipant.name] = {
.each_with_object({}) do | (_pacticipant_id, versions), hash |
hash[versions.first.pacticipant.name] = {
"count" => versions.count,
"fromVersion" => version_info(versions.first),
"toVersion" => version_info(versions.last)
Expand All @@ -202,14 +214,14 @@ def dry_run_to_delete
end

# rubocop: disable Metrics/CyclomaticComplexity
def dry_run_to_keep
latest_to_keep = dry_run_latest_versions_to_keep.eager(:tags).each_with_object({}) do | version, r |
def dry_run_to_keep_by_pacticipant
latest_to_keep = dry_run_latest_versions_to_keep_by_pacticipant.eager(:tags).each_with_object({}) do | version, r |
r[version.pacticipant_id] = {
"firstVersion" => version_info(version)
}
end

earliest_to_keep = dry_run_earliest_versions_to_keep.eager(:tags).each_with_object({}) do | version, r |
earliest_to_keep = dry_run_earliest_versions_to_keep_by_pacticipant.eager(:tags).each_with_object({}) do | version, r |
r[version.pacticipant_id] = {
"latestVersion" => version_info(version)
}
Expand All @@ -226,14 +238,10 @@ def dry_run_to_keep
# rubocop: enable Metrics/CyclomaticComplexity

def counts_to_keep
db[:versions].where(id: version_ids_to_delete.select(:id))
.invert
expected_remaining_versions
.select_group(:pacticipant_id)
.select_append{ count(1).as(count) }
.all
.each_with_object({}) do | row, counts |
counts[row[:pacticipant_id]] = row[:count]
end
.as_hash(:pacticipant_id, :count)
end

def pacticipants
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
},
{
"selector": {
"tag": "dev",
"latest": true
"latest": true,
"tag": "dev"
},
"count": 1
}
Expand Down
12 changes: 11 additions & 1 deletion spec/lib/pact_broker/db/clean_incremental_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ def pact_publication_count_for(consumer_name, version_number)

end

let(:options) { { keep: [all_prod_selector, latest_dev_selector], limit: limit, dry_run: dry_run } }
let(:keep_selectors) { [all_prod_selector, latest_dev_selector] }

let(:options) { { keep: keep_selectors, limit: limit, dry_run: dry_run } }

it "does not delete the consumer versions specified" do
expect(PactBroker::Domain::Version.where(number: "1").count).to be 1
Expand All @@ -70,6 +72,14 @@ def pact_publication_count_for(consumer_name, version_number)
expect(PactBroker::Domain::Version.where(number: "7").count).to be 1
end

context "with the default selectors" do
let(:options) { {} }

it "doesn't blow up" do
subject
end
end

context "when dry_run: true" do
before do
td.create_pact_with_hierarchy("Meep", "2", "Moop")
Expand Down

0 comments on commit c3a07c7

Please sign in to comment.