From f3b2dc7720e050782420836b9472cd4ebe07bfcf Mon Sep 17 00:00:00 2001 From: Martin Kolman Date: Tue, 5 Nov 2024 17:08:48 +0100 Subject: [PATCH] Subscription code updates - adjust unit tests Adjust to Satellite support being added & entitlement support being dropped. There were also some changes in how the registration tasks are structured internally & unit tests need to be adjusted for that as well. Resolves: RHEL-49661 Related: INSTALLER-3903 Related: INSTALLER-3903 --- .../modules/subscription/test_subscription.py | 393 +++-- .../subscription/test_subscription_tasks.py | 1296 +++++++++++++++-- .../test_subscription_helpers.py | 331 +---- 3 files changed, 1362 insertions(+), 658 deletions(-) diff --git a/tests/unit_tests/pyanaconda_tests/modules/subscription/test_subscription.py b/tests/unit_tests/pyanaconda_tests/modules/subscription/test_subscription.py index e7a8e6107b8..f7f1c4d62c9 100644 --- a/tests/unit_tests/pyanaconda_tests/modules/subscription/test_subscription.py +++ b/tests/unit_tests/pyanaconda_tests/modules/subscription/test_subscription.py @@ -29,16 +29,15 @@ from pyanaconda.modules.common.constants.services import SUBSCRIPTION from pyanaconda.modules.common.constants.objects import RHSM_CONFIG from pyanaconda.modules.common.structures.subscription import SystemPurposeData, \ - SubscriptionRequest, AttachedSubscription + SubscriptionRequest from pyanaconda.modules.subscription.subscription import SubscriptionService from pyanaconda.modules.subscription.subscription_interface import SubscriptionInterface from pyanaconda.modules.subscription.installation import ConnectToInsightsTask, \ - RestoreRHSMDefaultsTask, TransferSubscriptionTokensTask + RestoreRHSMDefaultsTask, TransferSubscriptionTokensTask, ProvisionTargetSystemForSatelliteTask from pyanaconda.modules.subscription.runtime import SetRHSMConfigurationTask, \ - RegisterWithUsernamePasswordTask, RegisterWithOrganizationKeyTask, \ - UnregisterTask, AttachSubscriptionTask, SystemPurposeConfigurationTask, \ - ParseAttachedSubscriptionsTask, SystemSubscriptionData + RegisterAndSubscribeTask, UnregisterTask, SystemPurposeConfigurationTask, \ + RetrieveOrganizationsTask from tests.unit_tests.pyanaconda_tests import check_kickstart_interface, check_dbus_property, \ PropertiesChangedCallback, patch_dbus_publish_object, check_task_creation_list, \ @@ -198,6 +197,7 @@ def test_subscription_request_data_defaults(self): "type": DEFAULT_SUBSCRIPTION_REQUEST_TYPE, "organization": "", "account-username": "", + "account-organization": "", "server-hostname": "", "rhsm-baseurl": "", "server-proxy-hostname": "", @@ -220,6 +220,7 @@ def test_subscription_request_data_full(self): full_request.type = SUBSCRIPTION_REQUEST_TYPE_ORG_KEY full_request.organization = "123456789" full_request.account_username = "foo_user" + full_request.account_organization = "foo_account_org" full_request.server_hostname = "candlepin.foo.com" full_request.rhsm_baseurl = "cdn.foo.com" full_request.server_proxy_hostname = "proxy.foo.com" @@ -233,6 +234,7 @@ def test_subscription_request_data_full(self): "type": SUBSCRIPTION_REQUEST_TYPE_ORG_KEY, "organization": "123456789", "account-username": "foo_user", + "account-organization": "foo_account_org", "server-hostname": "candlepin.foo.com", "rhsm-baseurl": "cdn.foo.com", "server-proxy-hostname": "proxy.foo.com", @@ -259,6 +261,7 @@ def test_subscription_request_data_full(self): "type": SUBSCRIPTION_REQUEST_TYPE_ORG_KEY, "organization": "123456789", "account-username": "foo_user", + "account-organization": "foo_account_org", "server-hostname": "candlepin.foo.com", "rhsm-baseurl": "cdn.foo.com", "server-proxy-hostname": "proxy.foo.com", @@ -286,6 +289,7 @@ def test_set_subscription_request_password(self): "type": get_variant(Str, SUBSCRIPTION_REQUEST_TYPE_USERNAME_PASSWORD), "organization": get_variant(Str, ""), "account-username": get_variant(Str, "foo_user"), + "account-organization": get_variant(Str, ""), "server-hostname": get_variant(Str, ""), "rhsm-baseurl": get_variant(Str, ""), "server-proxy-hostname": get_variant(Str, ""), @@ -326,6 +330,7 @@ def test_set_subscription_request_activation_key(self): "type": get_variant(Str, SUBSCRIPTION_REQUEST_TYPE_ORG_KEY), "organization": get_variant(Str, "123456789"), "account-username": get_variant(Str, ""), + "account-organization": get_variant(Str, ""), "server-hostname": get_variant(Str, ""), "rhsm-baseurl": get_variant(Str, ""), "server-proxy-hostname": get_variant(Str, ""), @@ -367,6 +372,7 @@ def test_set_subscription_request_proxy(self): "type": get_variant(Str, SUBSCRIPTION_REQUEST_TYPE_USERNAME_PASSWORD), "organization": get_variant(Str, ""), "account-username": get_variant(Str, ""), + "account-organization": get_variant(Str, ""), "server-hostname": get_variant(Str, ""), "rhsm-baseurl": get_variant(Str, ""), "server-proxy-hostname": get_variant(Str, "proxy.foo.bar"), @@ -402,6 +408,7 @@ def test_set_subscription_request_custom_urls(self): "type": get_variant(Str, SUBSCRIPTION_REQUEST_TYPE_USERNAME_PASSWORD), "organization": get_variant(Str, ""), "account-username": get_variant(Str, ""), + "account-organization": get_variant(Str, ""), "server-hostname": get_variant(Str, "candlepin.foo.bar"), "rhsm-baseurl": get_variant(Str, "cdn.foo.bar"), "server-proxy-hostname": get_variant(Str, ""), @@ -450,6 +457,7 @@ def test_set_subscription_request_sensitive_data_wipe(self): "type": get_variant(Str, SUBSCRIPTION_REQUEST_TYPE_USERNAME_PASSWORD), "organization": get_variant(Str, ""), "account-username": get_variant(Str, "foo_user"), + "account-organization": get_variant(Str, ""), "server-hostname": get_variant(Str, ""), "rhsm-baseurl": get_variant(Str, ""), "server-proxy-hostname": get_variant(Str, ""), @@ -499,6 +507,7 @@ def test_set_subscription_request_sensitive_data_wipe(self): "type": get_variant(Str, SUBSCRIPTION_REQUEST_TYPE_USERNAME_PASSWORD), "organization": get_variant(Str, ""), "account-username": get_variant(Str, "foo_user"), + "account-organization": get_variant(Str, ""), "server-hostname": get_variant(Str, ""), "rhsm-baseurl": get_variant(Str, ""), "server-proxy-hostname": get_variant(Str, ""), @@ -548,6 +557,7 @@ def test_set_subscription_request_sensitive_data_keep(self): "type": get_variant(Str, SUBSCRIPTION_REQUEST_TYPE_USERNAME_PASSWORD), "organization": get_variant(Str, ""), "account-username": get_variant(Str, "foo_user"), + "account-organization": get_variant(Str, ""), "server-hostname": get_variant(Str, ""), "rhsm-baseurl": get_variant(Str, ""), "server-proxy-hostname": get_variant(Str, ""), @@ -588,6 +598,7 @@ def test_set_subscription_request_sensitive_data_keep(self): subscription_request = { "type": get_variant(Str, SUBSCRIPTION_REQUEST_TYPE_USERNAME_PASSWORD), "account-username": get_variant(Str, "foo_user"), + "account-organization": get_variant(Str, ""), "account-password": get_variant(Structure, {"type": get_variant(Str, "HIDDEN"), @@ -605,6 +616,7 @@ def test_set_subscription_request_sensitive_data_keep(self): "type": get_variant(Str, SUBSCRIPTION_REQUEST_TYPE_USERNAME_PASSWORD), "organization": get_variant(Str, ""), "account-username": get_variant(Str, "foo_user"), + "account-organization": get_variant(Str, ""), "server-hostname": get_variant(Str, ""), "rhsm-baseurl": get_variant(Str, ""), "server-proxy-hostname": get_variant(Str, ""), @@ -640,53 +652,6 @@ def test_set_subscription_request_sensitive_data_keep(self): assert internal_request.server_proxy_password.value == \ "foo_proxy_password" - def test_attached_subscription_defaults(self): - """Test the AttachedSubscription DBus structure defaults.""" - - # create empty AttachedSubscription structure - empty_request = AttachedSubscription() - - # compare with expected default values - expected_default_dict = { - "name": get_variant(Str, ""), - "service-level": get_variant(Str, ""), - "sku": get_variant(Str, ""), - "contract": get_variant(Str, ""), - "start-date": get_variant(Str, ""), - "end-date": get_variant(Str, ""), - "consumed-entitlement-count": get_variant(Int, 1) - } - # compare the empty structure with expected default values - assert AttachedSubscription.to_structure(empty_request) == \ - expected_default_dict - - def test_attached_subscription_full(self): - """Test the AttachedSubscription DBus structure that is fully populated.""" - - # create empty AttachedSubscription structure - full_request = AttachedSubscription() - full_request.name = "Foo Bar Beta" - full_request.service_level = "really good" - full_request.sku = "ABCD1234" - full_request.contract = "87654321" - full_request.start_date = "Jan 01, 1970" - full_request.end_date = "Jan 19, 2038" - full_request.consumed_entitlement_count = 9001 - - # compare with expected values - expected_default_dict = { - "name": get_variant(Str, "Foo Bar Beta"), - "service-level": get_variant(Str, "really good"), - "sku": get_variant(Str, "ABCD1234"), - "contract": get_variant(Str, "87654321"), - "start-date": get_variant(Str, "Jan 01, 1970"), - "end-date": get_variant(Str, "Jan 19, 2038"), - "consumed-entitlement-count": get_variant(Int, 9001) - } - # compare the full structure with expected values - assert AttachedSubscription.to_structure(full_request) == \ - expected_default_dict - def test_insights_property(self): """Test the InsightsEnabled property.""" # should be False by default @@ -725,72 +690,51 @@ def custom_setter(value): # at the end the property should be True assert self.subscription_interface.IsRegistered - def test_subscription_attached_property(self): - """Test the IsSubscriptionAttached property.""" + def test_simple_content_access_property(self): + """Test the IsSimpleContentAccessEnabled property.""" # should be false by default - assert not self.subscription_interface.IsSubscriptionAttached + assert not self.subscription_interface.IsSimpleContentAccessEnabled # this property can't be set by client as it is set as the result of # subscription attempts, so we need to call the internal module interface # via a custom setter def custom_setter(value): - self.subscription_module.set_subscription_attached(value) + self.subscription_module.set_simple_content_access_enabled(value) # check the property is True and the signal was emitted # - we use fake setter as there is no public setter self._check_dbus_property( - "IsSubscriptionAttached", + "IsSimpleContentAccessEnabled", True, setter=custom_setter ) # at the end the property should be True - assert self.subscription_interface.IsSubscriptionAttached + assert self.subscription_interface.IsSimpleContentAccessEnabled + + def test_subscription_attached_property(self): + """Test the IsSubscriptionAttached property.""" + # should be false by default + assert not self.subscription_interface.IsSubscriptionAttached - def test_attached_subscriptions_property(self): - """Test the AttachedSubscriptions property.""" - # should return an empty list by default - assert self.subscription_interface.AttachedSubscriptions == [] # this property can't be set by client as it is set as the result of # subscription attempts, so we need to call the internal module interface # via a custom setter - def custom_setter(struct_list): - instance_list = AttachedSubscription.from_structure_list(struct_list) - self.subscription_module.set_attached_subscriptions(instance_list) - - # prepare some testing data - subscription_structs = [ - { - "name": get_variant(Str, "Foo Bar Beta"), - "service-level": get_variant(Str, "very good"), - "sku": get_variant(Str, "ABC1234"), - "contract": get_variant(Str, "12345678"), - "start-date": get_variant(Str, "May 12, 2020"), - "end-date": get_variant(Str, "May 12, 2021"), - "consumed-entitlement-count": get_variant(Int, 1) - }, - { - "name": get_variant(Str, "Foo Bar Beta NG"), - "service-level": get_variant(Str, "even better"), - "sku": get_variant(Str, "ABC4321"), - "contract": get_variant(Str, "87654321"), - "start-date": get_variant(Str, "now"), - "end-date": get_variant(Str, "never"), - "consumed-entitlement-count": get_variant(Int, 1000) - } - ] + def custom_setter(value): + self.subscription_module.set_subscription_attached(value) + # check the property is True and the signal was emitted # - we use fake setter as there is no public setter self._check_dbus_property( - "AttachedSubscriptions", - subscription_structs, + "IsSubscriptionAttached", + True, setter=custom_setter ) - # at the end the property should return the expected list - # of AttachedSubscription structures - assert self.subscription_interface.AttachedSubscriptions == subscription_structs + + # at the end the property should be True + assert self.subscription_interface.IsSubscriptionAttached @patch_dbus_publish_object def test_set_system_purpose_with_task(self, publisher): @@ -979,6 +923,7 @@ def test_set_rhsm_config_with_task(self, publisher): full_request.type = SUBSCRIPTION_REQUEST_TYPE_ORG_KEY full_request.organization = "123456789" full_request.account_username = "foo_user" + full_request.account_organization = "foo_account_organization" full_request.server_hostname = "candlepin.foo.com" full_request.rhsm_baseurl = "cdn.foo.com" full_request.server_proxy_hostname = "proxy.foo.com" @@ -1010,6 +955,7 @@ def test_set_rhsm_config_with_task(self, publisher): "type": SUBSCRIPTION_REQUEST_TYPE_ORG_KEY, "organization": "123456789", "account-username": "foo_user", + "account-organization": "foo_account_organization", "server-hostname": "candlepin.foo.com", "rhsm-baseurl": "cdn.foo.com", "server-proxy-hostname": "proxy.foo.com", @@ -1023,43 +969,41 @@ def test_set_rhsm_config_with_task(self, publisher): assert obj.implementation._rhsm_config_defaults == flat_default_config @patch_dbus_publish_object - def test_register_with_username_password(self, publisher): - """Test RegisterWithUsernamePasswordTask creation.""" - # prepare the module with dummy data - full_request = SubscriptionRequest() - full_request.type = SUBSCRIPTION_REQUEST_TYPE_ORG_KEY - full_request.organization = "123456789" - full_request.account_username = "foo_user" - full_request.server_hostname = "candlepin.foo.com" - full_request.rhsm_baseurl = "cdn.foo.com" - full_request.server_proxy_hostname = "proxy.foo.com" - full_request.server_proxy_port = 9001 - full_request.server_proxy_user = "foo_proxy_user" - full_request.account_password.set_secret("foo_password") - full_request.activation_keys.set_secret(["key1", "key2", "key3"]) - full_request.server_proxy_password.set_secret("foo_proxy_password") - - self.subscription_interface.SubscriptionRequest = \ - SubscriptionRequest.to_structure(full_request) - - # make sure the task gets dummy rhsm register server proxy - observer = Mock() - observer.get_proxy = Mock() - self.subscription_module._rhsm_observer = observer - register_server_proxy = Mock() - observer.get_proxy.return_value = register_server_proxy - + def test_register_and_subscribe(self, publisher): + """Test RegisterAndSubscribeTask creation - org + key.""" + # prepare dummy objects for the task + rhsm_observer = Mock() + self.subscription_module._rhsm_observer = rhsm_observer + subscription_request = Mock() + self.subscription_module._subscription_request = subscription_request + system_purpose_data = Mock() + self.subscription_module._system_purpose_data = system_purpose_data # check the task is created correctly - task_path = self.subscription_interface.RegisterUsernamePasswordWithTask() - obj = check_task_creation(task_path, publisher, RegisterWithUsernamePasswordTask) - # check all the data got propagated to the module correctly - assert obj.implementation._rhsm_register_server_proxy == register_server_proxy - assert obj.implementation._username == "foo_user" - assert obj.implementation._password == "foo_password" - # trigger the succeeded signal - obj.implementation.succeeded_signal.emit() - # check this set the registered property to True - assert self.subscription_interface.IsRegistered + task_path = self.subscription_interface.RegisterAndSubscribeWithTask() + obj = check_task_creation(task_path, publisher, RegisterAndSubscribeTask) + # check all the data got propagated to the task correctly + assert obj.implementation._rhsm_observer == rhsm_observer + assert obj.implementation._subscription_request == subscription_request + assert obj.implementation._system_purpose_data == system_purpose_data + # pylint: disable=comparison-with-callable + assert obj.implementation._registered_callback == self.subscription_module.set_registered + # pylint: disable=comparison-with-callable + assert obj.implementation._registered_to_satellite_callback == \ + self.subscription_module.set_registered_to_satellite + assert obj.implementation._simple_content_access_callback == \ + self.subscription_module.set_simple_content_access_enabled + # pylint: disable=comparison-with-callable + assert obj.implementation._subscription_attached_callback == \ + self.subscription_module.set_subscription_attached + # pylint: disable=comparison-with-callable + assert obj.implementation._subscription_data_callback == \ + self.subscription_module._set_system_subscription_data + # pylint: disable=comparison-with-callable + assert obj.implementation._satellite_script_downloaded_callback == \ + self.subscription_module._set_satellite_provisioning_script + # pylint: disable=comparison-with-callable + assert obj.implementation._config_backup_callback == \ + self.subscription_module._set_pre_satellite_rhsm_conf_snapshot @patch_dbus_publish_object def test_register_with_organization_key(self, publisher): @@ -1089,122 +1033,68 @@ def test_register_with_organization_key(self, publisher): observer.get_proxy.return_value = register_server_proxy # check the task is created correctly - task_path = self.subscription_interface.RegisterOrganizationKeyWithTask() - obj = check_task_creation(task_path, publisher, RegisterWithOrganizationKeyTask) + task_path = self.subscription_interface.RegisterAndSubscribeWithTask() + obj = check_task_creation(task_path, publisher, RegisterAndSubscribeTask) # check all the data got propagated to the module correctly - assert obj.implementation._rhsm_register_server_proxy == register_server_proxy - assert obj.implementation._organization == "123456789" - assert obj.implementation._activation_keys == ["key1", "key2", "key3"] - # trigger the succeeded signal - obj.implementation.succeeded_signal.emit() - # check this set the registered property to True - assert self.subscription_interface.IsRegistered + assert obj.implementation._rhsm_observer == observer + assert SubscriptionRequest.to_structure(obj.implementation._subscription_request) == \ + SubscriptionRequest.to_structure(full_request) @patch_dbus_publish_object def test_unregister(self, publisher): """Test UnregisterTask creation.""" # simulate system being subscribed self.subscription_module.set_subscription_attached(True) - # make sure the task gets dummy rhsm unregister proxy - observer = Mock() - self.subscription_module._rhsm_observer = observer - rhsm_unregister_proxy = observer.get_proxy.return_value + # make sure the task gets dummy rhsm observer + rhsm_observer = Mock() + self.subscription_module._rhsm_observer = rhsm_observer # check the task is created correctly task_path = self.subscription_interface.UnregisterWithTask() obj = check_task_creation(task_path, publisher, UnregisterTask) # check all the data got propagated to the module correctly - assert obj.implementation._rhsm_unregister_proxy == rhsm_unregister_proxy + assert obj.implementation._rhsm_observer == rhsm_observer + assert obj.implementation._registered_to_satellite is False + assert obj.implementation._rhsm_configuration == {} # trigger the succeeded signal obj.implementation.succeeded_signal.emit() - # check this set the subscription-attached & registered properties to False - assert not self.subscription_interface.IsRegistered - assert not self.subscription_interface.IsSubscriptionAttached + # check unregistration set the subscription-attached, registered + # and SCA properties to False + assert self.subscription_interface.IsRegistered is False + assert self.subscription_interface.IsRegisteredToSatellite is False + assert self.subscription_interface.IsSimpleContentAccessEnabled is False + assert self.subscription_interface.IsSubscriptionAttached is False @patch_dbus_publish_object - def test_attach_subscription(self, publisher): - """Test AttachSubscriptionTask creation.""" - # create the SystemPurposeData structure - system_purpose_data = SystemPurposeData() - system_purpose_data.role = "foo" - system_purpose_data.sla = "bar" - system_purpose_data.usage = "baz" - system_purpose_data.addons = ["a", "b", "c"] - # feed it to the DBus interface - self.subscription_interface.SystemPurposeData = \ - SystemPurposeData.to_structure(system_purpose_data) - - # make sure system is not subscribed - assert not self.subscription_interface.IsSubscriptionAttached - # make sure the task gets dummy rhsm attach proxy - observer = Mock() - self.subscription_module._rhsm_observer = observer - rhsm_attach_proxy = observer.get_proxy.return_value - # check the task is created correctly - task_path = self.subscription_interface.AttachSubscriptionWithTask() - obj = check_task_creation(task_path, publisher, AttachSubscriptionTask) - # check all the data got propagated to the module correctly - assert obj.implementation._rhsm_attach_proxy == rhsm_attach_proxy - assert obj.implementation._sla == "bar" - # trigger the succeeded signal - obj.implementation.succeeded_signal.emit() - # check this set subscription_attached to True - assert self.subscription_interface.IsSubscriptionAttached - - @patch_dbus_publish_object - def test_parse_attached_subscriptions(self, publisher): - """Test ParseAttachedSubscriptionsTask creation.""" - # make sure the task gets dummy rhsm entitlement and syspurpose proxies - observer = Mock() - self.subscription_module._rhsm_observer = observer - rhsm_entitlement_proxy = Mock() - rhsm_syspurpose_proxy = Mock() - # yes, this can be done - observer.get_proxy.side_effect = [rhsm_entitlement_proxy, rhsm_syspurpose_proxy] + def test_unregister_satellite(self, publisher): + """Test UnregisterTask creation - system registered to Satellite.""" + # simulate system being subscribed & registered to Satellite + self.subscription_module.set_subscription_attached(True) + self.subscription_module._set_satellite_provisioning_script("foo script") + self.subscription_module.set_registered_to_satellite(True) + # lets also set SCA as enabled + self.subscription_module.set_simple_content_access_enabled(True) + # simulate RHSM config backup + self.subscription_module._rhsm_conf_before_satellite_provisioning = {"foo.bar": "baz"} + # make sure the task gets dummy rhsm unregister proxy + rhsm_observer = Mock() + self.subscription_module._rhsm_observer = rhsm_observer # check the task is created correctly - task_path = self.subscription_interface.ParseAttachedSubscriptionsWithTask() - obj = check_task_creation(task_path, publisher, ParseAttachedSubscriptionsTask) + task_path = self.subscription_interface.UnregisterWithTask() + obj = check_task_creation(task_path, publisher, UnregisterTask) # check all the data got propagated to the module correctly - assert obj.implementation._rhsm_entitlement_proxy == rhsm_entitlement_proxy - assert obj.implementation._rhsm_syspurpose_proxy == rhsm_syspurpose_proxy - # prepare some testing data - subscription_structs = [ - { - "name": get_variant(Str, "Foo Bar Beta"), - "service-level": get_variant(Str, "very good"), - "sku": get_variant(Str, "ABC1234"), - "contract": get_variant(Str, "12345678"), - "start-date": get_variant(Str, "May 12, 2020"), - "end-date": get_variant(Str, "May 12, 2021"), - "consumed-entitlement-count": get_variant(Int, 1) - }, - { - "name": get_variant(Str, "Foo Bar Beta NG"), - "service-level": get_variant(Str, "even better"), - "sku": get_variant(Str, "ABC4321"), - "contract": get_variant(Str, "87654321"), - "start-date": get_variant(Str, "now"), - "end-date": get_variant(Str, "never"), - "consumed-entitlement-count": get_variant(Int, 1000) - } - ] - system_purpose_struct = { - "role": get_variant(Str, "foo"), - "sla": get_variant(Str, "bar"), - "usage": get_variant(Str, "baz"), - "addons": get_variant(List[Str], ["a", "b", "c"]) - } - # make sure this data is returned by get_result() - return_tuple = SystemSubscriptionData( - attached_subscriptions=AttachedSubscription.from_structure_list(subscription_structs), - system_purpose_data=SystemPurposeData.from_structure(system_purpose_struct) - ) - obj.implementation.get_result = Mock() - obj.implementation.get_result.return_value = return_tuple + assert obj.implementation._registered_to_satellite is True + assert obj.implementation._rhsm_configuration == {"foo.bar": "baz"} + assert obj.implementation._rhsm_observer == rhsm_observer # trigger the succeeded signal obj.implementation.succeeded_signal.emit() - # check this set attached subscription and system purpose as expected - assert self.subscription_interface.AttachedSubscriptions == subscription_structs - assert self.subscription_interface.SystemPurposeData == system_purpose_struct + # check unregistration set the subscription-attached, registered + # and SCA properties to False + assert self.subscription_interface.IsRegistered is False + assert self.subscription_interface.IsRegisteredToSatellite is False + assert self.subscription_interface.IsSimpleContentAccessEnabled is False + assert self.subscription_interface.IsSubscriptionAttached is False + # check the provisioning scrip has been cleared + assert self.subscription_module._satellite_provisioning_script is None @patch_dbus_publish_object def test_install_with_tasks_default(self, publisher): @@ -1219,6 +1109,7 @@ def test_install_with_tasks_default(self, publisher): task_classes = [ RestoreRHSMDefaultsTask, TransferSubscriptionTokensTask, + ProvisionTargetSystemForSatelliteTask, ConnectToInsightsTask ] task_paths = self.subscription_interface.InstallWithTasks() @@ -1232,8 +1123,12 @@ def test_install_with_tasks_default(self, publisher): obj = task_objs[1] assert obj.implementation._transfer_subscription_tokens is False - # ConnectToInsightsTask + # ProvisionTargetSystemForSatelliteTask obj = task_objs[2] + assert obj.implementation._provisioning_script is None + + # ConnectToInsightsTask + obj = task_objs[3] assert obj.implementation._subscription_attached is False assert obj.implementation._connect_to_insights is False @@ -1243,6 +1138,9 @@ def test_install_with_tasks_configured(self, publisher): self.subscription_interface.InsightsEnabled = True self.subscription_module.set_subscription_attached(True) + self.subscription_module.set_registered_to_satellite(True) + self.subscription_module.set_simple_content_access_enabled(True) + self.subscription_module._satellite_provisioning_script = "foo script" # mock the rhsm config proxy observer = Mock() @@ -1254,6 +1152,7 @@ def test_install_with_tasks_configured(self, publisher): task_classes = [ RestoreRHSMDefaultsTask, TransferSubscriptionTokensTask, + ProvisionTargetSystemForSatelliteTask, ConnectToInsightsTask ] task_paths = self.subscription_interface.InstallWithTasks() @@ -1267,8 +1166,12 @@ def test_install_with_tasks_configured(self, publisher): obj = task_objs[1] assert obj.implementation._transfer_subscription_tokens is True - # ConnectToInsightsTask + # ProvisionTargetSystemForSatelliteTask obj = task_objs[2] + assert obj.implementation._provisioning_script == "foo script" + + # ConnectToInsightsTask + obj = task_objs[3] assert obj.implementation._subscription_attached is True assert obj.implementation._connect_to_insights is True @@ -1480,3 +1383,39 @@ def test_ks_no_apply_syspurpose(self, mock_give_purpose): # the SystemPurposeConfigurationTask should have been called, # which calls give_the_system_purpose() mock_give_purpose.assert_not_called() + + @patch_dbus_publish_object + def test_parse_organization_data(self, publisher): + """Test ParseOrganizationDataTask creation.""" + # make sure the task gets dummy rhsm entitlement and syspurpose proxies + + # prepare the module with dummy data + full_request = SubscriptionRequest() + full_request.type = SUBSCRIPTION_REQUEST_TYPE_USERNAME_PASSWORD + full_request.organization = "123456789" + full_request.account_username = "foo_user" + full_request.server_hostname = "candlepin.foo.com" + full_request.rhsm_baseurl = "cdn.foo.com" + full_request.server_proxy_hostname = "proxy.foo.com" + full_request.server_proxy_port = 9001 + full_request.server_proxy_user = "foo_proxy_user" + full_request.account_password.set_secret("foo_password") + full_request.activation_keys.set_secret(["key1", "key2", "key3"]) + full_request.server_proxy_password.set_secret("foo_proxy_password") + + self.subscription_interface.SubscriptionRequest = \ + SubscriptionRequest.to_structure(full_request) + # make sure the task gets dummy rhsm register server proxy + observer = Mock() + observer.get_proxy = Mock() + self.subscription_module._rhsm_observer = observer + register_server_proxy = Mock() + observer.get_proxy.return_value = register_server_proxy + + # check the task is created correctly + task_path = self.subscription_interface.RetrieveOrganizationsWithTask() + obj = check_task_creation(task_path, publisher, RetrieveOrganizationsTask) + # check all the data got propagated to the module correctly + assert obj.implementation._rhsm_register_server_proxy == register_server_proxy + assert obj.implementation._username == "foo_user" + assert obj.implementation._password == "foo_password" diff --git a/tests/unit_tests/pyanaconda_tests/modules/subscription/test_subscription_tasks.py b/tests/unit_tests/pyanaconda_tests/modules/subscription/test_subscription_tasks.py index b66dc0e93b7..c7e35aaec72 100644 --- a/tests/unit_tests/pyanaconda_tests/modules/subscription/test_subscription_tasks.py +++ b/tests/unit_tests/pyanaconda_tests/modules/subscription/test_subscription_tasks.py @@ -26,29 +26,34 @@ import tempfile -from dasbus.typing import get_variant, get_native, Str +from dasbus.typing import get_variant, get_native, Str, Bool from dasbus.error import DBusError from pyanaconda.core.path import join_paths from pyanaconda.core.constants import SUBSCRIPTION_REQUEST_TYPE_ORG_KEY, \ - RHSM_SYSPURPOSE_FILE_PATH + RHSM_SYSPURPOSE_FILE_PATH, SUBSCRIPTION_REQUEST_TYPE_USERNAME_PASSWORD from pyanaconda.modules.common.errors.installation import InsightsConnectError, \ InsightsClientMissingError, SubscriptionTokenTransferError from pyanaconda.modules.common.errors.subscription import RegistrationError, \ - SubscriptionError + SatelliteProvisioningError, MultipleOrganizationsError from pyanaconda.modules.common.structures.subscription import SystemPurposeData, \ - SubscriptionRequest, AttachedSubscription + SubscriptionRequest, OrganizationData from pyanaconda.modules.common.constants.services import RHSM -from pyanaconda.modules.common.constants.objects import RHSM_REGISTER +from pyanaconda.modules.common.constants.objects import RHSM_REGISTER, RHSM_UNREGISTER, \ + RHSM_CONFIG from pyanaconda.modules.subscription.installation import ConnectToInsightsTask, \ - RestoreRHSMDefaultsTask, TransferSubscriptionTokensTask + RestoreRHSMDefaultsTask, TransferSubscriptionTokensTask, ProvisionTargetSystemForSatelliteTask from pyanaconda.modules.subscription.runtime import SetRHSMConfigurationTask, \ RHSMPrivateBus, RegisterWithUsernamePasswordTask, RegisterWithOrganizationKeyTask, \ - UnregisterTask, AttachSubscriptionTask, SystemPurposeConfigurationTask, \ - ParseAttachedSubscriptionsTask + UnregisterTask, SystemPurposeConfigurationTask, \ + ParseSubscriptionDataTask, DownloadSatelliteProvisioningScriptTask, \ + RunSatelliteProvisioningScriptTask, BackupRHSMConfBeforeSatelliteProvisioningTask, \ + RollBackSatelliteProvisioningTask, RegisterAndSubscribeTask, RetrieveOrganizationsTask +from pyanaconda.modules.subscription.constants import SERVER_HOSTNAME_NOT_SATELLITE_PREFIX, \ + RHSM_SERVICE_NAME import gi gi.require_version("Gio", "2.0") @@ -643,16 +648,19 @@ def test_username_password_success(self, private_bus, environ_get): # private register proxy get_proxy = private_bus.return_value.__enter__.return_value.get_proxy private_register_proxy = get_proxy.return_value + # make the Register() method return some JSON data + private_register_proxy.Register.return_value = '{"json":"stuff"}' # instantiate the task and run it task = RegisterWithUsernamePasswordTask(rhsm_register_server_proxy=register_server_proxy, username="foo_user", - password="bar_password") - task.run() + password="bar_password", + organization="foo_org") + assert task.run() == '{"json":"stuff"}' # check the private register proxy Register method was called correctly - private_register_proxy.Register.assert_called_once_with("", + private_register_proxy.Register.assert_called_once_with("foo_org", "foo_user", "bar_password", - {}, + {"enable_content": get_variant(Bool, True)}, {}, "en_US.UTF-8") @@ -671,17 +679,93 @@ def test_username_password_failure(self, private_bus, environ_get): # instantiate the task and run it task = RegisterWithUsernamePasswordTask(rhsm_register_server_proxy=register_server_proxy, username="foo_user", - password="bar_password") + password="bar_password", + organization="foo_org") with pytest.raises(RegistrationError): task.run() # check private register proxy Register method was called correctly - private_register_proxy.Register.assert_called_with("", + private_register_proxy.Register.assert_called_with("foo_org", "foo_user", "bar_password", - {}, + {"enable_content": get_variant(Bool, True)}, {}, "en_US.UTF-8") + @patch("pyanaconda.modules.subscription.runtime.RetrieveOrganizationsTask") + @patch("os.environ.get", return_value="en_US.UTF-8") + @patch("pyanaconda.modules.subscription.runtime.RHSMPrivateBus") + def test_username_password_org_single(self, private_bus, environ_get, retrieve_orgs_task): + """Test the RegisterWithUsernamePasswordTask - parsed single org.""" + # register server proxy + register_server_proxy = Mock() + # private register proxy + get_proxy = private_bus.return_value.__enter__.return_value.get_proxy + private_register_proxy = get_proxy.return_value + # make the Register() method return some JSON data + private_register_proxy.Register.return_value = '{"json":"stuff"}' + # mock the org data retrieval task to return single organization + org_data = [ + { + "key": "foo_org", + "displayName": "Foo Org", + } + ] + org_data_json = json.dumps(org_data) + org_data_list = RetrieveOrganizationsTask._parse_org_data_json(org_data_json) + retrieve_orgs_task.return_value.run.return_value = org_data_list + # prepare mock data callaback as well + # instantiate the task and run it - we set organization to "" to make the task + # fetch organization list + task = RegisterWithUsernamePasswordTask(rhsm_register_server_proxy=register_server_proxy, + username="foo_user", + password="bar_password", + organization="") + # if we get just a single organization, we don't actually have to feed + # it to the RHSM API, its only a problem if there are more than one + assert task.run() == '{"json":"stuff"}' + # check the private register proxy Register method was called correctly + private_register_proxy.Register.assert_called_once_with("", + "foo_user", + "bar_password", + {"enable_content": get_variant(Bool, True)}, + {}, + "en_US.UTF-8") + + @patch("pyanaconda.modules.subscription.runtime.RetrieveOrganizationsTask") + @patch("os.environ.get", return_value="en_US.UTF-8") + def test_username_password_org_multi(self, environ_get, retrieve_orgs_task): + """Test the RegisterWithUsernamePasswordTask - parsed multiple orgs.""" + # register server proxy + register_server_proxy = Mock() + # mock the org data retrieval task to return single organization + org_data = [ + { + "key": "foo_org", + "displayName": "Foo Org", + }, + { + "key": "bar_org", + "displayName": "Bar Org", + }, + { + "key": "baz_org", + "displayName": "Baz Org", + } + ] + org_data_json = json.dumps(org_data) + org_data_list = RetrieveOrganizationsTask._parse_org_data_json(org_data_json) + retrieve_orgs_task.return_value.run.return_value = org_data_list + # instantiate the task and run it - we set organization to "" to make the task + # fetch organization list + task = RegisterWithUsernamePasswordTask(rhsm_register_server_proxy=register_server_proxy, + username="foo_user", + password="bar_password", + organization="") + # if we get more than one organization, we can's automatically decide which one to + # use so we throw an exception to notify the user to pick one and try again + with pytest.raises(MultipleOrganizationsError): + task.run() + @patch("os.environ.get", return_value="en_US.UTF-8") @patch("pyanaconda.modules.subscription.runtime.RHSMPrivateBus") def test_org_key_success(self, private_bus, environ_get): @@ -692,11 +776,13 @@ def test_org_key_success(self, private_bus, environ_get): get_proxy = private_bus.return_value.__enter__.return_value.get_proxy private_register_proxy = get_proxy.return_value private_register_proxy.Register.return_value = True, "" + # make the Register() method return some JSON data + private_register_proxy.RegisterWithActivationKeys.return_value = '{"json":"stuff"}' # instantiate the task and run it task = RegisterWithOrganizationKeyTask(rhsm_register_server_proxy=register_server_proxy, organization="123456789", activation_keys=["foo", "bar", "baz"]) - task.run() + assert task.run() == '{"json":"stuff"}' # check private register proxy RegisterWithActivationKeys method was called correctly private_register_proxy.RegisterWithActivationKeys.assert_called_with( "123456789", @@ -737,157 +823,105 @@ def test_org_key_failure(self, private_bus, environ_get): class UnregisterTaskTestCase(unittest.TestCase): """Test the unregister task.""" + @patch("pyanaconda.modules.subscription.runtime.RollBackSatelliteProvisioningTask") @patch("os.environ.get", return_value="en_US.UTF-8") - def test_unregister_success(self, environ_get): + def test_unregister_success(self, environ_get, roll_back_task): """Test the UnregisterTask - success.""" - # register server proxy - rhsm_unregister_proxy = Mock() + rhsm_observer = Mock() # instantiate the task and run it - task = UnregisterTask(rhsm_unregister_proxy=rhsm_unregister_proxy) + task = UnregisterTask( + rhsm_observer=rhsm_observer, + registered_to_satellite=False, + rhsm_configuration={} + ) task.run() # check the unregister proxy Unregister method was called correctly - rhsm_unregister_proxy.Unregister.assert_called_once_with({}, "en_US.UTF-8") + rhsm_observer.get_proxy.assert_called_once_with(RHSM_UNREGISTER) + # registered_to_satellite is False, so roll back task should not run + roll_back_task.assert_not_called() + roll_back_task.return_value.run.assert_not_called() + @patch("pyanaconda.modules.subscription.runtime.RollBackSatelliteProvisioningTask") @patch("os.environ.get", return_value="en_US.UTF-8") - def test_unregister_failure(self, environ_get): + def test_unregister_failure(self, environ_get, roll_back_task): """Test the UnregisterTask - failure.""" - # register server proxy - rhsm_unregister_proxy = Mock() + rhsm_observer = Mock() + rhsm_unregister_proxy = rhsm_observer.get_proxy.return_value # raise DBusError with error message in JSON json_error = '{"message": "Unregistration failed."}' rhsm_unregister_proxy.Unregister.side_effect = DBusError(json_error) # instantiate the task and run it - task = UnregisterTask(rhsm_unregister_proxy=rhsm_unregister_proxy) + task = UnregisterTask( + rhsm_observer=rhsm_observer, + registered_to_satellite=False, + rhsm_configuration={} + ) with pytest.raises(DBusError): task.run() + # check the RHSM observer was used correctly + rhsm_observer.get_proxy.assert_called_once_with(RHSM_UNREGISTER) # check the unregister proxy Unregister method was called correctly rhsm_unregister_proxy.Unregister.assert_called_once_with({}, "en_US.UTF-8") + # registered_to_satellite is False, so roll back task should not run + roll_back_task.assert_not_called() + roll_back_task.return_value.run.assert_not_called() - -class AttachSubscriptionTaskTestCase(unittest.TestCase): - """Test the subscription task.""" - + @patch("pyanaconda.modules.subscription.runtime.RollBackSatelliteProvisioningTask") @patch("os.environ.get", return_value="en_US.UTF-8") - def test_attach_subscription_task_success(self, environ_get): - """Test the AttachSubscriptionTask - success.""" - rhsm_attach_proxy = Mock() - task = AttachSubscriptionTask(rhsm_attach_proxy=rhsm_attach_proxy, - sla="foo_sla") - task.run() - rhsm_attach_proxy.AutoAttach.assert_called_once_with("foo_sla", - {}, - "en_US.UTF-8") - - @patch("os.environ.get", return_value="en_US.UTF-8") - def test_attach_subscription_task_failure(self, environ_get): - """Test the AttachSubscriptionTask - failure.""" - rhsm_attach_proxy = Mock() + def test_unregister_failure_satellite(self, environ_get, roll_back_task): + """Test the UnregisterTask - unregister failure on Satellite.""" + rhsm_observer = Mock() + rhsm_unregister_proxy = rhsm_observer.get_proxy.return_value # raise DBusError with error message in JSON - json_error = '{"message": "Failed to attach subscription."}' - rhsm_attach_proxy.AutoAttach.side_effect = DBusError(json_error) - task = AttachSubscriptionTask(rhsm_attach_proxy=rhsm_attach_proxy, - sla="foo_sla") - with pytest.raises(SubscriptionError): + json_error = '{"message": "Unregistration failed."}' + rhsm_unregister_proxy.Unregister.side_effect = DBusError(json_error) + # instantiate the task and run it + task = UnregisterTask( + rhsm_observer=rhsm_observer, + registered_to_satellite=True, + rhsm_configuration={} + ) + with pytest.raises(DBusError): task.run() - rhsm_attach_proxy.AutoAttach.assert_called_once_with("foo_sla", - {}, - "en_US.UTF-8") - + # check the RHSM observer was used correctly + rhsm_observer.get_proxy.assert_called_once_with(RHSM_UNREGISTER) + # check the unregister proxy Unregister method was called correctly + rhsm_unregister_proxy.Unregister.assert_called_once_with({}, "en_US.UTF-8") + # registered_to_satellite is True, but unregistration failed before roll back + # could happen + roll_back_task.assert_not_called() + roll_back_task.return_value.run.assert_not_called() -class ParseAttachedSubscriptionsTaskTestCase(unittest.TestCase): - """Test the attached subscription parsing task.""" + @patch("pyanaconda.modules.subscription.runtime.RollBackSatelliteProvisioningTask") + @patch("os.environ.get", return_value="en_US.UTF-8") + def test_unregister_satellite_success(self, environ_get, roll_back_task): + """Test the UnregisterTask - Satellite rollback success.""" + rhsm_observer = Mock() + unregister_proxy = Mock() + config_proxy = Mock() + rhsm_observer.get_proxy.side_effect = [unregister_proxy, config_proxy] + # instantiate the task and run it + mock_rhsm_configuration = {"foo": "bar"} + task = UnregisterTask( + rhsm_observer=rhsm_observer, + registered_to_satellite=True, + rhsm_configuration=mock_rhsm_configuration + ) + task.run() + # check the unregister proxy Unregister method was called correctly + rhsm_observer.get_proxy.assert_has_calls([]) + # registered_to_satellite is False, so roll back task should not run + roll_back_task.assert_called_once_with(rhsm_config_proxy=config_proxy, + rhsm_configuration=mock_rhsm_configuration) + roll_back_task.return_value.run.assert_called_once() - def test_pretty_date(self): - """Test the pretty date method of ParseAttachedSubscriptionsTask.""" - pretty_date_method = ParseAttachedSubscriptionsTask._pretty_date - # try to parse ISO 8601 first - assert pretty_date_method("2015-12-22") == "Dec 22, 2015" - # the method expects short mm/dd/yy dates - assert pretty_date_method("12/22/15") == "Dec 22, 2015" - # returns the input if parsing fails - ambiguous_date = "noon of the twenty first century" - assert pretty_date_method(ambiguous_date) == ambiguous_date - - def test_subscription_json_parsing(self): - """Test the subscription JSON parsing method of ParseAttachedSubscriptionsTask.""" - parse_method = ParseAttachedSubscriptionsTask._parse_subscription_json - # the method should be able to survive the RHSM DBus API returning an empty string, - # as empty list of subscriptions is a lesser issue than crashed installation - assert parse_method("") == [] - # try parsing a json file containing two subscriptions - # - to make this look sane, we write it as a dict that we then convert to JSON - subscription_dict = { - "consumed": [ - { - "subscription_name": "Foo Bar Beta", - "service_level": "very good", - "sku": "ABC1234", - "contract": "12345678", - "starts": "05/12/20", - "ends": "05/12/21", - "quantity_used": "1" - }, - { - "subscription_name": "Foo Bar Beta NG", - "service_level": "even better", - "sku": "ABC4321", - "contract": "87654321", - "starts": "now", - "ends": "never", - "quantity_used": "1000" - }, - { - "subscription_name": "Foo Bar Beta NG", - "service_level": "much wow", - "sku": "ABC5678", - "contract": "12344321", - "starts": "2020-05-12", - "ends": "never", - "quantity_used": "1000" - } - ] - } - subscription_json = json.dumps(subscription_dict) - expected_structs = [ - { - "name": "Foo Bar Beta", - "service-level": "very good", - "sku": "ABC1234", - "contract": "12345678", - "start-date": "May 12, 2020", - "end-date": "May 12, 2021", - "consumed-entitlement-count": 1 - }, - { - "name": "Foo Bar Beta NG", - "service-level": "even better", - "sku": "ABC4321", - "contract": "87654321", - "start-date": "now", - "end-date": "never", - "consumed-entitlement-count": 1000 - }, - { - "name": "Foo Bar Beta NG", - "service-level": "much wow", - "sku": "ABC5678", - "contract": "12344321", - "start-date": "May 12, 2020", - "end-date": "never", - "consumed-entitlement-count": 1000 - } - ] - structs = get_native( - AttachedSubscription.to_structure_list(parse_method(subscription_json)) - ) - # check the content of the AttachedSubscription corresponds to the input JSON, - # including date formatting - assert structs == expected_structs +class ParseSubscriptionDataTaskTestCase(unittest.TestCase): + """Test the attached subscription parsing task.""" def test_system_purpose_json_parsing(self): - """Test the system purpose JSON parsing method of ParseAttachedSubscriptionsTask.""" - parse_method = ParseAttachedSubscriptionsTask._parse_system_purpose_json + """Test the system purpose JSON parsing method of ParseSubscriptionDataTask.""" + parse_method = ParseSubscriptionDataTask._parse_system_purpose_json # the parsing method should be able to survive also getting an empty string expected_struct = { "role": "", @@ -936,33 +970,981 @@ def test_system_purpose_json_parsing(self): @patch("os.environ.get", return_value="en_US.UTF-8") def test_attach_subscription_task_success(self, environ_get): - """Test the ParseAttachedSubscriptionsTask.""" + """Test the ParseSubscriptionDataTask.""" # prepare mock proxies the task is expected to interact with - rhsm_entitlement_proxy = Mock() - rhsm_entitlement_proxy.GetPools.return_value = "foo" rhsm_syspurpose_proxy = Mock() rhsm_syspurpose_proxy.GetSyspurpose.return_value = "bar" - task = ParseAttachedSubscriptionsTask(rhsm_entitlement_proxy=rhsm_entitlement_proxy, - rhsm_syspurpose_proxy=rhsm_syspurpose_proxy) + task = ParseSubscriptionDataTask(rhsm_syspurpose_proxy=rhsm_syspurpose_proxy) # mock the parsing methods - subscription1 = AttachedSubscription() - subscription2 = AttachedSubscription() - task._parse_subscription_json = Mock() - task._parse_subscription_json.return_value = [subscription1, subscription2] system_purpose_data = SystemPurposeData() task._parse_system_purpose_json = Mock() task._parse_system_purpose_json.return_value = system_purpose_data # run the task result = task.run() # check DBus proxies were called as expected - rhsm_entitlement_proxy.GetPools.assert_called_once_with({'pool_subsets': - get_variant(Str, "consumed")}, - {}, - "en_US.UTF-8") rhsm_syspurpose_proxy.GetSyspurpose.assert_called_once_with("en_US.UTF-8") # check the parsing methods were called - task._parse_subscription_json.assert_called_once_with("foo") task._parse_system_purpose_json.assert_called_once_with("bar") # check the result that has been returned is as expected - assert result.attached_subscriptions == [subscription1, subscription2] assert result.system_purpose_data == system_purpose_data + + +class SatelliteTasksTestCase(unittest.TestCase): + """Test the Satellite support tasks.""" + + @patch("pyanaconda.modules.subscription.satellite.download_satellite_provisioning_script") + def test_satellite_provisioning_script_download(self, download_function): + """Test the DownloadSatelliteProvisioningScriptTask.""" + # make the download function return a dummy script text + download_function.return_value = "foo bar" + # create the task and run it + task = DownloadSatelliteProvisioningScriptTask( + satellite_url="satellite.example.com", + proxy_url="proxy.example.com", + ) + assert task.run() == "foo bar" + # check the wrapped download function was called correctly + download_function.assert_called_with( + satellite_url="satellite.example.com", + proxy_url="proxy.example.com", + ) + + @patch("pyanaconda.modules.subscription.satellite.run_satellite_provisioning_script") + def test_satellite_provisioning_run_script(self, run_script_function): + """Test the RunSatelliteProvisioningScriptTask - success.""" + # create the task and run it + task = RunSatelliteProvisioningScriptTask( + provisioning_script="foo bar" + ) + task.run() + # check the wrapped run function was called correctly + run_script_function.assert_called_with( + provisioning_script="foo bar", + run_on_target_system=False + ) + + @patch("pyanaconda.modules.subscription.satellite.run_satellite_provisioning_script") + def test_satellite_provisioning_run_script_failure(self, run_script_function): + """Test the RunSatelliteProvisioningScriptTask - failure.""" + # make sure the run-script function raises the correct error + run_script_function.side_effect = SatelliteProvisioningError() + # create the task and run it + task = RunSatelliteProvisioningScriptTask( + provisioning_script="foo bar" + ) + with pytest.raises(SatelliteProvisioningError): + task.run() + # check the wrapped run function was called correctly + run_script_function.assert_called_with( + provisioning_script="foo bar", + run_on_target_system=False + ) + + def test_rhsm_config_backup(self): + """Test the BackupRHSMConfBeforeSatelliteProvisioningTask.""" + # create mock RHSM config proxy + config_proxy = Mock() + # make it return a DBus struct + config_proxy.GetAll.return_value = {"foo": get_variant(Str, "bar")} + # create the task and run it + task = BackupRHSMConfBeforeSatelliteProvisioningTask( + rhsm_config_proxy=config_proxy + ) + conf_backup = task.run() + # check the RHSM config proxy was called correctly + config_proxy.GetAll.assert_called_once_with("") + # check the DBus struct is correctly converted to a Python dict + assert conf_backup == {"foo": "bar"} + + def test_rhsm_roll_back(self): + """Test the RollBackSatelliteProvisioningTask.""" + # create mock RHSM config proxy + config_proxy = Mock() + # and mock RHSM configuration + rhsm_config = {"foo": "bar"} + # create the task and run it + task = RollBackSatelliteProvisioningTask( + rhsm_config_proxy=config_proxy, + rhsm_configuration=rhsm_config + ) + task.run() + # check the RHSM config proxy was called correctly + config_proxy.SetAll.assert_called_once_with({"foo": get_variant(Str, "bar")}, "") + + @patch("pyanaconda.modules.subscription.satellite.run_satellite_provisioning_script") + def test_provision_target_no_op(self, run_script_function): + """Test the ProvisionTargetSystemForSatelliteTask - no op.""" + # create the task and run it + task = ProvisionTargetSystemForSatelliteTask(provisioning_script=None) + task.run() + # make sure we did not try to provision the system with + # registered_to_satellite == False + run_script_function.assert_not_called() + + @patch("pyanaconda.modules.subscription.satellite.run_satellite_provisioning_script") + def test_provision_target_success(self, run_script_function): + """Test the ProvisionTargetSystemForSatelliteTask - success.""" + # make the run script function return True, indicating success + run_script_function.return_value = True + # create the task and run it + task = ProvisionTargetSystemForSatelliteTask(provisioning_script="foo") + task.run() + # make sure we did try to provision the system with + run_script_function.assert_called_once_with( + provisioning_script="foo", + run_on_target_system=True + ) + + @patch("pyanaconda.modules.subscription.satellite.run_satellite_provisioning_script") + def test_provision_target_failure(self, run_script_function): + """Test the ProvisionTargetSystemForSatelliteTask - failure.""" + # make the run script function return False, indicating failure + run_script_function.return_value = False + # create the task and run it + task = ProvisionTargetSystemForSatelliteTask(provisioning_script="foo") + # check if the correct exception for a failure is raised + with pytest.raises(SatelliteProvisioningError): + task.run() + # make sure we did try to provision the system with + run_script_function.assert_called_once_with( + provisioning_script="foo", + run_on_target_system=True + ) + + +class RegisterandSubscribeTestCase(unittest.TestCase): + """Test the RegisterAndSubscribeTask orchestration task. + + This task does orchestration of many individual tasks, + so it makes sense to have a separate test case for it. + """ + + def test_get_proxy_url(self): + """Test proxy URL generation in RegisterAndSubscribeTask.""" + # no proxy data provided + empty_request = SubscriptionRequest() + assert RegisterAndSubscribeTask._get_proxy_url(empty_request) is None + # proxy data provided in subscription request + request_with_proxy_data = SubscriptionRequest() + request_with_proxy_data.server_proxy_hostname = "proxy.example.com" + request_with_proxy_data.server_proxy_user = "foo_user" + request_with_proxy_data.server_proxy_password.set_secret("foo_password") + request_with_proxy_data.server_proxy_port = 1234 + assert RegisterAndSubscribeTask._get_proxy_url(request_with_proxy_data) == \ + "http://foo_user:foo_password@proxy.example.com:1234" + # one more time without valid port set + request_with_proxy_data = SubscriptionRequest() + request_with_proxy_data.server_proxy_hostname = "proxy.example.com" + request_with_proxy_data.server_proxy_user = "foo_user" + request_with_proxy_data.server_proxy_password.set_secret("foo_password") + request_with_proxy_data.server_proxy_port = -1 + # this should result in the default proxy port 3128 being used + assert RegisterAndSubscribeTask._get_proxy_url(request_with_proxy_data) == \ + "http://foo_user:foo_password@proxy.example.com:3128" + + def test_registration_data_json_parsing(self): + """Test the registration data JSON parsing method of RegisterAndSubscribeTask.""" + parse_method = RegisterAndSubscribeTask._detect_sca_from_registration_data + # the parsing method should be able to survive also getting an empty string + # or even None, returning False + assert not parse_method("") + assert not parse_method(None) + + # registration data without owner key + no_owner_data = { + "foo": "123", + "bar": "456", + "baz": "789" + } + assert not parse_method(json.dumps(no_owner_data)) + + # registration data with owner key but without the necessary + # contentAccessMode key + no_access_mode_data = { + "foo": "123", + "owner": { + "id": "abc", + "key": "admin", + "displayName": "Admin Owner" + }, + "bar": "456", + "baz": "789" + } + assert not parse_method(json.dumps(no_access_mode_data)) + + # registration data with owner key but without the necessary + # contentAccessMode key + no_access_mode_data = { + "foo": "123", + "owner": { + "id": "abc", + "key": "admin", + "displayName": "Admin Owner" + }, + "bar": "456", + "baz": "789" + } + assert not parse_method(json.dumps(no_access_mode_data)) + + # registration data for SCA mode + sca_mode_data = { + "foo": "123", + "owner": { + "id": "abc", + "key": "admin", + "displayName": "Admin Owner", + "contentAccessMode": "org_environment" + }, + "bar": "456", + "baz": "789" + } + assert parse_method(json.dumps(sca_mode_data)) + + # registration data for entitlement mode + entitlement_mode_data = { + "foo": "123", + "owner": { + "id": "abc", + "key": "admin", + "displayName": "Admin Owner", + "contentAccessMode": "entitlement" + }, + "bar": "456", + "baz": "789" + } + assert not parse_method(json.dumps(entitlement_mode_data)) + + # registration data for unknown mode + unknown_mode_data = { + "foo": "123", + "owner": { + "id": "abc", + "key": "admin", + "displayName": "Admin Owner", + "contentAccessMode": "something_else" + }, + "bar": "456", + "baz": "789" + } + assert not parse_method(json.dumps(unknown_mode_data)) + + @patch("pyanaconda.modules.subscription.runtime.DownloadSatelliteProvisioningScriptTask") + def test_provision_system_for_satellite_skip(self, download_task): + """Test Satellite provisioning in RegisterAndSubscribeTask - skip.""" + # create the task and related bits + subscription_request = SubscriptionRequest() + subscription_request.server_hostname = \ + SERVER_HOSTNAME_NOT_SATELLITE_PREFIX + "something.else.example.com" + task = RegisterAndSubscribeTask( + rhsm_observer=Mock(), + subscription_request=subscription_request, + system_purpose_data=Mock(), + registered_callback=Mock(), + registered_to_satellite_callback=Mock(), + simple_content_access_callback=Mock(), + subscription_attached_callback=Mock(), + subscription_data_callback=Mock(), + satellite_script_callback=Mock(), + config_backup_callback=Mock() + ) + # run the provisioning method + task._provision_system_for_satellite() + # detect if provisioning is skipped by checking if the + # DownloadSatelliteProvisioningScriptTask has been instantiated + download_task.assert_not_called() + + @patch("pyanaconda.modules.subscription.runtime.DownloadSatelliteProvisioningScriptTask") + def test_provision_system_for_satellite_download_error(self, download_task): + """Test Satellite provisioning in RegisterAndSubscribeTask - script download error.""" + # create the task and related bits + subscription_request = SubscriptionRequest() + subscription_request.server_hostname = "satellite.example.com" + satellite_script_callback = Mock() + task = RegisterAndSubscribeTask( + rhsm_observer=Mock(), + subscription_request=subscription_request, + system_purpose_data=Mock(), + registered_callback=Mock(), + registered_to_satellite_callback=Mock(), + simple_content_access_callback=Mock(), + subscription_attached_callback=Mock(), + subscription_data_callback=Mock(), + satellite_script_callback=satellite_script_callback, + config_backup_callback=Mock() + ) + # make the mock download task fail + download_task.side_effect = SatelliteProvisioningError() + # run the provisioning method, check correct exception is raised + with pytest.raises(SatelliteProvisioningError): + task._provision_system_for_satellite() + # download task should have been instantiated + download_task.assert_called_once_with( + satellite_url='satellite.example.com', + proxy_url=None) + # but the callback should not have been called due to the failure + satellite_script_callback.assert_not_called() + + @patch("pyanaconda.core.service.restart_service") + @patch("pyanaconda.modules.subscription.runtime.RunSatelliteProvisioningScriptTask") + @patch("pyanaconda.modules.subscription.runtime.DownloadSatelliteProvisioningScriptTask") + @patch("pyanaconda.modules.subscription.runtime.BackupRHSMConfBeforeSatelliteProvisioningTask") + def test_provision_satellite_run_error(self, backup_task, download_task, run_script_task, + restart_service): + """Test Satellite provisioning in RegisterAndSubscribeTask - script run failed.""" + # create the task and related bits + subscription_request = SubscriptionRequest() + subscription_request.server_hostname = "satellite.example.com" + satellite_script_callback = Mock() + task = RegisterAndSubscribeTask( + rhsm_observer=Mock(), + subscription_request=subscription_request, + system_purpose_data=Mock(), + registered_callback=Mock(), + registered_to_satellite_callback=Mock(), + simple_content_access_callback=Mock(), + subscription_attached_callback=Mock(), + subscription_data_callback=Mock(), + satellite_script_callback=satellite_script_callback, + config_backup_callback=Mock() + ) + # make the mock download task return the script from its run() method + download_task.return_value.run.return_value = "foo bar script" + # make the mock run task fail + run_script_task.side_effect = SatelliteProvisioningError() + # make the mock backup task return mock RHSM config dict + backup_task.return_value.run.return_value = {"foo": {"bar": "baz"}} + # run the provisioning method, check correct exception is raised + with pytest.raises(SatelliteProvisioningError): + task._provision_system_for_satellite() + # download task should have been instantiated + download_task.assert_called_once_with( + satellite_url='satellite.example.com', + proxy_url=None) + # download callback should have been called + satellite_script_callback.assert_called_once() + # then the run script task should have been instantiated + run_script_task.assert_called_once_with(provisioning_script="foo bar script") + # but the next call to restart_service should not happen + # due to the exception being raised + restart_service.assert_not_called() + + @patch("pyanaconda.core.service.restart_service") + @patch("pyanaconda.modules.subscription.runtime.RunSatelliteProvisioningScriptTask") + @patch("pyanaconda.modules.subscription.runtime.BackupRHSMConfBeforeSatelliteProvisioningTask") + @patch("pyanaconda.modules.subscription.runtime.DownloadSatelliteProvisioningScriptTask") + def test_provision_success(self, download_task, backup_task, run_script_task, restart_service): + """Test Satellite provisioning in RegisterAndSubscribeTask - success.""" + # this tests a simulated successful end-to-end provisioning run, which contains + # some more bits that have been skipped in the previous tests for complexity: + # - check proxy URL propagates correctly + # - check the backup task (run between download and run tasks) is run correctly + + # create the task and related bits + rhsm_observer = Mock() + subscription_request = SubscriptionRequest() + subscription_request.server_hostname = "satellite.example.com" + subscription_request.server_proxy_hostname = "proxy.example.com" + subscription_request.server_proxy_user = "foo_user" + subscription_request.server_proxy_password.set_secret("foo_password") + subscription_request.server_proxy_port = 1234 + config_backup_callback = Mock() + satellite_script_callback = Mock() + task = RegisterAndSubscribeTask( + rhsm_observer=rhsm_observer, + subscription_request=subscription_request, + system_purpose_data=Mock(), + registered_callback=Mock(), + registered_to_satellite_callback=Mock(), + simple_content_access_callback=Mock(), + subscription_attached_callback=Mock(), + subscription_data_callback=Mock(), + satellite_script_callback=satellite_script_callback, + config_backup_callback=config_backup_callback + ) + # mock the roll back method + task._roll_back_satellite_provisioning = Mock() + # make the mock download task return the script from its run() method + download_task.return_value.run.return_value = "foo bar script" + # make the mock backup task return mock RHSM config dict + backup_task.return_value.run.return_value = {"foo": {"bar": "baz"}} + # run the provisioning method + task._provision_system_for_satellite() + # download task should have been instantiated + download_task.assert_called_once_with( + satellite_url='satellite.example.com', + proxy_url='http://foo_user:foo_password@proxy.example.com:1234') + # download callback should have been called + satellite_script_callback.assert_called_once() + # next we should attempt to backup RHSM configuration, so that + # unregistration can correctly cleanup after a Satellite + # registration attempt + rhsm_observer.get_proxy.assert_called_once_with(RHSM_CONFIG) + backup_task.assert_called_once_with(rhsm_config_proxy=rhsm_observer.get_proxy.return_value) + config_backup_callback.assert_called_once_with({"foo.bar": "baz"}) + # then the run script task should have been instantiated + run_script_task.assert_called_once_with(provisioning_script="foo bar script") + # then the RHSM service restart should happen + restart_service.assert_called_once_with(RHSM_SERVICE_NAME) + # make sure the rollback method was not called + task._roll_back_satellite_provisioning.assert_not_called() + + @patch("pyanaconda.modules.subscription.runtime.RegisterWithUsernamePasswordTask") + def test_registration_error_username_password(self, register_username_task): + """Test RegisterAndSubscribeTask - username + password registration error.""" + # create the task and related bits + rhsm_observer = Mock() + subscription_request = SubscriptionRequest() + subscription_request.type = SUBSCRIPTION_REQUEST_TYPE_USERNAME_PASSWORD + subscription_request.account_username = "foo_user" + subscription_request.account_password.set_secret("foo_password") + task = RegisterAndSubscribeTask( + rhsm_observer=rhsm_observer, + subscription_request=subscription_request, + system_purpose_data=Mock(), + registered_callback=Mock(), + registered_to_satellite_callback=Mock(), + simple_content_access_callback=Mock(), + subscription_attached_callback=Mock(), + subscription_data_callback=Mock(), + satellite_script_callback=Mock(), + config_backup_callback=Mock() + ) + # make the register task throw an exception + register_username_task.return_value.run_with_signals.side_effect = RegistrationError() + # check the exception is raised as expected + with pytest.raises(RegistrationError): + task.run() + # check the register task was properly instantiated + register_username_task.assert_called_once_with( + rhsm_register_server_proxy=rhsm_observer.get_proxy.return_value, + username='foo_user', + password='foo_password', + organization='' + ) + # check the register task has been run + register_username_task.return_value.run_with_signals.assert_called_once() + + @patch("pyanaconda.modules.subscription.runtime.RegisterWithOrganizationKeyTask") + def test_registration_error_org_key(self, register_org_task): + """Test RegisterAndSubscribeTask - org + key registration error.""" + # create the task and related bits + rhsm_observer = Mock() + subscription_request = SubscriptionRequest() + subscription_request.type = SUBSCRIPTION_REQUEST_TYPE_ORG_KEY + subscription_request.organization = "foo_org" + subscription_request.activation_keys.set_secret(["key1", "key2", "key3"]) + task = RegisterAndSubscribeTask( + rhsm_observer=rhsm_observer, + subscription_request=subscription_request, + system_purpose_data=Mock(), + registered_callback=Mock(), + registered_to_satellite_callback=Mock(), + simple_content_access_callback=Mock(), + subscription_attached_callback=Mock(), + subscription_data_callback=Mock(), + satellite_script_callback=Mock(), + config_backup_callback=Mock() + ) + # make the register task throw an exception + register_org_task.return_value.run_with_signals.side_effect = RegistrationError() + # check the exception is raised as expected + with pytest.raises(RegistrationError): + task.run() + # check the register task was properly instantiated + register_org_task.assert_called_once_with( + rhsm_register_server_proxy=rhsm_observer.get_proxy.return_value, + organization='foo_org', + activation_keys=['key1', 'key2', 'key3'] + ) + # check the register task has been run + register_org_task.return_value.run_with_signals.assert_called_once() + + @patch("pyanaconda.modules.subscription.runtime.ParseSubscriptionDataTask") + @patch("pyanaconda.modules.subscription.runtime.RegisterWithOrganizationKeyTask") + def test_registration_and_subscribe(self, register_task, parse_task): + """Test RegisterAndSubscribeTask - success.""" + # create the task and related bits + rhsm_observer = Mock() + rhsm_register_server = Mock() + rhsm_syspurpose = Mock() + rhsm_observer.get_proxy.side_effect = [ + rhsm_register_server, rhsm_syspurpose + ] + subscription_request = SubscriptionRequest() + subscription_request.type = SUBSCRIPTION_REQUEST_TYPE_ORG_KEY + subscription_request.organization = "foo_org" + subscription_request.activation_keys.set_secret(["key1", "key2", "key3"]) + system_purpose_data = SystemPurposeData() + system_purpose_data.sla = "foo_sla" + subscription_attached_callback = Mock() + task = RegisterAndSubscribeTask( + rhsm_observer=rhsm_observer, + subscription_request=subscription_request, + system_purpose_data=system_purpose_data, + registered_callback=Mock(), + registered_to_satellite_callback=Mock(), + simple_content_access_callback=Mock(), + subscription_attached_callback=subscription_attached_callback, + subscription_data_callback=Mock(), + satellite_script_callback=Mock(), + config_backup_callback=Mock() + ) + # mock the Satellite provisioning method + task._provision_system_for_satellite = Mock() + # run the main task + task.run() + # check satellite provisioning was not attempted + task._provision_system_for_satellite.assert_not_called() + # check the register task was properly instantiated and run + register_task.assert_called_once_with( + rhsm_register_server_proxy=rhsm_register_server, + organization='foo_org', + activation_keys=['key1', 'key2', 'key3'] + ) + register_task.return_value.run_with_signals.assert_called_once() + # also check the callback was called correctly + subscription_attached_callback.assert_called_once_with(True) + # check the subscription parsing task has been properly instantiated and run + parse_task.assert_called_once_with( + rhsm_syspurpose_proxy=rhsm_syspurpose + ) + parse_task.return_value.run_with_signals.assert_called_once() + + @patch("pyanaconda.modules.subscription.runtime.ParseSubscriptionDataTask") + @patch("pyanaconda.modules.subscription.runtime.RegisterWithOrganizationKeyTask") + def test_registration_and_subscribe_satellite(self, register_task, parse_task): + """Test RegisterAndSubscribeTask - success with satellite provisioning.""" + # create the task and related bits + rhsm_observer = Mock() + rhsm_register_server = Mock() + rhsm_syspurpose = Mock() + rhsm_observer.get_proxy.side_effect = [ + rhsm_register_server, rhsm_syspurpose + ] + subscription_request = SubscriptionRequest() + subscription_request.type = SUBSCRIPTION_REQUEST_TYPE_ORG_KEY + subscription_request.organization = "foo_org" + subscription_request.activation_keys.set_secret(["key1", "key2", "key3"]) + subscription_request.server_hostname = "satellite.example.com" + system_purpose_data = SystemPurposeData() + system_purpose_data.sla = "foo_sla" + subscription_attached_callback = Mock() + task = RegisterAndSubscribeTask( + rhsm_observer=rhsm_observer, + subscription_request=subscription_request, + system_purpose_data=system_purpose_data, + registered_callback=Mock(), + registered_to_satellite_callback=Mock(), + simple_content_access_callback=Mock(), + subscription_attached_callback=subscription_attached_callback, + subscription_data_callback=Mock(), + satellite_script_callback=Mock(), + config_backup_callback=Mock() + ) + # mock the Satellite provisioning method + task._provision_system_for_satellite = Mock() + # run the main task + task.run() + # check satellite provisioning was attempted + task._provision_system_for_satellite.assert_called_once_with() + # check the register task was properly instantiated and run + register_task.assert_called_once_with( + rhsm_register_server_proxy=rhsm_register_server, + organization='foo_org', + activation_keys=['key1', 'key2', 'key3'] + ) + register_task.return_value.run_with_signals.assert_called_once() + # also check the callback was called correctly + subscription_attached_callback.assert_called_once_with(True) + # check the subscription parsing task has been properly instantiated and run + parse_task.assert_called_once_with( + rhsm_syspurpose_proxy=rhsm_syspurpose + ) + parse_task.return_value.run_with_signals.assert_called_once() + + @patch("pyanaconda.modules.subscription.runtime.ParseSubscriptionDataTask") + @patch("pyanaconda.modules.subscription.runtime.RegisterWithOrganizationKeyTask") + def test_registration_failure_satellite(self, register_task, parse_task): + """Test RegisterAndSubscribeTask - registration failure with satellite provisioning.""" + # create the task and related bits + rhsm_observer = Mock() + rhsm_register_server = Mock() + rhsm_entitlement = Mock() + rhsm_syspurpose = Mock() + rhsm_observer.get_proxy.side_effect = [ + rhsm_register_server, rhsm_entitlement, rhsm_syspurpose + ] + subscription_request = SubscriptionRequest() + subscription_request.type = SUBSCRIPTION_REQUEST_TYPE_ORG_KEY + subscription_request.organization = "foo_org" + subscription_request.activation_keys.set_secret(["key1", "key2", "key3"]) + subscription_request.server_hostname = "satellite.example.com" + system_purpose_data = SystemPurposeData() + system_purpose_data.sla = "foo_sla" + subscription_attached_callback = Mock() + task = RegisterAndSubscribeTask( + rhsm_observer=rhsm_observer, + subscription_request=subscription_request, + system_purpose_data=system_purpose_data, + registered_callback=Mock(), + registered_to_satellite_callback=Mock(), + simple_content_access_callback=Mock(), + subscription_attached_callback=subscription_attached_callback, + subscription_data_callback=Mock(), + satellite_script_callback=Mock(), + config_backup_callback=Mock() + ) + # mock the Satellite provisioning method + task._provision_system_for_satellite = Mock() + # mock the Satellite rollback method + task._roll_back_satellite_provisioning = Mock() + # make the register task throw an exception + register_task.return_value.run_with_signals.side_effect = RegistrationError() + # run the main task, epxect registration error + with pytest.raises(RegistrationError): + task.run() + # check satellite provisioning was attempted + task._provision_system_for_satellite.assert_called_once_with() + # check the register task was properly instantiated and run + register_task.assert_called_once_with( + rhsm_register_server_proxy=rhsm_register_server, + organization='foo_org', + activation_keys=['key1', 'key2', 'key3'] + ) + register_task.return_value.run_with_signals.assert_called_once() + # also check the callback was not called + subscription_attached_callback.assert_not_called() + # check the subscription parsing task has not been instantiated and run + parse_task.assert_not_called() + parse_task.return_value.run_with_signals.assert_not_called() + # the Satellite provisioning rollback should have been called due to the failure + task._roll_back_satellite_provisioning.assert_called_once() + + +class RetrieveOrganizationsTaskTestCase(unittest.TestCase): + """Test the organization data parsing task.""" + + def test_org_data_json_parsing(self): + """Test the organization data JSON parsing method of RetrieveOrganizationsTask.""" + parse_method = RetrieveOrganizationsTask._parse_org_data_json + # the parsing method should be able to survive also getting an empty string, + # resulting in an empty list being returned + struct = get_native( + OrganizationData.to_structure_list(parse_method("")) + ) + assert struct == [] + + # try data with single organization + single_org_data = [ + { + "key": "123abc", + "displayName": "Foo Org", + "contentAccessMode": "entitlement" + } + ] + single_org_data_json = json.dumps(single_org_data) + expected_struct_list = [ + { + "id": "123abc", + "name": "Foo Org", + } + ] + + struct = get_native( + OrganizationData.to_structure_list(parse_method(single_org_data_json)) + ) + assert struct == expected_struct_list + + # try multiple organizations: + # - one in entitlement (classic) mode + # - one in Simple Content Access mode + # - one in unknown unexpected mode (should fall back to entitlement/classic mode) + multiple_org_data = [ + { + "key": "123a", + "displayName": "Foo Org", + "contentAccessMode": "entitlement" + }, + { + "key": "123b", + "displayName": "Bar Org", + "contentAccessMode": "org_environment" + }, + { + "key": "123c", + "displayName": "Baz Org", + "contentAccessMode": "something_else" + } + ] + multiple_org_data_json = json.dumps(multiple_org_data) + expected_struct_list = [ + { + "id": "123a", + "name": "Foo Org", + }, + { + "id": "123b", + "name": "Bar Org", + }, + { + "id": "123c", + "name": "Baz Org", + } + ] + structs = get_native( + OrganizationData.to_structure_list(parse_method(multiple_org_data_json)) + ) + assert structs == expected_struct_list + + @patch("os.environ.get", return_value="en_US.UTF-8") + @patch("pyanaconda.modules.subscription.runtime.RHSMPrivateBus") + def test_get_org_data(self, private_bus, environ_get): + """Test the RetrieveOrganizationsTask.""" + # register server proxy + register_server_proxy = Mock() + # private register proxy + get_proxy = private_bus.return_value.__enter__.return_value.get_proxy + private_register_proxy = get_proxy.return_value + # mock the GetOrgs JSON output + multiple_org_data = [ + { + "key": "123a", + "displayName": "Foo Org", + "contentAccessMode": "entitlement" + }, + { + "key": "123b", + "displayName": "Bar Org", + "contentAccessMode": "org_environment" + }, + { + "key": "123c", + "displayName": "Baz Org", + "contentAccessMode": "something_else" + } + ] + multiple_org_data_json = json.dumps(multiple_org_data) + private_register_proxy.GetOrgs.return_value = multiple_org_data_json + + # instantiate the task and run it + task = RetrieveOrganizationsTask(rhsm_register_server_proxy=register_server_proxy, + username="foo_user", + password="bar_password") + org_data_structs = task.run() + # check the structs based on the JSON data look as expected + expected_struct_list = [ + { + "id": "123a", + "name": "Foo Org", + }, + { + "id": "123b", + "name": "Bar Org", + }, + { + "id": "123c", + "name": "Baz Org", + } + ] + structs = get_native( + OrganizationData.to_structure_list(org_data_structs) + ) + assert structs == expected_struct_list + + # check the private register proxy Register method was called correctly + private_register_proxy.GetOrgs.assert_called_once_with("foo_user", + "bar_password", + {}, + "en_US.UTF-8") + + @patch("os.environ.get", return_value="en_US.UTF-8") + @patch("pyanaconda.modules.subscription.runtime.RHSMPrivateBus") + def test_get_org_data_cached(self, private_bus, environ_get): + """Test the RetrieveOrganizationsTask - return cached data on error.""" + # register server proxy + register_server_proxy = Mock() + # private register proxy + get_proxy = private_bus.return_value.__enter__.return_value.get_proxy + private_register_proxy = get_proxy.return_value + # simulate GetOrgs call failure + private_register_proxy.GetOrgs.side_effect = DBusError("org listing failed") + # create some dummy cached data + cached_structs_list = [ + { + "id": get_variant(Str, "123a cached"), + "name": get_variant(Str, "Foo Org cached"), + }, + { + "id": get_variant(Str, "123b cached"), + "name": get_variant(Str, "Bar Org cached"), + }, + { + "id": get_variant(Str, "123c cached"), + "name": get_variant(Str, "Baz Org cached"), + } + ] + cached_structs = OrganizationData.from_structure_list(cached_structs_list) + RetrieveOrganizationsTask._org_data_list_cache = cached_structs + + # instantiate the task and run it with cached data + task = RetrieveOrganizationsTask(rhsm_register_server_proxy=register_server_proxy, + username="foo_user", + password="bar_password") + org_data_structs = task.run() + # check the returned structs are based on the cache data, not the + # JSON data the mock-API would return + expected_struct_list = [ + { + "id": "123a cached", + "name": "Foo Org cached", + }, + { + "id": "123b cached", + "name": "Bar Org cached", + }, + { + "id": "123c cached", + "name": "Baz Org cached", + } + ] + structs = get_native( + OrganizationData.to_structure_list(org_data_structs) + ) + assert structs == expected_struct_list + + # check the private register proxy Register method was *not* called + # as all data should come from the cache, if provided, with *no* + # DBus API access + private_register_proxy.GetOrgs.assert_called_once_with( + 'foo_user', 'bar_password', {}, 'en_US.UTF-8' + ) + + @patch("os.environ.get", return_value="en_US.UTF-8") + @patch("pyanaconda.modules.subscription.runtime.RHSMPrivateBus") + def test_get_org_data_ignore_cache(self, private_bus, environ_get): + """Test the RetrieveOrganizationsTask - do not use cache on success.""" + # register server proxy + register_server_proxy = Mock() + # private register proxy + get_proxy = private_bus.return_value.__enter__.return_value.get_proxy + private_register_proxy = get_proxy.return_value + # mock the GetOrgs JSON output + multiple_org_data = [ + { + "key": "123a", + "displayName": "Foo Org", + "contentAccessMode": "entitlement" + }, + { + "key": "123b", + "displayName": "Bar Org", + "contentAccessMode": "org_environment" + }, + { + "key": "123c", + "displayName": "Baz Org", + "contentAccessMode": "something_else" + } + ] + multiple_org_data_json = json.dumps(multiple_org_data) + private_register_proxy.GetOrgs.return_value = multiple_org_data_json + # create some dummy cached data + cached_structs_list = [ + { + "id": get_variant(Str, "123a cached"), + "name": get_variant(Str, "Foo Org cached"), + }, + { + "id": get_variant(Str, "123b cached"), + "name": get_variant(Str, "Bar Org cached"), + }, + { + "id": get_variant(Str, "123c cached"), + "name": get_variant(Str, "Baz Org cached"), + } + ] + cached_structs = OrganizationData.from_structure_list(cached_structs_list) + RetrieveOrganizationsTask._org_data_list_cache = cached_structs + + # instantiate the task and run it with cached data + task = RetrieveOrganizationsTask(rhsm_register_server_proxy=register_server_proxy, + username="foo_user", + password="bar_password") + org_data_structs = task.run() + + # check the structs based on the GetOrgs returned JSON data look as expected + expected_struct_list = [ + { + "id": "123a", + "name": "Foo Org", + }, + { + "id": "123b", + "name": "Bar Org", + }, + { + "id": "123c", + "name": "Baz Org", + } + ] + structs = get_native( + OrganizationData.to_structure_list(org_data_structs) + ) + assert structs == expected_struct_list + + # check the private register proxy Register method was *not* called + # as all data should come from the cache, if provided, with *no* + # DBus API access + private_register_proxy.GetOrgs.assert_called_once_with( + 'foo_user', 'bar_password', {}, 'en_US.UTF-8' + ) + + @patch("os.environ.get", return_value="en_US.UTF-8") + @patch("pyanaconda.modules.subscription.runtime.RHSMPrivateBus") + def test_get_org_data_cache_reset(self, private_bus, environ_get): + """Test the RetrieveOrganizationsTask - test cache reset.""" + # register server proxy + register_server_proxy = Mock() + # private register proxy + get_proxy = private_bus.return_value.__enter__.return_value.get_proxy + private_register_proxy = get_proxy.return_value + # simulate GetOrgs call failure + private_register_proxy.GetOrgs.side_effect = DBusError("org listing failed") + # create some dummy cached data + cached_structs_list = [ + { + "id": get_variant(Str, "123a cached"), + "name": get_variant(Str, "Foo Org cached"), + }, + { + "id": get_variant(Str, "123b cached"), + "name": get_variant(Str, "Bar Org cached"), + }, + { + "id": get_variant(Str, "123c cached"), + "name": get_variant(Str, "Baz Org cached"), + } + ] + cached_structs = OrganizationData.from_structure_list(cached_structs_list) + RetrieveOrganizationsTask._org_data_list_cache = cached_structs + + # instantiate the task and run it with cached data + task = RetrieveOrganizationsTask(rhsm_register_server_proxy=register_server_proxy, + username="foo_user", + password="bar_password", + reset_cache=True) + org_data_structs = task.run() + # we dropped the cache and the GetOrgs() call failed, so we return the + # contents of the empty cache + expected_struct_list = [] + structs = get_native( + OrganizationData.to_structure_list(org_data_structs) + ) + assert structs == expected_struct_list + + # check the private register proxy Register method was *not* called + # as all data should come from the cache, if provided, with *no* + # DBus API access + private_register_proxy.GetOrgs.assert_called_once_with( + 'foo_user', 'bar_password', {}, 'en_US.UTF-8' + ) diff --git a/tests/unit_tests/pyanaconda_tests/test_subscription_helpers.py b/tests/unit_tests/pyanaconda_tests/test_subscription_helpers.py index 606b780ba2e..afe2931e7dd 100644 --- a/tests/unit_tests/pyanaconda_tests/test_subscription_helpers.py +++ b/tests/unit_tests/pyanaconda_tests/test_subscription_helpers.py @@ -21,26 +21,26 @@ import tempfile import unittest -from unittest.mock import patch, Mock, call +from unittest.mock import patch, Mock, MagicMock, call from dasbus.typing import * # pylint: disable=wildcard-import -from tests.unit_tests.pyanaconda_tests import patch_dbus_get_proxy_with_cache - from pyanaconda.core.path import join_paths from pyanaconda.core.constants import RHSM_SYSPURPOSE_FILE_PATH, \ THREAD_WAIT_FOR_CONNECTING_NM, SUBSCRIPTION_REQUEST_TYPE_USERNAME_PASSWORD, \ SUBSCRIPTION_REQUEST_TYPE_ORG_KEY, SOURCE_TYPE_CLOSEST_MIRROR, \ SOURCE_TYPE_CDN, SOURCE_TYPE_CDROM, PAYLOAD_TYPE_DNF, PAYLOAD_TYPE_RPM_OSTREE, \ SOURCE_TYPE_URL -from pyanaconda.core.subscription import check_system_purpose_set -from pyanaconda.modules.common.constants.services import BOSS, SUBSCRIPTION + from pyanaconda.modules.common.errors.subscription import UnregistrationError, \ - RegistrationError, SubscriptionError + RegistrationError, SatelliteProvisioningError from pyanaconda.modules.common.structures.subscription import SubscriptionRequest + +from pyanaconda.core.subscription import check_system_purpose_set + from pyanaconda.ui.lib.subscription import SubscriptionPhase, \ register_and_subscribe, unregister, org_keys_sufficient, \ - username_password_sufficient, check_cdn_is_installation_source, is_cdn_registration_required + username_password_sufficient, check_cdn_is_installation_source class CheckSystemPurposeSetTestCase(unittest.TestCase): @@ -215,18 +215,14 @@ def test_username_password_sufficient_direct_request(self): @patch("pyanaconda.modules.common.task.sync_run_task") @patch("pyanaconda.core.threads.thread_manager.wait") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") - def test_register_org_key(self, get_proxy, thread_mgr_wait, run_task, switch_source): - """Test the register_and_subscribe() helper method - org & key.""" + def test_register_success(self, get_proxy, thread_mgr_wait, run_task, switch_source): + """Test the register_and_subscribe() helper method - success.""" payload = Mock() - source_proxy = payload.get_source_proxy.return_value - source_proxy.Type = SOURCE_TYPE_CLOSEST_MIRROR progress_callback = Mock() error_callback = Mock() subscription_proxy = get_proxy.return_value # simulate the system not being registered subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.KEY_REQUEST # run the function register_and_subscribe(payload=payload, progress_callback=progress_callback, @@ -236,16 +232,13 @@ def test_register_org_key(self, get_proxy, thread_mgr_wait, run_task, switch_sou # system was no registered, so no unregistration phase progress_callback.assert_has_calls( [call(SubscriptionPhase.REGISTER), - call(SubscriptionPhase.ATTACH_SUBSCRIPTION), call(SubscriptionPhase.DONE)] ) # we were successful, so no error callback calls error_callback.assert_not_called() # we should have requested the appropriate tasks subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - subscription_proxy.RegisterOrganizationKeyWithTask.assert_called_once() - subscription_proxy.AttachSubscriptionWithTask.assert_called_once() - subscription_proxy.ParseAttachedSubscriptionsWithTask.assert_called_once() + subscription_proxy.RegisterAndSubscribeWithTask.assert_called_once() # not tried to set the CDN source switch_source.assert_not_called() # and tried to run them @@ -253,48 +246,7 @@ def test_register_org_key(self, get_proxy, thread_mgr_wait, run_task, switch_sou @patch("pyanaconda.ui.lib.subscription.switch_source") @patch("pyanaconda.modules.common.task.sync_run_task") - @patch("pyanaconda.core.threads.thread_manager.wait") - @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") - def test_register_username_password(self, get_proxy, thread_mgr_wait, run_task, switch_source): - """Test the register_and_subscribe() helper method - username & password.""" - payload = Mock() - source_proxy = payload.get_source_proxy.return_value - source_proxy.Type = SOURCE_TYPE_CLOSEST_MIRROR - progress_callback = Mock() - error_callback = Mock() - subscription_proxy = get_proxy.return_value - # simulate the system not being registered - subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.PASSWORD_REQUEST - # run the function - register_and_subscribe(payload=payload, - progress_callback=progress_callback, - error_callback=error_callback) - # we should have waited on network - thread_mgr_wait.assert_called_once_with(THREAD_WAIT_FOR_CONNECTING_NM) - # system was no registered, so no unregistration phase - print(error_callback.mock_calls) - progress_callback.assert_has_calls( - [call(SubscriptionPhase.REGISTER), - call(SubscriptionPhase.ATTACH_SUBSCRIPTION), - call(SubscriptionPhase.DONE)] - ) - # we were successful, so no error callback calls - error_callback.assert_not_called() - # we should have requested the appropriate tasks - subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - subscription_proxy.RegisterUsernamePasswordWithTask.assert_called_once() - subscription_proxy.AttachSubscriptionWithTask.assert_called_once() - subscription_proxy.ParseAttachedSubscriptionsWithTask.assert_called_once() - # not tried to set the CDN source - switch_source.assert_not_called() - # and tried to run them - run_task.assert_called() - - @patch("pyanaconda.ui.lib.subscription.switch_source") - @patch("pyanaconda.modules.common.task.sync_run_task") - @patch("pyanaconda.core.threads.thread_manager.wait") + @patch("pyanaconda.threading.threadMgr.wait") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") def test_unregister_register(self, get_proxy, thread_mgr_wait, run_task, switch_source): """Test the register_and_subscribe() helper method - registered system.""" @@ -307,8 +259,6 @@ def test_unregister_register(self, get_proxy, thread_mgr_wait, run_task, switch_ # simulate the system being registered, # - this should add additional unregister phase and task subscription_proxy.IsRegistered = True - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.KEY_REQUEST # run the function register_and_subscribe(payload=payload, progress_callback=progress_callback, @@ -319,7 +269,6 @@ def test_unregister_register(self, get_proxy, thread_mgr_wait, run_task, switch_ progress_callback.assert_has_calls( [call(SubscriptionPhase.UNREGISTER), call(SubscriptionPhase.REGISTER), - call(SubscriptionPhase.ATTACH_SUBSCRIPTION), call(SubscriptionPhase.DONE)] ) # we were successful, so no error callback calls @@ -327,9 +276,7 @@ def test_unregister_register(self, get_proxy, thread_mgr_wait, run_task, switch_ # we should have requested the appropriate tasks subscription_proxy.SetRHSMConfigWithTask.assert_called_once() subscription_proxy.UnregisterWithTask.assert_called_once() - subscription_proxy.RegisterOrganizationKeyWithTask.assert_called_once() - subscription_proxy.AttachSubscriptionWithTask.assert_called_once() - subscription_proxy.ParseAttachedSubscriptionsWithTask.assert_called_once() + subscription_proxy.SetRHSMConfigWithTask.assert_called_once() # not tried to set the CDN source switch_source.assert_not_called() # and tried to run them @@ -348,10 +295,9 @@ def test_unregister_task_failed(self, get_proxy, thread_mgr_wait, run_task, swit # simulate the system being registered, # - this should add additional unregister phase and task subscription_proxy.IsRegistered = True - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.KEY_REQUEST # make the first (unregistration) task fail - run_task.side_effect = [True, UnregistrationError("unregistration failed")] + unregistration_error = UnregistrationError("unregistration failed") + run_task.side_effect = [True, unregistration_error] # run the function register_and_subscribe(payload=payload, progress_callback=progress_callback, @@ -363,7 +309,7 @@ def test_unregister_task_failed(self, get_proxy, thread_mgr_wait, run_task, swit [call(SubscriptionPhase.UNREGISTER)] ) # and the error callback should have been triggered - error_callback.assert_called_once_with("unregistration failed") + error_callback.assert_called_once_with(unregistration_error) # we should have requested the appropriate tasks subscription_proxy.SetRHSMConfigWithTask.assert_called_once() subscription_proxy.UnregisterWithTask.assert_called_once() @@ -377,18 +323,17 @@ def test_unregister_task_failed(self, get_proxy, thread_mgr_wait, run_task, swit @patch("pyanaconda.modules.common.task.sync_run_task") @patch("pyanaconda.core.threads.thread_manager.wait") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") - def test_register_org_key_failed(self, get_proxy, thread_mgr_wait, run_task, switch_source): - """Test the register_and_subscribe() helper method - org & key failed.""" + def test_sat_provisioning_failed(self, get_proxy, thread_mgr_wait, run_task, switch_source): + """Test the register_and_subscribe() helper method - Satellite provisioning failed.""" payload = Mock() progress_callback = Mock() error_callback = Mock() subscription_proxy = get_proxy.return_value # simulate the system not being registered subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.KEY_REQUEST # make the first (registration) task fail - run_task.side_effect = [True, RegistrationError("registration failed")] + sat_error = SatelliteProvisioningError("Satellite provisioning failed") + run_task.side_effect = [True, sat_error] # run the function register_and_subscribe(payload=payload, progress_callback=progress_callback, @@ -400,10 +345,10 @@ def test_register_org_key_failed(self, get_proxy, thread_mgr_wait, run_task, swi [call(SubscriptionPhase.REGISTER)] ) # and the error callback should have been triggered - error_callback.assert_called_once_with("registration failed") + error_callback.assert_called_once_with(sat_error) # we should have requested the appropriate tasks subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - subscription_proxy.RegisterOrganizationKeyWithTask.assert_called_once() + subscription_proxy.RegisterAndSubscribeWithTask.assert_called_once() # and tried to run them run_task.assert_called() # setting CDN as installation source does not make sense @@ -414,53 +359,17 @@ def test_register_org_key_failed(self, get_proxy, thread_mgr_wait, run_task, swi @patch("pyanaconda.modules.common.task.sync_run_task") @patch("pyanaconda.core.threads.thread_manager.wait") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") - def test_register_key_missing(self, get_proxy, thread_mgr_wait, run_task, switch_source): - """Test the register_and_subscribe() helper method - key missing.""" - payload = Mock() - progress_callback = Mock() - error_callback = Mock() - subscription_proxy = get_proxy.return_value - # simulate the system not being registered - subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.KEY_MISSING_REQUEST - # run the function - register_and_subscribe(payload=payload, - progress_callback=progress_callback, - error_callback=error_callback) - # we should have waited on network - thread_mgr_wait.assert_called_once_with(THREAD_WAIT_FOR_CONNECTING_NM) - # there should be only the registration phase - progress_callback.assert_has_calls( - [call(SubscriptionPhase.REGISTER)] - ) - # and the error callback should have been triggered - error_callback.assert_called_once() - # in this case we fail before requesting any other task than - # the config one - subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - run_task.assert_called() - # setting CDN as installation source does not make sense - # when we were not able to attach a subscription - switch_source.assert_not_called() - - @patch("pyanaconda.ui.lib.subscription.switch_source") - @patch("pyanaconda.modules.common.task.sync_run_task") - @patch("pyanaconda.core.threads.thread_manager.wait") - @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") - def test_register_username_password_task_failed(self, get_proxy, thread_mgr_wait, - run_task, switch_source): - """Test the register_and_subscribe() helper method - username & password failed.""" + def test_register_failed(self, get_proxy, thread_mgr_wait, run_task, switch_source): + """Test the register_and_subscribe() helper method - failed to register.""" payload = Mock() progress_callback = Mock() error_callback = Mock() subscription_proxy = get_proxy.return_value # simulate the system not being registered subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.PASSWORD_REQUEST # make the first (registration) task fail - run_task.side_effect = [True, RegistrationError("registration failed")] + registration_error = RegistrationError("registration failed") + run_task.side_effect = [True, registration_error] # run the function register_and_subscribe(payload=payload, progress_callback=progress_callback, @@ -472,60 +381,27 @@ def test_register_username_password_task_failed(self, get_proxy, thread_mgr_wait [call(SubscriptionPhase.REGISTER)] ) # and the error callback should have been triggered - error_callback.assert_called_once_with("registration failed") + error_callback.assert_called_once_with(registration_error) # we should have requested the appropriate tasks subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - subscription_proxy.RegisterUsernamePasswordWithTask.assert_called_once() + subscription_proxy.RegisterAndSubscribeWithTask.assert_called_once() # and tried to run them run_task.assert_called() # setting CDN as installation source does not make sense # when we were not able to attach a subscription switch_source.assert_not_called() - @patch("pyanaconda.ui.lib.subscription.switch_source") - @patch("pyanaconda.modules.common.task.sync_run_task") - @patch("pyanaconda.core.threads.thread_manager.wait") - @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") - def test_register_password_missing(self, get_proxy, thread_mgr_wait, run_task, switch_source): - """Test the register_and_subscribe() helper method - password missing.""" - payload = Mock() - progress_callback = Mock() - error_callback = Mock() - subscription_proxy = get_proxy.return_value - # simulate the system not being registered - subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.PASSWORD_MISSING_REQUEST - # run the function - register_and_subscribe(payload=payload, - progress_callback=progress_callback, - error_callback=error_callback) - # we should have waited on network - thread_mgr_wait.assert_called_once_with(THREAD_WAIT_FOR_CONNECTING_NM) - # there should be only the registration phase - progress_callback.assert_has_calls( - [call(SubscriptionPhase.REGISTER)] - ) - # and the error callback should have been triggered - error_callback.assert_called_once() - # in this case we fail before requesting any other task than - # the config one - subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - run_task.assert_called() - # setting CDN as installation source does not make sense - # when we were not able to attach a subscription - switch_source.assert_not_called() - @patch("pyanaconda.payload.manager.payloadMgr.start") @patch("pyanaconda.ui.lib.subscription.switch_source") @patch("pyanaconda.modules.common.task.sync_run_task") - @patch("pyanaconda.core.threads.thread_manager.wait") + @patch("pyanaconda.threading.threadMgr.wait") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") def test_register_override_cdrom(self, get_proxy, thread_mgr_wait, run_task, switch_source, - restart_thread): + start_thread): """Test the register_and_subscribe() helper method - override CDROM source.""" payload = Mock() payload.type = PAYLOAD_TYPE_DNF + payload.data.repo.dataList = MagicMock(return_value=[]) source_proxy_1 = Mock() source_proxy_1.Type = SOURCE_TYPE_CDROM source_proxy_2 = Mock() @@ -539,8 +415,6 @@ def test_register_override_cdrom(self, get_proxy, thread_mgr_wait, run_task, swi subscription_proxy = get_proxy.return_value # simulate the system not being registered subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.KEY_REQUEST # run the function register_and_subscribe(payload=payload, progress_callback=progress_callback, @@ -551,24 +425,21 @@ def test_register_override_cdrom(self, get_proxy, thread_mgr_wait, run_task, swi # system was no registered, so no unregistration phase progress_callback.assert_has_calls( [call(SubscriptionPhase.REGISTER), - call(SubscriptionPhase.ATTACH_SUBSCRIPTION), call(SubscriptionPhase.DONE)] ) # we were successful, so no error callback calls error_callback.assert_not_called() # we should have requested the appropriate tasks subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - subscription_proxy.RegisterOrganizationKeyWithTask.assert_called_once() - subscription_proxy.AttachSubscriptionWithTask.assert_called_once() - subscription_proxy.ParseAttachedSubscriptionsWithTask.assert_called_once() + subscription_proxy.RegisterAndSubscribeWithTask.assert_called_once() # and tried to override the CDROM source, as it is on a list of sources - # that are appropriate to be overriden by the CDN source + # that are appropriate to be overridden by the CDN source switch_source.assert_called_once_with(payload, SOURCE_TYPE_CDN) # and tried to run them run_task.assert_called() # tried to restart the payload as CDN is set and we need to restart # the payload to make it usable - restart_thread.assert_called_once() + start_thread.assert_called_once() @patch("pyanaconda.payload.manager.payloadMgr.start") @patch("pyanaconda.ui.lib.subscription.switch_source") @@ -576,10 +447,11 @@ def test_register_override_cdrom(self, get_proxy, thread_mgr_wait, run_task, swi @patch("pyanaconda.core.threads.thread_manager.wait") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") def test_register_override_cdrom_no_restart(self, get_proxy, thread_mgr_wait, run_task, - switch_source, restart_thread): + switch_source, start_thread): """Test the register_and_subscribe() helper method - override CDROM source, no restart.""" payload = Mock() payload.type = PAYLOAD_TYPE_DNF + payload.data.repo.dataList = MagicMock(return_value=[]) source_proxy_1 = Mock() source_proxy_1.Type = SOURCE_TYPE_CDROM source_proxy_2 = Mock() @@ -593,8 +465,6 @@ def test_register_override_cdrom_no_restart(self, get_proxy, thread_mgr_wait, ru subscription_proxy = get_proxy.return_value # simulate the system not being registered subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.KEY_REQUEST # run the function & tell it not to restart payload register_and_subscribe(payload=payload, progress_callback=progress_callback, @@ -605,62 +475,20 @@ def test_register_override_cdrom_no_restart(self, get_proxy, thread_mgr_wait, ru # system was no registered, so no unregistration phase progress_callback.assert_has_calls( [call(SubscriptionPhase.REGISTER), - call(SubscriptionPhase.ATTACH_SUBSCRIPTION), call(SubscriptionPhase.DONE)] ) # we were successful, so no error callback calls error_callback.assert_not_called() # we should have requested the appropriate tasks subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - subscription_proxy.RegisterOrganizationKeyWithTask.assert_called_once() - subscription_proxy.AttachSubscriptionWithTask.assert_called_once() - subscription_proxy.ParseAttachedSubscriptionsWithTask.assert_called_once() + subscription_proxy.RegisterAndSubscribeWithTask.assert_called_once() # and tried to override the CDROM source, as it is on a list of sources - # that are appropriate to be overriden by the CDN source + # that are appropriate to be overridden by the CDN source switch_source.assert_called_once_with(payload, SOURCE_TYPE_CDN) # and tried to run them run_task.assert_called() # we told the payload not to restart - restart_thread.assert_not_called() - - @patch("pyanaconda.ui.lib.subscription.switch_source") - @patch("pyanaconda.modules.common.task.sync_run_task") - @patch("pyanaconda.core.threads.thread_manager.wait") - @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") - def test_subscription_task_failed(self, get_proxy, thread_mgr_wait, run_task, switch_source): - """Test the register_and_subscribe() helper method - failed to attach subscription.""" - payload = Mock() - progress_callback = Mock() - error_callback = Mock() - subscription_proxy = get_proxy.return_value - # simulate the system not being registered - subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.PASSWORD_REQUEST - # make the second (subscription) task fail - run_task.side_effect = [True, True, SubscriptionError("failed to attach subscription")] - # run the function - register_and_subscribe(payload=payload, - progress_callback=progress_callback, - error_callback=error_callback) - # we should have waited on network - thread_mgr_wait.assert_called_once_with(THREAD_WAIT_FOR_CONNECTING_NM) - # there should be only the registration & subscription phase - progress_callback.assert_has_calls( - [call(SubscriptionPhase.REGISTER), - call(SubscriptionPhase.ATTACH_SUBSCRIPTION)] - ) - # and the error callback should have been triggered - error_callback.assert_called_once_with("failed to attach subscription") - # we should have requested the appropriate tasks - subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - subscription_proxy.RegisterUsernamePasswordWithTask.assert_called_once() - subscription_proxy.AttachSubscriptionWithTask.assert_called_once() - # and tried to run them - run_task.assert_called() - # setting CDN as installation source does not make sense - # when we were not able to attach a subscription - switch_source.assert_not_called() + start_thread.assert_not_called() @patch("pyanaconda.modules.common.task.sync_run_task") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") @@ -726,7 +554,8 @@ def test_unregister_failed(self, get_proxy, run_task): # simulate the system being registered, subscription_proxy.IsRegistered = True # make the unregistration task fail - run_task.side_effect = [True, UnregistrationError("unregistration failed")] + unregistration_error = UnregistrationError("unregistration failed") + run_task.side_effect = [True, unregistration_error] # run the function unregister(payload=payload, overridden_source_type=None, @@ -737,7 +566,7 @@ def test_unregister_failed(self, get_proxy, run_task): [call(SubscriptionPhase.UNREGISTER)] ) # and the error callback should have been triggered - error_callback.assert_called_once_with("unregistration failed") + error_callback.assert_called_once_with(unregistration_error) # we should have requested the appropriate tasks subscription_proxy.SetRHSMConfigWithTask.assert_called_once() subscription_proxy.UnregisterWithTask.assert_called_once() @@ -803,32 +632,6 @@ def test_check_cdn_is_installation_source(self): ostree_payload.type = PAYLOAD_TYPE_RPM_OSTREE assert not check_cdn_is_installation_source(ostree_payload) - @patch_dbus_get_proxy_with_cache - def test_is_cdn_registration_required(self, proxy_getter): - """Test the is_cdn_registration_required function.""" - dnf_payload = Mock() - dnf_payload.type = PAYLOAD_TYPE_DNF - - source_proxy = dnf_payload.get_source_proxy.return_value - source_proxy.Type = SOURCE_TYPE_CDN - - boss_proxy = BOSS.get_proxy() - boss_proxy.GetModules.return_value = [SUBSCRIPTION.service_name] - - subscription_proxy = SUBSCRIPTION.get_proxy() - subscription_proxy.IsSubscriptionAttached = False - - assert is_cdn_registration_required(dnf_payload) is True - - subscription_proxy.IsSubscriptionAttached = True - assert is_cdn_registration_required(dnf_payload) is False - - boss_proxy.GetModules.return_value = [] - assert is_cdn_registration_required(dnf_payload) is False - - source_proxy.Type = SOURCE_TYPE_CDROM - assert is_cdn_registration_required(dnf_payload) is False - @patch("pyanaconda.ui.lib.subscription.switch_source") @patch("pyanaconda.modules.common.task.sync_run_task") @patch("pyanaconda.core.threads.thread_manager.wait") @@ -842,8 +645,6 @@ def test_unsupported_payload_reg(self, get_proxy, thread_mgr_wait, run_task, swi subscription_proxy = get_proxy.return_value # simulate the system not being registered subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.PASSWORD_REQUEST # run the function register_and_subscribe(payload=payload, progress_callback=progress_callback, @@ -854,16 +655,13 @@ def test_unsupported_payload_reg(self, get_proxy, thread_mgr_wait, run_task, swi print(error_callback.mock_calls) progress_callback.assert_has_calls( [call(SubscriptionPhase.REGISTER), - call(SubscriptionPhase.ATTACH_SUBSCRIPTION), call(SubscriptionPhase.DONE)] ) # we were successful, so no error callback calls error_callback.assert_not_called() # we should have requested the appropriate tasks subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - subscription_proxy.RegisterUsernamePasswordWithTask.assert_called_once() - subscription_proxy.AttachSubscriptionWithTask.assert_called_once() - subscription_proxy.ParseAttachedSubscriptionsWithTask.assert_called_once() + subscription_proxy.RegisterAndSubscribeWithTask.assert_called_once() # not tried to set the CDN source switch_source.assert_not_called() # and tried to run them @@ -908,7 +706,7 @@ def test_unsupported_payload_unregister(self, get_proxy, run_task, switch_source @patch("pyanaconda.core.threads.thread_manager.wait") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") def test_register_payload_restart(self, get_proxy, thread_mgr_wait, run_task, switch_source, - restart_thread): + start_thread): """Test payload restart at registration.""" payload = Mock() payload.type = PAYLOAD_TYPE_DNF @@ -919,8 +717,6 @@ def test_register_payload_restart(self, get_proxy, thread_mgr_wait, run_task, sw subscription_proxy = get_proxy.return_value # simulate the system not being registered subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.PASSWORD_REQUEST # run the function register_and_subscribe(payload=payload, progress_callback=progress_callback, @@ -932,21 +728,18 @@ def test_register_payload_restart(self, get_proxy, thread_mgr_wait, run_task, sw print(error_callback.mock_calls) progress_callback.assert_has_calls( [call(SubscriptionPhase.REGISTER), - call(SubscriptionPhase.ATTACH_SUBSCRIPTION), call(SubscriptionPhase.DONE)] ) # we were successful, so no error callback calls error_callback.assert_not_called() # we should have requested the appropriate tasks subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - subscription_proxy.RegisterUsernamePasswordWithTask.assert_called_once() - subscription_proxy.AttachSubscriptionWithTask.assert_called_once() - subscription_proxy.ParseAttachedSubscriptionsWithTask.assert_called_once() + subscription_proxy.RegisterAndSubscribeWithTask.assert_called_once() # and tried to run them run_task.assert_called() # tried to restart the payload as CDN is set and we need to restart # the payload to make it usable - restart_thread.assert_called_once() + start_thread.assert_called_once() @patch("pyanaconda.payload.manager.payloadMgr.start") @patch("pyanaconda.ui.lib.subscription.switch_source") @@ -954,7 +747,7 @@ def test_register_payload_restart(self, get_proxy, thread_mgr_wait, run_task, sw @patch("pyanaconda.core.threads.thread_manager.wait") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") def test_register_payload_no_restart(self, get_proxy, thread_mgr_wait, run_task, switch_source, - restart_thread): + start_thread): """Test payload no restart at registration if not requested.""" payload = Mock() payload.type = PAYLOAD_TYPE_DNF @@ -965,8 +758,6 @@ def test_register_payload_no_restart(self, get_proxy, thread_mgr_wait, run_task, subscription_proxy = get_proxy.return_value # simulate the system not being registered subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.PASSWORD_REQUEST # run the function register_and_subscribe(payload=payload, progress_callback=progress_callback, @@ -978,20 +769,17 @@ def test_register_payload_no_restart(self, get_proxy, thread_mgr_wait, run_task, print(error_callback.mock_calls) progress_callback.assert_has_calls( [call(SubscriptionPhase.REGISTER), - call(SubscriptionPhase.ATTACH_SUBSCRIPTION), call(SubscriptionPhase.DONE)] ) # we were successful, so no error callback calls error_callback.assert_not_called() # we should have requested the appropriate tasks subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - subscription_proxy.RegisterUsernamePasswordWithTask.assert_called_once() - subscription_proxy.AttachSubscriptionWithTask.assert_called_once() - subscription_proxy.ParseAttachedSubscriptionsWithTask.assert_called_once() + subscription_proxy.RegisterAndSubscribeWithTask.assert_called_once() # and tried to run them run_task.assert_called() # told the helper method not to restart - restart_thread.assert_not_called() + start_thread.assert_not_called() @patch("pyanaconda.payload.manager.payloadMgr.start") @patch("pyanaconda.ui.lib.subscription.switch_source") @@ -999,7 +787,7 @@ def test_register_payload_no_restart(self, get_proxy, thread_mgr_wait, run_task, @patch("pyanaconda.core.threads.thread_manager.wait") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") def test_register_no_payload_restart(self, get_proxy, thread_mgr_wait, run_task, switch_source, - restart_thread): + start_thread): """Test there is no payload restart during registration for non CDN source.""" payload = Mock() payload.type = PAYLOAD_TYPE_DNF @@ -1010,8 +798,6 @@ def test_register_no_payload_restart(self, get_proxy, thread_mgr_wait, run_task, subscription_proxy = get_proxy.return_value # simulate the system not being registered subscription_proxy.IsRegistered = False - # simulate subscription request - subscription_proxy.SubscriptionRequest = self.PASSWORD_REQUEST # run the function register_and_subscribe(payload=payload, progress_callback=progress_callback, @@ -1023,16 +809,13 @@ def test_register_no_payload_restart(self, get_proxy, thread_mgr_wait, run_task, print(error_callback.mock_calls) progress_callback.assert_has_calls( [call(SubscriptionPhase.REGISTER), - call(SubscriptionPhase.ATTACH_SUBSCRIPTION), call(SubscriptionPhase.DONE)] ) # we were successful, so no error callback calls error_callback.assert_not_called() # we should have requested the appropriate tasks subscription_proxy.SetRHSMConfigWithTask.assert_called_once() - subscription_proxy.RegisterUsernamePasswordWithTask.assert_called_once() - subscription_proxy.AttachSubscriptionWithTask.assert_called_once() - subscription_proxy.ParseAttachedSubscriptionsWithTask.assert_called_once() + subscription_proxy.RegisterAndSubscribeWithTask.assert_called_once() # not tried to set the CDN source switch_source.assert_not_called() # and tried to run them @@ -1040,12 +823,12 @@ def test_register_no_payload_restart(self, get_proxy, thread_mgr_wait, run_task, # Payload should have not been restarted as URL is the current # installation source. This usually means custom user provided URL # that is already all configured and we should not touch it. - restart_thread.assert_not_called() + start_thread.assert_not_called() @patch("pyanaconda.payload.manager.payloadMgr.start") @patch("pyanaconda.modules.common.task.sync_run_task") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") - def test_unregister_payload_restart_CDN(self, get_proxy, run_task, restart_thread): + def test_unregister_payload_restart_CDN(self, get_proxy, run_task, start_thread): """Test payload restart at unregistration - CDN source.""" payload = Mock() payload.type = PAYLOAD_TYPE_DNF @@ -1078,13 +861,13 @@ def test_unregister_payload_restart_CDN(self, get_proxy, run_task, restart_threa # source being the CDN, as it is no longer usable without # registration and we need payload restart for this # to propagate to the Source and Software spokes - restart_thread.assert_called_once() + start_thread.assert_called_once() @patch("pyanaconda.ui.lib.subscription.switch_source") @patch("pyanaconda.payload.manager.payloadMgr.start") @patch("pyanaconda.modules.common.task.sync_run_task") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") - def test_unregister_payload_restart_switched(self, get_proxy, run_task, restart_thread, + def test_unregister_payload_restart_switched(self, get_proxy, run_task, start_thread, switch_source): """Test payload restart at unregistration - source switched.""" payload = Mock() @@ -1121,13 +904,13 @@ def test_unregister_payload_restart_switched(self, get_proxy, run_task, restart_ # (happens for the CDROM source at the moment) # and we need payload restart for this to propagate # to the Source and Software spokes - restart_thread.assert_called_once() + start_thread.assert_called_once() @patch("pyanaconda.ui.lib.subscription.switch_source") @patch("pyanaconda.payload.manager.payloadMgr.start") @patch("pyanaconda.modules.common.task.sync_run_task") @patch("pyanaconda.modules.common.constants.services.SUBSCRIPTION.get_proxy") - def test_unregister_on_payload_restart(self, get_proxy, run_task, restart_thread, + def test_unregister_on_payload_restart(self, get_proxy, run_task, start_thread, switch_source): """Test payload restart at unregistration - no restart needed.""" payload = Mock() @@ -1162,4 +945,4 @@ def test_unregister_on_payload_restart(self, get_proxy, run_task, restart_thread # we should not have tried to restart the payload # as we are on the URL source and we don't need to change # anything about it when we unregister - restart_thread.assert_not_called() + start_thread.assert_not_called()