Skip to content

Commit

Permalink
feat(bin): list-v1-shards utility
Browse files Browse the repository at this point in the history
  • Loading branch information
sg-gs committed Jan 4, 2024
1 parent 2ae3178 commit d8dd55a
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 2 deletions.
57 changes: 55 additions & 2 deletions bin/delete-objects/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import AWS from 'aws-sdk';
import { validate } from 'uuid';
import { program } from 'commander';

import { MongoDBShardsRepository } from '../../lib/core/shards/MongoDBShardsRepository';
import { connectToDatabase, Models } from '../utils/database';
import {
DatabaseShardsReader,
FileListObjectStorageReader,
ObjectStorageReader,
ripemd160,
S3ObjectStorageReader,
StorageObject
} from './ObjectStorage';
import listV1Shards from './list-v1-shards';

program
.version('0.0.1')
Expand Down Expand Up @@ -40,6 +43,10 @@ program
'-r, --region <us-east-1>',
'object storage region'
)
.option(
'-n, --nodeId <node-id>',
'the node id of the farmer that has a contract with the shards to filter'
)
.parse();


Expand Down Expand Up @@ -107,6 +114,7 @@ if (readerSource === 'file') {
const stats = {
totalDeletedSize: 0,
totalDeletedObjects: 0,
throughput: 0,
};

async function cleanStalledObjects(): Promise<void> {
Expand Down Expand Up @@ -138,6 +146,50 @@ async function cleanStalledObjects(): Promise<void> {
}
}

const createTimer = () => {
let timeStart: [number, number];

return {
start: () => {
timeStart = process.hrtime();
},
end: () => {
const NS_PER_SEC = 1e9;
const NS_TO_MS = 1e6;
const diff = process.hrtime(timeStart);

return (diff[0] * NS_PER_SEC + diff[1]) / NS_TO_MS;
}
};
};

/**
* TODO: Add the cleanup part
*/
async function cleanStalledV1Objects(): Promise<void> {
console.log('nodeId', options.nodeId);

models = await connectToDatabase(options.config, options.mongourl);
const shardsReader = new DatabaseShardsReader(new MongoDBShardsRepository(models?.Shard));

const timer = createTimer();
timer.start();
const listingEmitter = listV1Shards(shardsReader, options.nodeId);

await new Promise((resolve, reject) => {
listingEmitter
.once('error', reject)
.once('end', resolve)
.on('progress', ({ deletedCount }) => {
stats.throughput = deletedCount / (timer.end() / 1000)
})
.on('data', (shard) => {
console.log('shard %s %s %s', shard.hash, ripemd160(shard.hash), shard.size);
})
});
}


/**
* Clean unfinished multipart uploads.
* Still WIP
Expand Down Expand Up @@ -174,7 +226,8 @@ async function main(): Promise<void> {
console.log('STATS', stats);
}, 10000);
try {
await cleanStalledObjects();
// await cleanStalledObjects();
await cleanStalledV1Objects();
await insertStatsOnDatabase(stats.totalDeletedSize);

console.log('PROGRAM FINISHED SUCCESSFULLY');
Expand Down
48 changes: 48 additions & 0 deletions bin/delete-objects/list-v1-shards.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { EventEmitter } from 'events';
import { ShardsReader } from './ObjectStorage';

/**
* Lists all the shards that are from the version 1 of the protocol
* and that have a contract with the given nodeID.
*
* @param shardsReader
* @param nodeID
*/
export default function (shardsReader: ShardsReader, nodeID: string): EventEmitter {
const eventsEmitter = new EventEmitter();

let deletedCount = 0;
let notifyProgressIntervalId = setInterval(() => {
eventsEmitter.emit('progress', { deletedCount });
}, 3000);

async function listShards() {
for await (const shard of shardsReader.list(500)) {
if (shardsReader.isV1(shard)) {
const size = shard.contracts && shard.contracts.length > 0 && shard.contracts[0].contract?.data_size || shard.size || 0;

if (shard.contracts && shard.contracts.length > 0) {
const containsContractWithOurFarmer = shard.contracts.some(c => c.nodeID === nodeID);

if (containsContractWithOurFarmer) {
eventsEmitter.emit('data', { ...shard, size });
}
}
deletedCount += 1;
}
}
}

listShards()
.then(() => {
eventsEmitter.emit('end');
})
.catch((err) => {
eventsEmitter.emit('error', err);
})
.finally(() => {
clearInterval(notifyProgressIntervalId);
});

return eventsEmitter;
};

0 comments on commit d8dd55a

Please sign in to comment.