Skip to content

Commit

Permalink
636 plug analytics to the wormholescan events queue (#650)
Browse files Browse the repository at this point in the history
* update swagger doc to replace wormscan with wormholescan (#643)

* Add notification event from fly to analytics component

* fix parser published log message event

---------

Co-authored-by: gipsh <hernan.gips@xlabs.xyz>
  • Loading branch information
2 people authored and ftocal committed Oct 5, 2023
1 parent 4807e3e commit f6e5746
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 24 deletions.
25 changes: 10 additions & 15 deletions analytics/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,20 @@ type sqsEvent struct {
Message string `json:"Message"`
}

// VaaEvent represents a vaa data to be handle by the pipeline.
type VaaEvent struct {
ID string `json:"id"`
ChainID uint16 `json:"emitterChain"`
EmitterAddress string `json:"emitterAddr"`
Sequence string `json:"sequence"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Vaa []byte `json:"vaas"`
IndexedAt time.Time `json:"indexedAt"`
Timestamp *time.Time `json:"timestamp"`
UpdatedAt *time.Time `json:"updatedAt"`
TxHash string `json:"txHash"`
Version uint16 `json:"version"`
Revision uint16 `json:"revision"`
// Event represents a vaa event to be handle by the pipeline.
type Event struct {
ID string `json:"id"`
ChainID uint16 `json:"emitterChain"`
EmitterAddress string `json:"emitterAddr"`
Sequence uint64 `json:"sequence"`
Vaa []byte `json:"vaas"`
Timestamp *time.Time `json:"timestamp"`
TxHash string `json:"txHash"`
}

// ConsumerMessage defition.
type ConsumerMessage interface {
Data() *VaaEvent
Data() *Event
Done()
Failed()
IsExpired() bool
Expand Down
53 changes: 46 additions & 7 deletions analytics/queue/vaa_sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.uber.org/zap"

sqs_client "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
)

// SQSOption represents a VAA queue in SQS option function.
Expand Down Expand Up @@ -62,18 +63,25 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
continue
}

// unmarshal message to vaaEvent
var vaaEvent VaaEvent
err = json.Unmarshal([]byte(sqsEvent.Message), &vaaEvent)
// unmarshal sqsEvent message to NotificationEvent
var notificationEvent domain.NotificationEvent
err = json.Unmarshal([]byte(sqsEvent.Message), &notificationEvent)
if err != nil {
q.logger.Error("Error decoding vaaEvent message from SQSEvent", zap.Error(err))
q.logger.Error("Error decoding notificationEvent message from SQSEvent", zap.Error(err))
continue
}

// create event
event := q.createEvent(&notificationEvent)
if event == nil {
q.logger.Error("Error creating event from NotificationEvent")
continue
}

q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
id: msg.ReceiptHandle,
data: &vaaEvent,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
Expand All @@ -93,8 +101,39 @@ func (q *SQS) Close() {
close(q.ch)
}

// createEvent creates an event from a notificationEvent.
func (q *SQS) createEvent(notification *domain.NotificationEvent) *Event {
if notification == nil {
q.logger.Debug("notificationEvent is nil")
return nil
}
if notification.Type != domain.SignedVaaType {
q.logger.Debug("notificationEvent type is not SignedVaaType",
zap.String("trackId", notification.TrackID),
zap.String("type", notification.Type))
return nil
}
signedVaa, err := domain.GetEventPayload[domain.SignedVaa](notification)
if err != nil {
q.logger.Error("Error getting SignedVaa from notificationEvent",
zap.Error(err), zap.String("trackId", notification.TrackID),
zap.String("type", notification.Type))
return nil
}

return &Event{
ID: signedVaa.ID,
ChainID: uint16(signedVaa.EmitterChain),
EmitterAddress: signedVaa.EmitterAddr,
Sequence: signedVaa.Sequence,
Vaa: []byte(signedVaa.Vaa),
Timestamp: &signedVaa.Timestamp,
TxHash: signedVaa.TxHash,
}
}

type sqsConsumerMessage struct {
data *VaaEvent
data *Event
consumer *sqs_client.Consumer
wg *sync.WaitGroup
id *string
Expand All @@ -103,7 +142,7 @@ type sqsConsumerMessage struct {
ctx context.Context
}

func (m *sqsConsumerMessage) Data() *VaaEvent {
func (m *sqsConsumerMessage) Data() *Event {
return m.data
}

Expand Down
2 changes: 1 addition & 1 deletion common/domain/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type PublishedLogMessage struct {
ID string `json:"id"`
EmitterChain uint16 `json:"emitterChain"`
EmitterAddr string `json:"emitterAddr"`
Sequence string `json:"sequence"`
Sequence uint64 `json:"sequence"`
Timestamp time.Time `json:"timestamp"`
Vaa []byte `json:"vaa"`
TxHash string `json:"txHash"`
Expand Down
4 changes: 3 additions & 1 deletion parser/queue/vaa_sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -147,11 +148,12 @@ func (q *SQS) createEvent(notification *domain.NotificationEvent) *Event {
q.logger.Error("Error decoding publishedLogMessage from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))
return nil
}

return &Event{
ID: plm.ID,
ChainID: plm.EmitterChain,
EmitterAddress: plm.EmitterAddr,
Sequence: plm.Sequence,
Sequence: strconv.FormatUint(plm.Sequence, 10),
Vaa: plm.Vaa,
Timestamp: &plm.Timestamp,
TxHash: plm.TxHash,
Expand Down

0 comments on commit f6e5746

Please sign in to comment.