Skip to content

Commit

Permalink
Class Serialisation Performance Maintenance (#284)
Browse files Browse the repository at this point in the history
  • Loading branch information
aloneguid authored Mar 28, 2023
1 parent 1a5dde2 commit fe69a55
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/full.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: 'Full Workflow'

env:
VERSION: 4.6.1
VERSION: 4.6.2
ASM_VERSION: 4.0.0

on:
Expand Down
63 changes: 63 additions & 0 deletions src/Parquet.PerfRunner/Benchmarks/Classes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using Microsoft.Win32.SafeHandles;
using Parquet.Serialization;

namespace Parquet.PerfRunner.Benchmarks {

class Record {
public DateTime Timestamp { get; set; }
public string? EventName { get; set; }
public double MeterValue { get; set; }
}


[ShortRunJob]
[MeanColumn]
[MemoryDiagnoser]
[MarkdownExporter]
public class Classes {
private List<Record>? _testData;
private MemoryStream _ms = new MemoryStream();

[GlobalSetup]
public async Task SetUp() {
_testData = Enumerable.Range(0, 1_000).Select(i => new Record {
Timestamp = DateTime.UtcNow.AddSeconds(i),
EventName = i % 2 == 0 ? "on" : "off",
MeterValue = i
}).ToList();

await ParquetSerializer.SerializeAsync(_testData, _ms);
}


[Benchmark(Baseline = true)]
public async Task Serialise_Legacy() {
using var ms = new MemoryStream();
await ParquetConvert.SerializeAsync(_testData, ms);
}

[Benchmark]
public async Task Deserialise_Legacy() {
_ms.Position = 0;
await ParquetConvert.DeserializeAsync<Record>(_ms);
}

[Benchmark]
public async Task Serialise() {
using var ms = new MemoryStream();
await ParquetSerializer.SerializeAsync(_testData, ms);
}

[Benchmark]
public async Task Deserialise() {
_ms.Position = 0;
await ParquetSerializer.DeserializeAsync<Record>(_ms);
}
}
}
8 changes: 7 additions & 1 deletion src/Parquet.PerfRunner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@
case "progression":
VersionedBenchmark.Run();
break;
case "classes":
BenchmarkRunner.Run<Classes>();
break;
}
} else {
//new VsParquetSharp().Main();
await new DataTypes().NullableInts();
//await new DataTypes().NullableInts();
var c = new Classes();
c.SetUp();
c.Serialise();
}
25 changes: 25 additions & 0 deletions src/Parquet.Test/Serialisation/ParquetSerializerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,31 @@ public async Task Struct_WithNullProps_Serde() {
Assert.Equivalent(data2, data);
}

//[Fact]
public async Task Struct_With_NestedNulls_Serde() {

var data = new List<AddressBookEntry> {
new AddressBookEntry {
FirstName = "Joe",
LastName = "Bloggs",
Address = new Address() {
City = null,
Country = null
}
}
};

// serialiser puts (null, 0) for Address.City, but should put (null, 1)

using var ms = new MemoryStream();
await ParquetSerializer.SerializeAsync(data, ms);

ms.Position = 0;
IList<AddressBookEntry> data2 = await ParquetSerializer.DeserializeAsync<AddressBookEntry>(ms);

XAssert.JsonEquivalent(data, data2);
}

[Fact]
public async Task List_Structs_Serde() {
var data = Enumerable.Range(0, 1_000).Select(i => new MovementHistory {
Expand Down
4 changes: 2 additions & 2 deletions src/Parquet/Globals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static class Globals {
/// </summary>
public static readonly string GithubSha = "${GITHUB_SHA}";

internal const string DataTypeEnumObsolete = "Please resort to using System.Type overloads. Will be removed in v6.";
internal const string ParquetConvertObsolete = "ParquetConvert was an experimental project and is not obsolete. Consider switching to ParquetSerializer which supports all data types, including nested ones, and is just superior. ParquetConvert will be removed in v6.";
internal const string DataTypeEnumObsolete = "Please resort to using System.Type overloads. Will be removed in v5.";
internal const string ParquetConvertObsolete = "ParquetConvert was an experimental project and is now obsolete. Consider switching to ParquetSerializer which supports all data types, including nested ones, and is just superior. ParquetConvert will be removed in v5.";
}
}
4 changes: 3 additions & 1 deletion src/Parquet/Serialization/Dremel/FieldAssemblerCompiler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,15 @@ private static void Discover(Field field, out bool isRepeated) {
(field.SchemaType == SchemaType.Data && field is DataField rdf && rdf.IsArray);
}

#if DEBUG
private static void InjectLevelDebug(string levelPropertyName,
object value, int dataIdx,
int dl, int rl,
int dlDepth, int rlDepth,
int[] rsm) {
Console.WriteLine("debug");
}
#endif

/// <summary>
/// Transitions RSM for current RL iteration
Expand Down Expand Up @@ -259,7 +261,7 @@ private Expression InjectLevel(Expression rootVar, Type rootType, Field[] levelF
} else {
if(isAtomic) {

// C#: dlDepth <= _dlVar?
// C#: dlDepth == _dlVar?
iteration =
Expression.IfThen(
Expression.Equal(Expression.Constant(dlDepth), _dlVar),
Expand Down
53 changes: 32 additions & 21 deletions src/Parquet/Serialization/Dremel/FieldStriperCompiler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class FieldStriperCompiler<TClass> {

private readonly ParquetSchema _schema;
private readonly DataField _df;
private readonly bool _hasRls;
private readonly bool _hasDls;

// input parameters
private readonly ParameterExpression _dfParam = Expression.Parameter(typeof(DataField), "df");
Expand All @@ -34,10 +36,14 @@ class FieldStriperCompiler<TClass> {
// currently iterated class element
private readonly ParameterExpression _classElementVar = Expression.Variable(typeof(TClass), "curr");

private static readonly Expression NullListOfInt = Expression.Convert(Expression.Constant(null), typeof(List<int>));

public FieldStriperCompiler(ParquetSchema schema, DataField df) {

_schema = schema;
_df = df;
_hasRls = _df.MaxRepetitionLevel > 0;
_hasDls = _df.MaxDefinitionLevel > 0;

//
_valuesListType = typeof(List<>).MakeGenericType(df.ClrType);
Expand Down Expand Up @@ -82,38 +88,40 @@ private Expression WriteValue(ParameterExpression valueVar,

// only need RL and DL-1
Expression.Block(
Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl - 1)),
Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar)),
_hasDls ? Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl - 1)) : Expression.Empty(),
_hasRls ? Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar) : Expression.Empty()),

// everything, but value must be non-null
Expression.Block(
Expression.Call(_valuesVar, _valuesListAddMethod, getNonNullValue),
Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)),
Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar)));
_hasDls ? Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)) : Expression.Empty(),
_hasRls ? Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar) : Expression.Empty()));

} else {
// required atomics are simple - add value, RL and DL as is
return Expression.Block(
Expression.Call(_valuesVar, _valuesListAddMethod, valueVar),
Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)),
Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar));
_hasDls ? Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)) : Expression.Empty(),
_hasRls ? Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar) : Expression.Empty());
}
}

// non-atomics still need RL and DL dumped
return Expression.Block(
Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)),
Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar));
_hasDls ? Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)) : Expression.Empty(),
_hasRls ? Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar) : Expression.Empty());

}

private Expression WriteMissingValue(int dl, Expression currentRlVar) {
return Expression.Block(
Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)),
Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar));
_hasDls ? Expression.Call(_dlsVar, LevelsAddMethod, Expression.Constant(dl)) : Expression.Empty(),
_hasRls ? Expression.Call(_rlsVar, LevelsAddMethod, currentRlVar) : Expression.Empty());
}

private Expression WhileBody(Expression element, bool isAtomic, int dl, ParameterExpression currentRlVar, ParameterExpression seenFieldsVar, Field field, int rlDepth, Type elementType, List<string> path) {
private Expression WhileBody(Expression element, bool isAtomic, int dl, ParameterExpression currentRlVar,
ParameterExpression seenFieldsVar, Field field, int rlDepth, Type elementType, List<string> path) {

string suffix = field.Path.ToString().Replace(".", "_");
ParameterExpression chRepetitionLevelVar = Expression.Variable(typeof(int), $"chRepetitionLevel_{suffix}");
ParameterExpression valueVar = Expression.Variable(elementType, $"value_{suffix}");
Expand All @@ -127,13 +135,15 @@ private Expression WhileBody(Expression element, bool isAtomic, int dl, Paramete
// L9-13
Expression.IfThenElse(
// if seenFields.Contains(field.Path)
Expression.Call(seenFieldsVar, typeof(HashSet<string>).GetMethod("Contains")!, Expression.Constant(field.Path.ToString())),
//Expression.Call(seenFieldsVar, typeof(HashSet<string>).GetMethod("Contains")!, Expression.Constant(field.Path.ToString())),
Expression.IsTrue(seenFieldsVar),

// chRepetitionLevelVar = treeDepth
Expression.Assign(chRepetitionLevelVar, Expression.Constant(rlDepth)),

// seenFields.Add(field.Path)
Expression.Call(seenFieldsVar, typeof(HashSet<string>).GetMethod("Add")!, Expression.Constant(field.Path.ToString()))
//Expression.Call(seenFieldsVar, typeof(HashSet<string>).GetMethod("Add")!, Expression.Constant(field.Path.ToString()))
Expression.Assign(seenFieldsVar, Expression.Constant(true))
),

// L14-
Expand Down Expand Up @@ -195,13 +205,14 @@ private Expression DissectRecord(
Expression levelProperty = Expression.Property(rootVar, levelPropertyName);
Type levelPropertyType = rootType.GetProperty(levelPropertyName)!.PropertyType;
ParameterExpression seenFieldsVar = Expression.Variable(typeof(HashSet<string>), $"seenFieldsVar_{levelPropertyName}");
ParameterExpression seenVar = Expression.Variable(typeof(bool), $"seen_{levelPropertyName}");

Expression extraBody;
if(isRepeated) {
Type elementType = ExtractElementTypeFromEnumerableType(levelPropertyType);
Expression collection = levelProperty;
ParameterExpression element = Expression.Variable(elementType, "element");
Expression elementProcessor = WhileBody(element, isAtomic, dl, currentRlVar, seenFieldsVar, field, rlDepth, elementType, path);
Expression elementProcessor = WhileBody(element, isAtomic, dl, currentRlVar, seenVar, field, rlDepth, elementType, path);
extraBody = elementProcessor.Loop(collection, elementType, element);

// todo: if levelProperty (collection) is null, we need extra iteration with null value (which rep and def level?)
Expand All @@ -212,12 +223,12 @@ private Expression DissectRecord(
extraBody);
} else {
Expression element = levelProperty;
extraBody = WhileBody(element, isAtomic, dl, currentRlVar, seenFieldsVar, field, rlDepth, levelPropertyType, path);
extraBody = WhileBody(element, isAtomic, dl, currentRlVar, seenVar, field, rlDepth, levelPropertyType, path);
}

return Expression.Block(
new[] { seenFieldsVar },
Expression.Assign(seenFieldsVar, Expression.New(typeof(HashSet<string>))),
new[] { seenVar },
Expression.Assign(seenVar, Expression.Constant(false)),
extraBody);
}

Expand All @@ -236,16 +247,16 @@ public FieldStriper<TClass> Compile() {
// init 3 building blocks
Expression.Block(
Expression.Assign(_valuesVar, Expression.New(_valuesListType)),
Expression.Assign(_dlsVar, Expression.New(typeof(List<int>))),
Expression.Assign(_rlsVar, Expression.New(typeof(List<int>)))),
Expression.Assign(_dlsVar, _hasDls ? Expression.New(typeof(List<int>)) : NullListOfInt),
Expression.Assign(_rlsVar, _hasRls ? Expression.New(typeof(List<int>)) : NullListOfInt)),

iterationLoop,

// result: use triple to construct ShreddedColumn and return (last element in the block)
Expression.New(ShreddedColumnConstructor,
Expression.Call(_valuesVar, _valuesListType.GetMethod("ToArray")!),
_df.MaxDefinitionLevel == 0 ? Expression.Convert(Expression.Constant(null), typeof(List<int>)) : _dlsVar,
_df.MaxRepetitionLevel == 0 ? Expression.Convert(Expression.Constant(null), typeof(List<int>)) : _rlsVar)
_dlsVar,
_rlsVar)
);

Func<DataField, IEnumerable<TClass>, ShreddedColumn> lambda = Expression
Expand Down
22 changes: 20 additions & 2 deletions src/Parquet/Serialization/ParquetSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ namespace Parquet.Serialization {
/// </summary>
public static class ParquetSerializer {

private static readonly Dictionary<Type, object> _typeToStriper = new();
private static readonly Dictionary<Type, object> _typeToAssembler = new();

/// <summary>
/// Serialize
/// </summary>
Expand All @@ -31,7 +34,14 @@ public static async Task<ParquetSchema> SerializeAsync<T>(IEnumerable<T> objectI
ParquetSerializerOptions? options = null,
CancellationToken cancellationToken = default) {

Striper<T> striper = new Striper<T>(typeof(T).GetParquetSchema(false));
Striper<T> striper;

if(_typeToStriper.TryGetValue(typeof(T), out object? boxedStriper)) {
striper = (Striper<T>)boxedStriper;
} else {
striper = new Striper<T>(typeof(T).GetParquetSchema(false));
_typeToStriper[typeof(T)] = striper;
}

bool append = options != null && options.Append;
using(ParquetWriter writer = await ParquetWriter.CreateAsync(striper.Schema, destination, null, append, cancellationToken)) {
Expand Down Expand Up @@ -86,7 +96,15 @@ public static async Task<IList<T>> DeserializeAsync<T>(Stream source,
CancellationToken cancellationToken = default)
where T : new() {

Assembler<T> asm = new Assembler<T>(typeof(T).GetParquetSchema(true));
Assembler<T> asm;

if(_typeToAssembler.TryGetValue(typeof(T), out object? boxedAssembler)) {
asm = (Assembler<T>)boxedAssembler;
} else {
asm = new Assembler<T>(typeof(T).GetParquetSchema(true));
_typeToAssembler[typeof(T)] = asm;
}

var result = new List<T>();

using ParquetReader reader = await ParquetReader.CreateAsync(source, new ParquetOptions { UnpackDefinitions = false });
Expand Down

0 comments on commit fe69a55

Please sign in to comment.