Public repository with a convenience adapter & testing classes for apps talking to NSQ

fastly/fastly_nsq

fastly_nsq

NSQ adapter and testing objects for using the NSQ messaging system in your Ruby project.

This library is intended to facilitate publishing and consuming messages on an NSQ messaging queue.

We also include fakes to make testing easier.

This library is dependent on the nsq-ruby-fastly gem.

Please use GitHub Issues to report bugs.

Documentation

Install

fastly_nsq is a Ruby Gem tested against Rails >= 4.2 and Ruby >= 2.1.8.

To get started, add fastly_nsq to your Gemfile and run bundle install.

Usage

Connections

NSQD connections can be discovered via nsqlookupd instances or specified explicitly for consumers and producers.

Using nsqlookup:

Set the ENV variable to a comma-separated string of lookups:

ENV['NSQLOOKUPD_HTTP_ADDRESS'] = "lookup01:1234,lookup02:1234"

Or configure them directly:

FastlyNsq.configure do |config|
  config.lookupd_http_addresses = ["lookup01:1234", "lookup02:1234"]
end
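The comma-separated string and the array form carry the same information. The following is a minimal sketch of how such a string could be split into an array in plain Ruby; `parse_addresses` is a hypothetical helper name and this is not the gem's own parsing code.

```ruby
# Hypothetical helper: split a comma-separated address string into an array,
# tolerating stray whitespace and empty entries.
def parse_addresses(value)
  value.to_s.split(',').map(&:strip).reject(&:empty?)
end

lookupds = parse_addresses('lookup01:1234, lookup02:1234')
# lookupds now holds the two addresses as separate strings
```

Stripping whitespace means a value written with a space after the comma still yields clean addresses.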

Using nsqd directly:

NSQD connections can be specified for consumers and producers. Being able to set different sets for consumers and producers facilitates removing and adding new instances without downtime.

Set the following ENV variables to a comma-separated string of nsqds:

ENV['NSQD_CONSUMERS']="nsqd01:4150,nsqd02:4150"
ENV['NSQD_PRODUCERS']="nsqd01:4150,nsqd02:4150"

Or configure them directly:

FastlyNsq.configure do |config|
  config.consumer_nsqds = ["nsqd01:4150", "nsqd02:4150"]
  config.producer_nsqds = ["nsqd01:4150", "nsqd02:4150"]
end

Connection Priority

When FastlyNsq.consumer_nsqds or FastlyNsq.producer_nsqds is set, those addresses are used instead of FastlyNsq.lookupd_http_addresses.
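The precedence rule can be pictured with a small sketch. `connection_target` and the `:nsqd` / `:nsqlookupd` keys are hypothetical names chosen for illustration; the gem's internal logic may be organised differently.

```ruby
# Hypothetical helper illustrating the precedence rule described above:
# explicit nsqd addresses win over nsqlookupd discovery.
def connection_target(nsqds:, lookupds:)
  if nsqds && !nsqds.empty?
    { nsqd: nsqds }
  else
    { nsqlookupd: lookupds }
  end
end

explicit   = connection_target(nsqds: ['nsqd01:4150'], lookupds: ['lookup01:1234'])
discovered = connection_target(nsqds: [], lookupds: ['lookup01:1234'])
```

Here `explicit` points straight at the nsqd address, while `discovered` falls back to the lookup address because no nsqds were given.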

TLS

Set the following ENV variables to enable TLS support:

NSQ_SSL_KEY
NSQ_SSL_CERTIFICATE
NSQ_SSL_CA_CERTIFICATE
NSQ_SSL_VERIFY_MODE (optional)
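Because TLS depends on several variables being present together, a start-up check can catch a partial configuration early. This is a sketch only: `missing_tls_vars` is a hypothetical helper, and it assumes the three variables not marked optional above are all needed.

```ruby
# Names taken from the list above; NSQ_SSL_VERIFY_MODE is optional and not checked.
REQUIRED_TLS_VARS = %w[NSQ_SSL_KEY NSQ_SSL_CERTIFICATE NSQ_SSL_CA_CERTIFICATE].freeze

# Hypothetical helper: return the required TLS variables that are unset or blank.
# Accepts any hash-like object so it can be exercised without touching ENV.
def missing_tls_vars(env = ENV)
  REQUIRED_TLS_VARS.select { |name| env[name].to_s.strip.empty? }
end

missing = missing_tls_vars('NSQ_SSL_KEY' => '/path/to/key.pem')
warn "TLS not fully configured, missing: #{missing.join(', ')}" unless missing.empty?
```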

FastlyNsq::Producer

This is a class which provides an adapter to the fake and real NSQ producers. These are used to write messages onto the queue:

message_data = {
  "data" => {
    "key" => "value"
  }
}

producer = FastlyNsq::Producer.new(
  topic: topic,
)

producer.write(message_data.to_json)

The mock/real strategy used can be switched by requiring the test file and configuring the mode.

require 'fastly_nsq/testing'
FastlyNsq::Testing.enabled? #=> true
FastlyNsq::Testing.disabled? #=> false

producer = FastlyNsq::Producer.new(topic: topic)
listener = FastlyNsq::Listener.new(topic: topic, channel: channel, processor: ->(m) { puts 'got: '+ m.body })

FastlyNsq::Testing.fake! # default, messages accumulate on the listeners

producer.write '{"foo":"bar"}'
listener.messages.size #=> 1

FastlyNsq::Testing.reset!  # remove all accumulated messages

listener.messages.size #=> 0

producer.write '{"foo":"bar"}'
listener.messages.size #=> 1

listener.drain
#  got: {"foo"=>"bar"}
listener.messages.size #=> 0

FastlyNsq::Testing.inline! # messages are processed as they are produced
producer.write '{"foo":"bar"}'
#  got: {"foo"=>"bar"}
listener.messages.size #=> 0

FastlyNsq::Testing.disable! # do it live
FastlyNsq::Testing.enable!  # re-enable testing mode
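The difference between the fake and inline modes can be modelled without the gem. The class below is a toy stand-in written for this explanation (`ToyListener`, `deliver`, and the `mode:` argument are all hypothetical); it is not how FastlyNsq::Testing is implemented, only a picture of the observable behaviour.

```ruby
require 'json'

# Toy stand-in for a listener under test; NOT the gem's implementation.
class ToyListener
  attr_reader :messages

  def initialize(processor:, mode: :fake)
    @processor = processor
    @mode = mode
    @messages = []
  end

  # In :fake mode messages accumulate; in :inline mode they are processed at once.
  def deliver(body)
    if @mode == :inline
      @processor.call(body)
    else
      @messages << body
    end
  end

  # Process and remove everything that has accumulated.
  def drain
    @processor.call(@messages.shift) until @messages.empty?
  end
end

seen = []
record = ->(body) { seen << JSON.parse(body) }

fake = ToyListener.new(processor: record)
fake.deliver('{"foo":"bar"}') # held back, the processor has not run yet
fake.drain                    # now the processor runs and the backlog empties

inline = ToyListener.new(processor: record, mode: :inline)
inline.deliver('{"foo":"bar"}') # processed immediately, nothing accumulates
```

Fake mode suits tests that assert on what was published; inline mode suits tests that assert on the processor's side effects.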

FastlyNsq::Consumer

This is a class which provides an adapter to the fake and real NSQ consumers. These are used to read messages off of the queue:

consumer = FastlyNsq::Consumer.new(
  topic: 'topic',
  channel: 'channel'
)

consumer.size #=> 1
message = consumer.pop
message.body #=> "{ 'data': { 'key': 'value' } }"
message.finish
consumer.size #=> 0
consumer.terminate

FastlyNsq::Listener

To process the next message on the queue:

topic     = 'user_created'
channel   = 'my_consuming_service'
processor = MessageProcessor

FastlyNsq::Listener.new(topic: topic, channel: channel, processor: processor)

This reads messages off the queue, dispatches them through FastlyNsq.manager.pool, and passes the JSON text body to MessageProcessor.call(message).

Specify a topic priority by providing a number (the default is 0):

topic     = 'user_created'
channel   = 'my_consuming_service'
processor = MessageProcessor
priority  = 1 # a little higher

FastlyNsq::Listener.new(topic: topic, channel: channel, processor: processor, priority: priority)

FastlyNsq::CLI

To help facilitate running the FastlyNsq::Listener in a blocking fashion outside your application, a CLI and bin script fastly_nsq are provided.

This can be set up ahead of time by calling FastlyNsq.configure and passing a block.

# config/fastly_nsq.rb
FastlyNsq.configure do |config|
  config.channel = 'fnsq'
  config.logger = Logger.new(STDOUT) # Logger.new requires a log device
  config.preprocessor = ->(_) { FastlyNsq.logger.info 'PREPROCESSESES' }

  config.max_attempts = 20
  config.max_req_timeout = (60 * 60 * 4 * 1_000) # 4 hours
  config.max_processing_pool_threads = 10

  # register listeners on the yielded config object
  config.listen 'posts', ->(m) { puts "posts: #{m.body}" }
  config.listen 'blogs', ->(m) { puts "blogs: #{m.body}" }, priority: 3
end
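The timeout in the configuration above is written as a product whose trailing comment reads "4 hours", which implies the value is in milliseconds. A small sketch of that arithmetic, with a hypothetical `hours_to_ms` helper, makes the unit explicit:

```ruby
MS_PER_SECOND    = 1_000
SECONDS_PER_HOUR = 60 * 60

# Hypothetical helper: convert a number of hours to milliseconds.
def hours_to_ms(hours)
  hours * SECONDS_PER_HOUR * MS_PER_SECOND
end

four_hours = hours_to_ms(4) # 14_400_000, the same value as 60 * 60 * 4 * 1_000
```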

An example of using the CLI:

./bin/fastly_nsq -r config/fastly_nsq.rb -L ./test.log -P ./fastly_nsq.pid -v -d -t 4 -c 10

FastlyNsq::Messenger

Wrapper around a producer for sending messages and persisting producer objects.

FastlyNsq::Messenger.deliver(message: msg, topic: 'my_topic', originating_service: 'my service')

You can also optionally pass custom metadata.

FastlyNsq::Messenger.deliver(message: msg, topic: 'my_topic', originating_service: 'my service', meta: { test: 'test' })

This will use a FastlyNsq::Producer for the given topic, or create one if it isn't already persisted, and then write the passed message to the queue. If you don't set the originating service, it defaults to unknown.

You can also set the originating service for all deliver calls:

FastlyNsq::Messenger.originating_service = 'some awesome service'
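The fallback order for the originating service (per-call value, then the class-wide setting, then unknown) can be sketched as follows. The envelope layout here, with `data` and `meta` keys and the service name stored under `meta`, is an assumption made for illustration; the real wire format is defined by the gem and may differ. `build_envelope` and `default_service` are hypothetical names.

```ruby
require 'json'

# Hypothetical envelope builder; the key layout is an assumption for illustration.
def build_envelope(message:, originating_service: nil, default_service: nil, meta: {})
  # Per-call value wins, then the configured default, then 'unknown'.
  service = originating_service || default_service || 'unknown'
  { 'data' => message, 'meta' => meta.merge('originating_service' => service) }
end

anonymous = build_envelope(message: { 'key' => 'value' }, meta: { 'test' => 'test' })
defaulted = build_envelope(message: 'hi', default_service: 'some awesome service')
explicit  = build_envelope(message: 'hi', originating_service: 'my service',
                           default_service: 'some awesome service')
json = anonymous.to_json
```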

FastlyNsq::Messenger also supports delivering multiple messages at once and will use the NSQ mpub directive under the hood.

FastlyNsq::Messenger.deliver_multi(messages: array_of_msgs, topic: 'my_topic')

FastlyNsq::Messenger can also be used to manage Producer connections:

# get a producer:
producer = FastlyNsq::Messenger.producer_for(topic: 'hot_topic')

# get a hash of all persisted producers:
producers = FastlyNsq::Messenger.producers

# terminate a producer
FastlyNsq::Messenger.terminate_producer(topic: 'hot_topic')

# terminate all producers
FastlyNsq::Messenger.terminate_all_producers
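The idea of persisting one producer per topic is ordinary memoization. The class below is a toy model written for this explanation (`ToyProducerRegistry` and its methods are hypothetical), not the Messenger's actual implementation.

```ruby
# Toy registry that keeps one producer per topic; NOT the gem's implementation.
class ToyProducerRegistry
  # factory is any callable that builds a producer-like object for a topic.
  def initialize(factory)
    @factory = factory
    @producers = {}
  end

  # Return the persisted producer for the topic, creating it on first use.
  def producer_for(topic)
    @producers[topic] ||= @factory.call(topic)
  end

  # Drop the producer for one topic, terminating it if it supports that.
  def terminate(topic)
    producer = @producers.delete(topic)
    producer.terminate if producer.respond_to?(:terminate)
  end

  # Topics that currently have a persisted producer.
  def topics
    @producers.keys
  end
end

registry = ToyProducerRegistry.new(->(topic) { "producer for #{topic}" })
first  = registry.producer_for('hot_topic')
second = registry.producer_for('hot_topic') # same object, not rebuilt
registry.terminate('hot_topic')
```

Reusing the producer avoids opening a new connection for every message sent to the same topic.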

FastlyNsq::Http

Wrappers around the nsqd and nsqlookupd HTTP APIs.

Nsqd

Implements most of the Nsqd API.

Example usage:

FastlyNsq::Http::Nsqd.ping
FastlyNsq::Http::Nsqd.create_channel(topic: 'foo', channel: 'bar')
FastlyNsq::Http::Nsqd.stats(topic: 'foo', format: '')

TODO:

  1. Debug endpoints (/debug/*)
  2. Config PUT (/config/nsqlookupd_tcp_address)
  3. Correct Handling of mpub binary mode

Nsqlookupd

Implements all of the Nsqlookupd API.

Example usage:

FastlyNsq::Http::Nsqlookupd.nodes
FastlyNsq::Http::Nsqlookupd.lookup(topic: 'foo')
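The wrapper calls in this section correspond to plain HTTP endpoints on nsqd and nsqlookupd. The sketch below only builds the URLs with Ruby's standard library and sends nothing; `nsq_http_uri` is a hypothetical helper, the addresses are sample values, and the exact requests the gem issues are not shown here.

```ruby
require 'uri'

# Hypothetical helper: build a URL for an NSQ HTTP endpoint without sending a request.
def nsq_http_uri(address, path, params = {})
  host, port = address.split(':')
  query = params.empty? ? nil : URI.encode_www_form(params)
  URI::HTTP.build(host: host, port: port.to_i, path: path, query: query)
end

nsqd    = '127.0.0.1:4151' # sample nsqd HTTP address
lookupd = '127.0.0.1:4161' # sample nsqlookupd HTTP address

ping_uri    = nsq_http_uri(nsqd, '/ping')
channel_uri = nsq_http_uri(nsqd, '/channel/create', topic: 'foo', channel: 'bar')
lookup_uri  = nsq_http_uri(lookupd, '/lookup', topic: 'foo')
```

Seeing the raw URLs can help when debugging with curl alongside the wrappers.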

Testing

FastlyNsq provides a test mode and a helper class to make testing easier.

In order to test classes that use FastlyNsq without having real connections to NSQ:

require 'fastly_nsq/testing'

RSpec.configure do |config|
  config.before(:each) do
    FastlyNsq::Testing.fake!
    FastlyNsq::Testing.reset!
  end
end

To test processor classes you can create test messages:

test_message = FastlyNsq::Testing.message(data: { 'count' => 123 })

My::ProcessorKlass.call(test_message)

expect(some_result)

Configuration

See the documentation for additional settings.

Example:

FastlyNsq.configure do |config|
  config.channel = "z"
  config.producer_nsqds = ["nsqd01:4150", "nsqd02:4150"]
  config.lookupd_http_addresses = ["lookupd01:4161", "lookupd02:4161"]
  config.logger = Logger.new(STDOUT)
  config.max_attempts = 10
  config.max_req_timeout = 10_000
  config.max_processing_pool_threads = 42
end

Environment Variables

The URLs for the various NSQ endpoints are expected in ENV variables.

Below are the required variables and sample values for using stock NSQ on OS X, installed via Homebrew:

NSQD_HTTP_ADDRESS='127.0.0.1:4151'
NSQLOOKUPD_HTTP_ADDRESS='127.0.0.1:4161, 10.1.1.101:4161'
NSQD_CONSUMERS='127.0.0.1:4150'
NSQD_PRODUCERS='127.0.0.1:4150'

See the .sample.env file for more detail.

Development

The fastest way to get up and running for development is to use the Docker container provided by Docker Compose:

  • Clone: git clone https://github.com/fastly/fastly_nsq.git
  • cd fastly_nsq
  • run bundle install
  • run docker-compose up -d
  • rake spec

You will still need the ENV variables as defined above.

Contributors

Acknowledgements

Copyright

Copyright (c) 2016 Fastly, Inc under an MIT license.

See LICENSE.txt for details.

Metadata

  • Ignore
