Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Implement Sandbox notifications processor and publisher #595

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8061ef2
1st version of email publisher
Jul 23, 2023
af73e9a
fix input
Jul 23, 2023
870f382
email feature
Jul 25, 2023
89ccfd2
Update boilerplate version (#589)
flyte-bot Jul 14, 2023
66fb45d
Update boilerplate version (#594)
flyte-bot Jul 26, 2023
b8706bb
sandbox email publisher and processor with test
Jul 27, 2023
c4a0fcf
delete unnecessary comments
Jul 27, 2023
d821aaf
Merge branch 'flyteorg:master' into first-version-email-publihser
Future-Outlier Jul 27, 2023
dc16bb2
update go.sum
Jul 28, 2023
aad4081
Merge branch 'first-version-email-publihser' of https://github.com/Fu…
Jul 28, 2023
8e4c50a
add common.Sandbox type
Jul 29, 2023
e5c12bf
remove emailer setting in factory.go and add mock emailer test in sa…
Jul 30, 2023
be74e6e
update file go.mod and go.sum
Jul 30, 2023
45b050e
Add test for NewNotificationsPublisher and NewNotificationsProcessor …
Aug 1, 2023
742c010
complete test for StartProcessing, Publish and StopProcessing in Test…
Aug 1, 2023
c05a78c
refactor msg channel using singleton pattern and write better unit te…
Aug 4, 2023
44e5e86
refactor the publisher and processor both of their msg channel and re…
Aug 5, 2023
c2a9301
add email error for sandbox processor test, trying to imporove the co…
Aug 9, 2023
22d84e7
improve test code coverage by adding test function in sandbox process…
Aug 9, 2023
85bbe17
make sandbox_processor and sandbox_publisher to 100% code coverage
Aug 9, 2023
19aef7f
Merge branch 'flyteorg:master' into first-version-email-publihser
Future-Outlier Aug 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/async/notifications/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco
}
emailer = GetEmailer(config, scope)
return implementations.NewGcpProcessor(sub, emailer, scope)
case common.Sandbox:
emailer = GetEmailer(config, scope)
return implementations.NewSandboxProcessor(emailer)
case common.Local:
fallthrough
default:
Expand Down Expand Up @@ -171,6 +174,8 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco
panic(err)
}
return implementations.NewPublisher(publisher, scope)
case common.Sandbox:
return implementations.NewSandboxPublisher()
case common.Local:
fallthrough
default:
Expand Down
34 changes: 34 additions & 0 deletions pkg/async/notifications/factory_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
package notifications

import (
"context"
"testing"

"github.com/flyteorg/flyteadmin/pkg/async/notifications/implementations"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
)

var (
scope = promutils.NewScope("test_sandbox_processor")
notificationsConfig = runtimeInterfaces.NotificationsConfig{
Type: "sandbox",
}
testEmail = admin.EmailMessage{
RecipientsEmail: []string{
"a@example.com",
"b@example.com",
},
SenderEmail: "no-reply@example.com",
SubjectLine: "Test email",
Body: "This is a sample email.",
}
)

func TestGetEmailer(t *testing.T) {
defer func() { r := recover(); assert.NotNil(t, r) }()
cfg := runtimeInterfaces.NotificationsConfig{
Expand All @@ -23,3 +42,18 @@ func TestGetEmailer(t *testing.T) {
// shouldn't reach here
t.Errorf("did not panic")
}

func TestNewNotificationPublisherAndProcessor(t *testing.T) {
testSandboxPublisher := NewNotificationsPublisher(notificationsConfig, scope)
assert.IsType(t, testSandboxPublisher, &implementations.SandboxPublisher{})
testSandboxProcessor := NewNotificationsProcessor(notificationsConfig, scope)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
assert.IsType(t, testSandboxProcessor, &implementations.SandboxProcessor{})

go func() {
testSandboxProcessor.StartProcessing()
}()

assert.Nil(t, testSandboxPublisher.Publish(context.Background(), "TEST_NOTIFICATION", &testEmail))

assert.Nil(t, testSandboxProcessor.StopProcessing())
}
60 changes: 60 additions & 0 deletions pkg/async/notifications/implementations/sandbox_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package implementations

import (
"context"
"time"

"github.com/flyteorg/flyteadmin/pkg/async"
"github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
"github.com/golang/protobuf/proto"
)

type SandboxProcessor struct {
email interfaces.Emailer
}

func (p *SandboxProcessor) StartProcessing() {
for {
logger.Warningf(context.Background(), "Starting SandBox notifications processor")
err := p.run()
logger.Errorf(context.Background(), "error with running processor err: [%v] ", err)
time.Sleep(async.RetryDelay)
}
}

func (p *SandboxProcessor) run() error {
var emailMessage admin.EmailMessage

for {
select {
case msg := <-msgChan:
err := proto.Unmarshal(msg, &emailMessage)
if err != nil {
logger.Errorf(context.Background(), "error with unmarshalling message [%v]", err)
return err
}

err = p.email.SendEmail(context.Background(), emailMessage)
if err != nil {
logger.Errorf(context.Background(), "error with sendemail message [%v] ", err)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
return err
}
default:
logger.Debugf(context.Background(), "no message to process")
return nil
}
}
}

func (p *SandboxProcessor) StopProcessing() error {
logger.Debug(context.Background(), "call to sandbox stop processing.")
return nil
}

func NewSandboxProcessor(emailer interfaces.Emailer) interfaces.Processor {
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
return &SandboxProcessor{
email: emailer,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package implementations

import (
"context"
"testing"

"github.com/flyteorg/flyteadmin/pkg/async/notifications/mocks"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
)

var mockSandboxEmailer mocks.MockEmailer

func TestSandboxProcessor_UnmarshalMessage(t *testing.T) {
var emailMessage admin.EmailMessage
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
err := proto.Unmarshal(msg, &emailMessage)
assert.Nil(t, err)

assert.Equal(t, emailMessage.Body, testEmail.Body)
assert.Equal(t, emailMessage.RecipientsEmail, testEmail.RecipientsEmail)
assert.Equal(t, emailMessage.SubjectLine, testEmail.SubjectLine)
assert.Equal(t, emailMessage.SenderEmail, testEmail.SenderEmail)
}

func TestSandboxProcessor_StartProcessing(t *testing.T) {

testSandboxProcessor := NewSandboxProcessor(&mockSandboxEmailer)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

sendEmailValidationFunc := func(ctx context.Context, email admin.EmailMessage) error {
assert.Equal(t, testEmail.Body, email.Body)
assert.Equal(t, testEmail.RecipientsEmail, email.RecipientsEmail)
assert.Equal(t, testEmail.SubjectLine, email.SubjectLine)
assert.Equal(t, testEmail.SenderEmail, email.SenderEmail)
return nil
}
mockSandboxEmailer.SetSendEmailFunc(sendEmailValidationFunc)
assert.Nil(t, testSandboxProcessor.(*SandboxProcessor).run())
}
31 changes: 31 additions & 0 deletions pkg/async/notifications/implementations/sandbox_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package implementations

import (
"context"

"github.com/flyteorg/flytestdlib/logger"
"github.com/golang/protobuf/proto"
)

type SandboxPublisher struct{}

var msgChan = make(chan []byte)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error {
logger.Debugf(ctx, "Publishing the following message [%s]", msg.String())

data, err := proto.Marshal(msg)

if err != nil {
logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err)
return err
}

msgChan <- data

return nil
}

func NewSandboxPublisher() *SandboxPublisher {
return &SandboxPublisher{}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package implementations

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestSandboxPublisher_Publish(t *testing.T) {
publisher := NewSandboxPublisher()
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

errChan := make(chan string)

go func() {
select {
case <-msgChan:
// if message received, no need to send an error
case <-time.After(time.Second * 5):
errChan <- "No data was received in the channel within the expected time frame"
}
}()

err := publisher.Publish(context.Background(), "NOTIFICATION_TYPE", &testEmail)

// Check if there was an error in the goroutine
select {
case errMsg := <-errChan:
t.Fatal(errMsg)
default:
// no error from the goroutine
}

assert.Nil(t, err)
}
9 changes: 5 additions & 4 deletions pkg/common/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ package common
type CloudProvider = string

const (
AWS CloudProvider = "aws"
GCP CloudProvider = "gcp"
Local CloudProvider = "local"
None CloudProvider = "none"
AWS CloudProvider = "aws"
GCP CloudProvider = "gcp"
Sandbox CloudProvider = "sandbox"
Local CloudProvider = "local"
None CloudProvider = "none"
)
Loading