Skip to content

Commit

Permalink
shared-kernel/fix:#187/Added Redis Sorted Set Functions and Redis Str…
Browse files Browse the repository at this point in the history
…eams Functions for Job Scheduling
  • Loading branch information
suraj.kumar1 committed Aug 1, 2023
1 parent acd37ce commit 62e5236
Showing 1 changed file with 54 additions and 1 deletion.
55 changes: 54 additions & 1 deletion lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import qualified Data.Aeson as Ae
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.String.Conversions
import Data.Text hiding (map, null)
import Data.Text hiding (concatMap, map, null)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as DE
import Database.Redis (Queued, Redis, RedisTx, Reply, TxResult (..))
Expand Down Expand Up @@ -429,3 +429,56 @@ zRevRank key member = do

zCard :: (HedisFlow m env) => Text -> m Integer
zCard key = runWithPrefix key Hedis.zcard

zAdd ::
(ToJSON member, HedisFlow m env) =>
Text ->
[(Double, member)] ->
m ()
zAdd key members = withLogTag "Redis" $ do
migrating <- asks (.hedisMigrationStage)
if migrating
then do
res <- withTimeRedis "RedisStandalone" "zAdd" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.zadd prefKey $ map (\(score, member) -> (score, BSL.toStrict $ Ae.encode member)) members)
whenLeft res (\err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_ZADD" $ show err)
else pure ()
res <- withTimeRedis "RedisCluster" "zAdd" $ try @_ @SomeException (runWithPrefix_ key $ \prefKey -> Hedis.zadd prefKey $ map (\(score, member) -> (score, BSL.toStrict $ Ae.encode member)) members)
whenLeft res (\err -> withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_ZADD" $ show err)

xinfoGroups ::
(HedisFlow m env) =>
Text -> -- Stream key
m Bool
xinfoGroups key = do
ls <- runWithPrefix key $ \prefKey -> Hedis.xinfoGroups prefKey
return $ not (null ls)

-- Function to create a new consumer group for a stream
xgroupCreate ::
(HedisFlow m env) =>
Text -> -- Stream key
Text -> -- Group name
Text -> -- Start ID
m ()
xgroupCreate key groupName startId = do
void $ runWithPrefix key $ \prefKey -> Hedis.xgroupCreate prefKey (cs groupName) (cs startId)

xreadGroup ::
(HedisFlow m env) =>
Text -> -- group name
Text -> -- consumer name
[(Text, Text)] -> -- (stream, id) pairs
m [(Text, Text)]
xreadGroup groupName consumerName xs = do
let ls = map (\(stream, id) -> (DE.encodeUtf8 stream, DE.encodeUtf8 id)) xs
let var = listToMaybe ls
case var of
Just keyVal -> do
res <- runWithPrefix (DE.decodeUtf8 $ fst keyVal) $ \_ -> Hedis.xreadGroup (DE.encodeUtf8 groupName) (DE.encodeUtf8 consumerName) ls
case res of
Just messages -> return $ extractKeyValuePairs (concatMap Hedis.records messages)
Nothing -> pure []
Nothing -> pure []

extractKeyValuePairs :: [Hedis.StreamsRecord] -> [(Text, Text)]
extractKeyValuePairs = concatMap (\(Hedis.StreamsRecord _ keyVals) -> map (\(k, v) -> (DE.decodeUtf8 k, DE.decodeUtf8 v)) keyVals)

0 comments on commit 62e5236

Please sign in to comment.