Skip to content

Commit

Permalink
Feature: add transfer_ticket. Refactoring: receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Aug 15, 2023
1 parent 5259b08 commit 89b577c
Show file tree
Hide file tree
Showing 18 changed files with 225 additions and 113 deletions.
10 changes: 5 additions & 5 deletions cmd/api/handlers/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,19 +496,19 @@ func prepareOperation(ctx *config.Context, operation operation.Operation, bmd []
}
op.Protocol = proto.Hash

if operation.IsEvent() {
eventType, err := ast.NewTypedAstFromBytes(operation.EventType)
if operation.HasPayload() {
payloadType, err := ast.NewTypedAstFromBytes(operation.PayloadType)
if err != nil {
return op, err
}
if err := eventType.SettleFromBytes(operation.EventPayload); err != nil {
if err := payloadType.SettleFromBytes(operation.Payload); err != nil {
return op, err
}
eventMiguel, err := eventType.ToMiguel()
payloadMiguel, err := payloadType.ToMiguel()
if err != nil {
return op, err
}
op.Event = eventMiguel
op.Event = payloadMiguel
return op, err
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/api/handlers/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ type Event struct {

// NewEvent -
func NewEvent(o operation.Operation) (*Event, error) {
if !o.IsEvent() {
if o.Kind != types.OperationKindEvent {
return nil, nil
}

Expand All @@ -660,11 +660,11 @@ func NewEvent(o operation.Operation) (*Event, error) {
Tag: o.Tag.String(),
}

eventType, err := ast.NewTypedAstFromBytes(o.EventType)
eventType, err := ast.NewTypedAstFromBytes(o.PayloadType)
if err != nil {
return nil, err
}
if err := eventType.SettleFromBytes(o.EventPayload); err != nil {
if err := eventType.SettleFromBytes(o.Payload); err != nil {
return nil, err
}
eventMiguel, err := eventType.ToMiguel()
Expand Down
3 changes: 3 additions & 0 deletions cmd/indexer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func NewBlockchainIndexer(ctx context.Context, cfg config.Config, network string
// Close -
func (bi *BlockchainIndexer) Close() error {
close(bi.refreshTimer)
if err := bi.receiver.Close(); err != nil {
return nil
}
return bi.Context.Close()
}

Expand Down
105 changes: 35 additions & 70 deletions cmd/indexer/indexer/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package indexer

import (
"context"
"sync"

"github.com/baking-bad/bcdhub/internal/logger"
"github.com/baking-bad/bcdhub/internal/noderpc"
"github.com/dipdup-io/workerpool"
"github.com/pkg/errors"
)

// Block -
Expand All @@ -17,15 +18,10 @@ type Block struct {

// Receiver -
type Receiver struct {
rpc noderpc.INode
queue chan int64
failed chan int64
blocks chan *Block

threads chan struct{}
present map[int64]struct{}
mx sync.RWMutex
wg sync.WaitGroup
rpc noderpc.INode
blocks chan *Block
pool *workerpool.Pool[int64]
inProcess Map[int64, struct{}]
}

// NewReceiver -
Expand All @@ -36,61 +32,44 @@ func NewReceiver(rpc noderpc.INode, queueSize, threadsCount int64) *Receiver {
if threadsCount == 0 {
threadsCount = 2
}
return &Receiver{
rpc: rpc,
queue: make(chan int64, queueSize),
failed: make(chan int64, queueSize),
blocks: make(chan *Block, queueSize),
threads: make(chan struct{}, threadsCount),
present: make(map[int64]struct{}),
receiver := &Receiver{
rpc: rpc,
blocks: make(chan *Block, queueSize),
inProcess: NewMap[int64, struct{}](),
}
receiver.pool = workerpool.NewPool(receiver.job, int(threadsCount))
return receiver
}

// AddTask -
func (r *Receiver) AddTask(level int64) {
r.mx.RLock()
if _, ok := r.present[level]; ok {
r.mx.RUnlock()
if exists := r.inProcess.Exists(level); exists {
return
}
r.mx.RUnlock()

r.mx.Lock()
{
r.present[level] = struct{}{}
}
r.mx.Unlock()
r.queue <- level
r.inProcess.Set(level, struct{}{})
r.pool.AddTask(level)
}

// Start -
func (r *Receiver) Start(ctx context.Context) {
go r.start(ctx)
r.pool.Start(ctx)
}

// Close -
func (r *Receiver) Close() error {
if err := r.pool.Close(); err != nil {
return err
}

close(r.blocks)
return nil
}

// Blocks -
func (r *Receiver) Blocks() <-chan *Block {
return r.blocks
}

func (r *Receiver) start(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.wg.Wait()
close(r.threads)
close(r.blocks)
close(r.failed)
close(r.queue)
return
case level := <-r.failed:
r.job(ctx, level)
case level := <-r.queue:
r.job(ctx, level)
}
}
}

func (r *Receiver) get(ctx context.Context, level int64) (Block, error) {
var block Block
header, err := r.rpc.Block(ctx, level)
Expand All @@ -114,28 +93,14 @@ func (r *Receiver) get(ctx context.Context, level int64) (Block, error) {
}

func (r *Receiver) job(ctx context.Context, level int64) {
r.threads <- struct{}{}
r.wg.Add(1)
go func() {
defer func() {
<-r.threads
r.wg.Done()
}()

block, err := r.get(ctx, level)
if err != nil {
if ctx.Err() == nil {
logger.Error().Int64("block", level).Err(err).Msg("Receiver.get")
r.failed <- level
}
return
}
r.blocks <- &block

r.mx.Lock()
{
delete(r.present, level)
block, err := r.get(ctx, level)
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error().Int64("block", level).Err(err).Msg("Receiver.get")
r.pool.AddTask(level)
}
r.mx.Unlock()
}()
return
}
r.blocks <- &block
r.inProcess.Delete(level)
}
47 changes: 47 additions & 0 deletions cmd/indexer/indexer/sync_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package indexer

import "sync"

// Map -
type Map[K comparable, V any] struct {
m map[K]V
mx *sync.RWMutex
}

// NewMap -
func NewMap[K comparable, V any]() Map[K, V] {
return Map[K, V]{
m: make(map[K]V),
mx: new(sync.RWMutex),
}
}

// Set -
func (m Map[K, V]) Set(key K, value V) {
m.mx.Lock()
m.m[key] = value
m.mx.Unlock()
}

// Get -
func (m Map[K, V]) Get(key K) (V, bool) {
m.mx.RLock()
value, ok := m.m[key]
m.mx.RUnlock()
return value, ok
}

// Exists -
func (m Map[K, V]) Exists(key K) bool {
m.mx.RLock()
_, ok := m.m[key]
m.mx.RUnlock()
return ok
}

// Delete -
func (m Map[K, V]) Delete(key K) {
m.mx.Lock()
delete(m.m, key)
m.mx.Unlock()
}
8 changes: 4 additions & 4 deletions configs/development.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ indexer:
project_name: indexer
sentry_enabled: false
networks:
mainnet:
receiver_threads: 5
ghostnet:
receiver_threads: 10
# mainnet:
# receiver_threads: 5
# ghostnet:
# receiver_threads: 10
nairobinet:
receiver_threads: 10
connections:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/aws/aws-sdk-go v1.44.92
github.com/btcsuite/btcutil v1.0.2
github.com/dipdup-io/workerpool v0.0.3
github.com/ebellocchia/go-base58 v0.1.0
github.com/fatih/color v1.13.0
github.com/getsentry/sentry-go v0.13.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dipdup-io/workerpool v0.0.3 h1:+cnO0/J0e4UiJ0EBEDpvuhriSDVHlsPminGRU2Il+ZI=
github.com/dipdup-io/workerpool v0.0.3/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA=
github.com/ebellocchia/go-base58 v0.1.0 h1:0w/ODEfZnOPW5KW0QY/Xpb1fxba/BxQJMUa5iYzpljk=
github.com/ebellocchia/go-base58 v0.1.0/go.mod h1:RHE/6C6Ru6YAH9Tc+A9eHQ6ZKEooLC0jw+YLnpt3CAU=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down
1 change: 1 addition & 0 deletions internal/bcd/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
RegisterGlobalConstant = "register_global_constant"
TxRollupOrigination = "tx_rollup_origination"
Event = "event"
TransferTicket = "transfer_ticket"
)

// Error IDs
Expand Down
10 changes: 5 additions & 5 deletions internal/models/operation/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ type Operation struct {
Hash []byte
Parameters []byte
DeffatedStorage []byte
EventPayload []byte
EventType []byte
Payload []byte
PayloadType []byte
Script []byte `pg:"-"`

Errors tezerrors.Errors `pg:",type:bytea"`
Expand Down Expand Up @@ -150,9 +150,9 @@ func (o *Operation) IsCall() bool {
return bcd.IsContract(o.Destination.Address) && len(o.Parameters) > 0
}

// IsEvent -
func (o *Operation) IsEvent() bool {
return o.Kind == types.OperationKindEvent
// HasPayload -
func (o *Operation) HasPayload() bool {
return o.Kind == types.OperationKindEvent || o.Kind == types.OperationKindTransferTicket
}

// Result -
Expand Down
5 changes: 5 additions & 0 deletions internal/models/types/operation_kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func NewOperationKind(value string) OperationKind {
return OperationKindTxRollupOrigination
case "event":
return OperationKindEvent
case "transfer_ticket":
return OperationKindTransferTicket
default:
return 0
}
Expand All @@ -42,6 +44,8 @@ func (kind OperationKind) String() string {
return "tx_rollup_origination"
case OperationKindEvent:
return "event"
case OperationKindTransferTicket:
return "transfer_ticket"
default:
return ""
}
Expand All @@ -55,4 +59,5 @@ const (
OperationKindRegisterGlobalConstant
OperationKindTxRollupOrigination
OperationKindEvent
OperationKindTransferTicket
)
43 changes: 24 additions & 19 deletions internal/noderpc/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,30 @@ type OperationGroup struct {

// Operation -
type Operation struct {
Kind string `json:"kind"`
Source string `json:"source"`
Destination *string `json:"destination,omitempty"`
Delegate string `json:"delegate,omitempty"`
Fee int64 `json:"fee,string"`
Counter int64 `json:"counter,string"`
Balance *int64 `json:"balance,omitempty,string"`
GasLimit int64 `json:"gas_limit,string"`
StorageLimit int64 `json:"storage_limit,string"`
Amount *int64 `json:"amount,omitempty,string"`
Nonce *int64 `json:"nonce,omitempty"`
Tag *string `json:"tag,omitempty"`
Parameters stdJSON.RawMessage `json:"parameters,omitempty"`
Metadata *OperationMetadata `json:"metadata,omitempty"`
Result *OperationResult `json:"result,omitempty"`
Script stdJSON.RawMessage `json:"script,omitempty"`
Value stdJSON.RawMessage `json:"value,omitempty"`
Payload stdJSON.RawMessage `json:"payload,omitempty"`
Type stdJSON.RawMessage `json:"type,omitempty"`
Kind string `json:"kind"`
Source string `json:"source"`
Destination *string `json:"destination,omitempty"`
Delegate string `json:"delegate,omitempty"`
Fee int64 `json:"fee,string"`
Counter int64 `json:"counter,string"`
Balance *int64 `json:"balance,omitempty,string"`
GasLimit int64 `json:"gas_limit,string"`
StorageLimit int64 `json:"storage_limit,string"`
Amount *int64 `json:"amount,omitempty,string"`
Nonce *int64 `json:"nonce,omitempty"`
Tag *string `json:"tag,omitempty"`
Entrypoint *string `json:"entrypoint,omitempty"`
TicketTicketer string `json:"ticket_ticketer,omitempty"`
TicketAmount string `json:"ticket_amount,omitempty"`
Parameters stdJSON.RawMessage `json:"parameters,omitempty"`
Metadata *OperationMetadata `json:"metadata,omitempty"`
Result *OperationResult `json:"result,omitempty"`
Script stdJSON.RawMessage `json:"script,omitempty"`
Value stdJSON.RawMessage `json:"value,omitempty"`
Payload stdJSON.RawMessage `json:"payload,omitempty"`
Type stdJSON.RawMessage `json:"type,omitempty"`
TicketContent stdJSON.RawMessage `json:"ticket_contents,omitempty"`
TicketType stdJSON.RawMessage `json:"ticket_ty,omitempty"`
}

// GetResult -
Expand Down
Loading

0 comments on commit 89b577c

Please sign in to comment.