Skip to content

Commit

Permalink
Feat/add message queue (#185)
Browse files Browse the repository at this point in the history
* feat: add dependency

* feat: add interface and mock

* feat: add mq service

* feat: set up mq

* feat: add example env

* fix: test issue

---------

Co-authored-by: Eyo Chen <eyo.chen@amazingtalker.com>
  • Loading branch information
eyo-chen and Eyo Chen authored Dec 21, 2024
1 parent 927e71d commit 0a3258e
Show file tree
Hide file tree
Showing 11 changed files with 339 additions and 3 deletions.
4 changes: 3 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ REDIS_URL=redis://redis:6379
AWS_REGION=aws_region
AWS_KEY=aws_key
AWS_SECRET=aws_secret
AWS_BUCKET=aws_bucket
AWS_BUCKET=aws_bucket
MESSAGE_QUEUE_URL=amqp://guest:guest@rabbitmq:5672/
MESSAGE_QUEUE_NAME=expense-tracker-queue
10 changes: 9 additions & 1 deletion cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

adapter "github.com/eyo-chen/expense-tracker-go/internal/adapter"
"github.com/eyo-chen/expense-tracker-go/internal/adapter/service/mq"
"github.com/eyo-chen/expense-tracker-go/internal/adapter/service/s3"
"github.com/eyo-chen/expense-tracker-go/internal/handler"
"github.com/eyo-chen/expense-tracker-go/internal/router"
Expand Down Expand Up @@ -54,8 +55,15 @@ func main() {
logger.Info("Connecting to S3...")
s3Client, presignClient := s3.NewS3Clients(os.Getenv("AWS_REGION"), os.Getenv("AWS_KEY"), os.Getenv("AWS_SECRET"))

logger.Info("Connecting to RabbitMQ...")
mqClient, err := mq.NewMQClient(os.Getenv("MESSAGE_QUEUE_URL"))
if err != nil {
logger.Fatal("Unable to connect to rabbitmq", "error", err)
}
defer mqClient.Close()

// Setup adapter, usecase, and handler
adapter := adapter.New(mysqlDB, redisClient, s3Client, presignClient, os.Getenv("AWS_BUCKET"))
adapter := adapter.New(mysqlDB, redisClient, s3Client, presignClient, mqClient, os.Getenv("AWS_BUCKET"), os.Getenv("MESSAGE_QUEUE_NAME"))
usecase := usecase.New(adapter.User, adapter.MainCateg, adapter.SubCateg, adapter.Icon, adapter.Transaction, adapter.MonthlyTrans, adapter.RedisService, adapter.UserIcon, adapter.S3Service)
handler := handler.New(usecase.User, usecase.MainCateg, usecase.SubCateg, usecase.Transaction, usecase.Icon, usecase.UserIcon, usecase.InitData)
if err := initServe(handler); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/cron/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func handleRequest(ctx context.Context) error {
defer mysqlDB.Close()

// Setup adapter and usecase
adapter := adapter.New(mysqlDB, nil, nil, nil, "")
adapter := adapter.New(mysqlDB, nil, nil, nil, nil, "", "")
monthlyTransUC := monthlytrans.New(adapter.MonthlyTrans, adapter.Transaction)

// Execute monthly transaction aggregation for previous month
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/opencontainers/runc v1.1.13 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
github.com/redis/go-redis/v9 v9.6.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/objx v0.5.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand Down
5 changes: 5 additions & 0 deletions internal/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/eyo-chen/expense-tracker-go/internal/adapter/repository/transaction"
"github.com/eyo-chen/expense-tracker-go/internal/adapter/repository/user"
"github.com/eyo-chen/expense-tracker-go/internal/adapter/repository/usericon"
"github.com/eyo-chen/expense-tracker-go/internal/adapter/service/mq"
redisservice "github.com/eyo-chen/expense-tracker-go/internal/adapter/service/redis"
s3service "github.com/eyo-chen/expense-tracker-go/internal/adapter/service/s3"
"github.com/redis/go-redis/v9"
Expand All @@ -26,13 +27,16 @@ type Adapter struct {
UserIcon *usericon.Repo
S3Service *s3service.Service
MonthlyTrans *monthlytrans.Repo
MQService *mq.Service
}

func New(mysqlDB *sql.DB,
redisClient *redis.Client,
s3Client interfaces.S3Client,
presignClient interfaces.S3PresignClient,
mqClient interfaces.MQClient,
bucket string,
queueName string,
) *Adapter {
return &Adapter{
User: user.New(mysqlDB),
Expand All @@ -44,5 +48,6 @@ func New(mysqlDB *sql.DB,
UserIcon: usericon.New(mysqlDB),
S3Service: s3service.New(bucket, s3Client, presignClient),
MonthlyTrans: monthlytrans.New(mysqlDB),
MQService: mq.New(queueName, mqClient),
}
}
16 changes: 16 additions & 0 deletions internal/adapter/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/service/s3"
amqp "github.com/rabbitmq/amqp091-go"
)

// S3PresignClient is the interface that wraps the basic methods for s3 presign client.
Expand All @@ -21,3 +22,18 @@ type S3Client interface {
// DeleteObject deletes an object from S3.
DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error)
}

// MQClient is the interface that wraps the basic methods for message queue client.
type MQClient interface {
// PublishWithContext publishes a message to a message queue.
PublishWithContext(ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error

// ConsumeWithContext consumes messages from a message queue.
ConsumeWithContext(ctx context.Context, queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)

// QueueDeclare declares a queue.
QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table) (amqp.Queue, error)

// Close closes the message queue client.
Close() error
}
24 changes: 24 additions & 0 deletions internal/adapter/service/mq/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package mq

import (
"github.com/eyo-chen/expense-tracker-go/internal/adapter/interfaces"
"github.com/eyo-chen/expense-tracker-go/pkg/logger"
amqp "github.com/rabbitmq/amqp091-go"
)

// NewMQClient initializes a new MQ client.
func NewMQClient(url string) (interfaces.MQClient, error) {
conn, err := amqp.Dial(url)
if err != nil {
logger.Error("Failed to connect to message queue", "error", err)
return nil, err
}

ch, err := conn.Channel()
if err != nil {
logger.Error("Failed to create channel", "error", err)
return nil, err
}

return ch, nil
}
54 changes: 54 additions & 0 deletions internal/adapter/service/mq/mq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package mq

import (
"context"
"encoding/json"

"github.com/eyo-chen/expense-tracker-go/internal/adapter/interfaces"
"github.com/eyo-chen/expense-tracker-go/pkg/logger"
amqp "github.com/rabbitmq/amqp091-go"
)

const (
packageName = "adapter/service/mq"
)

type Service struct {
QueueName string
MQClient interfaces.MQClient
}

// NewMQService initializes a new MQ service.
func New(queueName string, mqClient interfaces.MQClient) *Service {
queue, err := mqClient.QueueDeclare(queueName, true, false, false, false, nil)
if err != nil {
logger.Fatal("Failed to declare queue", "error", err, "package", packageName)
return nil
}

return &Service{QueueName: queue.Name, MQClient: mqClient}
}

func (s *Service) Publish(ctx context.Context, msg interface{}) error {
body, err := json.Marshal(msg)
if err != nil {
logger.Error("Failed to marshal message", "error", err, "package", packageName)
return err
}

return s.MQClient.PublishWithContext(
ctx,
"", // default exchange
s.QueueName,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
},
)
}

func (s *Service) Close() {
s.MQClient.Close()
}
100 changes: 100 additions & 0 deletions internal/adapter/service/mq/mq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package mq

import (
"context"
"encoding/json"
"errors"
"testing"

"github.com/eyo-chen/expense-tracker-go/mocks"
"github.com/eyo-chen/expense-tracker-go/pkg/logger"
"github.com/eyo-chen/expense-tracker-go/pkg/testutil"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/suite"
)

var (
mockCTX = context.Background()
mockQueueName = "test-queue"
)

type mqServiceSuite struct {
suite.Suite
service *Service
mockMQClient *mocks.MQClient
}

func TestMQServiceSuite(t *testing.T) {
suite.Run(t, new(mqServiceSuite))
}

func (s *mqServiceSuite) SetupSuite() {
logger.Register()
}

func (s *mqServiceSuite) SetupTest() {
s.mockMQClient = new(mocks.MQClient)

var args amqp.Table
s.mockMQClient.On("QueueDeclare", mockQueueName, true, false, false, false, args).Return(amqp.Queue{Name: mockQueueName}, nil)
s.service = New(mockQueueName, s.mockMQClient)
}

func (s *mqServiceSuite) TearDownTest() {
s.mockMQClient.AssertExpectations(s.T())
}

func (s *mqServiceSuite) TestPublish() {
for scenario, fn := range map[string]func(s *mqServiceSuite, desc string){
"when no error, publish successfully": publish_NoError_ReturnSuccessfully,
"when publish failed, return error": publish_Error_ReturnError,
} {
s.Run(testutil.GetFunName(fn), func() {
s.SetupTest()
fn(s, scenario)
s.TearDownTest()
})
}
}

func publish_NoError_ReturnSuccessfully(s *mqServiceSuite, desc string) {
// prepare mock data
mockMessage := "test-message"
mockBody, err := json.Marshal(mockMessage)
s.Require().NoError(err, desc)
mockPublishing := amqp.Publishing{
ContentType: "application/json",
Body: mockBody,
}

s.mockMQClient.On("PublishWithContext", mockCTX, "", mockQueueName, false, false, mockPublishing).Return(nil)

err = s.service.Publish(mockCTX, mockMessage)
s.Require().NoError(err, desc)
}

func publish_Error_ReturnError(s *mqServiceSuite, desc string) {
// prepare mock data
mockMessage := "test-message"
mockBody, err := json.Marshal(mockMessage)
s.Require().NoError(err, desc)
mockPublishing := amqp.Publishing{
ContentType: "application/json",
Body: mockBody,
}
mockError := errors.New("test-error")

s.mockMQClient.On("PublishWithContext", mockCTX, "", mockQueueName, false, false, mockPublishing).Return(mockError)

err = s.service.Publish(mockCTX, mockMessage)

s.Require().ErrorIs(err, mockError, desc)
}

func (s *mqServiceSuite) TestClose() {
// mock service
s.mockMQClient.On("Close").Return(nil)

// action
s.service.Close()
}
Loading

0 comments on commit 0a3258e

Please sign in to comment.