Skip to content

Commit

Permalink
Journaled 5.0: Transactional batching (#25)
Browse files Browse the repository at this point in the history
This adds transactional batching to journaled. What does that mean? Well, now, by default, when multiple events are emitted inside of a transaction:

```ruby
ActiveRecord::Base.transaction do
  event_1.journal!
  event_2.journal!
end
```

A single job will be enqueued directly before the SQL `COMMIT` statement, batching up the two events. (And if the transaction rolls back, the job will never be enqueued.)

This can be disabled with a global config (for testing purposes -- we should eventually remove this config if we find no downsides in the new behavior):

```ruby
Journaled.transactional_batching_enabled = !Rails.env.production?
```

What happens if we aren't in a transaction? Well, the same thing that happened before! (A job will be enqueued right away.) (**I'm looking into adding an additional transactional safety check, that would raise an error if `journal!` is called outside of a transaction!** But for I'll save that for a future PR.)

Because this bumps the version to 5.0, it also removes compatibility for `Journaled::DeliveryJob` jobs enqueued with legacy (3.1.0-era) arguments. This job now accepts a list of events to emit, rather than a single event-kwarg-blog, so a new legacy input pattern is accepted for compatibility with jobs enqueued by v4.x.x of the gem.
  • Loading branch information
smudge authored Aug 23, 2022
1 parent db39c4d commit 4ff34a6
Show file tree
Hide file tree
Showing 16 changed files with 523 additions and 126 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,24 @@ gem version.

As such, **we always recommend upgrading only one major version at a time.**

### Upgrading from 4.3.0

Versions of Journaled prior to 5.0 would enqueue events one at a time, but 5.0
introduces a new transaction-aware feature that will bundle up all events
emitted within a transaction and enqueue them all in a single "batch" job
directly before the SQL `COMMIT` statement. This reduces the database impact of
emitting a large volume of events at once.

This feature can be disabled conditionally:

```ruby
Journaled.transactional_batching_enabled = false
```

Backwards compatibility has been included for background jobs enqueued by
version 4.0 and above, but **has been dropped for jobs emitted by versions prior
to 4.0**. (Again, be sure to upgrade only one major version at a time.)

### Upgrading from 3.1.0

Versions of Journaled prior to 4.0 relied directly on environment variables for stream names, but now stream names are configured directly.
Expand Down
19 changes: 19 additions & 0 deletions UPGRADING
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
============================
NOTE FOR UPGRADING JOURNALED
============================

If you are upgrading from an older `journaled` version, please be sure to
increment only ONE major version at a time.

⚠️ IF YOU ARE UPGRADING FROM 3.1 OR EARLIER, you should NOT USE THIS VERSION. ⚠️

Instead, install a version of the gem that is backwards compatible with your
app's currently-enqueued journaled jobs:

gem 'journaled', '~> 4.2.0' # upgrading from 3.0-3.1
gem 'journaled', '~> 3.1.0' # upgrading from 2.0-2.5

For additional upgrade instructions (e.g. how to handle a few BREAKING CHANGES
to environment variables), please see the README:
https://github.com/Betterment/journaled/blob/v5.0.0/README.md#upgrades

45 changes: 17 additions & 28 deletions app/jobs/journaled/delivery_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,11 @@ class DeliveryJob < ApplicationJob
raise KinesisTemporaryFailure
end

UNSPECIFIED = Object.new
private_constant :UNSPECIFIED

def perform(serialized_event:, partition_key:, stream_name: UNSPECIFIED, app_name: UNSPECIFIED)
@serialized_event = serialized_event
@partition_key = partition_key
if app_name != UNSPECIFIED
@stream_name = self.class.legacy_computed_stream_name(app_name: app_name)
elsif stream_name != UNSPECIFIED && !stream_name.nil?
@stream_name = stream_name
else
raise(ArgumentError, 'missing keyword: stream_name')
end

journal!
end
def perform(*events, **legacy_kwargs)
events << legacy_kwargs if legacy_kwargs.present?
@kinesis_records = events.map { |e| KinesisRecord.new(**e.delete_if { |_k, v| v.nil? }) }

def self.legacy_computed_stream_name(app_name:)
env_var_name = [app_name&.upcase, 'JOURNALED_STREAM_NAME'].compact.join('_')
ENV.fetch(env_var_name)
journal! if Journaled.enabled?
end

def kinesis_client_config
Expand All @@ -46,18 +31,22 @@ def kinesis_client_config

private

attr_reader :serialized_event, :partition_key, :stream_name
KinesisRecord = Struct.new(:serialized_event, :partition_key, :stream_name, keyword_init: true) do
def initialize(serialized_event:, partition_key:, stream_name:)
super(serialized_event: serialized_event, partition_key: partition_key, stream_name: stream_name)
end

def journal!
kinesis_client.put_record record if Journaled.enabled?
def to_h
{ stream_name: stream_name, data: serialized_event, partition_key: partition_key }
end
end

def record
{
stream_name: stream_name,
data: serialized_event,
partition_key: partition_key,
}
attr_reader :kinesis_records

def journal!
kinesis_records.map do |record|
kinesis_client.put_record(**record.to_h)
end
end

def kinesis_client
Expand Down
48 changes: 30 additions & 18 deletions app/models/journaled/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,34 @@ def initialize(journaled_event:)

def journal!
validate!
ActiveSupport::Notifications.instrument('journaled.event.enqueue', event: journaled_event, priority: job_opts[:priority]) do
Journaled::DeliveryJob.set(job_opts).perform_later(**delivery_perform_args)

ActiveSupport::Notifications.instrument('journaled.event.stage', event: journaled_event, **journaled_enqueue_opts) do
if Journaled::Connection.available?
Journaled::Connection.stage!(journaled_event)
else
self.class.enqueue!(journaled_event)
end
end
end

def self.enqueue!(*events)
events.group_by(&:journaled_enqueue_opts).each do |enqueue_opts, batch|
job_opts = enqueue_opts.reverse_merge(priority: Journaled.job_priority)
ActiveSupport::Notifications.instrument('journaled.batch.enqueue', batch: batch, **job_opts) do
Journaled::DeliveryJob.set(job_opts).perform_later(*delivery_perform_args(batch))

batch.each { |event| ActiveSupport::Notifications.instrument('journaled.event.enqueue', event: event, **job_opts) }
end
end
end

def self.delivery_perform_args(events)
events.map do |event|
{
serialized_event: event.journaled_attributes.to_json,
partition_key: event.journaled_partition_key,
stream_name: event.journaled_stream_name,
}
end
end

Expand All @@ -38,27 +64,13 @@ def journal!
delegate(*EVENT_METHOD_NAMES, to: :journaled_event)

def validate!
serialized_event = journaled_event.journaled_attributes.to_json

schema_validator('base_event').validate! serialized_event
schema_validator('tagged_event').validate! serialized_event if journaled_event.tagged?
schema_validator(journaled_schema_name).validate! serialized_event
end

def job_opts
journaled_enqueue_opts.reverse_merge(priority: Journaled.job_priority)
end

def delivery_perform_args
{
serialized_event: serialized_event,
partition_key: journaled_partition_key,
stream_name: journaled_stream_name,
}
end

def serialized_event
@serialized_event ||= journaled_attributes.to_json
end

def schema_validator(schema_name)
Journaled::JsonSchemaModel::Validator.new(schema_name)
end
Expand Down
3 changes: 2 additions & 1 deletion journaled.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ Gem::Specification.new do |s|
s.metadata['rubygems_mfa_required'] = 'true'

s.files = Dir["{app,config,lib,journaled_schemas}/**/*", "LICENSE", "Rakefile", "README.md"]
s.test_files = Dir["spec/**/*"]

s.required_ruby_version = ">= 2.6"

s.post_install_message = File.read("UPGRADING") if File.exist?('UPGRADING')

s.add_dependency "activejob"
s.add_dependency "activerecord"
s.add_dependency "aws-sdk-kinesis", "< 2"
Expand Down
26 changes: 15 additions & 11 deletions lib/journaled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

require "journaled/engine"
require "journaled/current"
require "journaled/errors"
require 'journaled/connection'

module Journaled
SUPPORTED_QUEUE_ADAPTERS = %w(delayed delayed_job good_job que).freeze
Expand All @@ -14,32 +16,36 @@ module Journaled
mattr_accessor(:http_open_timeout) { 2 }
mattr_accessor(:http_read_timeout) { 60 }
mattr_accessor(:job_base_class_name) { 'ActiveJob::Base' }
mattr_accessor(:transactional_batching_enabled) { true }

def development_or_test?
def self.development_or_test?
%w(development test).include?(Rails.env)
end

def enabled?
def self.enabled?
['0', 'false', false, 'f', ''].exclude?(ENV.fetch('JOURNALED_ENABLED', !development_or_test?))
end

def schema_providers
def self.schema_providers
@schema_providers ||= [Journaled::Engine, Rails]
end

def commit_hash
def self.commit_hash
ENV.fetch('GIT_COMMIT')
end

def actor_uri
def self.actor_uri
Journaled::ActorUriProvider.instance.actor_uri
end

def detect_queue_adapter!
adapter = job_base_class_name.constantize.queue_adapter_name
unless SUPPORTED_QUEUE_ADAPTERS.include?(adapter)
def self.queue_adapter
job_base_class_name.constantize.queue_adapter_name
end

def self.detect_queue_adapter!
unless SUPPORTED_QUEUE_ADAPTERS.include?(queue_adapter)
raise <<~MSG
Journaled has detected an unsupported ActiveJob queue adapter: `:#{adapter}`
Journaled has detected an unsupported ActiveJob queue adapter: `:#{queue_adapter}`
Journaled jobs must be enqueued transactionally to your primary database.
Expand All @@ -62,6 +68,4 @@ def self.tagged(**tags)
def self.tag!(**tags)
Current.tags = Current.tags.merge(tags)
end

module_function :development_or_test?, :enabled?, :schema_providers, :commit_hash, :actor_uri, :detect_queue_adapter!
end
48 changes: 48 additions & 0 deletions lib/journaled/connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
module Journaled
module Connection
class << self
def available?
Journaled.transactional_batching_enabled && transaction_open?
end

def stage!(event)
raise TransactionSafetyError, <<~MSG unless transaction_open?
Transaction not available! By default, journaled event batching requires an open database transaction.
MSG

connection.current_transaction._journaled_staged_events << event
end

private

def transaction_open?
connection.transaction_open?
end

def connection
if Journaled.queue_adapter.in? %w(delayed delayed_job)
Delayed::Job.connection
elsif Journaled.queue_adapter == 'good_job'
GoodJob::BaseRecord.connection
elsif Journaled.queue_adapter == 'que'
Que::ActiveRecord::Model.connection
elsif Journaled.queue_adapter == 'test' && Rails.env.test?
ActiveRecord::Base.connection
else
raise "Unsupported adapter: #{Journaled.queue_adapter}"
end
end
end

module TestOnlyBehaviors
def transaction_open?
# Transactional fixtures wrap all tests in an outer, non-joinable transaction:
super && (connection.open_transactions > 1 || connection.current_transaction.joinable?)
end
end

class << self
prepend TestOnlyBehaviors if Rails.env.test?
end
end
end
5 changes: 5 additions & 0 deletions lib/journaled/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ class Engine < ::Rails::Engine
ActiveSupport.on_load(:active_job) do
Journaled.detect_queue_adapter! unless Journaled.development_or_test?
end

ActiveSupport.on_load(:active_record) do
require 'journaled/transaction_ext'
ActiveRecord::ConnectionAdapters::Transaction.prepend Journaled::TransactionExt
end
end
end
end
3 changes: 3 additions & 0 deletions lib/journaled/errors.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module Journaled
class TransactionSafetyError < StandardError; end
end
31 changes: 31 additions & 0 deletions lib/journaled/transaction_ext.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
require 'active_record/connection_adapters/abstract/transaction'

module Journaled
module TransactionExt
def initialize(*, **)
super.tap do
raise TransactionSafetyError, <<~MSG unless instance_variable_defined?(:@run_commit_callbacks)
Journaled::TransactionExt expects @run_commit_callbacks to be defined on Transaction!
This is an internal API that may have changed in a recent Rails release.
If you were not expecting to see this error, please file an issue here:
https://github.com/Betterment/journaled/issues
MSG
end
end

def before_commit_records
super.tap do
Writer.enqueue!(*_journaled_staged_events) if @run_commit_callbacks
end
end

def commit_records
connection.current_transaction._journaled_staged_events.push(*_journaled_staged_events) unless @run_commit_callbacks
super
end

def _journaled_staged_events
@_journaled_staged_events ||= []
end
end
end
2 changes: 1 addition & 1 deletion lib/journaled/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Journaled
VERSION = "4.3.0".freeze
VERSION = "5.0.0".freeze
end
Loading

0 comments on commit 4ff34a6

Please sign in to comment.