Skip to content

Commit

Permalink
fix: rocketmq unable to rebalance after adding queues (#917)
Browse files Browse the repository at this point in the history
* fix: rocketmq Unable to rebalance after adding queues
  • Loading branch information
hjxp authored Jun 28, 2023
1 parent 0940719 commit e601a33
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/BurntSushi/toml v1.2.1
github.com/alibaba/sentinel-golang v1.0.4
github.com/aliyun/aliyun-tablestore-go-sdk v1.7.7
github.com/apache/rocketmq-client-go/v2 v2.0.0-20230518020902-2a8172bb9174
github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230628073434-533de03048e1
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0
github.com/coocood/freecache v1.2.3
github.com/cosmtrek/air v1.43.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ github.com/aliyun/aliyun-tablestore-go-sdk v1.7.7/go.mod h1:mZCxM44kLKLY5ci+0j6b
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/rocketmq-client-go/v2 v2.0.0-20230518020902-2a8172bb9174 h1:RXutpBr8h9zJH1zcmCCptgm6+q8N0oyMrpzbIjRess4=
github.com/apache/rocketmq-client-go/v2 v2.0.0-20230518020902-2a8172bb9174/go.mod h1:6I6vgxHR3hzrvn+6n/4mrhS+UTulzK/X9LB2Vk1U5gE=
github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230628073434-533de03048e1 h1:vVViC77QvxLOUhbbMGSaVnOwkUnFCnXZQ7mX6SDuOZA=
github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230628073434-533de03048e1/go.mod h1:6I6vgxHR3hzrvn+6n/4mrhS+UTulzK/X9LB2Vk1U5gE=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
Expand Down
26 changes: 16 additions & 10 deletions pkg/client/rocketmq/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package rocketmq
import (
"crypto/md5"
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -147,10 +148,11 @@ func StdProducerConfig(name string) *ProducerConfig {
func RawPushConsumerConfig(name string) *PushConsumerConfig {
var defaultConfig = DefaultConfig()
var pushConsumerConfig = defaultConfig.PushConsumer
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pushConsumerConfig))
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil ||
(len(pushConsumerConfig.Addr) == 0 && len(defaultConfig.Addresses) == 0) ||
len(pushConsumerConfig.Topic) == 0 {
xlog.Jupiter().Panic("pushConsumerConfig fail", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pushConsumerConfig))
}

// 兼容rocket_client_mq变更,addr需要携带shceme
if len(pushConsumerConfig.Addr) == 0 {
pushConsumerConfig.Addr = defaultConfig.Addresses
Expand All @@ -162,7 +164,7 @@ func RawPushConsumerConfig(name string) *PushConsumerConfig {
// 这里根据mq集群地址的md5,生成默认InstanceName
// 实现自动支持多集群,解决官方库默认不支持多集群消费的问题
if pushConsumerConfig.InstanceName == "" {
pushConsumerConfig.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(pushConsumerConfig.Addr, ","))))
pushConsumerConfig.InstanceName = fmt.Sprintf("%x@%d", md5.Sum([]byte(strings.Join(pushConsumerConfig.Addr, ","))), os.Getpid())
}

if xdebug.IsDevelopmentMode() {
Expand All @@ -175,8 +177,10 @@ func RawPushConsumerConfig(name string) *PushConsumerConfig {
func RawPullConsumerConfig(name string) *PullConsumerConfig {
var defaultConfig = DefaultConfig()
var pullConsumerConfig = defaultConfig.PullConsumer
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pullConsumerConfig))
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil ||
(len(pullConsumerConfig.Addr) == 0 && len(defaultConfig.Addresses) == 0) ||
len(pullConsumerConfig.Topic) == 0 {
xlog.Jupiter().Panic("PullConsumerConfig fail", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pullConsumerConfig))
}

// 兼容rocket_client_mq变更,addr需要携带shceme
Expand All @@ -190,7 +194,7 @@ func RawPullConsumerConfig(name string) *PullConsumerConfig {
// 这里根据mq集群地址的md5,生成默认InstanceName
// 实现自动支持多集群,解决官方库默认不支持多集群消费的问题
if pullConsumerConfig.InstanceName == "" {
pullConsumerConfig.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(pullConsumerConfig.Addr, ","))))
pullConsumerConfig.InstanceName = fmt.Sprintf("%x@%d", md5.Sum([]byte(strings.Join(pullConsumerConfig.Addr, ","))), os.Getpid())
}

if xdebug.IsDevelopmentMode() {
Expand All @@ -203,8 +207,10 @@ func RawPullConsumerConfig(name string) *PullConsumerConfig {
func RawProducerConfig(name string) *ProducerConfig {
var defaultConfig = DefaultConfig()
var producerConfig = defaultConfig.Producer
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", defaultConfig))
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil ||
(len(producerConfig.Addr) == 0 && len(defaultConfig.Addresses) == 0) ||
len(producerConfig.Topic) == 0 {
xlog.Jupiter().Panic("RawProducerConfig fail", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", producerConfig))
}

// 兼容rocket_client_mq变更,addr需要携带shceme
Expand All @@ -217,7 +223,7 @@ func RawProducerConfig(name string) *ProducerConfig {
// 这里根据mq集群地址的md5,生成默认InstanceName
// 实现自动支持多集群,解决官方库默认不支持多集群消费的问题
if producerConfig.InstanceName == "" {
producerConfig.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(producerConfig.Addr, ","))))
producerConfig.InstanceName = fmt.Sprintf("%x@%d", md5.Sum([]byte(strings.Join(producerConfig.Addr, ","))), os.Getpid())
}

if xdebug.IsDevelopmentMode() {
Expand Down

0 comments on commit e601a33

Please sign in to comment.