diff --git a/bin/delete-objects/index.ts b/bin/delete-objects/index.ts index 08b082f8..02d91ab2 100755 --- a/bin/delete-objects/index.ts +++ b/bin/delete-objects/index.ts @@ -1,14 +1,17 @@ import AWS from 'aws-sdk'; import { validate } from 'uuid'; import { program } from 'commander'; - +import { MongoDBShardsRepository } from '../../lib/core/shards/MongoDBShardsRepository'; import { connectToDatabase, Models } from '../utils/database'; import { + DatabaseShardsReader, FileListObjectStorageReader, ObjectStorageReader, + ripemd160, S3ObjectStorageReader, StorageObject } from './ObjectStorage'; +import listV1Shards from './list-v1-shards'; program .version('0.0.1') @@ -40,6 +43,10 @@ program '-r, --region ', 'object storage region' ) + .option( + '-n, --nodeId ', + 'the node id of the farmer that has a contract with the shards to filter' + ) .parse(); @@ -107,6 +114,7 @@ if (readerSource === 'file') { const stats = { totalDeletedSize: 0, totalDeletedObjects: 0, + throughput: 0, }; async function cleanStalledObjects(): Promise { @@ -138,6 +146,50 @@ async function cleanStalledObjects(): Promise { } } +const createTimer = () => { + let timeStart: [number, number]; + + return { + start: () => { + timeStart = process.hrtime(); + }, + end: () => { + const NS_PER_SEC = 1e9; + const NS_TO_MS = 1e6; + const diff = process.hrtime(timeStart); + + return (diff[0] * NS_PER_SEC + diff[1]) / NS_TO_MS; + } + }; +}; + +/** + * TODO: Add the cleanup part + */ +async function cleanStalledV1Objects(): Promise { + console.log('nodeId', options.nodeId); + + models = await connectToDatabase(options.config, options.mongourl); + const shardsReader = new DatabaseShardsReader(new MongoDBShardsRepository(models?.Shard)); + + const timer = createTimer(); + timer.start(); + const listingEmitter = listV1Shards(shardsReader, options.nodeId); + + await new Promise((resolve, reject) => { + listingEmitter + .once('error', reject) + .once('end', resolve) + .on('progress', ({ deletedCount }) => { + stats.throughput = deletedCount / (timer.end() / 1000) + }) + .on('data', (shard) => { + console.log('shard %s %s %s', shard.hash, ripemd160(shard.hash), shard.size); + }) + }); +} + + /** * Clean unfinished multipart uploads. * Still WIP @@ -174,7 +226,8 @@ async function main(): Promise { console.log('STATS', stats); }, 10000); try { - await cleanStalledObjects(); + // await cleanStalledObjects(); + await cleanStalledV1Objects(); await insertStatsOnDatabase(stats.totalDeletedSize); console.log('PROGRAM FINISHED SUCCESSFULLY'); diff --git a/bin/delete-objects/list-v1-shards.ts b/bin/delete-objects/list-v1-shards.ts new file mode 100644 index 00000000..61e0ec78 --- /dev/null +++ b/bin/delete-objects/list-v1-shards.ts @@ -0,0 +1,48 @@ +import { EventEmitter } from 'events'; +import { ShardsReader } from './ObjectStorage'; + +/** + * Lists all the shards that are from the version 1 of the protocol + * and that have a contract with the given nodeID. + * + * @param shardsReader + * @param nodeID + */ +export default function (shardsReader: ShardsReader, nodeID: string): EventEmitter { + const eventsEmitter = new EventEmitter(); + + let deletedCount = 0; + let notifyProgressIntervalId = setInterval(() => { + eventsEmitter.emit('progress', { deletedCount }); + }, 3000); + + async function listShards() { + for await (const shard of shardsReader.list(500)) { + if (shardsReader.isV1(shard)) { + const size = shard.contracts && shard.contracts.length > 0 && shard.contracts[0].contract?.data_size || shard.size || 0; + + if (shard.contracts && shard.contracts.length > 0) { + const containsContractWithOurFarmer = shard.contracts.some(c => c.nodeID === nodeID); + + if (containsContractWithOurFarmer) { + eventsEmitter.emit('data', { ...shard, size }); + } + } + deletedCount += 1; + } + } + } + + listShards() + .then(() => { + eventsEmitter.emit('end'); + }) + .catch((err) => { + eventsEmitter.emit('error', err); + }) + .finally(() => { + clearInterval(notifyProgressIntervalId); + }); + + return eventsEmitter; +};