Skip to content

Commit

Permalink
feat: allow the database connection to be configured for async jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
bethesque committed Jan 13, 2019
1 parent 5292406 commit 6a745d4
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 21 deletions.
14 changes: 10 additions & 4 deletions lib/pact_broker/webhooks/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ class Job
include PactBroker::Logging

def perform data
data.fetch(:database_connector).call do
perform_with_connection(data)
end
end

private

attr_reader :triggered_webhook, :error_count

def perform_with_connection(data)
@data = data
@triggered_webhook = PactBroker::Webhooks::TriggeredWebhook.find(id: data[:triggered_webhook].id)
@error_count = data[:error_count] || 0
Expand All @@ -26,10 +36,6 @@ def perform data
end
end

private

attr_reader :triggered_webhook, :error_count

def execution_options
{
success_log_message: "Successfully executed webhook",
Expand Down
14 changes: 8 additions & 6 deletions lib/pact_broker/webhooks/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,20 @@ def self.run_later webhooks, pact, verification, event_name
begin
triggered_webhook = webhook_repository.create_triggered_webhook(trigger_uuid, webhook, pact, verification, RESOURCE_CREATION)
logger.info "Scheduling job for #{webhook.description} with uuid #{webhook.uuid}"
job_data = { triggered_webhook: triggered_webhook }
schedule_webhook_job(job_data)
job_data = {
triggered_webhook: triggered_webhook,
database_connector: job_database_connector
}
# Delay slightly to make sure the request transaction has finished before we execute the webhook
Job.perform_in(5, job_data)
rescue StandardError => e
log_error e
end
end
end

# This is a separate method so it can be overridden in the saas broker
def self.schedule_webhook_job(job_data)
# Delay slightly to make sure the request transaction has finished before we execute the webhook
Job.perform_in(5, job_data)
def self.job_database_connector
Thread.current[:pact_broker_thread_data].database_connector
end

def self.find_latest_triggered_webhooks_for_pact pact
Expand Down
22 changes: 22 additions & 0 deletions lib/rack/pact_broker/database_transaction.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'pact_broker/constants'
require 'sequel'
require 'ostruct'

module Rack
module PactBroker
Expand All @@ -11,14 +12,22 @@ class DatabaseTransaction
def initialize app, database_connection
@app = app
@database_connection = database_connection
@default_database_connector = ->(&block) {
database_connection.synchronize do
block.call
end
}
end

def call env
set_database_connector
if use_transaction? env
call_with_transaction env
else
call_without_transaction env
end
ensure
clear_database_connector
end

def use_transaction? env
Expand All @@ -43,6 +52,19 @@ def call_with_transaction env
def do_not_rollback? response
response[1].delete(::PactBroker::DO_NOT_ROLLBACK)
end

def set_database_connector
Thread.current[:pact_broker_thread_data] ||= OpenStruct.new
Thread.current[:pact_broker_thread_data].database_connector ||= @default_database_connector
end

def clear_database_connector
if thread_data = Thread.current[:pact_broker_thread_data]
if thread_data.database_connector == @default_database_connector
thread_data.database_connector = nil
end
end
end
end
end
end
2 changes: 1 addition & 1 deletion spec/features/publish_verification_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
expect(JSON.parse(subject.body)).to include JSON.parse(verification_content)
end

context "with a webhook configured" do
context "with a webhook configured", job: true do
before do
td.create_webhook(
method: 'POST',
Expand Down
14 changes: 7 additions & 7 deletions spec/lib/pact_broker/webhooks/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
module PactBroker
module Webhooks
describe Job do

before do
PactBroker.configuration.webhook_retry_schedule = [10, 60, 120, 300, 600, 1200]
allow(PactBroker::Webhooks::Service).to receive(:execute_triggered_webhook_now).and_return(result)
Expand All @@ -16,8 +15,9 @@ module Webhooks
let(:result) { instance_double("PactBroker::Domain::WebhookExecutionResult", success?: success)}
let(:success) { true }
let(:logger) { double('logger').as_null_object }
let(:database_connector) { ->(&block) { block.call } }

subject { Job.new.perform(triggered_webhook: triggered_webhook) }
subject { Job.new.perform(triggered_webhook: triggered_webhook, database_connector: database_connector) }

it "reloads the TriggeredWebhook object to make sure it has a fresh copy" do
expect(PactBroker::Webhooks::TriggeredWebhook).to receive(:find).with(id: 1)
Expand All @@ -44,7 +44,7 @@ module Webhooks
end

it "reschedules the job in 10 seconds" do
expect(Job).to receive(:perform_in).with(10, {triggered_webhook: triggered_webhook, error_count: 1})
expect(Job).to receive(:perform_in).with(10, {triggered_webhook: triggered_webhook, error_count: 1, database_connector: database_connector})
subject
end

Expand All @@ -59,7 +59,7 @@ module Webhooks
let(:success) { false }

it "reschedules the job in 10 seconds" do
expect(Job).to receive(:perform_in).with(10, {triggered_webhook: triggered_webhook, error_count: 1})
expect(Job).to receive(:perform_in).with(10, {triggered_webhook: triggered_webhook, error_count: 1, database_connector: database_connector})
subject
end

Expand All @@ -84,10 +84,10 @@ module Webhooks
allow(PactBroker::Webhooks::Service).to receive(:execute_triggered_webhook_now).and_raise("an error")
end

subject { Job.new.perform(triggered_webhook: triggered_webhook, error_count: 1) }
subject { Job.new.perform(triggered_webhook: triggered_webhook, error_count: 1, database_connector: database_connector) }

it "reschedules the job in 60 seconds" do
expect(Job).to receive(:perform_in).with(60, {triggered_webhook: triggered_webhook, error_count: 2})
expect(Job).to receive(:perform_in).with(60, {triggered_webhook: triggered_webhook, error_count: 2, database_connector: database_connector})
subject
end

Expand All @@ -101,7 +101,7 @@ module Webhooks
context "when the job is not successful for the last time" do
let(:success) { false }

subject { Job.new.perform(triggered_webhook: triggered_webhook, error_count: 6) }
subject { Job.new.perform(triggered_webhook: triggered_webhook, error_count: 6, database_connector: database_connector) }

it "executes the job with an log message indicating that the webhook has failed" do
expect(PactBroker::Webhooks::Service).to receive(:execute_triggered_webhook_now)
Expand Down
6 changes: 3 additions & 3 deletions spec/lib/pact_broker/webhooks/service_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ module Webhooks
end
end

context "when there is a scheduling error" do
context "when there is a scheduling error", job: true do
before do
allow(Job).to receive(:perform_in).and_raise("an error")
end
Expand Down Expand Up @@ -166,7 +166,7 @@ module Webhooks
end
end

describe ".execute_webhook_now integration test" do
describe ".execute_webhook_now integration test", job: true do
let(:td) { TestDataBuilder.new }

let!(:http_request) do
Expand Down Expand Up @@ -215,7 +215,7 @@ module Webhooks
end
end

describe ".trigger_webhooks integration test" do
describe ".trigger_webhooks integration test", job: true do
let!(:http_request) do
stub_request(:get, "http://example.org").
to_return(:status => 200)
Expand Down
40 changes: 40 additions & 0 deletions spec/lib/rack/pact_broker/database_transaction_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,46 @@ module PactBroker
expect { subject }.to change { ::PactBroker::Domain::Pacticipant.count }.by(1)
end
end

describe "setting the database connector" do
let(:api) { double('api', call: [200, {}, []]) }

it "sets a database connector for use in jobs scheduled by this request" do
expect(api).to receive(:call) do | env |
expect(Thread.current[:pact_broker_thread_data].database_connector).to_not be nil
[200, {}, []]
end

subject
end

it "clears it after the request" do
subject
expect(Thread.current[:pact_broker_thread_data].database_connector).to be nil
end

context "when other middleware sets the database connector" do
before do
Thread.current[:pact_broker_thread_data] = OpenStruct.new(database_connector: other_database_connector)
end

let(:other_database_connector) { ->(&block) { block.call } }

it "does not override it" do
expect(api).to receive(:call) do | env |
expect(Thread.current[:pact_broker_thread_data].database_connector).to eq other_database_connector
[200, {}, []]
end

subject
end

it "does not clear it after the request" do
subject
expect(Thread.current[:pact_broker_thread_data].database_connector).to_not be nil
end
end
end
end
end
end
12 changes: 12 additions & 0 deletions spec/support/jobs.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
require 'ostruct'

RSpec.configure do | config |
config.before(:each, job: true) do
Thread.current[:pact_broker_thread_data] = OpenStruct.new
Thread.current[:pact_broker_thread_data].database_connector = -> (&block) { block.call }
end

config.after(:each, job: true) do
Thread.current[:pact_broker_thread_data] = nil
end
end

0 comments on commit 6a745d4

Please sign in to comment.