diff --git a/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/generators/monotonic/MonotonicAccessionRecoveryAgent.java b/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/generators/monotonic/MonotonicAccessionRecoveryAgent.java new file mode 100644 index 00000000..b01300fe --- /dev/null +++ b/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/generators/monotonic/MonotonicAccessionRecoveryAgent.java @@ -0,0 +1,69 @@ +package uk.ac.ebi.ampt2d.commons.accession.generators.monotonic; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock; +import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.ContiguousIdBlockService; +import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.MonotonicDatabaseService; + +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class MonotonicAccessionRecoveryAgent { + private final static Logger logger = LoggerFactory.getLogger(MonotonicAccessionRecoveryAgent.class); + + private final ContiguousIdBlockService blockService; + private final MonotonicDatabaseService monotonicDatabaseService; + + public MonotonicAccessionRecoveryAgent(ContiguousIdBlockService blockService, + MonotonicDatabaseService monotonicDatabaseService) { + this.blockService = blockService; + this.monotonicDatabaseService = monotonicDatabaseService; + } + + public void runRecovery(String categoryId, String applicationInstanceId, LocalDateTime lastUpdatedTime) { + logger.info("Starting recovering of blocks for category " + categoryId); + List blocksToRecover = blockService.allBlocksForCategoryIdReservedBeforeTheGivenTimeFrame(categoryId, lastUpdatedTime); + logger.info("List of block ids to recover : " + blocksToRecover.stream().map(b -> Long.toString(b.getId())) + .collect(Collectors.joining(","))); + for (ContiguousIdBlock block : blocksToRecover) { + logger.info("Recovering Block: " + block); + if (block.getLastCommitted() == block.getLastValue()) { + logger.info("Block is already completely used, not need to run recovery. Releasing the block."); + setAppInstanceIdAndReleaseBlock(applicationInstanceId, block); + continue; + } + + // run recover state for a block using BlockManager's recover state method + Set blockSet = recoverStateForBlock(block); + + if (blockSet.isEmpty()) { + // if block's last committed is correctly set, BlockManager's recover method will return an empty set + logger.info("Block's last committed is correct. No updates to last_committed. Releasing the block."); + setAppInstanceIdAndReleaseBlock(applicationInstanceId, block); + } else { + ContiguousIdBlock blockToUpdate = blockSet.iterator().next(); + logger.info("Recovery ran successfully for block. Last committed updated to " + block.getLastCommitted() + + ". Saving and releasing the block."); + setAppInstanceIdAndReleaseBlock(applicationInstanceId, blockToUpdate); + } + } + } + + private Set recoverStateForBlock(ContiguousIdBlock block) { + BlockManager blockManager = new BlockManager(); + blockManager.addBlock(block); + MonotonicRange monotonicRange = blockManager.getAvailableRanges().poll(); + long[] committedElements = monotonicDatabaseService.getAccessionsInRanges(Collections.singletonList(monotonicRange)); + return blockManager.recoverState(committedElements); + } + + private void setAppInstanceIdAndReleaseBlock(String applicationInstanceId, ContiguousIdBlock block) { + block.setApplicationInstanceId(applicationInstanceId); + block.releaseReserved(); + blockService.save(block); + } +} diff --git a/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/entities/ContiguousIdBlock.java b/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/entities/ContiguousIdBlock.java index c8007d95..5df32c2e 100644 --- a/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/entities/ContiguousIdBlock.java +++ b/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/entities/ContiguousIdBlock.java @@ -205,4 +205,18 @@ public int compareTo(ContiguousIdBlock contiguousIdBlock) { protected void onUpdate() { this.lastUpdatedTimestamp = LocalDateTime.now(); } + + @Override + public String toString() { + return "ContiguousIdBlock{" + + "id=" + id + + ", categoryId='" + categoryId + '\'' + + ", applicationInstanceId='" + applicationInstanceId + '\'' + + ", firstValue=" + firstValue + + ", lastValue=" + lastValue + + ", lastCommitted=" + lastCommitted + + ", reserved=" + reserved + + ", lastUpdatedTimestamp=" + lastUpdatedTimestamp + + '}'; + } } diff --git a/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/repositories/ContiguousIdBlockRepository.java b/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/repositories/ContiguousIdBlockRepository.java index b21b15bc..80c4e59d 100644 --- a/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/repositories/ContiguousIdBlockRepository.java +++ b/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/repositories/ContiguousIdBlockRepository.java @@ -23,6 +23,7 @@ import org.springframework.stereotype.Repository; import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock; +import java.time.LocalDateTime; import java.util.List; @Repository @@ -31,4 +32,7 @@ public interface ContiguousIdBlockRepository extends CrudRepository findUncompletedAndUnreservedBlocksOrderByLastValueAsc(@Param("categoryId") String categoryId); ContiguousIdBlock findFirstByCategoryIdOrderByLastValueDesc(String categoryId); + + List findByCategoryIdAndReservedIsTrueAndLastUpdatedTimestampLessThanEqualOrderByLastValueAsc( + String categoryId, LocalDateTime lastUpdatedTime); } diff --git a/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/service/ContiguousIdBlockService.java b/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/service/ContiguousIdBlockService.java index 2b36104a..5f86d4e0 100644 --- a/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/service/ContiguousIdBlockService.java +++ b/accession-commons-monotonic-generator-jpa/src/main/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/service/ContiguousIdBlockService.java @@ -25,6 +25,7 @@ import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; +import java.time.LocalDateTime; import java.util.List; import java.util.Map; @@ -76,6 +77,16 @@ public void save(Iterable blocks) { entityManager.flush(); } + @Transactional + public void save(ContiguousIdBlock block) { + // release block if full + if (block.isFull()) { + block.releaseReserved(); + } + repository.save(block); + entityManager.flush(); + } + @Transactional(isolation = Isolation.SERIALIZABLE) public ContiguousIdBlock reserveNewBlock(String categoryId, String instanceId) { ContiguousIdBlock lastBlock = repository.findFirstByCategoryIdOrderByLastValueDesc(categoryId); @@ -110,4 +121,9 @@ public List reserveUncompletedBlocksForCategoryIdAndApplicati return blockList; } + public List allBlocksForCategoryIdReservedBeforeTheGivenTimeFrame(String categoryId, + LocalDateTime lastUpdatedTimeStamp) { + return repository.findByCategoryIdAndReservedIsTrueAndLastUpdatedTimestampLessThanEqualOrderByLastValueAsc(categoryId, lastUpdatedTimeStamp); + } + } diff --git a/accession-commons-monotonic-generator-jpa/src/test/java/uk/ac/ebi/ampt2d/commons/accession/generators/monotonic/MonotonicAccessionRecoveryAgentTest.java b/accession-commons-monotonic-generator-jpa/src/test/java/uk/ac/ebi/ampt2d/commons/accession/generators/monotonic/MonotonicAccessionRecoveryAgentTest.java new file mode 100644 index 00000000..7b3fff98 --- /dev/null +++ b/accession-commons-monotonic-generator-jpa/src/test/java/uk/ac/ebi/ampt2d/commons/accession/generators/monotonic/MonotonicAccessionRecoveryAgentTest.java @@ -0,0 +1,102 @@ +package uk.ac.ebi.ampt2d.commons.accession.generators.monotonic; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import uk.ac.ebi.ampt2d.commons.accession.core.models.AccessionWrapper; +import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock; +import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.repositories.ContiguousIdBlockRepository; +import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.ContiguousIdBlockService; +import uk.ac.ebi.ampt2d.commons.accession.service.BasicSpringDataRepositoryMonotonicDatabaseService; +import uk.ac.ebi.ampt2d.test.configuration.MonotonicAccessionGeneratorTestConfiguration; +import uk.ac.ebi.ampt2d.test.configuration.TestMonotonicDatabaseServiceTestConfiguration; +import uk.ac.ebi.ampt2d.test.models.TestModel; +import uk.ac.ebi.ampt2d.test.persistence.TestMonotonicEntity; + +import java.time.LocalDateTime; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import java.util.stream.StreamSupport; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(SpringRunner.class) +@DataJpaTest +@ContextConfiguration(classes = {MonotonicAccessionGeneratorTestConfiguration.class, TestMonotonicDatabaseServiceTestConfiguration.class}) +public class MonotonicAccessionRecoveryAgentTest { + private static final String TEST_CATEGORY = "TEST_CATEGORY"; + private static final String TEST_APP_INSTANCE_ID = "TEST_APP_INSTANCE_ID"; + private static final String TEST_RECOVERY_AGENT_APP_INSTANCE_ID = "TEST_RECOVERY_AGENT_APP_INSTANCE_ID"; + + @Autowired + private BasicSpringDataRepositoryMonotonicDatabaseService monotonicDBService; + @Autowired + private ContiguousIdBlockRepository repository; + @Autowired + private ContiguousIdBlockService service; + + @Test + public void testRunRecovery() throws InterruptedException { + // block1 does not have any accessions used + ContiguousIdBlock block1 = new ContiguousIdBlock(TEST_CATEGORY, TEST_APP_INSTANCE_ID, 0, 100); + repository.save(block1); + + // block2 is full and has all accessions used + ContiguousIdBlock block2 = new ContiguousIdBlock(TEST_CATEGORY, TEST_APP_INSTANCE_ID, 100, 100); + block2.setLastCommitted(199); + repository.save(block2); + + // block3 has some of the accessions used but not captured in the block's table + ContiguousIdBlock block3 = new ContiguousIdBlock(TEST_CATEGORY, TEST_APP_INSTANCE_ID, 200, 100); + repository.save(block3); + // save some accessions in db that are not captured in block3 + List> accessionsSet = LongStream.range(200l, 225l) + .boxed() + .map(longAcc -> new AccessionWrapper<>(longAcc, "hash-1" + longAcc, TestModel.of("test-obj-1-" + longAcc))) + .collect(Collectors.toList()); + monotonicDBService.save(accessionsSet); + + // block4 should not be recovered as it is after the recover cut off time + Thread.sleep(2000); + ContiguousIdBlock block4 = new ContiguousIdBlock(TEST_CATEGORY, TEST_APP_INSTANCE_ID, 300, 100); + repository.save(block4); + + // run recovery through recovery agent + LocalDateTime recoverCutOffTime = block3.getLastUpdatedTimestamp(); + MonotonicAccessionRecoveryAgent recoveryAgent = new MonotonicAccessionRecoveryAgent(service, monotonicDBService); + recoveryAgent.runRecovery(TEST_CATEGORY, TEST_RECOVERY_AGENT_APP_INSTANCE_ID, recoverCutOffTime); + + + List blockList = StreamSupport.stream(repository.findAll().spliterator(), false) + .sorted(Comparator.comparing(ContiguousIdBlock::getFirstValue)) + .collect(Collectors.toList()); + assertEquals(4, blockList.size()); + + assertEquals(TEST_RECOVERY_AGENT_APP_INSTANCE_ID, blockList.get(0).getApplicationInstanceId()); + assertEquals(0, blockList.get(0).getFirstValue()); + assertEquals(-1, blockList.get(0).getLastCommitted()); + assertTrue(blockList.get(0).isNotReserved()); + + assertEquals(TEST_RECOVERY_AGENT_APP_INSTANCE_ID, blockList.get(1).getApplicationInstanceId()); + assertEquals(100, blockList.get(1).getFirstValue()); + assertEquals(199, blockList.get(1).getLastCommitted()); + assertTrue(blockList.get(1).isNotReserved()); + + assertEquals(TEST_RECOVERY_AGENT_APP_INSTANCE_ID, blockList.get(2).getApplicationInstanceId()); + assertEquals(200, blockList.get(2).getFirstValue()); + assertEquals(224, blockList.get(2).getLastCommitted()); + assertTrue(blockList.get(2).isNotReserved()); + + assertEquals(TEST_APP_INSTANCE_ID, blockList.get(3).getApplicationInstanceId()); + assertEquals(300, blockList.get(3).getFirstValue()); + assertEquals(299, blockList.get(3).getLastCommitted()); + assertTrue(blockList.get(3).isReserved()); + } + +} diff --git a/accession-commons-monotonic-generator-jpa/src/test/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/service/ContiguousIdBlockServiceTest.java b/accession-commons-monotonic-generator-jpa/src/test/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/service/ContiguousIdBlockServiceTest.java index 8a873a86..af65c4a0 100644 --- a/accession-commons-monotonic-generator-jpa/src/test/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/service/ContiguousIdBlockServiceTest.java +++ b/accession-commons-monotonic-generator-jpa/src/test/java/uk/ac/ebi/ampt2d/commons/accession/persistence/jpa/monotonic/service/ContiguousIdBlockServiceTest.java @@ -34,6 +34,8 @@ import java.time.LocalDateTime; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; @@ -275,8 +277,10 @@ public void testLastUpdateTimeStampAutoUpdate() { entityManager.flush(); // assert block values - ContiguousIdBlock blockInDB = repository.findById(1L).get(); - assertEquals(1L, blockInDB.getId()); + List blockInDBList = StreamSupport.stream(repository.findAll().spliterator(), false) + .collect(Collectors.toList()); + assertEquals(1, blockInDBList.size()); + ContiguousIdBlock blockInDB = blockInDBList.get(0); assertEquals(CATEGORY_ID, blockInDB.getCategoryId()); assertEquals(INSTANCE_ID, blockInDB.getApplicationInstanceId()); assertEquals(100, blockInDB.getFirstValue()); @@ -289,7 +293,7 @@ public void testLastUpdateTimeStampAutoUpdate() { block.setLastCommitted(100); repository.save(block); entityManager.flush(); - blockInDB = repository.findById(1L).get(); + blockInDB = repository.findAll().iterator().next(); assertEquals(100, blockInDB.getLastCommitted()); LocalDateTime blockLastCommittedUpdateTime = blockInDB.getLastUpdatedTimestamp(); @@ -298,7 +302,7 @@ public void testLastUpdateTimeStampAutoUpdate() { block.setApplicationInstanceId(INSTANCE_ID_2); repository.save(block); entityManager.flush(); - blockInDB = repository.findById(1L).get(); + blockInDB = repository.findAll().iterator().next(); assertEquals(INSTANCE_ID_2, blockInDB.getApplicationInstanceId()); LocalDateTime blockApplicationInstanceUpdateTime = blockInDB.getLastUpdatedTimestamp(); @@ -307,7 +311,7 @@ public void testLastUpdateTimeStampAutoUpdate() { block.releaseReserved(); repository.save(block); entityManager.flush(); - blockInDB = repository.findById(1L).get(); + blockInDB = repository.findAll().iterator().next(); assertTrue(blockInDB.isNotReserved()); LocalDateTime blockReleaseAsReservedUpdateTime = blockInDB.getLastUpdatedTimestamp(); @@ -316,7 +320,7 @@ public void testLastUpdateTimeStampAutoUpdate() { block.markAsReserved(); repository.save(block); entityManager.flush(); - blockInDB = repository.findById(1L).get(); + blockInDB = repository.findAll().iterator().next(); assertTrue(blockInDB.isReserved()); LocalDateTime blockMarkAsReservedUpdateTime = blockInDB.getLastUpdatedTimestamp(); @@ -327,4 +331,34 @@ public void testLastUpdateTimeStampAutoUpdate() { assertTrue(blockReleaseAsReservedUpdateTime.isBefore(blockMarkAsReservedUpdateTime)); } + @Test + public void testGetBlocksWithLastUpdatedTimeStampLessThan() throws InterruptedException { + // reserved + ContiguousIdBlock block1 = new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100); + // reserved + ContiguousIdBlock block2 = new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 100, 100); + // not reserved + ContiguousIdBlock block3 = getUnreservedContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 200, 100); + // reserved but different category + ContiguousIdBlock block4 = new ContiguousIdBlock(CATEGORY_ID_2, INSTANCE_ID, 300, 100); + // reserved but after timestamp + Thread.sleep(2000L); + ContiguousIdBlock block5 = new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 400, 100); + repository.save(block1); + repository.save(block2); + repository.save(block3); + repository.save(block4); + repository.save(block5); + entityManager.flush(); + + LocalDateTime cutOffTimestamp = block4.getLastUpdatedTimestamp(); + List blocksList = service.allBlocksForCategoryIdReservedBeforeTheGivenTimeFrame(CATEGORY_ID, cutOffTimestamp); + + assertEquals(2, blocksList.size()); + assertTrue(blocksList.get(0).isReserved()); + assertEquals(0, blocksList.get(0).getFirstValue()); + assertTrue(blocksList.get(1).isReserved()); + assertEquals(100, blocksList.get(1).getFirstValue()); + } + }