Skip to content

Commit

Permalink
Add second trial run to OCR2 feature test
Browse files Browse the repository at this point in the history
Currently failing, even when ChainReader disabled. Initial values
reported by data sources are all 10. Then later it changes to all
20. LatestValue shows up correctly, but reading from contract the
median never changes from its original value.
  • Loading branch information
reductionista committed Dec 19, 2023
1 parent 9d315cb commit 1530c04
Showing 1 changed file with 89 additions and 63 deletions.
152 changes: 89 additions & 63 deletions core/internal/features/ocr2/features_ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,23 @@ fromBlock = %d
// latestAnswer:30
var metaLock sync.Mutex
expectedMeta := map[string]struct{}{
"0": {}, "10": {}, "20": {}, "30": {}, "40": {}, "50": {}, "60": {}, "70": {},
"0": {}, "10": {}, "20": {}, "30": {},
}
returnData := int(10)
for i := 0; i < 4; i++ {
s := i
require.NoError(t, apps[i].Start(testutils.Context(t)))

// API speed is > observation timeout set in ContractSetConfigArgsForIntegrationTest
slowServers[i] = httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
time.Sleep(5 * time.Second)
var result string
metaLock.Lock()
result = fmt.Sprintf(`{"data":%d}`, returnData)
metaLock.Unlock()
res.WriteHeader(http.StatusOK)
_, err := res.Write([]byte(`{"data":10}`))
t.Logf("Slow Bridge %d returning data:10", s)
_, err := res.Write([]byte(result))
require.NoError(t, err)
}))
t.Cleanup(slowServers[s].Close)
Expand All @@ -283,13 +289,18 @@ fromBlock = %d
require.NoError(t, err)
var m bridges.BridgeMetaDataJSON
require.NoError(t, json.Unmarshal(b, &m))
var result string
metaLock.Lock()
result = fmt.Sprintf(`{"data":%d}`, returnData)
metaLock.Unlock()
if m.Meta.LatestAnswer != nil && m.Meta.UpdatedAt != nil {
t.Logf("Bridge %d deleting %s, from request body: %s", s, m.Meta.LatestAnswer, b)
metaLock.Lock()
delete(expectedMeta, m.Meta.LatestAnswer.String())
metaLock.Unlock()
}
res.WriteHeader(http.StatusOK)
_, err = res.Write([]byte(`{"data":10}`))
_, err = res.Write([]byte(result))
require.NoError(t, err)
}))
t.Cleanup(servers[s].Close)
Expand Down Expand Up @@ -359,73 +370,88 @@ juelsPerFeeCoinSource = """
jids = append(jids, ocrJob.ID)
}

// Assert that all the OCR jobs get a run with valid values eventually.
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
ic := i
wg.Add(1)
go func() {
defer wg.Done()
// Want at least 2 runs so we see all the metadata.
pr := cltest.WaitForPipelineComplete(t, ic, jids[ic], 4, 14, apps[ic].JobORM(), 2*time.Minute, 5*time.Second)
jb, err := pr[0].Outputs.MarshalJSON()
for trial := 0; trial < 2; trial++ {
var retVal int

metaLock.Lock()
returnData = 10 * (trial + 1)
retVal = returnData
for i := 0; i < 4; i++ {
expectedMeta[fmt.Sprintf("%d", returnData*i)] = struct{}{}
}
metaLock.Unlock()

// Assert that all the OCR jobs get a run with valid values eventually.
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
ic := i
wg.Add(1)
go func() {
defer wg.Done()
completedRuns, err := apps[ic].JobORM().FindPipelineRunIDsByJobID(jids[ic], 0, 1000)
require.NoError(t, err)
// Want at least 2 runs so we see all the metadata.
pr := cltest.WaitForPipelineComplete(t, ic, jids[ic], len(completedRuns)+2, 7, apps[ic].JobORM(), 2*time.Minute, 5*time.Second)
jb, err := pr[0].Outputs.MarshalJSON()
require.NoError(t, err)
assert.Equal(t, []byte(fmt.Sprintf("[\"%d\"]", retVal*ic)), jb, "pr[0] %+v pr[1] %+v", pr[0], pr[1])
require.NoError(t, err)
}()
}
wg.Wait()

// Trail #1: 4 oracles reporting 0, 10, 20, 30. Answer should be 20 (results[4/2]).
// Trial #2: 4 oracles reporting 0, 20, 40, 60. Answer should be 40 (results[4/2]).
gomega.NewGomegaWithT(t).Eventually(func() string {
answer, err := ocrContract.LatestAnswer(nil)
require.NoError(t, err)
assert.Equal(t, []byte(fmt.Sprintf("[\"%d\"]", 10*ic)), jb, "pr[0] %+v pr[1] %+v", pr[0], pr[1])
return answer.String()
}, 1*time.Minute, 200*time.Millisecond).Should(gomega.Equal(fmt.Sprintf("%d", 2*retVal)))

for _, app := range apps {
jobs, _, err := app.JobORM().FindJobs(0, 1000)
require.NoError(t, err)
}()
}
wg.Wait()
// No spec errors
for _, j := range jobs {
ignore := 0
for i := range j.JobSpecErrors {
// Non-fatal timing related error, ignore for testing.
if strings.Contains(j.JobSpecErrors[i].Description, "leader's phase conflicts tGrace timeout") {
ignore++
}
}
require.Len(t, j.JobSpecErrors, ignore)
}
}
em := map[string]struct{}{}
metaLock.Lock()
maps.Copy(em, expectedMeta)
metaLock.Unlock()
assert.Len(t, em, 0, "expected metadata %v", em)

// 4 oracles reporting 0, 10, 20, 30. Answer should be 20 (results[4/2]).
gomega.NewGomegaWithT(t).Eventually(func() string {
answer, err := ocrContract.LatestAnswer(nil)
// Assert we can read the latest config digest and epoch after a report has been submitted.
contractABI, err := abi.JSON(strings.NewReader(ocr2aggregator.OCR2AggregatorABI))
require.NoError(t, err)
return answer.String()
}, 1*time.Minute, 200*time.Millisecond).Should(gomega.Equal("60"))

for _, app := range apps {
jobs, _, err := app.JobORM().FindJobs(0, 1000)
apps[0].GetRelayers().LegacyEVMChains().Slice()
ct, err := evm.NewOCRContractTransmitter(ocrContractAddress, apps[0].GetRelayers().LegacyEVMChains().Slice()[0].Client(), contractABI, nil, apps[0].GetRelayers().LegacyEVMChains().Slice()[0].LogPoller(), lggr, nil)
require.NoError(t, err)
// No spec errors
for _, j := range jobs {
ignore := 0
for i := range j.JobSpecErrors {
// Non-fatal timing related error, ignore for testing.
if strings.Contains(j.JobSpecErrors[i].Description, "leader's phase conflicts tGrace timeout") {
ignore++
}
}
require.Len(t, j.JobSpecErrors, ignore)
configDigest, epoch, err := ct.LatestConfigDigestAndEpoch(testutils.Context(t))
require.NoError(t, err)
details, err := ocrContract.LatestConfigDetails(nil)
require.NoError(t, err)
assert.True(t, bytes.Equal(configDigest[:], details.ConfigDigest[:]))
digestAndEpoch, err := ocrContract.LatestConfigDigestAndEpoch(nil)
require.NoError(t, err)
assert.Equal(t, digestAndEpoch.Epoch, epoch)
latestTransmissionDetails, err := ocrContract.LatestTransmissionDetails(nil)
require.NoError(t, err)
assert.Equal(t, latestTransmissionDetails.LatestAnswer, big.NewInt(20))
latestRoundRequested, err := ocrContract.FilterRoundRequested(&bind.FilterOpts{Start: 0, End: nil}, []common.Address{{}})
require.NoError(t, err)
if latestRoundRequested.Next() { // Shouldn't this come back non-nil?
assert.NotNil(t, latestRoundRequested.Event)
}
}
em := map[string]struct{}{}
metaLock.Lock()
maps.Copy(em, expectedMeta)
metaLock.Unlock()
assert.Len(t, em, 0, "expected metadata %v", em)

// Assert we can read the latest config digest and epoch after a report has been submitted.
contractABI, err := abi.JSON(strings.NewReader(ocr2aggregator.OCR2AggregatorABI))
require.NoError(t, err)
apps[0].GetRelayers().LegacyEVMChains().Slice()
ct, err := evm.NewOCRContractTransmitter(ocrContractAddress, apps[0].GetRelayers().LegacyEVMChains().Slice()[0].Client(), contractABI, nil, apps[0].GetRelayers().LegacyEVMChains().Slice()[0].LogPoller(), lggr, nil)
require.NoError(t, err)
configDigest, epoch, err := ct.LatestConfigDigestAndEpoch(testutils.Context(t))
require.NoError(t, err)
details, err := ocrContract.LatestConfigDetails(nil)
require.NoError(t, err)
assert.True(t, bytes.Equal(configDigest[:], details.ConfigDigest[:]))
digestAndEpoch, err := ocrContract.LatestConfigDigestAndEpoch(nil)
require.NoError(t, err)
assert.Equal(t, digestAndEpoch.Epoch, epoch)
latestTransmissionDetails, err := ocrContract.LatestTransmissionDetails(nil)
require.NoError(t, err)
assert.Equal(t, latestTransmissionDetails.LatestAnswer, big.NewInt(20))
latestRoundRequested, err := ocrContract.FilterRoundRequested(&bind.FilterOpts{Start: 0, End: nil}, []common.Address{{}})
require.NoError(t, err)
if latestRoundRequested.Next() { // Shouldn't this come back non-nil?
assert.NotNil(t, latestRoundRequested.Event)
}
})
}
}
Expand Down

0 comments on commit 1530c04

Please sign in to comment.