diff --git a/analytics/cmd/metrics/volume_mongo.go b/analytics/cmd/metrics/volume_mongo.go index f40676dc8..e2611890a 100755 --- a/analytics/cmd/metrics/volume_mongo.go +++ b/analytics/cmd/metrics/volume_mongo.go @@ -7,7 +7,7 @@ import ( "time" "github.com/wormhole-foundation/wormhole-explorer/analytics/prices" - "github.com/wormhole-foundation/wormhole-explorer/common/db" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/common/repository" "go.uber.org/zap" @@ -25,7 +25,7 @@ func RunVaaVolumeFromMongo(mongoUri, mongoDb, outputFile, pricesFile string) { logger.Info("starting wormhole-explorer-analytics ...") //setup DB connection - db, err := db.New(rootCtx, logger, mongoUri, mongoDb) + db, err := dbutil.Connect(rootCtx, logger, mongoUri, mongoDb) if err != nil { logger.Fatal("Failed to connect MongoDB", zap.Error(err)) } @@ -98,8 +98,9 @@ func RunVaaVolumeFromMongo(mongoUri, mongoDb, outputFile, pricesFile string) { } page++ } - logger.Info("Closing database connections ...") - db.Close() + + logger.Info("closing MongoDB connection...") + db.DisconnectWithTimeout(10 * time.Second) for k := range converter.MissingTokensCounter { fmissingTokens.WriteString(fmt.Sprintf("%s,%s,%d\n", k.String(), converter.MissingTokens[k], converter.MissingTokensCounter[k])) diff --git a/analytics/cmd/service/run.go b/analytics/cmd/service/run.go index f1b00fd4b..49d50a3fe 100644 --- a/analytics/cmd/service/run.go +++ b/analytics/cmd/service/run.go @@ -7,6 +7,7 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" @@ -23,7 +24,7 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/analytics/queue" wormscanNotionalCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/notional" sqs_client "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs" - "github.com/wormhole-foundation/wormhole-explorer/common/db" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" health "github.com/wormhole-foundation/wormhole-explorer/common/health" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "go.mongodb.org/mongo-driver/mongo" @@ -57,7 +58,7 @@ func Run() { // setup DB connection logger.Info("connecting to MongoDB...") - db, err := db.New(rootCtx, logger, config.MongodbURI, config.MongodbDatabase) + db, err := dbutil.Connect(rootCtx, logger, config.MongodbURI, config.MongodbDatabase) if err != nil { logger.Fatal("failed to connect MongoDB", zap.Error(err)) } @@ -119,12 +120,16 @@ func Run() { logger.Info("cancelling root context...") rootCtxCancel() + logger.Info("closing metrics client...") metric.Close() + logger.Info("closing HTTP server...") server.Stop() + logger.Info("closing MongoDB connection...") - db.Close() + db.DisconnectWithTimeout(10 * time.Second) + logger.Info("terminated successfully") } diff --git a/api/internal/db/db.go b/api/internal/db/db.go deleted file mode 100644 index ebd531676..000000000 --- a/api/internal/db/db.go +++ /dev/null @@ -1,19 +0,0 @@ -// Package db handle mongodb connections. -package db - -import ( - "context" - - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" -) - -// Connect create a new mongo db client for the options defined in the input param url. -func Connect(ctx context.Context, url string) (*mongo.Client, error) { - cli, err := mongo.NewClient(options.Client().ApplyURI(url)) - if err != nil { - return cli, err - } - err = cli.Connect(ctx) - return cli, err -} diff --git a/api/main.go b/api/main.go index dc44cdbde..4cdf032ba 100644 --- a/api/main.go +++ b/api/main.go @@ -32,7 +32,6 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa" "github.com/wormhole-foundation/wormhole-explorer/api/internal/config" - "github.com/wormhole-foundation/wormhole-explorer/api/internal/db" "github.com/wormhole-foundation/wormhole-explorer/api/internal/tvl" "github.com/wormhole-foundation/wormhole-explorer/api/middleware" "github.com/wormhole-foundation/wormhole-explorer/api/response" @@ -40,6 +39,7 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan" rpcApi "github.com/wormhole-foundation/wormhole-explorer/api/rpc" wormscanCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" xlogger "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/common/utils" "go.uber.org/zap" @@ -103,11 +103,10 @@ func main() { // Setup DB rootLogger.Info("connecting to MongoDB") - cli, err := db.Connect(appCtx, cfg.DB.URL) + db, err := dbutil.Connect(appCtx, rootLogger, cfg.DB.URL, cfg.DB.Name) if err != nil { rootLogger.Fatal("failed to connect to MongoDB", zap.Error(err)) } - db := cli.Database(cfg.DB.Name) // Get cache get function rootLogger.Info("initializing cache") @@ -126,12 +125,12 @@ func main() { // Set up repositories rootLogger.Info("initializing repositories") - addressRepo := address.NewRepository(db, rootLogger) - vaaRepo := vaa.NewRepository(db, rootLogger) - obsRepo := observations.NewRepository(db, rootLogger) - governorRepo := governor.NewRepository(db, rootLogger) - infrastructureRepo := infrastructure.NewRepository(db, rootLogger) - heartbeatsRepo := heartbeats.NewRepository(db, rootLogger) + addressRepo := address.NewRepository(db.Database, rootLogger) + vaaRepo := vaa.NewRepository(db.Database, rootLogger) + obsRepo := observations.NewRepository(db.Database, rootLogger) + governorRepo := governor.NewRepository(db.Database, rootLogger) + infrastructureRepo := infrastructure.NewRepository(db.Database, rootLogger) + heartbeatsRepo := heartbeats.NewRepository(db.Database, rootLogger) transactionsRepo := transactions.NewRepository( tvl, influxCli, @@ -139,7 +138,7 @@ func main() { cfg.Influx.Bucket24Hours, cfg.Influx.Bucket30Days, cfg.Influx.BucketInfinite, - db, + db.Database, rootLogger, ) @@ -223,10 +222,16 @@ func main() { } rootLogger.Info("cleanup tasks...") + rootLogger.Info("shutting down server...") app.Shutdown() + rootLogger.Info("closing cache...") cache.Close() + + rootLogger.Info("closing MongoDB connection...") + db.DisconnectWithTimeout(10 * time.Second) + rootLogger.Info("terminated API service successfully") } diff --git a/common/db/db.go b/common/db/db.go deleted file mode 100644 index 7335bb836..000000000 --- a/common/db/db.go +++ /dev/null @@ -1,31 +0,0 @@ -package db - -import ( - "context" - "time" - - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.uber.org/zap" -) - -// Database definition. -type Database struct { - Database *mongo.Database - client *mongo.Client -} - -// New connects to DB and returns a client that will disconnect when the passed in context is cancelled. -func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) { - cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri)) - if err != nil { - return nil, err - } - return &Database{client: cli, Database: cli.Database(databaseName)}, err -} - -// Close closes the database connections. -func (d *Database) Close() error { - ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) - return d.client.Disconnect(ctx) -} diff --git a/common/dbutil/session.go b/common/dbutil/session.go new file mode 100644 index 000000000..4faae90c6 --- /dev/null +++ b/common/dbutil/session.go @@ -0,0 +1,76 @@ +package dbutil + +import ( + "context" + "fmt" + "time" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" + "go.uber.org/zap" +) + +// Session is a plain-old-data struct that represents a handle to a MongoDB database. +type Session struct { + Client *mongo.Client + Database *mongo.Database + logger *zap.Logger +} + +// Connect to a MongoDB database. +func Connect( + ctx context.Context, + logger *zap.Logger, + uri string, + databaseName string, +) (*Session, error) { + + // Create a timed sub-context for the connection attempt + const connectTimeout = 10 * time.Second + subContext, cancelFunc := context.WithTimeout(ctx, connectTimeout) + defer cancelFunc() + + // Connect to MongoDB + client, err := mongo.Connect(subContext, options.Client().ApplyURI(uri)) + if err != nil { + return nil, fmt.Errorf("failed to connect to MongoDB: %w", err) + } + + // Ping the database to make sure we're actually connected + // + // This can detect a misconfuiguration error when a service is being initialized, + // rather than waiting for the first query to fail in the service's processing loop. + err = client.Ping(subContext, readpref.Primary()) + if err != nil { + return nil, fmt.Errorf("failed to ping MongoDB database: %w", err) + } + + // Populate the result struct and return + db := &Session{ + Client: client, + Database: client.Database(databaseName), + } + return db, nil +} + +// Disconnect from a MongoDB database. +func (s *Session) DisconnectWithTimeout(timeout time.Duration) error { + + // Create a timed sub-context for the disconnection attempt + subContext, cancelFunc := context.WithTimeout(context.Background(), timeout) + defer cancelFunc() + + // Attempt to disconnect + err := s.Client.Disconnect(subContext) + if err != nil { + s.logger.Warn( + "failed to disconnect from MongoDB", + zap.Duration("timeout", timeout), + zap.Error(err), + ) + return fmt.Errorf("failed to disconnect from MongoDB: %w", err) + } + + return nil +} diff --git a/contract-watcher/cmd/backfiller/run.go b/contract-watcher/cmd/backfiller/run.go index 87c6a4e03..a32c32c16 100644 --- a/contract-watcher/cmd/backfiller/run.go +++ b/contract-watcher/cmd/backfiller/run.go @@ -2,13 +2,14 @@ package backfiller import ( "context" + "time" "github.com/wormhole-foundation/wormhole-explorer/common/client/alert" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/domain" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/builder" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/config" - "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/db" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/storage" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/watcher" @@ -24,7 +25,7 @@ func Run(config *config.BackfillerConfiguration) { logger.Info("Starting wormhole-explorer-contract-watcher as backfiller ...") //setup DB connection - db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase) + db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase) if err != nil { logger.Fatal("failed to connect MongoDB", zap.Error(err)) } @@ -58,8 +59,8 @@ func Run(config *config.BackfillerConfiguration) { watcher.Backfill(rootCtx, config.FromBlock, config.ToBlock, config.PageSize, config.PersistBlock) - logger.Info("Closing database connections ...") - db.Close() + logger.Info("closing MongoDB connection...") + db.DisconnectWithTimeout(10 * time.Second) logger.Info("Finish wormhole-explorer-contract-watcher as backfiller") diff --git a/contract-watcher/cmd/service/run.go b/contract-watcher/cmd/service/run.go index e99325a28..c837579a5 100644 --- a/contract-watcher/cmd/service/run.go +++ b/contract-watcher/cmd/service/run.go @@ -9,6 +9,7 @@ import ( "time" "github.com/wormhole-foundation/wormhole-explorer/common/client/alert" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/domain" "github.com/wormhole-foundation/wormhole-explorer/common/health" "github.com/wormhole-foundation/wormhole-explorer/common/logger" @@ -17,7 +18,6 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/http/infrastructure" cwAlert "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/alert" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/ankr" - "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/db" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/processor" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/storage" @@ -78,7 +78,7 @@ func Run() { logger.Info("Starting wormhole-explorer-contract-watcher ...") //setup DB connection - db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase) + db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase) if err != nil { logger.Fatal("failed to connect MongoDB", zap.Error(err)) } @@ -126,10 +126,13 @@ func Run() { logger.Info("Closing processor ...") processor.Close() - logger.Info("Closing database connections ...") - db.Close() + + logger.Info("closing MongoDB connection...") + db.DisconnectWithTimeout(10 * time.Second) + logger.Info("Closing Http server ...") server.Stop() + logger.Info("Finished wormhole-explorer-contract-watcher") } diff --git a/contract-watcher/internal/db/db.go b/contract-watcher/internal/db/db.go deleted file mode 100644 index 7335bb836..000000000 --- a/contract-watcher/internal/db/db.go +++ /dev/null @@ -1,31 +0,0 @@ -package db - -import ( - "context" - "time" - - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.uber.org/zap" -) - -// Database definition. -type Database struct { - Database *mongo.Database - client *mongo.Client -} - -// New connects to DB and returns a client that will disconnect when the passed in context is cancelled. -func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) { - cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri)) - if err != nil { - return nil, err - } - return &Database{client: cli, Database: cli.Database(databaseName)}, err -} - -// Close closes the database connections. -func (d *Database) Close() error { - ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) - return d.client.Disconnect(ctx) -} diff --git a/fly/cmd/backfiller/txhash_encoding_strategy.go b/fly/cmd/backfiller/txhash_encoding_strategy.go index b042c300b..599ed320f 100644 --- a/fly/cmd/backfiller/txhash_encoding_strategy.go +++ b/fly/cmd/backfiller/txhash_encoding_strategy.go @@ -3,8 +3,10 @@ package main import ( "context" "encoding/hex" + "time" "github.com/wormhole-foundation/wormhole-explorer/common/client/alert" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/domain" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics" @@ -25,12 +27,13 @@ func RunTxHashEncoding(cfg TxHashEncondingConfig) { ctx := context.Background() logger := logger.New("wormhole-fly", logger.WithLevel(cfg.LogLevel)) - db, err := storage.GetDB(ctx, logger, cfg.MongoURI, cfg.MongoDatabase) + db, err := dbutil.Connect(ctx, logger, cfg.MongoURI, cfg.MongoDatabase) if err != nil { logger.Fatal("could not connect to DB", zap.Error(err)) } + defer db.DisconnectWithTimeout(10 * time.Second) - repository := storage.NewRepository(alert.NewDummyClient(), metrics.NewDummyMetrics(), db, logger) + repository := storage.NewRepository(alert.NewDummyClient(), metrics.NewDummyMetrics(), db.Database, logger) workerTxHashEncoding(ctx, logger, repository, vaa.ChainID(cfg.ChainID), cfg.PageSize) } diff --git a/fly/cmd/backfiller/workpool.go b/fly/cmd/backfiller/workpool.go index 9e609674f..8e5f9aba3 100644 --- a/fly/cmd/backfiller/workpool.go +++ b/fly/cmd/backfiller/workpool.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "sync" + "time" "github.com/schollz/progressbar/v3" "github.com/wormhole-foundation/wormhole-explorer/common/client/alert" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/fly/storage" - "go.mongodb.org/mongo-driver/mongo" "go.uber.org/zap" ) @@ -19,7 +20,7 @@ type Workpool struct { Workers int Queue chan string WG sync.WaitGroup - DB *mongo.Database + DB *dbutil.Session Log *zap.Logger Bar *progressbar.ProgressBar WorkerFunc GenericWorker @@ -42,7 +43,7 @@ func NewWorkpool(ctx context.Context, cfg WorkerConfiguration, workerFunc Generi WorkerFunc: workerFunc, } - db, err := storage.GetDB(ctx, wp.Log, cfg.MongoURI, cfg.MongoDatabase) + db, err := dbutil.Connect(ctx, wp.Log, cfg.MongoURI, cfg.MongoDatabase) if err != nil { panic(err) } @@ -59,9 +60,11 @@ func NewWorkpool(ctx context.Context, cfg WorkerConfiguration, workerFunc Generi } func (w *Workpool) Process(ctx context.Context) error { - repo := storage.NewRepository(alert.NewDummyClient(), metrics.NewDummyMetrics(), w.DB, w.Log) + repo := storage.NewRepository(alert.NewDummyClient(), metrics.NewDummyMetrics(), w.DB.Database, w.Log) var err error + defer w.DB.DisconnectWithTimeout(10 * time.Second) + for { select { case line := <-w.Queue: diff --git a/fly/main.go b/fly/main.go index d6a0216f7..988a13613 100644 --- a/fly/main.go +++ b/fly/main.go @@ -6,6 +6,7 @@ import ( "log" "strconv" "strings" + "time" "fmt" "os" @@ -15,6 +16,7 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/go-redis/redis/v8" "github.com/wormhole-foundation/wormhole-explorer/common/client/alert" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/domain" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/fly/config" @@ -270,18 +272,18 @@ func main() { logger.Fatal("You must set your 'MONGODB_DATABASE' environmental variable. See\n\t https://www.mongodb.com/docs/drivers/go/current/usage-examples/#environment-variable") } - db, err := storage.GetDB(rootCtx, logger, uri, databaseName) + db, err := dbutil.Connect(rootCtx, logger, uri, databaseName) if err != nil { logger.Fatal("could not connect to DB", zap.Error(err)) } // Run the database migration. - err = migration.Run(db) + err = migration.Run(db.Database) if err != nil { logger.Fatal("error running migration", zap.Error(err)) } - repository := storage.NewRepository(alertClient, metrics, db, logger) + repository := storage.NewRepository(alertClient, metrics, db.Database, logger) // Outbound gossip message queue sendC := make(chan []byte) @@ -503,9 +505,13 @@ func main() { supervisor.WithPropagatePanic) <-rootCtx.Done() + // TODO: wait for things to shut down gracefully vaaGossipConsumerSplitter.Close() server.Stop() + + logger.Info("Closing MongoDB connection...") + db.DisconnectWithTimeout(10 * time.Second) } // getGovernorConfigNodeName get node name from governor config. diff --git a/fly/storage/db.go b/fly/storage/db.go deleted file mode 100644 index 0f0f50487..000000000 --- a/fly/storage/db.go +++ /dev/null @@ -1,25 +0,0 @@ -package storage - -import ( - "context" - "time" - - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.uber.org/zap" -) - -// GetDB connects to DB and returns a client that will disconnect when the passed in context is cancelled -func GetDB(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*mongo.Database, error) { - cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri)) - if err != nil { - return nil, err - } - go func() { - <-appCtx.Done() - ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) - err := cli.Disconnect(ctx) - log.Error("error disconnecting from db", zap.Error(err)) - }() - return cli.Database(databaseName), err -} diff --git a/parser/cmd/backfiller/run.go b/parser/cmd/backfiller/run.go index 88e6f7e28..af605d10c 100644 --- a/parser/cmd/backfiller/run.go +++ b/parser/cmd/backfiller/run.go @@ -5,10 +5,10 @@ import ( "time" "github.com/wormhole-foundation/wormhole-explorer/common/client/alert" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/parser/config" "github.com/wormhole-foundation/wormhole-explorer/parser/http/vaa" - "github.com/wormhole-foundation/wormhole-explorer/parser/internal/db" "github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/parser/parser" "github.com/wormhole-foundation/wormhole-explorer/parser/processor" @@ -43,7 +43,7 @@ func Run(config *config.BackfillerConfiguration) { } //setup DB connection - db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase) + db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase) if err != nil { logger.Fatal("Failed to connect MongoDB", zap.Error(err)) } @@ -87,8 +87,9 @@ func Run(config *config.BackfillerConfiguration) { } page++ } - logger.Info("Closing database connections ...") - db.Close() + + logger.Info("closing MongoDB connection...") + db.DisconnectWithTimeout(10 * time.Second) logger.Info("Finish wormhole-explorer-parser as backfiller") } diff --git a/parser/cmd/service/run.go b/parser/cmd/service/run.go index 40b0a9b0c..1a8b3b0dd 100644 --- a/parser/cmd/service/run.go +++ b/parser/cmd/service/run.go @@ -6,18 +6,19 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/wormhole-foundation/wormhole-explorer/common/client/alert" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/parser/config" "github.com/wormhole-foundation/wormhole-explorer/parser/consumer" "github.com/wormhole-foundation/wormhole-explorer/parser/http/infrastructure" "github.com/wormhole-foundation/wormhole-explorer/parser/http/vaa" parserAlert "github.com/wormhole-foundation/wormhole-explorer/parser/internal/alert" - "github.com/wormhole-foundation/wormhole-explorer/parser/internal/db" "github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs" "github.com/wormhole-foundation/wormhole-explorer/parser/parser" @@ -51,8 +52,8 @@ func Run() { logger.Info("Starting wormhole-explorer-parser ...") - //setup DB connection - db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase) + // setup DB connection + db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase) if err != nil { logger.Fatal("failed to connect MongoDB", zap.Error(err)) } @@ -104,10 +105,12 @@ func Run() { logger.Info("root context cancelled, exiting...") rootCtxCancel() - logger.Info("Closing database connections ...") - db.Close() + logger.Info("closing MongoDB connection...") + db.DisconnectWithTimeout(10 * time.Second) + logger.Info("Closing Http server ...") server.Stop() + logger.Info("Finished wormhole-explorer-parser") } diff --git a/parser/internal/db/db.go b/parser/internal/db/db.go deleted file mode 100644 index 7335bb836..000000000 --- a/parser/internal/db/db.go +++ /dev/null @@ -1,31 +0,0 @@ -package db - -import ( - "context" - "time" - - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.uber.org/zap" -) - -// Database definition. -type Database struct { - Database *mongo.Database - client *mongo.Client -} - -// New connects to DB and returns a client that will disconnect when the passed in context is cancelled. -func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) { - cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri)) - if err != nil { - return nil, err - } - return &Database{client: cli, Database: cli.Database(databaseName)}, err -} - -// Close closes the database connections. -func (d *Database) Close() error { - ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) - return d.client.Disconnect(ctx) -} diff --git a/pipeline/cmd/main.go b/pipeline/cmd/main.go index e7b7bd0c0..055cc7aba 100644 --- a/pipeline/cmd/main.go +++ b/pipeline/cmd/main.go @@ -6,17 +6,18 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/wormhole-foundation/wormhole-explorer/common/client/alert" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/pipeline/config" "github.com/wormhole-foundation/wormhole-explorer/pipeline/healthcheck" "github.com/wormhole-foundation/wormhole-explorer/pipeline/http/infrastructure" pipelineAlert "github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/alert" - "github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/db" "github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/sns" "github.com/wormhole-foundation/wormhole-explorer/pipeline/pipeline" @@ -52,7 +53,7 @@ func main() { logger.Info("Starting wormhole-explorer-pipeline ...") //setup DB connection - db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase) + db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase) if err != nil { logger.Fatal("failed to connect MongoDB", zap.Error(err)) } @@ -115,10 +116,12 @@ func main() { logger.Info("Closing tx hash handler ...") close(quit) - logger.Info("Closing database connections ...") - db.Close() + logger.Info("closing MongoDB connection...") + db.DisconnectWithTimeout(10 * time.Second) + logger.Info("Closing Http server ...") server.Stop() + logger.Info("Finished wormhole-explorer-pipeline") } diff --git a/pipeline/internal/db/db.go b/pipeline/internal/db/db.go deleted file mode 100644 index 7335bb836..000000000 --- a/pipeline/internal/db/db.go +++ /dev/null @@ -1,31 +0,0 @@ -package db - -import ( - "context" - "time" - - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.uber.org/zap" -) - -// Database definition. -type Database struct { - Database *mongo.Database - client *mongo.Client -} - -// New connects to DB and returns a client that will disconnect when the passed in context is cancelled. -func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) { - cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri)) - if err != nil { - return nil, err - } - return &Database{client: cli, Database: cli.Database(databaseName)}, err -} - -// Close closes the database connections. -func (d *Database) Close() error { - ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) - return d.client.Disconnect(ctx) -} diff --git a/spy/cmd/main.go b/spy/cmd/main.go index 47f8a14b4..166869391 100644 --- a/spy/cmd/main.go +++ b/spy/cmd/main.go @@ -6,8 +6,10 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/certusone/wormhole/node/pkg/supervisor" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/spy/config" "github.com/wormhole-foundation/wormhole-explorer/spy/grpc" @@ -67,7 +69,7 @@ func main() { publisher := grpc.NewPublisher(svs, avs, logger) - db, err := storage.New(rootCtx, logger, config.MongoURI, config.MongoDatabase) + db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase) if err != nil { logger.Fatal("failed to connect MongoDB", zap.Error(err)) } @@ -98,8 +100,10 @@ func main() { logger.Info("Closing GRPC server ...") grpcServer.Stop() - logger.Info("Closing database connections ...") - db.Close() + + logger.Info("Closing MongoDB connection...") + db.DisconnectWithTimeout(10 * time.Second) + logger.Info("Closing Http server ...") server.Stop() logger.Info("Finished wormhole-explorer-spy") diff --git a/spy/storage/db.go b/spy/storage/db.go deleted file mode 100644 index 6f6735d87..000000000 --- a/spy/storage/db.go +++ /dev/null @@ -1,30 +0,0 @@ -package storage - -import ( - "context" - "time" - - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.uber.org/zap" -) - -type Database struct { - Database *mongo.Database - client *mongo.Client -} - -// New connects to DB and returns a client that will disconnect when the passed in context is cancelled -func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) { - cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri)) - if err != nil { - return nil, err - } - return &Database{client: cli, Database: cli.Database(databaseName)}, err -} - -// Close closes the database connections. -func (d *Database) Close() error { - ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) - return d.client.Disconnect(ctx) -} diff --git a/tx-tracker/cmd/backfiller/main.go b/tx-tracker/cmd/backfiller/main.go index 3fc5cc823..d54b89ddf 100644 --- a/tx-tracker/cmd/backfiller/main.go +++ b/tx-tracker/cmd/backfiller/main.go @@ -11,12 +11,11 @@ import ( "syscall" "time" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/txtracker/chains" "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" "github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" ) @@ -65,12 +64,11 @@ func main() { }() // Initialize the database client - cli, err := mongo.Connect(rootCtx, options.Client().ApplyURI(cfg.MongodbUri)) + db, err := dbutil.Connect(rootCtx, mainLogger, cfg.MongodbUri, cfg.MongodbDatabase) if err != nil { log.Fatal("Failed to initialize MongoDB client: ", err) } - defer cli.Disconnect(rootCtx) - repository := consumer.NewRepository(rootLogger, cli.Database(cfg.MongodbDatabase)) + repository := consumer.NewRepository(rootLogger, db.Database) strategyCallbacks, err := parseStrategyCallbacks(mainLogger, cfg, repository) if err != nil { @@ -116,7 +114,12 @@ func main() { } // Wait for all workers to finish before closing + mainLogger.Info("Waiting for all workers to finish...") wg.Wait() + + mainLogger.Info("Closing MongoDB connection...") + db.DisconnectWithTimeout(10 * time.Second) + mainLogger.Info("Closing main goroutine") } diff --git a/tx-tracker/cmd/service/main.go b/tx-tracker/cmd/service/main.go index a5fc79f0f..ee44ba7a0 100644 --- a/tx-tracker/cmd/service/main.go +++ b/tx-tracker/cmd/service/main.go @@ -12,9 +12,9 @@ import ( awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/health" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/txtracker/chains" @@ -47,19 +47,13 @@ func main() { chains.Initialize(&cfg.RpcProviderSettings) // initialize the database client - cli, err := mongo.Connect(rootCtx, options.Client().ApplyURI(cfg.MongodbUri)) + db, err := dbutil.Connect(rootCtx, logger, cfg.MongodbUri, cfg.MongodbDatabase) if err != nil { log.Fatal("Failed to initialize MongoDB client: ", err) } - defer func() { - subCtx, cancelSubCtx := context.WithTimeout(context.Background(), 10*time.Second) - _ = cli.Disconnect(subCtx) - cancelSubCtx() - }() - db := cli.Database(cfg.MongodbDatabase) // start serving /health and /ready endpoints - healthChecks, err := makeHealthChecks(rootCtx, cfg, db) + healthChecks, err := makeHealthChecks(rootCtx, cfg, db.Database) if err != nil { logger.Fatal("Failed to create health checks", zap.Error(err)) } @@ -68,7 +62,7 @@ func main() { // create and start a consumer. vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, metrics, logger) - repository := consumer.NewRepository(logger, db) + repository := consumer.NewRepository(logger, db.Database) consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository, metrics) consumer.Start(rootCtx) @@ -87,8 +81,13 @@ func main() { // graceful shutdown logger.Info("Cancelling root context...") rootCtxCancel() + logger.Info("Closing Http server...") server.Stop() + + logger.Info("Closing MongoDB connection...") + db.DisconnectWithTimeout(10 * time.Second) + logger.Info("Terminated wormhole-explorer-tx-tracker") }