From f8bf1f48f599214a5b09eebefa4ecbeaddeddc95 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Thu, 14 Jan 2021 09:10:06 +0100 Subject: [PATCH 01/14] Pass ethereum block counter to generated ethereum contract The block counter will be used later (one of the next commits) in subscription to get the right range of past events. --- tools/generators/ethereum/command.go.tmpl | 10 ++++++++++ tools/generators/ethereum/command_template_content.go | 10 ++++++++++ tools/generators/ethereum/contract.go.tmpl | 4 ++++ tools/generators/ethereum/contract_template_content.go | 4 ++++ 4 files changed, 28 insertions(+) diff --git a/tools/generators/ethereum/command.go.tmpl b/tools/generators/ethereum/command.go.tmpl index bd21546..477b49e 100644 --- a/tools/generators/ethereum/command.go.tmpl +++ b/tools/generators/ethereum/command.go.tmpl @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter" "github.com/keep-network/keep-common/pkg/chain/ethereum/ethutil" "github.com/keep-network/keep-common/pkg/cmd" @@ -226,6 +227,14 @@ func initialize{{.Class}}(c *cli.Context) (*contract.{{.Class}}, error) { miningWaiter := ethutil.NewMiningWaiter(client, checkInterval, maxGasPrice) + blockCounter, err := blockcounter.CreateBlockCounter(client) + if err != nil { + return nil, fmt.Errorf( + "failed to create Ethereum blockcounter: [%v]", + err, + ) + } + address := common.HexToAddress(config.ContractAddresses["{{.Class}}"]) return contract.New{{.Class}}( @@ -234,6 +243,7 @@ func initialize{{.Class}}(c *cli.Context) (*contract.{{.Class}}, error) { client, ethutil.NewNonceManager(key.Address, client), miningWaiter, + blockCounter, &sync.Mutex{}, ) } diff --git a/tools/generators/ethereum/command_template_content.go b/tools/generators/ethereum/command_template_content.go index 9b6d021..d6aac18 100644 --- a/tools/generators/ethereum/command_template_content.go +++ b/tools/generators/ethereum/command_template_content.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter" "github.com/keep-network/keep-common/pkg/chain/ethereum/ethutil" "github.com/keep-network/keep-common/pkg/cmd" @@ -229,6 +230,14 @@ func initialize{{.Class}}(c *cli.Context) (*contract.{{.Class}}, error) { miningWaiter := ethutil.NewMiningWaiter(client, checkInterval, maxGasPrice) + blockCounter, err := blockcounter.CreateBlockCounter(client) + if err != nil { + return nil, fmt.Errorf( + "failed to create Ethereum blockcounter: [%v]", + err, + ) + } + address := common.HexToAddress(config.ContractAddresses["{{.Class}}"]) return contract.New{{.Class}}( @@ -237,6 +246,7 @@ func initialize{{.Class}}(c *cli.Context) (*contract.{{.Class}}, error) { client, ethutil.NewNonceManager(key.Address, client), miningWaiter, + blockCounter, &sync.Mutex{}, ) } diff --git a/tools/generators/ethereum/contract.go.tmpl b/tools/generators/ethereum/contract.go.tmpl index 102bc78..43eacc0 100644 --- a/tools/generators/ethereum/contract.go.tmpl +++ b/tools/generators/ethereum/contract.go.tmpl @@ -16,6 +16,7 @@ import ( "github.com/ipfs/go-log" + "github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter" "github.com/keep-network/keep-common/pkg/chain/ethereum/ethutil" "github.com/keep-network/keep-common/pkg/subscription" ) @@ -51,6 +52,7 @@ type {{.Class}} struct { errorResolver *ethutil.ErrorResolver nonceManager *ethutil.NonceManager miningWaiter *ethutil.MiningWaiter + blockCounter *blockcounter.EthereumBlockCounter transactionMutex *sync.Mutex } @@ -61,6 +63,7 @@ func New{{.Class}}( backend bind.ContractBackend, nonceManager *ethutil.NonceManager, miningWaiter *ethutil.MiningWaiter, + blockCounter *blockcounter.EthereumBlockCounter, transactionMutex *sync.Mutex, ) (*{{.Class}}, error) { callerOptions := &bind.CallOpts{ @@ -99,6 +102,7 @@ func New{{.Class}}( errorResolver: ethutil.NewErrorResolver(backend, &contractABI, &contractAddress), nonceManager: nonceManager, miningWaiter: miningWaiter, + blockCounter: blockCounter, transactionMutex: transactionMutex, }, nil } diff --git a/tools/generators/ethereum/contract_template_content.go b/tools/generators/ethereum/contract_template_content.go index e2085fb..d4eec8b 100644 --- a/tools/generators/ethereum/contract_template_content.go +++ b/tools/generators/ethereum/contract_template_content.go @@ -19,6 +19,7 @@ import ( "github.com/ipfs/go-log" + "github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter" "github.com/keep-network/keep-common/pkg/chain/ethereum/ethutil" "github.com/keep-network/keep-common/pkg/subscription" ) @@ -54,6 +55,7 @@ type {{.Class}} struct { errorResolver *ethutil.ErrorResolver nonceManager *ethutil.NonceManager miningWaiter *ethutil.MiningWaiter + blockCounter *blockcounter.EthereumBlockCounter transactionMutex *sync.Mutex } @@ -64,6 +66,7 @@ func New{{.Class}}( backend bind.ContractBackend, nonceManager *ethutil.NonceManager, miningWaiter *ethutil.MiningWaiter, + blockCounter *blockcounter.EthereumBlockCounter, transactionMutex *sync.Mutex, ) (*{{.Class}}, error) { callerOptions := &bind.CallOpts{ @@ -102,6 +105,7 @@ func New{{.Class}}( errorResolver: ethutil.NewErrorResolver(backend, &contractABI, &contractAddress), nonceManager: nonceManager, miningWaiter: miningWaiter, + blockCounter: blockCounter, transactionMutex: transactionMutex, }, nil } From 8b44e63917dacbae43a5b6c0e25b9f89e1b56492 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Thu, 14 Jan 2021 11:07:55 +0100 Subject: [PATCH 02/14] Draft of the structure for the new event subscription mechanism Subscription generated individually for each event will allow to either install a callback handler or to pipe events to a channel. --- pkg/chain/ethereum/ethutil/subscribe_opts.go | 13 +++++++++ .../ethereum/contract_events.go.tmpl | 29 ++++++++++++++++++- .../contract_events_template_content.go | 29 ++++++++++++++++++- tools/generators/ethereum/contract_parsing.go | 11 +++++++ 4 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 pkg/chain/ethereum/ethutil/subscribe_opts.go diff --git a/pkg/chain/ethereum/ethutil/subscribe_opts.go b/pkg/chain/ethereum/ethutil/subscribe_opts.go new file mode 100644 index 0000000..6a673e5 --- /dev/null +++ b/pkg/chain/ethereum/ethutil/subscribe_opts.go @@ -0,0 +1,13 @@ +package ethutil + +import "time" + +const ( + DefaultSubscribeOptsTickDuration = 15 * time.Minute + DefaultSubscribeOptsBlocksBack = 100 +) + +type SubscribeOpts struct { + TickDuration time.Duration + BlocksBack uint64 +} diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index 17f9f10..5ff5692 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -2,10 +2,37 @@ {{- $logger := (print $contract.ShortVar "Logger") -}} {{- range $i, $event := .Events }} +func ({{$contract.ShortVar}} *{{$contract.Class}}) {{$event.CapsName}}( + opts *ethutil.SubscribeOpts, + {{$event.IndexedFilterDeclarations -}} +) *{{$event.CapsName}}Subscription { + return &{{$event.CapsName}}Subscription{ + opts, + {{$event.IndexedFilters}} + } +} + +type {{$event.CapsName}}Subscription struct { + opts *ethutil.SubscribeOpts + {{$event.IndexedFilterFields -}} +} + +func ({{$event.ShortVar}}s *{{$event.CapsName}}Subscription) Pipe( + chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}, +) { + +} + type {{$contract.FullVar}}{{$event.CapsName}}Func func( - {{$event.ParamDeclarations -}} + {{$event.ParamDeclarations -}} ) +func ({{$event.ShortVar}}s *{{$event.CapsName}}Subscription) OnEvent( + handler {{$contract.FullVar}}{{$event.CapsName}}Func, +) { + +} + func ({{$contract.ShortVar}} *{{$contract.Class}}) Past{{$event.CapsName}}Events( startBlock uint64, endBlock *uint64, diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 753e362..8f1aa61 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -5,10 +5,37 @@ var contractEventsTemplateContent = `{{- $contract := . -}} {{- $logger := (print $contract.ShortVar "Logger") -}} {{- range $i, $event := .Events }} +func ({{$contract.ShortVar}} *{{$contract.Class}}) {{$event.CapsName}}( + opts *ethutil.SubscribeOpts, + {{$event.IndexedFilterDeclarations -}} +) *{{$event.CapsName}}Subscription { + return &{{$event.CapsName}}Subscription{ + opts, + {{$event.IndexedFilters}} + } +} + +type {{$event.CapsName}}Subscription struct { + opts *ethutil.SubscribeOpts + {{$event.IndexedFilterFields -}} +} + +func ({{$event.ShortVar}}s *{{$event.CapsName}}Subscription) Pipe( + chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}, +) { + +} + type {{$contract.FullVar}}{{$event.CapsName}}Func func( - {{$event.ParamDeclarations -}} + {{$event.ParamDeclarations -}} ) +func ({{$event.ShortVar}}s *{{$event.CapsName}}Subscription) OnEvent( + handler {{$contract.FullVar}}{{$event.CapsName}}Func, +) { + +} + func ({{$contract.ShortVar}} *{{$contract.Class}}) Past{{$event.CapsName}}Events( startBlock uint64, endBlock *uint64, diff --git a/tools/generators/ethereum/contract_parsing.go b/tools/generators/ethereum/contract_parsing.go index 58230d9..1e285a1 100644 --- a/tools/generators/ethereum/contract_parsing.go +++ b/tools/generators/ethereum/contract_parsing.go @@ -80,10 +80,12 @@ type returnInfo struct { type eventInfo struct { CapsName string LowerName string + ShortVar string IndexedFilters string ParamExtractors string ParamDeclarations string IndexedFilterDeclarations string + IndexedFilterFields string } func buildContractInfo( @@ -245,6 +247,7 @@ func buildEventInfo(eventsByName map[string]abi.Event) []eventInfo { paramDeclarations := "" paramExtractors := "" indexedFilterDeclarations := "" + indexedFilterFields := "" indexedFilters := "" for _, param := range event.Inputs { upperParam := uppercaseFirst(param.Name) @@ -254,6 +257,7 @@ func buildEventInfo(eventsByName map[string]abi.Event) []eventInfo { paramExtractors += fmt.Sprintf("event.%v,\n", upperParam) if param.Indexed { indexedFilterDeclarations += fmt.Sprintf("%vFilter []%v,\n", param.Name, goType) + indexedFilterFields += fmt.Sprintf("%vFilter []%v\n", param.Name, goType) indexedFilters += fmt.Sprintf("%vFilter,\n", param.Name) } } @@ -261,13 +265,20 @@ func buildEventInfo(eventsByName map[string]abi.Event) []eventInfo { paramDeclarations += "blockNumber uint64,\n" paramExtractors += "event.Raw.BlockNumber,\n" + shortVar := strings.ToLower(string(shortVarRegexp.ReplaceAll( + []byte(name), + []byte("$1"), + ))) + eventInfos = append(eventInfos, eventInfo{ uppercaseFirst(name), lowercaseFirst(name), + shortVar, indexedFilters, paramExtractors, paramDeclarations, indexedFilterDeclarations, + indexedFilterFields, }) } From a5d442efe133f6b72818c3333f70e5bd4c79ddd9 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Thu, 14 Jan 2021 15:16:16 +0100 Subject: [PATCH 03/14] OnEvent and PipeTo event subscription The new event subscription mechanism allows to install event callback handler function using OnEvent function as well as pass a channel to which events should be piped. So far all our event subscriptions use function handlers. While it is convenient in some cases, in some other cases it is the opposite. For example, OnBondedECDSAKeepCreated handler in ECDSA client works perfectly fine as a function. It triggers the protocol and does not have to constantly monior the state of the chain. On the other hand, OnDKGResultSubmitted from beacon client needs to monitor the chain and exit the process of event publication in case another node has already done it. In this case, the code could be better structured with channel-based subscription. Another important change introduced in this commit is a background event check loop that is calling GetPastEvents every 15 minutes (by default). This logic was added to make sure no events are lost from the client's perspective. We observed on mainnet that some third party Ethereum providers are often not delivering events. It may also happen that the event was emitted when client was attempting to reconnect to Ethereum. Both those problems should be fixed by the background loop calling getPastEvents next to the regular event subscription. Important: with this solution, client will receive even more event duplicates than before. It is important to implement an appropriate duplicate filtering logic in the handler. --- .../ethereum/contract_events.go.tmpl | 166 ++++++++++++------ .../contract_events_template_content.go | 166 ++++++++++++------ tools/generators/ethereum/contract_parsing.go | 28 ++- 3 files changed, 239 insertions(+), 121 deletions(-) diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index 5ff5692..8d5f8de 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -5,95 +5,123 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) {{$event.CapsName}}( opts *ethutil.SubscribeOpts, {{$event.IndexedFilterDeclarations -}} -) *{{$event.CapsName}}Subscription { - return &{{$event.CapsName}}Subscription{ +) *{{$event.SubscriptionCapsName}} { + if opts == nil { + opts = new(ethutil.SubscribeOpts) + } + if opts.TickDuration == 0 { + opts.TickDuration = ethutil.DefaultSubscribeOptsTickDuration + } + if opts.BlocksBack == 0 { + opts.BlocksBack = ethutil.DefaultSubscribeOptsBlocksBack + } + + return &{{$event.SubscriptionCapsName}}{ + {{$contract.ShortVar}}, opts, {{$event.IndexedFilters}} } } -type {{$event.CapsName}}Subscription struct { +type {{$event.SubscriptionCapsName}} struct { + contract *{{$contract.Class}} opts *ethutil.SubscribeOpts {{$event.IndexedFilterFields -}} } -func ({{$event.ShortVar}}s *{{$event.CapsName}}Subscription) Pipe( - chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}, -) { - -} - type {{$contract.FullVar}}{{$event.CapsName}}Func func( {{$event.ParamDeclarations -}} ) -func ({{$event.ShortVar}}s *{{$event.CapsName}}Subscription) OnEvent( +func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent( handler {{$contract.FullVar}}{{$event.CapsName}}Func, -) { - -} - -func ({{$contract.ShortVar}} *{{$contract.Class}}) Past{{$event.CapsName}}Events( - startBlock uint64, - endBlock *uint64, - {{$event.IndexedFilterDeclarations -}} -) ([]*abi.{{$contract.AbiClass}}{{$event.CapsName}}, error){ - iterator, err := {{$contract.ShortVar}}.contract.Filter{{$event.CapsName}}( - &bind.FilterOpts{ - Start: startBlock, - End: endBlock, - }, - {{$event.IndexedFilters}} - ) - if err != nil { - return nil, fmt.Errorf( - "error retrieving past {{$event.CapsName}} events: [%v]", - err, - ) - } - - events := make([]*abi.{{$contract.AbiClass}}{{$event.CapsName}}, 0) +) subscription.EventSubscription { + onEventChan := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) + ctx, cancel := context.WithCancel(context.Background()) - for iterator.Next() { - event := iterator.Event - events = append(events, event) - } + go func() { + for { + select { + case <-ctx.Done(): + return + case event := <- onEventChan: + handler( + {{$event.ParamExtractors}} + ) + } + } + }() - return events, nil + sub := {{$event.SubscriptionShortVar}}.Pipe(onEventChan) + return subscription.NewEventSubscription(func() { + sub.Unsubscribe() + cancel() + }) } -func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( - success {{$contract.FullVar}}{{$event.CapsName}}Func, - {{$event.IndexedFilterDeclarations -}} -) (subscription.EventSubscription) { - eventOccurred := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) - +func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( + sink chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}, +) subscription.EventSubscription { ctx, cancel := context.WithCancel(context.Background()) - - // TODO: Watch* function will soon accept channel as a parameter instead - // of the callback. This loop will be eliminated then. go func() { + ticker := time.NewTicker({{$event.SubscriptionShortVar}}.opts.TickDuration) for { select { case <-ctx.Done(): + ticker.Stop() return - case event := <-eventOccurred: - success( - {{$event.ParamExtractors}} + case <-ticker.C: + lastBlock, err := {{$event.SubscriptionShortVar}}.contract.blockCounter.CurrentBlock() + if err != nil { + {{$logger}}.Errorf( + "subscription failed to pull events: [%v]", + err, + ) + } + events, err := {{$event.SubscriptionShortVar}}.contract.Past{{$event.CapsName}}Events( + lastBlock-{{$event.SubscriptionShortVar}}.opts.BlocksBack, + nil, + {{$event.IndexedFilterExtractors}} ) + if err != nil { + {{$logger}}.Errorf( + "subscription failed to pull events: [%v]", + err, + ) + continue + } + + for _, event := range events { + sink <- event + } } } }() + sub := {{$event.SubscriptionShortVar}}.contract.watch{{$event.CapsName}}( + sink, + {{$event.IndexedFilterExtractors}} + ) + + return subscription.NewEventSubscription(func() { + sub.Unsubscribe() + cancel() + }) +} + +func ({{$contract.ShortVar}} *{{$contract.Class}}) watch{{$event.CapsName}}( + sink chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}, + {{$event.IndexedFilterDeclarations -}} +) event.Subscription { subscribeFn := func(ctx context.Context) (event.Subscription, error) { return {{$contract.ShortVar}}.contract.Watch{{$event.CapsName}}( &bind.WatchOpts{Context: ctx}, - eventOccurred, + sink, {{$event.IndexedFilters}} ) } - sub := ethutil.WithResubscription( + return ethutil.WithResubscription( {{$contract.ShortVar}}SubscriptionBackoffMax, subscribeFn, {{$contract.ShortVar}}SubscriptionAlertThreshold, @@ -114,11 +142,35 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( ) }, ) +} - return subscription.NewEventSubscription(func() { - sub.Unsubscribe() - cancel() - }) +func ({{$contract.ShortVar}} *{{$contract.Class}}) Past{{$event.CapsName}}Events( + startBlock uint64, + endBlock *uint64, + {{$event.IndexedFilterDeclarations -}} +) ([]*abi.{{$contract.AbiClass}}{{$event.CapsName}}, error){ + iterator, err := {{$contract.ShortVar}}.contract.Filter{{$event.CapsName}}( + &bind.FilterOpts{ + Start: startBlock, + End: endBlock, + }, + {{$event.IndexedFilters}} + ) + if err != nil { + return nil, fmt.Errorf( + "error retrieving past {{$event.CapsName}} events: [%v]", + err, + ) + } + + events := make([]*abi.{{$contract.AbiClass}}{{$event.CapsName}}, 0) + + for iterator.Next() { + event := iterator.Event + events = append(events, event) + } + + return events, nil } {{- end -}} \ No newline at end of file diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 8f1aa61..0fe9c73 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -8,95 +8,123 @@ var contractEventsTemplateContent = `{{- $contract := . -}} func ({{$contract.ShortVar}} *{{$contract.Class}}) {{$event.CapsName}}( opts *ethutil.SubscribeOpts, {{$event.IndexedFilterDeclarations -}} -) *{{$event.CapsName}}Subscription { - return &{{$event.CapsName}}Subscription{ +) *{{$event.SubscriptionCapsName}} { + if opts == nil { + opts = new(ethutil.SubscribeOpts) + } + if opts.TickDuration == 0 { + opts.TickDuration = ethutil.DefaultSubscribeOptsTickDuration + } + if opts.BlocksBack == 0 { + opts.BlocksBack = ethutil.DefaultSubscribeOptsBlocksBack + } + + return &{{$event.SubscriptionCapsName}}{ + {{$contract.ShortVar}}, opts, {{$event.IndexedFilters}} } } -type {{$event.CapsName}}Subscription struct { +type {{$event.SubscriptionCapsName}} struct { + contract *{{$contract.Class}} opts *ethutil.SubscribeOpts {{$event.IndexedFilterFields -}} } -func ({{$event.ShortVar}}s *{{$event.CapsName}}Subscription) Pipe( - chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}, -) { - -} - type {{$contract.FullVar}}{{$event.CapsName}}Func func( {{$event.ParamDeclarations -}} ) -func ({{$event.ShortVar}}s *{{$event.CapsName}}Subscription) OnEvent( +func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent( handler {{$contract.FullVar}}{{$event.CapsName}}Func, -) { - -} - -func ({{$contract.ShortVar}} *{{$contract.Class}}) Past{{$event.CapsName}}Events( - startBlock uint64, - endBlock *uint64, - {{$event.IndexedFilterDeclarations -}} -) ([]*abi.{{$contract.AbiClass}}{{$event.CapsName}}, error){ - iterator, err := {{$contract.ShortVar}}.contract.Filter{{$event.CapsName}}( - &bind.FilterOpts{ - Start: startBlock, - End: endBlock, - }, - {{$event.IndexedFilters}} - ) - if err != nil { - return nil, fmt.Errorf( - "error retrieving past {{$event.CapsName}} events: [%v]", - err, - ) - } - - events := make([]*abi.{{$contract.AbiClass}}{{$event.CapsName}}, 0) +) subscription.EventSubscription { + onEventChan := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) + ctx, cancel := context.WithCancel(context.Background()) - for iterator.Next() { - event := iterator.Event - events = append(events, event) - } + go func() { + for { + select { + case <-ctx.Done(): + return + case event := <- onEventChan: + handler( + {{$event.ParamExtractors}} + ) + } + } + }() - return events, nil + sub := {{$event.SubscriptionShortVar}}.Pipe(onEventChan) + return subscription.NewEventSubscription(func() { + sub.Unsubscribe() + cancel() + }) } -func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( - success {{$contract.FullVar}}{{$event.CapsName}}Func, - {{$event.IndexedFilterDeclarations -}} -) (subscription.EventSubscription) { - eventOccurred := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) - +func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( + sink chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}, +) subscription.EventSubscription { ctx, cancel := context.WithCancel(context.Background()) - - // TODO: Watch* function will soon accept channel as a parameter instead - // of the callback. This loop will be eliminated then. go func() { + ticker := time.NewTicker({{$event.SubscriptionShortVar}}.opts.TickDuration) for { select { case <-ctx.Done(): + ticker.Stop() return - case event := <-eventOccurred: - success( - {{$event.ParamExtractors}} + case <-ticker.C: + lastBlock, err := {{$event.SubscriptionShortVar}}.contract.blockCounter.CurrentBlock() + if err != nil { + {{$logger}}.Errorf( + "subscription failed to pull events: [%v]", + err, + ) + } + events, err := {{$event.SubscriptionShortVar}}.contract.Past{{$event.CapsName}}Events( + lastBlock-{{$event.SubscriptionShortVar}}.opts.BlocksBack, + nil, + {{$event.IndexedFilterExtractors}} ) + if err != nil { + {{$logger}}.Errorf( + "subscription failed to pull events: [%v]", + err, + ) + continue + } + + for _, event := range events { + sink <- event + } } } }() + sub := {{$event.SubscriptionShortVar}}.contract.watch{{$event.CapsName}}( + sink, + {{$event.IndexedFilterExtractors}} + ) + + return subscription.NewEventSubscription(func() { + sub.Unsubscribe() + cancel() + }) +} + +func ({{$contract.ShortVar}} *{{$contract.Class}}) watch{{$event.CapsName}}( + sink chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}, + {{$event.IndexedFilterDeclarations -}} +) event.Subscription { subscribeFn := func(ctx context.Context) (event.Subscription, error) { return {{$contract.ShortVar}}.contract.Watch{{$event.CapsName}}( &bind.WatchOpts{Context: ctx}, - eventOccurred, + sink, {{$event.IndexedFilters}} ) } - sub := ethutil.WithResubscription( + return ethutil.WithResubscription( {{$contract.ShortVar}}SubscriptionBackoffMax, subscribeFn, {{$contract.ShortVar}}SubscriptionAlertThreshold, @@ -117,11 +145,35 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) Watch{{$event.CapsName}}( ) }, ) +} - return subscription.NewEventSubscription(func() { - sub.Unsubscribe() - cancel() - }) +func ({{$contract.ShortVar}} *{{$contract.Class}}) Past{{$event.CapsName}}Events( + startBlock uint64, + endBlock *uint64, + {{$event.IndexedFilterDeclarations -}} +) ([]*abi.{{$contract.AbiClass}}{{$event.CapsName}}, error){ + iterator, err := {{$contract.ShortVar}}.contract.Filter{{$event.CapsName}}( + &bind.FilterOpts{ + Start: startBlock, + End: endBlock, + }, + {{$event.IndexedFilters}} + ) + if err != nil { + return nil, fmt.Errorf( + "error retrieving past {{$event.CapsName}} events: [%v]", + err, + ) + } + + events := make([]*abi.{{$contract.AbiClass}}{{$event.CapsName}}, 0) + + for iterator.Next() { + event := iterator.Event + events = append(events, event) + } + + return events, nil } {{- end -}}` diff --git a/tools/generators/ethereum/contract_parsing.go b/tools/generators/ethereum/contract_parsing.go index 1e285a1..ac95519 100644 --- a/tools/generators/ethereum/contract_parsing.go +++ b/tools/generators/ethereum/contract_parsing.go @@ -80,10 +80,13 @@ type returnInfo struct { type eventInfo struct { CapsName string LowerName string + SubscriptionCapsName string ShortVar string + SubscriptionShortVar string IndexedFilters string ParamExtractors string ParamDeclarations string + IndexedFilterExtractors string IndexedFilterDeclarations string IndexedFilterFields string } @@ -244,8 +247,20 @@ func buildMethodInfo( func buildEventInfo(eventsByName map[string]abi.Event) []eventInfo { eventInfos := make([]eventInfo, 0, len(eventsByName)) for name, event := range eventsByName { + + capsName := uppercaseFirst(name) + lowerName := lowercaseFirst(name) + subscriptionCapsName := capsName + "Subscription" + + shortVar := strings.ToLower(string(shortVarRegexp.ReplaceAll( + []byte(name), + []byte("$1"), + ))) + subscriptionShortVar := shortVar + "s" + paramDeclarations := "" paramExtractors := "" + indexedFilterExtractors := "" indexedFilterDeclarations := "" indexedFilterFields := "" indexedFilters := "" @@ -256,6 +271,7 @@ func buildEventInfo(eventsByName map[string]abi.Event) []eventInfo { paramDeclarations += fmt.Sprintf("%v %v,\n", upperParam, goType) paramExtractors += fmt.Sprintf("event.%v,\n", upperParam) if param.Indexed { + indexedFilterExtractors += fmt.Sprintf("%v.%vFilter,\n", subscriptionShortVar, param.Name) indexedFilterDeclarations += fmt.Sprintf("%vFilter []%v,\n", param.Name, goType) indexedFilterFields += fmt.Sprintf("%vFilter []%v\n", param.Name, goType) indexedFilters += fmt.Sprintf("%vFilter,\n", param.Name) @@ -265,18 +281,16 @@ func buildEventInfo(eventsByName map[string]abi.Event) []eventInfo { paramDeclarations += "blockNumber uint64,\n" paramExtractors += "event.Raw.BlockNumber,\n" - shortVar := strings.ToLower(string(shortVarRegexp.ReplaceAll( - []byte(name), - []byte("$1"), - ))) - eventInfos = append(eventInfos, eventInfo{ - uppercaseFirst(name), - lowercaseFirst(name), + capsName, + lowerName, + subscriptionCapsName, shortVar, + subscriptionShortVar, indexedFilters, paramExtractors, paramDeclarations, + indexedFilterExtractors, indexedFilterDeclarations, indexedFilterFields, }) From a6170fa4d8912216e8b8c947498cf9f931ceef56 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Fri, 15 Jan 2021 13:26:01 +0100 Subject: [PATCH 04/14] Log on debug level when subscription monitoring pulls events Simple change adding two logger.Debugf calls. I have also pulled two indentation fixes into this commit. --- tools/generators/ethereum/command.go.tmpl | 2 +- .../generators/ethereum/command_template_content.go | 2 +- tools/generators/ethereum/contract.go.tmpl | 4 ++-- tools/generators/ethereum/contract_events.go.tmpl | 13 ++++++++++++- .../ethereum/contract_events_template_content.go | 13 ++++++++++++- .../ethereum/contract_template_content.go | 4 ++-- 6 files changed, 30 insertions(+), 8 deletions(-) diff --git a/tools/generators/ethereum/command.go.tmpl b/tools/generators/ethereum/command.go.tmpl index 477b49e..5ddfefc 100644 --- a/tools/generators/ethereum/command.go.tmpl +++ b/tools/generators/ethereum/command.go.tmpl @@ -10,7 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" - "github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter" + "github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter" "github.com/keep-network/keep-common/pkg/chain/ethereum/ethutil" "github.com/keep-network/keep-common/pkg/cmd" diff --git a/tools/generators/ethereum/command_template_content.go b/tools/generators/ethereum/command_template_content.go index d6aac18..f8e14fc 100644 --- a/tools/generators/ethereum/command_template_content.go +++ b/tools/generators/ethereum/command_template_content.go @@ -13,7 +13,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" - "github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter" + "github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter" "github.com/keep-network/keep-common/pkg/chain/ethereum/ethutil" "github.com/keep-network/keep-common/pkg/cmd" diff --git a/tools/generators/ethereum/contract.go.tmpl b/tools/generators/ethereum/contract.go.tmpl index 43eacc0..c53b680 100644 --- a/tools/generators/ethereum/contract.go.tmpl +++ b/tools/generators/ethereum/contract.go.tmpl @@ -16,7 +16,7 @@ import ( "github.com/ipfs/go-log" - "github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter" + "github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter" "github.com/keep-network/keep-common/pkg/chain/ethereum/ethutil" "github.com/keep-network/keep-common/pkg/subscription" ) @@ -63,7 +63,7 @@ func New{{.Class}}( backend bind.ContractBackend, nonceManager *ethutil.NonceManager, miningWaiter *ethutil.MiningWaiter, - blockCounter *blockcounter.EthereumBlockCounter, + blockCounter *blockcounter.EthereumBlockCounter, transactionMutex *sync.Mutex, ) (*{{.Class}}, error) { callerOptions := &bind.CallOpts{ diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index 8d5f8de..013d72f 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -78,8 +78,15 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( err, ) } + fromBlock := lastBlock-{{$event.SubscriptionShortVar}}.opts.BlocksBack + + {{$logger}}.Debugf( + "Subscription monitoring fetching past {{$event.CapsName}} events " + + "starting from block [%v]", + fromBlock, + ) events, err := {{$event.SubscriptionShortVar}}.contract.Past{{$event.CapsName}}Events( - lastBlock-{{$event.SubscriptionShortVar}}.opts.BlocksBack, + fromBlock, nil, {{$event.IndexedFilterExtractors}} ) @@ -90,6 +97,10 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( ) continue } + {{$logger}}.Debugf( + "Subscription monitoring fetched [%v] past {{$event.CapsName}} events", + len(events), + ) for _, event := range events { sink <- event diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 0fe9c73..be1da94 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -81,8 +81,15 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( err, ) } + fromBlock := lastBlock-{{$event.SubscriptionShortVar}}.opts.BlocksBack + + {{$logger}}.Debugf( + "Subscription monitoring fetching past {{$event.CapsName}} events " + + "starting from block [%v]", + fromBlock, + ) events, err := {{$event.SubscriptionShortVar}}.contract.Past{{$event.CapsName}}Events( - lastBlock-{{$event.SubscriptionShortVar}}.opts.BlocksBack, + fromBlock, nil, {{$event.IndexedFilterExtractors}} ) @@ -93,6 +100,10 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( ) continue } + {{$logger}}.Debugf( + "Subscription monitoring fetched [%v] past {{$event.CapsName}} events", + len(events), + ) for _, event := range events { sink <- event diff --git a/tools/generators/ethereum/contract_template_content.go b/tools/generators/ethereum/contract_template_content.go index d4eec8b..97aa5ab 100644 --- a/tools/generators/ethereum/contract_template_content.go +++ b/tools/generators/ethereum/contract_template_content.go @@ -19,7 +19,7 @@ import ( "github.com/ipfs/go-log" - "github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter" + "github.com/keep-network/keep-common/pkg/chain/ethereum/blockcounter" "github.com/keep-network/keep-common/pkg/chain/ethereum/ethutil" "github.com/keep-network/keep-common/pkg/subscription" ) @@ -66,7 +66,7 @@ func New{{.Class}}( backend bind.ContractBackend, nonceManager *ethutil.NonceManager, miningWaiter *ethutil.MiningWaiter, - blockCounter *blockcounter.EthereumBlockCounter, + blockCounter *blockcounter.EthereumBlockCounter, transactionMutex *sync.Mutex, ) (*{{.Class}}, error) { callerOptions := &bind.CallOpts{ From e07cc06c379442ca43530b56947a20b347f0aab2 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Fri, 15 Jan 2021 14:00:27 +0100 Subject: [PATCH 05/14] Added documentation to ethutil.SubscribeOpts structure and constants --- pkg/chain/ethereum/ethutil/subscribe_opts.go | 27 ++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/pkg/chain/ethereum/ethutil/subscribe_opts.go b/pkg/chain/ethereum/ethutil/subscribe_opts.go index 6a673e5..a35a795 100644 --- a/pkg/chain/ethereum/ethutil/subscribe_opts.go +++ b/pkg/chain/ethereum/ethutil/subscribe_opts.go @@ -3,11 +3,34 @@ package ethutil import "time" const ( + // DefaultSubscribeOptsTickDuration is the default duration with which + // past events are pulled from the chain by the subscription monitoring + // mechanism if no other value is provided in SubscribeOpts when creating + // the subscription. DefaultSubscribeOptsTickDuration = 15 * time.Minute - DefaultSubscribeOptsBlocksBack = 100 + + // DefaultSubscribeOptsBlocksBack is the default number of past blocks + // pulled from the chain by the subscription monitoring mechanism if no + // other value is provided in SubscribeOpts when creating the subscription. + DefaultSubscribeOptsBlocksBack = 100 ) +// SubscribeOpts specifies optional configuration options that can be passed +// when creating Ethereum event subscription. type SubscribeOpts struct { + + // TickDuration is the duration with which subscription monitoring mechanism + // pulls events from the chain. This mechanism is an additional process + // next to a regular watchLogs subscription making sure no events are lost + // even in case the regular subscription missed them because of, for + // example, connectivity problems. TickDuration time.Duration - BlocksBack uint64 + + // BlocksBack is the number of past blocks subscription monitoring mechanism + // takes into consideration when pulling past events from the chain. + // This event pull mechanism is an additional process next to a regular + // watchLogs subscription making sure no events are lost even in case the + // regular subscription missed them because of, for example, connectivity + // problems. + BlocksBack uint64 } From 90288e13a93d1b37cf78774282c5ecb81240a750 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Fri, 22 Jan 2021 15:36:23 +0100 Subject: [PATCH 06/14] Log subscription monitoring messages on info level This change will provide additional context for event deduplicator messages logged on info level and informing about event being already handled / considered as a duplicate. --- tools/generators/ethereum/contract_events.go.tmpl | 4 ++-- tools/generators/ethereum/contract_events_template_content.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index 013d72f..58463ba 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -80,7 +80,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( } fromBlock := lastBlock-{{$event.SubscriptionShortVar}}.opts.BlocksBack - {{$logger}}.Debugf( + {{$logger}}.Infof( "Subscription monitoring fetching past {{$event.CapsName}} events " + "starting from block [%v]", fromBlock, @@ -97,7 +97,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( ) continue } - {{$logger}}.Debugf( + {{$logger}}.Infof( "Subscription monitoring fetched [%v] past {{$event.CapsName}} events", len(events), ) diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index be1da94..5fa317b 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -83,7 +83,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( } fromBlock := lastBlock-{{$event.SubscriptionShortVar}}.opts.BlocksBack - {{$logger}}.Debugf( + {{$logger}}.Infof( "Subscription monitoring fetching past {{$event.CapsName}} events " + "starting from block [%v]", fromBlock, @@ -100,7 +100,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( ) continue } - {{$logger}}.Debugf( + {{$logger}}.Infof( "Subscription monitoring fetched [%v] past {{$event.CapsName}} events", len(events), ) From a05af3111090eecc167afc6106a863543da812e2 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 13:23:09 +0100 Subject: [PATCH 07/14] Logs should start with a lowercase --- tools/generators/ethereum/contract_events.go.tmpl | 4 ++-- tools/generators/ethereum/contract_events_template_content.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index 58463ba..d7ee910 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -81,7 +81,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( fromBlock := lastBlock-{{$event.SubscriptionShortVar}}.opts.BlocksBack {{$logger}}.Infof( - "Subscription monitoring fetching past {{$event.CapsName}} events " + + "subscription monitoring fetching past {{$event.CapsName}} events " + "starting from block [%v]", fromBlock, ) @@ -98,7 +98,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( continue } {{$logger}}.Infof( - "Subscription monitoring fetched [%v] past {{$event.CapsName}} events", + "subscription monitoring fetched [%v] past {{$event.CapsName}} events", len(events), ) diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 5fa317b..018913f 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -84,7 +84,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( fromBlock := lastBlock-{{$event.SubscriptionShortVar}}.opts.BlocksBack {{$logger}}.Infof( - "Subscription monitoring fetching past {{$event.CapsName}} events " + + "subscription monitoring fetching past {{$event.CapsName}} events " + "starting from block [%v]", fromBlock, ) @@ -101,7 +101,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( continue } {{$logger}}.Infof( - "Subscription monitoring fetched [%v] past {{$event.CapsName}} events", + "subscription monitoring fetched [%v] past {{$event.CapsName}} events", len(events), ) From 8e70693c0c7366bbfe77b06870652406fd371b5e Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 13:25:26 +0100 Subject: [PATCH 08/14] s/onEventChan/eventChan Just to keep things simple. --- tools/generators/ethereum/contract_events.go.tmpl | 6 +++--- .../generators/ethereum/contract_events_template_content.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index d7ee910..5c1ca73 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -36,7 +36,7 @@ type {{$contract.FullVar}}{{$event.CapsName}}Func func( func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent( handler {{$contract.FullVar}}{{$event.CapsName}}Func, ) subscription.EventSubscription { - onEventChan := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) + eventChan := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) ctx, cancel := context.WithCancel(context.Background()) go func() { @@ -44,7 +44,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent( select { case <-ctx.Done(): return - case event := <- onEventChan: + case event := <- eventChan: handler( {{$event.ParamExtractors}} ) @@ -52,7 +52,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent( } }() - sub := {{$event.SubscriptionShortVar}}.Pipe(onEventChan) + sub := {{$event.SubscriptionShortVar}}.Pipe(eventChan) return subscription.NewEventSubscription(func() { sub.Unsubscribe() cancel() diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 018913f..919deeb 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -39,7 +39,7 @@ type {{$contract.FullVar}}{{$event.CapsName}}Func func( func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent( handler {{$contract.FullVar}}{{$event.CapsName}}Func, ) subscription.EventSubscription { - onEventChan := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) + eventChan := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) ctx, cancel := context.WithCancel(context.Background()) go func() { @@ -47,7 +47,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent( select { case <-ctx.Done(): return - case event := <- onEventChan: + case event := <- eventChan: handler( {{$event.ParamExtractors}} ) @@ -55,7 +55,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent( } }() - sub := {{$event.SubscriptionShortVar}}.Pipe(onEventChan) + sub := {{$event.SubscriptionShortVar}}.Pipe(eventChan) return subscription.NewEventSubscription(func() { sub.Unsubscribe() cancel() From a78a9de97937a782ed862be24092a4f9ba871d07 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 13:26:30 +0100 Subject: [PATCH 09/14] s/cancel/cancelCtx cancelCtx better describes what we are cancelling. --- tools/generators/ethereum/contract_events.go.tmpl | 8 ++++---- .../ethereum/contract_events_template_content.go | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index 5c1ca73..2f4defb 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -37,7 +37,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent( handler {{$contract.FullVar}}{{$event.CapsName}}Func, ) subscription.EventSubscription { eventChan := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancelCtx := context.WithCancel(context.Background()) go func() { for { @@ -55,14 +55,14 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent( sub := {{$event.SubscriptionShortVar}}.Pipe(eventChan) return subscription.NewEventSubscription(func() { sub.Unsubscribe() - cancel() + cancelCtx() }) } func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( sink chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}, ) subscription.EventSubscription { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancelCtx := context.WithCancel(context.Background()) go func() { ticker := time.NewTicker({{$event.SubscriptionShortVar}}.opts.TickDuration) for { @@ -116,7 +116,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( return subscription.NewEventSubscription(func() { sub.Unsubscribe() - cancel() + cancelCtx() }) } diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 919deeb..1f236d6 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -40,7 +40,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent( handler {{$contract.FullVar}}{{$event.CapsName}}Func, ) subscription.EventSubscription { eventChan := make(chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancelCtx := context.WithCancel(context.Background()) go func() { for { @@ -58,14 +58,14 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) OnEvent( sub := {{$event.SubscriptionShortVar}}.Pipe(eventChan) return subscription.NewEventSubscription(func() { sub.Unsubscribe() - cancel() + cancelCtx() }) } func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( sink chan *abi.{{$contract.AbiClass}}{{$event.CapsName}}, ) subscription.EventSubscription { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancelCtx := context.WithCancel(context.Background()) go func() { ticker := time.NewTicker({{$event.SubscriptionShortVar}}.opts.TickDuration) for { @@ -119,7 +119,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( return subscription.NewEventSubscription(func() { sub.Unsubscribe() - cancel() + cancelCtx() }) } From 9e52d983dc7e0284e0d2d8d0ced1b8266ffd5f49 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 13:29:37 +0100 Subject: [PATCH 10/14] Use defer call to stop ticker --- tools/generators/ethereum/contract_events.go.tmpl | 4 ++-- tools/generators/ethereum/contract_events_template_content.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index 2f4defb..57b691c 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -65,10 +65,10 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( ctx, cancelCtx := context.WithCancel(context.Background()) go func() { ticker := time.NewTicker({{$event.SubscriptionShortVar}}.opts.TickDuration) + defer ticker.Stop() for { select { - case <-ctx.Done(): - ticker.Stop() + case <-ctx.Done(): return case <-ticker.C: lastBlock, err := {{$event.SubscriptionShortVar}}.contract.blockCounter.CurrentBlock() diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 1f236d6..ea6c2e6 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -68,10 +68,10 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( ctx, cancelCtx := context.WithCancel(context.Background()) go func() { ticker := time.NewTicker({{$event.SubscriptionShortVar}}.opts.TickDuration) + defer ticker.Stop() for { select { - case <-ctx.Done(): - ticker.Stop() + case <-ctx.Done(): return case <-ticker.C: lastBlock, err := {{$event.SubscriptionShortVar}}.contract.blockCounter.CurrentBlock() From 552f2ba2234eebf836e10b5037c3b919def0ab40 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 18:01:23 +0100 Subject: [PATCH 11/14] SubscribeOpts.TickDuration -> SubscribeOpts.Tick Similarly, renamed DefaultSubscribeOptsTickDuration to DefaultSubscribeOptsTick. This is consistent with `func Tick(d Duration) <-chan Time` function from Go `time` package and does not suggest events are pulled constantly during this time period. --- pkg/chain/ethereum/ethutil/subscribe_opts.go | 8 ++++---- tools/generators/ethereum/contract_events.go.tmpl | 6 +++--- .../ethereum/contract_events_template_content.go | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/chain/ethereum/ethutil/subscribe_opts.go b/pkg/chain/ethereum/ethutil/subscribe_opts.go index a35a795..57be411 100644 --- a/pkg/chain/ethereum/ethutil/subscribe_opts.go +++ b/pkg/chain/ethereum/ethutil/subscribe_opts.go @@ -3,11 +3,11 @@ package ethutil import "time" const ( - // DefaultSubscribeOptsTickDuration is the default duration with which + // DefaultSubscribeOptsTick is the default duration with which // past events are pulled from the chain by the subscription monitoring // mechanism if no other value is provided in SubscribeOpts when creating // the subscription. - DefaultSubscribeOptsTickDuration = 15 * time.Minute + DefaultSubscribeOptsTick = 15 * time.Minute // DefaultSubscribeOptsBlocksBack is the default number of past blocks // pulled from the chain by the subscription monitoring mechanism if no @@ -19,12 +19,12 @@ const ( // when creating Ethereum event subscription. type SubscribeOpts struct { - // TickDuration is the duration with which subscription monitoring mechanism + // Tick is the duration with which subscription monitoring mechanism // pulls events from the chain. This mechanism is an additional process // next to a regular watchLogs subscription making sure no events are lost // even in case the regular subscription missed them because of, for // example, connectivity problems. - TickDuration time.Duration + Tick time.Duration // BlocksBack is the number of past blocks subscription monitoring mechanism // takes into consideration when pulling past events from the chain. diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index 7ae6f56..4778c2b 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -9,8 +9,8 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) {{$event.CapsName}}( if opts == nil { opts = new(ethutil.SubscribeOpts) } - if opts.TickDuration == 0 { - opts.TickDuration = ethutil.DefaultSubscribeOptsTickDuration + if opts.Tick == 0 { + opts.Tick = ethutil.DefaultSubscribeOptsTick } if opts.BlocksBack == 0 { opts.BlocksBack = ethutil.DefaultSubscribeOptsBlocksBack @@ -64,7 +64,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( ) subscription.EventSubscription { ctx, cancelCtx := context.WithCancel(context.Background()) go func() { - ticker := time.NewTicker({{$event.SubscriptionShortVar}}.opts.TickDuration) + ticker := time.NewTicker({{$event.SubscriptionShortVar}}.opts.Tick) defer ticker.Stop() for { select { diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index a5adc0a..918fa9d 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -12,8 +12,8 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) {{$event.CapsName}}( if opts == nil { opts = new(ethutil.SubscribeOpts) } - if opts.TickDuration == 0 { - opts.TickDuration = ethutil.DefaultSubscribeOptsTickDuration + if opts.Tick == 0 { + opts.Tick = ethutil.DefaultSubscribeOptsTick } if opts.BlocksBack == 0 { opts.BlocksBack = ethutil.DefaultSubscribeOptsBlocksBack @@ -67,7 +67,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( ) subscription.EventSubscription { ctx, cancelCtx := context.WithCancel(context.Background()) go func() { - ticker := time.NewTicker({{$event.SubscriptionShortVar}}.opts.TickDuration) + ticker := time.NewTicker({{$event.SubscriptionShortVar}}.opts.Tick) defer ticker.Stop() for { select { From 4b47a3344dd027c7a41718dfb9e217b4b28e1690 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 18:05:37 +0100 Subject: [PATCH 12/14] SubscribeOpts.BlocksBack renamed to PastBlocks Also renamed DefaultSubscribeOptsBlocksBack to DefaultSubscribeOptsPastBlocks. The new naming corresponds with web3 function getPastEvent. --- pkg/chain/ethereum/ethutil/subscribe_opts.go | 8 ++++---- tools/generators/ethereum/contract_events.go.tmpl | 6 +++--- .../ethereum/contract_events_template_content.go | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/chain/ethereum/ethutil/subscribe_opts.go b/pkg/chain/ethereum/ethutil/subscribe_opts.go index 57be411..4811f8f 100644 --- a/pkg/chain/ethereum/ethutil/subscribe_opts.go +++ b/pkg/chain/ethereum/ethutil/subscribe_opts.go @@ -9,10 +9,10 @@ const ( // the subscription. DefaultSubscribeOptsTick = 15 * time.Minute - // DefaultSubscribeOptsBlocksBack is the default number of past blocks + // DefaultSubscribeOptsPastBlocks is the default number of past blocks // pulled from the chain by the subscription monitoring mechanism if no // other value is provided in SubscribeOpts when creating the subscription. - DefaultSubscribeOptsBlocksBack = 100 + DefaultSubscribeOptsPastBlocks = 100 ) // SubscribeOpts specifies optional configuration options that can be passed @@ -26,11 +26,11 @@ type SubscribeOpts struct { // example, connectivity problems. Tick time.Duration - // BlocksBack is the number of past blocks subscription monitoring mechanism + // PastBlocks is the number of past blocks subscription monitoring mechanism // takes into consideration when pulling past events from the chain. // This event pull mechanism is an additional process next to a regular // watchLogs subscription making sure no events are lost even in case the // regular subscription missed them because of, for example, connectivity // problems. - BlocksBack uint64 + PastBlocks uint64 } diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index 4778c2b..e36071e 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -12,8 +12,8 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) {{$event.CapsName}}( if opts.Tick == 0 { opts.Tick = ethutil.DefaultSubscribeOptsTick } - if opts.BlocksBack == 0 { - opts.BlocksBack = ethutil.DefaultSubscribeOptsBlocksBack + if opts.PastBlocks == 0 { + opts.PastBlocks = ethutil.DefaultSubscribeOptsPastBlocks } return &{{$event.SubscriptionCapsName}}{ @@ -78,7 +78,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( err, ) } - fromBlock := lastBlock-{{$event.SubscriptionShortVar}}.opts.BlocksBack + fromBlock := lastBlock-{{$event.SubscriptionShortVar}}.opts.PastBlocks {{$logger}}.Infof( "subscription monitoring fetching past {{$event.CapsName}} events " + diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 918fa9d..06cbc53 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -15,8 +15,8 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) {{$event.CapsName}}( if opts.Tick == 0 { opts.Tick = ethutil.DefaultSubscribeOptsTick } - if opts.BlocksBack == 0 { - opts.BlocksBack = ethutil.DefaultSubscribeOptsBlocksBack + if opts.PastBlocks == 0 { + opts.PastBlocks = ethutil.DefaultSubscribeOptsPastBlocks } return &{{$event.SubscriptionCapsName}}{ @@ -81,7 +81,7 @@ func ({{$event.SubscriptionShortVar}} *{{$event.SubscriptionCapsName}}) Pipe( err, ) } - fromBlock := lastBlock-{{$event.SubscriptionShortVar}}.opts.BlocksBack + fromBlock := lastBlock-{{$event.SubscriptionShortVar}}.opts.PastBlocks {{$logger}}.Infof( "subscription monitoring fetching past {{$event.CapsName}} events " + From f5ebf3ce7b7f31895858cd8bc5c1ce790d6dc5e3 Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Wed, 27 Jan 2021 18:09:43 +0100 Subject: [PATCH 13/14] Fixed indentation Tabs vs spaces war. --- tools/generators/ethereum/command.go.tmpl | 2 +- tools/generators/ethereum/command_template_content.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/generators/ethereum/command.go.tmpl b/tools/generators/ethereum/command.go.tmpl index 5ddfefc..beab0e1 100644 --- a/tools/generators/ethereum/command.go.tmpl +++ b/tools/generators/ethereum/command.go.tmpl @@ -227,7 +227,7 @@ func initialize{{.Class}}(c *cli.Context) (*contract.{{.Class}}, error) { miningWaiter := ethutil.NewMiningWaiter(client, checkInterval, maxGasPrice) - blockCounter, err := blockcounter.CreateBlockCounter(client) + blockCounter, err := blockcounter.CreateBlockCounter(client) if err != nil { return nil, fmt.Errorf( "failed to create Ethereum blockcounter: [%v]", diff --git a/tools/generators/ethereum/command_template_content.go b/tools/generators/ethereum/command_template_content.go index f8e14fc..f5e09ea 100644 --- a/tools/generators/ethereum/command_template_content.go +++ b/tools/generators/ethereum/command_template_content.go @@ -230,7 +230,7 @@ func initialize{{.Class}}(c *cli.Context) (*contract.{{.Class}}, error) { miningWaiter := ethutil.NewMiningWaiter(client, checkInterval, maxGasPrice) - blockCounter, err := blockcounter.CreateBlockCounter(client) + blockCounter, err := blockcounter.CreateBlockCounter(client) if err != nil { return nil, fmt.Errorf( "failed to create Ethereum blockcounter: [%v]", From b3b56f6f88c8ec8ad27efeeca237dc015de75dbf Mon Sep 17 00:00:00 2001 From: Piotr Dyraga Date: Thu, 28 Jan 2021 11:54:31 +0100 Subject: [PATCH 14/14] Moved subscription backoff max and alert threshold to subscribe opts Instead of defining those values per-contract we defined them only one time now. --- pkg/chain/ethereum/ethutil/subscribe_opts.go | 16 ++++++++++++++++ tools/generators/ethereum/contract.go.tmpl | 15 --------------- .../generators/ethereum/contract_events.go.tmpl | 6 +++--- .../ethereum/contract_events_template_content.go | 6 +++--- .../ethereum/contract_template_content.go | 15 --------------- 5 files changed, 22 insertions(+), 36 deletions(-) diff --git a/pkg/chain/ethereum/ethutil/subscribe_opts.go b/pkg/chain/ethereum/ethutil/subscribe_opts.go index 4811f8f..d165ba8 100644 --- a/pkg/chain/ethereum/ethutil/subscribe_opts.go +++ b/pkg/chain/ethereum/ethutil/subscribe_opts.go @@ -13,6 +13,22 @@ const ( // pulled from the chain by the subscription monitoring mechanism if no // other value is provided in SubscribeOpts when creating the subscription. DefaultSubscribeOptsPastBlocks = 100 + + // SubscriptionBackoffMax is the maximum backoff time between event + // resubscription attempts. + SubscriptionBackoffMax = 2 * time.Minute + + // SubscriptionAlertThreshold is time threshold below which event + // resubscription emits an error to the logs. + // WS connection can be dropped at any moment and event resubscription will + // follow. However, if WS connection for event subscription is getting + // dropped too often, it may indicate something is wrong with Ethereum + // client. This constant defines the minimum lifetime of an event + // subscription required before the subscription failure happens and + // resubscription follows so that the resubscription does not emit an error + // to the logs alerting about potential problems with Ethereum client + // connection. + SubscriptionAlertThreshold = 15 * time.Minute ) // SubscribeOpts specifies optional configuration options that can be passed diff --git a/tools/generators/ethereum/contract.go.tmpl b/tools/generators/ethereum/contract.go.tmpl index c53b680..549db2d 100644 --- a/tools/generators/ethereum/contract.go.tmpl +++ b/tools/generators/ethereum/contract.go.tmpl @@ -26,21 +26,6 @@ import ( // included or excluded from logging at startup by name. var {{.ShortVar}}Logger = log.Logger("keep-contract-{{.Class}}") -const ( - // Maximum backoff time between event resubscription attempts. - {{.ShortVar}}SubscriptionBackoffMax = 2 * time.Minute - - // Threshold below which event resubscription emits an error to the logs. - // WS connection can be dropped at any moment and event resubscription will - // follow. However, if WS connection for event subscription is getting - // dropped too often, it may indicate something is wrong with Ethereum - // client. This constant defines the minimum lifetime of an event - // subscription required before the subscription failure happens and - // resubscription follows so that the resubscription does not emit an error - // to the logs alerting about potential problems with Ethereum client. - {{.ShortVar}}SubscriptionAlertThreshold = 15 * time.Minute -) - type {{.Class}} struct { contract *abi.{{.AbiClass}} contractAddress common.Address diff --git a/tools/generators/ethereum/contract_events.go.tmpl b/tools/generators/ethereum/contract_events.go.tmpl index e36071e..79d5574 100644 --- a/tools/generators/ethereum/contract_events.go.tmpl +++ b/tools/generators/ethereum/contract_events.go.tmpl @@ -150,10 +150,10 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) watch{{$event.CapsName}}( ) } - sub := ethutil.WithResubscription( - {{$contract.ShortVar}}SubscriptionBackoffMax, + return ethutil.WithResubscription( + ethutil.SubscriptionBackoffMax, subscribeFn, - {{$contract.ShortVar}}SubscriptionAlertThreshold, + ethutil.SubscriptionAlertThreshold, thresholdViolatedFn, subscriptionFailedFn, ) diff --git a/tools/generators/ethereum/contract_events_template_content.go b/tools/generators/ethereum/contract_events_template_content.go index 06cbc53..ec7c707 100644 --- a/tools/generators/ethereum/contract_events_template_content.go +++ b/tools/generators/ethereum/contract_events_template_content.go @@ -153,10 +153,10 @@ func ({{$contract.ShortVar}} *{{$contract.Class}}) watch{{$event.CapsName}}( ) } - sub := ethutil.WithResubscription( - {{$contract.ShortVar}}SubscriptionBackoffMax, + return ethutil.WithResubscription( + ethutil.SubscriptionBackoffMax, subscribeFn, - {{$contract.ShortVar}}SubscriptionAlertThreshold, + ethutil.SubscriptionAlertThreshold, thresholdViolatedFn, subscriptionFailedFn, ) diff --git a/tools/generators/ethereum/contract_template_content.go b/tools/generators/ethereum/contract_template_content.go index 97aa5ab..34f016d 100644 --- a/tools/generators/ethereum/contract_template_content.go +++ b/tools/generators/ethereum/contract_template_content.go @@ -29,21 +29,6 @@ import ( // included or excluded from logging at startup by name. var {{.ShortVar}}Logger = log.Logger("keep-contract-{{.Class}}") -const ( - // Maximum backoff time between event resubscription attempts. - {{.ShortVar}}SubscriptionBackoffMax = 2 * time.Minute - - // Threshold below which event resubscription emits an error to the logs. - // WS connection can be dropped at any moment and event resubscription will - // follow. However, if WS connection for event subscription is getting - // dropped too often, it may indicate something is wrong with Ethereum - // client. This constant defines the minimum lifetime of an event - // subscription required before the subscription failure happens and - // resubscription follows so that the resubscription does not emit an error - // to the logs alerting about potential problems with Ethereum client. - {{.ShortVar}}SubscriptionAlertThreshold = 15 * time.Minute -) - type {{.Class}} struct { contract *abi.{{.AbiClass}} contractAddress common.Address