Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/nharper285/onefuzz
Browse files Browse the repository at this point in the history
  • Loading branch information
nharper285 committed Oct 26, 2023
2 parents d10ae27 + 5cf619e commit 55527fb
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 108 deletions.
9 changes: 7 additions & 2 deletions src/ApiService/ApiService/Functions/QueueJobResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ public async Async.Task Run([QueueTrigger("job-result", Connection = "AzureWebJo

var job = await _jobs.Get(task.JobId);
if (job == null) {
_log.LogWarning("invalid {JobId}", task.JobId);
_log.LogWarning("invalid message {JobId}", task.JobId);
return;
}

if (jr.CreatedAt == null) {
_log.LogWarning("invalid message, no created_at field {JobId}", task.JobId);
return;
}

Expand All @@ -52,7 +57,7 @@ public async Async.Task Run([QueueTrigger("job-result", Connection = "AzureWebJo
return;
}

var jobResult = await _context.JobResultOperations.CreateOrUpdate(job.JobId, jobResultType, value);
var jobResult = await _context.JobResultOperations.CreateOrUpdate(job.JobId, jr.TaskId, jr.MachineId, jr.CreatedAt.Value, jr.Version, jobResultType, value);
if (!jobResult.IsOk) {
_log.LogError("failed to create or update with job result {JobId}", job.JobId);
}
Expand Down
49 changes: 18 additions & 31 deletions src/ApiService/ApiService/OneFuzzTypes/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,6 @@ public enum HeartbeatType {
TaskAlive,
}

[SkipRename]
public enum JobResultType {
NewCrashingInput,
NoReproCrashingInput,
NewReport,
NewUniqueReport,
NewRegressionReport,
NewCoverage,
NewCrashDump,
CoverageData,
RuntimeStats,
}

public record HeartbeatData(HeartbeatType Type);

public record TaskHeartbeatEntry(
Expand All @@ -55,12 +42,14 @@ public record TaskHeartbeatEntry(
[property: Required] Guid MachineId,
HeartbeatData[] Data);

public record JobResultData(JobResultType Type);
public record JobResultData(string Type);

public record TaskJobResultEntry(
Guid TaskId,
Guid? JobId,
Guid MachineId,
DateTime? CreatedAt,
double Version,
JobResultData Data,
Dictionary<string, double> Value
);
Expand Down Expand Up @@ -921,26 +910,24 @@ public record SecretAddress<T>(Uri Url) : ISecret<T> {
public record SecretData<T>(ISecret<T> Secret) {
}

[SkipRename]
public enum JobResultType {
CoverageData,
RuntimeStats,
}

public record JobResult(
[PartitionKey][RowKey] Guid JobId,
[PartitionKey] Guid JobId,
[RowKey] string TaskIdMachineIdMetric,
Guid TaskId,
Guid MachineId,
DateTime CreatedAt,
string Project,
string Name,
double NewCrashingInput = 0,
double NoReproCrashingInput = 0,
double NewReport = 0,
double NewUniqueReport = 0,
double NewRegressionReport = 0,
double NewCrashDump = 0,
double InstructionsCovered = 0,
double TotalInstructions = 0,
double CoverageRate = 0,
double IterationCount = 0
) : EntityBase() {
public JobResult(Guid JobId, string Project, string Name) : this(
JobId: JobId,
Project: Project,
Name: Name, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) { }
}
string Type,
double Version,
Dictionary<string, double> MetricValue
) : EntityBase();

public record JobConfig(
string Project,
Expand Down
1 change: 1 addition & 0 deletions src/ApiService/ApiService/host.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"version": "2.0",
"functionTimeout": "12:00:00",
"logging": {
"applicationInsights": {
"samplingSettings": {
Expand Down
118 changes: 47 additions & 71 deletions src/ApiService/ApiService/onefuzzlib/JobResultOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,99 +2,75 @@
using Microsoft.Extensions.Logging;
using Polly;
namespace Microsoft.OneFuzz.Service;
using System.Net;

public interface IJobResultOperations : IOrm<JobResult> {

Async.Task<JobResult?> GetJobResult(Guid jobId);
Async.Task<OneFuzzResultVoid> CreateOrUpdate(Guid jobId, JobResultType resultType, Dictionary<string, double> resultValue);
Async.Task<JobResult?> GetJobResult(Guid jobId, Guid taskId, Guid machineId, string metricType);
Async.Task<OneFuzzResultVoid> CreateOrUpdate(Guid jobId, Guid taskId, Guid machineId, DateTime createdAt, double version, string resultType, Dictionary<string, double> resultValue);

}
public class JobResultOperations : Orm<JobResult>, IJobResultOperations {

const string COVERAGE_DATA = "CoverageData";
const string RUNTIME_STATS = "RuntimeStats";

public JobResultOperations(ILogger<JobResultOperations> log, IOnefuzzContext context)
: base(log, context) {
}

public async Async.Task<JobResult?> GetJobResult(Guid jobId) {
return await SearchByPartitionKeys(new[] { jobId.ToString() }).SingleOrDefaultAsync();
public async Async.Task<JobResult?> GetJobResult(Guid jobId, Guid taskId, Guid machineId, string metricType) {
var data = QueryAsync(Query.SingleEntity(jobId.ToString(), string.Concat(taskId, "-", machineId, "-", metricType)));
return await data.FirstOrDefaultAsync();
}

private JobResult UpdateResult(JobResult result, JobResultType type, Dictionary<string, double> resultValue) {

var newResult = result;
double newValue;
switch (type) {
case JobResultType.NewCrashingInput:
newValue = result.NewCrashingInput + resultValue["count"];
newResult = result with { NewCrashingInput = newValue };
break;
case JobResultType.NewReport:
newValue = result.NewReport + resultValue["count"];
newResult = result with { NewReport = newValue };
break;
case JobResultType.NewUniqueReport:
newValue = result.NewUniqueReport + resultValue["count"];
newResult = result with { NewUniqueReport = newValue };
break;
case JobResultType.NewRegressionReport:
newValue = result.NewRegressionReport + resultValue["count"];
newResult = result with { NewRegressionReport = newValue };
break;
case JobResultType.NewCrashDump:
newValue = result.NewCrashDump + resultValue["count"];
newResult = result with { NewCrashDump = newValue };
break;
case JobResultType.CoverageData:
double newCovered = resultValue["covered"];
double newTotalCovered = resultValue["features"];
double newCoverageRate = resultValue["rate"];
newResult = result with { InstructionsCovered = newCovered, TotalInstructions = newTotalCovered, CoverageRate = newCoverageRate };
break;
case JobResultType.RuntimeStats:
double newTotalIterations = resultValue["total_count"];
newResult = result with { IterationCount = newTotalIterations };
break;
default:
_logTracer.LogWarning($"Invalid Field {type}.");
break;
}
_logTracer.LogInformation($"Attempting to log new result: {newResult}");
return newResult;
}

private async Async.Task<bool> TryUpdate(Job job, JobResultType resultType, Dictionary<string, double> resultValue) {
private async Async.Task<bool> TryUpdate(Job job, Guid taskId, Guid machineId, DateTime createdAt, double version, string resultType, Dictionary<string, double> resultValue) {
var jobId = job.JobId;
var taskIdMachineIdMetric = string.Concat(taskId, "-", machineId, "-", resultType);

var jobResult = await GetJobResult(jobId);

if (jobResult == null) {
_logTracer.LogInformation("Creating new JobResult for Job {JobId}", jobId);

var entry = new JobResult(JobId: jobId, Project: job.Config.Project, Name: job.Config.Name);
var oldEntry = await GetJobResult(jobId, taskId, machineId, resultType);

jobResult = UpdateResult(entry, resultType, resultValue);

var r = await Insert(jobResult);
if (!r.IsOk) {
throw new InvalidOperationException($"failed to insert job result {jobResult.JobId}");
if (oldEntry == null) {
_logTracer.LogInformation($"attempt to insert new job result {taskId} and taskId+machineId+metricType {taskIdMachineIdMetric}");
var newEntry = new JobResult(JobId: jobId, TaskIdMachineIdMetric: taskIdMachineIdMetric, TaskId: taskId, MachineId: machineId, CreatedAt: createdAt, Project: job.Config.Project, Name: job.Config.Name, resultType, Version: version, resultValue);
var result = await Insert(newEntry);
if (!result.IsOk) {
throw new InvalidOperationException($"failed to insert job result with taskId {taskId} and taskId+machineId+metricType {taskIdMachineIdMetric}");
}
_logTracer.LogInformation("created job result {JobId}", jobResult.JobId);
} else {
_logTracer.LogInformation("Updating existing JobResult entry for Job {JobId}", jobId);

jobResult = UpdateResult(jobResult, resultType, resultValue);
return true;
}

var r = await Update(jobResult);
if (!r.IsOk) {
throw new InvalidOperationException($"failed to insert job result {jobResult.JobId}");
}
_logTracer.LogInformation("updated job result {JobId}", jobResult.JobId);
ResultVoid<(HttpStatusCode Status, string Reason)> r;
switch (resultType) {
case COVERAGE_DATA:
case RUNTIME_STATS:
if (oldEntry.CreatedAt < createdAt) {
oldEntry = oldEntry with { CreatedAt = createdAt, MetricValue = resultValue };
r = await Update(oldEntry);
if (!r.IsOk) {
throw new InvalidOperationException($"failed to replace job result with taskId {taskId} and machineId+metricType {taskIdMachineIdMetric}");
}
} else {
_logTracer.LogInformation($"received an out-of-date metric. skipping.");
}
break;
default:
_logTracer.LogInformation($"attempt to update job result {taskId} and taskId+machineId+metricType {taskIdMachineIdMetric}");
oldEntry.MetricValue["count"]++;
oldEntry = oldEntry with { MetricValue = oldEntry.MetricValue };
r = await Update(oldEntry);
if (!r.IsOk) {
throw new InvalidOperationException($"failed to update job result with taskId {taskId} and machineId+metricType {taskIdMachineIdMetric}");
}
break;
}


return true;

}

public async Async.Task<OneFuzzResultVoid> CreateOrUpdate(Guid jobId, JobResultType resultType, Dictionary<string, double> resultValue) {
public async Async.Task<OneFuzzResultVoid> CreateOrUpdate(Guid jobId, Guid taskId, Guid machineId, DateTime createdAt, double version, string resultType, Dictionary<string, double> resultValue) {

var job = await _context.JobOperations.Get(jobId);
if (job == null) {
Expand All @@ -106,7 +82,7 @@ public async Async.Task<OneFuzzResultVoid> CreateOrUpdate(Guid jobId, JobResultT
_logTracer.LogInformation("attempt to update job result {JobId}", job.JobId);
var policy = Policy.Handle<InvalidOperationException>().WaitAndRetryAsync(50, _ => new TimeSpan(0, 0, 5));
await policy.ExecuteAsync(async () => {
success = await TryUpdate(job, resultType, resultValue);
success = await TryUpdate(job, taskId, machineId, createdAt, version, resultType, resultValue);
_logTracer.LogInformation("attempt {success}", success);
});
return OneFuzzResultVoid.Ok;
Expand Down
2 changes: 2 additions & 0 deletions src/agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion src/agent/onefuzz-result/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ license = "MIT"
[dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
async-trait = "0.1"
chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
"serde"
] }
reqwest = "0.11"
serde = "1.0"
storage-queue = { path = "../storage-queue" }
uuid = { version = "1.4", features = ["serde", "v4"] }
onefuzz-telemetry = { path = "../onefuzz-telemetry" }
log = "0.4"

9 changes: 8 additions & 1 deletion src/agent/onefuzz-result/src/job_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

use anyhow::Result;
use async_trait::async_trait;
use chrono::DateTime;
pub use chrono::Utc;
use onefuzz_telemetry::warn;
use reqwest::Url;
use serde::{self, Deserialize, Serialize};
Expand Down Expand Up @@ -32,6 +34,8 @@ struct JobResult {
job_id: Uuid,
machine_id: Uuid,
machine_name: String,
created_at: DateTime<Utc>,
version: f64,
data: JobResultData,
value: HashMap<String, f64>,
}
Expand Down Expand Up @@ -103,7 +107,8 @@ impl JobResultSender for TaskJobResultClient {
let job_id = self.context.state.job_id;
let machine_id = self.context.state.machine_id;
let machine_name = self.context.state.machine_name.clone();

let created_at = chrono::Utc::now();
let version = 1.0;
let _ = self
.context
.queue_client
Expand All @@ -112,6 +117,8 @@ impl JobResultSender for TaskJobResultClient {
job_id,
machine_id,
machine_name,
created_at,
version,
data,
value,
})
Expand Down
7 changes: 5 additions & 2 deletions src/agent/onefuzz-task/src/tasks/coverage/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ impl CoverageTask {
bail!("input is not specified on the command line or arguments for the target");
}

context.heartbeat.alive();

info!("report initial coverage");
context.report_coverage_stats().await;

context.heartbeat.alive();

for dir in &self.config.readonly_inputs {
debug!("recording coverage for {}", dir.local_path.display());

Expand All @@ -174,6 +174,9 @@ impl CoverageTask {
context.save_and_sync_coverage().await?;
}

info!("report coverage");
context.report_coverage_stats().await;

context.heartbeat.alive();

if let Some(queue) = &self.config.input_queue {
Expand Down

0 comments on commit 55527fb

Please sign in to comment.