Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-2675 Change block allocation strategy #72

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public void addBlock(ContiguousIdBlock block) {
availableRanges.add(new MonotonicRange(block.getLastCommitted() + 1, block.getLastValue()));
}

/**
* Add a newly created block (all ids are available)
*/
public void addNewBlock(ContiguousIdBlock block) {
assignedBlocks.add(block);
availableRanges.add(new MonotonicRange(block.getFirstValue(), block.getLastValue()));
}

public MonotonicRangePriorityQueue getAvailableRanges() {
return availableRanges;
}
Expand Down Expand Up @@ -111,28 +119,59 @@ private void assertAccessionsArePending(long[] accessions) throws AccessionIsNot
}

private Set<ContiguousIdBlock> doCommit(long[] accessions) {
Set<ContiguousIdBlock> blocksToUpdate = new HashSet<>();
//Add all previously existing blocks to be updated in the contiguous_id_blocks table in the DB
Set<ContiguousIdBlock> blocksToUpdate = assignedBlocks.stream()
.filter(block -> !isNewBlock(block))
.collect(Collectors.toSet());

if (accessions == null || accessions.length == 0) {
return blocksToUpdate;
}

addToCommitted(accessions);

if (assignedBlocks.size() == 0) {
return blocksToUpdate;
}

ContiguousIdBlock block = assignedBlocks.peek();
while (block != null && committedAccessions.peek() != null &&
committedAccessions.peek() == block.getLastCommitted() + 1) {
long lastCommitted = calculateLastCommitted(block);
while (block != null && committedAccessions.peek() != null && committedAccessions.peek() == lastCommitted + 1) {
//Next value continues sequence, change last committed value
block.setLastCommitted(committedAccessions.poll());
lastCommitted = block.getLastCommitted();
blocksToUpdate.add(block);
if (!block.isNotFull()) {
if (isBlockFull(block)) {
assignedBlocks.poll();
block = assignedBlocks.peek();
if (block != null) {
lastCommitted = calculateLastCommitted(block);
}
}
}

return blocksToUpdate;
}

/**
* Existing blocks have the actual last_committed accession in the block manager but not in the db table
* New blocks are all marked as used in both the block manager and db table
*/
private long calculateLastCommitted(ContiguousIdBlock block) {
return (isNewBlock(block)) ? (block.getFirstValue() - 1) : block.getLastCommitted();
}

/**
* New block have the same last_committed and last_value in the block manager
*/
private boolean isNewBlock(ContiguousIdBlock block) {
return block.getLastCommitted() == block.getLastValue();
}

private boolean isBlockFull(ContiguousIdBlock block) {
return !block.isNotFull();
}

private void addToCommitted(long[] accessions) {
for (long accession : accessions) {
committedAccessions.add(accession);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.ContiguousIdBlockService;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.MonotonicDatabaseService;
import uk.ac.ebi.ampt2d.commons.accession.utils.ExponentialBackOff;
import uk.ac.ebi.ampt2d.commons.accession.utils.exceptions.ExponentialBackOffMaxRetriesRuntimeException;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -85,6 +84,7 @@ private static BlockManager initializeBlockManager(ContiguousIdBlockService bloc
//Insert as available ranges
for (ContiguousIdBlock block : uncompletedBlocks) {
blockManager.addBlock(block);
blockService.markBlockAsUsed(block);
}
return blockManager;
}
Expand Down Expand Up @@ -135,7 +135,7 @@ private synchronized void reserveNewBlocksUntilSizeIs(int totalAccessionsToGener
}

private synchronized void reserveNewBlock(String categoryId, String instanceId) {
blockManager.addBlock(blockService.reserveNewBlock(categoryId, instanceId));
blockManager.addNewBlock(blockService.reserveNewBlock(categoryId, instanceId));
}

public synchronized void commit(long... accessions) throws AccessionIsNotPendingException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public ContiguousIdBlock(String categoryId, String applicationInstanceId, long f
this.applicationInstanceId = applicationInstanceId;
this.firstValue = firstValue;
this.lastValue = firstValue + size - 1;
this.lastCommitted = firstValue - 1;
this.lastCommitted = firstValue + size - 1;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ public ContiguousIdBlock reserveNewBlock(String categoryId, String instanceId) {
return reservedBlock;
}

/**
* Mark the block as completely used in the contiguous_id_blocks tables but in the block manager keep last_committed
* value before the beginning of the block
*/
@Transactional(isolation = Isolation.SERIALIZABLE)
public void markBlockAsUsed(ContiguousIdBlock block) {
long lastCommitted = block.getLastCommitted();
block.setLastCommitted(block.getLastValue());
repository.save(block);
block.setLastCommitted(lastCommitted);
}

public BlockParameters getBlockParameters(String categoryId) {
return categoryBlockInitializations.get(categoryId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void testAlternateRangesWithDifferentGenerators() throws AccessionCouldNo
assertEquals(0, evaAccessions.get(0).getAccession().longValue());
assertEquals(8, evaAccessions.get(8).getAccession().longValue());
//BlockSize of 10 was reserved but only 9 elements have been accessioned
//All accessions were marked as used
assertEquals(1, contiguousIdBlockService
.getUncompletedBlocksByCategoryIdAndApplicationInstanceIdOrderByEndAsc(categoryId, INSTANCE_ID)
.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionIsNotPendingException;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock;

import java.util.Arrays;
import java.util.Set;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -41,7 +44,7 @@ public void noAvailableAccessionsIfNoBlocks() {
@Test
public void availableAccessionWhenBlockHashBeenAdded() {
BlockManager manager = new BlockManager();
manager.addBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100));
manager.addNewBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100));
assertTrue(manager.hasAvailableAccessions(10));
assertFalse(manager.hasAvailableAccessions(101));
}
Expand All @@ -67,7 +70,7 @@ public void pollNextWhenNoValues() throws AccessionCouldNotBeGeneratedException
@Test
public void generateAccessions() throws AccessionCouldNotBeGeneratedException {
BlockManager manager = new BlockManager();
manager.addBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100));
manager.addNewBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100));
long[] accessions = manager.pollNext(10);
assertEquals(10, accessions.length);
assertArrayEquals(new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, accessions);
Expand All @@ -76,7 +79,7 @@ public void generateAccessions() throws AccessionCouldNotBeGeneratedException {
@Test
public void generateAccessionsAndRelease() throws AccessionCouldNotBeGeneratedException {
BlockManager manager = new BlockManager();
manager.addBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100));
manager.addNewBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100));
long[] accessions = manager.pollNext(10);
assertEquals(10, accessions.length);
assertArrayEquals(new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, accessions);
Expand All @@ -89,7 +92,7 @@ public void generateAccessionsAndRelease() throws AccessionCouldNotBeGeneratedEx
@Test
public void generateAccessionsAndReleaseSome() throws AccessionCouldNotBeGeneratedException {
BlockManager manager = new BlockManager();
manager.addBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100));
manager.addNewBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100));
long[] accessions = manager.pollNext(10);
assertEquals(10, accessions.length);
assertArrayEquals(new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, accessions);
Expand All @@ -105,7 +108,7 @@ public void generateAccessionsAndReleaseSome() throws AccessionCouldNotBeGenerat
@Test
public void generateAccessionsAndConfirmSome() throws AccessionCouldNotBeGeneratedException {
BlockManager manager = new BlockManager();
manager.addBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100));
manager.addNewBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100));
long[] accessions = manager.pollNext(10);
assertEquals(10, accessions.length);
assertArrayEquals(new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, accessions);
Expand All @@ -118,7 +121,7 @@ public void generateAccessionsAndConfirmSome() throws AccessionCouldNotBeGenerat
@Test
public void recoverState() throws AccessionCouldNotBeGeneratedException {
BlockManager manager = new BlockManager();
manager.addBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100));
manager.addNewBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100));
manager.recoverState(new long[]{0, 1, 2, 6, 7, 8, 9});
long[] accessions = manager.pollNext(10);
assertEquals(3, accessions.length);
Expand All @@ -131,8 +134,8 @@ public void recoverState() throws AccessionCouldNotBeGeneratedException {
@Test
public void multipleContinuousBlocks() throws AccessionCouldNotBeGeneratedException {
BlockManager manager = new BlockManager();
manager.addBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 10));
manager.addBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 10, 10));
manager.addNewBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 10));
manager.addNewBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 10, 10));
manager.recoverState(new long[]{0, 1, 2, 6, 7, 8, 9});
long[] accessions = manager.pollNext(10);
assertEquals(3, accessions.length);
Expand All @@ -145,12 +148,39 @@ public void multipleContinuousBlocks() throws AccessionCouldNotBeGeneratedExcept
@Test
public void commitAllValuesOnBlockManager() throws AccessionCouldNotBeGeneratedException {
BlockManager manager = new BlockManager();
manager.addBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 10));
manager.addBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 10, 10));
manager.addNewBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 10));
manager.addNewBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 10, 10));
long[] accessions1 = manager.pollNext(10);
long[] accessions2 = manager.pollNext(10);
manager.commit(accessions1);
manager.commit(accessions2);
}

@Test
public void commitMoreAccessionsThanMaxPerBlock() throws AccessionCouldNotBeGeneratedException {
BlockManager manager = new BlockManager();
manager.addNewBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 10));
manager.addNewBlock(new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 10, 10));
long[] accessions1 = manager.pollNext(10);
long[] accessions2 = manager.pollNext(2);
long[] all = Arrays.copyOf(accessions1, accessions1.length + accessions2.length);
System.arraycopy(accessions2, 0, all, accessions1.length, accessions2.length);

Set<ContiguousIdBlock> blocksToUpdate = manager.commit(all);

//Entire first block should be marked as used
//Second should only mark 2 accessions as used (accession 10 and 11)
assertEquals(2, blocksToUpdate.size());
for (ContiguousIdBlock currentBlock : blocksToUpdate) {
switch ((int) currentBlock.getFirstValue()) {
case 0:
assertEquals(9, currentBlock.getLastCommitted());
break;
case 10:
assertEquals(11, currentBlock.getLastCommitted());
break;
default:
}
}
}
}
Loading