Skip to content

Commit

Permalink
fix transactional issue
Browse files Browse the repository at this point in the history
  • Loading branch information
nitin-ebi committed Feb 2, 2024
1 parent 2534ee2 commit c631552
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,21 @@
import uk.ac.ebi.eva.contigalias.entities.SequenceEntity;
import uk.ac.ebi.eva.contigalias.exception.DownloadFailedException;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

@Repository("ENADataSource")
public class ENAAssemblyDataSource implements AssemblyDataSource {
Expand Down Expand Up @@ -136,23 +138,19 @@ public Optional<Path> downloadAssemblyReport(ENABrowser enaBrowser, String acces
}
}

public List<ChromosomeEntity> getChromosomeEntityList(AssemblyEntity assemblyEntity, List<String> chrDataList) {
public List<ChromosomeEntity> getChromosomeEntityList(List<String> chrDataList) {
List<ChromosomeEntity> chromosomeEntityList = new ArrayList<>();
for (String chrData : chrDataList) {
ChromosomeEntity chromosomeEntity = getChromosomeEntity(assemblyEntity, chrData);
ChromosomeEntity chromosomeEntity = getChromosomeEntity(chrData);
if (chromosomeEntity != null) {
chromosomeEntityList.add(chromosomeEntity);
}
}
return chromosomeEntityList;
}

public ChromosomeEntity getChromosomeEntity(AssemblyEntity assemblyEntity, String chrLine) {
ChromosomeEntity chromosomeEntity = ENAAssemblyReportReader.getChromosomeEntity(chrLine);
if (chromosomeEntity != null) {
chromosomeEntity.setAssembly(assemblyEntity);
}
return chromosomeEntity;
public ChromosomeEntity getChromosomeEntity(String chrLine) {
return ENAAssemblyReportReader.getChromosomeEntity(chrLine);
}

/**
Expand Down Expand Up @@ -184,16 +182,41 @@ public boolean hasAllEnaSequenceNames(AssemblyEntity assembly) {

public void addENASequenceNames(
List<? extends SequenceEntity> sourceSequences, List<? extends SequenceEntity> targetSequences) {
Map<String, SequenceEntity> insdcToSequenceEntity = new HashMap<>();
for (SequenceEntity targetSeq : targetSequences) {
insdcToSequenceEntity.put(targetSeq.getInsdcAccession(), targetSeq);
if (targetSequences == null || sourceSequences == null || targetSequences.isEmpty() || sourceSequences.isEmpty()) {
return;
}
Map<String, SequenceEntity> insdcToSequenceEntityMap = targetSequences.stream()
.collect(Collectors.toMap(s->s.getInsdcAccession(), s->s));

for (SequenceEntity sourceSeq : sourceSequences) {
String sourceInsdcAccession = sourceSeq.getInsdcAccession();
if (insdcToSequenceEntity.containsKey(sourceInsdcAccession)) {
insdcToSequenceEntity.get(sourceInsdcAccession).setEnaSequenceName(sourceSeq.getEnaSequenceName());
} else {
insdcToSequenceEntity.put(sourceInsdcAccession, sourceSeq);
if (insdcToSequenceEntityMap.containsKey(sourceInsdcAccession)) {
insdcToSequenceEntityMap.get(sourceInsdcAccession).setEnaSequenceName(sourceSeq.getEnaSequenceName());
}
}
}

/**
 * Streams the downloaded ENA report in batches and applies the ENA sequence names
 * found in it to the given NCBI chromosomes.
 *
 * <p>Lines starting with {@code "accession"} are skipped. The remaining lines are
 * collected until {@code BATCH_SIZE} of them are pending, then parsed and matched
 * against {@code ncbiChromosomeList}; a final partial batch is handled after the loop.
 *
 * @param ncbiChromosomeList    chromosomes to update in place
 * @param downloadedENAFilePath location of the downloaded ENA report
 * @param BATCH_SIZE            number of report lines to parse per batch
 * @throws IOException if the report cannot be opened or read
 */
public void addENASequenceNameToChromosomes(List<ChromosomeEntity> ncbiChromosomeList,
                                            Path downloadedENAFilePath, final int BATCH_SIZE) throws IOException {
    try (BufferedReader reader = new BufferedReader(new FileReader(downloadedENAFilePath.toFile()))) {
        List<String> pendingLines = new ArrayList<>();
        for (String current = reader.readLine(); current != null; current = reader.readLine()) {
            if (current.startsWith("accession")) {
                continue;
            }
            pendingLines.add(current);
            if (pendingLines.size() != BATCH_SIZE) {
                continue;
            }
            // Batch is full: parse it, apply the names, and start a fresh batch.
            addENASequenceNames(getChromosomeEntityList(pendingLines), ncbiChromosomeList);
            pendingLines = new ArrayList<>();
        }
        // Handle whatever is left over after the last full batch.
        if (!pendingLines.isEmpty()) {
            addENASequenceNames(getChromosomeEntityList(pendingLines), ncbiChromosomeList);
        }
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@
import uk.ac.ebi.eva.contigalias.dus.NCBIBrowserFactory;
import uk.ac.ebi.eva.contigalias.entities.AssemblyEntity;
import uk.ac.ebi.eva.contigalias.entities.ChromosomeEntity;
import uk.ac.ebi.eva.contigalias.exception.AssemblyNotFoundException;
import uk.ac.ebi.eva.contigalias.repo.AssemblyRepository;
import uk.ac.ebi.eva.contigalias.repo.ChromosomeRepository;

import javax.transaction.Transactional;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
Expand All @@ -47,6 +53,8 @@ public class NCBIAssemblyDataSource implements AssemblyDataSource {

private final Logger logger = LoggerFactory.getLogger(NCBIAssemblyDataSource.class);

private final int BATCH_SIZE = 100000;

private final NCBIBrowserFactory factory;

private final NCBIAssemblyReportReaderFactory readerFactory;
Expand Down Expand Up @@ -157,4 +165,60 @@ public Optional<Path> downloadAssemblyReport(String accession, NCBIBrowser ncbiB
}
}

/**
 * Downloads the NCBI (and, if available, ENA) assembly report for an accession,
 * saves the assembly, and saves its chromosomes in batches of {@code BATCH_SIZE} lines.
 *
 * <p>Steps: download both reports, log the number of non-comment lines in the NCBI
 * report, save the assembly entity, then read the NCBI report line by line (skipping
 * lines starting with {@code "#"}) and hand each batch to
 * {@link #addENASequenceNameAndSave}. The downloaded files are deleted once all
 * batches have been processed.
 *
 * @param accession            assembly accession to fetch
 * @param enaDataSource        source used to download the ENA report and apply ENA names
 * @param assemblyRepository   repository the assembly entity is saved to
 * @param chromosomeRepository repository the chromosome entities are saved to
 * @throws IOException               if a report cannot be read or deleted
 * @throws AssemblyNotFoundException if no NCBI report could be downloaded for the accession
 */
@Transactional
public void parseFileAndInsertAssembly(String accession, ENAAssemblyDataSource enaDataSource,
                                       AssemblyRepository assemblyRepository, ChromosomeRepository chromosomeRepository) throws IOException {
    Optional<Path> downloadNCBIFilePathOpt = downloadAssemblyReport(accession);
    Path downloadedNCBIFilePath = downloadNCBIFilePathOpt.orElseThrow(() -> new AssemblyNotFoundException(accession));
    Optional<Path> downloadENAFilePathOpt = enaDataSource.downloadAssemblyReport(accession);
    // The ENA report is optional; null means "no ENA names to apply".
    Path downloadedENAFilePath = downloadENAFilePathOpt.orElse(null);

    // Files.lines keeps the file open until the stream is closed, so it must be
    // wrapped in try-with-resources to avoid leaking a file handle.
    long numberOfChromosomesInFile;
    try (java.util.stream.Stream<String> reportLines = Files.lines(downloadedNCBIFilePath)) {
        numberOfChromosomesInFile = reportLines.filter(line -> !line.startsWith("#")).count();
    }
    logger.info("Number of chromosomes in assembly (" + accession + "): " + numberOfChromosomesInFile);

    AssemblyEntity assemblyEntity = getAssemblyEntity(downloadedNCBIFilePath);
    assemblyRepository.save(assemblyEntity);

    try (BufferedReader bufferedReader = new BufferedReader(new FileReader(downloadedNCBIFilePath.toFile()))) {
        long chromosomesSavedTillNow = 0L;
        List<String> chrLines = new ArrayList<>();
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            if (line.startsWith("#")) {
                continue;
            }
            chrLines.add(line);
            if (chrLines.size() == BATCH_SIZE) {
                // add ena sequence name and save
                addENASequenceNameAndSave(assemblyEntity, chrLines, enaDataSource, downloadedENAFilePath, chromosomeRepository);
                chromosomesSavedTillNow += chrLines.size();
                logger.info("Number of chromosomes saved till now : " + chromosomesSavedTillNow);

                chrLines = new ArrayList<>();
            }
        }
        // Flush the final, partially filled batch.
        if (!chrLines.isEmpty()) {
            // add ena sequence name and save
            addENASequenceNameAndSave(assemblyEntity, chrLines, enaDataSource, downloadedENAFilePath, chromosomeRepository);
            chromosomesSavedTillNow += chrLines.size();
            logger.info("Number of chromosomes saved till now : " + chromosomesSavedTillNow);
        }
    }

    // delete the files after assembly insertion
    // NOTE(review): this cleanup is only reached on success; a failure above leaves the
    // downloaded files on disk — confirm whether that is intended.
    Files.deleteIfExists(downloadedNCBIFilePath);
    if (downloadedENAFilePath != null) {
        Files.deleteIfExists(downloadedENAFilePath);
    }
}

/**
 * Parses one batch of NCBI report lines, optionally applies ENA sequence names to the
 * resulting chromosomes, and saves them.
 *
 * @param assemblyEntity        assembly passed on to {@code getChromosomeEntityList}
 * @param chrLines              raw NCBI report lines for this batch
 * @param enaDataSource         source used to apply ENA sequence names
 * @param downloadedENAFilePath ENA report location, or {@code null} to skip the ENA step
 * @param chromosomeRepository  repository the parsed chromosomes are saved to
 * @throws IOException if the ENA report cannot be read
 */
public void addENASequenceNameAndSave(AssemblyEntity assemblyEntity, List<String> chrLines,
                                      ENAAssemblyDataSource enaDataSource, Path downloadedENAFilePath,
                                      ChromosomeRepository chromosomeRepository) throws IOException {
    final List<ChromosomeEntity> parsedChromosomes = getChromosomeEntityList(assemblyEntity, chrLines);

    final boolean enaReportAvailable = downloadedENAFilePath != null;
    if (enaReportAvailable) {
        enaDataSource.addENASequenceNameToChromosomes(parsedChromosomes, downloadedENAFilePath, BATCH_SIZE);
    }

    chromosomeRepository.saveAll(parsedChromosomes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ protected void parseAssemblyData(String line) {

protected void parseChromosomeLine(String[] columns) {
ChromosomeEntity chromosomeEntity = getChromosome(columns);
if (chromosomeEntity == null) {
return;
}

if (assemblyEntity == null) {
assemblyEntity = new AssemblyEntity();
Expand All @@ -81,6 +84,9 @@ protected void parseChromosomeLine(String[] columns) {

protected void parseScaffoldLine(String[] columns) {
ChromosomeEntity scaffoldEntity = getScaffold(columns);
if (scaffoldEntity == null) {
return;
}

if (assemblyEntity == null) {
assemblyEntity = new AssemblyEntity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ protected void parseAssemblyData(String line) {

protected void parseChromosomeLine(String[] columns) {
ChromosomeEntity chromosomeEntity = getChromosome(columns);
if (chromosomeEntity == null) {
return;
}

if (assemblyEntity == null) {
assemblyEntity = new AssemblyEntity();
Expand All @@ -114,6 +117,9 @@ protected void parseChromosomeLine(String[] columns) {

protected void parseScaffoldLine(String[] columns) {
ChromosomeEntity scaffoldEntity = getScaffold(columns);
if (scaffoldEntity == null) {
return;
}

if (assemblyEntity == null) {
assemblyEntity = new AssemblyEntity();
Expand Down
117 changes: 2 additions & 115 deletions src/main/java/uk/ac/ebi/eva/contigalias/service/AssemblyService.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@
import uk.ac.ebi.eva.contigalias.scheduler.ChecksumSetter;

import javax.transaction.Transactional;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -60,8 +56,6 @@ public class AssemblyService {

private final ChecksumSetter checksumSetter;

private final int BATCH_SIZE = 100000;

private final Logger logger = LoggerFactory.getLogger(AssemblyService.class);

@Autowired
Expand Down Expand Up @@ -111,119 +105,12 @@ public void fetchAndInsertAssembly(String accession) throws IOException {
throw duplicateAssemblyInsertionException(accession, entity.get());
}

Optional<Path> downloadNCBIFilePathOpt = ncbiDataSource.downloadAssemblyReport(accession);
Path downloadedNCBIFilePath = downloadNCBIFilePathOpt.orElseThrow(() -> new AssemblyNotFoundException(accession));
Optional<Path> downloadENAFilePathOpt = enaDataSource.downloadAssemblyReport(accession);
Path downloadedENAFilePath = downloadENAFilePathOpt.orElse(null);

long numberOfChromosomesInFile = Files.lines(downloadedNCBIFilePath).filter(line -> !line.startsWith("#")).count();
logger.info("Number of chromosomes in assembly (" + accession + "): " + numberOfChromosomesInFile);

// parse file and save data
parseFileAndInsertAssembly(downloadedNCBIFilePath, downloadedENAFilePath);
// download file and save assembly and chromosome data
ncbiDataSource.parseFileAndInsertAssembly(accession, enaDataSource, assemblyRepository, chromosomeRepository);
logger.info("Successfully inserted assembly for accession " + accession);

// submit job for retrieving and updating MD5 Checksum for assembly (asynchronously)
checksumSetter.updateMd5CheckSumForAssemblyAsync(accession);

Files.deleteIfExists(downloadedNCBIFilePath);
if (downloadedENAFilePath != null) {
Files.deleteIfExists(downloadedENAFilePath);
}
}

/**
 * Saves the assembly described by the downloaded NCBI report, then saves its
 * chromosomes in batches of {@code BATCH_SIZE} report lines.
 *
 * <p>Lines starting with {@code "#"} are skipped. When an ENA report path is given,
 * each batch has ENA sequence names applied before it is saved.
 *
 * @param downloadedNCBIFilePath location of the downloaded NCBI report
 * @param downloadedENAFilePath  location of the downloaded ENA report, or {@code null} if none
 * @throws IOException if either report cannot be read
 */
@Transactional
public void parseFileAndInsertAssembly(Path downloadedNCBIFilePath, Path downloadedENAFilePath) throws IOException {
    AssemblyEntity assemblyEntity = ncbiDataSource.getAssemblyEntity(downloadedNCBIFilePath);
    assemblyRepository.save(assemblyEntity);

    final boolean enaReportAvailable = downloadedENAFilePath != null;
    try (BufferedReader reader = new BufferedReader(new FileReader(downloadedNCBIFilePath.toFile()))) {
        List<String> pendingLines = new ArrayList<>();
        long savedSoFar = 0;
        for (String current = reader.readLine(); current != null; current = reader.readLine()) {
            if (current.startsWith("#")) {
                continue;
            }
            pendingLines.add(current);
            if (pendingLines.size() != BATCH_SIZE) {
                continue;
            }
            // Batch is full: parse, enrich with ENA names if possible, and save.
            List<ChromosomeEntity> batch = ncbiDataSource.getChromosomeEntityList(assemblyEntity, pendingLines);
            if (enaReportAvailable) {
                addENASequenceNameToChromosomes(assemblyEntity, batch, downloadedENAFilePath);
            }
            chromosomeRepository.saveAll(batch);
            savedSoFar += batch.size();
            logger.info("Number of total chromosomes saved till now : " + savedSoFar);

            pendingLines = new ArrayList<>();
        }

        // Save the final, partially filled batch.
        if (!pendingLines.isEmpty()) {
            List<ChromosomeEntity> batch = ncbiDataSource.getChromosomeEntityList(assemblyEntity, pendingLines);
            if (enaReportAvailable) {
                addENASequenceNameToChromosomes(assemblyEntity, batch, downloadedENAFilePath);
            }
            chromosomeRepository.saveAll(batch);
            savedSoFar += batch.size();
            logger.info("Number of total chromosomes saved till now : " + savedSoFar);
        }
    }
}

/**
 * Reads the downloaded ENA report in batches of {@code BATCH_SIZE} lines and applies
 * the ENA sequence names found in it to the given NCBI chromosomes.
 *
 * <p>Lines starting with {@code "accession"} are skipped.
 *
 * @param assemblyEntity        assembly passed on when parsing the ENA lines
 * @param ncbiChromosomeList    chromosomes to update in place
 * @param downloadedENAFilePath location of the downloaded ENA report
 * @throws IOException if the report cannot be opened or read
 */
public void addENASequenceNameToChromosomes(AssemblyEntity assemblyEntity, List<ChromosomeEntity> ncbiChromosomeList,
                                            Path downloadedENAFilePath) throws IOException {
    try (BufferedReader reader = new BufferedReader(new FileReader(downloadedENAFilePath.toFile()))) {
        List<String> pendingLines = new ArrayList<>();
        for (String current = reader.readLine(); current != null; current = reader.readLine()) {
            if (current.startsWith("accession")) {
                continue;
            }
            pendingLines.add(current);
            if (pendingLines.size() != BATCH_SIZE) {
                continue;
            }
            // Batch is full: parse it and apply the names to the NCBI chromosomes.
            List<ChromosomeEntity> enaBatch = enaDataSource.getChromosomeEntityList(assemblyEntity, pendingLines);
            enaDataSource.addENASequenceNames(
                    enaBatch.isEmpty() ? Collections.emptyList() : enaBatch,
                    ncbiChromosomeList.isEmpty() ? Collections.emptyList() : ncbiChromosomeList
            );

            pendingLines = new ArrayList<>();
        }
        // Handle the final, partially filled batch.
        if (!pendingLines.isEmpty()) {
            List<ChromosomeEntity> enaBatch = enaDataSource.getChromosomeEntityList(assemblyEntity, pendingLines);
            enaDataSource.addENASequenceNames(
                    enaBatch.isEmpty() ? Collections.emptyList() : enaBatch,
                    ncbiChromosomeList.isEmpty() ? Collections.emptyList() : ncbiChromosomeList
            );
        }
    }
}

/**
 * Fetches an assembly from NCBI, applies ENA sequence names to it, and inserts it.
 *
 * <p>The assembly is inserted, and an asynchronous MD5 checksum update is submitted,
 * only when it has at least one chromosome; otherwise an error is logged and nothing
 * is inserted.
 *
 * @param accession assembly accession to fetch
 * @throws IOException               declared by the original signature for the fetch/enrichment calls
 * @throws AssemblyNotFoundException if NCBI returns no assembly for the accession
 */
public void fetchAndInsertAssemblyOld(String accession) throws IOException {
    Optional<AssemblyEntity> entity = assemblyRepository.findAssemblyEntityByAccession(accession);
    if (entity.isPresent()) {
        throw duplicateAssemblyInsertionException(accession, entity.get());
    }

    // A missing assembly is reported by throwing, so no second presence check
    // (or "could not get assembly" fallback branch) is needed below.
    AssemblyEntity assemblyEntity = ncbiDataSource.getAssemblyByAccession(accession)
            .orElseThrow(() -> new AssemblyNotFoundException(accession));

    enaDataSource.addENASequenceNamesToAssembly(assemblyEntity);
    if (assemblyEntity.getChromosomes() != null && assemblyEntity.getChromosomes().size() > 0) {
        insertAssembly(assemblyEntity);
        logger.info("Successfully inserted assembly for accession " + accession);
        // submit job for retrieving and updating MD5 Checksum for assembly (asynchronously)
        checksumSetter.updateMd5CheckSumForAssemblyAsync(accession);
    } else {
        logger.error("Skipping inserting assembly : No chromosome in assembly " + accession);
    }
}

public void retrieveAndInsertMd5ChecksumForAssembly(String assembly) {
Expand Down

0 comments on commit c631552

Please sign in to comment.