diff --git a/.env copy b/.env copy deleted file mode 100644 index 33cc4460..00000000 --- a/.env copy +++ /dev/null @@ -1,17 +0,0 @@ -# A web3 endpoint provider -# WEB3_PROVIDERS=https://eth.llamarpc.com,https://eth-goerli.api.onfinality.io/public -WEB3_PROVIDERS=https://mainnet.infura.io/v3/1a6832aff39441b4b5aced8fa6f8d696,https://goerli.infura.io/v3/b5825b1fbf1e4a828cc385de83b9dc7e - -# Internal port for the service (80 and 443 are used by traefik) -PORT=7788 - -# Domain name for TLS -# DOMAIN=your.own.domain.xyz -DOMAIN=localhost - -# Log level (info, debug, warning, error) -LOGLEVEL=debug - -# IPFS connect key for discovering nodes -# CONNECT_KEY=yourIPFSConnectKey -CONNECT_KEY=census3key \ No newline at end of file diff --git a/queue/queue_test.go b/queue/queue_test.go index f0230a3d..a4abe608 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -1,8 +1,13 @@ package queue import ( + "context" + "flag" "fmt" + "sync" + "sync/atomic" "testing" + "time" qt "github.com/frankban/quicktest" ) @@ -44,3 +49,129 @@ func TestUpdateDone(t *testing.T) { c.Assert(err, qt.IsNil) c.Assert(done, qt.IsFalse) } + +func TestQueueDataRace(t *testing.T) { + nConsumers := flag.Int("consumers", 100, "number of processes consumers") + flag.Parse() + c := qt.New(t) + // initialize some variables to store the results of the test and a queue + var nProcesses int64 + queueItemIdChan := make(chan string) + q := NewBackgroundQueue() + // set a context with the test deadline, decreaseing by 5 seconds to ensure + // a gap to check test results + deadline, ok := t.Deadline() + c.Assert(ok, qt.IsTrue) + ctx, cancel := context.WithDeadline(context.Background(), deadline.Add(-5*time.Second)) + defer cancel() + // launch producers + producersWg := new(sync.WaitGroup) + producersWg.Add(1) + go func() { + defer producersWg.Done() + for { + select { + case <-ctx.Done(): + return + default: + atomic.AddInt64(&nProcesses, 1) + queueItemIdChan <- q.Enqueue() + time.Sleep(time.Millisecond * 500) + } + } + }() + // create and lunch consumers + var asyncErrors sync.Map + updatersWg := new(sync.WaitGroup) + for i := 0; i < *nConsumers; i++ { + updatersWg.Add(1) + go func() { + defer updatersWg.Done() + for { + select { + case <-ctx.Done(): + return + case queueItemId, ok := <-queueItemIdChan: + // wait for a new id + if !ok { + return + } + // if not exists create an error + exists, done, data, err := q.Done(queueItemId) + if !exists { + asyncErrors.Store(queueItemId, fmt.Errorf("expected queue item not found during done check")) + continue + } + // if it is not done, update it to done + if !done { + // if this actions fails create an error + if !q.Update(queueItemId, true, data, err) { + asyncErrors.Store(queueItemId, fmt.Errorf("expected queue item not found during update")) + continue + } + } + // resend it through the channel + queueItemIdChan <- queueItemId + } + } + }() + } + // create and lunch consumers + dequeuersWg := new(sync.WaitGroup) + for i := 0; i < *nConsumers; i++ { + dequeuersWg.Add(1) + go func() { + defer dequeuersWg.Done() + for { + select { + case <-ctx.Done(): + return + case queueItemId, ok := <-queueItemIdChan: + // wait for a new id + if !ok { + return + } + // if not exists create an error + exists, done, _, _ := q.Done(queueItemId) + if !exists { + asyncErrors.Store(queueItemId, fmt.Errorf("expected queue item not found during done check")) + continue + } + // if it is done, remove it from the queue, and if this action + // fails, create an error; unless create a nil error + if done { + if !q.Dequeue(queueItemId) { + asyncErrors.Store(queueItemId, fmt.Errorf("expected queue item not found during update")) + } else { + asyncErrors.Store(queueItemId, nil) + } + continue + } + // if it is not done, resend it through the channel + queueItemIdChan <- queueItemId + } + } + }() + } + // wait until goroutines finish + producersWg.Wait() + updatersWg.Wait() + dequeuersWg.Wait() + // check completed processes errors (nil or not) + completed := []error{} + asyncErrors.Range(func(key, value any) bool { + if err, ok := value.(error); ok { + completed = append(completed, err) + } else { + completed = append(completed, nil) + } + return true + }) + // assert number of completed processes + c.Assert(int64(len(completed)), qt.Equals, nProcesses) + // assert that every error is nil + for _, err := range completed { + c.Assert(err, qt.IsNil) + } + t.Logf("Completed with %d processes created!", nProcesses) +}