Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[_]: fix/handle-shards-deletion-when-no-mirrors-exist #158

Merged
merged 10 commits into from
Jul 9, 2024
22 changes: 22 additions & 0 deletions bin/cli/commands/clean-stalled-bucket-entries.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { PrepareFunctionReturnType } from "../init";
import { CommandId } from "./id";
import cleanStalledBucketEntries from "../tasks/clean-stalled-bucket-entries.task";

export default {
id: CommandId.CleanStalledBucketEntries,
version: '0.0.1',
fn: async (
{
usecase: { bucketsUsecase, bucketEntriesUsecase },
repo: { framesRepository },
readers
}: PrepareFunctionReturnType,
): Promise<void> => {
await cleanStalledBucketEntries(
bucketsUsecase,
bucketEntriesUsecase,
framesRepository,
readers.bucketEntriesReader
);
}
};
13 changes: 13 additions & 0 deletions bin/cli/commands/clean-stalled-frames.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { PrepareFunctionReturnType } from "../init";
import { cleanStalledFrames } from "../tasks/clean-stalled-frames.task";
import { CommandId } from "./id";

export default {
id: CommandId.CleanStalledFrames,
version: '0.0.1',
fn: async (
{ usecase: { bucketEntriesUsecase }, readers }: PrepareFunctionReturnType,
): Promise<void> => {
await cleanStalledFrames(bucketEntriesUsecase, readers.framesReader);
},
};
2 changes: 2 additions & 0 deletions bin/cli/commands/id.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ export enum CommandId {
DestroyUserBuckets = 'destroy-user-buckets',
EmptyBucket = 'empty-bucket',
EmptyBuckets = 'empty-buckets',
CleanStalledFrames = 'clean-stalled-frames',
CleanStalledBucketEntries = 'clean-stalled-bucket-entries',
}
22 changes: 22 additions & 0 deletions bin/cli/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { buildCommand } from './build'
import { default as destroyUserBuckets } from "./destroy-user-buckets.command";
import { default as emptyBucket } from "./empty-bucket.command";
import { default as emptyBuckets } from "./empty-buckets.command";
import { default as cleanStalledFrames } from "./clean-stalled-frames.command";
import { default as cleanStalledBucketEntries } from "./clean-stalled-bucket-entries.command";

export default (resources: PrepareFunctionReturnType, onFinish: () => void) => ({
[destroyUserBuckets.id]: buildCommand({
Expand Down Expand Up @@ -35,4 +37,24 @@ export default (resources: PrepareFunctionReturnType, onFinish: () => void) => (
await emptyBuckets.fn(resources, userId);
onFinish();
}),

[cleanStalledFrames.id]: buildCommand({
version: cleanStalledFrames.version,
command: `${cleanStalledFrames.id}`,
description: 'Cleans stalled frames',
options: [],
}).action(async () => {
await cleanStalledFrames.fn(resources);
onFinish();
}),

[cleanStalledBucketEntries.id]: buildCommand({
version: cleanStalledBucketEntries.version,
command: `${cleanStalledBucketEntries.id}`,
description: 'Cleans stalled bucket entries',
options: [],
}).action(async () => {
await cleanStalledBucketEntries.fn(resources);
onFinish();
}),
});
18 changes: 18 additions & 0 deletions bin/cli/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ import { ShardsRepository } from '../../lib/core/shards/Repository';
import { UploadsRepository } from '../../lib/core/uploads/Repository';
import { TokensRepository } from '../../lib/core/tokens/Repository';
import { ContactsRepository } from '../../lib/core/contacts/Repository';
import { MongoDB } from '../delete-objects/temp-shard.model';
import { DatabaseFramesReader, DatabaseBucketEntriesReaderWithoutBucket } from '../delete-objects/ObjectStorage';

const Config = require('../../lib/config');

const config = new Config(process.env.NODE_ENV || 'develop', '', '');

export type PrepareFunctionReturnType = {
readers: {
framesReader: DatabaseFramesReader,
bucketEntriesReader: DatabaseBucketEntriesReaderWithoutBucket,
},
repo: {
bucketEntriesRepository: BucketEntriesRepository,
bucketEntryShardsRepository:BucketEntryShardsRepository,
Expand All @@ -57,6 +63,8 @@ export type PrepareFunctionReturnType = {
export async function prepare(): Promise<PrepareFunctionReturnType> {
const QUEUE_NAME = 'NETWORK_WORKER_TASKS_QUEUE';

const newDbConnection = new MongoDB(process.env.inxtbridge_storage__mongoUri as string);
await newDbConnection.connect();
const models = await connectToDatabase('', '');
const { QUEUE_USERNAME, QUEUE_PASSWORD, QUEUE_HOST } = config;

Expand Down Expand Up @@ -114,9 +122,19 @@ export async function prepare(): Promise<PrepareFunctionReturnType> {
tokensRepository,
contactsRepository,
)
const framesReader = new DatabaseFramesReader(
newDbConnection.getCollections().frames
);
const bucketEntriesReader = new DatabaseBucketEntriesReaderWithoutBucket(
newDbConnection.getCollections().bucketEntries
);
await networkQueue.connectAndRetry();

return {
readers: {
framesReader,
bucketEntriesReader,
},
repo: {
bucketEntriesRepository,
bucketEntryShardsRepository,
Expand Down
55 changes: 55 additions & 0 deletions bin/cli/tasks/clean-stalled-bucket-entries.task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { BucketEntriesUsecase } from "../../../lib/core/bucketEntries/usecase";
import { BucketsUsecase } from "../../../lib/core/buckets/usecase";
import { FramesRepository } from "../../../lib/core/frames/Repository";
import { BucketEntriesReader } from "../../delete-objects/ObjectStorage";
import { BucketEntryDocument } from "../../delete-objects/temp-shard.model";

export type CleanStalledBucketEntriesFunctionType = (
bucketsUsecase: BucketsUsecase,
bucketEntriesUsecase: BucketEntriesUsecase,
framesRepository: FramesRepository,
reader: BucketEntriesReader,
) => Promise<void>;

const task: CleanStalledBucketEntriesFunctionType = async (
bucketsUsecase,
bucketEntriesUsecase,
framesRepository,
reader,
): Promise<void> => {
const deleteInBulksOf = 20;
const toDelete: BucketEntryDocument[] = [];
const stats = {
totalSize: 0,
totalCount: 0,
}

for await (const bucketEntry of reader.list()) {
const bucketDoesNotExist = await bucketsUsecase.findById(bucketEntry.bucket) === null;
const isV1 = !bucketEntry.version;

if (bucketDoesNotExist && isV1) {
toDelete.push(bucketEntry);
}

if (toDelete.length === deleteInBulksOf) {
const frames = await framesRepository.findByIds(toDelete.map(b => b.frame!));
const beIds = toDelete.map(be => be._id.toString());

await bucketEntriesUsecase.removeFiles(beIds);

toDelete.forEach(be => {
console.log(`deleting entry ${be._id}`);
});

stats.totalSize += frames.reduce((acc, curr) => acc + (curr.size || 0), 0);
stats.totalCount += toDelete.length;

toDelete.length = 0;

console.log(`total size ${stats.totalSize}, total count ${stats.totalCount}`);
}
}
}

export default task;
42 changes: 42 additions & 0 deletions bin/cli/tasks/clean-stalled-frames.task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { BucketEntriesUsecase } from "../../../lib/core/bucketEntries/usecase";
import { FramesReader } from "../../delete-objects/ObjectStorage";
import { Frame } from "../../../lib/core/frames/Frame";
import { FrameDocument } from "../../delete-objects/temp-shard.model";

export type CleanStalledFramesFunctionType = (
bucketEntriesUsecase: BucketEntriesUsecase,
reader: FramesReader,
) => Promise<void>;

export const cleanStalledFrames: CleanStalledFramesFunctionType = async (
bucketEntriesUsecase,
reader,
): Promise<void> => {
const deleteInBulksOf = 20;
const toDelete: { frame: Frame['id'], id: '', _frame: FrameDocument }[] = [];
const stats = {
totalSize: 0,
totalCount: 0,
}

for await (const frame of reader.list()) {
if (frame.bucketEntry) {
const be = await bucketEntriesUsecase.findById(frame.bucketEntry);

if (!be) {
console.log(`deleting frame ${frame._id}, be ${frame.bucketEntry}, size ${frame.size}`);
toDelete.push({ frame: frame._id.toString(), id: '', _frame: frame });
}
}
if (toDelete.length === deleteInBulksOf) {
await bucketEntriesUsecase.removeFilesV1(toDelete as any);

stats.totalSize += toDelete.reduce((acc, curr) => acc + curr._frame.size, 0);
stats.totalCount += toDelete.length;

toDelete.length = 0;

console.log(`total size ${stats.totalSize}, total count ${stats.totalCount}`);
}
}
}
Loading
Loading