Skip to content

Commit

Permalink
Introducing: Tagged Events! (journaled v4.1.0) (#22)
Browse files Browse the repository at this point in the history
This adds a new `tagged: true` option to `journal_attributes`, and `Journaled.tagged`/`Journaled.tag!` helpers that allow events to be tagged with contextual metadata (useful for audit logs and tracing):

```ruby
class MyEvent
  include Journaled::Event
      
  journal_attributes :attr_1, :attr_2, tagged: true
end

Journaled.tagged(foo: 'bar') do
  # events emitted in this scope will have a `tags` attribute with contents `{ "foo": "bar" }`
end

# events emitted in this scope will have a `tags` attribute with contents `{}`
```

All "tagged" events will be given a `tags` attribute, but it can be empty. Consuming apps with strict schema enforcement (`"additionalProperties": false`) will need to include the "tags" field in any relevant event schemas.

Under the hood, this removes `RequestStore` in favor of `ActiveSupport::CurrentAttributes`. This is the new convention for Rails apps (available in 5.2+), so in theory consuming apps shouldn't be impacted by this, but out of caution we have incremented the gem a minor version (to 4.1.0).
  • Loading branch information
smudge authored Nov 4, 2021
1 parent 0cb1f01 commit 2e0dedf
Show file tree
Hide file tree
Showing 14 changed files with 291 additions and 74 deletions.
124 changes: 83 additions & 41 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ add scoped ordering capability at a future date (and would gladly
entertain pull requests), but it is presently only designed to provide a
durable, eventually consistent record that discrete events happened.

**See [upgrades](#upgrades) below if you're upgrading from an older `journaled` version!**

## Installation

1. If you haven't already,
Expand Down Expand Up @@ -83,52 +85,15 @@ app's Gemfile.
The AWS principal whose credentials are in the environment will need to be allowed to assume this role.
### Upgrading from 3.1.0
Versions of Journaled prior to 4.0 relied directly on environment variables for stream names, but now stream names are configured directly.
When upgrading, you can use the following configuration to maintain the previous behavior:
```ruby
Journaled.default_stream_name = ENV['JOURNALED_STREAM_NAME']
```

If you previously specified a `Journaled.default_app_name`, you would have required a more precise environment variable name (substitute `{{upcase_app_name}}`):

```ruby
Journaled.default_stream_name = ENV["{{upcase_app_name}}_JOURNALED_STREAM_NAME"]
```

And if you had defined any `journaled_app_name` methods on `Journaled::Event` instances, you can replace them with the following:

```ruby
def journaled_stream_name
ENV['{{upcase_app_name}}_JOURNALED_STREAM_NAME']
end
```

When upgrading from 3.1 or below, `Journaled::DeliveryJob` will handle any jobs that remain in the queue by accepting an `app_name` argument. **This behavior will be removed in version 5.0**, so it is recommended to upgrade one major version at a time.

### Upgrading from 2.5.0

Versions of Journaled prior to 3.0 relied direclty on `delayed_job` and a "performable" class called `Journaled::Delivery`.
In 3.0, this was superceded by an ActiveJob class called `Journaled::DeliveryJob`, but the `Journaled::Delivery` class was not removed until 4.0.

Therefore, when upgrading from 2.5.0 or below, it is recommended to first upgrade to 3.1.0 (to allow any `Journaled::Delivery` jobs to finish working off) before upgrading to 4.0+.

The upgrade to 3.1.0 will require a working ActiveJob config. ActiveJob can be configured globally by setting `ActiveJob::Base.queue_adapter`, or just for Journaled jobs by setting `Journaled::DeliveryJob.queue_adapter`.
The `:delayed_job` queue adapter will allow you to continue relying on `delayed_job`. You may also consider switching your app(s) to [`delayed`](https://github.com/Betterment/delayed) and using the `:delayed` queue adapter.

## Usage
### Configuration
Journaling provides a number of different configuation options that can be set in Ruby using an initializer. Those values are:
#### `Journaled.default_app_name`
#### `Journaled.default_stream_name `
This is described in the proceeding paragraph and is used to specify which app name to use, which corresponds to which Journaled Stream to send events too.
This is the default value for events that do NOT specify their own `#journaled_app_name`. For events that define their own `#journaled_app_name` method, that will take precedence over this default.
Ex: `Journaled.default_app_name = 'my_app'`
This is described in the "Installation" section above, and is used to specify which stream name to use.
#### `Journaled.job_priority` (default: 20)
Expand Down Expand Up @@ -185,8 +150,8 @@ class ApplicationController < ActionController::Base
end
```

Your authenticated entity must respond to `#to_global_id`, which
ActiveRecords do by default.
Your authenticated entity must respond to `#to_global_id`, which ActiveRecords do by default.
This feature relies on `ActiveSupport::CurrentAttributes` under the hood.

Every time any of the specified attributes is modified, or a `User`
record is created or destroyed, an event will be sent to Kinesis with the following attributes:
Expand Down Expand Up @@ -214,6 +179,40 @@ journaling. Note that the less-frequently-used methods `toggle`,
`increment*`, `decrement*`, and `update_counters` are not intercepted at
this time.

### Tagged Events

Events may be optionally marked as "tagged." This will add a `tags` field, intended for tracing and
auditing purposes.

```ruby
class MyEvent
include Journaled::Event

journal_attributes :attr_1, :attr_2, tagged: true
end
```

You may then use `Journaled.tag!` and `Journaled.tagged` inside of your
`ApplicationController` and `ApplicationJob` classes (or anywhere else!) to tag
all events with request and job metadata:

```ruby
class ApplicationController < ActionController::Base
before_action do
Journaled.tag!(request_id: request.request_id, current_user_id: current_user&.id)
end
end

class ApplicationJob < ActiveJob::Base
around_perform do |job, perform|
Journaled.tagged(job_id: job.id) { perform.call }
end
end
```

This feature relies on `ActiveSupport::CurrentAttributes` under the hood, so these tags are local to
the current thread, and will be cleared at the end of each request request/job.

#### Testing

If you use RSpec (and have required `journaled/rspec` in your
Expand Down Expand Up @@ -353,6 +352,49 @@ Returns one of the following in order of preference:
In order for this to be most useful, you must configure your controller
as described in [Change Journaling](#change-journaling) above.

## Upgrades

Since this gem relies on background jobs (which can remain in the queue across
code releases), this gem generally aims to support jobs enqueued by the prior
gem version.

As such, **we always recommend upgrading only one major version at a time.**

### Upgrading from 3.1.0

Versions of Journaled prior to 4.0 relied directly on environment variables for stream names, but now stream names are configured directly.
When upgrading, you can use the following configuration to maintain the previous behavior:

```ruby
Journaled.default_stream_name = ENV['JOURNALED_STREAM_NAME']
```

If you previously specified a `Journaled.default_app_name`, you would have required a more precise environment variable name (substitute `{{upcase_app_name}}`):

```ruby
Journaled.default_stream_name = ENV["{{upcase_app_name}}_JOURNALED_STREAM_NAME"]
```

And if you had defined any `journaled_app_name` methods on `Journaled::Event` instances, you can replace them with the following:

```ruby
def journaled_stream_name
ENV['{{upcase_app_name}}_JOURNALED_STREAM_NAME']
end
```

When upgrading from 3.1 or below, `Journaled::DeliveryJob` will handle any jobs that remain in the queue by accepting an `app_name` argument. **This behavior will be removed in version 5.0**, so it is recommended to upgrade one major version at a time.

### Upgrading from 2.5.0

Versions of Journaled prior to 3.0 relied direclty on `delayed_job` and a "performable" class called `Journaled::Delivery`.
In 3.0, this was superceded by an ActiveJob class called `Journaled::DeliveryJob`, but the `Journaled::Delivery` class was not removed until 4.0.

Therefore, when upgrading from 2.5.0 or below, it is recommended to first upgrade to 3.1.0 (to allow any `Journaled::Delivery` jobs to finish working off) before upgrading to 4.0+.

The upgrade to 3.1.0 will require a working ActiveJob config. ActiveJob can be configured globally by setting `ActiveJob::Base.queue_adapter`, or just for Journaled jobs by setting `Journaled::DeliveryJob.queue_adapter`.
The `:delayed_job` queue adapter will allow you to continue relying on `delayed_job`. You may also consider switching your app(s) to [`delayed`](https://github.com/Betterment/delayed) and using the `:delayed` queue adapter.

## Future improvements & issue tracking
Suggestions for enhancements to this engine are currently being tracked via Github Issues. Please feel free to open an
issue for a desired feature, as well as for any observed bugs.
13 changes: 8 additions & 5 deletions app/controllers/concerns/journaled/actor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ module Journaled::Actor
extend ActiveSupport::Concern

included do
class_attribute :_journaled_actor_method_name, instance_accessor: false, instance_predicate: false
before_action do
RequestStore.store[:journaled_actor_proc] = self.class._journaled_actor_method_name &&
-> { send(self.class._journaled_actor_method_name) }
end
class_attribute :_journaled_actor_method_name, instance_writer: false
before_action :_set_journaled_actor_proc, if: :_journaled_actor_method_name?
end

private

def _set_journaled_actor_proc
Journaled::Current.journaled_actor_proc = -> { send(self.class._journaled_actor_method_name) }
end

class_methods do
Expand Down
3 changes: 1 addition & 2 deletions app/models/journaled/actor_uri_provider.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ def actor_uri
private

def actor_global_id_uri
actor = RequestStore.store[:journaled_actor_proc]&.call
actor.to_global_id.to_s if actor
Journaled::Current.actor&.to_global_id&.to_s
end

def fallback_global_id_uri
Expand Down
24 changes: 23 additions & 1 deletion app/models/journaled/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,18 @@ def journaled_stream_name
Journaled.default_stream_name
end

def tagged?
false
end

private

class_methods do
def journal_attributes(*args, enqueue_with: {})
def journal_attributes(*args, enqueue_with: {}, tagged: false)
journaled_attributes.concat(args)
journaled_enqueue_opts.merge!(enqueue_with)

include Tagged if tagged
end

def journaled_attributes
Expand All @@ -61,4 +67,20 @@ def event_type

journal_attributes :id, :event_type, :created_at
end

module Tagged
extend ActiveSupport::Concern

included do
journaled_attributes << :tags
end

def tags
Journaled::Current.tags
end

def tagged?
true
end
end
end
17 changes: 9 additions & 8 deletions app/models/journaled/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ def initialize(journaled_event:)
end

def journal!
base_event_json_schema_validator.validate! serialized_event
json_schema_validator.validate! serialized_event
validate!
Journaled::DeliveryJob
.set(journaled_enqueue_opts.reverse_merge(priority: Journaled.job_priority))
.perform_later(delivery_perform_args)
Expand All @@ -37,6 +36,12 @@ def journal!
attr_reader :journaled_event
delegate(*EVENT_METHOD_NAMES, to: :journaled_event)

def validate!
schema_validator('base_event').validate! serialized_event
schema_validator('tagged_event').validate! serialized_event if journaled_event.tagged?
schema_validator(journaled_schema_name).validate! serialized_event
end

def delivery_perform_args
{
serialized_event: serialized_event,
Expand All @@ -49,12 +54,8 @@ def serialized_event
@serialized_event ||= journaled_attributes.to_json
end

def json_schema_validator
@json_schema_validator ||= Journaled::JsonSchemaModel::Validator.new(journaled_schema_name)
end

def base_event_json_schema_validator
@base_event_json_schema_validator ||= Journaled::JsonSchemaModel::Validator.new('base_event')
def schema_validator(schema_name)
Journaled::JsonSchemaModel::Validator.new(schema_name)
end

def respond_to_all?(object, method_names)
Expand Down
1 change: 0 additions & 1 deletion journaled.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ Gem::Specification.new do |s|
s.add_dependency "aws-sdk-kinesis", "< 2"
s.add_dependency "json-schema"
s.add_dependency "railties", ">= 5.2"
s.add_dependency "request_store"

s.add_development_dependency "appraisal", "~> 2.2.0"
s.add_development_dependency "rspec-rails"
Expand Down
14 changes: 14 additions & 0 deletions journaled_schemas/tagged_event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"type": "object",
"title": "tagged_event",
"additionalProperties": true,
"required": [
"tags"
],
"properties": {
"tags": {
"type": "object",
"additionalProperties": true
}
}
}
14 changes: 13 additions & 1 deletion lib/journaled.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
require "aws-sdk-kinesis"
require "active_job"
require "json-schema"
require "request_store"

require "journaled/engine"
require "journaled/current"

module Journaled
SUPPORTED_QUEUE_ADAPTERS = %w(delayed delayed_job good_job que).freeze
Expand Down Expand Up @@ -51,5 +51,17 @@ def detect_queue_adapter!
end
end

def self.tagged(**tags)
existing_tags = Current.tags
tag!(tags)
yield
ensure
Current.tags = existing_tags
end

def self.tag!(**tags)
Current.tags = Current.tags.merge(tags)
end

module_function :development_or_test?, :enabled?, :schema_providers, :commit_hash, :actor_uri, :detect_queue_adapter!
end
18 changes: 18 additions & 0 deletions lib/journaled/current.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module Journaled
class Current < ActiveSupport::CurrentAttributes
attribute :tags
attribute :journaled_actor_proc

def tags=(value)
super(value.freeze)
end

def tags
attributes[:tags] ||= {}.freeze
end

def actor
journaled_actor_proc&.call
end
end
end
2 changes: 1 addition & 1 deletion lib/journaled/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Journaled
VERSION = "4.0.0".freeze
VERSION = "4.1.0".freeze
end
15 changes: 8 additions & 7 deletions spec/models/concerns/journaled/actor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
let(:user) { double("User") }
let(:klass) do
Class.new do
cattr_accessor :before_actions
self.before_actions = []
cattr_accessor(:before_actions) { [] }

def self.before_action(&hook)
before_actions << hook
def self.before_action(method_name, _opts)
before_actions << method_name
end

include Journaled::Actor
Expand All @@ -21,7 +20,7 @@ def current_user
end

def trigger_before_actions
before_actions.each { |proc| instance_eval(&proc) }
before_actions.each { |method_name| send(method_name) }
end
end
end
Expand All @@ -33,14 +32,16 @@ def trigger_before_actions

allow(subject).to receive(:current_user).and_return(nil)

expect(RequestStore.store[:journaled_actor_proc].call).to eq nil
expect(Journaled::Current.journaled_actor_proc.call).to eq nil
expect(Journaled::Current.actor).to eq nil
end

it "Stores a thunk returning current_user if it is set when called" do
subject.trigger_before_actions

allow(subject).to receive(:current_user).and_return(user)

expect(RequestStore.store[:journaled_actor_proc].call).to eq user
expect(Journaled::Current.journaled_actor_proc.call).to eq user
expect(Journaled::Current.actor).to eq user
end
end
Loading

0 comments on commit 2e0dedf

Please sign in to comment.