Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

testing #3575

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions src/ApiService/ApiService/Functions/Jobs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ private async Task<HttpResponseData> Delete(HttpRequestData req) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "jobs delete");
}

_logTracer.LogInformation("*** Deleting job: {JobId}", request.OkV.JobId);

var jobId = request.OkV.JobId;
var job = await _context.JobOperations.Get(jobId);
if (job is null) {
Expand Down
10 changes: 0 additions & 10 deletions src/ApiService/ApiService/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,16 +309,6 @@ public static void LogPageView(this ILogger logger, string pageName) {
logger.Log(LogLevel.Information, EmptyEventId, pageView, null, (state, exception) => state.ToString() ?? $"Failed to convert pageView {pageView}");
}

/// <param name="logger"></param>
/// <param name="name"></param>
/// <param name="startTime"></param>
/// <param name="duration"></param>
/// <param name="responseCode"></param>
/// <param name="success"></param>
public static void LogRequest(this ILogger logger, string name, DateTimeOffset startTime, TimeSpan duration, string responseCode, bool success) {
var request = new RequestTelemetry(name, startTime, duration, responseCode, success);
logger.Log(LogLevel.Information, EmptyEventId, request, null, (state, exception) => state.ToString() ?? $"Failed to convert request {request}");
}
}


Expand Down
105 changes: 96 additions & 9 deletions src/ApiService/ApiService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
global using System.Linq;
// to avoid collision with Task in model.cs
global using Async = System.Threading.Tasks;
using System.IO;
using System.Text;
using System.Text.Json;
using ApiService.OneFuzzLib.Orm;
using Azure.Core.Serialization;
using Azure.Identity;
using Microsoft.ApplicationInsights.Channel;
using Microsoft.ApplicationInsights.DataContracts;
using Microsoft.ApplicationInsights.DependencyCollector;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Middleware;
using Microsoft.Extensions.Configuration;
Expand All @@ -19,24 +24,24 @@
using Microsoft.Graph;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
using Semver;

namespace Microsoft.OneFuzz.Service;

public class Program {

/// <summary>
///
///
/// </summary>
public class LoggingMiddleware : IFunctionsWorkerMiddleware {
/// <summary>
///
///
/// </summary>
/// <param name="context"></param>
/// <param name="next"></param>
/// <returns></returns>
public async Async.Task Invoke(FunctionContext context, FunctionExecutionDelegate next) {
//https://learn.microsoft.com/en-us/azure/azure-monitor/app/custom-operations-tracking#applicationinsights-operations-vs-systemdiagnosticsactivity
using var activity = OneFuzzLogger.Activity;

// let azure functions identify the headers for us
if (context.TraceContext is not null && !string.IsNullOrEmpty(context.TraceContext.TraceParent)) {
activity.TraceStateString = context.TraceContext.TraceState;
Expand Down Expand Up @@ -120,12 +125,87 @@ public async Async.Task Invoke(FunctionContext context, FunctionExecutionDelegat
}
}

// public interface IAsyncTelemetryInitializer {
// public Async.Task Initialize(ITelemetry telemetry);
// }

public interface IOnefuzzRequestStore {
void AddRequestTelemetry(RequestTelemetry telemetry);
List<RequestTelemetry> RequestTelemetries { get; }
}

public class OnefuzzRequestStore : IOnefuzzRequestStore {
public List<RequestTelemetry> RequestTelemetries { get; } = new();

public void AddRequestTelemetry(RequestTelemetry telemetry) {
RequestTelemetries.Add(telemetry);

}
}


public class OnefuzzTelemetryInitializer : ITelemetryInitializer {
IServiceProvider services;


public OnefuzzTelemetryInitializer(IServiceProvider services) {

this.services = services;
}

public void Initialize(ITelemetry telemetry) {
var requestTelemetry = telemetry as RequestTelemetry;

if (requestTelemetry == null)
return;

var requestStore = services.GetRequiredService<IOnefuzzRequestStore>();

requestStore.AddRequestTelemetry(requestTelemetry);
}
}

public class RequestBodyLogger : IFunctionsWorkerMiddleware {

public async Async.Task Invoke(FunctionContext context, FunctionExecutionDelegate next) {

var requestStore = context.InstanceServices.GetRequiredService<IOnefuzzRequestStore>();

var requestTelemetry = requestStore.RequestTelemetries.FirstOrDefault();

if (requestTelemetry != null) {
var requestData = await context.GetHttpRequestDataAsync();
var body = requestData?.Body;
if (body is { CanRead: true, CanSeek: true }) {
const int MAX_BODY_SIZE = 4096;
var bufferSize = Math.Max(MAX_BODY_SIZE, body.Length);
var buffer = new byte[bufferSize];
var count = body.Read(buffer);
_ = body.Seek(0, SeekOrigin.Begin);
var bodyText = Encoding.UTF8.GetString(buffer);

// var tc = context.InstanceServices.GetServices<TelemetryClient>().FirstOrDefault() ?? throw new Exception("No Telemtry client");


// var requestTelemetry = context.Features.Get<RequestTelemetry>() ??
// throw new Exception("No request telemetry");

// var requestTelemetry = context.InstanceServices.Get<ITelemetryInitializer>() ??
// throw new Exception("No request telemetry");
requestTelemetry.Properties.Add("RequestBody", bodyText);
}

await next(context);
}
}
}

//Move out expensive resources into separate class, and add those as Singleton
// ArmClient, Table Client(s), Queue Client(s), HttpClient, etc.
public static async Async.Task Main() {
var configuration = new ServiceConfiguration();


using var host =
new HostBuilder()

Expand Down Expand Up @@ -198,6 +278,7 @@ public static async Async.Task Main() {
.AddScoped<INodeMessageOperations, NodeMessageOperations>()
.AddScoped<ISubnet, Subnet>()
.AddScoped<IAutoScaleOperations, AutoScaleOperations>()
.AddScoped<IOnefuzzRequestStore, OnefuzzRequestStore>()
.AddSingleton<GraphServiceClient>(new GraphServiceClient(new DefaultAzureCredential()))
.AddSingleton<DependencyTrackingTelemetryModule>()
.AddSingleton<ICreds, Creds>()
Expand All @@ -211,13 +292,13 @@ public static async Async.Task Main() {
_ = services.AddFeatureManagement();
})
.ConfigureLogging(loggingBuilder => {

var appInsightsConnectionString = $"InstrumentationKey={configuration.ApplicationInsightsInstrumentationKey}";
var tc = new ApplicationInsights.TelemetryClient(new ApplicationInsights.Extensibility.TelemetryConfiguration() { ConnectionString = appInsightsConnectionString });
loggingBuilder.Services.TryAddEnumerable(ServiceDescriptor.Singleton<ILoggerProvider, OneFuzzLoggerProvider>(
x => {
var appInsightsConnectionString = $"InstrumentationKey={configuration.ApplicationInsightsInstrumentationKey}";
var tc = new ApplicationInsights.TelemetryClient(new ApplicationInsights.Extensibility.TelemetryConfiguration() { ConnectionString = appInsightsConnectionString });
return new OneFuzzLoggerProvider(new List<TelemetryConfig>() { new TelemetryConfig(tc) });
}
));
x => new OneFuzzLoggerProvider(new List<TelemetryConfig>() { new TelemetryConfig(tc) })));

// loggingBuilder.Services.AddScoped(typeof(Telemetry))
})
.ConfigureFunctionsWorkerDefaults(builder => {
builder.UseMiddleware<LoggingMiddleware>();
Expand All @@ -227,15 +308,21 @@ public static async Async.Task Main() {

//this is a must, to tell the host that worker logging is done by us
builder.Services.Configure<WorkerOptions>(workerOptions => workerOptions.Capabilities["WorkerApplicationInsightsLoggingEnabled"] = bool.TrueString);
builder.Services.TryAddEnumerable(ServiceDescriptor.Singleton<ITelemetryInitializer, OnefuzzTelemetryInitializer>());
builder.AddApplicationInsights(options => {
#if DEBUG
options.DeveloperMode = true;
#else
options.DeveloperMode = false;
#endif
options.EnableDependencyTrackingTelemetryModule = true;

});
builder.AddApplicationInsightsLogger();
// builder.UseMiddleware<RequestBodyLogger>();

// builder.Services.AddSingleton<IAsyncTelemetryInitializer, AsyncTelemetryInitializer>();


})
.Build();
Expand Down
1 change: 1 addition & 0 deletions src/ApiService/ApiService/onefuzzlib/JobOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public async Async.Task<Job> Init(Job job) {
}

public async Async.Task<Job> Stopping(Job job) {
_logTracer.LogInformation("Stopping job: {JobId} {StackTrace}", job.JobId, Environment.StackTrace);
job = job with { State = JobState.Stopping };
var tasks = await _context.TaskOperations.QueryAsync(Query.PartitionKey(job.JobId.ToString())).ToListAsync();
var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped);
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-task/src/tasks/report/dotnet/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl AsanProcessor {
let mut args = vec![target_exe];
args.extend(self.config.target_options.clone());

let expand = self.config.get_expand();
let expand = self.config.get_expand().input_path(input);

let expanded_args = expand.evaluate(&args)?;

Expand Down
12 changes: 10 additions & 2 deletions src/cli/onefuzz/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ def wait(func: Callable[[], Tuple[bool, str, A]], frequency: float = 1.0) -> A:
Provides user feedback via a spinner if stdout is a TTY.
"""

isatty = sys.stdout.isatty()
isatty = True
frames = ["-", "\\", "|", "/"]
waited = False
last_message = None
Expand All @@ -623,20 +623,28 @@ def wait(func: Callable[[], Tuple[bool, str, A]], frequency: float = 1.0) -> A:
if isatty:
if last_message:
if last_message == message:
LOGGER.info("\b" * (len(last_message) + 2))
sys.stdout.write("\b" * (len(last_message) + 2))
else:
LOGGER.info("\n")
sys.stdout.write("\n")
LOGGER.info("%s %s" % (frames[0], message))
sys.stdout.write("%s %s" % (frames[0], message))
sys.stdout.flush()
elif last_message != message:
print(message, flush=True)
LOGGER.info(message)

last_message = message
waited = True
time.sleep(frequency)
frames.sort(key=frames[0].__eq__)
except Exception as err:
LOGGER.info(f"*** error in wait : {err}")
print(f"*** error in wait : {err}", flush=True)
raise err
finally:
if waited and isatty:
print(flush=True)

LOGGER.info(f"wait result: {result}")
return result[2]
19 changes: 16 additions & 3 deletions src/integration-tests/integration-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

import requests
import yaml
from onefuzztypes.enums import OS, ContainerType, ScalesetState, TaskState, VmState
from onefuzztypes.enums import OS, ContainerType, ErrorCode, ScalesetState, TaskState, VmState
from onefuzztypes.models import Job, Pool, Repro, Scaleset, Task
from onefuzztypes.primitives import Container, Directory, File, PoolName, Region
from pydantic import BaseModel, Field
Expand Down Expand Up @@ -787,6 +787,9 @@ def check_task(

# check if the task itself has an error
if task.error is not None:
if task.error == ErrorCode.TASK_CANCELLED:
return TaskTestState.stopped

self.logger.error(
"task failed: %s - %s (%s) - %s",
job.config.name,
Expand Down Expand Up @@ -861,6 +864,7 @@ def check_jobs_impl() -> Tuple[bool, str, bool]:
job_task_states: Dict[UUID, Set[TaskTestState]] = {}

if datetime.datetime.utcnow() - start > timeout:
self.logger.info("timed out while checking jobs")
return (True, "timed out while checking jobs", False)

for job_id in check_containers:
Expand Down Expand Up @@ -944,7 +948,11 @@ def check_jobs_impl() -> Tuple[bool, str, bool]:

for job_id in to_remove:
if stop_on_complete_check:
self.stop_job(jobs[job_id])
try:
self.stop_job(jobs[job_id])
except Exception as err:
self.logger.error("unable to stop job: %s", err)
return (True, str(err), False)
del jobs[job_id]

msg = "waiting on: %s" % ",".join(
Expand Down Expand Up @@ -1311,7 +1319,7 @@ def check_jobs(
job_ids=job_ids,
)
if not result:
raise Exception("jobs failed")
raise Exception("jobs failed !!!!!")

def setup(
self,
Expand Down Expand Up @@ -1432,6 +1440,11 @@ def check_results(
test_id: UUID,
job_ids: List[UUID] = [],
) -> None:
if job_ids:
self.logger.info(f"checking results for jobs: {job_ids}", )
else:
self.logger.info("checking results for all jobs")

self.check_jobs(
test_id,
endpoint=endpoint,
Expand Down
Loading