From f18bd36bbc601d8b6911ddee94ac4109d36e088f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vladimir=20Nikoli=C4=87?= Date: Fri, 24 Aug 2018 19:07:09 +0200 Subject: [PATCH] Parallelize gap sealing (#249) --- Sealer/sealer.cc | 47 +++++++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/Sealer/sealer.cc b/Sealer/sealer.cc index 288492d95..a4c827f4b 100644 --- a/Sealer/sealer.cc +++ b/Sealer/sealer.cc @@ -420,8 +420,8 @@ template string merge(const Graph& g, unsigned k, const Gap& gap, - FastaRecord &read1, - FastaRecord &read2, + const FastaRecord &read1, + const FastaRecord &read2, const ConnectPairsParams& params, Counters& g_count, ofstream& traceStream) @@ -583,7 +583,6 @@ void kRun(const ConnectPairsParams& params, map >::iterator read1_it; map::iterator read2_it; unsigned uniqueGapsClosed = 0; - bool success; Counters g_count; g_count.noStartOrGoalKmer = 0; @@ -603,38 +602,50 @@ void kRun(const ConnectPairsParams& params, printLog(logStream, "Flanks inserted into k run = " + IntToString(flanks.size()) + "\n"); - for (read1_it = flanks.begin(); read1_it != flanks.end();) { - success = false; + int counter = 0; + vector >::iterator> flanks_closed; +#pragma omp parallel private(read1_it, read2_it) firstprivate(counter) + for (read1_it = flanks.begin(); read1_it != flanks.end(); ++read1_it) { FastaRecord read1 = read1_it->first; - for (read2_it = flanks[read1].begin(); read2_it != flanks[read1].end(); read2_it++) { + bool success = false; + for (read2_it = flanks[read1].begin(); read2_it != flanks[read1].end(); ++read2_it, ++counter) { +#if _OPENMP + if (counter % omp_get_num_threads() != omp_get_thread_num()) + continue; +#endif FastaRecord read2 = read2_it->first; int startposition = read2_it->second.gapStart(); string tempSeq = merge(g, k, read2_it->second, read1, read2, params, g_count, traceStream); if (!tempSeq.empty()) { success = true; +#pragma omp critical (allmerged) allmerged[read1.id.substr(0,read1.id.length()-2)][startposition] = ClosedGap(read2_it->second, tempSeq); -//#pragma omp atomic - gapsclosed++; -//#pragma omp atomic - uniqueGapsClosed++; - if (gapsclosed % 100 == 0) +#pragma omp atomic + ++uniqueGapsClosed; +#pragma omp critical (gapsclosed) + if (++gapsclosed % 100 == 0) printLog(logStream, IntToString(gapsclosed) + " gaps closed so far\n"); - if (!opt::gapfilePath.empty()) { + if (!opt::gapfilePath.empty()) +#pragma omp critical (gapStream) gapStream << ">" << read1.id.substr(0,read1.id.length()-2) << "_" << read2_it->second.gapStart() << "-" << read2_it->second.gapEnd() - << " LN:i:" << tempSeq.length() << '\n'; - gapStream << tempSeq << '\n'; - } + << " LN:i:" << tempSeq.length() << '\n' + << tempSeq << '\n'; } } if (success) { - flanks.erase(read1_it++); +#pragma omp critical (flanks_closed) + flanks_closed.push_back(read1_it); } - else - read1_it++; + } + + for (vector >::iterator>::iterator it = flanks_closed.begin(); + it != flanks_closed.end(); ++it) { + if (flanks.count((*it)->first) > 0) + flanks.erase(*it); } printLog(logStream, IntToString(uniqueGapsClosed) + " unique gaps closed for k" + IntToString(k) + "\n");