From 90735255fa470aa5fa9b9520348ce54daae02bee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Men=C3=A9ndez?= Date: Fri, 8 Sep 2023 12:38:13 +0200 Subject: [PATCH] Feature/async census creation (#70) * now when the user tries to create a new census, it is included into the database and generated in background, a new census column has ben created on the database called 'published' that indicates if the current census is built and published on IPFS * new queue for track ongoing censuses and new endpoint to get the status of a ongoing census. The processes are purged from the queue when they are finished and someone checks them. * new queue map attribute to store metadata, now when the user request the creation of a census that is already created, it is returned through the queue * data race test based on go test timeout flag --- .golangci.yml | 4 +- api/README.md | 34 ++++- api/api.go | 11 +- api/censuses.go | 241 +++++++++++++++++++++++---------- api/const.go | 7 + api/errors.go | 22 ++- api/strategy.go | 31 ++--- api/tokens.go | 51 +++---- api/types.go | 8 +- census/census_test.go | 10 +- db/migrations/0001_census3.sql | 12 +- db/queries/censuses.sql | 17 ++- db/sqlc/censuses.sql.go | 70 +++++++--- db/sqlc/models.go | 5 +- letsencrypt/acme.json | 0 queue/queue.go | 95 +++++++++++++ queue/queue_test.go | 177 ++++++++++++++++++++++++ 17 files changed, 625 insertions(+), 170 deletions(-) create mode 100644 api/const.go create mode 100644 letsencrypt/acme.json create mode 100644 queue/queue.go create mode 100644 queue/queue_test.go diff --git a/.golangci.yml b/.golangci.yml index 9a0b0047..eadcabd2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,3 +1,5 @@ +run: + go: '1.20' issues: max-same-issues: 0 exclude-use-default: false @@ -16,4 +18,4 @@ linters: linters-settings: lll: - line-length: 130 + line-length: 130 \ No newline at end of file diff --git a/api/README.md b/api/README.md index 0d135b62..a9e177f7 100644 --- a/api/README.md +++ b/api/README.md @@ -102,7 +102,7 @@ Triggers a new scan for the provided token, starting from the defined block. | 409 | `token already created` | 4009 | | 500 | `the token cannot be created` | 5000 | | 500 | `error getting token information` | 5004 | -| 500 | `error initialising web3 client` | 5018 | +| 500 | `error initialising web3 client` | 5019 | ### GET `/token/{tokenID}` Returns the information about the token referenced by the provided ID. @@ -290,7 +290,7 @@ Returns a list of censusID for the strategy provided. | 204 | `-` | 4007 | | 404 | `census not found` | 4006 | | 500 | `error getting census information` | 5009 | -| 500 | `error encoding cenuses` | 5018 | +| 500 | `error encoding censuses` | 5018 | ## Censuses @@ -311,7 +311,7 @@ Request the creation of a new census with the strategy provided for the `blockNu ```json { - "censusId": 12 + "queueId": "0123456789abcdef0123456789abcdef01234567" } ``` @@ -319,9 +319,7 @@ Request the creation of a new census with the strategy provided for the `blockNu | HTTP Status | Message | Internal error | |:---:|:---|:---:| -| 400 | `malformed strategy ID, it must be a integer` | 4002 | -| 404 | `no strategy found with the ID provided` | 4005 | -| 500 | `error creating the census tree on the census database` | 5001 | +| 400 | `malformed strategy ID, it must be an integer` | 4002 | | 500 | `error encoding strategy holders` | 5014 | ### GET `/census/{censusId}` @@ -350,3 +348,27 @@ Returns the information of the snapshots related to the provided ID. | 404 | `census not found` | 4006 | | 500 | `error getting census information` | 5009 | | 500 | `error encoding census` | 5017 | + +### GET `/census/queue/{queueId}` +Returns the information of the census that are in the creation queue. + +- 📥 response: +```json +{ + "done": true, + "error": { + "code": 0, + "err": "error message or null" + }, + "census": { /* */ } +} +``` + +- ⚠️ errors: + +| HTTP Status | Message | Internal error | +|:---:|:---|:---:| +| 400 | `malformed queue ID` | 4010 | +| 404 | `census not found` | 4006 | +| 500 | `error getting census information` | 5009 | +| 500 | `error encoding census queue item` | 5022 | diff --git a/api/api.go b/api/api.go index a28987e9..5bf35a66 100644 --- a/api/api.go +++ b/api/api.go @@ -5,6 +5,7 @@ import ( "github.com/vocdoni/census3/census" "github.com/vocdoni/census3/db" + "github.com/vocdoni/census3/queue" "go.vocdoni.io/dvote/httprouter" api "go.vocdoni.io/dvote/httprouter/apirest" "go.vocdoni.io/dvote/log" @@ -23,14 +24,16 @@ type census3API struct { db *db.DB endpoint *api.API censusDB *census.CensusDB + queue *queue.BackgroundQueue w3p map[int64]string } func Init(db *db.DB, conf Census3APIConf) error { newAPI := &census3API{ - conf: conf, - db: db, - w3p: conf.Web3Providers, + conf: conf, + db: db, + w3p: conf.Web3Providers, + queue: queue.NewBackgroundQueue(), } // get the current chainID log.Infow("starting API", "chainID-web3Providers", conf.Web3Providers) @@ -79,13 +82,11 @@ func (capi *census3API) getAPIInfo(msg *api.APIdata, ctx *httprouter.HTTPContext for chainID := range capi.w3p { chainIDs = append(chainIDs, chainID) } - info := map[string]any{"chainIDs": chainIDs} res, err := json.Marshal(info) if err != nil { log.Errorw(err, "error encoding api info") return ErrEncodeAPIInfo } - return ctx.Send(res, api.HTTPstatusOK) } diff --git a/api/censuses.go b/api/censuses.go index 9bcdb037..21f91142 100644 --- a/api/censuses.go +++ b/api/censuses.go @@ -23,7 +23,11 @@ func (capi *census3API) initCensusHandlers() error { return err } if err := capi.endpoint.RegisterMethod("/census", "POST", - api.MethodAccessTypePublic, capi.createAndPublishCensus); err != nil { + api.MethodAccessTypePublic, capi.launchCensusCreation); err != nil { + return err + } + if err := capi.endpoint.RegisterMethod("/census/queue/{queueID}", "GET", + api.MethodAccessTypePublic, capi.enqueueCensus); err != nil { return err } return capi.endpoint.RegisterMethod("/census/strategy/{strategyID}", "GET", @@ -42,46 +46,80 @@ func (capi *census3API) getCensus(msg *api.APIdata, ctx *httprouter.HTTPContext) currentCensus, err := capi.db.QueriesRO.CensusByID(internalCtx, int64(censusID)) if err != nil { if errors.Is(err, sql.ErrNoRows) { - return ErrNotFoundCensus + return ErrNotFoundCensus.WithErr(err) } - return ErrCantGetCensus + return ErrCantGetCensus.WithErr(err) + } + censusSize := int32(0) + if currentCensus.Size.Valid { + censusSize = currentCensus.Size.Int32 + } + censusWeight := []byte{} + if currentCensus.Weight.Valid { + censusWeight = []byte(currentCensus.Weight.String) } res, err := json.Marshal(GetCensusResponse{ CensusID: uint64(censusID), StrategyID: uint64(currentCensus.StrategyID), MerkleRoot: common.Bytes2Hex(currentCensus.MerkleRoot), URI: "ipfs://" + currentCensus.Uri.String, - Size: int32(currentCensus.Size), - Weight: new(big.Int).SetBytes(currentCensus.Weight).String(), + Size: censusSize, + Weight: new(big.Int).SetBytes(censusWeight).String(), Anonymous: currentCensus.CensusType == int64(census.AnonymousCensusType), }) if err != nil { - return ErrEncodeCensus + return ErrEncodeCensus.WithErr(err) } return ctx.Send(res, api.HTTPstatusOK) } -// createAndPublishCensus handler creates a census tree based on the token -// holders of the tokens that are included in the given strategy. It recovers -// all the required information from the database, and then creates and publish -// the census merkle tree on IPFS. Then saves the resulting information of the -// census tree in the database and returns its ID. -// -// TODO: This handler is costly, specially for big censuses. It should be refactored to be a background task. -func (capi *census3API) createAndPublishCensus(msg *api.APIdata, ctx *httprouter.HTTPContext) error { +// launchCensusCreation handler parses the census creation request, enqueues it +// and starts the creation process, then returns the queue identifier of that +// process to support tracking it. When the process ends updates the queue item +// with the resulting status or error into the queue. +func (capi *census3API) launchCensusCreation(msg *api.APIdata, ctx *httprouter.HTTPContext) error { // decode request req := &CreateCensusResquest{} if err := json.Unmarshal(msg.Data, req); err != nil { - return ErrMalformedStrategyID + return ErrMalformedStrategyID.WithErr(err) } - // get tokens associated to the strategy - internalCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() + // create and publish census merkle tree in background + queueID := capi.queue.Enqueue() + go func() { + censusID, err := capi.createAndPublishCensus(req, queueID) + if err != nil && !errors.Is(ErrCensusAlreadyExists, err) { + if ok := capi.queue.Update(queueID, true, nil, err); !ok { + log.Errorf("error updating census queue process with error: %v", err) + } + return + } + queueData := map[string]any{"censusID": censusID} + if ok := capi.queue.Update(queueID, true, queueData, nil); !ok { + log.Errorf("error updating census queue process with error") + } + }() + // encoding the result and response it + res, err := json.Marshal(CreateCensusResponse{ + QueueID: queueID, + }) + if err != nil { + return ErrEncodeCensus.WithErr(err) + } + return ctx.Send(res, api.HTTPstatusOK) +} +// createAndPublishCensus method creates a census tree based on the token +// holders of the tokens that are included in the given strategy. It recovers +// all the required information from the database, and then creates and publish +// the census merkle tree on IPFS. Then saves the resulting information of the +// census tree in the database. +func (capi *census3API) createAndPublishCensus(req *CreateCensusResquest, qID string) (int, error) { + bgCtx, cancel := context.WithTimeout(context.Background(), censusCreationTimeout) + defer cancel() // begin a transaction for group sql queries - tx, err := capi.db.RW.BeginTx(internalCtx, nil) + tx, err := capi.db.RW.BeginTx(bgCtx, nil) if err != nil { - return ErrCantCreateCensus + return -1, ErrCantCreateCensus.WithErr(err) } defer func() { if err := tx.Rollback(); err != nil && !errors.Is(sql.ErrTxDone, err) { @@ -90,14 +128,28 @@ func (capi *census3API) createAndPublishCensus(msg *api.APIdata, ctx *httprouter }() qtx := capi.db.QueriesRW.WithTx(tx) - strategyTokens, err := qtx.TokensByStrategyID(internalCtx, int64(req.StrategyID)) + strategyTokens, err := qtx.TokensByStrategyID(bgCtx, int64(req.StrategyID)) if err != nil { if errors.Is(sql.ErrNoRows, err) { - log.Errorf("no strategy found for id %d: %s", req.StrategyID, err.Error()) - return ErrNotFoundStrategy + return -1, ErrNoStrategyTokens.WithErr(err) } - log.Errorf("error getting strategy with id %d: %s", req.StrategyID, err.Error()) - return ErrCantGetStrategy + return -1, ErrCantCreateCensus.WithErr(err) + } + if len(strategyTokens) == 0 { + return -1, ErrNoStrategyTokens.WithErr(err) + } + + // get the maximun current census ID to calculate the next one, if any + // census has been created yet, continue + lastCensusID, err := qtx.LastCensusID(bgCtx) + if err != nil && !errors.Is(sql.ErrNoRows, err) { + return -1, ErrCantCreateCensus.WithErr(err) + } + // compute the new censusId and censusType + newCensusID := int(lastCensusID) + 1 + censusType := census.DefaultCensusType + if req.Anonymous { + censusType = census.AnonymousCensusType } // get holders associated to every strategy token, create a map to avoid // duplicates and count the sum of the balances to get the weight of the @@ -105,14 +157,12 @@ func (capi *census3API) createAndPublishCensus(msg *api.APIdata, ctx *httprouter censusWeight := new(big.Int) strategyHolders := map[common.Address]*big.Int{} for _, token := range strategyTokens { - holders, err := qtx.TokenHoldersByTokenID(internalCtx, token.ID) + holders, err := qtx.TokenHoldersByTokenID(bgCtx, token.ID) if err != nil { if errors.Is(sql.ErrNoRows, err) { continue } - log.Errorw(err, "error getting token holders") - return ErrCantGetTokenHolders.Withf("for the token with address %s", - common.BytesToAddress(token.ID)) + return -1, ErrCantGetTokenHolders.WithErr(err) } for _, holder := range holders { holderAddr := common.BytesToAddress(holder.ID) @@ -123,67 +173,119 @@ func (capi *census3API) createAndPublishCensus(msg *api.APIdata, ctx *httprouter } } } - // get the maximun current census ID to calculate the next one, if any - // census has been created yet, continue - lastCensusID, err := qtx.LastCensusID(internalCtx) - if err != nil && !errors.Is(sql.ErrNoRows, err) { - log.Errorw(err, "error getting last census ID") - return ErrCantCreateCensus + if len(strategyHolders) == 0 { + log.Errorf("no holders for strategy '%d'", req.StrategyID) + return -1, ErrNotFoundTokenHolders.With("no holders for strategy") } // create a census tree and publish on IPFS - def := census.NewCensusDefinition(int(lastCensusID+1), int(req.StrategyID), strategyHolders, req.Anonymous) + def := census.NewCensusDefinition(newCensusID, int(req.StrategyID), strategyHolders, req.Anonymous) newCensus, err := capi.censusDB.CreateAndPublish(def) if err != nil { - log.Errorw(err, "error creating or publishing the census") - return ErrCantCreateCensus + return -1, ErrCantCreateCensus.WithErr(err) } // check if the census already exists using the merkle root of the generated // census - existingCensus, err := qtx.CensusByMerkleRoot(internalCtx, newCensus.RootHash) + currentCensus, err := qtx.CensusByMerkleRoot(bgCtx, newCensus.RootHash) if err == nil { - // encoding the result and response it - res, err := json.Marshal(CreateCensusResponse{ - CensusID: uint64(existingCensus.ID), - }) - if err != nil { - log.Error("error marshalling holders") - return ErrEncodeStrategyHolders - } - return ctx.Send(res, api.HTTPstatusOK) + return int(currentCensus.ID), ErrCensusAlreadyExists.WithErr(err) } if err != nil && !errors.Is(sql.ErrNoRows, err) { - log.Errorw(err, "error checking if the generated census already exists") - return ErrCantCreateCensus + return -1, ErrCantCreateCensus.WithErr(err) } // save the new census in the SQL database sqlURI := new(sql.NullString) if err := sqlURI.Scan(newCensus.URI); err != nil { - log.Errorw(err, "error saving the census on the database") - return ErrCantCreateCensus + return -1, ErrCantCreateCensus.WithErr(err) } - _, err = qtx.CreateCensus(internalCtx, queries.CreateCensusParams{ + sqlCensusSize := sql.NullInt32{} + if err := sqlCensusSize.Scan(int64(len(strategyHolders))); err != nil { + return -1, ErrCantCreateCensus.WithErr(err) + } + sqlCensusWeight := sql.NullString{} + if err := sqlCensusWeight.Scan(censusWeight.String()); err != nil { + return -1, ErrCantCreateCensus.WithErr(err) + } + _, err = qtx.CreateCensus(bgCtx, queries.CreateCensusParams{ ID: int64(newCensus.ID), StrategyID: int64(req.StrategyID), + CensusType: int64(censusType), MerkleRoot: newCensus.RootHash, Uri: *sqlURI, - Size: int64(len(strategyHolders)), - Weight: censusWeight.Bytes(), - CensusType: int64(def.Type), + Size: sqlCensusSize, + Weight: sqlCensusWeight, + QueueID: qID, }) if err != nil { - log.Errorw(err, "error saving the census on the database") - return ErrCantCreateCensus + return -1, ErrCantCreateCensus.WithErr(err) } if err := tx.Commit(); err != nil { - return err + return -1, ErrCantCreateCensus.WithErr(err) } - // encoding the result and response it - res, err := json.Marshal(CreateCensusResponse{ - CensusID: uint64(newCensus.ID), - }) + return newCensus.ID, nil +} + +// enqueueCensus handler returns the current status of the queue item +// identified by the ID provided. If it not exists it returns that the census +// is not found. Else if the census exists and has been successfully created, it +// will be included into the response. If not, the response only will include +// if it is done or not and the resulting error. +func (capi *census3API) enqueueCensus(msg *api.APIdata, ctx *httprouter.HTTPContext) error { + queueID := ctx.URLParam("queueID") + if queueID == "" { + return ErrMalformedCensusQueueID + } + // try to get and check if the census is in the queue + exists, done, data, err := capi.queue.Done(queueID) + if !exists { + return ErrNotFoundCensus.Withf("the ID %s does not exist in the queue", queueID) + } + // init queue item response + queueCensus := CensusQueueResponse{ + Done: done, + Error: err, + } + // check if it is not finished or some error occurred + if done && err == nil { + // if everything is ok, get the census information an return it + internalCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + censusID, ok := data["censusID"].(int) + if !ok { + log.Errorf("no census id registered on queue item") + return ErrCantGetCensus + } + + // get the census from the database by queue_id + currentCensus, err := capi.db.QueriesRO.CensusByID(internalCtx, int64(censusID)) + if err != nil { + return ErrCantGetCensus.WithErr(err) + } + // get values for optional parameters + censusSize := int32(0) + if currentCensus.Size.Valid { + censusSize = currentCensus.Size.Int32 + } + censusWeight := []byte{} + if currentCensus.Weight.Valid { + censusWeight = []byte(currentCensus.Weight.String) + } + // encode census + queueCensus.Census = &GetCensusResponse{ + CensusID: uint64(currentCensus.ID), + StrategyID: uint64(currentCensus.StrategyID), + MerkleRoot: common.Bytes2Hex(currentCensus.MerkleRoot), + URI: "ipfs://" + currentCensus.Uri.String, + Size: censusSize, + Weight: new(big.Int).SetBytes(censusWeight).String(), + Anonymous: currentCensus.CensusType == int64(census.AnonymousCensusType), + } + // remove the item from the queue + capi.queue.Dequeue(queueID) + } + // encode item response and send it + res, err := json.Marshal(queueCensus) if err != nil { - log.Error("error marshalling holders") - return ErrEncodeStrategyHolders + return ErrEncodeQueueItem.WithErr(err) } return ctx.Send(res, api.HTTPstatusOK) } @@ -194,7 +296,7 @@ func (capi *census3API) getStrategyCensuses(msg *api.APIdata, ctx *httprouter.HT // get strategy ID strategyID, err := strconv.Atoi(ctx.URLParam("strategyID")) if err != nil { - return ErrMalformedCensusID + return ErrMalformedCensusID.WithErr(err) } // get censuses by this strategy ID internalCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -202,9 +304,9 @@ func (capi *census3API) getStrategyCensuses(msg *api.APIdata, ctx *httprouter.HT rows, err := capi.db.QueriesRO.CensusByStrategyID(internalCtx, int64(strategyID)) if err != nil { if errors.Is(err, sql.ErrNoRows) { - return ErrNotFoundCensus + return ErrNotFoundCensus.WithErr(err) } - return ErrCantGetCensus + return ErrCantGetCensus.WithErr(err) } // parse and encode response censuses := GetCensusesResponse{Censuses: []uint64{}} @@ -213,8 +315,7 @@ func (capi *census3API) getStrategyCensuses(msg *api.APIdata, ctx *httprouter.HT } res, err := json.Marshal(censuses) if err != nil { - log.Errorw(ErrEncodeCensuses, err.Error()) - return ErrEncodeCensuses + return ErrEncodeCensuses.WithErr(err) } return ctx.Send(res, api.HTTPstatusOK) } diff --git a/api/const.go b/api/const.go new file mode 100644 index 00000000..6169f169 --- /dev/null +++ b/api/const.go @@ -0,0 +1,7 @@ +package api + +import "time" + +const ( + censusCreationTimeout = 10 * time.Minute +) diff --git a/api/errors.go b/api/errors.go index 060344d1..807d6677 100644 --- a/api/errors.go +++ b/api/errors.go @@ -21,7 +21,7 @@ var ( ErrMalformedStrategyID = apirest.APIerror{ Code: 4002, HTTPstatus: apirest.HTTPstatusBadRequest, - Err: fmt.Errorf("malformed strategy ID, it must be a integer"), + Err: fmt.Errorf("malformed strategy ID, it must be an integer"), } ErrNotFoundToken = apirest.APIerror{ Code: 4003, @@ -58,6 +58,21 @@ var ( HTTPstatus: http.StatusConflict, Err: fmt.Errorf("token already created"), } + ErrNoStrategyTokens = apirest.APIerror{ + Code: 4010, + HTTPstatus: apirest.HTTPstatusBadRequest, + Err: fmt.Errorf("no tokens found for the strategy provided"), + } + ErrMalformedCensusQueueID = apirest.APIerror{ + Code: 4011, + HTTPstatus: apirest.HTTPstatusBadRequest, + Err: fmt.Errorf("malformed queue ID"), + } + ErrCensusAlreadyExists = apirest.APIerror{ + Code: 4012, + HTTPstatus: http.StatusConflict, + Err: fmt.Errorf("census already exists"), + } ErrChainIDNotSupported = apirest.APIerror{ Code: 4013, HTTPstatus: apirest.HTTPstatusBadRequest, @@ -173,6 +188,11 @@ var ( HTTPstatus: apirest.HTTPstatusInternalErr, Err: fmt.Errorf("error getting last block number from web3 endpoint"), } + ErrEncodeQueueItem = apirest.APIerror{ + Code: 5022, + HTTPstatus: apirest.HTTPstatusInternalErr, + Err: fmt.Errorf("error encoding census queue item"), + } ErrEncodeAPIInfo = apirest.APIerror{ Code: 5023, HTTPstatus: apirest.HTTPstatusInternalErr, diff --git a/api/strategy.go b/api/strategy.go index df69bb2a..95974fef 100644 --- a/api/strategy.go +++ b/api/strategy.go @@ -13,7 +13,6 @@ import ( queries "github.com/vocdoni/census3/db/sqlc" "go.vocdoni.io/dvote/httprouter" api "go.vocdoni.io/dvote/httprouter/apirest" - "go.vocdoni.io/dvote/log" ) func (capi *census3API) initStrategiesHandlers() error { @@ -65,10 +64,9 @@ func (capi *census3API) getStrategies(msg *api.APIdata, ctx *httprouter.HTTPCont rows, err := capi.db.QueriesRO.ListStrategies(internalCtx) if err != nil { if errors.Is(err, sql.ErrNoRows) { - return ErrNoStrategies + return ErrNoStrategies.WithErr(err) } - log.Errorw(ErrCantGetStrategies, err.Error()) - return ErrCantGetStrategies + return ErrCantGetStrategies.WithErr(err) } if len(rows) == 0 { return ErrNoStrategies @@ -80,8 +78,7 @@ func (capi *census3API) getStrategies(msg *api.APIdata, ctx *httprouter.HTTPCont } res, err := json.Marshal(strategies) if err != nil { - log.Errorw(ErrEncodeStrategies, err.Error()) - return ErrEncodeStrategies + return ErrEncodeStrategies.WithErr(err) } return ctx.Send(res, api.HTTPstatusOK) } @@ -94,8 +91,7 @@ func (capi *census3API) getStrategy(msg *api.APIdata, ctx *httprouter.HTTPContex // get provided strategyID strategyID, err := strconv.Atoi(ctx.URLParam("strategyID")) if err != nil { - log.Errorw(ErrMalformedStrategyID, err.Error()) - return ErrMalformedStrategyID + return ErrMalformedStrategyID.WithErr(err) } // get strategy from the database internalCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) @@ -103,10 +99,9 @@ func (capi *census3API) getStrategy(msg *api.APIdata, ctx *httprouter.HTTPContex strategyData, err := capi.db.QueriesRO.StrategyByID(internalCtx, int64(strategyID)) if err != nil { if errors.Is(err, sql.ErrNoRows) { - return ErrNotFoundStrategy + return ErrNotFoundStrategy.WithErr(err) } - log.Errorw(ErrCantGetStrategy, err.Error()) - return ErrCantGetStrategy + return ErrCantGetStrategy.WithErr(err) } // parse strategy information strategy := GetStrategyResponse{ @@ -117,8 +112,7 @@ func (capi *census3API) getStrategy(msg *api.APIdata, ctx *httprouter.HTTPContex // get information of the strategy related tokens tokensData, err := capi.db.QueriesRO.TokensByStrategyID(internalCtx, strategyData.ID) if err != nil && !errors.Is(err, sql.ErrNoRows) { - log.Errorw(ErrCantGetTokens, err.Error()) - return ErrCantGetTokens + return ErrCantGetTokens.WithErr(err) } // parse and encode tokens information for _, tokenData := range tokensData { @@ -131,8 +125,7 @@ func (capi *census3API) getStrategy(msg *api.APIdata, ctx *httprouter.HTTPContex } res, err := json.Marshal(strategy) if err != nil { - log.Errorw(ErrEncodeStrategy, err.Error()) - return ErrEncodeStrategy + return ErrEncodeStrategy.WithErr(err) } return ctx.Send(res, api.HTTPstatusOK) } @@ -150,10 +143,9 @@ func (capi *census3API) getTokenStrategies(msg *api.APIdata, ctx *httprouter.HTT rows, err := capi.db.QueriesRO.StrategiesByTokenID(internalCtx, common.HexToAddress(tokenID).Bytes()) if err != nil { if errors.Is(err, sql.ErrNoRows) { - return ErrNoStrategies + return ErrNoStrategies.WithErr(err) } - log.Errorw(ErrCantGetStrategies, err.Error()) - return ErrCantGetStrategies + return ErrCantGetStrategies.WithErr(err) } if len(rows) == 0 { return ErrNoStrategies @@ -165,8 +157,7 @@ func (capi *census3API) getTokenStrategies(msg *api.APIdata, ctx *httprouter.HTT } res, err := json.Marshal(strategies) if err != nil { - log.Errorw(ErrEncodeStrategies, err.Error()) - return ErrEncodeStrategies + return ErrEncodeStrategies.WithErr(err) } return ctx.Send(res, api.HTTPstatusOK) } diff --git a/api/tokens.go b/api/tokens.go index 2e1078d5..eab1fc99 100644 --- a/api/tokens.go +++ b/api/tokens.go @@ -45,10 +45,9 @@ func (capi *census3API) getTokens(msg *api.APIdata, ctx *httprouter.HTTPContext) rows, err := capi.db.QueriesRO.ListTokens(internalCtx) if err != nil { if errors.Is(err, sql.ErrNoRows) { - return ErrNoTokens + return ErrNoTokens.WithErr(err) } - log.Errorw(ErrCantGetTokens, err.Error()) - return ErrCantGetTokens + return ErrCantGetTokens.WithErr(err) } if len(rows) == 0 { return ErrNoTokens @@ -68,8 +67,7 @@ func (capi *census3API) getTokens(msg *api.APIdata, ctx *httprouter.HTTPContext) } res, err := json.Marshal(tokens) if err != nil { - log.Errorw(ErrEncodeTokens, err.Error()) - return ErrEncodeTokens + return ErrEncodeTokens.WithErr(err) } return ctx.Send(res, api.HTTPstatusOK) } @@ -83,7 +81,7 @@ func (capi *census3API) createToken(msg *api.APIdata, ctx *httprouter.HTTPContex req := CreateTokenRequest{} if err := json.Unmarshal(msg.Data, &req); err != nil { log.Errorf("error unmarshalling token information: %s", err) - return ErrMalformedToken.With("error unmarshalling token information") + return ErrMalformedToken.WithErr(err) } tokenType := state.TokenTypeFromString(req.Type) addr := common.HexToAddress(req.ID) @@ -99,12 +97,11 @@ func (capi *census3API) createToken(msg *api.APIdata, ctx *httprouter.HTTPContex } if err := w3.Init(internalCtx, w3uri, addr, tokenType); err != nil { log.Errorw(ErrInitializingWeb3, err.Error()) - return ErrInitializingWeb3 + return ErrInitializingWeb3.WithErr(err) } info, err := w3.TokenData() if err != nil { - log.Errorw(ErrCantGetToken, err.Error()) - return ErrCantGetToken + return ErrCantGetToken.WithErr(err) } var ( name = new(sql.NullString) @@ -115,23 +112,20 @@ func (capi *census3API) createToken(msg *api.APIdata, ctx *httprouter.HTTPContex tag = new(sql.NullString) ) if err := name.Scan(info.Name); err != nil { - log.Errorw(ErrCantGetToken, err.Error()) - return ErrCantGetToken + return ErrCantGetToken.WithErr(err) } if err := symbol.Scan(info.Symbol); err != nil { - log.Errorw(ErrCantGetToken, err.Error()) - return ErrCantGetToken + return ErrCantGetToken.WithErr(err) } if err := decimals.Scan(info.Decimals); err != nil { - log.Errorw(ErrCantGetToken, err.Error()) - return ErrCantGetToken + return ErrCantGetToken.WithErr(err) } if info.TotalSupply != nil { totalSupply = info.TotalSupply } if req.Tag != "" { if err := tag.Scan(req.Tag); err != nil { - return ErrCantGetToken + return ErrCantGetToken.WithErr(err) } } _, err = capi.db.QueriesRW.CreateToken(internalCtx, queries.CreateTokenParams{ @@ -148,10 +142,9 @@ func (capi *census3API) createToken(msg *api.APIdata, ctx *httprouter.HTTPContex }) if err != nil { if strings.Contains(err.Error(), "UNIQUE constraint failed") { - return ErrTokenAlreadyExists + return ErrTokenAlreadyExists.WithErr(err) } - log.Errorw(err, "error creating token on the database") - return ErrCantCreateToken.Withf("error creating token with address %s", addr) + return ErrCantCreateToken.WithErr(err) } // TODO: Only for the MVP, consider to remove it if err := capi.createDummyStrategy(info.Address.Bytes()); err != nil { @@ -171,17 +164,14 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext) tokenData, err := capi.db.QueriesRO.TokenByID(internalCtx, address.Bytes()) if err != nil { if errors.Is(err, sql.ErrNoRows) { - log.Errorw(ErrNotFoundToken, err.Error()) - return ErrNotFoundToken + return ErrNotFoundToken.WithErr(err) } - log.Errorw(ErrCantGetToken, err.Error()) - return ErrCantGetToken + return ErrCantGetToken.WithErr(err) } // TODO: Only for the MVP, consider to remove it tokenStrategies, err := capi.db.QueriesRO.StrategiesByTokenID(internalCtx, tokenData.ID) if err != nil && !errors.Is(err, sql.ErrNoRows) { - log.Errorw(ErrCantGetToken, err.Error()) - return ErrCantGetToken + return ErrCantGetToken.WithErr(err) } defaultStrategyID := uint64(0) if len(tokenStrategies) > 0 { @@ -191,8 +181,7 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext) atBlock, err := capi.db.QueriesRO.LastBlockByTokenID(internalCtx, address.Bytes()) if err != nil { if !errors.Is(err, sql.ErrNoRows) { - log.Errorw(ErrCantGetToken, err.Error()) - return ErrCantGetToken + return ErrCantGetToken.WithErr(err) } atBlock = 0 } @@ -213,7 +202,7 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext) // fetch the last block header and calculate progress lastBlockNumber, err := w3.LatestBlockNumber(internalCtx) if err != nil { - return ErrCantGetLastBlockNumber + return ErrCantGetLastBlockNumber.WithErr(err) } tokenProgress = uint64(float64(atBlock) / float64(lastBlockNumber) * 100) } @@ -223,7 +212,7 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext) defer cancel2() holders, err := capi.db.QueriesRO.CountTokenHoldersByTokenID(countHoldersCtx, address.Bytes()) if err != nil { - return ErrCantGetTokenCount + return ErrCantGetTokenCount.WithErr(err) } // build response @@ -250,7 +239,7 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext) } res, err := json.Marshal(tokenResponse) if err != nil { - return ErrEncodeToken + return ErrEncodeToken.WithErr(err) } return ctx.Send(res, api.HTTPstatusOK) } @@ -264,7 +253,7 @@ func (capi *census3API) getTokenTypes(msg *api.APIdata, ctx *httprouter.HTTPCont } res, err := json.Marshal(TokenTypesResponse{supportedTypes}) if err != nil { - return ErrEncodeTokenTypes + return ErrEncodeTokenTypes.WithErr(err) } return ctx.Send(res, api.HTTPstatusOK) } diff --git a/api/types.go b/api/types.go index 89a27b1f..d86fb444 100644 --- a/api/types.go +++ b/api/types.go @@ -57,7 +57,7 @@ type CreateCensusResquest struct { } type CreateCensusResponse struct { - CensusID uint64 `json:"censusId"` + QueueID string `json:"queueId"` } type GetCensusResponse struct { @@ -90,3 +90,9 @@ type GetStrategyResponse struct { Tokens []GetStrategyToken `json:"tokens"` Predicate string `json:"strategy"` } + +type CensusQueueResponse struct { + Done bool `json:"done"` + Error error `json:"error"` + Census *GetCensusResponse `json:"census"` +} diff --git a/census/census_test.go b/census/census_test.go index db1e02df..8498780d 100644 --- a/census/census_test.go +++ b/census/census_test.go @@ -107,8 +107,7 @@ func Test_save(t *testing.T) { }() def := NewCensusDefinition(0, 0, map[common.Address]*big.Int{}, false) - rtx := cdb.treeDB.ReadTx() - _, err = rtx.Get([]byte(censusDBKey(def.ID))) + _, err = cdb.treeDB.ReadTx().Get([]byte(censusDBKey(def.ID))) c.Assert(err, qt.IsNotNil) bdef := bytes.Buffer{} @@ -116,7 +115,7 @@ func Test_save(t *testing.T) { c.Assert(encoder.Encode(def), qt.IsNil) c.Assert(cdb.save(def), qt.IsNil) - res, err := rtx.Get([]byte(censusDBKey(def.ID))) + res, err := cdb.treeDB.ReadTx().Get([]byte(censusDBKey(def.ID))) c.Assert(err, qt.IsNil) c.Assert(res, qt.ContentEquals, bdef.Bytes()) } @@ -165,11 +164,10 @@ func Test_delete(t *testing.T) { def := NewCensusDefinition(0, 0, map[common.Address]*big.Int{}, false) c.Assert(cdb.save(def), qt.IsNil) - rtx := cdb.treeDB.ReadTx() - _, err = rtx.Get([]byte(censusDBKey(def.ID))) + _, err = cdb.treeDB.ReadTx().Get([]byte(censusDBKey(def.ID))) c.Assert(err, qt.IsNil) c.Assert(cdb.delete(def), qt.IsNil) - _, err = rtx.Get([]byte(censusDBKey(def.ID))) + _, err = cdb.treeDB.ReadTx().Get([]byte(censusDBKey(def.ID))) c.Assert(err, qt.IsNotNil) } diff --git a/db/migrations/0001_census3.sql b/db/migrations/0001_census3.sql index 525f6501..412e1f4b 100644 --- a/db/migrations/0001_census3.sql +++ b/db/migrations/0001_census3.sql @@ -35,12 +35,14 @@ CREATE INDEX idx_tokens_type_id ON tokens(type_id); CREATE TABLE censuses ( id INTEGER PRIMARY KEY, strategy_id INTEGER NOT NULL, - merkle_root BLOB NOT NULL UNIQUE, - uri TEXT UNIQUE, - size INTEGER NOT NULL, - weight BLOB NOT NULL, + merkle_root BLOB NOT NULL, + uri TEXT, + size INTEGER, + weight BLOB, census_type INTEGER NOT NULL, - FOREIGN KEY (strategy_id) REFERENCES strategies(id) ON DELETE CASCADE + queue_id TEXT NOT NULL, + FOREIGN KEY (strategy_id) REFERENCES strategies(id) ON DELETE CASCADE, + UNIQUE(id, merkle_root) ); CREATE INDEX idx_censuses_strategy_id ON censuses(strategy_id); diff --git a/db/queries/censuses.sql b/db/queries/censuses.sql index d838b148..20f98e4e 100644 --- a/db/queries/censuses.sql +++ b/db/queries/censuses.sql @@ -16,6 +16,11 @@ SELECT * FROM censuses WHERE merkle_root = ? LIMIT 1; +-- name: CensusByQueueID :one +SELECT * FROM censuses +WHERE queue_id = ? +LIMIT 1; + -- name: CensusesByStrategyIDAndBlockID :many SELECT c.* FROM censuses c JOIN census_blocks cb ON c.id = cb.census_id @@ -54,10 +59,11 @@ INSERT INTO censuses ( uri, size, weight, - census_type + census_type, + queue_id ) VALUES ( - ?, ?, ?, ?, ?, ?, ? + ?, ?, ?, ?, ?, ?, ?, ? ); -- name: DeleteCensus :execresult @@ -66,9 +72,10 @@ WHERE id = ?; -- name: UpdateCensus :execresult UPDATE censuses -SET strategy_id = sqlc.arg(strategy_id), - merkle_root = sqlc.arg(merkle_root), - uri = sqlc.arg(uri) +SET merkle_root = sqlc.arg(merkle_root), + uri = sqlc.arg(uri), + size = sqlc.arg(size), + weight = sqlc.arg(weight) WHERE id = sqlc.arg(id); -- name: CreateCensusBlock :execresult diff --git a/db/sqlc/censuses.sql.go b/db/sqlc/censuses.sql.go index a31d174f..b3e9398c 100644 --- a/db/sqlc/censuses.sql.go +++ b/db/sqlc/censuses.sql.go @@ -13,7 +13,7 @@ import ( ) const censusByID = `-- name: CensusByID :one -SELECT id, strategy_id, merkle_root, uri, size, weight, census_type FROM censuses +SELECT id, strategy_id, merkle_root, uri, size, weight, census_type, queue_id FROM censuses WHERE id = ? LIMIT 1 ` @@ -29,12 +29,13 @@ func (q *Queries) CensusByID(ctx context.Context, id int64) (Censuse, error) { &i.Size, &i.Weight, &i.CensusType, + &i.QueueID, ) return i, err } const censusByMerkleRoot = `-- name: CensusByMerkleRoot :one -SELECT id, strategy_id, merkle_root, uri, size, weight, census_type FROM censuses +SELECT id, strategy_id, merkle_root, uri, size, weight, census_type, queue_id FROM censuses WHERE merkle_root = ? LIMIT 1 ` @@ -50,12 +51,35 @@ func (q *Queries) CensusByMerkleRoot(ctx context.Context, merkleRoot annotations &i.Size, &i.Weight, &i.CensusType, + &i.QueueID, + ) + return i, err +} + +const censusByQueueID = `-- name: CensusByQueueID :one +SELECT id, strategy_id, merkle_root, uri, size, weight, census_type, queue_id FROM censuses +WHERE queue_id = ? +LIMIT 1 +` + +func (q *Queries) CensusByQueueID(ctx context.Context, queueID string) (Censuse, error) { + row := q.db.QueryRowContext(ctx, censusByQueueID, queueID) + var i Censuse + err := row.Scan( + &i.ID, + &i.StrategyID, + &i.MerkleRoot, + &i.Uri, + &i.Size, + &i.Weight, + &i.CensusType, + &i.QueueID, ) return i, err } const censusByStrategyID = `-- name: CensusByStrategyID :many -SELECT id, strategy_id, merkle_root, uri, size, weight, census_type FROM censuses +SELECT id, strategy_id, merkle_root, uri, size, weight, census_type, queue_id FROM censuses WHERE strategy_id = ? ` @@ -76,6 +100,7 @@ func (q *Queries) CensusByStrategyID(ctx context.Context, strategyID int64) ([]C &i.Size, &i.Weight, &i.CensusType, + &i.QueueID, ); err != nil { return nil, err } @@ -91,7 +116,7 @@ func (q *Queries) CensusByStrategyID(ctx context.Context, strategyID int64) ([]C } const censusByURI = `-- name: CensusByURI :one -SELECT id, strategy_id, merkle_root, uri, size, weight, census_type FROM censuses +SELECT id, strategy_id, merkle_root, uri, size, weight, census_type, queue_id FROM censuses WHERE uri = ? LIMIT 1 ` @@ -107,12 +132,13 @@ func (q *Queries) CensusByURI(ctx context.Context, uri sql.NullString) (Censuse, &i.Size, &i.Weight, &i.CensusType, + &i.QueueID, ) return i, err } const censusesByStrategyIDAndBlockID = `-- name: CensusesByStrategyIDAndBlockID :many -SELECT c.id, c.strategy_id, c.merkle_root, c.uri, c.size, c.weight, c.census_type FROM censuses c +SELECT c.id, c.strategy_id, c.merkle_root, c.uri, c.size, c.weight, c.census_type, c.queue_id FROM censuses c JOIN census_blocks cb ON c.id = cb.census_id WHERE c.strategy_id = ? AND cb.block_id = ? LIMIT ? OFFSET ? @@ -147,6 +173,7 @@ func (q *Queries) CensusesByStrategyIDAndBlockID(ctx context.Context, arg Census &i.Size, &i.Weight, &i.CensusType, + &i.QueueID, ); err != nil { return nil, err } @@ -162,7 +189,7 @@ func (q *Queries) CensusesByStrategyIDAndBlockID(ctx context.Context, arg Census } const censusesByTokenID = `-- name: CensusesByTokenID :many -SELECT c.id, c.strategy_id, c.merkle_root, c.uri, c.size, c.weight, c.census_type FROM censuses AS c +SELECT c.id, c.strategy_id, c.merkle_root, c.uri, c.size, c.weight, c.census_type, c.queue_id FROM censuses AS c JOIN strategy_tokens AS st ON c.strategy_id = st.strategy_id WHERE st.token_id = ? LIMIT ? OFFSET ? @@ -191,6 +218,7 @@ func (q *Queries) CensusesByTokenID(ctx context.Context, arg CensusesByTokenIDPa &i.Size, &i.Weight, &i.CensusType, + &i.QueueID, ); err != nil { return nil, err } @@ -206,7 +234,7 @@ func (q *Queries) CensusesByTokenID(ctx context.Context, arg CensusesByTokenIDPa } const censusesByTokenType = `-- name: CensusesByTokenType :many -SELECT c.id, c.strategy_id, c.merkle_root, c.uri, c.size, c.weight, c.census_type FROM censuses AS c +SELECT c.id, c.strategy_id, c.merkle_root, c.uri, c.size, c.weight, c.census_type, c.queue_id FROM censuses AS c JOIN strategy_tokens AS st ON c.strategy_id = st.strategy_id JOIN tokens AS t ON st.token_id = t.id JOIN token_types AS tt ON t.type_id = tt.id @@ -230,6 +258,7 @@ func (q *Queries) CensusesByTokenType(ctx context.Context, typeName string) ([]C &i.Size, &i.Weight, &i.CensusType, + &i.QueueID, ); err != nil { return nil, err } @@ -252,10 +281,11 @@ INSERT INTO censuses ( uri, size, weight, - census_type + census_type, + queue_id ) VALUES ( - ?, ?, ?, ?, ?, ?, ? + ?, ?, ?, ?, ?, ?, ?, ? ) ` @@ -264,9 +294,10 @@ type CreateCensusParams struct { StrategyID int64 MerkleRoot annotations.Hash Uri sql.NullString - Size int64 - Weight []byte + Size sql.NullInt32 + Weight sql.NullString CensusType int64 + QueueID string } func (q *Queries) CreateCensus(ctx context.Context, arg CreateCensusParams) (sql.Result, error) { @@ -278,6 +309,7 @@ func (q *Queries) CreateCensus(ctx context.Context, arg CreateCensusParams) (sql arg.Size, arg.Weight, arg.CensusType, + arg.QueueID, ) } @@ -338,7 +370,7 @@ func (q *Queries) LastCensusID(ctx context.Context) (int64, error) { } const listCensuses = `-- name: ListCensuses :many -SELECT id, strategy_id, merkle_root, uri, size, weight, census_type FROM censuses +SELECT id, strategy_id, merkle_root, uri, size, weight, census_type, queue_id FROM censuses ORDER BY id ` @@ -359,6 +391,7 @@ func (q *Queries) ListCensuses(ctx context.Context) ([]Censuse, error) { &i.Size, &i.Weight, &i.CensusType, + &i.QueueID, ); err != nil { return nil, err } @@ -375,24 +408,27 @@ func (q *Queries) ListCensuses(ctx context.Context) ([]Censuse, error) { const updateCensus = `-- name: UpdateCensus :execresult UPDATE censuses -SET strategy_id = ?, - merkle_root = ?, - uri = ? +SET merkle_root = ?, + uri = ?, + size = ?, + weight = ? WHERE id = ? ` type UpdateCensusParams struct { - StrategyID int64 MerkleRoot annotations.Hash Uri sql.NullString + Size sql.NullInt32 + Weight sql.NullString ID int64 } func (q *Queries) UpdateCensus(ctx context.Context, arg UpdateCensusParams) (sql.Result, error) { return q.db.ExecContext(ctx, updateCensus, - arg.StrategyID, arg.MerkleRoot, arg.Uri, + arg.Size, + arg.Weight, arg.ID, ) } diff --git a/db/sqlc/models.go b/db/sqlc/models.go index f478fd32..d561d12a 100644 --- a/db/sqlc/models.go +++ b/db/sqlc/models.go @@ -26,9 +26,10 @@ type Censuse struct { StrategyID int64 MerkleRoot annotations.Hash Uri sql.NullString - Size int64 - Weight []byte + Size sql.NullInt32 + Weight sql.NullString CensusType int64 + QueueID string } type Holder struct { diff --git a/letsencrypt/acme.json b/letsencrypt/acme.json new file mode 100644 index 00000000..e69de29b diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 00000000..924a12c9 --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,95 @@ +// queue package abstracts a structure for enqueuing processes that running in +// the background, updating the status of those processes and dequeuing them. +// It ensures that processes are enqueued, dequeued and update thread safely. +package queue + +import ( + "sync" + + "go.vocdoni.io/dvote/util" +) + +// QueueIDLen constant defines the fixed length of any queue item +const QueueIDLen = 20 + +// QueueItem struct defines a single queue item, including if it is done or not, +// it it throw any error during its execution, and a flexsible data map variable +// to store resulting information of the enqueued process execution. +type QueueItem struct { + done bool + err error + data map[string]any +} + +// BackgroundQueue struct abstracts a background processes queue, including a +// mutex to make the operations over it safely and also a map of items, +// identified by the queue item id's. +type BackgroundQueue struct { + mtx *sync.Mutex + processes map[string]QueueItem +} + +// NewBackgroundQueue function initializes a new queue and return it. +func NewBackgroundQueue() *BackgroundQueue { + return &BackgroundQueue{ + mtx: &sync.Mutex{}, + processes: make(map[string]QueueItem), + } +} + +// Enqueue method creates a new queue item and enqueue it into the current +// background queue and returns its ID. It initialize the queue item done to +// false, its error and data to nil. This queue item parameters can be updated +// using the resulting ID and the queue Update method. +func (q *BackgroundQueue) Enqueue() string { + q.mtx.Lock() + defer q.mtx.Unlock() + + id := util.RandomHex(QueueIDLen) + q.processes[id] = QueueItem{ + done: false, + err: nil, + data: make(map[string]any), + } + return id +} + +// Dequeue method removes a item from que current queue using the id provided. +// It returns if the item was in the queue before remove it. +func (q *BackgroundQueue) Dequeue(id string) bool { + q.mtx.Lock() + defer q.mtx.Unlock() + + if _, ok := q.processes[id]; !ok { + return false + } + delete(q.processes, id) + return true +} + +// Update method updates the information of a queue item identified by the +// provided id. It changes the done, data and error parameters of the found +// queue item to the provided values. +func (q *BackgroundQueue) Update(id string, done bool, data map[string]any, err error) bool { + q.mtx.Lock() + defer q.mtx.Unlock() + + if _, ok := q.processes[id]; !ok { + return false + } + q.processes[id] = QueueItem{done: done, err: err, data: data} + return true +} + +// Done method returns the queue item information such as it is done or not, if +// it throws any error and its data. But all this information is only returned +// if the queue item exists in the queue, returned as first parameter. +func (q *BackgroundQueue) Done(id string) (bool, bool, map[string]any, error) { + q.mtx.Lock() + defer q.mtx.Unlock() + + if p, ok := q.processes[id]; ok { + return true, p.done, p.data, p.err + } + return false, false, nil, nil +} diff --git a/queue/queue_test.go b/queue/queue_test.go new file mode 100644 index 00000000..a4abe608 --- /dev/null +++ b/queue/queue_test.go @@ -0,0 +1,177 @@ +package queue + +import ( + "context" + "flag" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + qt "github.com/frankban/quicktest" +) + +func TestEnqueueDequeue(t *testing.T) { + c := qt.New(t) + + q := NewBackgroundQueue() + + id := q.Enqueue() + _, ok := q.processes[id] + c.Assert(ok, qt.IsTrue) + c.Assert(q.Dequeue(id), qt.IsTrue) + _, ok = q.processes[id] + c.Assert(ok, qt.IsFalse) + c.Assert(q.Dequeue(id), qt.IsFalse) +} + +func TestUpdateDone(t *testing.T) { + c := qt.New(t) + + q := NewBackgroundQueue() + + id := q.Enqueue() + exists, done, _, err := q.Done(id) + c.Assert(exists, qt.IsTrue) + c.Assert(err, qt.IsNil) + c.Assert(done, qt.IsFalse) + + c.Assert(q.Update(id, true, nil, fmt.Errorf("test error")), qt.IsTrue) + exists, done, _, err = q.Done(id) + c.Assert(exists, qt.IsTrue) + c.Assert(err, qt.IsNotNil) + c.Assert(done, qt.IsTrue) + + c.Assert(q.Dequeue(id), qt.IsTrue) + exists, done, _, err = q.Done(id) + c.Assert(exists, qt.IsFalse) + c.Assert(err, qt.IsNil) + c.Assert(done, qt.IsFalse) +} + +func TestQueueDataRace(t *testing.T) { + nConsumers := flag.Int("consumers", 100, "number of processes consumers") + flag.Parse() + c := qt.New(t) + // initialize some variables to store the results of the test and a queue + var nProcesses int64 + queueItemIdChan := make(chan string) + q := NewBackgroundQueue() + // set a context with the test deadline, decreaseing by 5 seconds to ensure + // a gap to check test results + deadline, ok := t.Deadline() + c.Assert(ok, qt.IsTrue) + ctx, cancel := context.WithDeadline(context.Background(), deadline.Add(-5*time.Second)) + defer cancel() + // launch producers + producersWg := new(sync.WaitGroup) + producersWg.Add(1) + go func() { + defer producersWg.Done() + for { + select { + case <-ctx.Done(): + return + default: + atomic.AddInt64(&nProcesses, 1) + queueItemIdChan <- q.Enqueue() + time.Sleep(time.Millisecond * 500) + } + } + }() + // create and lunch consumers + var asyncErrors sync.Map + updatersWg := new(sync.WaitGroup) + for i := 0; i < *nConsumers; i++ { + updatersWg.Add(1) + go func() { + defer updatersWg.Done() + for { + select { + case <-ctx.Done(): + return + case queueItemId, ok := <-queueItemIdChan: + // wait for a new id + if !ok { + return + } + // if not exists create an error + exists, done, data, err := q.Done(queueItemId) + if !exists { + asyncErrors.Store(queueItemId, fmt.Errorf("expected queue item not found during done check")) + continue + } + // if it is not done, update it to done + if !done { + // if this actions fails create an error + if !q.Update(queueItemId, true, data, err) { + asyncErrors.Store(queueItemId, fmt.Errorf("expected queue item not found during update")) + continue + } + } + // resend it through the channel + queueItemIdChan <- queueItemId + } + } + }() + } + // create and lunch consumers + dequeuersWg := new(sync.WaitGroup) + for i := 0; i < *nConsumers; i++ { + dequeuersWg.Add(1) + go func() { + defer dequeuersWg.Done() + for { + select { + case <-ctx.Done(): + return + case queueItemId, ok := <-queueItemIdChan: + // wait for a new id + if !ok { + return + } + // if not exists create an error + exists, done, _, _ := q.Done(queueItemId) + if !exists { + asyncErrors.Store(queueItemId, fmt.Errorf("expected queue item not found during done check")) + continue + } + // if it is done, remove it from the queue, and if this action + // fails, create an error; unless create a nil error + if done { + if !q.Dequeue(queueItemId) { + asyncErrors.Store(queueItemId, fmt.Errorf("expected queue item not found during update")) + } else { + asyncErrors.Store(queueItemId, nil) + } + continue + } + // if it is not done, resend it through the channel + queueItemIdChan <- queueItemId + } + } + }() + } + // wait until goroutines finish + producersWg.Wait() + updatersWg.Wait() + dequeuersWg.Wait() + // check completed processes errors (nil or not) + completed := []error{} + asyncErrors.Range(func(key, value any) bool { + if err, ok := value.(error); ok { + completed = append(completed, err) + } else { + completed = append(completed, nil) + } + return true + }) + // assert number of completed processes + c.Assert(int64(len(completed)), qt.Equals, nProcesses) + // assert that every error is nil + for _, err := range completed { + c.Assert(err, qt.IsNil) + } + t.Logf("Completed with %d processes created!", nProcesses) +}