Skip to content

Commit

Permalink
Replace app_name config with direct stream_name configurability (#21
Browse files Browse the repository at this point in the history
)

This replaces `default_app_name`, `journaled_app_name`, and other `app_name` usage with `default_stream_name`, `journaled_stream_name`, and `stream_name` (respectively). It also moves off of an ENV-vars-by-default approach for stream configuration, to more of a BYO-ENV-vars strategy (where you can still reference the old ENV vars if you want).

Naturally, this prompts a major version bump to 4.0, and upgrade instructions have been added the README.

#### Why?

We've started to realize that treating "app names" and kinesis streams as 1:1 pairings is a somewhat limiting convention. We already have at least one use case now where we actively want to send a subset of an app's events to a different stream (with different firehose routing rules), and as it stands we'd need to essentially invent a new "app name" and also set a new environment variable in order to make that happen. So this new config removes a layer of indirection there.

If you take a look at the upgrade instructions in the README, you'll see that the previous app-name-based world is still fully supported -- the old environment variables can be dropped directly into the new configurations, just with fewer baked-in assumptions about ENV conventions by the gem itself.

(There are more steps we could take to allow for region and IAM role configurability across event types, but for now we'll assume that these streams are still colocated in the same region and accessible with the same role.)

### Other Information

- Worth noting, **I made the choice to bake `stream_name` into the `Journaled::DeliveryJob` handlers**. This means that jobs will use the stream names that the events were configured to use at the time they were enqueued. My thought is that this is the more desirable trade-off (I find it easier to reason about at least), but it does mean that if you deploy a change to `default_stream_name` or `journaled_stream_name`, it won't kick in for jobs that were already enqueued prior to the deploy.

- Because this is a major version bump, I removed the old `Journaled::Delivery` "performable" class, since it has been succeeded by the `Journaled::DeliveryJob` ActiveJob. This means that anyone upgrading directly from 2.5.0 or below to 4.0 (this new version) will see errors if they have any jobs actively queued. I've added a recommendation to the README to upgrade one major version at a time, so 2.x -> 3.x -> 4.x would be the safest path.
  • Loading branch information
smudge authored Oct 27, 2021
1 parent 33257de commit 0cb1f01
Show file tree
Hide file tree
Showing 14 changed files with 165 additions and 375 deletions.
72 changes: 54 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ durable, eventually consistent record that discrete events happened.
[configure ActiveJob](https://guides.rubyonrails.org/active_job_basics.html)
to use one of the following queue adapters:

- `:delayed_job` (via `delayed_job_active_record`)
- `:que`
- `:good_job`
- `:delayed`
- `:delayed_job` (via `delayed_job_active_record`)
- `:que`
- `:good_job`
- `:delayed`

Ensure that your queue adapter is not configured to delete jobs on failure.
Ensure that your queue adapter is not configured to delete jobs on failure.

**If you launch your application in production mode and the gem detects that
`ActiveJob::Base.queue_adapter` is not in the above list, it will raise an exception
and prevent your application from performing unsafe journaling.**
**If you launch your application in production mode and the gem detects that
`ActiveJob::Base.queue_adapter` is not in the above list, it will raise an exception
and prevent your application from performing unsafe journaling.**

2. To integrate Journaled into your application, simply include the gem in your
app's Gemfile.
Expand All @@ -51,20 +51,21 @@ app's Gemfile.
require 'journaled/rspec'
```

3. You will also need to define the following environment variables to allow Journaled to publish events to your AWS Kinesis event stream:
3. You will need to set the following config in an initializer to allow Journaled to publish events to your AWS Kinesis event stream:

* `JOURNALED_STREAM_NAME`
```ruby
Journaled.default_stream_name = "my_app_#{Rails.env}_events"
```

Special case: if your `Journaled::Event` objects override the
`#journaled_app_name` method to a non-nil value e.g. `my_app`, you will
instead need to provide a corresponding
`[upcased_app_name]_JOURNALED_STREAM_NAME` variable for each distinct
value, e.g. `MY_APP_JOURNALED_STREAM_NAME`. You can provide a default value
for all `Journaled::Event`s in an initializer like this:
You may also define a `#journaled_stream_name` method on `Journaled::Event` instances:

```ruby
Journaled.default_app_name = 'my_app'
```
def journaled_stream_name
"my_app_#{Rails.env}_alternate_events"
end
````
3. You may also need to define environment variables to allow Journaled to publish events to your AWS Kinesis event stream:
You may optionally define the following ENV vars to specify AWS
credentials outside of the locations that the AWS SDK normally looks:
Expand All @@ -82,6 +83,41 @@ app's Gemfile.
The AWS principal whose credentials are in the environment will need to be allowed to assume this role.
### Upgrading from 3.1.0
Versions of Journaled prior to 4.0 relied directly on environment variables for stream names, but now stream names are configured directly.
When upgrading, you can use the following configuration to maintain the previous behavior:
```ruby
Journaled.default_stream_name = ENV['JOURNALED_STREAM_NAME']
```

If you previously specified a `Journaled.default_app_name`, you would have required a more precise environment variable name (substitute `{{upcase_app_name}}`):

```ruby
Journaled.default_stream_name = ENV["{{upcase_app_name}}_JOURNALED_STREAM_NAME"]
```

And if you had defined any `journaled_app_name` methods on `Journaled::Event` instances, you can replace them with the following:

```ruby
def journaled_stream_name
ENV['{{upcase_app_name}}_JOURNALED_STREAM_NAME']
end
```

When upgrading from 3.1 or below, `Journaled::DeliveryJob` will handle any jobs that remain in the queue by accepting an `app_name` argument. **This behavior will be removed in version 5.0**, so it is recommended to upgrade one major version at a time.

### Upgrading from 2.5.0

Versions of Journaled prior to 3.0 relied direclty on `delayed_job` and a "performable" class called `Journaled::Delivery`.
In 3.0, this was superceded by an ActiveJob class called `Journaled::DeliveryJob`, but the `Journaled::Delivery` class was not removed until 4.0.

Therefore, when upgrading from 2.5.0 or below, it is recommended to first upgrade to 3.1.0 (to allow any `Journaled::Delivery` jobs to finish working off) before upgrading to 4.0+.

The upgrade to 3.1.0 will require a working ActiveJob config. ActiveJob can be configured globally by setting `ActiveJob::Base.queue_adapter`, or just for Journaled jobs by setting `Journaled::DeliveryJob.queue_adapter`.
The `:delayed_job` queue adapter will allow you to continue relying on `delayed_job`. You may also consider switching your app(s) to [`delayed`](https://github.com/Betterment/delayed) and using the `:delayed` queue adapter.

## Usage

### Configuration
Expand Down
19 changes: 14 additions & 5 deletions app/jobs/journaled/delivery_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,24 @@ class DeliveryJob < ApplicationJob
raise KinesisTemporaryFailure
end

def perform(serialized_event:, partition_key:, app_name:)
UNSPECIFIED = Object.new
private_constant :UNSPECIFIED

def perform(serialized_event:, partition_key:, stream_name: UNSPECIFIED, app_name: UNSPECIFIED)
@serialized_event = serialized_event
@partition_key = partition_key
@app_name = app_name
if app_name != UNSPECIFIED
@stream_name = self.class.legacy_computed_stream_name(app_name: app_name)
elsif stream_name != UNSPECIFIED && !stream_name.nil?
@stream_name = stream_name
else
raise(ArgumentError, 'missing keyword: stream_name')
end

journal!
end

def self.stream_name(app_name:)
def self.legacy_computed_stream_name(app_name:)
env_var_name = [app_name&.upcase, 'JOURNALED_STREAM_NAME'].compact.join('_')
ENV.fetch(env_var_name)
end
Expand All @@ -37,15 +46,15 @@ def kinesis_client_config

private

attr_reader :serialized_event, :partition_key, :app_name
attr_reader :serialized_event, :partition_key, :stream_name

def journal!
kinesis_client.put_record record if Journaled.enabled?
end

def record
{
stream_name: self.class.stream_name(app_name: app_name),
stream_name: stream_name,
data: serialized_event,
partition_key: partition_key,
}
Expand Down
6 changes: 3 additions & 3 deletions app/models/journaled/change.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class Journaled::Change
:database_operation,
:logical_operation,
:changes,
:journaled_app_name,
:journaled_stream_name,
:journaled_enqueue_opts,
:actor

Expand All @@ -22,15 +22,15 @@ def initialize(table_name:,
database_operation:,
logical_operation:,
changes:,
journaled_app_name:,
journaled_stream_name:,
journaled_enqueue_opts:,
actor:)
@table_name = table_name
@record_id = record_id
@database_operation = database_operation
@logical_operation = logical_operation
@changes = changes
@journaled_app_name = journaled_app_name
@journaled_stream_name = journaled_stream_name
@journaled_enqueue_opts = journaled_enqueue_opts
@actor = actor
end
Expand Down
10 changes: 5 additions & 5 deletions app/models/journaled/change_writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def journaled_change_for(database_operation, changes)
database_operation: database_operation,
logical_operation: logical_operation,
changes: JSON.dump(changes),
journaled_app_name: journaled_app_name,
journaled_stream_name: journaled_stream_name,
journaled_enqueue_opts: model.journaled_enqueue_opts,
actor: actor_uri,
)
Expand Down Expand Up @@ -57,11 +57,11 @@ def pluck_changed_values(change_hash, index:)
end
end

def journaled_app_name
if model.class.respond_to?(:journaled_app_name)
model.class.journaled_app_name
def journaled_stream_name
if model.class.respond_to?(:journaled_stream_name)
model.class.journaled_stream_name
else
Journaled.default_app_name
Journaled.default_stream_name
end
end
end
88 changes: 0 additions & 88 deletions app/models/journaled/delivery.rb

This file was deleted.

4 changes: 2 additions & 2 deletions app/models/journaled/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def journaled_partition_key
event_type
end

def journaled_app_name
Journaled.default_app_name
def journaled_stream_name
Journaled.default_stream_name
end

private
Expand Down
4 changes: 2 additions & 2 deletions app/models/journaled/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ class Journaled::Writer
journaled_schema_name
journaled_partition_key
journaled_attributes
journaled_app_name
journaled_stream_name
journaled_enqueue_opts
).freeze

Expand Down Expand Up @@ -41,7 +41,7 @@ def delivery_perform_args
{
serialized_event: serialized_event,
partition_key: journaled_partition_key,
app_name: journaled_app_name,
stream_name: journaled_stream_name,
}
end

Expand Down
2 changes: 1 addition & 1 deletion lib/journaled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
module Journaled
SUPPORTED_QUEUE_ADAPTERS = %w(delayed delayed_job good_job que).freeze

mattr_accessor :default_app_name
mattr_accessor :default_stream_name
mattr_accessor(:job_priority) { 20 }
mattr_accessor(:http_idle_timeout) { 5 }
mattr_accessor(:http_open_timeout) { 2 }
Expand Down
2 changes: 1 addition & 1 deletion lib/journaled/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Journaled
VERSION = "3.1.0".freeze
VERSION = "4.0.0".freeze
end
Loading

0 comments on commit 0cb1f01

Please sign in to comment.