Skip to content

Commit

Permalink
Merge pull request hpcc-systems#19309 from ghalliday/issue33015
Browse files Browse the repository at this point in the history
HPCC-33015 Improve system resilience when thor crashes

Reviewed-by: Mark Kelly mark.kelly@lexisnexisrisk.com
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
  • Loading branch information
ghalliday authored Nov 22, 2024
2 parents d6da7b3 + 59aeb2b commit c1ca74e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
5 changes: 5 additions & 0 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14500,6 +14500,11 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
// NB: check for expected success state (WUStateWait). If any other state, abort.
{
Owned<IWorkUnit> w = &workunit.lock();
//If the thor instance crashed, make sure that the workunit is no longer associated with it - otherwise a
//failure clause that causes a graph to run can abort because the instances has stopped.
if (w->getEngineSession() > 0)
w->setEngineSession(-1);

WUState state = w->getState();
if (WUStateWait != state) // expected state from successful Thor run from above
{
Expand Down
39 changes: 28 additions & 11 deletions thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1473,8 +1473,13 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
Owned<IConstWorkUnit> workunit;
factory.setown(getWorkUnitFactory());
workunit.setown(factory->openWorkUnit(currentWuid));
SessionId agentSessionID = workunit->getAgentSession();
if (agentSessionID <= 0)
SessionId agentSessionID = workunit ? workunit->getAgentSession() : 0;
if (!workunit)
{
WARNLOG("Discarding job with missing workunit wuid=%s, graph=%s", currentWuid.str(), currentGraphName.str());
currentWuid.clear();
}
else if (agentSessionID <= 0)
{
WARNLOG("Discarding job with invalid sessionID: wuid=%s, graph=%s (sessionID=%" I64F "d)", currentWuid.str(), currentGraphName.str(), agentSessionID);
currentWuid.clear();
Expand Down Expand Up @@ -1524,8 +1529,8 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
}
}
}
currentGraphName.clear();

currentGraphName.clear();
if (lingerPeriod)
{
PROGLOG("Lingering time left: %.2f", ((float)lingerPeriod)/1000);
Expand All @@ -1545,15 +1550,27 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
break; // timeout/abort
// else - reject/ignore duff message.
}
if (0 == currentGraphName.length()) // only ever true if !multiJobLinger

// The following is true if no workunit/graph have been received
// MORE: I think it should also be executed if lingerPeriod is 0
if (0 == currentGraphName.length())
{
// De-register the idle lingering entry.
Owned<IWorkUnitFactory> factory;
Owned<IConstWorkUnit> workunit;
factory.setown(getWorkUnitFactory());
workunit.setown(factory->openWorkUnit(currentWuid));
Owned<IWorkUnit> w = &workunit->lock();
w->setDebugValue(instance, "0", true);
if (!multiJobLinger)
{
// De-register the idle lingering entry.
Owned<IWorkUnitFactory> factory;
Owned<IConstWorkUnit> workunit;
factory.setown(getWorkUnitFactory());
workunit.setown(factory->openWorkUnit(currentWuid));
//Unlikely, but the workunit could have been deleted while we were lingering
//currentWuid can also be blank if the workunit this started for died before thor started
//processing the graph. This test covers both (unlikely) situations.
if (workunit)
{
Owned<IWorkUnit> w = &workunit->lock();
w->setDebugValue(instance, "0", true);
}
}
break;
}
}
Expand Down

0 comments on commit c1ca74e

Please sign in to comment.