Skip to content

Commit

Permalink
v1.1.8 add redis 7.0 shard pub/sub #156
Browse files Browse the repository at this point in the history
  • Loading branch information
2881099 committed Sep 1, 2023
1 parent 748dd65 commit ec862f8
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/FreeRedis/FreeRedis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<AssemblyName>FreeRedis</AssemblyName>
<PackageId>FreeRedis</PackageId>
<RootNamespace>FreeRedis</RootNamespace>
<Version>1.1.7</Version>
<Version>1.1.8</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageProjectUrl>https://github.com/2881099/FreeRedis</PackageProjectUrl>
<Description>FreeRedis is .NET redis client, supports cluster, sentinel, master-slave, pipeline, transaction and connection pool.</Description>
Expand Down
50 changes: 50 additions & 0 deletions src/FreeRedis/FreeRedis.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 33 additions & 4 deletions src/FreeRedis/IRedisClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,31 @@ public interface IRedisClient
long PubSubNumPat();
void PUnSubscribe(params string[] pattern);
void UnSubscribe(params string[] channels);
IDisposable PSubscribe(string pattern, Action<string, object> handler);
IDisposable PSubscribe(string[] pattern, Action<string, object> handler);
IDisposable Subscribe(string[] channels, Action<string, object> handler);
IDisposable Subscribe(string channel, Action<string, object> handler);

/// <summary>
/// redis 7.0 shard pub/sub
/// </summary>
long SPublish(string shardchannel, string message);
string[] PubSubShardChannels(string pattern = "*");
long PubSubShardNumSub(string channel);
long[] PubSubShardNumSub(string[] channels);
/// <summary>
/// redis 7.0 shard pub/sub
/// </summary>
IDisposable SSubscribe(string[] shardchannels, Action<string, object> handler);
/// <summary>
/// redis 7.0 shard pub/sub
/// </summary>
IDisposable SSubscribe(string shardchannel, Action<string, object> handler);
/// <summary>
/// redis 7.0 shard pub/sub
/// </summary>
void SUnSubscribe(params string[] shardchannels);

bool ScriptExists(string sha1);
bool[] ScriptExists(string[] sha1);
void ScriptFlush();
Expand Down Expand Up @@ -538,10 +563,14 @@ public interface IRedisClient
Task<long> PubSubNumSubAsync(string channel);
Task<long[]> PubSubNumSubAsync(string[] channels);
Task<long> PubSubNumPatAsync();
IDisposable PSubscribe(string pattern, Action<string, object> handler);
IDisposable PSubscribe(string[] pattern, Action<string, object> handler);
IDisposable Subscribe(string[] channels, Action<string, object> handler);
IDisposable Subscribe(string channel, Action<string, object> handler);
/// <summary>
/// redis 7.0 shard pub/sub
/// </summary>
Task<long> SPublishAsync(string shardchannel, string message);
Task<string[]> PubSubShardChannelsAsync(string pattern = "*");
Task<long> PubSubShardNumSubAsync(string channel);
Task<long[]> PubSubShardNumSubAsync(string[] channels);

Task<object> EvalAsync(string script, string[] keys = null, params object[] arguments);
Task<object> EvalShaAsync(string sha1, string[] keys = null, params object[] arguments);
Task<bool> ScriptExistsAsync(string sha1);
Expand Down
93 changes: 76 additions & 17 deletions src/FreeRedis/RedisClient/PubSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,28 @@ partial class RedisClient
public Task<long> PubSubNumSubAsync(string channel) => CallAsync("PUBSUB".SubCommand("NUMSUB").Input(channel), rt => rt.ThrowOrValue((a, _) => a.MapToList((x, y) => y.ConvertTo<long>()).FirstOrDefault()));
public Task<long[]> PubSubNumSubAsync(string[] channels) => CallAsync("PUBSUB".SubCommand("NUMSUB").Input(channels), rt => rt.ThrowOrValue((a, _) => a.MapToList((x, y) => y.ConvertTo<long>()).ToArray()));
public Task<long> PubSubNumPatAsync() => CallAsync("PUBLISH".SubCommand("NUMPAT"), rt => rt.ThrowOrValue<long>());

/// <summary>
/// redis 7.0 shard pub/sub
/// </summary>
public Task<long> SPublishAsync(string shardchannel, string message) => CallAsync("SPUBLISH".Input(shardchannel, message), rt => rt.ThrowOrValue<long>());
public Task<string[]> PubSubShardChannelsAsync(string pattern = "*") => CallAsync("PUBSUB".SubCommand("SHARDCHANNELS").Input(pattern), rt => rt.ThrowOrValue<string[]>());
public Task<long> PubSubShardNumSubAsync(string channel) => CallAsync("PUBSUB".SubCommand("SHARDNUMSUB").Input(channel), rt => rt.ThrowOrValue((a, _) => a.MapToList((x, y) => y.ConvertTo<long>()).FirstOrDefault()));
public Task<long[]> PubSubShardNumSubAsync(string[] channels) => CallAsync("PUBSUB".SubCommand("SHARDNUMSUB").Input(channels), rt => rt.ThrowOrValue((a, _) => a.MapToList((x, y) => y.ConvertTo<long>()).ToArray()));
#endregion
#endif

public IDisposable PSubscribe(string pattern, Action<string, object> handler)
{
if (string.IsNullOrEmpty(pattern)) throw new ArgumentNullException(nameof(pattern));
if (handler == null) throw new ArgumentNullException(nameof(handler));
return _pubsub.Subscribe(true, new[] { pattern }, (p, k, d) => handler(k, d));
return _pubsub.Subscribe(true, false, new[] { pattern }, (p, k, d) => handler(k, d));
}
public IDisposable PSubscribe(string[] pattern, Action<string, object> handler)
{
if (pattern?.Any() != true) throw new ArgumentNullException(nameof(pattern));
if (handler == null) throw new ArgumentNullException(nameof(handler));
return _pubsub.Subscribe(true, pattern, (p, k, d) => handler(k, d));
return _pubsub.Subscribe(true, false, pattern, (p, k, d) => handler(k, d));
}

public long Publish(string channel, string message) => Call("PUBLISH".Input(channel, message), rt => rt.ThrowOrValue<long>());
Expand All @@ -63,21 +71,51 @@ public IDisposable PSubscribe(string[] pattern, Action<string, object> handler)
public long[] PubSubNumSub(string[] channels) => Call("PUBSUB".SubCommand("NUMSUB").Input(channels), rt => rt.ThrowOrValue((a, _) => a.MapToList((x, y) => y.ConvertTo<long>()).ToArray()));
public long PubSubNumPat() => Call("PUBLISH".SubCommand("NUMPAT"), rt => rt.ThrowOrValue<long>());


public void PUnSubscribe(params string[] pattern) => _pubsub.UnSubscribe(true, pattern);
public void PUnSubscribe(params string[] pattern) => _pubsub.UnSubscribe(true, false, pattern);
public IDisposable Subscribe(string[] channels, Action<string, object> handler)
{
if (channels?.Any() != true) throw new ArgumentNullException(nameof(channels));
if (handler == null) throw new ArgumentNullException(nameof(handler));
return _pubsub.Subscribe(false, channels, (p, k, d) => handler(k, d));
return _pubsub.Subscribe(false, false, channels, (p, k, d) => handler(k, d));
}
public IDisposable Subscribe(string channel, Action<string, object> handler)
{
if (string.IsNullOrEmpty(channel)) throw new ArgumentNullException(nameof(channel));
if (handler == null) throw new ArgumentNullException(nameof(handler));
return _pubsub.Subscribe(false, new[] { channel }, (p, k, d) => handler(k, d));
return _pubsub.Subscribe(false, false, new[] { channel }, (p, k, d) => handler(k, d));
}
public void UnSubscribe(params string[] channels) => _pubsub.UnSubscribe(false, channels);
public void UnSubscribe(params string[] channels) => _pubsub.UnSubscribe(false, false, channels);


/// <summary>
/// redis 7.0 shard pub/sub
/// </summary>
public long SPublish(string shardchannel, string message) => Call("SPUBLISH".Input(shardchannel, message), rt => rt.ThrowOrValue<long>());
public string[] PubSubShardChannels(string pattern = "*") => Call("PUBSUB".SubCommand("SHARDCHANNELS").Input(pattern), rt => rt.ThrowOrValue<string[]>());
public long PubSubShardNumSub(string channel) => Call("PUBSUB".SubCommand("SHARDNUMSUB").Input(channel), rt => rt.ThrowOrValue((a, _) => a.MapToList((x, y) => y.ConvertTo<long>()).FirstOrDefault()));
public long[] PubSubShardNumSub(string[] channels) => Call("PUBSUB".SubCommand("SHARDNUMSUB").Input(channels), rt => rt.ThrowOrValue((a, _) => a.MapToList((x, y) => y.ConvertTo<long>()).ToArray()));
/// <summary>
/// redis 7.0 shard pub/sub
/// </summary>
public IDisposable SSubscribe(string[] shardchannels, Action<string, object> handler)
{
if (shardchannels?.Any() != true) throw new ArgumentNullException(nameof(shardchannels));
if (handler == null) throw new ArgumentNullException(nameof(handler));
return _pubsub.Subscribe(false, true, shardchannels, (p, k, d) => handler(k, d));
}
/// <summary>
/// redis 7.0 shard pub/sub
/// </summary>
public IDisposable SSubscribe(string shardchannel, Action<string, object> handler)
{
if (string.IsNullOrEmpty(shardchannel)) throw new ArgumentNullException(nameof(shardchannel));
if (handler == null) throw new ArgumentNullException(nameof(handler));
return _pubsub.Subscribe(false, true, new[] { shardchannel }, (p, k, d) => handler(k, d));
}
/// <summary>
/// redis 7.0 shard pub/sub
/// </summary>
public void SUnSubscribe(params string[] shardchannels) => _pubsub.UnSubscribe(false, true, shardchannels);


class PubSubSubscribeDisposable : IPubSubSubscriber
Expand Down Expand Up @@ -107,6 +145,7 @@ class PubSub : IDisposable
ConcurrentDictionary<Guid, string[]> _cancels = new ConcurrentDictionary<Guid, string[]>();
ConcurrentDictionary<string, ConcurrentDictionary<Guid, RegisterInfo>> _registers = new ConcurrentDictionary<string, ConcurrentDictionary<Guid, RegisterInfo>>();
const string _psub_regkey_prefix = "PSubscribe__ |";
const string _ssub_regkey_prefix = "SSubscribe__ |";
internal class RegisterInfo
{
public Guid Id { get; }
Expand Down Expand Up @@ -145,32 +184,44 @@ internal void Cancel(params Guid[] ids)
readyUnsubInterKeys.Add(oldkey);
}
}
var unsub = readyUnsubInterKeys.Where(a => !a.StartsWith(_psub_regkey_prefix)).ToArray();
var unsub = readyUnsubInterKeys.Where(a => !a.StartsWith(_psub_regkey_prefix) && !a.StartsWith(_ssub_regkey_prefix)).ToArray();
var punsub = readyUnsubInterKeys.Where(a => a.StartsWith(_psub_regkey_prefix)).Select(a => a.Replace(_psub_regkey_prefix, "")).ToArray();
var sunsub = readyUnsubInterKeys.Where(a => a.StartsWith(_ssub_regkey_prefix)).Select(a => a.Replace(_ssub_regkey_prefix, "")).ToArray();
if (unsub.Any()) Call("UNSUBSCRIBE".Input(unsub));
if (punsub.Any()) Call("PUNSUBSCRIBE".Input(punsub));
if (sunsub.Any()) Call("SUNSUBSCRIBE".Input(punsub));

if (!_cancels.Any())
lock (_lock)
if (!_cancels.Any())
_redisSocket?.ReleaseSocket();
}
internal void UnSubscribe(bool punsub, string[] channels)
internal void UnSubscribe(bool punsub, bool sunsub, string[] channels)
{
channels = channels?.Distinct().Select(a => punsub ? $"{_psub_regkey_prefix}{a}" : a).ToArray();
channels = channels?.Distinct().Select(a =>
{
if (punsub) return $"{_psub_regkey_prefix}{a}";
if (sunsub) return $"{_ssub_regkey_prefix}{a}";
return a;
}).ToArray();
if (channels.Any() != true) return;
var ids = channels.Select(a => _registers.TryGetValue(a, out var tryval) ? tryval : null).Where(a => a != null).SelectMany(a => a.Keys).Distinct().ToArray();
Cancel(ids);
}
internal IDisposable Subscribe(bool psub, string[] channels, Action<string, string, object> handler)
internal IDisposable Subscribe(bool psub, bool ssub, string[] channels, Action<string, string, object> handler)
{
if (_stoped) return new PubSubSubscribeDisposable(this, null);
channels = channels?.Distinct().Where(a => !string.IsNullOrEmpty(a)).ToArray(); //In case of external modification
if (channels?.Any() != true) return new PubSubSubscribeDisposable(this, null);

var id = Guid.NewGuid();
var time = DateTime.Now;
var regkeys = channels.Select(a => psub ? $"{_psub_regkey_prefix}{a}" : a).ToArray();
var regkeys = channels.Select(a =>
{
if (psub) return $"{_psub_regkey_prefix}{a}";
if (ssub) return $"{_ssub_regkey_prefix}{a}";
return a;
}).ToArray();
for (var a = 0; a < regkeys.Length; a++)
{
ConcurrentDictionary<Guid, RegisterInfo> dict = null;
Expand Down Expand Up @@ -198,10 +249,12 @@ internal IDisposable Subscribe(bool psub, string[] channels, Action<string, stri
if (object.Equals(_, (_topOwner._pubsub._redisSocket as DefaultRedisSocket.TempProxyRedisSocket)?._owner))
{
var chans = _cancels.SelectMany(a => a.Value).ToList();
var resub = chans.Where(a => !a.StartsWith(_psub_regkey_prefix)).ToArray();
var resub = chans.Where(a => !a.StartsWith(_psub_regkey_prefix) && !a.StartsWith(_ssub_regkey_prefix)).ToArray();
var repsub = chans.Where(a => a.StartsWith(_psub_regkey_prefix)).Select(a => a.Replace(_psub_regkey_prefix, "")).ToArray();
var ressub = chans.Where(a => a.StartsWith(_ssub_regkey_prefix)).Select(a => a.Replace(_ssub_regkey_prefix, "")).ToArray();
if (resub.Any()) Call("SUBSCRIBE".Input(resub));
if (repsub.Any()) Call("PSUBSCRIBE".Input(repsub));
if (ressub.Any()) Call("SSUBSCRIBE".Input(repsub));
}
};
new Thread(() =>
Expand Down Expand Up @@ -235,13 +288,17 @@ internal IDisposable Subscribe(bool psub, string[] channels, Action<string, stri
{
case "pong":
case "punsubscribe":
case "sunsubscribe":
case "unsubscribe":
continue;
case "pmessage":
OnData(val[1].ConvertTo<string>(), val[2].ConvertTo<string>(), val[3]);
OnData(val[1].ConvertTo<string>(), false, val[2].ConvertTo<string>(), val[3]);
continue;
case "message":
OnData(null, val[1].ConvertTo<string>(), val[2]);
OnData(null, false, val[1].ConvertTo<string>(), val[2]);
continue;
case "smessage":
OnData(null, true, val[1].ConvertTo<string>(), val[2]);
continue;
}
}
Expand All @@ -259,9 +316,11 @@ internal IDisposable Subscribe(bool psub, string[] channels, Action<string, stri
Call((psub ? "PSUBSCRIBE" : "SUBSCRIBE").Input(channels));
return new PubSubSubscribeDisposable(this, () => Cancel(id));
}
void OnData(string pattern, string key, object data)
void OnData(string pattern, bool ssub, string key, object data)
{
var regkey = pattern == null ? key : $"{_psub_regkey_prefix}{pattern}";
var regkey = key;
if (pattern != null) regkey = $"{_psub_regkey_prefix}{pattern}";
if (ssub) regkey = $"{_ssub_regkey_prefix}{ key}";
if (_registers.TryGetValue(regkey, out var tryval) == false) return;
var multirecvs = tryval.Values.OrderBy(a => a.RegTime).ToArray(); //Execute in order
foreach (var recv in multirecvs)
Expand Down

0 comments on commit ec862f8

Please sign in to comment.