Skip to content

Commit

Permalink
finish client-side-caching
Browse files Browse the repository at this point in the history
  • Loading branch information
2881099 committed Nov 10, 2020
1 parent f2d2945 commit f57c555
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 141 deletions.
105 changes: 32 additions & 73 deletions examples/console_netcore31_client_side_caching/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
using FreeRedis.Internal;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

namespace console_netcore31_client_side_caching
{
Expand All @@ -24,86 +26,43 @@ class Program

static void Main(string[] args)
{
cli.UseClientSideCaching();

cli.Set("Interceptor01", "123123");

var val1 = cli.Get("Interceptor01");
var val2 = cli.Get("Interceptor01");
var val3 = cli.Get("Interceptor01");
cli.UseClientSideCaching(new ClientSideCachingOptions
{
//本地缓存的容量
Capacity = 3,
//过滤哪些键能被本地缓存
KeyFilter = key => key.StartsWith("Interceptor"),
//检查长期未使用的缓存
CheckExpired = (key, dt) => DateTime.Now.Subtract(dt) > TimeSpan.FromSeconds(2)
});

Console.ReadKey();
cli.Set("Interceptor01", "123123"); //redis-server

var val4 = cli.Get("Interceptor01");
var val1 = cli.Get("Interceptor01"); //redis-server
var val2 = cli.Get("Interceptor01"); //本地
var val3 = cli.Get("Interceptor01"); //断点等3秒,redis-server

Console.ReadKey();
}
}

static class MemoryCacheAopExtensions
{
public static void UseClientSideCaching(this RedisClient cli)
{
var sub = cli.Subscribe("__redis__:invalidate", (chan, msg) =>
{
var keys = msg as object[];
foreach (var key in keys)
{
_dicStrings.TryRemove(string.Concat(key), out var old);
}
}) as IPubSubSubscriber;
cli.Set("Interceptor01", "234567"); //redis-server
var val4 = cli.Get("Interceptor01"); //redis-server
var val5 = cli.Get("Interceptor01"); //本地

var context = new ClientSideCachingContext(cli, sub);
cli.Interceptors.Add(() => new MemoryCacheAop());
cli.Unavailable += (_, e) =>
{
_dicStrings.Clear();
};
cli.Connected += (_, e) =>
{
e.Client.ClientTracking(true, context._sub.RedisSocket.ClientId, null, false, false, false, false);
};
}
var val6 = cli.MGet("Interceptor01", "Interceptor02", "Interceptor03"); //redis-server
var val7 = cli.MGet("Interceptor01", "Interceptor02", "Interceptor03"); //本地
var val8 = cli.MGet("Interceptor01", "Interceptor02", "Interceptor03"); //本地

class ClientSideCachingContext
{
internal RedisClient _cli;
internal IPubSubSubscriber _sub;
public ClientSideCachingContext(RedisClient cli, IPubSubSubscriber sub)
{
_cli = cli;
_sub = sub;
}
}
cli.MSet("Interceptor01", "Interceptor01Value", "Interceptor02", "Interceptor02Value", "Interceptor03", "Interceptor03Value"); //redis-server
var val9 = cli.MGet("Interceptor01", "Interceptor02", "Interceptor03"); //redis-server
var val10 = cli.MGet("Interceptor01", "Interceptor02", "Interceptor03"); //本地

static ConcurrentDictionary<string, object> _dicStrings = new ConcurrentDictionary<string, object>();
class MemoryCacheAop : IInterceptor
{
public void After(InterceptorAfterEventArgs args)
{
switch (args.Command._command)
{
case "GET":
if (_iscached == false && args.Exception == null)
_dicStrings.TryAdd(args.Command.GetKey(0), args.Value);
break;
}
}
//以下 KeyFilter 返回 false,从而不使用本地缓存
cli.Set("123Interceptor01", "123123"); //redis-server

bool _iscached = false;
public void Before(InterceptorBeforeEventArgs args)
{
switch (args.Command._command)
{
case "GET":
if (_dicStrings.TryGetValue(args.Command.GetKey(0), out var tryval))
{
args.Value = tryval;
_iscached = true;
}
break;
}
}
var val11 = cli.Get("123Interceptor01"); //redis-server
var val12 = cli.Get("123Interceptor01"); //redis-server
var val23 = cli.Get("123Interceptor01"); //redis-server
Console.ReadKey();
}
}


}
16 changes: 16 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,22 @@ public static RedisClient cli = new RedisClient(

-----

#### ⚡ Client-side-cahing (本地缓存)

> requires redis-server 6.0 and above
```csharp
cli.UseClientSideCaching(new ClientSideCachingOptions
{
//本地缓存的容量
Capacity = 3,
//过滤哪些键能被本地缓存
KeyFilter = key => key.StartsWith("Interceptor"),
//检查长期未使用的缓存
CheckExpired = (key, dt) => DateTime.Now.Subtract(dt) > TimeSpan.FromSeconds(2)
});
```

#### 📡 Subscribe (订阅)

```csharp
Expand Down
242 changes: 242 additions & 0 deletions src/FreeRedis/ClientSideCaching.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
using FreeRedis.Internal;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace FreeRedis
{
public class ClientSideCachingOptions
{
public int Capacity { get; set; }

/// <summary>
/// true: cache
/// </summary>
public Func<string, bool> KeyFilter { get; set; }

/// <summary>
/// true: expired
/// </summary>
public Func<string, DateTime, bool> CheckExpired { get; set; }
}

public static class ClientSideCachingExtensions
{
public static void UseClientSideCaching(this RedisClient cli, ClientSideCachingOptions options)
{
new ClientSideCachingContext(cli, options)
.Start();
}

class ClientSideCachingContext
{
readonly RedisClient _cli;
readonly ClientSideCachingOptions _options;
IPubSubSubscriber _sub;

public ClientSideCachingContext(RedisClient cli, ClientSideCachingOptions options)
{
_cli = cli;
_options = options ?? new ClientSideCachingOptions();
}

public void Start()
{
_sub = _cli.Subscribe("__redis__:invalidate", InValidate) as IPubSubSubscriber;
_cli.Interceptors.Add(() => new MemoryCacheAop(this));
_cli.Unavailable += (_, e) =>
{
lock (_dictLock) _dictSort.Clear();
_dict.Clear();
};
_cli.Connected += (_, e) =>
{
e.Client.ClientTracking(true, _sub.RedisSocket.ClientId, null, false, false, false, false);
};
}

void InValidate(string chan, object msg)
{
var keys = msg as object[];
foreach (var key in keys)
RemoveCache(string.Concat(key));
}

static readonly DateTime _dt2020 = new DateTime(2020, 1, 1);
static long GetTime() => (long)DateTime.Now.Subtract(_dt2020).TotalSeconds;
/// <summary>
/// key -> Type(string|byte[]|class) -> value
/// </summary>
readonly ConcurrentDictionary<string, DictValue> _dict = new ConcurrentDictionary<string, DictValue>();
readonly SortedSet<string> _dictSort = new SortedSet<string>();
readonly object _dictLock = new object();
bool TryGetCacheValue(string key, Type valueType, out object value)
{
if (_dict.TryGetValue(key, out var trydictval) && trydictval.Values.TryGetValue(valueType, out var tryval)
//&& DateTime.Now.Subtract(_dt2020.AddSeconds(tryval.SetTime)) < TimeSpan.FromMinutes(5)
)
{
if (_options.CheckExpired?.Invoke(key, _dt2020.AddSeconds(tryval.SetTime)) == true)
{
RemoveCache(key);
value = null;
return false;
}
var time = GetTime();
if (_options.Capacity > 0)
{
lock (_dictLock)
{
_dictSort.Remove($"{trydictval.GetTime.ToString("X").PadLeft(16, '0')}{key}");
_dictSort.Add($"{time.ToString("X").PadLeft(16, '0')}{key}");
}
}
Interlocked.Exchange(ref trydictval.GetTime, time);
value = tryval.Value;
return true;
}
value = null;
return false;
}
void SetCacheValue(string command, string key, Type valueType, object value)
{
_dict.GetOrAdd(key, keyTmp =>
{
var time = GetTime();
if (_options.Capacity > 0)
{
string removeKey = null;
lock (_dictLock)
{
if (_dictSort.Count >= _options.Capacity) removeKey = _dictSort.First().Substring(16);
_dictSort.Add($"{time.ToString("X").PadLeft(16, '0')}{key}");
}
if (removeKey != null)
RemoveCache(removeKey);
}
return new DictValue(command, time);
}).Values
.AddOrUpdate(valueType, new DictValue.ObjectValue(value), (oldkey, oldval) => new DictValue.ObjectValue(value));
}
void RemoveCache(params string[] keys)
{
if (keys?.Any() != true) return;
foreach (var key in keys)
{
if (_dict.TryRemove(key, out var old))
{
if (_options.Capacity > 0)
{
lock (_dictLock)
{
_dictSort.Remove($"{old.GetTime.ToString("X").PadLeft(16, '0')}{key}");
}
}
}
}
}
class DictValue
{
public readonly ConcurrentDictionary<Type, ObjectValue> Values = new ConcurrentDictionary<Type, ObjectValue>();
public readonly string Command;
public long GetTime;
public DictValue(string command, long gettime)
{
this.Command = command;
this.GetTime = gettime;
}
public class ObjectValue
{
public readonly object Value;
public readonly long SetTime = (long)DateTime.Now.Subtract(_dt2020).TotalSeconds;
public ObjectValue(object value) => this.Value = value;
}
}

class MemoryCacheAop : IInterceptor
{
ClientSideCachingContext _cscc;
public MemoryCacheAop(ClientSideCachingContext cscc)
{
_cscc = cscc;
}

bool _iscached = false;
public void Before(InterceptorBeforeEventArgs args)
{
switch (args.Command._command)
{
case "GET":
if (_cscc.TryGetCacheValue(args.Command.GetKey(0), args.ValueType, out var getval))
{
args.Value = getval;
_iscached = true;
}
break;
case "MGET":
var mgetValType = args.ValueType.GetElementType();
var mgetKeys = args.Command._keyIndexes.Select((item, index) => args.Command.GetKey(index)).ToArray();
var mgetVals = mgetKeys.Select(a => _cscc.TryGetCacheValue(a, mgetValType, out var mgetval) ?
new DictGetResult { Value = mgetval, Exists = true } : new DictGetResult { Value = null, Exists = false })
.Where(a => a.Exists).Select(a => a.Value).ToArray();
if (mgetVals.Length == mgetKeys.Length)
{
args.Value = args.ValueType.FromObject(mgetVals);
_iscached = true;
}
break;
}
}

public void After(InterceptorAfterEventArgs args)
{
switch (args.Command._command)
{
case "GET":
if (_iscached == false && args.Exception == null)
{
var getkey = args.Command.GetKey(0);
if (_cscc._options.KeyFilter?.Invoke(getkey) != false)
_cscc.SetCacheValue(args.Command._command, getkey, args.ValueType, args.Value);
}
break;
case "MGET":
if (_iscached == false && args.Exception == null)
{
if (args.Value is Array valueArr)
{
var valueArrElementType = args.ValueType.GetElementType();
var sourceArrLen = valueArr.Length;
for (var a = 0; a < sourceArrLen; a++)
_cscc.SetCacheValue("GET", args.Command.GetKey(a), valueArrElementType, valueArr.GetValue(a));
}
}
break;
default:
if (args.Command._keyIndexes.Any())
{
var cmdset = CommandSets.Get(args.Command._command);
if (cmdset != null &&
(cmdset.Flag & CommandSets.ServerFlag.write) == CommandSets.ServerFlag.write &&
(cmdset.Tag & CommandSets.ServerTag.write) == CommandSets.ServerTag.write &&
(cmdset.Tag & CommandSets.ServerTag.@string) == CommandSets.ServerTag.@string)
{
_cscc.RemoveCache(args.Command._keyIndexes.Select((item, index) => args.Command.GetKey(index)).ToArray());
}
}
break;
}
}

class DictGetResult
{
public object Value;
public bool Exists;
}
}
}
}
}
Loading

0 comments on commit f57c555

Please sign in to comment.