diff --git a/bin/cli/commands/clean-stalled-bucket-entries.command.ts b/bin/cli/commands/clean-stalled-bucket-entries.command.ts
new file mode 100644
index 000000000..c73083ab5
--- /dev/null
+++ b/bin/cli/commands/clean-stalled-bucket-entries.command.ts
@@ -0,0 +1,22 @@
+import { PrepareFunctionReturnType } from "../init";
+import { CommandId } from "./id";
+import cleanStalledBucketEntries from "../tasks/clean-stalled-bucket-entries.task";
+
+export default {
+  id: CommandId.CleanStalledBucketEntries,
+  version: '0.0.1',
+  fn: async (
+    {
+      usecase: { bucketsUsecase, bucketEntriesUsecase },
+      repo: { framesRepository },
+      readers
+    }: PrepareFunctionReturnType,
+  ): Promise<void> => {
+    await cleanStalledBucketEntries(
+      bucketsUsecase,
+      bucketEntriesUsecase,
+      framesRepository,
+      readers.bucketEntriesReader
+    );
+  }
+};
diff --git a/bin/cli/commands/clean-stalled-frames.command.ts b/bin/cli/commands/clean-stalled-frames.command.ts
new file mode 100644
index 000000000..3dda5bcfa
--- /dev/null
+++ b/bin/cli/commands/clean-stalled-frames.command.ts
@@ -0,0 +1,13 @@
+import { PrepareFunctionReturnType } from "../init";
+import { cleanStalledFrames } from "../tasks/clean-stalled-frames.task";
+import { CommandId } from "./id";
+
+export default {
+  id: CommandId.CleanStalledFrames,
+  version: '0.0.1',
+  fn: async (
+    { usecase: { bucketEntriesUsecase }, readers }: PrepareFunctionReturnType,
+  ): Promise<void> => {
+    await cleanStalledFrames(bucketEntriesUsecase, readers.framesReader);
+  },
+};
diff --git a/bin/cli/commands/id.ts b/bin/cli/commands/id.ts
index 02770c7d1..0c904c8cc 100644
--- a/bin/cli/commands/id.ts
+++ b/bin/cli/commands/id.ts
@@ -2,4 +2,6 @@ export enum CommandId {
   DestroyUserBuckets = 'destroy-user-buckets',
   EmptyBucket = 'empty-bucket',
   EmptyBuckets = 'empty-buckets',
+  CleanStalledFrames = 'clean-stalled-frames',
+  CleanStalledBucketEntries = 'clean-stalled-bucket-entries',
 }
diff --git a/bin/cli/commands/index.ts b/bin/cli/commands/index.ts
index 19bb90e28..f978ab79b 100644
--- a/bin/cli/commands/index.ts
+++ b/bin/cli/commands/index.ts
@@ -4,6 +4,8 @@ import { buildCommand } from './build'
 import { default as destroyUserBuckets } from "./destroy-user-buckets.command";
 import { default as emptyBucket } from "./empty-bucket.command";
 import { default as emptyBuckets } from "./empty-buckets.command";
+import { default as cleanStalledFrames } from "./clean-stalled-frames.command";
+import { default as cleanStalledBucketEntries } from "./clean-stalled-bucket-entries.command";
 
 export default (resources: PrepareFunctionReturnType, onFinish: () => void) => ({
   [destroyUserBuckets.id]: buildCommand({
@@ -35,4 +37,24 @@ export default (resources: PrepareFunctionReturnType, onFinish: () => void) => (
     await emptyBuckets.fn(resources, userId);
     onFinish();
   }),
+
+  [cleanStalledFrames.id]: buildCommand({
+    version: cleanStalledFrames.version,
+    command: `${cleanStalledFrames.id}`,
+    description: 'Cleans stalled frames',
+    options: [],
+  }).action(async () => {
+    await cleanStalledFrames.fn(resources);
+    onFinish();
+  }),
+
+  [cleanStalledBucketEntries.id]: buildCommand({
+    version: cleanStalledBucketEntries.version,
+    command: `${cleanStalledBucketEntries.id}`,
+    description: 'Cleans stalled bucket entries',
+    options: [],
+  }).action(async () => {
+    await cleanStalledBucketEntries.fn(resources);
+    onFinish();
+  }),
 });
diff --git a/bin/cli/init.ts b/bin/cli/init.ts
index f3b81bca7..fdf43d9cf 100644
--- a/bin/cli/init.ts
+++ b/bin/cli/init.ts
@@ -28,12 +28,18 @@
 import { ShardsRepository } from '../../lib/core/shards/Repository';
 import { UploadsRepository } from '../../lib/core/uploads/Repository';
 import { TokensRepository } from '../../lib/core/tokens/Repository';
 import { ContactsRepository } from '../../lib/core/contacts/Repository';
+import { MongoDB } from '../delete-objects/temp-shard.model';
+import { DatabaseFramesReader, DatabaseBucketEntriesReaderWithoutBucket } from '../delete-objects/ObjectStorage';
 
 const Config = require('../../lib/config');
 const config = new Config(process.env.NODE_ENV || 'develop', '', '');
 
 export type PrepareFunctionReturnType = {
+  readers: {
+    framesReader: DatabaseFramesReader,
+    bucketEntriesReader: DatabaseBucketEntriesReaderWithoutBucket,
+  },
   repo: {
     bucketEntriesRepository: BucketEntriesRepository,
     bucketEntryShardsRepository:BucketEntryShardsRepository,
@@ -57,6 +63,8 @@ export type PrepareFunctionReturnType = {
 
 export async function prepare(): Promise<PrepareFunctionReturnType> {
   const QUEUE_NAME = 'NETWORK_WORKER_TASKS_QUEUE';
+  const newDbConnection = new MongoDB(process.env.inxtbridge_storage__mongoUri as string);
+  await newDbConnection.connect();
   const models = await connectToDatabase('', '');
 
   const { QUEUE_USERNAME, QUEUE_PASSWORD, QUEUE_HOST } = config;
@@ -114,9 +122,19 @@ export async function prepare(): Promise<PrepareFunctionReturnType> {
     tokensRepository,
     contactsRepository,
   )
+  const framesReader = new DatabaseFramesReader(
+    newDbConnection.getCollections().frames
+  );
+  const bucketEntriesReader = new DatabaseBucketEntriesReaderWithoutBucket(
+    newDbConnection.getCollections().bucketEntries
+  );
   await networkQueue.connectAndRetry();
 
   return {
+    readers: {
+      framesReader,
+      bucketEntriesReader,
+    },
     repo: {
       bucketEntriesRepository,
       bucketEntryShardsRepository,
diff --git a/bin/cli/tasks/clean-stalled-bucket-entries.task.ts b/bin/cli/tasks/clean-stalled-bucket-entries.task.ts
new file mode 100644
index 000000000..b2c0d9fdb
--- /dev/null
+++ b/bin/cli/tasks/clean-stalled-bucket-entries.task.ts
@@ -0,0 +1,55 @@
+import { BucketEntriesUsecase } from "../../../lib/core/bucketEntries/usecase";
+import { BucketsUsecase } from "../../../lib/core/buckets/usecase";
+import { FramesRepository } from "../../../lib/core/frames/Repository";
+import { BucketEntriesReader } from "../../delete-objects/ObjectStorage";
+import { BucketEntryDocument } from "../../delete-objects/temp-shard.model";
+
+export type CleanStalledBucketEntriesFunctionType = (
+  bucketsUsecase: BucketsUsecase,
+  bucketEntriesUsecase: BucketEntriesUsecase,
+  framesRepository: FramesRepository,
+  reader: BucketEntriesReader,
+) => Promise<void>;
+
+const task: CleanStalledBucketEntriesFunctionType = async (
+  bucketsUsecase,
+  bucketEntriesUsecase,
+  framesRepository,
+  reader,
+): Promise<void> => {
+  const deleteInBulksOf = 20;
+  const toDelete: BucketEntryDocument[] = [];
+  const stats = {
+    totalSize: 0,
+    totalCount: 0,
+  }
+
+  for await (const bucketEntry of reader.list()) {
+    const bucketDoesNotExist = await bucketsUsecase.findById(bucketEntry.bucket) === null;
+    const isV1 = !bucketEntry.version;
+
+    if (bucketDoesNotExist && isV1) {
+      toDelete.push(bucketEntry);
+    }
+
+    if (toDelete.length === deleteInBulksOf) {
+      const frames = await framesRepository.findByIds(toDelete.map(b => b.frame!));
+      const beIds = toDelete.map(be => be._id.toString());
+
+      await bucketEntriesUsecase.removeFiles(beIds);
+
+      toDelete.forEach(be => {
+        console.log(`deleting entry ${be._id}`);
+      });
+
+      stats.totalSize += frames.reduce((acc, curr) => acc + (curr.size || 0), 0);
+      stats.totalCount += toDelete.length;
+
+      toDelete.length = 0;
+
+      console.log(`total size ${stats.totalSize}, total count ${stats.totalCount}`);
+    }
+  }
+}
+
+export default task;
diff --git a/bin/cli/tasks/clean-stalled-frames.task.ts b/bin/cli/tasks/clean-stalled-frames.task.ts
new file mode 100644
index 000000000..0797b98b4
--- /dev/null
+++ b/bin/cli/tasks/clean-stalled-frames.task.ts
@@ -0,0 +1,42 @@
+import { BucketEntriesUsecase } from "../../../lib/core/bucketEntries/usecase";
+import { FramesReader } from "../../delete-objects/ObjectStorage";
+import { Frame } from "../../../lib/core/frames/Frame";
+import { FrameDocument } from "../../delete-objects/temp-shard.model";
+
+export type CleanStalledFramesFunctionType = (
+  bucketEntriesUsecase: BucketEntriesUsecase,
+  reader: FramesReader,
+) => Promise<void>;
+
+export const cleanStalledFrames: CleanStalledFramesFunctionType = async (
+  bucketEntriesUsecase,
+  reader,
+): Promise<void> => {
+  const deleteInBulksOf = 20;
+  const toDelete: { frame: Frame['id'], id: '', _frame: FrameDocument }[] = [];
+  const stats = {
+    totalSize: 0,
+    totalCount: 0,
+  }
+
+  for await (const frame of reader.list()) {
+    if (frame.bucketEntry) {
+      const be = await bucketEntriesUsecase.findById(frame.bucketEntry);
+
+      if (!be) {
+        console.log(`deleting frame ${frame._id}, be ${frame.bucketEntry}, size ${frame.size}`);
+        toDelete.push({ frame: frame._id.toString(), id: '', _frame: frame });
+      }
+    }
+    if (toDelete.length === deleteInBulksOf) {
+      await bucketEntriesUsecase.removeFilesV1(toDelete as any);
+
+      stats.totalSize += toDelete.reduce((acc, curr) => acc + curr._frame.size, 0);
+      stats.totalCount += toDelete.length;
+
+      toDelete.length = 0;
+
+      console.log(`total size ${stats.totalSize}, total count ${stats.totalCount}`);
+    }
+  }
+}
diff --git a/bin/delete-objects/ObjectStorage.ts b/bin/delete-objects/ObjectStorage.ts
index 7247fbace..34635a7da 100644
--- a/bin/delete-objects/ObjectStorage.ts
+++ b/bin/delete-objects/ObjectStorage.ts
@@ -1,6 +1,12 @@
 import AWS from 'aws-sdk';
 import { existsSync, createReadStream } from 'fs';
 import readline from 'readline';
+import { createHash } from 'crypto';
+
+import { ShardsRepository } from '../../lib/core/shards/Repository';
+import { Shard } from '../../lib/core/shards/Shard';
+import { BucketEntryDocument, FrameDocument, MongoDBCollections, TempShardDocument } from './temp-shard.model';
+import { ObjectId } from 'mongodb';
 
 export interface StorageObject {
   Key: string;
@@ -10,6 +16,7 @@
 
 export interface ObjectStorageReader {
   listObjects(pageSize: number): AsyncGenerator<StorageObject>;
+  find(key: string): Promise<StorageObject | null>;
 }
 
 /**
@@ -51,6 +58,22 @@
       yield { Key, Size: parseInt(Size), LastModified: new Date() };
     }
   }
+
+  async find(key: string): Promise<StorageObject | null> {
+    const fileStream = createReadStream(this.filename);
+    const rl = readline.createInterface({
+      input: fileStream,
+      crlfDelay: Infinity // To recognize '\n' as a line delimiter
+    });
+
+    for await (const line of rl) {
+      const [Size, Key] = line.split(' ');
+      if (Key === key) {
+        return { Key, Size: parseInt(Size), LastModified: new Date() };
+      }
+    }
+    return null;
+  }
 }
 
 export class S3ObjectStorageReader implements ObjectStorageReader {
@@ -92,5 +115,211 @@
       lastPointer = response.NextContinuationToken;
     } while (lastPointer);
   }
+
+  async find(key: string): Promise<StorageObject | null> {
+    try {
+      const response = await this.s3.headObject({
+        Bucket: this.bucket,
+        Key: key,
+      }).promise();
+
+      return {
+        Key: key,
+        Size: response.ContentLength ?? 0,
+        LastModified: response.LastModified ?? new Date(),
+      };
+    } catch (error) {
+      if ((error as { code: string }).code === 'NotFound') {
+        return null;
+      }
+      throw error;
+    }
+  }
+}
+
+export interface ShardsReader {
+  list(pageSize: number): AsyncGenerator<Shard>;
+  isV1(s: Shard): boolean;
+}
+
+interface TempShardsReader {
+  list(pageSize: number): AsyncGenerator<TempShardDocument>;
+}
+
+interface TempShardsWriter {
+  write(shard: Shard): Promise<void>;
+}
+
+function isValidShardHash(hash: string) {
+  return !!hash.match(/^[a-f0-9]{40}$/);
+}
+
+export function ripemd160(content: string) {
+  if (!isValidShardHash(content)) {
+    throw Error('Invalid hex string');
+  }
+
+  return createHash('ripemd160').update(Buffer.from(content, 'hex')).digest('hex');
+}
+
+export class DatabaseShardsReader implements ShardsReader {
+  constructor(private readonly shardsRepository: ShardsRepository) {}
+
+  isV1(s: Shard): boolean {
+    const doesNotHaveUuid = !s.uuid;
+
+    return isValidShardHash(s.hash) && doesNotHaveUuid;
+  }
+
+  async* list(pageSize = 1000): AsyncGenerator<Shard> {
+    let offset = 0;
+    do {
+      const shards = await this.shardsRepository.findWithNoUuid(
+        pageSize,
+        offset,
+      );
+      for (const shard of shards) {
+        yield shard;
+      }
+      offset += shards.length;
+    } while (offset % pageSize === 0);
+  }
+}
+
+export class DatabaseTempShardsWriter implements TempShardsWriter {
+  constructor(private readonly tempShards: MongoDBCollections['tempShards']) {}
+
+  async write(shard: Shard): Promise<void> {
+    await this.tempShards.insertOne({
+      hash: shard.hash,
+      objectStorageHash: ripemd160(shard.hash),
+      shardId: new ObjectId(shard.id),
+      size: shard.size,
+    });
+  }
+}
+
+export class DatabaseTempShardsReader implements TempShardsReader {
+  constructor(private readonly tempShards: MongoDBCollections['tempShards']) {}
+
+  async* list(pageSize = 1000): AsyncGenerator<TempShardDocument> {
+    let offset = 0;
+    do {
+      const tempShards = await this.tempShards.find(
+        {},
+        { limit: pageSize, skip: offset },
+      ).toArray();
+      for (const tempShard of tempShards) {
+        yield tempShard;
+      }
+      offset += tempShards.length;
+    } while (offset % pageSize === 0);
+  }
+}
+
+export interface Reader<T> {
+  list(pageSize?: number): AsyncGenerator<T>;
+}
+
+export interface FramesReader extends Reader<FrameDocument> {}
+export interface BucketEntriesReader extends Reader<BucketEntryDocument> {}
+
+export class DatabaseFramesReader {
+  constructor(private readonly frames: MongoDBCollections['frames']) {}
+
+  async* list(pageSize = 50): AsyncGenerator<FrameDocument> {
+    const pipeline = [
+      {
+        $lookup: {
+          from: 'bucketentries',
+          localField: '_id',
+          foreignField: 'frame',
+          as: 'matched_entries'
+        }
+      },
+      {
+        $match: {
+          matched_entries: { $size: 0 }
+        }
+      },
+    ];
+    const cursor = this.frames.aggregate(pipeline);
+
+    while (await cursor.hasNext()) {
+      const frame = await cursor.next();
+
+      if (frame) {
+        yield frame;
+      }
+    }
+  }
+}
+
+export class DatabaseFramesReaderWithoutOwner {
+  constructor(private readonly frames: MongoDBCollections['frames']) {}
+
+  async* list(pageSize = 50): AsyncGenerator<FrameDocument> {
+    const pipeline = [
+      {
+        $lookup: {
+          from: "users",
+          localField: "user",
+          foreignField: "_id",
+          as: "user_info"
+        }
+      },
+      {
+        $match: {
+          user_info: { $eq: [] } // keep only frames with no matching document in users
+        }
+      },
+    ];
+    const cursor = this.frames.aggregate(pipeline);
+
+    while (await cursor.hasNext()) {
+      const frame = await cursor.next();
+
+      if (frame) {
+        yield frame;
+      }
+    }
+  }
 }
 
+export class DatabaseBucketEntriesReaderWithoutBucket implements BucketEntriesReader {
+  constructor(private readonly collection: MongoDBCollections['bucketEntries']) {}
+
+  async* list(pageSize = 50): AsyncGenerator<BucketEntryDocument> {
+    const pipeline = [
+      {
+        $match: {
+          created: {
+            $lt: new Date("2022-04-01T00:00:00Z") // only entries created before April 2022
+          }
+        }
+      },
+      {
+        $lookup: {
+          from: "buckets",
+          localField: "bucket",
+          foreignField: "_id",
+          as: "bucketInfo"
+        }
+      },
+      {
+        $match: {
+          bucketInfo: { $size: 0 } // keep only entries with no matching document in buckets
+        }
+      },
+    ];
+    const cursor = this.collection.aggregate(pipeline);
+
+    while (await cursor.hasNext()) {
+      const doc = await cursor.next();
+
+      if (doc) {
+        yield doc;
+      }
+    }
+  }
+}
diff --git a/bin/delete-objects/index.ts b/bin/delete-objects/index.ts
index 08b082f8e..02d91ab28 100755
--- a/bin/delete-objects/index.ts
+++ b/bin/delete-objects/index.ts
@@ -1,14 +1,17 @@
 import AWS from 'aws-sdk';
 import { validate } from 'uuid';
 import { program } from 'commander';
-
+import { MongoDBShardsRepository } from '../../lib/core/shards/MongoDBShardsRepository';
 import { connectToDatabase, Models } from '../utils/database';
 import {
+  DatabaseShardsReader,
   FileListObjectStorageReader,
   ObjectStorageReader,
+  ripemd160,
   S3ObjectStorageReader,
   StorageObject
 } from './ObjectStorage';
+import listV1Shards from './list-v1-shards';
 
 program
   .version('0.0.1')
@@ -40,6 +43,10 @@ program
 
     '-r, --region <region>',
     'object storage region'
   )
+  .option(
+    '-n, --nodeId <nodeId>',
+    'the node id of the farmer that has a contract with the shards to filter'
+  )
   .parse();
 
@@ -107,6 +114,7 @@ if (readerSource === 'file') {
 const stats = {
   totalDeletedSize: 0,
   totalDeletedObjects: 0,
+  throughput: 0,
};
 
 async function cleanStalledObjects(): Promise<void> {
@@ -138,6 +146,50 @@ async function cleanStalledObjects(): Promise<void> {
   }
 }
 
+const createTimer = () => {
+  let timeStart: [number, number];
+
+  return {
+    start: () => {
+      timeStart = process.hrtime();
+    },
+    end: () => {
+      const NS_PER_SEC = 1e9;
+      const NS_TO_MS = 1e6;
+      const diff = process.hrtime(timeStart);
+
+      return (diff[0] * NS_PER_SEC + diff[1]) / NS_TO_MS;
+    }
+  };
+};
+
+/**
+ * TODO: Add the cleanup part
+ */
+async function cleanStalledV1Objects(): Promise<void> {
+  console.log('nodeId', options.nodeId);
+
+  models = await connectToDatabase(options.config, options.mongourl);
+  const shardsReader = new DatabaseShardsReader(new MongoDBShardsRepository(models?.Shard));
+
+  const timer = createTimer();
+  timer.start();
+  const listingEmitter = listV1Shards(shardsReader, options.nodeId);
+
+  await new Promise((resolve, reject) => {
+    listingEmitter
+      .once('error', reject)
+      .once('end', resolve)
+      .on('progress', ({ deletedCount }) => {
+        stats.throughput = deletedCount / (timer.end() / 1000)
+      })
+      .on('data', (shard) => {
+        console.log('shard %s %s %s', shard.hash, ripemd160(shard.hash), shard.size);
+      })
+  });
+}
+
+
 /**
  * Clean unfinished multipart uploads.
  * Still WIP
@@ -174,7 +226,8 @@ async function main(): Promise<void> {
     console.log('STATS', stats);
   }, 10000);
 
   try {
-    await cleanStalledObjects();
+    // await cleanStalledObjects();
+    await cleanStalledV1Objects();
     await insertStatsOnDatabase(stats.totalDeletedSize);
     console.log('PROGRAM FINISHED SUCCESSFULLY');
diff --git a/bin/delete-objects/list-v1-shards.ts b/bin/delete-objects/list-v1-shards.ts
new file mode 100644
index 000000000..61e0ec788
--- /dev/null
+++ b/bin/delete-objects/list-v1-shards.ts
@@ -0,0 +1,48 @@
+import { EventEmitter } from 'events';
+import { ShardsReader } from './ObjectStorage';
+
+/**
+ * Lists all the shards that are from the version 1 of the protocol
+ * and that have a contract with the given nodeID.
+ *
+ * @param shardsReader
+ * @param nodeID
+ */
+export default function (shardsReader: ShardsReader, nodeID: string): EventEmitter {
+  const eventsEmitter = new EventEmitter();
+
+  let deletedCount = 0;
+  let notifyProgressIntervalId = setInterval(() => {
+    eventsEmitter.emit('progress', { deletedCount });
+  }, 3000);
+
+  async function listShards() {
+    for await (const shard of shardsReader.list(500)) {
+      if (shardsReader.isV1(shard)) {
+        const size = shard.contracts && shard.contracts.length > 0 && shard.contracts[0].contract?.data_size || shard.size || 0;
+
+        if (shard.contracts && shard.contracts.length > 0) {
+          const containsContractWithOurFarmer = shard.contracts.some(c => c.nodeID === nodeID);
+
+          if (containsContractWithOurFarmer) {
+            eventsEmitter.emit('data', { ...shard, size });
+          }
+        }
+        deletedCount += 1;
+      }
+    }
+  }
+
+  listShards()
+    .then(() => {
+      eventsEmitter.emit('end');
+    })
+    .catch((err) => {
+      eventsEmitter.emit('error', err);
+    })
+    .finally(() => {
+      clearInterval(notifyProgressIntervalId);
+    });
+
+  return eventsEmitter;
+};
diff --git a/bin/delete-objects/temp-shard.model.ts b/bin/delete-objects/temp-shard.model.ts
new file mode 100644
index 000000000..9a2546ab1
--- /dev/null
+++ b/bin/delete-objects/temp-shard.model.ts
@@ -0,0 +1,70 @@
+import { ObjectId, Document, Collection, Db, MongoClient } from 'mongodb';
+import { Frame } from '../../lib/core/frames/Frame';
+import { BucketEntry } from '../../lib/core/bucketEntries/BucketEntry';
+
+export interface MongoDBCollections {
+  tempShards: Collection<TempShardDocument>;
+  frames: Collection<FrameDocument>;
+  bucketEntries: Collection<BucketEntryDocument>;
+}
+
+interface TempShard extends Document {
+  hash: string;
+  objectStorageHash: string;
+  shardId: string;
+  size: number;
+}
+
+export interface TempShardDocument extends Omit<TempShard, 'shardId'> {
+  shardId: ObjectId;
+}
+
+export interface FrameDocument extends Omit<Frame, 'id'> {
+  _id: ObjectId;
+}
+
+export interface BucketEntryDocument extends Omit<BucketEntry, 'id'> {
+  _id: ObjectId;
+  renewal: Date;
+  toObject(): Omit;
+}
+
+export class MongoDB {
+  private uri: string;
+  private db: Db | null;
+  private client: MongoClient;
+
+  constructor(uri: string) {
+    this.uri = uri;
+    this.db = null;
+    this.client = new MongoClient(this.uri);
+  }
+
+  get URI() {
+    return this.uri;
+  }
+
+  async connect(): Promise<MongoDB> {
+    await this.client.connect();
+
+    this.db = this.client.db('__inxt-network');
+
+    return this;
+  }
+
+  getCollections(): MongoDBCollections {
+    if (!this.db) {
+      throw new Error('Database not initialized');
+    }
+
+    return {
+      tempShards: this.db.collection('tempshards'),
+      frames: this.db.collection('frames'),
+      bucketEntries: this.db.collection('bucketentries'),
+    };
+  }
+
+  disconnect(): Promise<void> {
+    return this.client.close();
+  }
+}
\ No newline at end of file
diff --git a/lib/core/bucketEntries/usecase.ts b/lib/core/bucketEntries/usecase.ts
index 3fa125268..ec99922f0 100644
--- a/lib/core/bucketEntries/usecase.ts
+++ b/lib/core/bucketEntries/usecase.ts
@@ -41,6 +41,12 @@ export class BucketEntriesUsecase {
     return bucketEntries;
   }
 
+  async findById(id: BucketEntry['id']): Promise<BucketEntry | null> {
+    const bucketEntry = await this.bucketEntriesRepository.findOne({ id });
+
+    return bucketEntry;
+  }
+
   async countByBucket(bucketId: Bucket['id']): Promise<number> {
     const count = await this.bucketEntriesRepository.count({ bucket: bucketId });
 
@@ -162,7 +168,7 @@ export class BucketEntriesUsecase {
     const shards = await this.shardsRepository.findByIds(shardIds);
 
     if (shards.length > 0) {
-      await this.shardsUsecase.deleteShardsStorageByUuids(shards.map(s => ({ uuid: s.uuid!, hash: s.hash })));
+      await this.shardsUsecase.deleteShardsStorageByUuids(shards as any);
 
       await this.shardsRepository.deleteByIds(shards.map(s => s.id));
     }
diff --git a/lib/core/buckets/usecase.ts b/lib/core/buckets/usecase.ts
index accb8dff8..54a11da78 100644
--- a/lib/core/buckets/usecase.ts
+++ b/lib/core/buckets/usecase.ts
@@ -268,6 +268,12 @@ export class BucketsUsecase {
     return response;
   }
 
+  async findById(id: Bucket['id']): Promise<Bucket | null> {
+    const bucket = await this.bucketsRepository.findOne({ id });
+
+    return bucket;
+  }
+
   async getUserUsage(user: Frame['user']): Promise {
     const usage = await this.framesRepository.getUserUsage(user);
 
diff --git a/lib/core/shards/MongoDBShardsRepository.ts b/lib/core/shards/MongoDBShardsRepository.ts
index 1ee24a44c..a8256b726 100644
--- a/lib/core/shards/MongoDBShardsRepository.ts
+++ b/lib/core/shards/MongoDBShardsRepository.ts
@@ -34,6 +34,16 @@ const formatFromMongoToShard = (mongoShard: any): Shard => {
 export class MongoDBShardsRepository implements ShardsRepository {
   constructor(private model: any) {}
 
+  findWithNoUuid(limit = 10, offset = 0): Promise<Shard[]> {
+    return this.model
+      .find({ uuid: { $exists: false } })
+      .sort({ _id: 1 })
+      .skip(offset)
+      .limit(limit)
+      .exec()
+      .then((shards: any) => shards.map(formatFromMongoToShard));
+  }
+
   findByIds(shardIds: Shard['id'][]): Promise<Shard[]> {
     return this.model
       .find({ _id: { $in: shardIds } })
diff --git a/lib/core/shards/Repository.ts b/lib/core/shards/Repository.ts
index 2416d8ae1..4508bac7b 100644
--- a/lib/core/shards/Repository.ts
+++ b/lib/core/shards/Repository.ts
@@ -2,6 +2,7 @@ import { Contact } from "../contacts/Contact";
 import { Shard } from "./Shard";
 
 export interface ShardsRepository {
+  findWithNoUuid(limit: number, offset: number): Promise<Shard[]>;
   findByIds(shardIds: Shard['id'][]): Promise<Shard[]>;
   findByHashes(hashes: Shard['hash'][]): Promise<
     (Omit & { contracts: Record }
diff --git a/lib/core/shards/usecase.ts b/lib/core/shards/usecase.ts
index c88f7fa3a..71567276a 100644
--- a/lib/core/shards/usecase.ts
+++ b/lib/core/shards/usecase.ts
@@ -2,19 +2,52 @@ import { MirrorsRepository } from '../mirrors/Repository';
 import NetworkMessageQueue from "../../server/queues/networkQueue";
 import { DELETING_FILE_MESSAGE } from "../../server/queues/messageTypes";
 import log from '../../logger';
+import { ContactsRepository } from '../contacts/Repository';
+import { Contact } from '../contacts/Contact';
+import { MirrorWithContact } from '../mirrors/Mirror';
 
 export class ShardsUsecase {
   constructor(
-    private mirrorsRepository: MirrorsRepository,
-    private networkQueue: NetworkMessageQueue
+    private readonly mirrorsRepository: MirrorsRepository,
+    private readonly contactsRepository: ContactsRepository,
+    private readonly networkQueue: NetworkMessageQueue
   ) {}
 
-  async deleteShardsStorageByUuids(shards: { hash: string, uuid: string }[]) {
+  async deleteShardsStorageByUuids(shards: {
+    hash: string,
+    uuid: string,
+    contracts: ({ nodeID: Contact['id'] })[]
+  }[]) {
     const mirrors = await this.mirrorsRepository.findByShardHashesWithContacts(shards.map(s => s.hash));
 
     const stillExistentMirrors = mirrors.filter((mirror) => {
       return mirror.contact && mirror.contact.address && mirror.contact.port;
     });
 
+    const noMirrors = stillExistentMirrors.length === 0;
+
+    if (noMirrors) {
+      const contactIdsWithShardsHashes = shards.flatMap((s) =>
+        s.contracts.map(c => ({ nodeID: c.nodeID, shardHash: s.hash, uuid: s.uuid }))
+      );
+
+      const contacts = await this.contactsRepository.findByIds(
+        contactIdsWithShardsHashes.map(c => c.nodeID)
+      );
+
+      for (const shard of shards) {
+        const contactsForGivenShard = contactIdsWithShardsHashes.filter((contactWHash) => {
+          return contactWHash.shardHash === shard.hash
+        });
+        for (const mirror of contactsForGivenShard) {
+          stillExistentMirrors.push({
+            id: '000000000000000000000000',
+            contact: contacts.find(c => c.id === mirror.nodeID) as Contact,
+            shardHash: mirror.shardHash
+          } as MirrorWithContact);
+        }
+      }
+    }
+
     for (const { contact, shardHash } of stillExistentMirrors) {
       const { address, port } = contact;
       const { uuid } = (shards.find(s => s.hash === shardHash) as { hash: string, uuid: string });
@@ -35,7 +68,7 @@ export class ShardsUsecase {
       })
     }
 
-    if (stillExistentMirrors.length > 0) {
+    if (!noMirrors && stillExistentMirrors.length > 0) {
       log.info('Deleting still existent mirrors (by uuids): %s from hashes: %s', stillExistentMirrors.map(m => m.id).toString(), shards.toString());
 
       await this.mirrorsRepository.deleteByIds(stillExistentMirrors.map(m => m.id));
diff --git a/lib/server/http/index.ts b/lib/server/http/index.ts
index 1024d8c52..27092b6da 100644
--- a/lib/server/http/index.ts
+++ b/lib/server/http/index.ts
@@ -91,6 +91,7 @@ export function bindNewRoutes(
 
   const shardsUsecase = new ShardsUsecase(
     mirrorsRepository,
+    contactsRepository,
     networkQueue
   );
 
diff --git a/package.json b/package.json
index 3e84389a8..42c0be1b8 100644
--- a/package.json
+++ b/package.json
@@ -57,7 +57,6 @@
     "@types/jest": "^27.4.1",
     "@types/jsonwebtoken": "^8.5.8",
     "@types/lodash": "^4.14.182",
-    "@types/mongoose": "^5.11.97",
     "@types/mysql": "^2.15.21",
     "@types/node": "^17.0.23",
     "@types/node-mongodb-fixtures": "^3.2.3",
diff --git a/tests/lib/core/bucketentries/usecase.test.ts b/tests/lib/core/bucketentries/usecase.test.ts
index 6a0cb5732..fb5c834f9 100644
--- a/tests/lib/core/bucketentries/usecase.test.ts
+++ b/tests/lib/core/bucketentries/usecase.test.ts
@@ -24,6 +24,8 @@ import { ShardsUsecase } from '../../../../lib/core/shards/usecase';
 import fixtures from '../fixtures';
 import { BucketEntry } from '../../../../lib/core/bucketEntries/BucketEntry';
 import { Bucket } from '../../../../lib/core/buckets/Bucket';
+import { ContactsRepository } from '../../../../lib/core/contacts/Repository';
+import { MongoDBContactsRepository } from '../../../../lib/core/contacts/MongoDBContactsRepository';
 
 describe('BucketEntriesUsecase', function () {
   const bucketId = 'bucketIdSAMPLE';
@@ -38,6 +40,7 @@ describe('BucketEntriesUsecase', function () {
   let pointersRepository: PointersRepository = new MongoDBPointersRepository({});
   let usersRepository: UsersRepository = new MongoDBUsersRepository({});
   let bucketEntryShardsRepository: BucketEntryShardsRepository = new MongoDBBucketEntryShardsRepository({});
+  let contactsRepository: ContactsRepository = new MongoDBContactsRepository({});
 
   let networkQueue: any = {
     enqueueMessage: (message: any) => {}
@@ -45,6 +48,7 @@ describe('BucketEntriesUsecase', function () {
 
   let shardsUseCase = new ShardsUsecase(
     mirrorsRepository,
+    contactsRepository,
     networkQueue,
   );
 
@@ -67,9 +71,11 @@ describe('BucketEntriesUsecase', function () {
     shardsRepository = new MongoDBShardsRepository({});
     bucketsRepository = new MongoDBBucketsRepository({});
     pointersRepository = new MongoDBPointersRepository({});
+    contactsRepository = new MongoDBContactsRepository({});
 
     shardsUseCase = new ShardsUsecase(
       mirrorsRepository,
+      contactsRepository,
       networkQueue,
     );
 
@@ -278,7 +284,9 @@ describe('BucketEntriesUsecase', function () {
       expect(findShardsStub.calledWith(shards.map(b => b.id)));
 
       expect(deleteShardsStorageStub.calledOnce).toBeTruthy();
-      expect(deleteShardsStorageStub.calledWith(shards.map(s => ({ hash: s.hash, uuid: (s.uuid as string) }))))
+      expect(deleteShardsStorageStub.calledWith(
+        shards.map(s => ({ hash: s.hash, uuid: (s.uuid as string), contracts: s.contracts })))
+      )
 
       expect(deleteShardsStub.calledOnce).toBeTruthy();
       expect(deleteShardsStub.calledWith(shards.map(s => s.id)));
diff --git a/tests/lib/core/fixtures.ts b/tests/lib/core/fixtures.ts
index 15ace86c6..7408656ea 100644
--- a/tests/lib/core/fixtures.ts
+++ b/tests/lib/core/fixtures.ts
@@ -7,8 +7,9 @@ import { Frame } from '../../../lib/core/frames/Frame';
 import { BucketEntryShard } from '../../../lib/core/bucketEntryShards/BucketEntryShard';
 import { Bucket } from '../../../lib/core/buckets/Bucket';
 import { User } from '../../../lib/core/users/User';
-import { Shard } from '../../../lib/core/shards/Shard';
+import { Shard, Contract } from '../../../lib/core/shards/Shard';
 import { Contact } from '../../../lib/core/contacts/Contact';
+import { Mirror } from '../../../lib/core/mirrors/Mirror';
 
 function getBucketEntriesWithFrames(fileIds?: string[]): BucketEntryWithFrame[] {
   const ids = fileIds ?? [v4()];
@@ -156,7 +157,7 @@ function getShard(custom?: Partial<Shard>, contactId?: Contact['id']): Shard {
 
 function getContact(custom?: Partial<Contact>): Contact {
   const defaultContact: Contact = {
-    address: `http://${randomBytes(10).toString('hex')}.com`,
+    address: `${randomBytes(10).toString('hex')}.com`,
     id: v4(),
     ip: 'http://1.1.1.1',
     lastSeen: new Date(),
@@ -174,6 +175,30 @@ function getContact(custom?: Partial<Contact>): Contact {
   return { ...defaultContact, ...custom };
 }
 
+function getContract(custom?: Partial<Contract>): Contract {
+  return {
+    version: 1,
+    farmer_id: randomBytes(40).toString('hex'),
+    data_size: Math.trunc(Math.random() * 1000),
+    data_hash: randomBytes(40).toString('hex'),
+    store_begin: new Date(),
+    ...custom
+  }
+};
+
+function getMirror(custom?: Partial<Mirror>): Mirror {
+  return {
+    id: v4(),
+    shardHash: randomBytes(40).toString('hex'),
+    contact: v4(),
+    token: v4(),
+    isEstablished: true,
+    contract: getContract(),
+    created: new Date(),
+    ...custom
+  };
+}
+
 export default {
   getBucketEntriesWithFrames,
   getBucketEntriesWithoutFrames,
@@ -184,5 +209,7 @@ export default {
   getBucket,
   getUser,
   getShard,
-  getContact
+  getContact,
+  getContract,
+  getMirror
 };
diff --git a/tests/lib/core/shards/usecase.test.ts b/tests/lib/core/shards/usecase.test.ts
new file mode 100644
index 000000000..2193fc361
--- /dev/null
+++ b/tests/lib/core/shards/usecase.test.ts
@@ -0,0 +1,137 @@
+import { restore, stub } from 'sinon';
+import fixtures from '../fixtures';
+
+import { MirrorsRepository } from '../../../../lib/core/mirrors/Repository';
+import { ShardsUsecase } from '../../../../lib/core/shards/usecase';
+import { MongoDBMirrorsRepository } from '../../../../lib/core/mirrors/MongoDBMirrorsRepository';
+import { MongoDBContactsRepository } from '../../../../lib/core/contacts/MongoDBContactsRepository';
+import { ContactsRepository } from '../../../../lib/core/contacts/Repository';
+import NetworkMessageQueue from '../../../../lib/server/queues/networkQueue';
+import { DELETING_FILE_MESSAGE } from '../../../../lib/server/queues/messageTypes';
+import { Shard } from '../../../../lib/core/shards/Shard';
+import { MirrorWithContact } from '../../../../lib/core/mirrors/Mirror';
+
+describe('ShardsUsecase', () => {
+  let mirrorsRepository: MirrorsRepository = new MongoDBMirrorsRepository({});
+  let contactsRepository: ContactsRepository = new MongoDBContactsRepository({});
+  const queue = new NetworkMessageQueue({
+    connection: {
+      url: `amqp://fake@fake`,
+    },
+    exchange: {
+      name: 'exchangeName',
+      type: 'direct',
+    },
+    queue: {
+      name: 'fake_name',
+    },
+    routingKey: {
+      name: 'routingKeyName',
+    },
+  });
+
+  let usecase = new ShardsUsecase(mirrorsRepository, contactsRepository, queue);
+
+  beforeEach(() => {
+    mirrorsRepository = new MongoDBMirrorsRepository({});
+    contactsRepository = new MongoDBContactsRepository({});
+
+    usecase = new ShardsUsecase(
+      mirrorsRepository,
+      contactsRepository,
+      queue,
+    );
+
+    restore();
+  });
+
+  describe('deleteShardsStorageByUuids()', () => {
+    it('When mirrors exist, then it deletes them properly', async () => {
+      const shardsToDelete = [fixtures.getShard(), fixtures.getShard()];
+      const [firstShard, secondShard] = shardsToDelete;
+      const contacts = shardsToDelete.map(s => fixtures.getContact({ id: s.contracts[0].nodeID }))
+      const mirrors: MirrorWithContact[] = contacts.map((c, i) => ({
+        ...fixtures.getMirror(),
+        shardHash: shardsToDelete[i].hash,
+        contact: c,
+      }));
+      const [firstMirror, secondMirror] = mirrors;
+
+      const findByShardHashes = stub(mirrorsRepository, 'findByShardHashesWithContacts').resolves(mirrors);
+      const findContactsByIds = stub(contactsRepository, 'findByIds').resolves();
+      const enqueueMessage = stub(queue, 'enqueueMessage').resolves();
+      const deleteMirrorsByIds = stub(mirrorsRepository, 'deleteByIds').resolves();
+
+      await usecase.deleteShardsStorageByUuids(
+        shardsToDelete as (Shard & { uuid: string })[]
+      );
+
+      expect(findByShardHashes.calledOnce).toBeTruthy();
+      expect(findByShardHashes.firstCall.args).toStrictEqual([shardsToDelete.map(s => s.hash)]);
+      expect(findContactsByIds.notCalled).toBeTruthy();
+      expect(enqueueMessage.callCount).toEqual(mirrors.length);
+      expect(enqueueMessage.firstCall.args[0]).toEqual({
+        type: DELETING_FILE_MESSAGE,
+        payload: {
+          key: firstShard.uuid,
+          hash: firstShard.uuid,
+          url: `http://${firstMirror.contact.address}:${firstMirror.contact.port}/v2/shards/${firstShard.uuid}`
+        }
+      })
+      expect(enqueueMessage.secondCall.args[0]).toEqual({
+        type: DELETING_FILE_MESSAGE,
+        payload: {
+          key: secondShard.uuid,
+          hash: secondShard.uuid,
+          url: `http://${secondMirror.contact.address}:${secondMirror.contact.port}/v2/shards/${secondShard.uuid}`
+        }
+      });
+      expect(deleteMirrorsByIds.calledOnce).toBeTruthy();
+      expect(deleteMirrorsByIds.firstCall.args).toStrictEqual([mirrors.map(m => m.id)]);
+    });
+
+    it('When mirrors do not exist, then uses contracts as fallback to delete shards', async () => {
+      const shardsToDelete = [fixtures.getShard(), fixtures.getShard()];
+      const [firstShard, secondShard] = shardsToDelete;
+      const contacts = shardsToDelete.map(s => fixtures.getContact({ id: s.contracts[0].nodeID }))
+      const [firstContact, secondContact] = contacts;
+      const mirrors: MirrorWithContact[] = [];
+
+      const findByShardHashes = stub(mirrorsRepository, 'findByShardHashesWithContacts').resolves(mirrors);
+      const findContactsByIds = stub(contactsRepository, 'findByIds').resolves(contacts);
+      const enqueueMessage = stub(queue, 'enqueueMessage').resolves();
+      const deleteMirrorsByIds = stub(mirrorsRepository, 'deleteByIds').resolves();
+
+      await usecase.deleteShardsStorageByUuids(
+        shardsToDelete as (Shard & { uuid: string })[]
+      );
+
+      expect(findByShardHashes.calledOnce).toBeTruthy();
+      expect(findByShardHashes.firstCall.args).toStrictEqual([shardsToDelete.map(s => s.hash)]);
+      expect(findContactsByIds.calledOnce).toBeTruthy();
+      expect(findContactsByIds.firstCall.args).toStrictEqual([
+        shardsToDelete.flatMap(s => s.contracts.flatMap(c => c.nodeID))
+      ]);
+      expect(enqueueMessage.callCount).toEqual(
+        shardsToDelete.reduce((a, s) => a + s.contracts.length, 0)
+      );
+      expect(enqueueMessage.firstCall.args[0]).toEqual({
+        type: DELETING_FILE_MESSAGE,
+        payload: {
+          key: firstShard.uuid,
+          hash: firstShard.uuid,
+          url: `http://${firstContact.address}:${firstContact.port}/v2/shards/${firstShard.uuid}`
+        }
+      })
+      expect(enqueueMessage.secondCall.args[0]).toEqual({
+        type: DELETING_FILE_MESSAGE,
+        payload: {
+          key: secondShard.uuid,
+          hash: secondShard.uuid,
+          url: `http://${secondContact.address}:${secondContact.port}/v2/shards/${secondShard.uuid}`
+        }
+      });
+      expect(deleteMirrorsByIds.notCalled).toBeTruthy();
+    });
+  });
+});
diff --git a/yarn.lock b/yarn.lock
index db6fcffc9..19665cd00 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -844,13 +844,6 @@
     "@types/bson" "*"
     "@types/node" "*"
 
-"@types/mongoose@^5.11.97":
-  version "5.11.97"
-  resolved "https://registry.yarnpkg.com/@types/mongoose/-/mongoose-5.11.97.tgz#80b0357f3de6807eb597262f52e49c3e13ee14d8"
-  integrity sha512-cqwOVYT3qXyLiGw7ueU2kX9noE8DPGRY6z8eUxudhXY8NZ7DMKYAxyZkLSevGfhCX3dO/AoX5/SO9lAzfjon0Q==
-  dependencies:
-    mongoose "*"
-
 "@types/mysql@^2.15.21":
   version "2.15.21"
   resolved "https://registry.yarnpkg.com/@types/mysql/-/mysql-2.15.21.tgz#7516cba7f9d077f980100c85fd500c8210bd5e45"
@@ -1638,7 +1631,7 @@ bson@^1.1.4:
   resolved "https://registry.yarnpkg.com/bson/-/bson-1.1.6.tgz#fb819be9a60cd677e0853aee4ca712a785d6618a"
   integrity sha512-EvVNVeGo4tHxwi8L6bPj3y3itEvStdwvvlojVxxbyYfoaxJ6keLgrTuKdyfEAszFK+H3olzBuafE0yoh0D1gdg==
 
-bson@^4.6.2, bson@^4.6.3:
+bson@^4.6.3:
   version "4.7.2"
   resolved "https://registry.yarnpkg.com/bson/-/bson-4.7.2.tgz#320f4ad0eaf5312dd9b45dc369cc48945e2a5f2e"
   integrity sha512-Ry9wCtIZ5kGqkJoi6aD8KjxFZEx78guTQDnpXWiNthsxzrxAK/i8E6pCHAIZTbaEFWcOCvbecMukfK7XUvyLpQ==
@@ -4992,11 +4985,6 @@ kareem@2.3.2:
   resolved "https://registry.yarnpkg.com/kareem/-/kareem-2.3.2.tgz#78c4508894985b8d38a0dc15e1a8e11078f2ca93"
   integrity sha512-STHz9P7X2L4Kwn72fA4rGyqyXdmrMSdxqHx9IXon/FXluXieaFA6KJ2upcHAHxQPQ0LeM/OjLrhFxifHewOALQ==
 
-kareem@2.4.1:
-  version "2.4.1"
-  resolved "https://registry.yarnpkg.com/kareem/-/kareem-2.4.1.tgz#7d81ec518204a48c1cb16554af126806c3cd82b0"
-  integrity sha512-aJ9opVoXroQUPfovYP5kaj2lM7Jn02Gw13bL0lg9v0V7SaUc0qavPs0Eue7d2DcC3NjqI6QAUElXNsuZSeM+EA==
-
 kareem@2.5.1:
   version "2.5.1"
   resolved "https://registry.yarnpkg.com/kareem/-/kareem-2.5.1.tgz#7b8203e11819a8e77a34b3517d3ead206764d15d"
@@ -5559,18 +5547,6 @@ mongodb@3.7.4:
   optionalDependencies:
     saslprep "^1.0.0"
 
-mongodb@4.7.0, mongodb@^4.5.0, mongodb@^4.6.0:
-  version "4.7.0"
-  resolved "https://registry.yarnpkg.com/mongodb/-/mongodb-4.7.0.tgz#99f7323271d93659067695b60e7b4efee2de9bf0"
-  integrity sha512-HhVar6hsUeMAVlIbwQwWtV36iyjKd9qdhY+s4wcU8K6TOj4Q331iiMy+FoPuxEntDIijTYWivwFJkLv8q/ZgvA==
-  dependencies:
-    bson "^4.6.3"
-    denque "^2.0.1"
-    mongodb-connection-string-url "^2.5.2"
-    socks "^2.6.2"
-  optionalDependencies:
-    saslprep "^1.0.3"
-
 mongodb@6.2.0:
   version "6.2.0"
   resolved "https://registry.yarnpkg.com/mongodb/-/mongodb-6.2.0.tgz#2c9dcb3eeaf528ed850e94b3df392de6c6b0d7ab"
@@ -5593,6 +5569,18 @@ mongodb@^3.6.9:
   optionalDependencies:
     saslprep "^1.0.0"
 
+mongodb@^4.5.0, mongodb@^4.6.0:
+  version "4.7.0"
+  resolved "https://registry.yarnpkg.com/mongodb/-/mongodb-4.7.0.tgz#99f7323271d93659067695b60e7b4efee2de9bf0"
+  integrity sha512-HhVar6hsUeMAVlIbwQwWtV36iyjKd9qdhY+s4wcU8K6TOj4Q331iiMy+FoPuxEntDIijTYWivwFJkLv8q/ZgvA==
+  dependencies:
+    bson "^4.6.3"
+    denque "^2.0.1"
+    mongodb-connection-string-url "^2.5.2"
+    socks "^2.6.2"
+  optionalDependencies:
+    saslprep "^1.0.3"
+
 mongoose-int32@^0.1.0:
   version "0.1.0"
   resolved "https://registry.yarnpkg.com/mongoose-int32/-/mongoose-int32-0.1.0.tgz#2c8e138e99fa1b44e2470776177a76529a244303"
@@ -5610,19 +5598,6 @@ mongoose-types@^1.0.3:
   dependencies:
     mongoose ">= 1.0.16"
 
-mongoose@*:
-  version "6.4.4"
-  resolved "https://registry.yarnpkg.com/mongoose/-/mongoose-6.4.4.tgz#4e22a36373d8a867ee8f73063d8b31f1e451316d"
-  integrity sha512-r6sp96veRNhNIWFtHHe4Lqak+ilgiExYnnMLhYTGdzjIMR90G1ayx0JKFVdHuC6dKNHGFX0ETJGbf36N8Romjg==
-  dependencies:
-    bson "^4.6.2"
-    kareem "2.4.1"
-    mongodb "4.7.0"
-    mpath "0.9.0"
-    mquery "4.0.3"
-    ms "2.1.3"
-    sift "16.0.0"
-
 mongoose@=4.11.14:
   version "4.11.14"
   resolved "https://registry.yarnpkg.com/mongoose/-/mongoose-4.11.14.tgz#b85402aaf28c5c3e45c8ef93fe69544eaa5d00f3"
@@ -5715,13 +5690,6 @@ mquery@3.2.5:
     safe-buffer "5.1.2"
     sliced "1.0.1"
 
-mquery@4.0.3:
-  version "4.0.3"
-  resolved "https://registry.yarnpkg.com/mquery/-/mquery-4.0.3.tgz#4d15f938e6247d773a942c912d9748bd1965f89d"
-  integrity sha512-J5heI+P08I6VJ2Ky3+33IpCdAvlYGTSUjwTPxkAr8i8EoduPMBX2OY/wa3IKZIQl7MU4SbFk8ndgSKyB/cl1zA==
-  dependencies:
-    debug "4.x"
-
 mquery@5.0.0:
   version "5.0.0"
   resolved "https://registry.yarnpkg.com/mquery/-/mquery-5.0.0.tgz#a95be5dfc610b23862df34a47d3e5d60e110695d"
@@ -7154,11 +7122,6 @@ sift@13.5.2:
   resolved "https://registry.yarnpkg.com/sift/-/sift-13.5.2.tgz#24a715e13c617b086166cd04917d204a591c9da6"
   integrity sha512-+gxdEOMA2J+AI+fVsCqeNn7Tgx3M9ZN9jdi95939l1IJ8cZsqS8sqpJyOkic2SJk+1+98Uwryt/gL6XDaV+UZA==
 
-sift@16.0.0:
-  version "16.0.0"
-  resolved "https://registry.yarnpkg.com/sift/-/sift-16.0.0.tgz#447991577db61f1a8fab727a8a98a6db57a23eb8"
-  integrity sha512-ILTjdP2Mv9V1kIxWMXeMTIRbOBrqKc4JAXmFMnFq3fKeyQ2Qwa3Dw1ubcye3vR+Y6ofA0b9gNDr/y2t6eUeIzQ==
-
 sift@16.0.1:
   version "16.0.1"
   resolved "https://registry.yarnpkg.com/sift/-/sift-16.0.1.tgz#e9c2ccc72191585008cf3e36fc447b2d2633a053"