Skip to content

Commit

Permalink
shared-kernel/fix:#187/Added Redis Sorted Set Functions and Redis Str…
Browse files Browse the repository at this point in the history
…eams Functions for Job Scheduling
  • Loading branch information
suraj.kumar1 authored and DwivediAman-exe committed Aug 18, 2023
1 parent 1665c4f commit 3f3f5d2
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 11 deletions.
3 changes: 3 additions & 0 deletions lib/mobility-core/src/Kernel/Mock/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ instance CoreMetrics (MockM e) where
incrementErrorCounter _ _ = return ()
addUrlCallRetries _ _ = return ()
addUrlCallRetryFailures _ = return ()
incrementSortedSetCounter _ _ = return ()
incrementStreamCounter _ _ = return ()
addGenericLatency _ _ = return ()

instance MonadTime (MockM e) where
getCurrentTime = liftIO getCurrentTime
Expand Down
173 changes: 169 additions & 4 deletions lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ import qualified Data.Aeson as Ae
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.String.Conversions
import Data.Text hiding (map, null)
import Data.Text hiding (concatMap, map, null)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as DE
import Database.Redis (Queued, Redis, RedisTx, Reply, TxResult (..))
import qualified Database.Redis as Hedis
import EulerHS.Prelude (whenLeft)
Expand All @@ -34,6 +33,32 @@ import Kernel.Utils.Logging

type ExpirationTime = Int

data XReadResponse = XReadResponse
{ stream :: BS.ByteString,
records :: [StreamsRecord]
}
deriving (Show)

data StreamsRecord = StreamsRecord
{ recordId :: BS.ByteString,
keyValues :: [(BS.ByteString, BS.ByteString)]
}
deriving (Show)

convertFromHedisResponse :: Hedis.XReadResponse -> XReadResponse
convertFromHedisResponse hedisResponse =
XReadResponse
{ stream = Hedis.stream hedisResponse,
records = map convertFromHedisRecord (Hedis.records hedisResponse)
}

convertFromHedisRecord :: Hedis.StreamsRecord -> StreamsRecord
convertFromHedisRecord hedisRecord =
StreamsRecord
{ recordId = Hedis.recordId hedisRecord,
keyValues = Hedis.keyValues hedisRecord
}

runHedis ::
HedisFlow m env => Redis (Either Reply a) -> m a
runHedis action = do
Expand Down Expand Up @@ -153,7 +178,7 @@ getImpl decodeResult key = withLogTag "Redis" $ do
Just res' -> decodeResult res'

get :: (FromJSON a, HedisFlow m env) => Text -> m (Maybe a)
get key = getImpl decodeResult key
get = getImpl decodeResult
where
decodeResult bs = Error.fromMaybeM (HedisDecodeError $ cs bs) $ Ae.decode $ BSL.fromStrict bs

Expand Down Expand Up @@ -415,7 +440,7 @@ zrevrangeWithscores key start stop = do
pure $ map (\(k, score) -> (cs' k, score)) res
where
cs' :: BS.ByteString -> Text
cs' = DE.decodeUtf8
cs' = cs

zScore :: (FromJSON Double, HedisFlow m env) => Text -> Text -> m (Maybe Double)
zScore key member = do
Expand All @@ -429,3 +454,143 @@ zRevRank key member = do

zCard :: (HedisFlow m env) => Text -> m Integer
zCard key = runWithPrefix key Hedis.zcard

zAdd ::
(ToJSON member, HedisFlow m env) =>
Text ->
[(Double, member)] ->
m ()
zAdd key members = withLogTag "Redis" $ do
migrating <- asks (.hedisMigrationStage)
if migrating
then do
res <- withTimeRedis "RedisStandalone" "zAdd" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.zadd prefKey $ map (\(score, member) -> (score, BSL.toStrict $ Ae.encode member)) members)
whenLeft res (\err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_ZADD" $ show err)
else pure ()
res <- withTimeRedis "RedisCluster" "zAdd" $ try @_ @SomeException (runWithPrefix_ key $ \prefKey -> Hedis.zadd prefKey $ map (\(score, member) -> (score, BSL.toStrict $ Ae.encode member)) members)
whenLeft res (\err -> withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_ZADD" $ show err)

xInfoGroups ::
(HedisFlow m env) =>
Text -> -- Stream key
m Bool
xInfoGroups key = do
eitherMaybeBS <- withTimeRedis "RedisCluster" "xInfoGroups" $ try @_ @SomeException (runWithPrefix key Hedis.xinfoGroups)
ls <-
case eitherMaybeBS of
Left err -> logTagInfo "ERROR_WHILE_GET_XInfoGroups" (show err) $> []
Right maybeBS -> pure maybeBS
return $ not (null ls)

-- Function to create a new consumer group for a stream
xGroupCreate ::
(HedisFlow m env) =>
Text -> -- Stream key
Text -> -- Group name
Text -> -- Start ID
m ()
xGroupCreate key groupName startId = withLogTag "Redis" $ do
migrating <- asks (.hedisMigrationStage)
when migrating $ do
res <- withTimeRedis "RedisStandalone" "xGroupCreate" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xgroupCreate prefKey (cs groupName) (cs startId))
whenLeft res (withLogTag "STANDALONE" . logTagInfo "FAILED_TO_xGroupCreate" . show)
res <- withTimeRedis "RedisCluster" "xGroupCreate" $ try @_ @SomeException (runWithPrefix_ key $ \prefKey -> Hedis.xgroupCreate prefKey (cs groupName) (cs startId))
whenLeft res (withLogTag "CLUSTER" . logTagInfo "FAILED_TO_xGroupCreate" . show)

extractKeyValuePairs :: [StreamsRecord] -> [(Text, Text)]
extractKeyValuePairs = concatMap (\(StreamsRecord _ keyVals) -> map (\(k, v) -> (cs k, cs v)) keyVals)

extractRecordIds :: [StreamsRecord] -> [BS.ByteString]
extractRecordIds = map (\(StreamsRecord recordId _) -> recordId)

xReadGroup ::
(HedisFlow m env) =>
Text -> -- group name
Text -> -- consumer name
[(Text, Text)] -> -- (stream, id) pairs
m (Maybe [XReadResponse])
xReadGroup groupName consumerName pairsList = do
let bsPairsList = map (\(stream, id) -> (cs stream, cs id)) pairsList
let mbKeyVal = listToMaybe bsPairsList
case mbKeyVal of
Just keyVal -> do
eitherMaybeBS <- withTimeRedis "RedisCluster" "XReadGroup" $ try @_ @SomeException (runWithPrefix (cs $ fst keyVal) $ \_ -> Hedis.xreadGroup (cs groupName) (cs consumerName) bsPairsList)
mbRes <-
case eitherMaybeBS of
Left err -> logTagInfo "ERROR_WHILE_GET_XReadGroup" (show err) $> Nothing
Right maybeBS -> pure maybeBS
case mbRes of
Just res -> return $ Just (map convertFromHedisResponse res)
Nothing -> pure Nothing
Nothing -> pure Nothing

xAdd :: (HedisFlow m env) => Text -> Text -> [(BS.ByteString, BS.ByteString)] -> m BS.ByteString
xAdd key entryId fieldValues = withLogTag "Redis" $ do
migrating <- asks (.hedisMigrationStage)
when migrating $ do
res <- withTimeRedis "RedisStandalone" "xadd" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xadd prefKey (cs entryId) fieldValues)
whenLeft res (withLogTag "STANDALONE" . logTagInfo "FAILED_TO_xadd" . show)
res <- withTimeRedis "RedisCluster" "xadd" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.xadd prefKey (cs entryId) fieldValues)
case res of
Left err -> do
withLogTag "CLUSTER" $ logTagInfo "xadd" $ show err
pure ""
Right items -> pure items

zRangeByScore :: (HedisFlow m env) => Text -> Double -> Double -> m [BS.ByteString]
zRangeByScore key start end = withLogTag "Redis" $ do
migrating <- asks (.hedisMigrationStage)
when migrating $ do
res <- withTimeRedis "RedisStandalone" "zRangeByScore" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.zrangebyscore prefKey start end)
whenLeft res (withLogTag "STANDALONE" . logTagInfo "FAILED_TO_ZRANGEBYSCORE" . show)
res <- withTimeRedis "RedisCluster" "zRangeByScore" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.zrangebyscore prefKey start end)
case res of
Left err -> do
withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_ZRANGEBYSCORE" $ show err
pure [] -- Return an empty list if there was an error
Right items -> pure items

zRemRangeByScore :: (HedisFlow m env) => Text -> Double -> Double -> m Integer
zRemRangeByScore key start end = withLogTag "Redis" $ do
migrating <- asks (.hedisMigrationStage)
when migrating $ do
res <- withTimeRedis "RedisStandalone" "zRemRangeByScore" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.zremrangebyscore prefKey start end)
case res of
Left err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_ZREMRANGEBYSCORE" $ show err
Right items -> pure items
res <- withTimeRedis "RedisCluster" "zRemRangeByScore" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.zremrangebyscore prefKey start end)
case res of
Left err -> do
withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_ZREMRANGEBYSCORE" $ show err
pure (-1) -- Return -1 if there was an error
Right items -> pure items

xDel :: (HedisFlow m env) => Text -> [BS.ByteString] -> m Integer
xDel key entryId = withLogTag "Redis" $ do
migrating <- asks (.hedisMigrationStage)
when migrating $ do
res <- withTimeRedis "RedisStandalone" "xDel" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xdel prefKey entryId)
case res of
Left err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_XDEL" $ show err
Right items -> pure items
res <- withTimeRedis "RedisCluster" "xDel" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.xdel prefKey entryId)
case res of
Left err -> do
withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_XDEL" $ show err
pure (-1) -- Return -1 if there was an error
Right items -> pure items

xAck :: (HedisFlow m env) => Text -> Text -> [BS.ByteString] -> m Integer
xAck key groupName entryId = withLogTag "Redis" $ do
migrating <- asks (.hedisMigrationStage)
when migrating $ do
res <- withTimeRedis "RedisStandalone" "xAck" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xack prefKey (cs groupName) entryId)
case res of
Left err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_xAck" $ show err
Right items -> pure items
res <- withTimeRedis "RedisCluster" "xAck" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.xack prefKey (cs groupName) entryId)
case res of
Left err -> do
withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_xAck" $ show err
pure (-1) -- Return -1 if there was an error
Right items -> pure items
61 changes: 61 additions & 0 deletions lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,64 @@ addUrlCallFailuresImplementation' cmContainers url version = do
urlCallRetriesMetric
(showBaseUrlText url, version.getDeploymentVersion)
P.incCounter

incrementSortedSetCounterImplementation ::
( HasCoreMetrics r,
L.MonadFlow m,
MonadReader r m
) =>
Text ->
Int ->
m ()
incrementSortedSetCounterImplementation context scheduledSecond = do
cmContainer <- asks (.coreMetrics)
version <- asks (.version)
incrementSortedSetCounterImplementation' cmContainer context scheduledSecond version

incrementSortedSetCounterImplementation' :: L.MonadFlow m => CoreMetricsContainer -> Text -> Int -> DeploymentVersion -> m ()
incrementSortedSetCounterImplementation' cmContainers context scheduledSecond version = do
let sortedSetMetric = cmContainers.sortedSetCounter
L.runIO $
P.withLabel
sortedSetMetric
(context, show scheduledSecond, version.getDeploymentVersion)
P.incCounter

incrementStreamCounterImplementation ::
( HasCoreMetrics r,
L.MonadFlow m,
MonadReader r m
) =>
Text ->
Int ->
m ()
incrementStreamCounterImplementation context executedseconds = do
cmContainer <- asks (.coreMetrics)
version <- asks (.version)
incrementStreamCounterImplementation' cmContainer context executedseconds version

incrementStreamCounterImplementation' :: L.MonadFlow m => CoreMetricsContainer -> Text -> Int -> DeploymentVersion -> m ()
incrementStreamCounterImplementation' cmContainers context executedseconds version = do
let sortedSetMetric = cmContainers.sortedSetCounter
L.runIO $
P.withLabel
sortedSetMetric
(context, show executedseconds, version.getDeploymentVersion)
P.incCounter

addGenericLatencyImplementation ::
( HasCoreMetrics r,
L.MonadFlow m,
MonadReader r m
) =>
Text ->
NominalDiffTime ->
m ()
addGenericLatencyImplementation operation latency = do
cmContainer <- asks (.coreMetrics)
version <- asks (.version)
L.runIO $
P.withLabel
cmContainer.genericLatency
(operation, version.getDeploymentVersion)
(`P.observe` (fromIntegral $ div (fromEnum . nominalDiffTimeToSeconds $ latency) 1000000000000))
48 changes: 41 additions & 7 deletions lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ type URLCallRetriesMetric = P.Vector P.Label3 P.Counter

type URLCallRetryFailuresMetric = P.Vector P.Label2 P.Counter

type SortedSetMetric = P.Vector P.Label3 P.Counter

type StreamMetric = P.Vector P.Label3 P.Counter

type GenericLatencyMetric = P.Vector P.Label2 P.Histogram

type HasCoreMetrics r =
( HasField "coreMetrics" r CoreMetricsContainer,
HasField "version" r DeploymentVersion
Expand All @@ -46,32 +52,36 @@ type HasCoreMetrics r =
newtype DeploymentVersion = DeploymentVersion {getDeploymentVersion :: Text}

class CoreMetrics m where
addRequestLatency ::
Text ->
Text ->
Milliseconds ->
Either ClientError a ->
m ()
addRequestLatency :: Text -> Text -> Milliseconds -> Either ClientError a -> m ()
addDatastoreLatency :: Text -> Text -> NominalDiffTime -> m ()
incrementErrorCounter :: Text -> SomeException -> m ()
addUrlCallRetries :: BaseUrl -> Int -> m ()
addUrlCallRetryFailures :: BaseUrl -> m ()
incrementSortedSetCounter :: Text -> Int -> m ()
incrementStreamCounter :: Text -> Int -> m ()
addGenericLatency :: Text -> NominalDiffTime -> m ()

data CoreMetricsContainer = CoreMetricsContainer
{ requestLatency :: RequestLatencyMetric,
datastoresLatency :: DatastoresLatencyMetric,
genericLatency :: GenericLatencyMetric,
errorCounter :: ErrorCounterMetric,
urlCallRetries :: URLCallRetriesMetric,
urlCallRetryFailures :: URLCallRetryFailuresMetric
urlCallRetryFailures :: URLCallRetryFailuresMetric,
sortedSetCounter :: SortedSetMetric,
streamCounter :: StreamMetric
}

registerCoreMetricsContainer :: IO CoreMetricsContainer
registerCoreMetricsContainer = do
requestLatency <- registerRequestLatencyMetric
datastoresLatency <- registerDatastoresLatencyMetrics
genericLatency <- registerGenericLatencyMetrics
errorCounter <- registerErrorCounterMetric
urlCallRetries <- registerURLCallRetriesMetric
urlCallRetryFailures <- registerURLCallRetryFailuresMetric
sortedSetCounter <- registerSortedSetMetric
streamCounter <- registerStreamCounter

return CoreMetricsContainer {..}

Expand Down Expand Up @@ -114,3 +124,27 @@ registerURLCallRetryFailuresMetric =
P.counter info
where
info = P.Info "url_call_retry_failures_counter" ""

registerSortedSetMetric :: IO SortedSetMetric
registerSortedSetMetric =
P.register $
P.vector ("job_type", "scheduled_second", "version") $
P.counter info
where
info = P.Info "sortedset_scheduled_jobs_counter" ""

registerStreamCounter :: IO StreamMetric
registerStreamCounter =
P.register $
P.vector ("job_type", "executed_seconds", "version") $
P.counter info
where
info = P.Info "stream_jobs_counter" ""

registerGenericLatencyMetrics :: IO GenericLatencyMetric
registerGenericLatencyMetrics =
P.register $
P.vector ("operation", "version") $
P.histogram info P.defaultBuckets
where
info = P.Info "producer_operation_duration" ""
3 changes: 3 additions & 0 deletions lib/mobility-core/src/Kernel/Types/Flow.hs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ instance Metrics.HasCoreMetrics r => Metrics.CoreMetrics (FlowR r) where
incrementErrorCounter = Metrics.incrementErrorCounterImplementation
addUrlCallRetries = Metrics.addUrlCallRetriesImplementation
addUrlCallRetryFailures = Metrics.addUrlCallFailuresImplementation
incrementSortedSetCounter = Metrics.incrementSortedSetCounterImplementation
incrementStreamCounter = Metrics.incrementStreamCounterImplementation
addGenericLatency = Metrics.addGenericLatencyImplementation

instance MonadMonitor (FlowR r) where
doIO = liftIO
Expand Down
12 changes: 12 additions & 0 deletions lib/mobility-core/src/Kernel/Utils/Time.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ where
import qualified Data.Text as T
import Data.Time hiding (getCurrentTime, nominalDiffTimeToSeconds, secondsToNominalDiffTime)
import qualified Data.Time as Time hiding (secondsToNominalDiffTime)
import Data.Time.Clock.System
import EulerHS.Prelude
import Kernel.Types.Time
import Kernel.Utils.Logging
Expand Down Expand Up @@ -113,3 +114,14 @@ compareTimeWithInterval dt time1 time2
| abs (diffUTCTime time1 time2) < abs dt = EQ
| time1 < time2 = LT
| otherwise = GT -- time1 > time2

utcToMilliseconds :: UTCTime -> Double
utcToMilliseconds utcTime = fromIntegral $ div (systemSeconds systemTime * 1000000000 + fromIntegral (systemNanoseconds systemTime)) 1000000
where
systemTime = utcToSystemTime utcTime

getCurrentTimestamp :: (Monad m, MonadTime m) => m Double
getCurrentTimestamp = do
now <- getCurrentTime
let systemTime = utcToSystemTime now
pure . fromIntegral $ div (systemSeconds systemTime * 1000000000 + fromIntegral (systemNanoseconds systemTime)) 1000000
Loading

0 comments on commit 3f3f5d2

Please sign in to comment.