From 44268e2757735cd798d0ee6f103fac8b582cfa94 Mon Sep 17 00:00:00 2001 From: davidkneipp Date: Wed, 31 Jan 2024 16:31:56 +1000 Subject: [PATCH] Fix for duplicate diameter hosts not being removed --- lib/diameter.py | 296 +++++++++++++++++++----------------- services/diameterService.py | 31 +++- 2 files changed, 179 insertions(+), 148 deletions(-) diff --git a/lib/diameter.py b/lib/diameter.py index 766a16c..d3f459f 100755 --- a/lib/diameter.py +++ b/lib/diameter.py @@ -1917,159 +1917,168 @@ def Answer_16777238_272(self, packet_vars, avps): """ If Called-Station-ID contains 'sos', we're dealing with an emergency bearer request. - Authentication is bypassed and we'll return a basic QOS profile. + Local authentication is bypassed if the subscriber doesn't exist and we'll return a basic QOS profile. """ try: if apn.lower() == 'sos': - if int(CC_Request_Type) == 1: - """ - If we've recieved a CCR-Initial, create an emergency subscriber. - """ - # Use our defined SOS APN AMBR, if defined. - # Otherwise, use a default value of 128/128kbps. - try: - sosApn = (self.database.Get_APN_by_Name(apn="sos")) - AMBR = '' #Initiate empty var AVP for AMBR - apn_ambr_ul = int(sosApn['apn_ambr_ul']) - apn_ambr_dl = int(sosApn['apn_ambr_dl']) - AMBR += self.generate_vendor_avp(516, "c0", 10415, self.int_to_hex(apn_ambr_ul, 4)) #Max-Requested-Bandwidth-UL - AMBR += self.generate_vendor_avp(515, "c0", 10415, self.int_to_hex(apn_ambr_dl, 4)) #Max-Requested-Bandwidth-DL - APN_AMBR = self.generate_vendor_avp(1435, "c0", 10415, AMBR) - - AVP_Priority_Level = self.generate_vendor_avp(1046, "80", 10415, self.int_to_hex(int(sosApn['arp_priority']), 4)) - AVP_Preemption_Capability = self.generate_vendor_avp(1047, "80", 10415, self.int_to_hex(int(not sosApn['arp_preemption_capability']), 4)) - AVP_Preemption_Vulnerability = self.generate_vendor_avp(1048, "80", 10415, self.int_to_hex(int(not sosApn['arp_preemption_vulnerability']), 4)) - AVP_ARP = self.generate_vendor_avp(1034, "80", 10415, AVP_Priority_Level + AVP_Preemption_Capability + AVP_Preemption_Vulnerability) - AVP_QoS = self.generate_vendor_avp(1028, "c0", 10415, self.int_to_hex(int(sosApn['qci']), 4)) - avp += self.generate_vendor_avp(1049, "80", 10415, AVP_QoS + AVP_ARP) - - except Exception as e: - AMBR = '' #Initiate empty var AVP for AMBR - apn_ambr_ul = 128000 - apn_ambr_dl = 128000 - AMBR += self.generate_vendor_avp(516, "c0", 10415, self.int_to_hex(apn_ambr_ul, 4)) #Max-Requested-Bandwidth-UL - AMBR += self.generate_vendor_avp(515, "c0", 10415, self.int_to_hex(apn_ambr_dl, 4)) #Max-Requested-Bandwidth-DL - APN_AMBR = self.generate_vendor_avp(1435, "c0", 10415, AMBR) - - AVP_Priority_Level = self.generate_vendor_avp(1046, "80", 10415, self.int_to_hex(1, 4)) - AVP_Preemption_Capability = self.generate_vendor_avp(1047, "80", 10415, self.int_to_hex(0, 4)) # Pre-Emption Capability Enabled - AVP_Preemption_Vulnerability = self.generate_vendor_avp(1048, "80", 10415, self.int_to_hex(1, 4)) # Pre-Emption Vulnerability Disabled - AVP_ARP = self.generate_vendor_avp(1034, "80", 10415, AVP_Priority_Level + AVP_Preemption_Capability + AVP_Preemption_Vulnerability) - AVP_QoS = self.generate_vendor_avp(1028, "c0", 10415, self.int_to_hex(5, 4)) # QCI 5 - avp += self.generate_vendor_avp(1049, "80", 10415, AVP_QoS + AVP_ARP) - - QoS_Information = self.generate_vendor_avp(1041, "80", 10415, self.int_to_hex(apn_ambr_ul, 4)) - QoS_Information += self.generate_vendor_avp(1040, "80", 10415, self.int_to_hex(apn_ambr_dl, 4)) - avp += self.generate_vendor_avp(1016, "80", 10415, QoS_Information) # QOS-Information - - #Supported-Features(628) (Gx feature list) - avp += self.generate_vendor_avp(628, "80", 10415, "0000010a4000000c000028af0000027580000010000028af000000010000027680000010000028af0000000b") + localImsi = None + try: + for SubscriptionIdentifier in self.get_avp_data(avps, 443): + for UniqueSubscriptionIdentifier in SubscriptionIdentifier: + if UniqueSubscriptionIdentifier['avp_code'] == 444: + localImsi = binascii.unhexlify(UniqueSubscriptionIdentifier['misc_data']).decode('utf-8') + except: + pass + if not localImsi: + if int(CC_Request_Type) == 1: + """ + If we've recieved a CCR-Initial, create an emergency subscriber. + """ + # Use our defined SOS APN AMBR, if defined. + # Otherwise, use a default value of 128/128kbps. + try: + sosApn = (self.database.Get_APN_by_Name(apn="sos")) + AMBR = '' #Initiate empty var AVP for AMBR + apn_ambr_ul = int(sosApn['apn_ambr_ul']) + apn_ambr_dl = int(sosApn['apn_ambr_dl']) + AMBR += self.generate_vendor_avp(516, "c0", 10415, self.int_to_hex(apn_ambr_ul, 4)) #Max-Requested-Bandwidth-UL + AMBR += self.generate_vendor_avp(515, "c0", 10415, self.int_to_hex(apn_ambr_dl, 4)) #Max-Requested-Bandwidth-DL + APN_AMBR = self.generate_vendor_avp(1435, "c0", 10415, AMBR) + + AVP_Priority_Level = self.generate_vendor_avp(1046, "80", 10415, self.int_to_hex(int(sosApn['arp_priority']), 4)) + AVP_Preemption_Capability = self.generate_vendor_avp(1047, "80", 10415, self.int_to_hex(int(not sosApn['arp_preemption_capability']), 4)) + AVP_Preemption_Vulnerability = self.generate_vendor_avp(1048, "80", 10415, self.int_to_hex(int(not sosApn['arp_preemption_vulnerability']), 4)) + AVP_ARP = self.generate_vendor_avp(1034, "80", 10415, AVP_Priority_Level + AVP_Preemption_Capability + AVP_Preemption_Vulnerability) + AVP_QoS = self.generate_vendor_avp(1028, "c0", 10415, self.int_to_hex(int(sosApn['qci']), 4)) + avp += self.generate_vendor_avp(1049, "80", 10415, AVP_QoS + AVP_ARP) - """ - Store the Emergency Subscriber - """ - ueIp = self.get_avp_data(avps, 8)[0] - ueIp = str(self.hex_to_ip(ueIp)) - try: - #Get the IMSI - for SubscriptionIdentifier in self.get_avp_data(avps, 443): - for UniqueSubscriptionIdentifier in SubscriptionIdentifier: - if UniqueSubscriptionIdentifier['avp_code'] == 444: - imsi = binascii.unhexlify(UniqueSubscriptionIdentifier['misc_data']).decode('utf-8') - except Exception as e: - imsi="Unknown" + except Exception as e: + AMBR = '' #Initiate empty var AVP for AMBR + apn_ambr_ul = 128000 + apn_ambr_dl = 128000 + AMBR += self.generate_vendor_avp(516, "c0", 10415, self.int_to_hex(apn_ambr_ul, 4)) #Max-Requested-Bandwidth-UL + AMBR += self.generate_vendor_avp(515, "c0", 10415, self.int_to_hex(apn_ambr_dl, 4)) #Max-Requested-Bandwidth-DL + APN_AMBR = self.generate_vendor_avp(1435, "c0", 10415, AMBR) + + AVP_Priority_Level = self.generate_vendor_avp(1046, "80", 10415, self.int_to_hex(1, 4)) + AVP_Preemption_Capability = self.generate_vendor_avp(1047, "80", 10415, self.int_to_hex(0, 4)) # Pre-Emption Capability Enabled + AVP_Preemption_Vulnerability = self.generate_vendor_avp(1048, "80", 10415, self.int_to_hex(1, 4)) # Pre-Emption Vulnerability Disabled + AVP_ARP = self.generate_vendor_avp(1034, "80", 10415, AVP_Priority_Level + AVP_Preemption_Capability + AVP_Preemption_Vulnerability) + AVP_QoS = self.generate_vendor_avp(1028, "c0", 10415, self.int_to_hex(5, 4)) # QCI 5 + avp += self.generate_vendor_avp(1049, "80", 10415, AVP_QoS + AVP_ARP) - try: - ratType = self.get_avp_data(avps, 1032)[0] - ratType = int(ratType, 16) - except Exception as e: - ratType = None - - try: - accessNetworkGatewayAddress = self.get_avp_data(avps, 1050)[0] - accessNetworkGatewayAddress = str(self.hex_to_ip(accessNetworkGatewayAddress[4:])) - except Exception as e: - accessNetworkGatewayAddress = None - - try: - accessNetworkChargingAddress = self.get_avp_data(avps, 501)[0] - accessNetworkChargingAddress = str(self.hex_to_ip(accessNetworkChargingAddress[4:])) - except Exception as e: - accessNetworkChargingAddress = None - - emergencySubscriberData = { - "servingPgw": binascii.unhexlify(session_id).decode(), - "requestTime": int(time.time()), - "servingPcscf": None, - "aarRequestTime": None, - "gxOriginRealm": OriginRealm, - "gxOriginHost": OriginHost, - "imsi": imsi, - "ip": ueIp, - "ratType": ratType, - "accessNetworkGatewayAddress": accessNetworkGatewayAddress, - "accessNetworkChargingAddress": accessNetworkChargingAddress, - } + QoS_Information = self.generate_vendor_avp(1041, "80", 10415, self.int_to_hex(apn_ambr_ul, 4)) + QoS_Information += self.generate_vendor_avp(1040, "80", 10415, self.int_to_hex(apn_ambr_dl, 4)) + avp += self.generate_vendor_avp(1016, "80", 10415, QoS_Information) # QOS-Information - self.database.Update_Emergency_Subscriber(subscriberIp=ueIp, subscriberData=emergencySubscriberData, imsi=imsi, gxSessionId=emergencySubscriberData.get('servingPgw')) + #Supported-Features(628) (Gx feature list) + avp += self.generate_vendor_avp(628, "80", 10415, "0000010a4000000c000028af0000027580000010000028af000000010000027680000010000028af0000000b") - avp += self.generate_avp(268, 40, self.int_to_hex(2001, 4)) #Result Code (DIAMETER_SUCCESS (2001)) - response = self.generate_diameter_packet("01", "40", 272, 16777238, packet_vars['hop-by-hop-identifier'], packet_vars['end-to-end-identifier'], avp) #Generate Diameter packet - return response - - elif int(CC_Request_Type) == 3: - """ - If we've recieved a CCR-Terminate, delete the emergency subscriber. - """ - try: + """ + Store the Emergency Subscriber + """ ueIp = self.get_avp_data(avps, 8)[0] ueIp = str(self.hex_to_ip(ueIp)) - except Exception as e: - ueIp = None - try: - #Get the IMSI - for SubscriptionIdentifier in self.get_avp_data(avps, 443): - for UniqueSubscriptionIdentifier in SubscriptionIdentifier: - if UniqueSubscriptionIdentifier['avp_code'] == 444: - imsi = binascii.unhexlify(UniqueSubscriptionIdentifier['misc_data']).decode('utf-8') - except Exception as e: - imsi="Unknown" + try: + #Get the IMSI + for SubscriptionIdentifier in self.get_avp_data(avps, 443): + for UniqueSubscriptionIdentifier in SubscriptionIdentifier: + if UniqueSubscriptionIdentifier['avp_code'] == 444: + imsi = binascii.unhexlify(UniqueSubscriptionIdentifier['misc_data']).decode('utf-8') + except Exception as e: + imsi="Unknown" + + try: + ratType = self.get_avp_data(avps, 1032)[0] + ratType = int(ratType, 16) + except Exception as e: + ratType = None - try: - ratType = self.get_avp_data(avps, 1032)[0] - ratType = int(ratType, 16) - except Exception as e: - ratType = None + try: + accessNetworkGatewayAddress = self.get_avp_data(avps, 1050)[0] + accessNetworkGatewayAddress = str(self.hex_to_ip(accessNetworkGatewayAddress[4:])) + except Exception as e: + accessNetworkGatewayAddress = None - try: - accessNetworkGatewayAddress = self.get_avp_data(avps, 1050)[0] - accessNetworkGatewayAddress = str(self.hex_to_ip(accessNetworkGatewayAddress)) - except Exception as e: - accessNetworkGatewayAddress = None + try: + accessNetworkChargingAddress = self.get_avp_data(avps, 501)[0] + accessNetworkChargingAddress = str(self.hex_to_ip(accessNetworkChargingAddress[4:])) + except Exception as e: + accessNetworkChargingAddress = None + + emergencySubscriberData = { + "servingPgw": binascii.unhexlify(session_id).decode(), + "requestTime": int(time.time()), + "servingPcscf": None, + "aarRequestTime": None, + "gxOriginRealm": OriginRealm, + "gxOriginHost": OriginHost, + "imsi": imsi, + "ip": ueIp, + "ratType": ratType, + "accessNetworkGatewayAddress": accessNetworkGatewayAddress, + "accessNetworkChargingAddress": accessNetworkChargingAddress, + } - try: - accessNetworkChargingAddress = self.get_avp_data(avps, 501)[0] - accessNetworkChargingAddress = str(self.hex_to_ip(accessNetworkChargingAddress)) - except Exception as e: - accessNetworkChargingAddress = None + self.database.Update_Emergency_Subscriber(subscriberIp=ueIp, subscriberData=emergencySubscriberData, imsi=imsi, gxSessionId=emergencySubscriberData.get('servingPgw')) + + avp += self.generate_avp(268, 40, self.int_to_hex(2001, 4)) #Result Code (DIAMETER_SUCCESS (2001)) + response = self.generate_diameter_packet("01", "40", 272, 16777238, packet_vars['hop-by-hop-identifier'], packet_vars['end-to-end-identifier'], avp) #Generate Diameter packet + return response - emergencySubscriberData = { - "servingPgw": binascii.unhexlify(session_id).decode(), - "requestTime": int(time.time()), - "gxOriginRealm": OriginRealm, - "gxOriginHost": OriginHost, - "imsi": imsi, - "ip": ueIp, - "ratType": ratType, - "accessNetworkGatewayAddress": accessNetworkGatewayAddress, - "accessNetworkChargingAddress": accessNetworkChargingAddress, - } + elif int(CC_Request_Type) == 3: + """ + If we've recieved a CCR-Terminate, delete the emergency subscriber. + """ + try: + ueIp = self.get_avp_data(avps, 8)[0] + ueIp = str(self.hex_to_ip(ueIp)) + except Exception as e: + ueIp = None + try: + #Get the IMSI + for SubscriptionIdentifier in self.get_avp_data(avps, 443): + for UniqueSubscriptionIdentifier in SubscriptionIdentifier: + if UniqueSubscriptionIdentifier['avp_code'] == 444: + imsi = binascii.unhexlify(UniqueSubscriptionIdentifier['misc_data']).decode('utf-8') + except Exception as e: + imsi="Unknown" - self.database.Delete_Emergency_Subscriber(subscriberIp=ueIp, subscriberData=emergencySubscriberData, imsi=imsi, gxSessionId=binascii.unhexlify(session_id).decode()) + try: + ratType = self.get_avp_data(avps, 1032)[0] + ratType = int(ratType, 16) + except Exception as e: + ratType = None + + try: + accessNetworkGatewayAddress = self.get_avp_data(avps, 1050)[0] + accessNetworkGatewayAddress = str(self.hex_to_ip(accessNetworkGatewayAddress)) + except Exception as e: + accessNetworkGatewayAddress = None + + try: + accessNetworkChargingAddress = self.get_avp_data(avps, 501)[0] + accessNetworkChargingAddress = str(self.hex_to_ip(accessNetworkChargingAddress)) + except Exception as e: + accessNetworkChargingAddress = None + + emergencySubscriberData = { + "servingPgw": binascii.unhexlify(session_id).decode(), + "requestTime": int(time.time()), + "gxOriginRealm": OriginRealm, + "gxOriginHost": OriginHost, + "imsi": imsi, + "ip": ueIp, + "ratType": ratType, + "accessNetworkGatewayAddress": accessNetworkGatewayAddress, + "accessNetworkChargingAddress": accessNetworkChargingAddress, + } - avp += self.generate_avp(268, 40, self.int_to_hex(2001, 4)) #Result Code (DIAMETER_SUCCESS (2001)) - response = self.generate_diameter_packet("01", "40", 272, 16777238, packet_vars['hop-by-hop-identifier'], packet_vars['end-to-end-identifier'], avp) #Generate Diameter packet - return response + self.database.Delete_Emergency_Subscriber(subscriberIp=ueIp, subscriberData=emergencySubscriberData, imsi=imsi, gxSessionId=binascii.unhexlify(session_id).decode()) + + avp += self.generate_avp(268, 40, self.int_to_hex(2001, 4)) #Result Code (DIAMETER_SUCCESS (2001)) + response = self.generate_diameter_packet("01", "40", 272, 16777238, packet_vars['hop-by-hop-identifier'], packet_vars['end-to-end-identifier'], avp) #Generate Diameter packet + return response except Exception as e: self.logTool.log(service='HSS', level='error', message=f"[diameter.py] [Answer_16777238_272] [CCA] Error generating SOS CCA: {traceback.format_exc()}", redisClient=self.redisMessaging) @@ -2928,15 +2937,18 @@ def Answer_16777236_265(self, packet_vars, avps): servingPgw = emergencySubscriberData.get('serving_pgw', None).split(';')[0] else: subscriberId = subscriberDetails.get('subscriber_id', None) - apnId = (self.database.Get_APN_by_Name(apn="ims")).get('apn_id', None) + if serviceUrn: + if 'sos' in str(serviceUrn).lower(): + registeredEmergencySubscriber = True + apnId = (self.database.Get_APN_by_Name(apn="sos")).get('apn_id', None) + else: + apnId = (self.database.Get_APN_by_Name(apn="ims")).get('apn_id', None) servingApn = self.database.Get_Serving_APN(subscriber_id=subscriberId, apn_id=apnId) servingPgwPeer = servingApn.get('serving_pgw_peer', None).split(';')[0] servingPgw = servingApn.get('serving_pgw', None) servingPgwRealm = servingApn.get('serving_pgw_realm', None) pcrfSessionId = servingApn.get('pcrf_session_id', None) - if serviceUrn: - if 'sos' in str(serviceUrn).lower(): - registeredEmergencySubscriber = True + if not ueIp: ueIp = servingApn.get('subscriber_routing', None) diff --git a/services/diameterService.py b/services/diameterService.py index bf240b1..f886698 100755 --- a/services/diameterService.py +++ b/services/diameterService.py @@ -66,7 +66,7 @@ async def validateDiameterInbound(self, clientAddress: str, clientPort: str, inb async def handleActiveDiameterPeers(self): """ - Prunes stale entries from self.activePeers, and + Prunes stale and duplicate entries from self.activePeers, and keeps the ActiveDiameterPeers key in Redis current. """ while True: @@ -77,24 +77,43 @@ async def handleActiveDiameterPeers(self): activeDiameterPeersTimeout = self.config.get('hss', {}).get('active_diameter_peers_timeout', 3600) + activePeers = self.activePeers stalePeers = [] - - for key, connection in self.activePeers.items(): + diameterHosts = {} + + for key, connection in activePeers.items(): + peerHostname = connection.get('diameterHostname', None) + if peerHostname: + if peerHostname in diameterHosts: + diameterHosts[peerHostname].append(key) + else: + diameterHosts[peerHostname] = [key] + + for host in diameterHosts.values(): + if len(host) > 1: + host.sort(key=lambda x: datetime.strptime(activePeers[x]['connectTimestamp'], "%Y-%m-%d %H:%M:%S"), reverse=True) + await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [handleActiveDiameterPeers] Adding duplicate peers to stalePeers: {host[1:]}")) + stalePeers.extend(host[1:]) + + for key, connection in activePeers.items(): if connection.get('connectionStatus', '') == 'disconnected': if (datetime.now() - datetime.strptime(connection['disconnectTimestamp'], "%Y-%m-%d %H:%M:%S")).seconds > activeDiameterPeersTimeout: stalePeers.append(key) if len(stalePeers) > 0: await(self.logTool.logAsync(service='Diameter', level='debug', message=f"[Diameter] [handleActiveDiameterPeers] Pruning disconnected peers: {stalePeers}")) - for key in stalePeers: - del self.activePeers[key] + try: + for key in stalePeers: + del self.activePeers[key] + except Exception as e: + await(self.logTool.logAsync(service='Diameter', level='warning', message=f"[Diameter] [handleActiveDiameterPeers] Error removing stale peer: {traceback.format_exc()}")) await(self.logActivePeers()) await(self.redisPeerMessaging.setValue(key='ActiveDiameterPeers', value=json.dumps(self.activePeers), keyExpiry=86400, usePrefix=True, prefixHostname=self.hostname, prefixServiceName='diameter')) await(asyncio.sleep(1)) except Exception as e: - await(self.logTool.logAsync(service='Diameter', level='warning', message=f"[Diameter] [handleActiveDiameterPeers] Exception: {e}\n{traceback.format_exc()}")) + await(self.logTool.logAsync(service='Diameter', level='warning', message=f"[Diameter] [handleActiveDiameterPeers] Exception: {traceback.format_exc()}")) await(asyncio.sleep(1)) continue