Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zjg555543 committed Dec 12, 2024
1 parent beb7f7f commit ec2c903
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ build-go:
$(GOENVVARS) go build -ldflags "all=$(LDFLAGS)" -o $(GOBIN)/$(GOBINARY) $(GOCMD)

.PHONY: build-docker
build-docker: build-mock-signer-docker ## Builds a docker image with the cdk binary
build-docker:
docker build -t cdk -f ./Dockerfile .

.PHONY: build-mock-signer-docker
Expand Down
41 changes: 41 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/0xPolygon/cdk/config"
"github.com/0xPolygon/cdk/dataavailability"
"github.com/0xPolygon/cdk/dataavailability/datacommittee"
sqldb "github.com/0xPolygon/cdk/db"
"github.com/0xPolygon/cdk/etherman"
ethermanconfig "github.com/0xPolygon/cdk/etherman/config"
"github.com/0xPolygon/cdk/etherman/contracts"
Expand Down Expand Up @@ -85,6 +86,8 @@ func start(cliCtx *cli.Context) error {
cliCtx.Context, components, c.LastGERSync, reorgDetectorL2, l2Client, l1InfoTreeSync,
)

runSqliteServiceIfNeeded(components, *c)

for _, component := range components {
switch component {
case cdkcommon.SEQUENCE_SENDER:
Expand Down Expand Up @@ -812,3 +815,41 @@ func getL2RPCUrl(c *config.Config) string {

return c.AggOracle.EVMSender.URLRPCL2
}

func runSqliteServiceIfNeeded(
components []string,
cfg config.Config,
) {
dbPath := make(map[string]string)
if isNeeded([]string{
cdkcommon.AGGREGATOR},
components) {
dbPath[sqldb.AGG_TX_MGR] = cfg.Aggregator.EthTxManager.StoragePath
dbPath[sqldb.AGG_SYNC] = cfg.Aggregator.Synchronizer.SQLDB.DataSource
dbPath[sqldb.AGG_REORG_L1] = cfg.ReorgDetectorL1.DBPath
} else if isNeeded([]string{
cdkcommon.SEQUENCE_SENDER},
components) {
dbPath[sqldb.SEQS_TX_MGR] = cfg.SequenceSender.EthTxManager.StoragePath
dbPath[sqldb.SEQS_L1_TREE] = cfg.L1InfoTreeSync.DBPath
dbPath[sqldb.SEQS_REORG_L1] = cfg.ReorgDetectorL1.DBPath
} else {
log.Warn("No need to start sqlite service")
return
}

allDBPath := ""
for dbPathKey, dbPathValue := range dbPath {
allDBPath += fmt.Sprintf("%s=%s \n", dbPathKey, dbPathValue)
}

server := sqldb.CreateSqliteService(cfg.Sqlite, dbPath)
log.Info(fmt.Sprintf("Starting sqlite service on %s:%d\n%v", cfg.Sqlite.Host, cfg.Sqlite.Port, allDBPath))
go func() {
if err := server.Start(); err != nil {
log.Fatal(err)
}
}()

return
}
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"github.com/0xPolygon/cdk/db"
"os"
"strings"

Expand Down Expand Up @@ -165,6 +166,8 @@ type Config struct {

// AggSender is the configuration of the agg sender service
AggSender aggsender.Config

Sqlite db.Config
}

// Load loads the configuration
Expand Down
22 changes: 22 additions & 0 deletions db/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package db

import "github.com/0xPolygon/cdk-rpc/config/types"

// Config represents the configuration of the json rpc
type Config struct {
// Host defines the network adapter that will be used to serve the HTTP requests
Host string `mapstructure:"Host"`

// Port defines the port to serve the endpoints via HTTP
Port int `mapstructure:"Port"`

// ReadTimeout is the HTTP server read timeout
// check net/http.server.ReadTimeout and net/http.server.ReadHeaderTimeout
ReadTimeout types.Duration `mapstructure:"ReadTimeout"`

// WriteTimeout is the HTTP server write timeout
// check net/http.server.WriteTimeout
WriteTimeout types.Duration `mapstructure:"WriteTimeout"`

AuthMethodList string `mapstructure:"AuthMethodList"`
}
247 changes: 247 additions & 0 deletions db/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
package db

import (
"context"
dbSql "database/sql"
"errors"
"fmt"
"strings"
"time"

"github.com/0xPolygon/cdk-data-availability/rpc"
jRPC "github.com/0xPolygon/cdk-rpc/rpc"
"github.com/0xPolygon/cdk/db/types"
"github.com/0xPolygon/cdk/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

const (
NAME = "sqlite"
meterName = "github.com/0xPolygon/cdk/sqlite/service"

METHOD_SELECT = "select"
METHOD_UPDATE = "update"
METHOD_DELETE = "delete"

LIMIT_SQL_LEN = 6

zeroHex = "0x0"

SEQS_L1_TREE = "seqs_l1tree"
SEQS_TX_MGR = "seqs_txmgr"
SEQS_REORG_L1 = "seqs_reorg_l1"

AGG_SYNC = "agg_sync"
AGG_TX_MGR = "agg_txmgr"
AGG_REORG_L1 = "agg_reorg_l1"
)

type SqliteEndpoints struct {
logger *log.Logger
meter metric.Meter
readTimeout time.Duration
writeTimeout time.Duration

authMethods []string

dbMaps map[string]string
sqlDBs map[string]*dbSql.DB
}

func CreateSqliteService(
cfg Config,
dbMaps map[string]string,
) *jRPC.Server {
logger := log.WithFields("module", NAME)

meter := otel.Meter(meterName)
methodList := strings.Split(cfg.AuthMethodList, ",")
log.Info(fmt.Sprintf("Sqlite service method auth list: %s", methodList))
for _, s := range methodList {
methodList = append(methodList, s)
}
log.Info(fmt.Sprintf("Sqlite service dbMaps: %v", dbMaps))
time.Sleep(10 * time.Second)
sqlDBs := make(map[string]*dbSql.DB)
for k, dbPath := range dbMaps {
log.Info(fmt.Sprintf("Sqlite service: %s, %s", k, dbPath))
db, err := NewSQLiteDB(dbPath)
if err != nil {
log.Fatal(err)
}
sqlDBs[k] = db
}

services := []jRPC.Service{
{
Name: NAME,
Service: &SqliteEndpoints{
logger: logger,
meter: meter,
readTimeout: cfg.ReadTimeout.Duration,
writeTimeout: cfg.WriteTimeout.Duration,
authMethods: methodList,
dbMaps: dbMaps,
sqlDBs: sqlDBs,
},
},
}

return jRPC.NewServer(jRPC.Config{
Host: cfg.Host,
Port: cfg.Port,
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
}, services, jRPC.WithLogger(logger.GetSugaredLogger()))
}

type A struct {
Fields map[string]interface{}
}

func (b *SqliteEndpoints) Select(
db string,
sql string,
) (interface{}, rpc.Error) {
err, dbCon := b.checkAndGetDB(db, sql, METHOD_SELECT)
if err != nil {
return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("invalid sql: %s", err.Error()))
}

ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout)
defer cancel()
rows, err := dbCon.QueryContext(ctx, sql)
if err != nil {
if errors.Is(err, dbSql.ErrNoRows) {
return nil, rpc.NewRPCError(
rpc.DefaultErrorCode, fmt.Sprintf("No rows"), ErrNotFound)
}
return nil, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to query: %s", err.Error()))
}
err, result := getResults(rows)

return result, nil
}

func (b *SqliteEndpoints) Update(
db string,
sql string,
) (interface{}, rpc.Error) {
ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout)
defer cancel()
c, merr := b.meter.Int64Counter("claim_proof")
if merr != nil {
b.logger.Warnf("failed to create claim_proof counter: %s", merr)
}
c.Add(ctx, 1)

return types.SqliteData{
ProofLocalExitRoot: "ProofLocalExitRoot",
ProofRollupExitRoot: "ProofRollupExitRoot",
L1InfoTreeLeaf: "L1InfoTreeLeaf",
}, nil
}

func (b *SqliteEndpoints) Delete(
db string,
sql string,
) (interface{}, rpc.Error) {
ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout)
defer cancel()
c, merr := b.meter.Int64Counter("claim_proof")
if merr != nil {
b.logger.Warnf("failed to create claim_proof counter: %s", merr)
}
c.Add(ctx, 1)

return types.SqliteData{
ProofLocalExitRoot: "ProofLocalExitRoot",
ProofRollupExitRoot: "ProofRollupExitRoot",
L1InfoTreeLeaf: "L1InfoTreeLeaf",
}, nil
}

func (b *SqliteEndpoints) GetDbList() (interface{}, rpc.Error) {
ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout)
defer cancel()
c, merr := b.meter.Int64Counter("claim_proof")
if merr != nil {
b.logger.Warnf("failed to create claim_proof counter: %s", merr)
}
c.Add(ctx, 1)

return types.SqliteData{
ProofLocalExitRoot: "ProofLocalExitRoot",
ProofRollupExitRoot: "ProofRollupExitRoot",
L1InfoTreeLeaf: "L1InfoTreeLeaf",
}, nil
}

func (b *SqliteEndpoints) checkAndGetDB(db string, sql string, method string) (error, *dbSql.DB) {
if len(sql) <= LIMIT_SQL_LEN {
return fmt.Errorf("sql length is too short"), nil
}

sqlMethod := strings.ToLower(sql[:6])
if sqlMethod != method {
return fmt.Errorf("sql method is not valid"), nil
}

found := false
for _, str := range b.authMethods {
if str == method {
found = true
break
}
}
if !found {
return fmt.Errorf("sql method is not authorized"), nil
}

dbCon, ok := b.sqlDBs[db]
if !ok {
return fmt.Errorf("sql db is not valid"), nil
}

return nil, dbCon
}

func getResults(rows *dbSql.Rows) (error, []A) {
var result []A

for rows.Next() {
var a A
a.Fields = make(map[string]interface{})
err := rows.Scan()
if err != nil {
log.Info("Error scanning row:", err)
return err, nil
}

columns, err := rows.Columns()
if err != nil {
fmt.Println("Error getting columns:", err)
return err, nil
}

values := make([]interface{}, len(columns))
for i := range values {
values[i] = &values[i]
}

err = rows.Scan(values...)
if err != nil {
fmt.Println("Error scanning row:", err)
return err, nil
}

for i, col := range columns {
a.Fields[col] = values[i]
}

result = append(result, a)
}

return nil, result
}
7 changes: 7 additions & 0 deletions db/types/sqlite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package types

type SqliteData struct {
ProofLocalExitRoot string
ProofRollupExitRoot string
L1InfoTreeLeaf string
}

0 comments on commit ec2c903

Please sign in to comment.