From 42a48a36afcefeade7eed23ce5842f6042f72829 Mon Sep 17 00:00:00 2001 From: davidkneipp Date: Tue, 13 Feb 2024 15:20:28 +1000 Subject: [PATCH] Support for OCS notifications on CCR-I/T --- config.yaml | 6 ++++++ lib/diameter.py | 40 ++++++++++++++++++++++++++++++++++++++- services/georedService.py | 27 ++++++++++++++++++++------ 3 files changed, 66 insertions(+), 7 deletions(-) diff --git a/config.yaml b/config.yaml index 9d2e7fb..d2b4769 100644 --- a/config.yaml +++ b/config.yaml @@ -126,6 +126,12 @@ webhooks: endpoints: - 'http://127.0.0.1:8181' +### Notifications to OCS on Credit Control Requests +ocs: + enabled: False + endpoints: + - 'http://127.0.0.1:8282' + ## Geographic Redundancy Parameters geored: enabled: False diff --git a/lib/diameter.py b/lib/diameter.py index d3f459f..c418482 100755 --- a/lib/diameter.py +++ b/lib/diameter.py @@ -2021,7 +2021,6 @@ def Answer_16777238_272(self, packet_vars, avps): } self.database.Update_Emergency_Subscriber(subscriberIp=ueIp, subscriberData=emergencySubscriberData, imsi=imsi, gxSessionId=emergencySubscriberData.get('servingPgw')) - avp += self.generate_avp(268, 40, self.int_to_hex(2001, 4)) #Result Code (DIAMETER_SUCCESS (2001)) response = self.generate_diameter_packet("01", "40", 272, 16777238, packet_vars['hop-by-hop-identifier'], packet_vars['end-to-end-identifier'], avp) #Generate Diameter packet return response @@ -2110,6 +2109,26 @@ def Answer_16777238_272(self, packet_vars, avps): try: ue_ip = self.get_avp_data(avps, 8)[0] ue_ip = str(self.hex_to_ip(ue_ip)) + # Fire a notification to the webhook queue, for the OCS. + try: + ocsNotificationBody = { + 'notification_type': 'ocs', + 'timestamp': time.time_ns(), + 'operation': 'POST', + 'headers': {'Content-Type': 'application/json'}, + 'body': { + 'event': 'initiate', + 'subscriber': { + 'imsi': imsi, + 'ue_ip': ue_ip, + 'apn': apn, + 'pcrf_session_id': binascii.unhexlify(session_id).decode(), + } + } + } + self.redisMessaging.sendMessage(queue=f'webhook', message=json.dumps(ocsNotificationBody), queueExpiry=120, usePrefix=True, prefixHostname=self.hostname, prefixServiceName='webhook') + except Exception as e: + self.logTool.log(service='HSS', level='error', message=f"[diameter.py] [Answer_16777238_272] [CCA] Failed queueing OCS notification to redis: {traceback.format_exc()}", redisClient=self.redisMessaging) except Exception as E: self.logTool.log(service='HSS', level='error', message="[diameter.py] [Answer_16777238_272] [CCA] Failed to get UE IP", redisClient=self.redisMessaging) self.logTool.log(service='HSS', level='error', message=E, redisClient=self.redisMessaging) @@ -2119,6 +2138,7 @@ def Answer_16777238_272(self, packet_vars, avps): remote_peer = remote_peer + ";" + str(self.config['hss']['OriginHost']) self.database.Update_Serving_APN(imsi=imsi, apn=apn, pcrf_session_id=binascii.unhexlify(session_id).decode(), serving_pgw=OriginHost, subscriber_routing=str(ue_ip), serving_pgw_realm=OriginRealm, serving_pgw_peer=remote_peer) + #Supported-Features(628) (Gx feature list) avp += self.generate_vendor_avp(628, "80", 10415, "0000010a4000000c000028af0000027580000010000028af000000010000027680000010000028af0000000b") @@ -2199,6 +2219,24 @@ def Answer_16777238_272(self, packet_vars, avps): # CCR - Termination Request elif int(CC_Request_Type) == 3: self.logTool.log(service='HSS', level='debug', message="[diameter.py] [Answer_16777238_272] [CCA] Request type for CCA is 3 - Termination", redisClient=self.redisMessaging) + try: + ocsNotificationBody = { + 'notification_type': 'ocs', + 'timestamp': time.time_ns(), + 'operation': 'POST', + 'headers': {'Content-Type': 'application/json'}, + 'body': { + 'event': 'terminate', + 'subscriber': { + 'imsi': imsi, + 'apn': apn, + 'pcrf_session_id': binascii.unhexlify(session_id).decode() + } + } + } + self.redisMessaging.sendMessage(queue=f'webhook', message=json.dumps(ocsNotificationBody), queueExpiry=120, usePrefix=True, prefixHostname=self.hostname, prefixServiceName='webhook') + except Exception as e: + self.logTool.log(service='HSS', level='error', message=f"[diameter.py] [Answer_16777238_272] [CCA] Failed queueing OCS notification to redis: {traceback.format_exc()}", redisClient=self.redisMessaging) if 'ims' in apn: try: self.database.Update_Serving_CSCF(imsi=imsi, serving_cscf=None) diff --git a/services/georedService.py b/services/georedService.py index 7f84d44..6066735 100755 --- a/services/georedService.py +++ b/services/georedService.py @@ -2,6 +2,7 @@ import uuid, time import asyncio, aiohttp import socket +import traceback sys.path.append(os.path.realpath('../lib')) from messagingAsync import RedisMessagingAsync from banners import Banners @@ -32,6 +33,7 @@ def __init__(self, redisHost: str='127.0.0.1', redisPort: int=6379): self.georedPeers = self.config.get('geored', {}).get('endpoints', []) self.webhookPeers = self.config.get('webhooks', {}).get('endpoints', []) + self.ocsPeers = self.config.get('ocs', {}).get('endpoints', []) self.benchmarking = self.config.get('hss').get('enable_benchmarking', False) self.hostname = socket.gethostname() @@ -270,7 +272,7 @@ async def sendWebhook(self, asyncSession, url: str, operation: str, body: str, h prefixHostname=self.hostname, prefixServiceName='metric')) except Exception as e: - await(self.logTool.logAsync(service='Geored', level='error', message=f"[Geored] [sendWebhook] Operation {operation} encountered unknown error on {url}, with body: ({body}) and transactionId {transactionId}. Response code: {responseStatusCode}. Error Message: {e}")) + await(self.logTool.logAsync(service='Geored', level='error', message=f"[Geored] [sendWebhook] Operation {operation} encountered unknown error on {url}, with body: ({body}) and transactionId {transactionId}. Response code: {responseStatusCode}. Error Message: {traceback.format_exc()}")) asyncio.ensure_future(self.redisWebhookMessaging.sendMetric(serviceName='webhook', metricName='prom_http_webhook', metricType='counter', metricAction='inc', metricValue=1.0, metricHelp='Number of Webhook Pushes', @@ -361,19 +363,32 @@ async def handleWebhookQueue(self): await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Message: {webhookMessage}")) + webhookType = 'other' + + notificationType = webhookMessage.get('notification_type', None) + if notificationType: + if 'ocs' in notificationType.lower(): + webhookType = 'ocs' + webhookHeaders = webhookMessage['headers'] webhookOperation = webhookMessage['operation'] webhookBody = webhookMessage['body'] webhookTasks = [] - + socketSession = aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) async with socketSession as session: - for remotePeer in self.webhookPeers: - webhookTasks.append(self.sendWebhook(asyncSession=session, url=remotePeer, operation=webhookOperation, body=webhookBody, headers=webhookHeaders)) - await asyncio.gather(*webhookTasks) + if webhookType == 'ocs': + for remotePeer in self.ocsPeers: + await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Sending OCS Notification to: {remotePeer}")) + webhookTasks.append(self.sendWebhook(asyncSession=session, url=remotePeer, operation=webhookOperation, body=webhookBody, headers=webhookHeaders)) + await asyncio.gather(*webhookTasks) + else: + for remotePeer in self.webhookPeers: + await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Sending Notification to: {remotePeer}")) + webhookTasks.append(self.sendWebhook(asyncSession=session, url=remotePeer, operation=webhookOperation, body=webhookBody, headers=webhookHeaders)) + await asyncio.gather(*webhookTasks) if self.benchmarking: await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleWebhookQueue] Time taken to send webhook to all geored peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms")) - await(asyncio.sleep(0.001)) except Exception as e: