From 0c8e41340d8fdaae04afb8c608a5d6a57a07edfc Mon Sep 17 00:00:00 2001 From: Taiyi Date: Thu, 28 Mar 2024 10:26:12 +0800 Subject: [PATCH] feat: rocketmq add util func (#1082) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 太一 --- pkg/client/rocketmq/producer.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/pkg/client/rocketmq/producer.go b/pkg/client/rocketmq/producer.go index cf44fec0f0..cc8f3e019c 100644 --- a/pkg/client/rocketmq/producer.go +++ b/pkg/client/rocketmq/producer.go @@ -146,3 +146,28 @@ func (pc *Producer) SendWithMsg(ctx context.Context, msg *primitive.Message) err } return nil } + +// SendWithResult rocket mq 发送消息,可以自定义选择 tag 及返回结果 +func (pc *Producer) SendWithResult(ctx context.Context, msg []byte, tag string) (*primitive.SendResult, error) { + m := primitive.NewMessage(pc.Topic, msg) + if tag != "" { + m.WithTag(tag) + } + + res, err := pc.SendSync(ctx, m) + if err != nil { + xlog.Jupiter().Error("send message error", xlog.Any("msg", string(msg))) + return res, err + } + return res, nil +} + +// SendMsg... 自定义消息格式 +func (pc *Producer) SendMsg(ctx context.Context, msg *primitive.Message) (*primitive.SendResult, error) { + res, err := pc.SendSync(ctx, msg) + if err != nil { + xlog.Jupiter().Error("send message error", xlog.Any("msg", msg)) + return res, err + } + return res, nil +}