diff --git a/Makefile b/Makefile
index 123a8c7b..939f9927 100644
--- a/Makefile
+++ b/Makefile
@@ -67,7 +67,7 @@ build-go:
 	$(GOENVVARS) go build -ldflags "all=$(LDFLAGS)" -o $(GOBIN)/$(GOBINARY) $(GOCMD)
 
 .PHONY: build-docker
-build-docker: build-mock-signer-docker ## Builds a docker image with the cdk binary
+build-docker: ## Builds a docker image with the cdk binary
 	docker build -t cdk -f ./Dockerfile .
 
 .PHONY: build-mock-signer-docker
diff --git a/cmd/run.go b/cmd/run.go
index 365e2407..f781151d 100644
--- a/cmd/run.go
+++ b/cmd/run.go
@@ -26,6 +26,7 @@ import (
 	"github.com/0xPolygon/cdk/config"
 	"github.com/0xPolygon/cdk/dataavailability"
 	"github.com/0xPolygon/cdk/dataavailability/datacommittee"
+	sqldb "github.com/0xPolygon/cdk/db"
 	"github.com/0xPolygon/cdk/etherman"
 	ethermanconfig "github.com/0xPolygon/cdk/etherman/config"
 	"github.com/0xPolygon/cdk/etherman/contracts"
@@ -85,6 +86,8 @@ func start(cliCtx *cli.Context) error {
 		cliCtx.Context, components, c.LastGERSync, reorgDetectorL2, l2Client, l1InfoTreeSync,
 	)
 
+	runSqliteServiceIfNeeded(components, *c)
+
 	for _, component := range components {
 		switch component {
 		case cdkcommon.SEQUENCE_SENDER:
@@ -812,3 +815,40 @@ func getL2RPCUrl(c *config.Config) string {
 
 	return c.AggOracle.EVMSender.URLRPCL2
 }
+
+// runSqliteServiceIfNeeded starts the sqlite inspection JSON-RPC service on a
+// background goroutine when the aggregator or the sequence-sender component is
+// enabled; otherwise it logs a warning and does nothing. The set of databases
+// exposed depends on which component is running.
+func runSqliteServiceIfNeeded(
+	components []string,
+	cfg config.Config,
+) {
+	dbPath := make(map[string]string)
+	switch {
+	case isNeeded([]string{cdkcommon.AGGREGATOR}, components):
+		dbPath[sqldb.AGG_TX_MGR] = cfg.Aggregator.EthTxManager.StoragePath
+		dbPath[sqldb.AGG_SYNC] = cfg.Aggregator.Synchronizer.SQLDB.DataSource
+		dbPath[sqldb.AGG_REORG_L1] = cfg.ReorgDetectorL1.DBPath
+	case isNeeded([]string{cdkcommon.SEQUENCE_SENDER}, components):
+		dbPath[sqldb.SEQS_TX_MGR] = cfg.SequenceSender.EthTxManager.StoragePath
+		dbPath[sqldb.SEQS_L1_TREE] = cfg.L1InfoTreeSync.DBPath
+		dbPath[sqldb.SEQS_REORG_L1] = cfg.ReorgDetectorL1.DBPath
+	default:
+		log.Warn("No need to start sqlite service")
+		return
+	}
+
+	// Human-readable summary of the exposed databases for the startup log.
+	allDBPath := ""
+	for dbPathKey, dbPathValue := range dbPath {
+		allDBPath += fmt.Sprintf("%s=%s \n", dbPathKey, dbPathValue)
+	}
+
+	server := sqldb.CreateSqliteService(cfg.Sqlite, dbPath)
+	log.Info(fmt.Sprintf("Starting sqlite service on %s:%d\n%v", cfg.Sqlite.Host, cfg.Sqlite.Port, allDBPath))
+	go func() {
+		// server.Start blocks; a listen failure is fatal for the process.
+		if err := server.Start(); err != nil {
+			log.Fatal(err)
+		}
+	}()
+}
diff --git a/config/config.go b/config/config.go
index 9363b93b..6d9ab935 100644
--- a/config/config.go
+++ b/config/config.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
 	"os"
 	"strings"
+
+	"github.com/0xPolygon/cdk/db"
@@ -165,6 +166,8 @@ type Config struct {
 	// AggSender is the configuration of the agg sender service
 	AggSender aggsender.Config
+
+	// Sqlite is the configuration of the sqlite inspection JSON-RPC service
+	Sqlite db.Config
 }
diff --git a/db/config.go b/db/config.go
new file mode 100644
index 00000000..c300f6ca
--- /dev/null
+++ b/db/config.go
@@ -0,0 +1,24 @@
+package db
+
+import "github.com/0xPolygon/cdk-rpc/config/types"
+
+// Config represents the configuration of the sqlite inspection JSON-RPC service
+type Config struct {
+	// Host defines the network adapter that will be used to serve the HTTP requests
+	Host string `mapstructure:"Host"`
+
+	// Port defines the port to serve the endpoints via HTTP
+	Port int `mapstructure:"Port"`
+
+	// ReadTimeout is the HTTP server read timeout
+	// check net/http.server.ReadTimeout and net/http.server.ReadHeaderTimeout
+	ReadTimeout types.Duration `mapstructure:"ReadTimeout"`
+
+	// WriteTimeout is the HTTP server write timeout
+	// check net/http.server.WriteTimeout
+	WriteTimeout types.Duration `mapstructure:"WriteTimeout"`
+
+	// AuthMethodList is a comma-separated list of SQL verbs
+	// ("select", "update", "delete") the service is allowed to execute
+	AuthMethodList string `mapstructure:"AuthMethodList"`
+}
diff --git a/db/service.go b/db/service.go
new file mode 100644
index 00000000..38a60080
--- /dev/null
+++ b/db/service.go
@@ -0,0 +1,247 @@
+package db
+
+import (
+	"context"
+	dbSql "database/sql"
+	"errors"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/0xPolygon/cdk-data-availability/rpc"
+	jRPC "github.com/0xPolygon/cdk-rpc/rpc"
"github.com/0xPolygon/cdk/db/types" + "github.com/0xPolygon/cdk/log" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" +) + +const ( + NAME = "sqlite" + meterName = "github.com/0xPolygon/cdk/sqlite/service" + + METHOD_SELECT = "select" + METHOD_UPDATE = "update" + METHOD_DELETE = "delete" + + LIMIT_SQL_LEN = 6 + + zeroHex = "0x0" + + SEQS_L1_TREE = "seqs_l1tree" + SEQS_TX_MGR = "seqs_txmgr" + SEQS_REORG_L1 = "seqs_reorg_l1" + + AGG_SYNC = "agg_sync" + AGG_TX_MGR = "agg_txmgr" + AGG_REORG_L1 = "agg_reorg_l1" +) + +type SqliteEndpoints struct { + logger *log.Logger + meter metric.Meter + readTimeout time.Duration + writeTimeout time.Duration + + authMethods []string + + dbMaps map[string]string + sqlDBs map[string]*dbSql.DB +} + +func CreateSqliteService( + cfg Config, + dbMaps map[string]string, +) *jRPC.Server { + logger := log.WithFields("module", NAME) + + meter := otel.Meter(meterName) + methodList := strings.Split(cfg.AuthMethodList, ",") + log.Info(fmt.Sprintf("Sqlite service method auth list: %s", methodList)) + for _, s := range methodList { + methodList = append(methodList, s) + } + log.Info(fmt.Sprintf("Sqlite service dbMaps: %v", dbMaps)) + time.Sleep(10 * time.Second) + sqlDBs := make(map[string]*dbSql.DB) + for k, dbPath := range dbMaps { + log.Info(fmt.Sprintf("Sqlite service: %s, %s", k, dbPath)) + db, err := NewSQLiteDB(dbPath) + if err != nil { + log.Fatal(err) + } + sqlDBs[k] = db + } + + services := []jRPC.Service{ + { + Name: NAME, + Service: &SqliteEndpoints{ + logger: logger, + meter: meter, + readTimeout: cfg.ReadTimeout.Duration, + writeTimeout: cfg.WriteTimeout.Duration, + authMethods: methodList, + dbMaps: dbMaps, + sqlDBs: sqlDBs, + }, + }, + } + + return jRPC.NewServer(jRPC.Config{ + Host: cfg.Host, + Port: cfg.Port, + ReadTimeout: cfg.ReadTimeout, + WriteTimeout: cfg.WriteTimeout, + }, services, jRPC.WithLogger(logger.GetSugaredLogger())) +} + +type A struct { + Fields map[string]interface{} +} + +func (b *SqliteEndpoints) 
Select( + db string, + sql string, +) (interface{}, rpc.Error) { + err, dbCon := b.checkAndGetDB(db, sql, METHOD_SELECT) + if err != nil { + return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("invalid sql: %s", err.Error())) + } + + ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout) + defer cancel() + rows, err := dbCon.QueryContext(ctx, sql) + if err != nil { + if errors.Is(err, dbSql.ErrNoRows) { + return nil, rpc.NewRPCError( + rpc.DefaultErrorCode, fmt.Sprintf("No rows"), ErrNotFound) + } + return nil, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to query: %s", err.Error())) + } + err, result := getResults(rows) + + return result, nil +} + +func (b *SqliteEndpoints) Update( + db string, + sql string, +) (interface{}, rpc.Error) { + ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout) + defer cancel() + c, merr := b.meter.Int64Counter("claim_proof") + if merr != nil { + b.logger.Warnf("failed to create claim_proof counter: %s", merr) + } + c.Add(ctx, 1) + + return types.SqliteData{ + ProofLocalExitRoot: "ProofLocalExitRoot", + ProofRollupExitRoot: "ProofRollupExitRoot", + L1InfoTreeLeaf: "L1InfoTreeLeaf", + }, nil +} + +func (b *SqliteEndpoints) Delete( + db string, + sql string, +) (interface{}, rpc.Error) { + ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout) + defer cancel() + c, merr := b.meter.Int64Counter("claim_proof") + if merr != nil { + b.logger.Warnf("failed to create claim_proof counter: %s", merr) + } + c.Add(ctx, 1) + + return types.SqliteData{ + ProofLocalExitRoot: "ProofLocalExitRoot", + ProofRollupExitRoot: "ProofRollupExitRoot", + L1InfoTreeLeaf: "L1InfoTreeLeaf", + }, nil +} + +func (b *SqliteEndpoints) GetDbList() (interface{}, rpc.Error) { + ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout) + defer cancel() + c, merr := b.meter.Int64Counter("claim_proof") + if merr != nil { + b.logger.Warnf("failed to create claim_proof counter: 
%s", merr) + } + c.Add(ctx, 1) + + return types.SqliteData{ + ProofLocalExitRoot: "ProofLocalExitRoot", + ProofRollupExitRoot: "ProofRollupExitRoot", + L1InfoTreeLeaf: "L1InfoTreeLeaf", + }, nil +} + +func (b *SqliteEndpoints) checkAndGetDB(db string, sql string, method string) (error, *dbSql.DB) { + if len(sql) <= LIMIT_SQL_LEN { + return fmt.Errorf("sql length is too short"), nil + } + + sqlMethod := strings.ToLower(sql[:6]) + if sqlMethod != method { + return fmt.Errorf("sql method is not valid"), nil + } + + found := false + for _, str := range b.authMethods { + if str == method { + found = true + break + } + } + if !found { + return fmt.Errorf("sql method is not authorized"), nil + } + + dbCon, ok := b.sqlDBs[db] + if !ok { + return fmt.Errorf("sql db is not valid"), nil + } + + return nil, dbCon +} + +func getResults(rows *dbSql.Rows) (error, []A) { + var result []A + + for rows.Next() { + var a A + a.Fields = make(map[string]interface{}) + err := rows.Scan() + if err != nil { + log.Info("Error scanning row:", err) + return err, nil + } + + columns, err := rows.Columns() + if err != nil { + fmt.Println("Error getting columns:", err) + return err, nil + } + + values := make([]interface{}, len(columns)) + for i := range values { + values[i] = &values[i] + } + + err = rows.Scan(values...) + if err != nil { + fmt.Println("Error scanning row:", err) + return err, nil + } + + for i, col := range columns { + a.Fields[col] = values[i] + } + + result = append(result, a) + } + + return nil, result +} diff --git a/db/types/sqlite.go b/db/types/sqlite.go new file mode 100644 index 00000000..2d6df760 --- /dev/null +++ b/db/types/sqlite.go @@ -0,0 +1,7 @@ +package types + +type SqliteData struct { + ProofLocalExitRoot string + ProofRollupExitRoot string + L1InfoTreeLeaf string +}