Skip to content

Commit

Permalink
fix(shards): handle deletion when no mirrors exist
Browse files Browse the repository at this point in the history
  • Loading branch information
sg-gs committed Jul 9, 2024
1 parent a2a3c14 commit 344ed90
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 9 deletions.
2 changes: 1 addition & 1 deletion lib/core/bucketEntries/usecase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ export class BucketEntriesUsecase {
const shards = await this.shardsRepository.findByIds(shardIds);

if (shards.length > 0) {
await this.shardsUsecase.deleteShardsStorageByUuids(shards.map(s => ({ uuid: s.uuid!, hash: s.hash })));
await this.shardsUsecase.deleteShardsStorageByUuids(shards as any);
await this.shardsRepository.deleteByIds(shards.map(s => s.id));
}

Expand Down
41 changes: 37 additions & 4 deletions lib/core/shards/usecase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,52 @@ import { MirrorsRepository } from '../mirrors/Repository';
import NetworkMessageQueue from "../../server/queues/networkQueue";
import { DELETING_FILE_MESSAGE } from "../../server/queues/messageTypes";
import log from '../../logger';
import { ContactsRepository } from '../contacts/Repository';
import { Contact } from '../contacts/Contact';
import { MirrorWithContact } from '../mirrors/Mirror';

export class ShardsUsecase {
constructor(
private mirrorsRepository: MirrorsRepository,
private networkQueue: NetworkMessageQueue
private readonly mirrorsRepository: MirrorsRepository,
private readonly contactsRepository: ContactsRepository,
private readonly networkQueue: NetworkMessageQueue
) {}

async deleteShardsStorageByUuids(shards: { hash: string, uuid: string }[]) {
async deleteShardsStorageByUuids(shards: {
hash: string,
uuid: string,
contracts: ({ nodeID: Contact['id'] })[]
}[]) {
const mirrors = await this.mirrorsRepository.findByShardHashesWithContacts(shards.map(s => s.hash));
const stillExistentMirrors = mirrors.filter((mirror) => {
return mirror.contact && mirror.contact.address && mirror.contact.port;
});

const noMirrors = stillExistentMirrors.length === 0;

if (noMirrors) {
const contactIdsWithShardsHashes = shards.flatMap((s) =>
s.contracts.map(c => ({ nodeID: c.nodeID, shardHash: s.hash, uuid: s.uuid }))
);

const contacts = await this.contactsRepository.findByIds(
contactIdsWithShardsHashes.map(c => c.nodeID)
);

for (const shard of shards) {
const contactsForGivenShard = contactIdsWithShardsHashes.filter((contactWHash) => {
return contactWHash.shardHash === shard.hash
});
for (const mirror of contactsForGivenShard) {
stillExistentMirrors.push({
id: '000000000000000000000000',
contact: contacts.find(c => c.id === mirror.nodeID) as Contact,
shardHash: mirror.shardHash
} as MirrorWithContact);
}
}
}

for (const { contact, shardHash } of stillExistentMirrors) {
const { address, port } = contact;
const { uuid } = (shards.find(s => s.hash === shardHash) as { hash: string, uuid: string });
Expand All @@ -35,7 +68,7 @@ export class ShardsUsecase {
})
}

if (stillExistentMirrors.length > 0) {
if (!noMirrors && stillExistentMirrors.length > 0) {
log.info('Deleting still existent mirrors (by uuids): %s from hashes: %s', stillExistentMirrors.map(m => m.id).toString(), shards.toString());

await this.mirrorsRepository.deleteByIds(stillExistentMirrors.map(m => m.id));
Expand Down
1 change: 1 addition & 0 deletions lib/server/http/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export function bindNewRoutes(

const shardsUsecase = new ShardsUsecase(
mirrorsRepository,
contactsRepository,
networkQueue
);

Expand Down
10 changes: 9 additions & 1 deletion tests/lib/core/bucketentries/usecase.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import { ShardsUsecase } from '../../../../lib/core/shards/usecase';
import fixtures from '../fixtures';
import { BucketEntry } from '../../../../lib/core/bucketEntries/BucketEntry';
import { Bucket } from '../../../../lib/core/buckets/Bucket';
import { ContactsRepository } from '../../../../lib/core/contacts/Repository';
import { MongoDBContactsRepository } from '../../../../lib/core/contacts/MongoDBContactsRepository';

describe('BucketEntriesUsecase', function () {
const bucketId = 'bucketIdSAMPLE';
Expand All @@ -38,13 +40,15 @@ describe('BucketEntriesUsecase', function () {
let pointersRepository: PointersRepository = new MongoDBPointersRepository({});
let usersRepository: UsersRepository = new MongoDBUsersRepository({});
let bucketEntryShardsRepository: BucketEntryShardsRepository = new MongoDBBucketEntryShardsRepository({});
let contactsRepository: ContactsRepository = new MongoDBContactsRepository({});

let networkQueue: any = {
enqueueMessage: (message: any) => {}
};

let shardsUseCase = new ShardsUsecase(
mirrorsRepository,
contactsRepository,
networkQueue,
);

Expand All @@ -67,9 +71,11 @@ describe('BucketEntriesUsecase', function () {
shardsRepository = new MongoDBShardsRepository({});
bucketsRepository = new MongoDBBucketsRepository({});
pointersRepository = new MongoDBPointersRepository({});
contactsRepository = new MongoDBContactsRepository({});

shardsUseCase = new ShardsUsecase(
mirrorsRepository,
contactsRepository,
networkQueue,
);

Expand Down Expand Up @@ -278,7 +284,9 @@ describe('BucketEntriesUsecase', function () {
expect(findShardsStub.calledWith(shards.map(b => b.id)));

expect(deleteShardsStorageStub.calledOnce).toBeTruthy();
expect(deleteShardsStorageStub.calledWith(shards.map(s => ({ hash: s.hash, uuid: (s.uuid as string) }))))
expect(deleteShardsStorageStub.calledWith(
shards.map(s => ({ hash: s.hash, uuid: (s.uuid as string), contracts: s.contracts })))
)

expect(deleteShardsStub.calledOnce).toBeTruthy();
expect(deleteShardsStub.calledWith(shards.map(s => s.id)));
Expand Down
33 changes: 30 additions & 3 deletions tests/lib/core/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import { Frame } from '../../../lib/core/frames/Frame';
import { BucketEntryShard } from '../../../lib/core/bucketEntryShards/BucketEntryShard';
import { Bucket } from '../../../lib/core/buckets/Bucket';
import { User } from '../../../lib/core/users/User';
import { Shard } from '../../../lib/core/shards/Shard';
import { Shard, Contract } from '../../../lib/core/shards/Shard';
import { Contact } from '../../../lib/core/contacts/Contact';
import { Mirror } from '../../../lib/core/mirrors/Mirror';

function getBucketEntriesWithFrames(fileIds?: string[]): BucketEntryWithFrame[] {
const ids = fileIds ?? [v4()];
Expand Down Expand Up @@ -156,7 +157,7 @@ function getShard(custom?: Partial<Shard>, contactId?: Contact['id']): Shard {

function getContact(custom?: Partial<Contact>): Contact {
const defaultContact: Contact = {
address: `http://${randomBytes(10).toString('hex')}.com`,
address: `${randomBytes(10).toString('hex')}.com`,
id: v4(),
ip: 'http://1.1.1.1',
lastSeen: new Date(),
Expand All @@ -174,6 +175,30 @@ function getContact(custom?: Partial<Contact>): Contact {
return { ...defaultContact, ...custom };
}

function getContract(custom?: Partial<Contract>): Contract {
return {
version: 1,
farmer_id: randomBytes(40).toString('hex'),
data_size: Math.trunc(Math.random() * 1000),
data_hash: randomBytes(40).toString('hex'),
store_begin: new Date(),
...custom
}
};

function getMirror(custom?: Partial<Mirror>): Mirror {
return {
id: v4(),
shardHash: randomBytes(40).toString('hex'),
contact: v4(),
token: v4(),
isEstablished: true,
contract: getContract(),
created: new Date(),
...custom
};
}

export default {
getBucketEntriesWithFrames,
getBucketEntriesWithoutFrames,
Expand All @@ -184,5 +209,7 @@ export default {
getBucket,
getUser,
getShard,
getContact
getContact,
getContract,
getMirror
};
137 changes: 137 additions & 0 deletions tests/lib/core/shards/usecase.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import { restore, stub } from 'sinon';
import fixtures from '../fixtures';

import { MirrorsRepository } from '../../../../lib/core/mirrors/Repository';
import { ShardsUsecase } from '../../../../lib/core/shards/usecase';
import { MongoDBMirrorsRepository } from '../../../../lib/core/mirrors/MongoDBMirrorsRepository';
import { MongoDBContactsRepository } from '../../../../lib/core/contacts/MongoDBContactsRepository';
import { ContactsRepository } from '../../../../lib/core/contacts/Repository';
import NetworkMessageQueue from '../../../../lib/server/queues/networkQueue';
import { DELETING_FILE_MESSAGE } from '../../../../lib/server/queues/messageTypes';
import { Shard } from '../../../../lib/core/shards/Shard';
import { MirrorWithContact } from '../../../../lib/core/mirrors/Mirror';

describe('ShardsUsecase', () => {
let mirrorsRepository: MirrorsRepository = new MongoDBMirrorsRepository({});
let contactsRepository: ContactsRepository = new MongoDBContactsRepository({});
const queue = new NetworkMessageQueue({
connection: {
url: `amqp://fake@fake`,
},
exchange: {
name: 'exchangeName',
type: 'direct',
},
queue: {
name: 'fake_name',
},
routingKey: {
name: 'routingKeyName',
},
});

let usecase = new ShardsUsecase(mirrorsRepository, contactsRepository, queue);

beforeEach(() => {
mirrorsRepository = new MongoDBMirrorsRepository({});
contactsRepository = new MongoDBContactsRepository({});

usecase = new ShardsUsecase(
mirrorsRepository,
contactsRepository,
queue,
);

restore();
});

describe('deleteShardsStorageByUuids()', () => {
it('When mirrors exist, then it deletes them properly', async () => {
const shardsToDelete = [fixtures.getShard(), fixtures.getShard()];
const [firstShard, secondShard] = shardsToDelete;
const contacts = shardsToDelete.map(s => fixtures.getContact({ id: s.contracts[0].nodeID }))
const mirrors: MirrorWithContact[] = contacts.map((c, i) => ({
...fixtures.getMirror(),
shardHash: shardsToDelete[i].hash,
contact: c,
}));
const [firstMirror, secondMirror] = mirrors;

const findByShardHashes = stub(mirrorsRepository, 'findByShardHashesWithContacts').resolves(mirrors);
const findContactsByIds = stub(contactsRepository, 'findByIds').resolves();
const enqueueMessage = stub(queue, 'enqueueMessage').resolves();
const deleteMirrorsByIds = stub(mirrorsRepository, 'deleteByIds').resolves();

await usecase.deleteShardsStorageByUuids(
shardsToDelete as (Shard & { uuid: string })[]
);

expect(findByShardHashes.calledOnce).toBeTruthy();
expect(findByShardHashes.firstCall.args).toStrictEqual([shardsToDelete.map(s => s.hash)]);
expect(findContactsByIds.notCalled).toBeTruthy();
expect(enqueueMessage.callCount).toEqual(mirrors.length);
expect(enqueueMessage.firstCall.args[0]).toEqual({
type: DELETING_FILE_MESSAGE,
payload: {
key: firstShard.uuid,
hash: firstShard.uuid,
url: `http://${firstMirror.contact.address}:${firstMirror.contact.port}/v2/shards/${firstShard.uuid}`
}
})
expect(enqueueMessage.secondCall.args[0]).toEqual({
type: DELETING_FILE_MESSAGE,
payload: {
key: secondShard.uuid,
hash: secondShard.uuid,
url: `http://${secondMirror.contact.address}:${secondMirror.contact.port}/v2/shards/${secondShard.uuid}`
}
});
expect(deleteMirrorsByIds.calledOnce).toBeTruthy();
expect(deleteMirrorsByIds.firstCall.args).toStrictEqual([mirrors.map(m => m.id)]);
});

it('When mirrors do not exist, then uses contracts as fallback to delete shards', async () => {
const shardsToDelete = [fixtures.getShard(), fixtures.getShard()];
const [firstShard, secondShard] = shardsToDelete;
const contacts = shardsToDelete.map(s => fixtures.getContact({ id: s.contracts[0].nodeID }))
const [firstContact, secondContact] = contacts;
const mirrors: MirrorWithContact[] = [];

const findByShardHashes = stub(mirrorsRepository, 'findByShardHashesWithContacts').resolves(mirrors);
const findContactsByIds = stub(contactsRepository, 'findByIds').resolves(contacts);
const enqueueMessage = stub(queue, 'enqueueMessage').resolves();
const deleteMirrorsByIds = stub(mirrorsRepository, 'deleteByIds').resolves();

await usecase.deleteShardsStorageByUuids(
shardsToDelete as (Shard & { uuid: string })[]
);

expect(findByShardHashes.calledOnce).toBeTruthy();
expect(findByShardHashes.firstCall.args).toStrictEqual([shardsToDelete.map(s => s.hash)]);
expect(findContactsByIds.calledOnce).toBeTruthy();
expect(findContactsByIds.firstCall.args).toStrictEqual([
shardsToDelete.flatMap(s => s.contracts.flatMap(c => c.nodeID))
]);
expect(enqueueMessage.callCount).toEqual(
shardsToDelete.reduce((a, s) => a + s.contracts.length, 0)
);
expect(enqueueMessage.firstCall.args[0]).toEqual({
type: DELETING_FILE_MESSAGE,
payload: {
key: firstShard.uuid,
hash: firstShard.uuid,
url: `http://${firstContact.address}:${firstContact.port}/v2/shards/${firstShard.uuid}`
}
})
expect(enqueueMessage.secondCall.args[0]).toEqual({
type: DELETING_FILE_MESSAGE,
payload: {
key: secondShard.uuid,
hash: secondShard.uuid,
url: `http://${secondContact.address}:${secondContact.port}/v2/shards/${secondShard.uuid}`
}
});
expect(deleteMirrorsByIds.notCalled).toBeTruthy();
});
});
});

0 comments on commit 344ed90

Please sign in to comment.