Skip to content

Commit

Permalink
Merge pull request #11 from compgen-io/status-tempjob-log
Browse files Browse the repository at this point in the history
Added status command
  • Loading branch information
mbreese authored Aug 2, 2022
2 parents 28a035d + 0d12931 commit e4dfa46
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 62 deletions.
9 changes: 9 additions & 0 deletions src/java/io/compgen/cgpipe/CGPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.commons.logging.LogFactory;

import io.compgen.cgpipe.cmd.CancelPending;
import io.compgen.cgpipe.cmd.JobStatus;
import io.compgen.cgpipe.cmd.ShowPending;
import io.compgen.cgpipe.cmd.UpdateJobEnd;
import io.compgen.cgpipe.cmd.UpdateJobStart;
Expand Down Expand Up @@ -97,6 +98,14 @@ public static void main(String[] args) throws Exception {
ShowPending.main(newargs);
return;
}
if (args.length > 0 && args[0].equals("status")) {
String [] newargs = new String[args.length-1];
for (int i=1; i< args.length; i++) {
newargs[i-1] = args[i];
}
JobStatus.main(newargs);
return;
}

defaultMain(args);
}
Expand Down
5 changes: 3 additions & 2 deletions src/java/io/compgen/cgpipe/USAGE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ Note: If no target is specified, the first target in the file will be used


Additional commands:
show-pending Show pending/running jobs from a job-log
cancel-pending Cancel pending jobs from a job-log
vaccuum Clean up a job-log
show-pending Show pending/running jobs from a job-log
status Show the status for all output files
vaccuum Clean up a job-log (keep only the newest record for any output)

97 changes: 97 additions & 0 deletions src/java/io/compgen/cgpipe/cmd/JobStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.compgen.cgpipe.cmd;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import io.compgen.cgpipe.CGPipe;
import io.compgen.cgpipe.exceptions.ASTExecException;
import io.compgen.cgpipe.exceptions.ASTParseException;
import io.compgen.cgpipe.exceptions.RunnerException;
import io.compgen.cgpipe.parser.context.RootContext;
import io.compgen.cgpipe.runner.JobRunner;
import io.compgen.cgpipe.runner.joblog.JobLog;
import io.compgen.cgpipe.support.SimpleFileLoggerImpl;
import io.compgen.cmdline.MainBuilder;
import io.compgen.cmdline.annotation.Command;
import io.compgen.cmdline.annotation.Exec;
import io.compgen.cmdline.annotation.UnnamedArg;
import io.compgen.cmdline.impl.AbstractCommand;
import io.compgen.common.StringUtils;

@Command(name = "cgpipe status", desc = "For each output in the job-log, show the status (finished, pending, error)", doc = "")
public class JobStatus extends AbstractCommand {
private String jobLogFilename = null;

@UnnamedArg(required = true, name="FILE")
public void setJobLogFilename(final String jobLogFilename) {
this.jobLogFilename = jobLogFilename;
}

@Exec
public void exec() throws IOException, ASTParseException, ASTExecException, RunnerException {
SimpleFileLoggerImpl.setSilent(true);
JobRunner runner = null;
// Load config values from global config.
RootContext root = new RootContext();
root.setOutputStream(null);
CGPipe.loadInitFiles(root);


// Load the job runner *after* we execute the script to capture any config changes
runner = JobRunner.load(root);

JobLog log = JobLog.open(jobLogFilename);

// <JobId, Output>
Map<String, List<String>> jobs = new TreeMap<String, List<String>> (StringUtils.naturalSorter());

for (String output: log.getOutputJobIds().keySet()) {
String jobid = log.getJobIdForOutput(output);
if (!jobs.containsKey(jobid)) {
jobs.put(jobid, new ArrayList<String>());
}
jobs.get(jobid).add(output);
}


for (String jobid: jobs.keySet()) {
for (String file: jobs.get(jobid)) {
String status = "MISSING";
if (runner.isJobIdValid(jobid)) {
status = "PENDING";
} else {

File f;
if (file.startsWith("/")) {
f = new File(file);
} else {
f = new File(log.getJob(jobid).getWorkingDirectory() + File.separator + file);
}
if (f.exists()) {
if (f.lastModified() >= log.getJob(jobid).getSubmitTime()) {
status = "OK";
} else {
status = "JOBFAIL";
}
}

}

if (status.equals("MISSING") && log.getJob(jobid).getTempOutputs() != null && log.getJob(jobid).getTempOutputs().contains(file)) {
// missing files that don't exist and that are temp might not be errors
// TODO - actually traverse the tree to see if the next non-temp file exists?

status = "TEMP";
}
System.out.println(status + "\t" + jobid + "\t" + file);
}
}
}
public static void main(final String[] args) throws Exception {
new MainBuilder().runClass(JobStatus.class, args);
}
}
8 changes: 7 additions & 1 deletion src/java/io/compgen/cgpipe/parser/target/BuildTarget.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,14 @@ public JobDef eval(List<NumberedLine> pre, List<NumberedLine> post, RootContext
}

TemplateParser.parseTemplate(lines, pre, post, jobRoot);
List<String> tempOutputs = new ArrayList<String>();
for (String output: outputs) {
if (isTempOutput(output)) {
tempOutputs.add(output);
}
}

return new JobDef(jobRoot.getBody(), jobRoot.cloneValues(), outputs, inputs);
return new JobDef(jobRoot.getBody(), jobRoot.cloneValues(), outputs, inputs, tempOutputs);
}
//
// public boolean isSkippable(String out) {
Expand Down
14 changes: 14 additions & 0 deletions src/java/io/compgen/cgpipe/runner/JobDef.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,26 @@ public class JobDef implements JobDependency {
private final String body;
private final Map<String, VarValue> settings;
private final List<String> outputs;
private final List<String> tempOutputs;
private final List<String> inputs;
private String jobId = null;
private String name;
private List<JobDependency> depends = new ArrayList<JobDependency>();

public JobDef(String body, Map<String, VarValue> settings, List<String> outputs, List<String> inputs) {
this(body, settings, outputs, inputs, null);
}
public JobDef(String body, Map<String, VarValue> settings, List<String> outputs, List<String> inputs, List<String> tempOutputs) {
this.body = body;
this.settings = settings;
this.outputs = Collections.unmodifiableList(new ArrayList<String>(outputs));
if (tempOutputs != null) {
this.tempOutputs = Collections.unmodifiableList(new ArrayList<String>(tempOutputs));
} else {
this.tempOutputs = new ArrayList<String>();
}


// inputs could be null...
if (inputs == null) {
this.inputs = Collections.unmodifiableList(new ArrayList<String>(){
Expand Down Expand Up @@ -65,6 +75,10 @@ public List<String> getOutputs() {
return outputs;
}

public List<String> getTempOutputs() {
return tempOutputs;
}

public Map<String, VarValue> getSettingsMap() {
return Collections.unmodifiableMap(settings);
}
Expand Down
5 changes: 4 additions & 1 deletion src/java/io/compgen/cgpipe/runner/JobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,7 @@ private JobDependency submitTargets(BuildTarget target, RootContext context, Str
try {
// build my job definition (script, etc...)
JobDef job = target.eval(prelines, postlines, context);

if (job != null) {
// this is a potential submitted job
boolean blankRoot = false;
Expand Down Expand Up @@ -768,7 +769,6 @@ protected void logJob(JobDef job) {
rec.setSubmitTime(System.currentTimeMillis());
rec.setUser(System.getProperty("user.name"));


for (JobDependency dep:job.getDependencies()) {
if (job.getJobId()!=null && !job.getJobId().equals("")) {
rec.addDep(dep.getJobId());
Expand All @@ -777,6 +777,9 @@ protected void logJob(JobDef job) {
for (String out:job.getOutputs()) {
rec.addOutput(out);
}
for (String tmp:job.getTempOutputs()) {
rec.addTempOutput(tmp);
}
for (String inp:job.getInputs()) {
rec.addInput(inp);
}
Expand Down
124 changes: 66 additions & 58 deletions src/java/io/compgen/cgpipe/runner/joblog/JobLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class JobLog {
// private final String lockSecret = generateRandomString();

// protected List<String> jobIds = new ArrayList<String>();
// protected Map<String, JobLogRecord> records = new HashMap<String,JobLogRecord>();
protected Map<String, JobLogRecord> records = new HashMap<String,JobLogRecord>(); // jobid, record
protected Map<String, String> outputs = new HashMap<String,String>(); // output, jobid

protected JobLog(String filename) throws IOException {
Expand All @@ -56,64 +56,62 @@ protected JobLog(String filename) throws IOException {
outputs.put(arg1, jobid);
}

//////////////////
// THE REST OF THIS IS NOT NEEDED... when we read in the log, it is ONLY to look for the outputs/jobids.
//////////////////
String arg2 = null;
if (key.equals("SETTING")) {
cols = line.split("\t", 4);
arg2 = cols[3];

// String arg2 = null;
// if (key.equals("SETTING")) {
// cols = line.split("\t", 4);
// arg2 = cols[3];

// }

// if (!records.containsKey(jobid)) {
// records.put(jobid, new JobLogRecord(jobid));
// jobIds.add(jobid);
// }

// JobLogRecord rec = records.get(jobid);
// switch(key) {
// case "NAME":
// rec.setName(arg1);
// break;
// case "PIPELINE":
// rec.setPipeline(arg1);
// break;
// case "WORKINGDIR":
// rec.setWorkingDirectory(arg1);
// break;
// case "RETCODE":
// rec.setReturnCode(Integer.parseInt(arg1));
// break;
// case "SUBMIT":
// rec.setSubmitTime(Long.parseLong(arg1));
// break;
// case "START":
// rec.setStartTime(Long.parseLong(arg1));
// break;
// case "END":
// rec.setEndTime(Long.parseLong(arg1));
// break;
// case "SETTING":
// rec.addSetting(arg1, arg2);
// break;
// case "OUTPUT":
// outputs.put(arg1, jobid);
// rec.addOutput(arg1);
// break;
// case "INPUT":
// rec.addInput(arg1);
// break;
// case "DEP":
// rec.addDep(arg1);
// break;
// case "SRC":
// rec.addSrcLine(arg1);
// break;
// default:
// break;
// }
}

if (!records.containsKey(jobid)) {
records.put(jobid, new JobLogRecord(jobid));
}

JobLogRecord rec = records.get(jobid);
switch(key) {
case "NAME":
rec.setName(arg1);
break;
case "PIPELINE":
rec.setPipeline(arg1);
break;
case "WORKINGDIR":
rec.setWorkingDirectory(arg1);
break;
case "RETCODE":
rec.setReturnCode(Integer.parseInt(arg1));
break;
case "SUBMIT":
rec.setSubmitTime(Long.parseLong(arg1));
break;
case "START":
rec.setStartTime(Long.parseLong(arg1));
break;
case "END":
rec.setEndTime(Long.parseLong(arg1));
break;
case "SETTING":
rec.addSetting(arg1, arg2);
break;
case "OUTPUT":
outputs.put(arg1, jobid);
rec.addOutput(arg1);
break;
case "INPUT":
rec.addInput(arg1);
break;
case "DEP":
rec.addDep(arg1);
break;
case "SRC":
rec.addSrcLine(arg1);
break;
case "TEMP":
rec.addTempOutput(arg1);
break;
default:
break;
}

}
reader.close();
Expand Down Expand Up @@ -141,6 +139,11 @@ public String getJobIdForOutput(String output) {
return null;
}

public JobLogRecord getJob(String jobid) {
return records.get(jobid);
}


public Map<String, String> getOutputJobIds() {
return Collections.unmodifiableMap(outputs);
}
Expand Down Expand Up @@ -192,6 +195,11 @@ public void writeRecord(JobLogRecord rec) {
ps.println(rec.getJobId()+"\tOUTPUT\t"+out);
}
}
if (rec.getTempOutputs() != null) {
for (String tmp: rec.getTempOutputs()) {
ps.println(rec.getJobId()+"\tTEMP\t"+tmp);
}
}
if (rec.getInputs() != null) {
for (String inp: rec.getInputs()) {
ps.println(rec.getJobId()+"\tINPUT\t"+inp);
Expand Down
Loading

0 comments on commit e4dfa46

Please sign in to comment.