Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkneipp committed Jul 3, 2024
2 parents b01350f + be4da87 commit f8826dc
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 3 deletions.
6 changes: 6 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ hss:
#The maximum time to wait, in seconds, before discarding a diameter request.
diameter_request_timeout: 3

# Whether to send a DWR to connected peers.
send_dwr: False

# How often to send a DWR to connected peers if enabled, in seconds.
send_dwr_interval: 5

#The amount of time, in seconds, before purging a disconnected client from the Active Diameter Peers key in redis.
active_diameter_peers_timeout: 10

Expand Down
100 changes: 98 additions & 2 deletions lib/diameterAsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
import math
import asyncio
import yaml
import uuid
import socket
import traceback
import binascii
from messagingAsync import RedisMessagingAsync


Expand All @@ -11,7 +14,7 @@ class DiameterAsync:
def __init__(self, logTool):
self.diameterCommandList = [
{"commandCode": 257, "applicationId": 0, "flags": 80, "responseMethod": self.Answer_257, "failureResultCode": 5012 ,"requestAcronym": "CER", "responseAcronym": "CEA", "requestName": "Capabilites Exchange Request", "responseName": "Capabilites Exchange Answer"},
{"commandCode": 280, "applicationId": 0, "flags": 80, "responseMethod": self.Answer_280, "failureResultCode": 5012 ,"requestAcronym": "DWR", "responseAcronym": "DWA", "requestName": "Device Watchdog Request", "responseName": "Device Watchdog Answer"},
{"commandCode": 280, "applicationId": 0, "flags": 80, "requestMethod": self.Request_280, "responseMethod": self.Answer_280, "failureResultCode": 5012 ,"requestAcronym": "DWR", "responseAcronym": "DWA", "requestName": "Device Watchdog Request", "responseName": "Device Watchdog Answer"},
{"commandCode": 282, "applicationId": 0, "flags": 80, "responseMethod": self.Answer_282, "failureResultCode": 5012 ,"requestAcronym": "DPR", "responseAcronym": "DPA", "requestName": "Disconnect Peer Request", "responseName": "Disconnect Peer Answer"},
{"commandCode": 300, "applicationId": 16777216, "responseMethod": self.Answer_16777216_300, "failureResultCode": 4100 ,"requestAcronym": "UAR", "responseAcronym": "UAA", "requestName": "User Authentication Request", "responseName": "User Authentication Answer"},
{"commandCode": 301, "applicationId": 16777216, "responseMethod": self.Answer_16777216_301, "failureResultCode": 4100 ,"requestAcronym": "SAR", "responseAcronym": "SAA", "requestName": "Server Assignment Request", "responseName": "Server Assignment Answer"},
Expand Down Expand Up @@ -53,6 +56,15 @@ async def myRound(self, n, base=4):
else:
return 4

#Converts string to hex
async def string_to_hex(self, string):
string_bytes = string.encode('utf-8')
return str(binascii.hexlify(string_bytes), 'ascii')

#Converts int to hex padded to required number of bytes
async def int_to_hex(self, input_int, output_bytes):
return format(input_int,"x").zfill(output_bytes*2)

async def roundUpToMultiple(self, n, multiple):
return ((n + multiple - 1) // multiple) * multiple

Expand Down Expand Up @@ -299,7 +311,91 @@ async def generateDiameterResponse(self, binaryData: str) -> str:
continue

return response


async def generateId(self, length):
length = length * 2
return str(uuid.uuid4().hex[:length])

#Generates an AVP with inputs provided (AVP Code, AVP Flags, AVP Content, Padding)
#AVP content must already be in HEX - This can be done with binascii.hexlify(avp_content.encode())
async def generate_avp(self, avp_code, avp_flags, avp_content):
avp_code = format(avp_code,"x").zfill(8)

avp_length = 1 ##This is a placeholder that's overwritten later

#AVP Must always be a multiple of 4 - Round up to nearest multiple of 4 and fill remaining bits with padding
avp = str(avp_code) + str(avp_flags) + str("000000") + str(avp_content)
avp_length = int(len(avp)/2)

if avp_length % 4 == 0: #Multiple of 4 - No Padding needed
avp_padding = ''
else: #Not multiple of 4 - Padding needed
rounded_value = await(self.myRound(avp_length))
avp_padding = format(0,"x").zfill(int( rounded_value - avp_length) * 2)

avp = str(avp_code) + str(avp_flags) + str(format(avp_length,"x").zfill(6)) + str(avp_content) + str(avp_padding)
return avp

#Generates an AVP with inputs provided (AVP Code, AVP Flags, AVP Content, Padding)
#AVP content must already be in HEX - This can be done with binascii.hexlify(avp_content.encode())
async def generate_vendor_avp(self, avp_code, avp_flags, avp_vendorid, avp_content):
avp_code = format(avp_code,"x").zfill(8)

avp_length = 1 ##This is a placeholder that gets overwritten later

avp_vendorid = format(int(avp_vendorid),"x").zfill(8)

#AVP Must always be a multiple of 4 - Round up to nearest multiple of 4 and fill remaining bits with padding
avp = str(avp_code) + str(avp_flags) + str("000000") + str(avp_vendorid) + str(avp_content)
avp_length = int(len(avp)/2)

if avp_length % 4 == 0: #Multiple of 4 - No Padding needed
avp_padding = ''
else: #Not multiple of 4 - Padding needed
rounded_value = await(self.myRound(avp_length))
# await(self.logTool.debug(message="Rounded value is " + str(rounded_value), redisClient=self.redisMessaging))
# await(self.logTool.debug(message="Has " + str( int( rounded_value - avp_length)) + " bytes of padding", redisClient=self.redisMessaging))
avp_padding = format(0,"x").zfill(int( rounded_value - avp_length) * 2)



avp = str(avp_code) + str(avp_flags) + str(format(avp_length,"x").zfill(6)) + str(avp_vendorid) + str(avp_content) + str(avp_padding)
return avp

async def generate_diameter_packet(self, packet_version, packet_flags, packet_command_code, packet_application_id, packet_hop_by_hop_id, packet_end_to_end_id, avp):
try:
packet_length = 228
packet_length = format(packet_length,"x").zfill(6)

packet_command_code = format(packet_command_code,"x").zfill(6)

packet_application_id = format(packet_application_id,"x").zfill(8)

packet_hex = packet_version + packet_length + packet_flags + packet_command_code + packet_application_id + packet_hop_by_hop_id + packet_end_to_end_id + avp
packet_length = int(round(len(packet_hex))/2)
packet_length = format(packet_length,"x").zfill(6)

packet_hex = packet_version + packet_length + packet_flags + packet_command_code + packet_application_id + packet_hop_by_hop_id + packet_end_to_end_id + avp
return packet_hex
except Exception as e:
await(self.logTool.error(message=f"Exception: {e}", redisClient=self.redisMessaging))

async def Request_280(self, originHost: str, originRealm: str, endToEndIdentifier: str=None):
"""
Builds a Device Watchdog Request.
"""
try:
if not endToEndIdentifier:
endToEndIdentifier = await(self.generateId(4))
avp = ''
avp += await(self.generate_avp(264, 40, await(self.string_to_hex(originHost)))) #Origin Host
avp += await(self.generate_avp(296, 40, await(self.string_to_hex(originRealm)))) #Origin Realm
response = await(self.generate_diameter_packet("01", "80", 280, 0, (await(self.generateId(4))), endToEndIdentifier, avp)) #Generate Diameter packet
return response
except Exception as e:
await(self.logTool.error(message=f"Error: {traceback.format_exc()}", redisClient=self.redisMessaging))
return None

async def Answer_257(self):
pass

Expand Down
35 changes: 34 additions & 1 deletion services/diameterService.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@ def __init__(self):
self.redisPeerMessaging = RedisMessagingAsync(host=self.redisHost, port=self.redisPort, useUnixSocket=self.redisUseUnixSocket, unixSocketPath=self.redisUnixSocketPath)
self.redisPeerLogMessaging = RedisMessagingAsync(host=self.redisHost, port=self.redisPort, useUnixSocket=self.redisUseUnixSocket, unixSocketPath=self.redisUnixSocketPath)
self.redisMetricMessaging = RedisMessagingAsync(host=self.redisHost, port=self.redisPort, useUnixSocket=self.redisUseUnixSocket, unixSocketPath=self.redisUnixSocketPath)
self.redisDwrMessaging = RedisMessagingAsync(host=self.redisHost, port=self.redisPort, useUnixSocket=self.redisUseUnixSocket, unixSocketPath=self.redisUnixSocketPath)
self.banners = Banners()
self.logTool = LogTool(config=self.config)
self.diameterLibrary = DiameterAsync(logTool=self.logTool)
self.activePeers = {}
self.enableOutboundDwr = self.config.get('hss', {}).get('send_dwr', False)
self.outboundDwrInterval = int(self.config.get('hss', {}).get('send_dwr_interval', 5))
self.originHost = self.config.get('hss', {}).get('OriginHost', 'hss01')
self.originRealm = self.config.get('hss', {}).get('OriginRealm', "epc.mnc001.mcc001.3gppnetwork.org")
self.diameterRequestTimeout = int(self.config.get('hss', {}).get('diameter_request_timeout', 10))
self.benchmarking = self.config.get('benchmarking', {}).get('enabled', False)
self.benchmarkingInterval = self.config.get('benchmarking', {}).get('reporting_interval', 3600)
Expand All @@ -64,6 +69,32 @@ async def validateDiameterInbound(self, clientAddress: str, clientPort: str, inb
await(self.logTool.logAsync(service='Diameter', level='warning', message=f"[Diameter] [validateDiameterInbound] AVPs: {avps}\nPacketVars: {packetVars}"))
return False

async def handleOutboundDwr(self) -> bool:
"""
Asynchronously sends an outbound DWR every outboundDwrInterval to each connected peer, if enabled.
"""
while True:
try:
outboundDwrEncoded = await(self.diameterLibrary.Request_280(originHost=self.originHost, originRealm=self.originRealm))
activePeersCached = self.activePeers
for activePeerKey, activePeerValue in activePeersCached.items():
connectionStatus = activePeerValue.get('connectionStatus', "disconnected")
peerIp = activePeerValue.get('ipAddress', None)
peerPort = activePeerValue.get('port', None)
if not peerIp or not peerPort or connectionStatus.lower() != "connected":
continue

outboundQueue = f"diameter-outbound-{peerIp}-{peerPort}"
outboundMessage = json.dumps({"diameter-outbound": outboundDwrEncoded, "inbound-received-timestamp": time.time()})
await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [handleOutboundDwr] Sending Outbound DWR to: {outboundQueue}"))
await(self.redisDwrMessaging.sendMessage(queue=outboundQueue, message=outboundMessage, queueExpiry=60, usePrefix=True, prefixHostname=self.hostname, prefixServiceName='diameter'))
await(asyncio.sleep(self.outboundDwrInterval))
continue
except Exception as e:
await(self.logTool.logAsync(service='Diameter', level='warning', message=f"[Diameter] [handleOutboundDwr] Exception: {e}\n{traceback.format_exc()}"))
await(asyncio.sleep(self.outboundDwrInterval))
continue

async def handleActiveDiameterPeers(self):
"""
Prunes stale and duplicate entries from self.activePeers, and
Expand Down Expand Up @@ -381,6 +412,8 @@ async def startServer(self, host: str=None, port: int=None, type: str=None):
servingAddresses = ', '.join(str(sock.getsockname()) for sock in server.sockets)
await(self.logTool.logAsync(service='Diameter', level='info', message=f"{self.banners.diameterService()}\n[Diameter] Serving on {servingAddresses}"))
handleActiveDiameterPeerTask = asyncio.create_task(self.handleActiveDiameterPeers())
if self.enableOutboundDwr:
handleOutboundDwrTask = asyncio.create_task(self.handleOutboundDwr())
if self.benchmarking:
logProcessedMessagesTask = asyncio.create_task(self.logProcessedMessages())

Expand All @@ -390,4 +423,4 @@ async def startServer(self, host: str=None, port: int=None, type: str=None):

if __name__ == '__main__':
diameterService = DiameterService()
asyncio.run(diameterService.startServer())
asyncio.run(diameterService.startServer())

0 comments on commit f8826dc

Please sign in to comment.