diff --git a/config.js b/config.js index dbd09fd1a8..a37136c1d6 100644 --- a/config.js +++ b/config.js @@ -722,6 +722,30 @@ config.NSFS_RENAME_RETRIES = 3; config.NSFS_VERSIONING_ENABLED = true; config.NSFS_UPDATE_ISSUES_REPORT_ENABLED = true; +config.NSFS_GLACIER_LOGS_DIR = '/var/run/noobaa-nsfs/wal'; +config.NSFS_GLACIER_LOGS_MAX_INTERVAL = 15 * 60 * 1000; + +// NSFS_GLACIER_ENABLED can override internal autodetection and will force +// the use of restore for all objects. +config.NSFS_GLACIER_ENABLED = false; +config.NSFS_GLACIER_LOGS_ENABLED = true; +config.NSFS_GLACIER_BACKEND = 'TAPECLOUD'; + +// TAPECLOUD Glacier backend specific configs +config.NSFS_GLACIER_TAPECLOUD_BIN_DIR = '/opt/ibm/tapecloud/bin'; + +// NSFS_GLACIER_MIGRATE_INTERVAL indicates the interval between runs +// of `manage_nsfs glacier migrate` +config.NSFS_GLACIER_MIGRATE_INTERVAL = 15 * 60 * 1000; + +// NSFS_GLACIER_RESTORE_INTERVAL indicates the interval between runs +// of `manage_nsfs glacier restore` +config.NSFS_GLACIER_RESTORE_INTERVAL = 15 * 60 * 1000; + +// NSFS_GLACIER_EXPIRY_INTERVAL indicates the interval between runs +// of `manage_nsfs glacier expiry` +config.NSFS_GLACIER_EXPIRY_INTERVAL = 12 * 60 * 60 * 1000; + //////////////////////////// // NSFS NON CONTAINERIZED // //////////////////////////// @@ -742,11 +766,8 @@ config.BASE_MODE_CONFIG_DIR = 0o700; config.NSFS_WHITELIST = []; -// NSFS_RESTORE_ENABLED can override internal autodetection and will force -// the use of restore for all objects. -config.NSFS_RESTORE_ENABLED = false; -config.NSFS_HEALTH_ENDPOINT_RETRY_COUNT = 3 -config.NSFS_HEALTH_ENDPOINT_RETRY_DELAY = 10 +config.NSFS_HEALTH_ENDPOINT_RETRY_COUNT = 3; +config.NSFS_HEALTH_ENDPOINT_RETRY_DELAY = 10; //Quota config.QUOTA_LOW_THRESHOLD = 80; @@ -961,4 +982,4 @@ module.exports.reload_nsfs_nc_config = reload_nsfs_nc_config; load_nsfs_nc_config(); reload_nsfs_nc_config(); load_config_local(); -load_config_env_overrides(); \ No newline at end of file +load_config_env_overrides();
diff --git a/docs/design/NSFSGlacierStorageClass.md b/docs/design/NSFSGlacierStorageClass.md new file mode 100644 index 0000000000..3508464e3f --- /dev/null +++ b/docs/design/NSFSGlacierStorageClass.md @@ -0,0 +1,131 @@
+# NSFS Glacier Storage Class
+
+## Goal
+- Support the "GLACIER" storage class in NooBaa, behaving similarly to the AWS "GLACIER" storage class.
+- NooBaa should allow limited support of the `RestoreObject` API.
+
+## Approach
+The current approach to supporting the `GLACIER` storage class is to split the implementation into two parts.
+The main NooBaa process only manages metadata on the files/objects via extended attributes and maintains the relevant
+data in a log file. Another process (currently `manage_nsfs`) manages the actual movement of the files across
+disk and tape.
+
+There are 3 primary flows of concern and this document discusses all of them:
+1. Upload an object to the `GLACIER` storage class (API: `PutObject`).
+2. Restore objects that were uploaded to the `GLACIER` storage class (API: `RestoreObject`).
+3. Copy objects whose source is an object stored in `GLACIER` (API: `PutObject`).
+
+### WAL
+An important component of all the flows is the write ahead log (WAL). NooBaa has a `SimpleWAL` which, as the name states,
+is extremely simple in some senses. It does not deal with fsync issues, partial writes, holes, etc.; rather it just
+appends data separated by a newline character.
+
+`SimpleWAL` features:
+1. Exposes an `append` method which adds data to the file.
+2. Can perform auto rotation of the file, which makes sure that a single WAL is never too huge for the
+WAL consumer to consume.
+3. Exposes a `process` method which allows "safe" iteration on the previous WAL files.
+4. Tries to make sure that no data loss happens due to process level races.
+
+#### Races which are handled by the current implementation
+1. `n` processes open the WAL file while a "consumer" swoops in and tries to process the file, effectively losing the
+current writes (due to processing a partially written file and ultimately invoking `unlink` on the file) - This isn't
+possible as the `process` method makes sure that it doesn't iterate over the "current active file".
+2. `k` processes out of `n` (such that `k < n`) open the WAL while a "consumer" swoops in and tries to process the
+file, effectively losing the current writes (due to unlinking a file others hold a reference to) - Although the `process`
+method will not protect against this, as technically the "current active file" is a different file, this is still **not**
+possible as the "consumer" needs to hold an "EXCLUSIVE" lock on the files before it can process them. This makes sure
+that for as long as any process is writing to the file, the "consumer" cannot consume the file and will block.
+3. `k` processes out of `n` (such that `k < n`) open the WAL but before the NSFS process can get a "SHARED" lock on
+the file, the "consumer" process swoops in, processes the files and then issues `unlink` on the file. The unlink will
+not delete the file as `k` processes have an open FD to the file, but as soon as those processes are done writing to
+it and close the FD, the file will be deleted, which would result in lost writes - This isn't possible as `SimpleWAL`
+does not allow writing to a file till it can get a lock on the file and ensure that there are `> 0` links to the file.
+If there are no links then it tries to open the file again, assuming that the consumer has issued `unlink` on the file
+it holds the FD to.
+4. Multiple processes try to swap the same file causing issues - This isn't possible as the process needs to acquire
+a "swap lock" before it performs the swap, which essentially serializes the operations. Further, the swapping is done only
+once by ensuring that the process only swaps if the current `inode` matches the `inode` it got when it opened the
+file initially; if not, it skips the swapping.
+
+### Requirements for `TAPECLOUD` backend
+1. Scripts should be placed in the `config.NSFS_GLACIER_TAPECLOUD_BIN_DIR` dir.
+2. `migrate` script should take a file name and perform migration of the files mentioned in the given file. The output should comply with the `eeadm migrate` command.
+3. `recall` script should take a file name and perform recall of the files mentioned in the given file. The output should comply with the `eeadm recall` command.
+4. `task_show` script should take a task id as an argument and output its status. The output should be similar to `eeadm task show -r `.
+5. `scan_expired` should take a directory name and dump files in it. The files should have the names of all the files which need to be migrated back to tape. The names should be newline separated.
+6. `low_free_space` script should output `true` if the disk has low free space or else should output `false`.
+
+A minimal, illustrative sketch of such wrapper scripts appears after this design document.
+
+### Flow 1: Upload Object to Glacier
+As mentioned earlier, any operation related to `GLACIER` is handled in 2 phases.
+One phase is immediate and is managed by the NSFS process itself, while the other phase needs to be invoked separately
+and manages the actual movement of the file.
+
+#### Phase 1
+1. PutObject is requested with the storage class set to `GLACIER`.
+2. NooBaa rejects the request if NooBaa isn't configured to support the given storage class. This is **not** enabled
+by default and needs to be enabled via `config-local.js` by setting `config.NSFS_GLACIER_ENABLED = true` and `config.NSFS_GLACIER_LOGS_ENABLED = true`.
+3. NooBaa will set the storage class to `GLACIER` by setting the `user.storage_class` extended attribute.
+4. NooBaa creates a simple WAL (Write Ahead Log) and appends the filename to the log file.
+5. Completes the upload.
+
+Once the upload is complete, the file sits on the disk till the second process kicks in and actually does the movement
+of the file, but the main NooBaa process does not concern itself with the actual file state and rather just relies on the
+extended attributes to judge the state of the file. The implication of this is that NooBaa will refuse a file read operation
+even if the file is on disk unless the user explicitly issues a `RestoreObject` (it should be noted that this is what AWS
+does as well).
+
+#### Phase 2
+1. A scheduler (e.g. cron, a human, a script, etc.) issues `node src/cmd/manage_nsfs glacier migrate --interval ` (an illustrative scheduling sketch appears after this design document).
+2. The command will first acquire an "EXCLUSIVE" lock so as to ensure that only one tape management command is running at once.
+3. Once the process has the lock it will start to iterate over the currently inactive WAL files.
+4. Before processing a WAL file, the process will get an "EXCLUSIVE" lock on the file, ensuring that it is indeed the only
+process processing the file.
+5. It will read the WAL one line at a time and will ensure the following:
+    1. The file still exists.
+    2. The file still has the `GLACIER` storage class (this can change if the user uploads another object with the `STANDARD`
+    storage class).
+    3. The file doesn't have any of the `RestoreObject` extended attributes. This is to ensure that if the file was marked
+    for restoration as soon as it was uploaded then we don't perform the migration at all. This is to avoid unnecessary
+    work and also make sure that we don't end up racing with ourselves.
+6. Once a file name passes all the above criteria, we add its name to a temporary WAL and hand that file over to the
+`migrate` script, which should be in the `config.NSFS_GLACIER_TAPECLOUD_BIN_DIR` directory. We expect that the script will take the file name as its first parameter and will perform the migration. If `config.NSFS_GLACIER_BACKEND` is set to `TAPECLOUD` (the default) then we expect the script to output data in compliance with the `eeadm migrate` command.
+7. We delete the temporary WAL that we created.
+8. We delete the WAL created by the NSFS process **iff** there were no failures in `migrate`. In case of failures we skip the WAL
+deletion so that the next trigger of the script retries. It should be noted that NooBaa's `migrate` (`TAPECLOUD` backend) invocation does **not** consider `DUPLICATE TASK` an error.
+
+### Flow 2: Restore Object
+As mentioned earlier, any operation related to `GLACIER` is handled in 2 phases. One phase is immediate
+and is managed by the NSFS process itself, while the other phase needs to be invoked separately
+and manages the actual movement of the file.
+
+#### Phase 1
+1. RestoreObject is requested with a positive, non-zero number of days.
+2. NooBaa rejects the request if NooBaa isn't configured to support the given storage class. This is **not** enabled
+by default and needs to be enabled via `config-local.js` by setting `config.NSFS_GLACIER_ENABLED = true` and `config.NSFS_GLACIER_LOGS_ENABLED = true`.
+3. NooBaa performs a number of checks to ensure that the operation is valid (for example, that there is no restore
+request already ongoing, etc.).
+4. NooBaa saves the filename to a simple WAL (Write Ahead Log).
+5. Returns success, indicating that the restore request has been accepted.
+
+#### Phase 2
+1. A scheduler (e.g. cron, a human, a script, etc.) issues `node src/cmd/manage_nsfs glacier restore --interval `.
+2. The command will first acquire an "EXCLUSIVE" lock so as to ensure that only one tape management command is running at once.
+3. Once the process has the lock it will start to iterate over the currently inactive WAL files.
+4. Before processing a WAL file, the process will get an "EXCLUSIVE" lock on the file, ensuring that it is indeed the only
+process processing the file.
+5. It will read the WAL one line at a time and will store the names of the files that we expect to fail during an `eeadm` restore
+(this can happen, for example, because a `RestoreObject` was issued for a file but later on that file was deleted before we could
+actually process it).
+6. The WAL is handed over to the `recall` script, which should be present in the `config.NSFS_GLACIER_TAPECLOUD_BIN_DIR` directory. We expect that the script will take the file name as its first parameter and will perform the recall. If `config.NSFS_GLACIER_BACKEND` is set to `TAPECLOUD` (the default) then we expect the script to output data in compliance with the `eeadm recall` command.
+7. If we get any unexpected failures then we mark the run as failed and make sure we do not delete the WAL file (so as to retry later).
+8. We iterate over the WAL again to set the final extended attributes. This is to make sure that we can communicate the latest
+state to the NSFS processes.
+
+### Flow 3: Copy Object with Glacier Object as copy source
+This is very similar to Flow 1 with some additional checks.
+If the source file is not in the `GLACIER` storage class then the normal procedure kicks in.
+If the source file is in the `GLACIER` storage class then:
+- NooBaa refuses the copy if the file is not already restored (similar to AWS behaviour).
+- NooBaa accepts the copy if the file is already restored (similar to AWS behaviour).
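The `TAPECLOUD` backend only defines a contract for the helper scripts listed in the requirements above; this PR does not ship them. Below is a minimal, illustrative sketch of what a `migrate` wrapper and a `low_free_space` wrapper could look like, assuming `eeadm` is available on the node. The tape pool name, mount point, and usage threshold are placeholders, not values taken from this change.

```sh
#!/bin/bash
# Hypothetical "migrate" wrapper placed in config.NSFS_GLACIER_TAPECLOUD_BIN_DIR.
# Receives the path of a newline-separated file list and must produce output
# compatible with `eeadm migrate`. The tape pool "pool1@lib1" is a placeholder.
set -euo pipefail
exec eeadm migrate "$1" -p pool1@lib1
```

```sh
#!/bin/bash
# Hypothetical "low_free_space" wrapper: prints "true" when the filesystem backing
# the buckets crosses a usage threshold, "false" otherwise. The mount point and the
# 85% threshold are illustrative assumptions.
used=$(df --output=pcent /ibm/gpfs | tail -n 1 | tr -dc '0-9')
if [ "${used:-0}" -ge 85 ]; then echo true; else echo false; fi
```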
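Phase 2 of the flows above relies on an external scheduler invoking the `manage_nsfs glacier` subcommands periodically. The sketch below shows one possible driver script that a cron entry could call; the checkout path is an assumption, and the actual interval gating is performed by the commands themselves via the `NSFS_GLACIER_*_INTERVAL` config values (or the `--interval` flag).

```sh
#!/bin/bash
# Hypothetical driver intended to be run periodically (e.g. every 15 minutes from cron).
# The path below is an assumption; interval/timestamp gating is done by the commands.
cd /usr/local/noobaa-core || exit 1
node src/cmd/manage_nsfs glacier migrate
node src/cmd/manage_nsfs glacier restore
node src/cmd/manage_nsfs glacier expiry
```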
+ diff --git a/src/cmd/manage_nsfs.js b/src/cmd/manage_nsfs.js index 7afb916383..4318de4e19 100644 --- a/src/cmd/manage_nsfs.js +++ b/src/cmd/manage_nsfs.js @@ -18,11 +18,12 @@ const ManageCLIError = require('../manage_nsfs/manage_nsfs_cli_errors').ManageCL const NSFS_CLI_ERROR_EVENT_MAP = require('../manage_nsfs/manage_nsfs_cli_errors').NSFS_CLI_ERROR_EVENT_MAP; const ManageCLIResponse = require('../manage_nsfs/manage_nsfs_cli_responses').ManageCLIResponse; const NSFS_CLI_SUCCESS_EVENT_MAP = require('../manage_nsfs/manage_nsfs_cli_responses').NSFS_CLI_SUCCESS_EVENT_MAP; +const manage_nsfs_glacier = require('../manage_nsfs/manage_nsfs_glacier'); const bucket_policy_utils = require('../endpoint/s3/s3_bucket_policy_utils'); const nsfs_schema_utils = require('../manage_nsfs/nsfs_schema_utils'); const { print_usage } = require('../manage_nsfs/manage_nsfs_help_utils'); const { TYPES, ACTIONS, VALID_OPTIONS, OPTION_TYPE, - LIST_ACCOUNT_FILTERS, LIST_BUCKET_FILTERS} = require('../manage_nsfs/manage_nsfs_constants'); + LIST_ACCOUNT_FILTERS, LIST_BUCKET_FILTERS, GLACIER_ACTIONS } = require('../manage_nsfs/manage_nsfs_constants'); const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent; function throw_cli_error(error_code, detail, event_arg) { @@ -105,6 +106,8 @@ async function main(argv = minimist(process.argv.slice(2))) { await bucket_management(argv, from_file); } else if (type === TYPES.IP_WHITELIST) { await whitelist_ips_management(argv); + } else if (type === TYPES.GLACIER) { + await glacier_management(argv); } else { // we should not get here (we check it before) throw_cli_error(ManageCLIError.InvalidType); @@ -822,6 +825,8 @@ function validate_type_and_action(type, action) { if (!Object.values(ACTIONS).includes(action)) throw_cli_error(ManageCLIError.InvalidAction); } else if (type === TYPES.IP_WHITELIST) { if (action !== '') throw_cli_error(ManageCLIError.InvalidAction); + } else if (type === TYPES.GLACIER) { + if (!Object.values(GLACIER_ACTIONS).includes(action)) throw_cli_error(ManageCLIError.InvalidAction); } } @@ -838,6 +843,8 @@ function validate_no_extra_options(type, action, input_options) { valid_options = VALID_OPTIONS.bucket_options[action]; } else if (type === TYPES.ACCOUNT) { valid_options = VALID_OPTIONS.account_options[action]; + } else if (type === TYPES.GLACIER) { + valid_options = VALID_OPTIONS.glacier_options[action]; } else { valid_options = VALID_OPTIONS.whitelist_options; } @@ -942,6 +949,26 @@ function _validate_access_keys(argv) { })) throw_cli_error(ManageCLIError.AccountSecretKeyFlagComplexity); } +async function glacier_management(argv) { + const action = argv._[1] || ''; + await manage_glacier_operations(action, argv); +} + +async function manage_glacier_operations(action, argv) { + switch (action) { + case GLACIER_ACTIONS.MIGRATE: + await manage_nsfs_glacier.process_migrations(); + break; + case GLACIER_ACTIONS.RESTORE: + await manage_nsfs_glacier.process_restores(); + break; + case GLACIER_ACTIONS.EXPIRY: + await manage_nsfs_glacier.process_expiry(); + break; + default: + throw_cli_error(ManageCLIError.InvalidGlacierOperation); + } +} exports.main = main; if (require.main === module) main(); diff --git a/src/deploy/NVA_build/standalone_deploy.sh b/src/deploy/NVA_build/standalone_deploy.sh index 9a84368934..482c7be44d 100755 --- a/src/deploy/NVA_build/standalone_deploy.sh +++ b/src/deploy/NVA_build/standalone_deploy.sh @@ -44,11 +44,19 @@ function execute() { fi } +function sigterm() { + echo "SIGTERM received" + kill -TERM $(jobs -p) + 
exit 0 +} + function main() { if [ "${STANDALONE_SETUP_ENV}" = "true" ]; then setup_env fi + trap sigterm SIGTERM + # Start NooBaa processes execute "npm run web" web.log sleep 10 diff --git a/src/manage_nsfs/manage_nsfs_cli_errors.js b/src/manage_nsfs/manage_nsfs_cli_errors.js index ae1c7fce6a..8ba2876fbc 100644 --- a/src/manage_nsfs/manage_nsfs_cli_errors.js +++ b/src/manage_nsfs/manage_nsfs_cli_errors.js @@ -246,6 +246,12 @@ ManageCLIError.InvalidAccountDistinguishedName = Object.freeze({ message: 'Account distinguished name was not found', http_code: 400, }); +ManageCLIError.InvalidGlacierOperation = Object.freeze({ + code: 'InvalidGlacierOperation', + message: 'only "migrate", "restore" and "expiry" subcommands are supported', + http_code: 400, +}); + //////////////////////// //// BUCKET ERRORS ///// diff --git a/src/manage_nsfs/manage_nsfs_constants.js b/src/manage_nsfs/manage_nsfs_constants.js index 277dd42d6b..64a95a00b2 100644 --- a/src/manage_nsfs/manage_nsfs_constants.js +++ b/src/manage_nsfs/manage_nsfs_constants.js @@ -4,7 +4,8 @@ const TYPES = { ACCOUNT: 'account', BUCKET: 'bucket', - IP_WHITELIST: 'whitelist' + IP_WHITELIST: 'whitelist', + GLACIER: 'glacier', }; const ACTIONS = { @@ -15,6 +16,12 @@ const ACTIONS = { STATUS: 'status' }; +const GLACIER_ACTIONS = { + MIGRATE: 'migrate', + RESTORE: 'restore', + EXPIRY: 'expiry', +}; + const GLOBAL_CONFIG_ROOT = 'config_root'; const GLOBAL_CONFIG_OPTIONS = new Set(['from_file', GLOBAL_CONFIG_ROOT, 'config_root_backend']); @@ -34,11 +41,18 @@ const VALID_OPTIONS_BUCKET = { 'status': new Set(['name', GLOBAL_CONFIG_ROOT]), }; +const VALID_OPTIONS_GLACIER = { + 'migrate': new Set([ GLOBAL_CONFIG_ROOT]), + 'restore': new Set([ GLOBAL_CONFIG_ROOT]), + 'expiry': new Set([ GLOBAL_CONFIG_ROOT]), +}; + const VALID_OPTIONS_WHITELIST = new Set(['ips', GLOBAL_CONFIG_ROOT]); const VALID_OPTIONS = { account_options: VALID_OPTIONS_ACCOUNT, bucket_options: VALID_OPTIONS_BUCKET, + glacier_options: VALID_OPTIONS_GLACIER, whitelist_options: VALID_OPTIONS_WHITELIST, }; @@ -70,6 +84,7 @@ const LIST_BUCKET_FILTERS = ['name']; // EXPORTS exports.TYPES = TYPES; exports.ACTIONS = ACTIONS; +exports.GLACIER_ACTIONS = GLACIER_ACTIONS; exports.VALID_OPTIONS = VALID_OPTIONS; exports.OPTION_TYPE = OPTION_TYPE; diff --git a/src/manage_nsfs/manage_nsfs_glacier.js b/src/manage_nsfs/manage_nsfs_glacier.js new file mode 100644 index 0000000000..0469a5b51c --- /dev/null +++ b/src/manage_nsfs/manage_nsfs_glacier.js @@ -0,0 +1,166 @@ +/* Copyright (C) 2024 NooBaa */ +'use strict'; + +const path = require('path'); +const { PersistentLogger } = require('../util/persistent_logger'); +const config = require('../../config'); +const nb_native = require('../util/nb_native'); +const { GlacierBackend } = require('../sdk/nsfs_glacier_backend/backend'); +const { getGlacierBackend } = require('../sdk/nsfs_glacier_backend/helper'); +const native_fs_utils = require('../util/native_fs_utils'); + +const CLUSTER_LOCK = 'cluster.lock'; +const SCAN_LOCK = 'scan.lock'; + +const RESTORE_TIMESTAMP_FILE = 'restore.timestamp'; +const MIGRATE_TIMESTAMP_FILE = 'migrate.timestamp'; +const EXPIRY_TIMESTAMP_FILE = 'expiry.timestamp'; + +async function process_migrations() { + const fs_context = native_fs_utils.get_process_fs_context(); + + await lock_and_run(fs_context, CLUSTER_LOCK, async () => { + if ( + await low_free_space() || + await time_exceeded(fs_context, config.NSFS_GLACIER_MIGRATE_INTERVAL, MIGRATE_TIMESTAMP_FILE) + ) { + await run_glacier_migrations(fs_context); + await 
record_current_time(fs_context, MIGRATE_TIMESTAMP_FILE); + } + }); +} + +/** + * run_glacier_migrations reads the migration WALs and attempts to migrate the + * files mentioned in the WAL. + * @param {nb.NativeFSContext} fs_context + */ +async function run_glacier_migrations(fs_context) { + // This WAL is getting opened only so that we can process all the inactive WAL entries + const wal = new PersistentLogger( + config.NSFS_GLACIER_LOGS_DIR, + GlacierBackend.MIGRATE_WAL_NAME, + { disable_rotate: true, locking: 'EXCLUSIVE' }, + ); + + const backend = getGlacierBackend(); + await wal.process_inactive(async file => backend.migrate(fs_context, file)); +} + +async function process_restores() { + const fs_context = native_fs_utils.get_process_fs_context(); + + await lock_and_run(fs_context, CLUSTER_LOCK, async () => { + if ( + await low_free_space() || + !(await time_exceeded(fs_context, config.NSFS_GLACIER_RESTORE_INTERVAL, RESTORE_TIMESTAMP_FILE)) + ) return; + + + await run_glacier_restore(fs_context); + await record_current_time(fs_context, RESTORE_TIMESTAMP_FILE); + }); +} + +/** + * run_glacier_restore reads the restore WALs and attempts to restore the + * files mentioned in the WAL. + * @param {nb.NativeFSContext} fs_context + */ +async function run_glacier_restore(fs_context) { + // This WAL is getting opened only so that we can process all the inactive WAL entries + const wal = new PersistentLogger( + config.NSFS_GLACIER_LOGS_DIR, + GlacierBackend.RESTORE_WAL_NAME, + { disable_rotate: true, locking: 'EXCLUSIVE' }, + ); + + const backend = getGlacierBackend(); + await wal.process_inactive(async file => backend.restore(fs_context, file)); +} + +async function process_expiry() { + const fs_context = native_fs_utils.get_process_fs_context(); + + await lock_and_run(fs_context, SCAN_LOCK, async () => { + if (!(await time_exceeded(fs_context, config.NSFS_GLACIER_EXPIRY_INTERVAL, EXPIRY_TIMESTAMP_FILE))) return; + + + await run_glacier_expiry(fs_context); + await record_current_time(fs_context, EXPIRY_TIMESTAMP_FILE); + }); +} + +async function run_glacier_expiry(fs_context) { + const backend = getGlacierBackend(); + await backend.expiry(fs_context); +} + +/** + * time_exceeded returns true if the time between the last run recorded in the given + * timestamp_file and now is greater than the given interval. 
+ * @param {nb.NativeFSContext} fs_context + * @param {number} interval + * @param {string} timestamp_file + * @returns {Promise} + */ +async function time_exceeded(fs_context, interval, timestamp_file) { + try { + const { data } = await nb_native().fs.readFile(fs_context, path.join(config.NSFS_GLACIER_LOGS_DIR, timestamp_file)); + const lastrun = new Date(data.toString()); + + if (lastrun.getTime() + interval < Date.now()) return true; + } catch (error) { + console.error('failed to read last run timestamp:', error); + if (error.code === 'ENOENT') return true; + + throw error; + } + + return false; +} + +/** + * low_free_space returns true if the default backend has low disk space + * @returns {Promise} + */ +async function low_free_space() { + const backend = getGlacierBackend(); + return backend.low_free_space(); +} + +/** + * record_current_time stores the current timestamp in ISO format into + * the given timestamp file + * @param {nb.NativeFSContext} fs_context + * @param {string} timestamp_file + */ +async function record_current_time(fs_context, timestamp_file) { + await nb_native().fs.writeFile( + fs_context, + path.join(config.NSFS_GLACIER_LOGS_DIR, timestamp_file), + Buffer.from(new Date().toISOString()), + ); +} + +/** + * lock_and_run acquires a flock and calls the given callback after + * acquiring the lock + * @param {nb.NativeFSContext} fs_context + * @param {string} lockfilename + * @param {Function} cb + */ +async function lock_and_run(fs_context, lockfilename, cb) { + const lockfd = await nb_native().fs.open(fs_context, path.join(config.NSFS_GLACIER_LOGS_DIR, lockfilename), 'w'); + + try { + await lockfd.flock(fs_context, 'EXCLUSIVE'); + await cb(); + } finally { + await lockfd.close(fs_context); + } +} + +exports.process_migrations = process_migrations; +exports.process_restores = process_restores; +exports.process_expiry = process_expiry; diff --git a/src/manage_nsfs/manage_nsfs_help_utils.js b/src/manage_nsfs/manage_nsfs_help_utils.js index 4f387a9a22..983dfa5aa0 100644 --- a/src/manage_nsfs/manage_nsfs_help_utils.js +++ b/src/manage_nsfs/manage_nsfs_help_utils.js @@ -1,7 +1,7 @@ /* Copyright (C) 2024 NooBaa */ 'use strict'; -const { TYPES, ACTIONS } = require('./manage_nsfs_constants'); +const { TYPES, ACTIONS, GLACIER_ACTIONS } = require('./manage_nsfs_constants'); const HELP = ` Help: @@ -183,6 +183,26 @@ Flags: --name (optional) Filter the list based on the provided bucket name `; +const GLACIER_OPTIONS = ` +Usage: + manage_nsfs glacier [options] +`; + +const GLACIER_MIGRATE_OPTIONS = ` +Glacier Migrate Options: + --interval (default none) Run the operation if "interval" milliseconds have passed since last run +`; + +const GLACIER_RESTORE_OPTIONS = ` +Glacier Restore Options: + --interval (default none) Run the operation if "interval" milliseconds have passed since last run +`; + +const GLACIER_EXPIRY_OPTIONS = ` +Glacier Expiry Options: + --interval (default none) Run the operation if "interval" milliseconds have passed since last run +`; + /** * print_usage would print the help according to the arguments that were passed * @param {string} type @@ -199,6 +219,9 @@ function print_usage(type, action) { case TYPES.IP_WHITELIST: process.stdout.write(WHITELIST_FLAGS.trimStart()); break; + case TYPES.GLACIER: + print_help_glacier(action); + break; default: process.stdout.write(HELP + '\n'); process.stdout.write(USAGE.trimStart() + '\n'); @@ -265,5 +288,21 @@ function print_help_bucket(action) { process.exit(0); } +function print_help_glacier(action) { + switch (action) 
{ + case GLACIER_ACTIONS.MIGRATE: + process.stdout.write(GLACIER_MIGRATE_OPTIONS.trimStart()); + break; + case GLACIER_ACTIONS.RESTORE: + process.stdout.write(GLACIER_RESTORE_OPTIONS.trimStart()); + break; + case GLACIER_ACTIONS.EXPIRY: + process.stdout.write(GLACIER_EXPIRY_OPTIONS.trimStart()); + break; + default: + process.stdout.write(GLACIER_OPTIONS.trimStart()); + } +} + // EXPORTS exports.print_usage = print_usage; diff --git a/src/native/fs/fs_napi.cpp b/src/native/fs/fs_napi.cpp index e31655d54a..59237ed425 100644 --- a/src/native/fs/fs_napi.cpp +++ b/src/native/fs/fs_napi.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -1333,6 +1334,7 @@ struct FileWrap : public Napi::ObjectWrap InstanceMethod<&FileWrap::unlinkfileat>("unlinkfileat"), InstanceMethod<&FileWrap::stat>("stat"), InstanceMethod<&FileWrap::fsync>("fsync"), + InstanceMethod<&FileWrap::flock>("flock"), InstanceAccessor<&FileWrap::getfd>("fd"), })); constructor.SuppressDestruct(); @@ -1361,6 +1363,7 @@ struct FileWrap : public Napi::ObjectWrap Napi::Value stat(const Napi::CallbackInfo& info); Napi::Value fsync(const Napi::CallbackInfo& info); Napi::Value getfd(const Napi::CallbackInfo& info); + Napi::Value flock(const Napi::CallbackInfo& info); }; Napi::FunctionReference FileWrap::constructor; @@ -1680,6 +1683,34 @@ struct FileFsync : public FSWrapWorker } }; +struct FileFlock : public FSWrapWorker +{ + int lock_mode; + FileFlock(const Napi::CallbackInfo& info) + : FSWrapWorker(info) + , lock_mode(LOCK_SH) + { + if (info.Length() > 1 && !info[1].IsUndefined()) { + auto mode = info[1].As().Utf8Value(); + if (mode == "EXCLUSIVE") { + lock_mode = LOCK_EX; + } else if (mode == "UNLOCK") { + lock_mode = LOCK_UN; + } else { + lock_mode = LOCK_SH; + } + } + + Begin(XSTR() << "FileFlock " << DVAL(_wrap->_path)); + } + virtual void Work() + { + int fd = _wrap->_fd; + CHECK_WRAP_FD(fd); + SYSCALL_OR_RETURN(flock(fd, lock_mode)); + } +}; + struct RealPath : public FSWorker { std::string _path; @@ -1826,6 +1857,12 @@ FileWrap::fsync(const Napi::CallbackInfo& info) return api(info); } +Napi::Value +FileWrap::flock(const Napi::CallbackInfo& info) +{ + return api(info); +} + /** * */ diff --git a/src/sdk/namespace_blob.js b/src/sdk/namespace_blob.js index e0b0a340af..293a915ce5 100644 --- a/src/sdk/namespace_blob.js +++ b/src/sdk/namespace_blob.js @@ -63,7 +63,7 @@ class NamespaceBlob { return `https://${this.account_name}.blob.core.windows.net/${container}/${blob}${sas_string}`; } - is_server_side_copy(other, params) { + is_server_side_copy(other, other_md, params) { return other instanceof NamespaceBlob && this.connection_string === other.connection_string; } diff --git a/src/sdk/namespace_cache.js b/src/sdk/namespace_cache.js index aa79077b5b..2da276f224 100644 --- a/src/sdk/namespace_cache.js +++ b/src/sdk/namespace_cache.js @@ -56,7 +56,7 @@ class NamespaceCache { return this.namespace_hub.get_bucket(); } - is_server_side_copy(other, params) { + is_server_side_copy(other, other_md, params) { return other instanceof NamespaceCache && this.namespace_hub === other.namespace_hub && this.namespace_nb === other.namespace_nb; diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js index f949fe3c1c..0a686394bf 100644 --- a/src/sdk/namespace_fs.js +++ b/src/sdk/namespace_fs.js @@ -24,6 +24,8 @@ const nb_native = require('../util/nb_native'); const RpcError = require('../rpc/rpc_error'); const { S3Error } = require('../endpoint/s3/s3_errors'); const NoobaaEvent = 
require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent; +const { PersistentLogger } = require('../util/persistent_logger'); +const { GlacierBackend } = require('./nsfs_glacier_backend/backend'); const multi_buffer_pool = new buffer_utils.MultiSizeBuffersPool({ sorted_buf_sizes: [ @@ -66,35 +68,6 @@ const NULL_VERSION_ID = 'null'; const NULL_VERSION_SUFFIX = '_' + NULL_VERSION_ID; const XATTR_STORAGE_CLASS_KEY = XATTR_USER_PREFIX + 'storage_class'; -/** - * XATTR_RESTORE_REQUEST is set to a NUMBER (expiry days) by `restore_object` when - * a restore request is made. This is unset by the underlying restore process when - * it picks up the request, this is to ensure that the same object is not queued - * for restoration multiple times. - */ -const XATTR_RESTORE_REQUEST = XATTR_USER_PREFIX + 'noobaa.restore.request'; - -/** - * XATTR_RESTORE_ONGOING is set to a BOOL by the underlying restore process when it picks up - * a restore request. This is unset by the underlying restore process when it finishes - * restoring the object. - */ -const XATTR_RESTORE_ONGOING = XATTR_USER_PREFIX + 'noobaa.restore.ongoing'; - -/** - * XATTR_RESTORE_EXPIRY is set to a ISO DATE by the underlying restore process or by - * NooBaa (in case restore is issued again while the object is on disk). - * This is read by the underlying "disk evict" process to determine if the object - * should be evicted from the disk or not. - * - * NooBaa will use this date to determine if the object is on disk or not, if the - * expiry date is in the future, the object is on disk, if the expiry date is in - * the past, the object is not on disk. This may or may not represent the actual - * state of the object on disk, but is probably good enough for NooBaa's purposes - * assuming that restore request for already restored objects fails gracefully. - */ -const XATTR_RESTORE_EXPIRY = XATTR_USER_PREFIX + 'noobaa.restore.expiry'; - const versioning_status_enum = { VER_ENABLED: 'ENABLED', VER_SUSPENDED: 'SUSPENDED', @@ -495,11 +468,12 @@ class NamespaceFS { return bucket; } - is_server_side_copy(other, params) { + is_server_side_copy(other, other_md, params) { const is_server_side_copy = other instanceof NamespaceFS && other.bucket_path === this.bucket_path && - other.fs_backend === this.fs_backend && //Check that the same backend type - params.xattr_copy; // TODO, DO we need to hard link at MetadataDirective 'REPLACE'? + other.fs_backend === this.fs_backend && // Check that the same backend type + params.xattr_copy && // TODO, DO we need to hard link at MetadataDirective 'REPLACE'? 
+ params.content_type === other_md.content_type; dbg.log2('NamespaceFS: is_server_side_copy:', is_server_side_copy); dbg.log2('NamespaceFS: other instanceof NamespaceFS:', other instanceof NamespaceFS, 'other.bucket_path:', other.bucket_path, 'this.bucket_path:', this.bucket_path, @@ -1125,6 +1099,8 @@ class NamespaceFS { // and opens upload_path (if exists) or file_path // returns upload params - params that are passed to the called functions in upload_object async _start_upload(fs_context, object_sdk, file_path, params, open_mode) { + const force_copy_fallback = await this._check_copy_storage_class(fs_context, params); + let upload_path; // upload path is needed only when open_mode is w / for copy if (open_mode === 'w' || params.copy_source) { @@ -1134,7 +1110,13 @@ class NamespaceFS { } let open_path = upload_path || file_path; - const copy_res = params.copy_source && (await this._try_copy_file(fs_context, params, file_path, upload_path)); + let copy_res; + if (force_copy_fallback) { + copy_res = copy_status_enum.FALLBACK; + } else if (params.copy_source) { + copy_res = await this._try_copy_file(fs_context, params, file_path, upload_path); + } + if (copy_res) { if (copy_res === copy_status_enum.FALLBACK) { params.copy_source.nsfs_copy_fallback(); @@ -1174,6 +1156,38 @@ class NamespaceFS { return res; } + /** + * _check_copy_storage_class returns true if a copy is needed to be forced. + * + * This might be needed if we need to manage xattr separately on the source + * object and target object (eg. GLACIER objects). + * + * NOTE: The function will throw S3 error if source object storage class is + * "GLACIER" but it is not in restored state (AWS behaviour). + * @param {nb.NativeFSContext} fs_context + * @param {Record} params + * @returns {Promise} + */ + async _check_copy_storage_class(fs_context, params) { + if (params.copy_source) { + const src_file_path = await this._find_version_path(fs_context, params.copy_source); + const stat = await nb_native().fs.stat(fs_context, src_file_path); + const src_storage_class = s3_utils.parse_storage_class(stat.xattr[XATTR_STORAGE_CLASS_KEY]); + const src_restore_status = this._get_object_restore_status(stat, src_storage_class); + + if (src_storage_class === s3_utils.STORAGE_CLASS_GLACIER) { + if (src_restore_status?.ongoing || !src_restore_status?.expiry_time) { + dbg.warn('_validate_upload: object is not restored yet', src_restore_status); + throw new S3Error(S3Error.InvalidObjectState); + } + + return true; + } + } + + return params.copy_source && params.storage_class === s3_utils.STORAGE_CLASS_GLACIER; + } + // on put part - file path is equal to upload path // put part upload should NOT contain - versioning & move to dest steps // if copy status is SAME_INODE - NO xattr replace/move_to_dest @@ -1217,6 +1231,10 @@ class NamespaceFS { fs_xattr = Object.assign(fs_xattr || {}, { [XATTR_STORAGE_CLASS_KEY]: params.storage_class }); + + if (params.storage_class === s3_utils.STORAGE_CLASS_GLACIER) { + await this.append_to_migrate_wal(file_path); + } } if (fs_xattr && !is_dir_content) await target_file.replacexattr(fs_context, fs_xattr); // fsync @@ -1947,9 +1965,11 @@ class NamespaceFS { } // Total 8 states - const restore_request = stat.xattr[XATTR_RESTORE_REQUEST] ? parseInt(stat.xattr[XATTR_RESTORE_REQUEST], 10) : null; - const restore_ongoing = stat.xattr[XATTR_RESTORE_ONGOING] === 'true'; - const restore_expiry = stat.xattr[XATTR_RESTORE_EXPIRY] ? 
new Date(stat.xattr[XATTR_RESTORE_EXPIRY]) : null; + const restore_request = stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST] ? + parseInt(stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST], 10) : null; + const restore_ongoing = stat.xattr[GlacierBackend.XATTR_RESTORE_ONGOING] === 'true'; + const restore_expiry = stat.xattr[GlacierBackend.XATTR_RESTORE_EXPIRY] ? + new Date(stat.xattr[GlacierBackend.XATTR_RESTORE_EXPIRY]) : null; // 5 Valid States if (restore_request) { @@ -1978,7 +1998,7 @@ class NamespaceFS { expires_on.setDate(expires_on.getDate() + params.days); await file.replacexattr(fs_context, { - [XATTR_RESTORE_EXPIRY]: expires_on.toISOString(), + [GlacierBackend.XATTR_RESTORE_EXPIRY]: expires_on.toISOString(), }); // Should result in HTTP: 200 OK @@ -1989,8 +2009,14 @@ class NamespaceFS { // essentially adding the object to the batch. if (!restore_expiry || restore_expiry <= now) { dbg.log0('namespace_fs.restore_object: state - (!restore_request, !restore_ongoing, !restore_expiry or restore_expiry <= now)'); + + // First add it to the log and then add the extended attribute as if we fail after + // this point then the restore request can be triggered again without issue but + // the reverse doesn't works. + await this.append_to_restore_wal(file_path); + await file.replacexattr(fs_context, { - [XATTR_RESTORE_REQUEST]: params.days.toString(), + [GlacierBackend.XATTR_RESTORE_REQUEST]: params.days.toString(), }); // Should result in HTTP: 202 Accepted @@ -2188,30 +2214,6 @@ class NamespaceFS { mime.getType(key) || 'application/octet-stream'; const storage_class = s3_utils.parse_storage_class(stat.xattr[XATTR_STORAGE_CLASS_KEY]); - let restore_status; - if (storage_class === s3_utils.STORAGE_CLASS_GLACIER) { - const restore_request = stat.xattr[XATTR_RESTORE_REQUEST] ? parseInt(stat.xattr[XATTR_RESTORE_REQUEST], 10) : null; - const restore_ongoing = stat.xattr[XATTR_RESTORE_ONGOING] === 'true'; - const restore_expiry = stat.xattr[XATTR_RESTORE_EXPIRY] ? new Date(stat.xattr[XATTR_RESTORE_EXPIRY]) : null; - - // Assume the state invariants to hold true - - // Restore is ongoing if, - // 1. Restore request is set - // 2. Restore ongoing is set to true - // Restore completed if - // 1. Expiry is set and expiry is in the future - if (restore_request || restore_ongoing) { - restore_status = { - ongoing: true, - }; - } else if (restore_expiry && restore_expiry > new Date()) { - restore_status = { - ongoing: false, - expiry_time: restore_expiry, - }; - } - } return { obj_id: etag, @@ -2226,7 +2228,7 @@ class NamespaceFS { is_latest, delete_marker, storage_class, - restore_status, + restore_status: this._get_object_restore_status(stat, storage_class), xattr: to_xattr(stat.xattr), // temp: @@ -2240,6 +2242,46 @@ class NamespaceFS { }; } + /** + * _get_object_restore_status returns the restore status of the object if the object is + * in "GLACIER" storage class + * @param {nb.NativeFSStats} stat stat of the object file + * @param {string} [storage_class] optional storage class of the target object + * @returns {{ ongoing: boolean, expiry_time?: Date } | undefined} + */ + _get_object_restore_status(stat, storage_class) { + if (!storage_class) storage_class = s3_utils.parse_storage_class(stat.xattr[XATTR_STORAGE_CLASS_KEY]); + + let restore_status; + if (storage_class === s3_utils.STORAGE_CLASS_GLACIER) { + const restore_request = stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST] ? 
+ parseInt(stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST], 10) : null; + const restore_ongoing = stat.xattr[GlacierBackend.XATTR_RESTORE_ONGOING] === 'true'; + const restore_expiry = stat.xattr[GlacierBackend.XATTR_RESTORE_EXPIRY] ? + new Date(stat.xattr[GlacierBackend.XATTR_RESTORE_EXPIRY]) : null; + + // Assume the state invariants to hold true + + // Restore is ongoing if, + // 1. Restore request is set + // 2. Restore ongoing is set to true + // Restore completed if + // 1. Expiry is set and expiry is in the future + if (restore_request || restore_ongoing) { + restore_status = { + ongoing: true, + }; + } else if (restore_expiry && restore_expiry > new Date()) { + restore_status = { + ongoing: false, + expiry_time: restore_expiry, + }; + } + } + + return restore_status; + } + _get_upload_info(stat, version_id) { const etag = this._get_etag(stat); const encryption = this._get_encryption_info(stat); @@ -2986,10 +3028,50 @@ class NamespaceFS { // TODO: Upon integration with underlying systems, we should // check if archiving is actually supported or not - return config.NSFS_RESTORE_ENABLED || false; + return config.NSFS_GLACIER_ENABLED || false; + } + + async append_to_migrate_wal(entry) { + if (!config.NSFS_GLACIER_LOGS_ENABLED) return; + + await NamespaceFS.migrate_wal.append(entry); + } + + async append_to_restore_wal(entry) { + if (!config.NSFS_GLACIER_LOGS_ENABLED) return; + + await NamespaceFS.restore_wal.append(entry); + } + + static get migrate_wal() { + if (!NamespaceFS._migrate_wal) { + NamespaceFS._migrate_wal = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, GlacierBackend.MIGRATE_WAL_NAME, { + max_interval: config.NSFS_GLACIER_LOGS_MAX_INTERVAL, + locking: 'SHARED', + }); + } + + return NamespaceFS._migrate_wal; + } + + static get restore_wal() { + if (!NamespaceFS._restore_wal) { + NamespaceFS._restore_wal = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, GlacierBackend.RESTORE_WAL_NAME, { + max_interval: config.NSFS_GLACIER_LOGS_MAX_INTERVAL, + locking: 'SHARED', + }); + } + + return NamespaceFS._restore_wal; } } +/** @type {PersistentLogger} */ +NamespaceFS._migrate_wal = null; + +/** @type {PersistentLogger} */ +NamespaceFS._restore_wal = null; + module.exports = NamespaceFS; module.exports.multi_buffer_pool = multi_buffer_pool; diff --git a/src/sdk/namespace_gcp.js b/src/sdk/namespace_gcp.js index 98230b5912..a983ccff46 100644 --- a/src/sdk/namespace_gcp.js +++ b/src/sdk/namespace_gcp.js @@ -56,7 +56,7 @@ class NamespaceGCP { return this; } - is_server_side_copy(other, params) { + is_server_side_copy(other, other_md, params) { //TODO: what is the case here, what determine server side copy? 
return other instanceof NamespaceGCP && this.private_key === other.private_key && diff --git a/src/sdk/namespace_merge.js b/src/sdk/namespace_merge.js index 1a3ecf10bf..dc382adbdc 100644 --- a/src/sdk/namespace_merge.js +++ b/src/sdk/namespace_merge.js @@ -24,7 +24,7 @@ class NamespaceMerge { return this.namespaces.write_resource; } - is_server_side_copy(other, params) { + is_server_side_copy(other, other_md, params) { // we do not allow server side copy for merge return false; } diff --git a/src/sdk/namespace_nb.js b/src/sdk/namespace_nb.js index b03d9e8ca0..fe410217aa 100644 --- a/src/sdk/namespace_nb.js +++ b/src/sdk/namespace_nb.js @@ -37,7 +37,7 @@ class NamespaceNB { return this.active_triggers_map_by_bucket.get(bucket); } - is_server_side_copy(other, params) { + is_server_side_copy(other, other_md, params) { // in noobaa namespace case just check that other is also local (noobaa) return other instanceof NamespaceNB; } diff --git a/src/sdk/namespace_s3.js b/src/sdk/namespace_s3.js index 2aa1bcd116..d5364d4ff7 100644 --- a/src/sdk/namespace_s3.js +++ b/src/sdk/namespace_s3.js @@ -47,7 +47,7 @@ class NamespaceS3 { // for now we only send copy to AWS if both source and target are using the same access key // to aboid ACCESS_DENIED errors. a more complete solution is to always perform the server side copy // and fall back to read\write copy if access is denied - is_server_side_copy(other, params) { + is_server_side_copy(other, other_md, params) { return other instanceof NamespaceS3 && this.endpoint === other.endpoint && this.access_key === other.access_key; diff --git a/src/sdk/nb.d.ts b/src/sdk/nb.d.ts index 8292099820..d038231fc1 100644 --- a/src/sdk/nb.d.ts +++ b/src/sdk/nb.d.ts @@ -769,7 +769,7 @@ interface ObjectSDK { interface Namespace { - is_server_side_copy(other: Namespace, params: object): boolean; + is_server_side_copy(other: Namespace, other_md: ObjectInfo, params: object): boolean; is_readonly_namespace(): boolean; get_write_resource(): Namespace; get_bucket(): string; @@ -900,7 +900,7 @@ interface NativeFS { xattr_get_keys?: string[]; }, ): Promise; - statfs(fs_context: NativeFSContext, path: string): Promise; + statfs(fs_context: NativeFSContext, path: string): Promise>; realpath(fs_context: NativeFSContext, path: string): Promise; checkAccess(fs_context: NativeFSContext, path: string): Promise; getsinglexattr(fs_context: NativeFSContext, path: string, key: string): Promise; @@ -965,6 +965,7 @@ interface NativeFile { linkfileat(fs_context: NativeFSContext, path: string, fd?: number): Promise; fsync(fs_context: NativeFSContext): Promise; fd: number + flock(fs_context: NativeFSContext, operation: "EXCLUSIVE" | "SHARED" | "UNLOCK"): Promise; } interface NativeDir { diff --git a/src/sdk/nsfs_glacier_backend/backend.js b/src/sdk/nsfs_glacier_backend/backend.js new file mode 100644 index 0000000000..619aec08c7 --- /dev/null +++ b/src/sdk/nsfs_glacier_backend/backend.js @@ -0,0 +1,182 @@ +/* Copyright (C) 2024 NooBaa */ +'use strict'; + +const nb_native = require('../../util/nb_native'); + +class GlacierBackend { + static MIGRATE_TIMESTAMP_FILE = 'timestamp.migrate'; + static RESTORE_TIMESTAMP_FILE = 'timestamp.restore'; + + /** + * XATTR_RESTORE_REQUEST is set to a NUMBER (expiry days) by `restore_object` when + * a restore request is made. This is unset by the underlying restore process when + * it picks up the request, this is to ensure that the same object is not queued + * for restoration multiple times. 
+ */ + static XATTR_RESTORE_REQUEST = 'user.noobaa.restore.request'; + + /** + * XATTR_RESTORE_EXPIRY is set to a ISO DATE by the underlying restore process or by + * NooBaa (in case restore is issued again while the object is on disk). + * This is read by the underlying "disk evict" process to determine if the object + * should be evicted from the disk or not. + * + * NooBaa will use this date to determine if the object is on disk or not, if the + * expiry date is in the future, the object is on disk, if the expiry date is in + * the past, the object is not on disk. This may or may not represent the actual + * state of the object on disk, but is probably good enough for NooBaa's purposes + * assuming that restore request for already restored objects fails gracefully. + */ + static XATTR_RESTORE_EXPIRY = 'user.noobaa.restore.expiry'; + + + /** + * XATTR_RESTORE_ONGOING is set to a BOOL by the underlying restore process when it picks up + * a restore request. This is unset by the underlying restore process when it finishes + * restoring the object. + */ + static XATTR_RESTORE_ONGOING = 'user.noobaa.restore.ongoing'; + + /** + * XATTR_RESTORE_REQUEST_STAGED is set to the same valuue as XATTR_RESTORE_REQUEST + * by a backend as a means to mark the request to be in-flight. + * + * Any backend needs to make sure that both the attributes shall NOT be set at the same + * time. + */ + static XATTR_RESTORE_REQUEST_STAGED = 'user.noobaa.restore.request.staged'; + + static STORAGE_CLASS_XATTR = 'user.storage_class'; + + static MIGRATE_WAL_NAME = 'migrate'; + static RESTORE_WAL_NAME = 'restore'; + + /** + * migrate must take a file name which will have newline seperated + * entries of filenames which needs to be migrated to GLACIER and + * should perform migration of those files if feasible. + * + * The function should return false if it needs the log file to be + * preserved. + * + * NOTE: This needs to be implemented by each backend. + * @param {nb.NativeFSContext} fs_context + * @param {string} log_file log filename + * @returns {Promise} + */ + async migrate(fs_context, log_file) { + throw new Error('Unimplementented'); + } + + /** + * restore must take a file name which will have newline seperated + * entries of filenames which needs to be restored from GLACIER and + * should perform restore of those files if feasible + * + * The function should return false if it needs the log file to be + * preserved. + * + * NOTE: This needs to be implemented by each backend. + * @param {nb.NativeFSContext} fs_context + * @param {string} log_file log filename + * @returns {Promise} + */ + async restore(fs_context, log_file) { + throw new Error('Unimplementented'); + } + + /** + * expiry moves the restored files back to glacier + * + * NOTE: This needs to be implemented by each backend. + * @param {nb.NativeFSContext} fs_context + */ + async expiry(fs_context) { + throw new Error('Unimplementented'); + } + + /** + * low_free_space must return true if the backend has + * low free space. + * + * NOTE: This may be used as a precheck before executing + * operations like `migrate` and `restore`. + * + * Example: `migrate` can be more frequently if this function + * returns `true`. + * + * @returns {Promise} + */ + async low_free_space() { + throw new Error('Unimplementented'); + } + + /** + * should_migrate returns true if the given file must be migrated + * + * The caller can pass the stat data, if none is passed, stat is + * called internally. 
* @param {string} file name of the file + * @param {nb.NativeFSStats} [stat] + * @returns {Promise} + */ + async should_migrate(fs_context, file, stat) { + if (!stat) { + stat = await nb_native().fs.stat(fs_context, file, { + xattr_get_keys: [ + GlacierBackend.XATTR_RESTORE_REQUEST, + GlacierBackend.XATTR_RESTORE_EXPIRY, + GlacierBackend.XATTR_RESTORE_REQUEST_STAGED, + GlacierBackend.STORAGE_CLASS_XATTR, + ], + }); + } + + // How can this happen? + // 1. User uploads an item with GLACIER storage class + // 2. It gets logged into the WAL because of storage class + // 3. User uploads again without specifying storage class + if (stat.xattr[GlacierBackend.STORAGE_CLASS_XATTR] !== 'GLACIER') { + return false; + } + + // If any of these extended attributes are set then that means that this object was + // marked for restore or has been restored, skip migration of these or else it will result + // in races + if ( + stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST] || + stat.xattr[GlacierBackend.XATTR_RESTORE_EXPIRY] || + stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST_STAGED]) { + return false; + } + + return true; + } + + /** + * should_restore returns true if the given file must be restored + * + * The caller can pass the stat data, if none is passed, stat is + * called internally. + * @param {string} file name of the file + * @param {nb.NativeFSStats} [stat] + * @returns {Promise} + */ + async should_restore(fs_context, file, stat) { + if (!stat) { + stat = await nb_native().fs.stat(fs_context, file, { + xattr_get_keys: [ + GlacierBackend.XATTR_RESTORE_REQUEST, + GlacierBackend.XATTR_RESTORE_REQUEST_STAGED, + ], + }); + } + + // Can happen if the file was uploaded again to `STANDARD` storage class + if (!stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST] && !stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST_STAGED]) { + return false; + } + + return true; + } +} + +exports.GlacierBackend = GlacierBackend; diff --git a/src/sdk/nsfs_glacier_backend/helper.js b/src/sdk/nsfs_glacier_backend/helper.js new file mode 100644 index 0000000000..526dbb8b6e --- /dev/null +++ b/src/sdk/nsfs_glacier_backend/helper.js @@ -0,0 +1,29 @@ +/* Copyright (C) 2024 NooBaa */ +'use strict'; + +/** + * This module exists so as to export the common function `getGlacierBackend` + * + * Keeping this in generic.js creates a cyclic dependency issue. + */ + +const config = require('../../../config'); +const { TapeCloudGlacierBackend } = require('./tapecloud'); +// eslint-disable-next-line no-unused-vars +const { GlacierBackend } = require('./backend'); + +/** + * getGlacierBackend returns the appropriate backend for the provided type + * @param {string} [typ] + * @returns {GlacierBackend} + */ +function getGlacierBackend(typ = config.NSFS_GLACIER_BACKEND) { + switch (typ) { + case 'TAPECLOUD': + return new TapeCloudGlacierBackend(); + default: + throw new Error('invalid backend type provided'); + } +} + +exports.getGlacierBackend = getGlacierBackend; diff --git a/src/sdk/nsfs_glacier_backend/tapecloud.js b/src/sdk/nsfs_glacier_backend/tapecloud.js new file mode 100644 index 0000000000..a1093cd983 --- /dev/null +++ b/src/sdk/nsfs_glacier_backend/tapecloud.js @@ -0,0 +1,373 @@ +/* Copyright (C) 2024 NooBaa */ +'use strict'; + +const { PersistentLogger } = require("../../util/persistent_logger"); +const { NewlineReader } = require('../../util/file_reader'); +const { GlacierBackend } = require("./backend"); +const nb_native = require('../../util/nb_native'); +const config = require('../../../config'); +const path = require("path"); +const { 
parse_decimal_int } = require("../../endpoint/s3/s3_utils"); +const native_fs_utils = require('../../util/native_fs_utils'); +const { exec } = require('../../util/os_utils'); + +const ERROR_DUPLICATE_TASK = "GLESM431E"; + +const MIGRATE_SCRIPT = 'migrate'; +const RECALL_SCRIPT = 'recall'; +const TASK_SHOW_SCRIPT = 'task_show'; +const SCAN_EXPIRED_SCRIPT = 'scan_expired'; +const LOW_FREE_SPACE_SCRIPT = 'low_free_space'; + +function get_bin_path(bin_name) { + return path.join(config.NSFS_GLACIER_TAPECLOUD_BIN_DIR, bin_name); +} + +async function get_task(task_id) { + return await exec(`${get_bin_path(TASK_SHOW_SCRIPT)} ${task_id}`, { return_stdout: true }); +} + +async function tapecloud_failure_handler(error) { + const { stdout } = error; + + // Find the line in the stdout which has the line 'task ID is, ' and extract id + const match = stdout.match(/task ID is (\d+)/); + if (match.length !== 2) { + throw error; + } + + const task_id = match[1]; + + // Fetch task status and see what failed + const taskshowstdout = await get_task(task_id); + return taskshowstdout + .split('\n') + .filter(line => line.startsWith("Fail")) + .map(line => { + const parsed = line.split(/\s+/); + if (parsed.length !== 6) { + throw Error('failed to parse task show'); + } + + if (parsed[1] === ERROR_DUPLICATE_TASK) { + return null; + } + + // Column 5 is the filename (refer tapecloud [eeadm] manual) + return parsed[5]; + }) + .filter(Boolean); +} + +/** + * migrate takes name of a file which contains the list + * of the files to be migrated to tape. + * + * The file should be in the following format: + * + * The function returns the names of the files which failed + * to migrate. + * @param {string} file filename + * @returns {Promise} failedfiles + */ +async function migrate(file) { + try { + await exec(`${get_bin_path(MIGRATE_SCRIPT)} ${file}`); + return []; + } catch (error) { + return tapecloud_failure_handler(error); + } +} + +/** + * recall takes name of a file which contains the list + * of the files to be recall to tape. + * + * The file should be in the following format: + * + * The function returns the names of the files which failed + * to recall. + * @param {string} file filename + * @returns {Promise} failed files + */ +async function recall(file) { + try { + await exec(`${get_bin_path(RECALL_SCRIPT)} ${file}`); + return []; + } catch (error) { + return tapecloud_failure_handler(error); + } +} + +/** + * scan_expired will invoke the `SCAN_EXPIRED` script and will expect the + * data to be dumped into `destination` dir. 
+ * @param {string} destination + */ +async function scan_expired(destination) { + await exec(`${get_bin_path(SCAN_EXPIRED_SCRIPT)} ${destination}`); +} + +class TapeCloudGlacierBackend extends GlacierBackend { + async migrate(fs_context, log_file) { + let filtered_log = null; + let walreader = null; + try { + filtered_log = new PersistentLogger( + config.NSFS_GLACIER_LOGS_DIR, + `tapecloud_migrate_run_${Date.now().toString()}`, + { disable_rotate: true, locking: 'EXCLUSIVE' }, + ); + + walreader = new NewlineReader(fs_context, log_file, 'EXCLUSIVE'); + + const result = await walreader.forEach(async entry => { + let should_migrate = true; + try { + should_migrate = await this.should_migrate(fs_context, entry); + } catch (err) { + if (err.code === 'ENOENT') { + // Skip this file + return true; + } + + // Something else is wrong with this entry of this file + // should skip processing this WAL for now + return false; + } + + // Skip the file if it shouldn't be migrated + if (!should_migrate) return true; + + await filtered_log.append(entry); + return true; + }); + + // If the result of the above is false then it indicates that we concluded + // to exit early hence the file shouldn't be processed further, exit + if (!result) return false; + + await filtered_log.close(); + const failed = await migrate(filtered_log.active_path); + + // Do not delete the WAL if migration failed - This allows easy retries + return failed.length === 0; + } catch (error) { + console.error('unexpected error occured while processing migrate WAL:', error); + + // Preserve the WAL if we encounter exception here, possible failures + // 1.eaedm command failure + // 2. tempwal failure + // 3. newline reader failure + return false; + } finally { + if (filtered_log) { + await filtered_log.close(); + await filtered_log.remove(); + } + + if (walreader) await walreader.close(); + } + } + + async restore(fs_context, log_file) { + let tempwal = null; + let walreader = null; + let tempwalreader = null; + try { + // tempwal will store all the files of interest and will be handed over to tapecloud script + tempwal = new PersistentLogger( + config.NSFS_GLACIER_LOGS_DIR, + `tapecloud_restore_run_${Date.now().toString()}`, + { disable_rotate: true, locking: 'EXCLUSIVE' }, + ); + + walreader = new NewlineReader(fs_context, log_file, 'EXCLUSIVE'); + + const [precount, preres] = await walreader.forEach(async entry => { + let fh = null; + try { + fh = await nb_native().fs.open(fs_context, entry, 'rw'); + const stat = await fh.stat( + fs_context, + { + xattr_get_keys: [ + GlacierBackend.XATTR_RESTORE_REQUEST, + GlacierBackend.XATTR_RESTORE_REQUEST_STAGED, + ] + } + ); + + const should_restore = await this.should_restore(fs_context, entry, stat); + if (!should_restore) { + // Skip this file + return true; + } + + await fh.replacexattr( + fs_context, + { + [GlacierBackend.XATTR_RESTORE_REQUEST_STAGED]: + stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST] || stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST_STAGED], + [GlacierBackend.XATTR_RESTORE_ONGOING]: 'true', + } + ); + + // Add entry to the tempwal + await tempwal.append(entry); + + return true; + } catch (error) { + if (error.code === 'ENOENT') { + // Skip this file + return true; + } + + // Something else is wrong so skip processing the file for now + return false; + } finally { + if (fh) await fh.close(fs_context); + } + }); + + // If the result of the above iteration was negative it indicates + // an early exit hence no need to process further for now + if (!preres) return false; + + 
// If we didn't read even one line then it most likely indicates that the WAL is
+            // empty - this case is unlikely given the mechanism of WAL but still needs to be
+            // handled.
+            // Return `false` so that the caller does not delete the WAL file.
+            if (precount === 0) return false;
+
+            // If we didn't find any candidates despite a complete read, exit and delete this WAL
+            if (tempwal.local_size === 0) return true;
+
+            await tempwal.close();
+            const failed = await recall(tempwal.active_path);
+
+            tempwalreader = new NewlineReader(fs_context, tempwal.active_path, "EXCLUSIVE");
+
+            // Start iteration over the WAL again
+            const post = await tempwalreader.forEach(async entry => {
+                let fh = null;
+                try {
+                    fh = await nb_native().fs.open(fs_context, entry, 'rw');
+
+                    const stat = await fh.stat(
+                        fs_context,
+                        {
+                            xattr_get_keys: [
+                                GlacierBackend.XATTR_RESTORE_REQUEST,
+                                GlacierBackend.XATTR_RESTORE_REQUEST_STAGED,
+                            ]
+                        }
+                    );
+
+                    // The recall of this entry failed - move the staged restore request back
+                    // to the active request xattr and clear the ongoing flag so that the
+                    // restore is retried on a later run
+                    if (failed.includes(entry)) {
+                        await fh.replacexattr(
+                            fs_context,
+                            {
+                                [GlacierBackend.XATTR_RESTORE_REQUEST]: stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST_STAGED],
+                            },
+                            GlacierBackend.XATTR_RESTORE_ONGOING,
+                        );
+
+                        return true;
+                    }
+
+                    const expires_on = new Date();
+                    const days = parse_decimal_int(stat.xattr[GlacierBackend.XATTR_RESTORE_REQUEST_STAGED]);
+                    expires_on.setUTCDate(expires_on.getUTCDate() + days);
+                    expires_on.setUTCHours(0, 0, 0, 0);
+
+                    await fh.replacexattr(fs_context, {
+                        [GlacierBackend.XATTR_RESTORE_ONGOING]: 'false',
+                        [GlacierBackend.XATTR_RESTORE_EXPIRY]: expires_on.toISOString(),
+                    }, GlacierBackend.XATTR_RESTORE_REQUEST);
+
+                    return true;
+                } catch (error) {
+                    console.error(`failed to process ${entry}`, error);
+                    // It's OK if the file got deleted between the last check and this check
+                    // but if there is any other error, retry restore
+                    //
+                    // It could be that the error is transient and the actual
+                    // restore did successfully take place, in that case, rely on the tapecloud
+                    // script to handle dups
+                    if (error.code !== 'ENOENT') {
+                        return false;
+                    }
+
+                    // ENOENT - skip this entry and continue with the rest
+                    return true;
+                } finally {
+                    if (fh) await fh.close(fs_context);
+                }
+            });
+
+            if (!post[1]) return false;
+
+            // If even one entry failed to recall, preserve the WAL so that the restore
+            // can be retried
+            return failed.length === 0;
+        } catch (error) {
+            console.error('unexpected error occurred while processing restore WAL:', error);
+
+            // Preserve the WAL, failure cases:
+            // 1. tapecloud command exception
+            // 2. WAL open failure
+            // 3.
Newline reader failure + return false; + } finally { + if (walreader) await walreader.close(); + if (tempwalreader) await tempwalreader.close(); + + if (tempwal) { + await tempwal.close(); + await tempwal.remove(); + } + } + } + + async expiry(fs_context) { + try { + // Create temporary directory for `scan_expired` + const tempdir = path.join(config.NSFS_GLACIER_LOGS_DIR, `scanexpire-out-tmp-${Date.now()}`); + await nb_native().fs.mkdir( + fs_context, + tempdir, + native_fs_utils.get_umasked_mode(config.BASE_MODE_DIR) + ); + + await scan_expired(tempdir); + + const entries = await nb_native().fs.readdir( + fs_context, + tempdir + ); + + for (const entry of entries) { + if (!entry.isFile()) continue; + try { + await migrate(entry.path); + } catch (error) { + console.error('failed to process entry:', entry.path); + } + } + + // Delete the tempdir at the end + await nb_native().fs.unlink( + fs_context, + tempdir + ); + } catch (error) { + console.error('Unexpected error occured while running tapecloud.expiry:', error); + } + } + + async low_free_space() { + const result = await exec(get_bin_path(LOW_FREE_SPACE_SCRIPT), { return_stdout: true }); + return result.toLowerCase() === 'true'; + } +} + +exports.TapeCloudGlacierBackend = TapeCloudGlacierBackend; diff --git a/src/sdk/object_sdk.js b/src/sdk/object_sdk.js index 9961a521d8..6a3b1eacf3 100644 --- a/src/sdk/object_sdk.js +++ b/src/sdk/object_sdk.js @@ -653,7 +653,7 @@ class ObjectSDK { const actual_source_ns = source_md.ns || source_ns; const actual_target_ns = target_ns.get_write_resource(); - if (actual_target_ns.is_server_side_copy(actual_source_ns, params)) { + if (actual_target_ns.is_server_side_copy(actual_source_ns, source_md, params)) { // fix copy_source in params to point to the correct cloud bucket params.copy_source.bucket = actual_source_ns.get_bucket(bucket); params.copy_source.obj_id = source_md.obj_id; diff --git a/src/test/unit_tests/test_namespace_cache.js b/src/test/unit_tests/test_namespace_cache.js index d1ba1dfa4e..0170ae818a 100644 --- a/src/test/unit_tests/test_namespace_cache.js +++ b/src/test/unit_tests/test_namespace_cache.js @@ -329,7 +329,7 @@ class MockNamespace { /** @returns {any} */ not_implemented() { throw new Error('MockNamespace not implemented'); } - is_server_side_copy(other, params) { return false; } + is_server_side_copy(other, other_md, params) { return false; } is_readonly_namespace() { return false; } get_write_resource() { return this; } get_bucket() { return this.target_bucket; } diff --git a/src/test/unit_tests/test_namespace_fs.js b/src/test/unit_tests/test_namespace_fs.js index b37217f96f..40baa2efeb 100644 --- a/src/test/unit_tests/test_namespace_fs.js +++ b/src/test/unit_tests/test_namespace_fs.js @@ -9,6 +9,7 @@ const path = require('path'); const mocha = require('mocha'); const crypto = require('crypto'); const assert = require('assert'); +const os = require('os'); const P = require('../../util/promise'); const config = require('../../../config'); @@ -23,6 +24,8 @@ const { S3Error } = require('../../endpoint/s3/s3_errors'); const inspect = (x, max_arr = 5) => util.inspect(x, { colors: true, depth: null, maxArrayLength: max_arr }); +const mkdtemp = util.promisify(fs.mkdtemp); + // TODO: In order to verify validity add content_md5_mtime as well const XATTR_MD5_KEY = 'content_md5'; const XATTR_DIR_CONTENT = 'user.noobaa.dir_content'; @@ -459,6 +462,11 @@ mocha.describe('namespace_fs', function() { const restore_key = 'restore_key_1'; const data = crypto.randomBytes(100); + 
mocha.before(async function() { + const dir = await mkdtemp(`${os.tmpdir()}${path.sep}`); + config.NSFS_GLACIER_LOGS_DIR = dir; + }); + mocha.describe('GLACIER storage class not supported', function() { mocha.before(async function() { const upload_res = await ns_tmp.upload_object({ diff --git a/src/util/file_reader.js b/src/util/file_reader.js new file mode 100644 index 0000000000..3d17726dac --- /dev/null +++ b/src/util/file_reader.js @@ -0,0 +1,105 @@ +/* Copyright (C) 2024 NooBaa */ + +'use strict'; + +const nb_native = require('./nb_native'); + +class NewlineReader { + /** + * NewlineReader allows to read a file line by line while at max holding one line + 4096 bytes + * in memory. + * @param {nb.NativeFSContext} fs_context + * @param {string} filepath + * @param {'EXCLUSIVE' | 'SHARED' | undefined} lock + */ + constructor(fs_context, filepath, lock) { + this.path = filepath; + this.lock = lock; + this.buf = Buffer.alloc(4096); + + this.fs_context = fs_context; + this.fh = null; + + this.readoffset = 0; + this.readresults = []; + this._partialread = ""; + this.eof = false; + } + + /** + * nextline returns the next line from the given file + * @returns {Promise} + */ + async nextline() { + if (!this.fh) await this.init(); + + if (this.readresults.length) return this.readresults.shift(); + if (this.eof) return null; + + // Will keep reading till we find at least one new line character + while (!this._partialread.includes('\n')) { + const read = await this.fh.read(this.fs_context, this.buf, 0, this.buf.length, this.readoffset); + if (read === 0) { + this.eof = true; + return null; + } + + this.readoffset += read; + + this._partialread += this.buf.subarray(0, read).toString('utf-8'); + } + + // readresults will contain >= 1 result or else we would have kept looping above + this.readresults = this._partialread.split('\n').slice(0, -1); + + const lastnewlineidx = this._partialread.lastIndexOf('\n'); + this._partialread = this._partialread.substring(lastnewlineidx + 1); + + return this.readresults.shift(); + } + + /** + * forEach takes a callback function and invokes it + * with each line as parameter + * + * The callback function can return `false` if it wants + * to stop the iteration. + * @param {(entry: string) => Promise} cb + * @returns {Promise<[number, boolean]>} + */ + async forEach(cb) { + let entry = await this.nextline(); + let count = 0; + while (entry !== null) { + count += 1; + if (!await cb(entry)) return [count, false]; + + entry = await this.nextline(); + } + + return [count, true]; + } + + // reset will reset the reader and will allow reading the file from + // the beginning again, this does not reopens the file so if the file + // was moved, this will still keep on reading from the previous FD. 
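+    //
+    // Callers that need to re-read a file which might have been swapped or
+    // renamed in the meantime should construct a new NewlineReader instead of
+    // calling reset().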
+ reset() { + this.readresults = []; + this._partialread = ""; + this.eof = false; + this.readoffset = 0; + } + + async init() { + const fh = await nb_native().fs.open(this.fs_context, this.path, 'r'); + if (this.lock) await fh.flock(this.fs_context, this.lock); + + this.fh = fh; + } + + async close() { + if (this.fh) await this.fh.close(this.fs_context); + } +} + +exports.NewlineReader = NewlineReader; diff --git a/src/util/persistent_logger.js b/src/util/persistent_logger.js new file mode 100644 index 0000000000..b4ea1007a3 --- /dev/null +++ b/src/util/persistent_logger.js @@ -0,0 +1,210 @@ +/* Copyright (C) 2023 NooBaa */ +'use strict'; + +const path = require('path'); +const nb_native = require('./nb_native'); +const native_fs_utils = require('./native_fs_utils'); +const P = require('./promise'); +const Semaphore = require('./semaphore'); +const dbg = require('./debug_module')(__filename); + +/** + * PersistentLogger is a logger that is used to record data onto disk separated by newlines. + * + * WAL should ideally use DirectIO to avoid fsyncgate (this does not) + * Refer: [Can applications recover from fsync failures?](https://ramalagappan.github.io/pdfs/papers/cuttlefs.pdf) + * + * Cannot recover from bit rot (Use RAID or something). + */ +class PersistentLogger { + /** + * @param {string} dir parent directory + * @param {string} file file prefix + * @param {{ + * max_interval?: Number, + * locking?: "SHARED" | "EXCLUSIVE", + * disable_rotate?: boolean, + * }} cfg + */ + constructor(dir, file, cfg) { + this.dir = dir; + this.file = file; + this.cfg = cfg; + this.active_path = path.join(this.dir, this.file); + this.locking = cfg.locking; + + this.fs_context = native_fs_utils.get_process_fs_context(); + + this.fh = null; + this.fh_stat = null; + this.local_size = 0; + + this.init_lock = new Semaphore(1); + + if (!cfg.disable_rotate) this._auto_rotate(); + } + + async init() { + if (this.fh) return this.fh; + + return this.init_lock.surround(async () => { + if (this.fh) return this.fh; + + const total_retries = 10; + const backoff = 5; + + for (let retries = 0; retries < total_retries; retries++) { + let fh = null; + try { + fh = await this._open(); + if (this.locking) await fh.flock(this.fs_context, this.locking); + + const fh_stat = await fh.stat(this.fs_context); + const path_stat = await nb_native().fs.stat(this.fs_context, this.active_path); + + if (fh_stat.ino === path_stat.ino && fh_stat.nlink > 0) { + this.fh = fh; + this.local_size = 0; + this.fh_stat = fh_stat; + + // Prevent closing the fh if we succedded in the init + fh = null; + + return this.fh; + } + + dbg.log0( + 'failed to init active log file, retry:', retries + 1, + 'active path:', this.active_path, + ); + await P.delay(backoff * (1 + Math.random())); + } catch (error) { + dbg.log0( + 'an error occured during init:', error, + 'active path:', this.active_path, + ); + throw error; + } finally { + if (fh) await fh.close(this.fs_context); + } + } + + dbg.log0( + 'init retries exceeded, total retries:', + total_retries, + 'active path:', this.active_path, + ); + throw new Error('init retries exceeded'); + }); + } + + async append(data) { + const fh = await this.init(); + + const buf = Buffer.from(data + "\n", 'utf8'); + await fh.write(this.fs_context, buf, buf.length); + this.local_size += buf.length; + } + + _auto_rotate() { + this.swap_lock_file = path.join(this.dir, `swaplock.${this.file}`); + + setInterval(async () => { + await this._swap(); + }, this.cfg.max_interval).unref(); + } + + async _swap() { + if (!this.fh 
|| !this.local_size) return;
+
+        let slfh = null;
+        try {
+            // Taking this lock ensures that the file isn't moved between us checking the inode
+            // and performing the rename
+            slfh = await nb_native().fs.open(this.fs_context, this.swap_lock_file, "rw");
+            await slfh.flock(this.fs_context, 'EXCLUSIVE');
+
+            let path_stat = null;
+            try {
+                // Ensure that the inode of `this.active_path` is the same as the one we opened
+                path_stat = await nb_native().fs.stat(this.fs_context, this.active_path, {});
+            } catch (error) {
+                if (error.code === 'ENOENT') {
+                    // Some other process must have renamed the file
+                    dbg.log1('got ENOENT for the active file');
+                } else {
+                    // TODO: Unexpected case, handle better
+                    dbg.error('failed to stat current file:', error);
+                }
+            }
+
+            if (path_stat && path_stat.ino === this.fh_stat.ino) {
+                // Yes, time can drift. It can go to the past or the future. This at times might
+                // produce duplicate names or names which ideally would have been produced in the past.
+                //
+                // Hence, the order of files in the directory is not guaranteed to be in order of "time".
+                const inactive_file = `${this.file}.${Date.now()}`;
+                try {
+                    await nb_native().fs.rename(this.fs_context, this.active_path, path.join(this.dir, inactive_file));
+                } catch (error) {
+                    // It isn't really expected that this will fail assuming all the processes
+                    // respect the locking semantics
+                    // TODO: Unexpected case, handle better
+                    dbg.error('failed to rename file', error);
+                }
+            }
+
+            await this.close();
+        } catch (error) {
+            dbg.log0(
+                'failed to get swap lock:', error,
+                'dir:', this.dir,
+                'file:', this.file,
+            );
+        } finally {
+            if (slfh) await slfh.close(this.fs_context);
+        }
+    }
+
+    async close() {
+        const fh = this.fh;
+
+        this.fh = null;
+        this.fh_stat = null;
+        this.local_size = 0;
+
+        if (fh) await fh.close(this.fs_context);
+    }
+
+    async remove() {
+        try {
+            await nb_native().fs.unlink(this.fs_context, this.active_path);
+        } catch (error) {
+            // ignore
+        }
+    }
+
+    /**
+     * process_inactive takes a callback and runs it on all past WAL files.
+     * It does not do so in any particular order.
+     * @param {(file: string) => Promise} cb callback
+     */
+    async process_inactive(cb) {
+        const files = await nb_native().fs.readdir(this.fs_context, this.dir);
+        const filtered = files.filter(f => f.name.startsWith(this.file) && f.name !== this.file && !native_fs_utils.isDirectory(f));
+
+        for (const file of filtered) {
+            const delete_processed = await cb(path.join(this.dir, file.name));
+            if (delete_processed) {
+                await nb_native().fs.unlink(this.fs_context, path.join(this.dir, file.name));
+            }
+        }
+    }
+
+    async _open() {
+        return nb_native().fs.open(this.fs_context, this.active_path, 'as');
+    }
+}
+
+
+exports.PersistentLogger = PersistentLogger;
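For reference, the two new utilities are meant to compose as a producer/consumer pair: `PersistentLogger` appends newline separated entries to the active WAL and rotates it, while `NewlineReader` lets a consumer iterate rotated WAL files under a lock. The sketch below only illustrates that wiring; the require paths, the `'migrate'` file prefix, the `producer_sketch`/`consumer_sketch` names, the choice of lock modes and the per-entry handling are assumptions for illustration, not the actual NamespaceFS or `manage_nsfs glacier` code.

```js
'use strict';

// Illustrative sketch only - paths assume this snippet lives under src/util.
const config = require('../../config');
const native_fs_utils = require('./native_fs_utils');
const { PersistentLogger } = require('./persistent_logger');
const { NewlineReader } = require('./file_reader');

// Producer side (e.g. the endpoint): append entries to the active WAL.
// The logger rotates the active file every `max_interval` milliseconds; the
// 'SHARED' lock lets multiple writers append while blocking the consumer's
// 'EXCLUSIVE' lock.
async function producer_sketch(file_path) {
    const wal = new PersistentLogger(
        config.NSFS_GLACIER_LOGS_DIR,
        'migrate',
        { max_interval: config.NSFS_GLACIER_LOGS_MAX_INTERVAL, locking: 'SHARED' },
    );
    await wal.append(file_path);
    await wal.close();
}

// Consumer side (e.g. `manage_nsfs glacier migrate`): walk every rotated WAL
// file under an exclusive lock and delete the ones that were fully processed.
async function consumer_sketch() {
    const fs_context = native_fs_utils.get_process_fs_context();
    const wal = new PersistentLogger(
        config.NSFS_GLACIER_LOGS_DIR,
        'migrate',
        { disable_rotate: true, locking: 'EXCLUSIVE' },
    );
    await wal.process_inactive(async file => {
        const reader = new NewlineReader(fs_context, file, 'EXCLUSIVE');
        try {
            // Returning `false` from the callback stops the iteration and
            // preserves the WAL file for a later retry.
            const [, ok] = await reader.forEach(async entry => {
                console.log('entry that would be handed to the backend:', entry);
                return true;
            });
            return ok; // delete the WAL only if every entry was handled
        } finally {
            await reader.close();
        }
    });
}

module.exports = { producer_sketch, consumer_sketch };
```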