-
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathCommon.cs
97 lines (78 loc) · 2.88 KB
/
Common.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;
namespace RecordParser.Benchmark
{
public static class Common
{
/// <summary>
/// Buffer size in bytes (4 KiB) that <c>ProcessFile</c> passes to the stream pipe reader options.
/// </summary>
public const int BufferSize = 1 << 12; // 4,096
/// <summary>
/// Streams the file at <paramref name="filePath"/> through a <see cref="PipeReader"/>, splits it on
/// <c>'\n'</c> and runs <paramref name="parser"/> on every line until <paramref name="limitRecord"/>
/// lines have been parsed or the end of the file is reached.
/// </summary>
/// <param name="filePath">Path of the file to read.</param>
/// <param name="parser">Delegate invoked with the decoded characters of each line.</param>
/// <param name="limitRecord">
/// Number of lines to parse before stopping. The counter starts at zero and only counts up,
/// so a negative value effectively disables the limit.
/// </param>
/// <returns>A task that completes once parsing has stopped and the reader has been completed.</returns>
/// <exception cref="ArgumentException">
/// Propagated from <c>ProcessSequence</c> when a line spanning several buffer segments exceeds its length limit.
/// </exception>
public static async Task ProcessFile(string filePath, FuncSpanT<Person> parser, int limitRecord)
{
    using var stream = File.OpenRead(filePath);
    PipeReader reader = PipeReader.Create(stream, new(null, BufferSize, 1024, false));

    try
    {
        var i = 0;

        while (true)
        {
            ReadResult read = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = read.Buffer;

            while (TryReadLine(ref buffer, out ReadOnlySequence<byte> sequence))
            {
                if (i++ == limitRecord) return;

                // The parsed record is intentionally discarded; only the parsing work matters here.
                _ = ProcessSequence(sequence, parser);
            }

            // Once the reader is completed no more bytes will arrive, so whatever is left in the
            // buffer is the final line of a file that does not end with '\n'. Without this step
            // that last record would be silently dropped.
            if (read.IsCompleted && !buffer.IsEmpty)
            {
                if (i++ == limitRecord) return;

                _ = ProcessSequence(buffer, parser);
                buffer = buffer.Slice(buffer.End);
            }

            // Everything before buffer.Start is consumed; the rest (an incomplete line) has been
            // examined, so the next read only returns once more data is available.
            reader.AdvanceTo(buffer.Start, buffer.End);

            if (read.IsCompleted)
            {
                break;
            }
        }
    }
    finally
    {
        // Complete the reader on every exit path (limit reached, end of file, exception) so the
        // buffers it holds are released.
        await reader.CompleteAsync();
    }
}
/// <summary>
/// Tries to cut the next <c>'\n'</c>-terminated line off the front of <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">
/// Unread bytes. On success it is advanced to the byte right after the line feed; on failure it is left unchanged.
/// </param>
/// <param name="line">
/// The bytes before the line feed (the line feed itself is excluded), or <c>default</c> when no line feed was found.
/// </param>
/// <returns><c>true</c> when a line feed was found; otherwise <c>false</c>.</returns>
private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    if (buffer.PositionOf((byte)'\n') is SequencePosition lineFeed)
    {
        line = buffer.Slice(0, lineFeed);

        // Resume one byte past the line feed so it is not part of the next line.
        SequencePosition afterLineFeed = buffer.GetPosition(1, lineFeed);
        buffer = buffer.Slice(afterLineFeed);
        return true;
    }

    line = default;
    return false;
}
/// <summary>
/// Parses one line held in <paramref name="sequence"/>. A single-segment line is parsed directly
/// from that segment's span; a line spread over several segments is first copied into a
/// contiguous stack buffer.
/// </summary>
/// <typeparam name="T">Type produced by <paramref name="parser"/>.</typeparam>
/// <param name="sequence">Bytes of the line.</param>
/// <param name="parser">Delegate that turns the decoded characters into a <typeparamref name="T"/>.</param>
/// <returns>The value returned by <paramref name="parser"/>.</returns>
/// <exception cref="ArgumentException">
/// The line spans several segments and is longer than 256 bytes.
/// </exception>
private static T ProcessSequence<T>(ReadOnlySequence<byte> sequence, FuncSpanT<T> parser)
{
    // Upper bound for the stack copy made for multi-segment lines.
    const int MaxStackCopy = 256;

    if (!sequence.IsSingleSegment)
    {
        int byteCount = (int)sequence.Length;

        if (byteCount > MaxStackCopy)
        {
            throw new ArgumentException($"Line has a length exceeding the limit: {byteCount}");
        }

        // Flatten the segments so the parser sees one contiguous span.
        Span<byte> contiguous = stackalloc byte[byteCount];
        sequence.CopyTo(contiguous);

        return Parse(contiguous, parser);
    }

    return Parse(sequence.FirstSpan, parser);
}
/// <summary>
/// Decodes <paramref name="bytes"/> as UTF-8 into a stack buffer and passes the decoded
/// characters to <paramref name="parser"/>.
/// </summary>
/// <typeparam name="T">Type produced by <paramref name="parser"/>.</typeparam>
/// <param name="bytes">UTF-8 encoded bytes of one line.</param>
/// <param name="parser">Delegate that turns the decoded characters into a <typeparamref name="T"/>.</param>
/// <returns>The value returned by <paramref name="parser"/>.</returns>
private static T Parse<T>(ReadOnlySpan<byte> bytes, FuncSpanT<T> parser)
{
    // One char per byte is always enough room: a UTF-8 sequence never decodes to more
    // UTF-16 code units than it has bytes.
    Span<char> chars = stackalloc char[bytes.Length];

    // Multi-byte sequences decode to fewer chars than bytes, so only the first `written`
    // chars hold text. Handing over the whole buffer would append its unwritten tail to the line.
    int written = Encoding.UTF8.GetChars(bytes, chars);

    return parser(chars.Slice(0, written));
}
/// <summary>
/// Returns the next comma-delimited chunk of <paramref name="span"/>, using
/// <paramref name="scanned"/> and <paramref name="position"/> as a cursor carried between calls.
/// </summary>
/// <param name="span">The full line being split. It is read but never reassigned.</param>
/// <param name="scanned">
/// In: start index of the previously returned chunk. Out: start index of the chunk returned by this call.
/// For the first call the caller must supply values such that <c>scanned + position + 1</c> is zero.
/// </param>
/// <param name="position">
/// In: length of the previously returned chunk. Out: length of the chunk returned by this call.
/// </param>
/// <returns>
/// The characters from the new start index up to, but excluding, the next comma,
/// or up to the end of <paramref name="span"/> when no comma remains.
/// </returns>
public static ReadOnlySpan<char> ParseChunk(ref ReadOnlySpan<char> span, ref int scanned, ref int position)
{
    // Step over the previous chunk and the comma that terminated it.
    scanned = scanned + position + 1;

    ReadOnlySpan<char> remainder = span.Slice(scanned);
    int comma = remainder.IndexOf(',');

    // Without another comma the chunk runs to the end of the line.
    position = comma >= 0 ? comma : remainder.Length;

    return remainder.Slice(0, position);
}
}
}