Skip to content

Commit

Permalink
Handle new type of messages from fly and event-watcher in parser (#648)
Browse files Browse the repository at this point in the history
  • Loading branch information
ftocal committed Oct 6, 2023
1 parent d83b014 commit d5fe356
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 36 deletions.
30 changes: 24 additions & 6 deletions common/domain/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,31 @@ import (
"time"
)

const (
SignedVaaType = "signed-vaa"
PublishedLogMessageType = "published-log-message"
)

type NotificationEvent struct {
TrackID string `json:"trackId"`
Source string `json:"source"`
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}

func NewNotificationEvent[T EventPayload](trackID, source, _type string, payload T) (*NotificationEvent, error) {
p, err := json.Marshal(payload)
if err != nil {
return nil, err
}
return &NotificationEvent{
TrackID: trackID,
Source: source,
Type: _type,
Payload: json.RawMessage(p),
}, nil
}

type EventPayload interface {
SignedVaa | PublishedLogMessage
}
Expand All @@ -24,22 +42,22 @@ func GetEventPayload[T EventPayload](e *NotificationEvent) (T, error) {

type SignedVaa struct {
ID string `json:"id"`
EmitterChain int `json:"emitterChain"`
EmitterChain uint16 `json:"emitterChain"`
EmitterAddr string `json:"emitterAddr"`
Sequence string `json:"sequence"`
GuardianSetIndex int `json:"guardianSetIndex"`
Sequence uint64 `json:"sequence"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Timestamp time.Time `json:"timestamp"`
Vaa string `json:"vaa"`
Vaa []byte `json:"vaa"`
TxHash string `json:"txHash"`
Version int `json:"version"`
}

type PublishedLogMessage struct {
ID string `json:"id"`
EmitterChain int `json:"emitterChain"`
EmitterChain uint16 `json:"emitterChain"`
EmitterAddr string `json:"emitterAddr"`
Sequence string `json:"sequence"`
Timestamp time.Time `json:"timestamp"`
Vaa string `json:"vaa"`
Vaa []byte `json:"vaa"`
TxHash string `json:"txHash"`
}
57 changes: 57 additions & 0 deletions common/domain/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package domain

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
)

// TestGetEventPayload contains a test harness for the `GetEventPayload` function.
func Test_GetEventPayload(t *testing.T) {

body := `{
"trackId": "63e16082da939a263512a307",
"source": "fly",
"type": "signed-vaa",
"payload": {
"id": "2/000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d7/162727",
"emitterChain": 2,
"emitterAddr": "000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d7",
"sequence": 162727,
"guardianSetIndex": 0,
"timestamp": "2023-08-04T11:43:48.000Z",
"vaa": "010000000001005defe63f46c192b506758684fada6b97f5a8ee287a82efefa35c59dcf369a83b1abfe5431ad51a31051bf42851b5f699421e525745db03e8bc43a6b36dde6fc00064cd0ea4446900000002000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d70000000000027ba7010300000000000000000000000000000000000000000000000000000000004c4b40000000000000000000000000b4fbf271143f4fbf7b91a5ded31805e42b2208d600026d9ae6b2d333c1d65301a59da3eed388ca5dc60cb12496584b75cbe6b15fdbed002000000000000000000000000072b916142650cb48bbbed0acaeb5b287d1c55d917b2262617369635f726563697069656e74223a7b22726563697069656e74223a22633256704d58426f4e445631626a646a4e6a426c6448566d6432317964575272617a4a3061336877647a4e6f595859794e6d4e6d5a6a5933227d7d",
"txHash" : "406065c15b62426c51f987f5923fb376f6b60cb1c15724cc5460a08d18ccc337",
"version" : 1
}
}`

event := NotificationEvent{}
err := json.Unmarshal([]byte(body), &event)
assert.NoError(t, err)
assert.Equal(t, "63e16082da939a263512a307", event.TrackID)
assert.Equal(t, "fly", event.Source)
assert.Equal(t, SignedVaaType, event.Type)
signedVaa, err := GetEventPayload[SignedVaa](&event)
assert.NoError(t, err)
assert.Equal(t, "2/000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d7/162727", signedVaa.ID)
}

func Test_GetEventPayload_Error(t *testing.T) {

body := `{
"trackId": "63e16082da939a263512a307",
"source": "fly",
"type": "signed-vaa"
}`

event := NotificationEvent{}
err := json.Unmarshal([]byte(body), &event)
assert.NoError(t, err)
assert.Equal(t, "63e16082da939a263512a307", event.TrackID)
assert.Equal(t, "fly", event.Source)
assert.Equal(t, SignedVaaType, event.Type)
_, err = GetEventPayload[SignedVaa](&event)
assert.Error(t, err)
}
4 changes: 2 additions & 2 deletions parser/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ func (c *Consumer) Start(ctx context.Context) {

// check id message is expired.
if msg.IsExpired() {
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
c.logger.Warn("Notification event expired", zap.String("id", event.ID))
msg.Failed()
continue
}
c.metrics.IncVaaUnexpired(event.ChainID)

_, err := c.process(ctx, event.Vaa)
if err != nil {
c.logger.Error("Error processing parsed vaa",
c.logger.Error("Error processing notification event",
zap.String("id", event.ID),
zap.Error(err))
msg.Failed()
Expand Down
6 changes: 3 additions & 3 deletions parser/queue/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package queue
import "github.com/wormhole-foundation/wormhole/sdk/vaa"

// PythFilter filter vaa event from pyth chain.
func PythFilter(vaaEvent *VaaEvent) bool {
return vaaEvent.ChainID == uint16(vaa.ChainIDPythNet)
func PythFilter(event *Event) bool {
return event.ChainID == uint16(vaa.ChainIDPythNet)
}

// NonFilter non filter vaa evant.
func NonFilter(vaaEvent *VaaEvent) bool {
func NonFilter(event *Event) bool {
return false
}
25 changes: 10 additions & 15 deletions parser/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,20 @@ type sqsEvent struct {
Message string `json:"Message"`
}

// VaaEvent represents a vaa data to be handle by the pipeline.
type VaaEvent struct {
ID string `json:"id"`
ChainID uint16 `json:"emitterChain"`
EmitterAddress string `json:"emitterAddr"`
Sequence string `json:"sequence"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Vaa []byte `json:"vaas"`
IndexedAt time.Time `json:"indexedAt"`
Timestamp *time.Time `json:"timestamp"`
UpdatedAt *time.Time `json:"updatedAt"`
TxHash string `json:"txHash"`
Version uint16 `json:"version"`
Revision uint16 `json:"revision"`
// Event represents a vaa data to be handle by the pipeline.
type Event struct {
ID string
ChainID uint16
EmitterAddress string
Sequence string
Vaa []byte
Timestamp *time.Time
TxHash string
}

// ConsumerMessage defition.
type ConsumerMessage interface {
Data() *VaaEvent
Data() *Event
Done()
Failed()
IsExpired() bool
Expand Down
69 changes: 59 additions & 10 deletions parser/queue/vaa_sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package queue
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs"
"go.uber.org/zap"
Expand All @@ -26,7 +28,7 @@ type SQS struct {
}

// FilterConsumeFunc filter vaaa func definition.
type FilterConsumeFunc func(vaaEvent *VaaEvent) bool
type FilterConsumeFunc func(vaaEvent *Event) bool

// NewVAASQS creates a VAA queue in SQS instances.
func NewVAASQS(consumer *sqs.Consumer, filterConsume FilterConsumeFunc, metrics metrics.Metrics, logger *zap.Logger, opts ...SQSOption) *SQS {
Expand Down Expand Up @@ -70,28 +72,34 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
continue
}

// unmarshal message to vaaEvent
var vaaEvent VaaEvent
err = json.Unmarshal([]byte(sqsEvent.Message), &vaaEvent)
// unmarshal message to NotificationEvent
var notification domain.NotificationEvent
err = json.Unmarshal([]byte(sqsEvent.Message), &notification)
if err != nil {
q.logger.Error("Error decoding vaaEvent message from SQSEvent", zap.Error(err))
continue
}
q.metrics.IncVaaConsumedQueue(vaaEvent.ChainID)

event := q.createEvent(&notification)
if event == nil {
continue
}

q.metrics.IncVaaConsumedQueue(event.ChainID)

// filter vaaEvent by p2p net.
if q.filterConsume(&vaaEvent) {
if q.filterConsume(event) {
if err := q.consumer.DeleteMessage(ctx, msg.ReceiptHandle); err != nil {
q.logger.Error("Error deleting message from SQS", zap.Error(err))
}
continue
}
q.metrics.IncVaaUnfiltered(vaaEvent.ChainID)
q.metrics.IncVaaUnfiltered(event.ChainID)

q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
id: msg.ReceiptHandle,
data: &vaaEvent,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
Expand All @@ -111,8 +119,49 @@ func (q *SQS) Close() {
close(q.ch)
}

func (q *SQS) createEvent(notification *domain.NotificationEvent) *Event {
if notification.Type != domain.SignedVaaType && notification.Type != domain.PublishedLogMessageType {
q.logger.Debug("Skip event type", zap.String("trackId", notification.TrackID), zap.String("type", notification.Type))
return nil
}

switch notification.Type {
case domain.SignedVaaType:
signedVaa, err := domain.GetEventPayload[domain.SignedVaa](notification)
if err != nil {
q.logger.Error("Error decoding signedVAA from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))
return nil
}
return &Event{
ID: signedVaa.ID,
ChainID: signedVaa.EmitterChain,
EmitterAddress: signedVaa.EmitterAddr,
Sequence: fmt.Sprintf("%d", signedVaa.Sequence),
Vaa: signedVaa.Vaa,
Timestamp: &signedVaa.Timestamp,
TxHash: signedVaa.TxHash,
}
case domain.PublishedLogMessageType:
plm, err := domain.GetEventPayload[domain.PublishedLogMessage](notification)
if err != nil {
q.logger.Error("Error decoding publishedLogMessage from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))
return nil
}
return &Event{
ID: plm.ID,
ChainID: plm.EmitterChain,
EmitterAddress: plm.EmitterAddr,
Sequence: plm.Sequence,
Vaa: plm.Vaa,
Timestamp: &plm.Timestamp,
TxHash: plm.TxHash,
}
}
return nil
}

type sqsConsumerMessage struct {
data *VaaEvent
data *Event
consumer *sqs.Consumer
wg *sync.WaitGroup
id *string
Expand All @@ -121,7 +170,7 @@ type sqsConsumerMessage struct {
ctx context.Context
}

func (m *sqsConsumerMessage) Data() *VaaEvent {
func (m *sqsConsumerMessage) Data() *Event {
return m.data
}

Expand Down

0 comments on commit d5fe356

Please sign in to comment.