Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
refactor msg channel using singleton pattern and write better unit te…
Browse files Browse the repository at this point in the history
…sting

Signed-off-by: Future Outlier <eric901201@gmai.com>
  • Loading branch information
Future Outlier committed Aug 4, 2023
1 parent 742c010 commit c05a78c
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 35 deletions.
16 changes: 14 additions & 2 deletions pkg/async/notifications/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package notifications
import (
"context"
"fmt"
"sync"
"time"

"github.com/flyteorg/flyteadmin/pkg/async"
Expand All @@ -27,6 +28,9 @@ const maxRetries = 3

var enable64decoding = false

var msgChan chan []byte
var once sync.Once

type PublisherConfig struct {
TopicName string
}
Expand All @@ -41,6 +45,13 @@ type EmailerConfig struct {
BaseURL string
}

// For sandbox only
func CreateMsgChan() {
once.Do(func() {
msgChan = make(chan []byte)
})
}

func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Emailer {
// If an external email service is specified use that instead.
// TODO: Handling of this is messy, see https://github.com/flyteorg/flyte/issues/1063
Expand Down Expand Up @@ -122,7 +133,7 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco
return implementations.NewGcpProcessor(sub, emailer, scope)
case common.Sandbox:
emailer = GetEmailer(config, scope)
return implementations.NewSandboxProcessor(emailer)
return implementations.NewSandboxProcessor(msgChan, emailer)
case common.Local:
fallthrough
default:
Expand Down Expand Up @@ -175,7 +186,8 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco
}
return implementations.NewPublisher(publisher, scope)
case common.Sandbox:
return implementations.NewSandboxPublisher()
CreateMsgChan()
return implementations.NewSandboxPublisher(msgChan)
case common.Local:
fallthrough
default:
Expand Down
10 changes: 6 additions & 4 deletions pkg/async/notifications/implementations/sandbox_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
)

type SandboxProcessor struct {
email interfaces.Emailer
email interfaces.Emailer
msgChan chan []byte
}

func (p *SandboxProcessor) StartProcessing() {
Expand All @@ -29,7 +30,7 @@ func (p *SandboxProcessor) run() error {

for {
select {
case msg := <-msgChan:
case msg := <-p.msgChan:
err := proto.Unmarshal(msg, &emailMessage)
if err != nil {
logger.Errorf(context.Background(), "error with unmarshalling message [%v]", err)
Expand All @@ -53,8 +54,9 @@ func (p *SandboxProcessor) StopProcessing() error {
return nil

Check warning on line 54 in pkg/async/notifications/implementations/sandbox_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/implementations/sandbox_processor.go#L52-L54

Added lines #L52 - L54 were not covered by tests
}

func NewSandboxProcessor(emailer interfaces.Emailer) interfaces.Processor {
func NewSandboxProcessor(msgChan chan []byte, emailer interfaces.Emailer) interfaces.Processor {
return &SandboxProcessor{
email: emailer,
msgChan: msgChan,
email: emailer,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func TestSandboxProcessor_UnmarshalMessage(t *testing.T) {
}

func TestSandboxProcessor_StartProcessing(t *testing.T) {

testSandboxProcessor := NewSandboxProcessor(&mockSandboxEmailer)
msgChan := make(chan []byte, 1)
testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer)

sendEmailValidationFunc := func(ctx context.Context, email admin.EmailMessage) error {
assert.Equal(t, testEmail.Body, email.Body)
Expand Down
14 changes: 8 additions & 6 deletions pkg/async/notifications/implementations/sandbox_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"github.com/golang/protobuf/proto"
)

type SandboxPublisher struct{}

var msgChan = make(chan []byte)
type SandboxPublisher struct {
msgChan chan []byte
}

func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error {
logger.Debugf(ctx, "Publishing the following message [%s]", msg.String())
Expand All @@ -21,11 +21,13 @@ func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string,
return err
}

Check warning on line 22 in pkg/async/notifications/implementations/sandbox_publisher.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/implementations/sandbox_publisher.go#L20-L22

Added lines #L20 - L22 were not covered by tests

msgChan <- data
p.msgChan <- data

return nil
}

func NewSandboxPublisher() *SandboxPublisher {
return &SandboxPublisher{}
func NewSandboxPublisher(msgChan chan []byte) *SandboxPublisher {
return &SandboxPublisher{
msgChan: msgChan,
}
}
24 changes: 3 additions & 21 deletions pkg/async/notifications/implementations/sandbox_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,16 @@ package implementations
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestSandboxPublisher_Publish(t *testing.T) {
publisher := NewSandboxPublisher()

errChan := make(chan string)

go func() {
select {
case <-msgChan:
// if message received, no need to send an error
case <-time.After(time.Second * 5):
errChan <- "No data was received in the channel within the expected time frame"
}
}()
msgChan := make(chan []byte, 1)
publisher := NewSandboxPublisher(msgChan)

err := publisher.Publish(context.Background(), "NOTIFICATION_TYPE", &testEmail)

// Check if there was an error in the goroutine
select {
case errMsg := <-errChan:
t.Fatal(errMsg)
default:
// no error from the goroutine
}

assert.NotZero(t, len(msgChan))
assert.Nil(t, err)
}

0 comments on commit c05a78c

Please sign in to comment.