diff --git a/lib/mobility-core/mobility-core.cabal b/lib/mobility-core/mobility-core.cabal index 02ea99316..faf8d6fc9 100644 --- a/lib/mobility-core/mobility-core.cabal +++ b/lib/mobility-core/mobility-core.cabal @@ -363,6 +363,7 @@ library , cereal , clickhouse-haskell , clock + , concurrency , containers , cryptonite , data-default-class @@ -511,6 +512,7 @@ test-suite mobility-core-tests , cereal , clickhouse-haskell , clock + , concurrency , containers , cryptonite , data-default-class diff --git a/lib/mobility-core/package.yaml b/lib/mobility-core/package.yaml index b9a4884f3..94f2339ff 100644 --- a/lib/mobility-core/package.yaml +++ b/lib/mobility-core/package.yaml @@ -143,6 +143,7 @@ dependencies: - cereal - beam-mysql - sequelize + - concurrency - casing ghc-options: diff --git a/lib/mobility-core/src/Kernel/Beam/Functions.hs b/lib/mobility-core/src/Kernel/Beam/Functions.hs index 56204793d..8879433f2 100644 --- a/lib/mobility-core/src/Kernel/Beam/Functions.hs +++ b/lib/mobility-core/src/Kernel/Beam/Functions.hs @@ -36,6 +36,19 @@ class where toTType' :: a -> t +-- Below class FromTType'' and ToTType'' are only to be used with scheduler +class + FromTType'' t a + | a -> t + where + fromTType'' :: (MonadThrow m, Log m, L.MonadFlow m) => t -> m (Maybe a) + +class + ToTType'' t a + | a -> t + where + toTType'' :: a -> t + meshConfig :: MeshConfig meshConfig = MeshConfig @@ -76,9 +89,10 @@ setMeshConfig modelName mSchema meshConfig' = do else pure $ meshConfig' {meshEnabled = False, kvHardKilled = modelName `notElem` enableKVForRead, ecRedisDBStream = redisStream} else pure $ meshConfig' {meshEnabled = False, kvHardKilled = modelName `notElem` enableKVForRead, ecRedisDBStream = redisStream} -getMasterDBConfig :: (HasCallStack, L.MonadFlow m) => m (DBConfig Pg) +getMasterDBConfig :: (Log m, HasCallStack, L.MonadFlow m) => m (DBConfig Pg) getMasterDBConfig = do dbConf <- L.getOption KBT.PsqlDbCfg + logDebug $ "getMasterDBConfig :::" <> show dbConf case dbConf of Just dbCnf' -> pure dbCnf' Nothing -> L.throwException $ InternalError "masterDb Config not found" @@ -90,7 +104,7 @@ getLocDbConfig = do Just dbCnf' -> pure dbCnf' Nothing -> L.throwException $ InternalError "LocationDb Config not found" -getMasterBeamConfig :: (HasCallStack, L.MonadFlow m) => m (SqlConn Pg) +getMasterBeamConfig :: (Log m, HasCallStack, L.MonadFlow m) => m (SqlConn Pg) getMasterBeamConfig = do inReplica <- L.getOptionLocal ReplicaEnabled dbConf <- maybe getMasterDBConfig (\inReplica' -> if inReplica' then getReplicaDbConfig else getMasterDBConfig) inReplica @@ -152,6 +166,36 @@ findOneWithKV where' = do Right Nothing -> pure Nothing Left err -> throwError $ InternalError $ show err +findOneWithKVScheduler :: + forall table m a. + ( HasCallStack, + FromTType'' (table Identity) a, + BeamRuntime Postgres Pg, + B.HasQBuilder Postgres, + BeamRunner Pg, + Model Postgres table, + MeshMeta Postgres table, + KVConnector (table Identity), + FromJSON (table Identity), + ToJSON (table Identity), + Serialize.Serialize (table Identity), + L.MonadFlow m, + Show (table Identity), + Log m, + MonadThrow m + ) => + Where Postgres table -> + m (Maybe a) +findOneWithKVScheduler where' = do + updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig + inReplica <- L.getOptionLocal ReplicaEnabled + dbConf' <- maybe getMasterDBConfig (\inReplica' -> if inReplica' then getReplicaDbConfig else getMasterDBConfig) inReplica + result <- KV.findWithKVConnector dbConf' updatedMeshConfig where' + case result of + Right (Just res) -> fromTType'' res + Right Nothing -> pure Nothing + Left err -> throwError $ InternalError $ show err + findAllWithKV :: forall table m a. ( HasCallStack, @@ -181,6 +225,37 @@ findAllWithKV where' = do pure $ catMaybes res' Left err -> throwError $ InternalError $ show err +findAllWithKVScheduler :: + forall table m a. + ( HasCallStack, + FromTType'' (table Identity) a, + BeamRuntime Postgres Pg, + B.HasQBuilder Postgres, + BeamRunner Pg, + Model Postgres table, + MeshMeta Postgres table, + KVConnector (table Identity), + FromJSON (table Identity), + ToJSON (table Identity), + Serialize.Serialize (table Identity), + L.MonadFlow m, + Show (table Identity), + Log m, + MonadThrow m + ) => + Where Postgres table -> + m [a] +findAllWithKVScheduler where' = do + updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig + inReplica <- L.getOptionLocal ReplicaEnabled + dbConf' <- maybe getMasterDBConfig (\inReplica' -> if inReplica' then getReplicaDbConfig else getMasterDBConfig) inReplica + result <- KV.findAllWithKVConnector dbConf' updatedMeshConfig where' + case result of + Right res -> do + res' <- mapM fromTType'' res + pure $ catMaybes res' + Left err -> throwError $ InternalError $ show err + findAllWithOptionsKV :: forall table m a. ( HasCallStack, @@ -213,6 +288,40 @@ findAllWithOptionsKV where' orderBy mbLimit mbOffset = do pure $ catMaybes res' Left err -> throwError $ InternalError $ show err +findAllWithOptionsKVScheduler :: + forall table m a. + ( HasCallStack, + FromTType'' (table Identity) a, + BeamRuntime Postgres Pg, + B.HasQBuilder Postgres, + BeamRunner Pg, + Model Postgres table, + MeshMeta Postgres table, + KVConnector (table Identity), + FromJSON (table Identity), + ToJSON (table Identity), + Serialize.Serialize (table Identity), + L.MonadFlow m, + Show (table Identity), + Log m, + MonadThrow m + ) => + Where Postgres table -> + OrderBy table -> + Maybe Int -> + Maybe Int -> + m [a] +findAllWithOptionsKVScheduler where' orderBy mbLimit mbOffset = do + updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig + inReplica <- L.getOptionLocal ReplicaEnabled + dbConf' <- maybe getMasterDBConfig (\inReplica' -> if inReplica' then getReplicaDbConfig else getMasterDBConfig) inReplica + result <- KV.findAllWithOptionsKVConnector dbConf' updatedMeshConfig where' orderBy mbLimit mbOffset + case result of + Right res -> do + res' <- mapM fromTType'' res + pure $ catMaybes res' + Left err -> throwError $ InternalError $ show err + findOneWithDb :: forall table m a. ( HasCallStack, @@ -335,6 +444,39 @@ updateWithKV setClause whereClause = do pure () Left err -> throwError $ InternalError $ show err +updateWithKVScheduler :: + forall table m. + ( HasCallStack, + BeamRuntime Postgres Pg, + SqlReturning Pg Postgres, + B.HasQBuilder Postgres, + BeamRunner Pg, + Model Postgres table, + MeshMeta Postgres table, + KVConnector (table Identity), + FromJSON (table Identity), + ToJSON (table Identity), + Serialize.Serialize (table Identity), + L.MonadFlow m, + Show (table Identity), + Log m, + MonadThrow m + ) => + [Set Postgres table] -> + Where Postgres table -> + m () +updateWithKVScheduler setClause whereClause = do + updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig + dbConf <- getMasterDBConfig + res <- KV.updateAllWithKVConnector dbConf updatedMeshConfig setClause whereClause + case res of + Right res' -> do + if updatedMeshConfig.meshEnabled && not updatedMeshConfig.kvHardKilled + then logDebug $ "Updated rows KV: " <> show res' + else logDebug $ "Updated rows DB: " <> show res' + pure () + Left err -> throwError $ InternalError $ show err + updateOneWithKV :: forall table m. ( HasCallStack, @@ -398,6 +540,40 @@ createWithKV a = do pure () Left err -> throwError $ InternalError $ show err +createWithKVScheduler :: + forall table m a. + ( HasCallStack, + ToTType'' (table Identity) a, + SqlReturning Pg Postgres, + BeamRuntime Postgres Pg, + B.HasQBuilder Postgres, + BeamRunner Pg, + Model Postgres table, + MeshMeta Postgres table, + KVConnector (table Identity), + FromJSON (table Identity), + ToJSON (table Identity), + Serialize.Serialize (table Identity), + L.MonadFlow m, + Show (table Identity), + Log m, + MonadThrow m + ) => + a -> + m () +createWithKVScheduler a = do + let tType = toTType'' a + updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig + dbConf' <- getMasterDBConfig + result <- KV.createWoReturingKVConnector dbConf' updatedMeshConfig tType + case result of + Right _ -> do + if updatedMeshConfig.meshEnabled && not updatedMeshConfig.kvHardKilled + then logDebug $ "Created row in KV: " <> show tType + else logDebug $ "Created row in DB: " <> show tType + pure () + Left err -> throwError $ InternalError $ show err + deleteWithKV :: forall be table beM m. ( HasCallStack, diff --git a/lib/mobility-core/src/Kernel/Mock/App.hs b/lib/mobility-core/src/Kernel/Mock/App.hs index 8bdf2f48a..ae876f668 100644 --- a/lib/mobility-core/src/Kernel/Mock/App.hs +++ b/lib/mobility-core/src/Kernel/Mock/App.hs @@ -14,11 +14,13 @@ {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE TypeApplications #-} +{-# OPTIONS_GHC -Wwarn=missing-methods #-} module Kernel.Mock.App where import qualified Control.Monad.Catch as C import Control.Monad.IO.Unlift +import qualified EulerHS.Language as L import Kernel.Tools.Metrics.CoreMetrics import Kernel.Types.Common import Kernel.Utils.IOLogging @@ -45,9 +47,68 @@ run _ server env = serve proxyApi $ hoistServer proxyApi f (healthCheckServer :< Right res -> pure res newtype MockM e a = MockM {runMockM :: ReaderT e IO a} - deriving newtype (Functor, Applicative, Monad, MonadReader e, MonadIO, MonadUnliftIO, C.MonadThrow, C.MonadCatch, C.MonadMask) - --- TODO: think about renaming this type and moving it to the common core + deriving newtype (Functor, Applicative, Monad, MonadReader e, MonadUnliftIO, C.MonadThrow, C.MonadCatch, C.MonadMask) + +instance L.MonadFlow (MockM r) where + {-# INLINEABLE callServantAPI #-} + callServantAPI mbMgrSel url cl = L.callServantAPI mbMgrSel url cl + {-# INLINEABLE evalLogger' #-} + evalLogger' logAct = L.evalLogger' logAct + {-# INLINEABLE runIO' #-} + runIO' descr ioAct = L.runIO' descr ioAct + {-# INLINEABLE getOption #-} + getOption k = L.getOption k + {-# INLINEABLE setOption #-} + setOption k v = L.setOption k v + {-# INLINEABLE getOptionLocal #-} + getOptionLocal k = L.getOptionLocal k + {-# INLINEABLE setOptionLocal #-} + setOptionLocal k v = L.setOptionLocal k v + {-# INLINEABLE delOption #-} + delOption k = L.delOption k + {-# INLINEABLE generateGUID #-} + generateGUID = generateGUID + {-# INLINEABLE runSysCmd #-} + runSysCmd cmd = L.runSysCmd cmd + {-# INLINEABLE initSqlDBConnection #-} + initSqlDBConnection cfg = L.initSqlDBConnection cfg + {-# INLINEABLE deinitSqlDBConnection #-} + deinitSqlDBConnection conn = L.deinitSqlDBConnection conn + {-# INLINEABLE getSqlDBConnection #-} + getSqlDBConnection cfg = L.getSqlDBConnection cfg + {-# INLINEABLE initKVDBConnection #-} + initKVDBConnection cfg = L.initKVDBConnection cfg + {-# INLINEABLE deinitKVDBConnection #-} + deinitKVDBConnection conn = L.deinitKVDBConnection conn + {-# INLINEABLE getKVDBConnection #-} + getKVDBConnection cfg = L.getKVDBConnection cfg + {-# INLINEABLE runDB #-} + runDB conn dbAct = L.runDB conn dbAct + {-# INLINEABLE runTransaction #-} + runTransaction conn dbAct = L.runTransaction conn dbAct + {-# INLINEABLE await #-} + await mbMcs awaitable = L.await mbMcs awaitable + {-# INLINEABLE runSafeFlow #-} + runSafeFlow flow = L.runSafeFlow flow + {-# INLINEABLE runKVDB #-} + runKVDB cName act = L.runKVDB cName act + {-# INLINEABLE runPubSub #-} + runPubSub act = L.runPubSub act + {-# INLINEABLE publish #-} + publish channel payload = L.publish channel payload + {-# INLINEABLE subscribe #-} + subscribe channels cb = L.subscribe channels cb + {-# INLINEABLE psubscribe #-} + psubscribe channels cb = L.psubscribe channels cb + {-# INLINEABLE withModifiedRuntime #-} + withModifiedRuntime f flow = L.withModifiedRuntime f flow + {-# INLINEABLE setLoggerContext #-} + setLoggerContext k v = L.setLoggerContext k v + {-# INLINEABLE getLoggerContext #-} + getLoggerContext k = L.getLoggerContext k + +instance MonadIO (MockM r) where + liftIO = L.runIO runMock :: e -> MockM e a -> IO a runMock env action = runReaderT (runMockM action) env @@ -58,6 +119,9 @@ instance CoreMetrics (MockM e) where incrementErrorCounter _ _ = return () addUrlCallRetries _ _ = return () addUrlCallRetryFailures _ = return () + incrementSortedSetCounter _ = return () + incrementStreamCounter _ = return () + addGenericLatency _ _ = return () instance MonadTime (MockM e) where getCurrentTime = liftIO getCurrentTime diff --git a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs index a9c6fd36f..f2639c199 100644 --- a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs +++ b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs @@ -18,9 +18,8 @@ import qualified Data.Aeson as Ae import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL import Data.String.Conversions -import Data.Text hiding (map, null) +import Data.Text hiding (concatMap, map, null) import qualified Data.Text as Text -import qualified Data.Text.Encoding as DE import Database.Redis (Queued, Redis, RedisTx, Reply, TxResult (..)) import qualified Database.Redis as Hedis import EulerHS.Prelude (whenLeft) @@ -34,6 +33,32 @@ import Kernel.Utils.Logging type ExpirationTime = Int +data XReadResponse = XReadResponse + { stream :: BS.ByteString, + records :: [StreamsRecord] + } + deriving (Show) + +data StreamsRecord = StreamsRecord + { recordId :: BS.ByteString, + keyValues :: [(BS.ByteString, BS.ByteString)] + } + deriving (Show) + +convertFromHedisResponse :: Hedis.XReadResponse -> XReadResponse +convertFromHedisResponse hedisResponse = + XReadResponse + { stream = Hedis.stream hedisResponse, + records = map convertFromHedisRecord (Hedis.records hedisResponse) + } + +convertFromHedisRecord :: Hedis.StreamsRecord -> StreamsRecord +convertFromHedisRecord hedisRecord = + StreamsRecord + { recordId = Hedis.recordId hedisRecord, + keyValues = Hedis.keyValues hedisRecord + } + runHedis :: HedisFlow m env => Redis (Either Reply a) -> m a runHedis action = do @@ -153,7 +178,7 @@ getImpl decodeResult key = withLogTag "Redis" $ do Just res' -> decodeResult res' get :: (FromJSON a, HedisFlow m env) => Text -> m (Maybe a) -get key = getImpl decodeResult key +get = getImpl decodeResult where decodeResult bs = Error.fromMaybeM (HedisDecodeError $ cs bs) $ Ae.decode $ BSL.fromStrict bs @@ -415,7 +440,7 @@ zrevrangeWithscores key start stop = do pure $ map (\(k, score) -> (cs' k, score)) res where cs' :: BS.ByteString -> Text - cs' = DE.decodeUtf8 + cs' = cs zScore :: (FromJSON Double, HedisFlow m env) => Text -> Text -> m (Maybe Double) zScore key member = do @@ -429,3 +454,143 @@ zRevRank key member = do zCard :: (HedisFlow m env) => Text -> m Integer zCard key = runWithPrefix key Hedis.zcard + +zAdd :: + (ToJSON member, HedisFlow m env) => + Text -> + [(Double, member)] -> + m () +zAdd key members = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + if migrating + then do + res <- withTimeRedis "RedisStandalone" "zAdd" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.zadd prefKey $ map (\(score, member) -> (score, BSL.toStrict $ Ae.encode member)) members) + whenLeft res (\err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_ZADD" $ show err) + else pure () + res <- withTimeRedis "RedisCluster" "zAdd" $ try @_ @SomeException (runWithPrefix_ key $ \prefKey -> Hedis.zadd prefKey $ map (\(score, member) -> (score, BSL.toStrict $ Ae.encode member)) members) + whenLeft res (\err -> withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_ZADD" $ show err) + +xInfoGroups :: + (HedisFlow m env) => + Text -> -- Stream key + m Bool +xInfoGroups key = do + eitherMaybeBS <- withTimeRedis "RedisCluster" "xInfoGroups" $ try @_ @SomeException (runWithPrefix key Hedis.xinfoGroups) + ls <- + case eitherMaybeBS of + Left err -> logTagInfo "ERROR_WHILE_GET_XInfoGroups" (show err) $> [] + Right maybeBS -> pure maybeBS + return $ not (null ls) + +-- Function to create a new consumer group for a stream +xGroupCreate :: + (HedisFlow m env) => + Text -> -- Stream key + Text -> -- Group name + Text -> -- Start ID + m () +xGroupCreate key groupName startId = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "xGroupCreate" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xgroupCreate prefKey (cs groupName) (cs startId)) + whenLeft res (withLogTag "STANDALONE" . logTagInfo "FAILED_TO_xGroupCreate" . show) + res <- withTimeRedis "RedisCluster" "xGroupCreate" $ try @_ @SomeException (runWithPrefix_ key $ \prefKey -> Hedis.xgroupCreate prefKey (cs groupName) (cs startId)) + whenLeft res (withLogTag "CLUSTER" . logTagInfo "FAILED_TO_xGroupCreate" . show) + +extractKeyValuePairs :: [StreamsRecord] -> [(Text, Text)] +extractKeyValuePairs = concatMap (\(StreamsRecord _ keyVals) -> map (\(k, v) -> (cs k, cs v)) keyVals) + +extractRecordIds :: [StreamsRecord] -> [BS.ByteString] +extractRecordIds = map (\(StreamsRecord recordId _) -> recordId) + +xReadGroup :: + (HedisFlow m env) => + Text -> -- group name + Text -> -- consumer name + [(Text, Text)] -> -- (stream, id) pairs + m (Maybe [XReadResponse]) +xReadGroup groupName consumerName pairsList = do + let bsPairsList = map (\(stream, id) -> (cs stream, cs id)) pairsList + let mbKeyVal = listToMaybe bsPairsList + case mbKeyVal of + Just keyVal -> do + eitherMaybeBS <- withTimeRedis "RedisCluster" "XReadGroup" $ try @_ @SomeException (runWithPrefix (cs $ fst keyVal) $ \_ -> Hedis.xreadGroup (cs groupName) (cs consumerName) bsPairsList) + mbRes <- + case eitherMaybeBS of + Left err -> logTagInfo "ERROR_WHILE_GET_XReadGroup" (show err) $> Nothing + Right maybeBS -> pure maybeBS + case mbRes of + Just res -> return $ Just (map convertFromHedisResponse res) + Nothing -> pure Nothing + Nothing -> pure Nothing + +xAdd :: (HedisFlow m env) => Text -> Text -> [(BS.ByteString, BS.ByteString)] -> m BS.ByteString +xAdd key entryId fieldValues = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "xadd" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xadd prefKey (cs entryId) fieldValues) + whenLeft res (withLogTag "STANDALONE" . logTagInfo "FAILED_TO_xadd" . show) + res <- withTimeRedis "RedisCluster" "xadd" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.xadd prefKey (cs entryId) fieldValues) + case res of + Left err -> do + withLogTag "CLUSTER" $ logTagInfo "xadd" $ show err + pure "" + Right items -> pure items + +zRangeByScore :: (HedisFlow m env) => Text -> Double -> Double -> m [BS.ByteString] +zRangeByScore key start end = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "zRangeByScore" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.zrangebyscore prefKey start end) + whenLeft res (withLogTag "STANDALONE" . logTagInfo "FAILED_TO_ZRANGEBYSCORE" . show) + res <- withTimeRedis "RedisCluster" "zRangeByScore" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.zrangebyscore prefKey start end) + case res of + Left err -> do + withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_ZRANGEBYSCORE" $ show err + pure [] -- Return an empty list if there was an error + Right items -> pure items + +zRemRangeByScore :: (HedisFlow m env) => Text -> Double -> Double -> m Integer +zRemRangeByScore key start end = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "zRemRangeByScore" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.zremrangebyscore prefKey start end) + case res of + Left err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_ZREMRANGEBYSCORE" $ show err + Right items -> pure items + res <- withTimeRedis "RedisCluster" "zRemRangeByScore" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.zremrangebyscore prefKey start end) + case res of + Left err -> do + withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_ZREMRANGEBYSCORE" $ show err + pure (-1) -- Return -1 if there was an error + Right items -> pure items + +xDel :: (HedisFlow m env) => Text -> [BS.ByteString] -> m Integer +xDel key entryId = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "xDel" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xdel prefKey entryId) + case res of + Left err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_XDEL" $ show err + Right items -> pure items + res <- withTimeRedis "RedisCluster" "xDel" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.xdel prefKey entryId) + case res of + Left err -> do + withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_XDEL" $ show err + pure (-1) -- Return -1 if there was an error + Right items -> pure items + +xAck :: (HedisFlow m env) => Text -> Text -> [BS.ByteString] -> m Integer +xAck key groupName entryId = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "xAck" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xack prefKey (cs groupName) entryId) + case res of + Left err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_xAck" $ show err + Right items -> pure items + res <- withTimeRedis "RedisCluster" "xAck" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.xack prefKey (cs groupName) entryId) + case res of + Left err -> do + withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_xAck" $ show err + pure (-1) -- Return -1 if there was an error + Right items -> pure items diff --git a/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics.hs b/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics.hs index fd2f60e10..5e2b411dd 100644 --- a/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics.hs +++ b/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics.hs @@ -104,6 +104,30 @@ addDatastoreLatencyImplementation storeType operation latency = do (storeType, operation, version.getDeploymentVersion) (`P.observe` (fromIntegral $ div (fromEnum . nominalDiffTimeToSeconds $ latency) 1000000000000)) +incrementSortedSetCounterImplementation :: + ( HasCoreMetrics r, + L.MonadFlow m, + MonadReader r m + ) => + Text -> + m () +incrementSortedSetCounterImplementation context = do + cmContainer <- asks (.coreMetrics) + version <- asks (.version) + incrementSortedSetCounterImplementation' cmContainer context version + +incrementStreamCounterImplementation :: + ( HasCoreMetrics r, + L.MonadFlow m, + MonadReader r m + ) => + Text -> + m () +incrementStreamCounterImplementation context = do + cmContainer <- asks (.coreMetrics) + version <- asks (.version) + incrementStreamCounterImplementation' cmContainer context version + addRequestLatencyImplementation' :: L.MonadFlow m => CoreMetricsContainer -> @@ -163,3 +187,38 @@ addUrlCallFailuresImplementation' cmContainers url version = do urlCallRetriesMetric (showBaseUrlText url, version.getDeploymentVersion) P.incCounter + +incrementSortedSetCounterImplementation' :: L.MonadFlow m => CoreMetricsContainer -> Text -> DeploymentVersion -> m () +incrementSortedSetCounterImplementation' cmContainers context version = do + let sortedSetMetric = cmContainers.sortedSetCounter + L.runIO $ + P.withLabel + sortedSetMetric + (context, version.getDeploymentVersion) + P.incCounter + +incrementStreamCounterImplementation' :: L.MonadFlow m => CoreMetricsContainer -> Text -> DeploymentVersion -> m () +incrementStreamCounterImplementation' cmContainers context version = do + let sortedSetMetric = cmContainers.sortedSetCounter + L.runIO $ + P.withLabel + sortedSetMetric + (context, version.getDeploymentVersion) + P.incCounter + +addGenericLatencyImplementation :: + ( HasCoreMetrics r, + L.MonadFlow m, + MonadReader r m + ) => + Text -> + NominalDiffTime -> + m () +addGenericLatencyImplementation operation latency = do + cmContainer <- asks (.coreMetrics) + version <- asks (.version) + L.runIO $ + P.withLabel + cmContainer.genericLatency + (operation, version.getDeploymentVersion) + (`P.observe` (fromIntegral $ div (fromEnum . nominalDiffTimeToSeconds $ latency) 1000000000000)) diff --git a/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics/Types.hs b/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics/Types.hs index 3a15a0a4f..680231693 100644 --- a/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics/Types.hs +++ b/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics/Types.hs @@ -38,6 +38,12 @@ type URLCallRetriesMetric = P.Vector P.Label3 P.Counter type URLCallRetryFailuresMetric = P.Vector P.Label2 P.Counter +type SortedSetMetric = P.Vector P.Label2 P.Counter + +type StreamMetric = P.Vector P.Label2 P.Counter + +type GenericLatencyMetric = P.Vector P.Label2 P.Histogram + type HasCoreMetrics r = ( HasField "coreMetrics" r CoreMetricsContainer, HasField "version" r DeploymentVersion @@ -46,32 +52,36 @@ type HasCoreMetrics r = newtype DeploymentVersion = DeploymentVersion {getDeploymentVersion :: Text} class CoreMetrics m where - addRequestLatency :: - Text -> - Text -> - Milliseconds -> - Either ClientError a -> - m () + addRequestLatency :: Text -> Text -> Milliseconds -> Either ClientError a -> m () addDatastoreLatency :: Text -> Text -> NominalDiffTime -> m () incrementErrorCounter :: Text -> SomeException -> m () addUrlCallRetries :: BaseUrl -> Int -> m () addUrlCallRetryFailures :: BaseUrl -> m () + incrementSortedSetCounter :: Text -> m () + incrementStreamCounter :: Text -> m () + addGenericLatency :: Text -> NominalDiffTime -> m () data CoreMetricsContainer = CoreMetricsContainer { requestLatency :: RequestLatencyMetric, datastoresLatency :: DatastoresLatencyMetric, + genericLatency :: GenericLatencyMetric, errorCounter :: ErrorCounterMetric, urlCallRetries :: URLCallRetriesMetric, - urlCallRetryFailures :: URLCallRetryFailuresMetric + urlCallRetryFailures :: URLCallRetryFailuresMetric, + sortedSetCounter :: SortedSetMetric, + streamCounter :: StreamMetric } registerCoreMetricsContainer :: IO CoreMetricsContainer registerCoreMetricsContainer = do requestLatency <- registerRequestLatencyMetric datastoresLatency <- registerDatastoresLatencyMetrics + genericLatency <- registerGenericLatencyMetrics errorCounter <- registerErrorCounterMetric urlCallRetries <- registerURLCallRetriesMetric urlCallRetryFailures <- registerURLCallRetryFailuresMetric + sortedSetCounter <- registerSortedSetMetric + streamCounter <- registerStreamCounter return CoreMetricsContainer {..} @@ -114,3 +124,27 @@ registerURLCallRetryFailuresMetric = P.counter info where info = P.Info "url_call_retry_failures_counter" "" + +registerSortedSetMetric :: IO SortedSetMetric +registerSortedSetMetric = + P.register $ + P.vector ("job_type", "version") $ + P.counter info + where + info = P.Info "sortedset_scheduled_jobs_counter" "" + +registerStreamCounter :: IO StreamMetric +registerStreamCounter = + P.register $ + P.vector ("job_type", "version") $ + P.counter info + where + info = P.Info "stream_jobs_counter" "" + +registerGenericLatencyMetrics :: IO GenericLatencyMetric +registerGenericLatencyMetrics = + P.register $ + P.vector ("operation", "version") $ + P.histogram info P.defaultBuckets + where + info = P.Info "producer_operation_duration" "" diff --git a/lib/mobility-core/src/Kernel/Types/Flow.hs b/lib/mobility-core/src/Kernel/Types/Flow.hs index d85587b1e..000829b82 100644 --- a/lib/mobility-core/src/Kernel/Types/Flow.hs +++ b/lib/mobility-core/src/Kernel/Types/Flow.hs @@ -18,6 +18,7 @@ module Kernel.Types.Flow (FlowR, runFlowR) where +import Control.Monad.IO.Unlift import qualified EulerHS.Interpreters as I import qualified EulerHS.Language as L import EulerHS.Prelude @@ -190,6 +191,9 @@ instance Metrics.HasCoreMetrics r => Metrics.CoreMetrics (FlowR r) where incrementErrorCounter = Metrics.incrementErrorCounterImplementation addUrlCallRetries = Metrics.addUrlCallRetriesImplementation addUrlCallRetryFailures = Metrics.addUrlCallFailuresImplementation + incrementSortedSetCounter = Metrics.incrementSortedSetCounterImplementation + incrementStreamCounter = Metrics.incrementStreamCounterImplementation + addGenericLatency = Metrics.addGenericLatencyImplementation instance MonadMonitor (FlowR r) where doIO = liftIO diff --git a/lib/mobility-core/src/Kernel/Utils/Time.hs b/lib/mobility-core/src/Kernel/Utils/Time.hs index ab7027abc..fdc051b57 100644 --- a/lib/mobility-core/src/Kernel/Utils/Time.hs +++ b/lib/mobility-core/src/Kernel/Utils/Time.hs @@ -24,6 +24,7 @@ where import qualified Data.Text as T import Data.Time hiding (getCurrentTime, nominalDiffTimeToSeconds, secondsToNominalDiffTime) import qualified Data.Time as Time hiding (secondsToNominalDiffTime) +import Data.Time.Clock.System import EulerHS.Prelude import Kernel.Types.Time import Kernel.Utils.Logging @@ -113,3 +114,14 @@ compareTimeWithInterval dt time1 time2 | abs (diffUTCTime time1 time2) < abs dt = EQ | time1 < time2 = LT | otherwise = GT -- time1 > time2 + +utcToMilliseconds :: UTCTime -> Double +utcToMilliseconds utcTime = fromIntegral $ div (systemSeconds systemTime * 1000000000 + fromIntegral (systemNanoseconds systemTime)) 1000000 + where + systemTime = utcToSystemTime utcTime + +getCurrentTimestamp :: (Monad m, MonadTime m) => m Double +getCurrentTimestamp = do + now <- getCurrentTime + let systemTime = utcToSystemTime now + pure . fromIntegral $ div (systemSeconds systemTime * 1000000000 + fromIntegral (systemNanoseconds systemTime)) 1000000 diff --git a/lib/mobility-core/test/src/APIExceptions.hs b/lib/mobility-core/test/src/APIExceptions.hs index b6a8e4f46..b61617fe9 100644 --- a/lib/mobility-core/test/src/APIExceptions.hs +++ b/lib/mobility-core/test/src/APIExceptions.hs @@ -65,6 +65,9 @@ instance Metrics.CoreMetrics IO where incrementErrorCounter _ _ = return () addUrlCallRetries _ _ = return () addUrlCallRetryFailures _ = return () + incrementSortedSetCounter _ = return () + incrementStreamCounter _ = return () + addGenericLatency _ _ = return () httpExceptionTests :: TestTree httpExceptionTests =