Skip to content

Commit

Permalink
Add output file to metacat dataset to
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew-McNab-UK committed May 18, 2024
1 parent 579d8cf commit c2f8ba4
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 22 deletions.
15 changes: 8 additions & 7 deletions agents/justin-finder
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ def findSubmittedWorkflows():
'ON workflows.scope_id=scopes.scope_id '
'LEFT JOIN condor_groups '
'ON condor_groups.condor_group_id=scopes.condor_group_id '
'LEFT JOIN users ON users.user_id=workflows.user_id
'LEFT JOIN users ON users.user_id=workflows.user_id '
'LEFT JOIN principal_names '
'ON principal_names.pn_id=users.main_pn_id '
'WHERE workflows.state="submitted" '
Expand Down Expand Up @@ -765,7 +765,7 @@ def processSubmittedWorkflow(workflow):
except:
condorGroupID = 0

stages = justin.select('SELECT stage_id,processors,jobscript_git,
stages = justin.select('SELECT stage_id,processors,jobscript_git,'
'jobscript_image,wall_seconds,rss_bytes '
'FROM stages WHERE workflow_id=%d'
% workflowID)
Expand Down Expand Up @@ -801,7 +801,7 @@ def processSubmittedWorkflow(workflow):
metadataDict = { "dune.workflow" :
{ "workflow_id" : workflowID,
"stage_id" : stageID,
"pattern_id" : patternID,
"pattern_id" : stageOutput['pattern_id'],
"file_pattern" : stageOutput['file_pattern'],
"user" : workflow['principal_name'],
"processors" : stage['processors'],
Expand All @@ -813,7 +813,7 @@ def processSubmittedWorkflow(workflow):
# ADD RCDS/Git repos in here too with tags and commit hashes

if stage['jobscript_git']:
metadataDict['workflow']['jobscript_git'] = stage['jobscript_git']
metadataDict['dune.workflow']['jobscript_git'] = stage['jobscript_git']

createOutputDatasets(workflowID = workflowID,
stageID = stageID,
Expand Down Expand Up @@ -852,13 +852,14 @@ def createOutputDatasets(workflowID, stageID, patternID, metadataDict,
logLine('Try to add MetaCat dataset %s:%s' % (scopeName, destination))

# Temporary file to pass to metacat command
(fp, metadataFile) = tempfile.mkstemp(text = True)
(fd, metadataFile) = tempfile.mkstemp(text = True)
fp = os.fdopen(fd, 'w')
json.dump(metadataDict, fp)
fp.close()

try:
ret = executeMetaCatCommand("dataset create %s:%s --metadata %s"
% (scopeName, destination, metadataFile))
ret = executeMetaCatCommand("dataset create --metadata %s %s:%s"
% (metadataFile, scopeName, destination))
# Carry on regardless if ret != 0 since we can't tell from the metacat
# return code if the dataset just already exists or there was a
# genuine problem ...
Expand Down
10 changes: 10 additions & 0 deletions agents/justin-wrapper-job
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,16 @@ for (destination, fileScope, fileName, fileSize, patternID) in outputFiles:
logLine('Failed to upload %s:%s' % (fileScope, fileName))
jobAborted(320, 'rucio_upload', '')

# Add the file to the main dataset
(ret, out) = executeMetaCatCommand('dataset add-files --files "%s:%s" '
'"%s:%s"' % ( fileScope, fileName,
fileScope, destination )
)

if ret:
logLine('Failed to add %s:%s to %s' % (fileScope, fileName, destination))
jobAborted(319, 'metacat_registration', '')

# Update MetaCat output_status for this file to uploaded
try:
(ret, out) = executeMetaCatCommand("file update --metadata "
Expand Down
2 changes: 1 addition & 1 deletion modules/justin_version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# This file must by valid Python AND valid Bash!
justinVersion='01.01.rc2'
justinVersion='01.01.rc3'
27 changes: 14 additions & 13 deletions services/justin-wsgi-allocator
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,15 @@ def httpError(startResponse, code, message):

return [message.encode('UTF-8')]

def httpOK(startResponse, outputString):
def httpOK(startResponse, outputString, codeStr = '200 OK'):

try:
justin.conn.commit()
except Exception as e:
code = '500 Internal Server Error'
codeStr = '500 Internal Server Error'
outputString = 'Database commit fails: ' + str(e)
else:
code = '200 OK'

startResponse(code,
startResponse(codeStr,
[('Content-type', 'text/plain'),
('Content-length', str(len(outputString)))
])
Expand Down Expand Up @@ -551,7 +549,7 @@ def makeJobscriptDict(jsonDict, jobDict):
if rseRow['distance'] == 0
else rseRow['wan_write_scheme'],
pfn))

# Create an ordered output RSE list specific to this stage
try:
query = ('SELECT rse_name,lan_write_scheme,wan_write_scheme,distance '
Expand Down Expand Up @@ -1363,9 +1361,10 @@ def getFileMethod(startResponse, jsonDict):
'500 Internal Server Error',
'Unable to update job to sent_get_file=TRUE: ' + str(e))

return httpError(startResponse,
'404 Not Found',
'No eligible file found')
# Use httpOK() since this does not do a rollback
return httpOK(startResponse,
'No eligible file found',
codeStr = '404 Not Found')

def updateFileProcessing(fileIDList,
state, justinJobID, workflowID, stageID):
Expand Down Expand Up @@ -1954,7 +1953,8 @@ def confirmResultsMethod(startResponse, jsonDict):

try:
numProcessedRow = justin.select('SELECT '
'(SELECT sent_get_file FROM jobs WHERE justin_job_id=%d),'
'(SELECT sent_get_file FROM jobs WHERE justin_job_id=%d) '
'AS sent_get_file,'
'(SELECT COUNT(*) FROM events WHERE '
'events.justin_job_id=%d AND '
'events.event_type_id=%d) AS num_processed,'
Expand All @@ -1976,14 +1976,15 @@ def confirmResultsMethod(startResponse, jsonDict):
# Finish up and update job status
try:
# Jobs only finish in the none_processed state if any files were allocated
# or if they never even asked for any
# or if they never even asked for any, but not for AWT jobs
justin.insertUpdate('UPDATE jobs SET job_state="%s",'
'heartbeat_time=NOW(),'
'finished_time=NOW() '
'WHERE justin_job_id=%d AND job_state="outputting"'
% ('none_processed'
if (numProcessed==0 and numAllocated)
or not sentGetFile
if ((numProcessed==0 and numAllocated)
or not sentGetFile)
and (workflowID != justin.awtWorkflowID)
else 'finished',
justinJobID))

Expand Down
2 changes: 1 addition & 1 deletion services/justin-wsgi-ui
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ def createStage(jsonDict, user, simple = False, scopeName = None):
'stage_priority=50,'
'workflow_id=%d,'
'stage_id=%d,'
'jobscript_git="github.com:%s",'
'jobscript_git="%s",'
'jobscript_image="%s",'
'processors=%d,'
'wall_seconds=%d,'
Expand Down

0 comments on commit c2f8ba4

Please sign in to comment.