Skip to content

Commit

Permalink
✨ Add reporting jobs for scoring queries MRNs. Keep the current repor…
Browse files Browse the repository at this point in the history
…ting jobs by code id as well so for a brief amount we will have twice the amount of reporting jobs.

Signed-off-by: Preslav <preslav@mondoo.com>
  • Loading branch information
preslavgerchev committed Feb 23, 2024
1 parent 4df2afd commit 12edfec
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 66 deletions.
4 changes: 3 additions & 1 deletion cli/reporter/print_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,9 @@ func (r *defaultReporter) printAssetQueries(resolved *policy.ResolvedPolicy, rep
for id, score := range foundChecks {
query, ok := queries[id]
if !ok {
r.out.Write([]byte("Couldn't find any queries for incoming value for " + id))
// FIXME SOON: temporarily disable this as we're reporting by both mrn and code id.
// FIXME SOON: enable once we report only by mrn
// r.out.Write([]byte("Couldn't find any queries for incoming value for " + id))
continue
}

Expand Down
106 changes: 66 additions & 40 deletions policy/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,39 +1003,56 @@ func (s *LocalServices) policyGroupToJobs(ctx context.Context, group *PolicyGrou
}

func (cache *policyResolverCache) addCheckJob(ctx context.Context, check *explorer.Mquery, impact *explorer.Impact, ownerJob *ReportingJob) {
uuid := cache.global.relativeChecksum(check.Checksum)
queryJob := cache.global.reportingJobsByUUID[uuid]
// FIXME SOON: we want to add a RJ that goes by the check's mrn and check's codeid
// we use the relative checksums as uuids and the RJ's ids are either the check's mrn or codeid
uuidCodeId := cache.global.relativeChecksum(check.Checksum)
uuidMrn := cache.global.relativeChecksum(check.Mrn)

uuidToRjMap := map[string]struct {
QrId string
Rj *ReportingJob
}{
uuidCodeId: {check.CodeId, cache.global.reportingJobsByUUID[uuidCodeId]},
uuidMrn: {check.Mrn, cache.global.reportingJobsByUUID[uuidMrn]},
}

cache.global.codeIdToMrn[check.CodeId] = append(cache.global.codeIdToMrn[check.CodeId], check.Mrn)

for uuid, rjAndId := range uuidToRjMap {
rj := rjAndId.Rj
qrId := rjAndId.QrId

if rj == nil {
rj = &ReportingJob{
Uuid: uuid,
QrId: qrId,
ChildJobs: map[string]*explorer.Impact{},
Datapoints: map[string]bool{},
Type: ReportingJob_CHECK,
}

if queryJob == nil {
queryJob = &ReportingJob{
Uuid: uuid,
QrId: check.Mrn,
ChildJobs: map[string]*explorer.Impact{},
Datapoints: map[string]bool{},
Type: ReportingJob_CHECK,
cache.global.reportingJobsByUUID[uuid] = rj
cache.global.reportingJobsByMsum[check.Checksum] = append(cache.global.reportingJobsByMsum[check.Checksum], rj)
cache.childJobsByMrn[check.Mrn] = append(cache.childJobsByMrn[check.Mrn], rj)
}
cache.global.codeIdToMrn[check.CodeId] = append(cache.global.codeIdToMrn[check.CodeId], check.Mrn)
cache.global.reportingJobsByUUID[uuid] = queryJob
cache.global.reportingJobsByMsum[check.Checksum] = append(cache.global.reportingJobsByMsum[check.Checksum], queryJob)
cache.childJobsByMrn[check.Mrn] = append(cache.childJobsByMrn[check.Mrn], queryJob)
}

if ownerJob.ChildJobs[queryJob.Uuid] == nil {
ownerJob.ChildJobs[queryJob.Uuid] = impact
}
if ownerJob.ChildJobs[rj.Uuid] == nil {
ownerJob.ChildJobs[rj.Uuid] = impact
}

// local aspects for the resolved policy
queryJob.Notify = append(queryJob.Notify, ownerJob.Uuid)
// local aspects for the resolved policy
rj.Notify = append(rj.Notify, ownerJob.Uuid)

if len(check.Variants) != 0 {
err := cache.addCheckJobVariants(ctx, check, queryJob)
if err != nil {
log.Error().Err(err).Str("checkMrn", check.Mrn).Msg("failed to add data query variants")
if len(check.Variants) != 0 {
err := cache.addCheckJobVariants(ctx, check, rj)
if err != nil {
log.Error().Err(err).Str("checkMrn", check.Mrn).Msg("failed to add data query variants")
}
} else {
// we set a placeholder for the execution query, just to indicate it will be added
cache.global.executionQueries[check.Checksum] = nil
cache.global.queriesByMsum[check.Checksum] = check
}
} else {
// we set a placeholder for the execution query, just to indicate it will be added
cache.global.executionQueries[check.Checksum] = nil
cache.global.queriesByMsum[check.Checksum] = check
}
}

Expand Down Expand Up @@ -1075,6 +1092,7 @@ func (cache *policyResolverCache) addCheckJobVariants(ctx context.Context, query
func (cache *policyResolverCache) addDataQueryJob(ctx context.Context, query *explorer.Mquery, ownerJob *ReportingJob) {
uuid := cache.global.relativeChecksum(query.Mrn)
queryJob := cache.global.reportingJobsByUUID[uuid]
cache.global.codeIdToMrn[query.CodeId] = append(cache.global.codeIdToMrn[query.CodeId], query.Mrn)

if queryJob == nil {
queryJob = &ReportingJob{
Expand All @@ -1086,7 +1104,6 @@ func (cache *policyResolverCache) addDataQueryJob(ctx context.Context, query *ex
// FIXME: DEPRECATED, remove in v10.0 vv
DeprecatedV8IsData: true,
}
cache.global.codeIdToMrn[query.CodeId] = append(cache.global.codeIdToMrn[query.CodeId], query.Mrn)
cache.global.reportingJobsByUUID[uuid] = queryJob
cache.global.reportingJobsByMsum[query.Checksum] = append(cache.global.reportingJobsByMsum[query.Checksum], queryJob)
cache.childJobsByMrn[query.Mrn] = append(cache.childJobsByMrn[query.Mrn], queryJob)
Expand Down Expand Up @@ -1229,6 +1246,9 @@ func (s *LocalServices) jobsToQueries(ctx context.Context, policyMrn string, cac
}
cache.executionQueries[checksum] = executionQuery
executionJob.Queries[prop.CodeId] = executionQuery
// TODO: unsure if this needs to go in here.. some code uses the executionJob.Queries for different assertions
// so maybe we want it in here as well?
executionJob.Queries[prop.Mrn] = executionQuery

propTypes[name] = &llx.Primitive{Type: prop.Type}
propToChecksums[name] = dataChecksum
Expand Down Expand Up @@ -1264,7 +1284,9 @@ func (s *LocalServices) jobsToQueries(ctx context.Context, policyMrn string, cac

cache.executionQueries[checksum] = executionQuery
executionJob.Queries[codeID] = executionQuery

// TODO: unsure if this needs to go in here.. some code uses the executionJob.Queries for different assertions
// so maybe we want it in here as well?
executionJob.Queries[query.Mrn] = executionQuery
// Scoring+Data Queries handling
reportingjobs, ok := cache.reportingJobsByMsum[query.Checksum]
if !ok {
Expand All @@ -1281,8 +1303,6 @@ func (s *LocalServices) jobsToQueries(ctx context.Context, policyMrn string, cac
rj := reportingjobs[i]
// (2) Scoring Queries handling
if !isDataQuery {
rj.QrId = codeID

if query.Impact != nil {
for _, parentID := range rj.Notify {
parentJob, ok := collectorJob.ReportingJobs[parentID]
Expand All @@ -1294,10 +1314,10 @@ func (s *LocalServices) jobsToQueries(ctx context.Context, policyMrn string, cac
}
}

arr, ok := collectorJob.ReportingQueries[codeID]
arr, ok := collectorJob.ReportingQueries[rj.QrId]
if !ok {
arr = &StringArray{}
collectorJob.ReportingQueries[codeID] = arr
collectorJob.ReportingQueries[rj.QrId] = arr
}
arr.Items = append(arr.Items, rj.Uuid)

Expand Down Expand Up @@ -1562,16 +1582,22 @@ func (s *LocalServices) jobsToControls(cache *frameworkResolverCache, framework

for _, queryMrn := range queryMrns {
uuid := cache.relativeChecksum(queryMrn)
queryJob := &ReportingJob{
Uuid: uuid,
QrId: queryMrn,
ChildJobs: map[string]*explorer.Impact{},
Type: ReportingJob_CHECK,
queryJob := job.ReportingJobs[uuid]
// now that we also have RJs for mrns, we should always have a queryJob
// fallback to creating a new one if we don't find it just in case
if queryJob == nil {
log.Debug().Str("queryMrn", queryMrn).Msg("jobsToControl> did not find a query job")
queryJob = &ReportingJob{
Uuid: uuid,
QrId: queryMrn,
ChildJobs: map[string]*explorer.Impact{},
Type: ReportingJob_CHECK,
}
}
nuJobs[uuid] = queryJob

queryJob.ChildJobs[rj.Uuid] = nil
rj.Notify = append(rj.Notify, queryJob.Uuid)
nuJobs[queryJob.Uuid] = queryJob

continue
}

Expand Down
74 changes: 49 additions & 25 deletions policy/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,25 @@ policies:
})
require.NoError(t, err)
require.NotNil(t, rp)
require.Len(t, rp.ExecutionJob.Queries, 3)
require.Len(t, rp.ExecutionJob.Queries, 6)
require.Len(t, rp.Filters, 1)
require.Len(t, rp.CollectorJob.ReportingJobs, 3)
require.Len(t, rp.CollectorJob.ReportingJobs, 4)

qrIdToRj := map[string]*policy.ReportingJob{}
for _, rj := range rp.CollectorJob.ReportingJobs {
qrIdToRj[rj.QrId] = rj
}
// scoring queries report by code id
// FIXME SOON: scoring queries report by BOTH code id and MRN temporarily
// FIXME SOON: we want to remove the code id reporting soon
require.NotNil(t, qrIdToRj[b.Queries[1].CodeId])
require.NotNil(t, qrIdToRj[b.Queries[1].Mrn])

// data queries report by mrn
require.NotNil(t, qrIdToRj[queryMrn("query1")])

require.Len(t, qrIdToRj[b.Queries[1].CodeId].Datapoints, 3)
require.Len(t, qrIdToRj[b.Queries[1].Mrn].Datapoints, 3)

require.Len(t, qrIdToRj[queryMrn("query1")].Datapoints, 1)
})

Expand Down Expand Up @@ -231,7 +236,18 @@ policies:
})
require.NoError(t, err)
require.NotNil(t, rp)
require.Len(t, rp.CollectorJob.ReportingJobs, 5)
require.Len(t, rp.CollectorJob.ReportingJobs, 6)

qrIdToRj := map[string]*policy.ReportingJob{}
for _, rj := range rp.CollectorJob.ReportingJobs {
qrIdToRj[rj.QrId] = rj
}

// FIXME SOON: scoring queries report by BOTH code id and MRN temporarily
// FIXME SOON: we want to remove the code id reporting soon
require.NotNil(t, qrIdToRj[b.Queries[1].CodeId])
require.NotNil(t, qrIdToRj[b.Queries[1].Mrn])

ignoreJob := rp.CollectorJob.ReportingJobs["8Sis0SvMbtI="]
require.NotNil(t, ignoreJob)
childJob := ignoreJob.ChildJobs["YCeU4NjbMe0="]
Expand Down Expand Up @@ -307,12 +323,7 @@ policies:

require.NoError(t, err)
require.NotNil(t, rp)
require.Len(t, rp.CollectorJob.ReportingJobs, 3)

mrnToQueryId := map[string]string{}
for _, q := range bundleMap.Queries {
mrnToQueryId[q.Mrn] = q.CodeId
}
require.Len(t, rp.CollectorJob.ReportingJobs, 4)

rjTester := frameworkReportingJobTester{
t: t,
Expand All @@ -328,13 +339,21 @@ policies:
rjTester.queryIdToReportingJob[rj.QrId] = rj
}

queryRj := rjTester.queryIdToReportingJob[mrnToQueryId[queryMrn("check1")]]
// we ensure that even though ignored, theres an RJ for the query
require.NotNil(t, queryRj)
parent := queryRj.Notify[0]
parentRj := rjTester.rjIdToReportingJob[parent]
require.NotNil(t, parentRj)
require.Equal(t, explorer.ScoringSystem_IGNORE_SCORE, parentRj.ChildJobs[queryRj.Uuid].Scoring)
query := bundleMap.Queries[queryMrn("check1")]

// FIXME SOON: scoring queries report by BOTH code id and MRN temporarily
// FIXME SOON: we want to remove the code id reporting soon
queryRj1 := rjTester.queryIdToReportingJob[query.CodeId]
queryRj2 := rjTester.queryIdToReportingJob[query.Mrn]
queries := []*policy.ReportingJob{queryRj1, queryRj2}
for _, queryRj := range queries {
// we ensure that even though ignored, theres an RJ for the query
require.NotNil(t, queryRj)
parent := queryRj.Notify[0]
parentRj := rjTester.rjIdToReportingJob[parent]
require.NotNil(t, parentRj)
require.Equal(t, explorer.ScoringSystem_IGNORE_SCORE, parentRj.ChildJobs[queryRj.Uuid].Scoring)
}
}

func TestResolve_ExpiredGroups(t *testing.T) {
Expand Down Expand Up @@ -379,7 +398,7 @@ policies:
})
require.NoError(t, err)
require.NotNil(t, rp)
require.Len(t, rp.ExecutionJob.Queries, 2)
require.Len(t, rp.ExecutionJob.Queries, 4)
})

t.Run("resolve with end dates", func(t *testing.T) {
Expand Down Expand Up @@ -418,7 +437,7 @@ policies:
})
require.NoError(t, err)
require.NotNil(t, rp)
require.Len(t, rp.ExecutionJob.Queries, 1)
require.Len(t, rp.ExecutionJob.Queries, 2)

// Set the end date of the group to the past. This group deactivates a check,
// but it should not be taken into account because it is expired
Expand All @@ -439,7 +458,7 @@ policies:
})
require.NoError(t, err)
require.NotNil(t, rp)
require.Len(t, rp.ExecutionJob.Queries, 2)
require.Len(t, rp.ExecutionJob.Queries, 4)
})
}

Expand Down Expand Up @@ -571,7 +590,7 @@ framework_maps:
requireUnique(t, rj.Notify)
}

require.Len(t, rp.ExecutionJob.Queries, 5)
require.Len(t, rp.ExecutionJob.Queries, 10)

rjTester := frameworkReportingJobTester{
t: t,
Expand Down Expand Up @@ -710,7 +729,7 @@ framework_maps:
requireUnique(t, rj.Notify)
}

require.Len(t, rp.ExecutionJob.Queries, 1)
require.Len(t, rp.ExecutionJob.Queries, 2)

rjTester := frameworkReportingJobTester{
t: t,
Expand Down Expand Up @@ -823,7 +842,7 @@ framework_maps:
requireUnique(t, rj.Notify)
}

require.Len(t, rp.ExecutionJob.Queries, 1)
require.Len(t, rp.ExecutionJob.Queries, 2)

rjTester := frameworkReportingJobTester{
t: t,
Expand Down Expand Up @@ -1352,12 +1371,18 @@ queries:
require.NoError(t, err)
require.NotNil(t, rp)

require.Len(t, rp.CollectorJob.ReportingJobs, 5)
require.Len(t, rp.CollectorJob.ReportingJobs, 6)

qrIdToRj := map[string]*policy.ReportingJob{}
for _, rj := range rp.CollectorJob.ReportingJobs {
qrIdToRj[rj.QrId] = rj
}

// FIXME SOON: scoring queries report by BOTH code id and MRN temporarily
// FIXME SOON: we want to remove the code id reporting soon
require.NotNil(t, qrIdToRj[b.Queries[0].Mrn])
require.NotNil(t, qrIdToRj[b.Queries[0].CodeId])

require.NotNil(t, qrIdToRj[policyMrn("policy1")])
require.NotNil(t, qrIdToRj[policyMrn("pack1")])
require.Nil(t, qrIdToRj[policyMrn("policy2")])
Expand Down Expand Up @@ -1403,5 +1428,4 @@ queries:
qrIdToRj[rj.QrId] = rj
}
require.NotNil(t, qrIdToRj["root"])

}

0 comments on commit 12edfec

Please sign in to comment.