Skip to content

Commit

Permalink
Merge pull request #158 from internxt/feat/v1-objects-cleanup
Browse files Browse the repository at this point in the history
[_]: fix/handle-shards-deletion-when-no-mirrors-exist
  • Loading branch information
sg-gs authored Jul 9, 2024
2 parents b7d5e7a + 344ed90 commit 361446f
Show file tree
Hide file tree
Showing 22 changed files with 827 additions and 62 deletions.
22 changes: 22 additions & 0 deletions bin/cli/commands/clean-stalled-bucket-entries.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { PrepareFunctionReturnType } from "../init";
import { CommandId } from "./id";
import cleanStalledBucketEntries from "../tasks/clean-stalled-bucket-entries.task";

export default {
id: CommandId.CleanStalledBucketEntries,
version: '0.0.1',
fn: async (
{
usecase: { bucketsUsecase, bucketEntriesUsecase },
repo: { framesRepository },
readers
}: PrepareFunctionReturnType,
): Promise<void> => {
await cleanStalledBucketEntries(
bucketsUsecase,
bucketEntriesUsecase,
framesRepository,
readers.bucketEntriesReader
);
}
};
13 changes: 13 additions & 0 deletions bin/cli/commands/clean-stalled-frames.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { PrepareFunctionReturnType } from "../init";
import { cleanStalledFrames } from "../tasks/clean-stalled-frames.task";
import { CommandId } from "./id";

export default {
id: CommandId.CleanStalledFrames,
version: '0.0.1',
fn: async (
{ usecase: { bucketEntriesUsecase }, readers }: PrepareFunctionReturnType,
): Promise<void> => {
await cleanStalledFrames(bucketEntriesUsecase, readers.framesReader);
},
};
2 changes: 2 additions & 0 deletions bin/cli/commands/id.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ export enum CommandId {
DestroyUserBuckets = 'destroy-user-buckets',
EmptyBucket = 'empty-bucket',
EmptyBuckets = 'empty-buckets',
CleanStalledFrames = 'clean-stalled-frames',
CleanStalledBucketEntries = 'clean-stalled-bucket-entries',
}
22 changes: 22 additions & 0 deletions bin/cli/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { buildCommand } from './build'
import { default as destroyUserBuckets } from "./destroy-user-buckets.command";
import { default as emptyBucket } from "./empty-bucket.command";
import { default as emptyBuckets } from "./empty-buckets.command";
import { default as cleanStalledFrames } from "./clean-stalled-frames.command";
import { default as cleanStalledBucketEntries } from "./clean-stalled-bucket-entries.command";

export default (resources: PrepareFunctionReturnType, onFinish: () => void) => ({
[destroyUserBuckets.id]: buildCommand({
Expand Down Expand Up @@ -35,4 +37,24 @@ export default (resources: PrepareFunctionReturnType, onFinish: () => void) => (
await emptyBuckets.fn(resources, userId);
onFinish();
}),

[cleanStalledFrames.id]: buildCommand({
version: cleanStalledFrames.version,
command: `${cleanStalledFrames.id}`,
description: 'Cleans stalled frames',
options: [],
}).action(async () => {
await cleanStalledFrames.fn(resources);
onFinish();
}),

[cleanStalledBucketEntries.id]: buildCommand({
version: cleanStalledBucketEntries.version,
command: `${cleanStalledBucketEntries.id}`,
description: 'Cleans stalled bucket entries',
options: [],
}).action(async () => {
await cleanStalledBucketEntries.fn(resources);
onFinish();
}),
});
18 changes: 18 additions & 0 deletions bin/cli/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ import { ShardsRepository } from '../../lib/core/shards/Repository';
import { UploadsRepository } from '../../lib/core/uploads/Repository';
import { TokensRepository } from '../../lib/core/tokens/Repository';
import { ContactsRepository } from '../../lib/core/contacts/Repository';
import { MongoDB } from '../delete-objects/temp-shard.model';
import { DatabaseFramesReader, DatabaseBucketEntriesReaderWithoutBucket } from '../delete-objects/ObjectStorage';

const Config = require('../../lib/config');

const config = new Config(process.env.NODE_ENV || 'develop', '', '');

export type PrepareFunctionReturnType = {
readers: {
framesReader: DatabaseFramesReader,
bucketEntriesReader: DatabaseBucketEntriesReaderWithoutBucket,
},
repo: {
bucketEntriesRepository: BucketEntriesRepository,
bucketEntryShardsRepository:BucketEntryShardsRepository,
Expand All @@ -57,6 +63,8 @@ export type PrepareFunctionReturnType = {
export async function prepare(): Promise<PrepareFunctionReturnType> {
const QUEUE_NAME = 'NETWORK_WORKER_TASKS_QUEUE';

const newDbConnection = new MongoDB(process.env.inxtbridge_storage__mongoUri as string);
await newDbConnection.connect();
const models = await connectToDatabase('', '');
const { QUEUE_USERNAME, QUEUE_PASSWORD, QUEUE_HOST } = config;

Expand Down Expand Up @@ -114,9 +122,19 @@ export async function prepare(): Promise<PrepareFunctionReturnType> {
tokensRepository,
contactsRepository,
)
const framesReader = new DatabaseFramesReader(
newDbConnection.getCollections().frames
);
const bucketEntriesReader = new DatabaseBucketEntriesReaderWithoutBucket(
newDbConnection.getCollections().bucketEntries
);
await networkQueue.connectAndRetry();

return {
readers: {
framesReader,
bucketEntriesReader,
},
repo: {
bucketEntriesRepository,
bucketEntryShardsRepository,
Expand Down
55 changes: 55 additions & 0 deletions bin/cli/tasks/clean-stalled-bucket-entries.task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { BucketEntriesUsecase } from "../../../lib/core/bucketEntries/usecase";
import { BucketsUsecase } from "../../../lib/core/buckets/usecase";
import { FramesRepository } from "../../../lib/core/frames/Repository";
import { BucketEntriesReader } from "../../delete-objects/ObjectStorage";
import { BucketEntryDocument } from "../../delete-objects/temp-shard.model";

export type CleanStalledBucketEntriesFunctionType = (
bucketsUsecase: BucketsUsecase,
bucketEntriesUsecase: BucketEntriesUsecase,
framesRepository: FramesRepository,
reader: BucketEntriesReader,
) => Promise<void>;

const task: CleanStalledBucketEntriesFunctionType = async (
bucketsUsecase,
bucketEntriesUsecase,
framesRepository,
reader,
): Promise<void> => {
const deleteInBulksOf = 20;
const toDelete: BucketEntryDocument[] = [];
const stats = {
totalSize: 0,
totalCount: 0,
}

for await (const bucketEntry of reader.list()) {
const bucketDoesNotExist = await bucketsUsecase.findById(bucketEntry.bucket) === null;
const isV1 = !bucketEntry.version;

if (bucketDoesNotExist && isV1) {
toDelete.push(bucketEntry);
}

if (toDelete.length === deleteInBulksOf) {
const frames = await framesRepository.findByIds(toDelete.map(b => b.frame!));
const beIds = toDelete.map(be => be._id.toString());

await bucketEntriesUsecase.removeFiles(beIds);

toDelete.forEach(be => {
console.log(`deleting entry ${be._id}`);
});

stats.totalSize += frames.reduce((acc, curr) => acc + (curr.size || 0), 0);
stats.totalCount += toDelete.length;

toDelete.length = 0;

console.log(`total size ${stats.totalSize}, total count ${stats.totalCount}`);
}
}
}

export default task;
42 changes: 42 additions & 0 deletions bin/cli/tasks/clean-stalled-frames.task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { BucketEntriesUsecase } from "../../../lib/core/bucketEntries/usecase";
import { FramesReader } from "../../delete-objects/ObjectStorage";
import { Frame } from "../../../lib/core/frames/Frame";
import { FrameDocument } from "../../delete-objects/temp-shard.model";

export type CleanStalledFramesFunctionType = (
bucketEntriesUsecase: BucketEntriesUsecase,
reader: FramesReader,
) => Promise<void>;

export const cleanStalledFrames: CleanStalledFramesFunctionType = async (
bucketEntriesUsecase,
reader,
): Promise<void> => {
const deleteInBulksOf = 20;
const toDelete: { frame: Frame['id'], id: '', _frame: FrameDocument }[] = [];
const stats = {
totalSize: 0,
totalCount: 0,
}

for await (const frame of reader.list()) {
if (frame.bucketEntry) {
const be = await bucketEntriesUsecase.findById(frame.bucketEntry);

if (!be) {
console.log(`deleting frame ${frame._id}, be ${frame.bucketEntry}, size ${frame.size}`);
toDelete.push({ frame: frame._id.toString(), id: '', _frame: frame });
}
}
if (toDelete.length === deleteInBulksOf) {
await bucketEntriesUsecase.removeFilesV1(toDelete as any);

stats.totalSize += toDelete.reduce((acc, curr) => acc + curr._frame.size, 0);
stats.totalCount += toDelete.length;

toDelete.length = 0;

console.log(`total size ${stats.totalSize}, total count ${stats.totalCount}`);
}
}
}
Loading

0 comments on commit 361446f

Please sign in to comment.