diff --git a/README.md b/README.md index 665ad81..ab74ee3 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,8 @@ add scoped ordering capability at a future date (and would gladly entertain pull requests), but it is presently only designed to provide a durable, eventually consistent record that discrete events happened. +**See [upgrades](#upgrades) below if you're upgrading from an older `journaled` version!** + ## Installation 1. If you haven't already, @@ -83,52 +85,15 @@ app's Gemfile. The AWS principal whose credentials are in the environment will need to be allowed to assume this role. -### Upgrading from 3.1.0 - -Versions of Journaled prior to 4.0 relied directly on environment variables for stream names, but now stream names are configured directly. -When upgrading, you can use the following configuration to maintain the previous behavior: - -```ruby -Journaled.default_stream_name = ENV['JOURNALED_STREAM_NAME'] -``` - -If you previously specified a `Journaled.default_app_name`, you would have required a more precise environment variable name (substitute `{{upcase_app_name}}`): - -```ruby -Journaled.default_stream_name = ENV["{{upcase_app_name}}_JOURNALED_STREAM_NAME"] -``` - -And if you had defined any `journaled_app_name` methods on `Journaled::Event` instances, you can replace them with the following: - -```ruby -def journaled_stream_name - ENV['{{upcase_app_name}}_JOURNALED_STREAM_NAME'] -end -``` - -When upgrading from 3.1 or below, `Journaled::DeliveryJob` will handle any jobs that remain in the queue by accepting an `app_name` argument. **This behavior will be removed in version 5.0**, so it is recommended to upgrade one major version at a time. - -### Upgrading from 2.5.0 - -Versions of Journaled prior to 3.0 relied direclty on `delayed_job` and a "performable" class called `Journaled::Delivery`. -In 3.0, this was superceded by an ActiveJob class called `Journaled::DeliveryJob`, but the `Journaled::Delivery` class was not removed until 4.0. - -Therefore, when upgrading from 2.5.0 or below, it is recommended to first upgrade to 3.1.0 (to allow any `Journaled::Delivery` jobs to finish working off) before upgrading to 4.0+. - -The upgrade to 3.1.0 will require a working ActiveJob config. ActiveJob can be configured globally by setting `ActiveJob::Base.queue_adapter`, or just for Journaled jobs by setting `Journaled::DeliveryJob.queue_adapter`. -The `:delayed_job` queue adapter will allow you to continue relying on `delayed_job`. You may also consider switching your app(s) to [`delayed`](https://github.com/Betterment/delayed) and using the `:delayed` queue adapter. - ## Usage ### Configuration Journaling provides a number of different configuation options that can be set in Ruby using an initializer. Those values are: -#### `Journaled.default_app_name` +#### `Journaled.default_stream_name ` - This is described in the proceeding paragraph and is used to specify which app name to use, which corresponds to which Journaled Stream to send events too. - This is the default value for events that do NOT specify their own `#journaled_app_name`. For events that define their own `#journaled_app_name` method, that will take precedence over this default. - Ex: `Journaled.default_app_name = 'my_app'` + This is described in the "Installation" section above, and is used to specify which stream name to use. #### `Journaled.job_priority` (default: 20) @@ -185,8 +150,8 @@ class ApplicationController < ActionController::Base end ``` -Your authenticated entity must respond to `#to_global_id`, which -ActiveRecords do by default. +Your authenticated entity must respond to `#to_global_id`, which ActiveRecords do by default. +This feature relies on `ActiveSupport::CurrentAttributes` under the hood. Every time any of the specified attributes is modified, or a `User` record is created or destroyed, an event will be sent to Kinesis with the following attributes: @@ -214,6 +179,40 @@ journaling. Note that the less-frequently-used methods `toggle`, `increment*`, `decrement*`, and `update_counters` are not intercepted at this time. +### Tagged Events + +Events may be optionally marked as "tagged." This will add a `tags` field, intended for tracing and +auditing purposes. + +```ruby +class MyEvent + include Journaled::Event + + journal_attributes :attr_1, :attr_2, tagged: true +end +``` + +You may then use `Journaled.tag!` and `Journaled.tagged` inside of your +`ApplicationController` and `ApplicationJob` classes (or anywhere else!) to tag +all events with request and job metadata: + +```ruby +class ApplicationController < ActionController::Base + before_action do + Journaled.tag!(request_id: request.request_id, current_user_id: current_user&.id) + end +end + +class ApplicationJob < ActiveJob::Base + around_perform do |job, perform| + Journaled.tagged(job_id: job.id) { perform.call } + end +end +``` + +This feature relies on `ActiveSupport::CurrentAttributes` under the hood, so these tags are local to +the current thread, and will be cleared at the end of each request request/job. + #### Testing If you use RSpec (and have required `journaled/rspec` in your @@ -353,6 +352,49 @@ Returns one of the following in order of preference: In order for this to be most useful, you must configure your controller as described in [Change Journaling](#change-journaling) above. +## Upgrades + +Since this gem relies on background jobs (which can remain in the queue across +code releases), this gem generally aims to support jobs enqueued by the prior +gem version. + +As such, **we always recommend upgrading only one major version at a time.** + +### Upgrading from 3.1.0 + +Versions of Journaled prior to 4.0 relied directly on environment variables for stream names, but now stream names are configured directly. +When upgrading, you can use the following configuration to maintain the previous behavior: + +```ruby +Journaled.default_stream_name = ENV['JOURNALED_STREAM_NAME'] +``` + +If you previously specified a `Journaled.default_app_name`, you would have required a more precise environment variable name (substitute `{{upcase_app_name}}`): + +```ruby +Journaled.default_stream_name = ENV["{{upcase_app_name}}_JOURNALED_STREAM_NAME"] +``` + +And if you had defined any `journaled_app_name` methods on `Journaled::Event` instances, you can replace them with the following: + +```ruby +def journaled_stream_name + ENV['{{upcase_app_name}}_JOURNALED_STREAM_NAME'] +end +``` + +When upgrading from 3.1 or below, `Journaled::DeliveryJob` will handle any jobs that remain in the queue by accepting an `app_name` argument. **This behavior will be removed in version 5.0**, so it is recommended to upgrade one major version at a time. + +### Upgrading from 2.5.0 + +Versions of Journaled prior to 3.0 relied direclty on `delayed_job` and a "performable" class called `Journaled::Delivery`. +In 3.0, this was superceded by an ActiveJob class called `Journaled::DeliveryJob`, but the `Journaled::Delivery` class was not removed until 4.0. + +Therefore, when upgrading from 2.5.0 or below, it is recommended to first upgrade to 3.1.0 (to allow any `Journaled::Delivery` jobs to finish working off) before upgrading to 4.0+. + +The upgrade to 3.1.0 will require a working ActiveJob config. ActiveJob can be configured globally by setting `ActiveJob::Base.queue_adapter`, or just for Journaled jobs by setting `Journaled::DeliveryJob.queue_adapter`. +The `:delayed_job` queue adapter will allow you to continue relying on `delayed_job`. You may also consider switching your app(s) to [`delayed`](https://github.com/Betterment/delayed) and using the `:delayed` queue adapter. + ## Future improvements & issue tracking Suggestions for enhancements to this engine are currently being tracked via Github Issues. Please feel free to open an issue for a desired feature, as well as for any observed bugs. diff --git a/app/controllers/concerns/journaled/actor.rb b/app/controllers/concerns/journaled/actor.rb index 3b2ec26..990ae90 100644 --- a/app/controllers/concerns/journaled/actor.rb +++ b/app/controllers/concerns/journaled/actor.rb @@ -2,11 +2,14 @@ module Journaled::Actor extend ActiveSupport::Concern included do - class_attribute :_journaled_actor_method_name, instance_accessor: false, instance_predicate: false - before_action do - RequestStore.store[:journaled_actor_proc] = self.class._journaled_actor_method_name && - -> { send(self.class._journaled_actor_method_name) } - end + class_attribute :_journaled_actor_method_name, instance_writer: false + before_action :_set_journaled_actor_proc, if: :_journaled_actor_method_name? + end + + private + + def _set_journaled_actor_proc + Journaled::Current.journaled_actor_proc = -> { send(self.class._journaled_actor_method_name) } end class_methods do diff --git a/app/models/journaled/actor_uri_provider.rb b/app/models/journaled/actor_uri_provider.rb index bb3881f..d54514e 100644 --- a/app/models/journaled/actor_uri_provider.rb +++ b/app/models/journaled/actor_uri_provider.rb @@ -8,8 +8,7 @@ def actor_uri private def actor_global_id_uri - actor = RequestStore.store[:journaled_actor_proc]&.call - actor.to_global_id.to_s if actor + Journaled::Current.actor&.to_global_id&.to_s end def fallback_global_id_uri diff --git a/app/models/journaled/event.rb b/app/models/journaled/event.rb index 432fe25..9833917 100644 --- a/app/models/journaled/event.rb +++ b/app/models/journaled/event.rb @@ -39,12 +39,18 @@ def journaled_stream_name Journaled.default_stream_name end + def tagged? + false + end + private class_methods do - def journal_attributes(*args, enqueue_with: {}) + def journal_attributes(*args, enqueue_with: {}, tagged: false) journaled_attributes.concat(args) journaled_enqueue_opts.merge!(enqueue_with) + + include Tagged if tagged end def journaled_attributes @@ -61,4 +67,20 @@ def event_type journal_attributes :id, :event_type, :created_at end + + module Tagged + extend ActiveSupport::Concern + + included do + journaled_attributes << :tags + end + + def tags + Journaled::Current.tags + end + + def tagged? + true + end + end end diff --git a/app/models/journaled/writer.rb b/app/models/journaled/writer.rb index 8a5c79b..2898f66 100644 --- a/app/models/journaled/writer.rb +++ b/app/models/journaled/writer.rb @@ -25,8 +25,7 @@ def initialize(journaled_event:) end def journal! - base_event_json_schema_validator.validate! serialized_event - json_schema_validator.validate! serialized_event + validate! Journaled::DeliveryJob .set(journaled_enqueue_opts.reverse_merge(priority: Journaled.job_priority)) .perform_later(delivery_perform_args) @@ -37,6 +36,12 @@ def journal! attr_reader :journaled_event delegate(*EVENT_METHOD_NAMES, to: :journaled_event) + def validate! + schema_validator('base_event').validate! serialized_event + schema_validator('tagged_event').validate! serialized_event if journaled_event.tagged? + schema_validator(journaled_schema_name).validate! serialized_event + end + def delivery_perform_args { serialized_event: serialized_event, @@ -49,12 +54,8 @@ def serialized_event @serialized_event ||= journaled_attributes.to_json end - def json_schema_validator - @json_schema_validator ||= Journaled::JsonSchemaModel::Validator.new(journaled_schema_name) - end - - def base_event_json_schema_validator - @base_event_json_schema_validator ||= Journaled::JsonSchemaModel::Validator.new('base_event') + def schema_validator(schema_name) + Journaled::JsonSchemaModel::Validator.new(schema_name) end def respond_to_all?(object, method_names) diff --git a/journaled.gemspec b/journaled.gemspec index f2441e2..fe8edb4 100644 --- a/journaled.gemspec +++ b/journaled.gemspec @@ -22,7 +22,6 @@ Gem::Specification.new do |s| s.add_dependency "aws-sdk-kinesis", "< 2" s.add_dependency "json-schema" s.add_dependency "railties", ">= 5.2" - s.add_dependency "request_store" s.add_development_dependency "appraisal", "~> 2.2.0" s.add_development_dependency "rspec-rails" diff --git a/journaled_schemas/tagged_event.json b/journaled_schemas/tagged_event.json new file mode 100644 index 0000000..dbb3f31 --- /dev/null +++ b/journaled_schemas/tagged_event.json @@ -0,0 +1,14 @@ +{ + "type": "object", + "title": "tagged_event", + "additionalProperties": true, + "required": [ + "tags" + ], + "properties": { + "tags": { + "type": "object", + "additionalProperties": true + } + } +} diff --git a/lib/journaled.rb b/lib/journaled.rb index 48b2697..8cc6b94 100644 --- a/lib/journaled.rb +++ b/lib/journaled.rb @@ -1,9 +1,9 @@ require "aws-sdk-kinesis" require "active_job" require "json-schema" -require "request_store" require "journaled/engine" +require "journaled/current" module Journaled SUPPORTED_QUEUE_ADAPTERS = %w(delayed delayed_job good_job que).freeze @@ -51,5 +51,17 @@ def detect_queue_adapter! end end + def self.tagged(**tags) + existing_tags = Current.tags + tag!(tags) + yield + ensure + Current.tags = existing_tags + end + + def self.tag!(**tags) + Current.tags = Current.tags.merge(tags) + end + module_function :development_or_test?, :enabled?, :schema_providers, :commit_hash, :actor_uri, :detect_queue_adapter! end diff --git a/lib/journaled/current.rb b/lib/journaled/current.rb new file mode 100644 index 0000000..e9b05fa --- /dev/null +++ b/lib/journaled/current.rb @@ -0,0 +1,18 @@ +module Journaled + class Current < ActiveSupport::CurrentAttributes + attribute :tags + attribute :journaled_actor_proc + + def tags=(value) + super(value.freeze) + end + + def tags + attributes[:tags] ||= {}.freeze + end + + def actor + journaled_actor_proc&.call + end + end +end diff --git a/lib/journaled/version.rb b/lib/journaled/version.rb index e0399a1..69b3aa9 100644 --- a/lib/journaled/version.rb +++ b/lib/journaled/version.rb @@ -1,3 +1,3 @@ module Journaled - VERSION = "4.0.0".freeze + VERSION = "4.1.0".freeze end diff --git a/spec/models/concerns/journaled/actor_spec.rb b/spec/models/concerns/journaled/actor_spec.rb index 377fa52..62ddf66 100644 --- a/spec/models/concerns/journaled/actor_spec.rb +++ b/spec/models/concerns/journaled/actor_spec.rb @@ -5,11 +5,10 @@ let(:user) { double("User") } let(:klass) do Class.new do - cattr_accessor :before_actions - self.before_actions = [] + cattr_accessor(:before_actions) { [] } - def self.before_action(&hook) - before_actions << hook + def self.before_action(method_name, _opts) + before_actions << method_name end include Journaled::Actor @@ -21,7 +20,7 @@ def current_user end def trigger_before_actions - before_actions.each { |proc| instance_eval(&proc) } + before_actions.each { |method_name| send(method_name) } end end end @@ -33,7 +32,8 @@ def trigger_before_actions allow(subject).to receive(:current_user).and_return(nil) - expect(RequestStore.store[:journaled_actor_proc].call).to eq nil + expect(Journaled::Current.journaled_actor_proc.call).to eq nil + expect(Journaled::Current.actor).to eq nil end it "Stores a thunk returning current_user if it is set when called" do @@ -41,6 +41,7 @@ def trigger_before_actions allow(subject).to receive(:current_user).and_return(user) - expect(RequestStore.store[:journaled_actor_proc].call).to eq user + expect(Journaled::Current.journaled_actor_proc.call).to eq user + expect(Journaled::Current.actor).to eq user end end diff --git a/spec/models/journaled/actor_uri_provider_spec.rb b/spec/models/journaled/actor_uri_provider_spec.rb index 5bf700e..da81444 100644 --- a/spec/models/journaled/actor_uri_provider_spec.rb +++ b/spec/models/journaled/actor_uri_provider_spec.rb @@ -2,7 +2,7 @@ RSpec.describe Journaled::ActorUriProvider do describe "#actor_uri" do - let(:request_store) { double(:[] => nil) } + let(:current_attributes) { double(:[] => nil) } let(:actor) { double(to_global_id: actor_gid) } let(:actor_gid) { double(to_s: "my_fancy_gid") } let(:program_name) { "/usr/local/bin/puma_or_something" } @@ -17,13 +17,14 @@ end before do - allow(RequestStore).to receive(:store).and_return(request_store) + allow(Journaled::Current.instance) + .to receive(:attributes).and_return(current_attributes) end - it "returns the global ID of the entity returned by RequestStore.store[:journaled_actor_proc].call if set" do - allow(request_store).to receive(:[]).and_return(-> { actor }) + it "returns the global ID of the entity returned by Current.journaled_actor_proc.call if set" do + allow(current_attributes).to receive(:[]).and_return(-> { actor }) expect(subject.actor_uri).to eq("my_fancy_gid") - expect(request_store).to have_received(:[]).with(:journaled_actor_proc) + expect(current_attributes).to have_received(:[]).with(:journaled_actor_proc) end context "when running in rake" do diff --git a/spec/models/journaled/event_spec.rb b/spec/models/journaled/event_spec.rb index 222c2b5..68641cb 100644 --- a/spec/models/journaled/event_spec.rb +++ b/spec/models/journaled/event_spec.rb @@ -88,8 +88,10 @@ context 'when no additional attributes have been defined' do it 'returns the base attributes, and memoizes them after the first call' do - expect(sample_journaled_event.journaled_attributes).to eq id: fake_uuid, created_at: frozen_time, event_type: 'some_class_name' - expect(sample_journaled_event.journaled_attributes).to eq id: fake_uuid, created_at: frozen_time, event_type: 'some_class_name' + expect(sample_journaled_event.journaled_attributes) + .to eq id: fake_uuid, created_at: frozen_time, event_type: 'some_class_name' + expect(sample_journaled_event.journaled_attributes) + .to eq id: fake_uuid, created_at: frozen_time, event_type: 'some_class_name' end end @@ -134,6 +136,80 @@ def bar ) end end + + context 'tagged: true' do + before do + sample_journaled_event_class.journal_attributes tagged: true + end + + it 'adds a "tags" attribute' do + expect(sample_journaled_event.journaled_attributes).to include(tags: {}) + end + + context 'when tags are specified' do + around do |example| + Journaled.tag!(foo: 'bar') + Journaled.tagged(baz: 'bat') { example.run } + end + + it 'adds them to the journaled attributes' do + expect(sample_journaled_event.journaled_attributes).to include( + tags: { foo: 'bar', baz: 'bat' }, + ) + end + + context 'when even more tags are nested' do + it 'merges them in and then resets them' do + Journaled.tagged(oh_no: 'even more tags') do + expect(sample_journaled_event.journaled_attributes).to include( + tags: { foo: 'bar', baz: 'bat', oh_no: 'even more tags' }, + ) + end + + allow(SecureRandom).to receive(:uuid).and_return(fake_uuid).once + expect(sample_journaled_event_class.new.journaled_attributes).to include( + tags: { foo: 'bar', baz: 'bat' }, + ) + end + end + + context 'when custom event tags are also specified and merged' do + let(:sample_journaled_event_class) do + Class.new do + include Journaled::Event + + def tags + super.merge(abc: '123') + end + end + end + + it 'combines all tags' do + expect(sample_journaled_event.journaled_attributes).to include( + tags: { foo: 'bar', baz: 'bat', abc: '123' }, + ) + end + end + + context 'when custom event tags are also specified but not merged' do + let(:sample_journaled_event_class) do + Class.new do + include Journaled::Event + + def tags + { bananas: 'are great', but_not_actually: 'the best source of potassium' } # it's true + end + end + end + + it 'adds them to the journaled attributes' do + expect(sample_journaled_event.journaled_attributes).to include( + tags: { bananas: 'are great', but_not_actually: 'the best source of potassium' }, + ) + end + end + end + end end describe '#journaled_enqueue_opts, .journaled_enqueue_opts' do diff --git a/spec/models/journaled/writer_spec.rb b/spec/models/journaled/writer_spec.rb index f03465f..60dd094 100644 --- a/spec/models/journaled/writer_spec.rb +++ b/spec/models/journaled/writer_spec.rb @@ -77,6 +77,7 @@ journaled_partition_key: 'fake_partition_key', journaled_stream_name: 'my_app_events', journaled_enqueue_opts: journaled_enqueue_opts, + tagged?: false, ) end @@ -141,5 +142,33 @@ end end end + + context 'when the event is tagged' do + before do + allow(journaled_event).to receive(:tagged?).and_return(true) + end + + context 'and the "tags" attribute is not present' do + let(:journaled_event_attributes) do + { id: 'FAKE_UUID', event_type: 'fake_event', created_at: Time.zone.now, foo: 'bar' } + end + + it 'raises an error and does not enqueue anything' do + expect { subject.journal! }.to raise_error JSON::Schema::ValidationError + expect(enqueued_jobs.count).to eq 0 + end + end + + context 'and the "tags" attribute is present' do + let(:journaled_event_attributes) do + { id: 'FAKE_UUID', event_type: 'fake_event', created_at: Time.zone.now, foo: 'bar', tags: { baz: 'bat' } } + end + + it 'creates a delivery with the app name passed through' do + expect { subject.journal! }.to change { enqueued_jobs.count }.from(0).to(1) + expect(enqueued_jobs.first[:args].first).to include('stream_name' => 'my_app_events') + end + end + end end end