diff --git a/Makefile b/Makefile index 9b1aa31bc..5d29186ad 100644 --- a/Makefile +++ b/Makefile @@ -1,12 +1,5 @@ SHELL := /bin/bash - -## help: print this help message -.PHONY: help -help: - @echo 'Usage:' - @sed -n 's/^##//p' ${MAKEFILE_LIST} | column -t -s ':' | sed -e 's/^/ /' - build: make -C analytics/ build make -C api/ build @@ -15,9 +8,10 @@ build: make -C parser/ build make -C tx-tracker/ build make -C contract-watcher/ build + make -C event-watcher/ build doc: - swag init -pd + make -C api/ doc test: cd analytics && go test -v -cover ./... @@ -27,5 +21,6 @@ test: cd parser && go test -v -cover ./... cd tx-tracker && go test -v -cover ./... cd contract-watcher && go test -v -cover ./... + cd event-watcher && go test -v -cover ./... .PHONY: build doc test diff --git a/common/settings/structs.go b/common/settings/structs.go new file mode 100644 index 000000000..4e909a059 --- /dev/null +++ b/common/settings/structs.go @@ -0,0 +1,48 @@ +package settings + +import ( + "fmt" + + "github.com/joho/godotenv" + "github.com/kelseyhightower/envconfig" +) + +// MongoDB contains configuration settings for a MongoDB database. +type MongoDB struct { + MongodbURI string `split_words:"true" required:"true"` + MongodbDatabase string `split_words:"true" required:"true"` +} + +type Logger struct { + LogLevel string `split_words:"true" default:"INFO"` +} + +type P2p struct { + P2pNetwork string `split_words:"true" required:"true"` +} + +// Monitoring contains configuration settings for the monitoring endpoints. +type Monitoring struct { + // MonitoringPort defines the TCP port for the monitoring endpoints. + MonitoringPort string `split_words:"true" default:"8000"` + PprofEnabled bool `split_words:"true" default:"false"` +} + +// LoadFromEnv loads the configuration settings from environment variables. +// +// If there is a .env file in the current directory, it will be used to +// populate the environment variables. +func LoadFromEnv[T any]() (*T, error) { + + // Load .env file (if it exists) + _ = godotenv.Load() + + // Load environment variables into a struct + var settings T + err := envconfig.Process("", &settings) + if err != nil { + return nil, fmt.Errorf("failed to read config from environment: %w", err) + } + + return &settings, nil +} diff --git a/deploy/event-watcher/env/production-mainnet.env b/deploy/event-watcher/env/production-mainnet.env new file mode 100644 index 000000000..a300e2ee5 --- /dev/null +++ b/deploy/event-watcher/env/production-mainnet.env @@ -0,0 +1,20 @@ +ENVIRONMENT=production-mainnet +NAMESPACE=wormscan +NAME=wormscan-event-watcher +REPLICAS=5 +IMAGE_NAME= +RESOURCES_LIMITS_MEMORY=512Mi +RESOURCES_LIMITS_CPU=700m +RESOURCES_REQUESTS_MEMORY=384Mi +RESOURCES_REQUESTS_CPU=500m +AWS_IAM_ROLE= +P2P_NETWORK=mainnet +PPROF_ENABLED=false +MONITORING_PORT=8000 +MONGODB_URI= +MONGODB_DATABASE= +LOG_LEVEL=INFO + +ETHEREUM_REQUESTS_PER_MINUTE=12 +ETHEREUM_URL=https://svc.blockdaemon.com/ethereum/mainnet/native +ETHEREUM_AUTH= \ No newline at end of file diff --git a/deploy/event-watcher/env/production-testnet.env b/deploy/event-watcher/env/production-testnet.env new file mode 100644 index 000000000..56f11e84b --- /dev/null +++ b/deploy/event-watcher/env/production-testnet.env @@ -0,0 +1,20 @@ +ENVIRONMENT=production-testnet +NAMESPACE=wormscan-testnet +NAME=wormscan-event-watcher +REPLICAS=3 +IMAGE_NAME= +RESOURCES_LIMITS_MEMORY=256Mi +RESOURCES_LIMITS_CPU=500m +RESOURCES_REQUESTS_MEMORY=128Mi +RESOURCES_REQUESTS_CPU=250m +AWS_IAM_ROLE= +P2P_NETWORK=testnet +PPROF_ENABLED=false +MONITORING_PORT=8000 +MONGODB_URI= +MONGODB_DATABASE= +LOG_LEVEL=INFO + +ETHEREUM_REQUESTS_PER_MINUTE=12 +ETHEREUM_URL=https://svc.blockdaemon.com/ethereum/goerli/native +ETHEREUM_AUTH= \ No newline at end of file diff --git a/deploy/event-watcher/env/staging-mainnet.env b/deploy/event-watcher/env/staging-mainnet.env new file mode 100644 index 000000000..d5dc77b6b --- /dev/null +++ b/deploy/event-watcher/env/staging-mainnet.env @@ -0,0 +1,20 @@ +ENVIRONMENT=staging-mainnet +NAMESPACE=wormscan +NAME=wormscan-event-watcher +REPLICAS=3 +IMAGE_NAME= +RESOURCES_LIMITS_MEMORY=512Mi +RESOURCES_LIMITS_CPU=700m +RESOURCES_REQUESTS_MEMORY=384Mi +RESOURCES_REQUESTS_CPU=500m +AWS_IAM_ROLE= +P2P_NETWORK=mainnet +PPROF_ENABLED=false +MONITORING_PORT=8000 +MONGODB_URI= +MONGODB_DATABASE= +LOG_LEVEL=INFO + +ETHEREUM_REQUESTS_PER_MINUTE=12 +ETHEREUM_URL=https://svc.blockdaemon.com/ethereum/mainnet/native +ETHEREUM_AUTH= \ No newline at end of file diff --git a/deploy/event-watcher/env/staging-testnet.env b/deploy/event-watcher/env/staging-testnet.env new file mode 100644 index 000000000..67738fe0d --- /dev/null +++ b/deploy/event-watcher/env/staging-testnet.env @@ -0,0 +1,20 @@ +ENVIRONMENT=staging-testnet +NAMESPACE=wormscan-testnet +NAME=wormscan-event-watcher +REPLICAS=2 +IMAGE_NAME= +RESOURCES_LIMITS_MEMORY=256Mi +RESOURCES_LIMITS_CPU=500m +RESOURCES_REQUESTS_MEMORY=128Mi +RESOURCES_REQUESTS_CPU=250m +AWS_IAM_ROLE= +P2P_NETWORK=testnet +PPROF_ENABLED=false +MONITORING_PORT=8000 +MONGODB_URI= +MONGODB_DATABASE= +LOG_LEVEL=INFO + +ETHEREUM_REQUESTS_PER_MINUTE=12 +ETHEREUM_URL=https://svc.blockdaemon.com/ethereum/goerli/native +ETHEREUM_AUTH= \ No newline at end of file diff --git a/deploy/event-watcher/event-watcher-service.yaml b/deploy/event-watcher/event-watcher-service.yaml new file mode 100644 index 000000000..3faa86a1e --- /dev/null +++ b/deploy/event-watcher/event-watcher-service.yaml @@ -0,0 +1,75 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .NAME }} + namespace: {{ .NAMESPACE }} +spec: + replicas: {{ .REPLICAS }} + selector: + matchLabels: + app: {{ .NAME }} + template: + metadata: + labels: + app: {{ .NAME }} + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "8000" + spec: + restartPolicy: Always + terminationGracePeriodSeconds: 40 + serviceAccountName: event-watcher + containers: + - name: {{ .NAME }} + image: {{ .IMAGE_NAME }} + imagePullPolicy: Always + readinessProbe: + initialDelaySeconds: 30 + periodSeconds: 20 + timeoutSeconds: 3 + failureThreshold: 3 + httpGet: + path: /api/ready + port: 8000 + livenessProbe: + initialDelaySeconds: 30 + periodSeconds: 30 + timeoutSeconds: 3 + failureThreshold: 3 + httpGet: + path: /api/health + port: 8000 + env: + - name: ENVIRONMENT + value: {{ .ENVIRONMENT }} + - name: MONITORING_PORT + value: "{{ .MONITORING_PORT }}" + - name: LOG_LEVEL + value: "{{ .LOG_LEVEL }}" + - name: ETHEREUM_REQUESTS_PER_MINUTE + value: "{{ .ETHEREUM_REQUESTS_PER_MINUTE }}" + - name: ETHEREUM_URL + value: "{{ .ETHEREUM_URL }}" + - name: ETHEREUM_AUTH + value: "{{ .ETHEREUM_AUTH }}" + - name: MONGODB_URI + valueFrom: + secretKeyRef: + name: mongodb + key: mongo-uri + - name: MONGODB_DATABASE + valueFrom: + configMapKeyRef: + name: config + key: mongo-database + - name: P2P_NETWORK + value: {{ .P2P_NETWORK }} + - name: PPROF_ENABLED + value: "{{ .PPROF_ENABLED }}" + resources: + limits: + memory: {{ .RESOURCES_LIMITS_MEMORY }} + cpu: {{ .RESOURCES_LIMITS_CPU }} + requests: + memory: {{ .RESOURCES_REQUESTS_MEMORY }} + cpu: {{ .RESOURCES_REQUESTS_CPU }} \ No newline at end of file diff --git a/deploy/event-watcher/sa.yaml b/deploy/event-watcher/sa.yaml new file mode 100644 index 000000000..d846fb499 --- /dev/null +++ b/deploy/event-watcher/sa.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: event-watcher + namespace: {{ .NAMESPACE }} + annotations: + eks.amazonaws.com/role-arn: {{ .AWS_IAM_ROLE }} diff --git a/event-watcher/.gitignore b/event-watcher/.gitignore new file mode 100644 index 000000000..fac41e06d --- /dev/null +++ b/event-watcher/.gitignore @@ -0,0 +1,2 @@ +bin/service +.env \ No newline at end of file diff --git a/event-watcher/Dockerfile b/event-watcher/Dockerfile new file mode 100644 index 000000000..cafc5e5c4 --- /dev/null +++ b/event-watcher/Dockerfile @@ -0,0 +1,21 @@ +# syntax=docker.io/docker/dockerfile:1.3@sha256:42399d4635eddd7a9b8a24be879d2f9a930d0ed040a61324cfdf59ef1357b3b2 +FROM --platform=linux/amd64 docker.io/golang:1.19.2@sha256:0467d7d12d170ed8d998a2dae4a09aa13d0aa56e6d23c4ec2b1e4faacf86a813 AS build + +WORKDIR /app + +COPY event-watcher event-watcher +COPY common common + +# Build the Go app +RUN cd event-watcher && CGO_ENABLED=0 GOOS=linux go build -o "./service" cmd/service/main.go + +############################ +# STEP 2 build a small image +############################ +FROM alpine +#Copy certificates +COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +# Copy our static executable. +COPY --from=build "/app/event-watcher/service" "/service" +# Run the binary. +ENTRYPOINT ["/service"] diff --git a/event-watcher/Makefile b/event-watcher/Makefile new file mode 100644 index 000000000..6c7d3b898 --- /dev/null +++ b/event-watcher/Makefile @@ -0,0 +1,9 @@ + +build: + CGO_ENABLED=0 GOOS=linux go build -o "./bin/service" cmd/service/main.go + +test: + go test -v -cover ./... + + +.PHONY: build test \ No newline at end of file diff --git a/event-watcher/bin/.gitkeep b/event-watcher/bin/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/event-watcher/clients/eth_rpc.go b/event-watcher/clients/eth_rpc.go new file mode 100644 index 000000000..cb2fe46ec --- /dev/null +++ b/event-watcher/clients/eth_rpc.go @@ -0,0 +1,133 @@ +package clients + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/ethereum/go-ethereum/common/hexutil" +) + +type logsResponse struct { + Result []Log `json:"result"` +} + +type Log struct { + Address string `json:"address"` + BlockHash string `json:"blockHash"` + BlockNumber string `json:"blockNumber"` + Data string `json:"data"` + Topics []string `json:"topics"` + TransactionHash string `json:"transactionHash"` +} + +type EthRpcClient struct { + Url string + Auth string +} + +// TODO add rate limits +func NewEthRpcClient(url string, auth string) *EthRpcClient { + return &EthRpcClient{Url: url, Auth: auth} +} + +func (c *EthRpcClient) GetBlockNumber(ctx context.Context) (uint64, error) { + + // Create a new HTTP request + payload := strings.NewReader(`{ + "id": 1, + "jsonrpc": "2.0", + "method": "eth_blockNumber" + }`) + req, err := http.NewRequestWithContext(ctx, "POST", c.Url, payload) + if err != nil { + return 0, fmt.Errorf("failed to create HTTP request: %w", err) + } + + // Add headers + req.Header.Add("accept", "application/json") + req.Header.Add("content-type", "application/json") + req.Header.Add("Authorization", "Bearer: "+c.Auth) + + // Send the request + res, err := http.DefaultClient.Do(req) + if err != nil { + return 0, fmt.Errorf("failed to send HTTP request: %w", err) + } + defer res.Body.Close() + if res.Status != "200 OK" { + return 0, fmt.Errorf("encoutered unexpected HTTP status code in response: %s", res.Status) + } + + // Deserialize response body + body, err := io.ReadAll(res.Body) + if err != nil { + return 0, fmt.Errorf("failed to read HTTP response body: %w", err) + } + var response struct { + Result string `json:"result"` + } + if err := json.Unmarshal(body, &response); err != nil { + return 0, fmt.Errorf("failed to deserialize HTTP response body: %w", err) + } + + // Parse the block number + n, err := hexutil.DecodeUint64(response.Result) + if err != nil { + return 0, fmt.Errorf("failed to parse block number from hex: %w", err) + } + + return n, nil +} + +func (c *EthRpcClient) GetLogs( + ctx context.Context, + fromBlock uint64, + toBlock uint64, + address string, + topic string, +) ([]Log, error) { + + params := fmt.Sprintf(`{ + "id": 1, + "jsonrpc": "2.0", + "method": "eth_getLogs", + "params": [{ + "address": ["%s"], + "fromBlock":"0x%x", + "toBlock":"0x%x", + "topics": ["%s"] + }] + }`, address, fromBlock, toBlock, topic) + payload := strings.NewReader(params) + + req, err := http.NewRequest("POST", c.Url, payload) + if err != nil { + return nil, fmt.Errorf("failed to create HTTP request: %w", err) + } + + req.Header.Add("accept", "application/json") + req.Header.Add("content-type", "application/json") + req.Header.Add("Authorization", "Bearer: "+c.Auth) + + res, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send HTTP request: %w", err) + } + defer res.Body.Close() + + // Deserialize response body + body, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("failed to read HTTP response body: %w", err) + } + var response logsResponse + if err := json.Unmarshal(body, &response); err != nil { + return nil, fmt.Errorf("failed to deserialize HTTP response body: %w", err) + } + + return response.Result, nil +} diff --git a/event-watcher/cmd/service/main.go b/event-watcher/cmd/service/main.go new file mode 100644 index 000000000..cc8ed9efe --- /dev/null +++ b/event-watcher/cmd/service/main.go @@ -0,0 +1,139 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" + "github.com/wormhole-foundation/wormhole-explorer/common/domain" + "github.com/wormhole-foundation/wormhole-explorer/common/health" + "github.com/wormhole-foundation/wormhole-explorer/common/logger" + "github.com/wormhole-foundation/wormhole-explorer/common/settings" + "github.com/wormhole-foundation/wormhole-explorer/event-watcher/config" + "github.com/wormhole-foundation/wormhole-explorer/event-watcher/http" + "github.com/wormhole-foundation/wormhole-explorer/event-watcher/watchers" + "go.uber.org/zap" +) + +func main() { + + // Load config + cfg, err := settings.LoadFromEnv[config.ServiceSettings]() + if err != nil { + log.Fatal("Error loading config: ", err) + } + + // Build rootLogger + rootLogger := logger.New("wormhole-explorer-core-contract-watcher", logger.WithLevel(cfg.LogLevel)) + + // Create top-level context + rootCtx, rootCtxCancel := context.WithCancel(context.Background()) + + // Connect to MongoDB + rootLogger.Info("connecting to MongoDB...") + db, err := dbutil.Connect(rootCtx, rootLogger, cfg.MongodbURI, cfg.MongodbDatabase) + if err != nil { + rootLogger.Fatal("Error connecting to MongoDB", zap.Error(err)) + } + + // Start serving the monitoring endpoints. + plugins := []health.Check{health.Mongo(db.Database)} + server := http.NewServer( + rootLogger, + cfg.MonitoringPort, + cfg.PprofEnabled, + plugins..., + ) + server.Start() + + // Start the watchers for each chain + if err := startWatchers(rootCtx, rootLogger, db, cfg); err != nil { + rootLogger.Fatal("Failed to start watchers", zap.Error(err)) + } + + // Block until we get a termination signal or the context is cancelled + rootLogger.Info("waiting for termination signal or context cancellation...") + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) + select { + case <-rootCtx.Done(): + rootLogger.Warn("terminating (root context cancelled)") + case signal := <-sigterm: + rootLogger.Info("terminating (signal received)", zap.String("signal", signal.String())) + } + + // Shut down gracefully + rootLogger.Info("disconnecting from MongoDB...") + db.DisconnectWithTimeout(15 * time.Second) + rootLogger.Info("cancelling root context...") + rootCtxCancel() + rootLogger.Info("terminated") +} + +func startWatchers( + ctx context.Context, + logger *zap.Logger, + db *dbutil.Session, + cfg *config.ServiceSettings, +) error { + + switch cfg.P2p.P2pNetwork { + case domain.P2pMainNet: + return startWatchersMainnet(ctx, logger, db, cfg) + case domain.P2pTestNet: + return startWatchersTestnet(ctx, logger, db, cfg) + default: + return fmt.Errorf("unknown p2p network: %s", cfg.P2p) + } +} + +func startWatchersMainnet( + ctx context.Context, + logger *zap.Logger, + db *dbutil.Session, + cfg *config.ServiceSettings, +) error { + + // Start Ethereum watcher + { + w := watchers.NewEvmWatcher( + logger, + db, + config.ETHEREUM_MAINNET.ContractAddress, + config.ETHEREUM_MAINNET.Topic, + cfg.EthereumUrl, + cfg.EthereumAuth, + ) + w.Watch(ctx) + } + + return nil +} + +func startWatchersTestnet( + ctx context.Context, + logger *zap.Logger, + db *dbutil.Session, + cfg *config.ServiceSettings, +) error { + + // Start Ethereum watcher + { + w := watchers.NewEvmWatcher( + logger, + db, + config.ETHEREUM_GOERLI.ContractAddress, + config.ETHEREUM_GOERLI.Topic, + cfg.EthereumUrl, + cfg.EthereumAuth, + ) + w.Watch(ctx) + } + + return nil +} diff --git a/event-watcher/config/chains.go b/event-watcher/config/chains.go new file mode 100644 index 000000000..a7fc7a102 --- /dev/null +++ b/event-watcher/config/chains.go @@ -0,0 +1,19 @@ +package config + +type EvmParams struct { + StartingBlock uint64 + ContractAddress string + Topic string +} + +var ETHEREUM_MAINNET = EvmParams{ + StartingBlock: 12_959_638, + ContractAddress: "0x98f3c9e6e3face36baad05fe09d375ef1464288b", + Topic: "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2", +} + +var ETHEREUM_GOERLI = EvmParams{ + StartingBlock: 5_896_171, + ContractAddress: "0x706abc4e45d419950511e474c7b9ed348a4a716c", + Topic: "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2", +} diff --git a/event-watcher/config/structs.go b/event-watcher/config/structs.go new file mode 100644 index 000000000..d50040c5c --- /dev/null +++ b/event-watcher/config/structs.go @@ -0,0 +1,20 @@ +package config + +import ( + "github.com/wormhole-foundation/wormhole-explorer/common/settings" +) + +// ServiceSettings models the configuration settings for the event-watcher service. +type ServiceSettings struct { + settings.Logger + settings.MongoDB + settings.Monitoring + settings.P2p + WatcherSettings +} + +type WatcherSettings struct { + EthereumRequestsPerMinute uint `split_words:"true" default:"INFO"` + EthereumUrl string `split_words:"true" default:"INFO"` + EthereumAuth string `split_words:"true" default:"INFO"` +} diff --git a/event-watcher/go.mod b/event-watcher/go.mod new file mode 100644 index 000000000..435f0e0d8 --- /dev/null +++ b/event-watcher/go.mod @@ -0,0 +1,50 @@ +module github.com/wormhole-foundation/wormhole-explorer/event-watcher + +go 1.19 + +require ( + github.com/gofiber/fiber/v2 v2.48.0 + github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-20230801193834-0f1797e44a0c + go.uber.org/zap v1.25.0 +) + +require ( + github.com/andybalholm/brotli v1.0.5 // indirect + github.com/aws/aws-sdk-go-v2 v1.17.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.28 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.22 // indirect + github.com/aws/aws-sdk-go-v2/service/sns v1.20.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sqs v1.20.2 // indirect + github.com/aws/smithy-go v1.13.5 // indirect + github.com/deepmap/oapi-codegen v1.8.2 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/influxdata/influxdb-client-go/v2 v2.12.2 // indirect + github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 // indirect + github.com/joho/godotenv v1.5.1 // indirect + github.com/kelseyhightower/envconfig v1.4.0 // indirect + github.com/klauspost/compress v1.16.3 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-runewidth v0.0.14 // indirect + github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/rivo/uniseg v0.2.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.48.0 // indirect + github.com/valyala/tcplisten v1.0.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.1 // indirect + github.com/xdg-go/stringprep v1.0.3 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + go.mongodb.org/mongo-driver v1.11.2 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.10.0 // indirect + golang.org/x/crypto v0.7.0 // indirect + golang.org/x/net v0.8.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.10.0 // indirect + golang.org/x/text v0.8.0 // indirect +) + +replace github.com/wormhole-foundation/wormhole-explorer/common => ../common diff --git a/event-watcher/http/server.go b/event-watcher/http/server.go new file mode 100644 index 000000000..381085913 --- /dev/null +++ b/event-watcher/http/server.go @@ -0,0 +1,120 @@ +package http + +import ( + "fmt" + + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/pprof" + "github.com/wormhole-foundation/wormhole-explorer/common/health" + "go.uber.org/zap" +) + +type Server struct { + app *fiber.App + port string + logger *zap.Logger +} + +func NewServer(logger *zap.Logger, port string, pprofEnabled bool, checks ...health.Check) *Server { + + app := fiber.New(fiber.Config{DisableStartupMessage: true}) + + // config use of middlware. + if pprofEnabled { + app.Use(pprof.New()) + } + + ctrl := newController(checks, logger) + api := app.Group("/api") + api.Get("/health", ctrl.healthCheck) + api.Get("/ready", ctrl.readinessCheck) + + return &Server{ + app: app, + port: port, + logger: logger, + } +} + +// Start initiates the serving of HTTP requests. +func (s *Server) Start() { + + addr := ":" + s.port + s.logger.Info("Monitoring server starting", zap.String("bindAddress", addr)) + + go func() { + err := s.app.Listen(addr) + if err != nil { + s.logger.Error("Failed to start monitoring server", zap.Error(err), zap.String("bindAddress", addr)) + } + }() +} + +// Stop gracefully shuts down the server. +// +// Blocks until all active connections are closed. +func (s *Server) Stop() { + _ = s.app.Shutdown() +} + +type controller struct { + checks []health.Check + logger *zap.Logger +} + +// newController creates a Controller instance. +func newController(checks []health.Check, logger *zap.Logger) *controller { + return &controller{checks: checks, logger: logger} +} + +// healthCheck is the HTTP handler for the route `GET /health`. +func (c *controller) healthCheck(ctx *fiber.Ctx) error { + + response := ctx.JSON(struct { + Status string `json:"status"` + }{ + Status: "OK", + }) + + return response +} + +// readinessCheck is the HTTP handler for the route `GET /ready`. +func (c *controller) readinessCheck(ctx *fiber.Ctx) error { + + requestCtx := ctx.Context() + requestID := fmt.Sprintf("%v", requestCtx.Value("requestid")) + + // For every callback, check whether it is passing + for _, check := range c.checks { + if err := check(requestCtx); err != nil { + + c.logger.Error( + "Readiness check failed", + zap.Error(err), + zap.String("requestID", requestID), + ) + + // Return error information to the caller + response := ctx. + Status(fiber.StatusInternalServerError). + JSON(struct { + Ready string `json:"ready"` + Error string `json:"error"` + }{ + Ready: "NO", + Error: err.Error(), + }) + return response + } + } + + // All checks passed + response := ctx.Status(fiber.StatusOK). + JSON(struct { + Ready string `json:"ready"` + }{ + Ready: "OK", + }) + return response +} diff --git a/event-watcher/watchers/evm.go b/event-watcher/watchers/evm.go new file mode 100644 index 000000000..02b0b3c4d --- /dev/null +++ b/event-watcher/watchers/evm.go @@ -0,0 +1,107 @@ +package watchers + +import ( + "context" + + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" + "github.com/wormhole-foundation/wormhole-explorer/event-watcher/clients" + "go.uber.org/zap" + "golang.org/x/exp/constraints" +) + +const bulkSize = 100 + +func min[T constraints.Ordered](a, b T) T { + if a < b { + return a + } + return b +} + +type EvmWatcher struct { + logger *zap.Logger + db *dbutil.Session + client *clients.EthRpcClient + coreContractAddress string + logTopic string +} + +func NewEvmWatcher( + logger *zap.Logger, + db *dbutil.Session, + coreContractAddress string, + logTopic string, + url string, + auth string, +) *EvmWatcher { + + w := EvmWatcher{ + logger: logger, + db: db, + client: clients.NewEthRpcClient(url, auth), + coreContractAddress: coreContractAddress, + logTopic: logTopic, + } + + return &w +} + +func (w *EvmWatcher) Watch(ctx context.Context) { + + //TODO: + // - initialize current block in the database, if not already initialized. + // - get current block from database + var currentBlock uint64 = 0 + + for { + // Get the current blockchain head + latestBlock, err := w.client.GetBlockNumber(ctx) + if err != nil { + w.logger.Error("failed to get latest block number", + zap.String("url", w.client.Url), + zap.Error(err), + ) + continue + } + + // Process blocks in bulk + for currentBlock < latestBlock { + from := currentBlock + to := min(currentBlock+bulkSize, latestBlock) + w.processBlockRange(ctx, from, to) + + currentBlock = latestBlock + } + } +} + +func (w *EvmWatcher) processBlockRange(ctx context.Context, fromBlock uint64, toBlock uint64) { + + var logs []clients.Log + var err error + + // Retry until success + for { + logs, err = w.client.GetLogs(ctx, fromBlock, toBlock, w.coreContractAddress, w.logTopic) + if err != nil { + w.logger.Error("failed to get logs", + zap.String("url", w.client.Url), + zap.String("coreContractAddress", w.coreContractAddress), + zap.String("topic", w.logTopic), + zap.Uint64("fromBlock", fromBlock), + zap.Uint64("toBlock", toBlock), + zap.Error(err), + ) + } + break + } + + // Process logs + // TODO: + // - update current block in database + // - fire events for other services + for i := range logs { + log := logs[i] + w.logger.Info("found log", zap.String("transactionHash", log.TransactionHash)) + } +} diff --git a/go.work b/go.work index 32bed2be3..e1c47e31d 100644 --- a/go.work +++ b/go.work @@ -5,6 +5,7 @@ use ( ./api ./common ./contract-watcher + ./event-watcher ./fly ./parser ./pipeline