Skip to content

Commit

Permalink
shared-kernel/fix:#187/Added Redis Functions for job scheduler and ex…
Browse files Browse the repository at this point in the history
…posed more coreMetrics
  • Loading branch information
suraj.kumar1 authored and DwivediAman-exe committed Sep 5, 2023
1 parent 020f0e7 commit dd42a6b
Show file tree
Hide file tree
Showing 10 changed files with 536 additions and 16 deletions.
2 changes: 2 additions & 0 deletions lib/mobility-core/mobility-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ library
, cereal
, clickhouse-haskell
, clock
, concurrency
, containers
, cryptonite
, data-default-class
Expand Down Expand Up @@ -511,6 +512,7 @@ test-suite mobility-core-tests
, cereal
, clickhouse-haskell
, clock
, concurrency
, containers
, cryptonite
, data-default-class
Expand Down
1 change: 1 addition & 0 deletions lib/mobility-core/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ dependencies:
- cereal
- beam-mysql
- sequelize
- concurrency
- casing

ghc-options:
Expand Down
180 changes: 178 additions & 2 deletions lib/mobility-core/src/Kernel/Beam/Functions.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ class
where
toTType' :: a -> t

-- Below class FromTType'' and ToTType'' are only to be used with scheduler
class
FromTType'' t a
| a -> t
where
fromTType'' :: (MonadThrow m, Log m, L.MonadFlow m) => t -> m (Maybe a)

class
ToTType'' t a
| a -> t
where
toTType'' :: a -> t

meshConfig :: MeshConfig
meshConfig =
MeshConfig
Expand Down Expand Up @@ -76,9 +89,10 @@ setMeshConfig modelName mSchema meshConfig' = do
else pure $ meshConfig' {meshEnabled = False, kvHardKilled = modelName `notElem` enableKVForRead, ecRedisDBStream = redisStream}
else pure $ meshConfig' {meshEnabled = False, kvHardKilled = modelName `notElem` enableKVForRead, ecRedisDBStream = redisStream}

getMasterDBConfig :: (HasCallStack, L.MonadFlow m) => m (DBConfig Pg)
getMasterDBConfig :: (Log m, HasCallStack, L.MonadFlow m) => m (DBConfig Pg)
getMasterDBConfig = do
dbConf <- L.getOption KBT.PsqlDbCfg
logDebug $ "getMasterDBConfig :::" <> show dbConf
case dbConf of
Just dbCnf' -> pure dbCnf'
Nothing -> L.throwException $ InternalError "masterDb Config not found"
Expand All @@ -90,7 +104,7 @@ getLocDbConfig = do
Just dbCnf' -> pure dbCnf'
Nothing -> L.throwException $ InternalError "LocationDb Config not found"

getMasterBeamConfig :: (HasCallStack, L.MonadFlow m) => m (SqlConn Pg)
getMasterBeamConfig :: (Log m, HasCallStack, L.MonadFlow m) => m (SqlConn Pg)
getMasterBeamConfig = do
inReplica <- L.getOptionLocal ReplicaEnabled
dbConf <- maybe getMasterDBConfig (\inReplica' -> if inReplica' then getReplicaDbConfig else getMasterDBConfig) inReplica
Expand Down Expand Up @@ -152,6 +166,36 @@ findOneWithKV where' = do
Right Nothing -> pure Nothing
Left err -> throwError $ InternalError $ show err

findOneWithKVScheduler ::
forall table m a.
( HasCallStack,
FromTType'' (table Identity) a,
BeamRuntime Postgres Pg,
B.HasQBuilder Postgres,
BeamRunner Pg,
Model Postgres table,
MeshMeta Postgres table,
KVConnector (table Identity),
FromJSON (table Identity),
ToJSON (table Identity),
Serialize.Serialize (table Identity),
L.MonadFlow m,
Show (table Identity),
Log m,
MonadThrow m
) =>
Where Postgres table ->
m (Maybe a)
findOneWithKVScheduler where' = do
updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig
inReplica <- L.getOptionLocal ReplicaEnabled
dbConf' <- maybe getMasterDBConfig (\inReplica' -> if inReplica' then getReplicaDbConfig else getMasterDBConfig) inReplica
result <- KV.findWithKVConnector dbConf' updatedMeshConfig where'
case result of
Right (Just res) -> fromTType'' res
Right Nothing -> pure Nothing
Left err -> throwError $ InternalError $ show err

findAllWithKV ::
forall table m a.
( HasCallStack,
Expand Down Expand Up @@ -181,6 +225,37 @@ findAllWithKV where' = do
pure $ catMaybes res'
Left err -> throwError $ InternalError $ show err

findAllWithKVScheduler ::
forall table m a.
( HasCallStack,
FromTType'' (table Identity) a,
BeamRuntime Postgres Pg,
B.HasQBuilder Postgres,
BeamRunner Pg,
Model Postgres table,
MeshMeta Postgres table,
KVConnector (table Identity),
FromJSON (table Identity),
ToJSON (table Identity),
Serialize.Serialize (table Identity),
L.MonadFlow m,
Show (table Identity),
Log m,
MonadThrow m
) =>
Where Postgres table ->
m [a]
findAllWithKVScheduler where' = do
updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig
inReplica <- L.getOptionLocal ReplicaEnabled
dbConf' <- maybe getMasterDBConfig (\inReplica' -> if inReplica' then getReplicaDbConfig else getMasterDBConfig) inReplica
result <- KV.findAllWithKVConnector dbConf' updatedMeshConfig where'
case result of
Right res -> do
res' <- mapM fromTType'' res
pure $ catMaybes res'
Left err -> throwError $ InternalError $ show err

findAllWithOptionsKV ::
forall table m a.
( HasCallStack,
Expand Down Expand Up @@ -213,6 +288,40 @@ findAllWithOptionsKV where' orderBy mbLimit mbOffset = do
pure $ catMaybes res'
Left err -> throwError $ InternalError $ show err

findAllWithOptionsKVScheduler ::
forall table m a.
( HasCallStack,
FromTType'' (table Identity) a,
BeamRuntime Postgres Pg,
B.HasQBuilder Postgres,
BeamRunner Pg,
Model Postgres table,
MeshMeta Postgres table,
KVConnector (table Identity),
FromJSON (table Identity),
ToJSON (table Identity),
Serialize.Serialize (table Identity),
L.MonadFlow m,
Show (table Identity),
Log m,
MonadThrow m
) =>
Where Postgres table ->
OrderBy table ->
Maybe Int ->
Maybe Int ->
m [a]
findAllWithOptionsKVScheduler where' orderBy mbLimit mbOffset = do
updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig
inReplica <- L.getOptionLocal ReplicaEnabled
dbConf' <- maybe getMasterDBConfig (\inReplica' -> if inReplica' then getReplicaDbConfig else getMasterDBConfig) inReplica
result <- KV.findAllWithOptionsKVConnector dbConf' updatedMeshConfig where' orderBy mbLimit mbOffset
case result of
Right res -> do
res' <- mapM fromTType'' res
pure $ catMaybes res'
Left err -> throwError $ InternalError $ show err

findOneWithDb ::
forall table m a.
( HasCallStack,
Expand Down Expand Up @@ -335,6 +444,39 @@ updateWithKV setClause whereClause = do
pure ()
Left err -> throwError $ InternalError $ show err

updateWithKVScheduler ::
forall table m.
( HasCallStack,
BeamRuntime Postgres Pg,
SqlReturning Pg Postgres,
B.HasQBuilder Postgres,
BeamRunner Pg,
Model Postgres table,
MeshMeta Postgres table,
KVConnector (table Identity),
FromJSON (table Identity),
ToJSON (table Identity),
Serialize.Serialize (table Identity),
L.MonadFlow m,
Show (table Identity),
Log m,
MonadThrow m
) =>
[Set Postgres table] ->
Where Postgres table ->
m ()
updateWithKVScheduler setClause whereClause = do
updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig
dbConf <- getMasterDBConfig
res <- KV.updateAllWithKVConnector dbConf updatedMeshConfig setClause whereClause
case res of
Right res' -> do
if updatedMeshConfig.meshEnabled && not updatedMeshConfig.kvHardKilled
then logDebug $ "Updated rows KV: " <> show res'
else logDebug $ "Updated rows DB: " <> show res'
pure ()
Left err -> throwError $ InternalError $ show err

updateOneWithKV ::
forall table m.
( HasCallStack,
Expand Down Expand Up @@ -398,6 +540,40 @@ createWithKV a = do
pure ()
Left err -> throwError $ InternalError $ show err

createWithKVScheduler ::
forall table m a.
( HasCallStack,
ToTType'' (table Identity) a,
SqlReturning Pg Postgres,
BeamRuntime Postgres Pg,
B.HasQBuilder Postgres,
BeamRunner Pg,
Model Postgres table,
MeshMeta Postgres table,
KVConnector (table Identity),
FromJSON (table Identity),
ToJSON (table Identity),
Serialize.Serialize (table Identity),
L.MonadFlow m,
Show (table Identity),
Log m,
MonadThrow m
) =>
a ->
m ()
createWithKVScheduler a = do
let tType = toTType'' a
updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig
dbConf' <- getMasterDBConfig
result <- KV.createWoReturingKVConnector dbConf' updatedMeshConfig tType
case result of
Right _ -> do
if updatedMeshConfig.meshEnabled && not updatedMeshConfig.kvHardKilled
then logDebug $ "Created row in KV: " <> show tType
else logDebug $ "Created row in DB: " <> show tType
pure ()
Left err -> throwError $ InternalError $ show err

deleteWithKV ::
forall be table beM m.
( HasCallStack,
Expand Down
70 changes: 67 additions & 3 deletions lib/mobility-core/src/Kernel/Mock/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -Wwarn=missing-methods #-}

module Kernel.Mock.App where

import qualified Control.Monad.Catch as C
import Control.Monad.IO.Unlift
import qualified EulerHS.Language as L
import Kernel.Tools.Metrics.CoreMetrics
import Kernel.Types.Common
import Kernel.Utils.IOLogging
Expand All @@ -45,9 +47,68 @@ run _ server env = serve proxyApi $ hoistServer proxyApi f (healthCheckServer :<
Right res -> pure res

newtype MockM e a = MockM {runMockM :: ReaderT e IO a}
deriving newtype (Functor, Applicative, Monad, MonadReader e, MonadIO, MonadUnliftIO, C.MonadThrow, C.MonadCatch, C.MonadMask)

-- TODO: think about renaming this type and moving it to the common core
deriving newtype (Functor, Applicative, Monad, MonadReader e, MonadUnliftIO, C.MonadThrow, C.MonadCatch, C.MonadMask)

instance L.MonadFlow (MockM r) where
{-# INLINEABLE callServantAPI #-}
callServantAPI mbMgrSel url cl = L.callServantAPI mbMgrSel url cl
{-# INLINEABLE evalLogger' #-}
evalLogger' logAct = L.evalLogger' logAct
{-# INLINEABLE runIO' #-}
runIO' descr ioAct = L.runIO' descr ioAct
{-# INLINEABLE getOption #-}
getOption k = L.getOption k
{-# INLINEABLE setOption #-}
setOption k v = L.setOption k v
{-# INLINEABLE getOptionLocal #-}
getOptionLocal k = L.getOptionLocal k
{-# INLINEABLE setOptionLocal #-}
setOptionLocal k v = L.setOptionLocal k v
{-# INLINEABLE delOption #-}
delOption k = L.delOption k
{-# INLINEABLE generateGUID #-}
generateGUID = generateGUID
{-# INLINEABLE runSysCmd #-}
runSysCmd cmd = L.runSysCmd cmd
{-# INLINEABLE initSqlDBConnection #-}
initSqlDBConnection cfg = L.initSqlDBConnection cfg
{-# INLINEABLE deinitSqlDBConnection #-}
deinitSqlDBConnection conn = L.deinitSqlDBConnection conn
{-# INLINEABLE getSqlDBConnection #-}
getSqlDBConnection cfg = L.getSqlDBConnection cfg
{-# INLINEABLE initKVDBConnection #-}
initKVDBConnection cfg = L.initKVDBConnection cfg
{-# INLINEABLE deinitKVDBConnection #-}
deinitKVDBConnection conn = L.deinitKVDBConnection conn
{-# INLINEABLE getKVDBConnection #-}
getKVDBConnection cfg = L.getKVDBConnection cfg
{-# INLINEABLE runDB #-}
runDB conn dbAct = L.runDB conn dbAct
{-# INLINEABLE runTransaction #-}
runTransaction conn dbAct = L.runTransaction conn dbAct
{-# INLINEABLE await #-}
await mbMcs awaitable = L.await mbMcs awaitable
{-# INLINEABLE runSafeFlow #-}
runSafeFlow flow = L.runSafeFlow flow
{-# INLINEABLE runKVDB #-}
runKVDB cName act = L.runKVDB cName act
{-# INLINEABLE runPubSub #-}
runPubSub act = L.runPubSub act
{-# INLINEABLE publish #-}
publish channel payload = L.publish channel payload
{-# INLINEABLE subscribe #-}
subscribe channels cb = L.subscribe channels cb
{-# INLINEABLE psubscribe #-}
psubscribe channels cb = L.psubscribe channels cb
{-# INLINEABLE withModifiedRuntime #-}
withModifiedRuntime f flow = L.withModifiedRuntime f flow
{-# INLINEABLE setLoggerContext #-}
setLoggerContext k v = L.setLoggerContext k v
{-# INLINEABLE getLoggerContext #-}
getLoggerContext k = L.getLoggerContext k

instance MonadIO (MockM r) where
liftIO = L.runIO

runMock :: e -> MockM e a -> IO a
runMock env action = runReaderT (runMockM action) env
Expand All @@ -58,6 +119,9 @@ instance CoreMetrics (MockM e) where
incrementErrorCounter _ _ = return ()
addUrlCallRetries _ _ = return ()
addUrlCallRetryFailures _ = return ()
incrementSortedSetCounter _ = return ()
incrementStreamCounter _ = return ()
addGenericLatency _ _ = return ()

instance MonadTime (MockM e) where
getCurrentTime = liftIO getCurrentTime
Expand Down
Loading

0 comments on commit dd42a6b

Please sign in to comment.