Skip to content

Commit

Permalink
Add support for queue namespacing (#52)
Browse files Browse the repository at this point in the history
* Add support for queue namespacing

Namespacing is a powerful concept in multi-tenant infrastructure,
commonly supported as an entry in connection strings (e.g. setting
`dbname` when creating a [PG connection URI](https://www.postgresql.org/docs/10/libpq-connect.html)).

To support multi-tenant Faktory instances, namespacing can be easily
achieved by prefixing queue names with a provided namespace.

* Add namespace parsing tests

* Clean up namespacing types and also qualify worker queues

* Prettier toNamespace
  • Loading branch information
cbeav authored Oct 23, 2020
1 parent 106b2b6 commit 259dffd
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 13 deletions.
6 changes: 4 additions & 2 deletions README.lhs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ When using `envSettings`, the following variables will be used:
- `FAKTORY_PROVIDER`: the name of another environment variable where the
connection string can be found. Defaults to `FAKTORY_URL`.
- `FAKTORY_URL` (or whatever you named in `FAKTORY_PROVIDER`): connection string
to the Faktory server. Format is `tcp(+tls)://(:password@)host:port`. Defaults
to `tcp://localhost:4719`.
to the Faktory server. Format is
`tcp(+tls)://(:password@)host:port(/namespace)`. Defaults to
`tcp://localhost:4719`. `namespace` is prependend to queue names on job
submission and worker consumption.
When using `envWorkerSettings`, the following variables are also used:
Expand Down
2 changes: 1 addition & 1 deletion library/Faktory/Client.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Faktory.Client
(
-- * Client operations
Client
Client(..)
, newClient
, closeClient

Expand Down
12 changes: 10 additions & 2 deletions library/Faktory/Connection.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Faktory.Connection
( ConnectionInfo(..)
, Namespace(..)
, defaultConnectionInfo
, envConnectionInfo
, connect
Expand All @@ -25,11 +26,15 @@ import Text.Megaparsec
)
import Text.Megaparsec.Char (char, digitChar, string, upperChar)

newtype Namespace = Namespace Text
deriving newtype (Eq, Show)

data ConnectionInfo = ConnectionInfo
{ connectionInfoTls :: Bool
, connectionInfoPassword :: Maybe String
, connectionInfoHostName :: HostName
, connectionInfoPort :: PortNumber
, connectionInfoNamespace :: Namespace
}
deriving stock (Eq, Show)

Expand All @@ -39,14 +44,15 @@ defaultConnectionInfo = ConnectionInfo
, connectionInfoPassword = Nothing
, connectionInfoHostName = "localhost"
, connectionInfoPort = 7419
, connectionInfoNamespace = Namespace ""
}

-- | Parse a @'Connection'@ from environment variables
--
-- > FAKTORY_PROVIDER=FAKTORY_URL
-- > FAKTORY_URL=tcp://:my-password@localhost:7419
--
-- Supported format is @tcp(+tls):\/\/(:password@)host:port@.
-- Supported format is @tcp(+tls):\/\/(:password@)host:port(/namespace)@.
--
-- See <https://github.com/contribsys/faktory/wiki/Worker-Lifecycle#url-configuration>.
--
Expand Down Expand Up @@ -92,11 +98,13 @@ parseProvider =
some (upperChar <|> char '_') <?> "an environment variable name"

parseConnection :: Parser ConnectionInfo
parseConnection = go <?> "tcp(+tls)://(:<password>@)<host>:<port>"
parseConnection = go <?> "tcp(+tls)://(:<password>@)<host>:<port>(/namespace)"
where
go =
ConnectionInfo
<$> (False <$ string "tcp://" <|> True <$ string "tcp+tls://")
<*> optional (char ':' *> manyTill anySingle (char '@'))
<*> manyTill anySingle (char ':')
<*> (read <$> some digitChar)
<*> (toNamespace <$> optional (char '/' *> some anySingle))
toNamespace = Namespace . maybe "" pack
20 changes: 14 additions & 6 deletions library/Faktory/Job.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import Data.Aeson.Casing
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as NE
import Data.Time
import Faktory.Producer (Producer, pushJob)
import Faktory.Settings (Queue)
import Faktory.Client (Client(..))
import Faktory.Producer (Producer(..), pushJob)
import Faktory.Settings
import GHC.Generics
import GHC.Stack
import System.Random
Expand Down Expand Up @@ -68,16 +69,23 @@ newtype JobOptions = JobOptions [JobUpdate]
perform
:: (HasCallStack, ToJSON arg) => JobOptions -> Producer -> arg -> IO JobId
perform options producer arg = do
job <- applyOptions options =<< newJob arg
let
namespace =
connectionInfoNamespace
$ settingsConnection
$ clientSettings
$ producerClient producer
job <- applyOptions namespace options =<< newJob arg
jobJid job <$ pushJob producer job

applyOptions :: JobOptions -> Job arg -> IO (Job arg)
applyOptions (JobOptions patches) = go patches
applyOptions :: Namespace -> JobOptions -> Job arg -> IO (Job arg)
applyOptions namespace (JobOptions patches) = go patches
where
go [] job = pure job
go (set : sets) job = case set of
SetRetry n -> go sets $ job { jobRetry = Just n }
SetQueue q -> go sets $ job { jobQueue = Just q }
SetQueue q ->
go sets $ job { jobQueue = Just $ namespaceQueue namespace q }
SetJobtype jt -> go sets $ job { jobJobtype = jt }
SetAt time -> go sets $ job { jobAt = Just time }
SetIn diff -> do
Expand Down
2 changes: 1 addition & 1 deletion library/Faktory/Producer.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Faktory.Producer
( Producer
( Producer(..)
, newProducer
, newProducerEnv
, closeProducer
Expand Down
5 changes: 5 additions & 0 deletions library/Faktory/Settings.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ module Faktory.Settings
, defaultWorkerSettings
, envWorkerSettings
, Queue(..)
, namespaceQueue
, queueArg
, defaultQueue
, WorkerId
, randomWorkerId

-- * Re-exports
, ConnectionInfo(..)
, Namespace(..)
) where

import Faktory.Prelude
Expand Down Expand Up @@ -73,6 +75,9 @@ envWorkerSettings = do
newtype Queue = Queue Text
deriving newtype (IsString, FromJSON, ToJSON)

namespaceQueue :: Namespace -> Queue -> Queue
namespaceQueue (Namespace n) (Queue q) = Queue $ mappend n q

queueArg :: Queue -> ByteString
queueArg (Queue q) = fromStrict $ encodeUtf8 q

Expand Down
4 changes: 3 additions & 1 deletion library/Faktory/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ processorLoop
-> IO ()
processorLoop client settings workerSettings f = do
let
namespace = connectionInfoNamespace $ settingsConnection settings
processAndAck job = do
f $ jobArg job
ackJob client job

emJob <- fetchJob client $ settingsQueue workerSettings
emJob <- fetchJob client $ namespaceQueue namespace $ settingsQueue
workerSettings

case emJob of
Left err -> settingsLogError settings $ "Invalid Job: " <> err
Expand Down
14 changes: 14 additions & 0 deletions tests/Faktory/ConnectionSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ spec = do
connectionInfoPassword `shouldBe` Nothing
connectionInfoHostName `shouldBe` "foo"
connectionInfoPort `shouldBe` 123
connectionInfoNamespace `shouldBe` Namespace ""

it "parses tls and password" $ do
let
Expand All @@ -45,6 +46,19 @@ spec = do
connectionInfoHostName `shouldBe` "bar"
connectionInfoPort `shouldBe` 123

it "parses namespace" $ do
let
env =
[ ("FAKTORY_PROVIDER", Nothing)
, ("FAKTORY_URL", Just "tcp://localhost:7419/prefix")
]

withEnvironment env $ do
ConnectionInfo {..} <- envConnectionInfo
connectionInfoHostName `shouldBe` "localhost"
connectionInfoPort `shouldBe` 7419
connectionInfoNamespace `shouldBe` Namespace "prefix"

it "follows _PROVIDER to find _URL" $ do
let
env =
Expand Down

0 comments on commit 259dffd

Please sign in to comment.