Skip to content

Commit

Permalink
Release 0.16.1. Added an internal semaphore to CachedNameApi to synch…
Browse files Browse the repository at this point in the history
…ronize access across threads.
  • Loading branch information
Arlodotexe committed Jun 1, 2024
1 parent 174ffc4 commit fb29abd
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 78 deletions.
169 changes: 92 additions & 77 deletions src/Cache/CachedNameApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Ipfs;
using Ipfs.CoreApi;
using OwlCore.ComponentModel;
using OwlCore.Extensions;
using OwlCore.Storage;

namespace OwlCore.Kubo.Cache;
Expand All @@ -14,16 +15,18 @@ namespace OwlCore.Kubo.Cache;
/// </remarks>
public class CachedNameApi : SettingsBase, INameApi, IDelegable<INameApi>, IFlushable
{
private readonly SemaphoreSlim _cacheUpdateMutex = new(1, 1);

/// <summary>
/// The cached record for a published path name in a <see cref="CachedNameApi"/>.
/// </summary>
public record PublishedPathName(string path, bool resolve, string key, TimeSpan? lifetime, NamedContent returnValue);

/// <summary>
/// The cached record for a published cid name in a <see cref="CachedNameApi"/>.
/// </summary>
public record PublishedCidName(Cid id, string key, TimeSpan? lifetime, NamedContent returnValue);

/// <summary>
/// The cached record for a resolved name in a <see cref="CachedNameApi"/>.
/// </summary>
Expand Down Expand Up @@ -80,73 +83,82 @@ public List<PublishedPathName> PublishedStringNamedContent
/// <param name="cancellationToken">A token that can be used to cancel the ongoing operation.</param>
public async Task FlushAsync(CancellationToken cancellationToken = default)
{
foreach (var item in PublishedCidNamedContent.ToArray())
using (await _cacheUpdateMutex.DisposableWaitAsync(cancellationToken))
{
cancellationToken.ThrowIfCancellationRequested();
foreach (var item in PublishedCidNamedContent)
{
cancellationToken.ThrowIfCancellationRequested();

Console.WriteLine($"Flushing key {item.key} with value {item.id}");
Console.WriteLine($"Flushing key {item.key} with value {item.id}");

// Publish to ipfs
var result = await Inner.PublishAsync(item.id, item.key, item.lifetime, cancellationToken);
// Publish to ipfs
var result = await Inner.PublishAsync(item.id, item.key, item.lifetime, cancellationToken);

// Verify result matches original returned data
_ = Guard.Equals(result.ContentPath, item.returnValue.ContentPath);
_ = Guard.Equals(result.NamePath, item.returnValue.NamePath);
// Verify result matches original returned data
_ = Guard.Equals(result.ContentPath, item.returnValue.ContentPath);
_ = Guard.Equals(result.NamePath, item.returnValue.NamePath);

// Update cache
PublishedCidNamedContent.Remove(item);
PublishedCidNamedContent.Add(item with { returnValue = result });
}
// Update cache
PublishedCidNamedContent.Remove(item);
PublishedCidNamedContent.Add(item with { returnValue = result });
}

foreach (var item in PublishedStringNamedContent)
{
cancellationToken.ThrowIfCancellationRequested();
foreach (var item in PublishedStringNamedContent)
{
cancellationToken.ThrowIfCancellationRequested();

Console.WriteLine($"Flushing key {item.key} with value {item.path}");
Console.WriteLine($"Flushing key {item.key} with value {item.path}");

// Publish to ipfs
var result = await Inner.PublishAsync(item.path, item.resolve, item.key, item.lifetime, cancellationToken);
// Publish to ipfs
var result = await Inner.PublishAsync(item.path, item.resolve, item.key, item.lifetime, cancellationToken);

// Verify result matches original returned data
_ = Guard.Equals(result.ContentPath, item.returnValue.ContentPath);
_ = Guard.Equals(result.NamePath, item.returnValue.NamePath);
// Verify result matches original returned data
_ = Guard.Equals(result.ContentPath, item.returnValue.ContentPath);
_ = Guard.Equals(result.NamePath, item.returnValue.NamePath);

// Update cache
PublishedStringNamedContent.Remove(item);
PublishedStringNamedContent.Add(item with { returnValue = result });
// Update cache
PublishedStringNamedContent.Remove(item);
PublishedStringNamedContent.Add(item with { returnValue = result });
}
}
}

/// <inheritdoc />
public async Task<NamedContent> PublishAsync(string path, bool resolve = true, string key = "self", TimeSpan? lifetime = null, CancellationToken cancel = default)
{
if (PublishedStringNamedContent.FirstOrDefault(x => x.key == key) is { } existing)
PublishedStringNamedContent.Remove(existing);
using (await _cacheUpdateMutex.DisposableWaitAsync(cancel))
{
if (PublishedStringNamedContent.FirstOrDefault(x => x.key == key) is { } existing)
PublishedStringNamedContent.Remove(existing);

var keys = await KeyApi.ListAsync(cancel);
var existingKey = keys.FirstOrDefault(x => x.Name == key);
var keyId = existingKey?.Id;
var keys = await KeyApi.ListAsync(cancel);
var existingKey = keys.FirstOrDefault(x => x.Name == key);
var keyId = existingKey?.Id;

NamedContent published = new() { ContentPath = path, NamePath = $"/ipns/{keyId}" };
NamedContent published = new() { ContentPath = path, NamePath = $"/ipns/{keyId}" };

PublishedStringNamedContent.Add(new(path, resolve, key, lifetime, published));
return published;
PublishedStringNamedContent.Add(new(path, resolve, key, lifetime, published));
return published;
}
}

/// <inheritdoc />
public async Task<NamedContent> PublishAsync(Cid id, string key = "self", TimeSpan? lifetime = null, CancellationToken cancel = default)
{
if (PublishedCidNamedContent.FirstOrDefault(x => x.key == key) is { } existing)
PublishedCidNamedContent.Remove(existing);
using (await _cacheUpdateMutex.DisposableWaitAsync(cancel))
{
if (PublishedCidNamedContent.FirstOrDefault(x => x.key == key) is { } existing)
PublishedCidNamedContent.Remove(existing);

var keys = await KeyApi.ListAsync(cancel);
var existingKey = keys.FirstOrDefault(x => x.Name == key);
var keyId = existingKey?.Id;
var keys = await KeyApi.ListAsync(cancel);
var existingKey = keys.FirstOrDefault(x => x.Name == key);
var keyId = existingKey?.Id;

NamedContent published = new() { ContentPath = $"/ipfs/{id}", NamePath = $"/ipns/{keyId}" };
PublishedCidNamedContent.Add(new(id, key, lifetime, published));
NamedContent published = new() { ContentPath = $"/ipfs/{id}", NamePath = $"/ipns/{keyId}" };
PublishedCidNamedContent.Add(new(id, key, lifetime, published));

return published;
return published;
}
}

/// <inheritdoc />
Expand All @@ -157,49 +169,52 @@ public async Task<NamedContent> PublishAsync(Cid id, string key = "self", TimeSp
/// </remarks>
public async Task<string> ResolveAsync(string name, bool recursive = false, bool nocache = false, CancellationToken cancel = default)
{
if (nocache)
using (await _cacheUpdateMutex.DisposableWaitAsync(cancel))
{
try
if (nocache)
{
// Don't resolve with cache, but still save resolved data to cache.
var resToCache = await Inner.ResolveAsync(name, recursive, nocache, cancel);

var existing = ResolvedNames.FirstOrDefault(x => x.name == name);
if (existing is not null)
ResolvedNames.Remove(existing);

ResolvedNames.Add(new(name, recursive, resToCache));

return resToCache;
try
{
// Don't resolve with cache, but still save resolved data to cache.
var resToCache = await Inner.ResolveAsync(name, recursive, nocache, cancel);

var existing = ResolvedNames.FirstOrDefault(x => x.name == name);
if (existing is not null)
ResolvedNames.Remove(existing);

ResolvedNames.Add(new(name, recursive, resToCache));

return resToCache;
}
catch
{
// request failed, continue with cache anyway
}
}
catch

// Check if name is in published cache
if (PublishedCidNamedContent.FirstOrDefault(x => x.returnValue.NamePath is not null && (name.Contains(x.returnValue.NamePath) || x.returnValue.NamePath.Contains(name))) is { } publishedCidNamedContent)
{
// request failed, continue with cache anyway
if (publishedCidNamedContent.returnValue.ContentPath is not null)
return publishedCidNamedContent.returnValue.ContentPath;
}
}

// Check if name is in published cache
if (PublishedCidNamedContent.FirstOrDefault(x => x.returnValue.NamePath is not null && (name.Contains(x.returnValue.NamePath) || x.returnValue.NamePath.Contains(name))) is { } publishedCidNamedContent)
{
if (publishedCidNamedContent.returnValue.ContentPath is not null)
return publishedCidNamedContent.returnValue.ContentPath;
}

// Check in other published cache
if (PublishedStringNamedContent.FirstOrDefault(x => x.returnValue.NamePath is not null && (name.Contains(x.returnValue.NamePath) || x.returnValue.NamePath.Contains(name))) is { } publishedStringNamedContent)
{
if (publishedStringNamedContent.returnValue.ContentPath is not null)
return publishedStringNamedContent.returnValue.ContentPath;
}
// Check in other published cache
if (PublishedStringNamedContent.FirstOrDefault(x => x.returnValue.NamePath is not null && (name.Contains(x.returnValue.NamePath) || x.returnValue.NamePath.Contains(name))) is { } publishedStringNamedContent)
{
if (publishedStringNamedContent.returnValue.ContentPath is not null)
return publishedStringNamedContent.returnValue.ContentPath;
}

// Check if previously resolved.
if (ResolvedNames.FirstOrDefault(x => x.name == name) is { } resolvedName)
return resolvedName.returnValue;
// Check if previously resolved.
if (ResolvedNames.FirstOrDefault(x => x.name == name) is { } resolvedName)
return resolvedName.returnValue;

// If not, resolve the name and cache.
var result = await Inner.ResolveAsync(name, recursive, nocache, cancel);
ResolvedNames.Add(new(name, recursive, result));
// If not, resolve the name and cache.
var result = await Inner.ResolveAsync(name, recursive, nocache, cancel);
ResolvedNames.Add(new(name, recursive, result));

return result;
return result;
}
}
}
6 changes: 5 additions & 1 deletion src/OwlCore.Kubo.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>

<Author>Arlo Godfrey</Author>
<Version>0.16.0</Version>
<Version>0.16.1</Version>
<Product>OwlCore</Product>
<Description>
An essential toolkit for Kubo, IPFS and the distributed web.
</Description>
<PackageLicenseFile>LICENSE.txt</PackageLicenseFile>
<PackageReleaseNotes>
--- 0.16.1 ---
[Fixes]
Added an internal semaphore to CachedNameApi to synchronize access across threads.

--- 0.16.0 ---
[Breaking]
Inherited breaking changes from IpfsShipyard.Net.Http.Client 0.2.0.
Expand Down

0 comments on commit fb29abd

Please sign in to comment.