Skip to content

Commit

Permalink
Support for OCS notifications on CCR-I/T
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkneipp committed Feb 13, 2024
1 parent 816d2eb commit 42a48a3
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 7 deletions.
6 changes: 6 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ webhooks:
endpoints:
- 'http://127.0.0.1:8181'

### Notifications to OCS on Credit Control Requests
ocs:
enabled: False
endpoints:
- 'http://127.0.0.1:8282'

## Geographic Redundancy Parameters
geored:
enabled: False
Expand Down
40 changes: 39 additions & 1 deletion lib/diameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2021,7 +2021,6 @@ def Answer_16777238_272(self, packet_vars, avps):
}

self.database.Update_Emergency_Subscriber(subscriberIp=ueIp, subscriberData=emergencySubscriberData, imsi=imsi, gxSessionId=emergencySubscriberData.get('servingPgw'))

avp += self.generate_avp(268, 40, self.int_to_hex(2001, 4)) #Result Code (DIAMETER_SUCCESS (2001))
response = self.generate_diameter_packet("01", "40", 272, 16777238, packet_vars['hop-by-hop-identifier'], packet_vars['end-to-end-identifier'], avp) #Generate Diameter packet
return response
Expand Down Expand Up @@ -2110,6 +2109,26 @@ def Answer_16777238_272(self, packet_vars, avps):
try:
ue_ip = self.get_avp_data(avps, 8)[0]
ue_ip = str(self.hex_to_ip(ue_ip))
# Fire a notification to the webhook queue, for the OCS.
try:
ocsNotificationBody = {
'notification_type': 'ocs',
'timestamp': time.time_ns(),
'operation': 'POST',
'headers': {'Content-Type': 'application/json'},
'body': {
'event': 'initiate',
'subscriber': {
'imsi': imsi,
'ue_ip': ue_ip,
'apn': apn,
'pcrf_session_id': binascii.unhexlify(session_id).decode(),
}
}
}
self.redisMessaging.sendMessage(queue=f'webhook', message=json.dumps(ocsNotificationBody), queueExpiry=120, usePrefix=True, prefixHostname=self.hostname, prefixServiceName='webhook')
except Exception as e:
self.logTool.log(service='HSS', level='error', message=f"[diameter.py] [Answer_16777238_272] [CCA] Failed queueing OCS notification to redis: {traceback.format_exc()}", redisClient=self.redisMessaging)
except Exception as E:
self.logTool.log(service='HSS', level='error', message="[diameter.py] [Answer_16777238_272] [CCA] Failed to get UE IP", redisClient=self.redisMessaging)
self.logTool.log(service='HSS', level='error', message=E, redisClient=self.redisMessaging)
Expand All @@ -2119,6 +2138,7 @@ def Answer_16777238_272(self, packet_vars, avps):
remote_peer = remote_peer + ";" + str(self.config['hss']['OriginHost'])
self.database.Update_Serving_APN(imsi=imsi, apn=apn, pcrf_session_id=binascii.unhexlify(session_id).decode(), serving_pgw=OriginHost, subscriber_routing=str(ue_ip), serving_pgw_realm=OriginRealm, serving_pgw_peer=remote_peer)


#Supported-Features(628) (Gx feature list)
avp += self.generate_vendor_avp(628, "80", 10415, "0000010a4000000c000028af0000027580000010000028af000000010000027680000010000028af0000000b")

Expand Down Expand Up @@ -2199,6 +2219,24 @@ def Answer_16777238_272(self, packet_vars, avps):
# CCR - Termination Request
elif int(CC_Request_Type) == 3:
self.logTool.log(service='HSS', level='debug', message="[diameter.py] [Answer_16777238_272] [CCA] Request type for CCA is 3 - Termination", redisClient=self.redisMessaging)
try:
ocsNotificationBody = {
'notification_type': 'ocs',
'timestamp': time.time_ns(),
'operation': 'POST',
'headers': {'Content-Type': 'application/json'},
'body': {
'event': 'terminate',
'subscriber': {
'imsi': imsi,
'apn': apn,
'pcrf_session_id': binascii.unhexlify(session_id).decode()
}
}
}
self.redisMessaging.sendMessage(queue=f'webhook', message=json.dumps(ocsNotificationBody), queueExpiry=120, usePrefix=True, prefixHostname=self.hostname, prefixServiceName='webhook')
except Exception as e:
self.logTool.log(service='HSS', level='error', message=f"[diameter.py] [Answer_16777238_272] [CCA] Failed queueing OCS notification to redis: {traceback.format_exc()}", redisClient=self.redisMessaging)
if 'ims' in apn:
try:
self.database.Update_Serving_CSCF(imsi=imsi, serving_cscf=None)
Expand Down
27 changes: 21 additions & 6 deletions services/georedService.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import uuid, time
import asyncio, aiohttp
import socket
import traceback
sys.path.append(os.path.realpath('../lib'))
from messagingAsync import RedisMessagingAsync
from banners import Banners
Expand Down Expand Up @@ -32,6 +33,7 @@ def __init__(self, redisHost: str='127.0.0.1', redisPort: int=6379):

self.georedPeers = self.config.get('geored', {}).get('endpoints', [])
self.webhookPeers = self.config.get('webhooks', {}).get('endpoints', [])
self.ocsPeers = self.config.get('ocs', {}).get('endpoints', [])
self.benchmarking = self.config.get('hss').get('enable_benchmarking', False)
self.hostname = socket.gethostname()

Expand Down Expand Up @@ -270,7 +272,7 @@ async def sendWebhook(self, asyncSession, url: str, operation: str, body: str, h
prefixHostname=self.hostname,
prefixServiceName='metric'))
except Exception as e:
await(self.logTool.logAsync(service='Geored', level='error', message=f"[Geored] [sendWebhook] Operation {operation} encountered unknown error on {url}, with body: ({body}) and transactionId {transactionId}. Response code: {responseStatusCode}. Error Message: {e}"))
await(self.logTool.logAsync(service='Geored', level='error', message=f"[Geored] [sendWebhook] Operation {operation} encountered unknown error on {url}, with body: ({body}) and transactionId {transactionId}. Response code: {responseStatusCode}. Error Message: {traceback.format_exc()}"))
asyncio.ensure_future(self.redisWebhookMessaging.sendMetric(serviceName='webhook', metricName='prom_http_webhook',
metricType='counter', metricAction='inc',
metricValue=1.0, metricHelp='Number of Webhook Pushes',
Expand Down Expand Up @@ -361,19 +363,32 @@ async def handleWebhookQueue(self):

await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Message: {webhookMessage}"))

webhookType = 'other'

notificationType = webhookMessage.get('notification_type', None)
if notificationType:
if 'ocs' in notificationType.lower():
webhookType = 'ocs'

webhookHeaders = webhookMessage['headers']
webhookOperation = webhookMessage['operation']
webhookBody = webhookMessage['body']
webhookTasks = []

socketSession = aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False))
async with socketSession as session:
for remotePeer in self.webhookPeers:
webhookTasks.append(self.sendWebhook(asyncSession=session, url=remotePeer, operation=webhookOperation, body=webhookBody, headers=webhookHeaders))
await asyncio.gather(*webhookTasks)
if webhookType == 'ocs':
for remotePeer in self.ocsPeers:
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Sending OCS Notification to: {remotePeer}"))
webhookTasks.append(self.sendWebhook(asyncSession=session, url=remotePeer, operation=webhookOperation, body=webhookBody, headers=webhookHeaders))
await asyncio.gather(*webhookTasks)
else:
for remotePeer in self.webhookPeers:
await(self.logTool.logAsync(service='Geored', level='debug', message=f"[Geored] [handleWebhookQueue] Sending Notification to: {remotePeer}"))
webhookTasks.append(self.sendWebhook(asyncSession=session, url=remotePeer, operation=webhookOperation, body=webhookBody, headers=webhookHeaders))
await asyncio.gather(*webhookTasks)
if self.benchmarking:
await(self.logTool.logAsync(service='Geored', level='info', message=f"[Geored] [handleWebhookQueue] Time taken to send webhook to all geored peers: {round(((time.perf_counter() - startTime)*1000), 3)} ms"))

await(asyncio.sleep(0.001))

except Exception as e:
Expand Down

0 comments on commit 42a48a3

Please sign in to comment.