Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
refactor the publisher and processor both of their msg channel and re…
Browse files Browse the repository at this point in the history
…name both of them

Signed-off-by: Future Outlier <eric901201@gmai.com>
  • Loading branch information
Future Outlier committed Aug 5, 2023
1 parent c05a78c commit 44e5e86
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 21 deletions.
10 changes: 5 additions & 5 deletions pkg/async/notifications/implementations/sandbox_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

type SandboxProcessor struct {
email interfaces.Emailer
msgChan chan []byte
subChan <-chan []byte
}

func (p *SandboxProcessor) StartProcessing() {
Expand All @@ -30,7 +30,7 @@ func (p *SandboxProcessor) run() error {

for {
select {
case msg := <-p.msgChan:
case msg := <-p.subChan:
err := proto.Unmarshal(msg, &emailMessage)
if err != nil {
logger.Errorf(context.Background(), "error with unmarshalling message [%v]", err)
Expand All @@ -39,7 +39,7 @@ func (p *SandboxProcessor) run() error {

err = p.email.SendEmail(context.Background(), emailMessage)
if err != nil {
logger.Errorf(context.Background(), "error with sendemail message [%v] ", err)
logger.Errorf(context.Background(), "Error sending an email message for message [%s] with emailM with err: %v", emailMessage.String(), err)
return err
}
default:
Expand All @@ -54,9 +54,9 @@ func (p *SandboxProcessor) StopProcessing() error {
return nil
}

func NewSandboxProcessor(msgChan chan []byte, emailer interfaces.Emailer) interfaces.Processor {
func NewSandboxProcessor(subChan <-chan []byte, emailer interfaces.Emailer) interfaces.Processor {
return &SandboxProcessor{
msgChan: msgChan,
subChan: subChan,
email: emailer,
}
}
12 changes: 0 additions & 12 deletions pkg/async/notifications/implementations/sandbox_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,11 @@ import (

"github.com/flyteorg/flyteadmin/pkg/async/notifications/mocks"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
)

var mockSandboxEmailer mocks.MockEmailer

func TestSandboxProcessor_UnmarshalMessage(t *testing.T) {
var emailMessage admin.EmailMessage
err := proto.Unmarshal(msg, &emailMessage)
assert.Nil(t, err)

assert.Equal(t, emailMessage.Body, testEmail.Body)
assert.Equal(t, emailMessage.RecipientsEmail, testEmail.RecipientsEmail)
assert.Equal(t, emailMessage.SubjectLine, testEmail.SubjectLine)
assert.Equal(t, emailMessage.SenderEmail, testEmail.SenderEmail)
}

func TestSandboxProcessor_StartProcessing(t *testing.T) {
msgChan := make(chan []byte, 1)
testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer)
Expand Down
8 changes: 4 additions & 4 deletions pkg/async/notifications/implementations/sandbox_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type SandboxPublisher struct {
msgChan chan []byte
pubChan chan<- []byte
}

func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error {
Expand All @@ -21,13 +21,13 @@ func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string,
return err
}

p.msgChan <- data
p.pubChan <- data

return nil
}

func NewSandboxPublisher(msgChan chan []byte) *SandboxPublisher {
func NewSandboxPublisher(pubChan chan<- []byte) *SandboxPublisher {
return &SandboxPublisher{
msgChan: msgChan,
pubChan: pubChan,
}
}

0 comments on commit 44e5e86

Please sign in to comment.