From 750bc7d508f46262c9d5575fd290e4ed82ddd215 Mon Sep 17 00:00:00 2001 From: Shayne van Asperen Date: Mon, 13 Dec 2021 16:08:39 +0000 Subject: [PATCH] Extract common base class for ICryptoOrderBook implementations * Fix bug in CryptoOrderBookL3PerformanceTests and tweak test parameters * Don't dispose the given IOrderBookSource --- Directory.Build.props | 1 + .../Models/CryptoQuotes.cs | 10 +- .../OrderBooks/CryptoOrderBook.cs | 880 ++---------------- .../OrderBooks/CryptoOrderBookBase.cs | 795 ++++++++++++++++ .../OrderBooks/CryptoOrderBookL2.cs | 826 ++-------------- .../OrderBooks/Models/IOrderBookChangeInfo.cs | 7 +- .../OrderBooks/Models/OrderBookChangeInfo.cs | 9 +- .../CryptoOrderBookL2Tests.cs | 4 +- .../CryptoOrderBookL3PerformanceTests.cs | 2 +- .../CryptoOrderBookTests.cs | 4 +- .../OrderBookExample.cs | 2 +- 11 files changed, 952 insertions(+), 1588 deletions(-) create mode 100644 src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBookBase.cs diff --git a/Directory.Build.props b/Directory.Build.props index ef44980..3504f95 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -3,6 +3,7 @@ + latest 2.7.1 diff --git a/src/Crypto.Websocket.Extensions.Core/Models/CryptoQuotes.cs b/src/Crypto.Websocket.Extensions.Core/Models/CryptoQuotes.cs index d3c464b..a4ce59f 100644 --- a/src/Crypto.Websocket.Extensions.Core/Models/CryptoQuotes.cs +++ b/src/Crypto.Websocket.Extensions.Core/Models/CryptoQuotes.cs @@ -23,27 +23,27 @@ public CryptoQuotes(double bid, double ask, double bidAmount, double askAmount) /// /// Top level bid price /// - public double Bid { get; } + public double Bid { get; internal set; } /// /// Top level ask price /// - public double Ask { get; } + public double Ask { get; internal set; } /// /// Current mid price /// - public double Mid { get; } + public double Mid { get; internal set; } /// /// Top level bid amount /// - public double BidAmount { get; } + public double BidAmount { get; internal set; } /// /// Top level ask amount /// - public double AskAmount { get; } + public double AskAmount { get; internal set; } /// /// Returns true if quotes are in valid state diff --git a/src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBook.cs b/src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBook.cs index fd2b62b..d3c8934 100644 --- a/src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBook.cs +++ b/src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBook.cs @@ -21,41 +21,14 @@ namespace Crypto.Websocket.Extensions.Core.OrderBooks /// Process order book data from one source and one target pair. /// Only first levels are computed in advance, allocates more memory than CryptoOrderBookL2 counterpart. /// - [DebuggerDisplay("CryptoOrderBook [{TargetPair} {TargetType}] bid: {BidPrice} ({_bidLevels.Count}) ask: {AskPrice} ({_askLevels.Count})")] - public class CryptoOrderBook : ICryptoOrderBook + [DebuggerDisplay("CryptoOrderBook [{TargetPair} {TargetType}] bid: {BidPrice} ({BidLevelsInternal.Count}) ask: {AskPrice} ({AskLevelsInternal.Count})")] + public class CryptoOrderBook : CryptoOrderBookBase { - private readonly object _locker = new object(); - - private readonly IOrderBookSource _source; - - private readonly Subject _bidAskUpdated = new Subject(); - private readonly Subject _topLevelUpdated = new Subject(); - private readonly Subject _orderBookUpdated = new Subject(); - - private readonly SortedList _bidLevels = new SortedList(new DescendingComparer()); - private readonly SortedList _askLevels = new SortedList(); - - private readonly OrderBookLevelsOrderPerPrice _bidLevelOrdering = new OrderBookLevelsOrderPerPrice(200); - private readonly OrderBookLevelsOrderPerPrice _askLevelOrdering = new OrderBookLevelsOrderPerPrice(200); - - private readonly OrderBookLevelsById _allBidLevels = new OrderBookLevelsById(500); - private readonly OrderBookLevelsById _allAskLevels = new OrderBookLevelsById(500); - - private bool _isSnapshotLoaded; - private Timer? _snapshotReloadTimer; - private TimeSpan _snapshotReloadTimeout = TimeSpan.FromMinutes(1); - private bool _snapshotReloadEnabled; - - private Timer? _validityCheckTimer; - private TimeSpan _validityCheckTimeout = TimeSpan.FromSeconds(5); - private bool _validityCheckEnabled = true; - private int _validityCheckCounter; + private readonly OrderBookLevelsOrderPerPrice _bidLevelOrdering = new(200); + private readonly OrderBookLevelsOrderPerPrice _askLevelOrdering = new(200); private readonly int _priceLevelInitialCapacity = 2; - private IDisposable? _subscriptionDiff; - private IDisposable? _subscriptionSnapshot; - /// /// Cryptocurrency order book. /// Process order book data from one source per one target pair. @@ -63,503 +36,110 @@ public class CryptoOrderBook : ICryptoOrderBook /// Select target pair /// Provide order book data source /// Select target precision (default: All - accepts every type of data) - public CryptoOrderBook(string targetPair, IOrderBookSource source, CryptoOrderBookType targetType = CryptoOrderBookType.All) + public CryptoOrderBook(string targetPair, IOrderBookSource source, CryptoOrderBookType targetType = CryptoOrderBookType.All) : base(targetPair, source) { - CryptoValidations.ValidateInput(targetPair, nameof(targetPair)); - CryptoValidations.ValidateInput(source, nameof(source)); - - TargetPairOriginal = targetPair; - TargetPair = CryptoPairsHelper.Clean(targetPair); - _source = source; TargetType = targetType; - if (targetType == CryptoOrderBookType.L1 || targetType == CryptoOrderBookType.L2) + if (targetType is CryptoOrderBookType.L1 or CryptoOrderBookType.L2) { // save memory, only one order at price level _priceLevelInitialCapacity = 1; } - else if (targetType == CryptoOrderBookType.L3) + else if (targetType is CryptoOrderBookType.L3) { // better performance, could be multiple orders at price level _priceLevelInitialCapacity = 5; } - Subscribe(); - RestartAutoSnapshotReloading(); - RestartValidityChecking(); + Initialize(); } - /// - /// Dispose background processing - /// - public void Dispose() - { - DeactivateAutoSnapshotReloading(); - DeactivateValidityChecking(); - _source.Dispose(); - _subscriptionDiff?.Dispose(); - _subscriptionSnapshot?.Dispose(); - } - - /// - /// Origin exchange name - /// - public string ExchangeName => _source.ExchangeName; - - /// - /// Target pair for this order book data - /// - public string TargetPair { get; } - - /// - /// Originally provided target pair for this order book data - /// - public string TargetPairOriginal { get; } - - /// - /// Order book type, which precision it supports - /// - public CryptoOrderBookType TargetType { get; } - - /// - /// Time interval for auto snapshot reloading. - /// Default 1 min. - /// - public TimeSpan SnapshotReloadTimeout - { - get => _snapshotReloadTimeout; - set - { - _snapshotReloadTimeout = value; - RestartAutoSnapshotReloading(); - } - } - - /// - /// Whenever auto snapshot reloading feature is enabled. - /// Disabled by default - /// - public bool SnapshotReloadEnabled - { - get => _snapshotReloadEnabled; - set - { - _snapshotReloadEnabled = value; - RestartAutoSnapshotReloading(); - } - } - - /// - /// Time interval for validity checking. - /// It forces snapshot reloading whenever invalid state. - /// Default 5 sec. - /// - public TimeSpan ValidityCheckTimeout - { - get => _validityCheckTimeout; - set - { - _validityCheckTimeout = value; - RestartValidityChecking(); - } - } - - /// - /// How many times it should check validity before processing snapshot reload. - /// Default 6 times (which is 6 * 5sec = 30sec). - /// - public int ValidityCheckLimit { get; set; } = 6; - - /// - /// Whenever validity checking feature is enabled. - /// It forces snapshot reloading whenever invalid state. - /// Enabled by default - /// - public bool ValidityCheckEnabled - { - get => _validityCheckEnabled; - set - { - _validityCheckEnabled = value; - RestartValidityChecking(); - } - } - - /// - /// Provide more info (on every change) whenever enabled. - /// Disabled by default - /// - public bool DebugEnabled { get; set; } = false; - - /// - /// Logs more info (state, performance) whenever enabled. - /// Disabled by default - /// - public bool DebugLogEnabled { get; set; } = false; - - /// - /// Whenever snapshot was already handled - /// - public bool IsSnapshotLoaded => _isSnapshotLoaded; - - /// - /// All diffs/deltas that come before snapshot will be ignored (default: true) - /// - public bool IgnoreDiffsBeforeSnapshot { get; set; } = true; - - /// - /// Compute index (position) per every updated level, performance is slightly reduced (default: false) - /// - public bool IsIndexComputationEnabled { get; set; } - - /// - /// Streams data when top level bid or ask price was updated - /// - public IObservable BidAskUpdatedStream => _bidAskUpdated.AsObservable(); - - /// - /// Streams data when top level bid or ask price or amount was updated - /// - public IObservable TopLevelUpdatedStream => _topLevelUpdated.AsObservable(); - - /// - /// Streams data on every order book change (price or amount at any level) - /// - public IObservable OrderBookUpdatedStream => _orderBookUpdated.AsObservable(); - - /// - /// Current bid side of the order book (ordered from higher to lower price) - /// - public OrderBookLevel[] BidLevels => ComputeBidLevels(); - /// /// Current bid side of the order book grouped by price (ordered from higher to lower price) /// - public IReadOnlyDictionary BidLevelsPerPrice => ComputeLevelsPerPrice(_bidLevels); - - /// - /// Current ask side of the order book (ordered from lower to higher price) - /// - public OrderBookLevel[] AskLevels => ComputeAskLevels(); + public IReadOnlyDictionary BidLevelsPerPrice => ComputeLevelsPerPrice(BidLevelsInternal); /// /// Current ask side of the order book grouped by price (ordered from lower to higher price) /// - public IReadOnlyDictionary AskLevelsPerPrice => ComputeLevelsPerPrice(_askLevels); + public IReadOnlyDictionary AskLevelsPerPrice => ComputeLevelsPerPrice(AskLevelsInternal); - /// - /// All current levels together - /// - public OrderBookLevel[] Levels => ComputeAllLevels(); - - /// - /// Current top level bid price - /// - public double BidPrice { get; private set; } - - /// - /// Current top level ask price - /// - public double AskPrice { get; private set; } - - /// - /// Current mid price - /// - public double MidPrice => (AskPrice + BidPrice) / 2; - - /// - /// Current top level bid amount - /// - public double BidAmount { get; private set; } - - /// - /// Current top level ask price - /// - public double AskAmount { get; private set; } - - /// - /// Returns true if order book is in valid state - /// - public bool IsValid() - { - var isPriceValid = BidPrice <= AskPrice; - return isPriceValid && _source.IsValid(); - } - - - /// - /// Find bid level for provided price (returns null in case of missing) - /// - public OrderBookLevel? FindBidLevelByPrice(double price) + /// + public override OrderBookLevel FindBidLevelByPrice(double price) { - lock (_locker) + lock (Locker) { - if (_bidLevels.TryGetValue(price, out OrderedDictionary? group)) - return group.Values.OfType().FirstOrDefault(); - return null; + return BidLevelsInternal.TryGetValue(price, out var group) + ? group.Values.OfType().FirstOrDefault() + : null; } } - /// - /// Find all bid levels for provided price (returns empty when not found) - /// - public OrderBookLevel[] FindBidLevelsByPrice(double price) - { - lock (_locker) - { - if (_bidLevels.TryGetValue(price, out OrderedDictionary? group)) - return group.Values.OfType().ToArray(); - return Array.Empty(); - } - } - - - /// - /// Find ask level for provided price (returns null in case of missing) - /// - public OrderBookLevel? FindAskLevelByPrice(double price) + /// + public override OrderBookLevel[] FindBidLevelsByPrice(double price) { - lock (_locker) + lock (Locker) { - if (_askLevels.TryGetValue(price, out OrderedDictionary? group)) - return group.Values.OfType().FirstOrDefault(); - return null; + return BidLevelsInternal.TryGetValue(price, out var group) + ? group.Values.OfType().ToArray() + : Array.Empty(); } } - /// - /// Find all ask levels for provided price (returns empty when not found) - /// - public OrderBookLevel[] FindAskLevelsByPrice(double price) + /// + public override OrderBookLevel FindAskLevelByPrice(double price) { - lock (_locker) + lock (Locker) { - if (_askLevels.TryGetValue(price, out OrderedDictionary? group)) - return group.Values.OfType().ToArray(); - return Array.Empty(); + return AskLevelsInternal.TryGetValue(price, out var group) + ? group.Values.OfType().FirstOrDefault() + : null; } } - /// - /// Find bid level by provided identification (returns null in case of not found) - /// - public OrderBookLevel? FindBidLevelById(string id) - { - return FindLevelById(id, CryptoOrderSide.Bid); - } - - /// - /// Find ask level by provided identification (returns null in case of not found) - /// - public OrderBookLevel? FindAskLevelById(string id) - { - return FindLevelById(id, CryptoOrderSide.Ask); - } - - /// - /// Find level by provided identification (returns null in case of not found). - /// You need to specify side. - /// - public OrderBookLevel? FindLevelById(string id, CryptoOrderSide side) - { - if (side == CryptoOrderSide.Undefined) - return null; - var collection = GetAllCollection(side); - return collection.GetValueOrDefault(id); - } - - private void Subscribe() + /// + public override OrderBookLevel[] FindAskLevelsByPrice(double price) { - _subscriptionSnapshot = _source - .OrderBookSnapshotStream - .Subscribe(HandleSnapshotSynchronized); - - _subscriptionDiff = _source - .OrderBookStream - .Subscribe(HandleDiffSynchronized); - } - - private void HandleSnapshotSynchronized(OrderBookLevelBulk bulk) - { - if (bulk == null || !IsCorrectType(bulk.OrderBookType)) - return; - - var levels = bulk.Levels; - var levelsForThis = levels - .Where(x => TargetPair.Equals(x.Pair)) - .ToList(); - if (!levelsForThis.Any()) - { - // snapshot for different pair, ignore - return; - } - - double oldBid; - double oldAsk; - double oldBidAmount; - double oldAskAmount; - OrderBookChangeInfo change; - - lock (_locker) + lock (Locker) { - oldBid = BidPrice; - oldAsk = AskPrice; - oldBidAmount = BidAmount; - oldAskAmount = AskAmount; - HandleSnapshot(levelsForThis); - - change = CreateBookChangeNotification( - levelsForThis, - new[] { bulk }, - true - ); + return AskLevelsInternal.TryGetValue(price, out var group) + ? group.Values.OfType().ToArray() + : Array.Empty(); } - - _orderBookUpdated.OnNext(change); - NotifyIfTopLevelChanged(oldBid, oldAsk, oldBidAmount, oldAskAmount, change); - NotifyIfBidAskChanged(oldBid, oldAsk, change); } - private void HandleDiffSynchronized(OrderBookLevelBulk[] bulks) + /// + protected override bool IsForThis(OrderBookLevelBulk bulk) { - var sw = DebugEnabled ? Stopwatch.StartNew() : null; - - var forThis = bulks - .Where(x => x != null) - .Where(x => IsCorrectType(x.OrderBookType)) - //.Where(x => x.Levels.Any(y => TargetPair.Equals(y.Pair))) - .ToArray(); - if (!forThis.Any()) - { - // data for different pair, ignore - return; - } - - double oldBid; - double oldAsk; - double oldBidAmount; - double oldAskAmount; - var allLevels = new List(); - OrderBookChangeInfo change; - - lock (_locker) + if (TargetType == CryptoOrderBookType.All) { - oldBid = BidPrice; - oldAsk = AskPrice; - oldBidAmount = BidAmount; - oldAskAmount = AskAmount; - - foreach (var bulk in forThis) - { - var levelsForThis = bulk.Levels - .Where(x => TargetPair.Equals(x.Pair)) - .ToArray(); - allLevels.AddRange(levelsForThis); - HandleDiff(bulk, levelsForThis); - } - - RecomputeAfterChangeAndSetIndexes(allLevels); - - change = CreateBookChangeNotification( - allLevels, - forThis, - false - ); + // handling everything, do not filter out + return true; } - if (allLevels.Any()) - _orderBookUpdated.OnNext(change); - NotifyIfBidAskChanged(oldBid, oldAsk, change); - NotifyIfTopLevelChanged(oldBid, oldAsk, oldBidAmount, oldAskAmount, change); - - if (sw != null) - { - var levels = forThis.SelectMany(x => x.Levels).Count(); - LogDebug($"Diff ({forThis.Length} bulks, {levels} levels) processing took {sw.ElapsedMilliseconds} ms, {sw.ElapsedTicks} ticks"); - } + // handle only same type as selected + return TargetType == bulk.OrderBookType; } - private void HandleSnapshot(List levels) + /// + protected override void ClearLevels() { - _bidLevels.Clear(); + BidLevelsInternal.Clear(); _bidLevelOrdering.Clear(); - _askLevels.Clear(); + AskLevelsInternal.Clear(); _askLevelOrdering.Clear(); - _allBidLevels.Clear(); - _allAskLevels.Clear(); - - LogDebug($"Handling snapshot: {levels.Count} levels"); - foreach (var level in levels) - { - var price = level.Price; - if (price == null || price < 0) - { - LogAlways($"Received snapshot level with weird price, ignoring. " + - $"[{level.Side}] Id: {level.Id}, price: {level.Price}, amount: {level.Amount}"); - continue; - } - - level.AmountDifference = level.Amount ?? 0; - level.CountDifference = level.Count ?? 0; - - if (level.Side == CryptoOrderSide.Bid) - { - InsertLevelIntoPriceGroup(level, _bidLevels, _bidLevelOrdering); - _allBidLevels[level.Id] = level; - } - - - if (level.Side == CryptoOrderSide.Ask) - { - InsertLevelIntoPriceGroup(level, _askLevels, _askLevelOrdering); - _allAskLevels[level.Id] = level; - } - } - - RecomputeAfterChangeAndSetIndexes(levels); - - _isSnapshotLoaded = true; + AllBidLevels.Clear(); + AllAskLevels.Clear(); } - private void HandleDiff(OrderBookLevelBulk bulk, OrderBookLevel[] correctLevels) - { - if (IgnoreDiffsBeforeSnapshot && !_isSnapshotLoaded) - { - LogDebug($"Snapshot not loaded yet, ignoring bulk: {bulk.Action} {correctLevels.Length} levels"); - // snapshot is not loaded yet, ignore data - return; - } + /// + protected override void HandleSnapshotBidLevel(OrderBookLevel level) => InsertLevelIntoPriceGroup(level, BidLevelsInternal, _bidLevelOrdering); - //LogDebug($"Handling diff, bulk {currentBulk}/{totalBulks}: {bulk.Action} {correctLevels.Length} levels"); - switch (bulk.Action) - { - case OrderBookAction.Insert: - UpdateLevels(correctLevels); - break; - case OrderBookAction.Update: - UpdateLevels(correctLevels); - break; - case OrderBookAction.Delete: - DeleteLevels(correctLevels); - break; - default: - return; - } - } + /// + protected override void HandleSnapshotAskLevel(OrderBookLevel level) => InsertLevelIntoPriceGroup(level, AskLevelsInternal, _askLevelOrdering); - private bool IsCorrectType(CryptoOrderBookType bulkType) - { - if (TargetType == CryptoOrderBookType.All) - { - // handling everything, do not filter out - return true; - } - - // handle only same type as selected - return TargetType == bulkType; - } - - private void UpdateLevels(OrderBookLevel[] levels) + /// + protected override void UpdateLevels(IEnumerable levels) { foreach (var level in levels) { @@ -582,29 +162,7 @@ private void UpdateLevels(OrderBookLevel[] levels) var previousPrice = existing.Price; var previousAmount = existing.Amount; - var amountDiff = (level.Amount ?? existing.Amount ?? 0) - (existing.Amount ?? 0); - var countDiff = (level.Count ?? existing.Count ?? 0) - (existing.Count ?? 0); - - existing.Price = level.Price ?? existing.Price; - existing.Amount = level.Amount ?? existing.Amount; - existing.Count = level.Count ?? existing.Count; - existing.Pair = level.Pair ?? existing.Pair; - - level.AmountDifference = amountDiff; - existing.AmountDifference = amountDiff; - - level.CountDifference = countDiff; - existing.CountDifference = countDiff; - - level.AmountDifferenceAggregated += amountDiff; - existing.AmountDifferenceAggregated += amountDiff; - - level.CountDifferenceAggregated += countDiff; - existing.CountDifferenceAggregated += countDiff; - - level.Amount = level.Amount ?? existing.Amount; - level.Count = level.Count ?? existing.Count; - level.Price = level.Price ?? existing.Price; + ComputeUpdate(existing, level); InsertToCollection(collection, ordering, existing, previousPrice, previousAmount); } @@ -677,14 +235,8 @@ private void InsertLevelIntoPriceGroup(OrderBookLevel level, IDictionary + protected override void DeleteLevels(IReadOnlyCollection levels) { FillCurrentIndex(levels); @@ -703,24 +255,7 @@ private void DeleteLevels(OrderBookLevel[] levels) { var existing = allLevels[level.Id]; - var amountDiff = -(existing.Amount ?? 0); - var countDiff = -(existing.Count ?? 0); - - level.Amount = level.Amount ?? existing.Amount; - level.Count = level.Count ?? existing.Count; - level.Price = level.Price ?? existing.Price; - - level.AmountDifference = amountDiff; - existing.AmountDifference = amountDiff; - - level.CountDifference = countDiff; - existing.CountDifference = countDiff; - - level.AmountDifferenceAggregated += amountDiff; - existing.AmountDifferenceAggregated += amountDiff; - - level.CountDifferenceAggregated += countDiff; - existing.CountDifferenceAggregated += countDiff; + ComputeDelete(existing, level); price = existing.Price ?? -1; allLevels.Remove(level.Id); @@ -741,313 +276,54 @@ private void DeleteLevels(OrderBookLevel[] levels) } } - private void RecomputeAfterChange() - { - var bids = _bidLevels.Values.FirstOrDefault(); - var firstBid = bids?.Count > 0 ? (OrderBookLevel)bids[0] : null; - - var asks = _askLevels.Values.FirstOrDefault(); - var firstAsk = asks?.Count > 0 ? (OrderBookLevel)asks[0] : null; - - BidPrice = firstBid?.Price ?? 0; - BidAmount = firstBid?.Amount ?? 0; - - AskPrice = firstAsk?.Price ?? 0; - AskAmount = firstAsk?.Amount ?? 0; - } - - private void RecomputeAfterChangeAndSetIndexes(List levels) - { - RecomputeAfterChange(); - FillCurrentIndex(levels); - } - - private void FillCurrentIndex(List levels) - { - if (!IsIndexComputationEnabled) - return; - - foreach (var level in levels) - { - FillIndex(level); - } - } - - private void FillCurrentIndex(OrderBookLevel[] levels) - { - if (!IsIndexComputationEnabled) - return; - - foreach (var level in levels) - { - FillIndex(level); - } - } - - private void FillIndex(OrderBookLevel level) + /// + protected override OrderBookLevel GetFirstBid() { - if (level.Index.HasValue) - return; - - var price = level.Price; - if (price == null) - { - var all = GetAllCollection(level.Side); - var existing = all.ContainsKey(level.Id) ? all[level.Id] : null; - price = existing?.Price; - } - - if (price == null) - return; - - var collection = GetLevelsCollection(level.Side); - if (collection == null) - return; - - var index = collection.IndexOfKey(price.Value); - level.Index = index; + var bids = BidLevelsInternal.Values.FirstOrDefault(); + return bids?.Count > 0 ? (OrderBookLevel)bids[0] : null; } - private OrderBookLevel[] ComputeBidLevels() + /// + protected override OrderBookLevel GetFirstAsk() { - lock (_locker) - { - return _bidLevels - .Values - .SelectMany(x => x.Values.Cast()) - .ToArray(); - } + var asks = AskLevelsInternal.Values.FirstOrDefault(); + return asks?.Count > 0 ? (OrderBookLevel)asks[0] : null; } - private OrderBookLevel[] ComputeAskLevels() + /// + protected override OrderBookLevel[] ComputeBidLevels() { - lock (_locker) + lock (Locker) { - return _askLevels - .Values - .SelectMany(x => x.Values.Cast()) - .ToArray(); + return BidLevelsInternal.Values.SelectMany(x => x.Values.Cast()).ToArray(); } } - private OrderBookLevel[] ComputeAllLevels() + /// + protected override OrderBookLevel[] ComputeAskLevels() { - lock (_locker) + lock (Locker) { - return _allBidLevels.Concat(_allAskLevels) - .Select(x => x.Value) - .ToArray(); + return AskLevelsInternal.Values.SelectMany(x => x.Values.Cast()).ToArray(); } } private IReadOnlyDictionary ComputeLevelsPerPrice(SortedList levels) { - lock (_locker) + lock (Locker) { - return levels - .ToDictionary(x => x.Key, - y => y.Value.Values.Cast().ToArray()); + return levels.ToDictionary(x => x.Key, y => y.Value.Values.Cast().ToArray()); } } - private SortedList GetLevelsCollection(CryptoOrderSide side) - { - if (side == CryptoOrderSide.Undefined) - return null; - return side == CryptoOrderSide.Bid ? - _bidLevels : - _askLevels; - } - private OrderBookLevelsOrderPerPrice GetLevelsOrdering(CryptoOrderSide side) { if (side == CryptoOrderSide.Undefined) return null; - return side == CryptoOrderSide.Bid ? - _bidLevelOrdering : - _askLevelOrdering; - } - - private OrderBookLevelsById GetAllCollection(CryptoOrderSide side) - { - if (side == CryptoOrderSide.Undefined) - return null; - return side == CryptoOrderSide.Bid ? - _allBidLevels : - _allAskLevels; - } - - private OrderBookChangeInfo CreateBookChangeNotification(List levels, OrderBookLevelBulk[] sources, bool isSnapshot) - { - var quotes = new CryptoQuotes(BidPrice, AskPrice, BidAmount, AskAmount); - var clonedLevels = DebugEnabled ? levels.Select(x => x.Clone()).ToArray() : new OrderBookLevel[0]; - var lastSource = sources.LastOrDefault(); - var change = new OrderBookChangeInfo( - TargetPair, - TargetPairOriginal, - quotes, - clonedLevels, - sources, - isSnapshot - ) - { - ExchangeName = lastSource?.ExchangeName, - ServerSequence = lastSource?.ServerSequence, - ServerTimestamp = lastSource?.ServerTimestamp - }; - return change; - } - - private void NotifyIfBidAskChanged(double oldBid, double oldAsk, OrderBookChangeInfo info) - { - if (PriceChanged(oldBid, oldAsk, info)) - { - _bidAskUpdated.OnNext(info); - } - } - - private void NotifyIfTopLevelChanged(double oldBid, double oldAsk, - double oldBidAmount, double oldAskAmount, - OrderBookChangeInfo info) - { - if (PriceChanged(oldBid, oldAsk, info) || AmountChanged(oldBidAmount, oldAskAmount, info)) - { - _topLevelUpdated.OnNext(info); - } - } - - private static bool PriceChanged(double oldBid, double oldAsk, OrderBookChangeInfo info) - { - return !CryptoMathUtils.IsSame(oldBid, info.Quotes.Bid) || - !CryptoMathUtils.IsSame(oldAsk, info.Quotes.Ask); - } - - private static bool AmountChanged(double oldBidAmount, double oldAskAmount, OrderBookChangeInfo info) - { - return !CryptoMathUtils.IsSame(oldBidAmount, info.Quotes.BidAmount) || - !CryptoMathUtils.IsSame(oldAskAmount, info.Quotes.AskAmount); - } - - private async Task ReloadSnapshotWithCheck() - { - if (!_source.LoadSnapshotEnabled) - { - // snapshot loading disabled on the source, do nothing - return; - } - - await ReloadSnapshot(); - } - - private async Task ReloadSnapshot() - { - try - { - DeactivateAutoSnapshotReloading(); - DeactivateValidityChecking(); - await _source.LoadSnapshot(TargetPairOriginal, 10000); - } - catch (Exception e) - { - LogAlways(e, $"Failed to reload snapshot for pair '{TargetPair}', " + - $"error: {e.Message}"); - } - finally - { - RestartAutoSnapshotReloading(); - RestartValidityChecking(); - } - } - - private async Task CheckValidityAndReload() - { - var isValid = IsValid(); - if (isValid) - { - // ob is valid, just reset counter and do nothing - _validityCheckCounter = 0; - return; - } - - _validityCheckCounter++; - if (_validityCheckCounter < ValidityCheckLimit) - { - // invalid state, but still in the check limit interval - // waiting for confirmation - return; - } - _validityCheckCounter = 0; - - LogAlways($"Order book is in invalid state, bid: {BidPrice}, ask: {AskPrice}, " + - "reloading snapshot..."); - await ReloadSnapshot(); - } - - private void RestartAutoSnapshotReloading() - { - DeactivateAutoSnapshotReloading(); - - if (!_snapshotReloadEnabled) - { - // snapshot reloading disabled, do not start timer - return; - } - - var timerMs = (int)SnapshotReloadTimeout.TotalMilliseconds; - _snapshotReloadTimer = new Timer(async _ => await ReloadSnapshotWithCheck(), - null, timerMs, timerMs); - } - - private void DeactivateAutoSnapshotReloading() - { - _snapshotReloadTimer?.Dispose(); - _snapshotReloadTimer = null; - } - - private void RestartValidityChecking() - { - DeactivateValidityChecking(); - if (!_validityCheckEnabled) - { - // validity checking disabled, do not start timer - return; - } - - var timerMs = (int)ValidityCheckTimeout.TotalMilliseconds; - _validityCheckTimer = new Timer(async _ => await CheckValidityAndReload(), - null, timerMs, timerMs); - } - - private void DeactivateValidityChecking() - { - _validityCheckTimer?.Dispose(); - _validityCheckTimer = null; - } - - private void LogDebug(string msg) - { - if (!DebugLogEnabled) - return; - LogAlways(msg); - } - - private void LogAlways(string msg) - { - _source.Logger.LogDebug("[ORDER BOOK {exchangeName} {targetPair}] {message}", ExchangeName, TargetPair, msg); - } - - private void LogAlways(Exception e, string msg) - { - _source.Logger.LogDebug(e, "[ORDER BOOK {exchangeName} {targetPair}] {message}", ExchangeName, TargetPair, msg); - } - - private class DescendingComparer : IComparer - { - public int Compare(double x, double y) - { - return y.CompareTo(x); - } + return side == CryptoOrderSide.Bid + ? _bidLevelOrdering + : _askLevelOrdering; } } } diff --git a/src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBookBase.cs b/src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBookBase.cs new file mode 100644 index 0000000..3bd1d59 --- /dev/null +++ b/src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBookBase.cs @@ -0,0 +1,795 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; +using Crypto.Websocket.Extensions.Core.Models; +using Crypto.Websocket.Extensions.Core.OrderBooks.Models; +using Crypto.Websocket.Extensions.Core.OrderBooks.Sources; +using Crypto.Websocket.Extensions.Core.Utils; +using Crypto.Websocket.Extensions.Core.Validations; +using Microsoft.Extensions.Logging; + +namespace Crypto.Websocket.Extensions.Core.OrderBooks +{ + /// + /// Base class for order books. + /// + public abstract class CryptoOrderBookBase : ICryptoOrderBook + { + /// + /// Object to use for synchronization. + /// + protected readonly object Locker = new(); + + /// + /// The source. + /// + protected readonly IOrderBookSource Source; + + /// + /// The internal collection of bid levels. + /// + protected readonly SortedList BidLevelsInternal = new(new DescendingComparer()); + + /// + /// The internal collection of ask levels. + /// + protected readonly SortedList AskLevelsInternal = new(); + + /// + /// Subject for streaming events when the top bid or ask price changes. + /// + protected readonly Subject BidAskUpdated = new(); + + /// + /// Subject for streaming events when the top bid or ask prices or amounts change. + /// + protected readonly Subject TopLevelUpdated = new(); + + /// + /// Subject for streaming events when any change is detected. + /// + protected readonly Subject OrderBookUpdated = new(); + + /// + /// All the bid levels (not grouped by price). + /// + protected readonly OrderBookLevelsById AllBidLevels = new(500); + + /// + /// All the ask levels (not grouped by price). + /// + protected readonly OrderBookLevelsById AllAskLevels = new(500); + + bool _isSnapshotLoaded; + Timer _snapshotReloadTimer; + TimeSpan _snapshotReloadTimeout = TimeSpan.FromMinutes(1); + bool _snapshotReloadEnabled; + + Timer _validityCheckTimer; + TimeSpan _validityCheckTimeout = TimeSpan.FromSeconds(5); + bool _validityCheckEnabled = true; + int _validityCheckCounter; + + IDisposable _subscriptionDiff; + IDisposable _subscriptionSnapshot; + + CryptoQuotes _previous; + CryptoQuotes _current; + + /// + /// Cryptocurrency order book. + /// Process order book data from one source per one target pair. + /// + /// Select target pair + /// Provide source data + protected CryptoOrderBookBase(string targetPair, IOrderBookSource source) + { + CryptoValidations.ValidateInput(targetPair, nameof(targetPair)); + CryptoValidations.ValidateInput(source, nameof(source)); + + _previous = new CryptoQuotes(BidPrice, AskPrice, BidAmount, AskAmount); + _current = new CryptoQuotes(BidPrice, AskPrice, BidAmount, AskAmount); + + TargetPairOriginal = targetPair; + TargetPair = CryptoPairsHelper.Clean(targetPair); + Source = source; + } + + /// + /// Subscribes to source streams and starts background threads for + /// auto snapshot reloading and validity checking. + /// + protected void Initialize() + { + Subscribe(); + RestartAutoSnapshotReloading(); + RestartValidityChecking(); + } + + /// + /// Dispose background processing. + /// + public void Dispose() + { + DeactivateAutoSnapshotReloading(); + DeactivateValidityChecking(); + _subscriptionDiff?.Dispose(); + _subscriptionSnapshot?.Dispose(); + _snapshotReloadTimer?.Dispose(); + _validityCheckTimer?.Dispose(); + } + + /// + public string ExchangeName => Source.ExchangeName; + + /// + public string TargetPair { get; } + + /// + public string TargetPairOriginal { get; } + + /// + public CryptoOrderBookType TargetType { get; protected set; } + + /// + public TimeSpan SnapshotReloadTimeout + { + get => _snapshotReloadTimeout; + set + { + _snapshotReloadTimeout = value; + RestartAutoSnapshotReloading(); + } + } + + /// + public bool SnapshotReloadEnabled + { + get => _snapshotReloadEnabled; + set + { + _snapshotReloadEnabled = value; + RestartAutoSnapshotReloading(); + } + } + + /// + public TimeSpan ValidityCheckTimeout + { + get => _validityCheckTimeout; + set + { + _validityCheckTimeout = value; + RestartValidityChecking(); + } + } + + /// + public int ValidityCheckLimit { get; set; } = 6; + + /// + public bool ValidityCheckEnabled + { + get => _validityCheckEnabled; + set + { + _validityCheckEnabled = value; + RestartValidityChecking(); + } + } + + /// + public bool DebugEnabled { get; set; } = false; + + /// + public bool DebugLogEnabled { get; set; } = false; + + /// + public bool IsSnapshotLoaded => _isSnapshotLoaded; + + /// + public bool IgnoreDiffsBeforeSnapshot { get; set; } = true; + + /// + public bool IsIndexComputationEnabled { get; set; } + + /// + public IObservable BidAskUpdatedStream => BidAskUpdated.AsObservable(); + + /// + public IObservable TopLevelUpdatedStream => TopLevelUpdated.AsObservable(); + + /// + public IObservable OrderBookUpdatedStream => OrderBookUpdated.AsObservable(); + + /// + public OrderBookLevel[] BidLevels => ComputeBidLevels(); + + /// + public OrderBookLevel[] AskLevels => ComputeAskLevels(); + + /// + public OrderBookLevel[] Levels => ComputeAllLevels(); + + /// + public double BidPrice { get; private set; } + + /// + public double AskPrice { get; private set; } + + /// + public double MidPrice => (AskPrice + BidPrice) / 2; + + /// + public double BidAmount { get; private set; } + + /// + public double AskAmount { get; private set; } + + /// + public bool IsValid() + { + var isPriceValid = BidPrice <= AskPrice; + return isPriceValid && Source.IsValid(); + } + + /// + public abstract OrderBookLevel FindBidLevelByPrice(double price); + + /// + public abstract OrderBookLevel[] FindBidLevelsByPrice(double price); + + /// + public abstract OrderBookLevel FindAskLevelByPrice(double price); + + /// + public abstract OrderBookLevel[] FindAskLevelsByPrice(double price); + + /// + public OrderBookLevel FindBidLevelById(string id) => FindLevelById(id, CryptoOrderSide.Bid); + + /// + public OrderBookLevel FindAskLevelById(string id) => FindLevelById(id, CryptoOrderSide.Ask); + + /// + public OrderBookLevel FindLevelById(string id, CryptoOrderSide side) + { + if (side == CryptoOrderSide.Undefined) + return null; + + var collection = GetAllCollection(side); + return collection.ContainsKey(id) + ? collection[id] + : null; + } + + void Subscribe() + { + _subscriptionSnapshot = Source.OrderBookSnapshotStream.Subscribe(HandleSnapshotSynchronized); + _subscriptionDiff = Source.OrderBookStream.Subscribe(HandleDiffsSynchronized); + } + + void HandleSnapshotSynchronized(OrderBookLevelBulk bulk) + { + if (bulk == null || !IsForThis(bulk)) + return; + + var levelsForThis = bulk.Levels.Where(x => TargetPair.Equals(x.Pair)).ToList(); + if (!levelsForThis.Any()) + { + // snapshot for different pair, ignore + return; + } + + lock (Locker) + { + HandleSnapshot(levelsForThis); + var change = CreateBookChangeNotification(levelsForThis, new[] { bulk }, true); + NotifyOrderBookChanges(change); + } + } + + /// + /// Is the bulk for this orderbook? + /// + /// The bulk. + /// True if the bulk is for this orderbook. + protected abstract bool IsForThis(OrderBookLevelBulk bulk); + + void HandleDiffsSynchronized(OrderBookLevelBulk[] bulks) + { + var sw = DebugEnabled ? Stopwatch.StartNew() : null; + + var forThis = bulks.Where(x => x != null).Where(IsForThis).ToList(); + if (!forThis.Any()) + { + // diffs for different pair, ignore + return; + } + + var allLevels = new List(); + + lock (Locker) + { + foreach (var bulk in forThis) + { + var levelsForThis = bulk.Levels.Where(x => TargetPair.Equals(x.Pair)).ToList(); + allLevels.AddRange(levelsForThis); + HandleDiff(bulk, levelsForThis); + } + + if (allLevels.Any()) + { + RecomputeAfterChangeAndSetIndexes(allLevels); + var change = CreateBookChangeNotification(allLevels, forThis, false); + NotifyOrderBookChanges(change); + } + + if (sw != null) + { + var levels = forThis.SelectMany(x => x.Levels).Count(); + LogDebug($"Diff ({forThis.Count} bulks, {levels} levels) processing took {sw.ElapsedMilliseconds} ms, {sw.ElapsedTicks} ticks"); + } + } + } + + void NotifyOrderBookChanges(OrderBookChangeInfo info) + { + OrderBookUpdated.OnNext(info); + + (_previous, _current) = (_current, _previous); + + var bidAskChanged = NotifyIfBidAskChanged(info); + NotifyIfTopLevelChanged(bidAskChanged, info); + UpdateSnapshot(_current); + } + + void HandleSnapshot(List levels) + { + ClearLevels(); + + LogDebug($"Handling snapshot: {levels.Count} levels"); + foreach (var level in levels) + { + var price = level.Price; + if (price is null or < 0) + { + LogAlways($"Received snapshot level with weird price, ignoring. [{level.Side}] Id: {level.Id}, price: {level.Price}, amount: {level.Amount}"); + continue; + } + + level.AmountDifference = level.Amount ?? 0; + level.CountDifference = level.Count ?? 0; + + switch (level.Side) + { + case CryptoOrderSide.Bid: + HandleSnapshotBidLevel(level); + AllBidLevels[level.Id] = level; + break; + case CryptoOrderSide.Ask: + HandleSnapshotAskLevel(level); + AllAskLevels[level.Id] = level; + break; + } + } + + RecomputeAfterChangeAndSetIndexes(levels); + + _isSnapshotLoaded = true; + } + + void UpdateSnapshot(CryptoQuotes snapshot) + { + snapshot.Bid = BidPrice; + snapshot.Ask = AskPrice; + snapshot.BidAmount = BidAmount; + snapshot.AskAmount = AskAmount; + } + + /// + /// Clears all internal levels state. Called at the beginning of handling a snapshot. + /// + protected abstract void ClearLevels(); + + /// + /// Handle a bid level for a snapshot. + /// + /// The bid level. + protected abstract void HandleSnapshotBidLevel(OrderBookLevel level); + + /// + /// Handle an ask level for a snapshot. + /// + /// The ask level. + protected abstract void HandleSnapshotAskLevel(OrderBookLevel level); + + void HandleDiff(OrderBookLevelBulk bulk, IReadOnlyCollection correctLevels) + { + if (IgnoreDiffsBeforeSnapshot && !_isSnapshotLoaded) + { + LogDebug($"Snapshot not loaded yet, ignoring bulk: {bulk.Action} {correctLevels.Count} levels"); + // snapshot is not loaded yet, ignore data + return; + } + + //LogDebug($"Handling diff, bulk {currentBulk}/{totalBulks}: {bulk.Action} {correctLevels.Length} levels"); + switch (bulk.Action) + { + case OrderBookAction.Insert: + UpdateLevels(correctLevels); + break; + case OrderBookAction.Update: + UpdateLevels(correctLevels); + break; + case OrderBookAction.Delete: + DeleteLevels(correctLevels); + break; + default: + return; + } + } + + /// + /// Updates internal state levels. + /// + /// The levels to update. + protected abstract void UpdateLevels(IEnumerable levels); + + /// + /// Computes differences and copies state between existing and new level. + /// + /// The existing level. + /// The new level. + protected static void ComputeUpdate(OrderBookLevel existing, OrderBookLevel level) + { + var amountDiff = (level.Amount ?? existing.Amount ?? 0) - (existing.Amount ?? 0); + var countDiff = (level.Count ?? existing.Count ?? 0) - (existing.Count ?? 0); + + existing.Price = level.Price ?? existing.Price; + existing.Amount = level.Amount ?? existing.Amount; + existing.Count = level.Count ?? existing.Count; + existing.Pair = level.Pair ?? existing.Pair; + + level.AmountDifference = amountDiff; + existing.AmountDifference = amountDiff; + + level.CountDifference = countDiff; + existing.CountDifference = countDiff; + + level.AmountDifferenceAggregated += amountDiff; + existing.AmountDifferenceAggregated += amountDiff; + + level.CountDifferenceAggregated += countDiff; + existing.CountDifferenceAggregated += countDiff; + + level.Amount ??= existing.Amount; + level.Count ??= existing.Count; + level.Price ??= existing.Price; + } + + /// + /// Decides whether or not a level is valid. + /// + /// The level to check. + /// True if the given level is valid. + protected static bool IsInvalidLevel(OrderBookLevel level) => + string.IsNullOrWhiteSpace(level.Id) || + level.Price == null || + level.Amount == null; + + /// + /// Deletes internal state levels. + /// + /// The levels to delete. + protected abstract void DeleteLevels(IReadOnlyCollection levels); + + /// + /// Computes differences and copies state between existing and new level. + /// + /// The existing level. + /// The new level. + protected static void ComputeDelete(OrderBookLevel existing, OrderBookLevel level) + { + var amountDiff = -(existing.Amount ?? 0); + var countDiff = -(existing.Count ?? 0); + + level.Amount ??= existing.Amount; + level.Count ??= existing.Count; + level.Price ??= existing.Price; + + level.AmountDifference = amountDiff; + existing.AmountDifference = amountDiff; + + level.CountDifference = countDiff; + existing.CountDifference = countDiff; + + level.AmountDifferenceAggregated += amountDiff; + existing.AmountDifferenceAggregated += amountDiff; + + level.CountDifferenceAggregated += countDiff; + existing.CountDifferenceAggregated += countDiff; + } + + void RecomputeAfterChange() + { + var firstBid = GetFirstBid(); + var firstAsk = GetFirstAsk(); + + BidPrice = firstBid?.Price ?? 0; + BidAmount = firstBid?.Amount ?? 0; + + AskPrice = firstAsk?.Price ?? 0; + AskAmount = firstAsk?.Amount ?? 0; + } + + /// + /// Gets the first bid. + /// + /// The first bid. + protected abstract OrderBookLevel GetFirstBid(); + + /// + /// Gets the first ask. + /// + /// The first ask. + protected abstract OrderBookLevel GetFirstAsk(); + + void RecomputeAfterChangeAndSetIndexes(IEnumerable levels) + { + RecomputeAfterChange(); + FillCurrentIndex(levels); + } + + /// + /// Fills the current index. + /// + /// The levels to use. + protected void FillCurrentIndex(IEnumerable levels) + { + if (!IsIndexComputationEnabled) + return; + + foreach (var level in levels) + FillIndex(level); + } + + void FillIndex(OrderBookLevel level) + { + if (level.Index.HasValue) + return; + + var price = level.Price; + if (price == null) + { + var all = GetAllCollection(level.Side); + var existing = all.ContainsKey(level.Id) ? all[level.Id] : null; + price = existing?.Price; + } + + if (price == null) + return; + + var collection = GetLevelsCollection(level.Side); + if (collection == null) + return; + + var index = collection.IndexOfKey(price.Value); + level.Index = index; + } + + /// + /// Gets the levels collection for the specified side. + /// + /// The side. + /// The levels for the specified side. + protected SortedList GetLevelsCollection(CryptoOrderSide side) + { + if (side == CryptoOrderSide.Undefined) + return null; + + return side == CryptoOrderSide.Bid + ? BidLevelsInternal + : AskLevelsInternal; + } + + /// + /// Calculates the bid levels. + /// + /// The computed bids. + protected abstract OrderBookLevel[] ComputeBidLevels(); + + /// + /// Calculates the ask levels. + /// + /// The computed asks. + protected abstract OrderBookLevel[] ComputeAskLevels(); + + OrderBookLevel[] ComputeAllLevels() + { + lock (Locker) + { + return AllBidLevels.Concat(AllAskLevels).Select(x => x.Value).ToArray(); + } + } + + /// + /// Get all the levels for the specified side. + /// + /// The side. + /// All the levels for the specified side. + protected OrderBookLevelsById GetAllCollection(CryptoOrderSide side) + { + if (side == CryptoOrderSide.Undefined) + return null; + + return side == CryptoOrderSide.Bid + ? AllBidLevels + : AllAskLevels; + } + + OrderBookChangeInfo CreateBookChangeNotification(IEnumerable levels, IReadOnlyList sources, bool isSnapshot) + { + var quotes = new CryptoQuotes(BidPrice, AskPrice, BidAmount, AskAmount); + var clonedLevels = DebugEnabled ? levels.Select(x => x.Clone()).ToArray() : Array.Empty(); + var lastSource = sources.LastOrDefault(); + var change = new OrderBookChangeInfo(TargetPair, TargetPairOriginal, quotes, clonedLevels, sources, isSnapshot) + { + ExchangeName = lastSource?.ExchangeName, + ServerSequence = lastSource?.ServerSequence, + ServerTimestamp = lastSource?.ServerTimestamp + }; + return change; + } + + bool NotifyIfBidAskChanged(OrderBookChangeInfo info) + { + if (!CryptoMathUtils.IsSame(_previous.Bid, info.Quotes.Bid) || + !CryptoMathUtils.IsSame(_previous.Ask, info.Quotes.Ask)) + { + BidAskUpdated.OnNext(info); + return true; + } + return false; + } + + bool NotifyIfTopLevelChanged(bool bidAskChanged, OrderBookChangeInfo info) + { + if (bidAskChanged || + !CryptoMathUtils.IsSame(_previous.BidAmount, info.Quotes.BidAmount) || + !CryptoMathUtils.IsSame(_previous.AskAmount, info.Quotes.AskAmount)) + { + TopLevelUpdated.OnNext(info); + return true; + } + return false; + } + + async Task ReloadSnapshotWithCheck() + { + if (!Source.LoadSnapshotEnabled) + { + // snapshot loading disabled on the source, do nothing + return; + } + + await ReloadSnapshot(); + } + + async Task ReloadSnapshot() + { + try + { + DeactivateAutoSnapshotReloading(); + DeactivateValidityChecking(); + await Source.LoadSnapshot(TargetPairOriginal, 10000); + } + catch (Exception e) + { + LogAlways(e, $"Failed to reload snapshot for pair '{TargetPair}', error: {e.Message}"); + } + finally + { + RestartAutoSnapshotReloading(); + RestartValidityChecking(); + } + } + + async Task CheckValidityAndReload() + { + var isValid = IsValid(); + if (isValid) + { + // ob is valid, just reset counter and do nothing + _validityCheckCounter = 0; + return; + } + + _validityCheckCounter++; + if (_validityCheckCounter < ValidityCheckLimit) + { + // invalid state, but still in the check limit interval + // waiting for confirmation + return; + } + _validityCheckCounter = 0; + + LogAlways($"Order book is in invalid state, bid: {BidPrice}, ask: {AskPrice}, reloading snapshot..."); + await ReloadSnapshot(); + } + + void RestartAutoSnapshotReloading() + { + DeactivateAutoSnapshotReloading(); + + if (!_snapshotReloadEnabled) + { + // snapshot reloading disabled, do not start timer + return; + } + + _snapshotReloadTimer = new Timer(async _ => await ReloadSnapshotWithCheck(), null, SnapshotReloadTimeout, SnapshotReloadTimeout); + } + + void DeactivateAutoSnapshotReloading() + { + _snapshotReloadTimer?.Dispose(); + _snapshotReloadTimer = null; + } + + void RestartValidityChecking() + { + DeactivateValidityChecking(); + + if (!_validityCheckEnabled) + { + // validity checking disabled, do not start timer + return; + } + + _validityCheckTimer = new Timer(async _ => await CheckValidityAndReload(), null, ValidityCheckTimeout, ValidityCheckTimeout); + } + + void DeactivateValidityChecking() + { + _validityCheckTimer?.Dispose(); + _validityCheckTimer = null; + } + + /// + /// Log a message if DebugLogEnabled is true. + /// + /// The message. + protected void LogDebug(string msg) + { + if (DebugLogEnabled) + LogAlways(msg); + } + + /// + /// Always log a message. + /// + /// The message. + protected void LogAlways(string msg) => Source.Logger.LogDebug("[ORDER BOOK {exchangeName} {targetPair}] {message}", ExchangeName, TargetPair, msg); + + /// + /// Always log an exception. + /// + /// The exception. + /// The message. + protected void LogAlways(Exception e, string msg) => Source.Logger.LogDebug(e, "[ORDER BOOK {exchangeName} {targetPair}] {message}", ExchangeName, TargetPair, msg); + + private class DescendingComparer : IComparer + { + public int Compare(double x, double y) + { + return y.CompareTo(x); + } + } + } +} diff --git a/src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBookL2.cs b/src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBookL2.cs index 9c29785..187abb8 100644 --- a/src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBookL2.cs +++ b/src/Crypto.Websocket.Extensions.Core/OrderBooks/CryptoOrderBookL2.cs @@ -18,495 +18,83 @@ namespace Crypto.Websocket.Extensions.Core.OrderBooks /// /// Cryptocurrency order book optimized for L2 precision (grouped by price). /// Process order book data from one source and one target pair. - /// Only first levels are computed in advance, allocates less memory than CryptoOrderBook counterpart. + /// Only first levels are computed in advance, allocates less memory than CryptoOrderBook counterpart. /// - [DebuggerDisplay("CryptoOrderBook [{TargetPair}] bid: {BidPrice} ({_bidLevels.Count}) ask: {AskPrice} ({_askLevels.Count})")] - public class CryptoOrderBookL2 : ICryptoOrderBook + [DebuggerDisplay("CryptoOrderBook [{TargetPair}] bid: {BidPrice} ({BidLevelsInternal.Count}) ask: {AskPrice} ({AskLevelsInternal.Count})")] + public class CryptoOrderBookL2 : CryptoOrderBookBase { - private readonly object _locker = new object(); - - private readonly IOrderBookSource _source; - - private readonly Subject _bidAskUpdated = new Subject(); - private readonly Subject _topLevelUpdated = new Subject(); - private readonly Subject _orderBookUpdated = new Subject(); - - private readonly SortedList _bidLevels = new SortedList(new DescendingComparer()); - private readonly SortedList _askLevels = new SortedList(); - - private readonly OrderBookLevelsById _allBidLevels = new OrderBookLevelsById(500); - private readonly OrderBookLevelsById _allAskLevels = new OrderBookLevelsById(500); - - private bool _isSnapshotLoaded; - private Timer _snapshotReloadTimer; - private TimeSpan _snapshotReloadTimeout = TimeSpan.FromMinutes(1); - private bool _snapshotReloadEnabled; - - private Timer _validityCheckTimer; - private TimeSpan _validityCheckTimeout = TimeSpan.FromSeconds(5); - private bool _validityCheckEnabled = true; - private int _validityCheckCounter; - - private IDisposable _subscriptionDiff; - private IDisposable _subscriptionSnapshot; - /// /// Cryptocurrency order book. /// Process order book data from one source per one target pair. /// /// Select target pair /// Provide level 2 source data - public CryptoOrderBookL2(string targetPair, IOrderBookSource source) - { - CryptoValidations.ValidateInput(targetPair, nameof(targetPair)); - CryptoValidations.ValidateInput(source, nameof(source)); - - TargetPairOriginal = targetPair; - TargetPair = CryptoPairsHelper.Clean(targetPair); - _source = source; - - Subscribe(); - RestartAutoSnapshotReloading(); - RestartValidityChecking(); - } - - /// - /// Dispose background processing - /// - public void Dispose() - { - DeactivateAutoSnapshotReloading(); - DeactivateValidityChecking(); - _source.Dispose(); - _subscriptionDiff?.Dispose(); - _subscriptionSnapshot?.Dispose(); - } - - /// - /// Origin exchange name - /// - public string ExchangeName => _source.ExchangeName; - - /// - /// Target pair for this order book data - /// - public string TargetPair { get; } - - /// - /// Originally provided target pair for this order book data - /// - public string TargetPairOriginal { get; } - - /// - /// Order book type, which precision it supports - /// - public CryptoOrderBookType TargetType => CryptoOrderBookType.L2; - - /// - /// Time interval for auto snapshot reloading. - /// Default 1 min. - /// - public TimeSpan SnapshotReloadTimeout - { - get => _snapshotReloadTimeout; - set - { - _snapshotReloadTimeout = value; - RestartAutoSnapshotReloading(); - } - } - - /// - /// Whenever auto snapshot reloading feature is enabled. - /// Disabled by default - /// - public bool SnapshotReloadEnabled - { - get => _snapshotReloadEnabled; - set - { - _snapshotReloadEnabled = value; - RestartAutoSnapshotReloading(); - } - } - - /// - /// Time interval for validity checking. - /// It forces snapshot reloading whenever invalid state. - /// Default 5 sec. - /// - public TimeSpan ValidityCheckTimeout - { - get => _validityCheckTimeout; - set - { - _validityCheckTimeout = value; - RestartValidityChecking(); - } - } - - /// - /// How many times it should check validity before processing snapshot reload. - /// Default 6 times (which is 6 * 5sec = 30sec). - /// - public int ValidityCheckLimit { get; set; } = 6; - - /// - /// Whenever validity checking feature is enabled. - /// It forces snapshot reloading whenever invalid state. - /// Enabled by default - /// - public bool ValidityCheckEnabled - { - get => _validityCheckEnabled; - set - { - _validityCheckEnabled = value; - RestartValidityChecking(); - } - } - - /// - /// Provide more info (on every change) whenever enabled. - /// Disabled by default - /// - public bool DebugEnabled { get; set; } = false; - - /// - /// Logs more info (state, performance) whenever enabled. - /// Disabled by default - /// - public bool DebugLogEnabled { get; set; } - - - /// - /// Whenever snapshot was already handled - /// - public bool IsSnapshotLoaded => _isSnapshotLoaded; - - /// - /// All diffs/deltas that come before snapshot will be ignored (default: true) - /// - public bool IgnoreDiffsBeforeSnapshot { get; set; } = true; - - /// - /// Compute index (position) per every updated level, performance is slightly reduced (default: false) - /// - public bool IsIndexComputationEnabled { get; set; } - - /// - /// Streams data when top level bid or ask price was updated - /// - public IObservable BidAskUpdatedStream => _bidAskUpdated.AsObservable(); - - /// - /// Streams data when top level bid or ask price or amount was updated - /// - public IObservable TopLevelUpdatedStream => _topLevelUpdated.AsObservable(); - - /// - /// Streams data on every order book change (price or amount at any level) - /// - public IObservable OrderBookUpdatedStream => _orderBookUpdated.AsObservable(); - - /// - /// Current bid side of the order book (ordered from higher to lower price) - /// - public OrderBookLevel[] BidLevels => ComputeBidLevels(); - - /// - /// Current ask side of the order book (ordered from lower to higher price) - /// - public OrderBookLevel[] AskLevels => ComputeAskLevels(); - - /// - /// All current levels together - /// - public OrderBookLevel[] Levels => ComputeAllLevels(); - - /// - /// Current top level bid price - /// - public double BidPrice { get; private set; } - - /// - /// Current top level ask price - /// - public double AskPrice { get; private set; } - - /// - /// Current mid price - /// - public double MidPrice => (AskPrice + BidPrice) / 2; - - /// - /// Current top level bid amount - /// - public double BidAmount { get; private set; } - - /// - /// Current top level ask price - /// - public double AskAmount { get; private set; } - - /// - /// Returns true if order book is in valid state - /// - public bool IsValid() + public CryptoOrderBookL2(string targetPair, IOrderBookSource source) : base(targetPair, source) { - var isPriceValid = BidPrice <= AskPrice; - return isPriceValid && _source.IsValid(); + TargetType = CryptoOrderBookType.L2; + Initialize(); } - /// - /// Find bid level by provided price (returns null in case of not found) - /// - public OrderBookLevel FindBidLevelByPrice(double price) + /// + public override OrderBookLevel FindBidLevelByPrice(double price) { - lock (_locker) + lock (Locker) { - return _bidLevels.TryGetValue(price, out OrderBookLevel level) ? level : null; + return BidLevelsInternal.TryGetValue(price, out var level) + ? level + : null; } } - /// - /// Find all bid levels for provided price (returns empty when not found) - /// - public OrderBookLevel[] FindBidLevelsByPrice(double price) + /// + public override OrderBookLevel[] FindBidLevelsByPrice(double price) { var level = FindBidLevelByPrice(price); - if (level == null) - return Array.Empty(); - return new[] { level }; + return level == null + ? Array.Empty() + : new[] { level }; } - /// - /// Find ask level by provided price (returns null in case of not found) - /// - public OrderBookLevel FindAskLevelByPrice(double price) + /// + public override OrderBookLevel FindAskLevelByPrice(double price) { - lock (_locker) + lock (Locker) { - return _askLevels.TryGetValue(price, out OrderBookLevel level) ? level : null; + return AskLevelsInternal.TryGetValue(price, out var level) + ? level + : null; } } - /// - /// Find all ask levels for provided price (returns empty when not found) - /// - public OrderBookLevel[] FindAskLevelsByPrice(double price) + /// + public override OrderBookLevel[] FindAskLevelsByPrice(double price) { var level = FindAskLevelByPrice(price); - if (level == null) - return Array.Empty(); - return new[] { level }; + return level == null + ? Array.Empty() + : new[] { level }; } - /// - /// Find bid level by provided identification (returns null in case of not found) - /// - public OrderBookLevel FindBidLevelById(string id) - { - return FindLevelById(id, CryptoOrderSide.Bid); - } + /// + protected override bool IsForThis(OrderBookLevelBulk bulk) => bulk.OrderBookType is CryptoOrderBookType.L1 or CryptoOrderBookType.L2; - /// - /// Find ask level by provided identification (returns null in case of not found) - /// - public OrderBookLevel FindAskLevelById(string id) + /// + protected override void ClearLevels() { - return FindLevelById(id, CryptoOrderSide.Ask); + BidLevelsInternal.Clear(); + AskLevelsInternal.Clear(); + AllBidLevels.Clear(); + AllAskLevels.Clear(); } - /// - /// Find level by provided identification (returns null in case of not found). - /// You need to specify side. - /// - public OrderBookLevel FindLevelById(string id, CryptoOrderSide side) - { - if (side == CryptoOrderSide.Undefined) - return null; - var collection = GetAllCollection(side); - if (collection.ContainsKey(id)) - return collection[id]; - return null; - } - - private void Subscribe() - { - _subscriptionSnapshot = _source - .OrderBookSnapshotStream - .Subscribe(HandleSnapshotSynchronized); - - _subscriptionDiff = _source - .OrderBookStream - .Subscribe(HandleDiffSynchronized); - } - - private void HandleSnapshotSynchronized(OrderBookLevelBulk bulk) - { - if (bulk == null) - return; - - var levels = bulk.Levels; - var levelsForThis = levels - .Where(x => TargetPair.Equals(x.Pair)) - .ToList(); - if (!levelsForThis.Any()) - { - // snapshot for different pair, ignore - return; - } - - double oldBid; - double oldAsk; - double oldBidAmount; - double oldAskAmount; - OrderBookChangeInfo change; - - lock (_locker) - { - oldBid = BidPrice; - oldAsk = AskPrice; - oldBidAmount = BidAmount; - oldAskAmount = AskAmount; - HandleSnapshot(levelsForThis); - - change = NotifyAboutBookChange( - levelsForThis, - new[] { bulk }, - true - ); - } - - _orderBookUpdated.OnNext(change); - NotifyIfBidAskChanged(oldBid, oldAsk, change); - NotifyIfTopLevelChanged(oldBid, oldAsk, oldBidAmount, oldAskAmount, change); - } - - private void HandleDiffSynchronized(OrderBookLevelBulk[] bulks) - { - var sw = DebugEnabled ? Stopwatch.StartNew() : null; - - var forThis = bulks - .Where(x => x != null) - .Where(x => x.Levels.Any(y => TargetPair.Equals(y.Pair))) - .ToArray(); - if (!forThis.Any()) - { - // snapshot for different pair, ignore - return; - } - - double oldBid; - double oldAsk; - double oldBidAmount; - double oldAskAmount; - var allLevels = new List(); - OrderBookChangeInfo change; - - lock (_locker) - { - oldBid = BidPrice; - oldAsk = AskPrice; - oldBidAmount = BidAmount; - oldAskAmount = AskAmount; - - foreach (var bulk in forThis) - { - var levelsForThis = bulk.Levels - .Where(x => TargetPair.Equals(x.Pair)) - .ToArray(); - allLevels.AddRange(levelsForThis); - HandleDiff(bulk, levelsForThis); - } + /// + protected override void HandleSnapshotBidLevel(OrderBookLevel level) => BidLevelsInternal[level.Price!.Value] = level; - RecomputeAfterChangeAndSetIndexes(allLevels); + /// + protected override void HandleSnapshotAskLevel(OrderBookLevel level) => AskLevelsInternal[level.Price!.Value] = level; - change = NotifyAboutBookChange( - allLevels, - forThis, - false - ); - } - - _orderBookUpdated.OnNext(change); - NotifyIfTopLevelChanged(oldBid, oldAsk, oldBidAmount, oldAskAmount, change); - NotifyIfBidAskChanged(oldBid, oldAsk, change); - - if (sw != null) - { - var levels = forThis.SelectMany(x => x.Levels).Count(); - LogDebug($"Diff ({forThis.Length} bulks, {levels} levels) processing took {sw.ElapsedMilliseconds} ms, {sw.ElapsedTicks} ticks"); - } - } - - private void HandleSnapshot(List levels) - { - _bidLevels.Clear(); - _askLevels.Clear(); - _allBidLevels.Clear(); - _allAskLevels.Clear(); - - LogDebug($"Handling snapshot: {levels.Count} levels"); - foreach (var level in levels) - { - var price = level.Price; - if (price == null || price < 0) - { - LogAlways($"Received snapshot level with weird price, ignoring. Id: {level.Id}, price: {level.Price}, amount: {level.Amount}"); - continue; - } - - level.AmountDifference = level.Amount ?? 0; - level.CountDifference = level.Count ?? 0; - - if (level.Side == CryptoOrderSide.Bid) - { - _bidLevels[price.Value] = level; - _allBidLevels[level.Id] = level; - } - - - if (level.Side == CryptoOrderSide.Ask) - { - _askLevels[price.Value] = level; - _allAskLevels[level.Id] = level; - } - } - - RecomputeAfterChangeAndSetIndexes(levels); - - _isSnapshotLoaded = true; - } - - private void HandleDiff(OrderBookLevelBulk bulk, OrderBookLevel[] correctLevels) - { - if (IgnoreDiffsBeforeSnapshot && !_isSnapshotLoaded) - { - LogDebug($"Snapshot not loaded yet, ignoring bulk: {bulk.Action} {correctLevels.Length} levels"); - // snapshot is not loaded yet, ignore data - return; - } - - //LogDebug($"Handling diff, bulk {currentBulk}/{totalBulks}: {bulk.Action} {correctLevels.Length} levels"); - switch (bulk.Action) - { - case OrderBookAction.Insert: - UpdateLevels(correctLevels); - break; - case OrderBookAction.Update: - UpdateLevels(correctLevels); - break; - case OrderBookAction.Delete: - DeleteLevels(correctLevels); - break; - default: - return; - } - } - - private void UpdateLevels(OrderBookLevel[] levels) + /// + protected override void UpdateLevels(IEnumerable levels) { foreach (var level in levels) { @@ -525,29 +113,7 @@ private void UpdateLevels(OrderBookLevel[] levels) continue; } - var amountDiff = (level.Amount ?? existing.Amount ?? 0) - (existing.Amount ?? 0); - var countDiff = (level.Count ?? existing.Count ?? 0) - (existing.Count ?? 0); - - existing.Price = level.Price ?? existing.Price; - existing.Amount = level.Amount ?? existing.Amount; - existing.Count = level.Count ?? existing.Count; - existing.Pair = level.Pair ?? existing.Pair; - - level.AmountDifference = amountDiff; - existing.AmountDifference = amountDiff; - - level.CountDifference = countDiff; - existing.CountDifference = countDiff; - - level.AmountDifferenceAggregated += amountDiff; - existing.AmountDifferenceAggregated += amountDiff; - - level.CountDifferenceAggregated += countDiff; - existing.CountDifferenceAggregated += countDiff; - - level.Amount = level.Amount ?? existing.Amount; - level.Count = level.Count ?? existing.Count; - level.Price = level.Price ?? existing.Price; + ComputeUpdate(existing, level); InsertToCollection(collection, existing); } @@ -569,14 +135,8 @@ private void InsertToCollection(IDictionary collection, level.AmountUpdatedCount++; } - private static bool IsInvalidLevel(OrderBookLevel level) - { - return string.IsNullOrWhiteSpace(level.Id) || - level.Price == null || - level.Amount == null; - } - - private void DeleteLevels(OrderBookLevel[] levels) + /// + protected override void DeleteLevels(IReadOnlyCollection levels) { FillCurrentIndex(levels); @@ -605,301 +165,31 @@ private void DeleteLevels(OrderBookLevel[] levels) allLevels.Remove(level.Id); if (existing != null) - { - var amountDiff = -(existing.Amount ?? 0); - var countDiff = -(existing.Count ?? 0); - - level.Amount = level.Amount ?? existing.Amount; - level.Count = level.Count ?? existing.Count; - level.Price = level.Price ?? existing.Price; - - level.AmountDifference = amountDiff; - existing.AmountDifference = amountDiff; - - level.CountDifference = countDiff; - existing.CountDifference = countDiff; - - level.AmountDifferenceAggregated += amountDiff; - existing.AmountDifferenceAggregated += amountDiff; - - level.CountDifferenceAggregated += countDiff; - existing.CountDifferenceAggregated += countDiff; - } + ComputeDelete(existing, level); } } - private void RecomputeAfterChange() - { - var firstBid = _bidLevels.FirstOrDefault().Value; - var firstAsk = _askLevels.FirstOrDefault().Value; + /// + protected override OrderBookLevel GetFirstBid() => BidLevelsInternal.FirstOrDefault().Value; - BidPrice = firstBid?.Price ?? 0; - BidAmount = firstBid?.Amount ?? 0; + /// + protected override OrderBookLevel GetFirstAsk() => AskLevelsInternal.FirstOrDefault().Value; - AskPrice = firstAsk?.Price ?? 0; - AskAmount = firstAsk?.Amount ?? 0; - } - - private void RecomputeAfterChangeAndSetIndexes(List levels) - { - RecomputeAfterChange(); - FillCurrentIndex(levels); - } - - private void FillCurrentIndex(List levels) - { - if (!IsIndexComputationEnabled) - return; - - foreach (var level in levels) - { - FillIndex(level); - } - } - - private void FillCurrentIndex(OrderBookLevel[] levels) - { - if (!IsIndexComputationEnabled) - return; - - foreach (var level in levels) - { - FillIndex(level); - } - } - - private void FillIndex(OrderBookLevel level) + /// + protected override OrderBookLevel[] ComputeBidLevels() { - if (level.Index.HasValue) - return; - - var price = level.Price; - if (price == null) + lock (Locker) { - var all = GetAllCollection(level.Side); - var existing = all.ContainsKey(level.Id) ? all[level.Id] : null; - price = existing?.Price; + return BidLevelsInternal.Values.ToArray(); } - - if (price == null) - return; - - var collection = GetLevelsCollection(level.Side); - if (collection == null) - return; - - var index = collection.IndexOfKey(price.Value); - level.Index = index; - } - - private OrderBookLevel[] ComputeBidLevels() - { - return _bidLevels.Values.ToArray(); - } - - private OrderBookLevel[] ComputeAskLevels() - { - return _askLevels.Values.ToArray(); - } - - private OrderBookLevel[] ComputeAllLevels() - { - lock (_locker) - { - return _allBidLevels.Concat(_allAskLevels) - .Select(x => x.Value) - .ToArray(); - } - } - - private SortedList GetLevelsCollection(CryptoOrderSide side) - { - if (side == CryptoOrderSide.Undefined) - return null; - return side == CryptoOrderSide.Bid ? - _bidLevels : - _askLevels; - } - - private OrderBookLevelsById GetAllCollection(CryptoOrderSide side) - { - if (side == CryptoOrderSide.Undefined) - return null; - return side == CryptoOrderSide.Bid ? - _allBidLevels : - _allAskLevels; - } - - private OrderBookChangeInfo NotifyAboutBookChange(List levels, OrderBookLevelBulk[] sources, bool isSnapshot) - { - var quotes = new CryptoQuotes(BidPrice, AskPrice, BidAmount, AskAmount); - var clonedLevels = DebugEnabled ? levels.Select(x => x.Clone()).ToArray() : new OrderBookLevel[0]; - var lastSource = sources.LastOrDefault(); - var change = new OrderBookChangeInfo( - TargetPair, - TargetPairOriginal, - quotes, - clonedLevels, - sources, - isSnapshot - ) - { - ExchangeName = lastSource?.ExchangeName, - ServerSequence = lastSource?.ServerSequence, - ServerTimestamp = lastSource?.ServerTimestamp - }; - return change; - } - - private void NotifyIfBidAskChanged(double oldBid, double oldAsk, OrderBookChangeInfo info) - { - if (PriceChanged(oldBid, oldAsk, info)) - { - _bidAskUpdated.OnNext(info); - } - } - - private void NotifyIfTopLevelChanged(double oldBid, double oldAsk, - double oldBidAmount, double oldAskAmount, - OrderBookChangeInfo info) - { - if (PriceChanged(oldBid, oldAsk, info) || AmountChanged(oldBidAmount, oldAskAmount, info)) - { - _topLevelUpdated.OnNext(info); - } - } - - private static bool PriceChanged(double oldBid, double oldAsk, OrderBookChangeInfo info) - { - return !CryptoMathUtils.IsSame(oldBid, info.Quotes.Bid) || - !CryptoMathUtils.IsSame(oldAsk, info.Quotes.Ask); - } - - private static bool AmountChanged(double oldBidAmount, double oldAskAmount, OrderBookChangeInfo info) - { - return !CryptoMathUtils.IsSame(oldBidAmount, info.Quotes.BidAmount) || - !CryptoMathUtils.IsSame(oldAskAmount, info.Quotes.AskAmount); - } - - private async Task ReloadSnapshotWithCheck() - { - if (!_source.LoadSnapshotEnabled) - { - // snapshot loading disabled on the source, do nothing - return; - } - - await ReloadSnapshot(); - } - - private async Task ReloadSnapshot() - { - try - { - DeactivateAutoSnapshotReloading(); - DeactivateValidityChecking(); - await _source.LoadSnapshot(TargetPairOriginal, 10000); - } - catch (Exception e) - { - LogAlways(e, $"Failed to reload snapshot for pair '{TargetPair}', " + - $"error: {e.Message}"); - } - finally - { - RestartAutoSnapshotReloading(); - RestartValidityChecking(); - } - } - - private async Task CheckValidityAndReload() - { - var isValid = IsValid(); - if (isValid) - { - // ob is valid, just reset counter and do nothing - _validityCheckCounter = 0; - return; - } - - _validityCheckCounter++; - if (_validityCheckCounter < ValidityCheckLimit) - { - // invalid state, but still in the check limit interval - // waiting for confirmation - return; - } - _validityCheckCounter = 0; - - LogAlways($"Order book is in invalid state, bid: {BidPrice}, ask: {AskPrice}, " + - "reloading snapshot..."); - await ReloadSnapshot(); - } - - private void RestartAutoSnapshotReloading() - { - DeactivateAutoSnapshotReloading(); - - if (!_snapshotReloadEnabled) - { - // snapshot reloading disabled, do not start timer - return; - } - - var timerMs = (int)SnapshotReloadTimeout.TotalMilliseconds; - _snapshotReloadTimer = new Timer(async _ => await ReloadSnapshotWithCheck(), - null, timerMs, timerMs); - } - - private void DeactivateAutoSnapshotReloading() - { - _snapshotReloadTimer?.Dispose(); - _snapshotReloadTimer = null; - } - - private void RestartValidityChecking() - { - DeactivateValidityChecking(); - - if (!_validityCheckEnabled) - { - // validity checking disabled, do not start timer - return; - } - - var timerMs = (int)ValidityCheckTimeout.TotalMilliseconds; - _validityCheckTimer = new Timer(async _ => await CheckValidityAndReload(), - null, timerMs, timerMs); - } - - private void DeactivateValidityChecking() - { - _validityCheckTimer?.Dispose(); - _validityCheckTimer = null; - } - - private void LogDebug(string msg) - { - if (!DebugLogEnabled) - return; - LogAlways(msg); - } - - private void LogAlways(string msg) - { - _source.Logger.LogDebug("[ORDER BOOK {exchangeName} {targetPair}] {message}", ExchangeName, TargetPair, msg); - } - - private void LogAlways(Exception e, string msg) - { - _source.Logger.LogDebug(e, "[ORDER BOOK {exchangeName} {targetPair}] {message}", ExchangeName, TargetPair, msg); } - private class DescendingComparer : IComparer + /// + protected override OrderBookLevel[] ComputeAskLevels() { - public int Compare(double x, double y) + lock (Locker) { - return y.CompareTo(x); + return AskLevelsInternal.Values.ToArray(); } } } diff --git a/src/Crypto.Websocket.Extensions.Core/OrderBooks/Models/IOrderBookChangeInfo.cs b/src/Crypto.Websocket.Extensions.Core/OrderBooks/Models/IOrderBookChangeInfo.cs index fca3722..959951a 100644 --- a/src/Crypto.Websocket.Extensions.Core/OrderBooks/Models/IOrderBookChangeInfo.cs +++ b/src/Crypto.Websocket.Extensions.Core/OrderBooks/Models/IOrderBookChangeInfo.cs @@ -1,4 +1,5 @@ -using Crypto.Websocket.Extensions.Core.Models; +using System.Collections.Generic; +using Crypto.Websocket.Extensions.Core.Models; namespace Crypto.Websocket.Extensions.Core.OrderBooks.Models { @@ -26,12 +27,12 @@ public interface IOrderBookChangeInfo : ICryptoChangeInfo /// Order book levels that caused the change. /// Streamed only when debug mode is enabled. /// - OrderBookLevel[] Levels { get; } + IReadOnlyList Levels { get; } /// /// Source bulks that caused this update (all levels) /// - OrderBookLevelBulk[] Sources { get; } + IReadOnlyList Sources { get; } /// /// Whenever this order book change update comes from snapshot or diffs diff --git a/src/Crypto.Websocket.Extensions.Core/OrderBooks/Models/OrderBookChangeInfo.cs b/src/Crypto.Websocket.Extensions.Core/OrderBooks/Models/OrderBookChangeInfo.cs index 2853634..67faa0d 100644 --- a/src/Crypto.Websocket.Extensions.Core/OrderBooks/Models/OrderBookChangeInfo.cs +++ b/src/Crypto.Websocket.Extensions.Core/OrderBooks/Models/OrderBookChangeInfo.cs @@ -1,4 +1,5 @@ -using System.Diagnostics; +using System.Collections.Generic; +using System.Diagnostics; using Crypto.Websocket.Extensions.Core.Models; namespace Crypto.Websocket.Extensions.Core.OrderBooks.Models @@ -11,7 +12,7 @@ public class OrderBookChangeInfo : CryptoChangeInfo, IOrderBookChangeInfo { /// public OrderBookChangeInfo(string pair, string pairOriginal, - CryptoQuotes quotes, OrderBookLevel[] levels, OrderBookLevelBulk[] sources, + ICryptoQuotes quotes, IReadOnlyList levels, IReadOnlyList sources, bool isSnapshot) { Pair = pair; @@ -41,12 +42,12 @@ public OrderBookChangeInfo(string pair, string pairOriginal, /// Order book levels that caused the change. /// Streamed only when debug mode is enabled. /// - public OrderBookLevel[] Levels { get; } + public IReadOnlyList Levels { get; } /// /// Source bulks that caused this update (all levels) /// - public OrderBookLevelBulk[] Sources { get; } + public IReadOnlyList Sources { get; } /// /// Whenever this order book change update comes from snapshot or diffs diff --git a/test/Crypto.Websocket.Extensions.Tests/CryptoOrderBookL2Tests.cs b/test/Crypto.Websocket.Extensions.Tests/CryptoOrderBookL2Tests.cs index 039ff5e..7a4f771 100644 --- a/test/Crypto.Websocket.Extensions.Tests/CryptoOrderBookL2Tests.cs +++ b/test/Crypto.Websocket.Extensions.Tests/CryptoOrderBookL2Tests.cs @@ -590,11 +590,11 @@ public async Task StreamingData_ShouldNotifyOneByOne() var firstChange = changes.First(); var secondChange = changes[1]; - Assert.Equal(2, firstChange.Levels.Length); + Assert.Equal(2, firstChange.Levels.Count); Assert.Equal(499.4, firstChange.Levels.First().Price); Assert.Equal(500.2, firstChange.Levels.Last().Price); - Assert.Equal(6, secondChange.Levels.Length); + Assert.Equal(6, secondChange.Levels.Count); } [Fact] diff --git a/test/Crypto.Websocket.Extensions.Tests/CryptoOrderBookL3PerformanceTests.cs b/test/Crypto.Websocket.Extensions.Tests/CryptoOrderBookL3PerformanceTests.cs index 88d79f5..ef05cc0 100644 --- a/test/Crypto.Websocket.Extensions.Tests/CryptoOrderBookL3PerformanceTests.cs +++ b/test/Crypto.Websocket.Extensions.Tests/CryptoOrderBookL3PerformanceTests.cs @@ -128,7 +128,7 @@ private static long StreamLevels(string pair, OrderBookSourceMock source, ICrypt var newAskPrice = maxBidPrice + (i % maxAskPrice); // update levels - var bulk = GetUpdateBulkL2( + var bulk = GetUpdateBulk(CryptoOrderBookType.L3, CreateLevel(pair, newBidPrice, CryptoOrderSide.Bid, bid.Id), CreateLevel(pair, newAskPrice, CryptoOrderSide.Ask, ask.Id) ); diff --git a/test/Crypto.Websocket.Extensions.Tests/CryptoOrderBookTests.cs b/test/Crypto.Websocket.Extensions.Tests/CryptoOrderBookTests.cs index c65ad4e..7372695 100644 --- a/test/Crypto.Websocket.Extensions.Tests/CryptoOrderBookTests.cs +++ b/test/Crypto.Websocket.Extensions.Tests/CryptoOrderBookTests.cs @@ -590,11 +590,11 @@ public async Task StreamingData_ShouldNotifyOneByOne() var firstChange = changes.First(); var secondChange = changes[1]; - Assert.Equal(2, firstChange.Levels.Length); + Assert.Equal(2, firstChange.Levels.Count); Assert.Equal(499.4, firstChange.Levels.First().Price); Assert.Equal(500.2, firstChange.Levels.Last().Price); - Assert.Equal(6, secondChange.Levels.Length); + Assert.Equal(6, secondChange.Levels.Count); } [Fact] diff --git a/test_integration/Crypto.Websocket.Extensions.Sample/OrderBookExample.cs b/test_integration/Crypto.Websocket.Extensions.Sample/OrderBookExample.cs index d8db0b0..88a90ca 100644 --- a/test_integration/Crypto.Websocket.Extensions.Sample/OrderBookExample.cs +++ b/test_integration/Crypto.Websocket.Extensions.Sample/OrderBookExample.cs @@ -103,7 +103,7 @@ private static void HandleQuoteChanged(IList quotes, bool } var metaString = simple ? string.Empty : $" (" + - $"{x.Sources.Length} " + + $"{x.Sources.Count} " + $"{x.Sources.Sum(y => y.Levels.Length)}" + $"{time})"; return