From f9a838749b4f5ba19ebc016846bbc314ae7fba90 Mon Sep 17 00:00:00 2001 From: olegfomenko Date: Thu, 7 Mar 2024 14:35:42 +0200 Subject: [PATCH] adding select with limit --- config.yaml | 3 ++ internal/config/main.go | 3 ++ internal/config/select.go | 36 ++++++++++++++++++++ internal/data/main.go | 1 + internal/data/pg/transaction.go | 7 ++++ internal/services/broadcaster/broadcaster.go | 4 ++- 6 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 internal/config/select.go diff --git a/config.yaml b/config.yaml index 0621dd7..ec5c2a7 100644 --- a/config.yaml +++ b/config.yaml @@ -16,3 +16,6 @@ key: cosmos: addr: "localhost:9090" + +select: + limit: 20 \ No newline at end of file diff --git a/internal/config/main.go b/internal/config/main.go index 0a66955..8cab151 100644 --- a/internal/config/main.go +++ b/internal/config/main.go @@ -13,6 +13,7 @@ type Config interface { pgdb.Databaser Keyer Cosmoser + Selecter Storage() data.Storage } @@ -23,6 +24,7 @@ type config struct { pgdb.Databaser Keyer Cosmoser + Selecter getter kv.Getter storage comfig.Once @@ -38,5 +40,6 @@ func New(getter kv.Getter) Config { Databaser: pgdb.NewDatabaser(getter), Keyer: NewKeyer(getter), Cosmoser: NewCosmoser(getter), + Selecter: NewSelecter(getter), } } diff --git a/internal/config/select.go b/internal/config/select.go new file mode 100644 index 0000000..4799f24 --- /dev/null +++ b/internal/config/select.go @@ -0,0 +1,36 @@ +package config + +import ( + "gitlab.com/distributed_lab/figure" + "gitlab.com/distributed_lab/kit/comfig" + "gitlab.com/distributed_lab/kit/kv" +) + +type Selecter interface { + Limit() uint64 +} + +type selecter struct { + getter kv.Getter + once comfig.Once +} + +func NewSelecter(getter kv.Getter) Selecter { + return &selecter{ + getter: getter, + } +} + +func (s *selecter) Limit() uint64 { + return s.once.Do(func() interface{} { + var config struct { + Limit uint64 `fig:"limit"` + } + + if err := figure.Out(&config).From(kv.MustGetStringMap(s.getter, "select")).Please(); err != nil { + panic(err) + } + + return config.Limit + }).(uint64) +} diff --git a/internal/data/main.go b/internal/data/main.go index 63d2302..72cd765 100644 --- a/internal/data/main.go +++ b/internal/data/main.go @@ -20,5 +20,6 @@ type TransactionQ interface { InsertCtx(ctx context.Context, t *Transaction) error DeleteCtx(ctx context.Context, t *Transaction) error Select(ctx context.Context) ([]Transaction, error) + SelectWithLimit(ctx context.Context, limit uint64) ([]Transaction, error) } type GorpMigrationQ interface{} diff --git a/internal/data/pg/transaction.go b/internal/data/pg/transaction.go index 6e44554..7254efa 100644 --- a/internal/data/pg/transaction.go +++ b/internal/data/pg/transaction.go @@ -13,3 +13,10 @@ func (q TransactionQ) Select(ctx context.Context) ([]data.Transaction, error) { err := q.db.SelectContext(ctx, &result, stmt) return result, errors.Wrap(err, "failed to exec stmt") } + +func (q TransactionQ) SelectWithLimit(ctx context.Context, limit uint64) ([]data.Transaction, error) { + stmt := squirrel.Select(colsTransaction).From("transactions").Limit(limit) + var result []data.Transaction + err := q.db.SelectContext(ctx, &result, stmt) + return result, errors.Wrap(err, "failed to exec stmt") +} diff --git a/internal/services/broadcaster/broadcaster.go b/internal/services/broadcaster/broadcaster.go index 91ea21b..75c4463 100644 --- a/internal/services/broadcaster/broadcaster.go +++ b/internal/services/broadcaster/broadcaster.go @@ -38,6 +38,7 @@ type broadcaster struct { txclient txclient.ServiceClient auth authtypes.QueryClient txQ data.TransactionQ + limit uint64 } func Run(ctx context.Context, cfg config.Config) { @@ -53,13 +54,14 @@ func Run(ctx context.Context, cfg config.Config) { codec.NewProtoCodec(codectypes.NewInterfaceRegistry()), []signing.SignMode{signing.SignMode_SIGN_MODE_DIRECT}, ), + limit: cfg.Limit(), } running.WithBackOff(ctx, svc.log, "run", svc.runOnceIndexing, 5*time.Second, 5*time.Second, 5*time.Second) } func (t *broadcaster) runOnceIndexing(ctx context.Context) error { - txs, err := t.txQ.Select(ctx) + txs, err := t.txQ.SelectWithLimit(ctx, t.limit) if err != nil { return errors.Wrap(err, "failed to select txs") }