Skip to content

Commit

Permalink
Merge pull request #126 from richardschneider/issue-125
Browse files Browse the repository at this point in the history
Unixfs balanced trees
  • Loading branch information
richardschneider authored Jul 13, 2019
2 parents a57f090 + 475596b commit 8d40963
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 43 deletions.
87 changes: 85 additions & 2 deletions IpfsCli/Commands/ObjectCommand.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,95 @@
using McMaster.Extensions.CommandLineUtils;
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Text;
using System.Threading.Tasks;
using Ipfs.Engine.UnixFileSystem;
using System.IO;

namespace Ipfs.Cli
{
[Command(Description = "Manage raw dag nodes [WIP]")]
class ObjectCommand : CommandBase // TODO
[Command(Description = "Manage IPFS objects")]
[Subcommand("links", typeof(ObjectLinksCommand))]
[Subcommand("get", typeof(ObjectGetCommand))]
[Subcommand("dump", typeof(ObjectDumpCommand))]
class ObjectCommand : CommandBase
{
public Program Parent { get; set; }

protected override Task<int> OnExecute(CommandLineApplication app)
{
app.ShowHelp();
return Task.FromResult(0);
}
}

[Command(Description = "Information on the links pointed to by the IPFS block")]
class ObjectLinksCommand : CommandBase
{
[Argument(0, "cid", "The content ID of the object")]
[Required]
public string Cid { get; set; }

ObjectCommand Parent { get; set; }

protected override async Task<int> OnExecute(CommandLineApplication app)
{
var Program = Parent.Parent;
var links = await Program.CoreApi.Object.LinksAsync(Cid);

return Program.Output(app, links, (data, writer) =>
{
foreach (var link in data)
{
writer.WriteLine($"{link.Id.Encode()} {link.Size} {link.Name}");
}
});
}
}

[Command(Description = "Serialise the DAG node")]
class ObjectGetCommand : CommandBase
{
[Argument(0, "cid", "The content ID of the object")]
[Required]
public string Cid { get; set; }

ObjectCommand Parent { get; set; }

protected override async Task<int> OnExecute(CommandLineApplication app)
{
var Program = Parent.Parent;
var node = await Program.CoreApi.Object.GetAsync(Cid);

return Program.Output(app, node, null);
}
}

[Command(Description = "Dump the DAG node")]
class ObjectDumpCommand : CommandBase
{
[Argument(0, "cid", "The content ID of the object")]
[Required]
public string Cid { get; set; }

ObjectCommand Parent { get; set; }

class Node
{
public DagNode Dag;
public DataMessage DataMessage;
}

protected override async Task<int> OnExecute(CommandLineApplication app)
{
var Program = Parent.Parent;
var node = new Node();
var block = await Program.CoreApi.Block.GetAsync(Cid);
node.Dag = new DagNode(block.DataStream);
node.DataMessage = ProtoBuf.Serializer.Deserialize<DataMessage>(node.Dag.DataStream);

return Program.Output(app, node, null);
}
}
}
2 changes: 1 addition & 1 deletion IpfsCli/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static int Main(string[] args)
}

var took = DateTime.Now - startTime;
Console.Write($"Took {took.TotalSeconds} seconds.");
//Console.Write($"Took {took.TotalSeconds} seconds.");

return 0;
}
Expand Down
104 changes: 68 additions & 36 deletions src/CoreApi/FileSystemApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class FileSystemApi : IFileSystemApi
static ILog log = LogManager.GetLogger(typeof(FileSystemApi));
IpfsEngine ipfs;

static readonly int DefaultLinksPerBlock = 174;

public FileSystemApi(IpfsEngine ipfs)
{
this.ipfs = ipfs;
Expand Down Expand Up @@ -63,42 +65,7 @@ public async Task<IFileSystemNode> AddAsync(
var nodes = await chunker.ChunkAsync(stream, name, options, blockService, keyChain, cancel).ConfigureAwait(false);

// Multiple nodes for the file?
FileSystemNode node = null;
if (nodes.Count() == 1)
{
node = nodes.First();
}
else
{
// Build the DAG that contains all the file nodes.
var links = nodes.Select(n => n.ToLink()).ToArray();
var fileSize = (ulong)nodes.Sum(n => n.Size);
var dm = new DataMessage
{
Type = DataType.File,
FileSize = fileSize,
BlockSizes = nodes.Select(n => (ulong) n.Size).ToArray()
};
var pb = new MemoryStream();
ProtoBuf.Serializer.Serialize<DataMessage>(pb, dm);
var dag = new DagNode(pb.ToArray(), links, options.Hash);

// Save it.
dag.Id = await blockService.PutAsync(
data: dag.ToArray(),
multiHash: options.Hash,
encoding: options.Encoding,
pin: options.Pin,
cancel: cancel).ConfigureAwait(false);

node = new FileSystemNode
{
Id = dag.Id,
Size = (long)dm.FileSize,
DagSize = dag.Size,
Links = links
};
}
FileSystemNode node = await BuildTreeAsync(nodes, options, cancel);

// Wrap in directory?
if (options.Wrap)
Expand All @@ -122,6 +89,71 @@ public async Task<IFileSystemNode> AddAsync(
return node;
}

async Task<FileSystemNode> BuildTreeAsync(
IEnumerable<FileSystemNode> nodes,
AddFileOptions options,
CancellationToken cancel)
{
if (nodes.Count() == 1)
{
return nodes.First();
}

// Bundle DefaultLinksPerBlock links into a block.
var tree = new List<FileSystemNode>();
for (int i = 0; true; ++i)
{
var bundle = nodes
.Skip(DefaultLinksPerBlock * i)
.Take(DefaultLinksPerBlock);
if (bundle.Count() == 0)
{
break;
}
var node = await BuildTreeNodeAsync(bundle, options, cancel);
tree.Add(node);
}
return await BuildTreeAsync(tree, options, cancel);
}

async Task<FileSystemNode> BuildTreeNodeAsync(
IEnumerable<FileSystemNode> nodes,
AddFileOptions options,
CancellationToken cancel)
{
var blockService = GetBlockService(options);

// Build the DAG that contains all the file nodes.
var links = nodes.Select(n => n.ToLink()).ToArray();
var fileSize = (ulong)nodes.Sum(n => n.Size);
var dagSize = nodes.Sum(n => n.DagSize);
var dm = new DataMessage
{
Type = DataType.File,
FileSize = fileSize,
BlockSizes = nodes.Select(n => (ulong)n.Size).ToArray()
};
var pb = new MemoryStream();
ProtoBuf.Serializer.Serialize<DataMessage>(pb, dm);
var dag = new DagNode(pb.ToArray(), links, options.Hash);

// Save it.
dag.Id = await blockService.PutAsync(
data: dag.ToArray(),
multiHash: options.Hash,
encoding: options.Encoding,
pin: options.Pin,
cancel: cancel).ConfigureAwait(false);

return new FileSystemNode
{
Id = dag.Id,
Size = (long)dm.FileSize,
DagSize = dagSize + dag.Size,
Links = links
};
}

public async Task<IFileSystemNode> AddDirectoryAsync(
string path,
bool recursive = true,
Expand Down
48 changes: 46 additions & 2 deletions src/UnixFileSystem/DataMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,83 @@

namespace Ipfs.Engine.UnixFileSystem
{
enum DataType
/// <summary>
/// Specifies the type of data.
/// </summary>
public enum DataType
{
/// <summary>
/// Raw data
/// </summary>
Raw = 0,

/// <summary>
/// A directory of files.
/// </summary>
Directory = 1,

/// <summary>
/// A file.
/// </summary>
File = 2,

/// <summary>
/// Metadata (NYI)
/// </summary>
Metadata = 3,

/// <summary>
/// Symbolic link (NYI)
/// </summary>
Symlink = 4,

/// <summary>
/// NYI
/// </summary>
HAMTShard = 5
};

/// <summary>
/// The ProtoBuf data that is stored in a DAG.
/// </summary>
[ProtoContract]
internal class DataMessage
public class DataMessage
{
/// <summary>
/// The type of data.
/// </summary>
[ProtoMember(1, IsRequired = true)]
public DataType Type;

/// <summary>
/// The data.
/// </summary>
[ProtoMember(2, IsRequired = false)]
public byte[] Data;

/// <summary>
/// The file size.
/// </summary>
[ProtoMember(3, IsRequired = false)]
public ulong? FileSize;

/// <summary>
/// The file size of each block.
/// </summary>
[ProtoMember(4, IsRequired = false)]
public ulong[] BlockSizes;

#pragma warning disable 0649 // disable warning about unassinged fields
/// <summary>
/// NYI
/// </summary>
[ProtoMember(5, IsRequired = false)]
public ulong? HashType;

#pragma warning disable 0649 // disable warning about unassinged fields
/// <summary>
/// NYI
/// </summary>
[ProtoMember(6, IsRequired = false)]
public ulong? Fanout;
}
Expand Down
2 changes: 1 addition & 1 deletion src/UnixFileSystem/FileSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Ipfs.Engine.UnixFileSystem
/// </summary>
public static class FileSystem
{
static byte[] emptyData = new byte[0];
static readonly byte[] emptyData = new byte[0];

/// <summary>
/// Creates a stream that can read the supplied <see cref="Cid"/>.
Expand Down
2 changes: 1 addition & 1 deletion src/UnixFileSystem/SizeChunker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class SizeChunker
/// A task that represents the asynchronous operation. The task's value is
/// the sequence of file system nodes of the added data blocks.
/// </returns>
public async Task<IEnumerable<FileSystemNode>> ChunkAsync(
public async Task<List<FileSystemNode>> ChunkAsync(
Stream stream,
string name,
AddFileOptions options,
Expand Down
42 changes: 42 additions & 0 deletions test/CoreApi/FileSystemApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,48 @@ public void AddFile_Large()
Console.WriteLine("Readfile file took {0} seconds.", stopWatch.Elapsed.TotalSeconds);
}


/// <seealso href="https://github.com/richardschneider/net-ipfs-engine/issues/125"/>
[TestMethod]
public void AddFile_Larger()
{
AddFile(); // warm up

var path = "starx2.mp4";
var ipfs = TestFixture.Ipfs;
var stopWatch = new Stopwatch();
stopWatch.Start();
var node = ipfs.FileSystem.AddFileAsync(path).Result;
stopWatch.Stop();
Console.WriteLine("Add file took {0} seconds.", stopWatch.Elapsed.TotalSeconds);

Assert.AreEqual("QmeFhfB4g2GFbxYb7usApWzq8uC1vmuxJajFpiJiT5zLoy", (string)node.Id);

var k = 8 * 1024;
var buffer1 = new byte[k];
var buffer2 = new byte[k];
stopWatch.Restart();
using (var localStream = new FileStream(path, FileMode.Open, FileAccess.Read))
using (var ipfsStream = ipfs.FileSystem.ReadFileAsync(node.Id).Result)
{
while (true)
{
var n1 = localStream.Read(buffer1, 0, k);
var n2 = ipfsStream.Read(buffer2, 0, k);
Assert.AreEqual(n1, n2);
if (n1 == 0)
break;
for (var i = 0; i < n1; ++i)
{
if (buffer1[i] != buffer2[i])
Assert.Fail("data not the same");
}
}
}
stopWatch.Stop();
Console.WriteLine("Readfile file took {0} seconds.", stopWatch.Elapsed.TotalSeconds);
}

[TestMethod]
public async Task AddFile_Wrap()
{
Expand Down
3 changes: 3 additions & 0 deletions test/IpfsEngineTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
</ItemGroup>

<ItemGroup>
<None Update="starx2.mp4">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="star_trails.mp4">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
Expand Down
Binary file added test/starx2.mp4
Binary file not shown.

0 comments on commit 8d40963

Please sign in to comment.