Skip to content

Commit

Permalink
Purge sqlite states should not run 2 times in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
Pierre-Gilles committed Aug 10, 2024
1 parent d64a365 commit 3b84732
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 56 deletions.
2 changes: 1 addition & 1 deletion server/api/controllers/device.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ module.exports = function DeviceController(gladys) {
* @apiGroup Device
*/
async function purgeAllSqliteStates(req, res) {
await gladys.device.purgeAllSqliteStates();
gladys.event.emit(EVENTS.DEVICE.PURGE_ALL_SQLITE_STATES);
res.json({ success: true });
}

Expand Down
119 changes: 65 additions & 54 deletions server/lib/device/device.purgeAllSqliteStates.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,81 +11,92 @@ const logger = require('../../utils/logger');
* device.purgeAllSqliteStates('d47b481b-a7be-4224-9850-313cdb8a4065');
*/
async function purgeAllSqliteStates(jobId) {
if (this.purgeAllSQliteStatesInProgress) {
logger.info(`Not purging all SQlite states, a purge is already in progress`);
return null;
}
logger.info(`Purging all SQlite states`);
this.purgeAllSQliteStatesInProgress = true;

const numberOfDeviceFeatureStateToDelete = await db.DeviceFeatureState.count();
const numberOfDeviceFeatureStateAggregateToDelete = await db.DeviceFeatureStateAggregate.count();
try {
const numberOfDeviceFeatureStateToDelete = await db.DeviceFeatureState.count();
const numberOfDeviceFeatureStateAggregateToDelete = await db.DeviceFeatureStateAggregate.count();

logger.info(
`Purging All SQLite states: ${numberOfDeviceFeatureStateToDelete} states & ${numberOfDeviceFeatureStateAggregateToDelete} aggregates to delete.`,
);
logger.info(
`Purging All SQLite states: ${numberOfDeviceFeatureStateToDelete} states & ${numberOfDeviceFeatureStateAggregateToDelete} aggregates to delete.`,
);

const numberOfIterationsStates = Math.ceil(
numberOfDeviceFeatureStateToDelete / this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH,
);
const iterator = [...Array(numberOfIterationsStates)];
const numberOfIterationsStates = Math.ceil(
numberOfDeviceFeatureStateToDelete / this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH,
);
const iterator = [...Array(numberOfIterationsStates)];

const numberOfIterationsStatesAggregates = Math.ceil(
numberOfDeviceFeatureStateAggregateToDelete / this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH,
);
const iteratorAggregates = [...Array(numberOfIterationsStatesAggregates)];
const numberOfIterationsStatesAggregates = Math.ceil(
numberOfDeviceFeatureStateAggregateToDelete / this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH,
);
const iteratorAggregates = [...Array(numberOfIterationsStatesAggregates)];

const total = numberOfIterationsStates + numberOfIterationsStatesAggregates;
let currentBatch = 0;
let currentProgressPercent = 0;
const total = numberOfIterationsStates + numberOfIterationsStatesAggregates;
let currentBatch = 0;
let currentProgressPercent = 0;

// We only save progress to DB if it changed
// Because saving progress is expensive (DB write + Websocket call)
const updateProgressIfNeeded = async () => {
currentBatch += 1;
const newProgressPercent = Math.round((currentBatch * 100) / total);
if (currentProgressPercent !== newProgressPercent) {
currentProgressPercent = newProgressPercent;
await this.job.updateProgress(jobId, currentProgressPercent);
}
};
// We only save progress to DB if it changed
// Because saving progress is expensive (DB write + Websocket call)
const updateProgressIfNeeded = async () => {
currentBatch += 1;
const newProgressPercent = Math.round((currentBatch * 100) / total);
if (currentProgressPercent !== newProgressPercent) {
currentProgressPercent = newProgressPercent;
await this.job.updateProgress(jobId, currentProgressPercent);
}
};

await Promise.each(iterator, async () => {
await db.sequelize.query(
`
await Promise.each(iterator, async () => {
await db.sequelize.query(
`
DELETE FROM t_device_feature_state WHERE id IN (
SELECT id FROM t_device_feature_state
LIMIT :limit
);
`,
{
replacements: {
limit: this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH,
{
replacements: {
limit: this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH,
},
type: QueryTypes.SELECT,
},
type: QueryTypes.SELECT,
},
);
await updateProgressIfNeeded();
await Promise.delay(this.WAIT_TIME_BETWEEN_DEVICE_FEATURE_CLEAN_BATCH);
});
);
await updateProgressIfNeeded();
await Promise.delay(this.WAIT_TIME_BETWEEN_DEVICE_FEATURE_CLEAN_BATCH);
});

await Promise.each(iteratorAggregates, async () => {
await db.sequelize.query(
`
await Promise.each(iteratorAggregates, async () => {
await db.sequelize.query(
`
DELETE FROM t_device_feature_state_aggregate WHERE id IN (
SELECT id FROM t_device_feature_state_aggregate
LIMIT :limit
);
`,
{
replacements: {
limit: this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH,
{
replacements: {
limit: this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH,
},
type: QueryTypes.SELECT,
},
type: QueryTypes.SELECT,
},
);
await updateProgressIfNeeded();
await Promise.delay(this.WAIT_TIME_BETWEEN_DEVICE_FEATURE_CLEAN_BATCH);
});
return {
numberOfDeviceFeatureStateToDelete,
numberOfDeviceFeatureStateAggregateToDelete,
};
);
await updateProgressIfNeeded();
await Promise.delay(this.WAIT_TIME_BETWEEN_DEVICE_FEATURE_CLEAN_BATCH);
});
this.purgeAllSQliteStatesInProgress = false;
return {
numberOfDeviceFeatureStateToDelete,
numberOfDeviceFeatureStateAggregateToDelete,
};
} catch (e) {
this.purgeAllSQliteStatesInProgress = false;
throw e;

Check warning on line 98 in server/lib/device/device.purgeAllSqliteStates.js

View check run for this annotation

Codecov / codecov/patch

server/lib/device/device.purgeAllSqliteStates.js#L97-L98

Added lines #L97 - L98 were not covered by tests
}
}

module.exports = {
Expand Down
4 changes: 4 additions & 0 deletions server/lib/device/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ const DeviceManager = function DeviceManager(
EVENTS.DEVICE.MIGRATE_FROM_SQLITE_TO_DUCKDB,
eventFunctionWrapper(this.migrateFromSQLiteToDuckDb.bind(this)),
);
this.eventManager.on(
EVENTS.DEVICE.PURGE_ALL_SQLITE_STATES,
eventFunctionWrapper(this.purgeAllSqliteStates.bind(this)),
);
};

DeviceManager.prototype.add = add;
Expand Down
6 changes: 5 additions & 1 deletion server/test/lib/device/device.purgeAllSqliteStates.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,14 @@ describe('Device', () => {
wrapper: (type, func) => func,
};
const device = new Device(event, {}, stateManager, service, {}, variable, job);
const devicePurged = await device.purgeAllSqliteStates('632c6d92-a79a-4a38-bf5b-a2024721c101');
const devicePurgedPromise = device.purgeAllSqliteStates('632c6d92-a79a-4a38-bf5b-a2024721c101');
const emptyRes = await device.purgeAllSqliteStates('632c6d92-a79a-4a38-bf5b-a2024721c101');
const devicePurged = await devicePurgedPromise;
expect(devicePurged).to.deep.equal({
numberOfDeviceFeatureStateAggregateToDelete: 3,
numberOfDeviceFeatureStateToDelete: 110,
});
// should not start a new purge when a purge is running
expect(emptyRes).to.equal(null);
});
});
1 change: 1 addition & 0 deletions server/utils/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ const EVENTS = {
PURGE_STATES_SINGLE_FEATURE: 'device.purge-states-single-feature',
CHECK_BATTERIES: 'device.check-batteries',
MIGRATE_FROM_SQLITE_TO_DUCKDB: 'device.migrate-from-sqlite-to-duckdb',
PURGE_ALL_SQLITE_STATES: 'device.purge-all-sqlite-states',
},
GATEWAY: {
CREATE_BACKUP: 'gateway.create-backup',
Expand Down

0 comments on commit 3b84732

Please sign in to comment.