From c6315523e16d2d060211fcb10399fa413beb362a Mon Sep 17 00:00:00 2001 From: nkumar2 Date: Fri, 2 Feb 2024 11:24:42 +0000 Subject: [PATCH] fix transactional issue --- .../datasource/ENAAssemblyDataSource.java | 55 +++++--- .../datasource/NCBIAssemblyDataSource.java | 64 ++++++++++ .../dus/ENAAssemblyReportReader.java | 6 + .../dus/NCBIAssemblyReportReader.java | 6 + .../contigalias/service/AssemblyService.java | 117 +----------------- 5 files changed, 117 insertions(+), 131 deletions(-) diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/datasource/ENAAssemblyDataSource.java b/src/main/java/uk/ac/ebi/eva/contigalias/datasource/ENAAssemblyDataSource.java index eafa94a6..b4c01557 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/datasource/ENAAssemblyDataSource.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/datasource/ENAAssemblyDataSource.java @@ -33,7 +33,9 @@ import uk.ac.ebi.eva.contigalias.entities.SequenceEntity; import uk.ac.ebi.eva.contigalias.exception.DownloadFailedException; +import java.io.BufferedReader; import java.io.FileInputStream; +import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; @@ -41,11 +43,11 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; @Repository("ENADataSource") public class ENAAssemblyDataSource implements AssemblyDataSource { @@ -136,10 +138,10 @@ public Optional downloadAssemblyReport(ENABrowser enaBrowser, String acces } } - public List getChromosomeEntityList(AssemblyEntity assemblyEntity, List chrDataList) { + public List getChromosomeEntityList(List chrDataList) { List chromosomeEntityList = new ArrayList<>(); for (String chrData : chrDataList) { - ChromosomeEntity chromosomeEntity = getChromosomeEntity(assemblyEntity, chrData); + ChromosomeEntity chromosomeEntity = getChromosomeEntity(chrData); if (chromosomeEntity != null) { chromosomeEntityList.add(chromosomeEntity); } @@ -147,12 +149,8 @@ public List getChromosomeEntityList(AssemblyEntity assemblyEnt return chromosomeEntityList; } - public ChromosomeEntity getChromosomeEntity(AssemblyEntity assemblyEntity, String chrLine) { - ChromosomeEntity chromosomeEntity = ENAAssemblyReportReader.getChromosomeEntity(chrLine); - if (chromosomeEntity != null) { - chromosomeEntity.setAssembly(assemblyEntity); - } - return chromosomeEntity; + public ChromosomeEntity getChromosomeEntity(String chrLine) { + return ENAAssemblyReportReader.getChromosomeEntity(chrLine); } /** @@ -184,16 +182,41 @@ public boolean hasAllEnaSequenceNames(AssemblyEntity assembly) { public void addENASequenceNames( List sourceSequences, List targetSequences) { - Map insdcToSequenceEntity = new HashMap<>(); - for (SequenceEntity targetSeq : targetSequences) { - insdcToSequenceEntity.put(targetSeq.getInsdcAccession(), targetSeq); + if (targetSequences == null || sourceSequences == null || targetSequences.isEmpty() || sourceSequences.isEmpty()) { + return; } + Map insdcToSequenceEntityMap = targetSequences.stream() + .collect(Collectors.toMap(s->s.getInsdcAccession(), s->s)); + for (SequenceEntity sourceSeq : sourceSequences) { String sourceInsdcAccession = sourceSeq.getInsdcAccession(); - if (insdcToSequenceEntity.containsKey(sourceInsdcAccession)) { - insdcToSequenceEntity.get(sourceInsdcAccession).setEnaSequenceName(sourceSeq.getEnaSequenceName()); - } else { - insdcToSequenceEntity.put(sourceInsdcAccession, sourceSeq); + if (insdcToSequenceEntityMap.containsKey(sourceInsdcAccession)) { + insdcToSequenceEntityMap.get(sourceInsdcAccession).setEnaSequenceName(sourceSeq.getEnaSequenceName()); + } + } + } + + public void addENASequenceNameToChromosomes(List ncbiChromosomeList, + Path downloadedENAFilePath, final int BATCH_SIZE) throws IOException { + try (BufferedReader bufferedReader = new BufferedReader(new FileReader(downloadedENAFilePath.toFile()))) { + List chrLines = new ArrayList<>(); + List enaChromosomeList; + String line; + while ((line = bufferedReader.readLine()) != null) { + if (line.startsWith("accession")) { + continue; + } + chrLines.add(line); + if (chrLines.size() == BATCH_SIZE) { + enaChromosomeList = getChromosomeEntityList(chrLines); + addENASequenceNames(enaChromosomeList, ncbiChromosomeList); + + chrLines = new ArrayList<>(); + } + } + if (!chrLines.isEmpty()) { + enaChromosomeList = getChromosomeEntityList(chrLines); + addENASequenceNames(enaChromosomeList, ncbiChromosomeList); } } } diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/datasource/NCBIAssemblyDataSource.java b/src/main/java/uk/ac/ebi/eva/contigalias/datasource/NCBIAssemblyDataSource.java index 9a42031f..f798dd66 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/datasource/NCBIAssemblyDataSource.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/datasource/NCBIAssemblyDataSource.java @@ -30,8 +30,14 @@ import uk.ac.ebi.eva.contigalias.dus.NCBIBrowserFactory; import uk.ac.ebi.eva.contigalias.entities.AssemblyEntity; import uk.ac.ebi.eva.contigalias.entities.ChromosomeEntity; +import uk.ac.ebi.eva.contigalias.exception.AssemblyNotFoundException; +import uk.ac.ebi.eva.contigalias.repo.AssemblyRepository; +import uk.ac.ebi.eva.contigalias.repo.ChromosomeRepository; +import javax.transaction.Transactional; +import java.io.BufferedReader; import java.io.FileInputStream; +import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; @@ -47,6 +53,8 @@ public class NCBIAssemblyDataSource implements AssemblyDataSource { private final Logger logger = LoggerFactory.getLogger(NCBIAssemblyDataSource.class); + private final int BATCH_SIZE = 100000; + private final NCBIBrowserFactory factory; private final NCBIAssemblyReportReaderFactory readerFactory; @@ -157,4 +165,60 @@ public Optional downloadAssemblyReport(String accession, NCBIBrowser ncbiB } } + @Transactional + public void parseFileAndInsertAssembly(String accession, ENAAssemblyDataSource enaDataSource, + AssemblyRepository assemblyRepository, ChromosomeRepository chromosomeRepository) throws IOException { + Optional downloadNCBIFilePathOpt = downloadAssemblyReport(accession); + Path downloadedNCBIFilePath = downloadNCBIFilePathOpt.orElseThrow(() -> new AssemblyNotFoundException(accession)); + Optional downloadENAFilePathOpt = enaDataSource.downloadAssemblyReport(accession); + Path downloadedENAFilePath = downloadENAFilePathOpt.orElse(null); + + long numberOfChromosomesInFile = Files.lines(downloadedNCBIFilePath).filter(line -> !line.startsWith("#")).count(); + logger.info("Number of chromosomes in assembly (" + accession + "): " + numberOfChromosomesInFile); + + AssemblyEntity assemblyEntity = getAssemblyEntity(downloadedNCBIFilePath); + assemblyRepository.save(assemblyEntity); + + try (BufferedReader bufferedReader = new BufferedReader(new FileReader(downloadedNCBIFilePath.toFile()))) { + long chromosomesSavedTillNow = 0l; + List chrLines = new ArrayList<>(); + String line; + while ((line = bufferedReader.readLine()) != null) { + if (line.startsWith("#")) { + continue; + } + chrLines.add(line); + if (chrLines.size() == BATCH_SIZE) { + // add ena sequence name and save + addENASequenceNameAndSave(assemblyEntity, chrLines, enaDataSource, downloadedENAFilePath, chromosomeRepository); + chromosomesSavedTillNow += chrLines.size(); + logger.info("Number of chromosomes saved till now : " + chromosomesSavedTillNow); + + chrLines = new ArrayList<>(); + } + } + if (!chrLines.isEmpty()) { + // add ena sequence name and save + addENASequenceNameAndSave(assemblyEntity, chrLines, enaDataSource, downloadedENAFilePath, chromosomeRepository); + chromosomesSavedTillNow += chrLines.size(); + logger.info("Number of chromosomes saved till now : " + chromosomesSavedTillNow); + } + } + + // delete the files after assembly insertion + Files.deleteIfExists(downloadedNCBIFilePath); + if (downloadedENAFilePath != null) { + Files.deleteIfExists(downloadedENAFilePath); + } + } + + public void addENASequenceNameAndSave(AssemblyEntity assemblyEntity, List chrLines, + ENAAssemblyDataSource enaDataSource, Path downloadedENAFilePath, + ChromosomeRepository chromosomeRepository) throws IOException { + List chromosomeEntityList = getChromosomeEntityList(assemblyEntity, chrLines); + if (downloadedENAFilePath != null) { + enaDataSource.addENASequenceNameToChromosomes(chromosomeEntityList, downloadedENAFilePath, BATCH_SIZE); + } + chromosomeRepository.saveAll(chromosomeEntityList); + } } diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/dus/ENAAssemblyReportReader.java b/src/main/java/uk/ac/ebi/eva/contigalias/dus/ENAAssemblyReportReader.java index 29fb45e1..3e857885 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/dus/ENAAssemblyReportReader.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/dus/ENAAssemblyReportReader.java @@ -64,6 +64,9 @@ protected void parseAssemblyData(String line) { protected void parseChromosomeLine(String[] columns) { ChromosomeEntity chromosomeEntity = getChromosome(columns); + if (chromosomeEntity == null) { + return; + } if (assemblyEntity == null) { assemblyEntity = new AssemblyEntity(); @@ -81,6 +84,9 @@ protected void parseChromosomeLine(String[] columns) { protected void parseScaffoldLine(String[] columns) { ChromosomeEntity scaffoldEntity = getScaffold(columns); + if (scaffoldEntity == null) { + return; + } if (assemblyEntity == null) { assemblyEntity = new AssemblyEntity(); diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/dus/NCBIAssemblyReportReader.java b/src/main/java/uk/ac/ebi/eva/contigalias/dus/NCBIAssemblyReportReader.java index f2c0d502..28417ab7 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/dus/NCBIAssemblyReportReader.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/dus/NCBIAssemblyReportReader.java @@ -98,6 +98,9 @@ protected void parseAssemblyData(String line) { protected void parseChromosomeLine(String[] columns) { ChromosomeEntity chromosomeEntity = getChromosome(columns); + if (chromosomeEntity == null) { + return; + } if (assemblyEntity == null) { assemblyEntity = new AssemblyEntity(); @@ -114,6 +117,9 @@ protected void parseChromosomeLine(String[] columns) { protected void parseScaffoldLine(String[] columns) { ChromosomeEntity scaffoldEntity = getScaffold(columns); + if (scaffoldEntity == null) { + return; + } if (assemblyEntity == null) { assemblyEntity = new AssemblyEntity(); diff --git a/src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java b/src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java index cedad343..2365181b 100644 --- a/src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java +++ b/src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java @@ -33,11 +33,7 @@ import uk.ac.ebi.eva.contigalias.scheduler.ChecksumSetter; import javax.transaction.Transactional; -import java.io.BufferedReader; -import java.io.FileReader; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -60,8 +56,6 @@ public class AssemblyService { private final ChecksumSetter checksumSetter; - private final int BATCH_SIZE = 100000; - private final Logger logger = LoggerFactory.getLogger(AssemblyService.class); @Autowired @@ -111,119 +105,12 @@ public void fetchAndInsertAssembly(String accession) throws IOException { throw duplicateAssemblyInsertionException(accession, entity.get()); } - Optional downloadNCBIFilePathOpt = ncbiDataSource.downloadAssemblyReport(accession); - Path downloadedNCBIFilePath = downloadNCBIFilePathOpt.orElseThrow(() -> new AssemblyNotFoundException(accession)); - Optional downloadENAFilePathOpt = enaDataSource.downloadAssemblyReport(accession); - Path downloadedENAFilePath = downloadENAFilePathOpt.orElse(null); - - long numberOfChromosomesInFile = Files.lines(downloadedNCBIFilePath).filter(line -> !line.startsWith("#")).count(); - logger.info("Number of chromosomes in assembly (" + accession + "): " + numberOfChromosomesInFile); - - // parse file and save data - parseFileAndInsertAssembly(downloadedNCBIFilePath, downloadedENAFilePath); + // download file and save assembly and chromosome data + ncbiDataSource.parseFileAndInsertAssembly(accession, enaDataSource, assemblyRepository, chromosomeRepository); logger.info("Successfully inserted assembly for accession " + accession); // submit job for retrieving and updating MD5 Checksum for assembly (asynchronously) checksumSetter.updateMd5CheckSumForAssemblyAsync(accession); - - Files.deleteIfExists(downloadedNCBIFilePath); - if (downloadedENAFilePath != null) { - Files.deleteIfExists(downloadedENAFilePath); - } - } - - @Transactional - public void parseFileAndInsertAssembly(Path downloadedNCBIFilePath, Path downloadedENAFilePath) throws IOException { - AssemblyEntity assemblyEntity = ncbiDataSource.getAssemblyEntity(downloadedNCBIFilePath); - assemblyRepository.save(assemblyEntity); - - try (BufferedReader bufferedReader = new BufferedReader(new FileReader(downloadedNCBIFilePath.toFile()))) { - List chrLines = new ArrayList<>(); - String line; - long chromosomesSavedTillNow = 0; - while ((line = bufferedReader.readLine()) != null) { - if (line.startsWith("#")) { - continue; - } - chrLines.add(line); - if (chrLines.size() == BATCH_SIZE) { - List chromosomeEntityList = ncbiDataSource.getChromosomeEntityList(assemblyEntity, chrLines); - if (downloadedENAFilePath != null) { - addENASequenceNameToChromosomes(assemblyEntity, chromosomeEntityList, downloadedENAFilePath); - } - chromosomeRepository.saveAll(chromosomeEntityList); - chromosomesSavedTillNow += chromosomeEntityList.size(); - logger.info("Number of total chromosomes saved till now : " + chromosomesSavedTillNow); - - chrLines = new ArrayList<>(); - } - } - - if (!chrLines.isEmpty()) { - List chromosomeEntityList = ncbiDataSource.getChromosomeEntityList(assemblyEntity, chrLines); - if (downloadedENAFilePath != null) { - addENASequenceNameToChromosomes(assemblyEntity, chromosomeEntityList, downloadedENAFilePath); - } - chromosomeRepository.saveAll(chromosomeEntityList); - chromosomesSavedTillNow += chromosomeEntityList.size(); - logger.info("Number of total chromosomes saved till now : " + chromosomesSavedTillNow); - } - } - } - - public void addENASequenceNameToChromosomes(AssemblyEntity assemblyEntity, List ncbiChromosomeList, - Path downloadedENAFilePath) throws IOException { - try (BufferedReader bufferedReader = new BufferedReader(new FileReader(downloadedENAFilePath.toFile()))) { - List chrLines = new ArrayList<>(); - String line; - while ((line = bufferedReader.readLine()) != null) { - if (line.startsWith("accession")) { - continue; - } - chrLines.add(line); - if (chrLines.size() == BATCH_SIZE) { - List enaChromosomeList = enaDataSource.getChromosomeEntityList(assemblyEntity, chrLines); - enaDataSource.addENASequenceNames( - !enaChromosomeList.isEmpty() ? enaChromosomeList : Collections.emptyList(), - !ncbiChromosomeList.isEmpty() ? ncbiChromosomeList : Collections.emptyList() - ); - - chrLines = new ArrayList<>(); - } - } - if (!chrLines.isEmpty()) { - List enaChromosomeList = enaDataSource.getChromosomeEntityList(assemblyEntity, chrLines); - enaDataSource.addENASequenceNames( - !enaChromosomeList.isEmpty() ? enaChromosomeList : Collections.emptyList(), - !ncbiChromosomeList.isEmpty() ? ncbiChromosomeList : Collections.emptyList() - ); - } - } - } - - public void fetchAndInsertAssemblyOld(String accession) throws IOException { - Optional entity = assemblyRepository.findAssemblyEntityByAccession(accession); - if (entity.isPresent()) { - throw duplicateAssemblyInsertionException(accession, entity.get()); - } - Optional fetchAssembly = ncbiDataSource.getAssemblyByAccession(accession); - if (!fetchAssembly.isPresent()) { - throw new AssemblyNotFoundException(accession); - } - if (fetchAssembly.isPresent()) { - AssemblyEntity assemblyEntity = fetchAssembly.get(); - enaDataSource.addENASequenceNamesToAssembly(assemblyEntity); - if (assemblyEntity.getChromosomes() != null && assemblyEntity.getChromosomes().size() > 0) { - insertAssembly(assemblyEntity); - logger.info("Successfully inserted assembly for accession " + accession); - // submit job for retrieving and updating MD5 Checksum for assembly (asynchronously) - checksumSetter.updateMd5CheckSumForAssemblyAsync(accession); - } else { - logger.error("Skipping inserting assembly : No chromosome in assembly " + accession); - } - } else { - logger.error("Could not get assembly from NCBI"); - } } public void retrieveAndInsertMd5ChecksumForAssembly(String assembly) {