Skip to content

Commit

Permalink
Parallelize gap sealing (#249)
Browse files Browse the repository at this point in the history
  • Loading branch information
schutzekatze authored and Ben Vandervalk committed Aug 24, 2018
1 parent 895d88f commit f18bd36
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions Sealer/sealer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,8 @@ template <typename Graph>
string merge(const Graph& g,
unsigned k,
const Gap& gap,
FastaRecord &read1,
FastaRecord &read2,
const FastaRecord &read1,
const FastaRecord &read2,
const ConnectPairsParams& params,
Counters& g_count,
ofstream& traceStream)
Expand Down Expand Up @@ -583,7 +583,6 @@ void kRun(const ConnectPairsParams& params,
map<FastaRecord, map<FastaRecord, Gap> >::iterator read1_it;
map<FastaRecord, Gap>::iterator read2_it;
unsigned uniqueGapsClosed = 0;
bool success;

Counters g_count;
g_count.noStartOrGoalKmer = 0;
Expand All @@ -603,38 +602,50 @@ void kRun(const ConnectPairsParams& params,

printLog(logStream, "Flanks inserted into k run = " + IntToString(flanks.size()) + "\n");

for (read1_it = flanks.begin(); read1_it != flanks.end();) {
success = false;
int counter = 0;
vector<map<FastaRecord, map<FastaRecord, Gap> >::iterator> flanks_closed;
#pragma omp parallel private(read1_it, read2_it) firstprivate(counter)
for (read1_it = flanks.begin(); read1_it != flanks.end(); ++read1_it) {
FastaRecord read1 = read1_it->first;
for (read2_it = flanks[read1].begin(); read2_it != flanks[read1].end(); read2_it++) {
bool success = false;
for (read2_it = flanks[read1].begin(); read2_it != flanks[read1].end(); ++read2_it, ++counter) {
#if _OPENMP
if (counter % omp_get_num_threads() != omp_get_thread_num())
continue;
#endif
FastaRecord read2 = read2_it->first;

int startposition = read2_it->second.gapStart();
string tempSeq = merge(g, k, read2_it->second, read1, read2, params, g_count, traceStream);
if (!tempSeq.empty()) {
success = true;
#pragma omp critical (allmerged)
allmerged[read1.id.substr(0,read1.id.length()-2)][startposition]
= ClosedGap(read2_it->second, tempSeq);
//#pragma omp atomic
gapsclosed++;
//#pragma omp atomic
uniqueGapsClosed++;
if (gapsclosed % 100 == 0)
#pragma omp atomic
++uniqueGapsClosed;
#pragma omp critical (gapsclosed)
if (++gapsclosed % 100 == 0)
printLog(logStream, IntToString(gapsclosed) + " gaps closed so far\n");

if (!opt::gapfilePath.empty()) {
if (!opt::gapfilePath.empty())
#pragma omp critical (gapStream)
gapStream << ">" << read1.id.substr(0,read1.id.length()-2)
<< "_" << read2_it->second.gapStart() << "-" << read2_it->second.gapEnd()
<< " LN:i:" << tempSeq.length() << '\n';
gapStream << tempSeq << '\n';
}
<< " LN:i:" << tempSeq.length() << '\n'
<< tempSeq << '\n';
}
}
if (success) {
flanks.erase(read1_it++);
#pragma omp critical (flanks_closed)
flanks_closed.push_back(read1_it);
}
else
read1_it++;
}

for (vector<map<FastaRecord, map<FastaRecord, Gap> >::iterator>::iterator it = flanks_closed.begin();
it != flanks_closed.end(); ++it) {
if (flanks.count((*it)->first) > 0)
flanks.erase(*it);
}

printLog(logStream, IntToString(uniqueGapsClosed) + " unique gaps closed for k" + IntToString(k) + "\n");
Expand Down

0 comments on commit f18bd36

Please sign in to comment.