Skip to content

Commit

Permalink
fix(iris): add message type when publishing message
Browse files Browse the repository at this point in the history
  • Loading branch information
jspark2000 committed Sep 7, 2024
1 parent f1d7031 commit 6200d16
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
2 changes: 1 addition & 1 deletion apps/iris/src/connector/rabbitmq/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (c *connector) handle(message amqp.Delivery, ctx context.Context) {
}

for result := range resultChan {
if err := c.producer.Publish(result, ctx); err != nil {
if err := c.producer.Publish(result, ctx, message.Type); err != nil {
c.logger.Log(logger.ERROR, fmt.Sprintf("failed to publish result: %s: %s", string(result), err))
// nack
} else {
Expand Down
7 changes: 4 additions & 3 deletions apps/iris/src/connector/rabbitmq/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

type Producer interface {
OpenChannel() error
Publish([]byte, context.Context) error
Publish([]byte, context.Context, string) error
CleanUp() error
}

Expand Down Expand Up @@ -99,7 +99,7 @@ func (p *producer) confirmHandler(confirms chan amqp.Confirmation) {
}
}

func (p *producer) Publish(result []byte, ctx context.Context) error {
func (p *producer) Publish(result []byte, ctx context.Context, messageType string) error {

seqNo := p.channel.GetNextPublishSeqNo()
p.logger.Log(logger.INFO, fmt.Sprintf("publishing %dB body", len(result)))
Expand All @@ -117,7 +117,8 @@ func (p *producer) Publish(result []byte, ctx context.Context) error {
ContentEncoding: "",
Body: result,
DeliveryMode: amqp.Persistent, // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
Priority: 0,
Type: messageType, // 0-9
},
); err != nil {
return fmt.Errorf("exchange publish: %s", err)
Expand Down

0 comments on commit 6200d16

Please sign in to comment.