Skip to content

Commit

Permalink
[improv]改进阿里云rocketmq对接。在过去两年时间里,阿里云rocketmq做了升级,导致某些指令兼容性没那么好,这里统一做兼…
Browse files Browse the repository at this point in the history
…容处理。阿里云rmq的网络架构非常特殊,在vpc内网时,就当作普通rmq使用,没有特别之处。在公网时,获取得到的broker实际上是网关,然后获取消费组状态时,得到的却是内网broker状态,这里修改代码强行通过,但是消费时仍然得到不支持lite pull的错误。
  • Loading branch information
nnhy committed Apr 10, 2024
1 parent 26c7568 commit b7d0910
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 263 deletions.
8 changes: 4 additions & 4 deletions NewLife.RocketMQ/BrokerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ public void Ping()
// 生产者 和 消费者 略有不同
if (cfg is Producer pd)
{
body.ProducerDataSet = new[] {
body.ProducerDataSet = [
new ProducerData { GroupName = pd.Group },
new ProducerData { GroupName = "CLIENT_INNER_PRODUCER" },
};
body.ConsumerDataSet = new ConsumerData[] { };
];
body.ConsumerDataSet = [];
}
else if (cfg is Consumer cm)
{
body.ProducerDataSet = new[] { new ProducerData { GroupName = "CLIENT_INNER_PRODUCER" } };
body.ProducerDataSet = [new ProducerData { GroupName = "CLIENT_INNER_PRODUCER" }];
body.ConsumerDataSet = cm.Data.ToArray();
}

Expand Down
78 changes: 34 additions & 44 deletions NewLife.RocketMQ/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,50 +80,36 @@ protected override void Dispose(Boolean disposing)

/// <summary>启动</summary>
/// <returns></returns>
public override Boolean Start()
protected override void OnStart()
{
if (Active) return true;

WriteLog("正在准备消费 {0}", Topic);

using var span = Tracer?.NewSpan($"mq:{Topic}:Start");
try
var list = Data;
if (list == null)
{
var list = Data;
if (list == null)
// 建立消费者数据,用于心跳
var sd = new SubscriptionData
{
// 建立消费者数据,用于心跳
var sd = new SubscriptionData
{
Topic = Topic,
TagsSet = Tags
};
var cd = new ConsumerData
{
GroupName = Group,
ConsumeFromWhere = FromLastOffset ? "CONSUME_FROM_LAST_OFFSET" : "CONSUME_FROM_FIRST_OFFSET",
MessageModel = MessageModel.ToString().ToUpper(),
SubscriptionDataSet = new[] { sd },
};

list = new List<ConsumerData> { cd };

Data = list;
}
Topic = Topic,
TagsSet = Tags
};
var cd = new ConsumerData
{
GroupName = Group,
ConsumeFromWhere = FromLastOffset ? "CONSUME_FROM_LAST_OFFSET" : "CONSUME_FROM_FIRST_OFFSET",
MessageModel = MessageModel.ToString().ToUpper(),
SubscriptionDataSet = [sd],
};

if (!base.Start()) return false;
list = [cd];

// 默认自动开始调度
if (AutoSchedule) StartSchedule();
Data = list;
}
catch (Exception ex)
{
span?.SetError(ex, null);

throw;
}
base.OnStart();

return true;
// 默认自动开始调度
if (AutoSchedule) StartSchedule();
}

/// <summary>
Expand All @@ -133,7 +119,7 @@ public override void Stop()
{
if (!Active) return;

using var span = Tracer?.NewSpan($"mq:{Topic}:Stop");
using var span = Tracer?.NewSpan($"mq:{Name}:Stop");
try
{
// 停止并保存偏移
Expand Down Expand Up @@ -207,7 +193,7 @@ public async Task<PullResult> Pull(MessageQueue mq, Int64 offset, Int32 maxNums,
else
{
pr.Status = PullStatus.Unknown;
Log.Warn("响应编号:{0} 响应备注:{1} 序列编号:{2} 序列偏移量:{3}", rs.Header.Code, rs.Header.Remark, mq.QueueId, offset);
Log.Warn("[{0}]{1} 序列编号:{2} 序列偏移量:{3}", (ResponseCode)rs.Header.Code, rs.Header.Remark, mq.QueueId, offset);
}

pr.Read(rs.Header?.ExtFields);
Expand Down Expand Up @@ -331,7 +317,7 @@ public async Task<ICollection<String>> GetConsumers(String group = null)
// 在所有Broker上查询
foreach (var item in Brokers)
{
using var span = Tracer?.NewSpan($"mq:{Topic}:GetConsumers", item.Name);
using var span = Tracer?.NewSpan($"mq:{Name}:GetConsumers", item.Name);
try
{
var bk = GetBroker(item.Name);
Expand Down Expand Up @@ -476,7 +462,7 @@ private async Task DoPull(QueueStore st, CancellationToken cancellationToken)
DefaultSpan.Current = null;

// 性能埋点
using var span = Tracer?.NewSpan($"mq:{Topic}:Consume", pr.Messages);
using var span = Tracer?.NewSpan($"mq:{Name}:Consume", pr.Messages);
try
{
// 触发消费
Expand Down Expand Up @@ -512,7 +498,7 @@ private async Task DoPull(QueueStore st, CancellationToken cancellationToken)

break;
case PullStatus.Unknown:
Log.Error("未知响应类型消息序列[{1}]偏移量{0}", st.Offset, st.Queue.QueueId);
Log.Error("未知响应类型消息,序列[{1}]偏移量{0}", st.Offset, st.Queue.QueueId);
break;
default:
break;
Expand Down Expand Up @@ -587,7 +573,8 @@ private async Task PersistAll(IEnumerable<QueueStore> stores)

class QueueStore
{
[XmlIgnore] public MessageQueue Queue { get; set; }
[XmlIgnore]
public MessageQueue Queue { get; set; }
public Int64 Offset { get; set; } = -1;
public Int64 CommitOffset { get; set; } = -1;

Expand All @@ -602,6 +589,7 @@ class QueueStore
/// <returns></returns>
public override Int32 GetHashCode() => (Queue == null ? 0 : Queue.GetHashCode()) ^ Offset.GetHashCode();

public override String ToString() => Queue?.ToString();
#endregion
}

Expand Down Expand Up @@ -682,7 +670,7 @@ public async Task<Boolean> Rebalance()
var str = dic.Join(";", e => $"{e.Key}[{e.Value}]");
WriteLog("消费重新平衡,当前消费者负责queue分片:{0}", str);

using var span = Tracer?.NewSpan($"mq:{Topic}:Rebalance", str);
using var span = Tracer?.NewSpan($"mq:{Name}:Rebalance", str);

_Queues = rs.ToArray();
await InitOffsetAsync();
Expand Down Expand Up @@ -710,7 +698,7 @@ private async Task CheckGroup(Object state = null)
if (_checking) return;
_checking = true;

using var span = Tracer?.NewSpan($"mq:{Topic}:CheckGroup");
using var span = Tracer?.NewSpan($"mq:{Name}:CheckGroup");
try
{
var rs = await Rebalance();
Expand Down Expand Up @@ -766,7 +754,8 @@ private async Task InitOffsetAsync(CancellationToken cancellationToken = default
if (store.Offset >= 0) continue;

var item = offsetTables.FirstOrDefault(t => t.Key.BrokerName == store.Queue.BrokerName && t.Key.QueueId == store.Queue.QueueId);
var offsetTable = item.Value;
//!! 阿里云公网版RocketMQ,消费者状态返回的是真正brokerName,而前面Broker得到的是网关名,导致这里无法匹配
var offsetTable = item.Value ?? new OffsetWrapperModel();
if (neverConsumed)
{
var offset = 0L;
Expand Down Expand Up @@ -937,10 +926,11 @@ private Command GetConsumerRunningInfo(Command cmd)
ci.Properties = dic;

var sd = new SubscriptionData { Topic = Topic, };
ci.SubscriptionSet = new[] { sd };
ci.SubscriptionSet = [sd];

var sb = new StringBuilder();
sb.Append('{');
if (_Queues != null)
{
sb.Append("\"mqTable\":{");
for (var i = 0; i < _Queues.Length; i++)
Expand Down
107 changes: 60 additions & 47 deletions NewLife.RocketMQ/MqBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,19 @@ namespace NewLife.RocketMQ.Client;
public abstract class MqBase : DisposeBase
{
#region 属性
/// <summary>名称</summary>
public String Name { get; set; }

/// <summary>名称服务器地址</summary>
public String NameServerAddress { get; set; }

private String _group = "DEFAULT_PRODUCER";
/// <summary>消费组</summary>
/// <remarks>阿里云目前需要在Group前面带上实例ID并用【%】连接,组成路由Group[用来路由到实例Group]</remarks>
public String Group
{
get
{
// 阿里云目前需要在Group前面带上实例ID并用【%】连接,组成路由Group[用来路由到实例Group]
var ins = Aliyun?.InstanceId;
return ins.IsNullOrEmpty() ? _group : $"{ins}%{_group}";
}
set
{
_group = value;
}
}
public String Group { get; set; } = "DEFAULT_PRODUCER";

private String _topic = "TBW102";
/// <summary>主题</summary>
/// <remarks>阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]</remarks>
public String Topic
{
get
{
// 阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]
var ins = Aliyun?.InstanceId;
return ins.IsNullOrEmpty() ? _topic : $"{ins}%{_topic}";
}
set
{
_topic = value;
}
}
public String Topic { get; set; } = "TBW102";

/// <summary>本地IP地址</summary>
public String ClientIP { get; set; } = NetHelper.MyIP() + "";
Expand Down Expand Up @@ -91,6 +68,9 @@ public String Topic
/// <summary>性能跟踪</summary>
public ITracer Tracer { get; set; } = DefaultTracer.Instance;

private String _group;
private String _topic;

/// <summary>名称服务器</summary>
protected NameClient _NameServer;
#endregion
Expand Down Expand Up @@ -143,36 +123,71 @@ protected override void Dispose(Boolean disposing)

/// <summary>友好字符串</summary>
/// <returns></returns>
public override String ToString() => Group;
public override String ToString() => _group;
#endregion

#region 基础方法
/// <summary>应用配置</summary>
/// <param name="setting"></param>
public virtual void Configure(MqSetting setting)
{
NameServerAddress = setting.NameServer;
Topic = setting.Topic;
Group = setting.Group;
if (!setting.NameServer.IsNullOrEmpty()) NameServerAddress = setting.NameServer;
if (!setting.Topic.IsNullOrEmpty()) Topic = setting.Topic;
if (!setting.Group.IsNullOrEmpty()) Group = setting.Group;

Aliyun ??= new AliyunOptions();
if (!setting.Server.IsNullOrEmpty()) Aliyun.Server = setting.Server;
if (!setting.AccessKey.IsNullOrEmpty()) Aliyun.AccessKey = setting.AccessKey;
if (!setting.SecretKey.IsNullOrEmpty()) Aliyun.SecretKey = setting.SecretKey;
}

/// <summary>开始</summary>
/// <returns></returns>
public Boolean Start()
{
if (Active) return true;

if (!setting.Server.IsNullOrEmpty() &&
!setting.AccessKey.IsNullOrEmpty())
_group = Group;
_topic = Topic;
if (Name.IsNullOrEmpty()) Name = Topic;

// 解析阿里云实例
var aliyun = Aliyun;
if (aliyun != null && !aliyun.AccessKey.IsNullOrEmpty())
{
var ns = NameServerAddress;
if (aliyun.InstanceId.IsNullOrEmpty() && !ns.IsNullOrEmpty() && ns.Contains("MQ_INST_"))
{
aliyun.InstanceId = ns.Substring("://", ".");
}
}

using var span = Tracer?.NewSpan($"mq:{Name}:Start");
try
{
Aliyun = new AliyunOptions
// 阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]
var ins = Aliyun?.InstanceId;
if (!ins.IsNullOrEmpty())
{
Server = setting.Server,
AccessKey = setting.AccessKey,
SecretKey = setting.SecretKey,
};
if (!Topic.StartsWith(ins)) Topic = $"{ins}%{Topic}";
if (!Group.StartsWith(ins)) Group = $"{ins}%{Group}";
}

OnStart();
}
catch (Exception ex)
{
span?.SetError(ex, null);

throw;
}

return Active = true;
}

/// <summary>开始</summary>
/// <returns></returns>
public virtual Boolean Start()
protected virtual void OnStart()
{
if (Active) return true;

if (NameServerAddress.IsNullOrEmpty())
{
// 获取阿里云ONS的名称服务器地址
Expand All @@ -190,7 +205,7 @@ public virtual Boolean Start()

var client = new NameClient(ClientId, this)
{
Name = Topic,
Name = Name,
Tracer = Tracer,
Log = Log
};
Expand All @@ -204,8 +219,6 @@ public virtual Boolean Start()
}

_NameServer = client;

return Active = true;
}

/// <summary>停止</summary>
Expand Down Expand Up @@ -313,7 +326,7 @@ public virtual void CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag
order = false,
};

using var span = Tracer?.NewSpan($"mq:{Topic}:CreateTopic", header);
using var span = Tracer?.NewSpan($"mq:{Name}:CreateTopic", header);
try
{
// 在所有Broker上创建Topic
Expand Down
3 changes: 3 additions & 0 deletions NewLife.RocketMQ/NameClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ protected override void Dispose(Boolean disposing)
protected override void OnStart()
{
var cfg = Config;
if (cfg.NameServerAddress.IsNullOrEmpty())
throw new ArgumentNullException(nameof(cfg.NameServerAddress), "未指定NameServer地址");

var ss = cfg.NameServerAddress.Split(";");

var list = new List<NetUri>();
Expand Down
Loading

0 comments on commit b7d0910

Please sign in to comment.