Skip to content

Commit

Permalink
bug: 'database is locked' error after some time without calling the A…
Browse files Browse the repository at this point in the history
…PI (#71)

'database is locked' fixed using the same solution as vocdoni-node indexe
  • Loading branch information
lucasmenendez authored Sep 7, 2023
1 parent 8d9fd54 commit b63ffe8
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 85 deletions.
9 changes: 3 additions & 6 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package api

import (
"database/sql"
"encoding/json"

"github.com/vocdoni/census3/census"
queries "github.com/vocdoni/census3/db/sqlc"
"github.com/vocdoni/census3/db"
"go.vocdoni.io/dvote/httprouter"
api "go.vocdoni.io/dvote/httprouter/apirest"
"go.vocdoni.io/dvote/log"
Expand All @@ -21,18 +20,16 @@ type Census3APIConf struct {

type census3API struct {
conf Census3APIConf
db *sql.DB
sqlc *queries.Queries
db *db.DB
endpoint *api.API
censusDB *census.CensusDB
w3p map[int64]string
}

func Init(db *sql.DB, q *queries.Queries, conf Census3APIConf) error {
func Init(db *db.DB, conf Census3APIConf) error {
newAPI := &census3API{
conf: conf,
db: db,
sqlc: q,
w3p: conf.Web3Providers,
}
// get the current chainID
Expand Down
19 changes: 4 additions & 15 deletions api/censuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,7 @@ func (capi *census3API) getCensus(msg *api.APIdata, ctx *httprouter.HTTPContext)
}
internalCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// begin a transaction for group sql queries
tx, err := capi.db.BeginTx(internalCtx, nil)
if err != nil {
return ErrCantGetCensus
}
defer func() {
if err := tx.Rollback(); err != nil {
log.Errorw(err, "holders transaction rollback failed")
}
}()
qtx := capi.sqlc.WithTx(tx)
currentCensus, err := qtx.CensusByID(internalCtx, int64(censusID))
currentCensus, err := capi.db.QueriesRO.CensusByID(internalCtx, int64(censusID))
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNotFoundCensus
Expand Down Expand Up @@ -90,7 +79,7 @@ func (capi *census3API) createAndPublishCensus(msg *api.APIdata, ctx *httprouter
defer cancel()

// begin a transaction for group sql queries
tx, err := capi.db.BeginTx(internalCtx, nil)
tx, err := capi.db.RW.BeginTx(internalCtx, nil)
if err != nil {
return ErrCantCreateCensus
}
Expand All @@ -99,7 +88,7 @@ func (capi *census3API) createAndPublishCensus(msg *api.APIdata, ctx *httprouter
log.Errorw(err, "holders transaction rollback failed")
}
}()
qtx := capi.sqlc.WithTx(tx)
qtx := capi.db.QueriesRW.WithTx(tx)

strategyTokens, err := qtx.TokensByStrategyID(internalCtx, int64(req.StrategyID))
if err != nil {
Expand Down Expand Up @@ -210,7 +199,7 @@ func (capi *census3API) getStrategyCensuses(msg *api.APIdata, ctx *httprouter.HT
// get censuses by this strategy ID
internalCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
rows, err := capi.sqlc.CensusByStrategyID(internalCtx, int64(strategyID))
rows, err := capi.db.QueriesRO.CensusByStrategyID(internalCtx, int64(strategyID))
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNotFoundCensus
Expand Down
4 changes: 2 additions & 2 deletions api/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (capi *census3API) getTokenHolders(msg *api.APIdata, ctx *httprouter.HTTPCo

// get token holders from the database
addr := common.HexToAddress(ctx.URLParam("address"))
dbHolders, err := capi.sqlc.TokenHoldersByTokenID(ctx2, addr.Bytes())
dbHolders, err := capi.db.QueriesRO.TokenHoldersByTokenID(ctx2, addr.Bytes())
if err != nil {
// if database does not contain any token holder for this token, return
// no content, else return generic error.
Expand Down Expand Up @@ -77,7 +77,7 @@ func (capi *census3API) countHolders(msg *api.APIdata, ctx *httprouter.HTTPConte
defer cancel()

addr := common.HexToAddress(ctx.URLParam("address"))
numberOfHolders, err := capi.sqlc.CountTokenHoldersByTokenID(ctx2, addr.Bytes())
numberOfHolders, err := capi.db.QueriesRO.CountTokenHoldersByTokenID(ctx2, addr.Bytes())
if err != nil {
if errors.Is(sql.ErrNoRows, err) {
log.Errorf("no holders found for address %s: %s", addr, err.Error())
Expand Down
12 changes: 6 additions & 6 deletions api/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ func (capi *census3API) initStrategiesHandlers() error {
func (capi *census3API) createDummyStrategy(tokenID []byte) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
res, err := capi.sqlc.CreateStategy(ctx, "test")
res, err := capi.db.QueriesRW.CreateStategy(ctx, "test")
if err != nil {
return err
}
strategyID, err := res.LastInsertId()
if err != nil {
return err
}
_, err = capi.sqlc.CreateStrategyToken(ctx, queries.CreateStrategyTokenParams{
_, err = capi.db.QueriesRW.CreateStrategyToken(ctx, queries.CreateStrategyTokenParams{
StrategyID: strategyID,
TokenID: tokenID,
MinBalance: big.NewInt(0).Bytes(),
Expand All @@ -62,7 +62,7 @@ func (capi *census3API) getStrategies(msg *api.APIdata, ctx *httprouter.HTTPCont
defer cancel()
// TODO: Support for pagination
// get strategies from the database
rows, err := capi.sqlc.ListStrategies(internalCtx)
rows, err := capi.db.QueriesRO.ListStrategies(internalCtx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNoStrategies
Expand Down Expand Up @@ -100,7 +100,7 @@ func (capi *census3API) getStrategy(msg *api.APIdata, ctx *httprouter.HTTPContex
// get strategy from the database
internalCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
strategyData, err := capi.sqlc.StrategyByID(internalCtx, int64(strategyID))
strategyData, err := capi.db.QueriesRO.StrategyByID(internalCtx, int64(strategyID))
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNotFoundStrategy
Expand All @@ -115,7 +115,7 @@ func (capi *census3API) getStrategy(msg *api.APIdata, ctx *httprouter.HTTPContex
Tokens: []GetStrategyToken{},
}
// get information of the strategy related tokens
tokensData, err := capi.sqlc.TokensByStrategyID(internalCtx, strategyData.ID)
tokensData, err := capi.db.QueriesRO.TokensByStrategyID(internalCtx, strategyData.ID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
log.Errorw(ErrCantGetTokens, err.Error())
return ErrCantGetTokens
Expand Down Expand Up @@ -147,7 +147,7 @@ func (capi *census3API) getTokenStrategies(msg *api.APIdata, ctx *httprouter.HTT
internalCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
// get strategies associated to the token provided
rows, err := capi.sqlc.StrategiesByTokenID(internalCtx, common.HexToAddress(tokenID).Bytes())
rows, err := capi.db.QueriesRO.StrategiesByTokenID(internalCtx, common.HexToAddress(tokenID).Bytes())
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNoStrategies
Expand Down
12 changes: 6 additions & 6 deletions api/tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (capi *census3API) getTokens(msg *api.APIdata, ctx *httprouter.HTTPContext)
defer cancel()
// TODO: Support for pagination
// get tokens from the database
rows, err := capi.sqlc.ListTokens(internalCtx)
rows, err := capi.db.QueriesRO.ListTokens(internalCtx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNoTokens
Expand Down Expand Up @@ -134,7 +134,7 @@ func (capi *census3API) createToken(msg *api.APIdata, ctx *httprouter.HTTPContex
return ErrCantGetToken
}
}
_, err = capi.sqlc.CreateToken(internalCtx, queries.CreateTokenParams{
_, err = capi.db.QueriesRW.CreateToken(internalCtx, queries.CreateTokenParams{
ID: info.Address.Bytes(),
Name: *name,
Symbol: *symbol,
Expand Down Expand Up @@ -168,7 +168,7 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext)
address := common.HexToAddress(ctx.URLParam("tokenID"))
internalCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
tokenData, err := capi.sqlc.TokenByID(internalCtx, address.Bytes())
tokenData, err := capi.db.QueriesRO.TokenByID(internalCtx, address.Bytes())
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
log.Errorw(ErrNotFoundToken, err.Error())
Expand All @@ -178,7 +178,7 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext)
return ErrCantGetToken
}
// TODO: Only for the MVP, consider to remove it
tokenStrategies, err := capi.sqlc.StrategiesByTokenID(internalCtx, tokenData.ID)
tokenStrategies, err := capi.db.QueriesRO.StrategiesByTokenID(internalCtx, tokenData.ID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
log.Errorw(ErrCantGetToken, err.Error())
return ErrCantGetToken
Expand All @@ -188,7 +188,7 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext)
defaultStrategyID = uint64(tokenStrategies[0].ID)
}
// get last block with token information
atBlock, err := capi.sqlc.LastBlockByTokenID(internalCtx, address.Bytes())
atBlock, err := capi.db.QueriesRO.LastBlockByTokenID(internalCtx, address.Bytes())
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
log.Errorw(ErrCantGetToken, err.Error())
Expand Down Expand Up @@ -221,7 +221,7 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext)
// get token holders count
countHoldersCtx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel2()
holders, err := capi.sqlc.CountTokenHoldersByTokenID(countHoldersCtx, address.Bytes())
holders, err := capi.db.QueriesRO.CountTokenHoldersByTokenID(countHoldersCtx, address.Bytes())
if err != nil {
return ErrCantGetTokenCount
}
Expand Down
12 changes: 9 additions & 3 deletions cmd/census3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func main() {
flag.Parse()
log.Init(*logLevel, "stdout", nil)

db, q, err := db.Init(*dataDir)
database, err := db.Init(*dataDir)
if err != nil {
log.Fatal(err)
}
Expand All @@ -43,13 +43,13 @@ func main() {
log.Info(w3p)

// Start the holder scanner
hc, err := service.NewHoldersScanner(db, q, w3p)
hc, err := service.NewHoldersScanner(database, w3p)
if err != nil {
log.Fatal(err)
}

// Start the API
err = api.Init(db, q, api.Census3APIConf{
err = api.Init(database, api.Census3APIConf{
Hostname: "0.0.0.0",
Port: *port,
DataDir: *dataDir,
Expand All @@ -69,6 +69,12 @@ func main() {
log.Warnf("received SIGTERM, exiting at %s", time.Now().Format(time.RFC850))
cancel()
log.Infof("waiting for routines to end gracefully...")
// closing database
go func() {
if err := database.Close(); err != nil {
log.Fatal(err)
}
}()
time.Sleep(5 * time.Second)
os.Exit(0)
}
65 changes: 56 additions & 9 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path/filepath"
"time"

_ "github.com/mattn/go-sqlite3"
"github.com/pressly/goose/v3"
Expand All @@ -15,27 +16,73 @@ import (
//go:embed migrations/*.sql
var migrationsFS embed.FS

func Init(dataDir string) (*sql.DB, *queries.Queries, error) {
// DB struct abstact a safe connection with the database using sqlc queries,
// sqlite as a database engine and go-sqlite3 as a driver.
type DB struct {
RW *sql.DB
RO *sql.DB

QueriesRW *queries.Queries
QueriesRO *queries.Queries
}

// Close function stops all internal connections to the database
func (db *DB) Close() error {
if err := db.RW.Close(); err != nil {
return err
}
return db.RO.Close()
}

// Init function starts a database using the data path provided as argument. It
// opens two different connections, one for read only, and another for read and
// write, with different configurations, optimized for each use case.
func Init(dataDir string) (*DB, error) {
dbFile := filepath.Join(dataDir, "census3.sql")
if _, err := os.Stat(dbFile); os.IsNotExist(err) {
if err := os.MkdirAll(dataDir, os.ModePerm); err != nil {
return nil, nil, fmt.Errorf("error creating a new database file: %w", err)
return nil, fmt.Errorf("error creating a new database file: %w", err)
}
}
// open database file
database, err := sql.Open("sqlite3", dbFile)
// sqlite doesn't support multiple concurrent writers.
// For that reason, rwDB is limited to one open connection.
// Per https://github.com/mattn/go-sqlite3/issues/1022#issuecomment-1067353980,
// we use WAL to allow multiple concurrent readers at the same time.
rwDB, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=rwc&_journal_mode=wal&_txlock=immediate&_synchronous=normal", dbFile))
if err != nil {
return nil, nil, fmt.Errorf("error opening database: %w", err)
return nil, fmt.Errorf("error opening database: %w", err)
}
rwDB.SetMaxOpenConns(1)
rwDB.SetMaxIdleConns(2)
rwDB.SetConnMaxIdleTime(10 * time.Minute)
rwDB.SetConnMaxLifetime(time.Hour)

roDB, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=ro&_journal_mode=wal", dbFile))
if err != nil {
return nil, fmt.Errorf("error opening database: %w", err)
}
// Increasing these numbers can allow for more queries to run concurrently,
// but it also increases the memory used by sqlite and our connection pool.
// Most read-only queries we run are quick enough, so a small number seems OK.
roDB.SetMaxOpenConns(10)
roDB.SetMaxIdleConns(20)
roDB.SetConnMaxIdleTime(5 * time.Minute)
roDB.SetConnMaxLifetime(time.Hour)

// get census3 goose migrations and setup for sqlite3
if err := goose.SetDialect("sqlite3"); err != nil {
return nil, nil, fmt.Errorf("error setting up driver for sqlite: %w", err)
return nil, fmt.Errorf("error setting up driver for sqlite: %w", err)
}
goose.SetBaseFS(migrationsFS)
// perform goose up
if err := goose.Up(database, "migrations"); err != nil {
return nil, nil, fmt.Errorf("error during goose up: %w", err)
if err := goose.Up(rwDB, "migrations"); err != nil {
return nil, fmt.Errorf("error during goose up: %w", err)
}
// init sqlc
return database, queries.New(database), nil
return &DB{
RW: rwDB,
RO: roDB,
QueriesRW: queries.New(rwDB),
QueriesRO: queries.New(roDB),
}, nil
}
12 changes: 6 additions & 6 deletions service/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,23 @@ var (
)

type TestDB struct {
dir string
db *sql.DB
queries *queries.Queries
dir string
db *db.DB
}

func StartTestDB(t *testing.T) *TestDB {
c := qt.New(t)

dir := t.TempDir()
db, q, err := db.Init(dir)
db, err := db.Init(dir)
c.Assert(err, qt.IsNil)
return &TestDB{dir, db, q}
return &TestDB{dir, db}
}

func (testdb *TestDB) Close(t *testing.T) {
c := qt.New(t)
c.Assert(testdb.db.Close(), qt.IsNil)
c.Assert(testdb.db.RW.Close(), qt.IsNil)
c.Assert(testdb.db.RO.Close(), qt.IsNil)
c.Assert(os.RemoveAll(testdb.dir), qt.IsNil)
}

Expand Down
Loading

0 comments on commit b63ffe8

Please sign in to comment.