Skip to content

Commit

Permalink
In Memory Bridge Response (#13422)
Browse files Browse the repository at this point in the history
* In Memory Bridge Response

This commit adds a cache to the bridges ORM for responses and upserts responses async instead
of waiting for a database call to complete. Get requests for responses are lazy in that they
first look in the memory cache before querying the database.

* initialize internal values

* add tests and refactor constructor

* fix orm query

* addressed comments

* rename to remove 'Bridge'

* health report using CopyHealth

* remove 'Bridge

* remove bridge from test

* add BridgeCache to health tests

* fix data race

* results of make generate

* addressed comments

* simplify getting values from map

* make linter happy

* add logger to tests

* fix test

* address comments

* initialize logger before runner

* cleanup tests

* whitespace fix

* increase interval and default to SQL timeout
  • Loading branch information
EasterTheBunny authored Jun 18, 2024
1 parent 6e80574 commit 42c74fc
Show file tree
Hide file tree
Showing 14 changed files with 7,714 additions and 64 deletions.
7 changes: 7 additions & 0 deletions core/bridges/bridge_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,10 @@ func (t *BridgeName) Scan(value interface{}) error {
*t = BridgeName(temp)
return nil
}

type BridgeResponse struct {
DotID string
SpecID int32
Value []byte
FinishedAt time.Time
}
265 changes: 265 additions & 0 deletions core/bridges/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
package bridges

import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"

"golang.org/x/exp/maps"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

const (
CacheServiceName = "BridgeCache"
DefaultUpsertInterval = 5 * time.Second
)

type Cache struct {
// dependencies and configurations
ORM
lggr logger.Logger
interval time.Duration

// service state
services.StateMachine
wg sync.WaitGroup
stop chan struct{}

// data state
bridgeTypesCache sync.Map
bridgeLastValueCache map[string]BridgeResponse
mu sync.RWMutex
}

var _ ORM = (*Cache)(nil)
var _ services.Service = (*Cache)(nil)

func NewCache(base ORM, lggr logger.Logger, upsertInterval time.Duration) *Cache {
return &Cache{
ORM: base,
lggr: lggr.Named(CacheServiceName),
interval: upsertInterval,
stop: make(chan struct{}, 1),
bridgeLastValueCache: make(map[string]BridgeResponse),
}
}

func (c *Cache) WithDataSource(ds sqlutil.DataSource) ORM {
return NewCache(NewORM(ds), c.lggr, c.interval)
}

func (c *Cache) FindBridge(ctx context.Context, name BridgeName) (BridgeType, error) {
if bridgeType, ok := c.bridgeTypesCache.Load(name); ok {
return bridgeType.(BridgeType), nil
}

ormResult, err := c.ORM.FindBridge(ctx, name)
if err == nil {
c.bridgeTypesCache.Store(ormResult.Name, ormResult)
}

return ormResult, err
}

func (c *Cache) FindBridges(ctx context.Context, names []BridgeName) ([]BridgeType, error) {
if len(names) == 0 {
return nil, errors.New("at least one bridge name is required")
}

var (
allFoundBts []BridgeType
searchNames []BridgeName
)

for _, n := range names {
if bridgeType, ok := c.bridgeTypesCache.Load(n); ok {
allFoundBts = append(allFoundBts, bridgeType.(BridgeType))

continue
}

searchNames = append(searchNames, n)
}

if len(allFoundBts) == len(names) {
return allFoundBts, nil
}

bts, err := c.ORM.FindBridges(ctx, searchNames)
if err != nil {
return nil, err
}

for _, bt := range bts {
c.bridgeTypesCache.Store(bt.Name, bt)
}

allFoundBts = append(allFoundBts, bts...)
if len(allFoundBts) != len(names) {
return nil, fmt.Errorf("not all bridges exist, asked for %v, exists %v", names, allFoundBts)
}

return allFoundBts, nil
}

func (c *Cache) DeleteBridgeType(ctx context.Context, bt *BridgeType) error {
err := c.ORM.DeleteBridgeType(ctx, bt)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return err
}
}

// We delete regardless of the rows affected, in case it gets out of sync
c.bridgeTypesCache.Delete(bt.Name)

return err
}

func (c *Cache) BridgeTypes(ctx context.Context, offset int, limit int) ([]BridgeType, int, error) {
return c.ORM.BridgeTypes(ctx, offset, limit)
}

func (c *Cache) CreateBridgeType(ctx context.Context, bt *BridgeType) error {
err := c.ORM.CreateBridgeType(ctx, bt)
if err != nil {
return err
}

c.bridgeTypesCache.Store(bt.Name, *bt)

return nil
}

func (c *Cache) UpdateBridgeType(ctx context.Context, bt *BridgeType, btr *BridgeTypeRequest) error {
if err := c.ORM.UpdateBridgeType(ctx, bt, btr); err != nil {
return err
}

c.bridgeTypesCache.Store(bt.Name, *bt)

return nil
}

func (c *Cache) GetCachedResponse(ctx context.Context, dotId string, specId int32, maxElapsed time.Duration) ([]byte, error) {
// prefer to get latest value from cache
cached, inCache := c.latestValue(dotId, specId)
if inCache && cached.FinishedAt.After(time.Now().Add(-maxElapsed)) {
return cached.Value, nil
}

response, finishedAt, err := c.ORM.GetCachedResponseWithFinished(ctx, dotId, specId, maxElapsed)
if err != nil {
return nil, err
}

c.setValue(dotId, specId, BridgeResponse{
DotID: dotId,
SpecID: specId,
Value: response,
FinishedAt: finishedAt,
})

return response, nil
}

func (c *Cache) UpsertBridgeResponse(ctx context.Context, dotId string, specId int32, response []byte) error {
upsertTime := time.Now()

// catch the rare case of a save race
cached, inCache := c.latestValue(dotId, specId)
if inCache && cached.FinishedAt.After(upsertTime) {
return nil
}

c.setValue(dotId, specId, BridgeResponse{
DotID: dotId,
SpecID: specId,
Value: response,
FinishedAt: upsertTime,
})

return nil
}

func (c *Cache) Start(context.Context) error {
return c.StartOnce(CacheServiceName, func() error {
c.wg.Add(1)

go c.run()

return nil
})
}

func (c *Cache) Close() error {
return c.StopOnce(CacheServiceName, func() error {
close(c.stop)
c.wg.Wait()

return nil
})
}

func (c *Cache) HealthReport() map[string]error {
return map[string]error{c.Name(): c.Healthy()}
}

func (c *Cache) Name() string {
return c.lggr.Name()
}

func (c *Cache) run() {
defer c.wg.Done()

for {
timer := time.NewTimer(utils.WithJitter(c.interval))

select {
case <-timer.C:
c.doBulkUpsert()
case <-c.stop:
timer.Stop()

return
}
}
}

func (c *Cache) doBulkUpsert() {
c.mu.RLock()
values := maps.Values(c.bridgeLastValueCache)
c.mu.RUnlock()

if err := c.ORM.BulkUpsertBridgeResponse(context.Background(), values); err != nil {
c.lggr.Warnf("bulk upsert of bridge responses failed: %s", err.Error())
}
}

func (c *Cache) latestValue(dotId string, specId int32) (BridgeResponse, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

cached, inCache := c.bridgeLastValueCache[responseKey(dotId, specId)]

return cached, inCache
}

func (c *Cache) setValue(dotId string, specId int32, resp BridgeResponse) {
c.mu.Lock()
defer c.mu.Unlock()

c.bridgeLastValueCache[responseKey(dotId, specId)] = resp
}

func responseKey(dotId string, specId int32) string {
return fmt.Sprintf("%s||%d", dotId, specId)
}
Loading

0 comments on commit 42c74fc

Please sign in to comment.