Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

636 plug analytics to the wormholescan events queue #649

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 10 additions & 15 deletions analytics/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,20 @@ type sqsEvent struct {
Message string `json:"Message"`
}

// VaaEvent represents a vaa data to be handle by the pipeline.
type VaaEvent struct {
ID string `json:"id"`
ChainID uint16 `json:"emitterChain"`
EmitterAddress string `json:"emitterAddr"`
Sequence string `json:"sequence"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Vaa []byte `json:"vaas"`
IndexedAt time.Time `json:"indexedAt"`
Timestamp *time.Time `json:"timestamp"`
UpdatedAt *time.Time `json:"updatedAt"`
TxHash string `json:"txHash"`
Version uint16 `json:"version"`
Revision uint16 `json:"revision"`
// Event represents a vaa event to be handle by the pipeline.
type Event struct {
ID string `json:"id"`
ChainID uint16 `json:"emitterChain"`
EmitterAddress string `json:"emitterAddr"`
Sequence uint64 `json:"sequence"`
Vaa []byte `json:"vaas"`
Timestamp *time.Time `json:"timestamp"`
TxHash string `json:"txHash"`
}

// ConsumerMessage defition.
type ConsumerMessage interface {
Data() *VaaEvent
Data() *Event
Done()
Failed()
IsExpired() bool
Expand Down
53 changes: 46 additions & 7 deletions analytics/queue/vaa_sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.uber.org/zap"

sqs_client "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
)

// SQSOption represents a VAA queue in SQS option function.
Expand Down Expand Up @@ -62,18 +63,25 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
continue
}

// unmarshal message to vaaEvent
var vaaEvent VaaEvent
err = json.Unmarshal([]byte(sqsEvent.Message), &vaaEvent)
// unmarshal sqsEvent message to NotificationEvent
var notificationEvent domain.NotificationEvent
err = json.Unmarshal([]byte(sqsEvent.Message), &notificationEvent)
if err != nil {
q.logger.Error("Error decoding vaaEvent message from SQSEvent", zap.Error(err))
q.logger.Error("Error decoding notificationEvent message from SQSEvent", zap.Error(err))
continue
}

// create event
event := q.createEvent(&notificationEvent)
if event == nil {
q.logger.Error("Error creating event from NotificationEvent")
continue
}

q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
id: msg.ReceiptHandle,
data: &vaaEvent,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
Expand All @@ -93,8 +101,39 @@ func (q *SQS) Close() {
close(q.ch)
}

// createEvent creates an event from a notificationEvent.
func (q *SQS) createEvent(notification *domain.NotificationEvent) *Event {
if notification == nil {
q.logger.Debug("notificationEvent is nil")
return nil
}
if notification.Type != domain.SignedVaaType {
q.logger.Debug("notificationEvent type is not SignedVaaType",
zap.String("trackId", notification.TrackID),
zap.String("type", notification.Type))
return nil
}
signedVaa, err := domain.GetEventPayload[domain.SignedVaa](notification)
if err != nil {
q.logger.Error("Error getting SignedVaa from notificationEvent",
zap.Error(err), zap.String("trackId", notification.TrackID),
zap.String("type", notification.Type))
return nil
}

return &Event{
ID: signedVaa.ID,
ChainID: uint16(signedVaa.EmitterChain),
EmitterAddress: signedVaa.EmitterAddr,
Sequence: signedVaa.Sequence,
Vaa: []byte(signedVaa.Vaa),
Timestamp: &signedVaa.Timestamp,
TxHash: signedVaa.TxHash,
}
}

type sqsConsumerMessage struct {
data *VaaEvent
data *Event
consumer *sqs_client.Consumer
wg *sync.WaitGroup
id *string
Expand All @@ -103,7 +142,7 @@ type sqsConsumerMessage struct {
ctx context.Context
}

func (m *sqsConsumerMessage) Data() *VaaEvent {
func (m *sqsConsumerMessage) Data() *Event {
return m.data
}

Expand Down
Loading
Loading