Skip to content

Commit

Permalink
adding select with limit
Browse files Browse the repository at this point in the history
  • Loading branch information
olegfomenko committed Mar 7, 2024
1 parent 16cd6a3 commit f9a8387
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 1 deletion.
3 changes: 3 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ key:

cosmos:
addr: "localhost:9090"

select:
limit: 20
3 changes: 3 additions & 0 deletions internal/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Config interface {
pgdb.Databaser
Keyer
Cosmoser
Selecter

Storage() data.Storage
}
Expand All @@ -23,6 +24,7 @@ type config struct {
pgdb.Databaser
Keyer
Cosmoser
Selecter

getter kv.Getter
storage comfig.Once
Expand All @@ -38,5 +40,6 @@ func New(getter kv.Getter) Config {
Databaser: pgdb.NewDatabaser(getter),
Keyer: NewKeyer(getter),
Cosmoser: NewCosmoser(getter),
Selecter: NewSelecter(getter),
}
}
36 changes: 36 additions & 0 deletions internal/config/select.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package config

import (
"gitlab.com/distributed_lab/figure"
"gitlab.com/distributed_lab/kit/comfig"
"gitlab.com/distributed_lab/kit/kv"
)

type Selecter interface {
Limit() uint64
}

type selecter struct {
getter kv.Getter
once comfig.Once
}

func NewSelecter(getter kv.Getter) Selecter {
return &selecter{
getter: getter,
}
}

func (s *selecter) Limit() uint64 {
return s.once.Do(func() interface{} {
var config struct {
Limit uint64 `fig:"limit"`
}

if err := figure.Out(&config).From(kv.MustGetStringMap(s.getter, "select")).Please(); err != nil {
panic(err)
}

return config.Limit
}).(uint64)
}
1 change: 1 addition & 0 deletions internal/data/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ type TransactionQ interface {
InsertCtx(ctx context.Context, t *Transaction) error
DeleteCtx(ctx context.Context, t *Transaction) error
Select(ctx context.Context) ([]Transaction, error)
SelectWithLimit(ctx context.Context, limit uint64) ([]Transaction, error)
}
type GorpMigrationQ interface{}
7 changes: 7 additions & 0 deletions internal/data/pg/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,10 @@ func (q TransactionQ) Select(ctx context.Context) ([]data.Transaction, error) {
err := q.db.SelectContext(ctx, &result, stmt)
return result, errors.Wrap(err, "failed to exec stmt")
}

func (q TransactionQ) SelectWithLimit(ctx context.Context, limit uint64) ([]data.Transaction, error) {
stmt := squirrel.Select(colsTransaction).From("transactions").Limit(limit)
var result []data.Transaction
err := q.db.SelectContext(ctx, &result, stmt)
return result, errors.Wrap(err, "failed to exec stmt")
}
4 changes: 3 additions & 1 deletion internal/services/broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type broadcaster struct {
txclient txclient.ServiceClient
auth authtypes.QueryClient
txQ data.TransactionQ
limit uint64
}

func Run(ctx context.Context, cfg config.Config) {
Expand All @@ -53,13 +54,14 @@ func Run(ctx context.Context, cfg config.Config) {
codec.NewProtoCodec(codectypes.NewInterfaceRegistry()),
[]signing.SignMode{signing.SignMode_SIGN_MODE_DIRECT},
),
limit: cfg.Limit(),
}

running.WithBackOff(ctx, svc.log, "run", svc.runOnceIndexing, 5*time.Second, 5*time.Second, 5*time.Second)
}

func (t *broadcaster) runOnceIndexing(ctx context.Context) error {
txs, err := t.txQ.Select(ctx)
txs, err := t.txQ.SelectWithLimit(ctx, t.limit)
if err != nil {
return errors.Wrap(err, "failed to select txs")
}
Expand Down

0 comments on commit f9a8387

Please sign in to comment.