diff --git a/CHANGELOG.md b/CHANGELOG.md index 792f08e..aadc806 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Configurable redis connection (Unix socket or TCP) - Basic database upgrade support in tools/databaseUpgrade - PCSCF state storage in ims_subscriber + - (Experimental) Working horizontal scalability ### Changed @@ -30,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Logtool no longer handles metric processing - Updated config.yaml - Gx CCR-T now flushes PGW / IMS data, depending on Called-Station-Id +- Benchmarked lossless at ~100 diameter requests per second, per hssService. ### Fixed @@ -37,6 +39,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Gx CCA now supports apn inside a plmn based uri - AVP_Preemption_Capability and AVP_Preemption_Vulnerability now presents correctly in all diameter messages - Crash when webhook or geored endpoints enabled and no peers defined + - CPU overutilization on all services ### Removed diff --git a/config.yaml b/config.yaml index 5f01224..a12d857 100644 --- a/config.yaml +++ b/config.yaml @@ -33,9 +33,6 @@ hss: #The maximum time to wait, in seconds, before disconnecting a client when no data is received. client_socket_timeout: 120 - #Enable benchmarking log output for response times - set to False in production. - enable_benchmarking: False - #The maximum time to wait, in seconds, before disconnecting a client when no data is received. client_socket_timeout: 300 @@ -71,6 +68,12 @@ hss: api: page_size: 200 +benchmarking: + # Whether to enable benchmark logging + enabled: True + # How often to report, in seconds. Not all benchmarking supports interval reporting. + reporting_interval: 3600 + eir: imsi_imei_logging: True #Store current IMEI / IMSI pair in backend no_match_response: 2 #Greylist diff --git a/lib/database.py b/lib/database.py index 41f4823..7def195 100755 --- a/lib/database.py +++ b/lib/database.py @@ -864,13 +864,15 @@ def handleGeored(self, jsonData, operation: str="PATCH", asymmetric: bool=False, if len(self.config.get('geored', {}).get('endpoints', [])) > 0: georedDict['body'] = jsonData georedDict['operation'] = operation - self.redisMessaging.sendMessage(queue=f'geored-{uuid.uuid4()}-{time.time_ns()}', message=json.dumps(georedDict), queueExpiry=120) + georedDict['timestamp'] = time.time_ns() + self.redisMessaging.sendMessage(queue=f'geored', message=json.dumps(georedDict), queueExpiry=120) if asymmetric: if len(asymmetricUrls) > 0: georedDict['body'] = jsonData georedDict['operation'] = operation + georedDict['timestamp'] = time.time_ns() georedDict['urls'] = asymmetricUrls - self.redisMessaging.sendMessage(queue=f'asymmetric-geored-{uuid.uuid4()}-{time.time_ns()}', message=json.dumps(georedDict), queueExpiry=120) + self.redisMessaging.sendMessage(queue=f'asymmetric-geored', message=json.dumps(georedDict), queueExpiry=120) return True except Exception as E: @@ -897,7 +899,8 @@ def handleWebhook(self, objectData, operation: str="PATCH"): webhook['body'] = self.Sanitize_Datetime(objectData) webhook['headers'] = webhookHeaders webhook['operation'] = operation - self.redisMessaging.sendMessage(queue=f'webhook-{uuid.uuid4()}-{time.time_ns()}', message=json.dumps(webhook), queueExpiry=120) + webhook['timestamp'] = time.time_ns() + self.redisMessaging.sendMessage(queue=f'webhook', message=json.dumps(webhook), queueExpiry=120) return True def Sanitize_Datetime(self, result): @@ -1580,7 +1583,7 @@ def Update_Serving_MME(self, imsi, serving_mme, serving_mme_realm=None, serving_ self.logTool.log(service='Database', level='debug', message="Updating serving MME & Timestamp", redisClient=self.redisMessaging) result.serving_mme = serving_mme try: - if serving_mme_timestamp is not None and serving_mme_timestamp is not 'None': + if serving_mme_timestamp != None and serving_mme_timestamp != 'None': result.serving_mme_timestamp = datetime.strptime(serving_mme_timestamp, '%Y-%m-%dT%H:%M:%SZ') result.serving_mme_timestamp = result.serving_mme_timestamp.replace(tzinfo=timezone.utc) serving_mme_timestamp_string = result.serving_mme_timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') @@ -1615,8 +1618,8 @@ def Update_Serving_MME(self, imsi, serving_mme, serving_mme_realm=None, serving_ self.handleGeored({ "imsi": str(imsi), "serving_mme": result.serving_mme, - "serving_mme_realm": str(result.serving_mme_realm), - "serving_mme_peer": str(result.serving_mme_peer), + "serving_mme_realm": result.serving_mme_realm, + "serving_mme_peer": result.serving_mme_peer, "serving_mme_timestamp": serving_mme_timestamp_string }) else: @@ -1643,7 +1646,7 @@ def Update_Proxy_CSCF(self, imsi, proxy_cscf, pcscf_realm=None, pcscf_peer=None, result.pcscf = proxy_cscf result.pcscf_active_session = pcscf_active_session try: - if pcscf_timestamp is not None and pcscf_timestamp is not 'None': + if pcscf_timestamp != None and pcscf_timestamp != 'None': result.pcscf_timestamp = datetime.strptime(pcscf_timestamp, '%Y-%m-%dT%H:%M:%SZ') result.pcscf_timestamp = result.pcscf_timestamp.replace(tzinfo=timezone.utc) pcscf_timestamp_string = result.pcscf_timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') @@ -1673,7 +1676,7 @@ def Update_Proxy_CSCF(self, imsi, proxy_cscf, pcscf_realm=None, pcscf_peer=None, if propagate == True: if 'IMS' in self.config['geored']['sync_actions'] and self.config['geored']['enabled'] == True: self.logTool.log(service='Database', level='debug', message="Propagate IMS changes to Geographic PyHSS instances", redisClient=self.redisMessaging) - self.handleGeored({"imsi": str(imsi), "pcscf": result.pcscf, "pcscf_realm": str(result.pcscf_realm), "pcscf_timestamp": pcscf_timestamp_string, "pcscf_peer": str(result.pcscf_peer), "pcscf_active_session": str(pcscf_active_session)}) + self.handleGeored({"imsi": str(imsi), "pcscf": result.pcscf, "pcscf_realm": result.pcscf_realm, "pcscf_timestamp": pcscf_timestamp_string, "pcscf_peer": result.pcscf_peer, "pcscf_active_session": pcscf_active_session}) else: self.logTool.log(service='Database', level='debug', message="Config does not allow sync of IMS events", redisClient=self.redisMessaging) except Exception as E: @@ -1698,7 +1701,7 @@ def Update_Serving_CSCF(self, imsi, serving_cscf, scscf_realm=None, scscf_peer=N serving_cscf = serving_cscf.replace("sip:sip:", "sip:") result.scscf = serving_cscf try: - if scscf_timestamp is not None and scscf_timestamp is not 'None': + if scscf_timestamp != None and scscf_timestamp != 'None': result.scscf_timestamp = datetime.strptime(scscf_timestamp, '%Y-%m-%dT%H:%M:%SZ') result.scscf_timestamp = result.scscf_timestamp.replace(tzinfo=timezone.utc) scscf_timestamp_string = result.scscf_timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') @@ -1727,7 +1730,7 @@ def Update_Serving_CSCF(self, imsi, serving_cscf, scscf_realm=None, scscf_peer=N if propagate == True: if 'IMS' in self.config['geored']['sync_actions'] and self.config['geored']['enabled'] == True: self.logTool.log(service='Database', level='debug', message="Propagate IMS changes to Geographic PyHSS instances", redisClient=self.redisMessaging) - self.handleGeored({"imsi": str(imsi), "scscf": result.scscf, "scscf_realm": str(result.scscf_realm), "scscf_timestamp": scscf_timestamp_string, "scscf_peer": str(result.scscf_peer)}) + self.handleGeored({"imsi": str(imsi), "scscf": result.scscf, "scscf_realm": result.scscf_realm, "scscf_timestamp": scscf_timestamp_string, "scscf_peer": result.scscf_peer}) else: self.logTool.log(service='Database', level='debug', message="Config does not allow sync of IMS events", redisClient=self.redisMessaging) except Exception as E: @@ -1770,7 +1773,7 @@ def Update_Serving_APN(self, imsi, apn, pcrf_session_id, serving_pgw, subscriber self.logTool.log(service='Database', level='debug', message="APN ID is " + str(apn_id), redisClient=self.redisMessaging) try: - if serving_pgw_timestamp is not None and serving_pgw_timestamp is not 'None': + if serving_pgw_timestamp != None and serving_pgw_timestamp != 'None': serving_pgw_timestamp = datetime.strptime(serving_pgw_timestamp, '%Y-%m-%dT%H:%M:%SZ') serving_pgw_timestamp = serving_pgw_timestamp.replace(tzinfo=timezone.utc) serving_pgw_timestamp_string = serving_pgw_timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') @@ -1836,13 +1839,13 @@ def Update_Serving_APN(self, imsi, apn, pcrf_session_id, serving_pgw, subscriber if 'PCRF' in self.config['geored']['sync_actions'] and self.config['geored']['enabled'] == True: self.logTool.log(service='Database', level='debug', message="Propagate PCRF changes to Geographic PyHSS instances", redisClient=self.redisMessaging) self.handleGeored({"imsi": str(imsi), - 'serving_apn' : str(apn), - 'pcrf_session_id': str(pcrf_session_id), - 'serving_pgw': str(serving_pgw), - 'serving_pgw_realm': str(serving_pgw_realm), - 'serving_pgw_peer': str(serving_pgw_peer), + 'serving_apn' : apn, + 'pcrf_session_id': pcrf_session_id, + 'serving_pgw': serving_pgw, + 'serving_pgw_realm': serving_pgw_realm, + 'serving_pgw_peer': serving_pgw_peer, 'serving_pgw_timestamp': serving_pgw_timestamp_string, - 'subscriber_routing': str(subscriber_routing) + 'subscriber_routing': subscriber_routing }) else: self.logTool.log(service='Database', level='debug', message="Config does not allow sync of PCRF events", redisClient=self.redisMessaging) diff --git a/lib/diameter.py b/lib/diameter.py index c592492..4692925 100644 --- a/lib/diameter.py +++ b/lib/diameter.py @@ -603,12 +603,16 @@ def sendDiameterRequest(self, requestType: str, hostname: str, **kwargs) -> str: except Exception as e: continue connectedPeer = self.getPeerByHostname(hostname=hostname) - peerIp = connectedPeer['ipAddress'] - peerPort = connectedPeer['port'] + try: + peerIp = connectedPeer['ipAddress'] + peerPort = connectedPeer['port'] + except Exception as e: + return '' request = diameterApplication["requestMethod"](**kwargs) self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [sendDiameterRequest] [{requestType}] Successfully generated request: {request}", redisClient=self.redisMessaging) - outboundQueue = f"diameter-outbound-{peerIp}-{peerPort}-{time.time_ns()}" - outboundMessage = json.dumps({'diameter-outbound': request}) + outboundQueue = f"diameter-outbound-{peerIp}-{peerPort}" + sendTime = time.time_ns() + outboundMessage = json.dumps({"diameter-outbound": request, "inbound-received-timestamp": sendTime}) self.redisMessaging.sendMessage(queue=outboundQueue, message=outboundMessage, queueExpiry=self.diameterRequestTimeout) self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [sendDiameterRequest] [{requestType}] Queueing for host: {hostname} on {peerIp}-{peerPort}", redisClient=self.redisMessaging) return request @@ -632,12 +636,16 @@ def broadcastDiameterRequest(self, requestType: str, peerType: str, **kwargs) -> continue connectedPeerList = self.getConnectedPeersByType(peerType=peerType) for connectedPeer in connectedPeerList: - peerIp = connectedPeer['ipAddress'] - peerPort = connectedPeer['port'] + try: + peerIp = connectedPeer['ipAddress'] + peerPort = connectedPeer['port'] + except Exception as e: + return '' request = diameterApplication["requestMethod"](**kwargs) self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [broadcastDiameterRequest] [{requestType}] Successfully generated request: {request}", redisClient=self.redisMessaging) - outboundQueue = f"diameter-outbound-{peerIp}-{peerPort}-{time.time_ns()}" - outboundMessage = json.dumps({'diameter-outbound': request}) + outboundQueue = f"diameter-outbound-{peerIp}-{peerPort}" + sendTime = time.time_ns() + outboundMessage = json.dumps({"diameter-outbound": request, "inbound-received-timestamp": sendTime}) self.redisMessaging.sendMessage(queue=outboundQueue, message=outboundMessage, queueExpiry=self.diameterRequestTimeout) self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [broadcastDiameterRequest] [{requestType}] Queueing for peer type: {peerType} on {peerIp}-{peerPort}", redisClient=self.redisMessaging) return connectedPeerList @@ -670,15 +678,18 @@ def awaitDiameterRequestAndResponse(self, requestType: str, hostname: str, timeo except Exception as e: continue connectedPeer = self.getPeerByHostname(hostname=hostname) - peerIp = connectedPeer['ipAddress'] - peerPort = connectedPeer['port'] + try: + peerIp = connectedPeer['ipAddress'] + peerPort = connectedPeer['port'] + except Exception as e: + return '' request = diameterApplication["requestMethod"](**kwargs) responseType = diameterApplication["responseAcronym"] sessionId = kwargs.get('sessionId', None) self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [awaitDiameterRequestAndResponse] [{requestType}] Successfully generated request: {request}", redisClient=self.redisMessaging) sendTime = time.time_ns() - outboundQueue = f"diameter-outbound-{peerIp}-{peerPort}-{sendTime}" - outboundMessage = json.dumps({'diameter-outbound': request}) + outboundQueue = f"diameter-outbound-{peerIp}-{peerPort}" + outboundMessage = json.dumps({"diameter-outbound": request, "inbound-received-timestamp": sendTime}) self.redisMessaging.sendMessage(queue=outboundQueue, message=outboundMessage, queueExpiry=self.diameterRequestTimeout) self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [awaitDiameterRequestAndResponse] [{requestType}] Queueing for host: {hostname} on {peerIp}-{peerPort}", redisClient=self.redisMessaging) startTimer = time.time() @@ -686,29 +697,41 @@ def awaitDiameterRequestAndResponse(self, requestType: str, hostname: str, timeo try: if not time.time() >= startTimer + timeout: if sessionId is None: - responseQueues = self.redisMessaging.getQueues(pattern=f"diameter-inbound-{peerIp.replace('.', '*')}-{peerPort}-{responseType}*") - self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [awaitDiameterRequestAndResponse] [{requestType}] responseQueues(NoSessionId): {responseQueues}", redisClient=self.redisMessaging) - for responseQueue in responseQueues: - if float(responseQueue.split('-')[5]) > sendTime: - inboundResponseList = self.redisMessaging.getMessage(queue=responseQueue) - if len(inboundResponseList) > 0: + queuedMessages = self.redisMessaging.getList(key=f"diameter-inbound") + self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [awaitDiameterRequestAndResponse] [{requestType}] queuedMessages(NoSessionId): {queuedMessages}", redisClient=self.redisMessaging) + for queuedMessage in queuedMessages: + queuedMessage = json.loads(queuedMessage) + clientAddress = queuedMessage.get('clientAddress', None) + clientPort = queuedMessage.get('clientPort', None) + if clientAddress != peerIp or clientPort != peerPort: + continue + messageReceiveTime = queuedMessage.get('inbound-received-timestamp', None) + if float(messageReceiveTime) > sendTime: + messageHex = queuedMessage.get('diameter-inbound') + messageType = self.getDiameterMessageType(messageHex) + if messageType['inbound'].upper() == responseType.upper(): self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [awaitDiameterRequestAndResponse] [{requestType}] Found inbound response: {inboundResponse}", redisClient=self.redisMessaging) - return json.loads(inboundResponseList[0]).get('diameter-inbound', '') + return messageHex time.sleep(0.02) else: - responseQueues = self.redisMessaging.getQueues(pattern=f"diameter-inbound-{peerIp.replace('.', '*')}-{peerPort}-{responseType}*") - self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [awaitDiameterRequestAndResponse] [{requestType}] responseQueues({sessionId}): {responseQueues} responseType: {responseType}", redisClient=self.redisMessaging) - for responseQueue in responseQueues: - if float(responseQueue.split('-')[5]) > sendTime: - inboundResponseList = self.redisMessaging.getList(key=responseQueue) - if len(inboundResponseList) > 0: - for inboundResponse in inboundResponseList: - responseHex = json.loads(inboundResponse)['diameter-inbound'] - packetVars, avps = self.decode_diameter_packet(responseHex) - responseSessionId = bytes.fromhex(self.get_avp_data(avps, 263)[0]).decode('ascii') - if responseSessionId == sessionId: - self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [awaitDiameterRequestAndResponse] [{requestType}] Matched on Session Id: {sessionId}", redisClient=self.redisMessaging) - return json.loads(inboundResponseList[0]).get('diameter-inbound', '') + queuedMessages = self.redisMessaging.getList(key=f"diameter-inbound") + self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [awaitDiameterRequestAndResponse] [{requestType}] queuedMessages({sessionId}): {queuedMessages} responseType: {responseType}", redisClient=self.redisMessaging) + for queuedMessage in queuedMessages: + queuedMessage = json.loads(queuedMessage) + clientAddress = queuedMessage.get('clientAddress', None) + clientPort = queuedMessage.get('clientPort', None) + if clientAddress != peerIp or clientPort != peerPort: + continue + messageReceiveTime = queuedMessage.get('inbound-received-timestamp', None) + if float(messageReceiveTime) > sendTime: + messageHex = queuedMessage.get('diameter-inbound') + messageType = self.getDiameterMessageType(messageHex) + if messageType['inbound'].upper() == responseType.upper(): + packetVars, avps = self.decode_diameter_packet(messageHex) + messageSessionId = bytes.fromhex(self.get_avp_data(avps, 263)[0]).decode('ascii') + if messageSessionId == sessionId: + self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [awaitDiameterRequestAndResponse] [{requestType}] Matched on Session Id: {sessionId}", redisClient=self.redisMessaging) + return messageHex time.sleep(0.02) else: return '' @@ -2572,7 +2595,7 @@ def Answer_16777236_275(self, packet_vars, avps): response = self.generate_diameter_packet("01", "40", 275, 16777236, packet_vars['hop-by-hop-identifier'], packet_vars['end-to-end-identifier'], avp) #Generate Diameter packet return response except Exception as e: - self.logTool.log(service='HSS', level='error', message=f"[diameter.py] [Answer_16777236_275] [STA] Error generating STA, returning 2001: {traceback.format_exc()}", redisClient=self.redisMessaging) + self.logTool.log(service='HSS', level='debug', message=f"[diameter.py] [Answer_16777236_275] [STA] Error generating STA, returning 2001", redisClient=self.redisMessaging) avp = '' sessionId = self.get_avp_data(avps, 263)[0] #Get Session-ID avp += self.generate_avp(263, 40, sessionId) #Set session ID to received session ID diff --git a/lib/messaging.py b/lib/messaging.py index 62b4a78..cc78835 100644 --- a/lib/messaging.py +++ b/lib/messaging.py @@ -33,6 +33,8 @@ def sendMetric(self, serviceName: str, metricName: str, metricType: str, metricA return 'Invalid Argument: metricValue must be a digit' metricValue = float(metricValue) prometheusMetricBody = json.dumps([{ + 'serviceName': serviceName, + 'timestamp': metricTimestamp, 'NAME': metricName, 'TYPE': metricType, 'HELP': metricHelp, @@ -42,7 +44,7 @@ def sendMetric(self, serviceName: str, metricName: str, metricType: str, metricA } ]) - metricQueueName = f"metric-{serviceName}-{metricTimestamp}-{uuid.uuid4()}" + metricQueueName = f"metric" try: self.redisClient.rpush(metricQueueName, prometheusMetricBody) @@ -57,8 +59,8 @@ def sendLogMessage(self, serviceName: str, logLevel: str, logTimestamp: int, mes Stores a message in a given Queue (Key). """ try: - logQueueName = f"log-{serviceName}-{logLevel}-{logTimestamp}-{uuid.uuid4()}" - logMessage = json.dumps({"message": message}) + logQueueName = f"log" + logMessage = json.dumps({"message": message, "service": serviceName, "level": logLevel, "timestamp": logTimestamp}) self.redisClient.rpush(logQueueName, logMessage) if logExpiry is not None: self.redisClient.expire(logQueueName, logExpiry) @@ -103,6 +105,17 @@ def getNextQueue(self, pattern: str='*') -> dict: except Exception as e: return {} + def awaitMessage(self, key: str): + """ + Blocks until a message is received at the given key, then returns the message. + """ + try: + message = self.redisClient.blpop(key) + return tuple(data.decode() for data in message) + except Exception as e: + return '' + + def deleteQueue(self, queue: str) -> bool: """ Deletes the given Queue (Key) diff --git a/lib/messagingAsync.py b/lib/messagingAsync.py index baed706..78f2ca7 100644 --- a/lib/messagingAsync.py +++ b/lib/messagingAsync.py @@ -37,6 +37,8 @@ async def sendMetric(self, serviceName: str, metricName: str, metricType: str, m return 'Invalid Argument: metricValue must be a digit' metricValue = float(metricValue) prometheusMetricBody = json.dumps([{ + 'serviceName': serviceName, + 'timestamp': metricTimestamp, 'NAME': metricName, 'TYPE': metricType, 'HELP': metricHelp, @@ -46,7 +48,7 @@ async def sendMetric(self, serviceName: str, metricName: str, metricType: str, m } ]) - metricQueueName = f"metric-{serviceName}-{metricTimestamp}-{uuid.uuid4()}" + metricQueueName = f"metric" try: async with self.redisClient.pipeline(transaction=True) as redisPipe: @@ -63,8 +65,8 @@ async def sendLogMessage(self, serviceName: str, logLevel: str, logTimestamp: in Stores a log message in a given Queue (Key) asynchronously and sets an expiry (in seconds) if provided. """ try: - logQueueName = f"log-{serviceName}-{logLevel}-{logTimestamp}-{uuid.uuid4()}" - logMessage = json.dumps({"message": message}) + logQueueName = f"log" + logMessage = json.dumps({"message": message, "service": serviceName, "level": logLevel, "timestamp": logTimestamp}) async with self.redisClient.pipeline(transaction=True) as redisPipe: await redisPipe.rpush(logQueueName, logMessage) if logExpiry is not None: @@ -117,6 +119,16 @@ async def getNextQueue(self, pattern: str='*') -> str: print(e) return '' + async def awaitMessage(self, key: str): + """ + Asynchronously blocks until a message is received at the given key, then returns the message. + """ + try: + message = (await(self.redisClient.blpop(key))) + return tuple(data.decode() for data in message) + except Exception as e: + return '' + async def deleteQueue(self, queue: str) -> bool: """ Deletes the given Queue (Key) asynchronously. diff --git a/services/apiService.py b/services/apiService.py index ed00ec8..c2ce524 100644 --- a/services/apiService.py +++ b/services/apiService.py @@ -1451,22 +1451,33 @@ def put(self): print("subscriber_data: " + str(subscriber_data)) #Get PCRF Session - pcrf_session_data = databaseClient.Get_Serving_APN(subscriber_id=subscriber_data['subscriber_id'], apn_id=json_data['apn_id']) - print("pcrf_session_data: " + str(pcrf_session_data)) + servingApn = databaseClient.Get_Serving_APN(subscriber_id=subscriber_data['subscriber_id'], apn_id=json_data['apn_id']) + print("pcrf_session_data: " + str(servingApn)) #Get Charging Rules ChargingRule = databaseClient.Get_Charging_Rule(json_data['charging_rule_id']) ChargingRule['apn_data'] = databaseClient.Get_APN(json_data['apn_id']) print("Got ChargingRule: " + str(ChargingRule)) - diameterRequest = diameterClient.Request_16777238_258(pcrf_session_data['pcrf_session_id'], ChargingRule, pcrf_session_data['subscriber_routing'], pcrf_session_data['serving_pgw'], 'ServingRealm.com') - connectedPgws = diameterClient.getConnectedPeersByType('pgw') - for connectedPgw in connectedPgws: - outboundQueue = f"diameter-outbound-{connectedPgw.get('ipAddress')}-{connectedPgw.get('port')}-{time.time_ns()}" - outboundMessage = json.dumps({"diameter-outbound": diameterRequest}) - redisMessaging.sendMessage(queue=outboundQueue, message=outboundMessage, queueExpiry=60) + subscriberId = subscriber_data.get('subscriber_id', None) + apnId = (self.database.Get_APN_by_Name(apn="ims")).get('apn_id', None) + servingPgwPeer = servingApn.get('serving_pgw_peer', None).split(';')[0] + servingPgw = servingApn.get('serving_pgw', None) + servingPgwRealm = servingApn.get('serving_pgw_realm', None) + pcrfSessionId = servingApn.get('pcrf_session_id', None) + ueIp = servingApn.get('subscriber_routing', None) + + diameterResponse = diameterClient.sendDiameterRequest( + requestType='RAR', + hostname=servingPgwPeer, + sessionId=pcrfSessionId, + chargingRules=ChargingRule, + ueIp=ueIp, + servingPgw=servingPgw, + servingRealm=servingPgwRealm + ) - result = {"request": diameterRequest, "destinationClients": connectedPgws} + result = {"Result": "Successfully sent Gx RAR", "destinationClients": str(servingPgw)} return result, 200 @ns_pcrf.route('/') @@ -1547,7 +1558,7 @@ def patch(self): json_data['scscf_peer'] = None if 'scscf_timestamp' not in json_data: json_data['scscf_timestamp'] = None - response_data.append(databaseClient.Update_Serving_CSCF(imsi=str(json_data['imsi']), serving_cscf=json_data['scscf'], scscf_realm=str(json_data['scscf_realm']), scscf_peer=str(json_data['scscf_peer']), scscf_timestamp=json_data['scscf_timestamp'], propagate=False)) + response_data.append(databaseClient.Update_Serving_CSCF(imsi=str(json_data['imsi']), serving_cscf=json_data['scscf'], scscf_realm=json_data['scscf_realm'], scscf_peer=json_data['scscf_peer'], scscf_timestamp=json_data['scscf_timestamp'], propagate=False)) redisMessaging.sendMetric(serviceName='api', metricName='prom_flask_http_geored_endpoints', metricType='counter', metricAction='inc', metricValue=1.0, metricHelp='Number of Geored Pushes Received', @@ -1566,7 +1577,7 @@ def patch(self): json_data['pcscf_timestamp'] = None if 'pcscf_active_session' not in json_data: json_data['pcscf_active_session'] = None - response_data.append(databaseClient.Update_Proxy_CSCF(imsi=str(json_data['imsi']), proxy_cscf=json_data['pcscf'], pcscf_realm=str(json_data['pcscf_realm']), pcscf_peer=str(json_data['pcscf_peer']), pcscf_timestamp=json_data['pcscf_timestamp'], pcscf_active_session=str(json_data['pcscf_active_session']), propagate=False)) + response_data.append(databaseClient.Update_Proxy_CSCF(imsi=str(json_data['imsi']), proxy_cscf=json_data['pcscf'], pcscf_realm=json_data['pcscf_realm'], pcscf_peer=json_data['pcscf_peer'], pcscf_timestamp=json_data['pcscf_timestamp'], pcscf_active_session=json_data['pcscf_active_session'], propagate=False)) redisMessaging.sendMetric(serviceName='api', metricName='prom_flask_http_geored_endpoints', metricType='counter', metricAction='inc', metricValue=1.0, metricHelp='Number of Geored Pushes Received', diff --git a/services/diameterService.py b/services/diameterService.py index 0969170..cebf4cf 100644 --- a/services/diameterService.py +++ b/services/diameterService.py @@ -36,7 +36,10 @@ def __init__(self): self.diameterLibrary = DiameterAsync(logTool=self.logTool) self.activePeers = {} self.diameterRequestTimeout = int(self.config.get('hss', {}).get('diameter_request_timeout', 10)) - self.benchmarking = self.config.get('hss').get('enable_benchmarking', False) + self.benchmarking = self.config.get('benchmarking', {}).get('enabled', False) + self.benchmarkingInterval = self.config.get('benchmarking', {}).get('reporting_interval', 3600) + self.diameterRequests = 0 + self.diameterResponses = 0 async def validateDiameterInbound(self, clientAddress: str, clientPort: str, inboundData) -> bool: """ @@ -44,28 +47,17 @@ async def validateDiameterInbound(self, clientAddress: str, clientPort: str, inb """ try: packetVars, avps = await(self.diameterLibrary.decodeDiameterPacket(inboundData)) - messageType = await(self.diameterLibrary.getDiameterMessageType(inboundData)) originHost = (await(self.diameterLibrary.getAvpData(avps, 264)))[0] originHost = bytes.fromhex(originHost).decode("utf-8") peerType = await(self.diameterLibrary.getPeerType(originHost)) - self.activePeers[f"{clientAddress}-{clientPort}"].update({'lastDwrTimestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S") if messageType['inbound'] == 'DWR' else self.activePeers[f"{clientAddress}-{clientPort}"]['lastDwrTimestamp'], - 'diameterHostname': originHost, - 'peerType': peerType, - }) - await(self.redisReaderMessaging.sendMetric(serviceName='diameter', metricName='prom_diam_inbound_count', - metricType='counter', metricAction='inc', - metricValue=1.0, metricHelp='Number of Diameter Inbounds', - metricLabels={ - "diameter_application_id": str(packetVars["ApplicationId"]), - "diameter_cmd_code": str(packetVars["command_code"]), - "endpoint": originHost, - "type": "inbound"}, - metricExpiry=60)) + self.activePeers[f"{clientAddress}-{clientPort}"].update({'diameterHostname': originHost, + 'peerType': peerType, + }) + return True except Exception as e: await(self.logTool.logAsync(service='Diameter', level='warning', message=f"[Diameter] [validateDiameterInbound] Exception: {e}\n{traceback.format_exc()}")) await(self.logTool.logAsync(service='Diameter', level='warning', message=f"[Diameter] [validateDiameterInbound] AVPs: {avps}\nPacketVars: {packetVars}")) return False - return True async def handleActiveDiameterPeers(self): """ @@ -110,20 +102,34 @@ async def logActivePeers(self): activePeers = '' await(self.logTool.logAsync(service='Diameter', level='info', message=f"[Diameter] [logActivePeers] {len(self.activePeers)} Active Peers {activePeers}")) + async def logProcessedMessages(self): + """ + Logs the number of processed messages on a rolling basis. + """ + if not self.benchmarking: + return False + + benchmarkInterval = int(self.benchmarkingInterval) + + while True: + await(self.logTool.logAsync(service='Diameter', level='info', message=f"[Diameter] [logProcessedMessages] Processed {self.diameterRequests} inbound diameter messages in the last {self.benchmarkingInterval} second(s)")) + await(self.logTool.logAsync(service='Diameter', level='info', message=f"[Diameter] [logProcessedMessages] Processed {self.diameterResponses} outbound in the last {self.benchmarkingInterval} second(s)")) + self.diameterRequests = 0 + self.diameterResponses = 0 + await(asyncio.sleep(benchmarkInterval)) + async def readInboundData(self, reader, clientAddress: str, clientPort: str, socketTimeout: int, coroutineUuid: str) -> bool: """ Reads and parses incoming data from a connected client. Validated diameter messages are sent to the redis queue for processing. Terminates the connection if diameter traffic is not received, or if the client disconnects. """ await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [readInboundData] [{coroutineUuid}] New connection from {clientAddress} on port {clientPort}")) + peerIsValidated = False while True: try: inboundData = await(asyncio.wait_for(reader.read(8192), timeout=socketTimeout)) - if self.benchmarking: - startTime = time.perf_counter() - if reader.at_eof(): await(self.logTool.logAsync(service='Diameter', level='info', message=f"[Diameter] [readInboundData] [{coroutineUuid}] Socket Timeout for {clientAddress} on port {clientPort}, closing connection.")) return False @@ -131,26 +137,22 @@ async def readInboundData(self, reader, clientAddress: str, clientPort: str, soc if len(inboundData) > 0: await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [readInboundData] [{coroutineUuid}] Received data from {clientAddress} on port {clientPort}")) - if self.benchmarking: - diamteterValidationStartTime = time.perf_counter() - if not await(self.validateDiameterInbound(clientAddress, clientPort, inboundData)): - await(self.logTool.logAsync(service='Diameter', level='warning', message=f"[Diameter] [readInboundData] [{coroutineUuid}] Invalid Diameter Inbound, discarding data.")) - await(asyncio.sleep(0.001)) - continue - if self.benchmarking: - await(self.logTool.logAsync(service='Diameter', level='info', message=f"[Diameter] [readInboundData] [{coroutineUuid}] Time taken to validate diameter request: {round(((time.perf_counter() - diamteterValidationStartTime)*1000), 3)} ms")) - - - diameterMessageType = await(self.diameterLibrary.getDiameterMessageType(binaryData=inboundData)) - diameterMessageType = diameterMessageType.get('inbound', '') + if not peerIsValidated: + if not await(self.validateDiameterInbound(clientAddress, clientPort, inboundData)): + await(self.logTool.logAsync(service='Diameter', level='warning', message=f"[Diameter] [readInboundData] [{coroutineUuid}] Invalid Diameter Inbound, discarding data.")) + await(asyncio.sleep(0)) + continue + else: + await(self.logTool.logAsync(service='Diameter', level='info', message=f"[Diameter] [readInboundData] [{coroutineUuid}] Validated peer: {clientAddress} on port {clientPort}")) + peerIsValidated = True - inboundQueueName = f"diameter-inbound-{clientAddress}-{clientPort}-{diameterMessageType}-{time.time_ns()}" - inboundHexString = json.dumps({f"diameter-inbound": inboundData.hex()}) - await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [readInboundData] [{coroutineUuid}] [{diameterMessageType}] Queueing {inboundHexString}")) + inboundQueueName = f"diameter-inbound" + inboundHexString = json.dumps({"diameter-inbound": inboundData.hex(), "inbound-received-timestamp": time.time_ns(), "clientAddress": clientAddress, "clientPort": clientPort}) + await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [readInboundData] [{coroutineUuid}] Queueing {inboundHexString}")) await(self.redisReaderMessaging.sendMessage(queue=inboundQueueName, message=inboundHexString, queueExpiry=self.diameterRequestTimeout)) if self.benchmarking: - await(self.logTool.logAsync(service='Diameter', level='info', message=f"[Diameter] [readInboundData] [{coroutineUuid}] Time taken to process request: {round(((time.perf_counter() - startTime)*1000), 3)} ms")) - await(asyncio.sleep(0.001)) + self.diameterRequests += 1 + await(asyncio.sleep(0)) except Exception as e: await(self.logTool.logAsync(service='Diameter', level='info', message=f"[Diameter] [readInboundData] [{coroutineUuid}] Socket Exception for {clientAddress} on port {clientPort}, closing connection.\n{e}")) @@ -158,44 +160,24 @@ async def readInboundData(self, reader, clientAddress: str, clientPort: str, soc async def writeOutboundData(self, writer, clientAddress: str, clientPort: str, socketTimeout: int, coroutineUuid: str) -> bool: """ - Continually polls the Redis queue for outbound messages. Received messages from the queue are validated against the connected client, and sent. + Waits for a message to be received from Redis, then sends to the connected client. """ await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [writeOutboundData] [{coroutineUuid}] writeOutboundData with host {clientAddress} on port {clientPort}")) - while True: + while not writer.transport.is_closing(): try: + await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [writeOutboundData] [{coroutineUuid}] Waiting for messages for host {clientAddress} on port {clientPort}")) + pendingOutboundMessage = json.loads((await(self.redisWriterMessaging.awaitMessage(key=f"diameter-outbound-{clientAddress}-{clientPort}")))[1]) + await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [writeOutboundData] [{coroutineUuid}] Received message: {pendingOutboundMessage} for host {clientAddress} on port {clientPort}")) + diameterOutboundBinary = bytes.fromhex(pendingOutboundMessage.get('diameter-outbound', '')) + await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [writeOutboundData] [{coroutineUuid}] Sending: {diameterOutboundBinary.hex()} to to {clientAddress} on {clientPort}.")) + writer.write(diameterOutboundBinary) + await(writer.drain()) if self.benchmarking: - startTime = time.perf_counter() - - if writer.transport.is_closing(): - return False - - pendingOutboundQueue = await(self.redisWriterMessaging.getNextQueue(pattern=f'diameter-outbound-{clientAddress.replace(".", "*")}-{clientPort}-*')) - if not len(pendingOutboundQueue) > 0: - await(asyncio.sleep(0.01)) - continue - pendingOutboundQueue = pendingOutboundQueue - outboundQueueSplit = str(pendingOutboundQueue).split('-') - queuedMessageType = outboundQueueSplit[1] - diameterOutboundHost = outboundQueueSplit[2] - diameterOutboundPort = outboundQueueSplit[3] - - if str(diameterOutboundHost) == str(clientAddress) and str(diameterOutboundPort) == str(clientPort) and queuedMessageType == 'outbound': - await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [writeOutboundData] [{coroutineUuid}] Matched {pendingOutboundQueue} to host {clientAddress} on port {clientPort}")) - diameterOutbound = json.loads(await(self.redisWriterMessaging.getMessage(queue=pendingOutboundQueue))) - diameterOutboundBinary = bytes.fromhex(next(iter(diameterOutbound.values()))) - diameterMessageType = await(self.diameterLibrary.getDiameterMessageType(binaryData=diameterOutboundBinary)) - diameterMessageType = diameterMessageType.get('outbound', '') - await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [writeOutboundData] [{coroutineUuid}] [{diameterMessageType}] Sending: {diameterOutboundBinary.hex()} to to {clientAddress} on {clientPort}.")) - writer.write(diameterOutboundBinary) - await(writer.drain()) - await(asyncio.sleep(0.001)) - if self.benchmarking: - await(self.logTool.logAsync(service='Diameter', level='info', message=f"[Diameter] [writeOutboundData] [{coroutineUuid}] Time taken to write response: {round(((time.perf_counter() - startTime)*1000), 3)} ms")) - + self.diameterResponses += 1 + await(asyncio.sleep(0)) except Exception as e: await(self.logTool.logAsync(service='Diameter', level='info', message=f"[Diameter] [writeOutboundData] [{coroutineUuid}] Connection closed for {clientAddress} on port {clientPort}, closing writer.")) return False - await(asyncio.sleep(0.001)) async def handleConnection(self, reader, writer): """ @@ -214,7 +196,6 @@ async def handleConnection(self, reader, writer): "ipAddress":'', "port":'', "connectionStatus": '', - "lastDwrTimestamp": '', "diameterHostname": '', "peerType": '', } @@ -262,7 +243,7 @@ async def handleConnection(self, reader, writer): async def startServer(self, host: str=None, port: int=None, type: str=None): """ Start a server with the given parameters and handle new clients with self.handleConnection. - Also create a single instance of self.handleActiveDiameterPeers. + Also create a single instance of self.handleActiveDiameterPeers and self.logProcessedMessages. """ if host is None: @@ -283,6 +264,8 @@ async def startServer(self, host: str=None, port: int=None, type: str=None): servingAddresses = ', '.join(str(sock.getsockname()) for sock in server.sockets) await(self.logTool.logAsync(service='Diameter', level='info', message=f"{self.banners.diameterService()}\n[Diameter] Serving on {servingAddresses}")) handleActiveDiameterPeerTask = asyncio.create_task(self.handleActiveDiameterPeers()) + if self.benchmarking: + logProcessedMessagesTask = asyncio.create_task(self.logProcessedMessages()) async with server: await(server.serve_forever()) diff --git a/services/georedService.py b/services/georedService.py index 01336fe..861e8b8 100644 --- a/services/georedService.py +++ b/services/georedService.py @@ -256,21 +256,12 @@ async def handleAsymmetricGeoredQueue(self): try: if self.benchmarking: startTime = time.perf_counter() - asymmetricGeoredQueue = await(self.redisGeoredMessaging.getNextQueue(pattern='asymmetric-geored-*')) - if not len(asymmetricGeoredQueue) > 0: - await(asyncio.sleep(0.01)) - continue - georedMessage = await(self.redisGeoredMessaging.getMessage(queue=asymmetricGeoredQueue)) - if not len(georedMessage) > 0: - await(asyncio.sleep(0.01)) - continue - await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleAsymmetricGeoredQueue] Queue: {asymmetricGeoredQueue}")) + georedMessage = json.loads((await(self.redisGeoredMessaging.awaitMessage(key='asymmetric-geored')))[1]) await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleAsymmetricGeoredQueue] Message: {georedMessage}")) - georedDict = json.loads(georedMessage) - georedOperation = georedDict['operation'] - georedBody = georedDict['body'] - georedUrls = georedDict['urls'] + georedOperation = georedMessage['operation'] + georedBody = georedMessage['body'] + georedUrls = georedMessage['urls'] georedTasks = [] for georedEndpoint in georedUrls: @@ -279,11 +270,11 @@ async def handleAsymmetricGeoredQueue(self): if self.benchmarking: await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleAsymmetricGeoredQueue] Time taken to send asymmetric geored message to specified peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms")) - await(asyncio.sleep(0.001)) + await(asyncio.sleep(0)) except Exception as e: await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleAsymmetricGeoredQueue] Error handling asymmetric geored queue: {e}")) - await(asyncio.sleep(0.001)) + await(asyncio.sleep(0)) continue async def handleGeoredQueue(self): @@ -295,20 +286,11 @@ async def handleGeoredQueue(self): try: if self.benchmarking: startTime = time.perf_counter() - georedQueue = await(self.redisGeoredMessaging.getNextQueue(pattern='geored-*')) - if not len(georedQueue) > 0: - await(asyncio.sleep(0.01)) - continue - georedMessage = await(self.redisGeoredMessaging.getMessage(queue=georedQueue)) - if not len(georedMessage) > 0: - await(asyncio.sleep(0.01)) - continue - await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleGeoredQueue] Queue: {georedQueue}")) + georedMessage = json.loads((await(self.redisGeoredMessaging.awaitMessage(key='geored')))[1]) await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleGeoredQueue] Message: {georedMessage}")) - georedDict = json.loads(georedMessage) - georedOperation = georedDict['operation'] - georedBody = georedDict['body'] + georedOperation = georedMessage['operation'] + georedBody = georedMessage['body'] georedTasks = [] for remotePeer in self.georedPeers: @@ -317,11 +299,11 @@ async def handleGeoredQueue(self): if self.benchmarking: await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleGeoredQueue] Time taken to send geored message to all geored peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms")) - await(asyncio.sleep(0.001)) + await(asyncio.sleep(0)) except Exception as e: await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleGeoredQueue] Error handling geored queue: {e}")) - await(asyncio.sleep(0.001)) + await(asyncio.sleep(0)) continue async def handleWebhookQueue(self): @@ -333,21 +315,13 @@ async def handleWebhookQueue(self): try: if self.benchmarking: startTime = time.perf_counter() - webhookQueue = await(self.redisWebhookMessaging.getNextQueue(pattern='webhook-*')) - if not len(webhookQueue) > 0: - await(asyncio.sleep(0.01)) - continue - webhookMessage = await(self.redisWebhookMessaging.getMessage(queue=webhookQueue)) - if not len(webhookMessage) > 0: - await(asyncio.sleep(0.001)) - continue - await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Queue: {webhookQueue}")) + webhookMessage = json.loads((await(self.redisWebhookMessaging.awaitMessage(key='webhook')))[1]) + await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Message: {webhookMessage}")) - webhookDict = json.loads(webhookMessage) - webhookHeaders = webhookDict['headers'] - webhookOperation = webhookDict['operation'] - webhookBody = webhookDict['body'] + webhookHeaders = webhookMessage['headers'] + webhookOperation = webhookMessage['operation'] + webhookBody = webhookMessage['body'] webhookTasks = [] for remotePeer in self.webhookPeers: diff --git a/services/hssService.py b/services/hssService.py index dc8027d..b1d9da5 100644 --- a/services/hssService.py +++ b/services/hssService.py @@ -1,4 +1,4 @@ -import os, sys, json, yaml, time +import os, sys, json, yaml, time, traceback sys.path.append(os.path.realpath('../lib')) from messaging import RedisMessaging from diameter import Diameter @@ -31,7 +31,6 @@ def __init__(self): self.diameterLibrary = Diameter(logTool=self.logTool, originHost=self.originHost, originRealm=self.originRealm, productName=self.productName, mcc=self.mcc, mnc=self.mnc) self.benchmarking = self.config.get('hss').get('enable_benchmarking', False) - def handleQueue(self): """ Gets and parses inbound diameter requests, processes them and queues the response. @@ -40,35 +39,41 @@ def handleQueue(self): try: if self.benchmarking: startTime = time.perf_counter() - inboundQueue = self.redisMessaging.getNextQueue(pattern='diameter-inbound*') - inboundMessage = self.redisMessaging.getMessage(queue=inboundQueue) - assert(len(inboundMessage)) - inboundDict = json.loads(inboundMessage) - inboundBinary = bytes.fromhex(next(iter(inboundDict.values()))) - inboundSplit = str(inboundQueue).split('-') - inboundHost = inboundSplit[2] - inboundPort = inboundSplit[3] - inboundMessageType = inboundSplit[4] - inboundTimestamp = inboundSplit[5] + inboundMessage = json.loads(self.redisMessaging.awaitMessage(key='diameter-inbound')[1]) + + inboundBinary = bytes.fromhex(inboundMessage.get('diameter-inbound', None)) + if inboundBinary == None: + continue + inboundHost = inboundMessage.get('clientAddress', None) + inboundPort = inboundMessage.get('clientPort', None) + inboundTimestamp = inboundMessage.get('inbound-received-timestamp', None) try: diameterOutbound = self.diameterLibrary.generateDiameterResponse(binaryData=inboundBinary) + + if diameterOutbound == None: + continue + if not len(diameterOutbound) > 0: + continue + diameterMessageTypeDict = self.diameterLibrary.getDiameterMessageType(binaryData=inboundBinary) + + if diameterMessageTypeDict == None: + continue + if not len(diameterMessageTypeDict) > 0: + continue + diameterMessageTypeInbound = diameterMessageTypeDict.get('inbound', '') diameterMessageTypeOutbound = diameterMessageTypeDict.get('outbound', '') except Exception as e: self.logTool.log(service='HSS', level='warning', message=f"[HSS] [handleQueue] Failed to generate diameter outbound: {e}", redisClient=self.redisMessaging) continue - self.logTool.log(service='HSS', level='debug', message=f"[HSS] [handleQueue] [{diameterMessageTypeInbound}] Inbound Diameter Inbound Queue: {inboundQueue}", redisClient=self.redisMessaging) self.logTool.log(service='HSS', level='debug', message=f"[HSS] [handleQueue] [{diameterMessageTypeInbound}] Inbound Diameter Inbound: {inboundMessage}", redisClient=self.redisMessaging) - - if not len(diameterOutbound) > 0: - continue - outboundQueue = f"diameter-outbound-{inboundHost}-{inboundPort}-{inboundTimestamp}" - outboundMessage = json.dumps({"diameter-outbound": diameterOutbound}) + outboundQueue = f"diameter-outbound-{inboundHost}-{inboundPort}" + outboundMessage = json.dumps({"diameter-outbound": diameterOutbound, "inbound-received-timestamp": inboundTimestamp}) self.logTool.log(service='HSS', level='debug', message=f"[HSS] [handleQueue] [{diameterMessageTypeOutbound}] Generated Diameter Outbound: {diameterOutbound}", redisClient=self.redisMessaging) self.logTool.log(service='HSS', level='debug', message=f"[HSS] [handleQueue] [{diameterMessageTypeOutbound}] Outbound Diameter Outbound Queue: {outboundQueue}", redisClient=self.redisMessaging) @@ -79,7 +84,7 @@ def handleQueue(self): self.logTool.log(service='HSS', level='info', message=f"[HSS] [handleQueue] [{diameterMessageTypeInbound}] Time taken to process request: {round(((time.perf_counter() - startTime)*1000), 3)} ms", redisClient=self.redisMessaging) except Exception as e: - time.sleep(0.001) + self.logTool.log(service='HSS', level='error', message=f"[HSS] [handleQueue] Exception: {traceback.format_exc()}", redisClient=self.redisMessaging) continue diff --git a/services/logService.py b/services/logService.py index 4828195..34e7ae0 100644 --- a/services/logService.py +++ b/services/logService.py @@ -39,7 +39,6 @@ def __init__(self): } print(f"{self.banners.logService()}") - def handleLogs(self): """ Continually polls the Redis DB for queued log files. Parses and writes log files to disk, using LogTool. @@ -47,24 +46,14 @@ def handleLogs(self): activeLoggers = {} while True: try: - logQueue = self.redisMessaging.getNextQueue(pattern='log-*') - logMessage = self.redisMessaging.getMessage(queue=logQueue) - - if not len(logMessage) > 0: - time.sleep(0.001) - continue + logMessage = json.loads(self.redisMessaging.awaitMessage(key='log')[1]) - print(f"[Log] Queue: {logQueue}") print(f"[Log] Message: {logMessage}") - logSplit = logQueue.split('-') - logService = logSplit[1].lower() - logLevel = logSplit[2].upper() - logTimestamp = logSplit[3] - - logDict = json.loads(logMessage) - logFileMessage = logDict['message'] - + logFileMessage = logMessage['message'] + logService = logMessage.get('service').lower() + logLevel = logMessage.get('level').lower() + logTimestamp = logMessage.get('timestamp') if f"{logService}_logging_file" not in self.logFilePaths: continue diff --git a/services/metricService.py b/services/metricService.py index d75902d..12d51c1 100644 --- a/services/metricService.py +++ b/services/metricService.py @@ -34,13 +34,8 @@ def handleMetrics(self): actions = {'inc': 'inc', 'dec': 'dec', 'set':'set'} prometheusTypes = {'counter': Counter, 'gauge': Gauge, 'histogram': Histogram, 'summary': Summary} - metricQueue = self.redisMessaging.getNextQueue(pattern='metric-*') - metric = self.redisMessaging.getMessage(queue=metricQueue) + metric = self.redisMessaging.awaitMessage(key='metric')[1] - if not (len(metric) > 0): - time.sleep(0.001) - return - self.logTool.log(service='Metric', level='debug', message=f"[Metric] [handleMetrics] Received Metric: {metric}", redisClient=self.redisMessaging) prometheusJsonList = json.loads(metric)