diff --git a/config/kdump.py b/config/kdump.py index c61e79099d..0b7e3cb5a6 100644 --- a/config/kdump.py +++ b/config/kdump.py @@ -1,18 +1,15 @@ import sys - import click from utilities_common.cli import AbbreviationGroup, pass_db - - # # 'kdump' group ('sudo config kdump ...') # + @click.group(cls=AbbreviationGroup, name="kdump") def kdump(): """Configure the KDUMP mechanism""" pass - def check_kdump_table_existence(kdump_table): """Checks whether the 'KDUMP' table is configured in Config DB. @@ -32,10 +29,16 @@ def check_kdump_table_existence(kdump_table): sys.exit(2) +def echo_reboot_warning(): + """Prints the warning message about reboot requirements.""" + click.echo("KDUMP configuration changes may require a reboot to take effect.") + click.echo("Save SONiC configuration using 'config save' before issuing the reboot command.") # # 'disable' command ('sudo config kdump disable') # -@kdump.command(name="disable", short_help="Disable the KDUMP mechanism") + + +@kdump.command(name="disable", help="Disable the KDUMP mechanism") @pass_db def kdump_disable(db): """Disable the KDUMP mechanism""" @@ -43,14 +46,14 @@ def kdump_disable(db): check_kdump_table_existence(kdump_table) db.cfgdb.mod_entry("KDUMP", "config", {"enabled": "false"}) - click.echo("KDUMP configuration changes may require a reboot to take effect.") - click.echo("Save SONiC configuration using 'config save' before issuing the reboot command.") - + echo_reboot_warning() # # 'enable' command ('sudo config kdump enable') # -@kdump.command(name="enable", short_help="Enable the KDUMP mechanism") + + +@kdump.command(name="enable", help="Enable the KDUMP mechanism") @pass_db def kdump_enable(db): """Enable the KDUMP mechanism""" @@ -58,14 +61,14 @@ def kdump_enable(db): check_kdump_table_existence(kdump_table) db.cfgdb.mod_entry("KDUMP", "config", {"enabled": "true"}) - click.echo("KDUMP configuration changes may require a reboot to take effect.") - click.echo("Save SONiC configuration using 'config save' before issuing the reboot command.") - + echo_reboot_warning() # # 'memory' command ('sudo config kdump memory ...') # -@kdump.command(name="memory", short_help="Configure the memory for KDUMP mechanism") + + +@kdump.command(name="memory", help="Configure the memory for KDUMP mechanism") @click.argument('kdump_memory', metavar='', required=True) @pass_db def kdump_memory(db, kdump_memory): @@ -74,14 +77,14 @@ def kdump_memory(db, kdump_memory): check_kdump_table_existence(kdump_table) db.cfgdb.mod_entry("KDUMP", "config", {"memory": kdump_memory}) - click.echo("KDUMP configuration changes may require a reboot to take effect.") - click.echo("Save SONiC configuration using 'config save' before issuing the reboot command.") - + echo_reboot_warning() # -# 'num_dumps' command ('sudo config keump num_dumps ...') +# 'num_dumps' command ('sudo config kdump num_dumps ...') # -@kdump.command(name="num_dumps", short_help="Configure the maximum dump files of KDUMP mechanism") + + +@kdump.command(name="num_dumps", help="Configure the maximum dump files of KDUMP mechanism") @click.argument('kdump_num_dumps', metavar='', required=True, type=int) @pass_db def kdump_num_dumps(db, kdump_num_dumps): @@ -90,3 +93,112 @@ def kdump_num_dumps(db, kdump_num_dumps): check_kdump_table_existence(kdump_table) db.cfgdb.mod_entry("KDUMP", "config", {"num_dumps": kdump_num_dumps}) + echo_reboot_warning() + + +@kdump.command(name="remote", help="Configure the remote enable/disable for KDUMP mechanism") +@click.argument('action', metavar='', required=True, type=click.STRING) # Corrected this line +@pass_db +def remote(db, action): + """Enable or disable remote kdump feature""" + kdump_table = db.cfgdb.get_table("KDUMP") + check_kdump_table_existence(kdump_table) + + # Get the current status of the remote feature as string + current_status = kdump_table["config"].get("remote", "false").lower() + + if action.lower() == 'enable': + if current_status == "true": + click.echo("Remote kdump feature is already enabled.") + else: + db.cfgdb.mod_entry("KDUMP", "config", {"remote": "true"}) + click.echo("Remote kdump feature enabled.") + echo_reboot_warning() + elif action.lower() == 'disable': + if current_status == "false": + click.echo("Remote kdump feature is already disabled.") + else: + db.cfgdb.mod_entry("KDUMP", "config", {"remote": "false"}) + click.echo("Remote kdump feature disabled.") + echo_reboot_warning() + else: + click.echo("Invalid action. Use 'enable' or 'disable'.") + + +@kdump.group(name="add", help="Add configuration items to KDUMP") +def add(): + """Group of commands to add configuration items to KDUMP""" + pass + + +@add.command(name="ssh_string", help="Add an SSH string to the KDUMP configuration") +@click.argument('ssh_string', metavar='', required=True) +@pass_db +def add_ssh_key(db, ssh_string): + """Add an SSH string to KDUMP configuration""" + + kdump_table = db.cfgdb.get_table("KDUMP") + check_kdump_table_existence(kdump_table) + current_status = kdump_table["config"].get("remote", "false").lower() + + if current_status == 'false': + click.echo("Remote feature is not enabled. Please enable the remote feature first.") + return + + # Add or update the 'ssh_key' entry in the KDUMP table + db.cfgdb.mod_entry("KDUMP", "config", {"ssh_string": ssh_string}) + click.echo(f"SSH string added to KDUMP configuration: {ssh_string}") + + +@add.command(name="ssh_path", help="Add an SSH path to the KDUMP configuration") +@click.argument('ssh_path', metavar='', required=True) +@pass_db +def add_ssh_path(db, ssh_path): + """Add an SSH path to KDUMP configuration""" + + kdump_table = db.cfgdb.get_table("KDUMP") + check_kdump_table_existence(kdump_table) + current_status = kdump_table["config"].get("remote", "false").lower() + if current_status == 'false': + click.echo("Remote feature is not enabled. Please enable the remote feature first.") + return + + # Add or update the 'ssh_key' entry in the KDUMP table + db.cfgdb.mod_entry("KDUMP", "config", {"ssh_path": ssh_path}) + click.echo(f"SSH path added to KDUMP configuration: {ssh_path}") + + +@kdump.group(name="remove", help="remove configuration items to KDUMP") +def remove(): + """Group of commands to remove configuration items to KDUMP""" + pass + + +@remove.command(name="ssh_string", help="Remove the SSH string from the KDUMP configuration") +@pass_db +def remove_ssh_string(db): + """Remove the SSH string from KDUMP configuration""" + kdump_table = db.cfgdb.get_table("KDUMP") + check_kdump_table_existence(kdump_table) + + # Check if ssh_string exists + if "ssh_string" in kdump_table["config"]: + db.cfgdb.mod_entry("KDUMP", "config", {"ssh_string": None}) + click.echo("SSH string removed from KDUMP configuration.") + else: + click.echo("SSH string not found in KDUMP configuration.") + + +@remove.command(name="ssh_path", help="Remove the SSH path from the KDUMP configuration") +@pass_db +def remove_ssh_path(db): + """Remove the SSH path from KDUMP configuration""" + kdump_table = db.cfgdb.get_table("KDUMP") + check_kdump_table_existence(kdump_table) + + # Check if ssh_string exists + if "ssh_path" in kdump_table["config"]: + db.cfgdb.mod_entry("KDUMP", "config", {"ssh_path": None}) + click.echo("SSH path removed from KDUMP configuration.") + else: + click.echo("SSH path not found in KDUMP configuration.") diff --git a/scripts/sonic-kdump-config b/scripts/sonic-kdump-config index cafda7db22..6fae501516 100755 --- a/scripts/sonic-kdump-config +++ b/scripts/sonic-kdump-config @@ -172,9 +172,23 @@ def cmd_dump_config_json(): kdump_enabled = get_kdump_administrative_mode() kdump_memory = get_kdump_memory() kdump_num_dumps = get_kdump_num_dumps() - data = { "enable" : kdump_enabled, \ - "memory" : kdump_memory, \ - "max-dumps" : int(kdump_num_dumps) } + + # Use helper functions to get new kdump features + remote_enabled = get_kdump_remote() + ssh_string = get_kdump_ssh_string() + ssh_path = get_kdump_ssh_path() + + # Construct the data dictionary + data = { + "enable": kdump_enabled, + "memory": kdump_memory, + "max-dumps": int(kdump_num_dumps), + "remote": remote_enabled, + "ssh_string": ssh_string, + "ssh_path": ssh_path + } + + # Print the JSON data print(json.dumps(data, indent=4)) def cmd_dump_kdump_records_json(): @@ -223,11 +237,18 @@ def cmd_dump_status_json(): kdump_oper_state = get_kdump_oper_mode(kdump_enabled) kdump_memory = get_kdump_memory() kdump_num_dumps = get_kdump_num_dumps() + kdump_remote = get_kdump_remote() + kdump_ssh_string = get_kdump_ssh_string() + kdump_ssh_path = get_kdump_ssh_path() + # remote SSH variables data = { "enable" : kdump_enabled, \ "current-state" : kdump_oper_state, \ "memory" : kdump_memory, \ "allocated-memory" : get_crash_kernel_size(), \ - "max-dumps" : int(kdump_num_dumps) } + "max-dumps" : int(kdump_num_dumps),\ + "remote" : kdump_remote,\ + "ssh_string" : kdump_ssh_string,\ + "ssh_path" : kdump_ssh_path } print(json.dumps(data, indent=4)) ## Query current configuration to check if kdump is enabled or disabled @@ -299,6 +320,53 @@ def get_kdump_num_dumps(): num_dumps = num return num_dumps +def get_kdump_remote(): + """Get the value of the remote feature from the KDUMP configuration.""" + config_db = ConfigDBConnector(use_unix_socket_path=True) + if config_db is not None: + config_db.connect() + table_data = config_db.get_table('KDUMP') + if table_data is not None: + config_data = table_data.get('config') + if config_data is not None: + remote_value = config_data.get('remote') + if remote_value is not None: + # Return True if the remote value is 'true', otherwise False + return remote_value.lower() == 'true' # Explicitly return boolean + + return False # Default return value if no remote setting is found + +def get_kdump_ssh_string(): + """Get the SSH string from the KDUMP configuration.""" + config_db = ConfigDBConnector(use_unix_socket_path=True) + ssh_string = None # Default value if not found + + if config_db is not None: + config_db.connect() + table_data = config_db.get_table('KDUMP') + if table_data is not None: + config_data = table_data.get('config') + if config_data is not None: + ssh_string = config_data.get('ssh_string') + + return ssh_string # Return the SSH string or None if not found + +def get_kdump_ssh_path(): + """Get the SSH path from the KDUMP configuration.""" + config_db = ConfigDBConnector(use_unix_socket_path=True) + ssh_path = None # Default value if not found + + if config_db is not None: + config_db.connect() + table_data = config_db.get_table('KDUMP') + if table_data is not None: + config_data = table_data.get('config') + if config_data is not None: + ssh_path = config_data.get('ssh_path') + + return ssh_path # Return the SSH path or None if not found + + ## Read current value for USE_KDUMP in kdump config file # # @return The integer value X from USE_KDUMP=X in /etc/default/kdump-tools @@ -362,14 +430,88 @@ def write_num_dumps(num_dumps): print_err("Error while writing KDUMP_NUM_DUMPS into %s" % kdump_cfg) sys.exit(1) +def write_kdump_remote(): + # Assuming there's a function that retrieves the remote value from the config database + remote = get_kdump_remote() # This function needs to be implemented + + kdump_cfg = '/etc/default/kdump-tools' + + if remote: + # Uncomment SSH and SSH_KEY in the /etc/default/kdump-tools file + run_command("/bin/sed -i 's/#SSH/SSH/' %s" % kdump_cfg, use_shell=True) + run_command("/bin/sed -i 's/#SSH_KEY/SSH_KEY/' %s" % kdump_cfg, use_shell=True) + print("SSH and SSH_KEY uncommented for remote configuration.") + else: + # Comment out SSH and SSH_KEY in the /etc/default/kdump-tools file + run_command("/bin/sed -i 's/SSH/#SSH/' %s" % kdump_cfg, use_shell=True) + run_command("/bin/sed -i 's/SSH_KEY/#SSH_KEY/' %s" % kdump_cfg, use_shell=True) + print("SSH and SSH_KEY commented out for local configuration.") + +def read_ssh_string(): + (rc, lines, err_str) = run_command("grep '#*SSH=.*' %s | cut -d '=' -f 2 | tr -d '\"'" % kdump_cfg, use_shell=True) + if rc == 0 and type(lines) == list and len(lines) >= 1: + try: + return lines[0].strip() # Return the SSH string (e.g., user@ip_address) + except Exception as e: + print_err('Error! Exception[%s] occurred while reading SSH_STRING from %s' % (str(e), kdump_cfg)) + sys.exit(1) + else: + print_err("Unable to read SSH_STRING from %s" % kdump_cfg) + sys.exit(1) + +def write_ssh_string(ssh_string): + ssh_string = ssh_string.replace('/', '\/') # Escape any slashes in the SSH string + cmd = "/bin/sed -i -e 's/#*SSH=.*/SSH=\"%s\"/' %s" % (ssh_string, kdump_cfg) + + (rc, lines, err_str) = run_command(cmd, use_shell=True) # Make sure to set use_shell=True + if rc == 0 and type(lines) == list and len(lines) == 0: + ssh_string_in_cfg = read_ssh_string() + if ssh_string_in_cfg != ssh_string: + print_err("Unable to write SSH_STRING into %s" % kdump_cfg) + sys.exit(1) + else: + print_err("Error while writing SSH_STRING into %s" % kdump_cfg) + sys.exit(1) + +def read_ssh_path(): + (rc, lines, err_str) = run_command("grep '#*SSH_KEY=.*' %s | cut -d '=' -f 2 | tr -d '\"'" % kdump_cfg, use_shell=True) + if rc == 0 and type(lines) == list and len(lines) >= 1: + try: + ssh_path = lines[0].strip() + if not ssh_path.startswith('/'): # Validate that the path starts with a slash + raise ValueError("Invalid SSH path format.") + return ssh_path + except Exception as e: + print_err('Error! Exception[%s] occurred while reading SSH_PATH from %s' % (str(e), kdump_cfg)) + sys.exit(1) + else: + print_err("Unable to read SSH_PATH from %s" % kdump_cfg) + sys.exit(1) + +def write_ssh_path(ssh_path): + cmd = "/bin/sed -i -e 's/#*SSH_KEY=.*/SSH_KEY=\"%s\"/' %s" % (ssh_path, kdump_cfg) + + (rc, lines, err_str) = run_command(cmd, use_shell=True) # Make sure to set use_shell=True + if rc == 0 and type(lines) == list and len(lines) == 0: + ssh_path_in_cfg = read_ssh_path() + if ssh_path_in_cfg != ssh_path: + print_err("Unable to write SSH_PATH into %s" % kdump_cfg) + sys.exit(1) + else: + print_err("Error while writing SSH_PATH into %s" % kdump_cfg) + sys.exit(1) + + ## Enable kdump # # @param verbose If True, the function will display a few additinal information # @return True if the grub/cmdline cfg has changed, and False if it has not -def kdump_enable(verbose, kdump_enabled, memory, num_dumps, image, cmdline_file): +def kdump_enable(verbose, kdump_enabled, memory, num_dumps, image, cmdline_file, remote, ssh_string, ssh_path): if verbose: print("Enabling kdump for image=[%s]" % image) + + # Existing functionality: Reading the kernel command line file try: lines = [line.rstrip('\n') for line in open(cmdline_file)] except Exception as exception: @@ -379,6 +521,7 @@ def kdump_enable(verbose, kdump_enabled, memory, num_dumps, image, cmdline_file) if verbose: print("Image index in %s=%d" % (cmdline_file, img_index)) + # Check if crashkernel is already set changed = False crash_kernel_in_cmdline = search_for_crash_kernel_in_cmdline() if verbose: @@ -386,6 +529,8 @@ def kdump_enable(verbose, kdump_enabled, memory, num_dumps, image, cmdline_file) crash_kernel_mem = search_for_crash_kernel(lines[img_index]) if verbose: print("crash_kernel_mem=[%s]" % crash_kernel_mem) + + # Add or update crashkernel memory setting if crash_kernel_mem is None: lines[img_index] += " crashkernel=%s" % memory changed = True @@ -401,23 +546,41 @@ def kdump_enable(verbose, kdump_enabled, memory, num_dumps, image, cmdline_file) lines[img_index] = lines[img_index].replace(crash_kernel_mem, memory) changed = True if verbose: - print("Replace [%s] with [%s] in %s" % (crash_kernel_mem, memory, cmdline_file)) + print("Replaced [%s] with [%s] in %s" % (crash_kernel_mem, memory, cmdline_file)) if changed: rewrite_cfg(lines, cmdline_file) + # Enable kdump write_use_kdump(1) + + # New functionality: Handle remote crash dump if "remote" is enabled + if remote: + if verbose: + print(f"Configuring remote crash dump to {ssh_string} using SSH keys from {ssh_path}") + + # Set up SSH connection with ssh_string and ssh_path + (rc, lines, err_str) = run_command(f"/usr/sbin/kdump-config set-remote {ssh_string} {ssh_path}", use_shell=False) + if rc != 0: + print_err("Error: Unable to set remote crash dump configuration", err_str) + sys.exit(1) + else: + if verbose: + print("Local crash dump configuration enabled") + + # Reload kdump configuration if needed if crash_kernel_in_cmdline is not None: (rc, lines, err_str) = run_command("/usr/sbin/kdump-config load", use_shell=False) if rc != 0: - print_err("Error Unable to unload Kdump the kernel '%s'", err_str) + print_err("Error: Unable to reload kdump configuration", err_str) sys.exit(1) return changed + ## Read kdump configuration saved in the startup configuration file # -# @param config_param If True, the function will display a few additional information +# @param config_param If True, the function will display a few additional information. # @return Value of the configuration parameter saved in the startup configuration file # @return None if the startup configuration file does not exist or the kdump # configuration parameter is not present in the file. @@ -448,14 +611,16 @@ def cmd_kdump_enable(verbose, image): kdump_enabled = get_kdump_administrative_mode() memory = get_kdump_memory() num_dumps = get_kdump_num_dumps() + remote = get_kdump_remote() + ssh_string = get_kdump_ssh_string + ssh_path = get_kdump_ssh_path if verbose: - print("configDB: kdump_enabled=%d memory=[%s] num_nums=%d" % (kdump_enabled, memory, num_dumps)) - + print("kdump enabled: %s" % kdump_enabled) if os.path.exists(grub_cfg): - return kdump_enable(verbose, kdump_enabled, memory, num_dumps, image, grub_cfg) + return kdump_enable(verbose, kdump_enabled, memory, num_dumps, image, grub_cfg, remote, ssh_string, ssh_path) elif open(machine_cfg, 'r').read().find('aboot_platform') >= 0: aboot_cfg = aboot_cfg_template % image - return kdump_enable(verbose, kdump_enabled, memory, num_dumps, image, aboot_cfg) + return kdump_enable(verbose, kdump_enabled, memory, num_dumps, image, aboot_cfg, remote, ssh_string, ssh_path) else: print("Feature not supported on this platform") return False @@ -527,10 +692,12 @@ def cmd_kdump_disable(verbose): kdump_enabled = get_kdump_administrative_mode() memory = get_kdump_memory() num_dumps = get_kdump_num_dumps() + remote = get_kdump_remote() + ssh_string = get_kdump_ssh_string() + ssh_path = get_kdump_ssh_path() if verbose: - print("configDB: kdump_enabled=%d memory=[%s] num_nums=%d" % (kdump_enabled, memory, num_dumps)) - + print("configDB: kdump_enabled=%d kdump_remote=%s memory=[%s] ssh_string=[%s] num_nums=%d ssh_path=[%ss]" % (kdump_enabled, remote, memory, ssh_string, num_dumps, ssh_path)) if os.path.exists(grub_cfg): return kdump_disable(verbose, image, grub_cfg) elif open(machine_cfg, 'r').read().find('aboot_platform') >= 0: @@ -576,6 +743,41 @@ def cmd_kdump_num_dumps(verbose, num_dumps): kdump_enabled = get_kdump_administrative_mode() kdump_memory = get_kdump_memory() +def cmd_kdump_remote(verbose): + remote = get_kdump_remote() # Retrieve the remote value from the config database + + if remote: + # Uncomment SSH and SSH_KEY in the /etc/default/kdump-tools file + run_command("/bin/sed -i 's/#SSH/SSH/' /etc/default/kdump-tools", use_shell=True) + run_command("/bin/sed -i 's/#SSH_KEY/SSH_KEY/' /etc/default/kdump-tools", use_shell=True) + if verbose: + print("SSH and SSH_KEY uncommented for remote configuration.") + else: + # Comment out SSH and SSH_KEY in the /etc/default/kdump-tools file + run_command("/bin/sed -i 's/SSH/#SSH/' /etc/default/kdump-tools", use_shell=True) + run_command("/bin/sed -i 's/SSH_KEY/#SSH_KEY/' /etc/default/kdump-tools", use_shell=True) + if verbose: + print("SSH and SSH_KEY commented out for local configuration.") + +def cmd_kdump_ssh_string(verbose, ssh_string): + if ssh_string is None: + (rc, lines, err_str) = run_command("show kdump ssh_string", use_shell=False) + print('\n'.join(lines)) + else: + current_ssh_string = read_ssh_string() + if current_ssh_string != ssh_string: + write_ssh_string(ssh_string) + print("SSH string updated. Changes will take effect after the next reboot.") + +def cmd_kdump_ssh_path(verbose, ssh_path): + if ssh_path is None: + (rc, lines, err_str) = run_command("show kdump ssh_path", use_shell=False) + print('\n'.join(lines)) + else: + current_ssh_path = read_ssh_path() + if current_ssh_path != ssh_path: + write_ssh_path(ssh_path) + print("SSH path updated. Changes will take effect after reboot.") def main(): @@ -619,6 +821,11 @@ def main(): parser.add_argument('--num_dumps', nargs='?', type=int, action='store', default=False, help='Maximum number of kernel dump files stored') + parser.add_argument('--ssh_string', nargs='?', type=str, action='store', default=False, + help='ssh_string for remote kdump') + + parser.add_argument('--ssh_path', nargs='?', type=str, action='store',default=False, + help='ssh_path for remote kdump') # Memory allocated for capture kernel on Current Image parser.add_argument('--memory', nargs='?', type=str, action='store', default=False, help='Amount of memory reserved for the capture kernel') @@ -627,6 +834,7 @@ def main(): parser.add_argument("-v", "--verbose", action='store_true', help='displays detailed kdump status information. Used with status command.') + # How many lines should we display from the kernel log parser.add_argument("-l", "--lines", default=75, type=int, help="Number of lines displayed from the kernel log") @@ -651,6 +859,12 @@ def main(): changed = cmd_kdump_disable(options.verbose) elif options.memory != False: cmd_kdump_memory(options.verbose, options.memory) + elif options.remote: + cmd_kdump_remote(options.verbose) + elif options.ssh_string: + cmd_kdump_ssh_string(options.verbose, options.ssh_string) + elif options.ssh_path: + cmd_kdump_ssh_path(options.verbos, options.ssh_path) elif options.num_dumps != False: cmd_kdump_num_dumps(options.verbose, options.num_dumps) elif options.dump_db: diff --git a/show/kdump.py b/show/kdump.py index 6eba55082e..6842441c8e 100644 --- a/show/kdump.py +++ b/show/kdump.py @@ -83,6 +83,17 @@ def config(): num_files_config = get_kdump_config("num_dumps") click.echo("Maximum number of Kdump files: {}".format(num_files_config)) + # remote SSH configs + if get_kdump_config("remote") == "true": + ssh_conn_str = get_kdump_config("ssh_string") + click.echo("Kdump ssh connection string: {}".format(ssh_conn_str)) + + ssh_prv_key = get_kdump_config("ssh_path") + click.echo("Kdump private key path: {}".format(ssh_prv_key)) + + if get_kdump_config("remote") == "false": + click.echo("Kdump ssh connection string and ssh_path not found") + def get_kdump_core_files(): """Retrieves the kernel core dump files from directory '/var/crash/'. diff --git a/tests/kdump_test.py b/tests/kdump_test.py index 18f14181b3..553b9b0f7a 100644 --- a/tests/kdump_test.py +++ b/tests/kdump_test.py @@ -1,10 +1,9 @@ -import importlib - from click.testing import CliRunner from utilities_common.db import Db -class TestKdump(object): +class TestKdump: + @classmethod def setup_class(cls): print("SETUP") @@ -13,62 +12,224 @@ def test_config_kdump_disable(self, get_cmd_module): (config, show) = get_cmd_module db = Db() runner = CliRunner() + + # Simulate command execution for 'disable' result = runner.invoke(config.config.commands["kdump"].commands["disable"], obj=db) - print(result.exit_code) assert result.exit_code == 0 - # Delete the 'KDUMP' table. + # Delete the 'KDUMP' table to test error case db.cfgdb.delete_table("KDUMP") - result = runner.invoke(config.config.commands["kdump"].commands["disable"], obj=db) - print(result.exit_code) assert result.exit_code == 1 def test_config_kdump_enable(self, get_cmd_module): (config, show) = get_cmd_module db = Db() runner = CliRunner() + + # Simulate command execution for 'enable' result = runner.invoke(config.config.commands["kdump"].commands["enable"], obj=db) - print(result.exit_code) assert result.exit_code == 0 - # Delete the 'KDUMP' table. + # Delete the 'KDUMP' table to test error case db.cfgdb.delete_table("KDUMP") - result = runner.invoke(config.config.commands["kdump"].commands["enable"], obj=db) - print(result.exit_code) assert result.exit_code == 1 def test_config_kdump_memory(self, get_cmd_module): (config, show) = get_cmd_module db = Db() runner = CliRunner() + + # Simulate command execution for 'memory' result = runner.invoke(config.config.commands["kdump"].commands["memory"], ["256MB"], obj=db) - print(result.exit_code) assert result.exit_code == 0 - # Delete the 'KDUMP' table. + # Delete the 'KDUMP' table to test error case db.cfgdb.delete_table("KDUMP") - result = runner.invoke(config.config.commands["kdump"].commands["memory"], ["256MB"], obj=db) - print(result.exit_code) assert result.exit_code == 1 def test_config_kdump_num_dumps(self, get_cmd_module): (config, show) = get_cmd_module db = Db() runner = CliRunner() + + # Simulate command execution for 'num_dumps' result = runner.invoke(config.config.commands["kdump"].commands["num_dumps"], ["10"], obj=db) - print(result.exit_code) assert result.exit_code == 0 - # Delete the 'KDUMP' table. + # Delete the 'KDUMP' table to test error case db.cfgdb.delete_table("KDUMP") - result = runner.invoke(config.config.commands["kdump"].commands["num_dumps"], ["10"], obj=db) - print(result.exit_code) assert result.exit_code == 1 + def test_config_kdump_remote_enable(self, get_cmd_module): + (config, show) = get_cmd_module + db = Db() + runner = CliRunner() + + # Initialize KDUMP table + db.cfgdb.mod_entry("KDUMP", "config", {"remote": "false"}) + + # Simulate command execution for 'remote enable' + result = runner.invoke(config.config.commands["kdump"].commands["remote"], ["enable"], obj=db) + assert result.exit_code == 0 + assert db.cfgdb.get_table("KDUMP")["config"]["remote"] == "true" + + # Test enabling again + result = runner.invoke(config.config.commands["kdump"].commands["remote"], ["enable"], obj=db) + assert result.exit_code == 0 + assert db.cfgdb.get_table("KDUMP")["config"]["remote"] == "true" # Check that it remains enabled + + def test_config_kdump_remote_disable(self, get_cmd_module): + (config, show) = get_cmd_module + db = Db() + runner = CliRunner() + + # Initialize KDUMP table + db.cfgdb.mod_entry("KDUMP", "config", {"remote": "true"}) + + # Simulate command execution for 'remote disable' + result = runner.invoke(config.config.commands["kdump"].commands["remote"], ["disable"], obj=db) + assert result.exit_code == 0 + assert db.cfgdb.get_table("KDUMP")["config"]["remote"] == "false" + + # Test disabling again + result = runner.invoke(config.config.commands["kdump"].commands["remote"], ["disable"], obj=db) + assert result.exit_code == 0 + assert db.cfgdb.get_table("KDUMP")["config"]["remote"] == "false" + + def test_config_kdump_add_ssh_string(self, get_cmd_module): + (config, show) = get_cmd_module + db = Db() + runner = CliRunner() + + # Test case where KDUMP table does not exist + ssh_string = "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEArV1..." + db.cfgdb.delete_table("KDUMP") + + result = runner.invoke( + config.config.commands["kdump"].commands["add"].commands["ssh_string"], + [ssh_string], + obj=db + ) + + # Assert that the command fails when the table is missing + assert result.exit_code == 1 + assert "Unable to retrieve 'KDUMP' table from Config DB." in result.output + + # Test case when remote feature is not enabled + db.cfgdb.mod_entry("KDUMP", "config", {"remote": "false"}) + result = runner.invoke( + config.config.commands["kdump"].commands["add"].commands["ssh_string"], + [ssh_string], + obj=db + ) + + # Assert that the command fails because the remote feature is disabled + assert result.exit_code == 0 + assert "Remote feature is not enabled. Please enable the remote feature first." in result.output + + # Test case where remote feature is enabled + db.cfgdb.mod_entry("KDUMP", "config", {"remote": "true"}) + result = runner.invoke( + config.config.commands["kdump"].commands["add"].commands["ssh_string"], + [ssh_string], + obj=db + ) + + # Assert that the command executes successfully + assert result.exit_code == 0 + assert f"SSH string added to KDUMP configuration: {ssh_string}" in result.output + + # Retrieve the updated table to ensure the SSH string was added + kdump_table = db.cfgdb.get_table("KDUMP") + assert kdump_table["config"]["ssh_string"] == ssh_string + + def test_config_kdump_add_ssh_path(self, get_cmd_module): + (config, show) = get_cmd_module + db = Db() + runner = CliRunner() + + # Test case where KDUMP table does not exist + ssh_path = "/root/.ssh/id_rsa" + db.cfgdb.delete_table("KDUMP") + + result = runner.invoke( + config.config.commands["kdump"].commands["add"].commands["ssh_path"], + [ssh_path], + obj=db + ) + + # Assert that the command fails when the table is missing + assert result.exit_code == 1 + assert "Unable to retrieve 'KDUMP' table from Config DB." in result.output + + # Test case when remote feature is not enabled + db.cfgdb.mod_entry("KDUMP", "config", {"remote": "false"}) + result = runner.invoke( + config.config.commands["kdump"].commands["add"].commands["ssh_path"], + [ssh_path], + obj=db + ) + + # Assert that the command fails because the remote feature is disabled + assert result.exit_code == 0 + assert "Remote feature is not enabled. Please enable the remote feature first." in result.output + + # Test case where remote feature is enabled + db.cfgdb.mod_entry("KDUMP", "config", {"remote": "true"}) + result = runner.invoke( + config.config.commands["kdump"].commands["add"].commands["ssh_path"], + [ssh_path], + obj=db + ) + + # Assert that the command executes successfully + assert result.exit_code == 0 + assert f"SSH path added to KDUMP configuration: {ssh_path}" in result.output + + # Retrieve the updated table to ensure the SSH path was added + kdump_table = db.cfgdb.get_table("KDUMP") + assert kdump_table["config"]["ssh_path"] == ssh_path + + def test_config_kdump_remove_ssh_string(self, get_cmd_module): + (config, show) = get_cmd_module + db = Db() + runner = CliRunner() + + # Add an SSH string for testing removal + db.cfgdb.mod_entry("KDUMP", "config", {"ssh_string": "test_ssh_string"}) + + # Simulate command execution for 'remove ssh_string' + result = runner.invoke(config.config.commands["kdump"].commands["remove"].commands["ssh_string"], obj=db) + assert result.exit_code == 0 + assert "SSH string removed from KDUMP configuration." in result.output + + # Test case when SSH string does not exist + result = runner.invoke(config.config.commands["kdump"].commands["remove"].commands["ssh_string"], obj=db) + assert result.exit_code == 0 + assert "SSH string removed from KDUMP configuration." in result.output + + def test_config_kdump_remove_ssh_path(self, get_cmd_module): + (config, show) = get_cmd_module + db = Db() + runner = CliRunner() + + # Add an SSH string for testing removal + db.cfgdb.mod_entry("KDUMP", "config", {"ssh_path": "test_ssh_path"}) + + # Simulate command execution for 'remove ssh_string' + result = runner.invoke(config.config.commands["kdump"].commands["remove"].commands["ssh_path"], obj=db) + assert result.exit_code == 0 + assert "SSH path removed from KDUMP configuration." in result.output + + # Test case when SSH path does not exist + result = runner.invoke(config.config.commands["kdump"].commands["remove"].commands["ssh_path"], obj=db) + assert result.exit_code == 0 + assert "SSH path removed from KDUMP configuration." in result.output + @classmethod def teardown_class(cls): print("TEARDOWN") diff --git a/tests/sonic_kdump_config_test.py b/tests/sonic_kdump_config_test.py index ebf547fac4..48b31300d1 100644 --- a/tests/sonic_kdump_config_test.py +++ b/tests/sonic_kdump_config_test.py @@ -3,8 +3,8 @@ import sys import unittest from unittest.mock import patch, mock_open, Mock - from utilities_common.general import load_module_from_source + from sonic_installer.common import IMAGE_PREFIX TESTS_DIR_PATH = os.path.dirname(os.path.abspath(__file__)) @@ -35,7 +35,6 @@ def test_read_num_kdumps(self, mock_run_cmd): mock_run_cmd.return_value = (0, ["0"], None) num_dumps = sonic_kdump_config.read_num_dumps() assert num_dumps == 0 - logger.info("Value of 'num_dumps' is: '{}'.".format(num_dumps)) logger.info("Expected value of 'num_dumps' is: '0'.") @@ -226,6 +225,53 @@ def test_write_num_kdump(self, mock_run_cmd, mock_read_kdump): sonic_kdump_config.write_use_kdump(0) self.assertEqual(sys_exit.exception.code, 1) + @patch('sonic_kdump_config.run_command') + @patch('sonic_kdump_config.read_ssh_string') + @patch('builtins.print') # Mock print to capture printed output + def test_cmd_kdump_ssh_string_none(self, mock_print, mock_read, mock_run): + # Mock the output of run_command when ssh_string is None + mock_run.return_value = (0, ['current_ssh_string'], '') # Simulated output + sonic_kdump_config.cmd_kdump_ssh_string(verbose=True, ssh_string=None) + + # Check that run_command was called with the correct command + mock_run.assert_called_once_with("show kdump ssh_string", use_shell=False) + + # Verify that the printed output is correct + mock_print.assert_called_once_with('current_ssh_string') + + @patch('sonic_kdump_config.run_command') + @patch('sonic_kdump_config.read_ssh_string') + @patch('sonic_kdump_config.write_ssh_string') + @patch('builtins.print') # Mock print to capture printed output + def test_cmd_kdump_ssh_string_update(self, mock_print, mock_write, mock_read, mock_run): + # Mock read_ssh_string to return the current SSH string + mock_read.return_value = 'old_ssh_string' + # Call the function with a new SSH string + sonic_kdump_config.cmd_kdump_ssh_string(verbose=True, ssh_string='new_ssh_string') + + # Check that write_ssh_string was called with the new SSH string + mock_write.assert_called_once_with('new_ssh_string') + + # Verify that the correct message is printed + mock_print.assert_called_once_with("SSH string updated. Changes will take effect after the next reboot.") + + @patch('sonic_kdump_config.run_command') + @patch('sonic_kdump_config.read_ssh_string') + @patch('sonic_kdump_config.write_ssh_string') + @patch('builtins.print') # Mock print to capture printed output + def test_cmd_kdump_ssh_string_no_update(self, mock_print, mock_write, mock_read, mock_run): + # Mock read_ssh_string to return the same SSH string provided + mock_read.return_value = 'same_ssh_string' + + # Call the function with the same SSH string + sonic_kdump_config.cmd_kdump_ssh_string(verbose=True, ssh_string='same_ssh_string') + + # Check that write_ssh_string was not called + mock_write.assert_not_called() + + # Check that no message is printed for update + mock_print.assert_not_called() + @patch("sonic_kdump_config.kdump_disable") @patch("sonic_kdump_config.get_current_image") @patch("sonic_kdump_config.get_kdump_administrative_mode") @@ -282,6 +328,299 @@ def test_get_image(self, mock_get_bootloader): return_result = sonic_kdump_config.get_next_image() self.assertEqual(sys_exit.exception.code, 1) + @patch("sonic_kdump_config.run_command") + @patch("sonic_kdump_config.get_kdump_remote") + def test_write_kdump_remote_true(self, mock_read_remote, mock_run_command): + """Test when remote is true, SSH and SSH_KEY should be uncommented.""" + mock_read_remote.return_value = True # Simulate that remote is true + + sonic_kdump_config.write_kdump_remote() # Call the function + + # Ensure the correct commands were run to uncomment SSH and SSH_KEY + mock_run_command.assert_any_call("/bin/sed -i 's/#SSH/SSH/' /etc/default/kdump-tools", use_shell=True) + mock_run_command.assert_any_call("/bin/sed -i 's/#SSH_KEY/SSH_KEY/' /etc/default/kdump-tools", use_shell=True) + self.assertEqual(mock_run_command.call_count, 2) # Ensure both commands were called + + @patch("sonic_kdump_config.run_command") + @patch("sonic_kdump_config.get_kdump_remote") + def test_write_kdump_remote_false(self, mock_read_remote, mock_run_command): + """Test when remote is false, SSH and SSH_KEY should be commented.""" + mock_read_remote.return_value = False # Simulate that remote is false + + sonic_kdump_config.write_kdump_remote() # Call the function + + # Ensure the correct commands were run to comment SSH and SSH_KEY + mock_run_command.assert_any_call("/bin/sed -i 's/SSH/#SSH/' /etc/default/kdump-tools", use_shell=True) + mock_run_command.assert_any_call("/bin/sed -i 's/SSH_KEY/#SSH_KEY/' /etc/default/kdump-tools", use_shell=True) + self.assertEqual(mock_run_command.call_count, 2) + + @patch("sonic_kdump_config.get_kdump_remote") + @patch("sonic_kdump_config.run_command") + def test_cmd_kdump_remote(self, mock_run_command, mock_read_remote): + """Tests the function `cmd_kdump_remote(...)` in script `sonic-kdump-config`.""" + + # Test case: Remote is True + mock_read_remote.return_value = True + sonic_kdump_config.cmd_kdump_remote(verbose=True) + + # Ensure the correct commands are being run + mock_run_command.assert_any_call("/bin/sed -i 's/#SSH/SSH/' /etc/default/kdump-tools", use_shell=True) + mock_run_command.assert_any_call("/bin/sed -i 's/#SSH_KEY/SSH_KEY/' /etc/default/kdump-tools", use_shell=True) + + # Test case: Remote is False + mock_read_remote.return_value = False + sonic_kdump_config.cmd_kdump_remote(verbose=True) + + # Ensure the correct commands are being run + mock_run_command.assert_any_call("/bin/sed -i 's/SSH/#SSH/' /etc/default/kdump-tools", use_shell=True) + mock_run_command.assert_any_call("/bin/sed -i 's/SSH_KEY/#SSH_KEY/' /etc/default/kdump-tools", use_shell=True) + + # Test case: Checking output messages + with patch("builtins.print") as mock_print: + sonic_kdump_config.cmd_kdump_remote(verbose=True) + mock_print.assert_called_with("SSH and SSH_KEY commented out for local configuration.") + + mock_read_remote.return_value = False + sonic_kdump_config.cmd_kdump_remote(verbose=True) + mock_print.assert_called_with("SSH and SSH_KEY commented out for local configuration.") + + @patch("sonic_kdump_config.run_command") + def test_read_ssh_string(self, mock_run_cmd): + """Tests the function `read_ssh_string(...)` in script `sonic-kdump-config`.""" + + # Test case for successful read + mock_run_cmd.return_value = (0, ['user@ip_address'], None) # Simulate successful command execution + ssh_string = sonic_kdump_config.read_ssh_string() + self.assertEqual(ssh_string, 'user@ip_address') + + # Test case for non-integer output + mock_run_cmd.return_value = (0, ['NotAString'], None) # Simulate command execution returning a non-string + ssh_string = sonic_kdump_config.read_ssh_string() + self.assertEqual(ssh_string, 'NotAString') + + # Test case for empty output + mock_run_cmd.return_value = (0, [], None) # Simulate command execution with empty output + with self.assertRaises(SystemExit) as sys_exit: + sonic_kdump_config.read_ssh_string() + self.assertEqual(sys_exit.exception.code, 1) + + # Test case for command failure + mock_run_cmd.return_value = (1, [], None) # Simulate command failure + with self.assertRaises(SystemExit) as sys_exit: + sonic_kdump_config.read_ssh_string() + self.assertEqual(sys_exit.exception.code, 1) + + @patch("sonic_kdump_config.run_command") + @patch("sonic_kdump_config.read_ssh_string") + def test_write_ssh_string(self, mock_read_ssh_string, mock_run_cmd): + """Tests the function `write_ssh_string(...)` in script `sonic-kdump-config`.""" + + # Test successful case + mock_run_cmd.return_value = (0, [], None) # Simulate successful command execution + mock_read_ssh_string.return_value = 'user@ip_address' # Simulate reading existing SSH string + + # Call the function to test + sonic_kdump_config.write_ssh_string('user@ip_address') + + # Verify that run_command was called with the correct command + expected_cmd = '/bin/sed -i -e \'s/#*SSH=.*/SSH="user@ip_address"/\' %s' % sonic_kdump_config.kdump_cfg + mock_run_cmd.assert_called_once_with(expected_cmd, use_shell=True) + + # Test case where write fails + mock_run_cmd.return_value = (1, [], None) # Simulate command failure + with self.assertRaises(SystemExit) as sys_exit: + sonic_kdump_config.write_ssh_string('user@ip_address') + self.assertEqual(sys_exit.exception.code, 1) + + # Test case where the written SSH string doesn't match the expected value + mock_run_cmd.return_value = (0, [], None) + mock_read_ssh_string.return_value = 'different_user@ip_address' # Simulate reading a different SSH string + with self.assertRaises(SystemExit) as sys_exit: + sonic_kdump_config.write_ssh_string('user@ip_address') + self.assertEqual(sys_exit.exception.code, 1) + + @patch("sonic_kdump_config.run_command") + def test_read_ssh_path(self, mock_run_cmd): + """Tests the function `read_ssh_path(...)` in script `sonic-kdump-config`.""" + + # Test successful case with valid SSH path + mock_run_cmd.return_value = (0, ['/path/to/keys'], None) + ssh_path = sonic_kdump_config.read_ssh_path() + self.assertEqual(ssh_path, '/path/to/keys') + + # Test case where SSH path is invalid + mock_run_cmd.return_value = (0, ['NotAPath'], None) + with self.assertRaises(SystemExit) as sys_exit: + ssh_path = sonic_kdump_config.read_ssh_path() + self.assertEqual(sys_exit.exception.code, 1) + + # Test case where grep fails (no SSH path found) + mock_run_cmd.return_value = (1, [], None) + with self.assertRaises(SystemExit) as sys_exit: + ssh_path = sonic_kdump_config.read_ssh_path() + self.assertEqual(sys_exit.exception.code, 1) + + @patch("sonic_kdump_config.run_command") + @patch("sonic_kdump_config.read_ssh_path") + def test_write_ssh_path(self, mock_read_ssh_path, mock_run_cmd): + """Tests the function `write_ssh_path(...)` in script `sonic-kdump-config`.""" + + # Test case: Successfully write a valid SSH path + mock_run_cmd.return_value = (0, [], None) # Simulate successful sed command + mock_read_ssh_path.return_value = '/path/to/keys' # Simulate correct SSH path read + + sonic_kdump_config.write_ssh_path('/path/to/keys') # Call function with valid path + # Ensure the correct command is being run + expected_cmd = ( + "/bin/sed -i -e 's/#*SSH_KEY=.*/SSH_KEY=\"/path/to/keys\"/' %s" + % sonic_kdump_config.kdump_cfg + ) + mock_run_cmd.assert_called_once_with(expected_cmd, use_shell=True) + + # Test case: SSH path in config doesn't match the provided one + mock_read_ssh_path.return_value = '/wrong/path' + with self.assertRaises(SystemExit) as sys_exit: + sonic_kdump_config.write_ssh_path('/path/to/keys') + self.assertEqual(sys_exit.exception.code, 1) + + # Test case: Error during sed command execution + mock_run_cmd.return_value = (1, [], "Error") + with self.assertRaises(SystemExit) as sys_exit: + sonic_kdump_config.write_ssh_path('/path/to/keys') + self.assertEqual(sys_exit.exception.code, 1) + + @patch('sonic_kdump_config.run_command') + @patch('sonic_kdump_config.read_ssh_path') + @patch('builtins.print') # Mock print to capture printed output + def test_cmd_kdump_ssh_path_none(self, mock_print, mock_read, mock_run): + # Mock the output of run_command when ssh_path is None + mock_run.return_value = (0, ['current_ssh_path'], '') # Simulated output + sonic_kdump_config.cmd_kdump_ssh_path(verbose=True, ssh_path=None) + + # Check that run_command was called with the correct command + mock_run.assert_called_once_with("show kdump ssh_path", use_shell=False) + + # Verify that the printed output is correct + mock_print.assert_called_once_with('current_ssh_path') + + @patch('builtins.open', new_callable=mock_open, read_data='loop=image-myimage crashkernel=128M') + @patch('sonic_kdump_config.run_command') + @patch('sonic_kdump_config.write_use_kdump') + @patch('sonic_kdump_config.rewrite_cfg') + @patch('sonic_kdump_config.search_for_crash_kernel_in_cmdline') + @patch('sonic_kdump_config.search_for_crash_kernel') + @patch('sonic_kdump_config.locate_image') + def test_kdump_enable_remote( + self, mock_locate, mock_search_kernel, mock_search_cmdline, + mock_rewrite, mock_write_kdump, mock_run, mock_open + ): + # Setup mocks + mock_locate.return_value = 0 # Image found at index 0 + mock_search_cmdline.return_value = None # No crashkernel set in cmdline + mock_search_kernel.return_value = None # No crashkernel set in image line + mock_run.return_value = (0, [], '') # Simulate successful remote configuration + + # Call the function with remote enabled + changed = sonic_kdump_config.kdump_enable( + verbose=True, kdump_enabled=True, memory='128M', num_dumps=3, + image='myimage', cmdline_file='cmdline.txt', + remote=True, ssh_string='user@remote', ssh_path='/path/to/keys' + ) + + # Assertions + self.assertTrue(changed) # Expect some changes to be made + mock_run.assert_called_once_with("/usr/sbin/kdump-config set-remote user@remote /path/to/keys", use_shell=False) + + @patch('builtins.open', new_callable=mock_open, read_data='loop=image-myimage crashkernel=128M') + @patch('sonic_kdump_config.run_command') + @patch('sonic_kdump_config.write_use_kdump') + @patch('sonic_kdump_config.rewrite_cfg') + @patch('sonic_kdump_config.search_for_crash_kernel_in_cmdline') + @patch('sonic_kdump_config.search_for_crash_kernel') + @patch('sonic_kdump_config.locate_image') + def test_kdump_enable_remote_error( + self, mock_locate, mock_search_kernel, mock_search_cmdline, + mock_rewrite, mock_write_kdump, mock_run, mock_open + ): + # Setup mocks + mock_locate.return_value = 0 # Image found at index 0 + mock_search_cmdline.return_value = None # No crashkernel set in cmdline + mock_search_kernel.return_value = None # No crashkernel set in image line + mock_run.return_value = (1, [], 'Error occurred') # Simulate failure + + # Expecting sys.exit to be called on failure + with self.assertRaises(SystemExit): + sonic_kdump_config.kdump_enable( + verbose=True, kdump_enabled=True, memory='128M', num_dumps=3, + image='myimage', cmdline_file='cmdline.txt', + remote=True, ssh_string='user@remote', ssh_path='/path/to/keys' + ) + + # Check that the error message was printed + mock_run.assert_called_once_with("/usr/sbin/kdump-config set-remote user@remote /path/to/keys", use_shell=False) + + @patch('builtins.open', new_callable=mock_open, read_data='loop=image-myimage crashkernel=128M') + @patch('sonic_kdump_config.run_command') + @patch('sonic_kdump_config.write_use_kdump') + @patch('sonic_kdump_config.rewrite_cfg') + @patch('sonic_kdump_config.search_for_crash_kernel_in_cmdline') + @patch('sonic_kdump_config.search_for_crash_kernel') + @patch('sonic_kdump_config.locate_image') + def test_kdump_enable_local( + self, mock_locate, mock_search_kernel, mock_search_cmdline, + mock_rewrite, mock_write_kdump, mock_run, mock_open + ): + # Setup mocks + mock_locate.return_value = 0 # Image found at index 0 + mock_search_cmdline.return_value = None # No crashkernel set in cmdline + mock_search_kernel.return_value = None # No crashkernel set in image line + + # Call the function with remote disabled + changed = sonic_kdump_config.kdump_enable( + verbose=True, kdump_enabled=True, memory='128M', num_dumps=3, + image='myimage', cmdline_file='cmdline.txt', + remote=False, ssh_string='user@remote', ssh_path='/path/to/keys' + ) + + # Assertions + + self.assertTrue(changed) # Expect some changes to be made + mock_run.assert_not_called() # Ensure no remote commands were run + + @patch('sonic_kdump_config.run_command') + @patch('sonic_kdump_config.read_ssh_path') + @patch('sonic_kdump_config.write_ssh_path') + @patch('builtins.print') # Mock print to capture printed output + def test_cmd_kdump_ssh_path_update(self, mock_print, mock_write, mock_read, mock_run): + # Mock read_ssh_path to return the current SSH path + mock_read.return_value = '/old/path/to/keys' + + # Call the function with a new SSH path + sonic_kdump_config.cmd_kdump_ssh_path(verbose=True, ssh_path='/new/path/to/keys') + + # Check that write_ssh_path was called with the new SSH path + mock_write.assert_called_once_with('/new/path/to/keys') + + # Verify that the correct message is printed + mock_print.assert_called_once_with("SSH path updated. Changes will take effect after reboot.") + + @patch('sonic_kdump_config.run_command') + @patch('sonic_kdump_config.read_ssh_path') + @patch('sonic_kdump_config.write_ssh_path') + @patch('builtins.print') # Mock print to capture printed output + def test_cmd_kdump_ssh_path_no_update(self, mock_print, mock_write, mock_read, mock_run): + # Mock read_ssh_path to return the same SSH path provided + mock_read.return_value = '/same/path/to/keys' + + # Call the function with the same SSH path + sonic_kdump_config.cmd_kdump_ssh_path(verbose=True, ssh_path='/same/path/to/keys') + + # Check that write_ssh_path was not called + mock_write.assert_not_called() + + # Check that no message is printed for update + mock_print.assert_not_called() + @patch("sonic_kdump_config.write_use_kdump") @patch("os.path.exists") def test_kdump_disable(self, mock_path_exist, mock_write_kdump):